promql: refactor: pull fetching input data out of rangeEvalAgg

This is a cleaner split of responsibilities.
We now check the sample count after calling rangeEvalAgg.
Changed re-use of samples to use `Clone` and `defer`.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
pull/13744/head
Bryan Boreham 2024-04-05 11:56:04 +01:00
parent 602eb69edf
commit 74eed67ef6
1 changed files with 32 additions and 30 deletions

View File

@ -1279,29 +1279,19 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
return mat, warnings
}
func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string) (Matrix, annotations.Annotations) {
originalNumSamples := ev.currentSamples
func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string, inputMatrix Matrix, param float64) (Matrix, annotations.Annotations) {
// Keep a copy of the original point slice so that it can be returned to the pool.
origMatrix := slices.Clone(inputMatrix)
defer func() {
for _, s := range origMatrix {
putFPointSlice(s.Floats)
putHPointSlice(s.Histograms)
}
}()
var warnings annotations.Annotations
// param is the number k for topk/bottomk, or q for quantile.
var param float64
if aggExpr.Param != nil {
val, ws := ev.eval(aggExpr.Param)
warnings.Merge(ws)
param = val.(Matrix)[0].Floats[0].F
}
// Now fetch the data to be aggregated.
// ev.currentSamples will be updated to the correct value within the ev.eval call.
val, ws := ev.eval(aggExpr.Expr)
warnings.Merge(ws)
inputMatrix := val.(Matrix)
// Keep a copy of the original point slice so that it can be returned to the pool.
origMatrix := inputMatrix
biggestLen := len(inputMatrix)
enh := &EvalNodeHelper{}
seriess := make(map[uint64]Series, biggestLen) // Output series by series hash.
tempNumSamples := ev.currentSamples
// Create a mapping from input series to output groups.
@ -1325,6 +1315,8 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
seriesToResult[si] = index
}
seriess := make(map[uint64]Series, len(inputMatrix)) // Output series by series hash.
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
ev.error(err)
@ -1340,8 +1332,6 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
// If this could be an instant query, shortcut so as not to change sort order.
if ev.endTimestamp == ev.startTimestamp {
ev.currentSamples = originalNumSamples + result.TotalSamples()
ev.samplesStats.UpdatePeak(ev.currentSamples)
return result, warnings
}
if ev.currentSamples > ev.maxSamples {
@ -1349,18 +1339,11 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
}
}
// Reuse the original point slice.
for _, s := range origMatrix {
putFPointSlice(s.Floats)
putHPointSlice(s.Histograms)
}
// Assemble the output matrix. By the time we get here we know we don't have too many samples.
mat := make(Matrix, 0, len(seriess))
for _, ss := range seriess {
mat = append(mat, ss)
}
ev.currentSamples = originalNumSamples + mat.TotalSamples()
ev.samplesStats.UpdatePeak(ev.currentSamples)
return mat, warnings
}
@ -1434,7 +1417,26 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
}, e.Expr)
}
return ev.rangeEvalAgg(e, sortedGrouping)
var warnings annotations.Annotations
originalNumSamples := ev.currentSamples
// param is the number k for topk/bottomk, or q for quantile.
var fParam float64
if param != nil {
val, ws := ev.eval(param)
warnings.Merge(ws)
fParam = val.(Matrix)[0].Floats[0].F
}
// Now fetch the data to be aggregated.
val, ws := ev.eval(e.Expr)
warnings.Merge(ws)
inputMatrix := val.(Matrix)
result, ws := ev.rangeEvalAgg(e, sortedGrouping, inputMatrix, fParam)
warnings.Merge(ws)
ev.currentSamples = originalNumSamples + result.TotalSamples()
ev.samplesStats.UpdatePeak(ev.currentSamples)
return result, warnings
case *parser.Call:
call := FunctionCalls[e.Func.Name]