diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index bbb99e373..7d7bdc77d 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -26,6 +26,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/version" + "golang.org/x/net/context" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/promql" @@ -102,15 +104,17 @@ func Main() int { } var ( - notifier = notifier.New(&cfg.notifier) - targetManager = retrieval.NewTargetManager(sampleAppender) - queryEngine = promql.NewEngine(localStorage, &cfg.queryEngine) + notifier = notifier.New(&cfg.notifier) + targetManager = retrieval.NewTargetManager(sampleAppender) + queryEngine = promql.NewEngine(localStorage, &cfg.queryEngine) + queryCtx, cancelQueries = context.WithCancel(context.Background()) ) ruleManager := rules.NewManager(&rules.ManagerOptions{ SampleAppender: sampleAppender, Notifier: notifier, QueryEngine: queryEngine, + QueryCtx: queryCtx, ExternalURL: cfg.web.ExternalURL, }) @@ -128,7 +132,7 @@ func Main() int { GoVersion: version.GoVersion, } - webHandler := web.New(localStorage, queryEngine, targetManager, ruleManager, version, flags, &cfg.web) + webHandler := web.New(localStorage, queryEngine, queryCtx, targetManager, ruleManager, version, flags, &cfg.web) reloadables = append(reloadables, targetManager, ruleManager, webHandler, notifier) @@ -201,7 +205,7 @@ func Main() int { // Shutting down the query engine before the rule manager will cause pending queries // to be canceled and ensures a quick shutdown of the rule manager. - defer queryEngine.Stop() + defer cancelQueries() go webHandler.Run() diff --git a/promql/engine.go b/promql/engine.go index d9c33d413..184d41b3d 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -150,7 +150,7 @@ func (e ErrQueryCanceled) Error() string { return fmt.Sprintf("query was cancele // it is associated with. type Query interface { // Exec processes the query and - Exec() *Result + Exec(ctx context.Context) *Result // Statement returns the parsed statement of the query. Statement() Statement // Stats returns statistics about the lifetime of the query. @@ -192,8 +192,8 @@ func (q *query) Cancel() { } // Exec implements the Query interface. -func (q *query) Exec() *Result { - res, err := q.ng.exec(q) +func (q *query) Exec(ctx context.Context) *Result { + res, err := q.ng.exec(ctx, q) return &Result{Err: err, Value: res} } @@ -220,13 +220,8 @@ func contextDone(ctx context.Context, env string) error { type Engine struct { // The querier on which the engine operates. querier local.Querier - - // The base context for all queries and its cancellation function. - baseCtx context.Context - cancelQueries func() // The gate limiting the maximum number of concurrent and waiting queries. - gate *queryGate - + gate *queryGate options *EngineOptions } @@ -235,13 +230,10 @@ func NewEngine(querier local.Querier, o *EngineOptions) *Engine { if o == nil { o = DefaultEngineOptions } - ctx, cancel := context.WithCancel(context.Background()) return &Engine{ - querier: querier, - baseCtx: ctx, - cancelQueries: cancel, - gate: newQueryGate(o.MaxConcurrentQueries), - options: o, + querier: querier, + gate: newQueryGate(o.MaxConcurrentQueries), + options: o, } } @@ -257,11 +249,6 @@ var DefaultEngineOptions = &EngineOptions{ Timeout: 2 * time.Minute, } -// Stop the engine and cancel all running queries. -func (ng *Engine) Stop() { - ng.cancelQueries() -} - // NewInstantQuery returns an evaluation query for the given expression at the given time. func (ng *Engine) NewInstantQuery(qs string, ts model.Time) (Query, error) { expr, err := ParseExpr(qs) @@ -326,8 +313,8 @@ func (ng *Engine) newTestQuery(f func(context.Context) error) Query { // // At this point per query only one EvalStmt is evaluated. Alert and record // statements are not handled by the Engine. -func (ng *Engine) exec(q *query) (model.Value, error) { - ctx, cancel := context.WithTimeout(q.ng.baseCtx, ng.options.Timeout) +func (ng *Engine) exec(ctx context.Context, q *query) (model.Value, error) { + ctx, cancel := context.WithTimeout(ctx, ng.options.Timeout) q.cancel = cancel queueTimer := q.stats.GetTimer(stats.ExecQueueTime).Start() diff --git a/promql/engine_test.go b/promql/engine_test.go index 0064c180f..6aa63d284 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -23,7 +23,8 @@ import ( func TestQueryConcurrency(t *testing.T) { engine := NewEngine(nil, nil) - defer engine.Stop() + ctx, cancelQueries := context.WithCancel(context.Background()) + defer cancelQueries() block := make(chan struct{}) processing := make(chan struct{}) @@ -36,7 +37,7 @@ func TestQueryConcurrency(t *testing.T) { for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ { q := engine.newTestQuery(f) - go q.Exec() + go q.Exec(ctx) select { case <-processing: // Expected. @@ -46,7 +47,7 @@ func TestQueryConcurrency(t *testing.T) { } q := engine.newTestQuery(f) - go q.Exec() + go q.Exec(ctx) select { case <-processing: @@ -76,14 +77,15 @@ func TestQueryTimeout(t *testing.T) { Timeout: 5 * time.Millisecond, MaxConcurrentQueries: 20, }) - defer engine.Stop() + ctx, cancelQueries := context.WithCancel(context.Background()) + defer cancelQueries() query := engine.newTestQuery(func(ctx context.Context) error { time.Sleep(50 * time.Millisecond) return contextDone(ctx, "test statement execution") }) - res := query.Exec() + res := query.Exec(ctx) if res.Err == nil { t.Fatalf("expected timeout error but got none") } @@ -94,7 +96,8 @@ func TestQueryTimeout(t *testing.T) { func TestQueryCancel(t *testing.T) { engine := NewEngine(nil, nil) - defer engine.Stop() + ctx, cancelQueries := context.WithCancel(context.Background()) + defer cancelQueries() // Cancel a running query before it completes. block := make(chan struct{}) @@ -109,7 +112,7 @@ func TestQueryCancel(t *testing.T) { var res *Result go func() { - res = query1.Exec() + res = query1.Exec(ctx) processing <- struct{}{} }() @@ -131,14 +134,15 @@ func TestQueryCancel(t *testing.T) { }) query2.Cancel() - res = query2.Exec() + res = query2.Exec(ctx) if res.Err != nil { - t.Fatalf("unexpeceted error on executing query2: %s", res.Err) + t.Fatalf("unexpected error on executing query2: %s", res.Err) } } func TestEngineShutdown(t *testing.T) { engine := NewEngine(nil, nil) + ctx, cancelQueries := context.WithCancel(context.Background()) block := make(chan struct{}) processing := make(chan struct{}) @@ -158,12 +162,12 @@ func TestEngineShutdown(t *testing.T) { var res *Result go func() { - res = query1.Exec() + res = query1.Exec(ctx) processing <- struct{}{} }() <-processing - engine.Stop() + cancelQueries() block <- struct{}{} <-processing @@ -181,9 +185,9 @@ func TestEngineShutdown(t *testing.T) { // The second query is started after the engine shut down. It must // be canceled immediately. - res2 := query2.Exec() + res2 := query2.Exec(ctx) if res2.Err == nil { - t.Fatalf("expected error on querying shutdown engine but got none") + t.Fatalf("expected error on querying with canceled context but got none") } if _, ok := res2.Err.(ErrQueryCanceled); !ok { t.Fatalf("expected cancelation error, got %q", res2.Err) diff --git a/promql/test.go b/promql/test.go index 60acb9285..d7866239b 100644 --- a/promql/test.go +++ b/promql/test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/local" @@ -49,9 +50,11 @@ type Test struct { cmds []testCommand - storage local.Storage - closeStorage func() - queryEngine *Engine + storage local.Storage + closeStorage func() + queryEngine *Engine + queryCtx context.Context + cancelQueries context.CancelFunc } // NewTest returns an initialized empty Test. @@ -79,6 +82,11 @@ func (t *Test) QueryEngine() *Engine { return t.queryEngine } +// Context returns the test's query context. +func (t *Test) Context() context.Context { + return t.queryCtx +} + // Storage returns the test's storage. func (t *Test) Storage() local.Storage { return t.storage @@ -463,7 +471,7 @@ func (t *Test) exec(tc testCommand) error { case *evalCmd: q := t.queryEngine.newQuery(cmd.expr, cmd.start, cmd.end, cmd.interval) - res := q.Exec() + res := q.Exec(t.queryCtx) if res.Err != nil { if cmd.fail { return nil @@ -490,8 +498,8 @@ func (t *Test) clear() { if t.closeStorage != nil { t.closeStorage() } - if t.queryEngine != nil { - t.queryEngine.Stop() + if t.cancelQueries != nil { + t.cancelQueries() } var closer testutil.Closer @@ -499,11 +507,12 @@ func (t *Test) clear() { t.closeStorage = closer.Close t.queryEngine = NewEngine(t.storage, nil) + t.queryCtx, t.cancelQueries = context.WithCancel(context.Background()) } // Close closes resources associated with the Test. func (t *Test) Close() { - t.queryEngine.Stop() + t.cancelQueries() t.closeStorage() } diff --git a/rules/alerting.go b/rules/alerting.go index 39b5906e1..edd09f5f6 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -18,6 +18,8 @@ import ( "sync" "time" + "golang.org/x/net/context" + html_template "html/template" "github.com/prometheus/common/log" @@ -146,12 +148,12 @@ const resolvedRetention = 15 * time.Minute // eval evaluates the rule expression and then creates pending alerts and fires // or removes previously pending alerts accordingly. -func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine, externalURLPath string) (model.Vector, error) { +func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine, queryCtx context.Context, externalURLPath string) (model.Vector, error) { query, err := engine.NewInstantQuery(r.vector.String(), ts) if err != nil { return nil, err } - res, err := query.Exec().Vector() + res, err := query.Exec(queryCtx).Vector() if err != nil { return nil, err } @@ -188,6 +190,7 @@ func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine, externalURLPat tmplData, ts, engine, + queryCtx, externalURLPath, ) result, err := tmpl.Expand() diff --git a/rules/manager.go b/rules/manager.go index 8bd79a7ff..dac4f4b0d 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -21,6 +21,8 @@ import ( "sync" "time" + "golang.org/x/net/context" + html_template "html/template" "github.com/prometheus/client_golang/prometheus" @@ -105,7 +107,7 @@ const ( type Rule interface { Name() string // eval evaluates the rule, including any associated recording or alerting actions. - eval(model.Time, *promql.Engine, string) (model.Vector, error) + eval(model.Time, *promql.Engine, context.Context, string) (model.Vector, error) // String returns a human-readable string representation of the rule. String() string // HTMLSnippet returns a human-readable string representation of the rule, @@ -256,7 +258,7 @@ func (g *Group) eval() { evalTotal.WithLabelValues(rtyp).Inc() - vector, err := rule.eval(now, g.opts.QueryEngine, g.opts.ExternalURL.Path) + vector, err := rule.eval(now, g.opts.QueryEngine, g.opts.QueryCtx, g.opts.ExternalURL.Path) if err != nil { // Canceled queries are intentional termination of queries. This normally // happens on shutdown and thus we skip logging of any errors here. @@ -341,6 +343,7 @@ type Manager struct { type ManagerOptions struct { ExternalURL *url.URL QueryEngine *promql.Engine + QueryCtx context.Context Notifier *notifier.Notifier SampleAppender storage.SampleAppender } diff --git a/rules/manager_test.go b/rules/manager_test.go index dfee078d7..6f8a1c4fb 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -105,7 +105,7 @@ func TestAlertingRule(t *testing.T) { for i, test := range tests { evalTime := model.Time(0).Add(test.time) - res, err := rule.eval(evalTime, suite.QueryEngine(), "") + res, err := rule.eval(evalTime, suite.QueryEngine(), suite.Context(), "") if err != nil { t.Fatalf("Error during alerting rule evaluation: %s", err) } diff --git a/rules/recording.go b/rules/recording.go index e40fbed5e..ffe39c941 100644 --- a/rules/recording.go +++ b/rules/recording.go @@ -18,6 +18,7 @@ import ( "html/template" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/util/strutil" @@ -45,14 +46,14 @@ func (rule RecordingRule) Name() string { } // eval evaluates the rule and then overrides the metric names and labels accordingly. -func (rule RecordingRule) eval(timestamp model.Time, engine *promql.Engine, _ string) (model.Vector, error) { +func (rule RecordingRule) eval(timestamp model.Time, engine *promql.Engine, queryCtx context.Context, _ string) (model.Vector, error) { query, err := engine.NewInstantQuery(rule.vector.String(), timestamp) if err != nil { return nil, err } var ( - result = query.Exec() + result = query.Exec(queryCtx) vector model.Vector ) if result.Err != nil { diff --git a/rules/recording_test.go b/rules/recording_test.go index 36b5ac1dd..6b58f4eaf 100644 --- a/rules/recording_test.go +++ b/rules/recording_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage/local" @@ -27,6 +28,9 @@ func TestRuleEval(t *testing.T) { storage, closer := local.NewTestStorage(t, 2) defer closer.Close() engine := promql.NewEngine(storage, nil) + queryCtx, cancelQueries := context.WithCancel(context.Background()) + defer cancelQueries() + now := model.Now() suite := []struct { @@ -59,7 +63,7 @@ func TestRuleEval(t *testing.T) { for _, test := range suite { rule := NewRecordingRule(test.name, test.expr, test.labels) - result, err := rule.eval(now, engine, "") + result, err := rule.eval(now, engine, queryCtx, "") if err != nil { t.Fatalf("Error evaluating %s", test.name) } diff --git a/template/template.go b/template/template.go index a1d6cfe06..d07c446e3 100644 --- a/template/template.go +++ b/template/template.go @@ -26,6 +26,7 @@ import ( text_template "text/template" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/util/strutil" @@ -55,12 +56,12 @@ func (q queryResultByLabelSorter) Swap(i, j int) { q.results[i], q.results[j] = q.results[j], q.results[i] } -func query(q string, timestamp model.Time, queryEngine *promql.Engine) (queryResult, error) { +func query(ctx context.Context, q string, timestamp model.Time, queryEngine *promql.Engine) (queryResult, error) { query, err := queryEngine.NewInstantQuery(q, timestamp) if err != nil { return nil, err } - res := query.Exec() + res := query.Exec(ctx) if res.Err != nil { return nil, res.Err } @@ -110,14 +111,14 @@ type Expander struct { } // NewTemplateExpander returns a template expander ready to use. -func NewTemplateExpander(text string, name string, data interface{}, timestamp model.Time, queryEngine *promql.Engine, pathPrefix string) *Expander { +func NewTemplateExpander(text string, name string, data interface{}, timestamp model.Time, queryEngine *promql.Engine, queryCtx context.Context, pathPrefix string) *Expander { return &Expander{ text: text, name: name, data: data, funcMap: text_template.FuncMap{ "query": func(q string) (queryResult, error) { - return query(q, timestamp, queryEngine) + return query(queryCtx, q, timestamp, queryEngine) }, "first": func(v queryResult) (*sample, error) { if len(v) > 0 { diff --git a/template/template_test.go b/template/template_test.go index bab51d651..9d1c787a4 100644 --- a/template/template_test.go +++ b/template/template_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage/local" @@ -220,7 +221,7 @@ func TestTemplateExpansion(t *testing.T) { for i, s := range scenarios { var result string var err error - expander := NewTemplateExpander(s.text, "test", s.input, time, engine, "") + expander := NewTemplateExpander(s.text, "test", s.input, time, engine, context.Background(), "") if s.html { result, err = expander.ExpandHTML(nil) } else { diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 97085cda6..a79f9f9c7 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -87,15 +87,17 @@ type apiFunc func(r *http.Request) (interface{}, *apiError) type API struct { Storage local.Storage QueryEngine *promql.Engine + QueryCtx context.Context context func(r *http.Request) context.Context now func() model.Time } // NewAPI returns an initialized API type. -func NewAPI(qe *promql.Engine, st local.Storage) *API { +func NewAPI(qe *promql.Engine, qc context.Context, st local.Storage) *API { return &API{ QueryEngine: qe, + QueryCtx: qc, Storage: st, context: route.Context, now: model.Now, @@ -157,7 +159,7 @@ func (api *API) query(r *http.Request) (interface{}, *apiError) { return nil, &apiError{errorBadData, err} } - res := qry.Exec() + res := qry.Exec(api.QueryCtx) if res.Err != nil { switch res.Err.(type) { case promql.ErrQueryCanceled: @@ -204,7 +206,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError) { return nil, &apiError{errorBadData, err} } - res := qry.Exec() + res := qry.Exec(api.QueryCtx) if res.Err != nil { switch res.Err.(type) { case promql.ErrQueryCanceled: diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 38ee39cfb..f0e66eab0 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -52,6 +52,7 @@ func TestEndpoints(t *testing.T) { api := &API{ Storage: suite.Storage(), QueryEngine: suite.QueryEngine(), + QueryCtx: suite.Context(), now: func() model.Time { return now }, } diff --git a/web/web.go b/web/web.go index 9af2496ae..0cce11cc4 100644 --- a/web/web.go +++ b/web/web.go @@ -36,6 +36,7 @@ import ( "github.com/prometheus/common/log" "github.com/prometheus/common/model" "github.com/prometheus/common/route" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/promql" @@ -55,6 +56,7 @@ type Handler struct { targetManager *retrieval.TargetManager ruleManager *rules.Manager queryEngine *promql.Engine + queryCtx context.Context storage local.Storage apiV1 *api_v1.API @@ -112,6 +114,7 @@ type Options struct { func New( st local.Storage, qe *promql.Engine, + qc context.Context, tm *retrieval.TargetManager, rm *rules.Manager, version *PrometheusVersion, @@ -133,9 +136,10 @@ func New( targetManager: tm, ruleManager: rm, queryEngine: qe, + queryCtx: qc, storage: st, - apiV1: api_v1.NewAPI(qe, st), + apiV1: api_v1.NewAPI(qe, qc, st), now: model.Now, } @@ -293,7 +297,7 @@ func (h *Handler) consoles(w http.ResponseWriter, r *http.Request) { Path: strings.TrimLeft(name, "/"), } - tmpl := template.NewTemplateExpander(string(text), "__console_"+name, data, h.now(), h.queryEngine, h.options.ExternalURL.Path) + tmpl := template.NewTemplateExpander(string(text), "__console_"+name, data, h.now(), h.queryEngine, h.queryCtx, h.options.ExternalURL.Path) filenames, err := filepath.Glob(h.options.ConsoleLibrariesPath + "/*.lib") if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -466,7 +470,7 @@ func (h *Handler) executeTemplate(w http.ResponseWriter, name string, data inter http.Error(w, err.Error(), http.StatusInternalServerError) } - tmpl := template.NewTemplateExpander(text, name, data, h.now(), h.queryEngine, h.options.ExternalURL.Path) + tmpl := template.NewTemplateExpander(text, name, data, h.now(), h.queryEngine, h.queryCtx, h.options.ExternalURL.Path) tmpl.Funcs(tmplFuncs(h.consolesPath(), h.options)) result, err := tmpl.ExpandHTML(nil)