diff --git a/promql/engine.go b/promql/engine.go index 592114db2..0dd33a7f9 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1319,10 +1319,25 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping seriesToResult[si] = index } + var k int var seriess map[uint64]Series switch aggExpr.Op { case parser.TOPK, parser.BOTTOMK: + if !convertibleToInt64(param) { + ev.errorf("Scalar value %v overflows int64", param) + } + k = int(param) + if k > len(inputMatrix) { + k = len(inputMatrix) + } + if k < 1 { + return nil, warnings + } seriess = make(map[uint64]Series, len(inputMatrix)) // Output series by series hash. + case parser.QUANTILE: + if math.IsNaN(param) || param < 0 || param > 1 { + warnings.Add(annotations.NewInvalidQuantileWarning(param, aggExpr.Param.PositionRange())) + } } for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { @@ -1337,7 +1352,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping var ws annotations.Annotations switch aggExpr.Op { case parser.TOPK, parser.BOTTOMK: - result, ws = ev.aggregationK(aggExpr, param, inputMatrix, seriesToResult, orderedResult, enh, seriess) + result, ws = ev.aggregationK(aggExpr, k, inputMatrix, seriesToResult, orderedResult, enh, seriess) // If this could be an instant query, shortcut so as not to change sort order. if ev.endTimestamp == ev.startTimestamp { return result, ws @@ -2733,11 +2748,6 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix op := e.Op var annos annotations.Annotations seen := make([]bool, len(orderedResult)) // Which output groups were seen in the input at this timestamp. - if op == parser.QUANTILE { - if math.IsNaN(q) || q < 0 || q > 1 { - annos.Add(annotations.NewInvalidQuantileWarning(q, e.Param.PositionRange())) - } - } for si := range inputMatrix { f, h, ok := ev.nextValues(enh.Ts, &inputMatrix[si]) @@ -2919,21 +2929,11 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // seriesToResult maps inputMatrix indexes to groups indexes. // For an instant query, returns a Matrix in descending order for topk or ascending for bottomk. // For a range query, aggregates output in the seriess map. -func (ev *evaluator) aggregationK(e *parser.AggregateExpr, q float64, inputMatrix Matrix, seriesToResult []int, orderedResult []*groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { +func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Matrix, seriesToResult []int, orderedResult []*groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { op := e.Op var s Sample var annos annotations.Annotations seen := make([]bool, len(orderedResult)) // Which output groups were seen in the input at this timestamp. - if !convertibleToInt64(q) { - ev.errorf("Scalar value %v overflows int64", q) - } - k := int(q) - if k > len(inputMatrix) { - k = len(inputMatrix) - } - if k < 1 { - return nil, annos - } for si := range inputMatrix { f, _, ok := ev.nextValues(enh.Ts, &inputMatrix[si])