From c6556841422571a72de9496485f92cf2eb803842 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Tue, 30 Nov 2021 10:53:04 -0700 Subject: [PATCH] Subtract from enqueued samples/exemplars upon send Right now the values of enqueuedSamples and enqueuedExemplars are never decremented, leading to inflated values for failedSamples/failedExemplars when a hard shutdown of a shard occurs. Signed-off-by: Chris Marchbanks --- storage/remote/queue_manager.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 3ed27ccec..dc2be46e9 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -1182,8 +1182,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { queue.ReturnForReuse(batch) n := nPendingSamples + nPendingExemplars s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, pBuf, &buf) - s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples)) - s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars)) stop() timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) @@ -1206,8 +1204,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { n := nPendingSamples + nPendingExemplars level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum) s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, pBuf, &buf) - s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples)) - s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars)) } queue.ReturnForReuse(batch) timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) @@ -1253,6 +1249,12 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s s.qm.dataOut.incr(int64(len(samples))) s.qm.dataOutDuration.incr(int64(time.Since(begin))) s.qm.lastSendTimestamp.Store(time.Now().Unix()) + // Pending 
samples/exemplars also should be subtracted as an error means + // they will not be retried. + s.qm.metrics.pendingSamples.Sub(float64(sampleCount)) + s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount)) + s.enqueuedSamples.Sub(int64(sampleCount)) + s.enqueuedExemplars.Sub(int64(exemplarCount)) } // sendSamples to the remote storage with backoff for recoverable errors.