diff --git a/agent/agent.go b/agent/agent.go
index 2d7d32c4fa..d9565d9974 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -664,7 +664,7 @@ func (a *Agent) Start(ctx context.Context) error {
 	a.sync = ae.NewStateSyncer(a.State, c.AEInterval, a.shutdownCh, a.logger)
 
 	// create the cache
-	a.cache = cache.New(nil)
+	a.cache = cache.New(c.Cache)
 
 	// create the config for the rpc server/client
 	consulCfg, err := a.consulConfig()
diff --git a/agent/agent_test.go b/agent/agent_test.go
index f04d36b0e9..75366fcf6b 100644
--- a/agent/agent_test.go
+++ b/agent/agent_test.go
@@ -17,6 +17,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -857,6 +858,98 @@ func testAgent_AddServiceNoRemoteExec(t *testing.T, extraHCL string) {
 	}
 }
 
+func TestCacheRateLimit(test *testing.T) {
+	test.Parallel()
+	tests := []struct {
+		// count is the number of updates performed (1 every 10ms)
+		count int
+		// rateLimit is the rate limit applied to the cache
+		rateLimit float64
+		// minUpdates is the minimum number of updates to see from a cache perspective.
+		// We add a value with tolerance to work even on a loaded CI
+		minUpdates int
+	}{
+		// 250 => we have a test running for at least 2.5s
+		{250, 0.5, 1},
+		{250, 1, 1},
+		{300, 2, 2},
+	}
+	for _, currentTest := range tests {
+		test.Run(fmt.Sprintf("rate_limit_at_%v", currentTest.rateLimit), func(t *testing.T) {
+			tt := currentTest
+			t.Parallel()
+			a := NewTestAgent(t, fmt.Sprintf("cache = { entry_fetch_rate = %v, entry_fetch_max_burst = 1 }", tt.rateLimit))
+			defer a.Shutdown()
+			testrpc.WaitForTestAgent(t, a.RPC, "dc1")
+
+			var wg sync.WaitGroup
+			stillProcessing := true
+
+			injectService := func(i int) {
+				srv := &structs.NodeService{
+					Service: "redis",
+					ID:      "redis",
+					Port:    1024 + i,
+					Address: fmt.Sprintf("10.0.1.%d", i%255),
+				}
+
+				err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
+				require.Nil(t, err)
+			}
+
+			runUpdates := func() {
+				wg.Add(tt.count)
+				for i := 0; i < tt.count; i++ {
+					time.Sleep(10 * time.Millisecond)
+					injectService(i)
+					wg.Done()
+				}
+				stillProcessing = false
+			}
+
+			getIndex := func(t *testing.T, oldIndex int) int {
+				req, err := http.NewRequest("GET", fmt.Sprintf("/v1/health/service/redis?cached&wait=5s&index=%d", oldIndex), nil)
+				require.NoError(t, err)
+
+				resp := httptest.NewRecorder()
+				a.srv.Handler.ServeHTTP(resp, req)
+				// The service exists, so we expect a 200 and a valid X-Consul-Index
+				if got, want := resp.Code, http.StatusOK; got != want {
+					t.Fatalf("bad response code got %d want %d", got, want)
+				}
+				index, err := strconv.Atoi(resp.Header().Get("X-Consul-Index"))
+				require.NoError(t, err)
+				return index
+			}
+
+			{
+				start := time.Now()
+				injectService(0)
+				// Get the first index
+				index := getIndex(t, 0)
+				require.Greater(t, index, 2)
+				go runUpdates()
+				numberOfUpdates := 0
+				for stillProcessing {
+					oldIndex := index
+					index = getIndex(t, oldIndex)
+					require.GreaterOrEqual(t, index, oldIndex, "index must be increasing only")
+					numberOfUpdates++
+				}
+				elapsed := time.Since(start)
+				qps := float64(time.Second) * float64(numberOfUpdates) / float64(elapsed)
+				summary := fmt.Sprintf("received %v updates in %v aka %f qps, target max was: %f qps", numberOfUpdates, elapsed, qps, tt.rateLimit)
+
+				// We must never go beyond the limit; a 10% margin avoids clock-precision artifacts such as 1.05 instead of 1
+				require.LessOrEqual(t, qps, 1.1*tt.rateLimit, fmt.Sprintf("it should never get more requests than ratelimit, had: %s", summary))
+				// We must have been notified at least a few times
+				require.GreaterOrEqual(t, numberOfUpdates, tt.minUpdates, fmt.Sprintf("It should have received a minimum of %d updates, had: %s", tt.minUpdates, summary))
+			}
+			wg.Wait()
+		})
+	}
+}
+
 func TestAddServiceIPv4TaggedDefault(t *testing.T) {
 	t.Helper()
diff --git a/agent/cache/cache.go b/agent/cache/cache.go
index a216d4ab8a..281b76c57b 100644
--- a/agent/cache/cache.go
+++ b/agent/cache/cache.go
@@ -25,6 +25,7 @@ import (
 
 	"github.com/armon/go-metrics"
 	"github.com/hashicorp/consul/lib"
+	"golang.org/x/time/rate"
 )
 
 //go:generate mockery -all -inpkg
@@ -81,6 +82,10 @@ type Cache struct {
 	stopped uint32
 	// stopCh is closed when Close is called
 	stopCh chan struct{}
+	// options includes a per Cache Rate limiter specification to avoid performing too many queries
+	options          Options
+	rateLimitContext context.Context
+	rateLimitCancel  context.CancelFunc
 }
 
 // typeEntry is a single type that is registered with a Cache.
@@ -122,23 +127,29 @@ type ResultMeta struct {
 
 // Options are options for the Cache.
 type Options struct {
-	// Nothing currently, reserved.
+	// EntryFetchMaxBurst max burst size of RateLimit for a single cache entry
+	EntryFetchMaxBurst int
+	// EntryFetchRate represents the max calls/sec for a single cache entry
+	EntryFetchRate rate.Limit
 }
 
 // New creates a new cache with the given RPC client and reasonable defaults.
 // Further settings can be tweaked on the returned value.
-func New(*Options) *Cache {
+func New(options Options) *Cache {
 	// Initialize the heap. The buffer of 1 is really important because
 	// its possible for the expiry loop to trigger the heap to update
 	// itself and it'd block forever otherwise.
 	h := &expiryHeap{NotifyCh: make(chan struct{}, 1)}
 	heap.Init(h)
-
+	ctx, cancel := context.WithCancel(context.Background())
 	c := &Cache{
 		types:             make(map[string]typeEntry),
 		entries:           make(map[string]cacheEntry),
 		entriesExpiryHeap: h,
 		stopCh:            make(chan struct{}),
+		options:           options,
+		rateLimitContext:  ctx,
+		rateLimitCancel:   cancel,
 	}
 
 	// Start the expiry watcher
@@ -454,7 +465,14 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 	// If we don't have an entry, then create it. The entry must be marked
 	// as invalid so that it isn't returned as a valid value for a zero index.
 	if !ok {
-		entry = cacheEntry{Valid: false, Waiter: make(chan struct{})}
+		entry = cacheEntry{
+			Valid:  false,
+			Waiter: make(chan struct{}),
+			FetchRateLimiter: rate.NewLimiter(
+				c.options.EntryFetchRate,
+				c.options.EntryFetchMaxBurst,
+			),
+		}
 	}
 
 	// Set that we're fetching to true, which makes it so that future
@@ -504,7 +522,13 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 					Index: entry.Index,
 				}
 			}
-
+		if err := entry.FetchRateLimiter.Wait(c.rateLimitContext); err != nil {
+			if connectedTimer != nil {
+				connectedTimer.Stop()
+			}
+			entry.Error = fmt.Errorf("rateLimitContext canceled: %s", err.Error())
+			return
+		}
 		// Start building the new entry by blocking on the fetch.
 		result, err := r.Fetch(fOpts)
 		if connectedTimer != nil {
@@ -728,6 +752,7 @@ func (c *Cache) Close() error {
 	if wasStopped == 0 {
 		// First time only, close stop chan
 		close(c.stopCh)
+		c.rateLimitCancel()
 	}
 	return nil
 }
@@ -747,6 +772,10 @@ func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) erro
 		FetchedAt: time.Now(),
 		Waiter:    make(chan struct{}),
 		Expiry:    &cacheEntryExpiry{Key: key},
+		FetchRateLimiter: rate.NewLimiter(
+			c.options.EntryFetchRate,
+			c.options.EntryFetchMaxBurst,
+		),
 	}
 	c.entriesLock.Lock()
 	c.entries[key] = newEntry
diff --git a/agent/cache/entry.go b/agent/cache/entry.go
index 9d7678a056..440c654ba8 100644
--- a/agent/cache/entry.go
+++ b/agent/cache/entry.go
@@ -3,6 +3,8 @@ package cache
 import (
 	"container/heap"
 	"time"
+
+	"golang.org/x/time/rate"
 )
 
 // cacheEntry stores a single cache entry.
@@ -41,6 +43,8 @@ type cacheEntry struct {
 	// background request has be blocking for at least 5 seconds, which ever
 	// happens first.
 	RefreshLostContact time.Time
+	// FetchRateLimiter limits the rate at which fetch is called for this entry.
+	FetchRateLimiter *rate.Limiter
 }
 
 // cacheEntryExpiry contains the expiration information for a cache
diff --git a/agent/cache/testing.go b/agent/cache/testing.go
index edce5473bc..3e03144dae 100644
--- a/agent/cache/testing.go
+++ b/agent/cache/testing.go
@@ -8,12 +8,13 @@ import (
 	"github.com/mitchellh/go-testing-interface"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/time/rate"
 )
 
 // TestCache returns a Cache instance configuring for testing.
 func TestCache(t testing.T) *Cache {
 	// Simple but lets us do some fine-tuning later if we want to.
-	return New(nil)
+	return New(Options{EntryFetchRate: rate.Inf, EntryFetchMaxBurst: 2})
 }
 
 // TestCacheGetCh returns a channel that returns the result of the Get call.
diff --git a/agent/config/builder.go b/agent/config/builder.go
index 54b7659605..2dbe98df87 100644
--- a/agent/config/builder.go
+++ b/agent/config/builder.go
@@ -14,6 +14,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/hashicorp/consul/agent/cache"
 	"github.com/hashicorp/consul/agent/checks"
 	"github.com/hashicorp/consul/agent/connect/ca"
 	"github.com/hashicorp/consul/agent/consul"
@@ -32,6 +33,15 @@ import (
 	"golang.org/x/time/rate"
 )
 
+// The following constants are default values for some settings
+// Ensure to update documentation if you modify those values
+const (
+	// DefaultEntryFetchMaxBurst is the default value for cache.entry_fetch_max_burst
+	DefaultEntryFetchMaxBurst = 2
+	// DefaultEntryFetchRate is the default value for cache.entry_fetch_rate
+	DefaultEntryFetchRate = float64(rate.Inf)
+)
+
 // Builder constructs a valid runtime configuration from multiple
 // configuration sources.
 //
@@ -882,11 +892,17 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
 		},
 
 		// Agent
