mirror of https://github.com/prometheus/prometheus
Browse Source
This commit introduces a new `/api/v1/notifications/live` endpoint that utilizes Server-Sent Events (SSE) to stream notifications to the web UI. This is used to display alerts such as when a configuration reload has failed. I opted for SSE over WebSockets because SSE is simpler to implement and more robust for our use case. Since we only need one-way communication from the server to the client, SSE fits perfectly without the overhead of establishing and maintaining a two-way WebSocket connection. When the SSE connection fails, we go back to a classic /api/v1/notifications API endpoint. This commit also contains the required UI changes for the new Mantine UI. Signed-off-by: Julien <roidelapluie@o11y.eu>pull/14946/head
Julien
2 months ago
12 changed files with 668 additions and 69 deletions
@ -0,0 +1,176 @@
|
||||
// Copyright 2024 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package api |
||||
|
||||
import ( |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/prometheus/client_golang/prometheus" |
||||
) |
||||
|
||||
const ( |
||||
ConfigurationUnsuccessful = "Configuration reload has failed." |
||||
) |
||||
|
||||
// Notification represents an individual notification message.
|
||||
type Notification struct { |
||||
Text string `json:"text"` |
||||
Date time.Time `json:"date"` |
||||
Active bool `json:"active"` |
||||
} |
||||
|
||||
// Notifications stores a list of Notification objects.
|
||||
// It also manages live subscribers that receive notifications via channels.
|
||||
type Notifications struct { |
||||
mu sync.Mutex |
||||
notifications []Notification |
||||
subscribers map[chan Notification]struct{} // Active subscribers.
|
||||
|
||||
subscriberGauge prometheus.Gauge |
||||
notificationsSent prometheus.Counter |
||||
notificationsDropped prometheus.Counter |
||||
} |
||||
|
||||
// NewNotifications creates a new Notifications instance.
|
||||
func NewNotifications(reg prometheus.Registerer) *Notifications { |
||||
n := &Notifications{ |
||||
subscribers: make(map[chan Notification]struct{}), |
||||
subscriberGauge: prometheus.NewGauge(prometheus.GaugeOpts{ |
||||
Namespace: "prometheus", |
||||
Subsystem: "api", |
||||
Name: "notification_active_subscribers", |
||||
Help: "The current number of active notification subscribers.", |
||||
}), |
||||
notificationsSent: prometheus.NewCounter(prometheus.CounterOpts{ |
||||
Namespace: "prometheus", |
||||
Subsystem: "api", |
||||
Name: "notification_updates_sent_total", |
||||
Help: "Total number of notification updates sent.", |
||||
}), |
||||
notificationsDropped: prometheus.NewCounter(prometheus.CounterOpts{ |
||||
Namespace: "prometheus", |
||||
Subsystem: "api", |
||||
Name: "notification_updates_dropped_total", |
||||
Help: "Total number of notification updates dropped.", |
||||
}), |
||||
} |
||||
|
||||
if reg != nil { |
||||
reg.MustRegister(n.subscriberGauge, n.notificationsSent, n.notificationsDropped) |
||||
} |
||||
|
||||
return n |
||||
} |
||||
|
||||
// AddNotification adds a new notification or updates the timestamp if it already exists.
|
||||
func (n *Notifications) AddNotification(text string) { |
||||
n.mu.Lock() |
||||
defer n.mu.Unlock() |
||||
|
||||
for i, notification := range n.notifications { |
||||
if notification.Text == text { |
||||
n.notifications[i].Date = time.Now() |
||||
|
||||
n.notifySubscribers(n.notifications[i]) |
||||
return |
||||
} |
||||
} |
||||
|
||||
newNotification := Notification{ |
||||
Text: text, |
||||
Date: time.Now(), |
||||
Active: true, |
||||
} |
||||
n.notifications = append(n.notifications, newNotification) |
||||
|
||||
n.notifySubscribers(newNotification) |
||||
} |
||||
|
||||
// notifySubscribers sends a notification to all active subscribers.
|
||||
func (n *Notifications) notifySubscribers(notification Notification) { |
||||
for sub := range n.subscribers { |
||||
// Non-blocking send to avoid subscriber blocking issues.
|
||||
n.notificationsSent.Inc() |
||||
select { |
||||
case sub <- notification: |
||||
// Notification sent to the subscriber.
|
||||
default: |
||||
// Drop the notification if the subscriber's channel is full.
|
||||
n.notificationsDropped.Inc() |
||||
} |
||||
} |
||||
} |
||||
|
||||
// DeleteNotification removes the first notification that matches the provided text.
|
||||
// The deleted notification is sent to subscribers with Active: false before being removed.
|
||||
func (n *Notifications) DeleteNotification(text string) { |
||||
n.mu.Lock() |
||||
defer n.mu.Unlock() |
||||
|
||||
// Iterate through the notifications to find the matching text.
|
||||
for i, notification := range n.notifications { |
||||
if notification.Text == text { |
||||
// Mark the notification as inactive and notify subscribers.
|
||||
notification.Active = false |
||||
n.notifySubscribers(notification) |
||||
|
||||
// Remove the notification from the list.
|
||||
n.notifications = append(n.notifications[:i], n.notifications[i+1:]...) |
||||
return |
||||
} |
||||
} |
||||
} |
||||
|
||||
// Get returns a copy of the list of notifications for safe access outside the struct.
|
||||
func (n *Notifications) Get() []Notification { |
||||
n.mu.Lock() |
||||
defer n.mu.Unlock() |
||||
|
||||
// Return a copy of the notifications slice to avoid modifying the original slice outside.
|
||||
notificationsCopy := make([]Notification, len(n.notifications)) |
||||
copy(notificationsCopy, n.notifications) |
||||
return notificationsCopy |
||||
} |
||||
|
||||
// Sub allows a client to subscribe to live notifications.
|
||||
// It returns a channel where the subscriber will receive notifications and a function to unsubscribe.
|
||||
// Each subscriber has its own goroutine to handle notifications and prevent blocking.
|
||||
func (n *Notifications) Sub() (<-chan Notification, func()) { |
||||
ch := make(chan Notification, 10) // Buffered channel to prevent blocking.
|
||||
|
||||
n.mu.Lock() |
||||
// Add the new subscriber to the list.
|
||||
n.subscribers[ch] = struct{}{} |
||||
n.subscriberGauge.Set(float64(len(n.subscribers))) |
||||
|
||||
// Send all current notifications to the new subscriber.
|
||||
for _, notification := range n.notifications { |
||||
ch <- notification |
||||
} |
||||
n.mu.Unlock() |
||||
|
||||
// Unsubscribe function to remove the channel from subscribers.
|
||||
unsubscribe := func() { |
||||
n.mu.Lock() |
||||
defer n.mu.Unlock() |
||||
|
||||
// Close the channel and remove it from the subscribers map.
|
||||
close(ch) |
||||
delete(n.subscribers, ch) |
||||
n.subscriberGauge.Set(float64(len(n.subscribers))) |
||||
} |
||||
|
||||
return ch, unsubscribe |
||||
} |
@ -0,0 +1,192 @@
|
||||
// Copyright 2024 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package api |
||||
|
||||
import ( |
||||
"sync" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
// TestNotificationLifecycle tests adding, modifying, and deleting notifications.
|
||||
func TestNotificationLifecycle(t *testing.T) { |
||||
notifs := NewNotifications(nil) |
||||
|
||||
// Add a notification.
|
||||
notifs.AddNotification("Test Notification 1") |
||||
|
||||
// Check if the notification was added.
|
||||
notifications := notifs.Get() |
||||
require.Len(t, notifications, 1, "Expected 1 notification after addition.") |
||||
require.Equal(t, "Test Notification 1", notifications[0].Text, "Notification text mismatch.") |
||||
require.True(t, notifications[0].Active, "Expected notification to be active.") |
||||
|
||||
// Modify the notification.
|
||||
notifs.AddNotification("Test Notification 1") |
||||
notifications = notifs.Get() |
||||
require.Len(t, notifications, 1, "Expected 1 notification after modification.") |
||||
|
||||
// Delete the notification.
|
||||
notifs.DeleteNotification("Test Notification 1") |
||||
notifications = notifs.Get() |
||||
require.Empty(t, notifications, "Expected no notifications after deletion.") |
||||
} |
||||
|
||||
// TestSubscriberReceivesNotifications tests that a subscriber receives notifications, including modifications and deletions.
|
||||
func TestSubscriberReceivesNotifications(t *testing.T) { |
||||
notifs := NewNotifications(nil) |
||||
|
||||
// Subscribe to notifications.
|
||||
sub, unsubscribe := notifs.Sub() |
||||
|
||||
var wg sync.WaitGroup |
||||
wg.Add(1) |
||||
|
||||
receivedNotifications := make([]Notification, 0) |
||||
|
||||
// Goroutine to listen for notifications.
|
||||
go func() { |
||||
defer wg.Done() |
||||
for notification := range sub { |
||||
receivedNotifications = append(receivedNotifications, notification) |
||||
} |
||||
}() |
||||
|
||||
// Add notifications.
|
||||
notifs.AddNotification("Test Notification 1") |
||||
notifs.AddNotification("Test Notification 2") |
||||
|
||||
// Modify a notification.
|
||||
notifs.AddNotification("Test Notification 1") |
||||
|
||||
// Delete a notification.
|
||||
notifs.DeleteNotification("Test Notification 2") |
||||
|
||||
// Wait for notifications to propagate.
|
||||
time.Sleep(100 * time.Millisecond) |
||||
|
||||
unsubscribe() |
||||
wg.Wait() // Wait for the subscriber goroutine to finish.
|
||||
|
||||
// Verify that we received the expected number of notifications.
|
||||
require.Len(t, receivedNotifications, 4, "Expected 4 notifications (2 active, 1 modified, 1 deleted).") |
||||
|
||||
// Check the content and state of received notifications.
|
||||
expected := []struct { |
||||
Text string |
||||
Active bool |
||||
}{ |
||||
{"Test Notification 1", true}, |
||||
{"Test Notification 2", true}, |
||||
{"Test Notification 1", true}, |
||||
{"Test Notification 2", false}, |
||||
} |
||||
|
||||
for i, n := range receivedNotifications { |
||||
require.Equal(t, expected[i].Text, n.Text, "Notification text mismatch at index %d.", i) |
||||
require.Equal(t, expected[i].Active, n.Active, "Notification active state mismatch at index %d.", i) |
||||
} |
||||
} |
||||
|
||||
// TestMultipleSubscribers tests that multiple subscribers receive notifications independently.
|
||||
func TestMultipleSubscribers(t *testing.T) { |
||||
notifs := NewNotifications(nil) |
||||
|
||||
// Subscribe two subscribers to notifications.
|
||||
sub1, unsubscribe1 := notifs.Sub() |
||||
|
||||
sub2, unsubscribe2 := notifs.Sub() |
||||
|
||||
var wg sync.WaitGroup |
||||
wg.Add(2) |
||||
|
||||
receivedSub1 := make([]Notification, 0) |
||||
receivedSub2 := make([]Notification, 0) |
||||
|
||||
// Goroutine for subscriber 1.
|
||||
go func() { |
||||
defer wg.Done() |
||||
for notification := range sub1 { |
||||
receivedSub1 = append(receivedSub1, notification) |
||||
} |
||||
}() |
||||
|
||||
// Goroutine for subscriber 2.
|
||||
go func() { |
||||
defer wg.Done() |
||||
for notification := range sub2 { |
||||
receivedSub2 = append(receivedSub2, notification) |
||||
} |
||||
}() |
||||
|
||||
// Add and delete notifications.
|
||||
notifs.AddNotification("Test Notification 1") |
||||
notifs.DeleteNotification("Test Notification 1") |
||||
|
||||
// Wait for notifications to propagate.
|
||||
time.Sleep(100 * time.Millisecond) |
||||
|
||||
// Unsubscribe both.
|
||||
unsubscribe1() |
||||
unsubscribe2() |
||||
|
||||
wg.Wait() |
||||
|
||||
// Both subscribers should have received the same 2 notifications.
|
||||
require.Len(t, receivedSub1, 2, "Expected 2 notifications for subscriber 1.") |
||||
require.Len(t, receivedSub2, 2, "Expected 2 notifications for subscriber 2.") |
||||
|
||||
// Verify that both subscribers received the same notifications.
|
||||
for i := 0; i < 2; i++ { |
||||
require.Equal(t, receivedSub1[i], receivedSub2[i], "Subscriber notification mismatch at index %d.", i) |
||||
} |
||||
} |
||||
|
||||
// TestUnsubscribe tests that unsubscribing prevents further notifications from being received.
|
||||
func TestUnsubscribe(t *testing.T) { |
||||
notifs := NewNotifications(nil) |
||||
|
||||
// Subscribe to notifications.
|
||||
sub, unsubscribe := notifs.Sub() |
||||
|
||||
var wg sync.WaitGroup |
||||
wg.Add(1) |
||||
|
||||
receivedNotifications := make([]Notification, 0) |
||||
|
||||
// Goroutine to listen for notifications.
|
||||
go func() { |
||||
defer wg.Done() |
||||
for notification := range sub { |
||||
receivedNotifications = append(receivedNotifications, notification) |
||||
} |
||||
}() |
||||
|
||||
// Add a notification and then unsubscribe.
|
||||
notifs.AddNotification("Test Notification 1") |
||||
time.Sleep(100 * time.Millisecond) // Allow time for notification delivery.
|
||||
unsubscribe() // Unsubscribe.
|
||||
|
||||
// Add another notification after unsubscribing.
|
||||
notifs.AddNotification("Test Notification 2") |
||||
|
||||
// Wait for the subscriber goroutine to finish.
|
||||
wg.Wait() |
||||
|
||||
// Only the first notification should have been received.
|
||||
require.Len(t, receivedNotifications, 1, "Expected 1 notification before unsubscribe.") |
||||
require.Equal(t, "Test Notification 1", receivedNotifications[0].Text, "Unexpected notification text.") |
||||
} |
@ -0,0 +1,8 @@
|
||||
export interface Notification { |
||||
text: string; |
||||
date: string; |
||||
active: boolean; |
||||
modified: boolean; |
||||
} |
||||
|
||||
export type NotificationsResult = Notification[]; |
@ -0,0 +1,62 @@
|
||||
import { ActionIcon, Indicator, Popover, Card, Text, Stack, ScrollArea, Group } from "@mantine/core"; |
||||
import { IconBell, IconAlertTriangle, IconNetworkOff } from "@tabler/icons-react"; |
||||
import { useNotifications } from '../state/useNotifications'; |
||||
import { actionIconStyle } from "../styles"; |
||||
import { useSettings } from '../state/settingsSlice'; |
||||
import { formatTimestamp } from "../lib/formatTime"; |
||||
|
||||
const NotificationsIcon = () => { |
||||
const { notifications, isConnectionError } = useNotifications(); |
||||
const { useLocalTime } = useSettings(); |
||||
|
||||
return ( |
||||
(notifications.length === 0 && !isConnectionError) ? null : ( |
||||
<Indicator |
||||
color={"red"} |
||||
size={16} |
||||
label={isConnectionError ? "!" : notifications.length} |
||||
> |
||||
<Popover position="bottom-end" shadow="md" withArrow> |
||||
<Popover.Target> |
||||
<ActionIcon color="gray" title="Notifications" aria-label="Notifications" size={32}> |
||||
<IconBell style={actionIconStyle}/> |
||||
</ActionIcon> |
||||
</Popover.Target> |
||||
|
||||
<Popover.Dropdown> |
||||
<Stack gap="xs"> |
||||
<Text fw={700} size="xs" color="dimmed" ta="center">Notifications</Text> |
||||
<ScrollArea.Autosize mah={200}> |
||||
{ isConnectionError ? ( |
||||
<Card p="xs" color="red"> |
||||
<Group wrap="nowrap"> |
||||
<IconNetworkOff color="red" size={20} /> |
||||
<Stack gap="0"> |
||||
<Text size="sm" fw={500}>Real-time notifications interrupted.</Text> |
||||
<Text size="xs" color="dimmed">Please refresh the page or check your connection.</Text> |
||||
</Stack> |
||||
</Group> |
||||
</Card> |
||||
) : notifications.length === 0 ? ( |
||||
<Text ta="center" color="dimmed">No notifications</Text> |
||||
) : (notifications.map((notification, index) => ( |
||||
<Card key={index} p="xs"> |
||||
<Group wrap="nowrap"> |
||||
<IconAlertTriangle color="red" size={20} /> |
||||
<Stack style={{ maxWidth: 250 }} gap={0}> |
||||
<Text size="sm" fw={500}>{notification.text}</Text> |
||||
<Text size="xs" color="dimmed">{formatTimestamp(new Date(notification.date).valueOf() / 1000, useLocalTime)}</Text> |
||||
</Stack> |
||||
</Group> |
||||
</Card> |
||||
)))} |
||||
</ScrollArea.Autosize> |
||||
</Stack> |
||||
</Popover.Dropdown> |
||||
</Popover> |
||||
</Indicator> |
||||
) |
||||
); |
||||
}; |
||||
|
||||
export default NotificationsIcon; |
@ -0,0 +1,61 @@
|
||||
import React, { useEffect, useState } from 'react'; |
||||
import { useSettings } from '../state/settingsSlice'; |
||||
import { NotificationsContext } from '../state/useNotifications'; |
||||
import { Notification, NotificationsResult } from "../api/responseTypes/notifications"; |
||||
import { useAPIQuery } from '../api/api'; |
||||
|
||||
export const NotificationsProvider: React.FC<{ children: React.ReactNode }> = ({ children }) => { |
||||
const { pathPrefix } = useSettings(); |
||||
const [notifications, setNotifications] = useState<Notification[]>([]); |
||||
const [isConnectionError, setIsConnectionError] = useState(false); |
||||
const [shouldFetchFromAPI, setShouldFetchFromAPI] = useState(false); |
||||
|
||||
const { data, isError } = useAPIQuery<NotificationsResult>({ |
||||
path: '/notifications', |
||||
enabled: shouldFetchFromAPI, |
||||
refetchInterval: 10000, |
||||
}); |
||||
|
||||
useEffect(() => { |
||||
if (data && data.data) { |
||||
setNotifications(data.data); |
||||
} |
||||
setIsConnectionError(isError); |
||||
}, [data, isError]); |
||||
|
||||
useEffect(() => { |
||||
const eventSource = new EventSource(`${pathPrefix}/api/v1/notifications/live`); |
||||
|
||||
eventSource.onmessage = (event) => { |
||||
const notification: Notification = JSON.parse(event.data); |
||||
|
||||
setNotifications((prev: Notification[]) => { |
||||
const updatedNotifications = [...prev.filter((n: Notification) => n.text !== notification.text)]; |
||||
|
||||
if (notification.active) { |
||||
updatedNotifications.push(notification); |
||||
} |
||||
|
||||
return updatedNotifications; |
||||
}); |
||||
}; |
||||
|
||||
eventSource.onerror = () => { |
||||
eventSource.close(); |
||||
setIsConnectionError(true); |
||||
setShouldFetchFromAPI(true); |
||||
}; |
||||
|
||||
return () => { |
||||
eventSource.close(); |
||||
}; |
||||
}, [pathPrefix]); |
||||
|
||||
return ( |
||||
<NotificationsContext.Provider value={{ notifications, isConnectionError }}> |
||||
{children} |
||||
</NotificationsContext.Provider> |
||||
); |
||||
}; |
||||
|
||||
export default NotificationsProvider; |
@ -0,0 +1,17 @@
|
||||
import { createContext, useContext } from 'react'; |
||||
import { Notification } from "../api/responseTypes/notifications"; |
||||
|
||||
export type NotificationsContextType = { |
||||
notifications: Notification[]; |
||||
isConnectionError: boolean; |
||||
}; |
||||
|
||||
const defaultContextValue: NotificationsContextType = { |
||||
notifications: [], |
||||
isConnectionError: false, |
||||
}; |
||||
|
||||
export const NotificationsContext = createContext<NotificationsContextType>(defaultContextValue); |
||||
|
||||
// Custom hook to access notifications context
|
||||
export const useNotifications = () => useContext(NotificationsContext); |
Loading…
Reference in new issue