prometheus/rules/manager.go


// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rules

import (
	"context"
	"errors"
	"fmt"
	"net/url"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	"golang.org/x/exp/slices"
	"golang.org/x/sync/semaphore"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/rulefmt"
	"github.com/prometheus/prometheus/notifier"
	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/promql/parser"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/util/strutil"
)

// QueryFunc processes PromQL queries.
type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector, error)

// EngineQueryFunc returns a new query function that executes instant queries against
// the given engine.
// It converts scalar results into vector results.
func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc {
	return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
		q, err := engine.NewInstantQuery(ctx, q, nil, qs, t)
		if err != nil {
			return nil, err
		}
		res := q.Exec(ctx)
		if res.Err != nil {
			return nil, res.Err
		}
		switch v := res.Value.(type) {
		case promql.Vector:
			return v, nil
		case promql.Scalar:
			return promql.Vector{promql.Sample{
				T:      v.T,
				F:      v.V,
				Metric: labels.Labels{},
			}}, nil
		default:
			return nil, errors.New("rule result is not a vector or scalar")
		}
	}
}
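
A minimal sketch of how the resulting QueryFunc is used from a consumer's point of view. The engine limits are arbitrary illustrative values, and `runQuery` is a hypothetical helper; `db` can be any storage.Queryable (e.g. a TSDB handle):

```go
package example

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/rules"
	"github.com/prometheus/prometheus/storage"
)

// runQuery executes one instant query through a QueryFunc.
func runQuery(db storage.Queryable) error {
	engine := promql.NewEngine(promql.EngineOpts{
		MaxSamples: 50_000_000, // illustrative limits
		Timeout:    2 * time.Minute,
	})
	qf := rules.EngineQueryFunc(engine, db)

	vec, err := qf(context.Background(), `up == 0`, time.Now())
	if err != nil {
		return err
	}
	for _, s := range vec {
		fmt.Println(s.Metric, s.F) // F holds the float value after the FPoint/HPoint split
	}
	return nil
}
```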

// DefaultEvalIterationFunc is the default implementation of
// GroupEvalIterationFunc that is periodically invoked to evaluate the rules
// in a group at a given point in time and updates Group state and metrics
// accordingly. Custom GroupEvalIterationFunc implementations are recommended
// to invoke this function as well, to ensure correct Group state and metrics
// are maintained.
func DefaultEvalIterationFunc(ctx context.Context, g *Group, evalTimestamp time.Time) {
	g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc()

	start := time.Now()
	g.Eval(ctx, evalTimestamp)
	timeSinceStart := time.Since(start)

	g.metrics.IterationDuration.Observe(timeSinceStart.Seconds())
	g.setEvaluationTime(timeSinceStart)
	g.setLastEvaluation(start)
	g.setLastEvalTimestamp(evalTimestamp)
}
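
As the comment above recommends, a custom iteration function should delegate to DefaultEvalIterationFunc so Group state and metrics stay correct. A sketch from a consumer's perspective; `timedIteration` and its log line are illustrative, not part of this file:

```go
package example

import (
	"context"
	"log"
	"time"

	"github.com/prometheus/prometheus/rules"
)

// timedIteration delegates to DefaultEvalIterationFunc and only adds a
// wall-clock log line around each evaluation.
func timedIteration(ctx context.Context, g *rules.Group, evalTimestamp time.Time) {
	start := time.Now()
	rules.DefaultEvalIterationFunc(ctx, g, evalTimestamp)
	log.Printf("group %q evaluated in %s", g.Name(), time.Since(start))
}
```

Such a function is handed to Manager.Update (shown further down); passing nil there appears to fall back to the default iteration function, which is handled outside this file.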

// The Manager manages recording and alerting rules.
type Manager struct {
	opts     *ManagerOptions
	groups   map[string]*Group
	mtx      sync.RWMutex
	block    chan struct{}
	done     chan struct{}
	restored bool

	logger log.Logger
}

// NotifyFunc sends notifications about a set of alerts generated by the given expression.
type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)

// ManagerOptions bundles options for the Manager.
type ManagerOptions struct {
	ExternalURL               *url.URL
	QueryFunc                 QueryFunc
	NotifyFunc                NotifyFunc
	Context                   context.Context
	Appendable                storage.Appendable
	Queryable                 storage.Queryable
	Logger                    log.Logger
	Registerer                prometheus.Registerer
	OutageTolerance           time.Duration
	ForGracePeriod            time.Duration
	ResendDelay               time.Duration
	GroupLoader               GroupLoader
	MaxConcurrentEvals        int64
	ConcurrentEvalsEnabled    bool
	RuleConcurrencyController RuleConcurrencyController
	RuleDependencyController  RuleDependencyController

	Metrics *Metrics
}

// NewManager returns an implementation of Manager, ready to be started
// by calling the Run method.
func NewManager(o *ManagerOptions) *Manager {
	if o.Metrics == nil {
		o.Metrics = NewGroupMetrics(o.Registerer)
	}

	if o.GroupLoader == nil {
		o.GroupLoader = FileLoader{}
	}

	if o.RuleConcurrencyController == nil {
		if o.ConcurrentEvalsEnabled {
			o.RuleConcurrencyController = newRuleConcurrencyController(o.MaxConcurrentEvals)
		} else {
			o.RuleConcurrencyController = sequentialRuleEvalController{}
		}
	}

	if o.RuleDependencyController == nil {
		o.RuleDependencyController = ruleDependencyController{}
	}

	m := &Manager{
		groups: map[string]*Group{},
		opts:   o,
		block:  make(chan struct{}),
		done:   make(chan struct{}),
		logger: o.Logger,
	}

	return m
}
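
Pulling the pieces together, a sketch of constructing and starting a Manager from outside the package. The engine/storage handles, the `rules.yaml` path, and the no-op NotifyFunc are illustrative assumptions:

```go
package example

import (
	"context"
	"os"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/rules"
	"github.com/prometheus/prometheus/storage"
)

// startRuleManager builds a Manager with minimal options, loads one rule
// file, and starts evaluation. Run blocks, hence the goroutine.
func startRuleManager(st storage.Storage, engine *promql.Engine) (*rules.Manager, error) {
	m := rules.NewManager(&rules.ManagerOptions{
		Appendable: st,
		Queryable:  st,
		QueryFunc:  rules.EngineQueryFunc(engine, st),
		NotifyFunc: func(_ context.Context, _ string, _ ...*rules.Alert) {}, // no-op for the sketch
		Context:    context.Background(),
		Logger:     log.NewLogfmtLogger(os.Stderr),
		Registerer: prometheus.DefaultRegisterer,
	})

	// Groups loaded by Update wait on the block channel until Run is called.
	// Passing nil appears to select the default iteration function (handled
	// outside this file).
	if err := m.Update(time.Minute, []string{"rules.yaml"}, labels.EmptyLabels(), "", nil); err != nil {
		return nil, err
	}
	go m.Run()
	return m, nil
}
```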

// Run starts processing of the rule manager. It is blocking.
func (m *Manager) Run() {
	level.Info(m.logger).Log("msg", "Starting rule manager...")
	m.start()
	<-m.done
}

func (m *Manager) start() {
	close(m.block)
}

// Stop the rule manager's rule evaluation cycles.
func (m *Manager) Stop() {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	level.Info(m.logger).Log("msg", "Stopping rule manager...")

	for _, eg := range m.groups {
		eg.stop()
	}

	// Shut down the groups waiting multiple evaluation intervals to write
	// staleness markers.
	close(m.done)

	level.Info(m.logger).Log("msg", "Rule manager stopped")
}
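
Since Run blocks on the done channel until Stop closes it, a typical consumer pairs the two like this. A minimal sketch; the signal handling is an illustrative choice:

```go
package example

import (
	"os"
	"os/signal"
	"syscall"

	"github.com/prometheus/prometheus/rules"
)

// runUntilSignal pairs Run and Stop: Run only returns once Stop has
// closed the manager's done channel.
func runUntilSignal(m *rules.Manager) {
	term := make(chan os.Signal, 1)
	signal.Notify(term, syscall.SIGTERM, syscall.SIGINT)

	go func() {
		<-term
		m.Stop()
	}()

	m.Run() // blocks until Stop is called
}
```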

// Update the rule manager's state as the config requires. If
// loading the new rules failed, the old rule set is restored.
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string, groupEvalIterationFunc GroupEvalIterationFunc) error {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...)
	if errs != nil {
		for _, e := range errs {
			level.Error(m.logger).Log("msg", "loading groups failed", "err", e)
		}
		return errors.New("error loading rules, previous rule set restored")
	}
	m.restored = true

	var wg sync.WaitGroup
	for _, newg := range groups {
		// If there is an old group with the same identifier,
		// check whether the new group equals the old one; if so, keep the
		// old group and skip the new one. Otherwise, stop the old group and
		// wait for it to finish the current iteration, then copy its state
		// into the new group.
		gn := GroupKey(newg.file, newg.name)
		oldg, ok := m.groups[gn]
		delete(m.groups, gn)

		if ok && oldg.Equals(newg) {
			groups[gn] = oldg
			continue
		}

		wg.Add(1)
		go func(newg *Group) {
			if ok {
				oldg.stop()
				newg.CopyState(oldg)
			}
			wg.Done()
			// Wait with starting evaluation until the rule manager
			// is told to run. This is necessary to avoid running
			// queries against a bootstrapping storage.
			<-m.block
			newg.run(m.opts.Context)
		}(newg)
	}

	// Stop remaining old groups.
	wg.Add(len(m.groups))
	for n, oldg := range m.groups {
		go func(n string, g *Group) {
			g.markStale = true
			g.stop()
			if m := g.metrics; m != nil {
				m.IterationsMissed.DeleteLabelValues(n)
				m.IterationsScheduled.DeleteLabelValues(n)
				m.EvalTotal.DeleteLabelValues(n)
				m.EvalFailures.DeleteLabelValues(n)
				m.GroupInterval.DeleteLabelValues(n)
				m.GroupLastEvalTime.DeleteLabelValues(n)
				m.GroupLastDuration.DeleteLabelValues(n)
				m.GroupRules.DeleteLabelValues(n)
				m.GroupSamples.DeleteLabelValues(n)
			}
			wg.Done()
		}(n, oldg)
	}

	wg.Wait()
	m.groups = groups

	return nil
}
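
A sketch of the usual reload loop built on Update. Because a failed Update leaves the previous groups running (and logs the errors itself, as shown above), a broken rules file never halts evaluation; `reloadOnSIGHUP` is a hypothetical helper:

```go
package example

import (
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/rules"
)

// reloadOnSIGHUP re-reads the rule files whenever SIGHUP arrives.
func reloadOnSIGHUP(m *rules.Manager, files []string) {
	hup := make(chan os.Signal, 1)
	signal.Notify(hup, syscall.SIGHUP)

	for range hup {
		// On failure, Update logs the errors and keeps the old rule set
		// running, so there is nothing to roll back here.
		_ = m.Update(time.Minute, files, labels.EmptyLabels(), "", nil)
	}
}
```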

// GroupLoader is responsible for loading rule groups from arbitrary sources and parsing them.
type GroupLoader interface {
	Load(identifier string) (*rulefmt.RuleGroups, []error)
	Parse(query string) (parser.Expr, error)
}

// FileLoader is the default GroupLoader implementation. It defers to rulefmt.ParseFile
// and parser.ParseExpr.
type FileLoader struct{}

func (FileLoader) Load(identifier string) (*rulefmt.RuleGroups, []error) {
	return rulefmt.ParseFile(identifier)
}

func (FileLoader) Parse(query string) (parser.Expr, error) { return parser.ParseExpr(query) }
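
GroupLoader makes the rule source pluggable. A sketch of a non-file loader whose identifiers are keys into an in-memory map, e.g. for rules fetched from an API or a KV store; `memLoader` is a hypothetical name, and the sketch assumes rulefmt.Parse accepts raw YAML bytes the way rulefmt.ParseFile does for files:

```go
package example

import (
	"github.com/prometheus/prometheus/model/rulefmt"
	"github.com/prometheus/prometheus/promql/parser"
	"github.com/prometheus/prometheus/rules"
)

// memLoader resolves identifiers against an in-memory map of rule file
// contents instead of the filesystem.
type memLoader struct {
	sources map[string]string
}

// Compile-time check that memLoader satisfies GroupLoader.
var _ rules.GroupLoader = memLoader{}

func (l memLoader) Load(identifier string) (*rulefmt.RuleGroups, []error) {
	return rulefmt.Parse([]byte(l.sources[identifier]))
}

func (l memLoader) Parse(query string) (parser.Expr, error) {
	return parser.ParseExpr(query)
}
```

A custom loader is wired in via ManagerOptions.GroupLoader; NewManager falls back to FileLoader when that field is nil, as shown above.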

// LoadGroups reads groups from a list of files.
func (m *Manager) LoadGroups(
	interval time.Duration, externalLabels labels.Labels, externalURL string, groupEvalIterationFunc GroupEvalIterationFunc, filenames ...string,
) (map[string]*Group, []error) {
	groups := make(map[string]*Group)

	shouldRestore := !m.restored

	for _, fn := range filenames {
		rgs, errs := m.opts.GroupLoader.Load(fn)
		if errs != nil {
			return nil, errs
		}

		for _, rg := range rgs.Groups {
			itv := interval
			if rg.Interval != 0 {
				itv = time.Duration(rg.Interval)
			}

			rules := make([]Rule, 0, len(rg.Rules))
			for _, r := range rg.Rules {
				expr, err := m.opts.GroupLoader.Parse(r.Expr.Value)
				if err != nil {
					return nil, []error{fmt.Errorf("%s: %w", fn, err)}
				}

				if r.Alert.Value != "" {
					rules = append(rules, NewAlertingRule(
						r.Alert.Value,
						expr,
						time.Duration(r.For),
						time.Duration(r.KeepFiringFor),
						labels.FromMap(r.Labels),
						labels.FromMap(r.Annotations),
						externalLabels,
						externalURL,
						m.restored,
						log.With(m.logger, "alert", r.Alert),
					))
					continue
				}
				rules = append(rules, NewRecordingRule(
					r.Record.Value,
					expr,
					labels.FromMap(r.Labels),
				))
			}

			// Check dependencies between rules and store the result on the Rule itself.
			m.opts.RuleDependencyController.AnalyseRules(rules)

			groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
				Name:              rg.Name,
				File:              fn,
				Interval:          itv,
				Limit:             rg.Limit,
				Rules:             rules,
				ShouldRestore:     shouldRestore,
				Opts:              m.opts,
				done:              m.done,
				EvalIterationFunc: groupEvalIterationFunc,
			})
		}
	}

	return groups, nil
}

// RuleGroups returns the list of manager's rule groups.
func (m *Manager) RuleGroups() []*Group {
	m.mtx.RLock()
	defer m.mtx.RUnlock()

	rgs := make([]*Group, 0, len(m.groups))
	for _, g := range m.groups {
		rgs = append(rgs, g)
	}

	slices.SortFunc(rgs, func(a, b *Group) int {
		fileCompare := strings.Compare(a.file, b.file)

		// If it's 0, the file names are the same.
		// Let's look at the group names in that case.
		if fileCompare != 0 {
			return fileCompare
		}
		return strings.Compare(a.name, b.name)
	})

	return rgs
}

// Rules returns the list of the manager's rules.
func (m *Manager) Rules() []Rule {
	m.mtx.RLock()
	defer m.mtx.RUnlock()

	var rules []Rule
	for _, g := range m.groups {
		rules = append(rules, g.rules...)
	}

	return rules
}

// AlertingRules returns the list of the manager's alerting rules.
func (m *Manager) AlertingRules() []*AlertingRule {
	alerts := []*AlertingRule{}
	for _, rule := range m.Rules() {
		if alertingRule, ok := rule.(*AlertingRule); ok {
			alerts = append(alerts, alertingRule)
		}
	}

	return alerts
}

// Sender knows how to send alert notifications.
type Sender interface {
	Send(alerts ...*notifier.Alert)
}

// SendAlerts implements the rules.NotifyFunc for a Notifier.
func SendAlerts(s Sender, externalURL string) NotifyFunc {
	return func(ctx context.Context, expr string, alerts ...*Alert) {
		var res []*notifier.Alert

		for _, alert := range alerts {
			a := &notifier.Alert{
				StartsAt:     alert.FiredAt,
				Labels:       alert.Labels,
				Annotations:  alert.Annotations,
				GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
			}
			if !alert.ResolvedAt.IsZero() {
				a.EndsAt = alert.ResolvedAt
			} else {
				a.EndsAt = alert.ValidUntil
			}
			res = append(res, a)
		}

		if len(alerts) > 0 {
			s.Send(res...)
		}
	}
}
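
Any type with a matching Send method satisfies Sender (the repo's notifier.Manager is the usual choice). A sketch with a custom sender; `logSender` and `newNotifyFunc` are hypothetical names for illustration:

```go
package example

import (
	"fmt"

	"github.com/prometheus/prometheus/notifier"
	"github.com/prometheus/prometheus/rules"
)

// logSender is an illustrative Sender that prints alerts instead of
// forwarding them to an Alertmanager; handy in tests.
type logSender struct{}

func (logSender) Send(alerts ...*notifier.Alert) {
	for _, a := range alerts {
		fmt.Println("alert:", a.Labels, "starts:", a.StartsAt, "ends:", a.EndsAt)
	}
}

// newNotifyFunc shows the wiring: the result is what goes into
// ManagerOptions.NotifyFunc.
func newNotifyFunc(externalURL string) rules.NotifyFunc {
	return rules.SendAlerts(logSender{}, externalURL)
}
```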

// RuleDependencyController controls whether a set of rules have dependencies between each other.
type RuleDependencyController interface {
	// AnalyseRules analyses dependencies between the input rules. For each rule that is guaranteed
	// not to have any dependents and/or dependencies, this function should call Rule.SetNoDependentRules(true)
	// and/or Rule.SetNoDependencyRules(true).
	AnalyseRules(rules []Rule)
}

type ruleDependencyController struct{}

// AnalyseRules implements RuleDependencyController.
func (c ruleDependencyController) AnalyseRules(rules []Rule) {
	depMap := buildDependencyMap(rules)
	for _, r := range rules {
		r.SetNoDependentRules(depMap.dependents(r) == 0)
		r.SetNoDependencyRules(depMap.dependencies(r) == 0)
	}
}

// RuleConcurrencyController controls concurrency for rules that are safe to be evaluated concurrently.
// Its purpose is to bound the amount of concurrency in rule evaluations to avoid overwhelming the Prometheus
// server with additional query load. Concurrency is controlled globally, not on a per-group basis.
type RuleConcurrencyController interface {
	// Allow determines whether any concurrent evaluation slots are available.
	// If Allow() returns true, then Done() must be called to release the acquired slot.
	Allow() bool

	// Done releases a concurrent evaluation slot.
	Done()
}
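
A sketch of the Allow/Done contract stated in the interface comment: every acquired slot must be released, typically via defer in the spawned goroutine. This is not the actual call site in Group.Eval, just the pattern; `evalMaybeConcurrently` and `eval` are illustrative stand-ins:

```go
package example

import "github.com/prometheus/prometheus/rules"

// evalMaybeConcurrently runs eval in a goroutine when a concurrency slot is
// available and falls back to synchronous evaluation otherwise.
func evalMaybeConcurrently(ctrl rules.RuleConcurrencyController, eval func()) {
	if ctrl.Allow() {
		go func() {
			defer ctrl.Done() // release the slot even if eval panics
			eval()
		}()
		return
	}
	eval() // no slot available: evaluate sequentially
}
```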

// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules.
type concurrentRuleEvalController struct {
	sema      *semaphore.Weighted
	depMapsMu sync.Mutex
	depMaps   map[*Group]dependencyMap
}

func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyController {
	return &concurrentRuleEvalController{
		sema:    semaphore.NewWeighted(maxConcurrency),
		depMaps: map[*Group]dependencyMap{},
	}
}

func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool {
	c.depMapsMu.Lock()
	defer c.depMapsMu.Unlock()

	depMap, found := c.depMaps[g]
	if !found {
		depMap = buildDependencyMap(g.rules)
		c.depMaps[g] = depMap
	}

	return depMap.isIndependent(r)
}

func (c *concurrentRuleEvalController) Allow() bool {
	return c.sema.TryAcquire(1)
}

func (c *concurrentRuleEvalController) Done() {
	c.sema.Release(1)
}

func (c *concurrentRuleEvalController) Invalidate() {
	c.depMapsMu.Lock()
	defer c.depMapsMu.Unlock()

	// Clear out the memoized dependency maps because some or all groups may have been updated.
	c.depMaps = map[*Group]dependencyMap{}
}

// sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially.
type sequentialRuleEvalController struct{}

func (c sequentialRuleEvalController) RuleEligible(_ *Group, _ Rule) bool {
	return false
}

func (c sequentialRuleEvalController) Allow() bool {
	return false
}

func (c sequentialRuleEvalController) Done() {}

func (c sequentialRuleEvalController) Invalidate() {}