@ -1299,6 +1299,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
groupToResultIndex := make ( map [ uint64 ] int )
seriesToResult := make ( [ ] int , len ( inputMatrix ) )
orderedResult := make ( [ ] * groupedAggregation , 0 , 16 )
var result Matrix
for si , series := range inputMatrix {
var groupingKey uint64
@ -1306,8 +1307,11 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
index , ok := groupToResultIndex [ groupingKey ]
// Add a new group if it doesn't exist.
if ! ok {
if aggExpr . Op != parser . TOPK && aggExpr . Op != parser . BOTTOMK {
m := generateGroupingLabels ( enh , series . Metric , aggExpr . Without , sortedGrouping )
newAgg := & groupedAggregation { labels : m }
result = append ( result , Series { Metric : m } )
}
newAgg := & groupedAggregation { }
index = len ( orderedResult )
groupToResultIndex [ groupingKey ] = index
orderedResult = append ( orderedResult , newAgg )
@ -1315,7 +1319,11 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
seriesToResult [ si ] = index
}
seriess := make ( map [ uint64 ] Series , len ( inputMatrix ) ) // Output series by series hash.
var seriess map [ uint64 ] Series
switch aggExpr . Op {
case parser . TOPK , parser . BOTTOMK :
seriess = make ( map [ uint64 ] Series , len ( inputMatrix ) ) // Output series by series hash.
}
for ts := ev . startTimestamp ; ts <= ev . endTimestamp ; ts += ev . interval {
if err := contextDone ( ev . ctx , "expression evaluation" ) ; err != nil {
@ -1326,25 +1334,44 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
// Make the function call.
enh . Ts = ts
result , ws := ev . aggregation ( aggExpr , param , inputMatrix , seriesToResult , orderedResult , enh , seriess )
warnings . Merge ( ws )
var ws annotations . Annotations
switch aggExpr . Op {
case parser . TOPK , parser . BOTTOMK :
result , ws = ev . aggregationK ( aggExpr , param , inputMatrix , seriesToResult , orderedResult , enh , seriess )
// If this could be an instant query, shortcut so as not to change sort order.
if ev . endTimestamp == ev . startTimestamp {
return result , warning s
return result , ws
}
default :
ws = ev . aggregation ( aggExpr , param , inputMatrix , result , seriesToResult , orderedResult , enh )
}
warnings . Merge ( ws )
if ev . currentSamples > ev . maxSamples {
ev . error ( ErrTooManySamples ( env ) )
}
}
// Assemble the output matrix. By the time we get here we know we don't have too many samples.
mat := make ( Matrix , 0 , len ( seriess ) )
switch aggExpr . Op {
case parser . TOPK , parser . BOTTOMK :
result = make ( Matrix , 0 , len ( seriess ) )
for _ , ss := range seriess {
mat = append ( mat , ss )
resul t = append ( resul t, ss )
}
return mat , warnings
default :
// Remove empty result rows.
dst := 0
for _ , series := range result {
if len ( series . Floats ) > 0 || len ( series . Histograms ) > 0 {
result [ dst ] = series
dst ++
}
}
result = result [ : dst ]
}
return result , warnings
}
// evalSubquery evaluates given SubqueryExpr and returns an equivalent
@ -2698,25 +2725,14 @@ type groupedAggregation struct {
reverseHeap vectorByReverseValueHeap
}
// aggregation evaluates an aggregation operation on a Vector. The provided grouping labels
// must be sorted.
func ( ev * evaluator ) aggregation ( e * parser . AggregateExpr , q float64 , inputMatrix Matrix , seriesToResult [ ] int , orderedResult [ ] * groupedAggregation , enh * EvalNodeHelper , seriess map [ uint64 ] Series ) ( Matrix , annotations . Annotations ) {
// aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix.
// These functions produce one output series for each group specified in the expression, with just the labels from `by(...)`.
// outputMatrix should be already populated with grouping labels; groups is one-to-one with outputMatrix.
// seriesToResult maps inputMatrix indexes to outputMatrix indexes.
func ( ev * evaluator ) aggregation ( e * parser . AggregateExpr , q float64 , inputMatrix , outputMatrix Matrix , seriesToResult [ ] int , orderedResult [ ] * groupedAggregation , enh * EvalNodeHelper ) annotations . Annotations {
op := e . Op
var annos annotations . Annotations
seen := make ( [ ] bool , len ( orderedResult ) ) // Which output groups were seen in the input at this timestamp.
k := 1
if op == parser . TOPK || op == parser . BOTTOMK {
if ! convertibleToInt64 ( q ) {
ev . errorf ( "Scalar value %v overflows int64" , q )
}
k = int ( q )
if k > len ( inputMatrix ) {
k = len ( inputMatrix )
}
if k < 1 {
return nil , annos
}
}
if op == parser . QUANTILE {
if math . IsNaN ( q ) || q < 0 || q > 1 {
annos . Add ( annotations . NewInvalidQuantileWarning ( q , e . Param . PositionRange ( ) ) )
@ -2733,7 +2749,6 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
// Initialize this group if it's the first time we've seen it.
if ! seen [ seriesToResult [ si ] ] {
* group = groupedAggregation {
labels : group . labels ,
floatValue : s . F ,
floatMean : s . F ,
groupCount : 1 ,
@ -2754,18 +2769,12 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
switch op {
case parser . STDVAR , parser . STDDEV :
group . floatValue = 0
case parser . TOPK , parser . QUANTILE :
group . heap = make ( vectorByValueHeap , 1 , k )
case parser . QUANTILE :
group . heap = make ( vectorByValueHeap , 1 )
group . heap [ 0 ] = Sample {
F : s . F ,
Metric : s . Metric ,
}
case parser . BOTTOMK :
group . reverseHeap = make ( vectorByReverseValueHeap , 1 , k )
group . reverseHeap [ 0 ] = Sample {
F : s . F ,
Metric : s . Metric ,
}
case parser . GROUP :
group . floatValue = 1
}
@ -2848,20 +2857,118 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
group . floatValue += delta * ( s . F - group . floatMean )
}
case parser . QUANTILE :
group . heap = append ( group . heap , s )
default :
panic ( fmt . Errorf ( "expected aggregation operator but got %q" , op ) )
}
}
// Construct the output matrix from the aggregated groups.
numSteps := int ( ( ev . endTimestamp - ev . startTimestamp ) / ev . interval ) + 1
for ri , aggr := range orderedResult {
if ! seen [ ri ] {
continue
}
switch op {
case parser . AVG :
if aggr . hasFloat && aggr . hasHistogram {
// We cannot aggregate histogram sample with a float64 sample.
annos . Add ( annotations . NewMixedFloatsHistogramsAggWarning ( e . Expr . PositionRange ( ) ) )
continue
}
if aggr . hasHistogram {
aggr . histogramValue = aggr . histogramMean . Compact ( 0 )
} else {
aggr . floatValue = aggr . floatMean
}
case parser . COUNT :
aggr . floatValue = float64 ( aggr . groupCount )
case parser . STDVAR :
aggr . floatValue /= float64 ( aggr . groupCount )
case parser . STDDEV :
aggr . floatValue = math . Sqrt ( aggr . floatValue / float64 ( aggr . groupCount ) )
case parser . QUANTILE :
aggr . floatValue = quantile ( q , aggr . heap )
case parser . SUM :
if aggr . hasFloat && aggr . hasHistogram {
// We cannot aggregate histogram sample with a float64 sample.
annos . Add ( annotations . NewMixedFloatsHistogramsAggWarning ( e . Expr . PositionRange ( ) ) )
continue
}
if aggr . hasHistogram {
aggr . histogramValue . Compact ( 0 )
}
default :
// For other aggregations, we already have the right value.
}
ss := & outputMatrix [ ri ]
addToSeries ( ss , enh . Ts , aggr . floatValue , aggr . histogramValue , numSteps )
}
return annos
}
// aggregationK evaluates topk or bottomk at one timestep on inputMatrix.
// Output that has the same labels as the input, but just k of them per group.
// seriesToResult maps inputMatrix indexes to groups indexes.
// For an instant query, returns a Matrix in descending order for topk or ascending for bottomk.
// For a range query, aggregates output in the seriess map.
func ( ev * evaluator ) aggregationK ( e * parser . AggregateExpr , q float64 , inputMatrix Matrix , seriesToResult [ ] int , orderedResult [ ] * groupedAggregation , enh * EvalNodeHelper , seriess map [ uint64 ] Series ) ( Matrix , annotations . Annotations ) {
op := e . Op
var annos annotations . Annotations
seen := make ( [ ] bool , len ( orderedResult ) ) // Which output groups were seen in the input at this timestamp.
if ! convertibleToInt64 ( q ) {
ev . errorf ( "Scalar value %v overflows int64" , q )
}
k := int ( q )
if k > len ( inputMatrix ) {
k = len ( inputMatrix )
}
if k < 1 {
return nil , annos
}
for si := range inputMatrix {
s , ok := ev . nextSample ( enh . Ts , inputMatrix , si )
if ! ok {
continue
}
group := orderedResult [ seriesToResult [ si ] ]
// Initialize this group if it's the first time we've seen it.
if ! seen [ seriesToResult [ si ] ] {
* group = groupedAggregation { }
switch op {
case parser . TOPK :
group . heap = make ( vectorByValueHeap , 1 , k )
group . heap [ 0 ] = s
case parser . BOTTOMK :
group . reverseHeap = make ( vectorByReverseValueHeap , 1 , k )
group . reverseHeap [ 0 ] = s
}
seen [ seriesToResult [ si ] ] = true
continue
}
switch op {
case parser . TOPK :
// We build a heap of up to k elements, with the smallest element at heap[0].
switch {
case len ( group . heap ) < k :
heap . Push ( & group . heap , & Sample {
F : s . F ,
Metric : s . Metric ,
} )
heap . Push ( & group . heap , & s )
case group . heap [ 0 ] . F < s . F || ( math . IsNaN ( group . heap [ 0 ] . F ) && ! math . IsNaN ( s . F ) ) :
// This new element is bigger than the previous smallest element - overwrite that.
group . heap [ 0 ] = Sample {
F : s . F ,
Metric : s . Metric ,
}
group . heap [ 0 ] = s
if k > 1 {
heap . Fix ( & group . heap , 0 ) // Maintain the heap invariant.
}
@ -2871,24 +2978,15 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
// We build a heap of up to k elements, with the biggest element at heap[0].
switch {
case len ( group . reverseHeap ) < k :
heap . Push ( & group . reverseHeap , & Sample {
F : s . F ,
Metric : s . Metric ,
} )
heap . Push ( & group . reverseHeap , & s )
case group . reverseHeap [ 0 ] . F > s . F || ( math . IsNaN ( group . reverseHeap [ 0 ] . F ) && ! math . IsNaN ( s . F ) ) :
// This new element is smaller than the previous biggest element - overwrite that.
group . reverseHeap [ 0 ] = Sample {
F : s . F ,
Metric : s . Metric ,
}
group . reverseHeap [ 0 ] = s
if k > 1 {
heap . Fix ( & group . reverseHeap , 0 ) // Maintain the heap invariant.
}
}
case parser . QUANTILE :
group . heap = append ( group . heap , s )
default :
panic ( fmt . Errorf ( "expected aggregation operator but got %q" , op ) )
}
@ -2901,14 +2999,10 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
mat = make ( Matrix , 0 , len ( orderedResult ) )
}
add := func ( lbls labels . Labels , f float64 , h * histogram . FloatHistogram ) {
add := func ( lbls labels . Labels , f float64 ) {
// If this could be an instant query, add directly to the matrix so the result is in consistent order.
if ev . endTimestamp == ev . startTimestamp {
if h == nil {
mat = append ( mat , Series { Metric : lbls , Floats : [ ] FPoint { { T : enh . Ts , F : f } } } )
} else {
mat = append ( mat , Series { Metric : lbls , Histograms : [ ] HPoint { { T : enh . Ts , H : h } } } )
}
} else {
// Otherwise the results are added into seriess elements.
hash := lbls . Hash ( )
@ -2916,7 +3010,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
if ! ok {
ss = Series { Metric : lbls }
}
addToSeries ( & ss , enh . Ts , f , h , numSteps )
addToSeries ( & ss , enh . Ts , f , nil , numSteps )
seriess [ hash ] = ss
}
}
@ -2925,36 +3019,14 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
continue
}
switch op {
case parser . AVG :
if aggr . hasFloat && aggr . hasHistogram {
// We cannot aggregate histogram sample with a float64 sample.
annos . Add ( annotations . NewMixedFloatsHistogramsAggWarning ( e . Expr . PositionRange ( ) ) )
continue
}
if aggr . hasHistogram {
aggr . histogramValue = aggr . histogramMean . Compact ( 0 )
} else {
aggr . floatValue = aggr . floatMean
}
case parser . COUNT :
aggr . floatValue = float64 ( aggr . groupCount )
case parser . STDVAR :
aggr . floatValue /= float64 ( aggr . groupCount )
case parser . STDDEV :
aggr . floatValue = math . Sqrt ( aggr . floatValue / float64 ( aggr . groupCount ) )
case parser . TOPK :
// The heap keeps the lowest value on top, so reverse it.
if len ( aggr . heap ) > 1 {
sort . Sort ( sort . Reverse ( aggr . heap ) )
}
for _ , v := range aggr . heap {
add ( v . Metric , v . F , nil )
add ( v . Metric , v . F )
}
continue // Bypass default append.
case parser . BOTTOMK :
// The heap keeps the highest value on top, so reverse it.
@ -2962,27 +3034,9 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
sort . Sort ( sort . Reverse ( aggr . reverseHeap ) )
}
for _ , v := range aggr . reverseHeap {
add ( v . Metric , v . F , nil )
}
continue // Bypass default append.
case parser . QUANTILE :
aggr . floatValue = quantile ( q , aggr . heap )
case parser . SUM :
if aggr . hasFloat && aggr . hasHistogram {
// We cannot aggregate histogram sample with a float64 sample.
annos . Add ( annotations . NewMixedFloatsHistogramsAggWarning ( e . Expr . PositionRange ( ) ) )
continue
add ( v . Metric , v . F )
}
if aggr . hasHistogram {
aggr . histogramValue . Compact ( 0 )
}
default :
// For other aggregations, we already have the right value.
}
add ( aggr . labels , aggr . floatValue , aggr . histogramValue )
}
return mat , annos