From 0cc99e677ad3da2cf00599cb0e6c272ab58688f1 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 7 May 2024 18:14:22 +0200 Subject: [PATCH] promql.Engine: Add Close method Signed-off-by: Arve Knudsen --- cmd/prometheus/main.go | 9 +++++ promql/bench_test.go | 8 ++++ promql/engine.go | 11 ++++++ promql/engine_test.go | 71 ++++++++++++++++++++++------------ promql/functions_test.go | 3 ++ promql/promql_test.go | 9 +++-- promql/promqltest/test.go | 14 +++++-- promql/promqltest/test_test.go | 2 +- rules/alerting_test.go | 70 +++++++++++++++++++++------------ rules/manager_test.go | 36 ++++++++++++++--- rules/recording_test.go | 12 ++++-- web/api/v1/api_test.go | 48 ++++++++++++++--------- web/api/v1/errors_test.go | 9 ++++- 13 files changed, 218 insertions(+), 84 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index f2988b2f2..dfc499c63 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -955,12 +955,18 @@ func main() { listener, err := webHandler.Listener() if err != nil { level.Error(logger).Log("msg", "Unable to start web listener", "err", err) + if err := queryEngine.Close(); err != nil { + level.Warn(logger).Log("msg", "Closing query engine failed", "err", err) + } os.Exit(1) } err = toolkit_web.Validate(*webConfig) if err != nil { level.Error(logger).Log("msg", "Unable to validate web configuration file", "err", err) + if err := queryEngine.Close(); err != nil { + level.Warn(logger).Log("msg", "Closing query engine failed", "err", err) + } os.Exit(1) } @@ -982,6 +988,9 @@ func main() { case <-cancel: reloadReady.Close() } + if err := queryEngine.Close(); err != nil { + level.Warn(logger).Log("msg", "Closing query engine failed", "err", err) + } return nil }, func(err error) { diff --git a/promql/bench_test.go b/promql/bench_test.go index 9a8529091..eca721d56 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" @@ -257,6 +259,9 @@ func BenchmarkRangeQuery(b *testing.B) { Timeout: 100 * time.Second, } engine := promql.NewEngine(opts) + b.Cleanup(func() { + require.NoError(b, engine.Close()) + }) const interval = 10000 // 10s interval. // A day of data plus 10k steps. @@ -340,6 +345,9 @@ func BenchmarkNativeHistograms(b *testing.B) { for _, tc := range cases { b.Run(tc.name, func(b *testing.B) { ng := promql.NewEngine(opts) + b.Cleanup(func() { + require.NoError(b, ng.Close()) + }) for i := 0; i < b.N; i++ { qry, err := ng.NewRangeQuery(context.Background(), testStorage, nil, tc.query, start, end, step) if err != nil { diff --git a/promql/engine.go b/promql/engine.go index ea4bc1af8..8825be20c 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -19,6 +19,7 @@ import ( "context" "errors" "fmt" + "io" "math" "reflect" "runtime" @@ -271,6 +272,8 @@ func contextErr(err error, env string) error { // // 2) Enforcement of the maximum number of concurrent queries. type QueryTracker interface { + io.Closer + // GetMaxConcurrent returns maximum number of concurrent queries that are allowed by this tracker. GetMaxConcurrent() int @@ -423,6 +426,14 @@ func NewEngine(opts EngineOpts) *Engine { } } +// Close closes ng. +func (ng *Engine) Close() error { + if ng.activeQueryTracker != nil { + return ng.activeQueryTracker.Close() + } + return nil +} + // SetQueryLogger sets the query logger. func (ng *Engine) SetQueryLogger(l QueryLogger) { ng.queryLoggerLock.Lock() diff --git a/promql/engine_test.go b/promql/engine_test.go index b7435d473..805999369 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -18,7 +18,6 @@ import ( "errors" "fmt" "math" - "os" "sort" "strconv" "sync" @@ -55,14 +54,7 @@ func TestMain(m *testing.M) { func TestQueryConcurrency(t *testing.T) { maxConcurrency := 10 - dir, err := os.MkdirTemp("", "test_concurrency") - require.NoError(t, err) - defer os.RemoveAll(dir) - queryTracker := promql.NewActiveQueryTracker(dir, maxConcurrency, nil) - t.Cleanup(func() { - require.NoError(t, queryTracker.Close()) - }) - + queryTracker := promql.NewActiveQueryTracker(t.TempDir(), maxConcurrency, nil) opts := promql.EngineOpts{ Logger: nil, Reg: nil, @@ -72,13 +64,18 @@ func TestQueryConcurrency(t *testing.T) { } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) ctx, cancelCtx := context.WithCancel(context.Background()) - defer cancelCtx() + t.Cleanup(cancelCtx) block := make(chan struct{}) processing := make(chan struct{}) done := make(chan int) - defer close(done) + t.Cleanup(func() { + close(done) + }) f := func(context.Context) error { select { @@ -164,6 +161,9 @@ func TestQueryTimeout(t *testing.T) { Timeout: 5 * time.Millisecond, } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -263,6 +263,9 @@ func TestQueryError(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) errStorage := promql.ErrStorage{errors.New("storage error")} queryable := storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) { return &errQuerier{err: errStorage}, nil @@ -597,6 +600,9 @@ func TestSelectHintsSetCorrectly(t *testing.T) { } { t.Run(tc.query, func(t *testing.T) { engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) hintsRecorder := &noopHintRecordingQueryable{} var ( @@ -628,6 +634,9 @@ func TestEngineShutdown(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) ctx, cancelCtx := context.WithCancel(context.Background()) block := make(chan struct{}) @@ -763,7 +772,7 @@ load 10s t.Run(fmt.Sprintf("%d query=%s", i, c.Query), func(t *testing.T) { var err error var qry promql.Query - engine := newTestEngine() + engine := newTestEngine(t) if c.Interval == 0 { qry, err = engine.NewInstantQuery(context.Background(), storage, nil, c.Query, c.Start) } else { @@ -1305,7 +1314,7 @@ load 10s for _, c := range cases { t.Run(c.Query, func(t *testing.T) { opts := promql.NewPrometheusQueryOpts(true, 0) - engine := promqltest.NewTestEngine(true, 0, promqltest.DefaultMaxSamplesPerQuery) + engine := promqltest.NewTestEngine(t, true, 0, promqltest.DefaultMaxSamplesPerQuery) runQuery := func(expErr error) *stats.Statistics { var err error @@ -1332,7 +1341,7 @@ load 10s if c.SkipMaxCheck { return } - engine = promqltest.NewTestEngine(true, 0, stats.Samples.PeakSamples-1) + engine = promqltest.NewTestEngine(t, true, 0, stats.Samples.PeakSamples-1) runQuery(promql.ErrTooManySamples(env)) }) } @@ -1485,7 +1494,7 @@ load 10s for _, c := range cases { t.Run(c.Query, func(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) testFunc := func(expError error) { var err error var qry promql.Query @@ -1506,18 +1515,18 @@ load 10s } // Within limit. - engine = promqltest.NewTestEngine(false, 0, c.MaxSamples) + engine = promqltest.NewTestEngine(t, false, 0, c.MaxSamples) testFunc(nil) // Exceeding limit. - engine = promqltest.NewTestEngine(false, 0, c.MaxSamples-1) + engine = promqltest.NewTestEngine(t, false, 0, c.MaxSamples-1) testFunc(promql.ErrTooManySamples(env)) }) } } func TestAtModifier(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) storage := promqltest.LoadedStorage(t, ` load 10s metric{job="1"} 0+1x1000 @@ -1995,7 +2004,7 @@ func TestSubquerySelector(t *testing.T) { }, } { t.Run("", func(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) storage := promqltest.LoadedStorage(t, tst.loadString) t.Cleanup(func() { storage.Close() }) @@ -2016,7 +2025,7 @@ func TestSubquerySelector(t *testing.T) { } func TestTimestampFunction_StepsMoreOftenThanSamples(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) storage := promqltest.LoadedStorage(t, ` load 1m metric 0+1x1000 @@ -2086,6 +2095,9 @@ func TestQueryLogger_basic(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) queryExec := func() { ctx, cancelCtx := context.WithCancel(context.Background()) @@ -2137,6 +2149,9 @@ func TestQueryLogger_fields(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) f1 := NewFakeQueryLogger() engine.SetQueryLogger(f1) @@ -2166,6 +2181,9 @@ func TestQueryLogger_error(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) f1 := NewFakeQueryLogger() engine.SetQueryLogger(f1) @@ -3049,6 +3067,9 @@ func TestEngineOptsValidation(t *testing.T) { for _, c := range cases { eng := promql.NewEngine(c.opts) + t.Cleanup(func() { + require.NoError(t, eng.Close()) + }) _, err1 := eng.NewInstantQuery(context.Background(), nil, nil, c.query, time.Unix(10, 0)) _, err2 := eng.NewRangeQuery(context.Background(), nil, nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second) if c.fail { @@ -3208,7 +3229,7 @@ func TestRangeQuery(t *testing.T) { } for _, c := range cases { t.Run(c.Name, func(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) storage := promqltest.LoadedStorage(t, c.Load) t.Cleanup(func() { storage.Close() }) @@ -3342,7 +3363,7 @@ func TestNativeHistogram_Sum_Count_Add_AvgOperator(t *testing.T) { seriesName := "sparse_histogram_series" seriesNameOverTime := "sparse_histogram_series_over_time" - engine := newTestEngine() + engine := newTestEngine(t) ts := idx0 * int64(10*time.Minute/time.Millisecond) app := storage.Appender(context.Background()) @@ -3612,7 +3633,7 @@ func TestNativeHistogram_SubOperator(t *testing.T) { for _, c := range cases { for _, floatHisto := range []bool{true, false} { t.Run(fmt.Sprintf("floatHistogram=%t %d", floatHisto, idx0), func(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) storage := teststorage.New(t) t.Cleanup(func() { storage.Close() }) @@ -3773,7 +3794,7 @@ func TestNativeHistogram_MulDivOperator(t *testing.T) { seriesName := "sparse_histogram_series" floatSeriesName := "float_series" - engine := newTestEngine() + engine := newTestEngine(t) ts := idx0 * int64(10*time.Minute/time.Millisecond) app := storage.Appender(context.Background()) @@ -3896,7 +3917,7 @@ metric 0 1 2 for _, c := range cases { c := c t.Run(c.name, func(t *testing.T) { - engine := promqltest.NewTestEngine(false, c.engineLookback, promqltest.DefaultMaxSamplesPerQuery) + engine := promqltest.NewTestEngine(t, false, c.engineLookback, promqltest.DefaultMaxSamplesPerQuery) storage := promqltest.LoadedStorage(t, load) t.Cleanup(func() { storage.Close() }) diff --git a/promql/functions_test.go b/promql/functions_test.go index aef59c837..ce62d316b 100644 --- a/promql/functions_test.go +++ b/promql/functions_test.go @@ -40,6 +40,9 @@ func TestDeriv(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) a := storage.Appender(context.Background()) diff --git a/promql/promql_test.go b/promql/promql_test.go index 7bafc02e3..0106a1e10 100644 --- a/promql/promql_test.go +++ b/promql/promql_test.go @@ -27,12 +27,12 @@ import ( "github.com/prometheus/prometheus/util/teststorage" ) -func newTestEngine() *promql.Engine { - return promqltest.NewTestEngine(false, 0, promqltest.DefaultMaxSamplesPerQuery) +func newTestEngine(t *testing.T) *promql.Engine { + return promqltest.NewTestEngine(t, false, 0, promqltest.DefaultMaxSamplesPerQuery) } func TestEvaluations(t *testing.T) { - promqltest.RunBuiltinTests(t, newTestEngine()) + promqltest.RunBuiltinTests(t, newTestEngine(t)) } // Run a lot of queries at the same time, to check for race conditions. @@ -46,6 +46,9 @@ func TestConcurrentRangeQueries(t *testing.T) { Timeout: 100 * time.Second, } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) const interval = 10000 // 10s interval. // A day of data plus 10k steps. diff --git a/promql/promqltest/test.go b/promql/promqltest/test.go index 1affd91f6..29908acd9 100644 --- a/promql/promqltest/test.go +++ b/promql/promqltest/test.go @@ -72,8 +72,8 @@ func LoadedStorage(t testutil.T, input string) *teststorage.TestStorage { return test.storage } -func NewTestEngine(enablePerStepStats bool, lookbackDelta time.Duration, maxSamples int) *promql.Engine { - return promql.NewEngine(promql.EngineOpts{ +func NewTestEngine(t *testing.T, enablePerStepStats bool, lookbackDelta time.Duration, maxSamples int) *promql.Engine { + ng := promql.NewEngine(promql.EngineOpts{ Logger: nil, Reg: nil, MaxSamples: maxSamples, @@ -84,6 +84,10 @@ func NewTestEngine(enablePerStepStats bool, lookbackDelta time.Duration, maxSamp EnablePerStepStats: enablePerStepStats, LookbackDelta: lookbackDelta, }) + t.Cleanup(func() { + require.NoError(t, ng.Close()) + }) + return ng } // RunBuiltinTests runs an acceptance test suite against the provided engine. @@ -1073,7 +1077,11 @@ func (ll *LazyLoader) Storage() storage.Storage { // Close closes resources associated with the LazyLoader. func (ll *LazyLoader) Close() error { ll.cancelCtx() - return ll.storage.Close() + err := ll.queryEngine.Close() + if sErr := ll.storage.Close(); sErr != nil { + return errors.Join(sErr, err) + } + return err } func makeInt64Pointer(val int64) *int64 { diff --git a/promql/promqltest/test_test.go b/promql/promqltest/test_test.go index f6fe38707..4fd3c4000 100644 --- a/promql/promqltest/test_test.go +++ b/promql/promqltest/test_test.go @@ -451,7 +451,7 @@ eval range from 0 to 5m step 5m testmetric for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - err := runTest(t, testCase.input, NewTestEngine(false, 0, DefaultMaxSamplesPerQuery)) + err := runTest(t, testCase.input, NewTestEngine(t, false, 0, DefaultMaxSamplesPerQuery)) if testCase.expectedError == "" { require.NoError(t, err) diff --git a/rules/alerting_test.go b/rules/alerting_test.go index a9315b47e..ee19d2f25 100644 --- a/rules/alerting_test.go +++ b/rules/alerting_test.go @@ -36,16 +36,23 @@ import ( "github.com/prometheus/prometheus/util/testutil" ) -var testEngine = promql.NewEngine(promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: 10000, - Timeout: 100 * time.Second, - NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 }, - EnableAtModifier: true, - EnableNegativeOffset: true, - EnablePerStepStats: true, -}) +func testEngine(tb testing.TB) *promql.Engine { + tb.Helper() + e := promql.NewEngine(promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 10000, + Timeout: 100 * time.Second, + NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 }, + EnableAtModifier: true, + EnableNegativeOffset: true, + EnablePerStepStats: true, + }) + tb.Cleanup(func() { + require.NoError(tb, e.Close()) + }) + return e +} func TestAlertingRuleState(t *testing.T) { tests := []struct { @@ -225,12 +232,14 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) { }, } + ng := testEngine(t) + baseTime := time.Unix(0, 0) for i, result := range results { t.Logf("case %d", i) evalTime := baseTime.Add(time.Duration(i) * time.Minute) result[0].T = timestamp.FromTime(evalTime) - res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. @@ -247,7 +256,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) { testutil.RequireEqual(t, result, filteredRes) } evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute) - res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) require.Empty(t, res) } @@ -309,13 +318,15 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) { }, } + ng := testEngine(t) + evalTime := time.Unix(0, 0) result[0].T = timestamp.FromTime(evalTime) result[1].T = timestamp.FromTime(evalTime) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. res, err := ruleWithoutExternalLabels.Eval( - context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -329,7 +340,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) { } res, err = ruleWithExternalLabels.Eval( - context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -406,9 +417,11 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) { result[0].T = timestamp.FromTime(evalTime) result[1].T = timestamp.FromTime(evalTime) + ng := testEngine(t) + var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. res, err := ruleWithoutExternalURL.Eval( - context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -422,7 +435,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) { } res, err = ruleWithExternalURL.Eval( - context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -475,9 +488,11 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) { evalTime := time.Unix(0, 0) result[0].T = timestamp.FromTime(evalTime) + ng := testEngine(t) + var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. res, err := rule.Eval( - context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -520,6 +535,8 @@ instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }}; ) evalTime := time.Unix(0, 0) + ng := testEngine(t) + startQueryCh := make(chan struct{}) getDoneCh := make(chan struct{}) slowQueryFunc := func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { @@ -533,7 +550,7 @@ instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }}; require.Fail(t, "unexpected blocking when template expanding.") } } - return EngineQueryFunc(testEngine, storage)(ctx, q, ts) + return EngineQueryFunc(ng, storage)(ctx, q, ts) } go func() { <-startQueryCh @@ -579,6 +596,9 @@ func TestAlertingRuleDuplicate(t *testing.T) { } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -642,9 +662,9 @@ func TestAlertingRuleLimit(t *testing.T) { ) evalTime := time.Unix(0, 0) - + ng := testEngine(t) for _, test := range tests { - switch _, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); { + switch _, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, test.limit); { case err != nil: require.EqualError(t, err, test.err) case test.err != "": @@ -866,12 +886,13 @@ func TestKeepFiringFor(t *testing.T) { }, } + ng := testEngine(t) baseTime := time.Unix(0, 0) for i, result := range results { t.Logf("case %d", i) evalTime := baseTime.Add(time.Duration(i) * time.Minute) result[0].T = timestamp.FromTime(evalTime) - res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. @@ -888,7 +909,7 @@ func TestKeepFiringFor(t *testing.T) { testutil.RequireEqual(t, result, filteredRes) } evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute) - res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) require.Empty(t, res) } @@ -923,9 +944,10 @@ func TestPendingAndKeepFiringFor(t *testing.T) { F: 1, } + ng := testEngine(t) baseTime := time.Unix(0, 0) result.T = timestamp.FromTime(baseTime) - res, err := rule.Eval(context.TODO(), baseTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), baseTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) require.Len(t, res, 2) @@ -940,7 +962,7 @@ func TestPendingAndKeepFiringFor(t *testing.T) { } evalTime := baseTime.Add(time.Minute) - res, err = rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err = rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) require.Empty(t, res) } diff --git a/rules/manager_test.go b/rules/manager_test.go index 2f7343ebb..5ec142c37 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -157,12 +157,13 @@ func TestAlertingRule(t *testing.T) { }, } + ng := testEngine(t) for i, test := range tests { t.Logf("case %d", i) evalTime := baseTime.Add(test.time) - res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. @@ -296,6 +297,7 @@ func TestForStateAddSamples(t *testing.T) { }, } + ng := testEngine(t) var forState float64 for i, test := range tests { t.Logf("case %d", i) @@ -308,7 +310,7 @@ func TestForStateAddSamples(t *testing.T) { forState = float64(value.StaleNaN) } - res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS' samples. @@ -359,8 +361,9 @@ func TestForStateRestore(t *testing.T) { expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`) require.NoError(t, err) + ng := testEngine(t) opts := &ManagerOptions{ - QueryFunc: EngineQueryFunc(testEngine, storage), + QueryFunc: EngineQueryFunc(ng, storage), Appendable: storage, Queryable: storage, Context: context.Background(), @@ -528,6 +531,9 @@ func TestStaleness(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(engineOpts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) opts := &ManagerOptions{ QueryFunc: EngineQueryFunc(engine, st), Appendable: st, @@ -720,6 +726,9 @@ func TestUpdate(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) ruleManager := NewManager(&ManagerOptions{ Appendable: st, Queryable: st, @@ -858,6 +867,9 @@ func TestNotify(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(engineOpts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) var lastNotified []*Alert notifyFunc := func(ctx context.Context, expr string, alerts ...*Alert) { lastNotified = alerts @@ -933,6 +945,9 @@ func TestMetricsUpdate(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) ruleManager := NewManager(&ManagerOptions{ Appendable: storage, Queryable: storage, @@ -1007,6 +1022,9 @@ func TestGroupStalenessOnRemoval(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) ruleManager := NewManager(&ManagerOptions{ Appendable: storage, Queryable: storage, @@ -1084,6 +1102,9 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) ruleManager := NewManager(&ManagerOptions{ Appendable: storage, Queryable: storage, @@ -1186,6 +1207,9 @@ func TestRuleHealthUpdates(t *testing.T) { Timeout: 10 * time.Second, } engine := promql.NewEngine(engineOpts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) opts := &ManagerOptions{ QueryFunc: EngineQueryFunc(engine, st), Appendable: st, @@ -1282,9 +1306,10 @@ func TestRuleGroupEvalIterationFunc(t *testing.T) { }, } + ng := testEngine(t) testFunc := func(tst testInput) { opts := &ManagerOptions{ - QueryFunc: EngineQueryFunc(testEngine, storage), + QueryFunc: EngineQueryFunc(ng, storage), Appendable: storage, Queryable: storage, Context: context.Background(), @@ -1368,8 +1393,9 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) { } require.NoError(t, app.Commit()) + ng := testEngine(t) opts := &ManagerOptions{ - QueryFunc: EngineQueryFunc(testEngine, storage), + QueryFunc: EngineQueryFunc(ng, storage), Appendable: storage, Queryable: storage, Context: context.Background(), diff --git a/rules/recording_test.go b/rules/recording_test.go index 49f37b1ac..4c2288e6f 100644 --- a/rules/recording_test.go +++ b/rules/recording_test.go @@ -123,10 +123,11 @@ func TestRuleEval(t *testing.T) { storage := setUpRuleEvalTest(t) t.Cleanup(func() { storage.Close() }) + ng := testEngine(t) for _, scenario := range ruleEvalTestScenarios { t.Run(scenario.name, func(t *testing.T) { rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels) - result, err := rule.Eval(context.TODO(), ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0) + result, err := rule.Eval(context.TODO(), ruleEvaluationTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) testutil.RequireEqual(t, scenario.expected, result) }) @@ -137,6 +138,7 @@ func BenchmarkRuleEval(b *testing.B) { storage := setUpRuleEvalTest(b) b.Cleanup(func() { storage.Close() }) + ng := testEngine(b) for _, scenario := range ruleEvalTestScenarios { b.Run(scenario.name, func(b *testing.B) { rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels) @@ -144,7 +146,7 @@ func BenchmarkRuleEval(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := rule.Eval(context.TODO(), ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0) + _, err := rule.Eval(context.TODO(), ruleEvaluationTime, EngineQueryFunc(ng, storage), nil, 0) if err != nil { require.NoError(b, err) } @@ -166,6 +168,9 @@ func TestRuleEvalDuplicate(t *testing.T) { } engine := promql.NewEngine(opts) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -212,10 +217,11 @@ func TestRecordingRuleLimit(t *testing.T) { labels.FromStrings("test", "test"), ) + ng := testEngine(t) evalTime := time.Unix(0, 0) for _, test := range tests { - switch _, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); { + switch _, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, test.limit); { case err != nil: require.EqualError(t, err, test.err) case test.err != "": diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index b30890893..3fe5a092f 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -59,16 +59,25 @@ import ( "github.com/prometheus/prometheus/util/teststorage" ) -var testEngine = promql.NewEngine(promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: 10000, - Timeout: 100 * time.Second, - NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 }, - EnableAtModifier: true, - EnableNegativeOffset: true, - EnablePerStepStats: true, -}) +func testEngine(t *testing.T) *promql.Engine { + t.Helper() + + ng := promql.NewEngine(promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 10000, + Timeout: 100 * time.Second, + NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 }, + EnableAtModifier: true, + EnableNegativeOffset: true, + EnablePerStepStats: true, + }) + t.Cleanup(func() { + require.NoError(t, ng.Close()) + }) + + return ng +} // testMetaStore satisfies the scrape.MetricMetadataStore interface. // It is used to inject specific metadata as part of a test case. @@ -283,6 +292,9 @@ func (m *rulesRetrieverMock) CreateRuleGroups() { } engine := promql.NewEngine(engineOpts) + m.testing.Cleanup(func() { + require.NoError(m.testing, engine.Close()) + }) opts := &rules.ManagerOptions{ QueryFunc: rules.EngineQueryFunc(engine, storage), Appendable: storage, @@ -403,9 +415,10 @@ func TestEndpoints(t *testing.T) { now := time.Now() + ng := testEngine(t) + t.Run("local", func(t *testing.T) { - algr := rulesRetrieverMock{} - algr.testing = t + algr := rulesRetrieverMock{testing: t} algr.CreateAlertingRules() algr.CreateRuleGroups() @@ -417,7 +430,7 @@ func TestEndpoints(t *testing.T) { api := &API{ Queryable: storage, - QueryEngine: testEngine, + QueryEngine: ng, ExemplarQueryable: storage.ExemplarQueryable(), targetRetriever: testTargetRetriever.toFactory(), alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(), @@ -468,8 +481,7 @@ func TestEndpoints(t *testing.T) { }) require.NoError(t, err) - algr := rulesRetrieverMock{} - algr.testing = t + algr := rulesRetrieverMock{testing: t} algr.CreateAlertingRules() algr.CreateRuleGroups() @@ -481,7 +493,7 @@ func TestEndpoints(t *testing.T) { api := &API{ Queryable: remote, - QueryEngine: testEngine, + QueryEngine: ng, ExemplarQueryable: storage.ExemplarQueryable(), targetRetriever: testTargetRetriever.toFactory(), alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(), @@ -623,7 +635,7 @@ func TestQueryExemplars(t *testing.T) { api := &API{ Queryable: storage, - QueryEngine: testEngine, + QueryEngine: testEngine(t), ExemplarQueryable: storage.ExemplarQueryable(), } @@ -831,7 +843,7 @@ func TestStats(t *testing.T) { api := &API{ Queryable: storage, - QueryEngine: testEngine, + QueryEngine: testEngine(t), now: func() time.Time { return time.Unix(123, 0) }, diff --git a/web/api/v1/errors_test.go b/web/api/v1/errors_test.go index e76a1a3d3..5ddd47e1b 100644 --- a/web/api/v1/errors_test.go +++ b/web/api/v1/errors_test.go @@ -86,7 +86,7 @@ func TestApiStatusCodes(t *testing.T) { "error from seriesset": errorTestQueryable{q: errorTestQuerier{s: errorTestSeriesSet{err: tc.err}}}, } { t.Run(fmt.Sprintf("%s/%s", name, k), func(t *testing.T) { - r := createPrometheusAPI(q) + r := createPrometheusAPI(t, q) rec := httptest.NewRecorder() req := httptest.NewRequest(http.MethodGet, "/api/v1/query?query=up", nil) @@ -100,7 +100,9 @@ func TestApiStatusCodes(t *testing.T) { } } -func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router { +func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route.Router { + t.Helper() + engine := promql.NewEngine(promql.EngineOpts{ Logger: log.NewNopLogger(), Reg: nil, @@ -108,6 +110,9 @@ func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router { MaxSamples: 100, Timeout: 5 * time.Second, }) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) api := NewAPI( engine,