Merge pull request #14584 from bboreham/ooo-head-lastmm

[BUGFIX] TSDB: Exclude OOO chunks mapped after compaction starts
pull/14593/head
Bryan Boreham 2024-08-05 16:04:27 +01:00 committed by GitHub
commit 13222fa21b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 55 additions and 26 deletions

View File

@ -1295,6 +1295,9 @@ func (db *DB) CompactOOOHead(ctx context.Context) error {
return db.compactOOOHead(ctx)
}
// Callback for testing.
var compactOOOHeadTestingCallback func()
func (db *DB) compactOOOHead(ctx context.Context) error {
if !db.oooWasEnabled.Load() {
return nil
@ -1304,6 +1307,11 @@ func (db *DB) compactOOOHead(ctx context.Context) error {
return fmt.Errorf("get ooo compaction head: %w", err)
}
if compactOOOHeadTestingCallback != nil {
compactOOOHeadTestingCallback()
compactOOOHeadTestingCallback = nil
}
ulids, err := db.compactOOO(db.dir, oooHead)
if err != nil {
return fmt.Errorf("compact ooo head: %w", err)

View File

@ -4497,12 +4497,15 @@ func TestMetadataAssertInMemoryData(t *testing.T) {
func TestOOOCompaction(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOOOCompaction(t, scenario)
testOOOCompaction(t, scenario, false)
})
t.Run(name+"+extra", func(t *testing.T) {
testOOOCompaction(t, scenario, true)
})
}
}
func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
func testOOOCompaction(t *testing.T, scenario sampleTypeScenario, addExtraSamples bool) {
dir := t.TempDir()
ctx := context.Background()
@ -4533,7 +4536,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
}
// Add an in-order samples.
addSample(250, 350)
addSample(250, 300)
// Verify that the in-memory ooo chunk is empty.
checkEmptyOOOChunk := func(lbls labels.Labels) {
@ -4547,15 +4550,17 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
// Add ooo samples that creates multiple chunks.
// 90 to 300 spans across 3 block ranges: [0, 120), [120, 240), [240, 360)
addSample(90, 310)
addSample(90, 300)
// Adding same samples to create overlapping chunks.
// Since the active chunk won't start at 90 again, all the new
// chunks will have different time ranges than the previous chunks.
addSample(90, 310)
addSample(90, 300)
var highest int64 = 300
verifyDBSamples := func() {
var series1Samples, series2Samples []chunks.Sample
for _, r := range [][2]int64{{90, 119}, {120, 239}, {240, 350}} {
for _, r := range [][2]int64{{90, 119}, {120, 239}, {240, highest}} {
fromMins, toMins := r[0], r[1]
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
@ -4583,7 +4588,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
require.NoError(t, err)
require.False(t, created)
require.Positive(t, ms.ooo.oooHeadChunk.chunk.NumSamples())
require.Len(t, ms.ooo.oooMmappedChunks, 14) // 7 original, 7 duplicate.
require.Len(t, ms.ooo.oooMmappedChunks, 13) // 7 original, 6 duplicate.
}
checkNonEmptyOOOChunk(series1)
checkNonEmptyOOOChunk(series2)
@ -4601,6 +4606,15 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
require.NoError(t, err)
require.Greater(t, f.Size(), int64(100))
if addExtraSamples {
compactOOOHeadTestingCallback = func() {
addSample(90, 120) // Back in time, to generate a new OOO chunk.
addSample(300, 330) // Now some samples after the previous highest timestamp.
addSample(300, 330) // Repeat to generate an OOO chunk at these timestamps.
}
highest = 330
}
// OOO compaction happens here.
require.NoError(t, db.CompactOOOHead(ctx))
@ -4616,11 +4630,13 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
require.Equal(t, "00000001", files[0].Name())
f, err = files[0].Info()
require.NoError(t, err)
require.Equal(t, int64(0), f.Size())
// OOO stuff should not be present in the Head now.
checkEmptyOOOChunk(series1)
checkEmptyOOOChunk(series2)
if !addExtraSamples {
require.Equal(t, int64(0), f.Size())
// OOO stuff should not be present in the Head now.
checkEmptyOOOChunk(series1)
checkEmptyOOOChunk(series2)
}
verifySamples := func(block *Block, fromMins, toMins int64) {
series1Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
@ -4645,7 +4661,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
// Checking for expected data in the blocks.
verifySamples(db.Blocks()[0], 90, 119)
verifySamples(db.Blocks()[1], 120, 239)
verifySamples(db.Blocks()[2], 240, 310)
verifySamples(db.Blocks()[2], 240, 299)
// There should be a single m-map file.
mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot)
@ -4658,7 +4674,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
err = db.CompactHead(NewRangeHead(db.head, 250*time.Minute.Milliseconds(), 350*time.Minute.Milliseconds()))
require.NoError(t, err)
require.Len(t, db.Blocks(), 4) // [0, 120), [120, 240), [240, 360), [250, 351)
verifySamples(db.Blocks()[3], 250, 350)
verifySamples(db.Blocks()[3], 250, highest)
verifyDBSamples() // Blocks created out of normal and OOO head now. But not merged.
@ -4675,7 +4691,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
require.Len(t, db.Blocks(), 3) // [0, 120), [120, 240), [240, 360)
verifySamples(db.Blocks()[0], 90, 119)
verifySamples(db.Blocks()[1], 120, 239)
verifySamples(db.Blocks()[2], 240, 350) // Merged block.
verifySamples(db.Blocks()[2], 240, highest) // Merged block.
verifyDBSamples() // Final state. Blocks from normal and OOO head are merged.
}

View File

@ -467,7 +467,7 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
// amongst all the chunks in the OOOHead.
// This function is not thread safe unless the caller holds a lock.
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64) (*mergedOOOChunks, error) {
func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (*mergedOOOChunks, error) {
_, cid := chunks.HeadChunkRef(meta.Ref).Unpack()
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are
@ -490,6 +490,9 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe
tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks)+1)
for i, c := range s.ooo.oooMmappedChunks {
if maxMmapRef != 0 && c.ref > maxMmapRef {
break
}
if c.OverlapsClosedInterval(mint, maxt) {
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
meta: chunks.Meta{

View File

@ -201,7 +201,7 @@ func (oh *OOORangeHead) Index() (IndexReader, error) {
}
func (oh *OOORangeHead) Chunks() (ChunkReader, error) {
return NewOOOHeadChunkReader(oh.head, oh.mint, oh.maxt, oh.isoState), nil
return NewOOOHeadChunkReader(oh.head, oh.mint, oh.maxt, oh.isoState, 0), nil
}
func (oh *OOORangeHead) Tombstones() (tombstones.Reader, error) {

View File

@ -243,14 +243,16 @@ type OOOHeadChunkReader struct {
head *Head
mint, maxt int64
isoState *oooIsolationState
maxMmapRef chunks.ChunkDiskMapperRef
}
func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationState) *OOOHeadChunkReader {
func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *OOOHeadChunkReader {
return &OOOHeadChunkReader{
head: head,
mint: mint,
maxt: maxt,
isoState: isoState,
head: head,
mint: mint,
maxt: maxt,
isoState: isoState,
maxMmapRef: maxMmapRef,
}
}
@ -269,7 +271,7 @@ func (cr OOOHeadChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk,
s.Unlock()
return nil, nil, storage.ErrNotFound
}
mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt)
mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt, cr.maxMmapRef)
s.Unlock()
if err != nil {
return nil, nil, err
@ -386,7 +388,7 @@ func (ch *OOOCompactionHead) Index() (IndexReader, error) {
}
func (ch *OOOCompactionHead) Chunks() (ChunkReader, error) {
return NewOOOHeadChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt, nil), nil
return NewOOOHeadChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt, nil, ch.lastMmapRef), nil
}
func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error) {

View File

@ -481,7 +481,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) {
db := newTestDBWithOpts(t, opts)
cr := NewOOOHeadChunkReader(db.head, 0, 1000, nil)
cr := NewOOOHeadChunkReader(db.head, 0, 1000, nil, 0)
defer cr.Close()
c, iterable, err := cr.ChunkOrIterable(chunks.Meta{
Ref: 0x1000000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
@ -839,7 +839,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
require.NoError(t, err)
require.Equal(t, len(tc.expChunksSamples), len(chks))
cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil)
cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0)
defer cr.Close()
for i := 0; i < len(chks); i++ {
c, iterable, err := cr.ChunkOrIterable(chks[i])
@ -1013,7 +1013,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
}
require.NoError(t, app.Commit())
cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil)
cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0)
defer cr.Close()
for i := 0; i < len(chks); i++ {
c, iterable, err := cr.ChunkOrIterable(chks[i])