diff --git a/config/config.go b/config/config.go index 89e2a6d2e..8cb0e5e44 100644 --- a/config/config.go +++ b/config/config.go @@ -651,8 +651,9 @@ type QueueConfig struct { BatchSendDeadline model.Duration `yaml:"batch_send_deadline,omitempty"` // On recoverable errors, backoff exponentially. - MinBackoff model.Duration `yaml:"min_backoff,omitempty"` - MaxBackoff model.Duration `yaml:"max_backoff,omitempty"` + MinBackoff model.Duration `yaml:"min_backoff,omitempty"` + MaxBackoff model.Duration `yaml:"max_backoff,omitempty"` + RetryOnRateLimit bool `yaml:"retry_on_http_429,omitempty"` } // MetadataConfig is the configuration for sending metadata to remote diff --git a/config/config_test.go b/config/config_test.go index 2602fc56a..bca8c98a3 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -734,6 +734,20 @@ func TestYAMLRoundtrip(t *testing.T) { require.Equal(t, want, got) } +func TestRemoteWriteRetryOnRateLimit(t *testing.T) { + want, err := LoadFile("testdata/remote_write_retry_on_rate_limit.good.yml") + require.NoError(t, err) + + out, err := yaml.Marshal(want) + + require.NoError(t, err) + got := &Config{} + require.NoError(t, yaml.UnmarshalStrict(out, got)) + + require.Equal(t, true, got.RemoteWriteConfigs[0].QueueConfig.RetryOnRateLimit) + require.Equal(t, false, got.RemoteWriteConfigs[1].QueueConfig.RetryOnRateLimit) +} + func TestLoadConfig(t *testing.T) { // Parse a valid file that sets a global scrape timeout. This tests whether parsing // an overwritten default field in the global config permanently changes the default. diff --git a/config/testdata/remote_write_retry_on_rate_limit.good.yml b/config/testdata/remote_write_retry_on_rate_limit.good.yml new file mode 100644 index 000000000..757ec1e68 --- /dev/null +++ b/config/testdata/remote_write_retry_on_rate_limit.good.yml @@ -0,0 +1,5 @@ +remote_write: + - url: localhost:9090 + queue_config: + retry_on_http_429: true + - url: localhost:9201 diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 306b00cec..015af31a1 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -1775,6 +1775,9 @@ queue_config: [ min_backoff: | default = 30ms ] # Maximum retry delay. [ max_backoff: | default = 100ms ] + # Retry upon receiving a 429 status code from the remote-write storage. + # This is experimental and might change in the future. + [ retry_on_http_429: | default = false ] # Configures the sending of series metadata to remote storage. # Metadata configuration is subject to change at any point diff --git a/storage/remote/client.go b/storage/remote/client.go index e122dcbf4..dae873d86 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -85,6 +85,8 @@ type Client struct { timeout time.Duration headers map[string]string + retryOnRateLimit bool + readQueries prometheus.Gauge readQueriesTotal *prometheus.CounterVec readQueriesDuration prometheus.Observer @@ -96,6 +98,7 @@ type ClientConfig struct { Timeout model.Duration HTTPClientConfig config_util.HTTPClientConfig Headers map[string]string + RetryOnRateLimit bool } // ReadClient uses the SAMPLES method of remote read to read series samples from remote server. @@ -140,11 +143,12 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) { } return &Client{ - remoteName: name, - url: conf.URL, - Client: httpClient, - timeout: time.Duration(conf.Timeout), - headers: conf.Headers, + remoteName: name, + url: conf.URL, + Client: httpClient, + retryOnRateLimit: conf.RetryOnRateLimit, + timeout: time.Duration(conf.Timeout), + headers: conf.Headers, }, nil } @@ -209,7 +213,7 @@ func (c *Client) Store(ctx context.Context, req []byte) error { if httpResp.StatusCode/100 == 5 { return RecoverableError{err, defaultBackoff} } - if httpResp.StatusCode == http.StatusTooManyRequests { + if c.retryOnRateLimit && httpResp.StatusCode == http.StatusTooManyRequests { return RecoverableError{err, retryAfterDuration(httpResp.Header.Get("Retry-After"))} } return err diff --git a/storage/remote/client_test.go b/storage/remote/client_test.go index 82067d3af..c5ae6b87c 100644 --- a/storage/remote/client_test.go +++ b/storage/remote/client_test.go @@ -84,6 +84,50 @@ func TestStoreHTTPErrorHandling(t *testing.T) { } } +func TestClientRetryAfter(t *testing.T) { + server := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, longErrMessage, 429) + }), + ) + defer server.Close() + + getClient := func(conf *ClientConfig) WriteClient { + hash, err := toHash(conf) + require.NoError(t, err) + c, err := NewWriteClient(hash, conf) + require.NoError(t, err) + return c + } + + serverURL, err := url.Parse(server.URL) + require.NoError(t, err) + + conf := &ClientConfig{ + URL: &config_util.URL{URL: serverURL}, + Timeout: model.Duration(time.Second), + RetryOnRateLimit: false, + } + + c := getClient(conf) + err = c.Store(context.Background(), []byte{}) + if _, ok := err.(RecoverableError); ok { + t.Fatal("recoverable error not expected") + } + + conf = &ClientConfig{ + URL: &config_util.URL{URL: serverURL}, + Timeout: model.Duration(time.Second), + RetryOnRateLimit: true, + } + + c = getClient(conf) + err = c.Store(context.Background(), []byte{}) + if _, ok := err.(RecoverableError); !ok { + t.Fatal("recoverable error was expected") + } +} + func TestRetryAfterDuration(t *testing.T) { tc := []struct { name string diff --git a/storage/remote/write.go b/storage/remote/write.go index 4929b439c..76c2a650c 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -135,6 +135,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { Timeout: rwConf.RemoteTimeout, HTTPClientConfig: rwConf.HTTPClientConfig, Headers: rwConf.Headers, + RetryOnRateLimit: rwConf.QueueConfig.RetryOnRateLimit, }) if err != nil { return err