diff --git a/cmd/prometheus/config.go b/cmd/prometheus/config.go index ad1db59d9..54aee4e12 100644 --- a/cmd/prometheus/config.go +++ b/cmd/prometheus/config.go @@ -26,6 +26,7 @@ import ( "unicode" "github.com/asaskevich/govalidator" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" @@ -60,6 +61,9 @@ var cfg = struct { deprecatedMaxChunksToPersist uint64 }{ alertmanagerURLs: stringset{}, + notifier: notifier.Options{ + Registerer: prometheus.DefaultRegisterer, + }, } // Value type for flags that are now unused, but which are kept around to diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 42d17d1a0..3b5ff2626 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -203,7 +203,6 @@ func Main() int { if instrumentedStorage, ok := localStorage.(prometheus.Collector); ok { prometheus.MustRegister(instrumentedStorage) } - prometheus.MustRegister(notifier) prometheus.MustRegister(configSuccess) prometheus.MustRegister(configSuccessTime) diff --git a/notifier/notifier.go b/notifier/notifier.go index 2a582fd61..ecb28f489 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -56,18 +56,13 @@ type Notifier struct { queue model.Alerts opts *Options + metrics *alertMetrics + more chan struct{} mtx sync.RWMutex ctx context.Context cancel func() - latency *prometheus.SummaryVec - errors *prometheus.CounterVec - sent *prometheus.CounterVec - dropped prometheus.Counter - queueLength prometheus.Gauge - queueCapacity prometheus.Metric - alertmanagers []*alertmanagerSet cancelDiscovery func() } @@ -79,23 +74,21 @@ type Options struct { RelabelConfigs []*config.RelabelConfig // Used for sending HTTP requests to the Alertmanager. Do func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) -} -// New constructs a new Notifier. -func New(o *Options) *Notifier { - ctx, cancel := context.WithCancel(context.Background()) - - if o.Do == nil { - o.Do = ctxhttp.Do - } + Registerer prometheus.Registerer +} - return &Notifier{ - queue: make(model.Alerts, 0, o.QueueCapacity), - ctx: ctx, - cancel: cancel, - more: make(chan struct{}, 1), - opts: o, +type alertMetrics struct { + latency *prometheus.SummaryVec + errors *prometheus.CounterVec + sent *prometheus.CounterVec + dropped prometheus.Counter + queueLength prometheus.GaugeFunc + queueCapacity prometheus.Gauge +} +func newAlertMetrics(r prometheus.Registerer, queueCap int, queueLen func() float64) *alertMetrics { + m := &alertMetrics{ latency: prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: namespace, Subsystem: subsystem, @@ -126,22 +119,55 @@ func New(o *Options) *Notifier { Name: "dropped_total", Help: "Total number of alerts dropped due to errors when sending to Alertmanager.", }), - queueLength: prometheus.NewGauge(prometheus.GaugeOpts{ + queueLength: prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "queue_length", Help: "The number of alert notifications in the queue.", + }, queueLen), + queueCapacity: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "queue_capacity", + Help: "The capacity of the alert notifications queue.", }), - queueCapacity: prometheus.MustNewConstMetric( - prometheus.NewDesc( - prometheus.BuildFQName(namespace, subsystem, "queue_capacity"), - "The capacity of the alert notifications queue.", - nil, nil, - ), - prometheus.GaugeValue, - float64(o.QueueCapacity), - ), } + + m.queueCapacity.Set(float64(queueCap)) + + if r != nil { + r.MustRegister( + m.latency, + m.errors, + m.sent, + m.dropped, + m.queueLength, + m.queueCapacity, + ) + } + + return m +} + +// New constructs a new Notifier. +func New(o *Options) *Notifier { + ctx, cancel := context.WithCancel(context.Background()) + + if o.Do == nil { + o.Do = ctxhttp.Do + } + + n := &Notifier{ + queue: make(model.Alerts, 0, o.QueueCapacity), + ctx: ctx, + cancel: cancel, + more: make(chan struct{}, 1), + opts: o, + } + + queueLenFunc := func() float64 { return float64(n.queueLen()) } + n.metrics = newAlertMetrics(o.Registerer, o.QueueCapacity, queueLenFunc) + return n } // ApplyConfig updates the status state as the new config requires. @@ -160,6 +186,9 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error { if err != nil { return err } + + ams.metrics = n.metrics + amSets = append(amSets, ams) } @@ -216,7 +245,7 @@ func (n *Notifier) Run() { alerts := n.nextBatch() if !n.sendAll(alerts...) { - n.dropped.Add(float64(len(alerts))) + n.metrics.dropped.Add(float64(len(alerts))) } // If the queue still has items left, kick off the next iteration. if n.queueLen() > 0 { @@ -248,7 +277,7 @@ func (n *Notifier) Send(alerts ...*model.Alert) { alerts = alerts[d:] log.Warnf("Alert batch larger than queue capacity, dropping %d alerts", d) - n.dropped.Add(float64(d)) + n.metrics.dropped.Add(float64(d)) } // If the queue is full, remove the oldest alerts in favor @@ -257,7 +286,7 @@ func (n *Notifier) Send(alerts ...*model.Alert) { n.queue = n.queue[d:] log.Warnf("Alert notification queue full, dropping %d alerts", d) - n.dropped.Add(float64(d)) + n.metrics.dropped.Add(float64(d)) } n.queue = append(n.queue, alerts...) @@ -339,12 +368,12 @@ func (n *Notifier) sendAll(alerts ...*model.Alert) bool { if err := n.sendOne(ctx, ams.client, u, b); err != nil { log.With("alertmanager", u).With("count", len(alerts)).Errorf("Error sending alerts: %s", err) - n.errors.WithLabelValues(u).Inc() + n.metrics.errors.WithLabelValues(u).Inc() } else { atomic.AddUint64(&numSuccess, 1) } - n.latency.WithLabelValues(u).Observe(time.Since(begin).Seconds()) - n.sent.WithLabelValues(u).Add(float64(len(alerts))) + n.metrics.latency.WithLabelValues(u).Observe(time.Since(begin).Seconds()) + n.metrics.sent.WithLabelValues(u).Add(float64(len(alerts))) wg.Done() }(am) @@ -381,30 +410,6 @@ func (n *Notifier) Stop() { n.cancel() } -// Describe implements prometheus.Collector. -func (n *Notifier) Describe(ch chan<- *prometheus.Desc) { - n.latency.Describe(ch) - n.errors.Describe(ch) - n.sent.Describe(ch) - - ch <- n.dropped.Desc() - ch <- n.queueLength.Desc() - ch <- n.queueCapacity.Desc() -} - -// Collect implements prometheus.Collector. -func (n *Notifier) Collect(ch chan<- prometheus.Metric) { - n.queueLength.Set(float64(n.queueLen())) - - n.latency.Collect(ch) - n.errors.Collect(ch) - n.sent.Collect(ch) - - ch <- n.dropped - ch <- n.queueLength - ch <- n.queueCapacity -} - // alertmanager holds Alertmanager endpoint information. type alertmanager interface { url() string @@ -430,6 +435,8 @@ type alertmanagerSet struct { cfg *config.AlertmanagerConfig client *http.Client + metrics *alertMetrics + mtx sync.RWMutex ams []alertmanager } @@ -474,6 +481,10 @@ func (s *alertmanagerSet) Sync(tgs []*config.TargetGroup) { continue } + // This will initialise the Counters for the AM to 0. + s.metrics.sent.WithLabelValues(us) + s.metrics.errors.WithLabelValues(us) + seen[us] = struct{}{} s.ams = append(s.ams, am) }