From 0c852680bf921036624f6672b7814ad380a99222 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sat, 29 Jun 2024 17:49:49 +0100 Subject: [PATCH 01/17] [Benchmark] TSDB: Add BenchmarkQuerierSelectWithOutOfOrder Refactor existing BenchmarkQuerierSelect to provide the set-up. Note that Head queries now run faster because they use a RangeHead. Signed-off-by: Bryan Boreham --- tsdb/block.go | 5 ++ tsdb/querier_bench_test.go | 104 +++++++++++++++++++++++++------------ 2 files changed, 75 insertions(+), 34 deletions(-) diff --git a/tsdb/block.go b/tsdb/block.go index 2f32733f8..c55e22ce5 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -467,6 +467,11 @@ func (pb *Block) setCompactionFailed() error { return nil } +// Querier implements Queryable. +func (pb *Block) Querier(mint, maxt int64) (storage.Querier, error) { + return NewBlockQuerier(pb, mint, maxt) +} + type blockIndexReader struct { ir IndexReader b *Block diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index 9a8230242..e3e457d07 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/index" "github.com/stretchr/testify/require" @@ -254,56 +255,91 @@ func BenchmarkMergedStringIter(b *testing.B) { b.ReportAllocs() } -func BenchmarkQuerierSelect(b *testing.B) { - opts := DefaultHeadOptions() - opts.ChunkRange = 1000 - opts.ChunkDirRoot = b.TempDir() - h, err := NewHead(nil, nil, nil, nil, opts, nil) +func createHeadForBenchmarkSelect(b *testing.B, numSeries int, addSeries func(app storage.Appender, i int)) (*Head, *DB) { + dir := b.TempDir() + opts := DefaultOptions() + opts.OutOfOrderCapMax = 255 + opts.OutOfOrderTimeWindow = 1000 + db, err := Open(dir, nil, nil, opts, nil) require.NoError(b, err) - defer h.Close() + b.Cleanup(func() { + require.NoError(b, db.Close()) + }) + h := db.Head() + app := h.Appender(context.Background()) - numSeries := 1000000 for i := 0; i < numSeries; i++ { - app.Append(0, labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix)), int64(i), 0) + addSeries(app, i) } require.NoError(b, app.Commit()) + return h, db +} - bench := func(b *testing.B, br BlockReader, sorted bool) { - matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar") - for s := 1; s <= numSeries; s *= 10 { - b.Run(fmt.Sprintf("%dof%d", s, numSeries), func(b *testing.B) { - q, err := NewBlockQuerier(br, 0, int64(s-1)) - require.NoError(b, err) +func benchmarkSelect(b *testing.B, queryable storage.Queryable, numSeries int, sorted bool) { + matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar") + b.ResetTimer() + for s := 1; s <= numSeries; s *= 10 { + b.Run(fmt.Sprintf("%dof%d", s, numSeries), func(b *testing.B) { + q, err := queryable.Querier(0, int64(s-1)) + require.NoError(b, err) - b.ResetTimer() - for i := 0; i < b.N; i++ { - ss := q.Select(context.Background(), sorted, nil, matcher) - for ss.Next() { - } - require.NoError(b, ss.Err()) + b.ResetTimer() + for i := 0; i < b.N; i++ { + ss := q.Select(context.Background(), sorted, nil, matcher) + for ss.Next() { } - q.Close() - }) - } + require.NoError(b, ss.Err()) + } + q.Close() + }) } +} + +func BenchmarkQuerierSelect(b *testing.B) { + numSeries := 1000000 + h, db := createHeadForBenchmarkSelect(b, numSeries, func(app storage.Appender, i int) { + _, err := app.Append(0, labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, 
postingsBenchSuffix)), int64(i), 0) + if err != nil { + b.Fatal(err) + } + }) b.Run("Head", func(b *testing.B) { - bench(b, h, false) + benchmarkSelect(b, db, numSeries, false) }) b.Run("SortedHead", func(b *testing.B) { - bench(b, h, true) + benchmarkSelect(b, db, numSeries, true) }) - tmpdir := b.TempDir() + b.Run("Block", func(b *testing.B) { + tmpdir := b.TempDir() - blockdir := createBlockFromHead(b, tmpdir, h) - block, err := OpenBlock(nil, blockdir, nil) - require.NoError(b, err) - defer func() { - require.NoError(b, block.Close()) - }() + blockdir := createBlockFromHead(b, tmpdir, h) + block, err := OpenBlock(nil, blockdir, nil) + require.NoError(b, err) + defer func() { + require.NoError(b, block.Close()) + }() - b.Run("Block", func(b *testing.B) { - bench(b, block, false) + benchmarkSelect(b, block, numSeries, false) + }) +} + +func BenchmarkQuerierSelectWithOutOfOrder(b *testing.B) { + numSeries := 1000000 + _, db := createHeadForBenchmarkSelect(b, numSeries, func(app storage.Appender, i int) { + l := labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix)) + ref, err := app.Append(0, l, int64(i+1), 0) + if err != nil { + b.Fatal(err) + } + _, err = app.Append(ref, l, int64(i), 1) // Out of order sample + if err != nil { + b.Fatal(err) + } + }) + + b.Run("Head", func(b *testing.B) { + benchmarkSelect(b, db, numSeries, false) }) } From c75c8f8329758f82279d62b483a50c2fae00c283 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 24 Jun 2024 21:06:50 +0100 Subject: [PATCH 02/17] Refactoring: extract getSeriesChunks Signed-off-by: Bryan Boreham --- tsdb/head_read.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tsdb/head_read.go b/tsdb/head_read.go index c8b394be8..ff9345fa0 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -200,9 +200,15 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB *chks = (*chks)[:0] + getSeriesChunks(s, h.mint, h.maxt, chks) + + return nil +} + +func getSeriesChunks(s *memSeries, mint, maxt int64, chks *[]chunks.Meta) { for i, c := range s.mmappedChunks { // Do not expose chunks that are outside of the specified range. - if !c.OverlapsClosedInterval(h.mint, h.maxt) { + if !c.OverlapsClosedInterval(mint, maxt) { continue } *chks = append(*chks, chunks.Meta{ @@ -223,7 +229,7 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB } else { maxTime = chk.maxTime } - if chk.OverlapsClosedInterval(h.mint, h.maxt) { + if chk.OverlapsClosedInterval(mint, maxt) { *chks = append(*chks, chunks.Meta{ MinTime: chk.minTime, MaxTime: maxTime, @@ -233,8 +239,6 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB j++ } } - - return nil } // headChunkID returns the HeadChunkID referred to by the given position. 
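
A minimal usage sketch of the helper extracted above (illustrative, not part of the patch): any caller that already holds the series lock can now collect the in-order chunk metas for a time range, which is how later commits in this series reuse it from the combined head+OOO index reader. The variables s, mint and maxt are assumed to come from the surrounding reader.

	// Assumes s is a *memSeries looked up from the head.
	var chks []chunks.Meta
	s.Lock()
	getSeriesChunks(s, mint, maxt, &chks) // appends metas for chunks overlapping [mint, maxt]
	s.Unlock()
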
From a32aca0cd74d5d1acbef0abd58ec48f2a8e560c5 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 27 Jun 2024 09:25:26 +0100 Subject: [PATCH 03/17] Refactoring: extract getOOOSeriesChunks Signed-off-by: Bryan Boreham --- tsdb/ooo_head_read.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 9d5b9d644..892d2c4b6 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -92,6 +92,10 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra return nil } + return getOOOSeriesChunks(s, oh.mint, oh.maxt, lastGarbageCollectedMmapRef, maxMmapRef, chks) +} + +func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, chks *[]chunks.Meta) error { tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks)) addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) { @@ -106,7 +110,7 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra // Collect all chunks that overlap the query range. if s.ooo.oooHeadChunk != nil { c := s.ooo.oooHeadChunk - if c.OverlapsClosedInterval(oh.mint, oh.maxt) && maxMmapRef == 0 { + if c.OverlapsClosedInterval(mint, maxt) && maxMmapRef == 0 { ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks)))) if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least. chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime) @@ -125,7 +129,7 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra } for i := len(s.ooo.oooMmappedChunks) - 1; i >= 0; i-- { c := s.ooo.oooMmappedChunks[i] - if c.OverlapsClosedInterval(oh.mint, oh.maxt) && (maxMmapRef == 0 || maxMmapRef.GreaterThanOrEqualTo(c.ref)) && (lastGarbageCollectedMmapRef == 0 || c.ref.GreaterThan(lastGarbageCollectedMmapRef)) { + if c.OverlapsClosedInterval(mint, maxt) && (maxMmapRef == 0 || maxMmapRef.GreaterThanOrEqualTo(c.ref)) && (lastGarbageCollectedMmapRef == 0 || c.ref.GreaterThan(lastGarbageCollectedMmapRef)) { ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i))) addChunk(c.minTime, c.maxTime, ref, nil) } From 7e24844d081f82ef1d3933ace4477d60cec7d05b Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Wed, 26 Jun 2024 20:48:39 +0100 Subject: [PATCH 04/17] Refactor: extract headChunkReader.chunkFromSeries() For when you have a series locked already. Signed-off-by: Bryan Boreham --- tsdb/head_read.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tsdb/head_read.go b/tsdb/head_read.go index ff9345fa0..d75d28a58 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -362,9 +362,14 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc. } s.Lock() + defer s.Unlock() + return h.chunkFromSeries(s, cid, copyLastChunk) +} + +// Call with s locked. +func (h *headChunkReader) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, copyLastChunk bool) (chunkenc.Chunk, int64, error) { c, headChunk, isOpen, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool) if err != nil { - s.Unlock() return nil, 0, err } defer func() { @@ -378,7 +383,6 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc. // This means that the chunk is outside the specified range. 
if !c.OverlapsClosedInterval(h.mint, h.maxt) { - s.Unlock() return nil, 0, storage.ErrNotFound } @@ -395,7 +399,6 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc. return nil, 0, err } } - s.Unlock() return &safeHeadChunk{ Chunk: chk, From da31da3ea6f46da2b3c605d5a85c4d3fc80dd560 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 27 Jun 2024 10:36:25 +0100 Subject: [PATCH 05/17] Refactor: extract selectSeriesSet and selectChunkSeriesSet Signed-off-by: Bryan Boreham --- tsdb/querier.go | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/tsdb/querier.go b/tsdb/querier.go index 910c2d7fc..37456d7e2 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -115,20 +115,24 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) { } func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { - mint := q.mint - maxt := q.maxt + return selectSeriesSet(ctx, sortSeries, hints, ms, q.index, q.chunks, q.tombstones, q.mint, q.maxt) +} + +func selectSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher, + index IndexReader, chunks ChunkReader, tombstones tombstones.Reader, mint, maxt int64, +) storage.SeriesSet { disableTrimming := false sharded := hints != nil && hints.ShardCount > 0 - p, err := PostingsForMatchers(ctx, q.index, ms...) + p, err := PostingsForMatchers(ctx, index, ms...) if err != nil { return storage.ErrSeriesSet(err) } if sharded { - p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) + p = index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) } if sortSeries { - p = q.index.SortedPostings(p) + p = index.SortedPostings(p) } if hints != nil { @@ -137,11 +141,11 @@ func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *stora disableTrimming = hints.DisableTrimming if hints.Func == "series" { // When you're only looking up metadata (for example series API), you don't need to load any chunks. - return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt, disableTrimming) + return newBlockSeriesSet(index, newNopChunkReader(), tombstones, p, mint, maxt, disableTrimming) } } - return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming) + return newBlockSeriesSet(index, chunks, tombstones, p, mint, maxt, disableTrimming) } // blockChunkQuerier provides chunk querying access to a single block database. @@ -159,8 +163,12 @@ func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier } func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet { - mint := q.mint - maxt := q.maxt + return selectChunkSeriesSet(ctx, sortSeries, hints, ms, q.blockID, q.index, q.chunks, q.tombstones, q.mint, q.maxt) +} + +func selectChunkSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher, + blockID ulid.ULID, index IndexReader, chunks ChunkReader, tombstones tombstones.Reader, mint, maxt int64, +) storage.ChunkSeriesSet { disableTrimming := false sharded := hints != nil && hints.ShardCount > 0 @@ -169,17 +177,17 @@ func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints * maxt = hints.End disableTrimming = hints.DisableTrimming } - p, err := PostingsForMatchers(ctx, q.index, ms...) 
+ p, err := PostingsForMatchers(ctx, index, ms...) if err != nil { return storage.ErrChunkSeriesSet(err) } if sharded { - p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) + p = index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) } if sortSeries { - p = q.index.SortedPostings(p) + p = index.SortedPostings(p) } - return NewBlockChunkSeriesSet(q.blockID, q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming) + return NewBlockChunkSeriesSet(blockID, index, chunks, tombstones, p, mint, maxt, disableTrimming) } // PostingsForMatchers assembles a single postings iterator against the index reader From 2936ab80d7dbc0c944d99346ea7ab26449fe82d3 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 27 Jun 2024 12:47:31 +0100 Subject: [PATCH 06/17] [Tests] Promtool: Sort output where Prometheus does not guarantee the order. Previously this was working because iout-of-order chunks forced a sort and merge. Signed-off-by: Bryan Boreham --- cmd/promtool/tsdb_test.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/cmd/promtool/tsdb_test.go b/cmd/promtool/tsdb_test.go index 75089b168..d7cc56088 100644 --- a/cmd/promtool/tsdb_test.go +++ b/cmd/promtool/tsdb_test.go @@ -20,6 +20,7 @@ import ( "math" "os" "runtime" + "slices" "strings" "testing" "time" @@ -152,12 +153,18 @@ func TestTSDBDump(t *testing.T) { expectedMetrics, err := os.ReadFile(tt.expectedDump) require.NoError(t, err) expectedMetrics = normalizeNewLine(expectedMetrics) - // even though in case of one matcher samples are not sorted, the order in the cases above should stay the same. - require.Equal(t, string(expectedMetrics), dumpedMetrics) + // Sort both, because Prometheus does not guarantee the output order. + require.Equal(t, sortLines(string(expectedMetrics)), sortLines(dumpedMetrics)) }) } } +func sortLines(buf string) string { + lines := strings.Split(buf, "\n") + slices.Sort(lines) + return strings.Join(lines, "\n") +} + func TestTSDBDumpOpenMetrics(t *testing.T) { storage := promqltest.LoadedStorage(t, ` load 1m @@ -169,7 +176,7 @@ func TestTSDBDumpOpenMetrics(t *testing.T) { require.NoError(t, err) expectedMetrics = normalizeNewLine(expectedMetrics) dumpedMetrics := getDumpedSamples(t, storage.Dir(), math.MinInt64, math.MaxInt64, []string{"{__name__=~'(?s:.*)'}"}, formatSeriesSetOpenMetrics) - require.Equal(t, string(expectedMetrics), dumpedMetrics) + require.Equal(t, sortLines(string(expectedMetrics)), sortLines(dumpedMetrics)) } func TestTSDBDumpOpenMetricsRoundTrip(t *testing.T) { From e04d137649697ea59b0e5dbfad965ae24d6c0faa Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 24 Jun 2024 13:41:44 +0100 Subject: [PATCH 07/17] [PERF] TSDB: Query head and ooo-head together Add `HeadAndOOOQuerier` which iterates just once over series, then where necessary merges chunks from in-order and out-of-order lists. Add a ChunkQuerier for in-order and ooo together Add copy-last-chunk behaviour to HeadAndOOOChunkReader Out-of-order chunk IDs are distinguished from in-order by setting bit 23. 
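
A sketch of the chunk-ID scheme described above (illustrative, not part of the diff; the function names are simplified stand-ins for the patch's oooHeadChunkID and unpackHeadChunkRef): bit 23 of a head chunk ID flags an out-of-order chunk, while the lower bits keep the chunk position, so both kinds of chunk can travel through a single reference.

	const oooChunkIDMask = 1 << 23 // bit 23 set => out-of-order chunk

	func packOOOChunkID(pos uint32) uint32 { return pos | oooChunkIDMask }

	func unpackChunkID(id uint32) (pos uint32, isOOO bool) {
		return id & (oooChunkIDMask - 1), id&oooChunkIDMask != 0
	}
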
Signed-off-by: Bryan Boreham --- tsdb/db.go | 58 +++++++----- tsdb/head_read.go | 51 +++++++++-- tsdb/ooo_head_read.go | 182 ++++++++++++++++++++++++++++++++++++- tsdb/ooo_head_read_test.go | 2 +- tsdb/querier.go | 12 ++- 5 files changed, 263 insertions(+), 42 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index 1c430c211..bf1893ec0 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -2029,7 +2029,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { } } - blockQueriers := make([]storage.Querier, 0, len(blocks)+2) // +2 to allow for possible in-order and OOO head queriers + blockQueriers := make([]storage.Querier, 0, len(blocks)+1) // +1 to allow for possible head querier. defer func() { if err != nil { @@ -2041,10 +2041,11 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { } }() + var headQuerier storage.Querier if maxt >= db.head.MinTime() { rh := NewRangeHead(db.head, mint, maxt) var err error - inOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt) + headQuerier, err = db.blockQuerierFunc(rh, mint, maxt) if err != nil { return nil, fmt.Errorf("open block querier for head %s: %w", rh, err) } @@ -2054,36 +2055,40 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { // won't run into a race later since any truncation that comes after will wait on this querier if it overlaps. shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt) if shouldClose { - if err := inOrderHeadQuerier.Close(); err != nil { + if err := headQuerier.Close(); err != nil { return nil, fmt.Errorf("closing head block querier %s: %w", rh, err) } - inOrderHeadQuerier = nil + headQuerier = nil } if getNew { rh := NewRangeHead(db.head, newMint, maxt) - inOrderHeadQuerier, err = db.blockQuerierFunc(rh, newMint, maxt) + headQuerier, err = db.blockQuerierFunc(rh, newMint, maxt) if err != nil { return nil, fmt.Errorf("open block querier for head while getting new querier %s: %w", rh, err) } } - - if inOrderHeadQuerier != nil { - blockQueriers = append(blockQueriers, inOrderHeadQuerier) - } } - if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { + if headQuerier != nil { + if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { + // We need to fetch from in-order and out-of-order chunks: wrap the headQuerier. + isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef) + headQuerier = NewHeadAndOOOQuerier(mint, maxt, db.head, isoState, headQuerier) + } + } else if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef) var err error - outOfOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt) + headQuerier, err = db.blockQuerierFunc(rh, mint, maxt) if err != nil { // If BlockQuerierFunc() failed, make sure to clean up the pending read created by NewOOORangeHead. 
rh.isoState.Close() return nil, fmt.Errorf("open block querier for ooo head %s: %w", rh, err) } + } - blockQueriers = append(blockQueriers, outOfOrderHeadQuerier) + if headQuerier != nil { + blockQueriers = append(blockQueriers, headQuerier) } for _, b := range blocks { @@ -2111,7 +2116,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer } } - blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks)+2) // +2 to allow for possible in-order and OOO head queriers + blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks)+1) // +1 to allow for possible head querier. defer func() { if err != nil { @@ -2123,9 +2128,10 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer } }() + var headQuerier storage.ChunkQuerier if maxt >= db.head.MinTime() { rh := NewRangeHead(db.head, mint, maxt) - inOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt) + headQuerier, err = db.blockChunkQuerierFunc(rh, mint, maxt) if err != nil { return nil, fmt.Errorf("open querier for head %s: %w", rh, err) } @@ -2135,35 +2141,39 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer // won't run into a race later since any truncation that comes after will wait on this querier if it overlaps. shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt) if shouldClose { - if err := inOrderHeadQuerier.Close(); err != nil { + if err := headQuerier.Close(); err != nil { return nil, fmt.Errorf("closing head querier %s: %w", rh, err) } - inOrderHeadQuerier = nil + headQuerier = nil } if getNew { rh := NewRangeHead(db.head, newMint, maxt) - inOrderHeadQuerier, err = db.blockChunkQuerierFunc(rh, newMint, maxt) + headQuerier, err = db.blockChunkQuerierFunc(rh, newMint, maxt) if err != nil { return nil, fmt.Errorf("open querier for head while getting new querier %s: %w", rh, err) } } - - if inOrderHeadQuerier != nil { - blockQueriers = append(blockQueriers, inOrderHeadQuerier) - } } - if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { + if headQuerier != nil { + if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { + // We need to fetch from in-order and out-of-order chunks: wrap the headQuerier. + isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef) + headQuerier = NewHeadAndOOOChunkQuerier(mint, maxt, db.head, isoState, headQuerier) + } + } else if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef) - outOfOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt) + headQuerier, err = db.blockChunkQuerierFunc(rh, mint, maxt) if err != nil { // If NewBlockQuerier() failed, make sure to clean up the pending read created by NewOOORangeHead. rh.isoState.Close() return nil, fmt.Errorf("open block chunk querier for ooo head %s: %w", rh, err) } + } - blockQueriers = append(blockQueriers, outOfOrderHeadQuerier) + if headQuerier != nil { + blockQueriers = append(blockQueriers, headQuerier) } for _, b := range blocks { diff --git a/tsdb/head_read.go b/tsdb/head_read.go index d75d28a58..977d6b978 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -248,12 +248,20 @@ func (s *memSeries) headChunkID(pos int) chunks.HeadChunkID { return chunks.HeadChunkID(pos) + s.firstChunkID } +const oooChunkIDMask = 1 << 23 + // oooHeadChunkID returns the HeadChunkID referred to by the given position. 
+// Only the bottom 24 bits are used. Bit 23 is always 1 for an OOO chunk; for the rest: // * 0 <= pos < len(s.oooMmappedChunks) refer to s.oooMmappedChunks[pos] // * pos == len(s.oooMmappedChunks) refers to s.oooHeadChunk // The caller must ensure that s.ooo is not nil. func (s *memSeries) oooHeadChunkID(pos int) chunks.HeadChunkID { - return chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID + return (chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID) | oooChunkIDMask +} + +func unpackHeadChunkRef(ref chunks.ChunkRef) (chunks.HeadSeriesRef, chunks.HeadChunkID, bool) { + sid, cid := chunks.HeadChunkRef(ref).Unpack() + return sid, (cid & (oooChunkIDMask - 1)), (cid & oooChunkIDMask) != 0 } // LabelValueFor returns label value for the given label name in the series referred to by ID. @@ -343,10 +351,15 @@ func (h *headChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chu return chk, nil, err } -// ChunkWithCopy returns the chunk for the reference number. -// If the chunk is the in-memory chunk, then it makes a copy and returns the copied chunk. -func (h *headChunkReader) ChunkWithCopy(meta chunks.Meta) (chunkenc.Chunk, int64, error) { - return h.chunk(meta, true) +type ChunkReaderWithCopy interface { + ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) +} + +// ChunkOrIterableWithCopy returns the chunk for the reference number. +// If the chunk is the in-memory chunk, then it makes a copy and returns the copied chunk, plus the max time of the chunk. +func (h *headChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) { + chk, maxTime, err := h.chunk(meta, true) + return chk, nil, maxTime, err } // chunk returns the chunk for the reference number. @@ -472,10 +485,11 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi // chunks.Meta reference from memory or by m-mapping it from the disk. The // returned iterable will be a merge of all the overlapping chunks, if any, // amongst all the chunks in the OOOHead. +// If hr is non-nil then in-order chunks are included. // This function is not thread safe unless the caller holds a lock. // The caller must ensure that s.ooo is not nil. -func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (*mergedOOOChunks, error) { - _, cid := chunks.HeadChunkRef(meta.Ref).Unpack() +func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, hr *headChunkReader, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (*mergedOOOChunks, error) { + _, cid, _ := unpackHeadChunkRef(meta.Ref) // ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are // incremented by 1 when new chunk is created, hence (meta - firstChunkID) gives the slice index. @@ -516,6 +530,17 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{meta: meta}) } + if hr != nil { // Include in-order chunks. + var metas []chunks.Meta + getSeriesChunks(s, max(meta.MinTime, mint), min(meta.MaxTime, maxt), &metas) + for _, m := range metas { + tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{ + meta: m, + ref: 0, // This tells the loop below it's an in-order head chunk. + }) + } + } + // Next we want to sort all the collected chunks by min time so we can find // those that overlap and stop when we know the rest don't. 
slices.SortFunc(tmpChks, refLessByMinTimeAndMinRef) @@ -527,9 +552,17 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe continue } var iterable chunkenc.Iterable - if c.meta.Chunk != nil { + switch { + case c.meta.Chunk != nil: iterable = c.meta.Chunk - } else { + case c.ref == 0: // This is an in-order head chunk. + _, cid := chunks.HeadChunkRef(c.meta.Ref).Unpack() + var err error + iterable, _, err = hr.chunkFromSeries(s, cid, false) + if err != nil { + return nil, fmt.Errorf("invalid head chunk: %w", err) + } + default: chk, err := cdm.Chunk(c.ref) if err != nil { var cerr *chunks.CorruptionErr diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 892d2c4b6..b7944c56e 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/prometheus/prometheus/util/annotations" ) var _ IndexReader = &OOOHeadIndexReader{} @@ -92,10 +93,10 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra return nil } - return getOOOSeriesChunks(s, oh.mint, oh.maxt, lastGarbageCollectedMmapRef, maxMmapRef, chks) + return getOOOSeriesChunks(s, oh.mint, oh.maxt, lastGarbageCollectedMmapRef, maxMmapRef, false, chks) } -func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, chks *[]chunks.Meta) error { +func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, chks *[]chunks.Meta) error { tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks)) addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) { @@ -135,6 +136,10 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap } } + if includeInOrder { + getSeriesChunks(s, mint, maxt, &tmpChks) + } + // There is nothing to do if we did not collect any chunk. if len(tmpChks) == 0 { return nil @@ -275,7 +280,7 @@ func (cr OOOHeadChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, s.Unlock() return nil, nil, storage.ErrNotFound } - mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt, cr.maxMmapRef) + mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, nil, cr.mint, cr.maxt, cr.maxMmapRef) s.Unlock() if err != nil { return nil, nil, err @@ -498,3 +503,174 @@ func (ir *OOOCompactionHeadIndexReader) LabelNamesFor(ctx context.Context, posti func (ir *OOOCompactionHeadIndexReader) Close() error { return ir.ch.oooIR.Close() } + +// HeadAndOOOQuerier queries both the head and the out-of-order head. 
+type HeadAndOOOQuerier struct { + mint, maxt int64 + head *Head + index IndexReader + chunkr ChunkReader + querier storage.Querier +} + +func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier { + isoState := head.iso.State(mint, maxt) + return &HeadAndOOOQuerier{ + mint: mint, + maxt: maxt, + head: head, + index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef), + chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, isoState, oooIsoState, 0), + querier: querier, + } +} + +func (q *HeadAndOOOQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return q.querier.LabelValues(ctx, name, hints, matchers...) +} + +func (q *HeadAndOOOQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return q.querier.LabelNames(ctx, hints, matchers...) +} + +func (q *HeadAndOOOQuerier) Close() error { + q.chunkr.Close() + return q.querier.Close() +} + +func (q *HeadAndOOOQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + return selectSeriesSet(ctx, sortSeries, hints, matchers, q.index, q.chunkr, q.head.tombstones, q.mint, q.maxt) +} + +// HeadAndOOOChunkQuerier queries both the head and the out-of-order head. +type HeadAndOOOChunkQuerier struct { + mint, maxt int64 + head *Head + index IndexReader + chunkr ChunkReader + querier storage.ChunkQuerier +} + +func NewHeadAndOOOChunkQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.ChunkQuerier) storage.ChunkQuerier { + isoState := head.iso.State(mint, maxt) + return &HeadAndOOOChunkQuerier{ + mint: mint, + maxt: maxt, + head: head, + index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef), + chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, isoState, oooIsoState, 0), + querier: querier, + } +} + +func (q *HeadAndOOOChunkQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return q.querier.LabelValues(ctx, name, hints, matchers...) +} + +func (q *HeadAndOOOChunkQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return q.querier.LabelNames(ctx, hints, matchers...) +} + +func (q *HeadAndOOOChunkQuerier) Close() error { + q.chunkr.Close() + return q.querier.Close() +} + +func (q *HeadAndOOOChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet { + return selectChunkSeriesSet(ctx, sortSeries, hints, matchers, rangeHeadULID, q.index, q.chunkr, q.head.tombstones, q.mint, q.maxt) +} + +type HeadAndOOOIndexReader struct { + *headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible. 
+ lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef +} + +func NewHeadAndOOOIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader { + hr := &headIndexReader{ + head: head, + mint: mint, + maxt: maxt, + } + return &HeadAndOOOIndexReader{hr, lastGarbageCollectedMmapRef} +} + +func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { + s := oh.head.series.getByID(chunks.HeadSeriesRef(ref)) + if s == nil { + oh.head.metrics.seriesNotFound.Inc() + return storage.ErrNotFound + } + builder.Assign(s.lset) + + if chks == nil { + return nil + } + + s.Lock() + defer s.Unlock() + *chks = (*chks)[:0] + + if s.ooo != nil { + return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, chks) + } + getSeriesChunks(s, oh.mint, oh.maxt, chks) + return nil +} + +type HeadAndOOOChunkReader struct { + cr headChunkReader + maxMmapRef chunks.ChunkDiskMapperRef + oooIsoState *oooIsolationState +} + +func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, isoState *isolationState, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader { + return &HeadAndOOOChunkReader{ + cr: headChunkReader{ + head: head, + mint: mint, + maxt: maxt, + isoState: isoState, + }, + maxMmapRef: maxMmapRef, + oooIsoState: oooIsoState, + } +} + +func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { + sid, _, isOOO := unpackHeadChunkRef(meta.Ref) + if !isOOO { + return cr.cr.ChunkOrIterable(meta) + } + + s := cr.cr.head.series.getByID(sid) + // This means that the series has been garbage collected. + if s == nil { + return nil, nil, storage.ErrNotFound + } + + s.Lock() + mc, err := s.oooMergedChunks(meta, cr.cr.head.chunkDiskMapper, &cr.cr, cr.cr.mint, cr.cr.maxt, cr.maxMmapRef) + s.Unlock() + + return nil, mc, err +} + +// Pass through special behaviour for current head chunk. +func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) { + _, _, isOOO := unpackHeadChunkRef(meta.Ref) + if !isOOO { + return cr.cr.ChunkOrIterableWithCopy(meta) + } + chk, iter, err := cr.ChunkOrIterable(meta) + return chk, iter, 0, err +} + +func (cr *HeadAndOOOChunkReader) Close() error { + if cr.cr.isoState != nil { + cr.cr.isoState.Close() + } + if cr.oooIsoState != nil { + cr.oooIsoState.Close() + } + return nil +} diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index 8cc3f1dde..08c5c4a3e 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -316,7 +316,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // Ref to whatever Ref the chunk has, that we refer to by ID for ref, c := range intervals { if c.ID == e.ID { - meta.Ref = chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), chunks.HeadChunkID(ref))) + meta.Ref = chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), s1.oooHeadChunkID(ref))) break } } diff --git a/tsdb/querier.go b/tsdb/querier.go index 37456d7e2..2e15f0b08 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -641,14 +641,16 @@ func (p *populateWithDelGenericSeriesIterator) next(copyHeadChunk bool) bool { } } - hcr, ok := p.cr.(*headChunkReader) + hcr, ok := p.cr.(ChunkReaderWithCopy) var iterable chunkenc.Iterable if ok && copyHeadChunk && len(p.bufIter.Intervals) == 0 { - // ChunkWithCopy will copy the head chunk. 
+ // ChunkOrIterableWithCopy will copy the head chunk, if it can. var maxt int64 - p.currMeta.Chunk, maxt, p.err = hcr.ChunkWithCopy(p.currMeta) - // For the in-memory head chunk the index reader sets maxt as MaxInt64. We fix it here. - p.currMeta.MaxTime = maxt + p.currMeta.Chunk, iterable, maxt, p.err = hcr.ChunkOrIterableWithCopy(p.currMeta) + if p.currMeta.Chunk != nil { + // For the in-memory head chunk the index reader sets maxt as MaxInt64. We fix it here. + p.currMeta.MaxTime = maxt + } } else { p.currMeta.Chunk, iterable, p.err = p.cr.ChunkOrIterable(p.currMeta) } From 6529d6336cc277aefef78a595128a65a719e86a0 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 15 Jul 2024 20:07:12 +0100 Subject: [PATCH 08/17] TSDB: NewHeadAndOOOChunkReader takes headChunkReader So we can pass nil and have it read just OOO chunks. Signed-off-by: Bryan Boreham --- tsdb/ooo_head_read.go | 40 +++++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index b7944c56e..4be4e9e18 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -514,13 +514,18 @@ type HeadAndOOOQuerier struct { } func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier { - isoState := head.iso.State(mint, maxt) + cr := &headChunkReader{ + head: head, + mint: mint, + maxt: maxt, + isoState: head.iso.State(mint, maxt), + } return &HeadAndOOOQuerier{ mint: mint, maxt: maxt, head: head, index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef), - chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, isoState, oooIsoState, 0), + chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0), querier: querier, } } @@ -552,13 +557,18 @@ type HeadAndOOOChunkQuerier struct { } func NewHeadAndOOOChunkQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.ChunkQuerier) storage.ChunkQuerier { - isoState := head.iso.State(mint, maxt) + cr := &headChunkReader{ + head: head, + mint: mint, + maxt: maxt, + isoState: head.iso.State(mint, maxt), + } return &HeadAndOOOChunkQuerier{ mint: mint, maxt: maxt, head: head, index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef), - chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, isoState, oooIsoState, 0), + chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0), querier: querier, } } @@ -618,19 +628,19 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S } type HeadAndOOOChunkReader struct { - cr headChunkReader + head *Head + mint, maxt int64 + cr *headChunkReader // If nil, only read OOO chunks. 
maxMmapRef chunks.ChunkDiskMapperRef oooIsoState *oooIsolationState } -func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, isoState *isolationState, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader { +func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader { return &HeadAndOOOChunkReader{ - cr: headChunkReader{ - head: head, - mint: mint, - maxt: maxt, - isoState: isoState, - }, + head: head, + mint: mint, + maxt: maxt, + cr: cr, maxMmapRef: maxMmapRef, oooIsoState: oooIsoState, } @@ -642,14 +652,14 @@ func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chu return cr.cr.ChunkOrIterable(meta) } - s := cr.cr.head.series.getByID(sid) + s := cr.head.series.getByID(sid) // This means that the series has been garbage collected. if s == nil { return nil, nil, storage.ErrNotFound } s.Lock() - mc, err := s.oooMergedChunks(meta, cr.cr.head.chunkDiskMapper, &cr.cr, cr.cr.mint, cr.cr.maxt, cr.maxMmapRef) + mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef) s.Unlock() return nil, mc, err @@ -666,7 +676,7 @@ func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chun } func (cr *HeadAndOOOChunkReader) Close() error { - if cr.cr.isoState != nil { + if cr.cr != nil && cr.cr.isoState != nil { cr.cr.isoState.Close() } if cr.oooIsoState != nil { From f26159794434d20c0ec3081d0bd080b37756cc60 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 15 Jul 2024 20:56:55 +0100 Subject: [PATCH 09/17] TSDB: Fix up LabelValues to work for OOO-only head Signed-off-by: Bryan Boreham --- tsdb/ooo_head_read.go | 14 ++++++++++++++ tsdb/ooo_head_read_test.go | 10 +++++----- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 4be4e9e18..f844cfaca 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -627,6 +627,20 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S return nil } +// LabelValues needs to be overridden from the headIndexReader implementation +// so we can return labels within either in-order range or ooo range. +func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { + if oh.maxt < oh.head.MinTime() && oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxTime() && oh.mint > oh.head.MaxOOOTime() { + return []string{}, nil + } + + if len(matchers) == 0 { + return oh.head.postings.LabelValues(ctx, name), nil + } + + return labelValuesWithMatchers(ctx, oh, name, matchers...) 
+} + type HeadAndOOOChunkReader struct { head *Head mint, maxt int64 diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index 08c5c4a3e..b837b9e2f 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -421,17 +421,17 @@ func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenari name: "LabelValues calls with ooo head query range not overlapping out-of-order data", queryMinT: 100, queryMaxT: 100, - expValues1: []string{}, - expValues2: []string{}, - expValues3: []string{}, - expValues4: []string{}, + expValues1: []string{"bar1"}, + expValues2: nil, + expValues3: []string{"bar1", "bar2"}, + expValues4: []string{"bar1", "bar2"}, }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { // We first want to test using a head index reader that covers the biggest query interval - oh := NewOOOHeadIndexReader(head, tc.queryMinT, tc.queryMaxT, 0) + oh := NewHeadAndOOOIndexReader(head, tc.queryMinT, tc.queryMaxT, 0) matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")} values, err := oh.LabelValues(ctx, "foo", matchers...) sort.Strings(values) From 0a2ff76881a82bd2751cd3f316494b9ab5621b07 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 15 Jul 2024 18:17:48 +0100 Subject: [PATCH 10/17] TSDB tests: Fix up BenchmarkQueries Was not working even on main. Some cases still error. Signed-off-by: Bryan Boreham --- tsdb/querier_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index ffdf8dc02..50525f65f 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -3169,12 +3169,11 @@ func BenchmarkQueries(b *testing.B) { qHead, err := NewBlockQuerier(NewRangeHead(head, 1, nSamples), 1, nSamples) require.NoError(b, err) - qOOOHead, err := NewBlockQuerier(NewOOORangeHead(head, 1, nSamples, 0), 1, nSamples) - require.NoError(b, err) + isoState := head.oooIso.TrackReadAfter(0) + qOOOHead := NewHeadAndOOOQuerier(1, nSamples, head, isoState, qHead) queryTypes = append(queryTypes, qt{ - fmt.Sprintf("_Head_oooPercent:%d", oooPercentage), - storage.NewMergeQuerier([]storage.Querier{qHead, qOOOHead}, nil, storage.ChainedSeriesMerge), + fmt.Sprintf("_Head_oooPercent:%d", oooPercentage), qOOOHead, }) } From e7e50a3afd285136366ebbb0270cce442df3c1b1 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 15 Jul 2024 18:27:31 +0100 Subject: [PATCH 11/17] TSDB: Remove code for querying OOO-head only Just query via `HeadAndOOOQuerier`, which will skip series where no in-order chunks are in range. Now we don't need `OOORangeHead`. 
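
Condensed fragment of the querier selection this commit moves db.Querier (and the chunk-querier path) to, drawn from the diff below; it omits error handling and the head-truncation collision check:

	overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime())
	var headQuerier storage.Querier
	if maxt >= db.head.MinTime() || overlapsOOO {
		rh := NewRangeHead(db.head, mint, maxt)
		headQuerier, err = db.blockQuerierFunc(rh, mint, maxt)
	}
	if overlapsOOO {
		// One querier now serves both in-order and out-of-order chunks.
		isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef)
		headQuerier = NewHeadAndOOOQuerier(mint, maxt, db.head, isoState, headQuerier)
	}
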
Signed-off-by: Bryan Boreham --- tsdb/db.go | 45 ++++++++-------------------- tsdb/ooo_head.go | 78 ------------------------------------------------ 2 files changed, 12 insertions(+), 111 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index bf1893ec0..94c44161d 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -2041,8 +2041,9 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { } }() + overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) var headQuerier storage.Querier - if maxt >= db.head.MinTime() { + if maxt >= db.head.MinTime() || overlapsOOO { rh := NewRangeHead(db.head, mint, maxt) var err error headQuerier, err = db.blockQuerierFunc(rh, mint, maxt) @@ -2069,22 +2070,10 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { } } - if headQuerier != nil { - if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { - // We need to fetch from in-order and out-of-order chunks: wrap the headQuerier. - isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef) - headQuerier = NewHeadAndOOOQuerier(mint, maxt, db.head, isoState, headQuerier) - } - } else if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { - rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef) - var err error - headQuerier, err = db.blockQuerierFunc(rh, mint, maxt) - if err != nil { - // If BlockQuerierFunc() failed, make sure to clean up the pending read created by NewOOORangeHead. - rh.isoState.Close() - - return nil, fmt.Errorf("open block querier for ooo head %s: %w", rh, err) - } + if overlapsOOO { + // We need to fetch from in-order and out-of-order chunks: wrap the headQuerier. + isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef) + headQuerier = NewHeadAndOOOQuerier(mint, maxt, db.head, isoState, headQuerier) } if headQuerier != nil { @@ -2128,8 +2117,9 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer } }() + overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) var headQuerier storage.ChunkQuerier - if maxt >= db.head.MinTime() { + if maxt >= db.head.MinTime() || overlapsOOO { rh := NewRangeHead(db.head, mint, maxt) headQuerier, err = db.blockChunkQuerierFunc(rh, mint, maxt) if err != nil { @@ -2155,21 +2145,10 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer } } - if headQuerier != nil { - if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { - // We need to fetch from in-order and out-of-order chunks: wrap the headQuerier. - isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef) - headQuerier = NewHeadAndOOOChunkQuerier(mint, maxt, db.head, isoState, headQuerier) - } - } else if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { - rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef) - headQuerier, err = db.blockChunkQuerierFunc(rh, mint, maxt) - if err != nil { - // If NewBlockQuerier() failed, make sure to clean up the pending read created by NewOOORangeHead. - rh.isoState.Close() - - return nil, fmt.Errorf("open block chunk querier for ooo head %s: %w", rh, err) - } + if overlapsOOO { + // We need to fetch from in-order and out-of-order chunks: wrap the headQuerier. 
+ isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef) + headQuerier = NewHeadAndOOOChunkQuerier(mint, maxt, db.head, isoState, headQuerier) } if headQuerier != nil { diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go index 209b14673..0ed9f3648 100644 --- a/tsdb/ooo_head.go +++ b/tsdb/ooo_head.go @@ -14,16 +14,10 @@ package tsdb import ( - "fmt" "sort" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/tsdb/chunkenc" - - "github.com/oklog/ulid" - - "github.com/prometheus/prometheus/tsdb/chunks" - "github.com/prometheus/prometheus/tsdb/tombstones" ) // OOOChunk maintains samples in time-ascending order. @@ -171,75 +165,3 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error } return chks, nil } - -var _ BlockReader = &OOORangeHead{} - -// OOORangeHead allows querying Head out of order samples via BlockReader -// interface implementation. -type OOORangeHead struct { - head *Head - // mint and maxt are tracked because when a query is handled we only want - // the timerange of the query and having preexisting pointers to the first - // and last timestamp help with that. - mint, maxt int64 - - isoState *oooIsolationState -} - -func NewOOORangeHead(head *Head, mint, maxt int64, minRef chunks.ChunkDiskMapperRef) *OOORangeHead { - isoState := head.oooIso.TrackReadAfter(minRef) - - return &OOORangeHead{ - head: head, - mint: mint, - maxt: maxt, - isoState: isoState, - } -} - -func (oh *OOORangeHead) Index() (IndexReader, error) { - return NewOOOHeadIndexReader(oh.head, oh.mint, oh.maxt, oh.isoState.minRef), nil -} - -func (oh *OOORangeHead) Chunks() (ChunkReader, error) { - return NewOOOHeadChunkReader(oh.head, oh.mint, oh.maxt, oh.isoState, 0), nil -} - -func (oh *OOORangeHead) Tombstones() (tombstones.Reader, error) { - // As stated in the design doc https://docs.google.com/document/d/1Kppm7qL9C-BJB1j6yb6-9ObG3AbdZnFUBYPNNWwDBYM/edit?usp=sharing - // Tombstones are not supported for out of order metrics. - return tombstones.NewMemTombstones(), nil -} - -var oooRangeHeadULID = ulid.MustParse("0000000000XXXX000RANGEHEAD") - -func (oh *OOORangeHead) Meta() BlockMeta { - return BlockMeta{ - MinTime: oh.mint, - MaxTime: oh.maxt, - ULID: oooRangeHeadULID, - Stats: BlockStats{ - NumSeries: oh.head.NumSeries(), - }, - } -} - -// Size returns the size taken by the Head block. -func (oh *OOORangeHead) Size() int64 { - return oh.head.Size() -} - -// String returns an human readable representation of the out of order range -// head. It's important to keep this function in order to avoid the struct dump -// when the head is stringified in errors or logs. -func (oh *OOORangeHead) String() string { - return fmt.Sprintf("ooo range head (mint: %d, maxt: %d)", oh.MinTime(), oh.MaxTime()) -} - -func (oh *OOORangeHead) MinTime() int64 { - return oh.mint -} - -func (oh *OOORangeHead) MaxTime() int64 { - return oh.maxt -} From a299c7b6d61cbbfc898962acb3e88430bd7e048e Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 15 Jul 2024 20:10:17 +0100 Subject: [PATCH 12/17] TSDB: Remove OOOHeadChunkReader Use HeadAndOOOChunkReader instead. 
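
For call sites, the swap implied by this commit looks roughly like the following (names taken from the diff; the nil *headChunkReader keeps the OOO-only behaviour introduced in patch 08):

	// before
	cr := NewOOOHeadChunkReader(head, mint, maxt, isoState, maxMmapRef)
	// after: pass nil for the headChunkReader to read only out-of-order chunks
	cr := NewHeadAndOOOChunkReader(head, mint, maxt, nil, nil, maxMmapRef)
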
Signed-off-by: Bryan Boreham --- tsdb/ooo_head_read.go | 55 +------------------------------------- tsdb/ooo_head_read_test.go | 8 +++--- 2 files changed, 5 insertions(+), 58 deletions(-) diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index f844cfaca..01ba12986 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -248,59 +248,6 @@ func (oh *OOOHeadIndexReader) Postings(ctx context.Context, name string, values } } -type OOOHeadChunkReader struct { - head *Head - mint, maxt int64 - isoState *oooIsolationState - maxMmapRef chunks.ChunkDiskMapperRef -} - -func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *OOOHeadChunkReader { - return &OOOHeadChunkReader{ - head: head, - mint: mint, - maxt: maxt, - isoState: isoState, - maxMmapRef: maxMmapRef, - } -} - -func (cr OOOHeadChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { - sid, _ := chunks.HeadChunkRef(meta.Ref).Unpack() - - s := cr.head.series.getByID(sid) - // This means that the series has been garbage collected. - if s == nil { - return nil, nil, storage.ErrNotFound - } - - s.Lock() - if s.ooo == nil { - // There is no OOO data for this series. - s.Unlock() - return nil, nil, storage.ErrNotFound - } - mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, nil, cr.mint, cr.maxt, cr.maxMmapRef) - s.Unlock() - if err != nil { - return nil, nil, err - } - - // This means that the query range did not overlap with the requested chunk. - if len(mc.chunkIterables) == 0 { - return nil, nil, storage.ErrNotFound - } - - return nil, mc, nil -} - -func (cr OOOHeadChunkReader) Close() error { - if cr.isoState != nil { - cr.isoState.Close() - } - return nil -} - type OOOCompactionHead struct { oooIR *OOOHeadIndexReader lastMmapRef chunks.ChunkDiskMapperRef @@ -397,7 +344,7 @@ func (ch *OOOCompactionHead) Index() (IndexReader, error) { } func (ch *OOOCompactionHead) Chunks() (ChunkReader, error) { - return NewOOOHeadChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt, nil, ch.lastMmapRef), nil + return NewHeadAndOOOChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt, nil, nil, ch.lastMmapRef), nil } func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error) { diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index b837b9e2f..c0b130ffb 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -481,10 +481,10 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) { db := newTestDBWithOpts(t, opts) - cr := NewOOOHeadChunkReader(db.head, 0, 1000, nil, 0) + cr := NewHeadAndOOOChunkReader(db.head, 0, 1000, nil, nil, 0) defer cr.Close() c, iterable, err := cr.ChunkOrIterable(chunks.Meta{ - Ref: 0x1000000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, + Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, }) require.Nil(t, iterable) require.Equal(t, err, fmt.Errorf("not found")) @@ -839,7 +839,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { require.NoError(t, err) require.Equal(t, len(tc.expChunksSamples), len(chks)) - cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0) + cr := NewHeadAndOOOChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, nil, 0) defer cr.Close() for i := 0; i < len(chks); i++ { c, iterable, err := cr.ChunkOrIterable(chks[i]) @@ -1013,7 +1013,7 @@ func 
testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( } require.NoError(t, app.Commit()) - cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0) + cr := NewHeadAndOOOChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, nil, 0) defer cr.Close() for i := 0; i < len(chks); i++ { c, iterable, err := cr.ChunkOrIterable(chks[i]) From 26b3de04387b38fc633ba2ce0931fdf65059086d Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 16 Jul 2024 13:56:22 +0100 Subject: [PATCH 13/17] TSDB: Remove OOOHeadIndexReader Use headIndexReader instead. OOOCompactionHeadIndexReader needs to be expanded slightly, because it previously delegated to OOOHeadIndexReader. Signed-off-by: Bryan Boreham --- tsdb/ooo_head_read.go | 127 ++++++++++--------------------------- tsdb/ooo_head_read_test.go | 6 +- 2 files changed, 35 insertions(+), 98 deletions(-) diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 01ba12986..aad1d2fa8 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -30,19 +30,6 @@ import ( "github.com/prometheus/prometheus/util/annotations" ) -var _ IndexReader = &OOOHeadIndexReader{} - -// OOOHeadIndexReader implements IndexReader so ooo samples in the head can be -// accessed. -// It also has a reference to headIndexReader so we can leverage on its -// IndexReader implementation for all the methods that remain the same. We -// decided to do this to avoid code duplication. -// The only methods that change are the ones about getting Series and Postings. -type OOOHeadIndexReader struct { - *headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible. - lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef -} - var _ chunkenc.Iterable = &mergedOOOChunks{} // mergedOOOChunks holds the list of iterables for overlapping chunks. @@ -54,48 +41,11 @@ func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator return storage.ChainSampleIteratorFromIterables(iterator, o.chunkIterables) } -func NewOOOHeadIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *OOOHeadIndexReader { - hr := &headIndexReader{ - head: head, - mint: mint, - maxt: maxt, - } - return &OOOHeadIndexReader{hr, lastGarbageCollectedMmapRef} -} - -func (oh *OOOHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { - return oh.series(ref, builder, chks, oh.lastGarbageCollectedMmapRef, 0) -} - // lastGarbageCollectedMmapRef gives the last mmap chunk that may be being garbage collected and so // any chunk at or before this ref will not be considered. 0 disables this check. // // maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then // the oooHeadChunk will not be considered. 
-func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef) error { - s := oh.head.series.getByID(chunks.HeadSeriesRef(ref)) - - if s == nil { - oh.head.metrics.seriesNotFound.Inc() - return storage.ErrNotFound - } - builder.Assign(s.labels()) - - if chks == nil { - return nil - } - - s.Lock() - defer s.Unlock() - *chks = (*chks)[:0] - - if s.ooo == nil { - return nil - } - - return getOOOSeriesChunks(s, oh.mint, oh.maxt, lastGarbageCollectedMmapRef, maxMmapRef, false, chks) -} - func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, chks *[]chunks.Meta) error { tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks)) @@ -176,21 +126,6 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap return nil } -// LabelValues needs to be overridden from the headIndexReader implementation due -// to the check that happens at the beginning where we make sure that the query -// interval overlaps with the head minooot and maxooot. -func (oh *OOOHeadIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { - if oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxOOOTime() { - return []string{}, nil - } - - if len(matchers) == 0 { - return oh.head.postings.LabelValues(ctx, name), nil - } - - return labelValuesWithMatchers(ctx, oh, name, matchers...) -} - type chunkMetaAndChunkDiskMapperRef struct { meta chunks.Meta ref chunks.ChunkDiskMapperRef @@ -232,24 +167,8 @@ func lessByMinTimeAndMinRef(a, b chunks.Meta) int { } } -func (oh *OOOHeadIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) { - switch len(values) { - case 0: - return index.EmptyPostings(), nil - case 1: - return oh.head.postings.Get(name, values[0]), nil // TODO(ganesh) Also call GetOOOPostings - default: - // TODO(ganesh) We want to only return postings for out of order series. - res := make([]index.Postings, 0, len(values)) - for _, value := range values { - res = append(res, oh.head.postings.Get(name, value)) // TODO(ganesh) Also call GetOOOPostings - } - return index.Merge(ctx, res...), nil - } -} - type OOOCompactionHead struct { - oooIR *OOOHeadIndexReader + head *Head lastMmapRef chunks.ChunkDiskMapperRef lastWBLFile int postings []storage.SeriesRef @@ -266,6 +185,7 @@ type OOOCompactionHead struct { // on the sample append latency. So call NewOOOCompactionHead only right before compaction. func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, error) { ch := &OOOCompactionHead{ + head: head, chunkRange: head.chunkRange.Load(), mint: math.MaxInt64, maxt: math.MinInt64, @@ -279,15 +199,14 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, ch.lastWBLFile = lastWBLFile } - ch.oooIR = NewOOOHeadIndexReader(head, math.MinInt64, math.MaxInt64, 0) + hr := headIndexReader{head: head, mint: ch.mint, maxt: ch.maxt} n, v := index.AllPostingsKey() - - // TODO: verify this gets only ooo samples. - p, err := ch.oooIR.Postings(ctx, n, v) + // TODO: filter to series with OOO samples, before sorting. 
+ p, err := hr.Postings(ctx, n, v) if err != nil { return nil, err } - p = ch.oooIR.SortedPostings(p) + p = hr.SortedPostings(p) var lastSeq, lastOff int for p.Next() { @@ -344,7 +263,7 @@ func (ch *OOOCompactionHead) Index() (IndexReader, error) { } func (ch *OOOCompactionHead) Chunks() (ChunkReader, error) { - return NewHeadAndOOOChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt, nil, nil, ch.lastMmapRef), nil + return NewHeadAndOOOChunkReader(ch.head, ch.mint, ch.maxt, nil, nil, ch.lastMmapRef), nil } func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error) { @@ -370,12 +289,12 @@ func (ch *OOOCompactionHead) Meta() BlockMeta { // Only the method of BlockReader interface are valid for the cloned OOOCompactionHead. func (ch *OOOCompactionHead) CloneForTimeRange(mint, maxt int64) *OOOCompactionHead { return &OOOCompactionHead{ - oooIR: NewOOOHeadIndexReader(ch.oooIR.head, mint, maxt, 0), + head: ch.head, lastMmapRef: ch.lastMmapRef, postings: ch.postings, chunkRange: ch.chunkRange, - mint: ch.mint, - maxt: ch.maxt, + mint: mint, + maxt: maxt, } } @@ -395,7 +314,8 @@ func NewOOOCompactionHeadIndexReader(ch *OOOCompactionHead) IndexReader { } func (ir *OOOCompactionHeadIndexReader) Symbols() index.StringIter { - return ir.ch.oooIR.Symbols() + hr := headIndexReader{head: ir.ch.head, mint: ir.ch.mint, maxt: ir.ch.maxt} + return hr.Symbols() } func (ir *OOOCompactionHeadIndexReader) Postings(_ context.Context, name string, values ...string) (index.Postings, error) { @@ -416,11 +336,28 @@ func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.P } func (ir *OOOCompactionHeadIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings { - return ir.ch.oooIR.ShardedPostings(p, shardIndex, shardCount) + hr := headIndexReader{head: ir.ch.head, mint: ir.ch.mint, maxt: ir.ch.maxt} + return hr.ShardedPostings(p, shardIndex, shardCount) } func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { - return ir.ch.oooIR.series(ref, builder, chks, 0, ir.ch.lastMmapRef) + s := ir.ch.head.series.getByID(chunks.HeadSeriesRef(ref)) + + if s == nil { + ir.ch.head.metrics.seriesNotFound.Inc() + return storage.ErrNotFound + } + builder.Assign(s.lset) + + s.Lock() + defer s.Unlock() + *chks = (*chks)[:0] + + if s.ooo == nil { + return nil + } + + return getOOOSeriesChunks(s, ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, chks) } func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { @@ -448,7 +385,7 @@ func (ir *OOOCompactionHeadIndexReader) LabelNamesFor(ctx context.Context, posti } func (ir *OOOCompactionHeadIndexReader) Close() error { - return ir.ch.oooIR.Close() + return nil } // HeadAndOOOQuerier queries both the head and the out-of-order head. diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index c0b130ffb..f71d49732 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -341,7 +341,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { }) } - ir := NewOOOHeadIndexReader(h, tc.queryMinT, tc.queryMaxT, 0) + ir := NewHeadAndOOOIndexReader(h, tc.queryMinT, tc.queryMaxT, 0) var chks []chunks.Meta var b labels.ScratchBuilder @@ -832,7 +832,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { // The Series method populates the chunk metas, taking a copy of the // head OOO chunk if necessary. 
			// These are then used by the ChunkReader.
-			ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
+			ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
 			var chks []chunks.Meta
 			var b labels.ScratchBuilder
 			err = ir.Series(s1Ref, &b, &chks)
@@ -997,7 +997,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
 			// The Series method populates the chunk metas, taking a copy of the
 			// head OOO chunk if necessary. These are then used by the ChunkReader.
-			ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
+			ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
 			var chks []chunks.Meta
 			var b labels.ScratchBuilder
 			err = ir.Series(s1Ref, &b, &chks)

From e95607b2765bf9b0492342d08b07c3b5e31089bc Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Tue, 16 Jul 2024 14:18:55 +0100
Subject: [PATCH 14/17] TSDB: Lock round access to labels, where necessary

Signed-off-by: Bryan Boreham
---
 tsdb/ooo_head_read.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go
index aad1d2fa8..e1881aef8 100644
--- a/tsdb/ooo_head_read.go
+++ b/tsdb/ooo_head_read.go
@@ -347,7 +347,7 @@ func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *l
 		ir.ch.head.metrics.seriesNotFound.Inc()
 		return storage.ErrNotFound
 	}
-	builder.Assign(s.lset)
+	builder.Assign(s.labels())
 
 	s.Lock()
 	defer s.Unlock()
@@ -494,7 +494,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S
 		oh.head.metrics.seriesNotFound.Inc()
 		return storage.ErrNotFound
 	}
-	builder.Assign(s.lset)
+	builder.Assign(s.labels())
 
 	if chks == nil {
 		return nil

From 7ffd3ca2807326b76d1c2c19dc769163a9280eed Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Tue, 16 Jul 2024 14:20:18 +0100
Subject: [PATCH 15/17] TSDB: Cosmetic: move HeadAndOOO implementations where
 old code was

This makes the diffs easier to follow.

Signed-off-by: Bryan Boreham
---
 tsdb/ooo_head_read.go | 221 +++++++++++++++++++++---------------------
 1 file changed, 112 insertions(+), 109 deletions(-)

diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go
index e1881aef8..aaaa24963 100644
--- a/tsdb/ooo_head_read.go
+++ b/tsdb/ooo_head_read.go
@@ -30,6 +30,13 @@ import (
 	"github.com/prometheus/prometheus/util/annotations"
 )
 
+var _ IndexReader = &HeadAndOOOIndexReader{}
+
+type HeadAndOOOIndexReader struct {
+	*headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible.
+	lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef
+}
+
 var _ chunkenc.Iterable = &mergedOOOChunks{}
 
 // mergedOOOChunks holds the list of iterables for overlapping chunks.
@@ -41,6 +48,39 @@ func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator return storage.ChainSampleIteratorFromIterables(iterator, o.chunkIterables) } +func NewHeadAndOOOIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader { + hr := &headIndexReader{ + head: head, + mint: mint, + maxt: maxt, + } + return &HeadAndOOOIndexReader{hr, lastGarbageCollectedMmapRef} +} + +func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { + s := oh.head.series.getByID(chunks.HeadSeriesRef(ref)) + + if s == nil { + oh.head.metrics.seriesNotFound.Inc() + return storage.ErrNotFound + } + builder.Assign(s.labels()) + + if chks == nil { + return nil + } + + s.Lock() + defer s.Unlock() + *chks = (*chks)[:0] + + if s.ooo != nil { + return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, chks) + } + getSeriesChunks(s, oh.mint, oh.maxt, chks) + return nil +} + // lastGarbageCollectedMmapRef gives the last mmap chunk that may be being garbage collected and so // any chunk at or before this ref will not be considered. 0 disables this check. // @@ -126,6 +166,20 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap return nil } +// LabelValues needs to be overridden from the headIndexReader implementation +// so we can return labels within either in-order range or ooo range. +func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { + if oh.maxt < oh.head.MinTime() && oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxTime() && oh.mint > oh.head.MaxOOOTime() { + return []string{}, nil + } + + if len(matchers) == 0 { + return oh.head.postings.LabelValues(ctx, name), nil + } + + return labelValuesWithMatchers(ctx, oh, name, matchers...) +} + type chunkMetaAndChunkDiskMapperRef struct { meta chunks.Meta ref chunks.ChunkDiskMapperRef @@ -167,6 +221,64 @@ func lessByMinTimeAndMinRef(a, b chunks.Meta) int { } } +type HeadAndOOOChunkReader struct { + head *Head + mint, maxt int64 + cr *headChunkReader // If nil, only read OOO chunks. + maxMmapRef chunks.ChunkDiskMapperRef + oooIsoState *oooIsolationState +} + +func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader { + return &HeadAndOOOChunkReader{ + head: head, + mint: mint, + maxt: maxt, + cr: cr, + maxMmapRef: maxMmapRef, + oooIsoState: oooIsoState, + } +} + +func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { + sid, _, isOOO := unpackHeadChunkRef(meta.Ref) + if !isOOO { + return cr.cr.ChunkOrIterable(meta) + } + + s := cr.head.series.getByID(sid) + // This means that the series has been garbage collected. + if s == nil { + return nil, nil, storage.ErrNotFound + } + + s.Lock() + mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef) + s.Unlock() + + return nil, mc, err +} + +// Pass through special behaviour for current head chunk. 
+func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) { + _, _, isOOO := unpackHeadChunkRef(meta.Ref) + if !isOOO { + return cr.cr.ChunkOrIterableWithCopy(meta) + } + chk, iter, err := cr.ChunkOrIterable(meta) + return chk, iter, 0, err +} + +func (cr *HeadAndOOOChunkReader) Close() error { + if cr.cr != nil && cr.cr.isoState != nil { + cr.cr.isoState.Close() + } + if cr.oooIsoState != nil { + cr.oooIsoState.Close() + } + return nil +} + type OOOCompactionHead struct { head *Head lastMmapRef chunks.ChunkDiskMapperRef @@ -473,112 +585,3 @@ func (q *HeadAndOOOChunkQuerier) Close() error { func (q *HeadAndOOOChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet { return selectChunkSeriesSet(ctx, sortSeries, hints, matchers, rangeHeadULID, q.index, q.chunkr, q.head.tombstones, q.mint, q.maxt) } - -type HeadAndOOOIndexReader struct { - *headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible. - lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef -} - -func NewHeadAndOOOIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader { - hr := &headIndexReader{ - head: head, - mint: mint, - maxt: maxt, - } - return &HeadAndOOOIndexReader{hr, lastGarbageCollectedMmapRef} -} - -func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { - s := oh.head.series.getByID(chunks.HeadSeriesRef(ref)) - if s == nil { - oh.head.metrics.seriesNotFound.Inc() - return storage.ErrNotFound - } - builder.Assign(s.labels()) - - if chks == nil { - return nil - } - - s.Lock() - defer s.Unlock() - *chks = (*chks)[:0] - - if s.ooo != nil { - return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, chks) - } - getSeriesChunks(s, oh.mint, oh.maxt, chks) - return nil -} - -// LabelValues needs to be overridden from the headIndexReader implementation -// so we can return labels within either in-order range or ooo range. -func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { - if oh.maxt < oh.head.MinTime() && oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxTime() && oh.mint > oh.head.MaxOOOTime() { - return []string{}, nil - } - - if len(matchers) == 0 { - return oh.head.postings.LabelValues(ctx, name), nil - } - - return labelValuesWithMatchers(ctx, oh, name, matchers...) -} - -type HeadAndOOOChunkReader struct { - head *Head - mint, maxt int64 - cr *headChunkReader // If nil, only read OOO chunks. - maxMmapRef chunks.ChunkDiskMapperRef - oooIsoState *oooIsolationState -} - -func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader { - return &HeadAndOOOChunkReader{ - head: head, - mint: mint, - maxt: maxt, - cr: cr, - maxMmapRef: maxMmapRef, - oooIsoState: oooIsoState, - } -} - -func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { - sid, _, isOOO := unpackHeadChunkRef(meta.Ref) - if !isOOO { - return cr.cr.ChunkOrIterable(meta) - } - - s := cr.head.series.getByID(sid) - // This means that the series has been garbage collected. 
- if s == nil { - return nil, nil, storage.ErrNotFound - } - - s.Lock() - mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef) - s.Unlock() - - return nil, mc, err -} - -// Pass through special behaviour for current head chunk. -func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) { - _, _, isOOO := unpackHeadChunkRef(meta.Ref) - if !isOOO { - return cr.cr.ChunkOrIterableWithCopy(meta) - } - chk, iter, err := cr.ChunkOrIterable(meta) - return chk, iter, 0, err -} - -func (cr *HeadAndOOOChunkReader) Close() error { - if cr.cr != nil && cr.cr.isoState != nil { - cr.cr.isoState.Close() - } - if cr.oooIsoState != nil { - cr.oooIsoState.Close() - } - return nil -} From 9135da1e4f24850008493c3b27f866123c761bdb Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 12 Aug 2024 17:14:41 +0100 Subject: [PATCH 16/17] TSDB: Review feedback Signed-off-by: Bryan Boreham * Re-enable check in `createHeadWithOOOSamples` which wasn't really broken. * Move code making `Block` into a `Queryable` into test file. * Make `getSeriesChunks` return a slice (renamed `appendSeriesChunks`). * Rename `oooMergedChunks` to `mergedChunks`. * Improve comment on `ChunkOrIterableWithCopy`. * Name return values from unpackHeadChunkRef. Co-authored-by: Oleg Zaytsev Signed-off-by: Bryan Boreham --- tsdb/block.go | 5 ----- tsdb/head_read.go | 19 +++++++++---------- tsdb/ooo_head_read.go | 9 +++++---- tsdb/querier_bench_test.go | 9 ++++++++- 4 files changed, 22 insertions(+), 20 deletions(-) diff --git a/tsdb/block.go b/tsdb/block.go index c55e22ce5..2f32733f8 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -467,11 +467,6 @@ func (pb *Block) setCompactionFailed() error { return nil } -// Querier implements Queryable. -func (pb *Block) Querier(mint, maxt int64) (storage.Querier, error) { - return NewBlockQuerier(pb, mint, maxt) -} - type blockIndexReader struct { ir IndexReader b *Block diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 977d6b978..47f12df99 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -199,19 +199,18 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB defer s.Unlock() *chks = (*chks)[:0] - - getSeriesChunks(s, h.mint, h.maxt, chks) + *chks = appendSeriesChunks(s, h.mint, h.maxt, *chks) return nil } -func getSeriesChunks(s *memSeries, mint, maxt int64, chks *[]chunks.Meta) { +func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []chunks.Meta { for i, c := range s.mmappedChunks { // Do not expose chunks that are outside of the specified range. if !c.OverlapsClosedInterval(mint, maxt) { continue } - *chks = append(*chks, chunks.Meta{ + chks = append(chks, chunks.Meta{ MinTime: c.minTime, MaxTime: c.maxTime, Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(i))), @@ -230,7 +229,7 @@ func getSeriesChunks(s *memSeries, mint, maxt int64, chks *[]chunks.Meta) { maxTime = chk.maxTime } if chk.OverlapsClosedInterval(mint, maxt) { - *chks = append(*chks, chunks.Meta{ + chks = append(chks, chunks.Meta{ MinTime: chk.minTime, MaxTime: maxTime, Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)+j))), @@ -239,6 +238,7 @@ func getSeriesChunks(s *memSeries, mint, maxt int64, chks *[]chunks.Meta) { j++ } } + return chks } // headChunkID returns the HeadChunkID referred to by the given position. 
@@ -259,7 +259,7 @@ func (s *memSeries) oooHeadChunkID(pos int) chunks.HeadChunkID { return (chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID) | oooChunkIDMask } -func unpackHeadChunkRef(ref chunks.ChunkRef) (chunks.HeadSeriesRef, chunks.HeadChunkID, bool) { +func unpackHeadChunkRef(ref chunks.ChunkRef) (seriesID chunks.HeadSeriesRef, chunkID chunks.HeadChunkID, isOOO bool) { sid, cid := chunks.HeadChunkRef(ref).Unpack() return sid, (cid & (oooChunkIDMask - 1)), (cid & oooChunkIDMask) != 0 } @@ -481,14 +481,14 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi return elem, true, offset == 0, nil } -// oooMergedChunks return an iterable over one or more OOO chunks for the given +// mergedChunks return an iterable over one or more OOO chunks for the given // chunks.Meta reference from memory or by m-mapping it from the disk. The // returned iterable will be a merge of all the overlapping chunks, if any, // amongst all the chunks in the OOOHead. // If hr is non-nil then in-order chunks are included. // This function is not thread safe unless the caller holds a lock. // The caller must ensure that s.ooo is not nil. -func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, hr *headChunkReader, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (*mergedOOOChunks, error) { +func (s *memSeries) mergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, hr *headChunkReader, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (chunkenc.Iterable, error) { _, cid, _ := unpackHeadChunkRef(meta.Ref) // ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are @@ -531,8 +531,7 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe } if hr != nil { // Include in-order chunks. - var metas []chunks.Meta - getSeriesChunks(s, max(meta.MinTime, mint), min(meta.MaxTime, maxt), &metas) + metas := appendSeriesChunks(s, max(meta.MinTime, mint), min(meta.MaxTime, maxt), nil) for _, m := range metas { tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{ meta: m, diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index aaaa24963..47e2efb86 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -77,7 +77,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S if s.ooo != nil { return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, chks) } - getSeriesChunks(s, oh.mint, oh.maxt, chks) + *chks = appendSeriesChunks(s, oh.mint, oh.maxt, *chks) return nil } @@ -127,7 +127,7 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap } if includeInOrder { - getSeriesChunks(s, mint, maxt, &tmpChks) + tmpChks = appendSeriesChunks(s, mint, maxt, tmpChks) } // There is nothing to do if we did not collect any chunk. @@ -253,13 +253,14 @@ func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chu } s.Lock() - mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef) + mc, err := s.mergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef) s.Unlock() return nil, mc, err } -// Pass through special behaviour for current head chunk. +// ChunkOrIterableWithCopy: implements ChunkReaderWithCopy. The special Copy behaviour +// is only implemented for the in-order head chunk. 
func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) { _, _, isOOO := unpackHeadChunkRef(meta.Ref) if !isOOO { diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index e3e457d07..43accc253 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -321,10 +321,17 @@ func BenchmarkQuerierSelect(b *testing.B) { require.NoError(b, block.Close()) }() - benchmarkSelect(b, block, numSeries, false) + benchmarkSelect(b, (*queryableBlock)(block), numSeries, false) }) } +// Type wrapper to let a Block be a Queryable in benchmarkSelect(). +type queryableBlock Block + +func (pb *queryableBlock) Querier(mint, maxt int64) (storage.Querier, error) { + return NewBlockQuerier((*Block)(pb), mint, maxt) +} + func BenchmarkQuerierSelectWithOutOfOrder(b *testing.B) { numSeries := 1000000 _, db := createHeadForBenchmarkSelect(b, numSeries, func(app storage.Appender, i int) { From 512c67ec26e764e7adb4d2746ecf71d2222701f5 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 12 Aug 2024 18:49:00 +0100 Subject: [PATCH 17/17] TSDB: Never go over maximum number of OOO chunks In `mmapCurrentOOOHeadChunk`, check if the number is at the maximum and drop the data with an error log. This is not expected to happen as the maximum is over 8 million; that's 8 years of 1 sample every second. Signed-off-by: Bryan Boreham --- tsdb/head_append.go | 20 +++++++++++++------- tsdb/head_wal.go | 2 +- tsdb/ooo_head_read.go | 2 +- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 59681b8da..b66ac7278 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -19,6 +19,7 @@ import ( "fmt" "math" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/prometheus/model/exemplar" @@ -936,7 +937,7 @@ func (a *headAppender) Commit() (err error) { // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, oooCapMax) + ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, oooCapMax, a.head.logger) if chunkCreated { r, ok := oooMmapMarkers[series.ref] if !ok || r != nil { @@ -1083,14 +1084,14 @@ func (a *headAppender) Commit() (err error) { } // insert is like append, except it inserts. Used for OOO samples. -func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) { +func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64, logger log.Logger) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) { if s.ooo == nil { s.ooo = &memSeriesOOOFields{} } c := s.ooo.oooHeadChunk if c == nil || c.chunk.NumSamples() == int(oooCapMax) { // Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks. - c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper) + c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper, logger) chunkCreated = true } @@ -1444,9 +1445,9 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange } // cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk. 
-// The caller must ensure that s.ooo is not nil. -func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) { - ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper) +// The caller must ensure that s is locked and s.ooo is not nil. +func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper, logger log.Logger) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) { + ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper, logger) s.ooo.oooHeadChunk = &oooHeadChunk{ chunk: NewOOOChunk(), @@ -1457,7 +1458,8 @@ func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.Chunk return s.ooo.oooHeadChunk, ref } -func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) []chunks.ChunkDiskMapperRef { +// s must be locked when calling. +func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper, logger log.Logger) []chunks.ChunkDiskMapperRef { if s.ooo == nil || s.ooo.oooHeadChunk == nil { // OOO is not enabled or there is no head chunk, so nothing to m-map here. return nil @@ -1469,6 +1471,10 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMap } chunkRefs := make([]chunks.ChunkDiskMapperRef, 0, 1) for _, memchunk := range chks { + if len(s.ooo.oooMmappedChunks) >= (oooChunkIDMask - 1) { + level.Error(logger).Log("msg", "Too many OOO chunks, dropping data", "series", s.lset.String()) + break + } chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, memchunk.chunk, true, handleChunkWriteError) chunkRefs = append(chunkRefs, chunkRef) s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{ diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 85b0c656d..7397bbf41 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -890,7 +890,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) { unknownRefs++ continue } - ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax) + ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger) if chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 47e2efb86..55e241fd9 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -340,7 +340,7 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, } var lastMmapRef chunks.ChunkDiskMapperRef - mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper) + mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper, head.logger) if len(mmapRefs) == 0 && len(ms.ooo.oooMmappedChunks) > 0 { // Nothing was m-mapped. So take the mmapRef from the existing slice if it exists. mmapRefs = []chunks.ChunkDiskMapperRef{ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref}