Merge pull request #11431 from hashicorp/ap/exports-proxycfg

[OSS] Update partitioned mesh gw handling for connect proxies
pull/11255/head
Freddy 2021-10-27 11:27:43 -06:00 committed by GitHub
commit a8762be529
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 183 additions and 120 deletions

3
.changelog/11431.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
connect: **(Enterprise only)** add support for dialing upstreams in remote partitions through mesh gateways.
```

View File

@ -16,7 +16,7 @@ func TestCompiledDiscoveryChain(t *testing.T) {
typ := &CompiledDiscoveryChain{RPC: rpc}
// just do the default chain
chain := discoverychain.TestCompileConfigEntries(t, "web", "default", "default", "dc1", "trustdomain.consul", "dc1", nil)
chain := discoverychain.TestCompileConfigEntries(t, "web", "default", "default", "dc1", "trustdomain.consul", nil)
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.

View File

@ -57,7 +57,6 @@ func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs
EvaluateInNamespace: entMeta.NamespaceOrDefault(),
EvaluateInPartition: entMeta.PartitionOrDefault(),
EvaluateInDatacenter: evalDC,
UseInDatacenter: c.srv.config.Datacenter,
OverrideMeshGateway: args.OverrideMeshGateway,
OverrideProtocol: args.OverrideProtocol,
OverrideConnectTimeout: args.OverrideConnectTimeout,

View File

@ -18,7 +18,6 @@ type CompileRequest struct {
EvaluateInPartition string
EvaluateInDatacenter string
EvaluateInTrustDomain string
UseInDatacenter string // where the results will be used from
// OverrideMeshGateway allows for the setting to be overridden for any
// resolver in the compiled chain.
@ -62,7 +61,6 @@ func Compile(req CompileRequest) (*structs.CompiledDiscoveryChain, error) {
evaluateInPartition = req.EvaluateInPartition
evaluateInDatacenter = req.EvaluateInDatacenter
evaluateInTrustDomain = req.EvaluateInTrustDomain
useInDatacenter = req.UseInDatacenter
entries = req.Entries
)
if serviceName == "" {
@ -80,9 +78,6 @@ func Compile(req CompileRequest) (*structs.CompiledDiscoveryChain, error) {
if evaluateInTrustDomain == "" {
return nil, fmt.Errorf("evaluateInTrustDomain is required")
}
if useInDatacenter == "" {
return nil, fmt.Errorf("useInDatacenter is required")
}
if entries == nil {
return nil, fmt.Errorf("entries is required")
}
@ -93,7 +88,6 @@ func Compile(req CompileRequest) (*structs.CompiledDiscoveryChain, error) {
evaluateInNamespace: evaluateInNamespace,
evaluateInDatacenter: evaluateInDatacenter,
evaluateInTrustDomain: evaluateInTrustDomain,
useInDatacenter: useInDatacenter,
overrideMeshGateway: req.OverrideMeshGateway,
overrideProtocol: req.OverrideProtocol,
overrideConnectTimeout: req.OverrideConnectTimeout,
@ -130,7 +124,6 @@ type compiler struct {
evaluateInPartition string
evaluateInDatacenter string
evaluateInTrustDomain string
useInDatacenter string
overrideMeshGateway structs.MeshGatewayConfig
overrideProtocol string
overrideConnectTimeout time.Duration
@ -936,10 +929,10 @@ RESOLVE_AGAIN:
}
}
// TODO (mesh-gateway)- maybe allow using a gateway within a datacenter at some point
if target.Datacenter == c.useInDatacenter {
target.MeshGateway.Mode = structs.MeshGatewayModeDefault
} else if target.External {
// TODO(partitions): Document this change in behavior. Discovery chain targets will now return a mesh gateway
// mode as long as they are not external. Regardless of the datacenter/partition where
// the chain will be used.
if target.External {
// Bypass mesh gateways if it is an external service.
target.MeshGateway.Mode = structs.MeshGatewayModeDefault
} else {

View File

@ -105,7 +105,6 @@ func TestCompile(t *testing.T) {
EvaluateInPartition: "default",
EvaluateInDatacenter: "dc1",
EvaluateInTrustDomain: "trustdomain.consul",
UseInDatacenter: "dc1",
Entries: tc.entries,
}
if tc.setup != nil {
@ -1338,7 +1337,11 @@ func testcase_DatacenterFailover_WithMeshGateways() compileTestCase {
},
},
Targets: map[string]*structs.DiscoveryTarget{
"main.default.default.dc1": newTarget("main", "", "default", "default", "dc1", nil),
"main.default.default.dc1": newTarget("main", "", "default", "default", "dc1", func(t *structs.DiscoveryTarget) {
t.MeshGateway = structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
}
}),
"main.default.default.dc2": newTarget("main", "", "default", "default", "dc2", func(t *structs.DiscoveryTarget) {
t.MeshGateway = structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
@ -1469,7 +1472,11 @@ func testcase_DefaultResolver_WithProxyDefaults() compileTestCase {
},
},
Targets: map[string]*structs.DiscoveryTarget{
"main.default.default.dc1": newTarget("main", "", "default", "default", "dc1", nil),
"main.default.default.dc1": newTarget("main", "", "default", "default", "dc1", func(t *structs.DiscoveryTarget) {
t.MeshGateway = structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
}
}),
},
}
return compileTestCase{entries: entries, expect: expect, expectIsDefault: true}

View File

@ -12,7 +12,6 @@ func TestCompileConfigEntries(t testing.T,
evaluateInPartition string,
evaluateInDatacenter string,
evaluateInTrustDomain string,
useInDatacenter string,
setup func(req *CompileRequest), entries ...structs.ConfigEntry) *structs.CompiledDiscoveryChain {
set := structs.NewDiscoveryChainConfigEntries()
@ -24,7 +23,6 @@ func TestCompileConfigEntries(t testing.T,
EvaluateInPartition: evaluateInPartition,
EvaluateInDatacenter: evaluateInDatacenter,
EvaluateInTrustDomain: evaluateInTrustDomain,
UseInDatacenter: useInDatacenter,
Entries: set,
}
if setup != nil {

View File

@ -394,7 +394,6 @@ func (s *Store) discoveryChainTargetsTxn(tx ReadTxn, ws memdb.WatchSet, dc, serv
EvaluateInNamespace: source.NamespaceOrDefault(),
EvaluateInPartition: source.PartitionOrDefault(),
EvaluateInDatacenter: dc,
UseInDatacenter: dc,
}
idx, chain, err := s.serviceDiscoveryChainTxn(tx, ws, source.Name, entMeta, req)
if err != nil {
@ -452,7 +451,6 @@ func (s *Store) discoveryChainSourcesTxn(tx ReadTxn, ws memdb.WatchSet, dc strin
EvaluateInNamespace: sn.NamespaceOrDefault(),
EvaluateInPartition: sn.PartitionOrDefault(),
EvaluateInDatacenter: dc,
UseInDatacenter: dc,
}
idx, chain, err := s.serviceDiscoveryChainTxn(tx, ws, sn.Name, &sn.EnterpriseMeta, req)
if err != nil {
@ -723,7 +721,6 @@ func testCompileDiscoveryChain(
EvaluateInPartition: entMeta.PartitionOrDefault(),
EvaluateInDatacenter: "dc1",
EvaluateInTrustDomain: "b6fc9da3-03d4-4b5a-9134-c045e9b20152.consul",
UseInDatacenter: "dc1",
Entries: speculativeEntries,
}
chain, err := discoverychain.Compile(req)
@ -1208,7 +1205,6 @@ func protocolForService(
EvaluateInDatacenter: "dc1",
// Use a dummy trust domain since that won't affect the protocol here.
EvaluateInTrustDomain: "b6fc9da3-03d4-4b5a-9134-c045e9b20152.consul",
UseInDatacenter: "dc1",
Entries: entries,
}
chain, err := discoverychain.Compile(req)

View File

@ -264,6 +264,11 @@ func TestDiscoveryChainRead(t *testing.T) {
})
}))
expectTarget_DC1 := newTarget("web", "", "default", "default", "dc1")
expectTarget_DC1.MeshGateway = structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeLocal,
}
expectTarget_DC2 := newTarget("web", "", "default", "default", "dc2")
expectTarget_DC2.MeshGateway = structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeLocal,
@ -291,8 +296,8 @@ func TestDiscoveryChainRead(t *testing.T) {
},
},
Targets: map[string]*structs.DiscoveryTarget{
"web.default.default.dc1": newTarget("web", "", "default", "default", "dc1"),
expectTarget_DC2.ID: expectTarget_DC2,
expectTarget_DC1.ID: expectTarget_DC1,
expectTarget_DC2.ID: expectTarget_DC2,
},
}

View File

@ -59,7 +59,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
roots, leaf := TestCerts(t)
dbDefaultChain := func() *structs.CompiledDiscoveryChain {
return discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", "dc1", func(req *discoverychain.CompileRequest) {
return discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", func(req *discoverychain.CompileRequest) {
// This is because structs.TestUpstreams uses an opaque config
// to override connect timeouts.
req.OverrideConnectTimeout = 1 * time.Second
@ -69,7 +69,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
})
}
dbSplitChain := func() *structs.CompiledDiscoveryChain {
return discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", "dc1", func(req *discoverychain.CompileRequest) {
return discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", func(req *discoverychain.CompileRequest) {
// This is because structs.TestUpstreams uses an opaque config
// to override connect timeouts.
req.OverrideConnectTimeout = 1 * time.Second

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sort"
"strings"
"github.com/mitchellh/copystructure"
@ -38,11 +39,11 @@ type ConfigSnapshotUpstreams struct {
WatchedUpstreamEndpoints map[string]map[string]structs.CheckServiceNodes
// WatchedGateways is a map of upstream.Identifier() -> (map of
// TargetID -> CancelFunc) in order to cancel watches for mesh gateways
// GatewayKey.String() -> CancelFunc) in order to cancel watches for mesh gateways
WatchedGateways map[string]map[string]context.CancelFunc
// WatchedGatewayEndpoints is a map of upstream.Identifier() -> (map of
// TargetID -> CheckServiceNodes) and is used to determine the backing
// GatewayKey.String() -> CheckServiceNodes) and is used to determine the backing
// endpoints of a mesh gateway.
WatchedGatewayEndpoints map[string]map[string]structs.CheckServiceNodes
@ -53,6 +54,27 @@ type ConfigSnapshotUpstreams struct {
PassthroughUpstreams map[string]ServicePassthroughAddrs
}
type GatewayKey struct {
Datacenter string
Partition string
}
func (k GatewayKey) String() string {
return k.Partition + "." + k.Datacenter
}
func (k GatewayKey) IsEmpty() bool {
return k.Partition == "" && k.Datacenter == ""
}
func gatewayKeyFromString(s string) GatewayKey {
split := strings.SplitN(s, ".", 2)
return GatewayKey{
Partition: split[0],
Datacenter: split[1],
}
}
// ServicePassthroughAddrs contains the LAN addrs
type ServicePassthroughAddrs struct {
// SNI is the Service SNI of the upstream.

View File

@ -3,6 +3,7 @@ package proxycfg
import (
"context"
"errors"
"fmt"
"net"
"reflect"
"time"
@ -426,3 +427,23 @@ func hostnameEndpoints(logger hclog.Logger, localDC string, nodes structs.CheckS
}
return resp
}
type gatewayWatchOpts struct {
notifier CacheNotifier
notifyCh chan cache.UpdateEvent
source structs.QuerySource
token string
key GatewayKey
upstreamID string
}
func watchMeshGateway(ctx context.Context, opts gatewayWatchOpts) error {
return opts.notifier.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
Datacenter: opts.key.Datacenter,
QueryOptions: structs.QueryOptions{Token: opts.token},
ServiceKind: structs.ServiceKindMeshGateway,
UseServiceKind: true,
Source: opts.source,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition(opts.key.Partition),
}, fmt.Sprintf("mesh-gateway:%s:%s", opts.key.String(), opts.upstreamID), opts.notifyCh)
}

View File

@ -568,7 +568,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{
CorrelationID: "discovery-chain:api",
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", "dc1",
Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = meshGatewayProxyConfigValue
}),
@ -578,7 +578,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{
CorrelationID: "discovery-chain:api-failover-remote?dc=dc2",
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-remote", "default", "default", "dc2", "trustdomain.consul", "dc1",
Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-remote", "default", "default", "dc2", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = structs.MeshGatewayModeRemote
}),
@ -588,7 +588,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{
CorrelationID: "discovery-chain:api-failover-local?dc=dc2",
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-local", "default", "default", "dc2", "trustdomain.consul", "dc1",
Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-local", "default", "default", "dc2", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = structs.MeshGatewayModeLocal
}),
@ -598,7 +598,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{
CorrelationID: "discovery-chain:api-failover-direct?dc=dc2",
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-direct", "default", "default", "dc2", "trustdomain.consul", "dc1",
Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-direct", "default", "default", "dc2", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = structs.MeshGatewayModeNone
}),
@ -608,7 +608,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{
CorrelationID: "discovery-chain:api-dc2",
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api-dc2", "default", "default", "dc1", "trustdomain.consul", "dc1",
Chain: discoverychain.TestCompileConfigEntries(t, "api-dc2", "default", "default", "dc1", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = meshGatewayProxyConfigValue
}, &structs.ServiceResolverConfigEntry{
@ -649,8 +649,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
"upstream-target:api-failover-remote.default.default.dc2:api-failover-remote?dc=dc2": genVerifyServiceWatch("api-failover-remote", "", "dc2", true),
"upstream-target:api-failover-local.default.default.dc2:api-failover-local?dc=dc2": genVerifyServiceWatch("api-failover-local", "", "dc2", true),
"upstream-target:api-failover-direct.default.default.dc2:api-failover-direct?dc=dc2": genVerifyServiceWatch("api-failover-direct", "", "dc2", true),
"mesh-gateway:dc2:api-failover-remote?dc=dc2": genVerifyGatewayWatch("dc2"),
"mesh-gateway:dc1:api-failover-local?dc=dc2": genVerifyGatewayWatch("dc1"),
"mesh-gateway:default.dc2:api-failover-remote?dc=dc2": genVerifyGatewayWatch("dc2"),
"mesh-gateway:default.dc1:api-failover-local?dc=dc2": genVerifyGatewayWatch("dc1"),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid())
@ -673,7 +673,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}
if meshGatewayProxyConfigValue == structs.MeshGatewayModeLocal {
stage1.requiredWatches["mesh-gateway:dc1:api-dc2"] = genVerifyGatewayWatch("dc1")
stage1.requiredWatches["mesh-gateway:default.dc1:api-dc2"] = genVerifyGatewayWatch("dc1")
}
return testCase{
@ -1032,7 +1032,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{
CorrelationID: "discovery-chain:" + api.String(),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", "dc1", nil),
Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", nil),
},
Err: nil,
},
@ -1863,7 +1863,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{
CorrelationID: "discovery-chain:" + db.String(),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", "dc1", nil),
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", nil),
},
Err: nil,
},
@ -2012,7 +2012,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{
CorrelationID: "discovery-chain:" + db.String(),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", "dc1", nil, &structs.ServiceResolverConfigEntry{
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", nil, &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "db",
Redirect: &structs.ServiceResolverRedirect{
@ -2180,7 +2180,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{
CorrelationID: "discovery-chain:" + upstreamIDForDC2(db.String()),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc2", "trustdomain.consul", "dc1",
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc2", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = structs.MeshGatewayModeLocal
}),

View File

@ -669,7 +669,7 @@ func TestConfigSnapshot(t testing.T) *ConfigSnapshot {
roots, leaf := TestCerts(t)
// no entries implies we'll get a default chain
dbChain := discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", "dc1", nil)
dbChain := discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
upstreams := structs.TestUpstreams(t)
@ -1396,7 +1396,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
entries = append(entries, additionalEntries...)
}
dbChain := discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", "dc1", compileSetup, entries...)
dbChain := discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", compileSetup, entries...)
upstreams := structs.TestUpstreams(t)
snap := ConfigSnapshotUpstreams{
@ -1429,7 +1429,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
TestUpstreamNodesDC2(t)
snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{
"db": {
"dc2": TestGatewayNodesDC2(t),
"default.dc2": TestGatewayNodesDC2(t),
},
}
case "failover-through-double-remote-gateway-triggered":
@ -1442,8 +1442,8 @@ func setupTestVariationConfigEntriesAndSnapshot(
snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc3"] = TestUpstreamNodesDC2(t)
snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{
"db": {
"dc2": TestGatewayNodesDC2(t),
"dc3": TestGatewayNodesDC3(t),
"default.dc2": TestGatewayNodesDC2(t),
"default.dc3": TestGatewayNodesDC3(t),
},
}
case "failover-through-local-gateway-triggered":
@ -1455,7 +1455,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
TestUpstreamNodesDC2(t)
snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{
"db": {
"dc1": TestGatewayNodesDC1(t),
"default.dc1": TestGatewayNodesDC1(t),
},
}
case "failover-through-double-local-gateway-triggered":
@ -1468,7 +1468,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc3"] = TestUpstreamNodesDC2(t)
snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{
"db": {
"dc1": TestGatewayNodesDC1(t),
"default.dc1": TestGatewayNodesDC1(t),
},
}
case "splitter-with-resolver-redirect-multidc":
@ -1737,9 +1737,10 @@ func testConfigSnapshotIngressGateway(
{protocol, 9191}: {
{
// We rely on this one having default type in a few tests...
DestinationName: "db",
LocalBindPort: 9191,
LocalBindAddress: "2.3.4.5",
DestinationName: "db",
DestinationPartition: "default",
LocalBindPort: 9191,
LocalBindAddress: "2.3.4.5",
},
},
},
@ -2084,8 +2085,8 @@ func TestConfigSnapshotIngress_MultipleListenersDuplicateService(t testing.T) *C
},
}
fooChain := discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", "dc1", nil)
barChain := discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", "dc1", nil)
fooChain := discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
barChain := discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.IngressGateway.DiscoveryChain = map[string]*structs.CompiledDiscoveryChain{
"foo": fooChain,

View File

@ -118,14 +118,15 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up
return fmt.Errorf("invalid type for response: %T", u.Result)
}
correlationID := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")
dc, svc, ok := removeColonPrefix(correlationID)
key, svc, ok := removeColonPrefix(correlationID)
if !ok {
return fmt.Errorf("invalid correlation id %q", u.CorrelationID)
}
if _, ok = upstreamsSnapshot.WatchedGatewayEndpoints[svc]; !ok {
upstreamsSnapshot.WatchedGatewayEndpoints[svc] = make(map[string]structs.CheckServiceNodes)
}
upstreamsSnapshot.WatchedGatewayEndpoints[svc][dc] = resp.Nodes
upstreamsSnapshot.WatchedGatewayEndpoints[svc][key] = resp.Nodes
default:
return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID)
}
@ -207,11 +208,22 @@ func (s *handlerUpstreams) resetWatchesFromChain(
// We'll get endpoints from the gateway query, but the health still has
// to come from the backing service query.
var gk GatewayKey
switch target.MeshGateway.Mode {
case structs.MeshGatewayModeRemote:
needGateways[target.Datacenter] = struct{}{}
gk = GatewayKey{
Partition: target.Partition,
Datacenter: target.Datacenter,
}
case structs.MeshGatewayModeLocal:
needGateways[s.source.Datacenter] = struct{}{}
gk = GatewayKey{
Partition: s.source.NodePartitionOrDefault(),
Datacenter: s.source.Datacenter,
}
}
if s.source.Datacenter != target.Datacenter || s.proxyID.PartitionOrDefault() != target.Partition {
needGateways[gk.String()] = struct{}{}
}
}
@ -240,38 +252,51 @@ func (s *handlerUpstreams) resetWatchesFromChain(
}
}
for dc := range needGateways {
if _, ok := snap.WatchedGateways[id][dc]; ok {
for key := range needGateways {
if _, ok := snap.WatchedGateways[id][key]; ok {
continue
}
gwKey := gatewayKeyFromString(key)
s.logger.Trace("initializing watch of mesh gateway in datacenter",
s.logger.Trace("initializing watch of mesh gateway",
"upstream", id,
"chain", chain.ServiceName,
"datacenter", dc,
"datacenter", gwKey.Datacenter,
"partition", gwKey.Partition,
)
ctx, cancel := context.WithCancel(ctx)
err := s.watchMeshGateway(ctx, dc, id)
opts := gatewayWatchOpts{
notifier: s.cache,
notifyCh: s.ch,
source: *s.source,
token: s.token,
key: gwKey,
upstreamID: id,
}
err := watchMeshGateway(ctx, opts)
if err != nil {
cancel()
return err
}
snap.WatchedGateways[id][dc] = cancel
snap.WatchedGateways[id][key] = cancel
}
for dc, cancelFn := range snap.WatchedGateways[id] {
if _, ok := needGateways[dc]; ok {
for key, cancelFn := range snap.WatchedGateways[id] {
if _, ok := needGateways[key]; ok {
continue
}
s.logger.Trace("stopping watch of mesh gateway in datacenter",
gwKey := gatewayKeyFromString(key)
s.logger.Trace("stopping watch of mesh gateway",
"upstream", id,
"chain", chain.ServiceName,
"datacenter", dc,
"datacenter", gwKey.Datacenter,
"partition", gwKey.Partition,
)
delete(snap.WatchedGateways[id], dc)
delete(snap.WatchedGatewayEndpoints[id], dc)
delete(snap.WatchedGateways[id], key)
delete(snap.WatchedGatewayEndpoints[id], key)
cancelFn()
}
@ -287,17 +312,6 @@ type targetWatchOpts struct {
entMeta *structs.EnterpriseMeta
}
func (s *handlerUpstreams) watchMeshGateway(ctx context.Context, dc string, upstreamID string) error {
return s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
Datacenter: dc,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceKind: structs.ServiceKindMeshGateway,
UseServiceKind: true,
Source: *s.source,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}, "mesh-gateway:"+dc+":"+upstreamID, s.ch)
}
func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *ConfigSnapshotUpstreams, opts targetWatchOpts) error {
s.logger.Trace("initializing watch of target",
"upstream", opts.upstreamID,

View File

@ -697,7 +697,7 @@ func TestClustersFromSnapshot(t *testing.T) {
}
// There should still be a cluster for non-passthrough requests
snap.ConnectProxy.DiscoveryChain["mongo"] = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", "dc1", nil)
snap.ConnectProxy.DiscoveryChain["mongo"] = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.WatchedUpstreamEndpoints["mongo"] = map[string]structs.CheckServiceNodes{
"mongo.default.dc1": {
structs.CheckServiceNode{

View File

@ -3,7 +3,6 @@ package xds
import (
"errors"
"fmt"
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
@ -51,7 +50,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
es := s.endpointsFromDiscoveryChain(
id,
chain,
cfgSnap.Datacenter,
proxycfg.GatewayKey{Datacenter: cfgSnap.Datacenter, Partition: cfgSnap.ProxyID.PartitionOrDefault()},
cfgSnap.ConnectProxy.UpstreamConfig[id],
cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id],
cfgSnap.ConnectProxy.WatchedGatewayEndpoints[id],
@ -275,7 +274,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotIngressGateway(cfgSnap *proxycf
es := s.endpointsFromDiscoveryChain(
id,
cfgSnap.IngressGateway.DiscoveryChain[id],
cfgSnap.Datacenter,
proxycfg.GatewayKey{Datacenter: cfgSnap.Datacenter, Partition: u.DestinationPartition},
&u,
cfgSnap.IngressGateway.WatchedUpstreamEndpoints[id],
cfgSnap.IngressGateway.WatchedGatewayEndpoints[id],
@ -311,9 +310,10 @@ func makePipeEndpoint(path string) *envoy_endpoint_v3.LbEndpoint {
func (s *ResourceGenerator) endpointsFromDiscoveryChain(
id string,
chain *structs.CompiledDiscoveryChain,
datacenter string,
gatewayKey proxycfg.GatewayKey,
upstream *structs.Upstream,
upstreamEndpoints, gatewayEndpoints map[string]structs.CheckServiceNodes,
upstreamEndpoints map[string]structs.CheckServiceNodes,
gatewayEndpoints map[string]structs.CheckServiceNodes,
) []proto.Message {
var resources []proto.Message
@ -387,7 +387,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
upstreamEndpoints,
gatewayEndpoints,
targetID,
datacenter,
gatewayKey,
)
if !valid {
continue // skip the cluster if we're still populating the snapshot
@ -406,7 +406,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
upstreamEndpoints,
gatewayEndpoints,
failTargetID,
datacenter,
gatewayKey,
)
if !valid {
continue // skip the failover target if we're still populating the snapshot
@ -420,7 +420,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
la := makeLoadAssignment(
clusterName,
endpointGroups,
datacenter,
gatewayKey.Datacenter,
)
resources = append(resources, la)
}
@ -486,7 +486,7 @@ func makeLoadAssignmentEndpointGroup(
targetHealth map[string]structs.CheckServiceNodes,
gatewayHealth map[string]structs.CheckServiceNodes,
targetID string,
currentDatacenter string,
localKey proxycfg.GatewayKey,
) (loadAssignmentEndpointGroup, bool) {
realEndpoints, ok := targetHealth[targetID]
if !ok {
@ -495,15 +495,19 @@ func makeLoadAssignmentEndpointGroup(
}
target := targets[targetID]
var gatewayDatacenter string
var gatewayKey proxycfg.GatewayKey
switch target.MeshGateway.Mode {
case structs.MeshGatewayModeRemote:
gatewayDatacenter = target.Datacenter
gatewayKey.Datacenter = target.Datacenter
gatewayKey.Partition = target.Partition
case structs.MeshGatewayModeLocal:
gatewayDatacenter = currentDatacenter
gatewayKey = localKey
}
if gatewayDatacenter == "" {
if gatewayKey.IsEmpty() || (structs.EqualPartitions(localKey.Partition, target.Partition) && localKey.Datacenter == target.Datacenter) {
// Gateways are not needed if the request isn't for a remote DC or partition.
return loadAssignmentEndpointGroup{
Endpoints: realEndpoints,
OnlyPassing: target.Subset.OnlyPassing,
@ -511,7 +515,7 @@ func makeLoadAssignmentEndpointGroup(
}
// If using a mesh gateway we need to pull those endpoints instead.
gatewayEndpoints, ok := gatewayHealth[gatewayDatacenter]
gatewayEndpoints, ok := gatewayHealth[gatewayKey.String()]
if !ok {
// skip the cluster if we're still populating the snapshot
return loadAssignmentEndpointGroup{}, false

View File

@ -810,7 +810,7 @@ func TestListenersFromSnapshot(t *testing.T) {
snap.ConnectProxy.MeshConfigSet = true
// DiscoveryChain without an UpstreamConfig should yield a filter chain when in transparent proxy mode
snap.ConnectProxy.DiscoveryChain["google"] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", "dc1", nil)
snap.ConnectProxy.DiscoveryChain["google"] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.WatchedUpstreamEndpoints["google"] = map[string]structs.CheckServiceNodes{
"google.default.default.dc1": {
structs.CheckServiceNode{
@ -847,7 +847,7 @@ func TestListenersFromSnapshot(t *testing.T) {
}
// DiscoveryChains without endpoints do not get a filter chain because there are no addresses to match on.
snap.ConnectProxy.DiscoveryChain["no-endpoints"] = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", "dc1", nil)
snap.ConnectProxy.DiscoveryChain["no-endpoints"] = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
},
},
{
@ -864,7 +864,7 @@ func TestListenersFromSnapshot(t *testing.T) {
}
// DiscoveryChain without an UpstreamConfig should yield a filter chain when in transparent proxy mode
snap.ConnectProxy.DiscoveryChain["google"] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", "dc1", nil)
snap.ConnectProxy.DiscoveryChain["google"] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.WatchedUpstreamEndpoints["google"] = map[string]structs.CheckServiceNodes{
"google.default.default.dc1": {
structs.CheckServiceNode{
@ -885,7 +885,7 @@ func TestListenersFromSnapshot(t *testing.T) {
}
// DiscoveryChains without endpoints do not get a filter chain because there are no addresses to match on.
snap.ConnectProxy.DiscoveryChain["no-endpoints"] = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", "dc1", nil)
snap.ConnectProxy.DiscoveryChain["no-endpoints"] = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
},
},
{
@ -894,9 +894,9 @@ func TestListenersFromSnapshot(t *testing.T) {
setup: func(snap *proxycfg.ConfigSnapshot) {
snap.Proxy.Mode = structs.ProxyModeTransparent
snap.ConnectProxy.DiscoveryChain["mongo"] = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", "dc1", nil)
snap.ConnectProxy.DiscoveryChain["mongo"] = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.DiscoveryChain["kafka"] = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", "dc1", nil)
snap.ConnectProxy.DiscoveryChain["kafka"] = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
kafka := structs.NewServiceName("kafka", structs.DefaultEnterpriseMetaInDefaultPartition())
mongo := structs.NewServiceName("mongo", structs.DefaultEnterpriseMetaInDefaultPartition())

View File

@ -200,10 +200,10 @@ func TestRoutesFromSnapshot(t *testing.T) {
ConnectTimeout: 22 * time.Second,
},
}
fooChain := discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", "dc1", nil, entries...)
barChain := discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", "dc1", nil, entries...)
bazChain := discoverychain.TestCompileConfigEntries(t, "baz", "default", "default", "dc1", connect.TestClusterID+".consul", "dc1", nil, entries...)
quxChain := discoverychain.TestCompileConfigEntries(t, "qux", "default", "default", "dc1", connect.TestClusterID+".consul", "dc1", nil, entries...)
fooChain := discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...)
barChain := discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...)
bazChain := discoverychain.TestCompileConfigEntries(t, "baz", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...)
quxChain := discoverychain.TestCompileConfigEntries(t, "qux", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...)
snap.IngressGateway.DiscoveryChain = map[string]*structs.CompiledDiscoveryChain{
"foo": fooChain,
@ -801,11 +801,11 @@ func setupIngressWithTwoHTTPServices(t *testing.T, o ingressSDSOpts) func(snap *
webChain := discoverychain.TestCompileConfigEntries(t, "web",
o.entMetas["web"].NamespaceOrDefault(),
o.entMetas["web"].PartitionOrDefault(), "dc1",
connect.TestClusterID+".consul", "dc1", nil, entries...)
connect.TestClusterID+".consul", nil, entries...)
fooChain := discoverychain.TestCompileConfigEntries(t, "foo",
o.entMetas["foo"].NamespaceOrDefault(),
o.entMetas["web"].PartitionOrDefault(), "dc1",
connect.TestClusterID+".consul", "dc1", nil, entries...)
connect.TestClusterID+".consul", nil, entries...)
snap.IngressGateway.DiscoveryChain[webUpstream.Identifier()] = webChain
snap.IngressGateway.DiscoveryChain[fooUpstream.Identifier()] = fooChain

View File

@ -3,8 +3,8 @@
"resources": [
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "a236e964~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"altStatName": "a236e964~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"name": "78ebd528~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"altStatName": "78ebd528~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {

View File

@ -3,8 +3,8 @@
"resources": [
{
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
"name": "a236e964~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"altStatName": "a236e964~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"name": "78ebd528~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"altStatName": "78ebd528~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {

View File

@ -3,8 +3,8 @@
"resources": [
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "a236e964~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"altStatName": "a236e964~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"name": "78ebd528~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"altStatName": "78ebd528~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {

View File

@ -3,8 +3,8 @@
"resources": [
{
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
"name": "a236e964~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"altStatName": "a236e964~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"name": "78ebd528~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"altStatName": "78ebd528~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {

View File

@ -3,7 +3,7 @@
"resources": [
{
"@type": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"clusterName": "a236e964~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"clusterName": "78ebd528~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"endpoints": [
{
"lbEndpoints": [

View File

@ -3,7 +3,7 @@
"resources": [
{
"@type": "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment",
"clusterName": "a236e964~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"clusterName": "78ebd528~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"endpoints": [
{
"lbEndpoints": [

View File

@ -3,7 +3,7 @@
"resources": [
{
"@type": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"clusterName": "a236e964~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"clusterName": "78ebd528~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"endpoints": [
{
"lbEndpoints": [

View File

@ -3,7 +3,7 @@
"resources": [
{
"@type": "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment",
"clusterName": "a236e964~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"clusterName": "78ebd528~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"endpoints": [
{
"lbEndpoints": [

View File

@ -16,7 +16,7 @@
"prefix": "/"
},
"route": {
"cluster": "a236e964~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
"cluster": "78ebd528~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
]

View File

@ -16,7 +16,7 @@
"prefix": "/"
},
"route": {
"cluster": "a236e964~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
"cluster": "78ebd528~db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
]