rules: decouple notifier packages

The dependency on the notifier packages caused a transitive dependency
on discovery and with that all client libraries our service discovery
uses.
pull/3507/head
Fabian Reinartz 7 years ago
parent 4d964a0a0d
commit 62461379b7

@ -50,6 +50,7 @@ import (
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/storage/tsdb" "github.com/prometheus/prometheus/storage/tsdb"
"github.com/prometheus/prometheus/util/strutil"
"github.com/prometheus/prometheus/web" "github.com/prometheus/prometheus/web"
) )
@ -236,8 +237,8 @@ func main() {
ruleManager := rules.NewManager(&rules.ManagerOptions{ ruleManager := rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage, Appendable: fanoutStorage,
Notifier: notifier, QueryFunc: rules.EngineQueryFunc(queryEngine),
Query: rules.EngineQueryFunc(queryEngine), NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()),
Context: ctx, Context: ctx,
ExternalURL: cfg.web.ExternalURL, ExternalURL: cfg.web.ExternalURL,
Logger: log.With(logger, "component", "rule manager"), Logger: log.With(logger, "component", "rule manager"),
@ -552,3 +553,33 @@ func computeExternalURL(u, listenAddr string) (*url.URL, error) {
return eu, nil return eu, nil
} }
// sendAlerts implements a the rules.NotifyFunc for a Notifier.
// It filters any non-firing alerts from the input.
func sendAlerts(n *notifier.Notifier, externalURL string) rules.NotifyFunc {
return func(ctx context.Context, expr string, alerts ...*rules.Alert) error {
var res []*notifier.Alert
for _, alert := range alerts {
// Only send actually firing alerts.
if alert.State == rules.StatePending {
continue
}
a := &notifier.Alert{
StartsAt: alert.FiredAt,
Labels: alert.Labels,
Annotations: alert.Annotations,
GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
}
if !alert.ResolvedAt.IsZero() {
a.EndsAt = alert.ResolvedAt
}
res = append(res, a)
}
if len(alerts) > 0 {
n.Send(res...)
}
return nil
}
}

@ -83,7 +83,9 @@ type Alert struct {
Value float64 Value float64
// The interval during which the condition of this alert held true. // The interval during which the condition of this alert held true.
// ResolvedAt will be 0 to indicate a still active alert. // ResolvedAt will be 0 to indicate a still active alert.
ActiveAt, ResolvedAt time.Time ActiveAt time.Time
FiredAt time.Time
ResolvedAt time.Time
} }
// An AlertingRule generates alerts from its vector expression. // An AlertingRule generates alerts from its vector expression.
@ -264,12 +266,14 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc,
if a.State != StateInactive { if a.State != StateInactive {
a.State = StateInactive a.State = StateInactive
a.ResolvedAt = ts a.ResolvedAt = ts
a.FiredAt = time.Time{}
} }
continue continue
} }
if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration { if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {
a.State = StateFiring a.State = StateFiring
a.FiredAt = ts
} }
vec = append(vec, r.sample(a, ts)) vec = append(vec, r.sample(a, ts))

