diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index e4a2b5399..d7981a093 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -831,14 +831,12 @@ func (t *QueueManager) calculateDesiredShards() int { return t.numShards } - // When behind we will try to catch up on a proporation of samples per tick. - // This works similarly to an integral accumulator in that pending samples - // is the result of the error integral. - const integralGain = 0.1 / float64(shardUpdateDuration/time.Second) - var ( + // When behind we will try to catch up on 5% of samples per second. + backlogCatchup = 0.05 * dataPending + // Calculate Time to send one sample, averaged across all sends done this tick. timePerSample = dataOutDuration / dataOutRate - desiredShards = timePerSample * (dataInRate*dataKeptRatio + integralGain*dataPending) + desiredShards = timePerSample * (dataInRate*dataKeptRatio + backlogCatchup) ) t.metrics.desiredNumShards.Set(desiredShards) level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards", diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 284b36b18..56498a0f0 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -990,25 +990,25 @@ func TestCalculateDesiredShardsDetail(t *testing.T) { dataOut: 10, dataOutDuration: 2, backlog: 20, - expectedShards: 2, // ? should be trying to catch up + expectedShards: 4, }, { name: "backlogged 90s", - prevShards: 2, + prevShards: 4, dataIn: 10, dataOut: 10, - dataOutDuration: 2, + dataOutDuration: 4, backlog: 90, - expectedShards: 4, // ?! should be trying to go much faster + expectedShards: 22, }, { name: "backlog reduced", - prevShards: 4, + prevShards: 22, dataIn: 10, dataOut: 20, dataOutDuration: 4, backlog: 10, - expectedShards: 3, // ?! shouldn't downshard from 4 to 3: less than 30% change + expectedShards: 3, }, { name: "backlog eliminated",