diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 9a940ab5d..d824668ed 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -96,6 +96,7 @@ func main() { webTimeout model.Duration queryTimeout model.Duration queryConcurrency int + queryMaxSamples int RemoteFlushDeadline model.Duration prometheusURL string @@ -197,6 +198,8 @@ func main() { a.Flag("query.max-concurrency", "Maximum number of queries executed concurrently."). Default("20").IntVar(&cfg.queryConcurrency) + a.Flag("query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they would load more samples than this into memory, so this also limits the number of samples a query can return."). + Default("50000000").IntVar(&cfg.queryMaxSamples) promlogflag.AddFlags(a, &cfg.logLevel) @@ -264,12 +267,14 @@ func main() { scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage) - queryEngine = promql.NewEngine( - log.With(logger, "component", "query engine"), - prometheus.DefaultRegisterer, - cfg.queryConcurrency, - time.Duration(cfg.queryTimeout), - ) + opts = promql.EngineOpts{ + Logger: log.With(logger, "component", "query engine"), + Reg: prometheus.DefaultRegisterer, + MaxConcurrent: cfg.queryConcurrency, + MaxSamples: cfg.queryMaxSamples, + Timeout: time.Duration(cfg.queryTimeout), + } + queryEngine = promql.NewEngine(opts) ruleManager = rules.NewManager(&rules.ManagerOptions{ Appendable: fanoutStorage, diff --git a/promql/bench_test.go b/promql/bench_test.go index 4bdddb13e..fef667927 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -28,7 +28,14 @@ import ( func BenchmarkRangeQuery(b *testing.B) { storage := testutil.NewStorage(b) defer storage.Close() - engine := NewEngine(nil, nil, 10, 100*time.Second) + opts := EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 10, + MaxSamples: 10, + Timeout: 100 * time.Second, + } + engine := NewEngine(opts) metrics := []labels.Labels{} metrics = append(metrics, labels.FromStrings("__name__", "a_one")) diff --git a/promql/engine.go b/promql/engine.go index e8ca8b7eb..009862b7a 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -43,6 +43,7 @@ const ( namespace = "prometheus" subsystem = "engine" queryTag = "query" + env = "query execution" // The largest SampleValue that can be converted to an int64 without overflow. maxInt64 = 9223372036854774784 @@ -69,13 +70,22 @@ type ( ErrQueryTimeout string // ErrQueryCanceled is returned if a query was canceled during processing. ErrQueryCanceled string + // ErrTooManySamples is returned if a query would woud load more than the maximum allowed samples into memory. + ErrTooManySamples string // ErrStorage is returned if an error was encountered in the storage layer // during query handling. ErrStorage error ) -func (e ErrQueryTimeout) Error() string { return fmt.Sprintf("query timed out in %s", string(e)) } -func (e ErrQueryCanceled) Error() string { return fmt.Sprintf("query was canceled in %s", string(e)) } +func (e ErrQueryTimeout) Error() string { + return fmt.Sprintf("query timed out in %s", string(e)) +} +func (e ErrQueryCanceled) Error() string { + return fmt.Sprintf("query was canceled in %s", string(e)) +} +func (e ErrTooManySamples) Error() string { + return fmt.Sprintf("query processing would load too many samples into memory in %s", string(e)) +} // A Query is derived from an a raw query string and can be run against an engine // it is associated with. @@ -166,19 +176,29 @@ func contextErr(err error, env string) error { } } +// EngineOpts contains configuration options used when creating a new Engine. +type EngineOpts struct { + Logger log.Logger + Reg prometheus.Registerer + MaxConcurrent int + MaxSamples int + Timeout time.Duration +} + // Engine handles the lifetime of queries from beginning to end. // It is connected to a querier. type Engine struct { - logger log.Logger - metrics *engineMetrics - timeout time.Duration - gate *gate.Gate + logger log.Logger + metrics *engineMetrics + timeout time.Duration + gate *gate.Gate + maxSamplesPerQuery int } // NewEngine returns a new engine. -func NewEngine(logger log.Logger, reg prometheus.Registerer, maxConcurrent int, timeout time.Duration) *Engine { - if logger == nil { - logger = log.NewNopLogger() +func NewEngine(opts EngineOpts) *Engine { + if opts.Logger == nil { + opts.Logger = log.NewNopLogger() } metrics := &engineMetrics{ @@ -223,10 +243,10 @@ func NewEngine(logger log.Logger, reg prometheus.Registerer, maxConcurrent int, ConstLabels: prometheus.Labels{"slice": "result_sort"}, }), } - metrics.maxConcurrentQueries.Set(float64(maxConcurrent)) + metrics.maxConcurrentQueries.Set(float64(opts.MaxConcurrent)) - if reg != nil { - reg.MustRegister( + if opts.Reg != nil { + opts.Reg.MustRegister( metrics.currentQueries, metrics.maxConcurrentQueries, metrics.queryQueueTime, @@ -236,10 +256,11 @@ func NewEngine(logger log.Logger, reg prometheus.Registerer, maxConcurrent int, ) } return &Engine{ - gate: gate.New(maxConcurrent), - timeout: timeout, - logger: logger, - metrics: metrics, + gate: gate.New(opts.MaxConcurrent), + timeout: opts.Timeout, + logger: opts.Logger, + metrics: metrics, + maxSamplesPerQuery: opts.MaxSamples, } } @@ -384,6 +405,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( endTimestamp: start, interval: 1, ctx: ctx, + maxSamples: ng.maxSamplesPerQuery, logger: ng.logger, } val, err := evaluator.Eval(s.Expr) @@ -424,6 +446,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( endTimestamp: timeMilliseconds(s.End), interval: durationMilliseconds(s.Interval), ctx: ctx, + maxSamples: ng.maxSamplesPerQuery, logger: ng.logger, } val, err := evaluator.Eval(s.Expr) @@ -575,11 +598,12 @@ type evaluator struct { ctx context.Context startTimestamp int64 // Start time in milliseconds. + endTimestamp int64 // End time in milliseconds. + interval int64 // Interval in milliseconds. - endTimestamp int64 // End time in milliseconds. - interval int64 // Interval in milliseconds. - - logger log.Logger + maxSamples int + currentSamples int + logger log.Logger } // errorf causes a panic with the input formatted into an error. @@ -673,15 +697,18 @@ func (enh *EvalNodeHelper) signatureFunc(on bool, names ...string) func(labels.L // rangeEval evaluates the given expressions, and then for each step calls // the given function with the values computed for each expression at that -// step. The return value is the combination into time series of of all the +// step. The return value is the combination into time series of all the // function call results. func (ev *evaluator) rangeEval(f func([]Value, *EvalNodeHelper) Vector, exprs ...Expr) Matrix { numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 matrixes := make([]Matrix, len(exprs)) origMatrixes := make([]Matrix, len(exprs)) + originalNumSamples := ev.currentSamples + for i, e := range exprs { // Functions will take string arguments from the expressions, not the values. if e != nil && e.Type() != ValueTypeString { + // ev.currentSamples will be updated to the correct value within the ev.eval call. matrixes[i] = ev.eval(e).(Matrix) // Keep a copy of the original point slices so that they @@ -704,17 +731,25 @@ func (ev *evaluator) rangeEval(f func([]Value, *EvalNodeHelper) Vector, exprs .. } enh := &EvalNodeHelper{out: make(Vector, 0, biggestLen)} seriess := make(map[uint64]Series, biggestLen) // Output series by series hash. + tempNumSamples := ev.currentSamples for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { + // Reset number of samples in memory after each timestamp. + ev.currentSamples = tempNumSamples // Gather input vectors for this timestamp. for i := range exprs { vectors[i] = vectors[i][:0] for si, series := range matrixes[i] { for _, point := range series.Points { if point.T == ts { - vectors[i] = append(vectors[i], Sample{Metric: series.Metric, Point: point}) - // Move input vectors forward so we don't have to re-scan the same - // past points at the next step. - matrixes[i][si].Points = series.Points[1:] + if ev.currentSamples < ev.maxSamples { + vectors[i] = append(vectors[i], Sample{Metric: series.Metric, Point: point}) + // Move input vectors forward so we don't have to re-scan the same + // past points at the next step. + matrixes[i][si].Points = series.Points[1:] + ev.currentSamples++ + } else { + ev.error(ErrTooManySamples(env)) + } } break } @@ -728,6 +763,16 @@ func (ev *evaluator) rangeEval(f func([]Value, *EvalNodeHelper) Vector, exprs .. ev.errorf("vector cannot contain metrics with the same labelset") } enh.out = result[:0] // Reuse result vector. + + ev.currentSamples += len(result) + // When we reset currentSamples to tempNumSamples during the next iteration of the loop it also + // needs to include the samples from the result here, as they're still in memory. + tempNumSamples += len(result) + + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + // If this could be an instant query, shortcut so as not to change sort order. if ev.endTimestamp == ev.startTimestamp { mat := make(Matrix, len(result)) @@ -735,8 +780,10 @@ func (ev *evaluator) rangeEval(f func([]Value, *EvalNodeHelper) Vector, exprs .. s.Point.T = ts mat[i] = Series{Metric: s.Metric, Points: []Point{s.Point}} } + ev.currentSamples = originalNumSamples + mat.TotalSamples() return mat } + // Add samples in output vector to output series. for _, sample := range result { h := sample.Metric.Hash() @@ -750,19 +797,22 @@ func (ev *evaluator) rangeEval(f func([]Value, *EvalNodeHelper) Vector, exprs .. sample.Point.T = ts ss.Points = append(ss.Points, sample.Point) seriess[h] = ss + } } + // Reuse the original point slices. for _, m := range origMatrixes { for _, s := range m { putPointSlice(s.Points) } } - // Assemble the output matrix. + // Assemble the output matrix. By the time we get here we know we don't have too many samples. mat := make(Matrix, 0, len(seriess)) for _, ss := range seriess { mat = append(mat, ss) } + ev.currentSamples = originalNumSamples + mat.TotalSamples() return mat } @@ -802,6 +852,7 @@ func (ev *evaluator) eval(expr Expr) Value { }) } } + // Check if the function has a matrix argument. var matrixArgIndex int var matrixArg bool @@ -887,7 +938,12 @@ func (ev *evaluator) eval(expr Expr) Value { it.ReduceDelta(stepRange) } if len(ss.Points) > 0 { - mat = append(mat, ss) + if ev.currentSamples < ev.maxSamples { + mat = append(mat, ss) + ev.currentSamples += len(ss.Points) + } else { + ev.error(ErrTooManySamples(env)) + } } } if mat.ContainsSameLabelset() { @@ -971,13 +1027,19 @@ func (ev *evaluator) eval(expr Expr) Value { for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { _, v, ok := ev.vectorSelectorSingle(it, e, ts) if ok { - ss.Points = append(ss.Points, Point{V: v, T: ts}) + if ev.currentSamples < ev.maxSamples { + ss.Points = append(ss.Points, Point{V: v, T: ts}) + ev.currentSamples++ + } else { + ev.error(ErrTooManySamples(env)) + } } } if len(ss.Points) > 0 { mat = append(mat, ss) } + } return mat @@ -1007,8 +1069,12 @@ func (ev *evaluator) vectorSelector(node *VectorSelector, ts int64) Vector { Metric: node.series[i].Labels(), Point: Point{V: v, T: t}, }) + ev.currentSamples++ } + if ev.currentSamples >= ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } } return vec } @@ -1063,11 +1129,12 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { maxt = ev.startTimestamp - offset mint = maxt - durationMilliseconds(node.Range) matrix = make(Matrix, 0, len(node.series)) + err error ) it := storage.NewBuffer(durationMilliseconds(node.Range)) for i, s := range node.series { - if err := contextDone(ev.ctx, "expression evaluation"); err != nil { + if err = contextDone(ev.ctx, "expression evaluation"); err != nil { ev.error(err) } it.Reset(s.Iterator()) @@ -1127,14 +1194,22 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m } // Values in the buffer are guaranteed to be smaller than maxt. if t >= mint { + if ev.currentSamples >= ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } out = append(out, Point{T: t, V: v}) + ev.currentSamples++ } } // The seeked sample might also be in the range. if ok { t, v := it.Values() if t == maxt && !value.IsStaleNaN(v) { + if ev.currentSamples >= ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } out = append(out, Point{T: t, V: v}) + ev.currentSamples++ } } return out diff --git a/promql/engine_test.go b/promql/engine_test.go index 6b9bf5665..adf99985d 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -27,9 +27,15 @@ import ( ) func TestQueryConcurrency(t *testing.T) { - concurrentQueries := 10 + opts := EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 10, + MaxSamples: 10, + Timeout: 100 * time.Second, + } - engine := NewEngine(nil, nil, concurrentQueries, 10*time.Second) + engine := NewEngine(opts) ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -42,7 +48,7 @@ func TestQueryConcurrency(t *testing.T) { return nil } - for i := 0; i < concurrentQueries; i++ { + for i := 0; i < opts.MaxConcurrent; i++ { q := engine.newTestQuery(f) go q.Exec(ctx) select { @@ -74,13 +80,20 @@ func TestQueryConcurrency(t *testing.T) { } // Terminate remaining queries. - for i := 0; i < concurrentQueries; i++ { + for i := 0; i < opts.MaxConcurrent; i++ { block <- struct{}{} } } func TestQueryTimeout(t *testing.T) { - engine := NewEngine(nil, nil, 20, 5*time.Millisecond) + opts := EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 20, + MaxSamples: 10, + Timeout: 5 * time.Millisecond, + } + engine := NewEngine(opts) ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -99,7 +112,14 @@ func TestQueryTimeout(t *testing.T) { } func TestQueryCancel(t *testing.T) { - engine := NewEngine(nil, nil, 10, 10*time.Second) + opts := EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 10, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + engine := NewEngine(opts) ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -165,7 +185,14 @@ func (errSeriesSet) At() storage.Series { return nil } func (e errSeriesSet) Err() error { return e.err } func TestQueryError(t *testing.T) { - engine := NewEngine(nil, nil, 10, 10*time.Second) + opts := EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 10, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + engine := NewEngine(opts) errStorage := ErrStorage(fmt.Errorf("storage error")) queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { return &errQuerier{err: errStorage}, nil @@ -199,7 +226,14 @@ func TestQueryError(t *testing.T) { } func TestEngineShutdown(t *testing.T) { - engine := NewEngine(nil, nil, 10, 10*time.Second) + opts := EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 10, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + engine := NewEngine(opts) ctx, cancelCtx := context.WithCancel(context.Background()) block := make(chan struct{}) @@ -360,6 +394,193 @@ load 10s } +func TestMaxQuerySamples(t *testing.T) { + test, err := NewTest(t, ` +load 10s + metric 1 2 +`) + + if err != nil { + t.Fatalf("unexpected error creating test: %q", err) + } + defer test.Close() + + err = test.Run() + if err != nil { + t.Fatalf("unexpected error initializing test: %q", err) + } + + cases := []struct { + Query string + MaxSamples int + Result Result + Start time.Time + End time.Time + Interval time.Duration + }{ + // Instant queries. + { + Query: "1", + MaxSamples: 1, + Result: Result{ + nil, + Scalar{V: 1, T: 1000}}, + Start: time.Unix(1, 0), + }, + { + Query: "1", + MaxSamples: 0, + Result: Result{ + ErrTooManySamples(env), + nil, + }, + Start: time.Unix(1, 0), + }, + { + Query: "metric", + MaxSamples: 0, + Result: Result{ + ErrTooManySamples(env), + nil, + }, + Start: time.Unix(1, 0), + }, + { + Query: "metric", + MaxSamples: 1, + Result: Result{ + nil, + Vector{ + Sample{Point: Point{V: 1, T: 1000}, + Metric: labels.FromStrings("__name__", "metric")}, + }, + }, + Start: time.Unix(1, 0), + }, + { + Query: "metric[20s]", + MaxSamples: 2, + Result: Result{ + nil, + Matrix{Series{ + Points: []Point{{V: 1, T: 0}, {V: 2, T: 10000}}, + Metric: labels.FromStrings("__name__", "metric")}, + }, + }, + Start: time.Unix(10, 0), + }, + { + Query: "metric[20s]", + MaxSamples: 0, + Result: Result{ + ErrTooManySamples(env), + nil, + }, + Start: time.Unix(10, 0), + }, + // Range queries. + { + Query: "1", + MaxSamples: 3, + Result: Result{ + nil, + Matrix{Series{ + Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}}, + Metric: labels.FromStrings()}, + }, + }, + Start: time.Unix(0, 0), + End: time.Unix(2, 0), + Interval: time.Second, + }, + { + Query: "1", + MaxSamples: 0, + Result: Result{ + ErrTooManySamples(env), + nil, + }, + Start: time.Unix(0, 0), + End: time.Unix(2, 0), + Interval: time.Second, + }, + { + Query: "metric", + MaxSamples: 3, + Result: Result{ + nil, + Matrix{Series{ + Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}}, + Metric: labels.FromStrings("__name__", "metric")}, + }, + }, + Start: time.Unix(0, 0), + End: time.Unix(2, 0), + Interval: time.Second, + }, + { + Query: "metric", + MaxSamples: 2, + Result: Result{ + ErrTooManySamples(env), + nil, + }, + Start: time.Unix(0, 0), + End: time.Unix(2, 0), + Interval: time.Second, + }, + { + Query: "metric", + MaxSamples: 3, + Result: Result{ + nil, + Matrix{Series{ + Points: []Point{{V: 1, T: 0}, {V: 1, T: 5000}, {V: 2, T: 10000}}, + Metric: labels.FromStrings("__name__", "metric")}, + }, + }, + Start: time.Unix(0, 0), + End: time.Unix(10, 0), + Interval: 5 * time.Second, + }, + { + Query: "metric", + MaxSamples: 2, + Result: Result{ + ErrTooManySamples(env), + nil, + }, + Start: time.Unix(0, 0), + End: time.Unix(10, 0), + Interval: 5 * time.Second, + }, + } + + engine := test.QueryEngine() + for _, c := range cases { + var err error + var qry Query + + engine.maxSamplesPerQuery = c.MaxSamples + + if c.Interval == 0 { + qry, err = engine.NewInstantQuery(test.Queryable(), c.Query, c.Start) + } else { + qry, err = engine.NewRangeQuery(test.Queryable(), c.Query, c.Start, c.End, c.Interval) + } + if err != nil { + t.Fatalf("unexpected error creating query: %q", err) + } + res := qry.Exec(test.Context()) + if res.Err != nil && res.Err != c.Result.Err { + t.Fatalf("unexpected error running query: %q, expected to get result: %q", res.Err, c.Result.Value) + } + if !reflect.DeepEqual(res.Value, c.Result.Value) { + t.Fatalf("unexpected result for query %q: got %q wanted %q", c.Query, res.Value.String(), c.Result.String()) + } + } +} + func TestRecoverEvaluatorRuntime(t *testing.T) { ev := &evaluator{logger: log.NewNopLogger()} diff --git a/promql/functions_test.go b/promql/functions_test.go index 19680eb7f..d7134b3d7 100644 --- a/promql/functions_test.go +++ b/promql/functions_test.go @@ -29,7 +29,14 @@ func TestDeriv(t *testing.T) { // so we test it by hand. storage := testutil.NewStorage(t) defer storage.Close() - engine := NewEngine(nil, nil, 10, 10*time.Second) + opts := EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 10, + MaxSamples: 10000, + Timeout: 10 * time.Second, + } + engine := NewEngine(opts) a, err := storage.Appender() testutil.Ok(t, err) diff --git a/promql/test.go b/promql/test.go index f903e86e2..e90966cc8 100644 --- a/promql/test.go +++ b/promql/test.go @@ -506,7 +506,15 @@ func (t *Test) clear() { } t.storage = testutil.NewStorage(t) - t.queryEngine = NewEngine(nil, nil, 20, 10*time.Second) + opts := EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 20, + MaxSamples: 1000, + Timeout: 100 * time.Second, + } + + t.queryEngine = NewEngine(opts) t.context, t.cancelCtx = context.WithCancel(context.Background()) } diff --git a/promql/value.go b/promql/value.go index a57703081..2063f22d7 100644 --- a/promql/value.go +++ b/promql/value.go @@ -171,6 +171,15 @@ func (m Matrix) String() string { return strings.Join(strs, "\n") } +// TotalSamples returns the total number of samples in the series within a matrix. +func (m Matrix) TotalSamples() int { + numSamples := 0 + for _, series := range m { + numSamples += len(series.Points) + } + return numSamples +} + func (m Matrix) Len() int { return len(m) } func (m Matrix) Less(i, j int) bool { return labels.Compare(m[i].Metric, m[j].Metric) < 0 } func (m Matrix) Swap(i, j int) { m[i], m[j] = m[j], m[i] } diff --git a/rules/manager_test.go b/rules/manager_test.go index 429fcbb05..60f25cc52 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -494,7 +494,14 @@ func TestForStateRestore(t *testing.T) { func TestStaleness(t *testing.T) { storage := testutil.NewStorage(t) defer storage.Close() - engine := promql.NewEngine(nil, nil, 10, 10*time.Second) + engineOpts := promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 10, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + engine := promql.NewEngine(engineOpts) opts := &ManagerOptions{ QueryFunc: EngineQueryFunc(engine, storage), Appendable: storage, @@ -623,7 +630,14 @@ func TestUpdate(t *testing.T) { } storage := testutil.NewStorage(t) defer storage.Close() - engine := promql.NewEngine(nil, nil, 10, 10*time.Second) + opts := promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 10, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + engine := promql.NewEngine(opts) ruleManager := NewManager(&ManagerOptions{ Appendable: storage, TSDB: storage, @@ -655,7 +669,14 @@ func TestUpdate(t *testing.T) { func TestNotify(t *testing.T) { storage := testutil.NewStorage(t) defer storage.Close() - engine := promql.NewEngine(nil, nil, 10, 10*time.Second) + engineOpts := promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 10, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + engine := promql.NewEngine(engineOpts) var lastNotified []*Alert notifyFunc := func(ctx context.Context, expr string, alerts ...*Alert) { lastNotified = alerts diff --git a/rules/recording_test.go b/rules/recording_test.go index 484474140..97133f9e4 100644 --- a/rules/recording_test.go +++ b/rules/recording_test.go @@ -28,7 +28,15 @@ func TestRuleEval(t *testing.T) { storage := testutil.NewStorage(t) defer storage.Close() - engine := promql.NewEngine(nil, nil, 10, 10*time.Second) + opts := promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 10, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + + engine := promql.NewEngine(opts) ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index a04fd5115..c5739ebb9 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -151,7 +151,15 @@ func (m rulesRetrieverMock) RuleGroups() []*rules.Group { storage := testutil.NewStorage(m.testing) defer storage.Close() - engine := promql.NewEngine(nil, nil, 10, 10*time.Second) + engineOpts := promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 10, + MaxSamples: 10, + Timeout: 100 * time.Second, + } + + engine := promql.NewEngine(engineOpts) opts := &rules.ManagerOptions{ QueryFunc: rules.EngineQueryFunc(engine, storage), Appendable: storage,