diff --git a/agent/cache-types/catalog_list_services.go b/agent/cache-types/catalog_list_services.go index 7d38f23ae3..1c602cdb29 100644 --- a/agent/cache-types/catalog_list_services.go +++ b/agent/cache-types/catalog_list_services.go @@ -38,9 +38,12 @@ func (c *CatalogListServices) Fetch(opts cache.FetchOptions, req cache.Request) // going to be served from cache and end up arbitrarily stale anyway. This // allows cached service-discover to automatically read scale across all // servers too. - reqReal.AllowStale = true + reqReal.QueryOptions.AllowStale = true + + if opts.LastResult != nil { + reqReal.QueryOptions.AllowNotModifiedResponse = true + } - // Fetch var reply structs.IndexedServices if err := c.RPC.RPC("Catalog.ListServices", reqReal, &reply); err != nil { return result, err @@ -48,5 +51,6 @@ func (c *CatalogListServices) Fetch(opts cache.FetchOptions, req cache.Request) result.Value = &reply result.Index = reply.QueryMeta.Index + result.NotModified = reply.QueryMeta.NotModified return result, nil } diff --git a/agent/cache-types/catalog_list_services_test.go b/agent/cache-types/catalog_list_services_test.go index ee806ff88e..efb1ff33fd 100644 --- a/agent/cache-types/catalog_list_services_test.go +++ b/agent/cache-types/catalog_list_services_test.go @@ -1,6 +1,7 @@ package cachetype import ( + "context" "testing" "time" @@ -60,3 +61,58 @@ func TestCatalogListServices_badReqType(t *testing.T) { require.Contains(t, err.Error(), "wrong type") rpc.AssertExpectations(t) } + +func TestCatalogListServices_IntegrationWithCache_NotModifiedResponse(t *testing.T) { + rpc := &MockRPC{} + typ := &CatalogListServices{RPC: rpc} + + services := map[string][]string{ + "foo": {"prod", "linux"}, + "bar": {"qa", "windows"}, + } + rpc.On("RPC", "Catalog.ListServices", mock.Anything, mock.Anything). + Return(nil). + Run(func(args mock.Arguments) { + req := args.Get(1).(*structs.DCSpecificRequest) + require.True(t, req.AllowStale) + require.True(t, req.AllowNotModifiedResponse) + + reply := args.Get(2).(*structs.IndexedServices) + reply.QueryMeta.Index = 44 + reply.NotModified = true + }) + + c := cache.New(nil) + c.RegisterType(CatalogListServicesName, typ) + last := cache.FetchResult{ + Value: &structs.IndexedServices{ + Services: services, + QueryMeta: structs.QueryMeta{Index: 42}, + }, + Index: 42, + } + req := &structs.DCSpecificRequest{ + Datacenter: "dc1", + QueryOptions: structs.QueryOptions{ + Token: "token", + MinQueryIndex: 44, + MaxQueryTime: time.Second, + }, + } + + err := c.Prepopulate(CatalogListServicesName, last, "dc1", "token", req.CacheInfo().Key) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + actual, _, err := c.Get(ctx, CatalogListServicesName, req) + require.NoError(t, err) + + expected := &structs.IndexedServices{ + Services: services, + QueryMeta: structs.QueryMeta{Index: 42}, + } + require.Equal(t, expected, actual) + + rpc.AssertExpectations(t) +} diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 9b8845b649..2b1f3c1202 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -18,6 +18,7 @@ import ( "container/heap" "context" "fmt" + "strconv" "sync" "sync/atomic" "time" @@ -167,15 +168,12 @@ type RegisterOptions struct { // client requests them with MinIndex. SupportsBlocking bool - // RefreshTimer is the time between attempting to refresh data. + // RefreshTimer is the time to sleep between attempts to refresh data. // If this is zero, then data is refreshed immediately when a fetch // is returned. // - // RefreshTimeout determines the maximum query time for a refresh - // operation. This is specified as part of the query options and is - // expected to be implemented by the Type itself. - // - // Using these values, various "refresh" mechanisms can be implemented: + // Using different values for RefreshTimer and RefreshTimeout, various + // "refresh" mechanisms can be implemented: // // * With a high timer duration and a low timeout, a timer-based // refresh can be set that minimizes load on the Consul servers. @@ -184,7 +182,11 @@ type RegisterOptions struct { // refresh can be set so that changes in server data are recognized // within the cache very quickly. // - RefreshTimer time.Duration + RefreshTimer time.Duration + + // RefreshTimeout is the default value for the maximum query time for a fetch + // operation. It is set as FetchOptions.Timeout so that cache.Type + // implementations can use it as the MaxQueryTime. RefreshTimeout time.Duration } @@ -473,8 +475,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign // keepalives are every 30 seconds so the RPC should fail if the packets are // being blackholed for more than 30 seconds. var connectedTimer *time.Timer - if tEntry.Opts.Refresh && entry.Index > 0 && - tEntry.Opts.RefreshTimeout > (31*time.Second) { + if tEntry.Opts.Refresh && entry.Index > 0 && tEntry.Opts.RefreshTimeout > 31*time.Second { connectedTimer = time.AfterFunc(31*time.Second, func() { c.entriesLock.Lock() defer c.entriesLock.Unlock() @@ -520,7 +521,9 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign if result.Value != nil { // A new value was given, so we create a brand new entry. - newEntry.Value = result.Value + if !result.NotModified { + newEntry.Value = result.Value + } newEntry.State = result.State newEntry.Index = result.Index newEntry.FetchedAt = time.Now() @@ -551,8 +554,9 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign // Error handling if err == nil { - metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1) - metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "fetch_success"}, 1) + labels := []metrics.Label{{Name: "result_not_modified", Value: strconv.FormatBool(result.NotModified)}} + metrics.IncrCounterWithLabels([]string{"consul", "cache", "fetch_success"}, 1, labels) + metrics.IncrCounterWithLabels([]string{"consul", "cache", tEntry.Name, "fetch_success"}, 1, labels) if result.Index > 0 { // Reset the attempts counter so we don't have any backoff diff --git a/agent/cache/type.go b/agent/cache/type.go index deae8a573c..febdc18d7b 100644 --- a/agent/cache/type.go +++ b/agent/cache/type.go @@ -78,4 +78,8 @@ type FetchResult struct { // Index is the corresponding index value for this data. Index uint64 + + // NotModified indicates that the Value has not changed since LastResult, and + // the LastResult value should be used instead of Value. + NotModified bool }