From f26907785547784901126195536e13ad3021cfdd Mon Sep 17 00:00:00 2001 From: Jesus Vazquez Date: Fri, 10 Feb 2023 12:52:12 +0100 Subject: [PATCH 1/2] Protect NewOOOCompactionHead from an unitialized wbl Signed-off-by: Jesus Vazquez --- tsdb/ooo_head_read.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 9feb6bc6f..0552c6053 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -276,16 +276,18 @@ type OOOCompactionHead struct { // All the above together have a bit of CPU and memory overhead, and can have a bit of impact // on the sample append latency. So call NewOOOCompactionHead only right before compaction. func NewOOOCompactionHead(head *Head) (*OOOCompactionHead, error) { - newWBLFile, err := head.wbl.NextSegmentSync() - if err != nil { - return nil, err - } - ch := &OOOCompactionHead{ chunkRange: head.chunkRange.Load(), mint: math.MaxInt64, maxt: math.MinInt64, - lastWBLFile: newWBLFile, + lastWBLFile: 0, + } + if head.wbl != nil { + lastWBLFile, err := head.wbl.NextSegmentSync() + if err != nil { + return nil, err + } + ch.lastWBLFile = lastWBLFile } ch.oooIR = NewOOOHeadIndexReader(head, math.MinInt64, math.MaxInt64) From 5c3f058755f4947da4bd07564f359bb18437062a Mon Sep 17 00:00:00 2001 From: Jesus Vazquez Date: Fri, 10 Feb 2023 15:18:15 +0100 Subject: [PATCH 2/2] Add unit test and also protect truncateOOO Signed-off-by: Jesus Vazquez --- tsdb/db_test.go | 100 ++++++++++++++++++++++++++++++++++++++++++ tsdb/head.go | 4 ++ tsdb/ooo_head_read.go | 8 ++-- 3 files changed, 108 insertions(+), 4 deletions(-) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 9e5623bea..cc65069e4 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -4333,6 +4333,106 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) { verifySamples(db.Blocks()[1], 250, 350) } +// TestOOOCompactionWithDisabledWriteLog tests the scenario where the TSDB is +// configured to not have wal and wbl but its able to compact both the in-order +// and out-of-order head +func TestOOOCompactionWithDisabledWriteLog(t *testing.T) { + dir := t.TempDir() + + opts := DefaultOptions() + opts.OutOfOrderCapMax = 30 + opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds() + opts.WALSegmentSize = -1 // disabled WAL and WBL + + db, err := Open(dir, nil, nil, opts, nil) + require.NoError(t, err) + db.DisableCompactions() // We want to manually call it. + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + series1 := labels.FromStrings("foo", "bar1") + series2 := labels.FromStrings("foo", "bar2") + + addSamples := func(fromMins, toMins int64) { + app := db.Appender(context.Background()) + for min := fromMins; min <= toMins; min++ { + ts := min * time.Minute.Milliseconds() + _, err := app.Append(0, series1, ts, float64(ts)) + require.NoError(t, err) + _, err = app.Append(0, series2, ts, float64(2*ts)) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + // Add an in-order samples. + addSamples(250, 350) + + // Add ooo samples that will result into a single block. + addSamples(90, 110) + + // Checking that ooo chunk is not empty. + for _, lbls := range []labels.Labels{series1, series2} { + ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) + require.NoError(t, err) + require.False(t, created) + require.Greater(t, ms.ooo.oooHeadChunk.chunk.NumSamples(), 0) + } + + // If the normal Head is not compacted, the OOO head compaction does not take place. + require.NoError(t, db.Compact()) + require.Equal(t, len(db.Blocks()), 0) + + // Add more in-order samples in future that would trigger the compaction. + addSamples(400, 450) + + // No blocks before compaction. + require.Equal(t, len(db.Blocks()), 0) + + // Compacts normal and OOO head. + require.NoError(t, db.Compact()) + + // 2 blocks exist now. [0, 120), [250, 360) + require.Equal(t, len(db.Blocks()), 2) + require.Equal(t, int64(0), db.Blocks()[0].MinTime()) + require.Equal(t, 120*time.Minute.Milliseconds(), db.Blocks()[0].MaxTime()) + require.Equal(t, 250*time.Minute.Milliseconds(), db.Blocks()[1].MinTime()) + require.Equal(t, 360*time.Minute.Milliseconds(), db.Blocks()[1].MaxTime()) + + // Checking that ooo chunk is empty. + for _, lbls := range []labels.Labels{series1, series2} { + ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) + require.NoError(t, err) + require.False(t, created) + require.Nil(t, ms.ooo) + } + + verifySamples := func(block *Block, fromMins, toMins int64) { + series1Samples := make([]tsdbutil.Sample, 0, toMins-fromMins+1) + series2Samples := make([]tsdbutil.Sample, 0, toMins-fromMins+1) + for min := fromMins; min <= toMins; min++ { + ts := min * time.Minute.Milliseconds() + series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil}) + series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil}) + } + expRes := map[string][]tsdbutil.Sample{ + series1.String(): series1Samples, + series2.String(): series2Samples, + } + + q, err := NewBlockQuerier(block, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + + actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) + require.Equal(t, expRes, actRes) + } + + // Checking for expected data in the blocks. + verifySamples(db.Blocks()[0], 90, 110) + verifySamples(db.Blocks()[1], 250, 350) +} + func Test_Querier_OOOQuery(t *testing.T) { opts := DefaultOptions() opts.OutOfOrderCapMax = 30 diff --git a/tsdb/head.go b/tsdb/head.go index 728b3c9d3..2bcba146f 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1207,6 +1207,10 @@ func (h *Head) truncateOOO(lastWBLFile int, minOOOMmapRef chunks.ChunkDiskMapper } } + if h.wbl == nil { + return nil + } + return h.wbl.Truncate(lastWBLFile) } diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 0552c6053..86d0e3b7b 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -277,11 +277,11 @@ type OOOCompactionHead struct { // on the sample append latency. So call NewOOOCompactionHead only right before compaction. func NewOOOCompactionHead(head *Head) (*OOOCompactionHead, error) { ch := &OOOCompactionHead{ - chunkRange: head.chunkRange.Load(), - mint: math.MaxInt64, - maxt: math.MinInt64, - lastWBLFile: 0, + chunkRange: head.chunkRange.Load(), + mint: math.MaxInt64, + maxt: math.MinInt64, } + if head.wbl != nil { lastWBLFile, err := head.wbl.NextSegmentSync() if err != nil {