mirror of https://github.com/prometheus/prometheus
pass registerer from storage to queue manager for its metrics (#6728)

* pass registerer from storage to queue manager for its metrics

Signed-off-by: Robert Fratto <robert.fratto@grafana.com>

branch pull/6750/head
parent 8c2bc2f57a
commit a53e00f9fd
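This commit moves the queue manager's metrics from package-level promauto variables, which always register against prometheus.DefaultRegisterer, to a queueManagerMetrics struct whose collectors are registered against whatever Registerer the storage layer injects. A minimal sketch of the pattern, using hypothetical names (serverMetrics, requestsTotal) that are not part of this commit:

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

// serverMetrics bundles collectors so they can be registered against an
// injected Registerer instead of the process-global default registry.
type serverMetrics struct {
    requestsTotal *prometheus.CounterVec
}

func newServerMetrics(r prometheus.Registerer) *serverMetrics {
    m := &serverMetrics{
        requestsTotal: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "requests_total",
                Help: "Total requests, labeled by handler.",
            },
            []string{"handler"},
        ),
    }
    // A nil Registerer means "create but do not register", so tests can
    // construct the struct repeatedly without duplicate-registration panics.
    if r != nil {
        r.MustRegister(m.requestsTotal)
    }
    return m
}

func main() {
    reg := prometheus.NewRegistry()
    m := newServerMetrics(reg)
    m.requestsTotal.WithLabelValues("api").Inc()
    fmt.Println("collector registered against the injected registry")
}

The payoff is that callers control registration: production code passes a real registry, while tests pass nil (or a throwaway prometheus.NewRegistry()) and never touch global state.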
storage/remote/queue_manager.go

@@ -27,7 +27,6 @@ import (
 	"github.com/golang/snappy"
 
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/pkg/relabel"
@@ -46,8 +45,27 @@ const (
 	shardToleranceFraction = 0.3
 )
 
-var (
-	succeededSamplesTotal = promauto.NewCounterVec(
+type queueManagerMetrics struct {
+	succeededSamplesTotal     *prometheus.CounterVec
+	failedSamplesTotal        *prometheus.CounterVec
+	retriedSamplesTotal       *prometheus.CounterVec
+	droppedSamplesTotal       *prometheus.CounterVec
+	enqueueRetriesTotal       *prometheus.CounterVec
+	sentBatchDuration         *prometheus.HistogramVec
+	queueHighestSentTimestamp *prometheus.GaugeVec
+	queuePendingSamples       *prometheus.GaugeVec
+	shardCapacity             *prometheus.GaugeVec
+	numShards                 *prometheus.GaugeVec
+	maxNumShards              *prometheus.GaugeVec
+	minNumShards              *prometheus.GaugeVec
+	desiredNumShards          *prometheus.GaugeVec
+	bytesSent                 *prometheus.CounterVec
+}
+
+func newQueueManagerMetrics(r prometheus.Registerer) *queueManagerMetrics {
+	m := &queueManagerMetrics{}
+
+	m.succeededSamplesTotal = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -56,7 +74,7 @@ var (
 		},
 		[]string{remoteName, endpoint},
 	)
-	failedSamplesTotal = promauto.NewCounterVec(
+	m.failedSamplesTotal = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -65,7 +83,7 @@ var (
 		},
 		[]string{remoteName, endpoint},
 	)
-	retriedSamplesTotal = promauto.NewCounterVec(
+	m.retriedSamplesTotal = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -74,7 +92,7 @@ var (
 		},
 		[]string{remoteName, endpoint},
 	)
-	droppedSamplesTotal = promauto.NewCounterVec(
+	m.droppedSamplesTotal = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -83,7 +101,7 @@ var (
 		},
 		[]string{remoteName, endpoint},
 	)
-	enqueueRetriesTotal = promauto.NewCounterVec(
+	m.enqueueRetriesTotal = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -92,7 +110,7 @@ var (
 		},
 		[]string{remoteName, endpoint},
 	)
-	sentBatchDuration = promauto.NewHistogramVec(
+	m.sentBatchDuration = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -102,7 +120,7 @@ var (
 		},
 		[]string{remoteName, endpoint},
 	)
-	queueHighestSentTimestamp = promauto.NewGaugeVec(
+	m.queueHighestSentTimestamp = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -111,7 +129,7 @@ var (
 		},
 		[]string{remoteName, endpoint},
 	)
-	queuePendingSamples = promauto.NewGaugeVec(
+	m.queuePendingSamples = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -120,7 +138,7 @@ var (
 		},
 		[]string{remoteName, endpoint},
 	)
-	shardCapacity = promauto.NewGaugeVec(
+	m.shardCapacity = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -129,7 +147,7 @@ var (
 		},
 		[]string{remoteName, endpoint},
 	)
-	numShards = promauto.NewGaugeVec(
+	m.numShards = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -138,7 +156,7 @@ var (
 		},
 		[]string{remoteName, endpoint},
 	)
-	maxNumShards = promauto.NewGaugeVec(
+	m.maxNumShards = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -147,7 +165,7 @@ var (
 		},
 		[]string{remoteName, endpoint},
 	)
-	minNumShards = promauto.NewGaugeVec(
+	m.minNumShards = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -156,7 +174,7 @@ var (
 		},
 		[]string{remoteName, endpoint},
 	)
-	desiredNumShards = promauto.NewGaugeVec(
+	m.desiredNumShards = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -165,7 +183,7 @@ var (
 		},
 		[]string{remoteName, endpoint},
 	)
-	bytesSent = promauto.NewCounterVec(
+	m.bytesSent = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -174,7 +192,27 @@ var (
 		},
 		[]string{remoteName, endpoint},
 	)
-)
+
+	if r != nil {
+		r.MustRegister(
+			m.succeededSamplesTotal,
+			m.failedSamplesTotal,
+			m.retriedSamplesTotal,
+			m.droppedSamplesTotal,
+			m.enqueueRetriesTotal,
+			m.sentBatchDuration,
+			m.queueHighestSentTimestamp,
+			m.queuePendingSamples,
+			m.shardCapacity,
+			m.numShards,
+			m.maxNumShards,
+			m.minNumShards,
+			m.desiredNumShards,
+			m.bytesSent,
+		)
+	}
+	return m
+}
 
 // StorageClient defines an interface for sending a batch of samples to an
 // external timeseries database.
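The "if r != nil" guard in newQueueManagerMetrics above means a nil Registerer yields fully usable collectors that are simply never registered; the tests further down rely on exactly that. To inspect what a registry actually holds, the Gather API can be used; a small sketch with hypothetical metric and label names:

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

func main() {
    reg := prometheus.NewRegistry()

    c := prometheus.NewCounterVec(
        prometheus.CounterOpts{Name: "demo_total", Help: "Demo counter."},
        []string{"remote_name", "url"},
    )
    reg.MustRegister(c)
    c.WithLabelValues("q1", "http://localhost:9090/api/v1/write").Inc()

    // Gather returns the metric families currently held by this registry,
    // so a test can assert that collectors were (or were not) registered.
    mfs, err := reg.Gather()
    if err != nil {
        panic(err)
    }
    for _, mf := range mfs {
        fmt.Println(mf.GetName(), "series:", len(mf.GetMetric()))
    }
}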
@@ -215,6 +253,7 @@ type QueueManager struct {
 
 	samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate
 
+	metrics                    *queueManagerMetrics
 	highestSentTimestampMetric *maxGauge
 	pendingSamplesMetric       prometheus.Gauge
 	enqueueRetriesMetric       prometheus.Counter
@@ -232,7 +271,7 @@ type QueueManager struct {
 }
 
 // NewQueueManager builds a new QueueManager.
-func NewQueueManager(reg prometheus.Registerer, logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager {
+func NewQueueManager(reg prometheus.Registerer, metrics *queueManagerMetrics, logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -258,6 +297,8 @@ func NewQueueManager(reg prometheus.Registerer, logger log.Logger, walDir string
 		samplesDropped:     newEWMARate(ewmaWeight, shardUpdateDuration),
 		samplesOut:         newEWMARate(ewmaWeight, shardUpdateDuration),
 		samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
+
+		metrics: metrics,
 	}
 
 	t.watcher = wal.NewWatcher(reg, wal.NewWatcherMetrics(reg), logger, client.Name(), t, walDir)
@@ -320,21 +361,21 @@ func (t *QueueManager) Start() {
 	name := t.client.Name()
 	ep := t.client.Endpoint()
 	t.highestSentTimestampMetric = &maxGauge{
-		Gauge: queueHighestSentTimestamp.WithLabelValues(name, ep),
+		Gauge: t.metrics.queueHighestSentTimestamp.WithLabelValues(name, ep),
 	}
-	t.pendingSamplesMetric = queuePendingSamples.WithLabelValues(name, ep)
-	t.enqueueRetriesMetric = enqueueRetriesTotal.WithLabelValues(name, ep)
-	t.droppedSamplesTotal = droppedSamplesTotal.WithLabelValues(name, ep)
-	t.numShardsMetric = numShards.WithLabelValues(name, ep)
-	t.failedSamplesTotal = failedSamplesTotal.WithLabelValues(name, ep)
-	t.sentBatchDuration = sentBatchDuration.WithLabelValues(name, ep)
-	t.succeededSamplesTotal = succeededSamplesTotal.WithLabelValues(name, ep)
-	t.retriedSamplesTotal = retriedSamplesTotal.WithLabelValues(name, ep)
-	t.shardCapacity = shardCapacity.WithLabelValues(name, ep)
-	t.maxNumShards = maxNumShards.WithLabelValues(name, ep)
-	t.minNumShards = minNumShards.WithLabelValues(name, ep)
-	t.desiredNumShards = desiredNumShards.WithLabelValues(name, ep)
-	t.bytesSent = bytesSent.WithLabelValues(name, ep)
+	t.pendingSamplesMetric = t.metrics.queuePendingSamples.WithLabelValues(name, ep)
+	t.enqueueRetriesMetric = t.metrics.enqueueRetriesTotal.WithLabelValues(name, ep)
+	t.droppedSamplesTotal = t.metrics.droppedSamplesTotal.WithLabelValues(name, ep)
+	t.numShardsMetric = t.metrics.numShards.WithLabelValues(name, ep)
+	t.failedSamplesTotal = t.metrics.failedSamplesTotal.WithLabelValues(name, ep)
+	t.sentBatchDuration = t.metrics.sentBatchDuration.WithLabelValues(name, ep)
+	t.succeededSamplesTotal = t.metrics.succeededSamplesTotal.WithLabelValues(name, ep)
+	t.retriedSamplesTotal = t.metrics.retriedSamplesTotal.WithLabelValues(name, ep)
+	t.shardCapacity = t.metrics.shardCapacity.WithLabelValues(name, ep)
+	t.maxNumShards = t.metrics.maxNumShards.WithLabelValues(name, ep)
+	t.minNumShards = t.metrics.minNumShards.WithLabelValues(name, ep)
+	t.desiredNumShards = t.metrics.desiredNumShards.WithLabelValues(name, ep)
+	t.bytesSent = t.metrics.bytesSent.WithLabelValues(name, ep)
 
 	// Initialise some metrics.
 	t.shardCapacity.Set(float64(t.cfg.Capacity))
@@ -374,19 +415,19 @@ func (t *QueueManager) Stop() {
 	// Delete metrics so we don't have alerts for queues that are gone.
 	name := t.client.Name()
 	ep := t.client.Endpoint()
-	queueHighestSentTimestamp.DeleteLabelValues(name, ep)
-	queuePendingSamples.DeleteLabelValues(name, ep)
-	enqueueRetriesTotal.DeleteLabelValues(name, ep)
-	droppedSamplesTotal.DeleteLabelValues(name, ep)
-	numShards.DeleteLabelValues(name, ep)
-	failedSamplesTotal.DeleteLabelValues(name, ep)
-	sentBatchDuration.DeleteLabelValues(name, ep)
-	succeededSamplesTotal.DeleteLabelValues(name, ep)
-	retriedSamplesTotal.DeleteLabelValues(name, ep)
-	shardCapacity.DeleteLabelValues(name, ep)
-	maxNumShards.DeleteLabelValues(name, ep)
-	minNumShards.DeleteLabelValues(name, ep)
-	desiredNumShards.DeleteLabelValues(name, ep)
+	t.metrics.queueHighestSentTimestamp.DeleteLabelValues(name, ep)
+	t.metrics.queuePendingSamples.DeleteLabelValues(name, ep)
+	t.metrics.enqueueRetriesTotal.DeleteLabelValues(name, ep)
+	t.metrics.droppedSamplesTotal.DeleteLabelValues(name, ep)
+	t.metrics.numShards.DeleteLabelValues(name, ep)
+	t.metrics.failedSamplesTotal.DeleteLabelValues(name, ep)
+	t.metrics.sentBatchDuration.DeleteLabelValues(name, ep)
+	t.metrics.succeededSamplesTotal.DeleteLabelValues(name, ep)
+	t.metrics.retriedSamplesTotal.DeleteLabelValues(name, ep)
+	t.metrics.shardCapacity.DeleteLabelValues(name, ep)
+	t.metrics.maxNumShards.DeleteLabelValues(name, ep)
+	t.metrics.minNumShards.DeleteLabelValues(name, ep)
+	t.metrics.desiredNumShards.DeleteLabelValues(name, ep)
 }
 
 // StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
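Two lifecycle details are worth noting in the hunks above: Start() materializes each queue's child series once via WithLabelValues, and Stop() deletes them via DeleteLabelValues so a removed remote stops exporting stale series (and firing stale alerts). A small sketch of that lifecycle, with a hypothetical gauge name:

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

func main() {
    pending := prometheus.NewGaugeVec(
        prometheus.GaugeOpts{Name: "pending_samples", Help: "Pending samples per queue."},
        []string{"remote_name", "url"},
    )

    // Start(): create the per-queue child up front so the series exists
    // (and reads zero) even before the first sample is sent.
    g := pending.WithLabelValues("q1", "http://localhost:9090/api/v1/write")
    g.Set(42)

    // Stop(): drop the child so a deleted queue's series disappears from scrapes.
    deleted := pending.DeleteLabelValues("q1", "http://localhost:9090/api/v1/write")
    fmt.Println("child deleted:", deleted) // prints: child deleted: true
}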
storage/remote/queue_manager_test.go

@@ -60,7 +60,8 @@ func TestSampleDelivery(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	m := NewQueueManager(nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
+	metrics := newQueueManagerMetrics(nil)
+	m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
 	m.StoreSeries(series, 0)
 
 	// These should be received by the client.
@@ -88,7 +89,8 @@ func TestSampleDeliveryTimeout(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	m := NewQueueManager(nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
+	metrics := newQueueManagerMetrics(nil)
+	m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
 	m.StoreSeries(series, 0)
 	m.Start()
 	defer m.Stop()
@@ -128,7 +130,8 @@ func TestSampleDeliveryOrder(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	m := NewQueueManager(nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
+	metrics := newQueueManagerMetrics(nil)
+	m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
 	m.StoreSeries(series, 0)
 
 	m.Start()
@@ -146,7 +149,8 @@ func TestShutdown(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	m := NewQueueManager(nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
+	metrics := newQueueManagerMetrics(nil)
+	m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
 	samples, series := createTimeseries(2 * config.DefaultQueueConfig.MaxSamplesPerSend)
 	m.StoreSeries(series, 0)
 	m.Start()
@@ -182,7 +186,8 @@ func TestSeriesReset(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	m := NewQueueManager(nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
+	metrics := newQueueManagerMetrics(nil)
+	m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
 	for i := 0; i < numSegments; i++ {
 		series := []record.RefSeries{}
 		for j := 0; j < numSeries; j++ {
@@ -210,7 +215,8 @@ func TestReshard(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	m := NewQueueManager(nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
+	metrics := newQueueManagerMetrics(nil)
+	m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
 	m.StoreSeries(series, 0)
 
 	m.Start()
@@ -242,7 +248,8 @@ func TestReshardRaceWithStop(t *testing.T) {
 
 	go func() {
 		for {
-			m = NewQueueManager(nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
+			metrics := newQueueManagerMetrics(nil)
+			m = NewQueueManager(nil, metrics, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
 			m.Start()
 			h.Unlock()
 			h.Lock()
@@ -258,8 +265,9 @@ func TestReshardRaceWithStop(t *testing.T) {
 }
 
 func TestReleaseNoninternedString(t *testing.T) {
+	metrics := newQueueManagerMetrics(nil)
 	c := NewTestStorageClient()
-	m := NewQueueManager(nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
+	m := NewQueueManager(nil, metrics, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
 	m.Start()
 
 	for i := 1; i < 1000; i++ {
@@ -304,8 +312,9 @@ func TestCalculateDesiredsShards(t *testing.T) {
 		},
 	}
 	for _, c := range cases {
+		metrics := newQueueManagerMetrics(nil)
 		client := NewTestStorageClient()
-		m := NewQueueManager(nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline)
+		m := NewQueueManager(nil, metrics, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline)
 		m.numShards = c.startingShards
 		m.samplesIn.incr(c.samplesIn)
 		m.samplesOut.incr(c.samplesOut)
@@ -496,7 +505,8 @@ func BenchmarkSampleDelivery(b *testing.B) {
 	testutil.Ok(b, err)
 	defer os.RemoveAll(dir)
 
-	m := NewQueueManager(nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
+	metrics := newQueueManagerMetrics(nil)
+	m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
 	m.StoreSeries(series, 0)
 
 	// These should be received by the client.
@@ -536,8 +546,9 @@ func BenchmarkStartup(b *testing.B) {
 	logger = log.With(logger, "caller", log.DefaultCaller)
 
 	for n := 0; n < b.N; n++ {
+		metrics := newQueueManagerMetrics(nil)
 		c := NewTestBlockedStorageClient()
-		m := NewQueueManager(nil, logger, dir,
+		m := NewQueueManager(nil, metrics, logger, dir,
 			newEWMARate(ewmaWeight, shardUpdateDuration),
 			config.DefaultQueueConfig, nil, nil, c, 1*time.Minute)
 		m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
@@ -586,8 +597,9 @@ func TestCalculateDesiredShards(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
+	metrics := newQueueManagerMetrics(nil)
 	samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
-	m := NewQueueManager(nil, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline)
+	m := NewQueueManager(nil, metrics, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline)
 
 	// Need to start the queue manager so the proper metrics are initialized.
 	// However we can stop it right away since we don't need to do any actual
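Every test call site above passes a nil registerer to newQueueManagerMetrics, so constructing queue managers in a loop cannot panic on duplicate registration. A test that wants to assert on metric values could instead pass a fresh registry and read values back; a hedged sketch using client_golang's testutil helper (not something these tests do):

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
    reg := prometheus.NewRegistry()
    sent := prometheus.NewCounterVec(
        prometheus.CounterOpts{Name: "sent_samples_total", Help: "Sent samples."},
        []string{"remote_name", "url"},
    )
    reg.MustRegister(sent)

    sent.WithLabelValues("q1", "http://localhost:9090/api/v1/write").Add(3)

    // testutil.ToFloat64 reads the current value of a single metric, which
    // makes asserting on side effects straightforward in unit tests.
    fmt.Println(testutil.ToFloat64(sent.WithLabelValues("q1", "http://localhost:9090/api/v1/write")))
}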
storage/remote/read_test.go

@@ -22,7 +22,6 @@ import (
 	"sort"
 	"testing"
 
-	"github.com/prometheus/client_golang/prometheus"
 	config_util "github.com/prometheus/common/config"
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/pkg/labels"
@@ -93,7 +92,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {
 	}
 
 	for _, tc := range cases {
-		s := NewStorage(nil, prometheus.DefaultRegisterer, nil, dir, defaultFlushDeadline)
+		s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline)
 		conf := &config.Config{
 			GlobalConfig:      config.DefaultGlobalConfig,
 			RemoteReadConfigs: tc.cfgs,
storage/remote/storage.go

@@ -65,7 +65,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
 		logger:                 logging.Dedupe(l, 1*time.Minute),
 		localStartTimeCallback: stCallback,
 	}
-	s.rws = NewWriteStorage(s.logger, walDir, flushDeadline)
+	s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline)
 	return s
 }
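With this one-line change, the Registerer handed to NewStorage flows through to the write path instead of the write path hardcoding prometheus.DefaultRegisterer. A hedged usage sketch based on the signature in the hunk header above; the nil logger and start-time callback, the WAL directory, and the flush deadline are placeholder values:

package main

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/prometheus/storage/remote"
)

func main() {
    // Queue-manager metrics created under this Storage register against reg,
    // not against the process-global default registry.
    reg := prometheus.NewRegistry()
    s := remote.NewStorage(nil, reg, nil, "data/wal", 1*time.Minute)
    defer s.Close()
}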
storage/remote/storage_test.go

@@ -19,7 +19,6 @@ import (
 	"os"
 	"testing"
 
-	"github.com/prometheus/client_golang/prometheus"
 	common_config "github.com/prometheus/common/config"
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/util/testutil"
@@ -30,7 +29,7 @@ func TestStorageLifecycle(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	s := NewStorage(nil, prometheus.DefaultRegisterer, nil, dir, defaultFlushDeadline)
+	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline)
 	conf := &config.Config{
 		GlobalConfig: config.DefaultGlobalConfig,
 		RemoteWriteConfigs: []*config.RemoteWriteConfig{
@@ -69,7 +68,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	s := NewStorage(nil, prometheus.DefaultRegisterer, nil, dir, defaultFlushDeadline)
+	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline)
 
 	conf := &config.Config{
 		GlobalConfig: config.GlobalConfig{},
storage/remote/write.go

@@ -46,9 +46,11 @@ var (
 
 // WriteStorage represents all the remote write storage.
 type WriteStorage struct {
+	reg    prometheus.Registerer
 	logger log.Logger
 	mtx    sync.Mutex
 
+	queueMetrics      *queueManagerMetrics
 	configHash        string
 	externalLabelHash string
 	walDir            string
@@ -58,12 +60,14 @@ type WriteStorage struct {
 }
 
 // NewWriteStorage creates and runs a WriteStorage.
-func NewWriteStorage(logger log.Logger, walDir string, flushDeadline time.Duration) *WriteStorage {
+func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string, flushDeadline time.Duration) *WriteStorage {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
 	rws := &WriteStorage{
 		queues:        make(map[string]*QueueManager),
+		reg:           reg,
+		queueMetrics:  newQueueManagerMetrics(reg),
 		logger:        logger,
 		flushDeadline: flushDeadline,
 		samplesIn:     newEWMARate(ewmaWeight, shardUpdateDuration),
@@ -148,7 +152,8 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
 			return err
 		}
 		newQueues[hash] = NewQueueManager(
-			prometheus.DefaultRegisterer,
+			rws.reg,
+			rws.queueMetrics,
 			rws.logger,
 			rws.walDir,
 			rws.samplesIn,
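WriteStorage now builds a single queueManagerMetrics up front and hands it to every QueueManager it creates, so per-remote separation comes from the (remote name, endpoint URL) label pair on shared collectors rather than from per-queue registration. A sketch of that fan-out, with hypothetical metric and queue names:

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

func main() {
    reg := prometheus.NewRegistry()

    // One shared CounterVec stands in for the shared queueManagerMetrics:
    // registered once, then fanned out per remote via label values.
    sent := prometheus.NewCounterVec(
        prometheus.CounterOpts{Name: "queue_sent_total", Help: "Per-queue sends."},
        []string{"remote_name", "url"},
    )
    reg.MustRegister(sent)

    queues := []struct{ name, url string }{
        {"primary", "http://a.example/api/v1/write"},
        {"backup", "http://b.example/api/v1/write"},
    }
    for _, q := range queues {
        sent.WithLabelValues(q.name, q.url).Inc()
    }

    mfs, _ := reg.Gather()
    for _, mf := range mfs {
        fmt.Println(mf.GetName(), "series:", len(mf.GetMetric())) // one family, two series
    }
}

This also means rebuilding a queue in ApplyConfig reuses already-registered collectors instead of registering new ones, avoiding duplicate-registration errors on reload.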
storage/remote/write_test.go

@@ -111,7 +111,7 @@ func TestNoDuplicateWriteConfigs(t *testing.T) {
 	}
 
 	for _, tc := range cases {
-		s := NewWriteStorage(nil, dir, time.Millisecond)
+		s := NewWriteStorage(nil, nil, dir, time.Millisecond)
 		conf := &config.Config{
 			GlobalConfig:       config.DefaultGlobalConfig,
 			RemoteWriteConfigs: tc.cfgs,
@@ -133,7 +133,7 @@ func TestRestartOnNameChange(t *testing.T) {
 	hash, err := toHash(cfg)
 	testutil.Ok(t, err)
 
-	s := NewWriteStorage(nil, dir, time.Millisecond)
+	s := NewWriteStorage(nil, nil, dir, time.Millisecond)
 	conf := &config.Config{
 		GlobalConfig: config.DefaultGlobalConfig,
 		RemoteWriteConfigs: []*config.RemoteWriteConfig{
@@ -159,7 +159,7 @@ func TestWriteStorageLifecycle(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	s := NewWriteStorage(nil, dir, defaultFlushDeadline)
+	s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline)
 	conf := &config.Config{
 		GlobalConfig: config.DefaultGlobalConfig,
 		RemoteWriteConfigs: []*config.RemoteWriteConfig{
@@ -178,7 +178,7 @@ func TestUpdateExternalLabels(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	s := NewWriteStorage(nil, dir, time.Second)
+	s := NewWriteStorage(nil, nil, dir, time.Second)
 
 	externalLabels := labels.FromStrings("external", "true")
 	conf := &config.Config{
@@ -209,7 +209,7 @@ func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	s := NewWriteStorage(nil, dir, defaultFlushDeadline)
+	s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline)
 
 	conf := &config.Config{
 		GlobalConfig: config.GlobalConfig{},
@@ -243,7 +243,7 @@ func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) {
 	testutil.Ok(t, err)
 	defer os.RemoveAll(dir)
 
-	s := NewWriteStorage(nil, dir, defaultFlushDeadline)
+	s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline)
 
 	c0 := &config.RemoteWriteConfig{
 		RemoteTimeout: model.Duration(10 * time.Second),