@ -20,6 +20,7 @@ import (
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/prometheus/client_golang/prometheus"
@ -38,8 +39,9 @@ const (
// String constants for instrumentation.
const (
namespace = "prometheus"
subsystem = "notifications"
namespace = "prometheus"
subsystem = "notifications"
alertmanagerLabel = "alertmanager"
)
// Notifier is responsible for dispatching alert notifications to an
@ -53,20 +55,20 @@ type Notifier struct {
ctx context . Context
cancel func ( )
latency prometheus . Summary
errors prometheus . Counter
latency * prometheus . SummaryVec
errors * prometheus . CounterVec
sent * prometheus . CounterVec
dropped prometheus . Counter
sent prometheus . Counter
queueLength prometheus . Gauge
queueCapacity prometheus . Metric
}
// Options are the configurable parameters of a Handler.
type Options struct {
AlertmanagerURL string
QueueCapacity int
Timeout time . Duration
ExternalLabels model . LabelSet
AlertmanagerURLs [ ] string
QueueCapacity int
Timeout time . Duration
ExternalLabels model . LabelSet
}
// New constructs a neww Notifier.
@ -80,24 +82,30 @@ func New(o *Options) *Notifier {
more : make ( chan struct { } , 1 ) ,
opts : o ,
latency : prometheus . NewSummary ( prometheus . SummaryOpts {
latency : prometheus . NewSummaryVec ( prometheus . SummaryOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "latency_seconds" ,
Help : "Latency quantiles for sending alert notifications (not including dropped notifications)." ,
} ) ,
errors : prometheus . NewCounter ( prometheus . CounterOpts {
} ,
[ ] string { alertmanagerLabel } ,
) ,
errors : prometheus . NewCounterVec ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "errors_total" ,
Help : "Total number of errors sending alert notifications." ,
} ) ,
sent : prometheus . NewCounter ( prometheus . CounterOpts {
} ,
[ ] string { alertmanagerLabel } ,
) ,
sent : prometheus . NewCounterVec ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "sent_total" ,
Help : "Total number of alerts successfully sent." ,
} ) ,
} ,
[ ] string { alertmanagerLabel } ,
) ,
dropped : prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
@ -160,9 +168,11 @@ func (n *Notifier) nextBatch() []*model.Alert {
// Run dispatches notifications continuously.
func ( n * Notifier ) Run ( ) {
numAMs := len ( n . opts . AlertmanagerURLs )
// Just warn once in the beginning to prevent noisy logs.
if n . opts . AlertmanagerURL == "" {
log . Warnf ( "No AlertManager configured, not dispatching any alerts" )
if numAMs == 0 {
log . Warnf ( "No AlertManagers configured, not dispatching any alerts" )
return
}
for {
@ -171,28 +181,21 @@ func (n *Notifier) Run() {
return
case <- n . more :
}
alerts := n . nextBatch ( )
if len ( alerts ) == 0 {
continue
}
if n . opts . AlertmanagerURL == "" {
n . dropped . Add ( float64 ( len ( alerts ) ) )
continue
}
begin := time . Now ( )
if numAMs > 0 {
if err := n . send ( alerts ... ) ; err != nil {
log . Errorf ( "Error sending %d alerts: %s" , len ( alerts ) , err )
n . errors . Inc ( )
if len ( alerts ) > 0 {
numErrors := n . sendAll ( alerts ... )
// Increment the dropped counter if we could not send
// successfully to a single AlertManager.
if numErrors == numAMs {
n . dropped . Add ( float64 ( len ( alerts ) ) )
}
}
} else {
n . dropped . Add ( float64 ( len ( alerts ) ) )
}
n . latency . Observe ( float64 ( time . Since ( begin ) ) / float64 ( time . Second ) )
n . sent . Add ( float64 ( len ( alerts ) ) )
// If the queue still has items left, kick off the next iteration.
if n . queueLen ( ) > 0 {
n . setMore ( )
@ -239,11 +242,15 @@ func (n *Notifier) setMore() {
}
}
func ( n * Notifier ) postURL ( ) string {
return strings . TrimRight ( n . opts . AlertmanagerURL , "/" ) + alertPushEndpoint
func postURL ( u string ) string {
return strings . TrimRight ( u , "/" ) + alertPushEndpoint
}
func ( n * Notifier ) send ( alerts ... * model . Alert ) error {
// sendAll sends the alerts to all configured Alertmanagers at concurrently.
// It returns the number of sends that have failed.
func ( n * Notifier ) sendAll ( alerts ... * model . Alert ) int {
begin := time . Now ( )
// Attach external labels before sending alerts.
for _ , a := range alerts {
for ln , lv := range n . opts . ExternalLabels {
@ -253,36 +260,62 @@ func (n *Notifier) send(alerts ...*model.Alert) error {
}
}
var buf bytes . Buffer
if err := json . NewEncoder ( & buf ) . Encode ( alerts ) ; err != nil {
return err
b , err := json . Marshal ( alerts )
if err != nil {
log . Errorf ( "Encoding alerts failed: %s" , err )
return len ( n . opts . AlertmanagerURLs )
}
ctx , _ := context . WithTimeout ( context . Background ( ) , n . opts . Timeout )
resp , err := ctxhttp . Post ( ctx , http . DefaultClient , n . postURL ( ) , contentTypeJSON , & buf )
if err != nil {
send := func ( u string ) error {
resp , err := ctxhttp . Post ( ctx , http . DefaultClient , postURL ( u ) , contentTypeJSON , bytes . NewReader ( b ) )
if err != nil {
return err
}
defer resp . Body . Close ( )
if resp . StatusCode / 100 != 2 {
return fmt . Errorf ( "bad response status %v" , resp . Status )
}
return err
}
defer resp . Body . Close ( )
if resp . StatusCode / 100 != 2 {
return fmt . Errorf ( "bad response status %v" , resp . Status )
var (
wg sync . WaitGroup
numErrors uint64
)
for _ , u := range n . opts . AlertmanagerURLs {
wg . Add ( 1 )
go func ( u string ) {
if err := send ( u ) ; err != nil {
log . With ( "alertmanager" , u ) . With ( "count" , fmt . Sprintf ( "%d" , len ( alerts ) ) ) . Errorf ( "Error sending alerts: %s" , err )
n . errors . WithLabelValues ( u ) . Inc ( )
atomic . AddUint64 ( & numErrors , 1 )
}
n . latency . WithLabelValues ( u ) . Observe ( float64 ( time . Since ( begin ) ) / float64 ( time . Second ) )
n . sent . WithLabelValues ( u ) . Add ( float64 ( len ( alerts ) ) )
wg . Done ( )
} ( u )
}
return nil
wg . Wait ( )
return int ( numErrors )
}
// Stop shuts down the notification handler.
func ( n * Notifier ) Stop ( ) {
log . Info ( "Stopping notification handler..." )
n . cancel ( )
}
// Describe implements prometheus.Collector.
func ( n * Notifier ) Describe ( ch chan <- * prometheus . Desc ) {
ch <- n . latency . Desc ( )
ch <- n . errors . Desc ( )
ch <- n . sent . Desc ( )
n . latency . Describe ( ch )
n . errors . Describe ( ch )
n . sent . Describe ( ch )
ch <- n . dropped . Desc ( )
ch <- n . queueLength . Desc ( )
ch <- n . queueCapacity . Desc ( )
@ -292,9 +325,10 @@ func (n *Notifier) Describe(ch chan<- *prometheus.Desc) {
func ( n * Notifier ) Collect ( ch chan <- prometheus . Metric ) {
n . queueLength . Set ( float64 ( n . queueLen ( ) ) )
ch <- n . latency
ch <- n . errors
ch <- n . sent
n . latency . Collect ( ch )
n . errors . Collect ( ch )
n . sent . Collect ( ch )
ch <- n . dropped
ch <- n . queueLength
ch <- n . queueCapacity