Browse Source

peering: allow mesh gateways to proxy L4 peered traffic (#13339)

Mesh gateways will now enable tcp connections with SNI names including peering information so that those connections may be proxied.

Note: this does not change the callers to use these mesh gateways.
pull/13294/head
R.B. Boyer 2 years ago committed by GitHub
parent
commit
ab758b7b32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      agent/agent.go
  2. 51
      agent/cache-types/exported_peered_services.go
  3. 69
      agent/cache-types/exported_peered_services_test.go
  4. 10
      agent/consul/acl.go
  5. 33
      agent/consul/internal_endpoint.go
  6. 27
      agent/consul/state/peering.go
  7. 6
      agent/proxycfg-glue/glue.go
  8. 10
      agent/proxycfg/data_sources.go
  9. 108
      agent/proxycfg/mesh_gateway.go
  10. 47
      agent/proxycfg/snapshot.go
  11. 1
      agent/proxycfg/state.go
  12. 27
      agent/proxycfg/state_test.go
  13. 1
      agent/proxycfg/testing.go
  14. 53
      agent/proxycfg/testing_mesh_gateway.go
  15. 7
      agent/rpc/peering/subscription_manager.go
  16. 5
      agent/structs/peering.go
  17. 53
      agent/xds/listeners.go
  18. 6
      agent/xds/listeners_test.go
  19. 147
      agent/xds/testdata/listeners/mesh-gateway-with-exported-peered-services.latest.golden

3
agent/agent.go

@ -651,6 +651,7 @@ func (a *Agent) Start(ctx context.Context) error {
ServiceList: proxycfgglue.CacheServiceList(a.cache),
TrustBundle: proxycfgglue.CacheTrustBundle(a.cache),
TrustBundleList: proxycfgglue.CacheTrustBundleList(a.cache),
ExportedPeeredServices: proxycfgglue.CacheExportedPeeredServices(a.cache),
}
a.fillEnterpriseProxyDataSources(&proxyDataSources)
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
@ -4136,6 +4137,8 @@ func (a *Agent) registerCache() {
a.cache.RegisterType(cachetype.TrustBundleReadName, &cachetype.TrustBundle{Client: a.rpcClientPeering})
a.cache.RegisterType(cachetype.ExportedPeeredServicesName, &cachetype.ExportedPeeredServices{RPC: a})
a.cache.RegisterType(cachetype.FederationStateListMeshGatewaysName,
&cachetype.FederationStateListMeshGateways{RPC: a})

51
agent/cache-types/exported_peered_services.go

@ -0,0 +1,51 @@
package cachetype
import (
"fmt"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
// Recommended name for registration.
const ExportedPeeredServicesName = "exported-peered-services"
type ExportedPeeredServices struct {
RegisterOptionsBlockingRefresh
RPC RPC
}
func (c *ExportedPeeredServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult
// The request should be a DCSpecificRequest.
reqReal, ok := req.(*structs.DCSpecificRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}
// Lightweight copy this object so that manipulating QueryOptions doesn't race.
dup := *reqReal
reqReal = &dup
// Set the minimum query index to our current index so we block
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
reqReal.QueryOptions.MaxQueryTime = opts.Timeout
// Always allow stale - there's no point in hitting leader if the request is
// going to be served from cache and end up arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all
// servers too.
reqReal.AllowStale = true
// Fetch
var reply structs.IndexedExportedServiceList
if err := c.RPC.RPC("Internal.ExportedPeeredServices", reqReal, &reply); err != nil {
return result, err
}
result.Value = &reply
result.Index = reply.QueryMeta.Index
return result, nil
}

69
agent/cache-types/exported_peered_services_test.go

@ -0,0 +1,69 @@
package cachetype
import (
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
func TestExportedPeeredServices(t *testing.T) {
rpc := TestRPC(t)
typ := &ExportedPeeredServices{RPC: rpc}
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedExportedServiceList
rpc.On("RPC", "Internal.ExportedPeeredServices", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DCSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
reply := args.Get(2).(*structs.IndexedExportedServiceList)
reply.Services = map[string]structs.ServiceList{
"my-peer": {
structs.ServiceName{
Name: "foo",
},
structs.ServiceName{
Name: "bar",
},
},
}
reply.QueryMeta.Index = 48
resp = reply
})
// Fetch
resultA, err := typ.Fetch(cache.FetchOptions{
MinIndex: 24,
Timeout: 1 * time.Second,
}, &structs.DCSpecificRequest{
Datacenter: "dc1",
})
require.NoError(t, err)
require.Equal(t, cache.FetchResult{
Value: resp,
Index: 48,
}, resultA)
rpc.AssertExpectations(t)
}
func TestExportedPeeredServices_badReqType(t *testing.T) {
rpc := TestRPC(t)
typ := &ExportedPeeredServices{RPC: rpc}
// Fetch
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
require.Error(t, err)
require.Contains(t, err.Error(), "wrong type")
rpc.AssertExpectations(t)
}

10
agent/consul/acl.go

@ -1940,6 +1940,16 @@ func filterACLWithAuthorizer(logger hclog.Logger, authorizer acl.Authorizer, sub
case *structs.IndexedServiceList:
v.QueryMeta.ResultsFilteredByACLs = filt.filterServiceList(&v.Services)
case *structs.IndexedExportedServiceList:
for peer, peerServices := range v.Services {
v.QueryMeta.ResultsFilteredByACLs = filt.filterServiceList(&peerServices)
if len(peerServices) == 0 {
delete(v.Services, peer)
} else {
v.Services[peer] = peerServices
}
}
case *structs.IndexedGatewayServices:
v.QueryMeta.ResultsFilteredByACLs = filt.filterGatewayServices(&v.Services)

33
agent/consul/internal_endpoint.go

@ -435,6 +435,39 @@ func (m *Internal) GatewayIntentions(args *structs.IntentionQueryRequest, reply
)
}
// ExportedPeeredServices is used to query the exported services for peers.
// Returns services as a map of ServiceNames by peer.
func (m *Internal) ExportedPeeredServices(args *structs.DCSpecificRequest, reply *structs.IndexedExportedServiceList) error {
if done, err := m.srv.ForwardRPC("Internal.ExportedPeeredServices", args, reply); done {
return err
}
authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
if err != nil {
return err
}
if err := m.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
// TODO(peering): acls: mesh gateway needs appropriate wildcard service:read
return m.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, serviceMap, err := state.ExportedServicesForAllPeersByName(ws, args.EnterpriseMeta)
if err != nil {
return err
}
reply.Index, reply.Services = index, serviceMap
m.srv.filterACLWithAuthorizer(authz, reply)
return nil
})
}
// EventFire is a bit of an odd endpoint, but it allows for a cross-DC RPC
// call to fire an event. The primary use case is to enable user events being
// triggered in a remote DC.

27
agent/consul/state/peering.go

@ -310,6 +310,33 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6
return s.exportedServicesForPeerTxn(ws, tx, peering)
}
func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
maxIdx, peerings, err := s.peeringListTxn(ws, tx, entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to list peerings: %w", err)
}
out := make(map[string]structs.ServiceList)
for _, peering := range peerings {
idx, list, err := s.exportedServicesForPeerTxn(ws, tx, peering)
if err != nil {
return 0, nil, fmt.Errorf("failed to list exported services for peer %q: %w", peering.ID, err)
}
if idx > maxIdx {
maxIdx = idx
}
m := list.ListAllDiscoveryChains()
if len(m) > 0 {
out[peering.Name] = maps.SliceOfKeys(m)
}
}
return maxIdx, out, nil
}
func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peering *pbpeering.Peering) (uint64, *structs.ExportedServiceList, error) {
maxIdx := peering.ModifyIndex

6
agent/proxycfg-glue/glue.go

@ -113,6 +113,12 @@ func CacheTrustBundleList(c *cache.Cache) proxycfg.TrustBundleList {
return &cacheProxyDataSource[*pbpeering.TrustBundleListByServiceRequest]{c, cachetype.TrustBundleListName}
}
// CacheExportedPeeredServices satisfies the proxycfg.ExportedPeeredServices
// interface by sourcing data from the agent cache.
func CacheExportedPeeredServices(c *cache.Cache) proxycfg.ExportedPeeredServices {
return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.ExportedPeeredServicesName}
}
// cacheProxyDataSource implements a generic wrapper around the agent cache to
// provide data to the proxycfg.Manager.
type cacheProxyDataSource[ReqType cache.Request] struct {

10
agent/proxycfg/data_sources.go

@ -86,6 +86,10 @@ type DataSources struct {
// peered clusters that the given proxy is exported to.
TrustBundleList TrustBundleList
// ExportedPeeredServices provides updates about the list of all exported
// services in a datacenter on a notification channel.
ExportedPeeredServices ExportedPeeredServices
DataSourcesEnterprise
}
@ -195,3 +199,9 @@ type TrustBundle interface {
type TrustBundleList interface {
Notify(ctx context.Context, req *pbpeering.TrustBundleListByServiceRequest, correlationID string, ch chan<- UpdateEvent) error
}
// ExportedPeeredServices is the interface used to consume updates about the
// list of all services exported to peers in a datacenter.
type ExportedPeeredServices interface {
Notify(ctx context.Context, req *structs.DCSpecificRequest, correlationID string, ch chan<- UpdateEvent) error
}

108
agent/proxycfg/mesh_gateway.go

@ -3,10 +3,12 @@ package proxycfg
import (
"context"
"fmt"
"sort"
"strings"
"time"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/maps"
"github.com/hashicorp/consul/logging"
)
@ -67,12 +69,26 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
return snap, err
}
// Watch for all exported services from this mesh gateway's partition in any peering.
err = s.dataSources.ExportedPeeredServices.Notify(ctx, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
EnterpriseMeta: s.proxyID.EnterpriseMeta,
}, exportedServiceListWatchID, s.ch)
if err != nil {
return snap, err
}
snap.MeshGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc)
snap.MeshGateway.WatchedGateways = make(map[string]context.CancelFunc)
snap.MeshGateway.ServiceGroups = make(map[structs.ServiceName]structs.CheckServiceNodes)
snap.MeshGateway.GatewayGroups = make(map[string]structs.CheckServiceNodes)
snap.MeshGateway.ServiceResolvers = make(map[structs.ServiceName]*structs.ServiceResolverConfigEntry)
snap.MeshGateway.HostnameDatacenters = make(map[string]structs.CheckServiceNodes)
snap.MeshGateway.ExportedServicesWithPeers = make(map[structs.ServiceName][]string)
snap.MeshGateway.DiscoveryChain = make(map[structs.ServiceName]*structs.CompiledDiscoveryChain)
snap.MeshGateway.WatchedDiscoveryChains = make(map[structs.ServiceName]context.CancelFunc)
// there is no need to initialize the map of service resolvers as we
// fully rebuild it every time we get updates
@ -295,6 +311,80 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
snap.MeshGateway.ConsulServers = resp.Nodes
case exportedServiceListWatchID:
exportedServices, ok := u.Result.(*structs.IndexedExportedServiceList)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
seenServices := make(map[structs.ServiceName][]string) // svc -> peername slice
for peerName, services := range exportedServices.Services {
for _, svc := range services {
seenServices[svc] = append(seenServices[svc], peerName)
}
}
// Sort the peer names so ultimately xDS has a stable output.
for svc := range seenServices {
sort.Strings(seenServices[svc])
}
peeredServiceList := maps.SliceOfKeys(seenServices)
structs.ServiceList(peeredServiceList).Sort()
snap.MeshGateway.ExportedServicesSlice = peeredServiceList
snap.MeshGateway.ExportedServicesWithPeers = seenServices
snap.MeshGateway.WatchedExportedServices = exportedServices.Services
snap.MeshGateway.WatchedExportedServicesSet = true
// For each service that we should be exposing, also watch disco chains
// in the same manner as an ingress gateway would.
for _, svc := range snap.MeshGateway.ExportedServicesSlice {
if _, ok := snap.MeshGateway.WatchedDiscoveryChains[svc]; ok {
continue
}
ctx, cancel := context.WithCancel(ctx)
err := s.dataSources.CompiledDiscoveryChain.Notify(ctx, &structs.DiscoveryChainRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Name: svc.Name,
EvaluateInDatacenter: s.source.Datacenter,
EvaluateInNamespace: svc.NamespaceOrDefault(),
EvaluateInPartition: svc.PartitionOrDefault(),
}, "discovery-chain:"+svc.String(), s.ch)
if err != nil {
meshLogger.Error("failed to register watch for discovery chain",
"service", svc.String(),
"error", err,
)
cancel()
return err
}
snap.MeshGateway.WatchedDiscoveryChains[svc] = cancel
}
// Clean up data from services that were not in the update
for svc, cancelFn := range snap.MeshGateway.WatchedDiscoveryChains {
if _, ok := seenServices[svc]; !ok {
cancelFn()
delete(snap.MeshGateway.WatchedDiscoveryChains, svc)
}
}
// These entries are intentionally handled separately from the
// WatchedDiscoveryChains above. There have been situations where a
// discovery watch was cancelled, then fired. That update event then
// re-populated the DiscoveryChain map entry, which wouldn't get
// cleaned up since there was no known watch for it.
for svc := range snap.MeshGateway.DiscoveryChain {
if _, ok := seenServices[svc]; !ok {
delete(snap.MeshGateway.DiscoveryChain, svc)
}
}
default:
switch {
case strings.HasPrefix(u.CorrelationID, "connect-service:"):
@ -330,6 +420,24 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
)
}
case strings.HasPrefix(u.CorrelationID, "discovery-chain:"):
resp, ok := u.Result.(*structs.DiscoveryChainResponse)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
svcString := strings.TrimPrefix(u.CorrelationID, "discovery-chain:")
svc := structs.ServiceNameFromString(svcString)
if !snap.MeshGateway.IsServiceExported(svc) {
delete(snap.MeshGateway.DiscoveryChain, svc)
s.logger.Trace("discovery-chain watch fired for unknown service", "service", svc)
return nil
}
snap.MeshGateway.DiscoveryChain[svc] = resp.Chain
// TODO(peering): we need to do this if we are going to setup a cross-partition or cross-datacenter target
default:
if err := s.handleEntUpdate(meshLogger, ctx, u, snap); err != nil {
return err

47
agent/proxycfg/snapshot.go

@ -331,6 +331,33 @@ type configSnapshotMeshGateway struct {
// HostnameDatacenters is a map of datacenters to mesh gateway instances with a hostname as the address.
// If hostnames are configured they must be provided to Envoy via CDS not EDS.
HostnameDatacenters map[string]structs.CheckServiceNodes
// TODO(peering):
ExportedServicesSlice []structs.ServiceName
// TODO(peering): svc -> peername slice
ExportedServicesWithPeers map[structs.ServiceName][]string
// TODO(peering): discard this maybe
WatchedExportedServices map[string]structs.ServiceList
// TODO(peering):
WatchedExportedServicesSet bool
// TODO(peering):
DiscoveryChain map[structs.ServiceName]*structs.CompiledDiscoveryChain
// TODO(peering):
WatchedDiscoveryChains map[structs.ServiceName]context.CancelFunc
}
func (c *configSnapshotMeshGateway) IsServiceExported(svc structs.ServiceName) bool {
if c == nil || len(c.ExportedServicesWithPeers) == 0 {
return false
}
_, ok := c.ExportedServicesWithPeers[svc]
return ok
}
func (c *configSnapshotMeshGateway) GatewayKeys() []GatewayKey {
@ -376,7 +403,22 @@ func (c *configSnapshotMeshGateway) isEmpty() bool {
len(c.GatewayGroups) == 0 &&
len(c.FedStateGateways) == 0 &&
len(c.ConsulServers) == 0 &&
len(c.HostnameDatacenters) == 0
len(c.HostnameDatacenters) == 0 &&
c.isEmptyPeering()
}
// isEmptyPeering is a test helper
func (c *configSnapshotMeshGateway) isEmptyPeering() bool {
if c == nil {
return true
}
return len(c.ExportedServicesSlice) == 0 &&
len(c.ExportedServicesWithPeers) == 0 &&
len(c.WatchedExportedServices) == 0 &&
!c.WatchedExportedServicesSet &&
len(c.DiscoveryChain) == 0 &&
len(c.WatchedDiscoveryChains) == 0
}
type configSnapshotIngressGateway struct {
@ -496,7 +538,8 @@ func (s *ConfigSnapshot) Valid() bool {
}
}
return s.Roots != nil &&
(s.MeshGateway.WatchedServicesSet || len(s.MeshGateway.ServiceGroups) > 0)
(s.MeshGateway.WatchedServicesSet || len(s.MeshGateway.ServiceGroups) > 0) &&
s.MeshGateway.WatchedExportedServicesSet
case structs.ServiceKindIngressGateway:
return s.Roots != nil &&

1
agent/proxycfg/state.go

@ -37,6 +37,7 @@ const (
serviceIntentionsIDPrefix = "service-intentions:"
intentionUpstreamsID = "intention-upstreams"
upstreamPeerWatchIDPrefix = "upstream-peer:"
exportedServiceListWatchID = "exported-service-list"
meshConfigEntryID = "mesh"
svcChecksWatchIDPrefix = cachetype.ServiceHTTPChecksName + ":"
preparedQueryIDPrefix = string(structs.UpstreamDestTypePreparedQuery) + ":"

27
agent/proxycfg/state_test.go

@ -136,6 +136,7 @@ func recordWatches(sc *stateConfig) *watchRecorder {
ServiceList: typedWatchRecorder[*structs.DCSpecificRequest]{wr},
TrustBundle: typedWatchRecorder[*pbpeering.TrustBundleReadRequest]{wr},
TrustBundleList: typedWatchRecorder[*pbpeering.TrustBundleListByServiceRequest]{wr},
ExportedPeeredServices: typedWatchRecorder[*structs.DCSpecificRequest]{wr},
}
recordWatchesEnterprise(sc, wr)
@ -725,9 +726,10 @@ func TestState_WatchesAndUpdates(t *testing.T) {
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
datacentersWatchID: verifyDatacentersWatch,
serviceListWatchID: genVerifyDCSpecificWatch("dc1"),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
datacentersWatchID: verifyDatacentersWatch,
serviceListWatchID: genVerifyDCSpecificWatch("dc1"),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
exportedServiceListWatchID: genVerifyDCSpecificWatch("dc1"),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "gateway without root is not valid")
@ -737,6 +739,12 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{
events: []UpdateEvent{
rootWatchEvent(),
{
CorrelationID: exportedServiceListWatchID,
Result: &structs.IndexedExportedServiceList{
Services: nil,
},
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "gateway without services is valid")
@ -786,12 +794,19 @@ func TestState_WatchesAndUpdates(t *testing.T) {
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
datacentersWatchID: verifyDatacentersWatch,
serviceListWatchID: genVerifyDCSpecificWatch("dc1"),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
datacentersWatchID: verifyDatacentersWatch,
serviceListWatchID: genVerifyDCSpecificWatch("dc1"),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
exportedServiceListWatchID: genVerifyDCSpecificWatch("dc1"),
},
events: []UpdateEvent{
rootWatchEvent(),
{
CorrelationID: exportedServiceListWatchID,
Result: &structs.IndexedExportedServiceList{
Services: nil,
},
},
{
CorrelationID: serviceListWatchID,
Result: &structs.IndexedServiceList{

1
agent/proxycfg/testing.go

@ -721,6 +721,7 @@ func testConfigSnapshotFixture(
ServiceList: &noopDataSource[*structs.DCSpecificRequest]{},
TrustBundle: &noopDataSource[*pbpeering.TrustBundleReadRequest]{},
TrustBundleList: &noopDataSource[*pbpeering.TrustBundleListByServiceRequest]{},
ExportedPeeredServices: &noopDataSource[*structs.DCSpecificRequest]{},
},
dnsConfig: DNSConfig{ // TODO: make configurable
Domain: "consul",

53
agent/proxycfg/testing_mesh_gateway.go

@ -5,7 +5,10 @@ import (
"time"
"github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/assert"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs"
)
@ -20,6 +23,50 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
switch variant {
case "default":
case "peered-services":
var (
fooSN = structs.NewServiceName("foo", nil)
barSN = structs.NewServiceName("bar", nil)
girSN = structs.NewServiceName("gir", nil)
fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
girChain = discoverychain.TestCompileConfigEntries(t, "gir", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
)
assert.True(t, fooChain.Default)
assert.True(t, barChain.Default)
assert.True(t, girChain.Default)
extraUpdates = append(extraUpdates,
UpdateEvent{
CorrelationID: exportedServiceListWatchID,
Result: &structs.IndexedExportedServiceList{
Services: map[string]structs.ServiceList{
"peer1": []structs.ServiceName{fooSN, barSN},
"peer2": []structs.ServiceName{girSN},
},
},
},
UpdateEvent{
CorrelationID: "discovery-chain:" + fooSN.String(),
Result: &structs.DiscoveryChainResponse{
Chain: fooChain,
},
},
UpdateEvent{
CorrelationID: "discovery-chain:" + barSN.String(),
Result: &structs.DiscoveryChainResponse{
Chain: barChain,
},
},
UpdateEvent{
CorrelationID: "discovery-chain:" + girSN.String(),
Result: &structs.DiscoveryChainResponse{
Chain: girChain,
},
},
)
case "federation-states":
populateServices = true
useFederationStates = true
@ -257,6 +304,12 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
CorrelationID: rootsWatchID,
Result: roots,
},
{
CorrelationID: exportedServiceListWatchID,
Result: &structs.IndexedExportedServiceList{
Services: nil,
},
},
{
CorrelationID: serviceListWatchID,
Result: &structs.IndexedServiceList{

7
agent/rpc/peering/subscription_manager.go

@ -181,6 +181,13 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
// skip checks since we just generated one from scratch
}
// Scrub raft indexes
for _, instance := range csn.Nodes {
instance.Node.RaftIndex = nil
instance.Service.RaftIndex = nil
// skip checks since we just generated one from scratch
}
id := servicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedService)
// Just ferry this one directly along to the destination.

5
agent/structs/peering.go

@ -8,6 +8,11 @@ type PeeringToken struct {
PeerID string
}
type IndexedExportedServiceList struct {
Services map[string]ServiceList
QueryMeta
}
// NOTE: this is not serialized via msgpack so it can be changed without concern.
type ExportedServiceList struct {
// Services is a list of exported services that apply to both standard

53
agent/xds/listeners.go

@ -1371,6 +1371,59 @@ func (s *ResourceGenerator) makeMeshGatewayListener(name, addr string, port int,
l := makePortListener(name, addr, port, envoy_core_v3.TrafficDirection_UNSPECIFIED)
l.ListenerFilters = []*envoy_listener_v3.ListenerFilter{tlsInspector}
// Add in TCP filter chains for plain peered passthrough.
//
// TODO(peering): make this work for L7 as well
// TODO(peering): make failover work
for _, svc := range cfgSnap.MeshGateway.ExportedServicesSlice {
peerNames, ok := cfgSnap.MeshGateway.ExportedServicesWithPeers[svc]
if !ok {
continue // not possible
}
chain, ok := cfgSnap.MeshGateway.DiscoveryChain[svc]
if !ok {
continue // ignore; not ready
}
if structs.IsProtocolHTTPLike(chain.Protocol) {
continue // temporary skip
}
target, err := simpleChainTarget(chain)
if err != nil {
return nil, err
}
clusterName := CustomizeClusterName(target.Name, chain)
filterName := fmt.Sprintf("%s.%s.%s.%s", chain.ServiceName, chain.Namespace, chain.Partition, chain.Datacenter)
dcTCPProxy, err := makeTCPProxyFilter(filterName, clusterName, "mesh_gateway_local_peered.")
if err != nil {
return nil, err
}
var peeredServerNames []string
for _, peerName := range peerNames {
peeredSNI := connect.PeeredServiceSNI(
svc.Name,
svc.NamespaceOrDefault(),
svc.PartitionOrDefault(),
peerName,
cfgSnap.Roots.TrustDomain,
)
peeredServerNames = append(peeredServerNames, peeredSNI)
}
l.FilterChains = append(l.FilterChains, &envoy_listener_v3.FilterChain{
FilterChainMatch: &envoy_listener_v3.FilterChainMatch{
ServerNames: peeredServerNames,
},
Filters: []*envoy_listener_v3.Filter{
dcTCPProxy,
},
})
}
// We need 1 Filter Chain per remote cluster
keys := cfgSnap.MeshGateway.GatewayKeys()
for _, key := range keys {

6
agent/xds/listeners_test.go

@ -476,6 +476,12 @@ func TestListenersFromSnapshot(t *testing.T) {
return proxycfg.TestConfigSnapshotMeshGateway(t, "no-services", nil, nil)
},
},
{
name: "mesh-gateway-with-exported-peered-services",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotMeshGateway(t, "peered-services", nil, nil)
},
},
{
name: "mesh-gateway-tagged-addresses",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {

147
agent/xds/testdata/listeners/mesh-gateway-with-exported-peered-services.latest.golden vendored

@ -0,0 +1,147 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.listener.v3.Listener",
"name": "default:1.2.3.4:8443",
"address": {
"socketAddress": {
"address": "1.2.3.4",
"portValue": 8443
}
},
"filterChains": [
{
"filterChainMatch": {
"serverNames": [
"bar.default.default.peer1.external.11111111-2222-3333-4444-555555555555.consul"
]
},
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "mesh_gateway_local_peered.bar.default.default.dc1",
"cluster": "bar.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
]
},
{
"filterChainMatch": {
"serverNames": [
"foo.default.default.peer1.external.11111111-2222-3333-4444-555555555555.consul"
]
},
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "mesh_gateway_local_peered.foo.default.default.dc1",
"cluster": "foo.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
]
},
{
"filterChainMatch": {
"serverNames": [
"gir.default.default.peer2.external.11111111-2222-3333-4444-555555555555.consul"
]
},
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "mesh_gateway_local_peered.gir.default.default.dc1",
"cluster": "gir.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
]
},
{
"filterChainMatch": {
"serverNames": [
"*.dc2.internal.11111111-2222-3333-4444-555555555555.consul"
]
},
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "mesh_gateway_remote.default.dc2",
"cluster": "dc2.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
]
},
{
"filterChainMatch": {
"serverNames": [
"*.dc4.internal.11111111-2222-3333-4444-555555555555.consul"
]
},
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "mesh_gateway_remote.default.dc4",
"cluster": "dc4.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
]
},
{
"filterChainMatch": {
"serverNames": [
"*.dc6.internal.11111111-2222-3333-4444-555555555555.consul"
]
},
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "mesh_gateway_remote.default.dc6",
"cluster": "dc6.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
]
},
{
"filters": [
{
"name": "envoy.filters.network.sni_cluster",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.sni_cluster.v3.SniCluster"
}
},
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "mesh_gateway_local.default",
"cluster": ""
}
}
]
}
],
"listenerFilters": [
{
"name": "envoy.filters.listener.tls_inspector",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.listener.tls_inspector.v3.TlsInspector"
}
}
]
}
],
"typeUrl": "type.googleapis.com/envoy.config.listener.v3.Listener",
"nonce": "00000001"
}
Loading…
Cancel
Save