From f9bbad1148db0300977cd666a76a9d5609c884b6 Mon Sep 17 00:00:00 2001 From: Julien Date: Fri, 27 Sep 2024 13:51:50 +0200 Subject: [PATCH] Limit the number of SSE Subscribers to 16 by default Signed-off-by: Julien --- cmd/prometheus/main.go | 52 ++++++++++--------- docs/command-line/prometheus.md | 1 + web/api/notifications.go | 25 +++++---- web/api/notifications_test.go | 47 ++++++++++++++--- web/api/v1/api.go | 10 ++-- .../src/components/NotificationsProvider.tsx | 3 +- web/web.go | 2 +- 7 files changed, 94 insertions(+), 46 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index dd068b86c..f39eba3c3 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -135,24 +135,25 @@ func agentOnlyFlag(app *kingpin.Application, name, help string) *kingpin.FlagCla type flagConfig struct { configFile string - agentStoragePath string - serverStoragePath string - notifier notifier.Options - forGracePeriod model.Duration - outageTolerance model.Duration - resendDelay model.Duration - maxConcurrentEvals int64 - web web.Options - scrape scrape.Options - tsdb tsdbOptions - agent agentOptions - lookbackDelta model.Duration - webTimeout model.Duration - queryTimeout model.Duration - queryConcurrency int - queryMaxSamples int - RemoteFlushDeadline model.Duration - nameEscapingScheme string + agentStoragePath string + serverStoragePath string + notifier notifier.Options + forGracePeriod model.Duration + outageTolerance model.Duration + resendDelay model.Duration + maxConcurrentEvals int64 + web web.Options + scrape scrape.Options + tsdb tsdbOptions + agent agentOptions + lookbackDelta model.Duration + webTimeout model.Duration + queryTimeout model.Duration + queryConcurrency int + queryMaxSamples int + RemoteFlushDeadline model.Duration + nameEscapingScheme string + maxNotificationsSubscribers int enableAutoReload bool autoReloadInterval model.Duration @@ -274,17 +275,13 @@ func main() { ) } - notifs := api.NewNotifications(prometheus.DefaultRegisterer) - cfg := flagConfig{ notifier: notifier.Options{ Registerer: prometheus.DefaultRegisterer, }, web: web.Options{ - Registerer: prometheus.DefaultRegisterer, - Gatherer: prometheus.DefaultGatherer, - NotificationsSub: notifs.Sub, - NotificationsGetter: notifs.Get, + Registerer: prometheus.DefaultRegisterer, + Gatherer: prometheus.DefaultGatherer, }, promlogConfig: promlog.Config{}, } @@ -319,6 +316,9 @@ func main() { a.Flag("web.max-connections", "Maximum number of simultaneous connections across all listeners."). Default("512").IntVar(&cfg.web.MaxConnections) + a.Flag("web.max-notifications-subscribers", "Limits the maximum number of subscribers that can concurrently receive live notifications. If the limit is reached, new subscription requests will be denied until existing connections close."). + Default("16").IntVar(&cfg.maxNotificationsSubscribers) + a.Flag("web.external-url", "The URL under which Prometheus is externally reachable (for example, if Prometheus is served via a reverse proxy). Used for generating relative and absolute links back to Prometheus itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Prometheus. If omitted, relevant URL components will be derived automatically."). PlaceHolder("").StringVar(&cfg.prometheusURL) @@ -500,6 +500,10 @@ func main() { logger := promlog.New(&cfg.promlogConfig) + notifs := api.NewNotifications(cfg.maxNotificationsSubscribers, prometheus.DefaultRegisterer) + cfg.web.NotificationsSub = notifs.Sub + cfg.web.NotificationsGetter = notifs.Get + if err := cfg.setFeatureListOptions(logger); err != nil { fmt.Fprintln(os.Stderr, fmt.Errorf("Error parsing feature list: %w", err)) os.Exit(1) diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index 7737b5021..eacb45ad0 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -21,6 +21,7 @@ The Prometheus monitoring server | --web.config.file | [EXPERIMENTAL] Path to configuration file that can enable TLS or authentication. | | | --web.read-timeout | Maximum duration before timing out read of the request, and closing idle connections. | `5m` | | --web.max-connections | Maximum number of simultaneous connections across all listeners. | `512` | +| --web.max-notifications-subscribers | Limits the maximum number of subscribers that can concurrently receive live notifications. If the limit is reached, new subscription requests will be denied until existing connections close. | `16` | | --web.external-url | The URL under which Prometheus is externally reachable (for example, if Prometheus is served via a reverse proxy). Used for generating relative and absolute links back to Prometheus itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Prometheus. If omitted, relevant URL components will be derived automatically. | | | --web.route-prefix | Prefix for the internal routes of web endpoints. Defaults to path of --web.external-url. | | | --web.user-assets | Path to static asset directory, available at /user. | | diff --git a/web/api/notifications.go b/web/api/notifications.go index 47f29f6eb..976f0b076 100644 --- a/web/api/notifications.go +++ b/web/api/notifications.go @@ -34,9 +34,10 @@ type Notification struct { // Notifications stores a list of Notification objects. // It also manages live subscribers that receive notifications via channels. type Notifications struct { - mu sync.Mutex - notifications []Notification - subscribers map[chan Notification]struct{} // Active subscribers. + mu sync.Mutex + notifications []Notification + subscribers map[chan Notification]struct{} // Active subscribers. + maxSubscribers int subscriberGauge prometheus.Gauge notificationsSent prometheus.Counter @@ -44,9 +45,10 @@ type Notifications struct { } // NewNotifications creates a new Notifications instance. -func NewNotifications(reg prometheus.Registerer) *Notifications { +func NewNotifications(maxSubscribers int, reg prometheus.Registerer) *Notifications { n := &Notifications{ - subscribers: make(map[chan Notification]struct{}), + subscribers: make(map[chan Notification]struct{}), + maxSubscribers: maxSubscribers, subscriberGauge: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "prometheus", Subsystem: "api", @@ -147,10 +149,16 @@ func (n *Notifications) Get() []Notification { // Sub allows a client to subscribe to live notifications. // It returns a channel where the subscriber will receive notifications and a function to unsubscribe. // Each subscriber has its own goroutine to handle notifications and prevent blocking. -func (n *Notifications) Sub() (<-chan Notification, func()) { +func (n *Notifications) Sub() (<-chan Notification, func(), bool) { + n.mu.Lock() + defer n.mu.Unlock() + + if len(n.subscribers) >= n.maxSubscribers { + return nil, nil, false + } + ch := make(chan Notification, 10) // Buffered channel to prevent blocking. - n.mu.Lock() // Add the new subscriber to the list. n.subscribers[ch] = struct{}{} n.subscriberGauge.Set(float64(len(n.subscribers))) @@ -159,7 +167,6 @@ func (n *Notifications) Sub() (<-chan Notification, func()) { for _, notification := range n.notifications { ch <- notification } - n.mu.Unlock() // Unsubscribe function to remove the channel from subscribers. unsubscribe := func() { @@ -172,5 +179,5 @@ func (n *Notifications) Sub() (<-chan Notification, func()) { n.subscriberGauge.Set(float64(len(n.subscribers))) } - return ch, unsubscribe + return ch, unsubscribe, true } diff --git a/web/api/notifications_test.go b/web/api/notifications_test.go index 7aa596163..437ff1ec4 100644 --- a/web/api/notifications_test.go +++ b/web/api/notifications_test.go @@ -23,7 +23,7 @@ import ( // TestNotificationLifecycle tests adding, modifying, and deleting notifications. func TestNotificationLifecycle(t *testing.T) { - notifs := NewNotifications(nil) + notifs := NewNotifications(10, nil) // Add a notification. notifs.AddNotification("Test Notification 1") @@ -47,10 +47,11 @@ func TestNotificationLifecycle(t *testing.T) { // TestSubscriberReceivesNotifications tests that a subscriber receives notifications, including modifications and deletions. func TestSubscriberReceivesNotifications(t *testing.T) { - notifs := NewNotifications(nil) + notifs := NewNotifications(10, nil) // Subscribe to notifications. - sub, unsubscribe := notifs.Sub() + sub, unsubscribe, ok := notifs.Sub() + require.True(t, ok) var wg sync.WaitGroup wg.Add(1) @@ -103,12 +104,14 @@ func TestSubscriberReceivesNotifications(t *testing.T) { // TestMultipleSubscribers tests that multiple subscribers receive notifications independently. func TestMultipleSubscribers(t *testing.T) { - notifs := NewNotifications(nil) + notifs := NewNotifications(10, nil) // Subscribe two subscribers to notifications. - sub1, unsubscribe1 := notifs.Sub() + sub1, unsubscribe1, ok1 := notifs.Sub() + require.True(t, ok1) - sub2, unsubscribe2 := notifs.Sub() + sub2, unsubscribe2, ok2 := notifs.Sub() + require.True(t, ok2) var wg sync.WaitGroup wg.Add(2) @@ -157,10 +160,11 @@ func TestMultipleSubscribers(t *testing.T) { // TestUnsubscribe tests that unsubscribing prevents further notifications from being received. func TestUnsubscribe(t *testing.T) { - notifs := NewNotifications(nil) + notifs := NewNotifications(10, nil) // Subscribe to notifications. - sub, unsubscribe := notifs.Sub() + sub, unsubscribe, ok := notifs.Sub() + require.True(t, ok) var wg sync.WaitGroup wg.Add(1) @@ -190,3 +194,30 @@ func TestUnsubscribe(t *testing.T) { require.Len(t, receivedNotifications, 1, "Expected 1 notification before unsubscribe.") require.Equal(t, "Test Notification 1", receivedNotifications[0].Text, "Unexpected notification text.") } + +// TestMaxSubscribers tests that exceeding the max subscribers limit prevents additional subscriptions. +func TestMaxSubscribers(t *testing.T) { + maxSubscribers := 2 + notifs := NewNotifications(maxSubscribers, nil) + + // Subscribe the maximum number of subscribers. + _, unsubscribe1, ok1 := notifs.Sub() + require.True(t, ok1, "Expected first subscription to succeed.") + + _, unsubscribe2, ok2 := notifs.Sub() + require.True(t, ok2, "Expected second subscription to succeed.") + + // Try to subscribe more than the max allowed. + _, _, ok3 := notifs.Sub() + require.False(t, ok3, "Expected third subscription to fail due to max subscriber limit.") + + // Unsubscribe one subscriber and try again. + unsubscribe1() + + _, unsubscribe4, ok4 := notifs.Sub() + require.True(t, ok4, "Expected subscription to succeed after unsubscribing a subscriber.") + + // Clean up the subscriptions. + unsubscribe2() + unsubscribe4() +} diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 5eadbdbe7..4589e14e0 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -215,7 +215,7 @@ type API struct { isAgent bool statsRenderer StatsRenderer notificationsGetter func() []api.Notification - notificationsSub func() (<-chan api.Notification, func()) + notificationsSub func() (<-chan api.Notification, func(), bool) remoteWriteHandler http.Handler remoteReadHandler http.Handler @@ -250,7 +250,7 @@ func NewAPI( runtimeInfo func() (RuntimeInfo, error), buildInfo *PrometheusVersion, notificationsGetter func() []api.Notification, - notificationsSub func() (<-chan api.Notification, func()), + notificationsSub func() (<-chan api.Notification, func(), bool), gatherer prometheus.Gatherer, registerer prometheus.Registerer, statsRenderer StatsRenderer, @@ -1690,7 +1690,11 @@ func (api *API) notificationsSSE(w http.ResponseWriter, r *http.Request) { w.Header().Set("Connection", "keep-alive") // Subscribe to notifications. - notifications, unsubscribe := api.notificationsSub() + notifications, unsubscribe, ok := api.notificationsSub() + if !ok { + w.WriteHeader(http.StatusNoContent) + return + } defer unsubscribe() // Set up a flusher to push the response to the client. diff --git a/web/ui/mantine-ui/src/components/NotificationsProvider.tsx b/web/ui/mantine-ui/src/components/NotificationsProvider.tsx index 73de54131..44510061e 100644 --- a/web/ui/mantine-ui/src/components/NotificationsProvider.tsx +++ b/web/ui/mantine-ui/src/components/NotificationsProvider.tsx @@ -42,7 +42,8 @@ export const NotificationsProvider: React.FC<{ children: React.ReactNode }> = ({ eventSource.onerror = () => { eventSource.close(); - setIsConnectionError(true); + // We do not call setIsConnectionError(true), we only set it to true if + // the fallback API does not work either. setShouldFetchFromAPI(true); }; diff --git a/web/web.go b/web/web.go index 87e4164c5..724ca9105 100644 --- a/web/web.go +++ b/web/web.go @@ -268,7 +268,7 @@ type Options struct { Notifier *notifier.Manager Version *PrometheusVersion NotificationsGetter func() []api.Notification - NotificationsSub func() (<-chan api.Notification, func()) + NotificationsSub func() (<-chan api.Notification, func(), bool) Flags map[string]string ListenAddresses []string