From 6cecb8794164200c2010483b8f22676994bcbe16 Mon Sep 17 00:00:00 2001 From: Soon-Ping Date: Tue, 4 Apr 2023 11:21:13 -0700 Subject: [PATCH] Generalized rule group iteration evaluation hook (#11885) Signed-off-by: Soon-Ping Phang --- rules/manager.go | 149 ++++++++++++++++++++++++------------------ rules/manager_test.go | 58 +++++++++++----- 2 files changed, 126 insertions(+), 81 deletions(-) diff --git a/rules/manager.go b/rules/manager.go index f8dcf081f..108b6a77a 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -253,7 +253,8 @@ type Group struct { opts *ManagerOptions mtx sync.Mutex evaluationTime time.Duration - lastEvaluation time.Time + lastEvaluation time.Time // Wall-clock time of most recent evaluation. + lastEvalTimestamp time.Time // Time slot used for most recent evaluation. shouldRestore bool @@ -266,22 +267,27 @@ type Group struct { metrics *Metrics - ruleGroupPostProcessFunc RuleGroupPostProcessFunc + // Rule group evaluation iteration function, + // defaults to DefaultEvalIterationFunc. + evalIterationFunc GroupEvalIterationFunc } -// This function will be used before each rule group evaluation if not nil. -// Use this function type if the rule group post processing is needed. -type RuleGroupPostProcessFunc func(g *Group, lastEvalTimestamp time.Time, log log.Logger) error +// GroupEvalIterationFunc is used to implement and extend rule group +// evaluation iteration logic. It is configured in Group.evalIterationFunc, +// and periodically invoked at each group evaluation interval to +// evaluate the rules in the group at that point in time. +// DefaultEvalIterationFunc is the default implementation. +type GroupEvalIterationFunc func(ctx context.Context, g *Group, evalTimestamp time.Time) type GroupOptions struct { - Name, File string - Interval time.Duration - Limit int - Rules []Rule - ShouldRestore bool - Opts *ManagerOptions - done chan struct{} - RuleGroupPostProcessFunc RuleGroupPostProcessFunc + Name, File string + Interval time.Duration + Limit int + Rules []Rule + ShouldRestore bool + Opts *ManagerOptions + done chan struct{} + EvalIterationFunc GroupEvalIterationFunc } // NewGroup makes a new Group with the given name, options, and rules. @@ -302,21 +308,26 @@ func NewGroup(o GroupOptions) *Group { metrics.GroupSamples.WithLabelValues(key) metrics.GroupInterval.WithLabelValues(key).Set(o.Interval.Seconds()) + evalIterationFunc := o.EvalIterationFunc + if evalIterationFunc == nil { + evalIterationFunc = DefaultEvalIterationFunc + } + return &Group{ - name: o.Name, - file: o.File, - interval: o.Interval, - limit: o.Limit, - rules: o.Rules, - shouldRestore: o.ShouldRestore, - opts: o.Opts, - seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), - done: make(chan struct{}), - managerDone: o.done, - terminated: make(chan struct{}), - logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), - metrics: metrics, - ruleGroupPostProcessFunc: o.RuleGroupPostProcessFunc, + name: o.Name, + file: o.File, + interval: o.Interval, + limit: o.Limit, + rules: o.Rules, + shouldRestore: o.ShouldRestore, + opts: o.Opts, + seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), + done: make(chan struct{}), + managerDone: o.done, + terminated: make(chan struct{}), + logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), + metrics: metrics, + evalIterationFunc: evalIterationFunc, } } @@ -341,6 +352,8 @@ func (g *Group) Interval() time.Duration { return g.interval } // Limit returns the group's limit. func (g *Group) Limit() int { return g.limit } +func (g *Group) Logger() log.Logger { return g.logger } + func (g *Group) run(ctx context.Context) { defer close(g.terminated) @@ -359,18 +372,6 @@ func (g *Group) run(ctx context.Context) { }, }) - iter := func() { - g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc() - - start := time.Now() - g.Eval(ctx, evalTimestamp) - timeSinceStart := time.Since(start) - - g.metrics.IterationDuration.Observe(timeSinceStart.Seconds()) - g.setEvaluationTime(timeSinceStart) - g.setLastEvaluation(start) - } - // The assumption here is that since the ticker was started after having // waited for `evalTimestamp` to pass, the ticks will trigger soon // after each `evalTimestamp + N * g.interval` occurrence. @@ -400,7 +401,7 @@ func (g *Group) run(ctx context.Context) { }(time.Now()) }() - iter() + g.evalIterationFunc(ctx, g, evalTimestamp) if g.shouldRestore { // If we have to restore, we wait for another Eval to finish. // The reason behind this is, during first eval (or before it) @@ -416,7 +417,7 @@ func (g *Group) run(ctx context.Context) { g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed)) } evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval) - iter() + g.evalIterationFunc(ctx, g, evalTimestamp) } g.RestoreForState(time.Now()) @@ -439,21 +440,29 @@ func (g *Group) run(ctx context.Context) { } evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval) - useRuleGroupPostProcessFunc(g, evalTimestamp.Add(-(missed+1)*g.interval)) - - iter() + g.evalIterationFunc(ctx, g, evalTimestamp) } } } } -func useRuleGroupPostProcessFunc(g *Group, lastEvalTimestamp time.Time) { - if g.ruleGroupPostProcessFunc != nil { - err := g.ruleGroupPostProcessFunc(g, lastEvalTimestamp, g.logger) - if err != nil { - level.Warn(g.logger).Log("msg", "ruleGroupPostProcessFunc failed", "err", err) - } - } +// DefaultEvalIterationFunc is the default implementation of +// GroupEvalIterationFunc that is periodically invoked to evaluate the rules +// in a group at a given point in time and updates Group state and metrics +// accordingly. Custom GroupEvalIterationFunc implementations are recommended +// to invoke this function as well, to ensure correct Group state and metrics +// are maintained. +func DefaultEvalIterationFunc(ctx context.Context, g *Group, evalTimestamp time.Time) { + g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc() + + start := time.Now() + g.Eval(ctx, evalTimestamp) + timeSinceStart := time.Since(start) + + g.metrics.IterationDuration.Observe(timeSinceStart.Seconds()) + g.setEvaluationTime(timeSinceStart) + g.setLastEvaluation(start) + g.setLastEvalTimestamp(evalTimestamp) } func (g *Group) stop() { @@ -533,6 +542,20 @@ func (g *Group) setLastEvaluation(ts time.Time) { g.lastEvaluation = ts } +// GetLastEvalTimestamp returns the timestamp of the last evaluation. +func (g *Group) GetLastEvalTimestamp() time.Time { + g.mtx.Lock() + defer g.mtx.Unlock() + return g.lastEvalTimestamp +} + +// setLastEvalTimestamp updates lastEvalTimestamp to the timestamp of the last evaluation. +func (g *Group) setLastEvalTimestamp(ts time.Time) { + g.mtx.Lock() + defer g.mtx.Unlock() + g.lastEvalTimestamp = ts +} + // EvalTimestamp returns the immediately preceding consistently slotted evaluation time. func (g *Group) EvalTimestamp(startTime int64) time.Time { var ( @@ -996,11 +1019,11 @@ func (m *Manager) Stop() { // Update the rule manager's state as the config requires. If // loading the new rules failed the old rule set is restored. -func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string, ruleGroupPostProcessFunc RuleGroupPostProcessFunc) error { +func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string, groupEvalIterationFunc GroupEvalIterationFunc) error { m.mtx.Lock() defer m.mtx.Unlock() - groups, errs := m.LoadGroups(interval, externalLabels, externalURL, ruleGroupPostProcessFunc, files...) + groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...) if errs != nil { for _, e := range errs { @@ -1085,7 +1108,7 @@ func (FileLoader) Parse(query string) (parser.Expr, error) { return parser.Parse // LoadGroups reads groups from a list of files. func (m *Manager) LoadGroups( - interval time.Duration, externalLabels labels.Labels, externalURL string, ruleGroupPostProcessFunc RuleGroupPostProcessFunc, filenames ...string, + interval time.Duration, externalLabels labels.Labels, externalURL string, groupEvalIterationFunc GroupEvalIterationFunc, filenames ...string, ) (map[string]*Group, []error) { groups := make(map[string]*Group) @@ -1133,15 +1156,15 @@ func (m *Manager) LoadGroups( } groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{ - Name: rg.Name, - File: fn, - Interval: itv, - Limit: rg.Limit, - Rules: rules, - ShouldRestore: shouldRestore, - Opts: m.opts, - done: m.done, - RuleGroupPostProcessFunc: ruleGroupPostProcessFunc, + Name: rg.Name, + File: fn, + Interval: itv, + Limit: rg.Limit, + Rules: rules, + ShouldRestore: shouldRestore, + Opts: m.opts, + done: m.done, + EvalIterationFunc: groupEvalIterationFunc, }) } } diff --git a/rules/manager_test.go b/rules/manager_test.go index aed289c5b..1faf730be 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1237,7 +1237,7 @@ func TestRuleHealthUpdates(t *testing.T) { require.Equal(t, HealthBad, rules.Health()) } -func TestUpdateMissedEvalMetrics(t *testing.T) { +func TestRuleGroupEvalIterationFunc(t *testing.T) { suite, err := promql.NewTest(t, ` load 5m http_requests{instance="0"} 75 85 50 0 0 25 0 0 40 0 120 @@ -1254,26 +1254,40 @@ func TestUpdateMissedEvalMetrics(t *testing.T) { testValue := 1 - overrideFunc := func(g *Group, lastEvalTimestamp time.Time, log log.Logger) error { + evalIterationFunc := func(ctx context.Context, g *Group, evalTimestamp time.Time) { testValue = 2 - return nil + DefaultEvalIterationFunc(ctx, g, evalTimestamp) + testValue = 3 + } + + skipEvalIterationFunc := func(ctx context.Context, g *Group, evalTimestamp time.Time) { + testValue = 4 } type testInput struct { - overrideFunc func(g *Group, lastEvalTimestamp time.Time, logger log.Logger) error - expectedValue int + evalIterationFunc GroupEvalIterationFunc + expectedValue int + lastEvalTimestampIsZero bool } tests := []testInput{ - // testValue should still have value of 1 since overrideFunc is nil. + // testValue should still have value of 1 since the default iteration function will be called. { - overrideFunc: nil, - expectedValue: 1, + evalIterationFunc: nil, + expectedValue: 1, + lastEvalTimestampIsZero: false, }, - // testValue should be incremented to 2 since overrideFunc is called. + // testValue should be incremented to 3 since evalIterationFunc is called. { - overrideFunc: overrideFunc, - expectedValue: 2, + evalIterationFunc: evalIterationFunc, + expectedValue: 3, + lastEvalTimestampIsZero: false, + }, + // testValue should be incremented to 4 since skipEvalIterationFunc is called. + { + evalIterationFunc: skipEvalIterationFunc, + expectedValue: 4, + lastEvalTimestampIsZero: true, }, } @@ -1315,12 +1329,12 @@ func TestUpdateMissedEvalMetrics(t *testing.T) { } group := NewGroup(GroupOptions{ - Name: "default", - Interval: time.Second, - Rules: []Rule{rule}, - ShouldRestore: true, - Opts: opts, - RuleGroupPostProcessFunc: tst.overrideFunc, + Name: "default", + Interval: time.Second, + Rules: []Rule{rule}, + ShouldRestore: true, + Opts: opts, + EvalIterationFunc: tst.evalIterationFunc, }) go func() { @@ -1329,10 +1343,18 @@ func TestUpdateMissedEvalMetrics(t *testing.T) { time.Sleep(3 * time.Second) group.stop() + require.Equal(t, tst.expectedValue, testValue) + if tst.lastEvalTimestampIsZero { + require.Zero(t, group.GetLastEvalTimestamp()) + } else { + oneMinute, _ := time.ParseDuration("1m") + require.WithinDuration(t, time.Now(), group.GetLastEvalTimestamp(), oneMinute) + } } - for _, tst := range tests { + for i, tst := range tests { + t.Logf("case %d", i) testFunc(tst) } }