Backport of [OSS] fix: wait and try longer to peer through mesh gw into release/1.14.x (#15329)

This pull request was automerged via backport-assistant
pull/15350/head
hc-github-team-consul-core 2022-11-10 13:54:47 -05:00 committed by GitHub
parent 6645e02c6c
commit 8c2e5e26ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 74 additions and 44 deletions

View File

@ -1956,7 +1956,8 @@ func Test_Leader_PeeringSync_PeerThroughMeshGateways_ServerFallBack(t *testing.T
})) }))
// Create a peering at dialer by establishing a peering with acceptor's token // Create a peering at dialer by establishing a peering with acceptor's token
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) // 7 second = 1 second wait + 3 second gw retry + 3 second token addr retry
ctx, cancel = context.WithTimeout(context.Background(), 7*time.Second)
t.Cleanup(cancel) t.Cleanup(cancel)
conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(), conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(),

View File

@ -106,27 +106,38 @@ func (b *PeeringBackend) GetLocalServerAddresses() ([]string, error) {
} }
// GetDialAddresses returns: the addresses to cycle through when dialing a peer's servers, // GetDialAddresses returns: the addresses to cycle through when dialing a peer's servers,
// a boolean indicating whether mesh gateways are present, and an optional error. // an optional buffer of just gateway addresses, and an optional error.
// The resulting ring buffer is front-loaded with the local mesh gateway addresses if they are present. // The resulting ring buffer is front-loaded with the local mesh gateway addresses if they are present.
func (b *PeeringBackend) GetDialAddresses(logger hclog.Logger, ws memdb.WatchSet, peerID string) (*ring.Ring, bool, error) { func (b *PeeringBackend) GetDialAddresses(logger hclog.Logger, ws memdb.WatchSet, peerID string) (*ring.Ring, *ring.Ring, error) {
newRing, err := b.fetchPeerServerAddresses(ws, peerID) newRing, err := b.fetchPeerServerAddresses(ws, peerID)
if err != nil { if err != nil {
return nil, false, fmt.Errorf("failed to refresh peer server addresses, will continue to use initial addresses: %w", err) return nil, nil, fmt.Errorf("failed to refresh peer server addresses, will continue to use initial addresses: %w", err)
} }
gatewayRing, err := b.maybeFetchGatewayAddresses(ws) gatewayRing, err := b.maybeFetchGatewayAddresses(ws)
if err != nil { if err != nil {
// If we couldn't fetch the mesh gateway addresses we fall back to dialing the remote server addresses. // If we couldn't fetch the mesh gateway addresses we fall back to dialing the remote server addresses.
logger.Warn("failed to refresh local gateway addresses, will attempt to dial peer directly: %w", "error", err) logger.Warn("failed to refresh local gateway addresses, will attempt to dial peer directly", "error", err)
return newRing, false, nil return newRing, nil, nil
} }
if gatewayRing != nil { if gatewayRing != nil {
// The ordering is important here. We always want to start with the mesh gateway // The ordering is important here. We always want to start with the mesh gateway
// addresses and fallback to the remote addresses, so we append the server addresses // addresses and fallback to the remote addresses, so we append the server addresses
// in newRing to gatewayRing. // in newRing to gatewayRing. We also need a new ring to prevent mixing up pointers
newRing = gatewayRing.Link(newRing) // with the gateway-only buffer
compositeRing := ring.New(gatewayRing.Len() + newRing.Len())
gatewayRing.Do(func(s any) {
compositeRing.Value = s.(string)
compositeRing = compositeRing.Next()
})
newRing.Do(func(s any) {
compositeRing.Value = s.(string)
compositeRing = compositeRing.Next()
})
newRing = compositeRing
} }
return newRing, gatewayRing != nil, nil return newRing, gatewayRing, nil
} }
// fetchPeerServerAddresses will return a ring buffer with the latest peer server addresses. // fetchPeerServerAddresses will return a ring buffer with the latest peer server addresses.

View File

