Browse Source

[BUGFIX] TSDB: Only query chunks up to truncation time (#14948)

If the query overlaps the range currently undergoing compaction, we
should only fetch chunks up to that time. Need to store that min time
in `HeadAndOOOIndexReader`.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
pull/14951/head
Bryan Boreham 2 months ago
parent
commit
6f0d6038b7
  1. 8
      tsdb/db.go
  2. 23
      tsdb/ooo_head_read.go
  3. 8
      tsdb/ooo_head_read_test.go
  4. 2
      tsdb/querier_test.go

8
tsdb/db.go

@ -2043,6 +2043,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime())
var headQuerier storage.Querier var headQuerier storage.Querier
inoMint := mint
if maxt >= db.head.MinTime() || overlapsOOO { if maxt >= db.head.MinTime() || overlapsOOO {
rh := NewRangeHead(db.head, mint, maxt) rh := NewRangeHead(db.head, mint, maxt)
var err error var err error
@ -2067,13 +2068,14 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("open block querier for head while getting new querier %s: %w", rh, err) return nil, fmt.Errorf("open block querier for head while getting new querier %s: %w", rh, err)
} }
inoMint = newMint
} }
} }
if overlapsOOO { if overlapsOOO {
// We need to fetch from in-order and out-of-order chunks: wrap the headQuerier. // We need to fetch from in-order and out-of-order chunks: wrap the headQuerier.
isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef) isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef)
headQuerier = NewHeadAndOOOQuerier(mint, maxt, db.head, isoState, headQuerier) headQuerier = NewHeadAndOOOQuerier(inoMint, mint, maxt, db.head, isoState, headQuerier)
} }
if headQuerier != nil { if headQuerier != nil {
@ -2119,6 +2121,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime())
var headQuerier storage.ChunkQuerier var headQuerier storage.ChunkQuerier
inoMint := mint
if maxt >= db.head.MinTime() || overlapsOOO { if maxt >= db.head.MinTime() || overlapsOOO {
rh := NewRangeHead(db.head, mint, maxt) rh := NewRangeHead(db.head, mint, maxt)
headQuerier, err = db.blockChunkQuerierFunc(rh, mint, maxt) headQuerier, err = db.blockChunkQuerierFunc(rh, mint, maxt)
@ -2142,13 +2145,14 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
if err != nil { if err != nil {
return nil, fmt.Errorf("open querier for head while getting new querier %s: %w", rh, err) return nil, fmt.Errorf("open querier for head while getting new querier %s: %w", rh, err)
} }
inoMint = newMint
} }
} }
if overlapsOOO { if overlapsOOO {
// We need to fetch from in-order and out-of-order chunks: wrap the headQuerier. // We need to fetch from in-order and out-of-order chunks: wrap the headQuerier.
isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef) isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef)
headQuerier = NewHeadAndOOOChunkQuerier(mint, maxt, db.head, isoState, headQuerier) headQuerier = NewHeadAndOOOChunkQuerier(inoMint, mint, maxt, db.head, isoState, headQuerier)
} }
if headQuerier != nil { if headQuerier != nil {

23
tsdb/ooo_head_read.go

@ -35,6 +35,7 @@ var _ IndexReader = &HeadAndOOOIndexReader{}
type HeadAndOOOIndexReader struct { type HeadAndOOOIndexReader struct {
*headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible. *headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible.
inoMint int64
lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef
} }
@ -49,13 +50,13 @@ func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator
return storage.ChainSampleIteratorFromIterables(iterator, o.chunkIterables) return storage.ChainSampleIteratorFromIterables(iterator, o.chunkIterables)
} }
func NewHeadAndOOOIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader { func NewHeadAndOOOIndexReader(head *Head, inoMint, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader {
hr := &headIndexReader{ hr := &headIndexReader{
head: head, head: head,
mint: mint, mint: mint,
maxt: maxt, maxt: maxt,
} }
return &HeadAndOOOIndexReader{hr, lastGarbageCollectedMmapRef} return &HeadAndOOOIndexReader{hr, inoMint, lastGarbageCollectedMmapRef}
} }
func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
@ -76,9 +77,9 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S
*chks = (*chks)[:0] *chks = (*chks)[:0]
if s.ooo != nil { if s.ooo != nil {
return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, chks) return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks)
} }
*chks = appendSeriesChunks(s, oh.mint, oh.maxt, *chks) *chks = appendSeriesChunks(s, oh.inoMint, oh.maxt, *chks)
return nil return nil
} }
@ -87,7 +88,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S
// //
// maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then // maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then
// the oooHeadChunk will not be considered. // the oooHeadChunk will not be considered.
func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, chks *[]chunks.Meta) error { func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error {
tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks)) tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks))
addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) { addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
@ -128,7 +129,7 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
} }
if includeInOrder { if includeInOrder {
tmpChks = appendSeriesChunks(s, mint, maxt, tmpChks) tmpChks = appendSeriesChunks(s, inoMint, maxt, tmpChks)
} }
// There is nothing to do if we did not collect any chunk. // There is nothing to do if we did not collect any chunk.
@ -476,7 +477,7 @@ func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *l
return nil return nil
} }
return getOOOSeriesChunks(s, ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, chks) return getOOOSeriesChunks(s, ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks)
} }
func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
@ -516,7 +517,7 @@ type HeadAndOOOQuerier struct {
querier storage.Querier // Used for LabelNames, LabelValues, but may be nil if head was truncated in the mean time, in which case we ignore it and not close it in the end. querier storage.Querier // Used for LabelNames, LabelValues, but may be nil if head was truncated in the mean time, in which case we ignore it and not close it in the end.
} }
func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier { func NewHeadAndOOOQuerier(inoMint, mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier {
cr := &headChunkReader{ cr := &headChunkReader{
head: head, head: head,
mint: mint, mint: mint,
@ -527,7 +528,7 @@ func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolatio
mint: mint, mint: mint,
maxt: maxt, maxt: maxt,
head: head, head: head,
index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef), index: NewHeadAndOOOIndexReader(head, inoMint, mint, maxt, oooIsoState.minRef),
chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0), chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0),
querier: querier, querier: querier,
} }
@ -568,7 +569,7 @@ type HeadAndOOOChunkQuerier struct {
querier storage.ChunkQuerier querier storage.ChunkQuerier
} }
func NewHeadAndOOOChunkQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.ChunkQuerier) storage.ChunkQuerier { func NewHeadAndOOOChunkQuerier(inoMint, mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.ChunkQuerier) storage.ChunkQuerier {
cr := &headChunkReader{ cr := &headChunkReader{
head: head, head: head,
mint: mint, mint: mint,
@ -579,7 +580,7 @@ func NewHeadAndOOOChunkQuerier(mint, maxt int64, head *Head, oooIsoState *oooIso
mint: mint, mint: mint,
maxt: maxt, maxt: maxt,
head: head, head: head,
index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef), index: NewHeadAndOOOIndexReader(head, inoMint, mint, maxt, oooIsoState.minRef),
chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0), chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0),
querier: querier, querier: querier,
} }

