Browse Source

increase protobuf size limit for cluster peering (#14976)

pull/14987/head
malizz 2 years ago committed by GitHub
parent
commit
b0b0cbb8ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 33
      agent/consul/leader_peering.go
  2. 37
      agent/consul/leader_peering_test.go
  3. 1
      agent/grpc-external/server.go
  4. 5
      agent/grpc-external/services/peerstream/stream_resources.go
  5. 2
      website/content/docs/connect/cluster-peering/index.mdx

33
agent/consul/leader_peering.go

@ -338,9 +338,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
// Try a new address on each iteration by advancing the ring buffer on errors.
addr := <-nextServerAddr
logger.Trace("dialing peer", "addr", addr)
conn, err := grpc.DialContext(streamCtx, addr,
// TODO(peering): use a grpc.WithStatsHandler here?)
opts := []grpc.DialOption{
tlsOption,
// For keep alive parameters there is a larger comment in ClientConnPool.dial about that.
grpc.WithKeepaliveParams(keepalive.ClientParameters{
@ -349,7 +347,12 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
// send keepalive pings even if there is no active streams
PermitWithoutStream: true,
}),
)
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(50 * 1024 * 1024)),
}
// TODO(peering): use a grpc.WithStatsHandler here?)
logger.Trace("dialing peer", "addr", addr)
conn, err := grpc.DialContext(streamCtx, addr, opts...)
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
}
@ -397,10 +400,14 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
}, func(err error) {
// TODO(peering): why are we using TrackSendError here? This could also be a receive error.
streamStatus.TrackSendError(err.Error())
if isFailedPreconditionErr(err) {
if isErrCode(err, codes.FailedPrecondition) {
logger.Debug("stream disconnected due to 'failed precondition' error; reconnecting",
"error", err)
return
} else if isErrCode(err, codes.ResourceExhausted) {
logger.Debug("stream disconnected due to 'resource exhausted' error; reconnecting",
"error", err)
return
}
logger.Error("error managing peering stream", "error", err)
}, peeringRetryTimeout)
@ -680,7 +687,7 @@ func retryLoopBackoffPeering(ctx context.Context, logger hclog.Logger, loopFn fu
// quickly. For example in the case of connecting with a follower through a load balancer, we just need to retry
// until our request lands on a leader.
func peeringRetryTimeout(failedAttempts uint, loopErr error) time.Duration {
if loopErr != nil && isFailedPreconditionErr(loopErr) {
if loopErr != nil && isErrCode(loopErr, codes.FailedPrecondition) {
// Wait a constant time for the first number of retries.
if failedAttempts <= maxFastConnRetries {
return fastConnRetryTimeout
@ -697,6 +704,11 @@ func peeringRetryTimeout(failedAttempts uint, loopErr error) time.Duration {
return ms
}
// if the message sent is too large probably should not retry at all
if loopErr != nil && isErrCode(loopErr, codes.ResourceExhausted) {
return maxFastRetryBackoff
}
// Else we go with the default backoff from retryLoopBackoff.
if (1 << failedAttempts) < maxRetryBackoff {
return (1 << failedAttempts) * time.Second
@ -704,23 +716,22 @@ func peeringRetryTimeout(failedAttempts uint, loopErr error) time.Duration {
return time.Duration(maxRetryBackoff) * time.Second
}
// isFailedPreconditionErr returns true if err is a gRPC error with code FailedPrecondition.
func isFailedPreconditionErr(err error) bool {
// isErrCode returns true if err is a gRPC error with given error code.
func isErrCode(err error, code codes.Code) bool {
if err == nil {
return false
}
// Handle wrapped errors, since status.FromError does a naive assertion.
var statusErr interface {
GRPCStatus() *grpcstatus.Status
}
if errors.As(err, &statusErr) {
return statusErr.GRPCStatus().Code() == codes.FailedPrecondition
return statusErr.GRPCStatus().Code() == code
}
grpcErr, ok := grpcstatus.FromError(err)
if !ok {
return false
}
return grpcErr.Code() == codes.FailedPrecondition
return grpcErr.Code() == code
}

37
agent/consul/leader_peering_test.go

@ -1686,14 +1686,35 @@ func TestLeader_Peering_retryLoopBackoffPeering_cancelContext(t *testing.T) {
}, allErrors)
}
func Test_isFailedPreconditionErr(t *testing.T) {
st := grpcstatus.New(codes.FailedPrecondition, "cannot establish a peering stream on a follower node")
err := st.Err()
assert.True(t, isFailedPreconditionErr(err))
// test that wrapped errors are checked correctly
werr := fmt.Errorf("wrapped: %w", err)
assert.True(t, isFailedPreconditionErr(werr))
func Test_isErrCode(t *testing.T) {
tests := []struct {
name string
expectedCode codes.Code
}{
{
name: "cannot establish a peering stream on a follower node",
expectedCode: codes.FailedPrecondition,
},
{
name: "received message larger than max ",
expectedCode: codes.ResourceExhausted,
},
{
name: "deadline exceeded",
expectedCode: codes.DeadlineExceeded,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
st := grpcstatus.New(tc.expectedCode, tc.name)
err := st.Err()
assert.True(t, isErrCode(err, tc.expectedCode))
// test that wrapped errors are checked correctly
werr := fmt.Errorf("wrapped: %w", err)
assert.True(t, isErrCode(werr, tc.expectedCode))
})
}
}
func Test_Leader_PeeringSync_ServerAddressUpdates(t *testing.T) {

1
agent/grpc-external/server.go vendored

@ -29,6 +29,7 @@ func NewServer(logger agentmiddleware.Logger, metricsObj *metrics.Metrics) *grpc
opts := []grpc.ServerOption{
grpc.MaxConcurrentStreams(2048),
grpc.MaxRecvMsgSize(50 * 1024 * 1024),
grpc.StatsHandler(agentmiddleware.NewStatsHandler(metricsObj, metricsLabels)),
middleware.WithUnaryServerChain(
// Add middlware interceptors to recover in case of panics.

5
agent/grpc-external/services/peerstream/stream_resources.go vendored

@ -367,6 +367,11 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
// resources, not request/ack messages.
if msg.GetResponse() != nil {
if err != nil {
if id := msg.GetResponse().GetResourceID(); id != "" {
logger.Error("failed to send resource", "resourceID", id, "error", err)
status.TrackSendError(err.Error())
return nil
}
status.TrackSendError(err.Error())
} else {
status.TrackSendSuccess()

2
website/content/docs/connect/cluster-peering/index.mdx

@ -55,7 +55,7 @@ The cluster peering beta release includes the following features and functionali
Not all features and functionality are available in the beta release. In particular, consider the following technical constraints:
- Mesh gateways for _server to server traffic_ are not available.
- Services with node, instance, and check definitions totaling more than 4MB cannot be exported to a peer.
- Services with node, instance, and check definitions totaling more than 50MB cannot be exported to a peer.
- The `service-splitter` and `service-router` configuration entry kinds cannot directly target a peer. To split or route traffic to a service instance on a peer, you must supplement your desired dynamic routing definition with a `service-resolver` config entry on the dialing cluster. Refer to [L7 traffic management between peers](/docs/connect/cluster-peering/create-manage-peering#L7-traffic) for more information.
- The `consul intention` CLI command is not supported. To manage intentions that specify services in peered clusters, use [configuration entries](/docs/connect/config-entries/service-intentions).
- Accessing key/value stores across peers is not supported.

Loading…
Cancel
Save