diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index dc0314e8c..b8c2380bd 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -25,6 +25,7 @@ import ( "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -52,6 +53,13 @@ func retry(t *testing.T, interval time.Duration, n int, f func() bool) { t.Logf("function returned false") } +// Overwrite readTimeout defined in watcher.go. +func overwriteReadTimeout(t *testing.T, val time.Duration) { + initialVal := readTimeout + readTimeout = val + t.Cleanup(func() { readTimeout = initialVal }) +} + type writeToMock struct { samplesAppended int exemplarsAppended int @@ -302,7 +310,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) { } } require.NoError(t, w.Log(recs...)) - readTimeout = time.Second + overwriteReadTimeout(t, time.Second) _, _, err = Segments(w.Dir()) require.NoError(t, err) @@ -394,7 +402,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { _, _, err = Segments(w.Dir()) require.NoError(t, err) - readTimeout = time.Second + overwriteReadTimeout(t, time.Second) wt := newWriteToMock(0) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) go watcher.Start() @@ -607,7 +615,7 @@ func TestCheckpointSeriesReset(t *testing.T) { _, _, err = Segments(w.Dir()) require.NoError(t, err) - readTimeout = time.Second + overwriteReadTimeout(t, time.Second) wt := newWriteToMock(0) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) watcher.MaxSegment = -1 @@ -742,9 +750,6 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) { const seriesCount = 10 const samplesCount = 50 - // This test can take longer than intended to finish in cloud CI. - readTimeout := 10 * time.Second - for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { t.Run(string(compress), func(t *testing.T) { dir := t.TempDir() @@ -755,36 +760,50 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) { w, err := NewSize(nil, nil, wdir, segmentSize, compress) require.NoError(t, err) - var wg sync.WaitGroup - // Generate one segment initially to ensure that watcher.Run() finds at least one segment on disk. + // Write to 00000000, the watcher will read series from it. require.NoError(t, generateWALRecords(w, 0, seriesCount, samplesCount)) - w.NextSegment() // Force creation of the next segment - wg.Add(1) - go func() { - defer wg.Done() - for i := 1; i < segmentsToWrite; i++ { - require.NoError(t, generateWALRecords(w, i, seriesCount, samplesCount)) - w.NextSegment() - } - }() + // Create 00000001, the watcher will tail it once started. + w.NextSegment() + // Set up the watcher and run it in the background. wt := newWriteToMock(time.Millisecond) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) + watcher.setMetrics() watcher.MaxSegment = segmentsToRead - watcher.setMetrics() - startTime := time.Now() - err = watcher.Run() - wg.Wait() - require.Less(t, time.Since(startTime), readTimeout) + var g errgroup.Group + g.Go(func() error { + startTime := time.Now() + err = watcher.Run() + if err != nil { + return err + } + // If the watcher was to wait for readTicker to read every new segment, it would need readTimeout * segmentsToRead. + d := time.Since(startTime) + if d > readTimeout { + return fmt.Errorf("watcher ran for %s, it shouldn't rely on readTicker=%s to read the new segments", d, readTimeout) + } + return nil + }) - // But samples records shouldn't get dropped + // The watcher went through 00000000 and is tailing the next one. retry(t, defaultRetryInterval, defaultRetries, func() bool { - return wt.checkNumSeries() > 0 + return wt.checkNumSeries() == seriesCount }) - require.Equal(t, segmentsToRead*seriesCount*samplesCount, wt.samplesAppended) - require.NoError(t, err) + // In the meantime, add some new segments in bulk. + // We should end up with segmentsToWrite + 1 segments now. + for i := 1; i < segmentsToWrite; i++ { + require.NoError(t, generateWALRecords(w, i, seriesCount, samplesCount)) + w.NextSegment() + } + + // Wait for the watcher. + require.NoError(t, g.Wait()) + + // All series and samples were read. + require.Equal(t, (segmentsToRead+1)*seriesCount, wt.checkNumSeries()) // Series from 00000000 are also read. + require.Equal(t, segmentsToRead*seriesCount*samplesCount, wt.samplesAppended) require.NoError(t, w.Close()) }) }