Make lookbackDelta a option of QueryEngine (#6746)

* Make lookbackDelta a option of QueryEngine

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>

* julius' suggestion

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>

* remove trivial getter

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>

* Assume lookback delta is always > 0

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>

* add debug log

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>

* don't expose loopback delta

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>

* Specify that lookack delta is also used in federation

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>

* Fix federation test

While we have added some logic to the promql engine to keep it backwards
compatible and have a 5 minute loopback by default, the web/ package is
likely to really be internal to Prometheus and we should not add the
same kind of heuritstics here.

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>

* loopback delta: Fix debug log

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
pull/6802/head
Julien Pivotto 5 years ago committed by GitHub
parent b00023344e
commit ff0003e072
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -236,7 +236,7 @@ func main() {
a.Flag("alertmanager.timeout", "Timeout for sending alerts to Alertmanager."). a.Flag("alertmanager.timeout", "Timeout for sending alerts to Alertmanager.").
Default("10s").SetValue(&cfg.notifierTimeout) Default("10s").SetValue(&cfg.notifierTimeout)
a.Flag("query.lookback-delta", "The maximum lookback duration for retrieving metrics during expression evaluations."). a.Flag("query.lookback-delta", "The maximum lookback duration for retrieving metrics during expression evaluations and federation.").
Default("5m").SetValue(&cfg.lookbackDelta) Default("5m").SetValue(&cfg.lookbackDelta)
a.Flag("query.timeout", "Maximum time a query may take before being aborted."). a.Flag("query.timeout", "Maximum time a query may take before being aborted.").
@ -321,7 +321,6 @@ func main() {
} }
} }
promql.LookbackDelta = time.Duration(cfg.lookbackDelta)
promql.SetDefaultEvaluationInterval(time.Duration(config.DefaultGlobalConfig.EvaluationInterval)) promql.SetDefaultEvaluationInterval(time.Duration(config.DefaultGlobalConfig.EvaluationInterval))
// Above level 6, the k8s client would log bearer tokens in clear-text. // Above level 6, the k8s client would log bearer tokens in clear-text.
@ -360,6 +359,7 @@ func main() {
MaxSamples: cfg.queryMaxSamples, MaxSamples: cfg.queryMaxSamples,
Timeout: time.Duration(cfg.queryTimeout), Timeout: time.Duration(cfg.queryTimeout),
ActiveQueryTracker: promql.NewActiveQueryTracker(cfg.localStoragePath, cfg.queryConcurrency, log.With(logger, "component", "activeQueryTracker")), ActiveQueryTracker: promql.NewActiveQueryTracker(cfg.localStoragePath, cfg.queryConcurrency, log.With(logger, "component", "activeQueryTracker")),
LookbackDelta: time.Duration(cfg.lookbackDelta),
} }
queryEngine = promql.NewEngine(opts) queryEngine = promql.NewEngine(opts)
@ -387,6 +387,7 @@ func main() {
cfg.web.RuleManager = ruleManager cfg.web.RuleManager = ruleManager
cfg.web.Notifier = notifierManager cfg.web.Notifier = notifierManager
cfg.web.TSDBCfg = cfg.tsdb cfg.web.TSDBCfg = cfg.tsdb
cfg.web.LookbackDelta = time.Duration(cfg.lookbackDelta)
cfg.web.Version = &web.PrometheusVersion{ cfg.web.Version = &web.PrometheusVersion{
Version: version.Version, Version: version.Version,

@ -45,6 +45,7 @@ const (
subsystem = "engine" subsystem = "engine"
queryTag = "query" queryTag = "query"
env = "query execution" env = "query execution"
defaultLookbackDelta = 5 * time.Minute
// The largest SampleValue that can be converted to an int64 without overflow. // The largest SampleValue that can be converted to an int64 without overflow.
maxInt64 = 9223372036854774784 maxInt64 = 9223372036854774784
@ -53,10 +54,6 @@ const (
) )
var ( var (
// LookbackDelta determines the time since the last sample after which a time
// series is considered stale.
LookbackDelta = 5 * time.Minute
// DefaultEvaluationInterval is the default evaluation interval of // DefaultEvaluationInterval is the default evaluation interval of
// a subquery in milliseconds. // a subquery in milliseconds.
DefaultEvaluationInterval int64 DefaultEvaluationInterval int64
@ -220,6 +217,9 @@ type EngineOpts struct {
MaxSamples int MaxSamples int
Timeout time.Duration Timeout time.Duration
ActiveQueryTracker *ActiveQueryTracker ActiveQueryTracker *ActiveQueryTracker
// LookbackDelta determines the time since the last sample after which a time
// series is considered stale.
LookbackDelta time.Duration
} }
// Engine handles the lifetime of queries from beginning to end. // Engine handles the lifetime of queries from beginning to end.
@ -232,6 +232,7 @@ type Engine struct {
activeQueryTracker *ActiveQueryTracker activeQueryTracker *ActiveQueryTracker
queryLogger QueryLogger queryLogger QueryLogger
queryLoggerLock sync.RWMutex queryLoggerLock sync.RWMutex
lookbackDelta time.Duration
} }
// NewEngine returns a new engine. // NewEngine returns a new engine.
@ -305,6 +306,13 @@ func NewEngine(opts EngineOpts) *Engine {
metrics.maxConcurrentQueries.Set(-1) metrics.maxConcurrentQueries.Set(-1)
} }
if opts.LookbackDelta == 0 {
opts.LookbackDelta = defaultLookbackDelta
if l := opts.Logger; l != nil {
level.Debug(l).Log("msg", "lookback delta is zero, setting to default value", "value", defaultLookbackDelta)
}
}
if opts.Reg != nil { if opts.Reg != nil {
opts.Reg.MustRegister( opts.Reg.MustRegister(
metrics.currentQueries, metrics.currentQueries,
@ -324,6 +332,7 @@ func NewEngine(opts EngineOpts) *Engine {
metrics: metrics, metrics: metrics,
maxSamplesPerQuery: opts.MaxSamples, maxSamplesPerQuery: opts.MaxSamples,
activeQueryTracker: opts.ActiveQueryTracker, activeQueryTracker: opts.ActiveQueryTracker,
lookbackDelta: opts.LookbackDelta,
} }
} }
@ -533,6 +542,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
maxSamples: ng.maxSamplesPerQuery, maxSamples: ng.maxSamplesPerQuery,
defaultEvalInterval: GetDefaultEvaluationInterval(), defaultEvalInterval: GetDefaultEvaluationInterval(),
logger: ng.logger, logger: ng.logger,
lookbackDelta: ng.lookbackDelta,
} }
val, err := evaluator.Eval(s.Expr) val, err := evaluator.Eval(s.Expr)
@ -571,7 +581,6 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
default: default:
panic(errors.Errorf("promql.Engine.exec: unexpected expression type %q", s.Expr.Type())) panic(errors.Errorf("promql.Engine.exec: unexpected expression type %q", s.Expr.Type()))
} }
} }
// Range evaluation. // Range evaluation.
@ -583,6 +592,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
maxSamples: ng.maxSamplesPerQuery, maxSamples: ng.maxSamplesPerQuery,
defaultEvalInterval: GetDefaultEvaluationInterval(), defaultEvalInterval: GetDefaultEvaluationInterval(),
logger: ng.logger, logger: ng.logger,
lookbackDelta: ng.lookbackDelta,
} }
val, err := evaluator.Eval(s.Expr) val, err := evaluator.Eval(s.Expr)
if err != nil { if err != nil {
@ -626,11 +636,11 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
subqOffset := ng.cumulativeSubqueryOffset(path) subqOffset := ng.cumulativeSubqueryOffset(path)
switch n := node.(type) { switch n := node.(type) {
case *VectorSelector: case *VectorSelector:
if maxOffset < LookbackDelta+subqOffset { if maxOffset < ng.lookbackDelta+subqOffset {
maxOffset = LookbackDelta + subqOffset maxOffset = ng.lookbackDelta + subqOffset
} }
if n.Offset+LookbackDelta+subqOffset > maxOffset { if n.Offset+ng.lookbackDelta+subqOffset > maxOffset {
maxOffset = n.Offset + LookbackDelta + subqOffset maxOffset = n.Offset + ng.lookbackDelta + subqOffset
} }
case *MatrixSelector: case *MatrixSelector:
if maxOffset < n.Range+subqOffset { if maxOffset < n.Range+subqOffset {
@ -677,7 +687,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
switch n := node.(type) { switch n := node.(type) {
case *VectorSelector: case *VectorSelector:
if evalRange == 0 { if evalRange == 0 {
params.Start = params.Start - durationMilliseconds(LookbackDelta) params.Start = params.Start - durationMilliseconds(ng.lookbackDelta)
} else { } else {
params.Range = durationMilliseconds(evalRange) params.Range = durationMilliseconds(evalRange)
// For all matrix queries we want to ensure that we have (end-start) + range selected // For all matrix queries we want to ensure that we have (end-start) + range selected
@ -783,6 +793,7 @@ type evaluator struct {
currentSamples int currentSamples int
defaultEvalInterval int64 defaultEvalInterval int64
logger log.Logger logger log.Logger
lookbackDelta time.Duration
} }
// errorf causes a panic with the input formatted into an error. // errorf causes a panic with the input formatted into an error.
@ -1272,7 +1283,7 @@ func (ev *evaluator) eval(expr Expr) Value {
case *VectorSelector: case *VectorSelector:
checkForSeriesSetExpansion(ev.ctx, e) checkForSeriesSetExpansion(ev.ctx, e)
mat := make(Matrix, 0, len(e.series)) mat := make(Matrix, 0, len(e.series))
it := storage.NewBuffer(durationMilliseconds(LookbackDelta)) it := storage.NewBuffer(durationMilliseconds(ev.lookbackDelta))
for i, s := range e.series { for i, s := range e.series {
it.Reset(s.Iterator()) it.Reset(s.Iterator())
ss := Series{ ss := Series{
@ -1318,6 +1329,7 @@ func (ev *evaluator) eval(expr Expr) Value {
maxSamples: ev.maxSamples, maxSamples: ev.maxSamples,
defaultEvalInterval: ev.defaultEvalInterval, defaultEvalInterval: ev.defaultEvalInterval,
logger: ev.logger, logger: ev.logger,
lookbackDelta: ev.lookbackDelta,
} }
if e.Step != 0 { if e.Step != 0 {
@ -1353,7 +1365,7 @@ func (ev *evaluator) vectorSelector(node *VectorSelector, ts int64) Vector {
vec = make(Vector, 0, len(node.series)) vec = make(Vector, 0, len(node.series))
) )
it := storage.NewBuffer(durationMilliseconds(LookbackDelta)) it := storage.NewBuffer(durationMilliseconds(ev.lookbackDelta))
for i, s := range node.series { for i, s := range node.series {
it.Reset(s.Iterator()) it.Reset(s.Iterator())
@ -1392,7 +1404,7 @@ func (ev *evaluator) vectorSelectorSingle(it *storage.BufferedSeriesIterator, no
if !ok || t > refTime { if !ok || t > refTime {
t, v, ok = it.PeekBack(1) t, v, ok = it.PeekBack(1)
if !ok || t < refTime-durationMilliseconds(LookbackDelta) { if !ok || t < refTime-durationMilliseconds(ev.lookbackDelta) {
return 0, 0, false return 0, 0, false
} }
} }

@ -261,15 +261,9 @@ func TestParamsSetCorrectly(t *testing.T) {
Reg: nil, Reg: nil,
MaxSamples: 10, MaxSamples: 10,
Timeout: 10 * time.Second, Timeout: 10 * time.Second,
LookbackDelta: 5 * time.Second,
} }
// Set the lookback to be smaller and reset at the end.
currLookback := LookbackDelta
LookbackDelta = 5 * time.Second
defer func() {
LookbackDelta = currLookback
}()
cases := []struct { cases := []struct {
query string query string

@ -63,7 +63,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
} }
var ( var (
mint = timestamp.FromTime(h.now().Time().Add(-promql.LookbackDelta)) mint = timestamp.FromTime(h.now().Time().Add(-h.lookbackDelta))
maxt = timestamp.FromTime(h.now().Time()) maxt = timestamp.FromTime(h.now().Time())
format = expfmt.Negotiate(req.Header) format = expfmt.Negotiate(req.Header)
enc = expfmt.NewEncoder(w, format) enc = expfmt.NewEncoder(w, format)
@ -101,7 +101,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
} }
set := storage.NewMergeSeriesSet(sets, nil) set := storage.NewMergeSeriesSet(sets, nil)
it := storage.NewBuffer(int64(promql.LookbackDelta / 1e6)) it := storage.NewBuffer(int64(h.lookbackDelta / 1e6))
for set.Next() { for set.Next() {
s := set.At() s := set.At()

@ -19,6 +19,7 @@ import (
"sort" "sort"
"strings" "strings"
"testing" "testing"
"time"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
@ -200,6 +201,7 @@ func TestFederation(t *testing.T) {
h := &Handler{ h := &Handler{
storage: suite.Storage(), storage: suite.Storage(),
queryEngine: suite.QueryEngine(), queryEngine: suite.QueryEngine(),
lookbackDelta: 5 * time.Minute,
now: func() model.Time { return 101 * 60 * 1000 }, // 101min after epoch. now: func() model.Time { return 101 * 60 * 1000 }, // 101min after epoch.
config: &config.Config{ config: &config.Config{
GlobalConfig: config.GlobalConfig{}, GlobalConfig: config.GlobalConfig{},

@ -179,6 +179,7 @@ type Handler struct {
scrapeManager *scrape.Manager scrapeManager *scrape.Manager
ruleManager *rules.Manager ruleManager *rules.Manager
queryEngine *promql.Engine queryEngine *promql.Engine
lookbackDelta time.Duration
context context.Context context context.Context
tsdb func() *tsdb.DB tsdb func() *tsdb.DB
storage storage.Storage storage storage.Storage
@ -219,6 +220,7 @@ type Options struct {
TSDBCfg prometheus_tsdb.Options TSDBCfg prometheus_tsdb.Options
Storage storage.Storage Storage storage.Storage
QueryEngine *promql.Engine QueryEngine *promql.Engine
LookbackDelta time.Duration
ScrapeManager *scrape.Manager ScrapeManager *scrape.Manager
RuleManager *rules.Manager RuleManager *rules.Manager
Notifier *notifier.Manager Notifier *notifier.Manager
@ -281,6 +283,7 @@ func New(logger log.Logger, o *Options) *Handler {
scrapeManager: o.ScrapeManager, scrapeManager: o.ScrapeManager,
ruleManager: o.RuleManager, ruleManager: o.RuleManager,
queryEngine: o.QueryEngine, queryEngine: o.QueryEngine,
lookbackDelta: o.LookbackDelta,
tsdb: o.TSDB, tsdb: o.TSDB,
storage: o.Storage, storage: o.Storage,
notifier: o.Notifier, notifier: o.Notifier,

Loading…
Cancel
Save