|
|
|
@ -138,7 +138,6 @@ func (w *WALWatcher) Start() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (w *WALWatcher) Stop() {
|
|
|
|
|
level.Info(w.logger).Log("msg", "stopping WAL watcher", "queue", w.name)
|
|
|
|
|
close(w.quit)
|
|
|
|
|
<-w.done
|
|
|
|
|
level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name)
|
|
|
|
@ -170,13 +169,13 @@ func (w *WALWatcher) run() error {
|
|
|
|
|
|
|
|
|
|
_, last, err := nw.Segments()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
return errors.Wrap(err, "wal.Segments")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Backfill from the checkpoint first if it exists.
|
|
|
|
|
lastCheckpoint, nextIndex, err := tsdb.LastCheckpoint(w.walDir)
|
|
|
|
|
if err != nil && err != tsdb.ErrNotFound {
|
|
|
|
|
return err
|
|
|
|
|
return errors.Wrap(err, "tsdb.LastCheckpoint")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err == nil {
|
|
|
|
@ -194,7 +193,6 @@ func (w *WALWatcher) run() error {
|
|
|
|
|
level.Info(w.logger).Log("msg", "tailing WAL", "lastCheckpoint", lastCheckpoint, "startFrom", nextIndex, "currentSegment", currentSegment, "last", last)
|
|
|
|
|
for !isClosed(w.quit) {
|
|
|
|
|
w.currentSegmentMetric.Set(float64(currentSegment))
|
|
|
|
|
level.Info(w.logger).Log("msg", "process segment", "segment", currentSegment)
|
|
|
|
|
|
|
|
|
|
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
|
|
|
|
|
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
|
|
|
|
@ -269,8 +267,7 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error {
|
|
|
|
|
var err error
|
|
|
|
|
size, err = getSegmentSize(w.walDir, segmentNum)
|
|
|
|
|
if err != nil {
|
|
|
|
|
level.Error(w.logger).Log("msg", "error getting segment size", "segment", segmentNum)
|
|
|
|
|
return errors.Wrap(err, "get segment size")
|
|
|
|
|
return errors.Wrap(err, "getSegmentSize")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -446,25 +443,23 @@ func (w *WALWatcher) decodeRecord(rec []byte, segmentNum int) error {
|
|
|
|
|
|
|
|
|
|
// Read all the series records from a Checkpoint directory.
|
|
|
|
|
func (w *WALWatcher) readCheckpoint(checkpointDir string) error {
|
|
|
|
|
level.Info(w.logger).Log("msg", "reading checkpoint", "dir", checkpointDir)
|
|
|
|
|
level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", checkpointDir)
|
|
|
|
|
index, err := checkpointNum(checkpointDir)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
return errors.Wrap(err, "checkpointNum")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sr, err := wal.NewSegmentsReader(checkpointDir)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "open checkpoint")
|
|
|
|
|
return errors.Wrap(err, "NewSegmentsReader")
|
|
|
|
|
}
|
|
|
|
|
defer sr.Close()
|
|
|
|
|
|
|
|
|
|
size, err := getCheckpointSize(checkpointDir)
|
|
|
|
|
if err != nil {
|
|
|
|
|
level.Error(w.logger).Log("msg", "error getting checkpoint size", "checkpoint", checkpointDir)
|
|
|
|
|
return errors.Wrap(err, "get checkpoint size")
|
|
|
|
|
return errors.Wrap(err, "getCheckpointSize")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// w.readSeriesRecords(wal.NewLiveReader(sr), i, size)
|
|
|
|
|
r := wal.NewLiveReader(w.logger, sr)
|
|
|
|
|
if err := w.readSegment(r, index); err != io.EOF {
|
|
|
|
|
return errors.Wrap(err, "readSegment")
|
|
|
|
|