mirror of https://github.com/prometheus/prometheus
Additionally wrap WBL replay error (#12406)
* Additionally wrap WBL replay error

Although WBL replay is already wrapped with errLoadWbl, there are other errors that can happen during a WBL replay. We should not try to repair WAL in those cases. This commit additionally wraps the final error in Head.Init again with errLoadWbl so that WBL replay errors can be identified properly.

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Signed-off-by: Jesus Vazquez <jesusvzpg@gmail.com>
Co-authored-by: Jesus Vazquez <jesusvzpg@gmail.com>

pull/11860/head
parent
ffa74eb12d
commit
f5913266a1
12
tsdb/db.go
12
tsdb/db.go
|
@ -887,13 +887,13 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
|||
|
||||
if initErr := db.head.Init(minValidTime); initErr != nil {
|
||||
db.head.metrics.walCorruptionsTotal.Inc()
|
||||
isOOOErr := isErrLoadOOOWal(initErr)
|
||||
if isOOOErr {
|
||||
level.Warn(db.logger).Log("msg", "Encountered OOO WAL read error, attempting repair", "err", initErr)
|
||||
if err := wbl.Repair(initErr); err != nil {
|
||||
return nil, errors.Wrap(err, "repair corrupted OOO WAL")
|
||||
e, ok := initErr.(*errLoadWbl)
|
||||
if ok {
|
||||
level.Warn(db.logger).Log("msg", "Encountered WBL read error, attempting repair", "err", initErr)
|
||||
if err := wbl.Repair(e.err); err != nil {
|
||||
return nil, errors.Wrap(err, "repair corrupted WBL")
|
||||
}
|
||||
level.Info(db.logger).Log("msg", "Successfully repaired OOO WAL")
|
||||
level.Info(db.logger).Log("msg", "Successfully repaired WBL")
|
||||
} else {
|
||||
level.Warn(db.logger).Log("msg", "Encountered WAL read error, attempting repair", "err", initErr)
|
||||
if err := wal.Repair(initErr); err != nil {
|
||||
|
|
|
@ -3793,7 +3793,7 @@ func TestOOOWALWrite(t *testing.T) {
|
|||
actRecs := getRecords(path.Join(dir, "wal"))
|
||||
require.Equal(t, inOrderRecords, actRecs)
|
||||
|
||||
// The OOO WAL.
|
||||
// The WBL.
|
||||
actRecs = getRecords(path.Join(dir, wlog.WblDirName))
|
||||
require.Equal(t, oooRecords, actRecs)
|
||||
}
|
||||
|
|
|
@ -765,17 +765,17 @@ func (h *Head) Init(minValidTime int64) error {
|
|||
|
||||
wblReplayStart := time.Now()
|
||||
if h.wbl != nil {
|
||||
// Replay OOO WAL.
|
||||
// Replay WBL.
|
||||
startFrom, endAt, e = wlog.Segments(h.wbl.Dir())
|
||||
if e != nil {
|
||||
return errors.Wrap(e, "finding OOO WAL segments")
|
||||
return &errLoadWbl{errors.Wrap(e, "finding WBL segments")}
|
||||
}
|
||||
h.startWALReplayStatus(startFrom, endAt)
|
||||
|
||||
for i := startFrom; i <= endAt; i++ {
|
||||
s, err := wlog.OpenReadSegment(wlog.SegmentName(h.wbl.Dir(), i))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, fmt.Sprintf("open WBL segment: %d", i))
|
||||
return &errLoadWbl{errors.Wrap(err, fmt.Sprintf("open WBL segment: %d", i))}
|
||||
}
|
||||
|
||||
sr := wlog.NewSegmentBufReader(s)
|
||||
|
@ -784,7 +784,7 @@ func (h *Head) Init(minValidTime int64) error {
|
|||
level.Warn(h.logger).Log("msg", "Error while closing the wbl segments reader", "err", err)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
return &errLoadWbl{err}
|
||||
}
|
||||
level.Info(h.logger).Log("msg", "WBL segment loaded", "segment", i, "maxSegment", endAt)
|
||||
h.updateWALReplayStatusRead(i)
|
||||
|
|
|
@ -2086,6 +2086,79 @@ func TestWalRepair_DecodingError(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestWblRepair_DecodingError ensures that a repair is run for an error
|
||||
// when decoding a record.
|
||||
func TestWblRepair_DecodingError(t *testing.T) {
|
||||
var enc record.Encoder
|
||||
corrFunc := func(rec []byte) []byte {
|
||||
return rec[:3]
|
||||
}
|
||||
rec := enc.Samples([]record.RefSample{{Ref: 0, T: 99, V: 1}}, []byte{})
|
||||
totalRecs := 9
|
||||
expRecs := 5
|
||||
dir := t.TempDir()
|
||||
|
||||
// Fill the wbl and corrupt it.
|
||||
{
|
||||
wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
|
||||
require.NoError(t, err)
|
||||
wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
|
||||
require.NoError(t, err)
|
||||
|
||||
for i := 1; i <= totalRecs; i++ {
|
||||
// At this point insert a corrupted record.
|
||||
if i-1 == expRecs {
|
||||
require.NoError(t, wbl.Log(corrFunc(rec)))
|
||||
continue
|
||||
}
|
||||
require.NoError(t, wbl.Log(rec))
|
||||
}
|
||||
|
||||
opts := DefaultHeadOptions()
|
||||
opts.ChunkRange = 1
|
||||
opts.ChunkDirRoot = wal.Dir()
|
||||
opts.OutOfOrderCapMax.Store(30)
|
||||
opts.OutOfOrderTimeWindow.Store(1000 * time.Minute.Milliseconds())
|
||||
h, err := NewHead(nil, nil, wal, wbl, opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))
|
||||
initErr := h.Init(math.MinInt64)
|
||||
|
||||
_, ok := initErr.(*errLoadWbl)
|
||||
require.True(t, ok) // Wbl errors are wrapped into errLoadWbl, make sure we can unwrap it.
|
||||
|
||||
err = errors.Cause(initErr) // So that we can pick up errors even if wrapped.
|
||||
_, corrErr := err.(*wlog.CorruptionErr)
|
||||
require.True(t, corrErr, "reading the wal didn't return corruption error")
|
||||
require.NoError(t, h.Close()) // Head will close the wal as well.
|
||||
}
|
||||
|
||||
// Open the db to trigger a repair.
|
||||
{
|
||||
db, err := Open(dir, nil, nil, DefaultOptions(), nil)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
require.NoError(t, db.Close())
|
||||
}()
|
||||
require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.walCorruptionsTotal))
|
||||
}
|
||||
|
||||
// Read the wbl content after the repair.
|
||||
{
|
||||
sr, err := wlog.NewSegmentsReader(filepath.Join(dir, "wbl"))
|
||||
require.NoError(t, err)
|
||||
defer sr.Close()
|
||||
r := wlog.NewReader(sr)
|
||||
|
||||
var actRec int
|
||||
for r.Next() {
|
||||
actRec++
|
||||
}
|
||||
require.NoError(t, r.Err())
|
||||
require.Equal(t, expRecs, actRec, "Wrong number of intact records")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHeadReadWriterRepair(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
|
@ -4446,9 +4519,8 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
|
|||
require.Greater(t, offset, 0)
|
||||
}
|
||||
|
||||
// TestOOOWalReplay checks the replay at a low level.
|
||||
// TODO(codesome): Needs test for ooo WAL repair.
|
||||
func TestOOOWalReplay(t *testing.T) {
|
||||
// TestWBLReplay checks the replay at a low level.
|
||||
func TestWBLReplay(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -825,22 +825,14 @@ func (e errLoadWbl) Error() string {
|
|||
return e.err.Error()
|
||||
}
|
||||
|
||||
// To support errors.Cause().
|
||||
func (e errLoadWbl) Cause() error {
|
||||
return e.err
|
||||
}
|
||||
|
||||
// To support errors.Unwrap().
|
||||
func (e errLoadWbl) Unwrap() error {
|
||||
return e.err
|
||||
}
|
||||
|
||||
// isErrLoadOOOWal returns a boolean if the error is errLoadWbl.
|
||||
func isErrLoadOOOWal(err error) bool {
|
||||
_, ok := err.(*errLoadWbl)
|
||||
return ok
|
||||
}
|
||||
|
||||
type wblSubsetProcessor struct {
|
||||
input chan wblSubsetProcessorInputItem
|
||||
output chan []record.RefSample
|
||||
|
|
Loading…
Reference in New Issue