Browse Source

Additionally wrap WBL replay error (#12406)

* Additionally wrap WBL replay error

Although WBL replay is already wrapped with errLoadWbl,
there are other errors that can happen during a WBL replay.
We should not try to repair WAL in those cases.

This commit additionally wraps the final error in Head.Init again
with errLoadWbl so that WBL replay errors can be identified properly.

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Signed-off-by: Jesus Vazquez <jesusvzpg@gmail.com>
Co-authored-by: Jesus Vazquez <jesusvzpg@gmail.com>
Signed-off-by: Levi Harrison <git@leviharrison.dev>
pull/12985/head
Ganesh Vernekar 1 year ago committed by Levi Harrison
parent
commit
4df2f2432b
  1. 12
      tsdb/db.go
  2. 2
      tsdb/db_test.go
  3. 8
      tsdb/head.go
  4. 78
      tsdb/head_test.go
  5. 8
      tsdb/head_wal.go

12
tsdb/db.go

@ -887,13 +887,13 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
if initErr := db.head.Init(minValidTime); initErr != nil {
db.head.metrics.walCorruptionsTotal.Inc()
isOOOErr := isErrLoadOOOWal(initErr)
if isOOOErr {
level.Warn(db.logger).Log("msg", "Encountered OOO WAL read error, attempting repair", "err", initErr)
if err := wbl.Repair(initErr); err != nil {
return nil, errors.Wrap(err, "repair corrupted OOO WAL")
e, ok := initErr.(*errLoadWbl)
if ok {
level.Warn(db.logger).Log("msg", "Encountered WBL read error, attempting repair", "err", initErr)
if err := wbl.Repair(e.err); err != nil {
return nil, errors.Wrap(err, "repair corrupted WBL")
}
level.Info(db.logger).Log("msg", "Successfully repaired OOO WAL")
level.Info(db.logger).Log("msg", "Successfully repaired WBL")
} else {
level.Warn(db.logger).Log("msg", "Encountered WAL read error, attempting repair", "err", initErr)
if err := wal.Repair(initErr); err != nil {

2
tsdb/db_test.go

@ -3793,7 +3793,7 @@ func TestOOOWALWrite(t *testing.T) {
actRecs := getRecords(path.Join(dir, "wal"))
require.Equal(t, inOrderRecords, actRecs)
// The OOO WAL.
// The WBL.
actRecs = getRecords(path.Join(dir, wlog.WblDirName))
require.Equal(t, oooRecords, actRecs)
}

8
tsdb/head.go

@ -765,17 +765,17 @@ func (h *Head) Init(minValidTime int64) error {
wblReplayStart := time.Now()
if h.wbl != nil {
// Replay OOO WAL.
// Replay WBL.
startFrom, endAt, e = wlog.Segments(h.wbl.Dir())
if e != nil {
return errors.Wrap(e, "finding OOO WAL segments")
return &errLoadWbl{errors.Wrap(e, "finding WBL segments")}
}
h.startWALReplayStatus(startFrom, endAt)
for i := startFrom; i <= endAt; i++ {
s, err := wlog.OpenReadSegment(wlog.SegmentName(h.wbl.Dir(), i))
if err != nil {
return errors.Wrap(err, fmt.Sprintf("open WBL segment: %d", i))
return &errLoadWbl{errors.Wrap(err, fmt.Sprintf("open WBL segment: %d", i))}
}
sr := wlog.NewSegmentBufReader(s)
@ -784,7 +784,7 @@ func (h *Head) Init(minValidTime int64) error {
level.Warn(h.logger).Log("msg", "Error while closing the wbl segments reader", "err", err)
}
if err != nil {
return err
return &errLoadWbl{err}
}
level.Info(h.logger).Log("msg", "WBL segment loaded", "segment", i, "maxSegment", endAt)
h.updateWALReplayStatusRead(i)

78
tsdb/head_test.go

@ -2086,6 +2086,79 @@ func TestWalRepair_DecodingError(t *testing.T) {
}
}
// TestWblRepair_DecodingError ensures that a repair is run for an error
// when decoding a record.
func TestWblRepair_DecodingError(t *testing.T) {
var enc record.Encoder
corrFunc := func(rec []byte) []byte {
return rec[:3]
}
rec := enc.Samples([]record.RefSample{{Ref: 0, T: 99, V: 1}}, []byte{})
totalRecs := 9
expRecs := 5
dir := t.TempDir()
// Fill the wbl and corrupt it.
{
wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
require.NoError(t, err)
wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
require.NoError(t, err)
for i := 1; i <= totalRecs; i++ {
// At this point insert a corrupted record.
if i-1 == expRecs {
require.NoError(t, wbl.Log(corrFunc(rec)))
continue
}
require.NoError(t, wbl.Log(rec))
}
opts := DefaultHeadOptions()
opts.ChunkRange = 1
opts.ChunkDirRoot = wal.Dir()
opts.OutOfOrderCapMax.Store(30)
opts.OutOfOrderTimeWindow.Store(1000 * time.Minute.Milliseconds())
h, err := NewHead(nil, nil, wal, wbl, opts, nil)
require.NoError(t, err)
require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))
initErr := h.Init(math.MinInt64)
_, ok := initErr.(*errLoadWbl)
require.True(t, ok) // Wbl errors are wrapped into errLoadWbl, make sure we can unwrap it.
err = errors.Cause(initErr) // So that we can pick up errors even if wrapped.
_, corrErr := err.(*wlog.CorruptionErr)
require.True(t, corrErr, "reading the wal didn't return corruption error")
require.NoError(t, h.Close()) // Head will close the wal as well.
}
// Open the db to trigger a repair.
{
db, err := Open(dir, nil, nil, DefaultOptions(), nil)
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
}()
require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.walCorruptionsTotal))
}
// Read the wbl content after the repair.
{
sr, err := wlog.NewSegmentsReader(filepath.Join(dir, "wbl"))
require.NoError(t, err)
defer sr.Close()
r := wlog.NewReader(sr)
var actRec int
for r.Next() {
actRec++
}
require.NoError(t, r.Err())
require.Equal(t, expRecs, actRec, "Wrong number of intact records")
}
}
func TestHeadReadWriterRepair(t *testing.T) {
dir := t.TempDir()
@ -4446,9 +4519,8 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
require.Greater(t, offset, 0)
}
// TestOOOWalReplay checks the replay at a low level.
// TODO(codesome): Needs test for ooo WAL repair.
func TestOOOWalReplay(t *testing.T) {
// TestWBLReplay checks the replay at a low level.
func TestWBLReplay(t *testing.T) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
require.NoError(t, err)

8
tsdb/head_wal.go

@ -825,22 +825,14 @@ func (e errLoadWbl) Error() string {
return e.err.Error()
}
// To support errors.Cause().
func (e errLoadWbl) Cause() error {
return e.err
}
// To support errors.Unwrap().
func (e errLoadWbl) Unwrap() error {
return e.err
}
// isErrLoadOOOWal returns a boolean if the error is errLoadWbl.
func isErrLoadOOOWal(err error) bool {
_, ok := err.(*errLoadWbl)
return ok
}
type wblSubsetProcessor struct {
input chan wblSubsetProcessorInputItem
output chan []record.RefSample

Loading…
Cancel
Save