mirror of https://github.com/prometheus/prometheus
Adds support to configure retry on Rate-Limiting from remote-write config.
Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>pull/8477/head
parent
cdc71a1c2a
commit
77c20fd2f8
|
@ -651,8 +651,9 @@ type QueueConfig struct {
|
||||||
BatchSendDeadline model.Duration `yaml:"batch_send_deadline,omitempty"`
|
BatchSendDeadline model.Duration `yaml:"batch_send_deadline,omitempty"`
|
||||||
|
|
||||||
// On recoverable errors, backoff exponentially.
|
// On recoverable errors, backoff exponentially.
|
||||||
MinBackoff model.Duration `yaml:"min_backoff,omitempty"`
|
MinBackoff model.Duration `yaml:"min_backoff,omitempty"`
|
||||||
MaxBackoff model.Duration `yaml:"max_backoff,omitempty"`
|
MaxBackoff model.Duration `yaml:"max_backoff,omitempty"`
|
||||||
|
RetryOnRateLimit bool `yaml:"retry_on_http_429,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// MetadataConfig is the configuration for sending metadata to remote
|
// MetadataConfig is the configuration for sending metadata to remote
|
||||||
|
|
|
@ -734,6 +734,20 @@ func TestYAMLRoundtrip(t *testing.T) {
|
||||||
require.Equal(t, want, got)
|
require.Equal(t, want, got)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRemoteWriteRetryOnRateLimit(t *testing.T) {
|
||||||
|
want, err := LoadFile("testdata/remote_write_retry_on_rate_limit.good.yml")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
out, err := yaml.Marshal(want)
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
got := &Config{}
|
||||||
|
require.NoError(t, yaml.UnmarshalStrict(out, got))
|
||||||
|
|
||||||
|
require.Equal(t, true, got.RemoteWriteConfigs[0].QueueConfig.RetryOnRateLimit)
|
||||||
|
require.Equal(t, false, got.RemoteWriteConfigs[1].QueueConfig.RetryOnRateLimit)
|
||||||
|
}
|
||||||
|
|
||||||
func TestLoadConfig(t *testing.T) {
|
func TestLoadConfig(t *testing.T) {
|
||||||
// Parse a valid file that sets a global scrape timeout. This tests whether parsing
|
// Parse a valid file that sets a global scrape timeout. This tests whether parsing
|
||||||
// an overwritten default field in the global config permanently changes the default.
|
// an overwritten default field in the global config permanently changes the default.
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
remote_write:
|
||||||
|
- url: localhost:9090
|
||||||
|
queue_config:
|
||||||
|
retry_on_http_429: true
|
||||||
|
- url: localhost:9201
|
|
@ -1775,6 +1775,9 @@ queue_config:
|
||||||
[ min_backoff: <duration> | default = 30ms ]
|
[ min_backoff: <duration> | default = 30ms ]
|
||||||
# Maximum retry delay.
|
# Maximum retry delay.
|
||||||
[ max_backoff: <duration> | default = 100ms ]
|
[ max_backoff: <duration> | default = 100ms ]
|
||||||
|
# Retry upon receiving a 429 status code from the remote-write storage.
|
||||||
|
# This is experimental and might change in the future.
|
||||||
|
[ retry_on_http_429: <boolean> | default = false ]
|
||||||
|
|
||||||
# Configures the sending of series metadata to remote storage.
|
# Configures the sending of series metadata to remote storage.
|
||||||
# Metadata configuration is subject to change at any point
|
# Metadata configuration is subject to change at any point
|
||||||
|
|
|
@ -85,6 +85,8 @@ type Client struct {
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
headers map[string]string
|
headers map[string]string
|
||||||
|
|
||||||
|
retryOnRateLimit bool
|
||||||
|
|
||||||
readQueries prometheus.Gauge
|
readQueries prometheus.Gauge
|
||||||
readQueriesTotal *prometheus.CounterVec
|
readQueriesTotal *prometheus.CounterVec
|
||||||
readQueriesDuration prometheus.Observer
|
readQueriesDuration prometheus.Observer
|
||||||
|
@ -96,6 +98,7 @@ type ClientConfig struct {
|
||||||
Timeout model.Duration
|
Timeout model.Duration
|
||||||
HTTPClientConfig config_util.HTTPClientConfig
|
HTTPClientConfig config_util.HTTPClientConfig
|
||||||
Headers map[string]string
|
Headers map[string]string
|
||||||
|
RetryOnRateLimit bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadClient uses the SAMPLES method of remote read to read series samples from remote server.
|
// ReadClient uses the SAMPLES method of remote read to read series samples from remote server.
|
||||||
|
@ -140,11 +143,12 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Client{
|
return &Client{
|
||||||
remoteName: name,
|
remoteName: name,
|
||||||
url: conf.URL,
|
url: conf.URL,
|
||||||
Client: httpClient,
|
Client: httpClient,
|
||||||
timeout: time.Duration(conf.Timeout),
|
retryOnRateLimit: conf.RetryOnRateLimit,
|
||||||
headers: conf.Headers,
|
timeout: time.Duration(conf.Timeout),
|
||||||
|
headers: conf.Headers,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -209,7 +213,7 @@ func (c *Client) Store(ctx context.Context, req []byte) error {
|
||||||
if httpResp.StatusCode/100 == 5 {
|
if httpResp.StatusCode/100 == 5 {
|
||||||
return RecoverableError{err, defaultBackoff}
|
return RecoverableError{err, defaultBackoff}
|
||||||
}
|
}
|
||||||
if httpResp.StatusCode == http.StatusTooManyRequests {
|
if c.retryOnRateLimit && httpResp.StatusCode == http.StatusTooManyRequests {
|
||||||
return RecoverableError{err, retryAfterDuration(httpResp.Header.Get("Retry-After"))}
|
return RecoverableError{err, retryAfterDuration(httpResp.Header.Get("Retry-After"))}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -84,6 +84,50 @@ func TestStoreHTTPErrorHandling(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClientRetryAfter(t *testing.T) {
|
||||||
|
server := httptest.NewServer(
|
||||||
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
http.Error(w, longErrMessage, 429)
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
getClient := func(conf *ClientConfig) WriteClient {
|
||||||
|
hash, err := toHash(conf)
|
||||||
|
require.NoError(t, err)
|
||||||
|
c, err := NewWriteClient(hash, conf)
|
||||||
|
require.NoError(t, err)
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
serverURL, err := url.Parse(server.URL)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
conf := &ClientConfig{
|
||||||
|
URL: &config_util.URL{URL: serverURL},
|
||||||
|
Timeout: model.Duration(time.Second),
|
||||||
|
RetryOnRateLimit: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
c := getClient(conf)
|
||||||
|
err = c.Store(context.Background(), []byte{})
|
||||||
|
if _, ok := err.(RecoverableError); ok {
|
||||||
|
t.Fatal("recoverable error not expected")
|
||||||
|
}
|
||||||
|
|
||||||
|
conf = &ClientConfig{
|
||||||
|
URL: &config_util.URL{URL: serverURL},
|
||||||
|
Timeout: model.Duration(time.Second),
|
||||||
|
RetryOnRateLimit: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
c = getClient(conf)
|
||||||
|
err = c.Store(context.Background(), []byte{})
|
||||||
|
if _, ok := err.(RecoverableError); !ok {
|
||||||
|
t.Fatal("recoverable error was expected")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRetryAfterDuration(t *testing.T) {
|
func TestRetryAfterDuration(t *testing.T) {
|
||||||
tc := []struct {
|
tc := []struct {
|
||||||
name string
|
name string
|
||||||
|
|
|
@ -135,6 +135,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
|
||||||
Timeout: rwConf.RemoteTimeout,
|
Timeout: rwConf.RemoteTimeout,
|
||||||
HTTPClientConfig: rwConf.HTTPClientConfig,
|
HTTPClientConfig: rwConf.HTTPClientConfig,
|
||||||
Headers: rwConf.Headers,
|
Headers: rwConf.Headers,
|
||||||
|
RetryOnRateLimit: rwConf.QueueConfig.RetryOnRateLimit,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
Loading…
Reference in New Issue