From c478d6477a6360f94caf34213376fe9199a470a7 Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Tue, 30 Nov 2021 12:21:53 +0000
Subject: [PATCH 1/3] remote-write: benchmark just sending, on 20 shards

Previously BenchmarkSampleDelivery spent a lot of effort checking each sample
had arrived, so was largely showing the performance of test-only code.

Increase the number of shards to be more realistic for a large workload.

Signed-off-by: Bryan Boreham
---
 storage/remote/queue_manager_test.go | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 65d59b028..1f12c39b1 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -682,7 +682,15 @@ func (c *TestBlockingWriteClient) Endpoint() string {
     return "http://test-remote-blocking.com/1234"
 }

-func BenchmarkSampleDelivery(b *testing.B) {
+// For benchmarking the send and not the receive side.
+type NopWriteClient struct{}
+
+func NewNopWriteClient() *NopWriteClient { return &NopWriteClient{} }
+func (c *NopWriteClient) Store(_ context.Context, req []byte) error { return nil }
+func (c *NopWriteClient) Name() string { return "nopwriteclient" }
+func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" }
+
+func BenchmarkSampleSend(b *testing.B) {
     // Send one sample per series, which is the typical remote_write case
     const numSamples = 1
     const numSeries = 10000
@@ -707,12 +715,13 @@
     }
     samples, series := createTimeseries(numSamples, numSeries, extraLabels...)

-    c := NewTestWriteClient()
+    c := NewNopWriteClient()

     cfg := config.DefaultQueueConfig
     mcfg := config.DefaultMetadataConfig
     cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
-    cfg.MaxShards = 1
+    cfg.MinShards = 20
+    cfg.MaxShards = 20

     dir := b.TempDir()

@@ -726,11 +735,9 @@

     b.ResetTimer()
     for i := 0; i < b.N; i++ {
-        c.expectDataCount(len(samples))
-        go m.Append(samples)
+        m.Append(samples)
         m.UpdateSeriesSegment(series, i+1) // simulate what wal.Watcher.garbageCollectSeries does
         m.SeriesReset(i + 1)
-        c.waitForExpectedDataCount()
     }
     // Do not include shutdown
     b.StopTimer()

From 50878ebe5eb09a929d6141e2c6661d2711529c9c Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Fri, 3 Dec 2021 14:30:42 +0000
Subject: [PATCH 2/3] remote-write: buffer struct instead of interface

This reduces the amount of individual objects allocated, allowing sends to run
a bit faster.

Signed-off-by: Bryan Boreham
---
 storage/remote/queue_manager.go | 96 +++++++++++++++++----------------
 1 file changed, 51 insertions(+), 45 deletions(-)

diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index dc2be46e9..361113fc6 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -507,7 +507,6 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
 // Append queues a sample to be sent to the remote storage. Blocks until all samples are
 // enqueued on their shards or a shutdown signal is received.
 func (t *QueueManager) Append(samples []record.RefSample) bool {
-    var appendSample prompb.Sample
 outer:
     for _, s := range samples {
         t.seriesMtx.Lock()
@@ -530,9 +529,12 @@ outer:
             return false
         default:
         }
-        appendSample.Value = s.V
-        appendSample.Timestamp = s.T
-        if t.shards.enqueue(s.Ref, writeSample{lbls, appendSample}) {
+        if t.shards.enqueue(s.Ref, sampleOrExemplar{
+            seriesLabels: lbls,
+            timestamp:    s.T,
+            value:        s.V,
+            isSample:     true,
+        }) {
             continue outer
         }

@@ -552,7 +554,6 @@ func (t *QueueManager) AppendExemplars(exemplars []record.RefExemplar) bool {
         return true
     }

-    var appendExemplar prompb.Exemplar
 outer:
     for _, e := range exemplars {
         t.seriesMtx.Lock()
@@ -576,10 +577,12 @@ outer:
             return false
         default:
         }
-        appendExemplar.Labels = labelsToLabelsProto(e.Labels, nil)
-        appendExemplar.Timestamp = e.T
-        appendExemplar.Value = e.V
-        if t.shards.enqueue(e.Ref, writeExemplar{lbls, appendExemplar}) {
+        if t.shards.enqueue(e.Ref, sampleOrExemplar{
+            seriesLabels:   lbls,
+            timestamp:      e.T,
+            value:          e.V,
+            exemplarLabels: e.Labels,
+        }) {
             continue outer
         }

@@ -901,16 +904,6 @@ func (t *QueueManager) newShards() *shards {
     return s
 }

-type writeSample struct {
-    seriesLabels labels.Labels
-    sample       prompb.Sample
-}
-
-type writeExemplar struct {
-    seriesLabels labels.Labels
-    exemplar     prompb.Exemplar
-}
-
 type shards struct {
     mtx sync.RWMutex // With the WAL, this is never actually contended.
@@ -999,7 +992,7 @@ func (s *shards) stop() {

 // enqueue data (sample or exemplar). If we are currently in the process of shutting down or resharding,
 // will return false; in this case, you should back off and retry.
-func (s *shards) enqueue(ref chunks.HeadSeriesRef, data interface{}) bool {
+func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool {
     s.mtx.RLock()
     defer s.mtx.RUnlock()

@@ -1018,43 +1011,49 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data interface{}) bool {
         if !appended {
             return false
         }
-        switch data.(type) {
-        case writeSample:
+        switch data.isSample {
+        case true:
             s.qm.metrics.pendingSamples.Inc()
             s.enqueuedSamples.Inc()
-        case writeExemplar:
+        case false:
             s.qm.metrics.pendingExemplars.Inc()
             s.enqueuedExemplars.Inc()
-        default:
-            level.Warn(s.qm.logger).Log("msg", "Invalid object type in shards enqueue")
         }
         return true
     }
 }

 type queue struct {
-    batch      []interface{}
-    batchQueue chan []interface{}
+    batch      []sampleOrExemplar
+    batchQueue chan []sampleOrExemplar

     // Since we know there are a limited number of batches out, using a stack
     // is easy and safe so a sync.Pool is not necessary.
-    batchPool [][]interface{}
+    batchPool [][]sampleOrExemplar
     // This mutex covers adding and removing batches from the batchPool.
     poolMux sync.Mutex
 }

+type sampleOrExemplar struct {
+    seriesLabels   labels.Labels
+    value          float64
+    timestamp      int64
+    exemplarLabels labels.Labels
+    isSample       bool
+}
+
 func newQueue(batchSize, capacity int) *queue {
     batches := capacity / batchSize
     return &queue{
-        batch:      make([]interface{}, 0, batchSize),
-        batchQueue: make(chan []interface{}, batches),
+        batch:      make([]sampleOrExemplar, 0, batchSize),
+        batchQueue: make(chan []sampleOrExemplar, batches),
         // batchPool should have capacity for everything in the channel + 1 for
         // the batch being processed.
-        batchPool:  make([][]interface{}, 0, batches+1),
+        batchPool:  make([][]sampleOrExemplar, 0, batches+1),
     }
 }

-func (q *queue) Append(datum interface{}, stop <-chan struct{}) bool {
+func (q *queue) Append(datum sampleOrExemplar, stop <-chan struct{}) bool {
     q.batch = append(q.batch, datum)
     if len(q.batch) == cap(q.batch) {
         select {
@@ -1070,20 +1069,20 @@ func (q *queue) Append(datum interface{}, stop <-chan struct{}) bool {
     return true
 }

-func (q *queue) Chan() <-chan []interface{} {
+func (q *queue) Chan() <-chan []sampleOrExemplar {
     return q.batchQueue
 }

 // Batch returns the current batch and allocates a new batch. Must not be
 // called concurrently with Append.
-func (q *queue) Batch() []interface{} {
+func (q *queue) Batch() []sampleOrExemplar {
     batch := q.batch
     q.batch = q.newBatch(cap(batch))
     return batch
 }

 // ReturnForReuse adds the batch buffer back to the internal pool.
-func (q *queue) ReturnForReuse(batch []interface{}) {
+func (q *queue) ReturnForReuse(batch []sampleOrExemplar) {
     q.poolMux.Lock()
     defer q.poolMux.Unlock()
     if len(q.batchPool) < cap(q.batchPool) {
@@ -1106,7 +1105,7 @@ func (q *queue) FlushAndShutdown(done <-chan struct{}) {
     close(q.batchQueue)
 }

-func (q *queue) newBatch(capacity int) []interface{} {
+func (q *queue) newBatch(capacity int) []sampleOrExemplar {
     q.poolMux.Lock()
     defer q.poolMux.Unlock()
     batches := len(q.batchPool)
@@ -1115,7 +1114,7 @@ func (q *queue) newBatch(capacity int) []interface{} {
         q.batchPool = q.batchPool[:batches-1]
         return batch
     }
-    return make([]interface{}, 0, capacity)
+    return make([]sampleOrExemplar, 0, capacity)
 }

 func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
@@ -1192,7 +1191,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
             // traffic instances.
             s.mtx.Lock()
             // First, we need to see if we can happen to get a batch from the queue if it filled while acquiring the lock.
-            var batch []interface{}
+            var batch []sampleOrExemplar
             select {
             case batch = <-batchQueue:
             default:
@@ -1211,9 +1210,9 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
     }
 }

-func (s *shards) populateTimeSeries(batch []interface{}, pendingData []prompb.TimeSeries) (int, int) {
+func (s *shards) populateTimeSeries(batch []sampleOrExemplar, pendingData []prompb.TimeSeries) (int, int) {
     var nPendingSamples, nPendingExemplars int
-    for nPending, sample := range batch {
+    for nPending, d := range batch {
         pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
         if s.qm.sendExemplars {
             pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
         }
         // Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
         // retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
         // stop reading from the queue. This makes it safe to reference pendingSamples by index.
-        switch d := sample.(type) {
-        case writeSample:
+        switch d.isSample {
+        case true:
             pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
-            pendingData[nPending].Samples = append(pendingData[nPending].Samples, d.sample)
+            pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
+                Value:     d.value,
+                Timestamp: d.timestamp,
+            })
             nPendingSamples++
-        case writeExemplar:
+        case false:
             pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
-            pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, d.exemplar)
+            pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{
+                Labels:    labelsToLabelsProto(d.exemplarLabels, nil),
+                Value:     d.value,
+                Timestamp: d.timestamp,
+            })
             nPendingExemplars++
         }
     }

From bd6436605d4bd886cb18cbfba5bb8e75da1264ec Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Thu, 9 Dec 2021 14:40:44 +0000
Subject: [PATCH 3/3] Review feedback

Signed-off-by: Bryan Boreham
---
 storage/remote/queue_manager.go      | 10 ++++------
 storage/remote/queue_manager_test.go | 16 ----------------
 2 files changed, 4 insertions(+), 22 deletions(-)

diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 361113fc6..49de2c8a3 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -1011,11 +1011,10 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool {
         if !appended {
             return false
         }
-        switch data.isSample {
-        case true:
+        if data.isSample {
             s.qm.metrics.pendingSamples.Inc()
             s.enqueuedSamples.Inc()
-        case false:
+        } else {
             s.qm.metrics.pendingExemplars.Inc()
             s.enqueuedExemplars.Inc()
         }
@@ -1220,15 +1219,14 @@ func (s *shards) populateTimeSeries(batch []sampleOrExemplar, pendingData []prom
         // Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
         // retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
         // stop reading from the queue. This makes it safe to reference pendingSamples by index.
-        switch d.isSample {
-        case true:
+        if d.isSample {
             pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
             pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
                 Value:     d.value,
                 Timestamp: d.timestamp,
             })
             nPendingSamples++
-        case false:
+        } else {
             pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
             pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{
                 Labels:    labelsToLabelsProto(d.exemplarLabels, nil),
                 Value:     d.value,
                 Timestamp: d.timestamp,
             })
             nPendingExemplars++
         }
     }
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 1f12c39b1..f55c4b472 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -578,22 +578,6 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
     }
 }

-func (c *TestWriteClient) expectDataCount(numSamples int) {
-    if !c.withWaitGroup {
-        return
-    }
-    c.mtx.Lock()
-    defer c.mtx.Unlock()
-    c.wg.Add(numSamples)
-}
-
-func (c *TestWriteClient) waitForExpectedDataCount() {
-    if !c.withWaitGroup {
-        return
-    }
-    c.wg.Wait()
-}
-
 func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
     c.mtx.Lock()
     defer c.mtx.Unlock()
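
The struct-for-interface change in PATCH 2/3 can be illustrated in isolation. Below is a standalone micro-benchmark sketch, not part of the patch series: the datum type is a hypothetical stand-in for sampleOrExemplar, and the numbers are only meant to show the mechanism. Boxing each element into an interface{} forces a separate heap allocation per datum, while a slice of a concrete struct stores its elements inline in the preallocated backing array. Save it as a file ending in _test.go and run it with "go test -bench=BenchmarkAppend -benchmem"; the patched code path itself can be exercised with "go test -bench=BenchmarkSampleSend -benchmem ./storage/remote/".

package batching

import "testing"

// datum is a hypothetical stand-in for the patch's sampleOrExemplar struct.
type datum struct {
    value     float64
    timestamp int64
    isSample  bool
}

const batchSize = 1000

// Appending into []interface{} boxes every datum, costing one heap allocation per element.
func BenchmarkAppendInterface(b *testing.B) {
    b.ReportAllocs()
    for i := 0; i < b.N; i++ {
        batch := make([]interface{}, 0, batchSize)
        for j := 0; j < batchSize; j++ {
            batch = append(batch, datum{value: 1, timestamp: int64(j), isSample: true})
        }
        _ = batch
    }
}

// Appending into []datum keeps every element inline; no per-element allocation is needed.
func BenchmarkAppendStruct(b *testing.B) {
    b.ReportAllocs()
    for i := 0; i < b.N; i++ {
        batch := make([]datum, 0, batchSize)
        for j := 0; j < batchSize; j++ {
            batch = append(batch, datum{value: 1, timestamp: int64(j), isSample: true})
        }
        _ = batch
    }
}

With b.ReportAllocs, the interface{} version reports roughly one allocation per appended element, which is the per-datum overhead the patch removes from the hot enqueue path.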