
chore(notifier): Split 'Run()' into two goroutines: one to receive target updates and trigger reloads, and the other to send notifications.

This is done to prevent the latter operation from blocking/starving the former. Previously, the `tsets` channel was consumed by the same goroutine that consumes and feeds the buffered `n.more` channel; since `tsets` is unbuffered and only fed every `SDManager.updatert` seconds, it was much less likely to be ready, so target updates could wait behind notification sending.

See https://github.com/prometheus/prometheus/issues/13676 and https://github.com/prometheus/prometheus/issues/8768
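
To make the starvation pattern concrete, here is a minimal, self-contained Go sketch of the before/after loop structure. It is not the notifier's actual code: `updates`, `more`, the sleep durations and the printed messages are made-up stand-ins for `tsets`, `n.more` and the real batch-sending path.

package main

import (
	"context"
	"fmt"
	"time"
)

// singleLoop mirrors the old shape: one goroutine both reads target updates
// and drains the alert queue. The slow "send" step after the select delays
// the next read of the unbuffered updates channel, and with it the sender.
func singleLoop(ctx context.Context, updates <-chan string, more <-chan struct{}) {
	for {
		select {
		case <-ctx.Done():
			return
		case u := <-updates:
			fmt.Println("reload:", u)
		default:
			select {
			case <-ctx.Done():
				return
			case u := <-updates:
				fmt.Println("reload:", u)
			case <-more:
			}
		}
		time.Sleep(50 * time.Millisecond) // stand-in for nextBatch()/sendAll()
	}
}

// splitLoops mirrors the new shape: sending runs in its own goroutine, so a
// slow send never delays the receipt of a target update.
func splitLoops(ctx context.Context, updates <-chan string, more <-chan struct{}) {
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-more:
				time.Sleep(50 * time.Millisecond) // stand-in for nextBatch()/sendAll()
			}
		}
	}()
	for {
		select {
		case <-ctx.Done():
			return
		case u := <-updates:
			fmt.Println("reload:", u)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	updates := make(chan string)   // unbuffered, like tsets
	more := make(chan struct{}, 1) // buffered, like n.more

	go func() { // queued alerts keep `more` almost always ready
		for {
			select {
			case more <- struct{}{}:
			case <-ctx.Done():
				return
			}
		}
	}()
	go func() { // one rare SD update, like a tick of SDManager.updatert
		time.Sleep(200 * time.Millisecond)
		updates <- "new alertmanager targets"
	}()

	splitLoops(ctx, updates, more) // swap in singleLoop to see the reload wait on the send step
}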

Synchronization with the `sendLoop` goroutine is handled through the `n.mtx` mutex.
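
A rough sketch of that locking pattern (illustrative names and simplified types, not the Manager's actual fields or methods): the goroutine receiving target updates rebuilds the Alertmanager sets under the write lock, while the sendLoop goroutine snapshots them under the read lock and releases it before doing any (possibly slow) network I/O.

package notifiersketch

import "sync"

// Simplified stand-ins for the real types.
type alertmanagerSet struct{ urls []string }
type targetGroup struct{ addrs []string }

type manager struct {
	mtx           sync.RWMutex
	alertmanagers map[string]*alertmanagerSet
}

// reload runs on the goroutine that receives target updates (Run):
// it rebuilds the Alertmanager sets under the write lock.
func (m *manager) reload(tgs map[string][]*targetGroup) {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	m.alertmanagers = make(map[string]*alertmanagerSet, len(tgs))
	for name, groups := range tgs {
		set := &alertmanagerSet{}
		for _, g := range groups {
			set.urls = append(set.urls, g.addrs...)
		}
		m.alertmanagers[name] = set
	}
}

// sendAll runs on the sendLoop goroutine: it takes a snapshot of the map
// under the read lock and releases it before doing any network I/O.
func (m *manager) sendAll() {
	m.mtx.RLock()
	amSets := m.alertmanagers
	m.mtx.RUnlock()
	for _, set := range amSets {
		_ = set.urls // send the current batch to each configured Alertmanager here
	}
}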

This uses an approach similar to the scrape manager's: efbd6e41c5/scrape/manager.go (L115-L117)

The old TestHangingNotifier was replaced by the new one to more closely reflect reality.

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
pull/14313/head
machine424, 6 months ago, committed by Ayoub Mrini
commit 690de487e2
Changed files: notifier/notifier.go (35), notifier/notifier_test.go (123)

notifier/notifier.go

@@ -298,25 +298,14 @@ func (n *Manager) nextBatch() []*Alert {
	return alerts
}
// Run dispatches notifications continuously.
func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
// sendLoop continuously consumes the notifications queue and sends alerts to
// the configured Alertmanagers.
func (n *Manager) sendLoop() {
	for {
		// The select is split in two parts, such as we will first try to read
		// new alertmanager targets if they are available, before sending new
		// alerts.
		select {
		case <-n.ctx.Done():
			return
		case ts := <-tsets:
			n.reload(ts)
		default:
			select {
			case <-n.ctx.Done():
				return
			case ts := <-tsets:
				n.reload(ts)
			case <-n.more:
			}
		case <-n.more:
		}
		alerts := n.nextBatch()
@@ -330,6 +319,21 @@ func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
	}
}
// Run receives updates of target groups and triggers a reload.
// The dispatching of notifications occurs in the background to prevent blocking the receipt of target updates.
// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details.
func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
	go n.sendLoop()
	for {
		select {
		case <-n.ctx.Done():
			return
		case ts := <-tsets:
			n.reload(ts)
		}
	}
}
func (n *Manager) reload(tgs map[string][]*targetgroup.Group) {
	n.mtx.Lock()
	defer n.mtx.Unlock()
@@ -483,6 +487,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
		ams.mtx.RLock()
		if len(ams.cfg.AlertRelabelConfigs) > 0 {
			amAlerts = relabelAlerts(ams.cfg.AlertRelabelConfigs, labels.Labels{}, alerts)
			if len(amAlerts) == 0 {

notifier/notifier_test.go

@@ -701,125 +701,10 @@ func TestLabelsToOpenAPILabelSet(t *testing.T) {
require.Equal(t, models.LabelSet{"aaa": "111", "bbb": "222"}, labelsToOpenAPILabelSet(labels.FromStrings("aaa", "111", "bbb", "222")))
}
// TestHangingNotifier validates that target updates happen even when there are
// queued alerts.
func TestHangingNotifier(t *testing.T) {
// Note: When targets are not updated in time, this test is flaky because Go
// selects are not deterministic. Therefore we run 10 subtests to increase the chance of running into the issue.
for i := 0; i < 10; i++ {
t.Run(strconv.Itoa(i), func(t *testing.T) {
var (
done = make(chan struct{})
changed = make(chan struct{})
syncCh = make(chan map[string][]*targetgroup.Group)
)
defer func() {
close(done)
}()
var calledOnce bool
// Setting up a bad server. This server hangs for 2 seconds.
badServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if calledOnce {
t.Fatal("hanging server called multiple times")
}
calledOnce = true
select {
case <-done:
case <-time.After(2 * time.Second):
}
}))
badURL, err := url.Parse(badServer.URL)
require.NoError(t, err)
badAddress := badURL.Host // Used for __address__ label in targets.
// Setting up a good server. This server returns fast, signaling requests
// by closing the changed channel.
goodServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
close(changed)
}))
goodURL, err := url.Parse(goodServer.URL)
require.NoError(t, err)
goodAddress := goodURL.Host // Used for __address__ label in targets.
h := NewManager(
&Options{
QueueCapacity: 20 * maxBatchSize,
},
nil,
)
h.alertmanagers = make(map[string]*alertmanagerSet)
am1Cfg := config.DefaultAlertmanagerConfig
am1Cfg.Timeout = model.Duration(200 * time.Millisecond)
h.alertmanagers["config-0"] = &alertmanagerSet{
ams: []alertmanager{},
cfg: &am1Cfg,
metrics: h.metrics,
}
go h.Run(syncCh)
defer h.Stop()
var alerts []*Alert
for i := range make([]struct{}, 20*maxBatchSize) {
alerts = append(alerts, &Alert{
Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
})
}
// Injecting the hanging server URL.
syncCh <- map[string][]*targetgroup.Group{
"config-0": {
{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue(badAddress),
},
},
},
},
}
// Queueing alerts.
h.Send(alerts...)
// Updating with a working alertmanager target.
go func() {
select {
case syncCh <- map[string][]*targetgroup.Group{
"config-0": {
{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue(goodAddress),
},
},
},
},
}:
case <-done:
}
}()
select {
case <-time.After(1 * time.Second):
t.Fatalf("Timeout after 1 second, targets not synced in time.")
case <-changed:
// The good server has been hit before the 1-second timeout, therefore
// targets have been updated before a second call could be made to the
// bad server.
}
})
}
}
// TODO: renameit and even replace TestHangingNotifier with it.
// TestHangingNotifierXXX ensures that the notifier takes into account SD changes even when there are
// TestHangingNotifier ensures that the notifier takes into account SD changes even when there are
// queued alerts. This test reproduces the issue described in https://github.com/prometheus/prometheus/issues/13676.
func TestHangingNotifierXXX(t *testing.T) {
// and https://github.com/prometheus/prometheus/issues/8768.
func TestHangingNotifier(t *testing.T) {
const (
batches = 100
alertsCount = maxBatchSize * batches
@@ -857,6 +742,8 @@ func TestHangingNotifierXXX(t *testing.T) {
require.NoError(t, err)
// Initialize the discovery manager
// This is relevant as the updates aren't sent continually in real life, but only once every updatert.
// The old implementation of TestHangingNotifier didn't take that into account.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
reg := prometheus.NewRegistry()
