From 135cc30063695acf8824beed9b7c3e1a208dac68 Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Wed, 12 Feb 2020 16:22:18 +0100 Subject: [PATCH] rules: Make deleted rule series as stale after a reload (#6745) * rules: Make deleted rule series as stale after a reload Signed-off-by: Julien Pivotto --- rules/fixtures/rules2_copy.yaml | 5 + rules/manager.go | 166 ++++++++++++++++++++--------- rules/manager_test.go | 181 +++++++++++++++++++++++++++++++- web/api/v1/api_test.go | 9 +- 4 files changed, 305 insertions(+), 56 deletions(-) create mode 100644 rules/fixtures/rules2_copy.yaml diff --git a/rules/fixtures/rules2_copy.yaml b/rules/fixtures/rules2_copy.yaml new file mode 100644 index 000000000..dd74b6511 --- /dev/null +++ b/rules/fixtures/rules2_copy.yaml @@ -0,0 +1,5 @@ +groups: + - name: test_2 copy + rules: + - record: test_2 + expr: vector(2) diff --git a/rules/manager.go b/rules/manager.go index 65409a732..9ae3b69b7 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -231,37 +231,48 @@ type Group struct { shouldRestore bool - done chan struct{} - terminated chan struct{} + done chan bool + terminated chan struct{} + managerDone chan struct{} logger log.Logger metrics *Metrics } +type GroupOptions struct { + Name, File string + Interval time.Duration + Rules []Rule + ShouldRestore bool + Opts *ManagerOptions + done chan struct{} +} + // NewGroup makes a new Group with the given name, options, and rules. -func NewGroup(name, file string, interval time.Duration, rules []Rule, shouldRestore bool, opts *ManagerOptions) *Group { - metrics := opts.Metrics +func NewGroup(o GroupOptions) *Group { + metrics := o.Opts.Metrics if metrics == nil { - metrics = NewGroupMetrics(opts.Registerer) + metrics = NewGroupMetrics(o.Opts.Registerer) } - metrics.groupLastEvalTime.WithLabelValues(groupKey(file, name)) - metrics.groupLastDuration.WithLabelValues(groupKey(file, name)) - metrics.groupRules.WithLabelValues(groupKey(file, name)).Set(float64(len(rules))) - metrics.groupInterval.WithLabelValues(groupKey(file, name)).Set(interval.Seconds()) + metrics.groupLastEvalTime.WithLabelValues(groupKey(o.File, o.Name)) + metrics.groupLastDuration.WithLabelValues(groupKey(o.File, o.Name)) + metrics.groupRules.WithLabelValues(groupKey(o.File, o.Name)).Set(float64(len(o.Rules))) + metrics.groupInterval.WithLabelValues(groupKey(o.File, o.Name)).Set(o.Interval.Seconds()) return &Group{ - name: name, - file: file, - interval: interval, - rules: rules, - shouldRestore: shouldRestore, - opts: opts, - seriesInPreviousEval: make([]map[string]labels.Labels, len(rules)), - done: make(chan struct{}), + name: o.Name, + file: o.File, + interval: o.Interval, + rules: o.Rules, + shouldRestore: o.ShouldRestore, + opts: o.Opts, + seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), + done: make(chan bool), + managerDone: o.done, terminated: make(chan struct{}), - logger: log.With(opts.Logger, "group", name), + logger: log.With(o.Opts.Logger, "group", o.Name), metrics: metrics, } } @@ -314,6 +325,29 @@ func (g *Group) run(ctx context.Context) { tick := time.NewTicker(g.interval) defer tick.Stop() + makeStale := func(s bool) { + if !s { + return + } + go func(now time.Time) { + for _, rule := range g.seriesInPreviousEval { + for _, r := range rule { + g.staleSeries = append(g.staleSeries, r) + } + } + // That can be garbage collected at this point. + g.seriesInPreviousEval = nil + // Wait for 2 intervals to give the opportunity to renamed rules + // to insert new series in the tsdb. 
At this point if there is a + // renamed rule, it should already be started. + select { + case <-g.managerDone: + case <-time.After(2 * g.interval): + g.cleanupStaleSeries(now) + } + }(time.Now()) + } + iter() if g.shouldRestore { // If we have to restore, we wait for another Eval to finish. @@ -321,7 +355,8 @@ func (g *Group) run(ctx context.Context) { // we might not have enough data scraped, and recording rules would not // have updated the latest values, on which some alerts might depend. select { - case <-g.done: + case stale := <-g.done: + makeStale(stale) return case <-tick.C: missed := (time.Since(evalTimestamp) / g.interval) - 1 @@ -339,11 +374,13 @@ func (g *Group) run(ctx context.Context) { for { select { - case <-g.done: + case stale := <-g.done: + makeStale(stale) return default: select { - case <-g.done: + case stale := <-g.done: + makeStale(stale) return case <-tick.C: missed := (time.Since(evalTimestamp) / g.interval) - 1 @@ -358,6 +395,11 @@ func (g *Group) run(ctx context.Context) { } } +func (g *Group) stopAndMakeStale() { + g.done <- true + <-g.terminated +} + func (g *Group) stop() { close(g.done) <-g.terminated @@ -596,31 +638,35 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } }(i, rule) } + g.cleanupStaleSeries(ts) +} - if len(g.staleSeries) != 0 { - app, err := g.opts.Appendable.Appender() - if err != nil { - level.Warn(g.logger).Log("msg", "creating appender failed", "err", err) - return - } - for _, s := range g.staleSeries { - // Rule that produced series no longer configured, mark it stale. - _, err = app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) - switch err { - case nil: - case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: - // Do not count these in logging, as this is expected if series - // is exposed from a different rule. - default: - level.Warn(g.logger).Log("msg", "adding stale sample for previous configuration failed", "sample", s, "err", err) - } - } - if err := app.Commit(); err != nil { - level.Warn(g.logger).Log("msg", "stale sample appending for previous configuration failed", "err", err) - } else { - g.staleSeries = nil +func (g *Group) cleanupStaleSeries(ts time.Time) { + if len(g.staleSeries) == 0 { + return + } + app, err := g.opts.Appendable.Appender() + if err != nil { + level.Warn(g.logger).Log("msg", "creating appender failed", "err", err) + return + } + for _, s := range g.staleSeries { + // Rule that produced series no longer configured, mark it stale. + _, err = app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) + switch err { + case nil: + case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: + // Do not count these in logging, as this is expected if series + // is exposed from a different rule. 
+ default: + level.Warn(g.logger).Log("msg", "adding stale sample for previous configuration failed", "sample", s, "err", err) } } + if err := app.Commit(); err != nil { + level.Warn(g.logger).Log("msg", "stale sample appending for previous configuration failed", "err", err) + } else { + g.staleSeries = nil + } } // RestoreForState restores the 'for' state of the alerts @@ -784,6 +830,7 @@ type Manager struct { groups map[string]*Group mtx sync.RWMutex block chan struct{} + done chan struct{} restored bool logger log.Logger @@ -825,6 +872,7 @@ func NewManager(o *ManagerOptions) *Manager { groups: map[string]*Group{}, opts: o, block: make(chan struct{}), + done: make(chan struct{}), logger: o.Logger, } @@ -848,6 +896,10 @@ func (m *Manager) Stop() { eg.stop() } + // Shut down the groups waiting multiple evaluation intervals to write + // staleness markers. + close(m.done) + level.Info(m.logger).Log("msg", "Rule manager stopped") } @@ -899,14 +951,18 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels } // Stop remaining old groups. + wg.Add(len(m.groups)) for n, oldg := range m.groups { - oldg.stop() - if m := oldg.metrics; m != nil { - m.groupInterval.DeleteLabelValues(n) - m.groupLastEvalTime.DeleteLabelValues(n) - m.groupLastDuration.DeleteLabelValues(n) - m.groupRules.DeleteLabelValues(n) - } + go func(n string, g *Group) { + g.stopAndMakeStale() + if m := g.metrics; m != nil { + m.groupInterval.DeleteLabelValues(n) + m.groupLastEvalTime.DeleteLabelValues(n) + m.groupLastDuration.DeleteLabelValues(n) + m.groupRules.DeleteLabelValues(n) + } + wg.Done() + }(n, oldg) } wg.Wait() @@ -962,7 +1018,15 @@ func (m *Manager) LoadGroups( )) } - groups[groupKey(fn, rg.Name)] = NewGroup(rg.Name, fn, itv, rules, shouldRestore, m.opts) + groups[groupKey(fn, rg.Name)] = NewGroup(GroupOptions{ + Name: rg.Name, + File: fn, + Interval: itv, + Rules: rules, + ShouldRestore: shouldRestore, + Opts: m.opts, + done: m.done, + }) } } diff --git a/rules/manager_test.go b/rules/manager_test.go index 1f62009e0..61b6cf570 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -376,7 +376,13 @@ func TestForStateRestore(t *testing.T) { nil, nil, true, nil, ) - group := NewGroup("default", "", time.Second, []Rule{rule}, true, opts) + group := NewGroup(GroupOptions{ + Name: "default", + Interval: time.Second, + Rules: []Rule{rule}, + ShouldRestore: true, + Opts: opts, + }) groups := make(map[string]*Group) groups["default;"] = group @@ -435,7 +441,13 @@ func TestForStateRestore(t *testing.T) { labels.FromStrings("severity", "critical"), nil, nil, false, nil, ) - newGroup := NewGroup("default", "", time.Second, []Rule{newRule}, true, opts) + newGroup := NewGroup(GroupOptions{ + Name: "default", + Interval: time.Second, + Rules: []Rule{newRule}, + ShouldRestore: true, + Opts: opts, + }) newGroups := make(map[string]*Group) newGroups["default;"] = newGroup @@ -519,7 +531,13 @@ func TestStaleness(t *testing.T) { expr, err := promql.ParseExpr("a + 1") testutil.Ok(t, err) rule := NewRecordingRule("a_plus_one", expr, labels.Labels{}) - group := NewGroup("default", "", time.Second, []Rule{rule}, true, opts) + group := NewGroup(GroupOptions{ + Name: "default", + Interval: time.Second, + Rules: []Rule{rule}, + ShouldRestore: true, + Opts: opts, + }) // A time series that has two samples and then goes stale. 
app, _ := storage.Appender() @@ -842,7 +860,13 @@ func TestNotify(t *testing.T) { expr, err := promql.ParseExpr("a > 1") testutil.Ok(t, err) rule := NewAlertingRule("aTooHigh", expr, 0, labels.Labels{}, labels.Labels{}, nil, true, log.NewNopLogger()) - group := NewGroup("alert", "", time.Second, []Rule{rule}, true, opts) + group := NewGroup(GroupOptions{ + Name: "alert", + Interval: time.Second, + Rules: []Rule{rule}, + ShouldRestore: true, + Opts: opts, + }) app, _ := storage.Appender() app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2) @@ -948,3 +972,152 @@ func TestMetricsUpdate(t *testing.T) { testutil.Equals(t, c.metrics, countMetrics(), "test %d: invalid count of metrics", i) } } + +func TestGroupStalenessOnRemoval(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + files := []string{"fixtures/rules2.yaml"} + sameFiles := []string{"fixtures/rules2_copy.yaml"} + + storage := teststorage.New(t) + defer storage.Close() + opts := promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + engine := promql.NewEngine(opts) + ruleManager := NewManager(&ManagerOptions{ + Appendable: storage, + TSDB: storage, + QueryFunc: EngineQueryFunc(engine, storage), + Context: context.Background(), + Logger: log.NewNopLogger(), + }) + var stopped bool + ruleManager.Run() + defer func() { + if !stopped { + ruleManager.Stop() + } + }() + + cases := []struct { + files []string + staleNaN int + }{ + { + files: files, + staleNaN: 0, + }, + { + // When we remove the files, it should produce a staleness marker. + files: files[:0], + staleNaN: 1, + }, + { + // Rules that produce the same metrics but in a different file + // should not produce staleness marker. + files: sameFiles, + staleNaN: 0, + }, + { + // Staleness marker should be present as we don't have any rules + // loaded anymore. + files: files[:0], + staleNaN: 1, + }, + { + // Add rules back so we have rules loaded when we stop the manager + // and check for the absence of staleness markers. 
+ files: sameFiles, + staleNaN: 0, + }, + } + + var totalStaleNaN int + for i, c := range cases { + err := ruleManager.Update(time.Second, c.files, nil) + testutil.Ok(t, err) + time.Sleep(3 * time.Second) + totalStaleNaN += c.staleNaN + testutil.Equals(t, totalStaleNaN, countStaleNaN(t, storage), "test %d/%q: invalid count of staleness markers", i, c.files) + } + ruleManager.Stop() + stopped = true + testutil.Equals(t, totalStaleNaN, countStaleNaN(t, storage), "invalid count of staleness markers after stopping the engine") +} + +func TestMetricsStalenessOnManagerShutdown(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + files := []string{"fixtures/rules2.yaml"} + + storage := teststorage.New(t) + defer storage.Close() + opts := promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + engine := promql.NewEngine(opts) + ruleManager := NewManager(&ManagerOptions{ + Appendable: storage, + TSDB: storage, + QueryFunc: EngineQueryFunc(engine, storage), + Context: context.Background(), + Logger: log.NewNopLogger(), + }) + var stopped bool + ruleManager.Run() + defer func() { + if !stopped { + ruleManager.Stop() + } + }() + + err := ruleManager.Update(2*time.Second, files, nil) + time.Sleep(4 * time.Second) + testutil.Ok(t, err) + start := time.Now() + err = ruleManager.Update(3*time.Second, files[:0], nil) + testutil.Ok(t, err) + ruleManager.Stop() + stopped = true + testutil.Assert(t, time.Since(start) < 1*time.Second, "rule manager does not stop early") + time.Sleep(5 * time.Second) + testutil.Equals(t, 0, countStaleNaN(t, storage), "invalid count of staleness markers after stopping the engine") +} + +func countStaleNaN(t *testing.T, storage storage.Storage) int { + var c int + querier, err := storage.Querier(context.Background(), 0, time.Now().Unix()*1000) + testutil.Ok(t, err) + defer querier.Close() + + matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2") + testutil.Ok(t, err) + + set, _, err := querier.Select(nil, matcher) + testutil.Ok(t, err) + + samples, err := readSeriesSet(set) + testutil.Ok(t, err) + + metric := labels.FromStrings(model.MetricNameLabel, "test_2").String() + metricSample, ok := samples[metric] + + testutil.Assert(t, ok, "Series %s not returned.", metric) + for _, s := range metricSample { + if value.IsStaleNaN(s.V) { + c++ + } + } + return c +} diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index bc4d71058..836daab1a 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -258,7 +258,14 @@ func (m rulesRetrieverMock) RuleGroups() []*rules.Group { recordingRule := rules.NewRecordingRule("recording-rule-1", recordingExpr, labels.Labels{}) r = append(r, recordingRule) - group := rules.NewGroup("grp", "/path/to/file", time.Second, r, false, opts) + group := rules.NewGroup(rules.GroupOptions{ + Name: "grp", + File: "/path/to/file", + Interval: time.Second, + Rules: r, + ShouldRestore: false, + Opts: opts, + }) return []*rules.Group{group} }
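
Note on the coordination pattern introduced above (not part of the patch): the value sent on the group's done channel now says whether the stopped group should mark its series stale, and the actual staleness write is deferred by two evaluation intervals so that a renamed rule in the new configuration can take the series over, or skipped entirely if the whole manager shuts down first. The following is a minimal, standalone Go sketch of that pattern with illustrative names only; it does not use the real Prometheus types or APIs.

    package main

    import (
    	"fmt"
    	"time"
    )

    // group mimics the shape of rules.Group just enough to show the
    // done/terminated/managerDone channel coordination from the patch.
    type group struct {
    	interval    time.Duration
    	done        chan bool     // value says whether to mark series stale
    	terminated  chan struct{} // closed once run() has fully exited
    	managerDone chan struct{} // closed when the whole manager stops
    }

    func (g *group) run() {
    	defer close(g.terminated)
    	tick := time.NewTicker(g.interval)
    	defer tick.Stop()

    	makeStale := func(stale bool) {
    		if !stale {
    			return
    		}
    		// Defer the write in the background so the group (and the reload)
    		// can finish immediately.
    		go func() {
    			select {
    			case <-g.managerDone:
    				// Manager is shutting down entirely; skip the markers.
    			case <-time.After(2 * g.interval):
    				fmt.Println("writing staleness markers for deleted rules")
    			}
    		}()
    	}

    	for {
    		select {
    		case stale := <-g.done:
    			makeStale(stale)
    			return
    		case <-tick.C:
    			fmt.Println("evaluating rules")
    		}
    	}
    }

    // stopAndMakeStale mirrors Group.stopAndMakeStale: request a stop that also
    // marks the group's series stale, then wait for the goroutine to finish.
    func (g *group) stopAndMakeStale() {
    	g.done <- true
    	<-g.terminated
    }

    func main() {
    	g := &group{
    		interval:    100 * time.Millisecond,
    		done:        make(chan bool),
    		terminated:  make(chan struct{}),
    		managerDone: make(chan struct{}),
    	}
    	go g.run()
    	time.Sleep(250 * time.Millisecond)

    	// A reload that drops this group: stop it and schedule staleness markers.
    	g.stopAndMakeStale()

    	// Leave enough time for the deferred write (2 * interval) to fire.
    	time.Sleep(300 * time.Millisecond)
    }

Because the write is deferred in a background goroutine, Manager.Update returns as soon as the old groups have exited, which is what the new tests rely on: TestGroupStalenessOnRemoval waits several intervals before counting markers, and TestMetricsStalenessOnManagerShutdown expects no markers at all when Stop follows the reload immediately.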