2015-01-21 19:07:45 +00:00
// Copyright 2013 The Prometheus Authors
2013-01-07 22:24:26 +00:00
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2015-03-30 17:43:19 +00:00
package rules
2013-01-07 22:24:26 +00:00
import (
2017-10-25 04:21:42 +00:00
"context"
2022-06-17 07:54:25 +00:00
"errors"
"fmt"
2017-05-18 15:26:36 +00:00
"math"
2015-06-30 12:38:01 +00:00
"net/url"
2017-06-14 10:39:14 +00:00
"sort"
2013-04-17 12:42:15 +00:00
"sync"
2013-01-07 22:24:26 +00:00
"time"
2021-06-11 16:17:59 +00:00
"github.com/go-kit/log"
"github.com/go-kit/log/level"
2014-06-18 17:43:15 +00:00
"github.com/prometheus/client_golang/prometheus"
2018-08-02 10:18:24 +00:00
"github.com/prometheus/common/model"
2022-01-25 10:08:04 +00:00
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
2022-05-25 08:06:17 +00:00
"go.opentelemetry.io/otel/codes"
2019-03-25 23:01:12 +00:00
2021-11-08 14:23:17 +00:00
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/model/value"
2015-03-30 17:43:19 +00:00
"github.com/prometheus/prometheus/promql"
2020-02-03 18:06:39 +00:00
"github.com/prometheus/prometheus/promql/parser"
2015-03-15 02:36:15 +00:00
"github.com/prometheus/prometheus/storage"
2013-06-25 12:02:27 +00:00
)
2013-01-07 22:24:26 +00:00
2019-05-05 09:48:42 +00:00
// RuleHealth is the string-typed health state of a rule.
type RuleHealth string

