@ -1182,8 +1182,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
queue . ReturnForReuse ( batch )
n := nPendingSamples + nPendingExemplars
s . sendSamples ( ctx , pendingData [ : n ] , nPendingSamples , nPendingExemplars , pBuf , & buf )
s . qm . metrics . pendingSamples . Sub ( float64 ( nPendingSamples ) )
s . qm . metrics . pendingExemplars . Sub ( float64 ( nPendingExemplars ) )
stop ( )
timer . Reset ( time . Duration ( s . qm . cfg . BatchSendDeadline ) )
@ -1206,8 +1204,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
n := nPendingSamples + nPendingExemplars
level . Debug ( s . qm . logger ) . Log ( "msg" , "runShard timer ticked, sending buffered data" , "samples" , nPendingSamples , "exemplars" , nPendingExemplars , "shard" , shardNum )
s . sendSamples ( ctx , pendingData [ : n ] , nPendingSamples , nPendingExemplars , pBuf , & buf )
s . qm . metrics . pendingSamples . Sub ( float64 ( nPendingSamples ) )
s . qm . metrics . pendingExemplars . Sub ( float64 ( nPendingExemplars ) )
}
queue . ReturnForReuse ( batch )
timer . Reset ( time . Duration ( s . qm . cfg . BatchSendDeadline ) )
@ -1253,6 +1249,12 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
s . qm . dataOut . incr ( int64 ( len ( samples ) ) )
s . qm . dataOutDuration . incr ( int64 ( time . Since ( begin ) ) )
s . qm . lastSendTimestamp . Store ( time . Now ( ) . Unix ( ) )
// Pending samples/exemplars also should be subtracted as an error means
// they will not be retried.
s . qm . metrics . pendingSamples . Sub ( float64 ( sampleCount ) )
s . qm . metrics . pendingExemplars . Sub ( float64 ( exemplarCount ) )
s . enqueuedSamples . Sub ( int64 ( sampleCount ) )
s . enqueuedExemplars . Sub ( int64 ( exemplarCount ) )
}
// sendSamples to the remote storage with backoff for recoverable errors.