mirror of https://github.com/hashicorp/consul
Backport of Fix mesh gateway configuration with proxy-defaults into release/1.14.x (#15309)
This pull request was automerged via backport-assistantpull/15314/head
parent
1306b7fbbb
commit
11aaa9bcb3
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
connect: Fix issue where mesh-gateway settings were not properly inherited from configuration entries.
|
||||
```
|
|
@ -150,7 +150,7 @@ func MergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct
|
|||
|
||||
// localUpstreams stores the upstreams seen from the local registration so that we can merge in the synthetic entries.
|
||||
// In transparent proxy mode ns.Proxy.Upstreams will likely be empty because users do not need to define upstreams explicitly.
|
||||
// So to store upstream-specific flags from central config, we add entries to ns.Proxy.Upstream with those values.
|
||||
// So to store upstream-specific flags from central config, we add entries to ns.Proxy.Upstreams with those values.
|
||||
localUpstreams := make(map[structs.ServiceID]struct{})
|
||||
|
||||
// Merge upstream defaults into the local registration
|
||||
|
|
|
@ -134,10 +134,10 @@ func ComputeResolvedServiceConfig(
|
|||
|
||||
// Then store upstreams inferred from service-defaults and mapify the overrides.
|
||||
var (
|
||||
upstreamConfigs = make(map[structs.ServiceID]*structs.UpstreamConfig)
|
||||
upstreamOverrides = make(map[structs.ServiceID]*structs.UpstreamConfig)
|
||||
upstreamDefaults *structs.UpstreamConfig
|
||||
// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
|
||||
usConfigs = make(map[structs.ServiceID]map[string]interface{})
|
||||
// resolvedConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
|
||||
resolvedConfigs = make(map[structs.ServiceID]map[string]interface{})
|
||||
)
|
||||
if serviceConf != nil && serviceConf.UpstreamConfig != nil {
|
||||
for i, override := range serviceConf.UpstreamConfig.Overrides {
|
||||
|
@ -152,18 +152,26 @@ func ComputeResolvedServiceConfig(
|
|||
continue // skip this impossible condition
|
||||
}
|
||||
seenUpstreams[override.ServiceID()] = struct{}{}
|
||||
upstreamConfigs[override.ServiceID()] = override
|
||||
upstreamOverrides[override.ServiceID()] = override
|
||||
}
|
||||
if serviceConf.UpstreamConfig.Defaults != nil {
|
||||
upstreamDefaults = serviceConf.UpstreamConfig.Defaults
|
||||
|
||||
if upstreamDefaults.MeshGateway.Mode == structs.MeshGatewayModeDefault {
|
||||
upstreamDefaults.MeshGateway.Mode = thisReply.MeshGateway.Mode
|
||||
}
|
||||
|
||||
// Store the upstream defaults under a wildcard key so that they can be applied to
|
||||
// upstreams that are inferred from intentions and do not have explicit upstream configuration.
|
||||
cfgMap := make(map[string]interface{})
|
||||
upstreamDefaults.MergeInto(cfgMap)
|
||||
|
||||
if !args.MeshGateway.IsZero() {
|
||||
cfgMap["mesh_gateway"] = args.MeshGateway
|
||||
}
|
||||
|
||||
wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace())
|
||||
usConfigs[wildcard] = cfgMap
|
||||
resolvedConfigs[wildcard] = cfgMap
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -190,34 +198,49 @@ func ComputeResolvedServiceConfig(
|
|||
resolvedCfg["protocol"] = protocol
|
||||
}
|
||||
|
||||
// Merge centralized defaults for all upstreams before configuration for specific upstreams
|
||||
// When dialing an upstream, the goal is to flatten the mesh gateway mode in this order
|
||||
// (larger number wins):
|
||||
// 1. Value from the proxy-defaults
|
||||
// 2. Value from top-level of service-defaults (ServiceDefaults.MeshGateway)
|
||||
// 3. Value from centralized upstream defaults (ServiceDefaults.UpstreamConfig.Defaults)
|
||||
// 4. Value from local proxy registration (NodeService.Proxy.MeshGateway)
|
||||
// 5. Value from centralized upstream override (ServiceDefaults.UpstreamConfig.Overrides)
|
||||
// 6. Value from local upstream definition (NodeService.Proxy.Upstreams[].MeshGateway)
|
||||
//
|
||||
// The MeshGateway value from upstream definitions in the proxy registration override
|
||||
// the one from UpstreamConfig.Defaults and UpstreamConfig.Overrides because they are
|
||||
// specific to the proxy instance.
|
||||
//
|
||||
// Step 6 is handled by the dialer's ServiceManager in MergeServiceConfig.
|
||||
|
||||
// Start with the merged value from proxyConf and serviceConf. (steps 1-2)
|
||||
if !thisReply.MeshGateway.IsZero() {
|
||||
resolvedCfg["mesh_gateway"] = thisReply.MeshGateway
|
||||
}
|
||||
|
||||
// Merge in the upstream defaults (step 3).
|
||||
if upstreamDefaults != nil {
|
||||
upstreamDefaults.MergeInto(resolvedCfg)
|
||||
}
|
||||
|
||||
// The MeshGateway value from the proxy registration overrides the one from upstream_defaults
|
||||
// because it is specific to the proxy instance.
|
||||
//
|
||||
// The goal is to flatten the mesh gateway mode in this order:
|
||||
// 0. Value from centralized upstream_defaults
|
||||
// 1. Value from local proxy registration
|
||||
// 2. Value from centralized upstream_config
|
||||
// 3. Value from local upstream definition. This last step is done in the client's service manager.
|
||||
// Merge in the top-level mode from the proxy instance (step 4).
|
||||
if !args.MeshGateway.IsZero() {
|
||||
// This means each upstream inherits the value from the `NodeService.Proxy.MeshGateway` field.
|
||||
resolvedCfg["mesh_gateway"] = args.MeshGateway
|
||||
}
|
||||
|
||||
if upstreamConfigs[upstream] != nil {
|
||||
upstreamConfigs[upstream].MergeInto(resolvedCfg)
|
||||
// Merge in Overrides for the upstream (step 5).
|
||||
if upstreamOverrides[upstream] != nil {
|
||||
upstreamOverrides[upstream].MergeInto(resolvedCfg)
|
||||
}
|
||||
|
||||
if len(resolvedCfg) > 0 {
|
||||
usConfigs[upstream] = resolvedCfg
|
||||
resolvedConfigs[upstream] = resolvedCfg
|
||||
}
|
||||
}
|
||||
|
||||
// don't allocate the slices just to not fill them
|
||||
if len(usConfigs) == 0 {
|
||||
if len(resolvedConfigs) == 0 {
|
||||
return &thisReply, nil
|
||||
}
|
||||
|
||||
|
@ -225,14 +248,14 @@ func ComputeResolvedServiceConfig(
|
|||
// For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
|
||||
thisReply.UpstreamConfigs = make(map[string]map[string]interface{})
|
||||
|
||||
for us, conf := range usConfigs {
|
||||
for us, conf := range resolvedConfigs {
|
||||
thisReply.UpstreamConfigs[us.ID] = conf
|
||||
}
|
||||
|
||||
} else {
|
||||
thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))
|
||||
thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(resolvedConfigs))
|
||||
|
||||
for us, conf := range usConfigs {
|
||||
for us, conf := range resolvedConfigs {
|
||||
thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs,
|
||||
structs.OpaqueUpstreamConfig{Upstream: us, Config: conf})
|
||||
}
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
package configentry
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
|
@ -17,8 +19,19 @@ func Test_ComputeResolvedServiceConfig(t *testing.T) {
|
|||
|
||||
sid := structs.ServiceID{
|
||||
ID: "sid",
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
|
||||
}
|
||||
uid := structs.ServiceID{
|
||||
ID: "upstream1",
|
||||
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
|
||||
}
|
||||
uids := []structs.ServiceID{uid}
|
||||
wildcard := structs.NewServiceID(structs.WildcardSpecifier, acl.WildcardEnterpriseMeta())
|
||||
|
||||
localMeshGW := structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal}
|
||||
remoteMeshGW := structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote}
|
||||
noneMeshGW := structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeNone}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
|
@ -88,12 +101,248 @@ func Test_ComputeResolvedServiceConfig(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "proxy upstream mesh-gateway inherits proxy-defaults",
|
||||
args: args{
|
||||
scReq: &structs.ServiceConfigRequest{
|
||||
Name: "sid",
|
||||
UpstreamIDs: uids,
|
||||
},
|
||||
upstreamIDs: uids,
|
||||
entries: &ResolvedServiceConfigSet{
|
||||
ProxyDefaults: map[string]*structs.ProxyConfigEntry{
|
||||
acl.DefaultEnterpriseMeta().PartitionOrDefault(): {
|
||||
MeshGateway: remoteMeshGW, // applied 1st
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: &structs.ServiceConfigResponse{
|
||||
MeshGateway: remoteMeshGW,
|
||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||
{
|
||||
Upstream: uid,
|
||||
Config: map[string]interface{}{
|
||||
"mesh_gateway": remoteMeshGW,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "proxy upstream mesh-gateway inherits service-defaults",
|
||||
args: args{
|
||||
scReq: &structs.ServiceConfigRequest{
|
||||
Name: "sid",
|
||||
UpstreamIDs: uids,
|
||||
},
|
||||
upstreamIDs: uids,
|
||||
entries: &ResolvedServiceConfigSet{
|
||||
ProxyDefaults: map[string]*structs.ProxyConfigEntry{
|
||||
acl.DefaultEnterpriseMeta().PartitionOrDefault(): {
|
||||
MeshGateway: localMeshGW, // applied 1st
|
||||
},
|
||||
},
|
||||
ServiceDefaults: map[structs.ServiceID]*structs.ServiceConfigEntry{
|
||||
sid: {
|
||||
MeshGateway: noneMeshGW, // applied 2nd
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: &structs.ServiceConfigResponse{
|
||||
MeshGateway: noneMeshGW, // service-defaults has a higher precedence.
|
||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||
{
|
||||
Upstream: uid,
|
||||
Config: map[string]interface{}{
|
||||
"mesh_gateway": noneMeshGW,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "proxy wildcard upstream mesh-gateway inherits proxy-defaults",
|
||||
args: args{
|
||||
scReq: &structs.ServiceConfigRequest{
|
||||
Name: "sid",
|
||||
Mode: structs.ProxyModeTransparent,
|
||||
},
|
||||
entries: &ResolvedServiceConfigSet{
|
||||
ProxyDefaults: map[string]*structs.ProxyConfigEntry{
|
||||
acl.DefaultEnterpriseMeta().PartitionOrDefault(): {
|
||||
MeshGateway: localMeshGW,
|
||||
},
|
||||
},
|
||||
ServiceDefaults: map[structs.ServiceID]*structs.ServiceConfigEntry{
|
||||
sid: {
|
||||
UpstreamConfig: &structs.UpstreamConfiguration{
|
||||
Defaults: &structs.UpstreamConfig{
|
||||
Protocol: "http",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: &structs.ServiceConfigResponse{
|
||||
MeshGateway: localMeshGW,
|
||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||
{
|
||||
Upstream: wildcard,
|
||||
Config: map[string]interface{}{
|
||||
"mesh_gateway": localMeshGW, // From proxy-defaults
|
||||
"protocol": "http",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "proxy upstream mesh-gateway inherits upstream defaults",
|
||||
args: args{
|
||||
scReq: &structs.ServiceConfigRequest{
|
||||
Name: "sid",
|
||||
UpstreamIDs: uids,
|
||||
},
|
||||
upstreamIDs: uids,
|
||||
entries: &ResolvedServiceConfigSet{
|
||||
ProxyDefaults: map[string]*structs.ProxyConfigEntry{
|
||||
acl.DefaultEnterpriseMeta().PartitionOrDefault(): {
|
||||
MeshGateway: localMeshGW,
|
||||
},
|
||||
},
|
||||
ServiceDefaults: map[structs.ServiceID]*structs.ServiceConfigEntry{
|
||||
sid: {
|
||||
MeshGateway: noneMeshGW,
|
||||
UpstreamConfig: &structs.UpstreamConfiguration{
|
||||
Defaults: &structs.UpstreamConfig{
|
||||
MeshGateway: remoteMeshGW,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: &structs.ServiceConfigResponse{
|
||||
MeshGateway: noneMeshGW, // Merged from proxy-defaults + service-defaults
|
||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||
{
|
||||
Upstream: wildcard,
|
||||
Config: map[string]interface{}{
|
||||
// Wildcard stores the values from UpstreamConfig.Defaults directly
|
||||
"mesh_gateway": remoteMeshGW,
|
||||
},
|
||||
},
|
||||
{
|
||||
Upstream: uid,
|
||||
Config: map[string]interface{}{
|
||||
// Upstream-specific config comes from UpstreamConfig.Defaults
|
||||
"mesh_gateway": remoteMeshGW,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "proxy upstream mesh-gateway inherits value from node-service",
|
||||
args: args{
|
||||
scReq: &structs.ServiceConfigRequest{
|
||||
Name: "sid",
|
||||
UpstreamIDs: uids,
|
||||
|
||||
// MeshGateway from NodeService is received in the request
|
||||
MeshGateway: remoteMeshGW,
|
||||
},
|
||||
upstreamIDs: uids,
|
||||
entries: &ResolvedServiceConfigSet{
|
||||
ServiceDefaults: map[structs.ServiceID]*structs.ServiceConfigEntry{
|
||||
sid: {
|
||||
UpstreamConfig: &structs.UpstreamConfiguration{
|
||||
Defaults: &structs.UpstreamConfig{
|
||||
MeshGateway: noneMeshGW,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: &structs.ServiceConfigResponse{
|
||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||
{
|
||||
Upstream: wildcard,
|
||||
Config: map[string]interface{}{
|
||||
// NodeService.Proxy.MeshGateway has a higher precedence than centralized
|
||||
// UpstreamConfig.Defaults, since it's specific to a service instance.
|
||||
"mesh_gateway": remoteMeshGW,
|
||||
},
|
||||
},
|
||||
{
|
||||
Upstream: uid,
|
||||
Config: map[string]interface{}{
|
||||
"mesh_gateway": remoteMeshGW,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "proxy upstream mesh-gateway inherits value from service-defaults override",
|
||||
args: args{
|
||||
scReq: &structs.ServiceConfigRequest{
|
||||
Name: "sid",
|
||||
UpstreamIDs: uids,
|
||||
MeshGateway: localMeshGW, // applied 2nd
|
||||
},
|
||||
upstreamIDs: uids,
|
||||
entries: &ResolvedServiceConfigSet{
|
||||
ServiceDefaults: map[structs.ServiceID]*structs.ServiceConfigEntry{
|
||||
sid: {
|
||||
UpstreamConfig: &structs.UpstreamConfiguration{
|
||||
Defaults: &structs.UpstreamConfig{
|
||||
MeshGateway: localMeshGW, // applied 1st
|
||||
},
|
||||
Overrides: []*structs.UpstreamConfig{
|
||||
{
|
||||
Name: uid.ID,
|
||||
MeshGateway: remoteMeshGW, // applied 3rd
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: &structs.ServiceConfigResponse{
|
||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||
{
|
||||
Upstream: wildcard,
|
||||
Config: map[string]interface{}{
|
||||
// Wildcard stores the values from UpstreamConfig.Defaults directly
|
||||
"mesh_gateway": localMeshGW,
|
||||
},
|
||||
},
|
||||
{
|
||||
Upstream: uid,
|
||||
Config: map[string]interface{}{
|
||||
// UpstreamConfig.Overrides has a higher precedence than UpstreamConfig.Defaults
|
||||
"mesh_gateway": remoteMeshGW,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := ComputeResolvedServiceConfig(tt.args.scReq, tt.args.upstreamIDs,
|
||||
false, tt.args.entries, nil)
|
||||
require.NoError(t, err)
|
||||
// This is needed because map iteration is random and determines the order of some outputs.
|
||||
sort.Slice(got.UpstreamIDConfigs, func(i, j int) bool {
|
||||
return got.UpstreamIDConfigs[i].Upstream.ID < got.UpstreamIDConfigs[j].Upstream.ID
|
||||
})
|
||||
require.Equal(t, tt.want, got)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1361,7 +1361,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
|
|||
Upstream: wildcard,
|
||||
Config: map[string]interface{}{
|
||||
"mesh_gateway": map[string]interface{}{
|
||||
"Mode": "remote",
|
||||
"Mode": "none",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -1438,7 +1438,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
|
|||
"EnforcingConsecutive5xx": int64(60),
|
||||
},
|
||||
"mesh_gateway": map[string]interface{}{
|
||||
"Mode": "remote",
|
||||
"Mode": "none",
|
||||
},
|
||||
"protocol": "http",
|
||||
},
|
||||
|
|
|
@ -197,9 +197,9 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
|
|||
|
||||
case "":
|
||||
if u.DestinationPeer != "" {
|
||||
err := s.setupWatchesForPeeredUpstream(ctx, snap.ConnectProxy, u, dc)
|
||||
err := s.setupWatchesForPeeredUpstream(ctx, snap.ConnectProxy, NewUpstreamID(&u), u.MeshGateway.Mode, dc)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
return snap, fmt.Errorf("failed to setup watches for peered upstream %q: %w", uid.String(), err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
@ -211,7 +211,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
|
|||
EvaluateInDatacenter: dc,
|
||||
EvaluateInNamespace: ns,
|
||||
EvaluateInPartition: partition,
|
||||
OverrideMeshGateway: s.proxyCfg.MeshGateway.OverlayWith(u.MeshGateway),
|
||||
OverrideMeshGateway: u.MeshGateway,
|
||||
OverrideProtocol: cfg.Protocol,
|
||||
OverrideConnectTimeout: cfg.ConnectTimeout(),
|
||||
}, "discovery-chain:"+uid.String(), s.ch)
|
||||
|
@ -230,31 +230,30 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
|
|||
func (s *handlerConnectProxy) setupWatchesForPeeredUpstream(
|
||||
ctx context.Context,
|
||||
snapConnectProxy configSnapshotConnectProxy,
|
||||
u structs.Upstream,
|
||||
uid UpstreamID,
|
||||
mgwMode structs.MeshGatewayMode,
|
||||
dc string,
|
||||
) error {
|
||||
uid := NewUpstreamID(&u)
|
||||
|
||||
s.logger.Trace("initializing watch of peered upstream", "upstream", uid)
|
||||
|
||||
// NOTE: An upstream that points to a peer by definition will
|
||||
// only ever watch a single catalog query, so a map key of just
|
||||
// "UID" is sufficient to cover the peer data watches here.
|
||||
snapConnectProxy.PeerUpstreamEndpoints.InitWatch(uid, nil)
|
||||
err := s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{
|
||||
PeerName: uid.Peer,
|
||||
Datacenter: dc,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Token: s.token,
|
||||
},
|
||||
ServiceName: u.DestinationName,
|
||||
ServiceName: uid.Name,
|
||||
Connect: true,
|
||||
Source: *s.source,
|
||||
EnterpriseMeta: uid.EnterpriseMeta,
|
||||
}, upstreamPeerWatchIDPrefix+uid.String(), s.ch)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("failed to watch health for %s: %v", uid, err)
|
||||
}
|
||||
snapConnectProxy.PeerUpstreamEndpoints.InitWatch(uid, nil)
|
||||
|
||||
// Check whether a watch for this peer exists to avoid duplicates.
|
||||
if ok := snapConnectProxy.UpstreamPeerTrustBundles.IsWatched(uid.Peer); !ok {
|
||||
|
@ -275,7 +274,7 @@ func (s *handlerConnectProxy) setupWatchesForPeeredUpstream(
|
|||
|
||||
// If a peered upstream is set to local mesh gw mode,
|
||||
// set up a watch for them.
|
||||
if u.MeshGateway.Mode == structs.MeshGatewayModeLocal {
|
||||
if mgwMode == structs.MeshGatewayModeLocal {
|
||||
gk := GatewayKey{
|
||||
Partition: s.source.NodePartitionOrDefault(),
|
||||
Datacenter: s.source.Datacenter,
|
||||
|
@ -343,44 +342,9 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s
|
|||
}
|
||||
seenUpstreams[uid] = struct{}{}
|
||||
|
||||
s.logger.Trace("initializing watch of peered upstream", "upstream", uid)
|
||||
|
||||
hctx, hcancel := context.WithCancel(ctx)
|
||||
err := s.dataSources.Health.Notify(hctx, &structs.ServiceSpecificRequest{
|
||||
PeerName: uid.Peer,
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Token: s.token,
|
||||
},
|
||||
ServiceName: psn.ServiceName.Name,
|
||||
Connect: true,
|
||||
// Note that Identifier doesn't type-prefix for service any more as it's
|
||||
// the default and makes metrics and other things much cleaner. It's
|
||||
// simpler for us if we have the type to make things unambiguous.
|
||||
Source: *s.source,
|
||||
EnterpriseMeta: uid.EnterpriseMeta,
|
||||
}, upstreamPeerWatchIDPrefix+uid.String(), s.ch)
|
||||
err := s.setupWatchesForPeeredUpstream(ctx, snap.ConnectProxy, uid, s.proxyCfg.MeshGateway.Mode, s.source.Datacenter)
|
||||
if err != nil {
|
||||
hcancel()
|
||||
return fmt.Errorf("failed to watch health for %s: %v", uid, err)
|
||||
}
|
||||
snap.ConnectProxy.PeerUpstreamEndpoints.InitWatch(uid, hcancel)
|
||||
|
||||
// Check whether a watch for this peer exists to avoid duplicates.
|
||||
if ok := snap.ConnectProxy.UpstreamPeerTrustBundles.IsWatched(uid.Peer); !ok {
|
||||
peerCtx, cancel := context.WithCancel(ctx)
|
||||
if err := s.dataSources.TrustBundle.Notify(peerCtx, &cachetype.TrustBundleReadRequest{
|
||||
Request: &pbpeering.TrustBundleReadRequest{
|
||||
Name: uid.Peer,
|
||||
Partition: uid.PartitionOrDefault(),
|
||||
},
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
}, peerTrustBundleIDPrefix+uid.Peer, s.ch); err != nil {
|
||||
cancel()
|
||||
return fmt.Errorf("error while watching trust bundle for peer %q: %w", uid.Peer, err)
|
||||
}
|
||||
|
||||
snap.ConnectProxy.UpstreamPeerTrustBundles.InitWatch(uid.Peer, cancel)
|
||||
return fmt.Errorf("failed to setup watches for peered upstream %q: %w", uid.String(), err)
|
||||
}
|
||||
}
|
||||
snap.ConnectProxy.PeeredUpstreams = seenUpstreams
|
||||
|
@ -474,7 +438,7 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s
|
|||
|
||||
meshGateway := s.proxyCfg.MeshGateway
|
||||
if u != nil {
|
||||
meshGateway = meshGateway.OverlayWith(u.MeshGateway)
|
||||
meshGateway = u.MeshGateway
|
||||
}
|
||||
watchOpts := discoveryChainWatchOpts{
|
||||
id: NewUpstreamIDFromServiceName(svc),
|
||||
|
|
|
@ -556,7 +556,11 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
}
|
||||
|
||||
if meshGatewayProxyConfigValue != structs.MeshGatewayModeDefault {
|
||||
ns.Proxy.MeshGateway.Mode = meshGatewayProxyConfigValue
|
||||
for i := range ns.Proxy.Upstreams {
|
||||
if u := &ns.Proxy.Upstreams[i]; u.MeshGateway.Mode == structs.MeshGatewayModeDefault {
|
||||
u.MeshGateway.Mode = meshGatewayProxyConfigValue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ixnMatch := TestIntentions()
|
||||
|
@ -3116,6 +3120,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
Address: "10.0.1.1",
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
DestinationServiceName: "api",
|
||||
MeshGateway: structs.MeshGatewayConfig{
|
||||
Mode: structs.MeshGatewayModeLocal,
|
||||
},
|
||||
Mode: structs.ProxyModeTransparent,
|
||||
Upstreams: structs.Upstreams{
|
||||
{
|
||||
|
@ -3228,11 +3235,11 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
// Peered upstream will have set up 3 more watches
|
||||
requiredWatches: map[string]verifyWatchRequest{
|
||||
upstreamPeerWatchIDPrefix + extApiUID.String(): genVerifyServiceSpecificPeeredRequest("api-a", "", "dc1", "peer-a", true),
|
||||
upstreamPeerWatchIDPrefix + extDBUID.String(): genVerifyServiceSpecificPeeredRequest("db", "", "dc1", "peer-a", true),
|
||||
peerTrustBundleIDPrefix + "peer-a": genVerifyTrustBundleReadWatch("peer-a"),
|
||||
"mesh-gateway:dc1": genVerifyGatewayWatch("dc1"),
|
||||
},
|
||||
events: []UpdateEvent{
|
||||
{
|
||||
|
@ -3271,14 +3278,18 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
require.True(t, ok)
|
||||
prototest.AssertDeepEqual(t, peerTrustBundles.Bundles[0], ptb)
|
||||
|
||||
// Sanity check that local upstream maps are not populated
|
||||
// Ensure that maps for non-peering endpoints are not populated.
|
||||
require.Empty(t, snap.ConnectProxy.WatchedUpstreamEndpoints[extDBUID])
|
||||
require.Empty(t, snap.ConnectProxy.PassthroughUpstreams[extDBUID])
|
||||
require.Empty(t, snap.ConnectProxy.PassthroughIndices)
|
||||
|
||||
// Local gateway is watched but there are no endpoints
|
||||
require.True(t, snap.ConnectProxy.WatchedLocalGWEndpoints.IsWatched("dc1"))
|
||||
_, ok = snap.ConnectProxy.WatchedLocalGWEndpoints.Get("dc1")
|
||||
require.False(t, ok)
|
||||
},
|
||||
},
|
||||
{
|
||||
// Add another instance of "api-a" service
|
||||
events: []UpdateEvent{
|
||||
{
|
||||
CorrelationID: upstreamPeerWatchIDPrefix + extDBUID.String(),
|
||||
|
@ -3300,6 +3311,20 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
CorrelationID: "mesh-gateway:dc1",
|
||||
Result: &structs.IndexedCheckServiceNodes{
|
||||
Nodes: structs.CheckServiceNodes{
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Node: "node1",
|
||||
Address: "10.1.2.3",
|
||||
},
|
||||
Service: structs.TestNodeServiceMeshGateway(t),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid")
|
||||
|
@ -3319,12 +3344,18 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
require.NotNil(t, ep)
|
||||
require.Len(t, ep, 1)
|
||||
|
||||
require.Equal(t, 1, snap.ConnectProxy.WatchedLocalGWEndpoints.Len())
|
||||
|
||||
gwEp, _ := snap.ConnectProxy.WatchedLocalGWEndpoints.Get("dc1")
|
||||
require.NotNil(t, gwEp)
|
||||
require.Len(t, gwEp, 1)
|
||||
|
||||
// Expect a trust bundle
|
||||
ptb, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get("peer-a")
|
||||
require.True(t, ok)
|
||||
prototest.AssertDeepEqual(t, peerTrustBundles.Bundles[0], ptb)
|
||||
|
||||
// Sanity check that local upstream maps are not populated
|
||||
// Ensure that maps for non-peering endpoints are not populated.
|
||||
require.Empty(t, snap.ConnectProxy.WatchedUpstreamEndpoints[extDBUID])
|
||||
require.Empty(t, snap.ConnectProxy.PassthroughUpstreams[extDBUID])
|
||||
require.Empty(t, snap.ConnectProxy.PassthroughIndices)
|
||||
|
@ -3377,6 +3408,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
DestinationName: "api-a",
|
||||
DestinationPeer: "peer-a",
|
||||
LocalBindPort: 10001,
|
||||
MeshGateway: structs.MeshGatewayConfig{
|
||||
Mode: structs.MeshGatewayModeLocal,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -3401,6 +3435,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
peeringTrustBundlesWatchID: genVerifyTrustBundleListWatch("web"),
|
||||
peerTrustBundleIDPrefix + "peer-a": genVerifyTrustBundleReadWatch("peer-a"),
|
||||
upstreamPeerWatchIDPrefix + extApiUID.String(): genVerifyServiceSpecificPeeredRequest("api-a", "", "dc1", "peer-a", true),
|
||||
"mesh-gateway:dc1": genVerifyGatewayWatch("dc1"),
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.False(t, snap.Valid(), "should not be valid")
|
||||
|
@ -3423,6 +3458,11 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
_, ok = snap.ConnectProxy.PeerUpstreamEndpoints.Get(extApiUID)
|
||||
require.False(t, ok) // but no data
|
||||
|
||||
// Local gateway is watched but there are no endpoints
|
||||
require.True(t, snap.ConnectProxy.WatchedLocalGWEndpoints.IsWatched("dc1"))
|
||||
_, ok = snap.ConnectProxy.WatchedLocalGWEndpoints.Get("dc1")
|
||||
require.False(t, ok)
|
||||
|
||||
require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks)
|
||||
require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints)
|
||||
require.Len(t, snap.ConnectProxy.InboundPeerTrustBundles, 0, "%+v", snap.ConnectProxy.InboundPeerTrustBundles)
|
||||
|
@ -3494,6 +3534,20 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
CorrelationID: "mesh-gateway:dc1",
|
||||
Result: &structs.IndexedCheckServiceNodes{
|
||||
Nodes: structs.CheckServiceNodes{
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Node: "node1",
|
||||
Address: "10.1.2.3",
|
||||
},
|
||||
Service: structs.TestNodeServiceMeshGateway(t),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.True(t, snap.Valid())
|
||||
|
@ -3517,8 +3571,11 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
ep, _ := snap.ConnectProxy.PeerUpstreamEndpoints.Get(extApiUID)
|
||||
require.NotNil(t, ep)
|
||||
|
||||
require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks)
|
||||
require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints)
|
||||
require.Equal(t, 1, snap.ConnectProxy.WatchedLocalGWEndpoints.Len())
|
||||
|
||||
gwEp, _ := snap.ConnectProxy.WatchedLocalGWEndpoints.Get("dc1")
|
||||
require.NotNil(t, gwEp)
|
||||
require.Len(t, gwEp, 1)
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
|
@ -256,7 +256,7 @@ func TestConfigSnapshotPeeringLocalMeshGateway(t testing.T) *ConfigSnapshot {
|
|||
DestinationName: "payments",
|
||||
DestinationPeer: "cloud",
|
||||
LocalBindPort: 9090,
|
||||
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
|
||||
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
|
||||
}
|
||||
paymentsUID = NewUpstreamID(&paymentsUpstream)
|
||||
|
||||
|
|
|
@ -69,14 +69,6 @@ func (c *MeshGatewayConfig) IsZero() bool {
|
|||
return *c == zeroVal
|
||||
}
|
||||
|
||||
func (base *MeshGatewayConfig) OverlayWith(overlay MeshGatewayConfig) MeshGatewayConfig {
|
||||
out := *base
|
||||
if overlay.Mode != MeshGatewayModeDefault {
|
||||
out.Mode = overlay.Mode
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func ValidateMeshGatewayMode(mode string) (MeshGatewayMode, error) {
|
||||
switch MeshGatewayMode(mode) {
|
||||
case MeshGatewayModeNone:
|
||||
|
|
|
@ -2,7 +2,6 @@ package structs
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
|
@ -618,47 +617,6 @@ func TestConnectProxyConfig_UnmarshalJSON(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestMeshGatewayConfig_OverlayWith(t *testing.T) {
|
||||
var (
|
||||
D = MeshGatewayConfig{Mode: MeshGatewayModeDefault}
|
||||
N = MeshGatewayConfig{Mode: MeshGatewayModeNone}
|
||||
R = MeshGatewayConfig{Mode: MeshGatewayModeRemote}
|
||||
L = MeshGatewayConfig{Mode: MeshGatewayModeLocal}
|
||||
)
|
||||
|
||||
type testCase struct {
|
||||
base, overlay, expect MeshGatewayConfig
|
||||
}
|
||||
cases := []testCase{
|
||||
{D, D, D},
|
||||
{D, N, N},
|
||||
{D, R, R},
|
||||
{D, L, L},
|
||||
{N, D, N},
|
||||
{N, N, N},
|
||||
{N, R, R},
|
||||
{N, L, L},
|
||||
{R, D, R},
|
||||
{R, N, N},
|
||||
{R, R, R},
|
||||
{R, L, L},
|
||||
{L, D, L},
|
||||
{L, N, N},
|
||||
{L, R, R},
|
||||
{L, L, L},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
tc := tc
|
||||
|
||||
t.Run(fmt.Sprintf("%s overlaid with %s", tc.base.Mode, tc.overlay.Mode),
|
||||
func(t *testing.T) {
|
||||
got := tc.base.OverlayWith(tc.overlay)
|
||||
require.Equal(t, tc.expect, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateMeshGatewayMode(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
modeConstant string
|
||||
|
|
|
@ -547,6 +547,7 @@ func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(cfgSnap *pr
|
|||
}
|
||||
|
||||
upstream := cfgSnap.ConnectProxy.UpstreamConfig[uid]
|
||||
|
||||
// If an upstream is configured with local mesh gw mode, we make a load assignment
|
||||
// from the gateway endpoints instead of those of the upstreams.
|
||||
if upstream != nil && upstream.MeshGateway.Mode == structs.MeshGatewayModeLocal {
|
||||
|
|
|
@ -29,19 +29,34 @@
|
|||
{
|
||||
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
|
||||
"name": "payments.default.cloud.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
|
||||
"type": "EDS",
|
||||
"edsClusterConfig": {
|
||||
"edsConfig": {
|
||||
"ads": {
|
||||
|
||||
},
|
||||
"resourceApiVersion": "V3"
|
||||
"type": "LOGICAL_DNS",
|
||||
"connectTimeout": "5s",
|
||||
"loadAssignment": {
|
||||
"clusterName": "payments.default.cloud.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
|
||||
"endpoints": [
|
||||
{
|
||||
"lbEndpoints": [
|
||||
{
|
||||
"endpoint": {
|
||||
"address": {
|
||||
"socketAddress": {
|
||||
"address": "123.us-east-1.elb.notaws.com",
|
||||
"portValue": 8443
|
||||
}
|
||||
}
|
||||
},
|
||||
"connectTimeout": "5s",
|
||||
"healthStatus": "HEALTHY",
|
||||
"loadBalancingWeight": 1
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
"circuitBreakers": {
|
||||
|
||||
},
|
||||
"dnsRefreshRate": "10s",
|
||||
"dnsLookupFamily": "V4_ONLY",
|
||||
"outlierDetection": {
|
||||
"maxEjectionPercent": 100
|
||||
},
|
||||
|
|
|
@ -1,28 +1,6 @@
|
|||
{
|
||||
"versionInfo": "00000001",
|
||||
"resources": [
|
||||
{
|
||||
"@type": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
|
||||
"clusterName": "payments.default.cloud.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
|
||||
"endpoints": [
|
||||
{
|
||||
"lbEndpoints": [
|
||||
{
|
||||
"endpoint": {
|
||||
"address": {
|
||||
"socketAddress": {
|
||||
"address": "10.0.0.1",
|
||||
"portValue": 1234
|
||||
}
|
||||
}
|
||||
},
|
||||
"healthStatus": "HEALTHY",
|
||||
"loadBalancingWeight": 1
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"@type": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
|
||||
"clusterName": "refunds.default.cloud.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
|
||||
|
|
Loading…
Reference in New Issue