diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index e0b056e5b..284b36b18 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -908,6 +908,189 @@ func TestCalculateDesiredShards(t *testing.T) { require.Equal(t, int64(0), pendingSamples, "Remote write never caught up, there are still %d pending samples.", pendingSamples) } +func TestCalculateDesiredShardsDetail(t *testing.T) { + c := NewTestWriteClient() + cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig + + dir, err := ioutil.TempDir("", "TestCalculateDesiredShards") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(dir)) + }() + + metrics := newQueueManagerMetrics(nil, "", "") + samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) + m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) + + for _, tc := range []struct { + name string + prevShards int + dataIn int64 // Quantities normalised to seconds. + dataOut int64 + dataDropped int64 + dataOutDuration float64 + backlog float64 + expectedShards int + }{ + { + name: "nothing in or out 1", + prevShards: 1, + expectedShards: 1, // Shards stays the same. + }, + { + name: "nothing in or out 10", + prevShards: 10, + expectedShards: 10, // Shards stays the same. + }, + { + name: "steady throughput", + prevShards: 1, + dataIn: 10, + dataOut: 10, + dataOutDuration: 1, + expectedShards: 1, + }, + { + name: "scale down", + prevShards: 10, + dataIn: 10, + dataOut: 10, + dataOutDuration: 5, + expectedShards: 5, + }, + { + name: "scale down constrained", + prevShards: 7, + dataIn: 10, + dataOut: 10, + dataOutDuration: 5, + expectedShards: 7, + }, + { + name: "scale up", + prevShards: 1, + dataIn: 10, + dataOut: 10, + dataOutDuration: 10, + expectedShards: 10, + }, + { + name: "scale up constrained", + prevShards: 8, + dataIn: 10, + dataOut: 10, + dataOutDuration: 10, + expectedShards: 8, + }, + { + name: "backlogged 20s", + prevShards: 2, + dataIn: 10, + dataOut: 10, + dataOutDuration: 2, + backlog: 20, + expectedShards: 2, // ? should be trying to catch up + }, + { + name: "backlogged 90s", + prevShards: 2, + dataIn: 10, + dataOut: 10, + dataOutDuration: 2, + backlog: 90, + expectedShards: 4, // ?! should be trying to go much faster + }, + { + name: "backlog reduced", + prevShards: 4, + dataIn: 10, + dataOut: 20, + dataOutDuration: 4, + backlog: 10, + expectedShards: 3, // ?! shouldn't downshard from 4 to 3: less than 30% change + }, + { + name: "backlog eliminated", + prevShards: 3, + dataIn: 10, + dataOut: 10, + dataOutDuration: 2, + backlog: 0, + expectedShards: 2, // Shard back down. + }, + { + name: "slight slowdown", + prevShards: 1, + dataIn: 10, + dataOut: 10, + dataOutDuration: 1.2, + expectedShards: 1, // no reaction - less than 30% change + }, + { + name: "bigger slowdown", + prevShards: 1, + dataIn: 10, + dataOut: 10, + dataOutDuration: 1.4, + expectedShards: 2, // now it shards up + }, + { + name: "speed up", + prevShards: 2, + dataIn: 10, + dataOut: 10, + dataOutDuration: 1.2, + backlog: 0, + expectedShards: 2, // No reaction - 1.2 is rounded up to 2. + }, + { + name: "speed up more", + prevShards: 2, + dataIn: 10, + dataOut: 10, + dataOutDuration: 0.9, + backlog: 0, + expectedShards: 1, + }, + { + name: "marginal decision A", + prevShards: 3, + dataIn: 10, + dataOut: 10, + dataOutDuration: 2.01, + backlog: 0, + expectedShards: 3, // 2.01 rounds up to 3. + }, + { + name: "marginal decision B", + prevShards: 3, + dataIn: 10, + dataOut: 10, + dataOutDuration: 1.99, + backlog: 0, + expectedShards: 2, // 1.99 rounds up to 2. + }, + } { + t.Run(tc.name, func(t *testing.T) { + m.numShards = tc.prevShards + forceEMWA(samplesIn, tc.dataIn*int64(shardUpdateDuration/time.Second)) + samplesIn.tick() + forceEMWA(m.dataOut, tc.dataOut*int64(shardUpdateDuration/time.Second)) + forceEMWA(m.dataDropped, tc.dataDropped*int64(shardUpdateDuration/time.Second)) + forceEMWA(m.dataOutDuration, int64(tc.dataOutDuration*float64(shardUpdateDuration))) + m.highestRecvTimestamp.value = tc.backlog // Not Set() because it can only increase value. + + require.Equal(t, tc.expectedShards, m.calculateDesiredShards()) + }) + } +} + +func forceEMWA(r *ewmaRate, rate int64) { + r.init = false + r.newEvents.Store(rate) +} + func TestQueueManagerMetrics(t *testing.T) { reg := prometheus.NewPedanticRegistry() metrics := newQueueManagerMetrics(reg, "name", "http://localhost:1234")