Always create a new clean segment when starting the WAL. (#608)

* Always create a new clean segment when starting the WAL.
* Ensure we flush the last page after repairing and before recreating the
new segment in Repair.

Signed-off-by: Callum Styan <callumstyan@gmail.com>
pull/5805/head
Callum Styan 6 years ago committed by Brian Brazil
parent d48a5e2d5c
commit 562e93e8e6

@ -14,6 +14,7 @@
package tsdb
import (
"fmt"
"math"
"runtime"
"sort"
@ -492,22 +493,32 @@ func (h *Head) Init(minValidTime int64) error {
startFrom++
}
// Backfill segments from the last checkpoint onwards
sr, err := wal.NewSegmentsRangeReader(wal.SegmentRange{Dir: h.wal.Dir(), First: startFrom, Last: -1})
// Find the last segment.
_, last, err := h.wal.Segments()
if err != nil {
return errors.Wrap(err, "open WAL segments")
return errors.Wrap(err, "finding WAL segments")
}
// Backfill segments from the most recent checkpoint onwards.
for i := startFrom; i <= last; i++ {
s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i))
if err != nil {
return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
}
sr := wal.NewSegmentBufReader(s)
err = h.loadWAL(wal.NewReader(sr))
sr.Close() // Close the reader so that if there was an error the repair can remove the corrupted file under Windows.
if err == nil {
return nil
continue
}
level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err)
h.metrics.walCorruptionsTotal.Inc()
if err := h.wal.Repair(err); err != nil {
return errors.Wrap(err, "repair corrupted WAL")
}
}
return nil
}

@ -228,21 +228,17 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
}
_, j, err := w.Segments()
// Index of the Segment we want to open and write to.
writeSegmentIndex := 0
if err != nil {
return nil, errors.Wrap(err, "get segment range")
}
// Fresh dir, no segments yet.
if j == -1 {
segment, err := CreateSegment(w.dir, 0)
if err != nil {
return nil, err
// If some segments already exist create one with a higher index than the last segment.
if j != -1 {
writeSegmentIndex = j + 1
}
if err := w.setSegment(segment); err != nil {
return nil, err
}
} else {
segment, err := OpenWriteSegment(logger, w.dir, j)
segment, err := CreateSegment(w.dir, writeSegmentIndex)
if err != nil {
return nil, err
}
@ -250,7 +246,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
if err := w.setSegment(segment); err != nil {
return nil, err
}
}
go w.run()
return w, nil
@ -363,6 +359,9 @@ func (w *WAL) Repair(origErr error) error {
}
// We expect an error here from r.Err(), so nothing to handle.
// We need to pad to the end of the last page in the repaired segment
w.flushPage(true)
// We explicitly close even when there is a defer for Windows to be
// able to delete it. The defer is in place to close it in-case there
// are errors above.
@ -372,6 +371,20 @@ func (w *WAL) Repair(origErr error) error {
if err := os.Remove(tmpfn); err != nil {
return errors.Wrap(err, "delete corrupted segment")
}
// Explicitly close the the segment we just repaired to avoid issues with Windows.
s.Close()
// We always want to start writing to a new Segment rather than an existing
// Segment, which is handled by NewSize, but earlier in Repair we're deleting
// all segments that come after the corrupted Segment. Recreate a new Segment here.
s, err = CreateSegment(w.dir, cerr.Segment+1)
if err != nil {
return err
}
if err := w.setSegment(s); err != nil {
return err
}
return nil
}
@ -710,7 +723,7 @@ func NewSegmentsRangeReader(sr ...SegmentRange) (io.ReadCloser, error) {
segs = append(segs, s)
}
}
return newSegmentBufReader(segs...), nil
return NewSegmentBufReader(segs...), nil
}
// segmentBufReader is a buffered reader that reads in multiples of pages.
@ -725,7 +738,7 @@ type segmentBufReader struct {
off int // Offset of read data into current segment.
}
func newSegmentBufReader(segs ...*Segment) *segmentBufReader {
func NewSegmentBufReader(segs ...*Segment) *segmentBufReader {
return &segmentBufReader{
buf: bufio.NewReaderSize(segs[0], 16*pageSize),
segs: segs,

@ -143,20 +143,35 @@ func TestWAL_Repair(t *testing.T) {
testutil.Ok(t, err)
defer w.Close()
sr, err := NewSegmentsReader(dir)
first, last, err := w.Segments()
testutil.Ok(t, err)
// Backfill segments from the most recent checkpoint onwards.
for i := first; i <= last; i++ {
s, err := OpenReadSegment(SegmentName(w.Dir(), i))
testutil.Ok(t, err)
r := NewReader(sr)
sr := NewSegmentBufReader(s)
testutil.Ok(t, err)
r := NewReader(sr)
for r.Next() {
}
testutil.NotOk(t, r.Err())
testutil.Ok(t, sr.Close())
//Close the segment so we don't break things on Windows.
s.Close()
// No corruption in this segment.
if r.Err() == nil {
continue
}
testutil.Ok(t, w.Repair(r.Err()))
sr, err = NewSegmentsReader(dir)
break
}
sr, err := NewSegmentsReader(dir)
testutil.Ok(t, err)
defer sr.Close()
r = NewReader(sr)
r := NewReader(sr)
var result [][]byte
for r.Next() {
@ -172,10 +187,13 @@ func TestWAL_Repair(t *testing.T) {
}
}
// Make sure the last segment is the corrupt segment.
_, last, err := w.Segments()
// Make sure there is a new 0 size Segment after the corrupted Segment.
_, last, err = w.Segments()
testutil.Ok(t, err)
testutil.Equals(t, test.corrSgm+1, last)
fi, err := os.Stat(SegmentName(dir, last))
testutil.Ok(t, err)
testutil.Equals(t, test.corrSgm, last)
testutil.Equals(t, int64(0), fi.Size())
})
}
}
@ -276,6 +294,10 @@ func TestCorruptAndCarryOn(t *testing.T) {
err = w.Repair(corruptionErr)
testutil.Ok(t, err)
// Ensure that we have a completely clean slate after reapiring.
testutil.Equals(t, w.segment.Index(), 1) // We corrupted segment 0.
testutil.Equals(t, w.donePages, 0)
for i := 0; i < 5; i++ {
buf := make([]byte, recordSize)
_, err := rand.Read(buf)

Loading…
Cancel
Save