From b299aba6cf32f940596863fc8e260b96bb18c60b Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Fri, 26 Jun 2020 00:33:52 -0600 Subject: [PATCH] Fix panic when updating a remote write queue (#7452) Right now Queue Manager metrics are registered when the metrics struct is created, which happens before a changed queue is shut down and the old metrics are unregistered. In the case of named queues or updates to external labels, applying the config will panic due to duplicate metrics. Instead, register the metrics as part of starting the queue, as we always guarantee that Stop will be called before a new Start. Signed-off-by: Chris Marchbanks --- storage/remote/queue_manager.go | 12 ++++++--- storage/remote/write_test.go | 45 ++++++++++++++++++++++++++++++++- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 6a8b4c432..787272788 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -177,8 +177,12 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager ConstLabels: constLabels, }) - if r != nil { - r.MustRegister( + return m +} + +func (m *queueManagerMetrics) register() { + if m.reg != nil { + m.reg.MustRegister( m.succeededSamplesTotal, m.failedSamplesTotal, m.retriedSamplesTotal, @@ -195,7 +199,6 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager m.bytesSent, ) } - return m } func (m *queueManagerMetrics) unregister() { @@ -358,7 +361,8 @@ outer: // Start the queue manager sending samples to the remote storage. // Does not block. func (t *QueueManager) Start() { - // Initialise some metrics. + // Register and initialise some metrics. 
+ t.metrics.register() t.metrics.shardCapacity.Set(float64(t.cfg.Capacity)) t.metrics.pendingSamples.Set(0) t.metrics.maxNumShards.Set(float64(t.cfg.MaxShards)) diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go index bb8551cc6..69b710aa5 100644 --- a/storage/remote/write_test.go +++ b/storage/remote/write_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" common_config "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" @@ -154,6 +155,48 @@ func TestRestartOnNameChange(t *testing.T) { testutil.Ok(t, err) } +func TestUpdateWithRegisterer(t *testing.T) { + dir, err := ioutil.TempDir("", "TestRestartWithRegisterer") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond) + c1 := &config.RemoteWriteConfig{ + Name: "named", + URL: &common_config.URL{ + URL: &url.URL{ + Scheme: "http", + Host: "localhost", + }, + }, + QueueConfig: config.DefaultQueueConfig, + } + c2 := &config.RemoteWriteConfig{ + URL: &common_config.URL{ + URL: &url.URL{ + Scheme: "http", + Host: "localhost", + }, + }, + QueueConfig: config.DefaultQueueConfig, + } + conf := &config.Config{ + GlobalConfig: config.DefaultGlobalConfig, + RemoteWriteConfigs: []*config.RemoteWriteConfig{c1, c2}, + } + testutil.Ok(t, s.ApplyConfig(conf)) + + c1.QueueConfig.MaxShards = 10 + c2.QueueConfig.MaxShards = 10 + testutil.Ok(t, s.ApplyConfig(conf)) + for _, queue := range s.queues { + testutil.Equals(t, 10, queue.cfg.MaxShards) + } + + err = s.Close() + testutil.Ok(t, err) +} + func TestWriteStorageLifecycle(t *testing.T) { dir, err := ioutil.TempDir("", "TestWriteStorageLifecycle") testutil.Ok(t, err) @@ -178,7 +221,7 @@ func TestUpdateExternalLabels(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - s := NewWriteStorage(nil, nil, dir, time.Second) + s := NewWriteStorage(nil, 
prometheus.NewRegistry(), dir, time.Second) externalLabels := labels.FromStrings("external", "true") conf := &config.Config{