Merge pull request #9934 from bboreham/remote-write-struct

remote-write: buffer struct instead of interface to reduce garbage-collection
Chris Marchbanks authored 3 years ago · committed by GitHub
commit 0a8d28ea93
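
Background for the change: a []interface{} buffer boxes every enqueued sample as a separate heap object, while a slice of a concrete struct stores its elements inline in one backing array, which is what reduces garbage-collector work here. A minimal, self-contained sketch of the difference (illustrative only, not code from this PR; the sample type below is a stand-in for sampleOrExemplar):

package main

import "fmt"

// Stand-in element type; larger than a machine word, like sampleOrExemplar.
type sample struct {
	value     float64
	timestamp int64
}

func main() {
	const n = 1000

	// Concrete struct slice: one contiguous allocation, elements stored inline.
	structBatch := make([]sample, 0, n)
	for i := 0; i < n; i++ {
		structBatch = append(structBatch, sample{value: float64(i), timestamp: int64(i)})
	}

	// interface{} slice: each append typically boxes the struct onto the heap,
	// creating n small objects the garbage collector must later trace and free.
	ifaceBatch := make([]interface{}, 0, n)
	for i := 0; i < n; i++ {
		ifaceBatch = append(ifaceBatch, sample{value: float64(i), timestamp: int64(i)})
	}

	fmt.Println(len(structBatch), len(ifaceBatch))
}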

storage/remote/queue_manager.go

@@ -507,7 +507,6 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
 // Append queues a sample to be sent to the remote storage. Blocks until all samples are
 // enqueued on their shards or a shutdown signal is received.
 func (t *QueueManager) Append(samples []record.RefSample) bool {
-	var appendSample prompb.Sample
 outer:
 	for _, s := range samples {
 		t.seriesMtx.Lock()
@@ -530,9 +529,12 @@ outer:
 				return false
 			default:
 			}
-			appendSample.Value = s.V
-			appendSample.Timestamp = s.T
-			if t.shards.enqueue(s.Ref, writeSample{lbls, appendSample}) {
+			if t.shards.enqueue(s.Ref, sampleOrExemplar{
+				seriesLabels: lbls,
+				timestamp:    s.T,
+				value:        s.V,
+				isSample:     true,
+			}) {
 				continue outer
 			}
@@ -552,7 +554,6 @@ func (t *QueueManager) AppendExemplars(exemplars []record.RefExemplar) bool {
 		return true
 	}
-	var appendExemplar prompb.Exemplar
 outer:
 	for _, e := range exemplars {
 		t.seriesMtx.Lock()
@@ -576,10 +577,12 @@ outer:
 				return false
 			default:
 			}
-			appendExemplar.Labels = labelsToLabelsProto(e.Labels, nil)
-			appendExemplar.Timestamp = e.T
-			appendExemplar.Value = e.V
-			if t.shards.enqueue(e.Ref, writeExemplar{lbls, appendExemplar}) {
+			if t.shards.enqueue(e.Ref, sampleOrExemplar{
+				seriesLabels:   lbls,
+				timestamp:      e.T,
+				value:          e.V,
+				exemplarLabels: e.Labels,
+			}) {
 				continue outer
 			}
@@ -901,16 +904,6 @@ func (t *QueueManager) newShards() *shards {
 	return s
 }
 
-type writeSample struct {
-	seriesLabels labels.Labels
-	sample       prompb.Sample
-}
-
-type writeExemplar struct {
-	seriesLabels labels.Labels
-	exemplar     prompb.Exemplar
-}
-
 type shards struct {
 	mtx sync.RWMutex // With the WAL, this is never actually contended.
@@ -999,7 +992,7 @@ func (s *shards) stop() {
 // enqueue data (sample or exemplar). If we are currently in the process of shutting down or resharding,
 // will return false; in this case, you should back off and retry.
-func (s *shards) enqueue(ref chunks.HeadSeriesRef, data interface{}) bool {
+func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool {
 	s.mtx.RLock()
 	defer s.mtx.RUnlock()
@@ -1018,43 +1011,48 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data interface{}) bool {
 		if !appended {
 			return false
 		}
-		switch data.(type) {
-		case writeSample:
+		if data.isSample {
 			s.qm.metrics.pendingSamples.Inc()
 			s.enqueuedSamples.Inc()
-		case writeExemplar:
+		} else {
 			s.qm.metrics.pendingExemplars.Inc()
 			s.enqueuedExemplars.Inc()
-		default:
-			level.Warn(s.qm.logger).Log("msg", "Invalid object type in shards enqueue")
 		}
 		return true
 	}
 }
 
 type queue struct {
-	batch      []interface{}
-	batchQueue chan []interface{}
+	batch      []sampleOrExemplar
+	batchQueue chan []sampleOrExemplar
 
 	// Since we know there are a limited number of batches out, using a stack
 	// is easy and safe so a sync.Pool is not necessary.
-	batchPool [][]interface{}
+	batchPool [][]sampleOrExemplar
 	// This mutex covers adding and removing batches from the batchPool.
 	poolMux sync.Mutex
 }
 
+type sampleOrExemplar struct {
+	seriesLabels   labels.Labels
+	value          float64
+	timestamp      int64
+	exemplarLabels labels.Labels
+	isSample       bool
+}
+
 func newQueue(batchSize, capacity int) *queue {
 	batches := capacity / batchSize
 	return &queue{
-		batch:      make([]interface{}, 0, batchSize),
-		batchQueue: make(chan []interface{}, batches),
+		batch:      make([]sampleOrExemplar, 0, batchSize),
+		batchQueue: make(chan []sampleOrExemplar, batches),
 		// batchPool should have capacity for everything in the channel + 1 for
 		// the batch being processed.
-		batchPool: make([][]interface{}, 0, batches+1),
+		batchPool: make([][]sampleOrExemplar, 0, batches+1),
 	}
 }
 
-func (q *queue) Append(datum interface{}, stop <-chan struct{}) bool {
+func (q *queue) Append(datum sampleOrExemplar, stop <-chan struct{}) bool {
 	q.batch = append(q.batch, datum)
 	if len(q.batch) == cap(q.batch) {
 		select {
@@ -1070,20 +1068,20 @@ func (q *queue) Append(datum interface{}, stop <-chan struct{}) bool {
 	return true
 }
 
-func (q *queue) Chan() <-chan []interface{} {
+func (q *queue) Chan() <-chan []sampleOrExemplar {
 	return q.batchQueue
 }
 
 // Batch returns the current batch and allocates a new batch. Must not be
 // called concurrently with Append.
-func (q *queue) Batch() []interface{} {
+func (q *queue) Batch() []sampleOrExemplar {
 	batch := q.batch
 	q.batch = q.newBatch(cap(batch))
 	return batch
 }
 
 // ReturnForReuse adds the batch buffer back to the internal pool.
-func (q *queue) ReturnForReuse(batch []interface{}) {
+func (q *queue) ReturnForReuse(batch []sampleOrExemplar) {
 	q.poolMux.Lock()
 	defer q.poolMux.Unlock()
 	if len(q.batchPool) < cap(q.batchPool) {
@@ -1106,7 +1104,7 @@ func (q *queue) FlushAndShutdown(done <-chan struct{}) {
 	close(q.batchQueue)
 }
 
-func (q *queue) newBatch(capacity int) []interface{} {
+func (q *queue) newBatch(capacity int) []sampleOrExemplar {
 	q.poolMux.Lock()
 	defer q.poolMux.Unlock()
 	batches := len(q.batchPool)
@@ -1115,7 +1113,7 @@ func (q *queue) newBatch(capacity int) []interface{} {
 		q.batchPool = q.batchPool[:batches-1]
 		return batch
 	}
-	return make([]interface{}, 0, capacity)
+	return make([]sampleOrExemplar, 0, capacity)
 }
 
 func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
@@ -1192,7 +1190,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 			// traffic instances.
 			s.mtx.Lock()
 			// First, we need to see if we can happen to get a batch from the queue if it filled while acquiring the lock.
-			var batch []interface{}
+			var batch []sampleOrExemplar
 			select {
 			case batch = <-batchQueue:
 			default:
@@ -1211,9 +1209,9 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 	}
 }
 
-func (s *shards) populateTimeSeries(batch []interface{}, pendingData []prompb.TimeSeries) (int, int) {
+func (s *shards) populateTimeSeries(batch []sampleOrExemplar, pendingData []prompb.TimeSeries) (int, int) {
 	var nPendingSamples, nPendingExemplars int
-	for nPending, sample := range batch {
+	for nPending, d := range batch {
 		pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
 		if s.qm.sendExemplars {
 			pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
@@ -1221,14 +1219,20 @@ func (s *shards) populateTimeSeries(batch []interface{}, pendingData []prompb.Ti
 		// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
 		// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
 		// stop reading from the queue. This makes it safe to reference pendingSamples by index.
-		switch d := sample.(type) {
-		case writeSample:
+		if d.isSample {
 			pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
-			pendingData[nPending].Samples = append(pendingData[nPending].Samples, d.sample)
+			pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
+				Value:     d.value,
+				Timestamp: d.timestamp,
+			})
 			nPendingSamples++
-		case writeExemplar:
+		} else {
 			pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
-			pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, d.exemplar)
+			pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{
+				Labels:    labelsToLabelsProto(d.exemplarLabels, nil),
+				Value:     d.value,
+				Timestamp: d.timestamp,
+			})
 			nPendingExemplars++
 		}
 	}
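
A note on the batching scheme the types above implement: queue accumulates entries in batch, hands full batches to the consumer over batchQueue, and recycles returned slices through batchPool (guarded by poolMux), so the steady state allocates no new batch buffers. A simplified, self-contained sketch of that pattern (names, sizes, and the item type are illustrative, not the actual Prometheus code):

package main

import (
	"fmt"
	"sync"
)

type item struct{ value float64 }

// queue gathers items into fixed-size batches, hands full batches to a
// consumer over a channel, and recycles returned slices through a small
// stack so steady-state operation reuses existing buffers.
type queue struct {
	batch      []item
	batchQueue chan []item

	poolMux   sync.Mutex // guards batchPool
	batchPool [][]item
}

func newQueue(batchSize, capacity int) *queue {
	batches := capacity / batchSize
	return &queue{
		batch:      make([]item, 0, batchSize),
		batchQueue: make(chan []item, batches),
		batchPool:  make([][]item, 0, batches+1),
	}
}

func (q *queue) append(it item) {
	q.batch = append(q.batch, it)
	if len(q.batch) == cap(q.batch) {
		q.batchQueue <- q.batch       // hand the full batch to the consumer
		q.batch = q.newBatch(cap(q.batch))
	}
}

func (q *queue) newBatch(capacity int) []item {
	q.poolMux.Lock()
	defer q.poolMux.Unlock()
	if n := len(q.batchPool); n > 0 {
		b := q.batchPool[n-1]
		q.batchPool = q.batchPool[:n-1]
		return b[:0] // reuse a returned buffer
	}
	return make([]item, 0, capacity)
}

func (q *queue) returnForReuse(batch []item) {
	q.poolMux.Lock()
	defer q.poolMux.Unlock()
	if len(q.batchPool) < cap(q.batchPool) {
		q.batchPool = append(q.batchPool, batch[:0])
	}
}

func main() {
	q := newQueue(2, 4)
	go func() {
		for i := 0; i < 6; i++ {
			q.append(item{value: float64(i)})
		}
		close(q.batchQueue)
	}()
	for batch := range q.batchQueue {
		fmt.Println(batch) // consumer drains a full batch
		q.returnForReuse(batch)
	}
}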

storage/remote/queue_manager_test.go

@@ -578,22 +578,6 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
 	}
 }
 
-func (c *TestWriteClient) expectDataCount(numSamples int) {
-	if !c.withWaitGroup {
-		return
-	}
-	c.mtx.Lock()
-	defer c.mtx.Unlock()
-	c.wg.Add(numSamples)
-}
-
-func (c *TestWriteClient) waitForExpectedDataCount() {
-	if !c.withWaitGroup {
-		return
-	}
-	c.wg.Wait()
-}
-
 func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
@@ -682,7 +666,15 @@ func (c *TestBlockingWriteClient) Endpoint() string {
 	return "http://test-remote-blocking.com/1234"
 }
 
-func BenchmarkSampleDelivery(b *testing.B) {
+// For benchmarking the send and not the receive side.
+type NopWriteClient struct{}
+
+func NewNopWriteClient() *NopWriteClient { return &NopWriteClient{} }
+func (c *NopWriteClient) Store(_ context.Context, req []byte) error { return nil }
+func (c *NopWriteClient) Name() string { return "nopwriteclient" }
+func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" }
+
+func BenchmarkSampleSend(b *testing.B) {
 	// Send one sample per series, which is the typical remote_write case
 	const numSamples = 1
 	const numSeries = 10000
@@ -707,12 +699,13 @@ func BenchmarkSampleDelivery(b *testing.B) {
 	}
 	samples, series := createTimeseries(numSamples, numSeries, extraLabels...)
 
-	c := NewTestWriteClient()
+	c := NewNopWriteClient()
 
 	cfg := config.DefaultQueueConfig
 	mcfg := config.DefaultMetadataConfig
 	cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
-	cfg.MaxShards = 1
+	cfg.MinShards = 20
+	cfg.MaxShards = 20
 
 	dir := b.TempDir()
@@ -726,11 +719,9 @@ func BenchmarkSampleDelivery(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		c.expectDataCount(len(samples))
-		go m.Append(samples)
+		m.Append(samples)
 		m.UpdateSeriesSegment(series, i+1) // simulate what wal.Watcher.garbageCollectSeries does
 		m.SeriesReset(i + 1)
-		c.waitForExpectedDataCount()
 	}
 	// Do not include shutdown
 	b.StopTimer()
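
With NopWriteClient in place and the wait-group bookkeeping removed, the renamed benchmark measures only the send path. Assuming it stays in the storage/remote package as in the Prometheus tree, it can be run with something like:

go test -run '^$' -bench BenchmarkSampleSend -benchmem ./storage/remote/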
