diff --git a/tsdb/wal/watcher.go b/tsdb/wal/watcher.go index 11c9bfddc..75403cc4b 100644 --- a/tsdb/wal/watcher.go +++ b/tsdb/wal/watcher.go @@ -68,7 +68,8 @@ type Watcher struct { metrics *WatcherMetrics readerMetrics *liveReaderMetrics - StartTime int64 + StartTime int64 + lastSegment int recordsReadMetric *prometheus.CounterVec recordDecodeFailsMetric prometheus.Counter @@ -211,6 +212,9 @@ func (w *Watcher) Run() error { if err != nil { return errors.Wrap(err, "wal.Segments") } + w.lastSegment = lastSegment + + level.Info(w.logger).Log("msg", "replaying WAL", "queue", w.name) // Backfill from the checkpoint first if it exists. lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir) @@ -237,7 +241,7 @@ func (w *Watcher) Run() error { // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. - if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil { + if err := w.watch(currentSegment, currentSegment >= w.lastSegment); err != nil { return err } @@ -451,10 +455,11 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error { func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { var ( - dec record.Decoder - series []record.RefSeries - samples []record.RefSample - send []record.RefSample + dec record.Decoder + series []record.RefSeries + samples []record.RefSample + send []record.RefSample + sentSamples bool ) for r.Next() && !isClosed(w.quit) { @@ -483,6 +488,10 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { } for _, s := range samples { if s.T > w.StartTime { + if !sentSamples && segmentNum == w.lastSegment { + sentSamples = true + level.Info(w.logger).Log("msg", "done replaying WAL") + } send = append(send, s) } }