Browse Source

Route different alerts to different alertmanagers

Signed-off-by: m.nabokikh <maksim.nabokikh@flant.com>
pull/12551/head
m.nabokikh 1 year ago
parent
commit
39d008f94f
  1. 8
      config/config.go
  2. 44
      notifier/notifier.go
  3. 157
      notifier/notifier_test.go

8
config/config.go

@ -823,6 +823,8 @@ type AlertmanagerConfig struct {
// List of Alertmanager relabel configurations.
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
// Relabel alerts before sending to the specific alertmanager.
AlertRelabelConfigs []*relabel.Config `yaml:"alert_relabel_configs,omitempty"`
}
// SetDirectory joins any relative file paths with dir.
@ -858,6 +860,12 @@ func (c *AlertmanagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) er
}
}
for _, rlcfg := range c.AlertRelabelConfigs {
if rlcfg == nil {
return errors.New("empty or null Alertmanager alert relabeling rule")
}
}
return nil
}

44
notifier/notifier.go

@ -349,7 +349,7 @@ func (n *Manager) Send(alerts ...*Alert) {
n.mtx.Lock()
defer n.mtx.Unlock()
alerts = n.relabelAlerts(alerts)
alerts = relabelAlerts(n.opts.RelabelConfigs, n.opts.ExternalLabels, alerts)
if len(alerts) == 0 {
return
}
@ -377,20 +377,21 @@ func (n *Manager) Send(alerts ...*Alert) {
n.setMore()
}
// Attach external labels and process relabelling rules.
func (n *Manager) relabelAlerts(alerts []*Alert) []*Alert {
func relabelAlerts(relabelConfigs []*relabel.Config, externalLabels labels.Labels, alerts []*Alert) []*Alert {
lb := labels.NewBuilder(labels.EmptyLabels())
var relabeledAlerts []*Alert
for _, a := range alerts {
lb.Reset(a.Labels)
n.opts.ExternalLabels.Range(func(l labels.Label) {
if a.Labels.Get(l.Name) == "" {
lb.Set(l.Name, l.Value)
}
})
if externalLabels.Len() > 0 {
externalLabels.Range(func(l labels.Label) {
if a.Labels.Get(l.Name) == "" {
lb.Set(l.Name, l.Value)
}
})
}
keep := relabel.ProcessBuilder(lb, n.opts.RelabelConfigs...)
keep := relabel.ProcessBuilder(lb, relabelConfigs...)
if !keep {
continue
}
@ -472,17 +473,30 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
)
for _, ams := range amSets {
var (
payload []byte
err error
payload []byte
err error
amAlerts = alerts
)
ams.mtx.RLock()
if len(ams.cfg.AlertRelabelConfigs) > 0 {
amAlerts = relabelAlerts(ams.cfg.AlertRelabelConfigs, labels.Labels{}, alerts)
// TODO(nabokihms): figure out the right way to cache marshalled alerts.
// Now it works well only for happy cases.
v1Payload = nil
v2Payload = nil
if len(amAlerts) == 0 {
continue
}
}
switch ams.cfg.APIVersion {
case config.AlertmanagerAPIVersionV1:
{
if v1Payload == nil {
v1Payload, err = json.Marshal(alerts)
v1Payload, err = json.Marshal(amAlerts)
if err != nil {
level.Error(n.logger).Log("msg", "Encoding alerts for Alertmanager API v1 failed", "err", err)
ams.mtx.RUnlock()
@ -495,7 +509,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
case config.AlertmanagerAPIVersionV2:
{
if v2Payload == nil {
openAPIAlerts := alertsToOpenAPIAlerts(alerts)
openAPIAlerts := alertsToOpenAPIAlerts(amAlerts)
v2Payload, err = json.Marshal(openAPIAlerts)
if err != nil {
@ -526,13 +540,13 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
go func(client *http.Client, url string) {
if err := n.sendOne(ctx, client, url, payload); err != nil {
level.Error(n.logger).Log("alertmanager", url, "count", len(alerts), "msg", "Error sending alert", "err", err)
level.Error(n.logger).Log("alertmanager", url, "count", len(amAlerts), "msg", "Error sending alert", "err", err)
n.metrics.errors.WithLabelValues(url).Inc()
} else {
numSuccess.Inc()
}
n.metrics.latency.WithLabelValues(url).Observe(time.Since(begin).Seconds())
n.metrics.sent.WithLabelValues(url).Add(float64(len(alerts)))
n.metrics.sent.WithLabelValues(url).Add(float64(len(amAlerts)))
wg.Done()
}(ams.client, am.url().String())

157
notifier/notifier_test.go

@ -98,6 +98,41 @@ func alertsEqual(a, b []*Alert) error {
return nil
}
func newTestHTTPServerBuilder(expected *[]*Alert, errc chan<- error, u, p string, status *atomic.Int32) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var err error
defer func() {
if err == nil {
return
}
select {
case errc <- err:
default:
}
}()
user, pass, _ := r.BasicAuth()
if user != u || pass != p {
err = fmt.Errorf("unexpected user/password: %s/%s != %s/%s", user, pass, u, p)
w.WriteHeader(http.StatusInternalServerError)
return
}
b, err := io.ReadAll(r.Body)
if err != nil {
err = fmt.Errorf("error reading body: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
var alerts []*Alert
err = json.Unmarshal(b, &alerts)
if err == nil {
err = alertsEqual(*expected, alerts)
}
w.WriteHeader(int(status.Load()))
}))
}
func TestHandlerSendAll(t *testing.T) {
var (
errc = make(chan error, 1)
@ -107,42 +142,8 @@ func TestHandlerSendAll(t *testing.T) {
status1.Store(int32(http.StatusOK))
status2.Store(int32(http.StatusOK))
newHTTPServer := func(u, p string, status *atomic.Int32) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var err error
defer func() {
if err == nil {
return
}
select {
case errc <- err:
default:
}
}()
user, pass, _ := r.BasicAuth()
if user != u || pass != p {
err = fmt.Errorf("unexpected user/password: %s/%s != %s/%s", user, pass, u, p)
w.WriteHeader(http.StatusInternalServerError)
return
}
b, err := io.ReadAll(r.Body)
if err != nil {
err = fmt.Errorf("error reading body: %w", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
var alerts []*Alert
err = json.Unmarshal(b, &alerts)
if err == nil {
err = alertsEqual(expected, alerts)
}
w.WriteHeader(int(status.Load()))
}))
}
server1 := newHTTPServer("prometheus", "testing_password", &status1)
server2 := newHTTPServer("", "", &status2)
server1 := newTestHTTPServerBuilder(&expected, errc, "prometheus", "testing_password", &status1)
server2 := newTestHTTPServerBuilder(&expected, errc, "", "", &status2)
defer server1.Close()
defer server2.Close()
@ -213,6 +214,90 @@ func TestHandlerSendAll(t *testing.T) {
checkNoErr()
}
func TestHandlerSendAllRemapPerAm(t *testing.T) {
var (
errc = make(chan error, 1)
expected1 = make([]*Alert, 0, maxBatchSize)
expected2 = make([]*Alert, 0, maxBatchSize)
status1, status2 atomic.Int32
)
status1.Store(int32(http.StatusOK))
status2.Store(int32(http.StatusOK))
server1 := newTestHTTPServerBuilder(&expected1, errc, "", "", &status1)
server2 := newTestHTTPServerBuilder(&expected2, errc, "", "", &status2)
defer server1.Close()
defer server2.Close()
h := NewManager(&Options{}, nil)
h.alertmanagers = make(map[string]*alertmanagerSet)
am1Cfg := config.DefaultAlertmanagerConfig
am1Cfg.Timeout = model.Duration(time.Second)
am2Cfg := config.DefaultAlertmanagerConfig
am2Cfg.Timeout = model.Duration(time.Second)
am2Cfg.AlertRelabelConfigs = []*relabel.Config{
{
SourceLabels: model.LabelNames{"alertnamedrop"},
Action: "drop",
Regex: relabel.MustNewRegexp(".+"),
},
}
h.alertmanagers["1"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server1.URL },
},
},
cfg: &am1Cfg,
}
h.alertmanagers["2"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server2.URL },
},
},
cfg: &am2Cfg,
}
for i := range make([]struct{}, maxBatchSize/2) {
h.queue = append(h.queue, &Alert{
Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
})
h.queue = append(h.queue, &Alert{
Labels: labels.FromStrings("alertnamedrop", fmt.Sprintf("%d", i)),
})
expected1 = append(expected1, &Alert{
Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
})
expected1 = append(expected1, &Alert{
Labels: labels.FromStrings("alertnamedrop", fmt.Sprintf("%d", i)),
})
expected2 = append(expected2, &Alert{
Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
})
}
checkNoErr := func() {
t.Helper()
select {
case err := <-errc:
require.NoError(t, err)
default:
}
}
require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly")
checkNoErr()
}
func TestCustomDo(t *testing.T) {
const testURL = "http://testurl.com/"
const testBody = "testbody"

Loading…
Cancel
Save