Optimize aggregations in PromQL engine (#8594)

* Optimize aggregations in PromQL engine

Signed-off-by: Marco Pracucci <marco@pracucci.com>
pull/8627/head
Marco Pracucci 2021-03-19 17:52:29 +01:00 committed by GitHub
parent e4f076f813
commit 6719071a0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 109 additions and 29 deletions

View File

@ -156,6 +156,9 @@ func BenchmarkRangeQuery(b *testing.B) {
{
expr: "sum by (le)(h_X)",
},
{
expr: "count_values('value', h_X)",
},
// Combinations.
{
expr: "rate(a_X[1m]) + rate(b_X[1m])",

View File

@ -902,6 +902,12 @@ func (ev *evaluator) Eval(expr parser.Expr) (v parser.Value, ws storage.Warnings
return v, ws, nil
}
// EvalSeriesHelper stores extra information about a series.
type EvalSeriesHelper struct {
// The grouping key used by aggregation.
groupingKey uint64
}
// EvalNodeHelper stores extra information and caches for evaluating a single node across steps.
type EvalNodeHelper struct {
// Evaluation timestamp.
@ -962,10 +968,12 @@ func (enh *EvalNodeHelper) signatureFunc(on bool, names ...string) func(labels.L
}
// rangeEval evaluates the given expressions, and then for each step calls
// the given function with the values computed for each expression at that
// step. The return value is the combination into time series of all the
// the given funcCall with the values computed for each expression at that
// step. The return value is the combination into time series of all the
// function call results.
func (ev *evaluator) rangeEval(funcCall func([]parser.Value, *EvalNodeHelper) (Vector, storage.Warnings), exprs ...parser.Expr) (Matrix, storage.Warnings) {
// The prepSeries function (if provided) can be used to prepare the helper
// for each series, then passed to each call funcCall.
func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper), funcCall func([]parser.Value, [][]EvalSeriesHelper, *EvalNodeHelper) (Vector, storage.Warnings), exprs ...parser.Expr) (Matrix, storage.Warnings) {
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
matrixes := make([]Matrix, len(exprs))
origMatrixes := make([]Matrix, len(exprs))
@ -1001,6 +1009,30 @@ func (ev *evaluator) rangeEval(funcCall func([]parser.Value, *EvalNodeHelper) (V
enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)}
seriess := make(map[uint64]Series, biggestLen) // Output series by series hash.
tempNumSamples := ev.currentSamples
var (
seriesHelpers [][]EvalSeriesHelper
bufHelpers [][]EvalSeriesHelper // Buffer updated on each step
)
// If the series preparation function is provided, we should run it for
// every single series in the matrix.
if prepSeries != nil {
seriesHelpers = make([][]EvalSeriesHelper, len(exprs))
bufHelpers = make([][]EvalSeriesHelper, len(exprs))
for i := range exprs {
seriesHelpers[i] = make([]EvalSeriesHelper, len(matrixes[i]))
bufHelpers[i] = make([]EvalSeriesHelper, len(matrixes[i]))
for si, series := range matrixes[i] {
h := seriesHelpers[i][si]
prepSeries(series.Metric, &h)
seriesHelpers[i][si] = h
}
}
}
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
ev.error(err)
@ -1010,11 +1042,20 @@ func (ev *evaluator) rangeEval(funcCall func([]parser.Value, *EvalNodeHelper) (V
// Gather input vectors for this timestamp.
for i := range exprs {
vectors[i] = vectors[i][:0]
if prepSeries != nil {
bufHelpers[i] = bufHelpers[i][:0]
}
for si, series := range matrixes[i] {
for _, point := range series.Points {
if point.T == ts {
if ev.currentSamples < ev.maxSamples {
vectors[i] = append(vectors[i], Sample{Metric: series.Metric, Point: point})
if prepSeries != nil {
bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si])
}
// Move input vectors forward so we don't have to re-scan the same
// past points at the next step.
matrixes[i][si].Points = series.Points[1:]
@ -1028,9 +1069,10 @@ func (ev *evaluator) rangeEval(funcCall func([]parser.Value, *EvalNodeHelper) (V
}
args[i] = vectors[i]
}
// Make the function call.
enh.Ts = ts
result, ws := funcCall(args, enh)
result, ws := funcCall(args, bufHelpers, enh)
if result.ContainsSameLabelset() {
ev.errorf("vector cannot contain metrics with the same labelset")
}
@ -1132,18 +1174,29 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
switch e := expr.(type) {
case *parser.AggregateExpr:
// Grouping labels must be sorted (expected both by generateGroupingKey() and aggregation()).
sortedGrouping := e.Grouping
sort.Strings(sortedGrouping)
// Prepare a function to initialise series helpers with the grouping key.
buf := make([]byte, 0, 1024)
initSeries := func(series labels.Labels, h *EvalSeriesHelper) {
h.groupingKey, buf = generateGroupingKey(series, sortedGrouping, e.Without, buf)
}
unwrapParenExpr(&e.Param)
if s, ok := unwrapStepInvariantExpr(e.Param).(*parser.StringLiteral); ok {
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.aggregation(e.Op, e.Grouping, e.Without, s.Val, v[0].(Vector), enh), nil
return ev.rangeEval(initSeries, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.aggregation(e.Op, sortedGrouping, e.Without, s.Val, v[0].(Vector), sh[0], enh), nil
}, e.Expr)
}
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.rangeEval(initSeries, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
var param float64
if e.Param != nil {
param = v[0].(Vector)[0].V
}
return ev.aggregation(e.Op, e.Grouping, e.Without, param, v[1].(Vector), enh), nil
return ev.aggregation(e.Op, sortedGrouping, e.Without, param, v[1].(Vector), sh[1], enh), nil
}, e.Param, e.Expr)
case *parser.Call:
@ -1156,7 +1209,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
arg := unwrapStepInvariantExpr(e.Args[0])
vs, ok := arg.(*parser.VectorSelector)
if ok {
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
if vs.Timestamp != nil {
// This is a special case only for "timestamp" since the offset
// needs to be adjusted for every point.
@ -1200,7 +1253,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
}
if !matrixArg {
// Does not have a matrix argument.
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return call(v, e.Args, enh), warnings
}, e.Args...)
}
@ -1367,43 +1420,43 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
case *parser.BinaryExpr:
switch lt, rt := e.LHS.Type(), e.RHS.Type(); {
case lt == parser.ValueTypeScalar && rt == parser.ValueTypeScalar:
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
val := scalarBinop(e.Op, v[0].(Vector)[0].Point.V, v[1].(Vector)[0].Point.V)
return append(enh.Out, Sample{Point: Point{V: val}}), nil
}, e.LHS, e.RHS)
case lt == parser.ValueTypeVector && rt == parser.ValueTypeVector:
switch e.Op {
case parser.LAND:
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.VectorAnd(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh), nil
}, e.LHS, e.RHS)
case parser.LOR:
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.VectorOr(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh), nil
}, e.LHS, e.RHS)
case parser.LUNLESS:
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.VectorUnless(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh), nil
}, e.LHS, e.RHS)
default:
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.VectorBinop(e.Op, v[0].(Vector), v[1].(Vector), e.VectorMatching, e.ReturnBool, enh), nil
}, e.LHS, e.RHS)
}
case lt == parser.ValueTypeVector && rt == parser.ValueTypeScalar:
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.VectorscalarBinop(e.Op, v[0].(Vector), Scalar{V: v[1].(Vector)[0].Point.V}, false, e.ReturnBool, enh), nil
}, e.LHS, e.RHS)
case lt == parser.ValueTypeScalar && rt == parser.ValueTypeVector:
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.VectorscalarBinop(e.Op, v[1].(Vector), Scalar{V: v[0].(Vector)[0].Point.V}, true, e.ReturnBool, enh), nil
}, e.LHS, e.RHS)
}
case *parser.NumberLiteral:
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return append(enh.Out, Sample{Point: Point{V: e.Val}}), nil
})
@ -2067,8 +2120,9 @@ type groupedAggregation struct {
reverseHeap vectorByReverseValueHeap
}
// aggregation evaluates an aggregation operation on a Vector.
func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without bool, param interface{}, vec Vector, enh *EvalNodeHelper) Vector {
// aggregation evaluates an aggregation operation on a Vector. The provided grouping labels
// must be sorted.
func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without bool, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper) Vector {
result := map[uint64]*groupedAggregation{}
var k int64
@ -2087,35 +2141,43 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
q = param.(float64)
}
var valueLabel string
var recomputeGroupingKey bool
if op == parser.COUNT_VALUES {
valueLabel = param.(string)
if !model.LabelName(valueLabel).IsValid() {
ev.errorf("invalid label name %q", valueLabel)
}
if !without {
// We're changing the grouping labels so we have to ensure they're still sorted
// and we have to flag to recompute the grouping key. Considering the count_values()
// operator is less frequently used than other aggregations, we're fine having to
// re-compute the grouping key on each step for this case.
grouping = append(grouping, valueLabel)
sort.Strings(grouping)
recomputeGroupingKey = true
}
}
sort.Strings(grouping)
lb := labels.NewBuilder(nil)
buf := make([]byte, 0, 1024)
for _, s := range vec {
var buf []byte
for si, s := range vec {
metric := s.Metric
if op == parser.COUNT_VALUES {
lb.Reset(metric)
lb.Set(valueLabel, strconv.FormatFloat(s.V, 'f', -1, 64))
metric = lb.Labels()
// We've changed the metric so we have to recompute the grouping key.
recomputeGroupingKey = true
}
var (
groupingKey uint64
)
if without {
groupingKey, buf = metric.HashWithoutLabels(buf, grouping...)
// We can use the pre-computed grouping key unless grouping labels have changed.
var groupingKey uint64
if !recomputeGroupingKey {
groupingKey = seriesHelper[si].groupingKey
} else {
groupingKey, buf = metric.HashForLabels(buf, grouping...)
groupingKey, buf = generateGroupingKey(metric, grouping, without, buf)
}
group, ok := result[groupingKey]
@ -2302,6 +2364,21 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
return enh.Out
}
// groupingKey builds and returns the grouping key for the given metric and
// grouping labels.
func generateGroupingKey(metric labels.Labels, grouping []string, without bool, buf []byte) (uint64, []byte) {
if without {
return metric.HashWithoutLabels(buf, grouping...)
}
if len(grouping) == 0 {
// No need to generate any hash if there are no grouping labels.
return 0, buf
}
return metric.HashForLabels(buf, grouping...)
}
// btos returns 1 if b is true, 0 otherwise.
func btos(b bool) float64 {
if b {