diff --git a/agent/cache/watch.go b/agent/cache/watch.go
index af3b097c5b..ed22a70d39 100644
--- a/agent/cache/watch.go
+++ b/agent/cache/watch.go
@@ -50,26 +50,30 @@ type UpdateEvent struct {
 // value that allows them to disambiguate between events in the returned chan
 // when sharing a chan between multiple cache entries. If the chan is closed,
 // the notify loop will terminate.
-func (c *Cache) Notify(ctx context.Context, t string, r Request,
-	correlationID string, ch chan<- UpdateEvent) error {
-
-	// Get the type that we're fetching
+func (c *Cache) Notify(
+	ctx context.Context,
+	t string,
+	r Request,
+	correlationID string,
+	ch chan<- UpdateEvent,
+) error {
 	c.typesLock.RLock()
 	tEntry, ok := c.types[t]
 	c.typesLock.RUnlock()
 	if !ok {
 		return fmt.Errorf("unknown type in cache: %s", t)
 	}
+
 	if tEntry.Type.SupportsBlocking() {
 		go c.notifyBlockingQuery(ctx, tEntry, r, correlationID, ch)
-	} else {
-		info := r.CacheInfo()
-		if info.MaxAge == 0 {
-			return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge")
-		}
-		go c.notifyPollingQuery(ctx, tEntry, r, correlationID, ch, info.MaxAge)
+		return nil
 	}
+	info := r.CacheInfo()
+	if info.MaxAge == 0 {
+		return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge")
+	}
+	go c.notifyPollingQuery(ctx, tEntry, r, correlationID, ch, info.MaxAge)
 
 	return nil
 }
 
@@ -107,10 +111,10 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Req
 			index = meta.Index
 		}
 
+		var wait time.Duration
 		// Handle errors with backoff. Badly behaved blocking calls that returned
 		// a zero index are considered as failures since we need to not get stuck
 		// in a busy loop.
-		wait := 0 * time.Second
 		if err == nil && meta.Index > 0 {
 			failures = 0
 		} else {
@@ -173,6 +177,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
 			failures++
 		}
 
+		var wait time.Duration
 		// Determining how long to wait before the next poll is complicated.
 		// First off the happy path and the error path waits are handled distinctly
 		//
@@ -194,23 +199,13 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
 		// as this would eliminate the single-flighting of these requests in the cache and
 		// the efficiencies gained by it.
 		if failures > 0 {
-
-			errWait := backOffWait(failures)
-			select {
-			case <-time.After(errWait):
-			case <-ctx.Done():
-				return
-			}
+			wait = backOffWait(failures)
 		} else {
-			// Default to immediately re-poll. This only will happen if the data
-			// we just got out of the cache is already too stale
-			pollWait := 0 * time.Second
-
 			// Calculate when the cached data's Age will get too stale and
 			// need to be re-queried. When the data's Age already exceeds the
 			// maxAge the pollWait value is left at 0 to immediately re-poll
 			if meta.Age <= maxAge {
-				pollWait = maxAge - meta.Age
+				wait = maxAge - meta.Age
 			}
 
 			// Add a small amount of random jitter to the polling time. One
@@ -222,13 +217,13 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
 			// and then immediately have to re-fetch again. That wouldn't
 			// be terrible but it would expend a bunch more cpu cycles when
 			// we can definitely avoid it.
-			pollWait += lib.RandomStagger(maxAge / 16)
+			wait += lib.RandomStagger(maxAge / 16)
+		}
 
-			select {
-			case <-time.After(pollWait):
-			case <-ctx.Done():
-				return
-			}
+		select {
+		case <-time.After(wait):
+		case <-ctx.Done():
+			return
 		}
 	}
 }