Browse Source

Backport of Fix local mesh gateway with peering discovery chains. into release/1.14.x (#15715)

* backport of commit 8aff79edfe

* backport of commit 3ed9331e48

Co-authored-by: Derek Menteer <derek.menteer@hashicorp.com>
pull/15716/head
hc-github-team-consul-core 2 years ago committed by GitHub
parent
commit
17d6705c5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      .changelog/15690.txt
  2. 19
      agent/proxycfg/connect_proxy.go
  3. 12
      agent/proxycfg/state_test.go
  4. 31
      agent/proxycfg/upstreams.go
  5. 26
      agent/xds/endpoints.go

3
.changelog/15690.txt

@ -0,0 +1,3 @@
```release-note:bug
connect: Fix peering failovers ignoring local mesh gateway configuration.
```

19
agent/proxycfg/connect_proxy.go

@ -275,23 +275,8 @@ func (s *handlerConnectProxy) setupWatchesForPeeredUpstream(
// If a peered upstream is set to local mesh gw mode,
// set up a watch for them.
if mgwMode == structs.MeshGatewayModeLocal {
gk := GatewayKey{
Partition: s.source.NodePartitionOrDefault(),
Datacenter: s.source.Datacenter,
}
if !snapConnectProxy.WatchedLocalGWEndpoints.IsWatched(gk.String()) {
opts := gatewayWatchOpts{
internalServiceDump: s.dataSources.InternalServiceDump,
notifyCh: s.ch,
source: *s.source,
token: s.token,
key: gk,
}
if err := watchMeshGateway(ctx, opts); err != nil {
return fmt.Errorf("error while watching for local mesh gateway: %w", err)
}
snapConnectProxy.WatchedLocalGWEndpoints.InitWatch(gk.String(), nil)
}
up := &handlerUpstreams{handlerState: s.handlerState}
up.setupWatchForLocalGWEndpoints(ctx, &snapConnectProxy.ConfigSnapshotUpstreams)
} else if mgwMode == structs.MeshGatewayModeNone {
s.logger.Warn(fmt.Sprintf("invalid mesh gateway mode 'none', defaulting to 'remote' for %q", uid))
}

12
agent/proxycfg/state_test.go

@ -766,6 +766,12 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.True(t, snap.ConnectProxy.IntentionsSet)
require.Equal(t, ixnMatch, snap.ConnectProxy.Intentions)
require.True(t, snap.ConnectProxy.MeshConfigSet)
if meshGatewayProxyConfigValue == structs.MeshGatewayModeLocal {
require.True(t, snap.ConnectProxy.WatchedLocalGWEndpoints.IsWatched("dc1"))
_, ok := snap.ConnectProxy.WatchedLocalGWEndpoints.Get("dc1")
require.False(t, ok)
}
},
}
@ -799,6 +805,12 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.True(t, snap.ConnectProxy.IntentionsSet)
require.Equal(t, ixnMatch, snap.ConnectProxy.Intentions)
if meshGatewayProxyConfigValue == structs.MeshGatewayModeLocal {
require.True(t, snap.ConnectProxy.WatchedLocalGWEndpoints.IsWatched("dc1"))
_, ok := snap.ConnectProxy.WatchedLocalGWEndpoints.Get("dc1")
require.False(t, ok)
}
},
}

31
agent/proxycfg/upstreams.go

