From 536d9f9ce989b670974e94b5db12f7bee0276e8e Mon Sep 17 00:00:00 2001 From: George Krajcsovits Date: Thu, 5 Sep 2024 18:17:42 +0200 Subject: [PATCH] BUGFIX: TSDB: panic in query during truncation with OOO head (#14831) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Check if headQuerier is nil before trying to use it. * TestQueryOOOHeadDuringTruncate: unit test to check query during truncate Regression test for #14822 * Simulate race between query and Compact() Signed-off-by: György Krajcsovits --- tsdb/head.go | 5 +++ tsdb/head_test.go | 87 +++++++++++++++++++++++++++++++++++++++++++ tsdb/ooo_head_read.go | 11 +++++- 3 files changed, 102 insertions(+), 1 deletion(-) diff --git a/tsdb/head.go b/tsdb/head.go index b7bfaa0fd..4ff7aab63 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -128,6 +128,7 @@ type Head struct { writeNotified wlog.WriteNotified memTruncationInProcess atomic.Bool + memTruncationCallBack func() // For testing purposes. } type ExemplarStorage interface { @@ -1129,6 +1130,10 @@ func (h *Head) truncateMemory(mint int64) (err error) { h.memTruncationInProcess.Store(true) defer h.memTruncationInProcess.Store(false) + if h.memTruncationCallBack != nil { + h.memTruncationCallBack() + } + // We wait for pending queries to end that overlap with this truncation. if initialized { h.WaitForPendingReadersInTimeRange(h.MinTime(), mint) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 18ec4f0ac..c338ddadd 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -3492,6 +3492,93 @@ func TestWaitForPendingReadersInTimeRange(t *testing.T) { } } +func TestQueryOOOHeadDuringTruncate(t *testing.T) { + const maxT int64 = 6000 + + dir := t.TempDir() + opts := DefaultOptions() + opts.EnableNativeHistograms = true + opts.OutOfOrderTimeWindow = maxT + opts.MinBlockDuration = maxT / 2 // So that head will compact up to 3000. 
+ + db, err := Open(dir, nil, nil, opts, nil) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + db.DisableCompactions() + + var ( + ref = storage.SeriesRef(0) + app = db.Appender(context.Background()) + ) + // Add in-order samples at every 100ms starting at 0ms. + for i := int64(0); i < maxT; i += 100 { + _, err := app.Append(ref, labels.FromStrings("a", "b"), i, 0) + require.NoError(t, err) + } + // Add out-of-order samples at every 100ms starting at 50ms. + for i := int64(50); i < maxT; i += 100 { + _, err := app.Append(ref, labels.FromStrings("a", "b"), i, 0) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + requireEqualOOOSamples(t, int(maxT/100-1), db) + + // Synchronization points. + allowQueryToStart := make(chan struct{}) + queryStarted := make(chan struct{}) + compactionFinished := make(chan struct{}) + + db.head.memTruncationCallBack = func() { + // Compaction has started, let the query start and wait for it to actually start to simulate race condition. + allowQueryToStart <- struct{}{} + <-queryStarted + } + + go func() { + db.Compact(context.Background()) // Compact and write blocks up to 3000 (maxT/2). + compactionFinished <- struct{}{} + }() + + // Wait for the compaction to start. + <-allowQueryToStart + + q, err := db.Querier(1500, 2500) + require.NoError(t, err) + queryStarted <- struct{}{} // Unblock the compaction. + ctx := context.Background() + + // Label names. + res, annots, err := q.LabelNames(ctx, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + require.NoError(t, err) + require.Empty(t, annots) + require.Equal(t, []string{"a"}, res) + + // Label values. 
+ res, annots, err = q.LabelValues(ctx, "a", nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + require.NoError(t, err) + require.Empty(t, annots) + require.Equal(t, []string{"b"}, res) + + // Samples + ss := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + require.True(t, ss.Next()) + s := ss.At() + require.False(t, ss.Next()) // One series. + it := s.Iterator(nil) + require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data. + require.Equal(t, int64(1500), it.AtT()) // It is an in-order sample. + require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data. + require.Equal(t, int64(1550), it.AtT()) // It is an out-of-order sample. + require.NoError(t, it.Err()) + + require.NoError(t, q.Close()) // Cannot be deferred as the compaction waits for queries to close before finishing. + + <-compactionFinished // Wait for compaction otherwise Go test finds stray goroutines. +} + func TestAppendHistogram(t *testing.T) { l := labels.FromStrings("a", "b") for _, numHistograms := range []int{1, 10, 150, 200, 250, 300} { diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 7b58ec566..a3c959bc4 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -513,7 +513,7 @@ type HeadAndOOOQuerier struct { head *Head index IndexReader chunkr ChunkReader - querier storage.Querier + querier storage.Querier // Used for LabelNames, LabelValues, but may be nil if head was truncated in the meantime, in which case we ignore it and do not close it in the end. 
} func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier { @@ -534,15 +534,24 @@ func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolatio } func (q *HeadAndOOOQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + if q.querier == nil { + return nil, nil, nil + } return q.querier.LabelValues(ctx, name, hints, matchers...) } func (q *HeadAndOOOQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + if q.querier == nil { + return nil, nil, nil + } return q.querier.LabelNames(ctx, hints, matchers...) } func (q *HeadAndOOOQuerier) Close() error { q.chunkr.Close() + if q.querier == nil { + return nil + } return q.querier.Close() }