diff --git a/rules/alerting.go b/rules/alerting.go
index bb6f93a6d..4740d3457 100644
--- a/rules/alerting.go
+++ b/rules/alerting.go
@@ -24,6 +24,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/prometheus/common/model"
+	"go.uber.org/atomic"
 	yaml "gopkg.in/yaml.v2"

 	"github.com/prometheus/prometheus/model/labels"
@@ -121,17 +122,17 @@ type AlertingRule struct {
 	externalURL string
 	// true if old state has been restored. We start persisting samples for ALERT_FOR_STATE
 	// only after the restoration.
-	restored bool
-	// Protects the below.
-	mtx sync.Mutex
+	restored *atomic.Bool
 	// Time in seconds taken to evaluate rule.
-	evaluationDuration time.Duration
+	evaluationDuration *atomic.Duration
 	// Timestamp of last evaluation of rule.
-	evaluationTimestamp time.Time
+	evaluationTimestamp *atomic.Time
 	// The health of the alerting rule.
-	health RuleHealth
+	health *atomic.String
 	// The last error seen by the alerting rule.
-	lastError error
+	lastError *atomic.Error
+	// activeMtx Protects the `active` map.
+	activeMtx sync.Mutex
 	// A map of alerts which are currently active (Pending or Firing), keyed by
 	// the fingerprint of the labelset they correspond to.
 	active map[uint64]*Alert
@@ -151,17 +152,20 @@ func NewAlertingRule(
 	}

 	return &AlertingRule{
-		name:           name,
-		vector:         vec,
-		holdDuration:   hold,
-		labels:         labels,
-		annotations:    annotations,
-		externalLabels: el,
-		externalURL:    externalURL,
-		health:         HealthUnknown,
-		active:         map[uint64]*Alert{},
-		logger:         logger,
-		restored:       restored,
+		name:                name,
+		vector:              vec,
+		holdDuration:        hold,
+		labels:              labels,
+		annotations:         annotations,
+		externalLabels:      el,
+		externalURL:         externalURL,
+		active:              map[uint64]*Alert{},
+		logger:              logger,
+		restored:            atomic.NewBool(restored),
+		health:              atomic.NewString(string(HealthUnknown)),
+		evaluationTimestamp: atomic.NewTime(time.Time{}),
+		evaluationDuration:  atomic.NewDuration(0),
+		lastError:           atomic.NewError(nil),
 	}
 }

@@ -172,30 +176,22 @@ func (r *AlertingRule) Name() string {

 // SetLastError sets the current error seen by the alerting rule.
 func (r *AlertingRule) SetLastError(err error) {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
-	r.lastError = err
+	r.lastError.Store(err)
 }

 // LastError returns the last error seen by the alerting rule.
 func (r *AlertingRule) LastError() error {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
-	return r.lastError
+	return r.lastError.Load()
 }

 // SetHealth sets the current health of the alerting rule.
 func (r *AlertingRule) SetHealth(health RuleHealth) {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
-	r.health = health
+	r.health.Store(string(health))
 }

 // Health returns the current health of the alerting rule.
 func (r *AlertingRule) Health() RuleHealth {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
-	return r.health
+	return RuleHealth(r.health.String())
 }

 // Query returns the query expression of the alerting rule.
@@ -283,42 +279,32 @@ func (r *AlertingRule) QueryforStateSeries(alert *Alert, q storage.Querier) (sto

 // SetEvaluationDuration updates evaluationDuration to the duration it took to evaluate the rule on its last evaluation.
 func (r *AlertingRule) SetEvaluationDuration(dur time.Duration) {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
-	r.evaluationDuration = dur
+	r.evaluationDuration.Store(dur)
 }

 // GetEvaluationDuration returns the time in seconds it took to evaluate the alerting rule.
 func (r *AlertingRule) GetEvaluationDuration() time.Duration {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
-	return r.evaluationDuration
+	return r.evaluationDuration.Load()
 }

 // SetEvaluationTimestamp updates evaluationTimestamp to the timestamp of when the rule was last evaluated.
 func (r *AlertingRule) SetEvaluationTimestamp(ts time.Time) {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
-	r.evaluationTimestamp = ts
+	r.evaluationTimestamp.Store(ts)
 }

 // GetEvaluationTimestamp returns the time the evaluation took place.
 func (r *AlertingRule) GetEvaluationTimestamp() time.Time {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
-	return r.evaluationTimestamp
+	return r.evaluationTimestamp.Load()
 }

 // SetRestored updates the restoration state of the alerting rule.
 func (r *AlertingRule) SetRestored(restored bool) {
-	r.restored = restored
+	r.restored.Store(restored)
 }

 // Restored returns the restoration state of the alerting rule.
 func (r *AlertingRule) Restored() bool {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
-	return r.restored
+	return r.restored.Load()
 }

 // resolvedRetention is the duration for which a resolved alert instance
@@ -333,9 +319,6 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc,
 		return nil, err
 	}

-	r.mtx.Lock()
-	defer r.mtx.Unlock()
-
 	// Create pending alerts for any new vector elements in the alert expression
 	// or update the expression value for existing elements.
 	resultFPs := map[uint64]struct{}{}
@@ -407,6 +390,9 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc,
 		}
 	}

+	r.activeMtx.Lock()
+	defer r.activeMtx.Unlock()
+
 	for h, a := range alerts {
 		// Check whether we already have alerting state for the identifying label set.
 		// Update the last value and annotations if so, create a new alert entry otherwise.
@@ -441,7 +427,7 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc,
 			a.FiredAt = ts
 		}

-		if r.restored {
+		if r.restored.Load() {
 			vec = append(vec, r.sample(a, ts))
 			vec = append(vec, r.forStateSample(a, ts, float64(a.ActiveAt.Unix())))
 		}
@@ -458,8 +444,8 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc,
 // State returns the maximum state of alert instances for this rule.
 // StateFiring > StatePending > StateInactive
 func (r *AlertingRule) State() AlertState {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
+	r.activeMtx.Lock()
+	defer r.activeMtx.Unlock()

 	maxState := StateInactive
 	for _, a := range r.active {
@@ -484,8 +470,8 @@ func (r *AlertingRule) ActiveAlerts() []*Alert {
 // currentAlerts returns all instances of alerts for this rule. This may include
 // inactive alerts that were previously firing.
 func (r *AlertingRule) currentAlerts() []*Alert {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
+	r.activeMtx.Lock()
+	defer r.activeMtx.Unlock()

 	alerts := make([]*Alert, 0, len(r.active))

@@ -501,8 +487,8 @@ func (r *AlertingRule) currentAlerts() []*Alert {
 // and not on its copy.
 // If you want to run on a copy of alerts then don't use this, get the alerts from 'ActiveAlerts()'.
 func (r *AlertingRule) ForEachActiveAlert(f func(*Alert)) {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
+	r.activeMtx.Lock()
+	defer r.activeMtx.Unlock()

 	for _, a := range r.active {
 		f(a)
diff --git a/rules/alerting_test.go b/rules/alerting_test.go
index efe24b027..3f3eda12a 100644
--- a/rules/alerting_test.go
+++ b/rules/alerting_test.go
@@ -416,6 +416,83 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) {
 	require.Equal(t, result, filteredRes)
 }

+func TestAlertingRuleQueryInTemplate(t *testing.T) {
+	suite, err := promql.NewTest(t, `
+		load 1m
+			http_requests{job="app-server", instance="0"}	70 85 70 70
+	`)
+	require.NoError(t, err)
+	defer suite.Close()
+
+	require.NoError(t, suite.Run())
+
+	expr, err := parser.ParseExpr(`sum(http_requests) < 100`)
+	require.NoError(t, err)
+
+	ruleWithQueryInTemplate := NewAlertingRule(
+		"ruleWithQueryInTemplate",
+		expr,
+		time.Minute,
+		labels.FromStrings("label", "value"),
+		labels.FromStrings("templated_label", `{{- with "sort(sum(http_requests) by (instance))" | query -}}
+{{- range $i,$v := . -}}
+instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }};
+{{- end -}}
+{{- end -}}
+`),
+		nil,
+		"",
+		true, log.NewNopLogger(),
+	)
+	evalTime := time.Unix(0, 0)
+
+	startQueryCh := make(chan struct{})
+	getDoneCh := make(chan struct{})
+	slowQueryFunc := func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) {
+		if q == "sort(sum(http_requests) by (instance))" {
+			// This is a minimum reproduction of issue 10703, expand template with query.
+			close(startQueryCh)
+			select {
+			case <-getDoneCh:
+			case <-time.After(time.Millisecond * 10):
+				// Assert no blocking when template expanding.
+				require.Fail(t, "unexpected blocking when template expanding.")
+			}
+		}
+		return EngineQueryFunc(suite.QueryEngine(), suite.Storage())(ctx, q, ts)
+	}
+	go func() {
+		<-startQueryCh
+		_ = ruleWithQueryInTemplate.Health()
+		_ = ruleWithQueryInTemplate.LastError()
+		_ = ruleWithQueryInTemplate.GetEvaluationDuration()
+		_ = ruleWithQueryInTemplate.GetEvaluationTimestamp()
+		close(getDoneCh)
+	}()
+	_, err = ruleWithQueryInTemplate.Eval(
+		suite.Context(), evalTime, slowQueryFunc, nil, 0,
+	)
+	require.NoError(t, err)
+}
+
+func BenchmarkAlertingRuleAtomicField(b *testing.B) {
+	b.ReportAllocs()
+	rule := NewAlertingRule("bench", nil, 0, nil, nil, nil, "", true, nil)
+	done := make(chan struct{})
+	go func() {
+		for i := 0; i < b.N; i++ {
+			rule.GetEvaluationTimestamp()
+		}
+		close(done)
+	}()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			rule.SetEvaluationTimestamp(time.Now())
+		}
+	})
+	<-done
+}
+
 func TestAlertingRuleDuplicate(t *testing.T) {
 	storage := teststorage.New(t)
 	defer storage.Close()
diff --git a/rules/manager_test.go b/rules/manager_test.go
index f69bf109a..2af757ff2 100644
--- a/rules/manager_test.go
+++ b/rules/manager_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
 	"go.uber.org/goleak"
 	yaml "gopkg.in/yaml.v2"

@@ -1290,17 +1291,20 @@ func TestUpdateMissedEvalMetrics(t *testing.T) {
 	m[1] = activeAlert

 	rule := &AlertingRule{
-		name:           "HTTPRequestRateLow",
-		vector:         expr,
-		holdDuration:   5 * time.Minute,
-		labels:         labels.FromStrings("severity", "critical"),
-		annotations:    nil,
-		externalLabels: nil,
-		externalURL:    "",
-		health:         HealthUnknown,
-		active:         m,
-		logger:         nil,
-		restored:       true,
+		name:                "HTTPRequestRateLow",
+		vector:              expr,
+		holdDuration:        5 * time.Minute,
+		labels:              labels.FromStrings("severity", "critical"),
+		annotations:         nil,
+		externalLabels:      nil,
+		externalURL:         "",
+		active:              m,
+		logger:              nil,
+		restored:            atomic.NewBool(true),
+		health:              atomic.NewString(string(HealthUnknown)),
+		evaluationTimestamp: atomic.NewTime(time.Time{}),
+		evaluationDuration:  atomic.NewDuration(0),
+		lastError:           atomic.NewError(nil),
 	}

 	group := NewGroup(GroupOptions{
diff --git a/rules/recording.go b/rules/recording.go
index 2ce15979f..6d16a80b0 100644
--- a/rules/recording.go
+++ b/rules/recording.go
@@ -17,9 +17,9 @@ import (
 	"context"
 	"fmt"
 	"net/url"
-	"sync"
 	"time"

+	"go.uber.org/atomic"
 	yaml "gopkg.in/yaml.v2"

 	"github.com/prometheus/prometheus/model/labels"
@@ -33,25 +33,26 @@ type RecordingRule struct {
 	name   string
 	vector parser.Expr
 	labels labels.Labels
-	// Protects the below.
-	mtx sync.Mutex
 	// The health of the recording rule.
-	health RuleHealth
+	health *atomic.String
 	// Timestamp of last evaluation of the recording rule.
-	evaluationTimestamp time.Time
+	evaluationTimestamp *atomic.Time
 	// The last error seen by the recording rule.
-	lastError error
+	lastError *atomic.Error
 	// Duration of how long it took to evaluate the recording rule.
-	evaluationDuration time.Duration
+	evaluationDuration *atomic.Duration
 }

 // NewRecordingRule returns a new recording rule.
 func NewRecordingRule(name string, vector parser.Expr, lset labels.Labels) *RecordingRule {
 	return &RecordingRule{
-		name:   name,
-		vector: vector,
-		health: HealthUnknown,
-		labels: lset,
+		name:                name,
+		vector:              vector,
+		labels:              lset,
+		health:              atomic.NewString(string(HealthUnknown)),
+		evaluationTimestamp: atomic.NewTime(time.Time{}),
+		evaluationDuration:  atomic.NewDuration(0),
+		lastError:           atomic.NewError(nil),
 	}
 }

@@ -124,56 +125,40 @@ func (rule *RecordingRule) String() string {

 // SetEvaluationDuration updates evaluationDuration to the time in seconds it took to evaluate the rule on its last evaluation.
 func (rule *RecordingRule) SetEvaluationDuration(dur time.Duration) {
-	rule.mtx.Lock()
-	defer rule.mtx.Unlock()
-	rule.evaluationDuration = dur
+	rule.evaluationDuration.Store(dur)
 }

 // SetLastError sets the current error seen by the recording rule.
 func (rule *RecordingRule) SetLastError(err error) {
-	rule.mtx.Lock()
-	defer rule.mtx.Unlock()
-	rule.lastError = err
+	rule.lastError.Store(err)
 }

 // LastError returns the last error seen by the recording rule.
 func (rule *RecordingRule) LastError() error {
-	rule.mtx.Lock()
-	defer rule.mtx.Unlock()
-	return rule.lastError
+	return rule.lastError.Load()
 }

 // SetHealth sets the current health of the recording rule.
 func (rule *RecordingRule) SetHealth(health RuleHealth) {
-	rule.mtx.Lock()
-	defer rule.mtx.Unlock()
-	rule.health = health
+	rule.health.Store(string(health))
 }

 // Health returns the current health of the recording rule.
 func (rule *RecordingRule) Health() RuleHealth {
-	rule.mtx.Lock()
-	defer rule.mtx.Unlock()
-	return rule.health
+	return RuleHealth(rule.health.Load())
 }

 // GetEvaluationDuration returns the time in seconds it took to evaluate the recording rule.
 func (rule *RecordingRule) GetEvaluationDuration() time.Duration {
-	rule.mtx.Lock()
-	defer rule.mtx.Unlock()
-	return rule.evaluationDuration
+	return rule.evaluationDuration.Load()
 }

 // SetEvaluationTimestamp updates evaluationTimestamp to the timestamp of when the rule was last evaluated.
 func (rule *RecordingRule) SetEvaluationTimestamp(ts time.Time) {
-	rule.mtx.Lock()
-	defer rule.mtx.Unlock()
-	rule.evaluationTimestamp = ts
+	rule.evaluationTimestamp.Store(ts)
 }

 // GetEvaluationTimestamp returns the time the evaluation took place.
 func (rule *RecordingRule) GetEvaluationTimestamp() time.Time {
-	rule.mtx.Lock()
-	defer rule.mtx.Unlock()
-	return rule.evaluationTimestamp
+	return rule.evaluationTimestamp.Load()
 }
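For readers unfamiliar with go.uber.org/atomic, here is a minimal, self-contained sketch (not part of the patch) of the pattern the diff adopts: per-field typed atomics instead of one mutex shared by rule metadata and the evaluation path, so metadata getters such as Health() or GetEvaluationTimestamp() never block behind a slow Eval that expands a template with a query (the deadlock reproduced by the new TestAlertingRuleQueryInTemplate for issue 10703). The names demoRule, newDemoRule, and eval are illustrative only, not Prometheus identifiers.

```go
package main

import (
	"fmt"
	"time"

	"go.uber.org/atomic"
)

// demoRule mirrors the shape of the patched rule types: metadata fields are
// typed atomics, so readers use lock-free Load calls.
type demoRule struct {
	health              *atomic.String
	lastError           *atomic.Error
	evaluationTimestamp *atomic.Time
}

func newDemoRule() *demoRule {
	return &demoRule{
		health:              atomic.NewString("unknown"),
		lastError:           atomic.NewError(nil),
		evaluationTimestamp: atomic.NewTime(time.Time{}),
	}
}

func (r *demoRule) eval() {
	// A slow evaluation (queries, template expansion, ...) no longer holds a
	// lock that the getters below would need.
	time.Sleep(10 * time.Millisecond)
	r.health.Store("ok")
	r.evaluationTimestamp.Store(time.Now())
}

func main() {
	r := newDemoRule()
	go r.eval()
	// Concurrent reads are safe and non-blocking while eval runs.
	fmt.Println(r.health.Load(), r.lastError.Load(), r.evaluationTimestamp.Load())
}
```

The trade-off is that the fields are no longer updated inside a single critical section, so values read together (for example health and lastError) may come from different evaluations; state that still needs mutual exclusion, the `active` map, keeps its own lock (`activeMtx`) in the patch.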