mirror of https://github.com/hashicorp/consul
Ensure mesh gateway mode override is set for upstreams for intentions
parent
5140c3e51f
commit
49a4a78fd5
|
@ -807,6 +807,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
|
|||
wildcardSID := structs.NewServiceID(structs.WildcardSpecifier, structs.WildcardEnterpriseMeta())
|
||||
defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardSID.String()]
|
||||
if ok {
|
||||
u = defaults
|
||||
cfgMap = defaults.Config
|
||||
snap.ConnectProxy.UpstreamConfig[svc.String()] = defaults
|
||||
}
|
||||
|
@ -823,7 +824,18 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
|
|||
)
|
||||
}
|
||||
|
||||
err = s.watchDiscoveryChain(snap, cfg, svc.String(), svc.Name, svc.NamespaceOrDefault())
|
||||
meshGateway := s.proxyCfg.MeshGateway
|
||||
if u != nil {
|
||||
meshGateway = meshGateway.OverlayWith(u.MeshGateway)
|
||||
}
|
||||
watchOpts := discoveryChainWatchOpts{
|
||||
id: svc.String(),
|
||||
name: svc.Name,
|
||||
namespace: svc.NamespaceOrDefault(),
|
||||
cfg: cfg,
|
||||
meshGateway: meshGateway,
|
||||
}
|
||||
err = s.watchDiscoveryChain(snap, watchOpts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to watch discovery chain for %s: %v", svc.String(), err)
|
||||
}
|
||||
|
@ -1607,7 +1619,12 @@ func (s *state) handleUpdateIngressGateway(u cache.UpdateEvent, snap *ConfigSnap
|
|||
for _, service := range services.Services {
|
||||
u := makeUpstream(service)
|
||||
|
||||
err := s.watchDiscoveryChain(snap, reducedUpstreamConfig{}, u.Identifier(), u.DestinationName, u.DestinationNamespace)
|
||||
watchOpts := discoveryChainWatchOpts{
|
||||
id: u.Identifier(),
|
||||
name: u.DestinationName,
|
||||
namespace: u.DestinationNamespace,
|
||||
}
|
||||
err := s.watchDiscoveryChain(snap, watchOpts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
|
||||
}
|
||||
|
@ -1657,8 +1674,16 @@ func makeUpstream(g *structs.GatewayService) structs.Upstream {
|
|||
return upstream
|
||||
}
|
||||
|
||||
func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, cfg reducedUpstreamConfig, id, name, namespace string) error {
|
||||
if _, ok := snap.ConnectProxy.WatchedDiscoveryChains[id]; ok {
|
||||
type discoveryChainWatchOpts struct {
|
||||
id string
|
||||
name string
|
||||
namespace string
|
||||
cfg reducedUpstreamConfig
|
||||
meshGateway structs.MeshGatewayConfig
|
||||
}
|
||||
|
||||
func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, opts discoveryChainWatchOpts) error {
|
||||
if _, ok := snap.ConnectProxy.WatchedDiscoveryChains[opts.id]; ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1666,12 +1691,13 @@ func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, cfg reducedUpstreamCon
|
|||
err := s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Name: name,
|
||||
Name: opts.name,
|
||||
EvaluateInDatacenter: s.source.Datacenter,
|
||||
EvaluateInNamespace: namespace,
|
||||
OverrideProtocol: cfg.Protocol,
|
||||
OverrideConnectTimeout: cfg.ConnectTimeout(),
|
||||
}, "discovery-chain:"+id, s.ch)
|
||||
EvaluateInNamespace: opts.namespace,
|
||||
OverrideProtocol: opts.cfg.Protocol,
|
||||
OverrideConnectTimeout: opts.cfg.ConnectTimeout(),
|
||||
OverrideMeshGateway: opts.meshGateway,
|
||||
}, "discovery-chain:"+opts.id, s.ch)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return err
|
||||
|
@ -1679,9 +1705,9 @@ func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, cfg reducedUpstreamCon
|
|||
|
||||
switch s.kind {
|
||||
case structs.ServiceKindIngressGateway:
|
||||
snap.IngressGateway.WatchedDiscoveryChains[id] = cancel
|
||||
snap.IngressGateway.WatchedDiscoveryChains[opts.id] = cancel
|
||||
case structs.ServiceKindConnectProxy:
|
||||
snap.ConnectProxy.WatchedDiscoveryChains[id] = cancel
|
||||
snap.ConnectProxy.WatchedDiscoveryChains[opts.id] = cancel
|
||||
default:
|
||||
cancel()
|
||||
return fmt.Errorf("unsupported kind %s", s.kind)
|
||||
|
|
|
@ -1615,6 +1615,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
Config: map[string]interface{}{
|
||||
"connect_timeout_ms": 6000,
|
||||
},
|
||||
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -1726,6 +1727,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
EvaluateInNamespace: "default",
|
||||
Datacenter: "dc1",
|
||||
OverrideConnectTimeout: 6 * time.Second,
|
||||
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
|
||||
}),
|
||||
},
|
||||
events: []cache.UpdateEvent{
|
||||
|
|
Loading…
Reference in New Issue