Review feedback

pull/2494/head
Tom Wilkie 8 years ago
parent 77cce900b8
commit 75bb0f3253

@ -33,24 +33,27 @@ const (
subsystem = "remote_storage" subsystem = "remote_storage"
queue = "queue" queue = "queue"
// With a maximum of 500 shards, assuming an average of 100ms remote write // With a maximum of 1000 shards, assuming an average of 100ms remote write
// time and 100 samples per batch, we will be able to push 500k samples/s. // time and 100 samples per batch, we will be able to push 1M samples/s.
defaultMaxShards = 500 defaultMaxShards = 1000
defaultMaxSamplesPerSend = 100 defaultMaxSamplesPerSend = 100
// defaultQueueCapacity is per shard - at 500 shards, this will buffer // defaultQueueCapacity is per shard - at 1000 shards, this will buffer
// 50m samples. It is configured to buffer 1024 batches, which at 100ms // 100M samples. It is configured to buffer 1000 batches, which at 100ms
// per batch is 1:40mins. // per batch is 1:40mins.
defaultQueueCapacity = defaultMaxSamplesPerSend * 1024 defaultQueueCapacity = defaultMaxSamplesPerSend * 1000
defaultBatchSendDeadline = 5 * time.Second defaultBatchSendDeadline = 5 * time.Second
// We track samples in/out and how long pushes take using an Exponentially // We track samples in/out and how long pushes take using an Exponentially
// Weighted Moving Average. // Weighted Moving Average.
ewmaWeight = 0.2 ewmaWeight = 0.2
shardUpdateDuration = 10 * time.Second shardUpdateDuration = 10 * time.Second
shardToleranceFraction = 0.3 // allow 30% too many shards before scaling down
logRateLimit = 0.1 // Limit to 1 log event every 10s // Allow 30% too many shards before scaling down.
shardToleranceFraction = 0.3
// Limit to 1 log event every 10s
logRateLimit = 0.1
logBurst = 10 logBurst = 10
) )
@ -114,7 +117,7 @@ var (
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "shards_total", Name: "shards",
Help: "The number of shards used for parallel sending to the remote storage.", Help: "The number of shards used for parallel sending to the remote storage.",
}, },
[]string{queue}, []string{queue},
@ -196,14 +199,10 @@ func NewQueueManager(cfg QueueManagerConfig) *QueueManager {
samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration), samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration),
samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration), samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
} }
t.shards = t.newShards(1) t.shards = t.newShards(t.numShards)
numShards.WithLabelValues(t.queueName).Set(float64(1)) numShards.WithLabelValues(t.queueName).Set(float64(t.numShards))
queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity)) queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity))
t.wg.Add(2)
go t.updateShardsLoop()
go t.reshardLoop()
return t return t
} }
@ -253,6 +252,10 @@ func (*QueueManager) NeedsThrottling() bool {
// Start the queue manager sending samples to the remote storage. // Start the queue manager sending samples to the remote storage.
// Does not block. // Does not block.
func (t *QueueManager) Start() { func (t *QueueManager) Start() {
t.wg.Add(2)
go t.updateShardsLoop()
go t.reshardLoop()
t.shardsMtx.Lock() t.shardsMtx.Lock()
defer t.shardsMtx.Unlock() defer t.shardsMtx.Unlock()
t.shards.start() t.shards.start()
@ -264,6 +267,7 @@ func (t *QueueManager) Stop() {
log.Infof("Stopping remote storage...") log.Infof("Stopping remote storage...")
close(t.quit) close(t.quit)
t.wg.Wait() t.wg.Wait()
t.shardsMtx.Lock() t.shardsMtx.Lock()
defer t.shardsMtx.Unlock() defer t.shardsMtx.Unlock()
t.shards.stop() t.shards.stop()
@ -277,14 +281,14 @@ func (t *QueueManager) updateShardsLoop() {
for { for {
select { select {
case <-ticker: case <-ticker:
t.caclulateDesiredShards() t.calculateDesiredShards()
case <-t.quit: case <-t.quit:
return return
} }
} }
} }
func (t *QueueManager) caclulateDesiredShards() { func (t *QueueManager) calculateDesiredShards() {
t.samplesIn.tick() t.samplesIn.tick()
t.samplesOut.tick() t.samplesOut.tick()
t.samplesOutDuration.tick() t.samplesOutDuration.tick()
@ -292,7 +296,7 @@ func (t *QueueManager) caclulateDesiredShards() {
// We use the number of incoming samples as a prediction of how much work we // We use the number of incoming samples as a prediction of how much work we
// will need to do next iteration. We add to this any pending samples // will need to do next iteration. We add to this any pending samples
// (received - send) so we can catch up with any backlog. We use the average // (received - send) so we can catch up with any backlog. We use the average
// outgoing batch latency to work out how how many shards we need. // outgoing batch latency to work out how many shards we need.
var ( var (
samplesIn = t.samplesIn.rate() samplesIn = t.samplesIn.rate()
samplesOut = t.samplesOut.rate() samplesOut = t.samplesOut.rate()
@ -314,7 +318,7 @@ func (t *QueueManager) caclulateDesiredShards() {
log.Debugf("QueueManager.caclulateDesiredShards samplesIn=%f, samplesOut=%f, samplesPending=%f, desiredShards=%f", log.Debugf("QueueManager.caclulateDesiredShards samplesIn=%f, samplesOut=%f, samplesPending=%f, desiredShards=%f",
samplesIn, samplesOut, samplesPending, desiredShards) samplesIn, samplesOut, samplesPending, desiredShards)
// Changes in the number of shards must be greated than shardToleranceFraction. // Changes in the number of shards must be greater than shardToleranceFraction.
var ( var (
lowerBound = float64(t.numShards) * (1. - shardToleranceFraction) lowerBound = float64(t.numShards) * (1. - shardToleranceFraction)
upperBound = float64(t.numShards) * (1. + shardToleranceFraction) upperBound = float64(t.numShards) * (1. + shardToleranceFraction)

Loading…
Cancel
Save