From 9ab1f6c690940187b227cdd6ec591349abf0f5aa Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Fri, 1 May 2015 00:49:19 +0200
Subject: [PATCH] Limit maximum number of concurrent queries.

A high number of concurrent queries can slow each other down so that
none of them is reasonably responsive. This commit limits the number
of queries being concurrently executed.
---
 promql/engine.go      | 59 +++++++++++++++++++++++++++++++++-----
 promql/engine_test.go | 66 ++++++++++++++++++++++++++++++++++---------
 stats/query_stats.go  |  6 ++--
 3 files changed, 107 insertions(+), 24 deletions(-)

diff --git a/promql/engine.go b/promql/engine.go
index 7d3cc58dd..7b53f661d 100644
--- a/promql/engine.go
+++ b/promql/engine.go
@@ -31,8 +31,9 @@ import (
 )
 
 var (
-	stalenessDelta      = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.")
-	defaultQueryTimeout = flag.Duration("query.timeout", 2*time.Minute, "Maximum time a query may take before being aborted.")
+	stalenessDelta       = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.")
+	defaultQueryTimeout  = flag.Duration("query.timeout", 2*time.Minute, "Maximum time a query may take before being aborted.")
+	maxConcurrentQueries = flag.Int("query.max-concurrency", 20, "Maximum number of queries executed concurrently.")
 )
 
 // SampleStream is a stream of Values belonging to an attached COWMetric.
@@ -215,10 +216,7 @@ func (q *query) Cancel() {
 
 // Exec implements the Query interface.
 func (q *query) Exec() *Result {
-	ctx, cancel := context.WithTimeout(q.ng.baseCtx, *defaultQueryTimeout)
-	q.cancel = cancel
-
-	res, err := q.ng.exec(ctx, q)
+	res, err := q.ng.exec(q)
 	return &Result{Err: err, Value: res}
 }
 
@@ -249,6 +247,8 @@ type Engine struct {
 	// The base context for all queries and its cancellation function.
 	baseCtx       context.Context
 	cancelQueries func()
+	// The gate limiting the maximum number of concurrent and waiting queries.
+	gate *queryGate
 }
 
 // NewEngine returns a new engine.
@@ -258,6 +258,7 @@ func NewEngine(storage local.Storage) *Engine {
 		storage:       storage,
 		baseCtx:       ctx,
 		cancelQueries: cancel,
+		gate:          newQueryGate(*maxConcurrentQueries),
 	}
 }
 
@@ -316,9 +317,21 @@ func (ng *Engine) newTestQuery(stmts ...Statement) Query {
 //
 // At this point per query only one EvalStmt is evaluated. Alert and record
 // statements are not handled by the Engine.
-func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) {
+func (ng *Engine) exec(q *query) (Value, error) {
 	const env = "query execution"
 
+	ctx, cancel := context.WithTimeout(q.ng.baseCtx, *defaultQueryTimeout)
+	q.cancel = cancel
+
+	queueTimer := q.stats.GetTimer(stats.ExecQueueTime).Start()
+
+	if err := ng.gate.Start(ctx); err != nil {
+		return nil, err
+	}
+	defer ng.gate.Done()
+
+	queueTimer.Stop()
+
 	// Cancel when execution is done or an error was raised.
 	defer q.cancel()
 
@@ -1125,3 +1138,35 @@ func interpolateSamples(first, second *metric.SamplePair, timestamp clientmodel.
 		Timestamp: timestamp,
 	}
 }
+
+// A queryGate controls the maximum number of concurrently running and waiting queries.
+type queryGate struct {
+	ch chan struct{}
+}
+
+// newQueryGate returns a query gate that limits the number of queries
+// being concurrently executed.
+func newQueryGate(length int) *queryGate {
+	return &queryGate{
+		ch: make(chan struct{}, length),
+	}
+}
+
+// Start blocks until the gate has a free spot or the context is done.
+func (g *queryGate) Start(ctx context.Context) error {
+	select {
+	case <-ctx.Done():
+		return contextDone(ctx, "query queue")
+	case g.ch <- struct{}{}:
+		return nil
+	}
+}
+
+// Done releases a single spot in the gate.
+func (g *queryGate) Done() {
+	select {
+	case <-g.ch:
+	default:
+		panic("engine.queryGate.Done: more operations done than started")
+	}
+}
diff --git a/promql/engine_test.go b/promql/engine_test.go
index 09b175385..ab86afba1 100644
--- a/promql/engine_test.go
+++ b/promql/engine_test.go
@@ -6,14 +6,61 @@ import (
 	"time"
 
 	"golang.org/x/net/context"
-
-	"github.com/prometheus/prometheus/storage/local"
 )
 
 var noop = testStmt(func(context.Context) error {
 	return nil
 })
 
+func TestQueryConcurrency(t *testing.T) {
+	engine := NewEngine(nil)
+	defer engine.Stop()
+
+	block := make(chan struct{})
+	processing := make(chan struct{})
+	f1 := testStmt(func(context.Context) error {
+		processing <- struct{}{}
+		<-block
+		return nil
+	})
+
+	for i := 0; i < *maxConcurrentQueries; i++ {
+		q := engine.newTestQuery(f1)
+		go q.Exec()
+		select {
+		case <-processing:
+			// Expected.
+		case <-time.After(5 * time.Millisecond):
+			t.Fatalf("Query within concurrency threshold not being executed")
+		}
+	}
+
+	q := engine.newTestQuery(f1)
+	go q.Exec()
+
+	select {
+	case <-processing:
+		t.Fatalf("Query above concurrency threshold being executed")
+	case <-time.After(5 * time.Millisecond):
+		// Expected.
+	}
+
+	// Terminate a running query.
+	block <- struct{}{}
+
+	select {
+	case <-processing:
+		// Expected.
+	case <-time.After(5 * time.Millisecond):
+		t.Fatalf("Query within concurrency threshold not being executed")
+	}
+
+	// Terminate remaining queries.
+	for i := 0; i < *maxConcurrentQueries; i++ {
+		block <- struct{}{}
+	}
+}
+
 func TestQueryTimeout(t *testing.T) {
 	*defaultQueryTimeout = 5 * time.Millisecond
 	defer func() {
@@ -21,10 +68,7 @@ func TestQueryTimeout(t *testing.T) {
 		*defaultQueryTimeout = 2 * time.Minute
 	}()
 
-	storage, closer := local.NewTestStorage(t, 1)
-	defer closer.Close()
-
-	engine := NewEngine(storage)
+	engine := NewEngine(nil)
 	defer engine.Stop()
 
 	f1 := testStmt(func(context.Context) error {
@@ -46,10 +90,7 @@ func TestQueryTimeout(t *testing.T) {
 }
 
 func TestQueryCancel(t *testing.T) {
-	storage, closer := local.NewTestStorage(t, 1)
-	defer closer.Close()
-
-	engine := NewEngine(storage)
+	engine := NewEngine(nil)
 	defer engine.Stop()
 
 	// As for timeouts, cancellation is only checked at designated points. We ensure
@@ -91,10 +132,7 @@ func TestQueryCancel(t *testing.T) {
 }
 
 func TestEngineShutdown(t *testing.T) {
-	storage, closer := local.NewTestStorage(t, 1)
-	defer closer.Close()
-
-	engine := NewEngine(storage)
+	engine := NewEngine(nil)
 	handlerExecutions := 0
 
 	// Shutdown engine on first handler execution. Should handler execution ever become
diff --git a/stats/query_stats.go b/stats/query_stats.go
index 8ed7c5f46..1f5cb7d52 100644
--- a/stats/query_stats.go
+++ b/stats/query_stats.go
@@ -31,7 +31,7 @@ const (
 	GetValueAtTimeTime
 	GetBoundaryValuesTime
 	GetRangeValuesTime
-	ViewQueueTime
+	ExecQueueTime
 	ViewDiskPreparationTime
 	ViewDataExtractionTime
 	ViewDiskExtractionTime
@@ -64,8 +64,8 @@ func (s QueryTiming) String() string {
 		return "GetBoundaryValues() time"
 	case GetRangeValuesTime:
 		return "GetRangeValues() time"
-	case ViewQueueTime:
-		return "View queue wait time"
+	case ExecQueueTime:
+		return "Exec queue wait time"
 	case ViewDiskPreparationTime:
 		return "View building disk preparation time"
 	case ViewDataExtractionTime:
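
Reviewer note (not part of the patch): the queryGate added above is a counting semaphore built from a buffered channel. Start occupies a slot by sending into the channel, Done frees one by receiving, and once the buffer is full further Start calls block until a slot frees up or the query's context is done. The standalone sketch below (illustrative names only, e.g. gate, newGate, run; it does not reuse engine code) shows the same Start/Done semantics in isolation, including how a context deadline rejects a caller that is still waiting in the queue.

package main

import (
	"context"
	"fmt"
	"time"
)

// gate mirrors the semantics of the patch's queryGate: a buffered channel
// used as a counting semaphore. Sending occupies a slot, receiving frees it.
type gate struct{ ch chan struct{} }

func newGate(n int) *gate { return &gate{ch: make(chan struct{}, n)} }

// Start blocks until a slot is free or the context is done.
func (g *gate) Start(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case g.ch <- struct{}{}:
		return nil
	}
}

// Done frees a slot previously acquired by Start.
func (g *gate) Done() { <-g.ch }

func main() {
	g := newGate(2) // at most two concurrent "queries"

	run := func(id int) {
		// Give each query a short queueing/execution deadline.
		ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
		defer cancel()

		if err := g.Start(ctx); err != nil {
			fmt.Printf("query %d rejected while queued: %v\n", id, err)
			return
		}
		defer g.Done()

		fmt.Printf("query %d running\n", id)
		time.Sleep(100 * time.Millisecond) // simulate work longer than the queue deadline
	}

	// Two queries get slots immediately; the third waits, times out, and is rejected.
	for i := 1; i <= 3; i++ {
		go run(i)
	}
	time.Sleep(300 * time.Millisecond)
}

Because the channel's capacity is the concurrency limit, no extra locks or counters are needed; the simplified Done above omits the patch's panic guard against mismatched Start/Done calls.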