Browse Source

Fix panic when updating a remote write queue (#7452)

Right now Queue Manager metrics are registered when the metrics struct
is created, which happens before a changed queue is shutdown and the old
metrics are unregistered. In the case of named queues or updates to
external labels the apply config will panic due to duplicate metrics.

Instead, register the metrics as part of starting the queue as we always
guarantee that Stop will be called before a new Start.

Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
pull/7464/head
Chris Marchbanks 4 years ago committed by GitHub
parent
commit
b299aba6cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      storage/remote/queue_manager.go
  2. 45
      storage/remote/write_test.go

12
storage/remote/queue_manager.go

@ -177,8 +177,12 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
ConstLabels: constLabels,
})
if r != nil {
r.MustRegister(
return m
}
func (m *queueManagerMetrics) register() {
if m.reg != nil {
m.reg.MustRegister(
m.succeededSamplesTotal,
m.failedSamplesTotal,
m.retriedSamplesTotal,
@ -195,7 +199,6 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
m.bytesSent,
)
}
return m
}
func (m *queueManagerMetrics) unregister() {
@ -358,7 +361,8 @@ outer:
// Start the queue manager sending samples to the remote storage.
// Does not block.
func (t *QueueManager) Start() {
// Initialise some metrics.
// Register and initialise some metrics.
t.metrics.register()
t.metrics.shardCapacity.Set(float64(t.cfg.Capacity))
t.metrics.pendingSamples.Set(0)
t.metrics.maxNumShards.Set(float64(t.cfg.MaxShards))

45
storage/remote/write_test.go

@ -20,6 +20,7 @@ import (
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
common_config "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
@ -154,6 +155,48 @@ func TestRestartOnNameChange(t *testing.T) {
testutil.Ok(t, err)
}
func TestUpdateWithRegisterer(t *testing.T) {
dir, err := ioutil.TempDir("", "TestRestartWithRegisterer")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond)
c1 := &config.RemoteWriteConfig{
Name: "named",
URL: &common_config.URL{
URL: &url.URL{
Scheme: "http",
Host: "localhost",
},
},
QueueConfig: config.DefaultQueueConfig,
}
c2 := &config.RemoteWriteConfig{
URL: &common_config.URL{
URL: &url.URL{
Scheme: "http",
Host: "localhost",
},
},
QueueConfig: config.DefaultQueueConfig,
}
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{c1, c2},
}
testutil.Ok(t, s.ApplyConfig(conf))
c1.QueueConfig.MaxShards = 10
c2.QueueConfig.MaxShards = 10
testutil.Ok(t, s.ApplyConfig(conf))
for _, queue := range s.queues {
testutil.Equals(t, 10, queue.cfg.MaxShards)
}
err = s.Close()
testutil.Ok(t, err)
}
func TestWriteStorageLifecycle(t *testing.T) {
dir, err := ioutil.TempDir("", "TestWriteStorageLifecycle")
testutil.Ok(t, err)
@ -178,7 +221,7 @@ func TestUpdateExternalLabels(t *testing.T) {
testutil.Ok(t, err)
defer os.RemoveAll(dir)
s := NewWriteStorage(nil, nil, dir, time.Second)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second)
externalLabels := labels.FromStrings("external", "true")
conf := &config.Config{

Loading…
Cancel
Save