From 690de487e2fb0be3e3a41121bbe2b9ac3e7a844c Mon Sep 17 00:00:00 2001
From: machine424
Date: Mon, 3 Jun 2024 18:09:51 +0200
Subject: [PATCH] chore(notifier): Split 'Run()' into two goroutines: one to
 receive target updates and trigger reloads, and the other to send
 notifications.

This prevents the latter operation from blocking/starving the former:
previously, the `tsets` channel was consumed by the same goroutine that
consumes and feeds the buffered `n.more` channel, so `tsets`, which is
unbuffered and only fed every `SDManager.updatert` seconds, was less likely
to be ready.

See https://github.com/prometheus/prometheus/issues/13676
and https://github.com/prometheus/prometheus/issues/8768

The synchronization with the sendLoop goroutine is handled through the n.mtx
mutex. This follows a similar approach to the scrape manager's:
https://github.com/prometheus/prometheus/blob/efbd6e41c59ec8d6b7a0791c1fb337fdac53b4f2/scrape/manager.go#L115-L117

The old TestHangingNotifier was replaced with a new one that reflects reality
more closely.

Signed-off-by: machine424
---
 notifier/notifier.go      |  35 ++++++-----
 notifier/notifier_test.go | 123 ++------------------------------------
 2 files changed, 25 insertions(+), 133 deletions(-)

diff --git a/notifier/notifier.go b/notifier/notifier.go
index 4cf376aa0..a375a0749 100644
--- a/notifier/notifier.go
+++ b/notifier/notifier.go
@@ -298,25 +298,14 @@ func (n *Manager) nextBatch() []*Alert {
 	return alerts
 }
 
-// Run dispatches notifications continuously.
-func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
+// sendLoop continuously consumes the notifications queue and sends alerts to
+// the configured Alertmanagers.
+func (n *Manager) sendLoop() {
 	for {
-		// The select is split in two parts, such as we will first try to read
-		// new alertmanager targets if they are available, before sending new
-		// alerts.
 		select {
 		case <-n.ctx.Done():
 			return
-		case ts := <-tsets:
-			n.reload(ts)
-		default:
-			select {
-			case <-n.ctx.Done():
-				return
-			case ts := <-tsets:
-				n.reload(ts)
-			case <-n.more:
-			}
+		case <-n.more:
 		}
 		alerts := n.nextBatch()
 
@@ -330,6 +319,21 @@ func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
 	}
 }
 
+// Run receives updates of target groups and triggers a reload.
+// The dispatching of notifications occurs in the background to prevent blocking the receipt of target updates.
+// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details.
+func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
+	go n.sendLoop()
+	for {
+		select {
+		case <-n.ctx.Done():
+			return
+		case ts := <-tsets:
+			n.reload(ts)
+		}
+	}
+}
+
 func (n *Manager) reload(tgs map[string][]*targetgroup.Group) {
 	n.mtx.Lock()
 	defer n.mtx.Unlock()
@@ -483,6 +487,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
 
 		ams.mtx.RLock()
 
+
 		if len(ams.cfg.AlertRelabelConfigs) > 0 {
 			amAlerts = relabelAlerts(ams.cfg.AlertRelabelConfigs, labels.Labels{}, alerts)
 			if len(amAlerts) == 0 {
diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go
index 5c82decbe..03290a58c 100644
--- a/notifier/notifier_test.go
+++ b/notifier/notifier_test.go
@@ -701,125 +701,10 @@ func TestLabelsToOpenAPILabelSet(t *testing.T) {
 	require.Equal(t, models.LabelSet{"aaa": "111", "bbb": "222"}, labelsToOpenAPILabelSet(labels.FromStrings("aaa", "111", "bbb", "222")))
 }
 
-// TestHangingNotifier validates that targets updates happen even when there are
-// queued alerts.
-func TestHangingNotifier(t *testing.T) {
-	// Note: When targets are not updated in time, this test is flaky because go
-	// selects are not deterministic. Therefore we run 10 subtests to run into the issue.
-	for i := 0; i < 10; i++ {
-		t.Run(strconv.Itoa(i), func(t *testing.T) {
-			var (
-				done    = make(chan struct{})
-				changed = make(chan struct{})
-				syncCh  = make(chan map[string][]*targetgroup.Group)
-			)
-
-			defer func() {
-				close(done)
-			}()
-
-			var calledOnce bool
-			// Setting up a bad server. This server hangs for 2 seconds.
-			badServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-				if calledOnce {
-					t.Fatal("hanging server called multiple times")
-				}
-				calledOnce = true
-				select {
-				case <-done:
-				case <-time.After(2 * time.Second):
-				}
-			}))
-			badURL, err := url.Parse(badServer.URL)
-			require.NoError(t, err)
-			badAddress := badURL.Host // Used for __name__ label in targets.
-
-			// Setting up a bad server. This server returns fast, signaling requests on
-			// by closing the changed channel.
-			goodServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-				close(changed)
-			}))
-			goodURL, err := url.Parse(goodServer.URL)
-			require.NoError(t, err)
-			goodAddress := goodURL.Host // Used for __name__ label in targets.
-
-			h := NewManager(
-				&Options{
-					QueueCapacity: 20 * maxBatchSize,
-				},
-				nil,
-			)
-
-			h.alertmanagers = make(map[string]*alertmanagerSet)
-
-			am1Cfg := config.DefaultAlertmanagerConfig
-			am1Cfg.Timeout = model.Duration(200 * time.Millisecond)
-
-			h.alertmanagers["config-0"] = &alertmanagerSet{
-				ams:     []alertmanager{},
-				cfg:     &am1Cfg,
-				metrics: h.metrics,
-			}
-			go h.Run(syncCh)
-			defer h.Stop()
-
-			var alerts []*Alert
-			for i := range make([]struct{}, 20*maxBatchSize) {
-				alerts = append(alerts, &Alert{
-					Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
-				})
-			}
-
-			// Injecting the hanging server URL.
-			syncCh <- map[string][]*targetgroup.Group{
-				"config-0": {
-					{
-						Targets: []model.LabelSet{
-							{
-								model.AddressLabel: model.LabelValue(badAddress),
-							},
-						},
-					},
-				},
-			}
-
-			// Queing alerts.
-			h.Send(alerts...)
-
-			// Updating with a working alertmanager target.
-			go func() {
-				select {
-				case syncCh <- map[string][]*targetgroup.Group{
-					"config-0": {
-						{
-							Targets: []model.LabelSet{
-								{
-									model.AddressLabel: model.LabelValue(goodAddress),
-								},
-							},
-						},
-					},
-				}:
-				case <-done:
-				}
-			}()
-
-			select {
-			case <-time.After(1 * time.Second):
-				t.Fatalf("Timeout after 1 second, targets not synced in time.")
-			case <-changed:
-				// The good server has been hit in less than 3 seconds, therefore
-				// targets have been updated before a second call could be made to the
-				// bad server.
-			}
-		})
-	}
-}
-
-// TODO: renameit and even replace TestHangingNotifier with it.
-// TestHangingNotifierXXX ensures that the notifier takes into account SD changes even when there are
+// TestHangingNotifier ensures that the notifier takes into account SD changes even when there are
 // queued alerts. This test reproduces the issue described in https://github.com/prometheus/prometheus/issues/13676.
-func TestHangingNotifierXXX(t *testing.T) {
+// It also covers https://github.com/prometheus/prometheus/issues/8768.
+func TestHangingNotifier(t *testing.T) {
 	const (
 		batches     = 100
 		alertsCount = maxBatchSize * batches
@@ -857,6 +742,8 @@ func TestHangingNotifierXXX(t *testing.T) {
 	require.NoError(t, err)
 
 	// Initialize the discovery manager
+	// This is relevant as the updates aren't sent continuously in real life, but only every updatert.
+	// The old implementation of TestHangingNotifier didn't take that into account.
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	reg := prometheus.NewRegistry()
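
For context only (not part of the patch): a minimal, self-contained Go sketch of the receive/send split and mutex-based synchronization described in the commit message. All names below (manager, run, sendLoop, send) and the string stand-ins for targets and alerts are illustrative assumptions, not the actual notifier types; the real code keeps its existing queue, batching, and alertmanagerSet handling.

// Sketch of the pattern: one goroutine receives target updates, another sends.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type manager struct {
	mtx     sync.Mutex
	targets []string      // stands in for the resolved Alertmanager set
	queue   []string      // stands in for the alert queue
	more    chan struct{} // signals that the queue is non-empty
}

// sendLoop drains the queue whenever it is signalled; a slow send only delays
// the next batch and can no longer starve target reloads.
func (m *manager) sendLoop(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-m.more:
		}
		m.mtx.Lock()
		batch := m.queue
		m.queue = nil
		targets := append([]string(nil), m.targets...)
		m.mtx.Unlock()
		fmt.Printf("sending %d alerts to %v\n", len(batch), targets)
		time.Sleep(100 * time.Millisecond) // simulate a slow Alertmanager
	}
}

// run only receives target updates; dispatching happens in sendLoop,
// mirroring the Run()/sendLoop() split in the patch above.
func (m *manager) run(ctx context.Context, tsets <-chan []string) {
	go m.sendLoop(ctx)
	for {
		select {
		case <-ctx.Done():
			return
		case ts := <-tsets:
			m.mtx.Lock()
			m.targets = ts
			m.mtx.Unlock()
		}
	}
}

// send queues alerts and signals sendLoop without blocking the caller.
func (m *manager) send(alerts ...string) {
	m.mtx.Lock()
	m.queue = append(m.queue, alerts...)
	m.mtx.Unlock()
	select {
	case m.more <- struct{}{}:
	default:
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	m := &manager{more: make(chan struct{}, 1)}
	tsets := make(chan []string) // unbuffered, like the SD manager's sync channel

	go m.run(ctx, tsets)

	tsets <- []string{"am-1:9093"}
	m.send("alert-a", "alert-b")
	tsets <- []string{"am-2:9093"} // still accepted while sendLoop is busy sending
	time.Sleep(300 * time.Millisecond)
}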