@@ -56,18 +56,13 @@ type Notifier struct {
 	queue model.Alerts
 	opts  *Options
 
+	metrics *alertMetrics
+
 	more   chan struct{}
 	mtx    sync.RWMutex
 	ctx    context.Context
 	cancel func()
 
-	latency       *prometheus.SummaryVec
-	errors        *prometheus.CounterVec
-	sent          *prometheus.CounterVec
-	dropped       prometheus.Counter
-	queueLength   prometheus.Gauge
-	queueCapacity prometheus.Metric
-
 	alertmanagers   []*alertmanagerSet
 	cancelDiscovery func()
 }
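The hunk above collapses six inline metric fields into a single metrics *alertMetrics handle. A minimal standalone sketch of that grouped-metrics pattern, with illustrative names (workerMetrics and Worker are not part of this codebase):

package groupedmetrics

import "github.com/prometheus/client_golang/prometheus"

// workerMetrics plays the role alertMetrics plays in the patch: one
// struct owns every metric of the enclosing type.
type workerMetrics struct {
	processed prometheus.Counter
}

func newWorkerMetrics(r prometheus.Registerer) *workerMetrics {
	m := &workerMetrics{
		processed: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "worker_processed_total",
			Help: "Total items processed.",
		}),
	}
	if r != nil {
		r.MustRegister(m.processed)
	}
	return m
}

// Worker reaches its instrumentation through one field, so call sites
// read w.metrics.processed instead of w.processed.
type Worker struct {
	metrics *workerMetrics
}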
@@ -79,23 +74,21 @@ type Options struct {
 	RelabelConfigs []*config.RelabelConfig
 	// Used for sending HTTP requests to the Alertmanager.
 	Do func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error)
-}
-
-// New constructs a new Notifier.
-func New(o *Options) *Notifier {
-	ctx, cancel := context.WithCancel(context.Background())
-
-	if o.Do == nil {
-		o.Do = ctxhttp.Do
-	}
+
+	Registerer prometheus.Registerer
+}
 
-	return &Notifier{
-		queue:  make(model.Alerts, 0, o.QueueCapacity),
-		ctx:    ctx,
-		cancel: cancel,
-		more:   make(chan struct{}, 1),
-		opts:   o,
-
+type alertMetrics struct {
+	latency       *prometheus.SummaryVec
+	errors        *prometheus.CounterVec
+	sent          *prometheus.CounterVec
+	dropped       prometheus.Counter
+	queueLength   prometheus.GaugeFunc
+	queueCapacity prometheus.Gauge
+}
+
+func newAlertMetrics(r prometheus.Registerer, queueCap int, queueLen func() float64) *alertMetrics {
+	m := &alertMetrics{
 		latency: prometheus.NewSummaryVec(prometheus.SummaryOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -126,22 +119,55 @@ func New(o *Options) *Notifier {
 			Name:      "dropped_total",
 			Help:      "Total number of alerts dropped due to errors when sending to Alertmanager.",
 		}),
-		queueLength: prometheus.NewGauge(prometheus.GaugeOpts{
+		queueLength: prometheus.NewGaugeFunc(prometheus.GaugeOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
 			Name:      "queue_length",
 			Help:      "The number of alert notifications in the queue.",
+		}, queueLen),
+		queueCapacity: prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "queue_capacity",
+			Help:      "The capacity of the alert notifications queue.",
 		}),
-		queueCapacity: prometheus.MustNewConstMetric(
-			prometheus.NewDesc(
-				prometheus.BuildFQName(namespace, subsystem, "queue_capacity"),
-				"The capacity of the alert notifications queue.",
-				nil, nil,
-			),
-			prometheus.GaugeValue,
-			float64(o.QueueCapacity),
-		),
 	}
+
+	m.queueCapacity.Set(float64(queueCap))
+
+	if r != nil {
+		r.MustRegister(
+			m.latency,
+			m.errors,
+			m.sent,
+			m.dropped,
+			m.queueLength,
+			m.queueCapacity,
+		)
+	}
+
+	return m
 }
+
+// New constructs a new Notifier.
+func New(o *Options) *Notifier {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	if o.Do == nil {
+		o.Do = ctxhttp.Do
+	}
+
+	n := &Notifier{
+		queue:  make(model.Alerts, 0, o.QueueCapacity),
+		ctx:    ctx,
+		cancel: cancel,
+		more:   make(chan struct{}, 1),
+		opts:   o,
+	}
+
+	queueLenFunc := func() float64 { return float64(n.queueLen()) }
+	n.metrics = newAlertMetrics(o.Registerer, o.QueueCapacity, queueLenFunc)
+	return n
+}
 
 // ApplyConfig updates the status state as the new config requires.
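Two points in the hunk above are worth isolating: queue_length becomes a GaugeFunc, whose callback runs at every collection (which is what later lets the patch delete the hand-written Collect), and registration moves to the caller-supplied Registerer. A self-contained example of both client_golang behaviours; the metric name is made up:

package notifier_test

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func Example_gaugeFunc() {
	queue := []string{"a", "b"}

	// The callback is evaluated lazily at gather time, so the gauge
	// never needs an explicit Set.
	g := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "example_queue_length",
		Help: "Items currently queued.",
	}, func() float64 { return float64(len(queue)) })

	// Injecting a private registry instead of the global default is
	// exactly what Options.Registerer enables, e.g. in tests.
	reg := prometheus.NewRegistry()
	reg.MustRegister(g)

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	fmt.Println(mfs[0].GetName(), mfs[0].GetMetric()[0].GetGauge().GetValue())
	// Output: example_queue_length 2
}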
@@ -160,6 +186,9 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
 		if err != nil {
 			return err
 		}
+
+		ams.metrics = n.metrics
+
 		amSets = append(amSets, ams)
 	}
 
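Since ams.metrics = n.metrics copies a pointer, every alertmanagerSet built on a config reload shares the one alertMetrics instance; per-target series are separated by the label value alone, so counters keep accumulating across reloads. An illustrative reduction (function name and URL are made up):

package notifier

import "github.com/prometheus/client_golang/prometheus"

func sharedVecExample() {
	sent := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "example_sent_total", Help: "Alerts sent."},
		[]string{"alertmanager"},
	)
	// Two "sets" holding the same vec: increments from either land in
	// the same underlying series for a given label value.
	setA, setB := sent, sent
	setA.WithLabelValues("http://am-1:9093").Add(3)
	setB.WithLabelValues("http://am-1:9093").Add(2) // same series, now 5
}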
@@ -216,7 +245,7 @@ func (n *Notifier) Run() {
 			alerts := n.nextBatch()
 
 			if !n.sendAll(alerts...) {
-				n.dropped.Add(float64(len(alerts)))
+				n.metrics.dropped.Add(float64(len(alerts)))
 			}
 			// If the queue still has items left, kick off the next iteration.
 			if n.queueLen() > 0 {
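The loop context above re-arms itself while the queue is non-empty; more is the 1-buffered channel created in New as make(chan struct{}, 1). A reduced sketch of that wakeup idiom, with illustrative names rather than this file's helpers:

package notifier

type wakeup struct {
	more chan struct{} // created as make(chan struct{}, 1)
}

// signal requests another drain pass without blocking: if a wakeup is
// already pending, the extra signal is coalesced into it.
func (w *wakeup) signal() {
	select {
	case w.more <- struct{}{}:
	default:
	}
}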
@@ -248,7 +277,7 @@ func (n *Notifier) Send(alerts ...*model.Alert) {
 		alerts = alerts[d:]
 
 		log.Warnf("Alert batch larger than queue capacity, dropping %d alerts", d)
-		n.dropped.Add(float64(d))
+		n.metrics.dropped.Add(float64(d))
 	}
 
 	// If the queue is full, remove the oldest alerts in favor
@@ -257,7 +286,7 @@ func (n *Notifier) Send(alerts ...*model.Alert) {
 		n.queue = n.queue[d:]
 
 		log.Warnf("Alert notification queue full, dropping %d alerts", d)
-		n.dropped.Add(float64(d))
+		n.metrics.dropped.Add(float64(d))
 	}
 
 	n.queue = append(n.queue, alerts...)
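Both Send hunks enforce a drop-oldest policy: an oversized batch is first trimmed from its own front, then the queue's head is evicted to make room for the append. A standalone sketch of the same arithmetic (hypothetical helper, int elements for brevity):

package notifier

// enqueueDropOldest mirrors the two truncations above: trim the batch
// to capacity, then evict the oldest queued items so the append fits.
func enqueueDropOldest(queue, batch []int, capacity int) (out []int, dropped int) {
	if d := len(batch) - capacity; d > 0 {
		batch = batch[d:] // batch alone exceeds capacity: keep newest
		dropped += d
	}
	if d := len(queue) + len(batch) - capacity; d > 0 {
		queue = queue[d:] // queue full: evict oldest
		dropped += d
	}
	return append(queue, batch...), dropped
}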
@@ -339,12 +368,12 @@ func (n *Notifier) sendAll(alerts ...*model.Alert) bool {
 
 			if err := n.sendOne(ctx, ams.client, u, b); err != nil {
 				log.With("alertmanager", u).With("count", len(alerts)).Errorf("Error sending alerts: %s", err)
-				n.errors.WithLabelValues(u).Inc()
+				n.metrics.errors.WithLabelValues(u).Inc()
 			} else {
 				atomic.AddUint64(&numSuccess, 1)
 			}
-			n.latency.WithLabelValues(u).Observe(time.Since(begin).Seconds())
-			n.sent.WithLabelValues(u).Add(float64(len(alerts)))
+			n.metrics.latency.WithLabelValues(u).Observe(time.Since(begin).Seconds())
+			n.metrics.sent.WithLabelValues(u).Add(float64(len(alerts)))
 
 			wg.Done()
 		}(am)
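The hunk only reroutes metric updates through n.metrics; the surrounding context is the per-Alertmanager fan-out. A reduced sketch of that concurrency shape, one goroutine per target with successes counted atomically and an overall result of true if any target accepted the batch (helper names are illustrative):

package notifier

import (
	"sync"
	"sync/atomic"
)

func fanOut(targets []string, send func(string) error) bool {
	var (
		wg         sync.WaitGroup
		numSuccess uint64
	)
	for _, t := range targets {
		wg.Add(1)
		go func(t string) { // t passed as arg: safe capture pre-Go 1.22
			defer wg.Done()
			if send(t) == nil {
				atomic.AddUint64(&numSuccess, 1)
			}
		}(t)
	}
	wg.Wait()
	return atomic.LoadUint64(&numSuccess) > 0
}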
@@ -381,30 +410,6 @@ func (n *Notifier) Stop() {
 	n.cancel()
 }
 
-// Describe implements prometheus.Collector.
-func (n *Notifier) Describe(ch chan<- *prometheus.Desc) {
-	n.latency.Describe(ch)
-	n.errors.Describe(ch)
-	n.sent.Describe(ch)
-
-	ch <- n.dropped.Desc()
-	ch <- n.queueLength.Desc()
-	ch <- n.queueCapacity.Desc()
-}
-
-// Collect implements prometheus.Collector.
-func (n *Notifier) Collect(ch chan<- prometheus.Metric) {
-	n.queueLength.Set(float64(n.queueLen()))
-
-	n.latency.Collect(ch)
-	n.errors.Collect(ch)
-	n.sent.Collect(ch)
-
-	ch <- n.dropped
-	ch <- n.queueLength
-	ch <- n.queueCapacity
-}
-
 // alertmanager holds Alertmanager endpoint information.
 type alertmanager interface {
 	url() string
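Dropping Describe/Collect works because nothing in alertMetrics needs a scrape-time hook any more: the only derived value, the queue length, is computed by the GaugeFunc callback, and every metric is a standard client_golang type registered once in newAlertMetrics. A hedged before/after contrast with toy types, not the patch's code:

package notifier

import "github.com/prometheus/client_golang/prometheus"

// Before: the owner implements prometheus.Collector purely to refresh
// a derived gauge on every scrape.
type manualCollector struct{ queueLength prometheus.Gauge }

func (c *manualCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.queueLength.Desc() }
func (c *manualCollector) Collect(ch chan<- prometheus.Metric) {
	c.queueLength.Set(42) // recompute at scrape time
	ch <- c.queueLength
}

// After: the recomputation lives in the GaugeFunc itself, so plain
// registration suffices and no Collector implementation is needed.
var lazyGauge = prometheus.NewGaugeFunc(
	prometheus.GaugeOpts{Name: "example_queue_length", Help: "Derived at scrape time."},
	func() float64 { return 42 },
)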
@@ -430,6 +435,8 @@ type alertmanagerSet struct {
 	cfg    *config.AlertmanagerConfig
 	client *http.Client
 
+	metrics *alertMetrics
+
 	mtx sync.RWMutex
 	ams []alertmanager
 }
@@ -474,6 +481,10 @@ func (s *alertmanagerSet) Sync(tgs []*config.TargetGroup) {
 			continue
 		}
 
+		// This will initialise the Counters for the AM to 0.
+		s.metrics.sent.WithLabelValues(us)
+		s.metrics.errors.WithLabelValues(us)
+
 		seen[us] = struct{}{}
 		s.ams = append(s.ams, am)
 	}
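The bare WithLabelValues calls above are not dead code: fetching a child of a CounterVec creates and exports that series at value 0, which is standard client_golang behaviour. That gives functions like rate() a starting sample for each discovered Alertmanager before the first send or error ever occurs:

package notifier

import "github.com/prometheus/client_golang/prometheus"

func initPerTargetSeries() {
	errs := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "example_errors_total", Help: "Send errors."},
		[]string{"alertmanager"},
	)
	// No Inc/Add needed: the child now exists and exports value 0, so
	// the series is visible from discovery time onwards.
	errs.WithLabelValues("http://am-1:9093")
}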