Browse Source

Merge pull request #12418 from bboreham/parsing-concurrency

promql: include parsing in active-query tracking
pull/12488/head
Bryan Boreham 1 year ago committed by GitHub
parent
commit
391ecaec9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 67
      promql/engine.go

67
promql/engine.go

@ -408,44 +408,50 @@ func (ng *Engine) SetQueryLogger(l QueryLogger) {
} }
// NewInstantQuery returns an evaluation query for the given expression at the given time. // NewInstantQuery returns an evaluation query for the given expression at the given time.
func (ng *Engine) NewInstantQuery(_ context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (Query, error) { func (ng *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (Query, error) {
expr, err := parser.ParseExpr(qs) pExpr, qry := ng.newQuery(q, qs, opts, ts, ts, 0)
finishQueue, err := ng.queueActive(ctx, qry)
if err != nil { if err != nil {
return nil, err return nil, err
} }
qry, err := ng.newQuery(q, opts, expr, ts, ts, 0) defer finishQueue()
expr, err := parser.ParseExpr(qs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
qry.q = qs if err := ng.validateOpts(expr); err != nil {
return nil, err
}
*pExpr = PreprocessExpr(expr, ts, ts)
return qry, nil return qry, nil
} }
// NewRangeQuery returns an evaluation query for the given time range and with // NewRangeQuery returns an evaluation query for the given time range and with
// the resolution set by the interval. // the resolution set by the interval.
func (ng *Engine) NewRangeQuery(_ context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error) { func (ng *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error) {
pExpr, qry := ng.newQuery(q, qs, opts, start, end, interval)
finishQueue, err := ng.queueActive(ctx, qry)
if err != nil {
return nil, err
}
defer finishQueue()
expr, err := parser.ParseExpr(qs) expr, err := parser.ParseExpr(qs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := ng.validateOpts(expr); err != nil {
return nil, err
}
if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar { if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar {
return nil, fmt.Errorf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type())) return nil, fmt.Errorf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
} }
qry, err := ng.newQuery(q, opts, expr, start, end, interval) *pExpr = PreprocessExpr(expr, start, end)
if err != nil {
return nil, err
}
qry.q = qs
return qry, nil return qry, nil
} }
func (ng *Engine) newQuery(q storage.Queryable, opts *QueryOpts, expr parser.Expr, start, end time.Time, interval time.Duration) (*query, error) { func (ng *Engine) newQuery(q storage.Queryable, qs string, opts *QueryOpts, start, end time.Time, interval time.Duration) (*parser.Expr, *query) {
if err := ng.validateOpts(expr); err != nil {
return nil, err
}
// Default to empty QueryOpts if not provided. // Default to empty QueryOpts if not provided.
if opts == nil { if opts == nil {
opts = &QueryOpts{} opts = &QueryOpts{}
@ -457,20 +463,20 @@ func (ng *Engine) newQuery(q storage.Queryable, opts *QueryOpts, expr parser.Exp
} }
es := &parser.EvalStmt{ es := &parser.EvalStmt{
Expr: PreprocessExpr(expr, start, end),
Start: start, Start: start,
End: end, End: end,
Interval: interval, Interval: interval,
LookbackDelta: lookbackDelta, LookbackDelta: lookbackDelta,
} }
qry := &query{ qry := &query{
q: qs,
stmt: es, stmt: es,
ng: ng, ng: ng,
stats: stats.NewQueryTimers(), stats: stats.NewQueryTimers(),
sampleStats: stats.NewQuerySamples(ng.enablePerStepStats && opts.EnablePerStepStats), sampleStats: stats.NewQuerySamples(ng.enablePerStepStats && opts.EnablePerStepStats),
queryable: q, queryable: q,
} }
return qry, nil return &es.Expr, qry
} }
var ( var (
@ -589,18 +595,11 @@ func (ng *Engine) exec(ctx context.Context, q *query) (v parser.Value, ws storag
execSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.ExecTotalTime) execSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.ExecTotalTime)
defer execSpanTimer.Finish() defer execSpanTimer.Finish()
queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime) finishQueue, err := ng.queueActive(ctx, q)
// Log query in active log. The active log guarantees that we don't run over if err != nil {
// MaxConcurrent queries. return nil, nil, err
if ng.activeQueryTracker != nil {
queryIndex, err := ng.activeQueryTracker.Insert(ctx, q.q)
if err != nil {
queueSpanTimer.Finish()
return nil, nil, contextErr(err, "query queue")
}
defer ng.activeQueryTracker.Delete(queryIndex)
} }
queueSpanTimer.Finish() defer finishQueue()
// Cancel when execution is done or an error was raised. // Cancel when execution is done or an error was raised.
defer q.cancel() defer q.cancel()
@ -623,6 +622,18 @@ func (ng *Engine) exec(ctx context.Context, q *query) (v parser.Value, ws storag
panic(fmt.Errorf("promql.Engine.exec: unhandled statement of type %T", q.Statement())) panic(fmt.Errorf("promql.Engine.exec: unhandled statement of type %T", q.Statement()))
} }
// Log query in active log. The active log guarantees that we don't run over
// MaxConcurrent queries.
func (ng *Engine) queueActive(ctx context.Context, q *query) (func(), error) {
if ng.activeQueryTracker == nil {
return func() {}, nil
}
queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime)
queryIndex, err := ng.activeQueryTracker.Insert(ctx, q.q)
queueSpanTimer.Finish()
return func() { ng.activeQueryTracker.Delete(queryIndex) }, err
}
func timeMilliseconds(t time.Time) int64 { func timeMilliseconds(t time.Time) int64 {
return t.UnixNano() / int64(time.Millisecond/time.Nanosecond) return t.UnixNano() / int64(time.Millisecond/time.Nanosecond)
} }

Loading…
Cancel
Save