|
|
|
@ -12,51 +12,14 @@ import (
|
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"github.com/hashicorp/consul/consul/structs" |
|
|
|
|
"github.com/hashicorp/consul/lib" |
|
|
|
|
"github.com/hashicorp/serf/coordinate" |
|
|
|
|
"github.com/hashicorp/serf/serf" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
const ( |
|
|
|
|
// clientRPCMinReuseDuration controls the minimum amount of time RPC
|
|
|
|
|
// queries are sent over an established connection to a single server
|
|
|
|
|
clientRPCMinReuseDuration = 120 * time.Second |
|
|
|
|
|
|
|
|
|
// clientRPCJitterFraction determines the amount of jitter added to
|
|
|
|
|
// clientRPCMinReuseDuration before a connection is expired and a new
|
|
|
|
|
// connection is established in order to rebalance load across consul
|
|
|
|
|
// servers. The cluster-wide number of connections per second from
|
|
|
|
|
// rebalancing is applied after this jitter to ensure the CPU impact
|
|
|
|
|
// is always finite. See newRebalanceConnsPerSecPerServer's comment
|
|
|
|
|
// for additional commentary.
|
|
|
|
|
//
|
|
|
|
|
// For example, in a 10K consul cluster with 5x servers, this default
|
|
|
|
|
// averages out to ~13 new connections from rebalancing per server
|
|
|
|
|
// per second (each connection is reused for 120s to 180s).
|
|
|
|
|
clientRPCJitterFraction = 2 |
|
|
|
|
|
|
|
|
|
// Limit the number of new connections a server receives per second
|
|
|
|
|
// for connection rebalancing. This limit caps the load caused by
|
|
|
|
|
// continual rebalancing efforts when a cluster is in equilibrium. A
|
|
|
|
|
// lower value comes at the cost of increased recovery time after a
|
|
|
|
|
// partition. This parameter begins to take effect when there are
|
|
|
|
|
// more than ~48K clients querying 5x servers or at lower server
|
|
|
|
|
// values when there is a partition.
|
|
|
|
|
//
|
|
|
|
|
// For example, in a 100K consul cluster with 5x servers, it will
|
|
|
|
|
// take ~5min for all servers to rebalance their connections. If
|
|
|
|
|
// 99,995 agents are in the minority talking to only one server, it
|
|
|
|
|
// will take ~26min for all servers to rebalance. A 10K cluster in
|
|
|
|
|
// the same scenario will take ~2.6min to rebalance.
|
|
|
|
|
newRebalanceConnsPerSecPerServer = 64 |
|
|
|
|
|
|
|
|
|
// clientRPCConnMaxIdle controls how long we keep an idle connection
|
|
|
|
|
// open to a server. 127s was chosen as the first prime above 120s
|
|
|
|
|
// (arbitrarily chose to use a prime) with the intent of reusing
|
|
|
|
|
// connections who are used by once-a-minute cron(8) jobs *and* who
|
|
|
|
|
// use a 60s jitter window (e.g. in vixie cron job execution can
|
|
|
|
|
// drift by up to 59s per job, or 119s for a once-a-minute cron job).
|
|
|
|
|
clientRPCConnMaxIdle = 127 * time.Second |
|
|
|
|
// clientRPCCache controls how long we keep an idle connection
|
|
|
|
|
// open to a server
|
|
|
|
|
clientRPCCache = 30 * time.Second |
|
|
|
|
|
|
|
|
|
// clientMaxStreams controls how many idle streams we keep
|
|
|
|
|
// open to a server
|
|
|
|
@ -93,10 +56,6 @@ type Client struct {
|
|
|
|
|
lastServer *serverParts |
|
|
|
|
lastRPCTime time.Time |
|
|
|
|
|
|
|
|
|
// connRebalanceTime is the time at which we should change the server
|
|
|
|
|
// we query for RPC requests.
|
|
|
|
|
connRebalanceTime time.Time |
|
|
|
|
|
|
|
|
|
// Logger uses the provided LogOutput
|
|
|
|
|
logger *log.Logger |
|
|
|
|
|
|
|
|
@ -144,7 +103,7 @@ func NewClient(config *Config) (*Client, error) {
|
|
|
|
|
// Create server
|
|
|
|
|
c := &Client{ |
|
|
|
|
config: config, |
|
|
|
|
connPool: NewPool(config.LogOutput, clientRPCConnMaxIdle, clientMaxStreams, tlsWrap), |
|
|
|
|
connPool: NewPool(config.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap), |
|
|
|
|
eventCh: make(chan serf.Event, 256), |
|
|
|
|
logger: logger, |
|
|
|
|
shutdownCh: make(chan struct{}), |
|
|
|
@ -369,64 +328,37 @@ func (c *Client) localEvent(event serf.UserEvent) {
|
|
|
|
|
|
|
|
|
|
// RPC is used to forward an RPC call to a consul server, or fail if no servers
|
|
|
|
|
func (c *Client) RPC(method string, args interface{}, reply interface{}) error { |
|
|
|
|
// Check to make sure we haven't spent too much time querying a
|
|
|
|
|
// single server
|
|
|
|
|
now := time.Now() |
|
|
|
|
if !c.connRebalanceTime.IsZero() && now.After(c.connRebalanceTime) { |
|
|
|
|
c.logger.Printf("[DEBUG] consul: connection time to server %s exceeded, rotating server connection", c.lastServer.Addr) |
|
|
|
|
c.lastServer = nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Allocate these vars on the stack before the goto
|
|
|
|
|
var numConsulServers int |
|
|
|
|
var clusterWideRebalanceConnsPerSec float64 |
|
|
|
|
var connReuseLowWaterMark time.Duration |
|
|
|
|
var numLANMembers int |
|
|
|
|
|
|
|
|
|
// Check the last RPC time, continue to reuse cached connection for
|
|
|
|
|
// up to clientRPCMinReuseDuration unless exceeded
|
|
|
|
|
// clientRPCConnMaxIdle
|
|
|
|
|
lastRPCTime := now.Sub(c.lastRPCTime) |
|
|
|
|
// Check the last rpc time
|
|
|
|
|
var server *serverParts |
|
|
|
|
if c.lastServer != nil && lastRPCTime < clientRPCConnMaxIdle { |
|
|
|
|
if time.Now().Sub(c.lastRPCTime) < clientRPCCache { |
|
|
|
|
server = c.lastServer |
|
|
|
|
goto TRY_RPC |
|
|
|
|
if server != nil { |
|
|
|
|
goto TRY_RPC |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Bail if we can't find any servers
|
|
|
|
|
c.consulLock.RLock() |
|
|
|
|
numConsulServers = len(c.consuls) |
|
|
|
|
if numConsulServers == 0 { |
|
|
|
|
if len(c.consuls) == 0 { |
|
|
|
|
c.consulLock.RUnlock() |
|
|
|
|
return structs.ErrNoServers |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Select a random addr
|
|
|
|
|
server = c.consuls[rand.Int31n(int32(numConsulServers))] |
|
|
|
|
server = c.consuls[rand.Int31()%int32(len(c.consuls))] |
|
|
|
|
c.consulLock.RUnlock() |
|
|
|
|
|
|
|
|
|
// Limit this connection's life based on the size (and health) of the
|
|
|
|
|
// cluster. Never rebalance a connection more frequently than
|
|
|
|
|
// connReuseLowWaterMark, and make sure we never exceed
|
|
|
|
|
// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
|
|
|
|
|
clusterWideRebalanceConnsPerSec = float64(numConsulServers * newRebalanceConnsPerSecPerServer) |
|
|
|
|
connReuseLowWaterMark = clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction) |
|
|
|
|
numLANMembers = len(c.LANMembers()) |
|
|
|
|
c.connRebalanceTime = now.Add(lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWaterMark, numLANMembers)) |
|
|
|
|
c.logger.Printf("[DEBUG] consul: connection to server %s will expire at %v", server.Addr, c.connRebalanceTime) |
|
|
|
|
|
|
|
|
|
// Forward to remote Consul
|
|
|
|
|
TRY_RPC: |
|
|
|
|
if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil { |
|
|
|
|
c.connRebalanceTime = time.Time{} |
|
|
|
|
c.lastRPCTime = time.Time{} |
|
|
|
|
c.lastServer = nil |
|
|
|
|
c.lastRPCTime = time.Time{} |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Cache the last server
|
|
|
|
|
c.lastServer = server |
|
|
|
|
c.lastRPCTime = now |
|
|
|
|
c.lastRPCTime = time.Now() |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|