|
|
|
@ -14,6 +14,7 @@
|
|
|
|
|
package wlog
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
|
|
|
|
"io"
|
|
|
|
|
"math"
|
|
|
|
@ -25,7 +26,6 @@ import (
|
|
|
|
|
|
|
|
|
|
"github.com/go-kit/log"
|
|
|
|
|
"github.com/go-kit/log/level"
|
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
|
"golang.org/x/exp/slices"
|
|
|
|
|
|
|
|
|
@ -264,7 +264,7 @@ func (w *Watcher) loop() {
|
|
|
|
|
func (w *Watcher) Run() error {
|
|
|
|
|
_, lastSegment, err := w.firstAndLast()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "wal.Segments")
|
|
|
|
|
return fmt.Errorf("wal.Segments: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// We want to ensure this is false across iterations since
|
|
|
|
@ -275,13 +275,13 @@ func (w *Watcher) Run() error {
|
|
|
|
|
|
|
|
|
|
// Backfill from the checkpoint first if it exists.
|
|
|
|
|
lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir)
|
|
|
|
|
if err != nil && err != record.ErrNotFound {
|
|
|
|
|
return errors.Wrap(err, "tsdb.LastCheckpoint")
|
|
|
|
|
if err != nil && !errors.Is(err, record.ErrNotFound) {
|
|
|
|
|
return fmt.Errorf("tsdb.LastCheckpoint: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err == nil {
|
|
|
|
|
if err = w.readCheckpoint(lastCheckpoint, (*Watcher).readSegment); err != nil {
|
|
|
|
|
return errors.Wrap(err, "readCheckpoint")
|
|
|
|
|
return fmt.Errorf("readCheckpoint: %w", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
w.lastCheckpoint = lastCheckpoint
|
|
|
|
@ -371,7 +371,7 @@ func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, s
|
|
|
|
|
|
|
|
|
|
// Ignore all errors reading to end of segment whilst replaying the WAL.
|
|
|
|
|
if !tail {
|
|
|
|
|
if err != nil && errors.Cause(err) != io.EOF {
|
|
|
|
|
if err != nil && !errors.Is(err, io.EOF) {
|
|
|
|
|
level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err)
|
|
|
|
|
} else if r.Offset() != size {
|
|
|
|
|
level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", r.Offset(), "size", size)
|
|
|
|
@ -380,7 +380,7 @@ func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, s
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Otherwise, when we are tailing, non-EOFs are fatal.
|
|
|
|
|
if errors.Cause(err) != io.EOF {
|
|
|
|
|
if err != nil && !errors.Is(err, io.EOF) {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
@ -403,7 +403,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
|
|
|
|
|
var err error
|
|
|
|
|
size, err = getSegmentSize(w.walDir, segmentNum)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "getSegmentSize")
|
|
|
|
|
return fmt.Errorf("getSegmentSize: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return w.readAndHandleError(reader, segmentNum, tail, size)
|
|
|
|
@ -447,7 +447,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
|
|
|
|
|
case <-segmentTicker.C:
|
|
|
|
|
_, last, err := w.firstAndLast()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "segments")
|
|
|
|
|
return fmt.Errorf("segments: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Check if new segments exists.
|
|
|
|
@ -459,7 +459,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
|
|
|
|
|
// Ignore errors reading to end of segment whilst replaying the WAL.
|
|
|
|
|
if !tail {
|
|
|
|
|
switch {
|
|
|
|
|
case err != nil && errors.Cause(err) != io.EOF:
|
|
|
|
|
case err != nil && !errors.Is(err, io.EOF):
|
|
|
|
|
level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "err", err)
|
|
|
|
|
case reader.Offset() != size:
|
|
|
|
|
level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size)
|
|
|
|
@ -468,7 +468,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Otherwise, when we are tailing, non-EOFs are fatal.
|
|
|
|
|
if errors.Cause(err) != io.EOF {
|
|
|
|
|
if err != nil && !errors.Is(err, io.EOF) {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -497,8 +497,8 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
|
|
|
|
|
|
|
|
|
|
func (w *Watcher) garbageCollectSeries(segmentNum int) error {
|
|
|
|
|
dir, _, err := LastCheckpoint(w.walDir)
|
|
|
|
|
if err != nil && err != record.ErrNotFound {
|
|
|
|
|
return errors.Wrap(err, "tsdb.LastCheckpoint")
|
|
|
|
|
if err != nil && !errors.Is(err, record.ErrNotFound) {
|
|
|
|
|
return fmt.Errorf("tsdb.LastCheckpoint: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if dir == "" || dir == w.lastCheckpoint {
|
|
|
|
@ -508,7 +508,7 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
|
|
|
|
|
|
|
|
|
|
index, err := checkpointNum(dir)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "error parsing checkpoint filename")
|
|
|
|
|
return fmt.Errorf("error parsing checkpoint filename: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if index >= segmentNum {
|
|
|
|
@ -519,7 +519,7 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
|
|
|
|
|
level.Debug(w.logger).Log("msg", "New checkpoint detected", "new", dir, "currentSegment", segmentNum)
|
|
|
|
|
|
|
|
|
|
if err = w.readCheckpoint(dir, (*Watcher).readSegmentForGC); err != nil {
|
|
|
|
|
return errors.Wrap(err, "readCheckpoint")
|
|
|
|
|
return fmt.Errorf("readCheckpoint: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Clear series with a checkpoint or segment index # lower than the checkpoint we just read.
|
|
|
|
@ -658,7 +658,10 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
|
|
|
|
|
w.recordDecodeFailsMetric.Inc()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return errors.Wrapf(r.Err(), "segment %d: %v", segmentNum, r.Err())
|
|
|
|
|
if err := r.Err(); err != nil {
|
|
|
|
|
return fmt.Errorf("segment %d: %w", segmentNum, err)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Go through all series in a segment updating the segmentNum, so we can delete older series.
|
|
|
|
@ -691,7 +694,10 @@ func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error
|
|
|
|
|
w.recordDecodeFailsMetric.Inc()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return errors.Wrapf(r.Err(), "segment %d: %v", segmentNum, r.Err())
|
|
|
|
|
if err := r.Err(); err != nil {
|
|
|
|
|
return fmt.Errorf("segment %d: %w", segmentNum, err)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (w *Watcher) SetStartTime(t time.Time) {
|
|
|
|
@ -706,29 +712,29 @@ func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) err
|
|
|
|
|
level.Debug(w.logger).Log("msg", "Reading checkpoint", "dir", checkpointDir)
|
|
|
|
|
index, err := checkpointNum(checkpointDir)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "checkpointNum")
|
|
|
|
|
return fmt.Errorf("checkpointNum: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Ensure we read the whole contents of every segment in the checkpoint dir.
|
|
|
|
|
segs, err := w.segments(checkpointDir)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "Unable to get segments checkpoint dir")
|
|
|
|
|
return fmt.Errorf("Unable to get segments checkpoint dir: %w", err)
|
|
|
|
|
}
|
|
|
|
|
for _, seg := range segs {
|
|
|
|
|
size, err := getSegmentSize(checkpointDir, seg)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "getSegmentSize")
|
|
|
|
|
return fmt.Errorf("getSegmentSize: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sr, err := OpenReadSegment(SegmentName(checkpointDir, seg))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "unable to open segment")
|
|
|
|
|
return fmt.Errorf("unable to open segment: %w", err)
|
|
|
|
|
}
|
|
|
|
|
defer sr.Close()
|
|
|
|
|
|
|
|
|
|
r := NewLiveReader(w.logger, w.readerMetrics, sr)
|
|
|
|
|
if err := readFn(w, r, index, false); errors.Cause(err) != io.EOF && err != nil {
|
|
|
|
|
return errors.Wrap(err, "readSegment")
|
|
|
|
|
if err := readFn(w, r, index, false); err != nil && !errors.Is(err, io.EOF) {
|
|
|
|
|
return fmt.Errorf("readSegment: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if r.Offset() != size {
|
|
|
|
|