Adds support to configure retry on Rate-Limiting from remote-write config.

Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>
Harkishen-Singh committed Feb 16, 2021
1 parent cdc71a1 commit 77c20fd
Showing 7 changed files with 80 additions and 8 deletions.
5 changes: 3 additions & 2 deletions config/config.go
@@ -651,8 +651,9 @@ type QueueConfig struct {
 	BatchSendDeadline model.Duration `yaml:"batch_send_deadline,omitempty"`
 
 	// On recoverable errors, backoff exponentially.
-	MinBackoff model.Duration `yaml:"min_backoff,omitempty"`
-	MaxBackoff model.Duration `yaml:"max_backoff,omitempty"`
+	MinBackoff       model.Duration `yaml:"min_backoff,omitempty"`
+	MaxBackoff       model.Duration `yaml:"max_backoff,omitempty"`
+	RetryOnRateLimit bool           `yaml:"retry_on_http_429,omitempty"`
 }
 
 // MetadataConfig is the configuration for sending metadata to remote
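
Note (not part of the diff): a minimal, hypothetical sketch of how the new retry_on_http_429 key maps onto the RetryOnRateLimit field through its yaml tag, using gopkg.in/yaml.v2, the library the config tests below rely on. The stripped-down queueConfig struct is illustrative only.

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// queueConfig mirrors only the field added above; the real QueueConfig
// lives in config/config.go.
type queueConfig struct {
	RetryOnRateLimit bool `yaml:"retry_on_http_429,omitempty"`
}

func main() {
	var qc queueConfig
	if err := yaml.Unmarshal([]byte("retry_on_http_429: true\n"), &qc); err != nil {
		panic(err)
	}
	fmt.Println(qc.RetryOnRateLimit) // true

	// Because of omitempty, the default (false) value is dropped on marshal.
	out, _ := yaml.Marshal(queueConfig{})
	fmt.Printf("%q\n", string(out)) // "{}\n"
}
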
14 changes: 14 additions & 0 deletions config/config_test.go
@@ -734,6 +734,20 @@ func TestYAMLRoundtrip(t *testing.T) {
 	require.Equal(t, want, got)
 }
 
+func TestRemoteWriteRetryOnRateLimit(t *testing.T) {
+	want, err := LoadFile("testdata/remote_write_retry_on_rate_limit.good.yml")
+	require.NoError(t, err)
+
+	out, err := yaml.Marshal(want)
+
+	require.NoError(t, err)
+	got := &Config{}
+	require.NoError(t, yaml.UnmarshalStrict(out, got))
+
+	require.Equal(t, true, got.RemoteWriteConfigs[0].QueueConfig.RetryOnRateLimit)
+	require.Equal(t, false, got.RemoteWriteConfigs[1].QueueConfig.RetryOnRateLimit)
+}
+
 func TestLoadConfig(t *testing.T) {
 	// Parse a valid file that sets a global scrape timeout. This tests whether parsing
 	// an overwritten default field in the global config permanently changes the default.
5 changes: 5 additions & 0 deletions config/testdata/remote_write_retry_on_rate_limit.good.yml
@@ -0,0 +1,5 @@
+remote_write:
+  - url: localhost:9090
+    queue_config:
+      retry_on_http_429: true
+  - url: localhost:9201
3 changes: 3 additions & 0 deletions docs/configuration/configuration.md
@@ -1775,6 +1775,9 @@ queue_config:
   [ min_backoff: <duration> | default = 30ms ]
   # Maximum retry delay.
   [ max_backoff: <duration> | default = 100ms ]
+  # Retry upon receiving a 429 status code from the remote-write storage.
+  # This is experimental and might change in the future.
+  [ retry_on_http_429: <boolean> | default = false ]
 
 # Configures the sending of series metadata to remote storage.
 # Metadata configuration is subject to change at any point
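
To make the documented knobs concrete, a minimal remote_write entry enabling the new behaviour might look like the snippet below; the endpoint URL is illustrative only.

remote_write:
  - url: http://remote-storage.example.org/api/v1/write
    queue_config:
      min_backoff: 30ms
      max_backoff: 100ms
      retry_on_http_429: true
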
16 changes: 10 additions & 6 deletions storage/remote/client.go
@@ -85,6 +85,8 @@ type Client struct {
 	timeout    time.Duration
 	headers    map[string]string
 
+	retryOnRateLimit bool
+
 	readQueries         prometheus.Gauge
 	readQueriesTotal    *prometheus.CounterVec
 	readQueriesDuration prometheus.Observer
@@ -96,6 +98,7 @@ type ClientConfig struct {
 	Timeout          model.Duration
 	HTTPClientConfig config_util.HTTPClientConfig
 	Headers          map[string]string
+	RetryOnRateLimit bool
 }
 
 // ReadClient uses the SAMPLES method of remote read to read series samples from remote server.
@@ -140,11 +143,12 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) {
 	}
 
 	return &Client{
-		remoteName: name,
-		url:        conf.URL,
-		Client:     httpClient,
-		timeout:    time.Duration(conf.Timeout),
-		headers:    conf.Headers,
+		remoteName:       name,
+		url:              conf.URL,
+		Client:           httpClient,
+		retryOnRateLimit: conf.RetryOnRateLimit,
+		timeout:          time.Duration(conf.Timeout),
+		headers:          conf.Headers,
 	}, nil
 }
 
@@ -209,7 +213,7 @@ func (c *Client) Store(ctx context.Context, req []byte) error {
 	if httpResp.StatusCode/100 == 5 {
 		return RecoverableError{err, defaultBackoff}
 	}
-	if httpResp.StatusCode == http.StatusTooManyRequests {
+	if c.retryOnRateLimit && httpResp.StatusCode == http.StatusTooManyRequests {
 		return RecoverableError{err, retryAfterDuration(httpResp.Header.Get("Retry-After"))}
 	}
 	return err
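
The retryAfterDuration helper called above is not part of this diff. As a rough, hypothetical sketch of the kind of parsing involved, assuming only the delay-seconds form of the Retry-After header is handled and anything else falls back to a default backoff:

package remote

import (
	"strconv"
	"time"
)

// parseRetryAfter is an illustrative stand-in for the real helper. It reads
// Retry-After as a number of seconds and returns the fallback when the header
// is missing or malformed.
func parseRetryAfter(retryAfter string, fallback time.Duration) time.Duration {
	seconds, err := strconv.ParseInt(retryAfter, 10, 64)
	if err != nil || seconds < 0 {
		return fallback
	}
	return time.Duration(seconds) * time.Second
}
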
44 changes: 44 additions & 0 deletions storage/remote/client_test.go
@@ -84,6 +84,50 @@ func TestStoreHTTPErrorHandling(t *testing.T) {
 	}
 }
 
+func TestClientRetryAfter(t *testing.T) {
+	server := httptest.NewServer(
+		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			http.Error(w, longErrMessage, 429)
+		}),
+	)
+	defer server.Close()
+
+	getClient := func(conf *ClientConfig) WriteClient {
+		hash, err := toHash(conf)
+		require.NoError(t, err)
+		c, err := NewWriteClient(hash, conf)
+		require.NoError(t, err)
+		return c
+	}
+
+	serverURL, err := url.Parse(server.URL)
+	require.NoError(t, err)
+
+	conf := &ClientConfig{
+		URL:              &config_util.URL{URL: serverURL},
+		Timeout:          model.Duration(time.Second),
+		RetryOnRateLimit: false,
+	}
+
+	c := getClient(conf)
+	err = c.Store(context.Background(), []byte{})
+	if _, ok := err.(RecoverableError); ok {
+		t.Fatal("recoverable error not expected")
+	}
+
+	conf = &ClientConfig{
+		URL:              &config_util.URL{URL: serverURL},
+		Timeout:          model.Duration(time.Second),
+		RetryOnRateLimit: true,
+	}
+
+	c = getClient(conf)
+	err = c.Store(context.Background(), []byte{})
+	if _, ok := err.(RecoverableError); !ok {
+		t.Fatal("recoverable error was expected")
+	}
+}
+
 func TestRetryAfterDuration(t *testing.T) {
 	tc := []struct {
 		name string
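
The test above never sets a Retry-After header, so the recoverable error carries the default backoff. A hypothetical follow-up test could let the server suggest its own delay, roughly like this sketch (names and assertions are illustrative, not part of the commit):

func TestClientRetryAfterHeader(t *testing.T) {
	server := httptest.NewServer(
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// Suggest a 5-second delay; with RetryOnRateLimit enabled, Store
			// surfaces it through the RecoverableError's retry hint.
			w.Header().Set("Retry-After", "5")
			http.Error(w, "rate limited", http.StatusTooManyRequests)
		}),
	)
	defer server.Close()

	// Build a WriteClient with RetryOnRateLimit: true as in TestClientRetryAfter,
	// call Store, and assert that the returned error is a RecoverableError.
}
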
1 change: 1 addition & 0 deletions storage/remote/write.go
@@ -135,6 +135,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
 			Timeout:          rwConf.RemoteTimeout,
 			HTTPClientConfig: rwConf.HTTPClientConfig,
 			Headers:          rwConf.Headers,
+			RetryOnRateLimit: rwConf.QueueConfig.RetryOnRateLimit,
 		})
 		if err != nil {
 			return err
