|
|
|
@ -1319,10 +1319,25 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
|
|
|
|
|
seriesToResult[si] = index |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var k int |
|
|
|
|
var seriess map[uint64]Series |
|
|
|
|
switch aggExpr.Op { |
|
|
|
|
case parser.TOPK, parser.BOTTOMK: |
|
|
|
|
if !convertibleToInt64(param) { |
|
|
|
|
ev.errorf("Scalar value %v overflows int64", param) |
|
|
|
|
} |
|
|
|
|
k = int(param) |
|
|
|
|
if k > len(inputMatrix) { |
|
|
|
|
k = len(inputMatrix) |
|
|
|
|
} |
|
|
|
|
if k < 1 { |
|
|
|
|
return nil, warnings |
|
|
|
|
} |
|
|
|
|
seriess = make(map[uint64]Series, len(inputMatrix)) // Output series by series hash.
|
|
|
|
|
case parser.QUANTILE: |
|
|
|
|
if math.IsNaN(param) || param < 0 || param > 1 { |
|
|
|
|
warnings.Add(annotations.NewInvalidQuantileWarning(param, aggExpr.Param.PositionRange())) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { |
|
|
|
@ -1337,7 +1352,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
|
|
|
|
|
var ws annotations.Annotations |
|
|
|
|
switch aggExpr.Op { |
|
|
|
|
case parser.TOPK, parser.BOTTOMK: |
|
|
|
|
result, ws = ev.aggregationK(aggExpr, param, inputMatrix, seriesToResult, orderedResult, enh, seriess) |
|
|
|
|
result, ws = ev.aggregationK(aggExpr, k, inputMatrix, seriesToResult, orderedResult, enh, seriess) |
|
|
|
|
// If this could be an instant query, shortcut so as not to change sort order.
|
|
|
|
|
if ev.endTimestamp == ev.startTimestamp { |
|
|
|
|
return result, ws |
|
|
|
@ -2733,11 +2748,6 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
|
|
|
|
|
op := e.Op |
|
|
|
|
var annos annotations.Annotations |
|
|
|
|
seen := make([]bool, len(orderedResult)) // Which output groups were seen in the input at this timestamp.
|
|
|
|
|
if op == parser.QUANTILE { |
|
|
|
|
if math.IsNaN(q) || q < 0 || q > 1 { |
|
|
|
|
annos.Add(annotations.NewInvalidQuantileWarning(q, e.Param.PositionRange())) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for si := range inputMatrix { |
|
|
|
|
f, h, ok := ev.nextValues(enh.Ts, &inputMatrix[si]) |
|
|
|
@ -2919,21 +2929,11 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
|
|
|
|
|
// seriesToResult maps inputMatrix indexes to groups indexes.
|
|
|
|
|
// For an instant query, returns a Matrix in descending order for topk or ascending for bottomk.
|
|
|
|
|
// For a range query, aggregates output in the seriess map.
|
|
|
|
|
func (ev *evaluator) aggregationK(e *parser.AggregateExpr, q float64, inputMatrix Matrix, seriesToResult []int, orderedResult []*groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { |
|
|
|
|
func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Matrix, seriesToResult []int, orderedResult []*groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { |
|
|
|
|
op := e.Op |
|
|
|
|
var s Sample |
|
|
|
|
var annos annotations.Annotations |
|
|
|
|
seen := make([]bool, len(orderedResult)) // Which output groups were seen in the input at this timestamp.
|
|
|
|
|
if !convertibleToInt64(q) { |
|
|
|
|
ev.errorf("Scalar value %v overflows int64", q) |
|
|
|
|
} |
|
|
|
|
k := int(q) |
|
|
|
|
if k > len(inputMatrix) { |
|
|
|
|
k = len(inputMatrix) |
|
|
|
|
} |
|
|
|
|
if k < 1 { |
|
|
|
|
return nil, annos |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for si := range inputMatrix { |
|
|
|
|
f, _, ok := ev.nextValues(enh.Ts, &inputMatrix[si]) |
|
|
|
|