diff --git a/agent/submatview/materializer.go b/agent/submatview/materializer.go index 85ac163ba3..e9c8fb0c2a 100644 --- a/agent/submatview/materializer.go +++ b/agent/submatview/materializer.go @@ -32,6 +32,7 @@ type View interface { // separately and passed in in case the return type needs an Index field // populating. This allows implementations to not worry about maintaining // indexes seen during Update. + // TODO: remove error return value. Result(index uint64) (interface{}, error) // Reset the view to the zero state, done in preparation for receiving a new diff --git a/agent/submatview/store.go b/agent/submatview/store.go index f5c37173d4..69bcf8d411 100644 --- a/agent/submatview/store.go +++ b/agent/submatview/store.go @@ -20,9 +20,9 @@ type entry struct { materializer *Materializer expiry *ttlcache.Entry stop func() - // notifier is the count of active Notify goroutines. This entry will + // requests is the count of active requests using this entry. This entry will // remain in the store as long as this count remains > 0. - notifier int + requests int } // TODO: start expiration loop @@ -56,8 +56,8 @@ func (s *Store) Run(ctx context.Context) { e := s.byKey[he.Key()] - // Only stop the materializer if there are no active calls to Notify. - if e.notifier == 0 { + // Only stop the materializer if there are no active requests. + if e.requests == 0 { e.stop() delete(s.byKey, he.Key()) } @@ -68,24 +68,23 @@ func (s *Store) Run(ctx context.Context) { } // TODO: godoc -var idleTTL = 20 * time.Minute +type Request interface { + cache.Request + NewMaterializer() *Materializer + Type() string +} // Get a value from the store, blocking if the store has not yet seen the // req.Index value. // See agent/cache.Cache.Get for complete documentation. func (s *Store) Get( ctx context.Context, - // TODO: remove typ param, make it part of the Request interface. - typ string, req Request, // TODO: only the Index field of ResultMeta is relevant, return a result struct instead. ) (interface{}, cache.ResultMeta, error) { info := req.CacheInfo() - e := s.getEntry(getEntryOpts{ - typ: typ, - info: info, - newMaterializer: req.NewMaterializer, - }) + key, e := s.getEntry(req) + defer s.releaseEntry(key) // TODO: no longer any need to return cache.FetchResult from Materializer.Fetch // TODO: pass context instead of Done chan, also replaces Timeout param @@ -93,6 +92,7 @@ func (s *Store) Get( MinIndex: info.MinIndex, Timeout: info.Timeout, }) + return result.Value, cache.ResultMeta{Index: result.Index}, err } @@ -100,31 +100,17 @@ func (s *Store) Get( // See agent/cache.Cache.Notify for complete documentation. func (s *Store) Notify( ctx context.Context, - typ string, req Request, correlationID string, updateCh chan<- cache.UpdateEvent, ) error { info := req.CacheInfo() - e := s.getEntry(getEntryOpts{ - typ: typ, - info: info, - newMaterializer: req.NewMaterializer, - notifier: true, - }) + key, e := s.getEntry(req) go func() { - index := info.MinIndex - - // TODO: better way to handle this? - defer func() { - s.lock.Lock() - e.notifier-- - s.byKey[e.expiry.Key()] = e - s.expiryHeap.Update(e.expiry.Index(), idleTTL) - s.lock.Unlock() - }() + defer s.releaseEntry(key) + index := info.MinIndex for { result, err := e.materializer.Fetch(ctx.Done(), cache.FetchOptions{MinIndex: index}) switch { @@ -149,56 +135,66 @@ func (s *Store) Notify( case <-ctx.Done(): return } - } }() return nil } -func (s *Store) getEntry(opts getEntryOpts) entry { - info := opts.info - key := makeEntryKey(opts.typ, info) +// getEntry from the store, and increment the requests counter. releaseEntry +// must be called when the request is finished to decrement the counter. +func (s *Store) getEntry(req Request) (string, entry) { + info := req.CacheInfo() + key := makeEntryKey(req.Type(), info) s.lock.Lock() defer s.lock.Unlock() e, ok := s.byKey[key] if ok { - s.expiryHeap.Update(e.expiry.Index(), info.Timeout+idleTTL) - if opts.notifier { - e.notifier++ - } - return e + e.requests++ + s.byKey[key] = e + return key, e } ctx, cancel := context.WithCancel(context.Background()) - mat := opts.newMaterializer() + mat := req.NewMaterializer() go mat.Run(ctx) e = entry{ materializer: mat, stop: cancel, - expiry: s.expiryHeap.Add(key, info.Timeout+idleTTL), - } - if opts.notifier { - e.notifier++ + requests: 1, } s.byKey[key] = e - return e + return key, e } -// makeEntryKey matches agent/cache.makeEntryKey, but may change in the future. -func makeEntryKey(typ string, r cache.RequestInfo) string { - return fmt.Sprintf("%s/%s/%s/%s", typ, r.Datacenter, r.Token, r.Key) -} +// idleTTL is the duration of time an entry should remain in the Store after the +// last request for that entry has been terminated. +var idleTTL = 20 * time.Minute -type Request interface { - cache.Request - NewMaterializer() *Materializer +// releaseEntry decrements the request count and starts an expiry timer if the +// count has reached 0. Must be called once for every call to getEntry. +func (s *Store) releaseEntry(key string) { + s.lock.Lock() + defer s.lock.Unlock() + e := s.byKey[key] + e.requests-- + s.byKey[key] = e + + if e.requests > 0 { + return + } + + if e.expiry.Index() == ttlcache.NotIndexed { + e.expiry = s.expiryHeap.Add(key, idleTTL) + s.byKey[key] = e + return + } + + s.expiryHeap.Update(e.expiry.Index(), idleTTL) } -type getEntryOpts struct { - typ string - info cache.RequestInfo - newMaterializer func() *Materializer - notifier bool +// makeEntryKey matches agent/cache.makeEntryKey, but may change in the future. +func makeEntryKey(typ string, r cache.RequestInfo) string { + return fmt.Sprintf("%s/%s/%s/%s", typ, r.Datacenter, r.Token, r.Key) } diff --git a/agent/submatview/store_test.go b/agent/submatview/store_test.go index a567720faa..97eed9cccb 100644 --- a/agent/submatview/store_test.go +++ b/agent/submatview/store_test.go @@ -30,7 +30,7 @@ func TestStore_Get_Fresh(t *testing.T) { newEventServiceHealthRegister(10, 1, "srv1"), newEventServiceHealthRegister(22, 2, "srv1")) - result, md, err := store.Get(ctx, "test", req) + result, md, err := store.Get(ctx, req) require.NoError(t, err) require.Equal(t, uint64(22), md.Index) @@ -39,11 +39,11 @@ func TestStore_Get_Fresh(t *testing.T) { require.Len(t, r.srvs, 2) require.Equal(t, uint64(22), r.index) + store.lock.Lock() require.Len(t, store.byKey, 1) - e := store.byKey[makeEntryKey("test", req.CacheInfo())] + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] require.Equal(t, 0, e.expiry.Index()) - store.lock.Lock() defer store.lock.Unlock() require.Equal(t, store.expiryHeap.Next().Entry, e.expiry) } @@ -80,6 +80,10 @@ func (r *fakeRequest) NewMaterializer() *Materializer { }) } +func (r *fakeRequest) Type() string { + return fmt.Sprintf("%T", r) +} + type fakeView struct { srvs map[string]*pbservice.CheckServiceNode }