feat: client RPC is retries on ErrRetryElsewhere error and forwardRequestToLeader method retries ErrRetryLater error (#16099)

pull/16140/head^2
Poonam Jadhav 2023-02-06 11:31:25 -05:00 committed by GitHub
parent 674c5570b6
commit 24c431270c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 27 additions and 14 deletions

View File

@ -16,6 +16,7 @@ import (
"golang.org/x/time/rate" "golang.org/x/time/rate"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
rpcRate "github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -296,7 +297,16 @@ TRY:
// Use the zero value for RPCInfo if the request doesn't implement RPCInfo // Use the zero value for RPCInfo if the request doesn't implement RPCInfo
info, _ := args.(structs.RPCInfo) info, _ := args.(structs.RPCInfo)
if retry := canRetry(info, rpcErr, firstCheck, c.config); !retry { retryableMessages := []error{
// If we are chunking and it doesn't seem to have completed, try again.
ErrChunkingResubmit,
// These rate limit errors are returned before the handler is called, so are
// safe to retry.
rpcRate.ErrRetryElsewhere,
}
if retry := canRetry(info, rpcErr, firstCheck, c.config, retryableMessages); !retry {
c.logger.Error("RPC failed to server", c.logger.Error("RPC failed to server",
"method", method, "method", method,
"server", server.Addr, "server", server.Addr,

View File

@ -557,7 +557,7 @@ func (c *limitedConn) Read(b []byte) (n int, err error) {
} }
// canRetry returns true if the request and error indicate that a retry is safe. // canRetry returns true if the request and error indicate that a retry is safe.
func canRetry(info structs.RPCInfo, err error, start time.Time, config *Config) bool { func canRetry(info structs.RPCInfo, err error, start time.Time, config *Config, retryableMessages []error) bool {
if info != nil { if info != nil {
timedOut, timeoutError := info.HasTimedOut(start, config.RPCHoldTimeout, config.MaxQueryTime, config.DefaultQueryTime) timedOut, timeoutError := info.HasTimedOut(start, config.RPCHoldTimeout, config.MaxQueryTime, config.DefaultQueryTime)
if timeoutError != nil { if timeoutError != nil {
@ -579,15 +579,6 @@ func canRetry(info structs.RPCInfo, err error, start time.Time, config *Config)
return true return true
} }
retryableMessages := []error{
// If we are chunking and it doesn't seem to have completed, try again.
ErrChunkingResubmit,
// These rate limit errors are returned before the handler is called, so are
// safe to retry.
rate.ErrRetryElsewhere,
rate.ErrRetryLater,
}
for _, m := range retryableMessages { for _, m := range retryableMessages {
if err != nil && strings.Contains(err.Error(), m.Error()) { if err != nil && strings.Contains(err.Error(), m.Error()) {
return true return true
@ -747,7 +738,14 @@ CHECK_LEADER:
} }
} }
if retry := canRetry(info, rpcErr, firstCheck, s.config); retry { retryableMessages := []error{
// If we are chunking and it doesn't seem to have completed, try again.
ErrChunkingResubmit,
rate.ErrRetryLater,
}
if retry := canRetry(info, rpcErr, firstCheck, s.config, retryableMessages); retry {
// Gate the request until there is a leader // Gate the request until there is a leader
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / structs.JitterFraction) jitter := lib.RandomStagger(s.config.RPCHoldTimeout / structs.JitterFraction)
select { select {

View File

@ -31,6 +31,7 @@ import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/consul/rate"
rpcRate "github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
agent_grpc "github.com/hashicorp/consul/agent/grpc-internal" agent_grpc "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/pool"
@ -1287,12 +1288,16 @@ func TestCanRetry(t *testing.T) {
config := DefaultConfig() config := DefaultConfig()
now := time.Now() now := time.Now()
config.RPCHoldTimeout = 7 * time.Second config.RPCHoldTimeout = 7 * time.Second
retryableMessages := []error{
ErrChunkingResubmit,
rpcRate.ErrRetryElsewhere,
}
run := func(t *testing.T, tc testCase) { run := func(t *testing.T, tc testCase) {
timeOutValue := tc.timeout timeOutValue := tc.timeout
if timeOutValue.IsZero() { if timeOutValue.IsZero() {
timeOutValue = now timeOutValue = now
} }
require.Equal(t, tc.expected, canRetry(tc.req, tc.err, timeOutValue, config)) require.Equal(t, tc.expected, canRetry(tc.req, tc.err, timeOutValue, config, retryableMessages))
} }
var testCases = []testCase{ var testCases = []testCase{
@ -1319,7 +1324,7 @@ func TestCanRetry(t *testing.T) {
{ {
name: "ErrRetryLater", name: "ErrRetryLater",
err: fmt.Errorf("some wrapping: %w", rate.ErrRetryLater), err: fmt.Errorf("some wrapping: %w", rate.ErrRetryLater),
expected: true, expected: false,
}, },
{ {
name: "EOF on read request", name: "EOF on read request",