From 45e49f31de3b54fad31561d904bdd683ef4dcdec Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Sun, 3 Jun 2018 13:15:09 -0700 Subject: [PATCH] agent/cache: change behavior to return error rather than retry The cache behavior should not be to mask errors and retry. Instead, it should aim to return errors as quickly as possible. We do that here. --- agent/cache/cache.go | 47 +++++++++++++++++++-------------------- agent/cache/cache_test.go | 10 ++------- agent/cache/testing.go | 2 ++ 3 files changed, 27 insertions(+), 32 deletions(-) diff --git a/agent/cache/cache.go b/agent/cache/cache.go index c03224eeda..54f2707e63 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -192,7 +192,7 @@ func (c *Cache) Get(t string, r Request) (interface{}, error) { key := c.entryKey(&info) // First time through - var attempt uint + first := true // timeoutCh for watching our timeout var timeoutCh <-chan time.Time @@ -209,7 +209,7 @@ RETRY_GET: // we have. if ok && entry.Valid { if info.MinIndex == 0 || info.MinIndex < entry.Index { - if attempt == 0 { + if first { metrics.IncrCounter([]string{"consul", "cache", t, "hit"}, 1) atomic.AddUint64(&c.hits, 1) } @@ -220,7 +220,11 @@ RETRY_GET: c.entriesExpiryHeap.Fix(entry.Expiry) c.entriesLock.Unlock() - return entry.Value, entry.Error + // We purposely do not return an error here since the cache + // only works with fetching values that either have a value + // or have an error, but not both. The Error may be non-nil + // in the entry because of this to note future fetch errors. + return entry.Value, nil } } @@ -229,11 +233,11 @@ RETRY_GET: // a retry loop getting the same error for the entire duration of the // timeout. Instead, we make one effort to fetch a new value, and if // there was an error, we return. - if attempt > 0 && entry.Error != nil { + if !first && entry.Error != nil { return entry.Value, entry.Error } - if attempt == 0 { + if first { // Record the miss if its our first time through atomic.AddUint64(&c.misses, 1) @@ -248,7 +252,7 @@ RETRY_GET: } // No longer our first time through - attempt++ + first = false // Set our timeout channel if we must if info.Timeout > 0 && timeoutCh == nil { @@ -257,7 +261,7 @@ RETRY_GET: // At this point, we know we either don't have a value at all or the // value we have is too old. We need to wait for new data. - waiterCh, err := c.fetch(t, key, r, true, attempt) + waiterCh, err := c.fetch(t, key, r, true, 0) if err != nil { return nil, err } @@ -296,16 +300,6 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<- return nil, fmt.Errorf("unknown type in cache: %s", t) } - // If we're over the attempt minimum, start an exponential backoff. - if attempt > CacheRefreshBackoffMin { - waitTime := (1 << (attempt - CacheRefreshBackoffMin)) * time.Second - if waitTime > CacheRefreshMaxWait { - waitTime = CacheRefreshMaxWait - } - - time.Sleep(waitTime) - } - // We acquire a write lock because we may have to set Fetching to true. c.entriesLock.Lock() defer c.entriesLock.Unlock() @@ -354,7 +348,6 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<- // A new value was given, so we create a brand new entry. newEntry.Value = result.Value newEntry.Index = result.Index - newEntry.Error = err // This is a valid entry with a result newEntry.Valid = true @@ -374,12 +367,8 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<- // Increment attempt counter attempt++ - // If the entry wasn't valid, we set an error. If it was valid, - // we don't set an error so that the prior value can continue - // being used. This will be evicted if the TTL comes up. - if !newEntry.Valid { - newEntry.Error = err - } + // Set the error that should be used if the fetch is failing. + newEntry.Error = err } // Create a new waiter that will be used for the next fetch. @@ -448,6 +437,16 @@ func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key strin return } + // If we're over the attempt minimum, start an exponential backoff. + if attempt > CacheRefreshBackoffMin { + waitTime := (1 << (attempt - CacheRefreshBackoffMin)) * time.Second + if waitTime > CacheRefreshMaxWait { + waitTime = CacheRefreshMaxWait + } + + time.Sleep(waitTime) + } + // If we have a timer, wait for it if opts.RefreshTimer > 0 { time.Sleep(opts.RefreshTimer) diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index 1f643ed9f3..0d8ecb5b0b 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -285,16 +285,10 @@ func TestCacheGet_blockingIndexBackoff(t *testing.T) { resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"})) TestCacheGetChResult(t, resultCh, 1) - // Fetch should block + // Fetch should not block and should return error resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{ Key: "hello", MinIndex: 7, Timeout: 1 * time.Minute})) - - // Should block - select { - case <-resultCh: - t.Fatal("should block") - case <-time.After(50 * time.Millisecond): - } + TestCacheGetChResult(t, resultCh, nil) // Wait a bit time.Sleep(100 * time.Millisecond) diff --git a/agent/cache/testing.go b/agent/cache/testing.go index 365dc3b4e5..46f072e423 100644 --- a/agent/cache/testing.go +++ b/agent/cache/testing.go @@ -44,7 +44,9 @@ func TestCacheGetChResult(t testing.T, ch <-chan interface{}, expected interface if !reflect.DeepEqual(result, expected) { t.Fatalf("Result doesn't match!\n\n%#v\n\n%#v", result, expected) } + case <-time.After(50 * time.Millisecond): + t.Fatalf("Result not sent on channel") } }