From 37e35f9e0cb3ea4452fec8dcf5f26cdedcc16b4c Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Fri, 18 Jan 2019 12:31:36 -0800 Subject: [PATCH] Various improvements to WAL based remote write. - Use the queue name in WAL watcher logging. - Don't return from watch if the reader error was EOF. - Fix sample timestamp check logic regarding what samples we send. - Refactor so we don't need readToEnd/readSeriesRecords - Fix wal_watcher tests since readToEnd no longer exists Signed-off-by: Callum Styan --- storage/remote/queue_manager.go | 1 + storage/remote/wal_watcher.go | 233 +++++++++++------------------ storage/remote/wal_watcher_test.go | 146 ++++++++++++++---- 3 files changed, 202 insertions(+), 178 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index a81442112..e91c6e86d 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -268,6 +268,7 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, high sentBatchDuration.WithLabelValues(t.queueName) succeededSamplesTotal.WithLabelValues(t.queueName) failedSamplesTotal.WithLabelValues(t.queueName) + droppedSamplesTotal.WithLabelValues(t.queueName) retriedSamplesTotal.WithLabelValues(t.queueName) // Reset pending samples metric to 0. t.pendingSamplesMetric.Set(0) diff --git a/storage/remote/wal_watcher.go b/storage/remote/wal_watcher.go index 5faf843d0..0568fc14b 100644 --- a/storage/remote/wal_watcher.go +++ b/storage/remote/wal_watcher.go @@ -15,6 +15,8 @@ package remote import ( "fmt" + "io" + "math" "os" "path" "strconv" @@ -162,17 +164,18 @@ func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string writer: writer, walDir: path.Join(walDir, "wal"), startTime: startTime, + name: name, quit: make(chan struct{}), } - w.samplesReadMetric = watcherSamplesRecordsRead.WithLabelValues(name) - w.seriesReadMetric = watcherSeriesRecordsRead.WithLabelValues(name) - w.tombstonesReadMetric = watcherTombstoneRecordsRead.WithLabelValues(name) - w.unknownReadMetric = watcherUnknownTypeRecordsRead.WithLabelValues(name) - w.invalidReadMetric = watcherInvalidRecordsRead.WithLabelValues(name) - w.recordDecodeFailsMetric = watcherRecordDecodeFails.WithLabelValues(name) - w.samplesSentPreTailing = watcherSamplesSentPreTailing.WithLabelValues(name) - w.currentSegmentMetric = watcherCurrentSegment.WithLabelValues(name) + w.samplesReadMetric = watcherSamplesRecordsRead.WithLabelValues(w.name) + w.seriesReadMetric = watcherSeriesRecordsRead.WithLabelValues(w.name) + w.tombstonesReadMetric = watcherTombstoneRecordsRead.WithLabelValues(w.name) + w.unknownReadMetric = watcherUnknownTypeRecordsRead.WithLabelValues(w.name) + w.invalidReadMetric = watcherInvalidRecordsRead.WithLabelValues(w.name) + w.recordDecodeFailsMetric = watcherRecordDecodeFails.WithLabelValues(w.name) + w.samplesSentPreTailing = watcherSamplesSentPreTailing.WithLabelValues(w.name) + w.currentSegmentMetric = watcherCurrentSegment.WithLabelValues(w.name) return w } @@ -214,23 +217,46 @@ func (w *WALWatcher) runWatcher() { return } - // Read series records in the current WAL and latest checkpoint, get the segment pointer back. - // TODO: callum, handle maintaining the WAL pointer somehow across apply configs? - segment, reader, err := w.readToEnd(w.walDir, first, last) + // Backfill from the checkpoint first if it exists. 
+ dir, _, err := tsdb.LastCheckpoint(w.walDir) + if err != nil && err != tsdb.ErrNotFound { + level.Error(w.logger).Log("msg", "error looking for existing checkpoint, some samples may be dropped", "err", errors.Wrap(err, "find last checkpoint")) + } + + level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", dir) + if err == nil { + w.lastCheckpoint = dir + err = w.readCheckpoint(dir) + if err != nil { + level.Error(w.logger).Log("msg", "error reading existing checkpoint, some samples may be dropped", "err", err) + } + } + + w.currentSegment = first + w.currentSegmentMetric.Set(float64(w.currentSegment)) + segment, err := wal.OpenReadSegment(wal.SegmentName(w.walDir, w.currentSegment)) + // TODO: callum, is this error really fatal? if err != nil { level.Error(w.logger).Log("err", err) return } - - w.currentSegment = last - w.currentSegmentMetric.Set(float64(w.currentSegment)) + reader := wal.NewLiveReader(segment) + tail := false for { - level.Info(w.logger).Log("msg", "watching segment", "segment", w.currentSegment) + // If we've replayed the existing WAL, start tailing. + if w.currentSegment == last { + tail = true + } + if tail { + level.Info(w.logger).Log("msg", "watching segment", "segment", w.currentSegment) + } else { + level.Info(w.logger).Log("msg", "replaying segment", "segment", w.currentSegment) + } // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. - err := w.watch(nw, reader) + err := w.watch(nw, reader, tail) segment.Close() if err != nil { level.Error(w.logger).Log("msg", "runWatcher is ending", "err", err) @@ -241,126 +267,19 @@ func (w *WALWatcher) runWatcher() { w.currentSegmentMetric.Set(float64(w.currentSegment)) segment, err = wal.OpenReadSegment(wal.SegmentName(w.walDir, w.currentSegment)) - reader = wal.NewLiveReader(segment) // TODO: callum, is this error really fatal? if err != nil { level.Error(w.logger).Log("err", err) return } + reader = wal.NewLiveReader(segment) } } -// When starting the WAL watcher, there is potentially an existing WAL. In that case, we -// should read to the end of the newest existing segment before reading new records that -// are written to it, storing data from series records along the way. -// Unfortunately this function is duplicates some of TSDB Head.Init(). -func (w *WALWatcher) readToEnd(walDir string, firstSegment, lastSegment int) (*wal.Segment, *wal.LiveReader, error) { - // Backfill from the checkpoint first if it exists. - defer level.Debug(w.logger).Log("msg", "done reading existing WAL") - dir, startFrom, err := tsdb.LastCheckpoint(walDir) - if err != nil && err != tsdb.ErrNotFound { - return nil, nil, errors.Wrap(err, "find last checkpoint") - } - - level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", dir) - if err == nil { - w.lastCheckpoint = dir - err = w.readCheckpoint(dir) - if err != nil { - return nil, nil, err - } - startFrom++ - } - - // Backfill segments from the last checkpoint onwards if at least 2 segments exist. 
-	if lastSegment > 0 {
-		for i := firstSegment; i < lastSegment; i++ {
-			seg, err := wal.OpenReadSegment(wal.SegmentName(walDir, i))
-			if err != nil {
-				return nil, nil, err
-			}
-			sz, _ := getSegmentSize(walDir, i)
-			w.readSeriesRecords(wal.NewLiveReader(seg), i, sz)
-		}
-	}
-
-	// We want to start the WAL Watcher from the end of the last segment on start,
-	// so we make sure to return the wal.Segment pointer
-	segment, err := wal.OpenReadSegment(wal.SegmentName(w.walDir, lastSegment))
-	if err != nil {
-		return nil, nil, err
-	}
-
-	r := wal.NewLiveReader(segment)
-	sz, _ := getSegmentSize(walDir, lastSegment)
-	w.readSeriesRecords(r, lastSegment, sz)
-	return segment, r, nil
-}
-
-// TODO: fix the exit logic for this function
-// The stop param is used to stop at the end of the existing WAL on startup,
-// since scraped samples may be written to the latest segment before we finish reading it.
-func (w *WALWatcher) readSeriesRecords(r *wal.LiveReader, index int, stop int64) {
-	var (
-		dec     tsdb.RecordDecoder
-		series  []tsdb.RefSeries
-		samples []tsdb.RefSample
-		ret     bool
-	)
-
-	for r.Next() && !isClosed(w.quit) {
-		series = series[:0]
-		rec := r.Record()
-		// If the timestamp is > start then we should Append this sample and exit readSeriesRecords,
-		// because this is the first sample written to the WAL after the WAL watcher was started.
-		typ := dec.Type(rec)
-		if typ == tsdb.RecordSamples {
-			samples, err := dec.Samples(rec, samples[:0])
-			if err != nil {
-				continue
-			}
-			for _, s := range samples {
-				if s.T > w.startTime {
-					w.writer.Append(samples)
-					ret = true
-					w.samplesSentPreTailing.Inc()
-				}
-			}
-			if ret {
-				level.Info(w.logger).Log("msg", "found a sample with a timestamp after the WAL watcher start")
-				level.Info(w.logger).Log("msg", "read all series records in segment/checkpoint", "index", index)
-				return
-			}
-		}
-		if typ != tsdb.RecordSeries {
-			continue
-		}
-
-		series, err := dec.Series(rec, nil)
-		if err != nil {
-			level.Error(log.With(w.logger)).Log("err", err)
-			break
-		}
-
-		w.writer.StoreSeries(series, index)
-	}
-
-	// Since we only call readSeriesRecords on fully written WAL segments or checkpoints,
-	// Error() will only return an error if something actually went wrong when reading
-	// a record, either it was invalid or it was only partially written to the WAL.
-	if err := r.Err(); err != nil {
-		level.Error(w.logger).Log("err", err)
-		return
-	}
-
-	// Ensure we read all of the bytes in the segment or checkpoint.
-	if r.TotalRead() >= stop {
-		level.Info(w.logger).Log("msg", "read all series records in segment/checkpoint", "index", index)
-		return
-	}
-}
-
-func (w *WALWatcher) watch(wl *wal.WAL, reader *wal.LiveReader) error {
+// Use tail true to indicate that the reader is currently on a segment that is
+// actively being written to. If false, assume it's a full segment and we're
+// replaying it on start to cache the series records.
+func (w *WALWatcher) watch(wl *wal.WAL, reader *wal.LiveReader, tail bool) error {
 	readTicker := time.NewTicker(readPeriod)
 	defer readTicker.Stop()
 
@@ -370,6 +289,19 @@ func (w *WALWatcher) watch(wl *wal.WAL, reader *wal.LiveReader) error {
 	segmentTicker := time.NewTicker(segmentCheckPeriod)
 	defer segmentTicker.Stop()
 
+	// If we're replaying the segment we need to know the size of the file to know
+	// when to return from watch and move on to the next segment.
+	size := int64(math.MaxInt64)
+	if !tail {
+		segmentTicker.Stop()
+		checkpointTicker.Stop()
+		var err error
+		size, err = getSegmentSize(w.walDir, w.currentSegment)
+		if err != nil {
+			level.Error(w.logger).Log("msg", "error getting segment size", "segment", w.currentSegment)
+			return errors.Wrap(err, "get segment size")
+		}
+	}
 
 	for {
 		select {
@@ -411,14 +343,16 @@ func (w *WALWatcher) watch(wl *wal.WAL, reader *wal.LiveReader) error {
 			if err != nil {
 				level.Error(w.logger).Log("err", err)
 			}
+			// Clear series with a checkpoint or segment index # lower than the checkpoint we just read.
+			w.writer.SeriesReset(d)
 
 		case <-segmentTicker.C:
-			// check if new segments exist
 			_, last, err := wl.Segments()
 			if err != nil {
 				return errors.Wrap(err, "segments")
 			}
 
+			// Check if new segments exist.
 			if last <= w.currentSegment {
 				continue
 			}
@@ -433,10 +367,14 @@ func (w *WALWatcher) watch(wl *wal.WAL, reader *wal.LiveReader) error {
 			return nil
 
 		case <-readTicker.C:
-			if err := w.readSegment(reader); err != nil {
+			if err := w.readSegment(reader); err != nil && err != io.EOF {
 				level.Error(w.logger).Log("err", err)
 				return err
 			}
+			if reader.TotalRead() >= size && !tail {
+				level.Info(w.logger).Log("msg", "done replaying segment", "segment", w.currentSegment, "size", size, "read", reader.TotalRead())
+				return nil
+			}
 		}
 	}
 }
@@ -475,9 +413,19 @@ func (w *WALWatcher) decodeRecord(rec []byte) error {
 			w.recordDecodeFailsMetric.Inc()
 			return err
 		}
-		w.samplesReadMetric.Add(float64(len(samples)))
-		// Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks).
-		w.writer.Append(samples)
+		var send []tsdb.RefSample
+		for _, s := range samples {
+			if s.T > w.startTime {
+				send = append(send, s)
+			}
+		}
+		if len(send) > 0 {
+			// We don't want to count samples read prior to the starting timestamp,
+			// so that we can compare samples in vs. samples read and samples succeeded.
+			w.samplesReadMetric.Add(float64(len(samples)))
+			// Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks).
+			w.writer.Append(send)
+		}
 
 	case tsdb.RecordTombstones:
 		w.tombstonesReadMetric.Add(float64(len(samples)))
 
@@ -495,31 +443,26 @@ func (w *WALWatcher) decodeRecord(rec []byte) error {
 
 // Read all the series records from a Checkpoint directory.
func (w *WALWatcher) readCheckpoint(checkpointDir string) error { + level.Info(w.logger).Log("msg", "reading checkpoint", "dir", checkpointDir) sr, err := wal.NewSegmentsReader(checkpointDir) if err != nil { return errors.Wrap(err, "open checkpoint") } defer sr.Close() - split := strings.Split(checkpointDir, ".") - if len(split) != 2 { - return errors.Errorf("checkpoint dir name is not in the right format: %s", checkpointDir) - } - - i, err := strconv.Atoi(split[1]) - if err != nil { - i = w.currentSegment - 1 - } - size, err := getCheckpointSize(checkpointDir) if err != nil { level.Error(w.logger).Log("msg", "error getting checkpoint size", "checkpoint", checkpointDir) return errors.Wrap(err, "get checkpoint size") } - w.readSeriesRecords(wal.NewLiveReader(sr), i, size) + // w.readSeriesRecords(wal.NewLiveReader(sr), i, size) + r := wal.NewLiveReader(sr) + w.readSegment(r) + if r.TotalRead() != size { + level.Warn(w.logger).Log("msg", "may not have read all data from checkpoint") + } level.Debug(w.logger).Log("msg", "read series references from checkpoint", "checkpoint", checkpointDir) - w.writer.SeriesReset(i) return nil } diff --git a/storage/remote/wal_watcher_test.go b/storage/remote/wal_watcher_test.go index 0b1cced9a..f16c8f0ce 100644 --- a/storage/remote/wal_watcher_test.go +++ b/storage/remote/wal_watcher_test.go @@ -18,6 +18,7 @@ import ( "math/rand" "os" "path" + "sync" "testing" "time" @@ -33,6 +34,7 @@ import ( type writeToMock struct { samplesAppended int seriesLabels map[uint64][]prompb.Label + seriesLock sync.Mutex seriesSegmentIndexes map[uint64]int } @@ -51,8 +53,8 @@ func (wtm *writeToMock) StoreSeries(series []tsdb.RefSeries, index int) { temp[s.Ref] = labelsetToLabelsProto(ls) } - // wtm.seriesMtx.Lock() - // defer t.seriesMtx.Unlock() + wtm.seriesLock.Lock() + defer wtm.seriesLock.Unlock() for ref, labels := range temp { wtm.seriesLabels[ref] = labels wtm.seriesSegmentIndexes[ref] = index @@ -62,6 +64,8 @@ func (wtm *writeToMock) StoreSeries(series []tsdb.RefSeries, index int) { func (wtm *writeToMock) SeriesReset(index int) { // Check for series that are in segments older than the checkpoint // that were not also present in the checkpoint. + wtm.seriesLock.Lock() + defer wtm.seriesLock.Unlock() for k, v := range wtm.seriesSegmentIndexes { if v < index { delete(wtm.seriesLabels, k) @@ -70,6 +74,12 @@ func (wtm *writeToMock) SeriesReset(index int) { } } +func (wtm *writeToMock) checkNumLabels() int { + wtm.seriesLock.Lock() + defer wtm.seriesLock.Unlock() + return len(wtm.seriesLabels) +} + func newWriteToMock() *writeToMock { return &writeToMock{ seriesLabels: make(map[uint64][]prompb.Label), @@ -77,12 +87,6 @@ func newWriteToMock() *writeToMock { } } -// we need a way to check the value of the wal watcher records read metrics, the samples and series records -// with these we could write some example segments and checkpoints and then write tests for readSegment/watch -// to see if we get back the write number of series records/samples records/etc., and that we read a whole checkpoint -// on startup and when a new one is created -// -// we could do the same thing for readToEnd, readCheckpoint, readSeriesRecords, etc. 
func Test_readToEnd_noCheckpoint(t *testing.T) { pageSize := 32 * 1024 const seriesCount = 10 @@ -98,7 +102,6 @@ func Test_readToEnd_noCheckpoint(t *testing.T) { w, err := wal.NewSize(nil, nil, wdir, 128*pageSize) testutil.Ok(t, err) - // var input [][]byte var recs [][]byte enc := tsdb.RecordEncoder{} @@ -131,15 +134,28 @@ func Test_readToEnd_noCheckpoint(t *testing.T) { } testutil.Ok(t, w.Log(recs...)) - first, last, err := w.Segments() + _, _, err = w.Segments() testutil.Ok(t, err) wt := newWriteToMock() st := timestamp.FromTime(time.Now()) watcher := NewWALWatcher(nil, "", wt, dir, st) - _, _, err = watcher.readToEnd(wdir, first, last) - testutil.Ok(t, err) - testutil.Equals(t, seriesCount, len(wt.seriesLabels)) + go watcher.Start() + i := 0 + ticker := time.NewTicker(100 * time.Millisecond) + for range ticker.C { + if wt.checkNumLabels() >= seriesCount*10*2 { + break + } + i++ + if i >= 10 { + break + } + + } + watcher.Stop() + ticker.Stop() + testutil.Equals(t, seriesCount, wt.checkNumLabels()) } func Test_readToEnd_withCheckpoint(t *testing.T) { @@ -161,7 +177,7 @@ func Test_readToEnd_withCheckpoint(t *testing.T) { w, err := wal.NewSize(nil, nil, wdir, 128*pageSize) testutil.Ok(t, err) - // write to the initial segment then checkpoint + // Write to the initial segment then checkpoint. for i := 0; i < seriesCount*10; i++ { ref := i + 100 series := enc.Series([]tsdb.RefSeries{ @@ -187,7 +203,7 @@ func Test_readToEnd_withCheckpoint(t *testing.T) { tsdb.Checkpoint(w, 30, 31, func(x uint64) bool { return true }, 0) w.Truncate(32) - // write more records after checkpointing + // Write more records after checkpointing. for i := 0; i < seriesCount*10; i++ { series := enc.Series([]tsdb.RefSeries{ tsdb.RefSeries{ @@ -209,15 +225,27 @@ func Test_readToEnd_withCheckpoint(t *testing.T) { } } - first, last, err := w.Segments() + _, _, err = w.Segments() testutil.Ok(t, err) - wt := newWriteToMock() st := timestamp.FromTime(time.Now()) watcher := NewWALWatcher(nil, "", wt, dir, st) - _, _, err = watcher.readToEnd(wdir, first, last) - testutil.Ok(t, err) - testutil.Equals(t, seriesCount*10*2, len(wt.seriesLabels)) + go watcher.Start() + i := 0 + ticker := time.NewTicker(100 * time.Millisecond) + for range ticker.C { + if wt.checkNumLabels() >= seriesCount*10*2 { + break + } + i++ + if i >= 20 { + break + } + + } + watcher.Stop() + ticker.Stop() + testutil.Equals(t, seriesCount*10*2, wt.checkNumLabels()) } func Test_readCheckpoint(t *testing.T) { @@ -239,7 +267,7 @@ func Test_readCheckpoint(t *testing.T) { w, err := wal.NewSize(nil, nil, wdir, 128*pageSize) testutil.Ok(t, err) - // write to the initial segment then checkpoint + // Write to the initial segment then checkpoint. for i := 0; i < seriesCount*10; i++ { ref := i + 100 series := enc.Series([]tsdb.RefSeries{ @@ -265,15 +293,29 @@ func Test_readCheckpoint(t *testing.T) { tsdb.Checkpoint(w, 30, 31, func(x uint64) bool { return true }, 0) w.Truncate(32) - first, last, err := w.Segments() + // Start read after checkpoint, no more data written. 
+ _, _, err = w.Segments() testutil.Ok(t, err) wt := newWriteToMock() st := timestamp.FromTime(time.Now()) watcher := NewWALWatcher(nil, "", wt, dir, st) - _, _, err = watcher.readToEnd(wdir, first, last) - testutil.Ok(t, err) - testutil.Equals(t, seriesCount*10, len(wt.seriesLabels)) + go watcher.Start() + i := 0 + ticker := time.NewTicker(100 * time.Millisecond) + for range ticker.C { + if wt.checkNumLabels() >= seriesCount*10*2 { + break + } + i++ + if i >= 8 { + break + } + + } + watcher.Stop() + ticker.Stop() + testutil.Equals(t, seriesCount*10, wt.checkNumLabels()) } func Test_checkpoint_seriesReset(t *testing.T) { @@ -291,10 +333,9 @@ func Test_checkpoint_seriesReset(t *testing.T) { enc := tsdb.RecordEncoder{} w, err := wal.NewSize(nil, nil, wdir, pageSize) - // w. testutil.Ok(t, err) - // write to the initial segment then checkpoint + // Write to the initial segment, then checkpoint later. for i := 0; i < seriesCount*10; i++ { ref := i + 100 series := enc.Series([]tsdb.RefSeries{ @@ -318,15 +359,29 @@ func Test_checkpoint_seriesReset(t *testing.T) { } } - first, last, err := w.Segments() + _, _, err = w.Segments() testutil.Ok(t, err) wt := newWriteToMock() st := timestamp.FromTime(time.Now()) watcher := NewWALWatcher(nil, "", wt, dir, st) - _, _, err = watcher.readToEnd(wdir, first, last) - testutil.Ok(t, err) - testutil.Equals(t, seriesCount*10, len(wt.seriesLabels)) + go watcher.Start() + i := 0 + ticker := time.NewTicker(100 * time.Millisecond) + for range ticker.C { + if wt.checkNumLabels() >= seriesCount*10*2 { + break + } + + i++ + if i >= 50 { + break + } + + } + watcher.Stop() + ticker.Stop() + testutil.Equals(t, seriesCount*10, wt.checkNumLabels()) // If you modify the checkpoint and truncate segment #'s run the test to see how // many series records you end up with and change the last Equals check accordingly @@ -347,8 +402,8 @@ func Test_decodeRecord(t *testing.T) { defer os.RemoveAll(dir) wt := newWriteToMock() - st := timestamp.FromTime(time.Now()) - watcher := NewWALWatcher(nil, "", wt, dir, st) + // st := timestamp.FromTime(time.Now().Add(-10 * time.Second)) + watcher := NewWALWatcher(nil, "", wt, dir, 0) // decode a series record enc := tsdb.RecordEncoder{} @@ -365,3 +420,28 @@ func Test_decodeRecord(t *testing.T) { testutil.Equals(t, 2, wt.samplesAppended) } + +func Test_decodeRecord_afterStart(t *testing.T) { + dir, err := ioutil.TempDir("", "decodeRecord") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wt := newWriteToMock() + // st := timestamp.FromTime(time.Now().Add(-10 * time.Second)) + watcher := NewWALWatcher(nil, "", wt, dir, 1) + + // decode a series record + enc := tsdb.RecordEncoder{} + buf := enc.Series([]tsdb.RefSeries{tsdb.RefSeries{Ref: 1234, Labels: labels.Labels{}}}, nil) + watcher.decodeRecord(buf) + testutil.Ok(t, err) + + testutil.Equals(t, 1, len(wt.seriesLabels)) + + // decode a samples record + buf = enc.Samples([]tsdb.RefSample{tsdb.RefSample{Ref: 100, T: 1, V: 1.0}, tsdb.RefSample{Ref: 100, T: 2, V: 2.0}}, nil) + watcher.decodeRecord(buf) + testutil.Ok(t, err) + + testutil.Equals(t, 1, wt.samplesAppended) +}
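
A note on the tests above, outside the diff itself: each of the four watcher tests now hand-rolls the same ticker loop that polls the mock writer until the watcher catches up or a retry budget runs out. A small helper along the following lines could collapse those loops. This is only a sketch based on the test code in this patch; it assumes the test file's existing testing and time imports, and the name retry and its parameters are not part of the change.

// retry polls f every interval, at most n times, and reports whether f
// ever returned true. It gives the WAL watcher goroutine time to catch
// up without hard-coding a fixed sleep in every test.
func retry(t *testing.T, interval time.Duration, n int, f func() bool) bool {
	t.Helper()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for i := 0; i < n; i++ {
		<-ticker.C
		if f() {
			return true
		}
	}
	return false
}

With such a helper, a test body would poll with retry(t, 100*time.Millisecond, 10, func() bool { return wt.checkNumLabels() >= seriesCount }) and then make the existing testutil.Equals assertion.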