|
|
|
@ -68,7 +68,9 @@ type Watcher struct {
|
|
|
|
|
metrics *WatcherMetrics |
|
|
|
|
readerMetrics *liveReaderMetrics |
|
|
|
|
|
|
|
|
|
StartTime int64 |
|
|
|
|
startTime time.Time |
|
|
|
|
startTimestamp int64 // the start time as a Prometheus timestamp
|
|
|
|
|
sendSamples bool |
|
|
|
|
|
|
|
|
|
recordsReadMetric *prometheus.CounterVec |
|
|
|
|
recordDecodeFailsMetric prometheus.Counter |
|
|
|
@ -191,7 +193,7 @@ func (w *Watcher) loop() {
|
|
|
|
|
|
|
|
|
|
// We may encounter failures processing the WAL; we should wait and retry.
|
|
|
|
|
for !isClosed(w.quit) { |
|
|
|
|
w.StartTime = timestamp.FromTime(time.Now()) |
|
|
|
|
w.SetStartTime(time.Now()) |
|
|
|
|
if err := w.Run(); err != nil { |
|
|
|
|
level.Error(w.logger).Log("msg", "error tailing WAL", "err", err) |
|
|
|
|
} |
|
|
|
@ -212,6 +214,12 @@ func (w *Watcher) Run() error {
|
|
|
|
|
return errors.Wrap(err, "wal.Segments") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// We want to ensure this is false across iterations since
|
|
|
|
|
// Run will be called again if there was a failure to read the WAL.
|
|
|
|
|
w.sendSamples = false |
|
|
|
|
|
|
|
|
|
level.Info(w.logger).Log("msg", "replaying WAL", "queue", w.name) |
|
|
|
|
|
|
|
|
|
// Backfill from the checkpoint first if it exists.
|
|
|
|
|
lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir) |
|
|
|
|
if err != nil && err != record.ErrNotFound { |
|
|
|
@ -456,7 +464,6 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
|
|
|
|
|
samples []record.RefSample |
|
|
|
|
send []record.RefSample |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
for r.Next() && !isClosed(w.quit) { |
|
|
|
|
rec := r.Record() |
|
|
|
|
w.recordsReadMetric.WithLabelValues(recordType(dec.Type(rec))).Inc() |
|
|
|
@ -482,7 +489,12 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
|
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
for _, s := range samples { |
|
|
|
|
if s.T > w.StartTime { |
|
|
|
|
if s.T > w.startTimestamp { |
|
|
|
|
if !w.sendSamples { |
|
|
|
|
w.sendSamples = true |
|
|
|
|
duration := time.Since(w.startTime) |
|
|
|
|
level.Info(w.logger).Log("msg", "done replaying WAL", "duration", duration) |
|
|
|
|
} |
|
|
|
|
send = append(send, s) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -505,6 +517,11 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
|
|
|
|
|
return r.Err() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *Watcher) SetStartTime(t time.Time) { |
|
|
|
|
w.startTime = t |
|
|
|
|
w.startTimestamp = timestamp.FromTime(t) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func recordType(rt record.Type) string { |
|
|
|
|
switch rt { |
|
|
|
|
case record.Invalid: |
|
|
|
|