mirror of https://github.com/hashicorp/consul
RPC Timeout/Retries account for blocking requests (#8978)
parent
b130f355ee
commit
f785c5b332
|
@ -0,0 +1,4 @@
|
|||
```release-note:bug
|
||||
use the MaxQueryTime instead of RPCHoldTimeout for blocking RPC queries
|
||||
[[GH-8978](https://github.com/hashicorp/consul/pull/8978)].
|
||||
```
|
|
@ -1665,7 +1665,7 @@ func TestCatalog_ListServices_Stale(t *testing.T) {
|
|||
defer codec.Close()
|
||||
|
||||
// Run the query, do not wait for leader, never any contact with leader, should fail
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err == nil || err.Error() != structs.ErrNoLeader.Error() {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err.Error() != structs.ErrNoLeader.Error() {
|
||||
t.Fatalf("expected %v but got err: %v and %v", structs.ErrNoLeader, err, out)
|
||||
}
|
||||
|
||||
|
@ -1677,6 +1677,7 @@ func TestCatalog_ListServices_Stale(t *testing.T) {
|
|||
testrpc.WaitForLeader(t, s2.RPC, "dc1")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
out = structs.IndexedServices{}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
|
||||
r.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -1696,24 +1697,25 @@ func TestCatalog_ListServices_Stale(t *testing.T) {
|
|||
|
||||
args.AllowStale = false
|
||||
// Since the leader is now down, non-stale query should fail now
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err == nil || err.Error() != structs.ErrNoLeader.Error() {
|
||||
out = structs.IndexedServices{}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err.Error() != structs.ErrLeaderNotTracked.Error() {
|
||||
t.Fatalf("expected %v but got err: %v and %v", structs.ErrNoLeader, err, out)
|
||||
}
|
||||
if out.KnownLeader {
|
||||
t.Fatalf("should not have a leader anymore: %#v", out)
|
||||
}
|
||||
|
||||
// With stale, request should still work
|
||||
args.AllowStale = true
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Should find old service
|
||||
if len(out.Services) != 1 {
|
||||
t.Fatalf("bad: %#v", out)
|
||||
}
|
||||
|
||||
if out.KnownLeader {
|
||||
t.Fatalf("should not have a leader anymore: %#v", out)
|
||||
}
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
out = structs.IndexedServices{}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
|
||||
r.Fatalf("err: %v", err)
|
||||
}
|
||||
if out.KnownLeader || len(out.Services) != 1 {
|
||||
r.Fatalf("got %t nodes want %d", out.KnownLeader, len(out.Services))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestCatalog_ListServiceNodes(t *testing.T) {
|
||||
|
|
|
@ -285,18 +285,16 @@ TRY:
|
|||
|
||||
// Use the zero value for RPCInfo if the request doesn't implement RPCInfo
|
||||
info, _ := args.(structs.RPCInfo)
|
||||
if retry := canRetry(info, rpcErr); !retry {
|
||||
if retry := canRetry(info, rpcErr, firstCheck, c.config); !retry {
|
||||
return rpcErr
|
||||
}
|
||||
|
||||
// We can wait a bit and retry!
|
||||
if time.Since(firstCheck) < c.config.RPCHoldTimeout {
|
||||
jitter := lib.RandomStagger(c.config.RPCHoldTimeout / jitterFraction)
|
||||
select {
|
||||
case <-time.After(jitter):
|
||||
goto TRY
|
||||
case <-c.shutdownCh:
|
||||
}
|
||||
jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)
|
||||
select {
|
||||
case <-time.After(jitter):
|
||||
goto TRY
|
||||
case <-c.shutdownCh:
|
||||
}
|
||||
return rpcErr
|
||||
}
|
||||
|
|
|
@ -75,12 +75,6 @@ var RPCSummaries = []prometheus.SummaryDefinition{
|
|||
}
|
||||
|
||||
const (
|
||||
// jitterFraction is a the limit to the amount of jitter we apply
|
||||
// to a user specified MaxQueryTime. We divide the specified time by
|
||||
// the fraction. So 16 == 6.25% limit of jitter. This same fraction
|
||||
// is applied to the RPCHoldTimeout
|
||||
jitterFraction = 16
|
||||
|
||||
// Warn if the Raft command is larger than this.
|
||||
// If it's over 1MB something is probably being abusive.
|
||||
raftWarnSize = 1024 * 1024
|
||||
|
@ -526,7 +520,14 @@ func (c *limitedConn) Read(b []byte) (n int, err error) {
|
|||
}
|
||||
|
||||
// canRetry returns true if the request and error indicate that a retry is safe.
|
||||
func canRetry(info structs.RPCInfo, err error) bool {
|
||||
func canRetry(info structs.RPCInfo, err error, start time.Time, config *Config) bool {
|
||||
if info != nil && info.HasTimedOut(start, config.RPCHoldTimeout, config.MaxQueryTime, config.DefaultQueryTime) {
|
||||
// RPCInfo timeout may include extra time for MaxQueryTime
|
||||
return false
|
||||
} else if info == nil && time.Since(start) > config.RPCHoldTimeout {
|
||||
// When not RPCInfo, timeout is only RPCHoldTimeout
|
||||
return false
|
||||
}
|
||||
// No leader errors are always safe to retry since no state could have
|
||||
// been changed.
|
||||
if structs.IsErrNoLeader(err) {
|
||||
|
@ -545,16 +546,16 @@ func canRetry(info structs.RPCInfo, err error) bool {
|
|||
|
||||
// ForwardRPC is used to forward an RPC request to a remote DC or to the local leader
|
||||
// Returns a bool of if forwarding was performed, as well as any error
|
||||
func (s *Server) ForwardRPC(method string, req structs.RPCInfo, reply interface{}) (bool, error) {
|
||||
var firstCheck time.Time
|
||||
func (s *Server) ForwardRPC(method string, info structs.RPCInfo, reply interface{}) (bool, error) {
|
||||
firstCheck := time.Now()
|
||||
|
||||
// Handle DC forwarding
|
||||
dc := req.RequestDatacenter()
|
||||
dc := info.RequestDatacenter()
|
||||
if dc != s.config.Datacenter {
|
||||
// Local tokens only work within the current datacenter. Check to see
|
||||
// if we are attempting to forward one to a remote datacenter and strip
|
||||
// it, falling back on the anonymous token on the other end.
|
||||
if token := req.TokenSecret(); token != "" {
|
||||
if token := info.TokenSecret(); token != "" {
|
||||
done, ident, err := s.ResolveIdentityFromToken(token)
|
||||
if done {
|
||||
if err != nil && !acl.IsErrNotFound(err) {
|
||||
|
@ -562,18 +563,18 @@ func (s *Server) ForwardRPC(method string, req structs.RPCInfo, reply interface{
|
|||
}
|
||||
if ident != nil && ident.IsLocal() {
|
||||
// Strip it from the request.
|
||||
req.SetTokenSecret("")
|
||||
defer req.SetTokenSecret(token)
|
||||
info.SetTokenSecret("")
|
||||
defer info.SetTokenSecret(token)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
err := s.forwardDC(method, dc, req, reply)
|
||||
err := s.forwardDC(method, dc, info, reply)
|
||||
return true, err
|
||||
}
|
||||
|
||||
// Check if we can allow a stale read, ensure our local DB is initialized
|
||||
if req.IsRead() && req.AllowStaleRead() && !s.raft.LastContact().IsZero() {
|
||||
if info.IsRead() && info.AllowStaleRead() && !s.raft.LastContact().IsZero() {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
|
@ -596,20 +597,15 @@ CHECK_LEADER:
|
|||
// Handle the case of a known leader
|
||||
if leader != nil {
|
||||
rpcErr = s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr,
|
||||
method, req, reply)
|
||||
if rpcErr != nil && canRetry(req, rpcErr) {
|
||||
goto RETRY
|
||||
method, info, reply)
|
||||
if rpcErr == nil {
|
||||
return true, nil
|
||||
}
|
||||
return true, rpcErr
|
||||
}
|
||||
|
||||
RETRY:
|
||||
// Gate the request until there is a leader
|
||||
if firstCheck.IsZero() {
|
||||
firstCheck = time.Now()
|
||||
}
|
||||
if time.Since(firstCheck) < s.config.RPCHoldTimeout {
|
||||
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction)
|
||||
if retry := canRetry(info, rpcErr, firstCheck, s.config); retry {
|
||||
// Gate the request until there is a leader
|
||||
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / structs.JitterFraction)
|
||||
select {
|
||||
case <-time.After(jitter):
|
||||
goto CHECK_LEADER
|
||||
|
@ -832,7 +828,7 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
|
|||
}
|
||||
|
||||
// Apply a small amount of jitter to the request.
|
||||
queryTimeout += lib.RandomStagger(queryTimeout / jitterFraction)
|
||||
queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction)
|
||||
|
||||
// wrap the base context with a deadline
|
||||
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(queryTimeout))
|
||||
|
@ -933,7 +929,7 @@ func (s *Server) consistentRead() error {
|
|||
if s.isReadyForConsistentReads() {
|
||||
return nil
|
||||
}
|
||||
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction)
|
||||
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / structs.JitterFraction)
|
||||
deadline := time.Now().Add(s.config.RPCHoldTimeout)
|
||||
|
||||
for time.Now().Before(deadline) {
|
||||
|
|
|
@ -962,10 +962,17 @@ func TestCanRetry(t *testing.T) {
|
|||
req structs.RPCInfo
|
||||
err error
|
||||
expected bool
|
||||
timeout time.Time
|
||||
}
|
||||
|
||||
config := DefaultConfig()
|
||||
now := time.Now()
|
||||
config.RPCHoldTimeout = 7 * time.Second
|
||||
run := func(t *testing.T, tc testCase) {
|
||||
require.Equal(t, tc.expected, canRetry(tc.req, tc.err))
|
||||
timeOutValue := tc.timeout
|
||||
if timeOutValue.IsZero() {
|
||||
timeOutValue = now
|
||||
}
|
||||
require.Equal(t, tc.expected, canRetry(tc.req, tc.err, timeOutValue, config))
|
||||
}
|
||||
|
||||
var testCases = []testCase{
|
||||
|
@ -990,6 +997,46 @@ func TestCanRetry(t *testing.T) {
|
|||
err: io.EOF,
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "EOF error",
|
||||
req: &structs.DCSpecificRequest{},
|
||||
err: io.EOF,
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "HasTimedOut implementation with no error",
|
||||
req: &structs.DCSpecificRequest{},
|
||||
err: nil,
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "HasTimedOut implementation timedOut with no error",
|
||||
req: &structs.DCSpecificRequest{},
|
||||
err: nil,
|
||||
expected: false,
|
||||
timeout: now.Add(-(config.RPCHoldTimeout + time.Second)),
|
||||
},
|
||||
{
|
||||
name: "HasTimedOut implementation timedOut (with EOF error)",
|
||||
req: &structs.DCSpecificRequest{},
|
||||
err: io.EOF,
|
||||
expected: false,
|
||||
timeout: now.Add(-(config.RPCHoldTimeout + time.Second)),
|
||||
},
|
||||
{
|
||||
name: "HasTimedOut implementation timedOut blocking call",
|
||||
req: &structs.DCSpecificRequest{QueryOptions: structs.QueryOptions{MaxQueryTime: 300, MinQueryIndex: 1}},
|
||||
err: nil,
|
||||
expected: false,
|
||||
timeout: now.Add(-(config.RPCHoldTimeout + config.MaxQueryTime + time.Second)),
|
||||
},
|
||||
{
|
||||
name: "HasTimedOut implementation timedOut blocking call (MaxQueryTime not set)",
|
||||
req: &structs.DCSpecificRequest{QueryOptions: structs.QueryOptions{MinQueryIndex: 1}},
|
||||
err: nil,
|
||||
expected: false,
|
||||
timeout: now.Add(-(config.RPCHoldTimeout + config.MaxQueryTime + time.Second)),
|
||||
},
|
||||
{
|
||||
name: "EOF on write request",
|
||||
err: io.EOF,
|
||||
|
@ -1011,3 +1058,7 @@ type isReadRequest struct {
|
|||
func (r isReadRequest) IsRead() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (r isReadRequest) HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) bool {
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -159,6 +159,12 @@ const (
|
|||
// we multiply by time.Second
|
||||
lockDelayMinThreshold = 1000
|
||||
|
||||
// JitterFraction is a the limit to the amount of jitter we apply
|
||||
// to a user specified MaxQueryTime. We divide the specified time by
|
||||
// the fraction. So 16 == 6.25% limit of jitter. This same fraction
|
||||
// is applied to the RPCHoldTimeout
|
||||
JitterFraction = 16
|
||||
|
||||
// WildcardSpecifier is the string which should be used for specifying a wildcard
|
||||
// The exact semantics of the wildcard is left up to the code where its used.
|
||||
WildcardSpecifier = "*"
|
||||
|
@ -193,6 +199,7 @@ type RPCInfo interface {
|
|||
AllowStaleRead() bool
|
||||
TokenSecret() string
|
||||
SetTokenSecret(string)
|
||||
HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) bool
|
||||
}
|
||||
|
||||
// QueryOptions is used to specify various flags for read queries
|
||||
|
@ -291,6 +298,20 @@ func (q *QueryOptions) SetTokenSecret(s string) {
|
|||
q.Token = s
|
||||
}
|
||||
|
||||
func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) bool {
|
||||
if q.MinQueryIndex > 0 {
|
||||
if q.MaxQueryTime > maxQueryTime {
|
||||
q.MaxQueryTime = maxQueryTime
|
||||
} else if q.MaxQueryTime <= 0 {
|
||||
q.MaxQueryTime = defaultQueryTime
|
||||
}
|
||||
q.MaxQueryTime += lib.RandomStagger(q.MaxQueryTime / JitterFraction)
|
||||
|
||||
return time.Since(start) > (q.MaxQueryTime + rpcHoldTimeout)
|
||||
}
|
||||
return time.Since(start) > rpcHoldTimeout
|
||||
}
|
||||
|
||||
type WriteRequest struct {
|
||||
// Token is the ACL token ID. If not provided, the 'anonymous'
|
||||
// token is assumed for backwards compatibility.
|
||||
|
@ -314,6 +335,10 @@ func (w *WriteRequest) SetTokenSecret(s string) {
|
|||
w.Token = s
|
||||
}
|
||||
|
||||
func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) bool {
|
||||
return time.Since(start) > rpcHoldTimeout
|
||||
}
|
||||
|
||||
// QueryMeta allows a query response to include potentially
|
||||
// useful metadata about a query
|
||||
type QueryMeta struct {
|
||||
|
|
|
@ -16,6 +16,9 @@ var yamuxSessionShutdown = yamux.ErrSessionShutdown.Error()
|
|||
// IsErrEOF returns true if we get an EOF error from the socket itself, or
|
||||
// an EOF equivalent error from yamux.
|
||||
func IsErrEOF(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
if errors.Is(err, io.EOF) {
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package pbautoconf
|
||||
|
||||
import "time"
|
||||
|
||||
func (req *AutoConfigRequest) RequestDatacenter() string {
|
||||
return req.Datacenter
|
||||
}
|
||||
|
@ -19,3 +21,7 @@ func (req *AutoConfigRequest) TokenSecret() string {
|
|||
func (req *AutoConfigRequest) SetTokenSecret(token string) {
|
||||
req.ConsulToken = token
|
||||
}
|
||||
|
||||
func (req *AutoConfigRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) bool {
|
||||
return time.Since(start) > rpcHoldTimeout
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue