From cccbe72514e4f5a86b490f9a45edecb401f091fe Mon Sep 17 00:00:00 2001 From: Vanshika <102902652+Vanshikav123@users.noreply.github.com> Date: Wed, 23 Oct 2024 21:04:28 +0530 Subject: [PATCH] TSDB: Fix some edge cases when OOO is enabled (#14710) Fix some edge cases when OOO is enabled Signed-off-by: Vanshikav123 Signed-off-by: Vanshika <102902652+Vanshikav123@users.noreply.github.com> Signed-off-by: Jesus Vazquez Co-authored-by: Jesus Vazquez --- CHANGELOG.md | 92 +++++++++++++- cmd/prometheus/main.go | 3 + rules/fixtures/rules1.yaml | 5 + rules/group.go | 4 + rules/manager_test.go | 47 ++++++++ scrape/helpers_test.go | 4 + scrape/scrape.go | 4 +- scrape/scrape_test.go | 173 ++++++++++++++++++++++++++- storage/fanout.go | 10 ++ storage/interface.go | 8 ++ storage/remote/write.go | 5 + storage/remote/write_handler_test.go | 4 + tsdb/agent/db.go | 5 + tsdb/head_append.go | 20 +++- util/teststorage/storage.go | 14 ++- 15 files changed, 388 insertions(+), 10 deletions(-) create mode 100644 rules/fixtures/rules1.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index f1321829e..72d9f7a11 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,8 +3,9 @@ ## unreleased * [CHANGE] Scraping: Remove implicit fallback to the Prometheus text format in case of invalid/missing Content-Type and fail the scrape instead. Add ability to specify a `fallback_scrape_protocol` in the scrape config. #15136 -* [BUGFIX] PromQL: Fix stddev+stdvar aggregations to always ignore native histograms. #14941 -* [BUGFIX] PromQL: Fix stddev+stdvar aggregations to treat Infinity consistently. #14941 +* [ENHANCEMENT] Scraping, rules: handle targets reappearing, or rules moving group, when out-of-order is enabled. #14710 +- [BUGFIX] PromQL: Fix stddev+stdvar aggregations to always ignore native histograms. #14941 +- [BUGFIX] PromQL: Fix stddev+stdvar aggregations to treat Infinity consistently. #14941 ## 3.0.0-beta.1 / 2024-10-09 @@ -20,7 +21,6 @@ * [ENHANCEMENT] PromQL: Introduce exponential interpolation for native histograms. #14677 * [ENHANCEMENT] TSDB: Add support for ingestion of out-of-order native histogram samples. #14850, #14546 * [ENHANCEMENT] Alerts: remove metrics for removed Alertmanagers. #13909 -* [ENHANCEMENT] Scraping: support Created-Timestamp feature on native histograms. #14694 * [ENHANCEMENT] Kubernetes SD: Support sidecar containers in endpoint discovery. #14929 * [ENHANCEMENT] Consul SD: Support catalog filters. #11224 * [PERF] TSDB: Parallelize deletion of postings after head compaction. #14975 @@ -41,6 +41,10 @@ Release 3.0.0-beta.0 includes new features such as a brand new UI and UTF-8 supp As is traditional with a beta release, we do **not** recommend users install 3.0.0-beta on critical production systems, but we do want everyone to test it out and find bugs. +<<<<<<< HEAD +<<<<<<< HEAD +======= +>>>>>>> b10c3696c (Revert "updated changelog") * [CHANGE] UI: The old web UI has been replaced by a completely new one that is less cluttered and adds a few new features (PromLens-style tree view, better metrics explorer, "Explain" tab). However, it is still missing some features of the old UI (notably, exemplar display and heatmaps). To switch back to the old UI, you can use the feature flag `--enable-feature=old-ui` for the time being. #14872 * [CHANGE] PromQL: Range selectors and the lookback delta are now left-open, i.e. a sample coinciding with the lower time limit is excluded rather than included. #13904 * [CHANGE] Kubernetes SD: Remove support for `discovery.k8s.io/v1beta1` API version of EndpointSlice. This version is no longer served as of Kubernetes v1.25. #14365 @@ -52,6 +56,7 @@ As is traditional with a beta release, we do **not** recommend users install 3.0 * [CHANGE] Remove deprecated `remote-write-receiver`,`promql-at-modifier`, and `promql-negative-offset` feature flags. #13456, #14526 * [CHANGE] Remove deprecated `storage.tsdb.allow-overlapping-blocks`, `alertmanager.timeout`, and `storage.tsdb.retention` flags. #14640, #14643 * [ENHANCEMENT] Move AM discovery page from "Monitoring status" to "Server status". #14875 +<<<<<<< HEAD * [FEATURE] Support config reload automatically - feature flag `auto-reload-config`. #14769 * [BUGFIX] Scrape: Do not override target parameter labels with config params. #11029 @@ -85,6 +90,87 @@ As is traditional with a beta release, we do **not** recommend users install 3.0 * [BUGFIX] Remote-Write: Return 4xx not 5xx when timeseries has duplicate label. #14716 * [BUGFIX] Experimental Native Histograms: many fixes for incorrect results, panics, warnings. #14513, #14575, #14598, #14609, #14611, #14771, #14821 * [BUGFIX] TSDB: Only count unknown record types in `record_decode_failures_total` metric. #14042 +======= +- [CHANGE] UI: The old web UI has been replaced by a completely new one that is less cluttered and adds a few new features (PromLens-style tree view, better metrics explorer, "Explain" tab). However, it is still missing some features of the old UI (notably, exemplar display and heatmaps). To switch back to the old UI, you can use the feature flag `--enable-feature=old-ui` for the time being. #14872 +- [CHANGE] PromQL: Range selectors and the lookback delta are now left-open, i.e. a sample coinciding with the lower time limit is excluded rather than included. #13904 +- [CHANGE] Kubernetes SD: Remove support for `discovery.k8s.io/v1beta1` API version of EndpointSlice. This version is no longer served as of Kubernetes v1.25. #14365 +- [CHANGE] Kubernetes SD: Remove support for `networking.k8s.io/v1beta1` API version of Ingress. This version is no longer served as of Kubernetes v1.22. #14365 +- [CHANGE] UTF-8: Enable UTF-8 support by default. Prometheus now allows all UTF-8 characters in metric and label names. The corresponding `utf8-name` feature flag has been removed. #14705 +- [CHANGE] Console: Remove example files for the console feature. Users can continue using the console feature by supplying their own JavaScript and templates. #14807 +- [CHANGE] SD: Enable the new service discovery manager by default. This SD manager does not restart unchanged discoveries upon reloading. This makes reloads faster and reduces pressure on service discoveries' sources. The corresponding `new-service-discovery-manager` feature flag has been removed. #14770 +- [CHANGE] Agent mode has been promoted to stable. The feature flag `agent` has been removed. To run Prometheus in Agent mode, use the new `--agent` cmdline arg instead. #14747 +- [CHANGE] Remove deprecated `remote-write-receiver`,`promql-at-modifier`, and `promql-negative-offset` feature flags. #13456, #14526 +- [CHANGE] Remove deprecated `storage.tsdb.allow-overlapping-blocks`, `alertmanager.timeout`, and `storage.tsdb.retention` flags. #14640, #14643 +- [ENHANCEMENT] Move AM discovery page from "Monitoring status" to "Server status". #14875 +- [BUGFIX] Scrape: Do not override target parameter labels with config params. #11029 + +## 2.55.0-rc.0 / 2024-09-20 + +- [FEATURE] Support UTF-8 characters in label names - feature flag `utf8-names`. #14482, #14880, #14736, #14727 +- [FEATURE] Support config reload automatically - feature flag `auto-reload-config`. #14769 +- [FEATURE] Scraping: Add the ability to set custom `http_headers` in config. #14817 +- [FEATURE] Scraping: Support feature flag `created-timestamp-zero-ingestion` in OpenMetrics. #14356, #14815 +- [FEATURE] Scraping: `scrape_failure_log_file` option to log failures to a file. #14734 +- [FEATURE] OTLP receiver: Optional promotion of resource attributes to series labels. #14200 +- [FEATURE] Remote-Write: Support Google Cloud Monitoring authorization. #14346 +- [FEATURE] Promtool: `tsdb create-blocks` new option to add labels. #14403 +- [FEATURE] Promtool: `promtool test` adds `--junit` flag to format results. #14506 +- [ENHANCEMENT] OTLP receiver: Warn on exponential histograms with zero count and non-zero sum. #14706 +- [ENHANCEMENT] OTLP receiver: Interrupt translation on context cancellation/timeout. #14612 +- [ENHANCEMENT] Remote Read client: Enable streaming remote read if the server supports it. #11379 +- [ENHANCEMENT] Remote-Write: Don't reshard if we haven't successfully sent a sample since last update. #14450 +- [ENHANCEMENT] PromQL: Delay deletion of `__name__` label to the end of the query evaluation. This is **experimental** and enabled under the feature-flag `promql-delayed-name-removal`. #14477 +- [ENHANCEMENT] PromQL: Experimental `sort_by_label` and `sort_by_label_desc` sort by all labels when label is equal. #14655 +- [ENHANCEMENT] PromQL: Clarify error message logged when Go runtime panic occurs during query evaluation. #14621 +- [ENHANCEMENT] PromQL: Use Kahan summation for better accuracy in `avg` and `avg_over_time`. #14413 +- [ENHANCEMENT] Tracing: Improve PromQL tracing, including showing the operation performed for aggregates, operators, and calls. #14816 +- [ENHANCEMENT] API: Support multiple listening addresses. #14665 +- [ENHANCEMENT] TSDB: Backward compatibility with upcoming index v3. #14934 +- [PERF] TSDB: Query in-order and out-of-order series together. #14354, #14693, #14714, #14831, #14874, #14948 +- [PERF] TSDB: Streamline reading of overlapping out-of-order head chunks. #14729 +- [BUGFIX] SD: Fix dropping targets (with feature flag `new-service-discovery-manager`). #13147 +- [BUGFIX] SD: Stop storing stale targets (with feature flag `new-service-discovery-manager`). #13622 +- [BUGFIX] Scraping: exemplars could be dropped in protobuf scraping. #14810 +- [BUGFIX] Remote-Write: fix metadata sending for experimental Remote-Write V2. #14766 +- [BUGFIX] Remote-Write: Return 4xx not 5xx when timeseries has duplicate label. #14716 +- [BUGFIX] Experimental Native Histograms: many fixes for incorrect results, panics, warnings. #14513, #14575, #14598, #14609, #14611, #14771, #14821 +- [BUGFIX] TSDB: Only count unknown record types in `record_decode_failures_total` metric. #14042 +>>>>>>> 58173ab1e (updated changelog) +======= +* [BUGFIX] Scrape: Do not override target parameter labels with config params. #11029 + +## 2.55.0-rc.0 / 2024-09-20 + +* [FEATURE] Support UTF-8 characters in label names - feature flag `utf8-names`. #14482, #14880, #14736, #14727 +* [FEATURE] Support config reload automatically - feature flag `auto-reload-config`. #14769 +* [FEATURE] Scraping: Add the ability to set custom `http_headers` in config. #14817 +* [FEATURE] Scraping: Support feature flag `created-timestamp-zero-ingestion` in OpenMetrics. #14356, #14815 +* [FEATURE] Scraping: `scrape_failure_log_file` option to log failures to a file. #14734 +* [FEATURE] OTLP receiver: Optional promotion of resource attributes to series labels. #14200 +* [FEATURE] Remote-Write: Support Google Cloud Monitoring authorization. #14346 +* [FEATURE] Promtool: `tsdb create-blocks` new option to add labels. #14403 +* [FEATURE] Promtool: `promtool test` adds `--junit` flag to format results. #14506 +* [ENHANCEMENT] OTLP receiver: Warn on exponential histograms with zero count and non-zero sum. #14706 +* [ENHANCEMENT] OTLP receiver: Interrupt translation on context cancellation/timeout. #14612 +* [ENHANCEMENT] Remote Read client: Enable streaming remote read if the server supports it. #11379 +* [ENHANCEMENT] Remote-Write: Don't reshard if we haven't successfully sent a sample since last update. #14450 +* [ENHANCEMENT] PromQL: Delay deletion of `__name__` label to the end of the query evaluation. This is **experimental** and enabled under the feature-flag `promql-delayed-name-removal`. #14477 +* [ENHANCEMENT] PromQL: Experimental `sort_by_label` and `sort_by_label_desc` sort by all labels when label is equal. #14655 +* [ENHANCEMENT] PromQL: Clarify error message logged when Go runtime panic occurs during query evaluation. #14621 +* [ENHANCEMENT] PromQL: Use Kahan summation for better accuracy in `avg` and `avg_over_time`. #14413 +* [ENHANCEMENT] Tracing: Improve PromQL tracing, including showing the operation performed for aggregates, operators, and calls. #14816 +* [ENHANCEMENT] API: Support multiple listening addresses. #14665 +* [ENHANCEMENT] TSDB: Backward compatibility with upcoming index v3. #14934 +* [PERF] TSDB: Query in-order and out-of-order series together. #14354, #14693, #14714, #14831, #14874, #14948 +* [PERF] TSDB: Streamline reading of overlapping out-of-order head chunks. #14729 +* [BUGFIX] SD: Fix dropping targets (with feature flag `new-service-discovery-manager`). #13147 +* [BUGFIX] SD: Stop storing stale targets (with feature flag `new-service-discovery-manager`). #13622 +* [BUGFIX] Scraping: exemplars could be dropped in protobuf scraping. #14810 +* [BUGFIX] Remote-Write: fix metadata sending for experimental Remote-Write V2. #14766 +* [BUGFIX] Remote-Write: Return 4xx not 5xx when timeseries has duplicate label. #14716 +* [BUGFIX] Experimental Native Histograms: many fixes for incorrect results, panics, warnings. #14513, #14575, #14598, #14609, #14611, #14771, #14821 +* [BUGFIX] TSDB: Only count unknown record types in `record_decode_failures_total` metric. #14042 +>>>>>>> b10c3696c (Revert "updated changelog") ## 2.54.1 / 2024-08-27 diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 4a70d63bf..045389770 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -1639,6 +1639,9 @@ func (s *readyStorage) Appender(ctx context.Context) storage.Appender { type notReadyAppender struct{} +// SetOptions does nothing in this appender implementation. +func (n notReadyAppender) SetOptions(opts *storage.AppendOptions) {} + func (n notReadyAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { return 0, tsdb.ErrNotReady } diff --git a/rules/fixtures/rules1.yaml b/rules/fixtures/rules1.yaml new file mode 100644 index 000000000..76fbf71f3 --- /dev/null +++ b/rules/fixtures/rules1.yaml @@ -0,0 +1,5 @@ +groups: + - name: test_1 + rules: + - record: test_2 + expr: vector(2) diff --git a/rules/group.go b/rules/group.go index e9ef2be3a..7dd046b57 100644 --- a/rules/group.go +++ b/rules/group.go @@ -75,6 +75,7 @@ type Group struct { // concurrencyController controls the rules evaluation concurrency. concurrencyController RuleConcurrencyController + appOpts *storage.AppendOptions } // GroupEvalIterationFunc is used to implement and extend rule group @@ -145,6 +146,7 @@ func NewGroup(o GroupOptions) *Group { metrics: metrics, evalIterationFunc: evalIterationFunc, concurrencyController: concurrencyController, + appOpts: &storage.AppendOptions{DiscardOutOfOrder: true}, } } @@ -564,6 +566,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { if s.H != nil { _, err = app.AppendHistogram(0, s.Metric, s.T, nil, s.H) } else { + app.SetOptions(g.appOpts) _, err = app.Append(0, s.Metric, s.T, s.F) } @@ -660,6 +663,7 @@ func (g *Group) cleanupStaleSeries(ctx context.Context, ts time.Time) { return } app := g.opts.Appendable.Appender(ctx) + app.SetOptions(g.appOpts) queryOffset := g.QueryOffset() for _, s := range g.staleSeries { // Rule that produced series no longer configured, mark it stale. diff --git a/rules/manager_test.go b/rules/manager_test.go index 198d6bd07..6afac993d 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1195,6 +1195,53 @@ func countStaleNaN(t *testing.T, st storage.Storage) int { return c } +func TestRuleMovedBetweenGroups(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + storage := teststorage.New(t, 600000) + defer storage.Close() + opts := promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + engine := promql.NewEngine(opts) + ruleManager := NewManager(&ManagerOptions{ + Appendable: storage, + Queryable: storage, + QueryFunc: EngineQueryFunc(engine, storage), + Context: context.Background(), + Logger: promslog.NewNopLogger(), + }) + var stopped bool + ruleManager.start() + defer func() { + if !stopped { + ruleManager.Stop() + } + }() + + rule2 := "fixtures/rules2.yaml" + rule1 := "fixtures/rules1.yaml" + + // Load initial configuration of rules2 + require.NoError(t, ruleManager.Update(1*time.Second, []string{rule2}, labels.EmptyLabels(), "", nil)) + + // Wait for rule to be evaluated + time.Sleep(3 * time.Second) + + // Reload configuration of rules1 + require.NoError(t, ruleManager.Update(1*time.Second, []string{rule1}, labels.EmptyLabels(), "", nil)) + + // Wait for rule to be evaluated in new location and potential staleness marker + time.Sleep(3 * time.Second) + + require.Equal(t, 0, countStaleNaN(t, storage)) // Not expecting any stale markers. +} + func TestGroupHasAlertingRules(t *testing.T) { tests := []struct { group *Group diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index 4f7918f79..12a56d707 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -43,6 +43,8 @@ func (a nopAppendable) Appender(_ context.Context) storage.Appender { type nopAppender struct{} +func (a nopAppender) SetOptions(opts *storage.AppendOptions) {} + func (a nopAppender) Append(storage.SeriesRef, labels.Labels, int64, float64) (storage.SeriesRef, error) { return 0, nil } @@ -114,6 +116,8 @@ type collectResultAppender struct { pendingMetadata []metadata.Metadata } +func (a *collectResultAppender) SetOptions(opts *storage.AppendOptions) {} + func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { a.mtx.Lock() defer a.mtx.Unlock() diff --git a/scrape/scrape.go b/scrape/scrape.go index f5f02d245..7e270bb3a 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1864,7 +1864,9 @@ loop: if err == nil { sl.cache.forEachStale(func(lset labels.Labels) bool { // Series no longer exposed, mark it stale. + app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true}) _, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN)) + app.SetOptions(nil) switch { case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp): // Do not count these in logging, as this is expected if a target @@ -1970,7 +1972,7 @@ func (sl *scrapeLoop) report(app storage.Appender, start time.Time, duration tim func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err error) { ts := timestamp.FromTime(start) - + app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true}) stale := math.Float64frombits(value.StaleNaN) b := labels.NewBuilder(labels.EmptyLabels()) diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index da964a230..f75e1db89 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -86,6 +86,97 @@ func TestNewScrapePool(t *testing.T) { require.NotNil(t, sp.newLoop, "newLoop function not initialized.") } +func TestStorageHandlesOutOfOrderTimestamps(t *testing.T) { + // Test with default OutOfOrderTimeWindow (0) + t.Run("Out-Of-Order Sample Disabled", func(t *testing.T) { + s := teststorage.New(t) + defer s.Close() + + runScrapeLoopTest(t, s, false) + }) + + // Test with specific OutOfOrderTimeWindow (600000) + t.Run("Out-Of-Order Sample Enabled", func(t *testing.T) { + s := teststorage.New(t, 600000) + defer s.Close() + + runScrapeLoopTest(t, s, true) + }) +} + +func runScrapeLoopTest(t *testing.T, s *teststorage.TestStorage, expectOutOfOrder bool) { + // Create an appender for adding samples to the storage. + app := s.Appender(context.Background()) + capp := &collectResultAppender{next: app} + sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return capp }, 0) + + // Current time for generating timestamps. + now := time.Now() + + // Calculate timestamps for the samples based on the current time. + now = now.Truncate(time.Minute) // round down the now timestamp to the nearest minute + timestampInorder1 := now + timestampOutOfOrder := now.Add(-5 * time.Minute) + timestampInorder2 := now.Add(5 * time.Minute) + + slApp := sl.appender(context.Background()) + _, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", timestampInorder1) + require.NoError(t, err) + + _, _, _, err = sl.append(slApp, []byte(`metric_a{a="1",b="1"} 2`), "", timestampOutOfOrder) + require.NoError(t, err) + + _, _, _, err = sl.append(slApp, []byte(`metric_a{a="1",b="1"} 3`), "", timestampInorder2) + require.NoError(t, err) + + require.NoError(t, slApp.Commit()) + + // Query the samples back from the storage. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + q, err := s.Querier(time.Time{}.UnixNano(), time.Now().UnixNano()) + require.NoError(t, err) + defer q.Close() + + // Use a matcher to filter the metric name. + series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "metric_a")) + + var results []floatSample + for series.Next() { + it := series.At().Iterator(nil) + for it.Next() == chunkenc.ValFloat { + t, v := it.At() + results = append(results, floatSample{ + metric: series.At().Labels(), + t: t, + f: v, + }) + } + require.NoError(t, it.Err()) + } + require.NoError(t, series.Err()) + + // Define the expected results + want := []floatSample{ + { + metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"), + t: timestamp.FromTime(timestampInorder1), + f: 1, + }, + { + metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"), + t: timestamp.FromTime(timestampInorder2), + f: 3, + }, + } + + if expectOutOfOrder { + require.NotEqual(t, want, results, "Expected results to include out-of-order sample:\n%s", results) + } else { + require.Equal(t, want, results, "Appended samples not as expected:\n%s", results) + } +} + func TestDroppedTargetsList(t *testing.T) { var ( app = &nopAppendable{} @@ -1157,6 +1248,87 @@ func BenchmarkScrapeLoopAppendOM(b *testing.B) { } } +func TestSetOptionsHandlingStaleness(t *testing.T) { + s := teststorage.New(t, 600000) + defer s.Close() + + signal := make(chan struct{}, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Function to run the scrape loop + runScrapeLoop := func(ctx context.Context, t *testing.T, cue int, action func(*scrapeLoop)) { + var ( + scraper = &testScraper{} + app = func(ctx context.Context) storage.Appender { + return s.Appender(ctx) + } + ) + sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond) + numScrapes := 0 + scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { + numScrapes++ + if numScrapes == cue { + action(sl) + } + w.Write([]byte(fmt.Sprintf("metric_a{a=\"1\",b=\"1\"} %d\n", 42+numScrapes))) + return nil + } + sl.run(nil) + } + go func() { + runScrapeLoop(ctx, t, 2, func(sl *scrapeLoop) { + go sl.stop() + // Wait a bit then start a new target. + time.Sleep(100 * time.Millisecond) + go func() { + runScrapeLoop(ctx, t, 4, func(_ *scrapeLoop) { + cancel() + }) + signal <- struct{}{} + }() + }) + }() + + select { + case <-signal: + case <-time.After(10 * time.Second): + t.Fatalf("Scrape wasn't stopped.") + } + + ctx1, cancel := context.WithCancel(context.Background()) + defer cancel() + + q, err := s.Querier(0, time.Now().UnixNano()) + + require.NoError(t, err) + defer q.Close() + + series := q.Select(ctx1, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "metric_a")) + + var results []floatSample + for series.Next() { + it := series.At().Iterator(nil) + for it.Next() == chunkenc.ValFloat { + t, v := it.At() + results = append(results, floatSample{ + metric: series.At().Labels(), + t: t, + f: v, + }) + } + require.NoError(t, it.Err()) + } + require.NoError(t, series.Err()) + var c int + for _, s := range results { + if value.IsStaleNaN(s.f) { + c++ + } + } + require.Equal(t, 0, c, "invalid count of staleness markers after stopping the engine") +} + func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { appender := &collectResultAppender{} var ( @@ -4032,7 +4204,6 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrapeForTimestampedMetrics(t * case <-time.After(5 * time.Second): t.Fatalf("Scrape wasn't stopped.") } - // 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for // each scrape successful or not. require.Len(t, appender.resultFloats, 27, "Appended samples not as expected:\n%s", appender) diff --git a/storage/fanout.go b/storage/fanout.go index 6ff517895..4d076788a 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -147,6 +147,16 @@ type fanoutAppender struct { secondaries []Appender } +// SetOptions propagates the hints to both primary and secondary appenders. +func (f *fanoutAppender) SetOptions(opts *AppendOptions) { + if f.primary != nil { + f.primary.SetOptions(opts) + } + for _, appender := range f.secondaries { + appender.SetOptions(opts) + } +} + func (f *fanoutAppender) Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error) { ref, err := f.primary.Append(ref, l, t, v) if err != nil { diff --git a/storage/interface.go b/storage/interface.go index b7ef14ce9..56bb53dfe 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -243,6 +243,10 @@ func (f QueryableFunc) Querier(mint, maxt int64) (Querier, error) { return f(mint, maxt) } +type AppendOptions struct { + DiscardOutOfOrder bool +} + // Appender provides batched appends against a storage. // It must be completed with a call to Commit or Rollback and must not be reused afterwards. // @@ -271,6 +275,10 @@ type Appender interface { // Appender has to be discarded after rollback. Rollback() error + // SetOptions configures the appender with specific append options such as + // discarding out-of-order samples even if out-of-order is enabled in the TSDB. + SetOptions(opts *AppendOptions) + ExemplarAppender HistogramAppender MetadataUpdater diff --git a/storage/remote/write.go b/storage/remote/write.go index 20e4ed10d..00e4fa3a0 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -278,6 +278,7 @@ func (rws *WriteStorage) Close() error { type timestampTracker struct { writeStorage *WriteStorage + appendOptions *storage.AppendOptions samples int64 exemplars int64 histograms int64 @@ -285,6 +286,10 @@ type timestampTracker struct { highestRecvTimestamp *maxTimestamp } +func (t *timestampTracker) SetOptions(opts *storage.AppendOptions) { + t.appendOptions = opts +} + // Append implements storage.Appender. func (t *timestampTracker) Append(_ storage.SeriesRef, _ labels.Labels, ts int64, _ float64) (storage.SeriesRef, error) { t.samples++ diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index d91949131..580c7c143 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -833,6 +833,10 @@ func (m *mockAppendable) Appender(_ context.Context) storage.Appender { return m } +func (m *mockAppendable) SetOptions(opts *storage.AppendOptions) { + panic("unimplemented") +} + func (m *mockAppendable) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { if m.appendSampleErr != nil { return 0, m.appendSampleErr diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index b2c40b201..5de84c93a 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -763,6 +763,7 @@ func (db *DB) Close() error { type appender struct { *DB + hints *storage.AppendOptions pendingSeries []record.RefSeries pendingSamples []record.RefSample @@ -783,6 +784,10 @@ type appender struct { floatHistogramSeries []*memSeries } +func (a *appender) SetOptions(opts *storage.AppendOptions) { + a.hints = opts +} + func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { // series references and chunk references are identical for agent mode. headRef := chunks.HeadSeriesRef(ref) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index adfd5d4bf..170e74044 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -40,6 +40,12 @@ type initAppender struct { var _ storage.GetRef = &initAppender{} +func (a *initAppender) SetOptions(opts *storage.AppendOptions) { + if a.app != nil { + a.app.SetOptions(opts) + } +} + func (a *initAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { if a.app != nil { return a.app.Append(ref, lset, t, v) @@ -326,6 +332,11 @@ type headAppender struct { appendID, cleanupAppendIDsBelow uint64 closed bool + hints *storage.AppendOptions +} + +func (a *headAppender) SetOptions(opts *storage.AppendOptions) { + a.hints = opts } func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { @@ -359,13 +370,18 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 } s.Lock() + + defer s.Unlock() // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. - _, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, a.oooTimeWindow) + isOOO, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, a.oooTimeWindow) if err == nil { + if isOOO && a.hints != nil && a.hints.DiscardOutOfOrder { + a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Inc() + return 0, storage.ErrOutOfOrderSample + } s.pendingCommit = true } - s.Unlock() if delta > 0 { a.head.metrics.oooHistogram.Observe(float64(delta) / 1000) } diff --git a/util/teststorage/storage.go b/util/teststorage/storage.go index 7d1f9dda2..e15d591e0 100644 --- a/util/teststorage/storage.go +++ b/util/teststorage/storage.go @@ -30,15 +30,15 @@ import ( // New returns a new TestStorage for testing purposes // that removes all associated files on closing. -func New(t testutil.T) *TestStorage { - stor, err := NewWithError() +func New(t testutil.T, outOfOrderTimeWindow ...int64) *TestStorage { + stor, err := NewWithError(outOfOrderTimeWindow...) require.NoError(t, err) return stor } // NewWithError returns a new TestStorage for user facing tests, which reports // errors directly. -func NewWithError() (*TestStorage, error) { +func NewWithError(outOfOrderTimeWindow ...int64) (*TestStorage, error) { dir, err := os.MkdirTemp("", "test_storage") if err != nil { return nil, fmt.Errorf("opening test directory: %w", err) @@ -51,6 +51,14 @@ func NewWithError() (*TestStorage, error) { opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond) opts.RetentionDuration = 0 opts.EnableNativeHistograms = true + + // Set OutOfOrderTimeWindow if provided, otherwise use default (0) + if len(outOfOrderTimeWindow) > 0 { + opts.OutOfOrderTimeWindow = outOfOrderTimeWindow[0] + } else { + opts.OutOfOrderTimeWindow = 0 // Default value is zero + } + db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats()) if err != nil { return nil, fmt.Errorf("opening test storage: %w", err)