diff --git a/tsdb/db.go b/tsdb/db.go index 1c430c211..bf1893ec0 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -2029,7 +2029,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { } } - blockQueriers := make([]storage.Querier, 0, len(blocks)+2) // +2 to allow for possible in-order and OOO head queriers + blockQueriers := make([]storage.Querier, 0, len(blocks)+1) // +1 to allow for possible head querier. defer func() { if err != nil { @@ -2041,10 +2041,11 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { } }() + var headQuerier storage.Querier if maxt >= db.head.MinTime() { rh := NewRangeHead(db.head, mint, maxt) var err error - inOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt) + headQuerier, err = db.blockQuerierFunc(rh, mint, maxt) if err != nil { return nil, fmt.Errorf("open block querier for head %s: %w", rh, err) } @@ -2054,36 +2055,40 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { // won't run into a race later since any truncation that comes after will wait on this querier if it overlaps. shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt) if shouldClose { - if err := inOrderHeadQuerier.Close(); err != nil { + if err := headQuerier.Close(); err != nil { return nil, fmt.Errorf("closing head block querier %s: %w", rh, err) } - inOrderHeadQuerier = nil + headQuerier = nil } if getNew { rh := NewRangeHead(db.head, newMint, maxt) - inOrderHeadQuerier, err = db.blockQuerierFunc(rh, newMint, maxt) + headQuerier, err = db.blockQuerierFunc(rh, newMint, maxt) if err != nil { return nil, fmt.Errorf("open block querier for head while getting new querier %s: %w", rh, err) } } - - if inOrderHeadQuerier != nil { - blockQueriers = append(blockQueriers, inOrderHeadQuerier) - } } - if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { + if headQuerier != nil { + if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { + // We need to fetch from in-order and out-of-order chunks: wrap the headQuerier. + isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef) + headQuerier = NewHeadAndOOOQuerier(mint, maxt, db.head, isoState, headQuerier) + } + } else if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef) var err error - outOfOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt) + headQuerier, err = db.blockQuerierFunc(rh, mint, maxt) if err != nil { // If BlockQuerierFunc() failed, make sure to clean up the pending read created by NewOOORangeHead. rh.isoState.Close() return nil, fmt.Errorf("open block querier for ooo head %s: %w", rh, err) } + } - blockQueriers = append(blockQueriers, outOfOrderHeadQuerier) + if headQuerier != nil { + blockQueriers = append(blockQueriers, headQuerier) } for _, b := range blocks { @@ -2111,7 +2116,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer } } - blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks)+2) // +2 to allow for possible in-order and OOO head queriers + blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks)+1) // +1 to allow for possible head querier. defer func() { if err != nil { @@ -2123,9 +2128,10 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer } }() + var headQuerier storage.ChunkQuerier if maxt >= db.head.MinTime() { rh := NewRangeHead(db.head, mint, maxt) - inOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt) + headQuerier, err = db.blockChunkQuerierFunc(rh, mint, maxt) if err != nil { return nil, fmt.Errorf("open querier for head %s: %w", rh, err) } @@ -2135,35 +2141,39 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer // won't run into a race later since any truncation that comes after will wait on this querier if it overlaps. shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt) if shouldClose { - if err := inOrderHeadQuerier.Close(); err != nil { + if err := headQuerier.Close(); err != nil { return nil, fmt.Errorf("closing head querier %s: %w", rh, err) } - inOrderHeadQuerier = nil + headQuerier = nil } if getNew { rh := NewRangeHead(db.head, newMint, maxt) - inOrderHeadQuerier, err = db.blockChunkQuerierFunc(rh, newMint, maxt) + headQuerier, err = db.blockChunkQuerierFunc(rh, newMint, maxt) if err != nil { return nil, fmt.Errorf("open querier for head while getting new querier %s: %w", rh, err) } } - - if inOrderHeadQuerier != nil { - blockQueriers = append(blockQueriers, inOrderHeadQuerier) - } } - if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { + if headQuerier != nil { + if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { + // We need to fetch from in-order and out-of-order chunks: wrap the headQuerier. + isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef) + headQuerier = NewHeadAndOOOChunkQuerier(mint, maxt, db.head, isoState, headQuerier) + } + } else if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef) - outOfOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt) + headQuerier, err = db.blockChunkQuerierFunc(rh, mint, maxt) if err != nil { // If NewBlockQuerier() failed, make sure to clean up the pending read created by NewOOORangeHead. rh.isoState.Close() return nil, fmt.Errorf("open block chunk querier for ooo head %s: %w", rh, err) } + } - blockQueriers = append(blockQueriers, outOfOrderHeadQuerier) + if headQuerier != nil { + blockQueriers = append(blockQueriers, headQuerier) } for _, b := range blocks { diff --git a/tsdb/head_read.go b/tsdb/head_read.go index d75d28a58..977d6b978 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -248,12 +248,20 @@ func (s *memSeries) headChunkID(pos int) chunks.HeadChunkID { return chunks.HeadChunkID(pos) + s.firstChunkID } +const oooChunkIDMask = 1 << 23 + // oooHeadChunkID returns the HeadChunkID referred to by the given position. +// Only the bottom 24 bits are used. Bit 23 is always 1 for an OOO chunk; for the rest: // * 0 <= pos < len(s.oooMmappedChunks) refer to s.oooMmappedChunks[pos] // * pos == len(s.oooMmappedChunks) refers to s.oooHeadChunk // The caller must ensure that s.ooo is not nil. func (s *memSeries) oooHeadChunkID(pos int) chunks.HeadChunkID { - return chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID + return (chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID) | oooChunkIDMask +} + +func unpackHeadChunkRef(ref chunks.ChunkRef) (chunks.HeadSeriesRef, chunks.HeadChunkID, bool) { + sid, cid := chunks.HeadChunkRef(ref).Unpack() + return sid, (cid & (oooChunkIDMask - 1)), (cid & oooChunkIDMask) != 0 } // LabelValueFor returns label value for the given label name in the series referred to by ID. @@ -343,10 +351,15 @@ func (h *headChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chu return chk, nil, err } -// ChunkWithCopy returns the chunk for the reference number. -// If the chunk is the in-memory chunk, then it makes a copy and returns the copied chunk. -func (h *headChunkReader) ChunkWithCopy(meta chunks.Meta) (chunkenc.Chunk, int64, error) { - return h.chunk(meta, true) +type ChunkReaderWithCopy interface { + ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) +} + +// ChunkOrIterableWithCopy returns the chunk for the reference number. +// If the chunk is the in-memory chunk, then it makes a copy and returns the copied chunk, plus the max time of the chunk. +func (h *headChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) { + chk, maxTime, err := h.chunk(meta, true) + return chk, nil, maxTime, err } // chunk returns the chunk for the reference number. @@ -472,10 +485,11 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi // chunks.Meta reference from memory or by m-mapping it from the disk. The // returned iterable will be a merge of all the overlapping chunks, if any, // amongst all the chunks in the OOOHead. +// If hr is non-nil then in-order chunks are included. // This function is not thread safe unless the caller holds a lock. // The caller must ensure that s.ooo is not nil. -func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (*mergedOOOChunks, error) { - _, cid := chunks.HeadChunkRef(meta.Ref).Unpack() +func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, hr *headChunkReader, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (*mergedOOOChunks, error) { + _, cid, _ := unpackHeadChunkRef(meta.Ref) // ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are // incremented by 1 when new chunk is created, hence (meta - firstChunkID) gives the slice index. @@ -516,6 +530,17 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{meta: meta}) } + if hr != nil { // Include in-order chunks. + var metas []chunks.Meta + getSeriesChunks(s, max(meta.MinTime, mint), min(meta.MaxTime, maxt), &metas) + for _, m := range metas { + tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{ + meta: m, + ref: 0, // This tells the loop below it's an in-order head chunk. + }) + } + } + // Next we want to sort all the collected chunks by min time so we can find // those that overlap and stop when we know the rest don't. slices.SortFunc(tmpChks, refLessByMinTimeAndMinRef) @@ -527,9 +552,17 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe continue } var iterable chunkenc.Iterable - if c.meta.Chunk != nil { + switch { + case c.meta.Chunk != nil: iterable = c.meta.Chunk - } else { + case c.ref == 0: // This is an in-order head chunk. + _, cid := chunks.HeadChunkRef(c.meta.Ref).Unpack() + var err error + iterable, _, err = hr.chunkFromSeries(s, cid, false) + if err != nil { + return nil, fmt.Errorf("invalid head chunk: %w", err) + } + default: chk, err := cdm.Chunk(c.ref) if err != nil { var cerr *chunks.CorruptionErr diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 892d2c4b6..b7944c56e 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/prometheus/prometheus/util/annotations" ) var _ IndexReader = &OOOHeadIndexReader{} @@ -92,10 +93,10 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra return nil } - return getOOOSeriesChunks(s, oh.mint, oh.maxt, lastGarbageCollectedMmapRef, maxMmapRef, chks) + return getOOOSeriesChunks(s, oh.mint, oh.maxt, lastGarbageCollectedMmapRef, maxMmapRef, false, chks) } -func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, chks *[]chunks.Meta) error { +func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, chks *[]chunks.Meta) error { tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks)) addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) { @@ -135,6 +136,10 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap } } + if includeInOrder { + getSeriesChunks(s, mint, maxt, &tmpChks) + } + // There is nothing to do if we did not collect any chunk. if len(tmpChks) == 0 { return nil @@ -275,7 +280,7 @@ func (cr OOOHeadChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, s.Unlock() return nil, nil, storage.ErrNotFound } - mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt, cr.maxMmapRef) + mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, nil, cr.mint, cr.maxt, cr.maxMmapRef) s.Unlock() if err != nil { return nil, nil, err @@ -498,3 +503,174 @@ func (ir *OOOCompactionHeadIndexReader) LabelNamesFor(ctx context.Context, posti func (ir *OOOCompactionHeadIndexReader) Close() error { return ir.ch.oooIR.Close() } + +// HeadAndOOOQuerier queries both the head and the out-of-order head. +type HeadAndOOOQuerier struct { + mint, maxt int64 + head *Head + index IndexReader + chunkr ChunkReader + querier storage.Querier +} + +func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier { + isoState := head.iso.State(mint, maxt) + return &HeadAndOOOQuerier{ + mint: mint, + maxt: maxt, + head: head, + index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef), + chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, isoState, oooIsoState, 0), + querier: querier, + } +} + +func (q *HeadAndOOOQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return q.querier.LabelValues(ctx, name, hints, matchers...) +} + +func (q *HeadAndOOOQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return q.querier.LabelNames(ctx, hints, matchers...) +} + +func (q *HeadAndOOOQuerier) Close() error { + q.chunkr.Close() + return q.querier.Close() +} + +func (q *HeadAndOOOQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + return selectSeriesSet(ctx, sortSeries, hints, matchers, q.index, q.chunkr, q.head.tombstones, q.mint, q.maxt) +} + +// HeadAndOOOChunkQuerier queries both the head and the out-of-order head. +type HeadAndOOOChunkQuerier struct { + mint, maxt int64 + head *Head + index IndexReader + chunkr ChunkReader + querier storage.ChunkQuerier +} + +func NewHeadAndOOOChunkQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.ChunkQuerier) storage.ChunkQuerier { + isoState := head.iso.State(mint, maxt) + return &HeadAndOOOChunkQuerier{ + mint: mint, + maxt: maxt, + head: head, + index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef), + chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, isoState, oooIsoState, 0), + querier: querier, + } +} + +func (q *HeadAndOOOChunkQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return q.querier.LabelValues(ctx, name, hints, matchers...) +} + +func (q *HeadAndOOOChunkQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return q.querier.LabelNames(ctx, hints, matchers...) +} + +func (q *HeadAndOOOChunkQuerier) Close() error { + q.chunkr.Close() + return q.querier.Close() +} + +func (q *HeadAndOOOChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet { + return selectChunkSeriesSet(ctx, sortSeries, hints, matchers, rangeHeadULID, q.index, q.chunkr, q.head.tombstones, q.mint, q.maxt) +} + +type HeadAndOOOIndexReader struct { + *headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible. + lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef +} + +func NewHeadAndOOOIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader { + hr := &headIndexReader{ + head: head, + mint: mint, + maxt: maxt, + } + return &HeadAndOOOIndexReader{hr, lastGarbageCollectedMmapRef} +} + +func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { + s := oh.head.series.getByID(chunks.HeadSeriesRef(ref)) + if s == nil { + oh.head.metrics.seriesNotFound.Inc() + return storage.ErrNotFound + } + builder.Assign(s.lset) + + if chks == nil { + return nil + } + + s.Lock() + defer s.Unlock() + *chks = (*chks)[:0] + + if s.ooo != nil { + return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, chks) + } + getSeriesChunks(s, oh.mint, oh.maxt, chks) + return nil +} + +type HeadAndOOOChunkReader struct { + cr headChunkReader + maxMmapRef chunks.ChunkDiskMapperRef + oooIsoState *oooIsolationState +} + +func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, isoState *isolationState, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader { + return &HeadAndOOOChunkReader{ + cr: headChunkReader{ + head: head, + mint: mint, + maxt: maxt, + isoState: isoState, + }, + maxMmapRef: maxMmapRef, + oooIsoState: oooIsoState, + } +} + +func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { + sid, _, isOOO := unpackHeadChunkRef(meta.Ref) + if !isOOO { + return cr.cr.ChunkOrIterable(meta) + } + + s := cr.cr.head.series.getByID(sid) + // This means that the series has been garbage collected. + if s == nil { + return nil, nil, storage.ErrNotFound + } + + s.Lock() + mc, err := s.oooMergedChunks(meta, cr.cr.head.chunkDiskMapper, &cr.cr, cr.cr.mint, cr.cr.maxt, cr.maxMmapRef) + s.Unlock() + + return nil, mc, err +} + +// Pass through special behaviour for current head chunk. +func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) { + _, _, isOOO := unpackHeadChunkRef(meta.Ref) + if !isOOO { + return cr.cr.ChunkOrIterableWithCopy(meta) + } + chk, iter, err := cr.ChunkOrIterable(meta) + return chk, iter, 0, err +} + +func (cr *HeadAndOOOChunkReader) Close() error { + if cr.cr.isoState != nil { + cr.cr.isoState.Close() + } + if cr.oooIsoState != nil { + cr.oooIsoState.Close() + } + return nil +} diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index 8cc3f1dde..08c5c4a3e 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -316,7 +316,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // Ref to whatever Ref the chunk has, that we refer to by ID for ref, c := range intervals { if c.ID == e.ID { - meta.Ref = chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), chunks.HeadChunkID(ref))) + meta.Ref = chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), s1.oooHeadChunkID(ref))) break } } diff --git a/tsdb/querier.go b/tsdb/querier.go index 37456d7e2..2e15f0b08 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -641,14 +641,16 @@ func (p *populateWithDelGenericSeriesIterator) next(copyHeadChunk bool) bool { } } - hcr, ok := p.cr.(*headChunkReader) + hcr, ok := p.cr.(ChunkReaderWithCopy) var iterable chunkenc.Iterable if ok && copyHeadChunk && len(p.bufIter.Intervals) == 0 { - // ChunkWithCopy will copy the head chunk. + // ChunkOrIterableWithCopy will copy the head chunk, if it can. var maxt int64 - p.currMeta.Chunk, maxt, p.err = hcr.ChunkWithCopy(p.currMeta) - // For the in-memory head chunk the index reader sets maxt as MaxInt64. We fix it here. - p.currMeta.MaxTime = maxt + p.currMeta.Chunk, iterable, maxt, p.err = hcr.ChunkOrIterableWithCopy(p.currMeta) + if p.currMeta.Chunk != nil { + // For the in-memory head chunk the index reader sets maxt as MaxInt64. We fix it here. + p.currMeta.MaxTime = maxt + } } else { p.currMeta.Chunk, iterable, p.err = p.cr.ChunkOrIterable(p.currMeta) }