8
tsdb/ooo_head_read_test.go

@ -360,7 +360,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
}) })
} }
ir := NewHeadAndOOOIndexReader(h, tc.queryMinT, tc.queryMaxT, 0) ir := NewHeadAndOOOIndexReader(h, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta var chks []chunks.Meta
var b labels.ScratchBuilder var b labels.ScratchBuilder
@ -450,7 +450,7 @@ func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenari
for _, tc := range cases { for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
// We first want to test using a head index reader that covers the biggest query interval // We first want to test using a head index reader that covers the biggest query interval
oh := NewHeadAndOOOIndexReader(head, tc.queryMinT, tc.queryMaxT, 0) oh := NewHeadAndOOOIndexReader(head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")} matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")}
values, err := oh.LabelValues(ctx, "foo", matchers...) values, err := oh.LabelValues(ctx, "foo", matchers...)
sort.Strings(values) sort.Strings(values)
@ -854,7 +854,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
// The Series method populates the chunk metas, taking a copy of the // The Series method populates the chunk metas, taking a copy of the
// head OOO chunk if necessary. These are then used by the ChunkReader. // head OOO chunk if necessary. These are then used by the ChunkReader.
ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0) ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta var chks []chunks.Meta
var b labels.ScratchBuilder var b labels.ScratchBuilder
err = ir.Series(s1Ref, &b, &chks) err = ir.Series(s1Ref, &b, &chks)
@ -1023,7 +1023,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
// The Series method populates the chunk metas, taking a copy of the // The Series method populates the chunk metas, taking a copy of the
// head OOO chunk if necessary. These are then used by the ChunkReader. // head OOO chunk if necessary. These are then used by the ChunkReader.
ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0) ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta var chks []chunks.Meta
var b labels.ScratchBuilder var b labels.ScratchBuilder
err = ir.Series(s1Ref, &b, &chks) err = ir.Series(s1Ref, &b, &chks)

2
tsdb/querier_test.go

@ -3170,7 +3170,7 @@ func BenchmarkQueries(b *testing.B) {
qHead, err := NewBlockQuerier(NewRangeHead(head, 1, nSamples), 1, nSamples) qHead, err := NewBlockQuerier(NewRangeHead(head, 1, nSamples), 1, nSamples)
require.NoError(b, err) require.NoError(b, err)
isoState := head.oooIso.TrackReadAfter(0) isoState := head.oooIso.TrackReadAfter(0)
qOOOHead := NewHeadAndOOOQuerier(1, nSamples, head, isoState, qHead) qOOOHead := NewHeadAndOOOQuerier(1, 1, nSamples, head, isoState, qHead)
queryTypes = append(queryTypes, qt{ queryTypes = append(queryTypes, qt{
fmt.Sprintf("_Head_oooPercent:%d", oooPercentage), qOOOHead, fmt.Sprintf("_Head_oooPercent:%d", oooPercentage), qOOOHead,

Loading…
Cancel
Save