Browse Source

Rules: Refactor concurrency controller interface (#14491)

* Rules: Refactor concurrency controller interface

Even though the main purpose of this refactor is to modify the interface of the concurrency controller to accept a Context. I did two drive-by modifications that I think are sensible:

1. I have moved the check for dependencies on rules to the controller itself - this aligns with how the controller should behave as it is a deciding factor on wether we should run concurrently or not.
2. I cleaned up some unused methods from the days of the old interface before #13527 changed it.

Signed-off-by: gotjosh <josue.abreu@gmail.com>
---------

Signed-off-by: gotjosh <josue.abreu@gmail.com>
pull/14461/head
gotjosh 4 months ago committed by GitHub
parent
commit
465891cc56
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 10
      rules/group.go
  2. 56
      rules/manager.go

10
rules/group.go

@ -621,14 +621,12 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
}
}
// If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output.
// Try run concurrently if there are slots available.
if ctrl := g.concurrencyController; isRuleEligibleForConcurrentExecution(rule) && ctrl.Allow() {
if ctrl := g.concurrencyController; ctrl.Allow(ctx, g, rule) {
wg.Add(1)
go eval(i, rule, func() {
wg.Done()
ctrl.Done()
ctrl.Done(ctx)
})
} else {
eval(i, rule, nil)
@ -1094,7 +1092,3 @@ func buildDependencyMap(rules []Rule) dependencyMap {
return dependencies
}
func isRuleEligibleForConcurrentExecution(rule Rule) bool {
return rule.NoDependentRules() && rule.NoDependencyRules()
}

56
rules/manager.go

@ -457,67 +457,47 @@ func (c ruleDependencyController) AnalyseRules(rules []Rule) {
// Its purpose is to bound the amount of concurrency in rule evaluations to avoid overwhelming the Prometheus
// server with additional query load. Concurrency is controlled globally, not on a per-group basis.
type RuleConcurrencyController interface {
// Allow determines whether any concurrent evaluation slots are available.
// If Allow() returns true, then Done() must be called to release the acquired slot.
Allow() bool
// Allow determines if the given rule is allowed to be evaluated concurrently.
// If Allow() returns true, then Done() must be called to release the acquired slot and corresponding cleanup is done.
// It is important that both *Group and Rule are not retained and only be used for the duration of the call.
Allow(ctx context.Context, group *Group, rule Rule) bool
// Done releases a concurrent evaluation slot.
Done()
Done(ctx context.Context)
}
// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules.
type concurrentRuleEvalController struct {
sema *semaphore.Weighted
depMapsMu sync.Mutex
depMaps map[*Group]dependencyMap
sema *semaphore.Weighted
}
func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyController {
return &concurrentRuleEvalController{
sema: semaphore.NewWeighted(maxConcurrency),
depMaps: map[*Group]dependencyMap{},
sema: semaphore.NewWeighted(maxConcurrency),
}
}
func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool {
c.depMapsMu.Lock()
defer c.depMapsMu.Unlock()
depMap, found := c.depMaps[g]
if !found {
depMap = buildDependencyMap(g.rules)
c.depMaps[g] = depMap
func (c *concurrentRuleEvalController) Allow(_ context.Context, _ *Group, rule Rule) bool {
// To allow a rule to be executed concurrently, we need 3 conditions:
// 1. The rule must not have any rules that depend on it.
// 2. The rule itself must not depend on any other rules.
// 3. If 1 & 2 are true, then and only then we should try to acquire the concurrency slot.
if rule.NoDependentRules() && rule.NoDependencyRules() {
return c.sema.TryAcquire(1)
}
return depMap.isIndependent(r)
}
func (c *concurrentRuleEvalController) Allow() bool {
return c.sema.TryAcquire(1)
return false
}
func (c *concurrentRuleEvalController) Done() {
func (c *concurrentRuleEvalController) Done(_ context.Context) {
c.sema.Release(1)
}
func (c *concurrentRuleEvalController) Invalidate() {
c.depMapsMu.Lock()
defer c.depMapsMu.Unlock()
// Clear out the memoized dependency maps because some or all groups may have been updated.
c.depMaps = map[*Group]dependencyMap{}
}
// sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially.
type sequentialRuleEvalController struct{}
func (c sequentialRuleEvalController) RuleEligible(_ *Group, _ Rule) bool {
return false
}
func (c sequentialRuleEvalController) Allow() bool {
func (c sequentialRuleEvalController) Allow(_ context.Context, _ *Group, _ Rule) bool {
return false
}
func (c sequentialRuleEvalController) Done() {}
func (c sequentialRuleEvalController) Invalidate() {}
func (c sequentialRuleEvalController) Done(_ context.Context) {}

Loading…
Cancel
Save