mirror of https://github.com/prometheus/prometheus
promql: make avg aggregation more precise and less expensive
The basic idea here is that the previous code was always doing incremental calculation of the mean value, which is more costly and can be less precise. It protects against overflows, but in most cases, an overflow doesn't happen anyway. The other idea applied here is to expand on #14074, where Kahan summation was applied to sum(). With this commit, the average is calculated in the conventional way (adding everything up and dividing at the end) as long as the sum doesn't overflow float64. This is combined with Kahan summation so that the avg aggregation, in most cases, is really equivalent to the sum aggregation with a following division (which is the user's expectation, as avg is supposed to be syntactic sugar for sum with a following division). If the sum hits ±Inf, the calculation reverts to incremental calculation of the mean value. Kahan summation is also applied here, although it cannot fully compensate for the numerical errors introduced by the incremental mean calculation. (The tests added in this commit would fail if incremental mean calculation was always used.) Signed-off-by: beorn7 <beorn@grafana.com> pull/14413/head
parent
b5b04ddbe3
commit
c46074f4dd
|
@ -2773,15 +2773,19 @@ func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram
|
|||
}
|
||||
|
||||
type groupedAggregation struct {
|
||||
floatValue float64
|
||||
histogramValue *histogram.FloatHistogram
|
||||
floatMean float64
|
||||
floatKahanC float64 // "Compensating value" for Kahan summation.
|
||||
groupCount float64
|
||||
heap vectorByValueHeap
|
||||
|
||||
// All bools together for better packing within the struct.
|
||||
seen bool // Was this output group seen in the input at this timestamp.
|
||||
hasFloat bool // Has at least 1 float64 sample aggregated.
|
||||
hasHistogram bool // Has at least 1 histogram sample aggregated.
|
||||
floatValue float64
|
||||
histogramValue *histogram.FloatHistogram
|
||||
floatMean float64 // Mean, or "compensating value" for Kahan summation.
|
||||
groupCount float64
|
||||
groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group.
|
||||
heap vectorByValueHeap
|
||||
incrementalMean bool // True after reverting to incremental calculation of the mean value.
|
||||
}
|
||||
|
||||
// aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix.
|
||||
|
@ -2807,13 +2811,11 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
|
|||
*group = groupedAggregation{
|
||||
seen: true,
|
||||
floatValue: f,
|
||||
floatMean: f,
|
||||
groupCount: 1,
|
||||
}
|
||||
switch op {
|
||||
case parser.AVG:
|
||||
group.floatMean = f
|
||||
fallthrough
|
||||
case parser.SUM:
|
||||
case parser.AVG, parser.SUM:
|
||||
if h == nil {
|
||||
group.hasFloat = true
|
||||
} else {
|
||||
|
@ -2821,7 +2823,6 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
|
|||
group.hasHistogram = true
|
||||
}
|
||||
case parser.STDVAR, parser.STDDEV:
|
||||
group.floatMean = f
|
||||
group.floatValue = 0
|
||||
case parser.QUANTILE:
|
||||
group.heap = make(vectorByValueHeap, 1)
|
||||
|
@ -2847,7 +2848,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
|
|||
// point in copying the histogram in that case.
|
||||
} else {
|
||||
group.hasFloat = true
|
||||
group.floatValue, group.floatMean = kahanSumInc(f, group.floatValue, group.floatMean)
|
||||
group.floatValue, group.floatKahanC = kahanSumInc(f, group.floatValue, group.floatKahanC)
|
||||
}
|
||||
|
||||
case parser.AVG:
|
||||
|
@ -2871,6 +2872,22 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
|
|||
// point in copying the histogram in that case.
|
||||
} else {
|
||||
group.hasFloat = true
|
||||
if !group.incrementalMean {
|
||||
newV, newC := kahanSumInc(f, group.floatValue, group.floatKahanC)
|
||||
if !math.IsInf(newV, 0) {
|
||||
// The sum doesn't overflow, so we propagate it to the
|
||||
// group struct and continue with the regular
|
||||
// calculation of the mean value.
|
||||
group.floatValue, group.floatKahanC = newV, newC
|
||||
break
|
||||
}
|
||||
// If we are here, we know that the sum _would_ overflow. So
|
||||
// instead of continuing to sum up, we revert to incremental
|
||||
// calculation of the mean value from here on.
|
||||
group.incrementalMean = true
|
||||
group.floatMean = group.floatValue / (group.groupCount - 1)
|
||||
group.floatKahanC /= group.groupCount - 1
|
||||
}
|
||||
if math.IsInf(group.floatMean, 0) {
|
||||
if math.IsInf(f, 0) && (group.floatMean > 0) == (f > 0) {
|
||||
// The `floatMean` and `f` values are `Inf` of the same sign. They
|
||||
|
@ -2888,8 +2905,13 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
|
|||
break
|
||||
}
|
||||
}
|
||||
// Divide each side of the `-` by `group.groupCount` to avoid float64 overflows.
|
||||
group.floatMean += f/group.groupCount - group.floatMean/group.groupCount
|
||||
currentMean := group.floatMean + group.floatKahanC
|
||||
group.floatMean, group.floatKahanC = kahanSumInc(
|
||||
// Divide each side of the `-` by `group.groupCount` to avoid float64 overflows.
|
||||
f/group.groupCount-currentMean/group.groupCount,
|
||||
group.floatMean,
|
||||
group.floatKahanC,
|
||||
)
|
||||
}
|
||||
|
||||
case parser.GROUP:
|
||||
|
@ -2938,10 +2960,13 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
|
|||
annos.Add(annotations.NewMixedFloatsHistogramsAggWarning(e.Expr.PositionRange()))
|
||||
continue
|
||||
}
|
||||
if aggr.hasHistogram {
|
||||
switch {
|
||||
case aggr.hasHistogram:
|
||||
aggr.histogramValue = aggr.histogramValue.Compact(0)
|
||||
} else {
|
||||
aggr.floatValue = aggr.floatMean
|
||||
case aggr.incrementalMean:
|
||||
aggr.floatValue = aggr.floatMean + aggr.floatKahanC
|
||||
default:
|
||||
aggr.floatValue = (aggr.floatValue + aggr.floatKahanC) / aggr.groupCount
|
||||
}
|
||||
|
||||
case parser.COUNT:
|
||||
|
@ -2965,7 +2990,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
|
|||
if aggr.hasHistogram {
|
||||
aggr.histogramValue.Compact(0)
|
||||
} else {
|
||||
aggr.floatValue += aggr.floatMean // Add Kahan summation compensating term.
|
||||
aggr.floatValue += aggr.floatKahanC
|
||||
}
|
||||
default:
|
||||
// For other aggregations, we already have the right value.
|
||||
|
|
|
@ -503,7 +503,7 @@ eval instant at 1m avg(data{test="-big"})
|
|||
eval instant at 1m avg(data{test="bigzero"})
|
||||
{} 0
|
||||
|
||||
# Test summing extreme values.
|
||||
# Test summing and averaging extreme values.
|
||||
clear
|
||||
|
||||
load 10s
|
||||
|
@ -529,21 +529,39 @@ load 10s
|
|||
eval instant at 1m sum(data{test="ten"})
|
||||
{} 10
|
||||
|
||||
eval instant at 1m avg(data{test="ten"})
|
||||
{} 2.5
|
||||
|
||||
eval instant at 1m sum by (group) (data{test="pos_inf"})
|
||||
{group="1"} Inf
|
||||
{group="2"} Inf
|
||||
|
||||
eval instant at 1m avg by (group) (data{test="pos_inf"})
|
||||
{group="1"} Inf
|
||||
{group="2"} Inf
|
||||
|
||||
eval instant at 1m sum by (group) (data{test="neg_inf"})
|
||||
{group="1"} -Inf
|
||||
{group="2"} -Inf
|
||||
|
||||
eval instant at 1m avg by (group) (data{test="neg_inf"})
|
||||
{group="1"} -Inf
|
||||
{group="2"} -Inf
|
||||
|
||||
eval instant at 1m sum(data{test="inf_inf"})
|
||||
{} NaN
|
||||
|
||||
eval instant at 1m avg(data{test="inf_inf"})
|
||||
{} NaN
|
||||
|
||||
eval instant at 1m sum by (group) (data{test="nan"})
|
||||
{group="1"} NaN
|
||||
{group="2"} NaN
|
||||
|
||||
eval instant at 1m avg by (group) (data{test="nan"})
|
||||
{group="1"} NaN
|
||||
{group="2"} NaN
|
||||
|
||||
clear
|
||||
|
||||
# Test that aggregations are deterministic.
|
||||
|
|
Loading…
Reference in New Issue