From fd3630b9a3c7accecbf7fd438706c43612cfc568 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 17 Apr 2023 21:32:38 -0700 Subject: [PATCH] add ctx to QueryEngine interface Signed-off-by: Ben Ye --- cmd/promtool/unittest.go | 2 +- promql/bench_test.go | 5 ++-- promql/engine.go | 4 ++-- promql/engine_test.go | 52 +++++++++++++++++++++------------------- promql/functions_test.go | 5 ++-- promql/promql_test.go | 5 ++-- promql/test.go | 4 ++-- rules/manager.go | 2 +- web/api/v1/api.go | 8 +++---- 9 files changed, 46 insertions(+), 41 deletions(-) diff --git a/cmd/promtool/unittest.go b/cmd/promtool/unittest.go index b3e6f67f9..b9d8e0527 100644 --- a/cmd/promtool/unittest.go +++ b/cmd/promtool/unittest.go @@ -434,7 +434,7 @@ func (tg *testGroup) maxEvalTime() time.Duration { } func query(ctx context.Context, qs string, t time.Time, engine *promql.Engine, qu storage.Queryable) (promql.Vector, error) { - q, err := engine.NewInstantQuery(qu, nil, qs, t) + q, err := engine.NewInstantQuery(ctx, qu, nil, qs, t) if err != nil { return nil, err } diff --git a/promql/bench_test.go b/promql/bench_test.go index 88025d932..3d3588447 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -240,16 +240,17 @@ func BenchmarkRangeQuery(b *testing.B) { for _, c := range cases { name := fmt.Sprintf("expr=%s,steps=%d", c.expr, c.steps) b.Run(name, func(b *testing.B) { + ctx := context.Background() b.ReportAllocs() for i := 0; i < b.N; i++ { qry, err := engine.NewRangeQuery( - stor, nil, c.expr, + ctx, stor, nil, c.expr, time.Unix(int64((numIntervals-c.steps)*10), 0), time.Unix(int64(numIntervals*10), 0), time.Second*10) if err != nil { b.Fatal(err) } - res := qry.Exec(context.Background()) + res := qry.Exec(ctx) if res.Err != nil { b.Fatal(res.Err) } diff --git a/promql/engine.go b/promql/engine.go index b49be244f..fae4f0932 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -400,7 +400,7 @@ func (ng *Engine) SetQueryLogger(l QueryLogger) { } // NewInstantQuery returns an evaluation query for the given expression at the given time. -func (ng *Engine) NewInstantQuery(q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (Query, error) { +func (ng *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (Query, error) { expr, err := parser.ParseExpr(qs) if err != nil { return nil, err @@ -416,7 +416,7 @@ func (ng *Engine) NewInstantQuery(q storage.Queryable, opts *QueryOpts, qs strin // NewRangeQuery returns an evaluation query for the given time range and with // the resolution set by the interval. -func (ng *Engine) NewRangeQuery(q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error) { +func (ng *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error) { expr, err := parser.ParseExpr(qs) if err != nil { return nil, err diff --git a/promql/engine_test.go b/promql/engine_test.go index b64e32ba4..2941a2240 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -231,14 +231,14 @@ func TestQueryError(t *testing.T) { ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() - vectorQuery, err := engine.NewInstantQuery(queryable, nil, "foo", time.Unix(1, 0)) + vectorQuery, err := engine.NewInstantQuery(ctx, queryable, nil, "foo", time.Unix(1, 0)) require.NoError(t, err) res := vectorQuery.Exec(ctx) require.Error(t, res.Err, "expected error on failed select but got none") require.True(t, errors.Is(res.Err, errStorage), "expected error doesn't match") - matrixQuery, err := engine.NewInstantQuery(queryable, nil, "foo[1m]", time.Unix(1, 0)) + matrixQuery, err := engine.NewInstantQuery(ctx, queryable, nil, "foo[1m]", time.Unix(1, 0)) require.NoError(t, err) res = matrixQuery.Exec(ctx) @@ -564,14 +564,15 @@ func TestSelectHintsSetCorrectly(t *testing.T) { query Query err error ) + ctx := context.Background() if tc.end == 0 { - query, err = engine.NewInstantQuery(hintsRecorder, nil, tc.query, timestamp.Time(tc.start)) + query, err = engine.NewInstantQuery(ctx, hintsRecorder, nil, tc.query, timestamp.Time(tc.start)) } else { - query, err = engine.NewRangeQuery(hintsRecorder, nil, tc.query, timestamp.Time(tc.start), timestamp.Time(tc.end), time.Second) + query, err = engine.NewRangeQuery(ctx, hintsRecorder, nil, tc.query, timestamp.Time(tc.start), timestamp.Time(tc.end), time.Second) } require.NoError(t, err) - res := query.Exec(context.Background()) + res := query.Exec(ctx) require.NoError(t, res.Err) require.Equal(t, tc.expected, hintsRecorder.hints) @@ -727,9 +728,9 @@ load 10s var err error var qry Query if c.Interval == 0 { - qry, err = test.QueryEngine().NewInstantQuery(test.Queryable(), nil, c.Query, c.Start) + qry, err = test.QueryEngine().NewInstantQuery(test.context, test.Queryable(), nil, c.Query, c.Start) } else { - qry, err = test.QueryEngine().NewRangeQuery(test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval) + qry, err = test.QueryEngine().NewRangeQuery(test.context, test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval) } require.NoError(t, err) @@ -1204,9 +1205,9 @@ load 10s var err error var qry Query if c.Interval == 0 { - qry, err = engine.NewInstantQuery(test.Queryable(), opts, c.Query, c.Start) + qry, err = engine.NewInstantQuery(test.context, test.Queryable(), opts, c.Query, c.Start) } else { - qry, err = engine.NewRangeQuery(test.Queryable(), opts, c.Query, c.Start, c.End, c.Interval) + qry, err = engine.NewRangeQuery(test.context, test.Queryable(), opts, c.Query, c.Start, c.End, c.Interval) } require.NoError(t, err) @@ -1387,9 +1388,9 @@ load 10s var err error var qry Query if c.Interval == 0 { - qry, err = engine.NewInstantQuery(test.Queryable(), nil, c.Query, c.Start) + qry, err = engine.NewInstantQuery(test.context, test.Queryable(), nil, c.Query, c.Start) } else { - qry, err = engine.NewRangeQuery(test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval) + qry, err = engine.NewRangeQuery(test.context, test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval) } require.NoError(t, err) @@ -1628,9 +1629,9 @@ load 1ms var err error var qry Query if c.end == 0 { - qry, err = test.QueryEngine().NewInstantQuery(test.Queryable(), nil, c.query, start) + qry, err = test.QueryEngine().NewInstantQuery(test.context, test.Queryable(), nil, c.query, start) } else { - qry, err = test.QueryEngine().NewRangeQuery(test.Queryable(), nil, c.query, start, end, interval) + qry, err = test.QueryEngine().NewRangeQuery(test.context, test.Queryable(), nil, c.query, start, end, interval) } require.NoError(t, err) @@ -1961,7 +1962,7 @@ func TestSubquerySelector(t *testing.T) { engine := test.QueryEngine() for _, c := range tst.cases { t.Run(c.Query, func(t *testing.T) { - qry, err := engine.NewInstantQuery(test.Queryable(), nil, c.Query, c.Start) + qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, c.Query, c.Start) require.NoError(t, err) res := qry.Exec(test.Context()) @@ -2909,6 +2910,7 @@ func TestPreprocessAndWrapWithStepInvariantExpr(t *testing.T) { } func TestEngineOptsValidation(t *testing.T) { + ctx := context.Background() cases := []struct { opts EngineOpts query string @@ -2968,8 +2970,8 @@ func TestEngineOptsValidation(t *testing.T) { for _, c := range cases { eng := NewEngine(c.opts) - _, err1 := eng.NewInstantQuery(nil, nil, c.query, time.Unix(10, 0)) - _, err2 := eng.NewRangeQuery(nil, nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second) + _, err1 := eng.NewInstantQuery(ctx, nil, nil, c.query, time.Unix(10, 0)) + _, err2 := eng.NewRangeQuery(ctx, nil, nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second) if c.fail { require.Equal(t, c.expError, err1) require.Equal(t, c.expError, err2) @@ -3116,7 +3118,7 @@ func TestRangeQuery(t *testing.T) { err = test.Run() require.NoError(t, err) - qry, err := test.QueryEngine().NewRangeQuery(test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval) + qry, err := test.QueryEngine().NewRangeQuery(test.context, test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval) require.NoError(t, err) res := qry.Exec(test.Context()) @@ -3147,7 +3149,7 @@ func TestNativeHistogramRate(t *testing.T) { engine := test.QueryEngine() queryString := fmt.Sprintf("rate(%s[1m])", seriesName) - qry, err := engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond))) + qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond))) require.NoError(t, err) res := qry.Exec(test.Context()) require.NoError(t, res.Err) @@ -3191,7 +3193,7 @@ func TestNativeFloatHistogramRate(t *testing.T) { engine := test.QueryEngine() queryString := fmt.Sprintf("rate(%s[1m])", seriesName) - qry, err := engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond))) + qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond))) require.NoError(t, err) res := qry.Exec(test.Context()) require.NoError(t, res.Err) @@ -3255,7 +3257,7 @@ func TestNativeHistogram_HistogramCountAndSum(t *testing.T) { require.NoError(t, app.Commit()) queryString := fmt.Sprintf("histogram_count(%s)", seriesName) - qry, err := engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(ts)) + qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts)) require.NoError(t, err) res := qry.Exec(test.Context()) @@ -3273,7 +3275,7 @@ func TestNativeHistogram_HistogramCountAndSum(t *testing.T) { } queryString = fmt.Sprintf("histogram_sum(%s)", seriesName) - qry, err = engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(ts)) + qry, err = engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts)) require.NoError(t, err) res = qry.Exec(test.Context()) @@ -3509,7 +3511,7 @@ func TestNativeHistogram_HistogramQuantile(t *testing.T) { for j, sc := range c.subCases { t.Run(fmt.Sprintf("%d %s", j, sc.quantile), func(t *testing.T) { queryString := fmt.Sprintf("histogram_quantile(%s, %s)", sc.quantile, seriesName) - qry, err := engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(ts)) + qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts)) require.NoError(t, err) res := qry.Exec(test.Context()) @@ -3940,7 +3942,7 @@ func TestNativeHistogram_HistogramFraction(t *testing.T) { for j, sc := range c.subCases { t.Run(fmt.Sprintf("%d %s %s", j, sc.lower, sc.upper), func(t *testing.T) { queryString := fmt.Sprintf("histogram_fraction(%s, %s, %s)", sc.lower, sc.upper, seriesName) - qry, err := engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(ts)) + qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts)) require.NoError(t, err) res := qry.Exec(test.Context()) @@ -4077,7 +4079,7 @@ func TestNativeHistogram_Sum_Count_AddOperator(t *testing.T) { require.NoError(t, app.Commit()) queryAndCheck := func(queryString string, exp Vector) { - qry, err := engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(ts)) + qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts)) require.NoError(t, err) res := qry.Exec(test.Context()) @@ -4186,7 +4188,7 @@ metric 0 1 2 opts := &QueryOpts{ LookbackDelta: c.queryLookback, } - qry, err := eng.NewInstantQuery(test.Queryable(), opts, query, c.ts) + qry, err := eng.NewInstantQuery(test.context, test.Queryable(), opts, query, c.ts) require.NoError(t, err) res := qry.Exec(test.Context()) diff --git a/promql/functions_test.go b/promql/functions_test.go index e552424b3..8df5684b8 100644 --- a/promql/functions_test.go +++ b/promql/functions_test.go @@ -56,10 +56,11 @@ func TestDeriv(t *testing.T) { require.NoError(t, a.Commit()) - query, err := engine.NewInstantQuery(storage, nil, "deriv(foo[30m])", timestamp.Time(1493712846939)) + ctx := context.Background() + query, err := engine.NewInstantQuery(ctx, storage, nil, "deriv(foo[30m])", timestamp.Time(1493712846939)) require.NoError(t, err) - result := query.Exec(context.Background()) + result := query.Exec(ctx) require.NoError(t, result.Err) vec, _ := result.Vector() diff --git a/promql/promql_test.go b/promql/promql_test.go index 68bfa1d95..a07a0f5cb 100644 --- a/promql/promql_test.go +++ b/promql/promql_test.go @@ -81,14 +81,15 @@ func TestConcurrentRangeQueries(t *testing.T) { defer func() { sem <- struct{}{} }() + ctx := context.Background() qry, err := engine.NewRangeQuery( - stor, nil, c.expr, + ctx, stor, nil, c.expr, time.Unix(int64((numIntervals-c.steps)*10), 0), time.Unix(int64(numIntervals*10), 0), time.Second*10) if err != nil { return err } - res := qry.Exec(context.Background()) + res := qry.Exec(ctx) if res.Err != nil { t.Logf("Query: %q, steps: %d, result: %s", c.expr, c.steps, res.Err) return res.Err diff --git a/promql/test.go b/promql/test.go index 0b3958393..64fa66d5d 100644 --- a/promql/test.go +++ b/promql/test.go @@ -538,7 +538,7 @@ func (t *Test) exec(tc testCommand) error { } queries = append([]atModifierTestCase{{expr: cmd.expr, evalTime: cmd.start}}, queries...) for _, iq := range queries { - q, err := t.QueryEngine().NewInstantQuery(t.storage, nil, iq.expr, iq.evalTime) + q, err := t.QueryEngine().NewInstantQuery(t.context, t.storage, nil, iq.expr, iq.evalTime) if err != nil { return err } @@ -560,7 +560,7 @@ func (t *Test) exec(tc testCommand) error { // Check query returns same result in range mode, // by checking against the middle step. - q, err = t.queryEngine.NewRangeQuery(t.storage, nil, iq.expr, iq.evalTime.Add(-time.Minute), iq.evalTime.Add(time.Minute), time.Minute) + q, err = t.queryEngine.NewRangeQuery(t.context, t.storage, nil, iq.expr, iq.evalTime.Add(-time.Minute), iq.evalTime.Add(time.Minute), time.Minute) if err != nil { return err } diff --git a/rules/manager.go b/rules/manager.go index 07d50be1b..b6513e82d 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -189,7 +189,7 @@ type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector, // It converts scalar into vector results. func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc { return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { - q, err := engine.NewInstantQuery(q, nil, qs, t) + q, err := engine.NewInstantQuery(ctx, q, nil, qs, t) if err != nil { return nil, err } diff --git a/web/api/v1/api.go b/web/api/v1/api.go index aeea87ca7..dde814eb0 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -177,8 +177,8 @@ type TSDBAdminStats interface { // QueryEngine defines the interface for the *promql.Engine, so it can be replaced, wrapped or mocked. type QueryEngine interface { SetQueryLogger(l promql.QueryLogger) - NewInstantQuery(q storage.Queryable, opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) - NewRangeQuery(q storage.Queryable, opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) + NewInstantQuery(ctx context.Context, q storage.Queryable, opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) + NewRangeQuery(ctx context.Context, q storage.Queryable, opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) } // API can register a set of endpoints in a router and handle @@ -417,7 +417,7 @@ func (api *API) query(r *http.Request) (result apiFuncResult) { if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } - qry, err := api.QueryEngine.NewInstantQuery(api.Queryable, opts, r.FormValue("query"), ts) + qry, err := api.QueryEngine.NewInstantQuery(ctx, api.Queryable, opts, r.FormValue("query"), ts) if err != nil { return invalidParamError(err, "query") } @@ -520,7 +520,7 @@ func (api *API) queryRange(r *http.Request) (result apiFuncResult) { if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } - qry, err := api.QueryEngine.NewRangeQuery(api.Queryable, opts, r.FormValue("query"), start, end, step) + qry, err := api.QueryEngine.NewRangeQuery(ctx, api.Queryable, opts, r.FormValue("query"), start, end, step) if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} }