@ -7,6 +7,7 @@ import (
"errors"
"errors"
"fmt"
"fmt"
"io"
"io"
"math"
"net"
"net"
"strings"
"strings"
"sync/atomic"
"sync/atomic"
@ -556,6 +557,21 @@ func (c *limitedConn) Read(b []byte) (n int, err error) {
return c . lr . Read ( b )
return c . lr . Read ( b )
}
}
func getWaitTime ( rpcHoldTimeout time . Duration , retryCount int ) time . Duration {
const backoffMultiplier = 2.0
rpcHoldTimeoutInMilli := int ( rpcHoldTimeout . Milliseconds ( ) )
initialBackoffInMilli := rpcHoldTimeoutInMilli / structs . JitterFraction
if initialBackoffInMilli < 1 {
initialBackoffInMilli = 1
}
waitTimeInMilli := initialBackoffInMilli * int ( math . Pow ( backoffMultiplier , float64 ( retryCount - 1 ) ) )
return time . Duration ( waitTimeInMilli ) * time . Millisecond
}
// canRetry returns true if the request and error indicate that a retry is safe.
// canRetry returns true if the request and error indicate that a retry is safe.
func canRetry ( info structs . RPCInfo , err error , start time . Time , config * Config , retryableMessages [ ] error ) bool {
func canRetry ( info structs . RPCInfo , err error , start time . Time , config * Config , retryableMessages [ ] error ) bool {
if info != nil {
if info != nil {
@ -714,7 +730,10 @@ func (s *Server) canServeReadRequest(info structs.RPCInfo) bool {
// See the comment for forwardRPC for more details.
// See the comment for forwardRPC for more details.
func ( s * Server ) forwardRequestToLeader ( info structs . RPCInfo , forwardToLeader func ( leader * metadata . Server ) error ) ( handled bool , err error ) {
func ( s * Server ) forwardRequestToLeader ( info structs . RPCInfo , forwardToLeader func ( leader * metadata . Server ) error ) ( handled bool , err error ) {
firstCheck := time . Now ( )
firstCheck := time . Now ( )
retryCount := 0
previousJitter := time . Duration ( 0 )
CHECK_LEADER :
CHECK_LEADER :
retryCount ++
// Fail fast if we are in the process of leaving
// Fail fast if we are in the process of leaving
select {
select {
case <- s . leaveCh :
case <- s . leaveCh :
@ -747,7 +766,9 @@ CHECK_LEADER:
if retry := canRetry ( info , rpcErr , firstCheck , s . config , retryableMessages ) ; retry {
if retry := canRetry ( info , rpcErr , firstCheck , s . config , retryableMessages ) ; retry {
// Gate the request until there is a leader
// Gate the request until there is a leader
jitter := lib . RandomStagger ( s . config . RPCHoldTimeout / structs . JitterFraction )
jitter := lib . RandomStaggerWithRange ( previousJitter , getWaitTime ( s . config . RPCHoldTimeout , retryCount ) )
previousJitter = jitter
select {
select {
case <- time . After ( jitter ) :
case <- time . After ( jitter ) :
goto CHECK_LEADER
goto CHECK_LEADER