@ -198,7 +198,7 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) {
type expectation struct { type expectation struct {
addrs []string addrs []string
haveGateways bool gatewayAddrs []string
err string err string
} }
@ -214,14 +214,25 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) {
tc.setup(srv.fsm.State()) tc.setup(srv.fsm.State())
} }
ring, haveGateways, err := backend.GetDialAddresses(testutil.Logger(t), nil, tc.peerID) ring, gatewayRing, err := backend.GetDialAddresses(testutil.Logger(t), nil, tc.peerID)
if tc.expect.err != "" { if tc.expect.err != "" {
testutil.RequireErrorContains(t, err, tc.expect.err) testutil.RequireErrorContains(t, err, tc.expect.err)
return return
} }
require.Equal(t, tc.expect.haveGateways, haveGateways) require.Equal(t, len(tc.expect.gatewayAddrs) > 0, gatewayRing != nil)
require.NotNil(t, ring) require.NotNil(t, ring)
if len(tc.expect.gatewayAddrs) > 0 {
var addrs []string
gatewayRing.Do(func(value any) {
addr, ok := value.(string)
require.True(t, ok)
addrs = append(addrs, addr)
})
require.Equal(t, tc.expect.gatewayAddrs, addrs)
}
var addrs []string var addrs []string
ring.Do(func(value any) { ring.Do(func(value any) {
addr, ok := value.(string) addr, ok := value.(string)
@ -275,8 +286,7 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) {
}, },
peerID: dialerPeerID, peerID: dialerPeerID,
expect: expectation{ expect: expectation{
haveGateways: false, addrs: []string{"5.6.7.8:8502"},
addrs: []string{"5.6.7.8:8502"},
}, },
}, },
{ {
@ -294,8 +304,7 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) {
}, },
peerID: dialerPeerID, peerID: dialerPeerID,
expect: expectation{ expect: expectation{
haveGateways: false, addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"},
addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"},
}, },
}, },
{ {
@ -309,8 +318,7 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) {
}, },
peerID: dialerPeerID, peerID: dialerPeerID,
expect: expectation{ expect: expectation{
haveGateways: false, addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"},
addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"},
}, },
}, },
{ {
@ -326,8 +334,6 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) {
}, },
peerID: dialerPeerID, peerID: dialerPeerID,
expect: expectation{ expect: expectation{
haveGateways: false,
// Fall back to remote server addresses // Fall back to remote server addresses
addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"}, addrs: []string{"1.2.3.4:8502", "2.3.4.5:8503"},
}, },
@ -372,10 +378,9 @@ func TestPeeringBackend_GetDialAddresses(t *testing.T) {
}, },
peerID: dialerPeerID, peerID: dialerPeerID,
expect: expectation{ expect: expectation{
haveGateways: true,
// Gateways come first, and we use their LAN addresses since this is for outbound communication. // Gateways come first, and we use their LAN addresses since this is for outbound communication.
addrs: []string{"6.7.8.9:8443", "5.6.7.8:8443", "1.2.3.4:8502", "2.3.4.5:8503"}, addrs: []string{"5.6.7.8:8443", "6.7.8.9:8443", "1.2.3.4:8502", "2.3.4.5:8503"},
gatewayAddrs: []string{"5.6.7.8:8443", "6.7.8.9:8443"},
}, },
}, },
{ {

View File

@ -9,7 +9,6 @@ import (
"time" "time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
@ -18,6 +17,8 @@ import (
grpcstatus "google.golang.org/grpc/status" grpcstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/acl/resolver" "github.com/hashicorp/consul/acl/resolver"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
@ -41,10 +42,10 @@ var (
const ( const (
// meshGatewayWait is the initial wait on calls to exchange a secret with a peer when dialing through a gateway. // meshGatewayWait is the initial wait on calls to exchange a secret with a peer when dialing through a gateway.
// This wait provides some time for the first gateway address to configure a route to the peer servers. // This wait provides some time for the first gateway address to configure a route to the peer servers.
// Why 350ms? That is roughly the p50 latency we observed in a scale test for proxy config propagation: // This study shows latency distribution https://www.hashicorp.com/cgsb.
// https://www.hashicorp.com/cgsb // With 1s we cover ~p96, then we initiate the 3-second retry loop.
meshGatewayWait = 350 * time.Millisecond meshGatewayWait = 1 * time.Second
establishmentTimeout = 5 * time.Second establishmentTimeout = 3 * time.Second
) )
// errPeeringInvalidServerAddress is returned when an establish request contains // errPeeringInvalidServerAddress is returned when an establish request contains
@ -140,10 +141,10 @@ type Backend interface {
DecodeToken([]byte) (*structs.PeeringToken, error) DecodeToken([]byte) (*structs.PeeringToken, error)
// GetDialAddresses returns: the addresses to cycle through when dialing a peer's servers, // GetDialAddresses returns: the addresses to cycle through when dialing a peer's servers,
// a boolean indicating whether mesh gateways are present, and an optional error. // an optional buffer of just gateway addresses, and an optional error.
// The resulting ring buffer is front-loaded with the local mesh gateway addresses if the local // The resulting ring buffer is front-loaded with the local mesh gateway addresses if the local
// datacenter is configured to dial through mesh gateways. // datacenter is configured to dial through mesh gateways.
GetDialAddresses(logger hclog.Logger, ws memdb.WatchSet, peerID string) (*ring.Ring, bool, error) GetDialAddresses(logger hclog.Logger, ws memdb.WatchSet, peerID string) (*ring.Ring, *ring.Ring, error)
EnterpriseCheckPartitions(partition string) error EnterpriseCheckPartitions(partition string) error
@ -516,11 +517,28 @@ func (s *Server) exchangeSecret(ctx context.Context, peering *pbpeering.Peering,
return nil, fmt.Errorf("failed to build TLS dial option from peering: %w", err) return nil, fmt.Errorf("failed to build TLS dial option from peering: %w", err)
} }
ringBuf, usingGateways, err := s.Backend.GetDialAddresses(s.Logger, nil, peering.ID) allAddrs, gatewayAddrs, err := s.Backend.GetDialAddresses(s.Logger, nil, peering.ID)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get addresses to dial peer: %w", err) return nil, fmt.Errorf("failed to get addresses to dial peer: %w", err)
} }
if gatewayAddrs != nil {
// If we are dialing through local gateways we sleep before issuing the first request.
// This gives the local gateways some time to configure a route to the peer servers.
time.Sleep(meshGatewayWait)
// Exclusively try
resp, _ := retryExchange(ctx, &req, gatewayAddrs, tlsOption)
if resp != nil {
return resp, nil
}
}
return retryExchange(ctx, &req, allAddrs, tlsOption)
}
// retryExchange attempts a secret exchange in a retry loop, taking a new address from the ring buffer on each iteration
func retryExchange(ctx context.Context, req *pbpeerstream.ExchangeSecretRequest, ringBuf *ring.Ring, tlsOption grpc.DialOption) (*pbpeerstream.ExchangeSecretResponse, error) {
var ( var (
resp *pbpeerstream.ExchangeSecretResponse resp *pbpeerstream.ExchangeSecretResponse
dialErrors error dialErrors error
@ -529,12 +547,6 @@ func (s *Server) exchangeSecret(ctx context.Context, peering *pbpeering.Peering,
retryWait := 150 * time.Millisecond retryWait := 150 * time.Millisecond
jitter := retry.NewJitter(25) jitter := retry.NewJitter(25)
if usingGateways {
// If we are dialing through local gateways we sleep before issuing the first request.
// This gives the local gateways some time to configure a route to the peer servers.
time.Sleep(meshGatewayWait)
}
retryCtx, cancel := context.WithTimeout(ctx, establishmentTimeout) retryCtx, cancel := context.WithTimeout(ctx, establishmentTimeout)
defer cancel() defer cancel()
@ -553,7 +565,7 @@ func (s *Server) exchangeSecret(ctx context.Context, peering *pbpeering.Peering,
defer conn.Close() defer conn.Close()
client := pbpeerstream.NewPeerStreamServiceClient(conn) client := pbpeerstream.NewPeerStreamServiceClient(conn)
resp, err = client.ExchangeSecret(ctx, &req) resp, err = client.ExchangeSecret(ctx, req)
// If we got a permission denied error that means out establishment secret is invalid, so we do not retry. // If we got a permission denied error that means out establishment secret is invalid, so we do not retry.
grpcErr, ok := grpcstatus.FromError(err) grpcErr, ok := grpcstatus.FromError(err)

View File

@ -514,7 +514,7 @@ func TestPeeringService_Establish_ThroughMeshGateway(t *testing.T) {
require.Error(t, err) require.Error(t, err)
testutil.RequireErrorContains(t, err, "connection refused") testutil.RequireErrorContains(t, err, "connection refused")
require.Greater(t, time.Since(start), 5*time.Second) require.Greater(t, time.Since(start), 3*time.Second)
}) })
testutil.RunStep(t, "peering can be established from token", func(t *testing.T) { testutil.RunStep(t, "peering can be established from token", func(t *testing.T) {
@ -528,7 +528,7 @@ func TestPeeringService_Establish_ThroughMeshGateway(t *testing.T) {
// Capture peering token for re-use later // Capture peering token for re-use later
peeringToken = tokenResp.PeeringToken peeringToken = tokenResp.PeeringToken
// The context timeout is short, it checks that we do not wait the 350ms that we do when peering through mesh gateways // The context timeout is short, it checks that we do not wait the 1s that we do when peering through mesh gateways
ctx, cancel = context.WithTimeout(context.Background(), 300*time.Millisecond) ctx, cancel = context.WithTimeout(context.Background(), 300*time.Millisecond)
t.Cleanup(cancel) t.Cleanup(cancel)
@ -585,7 +585,7 @@ func TestPeeringService_Establish_ThroughMeshGateway(t *testing.T) {
}, },
})) }))
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) ctx, cancel = context.WithTimeout(context.Background(), 6*time.Second)
t.Cleanup(cancel) t.Cleanup(cancel)
// Call to establish should succeed when we fall back to remote server address. // Call to establish should succeed when we fall back to remote server address.
@ -630,7 +630,8 @@ func TestPeeringService_Establish_ThroughMeshGateway(t *testing.T) {
}, },
})) }))
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) // Context is 1s sleep + 3s retry loop. Any longer and we're trying the remote gateway
ctx, cancel = context.WithTimeout(context.Background(), 4*time.Second)
t.Cleanup(cancel) t.Cleanup(cancel)
start := time.Now() start := time.Now()
@ -642,8 +643,8 @@ func TestPeeringService_Establish_ThroughMeshGateway(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
// Dialing through a gateway is preceded by a mandatory 350ms sleep. // Dialing through a gateway is preceded by a mandatory 1s sleep.
require.Greater(t, time.Since(start), 350*time.Millisecond) require.Greater(t, time.Since(start), 1*time.Second)
// target.called is true when the tcproxy's conn handler was invoked. // target.called is true when the tcproxy's conn handler was invoked.
// This lets us know that the "Establish" success flowed through the proxy masquerading as a gateway. // This lets us know that the "Establish" success flowed through the proxy masquerading as a gateway.