From ff0003e0721b1365d4958a2d6d50489f43b8df7b Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Mon, 10 Feb 2020 00:58:23 +0100 Subject: [PATCH] Make lookbackDelta a option of QueryEngine (#6746) * Make lookbackDelta a option of QueryEngine Signed-off-by: Julien Pivotto * julius' suggestion Signed-off-by: Julien Pivotto * remove trivial getter Signed-off-by: Julien Pivotto * Assume lookback delta is always > 0 Signed-off-by: Julien Pivotto * add debug log Signed-off-by: Julien Pivotto * don't expose loopback delta Signed-off-by: Julien Pivotto * Specify that lookack delta is also used in federation Signed-off-by: Julien Pivotto * Fix federation test While we have added some logic to the promql engine to keep it backwards compatible and have a 5 minute loopback by default, the web/ package is likely to really be internal to Prometheus and we should not add the same kind of heuritstics here. Signed-off-by: Julien Pivotto * loopback delta: Fix debug log Signed-off-by: Julien Pivotto --- cmd/prometheus/main.go | 5 +++-- promql/engine.go | 46 ++++++++++++++++++++++++++---------------- promql/engine_test.go | 16 +++++---------- web/federate.go | 4 ++-- web/federate_test.go | 8 +++++--- web/web.go | 3 +++ 6 files changed, 47 insertions(+), 35 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 9ffe3a906..ab20d6594 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -236,7 +236,7 @@ func main() { a.Flag("alertmanager.timeout", "Timeout for sending alerts to Alertmanager."). Default("10s").SetValue(&cfg.notifierTimeout) - a.Flag("query.lookback-delta", "The maximum lookback duration for retrieving metrics during expression evaluations."). + a.Flag("query.lookback-delta", "The maximum lookback duration for retrieving metrics during expression evaluations and federation."). Default("5m").SetValue(&cfg.lookbackDelta) a.Flag("query.timeout", "Maximum time a query may take before being aborted."). @@ -321,7 +321,6 @@ func main() { } } - promql.LookbackDelta = time.Duration(cfg.lookbackDelta) promql.SetDefaultEvaluationInterval(time.Duration(config.DefaultGlobalConfig.EvaluationInterval)) // Above level 6, the k8s client would log bearer tokens in clear-text. @@ -360,6 +359,7 @@ func main() { MaxSamples: cfg.queryMaxSamples, Timeout: time.Duration(cfg.queryTimeout), ActiveQueryTracker: promql.NewActiveQueryTracker(cfg.localStoragePath, cfg.queryConcurrency, log.With(logger, "component", "activeQueryTracker")), + LookbackDelta: time.Duration(cfg.lookbackDelta), } queryEngine = promql.NewEngine(opts) @@ -387,6 +387,7 @@ func main() { cfg.web.RuleManager = ruleManager cfg.web.Notifier = notifierManager cfg.web.TSDBCfg = cfg.tsdb + cfg.web.LookbackDelta = time.Duration(cfg.lookbackDelta) cfg.web.Version = &web.PrometheusVersion{ Version: version.Version, diff --git a/promql/engine.go b/promql/engine.go index 0add9e147..14b37d383 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -41,10 +41,11 @@ import ( ) const ( - namespace = "prometheus" - subsystem = "engine" - queryTag = "query" - env = "query execution" + namespace = "prometheus" + subsystem = "engine" + queryTag = "query" + env = "query execution" + defaultLookbackDelta = 5 * time.Minute // The largest SampleValue that can be converted to an int64 without overflow. maxInt64 = 9223372036854774784 @@ -53,10 +54,6 @@ const ( ) var ( - // LookbackDelta determines the time since the last sample after which a time - // series is considered stale. - LookbackDelta = 5 * time.Minute - // DefaultEvaluationInterval is the default evaluation interval of // a subquery in milliseconds. DefaultEvaluationInterval int64 @@ -220,6 +217,9 @@ type EngineOpts struct { MaxSamples int Timeout time.Duration ActiveQueryTracker *ActiveQueryTracker + // LookbackDelta determines the time since the last sample after which a time + // series is considered stale. + LookbackDelta time.Duration } // Engine handles the lifetime of queries from beginning to end. @@ -232,6 +232,7 @@ type Engine struct { activeQueryTracker *ActiveQueryTracker queryLogger QueryLogger queryLoggerLock sync.RWMutex + lookbackDelta time.Duration } // NewEngine returns a new engine. @@ -305,6 +306,13 @@ func NewEngine(opts EngineOpts) *Engine { metrics.maxConcurrentQueries.Set(-1) } + if opts.LookbackDelta == 0 { + opts.LookbackDelta = defaultLookbackDelta + if l := opts.Logger; l != nil { + level.Debug(l).Log("msg", "lookback delta is zero, setting to default value", "value", defaultLookbackDelta) + } + } + if opts.Reg != nil { opts.Reg.MustRegister( metrics.currentQueries, @@ -324,6 +332,7 @@ func NewEngine(opts EngineOpts) *Engine { metrics: metrics, maxSamplesPerQuery: opts.MaxSamples, activeQueryTracker: opts.ActiveQueryTracker, + lookbackDelta: opts.LookbackDelta, } } @@ -533,6 +542,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( maxSamples: ng.maxSamplesPerQuery, defaultEvalInterval: GetDefaultEvaluationInterval(), logger: ng.logger, + lookbackDelta: ng.lookbackDelta, } val, err := evaluator.Eval(s.Expr) @@ -571,7 +581,6 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( default: panic(errors.Errorf("promql.Engine.exec: unexpected expression type %q", s.Expr.Type())) } - } // Range evaluation. @@ -583,6 +592,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( maxSamples: ng.maxSamplesPerQuery, defaultEvalInterval: GetDefaultEvaluationInterval(), logger: ng.logger, + lookbackDelta: ng.lookbackDelta, } val, err := evaluator.Eval(s.Expr) if err != nil { @@ -626,11 +636,11 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev subqOffset := ng.cumulativeSubqueryOffset(path) switch n := node.(type) { case *VectorSelector: - if maxOffset < LookbackDelta+subqOffset { - maxOffset = LookbackDelta + subqOffset + if maxOffset < ng.lookbackDelta+subqOffset { + maxOffset = ng.lookbackDelta + subqOffset } - if n.Offset+LookbackDelta+subqOffset > maxOffset { - maxOffset = n.Offset + LookbackDelta + subqOffset + if n.Offset+ng.lookbackDelta+subqOffset > maxOffset { + maxOffset = n.Offset + ng.lookbackDelta + subqOffset } case *MatrixSelector: if maxOffset < n.Range+subqOffset { @@ -677,7 +687,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev switch n := node.(type) { case *VectorSelector: if evalRange == 0 { - params.Start = params.Start - durationMilliseconds(LookbackDelta) + params.Start = params.Start - durationMilliseconds(ng.lookbackDelta) } else { params.Range = durationMilliseconds(evalRange) // For all matrix queries we want to ensure that we have (end-start) + range selected @@ -783,6 +793,7 @@ type evaluator struct { currentSamples int defaultEvalInterval int64 logger log.Logger + lookbackDelta time.Duration } // errorf causes a panic with the input formatted into an error. @@ -1272,7 +1283,7 @@ func (ev *evaluator) eval(expr Expr) Value { case *VectorSelector: checkForSeriesSetExpansion(ev.ctx, e) mat := make(Matrix, 0, len(e.series)) - it := storage.NewBuffer(durationMilliseconds(LookbackDelta)) + it := storage.NewBuffer(durationMilliseconds(ev.lookbackDelta)) for i, s := range e.series { it.Reset(s.Iterator()) ss := Series{ @@ -1318,6 +1329,7 @@ func (ev *evaluator) eval(expr Expr) Value { maxSamples: ev.maxSamples, defaultEvalInterval: ev.defaultEvalInterval, logger: ev.logger, + lookbackDelta: ev.lookbackDelta, } if e.Step != 0 { @@ -1353,7 +1365,7 @@ func (ev *evaluator) vectorSelector(node *VectorSelector, ts int64) Vector { vec = make(Vector, 0, len(node.series)) ) - it := storage.NewBuffer(durationMilliseconds(LookbackDelta)) + it := storage.NewBuffer(durationMilliseconds(ev.lookbackDelta)) for i, s := range node.series { it.Reset(s.Iterator()) @@ -1392,7 +1404,7 @@ func (ev *evaluator) vectorSelectorSingle(it *storage.BufferedSeriesIterator, no if !ok || t > refTime { t, v, ok = it.PeekBack(1) - if !ok || t < refTime-durationMilliseconds(LookbackDelta) { + if !ok || t < refTime-durationMilliseconds(ev.lookbackDelta) { return 0, 0, false } } diff --git a/promql/engine_test.go b/promql/engine_test.go index a13cdfd77..24d275564 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -257,19 +257,13 @@ func (*paramCheckerQuerier) Close() error { r func TestParamsSetCorrectly(t *testing.T) { opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, + LookbackDelta: 5 * time.Second, } - // Set the lookback to be smaller and reset at the end. - currLookback := LookbackDelta - LookbackDelta = 5 * time.Second - defer func() { - LookbackDelta = currLookback - }() - cases := []struct { query string diff --git a/web/federate.go b/web/federate.go index a90f84374..6a525d0a8 100644 --- a/web/federate.go +++ b/web/federate.go @@ -63,7 +63,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { } var ( - mint = timestamp.FromTime(h.now().Time().Add(-promql.LookbackDelta)) + mint = timestamp.FromTime(h.now().Time().Add(-h.lookbackDelta)) maxt = timestamp.FromTime(h.now().Time()) format = expfmt.Negotiate(req.Header) enc = expfmt.NewEncoder(w, format) @@ -101,7 +101,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { } set := storage.NewMergeSeriesSet(sets, nil) - it := storage.NewBuffer(int64(promql.LookbackDelta / 1e6)) + it := storage.NewBuffer(int64(h.lookbackDelta / 1e6)) for set.Next() { s := set.At() diff --git a/web/federate_test.go b/web/federate_test.go index 956f1169a..558e38914 100644 --- a/web/federate_test.go +++ b/web/federate_test.go @@ -19,6 +19,7 @@ import ( "sort" "strings" "testing" + "time" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" @@ -198,9 +199,10 @@ func TestFederation(t *testing.T) { } h := &Handler{ - storage: suite.Storage(), - queryEngine: suite.QueryEngine(), - now: func() model.Time { return 101 * 60 * 1000 }, // 101min after epoch. + storage: suite.Storage(), + queryEngine: suite.QueryEngine(), + lookbackDelta: 5 * time.Minute, + now: func() model.Time { return 101 * 60 * 1000 }, // 101min after epoch. config: &config.Config{ GlobalConfig: config.GlobalConfig{}, }, diff --git a/web/web.go b/web/web.go index 284728ce6..732702a0b 100644 --- a/web/web.go +++ b/web/web.go @@ -179,6 +179,7 @@ type Handler struct { scrapeManager *scrape.Manager ruleManager *rules.Manager queryEngine *promql.Engine + lookbackDelta time.Duration context context.Context tsdb func() *tsdb.DB storage storage.Storage @@ -219,6 +220,7 @@ type Options struct { TSDBCfg prometheus_tsdb.Options Storage storage.Storage QueryEngine *promql.Engine + LookbackDelta time.Duration ScrapeManager *scrape.Manager RuleManager *rules.Manager Notifier *notifier.Manager @@ -281,6 +283,7 @@ func New(logger log.Logger, o *Options) *Handler { scrapeManager: o.ScrapeManager, ruleManager: o.RuleManager, queryEngine: o.QueryEngine, + lookbackDelta: o.LookbackDelta, tsdb: o.TSDB, storage: o.Storage, notifier: o.Notifier,