From 859cda27fff7c314db751f6f5f9ffaeea44122f9 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 13 Feb 2019 17:06:03 +0000 Subject: [PATCH] Remove some 'global' state, moving segment numbers to parameters. Signed-off-by: Tom Wilkie --- storage/remote/wal_watcher.go | 83 +++++++++++++++++++---------------- 1 file changed, 44 insertions(+), 39 deletions(-) diff --git a/storage/remote/wal_watcher.go b/storage/remote/wal_watcher.go index 01bd6e451..638ee2d32 100644 --- a/storage/remote/wal_watcher.go +++ b/storage/remote/wal_watcher.go @@ -139,9 +139,7 @@ type WALWatcher struct { logger log.Logger walDir string - currentSegment int - lastCheckpoint string - startTime int64 + startTime int64 samplesReadMetric prometheus.Counter seriesReadMetric prometheus.Counter @@ -153,6 +151,7 @@ type WALWatcher struct { currentSegmentMetric prometheus.Gauge quit chan struct{} + done chan struct{} } // NewWALWatcher creates a new WAL watcher for a given WriteTo. @@ -167,6 +166,7 @@ func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string startTime: startTime, name: name, quit: make(chan struct{}), + done: make(chan struct{}), } w.samplesReadMetric = watcherSamplesRecordsRead.WithLabelValues(w.name) @@ -189,11 +189,14 @@ func (w *WALWatcher) Start() { func (w *WALWatcher) Stop() { level.Info(w.logger).Log("msg", "stopping WAL watcher", "queue", w.name) close(w.quit) + <-w.done + level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name) } func (w *WALWatcher) loop() { - // We may encourter failures processing the WAL; we should wait and retry. + defer close(w.done) + // We may encounter failures processing the WAL; we should wait and retry. 
for { if err := w.run(); err != nil { level.Error(w.logger).Log("msg", "error tailing WAL", "err", err) @@ -213,39 +216,42 @@ func (w *WALWatcher) run() error { return errors.Wrap(err, "wal.New") } + _, last, err := nw.Segments() + if err != nil { + return err + } + // Backfill from the checkpoint first if it exists. - var nextIndex int - w.lastCheckpoint, nextIndex, err = tsdb.LastCheckpoint(w.walDir) + lastCheckpoint, nextIndex, err := tsdb.LastCheckpoint(w.walDir) if err != nil && err != tsdb.ErrNotFound { return err } - level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", w.lastCheckpoint, "startFrom", nextIndex) + level.Info(w.logger).Log("msg", "reading checkpoint", "dir", lastCheckpoint, "startFrom", nextIndex) if err == nil { - if err = w.readCheckpoint(w.lastCheckpoint); err != nil { + if err = w.readCheckpoint(lastCheckpoint); err != nil { return err } } - w.currentSegment, err = w.findSegmentForIndex(nextIndex) + currentSegment, err := w.findSegmentForIndex(nextIndex) if err != nil { return err } - level.Debug(w.logger).Log("msg", "starting from", "currentSegment", w.currentSegment) - + level.Info(w.logger).Log("msg", "starting from", "currentSegment", currentSegment, "last", last) for { - w.currentSegmentMetric.Set(float64(w.currentSegment)) - level.Info(w.logger).Log("msg", "process segment", "segment", w.currentSegment) + w.currentSegmentMetric.Set(float64(currentSegment)) + level.Info(w.logger).Log("msg", "process segment", "segment", currentSegment) // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. 
- if err := w.watch(nw, w.currentSegment, true); err != nil { + if err := w.watch(nw, currentSegment, currentSegment >= last); err != nil { level.Error(w.logger).Log("msg", "runWatcher is ending", "err", err) return err } - w.currentSegment++ + currentSegment++ } } @@ -307,9 +313,9 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error { segmentTicker.Stop() checkpointTicker.Stop() var err error - size, err = getSegmentSize(w.walDir, w.currentSegment) + size, err = getSegmentSize(w.walDir, segmentNum) if err != nil { - level.Error(w.logger).Log("msg", "error getting segment size", "segment", w.currentSegment) + level.Error(w.logger).Log("msg", "error getting segment size", "segment", segmentNum) return errors.Wrap(err, "get segment size") } } @@ -331,31 +337,25 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error { continue } - if dir == w.lastCheckpoint { - continue - } - - level.Info(w.logger).Log("msg", "new checkpoint detected", "last", w.lastCheckpoint, "new", dir) - - d, err := checkpointNum(dir) + index, err := checkpointNum(dir) if err != nil { level.Error(w.logger).Log("msg", "error parsing checkpoint", "err", err) continue } - if d >= w.currentSegment { - level.Info(w.logger).Log("msg", "current segment is behind the checkpoint, skipping reading of checkpoint", "current", fmt.Sprintf("%08d", w.currentSegment), "checkpoint", dir) + if index >= segmentNum { + level.Info(w.logger).Log("msg", "current segment is behind the checkpoint, skipping reading of checkpoint", "current", fmt.Sprintf("%08d", segmentNum), "checkpoint", dir) continue } - w.lastCheckpoint = dir + level.Info(w.logger).Log("msg", "new checkpoint detected", "new", dir, "currentSegment", segmentNum) // This potentially takes a long time, should we run it in another go routine? 
- err = w.readCheckpoint(w.lastCheckpoint) + err = w.readCheckpoint(dir) if err != nil { level.Error(w.logger).Log("err", err) } // Clear series with a checkpoint or segment index # lower than the checkpoint we just read. - w.writer.SeriesReset(d) + w.writer.SeriesReset(index) case <-segmentTicker.C: _, last, err := wl.Segments() @@ -364,35 +364,35 @@ func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error { } // Check if new segments exists. - if last <= w.currentSegment { + if last <= segmentNum { continue } - if err := w.readSegment(reader); err != nil { + if err := w.readSegment(reader, segmentNum); err != nil { // Ignore errors reading to end of segment, as we're going to move to // next segment now. level.Error(w.logger).Log("msg", "error reading to end of segment", "err", err) } - level.Info(w.logger).Log("msg", "a new segment exists, we should start reading it", "current", fmt.Sprintf("%08d", w.currentSegment), "new", fmt.Sprintf("%08d", last)) + level.Info(w.logger).Log("msg", "a new segment exists, we should start reading it", "current", fmt.Sprintf("%08d", segmentNum), "new", fmt.Sprintf("%08d", last)) return nil case <-readTicker.C: - if err := w.readSegment(reader); err != nil && err != io.EOF { + if err := w.readSegment(reader, segmentNum); err != nil && err != io.EOF { level.Error(w.logger).Log("err", err) return err } if reader.TotalRead() >= size && !tail { - level.Info(w.logger).Log("msg", "done replaying segment", "segment", w.currentSegment, "size", size, "read", reader.TotalRead()) + level.Info(w.logger).Log("msg", "done replaying segment", "segment", segmentNum, "size", size, "read", reader.TotalRead()) return nil } } } } -func (w *WALWatcher) readSegment(r *wal.LiveReader) error { +func (w *WALWatcher) readSegment(r *wal.LiveReader, segmentNum int) error { for r.Next() && !isClosed(w.quit) { - err := w.decodeRecord(r.Record()) + err := w.decodeRecord(r.Record(), segmentNum) // Intentionally skip over record decode errors. 
if err != nil { @@ -402,7 +402,7 @@ func (w *WALWatcher) readSegment(r *wal.LiveReader) error { return r.Err() } -func (w *WALWatcher) decodeRecord(rec []byte) error { +func (w *WALWatcher) decodeRecord(rec []byte, segmentNum int) error { var ( dec tsdb.RecordDecoder series []tsdb.RefSeries @@ -416,7 +416,7 @@ func (w *WALWatcher) decodeRecord(rec []byte) error { return err } w.seriesReadMetric.Add(float64(len(series))) - w.writer.StoreSeries(series, w.currentSegment) + w.writer.StoreSeries(series, segmentNum) case tsdb.RecordSamples: samples, err := dec.Samples(rec, samples[:0]) @@ -455,6 +455,11 @@ func (w *WALWatcher) decodeRecord(rec []byte) error { // Read all the series records from a Checkpoint directory. func (w *WALWatcher) readCheckpoint(checkpointDir string) error { level.Info(w.logger).Log("msg", "reading checkpoint", "dir", checkpointDir) + index, err := checkpointNum(checkpointDir) + if err != nil { + return err + } + sr, err := wal.NewSegmentsReader(checkpointDir) if err != nil { return errors.Wrap(err, "open checkpoint") @@ -469,7 +474,7 @@ func (w *WALWatcher) readCheckpoint(checkpointDir string) error { // w.readSeriesRecords(wal.NewLiveReader(sr), i, size) r := wal.NewLiveReader(sr) - if err := w.readSegment(r); err != nil { + if err := w.readSegment(r, index); err != nil { return errors.Wrap(err, "readSegment") }