diff --git a/promql/engine.go b/promql/engine.go index 0b6468f92..2f442acbe 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -33,7 +33,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/pkg/gate" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/pkg/value" @@ -189,12 +188,6 @@ func (q *query) Exec(ctx context.Context) *Result { span.SetTag(queryTag, q.stmt.String()) } - // Log query in active log. - if q.ng.activeQueryTracker != nil { - queryIndex := q.ng.activeQueryTracker.Insert(q.q) - defer q.ng.activeQueryTracker.Delete(queryIndex) - } - // Exec query. res, warnings, err := q.ng.exec(ctx, q) @@ -236,7 +229,6 @@ type Engine struct { logger log.Logger metrics *engineMetrics timeout time.Duration - gate *gate.Gate maxSamplesPerQuery int activeQueryTracker *ActiveQueryTracker queryLogger QueryLogger @@ -323,7 +315,6 @@ func NewEngine(opts EngineOpts) *Engine { } return &Engine{ - gate: gate.New(opts.MaxConcurrent), timeout: opts.Timeout, logger: opts.Logger, metrics: metrics, @@ -466,12 +457,16 @@ func (ng *Engine) exec(ctx context.Context, q *query) (v Value, w storage.Warnin defer execSpanTimer.Finish() queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime) - - if err := ng.gate.Start(ctx); err != nil { - return nil, nil, contextErr(err, "query queue") + // Log query in active log. The active log guarantees that we don't run over + // MaxConcurrent queries. + if ng.activeQueryTracker != nil { + queryIndex, err := ng.activeQueryTracker.Insert(ctx, q.q) + if err != nil { + queueSpanTimer.Finish() + return nil, nil, contextErr(err, "query queue") + } + defer ng.activeQueryTracker.Delete(queryIndex) } - defer ng.gate.Done() - queueSpanTimer.Finish() // Cancel when execution is done or an error was raised. diff --git a/promql/engine_test.go b/promql/engine_test.go index ad76166d9..0c20b6a6f 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -16,6 +16,8 @@ package promql import ( "context" "errors" + "io/ioutil" + "os" "strings" "testing" "time" @@ -28,12 +30,20 @@ import ( ) func TestQueryConcurrency(t *testing.T) { + maxConcurrency := 10 + + dir, err := ioutil.TempDir("", "test_concurrency") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + queryTracker := NewActiveQueryTracker(dir, maxConcurrency, nil) + opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 100 * time.Second, + Logger: nil, + Reg: nil, + MaxConcurrent: maxConcurrency, + MaxSamples: 10, + Timeout: 100 * time.Second, + ActiveQueryTracker: queryTracker, } engine := NewEngine(opts) diff --git a/promql/query_logger.go b/promql/query_logger.go index 1f4f2df5c..e63e5cde5 100644 --- a/promql/query_logger.go +++ b/promql/query_logger.go @@ -14,6 +14,7 @@ package promql import ( + "context" "encoding/json" "os" "path/filepath" @@ -174,13 +175,17 @@ func (tracker ActiveQueryTracker) Delete(insertIndex int) { tracker.getNextIndex <- insertIndex } -func (tracker ActiveQueryTracker) Insert(query string) int { - i, fileBytes := <-tracker.getNextIndex, tracker.mmapedFile - entry := newJSONEntry(query, tracker.logger) - start, end := i, i+entrySize - - copy(fileBytes[start:], entry) - copy(fileBytes[end-1:], ",") - - return i +func (tracker ActiveQueryTracker) Insert(ctx context.Context, query string) (int, error) { + select { + case i := <-tracker.getNextIndex: + fileBytes := tracker.mmapedFile + entry := newJSONEntry(query, tracker.logger) + start, end := i, i+entrySize + + copy(fileBytes[start:], entry) + copy(fileBytes[end-1:], ",") + return i, nil + case <-ctx.Done(): + return 0, ctx.Err() + } } diff --git a/promql/query_logger_test.go b/promql/query_logger_test.go index 0285db85c..640e94541 100644 --- a/promql/query_logger_test.go +++ b/promql/query_logger_test.go @@ -14,6 +14,7 @@ package promql import ( + "context" "io/ioutil" "os" "regexp" @@ -51,7 +52,7 @@ func TestQueryLogging(t *testing.T) { start := 1 + i*entrySize end := start + entrySize - queryLogger.Insert(queries[i]) + queryLogger.Insert(context.Background(), queries[i]) have := string(fileAsBytes[start:end]) if !regexp.MustCompile(want[i]).MatchString(have) { @@ -77,16 +78,16 @@ func TestIndexReuse(t *testing.T) { } queryLogger.generateIndices(3) - queryLogger.Insert("TestQuery1") - queryLogger.Insert("TestQuery2") - queryLogger.Insert("TestQuery3") + queryLogger.Insert(context.Background(), "TestQuery1") + queryLogger.Insert(context.Background(), "TestQuery2") + queryLogger.Insert(context.Background(), "TestQuery3") queryLogger.Delete(1 + entrySize) queryLogger.Delete(1) newQuery2 := "ThisShouldBeInsertedAtIndex2" newQuery1 := "ThisShouldBeInsertedAtIndex1" - queryLogger.Insert(newQuery2) - queryLogger.Insert(newQuery1) + queryLogger.Insert(context.Background(), newQuery2) + queryLogger.Insert(context.Background(), newQuery1) want := []string{ `^{"query":"ThisShouldBeInsertedAtIndex1","timestamp_sec":\d+}\x00*,$`,