diff --git a/promql/engine.go b/promql/engine.go index 35dc52942..140c0a0e4 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1067,8 +1067,6 @@ func (ev *evaluator) Eval(expr parser.Expr) (v parser.Value, ws annotations.Anno // EvalSeriesHelper stores extra information about a series. type EvalSeriesHelper struct { - // The grouping key used by aggregation. - groupingKey uint64 // Used to map left-hand to right-hand in binary operations. signature string } @@ -1316,13 +1314,25 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping seriess := make(map[uint64]Series, biggestLen) // Output series by series hash. tempNumSamples := ev.currentSamples - // Initialise series helpers with the grouping key. + // Create a mapping from input series to output groups. buf := make([]byte, 0, 1024) - - seriesHelper := make([]EvalSeriesHelper, len(inputMatrix)) + groupToResultIndex := make(map[uint64]int) + seriesToResult := make([]int, len(inputMatrix)) + orderedResult := make([]*groupedAggregation, 0, 16) for si, series := range inputMatrix { - seriesHelper[si].groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf) + var groupingKey uint64 + groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf) + index, ok := groupToResultIndex[groupingKey] + // Add a new group if it doesn't exist. + if !ok { + m := generateGroupingLabels(enh, series.Metric, aggExpr.Without, sortedGrouping) + newAgg := &groupedAggregation{labels: m} + index = len(orderedResult) + groupToResultIndex[groupingKey] = index + orderedResult = append(orderedResult, newAgg) + } + seriesToResult[si] = index } for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { @@ -1334,7 +1344,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping // Make the function call. enh.Ts = ts - result, ws := ev.aggregation(aggExpr, sortedGrouping, param, inputMatrix, seriesHelper, enh, seriess) + result, ws := ev.aggregation(aggExpr, param, inputMatrix, seriesToResult, orderedResult, enh, seriess) warnings.Merge(ws) @@ -2698,12 +2708,10 @@ type groupedAggregation struct { // aggregation evaluates an aggregation operation on a Vector. The provided grouping labels // must be sorted. -func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, q float64, inputMatrix Matrix, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { +func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix Matrix, seriesToResult []int, orderedResult []*groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { op := e.Op - without := e.Without var annos annotations.Annotations - result := map[uint64]*groupedAggregation{} - orderedResult := []*groupedAggregation{} + seen := make([]bool, len(orderedResult)) // Which output groups were seen in the input at this timestamp. k := 1 if op == parser.TOPK || op == parser.BOTTOMK { if !convertibleToInt64(q) { @@ -2743,53 +2751,47 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, q f ev.error(ErrTooManySamples(env)) } - metric := s.Metric - groupingKey := seriesHelper[si].groupingKey - - group, ok := result[groupingKey] - // Add a new group if it doesn't exist. - if !ok { - m := generateGroupingLabels(enh, metric, without, grouping) - newAgg := &groupedAggregation{ - labels: m, + group := orderedResult[seriesToResult[si]] + // Initialize this group if it's the first time we've seen it. + if !seen[seriesToResult[si]] { + *group = groupedAggregation{ + labels: group.labels, floatValue: s.F, floatMean: s.F, groupCount: 1, } switch { case s.H == nil: - newAgg.hasFloat = true + group.hasFloat = true case op == parser.SUM: - newAgg.histogramValue = s.H.Copy() - newAgg.hasHistogram = true + group.histogramValue = s.H.Copy() + group.hasHistogram = true case op == parser.AVG: - newAgg.histogramMean = s.H.Copy() - newAgg.hasHistogram = true + group.histogramMean = s.H.Copy() + group.hasHistogram = true case op == parser.STDVAR || op == parser.STDDEV: - newAgg.groupCount = 0 + group.groupCount = 0 } switch op { case parser.STDVAR, parser.STDDEV: - newAgg.floatValue = 0 + group.floatValue = 0 case parser.TOPK, parser.QUANTILE: - newAgg.heap = make(vectorByValueHeap, 1, k) - newAgg.heap[0] = Sample{ + group.heap = make(vectorByValueHeap, 1, k) + group.heap[0] = Sample{ F: s.F, Metric: s.Metric, } case parser.BOTTOMK: - newAgg.reverseHeap = make(vectorByReverseValueHeap, 1, k) - newAgg.reverseHeap[0] = Sample{ + group.reverseHeap = make(vectorByReverseValueHeap, 1, k) + group.reverseHeap[0] = Sample{ F: s.F, Metric: s.Metric, } case parser.GROUP: - newAgg.floatValue = 1 + group.floatValue = 1 } - - result[groupingKey] = newAgg - orderedResult = append(orderedResult, newAgg) + seen[seriesToResult[si]] = true continue } @@ -2950,7 +2952,10 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, q f seriess[hash] = ss } } - for _, aggr := range orderedResult { + for ri, aggr := range orderedResult { + if !seen[ri] { + continue + } switch op { case parser.AVG: if aggr.hasFloat && aggr.hasHistogram {