Merge pull request #15403 from bboreham/fix-rw-benchmark-startup

[TESTS] Remote-Write: Fix BenchmarkStartup
pull/15240/merge
Bryan Boreham 2024-11-25 17:31:24 +00:00 committed by GitHub
commit 7996a13fdd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 13 additions and 9 deletions

View File

@ -20,6 +20,7 @@ import (
"math" "math"
"math/rand" "math/rand"
"os" "os"
"path"
"runtime/pprof" "runtime/pprof"
"sort" "sort"
"strconv" "strconv"
@ -48,6 +49,7 @@ import (
"github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/runutil" "github.com/prometheus/prometheus/util/runutil"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
) )
@ -1407,12 +1409,12 @@ func BenchmarkStartup(b *testing.B) {
// Find the second largest segment; we will replay up to this. // Find the second largest segment; we will replay up to this.
// (Second largest as WALWatcher will start tailing the largest). // (Second largest as WALWatcher will start tailing the largest).
dirents, err := os.ReadDir(dir) dirents, err := os.ReadDir(path.Join(dir, "wal"))
require.NoError(b, err) require.NoError(b, err)
var segments []int var segments []int
for _, dirent := range dirents { for _, dirent := range dirents {
if i, err := strconv.Atoi(dirent.Name()); err != nil { if i, err := strconv.Atoi(dirent.Name()); err == nil {
segments = append(segments, i) segments = append(segments, i)
} }
} }
@ -1424,13 +1426,15 @@ func BenchmarkStartup(b *testing.B) {
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
watcherMetrics := wlog.NewWatcherMetrics(nil)
c := NewTestBlockedWriteClient() c := NewTestBlockedWriteClient()
// todo: test with new proto type(s) // todo: test with new proto type(s)
m := NewQueueManager(metrics, nil, nil, logger, dir, m := NewQueueManager(metrics, watcherMetrics, nil, logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration), newEWMARate(ewmaWeight, shardUpdateDuration),
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1) cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
m.watcher.MaxSegment = segments[len(segments)-2] m.watcher.MaxSegment = segments[len(segments)-2]
m.watcher.SetMetrics()
err := m.watcher.Run() err := m.watcher.Run()
require.NoError(b, err) require.NoError(b, err)
} }

View File

@ -206,7 +206,7 @@ func (w *Watcher) Notify() {
} }
} }
func (w *Watcher) setMetrics() { func (w *Watcher) SetMetrics() {
// Setup the WAL Watchers metrics. We do this here rather than in the // Setup the WAL Watchers metrics. We do this here rather than in the
// constructor because of the ordering of creating Queue Managers's, // constructor because of the ordering of creating Queue Managers's,
// stopping them, and then starting new ones in storage/remote/storage.go ApplyConfig. // stopping them, and then starting new ones in storage/remote/storage.go ApplyConfig.
@ -221,7 +221,7 @@ func (w *Watcher) setMetrics() {
// Start the Watcher. // Start the Watcher.
func (w *Watcher) Start() { func (w *Watcher) Start() {
w.setMetrics() w.SetMetrics()
w.logger.Info("Starting WAL watcher", "queue", w.name) w.logger.Info("Starting WAL watcher", "queue", w.name)
go w.loop() go w.loop()

View File

@ -234,7 +234,7 @@ func TestTailSamples(t *testing.T) {
watcher.SetStartTime(now) watcher.SetStartTime(now)
// Set the Watcher's metrics so they're not nil pointers. // Set the Watcher's metrics so they're not nil pointers.
watcher.setMetrics() watcher.SetMetrics()
for i := first; i <= last; i++ { for i := first; i <= last; i++ {
segment, err := OpenReadSegment(SegmentName(watcher.walDir, i)) segment, err := OpenReadSegment(SegmentName(watcher.walDir, i))
require.NoError(t, err) require.NoError(t, err)
@ -548,7 +548,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
watcher.MaxSegment = -1 watcher.MaxSegment = -1
// Set the Watcher's metrics so they're not nil pointers. // Set the Watcher's metrics so they're not nil pointers.
watcher.setMetrics() watcher.SetMetrics()
lastCheckpoint, _, err := LastCheckpoint(watcher.walDir) lastCheckpoint, _, err := LastCheckpoint(watcher.walDir)
require.NoError(t, err) require.NoError(t, err)
@ -699,7 +699,7 @@ func TestRun_StartupTime(t *testing.T) {
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
watcher.MaxSegment = segments watcher.MaxSegment = segments
watcher.setMetrics() watcher.SetMetrics()
startTime := time.Now() startTime := time.Now()
err = watcher.Run() err = watcher.Run()
@ -768,7 +768,7 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
// Set up the watcher and run it in the background. // Set up the watcher and run it in the background.
wt := newWriteToMock(time.Millisecond) wt := newWriteToMock(time.Millisecond)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
watcher.setMetrics() watcher.SetMetrics()
watcher.MaxSegment = segmentsToRead watcher.MaxSegment = segmentsToRead
var g errgroup.Group var g errgroup.Group