|
|
@ -222,7 +222,6 @@ func TestStore_Notify(t *testing.T) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
req.client.QueueEvents(
|
|
|
|
req.client.QueueEvents(
|
|
|
|
newEndOfSnapshotEvent(2),
|
|
|
|
newEndOfSnapshotEvent(2),
|
|
|
|
newEventServiceHealthRegister(10, 1, "srv1"),
|
|
|
|
|
|
|
|
newEventServiceHealthRegister(22, 2, "srv1"))
|
|
|
|
newEventServiceHealthRegister(22, 2, "srv1"))
|
|
|
|
|
|
|
|
|
|
|
|
cID := "correlate"
|
|
|
|
cID := "correlate"
|
|
|
@ -388,7 +387,56 @@ func assertRequestCount(t testingT, s *Store, req Request, expected int) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TODO: test expiration
|
|
|
|
func TestStore_Run_ExpiresEntries(t *testing.T) {
|
|
|
|
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ttl := 10 * time.Millisecond
|
|
|
|
|
|
|
|
patchIdleTTL(t, ttl)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
store := NewStore(hclog.New(nil))
|
|
|
|
|
|
|
|
go store.Run(ctx)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
req := &fakeRequest{
|
|
|
|
|
|
|
|
client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace),
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
req.client.QueueEvents(newEndOfSnapshotEvent(2))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cID := "correlate"
|
|
|
|
|
|
|
|
ch1 := make(chan cache.UpdateEvent)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
reqCtx, reqCancel := context.WithCancel(context.Background())
|
|
|
|
|
|
|
|
defer reqCancel()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
require.NoError(t, store.Notify(reqCtx, req, cID, ch1))
|
|
|
|
|
|
|
|
assertRequestCount(t, store, req, 1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Get a copy of the entry so that we can check it was expired later
|
|
|
|
|
|
|
|
store.lock.Lock()
|
|
|
|
|
|
|
|
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
|
|
|
|
|
|
|
|
store.lock.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
reqCancel()
|
|
|
|
|
|
|
|
retry.Run(t, func(r *retry.R) {
|
|
|
|
|
|
|
|
assertRequestCount(r, store, req, 0)
|
|
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// wait for the entry to expire, with lots of buffer
|
|
|
|
|
|
|
|
time.Sleep(3 * ttl)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
store.lock.Lock()
|
|
|
|
|
|
|
|
defer store.lock.Unlock()
|
|
|
|
|
|
|
|
require.Len(t, store.byKey, 0)
|
|
|
|
|
|
|
|
require.Equal(t, ttlcache.NotIndexed, e.expiry.Index())
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func patchIdleTTL(t *testing.T, ttl time.Duration) {
|
|
|
|
|
|
|
|
orig := idleTTL
|
|
|
|
|
|
|
|
idleTTL = ttl
|
|
|
|
|
|
|
|
t.Cleanup(func() {
|
|
|
|
|
|
|
|
idleTTL = orig
|
|
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
|
|
|
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
|
|
|
t.Helper()
|
|
|
|
t.Helper()
|
|
|
|