From 2f03acbafc08dd83cc8a5c3d94b2f071bb34f809 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 5 Apr 2024 12:22:44 +0100 Subject: [PATCH] promql: refactor: split topk/bottomk from sum/avg/etc They aggregate results in different ways. topk/bottomk don't consider histograms so can simplify data collection. Signed-off-by: Bryan Boreham --- promql/engine.go | 264 ++++++++++++++++++++++++++++------------------- 1 file changed, 159 insertions(+), 105 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 770550dac..22428e12c 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1299,6 +1299,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping groupToResultIndex := make(map[uint64]int) seriesToResult := make([]int, len(inputMatrix)) orderedResult := make([]*groupedAggregation, 0, 16) + var result Matrix for si, series := range inputMatrix { var groupingKey uint64 @@ -1306,8 +1307,11 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping index, ok := groupToResultIndex[groupingKey] // Add a new group if it doesn't exist. if !ok { - m := generateGroupingLabels(enh, series.Metric, aggExpr.Without, sortedGrouping) - newAgg := &groupedAggregation{labels: m} + if aggExpr.Op != parser.TOPK && aggExpr.Op != parser.BOTTOMK { + m := generateGroupingLabels(enh, series.Metric, aggExpr.Without, sortedGrouping) + result = append(result, Series{Metric: m}) + } + newAgg := &groupedAggregation{} index = len(orderedResult) groupToResultIndex[groupingKey] = index orderedResult = append(orderedResult, newAgg) @@ -1315,7 +1319,11 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping seriesToResult[si] = index } - seriess := make(map[uint64]Series, len(inputMatrix)) // Output series by series hash. + var seriess map[uint64]Series + switch aggExpr.Op { + case parser.TOPK, parser.BOTTOMK: + seriess = make(map[uint64]Series, len(inputMatrix)) // Output series by series hash. + } for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { if err := contextDone(ev.ctx, "expression evaluation"); err != nil { @@ -1326,25 +1334,44 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping // Make the function call. enh.Ts = ts - result, ws := ev.aggregation(aggExpr, param, inputMatrix, seriesToResult, orderedResult, enh, seriess) + var ws annotations.Annotations + switch aggExpr.Op { + case parser.TOPK, parser.BOTTOMK: + result, ws = ev.aggregationK(aggExpr, param, inputMatrix, seriesToResult, orderedResult, enh, seriess) + // If this could be an instant query, shortcut so as not to change sort order. + if ev.endTimestamp == ev.startTimestamp { + return result, ws + } + default: + ws = ev.aggregation(aggExpr, param, inputMatrix, result, seriesToResult, orderedResult, enh) + } warnings.Merge(ws) - // If this could be an instant query, shortcut so as not to change sort order. - if ev.endTimestamp == ev.startTimestamp { - return result, warnings - } if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } } // Assemble the output matrix. By the time we get here we know we don't have too many samples. - mat := make(Matrix, 0, len(seriess)) - for _, ss := range seriess { - mat = append(mat, ss) + switch aggExpr.Op { + case parser.TOPK, parser.BOTTOMK: + result = make(Matrix, 0, len(seriess)) + for _, ss := range seriess { + result = append(result, ss) + } + default: + // Remove empty result rows. 
+ dst := 0 + for _, series := range result { + if len(series.Floats) > 0 || len(series.Histograms) > 0 { + result[dst] = series + dst++ + } + } + result = result[:dst] } - return mat, warnings + return result, warnings } // evalSubquery evaluates given SubqueryExpr and returns an equivalent @@ -2698,25 +2725,14 @@ type groupedAggregation struct { reverseHeap vectorByReverseValueHeap } -// aggregation evaluates an aggregation operation on a Vector. The provided grouping labels -// must be sorted. -func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix Matrix, seriesToResult []int, orderedResult []*groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { +// aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix. +// These functions produce one output series for each group specified in the expression, with just the labels from `by(...)`. +// outputMatrix should be already populated with grouping labels; groups is one-to-one with outputMatrix. +// seriesToResult maps inputMatrix indexes to outputMatrix indexes. +func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix, outputMatrix Matrix, seriesToResult []int, orderedResult []*groupedAggregation, enh *EvalNodeHelper) annotations.Annotations { op := e.Op var annos annotations.Annotations seen := make([]bool, len(orderedResult)) // Which output groups were seen in the input at this timestamp. - k := 1 - if op == parser.TOPK || op == parser.BOTTOMK { - if !convertibleToInt64(q) { - ev.errorf("Scalar value %v overflows int64", q) - } - k = int(q) - if k > len(inputMatrix) { - k = len(inputMatrix) - } - if k < 1 { - return nil, annos - } - } if op == parser.QUANTILE { if math.IsNaN(q) || q < 0 || q > 1 { annos.Add(annotations.NewInvalidQuantileWarning(q, e.Param.PositionRange())) @@ -2733,7 +2749,6 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // Initialize this group if it's the first time we've seen it. if !seen[seriesToResult[si]] { *group = groupedAggregation{ - labels: group.labels, floatValue: s.F, floatMean: s.F, groupCount: 1, @@ -2754,18 +2769,12 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix switch op { case parser.STDVAR, parser.STDDEV: group.floatValue = 0 - case parser.TOPK, parser.QUANTILE: - group.heap = make(vectorByValueHeap, 1, k) + case parser.QUANTILE: + group.heap = make(vectorByValueHeap, 1) group.heap[0] = Sample{ F: s.F, Metric: s.Metric, } - case parser.BOTTOMK: - group.reverseHeap = make(vectorByReverseValueHeap, 1, k) - group.reverseHeap[0] = Sample{ - F: s.F, - Metric: s.Metric, - } case parser.GROUP: group.floatValue = 1 } @@ -2848,20 +2857,118 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix group.floatValue += delta * (s.F - group.floatMean) } + case parser.QUANTILE: + group.heap = append(group.heap, s) + + default: + panic(fmt.Errorf("expected aggregation operator but got %q", op)) + } + } + + // Construct the output matrix from the aggregated groups. + numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 + + for ri, aggr := range orderedResult { + if !seen[ri] { + continue + } + switch op { + case parser.AVG: + if aggr.hasFloat && aggr.hasHistogram { + // We cannot aggregate histogram sample with a float64 sample. 
+ annos.Add(annotations.NewMixedFloatsHistogramsAggWarning(e.Expr.PositionRange())) + continue + } + if aggr.hasHistogram { + aggr.histogramValue = aggr.histogramMean.Compact(0) + } else { + aggr.floatValue = aggr.floatMean + } + + case parser.COUNT: + aggr.floatValue = float64(aggr.groupCount) + + case parser.STDVAR: + aggr.floatValue /= float64(aggr.groupCount) + + case parser.STDDEV: + aggr.floatValue = math.Sqrt(aggr.floatValue / float64(aggr.groupCount)) + + case parser.QUANTILE: + aggr.floatValue = quantile(q, aggr.heap) + + case parser.SUM: + if aggr.hasFloat && aggr.hasHistogram { + // We cannot aggregate histogram sample with a float64 sample. + annos.Add(annotations.NewMixedFloatsHistogramsAggWarning(e.Expr.PositionRange())) + continue + } + if aggr.hasHistogram { + aggr.histogramValue.Compact(0) + } + default: + // For other aggregations, we already have the right value. + } + + ss := &outputMatrix[ri] + addToSeries(ss, enh.Ts, aggr.floatValue, aggr.histogramValue, numSteps) + } + + return annos +} + +// aggregationK evaluates topk or bottomk at one timestep on inputMatrix. +// Output that has the same labels as the input, but just k of them per group. +// seriesToResult maps inputMatrix indexes to groups indexes. +// For an instant query, returns a Matrix in descending order for topk or ascending for bottomk. +// For a range query, aggregates output in the seriess map. +func (ev *evaluator) aggregationK(e *parser.AggregateExpr, q float64, inputMatrix Matrix, seriesToResult []int, orderedResult []*groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { + op := e.Op + var annos annotations.Annotations + seen := make([]bool, len(orderedResult)) // Which output groups were seen in the input at this timestamp. + if !convertibleToInt64(q) { + ev.errorf("Scalar value %v overflows int64", q) + } + k := int(q) + if k > len(inputMatrix) { + k = len(inputMatrix) + } + if k < 1 { + return nil, annos + } + + for si := range inputMatrix { + s, ok := ev.nextSample(enh.Ts, inputMatrix, si) + if !ok { + continue + } + + group := orderedResult[seriesToResult[si]] + // Initialize this group if it's the first time we've seen it. + if !seen[seriesToResult[si]] { + *group = groupedAggregation{} + + switch op { + case parser.TOPK: + group.heap = make(vectorByValueHeap, 1, k) + group.heap[0] = s + case parser.BOTTOMK: + group.reverseHeap = make(vectorByReverseValueHeap, 1, k) + group.reverseHeap[0] = s + } + seen[seriesToResult[si]] = true + continue + } + + switch op { case parser.TOPK: // We build a heap of up to k elements, with the smallest element at heap[0]. switch { case len(group.heap) < k: - heap.Push(&group.heap, &Sample{ - F: s.F, - Metric: s.Metric, - }) + heap.Push(&group.heap, &s) case group.heap[0].F < s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)): // This new element is bigger than the previous smallest element - overwrite that. - group.heap[0] = Sample{ - F: s.F, - Metric: s.Metric, - } + group.heap[0] = s if k > 1 { heap.Fix(&group.heap, 0) // Maintain the heap invariant. } @@ -2871,24 +2978,15 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // We build a heap of up to k elements, with the biggest element at heap[0]. 
switch { case len(group.reverseHeap) < k: - heap.Push(&group.reverseHeap, &Sample{ - F: s.F, - Metric: s.Metric, - }) + heap.Push(&group.reverseHeap, &s) case group.reverseHeap[0].F > s.F || (math.IsNaN(group.reverseHeap[0].F) && !math.IsNaN(s.F)): // This new element is smaller than the previous biggest element - overwrite that. - group.reverseHeap[0] = Sample{ - F: s.F, - Metric: s.Metric, - } + group.reverseHeap[0] = s if k > 1 { heap.Fix(&group.reverseHeap, 0) // Maintain the heap invariant. } } - case parser.QUANTILE: - group.heap = append(group.heap, s) - default: panic(fmt.Errorf("expected aggregation operator but got %q", op)) } @@ -2901,14 +2999,10 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix mat = make(Matrix, 0, len(orderedResult)) } - add := func(lbls labels.Labels, f float64, h *histogram.FloatHistogram) { + add := func(lbls labels.Labels, f float64) { // If this could be an instant query, add directly to the matrix so the result is in consistent order. if ev.endTimestamp == ev.startTimestamp { - if h == nil { - mat = append(mat, Series{Metric: lbls, Floats: []FPoint{{T: enh.Ts, F: f}}}) - } else { - mat = append(mat, Series{Metric: lbls, Histograms: []HPoint{{T: enh.Ts, H: h}}}) - } + mat = append(mat, Series{Metric: lbls, Floats: []FPoint{{T: enh.Ts, F: f}}}) } else { // Otherwise the results are added into seriess elements. hash := lbls.Hash() @@ -2916,7 +3010,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix if !ok { ss = Series{Metric: lbls} } - addToSeries(&ss, enh.Ts, f, h, numSteps) + addToSeries(&ss, enh.Ts, f, nil, numSteps) seriess[hash] = ss } } @@ -2925,36 +3019,14 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix continue } switch op { - case parser.AVG: - if aggr.hasFloat && aggr.hasHistogram { - // We cannot aggregate histogram sample with a float64 sample. - annos.Add(annotations.NewMixedFloatsHistogramsAggWarning(e.Expr.PositionRange())) - continue - } - if aggr.hasHistogram { - aggr.histogramValue = aggr.histogramMean.Compact(0) - } else { - aggr.floatValue = aggr.floatMean - } - - case parser.COUNT: - aggr.floatValue = float64(aggr.groupCount) - - case parser.STDVAR: - aggr.floatValue /= float64(aggr.groupCount) - - case parser.STDDEV: - aggr.floatValue = math.Sqrt(aggr.floatValue / float64(aggr.groupCount)) - case parser.TOPK: // The heap keeps the lowest value on top, so reverse it. if len(aggr.heap) > 1 { sort.Sort(sort.Reverse(aggr.heap)) } for _, v := range aggr.heap { - add(v.Metric, v.F, nil) + add(v.Metric, v.F) } - continue // Bypass default append. case parser.BOTTOMK: // The heap keeps the highest value on top, so reverse it. @@ -2962,27 +3034,9 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix sort.Sort(sort.Reverse(aggr.reverseHeap)) } for _, v := range aggr.reverseHeap { - add(v.Metric, v.F, nil) - } - continue // Bypass default append. - - case parser.QUANTILE: - aggr.floatValue = quantile(q, aggr.heap) - - case parser.SUM: - if aggr.hasFloat && aggr.hasHistogram { - // We cannot aggregate histogram sample with a float64 sample. - annos.Add(annotations.NewMixedFloatsHistogramsAggWarning(e.Expr.PositionRange())) - continue - } - if aggr.hasHistogram { - aggr.histogramValue.Compact(0) + add(v.Metric, v.F) } - default: - // For other aggregations, we already have the right value. } - - add(aggr.labels, aggr.floatValue, aggr.histogramValue) } return mat, annos
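For reference, below is a minimal standalone sketch (not the engine code itself) of the "heap of up to k elements" technique that the new aggregationK uses for topk: keep a min-heap of size k so the smallest retained value sits at heap[0] and can be overwritten cheaply when a bigger sample arrives. All names here (sample, minHeap, topK) are illustrative only and do not exist in promql/engine.go.

// Illustrative sketch of the topk heap approach used by aggregationK.
// Not Prometheus code; types and names are hypothetical.
package main

import (
	"container/heap"
	"fmt"
	"math"
	"sort"
)

type sample struct {
	metric string
	f      float64
}

// minHeap keeps the smallest retained value at index 0.
type minHeap []sample

func (h minHeap) Len() int      { return len(h) }
func (h minHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h minHeap) Less(i, j int) bool {
	// Rank NaN below any number, mirroring the NaN handling in the patch.
	if math.IsNaN(h[i].f) {
		return !math.IsNaN(h[j].f)
	}
	return h[i].f < h[j].f
}
func (h *minHeap) Push(x any) { *h = append(*h, x.(sample)) }
func (h *minHeap) Pop() any {
	old := *h
	n := len(old)
	s := old[n-1]
	*h = old[:n-1]
	return s
}

// topK returns the k largest samples in descending order.
func topK(k int, in []sample) []sample {
	if k < 1 {
		return nil
	}
	h := make(minHeap, 0, k)
	for _, s := range in {
		switch {
		case len(h) < k:
			heap.Push(&h, s)
		case h[0].f < s.f || (math.IsNaN(h[0].f) && !math.IsNaN(s.f)):
			// The new element is bigger than the smallest retained one - overwrite it.
			h[0] = s
			heap.Fix(&h, 0) // Maintain the heap invariant.
		}
	}
	// The heap keeps the lowest value on top, so reverse-sort for output.
	sort.Slice(h, func(i, j int) bool { return h[i].f > h[j].f })
	return h
}

func main() {
	in := []sample{{"a", 3}, {"b", 7}, {"c", 1}, {"d", 9}, {"e", 5}}
	fmt.Println(topK(2, in)) // [{d 9} {b 7}]
}

bottomk is the mirror image: a max-heap of size k (the patch's vectorByReverseValueHeap), overwriting heap[0] when a smaller sample arrives.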