From 59548b8a0b8f7f4e35d9b1fcd43c26f6d7cd10f5 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 27 Feb 2024 08:19:01 +0000 Subject: [PATCH] promql: refactor: move collection of results into aggregation() We don't need to check for duplicates as aggregation cannot generate them. Signed-off-by: Bryan Boreham --- promql/engine.go | 73 ++++++++++++++++++++++-------------------------- 1 file changed, 34 insertions(+), 39 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 821a0e369..b73364bc8 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1292,7 +1292,6 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) } func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string) (Matrix, annotations.Annotations) { - numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 originalNumSamples := ev.currentSamples var warnings annotations.Annotations @@ -1315,11 +1314,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping var vector Vector // Input vectors for the function. biggestLen := len(inputMatrix) enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)} - type seriesAndTimestamp struct { - Series - ts int64 - } - seriess := make(map[uint64]seriesAndTimestamp, biggestLen) // Output series by series hash. + seriess := make(map[uint64]Series, biggestLen) // Output series by series hash. tempNumSamples := ev.currentSamples // Initialise series helpers with the grouping key. @@ -1367,7 +1362,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping // Make the function call. enh.Ts = ts - result, ws := ev.aggregation(aggExpr, sortedGrouping, param, vector, bufHelper, enh) + result, ws := ev.aggregation(aggExpr, sortedGrouping, param, vector, bufHelper, enh, seriess) enh.Out = result[:0] // Reuse result vector. warnings.Merge(ws) @@ -1386,9 +1381,6 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping // If this could be an instant query, shortcut so as not to change sort order. if ev.endTimestamp == ev.startTimestamp { - if result.ContainsSameLabelset() { - ev.errorf("vector cannot contain metrics with the same labelset") - } mat := make(Matrix, len(result)) for i, s := range result { if s.H == nil { @@ -1401,32 +1393,6 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping ev.samplesStats.UpdatePeak(ev.currentSamples) return mat, warnings } - - // Add samples in output vector to output series. - for _, sample := range result { - h := sample.Metric.Hash() - ss, ok := seriess[h] - if ok { - if ss.ts == ts { // If we've seen this output series before at this timestamp, it's a duplicate. - ev.errorf("vector cannot contain metrics with the same labelset") - } - ss.ts = ts - } else { - ss = seriesAndTimestamp{Series{Metric: sample.Metric}, ts} - } - if sample.H == nil { - if ss.Floats == nil { - ss.Floats = getFPointSlice(numSteps) - } - ss.Floats = append(ss.Floats, FPoint{T: ts, F: sample.F}) - } else { - if ss.Histograms == nil { - ss.Histograms = getHPointSlice(numSteps) - } - ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: sample.H}) - } - seriess[h] = ss - } } // Reuse the original point slice. @@ -1437,7 +1403,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping // Assemble the output matrix. By the time we get here we know we don't have too many samples. mat := make(Matrix, 0, len(seriess)) for _, ss := range seriess { - mat = append(mat, ss.Series) + mat = append(mat, ss) } ev.currentSamples = originalNumSamples + mat.TotalSamples() ev.samplesStats.UpdatePeak(ev.currentSamples) @@ -2778,7 +2744,7 @@ type groupedAggregation struct { // aggregation evaluates an aggregation operation on a Vector. The provided grouping labels // must be sorted. -func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { +func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper, seriess map[uint64]Series) (Vector, annotations.Annotations) { op := e.Op without := e.Without var annos annotations.Annotations @@ -3055,7 +3021,36 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par H: aggr.histogramValue, }) } - return enh.Out, annos + + ts := enh.Ts + // If this could be an instant query, shortcut so as not to change sort order. + if ev.endTimestamp == ev.startTimestamp { + return enh.Out, annos + } + + numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 + // Add samples in output vector to output series. + for _, sample := range enh.Out { + h := sample.Metric.Hash() + ss, ok := seriess[h] + if !ok { + ss = Series{Metric: sample.Metric} + } + if sample.H == nil { + if ss.Floats == nil { + ss.Floats = getFPointSlice(numSteps) + } + ss.Floats = append(ss.Floats, FPoint{T: ts, F: sample.F}) + } else { + if ss.Histograms == nil { + ss.Histograms = getHPointSlice(numSteps) + } + ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: sample.H}) + } + seriess[h] = ss + } + + return nil, annos } // aggregationK evaluates count_values on vec.