diff --git a/agent/cache/cache.go b/agent/cache/cache.go index a05ae85cc8..44d6958bcc 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -214,6 +214,47 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) { return c.getWithIndex(t, r, r.CacheInfo().MinIndex) } +// getEntryLocked retrieves a cache entry and checks if it is ready to be +// returned given the other parameters. It reads from entries and the caller +// has to issue a read lock if necessary. +func (c *Cache) getEntryLocked(tEntry typeEntry, key string, maxAge time.Duration, revalidate bool, minIndex uint64) (bool, bool, cacheEntry) { + entry, ok := c.entries[key] + cacheHit := false + + if !ok { + return ok, cacheHit, entry + } + + // Check if we have a hit + cacheHit = ok && entry.Valid + + supportsBlocking := tEntry.Type.SupportsBlocking() + + // Check index is not specified or lower than value, or the type doesn't + // support blocking. + if cacheHit && supportsBlocking && + minIndex > 0 && minIndex >= entry.Index { + // MinIndex was given and matches or is higher than current value so we + // ignore the cache and fallthrough to blocking on a new value below. + cacheHit = false + } + + // Check MaxAge is not exceeded if this is not a background refreshing type + // and MaxAge was specified. + if cacheHit && !tEntry.Opts.Refresh && maxAge > 0 && + !entry.FetchedAt.IsZero() && maxAge < time.Since(entry.FetchedAt) { + cacheHit = false + } + + // Check if we are requested to revalidate. If so the first time round the + // loop is not a hit but subsequent ones should be treated normally. + if cacheHit && !tEntry.Opts.Refresh && revalidate { + cacheHit = false + } + + return ok, cacheHit, entry +} + // getWithIndex implements the main Get functionality but allows internal // callers (Watch) to manipulate the blocking index separately from the actual // request object. @@ -249,36 +290,9 @@ RETRY_GET: // Get the current value c.entriesLock.RLock() - entry, ok := c.entries[key] + _, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex) c.entriesLock.RUnlock() - // Check if we have a hit - cacheHit := ok && entry.Valid - - supportsBlocking := tEntry.Type.SupportsBlocking() - - // Check index is not specified or lower than value, or the type doesn't - // support blocking. - if cacheHit && supportsBlocking && - minIndex > 0 && minIndex >= entry.Index { - // MinIndex was given and matches or is higher than current value so we - // ignore the cache and fallthrough to blocking on a new value below. - cacheHit = false - } - - // Check MaxAge is not exceeded if this is not a background refreshing type - // and MaxAge was specified. - if cacheHit && !tEntry.Opts.Refresh && info.MaxAge > 0 && - !entry.FetchedAt.IsZero() && info.MaxAge < time.Since(entry.FetchedAt) { - cacheHit = false - } - - // Check if we are requested to revalidate. If so the first time round the - // loop is not a hit but subsequent ones should be treated normally. - if cacheHit && !tEntry.Opts.Refresh && info.MustRevalidate && first { - cacheHit = false - } - if cacheHit { meta := ResultMeta{Index: entry.Index} if first { @@ -344,9 +358,6 @@ RETRY_GET: } } - // No longer our first time through - first = false - // Set our timeout channel if we must if info.Timeout > 0 && timeoutCh == nil { timeoutCh = time.After(info.Timeout) @@ -354,11 +365,14 @@ RETRY_GET: // At this point, we know we either don't have a value at all or the // value we have is too old. We need to wait for new data. - waiterCh, err := c.fetch(t, key, r, true, 0) + waiterCh, err := c.fetch(t, key, r, true, 0, minIndex, false, !first) if err != nil { return nil, ResultMeta{Index: entry.Index}, err } + // No longer our first time through + first = false + select { case <-waiterCh: // Our fetch returned, retry the get from the cache. @@ -384,7 +398,7 @@ func (c *Cache) entryKey(t string, r *RequestInfo) string { // If allowNew is true then the fetch should create the cache entry // if it doesn't exist. If this is false, then fetch will do nothing // if the entry doesn't exist. This latter case is to support refreshing. -func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-chan struct{}, error) { +func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, minIndex uint64, ignoreExisting bool, ignoreRevalidation bool) (<-chan struct{}, error) { // Get the type that we're fetching c.typesLock.RLock() tEntry, ok := c.types[t] @@ -393,10 +407,20 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<- return nil, fmt.Errorf("unknown type in cache: %s", t) } + info := r.CacheInfo() + // We acquire a write lock because we may have to set Fetching to true. c.entriesLock.Lock() defer c.entriesLock.Unlock() - entry, ok := c.entries[key] + ok, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && !ignoreRevalidation, minIndex) + + // This handles the case where a fetch succeeded after checking for its existence in + // getWithIndex. This ensures that we don't miss updates. + if ok && cacheHit && !ignoreExisting { + ch := make(chan struct{}) + close(ch) + return ch, nil + } // If we aren't allowing new values and we don't have an existing value, // return immediately. We return an immediately-closed channel so nothing @@ -656,7 +680,7 @@ func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key strin // Trigger. The "allowNew" field is false because in the time we were // waiting to refresh we may have expired and got evicted. If that // happened, we don't want to create a new entry. - c.fetch(t, key, r, false, attempt) + c.fetch(t, key, r, false, attempt, 0, true, true) } // runExpiryLoop is a blocking function that watches the expiration diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index cf5279a7d9..efab9d53b8 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -1123,7 +1123,6 @@ func TestCacheGet_nonBlockingType(t *testing.T) { t.Parallel() typ := TestTypeNonBlocking(t) - defer typ.AssertExpectations(t) c := TestCache(t) c.RegisterType("t", typ, nil)