Notifier: fix deadlock when zero alerts

When all alerts were dropped after alert relabeling, the `sendAll()`
function didn't release the lock properly, which created a deadlock with
the Alertmanager target discovery.

In addition, the commit detects early when there are no Alertmanager
endpoints to notify, to avoid unnecessary work.

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
pull/13861/head
Simon Pasquier 8 months ago
parent 855b5ac4b8
commit 8bd6ae1b20

@ -471,6 +471,10 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
numSuccess atomic.Uint64 numSuccess atomic.Uint64
) )
for _, ams := range amSets { for _, ams := range amSets {
if len(ams.ams) == 0 {
continue
}
var ( var (
payload []byte payload []byte
err error err error
@ -482,6 +486,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
if len(ams.cfg.AlertRelabelConfigs) > 0 { if len(ams.cfg.AlertRelabelConfigs) > 0 {
amAlerts = relabelAlerts(ams.cfg.AlertRelabelConfigs, labels.Labels{}, alerts) amAlerts = relabelAlerts(ams.cfg.AlertRelabelConfigs, labels.Labels{}, alerts)
if len(amAlerts) == 0 { if len(amAlerts) == 0 {
ams.mtx.RUnlock()
continue continue
} }
// We can't use the cached values from previous iteration. // We can't use the cached values from previous iteration.

@ -219,17 +219,19 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) {
errc = make(chan error, 1) errc = make(chan error, 1)
expected1 = make([]*Alert, 0, maxBatchSize) expected1 = make([]*Alert, 0, maxBatchSize)
expected2 = make([]*Alert, 0, maxBatchSize) expected2 = make([]*Alert, 0, maxBatchSize)
expected3 = make([]*Alert, 0)
status1, status2 atomic.Int32 statusOK atomic.Int32
) )
status1.Store(int32(http.StatusOK)) statusOK.Store(int32(http.StatusOK))
status2.Store(int32(http.StatusOK))
server1 := newTestHTTPServerBuilder(&expected1, errc, "", "", &status1) server1 := newTestHTTPServerBuilder(&expected1, errc, "", "", &statusOK)
server2 := newTestHTTPServerBuilder(&expected2, errc, "", "", &status2) server2 := newTestHTTPServerBuilder(&expected2, errc, "", "", &statusOK)
server3 := newTestHTTPServerBuilder(&expected3, errc, "", "", &statusOK)
defer server1.Close() defer server1.Close()
defer server2.Close() defer server2.Close()
defer server3.Close()
h := NewManager(&Options{}, nil) h := NewManager(&Options{}, nil)
h.alertmanagers = make(map[string]*alertmanagerSet) h.alertmanagers = make(map[string]*alertmanagerSet)
@ -247,38 +249,68 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) {
}, },
} }
h.alertmanagers["1"] = &alertmanagerSet{ am3Cfg := config.DefaultAlertmanagerConfig
ams: []alertmanager{ am3Cfg.Timeout = model.Duration(time.Second)
alertmanagerMock{ am3Cfg.AlertRelabelConfigs = []*relabel.Config{
urlf: func() string { return server1.URL }, {
}, SourceLabels: model.LabelNames{"alertname"},
Action: "drop",
Regex: relabel.MustNewRegexp(".+"),
}, },
cfg: &am1Cfg,
} }
h.alertmanagers["2"] = &alertmanagerSet{ h.alertmanagers = map[string]*alertmanagerSet{
ams: []alertmanager{ // Drop no alerts.
alertmanagerMock{ "1": {
urlf: func() string { return server2.URL }, ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server1.URL },
},
}, },
cfg: &am1Cfg,
},
// Drop only alerts with the "alertnamedrop" label.
"2": {
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server2.URL },
},
},
cfg: &am2Cfg,
},
// Drop all alerts.
"3": {
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server3.URL },
},
},
cfg: &am3Cfg,
},
// Empty list of Alertmanager endpoints.
"4": {
ams: []alertmanager{},
cfg: &config.DefaultAlertmanagerConfig,
}, },
cfg: &am2Cfg,
} }
for i := range make([]struct{}, maxBatchSize/2) { for i := range make([]struct{}, maxBatchSize/2) {
h.queue = append(h.queue, &Alert{ h.queue = append(h.queue,
Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)), &Alert{
}) Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
h.queue = append(h.queue, &Alert{ },
Labels: labels.FromStrings("alertnamedrop", fmt.Sprintf("%d", i)), &Alert{
}) Labels: labels.FromStrings("alertname", "test", "alertnamedrop", fmt.Sprintf("%d", i)),
},
)
expected1 = append(expected1, &Alert{ expected1 = append(expected1,
Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)), &Alert{
}) Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
expected1 = append(expected1, &Alert{ }, &Alert{
Labels: labels.FromStrings("alertnamedrop", fmt.Sprintf("%d", i)), Labels: labels.FromStrings("alertname", "test", "alertnamedrop", fmt.Sprintf("%d", i)),
}) },
)
expected2 = append(expected2, &Alert{ expected2 = append(expected2, &Alert{
Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)), Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
@ -296,6 +328,13 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) {
require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly") require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly")
checkNoErr() checkNoErr()
// Verify that individual locks are released.
for k := range h.alertmanagers {
h.alertmanagers[k].mtx.Lock()
h.alertmanagers[k].ams = nil
h.alertmanagers[k].mtx.Unlock()
}
} }
func TestCustomDo(t *testing.T) { func TestCustomDo(t *testing.T) {

Loading…
Cancel
Save