diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 9ba8785ad..87564ae3c 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -467,7 +467,7 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi // amongst all the chunks in the OOOHead. // This function is not thread safe unless the caller holds a lock. // The caller must ensure that s.ooo is not nil. -func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64) (*mergedOOOChunks, error) { +func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (*mergedOOOChunks, error) { _, cid := chunks.HeadChunkRef(meta.Ref).Unpack() // ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are @@ -490,6 +490,9 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks)+1) for i, c := range s.ooo.oooMmappedChunks { + if maxMmapRef != 0 && c.ref > maxMmapRef { + break + } if c.OverlapsClosedInterval(mint, maxt) { tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{ meta: chunks.Meta{ diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go index b2556d62e..01b5bff63 100644 --- a/tsdb/ooo_head.go +++ b/tsdb/ooo_head.go @@ -201,7 +201,7 @@ func (oh *OOORangeHead) Index() (IndexReader, error) { } func (oh *OOORangeHead) Chunks() (ChunkReader, error) { - return NewOOOHeadChunkReader(oh.head, oh.mint, oh.maxt, oh.isoState), nil + return NewOOOHeadChunkReader(oh.head, oh.mint, oh.maxt, oh.isoState, 0), nil } func (oh *OOORangeHead) Tombstones() (tombstones.Reader, error) { diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index a35276af5..9d5b9d644 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -243,14 +243,16 @@ type OOOHeadChunkReader struct { head *Head mint, maxt int64 isoState *oooIsolationState + maxMmapRef chunks.ChunkDiskMapperRef } -func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationState) *OOOHeadChunkReader { +func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *OOOHeadChunkReader { return &OOOHeadChunkReader{ - head: head, - mint: mint, - maxt: maxt, - isoState: isoState, + head: head, + mint: mint, + maxt: maxt, + isoState: isoState, + maxMmapRef: maxMmapRef, } } @@ -269,7 +271,7 @@ func (cr OOOHeadChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, s.Unlock() return nil, nil, storage.ErrNotFound } - mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt) + mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt, cr.maxMmapRef) s.Unlock() if err != nil { return nil, nil, err @@ -386,7 +388,7 @@ func (ch *OOOCompactionHead) Index() (IndexReader, error) { } func (ch *OOOCompactionHead) Chunks() (ChunkReader, error) { - return NewOOOHeadChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt, nil), nil + return NewOOOHeadChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt, nil, ch.lastMmapRef), nil } func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error) { diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index 7ecd355b5..8cc3f1dde 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -481,7 +481,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) { db := newTestDBWithOpts(t, opts) - cr := NewOOOHeadChunkReader(db.head, 0, 1000, nil) + cr := NewOOOHeadChunkReader(db.head, 0, 1000, nil, 0) defer cr.Close() c, iterable, err := cr.ChunkOrIterable(chunks.Meta{ Ref: 0x1000000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, @@ -839,7 +839,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { require.NoError(t, err) require.Equal(t, len(tc.expChunksSamples), len(chks)) - cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil) + cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0) defer cr.Close() for i := 0; i < len(chks); i++ { c, iterable, err := cr.ChunkOrIterable(chks[i]) @@ -1013,7 +1013,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( } require.NoError(t, app.Commit()) - cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil) + cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0) defer cr.Close() for i := 0; i < len(chks); i++ { c, iterable, err := cr.ChunkOrIterable(chks[i])