diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index dc2be46e9..361113fc6 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -507,7 +507,6 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p // Append queues a sample to be sent to the remote storage. Blocks until all samples are // enqueued on their shards or a shutdown signal is received. func (t *QueueManager) Append(samples []record.RefSample) bool { - var appendSample prompb.Sample outer: for _, s := range samples { t.seriesMtx.Lock() @@ -530,9 +529,12 @@ outer: return false default: } - appendSample.Value = s.V - appendSample.Timestamp = s.T - if t.shards.enqueue(s.Ref, writeSample{lbls, appendSample}) { + if t.shards.enqueue(s.Ref, sampleOrExemplar{ + seriesLabels: lbls, + timestamp: s.T, + value: s.V, + isSample: true, + }) { continue outer } @@ -552,7 +554,6 @@ func (t *QueueManager) AppendExemplars(exemplars []record.RefExemplar) bool { return true } - var appendExemplar prompb.Exemplar outer: for _, e := range exemplars { t.seriesMtx.Lock() @@ -576,10 +577,12 @@ outer: return false default: } - appendExemplar.Labels = labelsToLabelsProto(e.Labels, nil) - appendExemplar.Timestamp = e.T - appendExemplar.Value = e.V - if t.shards.enqueue(e.Ref, writeExemplar{lbls, appendExemplar}) { + if t.shards.enqueue(e.Ref, sampleOrExemplar{ + seriesLabels: lbls, + timestamp: e.T, + value: e.V, + exemplarLabels: e.Labels, + }) { continue outer } @@ -901,16 +904,6 @@ func (t *QueueManager) newShards() *shards { return s } -type writeSample struct { - seriesLabels labels.Labels - sample prompb.Sample -} - -type writeExemplar struct { - seriesLabels labels.Labels - exemplar prompb.Exemplar -} - type shards struct { mtx sync.RWMutex // With the WAL, this is never actually contended. @@ -999,7 +992,7 @@ func (s *shards) stop() { // enqueue data (sample or exemplar). If we are currently in the process of shutting down or resharding, // will return false; in this case, you should back off and retry. -func (s *shards) enqueue(ref chunks.HeadSeriesRef, data interface{}) bool { +func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool { s.mtx.RLock() defer s.mtx.RUnlock() @@ -1018,43 +1011,49 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data interface{}) bool { if !appended { return false } - switch data.(type) { - case writeSample: + switch data.isSample { + case true: s.qm.metrics.pendingSamples.Inc() s.enqueuedSamples.Inc() - case writeExemplar: + case false: s.qm.metrics.pendingExemplars.Inc() s.enqueuedExemplars.Inc() - default: - level.Warn(s.qm.logger).Log("msg", "Invalid object type in shards enqueue") } return true } } type queue struct { - batch []interface{} - batchQueue chan []interface{} + batch []sampleOrExemplar + batchQueue chan []sampleOrExemplar // Since we know there are a limited number of batches out, using a stack // is easy and safe so a sync.Pool is not necessary. - batchPool [][]interface{} + batchPool [][]sampleOrExemplar // This mutex covers adding and removing batches from the batchPool. poolMux sync.Mutex } +type sampleOrExemplar struct { + seriesLabels labels.Labels + value float64 + timestamp int64 + exemplarLabels labels.Labels + isSample bool +} + func newQueue(batchSize, capacity int) *queue { batches := capacity / batchSize return &queue{ - batch: make([]interface{}, 0, batchSize), - batchQueue: make(chan []interface{}, batches), + batch: make([]sampleOrExemplar, 0, batchSize), + batchQueue: make(chan []sampleOrExemplar, batches), // batchPool should have capacity for everything in the channel + 1 for // the batch being processed. - batchPool: make([][]interface{}, 0, batches+1), + batchPool: make([][]sampleOrExemplar, 0, batches+1), } } -func (q *queue) Append(datum interface{}, stop <-chan struct{}) bool { +func (q *queue) Append(datum sampleOrExemplar, stop <-chan struct{}) bool { q.batch = append(q.batch, datum) if len(q.batch) == cap(q.batch) { select { @@ -1070,20 +1069,20 @@ func (q *queue) Append(datum interface{}, stop <-chan struct{}) bool { return true } -func (q *queue) Chan() <-chan []interface{} { +func (q *queue) Chan() <-chan []sampleOrExemplar { return q.batchQueue } // Batch returns the current batch and allocates a new batch. Must not be // called concurrently with Append. -func (q *queue) Batch() []interface{} { +func (q *queue) Batch() []sampleOrExemplar { batch := q.batch q.batch = q.newBatch(cap(batch)) return batch } // ReturnForReuse adds the batch buffer back to the internal pool. -func (q *queue) ReturnForReuse(batch []interface{}) { +func (q *queue) ReturnForReuse(batch []sampleOrExemplar) { q.poolMux.Lock() defer q.poolMux.Unlock() if len(q.batchPool) < cap(q.batchPool) { @@ -1106,7 +1105,7 @@ func (q *queue) FlushAndShutdown(done <-chan struct{}) { close(q.batchQueue) } -func (q *queue) newBatch(capacity int) []interface{} { +func (q *queue) newBatch(capacity int) []sampleOrExemplar { q.poolMux.Lock() defer q.poolMux.Unlock() batches := len(q.batchPool) @@ -1115,7 +1114,7 @@ func (q *queue) newBatch(capacity int) []interface{} { q.batchPool = q.batchPool[:batches-1] return batch } - return make([]interface{}, 0, capacity) + return make([]sampleOrExemplar, 0, capacity) } func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { @@ -1192,7 +1191,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { // traffic instances. s.mtx.Lock() // First, we need to see if we can happen to get a batch from the queue if it filled while acquiring the lock. - var batch []interface{} + var batch []sampleOrExemplar select { case batch = <-batchQueue: default: @@ -1211,9 +1210,9 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { } } -func (s *shards) populateTimeSeries(batch []interface{}, pendingData []prompb.TimeSeries) (int, int) { +func (s *shards) populateTimeSeries(batch []sampleOrExemplar, pendingData []prompb.TimeSeries) (int, int) { var nPendingSamples, nPendingExemplars int - for nPending, sample := range batch { + for nPending, d := range batch { pendingData[nPending].Samples = pendingData[nPending].Samples[:0] if s.qm.sendExemplars { pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0] @@ -1221,14 +1220,21 @@ func (s *shards) populateTimeSeries(batch []interface{}, pendingData []prompb.Ti // Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff) // retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll // stop reading from the queue. This makes it safe to reference pendingSamples by index. - switch d := sample.(type) { - case writeSample: + switch d.isSample { + case true: pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) - pendingData[nPending].Samples = append(pendingData[nPending].Samples, d.sample) + pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{ + Value: d.value, + Timestamp: d.timestamp, + }) nPendingSamples++ - case writeExemplar: + case false: pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) - pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, d.exemplar) + pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{ + Labels: labelsToLabelsProto(d.exemplarLabels, nil), + Value: d.value, + Timestamp: d.timestamp, + }) nPendingExemplars++ } }