@ -43,6 +43,7 @@ const (
namespace = "prometheus"
subsystem = "engine"
queryTag = "query"
env = "query execution"
// The largest SampleValue that can be converted to an int64 without overflow.
maxInt64 = 9223372036854774784
@ -69,13 +70,22 @@ type (
ErrQueryTimeout string
// ErrQueryCanceled is returned if a query was canceled during processing.
ErrQueryCanceled string
// ErrTooManySamples is returned if a query would woud load more than the maximum allowed samples into memory.
ErrTooManySamples string
// ErrStorage is returned if an error was encountered in the storage layer
// during query handling.
ErrStorage error
)
func ( e ErrQueryTimeout ) Error ( ) string { return fmt . Sprintf ( "query timed out in %s" , string ( e ) ) }
func ( e ErrQueryCanceled ) Error ( ) string { return fmt . Sprintf ( "query was canceled in %s" , string ( e ) ) }
func ( e ErrQueryTimeout ) Error ( ) string {
return fmt . Sprintf ( "query timed out in %s" , string ( e ) )
}
func ( e ErrQueryCanceled ) Error ( ) string {
return fmt . Sprintf ( "query was canceled in %s" , string ( e ) )
}
func ( e ErrTooManySamples ) Error ( ) string {
return fmt . Sprintf ( "query processing would load too many samples into memory in %s" , string ( e ) )
}
// A Query is derived from an a raw query string and can be run against an engine
// it is associated with.
@ -166,19 +176,29 @@ func contextErr(err error, env string) error {
}
}
// EngineOpts contains configuration options used when creating a new Engine.
type EngineOpts struct {
Logger log . Logger
Reg prometheus . Registerer
MaxConcurrent int
MaxSamples int
Timeout time . Duration
}
// Engine handles the lifetime of queries from beginning to end.
// It is connected to a querier.
type Engine struct {
logger log . Logger
metrics * engineMetrics
timeout time . Duration
gate * gate . Gate
logger log . Logger
metrics * engineMetrics
timeout time . Duration
gate * gate . Gate
maxSamplesPerQuery int
}
// NewEngine returns a new engine.
func NewEngine ( logger log . Logger , reg prometheus . Registerer , maxConcurrent int , timeout time . Duration ) * Engine {
if l ogger == nil {
l ogger = log . NewNopLogger ( )
func NewEngine ( opts EngineOpts ) * Engine {
if opts. L ogger == nil {
opts. L ogger = log . NewNopLogger ( )
}
metrics := & engineMetrics {
@ -223,10 +243,10 @@ func NewEngine(logger log.Logger, reg prometheus.Registerer, maxConcurrent int,
ConstLabels : prometheus . Labels { "slice" : "result_sort" } ,
} ) ,
}
metrics . maxConcurrentQueries . Set ( float64 ( m axConcurrent) )
metrics . maxConcurrentQueries . Set ( float64 ( opts. M axConcurrent) )
if r eg != nil {
r eg. MustRegister (
if opts. R eg != nil {
opts. R eg. MustRegister (
metrics . currentQueries ,
metrics . maxConcurrentQueries ,
metrics . queryQueueTime ,
@ -236,10 +256,11 @@ func NewEngine(logger log.Logger, reg prometheus.Registerer, maxConcurrent int,
)
}
return & Engine {
gate : gate . New ( maxConcurrent ) ,
timeout : timeout ,
logger : logger ,
metrics : metrics ,
gate : gate . New ( opts . MaxConcurrent ) ,
timeout : opts . Timeout ,
logger : opts . Logger ,
metrics : metrics ,
maxSamplesPerQuery : opts . MaxSamples ,
}
}
@ -384,6 +405,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
endTimestamp : start ,
interval : 1 ,
ctx : ctx ,
maxSamples : ng . maxSamplesPerQuery ,
logger : ng . logger ,
}
val , err := evaluator . Eval ( s . Expr )
@ -424,6 +446,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
endTimestamp : timeMilliseconds ( s . End ) ,
interval : durationMilliseconds ( s . Interval ) ,
ctx : ctx ,
maxSamples : ng . maxSamplesPerQuery ,
logger : ng . logger ,
}
val , err := evaluator . Eval ( s . Expr )
@ -575,11 +598,12 @@ type evaluator struct {
ctx context . Context
startTimestamp int64 // Start time in milliseconds.
endTimestamp int64 // End time in milliseconds.
interval int64 // Interval in milliseconds.
endTimestamp int64 // End time in milliseconds.
interval int64 // Interval in milliseconds.
logger log . Logger
maxSamples int
currentSamples int
logger log . Logger
}
// errorf causes a panic with the input formatted into an error.
@ -673,15 +697,18 @@ func (enh *EvalNodeHelper) signatureFunc(on bool, names ...string) func(labels.L
// rangeEval evaluates the given expressions, and then for each step calls
// the given function with the values computed for each expression at that
// step. The return value is the combination into time series of of all the
// step. The return value is the combination into time series of all the
// function call results.
func ( ev * evaluator ) rangeEval ( f func ( [ ] Value , * EvalNodeHelper ) Vector , exprs ... Expr ) Matrix {
numSteps := int ( ( ev . endTimestamp - ev . startTimestamp ) / ev . interval ) + 1
matrixes := make ( [ ] Matrix , len ( exprs ) )
origMatrixes := make ( [ ] Matrix , len ( exprs ) )
originalNumSamples := ev . currentSamples
for i , e := range exprs {
// Functions will take string arguments from the expressions, not the values.
if e != nil && e . Type ( ) != ValueTypeString {
// ev.currentSamples will be updated to the correct value within the ev.eval call.
matrixes [ i ] = ev . eval ( e ) . ( Matrix )
// Keep a copy of the original point slices so that they
@ -704,17 +731,25 @@ func (ev *evaluator) rangeEval(f func([]Value, *EvalNodeHelper) Vector, exprs ..
}
enh := & EvalNodeHelper { out : make ( Vector , 0 , biggestLen ) }
seriess := make ( map [ uint64 ] Series , biggestLen ) // Output series by series hash.
tempNumSamples := ev . currentSamples
for ts := ev . startTimestamp ; ts <= ev . endTimestamp ; ts += ev . interval {
// Reset number of samples in memory after each timestamp.
ev . currentSamples = tempNumSamples
// Gather input vectors for this timestamp.
for i := range exprs {
vectors [ i ] = vectors [ i ] [ : 0 ]
for si , series := range matrixes [ i ] {
for _ , point := range series . Points {
if point . T == ts {
vectors [ i ] = append ( vectors [ i ] , Sample { Metric : series . Metric , Point : point } )
// Move input vectors forward so we don't have to re-scan the same
// past points at the next step.
matrixes [ i ] [ si ] . Points = series . Points [ 1 : ]
if ev . currentSamples < ev . maxSamples {
vectors [ i ] = append ( vectors [ i ] , Sample { Metric : series . Metric , Point : point } )
// Move input vectors forward so we don't have to re-scan the same
// past points at the next step.
matrixes [ i ] [ si ] . Points = series . Points [ 1 : ]
ev . currentSamples ++
} else {
ev . error ( ErrTooManySamples ( env ) )
}
}
break
}
@ -728,6 +763,16 @@ func (ev *evaluator) rangeEval(f func([]Value, *EvalNodeHelper) Vector, exprs ..
ev . errorf ( "vector cannot contain metrics with the same labelset" )
}
enh . out = result [ : 0 ] // Reuse result vector.
ev . currentSamples += len ( result )
// When we reset currentSamples to tempNumSamples during the next iteration of the loop it also
// needs to include the samples from the result here, as they're still in memory.
tempNumSamples += len ( result )
if ev . currentSamples > ev . maxSamples {
ev . error ( ErrTooManySamples ( env ) )
}
// If this could be an instant query, shortcut so as not to change sort order.
if ev . endTimestamp == ev . startTimestamp {
mat := make ( Matrix , len ( result ) )
@ -735,8 +780,10 @@ func (ev *evaluator) rangeEval(f func([]Value, *EvalNodeHelper) Vector, exprs ..
s . Point . T = ts
mat [ i ] = Series { Metric : s . Metric , Points : [ ] Point { s . Point } }
}
ev . currentSamples = originalNumSamples + mat . TotalSamples ( )
return mat
}
// Add samples in output vector to output series.
for _ , sample := range result {
h := sample . Metric . Hash ( )
@ -750,19 +797,22 @@ func (ev *evaluator) rangeEval(f func([]Value, *EvalNodeHelper) Vector, exprs ..
sample . Point . T = ts
ss . Points = append ( ss . Points , sample . Point )
seriess [ h ] = ss
}
}
// Reuse the original point slices.
for _ , m := range origMatrixes {
for _ , s := range m {
putPointSlice ( s . Points )
}
}
// Assemble the output matrix.
// Assemble the output matrix. By the time we get here we know we don't have too many samples.
mat := make ( Matrix , 0 , len ( seriess ) )
for _ , ss := range seriess {
mat = append ( mat , ss )
}
ev . currentSamples = originalNumSamples + mat . TotalSamples ( )
return mat
}
@ -802,6 +852,7 @@ func (ev *evaluator) eval(expr Expr) Value {
} )
}
}
// Check if the function has a matrix argument.
var matrixArgIndex int
var matrixArg bool
@ -887,7 +938,12 @@ func (ev *evaluator) eval(expr Expr) Value {
it . ReduceDelta ( stepRange )
}
if len ( ss . Points ) > 0 {
mat = append ( mat , ss )
if ev . currentSamples < ev . maxSamples {
mat = append ( mat , ss )
ev . currentSamples += len ( ss . Points )
} else {
ev . error ( ErrTooManySamples ( env ) )
}
}
}
if mat . ContainsSameLabelset ( ) {
@ -971,13 +1027,19 @@ func (ev *evaluator) eval(expr Expr) Value {
for ts := ev . startTimestamp ; ts <= ev . endTimestamp ; ts += ev . interval {
_ , v , ok := ev . vectorSelectorSingle ( it , e , ts )
if ok {
ss . Points = append ( ss . Points , Point { V : v , T : ts } )
if ev . currentSamples < ev . maxSamples {
ss . Points = append ( ss . Points , Point { V : v , T : ts } )
ev . currentSamples ++
} else {
ev . error ( ErrTooManySamples ( env ) )
}
}
}
if len ( ss . Points ) > 0 {
mat = append ( mat , ss )
}
}
return mat
@ -1007,8 +1069,12 @@ func (ev *evaluator) vectorSelector(node *VectorSelector, ts int64) Vector {
Metric : node . series [ i ] . Labels ( ) ,
Point : Point { V : v , T : t } ,
} )
ev . currentSamples ++
}
if ev . currentSamples >= ev . maxSamples {
ev . error ( ErrTooManySamples ( env ) )
}
}
return vec
}
@ -1063,11 +1129,12 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
maxt = ev . startTimestamp - offset
mint = maxt - durationMilliseconds ( node . Range )
matrix = make ( Matrix , 0 , len ( node . series ) )
err error
)
it := storage . NewBuffer ( durationMilliseconds ( node . Range ) )
for i , s := range node . series {
if err : = contextDone ( ev . ctx , "expression evaluation" ) ; err != nil {
if err = contextDone ( ev . ctx , "expression evaluation" ) ; err != nil {
ev . error ( err )
}
it . Reset ( s . Iterator ( ) )
@ -1127,14 +1194,22 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m
}
// Values in the buffer are guaranteed to be smaller than maxt.
if t >= mint {
if ev . currentSamples >= ev . maxSamples {
ev . error ( ErrTooManySamples ( env ) )
}
out = append ( out , Point { T : t , V : v } )
ev . currentSamples ++
}
}
// The seeked sample might also be in the range.
if ok {
t , v := it . Values ( )
if t == maxt && ! value . IsStaleNaN ( v ) {
if ev . currentSamples >= ev . maxSamples {
ev . error ( ErrTooManySamples ( env ) )
}
out = append ( out , Point { T : t , V : v } )
ev . currentSamples ++
}
}
return out