-		AdvertiseAddrLAN:   advertiseAddrLAN,
-		AdvertiseAddrWAN:   advertiseAddrWAN,
-		BindAddr:           bindAddr,
-		Bootstrap:          b.boolVal(c.Bootstrap),
-		BootstrapExpect:    b.intVal(c.BootstrapExpect),
+		AdvertiseAddrLAN: advertiseAddrLAN,
+		AdvertiseAddrWAN: advertiseAddrWAN,
+		BindAddr:         bindAddr,
+		Bootstrap:        b.boolVal(c.Bootstrap),
+		BootstrapExpect:  b.intVal(c.BootstrapExpect),
+		Cache: cache.Options{
+			EntryFetchRate: rate.Limit(
+				b.float64ValWithDefault(c.Cache.EntryFetchRate, DefaultEntryFetchRate)),
+			EntryFetchMaxBurst: b.intValWithDefault(
+				c.Cache.EntryFetchMaxBurst, DefaultEntryFetchMaxBurst),
+		},
 		CAFile:   b.stringVal(c.CAFile),
 		CAPath:   b.stringVal(c.CAPath),
 		CertFile: b.stringVal(c.CertFile),
@@ -1014,6 +1030,13 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
 		Watches: c.Watches,
 	}
 
+	if rt.Cache.EntryFetchMaxBurst <= 0 {
+		return RuntimeConfig{}, fmt.Errorf("cache.entry_fetch_max_burst must be strictly positive, was: %v", rt.Cache.EntryFetchMaxBurst)
+	}
+	if rt.Cache.EntryFetchRate <= 0 {
+		return RuntimeConfig{}, fmt.Errorf("cache.entry_fetch_rate must be strictly positive, was: %v", rt.Cache.EntryFetchRate)
+	}
+
 	if entCfg, err := b.BuildEnterpriseRuntimeConfig(&c); err != nil {
 		return RuntimeConfig{}, err
 	} else {
@@ -1645,14 +1668,18 @@ func (b *Builder) stringVal(v *string) string {
 	return b.stringValWithDefault(v, "")
 }
 
-func (b *Builder) float64Val(v *float64) float64 {
+func (b *Builder) float64ValWithDefault(v *float64, defaultVal float64) float64 {
 	if v == nil {
-		return 0
+		return defaultVal
 	}
 	return *v
 }
 
+func (b *Builder) float64Val(v *float64) float64 {
+	return b.float64ValWithDefault(v, 0)
+}
+
 func (b *Builder) cidrsVal(name string, v []string) (nets []*net.IPNet) {
 	if v == nil {
 		return
diff --git a/agent/config/config.go b/agent/config/config.go
index 2ed81f6b92..eaa10d62a1 100644
--- a/agent/config/config.go
+++ b/agent/config/config.go
@@ -58,6 +58,14 @@ func Parse(data string, format string) (c Config, md mapstructure.Metadata, err
 	return c, md, nil
 }
 
+// Cache is the tuning configuration for the cache; values are optional
+type Cache struct {
+	// EntryFetchMaxBurst max burst size of RateLimit for a single cache entry
+	EntryFetchMaxBurst *int `json:"entry_fetch_max_burst,omitempty" hcl:"entry_fetch_max_burst" mapstructure:"entry_fetch_max_burst"`
+	// EntryFetchRate represents the max calls/sec for a single cache entry
+	EntryFetchRate *float64 `json:"entry_fetch_rate,omitempty" hcl:"entry_fetch_rate" mapstructure:"entry_fetch_rate"`
+}
+
 // Config defines the format of a configuration file in either JSON or
 // HCL format.
 //
@@ -101,6 +109,7 @@ type Config struct {
 	BindAddr        *string `json:"bind_addr,omitempty" hcl:"bind_addr" mapstructure:"bind_addr"`
 	Bootstrap       *bool   `json:"bootstrap,omitempty" hcl:"bootstrap" mapstructure:"bootstrap"`
 	BootstrapExpect *int    `json:"bootstrap_expect,omitempty" hcl:"bootstrap_expect" mapstructure:"bootstrap_expect"`
+	Cache           Cache   `json:"cache,omitempty" hcl:"cache" mapstructure:"cache"`
 	CAFile          *string `json:"ca_file,omitempty" hcl:"ca_file" mapstructure:"ca_file"`
 	CAPath          *string `json:"ca_path,omitempty" hcl:"ca_path" mapstructure:"ca_path"`
 	CertFile        *string `json:"cert_file,omitempty" hcl:"cert_file" mapstructure:"cert_file"`
diff --git a/agent/config/runtime.go b/agent/config/runtime.go
index 0cbeec8b49..db3f926a04 100644
--- a/agent/config/runtime.go
+++ b/agent/config/runtime.go
@@ -7,6 +7,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/hashicorp/consul/agent/cache"
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/api"
 	"github.com/hashicorp/consul/lib"
@@ -444,6 +445,9 @@ type RuntimeConfig struct {
 	// flag: -bootstrap-expect=int
 	BootstrapExpect int
 
+	// Cache represents the cache configuration of the agent
+	Cache cache.Options
+
 	// CAFile is a path to a certificate authority file. This is used with
 	// VerifyIncoming or VerifyOutgoing to verify the TLS connection.
 	//
diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go
index 9399063aad..d3a564df2b 100644
--- a/agent/config/runtime_test.go
+++ b/agent/config/runtime_test.go
@@ -18,6 +18,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/hashicorp/consul/agent/cache"
 	"github.com/hashicorp/consul/agent/checks"
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/lib"
@@ -4411,6 +4412,10 @@ func TestFullConfig(t *testing.T) {
 				"bind_addr": "16.99.34.17",
 				"bootstrap": true,
 				"bootstrap_expect": 53,
+				"cache": {
+					"entry_fetch_max_burst": 42,
+					"entry_fetch_rate": 0.334
+				},
 				"ca_file": "erA7T0PM",
 				"ca_path": "mQEN1Mfp",
 				"cert_file": "7s4QAzDk",
@@ -5071,6 +5076,10 @@ func TestFullConfig(t *testing.T) {
 			bind_addr = "16.99.34.17"
 			bootstrap = true
 			bootstrap_expect = 53
+			cache = {
+				entry_fetch_max_burst = 42
+				entry_fetch_rate = 0.334
+			},
 			ca_file = "erA7T0PM"
 			ca_path = "mQEN1Mfp"
 			cert_file = "7s4QAzDk"
@@ -5797,10 +5806,14 @@ func TestFullConfig(t *testing.T) {
 		BindAddr:        ipAddr("16.99.34.17"),
 		Bootstrap:       true,
 		BootstrapExpect: 53,
-		CAFile:             "erA7T0PM",
-		CAPath:             "mQEN1Mfp",
-		CertFile:           "7s4QAzDk",
-		CheckOutputMaxSize: checks.DefaultBufSize,
+		Cache: cache.Options{
+			EntryFetchMaxBurst: 42,
+			EntryFetchRate:     0.334,
+		},
+		CAFile:             "erA7T0PM",
+		CAPath:             "mQEN1Mfp",
+		CertFile:           "7s4QAzDk",
+		CheckOutputMaxSize: checks.DefaultBufSize,
 		Checks: []*structs.CheckDefinition{
 			{
 				ID: "uAjE6m9Z",
@@ -6679,6 +6692,10 @@ func TestSanitize(t *testing.T) {
 			&net.TCPAddr{IP: net.ParseIP("1.2.3.4"), Port: 5678},
 			&net.UnixAddr{Name: "/var/run/foo"},
 		},
+		Cache: cache.Options{
+			EntryFetchMaxBurst: 42,
+			EntryFetchRate:     0.334,
+		},
 		ConsulCoordinateUpdatePeriod: 15 * time.Second,
 		RetryJoinLAN: []string{
 			"foo=bar key=baz secret=boom bang=bar",
@@ -6749,6 +6766,10 @@ func TestSanitize(t *testing.T) {
 		"BindAddr": "127.0.0.1",
 		"Bootstrap": false,
 		"BootstrapExpect": 0,
+		"Cache": {
+			"EntryFetchMaxBurst": 42,
+			"EntryFetchRate": 0.334
+		},
 		"CAFile": "",
 		"CAPath": "",
 		"CertFile": "",
diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go
index d2154b537a..bcd1a476ba 100644
--- a/agent/proxycfg/manager_test.go
+++ b/agent/proxycfg/manager_test.go
@@ -7,6 +7,7 @@ import (
"github.com/mitchellh/copystructure" "github.com/stretchr/testify/require" + "golang.org/x/time/rate" "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" @@ -461,7 +462,7 @@ func TestManager_deliverLatest(t *testing.T) { // None of these need to do anything to test this method just be valid logger := testutil.Logger(t) cfg := ManagerConfig{ - Cache: cache.New(nil), + Cache: cache.New(cache.Options{EntryFetchRate: rate.Inf, EntryFetchMaxBurst: 2}), State: local.NewState(local.Config{}, logger, &token.Store{}), Source: &structs.QuerySource{ Node: "node1", diff --git a/website/pages/docs/agent/options.mdx b/website/pages/docs/agent/options.mdx index 153ee5652b..3b3975ca76 100644 --- a/website/pages/docs/agent/options.mdx +++ b/website/pages/docs/agent/options.mdx @@ -959,6 +959,24 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." - `bind_addr` Equivalent to the [`-bind` command-line flag](#_bind). +- `cache` Cache configuration of agent. The configurable values are the following: + + - `entry_fetch_max_burst`: The size of the token bucket used to recharge the rate-limit per + cache entry. The default value is 2 and means that when cache has not been updated + for a long time, 2 successive queries can be made as long as the rate-limit is not + reached. + + - `entry_fetch_rate`: configures the rate-limit at which the cache may refresh a single + entry. On a cluster with many changes/s, watching changes in the cache might put high + pressure on the servers. This ensures the number of requests for a single cache entry + will never go beyond this limit, even when a given service changes every 1/100s. + Since this is a per cache entry limit, having a highly unstable service will only rate + limit the watched on this service, but not the other services/entries. + The value is strictly positive, expressed in queries per second as a float, + 1 means 1 query per second, 0.1 mean 1 request every 10s maximum. + The default value is "No limit" and should be tuned on large + clusters to avoid performing too many RPCs on entries changing a lot. + - `ca_file` This provides a file path to a PEM-encoded certificate authority. The certificate authority is used to check the authenticity of client and server connections with the appropriate [`verify_incoming`](#verify_incoming)