diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index b2cb534f1..13e732f96 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -230,6 +230,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols level.Info(logger).Log("msg", "Experimental native histogram support enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols)) + case "ooo-native-histograms": + c.tsdb.EnableOOONativeHistograms = true + level.Info(logger).Log("msg", "Experimental out-of-order native histogram ingestion enabled. This will only take effect if OutOfOrderTimeWindow is > 0 and if EnableNativeHistograms = true") case "created-timestamp-zero-ingestion": c.scrape.EnableCreatedTimestampZeroIngestion = true // Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers. @@ -1735,6 +1738,7 @@ type tsdbOptions struct { EnableNativeHistograms bool EnableDelayedCompaction bool EnableOverlappingCompaction bool + EnableOOONativeHistograms bool } func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { @@ -1754,6 +1758,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { MaxExemplars: opts.MaxExemplars, EnableMemorySnapshotOnShutdown: opts.EnableMemorySnapshotOnShutdown, EnableNativeHistograms: opts.EnableNativeHistograms, + EnableOOONativeHistograms: opts.EnableOOONativeHistograms, OutOfOrderTimeWindow: opts.OutOfOrderTimeWindow, EnableDelayedCompaction: opts.EnableDelayedCompaction, EnableOverlappingCompaction: opts.EnableOverlappingCompaction, diff --git a/storage/interface.go b/storage/interface.go index a4608e08c..9654c8833 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -43,6 +43,7 @@ var ( ErrExemplarLabelLength = fmt.Errorf("label length for exemplar exceeds maximum of %d UTF-8 characters", exemplar.ExemplarMaxLabelSetLength) ErrExemplarsDisabled = fmt.Errorf("exemplar storage is disabled or max exemplars is less than or equal to 0") ErrNativeHistogramsDisabled = fmt.Errorf("native histograms are disabled") + ErrOOONativeHistogramsDisabled = fmt.Errorf("out-of-order native histogram ingestion is disabled") // ErrOutOfOrderCT indicates failed append of CT to the storage // due to CT being older the then newer sample. diff --git a/tsdb/db.go b/tsdb/db.go index a5b3a5e60..64e1158d4 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -173,6 +173,12 @@ type Options struct { // EnableNativeHistograms enables the ingestion of native histograms. EnableNativeHistograms bool + // EnableOOONativeHistograms enables the ingestion of OOO native histograms. + // It will only take effect if EnableNativeHistograms is set to true and the + // OutOfOrderTimeWindow is > 0. This flag will be removed after testing of + // OOO Native Histogram ingestion is complete. + EnableOOONativeHistograms bool + // OutOfOrderTimeWindow specifies how much out of order is allowed, if any. // This can change during run-time, so this value from here should only be used // while initialising. @@ -948,6 +954,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs headOpts.MaxExemplars.Store(opts.MaxExemplars) headOpts.EnableMemorySnapshotOnShutdown = opts.EnableMemorySnapshotOnShutdown headOpts.EnableNativeHistograms.Store(opts.EnableNativeHistograms) + headOpts.EnableOOONativeHistograms.Store(opts.EnableOOONativeHistograms) headOpts.OutOfOrderTimeWindow.Store(opts.OutOfOrderTimeWindow) headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax) headOpts.EnableSharding = opts.EnableSharding @@ -1172,6 +1179,16 @@ func (db *DB) DisableNativeHistograms() { db.head.DisableNativeHistograms() } +// EnableOOONativeHistograms enables the ingestion of out-of-order native histograms. +func (db *DB) EnableOOONativeHistograms() { + db.head.EnableOOONativeHistograms() +} + +// DisableOOONativeHistograms disables the ingestion of out-of-order native histograms. +func (db *DB) DisableOOONativeHistograms() { + db.head.DisableOOONativeHistograms() +} + // dbAppender wraps the DB's head appender and triggers compactions on commit // if necessary. type dbAppender struct { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 8db09d3f4..ef9637748 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -102,23 +102,9 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str for ss.Next() { series := ss.At() - samples := []chunks.Sample{} it = series.Iterator(it) - for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() { - switch typ { - case chunkenc.ValFloat: - ts, v := it.At() - samples = append(samples, sample{t: ts, f: v}) - case chunkenc.ValHistogram: - ts, h := it.AtHistogram(nil) - samples = append(samples, sample{t: ts, h: h}) - case chunkenc.ValFloatHistogram: - ts, fh := it.AtFloatHistogram(nil) - samples = append(samples, sample{t: ts, fh: fh}) - default: - t.Fatalf("unknown sample type in query %s", typ.String()) - } - } + samples, err := storage.ExpandSamples(it, newSample) + require.NoError(t, err) require.NoError(t, it.Err()) if len(samples) == 0 { @@ -3996,6 +3982,307 @@ func newTestDB(t *testing.T) *DB { } func TestOOOWALWrite(t *testing.T) { + minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() } + + s := labels.NewSymbolTable() + scratchBuilder1 := labels.NewScratchBuilderWithSymbolTable(s, 1) + scratchBuilder1.Add("l", "v1") + s1 := scratchBuilder1.Labels() + scratchBuilder2 := labels.NewScratchBuilderWithSymbolTable(s, 1) + scratchBuilder2.Add("l", "v2") + s2 := scratchBuilder2.Labels() + + scenarios := map[string]struct { + appendSample func(app storage.Appender, l labels.Labels, mins int64) (storage.SeriesRef, error) + expectedOOORecords []interface{} + expectedInORecords []interface{} + }{ + "float": { + appendSample: func(app storage.Appender, l labels.Labels, mins int64) (storage.SeriesRef, error) { + seriesRef, err := app.Append(0, l, minutes(mins), float64(mins)) + require.NoError(t, err) + return seriesRef, nil + }, + expectedOOORecords: []interface{}{ + // The MmapRef in this are not hand calculated, and instead taken from the test run. + // What is important here is the order of records, and that MmapRef increases for each record. + []record.RefMmapMarker{ + {Ref: 1}, + }, + []record.RefSample{ + {Ref: 1, T: minutes(40), V: 40}, + }, + + []record.RefMmapMarker{ + {Ref: 2}, + }, + []record.RefSample{ + {Ref: 2, T: minutes(42), V: 42}, + }, + + []record.RefSample{ + {Ref: 2, T: minutes(45), V: 45}, + {Ref: 1, T: minutes(35), V: 35}, + }, + []record.RefMmapMarker{ // 3rd sample, hence m-mapped. + {Ref: 1, MmapRef: 0x100000000 + 8}, + }, + []record.RefSample{ + {Ref: 1, T: minutes(36), V: 36}, + {Ref: 1, T: minutes(37), V: 37}, + }, + + []record.RefMmapMarker{ // 3rd sample, hence m-mapped. + {Ref: 1, MmapRef: 0x100000000 + 58}, + }, + []record.RefSample{ // Does not contain the in-order sample here. + {Ref: 1, T: minutes(50), V: 50}, + }, + + // Single commit but multiple OOO records. + []record.RefMmapMarker{ + {Ref: 2, MmapRef: 0x100000000 + 107}, + }, + []record.RefSample{ + {Ref: 2, T: minutes(50), V: 50}, + {Ref: 2, T: minutes(51), V: 51}, + }, + []record.RefMmapMarker{ + {Ref: 2, MmapRef: 0x100000000 + 156}, + }, + []record.RefSample{ + {Ref: 2, T: minutes(52), V: 52}, + {Ref: 2, T: minutes(53), V: 53}, + }, + }, + expectedInORecords: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: s1}, + {Ref: 2, Labels: s2}, + }, + []record.RefSample{ + {Ref: 1, T: minutes(60), V: 60}, + {Ref: 2, T: minutes(60), V: 60}, + }, + []record.RefSample{ + {Ref: 1, T: minutes(40), V: 40}, + }, + []record.RefSample{ + {Ref: 2, T: minutes(42), V: 42}, + }, + []record.RefSample{ + {Ref: 2, T: minutes(45), V: 45}, + {Ref: 1, T: minutes(35), V: 35}, + {Ref: 1, T: minutes(36), V: 36}, + {Ref: 1, T: minutes(37), V: 37}, + }, + []record.RefSample{ // Contains both in-order and ooo sample. + {Ref: 1, T: minutes(50), V: 50}, + {Ref: 2, T: minutes(65), V: 65}, + }, + []record.RefSample{ + {Ref: 2, T: minutes(50), V: 50}, + {Ref: 2, T: minutes(51), V: 51}, + {Ref: 2, T: minutes(52), V: 52}, + {Ref: 2, T: minutes(53), V: 53}, + }, + }, + }, + "integer histogram": { + appendSample: func(app storage.Appender, l labels.Labels, mins int64) (storage.SeriesRef, error) { + seriesRef, err := app.AppendHistogram(0, l, minutes(mins), tsdbutil.GenerateTestHistogram(int(mins)), nil) + require.NoError(t, err) + return seriesRef, nil + }, + expectedOOORecords: []interface{}{ + // The MmapRef in this are not hand calculated, and instead taken from the test run. + // What is important here is the order of records, and that MmapRef increases for each record. + []record.RefMmapMarker{ + {Ref: 1}, + }, + []record.RefHistogramSample{ + {Ref: 1, T: minutes(40), H: tsdbutil.GenerateTestHistogram(40)}, + }, + + []record.RefMmapMarker{ + {Ref: 2}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: minutes(42), H: tsdbutil.GenerateTestHistogram(42)}, + }, + + []record.RefHistogramSample{ + {Ref: 2, T: minutes(45), H: tsdbutil.GenerateTestHistogram(45)}, + {Ref: 1, T: minutes(35), H: tsdbutil.GenerateTestHistogram(35)}, + }, + []record.RefMmapMarker{ // 3rd sample, hence m-mapped. + {Ref: 1, MmapRef: 0x100000000 + 8}, + }, + []record.RefHistogramSample{ + {Ref: 1, T: minutes(36), H: tsdbutil.GenerateTestHistogram(36)}, + {Ref: 1, T: minutes(37), H: tsdbutil.GenerateTestHistogram(37)}, + }, + + []record.RefMmapMarker{ // 3rd sample, hence m-mapped. + {Ref: 1, MmapRef: 0x100000000 + 89}, + }, + []record.RefHistogramSample{ // Does not contain the in-order sample here. + {Ref: 1, T: minutes(50), H: tsdbutil.GenerateTestHistogram(50)}, + }, + + // Single commit but multiple OOO records. + []record.RefMmapMarker{ + {Ref: 2, MmapRef: 0x100000000 + 172}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: minutes(50), H: tsdbutil.GenerateTestHistogram(50)}, + {Ref: 2, T: minutes(51), H: tsdbutil.GenerateTestHistogram(51)}, + }, + []record.RefMmapMarker{ + {Ref: 2, MmapRef: 0x100000000 + 257}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: minutes(52), H: tsdbutil.GenerateTestHistogram(52)}, + {Ref: 2, T: minutes(53), H: tsdbutil.GenerateTestHistogram(53)}, + }, + }, + expectedInORecords: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: s1}, + {Ref: 2, Labels: s2}, + }, + []record.RefHistogramSample{ + {Ref: 1, T: minutes(60), H: tsdbutil.GenerateTestHistogram(60)}, + {Ref: 2, T: minutes(60), H: tsdbutil.GenerateTestHistogram(60)}, + }, + []record.RefHistogramSample{ + {Ref: 1, T: minutes(40), H: tsdbutil.GenerateTestHistogram(40)}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: minutes(42), H: tsdbutil.GenerateTestHistogram(42)}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: minutes(45), H: tsdbutil.GenerateTestHistogram(45)}, + {Ref: 1, T: minutes(35), H: tsdbutil.GenerateTestHistogram(35)}, + {Ref: 1, T: minutes(36), H: tsdbutil.GenerateTestHistogram(36)}, + {Ref: 1, T: minutes(37), H: tsdbutil.GenerateTestHistogram(37)}, + }, + []record.RefHistogramSample{ // Contains both in-order and ooo sample. + {Ref: 1, T: minutes(50), H: tsdbutil.GenerateTestHistogram(50)}, + {Ref: 2, T: minutes(65), H: tsdbutil.GenerateTestHistogram(65)}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: minutes(50), H: tsdbutil.GenerateTestHistogram(50)}, + {Ref: 2, T: minutes(51), H: tsdbutil.GenerateTestHistogram(51)}, + {Ref: 2, T: minutes(52), H: tsdbutil.GenerateTestHistogram(52)}, + {Ref: 2, T: minutes(53), H: tsdbutil.GenerateTestHistogram(53)}, + }, + }, + }, + "float histogram": { + appendSample: func(app storage.Appender, l labels.Labels, mins int64) (storage.SeriesRef, error) { + seriesRef, err := app.AppendHistogram(0, l, minutes(mins), nil, tsdbutil.GenerateTestFloatHistogram(int(mins))) + require.NoError(t, err) + return seriesRef, nil + }, + expectedOOORecords: []interface{}{ + // The MmapRef in this are not hand calculated, and instead taken from the test run. + // What is important here is the order of records, and that MmapRef increases for each record. + []record.RefMmapMarker{ + {Ref: 1}, + }, + []record.RefFloatHistogramSample{ + {Ref: 1, T: minutes(40), FH: tsdbutil.GenerateTestFloatHistogram(40)}, + }, + + []record.RefMmapMarker{ + {Ref: 2}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(42), FH: tsdbutil.GenerateTestFloatHistogram(42)}, + }, + + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(45), FH: tsdbutil.GenerateTestFloatHistogram(45)}, + {Ref: 1, T: minutes(35), FH: tsdbutil.GenerateTestFloatHistogram(35)}, + }, + []record.RefMmapMarker{ // 3rd sample, hence m-mapped. + {Ref: 1, MmapRef: 0x100000000 + 8}, + }, + []record.RefFloatHistogramSample{ + {Ref: 1, T: minutes(36), FH: tsdbutil.GenerateTestFloatHistogram(36)}, + {Ref: 1, T: minutes(37), FH: tsdbutil.GenerateTestFloatHistogram(37)}, + }, + + []record.RefMmapMarker{ // 3rd sample, hence m-mapped. + {Ref: 1, MmapRef: 0x100000000 + 177}, + }, + []record.RefFloatHistogramSample{ // Does not contain the in-order sample here. + {Ref: 1, T: minutes(50), FH: tsdbutil.GenerateTestFloatHistogram(50)}, + }, + + // Single commit but multiple OOO records. + []record.RefMmapMarker{ + {Ref: 2, MmapRef: 0x100000000 + 348}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(50), FH: tsdbutil.GenerateTestFloatHistogram(50)}, + {Ref: 2, T: minutes(51), FH: tsdbutil.GenerateTestFloatHistogram(51)}, + }, + []record.RefMmapMarker{ + {Ref: 2, MmapRef: 0x100000000 + 521}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(52), FH: tsdbutil.GenerateTestFloatHistogram(52)}, + {Ref: 2, T: minutes(53), FH: tsdbutil.GenerateTestFloatHistogram(53)}, + }, + }, + expectedInORecords: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: s1}, + {Ref: 2, Labels: s2}, + }, + []record.RefFloatHistogramSample{ + {Ref: 1, T: minutes(60), FH: tsdbutil.GenerateTestFloatHistogram(60)}, + {Ref: 2, T: minutes(60), FH: tsdbutil.GenerateTestFloatHistogram(60)}, + }, + []record.RefFloatHistogramSample{ + {Ref: 1, T: minutes(40), FH: tsdbutil.GenerateTestFloatHistogram(40)}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(42), FH: tsdbutil.GenerateTestFloatHistogram(42)}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(45), FH: tsdbutil.GenerateTestFloatHistogram(45)}, + {Ref: 1, T: minutes(35), FH: tsdbutil.GenerateTestFloatHistogram(35)}, + {Ref: 1, T: minutes(36), FH: tsdbutil.GenerateTestFloatHistogram(36)}, + {Ref: 1, T: minutes(37), FH: tsdbutil.GenerateTestFloatHistogram(37)}, + }, + []record.RefFloatHistogramSample{ // Contains both in-order and ooo sample. + {Ref: 1, T: minutes(50), FH: tsdbutil.GenerateTestFloatHistogram(50)}, + {Ref: 2, T: minutes(65), FH: tsdbutil.GenerateTestFloatHistogram(65)}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(50), FH: tsdbutil.GenerateTestFloatHistogram(50)}, + {Ref: 2, T: minutes(51), FH: tsdbutil.GenerateTestFloatHistogram(51)}, + {Ref: 2, T: minutes(52), FH: tsdbutil.GenerateTestFloatHistogram(52)}, + {Ref: 2, T: minutes(53), FH: tsdbutil.GenerateTestFloatHistogram(53)}, + }, + }, + }, + } + for name, scenario := range scenarios { + t.Run(name, func(t *testing.T) { + testOOOWALWrite(t, scenario.appendSample, scenario.expectedOOORecords, scenario.expectedInORecords) + }) + } +} + +func testOOOWALWrite(t *testing.T, + appendSample func(app storage.Appender, l labels.Labels, mins int64) (storage.SeriesRef, error), + expectedOOORecords []interface{}, + expectedInORecords []interface{}, +) { dir := t.TempDir() opts := DefaultOptions() @@ -4004,18 +4291,14 @@ func TestOOOWALWrite(t *testing.T) { db, err := Open(dir, nil, nil, opts, nil) require.NoError(t, err) + db.EnableNativeHistograms() + db.EnableOOONativeHistograms() t.Cleanup(func() { require.NoError(t, db.Close()) }) s1, s2 := labels.FromStrings("l", "v1"), labels.FromStrings("l", "v2") - minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() } - - appendSample := func(app storage.Appender, l labels.Labels, mins int64) { - _, err = app.Append(0, l, minutes(mins), float64(mins)) - require.NoError(t, err) - } // Ingest sample at 1h. app := db.Appender(context.Background()) @@ -4055,92 +4338,6 @@ func TestOOOWALWrite(t *testing.T) { appendSample(app, s2, 53) require.NoError(t, app.Commit()) - // The MmapRef in this are not hand calculated, and instead taken from the test run. - // What is important here is the order of records, and that MmapRef increases for each record. - oooRecords := []interface{}{ - []record.RefMmapMarker{ - {Ref: 1}, - }, - []record.RefSample{ - {Ref: 1, T: minutes(40), V: 40}, - }, - - []record.RefMmapMarker{ - {Ref: 2}, - }, - []record.RefSample{ - {Ref: 2, T: minutes(42), V: 42}, - }, - - []record.RefSample{ - {Ref: 2, T: minutes(45), V: 45}, - {Ref: 1, T: minutes(35), V: 35}, - }, - []record.RefMmapMarker{ // 3rd sample, hence m-mapped. - {Ref: 1, MmapRef: 4294967304}, - }, - []record.RefSample{ - {Ref: 1, T: minutes(36), V: 36}, - {Ref: 1, T: minutes(37), V: 37}, - }, - - []record.RefMmapMarker{ // 3rd sample, hence m-mapped. - {Ref: 1, MmapRef: 4294967354}, - }, - []record.RefSample{ // Does not contain the in-order sample here. - {Ref: 1, T: minutes(50), V: 50}, - }, - - // Single commit but multiple OOO records. - []record.RefMmapMarker{ - {Ref: 2, MmapRef: 4294967403}, - }, - []record.RefSample{ - {Ref: 2, T: minutes(50), V: 50}, - {Ref: 2, T: minutes(51), V: 51}, - }, - []record.RefMmapMarker{ - {Ref: 2, MmapRef: 4294967452}, - }, - []record.RefSample{ - {Ref: 2, T: minutes(52), V: 52}, - {Ref: 2, T: minutes(53), V: 53}, - }, - } - - inOrderRecords := []interface{}{ - []record.RefSeries{ - {Ref: 1, Labels: s1}, - {Ref: 2, Labels: s2}, - }, - []record.RefSample{ - {Ref: 1, T: minutes(60), V: 60}, - {Ref: 2, T: minutes(60), V: 60}, - }, - []record.RefSample{ - {Ref: 1, T: minutes(40), V: 40}, - }, - []record.RefSample{ - {Ref: 2, T: minutes(42), V: 42}, - }, - []record.RefSample{ - {Ref: 2, T: minutes(45), V: 45}, - {Ref: 1, T: minutes(35), V: 35}, - {Ref: 1, T: minutes(36), V: 36}, - {Ref: 1, T: minutes(37), V: 37}, - }, - []record.RefSample{ // Contains both in-order and ooo sample. - {Ref: 1, T: minutes(50), V: 50}, - {Ref: 2, T: minutes(65), V: 65}, - }, - []record.RefSample{ - {Ref: 2, T: minutes(50), V: 50}, - {Ref: 2, T: minutes(51), V: 51}, - {Ref: 2, T: minutes(52), V: 52}, - {Ref: 2, T: minutes(53), V: 53}, - }, - } - getRecords := func(walDir string) []interface{} { sr, err := wlog.NewSegmentsReader(walDir) require.NoError(t, err) @@ -4149,10 +4346,8 @@ func TestOOOWALWrite(t *testing.T) { require.NoError(t, sr.Close()) }() - var ( - records []interface{} - dec record.Decoder = record.NewDecoder(labels.NewSymbolTable()) - ) + var records []interface{} + dec := record.NewDecoder(nil) for r.Next() { rec := r.Record() switch typ := dec.Type(rec); typ { @@ -4168,6 +4363,14 @@ func TestOOOWALWrite(t *testing.T) { markers, err := dec.MmapMarkers(rec, nil) require.NoError(t, err) records = append(records, markers) + case record.HistogramSamples: + histogramSamples, err := dec.HistogramSamples(rec, nil) + require.NoError(t, err) + records = append(records, histogramSamples) + case record.FloatHistogramSamples: + floatHistogramSamples, err := dec.FloatHistogramSamples(rec, nil) + require.NoError(t, err) + records = append(records, floatHistogramSamples) default: t.Fatalf("got a WAL record that is not series or samples: %v", typ) } @@ -4178,11 +4381,11 @@ func TestOOOWALWrite(t *testing.T) { // The normal WAL. actRecs := getRecords(path.Join(dir, "wal")) - testutil.RequireEqual(t, inOrderRecords, actRecs) + require.Equal(t, expectedInORecords, actRecs) // The WBL. actRecs = getRecords(path.Join(dir, wlog.WblDirName)) - testutil.RequireEqual(t, oooRecords, actRecs) + require.Equal(t, expectedOOORecords, actRecs) } // Tests https://github.com/prometheus/prometheus/issues/10291#issuecomment-1044373110. @@ -4495,6 +4698,160 @@ func TestMetadataAssertInMemoryData(t *testing.T) { require.Equal(t, *reopenDB.head.series.getByHash(s4.Hash(), s4).meta, m4) } +// TestMultipleEncodingsCommitOrder mainly serves to demonstrate when happens when committing a batch of samples for the +// same series when there are multiple encodings. Commit() will process all float samples before histogram samples. This +// means that if histograms are appended before floats, the histograms could be marked as OOO when they are committed. +// While possible, this shouldn't happen very often - you need the same series to be ingested as both a float and a +// histogram in a single write request. +func TestMultipleEncodingsCommitOrder(t *testing.T) { + opts := DefaultOptions() + opts.OutOfOrderCapMax = 30 + opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds() + + series1 := labels.FromStrings("foo", "bar1") + + db := openTestDB(t, opts, nil) + db.DisableCompactions() + db.EnableNativeHistograms() + db.EnableOOONativeHistograms() + defer func() { + require.NoError(t, db.Close()) + }() + + addSample := func(app storage.Appender, ts int64, valType chunkenc.ValueType) chunks.Sample { + if valType == chunkenc.ValFloat { + _, err := app.Append(0, labels.FromStrings("foo", "bar1"), ts, float64(ts)) + require.NoError(t, err) + return sample{t: ts, f: float64(ts)} + } + if valType == chunkenc.ValHistogram { + h := tsdbutil.GenerateTestHistogram(int(ts)) + _, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, h, nil) + require.NoError(t, err) + return sample{t: ts, h: h} + } + fh := tsdbutil.GenerateTestFloatHistogram(int(ts)) + _, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, nil, fh) + require.NoError(t, err) + return sample{t: ts, fh: fh} + } + + verifySamples := func(minT, maxT int64, expSamples []chunks.Sample, oooCount int) { + requireEqualOOOSamples(t, oooCount, db) + + // Verify samples querier. + querier, err := db.Querier(minT, maxT) + require.NoError(t, err) + defer querier.Close() + + seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")) + require.Len(t, seriesSet, 1) + gotSamples := seriesSet[series1.String()] + requireEqualSamples(t, series1.String(), expSamples, gotSamples, true) + + // Verify chunks querier. + chunkQuerier, err := db.ChunkQuerier(minT, maxT) + require.NoError(t, err) + defer chunkQuerier.Close() + + chks := queryChunks(t, chunkQuerier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")) + require.NotNil(t, chks[series1.String()]) + require.Len(t, chks, 1) + var gotChunkSamples []chunks.Sample + for _, chunk := range chks[series1.String()] { + it := chunk.Chunk.Iterator(nil) + smpls, err := storage.ExpandSamples(it, newSample) + require.NoError(t, err) + gotChunkSamples = append(gotChunkSamples, smpls...) + require.NoError(t, it.Err()) + } + requireEqualSamples(t, series1.String(), expSamples, gotChunkSamples, true) + } + + var expSamples []chunks.Sample + + // Append samples with different encoding types and then commit them at once. + app := db.Appender(context.Background()) + + for i := 100; i < 105; i++ { + s := addSample(app, int64(i), chunkenc.ValFloat) + expSamples = append(expSamples, s) + } + // These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the + // same batch. + for i := 110; i < 120; i++ { + s := addSample(app, int64(i), chunkenc.ValHistogram) + expSamples = append(expSamples, s) + } + // These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the + // same batch. + for i := 120; i < 130; i++ { + s := addSample(app, int64(i), chunkenc.ValFloatHistogram) + expSamples = append(expSamples, s) + } + // These samples will be marked as in-order as their timestamps are greater than the max timestamp for float + // samples in the same batch. + for i := 140; i < 150; i++ { + s := addSample(app, int64(i), chunkenc.ValFloatHistogram) + expSamples = append(expSamples, s) + } + // These samples will be marked as in-order, even though they're appended after the float histograms from ts 140-150 + // because float samples are processed first and these samples are in-order wrt to the float samples in the batch. + for i := 130; i < 135; i++ { + s := addSample(app, int64(i), chunkenc.ValFloat) + expSamples = append(expSamples, s) + } + + require.NoError(t, app.Commit()) + + sort.Slice(expSamples, func(i, j int) bool { + return expSamples[i].T() < expSamples[j].T() + }) + + // oooCount = 20 because the histograms from 120 - 130 and float histograms from 120 - 130 are detected as OOO. + verifySamples(100, 150, expSamples, 20) + + // Append and commit some in-order histograms by themselves. + app = db.Appender(context.Background()) + for i := 150; i < 160; i++ { + s := addSample(app, int64(i), chunkenc.ValHistogram) + expSamples = append(expSamples, s) + } + require.NoError(t, app.Commit()) + + // oooCount remains at 20 as no new OOO samples have been added. + verifySamples(100, 160, expSamples, 20) + + // Append and commit samples for all encoding types. This time all samples will be treated as OOO because samples + // with newer timestamps have already been committed. + app = db.Appender(context.Background()) + for i := 50; i < 55; i++ { + s := addSample(app, int64(i), chunkenc.ValFloat) + expSamples = append(expSamples, s) + } + for i := 60; i < 70; i++ { + s := addSample(app, int64(i), chunkenc.ValHistogram) + expSamples = append(expSamples, s) + } + for i := 70; i < 75; i++ { + s := addSample(app, int64(i), chunkenc.ValFloat) + expSamples = append(expSamples, s) + } + for i := 80; i < 90; i++ { + s := addSample(app, int64(i), chunkenc.ValFloatHistogram) + expSamples = append(expSamples, s) + } + require.NoError(t, app.Commit()) + + // Sort samples again because OOO samples have been added. + sort.Slice(expSamples, func(i, j int) bool { + return expSamples[i].T() < expSamples[j].T() + }) + + // oooCount = 50 as we've added 30 more OOO samples. + verifySamples(50, 160, expSamples, 50) +} + // TODO(codesome): test more samples incoming once compaction has started. To verify new samples after the start // // are not included in this compaction. @@ -4516,6 +4873,8 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario, addExtraSample opts := DefaultOptions() opts.OutOfOrderCapMax = 30 opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds() + opts.EnableNativeHistograms = true + opts.EnableOOONativeHistograms = true db, err := Open(dir, nil, nil, opts, nil) require.NoError(t, err) @@ -4721,6 +5080,8 @@ func testOOOCompactionWithNormalCompaction(t *testing.T, scenario sampleTypeScen db, err := Open(dir, nil, nil, opts, nil) require.NoError(t, err) db.DisableCompactions() // We want to manually call it. + db.EnableNativeHistograms() + db.EnableOOONativeHistograms() t.Cleanup(func() { require.NoError(t, db.Close()) }) @@ -4826,10 +5187,14 @@ func testOOOCompactionWithDisabledWriteLog(t *testing.T, scenario sampleTypeScen opts.OutOfOrderCapMax = 30 opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds() opts.WALSegmentSize = -1 // disabled WAL and WBL + opts.EnableNativeHistograms = true + opts.EnableOOONativeHistograms = true db, err := Open(dir, nil, nil, opts, nil) require.NoError(t, err) db.DisableCompactions() // We want to manually call it. + db.EnableNativeHistograms() + db.EnableOOONativeHistograms() t.Cleanup(func() { require.NoError(t, db.Close()) }) @@ -4935,6 +5300,8 @@ func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sa opts.OutOfOrderCapMax = 10 opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds() opts.EnableMemorySnapshotOnShutdown = true + opts.EnableNativeHistograms = true + opts.EnableOOONativeHistograms = true db, err := Open(dir, nil, nil, opts, nil) require.NoError(t, err) @@ -5034,7 +5401,67 @@ func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sa verifySamples(90, 109) } -func Test_Querier_OOOQuery(t *testing.T) { +func TestQuerierOOOQuery(t *testing.T) { + scenarios := map[string]struct { + appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) + sampleFunc func(ts int64) chunks.Sample + }{ + "float": { + appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) { + return app.Append(0, labels.FromStrings("foo", "bar1"), ts, float64(ts)) + }, + sampleFunc: func(ts int64) chunks.Sample { + return sample{t: ts, f: float64(ts)} + }, + }, + "integer histogram": { + appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) { + h := tsdbutil.GenerateTestHistogram(int(ts)) + if counterReset { + h.CounterResetHint = histogram.CounterReset + } + return app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, h, nil) + }, + sampleFunc: func(ts int64) chunks.Sample { + return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts))} + }, + }, + "float histogram": { + appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) { + fh := tsdbutil.GenerateTestFloatHistogram(int(ts)) + if counterReset { + fh.CounterResetHint = histogram.CounterReset + } + return app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, nil, fh) + }, + sampleFunc: func(ts int64) chunks.Sample { + return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(ts))} + }, + }, + "integer histogram counter resets": { + // Adding counter reset to all histograms means each histogram will have its own chunk. + appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) { + h := tsdbutil.GenerateTestHistogram(int(ts)) + h.CounterResetHint = histogram.CounterReset // For this scenario, ignore the counterReset argument. + return app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, h, nil) + }, + sampleFunc: func(ts int64) chunks.Sample { + return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts))} + }, + }, + } + + for name, scenario := range scenarios { + t.Run(name, func(t *testing.T) { + testQuerierOOOQuery(t, scenario.appendFunc, scenario.sampleFunc) + }) + } +} + +func testQuerierOOOQuery(t *testing.T, + appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error), + sampleFunc func(ts int64) chunks.Sample, +) { opts := DefaultOptions() opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds() @@ -5044,16 +5471,16 @@ func Test_Querier_OOOQuery(t *testing.T) { defaultFilterFunc := func(t int64) bool { return true } minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() } - addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample, filter filterFunc) ([]chunks.Sample, int) { + addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample, filter filterFunc, counterReset bool) ([]chunks.Sample, int) { app := db.Appender(context.Background()) totalAppended := 0 for m := fromMins; m <= toMins; m += time.Minute.Milliseconds() { if !filter(m / time.Minute.Milliseconds()) { continue } - _, err := app.Append(0, series1, m, float64(m)) + _, err := appendFunc(app, m, counterReset) if m >= queryMinT && m <= queryMaxT { - expSamples = append(expSamples, sample{t: m, f: float64(m)}) + expSamples = append(expSamples, sampleFunc(m)) } require.NoError(t, err) totalAppended++ @@ -5064,10 +5491,11 @@ func Test_Querier_OOOQuery(t *testing.T) { } type sampleBatch struct { - minT int64 - maxT int64 - filter filterFunc - isOOO bool + minT int64 + maxT int64 + filter filterFunc + counterReset bool + isOOO bool } tests := []struct { @@ -5115,6 +5543,31 @@ func Test_Querier_OOOQuery(t *testing.T) { }, }, }, + { + name: "alternating OOO batches", // In order: 100-200 normal. out of order first path: 0, 2, 4, ... 98 (no counter reset), second pass: 1, 3, 5, ... 99 (with counter reset). + queryMinT: minutes(0), + queryMaxT: minutes(200), + batches: []sampleBatch{ + { + minT: minutes(100), + maxT: minutes(200), + filter: defaultFilterFunc, + }, + { + minT: minutes(0), + maxT: minutes(99), + filter: func(t int64) bool { return t%2 == 0 }, + isOOO: true, + }, + { + minT: minutes(0), + maxT: minutes(99), + filter: func(t int64) bool { return t%2 == 1 }, + counterReset: true, + isOOO: true, + }, + }, + }, { name: "query overlapping inorder and ooo samples returns all ingested samples at the end of the interval", oooCap: 30, @@ -5213,6 +5666,8 @@ func Test_Querier_OOOQuery(t *testing.T) { opts.OutOfOrderCapMax = tc.oooCap db := openTestDB(t, opts, nil) db.DisableCompactions() + db.EnableNativeHistograms() + db.EnableOOONativeHistograms() defer func() { require.NoError(t, db.Close()) }() @@ -5221,7 +5676,7 @@ func Test_Querier_OOOQuery(t *testing.T) { var oooSamples, appendedCount int for _, batch := range tc.batches { - expSamples, appendedCount = addSample(db, batch.minT, batch.maxT, tc.queryMinT, tc.queryMaxT, expSamples, batch.filter) + expSamples, appendedCount = addSample(db, batch.minT, batch.maxT, tc.queryMinT, tc.queryMaxT, expSamples, batch.filter, batch.counterReset) if batch.isOOO { oooSamples += appendedCount } @@ -5236,35 +5691,95 @@ func Test_Querier_OOOQuery(t *testing.T) { defer querier.Close() seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")) - require.NotNil(t, seriesSet[series1.String()]) + gotSamples := seriesSet[series1.String()] + require.NotNil(t, gotSamples) require.Len(t, seriesSet, 1) - require.Equal(t, expSamples, seriesSet[series1.String()]) + requireEqualSamples(t, series1.String(), expSamples, gotSamples, true) requireEqualOOOSamples(t, oooSamples, db) }) } } -func Test_ChunkQuerier_OOOQuery(t *testing.T) { - opts := DefaultOptions() - opts.OutOfOrderCapMax = 30 - opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds() - - series1 := labels.FromStrings("foo", "bar1") - - type filterFunc func(t int64) bool - defaultFilterFunc := func(t int64) bool { return true } - +func TestChunkQuerierOOOQuery(t *testing.T) { + scenarios := map[string]struct { + appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) + sampleFunc func(ts int64) chunks.Sample + }{ + "float": { + appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) { + return app.Append(0, labels.FromStrings("foo", "bar1"), ts, float64(ts)) + }, + sampleFunc: func(ts int64) chunks.Sample { + return sample{t: ts, f: float64(ts)} + }, + }, + "integer histogram": { + appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) { + h := tsdbutil.GenerateTestHistogram(int(ts)) + if counterReset { + h.CounterResetHint = histogram.CounterReset + } + return app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, h, nil) + }, + sampleFunc: func(ts int64) chunks.Sample { + return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts))} + }, + }, + "float histogram": { + appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) { + fh := tsdbutil.GenerateTestFloatHistogram(int(ts)) + if counterReset { + fh.CounterResetHint = histogram.CounterReset + } + return app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, nil, fh) + }, + sampleFunc: func(ts int64) chunks.Sample { + return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(ts))} + }, + }, + "integer histogram counter resets": { + // Adding counter reset to all histograms means each histogram will have its own chunk. + appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) { + h := tsdbutil.GenerateTestHistogram(int(ts)) + h.CounterResetHint = histogram.CounterReset // For this scenario, ignore the counterReset argument. + return app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, h, nil) + }, + sampleFunc: func(ts int64) chunks.Sample { + return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts))} + }, + }, + } + for name, scenario := range scenarios { + t.Run(name, func(t *testing.T) { + testChunkQuerierOOOQuery(t, scenario.appendFunc, scenario.sampleFunc) + }) + } +} + +func testChunkQuerierOOOQuery(t *testing.T, + appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error), + sampleFunc func(ts int64) chunks.Sample, +) { + opts := DefaultOptions() + opts.OutOfOrderCapMax = 30 + opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds() + + series1 := labels.FromStrings("foo", "bar1") + + type filterFunc func(t int64) bool + defaultFilterFunc := func(t int64) bool { return true } + minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() } - addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample, filter filterFunc) ([]chunks.Sample, int) { + addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample, filter filterFunc, counterReset bool) ([]chunks.Sample, int) { app := db.Appender(context.Background()) totalAppended := 0 for m := fromMins; m <= toMins; m += time.Minute.Milliseconds() { if !filter(m / time.Minute.Milliseconds()) { continue } - _, err := app.Append(0, series1, m, float64(m)) + _, err := appendFunc(app, m, counterReset) if m >= queryMinT && m <= queryMaxT { - expSamples = append(expSamples, sample{t: m, f: float64(m)}) + expSamples = append(expSamples, sampleFunc(m)) } require.NoError(t, err) totalAppended++ @@ -5275,10 +5790,11 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) { } type sampleBatch struct { - minT int64 - maxT int64 - filter filterFunc - isOOO bool + minT int64 + maxT int64 + filter filterFunc + counterReset bool + isOOO bool } tests := []struct { @@ -5326,6 +5842,31 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) { }, }, }, + { + name: "alternating OOO batches", // In order: 100-200 normal. out of order first path: 0, 2, 4, ... 98 (no counter reset), second pass: 1, 3, 5, ... 99 (with counter reset). + queryMinT: minutes(0), + queryMaxT: minutes(200), + batches: []sampleBatch{ + { + minT: minutes(100), + maxT: minutes(200), + filter: defaultFilterFunc, + }, + { + minT: minutes(0), + maxT: minutes(99), + filter: func(t int64) bool { return t%2 == 0 }, + isOOO: true, + }, + { + minT: minutes(0), + maxT: minutes(99), + filter: func(t int64) bool { return t%2 == 1 }, + counterReset: true, + isOOO: true, + }, + }, + }, { name: "query overlapping inorder and ooo samples returns all ingested samples at the end of the interval", oooCap: 30, @@ -5424,6 +5965,8 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) { opts.OutOfOrderCapMax = tc.oooCap db := openTestDB(t, opts, nil) db.DisableCompactions() + db.EnableNativeHistograms() + db.EnableOOONativeHistograms() defer func() { require.NoError(t, db.Close()) }() @@ -5432,7 +5975,7 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) { var oooSamples, appendedCount int for _, batch := range tc.batches { - expSamples, appendedCount = addSample(db, batch.minT, batch.maxT, tc.queryMinT, tc.queryMaxT, expSamples, batch.filter) + expSamples, appendedCount = addSample(db, batch.minT, batch.maxT, tc.queryMinT, tc.queryMaxT, expSamples, batch.filter, batch.counterReset) if batch.isOOO { oooSamples += appendedCount } @@ -5453,12 +5996,186 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) { var gotSamples []chunks.Sample for _, chunk := range chks[series1.String()] { it := chunk.Chunk.Iterator(nil) - for it.Next() == chunkenc.ValFloat { - ts, v := it.At() - gotSamples = append(gotSamples, sample{t: ts, f: v}) + smpls, err := storage.ExpandSamples(it, newSample) + require.NoError(t, err) + gotSamples = append(gotSamples, smpls...) + require.NoError(t, it.Err()) + } + requireEqualSamples(t, series1.String(), expSamples, gotSamples, true) + }) + } +} + +// TestOOONativeHistogramsWithCounterResets verifies the counter reset headers for in-order and out-of-order samples +// upon ingestion. Note that when the counter reset(s) occur in OOO samples, the header is set to UnknownCounterReset +// rather than CounterReset. This is because with OOO native histogram samples, it cannot be definitely +// determined if a counter reset occurred because the samples are not consecutive, and another sample +// could potentially come in that would change the status of the header. In this case, the UnknownCounterReset +// headers would be re-checked at query time and updated as needed. However, this test is checking the counter +// reset headers at the time of storage. +func TestOOONativeHistogramsWithCounterResets(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + if name == intHistogram || name == floatHistogram { + testOOONativeHistogramsWithCounterResets(t, scenario) + } + }) + } +} + +func testOOONativeHistogramsWithCounterResets(t *testing.T, scenario sampleTypeScenario) { + opts := DefaultOptions() + opts.OutOfOrderCapMax = 30 + opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds() + + type resetFunc func(v int64) bool + defaultResetFunc := func(v int64) bool { return false } + + lbls := labels.FromStrings("foo", "bar1") + minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() } + + type sampleBatch struct { + from int64 + until int64 + shouldReset resetFunc + expCounterResetHints []histogram.CounterResetHint + } + + tests := []struct { + name string + queryMin int64 + queryMax int64 + batches []sampleBatch + expectedSamples []chunks.Sample + }{ + { + name: "Counter reset within in-order samples", + queryMin: minutes(40), + queryMax: minutes(55), + batches: []sampleBatch{ + // In-order samples + { + from: 40, + until: 50, + shouldReset: func(v int64) bool { + return v == 45 + }, + expCounterResetHints: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.CounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset}, + }, + }, + }, + { + name: "Counter reset right at beginning of OOO samples", + queryMin: minutes(40), + queryMax: minutes(55), + batches: []sampleBatch{ + // In-order samples + { + from: 40, + until: 45, + shouldReset: defaultResetFunc, + expCounterResetHints: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset}, + }, + { + from: 50, + until: 55, + shouldReset: defaultResetFunc, + expCounterResetHints: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset}, + }, + // OOO samples + { + from: 45, + until: 50, + shouldReset: func(v int64) bool { + return v == 45 + }, + expCounterResetHints: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset}, + }, + }, + }, + { + name: "Counter resets in both in-order and OOO samples", + queryMin: minutes(40), + queryMax: minutes(55), + batches: []sampleBatch{ + // In-order samples + { + from: 40, + until: 45, + shouldReset: func(v int64) bool { + return v == 44 + }, + expCounterResetHints: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.UnknownCounterReset}, + }, + { + from: 50, + until: 55, + shouldReset: defaultResetFunc, + expCounterResetHints: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset}, + }, + // OOO samples + { + from: 45, + until: 50, + shouldReset: func(v int64) bool { + return v == 49 + }, + expCounterResetHints: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.UnknownCounterReset}, + }, + }, + }, + } + for _, tc := range tests { + t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) { + db := openTestDB(t, opts, nil) + db.DisableCompactions() + db.EnableOOONativeHistograms() + defer func() { + require.NoError(t, db.Close()) + }() + + app := db.Appender(context.Background()) + + expSamples := make(map[string][]chunks.Sample) + + for _, batch := range tc.batches { + j := batch.from + smplIdx := 0 + for i := batch.from; i < batch.until; i++ { + resetCount := batch.shouldReset(i) + if resetCount { + j = 0 + } + _, s, err := scenario.appendFunc(app, lbls, minutes(i), j) + require.NoError(t, err) + if s.Type() == chunkenc.ValHistogram { + s.H().CounterResetHint = batch.expCounterResetHints[smplIdx] + } else if s.Type() == chunkenc.ValFloatHistogram { + s.FH().CounterResetHint = batch.expCounterResetHints[smplIdx] + } + expSamples[lbls.String()] = append(expSamples[lbls.String()], s) + j++ + smplIdx++ } } - require.Equal(t, expSamples, gotSamples) + + require.NoError(t, app.Commit()) + + for k, v := range expSamples { + sort.Slice(v, func(i, j int) bool { + return v[i].T() < v[j].T() + }) + expSamples[k] = v + } + + querier, err := db.Querier(tc.queryMin, tc.queryMax) + require.NoError(t, err) + defer querier.Close() + + seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")) + require.NotNil(t, seriesSet[lbls.String()]) + require.Len(t, seriesSet, 1) + requireEqualSeries(t, expSamples, seriesSet, false) }) } } @@ -5478,6 +6195,8 @@ func testOOOAppendAndQuery(t *testing.T, scenario sampleTypeScenario) { db := openTestDB(t, opts, nil) db.DisableCompactions() + db.EnableNativeHistograms() + db.EnableOOONativeHistograms() t.Cleanup(func() { require.NoError(t, db.Close()) }) @@ -5555,9 +6274,9 @@ func testOOOAppendAndQuery(t *testing.T, scenario sampleTypeScenario) { addSample(s2, 255, 265, false) verifyOOOMinMaxTimes(250, 265) testQuery(math.MinInt64, math.MaxInt64) - testQuery(minutes(250), minutes(265)) // Test querying ooo data time range - testQuery(minutes(290), minutes(300)) // Test querying in-order data time range - testQuery(minutes(250), minutes(300)) // Test querying the entire range + testQuery(minutes(250), minutes(265)) // Test querying ooo data time range. + testQuery(minutes(290), minutes(300)) // Test querying in-order data time range. + testQuery(minutes(250), minutes(300)) // Test querying the entire range. // Out of time window. addSample(s1, 59, 59, true) @@ -5609,6 +6328,8 @@ func testOOODisabled(t *testing.T, scenario sampleTypeScenario) { opts.OutOfOrderTimeWindow = 0 db := openTestDB(t, opts, nil) db.DisableCompactions() + db.EnableNativeHistograms() + db.EnableOOONativeHistograms() t.Cleanup(func() { require.NoError(t, db.Close()) }) @@ -5681,6 +6402,8 @@ func testWBLAndMmapReplay(t *testing.T, scenario sampleTypeScenario) { opts := DefaultOptions() opts.OutOfOrderCapMax = 30 opts.OutOfOrderTimeWindow = 4 * time.Hour.Milliseconds() + opts.EnableNativeHistograms = true + opts.EnableOOONativeHistograms = true db := openTestDB(t, opts, nil) db.DisableCompactions() @@ -5861,6 +6584,380 @@ func testWBLAndMmapReplay(t *testing.T, scenario sampleTypeScenario) { }) } +func TestOOOHistogramCompactionWithCounterResets(t *testing.T) { + for _, floatHistogram := range []bool{false, true} { + dir := t.TempDir() + ctx := context.Background() + + opts := DefaultOptions() + opts.OutOfOrderCapMax = 30 + opts.OutOfOrderTimeWindow = 500 * time.Minute.Milliseconds() + + db, err := Open(dir, nil, nil, opts, nil) + require.NoError(t, err) + db.DisableCompactions() // We want to manually call it. + db.EnableNativeHistograms() + db.EnableOOONativeHistograms() + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + series1 := labels.FromStrings("foo", "bar1") + series2 := labels.FromStrings("foo", "bar2") + + var series1ExpSamplesPreCompact, series2ExpSamplesPreCompact, series1ExpSamplesPostCompact, series2ExpSamplesPostCompact []chunks.Sample + + addSample := func(ts int64, l labels.Labels, val int, hint histogram.CounterResetHint) sample { + app := db.Appender(context.Background()) + tsMs := ts * time.Minute.Milliseconds() + if floatHistogram { + h := tsdbutil.GenerateTestFloatHistogram(val) + h.CounterResetHint = hint + _, err = app.AppendHistogram(0, l, tsMs, nil, h) + require.NoError(t, err) + require.NoError(t, app.Commit()) + return sample{t: tsMs, fh: h.Copy()} + } + + h := tsdbutil.GenerateTestHistogram(val) + h.CounterResetHint = hint + _, err = app.AppendHistogram(0, l, tsMs, h, nil) + require.NoError(t, err) + require.NoError(t, app.Commit()) + return sample{t: tsMs, h: h.Copy()} + } + + // Add an in-order sample to each series. + s := addSample(520, series1, 1000000, histogram.UnknownCounterReset) + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s) + + s = addSample(520, series2, 1000000, histogram.UnknownCounterReset) + series2ExpSamplesPreCompact = append(series2ExpSamplesPreCompact, s) + series2ExpSamplesPostCompact = append(series2ExpSamplesPostCompact, s) + + // Verify that the in-memory ooo chunk is empty. + checkEmptyOOOChunk := func(lbls labels.Labels) { + ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) + require.NoError(t, err) + require.False(t, created) + require.Nil(t, ms.ooo) + } + + checkEmptyOOOChunk(series1) + checkEmptyOOOChunk(series2) + + // Add samples for series1. There are three head chunks that will be created: + // Chunk 1 - Samples between 100 - 440. One explicit counter reset at ts 250. + // Chunk 2 - Samples between 105 - 395. Overlaps with Chunk 1. One detected counter reset at ts 165. + // Chunk 3 - Samples between 480 - 509. All within one block boundary. One detected counter reset at 490. + + // Chunk 1. + // First add 10 samples. + for i := 100; i < 200; i += 10 { + s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset) + // Before compaction, all the samples have UnknownCounterReset even though they've been added to the same + // chunk. This is because they overlap with the samples from chunk two and when merging two chunks on read, + // the header is set as unknown when the next sample is not in the same chunk as the previous one. + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + // After compaction, samples from multiple mmapped chunks will be merged, so there won't be any overlapping + // chunks. Therefore, most samples will have the NotCounterReset header. + // 100 is the first sample in the first chunk in the blocks, so is still set to UnknownCounterReset. + // 120 is a block boundary - after compaction, 120 will be the first sample in a chunk, so is still set to + // UnknownCounterReset. + if i > 100 && i != 120 { + s = copyWithCounterReset(s, histogram.NotCounterReset) + } + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s) + } + // Explicit counter reset - the counter reset header is set to CounterReset but the value is higher + // than for the previous timestamp. Explicit counter reset headers are actually ignored though, so when reading + // the sample back you actually get unknown/not counter reset. This is as the chainSampleIterator ignores + // existing headers and sets the header as UnknownCounterReset if the next sample is not in the same chunk as + // the previous one, and counter resets always create a new chunk. + // This case has been added to document what's happening, though it might not be the ideal behavior. + s = addSample(250, series1, 100000+250, histogram.CounterReset) + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, copyWithCounterReset(s, histogram.UnknownCounterReset)) + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, copyWithCounterReset(s, histogram.NotCounterReset)) + + // Add 19 more samples to complete a chunk. + for i := 260; i < 450; i += 10 { + s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset) + // The samples with timestamp less than 410 overlap with the samples from chunk 2, so before compaction, + // they're all UnknownCounterReset. Samples greater than or equal to 410 don't overlap with other chunks + // so they're always detected as NotCounterReset pre and post compaction/ + if i >= 410 { + s = copyWithCounterReset(s, histogram.NotCounterReset) + } + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + // + // 360 is a block boundary, so after compaction its header is still UnknownCounterReset. + if i != 360 { + s = copyWithCounterReset(s, histogram.NotCounterReset) + } + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s) + } + + // Chunk 2. + // Add six OOO samples. + for i := 105; i < 165; i += 10 { + s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset) + // Samples overlap with chunk 1 so before compaction all headers are UnknownCounterReset. + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, copyWithCounterReset(s, histogram.NotCounterReset)) + } + + // Add sample that will be detected as a counter reset. + s = addSample(165, series1, 100000, histogram.UnknownCounterReset) + // Before compaction, sample has an UnknownCounterReset header due to the chainSampleIterator. + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + // After compaction, the sample's counter reset is properly detected. + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, copyWithCounterReset(s, histogram.CounterReset)) + + // Add 23 more samples to complete a chunk. + for i := 175; i < 405; i += 10 { + s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset) + // Samples between 205-255 overlap with chunk 1 so before compaction those samples will have the + // UnknownCounterReset header. + if i >= 205 && i < 255 { + s = copyWithCounterReset(s, histogram.NotCounterReset) + } + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + // 245 is the first sample >= the block boundary at 240, so it's still UnknownCounterReset after compaction. + if i != 245 { + s = copyWithCounterReset(s, histogram.NotCounterReset) + } else { + s = copyWithCounterReset(s, histogram.UnknownCounterReset) + } + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s) + } + + // Chunk 3. + for i := 480; i < 490; i++ { + s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset) + // No overlapping samples in other chunks, so all other samples will already be detected as NotCounterReset + // before compaction. + if i > 480 { + s = copyWithCounterReset(s, histogram.NotCounterReset) + } + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + // 480 is block boundary. + if i == 480 { + s = copyWithCounterReset(s, histogram.UnknownCounterReset) + } + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s) + } + // Counter reset. + s = addSample(int64(490), series1, 100000, histogram.UnknownCounterReset) + s = copyWithCounterReset(s, histogram.CounterReset) + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s) + // Add some more samples after the counter reset. + for i := 491; i < 510; i++ { + s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset) + s = copyWithCounterReset(s, histogram.NotCounterReset) + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s) + } + + // Add samples for series2 - one chunk with one detected counter reset at 300. + for i := 200; i < 300; i += 10 { + s = addSample(int64(i), series2, 100000+i, histogram.UnknownCounterReset) + if i > 200 { + s = copyWithCounterReset(s, histogram.NotCounterReset) + } + series2ExpSamplesPreCompact = append(series2ExpSamplesPreCompact, s) + if i == 240 { + s = copyWithCounterReset(s, histogram.UnknownCounterReset) + } + series2ExpSamplesPostCompact = append(series2ExpSamplesPostCompact, s) + } + // Counter reset. + s = addSample(int64(300), series2, 100000, histogram.UnknownCounterReset) + s = copyWithCounterReset(s, histogram.CounterReset) + series2ExpSamplesPreCompact = append(series2ExpSamplesPreCompact, s) + series2ExpSamplesPostCompact = append(series2ExpSamplesPostCompact, s) + // Add some more samples after the counter reset. + for i := 310; i < 500; i += 10 { + s := addSample(int64(i), series2, 100000+i, histogram.UnknownCounterReset) + s = copyWithCounterReset(s, histogram.NotCounterReset) + series2ExpSamplesPreCompact = append(series2ExpSamplesPreCompact, s) + // 360 and 480 are block boundaries. + if i == 360 || i == 480 { + s = copyWithCounterReset(s, histogram.UnknownCounterReset) + } + series2ExpSamplesPostCompact = append(series2ExpSamplesPostCompact, s) + } + + // Sort samples (as OOO samples not added in time-order). + sort.Slice(series1ExpSamplesPreCompact, func(i, j int) bool { + return series1ExpSamplesPreCompact[i].T() < series1ExpSamplesPreCompact[j].T() + }) + sort.Slice(series1ExpSamplesPostCompact, func(i, j int) bool { + return series1ExpSamplesPostCompact[i].T() < series1ExpSamplesPostCompact[j].T() + }) + sort.Slice(series2ExpSamplesPreCompact, func(i, j int) bool { + return series2ExpSamplesPreCompact[i].T() < series2ExpSamplesPreCompact[j].T() + }) + sort.Slice(series2ExpSamplesPostCompact, func(i, j int) bool { + return series2ExpSamplesPostCompact[i].T() < series2ExpSamplesPostCompact[j].T() + }) + + verifyDBSamples := func(s1Samples, s2Samples []chunks.Sample) { + expRes := map[string][]chunks.Sample{ + series1.String(): s1Samples, + series2.String(): s2Samples, + } + + q, err := db.Querier(math.MinInt64, math.MaxInt64) + require.NoError(t, err) + actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) + requireEqualSeries(t, expRes, actRes, false) + } + + // Verify DB samples before compaction. + verifyDBSamples(series1ExpSamplesPreCompact, series2ExpSamplesPreCompact) + + // Verify that the in-memory ooo chunk is not empty. + checkNonEmptyOOOChunk := func(lbls labels.Labels) { + ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) + require.NoError(t, err) + require.False(t, created) + require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples()) + } + + checkNonEmptyOOOChunk(series1) + checkNonEmptyOOOChunk(series2) + + // No blocks before compaction. + require.Empty(t, db.Blocks()) + + // There is a 0th WBL file. + require.NoError(t, db.head.wbl.Sync()) // syncing to make sure wbl is flushed in windows + files, err := os.ReadDir(db.head.wbl.Dir()) + require.NoError(t, err) + require.Len(t, files, 1) + require.Equal(t, "00000000", files[0].Name()) + f, err := files[0].Info() + require.NoError(t, err) + require.Greater(t, f.Size(), int64(100)) + + // OOO compaction happens here. + require.NoError(t, db.CompactOOOHead(ctx)) + + // Check that blocks are created after compaction. + require.Len(t, db.Blocks(), 5) + + // Check samples after compaction. + verifyDBSamples(series1ExpSamplesPostCompact, series2ExpSamplesPostCompact) + + // 0th WBL file will be deleted and 1st will be the only present. + files, err = os.ReadDir(db.head.wbl.Dir()) + require.NoError(t, err) + require.Len(t, files, 1) + require.Equal(t, "00000001", files[0].Name()) + f, err = files[0].Info() + require.NoError(t, err) + require.Equal(t, int64(0), f.Size()) + + // OOO stuff should not be present in the Head now. + checkEmptyOOOChunk(series1) + checkEmptyOOOChunk(series2) + + verifyBlockSamples := func(block *Block, fromMins, toMins int64) { + var series1Samples, series2Samples []chunks.Sample + + for _, s := range series1ExpSamplesPostCompact { + if s.T() >= fromMins*time.Minute.Milliseconds() { + // Samples should be sorted, so break out of loop when we reach a timestamp that's too big. + if s.T() > toMins*time.Minute.Milliseconds() { + break + } + series1Samples = append(series1Samples, s) + } + } + for _, s := range series2ExpSamplesPostCompact { + if s.T() >= fromMins*time.Minute.Milliseconds() { + // Samples should be sorted, so break out of loop when we reach a timestamp that's too big. + if s.T() > toMins*time.Minute.Milliseconds() { + break + } + series2Samples = append(series2Samples, s) + } + } + + expRes := map[string][]chunks.Sample{} + if len(series1Samples) != 0 { + expRes[series1.String()] = series1Samples + } + if len(series2Samples) != 0 { + expRes[series2.String()] = series2Samples + } + + q, err := NewBlockQuerier(block, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + + actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) + requireEqualSeries(t, expRes, actRes, false) + } + + // Checking for expected data in the blocks. + verifyBlockSamples(db.Blocks()[0], 100, 119) + verifyBlockSamples(db.Blocks()[1], 120, 239) + verifyBlockSamples(db.Blocks()[2], 240, 359) + verifyBlockSamples(db.Blocks()[3], 360, 479) + verifyBlockSamples(db.Blocks()[4], 480, 509) + + // There should be a single m-map file. + mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot) + files, err = os.ReadDir(mmapDir) + require.NoError(t, err) + require.Len(t, files, 1) + + // Compact the in-order head and expect another block. + // Since this is a forced compaction, this block is not aligned with 2h. + err = db.CompactHead(NewRangeHead(db.head, 500*time.Minute.Milliseconds(), 550*time.Minute.Milliseconds())) + require.NoError(t, err) + require.Len(t, db.Blocks(), 6) + verifyBlockSamples(db.Blocks()[5], 520, 520) + + // Blocks created out of normal and OOO head now. But not merged. + verifyDBSamples(series1ExpSamplesPostCompact, series2ExpSamplesPostCompact) + + // The compaction also clears out the old m-map files. Including + // the file that has ooo chunks. + files, err = os.ReadDir(mmapDir) + require.NoError(t, err) + require.Len(t, files, 1) + require.Equal(t, "000001", files[0].Name()) + + // This will merge overlapping block. + require.NoError(t, db.Compact(ctx)) + + require.Len(t, db.Blocks(), 5) + verifyBlockSamples(db.Blocks()[0], 100, 119) + verifyBlockSamples(db.Blocks()[1], 120, 239) + verifyBlockSamples(db.Blocks()[2], 240, 359) + verifyBlockSamples(db.Blocks()[3], 360, 479) + verifyBlockSamples(db.Blocks()[4], 480, 520) // Merged block. + + // Final state. Blocks from normal and OOO head are merged. + verifyDBSamples(series1ExpSamplesPostCompact, series2ExpSamplesPostCompact) + } +} + +func copyWithCounterReset(s sample, hint histogram.CounterResetHint) sample { + if s.h != nil { + h := s.h.Copy() + h.CounterResetHint = hint + return sample{t: s.t, h: h} + } + + h := s.fh.Copy() + h.CounterResetHint = hint + return sample{t: s.t, fh: h} +} + func TestOOOCompactionFailure(t *testing.T) { for name, scenario := range sampleTypeScenarios { t.Run(name, func(t *testing.T) { @@ -5880,6 +6977,8 @@ func testOOOCompactionFailure(t *testing.T, scenario sampleTypeScenario) { db, err := Open(dir, nil, nil, opts, nil) require.NoError(t, err) db.DisableCompactions() // We want to manually call it. + db.EnableNativeHistograms() + db.EnableOOONativeHistograms() t.Cleanup(func() { require.NoError(t, db.Close()) }) @@ -5907,7 +7006,7 @@ func testOOOCompactionFailure(t *testing.T, scenario sampleTypeScenario) { // There is a 0th WBL file. verifyFirstWBLFileIs0 := func(count int) { - require.NoError(t, db.head.wbl.Sync()) // syncing to make sure wbl is flushed in windows + require.NoError(t, db.head.wbl.Sync()) // Syncing to make sure wbl is flushed in windows. files, err := os.ReadDir(db.head.wbl.Dir()) require.NoError(t, err) require.Len(t, files, count) @@ -5961,7 +7060,7 @@ func testOOOCompactionFailure(t *testing.T, scenario sampleTypeScenario) { require.Len(t, db.Blocks(), 3) require.Equal(t, oldBlocks, db.Blocks()) - // There should be a single m-map file + // There should be a single m-map file. verifyMmapFiles("000001") // All but last WBL file will be deleted. @@ -6057,7 +7156,7 @@ func TestWBLCorruption(t *testing.T) { // should be deleted after replay. // Checking where we corrupt it. - require.NoError(t, db.head.wbl.Sync()) // syncing to make sure wbl is flushed in windows + require.NoError(t, db.head.wbl.Sync()) // Syncing to make sure wbl is flushed in windows. files, err := os.ReadDir(db.head.wbl.Dir()) require.NoError(t, err) require.Len(t, files, 2) @@ -6080,7 +7179,7 @@ func TestWBLCorruption(t *testing.T) { addSamples(310, 320, false) // Verifying that we have data after corruption point. - require.NoError(t, db.head.wbl.Sync()) // syncing to make sure wbl is flushed in windows + require.NoError(t, db.head.wbl.Sync()) // Syncing to make sure wbl is flushed in windows. files, err = os.ReadDir(db.head.wbl.Dir()) require.NoError(t, err) require.Len(t, files, 3) @@ -6167,6 +7266,8 @@ func testOOOMmapCorruption(t *testing.T, scenario sampleTypeScenario) { opts := DefaultOptions() opts.OutOfOrderCapMax = 10 opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds() + opts.EnableNativeHistograms = true + opts.EnableOOONativeHistograms = true db, err := Open(dir, nil, nil, opts, nil) require.NoError(t, err) @@ -6300,6 +7401,8 @@ func testOutOfOrderRuntimeConfig(t *testing.T, scenario sampleTypeScenario) { opts := DefaultOptions() opts.OutOfOrderTimeWindow = oooTimeWindow + opts.EnableNativeHistograms = true + opts.EnableOOONativeHistograms = true db, err := Open(dir, nil, nil, opts, nil) require.NoError(t, err) @@ -6593,6 +7696,8 @@ func testNoGapAfterRestartWithOOO(t *testing.T, scenario sampleTypeScenario) { opts := DefaultOptions() opts.OutOfOrderTimeWindow = 30 * time.Minute.Milliseconds() + opts.EnableNativeHistograms = true + opts.EnableOOONativeHistograms = true db, err := Open(dir, nil, nil, opts, nil) require.NoError(t, err) @@ -6651,6 +7756,8 @@ func testWblReplayAfterOOODisableAndRestart(t *testing.T, scenario sampleTypeSce opts := DefaultOptions() opts.OutOfOrderTimeWindow = 60 * time.Minute.Milliseconds() + opts.EnableNativeHistograms = true + opts.EnableOOONativeHistograms = true db, err := Open(dir, nil, nil, opts, nil) require.NoError(t, err) @@ -6718,6 +7825,8 @@ func testPanicOnApplyConfig(t *testing.T, scenario sampleTypeScenario) { opts := DefaultOptions() opts.OutOfOrderTimeWindow = 60 * time.Minute.Milliseconds() + opts.EnableNativeHistograms = true + opts.EnableOOONativeHistograms = true db, err := Open(dir, nil, nil, opts, nil) require.NoError(t, err) @@ -6775,6 +7884,8 @@ func testDiskFillingUpAfterDisablingOOO(t *testing.T, scenario sampleTypeScenari opts := DefaultOptions() opts.OutOfOrderTimeWindow = 60 * time.Minute.Milliseconds() + opts.EnableNativeHistograms = true + opts.EnableOOONativeHistograms = true db, err := Open(dir, nil, nil, opts, nil) require.NoError(t, err) @@ -7147,28 +8258,16 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { createBlock(t, db.Dir(), series) for _, s := range series { - key := s.Labels().String() + lbls := s.Labels().String() + slice := exp[lbls] it = s.Iterator(it) - slice := exp[key] - for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() { - switch typ { - case chunkenc.ValFloat: - ts, v := it.At() - slice = append(slice, sample{t: ts, f: v}) - case chunkenc.ValHistogram: - ts, h := it.AtHistogram(nil) - slice = append(slice, sample{t: ts, h: h}) - case chunkenc.ValFloatHistogram: - ts, h := it.AtFloatHistogram(nil) - slice = append(slice, sample{t: ts, fh: h}) - default: - t.Fatalf("unexpected sample value type %d", typ) - } - } + smpls, err := storage.ExpandSamples(it, nil) + require.NoError(t, err) + slice = append(slice, smpls...) sort.Slice(slice, func(i, j int) bool { return slice[i].T() < slice[j].T() }) - exp[key] = slice + exp[lbls] = slice } } @@ -7201,10 +8300,10 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { // due to origin from different overlapping chunks anymore. for _, ss := range exp { for i, s := range ss[1:] { - if s.H() != nil && ss[i].H() != nil && s.H().CounterResetHint == histogram.UnknownCounterReset { + if s.Type() == chunkenc.ValHistogram && ss[i].Type() == chunkenc.ValHistogram && s.H().CounterResetHint == histogram.UnknownCounterReset { s.H().CounterResetHint = histogram.NotCounterReset } - if s.FH() != nil && ss[i].FH() != nil && s.FH().CounterResetHint == histogram.UnknownCounterReset { + if s.Type() == chunkenc.ValFloatHistogram && ss[i].Type() == chunkenc.ValFloatHistogram && s.FH().CounterResetHint == histogram.UnknownCounterReset { s.FH().CounterResetHint = histogram.NotCounterReset } } @@ -7328,6 +8427,112 @@ func TestNativeHistogramFlag(t *testing.T) { }, act) } +func TestOOONativeHistogramFlag(t *testing.T) { + h := &histogram.Histogram{ + Count: 9, + ZeroCount: 4, + ZeroThreshold: 0.001, + Sum: 35.5, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 2}, + }, + PositiveBuckets: []int64{1, 1, -1, 0}, + } + + l := labels.FromStrings("foo", "bar") + + t.Run("Test OOO native histograms if OOO is disabled", func(t *testing.T) { + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = 0 + db := openTestDB(t, opts, []int64{100}) + defer func() { + require.NoError(t, db.Close()) + }() + + // Enable Native Histograms and OOO Native Histogram ingestion + db.EnableNativeHistograms() + db.EnableOOONativeHistograms() + + app := db.Appender(context.Background()) + _, err := app.AppendHistogram(0, l, 100, h, nil) + require.NoError(t, err) + + _, err = app.AppendHistogram(0, l, 50, h, nil) + require.NoError(t, err) // The OOO sample is not detected until it is committed, so no error is returned + + require.NoError(t, app.Commit()) + + q, err := db.Querier(math.MinInt, math.MaxInt64) + require.NoError(t, err) + act := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.Equal(t, map[string][]chunks.Sample{ + l.String(): {sample{t: 100, h: h}}, + }, act) + }) + t.Run("Test OOO Native Histograms if Native Histograms are disabled", func(t *testing.T) { + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = 100 + db := openTestDB(t, opts, []int64{100}) + defer func() { + require.NoError(t, db.Close()) + }() + + // Disable Native Histograms and enable OOO Native Histogram ingestion + db.DisableNativeHistograms() + db.EnableOOONativeHistograms() + + // Attempt to add an in-order sample + app := db.Appender(context.Background()) + _, err := app.AppendHistogram(0, l, 200, h, nil) + require.Equal(t, storage.ErrNativeHistogramsDisabled, err) + + // Attempt to add an OOO sample + _, err = app.AppendHistogram(0, l, 100, h, nil) + require.Equal(t, storage.ErrNativeHistogramsDisabled, err) + + require.NoError(t, app.Commit()) + + q, err := db.Querier(math.MinInt, math.MaxInt64) + require.NoError(t, err) + act := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.Equal(t, map[string][]chunks.Sample{}, act) + }) + t.Run("Test OOO native histograms when flag is enabled", func(t *testing.T) { + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = 100 + db := openTestDB(t, opts, []int64{100}) + defer func() { + require.NoError(t, db.Close()) + }() + + // Enable Native Histograms and OOO Native Histogram ingestion + db.EnableNativeHistograms() + db.EnableOOONativeHistograms() + + // Add in-order samples + app := db.Appender(context.Background()) + _, err := app.AppendHistogram(0, l, 200, h, nil) + require.NoError(t, err) + + // Add OOO samples + _, err = app.AppendHistogram(0, l, 100, h, nil) + require.NoError(t, err) + _, err = app.AppendHistogram(0, l, 150, h, nil) + require.NoError(t, err) + + require.NoError(t, app.Commit()) + + q, err := db.Querier(math.MinInt, math.MaxInt64) + require.NoError(t, err) + act := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + requireEqualSeries(t, map[string][]chunks.Sample{ + l.String(): {sample{t: 100, h: h}, sample{t: 150, h: h}, sample{t: 200, h: h}}, + }, act, true) + }) +} + // compareSeries essentially replaces `require.Equal(t, expected, actual) in // situations where the actual series might contain more counter reset hints // "unknown" than the expected series. This can easily happen for long series @@ -7343,29 +8548,47 @@ func compareSeries(t require.TestingT, expected, actual map[string][]chunks.Samp // package. require.Equal(t, expected, actual, "number of series differs") } - for key, eSamples := range expected { - aSamples, ok := actual[key] + for key, expSamples := range expected { + actSamples, ok := actual[key] if !ok { require.Equal(t, expected, actual, "expected series %q not found", key) } - if len(eSamples) != len(aSamples) { - require.Equal(t, eSamples, aSamples, "number of samples for series %q differs", key) - } - for i, eS := range eSamples { - aS := aSamples[i] - aH, eH := aS.H(), eS.H() - aFH, eFH := aS.FH(), eS.FH() - switch { - case aH != nil && eH != nil && aH.CounterResetHint == histogram.UnknownCounterReset && eH.CounterResetHint != histogram.GaugeType: - eH = eH.Copy() - eH.CounterResetHint = histogram.UnknownCounterReset - eS = sample{t: eS.T(), h: eH} - case aFH != nil && eFH != nil && aFH.CounterResetHint == histogram.UnknownCounterReset && eFH.CounterResetHint != histogram.GaugeType: - eFH = eFH.Copy() - eFH.CounterResetHint = histogram.UnknownCounterReset - eS = sample{t: eS.T(), fh: eFH} + if len(expSamples) != len(actSamples) { + require.Equal(t, expSamples, actSamples, "number of samples for series %q differs", key) + } + + for i, eS := range expSamples { + aS := actSamples[i] + + // Must use the interface as Equal does not work when actual types differ + // not only does the type differ, but chunk.Sample.FH() interface may auto convert from chunk.Sample.H()! + require.Equal(t, eS.T(), aS.T(), "timestamp of sample %d in series %q differs", i, key) + + require.Equal(t, eS.Type(), aS.Type(), "type of sample %d in series %q differs", i, key) + + switch eS.Type() { + case chunkenc.ValFloat: + require.Equal(t, eS.F(), aS.F(), "sample %d in series %q differs", i, key) + case chunkenc.ValHistogram: + eH, aH := eS.H(), aS.H() + if aH.CounterResetHint == histogram.UnknownCounterReset && aH.CounterResetHint != histogram.GaugeType { + eH = eH.Copy() + // It is always safe to set the counter reset hint to UnknownCounterReset + eH.CounterResetHint = histogram.UnknownCounterReset + eS = sample{t: eS.T(), h: eH} + } + require.Equal(t, eH, aH, "histogram sample %d in series %q differs", i, key) + + case chunkenc.ValFloatHistogram: + eFH, aFH := eS.FH(), aS.FH() + if aFH.CounterResetHint == histogram.UnknownCounterReset && aFH.CounterResetHint != histogram.GaugeType { + eFH = eFH.Copy() + // It is always safe to set the counter reset hint to UnknownCounterReset + eFH.CounterResetHint = histogram.UnknownCounterReset + eS = sample{t: eS.T(), fh: eFH} + } + require.Equal(t, eFH, aFH, "float histogram sample %d in series %q differs", i, key) } - require.Equal(t, eS, aS, "sample %d in series %q differs", i, key) } } } diff --git a/tsdb/head.go b/tsdb/head.go index fdc5ad0e1..af16fbf37 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -150,6 +150,11 @@ type HeadOptions struct { // EnableNativeHistograms enables the ingestion of native histograms. EnableNativeHistograms atomic.Bool + // EnableOOONativeHistograms enables the ingestion of OOO native histograms. + // It will only take effect if EnableNativeHistograms is set to true and the + // OutOfOrderTimeWindow is > 0 + EnableOOONativeHistograms atomic.Bool + // EnableCreatedTimestampZeroIngestion enables the ingestion of the created timestamp as a synthetic zero sample. // See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md EnableCreatedTimestampZeroIngestion bool @@ -1018,6 +1023,16 @@ func (h *Head) DisableNativeHistograms() { h.opts.EnableNativeHistograms.Store(false) } +// EnableOOONativeHistograms enables the ingestion of out-of-order native histograms. +func (h *Head) EnableOOONativeHistograms() { + h.opts.EnableOOONativeHistograms.Store(true) +} + +// DisableOOONativeHistograms disables the ingestion of out-of-order native histograms. +func (h *Head) DisableOOONativeHistograms() { + h.opts.EnableOOONativeHistograms.Store(false) +} + // PostingsCardinalityStats returns highest cardinality stats by label and value names. func (h *Head) PostingsCardinalityStats(statsByLabelName string, limit int) *index.PostingsStats { cacheKey := statsByLabelName + ";" + strconv.Itoa(limit) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index ea2ee0091..3dd9a367b 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -321,8 +321,8 @@ type headAppender struct { } func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { - // For OOO inserts, this restriction is irrelevant and will be checked later once we confirm the sample is an in-order append. - // If OOO inserts are disabled, we may as well as check this as early as we can and avoid more work. + // Fail fast if OOO is disabled and the sample is out of bounds. + // Otherwise a full check will be done later to decide if the sample is in-order or out-of-order. if a.oooTimeWindow == 0 && t < a.minValidTime { a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Inc() return 0, storage.ErrOutOfBounds @@ -493,46 +493,94 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTi return false, headMaxt - t, storage.ErrOutOfOrderSample } -// appendableHistogram checks whether the given histogram is valid for appending to the series. -func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram) error { - if s.headChunks == nil { - return nil +// appendableHistogram checks whether the given histogram sample is valid for appending to the series. (if we return false and no error) +// The sample belongs to the out of order chunk if we return true and no error. +// An error signifies the sample cannot be handled. +func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram, headMaxt, minValidTime, oooTimeWindow int64, oooHistogramsEnabled bool) (isOOO bool, oooDelta int64, err error) { + // Check if we can append in the in-order chunk. + if t >= minValidTime { + if s.headChunks == nil { + // The series has no sample and was freshly created. + return false, 0, nil + } + msMaxt := s.maxTime() + if t > msMaxt { + return false, 0, nil + } + if t == msMaxt { + // We are allowing exact duplicates as we can encounter them in valid cases + // like federation and erroring out at that time would be extremely noisy. + // This only checks against the latest in-order sample. + // The OOO headchunk has its own method to detect these duplicates. + if !h.Equals(s.lastHistogramValue) { + return false, 0, storage.ErrDuplicateSampleForTimestamp + } + // Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf. + return false, 0, nil + } } - if t > s.headChunks.maxTime { - return nil - } - if t < s.headChunks.maxTime { - return storage.ErrOutOfOrderSample + // The sample cannot go in the in-order chunk. Check if it can go in the out-of-order chunk. + if oooTimeWindow > 0 && t >= headMaxt-oooTimeWindow { + if !oooHistogramsEnabled { + return true, headMaxt - t, storage.ErrOOONativeHistogramsDisabled + } + return true, headMaxt - t, nil } - // We are allowing exact duplicates as we can encounter them in valid cases - // like federation and erroring out at that time would be extremely noisy. - if !h.Equals(s.lastHistogramValue) { - return storage.ErrDuplicateSampleForTimestamp + // The sample cannot go in both in-order and out-of-order chunk. + if oooTimeWindow > 0 { + return true, headMaxt - t, storage.ErrTooOldSample } - return nil + if t < minValidTime { + return false, headMaxt - t, storage.ErrOutOfBounds + } + return false, headMaxt - t, storage.ErrOutOfOrderSample } -// appendableFloatHistogram checks whether the given float histogram is valid for appending to the series. -func (s *memSeries) appendableFloatHistogram(t int64, fh *histogram.FloatHistogram) error { - if s.headChunks == nil { - return nil +// appendableFloatHistogram checks whether the given float histogram sample is valid for appending to the series. (if we return false and no error) +// The sample belongs to the out of order chunk if we return true and no error. +// An error signifies the sample cannot be handled. +func (s *memSeries) appendableFloatHistogram(t int64, fh *histogram.FloatHistogram, headMaxt, minValidTime, oooTimeWindow int64, oooHistogramsEnabled bool) (isOOO bool, oooDelta int64, err error) { + // Check if we can append in the in-order chunk. + if t >= minValidTime { + if s.headChunks == nil { + // The series has no sample and was freshly created. + return false, 0, nil + } + msMaxt := s.maxTime() + if t > msMaxt { + return false, 0, nil + } + if t == msMaxt { + // We are allowing exact duplicates as we can encounter them in valid cases + // like federation and erroring out at that time would be extremely noisy. + // This only checks against the latest in-order sample. + // The OOO headchunk has its own method to detect these duplicates. + if !fh.Equals(s.lastFloatHistogramValue) { + return false, 0, storage.ErrDuplicateSampleForTimestamp + } + // Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf. + return false, 0, nil + } } - if t > s.headChunks.maxTime { - return nil - } - if t < s.headChunks.maxTime { - return storage.ErrOutOfOrderSample + // The sample cannot go in the in-order chunk. Check if it can go in the out-of-order chunk. + if oooTimeWindow > 0 && t >= headMaxt-oooTimeWindow { + if !oooHistogramsEnabled { + return true, headMaxt - t, storage.ErrOOONativeHistogramsDisabled + } + return true, headMaxt - t, nil } - // We are allowing exact duplicates as we can encounter them in valid cases - // like federation and erroring out at that time would be extremely noisy. - if !fh.Equals(s.lastFloatHistogramValue) { - return storage.ErrDuplicateSampleForTimestamp + // The sample cannot go in both in-order and out-of-order chunk. + if oooTimeWindow > 0 { + return true, headMaxt - t, storage.ErrTooOldSample } - return nil + if t < minValidTime { + return false, headMaxt - t, storage.ErrOutOfBounds + } + return false, headMaxt - t, storage.ErrOutOfOrderSample } // AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't @@ -577,7 +625,9 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels return 0, storage.ErrNativeHistogramsDisabled } - if t < a.minValidTime { + // Fail fast if OOO is disabled and the sample is out of bounds. + // Otherwise a full check will be done later to decide if the sample is in-order or out-of-order. + if (a.oooTimeWindow == 0 || !a.head.opts.EnableOOONativeHistograms.Load()) && t < a.minValidTime { a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() return 0, storage.ErrOutOfBounds } @@ -629,15 +679,27 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels switch { case h != nil: s.Lock() - if err := s.appendableHistogram(t, h); err != nil { - s.Unlock() - if errors.Is(err, storage.ErrOutOfOrderSample) { + // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise + // to skip that sample from the WAL and write only in the WBL. + _, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load()) + if err != nil { + s.pendingCommit = true + } + s.Unlock() + if delta > 0 { + a.head.metrics.oooHistogram.Observe(float64(delta) / 1000) + } + if err != nil { + switch { + case errors.Is(err, storage.ErrOutOfOrderSample): + fallthrough + case errors.Is(err, storage.ErrOOONativeHistogramsDisabled): a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() + case errors.Is(err, storage.ErrTooOldSample): + a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() } return 0, err } - s.pendingCommit = true - s.Unlock() a.histograms = append(a.histograms, record.RefHistogramSample{ Ref: s.ref, T: t, @@ -646,15 +708,27 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels a.histogramSeries = append(a.histogramSeries, s) case fh != nil: s.Lock() - if err := s.appendableFloatHistogram(t, fh); err != nil { - s.Unlock() - if errors.Is(err, storage.ErrOutOfOrderSample) { + // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise + // to skip that sample from the WAL and write only in the WBL. + _, delta, err := s.appendableFloatHistogram(t, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load()) + if err == nil { + s.pendingCommit = true + } + s.Unlock() + if delta > 0 { + a.head.metrics.oooHistogram.Observe(float64(delta) / 1000) + } + if err != nil { + switch { + case errors.Is(err, storage.ErrOutOfOrderSample): + fallthrough + case errors.Is(err, storage.ErrOOONativeHistogramsDisabled): a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() + case errors.Is(err, storage.ErrTooOldSample): + a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() } return 0, err } - s.pendingCommit = true - s.Unlock() a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{ Ref: s.ref, T: t, @@ -841,20 +915,24 @@ func (a *headAppender) Commit() (err error) { floatsAppended = len(a.samples) histogramsAppended = len(a.histograms) + len(a.floatHistograms) // number of samples out of order but accepted: with ooo enabled and within time window - oooFloatsAccepted int + oooFloatsAccepted int + oooHistogramAccepted int // number of samples rejected due to: out of order but OOO support disabled. floatOOORejected int histoOOORejected int // number of samples rejected due to: that are out of order but too old (OOO support enabled, but outside time window) floatTooOldRejected int + histoTooOldRejected int // number of samples rejected due to: out of bounds: with t < minValidTime (OOO support disabled) - floatOOBRejected int - + floatOOBRejected int + histoOOBRejected int inOrderMint int64 = math.MaxInt64 inOrderMaxt int64 = math.MinInt64 oooMinT int64 = math.MaxInt64 oooMaxT int64 = math.MinInt64 wblSamples []record.RefSample + wblHistograms []record.RefHistogramSample + wblFloatHistograms []record.RefFloatHistogramSample oooMmapMarkers map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef oooMmapMarkersCount int oooRecords [][]byte @@ -876,6 +954,8 @@ func (a *headAppender) Commit() (err error) { if a.head.wbl == nil { // WBL is not enabled. So no need to collect. wblSamples = nil + wblHistograms = nil + wblFloatHistograms = nil oooMmapMarkers = nil oooMmapMarkersCount = 0 return @@ -903,8 +983,18 @@ func (a *headAppender) Commit() (err error) { r := enc.Samples(wblSamples, a.head.getBytesBuffer()) oooRecords = append(oooRecords, r) } + if len(wblHistograms) > 0 { + r := enc.HistogramSamples(wblHistograms, a.head.getBytesBuffer()) + oooRecords = append(oooRecords, r) + } + if len(wblFloatHistograms) > 0 { + r := enc.FloatHistogramSamples(wblFloatHistograms, a.head.getBytesBuffer()) + oooRecords = append(oooRecords, r) + } wblSamples = nil + wblHistograms = nil + wblFloatHistograms = nil oooMmapMarkers = nil } for i, s := range a.samples { @@ -1006,51 +1096,193 @@ func (a *headAppender) Commit() (err error) { for i, s := range a.histograms { series = a.histogramSeries[i] series.Lock() - ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, appendChunkOpts) - series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) - series.pendingCommit = false - series.Unlock() - if ok { - if s.T < inOrderMint { - inOrderMint = s.T - } - if s.T > inOrderMaxt { - inOrderMaxt = s.T - } - } else { + oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load()) + switch { + case err == nil: + // Do nothing. + case errors.Is(err, storage.ErrOutOfOrderSample): histogramsAppended-- histoOOORejected++ + case errors.Is(err, storage.ErrOutOfBounds): + histogramsAppended-- + histoOOBRejected++ + case errors.Is(err, storage.ErrTooOldSample): + histogramsAppended-- + histoTooOldRejected++ + default: + histogramsAppended-- + } + + var ok, chunkCreated bool + + switch { + case err != nil: + // Do nothing here. + case oooSample: + // Sample is OOO and OOO handling is enabled + // and the delta is within the OOO tolerance. + var mmapRefs []chunks.ChunkDiskMapperRef + ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, oooCapMax, a.head.logger) + if chunkCreated { + r, ok := oooMmapMarkers[series.ref] + if !ok || r != nil { + // !ok means there are no markers collected for these samples yet. So we first flush the samples + // before setting this m-map marker. + + // r != 0 means we have already m-mapped a chunk for this series in the same Commit(). + // Hence, before we m-map again, we should add the samples and m-map markers + // seen till now to the WBL records. + collectOOORecords() + } + + if oooMmapMarkers == nil { + oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef) + } + if len(mmapRefs) > 0 { + oooMmapMarkers[series.ref] = mmapRefs + oooMmapMarkersCount += len(mmapRefs) + } else { + // No chunk was written to disk, so we need to set an initial marker for this series. + oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0} + oooMmapMarkersCount++ + } + } + if ok { + wblHistograms = append(wblHistograms, s) + if s.T < oooMinT { + oooMinT = s.T + } + if s.T > oooMaxT { + oooMaxT = s.T + } + oooHistogramAccepted++ + } else { + // Sample is an exact duplicate of the last sample. + // NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk, + // not with samples in already flushed OOO chunks. + // TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305. + histogramsAppended-- + } + default: + ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, appendChunkOpts) + if ok { + if s.T < inOrderMint { + inOrderMint = s.T + } + if s.T > inOrderMaxt { + inOrderMaxt = s.T + } + } else { + histogramsAppended-- + histoOOORejected++ + } } + if chunkCreated { a.head.metrics.chunks.Inc() a.head.metrics.chunksCreated.Inc() } + + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) + series.pendingCommit = false + series.Unlock() } for i, s := range a.floatHistograms { series = a.floatHistogramSeries[i] series.Lock() - ok, chunkCreated := series.appendFloatHistogram(s.T, s.FH, a.appendID, appendChunkOpts) - series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) - series.pendingCommit = false - series.Unlock() - if ok { - if s.T < inOrderMint { - inOrderMint = s.T - } - if s.T > inOrderMaxt { - inOrderMaxt = s.T - } - } else { + oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load()) + switch { + case err == nil: + // Do nothing. + case errors.Is(err, storage.ErrOutOfOrderSample): histogramsAppended-- histoOOORejected++ + case errors.Is(err, storage.ErrOutOfBounds): + histogramsAppended-- + histoOOBRejected++ + case errors.Is(err, storage.ErrTooOldSample): + histogramsAppended-- + histoTooOldRejected++ + default: + histogramsAppended-- } + + var ok, chunkCreated bool + + switch { + case err != nil: + // Do nothing here. + case oooSample: + // Sample is OOO and OOO handling is enabled + // and the delta is within the OOO tolerance. + var mmapRefs []chunks.ChunkDiskMapperRef + ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, oooCapMax, a.head.logger) + if chunkCreated { + r, ok := oooMmapMarkers[series.ref] + if !ok || r != nil { + // !ok means there are no markers collected for these samples yet. So we first flush the samples + // before setting this m-map marker. + + // r != 0 means we have already m-mapped a chunk for this series in the same Commit(). + // Hence, before we m-map again, we should add the samples and m-map markers + // seen till now to the WBL records. + collectOOORecords() + } + + if oooMmapMarkers == nil { + oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef) + } + if len(mmapRefs) > 0 { + oooMmapMarkers[series.ref] = mmapRefs + oooMmapMarkersCount += len(mmapRefs) + } else { + // No chunk was written to disk, so we need to set an initial marker for this series. + oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0} + oooMmapMarkersCount++ + } + } + if ok { + wblFloatHistograms = append(wblFloatHistograms, s) + if s.T < oooMinT { + oooMinT = s.T + } + if s.T > oooMaxT { + oooMaxT = s.T + } + oooHistogramAccepted++ + } else { + // Sample is an exact duplicate of the last sample. + // NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk, + // not with samples in already flushed OOO chunks. + // TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305. + histogramsAppended-- + } + default: + ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, appendChunkOpts) + if ok { + if s.T < inOrderMint { + inOrderMint = s.T + } + if s.T > inOrderMaxt { + inOrderMaxt = s.T + } + } else { + histogramsAppended-- + histoOOORejected++ + } + } + if chunkCreated { a.head.metrics.chunks.Inc() a.head.metrics.chunksCreated.Inc() } + + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) + series.pendingCommit = false + series.Unlock() } for i, m := range a.metadata { @@ -1067,6 +1299,7 @@ func (a *headAppender) Commit() (err error) { a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatsAppended)) a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histogramsAppended)) a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(oooFloatsAccepted)) + a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(oooHistogramAccepted)) a.head.updateMinMaxTime(inOrderMint, inOrderMaxt) a.head.updateMinOOOMaxOOOTime(oooMinT, oooMaxT) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index ffe3c0494..483121dc6 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2723,15 +2723,32 @@ func TestIsolationWithoutAdd(t *testing.T) { func TestOutOfOrderSamplesMetric(t *testing.T) { for name, scenario := range sampleTypeScenarios { t.Run(name, func(t *testing.T) { - testOutOfOrderSamplesMetric(t, scenario) + options := DefaultOptions() + options.EnableNativeHistograms = true + options.EnableOOONativeHistograms = true + testOutOfOrderSamplesMetric(t, scenario, options, storage.ErrOutOfOrderSample) }) } } -func testOutOfOrderSamplesMetric(t *testing.T, scenario sampleTypeScenario) { - dir := t.TempDir() +func TestOutOfOrderSamplesMetricNativeHistogramOOODisabled(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + if scenario.sampleType != "histogram" { + continue + } + t.Run(name, func(t *testing.T) { + options := DefaultOptions() + options.OutOfOrderTimeWindow = (1000 * time.Minute).Milliseconds() + options.EnableNativeHistograms = true + options.EnableOOONativeHistograms = false + testOutOfOrderSamplesMetric(t, scenario, options, storage.ErrOOONativeHistogramsDisabled) + }) + } +} - db, err := Open(dir, nil, nil, DefaultOptions(), nil) +func testOutOfOrderSamplesMetric(t *testing.T, scenario sampleTypeScenario, options *Options, expectOutOfOrderError error) { + dir := t.TempDir() + db, err := Open(dir, nil, nil, options, nil) require.NoError(t, err) defer func() { require.NoError(t, db.Close()) @@ -2755,15 +2772,15 @@ func testOutOfOrderSamplesMetric(t *testing.T, scenario sampleTypeScenario) { require.Equal(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType))) app = db.Appender(ctx) _, err = appendSample(app, 2) - require.Equal(t, storage.ErrOutOfOrderSample, err) + require.Equal(t, expectOutOfOrderError, err) require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType))) _, err = appendSample(app, 3) - require.Equal(t, storage.ErrOutOfOrderSample, err) + require.Equal(t, expectOutOfOrderError, err) require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType))) _, err = appendSample(app, 4) - require.Equal(t, storage.ErrOutOfOrderSample, err) + require.Equal(t, expectOutOfOrderError, err) require.Equal(t, 3.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType))) require.NoError(t, app.Commit()) @@ -2798,15 +2815,15 @@ func testOutOfOrderSamplesMetric(t *testing.T, scenario sampleTypeScenario) { // Test out of order metric. app = db.Appender(ctx) _, err = appendSample(app, db.head.minValidTime.Load()+DefaultBlockDuration+2) - require.Equal(t, storage.ErrOutOfOrderSample, err) + require.Equal(t, expectOutOfOrderError, err) require.Equal(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType))) _, err = appendSample(app, db.head.minValidTime.Load()+DefaultBlockDuration+3) - require.Equal(t, storage.ErrOutOfOrderSample, err) + require.Equal(t, expectOutOfOrderError, err) require.Equal(t, 5.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType))) _, err = appendSample(app, db.head.minValidTime.Load()+DefaultBlockDuration+4) - require.Equal(t, storage.ErrOutOfOrderSample, err) + require.Equal(t, expectOutOfOrderError, err) require.Equal(t, 6.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType))) require.NoError(t, app.Commit()) } @@ -4657,10 +4674,172 @@ func TestHistogramCounterResetHeader(t *testing.T) { } } +func TestOOOHistogramCounterResetHeaders(t *testing.T) { + for _, floatHisto := range []bool{true, false} { + t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) { + l := labels.FromStrings("a", "b") + head, _ := newTestHead(t, 1000, wlog.CompressionNone, true) + head.opts.OutOfOrderCapMax.Store(5) + head.opts.EnableOOONativeHistograms.Store(true) + + t.Cleanup(func() { + require.NoError(t, head.Close()) + }) + require.NoError(t, head.Init(0)) + + appendHistogram := func(ts int64, h *histogram.Histogram) { + app := head.Appender(context.Background()) + var err error + if floatHisto { + _, err = app.AppendHistogram(0, l, ts, nil, h.ToFloat(nil)) + } else { + _, err = app.AppendHistogram(0, l, ts, h.Copy(), nil) + } + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + + type expOOOMmappedChunks struct { + header chunkenc.CounterResetHeader + mint, maxt int64 + numSamples uint16 + } + + var expChunks []expOOOMmappedChunks + checkOOOExpCounterResetHeader := func(newChunks ...expOOOMmappedChunks) { + expChunks = append(expChunks, newChunks...) + + ms, _, err := head.getOrCreate(l.Hash(), l) + require.NoError(t, err) + + require.Len(t, ms.ooo.oooMmappedChunks, len(expChunks)) + + for i, mmapChunk := range ms.ooo.oooMmappedChunks { + chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref) + require.NoError(t, err) + if floatHisto { + require.Equal(t, expChunks[i].header, chk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader()) + } else { + require.Equal(t, expChunks[i].header, chk.(*chunkenc.HistogramChunk).GetCounterResetHeader()) + } + require.Equal(t, expChunks[i].mint, mmapChunk.minTime) + require.Equal(t, expChunks[i].maxt, mmapChunk.maxTime) + require.Equal(t, expChunks[i].numSamples, mmapChunk.numSamples) + } + } + + // Append an in-order histogram, so the rest of the samples can be detected as OOO. + appendHistogram(1000, tsdbutil.GenerateTestHistogram(1000)) + + // OOO histogram + for i := 1; i <= 5; i++ { + appendHistogram(100+int64(i), tsdbutil.GenerateTestHistogram(1000+i)) + } + // Nothing mmapped yet. + checkOOOExpCounterResetHeader() + + // 6th observation (which triggers a head chunk mmapping). + appendHistogram(int64(112), tsdbutil.GenerateTestHistogram(1002)) + + // One mmapped chunk with (ts, val) [(101, 1001), (102, 1002), (103, 1003), (104, 1004), (105, 1005)]. + checkOOOExpCounterResetHeader(expOOOMmappedChunks{ + header: chunkenc.UnknownCounterReset, + mint: 101, + maxt: 105, + numSamples: 5, + }) + + // Add more samples, there's a counter reset at ts 122. + appendHistogram(int64(110), tsdbutil.GenerateTestHistogram(1001)) + appendHistogram(int64(124), tsdbutil.GenerateTestHistogram(904)) + appendHistogram(int64(123), tsdbutil.GenerateTestHistogram(903)) + appendHistogram(int64(122), tsdbutil.GenerateTestHistogram(902)) + + // New samples not mmapped yet. + checkOOOExpCounterResetHeader() + + // 11th observation (which triggers another head chunk mmapping). + appendHistogram(int64(200), tsdbutil.GenerateTestHistogram(2000)) + + // Two new mmapped chunks [(110, 1001), (112, 1002)], [(122, 902), (123, 903), (124, 904)]. + checkOOOExpCounterResetHeader( + expOOOMmappedChunks{ + header: chunkenc.UnknownCounterReset, + mint: 110, + maxt: 112, + numSamples: 2, + }, + expOOOMmappedChunks{ + header: chunkenc.CounterReset, + mint: 122, + maxt: 124, + numSamples: 3, + }, + ) + + // Count is lower than previous sample at ts 200, and NotCounterReset is always ignored on append. + appendHistogram(int64(205), tsdbutil.SetHistogramNotCounterReset(tsdbutil.GenerateTestHistogram(1000))) + + appendHistogram(int64(210), tsdbutil.SetHistogramCounterReset(tsdbutil.GenerateTestHistogram(2010))) + + appendHistogram(int64(220), tsdbutil.GenerateTestHistogram(2020)) + + appendHistogram(int64(215), tsdbutil.GenerateTestHistogram(2005)) + + // 16th observation (which triggers another head chunk mmapping). + appendHistogram(int64(350), tsdbutil.GenerateTestHistogram(4000)) + + // Four new mmapped chunks: [(200, 2000)] [(205, 1000)], [(210, 2010)], [(215, 2015), (220, 2020)] + checkOOOExpCounterResetHeader( + expOOOMmappedChunks{ + header: chunkenc.UnknownCounterReset, + mint: 200, + maxt: 200, + numSamples: 1, + }, + expOOOMmappedChunks{ + header: chunkenc.CounterReset, + mint: 205, + maxt: 205, + numSamples: 1, + }, + expOOOMmappedChunks{ + header: chunkenc.CounterReset, + mint: 210, + maxt: 210, + numSamples: 1, + }, + expOOOMmappedChunks{ + header: chunkenc.CounterReset, + mint: 215, + maxt: 220, + numSamples: 2, + }, + ) + + // Adding five more samples (21 in total), so another mmapped chunk is created. + appendHistogram(300, tsdbutil.SetHistogramCounterReset(tsdbutil.GenerateTestHistogram(3000))) + + for i := 1; i <= 4; i++ { + appendHistogram(300+int64(i), tsdbutil.GenerateTestHistogram(3000+i)) + } + + // One mmapped chunk with (ts, val) [(300, 3000), (301, 3001), (302, 3002), (303, 3003), (350, 4000)]. + checkOOOExpCounterResetHeader(expOOOMmappedChunks{ + header: chunkenc.CounterReset, + mint: 300, + maxt: 350, + numSamples: 5, + }) + }) + } +} + func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { dir := t.TempDir() opts := DefaultOptions() opts.EnableNativeHistograms = true + opts.EnableOOONativeHistograms = true db, err := Open(dir, nil, nil, opts, nil) require.NoError(t, err) t.Cleanup(func() { @@ -4931,6 +5110,8 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) { opts.ChunkRange = 1000 opts.ChunkDirRoot = dir opts.OutOfOrderTimeWindow.Store(30 * time.Minute.Milliseconds()) + opts.EnableNativeHistograms.Store(true) + opts.EnableOOONativeHistograms.Store(true) h, err := NewHead(nil, nil, wal, oooWlog, opts, nil) require.NoError(t, err) @@ -4940,13 +5121,12 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) { l := labels.FromStrings("foo", "bar") appendSample := func(mins int64, val float64, isOOO bool) { app := h.Appender(context.Background()) - ts, v := mins*time.Minute.Milliseconds(), val - _, err := app.Append(0, l, ts, v) + _, s, err := scenario.appendFunc(app, l, mins*time.Minute.Milliseconds(), mins) require.NoError(t, err) require.NoError(t, app.Commit()) if isOOO { - expOOOSamples = append(expOOOSamples, sample{t: ts, f: v}) + expOOOSamples = append(expOOOSamples, s) } } @@ -5025,6 +5205,8 @@ func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) { opts.ChunkDirRoot = dir opts.OutOfOrderCapMax.Store(30) opts.OutOfOrderTimeWindow.Store(1000 * time.Minute.Milliseconds()) + opts.EnableNativeHistograms.Store(true) + opts.EnableOOONativeHistograms.Store(true) h, err := NewHead(nil, nil, wal, oooWlog, opts, nil) require.NoError(t, err) @@ -5326,6 +5508,8 @@ func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Ap opts.ChunkDirRoot = dir opts.OutOfOrderCapMax.Store(30) opts.OutOfOrderTimeWindow.Store(120 * time.Minute.Milliseconds()) + opts.EnableNativeHistograms.Store(true) + opts.EnableOOONativeHistograms.Store(true) h, err := NewHead(nil, nil, wal, oooWlog, opts, nil) require.NoError(t, err) @@ -5399,7 +5583,9 @@ func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Ap func TestHeadMinOOOTimeUpdate(t *testing.T) { for name, scenario := range sampleTypeScenarios { t.Run(name, func(t *testing.T) { - testHeadMinOOOTimeUpdate(t, scenario) + if scenario.sampleType == sampleMetricTypeFloat { + testHeadMinOOOTimeUpdate(t, scenario) + } }) } } @@ -5414,6 +5600,8 @@ func testHeadMinOOOTimeUpdate(t *testing.T, scenario sampleTypeScenario) { opts := DefaultHeadOptions() opts.ChunkDirRoot = dir opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds()) + opts.EnableNativeHistograms.Store(true) + opts.EnableOOONativeHistograms.Store(true) h, err := NewHead(nil, nil, wal, oooWlog, opts, nil) require.NoError(t, err) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index ef96b5330..14a1d0d47 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -646,9 +646,9 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp } func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) { - // Track number of samples, m-map markers, that referenced a series we don't know about + // Track number of samples, histogram samples, m-map markers, that referenced a series we don't know about // for error reporting. - var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64 + var unknownRefs, unknownHistogramRefs, mmapMarkerUnknownRefs atomic.Uint64 lastSeq, lastOff := lastMmapRef.Unpack() // Start workers that each process samples for a partition of the series ID space. @@ -657,8 +657,9 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch concurrency = h.opts.WALReplayConcurrency processors = make([]wblSubsetProcessor, concurrency) - dec = record.NewDecoder(syms) - shards = make([][]record.RefSample, concurrency) + dec record.Decoder + shards = make([][]record.RefSample, concurrency) + histogramShards = make([][]histogramRecord, concurrency) decodedCh = make(chan interface{}, 10) decodeErr error @@ -672,6 +673,16 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch return []record.RefMmapMarker{} }, } + histogramSamplesPool = sync.Pool{ + New: func() interface{} { + return []record.RefHistogramSample{} + }, + } + floatHistogramSamplesPool = sync.Pool{ + New: func() interface{} { + return []record.RefFloatHistogramSample{} + }, + } ) defer func() { @@ -692,8 +703,9 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch processors[i].setup() go func(wp *wblSubsetProcessor) { - unknown := wp.processWBLSamples(h) + unknown, unknownHistograms := wp.processWBLSamples(h) unknownRefs.Add(unknown) + unknownHistogramRefs.Add(unknownHistograms) wg.Done() }(&processors[i]) } @@ -727,6 +739,30 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch return } decodedCh <- markers + case record.HistogramSamples: + hists := histogramSamplesPool.Get().([]record.RefHistogramSample)[:0] + hists, err = dec.HistogramSamples(rec, hists) + if err != nil { + decodeErr = &wlog.CorruptionErr{ + Err: fmt.Errorf("decode histograms: %w", err), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decodedCh <- hists + case record.FloatHistogramSamples: + hists := floatHistogramSamplesPool.Get().([]record.RefFloatHistogramSample)[:0] + hists, err = dec.FloatHistogramSamples(rec, hists) + if err != nil { + decodeErr = &wlog.CorruptionErr{ + Err: fmt.Errorf("decode float histograms: %w", err), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decodedCh <- hists default: // Noop. } @@ -791,6 +827,70 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch idx := uint64(ms.ref) % uint64(concurrency) processors[idx].input <- wblSubsetProcessorInputItem{mmappedSeries: ms} } + case []record.RefHistogramSample: + samples := v + // We split up the samples into chunks of 5000 samples or less. + // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise + // cause thousands of very large in flight buffers occupying large amounts + // of unused memory. + for len(samples) > 0 { + m := 5000 + if len(samples) < m { + m = len(samples) + } + for i := 0; i < concurrency; i++ { + if histogramShards[i] == nil { + histogramShards[i] = processors[i].reuseHistogramBuf() + } + } + for _, sam := range samples[:m] { + if r, ok := multiRef[sam.Ref]; ok { + sam.Ref = r + } + mod := uint64(sam.Ref) % uint64(concurrency) + histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H}) + } + for i := 0; i < concurrency; i++ { + if len(histogramShards[i]) > 0 { + processors[i].input <- wblSubsetProcessorInputItem{histogramSamples: histogramShards[i]} + histogramShards[i] = nil + } + } + samples = samples[m:] + } + histogramSamplesPool.Put(v) //nolint:staticcheck + case []record.RefFloatHistogramSample: + samples := v + // We split up the samples into chunks of 5000 samples or less. + // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise + // cause thousands of very large in flight buffers occupying large amounts + // of unused memory. + for len(samples) > 0 { + m := 5000 + if len(samples) < m { + m = len(samples) + } + for i := 0; i < concurrency; i++ { + if histogramShards[i] == nil { + histogramShards[i] = processors[i].reuseHistogramBuf() + } + } + for _, sam := range samples[:m] { + if r, ok := multiRef[sam.Ref]; ok { + sam.Ref = r + } + mod := uint64(sam.Ref) % uint64(concurrency) + histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH}) + } + for i := 0; i < concurrency; i++ { + if len(histogramShards[i]) > 0 { + processors[i].input <- wblSubsetProcessorInputItem{histogramSamples: histogramShards[i]} + histogramShards[i] = nil + } + } + samples = samples[m:] + } + floatHistogramSamplesPool.Put(v) //nolint:staticcheck default: panic(fmt.Errorf("unexpected decodedCh type: %T", d)) } @@ -833,17 +933,20 @@ func (e errLoadWbl) Unwrap() error { } type wblSubsetProcessor struct { - input chan wblSubsetProcessorInputItem - output chan []record.RefSample + input chan wblSubsetProcessorInputItem + output chan []record.RefSample + histogramsOutput chan []histogramRecord } type wblSubsetProcessorInputItem struct { - mmappedSeries *memSeries - samples []record.RefSample + mmappedSeries *memSeries + samples []record.RefSample + histogramSamples []histogramRecord } func (wp *wblSubsetProcessor) setup() { wp.output = make(chan []record.RefSample, 300) + wp.histogramsOutput = make(chan []histogramRecord, 300) wp.input = make(chan wblSubsetProcessorInputItem, 300) } @@ -851,6 +954,8 @@ func (wp *wblSubsetProcessor) closeAndDrain() { close(wp.input) for range wp.output { } + for range wp.histogramsOutput { + } } // If there is a buffer in the output chan, return it for reuse, otherwise return nil. @@ -863,10 +968,21 @@ func (wp *wblSubsetProcessor) reuseBuf() []record.RefSample { return nil } +// If there is a buffer in the output chan, return it for reuse, otherwise return nil. +func (wp *wblSubsetProcessor) reuseHistogramBuf() []histogramRecord { + select { + case buf := <-wp.histogramsOutput: + return buf[:0] + default: + } + return nil +} + // processWBLSamples adds the samples it receives to the head and passes // the buffer received to an output channel for reuse. -func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) { +func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHistogramRefs uint64) { defer close(wp.output) + defer close(wp.histogramsOutput) oooCapMax := h.opts.OutOfOrderCapMax.Load() // We don't check for minValidTime for ooo samples. @@ -905,11 +1021,41 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) { case wp.output <- in.samples: default: } + for _, s := range in.histogramSamples { + ms := h.series.getByID(s.ref) + if ms == nil { + unknownHistogramRefs++ + continue + } + var chunkCreated bool + var ok bool + if s.h != nil { + ok, chunkCreated, _ = ms.insert(s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax, h.logger) + } else { + ok, chunkCreated, _ = ms.insert(s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax, h.logger) + } + if chunkCreated { + h.metrics.chunksCreated.Inc() + h.metrics.chunks.Inc() + } + if ok { + if s.t > maxt { + maxt = s.t + } + if s.t < mint { + mint = s.t + } + } + } + select { + case wp.histogramsOutput <- in.histogramSamples: + default: + } } h.updateMinOOOMaxOOOTime(mint, maxt) - return unknownRefs + return unknownRefs, unknownHistogramRefs } const ( diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index 40e37043b..b5944f6c8 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -389,6 +389,7 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) { func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenario) { chunkRange := int64(2000) head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, true) + head.opts.EnableOOONativeHistograms.Store(true) t.Cleanup(func() { require.NoError(t, head.Close()) }) ctx := context.Background() @@ -493,6 +494,8 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { opts := DefaultOptions() opts.OutOfOrderCapMax = 5 opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds() + opts.EnableNativeHistograms = true + opts.EnableOOONativeHistograms = true s1 := labels.FromStrings("l", "v1") minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() } @@ -902,6 +905,8 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( opts := DefaultOptions() opts.OutOfOrderCapMax = 5 opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds() + opts.EnableNativeHistograms = true + opts.EnableOOONativeHistograms = true s1 := labels.FromStrings("l", "v1") minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() } diff --git a/tsdb/ooo_head_test.go b/tsdb/ooo_head_test.go index d3cd5f601..b9badfea2 100644 --- a/tsdb/ooo_head_test.go +++ b/tsdb/ooo_head_test.go @@ -28,15 +28,14 @@ import ( const testMaxSize int = 32 // Formulas chosen to make testing easy. -func valEven(pos int) int { return pos*2 + 2 } // s[0]=2, s[1]=4, s[2]=6, ..., s[31]=64 - Predictable pre-existing values -func valOdd(pos int) int { return pos*2 + 1 } // s[0]=1, s[1]=3, s[2]=5, ..., s[31]=63 - New values will interject at chosen position because they sort before the pre-existing vals. - -func samplify(v int) sample { return sample{int64(v), float64(v), nil, nil} } +// Formulas chosen to make testing easy. +func valEven(pos int) int64 { return int64(pos*2 + 2) } // s[0]=2, s[1]=4, s[2]=6, ..., s[31]=64 - Predictable pre-existing values +func valOdd(pos int) int64 { return int64(pos*2 + 1) } // s[0]=1, s[1]=3, s[2]=5, ..., s[31]=63 - New values will interject at chosen position because they sort before the pre-existing vals. -func makeEvenSampleSlice(n int) []sample { +func makeEvenSampleSlice(n int, sampleFunc func(ts int64) sample) []sample { s := make([]sample, n) for i := 0; i < n; i++ { - s[i] = samplify(valEven(i)) + s[i] = sampleFunc(valEven(i)) } return s } @@ -45,8 +44,36 @@ func makeEvenSampleSlice(n int) []sample { // - Number of pre-existing samples anywhere from 0 to testMaxSize-1. // - Insert new sample before first pre-existing samples, after the last, and anywhere in between. // - With a chunk initial capacity of testMaxSize/8 and testMaxSize, which lets us test non-full and full chunks, and chunks that need to expand themselves. -// Note: In all samples used, t always equals v in numeric value. when we talk about 'value' we just refer to a value that will be used for both sample.t and sample.v. func TestOOOInsert(t *testing.T) { + scenarios := map[string]struct { + sampleFunc func(ts int64) sample + }{ + "float": { + sampleFunc: func(ts int64) sample { + return sample{t: ts, f: float64(ts)} + }, + }, + "integer histogram": { + sampleFunc: func(ts int64) sample { + return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts))} + }, + }, + "float histogram": { + sampleFunc: func(ts int64) sample { + return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(ts))} + }, + }, + } + for name, scenario := range scenarios { + t.Run(name, func(t *testing.T) { + testOOOInsert(t, scenario.sampleFunc) + }) + } +} + +func testOOOInsert(t *testing.T, + sampleFunc func(ts int64) sample, +) { for numPreExisting := 0; numPreExisting <= testMaxSize; numPreExisting++ { // For example, if we have numPreExisting 2, then: // chunk.samples indexes filled 0 1 @@ -56,20 +83,21 @@ func TestOOOInsert(t *testing.T) { for insertPos := 0; insertPos <= numPreExisting; insertPos++ { chunk := NewOOOChunk() - chunk.samples = makeEvenSampleSlice(numPreExisting) - newSample := samplify(valOdd(insertPos)) - chunk.Insert(newSample.t, newSample.f, nil, nil) + chunk.samples = make([]sample, numPreExisting) + chunk.samples = makeEvenSampleSlice(numPreExisting, sampleFunc) + newSample := sampleFunc(valOdd(insertPos)) + chunk.Insert(newSample.t, newSample.f, newSample.h, newSample.fh) var expSamples []sample // Our expected new samples slice, will be first the original samples. for i := 0; i < insertPos; i++ { - expSamples = append(expSamples, samplify(valEven(i))) + expSamples = append(expSamples, sampleFunc(valEven(i))) } // Then the new sample. expSamples = append(expSamples, newSample) // Followed by any original samples that were pushed back by the new one. for i := insertPos; i < numPreExisting; i++ { - expSamples = append(expSamples, samplify(valEven(i))) + expSamples = append(expSamples, sampleFunc(valEven(i))) } require.Equal(t, expSamples, chunk.samples, "numPreExisting %d, insertPos %d", numPreExisting, insertPos) @@ -81,17 +109,46 @@ func TestOOOInsert(t *testing.T) { // pre-existing samples, with between 1 and testMaxSize pre-existing samples and // with a chunk initial capacity of testMaxSize/8 and testMaxSize, which lets us test non-full and full chunks, and chunks that need to expand themselves. func TestOOOInsertDuplicate(t *testing.T) { + scenarios := map[string]struct { + sampleFunc func(ts int64) sample + }{ + "float": { + sampleFunc: func(ts int64) sample { + return sample{t: ts, f: float64(ts)} + }, + }, + "integer histogram": { + sampleFunc: func(ts int64) sample { + return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts))} + }, + }, + "float histogram": { + sampleFunc: func(ts int64) sample { + return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(ts))} + }, + }, + } + for name, scenario := range scenarios { + t.Run(name, func(t *testing.T) { + testOOOInsertDuplicate(t, scenario.sampleFunc) + }) + } +} + +func testOOOInsertDuplicate(t *testing.T, + sampleFunc func(ts int64) sample, +) { for num := 1; num <= testMaxSize; num++ { for dupPos := 0; dupPos < num; dupPos++ { chunk := NewOOOChunk() - chunk.samples = makeEvenSampleSlice(num) + chunk.samples = makeEvenSampleSlice(num, sampleFunc) dupSample := chunk.samples[dupPos] dupSample.f = 0.123 - ok := chunk.Insert(dupSample.t, dupSample.f, nil, nil) + ok := chunk.Insert(dupSample.t, dupSample.f, dupSample.h, dupSample.fh) - expSamples := makeEvenSampleSlice(num) // We expect no change. + expSamples := makeEvenSampleSlice(num, sampleFunc) // We expect no change. require.False(t, ok) require.Equal(t, expSamples, chunk.samples, "num %d, dupPos %d", num, dupPos) } diff --git a/tsdb/testutil.go b/tsdb/testutil.go index 9730e4713..ab6aab79f 100644 --- a/tsdb/testutil.go +++ b/tsdb/testutil.go @@ -16,6 +16,8 @@ package tsdb import ( "testing" + "github.com/prometheus/prometheus/tsdb/tsdbutil" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" @@ -27,7 +29,11 @@ import ( ) const ( - float = "float" + float = "float" + intHistogram = "integer histogram" + floatHistogram = "float histogram" + gaugeIntHistogram = "gauge int histogram" + gaugeFloatHistogram = "gauge float histogram" ) type testValue struct { @@ -42,7 +48,6 @@ type sampleTypeScenario struct { sampleFunc func(ts, value int64) sample } -// TODO: native histogram sample types will be added as part of out-of-order native histogram support; see #11220. var sampleTypeScenarios = map[string]sampleTypeScenario{ float: { sampleType: sampleMetricTypeFloat, @@ -55,50 +60,50 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{ return sample{t: ts, f: float64(value)} }, }, - // intHistogram: { - // sampleType: sampleMetricTypeHistogram, - // appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) { - // s := sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(value))} - // ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil) - // return ref, s, err - // }, - // sampleFunc: func(ts, value int64) sample { - // return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(value))} - // }, - // }, - // floatHistogram: { - // sampleType: sampleMetricTypeHistogram, - // appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) { - // s := sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(value))} - // ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh) - // return ref, s, err - // }, - // sampleFunc: func(ts, value int64) sample { - // return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(value))} - // }, - // }, - // gaugeIntHistogram: { - // sampleType: sampleMetricTypeHistogram, - // appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) { - // s := sample{t: ts, h: tsdbutil.GenerateTestGaugeHistogram(int(value))} - // ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil) - // return ref, s, err - // }, - // sampleFunc: func(ts, value int64) sample { - // return sample{t: ts, h: tsdbutil.GenerateTestGaugeHistogram(int(value))} - // }, - // }, - // gaugeFloatHistogram: { - // sampleType: sampleMetricTypeHistogram, - // appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) { - // s := sample{t: ts, fh: tsdbutil.GenerateTestGaugeFloatHistogram(int(value))} - // ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh) - // return ref, s, err - // }, - // sampleFunc: func(ts, value int64) sample { - // return sample{t: ts, fh: tsdbutil.GenerateTestGaugeFloatHistogram(int(value))} - // }, - // }, + intHistogram: { + sampleType: sampleMetricTypeHistogram, + appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) { + s := sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(value))} + ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil) + return ref, s, err + }, + sampleFunc: func(ts, value int64) sample { + return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(value))} + }, + }, + floatHistogram: { + sampleType: sampleMetricTypeHistogram, + appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) { + s := sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(value))} + ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh) + return ref, s, err + }, + sampleFunc: func(ts, value int64) sample { + return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(value))} + }, + }, + gaugeIntHistogram: { + sampleType: sampleMetricTypeHistogram, + appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) { + s := sample{t: ts, h: tsdbutil.GenerateTestGaugeHistogram(int(value))} + ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil) + return ref, s, err + }, + sampleFunc: func(ts, value int64) sample { + return sample{t: ts, h: tsdbutil.GenerateTestGaugeHistogram(int(value))} + }, + }, + gaugeFloatHistogram: { + sampleType: sampleMetricTypeHistogram, + appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) { + s := sample{t: ts, fh: tsdbutil.GenerateTestGaugeFloatHistogram(int(value))} + ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh) + return ref, s, err + }, + sampleFunc: func(ts, value int64) sample { + return sample{t: ts, fh: tsdbutil.GenerateTestGaugeFloatHistogram(int(value))} + }, + }, } // requireEqualSeries checks that the actual series are equal to the expected ones. It ignores the counter reset hints for histograms.