|
|
@ -2786,11 +2786,12 @@ type groupedAggregation struct {
|
|
|
|
heap vectorByValueHeap
|
|
|
|
heap vectorByValueHeap
|
|
|
|
|
|
|
|
|
|
|
|
// All bools together for better packing within the struct.
|
|
|
|
// All bools together for better packing within the struct.
|
|
|
|
seen bool // Was this output groups seen in the input at this timestamp.
|
|
|
|
seen bool // Was this output groups seen in the input at this timestamp.
|
|
|
|
hasFloat bool // Has at least 1 float64 sample aggregated.
|
|
|
|
hasFloat bool // Has at least 1 float64 sample aggregated.
|
|
|
|
hasHistogram bool // Has at least 1 histogram sample aggregated.
|
|
|
|
hasHistogram bool // Has at least 1 histogram sample aggregated.
|
|
|
|
groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group.
|
|
|
|
incompatibleHistograms bool // If true, group has seen mixed exponential and custom buckets, or incompatible custom buckets.
|
|
|
|
incrementalMean bool // True after reverting to incremental calculation of the mean value.
|
|
|
|
groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group.
|
|
|
|
|
|
|
|
incrementalMean bool // True after reverting to incremental calculation of the mean value.
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix.
|
|
|
|
// aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix.
|
|
|
@ -2814,10 +2815,11 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
|
|
|
|
// Initialize this group if it's the first time we've seen it.
|
|
|
|
// Initialize this group if it's the first time we've seen it.
|
|
|
|
if !group.seen {
|
|
|
|
if !group.seen {
|
|
|
|
*group = groupedAggregation{
|
|
|
|
*group = groupedAggregation{
|
|
|
|
seen: true,
|
|
|
|
seen: true,
|
|
|
|
floatValue: f,
|
|
|
|
floatValue: f,
|
|
|
|
floatMean: f,
|
|
|
|
floatMean: f,
|
|
|
|
groupCount: 1,
|
|
|
|
incompatibleHistograms: false,
|
|
|
|
|
|
|
|
groupCount: 1,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
switch op {
|
|
|
|
switch op {
|
|
|
|
case parser.AVG, parser.SUM:
|
|
|
|
case parser.AVG, parser.SUM:
|
|
|
@ -2838,6 +2840,10 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if group.incompatibleHistograms {
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
switch op {
|
|
|
|
switch op {
|
|
|
|
case parser.SUM:
|
|
|
|
case parser.SUM:
|
|
|
|
if h != nil {
|
|
|
|
if h != nil {
|
|
|
@ -2846,6 +2852,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
|
|
|
|
_, err := group.histogramValue.Add(h)
|
|
|
|
_, err := group.histogramValue.Add(h)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos)
|
|
|
|
handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos)
|
|
|
|
|
|
|
|
group.incompatibleHistograms = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Otherwise the aggregation contained floats
|
|
|
|
// Otherwise the aggregation contained floats
|
|
|
@ -2866,10 +2873,14 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
|
|
|
|
toAdd, err := left.Sub(right)
|
|
|
|
toAdd, err := left.Sub(right)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos)
|
|
|
|
handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos)
|
|
|
|
|
|
|
|
group.incompatibleHistograms = true
|
|
|
|
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
_, err = group.histogramValue.Add(toAdd)
|
|
|
|
_, err = group.histogramValue.Add(toAdd)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos)
|
|
|
|
handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos)
|
|
|
|
|
|
|
|
group.incompatibleHistograms = true
|
|
|
|
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Otherwise the aggregation contained floats
|
|
|
|
// Otherwise the aggregation contained floats
|
|
|
@ -2966,6 +2977,8 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
switch {
|
|
|
|
switch {
|
|
|
|
|
|
|
|
case aggr.incompatibleHistograms:
|
|
|
|
|
|
|
|
continue
|
|
|
|
case aggr.hasHistogram:
|
|
|
|
case aggr.hasHistogram:
|
|
|
|
aggr.histogramValue = aggr.histogramValue.Compact(0)
|
|
|
|
aggr.histogramValue = aggr.histogramValue.Compact(0)
|
|
|
|
case aggr.incrementalMean:
|
|
|
|
case aggr.incrementalMean:
|
|
|
@ -2992,9 +3005,12 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
|
|
|
|
annos.Add(annotations.NewMixedFloatsHistogramsAggWarning(e.Expr.PositionRange()))
|
|
|
|
annos.Add(annotations.NewMixedFloatsHistogramsAggWarning(e.Expr.PositionRange()))
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if aggr.hasHistogram {
|
|
|
|
switch {
|
|
|
|
|
|
|
|
case aggr.incompatibleHistograms:
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
case aggr.hasHistogram:
|
|
|
|
aggr.histogramValue.Compact(0)
|
|
|
|
aggr.histogramValue.Compact(0)
|
|
|
|
} else {
|
|
|
|
default:
|
|
|
|
aggr.floatValue += aggr.floatKahanC
|
|
|
|
aggr.floatValue += aggr.floatKahanC
|
|
|
|
}
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
default:
|
|
|
|