mirror of https://github.com/hashicorp/consul
agent/router: refactor calculation of delay between rebalances.
This change attempts to make the delay logic more obvious by:
  * removing indirection and inlining a number of function calls
  * moving all of the related code and constants next to each other
  * replacing the two constant values with a single value
  * rewording the comments

branch: pull/8973/head
parent 119c446cf2
commit 0003720f78
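For orientation, the arithmetic below shows how the two removed constants relate to the single DelayPerNode value introduced further down. This is a standalone Go snippet written for this page, not code from the commit.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Values taken from the const block removed in the first hunk below.
	clientRPCMinReuseDuration := 120 * time.Second
	clientRPCJitterFraction := time.Duration(2)
	newRebalanceConnsPerSecPerServer := time.Duration(64)

	// Old behaviour: a connection was reused for 120s plus up to 60s of
	// random stagger, i.e. the 120s-180s window mentioned in the comments.
	maxReuse := clientRPCMinReuseDuration + clientRPCMinReuseDuration/clientRPCJitterFraction
	fmt.Println(maxReuse) // 3m0s

	// New behaviour: a per-node delay divided by the number of servers.
	// 1s / 64 is exactly the 15.625ms DelayPerNode used by the new delayer.
	fmt.Println(time.Second / newRebalanceConnsPerSecPerServer) // 15.625ms
}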
@@ -12,44 +12,10 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/hashicorp/consul/agent/metadata"
-	"github.com/hashicorp/consul/lib"
-	"github.com/hashicorp/consul/logging"
 	"github.com/hashicorp/go-hclog"
-)
 
-const (
-	// clientRPCJitterFraction determines the amount of jitter added to
-	// clientRPCMinReuseDuration before a connection is expired and a new
-	// connection is established in order to rebalance load across consul
-	// servers. The cluster-wide number of connections per second from
-	// rebalancing is applied after this jitter to ensure the CPU impact
-	// is always finite. See newRebalanceConnsPerSecPerServer's comment
-	// for additional commentary.
-	//
-	// For example, in a 10K consul cluster with 5x servers, this default
-	// averages out to ~13 new connections from rebalancing per server
-	// per second (each connection is reused for 120s to 180s).
-	clientRPCJitterFraction = 2
-
-	// clientRPCMinReuseDuration controls the minimum amount of time RPC
-	// queries are sent over an established connection to a single server
-	clientRPCMinReuseDuration = 120 * time.Second
-
-	// Limit the number of new connections a server receives per second
-	// for connection rebalancing. This limit caps the load caused by
-	// continual rebalancing efforts when a cluster is in equilibrium. A
-	// lower value comes at the cost of increased recovery time after a
-	// partition. This parameter begins to take effect when there are
-	// more than ~48K clients querying 5x servers or at lower server
-	// values when there is a partition.
-	//
-	// For example, in a 100K consul cluster with 5x servers, it will
-	// take ~5min for all servers to rebalance their connections. If
-	// 99,995 agents are in the minority talking to only one server, it
-	// will take ~26min for all servers to rebalance. A 10K cluster in
-	// the same scenario will take ~2.6min to rebalance.
-	newRebalanceConnsPerSecPerServer = 64
-)
+	"github.com/hashicorp/consul/agent/metadata"
+	"github.com/hashicorp/consul/logging"
+)
 
 // ManagerSerfCluster is an interface wrapper around Serf in order to make this
@@ -278,7 +244,7 @@ func New(logger hclog.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfC
 	m.logger = logger.Named(logging.Manager)
 	m.clusterInfo = clusterInfo       // can't pass *consul.Client: import cycle
 	m.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle
-	m.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration)
+	m.rebalanceTimer = time.NewTimer(delayer.MinDelay)
 	m.shutdownCh = shutdownCh
 	m.rebalancer = rb
 	m.serverName = serverName
@@ -497,44 +463,28 @@ func (m *Manager) RemoveServer(s *metadata.Server) {
 	}
 }
 
-// refreshServerRebalanceTimer is only called once m.rebalanceTimer expires.
-func (m *Manager) refreshServerRebalanceTimer() time.Duration {
-	l := m.getServerList()
-	numServers := len(l.servers)
-	// Limit this connection's life based on the size (and health) of the
-	// cluster. Never rebalance a connection more frequently than
-	// connReuseLowWatermarkDuration, and make sure we never exceed
-	// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
-	clusterWideRebalanceConnsPerSec := float64(numServers * newRebalanceConnsPerSecPerServer)
-	connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
-	numLANMembers := m.clusterInfo.NumNodes()
-	connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
-
-	m.rebalanceTimer.Reset(connRebalanceTimeout)
-	return connRebalanceTimeout
-}
-
 // ResetRebalanceTimer resets the rebalance timer. This method exists for
 // testing and should not be used directly.
 func (m *Manager) ResetRebalanceTimer() {
 	m.listLock.Lock()
 	defer m.listLock.Unlock()
-	m.rebalanceTimer.Reset(clientRPCMinReuseDuration)
+	m.rebalanceTimer.Reset(delayer.MinDelay)
 }
 
-// Start is used to start and manage the task of automatically shuffling and
-// rebalancing the list of Consul servers. This maintenance only happens
-// periodically based on the expiration of the timer. Failed servers are
-// automatically cycled to the end of the list. New servers are appended to
-// the list. The order of the server list must be shuffled periodically to
-// distribute load across all known and available Consul servers.
-func (m *Manager) Start() {
+// Run periodically shuffles the list of servers to evenly distribute load.
+// Run exits when shutdownCh is closed.
+//
+// When a server fails it is moved to the end of the list, and new servers are
+// appended to the end of the list. Run ensures that load is distributed evenly
+// to all servers by randomly shuffling the list.
+func (m *Manager) Run() {
 	for {
 		select {
 		case <-m.rebalanceTimer.C:
 			m.rebalancer()
 			m.RebalanceServers()
-			m.refreshServerRebalanceTimer()
+			delay := delayer.Delay(len(m.getServerList().servers), m.clusterInfo.NumNodes())
+			m.rebalanceTimer.Reset(delay)
 
 		case <-m.shutdownCh:
 			m.logger.Info("shutting down")
@@ -542,3 +492,59 @@ func (m *Manager) Start() {
 		}
 	}
 }
+
+// delayer is used to calculate the time to wait between calls to rebalance the
+// servers. Rebalancing is necessary to ensure that load is balanced evenly
+// across all the servers.
+//
+// The values used by delayer must balance perfectly distributed server load
+// against the overhead of a client reconnecting to a server. Rebalancing on
+// every request would cause a lot of unnecessary load as clients reconnect,
+// where as never rebalancing would lead to situations where one or two servers
+// handle a lot more requests than others.
+//
+// These values result in a minimum delay of 120-180s. Once the number of
+// nodes/server exceeds 11520, the value will be determined by multiplying the
+// node/server ratio by 15.625ms.
+var delayer = rebalanceDelayer{
+	MinDelay:  2 * time.Minute,
+	MaxJitter: time.Minute,
+	// Once the number of nodes/server exceeds 11520 this value is used to
+	// increase the delay between rebalances to set a limit on the number of
+	// reconnections per server in a given time frame.
+	//
+	// A higher value comes at the cost of increased recovery time after a
+	// partition.
+	//
+	// For example, in a 100,000 node consul cluster with 5 servers, it will
+	// take ~5min for all clients to rebalance their connections. If
+	// 99,995 agents are in the minority talking to only one server, it
+	// will take ~26min for all clients to rebalance. A 10K cluster in
+	// the same scenario will take ~2.6min to rebalance.
+	DelayPerNode: 15*time.Millisecond + 625*time.Microsecond,
+}
+
+type rebalanceDelayer struct {
+	// MinDelay that may be returned by Delay
+	MinDelay time.Duration
+	// MaxJitter to add to MinDelay to ensure there is some randomness in the
+	// delay.
+	MaxJitter time.Duration
+	// DelayPerNode is the duration to add to each node when calculating delay.
+	// The value is divided by the number of servers to arrive at the final
+	// delay value.
+	DelayPerNode time.Duration
+}
+
+func (d *rebalanceDelayer) Delay(servers int, nodes int) time.Duration {
+	min := d.MinDelay + time.Duration(rand.Int63n(int64(d.MaxJitter)))
+	if servers == 0 {
+		return min
+	}
+
+	delay := time.Duration(float64(nodes) * float64(d.DelayPerNode) / float64(servers))
+	if delay < min {
+		return min
+	}
+	return delay
+}
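To sanity-check the cluster-size figures quoted in the delayer comments above, the following standalone snippet reproduces the Delay calculation with the random jitter left out. It is illustrative only and is not part of the commit.

package main

import (
	"fmt"
	"time"
)

// approxDelay mirrors rebalanceDelayer.Delay from the hunk above, but uses the
// fixed 2-minute MinDelay floor instead of the jittered 120-180s minimum.
func approxDelay(servers, nodes int) time.Duration {
	minDelay := 2 * time.Minute
	delayPerNode := 15*time.Millisecond + 625*time.Microsecond

	delay := time.Duration(float64(nodes) * float64(delayPerNode) / float64(servers))
	if delay < minDelay {
		return minDelay
	}
	return delay
}

func main() {
	fmt.Println(approxDelay(5, 100_000)) // 5m12.5s   -> the "~5min" figure
	fmt.Println(approxDelay(1, 99_995))  // ~26m2.4s  -> the "~26min" partition figure
	fmt.Println(approxDelay(1, 10_000))  // 2m36.25s  -> the "~2.6min" 10K figure
	fmt.Println(approxDelay(5, 10_000))  // 2m0s      -> below the floor, MinDelay wins
}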
@@ -264,7 +264,7 @@ func test_reconcileServerList(maxServers int) (bool, error) {
 	return true, nil
 }
 
-func TestManager_refreshServerRebalanceTimer(t *testing.T) {
+func TestRebalanceDelayer(t *testing.T) {
 	type testCase struct {
 		servers int
 		nodes   int
@@ -319,9 +319,7 @@ func TestManager_refreshServerRebalanceTimer(t *testing.T) {
 	}
 
 	for _, tc := range testCases {
-		m := &Manager{clusterInfo: &fauxSerf{numNodes: tc.nodes}, rebalanceTimer: time.NewTimer(0)}
-		m.saveServerList(serverList{servers: make([]*metadata.Server, tc.servers)})
-		delay := m.refreshServerRebalanceTimer()
+		delay := delayer.Delay(tc.servers, tc.nodes)
 
 		if tc.expected != 0 {
 			assert.Equal(t, tc.expected, delay, "nodes=%d, servers=%d", tc.nodes, tc.servers)
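The concrete test cases are not shown in this diff, but the 11520 nodes-per-server threshold from the delayer comments makes a convenient deterministic point: at that ratio the computed delay is exactly 180s, which always exceeds the jittered 120-180s minimum. A hypothetical extra test in the same style, written for illustration and not taken from the commit (the package name and file placement are assumed), might look like:

package router

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

// Hypothetical case: 11520 nodes per server is the smallest ratio at which
// Delay is deterministic, so its result can be asserted exactly.
func TestRebalanceDelayer_threshold(t *testing.T) {
	assert.Equal(t, 180*time.Second, delayer.Delay(1, 11520))
}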
@@ -5,14 +5,15 @@ import (
 	"sort"
 	"sync"
 
+	"github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/serf/coordinate"
+	"github.com/hashicorp/serf/serf"
+
 	"github.com/hashicorp/consul/agent/metadata"
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/consul/logging"
 	"github.com/hashicorp/consul/types"
-	"github.com/hashicorp/go-hclog"
-	"github.com/hashicorp/serf/coordinate"
-	"github.com/hashicorp/serf/serf"
 )
 
 // Router keeps track of a set of network areas and their associated Serf
@@ -269,7 +270,7 @@ func (r *Router) maybeInitializeManager(area *areaInfo, dc string) *Manager {
 
 	managers := r.managers[dc]
 	r.managers[dc] = append(managers, manager)
-	go manager.Start()
+	go manager.Run()
 
 	return manager
 }
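The router change is mechanical: the goroutine that used to call manager.Start() now calls manager.Run(). The standalone sketch below, written for this page rather than taken from the commit, shows the timer-plus-shutdown-channel loop shape that Run keeps from the old Start.

package main

import (
	"fmt"
	"time"
)

// run imitates the Manager.Run loop from the earlier hunk: rebalance when the
// timer fires, reset the timer with a freshly calculated delay, and exit when
// the shutdown channel is closed.
func run(shutdownCh <-chan struct{}, nextDelay func() time.Duration) {
	timer := time.NewTimer(nextDelay())
	for {
		select {
		case <-timer.C:
			fmt.Println("rebalancing servers")
			timer.Reset(nextDelay())
		case <-shutdownCh:
			fmt.Println("shutting down")
			return
		}
	}
}

func main() {
	shutdownCh := make(chan struct{})
	go run(shutdownCh, func() time.Duration { return 20 * time.Millisecond })

	time.Sleep(70 * time.Millisecond) // let a few rebalances happen
	close(shutdownCh)
	time.Sleep(10 * time.Millisecond) // give the goroutine time to log and exit
}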