mirror of https://github.com/prometheus/prometheus
Merge pull request #14946 from roidelapluie/notifications
Add notifications to the Web UIpull/14996/head
commit
7aa4721373
|
@ -78,6 +78,7 @@ import (
|
|||
"github.com/prometheus/prometheus/util/logging"
|
||||
prom_runtime "github.com/prometheus/prometheus/util/runtime"
|
||||
"github.com/prometheus/prometheus/web"
|
||||
"github.com/prometheus/prometheus/web/api"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -273,13 +274,17 @@ func main() {
|
|||
)
|
||||
}
|
||||
|
||||
notifs := api.NewNotifications(prometheus.DefaultRegisterer)
|
||||
|
||||
cfg := flagConfig{
|
||||
notifier: notifier.Options{
|
||||
Registerer: prometheus.DefaultRegisterer,
|
||||
},
|
||||
web: web.Options{
|
||||
Registerer: prometheus.DefaultRegisterer,
|
||||
Gatherer: prometheus.DefaultGatherer,
|
||||
Registerer: prometheus.DefaultRegisterer,
|
||||
Gatherer: prometheus.DefaultGatherer,
|
||||
NotificationsSub: notifs.Sub,
|
||||
NotificationsGetter: notifs.Get,
|
||||
},
|
||||
promlogConfig: promlog.Config{},
|
||||
}
|
||||
|
@ -1078,6 +1083,14 @@ func main() {
|
|||
}
|
||||
}
|
||||
|
||||
callback := func(success bool) {
|
||||
if success {
|
||||
notifs.DeleteNotification(api.ConfigurationUnsuccessful)
|
||||
return
|
||||
}
|
||||
notifs.AddNotification(api.ConfigurationUnsuccessful)
|
||||
}
|
||||
|
||||
g.Add(
|
||||
func() error {
|
||||
<-reloadReady.C
|
||||
|
@ -1085,7 +1098,7 @@ func main() {
|
|||
for {
|
||||
select {
|
||||
case <-hup:
|
||||
if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil {
|
||||
if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, callback, reloaders...); err != nil {
|
||||
level.Error(logger).Log("msg", "Error reloading config", "err", err)
|
||||
} else if cfg.enableAutoReload {
|
||||
if currentChecksum, err := config.GenerateChecksum(cfg.configFile); err == nil {
|
||||
|
@ -1095,7 +1108,7 @@ func main() {
|
|||
}
|
||||
}
|
||||
case rc := <-webHandler.Reload():
|
||||
if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil {
|
||||
if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, callback, reloaders...); err != nil {
|
||||
level.Error(logger).Log("msg", "Error reloading config", "err", err)
|
||||
rc <- err
|
||||
} else {
|
||||
|
@ -1120,7 +1133,7 @@ func main() {
|
|||
}
|
||||
level.Info(logger).Log("msg", "Configuration file change detected, reloading the configuration.")
|
||||
|
||||
if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil {
|
||||
if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, callback, reloaders...); err != nil {
|
||||
level.Error(logger).Log("msg", "Error reloading config", "err", err)
|
||||
} else {
|
||||
checksum = currentChecksum
|
||||
|
@ -1150,7 +1163,7 @@ func main() {
|
|||
return nil
|
||||
}
|
||||
|
||||
if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil {
|
||||
if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, func(bool) {}, reloaders...); err != nil {
|
||||
return fmt.Errorf("error loading config from %q: %w", cfg.configFile, err)
|
||||
}
|
||||
|
||||
|
@ -1376,7 +1389,7 @@ type reloader struct {
|
|||
reloader func(*config.Config) error
|
||||
}
|
||||
|
||||
func reloadConfig(filename string, expandExternalLabels, enableExemplarStorage bool, logger log.Logger, noStepSuqueryInterval *safePromQLNoStepSubqueryInterval, rls ...reloader) (err error) {
|
||||
func reloadConfig(filename string, expandExternalLabels, enableExemplarStorage bool, logger log.Logger, noStepSuqueryInterval *safePromQLNoStepSubqueryInterval, callback func(bool), rls ...reloader) (err error) {
|
||||
start := time.Now()
|
||||
timings := []interface{}{}
|
||||
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
|
||||
|
@ -1385,8 +1398,10 @@ func reloadConfig(filename string, expandExternalLabels, enableExemplarStorage b
|
|||
if err == nil {
|
||||
configSuccess.Set(1)
|
||||
configSuccessTime.SetToCurrentTime()
|
||||
callback(true)
|
||||
} else {
|
||||
configSuccess.Set(0)
|
||||
callback(false)
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
|
@ -0,0 +1,176 @@
|
|||
// Copyright 2024 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
ConfigurationUnsuccessful = "Configuration reload has failed."
|
||||
)
|
||||
|
||||
// Notification represents an individual notification message.
|
||||
type Notification struct {
|
||||
Text string `json:"text"`
|
||||
Date time.Time `json:"date"`
|
||||
Active bool `json:"active"`
|
||||
}
|
||||
|
||||
// Notifications stores a list of Notification objects.
|
||||
// It also manages live subscribers that receive notifications via channels.
|
||||
type Notifications struct {
|
||||
mu sync.Mutex
|
||||
notifications []Notification
|
||||
subscribers map[chan Notification]struct{} // Active subscribers.
|
||||
|
||||
subscriberGauge prometheus.Gauge
|
||||
notificationsSent prometheus.Counter
|
||||
notificationsDropped prometheus.Counter
|
||||
}
|
||||
|
||||
// NewNotifications creates a new Notifications instance.
|
||||
func NewNotifications(reg prometheus.Registerer) *Notifications {
|
||||
n := &Notifications{
|
||||
subscribers: make(map[chan Notification]struct{}),
|
||||
subscriberGauge: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: "prometheus",
|
||||
Subsystem: "api",
|
||||
Name: "notification_active_subscribers",
|
||||
Help: "The current number of active notification subscribers.",
|
||||
}),
|
||||
notificationsSent: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: "prometheus",
|
||||
Subsystem: "api",
|
||||
Name: "notification_updates_sent_total",
|
||||
Help: "Total number of notification updates sent.",
|
||||
}),
|
||||
notificationsDropped: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: "prometheus",
|
||||
Subsystem: "api",
|
||||
Name: "notification_updates_dropped_total",
|
||||
Help: "Total number of notification updates dropped.",
|
||||
}),
|
||||
}
|
||||
|
||||
if reg != nil {
|
||||
reg.MustRegister(n.subscriberGauge, n.notificationsSent, n.notificationsDropped)
|
||||
}
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
// AddNotification adds a new notification or updates the timestamp if it already exists.
|
||||
func (n *Notifications) AddNotification(text string) {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
|
||||
for i, notification := range n.notifications {
|
||||
if notification.Text == text {
|
||||
n.notifications[i].Date = time.Now()
|
||||
|
||||
n.notifySubscribers(n.notifications[i])
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
newNotification := Notification{
|
||||
Text: text,
|
||||
Date: time.Now(),
|
||||
Active: true,
|
||||
}
|
||||
n.notifications = append(n.notifications, newNotification)
|
||||
|
||||
n.notifySubscribers(newNotification)
|
||||
}
|
||||
|
||||
// notifySubscribers sends a notification to all active subscribers.
|
||||
func (n *Notifications) notifySubscribers(notification Notification) {
|
||||
for sub := range n.subscribers {
|
||||
// Non-blocking send to avoid subscriber blocking issues.
|
||||
n.notificationsSent.Inc()
|
||||
select {
|
||||
case sub <- notification:
|
||||
// Notification sent to the subscriber.
|
||||
default:
|
||||
// Drop the notification if the subscriber's channel is full.
|
||||
n.notificationsDropped.Inc()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteNotification removes the first notification that matches the provided text.
|
||||
// The deleted notification is sent to subscribers with Active: false before being removed.
|
||||
func (n *Notifications) DeleteNotification(text string) {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
|
||||
// Iterate through the notifications to find the matching text.
|
||||
for i, notification := range n.notifications {
|
||||
if notification.Text == text {
|
||||
// Mark the notification as inactive and notify subscribers.
|
||||
notification.Active = false
|
||||
n.notifySubscribers(notification)
|
||||
|
||||
// Remove the notification from the list.
|
||||
n.notifications = append(n.notifications[:i], n.notifications[i+1:]...)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get returns a copy of the list of notifications for safe access outside the struct.
|
||||
func (n *Notifications) Get() []Notification {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
|
||||
// Return a copy of the notifications slice to avoid modifying the original slice outside.
|
||||
notificationsCopy := make([]Notification, len(n.notifications))
|
||||
copy(notificationsCopy, n.notifications)
|
||||
return notificationsCopy
|
||||
}
|
||||
|
||||
// Sub allows a client to subscribe to live notifications.
|
||||
// It returns a channel where the subscriber will receive notifications and a function to unsubscribe.
|
||||
// Each subscriber has its own goroutine to handle notifications and prevent blocking.
|
||||
func (n *Notifications) Sub() (<-chan Notification, func()) {
|
||||
ch := make(chan Notification, 10) // Buffered channel to prevent blocking.
|
||||
|
||||
n.mu.Lock()
|
||||
// Add the new subscriber to the list.
|
||||
n.subscribers[ch] = struct{}{}
|
||||
n.subscriberGauge.Set(float64(len(n.subscribers)))
|
||||
|
||||
// Send all current notifications to the new subscriber.
|
||||
for _, notification := range n.notifications {
|
||||
ch <- notification
|
||||
}
|
||||
n.mu.Unlock()
|
||||
|
||||
// Unsubscribe function to remove the channel from subscribers.
|
||||
unsubscribe := func() {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
|
||||
// Close the channel and remove it from the subscribers map.
|
||||
close(ch)
|
||||
delete(n.subscribers, ch)
|
||||
n.subscriberGauge.Set(float64(len(n.subscribers)))
|
||||
}
|
||||
|
||||
return ch, unsubscribe
|
||||
}
|
|
@ -0,0 +1,192 @@
|
|||
// Copyright 2024 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestNotificationLifecycle tests adding, modifying, and deleting notifications.
|
||||
func TestNotificationLifecycle(t *testing.T) {
|
||||
notifs := NewNotifications(nil)
|
||||
|
||||
// Add a notification.
|
||||
notifs.AddNotification("Test Notification 1")
|
||||
|
||||
// Check if the notification was added.
|
||||
notifications := notifs.Get()
|
||||
require.Len(t, notifications, 1, "Expected 1 notification after addition.")
|
||||
require.Equal(t, "Test Notification 1", notifications[0].Text, "Notification text mismatch.")
|
||||
require.True(t, notifications[0].Active, "Expected notification to be active.")
|
||||
|
||||
// Modify the notification.
|
||||
notifs.AddNotification("Test Notification 1")
|
||||
notifications = notifs.Get()
|
||||
require.Len(t, notifications, 1, "Expected 1 notification after modification.")
|
||||
|
||||
// Delete the notification.
|
||||
notifs.DeleteNotification("Test Notification 1")
|
||||
notifications = notifs.Get()
|
||||
require.Empty(t, notifications, "Expected no notifications after deletion.")
|
||||
}
|
||||
|
||||
// TestSubscriberReceivesNotifications tests that a subscriber receives notifications, including modifications and deletions.
|
||||
func TestSubscriberReceivesNotifications(t *testing.T) {
|
||||
notifs := NewNotifications(nil)
|
||||
|
||||
// Subscribe to notifications.
|
||||
sub, unsubscribe := notifs.Sub()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
|
||||
receivedNotifications := make([]Notification, 0)
|
||||
|
||||
// Goroutine to listen for notifications.
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for notification := range sub {
|
||||
receivedNotifications = append(receivedNotifications, notification)
|
||||
}
|
||||
}()
|
||||
|
||||
// Add notifications.
|
||||
notifs.AddNotification("Test Notification 1")
|
||||
notifs.AddNotification("Test Notification 2")
|
||||
|
||||
// Modify a notification.
|
||||
notifs.AddNotification("Test Notification 1")
|
||||
|
||||
// Delete a notification.
|
||||
notifs.DeleteNotification("Test Notification 2")
|
||||
|
||||
// Wait for notifications to propagate.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
unsubscribe()
|
||||
wg.Wait() // Wait for the subscriber goroutine to finish.
|
||||
|
||||
// Verify that we received the expected number of notifications.
|
||||
require.Len(t, receivedNotifications, 4, "Expected 4 notifications (2 active, 1 modified, 1 deleted).")
|
||||
|
||||
// Check the content and state of received notifications.
|
||||
expected := []struct {
|
||||
Text string
|
||||
Active bool
|
||||
}{
|
||||
{"Test Notification 1", true},
|
||||
{"Test Notification 2", true},
|
||||
{"Test Notification 1", true},
|
||||
{"Test Notification 2", false},
|
||||
}
|
||||
|
||||
for i, n := range receivedNotifications {
|
||||
require.Equal(t, expected[i].Text, n.Text, "Notification text mismatch at index %d.", i)
|
||||
require.Equal(t, expected[i].Active, n.Active, "Notification active state mismatch at index %d.", i)
|
||||
}
|
||||
}
|
||||
|
||||
// TestMultipleSubscribers tests that multiple subscribers receive notifications independently.
|
||||
func TestMultipleSubscribers(t *testing.T) {
|
||||
notifs := NewNotifications(nil)
|
||||
|
||||
// Subscribe two subscribers to notifications.
|
||||
sub1, unsubscribe1 := notifs.Sub()
|
||||
|
||||
sub2, unsubscribe2 := notifs.Sub()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
|
||||
receivedSub1 := make([]Notification, 0)
|
||||
receivedSub2 := make([]Notification, 0)
|
||||
|
||||
// Goroutine for subscriber 1.
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for notification := range sub1 {
|
||||
receivedSub1 = append(receivedSub1, notification)
|
||||
}
|
||||
}()
|
||||
|
||||
// Goroutine for subscriber 2.
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for notification := range sub2 {
|
||||
receivedSub2 = append(receivedSub2, notification)
|
||||
}
|
||||
}()
|
||||
|
||||
// Add and delete notifications.
|
||||
notifs.AddNotification("Test Notification 1")
|
||||
notifs.DeleteNotification("Test Notification 1")
|
||||
|
||||
// Wait for notifications to propagate.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Unsubscribe both.
|
||||
unsubscribe1()
|
||||
unsubscribe2()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Both subscribers should have received the same 2 notifications.
|
||||
require.Len(t, receivedSub1, 2, "Expected 2 notifications for subscriber 1.")
|
||||
require.Len(t, receivedSub2, 2, "Expected 2 notifications for subscriber 2.")
|
||||
|
||||
// Verify that both subscribers received the same notifications.
|
||||
for i := 0; i < 2; i++ {
|
||||
require.Equal(t, receivedSub1[i], receivedSub2[i], "Subscriber notification mismatch at index %d.", i)
|
||||
}
|
||||
}
|
||||
|
||||
// TestUnsubscribe tests that unsubscribing prevents further notifications from being received.
|
||||
func TestUnsubscribe(t *testing.T) {
|
||||
notifs := NewNotifications(nil)
|
||||
|
||||
// Subscribe to notifications.
|
||||
sub, unsubscribe := notifs.Sub()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
|
||||
receivedNotifications := make([]Notification, 0)
|
||||
|
||||
// Goroutine to listen for notifications.
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for notification := range sub {
|
||||
receivedNotifications = append(receivedNotifications, notification)
|
||||
}
|
||||
}()
|
||||
|
||||
// Add a notification and then unsubscribe.
|
||||
notifs.AddNotification("Test Notification 1")
|
||||
time.Sleep(100 * time.Millisecond) // Allow time for notification delivery.
|
||||
unsubscribe() // Unsubscribe.
|
||||
|
||||
// Add another notification after unsubscribing.
|
||||
notifs.AddNotification("Test Notification 2")
|
||||
|
||||
// Wait for the subscriber goroutine to finish.
|
||||
wg.Wait()
|
||||
|
||||
// Only the first notification should have been received.
|
||||
require.Len(t, receivedNotifications, 1, "Expected 1 notification before unsubscribe.")
|
||||
require.Equal(t, "Test Notification 1", receivedNotifications[0].Text, "Unexpected notification text.")
|
||||
}
|
|
@ -15,6 +15,7 @@ package v1
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
|
@ -54,6 +55,7 @@ import (
|
|||
"github.com/prometheus/prometheus/util/annotations"
|
||||
"github.com/prometheus/prometheus/util/httputil"
|
||||
"github.com/prometheus/prometheus/util/stats"
|
||||
"github.com/prometheus/prometheus/web/api"
|
||||
)
|
||||
|
||||
type status string
|
||||
|
@ -202,16 +204,18 @@ type API struct {
|
|||
ready func(http.HandlerFunc) http.HandlerFunc
|
||||
globalURLOptions GlobalURLOptions
|
||||
|
||||
db TSDBAdminStats
|
||||
dbDir string
|
||||
enableAdmin bool
|
||||
logger log.Logger
|
||||
CORSOrigin *regexp.Regexp
|
||||
buildInfo *PrometheusVersion
|
||||
runtimeInfo func() (RuntimeInfo, error)
|
||||
gatherer prometheus.Gatherer
|
||||
isAgent bool
|
||||
statsRenderer StatsRenderer
|
||||
db TSDBAdminStats
|
||||
dbDir string
|
||||
enableAdmin bool
|
||||
logger log.Logger
|
||||
CORSOrigin *regexp.Regexp
|
||||
buildInfo *PrometheusVersion
|
||||
runtimeInfo func() (RuntimeInfo, error)
|
||||
gatherer prometheus.Gatherer
|
||||
isAgent bool
|
||||
statsRenderer StatsRenderer
|
||||
notificationsGetter func() []api.Notification
|
||||
notificationsSub func() (<-chan api.Notification, func())
|
||||
|
||||
remoteWriteHandler http.Handler
|
||||
remoteReadHandler http.Handler
|
||||
|
@ -245,6 +249,8 @@ func NewAPI(
|
|||
corsOrigin *regexp.Regexp,
|
||||
runtimeInfo func() (RuntimeInfo, error),
|
||||
buildInfo *PrometheusVersion,
|
||||
notificationsGetter func() []api.Notification,
|
||||
notificationsSub func() (<-chan api.Notification, func()),
|
||||
gatherer prometheus.Gatherer,
|
||||
registerer prometheus.Registerer,
|
||||
statsRenderer StatsRenderer,
|
||||
|
@ -261,22 +267,24 @@ func NewAPI(
|
|||
targetRetriever: tr,
|
||||
alertmanagerRetriever: ar,
|
||||
|
||||
now: time.Now,
|
||||
config: configFunc,
|
||||
flagsMap: flagsMap,
|
||||
ready: readyFunc,
|
||||
globalURLOptions: globalURLOptions,
|
||||
db: db,
|
||||
dbDir: dbDir,
|
||||
enableAdmin: enableAdmin,
|
||||
rulesRetriever: rr,
|
||||
logger: logger,
|
||||
CORSOrigin: corsOrigin,
|
||||
runtimeInfo: runtimeInfo,
|
||||
buildInfo: buildInfo,
|
||||
gatherer: gatherer,
|
||||
isAgent: isAgent,
|
||||
statsRenderer: DefaultStatsRenderer,
|
||||
now: time.Now,
|
||||
config: configFunc,
|
||||
flagsMap: flagsMap,
|
||||
ready: readyFunc,
|
||||
globalURLOptions: globalURLOptions,
|
||||
db: db,
|
||||
dbDir: dbDir,
|
||||
enableAdmin: enableAdmin,
|
||||
rulesRetriever: rr,
|
||||
logger: logger,
|
||||
CORSOrigin: corsOrigin,
|
||||
runtimeInfo: runtimeInfo,
|
||||
buildInfo: buildInfo,
|
||||
gatherer: gatherer,
|
||||
isAgent: isAgent,
|
||||
statsRenderer: DefaultStatsRenderer,
|
||||
notificationsGetter: notificationsGetter,
|
||||
notificationsSub: notificationsSub,
|
||||
|
||||
remoteReadHandler: remote.NewReadHandler(logger, registerer, q, configFunc, remoteReadSampleLimit, remoteReadConcurrencyLimit, remoteReadMaxBytesInFrame),
|
||||
}
|
||||
|
@ -390,6 +398,8 @@ func (api *API) Register(r *route.Router) {
|
|||
r.Get("/status/flags", wrap(api.serveFlags))
|
||||
r.Get("/status/tsdb", wrapAgent(api.serveTSDBStatus))
|
||||
r.Get("/status/walreplay", api.serveWALReplayStatus)
|
||||
r.Get("/notifications", api.notifications)
|
||||
r.Get("/notifications/live", api.notificationsSSE)
|
||||
r.Post("/read", api.ready(api.remoteRead))
|
||||
r.Post("/write", api.ready(api.remoteWrite))
|
||||
r.Post("/otlp/v1/metrics", api.ready(api.otlpWrite))
|
||||
|
@ -1668,6 +1678,49 @@ func (api *API) serveWALReplayStatus(w http.ResponseWriter, r *http.Request) {
|
|||
}, nil, "")
|
||||
}
|
||||
|
||||
func (api *API) notifications(w http.ResponseWriter, r *http.Request) {
|
||||
httputil.SetCORS(w, api.CORSOrigin, r)
|
||||
api.respond(w, r, api.notificationsGetter(), nil, "")
|
||||
}
|
||||
|
||||
func (api *API) notificationsSSE(w http.ResponseWriter, r *http.Request) {
|
||||
httputil.SetCORS(w, api.CORSOrigin, r)
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
|
||||
// Subscribe to notifications.
|
||||
notifications, unsubscribe := api.notificationsSub()
|
||||
defer unsubscribe()
|
||||
|
||||
// Set up a flusher to push the response to the client.
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case notification := <-notifications:
|
||||
// Marshal the notification to JSON.
|
||||
jsonData, err := json.Marshal(notification)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
continue
|
||||
}
|
||||
|
||||
// Write the event data in SSE format with JSON content.
|
||||
fmt.Fprintf(w, "data: %s\n\n", jsonData)
|
||||
|
||||
// Flush the response to ensure the data is sent immediately.
|
||||
flusher.Flush()
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
|
||||
// This is only really for tests - this will never be nil IRL.
|
||||
if api.remoteReadHandler != nil {
|
||||
|
|
|
@ -134,6 +134,8 @@ func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route
|
|||
regexp.MustCompile(".*"),
|
||||
func() (RuntimeInfo, error) { return RuntimeInfo{}, errors.New("not implemented") },
|
||||
&PrometheusVersion{},
|
||||
nil,
|
||||
nil,
|
||||
prometheus.DefaultGatherer,
|
||||
nil,
|
||||
nil,
|
||||
|
|
|
@ -64,6 +64,8 @@ import { useAppDispatch } from "./state/hooks";
|
|||
import { updateSettings, useSettings } from "./state/settingsSlice";
|
||||
import SettingsMenu from "./components/SettingsMenu";
|
||||
import ReadinessWrapper from "./components/ReadinessWrapper";
|
||||
import NotificationsProvider from "./components/NotificationsProvider";
|
||||
import NotificationsIcon from "./components/NotificationsIcon";
|
||||
import { QueryParamProvider } from "use-query-params";
|
||||
import { ReactRouter6Adapter } from "use-query-params/adapters/react-router-6";
|
||||
import ServiceDiscoveryPage from "./pages/service-discovery/ServiceDiscoveryPage";
|
||||
|
@ -314,6 +316,7 @@ function App() {
|
|||
const navActionIcons = (
|
||||
<>
|
||||
<ThemeSelector />
|
||||
<NotificationsIcon />
|
||||
<SettingsMenu />
|
||||
<ActionIcon
|
||||
component="a"
|
||||
|
@ -347,47 +350,49 @@ function App() {
|
|||
}}
|
||||
padding="md"
|
||||
>
|
||||
<AppShell.Header bg="rgb(65, 73, 81)" c="#fff">
|
||||
<Group h="100%" px="md" wrap="nowrap">
|
||||
<Group
|
||||
style={{ flex: 1 }}
|
||||
justify="space-between"
|
||||
wrap="nowrap"
|
||||
>
|
||||
<Group gap={65} wrap="nowrap">
|
||||
<Link
|
||||
to="/"
|
||||
style={{ textDecoration: "none", color: "white" }}
|
||||
>
|
||||
<Group gap={10} wrap="nowrap">
|
||||
<img src={PrometheusLogo} height={30} />
|
||||
<Text fz={20}>Prometheus{agentMode && " Agent"}</Text>
|
||||
<NotificationsProvider>
|
||||
<AppShell.Header bg="rgb(65, 73, 81)" c="#fff">
|
||||
<Group h="100%" px="md" wrap="nowrap">
|
||||
<Group
|
||||
style={{ flex: 1 }}
|
||||
justify="space-between"
|
||||
wrap="nowrap"
|
||||
>
|
||||
<Group gap={65} wrap="nowrap">
|
||||
<Link
|
||||
to="/"
|
||||
style={{ textDecoration: "none", color: "white" }}
|
||||
>
|
||||
<Group gap={10} wrap="nowrap">
|
||||
<img src={PrometheusLogo} height={30} />
|
||||
<Text fz={20}>Prometheus{agentMode && " Agent"}</Text>
|
||||
</Group>
|
||||
</Link>
|
||||
<Group gap={12} visibleFrom="sm" wrap="nowrap">
|
||||
{navLinks}
|
||||
</Group>
|
||||
</Link>
|
||||
<Group gap={12} visibleFrom="sm" wrap="nowrap">
|
||||
{navLinks}
|
||||
</Group>
|
||||
<Group visibleFrom="xs" wrap="nowrap" gap="xs">
|
||||
{navActionIcons}
|
||||
</Group>
|
||||
</Group>
|
||||
<Group visibleFrom="xs" wrap="nowrap" gap="xs">
|
||||
{navActionIcons}
|
||||
</Group>
|
||||
<Burger
|
||||
opened={opened}
|
||||
onClick={toggle}
|
||||
hiddenFrom="sm"
|
||||
size="sm"
|
||||
color="gray.2"
|
||||
/>
|
||||
</Group>
|
||||
<Burger
|
||||
opened={opened}
|
||||
onClick={toggle}
|
||||
hiddenFrom="sm"
|
||||
size="sm"
|
||||
color="gray.2"
|
||||
/>
|
||||
</Group>
|
||||
</AppShell.Header>
|
||||
</AppShell.Header>
|
||||
|
||||
<AppShell.Navbar py="md" px={4} bg="rgb(65, 73, 81)" c="#fff">
|
||||
{navLinks}
|
||||
<Group mt="md" hiddenFrom="xs" justify="center">
|
||||
{navActionIcons}
|
||||
</Group>
|
||||
</AppShell.Navbar>
|
||||
<AppShell.Navbar py="md" px={4} bg="rgb(65, 73, 81)" c="#fff">
|
||||
{navLinks}
|
||||
<Group mt="md" hiddenFrom="xs" justify="center">
|
||||
{navActionIcons}
|
||||
</Group>
|
||||
</AppShell.Navbar>
|
||||
</NotificationsProvider>
|
||||
|
||||
<AppShell.Main>
|
||||
<ErrorBoundary key={location.pathname}>
|
||||
|
|
|
@ -93,6 +93,7 @@ type QueryOptions = {
|
|||
path: string;
|
||||
params?: Record<string, string>;
|
||||
enabled?: boolean;
|
||||
refetchInterval?: false | number;
|
||||
recordResponseTime?: (time: number) => void;
|
||||
};
|
||||
|
||||
|
@ -102,6 +103,7 @@ export const useAPIQuery = <T>({
|
|||
params,
|
||||
enabled,
|
||||
recordResponseTime,
|
||||
refetchInterval,
|
||||
}: QueryOptions) => {
|
||||
const { pathPrefix } = useSettings();
|
||||
|
||||
|
@ -109,6 +111,7 @@ export const useAPIQuery = <T>({
|
|||
queryKey: key !== undefined ? key : [path, params],
|
||||
retry: false,
|
||||
refetchOnWindowFocus: false,
|
||||
refetchInterval: refetchInterval,
|
||||
gcTime: 0,
|
||||
enabled,
|
||||
queryFn: createQueryFn({ pathPrefix, path, params, recordResponseTime }),
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
export interface Notification {
|
||||
text: string;
|
||||
date: string;
|
||||
active: boolean;
|
||||
modified: boolean;
|
||||
}
|
||||
|
||||
export type NotificationsResult = Notification[];
|
|
@ -0,0 +1,62 @@
|
|||
import { ActionIcon, Indicator, Popover, Card, Text, Stack, ScrollArea, Group } from "@mantine/core";
|
||||
import { IconBell, IconAlertTriangle, IconNetworkOff } from "@tabler/icons-react";
|
||||
import { useNotifications } from '../state/useNotifications';
|
||||
import { actionIconStyle } from "../styles";
|
||||
import { useSettings } from '../state/settingsSlice';
|
||||
import { formatTimestamp } from "../lib/formatTime";
|
||||
|
||||
const NotificationsIcon = () => {
|
||||
const { notifications, isConnectionError } = useNotifications();
|
||||
const { useLocalTime } = useSettings();
|
||||
|
||||
return (
|
||||
(notifications.length === 0 && !isConnectionError) ? null : (
|
||||
<Indicator
|
||||
color={"red"}
|
||||
size={16}
|
||||
label={isConnectionError ? "!" : notifications.length}
|
||||
>
|
||||
<Popover position="bottom-end" shadow="md" withArrow>
|
||||
<Popover.Target>
|
||||
<ActionIcon color="gray" title="Notifications" aria-label="Notifications" size={32}>
|
||||
<IconBell style={actionIconStyle}/>
|
||||
</ActionIcon>
|
||||
</Popover.Target>
|
||||
|
||||
<Popover.Dropdown>
|
||||
<Stack gap="xs">
|
||||
<Text fw={700} size="xs" color="dimmed" ta="center">Notifications</Text>
|
||||
<ScrollArea.Autosize mah={200}>
|
||||
{ isConnectionError ? (
|
||||
<Card p="xs" color="red">
|
||||
<Group wrap="nowrap">
|
||||
<IconNetworkOff color="red" size={20} />
|
||||
<Stack gap="0">
|
||||
<Text size="sm" fw={500}>Real-time notifications interrupted.</Text>
|
||||
<Text size="xs" color="dimmed">Please refresh the page or check your connection.</Text>
|
||||
</Stack>
|
||||
</Group>
|
||||
</Card>
|
||||
) : notifications.length === 0 ? (
|
||||
<Text ta="center" color="dimmed">No notifications</Text>
|
||||
) : (notifications.map((notification, index) => (
|
||||
<Card key={index} p="xs">
|
||||
<Group wrap="nowrap">
|
||||
<IconAlertTriangle color="red" size={20} />
|
||||
<Stack style={{ maxWidth: 250 }} gap={0}>
|
||||
<Text size="sm" fw={500}>{notification.text}</Text>
|
||||
<Text size="xs" color="dimmed">{formatTimestamp(new Date(notification.date).valueOf() / 1000, useLocalTime)}</Text>
|
||||
</Stack>
|
||||
</Group>
|
||||
</Card>
|
||||
)))}
|
||||
</ScrollArea.Autosize>
|
||||
</Stack>
|
||||
</Popover.Dropdown>
|
||||
</Popover>
|
||||
</Indicator>
|
||||
)
|
||||
);
|
||||
};
|
||||
|
||||
export default NotificationsIcon;
|
|
@ -0,0 +1,61 @@
|
|||
import React, { useEffect, useState } from 'react';
|
||||
import { useSettings } from '../state/settingsSlice';
|
||||
import { NotificationsContext } from '../state/useNotifications';
|
||||
import { Notification, NotificationsResult } from "../api/responseTypes/notifications";
|
||||
import { useAPIQuery } from '../api/api';
|
||||
|
||||
export const NotificationsProvider: React.FC<{ children: React.ReactNode }> = ({ children }) => {
|
||||
const { pathPrefix } = useSettings();
|
||||
const [notifications, setNotifications] = useState<Notification[]>([]);
|
||||
const [isConnectionError, setIsConnectionError] = useState(false);
|
||||
const [shouldFetchFromAPI, setShouldFetchFromAPI] = useState(false);
|
||||
|
||||
const { data, isError } = useAPIQuery<NotificationsResult>({
|
||||
path: '/notifications',
|
||||
enabled: shouldFetchFromAPI,
|
||||
refetchInterval: 10000,
|
||||
});
|
||||
|
||||
useEffect(() => {
|
||||
if (data && data.data) {
|
||||
setNotifications(data.data);
|
||||
}
|
||||
setIsConnectionError(isError);
|
||||
}, [data, isError]);
|
||||
|
||||
useEffect(() => {
|
||||
const eventSource = new EventSource(`${pathPrefix}/api/v1/notifications/live`);
|
||||
|
||||
eventSource.onmessage = (event) => {
|
||||
const notification: Notification = JSON.parse(event.data);
|
||||
|
||||
setNotifications((prev: Notification[]) => {
|
||||
const updatedNotifications = [...prev.filter((n: Notification) => n.text !== notification.text)];
|
||||
|
||||
if (notification.active) {
|
||||
updatedNotifications.push(notification);
|
||||
}
|
||||
|
||||
return updatedNotifications;
|
||||
});
|
||||
};
|
||||
|
||||
eventSource.onerror = () => {
|
||||
eventSource.close();
|
||||
setIsConnectionError(true);
|
||||
setShouldFetchFromAPI(true);
|
||||
};
|
||||
|
||||
return () => {
|
||||
eventSource.close();
|
||||
};
|
||||
}, [pathPrefix]);
|
||||
|
||||
return (
|
||||
<NotificationsContext.Provider value={{ notifications, isConnectionError }}>
|
||||
{children}
|
||||
</NotificationsContext.Provider>
|
||||
);
|
||||
};
|
||||
|
||||
export default NotificationsProvider;
|
|
@ -0,0 +1,17 @@
|
|||
import { createContext, useContext } from 'react';
|
||||
import { Notification } from "../api/responseTypes/notifications";
|
||||
|
||||
export type NotificationsContextType = {
|
||||
notifications: Notification[];
|
||||
isConnectionError: boolean;
|
||||
};
|
||||
|
||||
const defaultContextValue: NotificationsContextType = {
|
||||
notifications: [],
|
||||
isConnectionError: false,
|
||||
};
|
||||
|
||||
export const NotificationsContext = createContext<NotificationsContextType>(defaultContextValue);
|
||||
|
||||
// Custom hook to access notifications context
|
||||
export const useNotifications = () => useContext(NotificationsContext);
|
|
@ -59,6 +59,7 @@ import (
|
|||
"github.com/prometheus/prometheus/template"
|
||||
"github.com/prometheus/prometheus/util/httputil"
|
||||
"github.com/prometheus/prometheus/util/netconnlimit"
|
||||
"github.com/prometheus/prometheus/web/api"
|
||||
api_v1 "github.com/prometheus/prometheus/web/api/v1"
|
||||
"github.com/prometheus/prometheus/web/ui"
|
||||
)
|
||||
|
@ -266,6 +267,8 @@ type Options struct {
|
|||
RuleManager *rules.Manager
|
||||
Notifier *notifier.Manager
|
||||
Version *PrometheusVersion
|
||||
NotificationsGetter func() []api.Notification
|
||||
NotificationsSub func() (<-chan api.Notification, func())
|
||||
Flags map[string]string
|
||||
|
||||
ListenAddresses []string
|
||||
|
@ -376,6 +379,8 @@ func New(logger log.Logger, o *Options) *Handler {
|
|||
h.options.CORSOrigin,
|
||||
h.runtimeInfo,
|
||||
h.versionInfo,
|
||||
h.options.NotificationsGetter,
|
||||
h.options.NotificationsSub,
|
||||
o.Gatherer,
|
||||
o.Registerer,
|
||||
nil,
|
||||
|
|
Loading…
Reference in New Issue