From 0003720f787ed8fae1b8f2973c014663879a3019 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 15 Oct 2020 13:51:57 -0400 Subject: [PATCH] agent/router: refactor calculation of delay between rebalances. This change attempts to make the delay logic more obvious by: * remove indirection, inline a bunch of function calls * move all the code and constants next to each other * replace the two constant values with a single value * reword the comments. --- agent/router/manager.go | 132 ++++++++++++++------------ agent/router/manager_internal_test.go | 6 +- agent/router/router.go | 9 +- 3 files changed, 76 insertions(+), 71 deletions(-) diff --git a/agent/router/manager.go b/agent/router/manager.go index 4aaab97597..8d2e9658ee 100644 --- a/agent/router/manager.go +++ b/agent/router/manager.go @@ -12,44 +12,10 @@ import ( "sync/atomic" "time" - "github.com/hashicorp/consul/agent/metadata" - "github.com/hashicorp/consul/lib" - "github.com/hashicorp/consul/logging" "github.com/hashicorp/go-hclog" -) -const ( - // clientRPCJitterFraction determines the amount of jitter added to - // clientRPCMinReuseDuration before a connection is expired and a new - // connection is established in order to rebalance load across consul - // servers. The cluster-wide number of connections per second from - // rebalancing is applied after this jitter to ensure the CPU impact - // is always finite. See newRebalanceConnsPerSecPerServer's comment - // for additional commentary. - // - // For example, in a 10K consul cluster with 5x servers, this default - // averages out to ~13 new connections from rebalancing per server - // per second (each connection is reused for 120s to 180s). - clientRPCJitterFraction = 2 - - // clientRPCMinReuseDuration controls the minimum amount of time RPC - // queries are sent over an established connection to a single server - clientRPCMinReuseDuration = 120 * time.Second - - // Limit the number of new connections a server receives per second - // for connection rebalancing. This limit caps the load caused by - // continual rebalancing efforts when a cluster is in equilibrium. A - // lower value comes at the cost of increased recovery time after a - // partition. This parameter begins to take effect when there are - // more than ~48K clients querying 5x servers or at lower server - // values when there is a partition. - // - // For example, in a 100K consul cluster with 5x servers, it will - // take ~5min for all servers to rebalance their connections. If - // 99,995 agents are in the minority talking to only one server, it - // will take ~26min for all servers to rebalance. A 10K cluster in - // the same scenario will take ~2.6min to rebalance. - newRebalanceConnsPerSecPerServer = 64 + "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/logging" ) // ManagerSerfCluster is an interface wrapper around Serf in order to make this @@ -278,7 +244,7 @@ func New(logger hclog.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfC m.logger = logger.Named(logging.Manager) m.clusterInfo = clusterInfo // can't pass *consul.Client: import cycle m.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle - m.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration) + m.rebalanceTimer = time.NewTimer(delayer.MinDelay) m.shutdownCh = shutdownCh m.rebalancer = rb m.serverName = serverName @@ -497,44 +463,28 @@ func (m *Manager) RemoveServer(s *metadata.Server) { } } -// refreshServerRebalanceTimer is only called once m.rebalanceTimer expires. -func (m *Manager) refreshServerRebalanceTimer() time.Duration { - l := m.getServerList() - numServers := len(l.servers) - // Limit this connection's life based on the size (and health) of the - // cluster. Never rebalance a connection more frequently than - // connReuseLowWatermarkDuration, and make sure we never exceed - // clusterWideRebalanceConnsPerSec operations/s across numLANMembers. - clusterWideRebalanceConnsPerSec := float64(numServers * newRebalanceConnsPerSecPerServer) - connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction) - numLANMembers := m.clusterInfo.NumNodes() - connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers) - - m.rebalanceTimer.Reset(connRebalanceTimeout) - return connRebalanceTimeout -} - // ResetRebalanceTimer resets the rebalance timer. This method exists for // testing and should not be used directly. func (m *Manager) ResetRebalanceTimer() { m.listLock.Lock() defer m.listLock.Unlock() - m.rebalanceTimer.Reset(clientRPCMinReuseDuration) + m.rebalanceTimer.Reset(delayer.MinDelay) } -// Start is used to start and manage the task of automatically shuffling and -// rebalancing the list of Consul servers. This maintenance only happens -// periodically based on the expiration of the timer. Failed servers are -// automatically cycled to the end of the list. New servers are appended to -// the list. The order of the server list must be shuffled periodically to -// distribute load across all known and available Consul servers. -func (m *Manager) Start() { +// Run periodically shuffles the list of servers to evenly distribute load. +// Run exits when shutdownCh is closed. +// +// When a server fails it is moved to the end of the list, and new servers are +// appended to the end of the list. Run ensures that load is distributed evenly +// to all servers by randomly shuffling the list. +func (m *Manager) Run() { for { select { case <-m.rebalanceTimer.C: m.rebalancer() m.RebalanceServers() - m.refreshServerRebalanceTimer() + delay := delayer.Delay(len(m.getServerList().servers), m.clusterInfo.NumNodes()) + m.rebalanceTimer.Reset(delay) case <-m.shutdownCh: m.logger.Info("shutting down") @@ -542,3 +492,59 @@ func (m *Manager) Start() { } } } + +// delayer is used to calculate the time to wait between calls to rebalance the +// servers. Rebalancing is necessary to ensure that load is balanced evenly +// across all the servers. +// +// The values used by delayer must balance perfectly distributed server load +// against the overhead of a client reconnecting to a server. Rebalancing on +// every request would cause a lot of unnecessary load as clients reconnect, +// where as never rebalancing would lead to situations where one or two servers +// handle a lot more requests than others. +// +// These values result in a minimum delay of 120-180s. Once the number of +// nodes/server exceeds 11520, the value will be determined by multiplying the +// node/server ratio by 15.625ms. +var delayer = rebalanceDelayer{ + MinDelay: 2 * time.Minute, + MaxJitter: time.Minute, + // Once the number of nodes/server exceeds 11520 this value is used to + // increase the delay between rebalances to set a limit on the number of + // reconnections per server in a given time frame. + // + // A higher value comes at the cost of increased recovery time after a + // partition. + // + // For example, in a 100,000 node consul cluster with 5 servers, it will + // take ~5min for all clients to rebalance their connections. If + // 99,995 agents are in the minority talking to only one server, it + // will take ~26min for all clients to rebalance. A 10K cluster in + // the same scenario will take ~2.6min to rebalance. + DelayPerNode: 15*time.Millisecond + 625*time.Microsecond, +} + +type rebalanceDelayer struct { + // MinDelay that may be returned by Delay + MinDelay time.Duration + // MaxJitter to add to MinDelay to ensure there is some randomness in the + // delay. + MaxJitter time.Duration + // DelayPerNode is the duration to add to each node when calculating delay. + // The value is divided by the number of servers to arrive at the final + // delay value. + DelayPerNode time.Duration +} + +func (d *rebalanceDelayer) Delay(servers int, nodes int) time.Duration { + min := d.MinDelay + time.Duration(rand.Int63n(int64(d.MaxJitter))) + if servers == 0 { + return min + } + + delay := time.Duration(float64(nodes) * float64(d.DelayPerNode) / float64(servers)) + if delay < min { + return min + } + return delay +} diff --git a/agent/router/manager_internal_test.go b/agent/router/manager_internal_test.go index e172345fac..92714a0155 100644 --- a/agent/router/manager_internal_test.go +++ b/agent/router/manager_internal_test.go @@ -264,7 +264,7 @@ func test_reconcileServerList(maxServers int) (bool, error) { return true, nil } -func TestManager_refreshServerRebalanceTimer(t *testing.T) { +func TestRebalanceDelayer(t *testing.T) { type testCase struct { servers int nodes int @@ -319,9 +319,7 @@ func TestManager_refreshServerRebalanceTimer(t *testing.T) { } for _, tc := range testCases { - m := &Manager{clusterInfo: &fauxSerf{numNodes: tc.nodes}, rebalanceTimer: time.NewTimer(0)} - m.saveServerList(serverList{servers: make([]*metadata.Server, tc.servers)}) - delay := m.refreshServerRebalanceTimer() + delay := delayer.Delay(tc.servers, tc.nodes) if tc.expected != 0 { assert.Equal(t, tc.expected, delay, "nodes=%d, servers=%d", tc.nodes, tc.servers) diff --git a/agent/router/router.go b/agent/router/router.go index 8244745c3b..9aaae8739b 100644 --- a/agent/router/router.go +++ b/agent/router/router.go @@ -5,14 +5,15 @@ import ( "sort" "sync" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/serf/coordinate" + "github.com/hashicorp/serf/serf" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/types" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/serf/coordinate" - "github.com/hashicorp/serf/serf" ) // Router keeps track of a set of network areas and their associated Serf @@ -269,7 +270,7 @@ func (r *Router) maybeInitializeManager(area *areaInfo, dc string) *Manager { managers := r.managers[dc] r.managers[dc] = append(managers, manager) - go manager.Start() + go manager.Run() return manager }