mirror of https://github.com/hashicorp/consul
Address PR comments.
parent
8aff79edfe
commit
3ed9331e48
|
@ -276,7 +276,7 @@ func (s *handlerConnectProxy) setupWatchesForPeeredUpstream(
|
||||||
// set up a watch for them.
|
// set up a watch for them.
|
||||||
if mgwMode == structs.MeshGatewayModeLocal {
|
if mgwMode == structs.MeshGatewayModeLocal {
|
||||||
up := &handlerUpstreams{handlerState: s.handlerState}
|
up := &handlerUpstreams{handlerState: s.handlerState}
|
||||||
up.setupWatchForLocalGWEndpoints(ctx, snapConnectProxy)
|
up.setupWatchForLocalGWEndpoints(ctx, &snapConnectProxy.ConfigSnapshotUpstreams)
|
||||||
} else if mgwMode == structs.MeshGatewayModeNone {
|
} else if mgwMode == structs.MeshGatewayModeNone {
|
||||||
s.logger.Warn(fmt.Sprintf("invalid mesh gateway mode 'none', defaulting to 'remote' for %q", uid))
|
s.logger.Warn(fmt.Sprintf("invalid mesh gateway mode 'none', defaulting to 'remote' for %q", uid))
|
||||||
}
|
}
|
||||||
|
|
|
@ -766,6 +766,12 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
||||||
require.True(t, snap.ConnectProxy.IntentionsSet)
|
require.True(t, snap.ConnectProxy.IntentionsSet)
|
||||||
require.Equal(t, ixnMatch, snap.ConnectProxy.Intentions)
|
require.Equal(t, ixnMatch, snap.ConnectProxy.Intentions)
|
||||||
require.True(t, snap.ConnectProxy.MeshConfigSet)
|
require.True(t, snap.ConnectProxy.MeshConfigSet)
|
||||||
|
|
||||||
|
if meshGatewayProxyConfigValue == structs.MeshGatewayModeLocal {
|
||||||
|
require.True(t, snap.ConnectProxy.WatchedLocalGWEndpoints.IsWatched("dc1"))
|
||||||
|
_, ok := snap.ConnectProxy.WatchedLocalGWEndpoints.Get("dc1")
|
||||||
|
require.False(t, ok)
|
||||||
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -799,6 +805,12 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
||||||
|
|
||||||
require.True(t, snap.ConnectProxy.IntentionsSet)
|
require.True(t, snap.ConnectProxy.IntentionsSet)
|
||||||
require.Equal(t, ixnMatch, snap.ConnectProxy.Intentions)
|
require.Equal(t, ixnMatch, snap.ConnectProxy.Intentions)
|
||||||
|
|
||||||
|
if meshGatewayProxyConfigValue == structs.MeshGatewayModeLocal {
|
||||||
|
require.True(t, snap.ConnectProxy.WatchedLocalGWEndpoints.IsWatched("dc1"))
|
||||||
|
_, ok := snap.ConnectProxy.WatchedLocalGWEndpoints.Get("dc1")
|
||||||
|
require.False(t, ok)
|
||||||
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -82,15 +82,6 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEv
|
||||||
s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", uid)
|
s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", uid)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// We have to inspect the discovery chains to see if local mesh gateways need to be watched.
|
|
||||||
for _, target := range resp.Chain.Targets {
|
|
||||||
// Register a gateway watch if any targets are pointing to a peer and require a mode of local.
|
|
||||||
if target.Peer != "" && target.MeshGateway.Mode == structs.MeshGatewayModeLocal {
|
|
||||||
s.setupWatchForLocalGWEndpoints(ctx, snap.ConnectProxy)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("discovery-chain watch fired for unsupported kind: %s", snap.Kind)
|
return fmt.Errorf("discovery-chain watch fired for unsupported kind: %s", snap.Kind)
|
||||||
}
|
}
|
||||||
|
@ -335,6 +326,10 @@ func (s *handlerUpstreams) resetWatchesFromChain(
|
||||||
if s.source.Datacenter != target.Datacenter || s.proxyID.PartitionOrDefault() != target.Partition {
|
if s.source.Datacenter != target.Datacenter || s.proxyID.PartitionOrDefault() != target.Partition {
|
||||||
needGateways[gk.String()] = struct{}{}
|
needGateways[gk.String()] = struct{}{}
|
||||||
}
|
}
|
||||||
|
// Register a local gateway watch if any targets are pointing to a peer and require a mode of local.
|
||||||
|
if target.Peer != "" && target.MeshGateway.Mode == structs.MeshGatewayModeLocal {
|
||||||
|
s.setupWatchForLocalGWEndpoints(ctx, snap)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the discovery chain's targets do not lead to watching all endpoints
|
// If the discovery chain's targets do not lead to watching all endpoints
|
||||||
|
@ -560,14 +555,14 @@ func parseReducedUpstreamConfig(m map[string]interface{}) (reducedUpstreamConfig
|
||||||
|
|
||||||
func (s *handlerUpstreams) setupWatchForLocalGWEndpoints(
|
func (s *handlerUpstreams) setupWatchForLocalGWEndpoints(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
snapConnectProxy configSnapshotConnectProxy,
|
upstreams *ConfigSnapshotUpstreams,
|
||||||
) error {
|
) error {
|
||||||
gk := GatewayKey{
|
gk := GatewayKey{
|
||||||
Partition: s.source.NodePartitionOrDefault(),
|
Partition: s.proxyID.PartitionOrDefault(),
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
}
|
}
|
||||||
// If the watch is already initialized, do nothing.
|
// If the watch is already initialized, do nothing.
|
||||||
if snapConnectProxy.WatchedLocalGWEndpoints.IsWatched(gk.String()) {
|
if upstreams.WatchedLocalGWEndpoints.IsWatched(gk.String()) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -581,6 +576,6 @@ func (s *handlerUpstreams) setupWatchForLocalGWEndpoints(
|
||||||
if err := watchMeshGateway(ctx, opts); err != nil {
|
if err := watchMeshGateway(ctx, opts); err != nil {
|
||||||
return fmt.Errorf("error while watching for local mesh gateway: %w", err)
|
return fmt.Errorf("error while watching for local mesh gateway: %w", err)
|
||||||
}
|
}
|
||||||
snapConnectProxy.WatchedLocalGWEndpoints.InitWatch(gk.String(), nil)
|
upstreams.WatchedLocalGWEndpoints.InitWatch(gk.String(), nil)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue