Implement a COUNT ... BY aggregation operator.

This also removes the now obsolete scalar count() function and corrects the
expressions test naming (broken in
2202cd71c9 (L6R59))
so that the expression tests will actually run.
pull/233/head
Julius Volz 12 years ago
parent 6551356af4
commit 0877680761

@ -82,6 +82,7 @@ const (
AVG AVG
MIN MIN
MAX MAX
COUNT
) )
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
@ -317,8 +318,13 @@ func labelIntersection(metric1, metric2 model.Metric) model.Metric {
func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[string]*groupedAggregation, timestamp time.Time) Vector { func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[string]*groupedAggregation, timestamp time.Time) Vector {
vector := Vector{} vector := Vector{}
for _, aggregation := range aggregations { for _, aggregation := range aggregations {
if node.aggrType == AVG { switch node.aggrType {
case AVG:
aggregation.value = aggregation.value / model.SampleValue(aggregation.groupCount) aggregation.value = aggregation.value / model.SampleValue(aggregation.groupCount)
case COUNT:
aggregation.value = model.SampleValue(aggregation.groupCount)
default:
// For other aggregations, we already have the right value.
} }
sample := model.Sample{ sample := model.Sample{
Metric: aggregation.labels, Metric: aggregation.labels,
@ -351,6 +357,10 @@ func (node *VectorAggregation) Eval(timestamp time.Time, view *viewAdapter) Vect
if groupedResult.value > sample.Value { if groupedResult.value > sample.Value {
groupedResult.value = sample.Value groupedResult.value = sample.Value
} }
case COUNT:
groupedResult.groupCount++
default:
panic("Unknown aggregation type")
} }
} else { } else {
result[groupingKey] = &groupedAggregation{ result[groupingKey] = &groupedAggregation{

@ -69,11 +69,6 @@ func timeImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} {
return model.SampleValue(time.Now().Unix()) return model.SampleValue(time.Now().Unix())
} }
// === count(vector VectorNode) model.SampleValue ===
func countImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} {
return model.SampleValue(len(args[0].(VectorNode).Eval(timestamp, view)))
}
// === delta(matrix MatrixNode, isCounter ScalarNode) Vector === // === delta(matrix MatrixNode, isCounter ScalarNode) Vector ===
func deltaImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} { func deltaImpl(timestamp time.Time, view *viewAdapter, args []Node) interface{} {
matrixNode := args[0].(MatrixNode) matrixNode := args[0].(MatrixNode)
@ -253,12 +248,6 @@ func sampleVectorImpl(timestamp time.Time, view *viewAdapter, args []Node) inter
} }
var functions = map[string]*Function{ var functions = map[string]*Function{
"count": {
name: "count",
argTypes: []ExprType{VECTOR},
returnType: SCALAR,
callFn: countImpl,
},
"delta": { "delta": {
name: "delta", name: "delta",
argTypes: []ExprType{MATRIX, SCALAR}, argTypes: []ExprType{MATRIX, SCALAR},

@ -54,6 +54,7 @@ func (aggrType AggrType) String() string {
AVG: "AVG", AVG: "AVG",
MIN: "MIN", MIN: "MIN",
MAX: "MAX", MAX: "MAX",
COUNT: "COUNT",
} }
return aggrTypeMap[aggrType] return aggrTypeMap[aggrType]
} }

@ -59,6 +59,7 @@ func NewVectorAggregation(aggrTypeStr string, vector ast.Node, groupBy []model.L
"MAX": ast.MAX, "MAX": ast.MAX,
"MIN": ast.MIN, "MIN": ast.MIN,
"AVG": ast.AVG, "AVG": ast.AVG,
"COUNT": ast.COUNT,
} }
aggrType, ok := aggrTypes[aggrTypeStr] aggrType, ok := aggrTypes[aggrTypeStr]
if !ok { if !ok {

@ -44,8 +44,8 @@ WITH|with { return WITH }
PERMANENT|permanent { return PERMANENT } PERMANENT|permanent { return PERMANENT }
BY|by { return GROUP_OP } BY|by { return GROUP_OP }
AVG|SUM|MAX|MIN { yylval.str = yytext; return AGGR_OP } AVG|SUM|MAX|MIN|COUNT { yylval.str = yytext; return AGGR_OP }
avg|sum|max|min { yylval.str = strings.ToUpper(yytext); return AGGR_OP } avg|sum|max|min|count { yylval.str = strings.ToUpper(yytext); return AGGR_OP }
\<|>|AND|OR|and|or { yylval.str = strings.ToUpper(yytext); return CMP_OP } \<|>|AND|OR|and|or { yylval.str = strings.ToUpper(yytext); return CMP_OP }
==|!=|>=|<= { yylval.str = yytext; return CMP_OP } ==|!=|>=|<= { yylval.str = yytext; return CMP_OP }
[+\-] { yylval.str = yytext; return ADDITIVE_OP } [+\-] { yylval.str = yytext; return ADDITIVE_OP }

@ -380,7 +380,7 @@ var yyrules []yyrule = []yyrule{{regexp.MustCompile("[^\\n]"), nil, []yystartcon
return yyactionreturn{GROUP_OP, yyRT_USER_RETURN} return yyactionreturn{GROUP_OP, yyRT_USER_RETURN}
} }
return yyactionreturn{0, yyRT_FALLTHROUGH} return yyactionreturn{0, yyRT_FALLTHROUGH}
}}, {regexp.MustCompile("AVG|SUM|MAX|MIN"), nil, []yystartcondition{}, false, func() (yyar yyactionreturn) { }}, {regexp.MustCompile("AVG|SUM|MAX|MIN|COUNT"), nil, []yystartcondition{}, false, func() (yyar yyactionreturn) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
if r != "yyREJECT" { if r != "yyREJECT" {
@ -394,7 +394,7 @@ var yyrules []yyrule = []yyrule{{regexp.MustCompile("[^\\n]"), nil, []yystartcon
return yyactionreturn{AGGR_OP, yyRT_USER_RETURN} return yyactionreturn{AGGR_OP, yyRT_USER_RETURN}
} }
return yyactionreturn{0, yyRT_FALLTHROUGH} return yyactionreturn{0, yyRT_FALLTHROUGH}
}}, {regexp.MustCompile("avg|sum|max|min"), nil, []yystartcondition{}, false, func() (yyar yyactionreturn) { }}, {regexp.MustCompile("avg|sum|max|min|count"), nil, []yystartcondition{}, false, func() (yyar yyactionreturn) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
if r != "yyREJECT" { if r != "yyREJECT" {
@ -509,7 +509,6 @@ var yyrules []yyrule = []yyrule{{regexp.MustCompile("[^\\n]"), nil, []yystartcon
yylval.num = model.SampleValue(num) yylval.num = model.SampleValue(num)
return yyactionreturn{NUMBER, yyRT_USER_RETURN} return yyactionreturn{NUMBER, yyRT_USER_RETURN}
} }
return yyactionreturn{0, yyRT_FALLTHROUGH} return yyactionreturn{0, yyRT_FALLTHROUGH}
}}, {regexp.MustCompile("\\\"(\\\\[^\\n]|[^\\\\\"])*\\\""), nil, []yystartcondition{}, false, func() (yyar yyactionreturn) { }}, {regexp.MustCompile("\\\"(\\\\[^\\n]|[^\\\\\"])*\\\""), nil, []yystartcondition{}, false, func() (yyar yyactionreturn) {
defer func() { defer func() {

@ -58,7 +58,7 @@ func newTestStorage(t test.Tester) (storage *metric.TieredStorage, closer test.C
return return
} }
func ExpressionTests(t *testing.T) { func TestExpressions(t *testing.T) {
// Labels in expected output need to be alphabetically sorted. // Labels in expected output need to be alphabetically sorted.
var expressionTests = []struct { var expressionTests = []struct {
expr string expr string
@ -81,6 +81,14 @@ func ExpressionTests(t *testing.T) {
}, },
fullRanges: 0, fullRanges: 0,
intervalRanges: 8, intervalRanges: 8,
}, {
expr: "COUNT(http_requests) BY (job)",
output: []string{
"http_requests{job='api-server'} => 4 @[%v]",
"http_requests{job='app-server'} => 4 @[%v]",
},
fullRanges: 0,
intervalRanges: 8,
}, { }, {
expr: "SUM(http_requests) BY (job, group)", expr: "SUM(http_requests) BY (job, group)",
output: []string{ output: []string{
@ -116,10 +124,10 @@ func ExpressionTests(t *testing.T) {
fullRanges: 0, fullRanges: 0,
intervalRanges: 8, intervalRanges: 8,
}, { }, {
expr: "SUM(http_requests) BY (job) - count(http_requests)", expr: "SUM(http_requests) BY (job) - COUNT(http_requests) BY (job)",
output: []string{ output: []string{
"http_requests{job='api-server'} => 992 @[%v]", "http_requests{job='api-server'} => 996 @[%v]",
"http_requests{job='app-server'} => 2592 @[%v]", "http_requests{job='app-server'} => 2596 @[%v]",
}, },
fullRanges: 0, fullRanges: 0,
intervalRanges: 8, intervalRanges: 8,

Loading…
Cancel
Save