Port 'Make queue manager configurable.' to 2.0, see #2991

pull/3336/head
Tom Wilkie 7 years ago
parent 3760f56c0c
commit 8fe0212ff7

@ -124,42 +124,6 @@ func init() {
prometheus.MustRegister(numShards)
}
// QueueManagerConfig is the configuration for the queue used to write to remote
// storage.
type QueueManagerConfig struct {
// Number of samples to buffer per shard before we start dropping them.
QueueCapacity int
// Max number of shards, i.e. amount of concurrency.
MaxShards int
// Maximum number of samples per send.
MaxSamplesPerSend int
// Maximum time sample will wait in buffer.
BatchSendDeadline time.Duration
// Max number of times to retry a batch on recoverable errors.
MaxRetries int
// On recoverable errors, backoff exponentially.
MinBackoff time.Duration
MaxBackoff time.Duration
}
// defaultQueueManagerConfig is the default remote queue configuration.
var defaultQueueManagerConfig = QueueManagerConfig{
// With a maximum of 1000 shards, assuming an average of 100ms remote write
// time and 100 samples per batch, we will be able to push 1M samples/s.
MaxShards: 1000,
MaxSamplesPerSend: 100,
// By default, buffer 1000 batches, which at 100ms per batch is 1:40mins. At
// 1000 shards, this will buffer 100M samples total.
QueueCapacity: 100 * 1000,
BatchSendDeadline: 5 * time.Second,
// Max number of times to retry a batch on recoverable errors.
MaxRetries: 10,
MinBackoff: 30 * time.Millisecond,
MaxBackoff: 100 * time.Millisecond,
}
// StorageClient defines an interface for sending a batch of samples to an
// external timeseries database.
type StorageClient interface {
@ -174,7 +138,7 @@ type StorageClient interface {
type QueueManager struct {
logger log.Logger
cfg QueueManagerConfig
cfg config.QueueConfig
externalLabels model.LabelSet
relabelConfigs []*config.RelabelConfig
client StorageClient
@ -193,7 +157,7 @@ type QueueManager struct {
}
// NewQueueManager builds a new QueueManager.
func NewQueueManager(logger log.Logger, cfg QueueManagerConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager {
func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager {
if logger == nil {
logger = log.NewNopLogger()
}
@ -216,7 +180,7 @@ func NewQueueManager(logger log.Logger, cfg QueueManagerConfig, externalLabels m
}
t.shards = t.newShards(t.numShards)
numShards.WithLabelValues(t.queueName).Set(float64(t.numShards))
queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity))
queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.Capacity))
return t
}
@ -408,7 +372,7 @@ type shards struct {
func (t *QueueManager) newShards(numShards int) *shards {
queues := make([]chan *model.Sample, numShards)
for i := 0; i < numShards; i++ {
queues[i] = make(chan *model.Sample, t.cfg.QueueCapacity)
queues[i] = make(chan *model.Sample, t.cfg.Capacity)
}
s := &shards{
qm: t,

@ -21,6 +21,7 @@ import (
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
)
type TestStorageClient struct {
@ -81,7 +82,7 @@ func (c *TestStorageClient) Name() string {
func TestSampleDelivery(t *testing.T) {
// Let's create an even number of send batches so we don't run into the
// batch timeout case.
n := defaultQueueManagerConfig.QueueCapacity * 2
n := config.DefaultQueueConfig.Capacity * 2
samples := make(model.Samples, 0, n)
for i := 0; i < n; i++ {
@ -97,7 +98,7 @@ func TestSampleDelivery(t *testing.T) {
c := NewTestStorageClient()
c.expectSamples(samples[:len(samples)/2])
cfg := defaultQueueManagerConfig
cfg := config.DefaultQueueConfig
cfg.MaxShards = 1
m := NewQueueManager(nil, cfg, nil, nil, c)
@ -117,7 +118,7 @@ func TestSampleDelivery(t *testing.T) {
func TestSampleDeliveryOrder(t *testing.T) {
ts := 10
n := defaultQueueManagerConfig.MaxSamplesPerSend * ts
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
samples := make(model.Samples, 0, n)
for i := 0; i < n; i++ {
@ -133,7 +134,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
c := NewTestStorageClient()
c.expectSamples(samples)
m := NewQueueManager(nil, defaultQueueManagerConfig, nil, nil, c)
m := NewQueueManager(nil, config.DefaultQueueConfig, nil, nil, c)
// These should be received by the client.
for _, s := range samples {
@ -194,7 +195,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
// `MaxSamplesPerSend*Shards` samples should be consumed by the
// per-shard goroutines, and then another `MaxSamplesPerSend`
// should be left on the queue.
n := defaultQueueManagerConfig.MaxSamplesPerSend * 2
n := config.DefaultQueueConfig.MaxSamplesPerSend * 2
samples := make(model.Samples, 0, n)
for i := 0; i < n; i++ {
@ -208,9 +209,9 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
}
c := NewTestBlockedStorageClient()
cfg := defaultQueueManagerConfig
cfg := config.DefaultQueueConfig
cfg.MaxShards = 1
cfg.QueueCapacity = n
cfg.Capacity = n
m := NewQueueManager(nil, cfg, nil, nil, c)
m.Start()
@ -240,7 +241,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
time.Sleep(10 * time.Millisecond)
}
if m.queueLen() != defaultQueueManagerConfig.MaxSamplesPerSend {
if m.queueLen() != config.DefaultQueueConfig.MaxSamplesPerSend {
t.Fatalf("Failed to drain QueueManager queue, %d elements left",
m.queueLen(),
)

@ -68,7 +68,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
}
newQueues = append(newQueues, NewQueueManager(
s.logger,
defaultQueueManagerConfig,
config.DefaultQueueConfig,
conf.GlobalConfig.ExternalLabels,
rwConf.WriteRelabelConfigs,
c,

Loading…
Cancel
Save