From 62461379b75bba14a8fda503aa6de823c23e9c23 Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Fri, 24 Nov 2017 08:59:05 +0100
Subject: [PATCH] rules: decouple notifier packages

The dependency on the notifier packages caused a transitive dependency
on discovery and with that all client libraries our service discovery
uses.
---
 cmd/prometheus/main.go | 35 +++++++++++++++++++++++++++--
 rules/alerting.go      |  6 ++++-
 rules/manager.go       | 51 +++++++++---------------------------
 rules/manager_test.go  | 17 +++++++-------
 4 files changed, 57 insertions(+), 52 deletions(-)

diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 01bf1710f..fc7951e5f 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -50,6 +50,7 @@ import (
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/storage/remote"
 	"github.com/prometheus/prometheus/storage/tsdb"
+	"github.com/prometheus/prometheus/util/strutil"
 	"github.com/prometheus/prometheus/web"
 )
 
@@ -236,8 +237,8 @@ func main() {
 
 	ruleManager := rules.NewManager(&rules.ManagerOptions{
 		Appendable:  fanoutStorage,
-		Notifier:    notifier,
-		Query:       rules.EngineQueryFunc(queryEngine),
+		QueryFunc:   rules.EngineQueryFunc(queryEngine),
+		NotifyFunc:  sendAlerts(notifier, cfg.web.ExternalURL.String()),
 		Context:     ctx,
 		ExternalURL: cfg.web.ExternalURL,
 		Logger:      log.With(logger, "component", "rule manager"),
@@ -552,3 +553,33 @@ func computeExternalURL(u, listenAddr string) (*url.URL, error) {
 
 	return eu, nil
 }
+
+// sendAlerts implements the rules.NotifyFunc for a Notifier.
+// It filters any non-firing alerts from the input.
+func sendAlerts(n *notifier.Notifier, externalURL string) rules.NotifyFunc {
+	return func(ctx context.Context, expr string, alerts ...*rules.Alert) error {
+		var res []*notifier.Alert
+
+		for _, alert := range alerts {
+			// Only send actually firing alerts.
+			if alert.State == rules.StatePending {
+				continue
+			}
+			a := &notifier.Alert{
+				StartsAt:     alert.FiredAt,
+				Labels:       alert.Labels,
+				Annotations:  alert.Annotations,
+				GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
+			}
+			if !alert.ResolvedAt.IsZero() {
+				a.EndsAt = alert.ResolvedAt
+			}
+			res = append(res, a)
+		}
+
+		if len(alerts) > 0 {
+			n.Send(res...)
+		}
+		return nil
+	}
+}
diff --git a/rules/alerting.go b/rules/alerting.go
index 3b8b61e9f..0a2fe7448 100644
--- a/rules/alerting.go
+++ b/rules/alerting.go
@@ -83,7 +83,9 @@ type Alert struct {
 	Value float64
 	// The interval during which the condition of this alert held true.
 	// ResolvedAt will be 0 to indicate a still active alert.
-	ActiveAt, ResolvedAt time.Time
+	ActiveAt   time.Time
+	FiredAt    time.Time
+	ResolvedAt time.Time
 }
 
 // An AlertingRule generates alerts from its vector expression.
@@ -264,12 +266,14 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc,
 			if a.State != StateInactive {
 				a.State = StateInactive
 				a.ResolvedAt = ts
+				a.FiredAt = time.Time{}
 			}
 			continue
 		}
 
 		if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {
 			a.State = StateFiring
+			a.FiredAt = ts
 		}
 
 		vec = append(vec, r.sample(a, ts))
diff --git a/rules/manager.go b/rules/manager.go
index 742d0eb40..778bfe0c7 100644
--- a/rules/manager.go
+++ b/rules/manager.go
@@ -29,14 +29,12 @@ import (
 	"github.com/go-kit/kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"
 
-	"github.com/prometheus/prometheus/notifier"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/pkg/rulefmt"
 	"github.com/prometheus/prometheus/pkg/timestamp"
 	"github.com/prometheus/prometheus/pkg/value"
 	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/storage"
-	"github.com/prometheus/prometheus/util/strutil"
 )
 
 // Constants for instrumentation.
@@ -192,7 +190,7 @@ func (g *Group) File() string { return g.file }
 // Rules returns the group's rules.
 func (g *Group) Rules() []Rule { return g.rules }
 
-func (g *Group) run() {
+func (g *Group) run(ctx context.Context) {
 	defer close(g.terminated)
 
 	// Wait an initial amount to have consistently slotted intervals.
@@ -206,7 +204,7 @@ func (g *Group) run(ctx context.Context) {
 		iterationsScheduled.Inc()
 
 		start := time.Now()
-		g.Eval(start)
+		g.Eval(ctx, start)
 
 		iterationDuration.Observe(time.Since(start).Seconds())
 		g.SetEvaluationTime(time.Since(start))
@@ -328,7 +326,7 @@ func typeForRule(r Rule) ruleType {
 }
 
 // Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
-func (g *Group) Eval(ts time.Time) {
+func (g *Group) Eval(ctx context.Context, ts time.Time) {
 	for i, rule := range g.rules {
 		select {
 		case <-g.done:
@@ -346,7 +344,7 @@
 
 		evalTotal.WithLabelValues(rtyp).Inc()
 
-		vector, err := rule.Eval(g.opts.Context, ts, g.opts.Query, g.opts.ExternalURL)
+		vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
 		if err != nil {
 			// Canceled queries are intentional termination of queries. This normally
 			// happens on shutdown and thus we skip logging of any errors here.
@@ -358,7 +356,7 @@
 		}
 
 		if ar, ok := rule.(*AlertingRule); ok {
-			g.sendAlerts(ar)
+			g.opts.NotifyFunc(ctx, ar.vector.String(), ar.currentAlerts()...)
 		}
 		var (
 			numOutOfOrder = 0
@@ -418,36 +416,6 @@
 	}
 }
 
-// sendAlerts sends alert notifications for the given rule.
-func (g *Group) sendAlerts(rule *AlertingRule) error {
-	var alerts []*notifier.Alert
-
-	for _, alert := range rule.currentAlerts() {
-		// Only send actually firing alerts.
-		if alert.State == StatePending {
-			continue
-		}
-
-		a := &notifier.Alert{
-			StartsAt:     alert.ActiveAt.Add(rule.holdDuration),
-			Labels:       alert.Labels,
-			Annotations:  alert.Annotations,
-			GeneratorURL: g.opts.ExternalURL.String() + strutil.TableLinkForExpression(rule.vector.String()),
-		}
-		if !alert.ResolvedAt.IsZero() {
-			a.EndsAt = alert.ResolvedAt
-		}
-
-		alerts = append(alerts, a)
-	}
-
-	if len(alerts) > 0 {
-		g.opts.Notifier.Send(alerts...)
-	}
-
-	return nil
-}
-
 // The Manager manages recording and alerting rules.
 type Manager struct {
 	opts *ManagerOptions
@@ -463,12 +431,15 @@ type Appendable interface {
 	Appender() (storage.Appender, error)
 }
 
+// NotifyFunc sends notifications about a set of alerts generated by the given expression.
+type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) error
+
 // ManagerOptions bundles options for the Manager.
 type ManagerOptions struct {
 	ExternalURL *url.URL
-	Query       QueryFunc
+	QueryFunc   QueryFunc
+	NotifyFunc  NotifyFunc
 	Context     context.Context
-	Notifier    *notifier.Notifier
 	Appendable  Appendable
 	Logger      log.Logger
 }
@@ -539,7 +510,7 @@ func (m *Manager) Update(interval time.Duration, files []string) error {
 				// is told to run. This is necessary to avoid running
 				// queries against a bootstrapping storage.
 				<-m.block
-				newg.run()
+				newg.run(m.opts.Context)
 			}()
 			wg.Done()
 		}(newg)
diff --git a/rules/manager_test.go b/rules/manager_test.go
index 33ad84fe8..30e38e922 100644
--- a/rules/manager_test.go
+++ b/rules/manager_test.go
@@ -176,7 +176,7 @@ func TestStaleness(t *testing.T) {
 	defer storage.Close()
 	engine := promql.NewEngine(storage, nil)
 	opts := &ManagerOptions{
-		Query:      EngineQueryFunc(engine),
+		QueryFunc:  EngineQueryFunc(engine),
 		Appendable: storage,
 		Context:    context.Background(),
 		Logger:     log.NewNopLogger(),
@@ -196,10 +196,12 @@ func TestStaleness(t *testing.T) {
 	err = app.Commit()
 	testutil.Ok(t, err)
 
+	ctx := context.Background()
+
 	// Execute 3 times, 1 second apart.
-	group.Eval(time.Unix(0, 0))
-	group.Eval(time.Unix(1, 0))
-	group.Eval(time.Unix(2, 0))
+	group.Eval(ctx, time.Unix(0, 0))
+	group.Eval(ctx, time.Unix(1, 0))
+	group.Eval(ctx, time.Unix(2, 0))
 
 	querier, err := storage.Querier(context.Background(), 0, 2000)
 	testutil.Ok(t, err)
@@ -296,11 +298,8 @@ func TestUpdate(t *testing.T) {
 		"test": labels.FromStrings("name", "value"),
 	}
 	ruleManager := NewManager(&ManagerOptions{
-		Appendable: nil,
-		Notifier:   nil,
-		Query:      nil,
-		Context:    context.Background(),
-		Logger:     log.NewNopLogger(),
+		Context: context.Background(),
+		Logger:  log.NewNopLogger(),
 	})
 	ruleManager.Run()
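
Note on usage: the patch replaces the hard dependency on *notifier.Notifier with the rules.NotifyFunc hook, so embedders and tests can supply their own delivery path. A minimal sketch of a custom NotifyFunc under the new API follows; it is not part of the patch, and logNotifier with its log output is an illustrative assumption rather than code from this commit.

package example

import (
	"context"
	"log"

	"github.com/prometheus/prometheus/rules"
)

// logNotifier satisfies the rules.NotifyFunc signature introduced by this
// patch and simply logs firing alerts instead of forwarding them anywhere.
func logNotifier(ctx context.Context, expr string, alerts ...*rules.Alert) error {
	for _, a := range alerts {
		// Mirror the patch's filtering: pending alerts are not sent.
		if a.State == rules.StatePending {
			continue
		}
		log.Printf("alert %s on expression %q firing since %v", a.Labels, expr, a.FiredAt)
	}
	return nil
}

Such a function would be wired in through the new field, e.g. rules.NewManager(&rules.ManagerOptions{NotifyFunc: logNotifier, ...}), the same way cmd/prometheus/main.go now passes sendAlerts(notifier, ...).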