diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 8d426cace..a0e4b5b41 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -186,6 +186,9 @@ type StorageClient interface {
 // indicated by the provided StorageClient. Implements writeTo interface
 // used by WAL Watcher.
 type QueueManager struct {
+	// https://golang.org/pkg/sync/atomic/#pkg-note-BUG
+	lastSendTimestamp int64
+
 	logger         log.Logger
 	flushDeadline  time.Duration
 	cfg            config.QueueConfig
@@ -479,14 +482,30 @@ func (t *QueueManager) updateShardsLoop() {
 	for {
 		select {
 		case <-ticker.C:
-			t.calculateDesiredShards()
+			desiredShards := t.calculateDesiredShards()
+			if desiredShards == t.numShards {
+				continue
+			}
+			// Resharding can take some time, and we want this loop
+			// to stay close to shardUpdateDuration.
+			select {
+			case t.reshardChan <- desiredShards:
+				level.Info(t.logger).Log("msg", "Remote storage resharding", "from", t.numShards, "to", desiredShards)
+				t.numShards = desiredShards
+			default:
+				level.Info(t.logger).Log("msg", "Currently resharding, skipping.")
+			}
 		case <-t.quit:
 			return
 		}
 	}
 }
 
-func (t *QueueManager) calculateDesiredShards() {
+// calculateDesiredShards returns the number of desired shards, which will be
+// the current QueueManager.numShards if resharding should not occur for reasons
+// outlined in this function's implementation. It is up to the caller to reshard, or not,
+// based on the return value.
+func (t *QueueManager) calculateDesiredShards() int {
 	t.samplesOut.tick()
 	t.samplesDropped.tick()
 	t.samplesOutDuration.tick()
@@ -507,7 +526,7 @@ func (t *QueueManager) calculateDesiredShards() {
 	)
 
 	if samplesOutRate <= 0 {
-		return
+		return t.numShards
 	}
 
 	// We use an integral accumulator, like in a PID, to help dampen
@@ -523,6 +542,15 @@ func (t *QueueManager) calculateDesiredShards() {
 	}
 	t.integralAccumulator += samplesPendingRate * integralGain
 
+	// We shouldn't reshard if Prometheus hasn't been able to send to the
+	// remote endpoint successfully within some period of time.
+	minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix()
+	lsts := atomic.LoadInt64(&t.lastSendTimestamp)
+	if lsts < minSendTimestamp {
+		level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp)
+		return t.numShards
+	}
+
 	var (
 		timePerSample = samplesOutDuration / samplesOutRate
 		desiredShards = timePerSample * (samplesInRate + t.integralAccumulator)
@@ -549,7 +577,7 @@ func (t *QueueManager) calculateDesiredShards() {
 	level.Debug(t.logger).Log("msg", "QueueManager.updateShardsLoop",
 		"lowerBound", lowerBound, "desiredShards", desiredShards, "upperBound", upperBound)
 	if lowerBound <= desiredShards && desiredShards <= upperBound {
-		return
+		return t.numShards
 	}
 
 	numShards := int(math.Ceil(desiredShards))
@@ -559,19 +587,7 @@ func (t *QueueManager) calculateDesiredShards() {
 	} else if numShards < t.cfg.MinShards {
 		numShards = t.cfg.MinShards
 	}
-	if numShards == t.numShards {
-		return
-	}
-
-	// Resharding can take some time, and we want this loop
-	// to stay close to shardUpdateDuration.
-	select {
-	case t.reshardChan <- numShards:
-		level.Info(t.logger).Log("msg", "Remote storage resharding", "from", t.numShards, "to", numShards)
-		t.numShards = numShards
-	default:
-		level.Info(t.logger).Log("msg", "Currently resharding, skipping.")
-	}
+	return numShards
 }
 
 func (t *QueueManager) reshardLoop() {
@@ -813,6 +829,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
 		if err == nil {
 			s.qm.succeededSamplesTotal.Add(float64(len(samples)))
 			s.qm.highestSentTimestampMetric.Set(float64(highest / 1000))
+			atomic.StoreInt64(&s.qm.lastSendTimestamp, time.Now().Unix())
 			return nil
 		}
 
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 229bcacaf..eef8082e4 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -281,6 +281,51 @@ func TestReleaseNoninternedString(t *testing.T) {
 	testutil.Assert(t, metric == 0, "expected there to be no calls to release for strings that were not already interned: %d", int(metric))
 }
 
+func TestCalculateDesiredShards(t *testing.T) {
+	type testcase struct {
+		startingShards        int
+		samplesIn, samplesOut int64
+		reshard               bool
+	}
+	cases := []testcase{
+		{
+			// Test that ensures that if we haven't successfully sent a
+			// sample recently the queue will not reshard.
+			startingShards: 10,
+			reshard:        false,
+			samplesIn:      1000,
+			samplesOut:     10,
+		},
+		{
+			startingShards: 5,
+			reshard:        true,
+			samplesIn:      1000,
+			samplesOut:     10,
+		},
+	}
+	for _, c := range cases {
+		client := NewTestStorageClient()
+		m := NewQueueManager(nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline)
+		m.numShards = c.startingShards
+		m.samplesIn.incr(c.samplesIn)
+		m.samplesOut.incr(c.samplesOut)
+		m.lastSendTimestamp = time.Now().Unix()
+
+		// Resharding shouldn't take place if the last successful send was > batch send deadline*2 seconds ago.
+		if !c.reshard {
+			m.lastSendTimestamp = m.lastSendTimestamp - int64(3*time.Duration(config.DefaultQueueConfig.BatchSendDeadline)/time.Second)
+		}
+		m.Start()
+		desiredShards := m.calculateDesiredShards()
+		m.Stop()
+		if !c.reshard {
+			testutil.Assert(t, desiredShards == m.numShards, "expected calculateDesiredShards to not want to reshard, wants to change from %d to %d shards", m.numShards, desiredShards)
+		} else {
+			testutil.Assert(t, desiredShards != m.numShards, "expected calculateDesiredShards to want to reshard, wants to change from %d to %d shards", m.numShards, desiredShards)
+		}
+	}
+}
+
 func createTimeseries(n int) ([]record.RefSample, []record.RefSeries) {
 	samples := make([]record.RefSample, 0, n)
 	series := make([]record.RefSeries, 0, n)