Merge pull request #13744 from bboreham/wip-aggr-index

[ENHANCEMENT] PromQL: Re-structure aggregations for clarity and performance
pull/13898/head
Bryan Boreham 2024-04-05 16:34:57 +01:00 committed by GitHub
commit 2278d2377c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 406 additions and 242 deletions

View File

@ -1067,8 +1067,6 @@ func (ev *evaluator) Eval(expr parser.Expr) (v parser.Value, ws annotations.Anno
// EvalSeriesHelper stores extra information about a series.
type EvalSeriesHelper struct {
// The grouping key used by aggregation.
groupingKey uint64
// Used to map left-hand to right-hand in binary operations.
signature string
}
@ -1259,17 +1257,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
} else {
ss = seriesAndTimestamp{Series{Metric: sample.Metric}, ts}
}
if sample.H == nil {
if ss.Floats == nil {
ss.Floats = getFPointSlice(numSteps)
}
ss.Floats = append(ss.Floats, FPoint{T: ts, F: sample.F})
} else {
if ss.Histograms == nil {
ss.Histograms = getHPointSlice(numSteps)
}
ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: sample.H})
}
addToSeries(&ss.Series, enh.Ts, sample.F, sample.H, numSteps)
seriess[h] = ss
}
}
@ -1291,6 +1279,116 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
return mat, warnings
}
func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string, inputMatrix Matrix, param float64) (Matrix, annotations.Annotations) {
// Keep a copy of the original point slice so that it can be returned to the pool.
origMatrix := slices.Clone(inputMatrix)
defer func() {
for _, s := range origMatrix {
putFPointSlice(s.Floats)
putHPointSlice(s.Histograms)
}
}()
var warnings annotations.Annotations
enh := &EvalNodeHelper{}
tempNumSamples := ev.currentSamples
// Create a mapping from input series to output groups.
buf := make([]byte, 0, 1024)
groupToResultIndex := make(map[uint64]int)
seriesToResult := make([]int, len(inputMatrix))
var result Matrix
groupCount := 0
for si, series := range inputMatrix {
var groupingKey uint64
groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf)
index, ok := groupToResultIndex[groupingKey]
// Add a new group if it doesn't exist.
if !ok {
if aggExpr.Op != parser.TOPK && aggExpr.Op != parser.BOTTOMK {
m := generateGroupingLabels(enh, series.Metric, aggExpr.Without, sortedGrouping)
result = append(result, Series{Metric: m})
}
index = groupCount
groupToResultIndex[groupingKey] = index
groupCount++
}
seriesToResult[si] = index
}
groups := make([]groupedAggregation, groupCount)
var k int
var seriess map[uint64]Series
switch aggExpr.Op {
case parser.TOPK, parser.BOTTOMK:
if !convertibleToInt64(param) {
ev.errorf("Scalar value %v overflows int64", param)
}
k = int(param)
if k > len(inputMatrix) {
k = len(inputMatrix)
}
if k < 1 {
return nil, warnings
}
seriess = make(map[uint64]Series, len(inputMatrix)) // Output series by series hash.
case parser.QUANTILE:
if math.IsNaN(param) || param < 0 || param > 1 {
warnings.Add(annotations.NewInvalidQuantileWarning(param, aggExpr.Param.PositionRange()))
}
}
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
ev.error(err)
}
// Reset number of samples in memory after each timestamp.
ev.currentSamples = tempNumSamples
// Make the function call.
enh.Ts = ts
var ws annotations.Annotations
switch aggExpr.Op {
case parser.TOPK, parser.BOTTOMK:
result, ws = ev.aggregationK(aggExpr, k, inputMatrix, seriesToResult, groups, enh, seriess)
// If this could be an instant query, shortcut so as not to change sort order.
if ev.endTimestamp == ev.startTimestamp {
return result, ws
}
default:
ws = ev.aggregation(aggExpr, param, inputMatrix, result, seriesToResult, groups, enh)
}
warnings.Merge(ws)
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
}
// Assemble the output matrix. By the time we get here we know we don't have too many samples.
switch aggExpr.Op {
case parser.TOPK, parser.BOTTOMK:
result = make(Matrix, 0, len(seriess))
for _, ss := range seriess {
result = append(result, ss)
}
default:
// Remove empty result rows.
dst := 0
for _, series := range result {
if len(series.Floats) > 0 || len(series.Histograms) > 0 {
result[dst] = series
dst++
}
}
result = result[:dst]
}
return result, warnings
}
// evalSubquery evaluates given SubqueryExpr and returns an equivalent
// evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, annotations.Annotations) {
@ -1343,28 +1441,44 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
sortedGrouping := e.Grouping
slices.Sort(sortedGrouping)
// Prepare a function to initialise series helpers with the grouping key.
buf := make([]byte, 0, 1024)
initSeries := func(series labels.Labels, h *EvalSeriesHelper) {
h.groupingKey, buf = generateGroupingKey(series, sortedGrouping, e.Without, buf)
}
unwrapParenExpr(&e.Param)
param := unwrapStepInvariantExpr(e.Param)
unwrapParenExpr(&param)
if s, ok := param.(*parser.StringLiteral); ok {
return ev.rangeEval(initSeries, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.aggregation(e, sortedGrouping, s.Val, v[0].(Vector), sh[0], enh)
if e.Op == parser.COUNT_VALUES {
valueLabel := param.(*parser.StringLiteral)
if !model.LabelName(valueLabel.Val).IsValid() {
ev.errorf("invalid label name %q", valueLabel)
}
if !e.Without {
sortedGrouping = append(sortedGrouping, valueLabel.Val)
slices.Sort(sortedGrouping)
}
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.aggregationCountValues(e, sortedGrouping, valueLabel.Val, v[0].(Vector), enh)
}, e.Expr)
}
return ev.rangeEval(initSeries, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
var param float64
if e.Param != nil {
param = v[0].(Vector)[0].F
}
return ev.aggregation(e, sortedGrouping, param, v[1].(Vector), sh[1], enh)
}, e.Param, e.Expr)
var warnings annotations.Annotations
originalNumSamples := ev.currentSamples
// param is the number k for topk/bottomk, or q for quantile.
var fParam float64
if param != nil {
val, ws := ev.eval(param)
warnings.Merge(ws)
fParam = val.(Matrix)[0].Floats[0].F
}
// Now fetch the data to be aggregated.
val, ws := ev.eval(e.Expr)
warnings.Merge(ws)
inputMatrix := val.(Matrix)
result, ws := ev.rangeEvalAgg(e, sortedGrouping, inputMatrix, fParam)
warnings.Merge(ws)
ev.currentSamples = originalNumSamples + result.TotalSamples()
ev.samplesStats.UpdatePeak(ev.currentSamples)
return result, warnings
case *parser.Call:
call := FunctionCalls[e.Func.Name]
@ -2614,171 +2728,85 @@ func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram
}
type groupedAggregation struct {
seen bool // Was this output groups seen in the input at this timestamp.
hasFloat bool // Has at least 1 float64 sample aggregated.
hasHistogram bool // Has at least 1 histogram sample aggregated.
labels labels.Labels
floatValue float64
histogramValue *histogram.FloatHistogram
floatMean float64
histogramMean *histogram.FloatHistogram
groupCount int
heap vectorByValueHeap
reverseHeap vectorByReverseValueHeap
}
// aggregation evaluates an aggregation operation on a Vector. The provided grouping labels
// must be sorted.
func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
// aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix.
// These functions produce one output series for each group specified in the expression, with just the labels from `by(...)`.
// outputMatrix should be already populated with grouping labels; groups is one-to-one with outputMatrix.
// seriesToResult maps inputMatrix indexes to outputMatrix indexes.
func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix, outputMatrix Matrix, seriesToResult []int, groups []groupedAggregation, enh *EvalNodeHelper) annotations.Annotations {
op := e.Op
without := e.Without
var annos annotations.Annotations
result := map[uint64]*groupedAggregation{}
orderedResult := []*groupedAggregation{}
var k int64
if op == parser.TOPK || op == parser.BOTTOMK {
f := param.(float64)
if !convertibleToInt64(f) {
ev.errorf("Scalar value %v overflows int64", f)
}
k = int64(f)
if k < 1 {
return Vector{}, annos
}
}
var q float64
if op == parser.QUANTILE {
q = param.(float64)
}
var valueLabel string
var recomputeGroupingKey bool
if op == parser.COUNT_VALUES {
valueLabel = param.(string)
if !model.LabelName(valueLabel).IsValid() {
ev.errorf("invalid label name %q", valueLabel)
}
if !without {
// We're changing the grouping labels so we have to ensure they're still sorted
// and we have to flag to recompute the grouping key. Considering the count_values()
// operator is less frequently used than other aggregations, we're fine having to
// re-compute the grouping key on each step for this case.
grouping = append(grouping, valueLabel)
slices.Sort(grouping)
recomputeGroupingKey = true
}
for i := range groups {
groups[i].seen = false
}
var buf []byte
for si, s := range vec {
metric := s.Metric
if op == parser.COUNT_VALUES {
enh.resetBuilder(metric)
enh.lb.Set(valueLabel, strconv.FormatFloat(s.F, 'f', -1, 64))
metric = enh.lb.Labels()
// We've changed the metric so we have to recompute the grouping key.
recomputeGroupingKey = true
}
// We can use the pre-computed grouping key unless grouping labels have changed.
var groupingKey uint64
if !recomputeGroupingKey {
groupingKey = seriesHelper[si].groupingKey
} else {
groupingKey, buf = generateGroupingKey(metric, grouping, without, buf)
}
group, ok := result[groupingKey]
// Add a new group if it doesn't exist.
for si := range inputMatrix {
f, h, ok := ev.nextValues(enh.Ts, &inputMatrix[si])
if !ok {
var m labels.Labels
enh.resetBuilder(metric)
switch {
case without:
enh.lb.Del(grouping...)
enh.lb.Del(labels.MetricName)
m = enh.lb.Labels()
case len(grouping) > 0:
enh.lb.Keep(grouping...)
m = enh.lb.Labels()
default:
m = labels.EmptyLabels()
}
newAgg := &groupedAggregation{
labels: m,
floatValue: s.F,
floatMean: s.F,
continue
}
group := &groups[seriesToResult[si]]
// Initialize this group if it's the first time we've seen it.
if !group.seen {
*group = groupedAggregation{
seen: true,
floatValue: f,
floatMean: f,
groupCount: 1,
}
switch {
case s.H == nil:
newAgg.hasFloat = true
case op == parser.SUM:
newAgg.histogramValue = s.H.Copy()
newAgg.hasHistogram = true
case op == parser.AVG:
newAgg.histogramMean = s.H.Copy()
newAgg.hasHistogram = true
case op == parser.STDVAR || op == parser.STDDEV:
newAgg.groupCount = 0
}
result[groupingKey] = newAgg
orderedResult = append(orderedResult, newAgg)
inputVecLen := int64(len(vec))
resultSize := k
switch {
case k > inputVecLen:
resultSize = inputVecLen
case k == 0:
resultSize = 1
}
switch op {
case parser.SUM, parser.AVG:
if h == nil {
group.hasFloat = true
} else {
group.histogramValue = h.Copy()
group.hasHistogram = true
}
case parser.STDVAR, parser.STDDEV:
result[groupingKey].floatValue = 0
case parser.TOPK, parser.QUANTILE:
result[groupingKey].heap = make(vectorByValueHeap, 1, resultSize)
result[groupingKey].heap[0] = Sample{
F: s.F,
Metric: s.Metric,
}
case parser.BOTTOMK:
result[groupingKey].reverseHeap = make(vectorByReverseValueHeap, 1, resultSize)
result[groupingKey].reverseHeap[0] = Sample{
F: s.F,
Metric: s.Metric,
}
group.floatValue = 0
case parser.QUANTILE:
group.heap = make(vectorByValueHeap, 1)
group.heap[0] = Sample{F: f}
case parser.GROUP:
result[groupingKey].floatValue = 1
group.floatValue = 1
}
continue
}
switch op {
case parser.SUM:
if s.H != nil {
if h != nil {
group.hasHistogram = true
if group.histogramValue != nil {
group.histogramValue.Add(s.H)
group.histogramValue.Add(h)
}
// Otherwise the aggregation contained floats
// previously and will be invalid anyway. No
// point in copying the histogram in that case.
} else {
group.hasFloat = true
group.floatValue += s.F
group.floatValue += f
}
case parser.AVG:
group.groupCount++
if s.H != nil {
if h != nil {
group.hasHistogram = true
if group.histogramMean != nil {
left := s.H.Copy().Div(float64(group.groupCount))
right := group.histogramMean.Copy().Div(float64(group.groupCount))
if group.histogramValue != nil {
left := h.Copy().Div(float64(group.groupCount))
right := group.histogramValue.Copy().Div(float64(group.groupCount))
toAdd := left.Sub(right)
group.histogramMean.Add(toAdd)
group.histogramValue.Add(toAdd)
}
// Otherwise the aggregation contained floats
// previously and will be invalid anyway. No
@ -2786,13 +2814,13 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
} else {
group.hasFloat = true
if math.IsInf(group.floatMean, 0) {
if math.IsInf(s.F, 0) && (group.floatMean > 0) == (s.F > 0) {
if math.IsInf(f, 0) && (group.floatMean > 0) == (f > 0) {
// The `floatMean` and `s.F` values are `Inf` of the same sign. They
// can't be subtracted, but the value of `floatMean` is correct
// already.
break
}
if !math.IsInf(s.F, 0) && !math.IsNaN(s.F) {
if !math.IsInf(f, 0) && !math.IsNaN(f) {
// At this stage, the mean is an infinite. If the added
// value is neither an Inf or a Nan, we can keep that mean
// value.
@ -2803,81 +2831,48 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
}
}
// Divide each side of the `-` by `group.groupCount` to avoid float64 overflows.
group.floatMean += s.F/float64(group.groupCount) - group.floatMean/float64(group.groupCount)
group.floatMean += f/float64(group.groupCount) - group.floatMean/float64(group.groupCount)
}
case parser.GROUP:
// Do nothing. Required to avoid the panic in `default:` below.
case parser.MAX:
if group.floatValue < s.F || math.IsNaN(group.floatValue) {
group.floatValue = s.F
if group.floatValue < f || math.IsNaN(group.floatValue) {
group.floatValue = f
}
case parser.MIN:
if group.floatValue > s.F || math.IsNaN(group.floatValue) {
group.floatValue = s.F
if group.floatValue > f || math.IsNaN(group.floatValue) {
group.floatValue = f
}
case parser.COUNT, parser.COUNT_VALUES:
case parser.COUNT:
group.groupCount++
case parser.STDVAR, parser.STDDEV:
if s.H == nil { // Ignore native histograms.
if h == nil { // Ignore native histograms.
group.groupCount++
delta := s.F - group.floatMean
delta := f - group.floatMean
group.floatMean += delta / float64(group.groupCount)
group.floatValue += delta * (s.F - group.floatMean)
}
case parser.TOPK:
// We build a heap of up to k elements, with the smallest element at heap[0].
switch {
case int64(len(group.heap)) < k:
heap.Push(&group.heap, &Sample{
F: s.F,
Metric: s.Metric,
})
case group.heap[0].F < s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)):
// This new element is bigger than the previous smallest element - overwrite that.
group.heap[0] = Sample{
F: s.F,
Metric: s.Metric,
}
if k > 1 {
heap.Fix(&group.heap, 0) // Maintain the heap invariant.
}
}
case parser.BOTTOMK:
// We build a heap of up to k elements, with the biggest element at heap[0].
switch {
case int64(len(group.reverseHeap)) < k:
heap.Push(&group.reverseHeap, &Sample{
F: s.F,
Metric: s.Metric,
})
case group.reverseHeap[0].F > s.F || (math.IsNaN(group.reverseHeap[0].F) && !math.IsNaN(s.F)):
// This new element is smaller than the previous biggest element - overwrite that.
group.reverseHeap[0] = Sample{
F: s.F,
Metric: s.Metric,
}
if k > 1 {
heap.Fix(&group.reverseHeap, 0) // Maintain the heap invariant.
}
group.floatValue += delta * (f - group.floatMean)
}
case parser.QUANTILE:
group.heap = append(group.heap, s)
group.heap = append(group.heap, Sample{F: f})
default:
panic(fmt.Errorf("expected aggregation operator but got %q", op))
}
}
// Construct the result Vector from the aggregated groups.
for _, aggr := range orderedResult {
// Construct the output matrix from the aggregated groups.
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
for ri, aggr := range groups {
if !aggr.seen {
continue
}
switch op {
case parser.AVG:
if aggr.hasFloat && aggr.hasHistogram {
@ -2886,12 +2881,12 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
continue
}
if aggr.hasHistogram {
aggr.histogramValue = aggr.histogramMean.Compact(0)
aggr.histogramValue = aggr.histogramValue.Compact(0)
} else {
aggr.floatValue = aggr.floatMean
}
case parser.COUNT, parser.COUNT_VALUES:
case parser.COUNT:
aggr.floatValue = float64(aggr.groupCount)
case parser.STDVAR:
@ -2900,36 +2895,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
case parser.STDDEV:
aggr.floatValue = math.Sqrt(aggr.floatValue / float64(aggr.groupCount))
case parser.TOPK:
// The heap keeps the lowest value on top, so reverse it.
if len(aggr.heap) > 1 {
sort.Sort(sort.Reverse(aggr.heap))
}
for _, v := range aggr.heap {
enh.Out = append(enh.Out, Sample{
Metric: v.Metric,
F: v.F,
})
}
continue // Bypass default append.
case parser.BOTTOMK:
// The heap keeps the highest value on top, so reverse it.
if len(aggr.reverseHeap) > 1 {
sort.Sort(sort.Reverse(aggr.reverseHeap))
}
for _, v := range aggr.reverseHeap {
enh.Out = append(enh.Out, Sample{
Metric: v.Metric,
F: v.F,
})
}
continue // Bypass default append.
case parser.QUANTILE:
if math.IsNaN(q) || q < 0 || q > 1 {
annos.Add(annotations.NewInvalidQuantileWarning(q, e.Param.PositionRange()))
}
aggr.floatValue = quantile(q, aggr.heap)
case parser.SUM:
@ -2945,13 +2911,196 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
// For other aggregations, we already have the right value.
}
ss := &outputMatrix[ri]
addToSeries(ss, enh.Ts, aggr.floatValue, aggr.histogramValue, numSteps)
}
return annos
}
// aggregationK evaluates topk or bottomk at one timestep on inputMatrix.
// Output that has the same labels as the input, but just k of them per group.
// seriesToResult maps inputMatrix indexes to groups indexes.
// For an instant query, returns a Matrix in descending order for topk or ascending for bottomk.
// For a range query, aggregates output in the seriess map.
func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Matrix, seriesToResult []int, groups []groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) {
op := e.Op
var s Sample
var annos annotations.Annotations
for i := range groups {
groups[i].seen = false
}
for si := range inputMatrix {
f, _, ok := ev.nextValues(enh.Ts, &inputMatrix[si])
if !ok {
continue
}
s = Sample{Metric: inputMatrix[si].Metric, F: f}
group := &groups[seriesToResult[si]]
// Initialize this group if it's the first time we've seen it.
if !group.seen {
*group = groupedAggregation{
seen: true,
heap: make(vectorByValueHeap, 1, k),
}
group.heap[0] = s
continue
}
switch op {
case parser.TOPK:
// We build a heap of up to k elements, with the smallest element at heap[0].
switch {
case len(group.heap) < k:
heap.Push(&group.heap, &s)
case group.heap[0].F < s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)):
// This new element is bigger than the previous smallest element - overwrite that.
group.heap[0] = s
if k > 1 {
heap.Fix(&group.heap, 0) // Maintain the heap invariant.
}
}
case parser.BOTTOMK:
// We build a heap of up to k elements, with the biggest element at heap[0].
switch {
case len(group.heap) < k:
heap.Push((*vectorByReverseValueHeap)(&group.heap), &s)
case group.heap[0].F > s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)):
// This new element is smaller than the previous biggest element - overwrite that.
group.heap[0] = s
if k > 1 {
heap.Fix((*vectorByReverseValueHeap)(&group.heap), 0) // Maintain the heap invariant.
}
}
default:
panic(fmt.Errorf("expected aggregation operator but got %q", op))
}
}
// Construct the result from the aggregated groups.
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
var mat Matrix
if ev.endTimestamp == ev.startTimestamp {
mat = make(Matrix, 0, len(groups))
}
add := func(lbls labels.Labels, f float64) {
// If this could be an instant query, add directly to the matrix so the result is in consistent order.
if ev.endTimestamp == ev.startTimestamp {
mat = append(mat, Series{Metric: lbls, Floats: []FPoint{{T: enh.Ts, F: f}}})
} else {
// Otherwise the results are added into seriess elements.
hash := lbls.Hash()
ss, ok := seriess[hash]
if !ok {
ss = Series{Metric: lbls}
}
addToSeries(&ss, enh.Ts, f, nil, numSteps)
seriess[hash] = ss
}
}
for _, aggr := range groups {
if !aggr.seen {
continue
}
switch op {
case parser.TOPK:
// The heap keeps the lowest value on top, so reverse it.
if len(aggr.heap) > 1 {
sort.Sort(sort.Reverse(aggr.heap))
}
for _, v := range aggr.heap {
add(v.Metric, v.F)
}
case parser.BOTTOMK:
// The heap keeps the highest value on top, so reverse it.
if len(aggr.heap) > 1 {
sort.Sort(sort.Reverse((*vectorByReverseValueHeap)(&aggr.heap)))
}
for _, v := range aggr.heap {
add(v.Metric, v.F)
}
}
}
return mat, annos
}
// aggregationK evaluates count_values on vec.
// Outputs as many series per group as there are values in the input.
func (ev *evaluator) aggregationCountValues(e *parser.AggregateExpr, grouping []string, valueLabel string, vec Vector, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
type groupCount struct {
labels labels.Labels
count int
}
result := map[uint64]*groupCount{}
var buf []byte
for _, s := range vec {
enh.resetBuilder(s.Metric)
enh.lb.Set(valueLabel, strconv.FormatFloat(s.F, 'f', -1, 64))
metric := enh.lb.Labels()
// Considering the count_values()
// operator is less frequently used than other aggregations, we're fine having to
// re-compute the grouping key on each step for this case.
var groupingKey uint64
groupingKey, buf = generateGroupingKey(metric, grouping, e.Without, buf)
group, ok := result[groupingKey]
// Add a new group if it doesn't exist.
if !ok {
result[groupingKey] = &groupCount{
labels: generateGroupingLabels(enh, metric, e.Without, grouping),
count: 1,
}
continue
}
group.count++
}
// Construct the result Vector from the aggregated groups.
for _, aggr := range result {
enh.Out = append(enh.Out, Sample{
Metric: aggr.labels,
F: aggr.floatValue,
H: aggr.histogramValue,
F: float64(aggr.count),
})
}
return enh.Out, annos
return enh.Out, nil
}
func addToSeries(ss *Series, ts int64, f float64, h *histogram.FloatHistogram, numSteps int) {
if h == nil {
if ss.Floats == nil {
ss.Floats = getFPointSlice(numSteps)
}
ss.Floats = append(ss.Floats, FPoint{T: ts, F: f})
return
}
if ss.Histograms == nil {
ss.Histograms = getHPointSlice(numSteps)
}
ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: h})
}
func (ev *evaluator) nextValues(ts int64, series *Series) (f float64, h *histogram.FloatHistogram, b bool) {
switch {
case len(series.Floats) > 0 && series.Floats[0].T == ts:
f = series.Floats[0].F
series.Floats = series.Floats[1:] // Move input vectors forward
case len(series.Histograms) > 0 && series.Histograms[0].T == ts:
h = series.Histograms[0].H
series.Histograms = series.Histograms[1:]
default:
return f, h, false
}
return f, h, true
}
// groupingKey builds and returns the grouping key for the given metric and
@ -2969,6 +3118,21 @@ func generateGroupingKey(metric labels.Labels, grouping []string, without bool,
return metric.HashForLabels(buf, grouping...)
}
func generateGroupingLabels(enh *EvalNodeHelper, metric labels.Labels, without bool, grouping []string) labels.Labels {
enh.resetBuilder(metric)
switch {
case without:
enh.lb.Del(grouping...)
enh.lb.Del(labels.MetricName)
return enh.lb.Labels()
case len(grouping) > 0:
enh.lb.Keep(grouping...)
return enh.lb.Labels()
default:
return labels.EmptyLabels()
}
}
// btos returns 1 if b is true, 0 otherwise.
func btos(b bool) float64 {
if b {

View File

@ -966,7 +966,7 @@ load 10s
{
Query: "sum by (b) (max_over_time(metricWith3SampleEvery10Seconds[60s] @ 30))",
Start: time.Unix(201, 0),
PeakSamples: 8,
PeakSamples: 7,
TotalSamples: 12, // @ modifier force the evaluation to at 30 seconds - So it brings 4 datapoints (0, 10, 20, 30 seconds) * 3 series
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 12,