diff --git a/discovery/manager.go b/discovery/manager.go index f14071af3..897d7d151 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -120,6 +120,16 @@ func Name(n string) func(*Manager) { } } +// Updatert sets the updatert of the manager. +// Used to speed up tests. +func Updatert(u time.Duration) func(*Manager) { + return func(m *Manager) { + m.mtx.Lock() + defer m.mtx.Unlock() + m.updatert = u + } +} + // HTTPClientOptions sets the list of HTTP client options to expose to // Discoverers. It is up to Discoverers to choose to use the options provided. func HTTPClientOptions(opts ...config.HTTPClientOption) func(*Manager) { diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go index d2e72ca33..5c82decbe 100644 --- a/notifier/notifier_test.go +++ b/notifier/notifier_test.go @@ -26,13 +26,17 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/prometheus/alertmanager/api/v2/models" + "github.com/prometheus/client_golang/prometheus" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/atomic" "gopkg.in/yaml.v2" + "github.com/prometheus/prometheus/discovery" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/labels" @@ -811,3 +815,148 @@ func TestHangingNotifier(t *testing.T) { }) } } + +// TODO: renameit and even replace TestHangingNotifier with it. +// TestHangingNotifierXXX ensures that the notifier takes into account SD changes even when there are +// queued alerts. This test reproduces the issue described in https://github.com/prometheus/prometheus/issues/13676. +func TestHangingNotifierXXX(t *testing.T) { + const ( + batches = 100 + alertsCount = maxBatchSize * batches + ) + + var ( + sendTimeout = 10 * time.Millisecond + sdUpdatert = sendTimeout / 2 + + done = make(chan struct{}) + ) + + defer func() { + close(done) + }() + + // Set up a faulty Alertmanager. + var faultyCalled atomic.Bool + faultyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + faultyCalled.Store(true) + select { + case <-done: + case <-time.After(time.Hour): + } + })) + faultyURL, err := url.Parse(faultyServer.URL) + require.NoError(t, err) + + // Set up a functional Alertmanager. + var functionalCalled atomic.Bool + functionalServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + functionalCalled.Store(true) + })) + functionalURL, err := url.Parse(functionalServer.URL) + require.NoError(t, err) + + // Initialize the discovery manager + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + reg := prometheus.NewRegistry() + sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg)) + require.NoError(t, err) + sdManager := discovery.NewManager( + ctx, + log.NewNopLogger(), + reg, + sdMetrics, + discovery.Name("sd-manager"), + discovery.Updatert(sdUpdatert), + ) + go sdManager.Run() + + // Set up the notifier with both faulty and functional Alertmanagers. + notifier := NewManager( + &Options{ + QueueCapacity: alertsCount, + }, + nil, + ) + notifier.alertmanagers = make(map[string]*alertmanagerSet) + amCfg := config.DefaultAlertmanagerConfig + amCfg.Timeout = model.Duration(sendTimeout) + notifier.alertmanagers["config-0"] = &alertmanagerSet{ + ams: []alertmanager{ + alertmanagerMock{ + urlf: func() string { return faultyURL.String() }, + }, + alertmanagerMock{ + urlf: func() string { return functionalURL.String() }, + }, + }, + cfg: &amCfg, + metrics: notifier.metrics, + } + go notifier.Run(sdManager.SyncCh()) + defer notifier.Stop() + + require.Len(t, notifier.Alertmanagers(), 2) + + // Enqueue the alerts. + var alerts []*Alert + for i := range make([]struct{}, alertsCount) { + alerts = append(alerts, &Alert{ + Labels: labels.FromStrings("alertname", strconv.Itoa(i)), + }) + } + notifier.Send(alerts...) + + // Wait for the Alertmanagers to start receiving alerts. + // 10*sdUpdatert is used as an arbitrary timeout here. + timeout := time.After(10 * sdUpdatert) +loop1: + for { + select { + case <-timeout: + t.Fatalf("Timeout waiting for the alertmanagers to be reached for the first time.") + default: + if faultyCalled.Load() && functionalCalled.Load() { + break loop1 + } + } + } + + // Request to remove the faulty Alertmanager. + c := map[string]discovery.Configs{ + "config-0": { + discovery.StaticConfig{ + &targetgroup.Group{ + Targets: []model.LabelSet{ + { + model.AddressLabel: model.LabelValue(functionalURL.Host), + }, + }, + }, + }, + }, + } + require.NoError(t, sdManager.ApplyConfig(c)) + + // The notifier should not wait until the alerts queue is empty to apply the discovery changes + // A faulty Alertmanager could cause each alert sending cycle to take up to AlertmanagerConfig.Timeout + // The queue may never be emptied, as the arrival rate could be larger than the departure rate + // It could even overflow and alerts could be dropped. + timeout = time.After(batches * sendTimeout) +loop2: + for { + select { + case <-timeout: + t.Fatalf("Timeout, the faulty alertmanager not removed on time.") + default: + // The faulty alertmanager was dropped. + if len(notifier.Alertmanagers()) == 1 { + // Prevent from TOCTOU. + require.Positive(t, notifier.queueLen()) + break loop2 + } + require.Positive(t, notifier.queueLen(), "The faulty alertmanager wasn't dropped before the alerts queue was emptied.") + } + } +}