Scales coordinate sends to hit a fixed aggregate rate across the cluster.

pull/1331/head
James Phillips 10 years ago
parent 66a3d29743
commit ad65d953f6

@ -566,7 +566,9 @@ func (a *Agent) ResumeSync() {
// to the server. Closing the agent's shutdownChannel will cause this to exit.
func (a *Agent) sendCoordinate() {
for {
intv := aeScale(a.config.SyncCoordinateInterval, len(a.LANMembers()))
rate := a.config.SyncCoordinateRateTarget
min := a.config.SyncCoordinateIntervalMin
intv := rateScaledInterval(rate, min, len(a.LANMembers()))
intv = intv + randomStagger(intv)
select {

@ -375,12 +375,17 @@ type Config struct {
// DisableCoordinates controls features related to network coordinates.
DisableCoordinates bool `mapstructure:"disable_coordinates" json:"-"`
// SyncCoordinateInterval controls the interval for sending network
// coordinates to the server. Defaults to every 20s, but scales up as
// the number of nodes increases in the network, to prevent servers from
// being overwhelmed. If you update this, you may need to adjust the
// tuning of CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
SyncCoordinateInterval time.Duration `mapstructure:"-" json:"-"`
// SyncCoordinateRateTarget controls the rate for sending network
// coordinates to the server, in updates per second. This is the max rate
// that the server supports, so we scale our interval based on the size
// of the cluster to try to achieve this in aggregate at the server.
SyncCoordinateRateTarget float64 `mapstructure:"-" json:"-"`
// SyncCoordinateIntervalMin sets the minimum interval that coordinates
// will be sent to the server. We scale the interval based on the cluster
// size, but below a certain interval it doesn't make sense send them any
// faster.
SyncCoordinateIntervalMin time.Duration `mapstructure:"-" json:"-"`
// Checks holds the provided check definitions
Checks []*CheckDefinition `mapstructure:"-" json:"-"`
@ -471,18 +476,25 @@ func DefaultConfig() *Config {
DNSConfig: DNSConfig{
MaxStale: 5 * time.Second,
},
StatsitePrefix: "consul",
SyslogFacility: "LOCAL0",
Protocol: consul.ProtocolVersionMax,
CheckUpdateInterval: 5 * time.Minute,
AEInterval: time.Minute,
DisableCoordinates: false,
SyncCoordinateInterval: 20 * time.Second,
ACLTTL: 30 * time.Second,
ACLDownPolicy: "extend-cache",
ACLDefaultPolicy: "allow",
RetryInterval: 30 * time.Second,
RetryIntervalWan: 30 * time.Second,
StatsitePrefix: "consul",
SyslogFacility: "LOCAL0",
Protocol: consul.ProtocolVersionMax,
CheckUpdateInterval: 5 * time.Minute,
AEInterval: time.Minute,
DisableCoordinates: false,
// SyncCoordinateRateTarget is set based on the rate that we want
// the server to handle as an aggregate across the entire cluster.
// If you update this, you'll need to adjust CoordinateUpdate* in
// the server-side config accordingly.
SyncCoordinateRateTarget: 100.0, // updates / second
SyncCoordinateIntervalMin: 5 * time.Second,
ACLTTL: 30 * time.Second,
ACLDownPolicy: "extend-cache",
ACLDefaultPolicy: "allow",
RetryInterval: 30 * time.Second,
RetryIntervalWan: 30 * time.Second,
}
}

@ -806,9 +806,10 @@ func TestAgent_nestedPauseResume(t *testing.T) {
func TestAgent_sendCoordinate(t *testing.T) {
conf := nextConfig()
conf.SyncCoordinateInterval = 10 * time.Millisecond
conf.SyncCoordinateRateTarget = 10.0 // updates/sec
conf.SyncCoordinateIntervalMin = 1 * time.Millisecond
conf.ConsulConfig.CoordinateUpdatePeriod = 100 * time.Millisecond
conf.ConsulConfig.CoordinateUpdateBatchSize = 15
conf.ConsulConfig.CoordinateUpdateBatchSize = 10
conf.ConsulConfig.CoordinateUpdateMaxBatches = 1
dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir)

@ -39,6 +39,17 @@ func aeScale(interval time.Duration, n int) time.Duration {
return time.Duration(multiplier) * interval
}
// rateScaledInterval is used to choose an interval to perform an action in order
// to target an aggregate number of actions per second across the whole cluster.
func rateScaledInterval(rate float64, min time.Duration, n int) time.Duration {
interval := time.Duration(float64(time.Second) * float64(n) / rate)
if interval < min {
return min
}
return interval
}
// Returns a random stagger interval between 0 and the duration
func randomStagger(intv time.Duration) time.Duration {
return time.Duration(uint64(rand.Int63()) % uint64(intv))

@ -24,6 +24,29 @@ func TestAEScale(t *testing.T) {
}
}
func TestRateScaledInterval(t *testing.T) {
min := 1*time.Second
rate := 200.0
if v := rateScaledInterval(rate, min, 0); v != min {
t.Fatalf("Bad: %v", v)
}
if v := rateScaledInterval(rate, min, 100); v != min {
t.Fatalf("Bad: %v", v)
}
if v := rateScaledInterval(rate, min, 200); v != 1*time.Second {
t.Fatalf("Bad: %v", v)
}
if v := rateScaledInterval(rate, min, 1000); v != 5*time.Second {
t.Fatalf("Bad: %v", v)
}
if v := rateScaledInterval(rate, min, 5000); v != 25*time.Second {
t.Fatalf("Bad: %v", v)
}
if v := rateScaledInterval(rate, min, 10000); v != 50*time.Second {
t.Fatalf("Bad: %v", v)
}
}
func TestRandomStagger(t *testing.T) {
intv := time.Minute
for i := 0; i < 10; i++ {

@ -276,13 +276,11 @@ func DefaultConfig() *Config {
SessionTTLMin: 10 * time.Second,
DisableCoordinates: false,
// SyncCoordinateInterval defaults to 20 seconds, and scales up
// as the number of nodes in the cluster goes up. For 100k nodes,
// it will move up to 201 seconds, which gives an update rate of
// just under 500 per second coming in from the clients. With this
// tuning we will be doing 1 transaction per second at this load.
// These are tuned to provide a total throughput of 128 updates
// per second. If you update these, you should update the client-
// side SyncCoordinateRateTarget parameter accordingly.
CoordinateUpdatePeriod: 5 * time.Second,
CoordinateUpdateBatchSize: 512,
CoordinateUpdateBatchSize: 128,
CoordinateUpdateMaxBatches: 5,
}

@ -60,7 +60,7 @@ func (c *Coordinate) batchApplyUpdates() error {
limit := c.srv.config.CoordinateUpdateBatchSize * c.srv.config.CoordinateUpdateMaxBatches
size := len(pending)
if size > limit {
c.srv.logger.Printf("[ERR] consul.coordinate: Discarded %d coordinate updates; increase SyncCoordinateInterval", size - limit)
c.srv.logger.Printf("[WARN] consul.coordinate: Discarded %d coordinate updates", size - limit)
size = limit
}

Loading…
Cancel
Save