From f5913266a13429eef7507a024fe171118a2e673e Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Fri, 13 Oct 2023 08:21:35 -0400 Subject: [PATCH] Additionally wrap WBL replay error (#12406) * Additionally wrap WBL replay error Although WBL replay is already wrapped with errLoadWbl, there are other errors that can happen during a WBL replay. We should not try to repair WAL in those cases. This commit additionally wraps the final error in Head.Init again with errLoadWbl so that WBL replay errors can be identified properly. Signed-off-by: Ganesh Vernekar Signed-off-by: Jesus Vazquez Co-authored-by: Jesus Vazquez --- tsdb/db.go | 12 ++++---- tsdb/db_test.go | 2 +- tsdb/head.go | 8 ++--- tsdb/head_test.go | 78 +++++++++++++++++++++++++++++++++++++++++++++-- tsdb/head_wal.go | 8 ----- 5 files changed, 86 insertions(+), 22 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index 5c7040703..ce8ef1f9a 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -887,13 +887,13 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs if initErr := db.head.Init(minValidTime); initErr != nil { db.head.metrics.walCorruptionsTotal.Inc() - isOOOErr := isErrLoadOOOWal(initErr) - if isOOOErr { - level.Warn(db.logger).Log("msg", "Encountered OOO WAL read error, attempting repair", "err", initErr) - if err := wbl.Repair(initErr); err != nil { - return nil, errors.Wrap(err, "repair corrupted OOO WAL") + e, ok := initErr.(*errLoadWbl) + if ok { + level.Warn(db.logger).Log("msg", "Encountered WBL read error, attempting repair", "err", initErr) + if err := wbl.Repair(e.err); err != nil { + return nil, errors.Wrap(err, "repair corrupted WBL") } - level.Info(db.logger).Log("msg", "Successfully repaired OOO WAL") + level.Info(db.logger).Log("msg", "Successfully repaired WBL") } else { level.Warn(db.logger).Log("msg", "Encountered WAL read error, attempting repair", "err", initErr) if err := wal.Repair(initErr); err != nil { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 6fda4e3bd..773561c6c 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -3793,7 +3793,7 @@ func TestOOOWALWrite(t *testing.T) { actRecs := getRecords(path.Join(dir, "wal")) require.Equal(t, inOrderRecords, actRecs) - // The OOO WAL. + // The WBL. actRecs = getRecords(path.Join(dir, wlog.WblDirName)) require.Equal(t, oooRecords, actRecs) } diff --git a/tsdb/head.go b/tsdb/head.go index ea71b2718..0ea2b8385 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -765,17 +765,17 @@ func (h *Head) Init(minValidTime int64) error { wblReplayStart := time.Now() if h.wbl != nil { - // Replay OOO WAL. + // Replay WBL. startFrom, endAt, e = wlog.Segments(h.wbl.Dir()) if e != nil { - return errors.Wrap(e, "finding OOO WAL segments") + return &errLoadWbl{errors.Wrap(e, "finding WBL segments")} } h.startWALReplayStatus(startFrom, endAt) for i := startFrom; i <= endAt; i++ { s, err := wlog.OpenReadSegment(wlog.SegmentName(h.wbl.Dir(), i)) if err != nil { - return errors.Wrap(err, fmt.Sprintf("open WBL segment: %d", i)) + return &errLoadWbl{errors.Wrap(err, fmt.Sprintf("open WBL segment: %d", i))} } sr := wlog.NewSegmentBufReader(s) @@ -784,7 +784,7 @@ func (h *Head) Init(minValidTime int64) error { level.Warn(h.logger).Log("msg", "Error while closing the wbl segments reader", "err", err) } if err != nil { - return err + return &errLoadWbl{err} } level.Info(h.logger).Log("msg", "WBL segment loaded", "segment", i, "maxSegment", endAt) h.updateWALReplayStatusRead(i) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index a9179af62..54fd469a3 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2086,6 +2086,79 @@ func TestWalRepair_DecodingError(t *testing.T) { } } +// TestWblRepair_DecodingError ensures that a repair is run for an error +// when decoding a record. +func TestWblRepair_DecodingError(t *testing.T) { + var enc record.Encoder + corrFunc := func(rec []byte) []byte { + return rec[:3] + } + rec := enc.Samples([]record.RefSample{{Ref: 0, T: 99, V: 1}}, []byte{}) + totalRecs := 9 + expRecs := 5 + dir := t.TempDir() + + // Fill the wbl and corrupt it. + { + wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone) + require.NoError(t, err) + wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone) + require.NoError(t, err) + + for i := 1; i <= totalRecs; i++ { + // At this point insert a corrupted record. + if i-1 == expRecs { + require.NoError(t, wbl.Log(corrFunc(rec))) + continue + } + require.NoError(t, wbl.Log(rec)) + } + + opts := DefaultHeadOptions() + opts.ChunkRange = 1 + opts.ChunkDirRoot = wal.Dir() + opts.OutOfOrderCapMax.Store(30) + opts.OutOfOrderTimeWindow.Store(1000 * time.Minute.Milliseconds()) + h, err := NewHead(nil, nil, wal, wbl, opts, nil) + require.NoError(t, err) + require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) + initErr := h.Init(math.MinInt64) + + _, ok := initErr.(*errLoadWbl) + require.True(t, ok) // Wbl errors are wrapped into errLoadWbl, make sure we can unwrap it. + + err = errors.Cause(initErr) // So that we can pick up errors even if wrapped. + _, corrErr := err.(*wlog.CorruptionErr) + require.True(t, corrErr, "reading the wal didn't return corruption error") + require.NoError(t, h.Close()) // Head will close the wal as well. + } + + // Open the db to trigger a repair. + { + db, err := Open(dir, nil, nil, DefaultOptions(), nil) + require.NoError(t, err) + defer func() { + require.NoError(t, db.Close()) + }() + require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.walCorruptionsTotal)) + } + + // Read the wbl content after the repair. + { + sr, err := wlog.NewSegmentsReader(filepath.Join(dir, "wbl")) + require.NoError(t, err) + defer sr.Close() + r := wlog.NewReader(sr) + + var actRec int + for r.Next() { + actRec++ + } + require.NoError(t, r.Err()) + require.Equal(t, expRecs, actRec, "Wrong number of intact records") + } +} + func TestHeadReadWriterRepair(t *testing.T) { dir := t.TempDir() @@ -4446,9 +4519,8 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) { require.Greater(t, offset, 0) } -// TestOOOWalReplay checks the replay at a low level. -// TODO(codesome): Needs test for ooo WAL repair. -func TestOOOWalReplay(t *testing.T) { +// TestWBLReplay checks the replay at a low level. +func TestWBLReplay(t *testing.T) { dir := t.TempDir() wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) require.NoError(t, err) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 804060ad5..d6780c021 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -825,22 +825,14 @@ func (e errLoadWbl) Error() string { return e.err.Error() } -// To support errors.Cause(). func (e errLoadWbl) Cause() error { return e.err } -// To support errors.Unwrap(). func (e errLoadWbl) Unwrap() error { return e.err } -// isErrLoadOOOWal returns a boolean if the error is errLoadWbl. -func isErrLoadOOOWal(err error) bool { - _, ok := err.(*errLoadWbl) - return ok -} - type wblSubsetProcessor struct { input chan wblSubsetProcessorInputItem output chan []record.RefSample