@ -326,6 +326,10 @@ func (s *handlerUpstreams) resetWatchesFromChain(
if s.source.Datacenter != target.Datacenter || s.proxyID.PartitionOrDefault() != target.Partition {
needGateways[gk.String()] = struct{}{}
}
// Register a local gateway watch if any targets are pointing to a peer and require a mode of local.
if target.Peer != "" && target.MeshGateway.Mode == structs.MeshGatewayModeLocal {
s.setupWatchForLocalGWEndpoints(ctx, snap)
}
}
// If the discovery chain's targets do not lead to watching all endpoints
@ -548,3 +552,30 @@ func parseReducedUpstreamConfig(m map[string]interface{}) (reducedUpstreamConfig
err := mapstructure.WeakDecode(m, &cfg)
return cfg, err
}
func (s *handlerUpstreams) setupWatchForLocalGWEndpoints(
ctx context.Context,
upstreams *ConfigSnapshotUpstreams,
) error {
gk := GatewayKey{
Partition: s.proxyID.PartitionOrDefault(),
Datacenter: s.source.Datacenter,
}
// If the watch is already initialized, do nothing.
if upstreams.WatchedLocalGWEndpoints.IsWatched(gk.String()) {
return nil
}
opts := gatewayWatchOpts{
internalServiceDump: s.dataSources.InternalServiceDump,
notifyCh: s.ch,
source: *s.source,
token: s.token,
key: gk,
}
if err := watchMeshGateway(ctx, opts); err != nil {
return fmt.Errorf("error while watching for local mesh gateway: %w", err)
}
upstreams.WatchedLocalGWEndpoints.InitWatch(gk.String(), nil)
return nil
}

26
agent/xds/endpoints.go

@ -92,7 +92,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
// upstream in clusters.go so that the sets of endpoints generated matches
// the sets of clusters.
for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() {
_, skip := getUpstream(uid)
upstream, skip := getUpstream(uid)
if skip {
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
continue
@ -107,7 +107,11 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
clusterName := generatePeeredClusterName(uid, tbs)
loadAssignment, err := s.makeUpstreamLoadAssignmentForPeerService(cfgSnap, clusterName, uid)
mgwMode := structs.MeshGatewayModeDefault
if upstream != nil {
mgwMode = upstream.MeshGateway.Mode
}
loadAssignment, err := s.makeUpstreamLoadAssignmentForPeerService(cfgSnap, clusterName, uid, mgwMode)
if err != nil {
return nil, err
}
@ -538,7 +542,12 @@ func makePipeEndpoint(path string) *envoy_endpoint_v3.LbEndpoint {
}
}
func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(cfgSnap *proxycfg.ConfigSnapshot, clusterName string, uid proxycfg.UpstreamID) (*envoy_endpoint_v3.ClusterLoadAssignment, error) {
func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(
cfgSnap *proxycfg.ConfigSnapshot,
clusterName string,
uid proxycfg.UpstreamID,
upstreamGatewayMode structs.MeshGatewayMode,
) (*envoy_endpoint_v3.ClusterLoadAssignment, error) {
var la *envoy_endpoint_v3.ClusterLoadAssignment
upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams()
@ -546,11 +555,9 @@ func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(cfgSnap *pr
return la, err
}
upstream := cfgSnap.ConnectProxy.UpstreamConfig[uid]
// If an upstream is configured with local mesh gw mode, we make a load assignment
// from the gateway endpoints instead of those of the upstreams.
if upstream != nil && upstream.MeshGateway.Mode == structs.MeshGatewayModeLocal {
if upstreamGatewayMode == structs.MeshGatewayModeLocal {
localGw, ok := cfgSnap.ConnectProxy.WatchedLocalGWEndpoints.Get(cfgSnap.Locality.String())
if !ok {
// local GW is not ready; return early
@ -643,6 +650,11 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
}
}
mgwMode := structs.MeshGatewayModeDefault
if upstream := cfgSnap.ConnectProxy.UpstreamConfig[uid]; upstream != nil {
mgwMode = upstream.MeshGateway.Mode
}
// Find all resolver nodes.
for _, node := range chain.Nodes {
if node.Type != structs.DiscoveryGraphNodeTypeResolver {
@ -682,7 +694,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
s.Logger.Debug("generating endpoints for", "cluster", targetOpt.clusterName)
targetUID := proxycfg.NewUpstreamIDFromTargetID(targetOpt.targetID)
if targetUID.Peer != "" {
loadAssignment, err := s.makeUpstreamLoadAssignmentForPeerService(cfgSnap, targetOpt.clusterName, targetUID)
loadAssignment, err := s.makeUpstreamLoadAssignmentForPeerService(cfgSnap, targetOpt.clusterName, targetUID, mgwMode)
if err != nil {
return nil, err
}

Loading…
Cancel
Save