Browse Source

TSDB: Remove OOOHeadChunkReader

Use HeadAndOOOChunkReader instead.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
pull/14354/head
Bryan Boreham 4 months ago
parent
commit
a299c7b6d6
  1. 55
      tsdb/ooo_head_read.go
  2. 8
      tsdb/ooo_head_read_test.go

55
tsdb/ooo_head_read.go

@ -248,59 +248,6 @@ func (oh *OOOHeadIndexReader) Postings(ctx context.Context, name string, values
} }
} }
type OOOHeadChunkReader struct {
head *Head
mint, maxt int64
isoState *oooIsolationState
maxMmapRef chunks.ChunkDiskMapperRef
}
func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *OOOHeadChunkReader {
return &OOOHeadChunkReader{
head: head,
mint: mint,
maxt: maxt,
isoState: isoState,
maxMmapRef: maxMmapRef,
}
}
func (cr OOOHeadChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
sid, _ := chunks.HeadChunkRef(meta.Ref).Unpack()
s := cr.head.series.getByID(sid)
// This means that the series has been garbage collected.
if s == nil {
return nil, nil, storage.ErrNotFound
}
s.Lock()
if s.ooo == nil {
// There is no OOO data for this series.
s.Unlock()
return nil, nil, storage.ErrNotFound
}
mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, nil, cr.mint, cr.maxt, cr.maxMmapRef)
s.Unlock()
if err != nil {
return nil, nil, err
}
// This means that the query range did not overlap with the requested chunk.
if len(mc.chunkIterables) == 0 {
return nil, nil, storage.ErrNotFound
}
return nil, mc, nil
}
func (cr OOOHeadChunkReader) Close() error {
if cr.isoState != nil {
cr.isoState.Close()
}
return nil
}
type OOOCompactionHead struct { type OOOCompactionHead struct {
oooIR *OOOHeadIndexReader oooIR *OOOHeadIndexReader
lastMmapRef chunks.ChunkDiskMapperRef lastMmapRef chunks.ChunkDiskMapperRef
@ -397,7 +344,7 @@ func (ch *OOOCompactionHead) Index() (IndexReader, error) {
} }
func (ch *OOOCompactionHead) Chunks() (ChunkReader, error) { func (ch *OOOCompactionHead) Chunks() (ChunkReader, error) {
return NewOOOHeadChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt, nil, ch.lastMmapRef), nil return NewHeadAndOOOChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt, nil, nil, ch.lastMmapRef), nil
} }
func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error) { func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error) {

8
tsdb/ooo_head_read_test.go

@ -481,10 +481,10 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) { t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) {
db := newTestDBWithOpts(t, opts) db := newTestDBWithOpts(t, opts)
cr := NewOOOHeadChunkReader(db.head, 0, 1000, nil, 0) cr := NewHeadAndOOOChunkReader(db.head, 0, 1000, nil, nil, 0)
defer cr.Close() defer cr.Close()
c, iterable, err := cr.ChunkOrIterable(chunks.Meta{ c, iterable, err := cr.ChunkOrIterable(chunks.Meta{
Ref: 0x1000000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
}) })
require.Nil(t, iterable) require.Nil(t, iterable)
require.Equal(t, err, fmt.Errorf("not found")) require.Equal(t, err, fmt.Errorf("not found"))
@ -839,7 +839,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, len(tc.expChunksSamples), len(chks)) require.Equal(t, len(tc.expChunksSamples), len(chks))
cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0) cr := NewHeadAndOOOChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, nil, 0)
defer cr.Close() defer cr.Close()
for i := 0; i < len(chks); i++ { for i := 0; i < len(chks); i++ {
c, iterable, err := cr.ChunkOrIterable(chks[i]) c, iterable, err := cr.ChunkOrIterable(chks[i])
@ -1013,7 +1013,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
} }
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0) cr := NewHeadAndOOOChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, nil, 0)
defer cr.Close() defer cr.Close()
for i := 0; i < len(chks); i++ { for i := 0; i < len(chks); i++ {
c, iterable, err := cr.ChunkOrIterable(chks[i]) c, iterable, err := cr.ChunkOrIterable(chks[i])

Loading…
Cancel
Save