Finish up cluster peering failover (#14396)

pull/14399/head
Eric Haberkorn 2022-08-30 11:46:34 -04:00 committed by GitHub
parent 70bb6a2abd
commit 3726a0ab7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 1298 additions and 184 deletions

3
.changelog/14396.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:feature
peering: Add support to failover to services running on cluster peers.
```

View File

@ -280,16 +280,6 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s
}
snap.Roots = roots
case strings.HasPrefix(u.CorrelationID, peerTrustBundleIDPrefix):
resp, ok := u.Result.(*pbpeering.TrustBundleReadResponse)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
peer := strings.TrimPrefix(u.CorrelationID, peerTrustBundleIDPrefix)
if resp.Bundle != nil {
snap.ConnectProxy.UpstreamPeerTrustBundles.Set(peer, resp.Bundle)
}
case u.CorrelationID == peeringTrustBundlesWatchID:
resp, ok := u.Result.(*pbpeering.TrustBundleListByServiceResponse)
if !ok {
@ -369,6 +359,17 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s
// Clean up data
//
peeredChainTargets := make(map[UpstreamID]struct{})
for _, discoChain := range snap.ConnectProxy.DiscoveryChain {
for _, target := range discoChain.Targets {
if target.Peer == "" {
continue
}
uid := NewUpstreamIDFromTargetID(target.ID)
peeredChainTargets[uid] = struct{}{}
}
}
validPeerNames := make(map[string]struct{})
// Iterate through all known endpoints and remove references to upstream IDs that weren't in the update
@ -383,6 +384,11 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s
validPeerNames[uid.Peer] = struct{}{}
return true
}
// Peered upstream came from a discovery chain target
if _, ok := peeredChainTargets[uid]; ok {
validPeerNames[uid.Peer] = struct{}{}
return true
}
snap.ConnectProxy.PeerUpstreamEndpoints.CancelWatch(uid)
return true
})
@ -463,8 +469,14 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s
continue
}
if _, ok := seenUpstreams[uid]; !ok {
for _, cancelFn := range targets {
for targetID, cancelFn := range targets {
cancelFn()
targetUID := NewUpstreamIDFromTargetID(targetID)
if targetUID.Peer != "" {
snap.ConnectProxy.PeerUpstreamEndpoints.CancelWatch(targetUID)
snap.ConnectProxy.UpstreamPeerTrustBundles.CancelWatch(targetUID.Peer)
}
}
delete(snap.ConnectProxy.WatchedUpstreams, uid)
}

View File

@ -5,7 +5,9 @@ import (
"fmt"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbpeering"
)
type handlerIngressGateway struct {
@ -66,6 +68,9 @@ func (s *handlerIngressGateway) initialize(ctx context.Context) (ConfigSnapshot,
snap.IngressGateway.WatchedGateways = make(map[UpstreamID]map[string]context.CancelFunc)
snap.IngressGateway.WatchedGatewayEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes)
snap.IngressGateway.Listeners = make(map[IngressListenerKey]structs.IngressListener)
snap.IngressGateway.UpstreamPeerTrustBundles = watch.NewMap[string, *pbpeering.PeeringTrustBundle]()
snap.IngressGateway.PeerUpstreamEndpoints = watch.NewMap[UpstreamID, structs.CheckServiceNodes]()
snap.IngressGateway.PeerUpstreamEndpointsUseHostnames = make(map[UpstreamID]struct{})
return snap, nil
}
@ -152,6 +157,12 @@ func (s *handlerIngressGateway) handleUpdate(ctx context.Context, u UpdateEvent,
delete(snap.IngressGateway.WatchedUpstreams[uid], targetID)
delete(snap.IngressGateway.WatchedUpstreamEndpoints[uid], targetID)
cancelUpstreamFn()
targetUID := NewUpstreamIDFromTargetID(targetID)
if targetUID.Peer != "" {
snap.IngressGateway.PeerUpstreamEndpoints.CancelWatch(targetUID)
snap.IngressGateway.UpstreamPeerTrustBundles.CancelWatch(targetUID.Peer)
}
}
cancelFn()

View File

@ -814,6 +814,18 @@ func (s *ConfigSnapshot) MeshConfigTLSOutgoing() *structs.MeshDirectionalTLSConf
return mesh.TLS.Outgoing
}
func (s *ConfigSnapshot) ToConfigSnapshotUpstreams() (*ConfigSnapshotUpstreams, error) {
switch s.Kind {
case structs.ServiceKindConnectProxy:
return &s.ConnectProxy.ConfigSnapshotUpstreams, nil
case structs.ServiceKindIngressGateway:
return &s.IngressGateway.ConfigSnapshotUpstreams, nil
default:
// This is a coherence check and should never fail
return nil, fmt.Errorf("No upstream snapshot for gateway mode %q", s.Kind)
}
}
func (u *ConfigSnapshotUpstreams) UpstreamPeerMeta(uid UpstreamID) structs.PeeringServiceMeta {
nodes, _ := u.PeerUpstreamEndpoints.Get(uid)
if len(nodes) == 0 {

View File

@ -493,6 +493,11 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Mode: structs.MeshGatewayModeNone,
},
},
structs.Upstream{
DestinationType: structs.UpstreamDestTypeService,
DestinationName: "api-failover-to-peer",
LocalBindPort: 10007,
},
structs.Upstream{
DestinationType: structs.UpstreamDestTypeService,
DestinationName: "api-dc2",
@ -552,6 +557,16 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Mode: structs.MeshGatewayModeNone,
},
}),
fmt.Sprintf("discovery-chain:%s-failover-to-peer", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "api-failover-to-peer",
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
EvaluateInPartition: "default",
Datacenter: "dc1",
OverrideMeshGateway: structs.MeshGatewayConfig{
Mode: meshGatewayProxyConfigValue,
},
}),
fmt.Sprintf("discovery-chain:%s-dc2", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "api-dc2",
EvaluateInDatacenter: "dc1",
@ -639,6 +654,26 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
Err: nil,
},
{
CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-to-peer", apiUID.String()),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-to-peer", "default", "default", "dc1", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = meshGatewayProxyConfigValue
}, &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "api-failover-to-peer",
Failover: map[string]structs.ServiceResolverFailover{
"*": {
Targets: []structs.ServiceResolverFailoverTarget{
{Peer: "cluster-01"},
},
},
},
}),
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid())
@ -646,15 +681,18 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.ConnectProxy.Leaf)
require.Len(t, snap.ConnectProxy.DiscoveryChain, 5, "%+v", snap.ConnectProxy.DiscoveryChain)
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 5, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 5, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Len(t, snap.ConnectProxy.WatchedGateways, 5, "%+v", snap.ConnectProxy.WatchedGateways)
require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 5, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints)
require.Len(t, snap.ConnectProxy.DiscoveryChain, 6, "%+v", snap.ConnectProxy.DiscoveryChain)
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 6, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 6, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Len(t, snap.ConnectProxy.WatchedGateways, 6, "%+v", snap.ConnectProxy.WatchedGateways)
require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 6, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints)
require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks)
require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints)
require.Equal(t, 1, snap.ConnectProxy.ConfigSnapshotUpstreams.PeerUpstreamEndpoints.Len())
require.Equal(t, 1, snap.ConnectProxy.ConfigSnapshotUpstreams.UpstreamPeerTrustBundles.Len())
require.True(t, snap.ConnectProxy.IntentionsSet)
require.Equal(t, ixnMatch, snap.ConnectProxy.Intentions)
require.True(t, snap.ConnectProxy.MeshConfigSet)
@ -667,6 +705,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
fmt.Sprintf("upstream-target:api-failover-remote.default.default.dc2:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyServiceSpecificRequest("api-failover-remote", "", "dc2", true),
fmt.Sprintf("upstream-target:api-failover-local.default.default.dc2:%s-failover-local?dc=dc2", apiUID.String()): genVerifyServiceSpecificRequest("api-failover-local", "", "dc2", true),
fmt.Sprintf("upstream-target:api-failover-direct.default.default.dc2:%s-failover-direct?dc=dc2", apiUID.String()): genVerifyServiceSpecificRequest("api-failover-direct", "", "dc2", true),
upstreamPeerWatchIDPrefix + fmt.Sprintf("%s-failover-to-peer?peer=cluster-01", apiUID.String()): genVerifyServiceSpecificPeeredRequest("api-failover-to-peer", "", "", "cluster-01", true),
fmt.Sprintf("mesh-gateway:dc2:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyGatewayWatch("dc2"),
fmt.Sprintf("mesh-gateway:dc1:%s-failover-local?dc=dc2", apiUID.String()): genVerifyGatewayWatch("dc1"),
},
@ -676,15 +715,18 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.ConnectProxy.Leaf)
require.Len(t, snap.ConnectProxy.DiscoveryChain, 5, "%+v", snap.ConnectProxy.DiscoveryChain)
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 5, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 5, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Len(t, snap.ConnectProxy.WatchedGateways, 5, "%+v", snap.ConnectProxy.WatchedGateways)
require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 5, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints)
require.Len(t, snap.ConnectProxy.DiscoveryChain, 6, "%+v", snap.ConnectProxy.DiscoveryChain)
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 6, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 6, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Len(t, snap.ConnectProxy.WatchedGateways, 6, "%+v", snap.ConnectProxy.WatchedGateways)
require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 6, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints)
require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks)
require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints)
require.Equal(t, 1, snap.ConnectProxy.ConfigSnapshotUpstreams.PeerUpstreamEndpoints.Len())
require.Equal(t, 1, snap.ConnectProxy.ConfigSnapshotUpstreams.UpstreamPeerTrustBundles.Len())
require.True(t, snap.ConnectProxy.IntentionsSet)
require.Equal(t, ixnMatch, snap.ConnectProxy.Intentions)
},

View File

@ -280,6 +280,31 @@ func TestUpstreamNodesDC2(t testing.T) structs.CheckServiceNodes {
}
}
func TestUpstreamNodesPeerCluster01(t testing.T) structs.CheckServiceNodes {
peer := "cluster-01"
service := structs.TestNodeServiceWithNameInPeer(t, "web", peer)
return structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: &structs.Node{
ID: "test1",
Node: "test1",
Address: "10.40.1.1",
PeerName: peer,
},
Service: service,
},
structs.CheckServiceNode{
Node: &structs.Node{
ID: "test2",
Node: "test2",
Address: "10.40.1.2",
PeerName: peer,
},
Service: service,
},
}
}
func TestUpstreamNodesInStatusDC2(t testing.T, status string) structs.CheckServiceNodes {
return structs.CheckServiceNodes{
structs.CheckServiceNode{

View File

@ -8,6 +8,7 @@ import (
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbpeering"
)
func setupTestVariationConfigEntriesAndSnapshot(
@ -72,6 +73,24 @@ func setupTestVariationConfigEntriesAndSnapshot(
Nodes: TestGatewayNodesDC2(t),
},
})
case "failover-to-cluster-peer":
events = append(events, UpdateEvent{
CorrelationID: "peer-trust-bundle:cluster-01",
Result: &pbpeering.TrustBundleReadResponse{
Bundle: &pbpeering.PeeringTrustBundle{
PeerName: "peer1",
TrustDomain: "peer1.domain",
ExportedPartition: "peer1ap",
RootPEMs: []string{"peer1-root-1"},
},
},
})
events = append(events, UpdateEvent{
CorrelationID: "upstream-peer:db?peer=cluster-01",
Result: &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodesPeerCluster01(t),
},
})
case "failover-through-double-remote-gateway-triggered":
events = append(events, UpdateEvent{
CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(),
@ -255,6 +274,21 @@ func setupTestVariationDiscoveryChain(
},
},
)
case "failover-to-cluster-peer":
entries = append(entries,
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "db",
ConnectTimeout: 33 * time.Second,
Failover: map[string]structs.ServiceResolverFailover{
"*": {
Targets: []structs.ServiceResolverFailoverTarget{
{Peer: "cluster-01"},
},
},
},
},
)
case "failover-through-double-remote-gateway-triggered":
fallthrough
case "failover-through-double-remote-gateway":

View File

@ -9,7 +9,9 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/hashicorp/consul/acl"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbpeering"
)
type handlerUpstreams struct {
@ -21,9 +23,10 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEv
return fmt.Errorf("error filling agent cache: %v", u.Err)
}
upstreamsSnapshot := &snap.ConnectProxy.ConfigSnapshotUpstreams
if snap.Kind == structs.ServiceKindIngressGateway {
upstreamsSnapshot = &snap.IngressGateway.ConfigSnapshotUpstreams
upstreamsSnapshot, err := snap.ToConfigSnapshotUpstreams()
if err != nil {
return err
}
switch {
@ -98,19 +101,16 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEv
uid := UpstreamIDFromString(uidString)
filteredNodes := hostnameEndpoints(
s.logger,
GatewayKey{ /*empty so it never matches*/ },
resp.Nodes,
)
if len(filteredNodes) > 0 {
if set := upstreamsSnapshot.PeerUpstreamEndpoints.Set(uid, filteredNodes); set {
upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames[uid] = struct{}{}
}
} else {
if set := upstreamsSnapshot.PeerUpstreamEndpoints.Set(uid, resp.Nodes); set {
delete(upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames, uid)
}
s.setPeerEndpoints(upstreamsSnapshot, uid, resp.Nodes)
case strings.HasPrefix(u.CorrelationID, peerTrustBundleIDPrefix):
resp, ok := u.Result.(*pbpeering.TrustBundleReadResponse)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
peer := strings.TrimPrefix(u.CorrelationID, peerTrustBundleIDPrefix)
if resp.Bundle != nil {
upstreamsSnapshot.UpstreamPeerTrustBundles.Set(peer, resp.Bundle)
}
case strings.HasPrefix(u.CorrelationID, "upstream-target:"):
@ -216,6 +216,23 @@ func removeColonPrefix(s string) (string, string, bool) {
return s[0:idx], s[idx+1:], true
}
func (s *handlerUpstreams) setPeerEndpoints(upstreamsSnapshot *ConfigSnapshotUpstreams, uid UpstreamID, nodes structs.CheckServiceNodes) {
filteredNodes := hostnameEndpoints(
s.logger,
GatewayKey{ /*empty so it never matches*/ },
nodes,
)
if len(filteredNodes) > 0 {
if set := upstreamsSnapshot.PeerUpstreamEndpoints.Set(uid, filteredNodes); set {
upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames[uid] = struct{}{}
}
} else {
if set := upstreamsSnapshot.PeerUpstreamEndpoints.Set(uid, nodes); set {
delete(upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames, uid)
}
}
}
func (s *handlerUpstreams) resetWatchesFromChain(
ctx context.Context,
uid UpstreamID,
@ -255,6 +272,12 @@ func (s *handlerUpstreams) resetWatchesFromChain(
delete(snap.WatchedUpstreams[uid], targetID)
delete(snap.WatchedUpstreamEndpoints[uid], targetID)
cancelFn()
targetUID := NewUpstreamIDFromTargetID(targetID)
if targetUID.Peer != "" {
snap.PeerUpstreamEndpoints.CancelWatch(targetUID)
snap.UpstreamPeerTrustBundles.CancelWatch(targetUID.Peer)
}
}
var (
@ -274,6 +297,7 @@ func (s *handlerUpstreams) resetWatchesFromChain(
service: target.Service,
filter: target.Subset.Filter,
datacenter: target.Datacenter,
peer: target.Peer,
entMeta: target.GetEnterpriseMetadata(),
}
err := s.watchUpstreamTarget(ctx, snap, opts)
@ -384,6 +408,7 @@ type targetWatchOpts struct {
service string
filter string
datacenter string
peer string
entMeta *acl.EnterpriseMeta
}
@ -397,11 +422,17 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config
var finalMeta acl.EnterpriseMeta
finalMeta.Merge(opts.entMeta)
correlationID := "upstream-target:" + opts.chainID + ":" + opts.upstreamID.String()
uid := opts.upstreamID
correlationID := "upstream-target:" + opts.chainID + ":" + uid.String()
if opts.peer != "" {
uid = NewUpstreamIDFromTargetID(opts.chainID)
correlationID = upstreamPeerWatchIDPrefix + uid.String()
}
ctx, cancel := context.WithCancel(ctx)
err := s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{
PeerName: opts.upstreamID.Peer,
PeerName: opts.peer,
Datacenter: opts.datacenter,
QueryOptions: structs.QueryOptions{
Token: s.token,
@ -422,6 +453,31 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config
}
snap.WatchedUpstreams[opts.upstreamID][opts.chainID] = cancel
if uid.Peer == "" {
return nil
}
if ok := snap.PeerUpstreamEndpoints.IsWatched(uid); !ok {
snap.PeerUpstreamEndpoints.InitWatch(uid, cancel)
}
// Check whether a watch for this peer exists to avoid duplicates.
if ok := snap.UpstreamPeerTrustBundles.IsWatched(uid.Peer); !ok {
peerCtx, cancel := context.WithCancel(ctx)
if err := s.dataSources.TrustBundle.Notify(peerCtx, &cachetype.TrustBundleReadRequest{
Request: &pbpeering.TrustBundleReadRequest{
Name: uid.Peer,
Partition: uid.PartitionOrDefault(),
},
QueryOptions: structs.QueryOptions{Token: s.token},
}, peerTrustBundleIDPrefix+uid.Peer, s.ch); err != nil {
cancel()
return fmt.Errorf("error while watching trust bundle for peer %q: %w", uid.Peer, err)
}
snap.UpstreamPeerTrustBundles.InitWatch(uid.Peer, cancel)
}
return nil
}

View File

@ -53,6 +53,28 @@ func TestNodeServiceWithName(t testing.T, name string) *NodeService {
}
}
const peerTrustDomain = "1c053652-8512-4373-90cf-5a7f6263a994.consul"
func TestNodeServiceWithNameInPeer(t testing.T, name string, peer string) *NodeService {
service := "payments"
return &NodeService{
Kind: ServiceKindTypical,
Service: name,
Port: 8080,
Connect: ServiceConnect{
PeerMeta: &PeeringServiceMeta{
SNI: []string{
service + ".default.default." + peer + ".external." + peerTrustDomain,
},
SpiffeID: []string{
"spiffe://" + peerTrustDomain + "/ns/default/dc/" + peer + "-dc/svc/" + service,
},
Protocol: "tcp",
},
},
}
}
// TestNodeServiceProxy returns a *NodeService representing a valid
// Connect proxy.
func TestNodeServiceProxy(t testing.T) *NodeService {

View File

@ -88,29 +88,26 @@ func (s *ResourceGenerator) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.C
clusters = append(clusters, passthroughs...)
}
// NOTE: Any time we skip a chain below we MUST also skip that discovery chain in endpoints.go
// so that the sets of endpoints generated matches the sets of clusters.
for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
getUpstream := func(uid proxycfg.UpstreamID) (*structs.Upstream, bool) {
upstream := cfgSnap.ConnectProxy.UpstreamConfig[uid]
explicit := upstream.HasLocalPortOrSocket()
implicit := cfgSnap.ConnectProxy.IsImplicitUpstream(uid)
if !implicit && !explicit {
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
continue
}
return upstream, !implicit && !explicit
}
chainEndpoints, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[uid]
if !ok {
// this should not happen
return nil, fmt.Errorf("no endpoint map for upstream %q", uid)
// NOTE: Any time we skip a chain below we MUST also skip that discovery chain in endpoints.go
// so that the sets of endpoints generated matches the sets of clusters.
for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
upstream, skip := getUpstream(uid)
if skip {
continue
}
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(
uid,
upstream,
chain,
chainEndpoints,
cfgSnap,
false,
)
@ -127,18 +124,15 @@ func (s *ResourceGenerator) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.C
// upstream in endpoints.go so that the sets of endpoints generated matches
// the sets of clusters.
for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() {
upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid]
explicit := upstreamCfg.HasLocalPortOrSocket()
implicit := cfgSnap.ConnectProxy.IsImplicitUpstream(uid)
if !implicit && !explicit {
// Not associated with a known explicit or implicit upstream so it is skipped.
upstream, skip := getUpstream(uid)
if skip {
continue
}
peerMeta := cfgSnap.ConnectProxy.UpstreamPeerMeta(uid)
cfg := s.getAndModifyUpstreamConfigForPeeredListener(uid, upstream, peerMeta)
upstreamCluster, err := s.makeUpstreamClusterForPeerService(uid, upstreamCfg, peerMeta, cfgSnap)
upstreamCluster, err := s.makeUpstreamClusterForPeerService(uid, cfg, peerMeta, cfgSnap)
if err != nil {
return nil, err
}
@ -652,17 +646,10 @@ func (s *ResourceGenerator) clustersFromSnapshotIngressGateway(cfgSnap *proxycfg
return nil, fmt.Errorf("no discovery chain for upstream %q", uid)
}
chainEndpoints, ok := cfgSnap.IngressGateway.WatchedUpstreamEndpoints[uid]
if !ok {
// this should not happen
return nil, fmt.Errorf("no endpoint map for upstream %q", uid)
}
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(
uid,
&u,
chain,
chainEndpoints,
cfgSnap,
false,
)
@ -745,7 +732,7 @@ func (s *ResourceGenerator) makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot, nam
func (s *ResourceGenerator) makeUpstreamClusterForPeerService(
uid proxycfg.UpstreamID,
upstream *structs.Upstream,
upstreamConfig structs.UpstreamConfig,
peerMeta structs.PeeringServiceMeta,
cfgSnap *proxycfg.ConfigSnapshot,
) (*envoy_cluster_v3.Cluster, error) {
@ -754,16 +741,21 @@ func (s *ResourceGenerator) makeUpstreamClusterForPeerService(
err error
)
cfg := s.getAndModifyUpstreamConfigForPeeredListener(uid, upstream, peerMeta)
if cfg.EnvoyClusterJSON != "" {
c, err = makeClusterFromUserConfig(cfg.EnvoyClusterJSON)
if upstreamConfig.EnvoyClusterJSON != "" {
c, err = makeClusterFromUserConfig(upstreamConfig.EnvoyClusterJSON)
if err != nil {
return c, err
}
// In the happy path don't return yet as we need to inject TLS config still.
}
tbs, ok := cfgSnap.ConnectProxy.UpstreamPeerTrustBundles.Get(uid.Peer)
upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams()
if err != nil {
return c, err
}
tbs, ok := upstreamsSnapshot.UpstreamPeerTrustBundles.Get(uid.Peer)
if !ok {
// this should never happen since we loop through upstreams with
// set trust bundles
@ -772,7 +764,7 @@ func (s *ResourceGenerator) makeUpstreamClusterForPeerService(
clusterName := generatePeeredClusterName(uid, tbs)
outlierDetection := ToOutlierDetection(cfg.PassiveHealthCheck)
outlierDetection := ToOutlierDetection(upstreamConfig.PassiveHealthCheck)
// We can't rely on health checks for services on cluster peers because they
// don't take into account service resolvers, splitters and routers. Setting
// MaxEjectionPercent too 100% gives outlier detection the power to eject the
@ -783,18 +775,18 @@ func (s *ResourceGenerator) makeUpstreamClusterForPeerService(
if c == nil {
c = &envoy_cluster_v3.Cluster{
Name: clusterName,
ConnectTimeout: durationpb.New(time.Duration(cfg.ConnectTimeoutMs) * time.Millisecond),
ConnectTimeout: durationpb.New(time.Duration(upstreamConfig.ConnectTimeoutMs) * time.Millisecond),
CommonLbConfig: &envoy_cluster_v3.Cluster_CommonLbConfig{
HealthyPanicThreshold: &envoy_type_v3.Percent{
Value: 0, // disable panic threshold
},
},
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(cfg.Limits),
Thresholds: makeThresholdsIfNeeded(upstreamConfig.Limits),
},
OutlierDetection: outlierDetection,
}
if cfg.Protocol == "http2" || cfg.Protocol == "grpc" {
if upstreamConfig.Protocol == "http2" || upstreamConfig.Protocol == "grpc" {
if err := s.setHttp2ProtocolOptions(c); err != nil {
return c, err
}
@ -828,12 +820,11 @@ func (s *ResourceGenerator) makeUpstreamClusterForPeerService(
false, /*onlyPassing*/
)
}
}
rootPEMs := cfgSnap.RootPEMs()
if uid.Peer != "" {
tbs, _ := cfgSnap.ConnectProxy.UpstreamPeerTrustBundles.Get(uid.Peer)
tbs, _ := upstreamsSnapshot.UpstreamPeerTrustBundles.Get(uid.Peer)
rootPEMs = tbs.ConcatenatedRootPEMs()
}
@ -968,7 +959,6 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
uid proxycfg.UpstreamID,
upstream *structs.Upstream,
chain *structs.CompiledDiscoveryChain,
chainEndpoints map[string]structs.CheckServiceNodes,
cfgSnap *proxycfg.ConfigSnapshot,
forMeshGateway bool,
) ([]*envoy_cluster_v3.Cluster, error) {
@ -985,7 +975,15 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
upstreamConfigMap = upstream.Config
}
cfg, err := structs.ParseUpstreamConfigNoDefaults(upstreamConfigMap)
upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams()
// Mesh gateways are exempt because upstreamsSnapshot is only used for
// cluster peering targets and transative failover/redirects are unsupported.
if err != nil && !forMeshGateway {
return nil, fmt.Errorf("No upstream snapshot for gateway mode %q", cfgSnap.Kind)
}
rawUpstreamConfig, err := structs.ParseUpstreamConfigNoDefaults(upstreamConfigMap)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
@ -993,13 +991,28 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
"error", err)
}
finalizeUpstreamConfig := func(cfg structs.UpstreamConfig, connectTimeout time.Duration) structs.UpstreamConfig {
if cfg.Protocol == "" {
cfg.Protocol = chain.Protocol
}
if cfg.Protocol == "" {
cfg.Protocol = "tcp"
}
if cfg.ConnectTimeoutMs == 0 {
cfg.ConnectTimeoutMs = int(connectTimeout / time.Millisecond)
}
return cfg
}
var escapeHatchCluster *envoy_cluster_v3.Cluster
if !forMeshGateway {
if cfg.EnvoyClusterJSON != "" {
if rawUpstreamConfig.EnvoyClusterJSON != "" {
if chain.Default {
// If you haven't done anything to setup the discovery chain, then
// you can use the envoy_cluster_json escape hatch.
escapeHatchCluster, err = makeClusterFromUserConfig(cfg.EnvoyClusterJSON)
escapeHatchCluster, err = makeClusterFromUserConfig(rawUpstreamConfig.EnvoyClusterJSON)
if err != nil {
return nil, err
}
@ -1013,14 +1026,20 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
var out []*envoy_cluster_v3.Cluster
for _, node := range chain.Nodes {
if node.Type != structs.DiscoveryGraphNodeTypeResolver {
switch {
case node == nil:
return nil, fmt.Errorf("impossible to process a nil node")
case node.Type != structs.DiscoveryGraphNodeTypeResolver:
continue
case node.Resolver == nil:
return nil, fmt.Errorf("impossible to process a non-resolver node")
}
failover := node.Resolver.Failover
// These variables are prefixed with primary to avoid shaddowing bugs.
primaryTargetID := node.Resolver.Target
primaryTarget := chain.Targets[primaryTargetID]
primaryClusterName := CustomizeClusterName(primaryTarget.Name, chain)
upstreamConfig := finalizeUpstreamConfig(rawUpstreamConfig, node.Resolver.ConnectTimeout)
if forMeshGateway {
primaryClusterName = meshGatewayExportedClusterNamePrefix + primaryClusterName
}
@ -1033,22 +1052,38 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
continue
}
type targetClusterOptions struct {
type targetClusterOption struct {
targetID string
clusterName string
}
// Construct the information required to make target clusters. When
// failover is configured, create the aggregate cluster.
var targetClustersOptions []targetClusterOptions
var targetClustersOptions []targetClusterOption
if failover != nil && !forMeshGateway {
var failoverClusterNames []string
for _, tid := range append([]string{primaryTargetID}, failover.Targets...) {
target := chain.Targets[tid]
clusterName := CustomizeClusterName(target.Name, chain)
clusterName := target.Name
targetUID := proxycfg.NewUpstreamIDFromTargetID(tid)
if targetUID.Peer != "" {
tbs, ok := upstreamsSnapshot.UpstreamPeerTrustBundles.Get(targetUID.Peer)
// We can't generate cluster on peers without the trust bundle. The
// trust bundle should be ready soon.
if !ok {
s.Logger.Debug("peer trust bundle not ready for discovery chain target",
"peer", targetUID.Peer,
"target", tid,
)
continue
}
clusterName = generatePeeredClusterName(targetUID, tbs)
}
clusterName = CustomizeClusterName(clusterName, chain)
clusterName = failoverClusterNamePrefix + clusterName
targetClustersOptions = append(targetClustersOptions, targetClusterOptions{
targetClustersOptions = append(targetClustersOptions, targetClusterOption{
targetID: tid,
clusterName: clusterName,
})
@ -1077,7 +1112,7 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
out = append(out, c)
} else {
targetClustersOptions = append(targetClustersOptions, targetClusterOptions{
targetClustersOptions = append(targetClustersOptions, targetClusterOption{
targetID: primaryTargetID,
clusterName: primaryClusterName,
})
@ -1096,11 +1131,20 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
Datacenter: target.Datacenter,
Service: target.Service,
}.URI().String()
if uid.Peer != "" {
return nil, fmt.Errorf("impossible to get a peer discovery chain")
targetUID := proxycfg.NewUpstreamIDFromTargetID(targetInfo.targetID)
s.Logger.Debug("generating cluster for", "cluster", targetInfo.clusterName)
if targetUID.Peer != "" {
peerMeta := upstreamsSnapshot.UpstreamPeerMeta(targetUID)
upstreamCluster, err := s.makeUpstreamClusterForPeerService(targetUID, upstreamConfig, peerMeta, cfgSnap)
if err != nil {
continue
}
// Override the cluster name to include the failover-target~ prefix.
upstreamCluster.Name = targetInfo.clusterName
out = append(out, upstreamCluster)
continue
}
s.Logger.Trace("generating cluster for", "cluster", targetInfo.clusterName)
c := &envoy_cluster_v3.Cluster{
Name: targetInfo.clusterName,
AltStatName: targetInfo.clusterName,
@ -1121,9 +1165,9 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
},
// TODO(peering): make circuit breakers or outlier detection work?
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(cfg.Limits),
Thresholds: makeThresholdsIfNeeded(upstreamConfig.Limits),
},
OutlierDetection: ToOutlierDetection(cfg.PassiveHealthCheck),
OutlierDetection: ToOutlierDetection(upstreamConfig.PassiveHealthCheck),
}
var lb *structs.LoadBalancer
@ -1134,19 +1178,7 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
return nil, fmt.Errorf("failed to apply load balancer configuration to cluster %q: %v", targetInfo.clusterName, err)
}
var proto string
if !forMeshGateway {
proto = cfg.Protocol
}
if proto == "" {
proto = chain.Protocol
}
if proto == "" {
proto = "tcp"
}
if proto == "http2" || proto == "grpc" {
if upstreamConfig.Protocol == "http2" || upstreamConfig.Protocol == "grpc" {
if err := s.setHttp2ProtocolOptions(c); err != nil {
return nil, err
}
@ -1155,7 +1187,7 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
configureTLS := true
if forMeshGateway {
// We only initiate TLS if we're doing an L7 proxy.
configureTLS = structs.IsProtocolHTTPLike(proto)
configureTLS = structs.IsProtocolHTTPLike(upstreamConfig.Protocol)
}
if configureTLS {
@ -1228,7 +1260,6 @@ func (s *ResourceGenerator) makeExportedUpstreamClustersForMeshGateway(cfgSnap *
proxycfg.NewUpstreamIDFromServiceName(svc),
nil,
chain,
nil,
cfgSnap,
true,
)

View File

@ -257,6 +257,12 @@ func TestClustersFromSnapshot(t *testing.T) {
return proxycfg.TestConfigSnapshotDiscoveryChain(t, "failover", nil, nil)
},
},
{
name: "connect-proxy-with-chain-and-failover-to-cluster-peer",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotDiscoveryChain(t, "failover-to-cluster-peer", nil, nil)
},
},
{
name: "connect-proxy-with-tcp-chain-failover-through-remote-gateway",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
@ -495,6 +501,13 @@ func TestClustersFromSnapshot(t *testing.T) {
"failover", nil, nil, nil)
},
},
{
name: "ingress-with-chain-and-failover-to-cluster-peer",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp",
"failover-to-cluster-peer", nil, nil, nil)
},
},
{
name: "ingress-with-tcp-chain-failover-through-remote-gateway",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {

View File

@ -50,14 +50,19 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
cfgSnap.ConnectProxy.PeerUpstreamEndpoints.Len()+
len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints))
// NOTE: Any time we skip a chain below we MUST also skip that discovery chain in clusters.go
// so that the sets of endpoints generated matches the sets of clusters.
for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
getUpstream := func(uid proxycfg.UpstreamID) (*structs.Upstream, bool) {
upstream := cfgSnap.ConnectProxy.UpstreamConfig[uid]
explicit := upstream.HasLocalPortOrSocket()
implicit := cfgSnap.ConnectProxy.IsImplicitUpstream(uid)
if !implicit && !explicit {
return upstream, !implicit && !explicit
}
// NOTE: Any time we skip a chain below we MUST also skip that discovery chain in clusters.go
// so that the sets of endpoints generated matches the sets of clusters.
for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
upstream, skip := getUpstream(uid)
if skip {
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
continue
}
@ -70,6 +75,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
es, err := s.endpointsFromDiscoveryChain(
uid,
chain,
cfgSnap,
cfgSnap.Locality,
upstreamConfigMap,
cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[uid],
@ -86,12 +92,9 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
// upstream in clusters.go so that the sets of endpoints generated matches
// the sets of clusters.
for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() {
upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid]
explicit := upstreamCfg.HasLocalPortOrSocket()
implicit := cfgSnap.ConnectProxy.IsImplicitUpstream(uid)
if !implicit && !explicit {
// Not associated with a known explicit or implicit upstream so it is skipped.
_, skip := getUpstream(uid)
if skip {
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
continue
}
@ -104,22 +107,14 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
clusterName := generatePeeredClusterName(uid, tbs)
// Also skip peer instances with a hostname as their address. EDS
// cannot resolve hostnames, so we provide them through CDS instead.
if _, ok := cfgSnap.ConnectProxy.PeerUpstreamEndpointsUseHostnames[uid]; ok {
continue
loadAssignment, err := s.makeUpstreamLoadAssignmentForPeerService(cfgSnap, clusterName, uid)
if err != nil {
return nil, err
}
endpoints, ok := cfgSnap.ConnectProxy.PeerUpstreamEndpoints.Get(uid)
if ok {
la := makeLoadAssignment(
clusterName,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
proxycfg.GatewayKey{ /*empty so it never matches*/ },
)
resources = append(resources, la)
if loadAssignment != nil {
resources = append(resources, loadAssignment)
}
}
@ -375,6 +370,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotIngressGateway(cfgSnap *proxycf
es, err := s.endpointsFromDiscoveryChain(
uid,
cfgSnap.IngressGateway.DiscoveryChain[uid],
cfgSnap,
proxycfg.GatewayKey{Datacenter: cfgSnap.Datacenter, Partition: u.DestinationPartition},
u.Config,
cfgSnap.IngressGateway.WatchedUpstreamEndpoints[uid],
@ -412,9 +408,38 @@ func makePipeEndpoint(path string) *envoy_endpoint_v3.LbEndpoint {
}
}
func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(cfgSnap *proxycfg.ConfigSnapshot, clusterName string, uid proxycfg.UpstreamID) (*envoy_endpoint_v3.ClusterLoadAssignment, error) {
var la *envoy_endpoint_v3.ClusterLoadAssignment
upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams()
if err != nil {
return la, err
}
// Also skip peer instances with a hostname as their address. EDS
// cannot resolve hostnames, so we provide them through CDS instead.
if _, ok := upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames[uid]; ok {
return la, nil
}
endpoints, ok := upstreamsSnapshot.PeerUpstreamEndpoints.Get(uid)
if !ok {
return nil, nil
}
la = makeLoadAssignment(
clusterName,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
proxycfg.GatewayKey{ /*empty so it never matches*/ },
)
return la, nil
}
func (s *ResourceGenerator) endpointsFromDiscoveryChain(
uid proxycfg.UpstreamID,
chain *structs.CompiledDiscoveryChain,
cfgSnap *proxycfg.ConfigSnapshot,
gatewayKey proxycfg.GatewayKey,
upstreamConfigMap map[string]interface{},
upstreamEndpoints map[string]structs.CheckServiceNodes,
@ -432,6 +457,14 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
upstreamConfigMap = make(map[string]interface{}) // TODO:needed?
}
upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams()
// Mesh gateways are exempt because upstreamsSnapshot is only used for
// cluster peering targets and transative failover/redirects are unsupported.
if err != nil && !forMeshGateway {
return nil, fmt.Errorf("No upstream snapshot for gateway mode %q", cfgSnap.Kind)
}
var resources []proto.Message
var escapeHatchCluster *envoy_cluster_v3.Cluster
@ -465,8 +498,15 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
if node.Type != structs.DiscoveryGraphNodeTypeResolver {
continue
}
primaryTargetID := node.Resolver.Target
failover := node.Resolver.Failover
type targetLoadAssignmentOption struct {
targetID string
clusterName string
}
var targetLoadAssignmentOptions []targetLoadAssignmentOption
var numFailoverTargets int
if failover != nil {
numFailoverTargets = len(failover.Targets)
@ -474,66 +514,84 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
clusterNamePrefix := ""
if numFailoverTargets > 0 && !forMeshGateway {
clusterNamePrefix = failoverClusterNamePrefix
for _, failTargetID := range failover.Targets {
target := chain.Targets[failTargetID]
endpointGroup, valid := makeLoadAssignmentEndpointGroup(
chain.Targets,
upstreamEndpoints,
gatewayEndpoints,
failTargetID,
gatewayKey,
forMeshGateway,
)
if !valid {
continue // skip the failover target if we're still populating the snapshot
}
for _, targetID := range append([]string{primaryTargetID}, failover.Targets...) {
target := chain.Targets[targetID]
clusterName := target.Name
targetUID := proxycfg.NewUpstreamIDFromTargetID(targetID)
if targetUID.Peer != "" {
tbs, ok := upstreamsSnapshot.UpstreamPeerTrustBundles.Get(targetUID.Peer)
// We can't generate cluster on peers without the trust bundle. The
// trust bundle should be ready soon.
if !ok {
s.Logger.Debug("peer trust bundle not ready for discovery chain target",
"peer", targetUID.Peer,
"target", targetID,
)
continue
}
clusterName := CustomizeClusterName(target.Name, chain)
clusterName = generatePeeredClusterName(targetUID, tbs)
}
clusterName = CustomizeClusterName(clusterName, chain)
clusterName = failoverClusterNamePrefix + clusterName
if escapeHatchCluster != nil {
clusterName = escapeHatchCluster.Name
}
s.Logger.Debug("generating endpoints for", "cluster", clusterName)
la := makeLoadAssignment(
clusterName,
[]loadAssignmentEndpointGroup{endpointGroup},
gatewayKey,
)
resources = append(resources, la)
targetLoadAssignmentOptions = append(targetLoadAssignmentOptions, targetLoadAssignmentOption{
targetID: targetID,
clusterName: clusterName,
})
}
}
targetID := node.Resolver.Target
target := chain.Targets[targetID]
clusterName := CustomizeClusterName(target.Name, chain)
clusterName = clusterNamePrefix + clusterName
if escapeHatchCluster != nil {
clusterName = escapeHatchCluster.Name
}
if forMeshGateway {
clusterName = meshGatewayExportedClusterNamePrefix + clusterName
}
s.Logger.Debug("generating endpoints for", "cluster", clusterName)
endpointGroup, valid := makeLoadAssignmentEndpointGroup(
chain.Targets,
upstreamEndpoints,
gatewayEndpoints,
targetID,
gatewayKey,
forMeshGateway,
)
if !valid {
continue // skip the cluster if we're still populating the snapshot
} else {
target := chain.Targets[primaryTargetID]
clusterName := CustomizeClusterName(target.Name, chain)
clusterName = clusterNamePrefix + clusterName
if escapeHatchCluster != nil {
clusterName = escapeHatchCluster.Name
}
if forMeshGateway {
clusterName = meshGatewayExportedClusterNamePrefix + clusterName
}
targetLoadAssignmentOptions = append(targetLoadAssignmentOptions, targetLoadAssignmentOption{
targetID: primaryTargetID,
clusterName: clusterName,
})
}
la := makeLoadAssignment(
clusterName,
[]loadAssignmentEndpointGroup{endpointGroup},
gatewayKey,
)
resources = append(resources, la)
for _, targetInfo := range targetLoadAssignmentOptions {
s.Logger.Debug("generating endpoints for", "cluster", targetInfo.clusterName)
targetUID := proxycfg.NewUpstreamIDFromTargetID(targetInfo.targetID)
if targetUID.Peer != "" {
loadAssignment, err := s.makeUpstreamLoadAssignmentForPeerService(cfgSnap, targetInfo.clusterName, targetUID)
if err != nil {
return nil, err
}
if loadAssignment != nil {
resources = append(resources, loadAssignment)
}
continue
}
endpointGroup, valid := makeLoadAssignmentEndpointGroup(
chain.Targets,
upstreamEndpoints,
gatewayEndpoints,
targetInfo.targetID,
gatewayKey,
forMeshGateway,
)
if !valid {
continue // skip the cluster if we're still populating the snapshot
}
la := makeLoadAssignment(
targetInfo.clusterName,
[]loadAssignmentEndpointGroup{endpointGroup},
gatewayKey,
)
resources = append(resources, la)
}
}
return resources, nil
@ -586,6 +644,7 @@ func (s *ResourceGenerator) makeExportedUpstreamEndpointsForMeshGateway(cfgSnap
clusterEndpoints, err := s.endpointsFromDiscoveryChain(
proxycfg.NewUpstreamIDFromServiceName(svc),
chain,
cfgSnap,
cfgSnap.Locality,
nil,
chainEndpoints,
@ -640,11 +699,12 @@ func makeLoadAssignment(clusterName string, endpointGroups []loadAssignmentEndpo
healthStatus = endpointGroup.OverrideHealth
}
endpoint := &envoy_endpoint_v3.Endpoint{
Address: makeAddress(addr, port),
}
es = append(es, &envoy_endpoint_v3.LbEndpoint{
HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
Endpoint: &envoy_endpoint_v3.Endpoint{
Address: makeAddress(addr, port),
},
Endpoint: endpoint,
},
HealthStatus: healthStatus,
LoadBalancingWeight: makeUint32Value(weight),

View File

@ -284,6 +284,12 @@ func TestEndpointsFromSnapshot(t *testing.T) {
return proxycfg.TestConfigSnapshotDiscoveryChain(t, "failover", nil, nil)
},
},
{
name: "connect-proxy-with-chain-and-failover-to-cluster-peer",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotDiscoveryChain(t, "failover-to-cluster-peer", nil, nil)
},
},
{
name: "connect-proxy-with-tcp-chain-failover-through-remote-gateway",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
@ -396,6 +402,13 @@ func TestEndpointsFromSnapshot(t *testing.T) {
"failover", nil, nil, nil)
},
},
{
name: "ingress-with-chain-and-failover-to-cluster-peer",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp",
"failover-to-cluster-peer", nil, nil, nil)
},
},
{
name: "ingress-with-tcp-chain-failover-through-remote-gateway",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {

View File

@ -0,0 +1,219 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"altStatName": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"clusterType": {
"name": "envoy.clusters.aggregate",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.clusters.aggregate.v3.ClusterConfig",
"clusters": [
"failover-target~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"failover-target~db.default.cluster-01.external.peer1.domain"
]
}
},
"connectTimeout": "33s",
"lbPolicy": "CLUSTER_PROVIDED"
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "failover-target~db.default.cluster-01.external.peer1.domain",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {
"ads": {
},
"resourceApiVersion": "V3"
}
},
"connectTimeout": "1s",
"circuitBreakers": {
},
"outlierDetection": {
"maxEjectionPercent": 100
},
"commonLbConfig": {
"healthyPanicThreshold": {
}
},
"transportSocket": {
"name": "tls",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext",
"commonTlsContext": {
"tlsParams": {
},
"tlsCertificates": [
{
"certificateChain": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICjDCCAjKgAwIBAgIIC5llxGV1gB8wCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowDjEMMAoG\nA1UEAxMDd2ViMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEADPv1RHVNRfa2VKR\nAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Favq5E0ivpNtv1QnFhxtPd7d5k4e+T7\nSkW1TaOCAXIwggFuMA4GA1UdDwEB/wQEAwIDuDAdBgNVHSUEFjAUBggrBgEFBQcD\nAgYIKwYBBQUHAwEwDAYDVR0TAQH/BAIwADBoBgNVHQ4EYQRfN2Q6MDc6ODc6M2E6\nNDA6MTk6NDc6YzM6NWE6YzA6YmE6NjI6ZGY6YWY6NGI6ZDQ6MDU6MjU6NzY6M2Q6\nNWE6OGQ6MTY6OGQ6Njc6NWU6MmU6YTA6MzQ6N2Q6ZGM6ZmYwagYDVR0jBGMwYYBf\nZDE6MTE6MTE6YWM6MmE6YmE6OTc6YjI6M2Y6YWM6N2I6YmQ6ZGE6YmU6YjE6OGE6\nZmM6OWE6YmE6YjU6YmM6ODM6ZTc6NWU6NDE6NmY6ZjI6NzM6OTU6NTg6MGM6ZGIw\nWQYDVR0RBFIwUIZOc3BpZmZlOi8vMTExMTExMTEtMjIyMi0zMzMzLTQ0NDQtNTU1\nNTU1NTU1NTU1LmNvbnN1bC9ucy9kZWZhdWx0L2RjL2RjMS9zdmMvd2ViMAoGCCqG\nSM49BAMCA0gAMEUCIGC3TTvvjj76KMrguVyFf4tjOqaSCRie3nmHMRNNRav7AiEA\npY0heYeK9A6iOLrzqxSerkXXQyj5e9bE4VgUnxgPU6g=\n-----END CERTIFICATE-----\n"
},
"privateKey": {
"inlineString": "-----BEGIN EC PRIVATE KEY-----\nMHcCAQEEIMoTkpRggp3fqZzFKh82yS4LjtJI+XY+qX/7DefHFrtdoAoGCCqGSM49\nAwEHoUQDQgAEADPv1RHVNRfa2VKRAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Fav\nq5E0ivpNtv1QnFhxtPd7d5k4e+T7SkW1TQ==\n-----END EC PRIVATE KEY-----\n"
}
}
],
"validationContext": {
"trustedCa": {
"inlineString": "peer1-root-1\n"
},
"matchSubjectAltNames": [
{
"exact": "spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cluster-01-dc/svc/payments"
}
]
}
},
"sni": "payments.default.default.cluster-01.external.1c053652-8512-4373-90cf-5a7f6263a994.consul"
}
}
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "failover-target~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"altStatName": "failover-target~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {
"ads": {
},
"resourceApiVersion": "V3"
}
},
"connectTimeout": "33s",
"circuitBreakers": {
},
"outlierDetection": {
},
"commonLbConfig": {
"healthyPanicThreshold": {
}
},
"transportSocket": {
"name": "tls",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext",
"commonTlsContext": {
"tlsParams": {
},
"tlsCertificates": [
{
"certificateChain": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICjDCCAjKgAwIBAgIIC5llxGV1gB8wCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowDjEMMAoG\nA1UEAxMDd2ViMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEADPv1RHVNRfa2VKR\nAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Favq5E0ivpNtv1QnFhxtPd7d5k4e+T7\nSkW1TaOCAXIwggFuMA4GA1UdDwEB/wQEAwIDuDAdBgNVHSUEFjAUBggrBgEFBQcD\nAgYIKwYBBQUHAwEwDAYDVR0TAQH/BAIwADBoBgNVHQ4EYQRfN2Q6MDc6ODc6M2E6\nNDA6MTk6NDc6YzM6NWE6YzA6YmE6NjI6ZGY6YWY6NGI6ZDQ6MDU6MjU6NzY6M2Q6\nNWE6OGQ6MTY6OGQ6Njc6NWU6MmU6YTA6MzQ6N2Q6ZGM6ZmYwagYDVR0jBGMwYYBf\nZDE6MTE6MTE6YWM6MmE6YmE6OTc6YjI6M2Y6YWM6N2I6YmQ6ZGE6YmU6YjE6OGE6\nZmM6OWE6YmE6YjU6YmM6ODM6ZTc6NWU6NDE6NmY6ZjI6NzM6OTU6NTg6MGM6ZGIw\nWQYDVR0RBFIwUIZOc3BpZmZlOi8vMTExMTExMTEtMjIyMi0zMzMzLTQ0NDQtNTU1\nNTU1NTU1NTU1LmNvbnN1bC9ucy9kZWZhdWx0L2RjL2RjMS9zdmMvd2ViMAoGCCqG\nSM49BAMCA0gAMEUCIGC3TTvvjj76KMrguVyFf4tjOqaSCRie3nmHMRNNRav7AiEA\npY0heYeK9A6iOLrzqxSerkXXQyj5e9bE4VgUnxgPU6g=\n-----END CERTIFICATE-----\n"
},
"privateKey": {
"inlineString": "-----BEGIN EC PRIVATE KEY-----\nMHcCAQEEIMoTkpRggp3fqZzFKh82yS4LjtJI+XY+qX/7DefHFrtdoAoGCCqGSM49\nAwEHoUQDQgAEADPv1RHVNRfa2VKRAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Fav\nq5E0ivpNtv1QnFhxtPd7d5k4e+T7SkW1TQ==\n-----END EC PRIVATE KEY-----\n"
}
}
],
"validationContext": {
"trustedCa": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICXDCCAgKgAwIBAgIICpZq70Z9LyUwCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowFDESMBAG\nA1UEAxMJVGVzdCBDQSAyMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEIhywH1gx\nAsMwuF3ukAI5YL2jFxH6Usnma1HFSfVyxbXX1/uoZEYrj8yCAtdU2yoHETyd+Zx2\nThhRLP79pYegCaOCATwwggE4MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8EBTAD\nAQH/MGgGA1UdDgRhBF9kMToxMToxMTphYzoyYTpiYTo5NzpiMjozZjphYzo3Yjpi\nZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1ZTo0MTo2ZjpmMjo3\nMzo5NTo1ODowYzpkYjBqBgNVHSMEYzBhgF9kMToxMToxMTphYzoyYTpiYTo5Nzpi\nMjozZjphYzo3YjpiZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1\nZTo0MTo2ZjpmMjo3Mzo5NTo1ODowYzpkYjA/BgNVHREEODA2hjRzcGlmZmU6Ly8x\nMTExMTExMS0yMjIyLTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsMAoGCCqG\nSM49BAMCA0gAMEUCICOY0i246rQHJt8o8Oya0D5PLL1FnmsQmQqIGCi31RwnAiEA\noR5f6Ku+cig2Il8T8LJujOp2/2A72QcHZA57B13y+8o=\n-----END CERTIFICATE-----\n"
},
"matchSubjectAltNames": [
{
"exact": "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/db"
}
]
}
},
"sni": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {
"ads": {
},
"resourceApiVersion": "V3"
}
},
"connectTimeout": "5s",
"circuitBreakers": {
},
"outlierDetection": {
},
"transportSocket": {
"name": "tls",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext",
"commonTlsContext": {
"tlsParams": {
},
"tlsCertificates": [
{
"certificateChain": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICjDCCAjKgAwIBAgIIC5llxGV1gB8wCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowDjEMMAoG\nA1UEAxMDd2ViMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEADPv1RHVNRfa2VKR\nAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Favq5E0ivpNtv1QnFhxtPd7d5k4e+T7\nSkW1TaOCAXIwggFuMA4GA1UdDwEB/wQEAwIDuDAdBgNVHSUEFjAUBggrBgEFBQcD\nAgYIKwYBBQUHAwEwDAYDVR0TAQH/BAIwADBoBgNVHQ4EYQRfN2Q6MDc6ODc6M2E6\nNDA6MTk6NDc6YzM6NWE6YzA6YmE6NjI6ZGY6YWY6NGI6ZDQ6MDU6MjU6NzY6M2Q6\nNWE6OGQ6MTY6OGQ6Njc6NWU6MmU6YTA6MzQ6N2Q6ZGM6ZmYwagYDVR0jBGMwYYBf\nZDE6MTE6MTE6YWM6MmE6YmE6OTc6YjI6M2Y6YWM6N2I6YmQ6ZGE6YmU6YjE6OGE6\nZmM6OWE6YmE6YjU6YmM6ODM6ZTc6NWU6NDE6NmY6ZjI6NzM6OTU6NTg6MGM6ZGIw\nWQYDVR0RBFIwUIZOc3BpZmZlOi8vMTExMTExMTEtMjIyMi0zMzMzLTQ0NDQtNTU1\nNTU1NTU1NTU1LmNvbnN1bC9ucy9kZWZhdWx0L2RjL2RjMS9zdmMvd2ViMAoGCCqG\nSM49BAMCA0gAMEUCIGC3TTvvjj76KMrguVyFf4tjOqaSCRie3nmHMRNNRav7AiEA\npY0heYeK9A6iOLrzqxSerkXXQyj5e9bE4VgUnxgPU6g=\n-----END CERTIFICATE-----\n"
},
"privateKey": {
"inlineString": "-----BEGIN EC PRIVATE KEY-----\nMHcCAQEEIMoTkpRggp3fqZzFKh82yS4LjtJI+XY+qX/7DefHFrtdoAoGCCqGSM49\nAwEHoUQDQgAEADPv1RHVNRfa2VKRAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Fav\nq5E0ivpNtv1QnFhxtPd7d5k4e+T7SkW1TQ==\n-----END EC PRIVATE KEY-----\n"
}
}
],
"validationContext": {
"trustedCa": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICXDCCAgKgAwIBAgIICpZq70Z9LyUwCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowFDESMBAG\nA1UEAxMJVGVzdCBDQSAyMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEIhywH1gx\nAsMwuF3ukAI5YL2jFxH6Usnma1HFSfVyxbXX1/uoZEYrj8yCAtdU2yoHETyd+Zx2\nThhRLP79pYegCaOCATwwggE4MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8EBTAD\nAQH/MGgGA1UdDgRhBF9kMToxMToxMTphYzoyYTpiYTo5NzpiMjozZjphYzo3Yjpi\nZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1ZTo0MTo2ZjpmMjo3\nMzo5NTo1ODowYzpkYjBqBgNVHSMEYzBhgF9kMToxMToxMTphYzoyYTpiYTo5Nzpi\nMjozZjphYzo3YjpiZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1\nZTo0MTo2ZjpmMjo3Mzo5NTo1ODowYzpkYjA/BgNVHREEODA2hjRzcGlmZmU6Ly8x\nMTExMTExMS0yMjIyLTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsMAoGCCqG\nSM49BAMCA0gAMEUCICOY0i246rQHJt8o8Oya0D5PLL1FnmsQmQqIGCi31RwnAiEA\noR5f6Ku+cig2Il8T8LJujOp2/2A72QcHZA57B13y+8o=\n-----END CERTIFICATE-----\n"
},
"matchSubjectAltNames": [
{
"exact": "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/geo-cache-target"
},
{
"exact": "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc2/svc/geo-cache-target"
}
]
}
},
"sni": "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul"
}
}
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "local_app",
"type": "STATIC",
"connectTimeout": "5s",
"loadAssignment": {
"clusterName": "local_app",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "127.0.0.1",
"portValue": 8080
}
}
}
}
]
}
]
}
}
],
"typeUrl": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"nonce": "00000001"
}

View File

@ -0,0 +1,139 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"altStatName": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"clusterType": {
"name": "envoy.clusters.aggregate",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.clusters.aggregate.v3.ClusterConfig",
"clusters": [
"failover-target~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"failover-target~db.default.cluster-01.external.peer1.domain"
]
}
},
"connectTimeout": "33s",
"lbPolicy": "CLUSTER_PROVIDED"
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "failover-target~db.default.cluster-01.external.peer1.domain",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {
"ads": {
},
"resourceApiVersion": "V3"
}
},
"connectTimeout": "33s",
"circuitBreakers": {
},
"outlierDetection": {
"maxEjectionPercent": 100
},
"commonLbConfig": {
"healthyPanicThreshold": {
}
},
"transportSocket": {
"name": "tls",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext",
"commonTlsContext": {
"tlsParams": {
},
"tlsCertificates": [
{
"certificateChain": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICjDCCAjKgAwIBAgIIC5llxGV1gB8wCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowDjEMMAoG\nA1UEAxMDd2ViMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEADPv1RHVNRfa2VKR\nAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Favq5E0ivpNtv1QnFhxtPd7d5k4e+T7\nSkW1TaOCAXIwggFuMA4GA1UdDwEB/wQEAwIDuDAdBgNVHSUEFjAUBggrBgEFBQcD\nAgYIKwYBBQUHAwEwDAYDVR0TAQH/BAIwADBoBgNVHQ4EYQRfN2Q6MDc6ODc6M2E6\nNDA6MTk6NDc6YzM6NWE6YzA6YmE6NjI6ZGY6YWY6NGI6ZDQ6MDU6MjU6NzY6M2Q6\nNWE6OGQ6MTY6OGQ6Njc6NWU6MmU6YTA6MzQ6N2Q6ZGM6ZmYwagYDVR0jBGMwYYBf\nZDE6MTE6MTE6YWM6MmE6YmE6OTc6YjI6M2Y6YWM6N2I6YmQ6ZGE6YmU6YjE6OGE6\nZmM6OWE6YmE6YjU6YmM6ODM6ZTc6NWU6NDE6NmY6ZjI6NzM6OTU6NTg6MGM6ZGIw\nWQYDVR0RBFIwUIZOc3BpZmZlOi8vMTExMTExMTEtMjIyMi0zMzMzLTQ0NDQtNTU1\nNTU1NTU1NTU1LmNvbnN1bC9ucy9kZWZhdWx0L2RjL2RjMS9zdmMvd2ViMAoGCCqG\nSM49BAMCA0gAMEUCIGC3TTvvjj76KMrguVyFf4tjOqaSCRie3nmHMRNNRav7AiEA\npY0heYeK9A6iOLrzqxSerkXXQyj5e9bE4VgUnxgPU6g=\n-----END CERTIFICATE-----\n"
},
"privateKey": {
"inlineString": "-----BEGIN EC PRIVATE KEY-----\nMHcCAQEEIMoTkpRggp3fqZzFKh82yS4LjtJI+XY+qX/7DefHFrtdoAoGCCqGSM49\nAwEHoUQDQgAEADPv1RHVNRfa2VKRAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Fav\nq5E0ivpNtv1QnFhxtPd7d5k4e+T7SkW1TQ==\n-----END EC PRIVATE KEY-----\n"
}
}
],
"validationContext": {
"trustedCa": {
"inlineString": "peer1-root-1\n"
},
"matchSubjectAltNames": [
{
"exact": "spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cluster-01-dc/svc/payments"
}
]
}
},
"sni": "payments.default.default.cluster-01.external.1c053652-8512-4373-90cf-5a7f6263a994.consul"
}
}
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "failover-target~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"altStatName": "failover-target~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {
"ads": {
},
"resourceApiVersion": "V3"
}
},
"connectTimeout": "33s",
"circuitBreakers": {
},
"outlierDetection": {
},
"commonLbConfig": {
"healthyPanicThreshold": {
}
},
"transportSocket": {
"name": "tls",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext",
"commonTlsContext": {
"tlsParams": {
},
"tlsCertificates": [
{
"certificateChain": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICjDCCAjKgAwIBAgIIC5llxGV1gB8wCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowDjEMMAoG\nA1UEAxMDd2ViMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEADPv1RHVNRfa2VKR\nAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Favq5E0ivpNtv1QnFhxtPd7d5k4e+T7\nSkW1TaOCAXIwggFuMA4GA1UdDwEB/wQEAwIDuDAdBgNVHSUEFjAUBggrBgEFBQcD\nAgYIKwYBBQUHAwEwDAYDVR0TAQH/BAIwADBoBgNVHQ4EYQRfN2Q6MDc6ODc6M2E6\nNDA6MTk6NDc6YzM6NWE6YzA6YmE6NjI6ZGY6YWY6NGI6ZDQ6MDU6MjU6NzY6M2Q6\nNWE6OGQ6MTY6OGQ6Njc6NWU6MmU6YTA6MzQ6N2Q6ZGM6ZmYwagYDVR0jBGMwYYBf\nZDE6MTE6MTE6YWM6MmE6YmE6OTc6YjI6M2Y6YWM6N2I6YmQ6ZGE6YmU6YjE6OGE6\nZmM6OWE6YmE6YjU6YmM6ODM6ZTc6NWU6NDE6NmY6ZjI6NzM6OTU6NTg6MGM6ZGIw\nWQYDVR0RBFIwUIZOc3BpZmZlOi8vMTExMTExMTEtMjIyMi0zMzMzLTQ0NDQtNTU1\nNTU1NTU1NTU1LmNvbnN1bC9ucy9kZWZhdWx0L2RjL2RjMS9zdmMvd2ViMAoGCCqG\nSM49BAMCA0gAMEUCIGC3TTvvjj76KMrguVyFf4tjOqaSCRie3nmHMRNNRav7AiEA\npY0heYeK9A6iOLrzqxSerkXXQyj5e9bE4VgUnxgPU6g=\n-----END CERTIFICATE-----\n"
},
"privateKey": {
"inlineString": "-----BEGIN EC PRIVATE KEY-----\nMHcCAQEEIMoTkpRggp3fqZzFKh82yS4LjtJI+XY+qX/7DefHFrtdoAoGCCqGSM49\nAwEHoUQDQgAEADPv1RHVNRfa2VKRAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Fav\nq5E0ivpNtv1QnFhxtPd7d5k4e+T7SkW1TQ==\n-----END EC PRIVATE KEY-----\n"
}
}
],
"validationContext": {
"trustedCa": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICXDCCAgKgAwIBAgIICpZq70Z9LyUwCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowFDESMBAG\nA1UEAxMJVGVzdCBDQSAyMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEIhywH1gx\nAsMwuF3ukAI5YL2jFxH6Usnma1HFSfVyxbXX1/uoZEYrj8yCAtdU2yoHETyd+Zx2\nThhRLP79pYegCaOCATwwggE4MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8EBTAD\nAQH/MGgGA1UdDgRhBF9kMToxMToxMTphYzoyYTpiYTo5NzpiMjozZjphYzo3Yjpi\nZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1ZTo0MTo2ZjpmMjo3\nMzo5NTo1ODowYzpkYjBqBgNVHSMEYzBhgF9kMToxMToxMTphYzoyYTpiYTo5Nzpi\nMjozZjphYzo3YjpiZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1\nZTo0MTo2ZjpmMjo3Mzo5NTo1ODowYzpkYjA/BgNVHREEODA2hjRzcGlmZmU6Ly8x\nMTExMTExMS0yMjIyLTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsMAoGCCqG\nSM49BAMCA0gAMEUCICOY0i246rQHJt8o8Oya0D5PLL1FnmsQmQqIGCi31RwnAiEA\noR5f6Ku+cig2Il8T8LJujOp2/2A72QcHZA57B13y+8o=\n-----END CERTIFICATE-----\n"
},
"matchSubjectAltNames": [
{
"exact": "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/db"
}
]
}
},
"sni": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
}
],
"typeUrl": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"nonce": "00000001"
}

View File

@ -0,0 +1,109 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"clusterName": "failover-target~db.default.cluster-01.external.peer1.domain",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.40.1.1",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
},
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.40.1.2",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
}
]
}
]
},
{
"@type": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"clusterName": "failover-target~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.10.1.1",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
},
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.10.1.2",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
}
]
}
]
},
{
"@type": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"clusterName": "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.10.1.1",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
},
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.20.1.2",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
}
]
}
]
}
],
"typeUrl": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"nonce": "00000001"
}

View File

@ -0,0 +1,75 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"clusterName": "failover-target~db.default.cluster-01.external.peer1.domain",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.40.1.1",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
},
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.40.1.2",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
}
]
}
]
},
{
"@type": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"clusterName": "failover-target~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.10.1.1",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
},
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.10.1.2",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
}
]
}
]
}
],
"typeUrl": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"nonce": "00000001"
}

View File

@ -0,0 +1,5 @@
primary_datacenter = "alpha"
log_level = "trace"
peering {
enabled = true
}

View File

@ -0,0 +1,26 @@
config_entries {
bootstrap = [
{
kind = "proxy-defaults"
name = "global"
config {
protocol = "tcp"
}
},
{
kind = "exported-services"
name = "default"
services = [
{
name = "s2"
consumers = [
{
peer_name = "alpha-to-primary"
}
]
}
]
}
]
}

View File

@ -0,0 +1,5 @@
services {
name = "mesh-gateway"
kind = "mesh-gateway"
port = 4432
}

View File

@ -0,0 +1 @@
# We don't want an s1 service in this peer

View File

@ -0,0 +1,7 @@
services {
name = "s2"
port = 8181
connect {
sidecar_service {}
}
}

View File

@ -0,0 +1,11 @@
#!/bin/bash
set -euo pipefail
register_services alpha
gen_envoy_bootstrap s2 19002 alpha
gen_envoy_bootstrap mesh-gateway 19003 alpha true
wait_for_config_entry proxy-defaults global alpha
wait_for_config_entry exported-services default alpha

View File

@ -0,0 +1,27 @@
#!/usr/bin/env bats
load helpers
@test "s2 proxy is running correct version" {
assert_envoy_version 19002
}
@test "s2 proxy admin is up on :19002" {
retry_default curl -f -s localhost:19002/stats -o /dev/null
}
@test "gateway-alpha proxy admin is up on :19003" {
retry_default curl -f -s localhost:19003/stats -o /dev/null
}
@test "s2 proxy listener should be up and have right cert" {
assert_proxy_presents_cert_uri localhost:21000 s2 alpha
}
@test "s2 proxy should be healthy" {
assert_service_has_healthy_instances s2 1 alpha
}
@test "gateway-alpha should be up and listening" {
retry_long nc -z consul-alpha-client:4432
}

View File

@ -0,0 +1,2 @@
bind_addr = "0.0.0.0"
advertise_addr = "{{ GetInterfaceIP \"eth0\" }}"

View File

@ -0,0 +1,6 @@
#!/bin/bash
snapshot_envoy_admin localhost:19000 s1 primary || true
snapshot_envoy_admin localhost:19001 s2 primary || true
snapshot_envoy_admin localhost:19002 s2 alpha || true
snapshot_envoy_admin localhost:19003 mesh-gateway alpha || true

View File

@ -0,0 +1,3 @@
peering {
enabled = true
}

View File

@ -0,0 +1,21 @@
config_entries {
bootstrap {
kind = "proxy-defaults"
name = "global"
config {
protocol = "tcp"
}
}
bootstrap {
kind = "service-resolver"
name = "s2"
failover = {
"*" = {
targets = [{peer = "primary-to-alpha"}]
}
}
}
}

View File

@ -0,0 +1,16 @@
services {
name = "s1"
port = 8080
connect {
sidecar_service {
proxy {
upstreams = [
{
destination_name = "s2"
local_bind_port = 5000
}
]
}
}
}
}

View File

@ -0,0 +1,7 @@
services {
name = "s2"
port = 8181
connect {
sidecar_service {}
}
}

View File

@ -0,0 +1,10 @@
#!/bin/bash
set -euo pipefail
register_services primary
gen_envoy_bootstrap s1 19000 primary
gen_envoy_bootstrap s2 19001 primary
wait_for_config_entry proxy-defaults global

View File

@ -0,0 +1,87 @@
#!/usr/bin/env bats
load helpers
@test "s1 proxy is running correct version" {
assert_envoy_version 19000
}
@test "s1 proxy admin is up on :19000" {
retry_default curl -f -s localhost:19000/stats -o /dev/null
}
@test "s2 proxy admin is up on :19001" {
retry_default curl -f -s localhost:19001/stats -o /dev/null
}
@test "gateway-primary proxy admin is up on :19001" {
retry_default curl localhost:19000/config_dump
}
@test "s1 proxy listener should be up and have right cert" {
assert_proxy_presents_cert_uri localhost:21000 s1
}
@test "s2 proxies should be healthy in primary" {
assert_service_has_healthy_instances s2 1 primary
}
@test "s2 proxies should be healthy in alpha" {
assert_service_has_healthy_instances s2 1 alpha
}
@test "gateway-alpha should be up and listening" {
retry_long nc -z consul-alpha-client:4432
}
@test "peer the two clusters together" {
create_peering primary alpha
}
@test "s2 alpha proxies should be healthy in primary" {
assert_service_has_healthy_instances s2 1 primary "" "" primary-to-alpha
}
@test "s1 upstream should have healthy endpoints for s2 in both primary and failover" {
assert_upstream_has_endpoints_in_status 127.0.0.1:19000 failover-target~s2.default.primary.internal HEALTHY 1
assert_upstream_has_endpoints_in_status 127.0.0.1:19000 failover-target~s2.default.primary-to-alpha.external HEALTHY 1
}
@test "s1 upstream should be able to connect to s2" {
run retry_default curl -s -f -d hello localhost:5000
[ "$status" -eq 0 ]
[ "$output" = "hello" ]
}
@test "s1 upstream made 1 connection" {
assert_envoy_metric_at_least 127.0.0.1:19000 "cluster.failover-target~s2.default.primary.internal.*cx_total" 1
}
@test "terminate instance of s2 primary envoy which should trigger failover to s2 alpha when the tcp check fails" {
kill_envoy s2 primary
}
@test "s2 proxies should be unhealthy in primary" {
assert_service_has_healthy_instances s2 0 primary
}
@test "s1 upstream should have healthy endpoints for s2 in the failover cluster peer" {
assert_upstream_has_endpoints_in_status 127.0.0.1:19000 failover-target~s2.default.primary.internal UNHEALTHY 1
assert_upstream_has_endpoints_in_status 127.0.0.1:19000 failover-target~s2.default.primary-to-alpha.external HEALTHY 1
}
@test "reset envoy statistics" {
reset_envoy_metrics 127.0.0.1:19000
}
@test "s1 upstream should be able to connect to s2 in the failover cluster peer" {
run retry_default curl -s -f -d hello localhost:5000
[ "$status" -eq 0 ]
[ "$output" = "hello" ]
}
@test "s1 upstream made 1 connection to s2 through the cluster peer" {
assert_envoy_metric_at_least 127.0.0.1:19000 "cluster.failover-target~s2.default.primary-to-alpha.external.*cx_total" 1
}

View File

@ -0,0 +1,4 @@
#!/bin/bash
export REQUIRED_SERVICES="s1 s1-sidecar-proxy s2 s2-sidecar-proxy s2-alpha s2-sidecar-proxy-alpha gateway-alpha tcpdump-primary tcpdump-alpha"
export REQUIRE_PEERS=1