Merge pull request #12269 from yeya24/add-ctx-engine-interface

Add ctx to QueryEngine interface
pull/12252/head
Julien Pivotto 2023-04-18 11:31:31 +02:00 committed by GitHub
commit bb217dded8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 46 additions and 41 deletions

View File

@ -434,7 +434,7 @@ func (tg *testGroup) maxEvalTime() time.Duration {
} }
func query(ctx context.Context, qs string, t time.Time, engine *promql.Engine, qu storage.Queryable) (promql.Vector, error) { func query(ctx context.Context, qs string, t time.Time, engine *promql.Engine, qu storage.Queryable) (promql.Vector, error) {
q, err := engine.NewInstantQuery(qu, nil, qs, t) q, err := engine.NewInstantQuery(ctx, qu, nil, qs, t)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -240,16 +240,17 @@ func BenchmarkRangeQuery(b *testing.B) {
for _, c := range cases { for _, c := range cases {
name := fmt.Sprintf("expr=%s,steps=%d", c.expr, c.steps) name := fmt.Sprintf("expr=%s,steps=%d", c.expr, c.steps)
b.Run(name, func(b *testing.B) { b.Run(name, func(b *testing.B) {
ctx := context.Background()
b.ReportAllocs() b.ReportAllocs()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
qry, err := engine.NewRangeQuery( qry, err := engine.NewRangeQuery(
stor, nil, c.expr, ctx, stor, nil, c.expr,
time.Unix(int64((numIntervals-c.steps)*10), 0), time.Unix(int64((numIntervals-c.steps)*10), 0),
time.Unix(int64(numIntervals*10), 0), time.Second*10) time.Unix(int64(numIntervals*10), 0), time.Second*10)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
res := qry.Exec(context.Background()) res := qry.Exec(ctx)
if res.Err != nil { if res.Err != nil {
b.Fatal(res.Err) b.Fatal(res.Err)
} }

View File

@ -400,7 +400,7 @@ func (ng *Engine) SetQueryLogger(l QueryLogger) {
} }
// NewInstantQuery returns an evaluation query for the given expression at the given time. // NewInstantQuery returns an evaluation query for the given expression at the given time.
func (ng *Engine) NewInstantQuery(q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (Query, error) { func (ng *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (Query, error) {
expr, err := parser.ParseExpr(qs) expr, err := parser.ParseExpr(qs)
if err != nil { if err != nil {
return nil, err return nil, err
@ -416,7 +416,7 @@ func (ng *Engine) NewInstantQuery(q storage.Queryable, opts *QueryOpts, qs strin
// NewRangeQuery returns an evaluation query for the given time range and with // NewRangeQuery returns an evaluation query for the given time range and with
// the resolution set by the interval. // the resolution set by the interval.
func (ng *Engine) NewRangeQuery(q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error) { func (ng *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error) {
expr, err := parser.ParseExpr(qs) expr, err := parser.ParseExpr(qs)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -231,14 +231,14 @@ func TestQueryError(t *testing.T) {
ctx, cancelCtx := context.WithCancel(context.Background()) ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx() defer cancelCtx()
vectorQuery, err := engine.NewInstantQuery(queryable, nil, "foo", time.Unix(1, 0)) vectorQuery, err := engine.NewInstantQuery(ctx, queryable, nil, "foo", time.Unix(1, 0))
require.NoError(t, err) require.NoError(t, err)
res := vectorQuery.Exec(ctx) res := vectorQuery.Exec(ctx)
require.Error(t, res.Err, "expected error on failed select but got none") require.Error(t, res.Err, "expected error on failed select but got none")
require.True(t, errors.Is(res.Err, errStorage), "expected error doesn't match") require.True(t, errors.Is(res.Err, errStorage), "expected error doesn't match")
matrixQuery, err := engine.NewInstantQuery(queryable, nil, "foo[1m]", time.Unix(1, 0)) matrixQuery, err := engine.NewInstantQuery(ctx, queryable, nil, "foo[1m]", time.Unix(1, 0))
require.NoError(t, err) require.NoError(t, err)
res = matrixQuery.Exec(ctx) res = matrixQuery.Exec(ctx)
@ -564,14 +564,15 @@ func TestSelectHintsSetCorrectly(t *testing.T) {
query Query query Query
err error err error
) )
ctx := context.Background()
if tc.end == 0 { if tc.end == 0 {
query, err = engine.NewInstantQuery(hintsRecorder, nil, tc.query, timestamp.Time(tc.start)) query, err = engine.NewInstantQuery(ctx, hintsRecorder, nil, tc.query, timestamp.Time(tc.start))
} else { } else {
query, err = engine.NewRangeQuery(hintsRecorder, nil, tc.query, timestamp.Time(tc.start), timestamp.Time(tc.end), time.Second) query, err = engine.NewRangeQuery(ctx, hintsRecorder, nil, tc.query, timestamp.Time(tc.start), timestamp.Time(tc.end), time.Second)
} }
require.NoError(t, err) require.NoError(t, err)
res := query.Exec(context.Background()) res := query.Exec(ctx)
require.NoError(t, res.Err) require.NoError(t, res.Err)
require.Equal(t, tc.expected, hintsRecorder.hints) require.Equal(t, tc.expected, hintsRecorder.hints)
@ -727,9 +728,9 @@ load 10s
var err error var err error
var qry Query var qry Query
if c.Interval == 0 { if c.Interval == 0 {
qry, err = test.QueryEngine().NewInstantQuery(test.Queryable(), nil, c.Query, c.Start) qry, err = test.QueryEngine().NewInstantQuery(test.context, test.Queryable(), nil, c.Query, c.Start)
} else { } else {
qry, err = test.QueryEngine().NewRangeQuery(test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval) qry, err = test.QueryEngine().NewRangeQuery(test.context, test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval)
} }
require.NoError(t, err) require.NoError(t, err)
@ -1204,9 +1205,9 @@ load 10s
var err error var err error
var qry Query var qry Query
if c.Interval == 0 { if c.Interval == 0 {
qry, err = engine.NewInstantQuery(test.Queryable(), opts, c.Query, c.Start) qry, err = engine.NewInstantQuery(test.context, test.Queryable(), opts, c.Query, c.Start)
} else { } else {
qry, err = engine.NewRangeQuery(test.Queryable(), opts, c.Query, c.Start, c.End, c.Interval) qry, err = engine.NewRangeQuery(test.context, test.Queryable(), opts, c.Query, c.Start, c.End, c.Interval)
} }
require.NoError(t, err) require.NoError(t, err)
@ -1387,9 +1388,9 @@ load 10s
var err error var err error
var qry Query var qry Query
if c.Interval == 0 { if c.Interval == 0 {
qry, err = engine.NewInstantQuery(test.Queryable(), nil, c.Query, c.Start) qry, err = engine.NewInstantQuery(test.context, test.Queryable(), nil, c.Query, c.Start)
} else { } else {
qry, err = engine.NewRangeQuery(test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval) qry, err = engine.NewRangeQuery(test.context, test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval)
} }
require.NoError(t, err) require.NoError(t, err)
@ -1628,9 +1629,9 @@ load 1ms
var err error var err error
var qry Query var qry Query
if c.end == 0 { if c.end == 0 {
qry, err = test.QueryEngine().NewInstantQuery(test.Queryable(), nil, c.query, start) qry, err = test.QueryEngine().NewInstantQuery(test.context, test.Queryable(), nil, c.query, start)
} else { } else {
qry, err = test.QueryEngine().NewRangeQuery(test.Queryable(), nil, c.query, start, end, interval) qry, err = test.QueryEngine().NewRangeQuery(test.context, test.Queryable(), nil, c.query, start, end, interval)
} }
require.NoError(t, err) require.NoError(t, err)
@ -1961,7 +1962,7 @@ func TestSubquerySelector(t *testing.T) {
engine := test.QueryEngine() engine := test.QueryEngine()
for _, c := range tst.cases { for _, c := range tst.cases {
t.Run(c.Query, func(t *testing.T) { t.Run(c.Query, func(t *testing.T) {
qry, err := engine.NewInstantQuery(test.Queryable(), nil, c.Query, c.Start) qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, c.Query, c.Start)
require.NoError(t, err) require.NoError(t, err)
res := qry.Exec(test.Context()) res := qry.Exec(test.Context())
@ -2909,6 +2910,7 @@ func TestPreprocessAndWrapWithStepInvariantExpr(t *testing.T) {
} }
func TestEngineOptsValidation(t *testing.T) { func TestEngineOptsValidation(t *testing.T) {
ctx := context.Background()
cases := []struct { cases := []struct {
opts EngineOpts opts EngineOpts
query string query string
@ -2968,8 +2970,8 @@ func TestEngineOptsValidation(t *testing.T) {
for _, c := range cases { for _, c := range cases {
eng := NewEngine(c.opts) eng := NewEngine(c.opts)
_, err1 := eng.NewInstantQuery(nil, nil, c.query, time.Unix(10, 0)) _, err1 := eng.NewInstantQuery(ctx, nil, nil, c.query, time.Unix(10, 0))
_, err2 := eng.NewRangeQuery(nil, nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second) _, err2 := eng.NewRangeQuery(ctx, nil, nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second)
if c.fail { if c.fail {
require.Equal(t, c.expError, err1) require.Equal(t, c.expError, err1)
require.Equal(t, c.expError, err2) require.Equal(t, c.expError, err2)
@ -3116,7 +3118,7 @@ func TestRangeQuery(t *testing.T) {
err = test.Run() err = test.Run()
require.NoError(t, err) require.NoError(t, err)
qry, err := test.QueryEngine().NewRangeQuery(test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval) qry, err := test.QueryEngine().NewRangeQuery(test.context, test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval)
require.NoError(t, err) require.NoError(t, err)
res := qry.Exec(test.Context()) res := qry.Exec(test.Context())
@ -3147,7 +3149,7 @@ func TestNativeHistogramRate(t *testing.T) {
engine := test.QueryEngine() engine := test.QueryEngine()
queryString := fmt.Sprintf("rate(%s[1m])", seriesName) queryString := fmt.Sprintf("rate(%s[1m])", seriesName)
qry, err := engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond))) qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond)))
require.NoError(t, err) require.NoError(t, err)
res := qry.Exec(test.Context()) res := qry.Exec(test.Context())
require.NoError(t, res.Err) require.NoError(t, res.Err)
@ -3191,7 +3193,7 @@ func TestNativeFloatHistogramRate(t *testing.T) {
engine := test.QueryEngine() engine := test.QueryEngine()
queryString := fmt.Sprintf("rate(%s[1m])", seriesName) queryString := fmt.Sprintf("rate(%s[1m])", seriesName)
qry, err := engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond))) qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond)))
require.NoError(t, err) require.NoError(t, err)
res := qry.Exec(test.Context()) res := qry.Exec(test.Context())
require.NoError(t, res.Err) require.NoError(t, res.Err)
@ -3255,7 +3257,7 @@ func TestNativeHistogram_HistogramCountAndSum(t *testing.T) {
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
queryString := fmt.Sprintf("histogram_count(%s)", seriesName) queryString := fmt.Sprintf("histogram_count(%s)", seriesName)
qry, err := engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(ts)) qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts))
require.NoError(t, err) require.NoError(t, err)
res := qry.Exec(test.Context()) res := qry.Exec(test.Context())
@ -3273,7 +3275,7 @@ func TestNativeHistogram_HistogramCountAndSum(t *testing.T) {
} }
queryString = fmt.Sprintf("histogram_sum(%s)", seriesName) queryString = fmt.Sprintf("histogram_sum(%s)", seriesName)
qry, err = engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(ts)) qry, err = engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts))
require.NoError(t, err) require.NoError(t, err)
res = qry.Exec(test.Context()) res = qry.Exec(test.Context())
@ -3509,7 +3511,7 @@ func TestNativeHistogram_HistogramQuantile(t *testing.T) {
for j, sc := range c.subCases { for j, sc := range c.subCases {
t.Run(fmt.Sprintf("%d %s", j, sc.quantile), func(t *testing.T) { t.Run(fmt.Sprintf("%d %s", j, sc.quantile), func(t *testing.T) {
queryString := fmt.Sprintf("histogram_quantile(%s, %s)", sc.quantile, seriesName) queryString := fmt.Sprintf("histogram_quantile(%s, %s)", sc.quantile, seriesName)
qry, err := engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(ts)) qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts))
require.NoError(t, err) require.NoError(t, err)
res := qry.Exec(test.Context()) res := qry.Exec(test.Context())
@ -3940,7 +3942,7 @@ func TestNativeHistogram_HistogramFraction(t *testing.T) {
for j, sc := range c.subCases { for j, sc := range c.subCases {
t.Run(fmt.Sprintf("%d %s %s", j, sc.lower, sc.upper), func(t *testing.T) { t.Run(fmt.Sprintf("%d %s %s", j, sc.lower, sc.upper), func(t *testing.T) {
queryString := fmt.Sprintf("histogram_fraction(%s, %s, %s)", sc.lower, sc.upper, seriesName) queryString := fmt.Sprintf("histogram_fraction(%s, %s, %s)", sc.lower, sc.upper, seriesName)
qry, err := engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(ts)) qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts))
require.NoError(t, err) require.NoError(t, err)
res := qry.Exec(test.Context()) res := qry.Exec(test.Context())
@ -4077,7 +4079,7 @@ func TestNativeHistogram_Sum_Count_AddOperator(t *testing.T) {
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
queryAndCheck := func(queryString string, exp Vector) { queryAndCheck := func(queryString string, exp Vector) {
qry, err := engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(ts)) qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts))
require.NoError(t, err) require.NoError(t, err)
res := qry.Exec(test.Context()) res := qry.Exec(test.Context())
@ -4186,7 +4188,7 @@ metric 0 1 2
opts := &QueryOpts{ opts := &QueryOpts{
LookbackDelta: c.queryLookback, LookbackDelta: c.queryLookback,
} }
qry, err := eng.NewInstantQuery(test.Queryable(), opts, query, c.ts) qry, err := eng.NewInstantQuery(test.context, test.Queryable(), opts, query, c.ts)
require.NoError(t, err) require.NoError(t, err)
res := qry.Exec(test.Context()) res := qry.Exec(test.Context())

View File

@ -56,10 +56,11 @@ func TestDeriv(t *testing.T) {
require.NoError(t, a.Commit()) require.NoError(t, a.Commit())
query, err := engine.NewInstantQuery(storage, nil, "deriv(foo[30m])", timestamp.Time(1493712846939)) ctx := context.Background()
query, err := engine.NewInstantQuery(ctx, storage, nil, "deriv(foo[30m])", timestamp.Time(1493712846939))
require.NoError(t, err) require.NoError(t, err)
result := query.Exec(context.Background()) result := query.Exec(ctx)
require.NoError(t, result.Err) require.NoError(t, result.Err)
vec, _ := result.Vector() vec, _ := result.Vector()

View File

@ -81,14 +81,15 @@ func TestConcurrentRangeQueries(t *testing.T) {
defer func() { defer func() {
sem <- struct{}{} sem <- struct{}{}
}() }()
ctx := context.Background()
qry, err := engine.NewRangeQuery( qry, err := engine.NewRangeQuery(
stor, nil, c.expr, ctx, stor, nil, c.expr,
time.Unix(int64((numIntervals-c.steps)*10), 0), time.Unix(int64((numIntervals-c.steps)*10), 0),
time.Unix(int64(numIntervals*10), 0), time.Second*10) time.Unix(int64(numIntervals*10), 0), time.Second*10)
if err != nil { if err != nil {
return err return err
} }
res := qry.Exec(context.Background()) res := qry.Exec(ctx)
if res.Err != nil { if res.Err != nil {
t.Logf("Query: %q, steps: %d, result: %s", c.expr, c.steps, res.Err) t.Logf("Query: %q, steps: %d, result: %s", c.expr, c.steps, res.Err)
return res.Err return res.Err

View File

@ -538,7 +538,7 @@ func (t *Test) exec(tc testCommand) error {
} }
queries = append([]atModifierTestCase{{expr: cmd.expr, evalTime: cmd.start}}, queries...) queries = append([]atModifierTestCase{{expr: cmd.expr, evalTime: cmd.start}}, queries...)
for _, iq := range queries { for _, iq := range queries {
q, err := t.QueryEngine().NewInstantQuery(t.storage, nil, iq.expr, iq.evalTime) q, err := t.QueryEngine().NewInstantQuery(t.context, t.storage, nil, iq.expr, iq.evalTime)
if err != nil { if err != nil {
return err return err
} }
@ -560,7 +560,7 @@ func (t *Test) exec(tc testCommand) error {
// Check query returns same result in range mode, // Check query returns same result in range mode,
// by checking against the middle step. // by checking against the middle step.
q, err = t.queryEngine.NewRangeQuery(t.storage, nil, iq.expr, iq.evalTime.Add(-time.Minute), iq.evalTime.Add(time.Minute), time.Minute) q, err = t.queryEngine.NewRangeQuery(t.context, t.storage, nil, iq.expr, iq.evalTime.Add(-time.Minute), iq.evalTime.Add(time.Minute), time.Minute)
if err != nil { if err != nil {
return err return err
} }

View File

@ -189,7 +189,7 @@ type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector,
// It converts scalar into vector results. // It converts scalar into vector results.
func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc { func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
q, err := engine.NewInstantQuery(q, nil, qs, t) q, err := engine.NewInstantQuery(ctx, q, nil, qs, t)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -177,8 +177,8 @@ type TSDBAdminStats interface {
// QueryEngine defines the interface for the *promql.Engine, so it can be replaced, wrapped or mocked. // QueryEngine defines the interface for the *promql.Engine, so it can be replaced, wrapped or mocked.
type QueryEngine interface { type QueryEngine interface {
SetQueryLogger(l promql.QueryLogger) SetQueryLogger(l promql.QueryLogger)
NewInstantQuery(q storage.Queryable, opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) NewInstantQuery(ctx context.Context, q storage.Queryable, opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error)
NewRangeQuery(q storage.Queryable, opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) NewRangeQuery(ctx context.Context, q storage.Queryable, opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error)
} }
// API can register a set of endpoints in a router and handle // API can register a set of endpoints in a router and handle
@ -417,7 +417,7 @@ func (api *API) query(r *http.Request) (result apiFuncResult) {
if err != nil { if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
qry, err := api.QueryEngine.NewInstantQuery(api.Queryable, opts, r.FormValue("query"), ts) qry, err := api.QueryEngine.NewInstantQuery(ctx, api.Queryable, opts, r.FormValue("query"), ts)
if err != nil { if err != nil {
return invalidParamError(err, "query") return invalidParamError(err, "query")
} }
@ -520,7 +520,7 @@ func (api *API) queryRange(r *http.Request) (result apiFuncResult) {
if err != nil { if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
qry, err := api.QueryEngine.NewRangeQuery(api.Queryable, opts, r.FormValue("query"), start, end, step) qry, err := api.QueryEngine.NewRangeQuery(ctx, api.Queryable, opts, r.FormValue("query"), start, end, step)
if err != nil { if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }