Browse Source

Remove some 'global' state, moving segment numbers to parameters.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
pull/5289/head
Tom Wilkie 6 years ago committed by Tom Wilkie
parent
commit
859cda27ff
  1. 81
      storage/remote/wal_watcher.go

81
storage/remote/wal_watcher.go

@ -139,8 +139,6 @@ type WALWatcher struct {
logger log.Logger
walDir string
currentSegment int
lastCheckpoint string
startTime int64
samplesReadMetric prometheus.Counter
@ -153,6 +151,7 @@ type WALWatcher struct {
currentSegmentMetric prometheus.Gauge
quit chan struct{}
done chan struct{}
}
// NewWALWatcher creates a new WAL watcher for a given WriteTo.
@ -167,6 +166,7 @@ func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string
startTime: startTime,
name: name,
quit: make(chan struct{}),
done: make(chan struct{}),
}
w.samplesReadMetric = watcherSamplesRecordsRead.WithLabelValues(w.name)
@ -189,11 +189,14 @@ func (w *WALWatcher) Start() {
// Stop signals the watcher to shut down and waits for it to finish.
// It logs the request, closes w.quit, then blocks until w.done is closed
// (loop closes w.done via defer when it returns) before logging completion.
// NOTE(review): there is no guard against a second call; closing an
// already-closed channel panics in Go.
// NOTE(review): presumably Start is what launches loop; if loop is not
// running, the receive on w.done never returns. Confirm against Start.
func (w *WALWatcher) Stop() {
level.Info(w.logger).Log("msg", "stopping WAL watcher", "queue", w.name)
// Signal shutdown; readSegment checks this channel via isClosed(w.quit).
close(w.quit)
// Block until loop has returned and closed w.done.
<-w.done
level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name)
}
func (w *WALWatcher) loop() {
// We may encounter failures processing the WAL; we should wait and retry.
defer close(w.done)
// We may encounter failures processing the WAL; we should wait and retry.
for {
if err := w.run(); err != nil {
level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
@ -213,39 +216,42 @@ func (w *WALWatcher) run() error {
return errors.Wrap(err, "wal.New")
}
_, last, err := nw.Segments()
if err != nil {
return err
}
// Backfill from the checkpoint first if it exists.
var nextIndex int
w.lastCheckpoint, nextIndex, err = tsdb.LastCheckpoint(w.walDir)
lastCheckpoint, nextIndex, err := tsdb.LastCheckpoint(w.walDir)
if err != nil && err != tsdb.ErrNotFound {
return err
}
level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", w.lastCheckpoint, "startFrom", nextIndex)
level.Info(w.logger).Log("msg", "reading checkpoint", "dir", lastCheckpoint, "startFrom", nextIndex)
if err == nil {
if err = w.readCheckpoint(w.lastCheckpoint); err != nil {
if err = w.readCheckpoint(lastCheckpoint); err != nil {
return err
}
}
w.currentSegment, err = w.findSegmentForIndex(nextIndex)
currentSegment, err := w.findSegmentForIndex(nextIndex)
if err != nil {
return err
}
level.Debug(w.logger).Log("msg", "starting from", "currentSegment", w.currentSegment)
level.Info(w.logger).Log("msg", "starting from", "currentSegment", currentSegment, "last", last)
for {
w.currentSegmentMetric.Set(float64(w.currentSegment))
level.Info(w.logger).Log("msg", "process segment", "segment", w.currentSegment)
w.currentSegmentMetric.Set(float64(currentSegment))
level.Info(w.logger).Log("msg", "process segment", "segment", currentSegment)
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
if err := w.watch(nw, w.currentSegment, true); err != nil {
if err := w.watch(nw, currentSegment, currentSegment >= last); err != nil {
level.Error(w.logger).Log("msg", "runWatcher is ending", "err", err)
return err
}
w.currentSegment++
currentSegment++
}
}
@ -307,9 +313,9 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error {
segmentTicker.Stop()
checkpointTicker.Stop()
var err error
size, err = getSegmentSize(w.walDir, w.currentSegment)
size, err = getSegmentSize(w.walDir, segmentNum)
if err != nil {
level.Error(w.logger).Log("msg", "error getting segment size", "segment", w.currentSegment)
level.Error(w.logger).Log("msg", "error getting segment size", "segment", segmentNum)
return errors.Wrap(err, "get segment size")
}
}
@ -331,31 +337,25 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error {
continue
}
if dir == w.lastCheckpoint {
continue
}
level.Info(w.logger).Log("msg", "new checkpoint detected", "last", w.lastCheckpoint, "new", dir)
d, err := checkpointNum(dir)
index, err := checkpointNum(dir)
if err != nil {
level.Error(w.logger).Log("msg", "error parsing checkpoint", "err", err)
continue
}
if d >= w.currentSegment {
level.Info(w.logger).Log("msg", "current segment is behind the checkpoint, skipping reading of checkpoint", "current", fmt.Sprintf("%08d", w.currentSegment), "checkpoint", dir)
if index >= segmentNum {
level.Info(w.logger).Log("msg", "current segment is behind the checkpoint, skipping reading of checkpoint", "current", fmt.Sprintf("%08d", segmentNum), "checkpoint", dir)
continue
}
w.lastCheckpoint = dir
level.Info(w.logger).Log("msg", "new checkpoint detected", "new", dir, "currentSegment", segmentNum)
// This potentially takes a long time; should we run it in another goroutine?
err = w.readCheckpoint(w.lastCheckpoint)
err = w.readCheckpoint(dir)
if err != nil {
level.Error(w.logger).Log("err", err)
}
// Clear series with a checkpoint or segment index # lower than the checkpoint we just read.
w.writer.SeriesReset(d)
w.writer.SeriesReset(index)
case <-segmentTicker.C:
_, last, err := wl.Segments()
@ -364,35 +364,35 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error {
}
// Check if new segments exists.
if last <= w.currentSegment {
if last <= segmentNum {
continue
}
if err := w.readSegment(reader); err != nil {
if err := w.readSegment(reader, segmentNum); err != nil {
// Ignore errors reading to end of segment, as we're going to move to
// next segment now.
level.Error(w.logger).Log("msg", "error reading to end of segment", "err", err)
}
level.Info(w.logger).Log("msg", "a new segment exists, we should start reading it", "current", fmt.Sprintf("%08d", w.currentSegment), "new", fmt.Sprintf("%08d", last))
level.Info(w.logger).Log("msg", "a new segment exists, we should start reading it", "current", fmt.Sprintf("%08d", segmentNum), "new", fmt.Sprintf("%08d", last))
return nil
case <-readTicker.C:
if err := w.readSegment(reader); err != nil && err != io.EOF {
if err := w.readSegment(reader, segmentNum); err != nil && err != io.EOF {
level.Error(w.logger).Log("err", err)
return err
}
if reader.TotalRead() >= size && !tail {
level.Info(w.logger).Log("msg", "done replaying segment", "segment", w.currentSegment, "size", size, "read", reader.TotalRead())
level.Info(w.logger).Log("msg", "done replaying segment", "segment", segmentNum, "size", size, "read", reader.TotalRead())
return nil
}
}
}
}
func (w *WALWatcher) readSegment(r *wal.LiveReader) error {
func (w *WALWatcher) readSegment(r *wal.LiveReader, segmentNum int) error {
for r.Next() && !isClosed(w.quit) {
err := w.decodeRecord(r.Record())
err := w.decodeRecord(r.Record(), segmentNum)
// Intentionally skip over record decode errors.
if err != nil {
@ -402,7 +402,7 @@ func (w *WALWatcher) readSegment(r *wal.LiveReader) error {
return r.Err()
}
func (w *WALWatcher) decodeRecord(rec []byte) error {
func (w *WALWatcher) decodeRecord(rec []byte, segmentNum int) error {
var (
dec tsdb.RecordDecoder
series []tsdb.RefSeries
@ -416,7 +416,7 @@ func (w *WALWatcher) decodeRecord(rec []byte) error {
return err
}
w.seriesReadMetric.Add(float64(len(series)))
w.writer.StoreSeries(series, w.currentSegment)
w.writer.StoreSeries(series, segmentNum)
case tsdb.RecordSamples:
samples, err := dec.Samples(rec, samples[:0])
@ -455,6 +455,11 @@ func (w *WALWatcher) decodeRecord(rec []byte) error {
// Read all the series records from a Checkpoint directory.
func (w *WALWatcher) readCheckpoint(checkpointDir string) error {
level.Info(w.logger).Log("msg", "reading checkpoint", "dir", checkpointDir)
index, err := checkpointNum(checkpointDir)
if err != nil {
return err
}
sr, err := wal.NewSegmentsReader(checkpointDir)
if err != nil {
return errors.Wrap(err, "open checkpoint")
@ -469,7 +474,7 @@ func (w *WALWatcher) readCheckpoint(checkpointDir string) error {
// w.readSeriesRecords(wal.NewLiveReader(sr), i, size)
r := wal.NewLiveReader(sr)
if err := w.readSegment(r); err != nil {
if err := w.readSegment(r, index); err != nil {
return errors.Wrap(err, "readSegment")
}

Loading…
Cancel
Save