diff --git a/tsdb/db.go b/tsdb/db.go index 5c7040703..ce8ef1f9a 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -887,13 +887,13 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs if initErr := db.head.Init(minValidTime); initErr != nil { db.head.metrics.walCorruptionsTotal.Inc() - isOOOErr := isErrLoadOOOWal(initErr) - if isOOOErr { - level.Warn(db.logger).Log("msg", "Encountered OOO WAL read error, attempting repair", "err", initErr) - if err := wbl.Repair(initErr); err != nil { - return nil, errors.Wrap(err, "repair corrupted OOO WAL") + e, ok := initErr.(*errLoadWbl) + if ok { + level.Warn(db.logger).Log("msg", "Encountered WBL read error, attempting repair", "err", initErr) + if err := wbl.Repair(e.err); err != nil { + return nil, errors.Wrap(err, "repair corrupted WBL") } - level.Info(db.logger).Log("msg", "Successfully repaired OOO WAL") + level.Info(db.logger).Log("msg", "Successfully repaired WBL") } else { level.Warn(db.logger).Log("msg", "Encountered WAL read error, attempting repair", "err", initErr) if err := wal.Repair(initErr); err != nil { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 6fda4e3bd..773561c6c 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -3793,7 +3793,7 @@ func TestOOOWALWrite(t *testing.T) { actRecs := getRecords(path.Join(dir, "wal")) require.Equal(t, inOrderRecords, actRecs) - // The OOO WAL. + // The WBL. actRecs = getRecords(path.Join(dir, wlog.WblDirName)) require.Equal(t, oooRecords, actRecs) } diff --git a/tsdb/head.go b/tsdb/head.go index ea71b2718..0ea2b8385 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -765,17 +765,17 @@ func (h *Head) Init(minValidTime int64) error { wblReplayStart := time.Now() if h.wbl != nil { - // Replay OOO WAL. + // Replay WBL. startFrom, endAt, e = wlog.Segments(h.wbl.Dir()) if e != nil { - return errors.Wrap(e, "finding OOO WAL segments") + return &errLoadWbl{errors.Wrap(e, "finding WBL segments")} } h.startWALReplayStatus(startFrom, endAt) for i := startFrom; i <= endAt; i++ { s, err := wlog.OpenReadSegment(wlog.SegmentName(h.wbl.Dir(), i)) if err != nil { - return errors.Wrap(err, fmt.Sprintf("open WBL segment: %d", i)) + return &errLoadWbl{errors.Wrap(err, fmt.Sprintf("open WBL segment: %d", i))} } sr := wlog.NewSegmentBufReader(s) @@ -784,7 +784,7 @@ func (h *Head) Init(minValidTime int64) error { level.Warn(h.logger).Log("msg", "Error while closing the wbl segments reader", "err", err) } if err != nil { - return err + return &errLoadWbl{err} } level.Info(h.logger).Log("msg", "WBL segment loaded", "segment", i, "maxSegment", endAt) h.updateWALReplayStatusRead(i) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index a9179af62..54fd469a3 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2086,6 +2086,79 @@ func TestWalRepair_DecodingError(t *testing.T) { } } +// TestWblRepair_DecodingError ensures that a repair is run for an error +// when decoding a record. +func TestWblRepair_DecodingError(t *testing.T) { + var enc record.Encoder + corrFunc := func(rec []byte) []byte { + return rec[:3] + } + rec := enc.Samples([]record.RefSample{{Ref: 0, T: 99, V: 1}}, []byte{}) + totalRecs := 9 + expRecs := 5 + dir := t.TempDir() + + // Fill the wbl and corrupt it. + { + wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone) + require.NoError(t, err) + wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone) + require.NoError(t, err) + + for i := 1; i <= totalRecs; i++ { + // At this point insert a corrupted record. + if i-1 == expRecs { + require.NoError(t, wbl.Log(corrFunc(rec))) + continue + } + require.NoError(t, wbl.Log(rec)) + } + + opts := DefaultHeadOptions() + opts.ChunkRange = 1 + opts.ChunkDirRoot = wal.Dir() + opts.OutOfOrderCapMax.Store(30) + opts.OutOfOrderTimeWindow.Store(1000 * time.Minute.Milliseconds()) + h, err := NewHead(nil, nil, wal, wbl, opts, nil) + require.NoError(t, err) + require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) + initErr := h.Init(math.MinInt64) + + _, ok := initErr.(*errLoadWbl) + require.True(t, ok) // Wbl errors are wrapped into errLoadWbl, make sure we can unwrap it. + + err = errors.Cause(initErr) // So that we can pick up errors even if wrapped. + _, corrErr := err.(*wlog.CorruptionErr) + require.True(t, corrErr, "reading the wal didn't return corruption error") + require.NoError(t, h.Close()) // Head will close the wal as well. + } + + // Open the db to trigger a repair. + { + db, err := Open(dir, nil, nil, DefaultOptions(), nil) + require.NoError(t, err) + defer func() { + require.NoError(t, db.Close()) + }() + require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.walCorruptionsTotal)) + } + + // Read the wbl content after the repair. + { + sr, err := wlog.NewSegmentsReader(filepath.Join(dir, "wbl")) + require.NoError(t, err) + defer sr.Close() + r := wlog.NewReader(sr) + + var actRec int + for r.Next() { + actRec++ + } + require.NoError(t, r.Err()) + require.Equal(t, expRecs, actRec, "Wrong number of intact records") + } +} + func TestHeadReadWriterRepair(t *testing.T) { dir := t.TempDir() @@ -4446,9 +4519,8 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) { require.Greater(t, offset, 0) } -// TestOOOWalReplay checks the replay at a low level. -// TODO(codesome): Needs test for ooo WAL repair. -func TestOOOWalReplay(t *testing.T) { +// TestWBLReplay checks the replay at a low level. +func TestWBLReplay(t *testing.T) { dir := t.TempDir() wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) require.NoError(t, err) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 804060ad5..d6780c021 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -825,22 +825,14 @@ func (e errLoadWbl) Error() string { return e.err.Error() } -// To support errors.Cause(). func (e errLoadWbl) Cause() error { return e.err } -// To support errors.Unwrap(). func (e errLoadWbl) Unwrap() error { return e.err } -// isErrLoadOOOWal returns a boolean if the error is errLoadWbl. -func isErrLoadOOOWal(err error) bool { - _, ok := err.(*errLoadWbl) - return ok -} - type wblSubsetProcessor struct { input chan wblSubsetProcessorInputItem output chan []record.RefSample