|
|
|
@ -1067,8 +1067,6 @@ func (ev *evaluator) Eval(expr parser.Expr) (v parser.Value, ws annotations.Anno
|
|
|
|
|
|
|
|
|
|
// EvalSeriesHelper stores extra information about a series.
|
|
|
|
|
type EvalSeriesHelper struct { |
|
|
|
|
// The grouping key used by aggregation.
|
|
|
|
|
groupingKey uint64 |
|
|
|
|
// Used to map left-hand to right-hand in binary operations.
|
|
|
|
|
signature string |
|
|
|
|
} |
|
|
|
@ -1316,13 +1314,25 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
|
|
|
|
|
seriess := make(map[uint64]Series, biggestLen) // Output series by series hash.
|
|
|
|
|
tempNumSamples := ev.currentSamples |
|
|
|
|
|
|
|
|
|
// Initialise series helpers with the grouping key.
|
|
|
|
|
// Create a mapping from input series to output groups.
|
|
|
|
|
buf := make([]byte, 0, 1024) |
|
|
|
|
|
|
|
|
|
seriesHelper := make([]EvalSeriesHelper, len(inputMatrix)) |
|
|
|
|
groupToResultIndex := make(map[uint64]int) |
|
|
|
|
seriesToResult := make([]int, len(inputMatrix)) |
|
|
|
|
orderedResult := make([]*groupedAggregation, 0, 16) |
|
|
|
|
|
|
|
|
|
for si, series := range inputMatrix { |
|
|
|
|
seriesHelper[si].groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf) |
|
|
|
|
var groupingKey uint64 |
|
|
|
|
groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf) |
|
|
|
|
index, ok := groupToResultIndex[groupingKey] |
|
|
|
|
// Add a new group if it doesn't exist.
|
|
|
|
|
if !ok { |
|
|
|
|
m := generateGroupingLabels(enh, series.Metric, aggExpr.Without, sortedGrouping) |
|
|
|
|
newAgg := &groupedAggregation{labels: m} |
|
|
|
|
index = len(orderedResult) |
|
|
|
|
groupToResultIndex[groupingKey] = index |
|
|
|
|
orderedResult = append(orderedResult, newAgg) |
|
|
|
|
} |
|
|
|
|
seriesToResult[si] = index |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { |
|
|
|
@ -1334,7 +1344,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
|
|
|
|
|
|
|
|
|
|
// Make the function call.
|
|
|
|
|
enh.Ts = ts |
|
|
|
|
result, ws := ev.aggregation(aggExpr, sortedGrouping, param, inputMatrix, seriesHelper, enh, seriess) |
|
|
|
|
result, ws := ev.aggregation(aggExpr, param, inputMatrix, seriesToResult, orderedResult, enh, seriess) |
|
|
|
|
|
|
|
|
|
warnings.Merge(ws) |
|
|
|
|
|
|
|
|
@ -2698,12 +2708,10 @@ type groupedAggregation struct {
|
|
|
|
|
|
|
|
|
|
// aggregation evaluates an aggregation operation on a Vector. The provided grouping labels
|
|
|
|
|
// must be sorted.
|
|
|
|
|
func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, q float64, inputMatrix Matrix, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { |
|
|
|
|
func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix Matrix, seriesToResult []int, orderedResult []*groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { |
|
|
|
|
op := e.Op |
|
|
|
|
without := e.Without |
|
|
|
|
var annos annotations.Annotations |
|
|
|
|
result := map[uint64]*groupedAggregation{} |
|
|
|
|
orderedResult := []*groupedAggregation{} |
|
|
|
|
seen := make([]bool, len(orderedResult)) // Which output groups were seen in the input at this timestamp.
|
|
|
|
|
k := 1 |
|
|
|
|
if op == parser.TOPK || op == parser.BOTTOMK { |
|
|
|
|
if !convertibleToInt64(q) { |
|
|
|
@ -2743,53 +2751,47 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, q f
|
|
|
|
|
ev.error(ErrTooManySamples(env)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
metric := s.Metric |
|
|
|
|
groupingKey := seriesHelper[si].groupingKey |
|
|
|
|
|
|
|
|
|
group, ok := result[groupingKey] |
|
|
|
|
// Add a new group if it doesn't exist.
|
|
|
|
|
if !ok { |
|
|
|
|
m := generateGroupingLabels(enh, metric, without, grouping) |
|
|
|
|
newAgg := &groupedAggregation{ |
|
|
|
|
labels: m, |
|
|
|
|
group := orderedResult[seriesToResult[si]] |
|
|
|
|
// Initialize this group if it's the first time we've seen it.
|
|
|
|
|
if !seen[seriesToResult[si]] { |
|
|
|
|
*group = groupedAggregation{ |
|
|
|
|
labels: group.labels, |
|
|
|
|
floatValue: s.F, |
|
|
|
|
floatMean: s.F, |
|
|
|
|
groupCount: 1, |
|
|
|
|
} |
|
|
|
|
switch { |
|
|
|
|
case s.H == nil: |
|
|
|
|
newAgg.hasFloat = true |
|
|
|
|
group.hasFloat = true |
|
|
|
|
case op == parser.SUM: |
|
|
|
|
newAgg.histogramValue = s.H.Copy() |
|
|
|
|
newAgg.hasHistogram = true |
|
|
|
|
group.histogramValue = s.H.Copy() |
|
|
|
|
group.hasHistogram = true |
|
|
|
|
case op == parser.AVG: |
|
|
|
|
newAgg.histogramMean = s.H.Copy() |
|
|
|
|
newAgg.hasHistogram = true |
|
|
|
|
group.histogramMean = s.H.Copy() |
|
|
|
|
group.hasHistogram = true |
|
|
|
|
case op == parser.STDVAR || op == parser.STDDEV: |
|
|
|
|
newAgg.groupCount = 0 |
|
|
|
|
group.groupCount = 0 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
switch op { |
|
|
|
|
case parser.STDVAR, parser.STDDEV: |
|
|
|
|
newAgg.floatValue = 0 |
|
|
|
|
group.floatValue = 0 |
|
|
|
|
case parser.TOPK, parser.QUANTILE: |
|
|
|
|
newAgg.heap = make(vectorByValueHeap, 1, k) |
|
|
|
|
newAgg.heap[0] = Sample{ |
|
|
|
|
group.heap = make(vectorByValueHeap, 1, k) |
|
|
|
|
group.heap[0] = Sample{ |
|
|
|
|
F: s.F, |
|
|
|
|
Metric: s.Metric, |
|
|
|
|
} |
|
|
|
|
case parser.BOTTOMK: |
|
|
|
|
newAgg.reverseHeap = make(vectorByReverseValueHeap, 1, k) |
|
|
|
|
newAgg.reverseHeap[0] = Sample{ |
|
|
|
|
group.reverseHeap = make(vectorByReverseValueHeap, 1, k) |
|
|
|
|
group.reverseHeap[0] = Sample{ |
|
|
|
|
F: s.F, |
|
|
|
|
Metric: s.Metric, |
|
|
|
|
} |
|
|
|
|
case parser.GROUP: |
|
|
|
|
newAgg.floatValue = 1 |
|
|
|
|
group.floatValue = 1 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
result[groupingKey] = newAgg |
|
|
|
|
orderedResult = append(orderedResult, newAgg) |
|
|
|
|
seen[seriesToResult[si]] = true |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -2950,7 +2952,10 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, q f
|
|
|
|
|
seriess[hash] = ss |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
for _, aggr := range orderedResult { |
|
|
|
|
for ri, aggr := range orderedResult { |
|
|
|
|
if !seen[ri] { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
switch op { |
|
|
|
|
case parser.AVG: |
|
|
|
|
if aggr.hasFloat && aggr.hasHistogram { |
|
|
|
|