@ -29,14 +29,12 @@ import (
"github.com/go-kit/kit/log/level" "github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/rulefmt" "github.com/prometheus/prometheus/pkg/rulefmt"
"github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/strutil"
) )
// Constants for instrumentation. // Constants for instrumentation.
@ -192,7 +190,7 @@ func (g *Group) File() string { return g.file }
// Rules returns the group's rules. // Rules returns the group's rules.
func (g *Group) Rules() []Rule { return g.rules } func (g *Group) Rules() []Rule { return g.rules }
func (g *Group) run() { func (g *Group) run(ctx context.Context) {
defer close(g.terminated) defer close(g.terminated)
// Wait an initial amount to have consistently slotted intervals. // Wait an initial amount to have consistently slotted intervals.
@ -206,7 +204,7 @@ func (g *Group) run() {
iterationsScheduled.Inc() iterationsScheduled.Inc()
start := time.Now() start := time.Now()
g.Eval(start) g.Eval(ctx, start)
iterationDuration.Observe(time.Since(start).Seconds()) iterationDuration.Observe(time.Since(start).Seconds())
g.SetEvaluationTime(time.Since(start)) g.SetEvaluationTime(time.Since(start))
@ -328,7 +326,7 @@ func typeForRule(r Rule) ruleType {
} }
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially. // Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ts time.Time) { func (g *Group) Eval(ctx context.Context, ts time.Time) {
for i, rule := range g.rules { for i, rule := range g.rules {
select { select {
case <-g.done: case <-g.done:
@ -346,7 +344,7 @@ func (g *Group) Eval(ts time.Time) {
evalTotal.WithLabelValues(rtyp).Inc() evalTotal.WithLabelValues(rtyp).Inc()
vector, err := rule.Eval(g.opts.Context, ts, g.opts.Query, g.opts.ExternalURL) vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
if err != nil { if err != nil {
// Canceled queries are intentional termination of queries. This normally // Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here. // happens on shutdown and thus we skip logging of any errors here.
@ -358,7 +356,7 @@ func (g *Group) Eval(ts time.Time) {
} }
if ar, ok := rule.(*AlertingRule); ok { if ar, ok := rule.(*AlertingRule); ok {
g.sendAlerts(ar) g.opts.NotifyFunc(ctx, ar.vector.String(), ar.currentAlerts()...)
} }
var ( var (
numOutOfOrder = 0 numOutOfOrder = 0
@ -418,36 +416,6 @@ func (g *Group) Eval(ts time.Time) {
} }
} }
// sendAlerts sends alert notifications for the given rule.
func (g *Group) sendAlerts(rule *AlertingRule) error {
var alerts []*notifier.Alert
for _, alert := range rule.currentAlerts() {
// Only send actually firing alerts.
if alert.State == StatePending {
continue
}
a := &notifier.Alert{
StartsAt: alert.ActiveAt.Add(rule.holdDuration),
Labels: alert.Labels,
Annotations: alert.Annotations,
GeneratorURL: g.opts.ExternalURL.String() + strutil.TableLinkForExpression(rule.vector.String()),
}
if !alert.ResolvedAt.IsZero() {
a.EndsAt = alert.ResolvedAt
}
alerts = append(alerts, a)
}
if len(alerts) > 0 {
g.opts.Notifier.Send(alerts...)
}
return nil
}
// The Manager manages recording and alerting rules. // The Manager manages recording and alerting rules.
type Manager struct { type Manager struct {
opts *ManagerOptions opts *ManagerOptions
@ -463,12 +431,15 @@ type Appendable interface {
Appender() (storage.Appender, error) Appender() (storage.Appender, error)
} }
// NotifyFunc sends notifications about a set of alerts generated by the given expression.
type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) error
// ManagerOptions bundles options for the Manager. // ManagerOptions bundles options for the Manager.
type ManagerOptions struct { type ManagerOptions struct {
ExternalURL *url.URL ExternalURL *url.URL
Query QueryFunc QueryFunc QueryFunc
NotifyFunc NotifyFunc
Context context.Context Context context.Context
Notifier *notifier.Notifier
Appendable Appendable Appendable Appendable
Logger log.Logger Logger log.Logger
} }
@ -539,7 +510,7 @@ func (m *Manager) Update(interval time.Duration, files []string) error {
// is told to run. This is necessary to avoid running // is told to run. This is necessary to avoid running
// queries against a bootstrapping storage. // queries against a bootstrapping storage.
<-m.block <-m.block
newg.run() newg.run(m.opts.Context)
}() }()
wg.Done() wg.Done()
}(newg) }(newg)

@ -176,7 +176,7 @@ func TestStaleness(t *testing.T) {
defer storage.Close() defer storage.Close()
engine := promql.NewEngine(storage, nil) engine := promql.NewEngine(storage, nil)
opts := &ManagerOptions{ opts := &ManagerOptions{
Query: EngineQueryFunc(engine), QueryFunc: EngineQueryFunc(engine),
Appendable: storage, Appendable: storage,
Context: context.Background(), Context: context.Background(),
Logger: log.NewNopLogger(), Logger: log.NewNopLogger(),
@ -196,10 +196,12 @@ func TestStaleness(t *testing.T) {
err = app.Commit() err = app.Commit()
testutil.Ok(t, err) testutil.Ok(t, err)
ctx := context.Background()
// Execute 3 times, 1 second apart. // Execute 3 times, 1 second apart.
group.Eval(time.Unix(0, 0)) group.Eval(ctx, time.Unix(0, 0))
group.Eval(time.Unix(1, 0)) group.Eval(ctx, time.Unix(1, 0))
group.Eval(time.Unix(2, 0)) group.Eval(ctx, time.Unix(2, 0))
querier, err := storage.Querier(context.Background(), 0, 2000) querier, err := storage.Querier(context.Background(), 0, 2000)
testutil.Ok(t, err) testutil.Ok(t, err)
@ -296,11 +298,8 @@ func TestUpdate(t *testing.T) {
"test": labels.FromStrings("name", "value"), "test": labels.FromStrings("name", "value"),
} }
ruleManager := NewManager(&ManagerOptions{ ruleManager := NewManager(&ManagerOptions{
Appendable: nil, Context: context.Background(),
Notifier: nil, Logger: log.NewNopLogger(),
Query: nil,
Context: context.Background(),
Logger: log.NewNopLogger(),
}) })
ruleManager.Run() ruleManager.Run()

Loading…
Cancel
Save