From 0c71230784368da829f1f02d412d181d7a06aee6 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Wed, 21 Feb 2024 17:09:07 -0800 Subject: [PATCH] fix bug that would cause us to endlessly fall behind (#13583) * fix bug that would cause us to only read from the WAL on the 15s fallback timer if remote write had fallen behind and is no longer reading from the WAL segment that is currently being written to Signed-off-by: Callum Styan * remove unintended logging, fix lint, plus allow test to take slightly longer because cloud CI Signed-off-by: Callum Styan * address review feedback Signed-off-by: Callum Styan * fix watcher sleeps in test, flu brain is smooth Signed-off-by: Callum Styan * increase timeout, unfortunately cloud CI can require a longer timeout Signed-off-by: Callum Styan --------- Signed-off-by: Callum Styan --- tsdb/wlog/watcher.go | 16 +++-- tsdb/wlog/watcher_test.go | 125 +++++++++++++++++++++++++++++++++++--- 2 files changed, 128 insertions(+), 13 deletions(-) diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go index 1c76e3887..525515221 100644 --- a/tsdb/wlog/watcher.go +++ b/tsdb/wlog/watcher.go @@ -262,10 +262,8 @@ func (w *Watcher) loop() { // Run the watcher, which will tail the WAL until the quit channel is closed // or an error case is hit. func (w *Watcher) Run() error { - _, lastSegment, err := w.firstAndLast() - if err != nil { - return fmt.Errorf("wal.Segments: %w", err) - } + var lastSegment int + var err error // We want to ensure this is false across iterations since // Run will be called again if there was a failure to read the WAL. @@ -296,9 +294,17 @@ func (w *Watcher) Run() error { w.currentSegmentMetric.Set(float64(currentSegment)) level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment) + // Reset the value of lastSegment each iteration; this avoids having to wait too long + // between reads if we're reading a segment that is not the most recent segment after startup. 
+ _, lastSegment, err = w.firstAndLast() + if err != nil { + return fmt.Errorf("wal.Segments: %w", err) + } + tail := currentSegment >= lastSegment + // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. - if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil && !errors.Is(err, ErrIgnorable) { + if err := w.watch(currentSegment, tail); err != nil && !errors.Is(err, ErrIgnorable) { return err } diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index b10f8f8f8..871640a7c 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -58,29 +58,47 @@ type writeToMock struct { floatHistogramsAppended int seriesLock sync.Mutex seriesSegmentIndexes map[chunks.HeadSeriesRef]int + + // delay reads with a short sleep + delay time.Duration } func (wtm *writeToMock) Append(s []record.RefSample) bool { + if wtm.delay > 0 { + time.Sleep(wtm.delay) + } wtm.samplesAppended += len(s) return true } func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool { + if wtm.delay > 0 { + time.Sleep(wtm.delay) + } wtm.exemplarsAppended += len(e) return true } func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool { + if wtm.delay > 0 { + time.Sleep(wtm.delay) + } wtm.histogramsAppended += len(h) return true } func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool { + if wtm.delay > 0 { + time.Sleep(wtm.delay) + } wtm.floatHistogramsAppended += len(fh) return true } func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) { + if wtm.delay > 0 { + time.Sleep(wtm.delay) + } wtm.UpdateSeriesSegment(series, index) } @@ -110,9 +128,10 @@ func (wtm *writeToMock) checkNumSeries() int { return len(wtm.seriesSegmentIndexes) } -func newWriteToMock() *writeToMock { +func newWriteToMock(delay 
time.Duration) *writeToMock { return &writeToMock{ seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int), + delay: delay, } } @@ -209,7 +228,7 @@ func TestTailSamples(t *testing.T) { first, last, err := Segments(w.Dir()) require.NoError(t, err) - wt := newWriteToMock() + wt := newWriteToMock(0) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true) watcher.SetStartTime(now) @@ -294,7 +313,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) { _, _, err = Segments(w.Dir()) require.NoError(t, err) - wt := newWriteToMock() + wt := newWriteToMock(0) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) go watcher.Start() @@ -383,7 +402,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { _, _, err = Segments(w.Dir()) require.NoError(t, err) readTimeout = time.Second - wt := newWriteToMock() + wt := newWriteToMock(0) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) go watcher.Start() @@ -454,7 +473,7 @@ func TestReadCheckpoint(t *testing.T) { _, _, err = Segments(w.Dir()) require.NoError(t, err) - wt := newWriteToMock() + wt := newWriteToMock(0) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) go watcher.Start() @@ -523,7 +542,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { require.NoError(t, err) } - wt := newWriteToMock() + wt := newWriteToMock(0) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) watcher.MaxSegment = -1 @@ -596,7 +615,7 @@ func TestCheckpointSeriesReset(t *testing.T) { require.NoError(t, err) readTimeout = time.Second - wt := newWriteToMock() + wt := newWriteToMock(0) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) watcher.MaxSegment = -1 go watcher.Start() @@ -675,7 +694,7 @@ func TestRun_StartupTime(t *testing.T) { } require.NoError(t, w.Close()) - wt := newWriteToMock() + wt := newWriteToMock(0) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) watcher.MaxSegment = segments @@ -688,3 +707,93 @@ func 
TestRun_StartupTime(t *testing.T) { }) } } + +func TestRun_AvoidNotifyWhenBehind(t *testing.T) { + const pageSize = 32 * 1024 + const segments = 10 + const seriesCount = 20 + const samplesCount = 300 + + // This test can take longer than intended to finish in cloud CI. + readTimeout := 10 * time.Second + + for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { + t.Run(string(compress), func(t *testing.T) { + dir := t.TempDir() + + wdir := path.Join(dir, "wal") + err := os.Mkdir(wdir, 0o777) + require.NoError(t, err) + + enc := record.Encoder{} + w, err := NewSize(nil, nil, wdir, pageSize, compress) + require.NoError(t, err) + var wg sync.WaitGroup + // add one segment initially to ensure there's a value > 0 for the last segment id + for i := 0; i < 1; i++ { + for j := 0; j < seriesCount; j++ { + ref := j + (i * 100) + series := enc.Series([]record.RefSeries{ + { + Ref: chunks.HeadSeriesRef(ref), + Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), + }, + }, nil) + require.NoError(t, w.Log(series)) + + for k := 0; k < samplesCount; k++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]record.RefSample{ + { + Ref: chunks.HeadSeriesRef(inner), + T: int64(i), + V: float64(i), + }, + }, nil) + require.NoError(t, w.Log(sample)) + } + } + } + wg.Add(1) + go func() { + defer wg.Done() + for i := 1; i < segments; i++ { + for j := 0; j < seriesCount; j++ { + ref := j + (i * 100) + series := enc.Series([]record.RefSeries{ + { + Ref: chunks.HeadSeriesRef(ref), + Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)), + }, + }, nil) + require.NoError(t, w.Log(series)) + + for k := 0; k < samplesCount; k++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]record.RefSample{ + { + Ref: chunks.HeadSeriesRef(inner), + T: int64(i), + V: float64(i), + }, + }, nil) + require.NoError(t, w.Log(sample)) + } + } + } + }() + + wt := newWriteToMock(time.Millisecond) + watcher := NewWatcher(wMetrics, 
nil, nil, "", wt, dir, false, false) + watcher.MaxSegment = segments + + watcher.setMetrics() + startTime := time.Now() + err = watcher.Run() + wg.Wait() + require.Less(t, time.Since(startTime), readTimeout) + require.NoError(t, err) + require.NoError(t, w.Close()) + }) + } +}