
Use samplesPending rather than integral

The integral accumulator in the remote write sharding code is just a
second way of keeping track of the number of samples pending. Remove
integralAccumulator and use the samplesPending value we already
compute to calculate the number of shards.

This has the added benefit of fixing a bug where the integralAccumulator
was not being initialized correctly: the initialization divided by the
elapsed time in seconds rather than by the number of shard-update ticks,
so the initial value of integralAccumulator could be off by an order of
magnitude in some cases.
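
To make the size of that error concrete, here is a minimal standalone
sketch (illustrative only; the 10 second tick and all numbers are
assumptions, not values from this repository):

package main

import "fmt"

func main() {
	const (
		integralGain   = 0.2
		samplesPending = 100000.0
		elapsedSeconds = 60.0
		tickSeconds    = 10.0 // assumed shardUpdateDuration of 10s
	)
	// The old code divided the pending samples by elapsed wall-clock seconds...
	buggyInit := integralGain * samplesPending / elapsedSeconds
	// ...but the accumulator only grew once per shard-update tick, so a
	// tick-consistent initial value divides by the number of elapsed ticks.
	consistentInit := integralGain * samplesPending / (elapsedSeconds / tickSeconds)
	fmt.Println(buggyInit, consistentInit) // 333.33... vs 3333.33...: 10x apart
}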

Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
pull/6511/head
Chris Marchbanks
parent commit 9e24e1f9e8
  storage/remote/queue_manager.go (25 changed lines)
  storage/remote/queue_manager_test.go (1 changed line)

storage/remote/queue_manager.go

@@ -214,8 +214,6 @@ type QueueManager struct {
 	wg sync.WaitGroup

 	samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate
-	integralAccumulator float64
-	startedAt time.Time

 	highestSentTimestampMetric *maxGauge
 	pendingSamplesMetric prometheus.Gauge
@@ -316,8 +314,6 @@ outer:
 // Start the queue manager sending samples to the remote storage.
 // Does not block.
 func (t *QueueManager) Start() {
-	t.startedAt = time.Now()
-
 	// Setup the QueueManager's metrics. We do this here rather than in the
 	// constructor because of the ordering of creating Queue Managers, stopping them,
 	// and then starting new ones in storage/remote/storage.go ApplyConfig.
@@ -537,19 +533,6 @@ func (t *QueueManager) calculateDesiredShards() int {
 		return t.numShards
 	}

-	// We use an integral accumulator, like in a PID, to help dampen
-	// oscillation. The accumulator will correct for any errors not accounted
-	// for in the desired shard calculation by adjusting for pending samples.
-	const integralGain = 0.2
-
-	// Initialise the integral accumulator as the average rate of samples
-	// pending. This accounts for pending samples that were created while the
-	// WALWatcher starts up.
-	if t.integralAccumulator == 0 {
-		elapsed := time.Since(t.startedAt) / time.Second
-		t.integralAccumulator = integralGain * samplesPending / float64(elapsed)
-	}
-	t.integralAccumulator += samplesPendingRate * integralGain
 	// We shouldn't reshard if Prometheus hasn't been able to send to the
 	// remote endpoint successfully within some period of time.
 	minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix()
@@ -559,9 +542,14 @@
 		return t.numShards
 	}

+	// When behind we will try to catch up on a proportion of samples per tick.
+	// This works similarly to an integral accumulator in that the number of
+	// pending samples is the result of the error integral.
+	const integralGain = 0.1 / float64(shardUpdateDuration/time.Second)
+
 	var (
 		timePerSample = samplesOutDuration / samplesOutRate
-		desiredShards = timePerSample * (samplesInRate + t.integralAccumulator)
+		desiredShards = timePerSample * (samplesInRate*samplesKeptRatio + integralGain*samplesPending)
 	)
 	t.desiredNumShards.Set(desiredShards)
 	level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards",
@@ -575,7 +563,6 @@ func (t *QueueManager) calculateDesiredShards() int {
 		"desiredShards", desiredShards,
 		"highestSent", highestSent,
 		"highestRecv", highestRecv,
-		"integralAccumulator", t.integralAccumulator,
 	)

 	// Changes in the number of shards must be greater than shardToleranceFraction.

storage/remote/queue_manager_test.go

@@ -624,7 +624,6 @@ func TestCalculateDesiredShards(t *testing.T) {
 	}

 	ts := time.Duration(0)
-	m.startedAt = startedAt
 	for ; ts < 120*time.Second; ts += shardUpdateDuration {
 		addSamples(inputRate*int64(shardUpdateDuration/time.Second), ts)
 		m.numShards = m.calculateDesiredShards()
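
For reference, here is a minimal self-contained sketch of the new shard
calculation (a hypothetical standalone version: the names mirror the
variables in calculateDesiredShards, the 10 second tick and all inputs
are made-up assumptions, and the real code feeds EWMA rates into these
terms):

package main

import "fmt"

// Assumed default shardUpdateDuration of 10s.
const shardUpdateSeconds = 10.0

func desiredShards(samplesInRate, samplesKeptRatio, samplesOutRate, samplesOutDuration, samplesPending float64) float64 {
	// Catch up on a fixed proportion of pending samples each tick; this
	// replaces the old integral accumulator.
	integralGain := 0.1 / shardUpdateSeconds
	timePerSample := samplesOutDuration / samplesOutRate
	return timePerSample * (samplesInRate*samplesKeptRatio + integralGain*samplesPending)
}

func main() {
	// 80k samples/s ingested, none dropped, 1ms of send time per sample,
	// and a 50k sample backlog: roughly 80.5 shards desired.
	fmt.Println(desiredShards(80000, 1.0, 80000, 80, 50000))
}

The backlog term adds integralGain*samplesPending extra samples/s of
desired throughput, so pending samples drain gradually over many ticks
instead of forcing a large, oscillation-prone jump in the shard count.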
