diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index ba1212886..16bc0da14 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -544,7 +544,7 @@ func (t *QueueManager) updateShardsLoop() { select { case <-ticker.C: desiredShards := t.calculateDesiredShards() - if desiredShards == t.numShards { + if !t.shouldReshard(desiredShards) { continue } // Resharding can take some time, and we want this loop @@ -562,6 +562,22 @@ func (t *QueueManager) updateShardsLoop() { } } +// shouldReshard reports whether the queue should be resharded to desiredShards. +func (t *QueueManager) shouldReshard(desiredShards int) bool { + if desiredShards == t.numShards { + return false + } + // We shouldn't reshard if Prometheus hasn't been able to send to the + // remote endpoint successfully within the last 2*BatchSendDeadline. + minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix() + lsts := atomic.LoadInt64(&t.lastSendTimestamp) + if lsts < minSendTimestamp { + level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp) + return false + } + return true +} + // calculateDesiredShards returns the number of desired shards, which will be // the current QueueManager.numShards if resharding should not occur for reasons // outlined in this functions implementation. It is up to the caller to reshard, or not, @@ -591,15 +607,6 @@ func (t *QueueManager) calculateDesiredShards() int { return t.numShards } - // We shouldn't reshard if Prometheus hasn't been able to send to the - // remote endpoint successfully within some period of time. 
- minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix() - lsts := atomic.LoadInt64(&t.lastSendTimestamp) - if lsts < minSendTimestamp { - level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp) - return t.numShards - } - // When behind we will try to catch up on a proporation of samples per tick. // This works similarly to an integral accumulator in that pending samples // is the result of the error integral. diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 74a7fddbc..78c7fa3be 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -292,26 +292,27 @@ func TestReleaseNoninternedString(t *testing.T) { testutil.Assert(t, metric == 0, "expected there to be no calls to release for strings that were not already interned: %d", int(metric)) } -func TestCalculateDesiredsShards(t *testing.T) { +func TestShouldReshard(t *testing.T) { type testcase struct { - startingShards int - samplesIn, samplesOut int64 - reshard bool + startingShards int + samplesIn, samplesOut, lastSendTimestamp int64 + expectedToReshard bool } cases := []testcase{ { - // Test that ensures that if we haven't successfully sent a - // sample recently the queue will not reshard. - startingShards: 10, - reshard: false, - samplesIn: 1000, - samplesOut: 10, + // Resharding shouldn't take place if the last successful send was > batch send deadline*2 seconds ago. 
+ startingShards: 10, + samplesIn: 1000, + samplesOut: 10, + lastSendTimestamp: time.Now().Unix() - int64(3*time.Duration(config.DefaultQueueConfig.BatchSendDeadline)/time.Second), + expectedToReshard: false, }, { - startingShards: 5, - reshard: true, - samplesIn: 1000, - samplesOut: 10, + startingShards: 5, + samplesIn: 1000, + samplesOut: 10, + lastSendTimestamp: time.Now().Unix(), + expectedToReshard: true, }, } for _, c := range cases { @@ -321,20 +322,16 @@ func TestCalculateDesiredsShards(t *testing.T) { m.numShards = c.startingShards m.samplesIn.incr(c.samplesIn) m.samplesOut.incr(c.samplesOut) - m.lastSendTimestamp = time.Now().Unix() + m.lastSendTimestamp = c.lastSendTimestamp - // Resharding shouldn't take place if the last successful send was > batch send deadline*2 seconds ago. - if !c.reshard { - m.lastSendTimestamp = m.lastSendTimestamp - int64(3*time.Duration(config.DefaultQueueConfig.BatchSendDeadline)/time.Second) - } m.Start() + desiredShards := m.calculateDesiredShards() + shouldReshard := m.shouldReshard(desiredShards) + m.Stop() - if !c.reshard { - testutil.Assert(t, desiredShards == m.numShards, "expected calculateDesiredShards to not want to reshard, wants to change from %d to %d shards", m.numShards, desiredShards) - } else { - testutil.Assert(t, desiredShards != m.numShards, "expected calculateDesiredShards to want to reshard, wants to change from %d to %d shards", m.numShards, desiredShards) - } + + testutil.Equals(t, c.expectedToReshard, shouldReshard) } }