Merge pull request #666 from prometheus/fabxc/pql/gate

Limit maximum number of concurrent queries.
pull/676/head
Fabian Reinartz 10 years ago
commit b365947bc4

@@ -31,8 +31,9 @@ import (
 )
 
 var (
 	stalenessDelta       = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.")
 	defaultQueryTimeout  = flag.Duration("query.timeout", 2*time.Minute, "Maximum time a query may take before being aborted.")
+	maxConcurrentQueries = flag.Int("query.max-concurrency", 20, "Maximum number of queries executed concurrently.")
 )
 
 // SampleStream is a stream of Values belonging to an attached COWMetric.
@@ -215,10 +216,7 @@ func (q *query) Cancel() {
 
 // Exec implements the Query interface.
 func (q *query) Exec() *Result {
-	ctx, cancel := context.WithTimeout(q.ng.baseCtx, *defaultQueryTimeout)
-	q.cancel = cancel
-
-	res, err := q.ng.exec(ctx, q)
+	res, err := q.ng.exec(q)
 	return &Result{Err: err, Value: res}
 }
@@ -249,6 +247,8 @@ type Engine struct {
 	// The base context for all queries and its cancellation function.
 	baseCtx       context.Context
 	cancelQueries func()
+	// The gate limiting the maximum number of concurrent and waiting queries.
+	gate *queryGate
 }
 
 // NewEngine returns a new engine.
@@ -258,6 +258,7 @@ func NewEngine(storage local.Storage) *Engine {
 		storage:       storage,
 		baseCtx:       ctx,
 		cancelQueries: cancel,
+		gate:          newQueryGate(*maxConcurrentQueries),
 	}
 }
@@ -316,9 +317,21 @@ func (ng *Engine) newTestQuery(stmts ...Statement) Query {
 //
 // At this point per query only one EvalStmt is evaluated. Alert and record
 // statements are not handled by the Engine.
-func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) {
+func (ng *Engine) exec(q *query) (Value, error) {
 	const env = "query execution"
 
+	ctx, cancel := context.WithTimeout(q.ng.baseCtx, *defaultQueryTimeout)
+	q.cancel = cancel
+
+	queueTimer := q.stats.GetTimer(stats.ExecQueueTime).Start()
+
+	if err := ng.gate.Start(ctx); err != nil {
+		return nil, err
+	}
+	defer ng.gate.Done()
+
+	queueTimer.Stop()
+
 	// Cancel when execution is done or an error was raised.
 	defer q.cancel()
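The ordering above matters: the timeout context is created before the gate is entered, so time spent waiting in the queue counts against query.timeout, and a query whose deadline fires (or that is cancelled) while still queued returns an error without ever executing. Below is a minimal standalone sketch of that behaviour, using the standard library context package and a bare buffered channel in place of the engine's queryGate; all names in it are illustrative, not part of this patch.

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// A "gate" of capacity 1 that is already occupied, so the next caller must queue.
	gate := make(chan struct{}, 1)
	gate <- struct{}{}

	// The per-query deadline is set up before queueing, mirroring exec() above.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// Try to enter the gate, but give up as soon as the query's context is done.
	select {
	case gate <- struct{}{}:
		defer func() { <-gate }()
		fmt.Println("query admitted")
	case <-ctx.Done():
		fmt.Println("query aborted while queued:", ctx.Err())
	}
}

Run as-is this prints the aborted branch, since the single slot is never released; it is only meant to show why a queued query cannot outlive its own timeout.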
@@ -1125,3 +1138,35 @@ func interpolateSamples(first, second *metric.SamplePair, timestamp clientmodel.
 		Timestamp: timestamp,
 	}
 }
+
+// A queryGate controls the maximum number of concurrently running and waiting queries.
+type queryGate struct {
+	ch chan struct{}
+}
+
+// newQueryGate returns a query gate that limits the number of queries
+// being concurrently executed.
+func newQueryGate(length int) *queryGate {
+	return &queryGate{
+		ch: make(chan struct{}, length),
+	}
+}
+
+// Start blocks until the gate has a free spot or the context is done.
+func (g *queryGate) Start(ctx context.Context) error {
+	select {
+	case <-ctx.Done():
+		return contextDone(ctx, "query queue")
+	case g.ch <- struct{}{}:
+		return nil
+	}
+}
+
+// Done releases a single spot in the gate.
+func (g *queryGate) Done() {
+	select {
+	case <-g.ch:
+	default:
+		panic("engine.queryGate.Done: more operations done than started")
+	}
+}
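The new queryGate is the familiar buffered-channel semaphore: the channel's capacity is the concurrency limit, Start acquires a slot (or bails out once the context is done), and Done releases one, panicking if releases ever outnumber acquisitions. For readers unfamiliar with the idiom, here is a rough usage sketch of the same pattern outside the engine, with hypothetical names and without the context handling shown above:

package main

import (
	"fmt"
	"sync"
)

// gate limits how many goroutines may run a section at the same time.
type gate struct {
	ch chan struct{}
}

func newGate(n int) *gate {
	return &gate{ch: make(chan struct{}, n)}
}

// start blocks while all n slots are taken.
func (g *gate) start() { g.ch <- struct{}{} }

// done frees one slot.
func (g *gate) done() { <-g.ch }

func main() {
	g := newGate(2) // at most two concurrent "queries"
	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			g.start()
			defer g.done()
			fmt.Println("running query", i)
		}(i)
	}
	wg.Wait()
}

With a capacity of two, at most two of the five goroutines run the guarded section at once; the rest block in start() until a slot is freed, which is how queries beyond query.max-concurrency queue up behind the gate.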

@ -6,14 +6,61 @@ import (
"time" "time"
"golang.org/x/net/context" "golang.org/x/net/context"
"github.com/prometheus/prometheus/storage/local"
) )
var noop = testStmt(func(context.Context) error { var noop = testStmt(func(context.Context) error {
return nil return nil
}) })
+func TestQueryConcurrency(t *testing.T) {
+	engine := NewEngine(nil)
+	defer engine.Stop()
+
+	block := make(chan struct{})
+	processing := make(chan struct{})
+	f1 := testStmt(func(context.Context) error {
+		processing <- struct{}{}
+		<-block
+		return nil
+	})
+
+	for i := 0; i < *maxConcurrentQueries; i++ {
+		q := engine.newTestQuery(f1)
+		go q.Exec()
+		select {
+		case <-processing:
+			// Expected.
+		case <-time.After(5 * time.Millisecond):
+			t.Fatalf("Query within concurrency threshold not being executed")
+		}
+	}
+
+	q := engine.newTestQuery(f1)
+	go q.Exec()
+
+	select {
+	case <-processing:
+		t.Fatalf("Query above concurrency threshold being executed")
+	case <-time.After(5 * time.Millisecond):
+		// Expected.
+	}
+
+	// Terminate a running query.
+	block <- struct{}{}
+
+	select {
+	case <-processing:
+		// Expected.
+	case <-time.After(5 * time.Millisecond):
+		t.Fatalf("Query within concurrency threshold not being executed")
+	}
+
+	// Terminate remaining queries.
+	for i := 0; i < *maxConcurrentQueries; i++ {
+		block <- struct{}{}
+	}
+}
+
 func TestQueryTimeout(t *testing.T) {
 	*defaultQueryTimeout = 5 * time.Millisecond
 	defer func() {
@@ -21,10 +68,7 @@ func TestQueryTimeout(t *testing.T) {
 		*defaultQueryTimeout = 2 * time.Minute
 	}()
 
-	storage, closer := local.NewTestStorage(t, 1)
-	defer closer.Close()
-
-	engine := NewEngine(storage)
+	engine := NewEngine(nil)
 	defer engine.Stop()
 
 	f1 := testStmt(func(context.Context) error {
@@ -46,10 +90,7 @@
 }
 
 func TestQueryCancel(t *testing.T) {
-	storage, closer := local.NewTestStorage(t, 1)
-	defer closer.Close()
-
-	engine := NewEngine(storage)
+	engine := NewEngine(nil)
 	defer engine.Stop()
 
 	// As for timeouts, cancellation is only checked at designated points. We ensure
@@ -91,10 +132,7 @@ func TestQueryCancel(t *testing.T) {
 }
 
 func TestEngineShutdown(t *testing.T) {
-	storage, closer := local.NewTestStorage(t, 1)
-	defer closer.Close()
-
-	engine := NewEngine(storage)
+	engine := NewEngine(nil)
 
 	handlerExecutions := 0
 	// Shutdown engine on first handler execution. Should handler execution ever become

@@ -31,7 +31,7 @@ const (
 	GetValueAtTimeTime
 	GetBoundaryValuesTime
 	GetRangeValuesTime
-	ViewQueueTime
+	ExecQueueTime
 	ViewDiskPreparationTime
 	ViewDataExtractionTime
 	ViewDiskExtractionTime
@@ -64,8 +64,8 @@ func (s QueryTiming) String() string {
 		return "GetBoundaryValues() time"
 	case GetRangeValuesTime:
 		return "GetRangeValues() time"
-	case ViewQueueTime:
-		return "View queue wait time"
+	case ExecQueueTime:
+		return "Exec queue wait time"
 	case ViewDiskPreparationTime:
 		return "View building disk preparation time"
 	case ViewDataExtractionTime:
