@ -1464,12 +1464,12 @@ func intersection(ls1, ls2 labels.Labels) labels.Labels {
}
}
type groupedAggregation struct {
type groupedAggregation struct {
labels labels . Labels
labels labels . Labels
value float64
value float64
valuesSquaredSum float64
mean float64
groupCount int
groupCount int
heap vectorByValueHeap
heap vectorByValueHeap
reverseHeap vectorByReverseValueHeap
reverseHeap vectorByReverseValueHeap
}
}
// aggregation evaluates an aggregation operation on a Vector.
// aggregation evaluates an aggregation operation on a Vector.
@ -1540,17 +1540,19 @@ func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, p
sort . Sort ( m )
sort . Sort ( m )
}
}
result [ groupingKey ] = & groupedAggregation {
result [ groupingKey ] = & groupedAggregation {
labels : m ,
labels : m ,
value : s . V ,
value : s . V ,
valuesSquaredSum : s . V * s . V ,
mean : s . V ,
groupCount : 1 ,
groupCount : 1 ,
}
}
inputVecLen := int64 ( len ( vec ) )
inputVecLen := int64 ( len ( vec ) )
resultSize := k
resultSize := k
if k > inputVecLen {
if k > inputVecLen {
resultSize = inputVecLen
resultSize = inputVecLen
}
}
if op == itemTopK || op == itemQuantile {
if op == itemStdvar || op == itemStddev {
result [ groupingKey ] . value = 0.0
} else if op == itemTopK || op == itemQuantile {
result [ groupingKey ] . heap = make ( vectorByValueHeap , 0 , resultSize )
result [ groupingKey ] . heap = make ( vectorByValueHeap , 0 , resultSize )
heap . Push ( & result [ groupingKey ] . heap , & Sample {
heap . Push ( & result [ groupingKey ] . heap , & Sample {
Point : Point { V : s . V } ,
Point : Point { V : s . V } ,
@ -1571,8 +1573,8 @@ func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, p
group . value += s . V
group . value += s . V
case itemAvg :
case itemAvg :
group . value += s . V
group . groupCount ++
group . groupCount ++
group . mean += ( s . V - group . mean ) / float64 ( group . groupCount )
case itemMax :
case itemMax :
if group . value < s . V || math . IsNaN ( group . value ) {
if group . value < s . V || math . IsNaN ( group . value ) {
@ -1588,9 +1590,10 @@ func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, p
group . groupCount ++
group . groupCount ++
case itemStdvar , itemStddev :
case itemStdvar , itemStddev :
group . value += s . V
group . valuesSquaredSum += s . V * s . V
group . groupCount ++
group . groupCount ++
delta := s . V - group . mean
group . mean += delta / float64 ( group . groupCount )
group . value += delta * ( s . V - group . mean )
case itemTopK :
case itemTopK :
if int64 ( len ( group . heap ) ) < k || group . heap [ 0 ] . V < s . V || math . IsNaN ( group . heap [ 0 ] . V ) {
if int64 ( len ( group . heap ) ) < k || group . heap [ 0 ] . V < s . V || math . IsNaN ( group . heap [ 0 ] . V ) {
@ -1626,18 +1629,16 @@ func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, p
for _ , aggr := range result {
for _ , aggr := range result {
switch op {
switch op {
case itemAvg :
case itemAvg :
aggr . value = aggr . value / float64 ( aggr . groupCount )
aggr . value = aggr . mean
case itemCount , itemCountValues :
case itemCount , itemCountValues :
aggr . value = float64 ( aggr . groupCount )
aggr . value = float64 ( aggr . groupCount )
case itemStdvar :
case itemStdvar :
avg := aggr . value / float64 ( aggr . groupCount )
aggr . value = aggr . value / float64 ( aggr . groupCount )
aggr . value = aggr . valuesSquaredSum / float64 ( aggr . groupCount ) - avg * avg
case itemStddev :
case itemStddev :
avg := aggr . value / float64 ( aggr . groupCount )
aggr . value = math . Sqrt ( aggr . value / float64 ( aggr . groupCount ) )
aggr . value = math . Sqrt ( aggr . valuesSquaredSum / float64 ( aggr . groupCount ) - avg * avg )
case itemTopK :
case itemTopK :
// The heap keeps the lowest value on top, so reverse it.
// The heap keeps the lowest value on top, so reverse it.