mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
2135 lines
63 KiB
2135 lines
63 KiB
// Copyright 2013 The Prometheus Authors |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
|
|
package promql |
|
|
|
import ( |
|
"bytes" |
|
"container/heap" |
|
"context" |
|
"fmt" |
|
"math" |
|
"regexp" |
|
"runtime" |
|
"sort" |
|
"strconv" |
|
"sync" |
|
"time" |
|
|
|
"github.com/go-kit/kit/log" |
|
"github.com/go-kit/kit/log/level" |
|
"github.com/opentracing/opentracing-go" |
|
"github.com/pkg/errors" |
|
"github.com/prometheus/client_golang/prometheus" |
|
"github.com/prometheus/common/model" |
|
"github.com/uber/jaeger-client-go" |
|
|
|
"github.com/prometheus/prometheus/pkg/labels" |
|
"github.com/prometheus/prometheus/pkg/timestamp" |
|
"github.com/prometheus/prometheus/pkg/value" |
|
"github.com/prometheus/prometheus/promql/parser" |
|
"github.com/prometheus/prometheus/storage" |
|
"github.com/prometheus/prometheus/util/stats" |
|
) |
|
|
|
const ( |
|
namespace = "prometheus" |
|
subsystem = "engine" |
|
queryTag = "query" |
|
env = "query execution" |
|
defaultLookbackDelta = 5 * time.Minute |
|
|
|
// The largest SampleValue that can be converted to an int64 without overflow. |
|
maxInt64 = 9223372036854774784 |
|
// The smallest SampleValue that can be converted to an int64 without underflow. |
|
minInt64 = -9223372036854775808 |
|
) |
|
|
|
type engineMetrics struct { |
|
currentQueries prometheus.Gauge |
|
maxConcurrentQueries prometheus.Gauge |
|
queryLogEnabled prometheus.Gauge |
|
queryLogFailures prometheus.Counter |
|
queryQueueTime prometheus.Observer |
|
queryPrepareTime prometheus.Observer |
|
queryInnerEval prometheus.Observer |
|
queryResultSort prometheus.Observer |
|
} |
|
|
|
// convertibleToInt64 returns true if v does not over-/underflow an int64. |
|
func convertibleToInt64(v float64) bool { |
|
return v <= maxInt64 && v >= minInt64 |
|
} |
|
|
|
type ( |
|
// ErrQueryTimeout is returned if a query timed out during processing. |
|
ErrQueryTimeout string |
|
// ErrQueryCanceled is returned if a query was canceled during processing. |
|
ErrQueryCanceled string |
|
// ErrTooManySamples is returned if a query would load more than the maximum allowed samples into memory. |
|
ErrTooManySamples string |
|
// ErrStorage is returned if an error was encountered in the storage layer |
|
// during query handling. |
|
ErrStorage struct{ Err error } |
|
) |
|
|
|
func (e ErrQueryTimeout) Error() string { |
|
return fmt.Sprintf("query timed out in %s", string(e)) |
|
} |
|
func (e ErrQueryCanceled) Error() string { |
|
return fmt.Sprintf("query was canceled in %s", string(e)) |
|
} |
|
func (e ErrTooManySamples) Error() string { |
|
return fmt.Sprintf("query processing would load too many samples into memory in %s", string(e)) |
|
} |
|
func (e ErrStorage) Error() string { |
|
return e.Err.Error() |
|
} |
|
|
|
// QueryLogger is an interface that can be used to log all the queries logged |
|
// by the engine. |
|
type QueryLogger interface { |
|
Log(...interface{}) error |
|
Close() error |
|
} |
|
|
|
// A Query is derived from an a raw query string and can be run against an engine |
|
// it is associated with. |
|
type Query interface { |
|
// Exec processes the query. Can only be called once. |
|
Exec(ctx context.Context) *Result |
|
// Close recovers memory used by the query result. |
|
Close() |
|
// Statement returns the parsed statement of the query. |
|
Statement() parser.Statement |
|
// Stats returns statistics about the lifetime of the query. |
|
Stats() *stats.QueryTimers |
|
// Cancel signals that a running query execution should be aborted. |
|
Cancel() |
|
} |
|
|
|
// query implements the Query interface. |
|
type query struct { |
|
// Underlying data provider. |
|
queryable storage.Queryable |
|
// The original query string. |
|
q string |
|
// Statement of the parsed query. |
|
stmt parser.Statement |
|
// Timer stats for the query execution. |
|
stats *stats.QueryTimers |
|
// Result matrix for reuse. |
|
matrix Matrix |
|
// Cancellation function for the query. |
|
cancel func() |
|
|
|
// The engine against which the query is executed. |
|
ng *Engine |
|
} |
|
|
|
type queryOrigin struct{} |
|
|
|
// Statement implements the Query interface. |
|
func (q *query) Statement() parser.Statement { |
|
return q.stmt |
|
} |
|
|
|
// Stats implements the Query interface. |
|
func (q *query) Stats() *stats.QueryTimers { |
|
return q.stats |
|
} |
|
|
|
// Cancel implements the Query interface. |
|
func (q *query) Cancel() { |
|
if q.cancel != nil { |
|
q.cancel() |
|
} |
|
} |
|
|
|
// Close implements the Query interface. |
|
func (q *query) Close() { |
|
for _, s := range q.matrix { |
|
putPointSlice(s.Points) |
|
} |
|
} |
|
|
|
// Exec implements the Query interface. |
|
func (q *query) Exec(ctx context.Context) *Result { |
|
if span := opentracing.SpanFromContext(ctx); span != nil { |
|
span.SetTag(queryTag, q.stmt.String()) |
|
} |
|
|
|
// Exec query. |
|
res, warnings, err := q.ng.exec(ctx, q) |
|
|
|
return &Result{Err: err, Value: res, Warnings: warnings} |
|
} |
|
|
|
// contextDone returns an error if the context was canceled or timed out. |
|
func contextDone(ctx context.Context, env string) error { |
|
if err := ctx.Err(); err != nil { |
|
return contextErr(err, env) |
|
} |
|
return nil |
|
} |
|
|
|
func contextErr(err error, env string) error { |
|
switch err { |
|
case context.Canceled: |
|
return ErrQueryCanceled(env) |
|
case context.DeadlineExceeded: |
|
return ErrQueryTimeout(env) |
|
default: |
|
return err |
|
} |
|
} |
|
|
|
// EngineOpts contains configuration options used when creating a new Engine. |
|
type EngineOpts struct { |
|
Logger log.Logger |
|
Reg prometheus.Registerer |
|
MaxSamples int |
|
Timeout time.Duration |
|
ActiveQueryTracker *ActiveQueryTracker |
|
// LookbackDelta determines the time since the last sample after which a time |
|
// series is considered stale. |
|
LookbackDelta time.Duration |
|
|
|
// NoStepSubqueryIntervalFn is the default evaluation interval of |
|
// a subquery in milliseconds if no step in range vector was specified `[30m:<step>]`. |
|
NoStepSubqueryIntervalFn func(rangeMillis int64) int64 |
|
} |
|
|
|
// Engine handles the lifetime of queries from beginning to end. |
|
// It is connected to a querier. |
|
type Engine struct { |
|
logger log.Logger |
|
metrics *engineMetrics |
|
timeout time.Duration |
|
maxSamplesPerQuery int |
|
activeQueryTracker *ActiveQueryTracker |
|
queryLogger QueryLogger |
|
queryLoggerLock sync.RWMutex |
|
lookbackDelta time.Duration |
|
noStepSubqueryIntervalFn func(rangeMillis int64) int64 |
|
} |
|
|
|
// NewEngine returns a new engine. |
|
func NewEngine(opts EngineOpts) *Engine { |
|
if opts.Logger == nil { |
|
opts.Logger = log.NewNopLogger() |
|
} |
|
|
|
queryResultSummary := prometheus.NewSummaryVec(prometheus.SummaryOpts{ |
|
Namespace: namespace, |
|
Subsystem: subsystem, |
|
Name: "query_duration_seconds", |
|
Help: "Query timings", |
|
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, |
|
}, |
|
[]string{"slice"}, |
|
) |
|
|
|
metrics := &engineMetrics{ |
|
currentQueries: prometheus.NewGauge(prometheus.GaugeOpts{ |
|
Namespace: namespace, |
|
Subsystem: subsystem, |
|
Name: "queries", |
|
Help: "The current number of queries being executed or waiting.", |
|
}), |
|
queryLogEnabled: prometheus.NewGauge(prometheus.GaugeOpts{ |
|
Namespace: namespace, |
|
Subsystem: subsystem, |
|
Name: "query_log_enabled", |
|
Help: "State of the query log.", |
|
}), |
|
queryLogFailures: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Namespace: namespace, |
|
Subsystem: subsystem, |
|
Name: "query_log_failures_total", |
|
Help: "The number of query log failures.", |
|
}), |
|
maxConcurrentQueries: prometheus.NewGauge(prometheus.GaugeOpts{ |
|
Namespace: namespace, |
|
Subsystem: subsystem, |
|
Name: "queries_concurrent_max", |
|
Help: "The max number of concurrent queries.", |
|
}), |
|
queryQueueTime: queryResultSummary.WithLabelValues("queue_time"), |
|
queryPrepareTime: queryResultSummary.WithLabelValues("prepare_time"), |
|
queryInnerEval: queryResultSummary.WithLabelValues("inner_eval"), |
|
queryResultSort: queryResultSummary.WithLabelValues("result_sort"), |
|
} |
|
|
|
if t := opts.ActiveQueryTracker; t != nil { |
|
metrics.maxConcurrentQueries.Set(float64(t.GetMaxConcurrent())) |
|
} else { |
|
metrics.maxConcurrentQueries.Set(-1) |
|
} |
|
|
|
if opts.LookbackDelta == 0 { |
|
opts.LookbackDelta = defaultLookbackDelta |
|
if l := opts.Logger; l != nil { |
|
level.Debug(l).Log("msg", "Lookback delta is zero, setting to default value", "value", defaultLookbackDelta) |
|
} |
|
} |
|
|
|
if opts.Reg != nil { |
|
opts.Reg.MustRegister( |
|
metrics.currentQueries, |
|
metrics.maxConcurrentQueries, |
|
metrics.queryLogEnabled, |
|
metrics.queryLogFailures, |
|
queryResultSummary, |
|
) |
|
} |
|
|
|
return &Engine{ |
|
timeout: opts.Timeout, |
|
logger: opts.Logger, |
|
metrics: metrics, |
|
maxSamplesPerQuery: opts.MaxSamples, |
|
activeQueryTracker: opts.ActiveQueryTracker, |
|
lookbackDelta: opts.LookbackDelta, |
|
noStepSubqueryIntervalFn: opts.NoStepSubqueryIntervalFn, |
|
} |
|
} |
|
|
|
// SetQueryLogger sets the query logger. |
|
func (ng *Engine) SetQueryLogger(l QueryLogger) { |
|
ng.queryLoggerLock.Lock() |
|
defer ng.queryLoggerLock.Unlock() |
|
|
|
if ng.queryLogger != nil { |
|
// An error closing the old file descriptor should |
|
// not make reload fail; only log a warning. |
|
err := ng.queryLogger.Close() |
|
if err != nil { |
|
level.Warn(ng.logger).Log("msg", "Error while closing the previous query log file", "err", err) |
|
} |
|
} |
|
|
|
ng.queryLogger = l |
|
|
|
if l != nil { |
|
ng.metrics.queryLogEnabled.Set(1) |
|
} else { |
|
ng.metrics.queryLogEnabled.Set(0) |
|
} |
|
} |
|
|
|
// NewInstantQuery returns an evaluation query for the given expression at the given time. |
|
func (ng *Engine) NewInstantQuery(q storage.Queryable, qs string, ts time.Time) (Query, error) { |
|
expr, err := parser.ParseExpr(qs) |
|
if err != nil { |
|
return nil, err |
|
} |
|
qry := ng.newQuery(q, expr, ts, ts, 0) |
|
qry.q = qs |
|
|
|
return qry, nil |
|
} |
|
|
|
// NewRangeQuery returns an evaluation query for the given time range and with |
|
// the resolution set by the interval. |
|
func (ng *Engine) NewRangeQuery(q storage.Queryable, qs string, start, end time.Time, interval time.Duration) (Query, error) { |
|
expr, err := parser.ParseExpr(qs) |
|
if err != nil { |
|
return nil, err |
|
} |
|
if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar { |
|
return nil, errors.Errorf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type())) |
|
} |
|
qry := ng.newQuery(q, expr, start, end, interval) |
|
qry.q = qs |
|
|
|
return qry, nil |
|
} |
|
|
|
func (ng *Engine) newQuery(q storage.Queryable, expr parser.Expr, start, end time.Time, interval time.Duration) *query { |
|
es := &parser.EvalStmt{ |
|
Expr: expr, |
|
Start: start, |
|
End: end, |
|
Interval: interval, |
|
} |
|
qry := &query{ |
|
stmt: es, |
|
ng: ng, |
|
stats: stats.NewQueryTimers(), |
|
queryable: q, |
|
} |
|
return qry |
|
} |
|
|
|
func (ng *Engine) newTestQuery(f func(context.Context) error) Query { |
|
qry := &query{ |
|
q: "test statement", |
|
stmt: parser.TestStmt(f), |
|
ng: ng, |
|
stats: stats.NewQueryTimers(), |
|
} |
|
return qry |
|
} |
|
|
|
// exec executes the query. |
|
// |
|
// At this point per query only one EvalStmt is evaluated. Alert and record |
|
// statements are not handled by the Engine. |
|
func (ng *Engine) exec(ctx context.Context, q *query) (v parser.Value, ws storage.Warnings, err error) { |
|
ng.metrics.currentQueries.Inc() |
|
defer ng.metrics.currentQueries.Dec() |
|
|
|
ctx, cancel := context.WithTimeout(ctx, ng.timeout) |
|
q.cancel = cancel |
|
|
|
defer func() { |
|
ng.queryLoggerLock.RLock() |
|
if l := ng.queryLogger; l != nil { |
|
params := make(map[string]interface{}, 4) |
|
params["query"] = q.q |
|
if eq, ok := q.Statement().(*parser.EvalStmt); ok { |
|
params["start"] = formatDate(eq.Start) |
|
params["end"] = formatDate(eq.End) |
|
// The step provided by the user is in seconds. |
|
params["step"] = int64(eq.Interval / (time.Second / time.Nanosecond)) |
|
} |
|
f := []interface{}{"params", params} |
|
if err != nil { |
|
f = append(f, "error", err) |
|
} |
|
f = append(f, "stats", stats.NewQueryStats(q.Stats())) |
|
if span := opentracing.SpanFromContext(ctx); span != nil { |
|
if spanCtx, ok := span.Context().(jaeger.SpanContext); ok { |
|
f = append(f, "spanID", spanCtx.SpanID()) |
|
} |
|
} |
|
if origin := ctx.Value(queryOrigin{}); origin != nil { |
|
for k, v := range origin.(map[string]interface{}) { |
|
f = append(f, k, v) |
|
} |
|
} |
|
if err := l.Log(f...); err != nil { |
|
ng.metrics.queryLogFailures.Inc() |
|
level.Error(ng.logger).Log("msg", "can't log query", "err", err) |
|
} |
|
} |
|
ng.queryLoggerLock.RUnlock() |
|
}() |
|
|
|
execSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.ExecTotalTime) |
|
defer execSpanTimer.Finish() |
|
|
|
queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime) |
|
// Log query in active log. The active log guarantees that we don't run over |
|
// MaxConcurrent queries. |
|
if ng.activeQueryTracker != nil { |
|
queryIndex, err := ng.activeQueryTracker.Insert(ctx, q.q) |
|
if err != nil { |
|
queueSpanTimer.Finish() |
|
return nil, nil, contextErr(err, "query queue") |
|
} |
|
defer ng.activeQueryTracker.Delete(queryIndex) |
|
} |
|
queueSpanTimer.Finish() |
|
|
|
// Cancel when execution is done or an error was raised. |
|
defer q.cancel() |
|
|
|
const env = "query execution" |
|
|
|
evalSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.EvalTotalTime) |
|
defer evalSpanTimer.Finish() |
|
|
|
// The base context might already be canceled on the first iteration (e.g. during shutdown). |
|
if err := contextDone(ctx, env); err != nil { |
|
return nil, nil, err |
|
} |
|
|
|
switch s := q.Statement().(type) { |
|
case *parser.EvalStmt: |
|
return ng.execEvalStmt(ctx, q, s) |
|
case parser.TestStmt: |
|
return nil, nil, s(ctx) |
|
} |
|
|
|
panic(errors.Errorf("promql.Engine.exec: unhandled statement of type %T", q.Statement())) |
|
} |
|
|
|
func timeMilliseconds(t time.Time) int64 { |
|
return t.UnixNano() / int64(time.Millisecond/time.Nanosecond) |
|
} |
|
|
|
func durationMilliseconds(d time.Duration) int64 { |
|
return int64(d / (time.Millisecond / time.Nanosecond)) |
|
} |
|
|
|
// execEvalStmt evaluates the expression of an evaluation statement for the given time range. |
|
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.EvalStmt) (parser.Value, storage.Warnings, error) { |
|
prepareSpanTimer, ctxPrepare := query.stats.GetSpanTimer(ctx, stats.QueryPreparationTime, ng.metrics.queryPrepareTime) |
|
mint := ng.findMinTime(s) |
|
querier, err := query.queryable.Querier(ctxPrepare, timestamp.FromTime(mint), timestamp.FromTime(s.End)) |
|
if err != nil { |
|
prepareSpanTimer.Finish() |
|
return nil, nil, err |
|
} |
|
defer querier.Close() |
|
|
|
ng.populateSeries(querier, s) |
|
prepareSpanTimer.Finish() |
|
|
|
evalSpanTimer, ctxInnerEval := query.stats.GetSpanTimer(ctx, stats.InnerEvalTime, ng.metrics.queryInnerEval) |
|
// Instant evaluation. This is executed as a range evaluation with one step. |
|
if s.Start == s.End && s.Interval == 0 { |
|
start := timeMilliseconds(s.Start) |
|
evaluator := &evaluator{ |
|
startTimestamp: start, |
|
endTimestamp: start, |
|
interval: 1, |
|
ctx: ctxInnerEval, |
|
maxSamples: ng.maxSamplesPerQuery, |
|
logger: ng.logger, |
|
lookbackDelta: ng.lookbackDelta, |
|
noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, |
|
} |
|
|
|
val, warnings, err := evaluator.Eval(s.Expr) |
|
if err != nil { |
|
return nil, warnings, err |
|
} |
|
|
|
evalSpanTimer.Finish() |
|
|
|
var mat Matrix |
|
|
|
switch result := val.(type) { |
|
case Matrix: |
|
mat = result |
|
case String: |
|
return result, warnings, nil |
|
default: |
|
panic(errors.Errorf("promql.Engine.exec: invalid expression type %q", val.Type())) |
|
} |
|
|
|
query.matrix = mat |
|
switch s.Expr.Type() { |
|
case parser.ValueTypeVector: |
|
// Convert matrix with one value per series into vector. |
|
vector := make(Vector, len(mat)) |
|
for i, s := range mat { |
|
// Point might have a different timestamp, force it to the evaluation |
|
// timestamp as that is when we ran the evaluation. |
|
vector[i] = Sample{Metric: s.Metric, Point: Point{V: s.Points[0].V, T: start}} |
|
} |
|
return vector, warnings, nil |
|
case parser.ValueTypeScalar: |
|
return Scalar{V: mat[0].Points[0].V, T: start}, warnings, nil |
|
case parser.ValueTypeMatrix: |
|
return mat, warnings, nil |
|
default: |
|
panic(errors.Errorf("promql.Engine.exec: unexpected expression type %q", s.Expr.Type())) |
|
} |
|
} |
|
|
|
// Range evaluation. |
|
evaluator := &evaluator{ |
|
startTimestamp: timeMilliseconds(s.Start), |
|
endTimestamp: timeMilliseconds(s.End), |
|
interval: durationMilliseconds(s.Interval), |
|
ctx: ctxInnerEval, |
|
maxSamples: ng.maxSamplesPerQuery, |
|
logger: ng.logger, |
|
lookbackDelta: ng.lookbackDelta, |
|
noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, |
|
} |
|
val, warnings, err := evaluator.Eval(s.Expr) |
|
if err != nil { |
|
return nil, warnings, err |
|
} |
|
evalSpanTimer.Finish() |
|
|
|
mat, ok := val.(Matrix) |
|
if !ok { |
|
panic(errors.Errorf("promql.Engine.exec: invalid expression type %q", val.Type())) |
|
} |
|
query.matrix = mat |
|
|
|
if err := contextDone(ctx, "expression evaluation"); err != nil { |
|
return nil, warnings, err |
|
} |
|
|
|
// TODO(fabxc): where to ensure metric labels are a copy from the storage internals. |
|
sortSpanTimer, _ := query.stats.GetSpanTimer(ctx, stats.ResultSortTime, ng.metrics.queryResultSort) |
|
sort.Sort(mat) |
|
sortSpanTimer.Finish() |
|
|
|
return mat, warnings, nil |
|
} |
|
|
|
// cumulativeSubqueryOffset returns the sum of range and offset of all subqueries in the path. |
|
func (ng *Engine) cumulativeSubqueryOffset(path []parser.Node) time.Duration { |
|
var subqOffset time.Duration |
|
for _, node := range path { |
|
switch n := node.(type) { |
|
case *parser.SubqueryExpr: |
|
subqOffset += n.Range + n.Offset |
|
} |
|
} |
|
return subqOffset |
|
} |
|
|
|
func (ng *Engine) findMinTime(s *parser.EvalStmt) time.Time { |
|
var maxOffset time.Duration |
|
parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error { |
|
subqOffset := ng.cumulativeSubqueryOffset(path) |
|
switch n := node.(type) { |
|
case *parser.VectorSelector: |
|
if maxOffset < ng.lookbackDelta+subqOffset { |
|
maxOffset = ng.lookbackDelta + subqOffset |
|
} |
|
if n.Offset+ng.lookbackDelta+subqOffset > maxOffset { |
|
maxOffset = n.Offset + ng.lookbackDelta + subqOffset |
|
} |
|
case *parser.MatrixSelector: |
|
if maxOffset < n.Range+subqOffset { |
|
maxOffset = n.Range + subqOffset |
|
} |
|
if m := n.VectorSelector.(*parser.VectorSelector).Offset + n.Range + subqOffset; m > maxOffset { |
|
maxOffset = m |
|
} |
|
} |
|
return nil |
|
}) |
|
return s.Start.Add(-maxOffset) |
|
} |
|
|
|
func (ng *Engine) populateSeries(querier storage.Querier, s *parser.EvalStmt) { |
|
// Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range. |
|
// The evaluation of the VectorSelector inside then evaluates the given range and unsets |
|
// the variable. |
|
var evalRange time.Duration |
|
|
|
parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error { |
|
switch n := node.(type) { |
|
case *parser.VectorSelector: |
|
hints := &storage.SelectHints{ |
|
Start: timestamp.FromTime(s.Start), |
|
End: timestamp.FromTime(s.End), |
|
Step: durationMilliseconds(s.Interval), |
|
} |
|
|
|
// We need to make sure we select the timerange selected by the subquery. |
|
// The cumulativeSubqueryOffset function gives the sum of range and the offset. |
|
// TODO(gouthamve): Consider optimising it by separating out the range and offsets, and subtracting the offsets |
|
// from end also. See: https://github.com/prometheus/prometheus/issues/7629. |
|
// TODO(bwplotka): Add support for better hints when subquerying. See: https://github.com/prometheus/prometheus/issues/7630. |
|
subqOffset := ng.cumulativeSubqueryOffset(path) |
|
offsetMilliseconds := durationMilliseconds(subqOffset) |
|
hints.Start = hints.Start - offsetMilliseconds |
|
|
|
if evalRange == 0 { |
|
hints.Start = hints.Start - durationMilliseconds(ng.lookbackDelta) |
|
} else { |
|
hints.Range = durationMilliseconds(evalRange) |
|
// For all matrix queries we want to ensure that we have (end-start) + range selected |
|
// this way we have `range` data before the start time |
|
hints.Start = hints.Start - durationMilliseconds(evalRange) |
|
evalRange = 0 |
|
} |
|
|
|
hints.Func = extractFuncFromPath(path) |
|
hints.By, hints.Grouping = extractGroupsFromPath(path) |
|
if n.Offset > 0 { |
|
offsetMilliseconds := durationMilliseconds(n.Offset) |
|
hints.Start = hints.Start - offsetMilliseconds |
|
hints.End = hints.End - offsetMilliseconds |
|
} |
|
|
|
n.UnexpandedSeriesSet = querier.Select(false, hints, n.LabelMatchers...) |
|
case *parser.MatrixSelector: |
|
evalRange = n.Range |
|
} |
|
return nil |
|
}) |
|
} |
|
|
|
// extractFuncFromPath walks up the path and searches for the first instance of |
|
// a function or aggregation. |
|
func extractFuncFromPath(p []parser.Node) string { |
|
if len(p) == 0 { |
|
return "" |
|
} |
|
switch n := p[len(p)-1].(type) { |
|
case *parser.AggregateExpr: |
|
return n.Op.String() |
|
case *parser.Call: |
|
return n.Func.Name |
|
case *parser.BinaryExpr: |
|
// If we hit a binary expression we terminate since we only care about functions |
|
// or aggregations over a single metric. |
|
return "" |
|
} |
|
return extractFuncFromPath(p[:len(p)-1]) |
|
} |
|
|
|
// extractGroupsFromPath parses vector outer function and extracts grouping information if by or without was used. |
|
func extractGroupsFromPath(p []parser.Node) (bool, []string) { |
|
if len(p) == 0 { |
|
return false, nil |
|
} |
|
switch n := p[len(p)-1].(type) { |
|
case *parser.AggregateExpr: |
|
return !n.Without, n.Grouping |
|
} |
|
return false, nil |
|
} |
|
|
|
func checkAndExpandSeriesSet(ctx context.Context, expr parser.Expr) (storage.Warnings, error) { |
|
switch e := expr.(type) { |
|
case *parser.MatrixSelector: |
|
return checkAndExpandSeriesSet(ctx, e.VectorSelector) |
|
case *parser.VectorSelector: |
|
if e.Series != nil { |
|
return nil, nil |
|
} |
|
series, ws, err := expandSeriesSet(ctx, e.UnexpandedSeriesSet) |
|
e.Series = series |
|
return ws, err |
|
} |
|
return nil, nil |
|
} |
|
|
|
func expandSeriesSet(ctx context.Context, it storage.SeriesSet) (res []storage.Series, ws storage.Warnings, err error) { |
|
for it.Next() { |
|
select { |
|
case <-ctx.Done(): |
|
return nil, nil, ctx.Err() |
|
default: |
|
} |
|
res = append(res, it.At()) |
|
} |
|
return res, it.Warnings(), it.Err() |
|
} |
|
|
|
type errWithWarnings struct { |
|
err error |
|
warnings storage.Warnings |
|
} |
|
|
|
func (e errWithWarnings) Error() string { return e.err.Error() } |
|
|
|
// An evaluator evaluates given expressions over given fixed timestamps. It |
|
// is attached to an engine through which it connects to a querier and reports |
|
// errors. On timeout or cancellation of its context it terminates. |
|
type evaluator struct { |
|
ctx context.Context |
|
|
|
startTimestamp int64 // Start time in milliseconds. |
|
endTimestamp int64 // End time in milliseconds. |
|
interval int64 // Interval in milliseconds. |
|
|
|
maxSamples int |
|
currentSamples int |
|
logger log.Logger |
|
lookbackDelta time.Duration |
|
noStepSubqueryIntervalFn func(rangeMillis int64) int64 |
|
} |
|
|
|
// errorf causes a panic with the input formatted into an error. |
|
func (ev *evaluator) errorf(format string, args ...interface{}) { |
|
ev.error(errors.Errorf(format, args...)) |
|
} |
|
|
|
// error causes a panic with the given error. |
|
func (ev *evaluator) error(err error) { |
|
panic(err) |
|
} |
|
|
|
// recover is the handler that turns panics into returns from the top level of evaluation. |
|
func (ev *evaluator) recover(ws *storage.Warnings, errp *error) { |
|
e := recover() |
|
if e == nil { |
|
return |
|
} |
|
|
|
switch err := e.(type) { |
|
case runtime.Error: |
|
// Print the stack trace but do not inhibit the running application. |
|
buf := make([]byte, 64<<10) |
|
buf = buf[:runtime.Stack(buf, false)] |
|
|
|
level.Error(ev.logger).Log("msg", "runtime panic in parser", "err", e, "stacktrace", string(buf)) |
|
*errp = errors.Wrap(err, "unexpected error") |
|
case errWithWarnings: |
|
*errp = err.err |
|
*ws = append(*ws, err.warnings...) |
|
default: |
|
*errp = e.(error) |
|
} |
|
} |
|
|
|
func (ev *evaluator) Eval(expr parser.Expr) (v parser.Value, ws storage.Warnings, err error) { |
|
defer ev.recover(&ws, &err) |
|
|
|
v, ws = ev.eval(expr) |
|
return v, ws, nil |
|
} |
|
|
|
// EvalNodeHelper stores extra information and caches for evaluating a single node across steps. |
|
type EvalNodeHelper struct { |
|
// Evaluation timestamp. |
|
ts int64 |
|
// Vector that can be used for output. |
|
out Vector |
|
|
|
// Caches. |
|
// dropMetricName and label_*. |
|
dmn map[uint64]labels.Labels |
|
// signatureFunc. |
|
sigf map[string]string |
|
// funcHistogramQuantile. |
|
signatureToMetricWithBuckets map[string]*metricWithBuckets |
|
// label_replace. |
|
regex *regexp.Regexp |
|
|
|
lb *labels.Builder |
|
lblBuf []byte |
|
lblResultBuf []byte |
|
|
|
// For binary vector matching. |
|
rightSigs map[string]Sample |
|
matchedSigs map[string]map[uint64]struct{} |
|
resultMetric map[string]labels.Labels |
|
} |
|
|
|
// dropMetricName is a cached version of dropMetricName. |
|
func (enh *EvalNodeHelper) dropMetricName(l labels.Labels) labels.Labels { |
|
if enh.dmn == nil { |
|
enh.dmn = make(map[uint64]labels.Labels, len(enh.out)) |
|
} |
|
h := l.Hash() |
|
ret, ok := enh.dmn[h] |
|
if ok { |
|
return ret |
|
} |
|
ret = dropMetricName(l) |
|
enh.dmn[h] = ret |
|
return ret |
|
} |
|
|
|
func (enh *EvalNodeHelper) signatureFunc(on bool, names ...string) func(labels.Labels) string { |
|
if enh.sigf == nil { |
|
enh.sigf = make(map[string]string, len(enh.out)) |
|
} |
|
f := signatureFunc(on, enh.lblBuf, names...) |
|
return func(l labels.Labels) string { |
|
enh.lblBuf = l.Bytes(enh.lblBuf) |
|
ret, ok := enh.sigf[string(enh.lblBuf)] |
|
if ok { |
|
return ret |
|
} |
|
ret = f(l) |
|
enh.sigf[string(enh.lblBuf)] = ret |
|
return ret |
|
} |
|
} |
|
|
|
// rangeEval evaluates the given expressions, and then for each step calls |
|
// the given function with the values computed for each expression at that |
|
// step. The return value is the combination into time series of all the |
|
// function call results. |
|
func (ev *evaluator) rangeEval(f func([]parser.Value, *EvalNodeHelper) (Vector, storage.Warnings), exprs ...parser.Expr) (Matrix, storage.Warnings) { |
|
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 |
|
matrixes := make([]Matrix, len(exprs)) |
|
origMatrixes := make([]Matrix, len(exprs)) |
|
originalNumSamples := ev.currentSamples |
|
|
|
var warnings storage.Warnings |
|
for i, e := range exprs { |
|
// Functions will take string arguments from the expressions, not the values. |
|
if e != nil && e.Type() != parser.ValueTypeString { |
|
// ev.currentSamples will be updated to the correct value within the ev.eval call. |
|
val, ws := ev.eval(e) |
|
warnings = append(warnings, ws...) |
|
matrixes[i] = val.(Matrix) |
|
|
|
// Keep a copy of the original point slices so that they |
|
// can be returned to the pool. |
|
origMatrixes[i] = make(Matrix, len(matrixes[i])) |
|
copy(origMatrixes[i], matrixes[i]) |
|
} |
|
} |
|
|
|
vectors := make([]Vector, len(exprs)) // Input vectors for the function. |
|
args := make([]parser.Value, len(exprs)) // Argument to function. |
|
// Create an output vector that is as big as the input matrix with |
|
// the most time series. |
|
biggestLen := 1 |
|
for i := range exprs { |
|
vectors[i] = make(Vector, 0, len(matrixes[i])) |
|
if len(matrixes[i]) > biggestLen { |
|
biggestLen = len(matrixes[i]) |
|
} |
|
} |
|
enh := &EvalNodeHelper{out: make(Vector, 0, biggestLen)} |
|
seriess := make(map[uint64]Series, biggestLen) // Output series by series hash. |
|
tempNumSamples := ev.currentSamples |
|
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { |
|
if err := contextDone(ev.ctx, "expression evaluation"); err != nil { |
|
ev.error(err) |
|
} |
|
// Reset number of samples in memory after each timestamp. |
|
ev.currentSamples = tempNumSamples |
|
// Gather input vectors for this timestamp. |
|
for i := range exprs { |
|
vectors[i] = vectors[i][:0] |
|
for si, series := range matrixes[i] { |
|
for _, point := range series.Points { |
|
if point.T == ts { |
|
if ev.currentSamples < ev.maxSamples { |
|
vectors[i] = append(vectors[i], Sample{Metric: series.Metric, Point: point}) |
|
// Move input vectors forward so we don't have to re-scan the same |
|
// past points at the next step. |
|
matrixes[i][si].Points = series.Points[1:] |
|
ev.currentSamples++ |
|
} else { |
|
ev.error(ErrTooManySamples(env)) |
|
} |
|
} |
|
break |
|
} |
|
} |
|
args[i] = vectors[i] |
|
} |
|
// Make the function call. |
|
enh.ts = ts |
|
result, ws := f(args, enh) |
|
if result.ContainsSameLabelset() { |
|
ev.errorf("vector cannot contain metrics with the same labelset") |
|
} |
|
enh.out = result[:0] // Reuse result vector. |
|
warnings = append(warnings, ws...) |
|
|
|
ev.currentSamples += len(result) |
|
// When we reset currentSamples to tempNumSamples during the next iteration of the loop it also |
|
// needs to include the samples from the result here, as they're still in memory. |
|
tempNumSamples += len(result) |
|
|
|
if ev.currentSamples > ev.maxSamples { |
|
ev.error(ErrTooManySamples(env)) |
|
} |
|
|
|
// If this could be an instant query, shortcut so as not to change sort order. |
|
if ev.endTimestamp == ev.startTimestamp { |
|
mat := make(Matrix, len(result)) |
|
for i, s := range result { |
|
s.Point.T = ts |
|
mat[i] = Series{Metric: s.Metric, Points: []Point{s.Point}} |
|
} |
|
ev.currentSamples = originalNumSamples + mat.TotalSamples() |
|
return mat, warnings |
|
} |
|
|
|
// Add samples in output vector to output series. |
|
for _, sample := range result { |
|
h := sample.Metric.Hash() |
|
ss, ok := seriess[h] |
|
if !ok { |
|
ss = Series{ |
|
Metric: sample.Metric, |
|
Points: getPointSlice(numSteps), |
|
} |
|
} |
|
sample.Point.T = ts |
|
ss.Points = append(ss.Points, sample.Point) |
|
seriess[h] = ss |
|
|
|
} |
|
} |
|
|
|
// Reuse the original point slices. |
|
for _, m := range origMatrixes { |
|
for _, s := range m { |
|
putPointSlice(s.Points) |
|
} |
|
} |
|
// Assemble the output matrix. By the time we get here we know we don't have too many samples. |
|
mat := make(Matrix, 0, len(seriess)) |
|
for _, ss := range seriess { |
|
mat = append(mat, ss) |
|
} |
|
ev.currentSamples = originalNumSamples + mat.TotalSamples() |
|
return mat, warnings |
|
} |
|
|
|
// evalSubquery evaluates given SubqueryExpr and returns an equivalent |
|
// evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set. |
|
func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, storage.Warnings) { |
|
val, ws := ev.eval(subq) |
|
mat := val.(Matrix) |
|
vs := &parser.VectorSelector{ |
|
Offset: subq.Offset, |
|
Series: make([]storage.Series, 0, len(mat)), |
|
} |
|
ms := &parser.MatrixSelector{ |
|
Range: subq.Range, |
|
VectorSelector: vs, |
|
} |
|
for _, s := range mat { |
|
vs.Series = append(vs.Series, NewStorageSeries(s)) |
|
} |
|
return ms, ws |
|
} |
|
|
|
// eval evaluates the given expression as the given AST expression node requires. |
|
func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { |
|
// This is the top-level evaluation method. |
|
// Thus, we check for timeout/cancellation here. |
|
if err := contextDone(ev.ctx, "expression evaluation"); err != nil { |
|
ev.error(err) |
|
} |
|
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 |
|
|
|
switch e := expr.(type) { |
|
case *parser.AggregateExpr: |
|
unwrapParenExpr(&e.Param) |
|
if s, ok := e.Param.(*parser.StringLiteral); ok { |
|
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) { |
|
return ev.aggregation(e.Op, e.Grouping, e.Without, s.Val, v[0].(Vector), enh), nil |
|
}, e.Expr) |
|
} |
|
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) { |
|
var param float64 |
|
if e.Param != nil { |
|
param = v[0].(Vector)[0].V |
|
} |
|
return ev.aggregation(e.Op, e.Grouping, e.Without, param, v[1].(Vector), enh), nil |
|
}, e.Param, e.Expr) |
|
|
|
case *parser.Call: |
|
call := FunctionCalls[e.Func.Name] |
|
|
|
if e.Func.Name == "timestamp" { |
|
// Matrix evaluation always returns the evaluation time, |
|
// so this function needs special handling when given |
|
// a vector selector. |
|
vs, ok := e.Args[0].(*parser.VectorSelector) |
|
if ok { |
|
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) { |
|
val, ws := ev.vectorSelector(vs, enh.ts) |
|
return call([]parser.Value{val}, e.Args, enh), ws |
|
}) |
|
} |
|
} |
|
|
|
// Check if the function has a matrix argument. |
|
var ( |
|
matrixArgIndex int |
|
matrixArg bool |
|
warnings storage.Warnings |
|
) |
|
for i := range e.Args { |
|
unwrapParenExpr(&e.Args[i]) |
|
a := e.Args[i] |
|
if _, ok := a.(*parser.MatrixSelector); ok { |
|
matrixArgIndex = i |
|
matrixArg = true |
|
break |
|
} |
|
// parser.SubqueryExpr can be used in place of parser.MatrixSelector. |
|
if subq, ok := a.(*parser.SubqueryExpr); ok { |
|
matrixArgIndex = i |
|
matrixArg = true |
|
// Replacing parser.SubqueryExpr with parser.MatrixSelector. |
|
val, ws := ev.evalSubquery(subq) |
|
e.Args[i] = val |
|
warnings = append(warnings, ws...) |
|
break |
|
} |
|
} |
|
if !matrixArg { |
|
// Does not have a matrix argument. |
|
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) { |
|
return call(v, e.Args, enh), warnings |
|
}, e.Args...) |
|
} |
|
|
|
inArgs := make([]parser.Value, len(e.Args)) |
|
// Evaluate any non-matrix arguments. |
|
otherArgs := make([]Matrix, len(e.Args)) |
|
otherInArgs := make([]Vector, len(e.Args)) |
|
for i, e := range e.Args { |
|
if i != matrixArgIndex { |
|
val, ws := ev.eval(e) |
|
otherArgs[i] = val.(Matrix) |
|
otherInArgs[i] = Vector{Sample{}} |
|
inArgs[i] = otherInArgs[i] |
|
warnings = append(warnings, ws...) |
|
} |
|
} |
|
|
|
sel := e.Args[matrixArgIndex].(*parser.MatrixSelector) |
|
selVS := sel.VectorSelector.(*parser.VectorSelector) |
|
|
|
ws, err := checkAndExpandSeriesSet(ev.ctx, sel) |
|
warnings = append(warnings, ws...) |
|
if err != nil { |
|
ev.error(errWithWarnings{errors.Wrap(err, "expanding series"), warnings}) |
|
} |
|
mat := make(Matrix, 0, len(selVS.Series)) // Output matrix. |
|
offset := durationMilliseconds(selVS.Offset) |
|
selRange := durationMilliseconds(sel.Range) |
|
stepRange := selRange |
|
if stepRange > ev.interval { |
|
stepRange = ev.interval |
|
} |
|
// Reuse objects across steps to save memory allocations. |
|
points := getPointSlice(16) |
|
inMatrix := make(Matrix, 1) |
|
inArgs[matrixArgIndex] = inMatrix |
|
enh := &EvalNodeHelper{out: make(Vector, 0, 1)} |
|
// Process all the calls for one time series at a time. |
|
it := storage.NewBuffer(selRange) |
|
for i, s := range selVS.Series { |
|
ev.currentSamples -= len(points) |
|
points = points[:0] |
|
it.Reset(s.Iterator()) |
|
ss := Series{ |
|
// For all range vector functions, the only change to the |
|
// output labels is dropping the metric name so just do |
|
// it once here. |
|
Metric: dropMetricName(selVS.Series[i].Labels()), |
|
Points: getPointSlice(numSteps), |
|
} |
|
inMatrix[0].Metric = selVS.Series[i].Labels() |
|
for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval { |
|
step++ |
|
// Set the non-matrix arguments. |
|
// They are scalar, so it is safe to use the step number |
|
// when looking up the argument, as there will be no gaps. |
|
for j := range e.Args { |
|
if j != matrixArgIndex { |
|
otherInArgs[j][0].V = otherArgs[j][0].Points[step].V |
|
} |
|
} |
|
maxt := ts - offset |
|
mint := maxt - selRange |
|
// Evaluate the matrix selector for this series for this step. |
|
points = ev.matrixIterSlice(it, mint, maxt, points) |
|
if len(points) == 0 { |
|
continue |
|
} |
|
inMatrix[0].Points = points |
|
enh.ts = ts |
|
// Make the function call. |
|
outVec := call(inArgs, e.Args, enh) |
|
enh.out = outVec[:0] |
|
if len(outVec) > 0 { |
|
ss.Points = append(ss.Points, Point{V: outVec[0].Point.V, T: ts}) |
|
} |
|
// Only buffer stepRange milliseconds from the second step on. |
|
it.ReduceDelta(stepRange) |
|
} |
|
if len(ss.Points) > 0 { |
|
if ev.currentSamples < ev.maxSamples { |
|
mat = append(mat, ss) |
|
ev.currentSamples += len(ss.Points) |
|
} else { |
|
ev.error(ErrTooManySamples(env)) |
|
} |
|
} else { |
|
putPointSlice(ss.Points) |
|
} |
|
} |
|
|
|
ev.currentSamples -= len(points) |
|
putPointSlice(points) |
|
|
|
// The absent_over_time function returns 0 or 1 series. So far, the matrix |
|
// contains multiple series. The following code will create a new series |
|
// with values of 1 for the timestamps where no series has value. |
|
if e.Func.Name == "absent_over_time" { |
|
steps := int(1 + (ev.endTimestamp-ev.startTimestamp)/ev.interval) |
|
// Iterate once to look for a complete series. |
|
for _, s := range mat { |
|
if len(s.Points) == steps { |
|
return Matrix{}, warnings |
|
} |
|
} |
|
|
|
found := map[int64]struct{}{} |
|
|
|
for i, s := range mat { |
|
for _, p := range s.Points { |
|
found[p.T] = struct{}{} |
|
} |
|
if i > 0 && len(found) == steps { |
|
return Matrix{}, warnings |
|
} |
|
} |
|
|
|
newp := make([]Point, 0, steps-len(found)) |
|
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { |
|
if _, ok := found[ts]; !ok { |
|
newp = append(newp, Point{T: ts, V: 1}) |
|
} |
|
} |
|
|
|
return Matrix{ |
|
Series{ |
|
Metric: createLabelsForAbsentFunction(e.Args[0]), |
|
Points: newp, |
|
}, |
|
}, warnings |
|
} |
|
|
|
if mat.ContainsSameLabelset() { |
|
ev.errorf("vector cannot contain metrics with the same labelset") |
|
} |
|
|
|
return mat, warnings |
|
|
|
case *parser.ParenExpr: |
|
return ev.eval(e.Expr) |
|
|
|
case *parser.UnaryExpr: |
|
val, ws := ev.eval(e.Expr) |
|
mat := val.(Matrix) |
|
if e.Op == parser.SUB { |
|
for i := range mat { |
|
mat[i].Metric = dropMetricName(mat[i].Metric) |
|
for j := range mat[i].Points { |
|
mat[i].Points[j].V = -mat[i].Points[j].V |
|
} |
|
} |
|
if mat.ContainsSameLabelset() { |
|
ev.errorf("vector cannot contain metrics with the same labelset") |
|
} |
|
} |
|
return mat, ws |
|
|
|
case *parser.BinaryExpr: |
|
switch lt, rt := e.LHS.Type(), e.RHS.Type(); { |
|
case lt == parser.ValueTypeScalar && rt == parser.ValueTypeScalar: |
|
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) { |
|
val := scalarBinop(e.Op, v[0].(Vector)[0].Point.V, v[1].(Vector)[0].Point.V) |
|
return append(enh.out, Sample{Point: Point{V: val}}), nil |
|
}, e.LHS, e.RHS) |
|
case lt == parser.ValueTypeVector && rt == parser.ValueTypeVector: |
|
switch e.Op { |
|
case parser.LAND: |
|
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) { |
|
return ev.VectorAnd(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh), nil |
|
}, e.LHS, e.RHS) |
|
case parser.LOR: |
|
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) { |
|
return ev.VectorOr(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh), nil |
|
}, e.LHS, e.RHS) |
|
case parser.LUNLESS: |
|
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) { |
|
return ev.VectorUnless(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh), nil |
|
}, e.LHS, e.RHS) |
|
default: |
|
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) { |
|
return ev.VectorBinop(e.Op, v[0].(Vector), v[1].(Vector), e.VectorMatching, e.ReturnBool, enh), nil |
|
}, e.LHS, e.RHS) |
|
} |
|
|
|
case lt == parser.ValueTypeVector && rt == parser.ValueTypeScalar: |
|
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) { |
|
return ev.VectorscalarBinop(e.Op, v[0].(Vector), Scalar{V: v[1].(Vector)[0].Point.V}, false, e.ReturnBool, enh), nil |
|
}, e.LHS, e.RHS) |
|
|
|
case lt == parser.ValueTypeScalar && rt == parser.ValueTypeVector: |
|
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) { |
|
return ev.VectorscalarBinop(e.Op, v[1].(Vector), Scalar{V: v[0].(Vector)[0].Point.V}, true, e.ReturnBool, enh), nil |
|
}, e.LHS, e.RHS) |
|
} |
|
|
|
case *parser.NumberLiteral: |
|
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) { |
|
return append(enh.out, Sample{Point: Point{V: e.Val}}), nil |
|
}) |
|
|
|
case *parser.VectorSelector: |
|
ws, err := checkAndExpandSeriesSet(ev.ctx, e) |
|
if err != nil { |
|
ev.error(errWithWarnings{errors.Wrap(err, "expanding series"), ws}) |
|
} |
|
mat := make(Matrix, 0, len(e.Series)) |
|
it := storage.NewBuffer(durationMilliseconds(ev.lookbackDelta)) |
|
for i, s := range e.Series { |
|
it.Reset(s.Iterator()) |
|
ss := Series{ |
|
Metric: e.Series[i].Labels(), |
|
Points: getPointSlice(numSteps), |
|
} |
|
|
|
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { |
|
_, v, ok := ev.vectorSelectorSingle(it, e, ts) |
|
if ok { |
|
if ev.currentSamples < ev.maxSamples { |
|
ss.Points = append(ss.Points, Point{V: v, T: ts}) |
|
ev.currentSamples++ |
|
} else { |
|
ev.error(ErrTooManySamples(env)) |
|
} |
|
} |
|
} |
|
|
|
if len(ss.Points) > 0 { |
|
mat = append(mat, ss) |
|
} else { |
|
putPointSlice(ss.Points) |
|
} |
|
} |
|
return mat, ws |
|
|
|
case *parser.MatrixSelector: |
|
if ev.startTimestamp != ev.endTimestamp { |
|
panic(errors.New("cannot do range evaluation of matrix selector")) |
|
} |
|
return ev.matrixSelector(e) |
|
|
|
case *parser.SubqueryExpr: |
|
offsetMillis := durationMilliseconds(e.Offset) |
|
rangeMillis := durationMilliseconds(e.Range) |
|
newEv := &evaluator{ |
|
endTimestamp: ev.endTimestamp - offsetMillis, |
|
ctx: ev.ctx, |
|
currentSamples: ev.currentSamples, |
|
maxSamples: ev.maxSamples, |
|
logger: ev.logger, |
|
lookbackDelta: ev.lookbackDelta, |
|
noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn, |
|
} |
|
|
|
if e.Step != 0 { |
|
newEv.interval = durationMilliseconds(e.Step) |
|
} else { |
|
newEv.interval = ev.noStepSubqueryIntervalFn(rangeMillis) |
|
} |
|
|
|
// Start with the first timestamp after (ev.startTimestamp - offset - range) |
|
// that is aligned with the step (multiple of 'newEv.interval'). |
|
newEv.startTimestamp = newEv.interval * ((ev.startTimestamp - offsetMillis - rangeMillis) / newEv.interval) |
|
if newEv.startTimestamp < (ev.startTimestamp - offsetMillis - rangeMillis) { |
|
newEv.startTimestamp += newEv.interval |
|
} |
|
|
|
res, ws := newEv.eval(e.Expr) |
|
ev.currentSamples = newEv.currentSamples |
|
return res, ws |
|
case *parser.StringLiteral: |
|
return String{V: e.Val, T: ev.startTimestamp}, nil |
|
} |
|
|
|
panic(errors.Errorf("unhandled expression of type: %T", expr)) |
|
} |
|
|
|
// vectorSelector evaluates a *parser.VectorSelector expression. |
|
func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vector, storage.Warnings) { |
|
ws, err := checkAndExpandSeriesSet(ev.ctx, node) |
|
if err != nil { |
|
ev.error(errWithWarnings{errors.Wrap(err, "expanding series"), ws}) |
|
} |
|
vec := make(Vector, 0, len(node.Series)) |
|
it := storage.NewBuffer(durationMilliseconds(ev.lookbackDelta)) |
|
for i, s := range node.Series { |
|
it.Reset(s.Iterator()) |
|
|
|
t, v, ok := ev.vectorSelectorSingle(it, node, ts) |
|
if ok { |
|
vec = append(vec, Sample{ |
|
Metric: node.Series[i].Labels(), |
|
Point: Point{V: v, T: t}, |
|
}) |
|
ev.currentSamples++ |
|
} |
|
|
|
if ev.currentSamples >= ev.maxSamples { |
|
ev.error(ErrTooManySamples(env)) |
|
} |
|
} |
|
return vec, ws |
|
} |
|
|
|
// vectorSelectorSingle evaluates a instant vector for the iterator of one time series. |
|
func (ev *evaluator) vectorSelectorSingle(it *storage.BufferedSeriesIterator, node *parser.VectorSelector, ts int64) (int64, float64, bool) { |
|
refTime := ts - durationMilliseconds(node.Offset) |
|
var t int64 |
|
var v float64 |
|
|
|
ok := it.Seek(refTime) |
|
if !ok { |
|
if it.Err() != nil { |
|
ev.error(it.Err()) |
|
} |
|
} |
|
|
|
if ok { |
|
t, v = it.Values() |
|
} |
|
|
|
if !ok || t > refTime { |
|
t, v, ok = it.PeekBack(1) |
|
if !ok || t < refTime-durationMilliseconds(ev.lookbackDelta) { |
|
return 0, 0, false |
|
} |
|
} |
|
if value.IsStaleNaN(v) { |
|
return 0, 0, false |
|
} |
|
return t, v, true |
|
} |
|
|
|
var pointPool = sync.Pool{} |
|
|
|
func getPointSlice(sz int) []Point { |
|
p := pointPool.Get() |
|
if p != nil { |
|
return p.([]Point) |
|
} |
|
return make([]Point, 0, sz) |
|
} |
|
|
|
func putPointSlice(p []Point) { |
|
//lint:ignore SA6002 relax staticcheck verification. |
|
pointPool.Put(p[:0]) |
|
} |
|
|
|
// matrixSelector evaluates a *parser.MatrixSelector expression. |
|
func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, storage.Warnings) { |
|
var ( |
|
vs = node.VectorSelector.(*parser.VectorSelector) |
|
|
|
offset = durationMilliseconds(vs.Offset) |
|
maxt = ev.startTimestamp - offset |
|
mint = maxt - durationMilliseconds(node.Range) |
|
matrix = make(Matrix, 0, len(vs.Series)) |
|
|
|
it = storage.NewBuffer(durationMilliseconds(node.Range)) |
|
) |
|
ws, err := checkAndExpandSeriesSet(ev.ctx, node) |
|
if err != nil { |
|
ev.error(errWithWarnings{errors.Wrap(err, "expanding series"), ws}) |
|
} |
|
|
|
series := vs.Series |
|
for i, s := range series { |
|
if err := contextDone(ev.ctx, "expression evaluation"); err != nil { |
|
ev.error(err) |
|
} |
|
it.Reset(s.Iterator()) |
|
ss := Series{ |
|
Metric: series[i].Labels(), |
|
} |
|
|
|
ss.Points = ev.matrixIterSlice(it, mint, maxt, getPointSlice(16)) |
|
|
|
if len(ss.Points) > 0 { |
|
matrix = append(matrix, ss) |
|
} else { |
|
putPointSlice(ss.Points) |
|
} |
|
} |
|
return matrix, ws |
|
} |
|
|
|
// matrixIterSlice populates a matrix vector covering the requested range for a |
|
// single time series, with points retrieved from an iterator. |
|
// |
|
// As an optimization, the matrix vector may already contain points of the same |
|
// time series from the evaluation of an earlier step (with lower mint and maxt |
|
// values). Any such points falling before mint are discarded; points that fall |
|
// into the [mint, maxt] range are retained; only points with later timestamps |
|
// are populated from the iterator. |
|
func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, out []Point) []Point { |
|
if len(out) > 0 && out[len(out)-1].T >= mint { |
|
// There is an overlap between previous and current ranges, retain common |
|
// points. In most such cases: |
|
// (a) the overlap is significantly larger than the eval step; and/or |
|
// (b) the number of samples is relatively small. |
|
// so a linear search will be as fast as a binary search. |
|
var drop int |
|
for drop = 0; out[drop].T < mint; drop++ { |
|
} |
|
ev.currentSamples -= drop |
|
copy(out, out[drop:]) |
|
out = out[:len(out)-drop] |
|
// Only append points with timestamps after the last timestamp we have. |
|
mint = out[len(out)-1].T + 1 |
|
} else { |
|
ev.currentSamples -= len(out) |
|
out = out[:0] |
|
} |
|
|
|
ok := it.Seek(maxt) |
|
if !ok { |
|
if it.Err() != nil { |
|
ev.error(it.Err()) |
|
} |
|
} |
|
|
|
buf := it.Buffer() |
|
for buf.Next() { |
|
t, v := buf.At() |
|
if value.IsStaleNaN(v) { |
|
continue |
|
} |
|
// Values in the buffer are guaranteed to be smaller than maxt. |
|
if t >= mint { |
|
if ev.currentSamples >= ev.maxSamples { |
|
ev.error(ErrTooManySamples(env)) |
|
} |
|
out = append(out, Point{T: t, V: v}) |
|
ev.currentSamples++ |
|
} |
|
} |
|
// The seeked sample might also be in the range. |
|
if ok { |
|
t, v := it.Values() |
|
if t == maxt && !value.IsStaleNaN(v) { |
|
if ev.currentSamples >= ev.maxSamples { |
|
ev.error(ErrTooManySamples(env)) |
|
} |
|
out = append(out, Point{T: t, V: v}) |
|
ev.currentSamples++ |
|
} |
|
} |
|
return out |
|
} |
|
|
|
func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *parser.VectorMatching, enh *EvalNodeHelper) Vector { |
|
if matching.Card != parser.CardManyToMany { |
|
panic("set operations must only use many-to-many matching") |
|
} |
|
sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...) |
|
|
|
// The set of signatures for the right-hand side Vector. |
|
rightSigs := map[string]struct{}{} |
|
// Add all rhs samples to a map so we can easily find matches later. |
|
for _, rs := range rhs { |
|
rightSigs[sigf(rs.Metric)] = struct{}{} |
|
} |
|
|
|
for _, ls := range lhs { |
|
// If there's a matching entry in the right-hand side Vector, add the sample. |
|
if _, ok := rightSigs[sigf(ls.Metric)]; ok { |
|
enh.out = append(enh.out, ls) |
|
} |
|
} |
|
return enh.out |
|
} |
|
|
|
func (ev *evaluator) VectorOr(lhs, rhs Vector, matching *parser.VectorMatching, enh *EvalNodeHelper) Vector { |
|
if matching.Card != parser.CardManyToMany { |
|
panic("set operations must only use many-to-many matching") |
|
} |
|
sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...) |
|
|
|
leftSigs := map[string]struct{}{} |
|
// Add everything from the left-hand-side Vector. |
|
for _, ls := range lhs { |
|
leftSigs[sigf(ls.Metric)] = struct{}{} |
|
enh.out = append(enh.out, ls) |
|
} |
|
// Add all right-hand side elements which have not been added from the left-hand side. |
|
for _, rs := range rhs { |
|
if _, ok := leftSigs[sigf(rs.Metric)]; !ok { |
|
enh.out = append(enh.out, rs) |
|
} |
|
} |
|
return enh.out |
|
} |
|
|
|
func (ev *evaluator) VectorUnless(lhs, rhs Vector, matching *parser.VectorMatching, enh *EvalNodeHelper) Vector { |
|
if matching.Card != parser.CardManyToMany { |
|
panic("set operations must only use many-to-many matching") |
|
} |
|
sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...) |
|
|
|
rightSigs := map[string]struct{}{} |
|
for _, rs := range rhs { |
|
rightSigs[sigf(rs.Metric)] = struct{}{} |
|
} |
|
|
|
for _, ls := range lhs { |
|
if _, ok := rightSigs[sigf(ls.Metric)]; !ok { |
|
enh.out = append(enh.out, ls) |
|
} |
|
} |
|
return enh.out |
|
} |
|
|
|
// VectorBinop evaluates a binary operation between two Vectors, excluding set operators. |
|
func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching *parser.VectorMatching, returnBool bool, enh *EvalNodeHelper) Vector { |
|
if matching.Card == parser.CardManyToMany { |
|
panic("many-to-many only allowed for set operators") |
|
} |
|
sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...) |
|
|
|
// The control flow below handles one-to-one or many-to-one matching. |
|
// For one-to-many, swap sidedness and account for the swap when calculating |
|
// values. |
|
if matching.Card == parser.CardOneToMany { |
|
lhs, rhs = rhs, lhs |
|
} |
|
|
|
// All samples from the rhs hashed by the matching label/values. |
|
if enh.rightSigs == nil { |
|
enh.rightSigs = make(map[string]Sample, len(enh.out)) |
|
} else { |
|
for k := range enh.rightSigs { |
|
delete(enh.rightSigs, k) |
|
} |
|
} |
|
rightSigs := enh.rightSigs |
|
|
|
// Add all rhs samples to a map so we can easily find matches later. |
|
for _, rs := range rhs { |
|
sig := sigf(rs.Metric) |
|
// The rhs is guaranteed to be the 'one' side. Having multiple samples |
|
// with the same signature means that the matching is many-to-many. |
|
if duplSample, found := rightSigs[sig]; found { |
|
// oneSide represents which side of the vector represents the 'one' in the many-to-one relationship. |
|
oneSide := "right" |
|
if matching.Card == parser.CardOneToMany { |
|
oneSide = "left" |
|
} |
|
matchedLabels := rs.Metric.MatchLabels(matching.On, matching.MatchingLabels...) |
|
// Many-to-many matching not allowed. |
|
ev.errorf("found duplicate series for the match group %s on the %s hand-side of the operation: [%s, %s]"+ |
|
";many-to-many matching not allowed: matching labels must be unique on one side", matchedLabels.String(), oneSide, rs.Metric.String(), duplSample.Metric.String()) |
|
} |
|
rightSigs[sig] = rs |
|
} |
|
|
|
// Tracks the match-signature. For one-to-one operations the value is nil. For many-to-one |
|
// the value is a set of signatures to detect duplicated result elements. |
|
if enh.matchedSigs == nil { |
|
enh.matchedSigs = make(map[string]map[uint64]struct{}, len(rightSigs)) |
|
} else { |
|
for k := range enh.matchedSigs { |
|
delete(enh.matchedSigs, k) |
|
} |
|
} |
|
matchedSigs := enh.matchedSigs |
|
|
|
// For all lhs samples find a respective rhs sample and perform |
|
// the binary operation. |
|
for _, ls := range lhs { |
|
sig := sigf(ls.Metric) |
|
|
|
rs, found := rightSigs[sig] // Look for a match in the rhs Vector. |
|
if !found { |
|
continue |
|
} |
|
|
|
// Account for potentially swapped sidedness. |
|
vl, vr := ls.V, rs.V |
|
if matching.Card == parser.CardOneToMany { |
|
vl, vr = vr, vl |
|
} |
|
value, keep := vectorElemBinop(op, vl, vr) |
|
if returnBool { |
|
if keep { |
|
value = 1.0 |
|
} else { |
|
value = 0.0 |
|
} |
|
} else if !keep { |
|
continue |
|
} |
|
metric := resultMetric(ls.Metric, rs.Metric, op, matching, enh) |
|
insertedSigs, exists := matchedSigs[sig] |
|
if matching.Card == parser.CardOneToOne { |
|
if exists { |
|
ev.errorf("multiple matches for labels: many-to-one matching must be explicit (group_left/group_right)") |
|
} |
|
matchedSigs[sig] = nil // Set existence to true. |
|
} else { |
|
// In many-to-one matching the grouping labels have to ensure a unique metric |
|
// for the result Vector. Check whether those labels have already been added for |
|
// the same matching labels. |
|
insertSig := metric.Hash() |
|
|
|
if !exists { |
|
insertedSigs = map[uint64]struct{}{} |
|
matchedSigs[sig] = insertedSigs |
|
} else if _, duplicate := insertedSigs[insertSig]; duplicate { |
|
ev.errorf("multiple matches for labels: grouping labels must ensure unique matches") |
|
} |
|
insertedSigs[insertSig] = struct{}{} |
|
} |
|
|
|
enh.out = append(enh.out, Sample{ |
|
Metric: metric, |
|
Point: Point{V: value}, |
|
}) |
|
} |
|
return enh.out |
|
} |
|
|
|
func signatureFunc(on bool, b []byte, names ...string) func(labels.Labels) string { |
|
sort.Strings(names) |
|
if on { |
|
return func(lset labels.Labels) string { |
|
return string(lset.WithLabels(names...).Bytes(b)) |
|
} |
|
} |
|
return func(lset labels.Labels) string { |
|
return string(lset.WithoutLabels(names...).Bytes(b)) |
|
} |
|
} |
|
|
|
// resultMetric returns the metric for the given sample(s) based on the Vector |
|
// binary operation and the matching options. |
|
func resultMetric(lhs, rhs labels.Labels, op parser.ItemType, matching *parser.VectorMatching, enh *EvalNodeHelper) labels.Labels { |
|
if enh.resultMetric == nil { |
|
enh.resultMetric = make(map[string]labels.Labels, len(enh.out)) |
|
} |
|
|
|
if enh.lb == nil { |
|
enh.lb = labels.NewBuilder(lhs) |
|
} else { |
|
enh.lb.Reset(lhs) |
|
} |
|
|
|
buf := bytes.NewBuffer(enh.lblResultBuf[:0]) |
|
enh.lblBuf = lhs.Bytes(enh.lblBuf) |
|
buf.Write(enh.lblBuf) |
|
enh.lblBuf = rhs.Bytes(enh.lblBuf) |
|
buf.Write(enh.lblBuf) |
|
enh.lblResultBuf = buf.Bytes() |
|
|
|
if ret, ok := enh.resultMetric[string(enh.lblResultBuf)]; ok { |
|
return ret |
|
} |
|
str := string(enh.lblResultBuf) |
|
|
|
if shouldDropMetricName(op) { |
|
enh.lb.Del(labels.MetricName) |
|
} |
|
|
|
if matching.Card == parser.CardOneToOne { |
|
if matching.On { |
|
Outer: |
|
for _, l := range lhs { |
|
for _, n := range matching.MatchingLabels { |
|
if l.Name == n { |
|
continue Outer |
|
} |
|
} |
|
enh.lb.Del(l.Name) |
|
} |
|
} else { |
|
enh.lb.Del(matching.MatchingLabels...) |
|
} |
|
} |
|
for _, ln := range matching.Include { |
|
// Included labels from the `group_x` modifier are taken from the "one"-side. |
|
if v := rhs.Get(ln); v != "" { |
|
enh.lb.Set(ln, v) |
|
} else { |
|
enh.lb.Del(ln) |
|
} |
|
} |
|
|
|
ret := enh.lb.Labels() |
|
enh.resultMetric[str] = ret |
|
return ret |
|
} |
|
|
|
// VectorscalarBinop evaluates a binary operation between a Vector and a Scalar. |
|
func (ev *evaluator) VectorscalarBinop(op parser.ItemType, lhs Vector, rhs Scalar, swap, returnBool bool, enh *EvalNodeHelper) Vector { |
|
for _, lhsSample := range lhs { |
|
lv, rv := lhsSample.V, rhs.V |
|
// lhs always contains the Vector. If the original position was different |
|
// swap for calculating the value. |
|
if swap { |
|
lv, rv = rv, lv |
|
} |
|
value, keep := vectorElemBinop(op, lv, rv) |
|
// Catch cases where the scalar is the LHS in a scalar-vector comparison operation. |
|
// We want to always keep the vector element value as the output value, even if it's on the RHS. |
|
if op.IsComparisonOperator() && swap { |
|
value = rv |
|
} |
|
if returnBool { |
|
if keep { |
|
value = 1.0 |
|
} else { |
|
value = 0.0 |
|
} |
|
keep = true |
|
} |
|
if keep { |
|
lhsSample.V = value |
|
if shouldDropMetricName(op) || returnBool { |
|
lhsSample.Metric = enh.dropMetricName(lhsSample.Metric) |
|
} |
|
enh.out = append(enh.out, lhsSample) |
|
} |
|
} |
|
return enh.out |
|
} |
|
|
|
func dropMetricName(l labels.Labels) labels.Labels { |
|
return labels.NewBuilder(l).Del(labels.MetricName).Labels() |
|
} |
|
|
|
// scalarBinop evaluates a binary operation between two Scalars. |
|
func scalarBinop(op parser.ItemType, lhs, rhs float64) float64 { |
|
switch op { |
|
case parser.ADD: |
|
return lhs + rhs |
|
case parser.SUB: |
|
return lhs - rhs |
|
case parser.MUL: |
|
return lhs * rhs |
|
case parser.DIV: |
|
return lhs / rhs |
|
case parser.POW: |
|
return math.Pow(lhs, rhs) |
|
case parser.MOD: |
|
return math.Mod(lhs, rhs) |
|
case parser.EQL: |
|
return btos(lhs == rhs) |
|
case parser.NEQ: |
|
return btos(lhs != rhs) |
|
case parser.GTR: |
|
return btos(lhs > rhs) |
|
case parser.LSS: |
|
return btos(lhs < rhs) |
|
case parser.GTE: |
|
return btos(lhs >= rhs) |
|
case parser.LTE: |
|
return btos(lhs <= rhs) |
|
} |
|
panic(errors.Errorf("operator %q not allowed for Scalar operations", op)) |
|
} |
|
|
|
// vectorElemBinop evaluates a binary operation between two Vector elements. |
|
func vectorElemBinop(op parser.ItemType, lhs, rhs float64) (float64, bool) { |
|
switch op { |
|
case parser.ADD: |
|
return lhs + rhs, true |
|
case parser.SUB: |
|
return lhs - rhs, true |
|
case parser.MUL: |
|
return lhs * rhs, true |
|
case parser.DIV: |
|
return lhs / rhs, true |
|
case parser.POW: |
|
return math.Pow(lhs, rhs), true |
|
case parser.MOD: |
|
return math.Mod(lhs, rhs), true |
|
case parser.EQL: |
|
return lhs, lhs == rhs |
|
case parser.NEQ: |
|
return lhs, lhs != rhs |
|
case parser.GTR: |
|
return lhs, lhs > rhs |
|
case parser.LSS: |
|
return lhs, lhs < rhs |
|
case parser.GTE: |
|
return lhs, lhs >= rhs |
|
case parser.LTE: |
|
return lhs, lhs <= rhs |
|
} |
|
panic(errors.Errorf("operator %q not allowed for operations between Vectors", op)) |
|
} |
|
|
|
type groupedAggregation struct { |
|
labels labels.Labels |
|
value float64 |
|
mean float64 |
|
groupCount int |
|
heap vectorByValueHeap |
|
reverseHeap vectorByReverseValueHeap |
|
} |
|
|
|
// aggregation evaluates an aggregation operation on a Vector. |
|
func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without bool, param interface{}, vec Vector, enh *EvalNodeHelper) Vector { |
|
|
|
result := map[uint64]*groupedAggregation{} |
|
var k int64 |
|
if op == parser.TOPK || op == parser.BOTTOMK { |
|
f := param.(float64) |
|
if !convertibleToInt64(f) { |
|
ev.errorf("Scalar value %v overflows int64", f) |
|
} |
|
k = int64(f) |
|
if k < 1 { |
|
return Vector{} |
|
} |
|
} |
|
var q float64 |
|
if op == parser.QUANTILE { |
|
q = param.(float64) |
|
} |
|
var valueLabel string |
|
if op == parser.COUNT_VALUES { |
|
valueLabel = param.(string) |
|
if !model.LabelName(valueLabel).IsValid() { |
|
ev.errorf("invalid label name %q", valueLabel) |
|
} |
|
if !without { |
|
grouping = append(grouping, valueLabel) |
|
} |
|
} |
|
|
|
sort.Strings(grouping) |
|
lb := labels.NewBuilder(nil) |
|
buf := make([]byte, 0, 1024) |
|
for _, s := range vec { |
|
metric := s.Metric |
|
|
|
if op == parser.COUNT_VALUES { |
|
lb.Reset(metric) |
|
lb.Set(valueLabel, strconv.FormatFloat(s.V, 'f', -1, 64)) |
|
metric = lb.Labels() |
|
} |
|
|
|
var ( |
|
groupingKey uint64 |
|
) |
|
if without { |
|
groupingKey, buf = metric.HashWithoutLabels(buf, grouping...) |
|
} else { |
|
groupingKey, buf = metric.HashForLabels(buf, grouping...) |
|
} |
|
|
|
group, ok := result[groupingKey] |
|
// Add a new group if it doesn't exist. |
|
if !ok { |
|
var m labels.Labels |
|
|
|
if without { |
|
lb.Reset(metric) |
|
lb.Del(grouping...) |
|
lb.Del(labels.MetricName) |
|
m = lb.Labels() |
|
} else { |
|
m = make(labels.Labels, 0, len(grouping)) |
|
for _, l := range metric { |
|
for _, n := range grouping { |
|
if l.Name == n { |
|
m = append(m, l) |
|
break |
|
} |
|
} |
|
} |
|
sort.Sort(m) |
|
} |
|
result[groupingKey] = &groupedAggregation{ |
|
labels: m, |
|
value: s.V, |
|
mean: s.V, |
|
groupCount: 1, |
|
} |
|
inputVecLen := int64(len(vec)) |
|
resultSize := k |
|
if k > inputVecLen { |
|
resultSize = inputVecLen |
|
} |
|
switch op { |
|
case parser.STDVAR, parser.STDDEV: |
|
result[groupingKey].value = 0 |
|
case parser.TOPK, parser.QUANTILE: |
|
result[groupingKey].heap = make(vectorByValueHeap, 0, resultSize) |
|
heap.Push(&result[groupingKey].heap, &Sample{ |
|
Point: Point{V: s.V}, |
|
Metric: s.Metric, |
|
}) |
|
case parser.BOTTOMK: |
|
result[groupingKey].reverseHeap = make(vectorByReverseValueHeap, 0, resultSize) |
|
heap.Push(&result[groupingKey].reverseHeap, &Sample{ |
|
Point: Point{V: s.V}, |
|
Metric: s.Metric, |
|
}) |
|
case parser.GROUP: |
|
result[groupingKey].value = 1 |
|
} |
|
continue |
|
} |
|
|
|
switch op { |
|
case parser.SUM: |
|
group.value += s.V |
|
|
|
case parser.AVG: |
|
group.groupCount++ |
|
if math.IsInf(group.mean, 0) { |
|
if math.IsInf(s.V, 0) && (group.mean > 0) == (s.V > 0) { |
|
// The `mean` and `s.V` values are `Inf` of the same sign. They |
|
// can't be subtracted, but the value of `mean` is correct |
|
// already. |
|
break |
|
} |
|
if !math.IsInf(s.V, 0) && !math.IsNaN(s.V) { |
|
// At this stage, the mean is an infinite. If the added |
|
// value is neither an Inf or a Nan, we can keep that mean |
|
// value. |
|
// This is required because our calculation below removes |
|
// the mean value, which would look like Inf += x - Inf and |
|
// end up as a NaN. |
|
break |
|
} |
|
} |
|
// Divide each side of the `-` by `group.groupCount` to avoid float64 overflows. |
|
group.mean += s.V/float64(group.groupCount) - group.mean/float64(group.groupCount) |
|
|
|
case parser.GROUP: |
|
// Do nothing. Required to avoid the panic in `default:` below. |
|
|
|
case parser.MAX: |
|
if group.value < s.V || math.IsNaN(group.value) { |
|
group.value = s.V |
|
} |
|
|
|
case parser.MIN: |
|
if group.value > s.V || math.IsNaN(group.value) { |
|
group.value = s.V |
|
} |
|
|
|
case parser.COUNT, parser.COUNT_VALUES: |
|
group.groupCount++ |
|
|
|
case parser.STDVAR, parser.STDDEV: |
|
group.groupCount++ |
|
delta := s.V - group.mean |
|
group.mean += delta / float64(group.groupCount) |
|
group.value += delta * (s.V - group.mean) |
|
|
|
case parser.TOPK: |
|
if int64(len(group.heap)) < k || group.heap[0].V < s.V || math.IsNaN(group.heap[0].V) { |
|
if int64(len(group.heap)) == k { |
|
heap.Pop(&group.heap) |
|
} |
|
heap.Push(&group.heap, &Sample{ |
|
Point: Point{V: s.V}, |
|
Metric: s.Metric, |
|
}) |
|
} |
|
|
|
case parser.BOTTOMK: |
|
if int64(len(group.reverseHeap)) < k || group.reverseHeap[0].V > s.V || math.IsNaN(group.reverseHeap[0].V) { |
|
if int64(len(group.reverseHeap)) == k { |
|
heap.Pop(&group.reverseHeap) |
|
} |
|
heap.Push(&group.reverseHeap, &Sample{ |
|
Point: Point{V: s.V}, |
|
Metric: s.Metric, |
|
}) |
|
} |
|
|
|
case parser.QUANTILE: |
|
group.heap = append(group.heap, s) |
|
|
|
default: |
|
panic(errors.Errorf("expected aggregation operator but got %q", op)) |
|
} |
|
} |
|
|
|
// Construct the result Vector from the aggregated groups. |
|
for _, aggr := range result { |
|
switch op { |
|
case parser.AVG: |
|
aggr.value = aggr.mean |
|
|
|
case parser.COUNT, parser.COUNT_VALUES: |
|
aggr.value = float64(aggr.groupCount) |
|
|
|
case parser.STDVAR: |
|
aggr.value = aggr.value / float64(aggr.groupCount) |
|
|
|
case parser.STDDEV: |
|
aggr.value = math.Sqrt(aggr.value / float64(aggr.groupCount)) |
|
|
|
case parser.TOPK: |
|
// The heap keeps the lowest value on top, so reverse it. |
|
sort.Sort(sort.Reverse(aggr.heap)) |
|
for _, v := range aggr.heap { |
|
enh.out = append(enh.out, Sample{ |
|
Metric: v.Metric, |
|
Point: Point{V: v.V}, |
|
}) |
|
} |
|
continue // Bypass default append. |
|
|
|
case parser.BOTTOMK: |
|
// The heap keeps the lowest value on top, so reverse it. |
|
sort.Sort(sort.Reverse(aggr.reverseHeap)) |
|
for _, v := range aggr.reverseHeap { |
|
enh.out = append(enh.out, Sample{ |
|
Metric: v.Metric, |
|
Point: Point{V: v.V}, |
|
}) |
|
} |
|
continue // Bypass default append. |
|
|
|
case parser.QUANTILE: |
|
aggr.value = quantile(q, aggr.heap) |
|
|
|
default: |
|
// For other aggregations, we already have the right value. |
|
} |
|
|
|
enh.out = append(enh.out, Sample{ |
|
Metric: aggr.labels, |
|
Point: Point{V: aggr.value}, |
|
}) |
|
} |
|
return enh.out |
|
} |
|
|
|
// btos returns 1 if b is true, 0 otherwise. |
|
func btos(b bool) float64 { |
|
if b { |
|
return 1 |
|
} |
|
return 0 |
|
} |
|
|
|
// shouldDropMetricName returns whether the metric name should be dropped in the |
|
// result of the op operation. |
|
func shouldDropMetricName(op parser.ItemType) bool { |
|
switch op { |
|
case parser.ADD, parser.SUB, parser.DIV, parser.MUL, parser.POW, parser.MOD: |
|
return true |
|
default: |
|
return false |
|
} |
|
} |
|
|
|
// NewOriginContext returns a new context with data about the origin attached. |
|
func NewOriginContext(ctx context.Context, data map[string]interface{}) context.Context { |
|
return context.WithValue(ctx, queryOrigin{}, data) |
|
} |
|
|
|
func formatDate(t time.Time) string { |
|
return t.UTC().Format("2006-01-02T15:04:05.000Z07:00") |
|
} |
|
|
|
// unwrapParenExpr does the AST equivalent of removing parentheses around a expression. |
|
func unwrapParenExpr(e *parser.Expr) { |
|
for { |
|
if p, ok := (*e).(*parser.ParenExpr); ok { |
|
*e = p.Expr |
|
} else { |
|
break |
|
} |
|
} |
|
}
|
|
|