diff --git a/agent/submatview/store.go b/agent/submatview/store.go index 4485e0f368..f7dd61fb62 100644 --- a/agent/submatview/store.go +++ b/agent/submatview/store.go @@ -6,11 +6,14 @@ import ( "sync" "time" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/lib/ttlcache" ) type Store struct { + logger hclog.Logger lock sync.RWMutex byKey map[string]entry expiryHeap *ttlcache.ExpiryHeap @@ -25,8 +28,9 @@ type entry struct { requests int } -func NewStore() *Store { +func NewStore(logger hclog.Logger) *Store { return &Store{ + logger: logger, byKey: make(map[string]entry), expiryHeap: ttlcache.NewExpiryHeap(), } @@ -117,8 +121,13 @@ func (s *Store) Notify( return case err != nil: // TODO: cache.Notify sends errors on updateCh, should this do the same? - // It seems like only fetch errors would ever get sent along. - // TODO: log warning + // It seems like only fetch errors would ever get sent along and eventually + // logged, so sending may not provide any benefit here. + + s.logger.Warn("handling error in Store.Notify", + "error", err, + "request-type", req.Type(), + "index", index) continue } diff --git a/agent/submatview/store_test.go b/agent/submatview/store_test.go index 75c411fd8d..12a960385e 100644 --- a/agent/submatview/store_test.go +++ b/agent/submatview/store_test.go @@ -10,16 +10,18 @@ import ( "github.com/stretchr/testify/require" "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/lib/ttlcache" "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbsubscribe" + "github.com/hashicorp/consul/sdk/testutil/retry" ) func TestStore_Get(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - store := NewStore() + store := NewStore(hclog.New(nil)) go store.Run(ctx) req := &fakeRequest{ @@ -126,12 +128,17 @@ type resultOrError struct { type fakeRequest struct { index uint64 + key string client *TestStreamingClient } func (r *fakeRequest) CacheInfo() cache.RequestInfo { + key := r.key + if key == "" { + key = "key" + } return cache.RequestInfo{ - Key: "key", + Key: key, Token: "abcd", Datacenter: "dc1", Timeout: 4 * time.Second, @@ -203,15 +210,186 @@ func (f *fakeView) Reset() { f.srvs = make(map[string]*pbservice.CheckServiceNode) } -// TODO: Get with Notify - func TestStore_Notify(t *testing.T) { - // TODO: Notify with no existing entry - // TODO: Notify with Get - // TODO: Notify multiple times same key - // TODO: Notify no update if index is not past MinIndex. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := NewStore(hclog.New(nil)) + go store.Run(ctx) + + req := &fakeRequest{ + client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace), + } + req.client.QueueEvents( + newEndOfSnapshotEvent(2), + newEventServiceHealthRegister(10, 1, "srv1"), + newEventServiceHealthRegister(22, 2, "srv1")) + + cID := "correlate" + ch := make(chan cache.UpdateEvent) + + err := store.Notify(ctx, req, cID, ch) + require.NoError(t, err) + + runStep(t, "from empty store, starts materializer", func(t *testing.T) { + store.lock.Lock() + defer store.lock.Unlock() + require.Len(t, store.byKey, 1) + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] + require.Equal(t, ttlcache.NotIndexed, e.expiry.Index()) + require.Equal(t, 1, e.requests) + }) + + runStep(t, "updates are received", func(t *testing.T) { + select { + case update := <-ch: + require.NoError(t, update.Err) + require.Equal(t, cID, update.CorrelationID) + require.Equal(t, uint64(22), update.Meta.Index) + require.Equal(t, uint64(22), update.Result.(fakeResult).index) + case <-time.After(100 * time.Millisecond): + t.Fatalf("expected Get to unblock when new events are received") + } + + req.client.QueueEvents(newEventServiceHealthRegister(24, 2, "srv1")) + + select { + case update := <-ch: + require.NoError(t, update.Err) + require.Equal(t, cID, update.CorrelationID) + require.Equal(t, uint64(24), update.Meta.Index) + require.Equal(t, uint64(24), update.Result.(fakeResult).index) + case <-time.After(100 * time.Millisecond): + t.Fatalf("expected Get to unblock when new events are received") + } + }) + + runStep(t, "closing the notify starts the expiry counter", func(t *testing.T) { + cancel() + + retry.Run(t, func(r *retry.R) { + store.lock.Lock() + defer store.lock.Unlock() + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] + require.Equal(r, 0, e.expiry.Index()) + require.Equal(r, 0, e.requests) + require.Equal(r, store.expiryHeap.Next().Entry, e.expiry) + }) + }) } +func TestStore_Notify_ManyRequests(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := NewStore(hclog.New(nil)) + go store.Run(ctx) + + req := &fakeRequest{ + client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace), + } + req.client.QueueEvents(newEndOfSnapshotEvent(2)) + + cID := "correlate" + ch1 := make(chan cache.UpdateEvent) + ch2 := make(chan cache.UpdateEvent) + + require.NoError(t, store.Notify(ctx, req, cID, ch1)) + assertRequestCount(t, store, req, 1) + + require.NoError(t, store.Notify(ctx, req, cID, ch2)) + assertRequestCount(t, store, req, 2) + + req.index = 15 + + go func() { + _, _ = store.Get(ctx, req) + }() + + retry.Run(t, func(r *retry.R) { + assertRequestCount(r, store, req, 3) + }) + + go func() { + _, _ = store.Get(ctx, req) + }() + + retry.Run(t, func(r *retry.R) { + assertRequestCount(r, store, req, 4) + }) + + var req2 *fakeRequest + + runStep(t, "Get and Notify with a different key", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + req2 = &fakeRequest{client: req.client, key: "key2"} + + require.NoError(t, store.Notify(ctx, req2, cID, ch1)) + go func() { + _, _ = store.Get(ctx, req2) + }() + + // the original entry should still be at count 4 + assertRequestCount(t, store, req, 4) + // the new entry should be at count 2 + retry.Run(t, func(r *retry.R) { + assertRequestCount(r, store, req2, 2) + }) + }) + + runStep(t, "end all the requests", func(t *testing.T) { + req.client.QueueEvents( + newEventServiceHealthRegister(10, 1, "srv1"), + newEventServiceHealthRegister(12, 2, "srv1"), + newEventServiceHealthRegister(13, 1, "srv2"), + newEventServiceHealthRegister(16, 3, "srv2")) + + // The two Get requests should exit now that the index has been updated + retry.Run(t, func(r *retry.R) { + assertRequestCount(r, store, req, 2) + }) + + // Cancel the context so all requests terminate + cancel() + retry.Run(t, func(r *retry.R) { + assertRequestCount(r, store, req, 0) + }) + }) + + runStep(t, "the expiry heap should contain two entries", func(t *testing.T) { + store.lock.Lock() + defer store.lock.Unlock() + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] + e2 := store.byKey[makeEntryKey(req2.Type(), req2.CacheInfo())] + require.Equal(t, 0, e2.expiry.Index()) + require.Equal(t, 1, e.expiry.Index()) + + require.Equal(t, store.expiryHeap.Next().Entry, e2.expiry) + }) +} + +type testingT interface { + Helper() + Fatalf(string, ...interface{}) +} + +func assertRequestCount(t testingT, s *Store, req Request, expected int) { + t.Helper() + + key := makeEntryKey(req.Type(), req.CacheInfo()) + + s.lock.Lock() + defer s.lock.Unlock() + actual := s.byKey[key].requests + if actual != expected { + t.Fatalf("expected request count to be %d, got %d", expected, actual) + } +} + +// TODO: test expiration + func runStep(t *testing.T, name string, fn func(t *testing.T)) { t.Helper() if !t.Run(name, fn) {