@ -29,7 +29,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
@ -418,7 +418,7 @@ func (ng *Engine) newTestQuery(f func(context.Context) error) Query {
//
// At this point per query only one EvalStmt is evaluated. Alert and record
// statements are not handled by the Engine.
func ( ng * Engine ) exec ( ctx context . Context , q * query ) ( v parser . Value , w storage . Warnings , err error ) {
func ( ng * Engine ) exec ( ctx context . Context , q * query ) ( v parser . Value , ws storage . Warnings , err error ) {
ng . metrics . currentQueries . Inc ( )
defer ng . metrics . currentQueries . Dec ( )
@ -517,13 +517,9 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
}
defer querier . Close ( )
warnings , err := ng . populateSeries ( ctxPrepare , querier , s )
ng . populateSeries ( querier , s )
prepareSpanTimer . Finish ( )
if err != nil {
return nil , warnings , err
}
evalSpanTimer , ctxInnerEval := query . stats . GetSpanTimer ( ctx , stats . InnerEvalTime , ng . metrics . queryInnerEval )
// Instant evaluation. This is executed as a range evaluation with one step.
if s . Start == s . End && s . Interval == 0 {
@ -539,7 +535,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
lookbackDelta : ng . lookbackDelta ,
}
val , err := evaluator . Eval ( s . Expr )
val , warnings , err := evaluator . Eval ( s . Expr )
if err != nil {
return nil , warnings , err
}
@ -588,7 +584,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
logger : ng . logger ,
lookbackDelta : ng . lookbackDelta ,
}
val , err := evaluator . Eval ( s . Expr )
val , warnings , err := evaluator . Eval ( s . Expr )
if err != nil {
return nil , warnings , err
}
@ -649,35 +645,29 @@ func (ng *Engine) findMinTime(s *parser.EvalStmt) time.Time {
return s . Start . Add ( - maxOffset )
}
func ( ng * Engine ) populateSeries ( ctx context . Context , querier storage . Querier , s * parser . EvalStmt ) ( storage . Warnings , error ) {
var (
// Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range.
// The evaluation of the VectorSelector inside then evaluates the given range and unsets
// the variable.
evalRange time . Duration
warnings storage . Warnings
err error
)
func ( ng * Engine ) populateSeries ( querier storage . Querier , s * parser . EvalStmt ) {
// Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range.
// The evaluation of the VectorSelector inside then evaluates the given range and unsets
// the variable.
var evalRange time . Duration
parser . Inspect ( s . Expr , func ( node parser . Node , path [ ] parser . Node ) error {
var set storage . SeriesSet
var wrn storage . Warnings
hints := & storage . SelectHints {
Start : timestamp . FromTime ( s . Start ) ,
End : timestamp . FromTime ( s . End ) ,
Step : durationToInt64Millis ( s . Interval ) ,
}
// We need to make sure we select the timerange selected by the subquery.
// TODO(gouthamve): cumulativeSubqueryOffset gives the sum of range and the offset
// we can optimise it by separating out the range and offsets, and subtracting the offsets
// from end also.
subqOffset := ng . cumulativeSubqueryOffset ( path )
offsetMilliseconds := durationMilliseconds ( subqOffset )
hints . Start = hints . Start - offsetMilliseconds
switch n := node . ( type ) {
case * parser . VectorSelector :
hints := & storage . SelectHints {
Start : timestamp . FromTime ( s . Start ) ,
End : timestamp . FromTime ( s . End ) ,
Step : durationToInt64Millis ( s . Interval ) ,
}
// We need to make sure we select the timerange selected by the subquery.
// TODO(gouthamve): cumulativeSubqueryOffset gives the sum of range and the offset
// we can optimise it by separating out the range and offsets, and subtracting the offsets
// from end also.
subqOffset := ng . cumulativeSubqueryOffset ( path )
offsetMilliseconds := durationMilliseconds ( subqOffset )
hints . Start = hints . Start - offsetMilliseconds
if evalRange == 0 {
hints . Start = hints . Start - durationMilliseconds ( ng . lookbackDelta )
} else {
@ -696,20 +686,12 @@ func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s
hints . End = hints . End - offsetMilliseconds
}
set , wrn , err = querier . Select ( false , hints , n . LabelMatchers ... )
warnings = append ( warnings , wrn ... )
if err != nil {
level . Error ( ng . logger ) . Log ( "msg" , "error selecting series set" , "err" , err )
return err
}
n . UnexpandedSeriesSet = set
n . UnexpandedSeriesSet = querier . Select ( false , hints , n . LabelMatchers ... )
case * parser . MatrixSelector :
evalRange = n . Range
}
return nil
} )
return warnings , err
}
// extractFuncFromPath walks up the path and searches for the first instance of
@ -743,34 +725,40 @@ func extractGroupsFromPath(p []parser.Node) (bool, []string) {
return false , nil
}
func checkForSeriesSetExpansion ( ctx context . Context , expr parser . Expr ) {
func checkAndExpandSeriesSet ( ctx context . Context , expr parser . Expr ) ( storage . Warnings , erro r ) {
switch e := expr . ( type ) {
case * parser . MatrixSelector :
checkForSeriesSetExpansion ( ctx , e . VectorSelector )
return checkAndExpandSeriesSet ( ctx , e . VectorSelector )
case * parser . VectorSelector :
if e . Series == nil {
series , err := expandSeriesSet ( ctx , e . UnexpandedSeriesSet )
if err != nil {
panic ( err )
} else {
e . Series = series
}
if e . Series != nil {
return nil , nil
}
series , ws , err := expandSeriesSet ( ctx , e . UnexpandedSeriesSet )
e . Series = series
return ws , err
}
return nil , nil
}
func expandSeriesSet ( ctx context . Context , it storage . SeriesSet ) ( res [ ] storage . Series , err error ) {
func expandSeriesSet ( ctx context . Context , it storage . SeriesSet ) ( res [ ] storage . Series , ws storage . Warnings , err error ) {
for it . Next ( ) {
select {
case <- ctx . Done ( ) :
return nil , ctx . Err ( )
return nil , nil , ctx . Err ( )
default :
}
res = append ( res , it . At ( ) )
}
return res , it . Err ( )
return res , it . Warnings ( ) , it . Err ( )
}
type errWithWarnings struct {
err error
warnings storage . Warnings
}
func ( e errWithWarnings ) Error ( ) string { return e . err . Error ( ) }
// An evaluator evaluates given expressions over given fixed timestamps. It
// is attached to an engine through which it connects to a querier and reports
// errors. On timeout or cancellation of its context it terminates.
@ -799,26 +787,33 @@ func (ev *evaluator) error(err error) {
}
// recover is the handler that turns panics into returns from the top level of evaluation.
func ( ev * evaluator ) recover ( errp * error ) {
func ( ev * evaluator ) recover ( ws * storage . Warnings , errp * error ) {
e := recover ( )
if e == nil {
return
}
if err , ok := e . ( runtime . Error ) ; ok {
switch err := e . ( type ) {
case runtime . Error :
// Print the stack trace but do not inhibit the running application.
buf := make ( [ ] byte , 64 << 10 )
buf = buf [ : runtime . Stack ( buf , false ) ]
level . Error ( ev . logger ) . Log ( "msg" , "runtime panic in parser" , "err" , e , "stacktrace" , string ( buf ) )
* errp = errors . Wrap ( err , "unexpected error" )
} else {
case errWithWarnings :
* errp = err . err
* ws = append ( * ws , err . warnings ... )
default :
* errp = e . ( error )
}
}
func ( ev * evaluator ) Eval ( expr parser . Expr ) ( v parser . Value , err error ) {
defer ev . recover ( & err )
return ev . eval ( expr ) , nil
func ( ev * evaluator ) Eval ( expr parser . Expr ) ( v parser . Value , ws storage . Warnings , err error ) {
defer ev . recover ( & ws , & err )
v , ws = ev . eval ( expr )
return v , ws , nil
}
// EvalNodeHelper stores extra information and caches for evaluating a single node across steps.
@ -884,17 +879,20 @@ func (enh *EvalNodeHelper) signatureFunc(on bool, names ...string) func(labels.L
// the given function with the values computed for each expression at that
// step. The return value is the combination into time series of all the
// function call results.
func ( ev * evaluator ) rangeEval ( f func ( [ ] parser . Value , * EvalNodeHelper ) Vector , exprs ... parser . Expr ) Matrix {
func ( ev * evaluator ) rangeEval ( f func ( [ ] parser . Value , * EvalNodeHelper ) ( Vector , storage . Warnings ) , exprs ... parser . Expr ) ( Matrix , storage . Warnings ) {
numSteps := int ( ( ev . endTimestamp - ev . startTimestamp ) / ev . interval ) + 1
matrixes := make ( [ ] Matrix , len ( exprs ) )
origMatrixes := make ( [ ] Matrix , len ( exprs ) )
originalNumSamples := ev . currentSamples
var warnings storage . Warnings
for i , e := range exprs {
// Functions will take string arguments from the expressions, not the values.
if e != nil && e . Type ( ) != parser . ValueTypeString {
// ev.currentSamples will be updated to the correct value within the ev.eval call.
matrixes [ i ] = ev . eval ( e ) . ( Matrix )
val , ws := ev . eval ( e )
warnings = append ( warnings , ws ... )
matrixes [ i ] = val . ( Matrix )
// Keep a copy of the original point slices so that they
// can be returned to the pool.
@ -946,11 +944,12 @@ func (ev *evaluator) rangeEval(f func([]parser.Value, *EvalNodeHelper) Vector, e
}
// Make the function call.
enh . ts = ts
result := f ( args , enh )
result , ws := f ( args , enh )
if result . ContainsSameLabelset ( ) {
ev . errorf ( "vector cannot contain metrics with the same labelset" )
}
enh . out = result [ : 0 ] // Reuse result vector.
warnings = append ( warnings , ws ... )
ev . currentSamples += len ( result )
// When we reset currentSamples to tempNumSamples during the next iteration of the loop it also
@ -969,7 +968,7 @@ func (ev *evaluator) rangeEval(f func([]parser.Value, *EvalNodeHelper) Vector, e
mat [ i ] = Series { Metric : s . Metric , Points : [ ] Point { s . Point } }
}
ev . currentSamples = originalNumSamples + mat . TotalSamples ( )
return mat
return mat , warnings
}
// Add samples in output vector to output series.
@ -1001,29 +1000,30 @@ func (ev *evaluator) rangeEval(f func([]parser.Value, *EvalNodeHelper) Vector, e
mat = append ( mat , ss )
}
ev . currentSamples = originalNumSamples + mat . TotalSamples ( )
return mat
return mat , warnings
}
// evalSubquery evaluates given SubqueryExpr and returns an equivalent
// evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
func ( ev * evaluator ) evalSubquery ( subq * parser . SubqueryExpr ) * parser . MatrixSelector {
val := ev . eval ( subq ) . ( Matrix )
func ( ev * evaluator ) evalSubquery ( subq * parser . SubqueryExpr ) ( * parser . MatrixSelector , storage . Warnings ) {
val , ws := ev . eval ( subq )
mat := val . ( Matrix )
vs := & parser . VectorSelector {
Offset : subq . Offset ,
Series : make ( [ ] storage . Series , 0 , len ( val ) ) ,
Series : make ( [ ] storage . Series , 0 , len ( mat ) ) ,
}
ms := & parser . MatrixSelector {
Range : subq . Range ,
VectorSelector : vs ,
}
for _ , s := range val {
for _ , s := range mat {
vs . Series = append ( vs . Series , NewStorageSeries ( s ) )
}
return ms
return ms , ws
}
// eval evaluates the given expression as the given AST expression node requires.
func ( ev * evaluator ) eval ( expr parser . Expr ) parser . Value {
func ( ev * evaluator ) eval ( expr parser . Expr ) ( parser . Value , storage . Warnings ) {
// This is the top-level evaluation method.
// Thus, we check for timeout/cancellation here.
if err := contextDone ( ev . ctx , "expression evaluation" ) ; err != nil {
@ -1035,16 +1035,16 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
case * parser . AggregateExpr :
unwrapParenExpr ( & e . Param )
if s , ok := e . Param . ( * parser . StringLiteral ) ; ok {
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) Vector {
return ev . aggregation ( e . Op , e . Grouping , e . Without , s . Val , v [ 0 ] . ( Vector ) , enh )
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) ( Vector , storage . Warnings ) {
return ev . aggregation ( e . Op , e . Grouping , e . Without , s . Val , v [ 0 ] . ( Vector ) , enh ) , nil
} , e . Expr )
}
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) Vector {
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) ( Vector , storage . Warnings ) {
var param float64
if e . Param != nil {
param = v [ 0 ] . ( Vector ) [ 0 ] . V
}
return ev . aggregation ( e . Op , e . Grouping , e . Without , param , v [ 1 ] . ( Vector ) , enh )
return ev . aggregation ( e . Op , e . Grouping , e . Without , param , v [ 1 ] . ( Vector ) , enh ) , nil
} , e . Param , e . Expr )
case * parser . Call :
@ -1056,15 +1056,19 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
// a vector selector.
vs , ok := e . Args [ 0 ] . ( * parser . VectorSelector )
if ok {
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) Vector {
return call ( [ ] parser . Value { ev . vectorSelector ( vs , enh . ts ) } , e . Args , enh )
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) ( Vector , storage . Warnings ) {
val , ws := ev . vectorSelector ( vs , enh . ts )
return call ( [ ] parser . Value { val } , e . Args , enh ) , ws
} )
}
}
// Check if the function has a matrix argument.
var matrixArgIndex int
var matrixArg bool
var (
matrixArgIndex int
matrixArg bool
warnings storage . Warnings
)
for i := range e . Args {
unwrapParenExpr ( & e . Args [ i ] )
a := e . Args [ i ]
@ -1078,14 +1082,16 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
matrixArgIndex = i
matrixArg = true
// Replacing parser.SubqueryExpr with parser.MatrixSelector.
e . Args [ i ] = ev . evalSubquery ( subq )
val , ws := ev . evalSubquery ( subq )
e . Args [ i ] = val
warnings = append ( warnings , ws ... )
break
}
}
if ! matrixArg {
// Does not have a matrix argument.
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) Vector {
return call ( v , e . Args , enh )
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) ( Vector , storage . Warnings ) {
return call ( v , e . Args , enh ) , warnings
} , e . Args ... )
}
@ -1095,16 +1101,22 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
otherInArgs := make ( [ ] Vector , len ( e . Args ) )
for i , e := range e . Args {
if i != matrixArgIndex {
otherArgs [ i ] = ev . eval ( e ) . ( Matrix )
val , ws := ev . eval ( e )
otherArgs [ i ] = val . ( Matrix )
otherInArgs [ i ] = Vector { Sample { } }
inArgs [ i ] = otherInArgs [ i ]
warnings = append ( warnings , ws ... )
}
}
sel := e . Args [ matrixArgIndex ] . ( * parser . MatrixSelector )
selVS := sel . VectorSelector . ( * parser . VectorSelector )
checkForSeriesSetExpansion ( ev . ctx , sel )
ws , err := checkAndExpandSeriesSet ( ev . ctx , sel )
warnings = append ( warnings , ws ... )
if err != nil {
ev . error ( errWithWarnings { errors . Wrap ( err , "expanding series" ) , warnings } )
}
mat := make ( Matrix , 0 , len ( selVS . Series ) ) // Output matrix.
offset := durationMilliseconds ( selVS . Offset )
selRange := durationMilliseconds ( sel . Range )
@ -1182,7 +1194,7 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
// Iterate once to look for a complete series.
for _ , s := range mat {
if len ( s . Points ) == steps {
return Matrix { }
return Matrix { } , warnings
}
}
@ -1193,7 +1205,7 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
found [ p . T ] = struct { } { }
}
if i > 0 && len ( found ) == steps {
return Matrix { }
return Matrix { } , warnings
}
}
@ -1209,20 +1221,21 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
Metric : createLabelsForAbsentFunction ( e . Args [ 0 ] ) ,
Points : newp ,
} ,
}
} , warnings
}
if mat . ContainsSameLabelset ( ) {
ev . errorf ( "vector cannot contain metrics with the same labelset" )
}
return mat
return mat , warnings
case * parser . ParenExpr :
return ev . eval ( e . Expr )
case * parser . UnaryExpr :
mat := ev . eval ( e . Expr ) . ( Matrix )
val , ws := ev . eval ( e . Expr )
mat := val . ( Matrix )
if e . Op == parser . SUB {
for i := range mat {
mat [ i ] . Metric = dropMetricName ( mat [ i ] . Metric )
@ -1234,53 +1247,56 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
ev . errorf ( "vector cannot contain metrics with the same labelset" )
}
}
return mat
return mat , ws
case * parser . BinaryExpr :
switch lt , rt := e . LHS . Type ( ) , e . RHS . Type ( ) ; {
case lt == parser . ValueTypeScalar && rt == parser . ValueTypeScalar :
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) Vector {
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) ( Vector , storage . Warnings ) {
val := scalarBinop ( e . Op , v [ 0 ] . ( Vector ) [ 0 ] . Point . V , v [ 1 ] . ( Vector ) [ 0 ] . Point . V )
return append ( enh . out , Sample { Point : Point { V : val } } )
return append ( enh . out , Sample { Point : Point { V : val } } ) , nil
} , e . LHS , e . RHS )
case lt == parser . ValueTypeVector && rt == parser . ValueTypeVector :
switch e . Op {
case parser . LAND :
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) Vector {
return ev . VectorAnd ( v [ 0 ] . ( Vector ) , v [ 1 ] . ( Vector ) , e . VectorMatching , enh )
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) ( Vector , storage . Warnings ) {
return ev . VectorAnd ( v [ 0 ] . ( Vector ) , v [ 1 ] . ( Vector ) , e . VectorMatching , enh ) , nil
} , e . LHS , e . RHS )
case parser . LOR :
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) Vector {
return ev . VectorOr ( v [ 0 ] . ( Vector ) , v [ 1 ] . ( Vector ) , e . VectorMatching , enh )
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) ( Vector , storage . Warnings ) {
return ev . VectorOr ( v [ 0 ] . ( Vector ) , v [ 1 ] . ( Vector ) , e . VectorMatching , enh ) , nil
} , e . LHS , e . RHS )
case parser . LUNLESS :
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) Vector {
return ev . VectorUnless ( v [ 0 ] . ( Vector ) , v [ 1 ] . ( Vector ) , e . VectorMatching , enh )
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) ( Vector , storage . Warnings ) {
return ev . VectorUnless ( v [ 0 ] . ( Vector ) , v [ 1 ] . ( Vector ) , e . VectorMatching , enh ) , nil
} , e . LHS , e . RHS )
default :
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) Vector {
return ev . VectorBinop ( e . Op , v [ 0 ] . ( Vector ) , v [ 1 ] . ( Vector ) , e . VectorMatching , e . ReturnBool , enh )
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) ( Vector , storage . Warnings ) {
return ev . VectorBinop ( e . Op , v [ 0 ] . ( Vector ) , v [ 1 ] . ( Vector ) , e . VectorMatching , e . ReturnBool , enh ) , nil
} , e . LHS , e . RHS )
}
case lt == parser . ValueTypeVector && rt == parser . ValueTypeScalar :
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) Vector {
return ev . VectorscalarBinop ( e . Op , v [ 0 ] . ( Vector ) , Scalar { V : v [ 1 ] . ( Vector ) [ 0 ] . Point . V } , false , e . ReturnBool , enh )
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) ( Vector , storage . Warnings ) {
return ev . VectorscalarBinop ( e . Op , v [ 0 ] . ( Vector ) , Scalar { V : v [ 1 ] . ( Vector ) [ 0 ] . Point . V } , false , e . ReturnBool , enh ) , nil
} , e . LHS , e . RHS )
case lt == parser . ValueTypeScalar && rt == parser . ValueTypeVector :
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) Vector {
return ev . VectorscalarBinop ( e . Op , v [ 1 ] . ( Vector ) , Scalar { V : v [ 0 ] . ( Vector ) [ 0 ] . Point . V } , true , e . ReturnBool , enh )
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) ( Vector , storage . Warnings ) {
return ev . VectorscalarBinop ( e . Op , v [ 1 ] . ( Vector ) , Scalar { V : v [ 0 ] . ( Vector ) [ 0 ] . Point . V } , true , e . ReturnBool , enh ) , nil
} , e . LHS , e . RHS )
}
case * parser . NumberLiteral :
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) Vector {
return append ( enh . out , Sample { Point : Point { V : e . Val } } )
return ev . rangeEval ( func ( v [ ] parser . Value , enh * EvalNodeHelper ) ( Vector , storage . Warnings ) {
return append ( enh . out , Sample { Point : Point { V : e . Val } } ) , nil
} )
case * parser . VectorSelector :
checkForSeriesSetExpansion ( ev . ctx , e )
ws , err := checkAndExpandSeriesSet ( ev . ctx , e )
if err != nil {
ev . error ( errWithWarnings { errors . Wrap ( err , "expanding series" ) , ws } )
}
mat := make ( Matrix , 0 , len ( e . Series ) )
it := storage . NewBuffer ( durationMilliseconds ( ev . lookbackDelta ) )
for i , s := range e . Series {
@ -1307,9 +1323,8 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
} else {
putPointSlice ( ss . Points )
}
}
return mat
return mat , ws
case * parser . MatrixSelector :
if ev . startTimestamp != ev . endTimestamp {
@ -1342,11 +1357,11 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
newEv . startTimestamp += newEv . interval
}
res := newEv . eval ( e . Expr )
res , ws := newEv . eval ( e . Expr )
ev . currentSamples = newEv . currentSamples
return res
return res , ws
case * parser . StringLiteral :
return String { V : e . Val , T : ev . startTimestamp }
return String { V : e . Val , T : ev . startTimestamp } , nil
}
panic ( errors . Errorf ( "unhandled expression of type: %T" , expr ) )
@ -1357,13 +1372,12 @@ func durationToInt64Millis(d time.Duration) int64 {
}
// vectorSelector evaluates a *parser.VectorSelector expression.
func ( ev * evaluator ) vectorSelector ( node * parser . VectorSelector , ts int64 ) Vector {
checkForSeriesSetExpansion ( ev . ctx , node )
var (
vec = make ( Vector , 0 , len ( node . Series ) )
)
func ( ev * evaluator ) vectorSelector ( node * parser . VectorSelector , ts int64 ) ( Vector , storage . Warnings ) {
ws , err := checkAndExpandSeriesSet ( ev . ctx , node )
if err != nil {
ev . error ( errWithWarnings { errors . Wrap ( err , "expanding series" ) , ws } )
}
vec := make ( Vector , 0 , len ( node . Series ) )
it := storage . NewBuffer ( durationMilliseconds ( ev . lookbackDelta ) )
for i , s := range node . Series {
it . Reset ( s . Iterator ( ) )
@ -1381,7 +1395,7 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) Vecto
ev . error ( ErrTooManySamples ( env ) )
}
}
return vec
return vec , ws
}
// vectorSelectorSingle evaluates a instant vector for the iterator of one time series.
@ -1429,21 +1443,23 @@ func putPointSlice(p []Point) {
}
// matrixSelector evaluates a *parser.MatrixSelector expression.
func ( ev * evaluator ) matrixSelector ( node * parser . MatrixSelector ) Matrix {
checkForSeriesSetExpansion ( ev . ctx , node )
vs := node . VectorSelector . ( * parser . VectorSelector )
func ( ev * evaluator ) matrixSelector ( node * parser . MatrixSelector ) ( Matrix , storage . Warnings ) {
var (
vs = node . VectorSelector . ( * parser . VectorSelector )
offset = durationMilliseconds ( vs . Offset )
maxt = ev . startTimestamp - offset
mint = maxt - durationMilliseconds ( node . Range )
matrix = make ( Matrix , 0 , len ( vs . Series ) )
it = storage . NewBuffer ( durationMilliseconds ( node . Range ) )
)
ws , err := checkAndExpandSeriesSet ( ev . ctx , node )
if err != nil {
ev . error ( errWithWarnings { errors . Wrap ( err , "expanding series" ) , ws } )
}
it := storage . NewBuffer ( durationMilliseconds ( node . Range ) )
series := vs . Series
for i , s := range series {
if err := contextDone ( ev . ctx , "expression evaluation" ) ; err != nil {
ev . error ( err )
@ -1461,7 +1477,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) Matrix {
putPointSlice ( ss . Points )
}
}
return matrix
return matrix , ws
}
// matrixIterSlice populates a matrix vector covering the requested range for a