Browse Source

[BUGFIX] TSDB: Exclude OOO chunks mapped after compaction starts

Otherwise the writer can end up with invalid chunks.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
pull/14584/head
Bryan Boreham 4 months ago
parent
commit
015638c4b6
  1. 5
      tsdb/head_read.go
  2. 2
      tsdb/ooo_head.go
  3. 8
      tsdb/ooo_head_read.go
  4. 6
      tsdb/ooo_head_read_test.go

5
tsdb/head_read.go

@ -467,7 +467,7 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
// amongst all the chunks in the OOOHead.
// This function is not thread safe unless the caller holds a lock.
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64) (*mergedOOOChunks, error) {
func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (*mergedOOOChunks, error) {
_, cid := chunks.HeadChunkRef(meta.Ref).Unpack()
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are
@ -490,6 +490,9 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe
tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks)+1)
for i, c := range s.ooo.oooMmappedChunks {
if maxMmapRef != 0 && c.ref > maxMmapRef {
break
}
if c.OverlapsClosedInterval(mint, maxt) {
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
meta: chunks.Meta{

2
tsdb/ooo_head.go

@ -201,7 +201,7 @@ func (oh *OOORangeHead) Index() (IndexReader, error) {
}
func (oh *OOORangeHead) Chunks() (ChunkReader, error) {
return NewOOOHeadChunkReader(oh.head, oh.mint, oh.maxt, oh.isoState), nil
return NewOOOHeadChunkReader(oh.head, oh.mint, oh.maxt, oh.isoState, 0), nil
}
func (oh *OOORangeHead) Tombstones() (tombstones.Reader, error) {

8
tsdb/ooo_head_read.go

@ -243,14 +243,16 @@ type OOOHeadChunkReader struct {
head *Head
mint, maxt int64
isoState *oooIsolationState
maxMmapRef chunks.ChunkDiskMapperRef
}
func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationState) *OOOHeadChunkReader {
func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *OOOHeadChunkReader {
return &OOOHeadChunkReader{
head: head,
mint: mint,
maxt: maxt,
isoState: isoState,
maxMmapRef: maxMmapRef,
}
}
@ -269,7 +271,7 @@ func (cr OOOHeadChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk,
s.Unlock()
return nil, nil, storage.ErrNotFound
}
mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt)
mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt, cr.maxMmapRef)
s.Unlock()
if err != nil {
return nil, nil, err
@ -386,7 +388,7 @@ func (ch *OOOCompactionHead) Index() (IndexReader, error) {
}
func (ch *OOOCompactionHead) Chunks() (ChunkReader, error) {
return NewOOOHeadChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt, nil), nil
return NewOOOHeadChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt, nil, ch.lastMmapRef), nil
}
func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error) {

6
tsdb/ooo_head_read_test.go

@ -481,7 +481,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) {
db := newTestDBWithOpts(t, opts)
cr := NewOOOHeadChunkReader(db.head, 0, 1000, nil)
cr := NewOOOHeadChunkReader(db.head, 0, 1000, nil, 0)
defer cr.Close()
c, iterable, err := cr.ChunkOrIterable(chunks.Meta{
Ref: 0x1000000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
@ -839,7 +839,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
require.NoError(t, err)
require.Equal(t, len(tc.expChunksSamples), len(chks))
cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil)
cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0)
defer cr.Close()
for i := 0; i < len(chks); i++ {
c, iterable, err := cr.ChunkOrIterable(chks[i])
@ -1013,7 +1013,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
}
require.NoError(t, app.Commit())
cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil)
cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0)
defer cr.Close()
for i := 0; i < len(chks); i++ {
c, iterable, err := cr.ChunkOrIterable(chks[i])

Loading…
Cancel
Save