// Health states a rule can be in, judged by its most recent execution.
const (
	HealthUnknown RuleHealth = "unknown"
	HealthGood    RuleHealth = "ok"
	HealthBad     RuleHealth = "err"
)
2014-06-18 17:43:15 +00:00
// Constants for instrumentation.
const (
	// namespace is the Namespace set on every metric created in this file.
	namespace = "prometheus"
)
2014-06-18 17:43:15 +00:00
2019-01-03 12:07:06 +00:00
// Metrics for rule evaluation.
type Metrics struct {
2021-04-30 17:25:34 +00:00
EvalDuration prometheus . Summary
IterationDuration prometheus . Summary
IterationsMissed * prometheus . CounterVec
IterationsScheduled * prometheus . CounterVec
EvalTotal * prometheus . CounterVec
EvalFailures * prometheus . CounterVec
GroupInterval * prometheus . GaugeVec
GroupLastEvalTime * prometheus . GaugeVec
GroupLastDuration * prometheus . GaugeVec
GroupRules * prometheus . GaugeVec
GroupSamples * prometheus . GaugeVec
2018-12-28 10:20:29 +00:00
}
2020-04-07 12:06:01 +00:00
// NewGroupMetrics creates a new instance of Metrics and registers it with the provided registerer,
2019-01-03 12:07:06 +00:00
// if not nil.
func NewGroupMetrics ( reg prometheus . Registerer ) * Metrics {
m := & Metrics {
2021-04-30 17:25:34 +00:00
EvalDuration : prometheus . NewSummary (
2018-12-28 10:20:29 +00:00
prometheus . SummaryOpts {
2019-06-12 00:03:13 +00:00
Namespace : namespace ,
Name : "rule_evaluation_duration_seconds" ,
Help : "The duration for a rule to execute." ,
Objectives : map [ float64 ] float64 { 0.5 : 0.05 , 0.9 : 0.01 , 0.99 : 0.001 } ,
2018-12-28 10:20:29 +00:00
} ) ,
2021-04-30 17:25:34 +00:00
IterationDuration : prometheus . NewSummary ( prometheus . SummaryOpts {
2018-12-28 10:20:29 +00:00
Namespace : namespace ,
Name : "rule_group_duration_seconds" ,
Help : "The duration of rule group evaluations." ,
Objectives : map [ float64 ] float64 { 0.01 : 0.001 , 0.05 : 0.005 , 0.5 : 0.05 , 0.90 : 0.01 , 0.99 : 0.001 } ,
} ) ,
2021-04-30 17:25:34 +00:00
IterationsMissed : prometheus . NewCounterVec (
2020-08-19 13:29:13 +00:00
prometheus . CounterOpts {
Namespace : namespace ,
Name : "rule_group_iterations_missed_total" ,
Help : "The total number of rule group evaluations missed due to slow rule group evaluation." ,
} ,
[ ] string { "rule_group" } ,
) ,
2021-04-30 17:25:34 +00:00
IterationsScheduled : prometheus . NewCounterVec (
2020-08-19 13:29:13 +00:00
prometheus . CounterOpts {
Namespace : namespace ,
Name : "rule_group_iterations_total" ,
Help : "The total number of scheduled rule group evaluations, whether executed or missed." ,
} ,
[ ] string { "rule_group" } ,
) ,
2021-04-30 17:25:34 +00:00
EvalTotal : prometheus . NewCounterVec (
2020-04-08 21:21:37 +00:00
prometheus . CounterOpts {
Namespace : namespace ,
Name : "rule_evaluations_total" ,
Help : "The total number of rule evaluations." ,
} ,
[ ] string { "rule_group" } ,
) ,
2021-04-30 17:25:34 +00:00
EvalFailures : prometheus . NewCounterVec (
2020-04-08 21:21:37 +00:00
prometheus . CounterOpts {
Namespace : namespace ,
Name : "rule_evaluation_failures_total" ,
Help : "The total number of rule evaluation failures." ,
} ,
[ ] string { "rule_group" } ,
) ,
2021-04-30 17:25:34 +00:00
GroupInterval : prometheus . NewGaugeVec (
2020-01-29 11:26:08 +00:00
prometheus . GaugeOpts {
Namespace : namespace ,
Name : "rule_group_interval_seconds" ,
Help : "The interval of a rule group." ,
} ,
[ ] string { "rule_group" } ,
) ,
2021-04-30 17:25:34 +00:00
GroupLastEvalTime : prometheus . NewGaugeVec (
2018-12-28 10:20:29 +00:00
prometheus . GaugeOpts {
Namespace : namespace ,
Name : "rule_group_last_evaluation_timestamp_seconds" ,
Help : "The timestamp of the last rule group evaluation in seconds." ,
} ,
[ ] string { "rule_group" } ,
) ,
2021-04-30 17:25:34 +00:00
GroupLastDuration : prometheus . NewGaugeVec (
2018-12-28 10:20:29 +00:00
prometheus . GaugeOpts {
Namespace : namespace ,
Name : "rule_group_last_duration_seconds" ,
Help : "The duration of the last rule group evaluation." ,
} ,
[ ] string { "rule_group" } ,
) ,
2021-04-30 17:25:34 +00:00
GroupRules : prometheus . NewGaugeVec (
2019-01-13 14:28:07 +00:00
prometheus . GaugeOpts {
Namespace : namespace ,
Name : "rule_group_rules" ,
Help : "The number of rules." ,
} ,
[ ] string { "rule_group" } ,
) ,
2021-04-30 17:25:34 +00:00
GroupSamples : prometheus . NewGaugeVec (
2020-09-25 15:48:38 +00:00
prometheus . GaugeOpts {
Namespace : namespace ,
Name : "rule_group_last_evaluation_samples" ,
Help : "The number of samples returned during the last rule group evaluation." ,
} ,
[ ] string { "rule_group" } ,
) ,
2018-12-28 10:20:29 +00:00
}
if reg != nil {
reg . MustRegister (
2021-04-30 17:25:34 +00:00
m . EvalDuration ,
m . IterationDuration ,
m . IterationsMissed ,
m . IterationsScheduled ,
m . EvalTotal ,
m . EvalFailures ,
m . GroupInterval ,
m . GroupLastEvalTime ,
m . GroupLastDuration ,
m . GroupRules ,
m . GroupSamples ,
2018-12-28 10:20:29 +00:00
)
}
return m
2014-06-18 17:43:15 +00:00
}
2017-11-23 12:04:54 +00:00
// QueryFunc evaluates the PromQL expression q at instant t and returns the
// result as a vector, or an error.
type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector, error)
// EngineQueryFunc returns a new query function that executes instant queries against
// the given engine.
2018-07-18 03:54:33 +00:00
// It converts scalar into vector results.
2018-01-09 16:44:23 +00:00
func EngineQueryFunc ( engine * promql . Engine , q storage . Queryable ) QueryFunc {
2017-11-23 12:04:54 +00:00
return func ( ctx context . Context , qs string , t time . Time ) ( promql . Vector , error ) {
2022-02-02 02:07:23 +00:00
q , err := engine . NewInstantQuery ( q , nil , qs , t )
2017-11-23 12:04:54 +00:00
if err != nil {
return nil , err
}
res := q . Exec ( ctx )
if res . Err != nil {
return nil , res . Err
}
switch v := res . Value . ( type ) {
case promql . Vector :
return v , nil
case promql . Scalar :
return promql . Vector { promql . Sample {
Point : promql . Point ( v ) ,
Metric : labels . Labels { } ,
} } , nil
default :
2019-03-25 23:01:12 +00:00
return nil , errors . New ( "rule result is not a vector or scalar" )
2017-11-23 12:04:54 +00:00
}
}
}
2015-06-30 09:18:07 +00:00
// A Rule encapsulates a vector expression which is evaluated at a specified
// interval and acted upon (currently either recorded or used for alerting).
type Rule interface {
Name ( ) string
2019-03-15 15:23:36 +00:00
// Labels of the rule.
Labels ( ) labels . Labels
2015-12-14 16:40:40 +00:00
// eval evaluates the rule, including any associated recording or alerting actions.
2021-09-15 07:48:26 +00:00
Eval ( context . Context , time . Time , QueryFunc , * url . URL , int ) ( promql . Vector , error )
2015-06-30 09:18:07 +00:00
// String returns a human-readable string representation of the rule.
String ( ) string
2020-09-13 15:07:59 +00:00
// Query returns the rule query expression.
Query ( ) parser . Expr
2018-08-06 22:33:45 +00:00
// SetLastErr sets the current error experienced by the rule.
SetLastError ( error )
// LastErr returns the last error experienced by the rule.
LastError ( ) error
// SetHealth sets the current health of the rule.
SetHealth ( RuleHealth )
// Health returns the current health of the rule.
Health ( ) RuleHealth
2018-07-18 03:54:33 +00:00
SetEvaluationDuration ( time . Duration )
2018-12-28 10:20:29 +00:00
// GetEvaluationDuration returns last evaluation duration.
// NOTE: Used dynamically by rules.html template.
2018-07-18 03:54:33 +00:00
GetEvaluationDuration ( ) time . Duration
2018-10-12 16:26:59 +00:00
SetEvaluationTimestamp ( time . Time )
2018-12-28 10:20:29 +00:00
// GetEvaluationTimestamp returns last evaluation timestamp.
// NOTE: Used dynamically by rules.html template.
2018-10-12 16:26:59 +00:00
GetEvaluationTimestamp ( ) time . Time
2015-06-30 09:18:07 +00:00
}
2015-12-17 10:46:10 +00:00
// Group is a set of rules that have a logical relation.
2015-12-14 16:40:40 +00:00
type Group struct {
2017-11-23 12:52:15 +00:00
name string
file string
interval time . Duration
2021-09-15 07:48:26 +00:00
limit int
2017-11-23 12:52:15 +00:00
rules [ ] Rule
seriesInPreviousEval [ ] map [ string ] labels . Labels // One per Rule.
2019-08-07 15:11:05 +00:00
staleSeries [ ] labels . Labels
2017-11-23 12:52:15 +00:00
opts * ManagerOptions
mtx sync . Mutex
2020-08-25 10:38:06 +00:00
evaluationTime time . Duration
lastEvaluation time . Time
2013-08-20 13:42:06 +00:00
2018-08-02 10:18:24 +00:00
shouldRestore bool
2020-04-18 12:32:18 +00:00
markStale bool
done chan struct { }
2020-02-12 15:22:18 +00:00
terminated chan struct { }
managerDone chan struct { }
2017-06-16 10:22:44 +00:00
logger log . Logger
2018-12-28 10:20:29 +00:00
2019-01-03 12:07:06 +00:00
metrics * Metrics
2022-03-29 00:16:46 +00:00
ruleGroupPostProcessFunc RuleGroupPostProcessFunc
2015-12-14 16:40:40 +00:00
}
2013-08-20 13:42:06 +00:00
2022-03-29 00:16:46 +00:00
// RuleGroupPostProcessFunc is an optional hook for rule group post
// processing. When set on a group, the run loop calls it before each
// evaluation of its main loop, passing the group, the timestamp of the
// previous evaluation and the group's logger. A returned error is only
// logged as a warning.
type RuleGroupPostProcessFunc func(g *Group, lastEvalTimestamp time.Time, log log.Logger) error
2020-02-12 15:22:18 +00:00
type GroupOptions struct {
2022-03-29 00:16:46 +00:00
Name , File string
Interval time . Duration
Limit int
Rules [ ] Rule
ShouldRestore bool
Opts * ManagerOptions
done chan struct { }
RuleGroupPostProcessFunc RuleGroupPostProcessFunc
2020-02-12 15:22:18 +00:00
}
2016-11-18 16:25:58 +00:00
// NewGroup makes a new Group with the given name, options, and rules.
2020-02-12 15:22:18 +00:00
func NewGroup ( o GroupOptions ) * Group {
metrics := o . Opts . Metrics
2018-12-28 10:20:29 +00:00
if metrics == nil {
2020-02-12 15:22:18 +00:00
metrics = NewGroupMetrics ( o . Opts . Registerer )
2018-12-28 10:20:29 +00:00
}
2020-09-13 15:07:59 +00:00
key := GroupKey ( o . File , o . Name )
2021-04-30 17:25:34 +00:00
metrics . IterationsMissed . WithLabelValues ( key )
metrics . IterationsScheduled . WithLabelValues ( key )
metrics . EvalTotal . WithLabelValues ( key )
metrics . EvalFailures . WithLabelValues ( key )
metrics . GroupLastEvalTime . WithLabelValues ( key )
metrics . GroupLastDuration . WithLabelValues ( key )
metrics . GroupRules . WithLabelValues ( key ) . Set ( float64 ( len ( o . Rules ) ) )
metrics . GroupSamples . WithLabelValues ( key )
metrics . GroupInterval . WithLabelValues ( key ) . Set ( o . Interval . Seconds ( ) )
2018-12-28 10:20:29 +00:00
2015-12-14 16:40:40 +00:00
return & Group {
2022-03-29 00:16:46 +00:00
name : o . Name ,
file : o . File ,
interval : o . Interval ,
limit : o . Limit ,
rules : o . Rules ,
shouldRestore : o . ShouldRestore ,
opts : o . Opts ,
seriesInPreviousEval : make ( [ ] map [ string ] labels . Labels , len ( o . Rules ) ) ,
done : make ( chan struct { } ) ,
managerDone : o . done ,
terminated : make ( chan struct { } ) ,
logger : log . With ( o . Opts . Logger , "file" , o . File , "group" , o . Name ) ,
metrics : metrics ,
ruleGroupPostProcessFunc : o . RuleGroupPostProcessFunc ,
2015-12-14 16:40:40 +00:00
}
2013-08-20 13:42:06 +00:00
}
2017-06-14 10:39:14 +00:00
// Name returns the group name.
func ( g * Group ) Name ( ) string { return g . name }
// File returns the group's file.
func ( g * Group ) File ( ) string { return g . file }
// Rules returns the group's rules.
func ( g * Group ) Rules ( ) [ ] Rule { return g . rules }
2022-03-29 00:16:46 +00:00
// Queryable returns the group's querable.
func ( g * Group ) Queryable ( ) storage . Queryable { return g . opts . Queryable }
// Context returns the group's context.
func ( g * Group ) Context ( ) context . Context { return g . opts . Context }
2018-06-27 07:15:17 +00:00
// Interval returns the group's interval.
func ( g * Group ) Interval ( ) time . Duration { return g . interval }
2021-09-15 07:48:26 +00:00
// Limit returns the group's limit.
func ( g * Group ) Limit ( ) int { return g . limit }
2017-11-24 07:59:05 +00:00
func ( g * Group ) run ( ctx context . Context ) {
2015-12-14 16:40:40 +00:00
defer close ( g . terminated )
2013-08-20 13:42:06 +00:00
2015-12-14 16:40:40 +00:00
// Wait an initial amount to have consistently slotted intervals.
2020-09-13 15:07:59 +00:00
evalTimestamp := g . EvalTimestamp ( time . Now ( ) . UnixNano ( ) ) . Add ( g . interval )
2016-01-12 09:52:40 +00:00
select {
2018-06-01 14:23:07 +00:00
case <- time . After ( time . Until ( evalTimestamp ) ) :
2016-01-12 09:52:40 +00:00
case <- g . done :
return
}
2013-08-20 13:42:06 +00:00
2020-01-27 09:53:10 +00:00
ctx = promql . NewOriginContext ( ctx , map [ string ] interface { } {
"ruleGroup" : map [ string ] string {
"file" : g . File ( ) ,
"name" : g . Name ( ) ,
} ,
2020-01-10 08:28:17 +00:00
} )
2015-12-14 16:40:40 +00:00
iter := func ( ) {
2021-04-30 17:25:34 +00:00
g . metrics . IterationsScheduled . WithLabelValues ( GroupKey ( g . file , g . name ) ) . Inc ( )
2016-12-29 08:27:30 +00:00
2015-12-14 16:40:40 +00:00
start := time . Now ( )
2018-06-01 14:23:07 +00:00
g . Eval ( ctx , evalTimestamp )
timeSinceStart := time . Since ( start )
2013-01-07 22:24:26 +00:00
2021-04-30 17:25:34 +00:00
g . metrics . IterationDuration . Observe ( timeSinceStart . Seconds ( ) )
2020-08-25 10:38:06 +00:00
g . setEvaluationTime ( timeSinceStart )
g . setLastEvaluation ( start )
2013-01-07 22:24:26 +00:00
}
2015-01-29 14:05:10 +00:00
2018-06-01 14:23:07 +00:00
// The assumption here is that since the ticker was started after having
// waited for `evalTimestamp` to pass, the ticks will trigger soon
// after each `evalTimestamp + N * g.interval` occurrence.
2015-12-14 16:40:40 +00:00
tick := time . NewTicker ( g . interval )
defer tick . Stop ( )
2013-01-07 22:24:26 +00:00
2020-04-18 12:32:18 +00:00
defer func ( ) {
if ! g . markStale {
2020-02-12 15:22:18 +00:00
return
}
go func ( now time . Time ) {
for _ , rule := range g . seriesInPreviousEval {
for _ , r := range rule {
g . staleSeries = append ( g . staleSeries , r )
}
}
// That can be garbage collected at this point.
g . seriesInPreviousEval = nil
// Wait for 2 intervals to give the opportunity to renamed rules
// to insert new series in the tsdb. At this point if there is a
// renamed rule, it should already be started.
select {
case <- g . managerDone :
case <- time . After ( 2 * g . interval ) :
2020-07-24 14:10:51 +00:00
g . cleanupStaleSeries ( ctx , now )
2020-02-12 15:22:18 +00:00
}
} ( time . Now ( ) )
2020-04-18 12:32:18 +00:00
} ( )
2020-02-12 15:22:18 +00:00
2018-06-01 14:23:07 +00:00
iter ( )
2018-08-02 10:18:24 +00:00
if g . shouldRestore {
// If we have to restore, we wait for another Eval to finish.
// The reason behind this is, during first eval (or before it)
// we might not have enough data scraped, and recording rules would not
// have updated the latest values, on which some alerts might depend.
select {
2020-04-18 12:32:18 +00:00
case <- g . done :
2018-08-02 10:18:24 +00:00
return
case <- tick . C :
missed := ( time . Since ( evalTimestamp ) / g . interval ) - 1
if missed > 0 {
2021-04-30 17:25:34 +00:00
g . metrics . IterationsMissed . WithLabelValues ( GroupKey ( g . file , g . name ) ) . Add ( float64 ( missed ) )
g . metrics . IterationsScheduled . WithLabelValues ( GroupKey ( g . file , g . name ) ) . Add ( float64 ( missed ) )
2018-08-02 10:18:24 +00:00
}
evalTimestamp = evalTimestamp . Add ( ( missed + 1 ) * g . interval )
iter ( )
}
g . RestoreForState ( time . Now ( ) )
g . shouldRestore = false
}
2013-01-07 22:24:26 +00:00
for {
select {
2020-04-18 12:32:18 +00:00
case <- g . done :
2013-12-11 14:30:27 +00:00
return
2015-01-29 14:05:10 +00:00
default :
select {
2020-04-18 12:32:18 +00:00
case <- g . done :
2015-01-29 14:05:10 +00:00
return
2015-12-14 16:40:40 +00:00
case <- tick . C :
2018-06-01 14:23:07 +00:00
missed := ( time . Since ( evalTimestamp ) / g . interval ) - 1
2017-04-02 23:03:28 +00:00
if missed > 0 {
2021-04-30 17:25:34 +00:00
g . metrics . IterationsMissed . WithLabelValues ( GroupKey ( g . file , g . name ) ) . Add ( float64 ( missed ) )
g . metrics . IterationsScheduled . WithLabelValues ( GroupKey ( g . file , g . name ) ) . Add ( float64 ( missed ) )
2017-04-02 23:03:28 +00:00
}
2018-06-01 14:23:07 +00:00
evalTimestamp = evalTimestamp . Add ( ( missed + 1 ) * g . interval )
2022-03-29 00:16:46 +00:00
useRuleGroupPostProcessFunc ( g , evalTimestamp . Add ( - ( missed + 1 ) * g . interval ) )
2015-12-14 16:40:40 +00:00
iter ( )
2015-01-29 14:05:10 +00:00
}
2013-01-07 22:24:26 +00:00
}
}
}
2022-03-29 00:16:46 +00:00
// useRuleGroupPostProcessFunc invokes the group's post-process hook, if one
// is configured, with the given last evaluation timestamp and the group's
// logger. A failure of the hook is logged as a warning and otherwise ignored.
func useRuleGroupPostProcessFunc(g *Group, lastEvalTimestamp time.Time) {
	hook := g.ruleGroupPostProcessFunc
	if hook == nil {
		return
	}
	if err := hook(g, lastEvalTimestamp, g.logger); err != nil {
		level.Warn(g.logger).Log("msg", "ruleGroupPostProcessFunc failed", "err", err)
	}
}
2015-12-14 16:40:40 +00:00
func ( g * Group ) stop ( ) {
close ( g . done )
<- g . terminated
2013-01-07 22:24:26 +00:00
}
2017-06-16 11:14:33 +00:00
func ( g * Group ) hash ( ) uint64 {
l := labels . New (
2019-05-03 13:11:28 +00:00
labels . Label { Name : "name" , Value : g . name } ,
labels . Label { Name : "file" , Value : g . file } ,
2017-06-16 11:14:33 +00:00
)
return l . Hash ( )
2015-12-14 16:40:40 +00:00
}
2019-05-14 21:14:27 +00:00
// AlertingRules returns the group's alerting rules, ordered by descending
// State() and, for equal states, by ascending name. The result is nil when
// the group has no alerting rules.
func (g *Group) AlertingRules() []*AlertingRule {
	g.mtx.Lock()
	defer g.mtx.Unlock()

	var alerts []*AlertingRule
	for _, rule := range g.rules {
		switch ar := rule.(type) {
		case *AlertingRule:
			alerts = append(alerts, ar)
		}
	}

	byStateThenName := func(i, j int) bool {
		if alerts[i].State() > alerts[j].State() {
			return true
		}
		if alerts[i].State() != alerts[j].State() {
			return false
		}
		return alerts[i].Name() < alerts[j].Name()
	}
	sort.Slice(alerts, byStateThenName)
	return alerts
}
// HasAlertingRules reports whether the group contains at least one
// AlertingRule.
func (g *Group) HasAlertingRules() bool {
	g.mtx.Lock()
	defer g.mtx.Unlock()

	found := false
	for _, rule := range g.rules {
		if _, isAlerting := rule.(*AlertingRule); isAlerting {
			found = true
			break
		}
	}
	return found
}
2020-08-25 10:38:06 +00:00
// GetEvaluationTime returns the time in seconds it took to evaluate the rule group.
func ( g * Group ) GetEvaluationTime ( ) time . Duration {
2017-11-17 15:18:34 +00:00
g . mtx . Lock ( )
defer g . mtx . Unlock ( )
2020-08-25 10:38:06 +00:00
return g . evaluationTime
2017-11-17 15:18:34 +00:00
}
2020-08-25 10:38:06 +00:00
// setEvaluationTime sets the time in seconds the last evaluation took.
func ( g * Group ) setEvaluationTime ( dur time . Duration ) {
2021-04-30 17:25:34 +00:00
g . metrics . GroupLastDuration . WithLabelValues ( GroupKey ( g . file , g . name ) ) . Set ( dur . Seconds ( ) )
2018-12-28 10:20:29 +00:00
2017-11-17 15:18:34 +00:00
g . mtx . Lock ( )
defer g . mtx . Unlock ( )
2020-08-25 10:38:06 +00:00
g . evaluationTime = dur
2017-11-17 15:18:34 +00:00
}
2020-08-25 10:38:06 +00:00
// GetLastEvaluation returns the time the last evaluation of the rule group took place.
func ( g * Group ) GetLastEvaluation ( ) time . Time {
2018-10-12 16:26:59 +00:00
g . mtx . Lock ( )
defer g . mtx . Unlock ( )
2020-08-25 10:38:06 +00:00
return g . lastEvaluation
2018-10-12 16:26:59 +00:00
}
2020-11-01 15:54:04 +00:00
// setLastEvaluation updates evaluationTimestamp to the timestamp of when the rule group was last evaluated.
2020-08-25 10:38:06 +00:00
func ( g * Group ) setLastEvaluation ( ts time . Time ) {
2021-04-30 17:25:34 +00:00
g . metrics . GroupLastEvalTime . WithLabelValues ( GroupKey ( g . file , g . name ) ) . Set ( float64 ( ts . UnixNano ( ) ) / 1e9 )
2018-12-28 10:20:29 +00:00
2018-10-12 16:26:59 +00:00
g . mtx . Lock ( )
defer g . mtx . Unlock ( )
2020-08-25 10:38:06 +00:00
g . lastEvaluation = ts
2018-10-12 16:26:59 +00:00
}
2020-09-13 15:07:59 +00:00
// EvalTimestamp returns the immediately preceding consistently slotted evaluation time.
func ( g * Group ) EvalTimestamp ( startTime int64 ) time . Time {
2015-12-14 16:40:40 +00:00
var (
2018-06-01 14:23:07 +00:00
offset = int64 ( g . hash ( ) % uint64 ( g . interval ) )
2020-10-31 13:40:24 +00:00
adjNow = startTime - offset
2018-06-01 14:23:07 +00:00
base = adjNow - ( adjNow % int64 ( g . interval ) )
2015-12-14 16:40:40 +00:00
)
2020-03-29 16:35:39 +00:00
return time . Unix ( 0 , base + offset ) . UTC ( )
2015-12-14 16:40:40 +00:00
}
2013-07-30 15:18:07 +00:00
2019-03-15 15:23:36 +00:00
// nameAndLabels returns the rule's name immediately followed by the string
// form of its labels. CopyState uses the result as the key to match rules.
func nameAndLabels(rule Rule) string {
	name := rule.Name()
	lset := rule.Labels().String()
	return name + lset
}
2018-07-18 13:14:38 +00:00
// CopyState copies the alerting rule and staleness related state from the given group.
2017-05-19 15:43:59 +00:00
//
2019-03-15 15:23:36 +00:00
// Rules are matched based on their name and labels. If there are duplicates, the
2017-05-19 15:43:59 +00:00
// first is matched with the first, second with the second etc.
2018-07-18 13:14:38 +00:00
func ( g * Group ) CopyState ( from * Group ) {
2020-08-25 10:38:06 +00:00
g . evaluationTime = from . evaluationTime
g . lastEvaluation = from . lastEvaluation
2017-11-30 13:49:15 +00:00
2017-05-19 15:43:59 +00:00
ruleMap := make ( map [ string ] [ ] int , len ( from . rules ) )
for fi , fromRule := range from . rules {
2019-03-15 15:23:36 +00:00
nameAndLabels := nameAndLabels ( fromRule )
l := ruleMap [ nameAndLabels ]
ruleMap [ nameAndLabels ] = append ( l , fi )
2017-05-19 15:43:59 +00:00
}
for i , rule := range g . rules {
2019-03-15 15:23:36 +00:00
nameAndLabels := nameAndLabels ( rule )
indexes := ruleMap [ nameAndLabels ]
2017-05-19 15:43:59 +00:00
if len ( indexes ) == 0 {
continue
}
fi := indexes [ 0 ]
g . seriesInPreviousEval [ i ] = from . seriesInPreviousEval [ fi ]
2019-03-15 15:23:36 +00:00
ruleMap [ nameAndLabels ] = indexes [ 1 : ]
2017-05-19 15:43:59 +00:00
ar , ok := rule . ( * AlertingRule )
2015-12-14 16:40:40 +00:00
if ! ok {
continue
}
2017-05-19 15:43:59 +00:00
far , ok := from . rules [ fi ] . ( * AlertingRule )
if ! ok {
continue
}
for fp , a := range far . active {
ar . active [ fp ] = a
2015-12-14 16:40:40 +00:00
}
}
2019-08-07 15:11:05 +00:00
// Handle deleted and unmatched duplicate rules.
g . staleSeries = from . staleSeries
for fi , fromRule := range from . rules {
nameAndLabels := nameAndLabels ( fromRule )
l := ruleMap [ nameAndLabels ]
if len ( l ) != 0 {
for _ , series := range from . seriesInPreviousEval [ fi ] {
g . staleSeries = append ( g . staleSeries , series )
}
}
}
2015-12-14 16:40:40 +00:00
}
2017-06-14 09:37:54 +00:00
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
2017-11-24 07:59:05 +00:00
func ( g * Group ) Eval ( ctx context . Context , ts time . Time ) {
2020-09-25 15:48:38 +00:00
var samplesTotal float64
2017-05-19 12:42:07 +00:00
for i , rule := range g . rules {
2017-06-16 11:27:22 +00:00
select {
case <- g . done :
return
default :
}
2017-06-12 12:44:39 +00:00
func ( i int , rule Rule ) {
2022-01-25 10:08:04 +00:00
ctx , sp := otel . Tracer ( "" ) . Start ( ctx , "rule" )
sp . SetAttributes ( attribute . String ( "name" , rule . Name ( ) ) )
2016-01-21 10:09:24 +00:00
defer func ( t time . Time ) {
2022-01-25 10:08:04 +00:00
sp . End ( )
2018-12-28 10:20:29 +00:00
since := time . Since ( t )
2021-04-30 17:25:34 +00:00
g . metrics . EvalDuration . Observe ( since . Seconds ( ) )
2018-12-28 10:20:29 +00:00
rule . SetEvaluationDuration ( since )
2018-10-12 16:26:59 +00:00
rule . SetEvaluationTimestamp ( t )
2016-01-21 10:09:24 +00:00
} ( time . Now ( ) )
2021-04-30 17:25:34 +00:00
g . metrics . EvalTotal . WithLabelValues ( GroupKey ( g . File ( ) , g . Name ( ) ) ) . Inc ( )
2015-12-14 16:40:40 +00:00
2021-09-15 07:48:26 +00:00
vector , err := rule . Eval ( ctx , ts , g . opts . QueryFunc , g . opts . ExternalURL , g . Limit ( ) )
2015-12-14 16:40:40 +00:00
if err != nil {
2021-03-18 14:44:33 +00:00
rule . SetHealth ( HealthBad )
rule . SetLastError ( err )
2022-05-25 08:06:17 +00:00
sp . SetStatus ( codes . Error , err . Error ( ) )
2021-04-30 17:25:34 +00:00
g . metrics . EvalFailures . WithLabelValues ( GroupKey ( g . File ( ) , g . Name ( ) ) ) . Inc ( )
2021-03-18 14:44:33 +00:00
2016-01-18 15:53:37 +00:00
// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.
2022-06-17 07:54:25 +00:00
var eqc promql . ErrQueryCanceled
if ! errors . As ( err , & eqc ) {
2022-03-21 18:52:20 +00:00
level . Warn ( g . logger ) . Log ( "name" , rule . Name ( ) , "index" , i , "msg" , "Evaluating rule failed" , "rule" , rule , "err" , err )
2016-01-18 15:53:37 +00:00
}
2016-01-21 10:09:24 +00:00
return
2015-12-14 16:40:40 +00:00
}
2021-08-20 20:42:31 +00:00
rule . SetHealth ( HealthGood )
rule . SetLastError ( nil )
2020-09-25 15:48:38 +00:00
samplesTotal += float64 ( len ( vector ) )
2015-12-14 16:40:40 +00:00
2016-01-21 10:09:24 +00:00
if ar , ok := rule . ( * AlertingRule ) ; ok {
2018-08-28 15:05:00 +00:00
ar . sendAlerts ( ctx , ts , g . opts . ResendDelay , g . interval , g . opts . NotifyFunc )
2015-12-14 16:40:40 +00:00
}
2016-05-19 14:22:49 +00:00
var (
numOutOfOrder = 0
numDuplicates = 0
)
2016-12-24 23:37:46 +00:00
2020-07-24 14:10:51 +00:00
app := g . opts . Appendable . Appender ( ctx )
2017-05-19 12:42:07 +00:00
seriesReturned := make ( map [ string ] labels . Labels , len ( g . seriesInPreviousEval [ i ] ) )
2020-03-13 19:54:47 +00:00
defer func ( ) {
if err := app . Commit ( ) ; err != nil {
2021-03-18 14:44:33 +00:00
rule . SetHealth ( HealthBad )
rule . SetLastError ( err )
2022-05-25 08:06:17 +00:00
sp . SetStatus ( codes . Error , err . Error ( ) )
2021-04-30 17:25:34 +00:00
g . metrics . EvalFailures . WithLabelValues ( GroupKey ( g . File ( ) , g . Name ( ) ) ) . Inc ( )
2021-03-18 14:44:33 +00:00
2022-03-21 18:52:20 +00:00
level . Warn ( g . logger ) . Log ( "name" , rule . Name ( ) , "index" , i , "msg" , "Rule sample appending failed" , "err" , err )
2020-03-13 19:54:47 +00:00
return
}
g . seriesInPreviousEval [ i ] = seriesReturned
} ( )
2021-03-18 14:44:33 +00:00
2015-12-14 16:40:40 +00:00
for _ , s := range vector {
2021-02-18 12:07:00 +00:00
if _ , err := app . Append ( 0 , s . Metric , s . T , s . V ) ; err != nil {
2021-03-18 14:44:33 +00:00
rule . SetHealth ( HealthBad )
rule . SetLastError ( err )
2022-05-25 08:06:17 +00:00
sp . SetStatus ( codes . Error , err . Error ( ) )
2022-06-17 07:54:25 +00:00
unwrappedErr := errors . Unwrap ( err )
switch {
case errors . Is ( unwrappedErr , storage . ErrOutOfOrderSample ) :
2016-05-19 14:22:49 +00:00
numOutOfOrder ++
2022-03-21 18:52:20 +00:00
level . Debug ( g . logger ) . Log ( "name" , rule . Name ( ) , "index" , i , "msg" , "Rule evaluation result discarded" , "err" , err , "sample" , s )
2022-06-17 07:54:25 +00:00
case errors . Is ( unwrappedErr , storage . ErrDuplicateSampleForTimestamp ) :
2016-05-19 14:22:49 +00:00
numDuplicates ++
2022-03-21 18:52:20 +00:00
level . Debug ( g . logger ) . Log ( "name" , rule . Name ( ) , "index" , i , "msg" , "Rule evaluation result discarded" , "err" , err , "sample" , s )
2016-05-19 14:22:49 +00:00
default :
2022-03-21 18:52:20 +00:00
level . Warn ( g . logger ) . Log ( "name" , rule . Name ( ) , "index" , i , "msg" , "Rule evaluation result discarded" , "err" , err , "sample" , s )
2016-05-19 14:22:49 +00:00
}
2017-05-18 15:26:36 +00:00
} else {
2021-11-08 16:52:33 +00:00
buf := [ 1024 ] byte { }
seriesReturned [ string ( s . Metric . Bytes ( buf [ : ] ) ) ] = s . Metric
2016-05-19 14:22:49 +00:00
}
}
if numOutOfOrder > 0 {
2022-03-21 18:52:20 +00:00
level . Warn ( g . logger ) . Log ( "name" , rule . Name ( ) , "index" , i , "msg" , "Error on ingesting out-of-order result from rule evaluation" , "numDropped" , numOutOfOrder )
2016-05-19 14:22:49 +00:00
}
if numDuplicates > 0 {
2022-03-21 18:52:20 +00:00
level . Warn ( g . logger ) . Log ( "name" , rule . Name ( ) , "index" , i , "msg" , "Error on ingesting results from rule evaluation with different value but same timestamp" , "numDropped" , numDuplicates )
2015-12-14 16:40:40 +00:00
}
2017-05-19 12:42:07 +00:00
for metric , lset := range g . seriesInPreviousEval [ i ] {
if _ , ok := seriesReturned [ metric ] ; ! ok {
// Series no longer exposed, mark it stale.
2021-02-18 12:07:00 +00:00
_ , err = app . Append ( 0 , lset , timestamp . FromTime ( ts ) , math . Float64frombits ( value . StaleNaN ) )
2022-06-17 07:54:25 +00:00
unwrappedErr := errors . Unwrap ( err )
switch {
case unwrappedErr == nil :
case errors . Is ( unwrappedErr , storage . ErrOutOfOrderSample ) , errors . Is ( unwrappedErr , storage . ErrDuplicateSampleForTimestamp ) :
2017-05-19 12:42:07 +00:00
// Do not count these in logging, as this is expected if series
// is exposed from a different rule.
default :
2022-03-21 18:52:20 +00:00
level . Warn ( g . logger ) . Log ( "name" , rule . Name ( ) , "index" , i , "msg" , "Adding stale sample failed" , "sample" , lset . String ( ) , "err" , err )
2017-05-19 12:42:07 +00:00
}
}
}
} ( i , rule )
2015-12-14 16:40:40 +00:00
}
2020-09-25 15:48:38 +00:00
if g . metrics != nil {
2021-04-30 17:25:34 +00:00
g . metrics . GroupSamples . WithLabelValues ( GroupKey ( g . File ( ) , g . Name ( ) ) ) . Set ( samplesTotal )
2020-09-25 15:48:38 +00:00
}
2020-07-24 14:10:51 +00:00
g . cleanupStaleSeries ( ctx , ts )
2020-02-12 15:22:18 +00:00
}
2019-08-07 15:11:05 +00:00
2020-07-24 14:10:51 +00:00
func ( g * Group ) cleanupStaleSeries ( ctx context . Context , ts time . Time ) {
2020-02-12 15:22:18 +00:00
if len ( g . staleSeries ) == 0 {
return
}
2020-07-24 14:10:51 +00:00
app := g . opts . Appendable . Appender ( ctx )
2020-02-12 15:22:18 +00:00
for _ , s := range g . staleSeries {
// Rule that produced series no longer configured, mark it stale.
2021-02-18 12:07:00 +00:00
_ , err := app . Append ( 0 , s , timestamp . FromTime ( ts ) , math . Float64frombits ( value . StaleNaN ) )
2022-06-17 07:54:25 +00:00
unwrappedErr := errors . Unwrap ( err )
switch {
case unwrappedErr == nil :
case errors . Is ( unwrappedErr , storage . ErrOutOfOrderSample ) , errors . Is ( unwrappedErr , storage . ErrDuplicateSampleForTimestamp ) :
2020-02-12 15:22:18 +00:00
// Do not count these in logging, as this is expected if series
// is exposed from a different rule.
default :
2020-04-11 08:22:18 +00:00
level . Warn ( g . logger ) . Log ( "msg" , "Adding stale sample for previous configuration failed" , "sample" , s , "err" , err )
2019-08-07 15:11:05 +00:00
}
}
2020-02-12 15:22:18 +00:00
if err := app . Commit ( ) ; err != nil {
2020-04-11 08:22:18 +00:00
level . Warn ( g . logger ) . Log ( "msg" , "Stale sample appending for previous configuration failed" , "err" , err )
2020-02-12 15:22:18 +00:00
} else {
g . staleSeries = nil
}
2015-12-14 16:40:40 +00:00
}
2018-08-02 10:18:24 +00:00
// RestoreForState restores the 'for' state of the alerts
// by looking up last ActiveAt from storage.
func ( g * Group ) RestoreForState ( ts time . Time ) {
maxtMS := int64 ( model . TimeFromUnixNano ( ts . UnixNano ( ) ) )
// We allow restoration only if alerts were active before after certain time.
mint := ts . Add ( - g . opts . OutageTolerance )
mintMS := int64 ( model . TimeFromUnixNano ( mint . UnixNano ( ) ) )
2020-06-26 18:06:36 +00:00
q , err := g . opts . Queryable . Querier ( g . opts . Context , mintMS , maxtMS )
2018-08-02 10:18:24 +00:00
if err != nil {
level . Error ( g . logger ) . Log ( "msg" , "Failed to get Querier" , "err" , err )
return
}
2018-12-28 10:20:29 +00:00
defer func ( ) {
if err := q . Close ( ) ; err != nil {
level . Error ( g . logger ) . Log ( "msg" , "Failed to close Querier" , "err" , err )
}
} ( )
2018-08-02 10:18:24 +00:00
for _ , rule := range g . Rules ( ) {
alertRule , ok := rule . ( * AlertingRule )
if ! ok {
continue
}
alertHoldDuration := alertRule . HoldDuration ( )
if alertHoldDuration < g . opts . ForGracePeriod {
// If alertHoldDuration is already less than grace period, we would not
// like to make it wait for `g.opts.ForGracePeriod` time before firing.
// Hence we skip restoration, which will make it wait for alertHoldDuration.
alertRule . SetRestored ( true )
continue
}
alertRule . ForEachActiveAlert ( func ( a * Alert ) {
var s storage . Series
2022-03-29 00:16:46 +00:00
s , err := alertRule . QueryforStateSeries ( a , q )
if err != nil {
2020-06-09 16:57:31 +00:00
// Querier Warnings are ignored. We do not care unless we have an error.
level . Error ( g . logger ) . Log (
"msg" , "Failed to restore 'for' state" ,
labels . AlertName , alertRule . Name ( ) ,
"stage" , "Select" ,
"err" , err ,
)
return
}
2022-03-29 00:16:46 +00:00
if s == nil {
2018-08-02 10:18:24 +00:00
return
}
// Series found for the 'for' state.
var t int64
var v float64
it := s . Iterator ( )
for it . Next ( ) {
t , v = it . At ( )
}
if it . Err ( ) != nil {
level . Error ( g . logger ) . Log ( "msg" , "Failed to restore 'for' state" ,
labels . AlertName , alertRule . Name ( ) , "stage" , "Iterator" , "err" , it . Err ( ) )
return
}
if value . IsStaleNaN ( v ) { // Alert was not active.
return
}
2020-03-29 16:35:39 +00:00
downAt := time . Unix ( t / 1000 , 0 ) . UTC ( )
restoredActiveAt := time . Unix ( int64 ( v ) , 0 ) . UTC ( )
2018-08-02 10:18:24 +00:00
timeSpentPending := downAt . Sub ( restoredActiveAt )
timeRemainingPending := alertHoldDuration - timeSpentPending
if timeRemainingPending <= 0 {
// It means that alert was firing when prometheus went down.
// In the next Eval, the state of this alert will be set back to
// firing again if it's still firing in that Eval.
// Nothing to be done in this case.
} else if timeRemainingPending < g . opts . ForGracePeriod {
// (new) restoredActiveAt = (ts + m.opts.ForGracePeriod) - alertHoldDuration
// /* new firing time */ /* moving back by hold duration */
//
// Proof of correctness:
// firingTime = restoredActiveAt.Add(alertHoldDuration)
// = ts + m.opts.ForGracePeriod - alertHoldDuration + alertHoldDuration
// = ts + m.opts.ForGracePeriod
//
// Time remaining to fire = firingTime.Sub(ts)
// = (ts + m.opts.ForGracePeriod) - ts
// = m.opts.ForGracePeriod
restoredActiveAt = ts . Add ( g . opts . ForGracePeriod ) . Add ( - alertHoldDuration )
} else {
// By shifting ActiveAt to the future (ActiveAt + some_duration),
// the total pending time from the original ActiveAt
// would be `alertHoldDuration + some_duration`.
// Here, some_duration = downDuration.
downDuration := ts . Sub ( downAt )
restoredActiveAt = restoredActiveAt . Add ( downDuration )
}
a . ActiveAt = restoredActiveAt
level . Debug ( g . logger ) . Log ( "msg" , "'for' state restored" ,
labels . AlertName , alertRule . Name ( ) , "restored_time" , a . ActiveAt . Format ( time . RFC850 ) ,
"labels" , a . Labels . String ( ) )
} )
alertRule . SetRestored ( true )
}
}
2019-12-19 10:46:22 +00:00
// Equals return if two groups are the same.
2019-12-19 10:41:11 +00:00
func ( g * Group ) Equals ( ng * Group ) bool {
if g . name != ng . name {
return false
}
if g . file != ng . file {
return false
}
if g . interval != ng . interval {
return false
}
2021-09-15 07:48:26 +00:00
if g . limit != ng . limit {
return false
}
2019-12-19 10:41:11 +00:00
if len ( g . rules ) != len ( ng . rules ) {
return false
}
for i , gr := range g . rules {
if gr . String ( ) != ng . rules [ i ] . String ( ) {
return false
}
}
return true
}
2015-12-14 16:40:40 +00:00
// The Manager manages recording and alerting rules.
type Manager struct {
2018-08-02 10:18:24 +00:00
opts * ManagerOptions
groups map [ string ] * Group
mtx sync . RWMutex
block chan struct { }
2020-02-12 15:22:18 +00:00
done chan struct { }
2018-08-02 10:18:24 +00:00
restored bool
2017-06-16 10:22:44 +00:00
logger log . Logger
2015-12-14 16:40:40 +00:00
}
2013-07-30 15:18:07 +00:00
2017-11-24 07:59:05 +00:00
// NotifyFunc sends notifications about a set of alerts generated by the given expression.
2018-08-04 19:31:12 +00:00
type NotifyFunc func ( ctx context . Context , expr string , alerts ... * Alert )
2017-11-24 07:59:05 +00:00
2015-12-14 16:40:40 +00:00
// ManagerOptions bundles options for the Manager.
type ManagerOptions struct {
2018-08-02 10:18:24 +00:00
ExternalURL * url . URL
QueryFunc QueryFunc
NotifyFunc NotifyFunc
Context context . Context
2020-02-06 15:58:38 +00:00
Appendable storage . Appendable
2020-06-26 18:06:36 +00:00
Queryable storage . Queryable
2018-08-02 10:18:24 +00:00
Logger log . Logger
Registerer prometheus . Registerer
OutageTolerance time . Duration
ForGracePeriod time . Duration
2018-08-27 16:41:42 +00:00
ResendDelay time . Duration
2020-07-22 14:19:34 +00:00
GroupLoader GroupLoader
2018-12-28 10:20:29 +00:00
2019-01-03 12:07:06 +00:00
Metrics * Metrics
2015-12-14 16:40:40 +00:00
}
2015-03-15 02:36:15 +00:00
2015-12-14 16:40:40 +00:00
// NewManager returns an implementation of Manager, ready to be started
// by calling the Run method.
func NewManager ( o * ManagerOptions ) * Manager {
2019-01-03 12:07:06 +00:00
if o . Metrics == nil {
o . Metrics = NewGroupMetrics ( o . Registerer )
2018-12-28 10:20:29 +00:00
}
2020-07-22 14:19:34 +00:00
if o . GroupLoader == nil {
o . GroupLoader = FileLoader { }
}
2017-11-30 14:36:34 +00:00
m := & Manager {
2015-12-14 16:40:40 +00:00
groups : map [ string ] * Group { } ,
opts : o ,
2016-01-08 16:51:22 +00:00
block : make ( chan struct { } ) ,
2020-02-12 15:22:18 +00:00
done : make ( chan struct { } ) ,
2017-06-16 10:22:44 +00:00
logger : o . Logger ,
2013-01-07 22:24:26 +00:00
}
2018-12-28 10:20:29 +00:00
2017-11-30 14:36:34 +00:00
return m
2013-01-07 22:24:26 +00:00
}
2020-07-21 22:13:24 +00:00
// Run starts processing of the rule manager. It is blocking.
2016-01-08 16:51:22 +00:00
func ( m * Manager ) Run ( ) {
2022-05-20 21:26:06 +00:00
level . Info ( m . logger ) . Log ( "msg" , "Starting rule manager..." )
2020-07-21 22:13:24 +00:00
m . start ( )
<- m . done
}
func ( m * Manager ) start ( ) {
2016-01-08 16:51:22 +00:00
close ( m . block )
}
2015-12-14 16:40:40 +00:00
// Stop the rule manager's rule evaluation cycles.
func ( m * Manager ) Stop ( ) {
rules/manager.go: Fix race between reload and stop
On one relatively large Prometheus instance (1.7M series), I noticed
that upgrades were frequently resulting in Prometheus undergoing crash
recovery on start-up.
On closer examination, I found that Prometheus was panicking on
shutdown.
It seems that our configuration management (or misconfiguration thereof)
is reloading Prometheus then immediately restarting it, which I suspect
is causing this race:
Sep 21 15:12:42 host systemd[1]: Reloading prometheus monitoring system.
Sep 21 15:12:42 host prometheus[18734]: time="2016-09-21T15:12:42Z" level=info msg="Loading configuration file /etc/prometheus/config.yaml" source="main.go:221"
Sep 21 15:12:42 host systemd[1]: Reloaded prometheus monitoring system.
Sep 21 15:12:44 host systemd[1]: Stopping prometheus monitoring system...
Sep 21 15:12:44 host prometheus[18734]: time="2016-09-21T15:12:44Z" level=warning msg="Received SIGTERM, exiting gracefully..." source="main.go:203"
Sep 21 15:12:44 host prometheus[18734]: time="2016-09-21T15:12:44Z" level=info msg="See you next time!" source="main.go:210"
Sep 21 15:12:44 host prometheus[18734]: time="2016-09-21T15:12:44Z" level=info msg="Stopping target manager..." source="targetmanager.go:90"
Sep 21 15:12:52 host prometheus[18734]: time="2016-09-21T15:12:52Z" level=info msg="Checkpointing in-memory metrics and chunks..." source="persistence.go:548"
Sep 21 15:12:56 host prometheus[18734]: time="2016-09-21T15:12:56Z" level=warning msg="Error on ingesting out-of-order samples" numDropped=1 source="scrape.go:467"
Sep 21 15:12:56 host prometheus[18734]: time="2016-09-21T15:12:56Z" level=error msg="Error adding file watch for \"/etc/prometheus/targets\": no such file or directory" source="file.go:84"
Sep 21 15:12:56 host prometheus[18734]: time="2016-09-21T15:12:56Z" level=error msg="Error adding file watch for \"/etc/prometheus/targets\": no such file or directory" source="file.go:84"
Sep 21 15:13:01 host prometheus[18734]: time="2016-09-21T15:13:01Z" level=info msg="Stopping rule manager..." source="manager.go:366"
Sep 21 15:13:01 host prometheus[18734]: time="2016-09-21T15:13:01Z" level=info msg="Rule manager stopped." source="manager.go:372"
Sep 21 15:13:01 host prometheus[18734]: time="2016-09-21T15:13:01Z" level=info msg="Stopping notification handler..." source="notifier.go:325"
Sep 21 15:13:01 host prometheus[18734]: time="2016-09-21T15:13:01Z" level=info msg="Stopping local storage..." source="storage.go:381"
Sep 21 15:13:01 host prometheus[18734]: time="2016-09-21T15:13:01Z" level=info msg="Stopping maintenance loop..." source="storage.go:383"
Sep 21 15:13:01 host prometheus[18734]: panic: close of closed channel
Sep 21 15:13:01 host prometheus[18734]: goroutine 7686074 [running]:
Sep 21 15:13:01 host prometheus[18734]: panic(0xba57a0, 0xc60c42b500)
Sep 21 15:13:01 host prometheus[18734]: /usr/local/go/src/runtime/panic.go:500 +0x1a1
Sep 21 15:13:01 host prometheus[18734]: github.com/prometheus/prometheus/rules.(*Manager).ApplyConfig.func1(0xc6645a9901, 0xc420271ef0, 0xc420338ed0, 0xc60c42b4f0, 0xc6645a9900)
Sep 21 15:13:01 host prometheus[18734]: /home/build/packages/prometheus/tmp/build/gopath/src/github.com/prometheus/prometheus/rules/manager.go:412 +0x3c
Sep 21 15:13:01 host prometheus[18734]: created by github.com/prometheus/prometheus/rules.(*Manager).ApplyConfig
Sep 21 15:13:01 host prometheus[18734]: /home/build/packages/prometheus/tmp/build/gopath/src/github.com/prometheus/prometheus/rules/manager.go:423 +0x56b
Sep 21 15:13:03 host systemd[1]: prometheus.service: main process exited, code=exited, status=2/INVALIDARGUMENT
2016-09-21 21:03:02 +00:00
m . mtx . Lock ( )
defer m . mtx . Unlock ( )
2017-08-11 18:45:52 +00:00
level . Info ( m . logger ) . Log ( "msg" , "Stopping rule manager..." )
2015-06-30 09:51:05 +00:00
2015-12-14 16:40:40 +00:00
for _ , eg := range m . groups {
eg . stop ( )
2015-06-30 09:51:05 +00:00
}
2020-02-12 15:22:18 +00:00
// Shut down the groups waiting multiple evaluation intervals to write
// staleness markers.
close ( m . done )
2017-08-11 18:45:52 +00:00
level . Info ( m . logger ) . Log ( "msg" , "Rule manager stopped" )
2015-06-30 09:51:05 +00:00
}
2017-11-23 14:48:14 +00:00
// Update the rule manager's state as the config requires. If
2016-07-11 14:24:54 +00:00
// loading the new rules failed the old rule set is restored.
2022-03-29 00:16:46 +00:00
func ( m * Manager ) Update ( interval time . Duration , files [ ] string , externalLabels labels . Labels , externalURL string , ruleGroupPostProcessFunc RuleGroupPostProcessFunc ) error {
2015-12-14 16:40:40 +00:00
m . mtx . Lock ( )
defer m . mtx . Unlock ( )
2015-05-12 14:52:56 +00:00
2022-03-29 00:16:46 +00:00
groups , errs := m . LoadGroups ( interval , externalLabels , externalURL , ruleGroupPostProcessFunc , files ... )
2017-06-16 11:14:33 +00:00
if errs != nil {
for _ , e := range errs {
2017-08-11 18:45:52 +00:00
level . Error ( m . logger ) . Log ( "msg" , "loading groups failed" , "err" , e )
2017-06-16 11:14:33 +00:00
}
return errors . New ( "error loading rules, previous rule set restored" )
2015-05-12 14:52:56 +00:00
}
2018-08-02 10:18:24 +00:00
m . restored = true
2015-06-23 10:07:53 +00:00
2015-12-14 16:40:40 +00:00
var wg sync . WaitGroup
for _ , newg := range groups {
2019-12-19 10:41:11 +00:00
// If there is an old group with the same identifier,
// check if new group equals with the old group, if yes then skip it.
// If not equals, stop it and wait for it to finish the current iteration.
// Then copy it into the new group.
2020-09-13 15:07:59 +00:00
gn := GroupKey ( newg . file , newg . name )
2017-11-01 11:58:00 +00:00
oldg , ok := m . groups [ gn ]
delete ( m . groups , gn )
2015-12-14 16:40:40 +00:00
2019-12-19 10:41:11 +00:00
if ok && oldg . Equals ( newg ) {
groups [ gn ] = oldg
continue
}
wg . Add ( 1 )
2020-03-01 18:32:14 +00:00
go func ( newg * Group ) {
2015-12-14 16:40:40 +00:00
if ok {
oldg . stop ( )
2018-07-18 13:14:38 +00:00
newg . CopyState ( oldg )
2015-12-14 16:40:40 +00:00
}
wg . Done ( )
2020-09-21 10:29:03 +00:00
// Wait with starting evaluation until the rule manager
// is told to run. This is necessary to avoid running
// queries against a bootstrapping storage.
<- m . block
newg . run ( m . opts . Context )
2020-03-01 18:32:14 +00:00
} ( newg )
2015-12-14 16:40:40 +00:00
}
// Stop remaining old groups.
2020-02-12 15:22:18 +00:00
wg . Add ( len ( m . groups ) )
2020-01-27 12:41:32 +00:00
for n , oldg := range m . groups {
2020-02-12 15:22:18 +00:00
go func ( n string , g * Group ) {
2020-04-18 12:32:18 +00:00
g . markStale = true
g . stop ( )
2020-02-12 15:22:18 +00:00
if m := g . metrics ; m != nil {
2021-04-30 17:25:34 +00:00
m . IterationsMissed . DeleteLabelValues ( n )
m . IterationsScheduled . DeleteLabelValues ( n )
m . EvalTotal . DeleteLabelValues ( n )
m . EvalFailures . DeleteLabelValues ( n )
m . GroupInterval . DeleteLabelValues ( n )
m . GroupLastEvalTime . DeleteLabelValues ( n )
m . GroupLastDuration . DeleteLabelValues ( n )
m . GroupRules . DeleteLabelValues ( n )
m . GroupSamples . DeleteLabelValues ( ( n ) )
2020-02-12 15:22:18 +00:00
}
wg . Done ( )
} ( n , oldg )
2015-12-14 16:40:40 +00:00
}
wg . Wait ( )
m . groups = groups
2016-07-11 14:24:54 +00:00
return nil
2015-05-12 14:52:56 +00:00
}
2020-07-22 14:19:34 +00:00
// GroupLoader is responsible for loading rule groups from arbitrary sources and parsing them.
type GroupLoader interface {
	// Load returns the rule groups found under identifier, or the errors
	// encountered while loading them.
	Load(identifier string) (*rulefmt.RuleGroups, []error)

	// Parse turns a rule's query string into an expression.
	Parse(query string) (parser.Expr, error)
}
// FileLoader is the default GroupLoader implementation. It defers to rulefmt.ParseFile
// and parser.ParseExpr.
type FileLoader struct{}

// Load treats identifier as a file name and parses it with rulefmt.ParseFile.
func (FileLoader) Load(identifier string) (*rulefmt.RuleGroups, []error) {
	return rulefmt.ParseFile(identifier)
}

// Parse parses query with parser.ParseExpr.
func (FileLoader) Parse(query string) (parser.Expr, error) {
	return parser.ParseExpr(query)
}
2018-09-25 16:06:26 +00:00
// LoadGroups reads groups from a list of files.
2019-04-15 16:52:58 +00:00
func ( m * Manager ) LoadGroups (
2022-03-29 00:16:46 +00:00
interval time . Duration , externalLabels labels . Labels , externalURL string , ruleGroupPostProcessFunc RuleGroupPostProcessFunc , filenames ... string ,
2019-04-15 16:52:58 +00:00
) ( map [ string ] * Group , [ ] error ) {
2017-06-12 12:44:39 +00:00
groups := make ( map [ string ] * Group )
2018-08-02 10:18:24 +00:00
shouldRestore := ! m . restored
2015-04-29 09:08:56 +00:00
for _ , fn := range filenames {
2020-07-22 14:19:34 +00:00
rgs , errs := m . opts . GroupLoader . Load ( fn )
2017-06-14 06:49:21 +00:00
if errs != nil {
2017-06-16 11:14:33 +00:00
return nil , errs
2015-04-29 09:08:56 +00:00
}
2015-07-03 12:48:22 +00:00
2017-06-12 12:44:39 +00:00
for _ , rg := range rgs . Groups {
itv := interval
2017-06-16 05:16:21 +00:00
if rg . Interval != 0 {
itv = time . Duration ( rg . Interval )
2017-06-12 12:44:39 +00:00
}
2015-12-14 16:40:40 +00:00
2017-06-14 05:43:00 +00:00
rules := make ( [ ] Rule , 0 , len ( rg . Rules ) )
2017-06-12 12:44:39 +00:00
for _ , r := range rg . Rules {
2020-07-22 14:19:34 +00:00
expr , err := m . opts . GroupLoader . Parse ( r . Expr . Value )
2017-06-12 12:44:39 +00:00
if err != nil {
2022-06-17 07:54:25 +00:00
return nil , [ ] error { fmt . Errorf ( "%s: %w" , fn , err ) }
2017-06-12 12:44:39 +00:00
}
2015-12-14 16:40:40 +00:00
2020-01-15 18:07:54 +00:00
if r . Alert . Value != "" {
2017-06-12 12:44:39 +00:00
rules = append ( rules , NewAlertingRule (
2020-01-15 18:07:54 +00:00
r . Alert . Value ,
2017-06-12 12:44:39 +00:00
expr ,
2017-06-16 05:16:21 +00:00
time . Duration ( r . For ) ,
2017-06-12 12:44:39 +00:00
labels . FromMap ( r . Labels ) ,
labels . FromMap ( r . Annotations ) ,
2019-04-15 16:52:58 +00:00
externalLabels ,
2021-05-31 03:35:26 +00:00
externalURL ,
2018-08-02 10:18:24 +00:00
m . restored ,
2017-08-11 18:45:52 +00:00
log . With ( m . logger , "alert" , r . Alert ) ,
2017-06-12 12:44:39 +00:00
) )
continue
}
rules = append ( rules , NewRecordingRule (
2020-01-15 18:07:54 +00:00
r . Record . Value ,
2017-06-12 12:44:39 +00:00
expr ,
labels . FromMap ( r . Labels ) ,
) )
2015-04-29 09:08:56 +00:00
}
2017-06-12 12:44:39 +00:00
2020-09-13 15:07:59 +00:00
groups [ GroupKey ( fn , rg . Name ) ] = NewGroup ( GroupOptions {
2022-03-29 00:16:46 +00:00
Name : rg . Name ,
File : fn ,
Interval : itv ,
Limit : rg . Limit ,
Rules : rules ,
ShouldRestore : shouldRestore ,
Opts : m . opts ,
done : m . done ,
RuleGroupPostProcessFunc : ruleGroupPostProcessFunc ,
2020-02-12 15:22:18 +00:00
} )
2015-04-29 09:08:56 +00:00
}
}
2015-12-14 16:40:40 +00:00
return groups , nil
2013-01-07 22:24:26 +00:00
}
2013-06-11 09:00:55 +00:00
2020-09-13 15:07:59 +00:00
// GroupKey builds the identifier of a rule group from the file it was loaded
// from and its name, joined by ";". The file is part of the key because group
// names need not be unique across filenames.
func GroupKey(file, name string) string {
	const separator = ";"
	return file + separator + name
}
2017-06-14 10:39:14 +00:00
// RuleGroups returns the list of manager's rule groups.
func ( m * Manager ) RuleGroups ( ) [ ] * Group {
m . mtx . RLock ( )
defer m . mtx . RUnlock ( )
rgs := make ( [ ] * Group , 0 , len ( m . groups ) )
for _ , g := range m . groups {
rgs = append ( rgs , g )
}
sort . Slice ( rgs , func ( i , j int ) bool {
2019-02-23 08:51:44 +00:00
if rgs [ i ] . file != rgs [ j ] . file {
return rgs [ i ] . file < rgs [ j ] . file
}
return rgs [ i ] . name < rgs [ j ] . name
2017-06-14 10:39:14 +00:00
} )
return rgs
}
2015-04-29 09:08:56 +00:00
// Rules returns the list of the manager's rules.
2015-04-29 08:26:49 +00:00
func ( m * Manager ) Rules ( ) [ ] Rule {
2015-12-14 16:40:40 +00:00
m . mtx . RLock ( )
defer m . mtx . RUnlock ( )
var rules [ ] Rule
for _ , g := range m . groups {
rules = append ( rules , g . rules ... )
}
2013-06-11 09:00:55 +00:00
return rules
}
2013-06-13 14:10:05 +00:00
2015-04-29 09:08:56 +00:00
// AlertingRules returns the list of the manager's alerting rules.
2015-04-29 08:26:49 +00:00
func ( m * Manager ) AlertingRules ( ) [ ] * AlertingRule {
2015-03-30 17:43:19 +00:00
alerts := [ ] * AlertingRule { }
2015-12-14 16:40:40 +00:00
for _ , rule := range m . Rules ( ) {
2015-03-30 17:43:19 +00:00
if alertingRule , ok := rule . ( * AlertingRule ) ; ok {
2013-06-13 14:10:05 +00:00
alerts = append ( alerts , alertingRule )
}
}
2019-05-14 21:14:27 +00:00
2013-06-13 14:10:05 +00:00
return alerts
}