notifier: optionally drain queued notifications before shutting down (#14290)

* Add draining of queued notifications to `notifier.Manager`

Signed-off-by: Charles Korn <charles.korn@grafana.com>

* Update docs

Signed-off-by: Charles Korn <charles.korn@grafana.com>

* Address PR feedback

Signed-off-by: Charles Korn <charles.korn@grafana.com>

* Add more logging

Signed-off-by: Charles Korn <charles.korn@grafana.com>

* Address offline feedback: remove timeout

Signed-off-by: Charles Korn <charles.korn@grafana.com>

* Ensure stopping takes priority over further processing, make tests more robust

Signed-off-by: Charles Korn <charles.korn@grafana.com>

* Make channel unbuffered

Signed-off-by: Charles Korn <charles.korn@grafana.com>

* Update docs

Signed-off-by: Charles Korn <charles.korn@grafana.com>

* Fix race in test

Signed-off-by: Charles Korn <charles.korn@grafana.com>

* Remove unnecessary context

Signed-off-by: Charles Korn <charles.korn@grafana.com>

* Make Stop safe to call multiple times

Signed-off-by: Charles Korn <charles.korn@grafana.com>

---------

Signed-off-by: Charles Korn <charles.korn@grafana.com>
pull/14351/head
Charles Korn 5 months ago committed by GitHub
parent f24ce00320
commit 2dd07fbb1b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -445,6 +445,9 @@ func main() {
serverOnlyFlag(a, "alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications."). serverOnlyFlag(a, "alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications.").
Default("10000").IntVar(&cfg.notifier.QueueCapacity) Default("10000").IntVar(&cfg.notifier.QueueCapacity)
serverOnlyFlag(a, "alertmanager.drain-notification-queue-on-shutdown", "Send any outstanding Alertmanager notifications when shutting down. If false, any outstanding Alertmanager notifications will be dropped when shutting down.").
Default("true").BoolVar(&cfg.notifier.DrainOnShutdown)
// TODO: Remove in Prometheus 3.0. // TODO: Remove in Prometheus 3.0.
alertmanagerTimeout := a.Flag("alertmanager.timeout", "[DEPRECATED] This flag has no effect.").Hidden().String() alertmanagerTimeout := a.Flag("alertmanager.timeout", "[DEPRECATED] This flag has no effect.").Hidden().String()

@ -50,6 +50,7 @@ The Prometheus monitoring server
| <code class="text-nowrap">--rules.alert.resend-delay</code> | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` | | <code class="text-nowrap">--rules.alert.resend-delay</code> | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` |
| <code class="text-nowrap">--rules.max-concurrent-evals</code> | Global concurrency limit for independent rules that can run concurrently. When set, "query.max-concurrency" may need to be adjusted accordingly. Use with server mode only. | `4` | | <code class="text-nowrap">--rules.max-concurrent-evals</code> | Global concurrency limit for independent rules that can run concurrently. When set, "query.max-concurrency" may need to be adjusted accordingly. Use with server mode only. | `4` |
| <code class="text-nowrap">--alertmanager.notification-queue-capacity</code> | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` | | <code class="text-nowrap">--alertmanager.notification-queue-capacity</code> | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` |
| <code class="text-nowrap">--alertmanager.drain-notification-queue-on-shutdown</code> | Send any outstanding Alertmanager notifications when shutting down. If false, any outstanding Alertmanager notifications will be dropped when shutting down. Use with server mode only. | `true` |
| <code class="text-nowrap">--query.lookback-delta</code> | The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` | | <code class="text-nowrap">--query.lookback-delta</code> | The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` |
| <code class="text-nowrap">--query.timeout</code> | Maximum time a query may take before being aborted. Use with server mode only. | `2m` | | <code class="text-nowrap">--query.timeout</code> | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
| <code class="text-nowrap">--query.max-concurrency</code> | Maximum number of queries executed concurrently. Use with server mode only. | `20` | | <code class="text-nowrap">--query.max-concurrency</code> | Maximum number of queries executed concurrently. Use with server mode only. | `20` |

@ -110,10 +110,11 @@ type Manager struct {
metrics *alertMetrics metrics *alertMetrics
more chan struct{} more chan struct{}
mtx sync.RWMutex mtx sync.RWMutex
ctx context.Context
cancel func() stopOnce *sync.Once
stopRequested chan struct{}
alertmanagers map[string]*alertmanagerSet alertmanagers map[string]*alertmanagerSet
logger log.Logger logger log.Logger
@ -121,9 +122,10 @@ type Manager struct {
// Options are the configurable parameters of a Handler. // Options are the configurable parameters of a Handler.
type Options struct { type Options struct {
QueueCapacity int QueueCapacity int
ExternalLabels labels.Labels DrainOnShutdown bool
RelabelConfigs []*relabel.Config ExternalLabels labels.Labels
RelabelConfigs []*relabel.Config
// Used for sending HTTP requests to the Alertmanager. // Used for sending HTTP requests to the Alertmanager.
Do func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) Do func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error)
@ -217,8 +219,6 @@ func do(ctx context.Context, client *http.Client, req *http.Request) (*http.Resp
// NewManager is the manager constructor. // NewManager is the manager constructor.
func NewManager(o *Options, logger log.Logger) *Manager { func NewManager(o *Options, logger log.Logger) *Manager {
ctx, cancel := context.WithCancel(context.Background())
if o.Do == nil { if o.Do == nil {
o.Do = do o.Do = do
} }
@ -227,12 +227,12 @@ func NewManager(o *Options, logger log.Logger) *Manager {
} }
n := &Manager{ n := &Manager{
queue: make([]*Alert, 0, o.QueueCapacity), queue: make([]*Alert, 0, o.QueueCapacity),
ctx: ctx, more: make(chan struct{}, 1),
cancel: cancel, stopRequested: make(chan struct{}),
more: make(chan struct{}, 1), stopOnce: &sync.Once{},
opts: o, opts: o,
logger: logger, logger: logger,
} }
queueLenFunc := func() float64 { return float64(n.queueLen()) } queueLenFunc := func() float64 { return float64(n.queueLen()) }
@ -298,40 +298,98 @@ func (n *Manager) nextBatch() []*Alert {
return alerts return alerts
} }
// Run dispatches notifications continuously, returning once Stop has been called and all
// pending notifications have been drained from the queue (if draining is enabled).
//
// Dispatching of notifications occurs in parallel to processing target updates to avoid one starving the other.
// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details.
func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
n.targetUpdateLoop(tsets)
}()
go func() {
defer wg.Done()
n.sendLoop()
n.drainQueue()
}()
wg.Wait()
level.Info(n.logger).Log("msg", "Notification manager stopped")
}
// sendLoop continuously consumes the notifications queue and sends alerts to // sendLoop continuously consumes the notifications queue and sends alerts to
// the configured Alertmanagers. // the configured Alertmanagers.
func (n *Manager) sendLoop() { func (n *Manager) sendLoop() {
for { for {
// If we've been asked to stop, that takes priority over sending any further notifications.
select { select {
case <-n.ctx.Done(): case <-n.stopRequested:
return return
case <-n.more: default:
} select {
alerts := n.nextBatch() case <-n.stopRequested:
return
if !n.sendAll(alerts...) { case <-n.more:
n.metrics.dropped.Add(float64(len(alerts))) n.sendOneBatch()
}
// If the queue still has items left, kick off the next iteration. // If the queue still has items left, kick off the next iteration.
if n.queueLen() > 0 { if n.queueLen() > 0 {
n.setMore() n.setMore()
}
}
} }
} }
} }
// Run receives updates of target groups and triggers a reload. // targetUpdateLoop receives updates of target groups and triggers a reload.
// The dispatching of notifications occurs in the background to prevent blocking the receipt of target updates. func (n *Manager) targetUpdateLoop(tsets <-chan map[string][]*targetgroup.Group) {
// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details.
func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
go n.sendLoop()
for { for {
// If we've been asked to stop, that takes priority over processing any further target group updates.
select { select {
case <-n.ctx.Done(): case <-n.stopRequested:
return return
case ts := <-tsets: default:
n.reload(ts) select {
case <-n.stopRequested:
return
case ts := <-tsets:
n.reload(ts)
}
}
}
}
func (n *Manager) sendOneBatch() {
alerts := n.nextBatch()
if !n.sendAll(alerts...) {
n.metrics.dropped.Add(float64(len(alerts)))
}
}
func (n *Manager) drainQueue() {
if !n.opts.DrainOnShutdown {
if n.queueLen() > 0 {
level.Warn(n.logger).Log("msg", "Draining remaining notifications on shutdown is disabled, and some notifications have been dropped", "count", n.queueLen())
n.metrics.dropped.Add(float64(n.queueLen()))
} }
return
}
level.Info(n.logger).Log("msg", "Draining any remaining notifications...")
for n.queueLen() > 0 {
n.sendOneBatch()
} }
level.Info(n.logger).Log("msg", "Remaining notifications drained")
} }
func (n *Manager) reload(tgs map[string][]*targetgroup.Group) { func (n *Manager) reload(tgs map[string][]*targetgroup.Group) {
@ -546,7 +604,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
for _, am := range ams.ams { for _, am := range ams.ams {
wg.Add(1) wg.Add(1)
ctx, cancel := context.WithTimeout(n.ctx, time.Duration(ams.cfg.Timeout)) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ams.cfg.Timeout))
defer cancel() defer cancel()
go func(ctx context.Context, client *http.Client, url string, payload []byte, count int) { go func(ctx context.Context, client *http.Client, url string, payload []byte, count int) {
@ -624,10 +682,19 @@ func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []b
return nil return nil
} }
// Stop shuts down the notification handler. // Stop signals the notification manager to shut down and immediately returns.
//
// Run will return once the notification manager has successfully shut down.
//
// The manager will optionally drain any queued notifications before shutting down.
//
// Stop is safe to call multiple times.
func (n *Manager) Stop() { func (n *Manager) Stop() {
level.Info(n.logger).Log("msg", "Stopping notification manager...") level.Info(n.logger).Log("msg", "Stopping notification manager...")
n.cancel()
n.stopOnce.Do(func() {
close(n.stopRequested)
})
} }
// Alertmanager holds Alertmanager endpoint information. // Alertmanager holds Alertmanager endpoint information.

@ -847,3 +847,173 @@ loop2:
} }
} }
} }
func TestStop_DrainingDisabled(t *testing.T) {
releaseReceiver := make(chan struct{})
receiverReceivedRequest := make(chan struct{}, 2)
alertsReceived := atomic.NewInt64(0)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Let the test know we've received a request.
receiverReceivedRequest <- struct{}{}
var alerts []*Alert
b, err := io.ReadAll(r.Body)
require.NoError(t, err)
err = json.Unmarshal(b, &alerts)
require.NoError(t, err)
alertsReceived.Add(int64(len(alerts)))
// Wait for the test to release us.
<-releaseReceiver
w.WriteHeader(http.StatusOK)
}))
defer func() {
server.Close()
}()
m := NewManager(
&Options{
QueueCapacity: 10,
DrainOnShutdown: false,
},
nil,
)
m.alertmanagers = make(map[string]*alertmanagerSet)
am1Cfg := config.DefaultAlertmanagerConfig
am1Cfg.Timeout = model.Duration(time.Second)
m.alertmanagers["1"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server.URL },
},
},
cfg: &am1Cfg,
}
notificationManagerStopped := make(chan struct{})
go func() {
defer close(notificationManagerStopped)
m.Run(nil)
}()
// Queue two alerts. The first should be immediately sent to the receiver, which should block until we release it later.
m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")})
select {
case <-receiverReceivedRequest:
// Nothing more to do.
case <-time.After(time.Second):
require.FailNow(t, "gave up waiting for receiver to receive notification of first alert")
}
m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")})
// Stop the notification manager, pause to allow the shutdown to be observed, and then allow the receiver to proceed.
m.Stop()
time.Sleep(time.Second)
close(releaseReceiver)
// Wait for the notification manager to stop and confirm only the first notification was sent.
// The second notification should be dropped.
select {
case <-notificationManagerStopped:
// Nothing more to do.
case <-time.After(time.Second):
require.FailNow(t, "gave up waiting for notification manager to stop")
}
require.Equal(t, int64(1), alertsReceived.Load())
}
func TestStop_DrainingEnabled(t *testing.T) {
releaseReceiver := make(chan struct{})
receiverReceivedRequest := make(chan struct{}, 2)
alertsReceived := atomic.NewInt64(0)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Let the test know we've received a request.
receiverReceivedRequest <- struct{}{}
var alerts []*Alert
b, err := io.ReadAll(r.Body)
require.NoError(t, err)
err = json.Unmarshal(b, &alerts)
require.NoError(t, err)
alertsReceived.Add(int64(len(alerts)))
// Wait for the test to release us.
<-releaseReceiver
w.WriteHeader(http.StatusOK)
}))
defer func() {
server.Close()
}()
m := NewManager(
&Options{
QueueCapacity: 10,
DrainOnShutdown: true,
},
nil,
)
m.alertmanagers = make(map[string]*alertmanagerSet)
am1Cfg := config.DefaultAlertmanagerConfig
am1Cfg.Timeout = model.Duration(time.Second)
m.alertmanagers["1"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server.URL },
},
},
cfg: &am1Cfg,
}
notificationManagerStopped := make(chan struct{})
go func() {
defer close(notificationManagerStopped)
m.Run(nil)
}()
// Queue two alerts. The first should be immediately sent to the receiver, which should block until we release it later.
m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")})
select {
case <-receiverReceivedRequest:
// Nothing more to do.
case <-time.After(time.Second):
require.FailNow(t, "gave up waiting for receiver to receive notification of first alert")
}
m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")})
// Stop the notification manager and allow the receiver to proceed.
m.Stop()
close(releaseReceiver)
// Wait for the notification manager to stop and confirm both notifications were sent.
select {
case <-notificationManagerStopped:
// Nothing more to do.
case <-time.After(200 * time.Millisecond):
require.FailNow(t, "gave up waiting for notification manager to stop")
}
require.Equal(t, int64(2), alertsReceived.Load())
}

Loading…
Cancel
Save