diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index f7244646e..b8b90ffbe 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -137,6 +137,7 @@ type flagConfig struct {
forGracePeriod model.Duration
outageTolerance model.Duration
resendDelay model.Duration
+ maxConcurrentEvals int64
web web.Options
scrape scrape.Options
tsdb tsdbOptions
@@ -411,6 +412,9 @@ func main() {
serverOnlyFlag(a, "rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager.").
Default("1m").SetValue(&cfg.resendDelay)
+ serverOnlyFlag(a, "rules.max-concurrent-rule-evals", "Global concurrency limit for independent rules which can run concurrently.").
+ Default("4").Int64Var(&cfg.maxConcurrentEvals)
+
a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to `scrape.timestamp-tolerance` to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release.").
Hidden().Default("true").BoolVar(&scrape.AlignScrapeTimestamps)
@@ -749,17 +753,18 @@ func main() {
queryEngine = promql.NewEngine(opts)
ruleManager = rules.NewManager(&rules.ManagerOptions{
- Appendable: fanoutStorage,
- Queryable: localStorage,
- QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
- NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()),
- Context: ctxRule,
- ExternalURL: cfg.web.ExternalURL,
- Registerer: prometheus.DefaultRegisterer,
- Logger: log.With(logger, "component", "rule manager"),
- OutageTolerance: time.Duration(cfg.outageTolerance),
- ForGracePeriod: time.Duration(cfg.forGracePeriod),
- ResendDelay: time.Duration(cfg.resendDelay),
+ Appendable: fanoutStorage,
+ Queryable: localStorage,
+ QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
+ NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()),
+ Context: ctxRule,
+ ExternalURL: cfg.web.ExternalURL,
+ Registerer: prometheus.DefaultRegisterer,
+ Logger: log.With(logger, "component", "rule manager"),
+ OutageTolerance: time.Duration(cfg.outageTolerance),
+ ForGracePeriod: time.Duration(cfg.forGracePeriod),
+ ResendDelay: time.Duration(cfg.resendDelay),
+ MaxConcurrentEvals: cfg.maxConcurrentEvals,
})
}
diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md
index 747457de1..de3baa107 100644
--- a/docs/command-line/prometheus.md
+++ b/docs/command-line/prometheus.md
@@ -48,6 +48,7 @@ The Prometheus monitoring server
| --rules.alert.for-outage-tolerance | Max time to tolerate prometheus outage for restoring "for" state of alert. Use with server mode only. | `1h` |
| --rules.alert.for-grace-period | Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. Use with server mode only. | `10m` |
| --rules.alert.resend-delay | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` |
+| --rules.max-concurrent-rule-evals | Global concurrency limit for independent rules that can run concurrently. Use with server mode only. | `4` |
| --alertmanager.notification-queue-capacity | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` |
| --query.lookback-delta | The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` |
| --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
diff --git a/rules/fixtures/rules_dependencies.yaml b/rules/fixtures/rules_dependencies.yaml
new file mode 100644
index 000000000..31d2c6176
--- /dev/null
+++ b/rules/fixtures/rules_dependencies.yaml
@@ -0,0 +1,7 @@
+groups:
+ - name: test
+ rules:
+ - record: job:http_requests:rate5m
+ expr: sum by (job)(rate(http_requests_total[5m]))
+ - record: HighRequestRate
+ expr: job:http_requests:rate5m > 100
diff --git a/rules/fixtures/rules_multiple.yaml b/rules/fixtures/rules_multiple.yaml
new file mode 100644
index 000000000..db57bede1
--- /dev/null
+++ b/rules/fixtures/rules_multiple.yaml
@@ -0,0 +1,14 @@
+groups:
+ - name: test
+ rules:
+ # independents
+ - record: job:http_requests:rate1m
+ expr: sum by (job)(rate(http_requests_total[1m]))
+ - record: job:http_requests:rate5m
+ expr: sum by (job)(rate(http_requests_total[5m]))
+
+ # dependents
+ - record: job:http_requests:rate15m
+ expr: sum by (job)(rate(http_requests_total[15m]))
+ - record: TooManyRequests
+ expr: job:http_requests:rate15m > 100
diff --git a/rules/fixtures/rules_multiple_groups.yaml b/rules/fixtures/rules_multiple_groups.yaml
new file mode 100644
index 000000000..87f31a6ca
--- /dev/null
+++ b/rules/fixtures/rules_multiple_groups.yaml
@@ -0,0 +1,28 @@
+groups:
+ - name: http
+ rules:
+ # independents
+ - record: job:http_requests:rate1m
+ expr: sum by (job)(rate(http_requests_total[1m]))
+ - record: job:http_requests:rate5m
+ expr: sum by (job)(rate(http_requests_total[5m]))
+
+ # dependents
+ - record: job:http_requests:rate15m
+ expr: sum by (job)(rate(http_requests_total[15m]))
+ - record: TooManyHTTPRequests
+ expr: job:http_requests:rate15m > 100
+
+ - name: grpc
+ rules:
+ # independents
+ - record: job:grpc_requests:rate1m
+ expr: sum by (job)(rate(grpc_requests_total[1m]))
+ - record: job:grpc_requests:rate5m
+ expr: sum by (job)(rate(grpc_requests_total[5m]))
+
+ # dependents
+ - record: job:grpc_requests:rate15m
+ expr: sum by (job)(rate(grpc_requests_total[15m]))
+ - record: TooManyGRPCRequests
+ expr: job:grpc_requests:rate15m > 100
diff --git a/rules/group.go b/rules/group.go
index 55673452e..c742820a8 100644
--- a/rules/group.go
+++ b/rules/group.go
@@ -21,8 +21,11 @@ import (
"sync"
"time"
+ "go.uber.org/atomic"
"golang.org/x/exp/slices"
+ "github.com/prometheus/prometheus/promql/parser"
+
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
@@ -68,6 +71,7 @@ type Group struct {
// Rule group evaluation iteration function,
// defaults to DefaultEvalIterationFunc.
evalIterationFunc GroupEvalIterationFunc
+ dependencyMap dependencyMap
}
// GroupEvalIterationFunc is used to implement and extend rule group
@@ -126,6 +130,7 @@ func NewGroup(o GroupOptions) *Group {
logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name),
metrics: metrics,
evalIterationFunc: evalIterationFunc,
+ dependencyMap: buildDependencyMap(o.Rules),
}
}
@@ -421,7 +426,7 @@ func (g *Group) CopyState(from *Group) {
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
- var samplesTotal float64
+ var samplesTotal atomic.Float64
for i, rule := range g.rules {
select {
case <-g.done:
@@ -429,7 +434,12 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
default:
}
- func(i int, rule Rule) {
+ eval := func(i int, rule Rule, async bool) {
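+ // When evaluating asynchronously, release the semaphore slot once this rule's evaluation completes.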
+ if async {
+ defer func() {
+ g.opts.ConcurrentEvalSema.Release(1)
+ }()
+ }
logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i)
ctx, sp := otel.Tracer("").Start(ctx, "rule")
sp.SetAttributes(attribute.String("name", rule.Name()))
@@ -465,7 +475,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
}
rule.SetHealth(HealthGood)
rule.SetLastError(nil)
- samplesTotal += float64(len(vector))
+ samplesTotal.Add(float64(len(vector)))
if ar, ok := rule.(*AlertingRule); ok {
ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
@@ -554,10 +564,19 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
}
}
}
- }(i, rule)
+ }
+
+ // If the rule is independent of all other rules in this group (nothing depends on its output and it does not
+ // depend on any other rule's output), it is safe to evaluate it concurrently.
+ // Try to run it concurrently if a semaphore slot is available.
+ if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrentEvalSema != nil && g.opts.ConcurrentEvalSema.TryAcquire(1) {
+ go eval(i, rule, true)
+ } else {
+ eval(i, rule, false)
+ }
}
+
if g.metrics != nil {
- g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal)
+ g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load())
}
g.cleanupStaleSeries(ctx, ts)
}
@@ -866,3 +885,109 @@ func NewGroupMetrics(reg prometheus.Registerer) *Metrics {
return m
}
+
+// dependencyMap is a data structure that records the relationships between rules within a group.
+// It describes the dependency associations between rules in a group, whereby one rule uses the
+// output metric produced by another recording rule in its expression (i.e. as its "input").
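+// For example, if rule B's expression uses the metric produced by rule A, then dependencyMap[A] contains B:
+// B is a dependent of A, and A is a dependency of B.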
+type dependencyMap map[Rule][]Rule
+
+// dependents returns all rules which use the output of the given rule as one of their inputs.
+func (m dependencyMap) dependents(r Rule) []Rule {
+ if len(m) == 0 {
+ return nil
+ }
+
+ return m[r]
+}
+
+// dependencies returns all the rules on which the given rule is dependent for input.
+func (m dependencyMap) dependencies(r Rule) []Rule {
+ if len(m) == 0 {
+ return nil
+ }
+
+ var parents []Rule
+ for parent, children := range m {
+ if len(children) == 0 {
+ continue
+ }
+
+ for _, child := range children {
+ if child == r {
+ parents = append(parents, parent)
+ }
+ }
+ }
+
+ return parents
+}
+
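+// isIndependent reports whether the given rule neither depends on, nor is depended on by, any other rule in its
+// group. A nil map means the relationships could not be determined (or the group has one or fewer rules), so no
+// rule is treated as independent.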
+func (m dependencyMap) isIndependent(r Rule) bool {
+ if m == nil {
+ return false
+ }
+
+ return len(m.dependents(r)) == 0 && len(m.dependencies(r)) == 0
+}
+
+// buildDependencyMap builds a data structure that records the relationships between the rules within a group.
+func buildDependencyMap(rules []Rule) dependencyMap {
+ dependencies := make(dependencyMap)
+
+ if len(rules) <= 1 {
+ // No relationships if group has 1 or fewer rules.
+ return nil
+ }
+
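+ // inputs maps a metric name to the rules that reference it in their expressions;
+ // outputs maps each rule's name (the metric a recording rule produces) to the rules that produce it.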
+ inputs := make(map[string][]Rule, len(rules))
+ outputs := make(map[string][]Rule, len(rules))
+
+ var indeterminate bool
+
+ for _, rule := range rules {
+ rule := rule
+
+ name := rule.Name()
+ outputs[name] = append(outputs[name], rule)
+
+ parser.Inspect(rule.Query(), func(node parser.Node, path []parser.Node) error {
+ if n, ok := node.(*parser.VectorSelector); ok {
+ // A wildcard metric expression means we cannot reliably determine if this rule depends on any other,
+ // which means we cannot safely run any rules concurrently.
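+ // For example, `count({user="bob"})` selects series without naming a metric.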
+ if n.Name == "" && len(n.LabelMatchers) > 0 {
+ indeterminate = true
+ return nil
+ }
+
+ // Rules which depend on "meta-metrics" like ALERTS and ALERTS_FOR_STATE will have undefined behaviour
+ // if they run concurrently.
+ if n.Name == alertMetricName || n.Name == alertForStateMetricName {
+ indeterminate = true
+ return nil
+ }
+
+ inputs[n.Name] = append(inputs[n.Name], rule)
+ }
+ return nil
+ })
+ }
+
+ if indeterminate {
+ return nil
+ }
+
+ if len(inputs) == 0 || len(outputs) == 0 {
+ // No relationships can be inferred.
+ return nil
+ }
+
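+ // Link each rule to the rules that consume its output metric.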
+ for output, outRules := range outputs {
+ for _, outRule := range outRules {
+ if rs, found := inputs[output]; found && len(rs) > 0 {
+ dependencies[outRule] = append(dependencies[outRule], rs...)
+ }
+ }
+ }
+
+ return dependencies
+}
diff --git a/rules/manager.go b/rules/manager.go
index ed4d42eba..e9fa94e9e 100644
--- a/rules/manager.go
+++ b/rules/manager.go
@@ -26,6 +26,7 @@ import (
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/exp/slices"
+ "golang.org/x/sync/semaphore"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
@@ -103,18 +104,20 @@ type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)
// ManagerOptions bundles options for the Manager.
type ManagerOptions struct {
- ExternalURL *url.URL
- QueryFunc QueryFunc
- NotifyFunc NotifyFunc
- Context context.Context
- Appendable storage.Appendable
- Queryable storage.Queryable
- Logger log.Logger
- Registerer prometheus.Registerer
- OutageTolerance time.Duration
- ForGracePeriod time.Duration
- ResendDelay time.Duration
- GroupLoader GroupLoader
+ ExternalURL *url.URL
+ QueryFunc QueryFunc
+ NotifyFunc NotifyFunc
+ Context context.Context
+ Appendable storage.Appendable
+ Queryable storage.Queryable
+ Logger log.Logger
+ Registerer prometheus.Registerer
+ OutageTolerance time.Duration
+ ForGracePeriod time.Duration
+ ResendDelay time.Duration
+ MaxConcurrentEvals int64
+ ConcurrentEvalSema *semaphore.Weighted
+ GroupLoader GroupLoader
Metrics *Metrics
}
@@ -130,6 +133,8 @@ func NewManager(o *ManagerOptions) *Manager {
o.GroupLoader = FileLoader{}
}
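+ // A single weighted semaphore bounds concurrent rule evaluations across all groups managed by this Manager.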
+ o.ConcurrentEvalSema = semaphore.NewWeighted(o.MaxConcurrentEvals)
+
m := &Manager{
groups: map[string]*Group{},
opts: o,
diff --git a/rules/manager_test.go b/rules/manager_test.go
index 3feae51de..e3e156038 100644
--- a/rules/manager_test.go
+++ b/rules/manager_test.go
@@ -19,15 +19,18 @@ import (
"math"
"os"
"sort"
+ "sync"
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/goleak"
+ "golang.org/x/sync/semaphore"
"gopkg.in/yaml.v2"
"github.com/prometheus/prometheus/model/labels"
@@ -1402,3 +1405,382 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) {
require.Equal(t, expHist, fh)
require.Equal(t, chunkenc.ValNone, it.Next())
}
+
+func TestDependencyMap(t *testing.T) {
+ ctx := context.Background()
+ opts := &ManagerOptions{
+ Context: ctx,
+ Logger: log.NewNopLogger(),
+ }
+
+ expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
+ require.NoError(t, err)
+ rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
+
+ expr, err = parser.ParseExpr("user:requests:rate1m <= 0")
+ require.NoError(t, err)
+ rule2 := NewAlertingRule("ZeroRequests", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, log.NewNopLogger())
+ group := NewGroup(GroupOptions{
+ Name: "rule_group",
+ Interval: time.Second,
+ Rules: []Rule{rule, rule2},
+ Opts: opts,
+ })
+
+ require.Equal(t, []Rule{rule2}, group.dependencyMap.dependents(rule))
+ require.Len(t, group.dependencyMap.dependencies(rule), 0)
+ require.False(t, group.dependencyMap.isIndependent(rule))
+
+ require.Len(t, group.dependencyMap.dependents(rule2), 0)
+ require.Equal(t, []Rule{rule}, group.dependencyMap.dependencies(rule2))
+ require.False(t, group.dependencyMap.isIndependent(rule2))
+}
+
+func TestNoDependency(t *testing.T) {
+ ctx := context.Background()
+ opts := &ManagerOptions{
+ Context: ctx,
+ Logger: log.NewNopLogger(),
+ }
+
+ expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
+ require.NoError(t, err)
+ rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
+
+ group := NewGroup(GroupOptions{
+ Name: "rule_group",
+ Interval: time.Second,
+ Rules: []Rule{rule},
+ Opts: opts,
+ })
+
+ // A group with only one rule does not build a dependency map, so the rule is not reported as independent.
+ require.False(t, group.dependencyMap.isIndependent(rule))
+}
+
+func TestNoMetricSelector(t *testing.T) {
+ ctx := context.Background()
+ opts := &ManagerOptions{
+ Context: ctx,
+ Logger: log.NewNopLogger(),
+ }
+
+ expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
+ require.NoError(t, err)
+ rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
+
+ expr, err = parser.ParseExpr(`count({user="bob"})`)
+ require.NoError(t, err)
+ rule2 := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
+
+ group := NewGroup(GroupOptions{
+ Name: "rule_group",
+ Interval: time.Second,
+ Rules: []Rule{rule, rule2},
+ Opts: opts,
+ })
+
+ // A rule with no metric selector cannot be reliably determined to have no dependencies on other rules, and
+ // therefore no rules in the group are considered independent.
+ require.False(t, group.dependencyMap.isIndependent(rule))
+ require.False(t, group.dependencyMap.isIndependent(rule2))
+}
+
+func TestDependentRulesWithNonMetricExpression(t *testing.T) {
+ ctx := context.Background()
+ opts := &ManagerOptions{
+ Context: ctx,
+ Logger: log.NewNopLogger(),
+ }
+
+ expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
+ require.NoError(t, err)
+ rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
+
+ expr, err = parser.ParseExpr("user:requests:rate1m <= 0")
+ require.NoError(t, err)
+ rule2 := NewAlertingRule("ZeroRequests", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, log.NewNopLogger())
+
+ expr, err = parser.ParseExpr("3")
+ require.NoError(t, err)
+ rule3 := NewRecordingRule("three", expr, labels.Labels{})
+
+ group := NewGroup(GroupOptions{
+ Name: "rule_group",
+ Interval: time.Second,
+ Rules: []Rule{rule, rule2, rule3},
+ Opts: opts,
+ })
+
+ require.False(t, group.dependencyMap.isIndependent(rule))
+ require.False(t, group.dependencyMap.isIndependent(rule2))
+ require.True(t, group.dependencyMap.isIndependent(rule3))
+}
+
+func TestRulesDependentOnMetaMetrics(t *testing.T) {
+ ctx := context.Background()
+ opts := &ManagerOptions{
+ Context: ctx,
+ Logger: log.NewNopLogger(),
+ }
+
+ // This rule is not dependent on any other rules in its group but it does depend on `ALERTS`, which is produced by
+ // the rule engine, and is therefore not independent.
+ expr, err := parser.ParseExpr("count(ALERTS)")
+ require.NoError(t, err)
+ rule := NewRecordingRule("alert_count", expr, labels.Labels{})
+
+ // Create another rule so a dependency map is built (no map is built if a group contains one or fewer rules).
+ expr, err = parser.ParseExpr("1")
+ require.NoError(t, err)
+ rule2 := NewRecordingRule("one", expr, labels.Labels{})
+
+ group := NewGroup(GroupOptions{
+ Name: "rule_group",
+ Interval: time.Second,
+ Rules: []Rule{rule, rule2},
+ Opts: opts,
+ })
+
+ require.False(t, group.dependencyMap.isIndependent(rule))
+}
+
+func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) {
+ files := []string{"fixtures/rules.yaml"}
+ ruleManager := NewManager(&ManagerOptions{
+ Context: context.Background(),
+ Logger: log.NewNopLogger(),
+ })
+
+ ruleManager.start()
+ defer ruleManager.Stop()
+
+ err := ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil)
+ require.NoError(t, err)
+ require.Greater(t, len(ruleManager.groups), 0, "expected non-empty rule groups")
+
+ orig := make(map[string]dependencyMap, len(ruleManager.groups))
+ for _, g := range ruleManager.groups {
+ // No dependency map is expected because there is only one rule in the group.
+ require.Empty(t, g.dependencyMap)
+ orig[g.Name()] = g.dependencyMap
+ }
+
+ // Update once without changing groups.
+ err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil)
+ require.NoError(t, err)
+ for h, g := range ruleManager.groups {
+ // Dependency maps are unchanged because no rules were updated.
+ require.Equal(t, orig[h], g.dependencyMap)
+ }
+
+ // Groups will be recreated when updated.
+ files[0] = "fixtures/rules_dependencies.yaml"
+ err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil)
+ require.NoError(t, err)
+
+ for h, g := range ruleManager.groups {
+ // Dependency maps must change because the groups have been updated.
+ require.NotEqual(t, orig[h], g.dependencyMap)
+ // We expect there to be some dependencies since the new rule group contains a dependency.
+ require.Greater(t, len(g.dependencyMap), 0)
+ }
+}
+
+func TestAsyncRuleEvaluation(t *testing.T) {
+ storage := teststorage.New(t)
+ t.Cleanup(func() { storage.Close() })
+
+ const artificialDelay = time.Second
+
+ var (
+ inflightQueries atomic.Int32
+ maxInflight atomic.Int32
+ )
+
+ files := []string{"fixtures/rules_multiple.yaml"}
+ ruleManager := NewManager(&ManagerOptions{
+ Context: context.Background(),
+ Logger: log.NewNopLogger(),
+ Appendable: storage,
+ QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) {
+ inflightQueries.Add(1)
+ defer func() {
+ inflightQueries.Add(-1)
+ }()
+
+ // Artificially delay all query executions to highlight the improvement from concurrent rule evaluation.
+ time.Sleep(artificialDelay)
+
+ // return a stub sample
+ return promql.Vector{
+ promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345},
+ }, nil
+ },
+ })
+
+ // Evaluate groups manually to show the impact of async rule evaluations.
+ groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...)
+ require.Empty(t, errs)
+ require.Len(t, groups, 1)
+
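+ // fixtures/rules_multiple.yaml contains two independent rules and two rules with a dependency relationship.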
+ expectedRules := 4
+
+ t.Run("synchronous evaluation with independent rules", func(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ for _, group := range groups {
+ require.Len(t, group.rules, expectedRules)
+
+ start := time.Now()
+
+ // Never expect more than 1 inflight query at a time.
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ highWatermark := maxInflight.Load()
+ current := inflightQueries.Load()
+ if current > highWatermark {
+ maxInflight.Store(current)
+ }
+
+ time.Sleep(time.Millisecond)
+ }
+ }
+ }()
+
+ group.Eval(ctx, start)
+
+ require.EqualValues(t, 1, maxInflight.Load())
+ // Each rule should take at least 1 second to execute sequentially.
+ require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds())
+ // Each rule produces one vector.
+ require.EqualValues(t, expectedRules, testutil.ToFloat64(group.metrics.GroupSamples))
+ }
+
+ cancel()
+ })
+
+ t.Run("asynchronous evaluation with independent rules", func(t *testing.T) {
+ // Reset.
+ inflightQueries.Store(0)
+ maxInflight.Store(0)
+ ctx, cancel := context.WithCancel(context.Background())
+
+ for _, group := range groups {
+ // Allow up to 2 concurrent rule evaluations.
+ group.opts.ConcurrentEvalSema = semaphore.NewWeighted(2)
+ require.Len(t, group.rules, expectedRules)
+
+ start := time.Now()
+
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ highWatermark := maxInflight.Load()
+ current := inflightQueries.Load()
+ if current > highWatermark {
+ maxInflight.Store(current)
+ }
+
+ time.Sleep(time.Millisecond)
+ }
+ }
+ }()
+
+ group.Eval(ctx, start)
+
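+ // With a concurrency limit of 2, the two independent rules evaluate concurrently while a dependent rule is
+ // evaluated synchronously, so at most 3 queries are in flight at once.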
+ require.EqualValues(t, 3, maxInflight.Load())
+ // Some rules should execute concurrently, so the group should complete more quickly.
+ require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds())
+ // Each rule produces one vector.
+ require.EqualValues(t, expectedRules, testutil.ToFloat64(group.metrics.GroupSamples))
+ }
+
+ cancel()
+ })
+}
+
+func TestBoundedRuleEvalConcurrency(t *testing.T) {
+ storage := teststorage.New(t)
+ t.Cleanup(func() { storage.Close() })
+
+ const artificialDelay = time.Millisecond * 100
+
+ var (
+ inflightQueries atomic.Int32
+ maxInflight atomic.Int32
+ maxConcurrency int64 = 3
+ groupCount = 2
+ )
+
+ files := []string{"fixtures/rules_multiple_groups.yaml"}
+ ruleManager := NewManager(&ManagerOptions{
+ Context: context.Background(),
+ Logger: log.NewNopLogger(),
+ Appendable: storage,
+ MaxConcurrentEvals: maxConcurrency,
+ QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) {
+ inflightQueries.Add(1)
+ defer func() {
+ inflightQueries.Add(-1)
+ }()
+
+ // Artificially delay all query executions to highlight the improvement from concurrent rule evaluation.
+ time.Sleep(artificialDelay)
+
+ // return a stub sample
+ return promql.Vector{
+ promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345},
+ }, nil
+ },
+ })
+
+ // Evaluate groups manually to show the impact of async rule evaluations.
+ groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...)
+ require.Empty(t, errs)
+ require.Len(t, groups, groupCount)
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ highWatermark := maxInflight.Load()
+ current := inflightQueries.Load()
+ if current > highWatermark {
+ maxInflight.Store(current)
+ }
+
+ time.Sleep(time.Millisecond)
+ }
+ }
+ }()
+
+ // Evaluate groups concurrently (as the rule manager normally does).
+ var wg sync.WaitGroup
+ for _, group := range groups {
+ group := group
+
+ wg.Add(1)
+ go func() {
+ group.Eval(ctx, time.Now())
+ wg.Done()
+ }()
+ }
+
+ wg.Wait()
+ cancel()
+
+ // Synchronous queries also count towards inflight, so at most we can have maxConcurrency+groupCount inflight evaluations.
+ require.EqualValues(t, maxInflight.Load(), int32(maxConcurrency)+int32(groupCount))
+}