From 5169f990f9d8e2c09219d1d1d5b1ba28e5d30451 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 1 Aug 2017 11:00:40 +0100 Subject: [PATCH] Review feedback: add yaml struct tags, don't embed queue config. Also, rename QueueManageConfig to QueueConfig, for consistency with tags. --- config/config.go | 32 ++++++++++++++-------------- config/config_test.go | 8 +++---- storage/remote/queue_manager.go | 8 +++---- storage/remote/queue_manager_test.go | 16 +++++++------- storage/remote/write.go | 2 +- 5 files changed, 33 insertions(+), 33 deletions(-) diff --git a/config/config.go b/config/config.go index 2f3e13cf2..f757c5c24 100644 --- a/config/config.go +++ b/config/config.go @@ -172,12 +172,12 @@ var ( // DefaultRemoteWriteConfig is the default remote write configuration. DefaultRemoteWriteConfig = RemoteWriteConfig{ - RemoteTimeout: model.Duration(30 * time.Second), - QueueManagerConfig: DefaultQueueManagerConfig, + RemoteTimeout: model.Duration(30 * time.Second), + QueueConfig: DefaultQueueConfig, } - // DefaultQueueManagerConfig is the default remote queue configuration. - DefaultQueueManagerConfig = QueueManagerConfig{ + // DefaultQueueConfig is the default remote queue configuration. + DefaultQueueConfig = QueueConfig{ // With a maximum of 1000 shards, assuming an average of 100ms remote write // time and 100 samples per batch, we will be able to push 1M samples/s. MaxShards: 1000, @@ -185,7 +185,7 @@ var ( // By default, buffer 1000 batches, which at 100ms per batch is 1:40mins. At // 1000 shards, this will buffer 100M samples total. - QueueCapacity: 100 * 1000, + Capacity: 100 * 1000, BatchSendDeadline: 5 * time.Second, // Max number of times to retry a batch on recoverable errors. @@ -1410,8 +1410,8 @@ type RemoteWriteConfig struct { // We cannot do proper Go type embedding below as the parser will then parse // values arbitrarily into the overflow maps of further-down types. - HTTPClientConfig HTTPClientConfig `yaml:",inline"` - QueueManagerConfig QueueManagerConfig `yaml:",inline"` + HTTPClientConfig HTTPClientConfig `yaml:",inline"` + QueueConfig QueueConfig `yaml:"queue_config,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` @@ -1438,27 +1438,27 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err return nil } -// QueueManagerConfig is the configuration for the queue used to write to remote +// QueueConfig is the configuration for the queue used to write to remote // storage. -type QueueManagerConfig struct { +type QueueConfig struct { // Number of samples to buffer per shard before we start dropping them. - QueueCapacity int + Capacity int `yaml:"capacity,omitempty"` // Max number of shards, i.e. amount of concurrency. - MaxShards int + MaxShards int `yaml:"max_shards,omitempty"` // Maximum number of samples per send. - MaxSamplesPerSend int + MaxSamplesPerSend int `yaml:"max_samples_per_send,omitempty"` // Maximum time sample will wait in buffer. - BatchSendDeadline time.Duration + BatchSendDeadline time.Duration `yaml:"batch_send_deadline,omitempty"` // Max number of times to retry a batch on recoverable errors. - MaxRetries int + MaxRetries int `yaml:"max_retries,omitempty"` // On recoverable errors, backoff exponentially. - MinBackoff time.Duration - MaxBackoff time.Duration + MinBackoff time.Duration `yaml:"min_backoff,omitempty"` + MaxBackoff time.Duration `yaml:"max_backoff,omitempty"` } // RemoteReadConfig is the configuration for reading from remote storage. diff --git a/config/config_test.go b/config/config_test.go index d80edc73c..8e4590b3f 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -66,12 +66,12 @@ var expectedConf = &Config{ Action: RelabelDrop, }, }, - QueueManagerConfig: DefaultQueueManagerConfig, + QueueConfig: DefaultQueueConfig, }, { - URL: mustParseURL("http://remote2/push"), - RemoteTimeout: model.Duration(30 * time.Second), - QueueManagerConfig: DefaultQueueManagerConfig, + URL: mustParseURL("http://remote2/push"), + RemoteTimeout: model.Duration(30 * time.Second), + QueueConfig: DefaultQueueConfig, }, }, diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 3450f2eab..f2bc85571 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -135,7 +135,7 @@ type StorageClient interface { // QueueManager manages a queue of samples to be sent to the Storage // indicated by the provided StorageClient. type QueueManager struct { - cfg config.QueueManagerConfig + cfg config.QueueConfig externalLabels model.LabelSet relabelConfigs []*config.RelabelConfig client StorageClient @@ -154,7 +154,7 @@ type QueueManager struct { } // NewQueueManager builds a new QueueManager. -func NewQueueManager(cfg config.QueueManagerConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager { +func NewQueueManager(cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager { t := &QueueManager{ cfg: cfg, externalLabels: externalLabels, @@ -173,7 +173,7 @@ func NewQueueManager(cfg config.QueueManagerConfig, externalLabels model.LabelSe } t.shards = t.newShards(t.numShards) numShards.WithLabelValues(t.queueName).Set(float64(t.numShards)) - queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity)) + queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.Capacity)) return t } @@ -359,7 +359,7 @@ type shards struct { func (t *QueueManager) newShards(numShards int) *shards { queues := make([]chan *model.Sample, numShards) for i := 0; i < numShards; i++ { - queues[i] = make(chan *model.Sample, t.cfg.QueueCapacity) + queues[i] = make(chan *model.Sample, t.cfg.Capacity) } s := &shards{ qm: t, diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 73f5bc1cf..4c169cbb9 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -82,7 +82,7 @@ func (c *TestStorageClient) Name() string { func TestSampleDelivery(t *testing.T) { // Let's create an even number of send batches so we don't run into the // batch timeout case. - n := config.DefaultQueueManagerConfig.QueueCapacity * 2 + n := config.DefaultQueueConfig.Capacity * 2 samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -98,7 +98,7 @@ func TestSampleDelivery(t *testing.T) { c := NewTestStorageClient() c.expectSamples(samples[:len(samples)/2]) - cfg := config.DefaultQueueManagerConfig + cfg := config.DefaultQueueConfig cfg.MaxShards = 1 m := NewQueueManager(cfg, nil, nil, c) @@ -118,7 +118,7 @@ func TestSampleDelivery(t *testing.T) { func TestSampleDeliveryOrder(t *testing.T) { ts := 10 - n := config.DefaultQueueManagerConfig.MaxSamplesPerSend * ts + n := config.DefaultQueueConfig.MaxSamplesPerSend * ts samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -134,7 +134,7 @@ func TestSampleDeliveryOrder(t *testing.T) { c := NewTestStorageClient() c.expectSamples(samples) - m := NewQueueManager(config.DefaultQueueManagerConfig, nil, nil, c) + m := NewQueueManager(config.DefaultQueueConfig, nil, nil, c) // These should be received by the client. for _, s := range samples { @@ -195,7 +195,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { // `MaxSamplesPerSend*Shards` samples should be consumed by the // per-shard goroutines, and then another `MaxSamplesPerSend` // should be left on the queue. - n := config.DefaultQueueManagerConfig.MaxSamplesPerSend * 2 + n := config.DefaultQueueConfig.MaxSamplesPerSend * 2 samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -209,9 +209,9 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { } c := NewTestBlockedStorageClient() - cfg := config.DefaultQueueManagerConfig + cfg := config.DefaultQueueConfig cfg.MaxShards = 1 - cfg.QueueCapacity = n + cfg.Capacity = n m := NewQueueManager(cfg, nil, nil, c) m.Start() @@ -241,7 +241,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { time.Sleep(10 * time.Millisecond) } - if m.queueLen() != config.DefaultQueueManagerConfig.MaxSamplesPerSend { + if m.queueLen() != config.DefaultQueueConfig.MaxSamplesPerSend { t.Fatalf("Failed to drain QueueManager queue, %d elements left", m.queueLen(), ) diff --git a/storage/remote/write.go b/storage/remote/write.go index 8960e2cbb..ddfa9489a 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -45,7 +45,7 @@ func (w *Writer) ApplyConfig(conf *config.Config) error { return err } newQueues = append(newQueues, NewQueueManager( - rwConf.QueueManagerConfig, + rwConf.QueueConfig, conf.GlobalConfig.ExternalLabels, rwConf.WriteRelabelConfigs, c,