From 227d155e3f7455186c10c8ff00d1647e941d6513 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Wed, 8 Dec 2021 21:02:14 +0530 Subject: [PATCH] Fix queries after a failed snapshot replay (#9980) Signed-off-by: Ganesh Vernekar --- tsdb/head.go | 4 +- tsdb/head_test.go | 104 ++++++++++++++++++++++++++++++++++++++++++++++ tsdb/head_wal.go | 3 +- 3 files changed, 109 insertions(+), 2 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index dff2b0c67..0722d10ba 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -473,7 +473,9 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second // limits the ingested samples to the head min valid time. func (h *Head) Init(minValidTime int64) error { h.minValidTime.Store(minValidTime) - defer h.postings.EnsureOrder() + defer func() { + h.postings.EnsureOrder() + }() defer h.gc() // After loading the wal remove the obsolete data from the head. defer func() { // Loading of m-mapped chunks and snapshot can make the mint of the Head diff --git a/tsdb/head_test.go b/tsdb/head_test.go index f9030f5ea..d5f6f6373 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2847,3 +2847,107 @@ func TestSnapshotError(t *testing.T) { require.NoError(t, err) require.Equal(t, 0, len(tm)) } + +// Tests https://github.com/prometheus/prometheus/issues/9725. +func TestChunkSnapshotReplayBug(t *testing.T) { + dir := t.TempDir() + wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + require.NoError(t, err) + + // Write few series records and samples such that the series references are not in order in the WAL + // for status_code="200". + var buf []byte + for i := 1; i <= 1000; i++ { + var ref chunks.HeadSeriesRef + if i <= 500 { + ref = chunks.HeadSeriesRef(i * 100) + } else { + ref = chunks.HeadSeriesRef((i - 500) * 50) + } + seriesRec := record.RefSeries{ + Ref: ref, + Labels: labels.Labels{ + {Name: "__name__", Value: "request_duration"}, + {Name: "status_code", Value: "200"}, + {Name: "foo", Value: fmt.Sprintf("baz%d", rand.Int())}, + }, + } + // Add a sample so that the series is not garbage collected. + samplesRec := record.RefSample{Ref: ref, T: 1000, V: 1000} + var enc record.Encoder + + rec := enc.Series([]record.RefSeries{seriesRec}, buf) + buf = rec[:0] + require.NoError(t, wlog.Log(rec)) + rec = enc.Samples([]record.RefSample{samplesRec}, buf) + buf = rec[:0] + require.NoError(t, wlog.Log(rec)) + } + + // Write a corrupt snapshot to fail the replay on startup. + snapshotName := chunkSnapshotDir(0, 100) + cpdir := filepath.Join(dir, snapshotName) + require.NoError(t, os.MkdirAll(cpdir, 0o777)) + + err = ioutil.WriteFile(filepath.Join(cpdir, "00000000"), []byte{1, 5, 3, 5, 6, 7, 4, 2, 2}, 0o777) + require.NoError(t, err) + + opts := DefaultHeadOptions() + opts.ChunkDirRoot = dir + opts.EnableMemorySnapshotOnShutdown = true + head, err := NewHead(nil, nil, wlog, opts, nil) + require.NoError(t, err) + require.NoError(t, head.Init(math.MinInt64)) + defer func() { + require.NoError(t, head.Close()) + }() + + // Snapshot replay should error out. + require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal)) + + // Querying `request_duration{status_code!="200"}` should return no series since all of + // them have status_code="200". + q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + series := query(t, q, + labels.MustNewMatcher(labels.MatchEqual, "__name__", "request_duration"), + labels.MustNewMatcher(labels.MatchNotEqual, "status_code", "200"), + ) + require.Len(t, series, 0, "there should be no series found") +} + +func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) { + dir := t.TempDir() + wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + require.NoError(t, err) + + // Write a snapshot with .tmp suffix. This used to fail taking any further snapshots or replay of snapshots. + snapshotName := chunkSnapshotDir(0, 100) + ".tmp" + cpdir := filepath.Join(dir, snapshotName) + require.NoError(t, os.MkdirAll(cpdir, 0o777)) + + opts := DefaultHeadOptions() + opts.ChunkDirRoot = dir + opts.EnableMemorySnapshotOnShutdown = true + head, err := NewHead(nil, nil, wlog, opts, nil) + require.NoError(t, err) + require.NoError(t, head.Init(math.MinInt64)) + + require.Equal(t, 0.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal)) + + // Add some samples for the snapshot. + app := head.Appender(context.Background()) + _, err = app.Append(0, labels.Labels{{Name: "foo", Value: "bar"}}, 10, 10) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + // Should not return any error for a successful snapshot. + require.NoError(t, head.Close()) + + // Verify the snapshot. + name, idx, offset, err := LastChunkSnapshot(dir) + require.NoError(t, err) + require.True(t, name != "") + require.Equal(t, 0, idx) + require.Greater(t, offset, 0) +} diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 819128493..d0fe685ff 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -777,7 +777,8 @@ func LastChunkSnapshot(dir string) (string, int, int, error) { splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".") if len(splits) != 2 { - return "", 0, 0, errors.Errorf("chunk snapshot %s is not in the right format", fi.Name()) + // Chunk snapshots is not in the right format, we do not care about it. + continue } idx, err := strconv.Atoi(splits[0])