From 78c5ce3196e729ded7dbe13cc5c92eb4196c2d19 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marc=20Tudur=C3=AD?=
Date: Fri, 5 Jan 2024 19:40:30 +0100
Subject: [PATCH] Drop old inmemory samples (#13002)

* Drop old inmemory samples

Co-authored-by: Paschalis Tsilias
Signed-off-by: Paschalis Tsilias
Signed-off-by: Marc Tuduri

* Avoid copying timeseries when the feature is disabled

Signed-off-by: Paschalis Tsilias
Signed-off-by: Marc Tuduri

* Run gofmt

Signed-off-by: Paschalis Tsilias
Signed-off-by: Marc Tuduri

* Clarify docs

Signed-off-by: Marc Tuduri

* Add more logging info

Signed-off-by: Marc Tuduri

* Remove loggers

Signed-off-by: Marc Tuduri

* optimize function and add tests

Signed-off-by: Marc Tuduri

* Simplify filter

Signed-off-by: Marc Tuduri

* rename var

Signed-off-by: Marc Tuduri

* Update help info from metrics

Signed-off-by: Marc Tuduri

* use metrics to keep track of drop elements during buildWriteRequest

Signed-off-by: Marc Tuduri

* rename var in tests

Signed-off-by: Marc Tuduri

* pass time.Now as parameter

Signed-off-by: Marc Tuduri

* Change buildwriterequest during retries

Signed-off-by: Marc Tuduri

* Revert "Remove loggers"

This reverts commit 54f91dfcae20488944162335ab4ad8be459df1ab.

Signed-off-by: Marc Tuduri

* use log level debug for loggers

Signed-off-by: Marc Tuduri

* Fix linter

Signed-off-by: Paschalis Tsilias

* Remove noisy debug-level logs; add 'reason' label to drop metrics

Signed-off-by: Paschalis Tsilias

* Remove accidentally committed files

Signed-off-by: Paschalis Tsilias

* Propagate logger to buildWriteRequest to log dropped data

Signed-off-by: Paschalis Tsilias

* Fix docs comment

Signed-off-by: Paschalis Tsilias

* Make drop reason more specific

Signed-off-by: Paschalis Tsilias

* Remove unnecessary pass of logger

Signed-off-by: Paschalis Tsilias

* Use snake_case for reason label

Signed-off-by: Paschalis Tsilias

* Fix dropped samples metric

Signed-off-by: Paschalis Tsilias

---------

Signed-off-by: Paschalis Tsilias
Signed-off-by: Marc Tuduri
Signed-off-by: Paschalis Tsilias
Co-authored-by: Paschalis Tsilias
Co-authored-by: Paschalis Tsilias
---
 config/config.go | 3 +
 docs/configuration/configuration.md | 4 +
 storage/remote/codec_test.go | 2 +-
 storage/remote/queue_manager.go | 190 ++++++++++++++++---
 storage/remote/queue_manager_test.go | 272 ++++++++++++++++++++++++++-
 storage/remote/write_handler_test.go | 24 +--
 6 files changed, 449 insertions(+), 46 deletions(-)

diff --git a/config/config.go b/config/config.go
index ddcca84dc..c544d7e74 100644
--- a/config/config.go
+++ b/config/config.go
@@ -1124,6 +1124,9 @@ type QueueConfig struct {
 	MinBackoff model.Duration `yaml:"min_backoff,omitempty"`
 	MaxBackoff model.Duration `yaml:"max_backoff,omitempty"`
 	RetryOnRateLimit bool `yaml:"retry_on_http_429,omitempty"`
+
+	// Samples older than the limit will be dropped.
+	SampleAgeLimit model.Duration `yaml:"sample_age_limit,omitempty"`
 }

 // MetadataConfig is the configuration for sending metadata to remote
diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md
index 5e2f31c1c..3a7b71a2f 100644
--- a/docs/configuration/configuration.md
+++ b/docs/configuration/configuration.md
@@ -3619,6 +3619,10 @@ queue_config:
   # Retry upon receiving a 429 status code from the remote-write storage.
   # This is experimental and might change in the future.
   [ retry_on_http_429: <boolean> | default = false ]
+  # If set, any sample that is older than sample_age_limit
+  # will not be sent to the remote storage. The default value is 0s,
+  # which means that all samples are sent.
+  [ sample_age_limit: <duration> | default = 0s ]

   # Configures the sending of series metadata to remote storage.
   # Metadata configuration is subject to change at any point
diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go
index ac8b0f0b5..dbc22a377 100644
--- a/storage/remote/codec_test.go
+++ b/storage/remote/codec_test.go
@@ -517,7 +517,7 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) {
 }

 func TestDecodeWriteRequest(t *testing.T) {
-	buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
+	buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil)
 	require.NoError(t, err)

 	actual, err := DecodeWriteRequest(bytes.NewReader(buf))
diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index a25c7d90c..e37ec8c70 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -36,6 +36,7 @@ import (
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/relabel"
+	"github.com/prometheus/prometheus/model/timestamp"
 	"github.com/prometheus/prometheus/prompb"
 	"github.com/prometheus/prometheus/scrape"
 	"github.com/prometheus/prometheus/tsdb/chunks"
@@ -51,6 +52,10 @@ const (

 	// Allow 30% too many shards before scaling down.
 	shardToleranceFraction = 0.3
+
+	reasonTooOld                     = "too_old"
+	reasonDroppedSeries              = "dropped_series"
+	reasonUnintentionalDroppedSeries = "unintentionally_dropped_series"
 )

 type queueManagerMetrics struct {
@@ -68,9 +73,9 @@ type queueManagerMetrics struct {
 	retriedExemplarsTotal prometheus.Counter
 	retriedHistogramsTotal prometheus.Counter
 	retriedMetadataTotal prometheus.Counter
-	droppedSamplesTotal prometheus.Counter
-	droppedExemplarsTotal prometheus.Counter
-	droppedHistogramsTotal prometheus.Counter
+	droppedSamplesTotal *prometheus.CounterVec
+	droppedExemplarsTotal *prometheus.CounterVec
+	droppedHistogramsTotal *prometheus.CounterVec
 	enqueueRetriesTotal prometheus.Counter
 	sentBatchDuration prometheus.Histogram
 	highestSentTimestamp *maxTimestamp
@@ -180,27 +185,27 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
 		Help: "Total number of metadata entries which failed on send to remote storage but were retried because the send error was recoverable.",
 		ConstLabels: constLabels,
 	})
-	m.droppedSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{
+	m.droppedSamplesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Namespace: namespace,
 		Subsystem: subsystem,
 		Name: "samples_dropped_total",
-		Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.",
+		Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write, either via relabelling, due to being too old or unintentionally because of an unknown reference ID.",
 		ConstLabels: constLabels,
-	})
-	m.droppedExemplarsTotal = prometheus.NewCounter(prometheus.CounterOpts{
+	}, []string{"reason"})
+	m.droppedExemplarsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Namespace: namespace,
 		Subsystem: subsystem,
 		Name: "exemplars_dropped_total",
-		Help: "Total number of exemplars which were dropped after 
being read from the WAL before being sent via remote write, either via relabelling, due to being too old or unintentionally because of an unknown reference ID.", ConstLabels: constLabels, - }) - m.droppedHistogramsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + }, []string{"reason"}) + m.droppedHistogramsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, Name: "histograms_dropped_total", - Help: "Total number of histograms which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.", + Help: "Total number of histograms which were dropped after being read from the WAL before being sent via remote write, either via relabelling, due to being too old or unintentionally because of an unknown reference ID.", ConstLabels: constLabels, - }) + }, []string{"reason"}) m.enqueueRetriesTotal = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, @@ -391,7 +396,8 @@ type WriteClient interface { // indicated by the provided WriteClient. Implements writeTo interface // used by WAL Watcher. type QueueManager struct { - lastSendTimestamp atomic.Int64 + lastSendTimestamp atomic.Int64 + buildRequestLimitTimestamp atomic.Int64 logger log.Logger flushDeadline time.Duration @@ -529,7 +535,7 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error { // Build the WriteRequest with no samples. - req, _, err := buildWriteRequest(nil, metadata, pBuf, nil) + req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil) if err != nil { return err } @@ -575,18 +581,65 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p return nil } +func isSampleOld(baseTime time.Time, sampleAgeLimit time.Duration, ts int64) bool { + if sampleAgeLimit == 0 { + // If sampleAgeLimit is unset, then we never skip samples due to their age. + return false + } + limitTs := baseTime.Add(-sampleAgeLimit) + sampleTs := timestamp.Time(ts) + return sampleTs.Before(limitTs) +} + +func isTimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts prompb.TimeSeries) bool { + return func(ts prompb.TimeSeries) bool { + if sampleAgeLimit == 0 { + // If sampleAgeLimit is unset, then we never skip samples due to their age. + return false + } + switch { + // Only the first element should be set in the series, therefore we only check the first element. + case len(ts.Samples) > 0: + if isSampleOld(baseTime, sampleAgeLimit, ts.Samples[0].Timestamp) { + metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc() + return true + } + case len(ts.Histograms) > 0: + if isSampleOld(baseTime, sampleAgeLimit, ts.Histograms[0].Timestamp) { + metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc() + return true + } + case len(ts.Exemplars) > 0: + if isSampleOld(baseTime, sampleAgeLimit, ts.Exemplars[0].Timestamp) { + metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc() + return true + } + default: + return false + } + return false + } +} + // Append queues a sample to be sent to the remote storage. Blocks until all samples are // enqueued on their shards or a shutdown signal is received. 
func (t *QueueManager) Append(samples []record.RefSample) bool { + currentTime := time.Now() outer: for _, s := range samples { + if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), s.T) { + t.metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc() + continue + } t.seriesMtx.Lock() lbls, ok := t.seriesLabels[s.Ref] if !ok { - t.metrics.droppedSamplesTotal.Inc() t.dataDropped.incr(1) if _, ok := t.droppedSeries[s.Ref]; !ok { level.Info(t.logger).Log("msg", "Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref) + t.metrics.droppedSamplesTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc() + } else { + t.metrics.droppedSamplesTotal.WithLabelValues(reasonDroppedSeries).Inc() } t.seriesMtx.Unlock() continue @@ -629,17 +682,23 @@ func (t *QueueManager) AppendExemplars(exemplars []record.RefExemplar) bool { if !t.sendExemplars { return true } - + currentTime := time.Now() outer: for _, e := range exemplars { + if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), e.T) { + t.metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc() + continue + } t.seriesMtx.Lock() lbls, ok := t.seriesLabels[e.Ref] if !ok { - t.metrics.droppedExemplarsTotal.Inc() // Track dropped exemplars in the same EWMA for sharding calc. t.dataDropped.incr(1) if _, ok := t.droppedSeries[e.Ref]; !ok { level.Info(t.logger).Log("msg", "Dropped exemplar for series that was not explicitly dropped via relabelling", "ref", e.Ref) + t.metrics.droppedExemplarsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc() + } else { + t.metrics.droppedExemplarsTotal.WithLabelValues(reasonDroppedSeries).Inc() } t.seriesMtx.Unlock() continue @@ -678,16 +737,22 @@ func (t *QueueManager) AppendHistograms(histograms []record.RefHistogramSample) if !t.sendNativeHistograms { return true } - + currentTime := time.Now() outer: for _, h := range histograms { + if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), h.T) { + t.metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc() + continue + } t.seriesMtx.Lock() lbls, ok := t.seriesLabels[h.Ref] if !ok { - t.metrics.droppedHistogramsTotal.Inc() t.dataDropped.incr(1) if _, ok := t.droppedSeries[h.Ref]; !ok { level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref) + t.metrics.droppedHistogramsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc() + } else { + t.metrics.droppedHistogramsTotal.WithLabelValues(reasonDroppedSeries).Inc() } t.seriesMtx.Unlock() continue @@ -725,16 +790,22 @@ func (t *QueueManager) AppendFloatHistograms(floatHistograms []record.RefFloatHi if !t.sendNativeHistograms { return true } - + currentTime := time.Now() outer: for _, h := range floatHistograms { + if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), h.T) { + t.metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc() + continue + } t.seriesMtx.Lock() lbls, ok := t.seriesLabels[h.Ref] if !ok { - t.metrics.droppedHistogramsTotal.Inc() t.dataDropped.incr(1) if _, ok := t.droppedSeries[h.Ref]; !ok { level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref) + t.metrics.droppedHistogramsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc() + } else { + t.metrics.droppedHistogramsTotal.WithLabelValues(reasonDroppedSeries).Inc() } t.seriesMtx.Unlock() continue @@ -1490,7 +1561,8 @@ func (s *shards) sendSamples(ctx context.Context, samples 
[]prompb.TimeSeries, s // sendSamples to the remote storage with backoff for recoverable errors. func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error { // Build the WriteRequest with no metadata. - req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf) + req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, *buf, nil) + s.qm.buildRequestLimitTimestamp.Store(lowest) if err != nil { // Failing to build the write request is non-recoverable, since it will // only error if marshaling the proto to bytes fails. @@ -1504,6 +1576,25 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti // without causing a memory leak, and it has the nice effect of not propagating any // parameters for sendSamplesWithBackoff/3. attemptStore := func(try int) error { + currentTime := time.Now() + lowest := s.qm.buildRequestLimitTimestamp.Load() + if isSampleOld(currentTime, time.Duration(s.qm.cfg.SampleAgeLimit), lowest) { + // This will filter out old samples during retries. + req, _, lowest, err := buildWriteRequest( + s.qm.logger, + samples, + nil, + pBuf, + *buf, + isTimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)), + ) + s.qm.buildRequestLimitTimestamp.Store(lowest) + if err != nil { + return err + } + *buf = req + } + ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch") defer span.End() @@ -1608,9 +1699,27 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l } } -func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) { +func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) (int64, int64, []prompb.TimeSeries, int, int, int) { var highest int64 - for _, ts := range samples { + var lowest int64 + var droppedSamples, droppedExemplars, droppedHistograms int + + keepIdx := 0 + lowest = math.MaxInt64 + for i, ts := range timeSeries { + if filter != nil && filter(ts) { + if len(ts.Samples) > 0 { + droppedSamples++ + } + if len(ts.Exemplars) > 0 { + droppedExemplars++ + } + if len(ts.Histograms) > 0 { + droppedHistograms++ + } + continue + } + // At the moment we only ever append a TimeSeries with a single sample or exemplar in it. 
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest { highest = ts.Samples[0].Timestamp @@ -1621,10 +1730,37 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest { highest = ts.Histograms[0].Timestamp } + + // Get lowest timestamp + if len(ts.Samples) > 0 && ts.Samples[0].Timestamp < lowest { + lowest = ts.Samples[0].Timestamp + } + if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp < lowest { + lowest = ts.Exemplars[0].Timestamp + } + if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest { + lowest = ts.Histograms[0].Timestamp + } + + // Move the current element to the write position and increment the write pointer + timeSeries[keepIdx] = timeSeries[i] + keepIdx++ + } + + timeSeries = timeSeries[:keepIdx] + return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms +} + +func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte, filter func(prompb.TimeSeries) bool) ([]byte, int64, int64, error) { + highest, lowest, timeSeries, + droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter) + + if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 { + level.Debug(logger).Log("msg", "dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms) } req := &prompb.WriteRequest{ - Timeseries: samples, + Timeseries: timeSeries, Metadata: metadata, } @@ -1635,7 +1771,7 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta } err := pBuf.Marshal(req) if err != nil { - return nil, highest, err + return nil, highest, lowest, err } // snappy uses len() to see if it needs to allocate a new slice. Make the @@ -1644,5 +1780,5 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta buf = buf[0:cap(buf)] } compressed := snappy.Encode(buf, pBuf.Bytes()) - return compressed, highest, nil + return compressed, highest, lowest, nil } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 17a904fcd..778861e2b 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -548,7 +548,7 @@ func TestShouldReshard(t *testing.T) { func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSeries) { samples := make([]record.RefSample, 0, numSamples) series := make([]record.RefSeries, 0, numSeries) - b := labels.ScratchBuilder{} + lb := labels.ScratchBuilder{} for i := 0; i < numSeries; i++ { name := fmt.Sprintf("test_metric_%d", i) for j := 0; j < numSamples; j++ { @@ -559,15 +559,15 @@ func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([ }) } // Create Labels that is name of series plus any extra labels supplied. 
- b.Reset() - b.Add(labels.MetricName, name) + lb.Reset() + lb.Add(labels.MetricName, name) for _, l := range extraLabels { - b.Add(l.Name, l.Value) + lb.Add(l.Name, l.Value) } - b.Sort() + lb.Sort() series = append(series, record.RefSeries{ Ref: chunks.HeadSeriesRef(i), - Labels: b.Labels(), + Labels: lb.Labels(), }) } return samples, series @@ -1321,3 +1321,263 @@ func TestQueue_FlushAndShutdownDoesNotDeadlock(t *testing.T) { t.FailNow() } } + +func TestDropOldTimeSeries(t *testing.T) { + size := 10 + nSeries := 6 + nSamples := config.DefaultQueueConfig.Capacity * size + samples, newSamples, series := createTimeseriesWithOldSamples(nSamples, nSeries) + + c := NewTestWriteClient() + c.expectSamples(newSamples, series) + + cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig + cfg.MaxShards = 1 + cfg.SampleAgeLimit = model.Duration(60 * time.Second) + dir := t.TempDir() + + metrics := newQueueManagerMetrics(nil, "", "") + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) + m.StoreSeries(series, 0) + + m.Start() + defer m.Stop() + + m.Append(samples) + c.waitForExpectedData(t) +} + +func TestIsSampleOld(t *testing.T) { + currentTime := time.Now() + require.True(t, isSampleOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-61*time.Second)))) + require.False(t, isSampleOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-59*time.Second)))) +} + +func createTimeseriesWithOldSamples(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSample, []record.RefSeries) { + newSamples := make([]record.RefSample, 0, numSamples) + samples := make([]record.RefSample, 0, numSamples) + series := make([]record.RefSeries, 0, numSeries) + lb := labels.ScratchBuilder{} + for i := 0; i < numSeries; i++ { + name := fmt.Sprintf("test_metric_%d", i) + // We create half of the samples in the past. + past := timestamp.FromTime(time.Now().Add(-5 * time.Minute)) + for j := 0; j < numSamples/2; j++ { + samples = append(samples, record.RefSample{ + Ref: chunks.HeadSeriesRef(i), + T: past + int64(j), + V: float64(i), + }) + } + for j := 0; j < numSamples/2; j++ { + sample := record.RefSample{ + Ref: chunks.HeadSeriesRef(i), + T: int64(int(time.Now().UnixMilli()) + j), + V: float64(i), + } + samples = append(samples, sample) + newSamples = append(newSamples, sample) + } + // Create Labels that is name of series plus any extra labels supplied. 
+ lb.Reset() + lb.Add(labels.MetricName, name) + for _, l := range extraLabels { + lb.Add(l.Name, l.Value) + } + lb.Sort() + series = append(series, record.RefSeries{ + Ref: chunks.HeadSeriesRef(i), + Labels: lb.Labels(), + }) + } + return samples, newSamples, series +} + +func filterTsLimit(limit int64, ts prompb.TimeSeries) bool { + return limit > ts.Samples[0].Timestamp +} + +func TestBuildTimeSeries(t *testing.T) { + testCases := []struct { + name string + ts []prompb.TimeSeries + filter func(ts prompb.TimeSeries) bool + lowestTs int64 + highestTs int64 + droppedSamples int + responseLen int + }{ + { + name: "No filter applied", + ts: []prompb.TimeSeries{ + { + Samples: []prompb.Sample{ + { + Timestamp: 1234567890, + Value: 1.23, + }, + }, + }, + { + Samples: []prompb.Sample{ + { + Timestamp: 1234567891, + Value: 2.34, + }, + }, + }, + { + Samples: []prompb.Sample{ + { + Timestamp: 1234567892, + Value: 3.34, + }, + }, + }, + }, + filter: nil, + responseLen: 3, + lowestTs: 1234567890, + highestTs: 1234567892, + }, + { + name: "Filter applied, samples in order", + ts: []prompb.TimeSeries{ + { + Samples: []prompb.Sample{ + { + Timestamp: 1234567890, + Value: 1.23, + }, + }, + }, + { + Samples: []prompb.Sample{ + { + Timestamp: 1234567891, + Value: 2.34, + }, + }, + }, + { + Samples: []prompb.Sample{ + { + Timestamp: 1234567892, + Value: 3.45, + }, + }, + }, + { + Samples: []prompb.Sample{ + { + Timestamp: 1234567893, + Value: 3.45, + }, + }, + }, + }, + filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) }, + responseLen: 2, + lowestTs: 1234567892, + highestTs: 1234567893, + droppedSamples: 2, + }, + { + name: "Filter applied, samples out of order", + ts: []prompb.TimeSeries{ + { + Samples: []prompb.Sample{ + { + Timestamp: 1234567892, + Value: 3.45, + }, + }, + }, + { + Samples: []prompb.Sample{ + { + Timestamp: 1234567890, + Value: 1.23, + }, + }, + }, + { + Samples: []prompb.Sample{ + { + Timestamp: 1234567893, + Value: 3.45, + }, + }, + }, + { + Samples: []prompb.Sample{ + { + Timestamp: 1234567891, + Value: 2.34, + }, + }, + }, + }, + filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) }, + responseLen: 2, + lowestTs: 1234567892, + highestTs: 1234567893, + droppedSamples: 2, + }, + { + name: "Filter applied, samples not consecutive", + ts: []prompb.TimeSeries{ + { + Samples: []prompb.Sample{ + { + Timestamp: 1234567890, + Value: 1.23, + }, + }, + }, + { + Samples: []prompb.Sample{ + { + Timestamp: 1234567892, + Value: 3.45, + }, + }, + }, + { + Samples: []prompb.Sample{ + { + Timestamp: 1234567895, + Value: 6.78, + }, + }, + }, + { + Samples: []prompb.Sample{ + { + Timestamp: 1234567897, + Value: 6.78, + }, + }, + }, + }, + filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567895, ts) }, + responseLen: 2, + lowestTs: 1234567895, + highestTs: 1234567897, + droppedSamples: 2, + }, + } + + // Run the test cases + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + highest, lowest, result, droppedSamples, _, _ := buildTimeSeries(tc.ts, tc.filter) + require.NotNil(t, result) + require.Len(t, result, tc.responseLen) + require.Equal(t, tc.highestTs, highest) + require.Equal(t, tc.lowestTs, lowest) + require.Equal(t, tc.droppedSamples, droppedSamples) + }) + } +} diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index fd5b34ecd..9f7dcd175 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -38,7 +38,7 @@ import ( ) func 
TestRemoteWriteHandler(t *testing.T) { - buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) + buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil) require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -84,10 +84,10 @@ func TestRemoteWriteHandler(t *testing.T) { } func TestOutOfOrderSample(t *testing.T) { - buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ + buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, - }}, nil, nil, nil) + }}, nil, nil, nil, nil) require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -109,10 +109,10 @@ func TestOutOfOrderSample(t *testing.T) { // don't fail on ingestion errors since the exemplar storage is // still experimental. func TestOutOfOrderExemplar(t *testing.T) { - buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ + buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}}, - }}, nil, nil, nil) + }}, nil, nil, nil, nil) require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -132,10 +132,10 @@ func TestOutOfOrderExemplar(t *testing.T) { } func TestOutOfOrderHistogram(t *testing.T) { - buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ + buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))}, - }}, nil, nil, nil) + }}, nil, nil, nil, nil) require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -158,13 +158,13 @@ func BenchmarkRemoteWritehandler(b *testing.B) { reqs := []*http.Request{} for i := 0; i < b.N; i++ { num := strings.Repeat(strconv.Itoa(i), 16) - buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ + buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ Labels: []prompb.Label{ {Name: "__name__", Value: "test_metric"}, {Name: "test_label_name_" + num, Value: labelValue + num}, }, Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)}, - }}, nil, nil, nil) + }}, nil, nil, nil, nil) require.NoError(b, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) require.NoError(b, err) @@ -182,7 +182,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) { } func TestCommitErr(t *testing.T) { - buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) + buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil) require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -219,7 +219,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) { handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head()) - buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil) + buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil) require.NoError(b, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -232,7 +232,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) { var bufRequests [][]byte for i := 0; i < 100; i++ { - 
buf, _, err = buildWriteRequest(genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil) + buf, _, _, err = buildWriteRequest(nil, genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil, nil) require.NoError(b, err) bufRequests = append(bufRequests, buf) }