Browse Source

Merge pull request #2207 from weaveworks/expose-rules

Expose rule & group evaluation
pull/2164/head
Fabian Reinartz 8 years ago committed by GitHub
parent
commit
dbd6810ea0
  1. 4
      rules/alerting.go
  2. 37
      rules/manager.go
  3. 2
      rules/manager_test.go
  4. 4
      rules/recording.go
  5. 2
      rules/recording_test.go

4
rules/alerting.go

@ -146,9 +146,9 @@ func (r *AlertingRule) sample(alert *Alert, ts model.Time, set bool) *model.Samp
// is kept in memory state and consequentally repeatedly sent to the AlertManager.
const resolvedRetention = 15 * time.Minute
// eval evaluates the rule expression and then creates pending alerts and fires
// Eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly.
func (r *AlertingRule) eval(ctx context.Context, ts model.Time, engine *promql.Engine, externalURLPath string) (model.Vector, error) {
func (r *AlertingRule) Eval(ctx context.Context, ts model.Time, engine *promql.Engine, externalURLPath string) (model.Vector, error) {
query, err := engine.NewInstantQuery(r.vector.String(), ts)
if err != nil {
return nil, err

37
rules/manager.go

@ -106,7 +106,7 @@ const (
type Rule interface {
Name() string
// eval evaluates the rule, including any associated recording or alerting actions.
eval(context.Context, model.Time, *promql.Engine, string) (model.Vector, error)
Eval(context.Context, model.Time, *promql.Engine, string) (model.Vector, error)
// String returns a human-readable string representation of the rule.
String() string
// HTMLSnippet returns a human-readable string representation of the rule,
@ -125,9 +125,12 @@ type Group struct {
terminated chan struct{}
}
func newGroup(name string, opts *ManagerOptions) *Group {
// NewGroup makes a new Group with the given name, options, and rules.
func NewGroup(name string, interval time.Duration, rules []Rule, opts *ManagerOptions) *Group {
return &Group{
name: name,
interval: interval,
rules: rules,
opts: opts,
done: make(chan struct{}),
terminated: make(chan struct{}),
@ -151,7 +154,7 @@ func (g *Group) run() {
return
}
start := time.Now()
g.eval()
g.Eval()
iterationDuration.Observe(time.Since(start).Seconds())
}
@ -234,10 +237,10 @@ func typeForRule(r Rule) ruleType {
panic(fmt.Errorf("unknown rule type: %T", r))
}
// eval runs a single evaluation cycle in which all rules are evaluated in parallel.
// Eval runs a single evaluation cycle in which all rules are evaluated in parallel.
// In the future a single group will be evaluated sequentially to properly handle
// rule dependency.
func (g *Group) eval() {
func (g *Group) Eval() {
var (
now = model.Now()
wg sync.WaitGroup
@ -257,7 +260,7 @@ func (g *Group) eval() {
evalTotal.WithLabelValues(rtyp).Inc()
vector, err := rule.eval(g.opts.Context, now, g.opts.QueryEngine, g.opts.ExternalURL.Path)
vector, err := rule.Eval(g.opts.Context, now, g.opts.QueryEngine, g.opts.ExternalURL.Path)
if err != nil {
// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.
@ -394,7 +397,8 @@ func (m *Manager) ApplyConfig(conf *config.Config) error {
files = append(files, fs...)
}
groups, err := m.loadGroups(files...)
// To be replaced with a configurable per-group interval.
groups, err := m.loadGroups(time.Duration(conf.GlobalConfig.EvaluationInterval), files...)
if err != nil {
return fmt.Errorf("error loading rules, previous rule set restored: %s", err)
}
@ -402,9 +406,6 @@ func (m *Manager) ApplyConfig(conf *config.Config) error {
var wg sync.WaitGroup
for _, newg := range groups {
// To be replaced with a configurable per-group interval.
newg.interval = time.Duration(conf.GlobalConfig.EvaluationInterval)
wg.Add(1)
// If there is an old group with the same identifier, stop it and wait for
@ -442,14 +443,8 @@ func (m *Manager) ApplyConfig(conf *config.Config) error {
// loadGroups reads groups from a list of files.
// As there's currently no group syntax a single group named "default" containing
// all rules will be returned.
func (m *Manager) loadGroups(filenames ...string) (map[string]*Group, error) {
groups := map[string]*Group{}
// Currently there is no group syntax implemented. Thus all rules
// are read into a single default group.
g := newGroup("default", m.opts)
groups[g.name] = g
func (m *Manager) loadGroups(interval time.Duration, filenames ...string) (map[string]*Group, error) {
rules := []Rule{}
for _, fn := range filenames {
content, err := ioutil.ReadFile(fn)
if err != nil {
@ -473,10 +468,14 @@ func (m *Manager) loadGroups(filenames ...string) (map[string]*Group, error) {
default:
panic("retrieval.Manager.LoadRuleFiles: unknown statement type")
}
g.rules = append(g.rules, rule)
rules = append(rules, rule)
}
}
// Currently there is no group syntax implemented. Thus all rules
// are read into a single default group.
g := NewGroup("default", interval, rules, m.opts)
groups := map[string]*Group{g.name: g}
return groups, nil
}

2
rules/manager_test.go

@ -105,7 +105,7 @@ func TestAlertingRule(t *testing.T) {
for i, test := range tests {
evalTime := model.Time(0).Add(test.time)
res, err := rule.eval(suite.Context(), evalTime, suite.QueryEngine(), "")
res, err := rule.Eval(suite.Context(), evalTime, suite.QueryEngine(), "")
if err != nil {
t.Fatalf("Error during alerting rule evaluation: %s", err)
}

4
rules/recording.go

@ -45,8 +45,8 @@ func (rule RecordingRule) Name() string {
return rule.name
}
// eval evaluates the rule and then overrides the metric names and labels accordingly.
func (rule RecordingRule) eval(ctx context.Context, timestamp model.Time, engine *promql.Engine, _ string) (model.Vector, error) {
// Eval evaluates the rule and then overrides the metric names and labels accordingly.
func (rule RecordingRule) Eval(ctx context.Context, timestamp model.Time, engine *promql.Engine, _ string) (model.Vector, error) {
query, err := engine.NewInstantQuery(rule.vector.String(), timestamp)
if err != nil {
return nil, err

2
rules/recording_test.go

@ -63,7 +63,7 @@ func TestRuleEval(t *testing.T) {
for _, test := range suite {
rule := NewRecordingRule(test.name, test.expr, test.labels)
result, err := rule.eval(ctx, now, engine, "")
result, err := rule.Eval(ctx, now, engine, "")
if err != nil {
t.Fatalf("Error evaluating %s", test.name)
}

Loading…
Cancel
Save