diff --git a/agent/agent.go b/agent/agent.go index 2056c8a5e7..f0b345f951 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -651,6 +651,7 @@ func (a *Agent) Start(ctx context.Context) error { ServiceList: proxycfgglue.CacheServiceList(a.cache), TrustBundle: proxycfgglue.CacheTrustBundle(a.cache), TrustBundleList: proxycfgglue.CacheTrustBundleList(a.cache), + ExportedPeeredServices: proxycfgglue.CacheExportedPeeredServices(a.cache), } a.fillEnterpriseProxyDataSources(&proxyDataSources) a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ @@ -4136,6 +4137,8 @@ func (a *Agent) registerCache() { a.cache.RegisterType(cachetype.TrustBundleReadName, &cachetype.TrustBundle{Client: a.rpcClientPeering}) + a.cache.RegisterType(cachetype.ExportedPeeredServicesName, &cachetype.ExportedPeeredServices{RPC: a}) + a.cache.RegisterType(cachetype.FederationStateListMeshGatewaysName, &cachetype.FederationStateListMeshGateways{RPC: a}) diff --git a/agent/cache-types/exported_peered_services.go b/agent/cache-types/exported_peered_services.go new file mode 100644 index 0000000000..02bc46a4c2 --- /dev/null +++ b/agent/cache-types/exported_peered_services.go @@ -0,0 +1,51 @@ +package cachetype + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" +) + +// Recommended name for registration. +const ExportedPeeredServicesName = "exported-peered-services" + +type ExportedPeeredServices struct { + RegisterOptionsBlockingRefresh + RPC RPC +} + +func (c *ExportedPeeredServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { + var result cache.FetchResult + + // The request should be a DCSpecificRequest. + reqReal, ok := req.(*structs.DCSpecificRequest) + if !ok { + return result, fmt.Errorf( + "Internal cache failure: request wrong type: %T", req) + } + + // Lightweight copy this object so that manipulating QueryOptions doesn't race. + dup := *reqReal + reqReal = &dup + + // Set the minimum query index to our current index so we block + reqReal.QueryOptions.MinQueryIndex = opts.MinIndex + reqReal.QueryOptions.MaxQueryTime = opts.Timeout + + // Always allow stale - there's no point in hitting leader if the request is + // going to be served from cache and end up arbitrarily stale anyway. This + // allows cached service-discover to automatically read scale across all + // servers too. + reqReal.AllowStale = true + + // Fetch + var reply structs.IndexedExportedServiceList + if err := c.RPC.RPC("Internal.ExportedPeeredServices", reqReal, &reply); err != nil { + return result, err + } + + result.Value = &reply + result.Index = reply.QueryMeta.Index + return result, nil +} diff --git a/agent/cache-types/exported_peered_services_test.go b/agent/cache-types/exported_peered_services_test.go new file mode 100644 index 0000000000..74b0cd7b36 --- /dev/null +++ b/agent/cache-types/exported_peered_services_test.go @@ -0,0 +1,69 @@ +package cachetype + +import ( + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" +) + +func TestExportedPeeredServices(t *testing.T) { + rpc := TestRPC(t) + typ := &ExportedPeeredServices{RPC: rpc} + + // Expect the proper RPC call. This also sets the expected value + // since that is return-by-pointer in the arguments. + var resp *structs.IndexedExportedServiceList + rpc.On("RPC", "Internal.ExportedPeeredServices", mock.Anything, mock.Anything).Return(nil). + Run(func(args mock.Arguments) { + req := args.Get(1).(*structs.DCSpecificRequest) + require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) + require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) + require.True(t, req.AllowStale) + + reply := args.Get(2).(*structs.IndexedExportedServiceList) + reply.Services = map[string]structs.ServiceList{ + "my-peer": { + structs.ServiceName{ + Name: "foo", + }, + structs.ServiceName{ + Name: "bar", + }, + }, + } + reply.QueryMeta.Index = 48 + resp = reply + }) + + // Fetch + resultA, err := typ.Fetch(cache.FetchOptions{ + MinIndex: 24, + Timeout: 1 * time.Second, + }, &structs.DCSpecificRequest{ + Datacenter: "dc1", + }) + require.NoError(t, err) + require.Equal(t, cache.FetchResult{ + Value: resp, + Index: 48, + }, resultA) + + rpc.AssertExpectations(t) +} + +func TestExportedPeeredServices_badReqType(t *testing.T) { + rpc := TestRPC(t) + typ := &ExportedPeeredServices{RPC: rpc} + + // Fetch + _, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest( + t, cache.RequestInfo{Key: "foo", MinIndex: 64})) + require.Error(t, err) + require.Contains(t, err.Error(), "wrong type") + rpc.AssertExpectations(t) +} diff --git a/agent/consul/acl.go b/agent/consul/acl.go index 8543b7f516..efa9b2665e 100644 --- a/agent/consul/acl.go +++ b/agent/consul/acl.go @@ -1940,6 +1940,16 @@ func filterACLWithAuthorizer(logger hclog.Logger, authorizer acl.Authorizer, sub case *structs.IndexedServiceList: v.QueryMeta.ResultsFilteredByACLs = filt.filterServiceList(&v.Services) + case *structs.IndexedExportedServiceList: + for peer, peerServices := range v.Services { + v.QueryMeta.ResultsFilteredByACLs = filt.filterServiceList(&peerServices) + if len(peerServices) == 0 { + delete(v.Services, peer) + } else { + v.Services[peer] = peerServices + } + } + case *structs.IndexedGatewayServices: v.QueryMeta.ResultsFilteredByACLs = filt.filterGatewayServices(&v.Services) diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 718002889c..20169562d7 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -435,6 +435,39 @@ func (m *Internal) GatewayIntentions(args *structs.IntentionQueryRequest, reply ) } +// ExportedPeeredServices is used to query the exported services for peers. +// Returns services as a map of ServiceNames by peer. +func (m *Internal) ExportedPeeredServices(args *structs.DCSpecificRequest, reply *structs.IndexedExportedServiceList) error { + if done, err := m.srv.ForwardRPC("Internal.ExportedPeeredServices", args, reply); done { + return err + } + + authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil) + if err != nil { + return err + } + + if err := m.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil { + return err + } + + // TODO(peering): acls: mesh gateway needs appropriate wildcard service:read + + return m.srv.blockingQuery( + &args.QueryOptions, + &reply.QueryMeta, + func(ws memdb.WatchSet, state *state.Store) error { + index, serviceMap, err := state.ExportedServicesForAllPeersByName(ws, args.EnterpriseMeta) + if err != nil { + return err + } + + reply.Index, reply.Services = index, serviceMap + m.srv.filterACLWithAuthorizer(authz, reply) + return nil + }) +} + // EventFire is a bit of an odd endpoint, but it allows for a cross-DC RPC // call to fire an event. The primary use case is to enable user events being // triggered in a remote DC. diff --git a/agent/consul/state/peering.go b/agent/consul/state/peering.go index d18d0ab808..f53ce60811 100644 --- a/agent/consul/state/peering.go +++ b/agent/consul/state/peering.go @@ -310,6 +310,33 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6 return s.exportedServicesForPeerTxn(ws, tx, peering) } +func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) { + tx := s.db.ReadTxn() + defer tx.Abort() + + maxIdx, peerings, err := s.peeringListTxn(ws, tx, entMeta) + if err != nil { + return 0, nil, fmt.Errorf("failed to list peerings: %w", err) + } + + out := make(map[string]structs.ServiceList) + for _, peering := range peerings { + idx, list, err := s.exportedServicesForPeerTxn(ws, tx, peering) + if err != nil { + return 0, nil, fmt.Errorf("failed to list exported services for peer %q: %w", peering.ID, err) + } + if idx > maxIdx { + maxIdx = idx + } + m := list.ListAllDiscoveryChains() + if len(m) > 0 { + out[peering.Name] = maps.SliceOfKeys(m) + } + } + + return maxIdx, out, nil +} + func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peering *pbpeering.Peering) (uint64, *structs.ExportedServiceList, error) { maxIdx := peering.ModifyIndex diff --git a/agent/proxycfg-glue/glue.go b/agent/proxycfg-glue/glue.go index 5538aab5be..c047da48f9 100644 --- a/agent/proxycfg-glue/glue.go +++ b/agent/proxycfg-glue/glue.go @@ -113,6 +113,12 @@ func CacheTrustBundleList(c *cache.Cache) proxycfg.TrustBundleList { return &cacheProxyDataSource[*pbpeering.TrustBundleListByServiceRequest]{c, cachetype.TrustBundleListName} } +// CacheExportedPeeredServices satisfies the proxycfg.ExportedPeeredServices +// interface by sourcing data from the agent cache. +func CacheExportedPeeredServices(c *cache.Cache) proxycfg.ExportedPeeredServices { + return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.ExportedPeeredServicesName} +} + // cacheProxyDataSource implements a generic wrapper around the agent cache to // provide data to the proxycfg.Manager. type cacheProxyDataSource[ReqType cache.Request] struct { diff --git a/agent/proxycfg/data_sources.go b/agent/proxycfg/data_sources.go index c454c2052e..5e2da0ad26 100644 --- a/agent/proxycfg/data_sources.go +++ b/agent/proxycfg/data_sources.go @@ -86,6 +86,10 @@ type DataSources struct { // peered clusters that the given proxy is exported to. TrustBundleList TrustBundleList + // ExportedPeeredServices provides updates about the list of all exported + // services in a datacenter on a notification channel. + ExportedPeeredServices ExportedPeeredServices + DataSourcesEnterprise } @@ -195,3 +199,9 @@ type TrustBundle interface { type TrustBundleList interface { Notify(ctx context.Context, req *pbpeering.TrustBundleListByServiceRequest, correlationID string, ch chan<- UpdateEvent) error } + +// ExportedPeeredServices is the interface used to consume updates about the +// list of all services exported to peers in a datacenter. +type ExportedPeeredServices interface { + Notify(ctx context.Context, req *structs.DCSpecificRequest, correlationID string, ch chan<- UpdateEvent) error +} diff --git a/agent/proxycfg/mesh_gateway.go b/agent/proxycfg/mesh_gateway.go index 45b80b5524..725c1c0c70 100644 --- a/agent/proxycfg/mesh_gateway.go +++ b/agent/proxycfg/mesh_gateway.go @@ -3,10 +3,12 @@ package proxycfg import ( "context" "fmt" + "sort" "strings" "time" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib/maps" "github.com/hashicorp/consul/logging" ) @@ -67,12 +69,26 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er return snap, err } + // Watch for all exported services from this mesh gateway's partition in any peering. + err = s.dataSources.ExportedPeeredServices.Notify(ctx, &structs.DCSpecificRequest{ + Datacenter: s.source.Datacenter, + QueryOptions: structs.QueryOptions{Token: s.token}, + Source: *s.source, + EnterpriseMeta: s.proxyID.EnterpriseMeta, + }, exportedServiceListWatchID, s.ch) + if err != nil { + return snap, err + } + snap.MeshGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc) snap.MeshGateway.WatchedGateways = make(map[string]context.CancelFunc) snap.MeshGateway.ServiceGroups = make(map[structs.ServiceName]structs.CheckServiceNodes) snap.MeshGateway.GatewayGroups = make(map[string]structs.CheckServiceNodes) snap.MeshGateway.ServiceResolvers = make(map[structs.ServiceName]*structs.ServiceResolverConfigEntry) snap.MeshGateway.HostnameDatacenters = make(map[string]structs.CheckServiceNodes) + snap.MeshGateway.ExportedServicesWithPeers = make(map[structs.ServiceName][]string) + snap.MeshGateway.DiscoveryChain = make(map[structs.ServiceName]*structs.CompiledDiscoveryChain) + snap.MeshGateway.WatchedDiscoveryChains = make(map[structs.ServiceName]context.CancelFunc) // there is no need to initialize the map of service resolvers as we // fully rebuild it every time we get updates @@ -295,6 +311,80 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn snap.MeshGateway.ConsulServers = resp.Nodes + case exportedServiceListWatchID: + exportedServices, ok := u.Result.(*structs.IndexedExportedServiceList) + if !ok { + return fmt.Errorf("invalid type for response: %T", u.Result) + } + + seenServices := make(map[structs.ServiceName][]string) // svc -> peername slice + for peerName, services := range exportedServices.Services { + for _, svc := range services { + seenServices[svc] = append(seenServices[svc], peerName) + } + } + // Sort the peer names so ultimately xDS has a stable output. + for svc := range seenServices { + sort.Strings(seenServices[svc]) + } + peeredServiceList := maps.SliceOfKeys(seenServices) + structs.ServiceList(peeredServiceList).Sort() + + snap.MeshGateway.ExportedServicesSlice = peeredServiceList + snap.MeshGateway.ExportedServicesWithPeers = seenServices + snap.MeshGateway.WatchedExportedServices = exportedServices.Services + snap.MeshGateway.WatchedExportedServicesSet = true + + // For each service that we should be exposing, also watch disco chains + // in the same manner as an ingress gateway would. + + for _, svc := range snap.MeshGateway.ExportedServicesSlice { + if _, ok := snap.MeshGateway.WatchedDiscoveryChains[svc]; ok { + continue + } + + ctx, cancel := context.WithCancel(ctx) + err := s.dataSources.CompiledDiscoveryChain.Notify(ctx, &structs.DiscoveryChainRequest{ + Datacenter: s.source.Datacenter, + QueryOptions: structs.QueryOptions{Token: s.token}, + Name: svc.Name, + EvaluateInDatacenter: s.source.Datacenter, + EvaluateInNamespace: svc.NamespaceOrDefault(), + EvaluateInPartition: svc.PartitionOrDefault(), + }, "discovery-chain:"+svc.String(), s.ch) + if err != nil { + meshLogger.Error("failed to register watch for discovery chain", + "service", svc.String(), + "error", err, + ) + cancel() + return err + } + + snap.MeshGateway.WatchedDiscoveryChains[svc] = cancel + } + + // Clean up data from services that were not in the update + + for svc, cancelFn := range snap.MeshGateway.WatchedDiscoveryChains { + if _, ok := seenServices[svc]; !ok { + cancelFn() + delete(snap.MeshGateway.WatchedDiscoveryChains, svc) + } + } + + // These entries are intentionally handled separately from the + // WatchedDiscoveryChains above. There have been situations where a + // discovery watch was cancelled, then fired. That update event then + // re-populated the DiscoveryChain map entry, which wouldn't get + // cleaned up since there was no known watch for it. + + for svc := range snap.MeshGateway.DiscoveryChain { + if _, ok := seenServices[svc]; !ok { + delete(snap.MeshGateway.DiscoveryChain, svc) + } + } + default: switch { case strings.HasPrefix(u.CorrelationID, "connect-service:"): @@ -330,6 +420,24 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn ) } + case strings.HasPrefix(u.CorrelationID, "discovery-chain:"): + resp, ok := u.Result.(*structs.DiscoveryChainResponse) + if !ok { + return fmt.Errorf("invalid type for response: %T", u.Result) + } + svcString := strings.TrimPrefix(u.CorrelationID, "discovery-chain:") + svc := structs.ServiceNameFromString(svcString) + + if !snap.MeshGateway.IsServiceExported(svc) { + delete(snap.MeshGateway.DiscoveryChain, svc) + s.logger.Trace("discovery-chain watch fired for unknown service", "service", svc) + return nil + } + + snap.MeshGateway.DiscoveryChain[svc] = resp.Chain + + // TODO(peering): we need to do this if we are going to setup a cross-partition or cross-datacenter target + default: if err := s.handleEntUpdate(meshLogger, ctx, u, snap); err != nil { return err diff --git a/agent/proxycfg/snapshot.go b/agent/proxycfg/snapshot.go index 02cdf0580e..1bcf05f886 100644 --- a/agent/proxycfg/snapshot.go +++ b/agent/proxycfg/snapshot.go @@ -331,6 +331,33 @@ type configSnapshotMeshGateway struct { // HostnameDatacenters is a map of datacenters to mesh gateway instances with a hostname as the address. // If hostnames are configured they must be provided to Envoy via CDS not EDS. HostnameDatacenters map[string]structs.CheckServiceNodes + + // TODO(peering): + ExportedServicesSlice []structs.ServiceName + + // TODO(peering): svc -> peername slice + ExportedServicesWithPeers map[structs.ServiceName][]string + + // TODO(peering): discard this maybe + WatchedExportedServices map[string]structs.ServiceList + + // TODO(peering): + WatchedExportedServicesSet bool + + // TODO(peering): + DiscoveryChain map[structs.ServiceName]*structs.CompiledDiscoveryChain + + // TODO(peering): + WatchedDiscoveryChains map[structs.ServiceName]context.CancelFunc +} + +func (c *configSnapshotMeshGateway) IsServiceExported(svc structs.ServiceName) bool { + if c == nil || len(c.ExportedServicesWithPeers) == 0 { + return false + } + + _, ok := c.ExportedServicesWithPeers[svc] + return ok } func (c *configSnapshotMeshGateway) GatewayKeys() []GatewayKey { @@ -376,7 +403,22 @@ func (c *configSnapshotMeshGateway) isEmpty() bool { len(c.GatewayGroups) == 0 && len(c.FedStateGateways) == 0 && len(c.ConsulServers) == 0 && - len(c.HostnameDatacenters) == 0 + len(c.HostnameDatacenters) == 0 && + c.isEmptyPeering() +} + +// isEmptyPeering is a test helper +func (c *configSnapshotMeshGateway) isEmptyPeering() bool { + if c == nil { + return true + } + + return len(c.ExportedServicesSlice) == 0 && + len(c.ExportedServicesWithPeers) == 0 && + len(c.WatchedExportedServices) == 0 && + !c.WatchedExportedServicesSet && + len(c.DiscoveryChain) == 0 && + len(c.WatchedDiscoveryChains) == 0 } type configSnapshotIngressGateway struct { @@ -496,7 +538,8 @@ func (s *ConfigSnapshot) Valid() bool { } } return s.Roots != nil && - (s.MeshGateway.WatchedServicesSet || len(s.MeshGateway.ServiceGroups) > 0) + (s.MeshGateway.WatchedServicesSet || len(s.MeshGateway.ServiceGroups) > 0) && + s.MeshGateway.WatchedExportedServicesSet case structs.ServiceKindIngressGateway: return s.Roots != nil && diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 7b27e1dd72..9fc5c88f66 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -37,6 +37,7 @@ const ( serviceIntentionsIDPrefix = "service-intentions:" intentionUpstreamsID = "intention-upstreams" upstreamPeerWatchIDPrefix = "upstream-peer:" + exportedServiceListWatchID = "exported-service-list" meshConfigEntryID = "mesh" svcChecksWatchIDPrefix = cachetype.ServiceHTTPChecksName + ":" preparedQueryIDPrefix = string(structs.UpstreamDestTypePreparedQuery) + ":" diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index a449a99731..b95e41b046 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -136,6 +136,7 @@ func recordWatches(sc *stateConfig) *watchRecorder { ServiceList: typedWatchRecorder[*structs.DCSpecificRequest]{wr}, TrustBundle: typedWatchRecorder[*pbpeering.TrustBundleReadRequest]{wr}, TrustBundleList: typedWatchRecorder[*pbpeering.TrustBundleListByServiceRequest]{wr}, + ExportedPeeredServices: typedWatchRecorder[*structs.DCSpecificRequest]{wr}, } recordWatchesEnterprise(sc, wr) @@ -725,9 +726,10 @@ func TestState_WatchesAndUpdates(t *testing.T) { stages: []verificationStage{ { requiredWatches: map[string]verifyWatchRequest{ - datacentersWatchID: verifyDatacentersWatch, - serviceListWatchID: genVerifyDCSpecificWatch("dc1"), - rootsWatchID: genVerifyDCSpecificWatch("dc1"), + datacentersWatchID: verifyDatacentersWatch, + serviceListWatchID: genVerifyDCSpecificWatch("dc1"), + rootsWatchID: genVerifyDCSpecificWatch("dc1"), + exportedServiceListWatchID: genVerifyDCSpecificWatch("dc1"), }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.False(t, snap.Valid(), "gateway without root is not valid") @@ -737,6 +739,12 @@ func TestState_WatchesAndUpdates(t *testing.T) { { events: []UpdateEvent{ rootWatchEvent(), + { + CorrelationID: exportedServiceListWatchID, + Result: &structs.IndexedExportedServiceList{ + Services: nil, + }, + }, }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.False(t, snap.Valid(), "gateway without services is valid") @@ -786,12 +794,19 @@ func TestState_WatchesAndUpdates(t *testing.T) { stages: []verificationStage{ { requiredWatches: map[string]verifyWatchRequest{ - datacentersWatchID: verifyDatacentersWatch, - serviceListWatchID: genVerifyDCSpecificWatch("dc1"), - rootsWatchID: genVerifyDCSpecificWatch("dc1"), + datacentersWatchID: verifyDatacentersWatch, + serviceListWatchID: genVerifyDCSpecificWatch("dc1"), + rootsWatchID: genVerifyDCSpecificWatch("dc1"), + exportedServiceListWatchID: genVerifyDCSpecificWatch("dc1"), }, events: []UpdateEvent{ rootWatchEvent(), + { + CorrelationID: exportedServiceListWatchID, + Result: &structs.IndexedExportedServiceList{ + Services: nil, + }, + }, { CorrelationID: serviceListWatchID, Result: &structs.IndexedServiceList{ diff --git a/agent/proxycfg/testing.go b/agent/proxycfg/testing.go index 77eb84eb08..f032a86e77 100644 --- a/agent/proxycfg/testing.go +++ b/agent/proxycfg/testing.go @@ -721,6 +721,7 @@ func testConfigSnapshotFixture( ServiceList: &noopDataSource[*structs.DCSpecificRequest]{}, TrustBundle: &noopDataSource[*pbpeering.TrustBundleReadRequest]{}, TrustBundleList: &noopDataSource[*pbpeering.TrustBundleListByServiceRequest]{}, + ExportedPeeredServices: &noopDataSource[*structs.DCSpecificRequest]{}, }, dnsConfig: DNSConfig{ // TODO: make configurable Domain: "consul", diff --git a/agent/proxycfg/testing_mesh_gateway.go b/agent/proxycfg/testing_mesh_gateway.go index c5baaea2f3..270816127e 100644 --- a/agent/proxycfg/testing_mesh_gateway.go +++ b/agent/proxycfg/testing_mesh_gateway.go @@ -5,7 +5,10 @@ import ( "time" "github.com/mitchellh/go-testing-interface" + "github.com/stretchr/testify/assert" + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/structs" ) @@ -20,6 +23,50 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st switch variant { case "default": + case "peered-services": + var ( + fooSN = structs.NewServiceName("foo", nil) + barSN = structs.NewServiceName("bar", nil) + girSN = structs.NewServiceName("gir", nil) + + fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + girChain = discoverychain.TestCompileConfigEntries(t, "gir", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + ) + + assert.True(t, fooChain.Default) + assert.True(t, barChain.Default) + assert.True(t, girChain.Default) + + extraUpdates = append(extraUpdates, + UpdateEvent{ + CorrelationID: exportedServiceListWatchID, + Result: &structs.IndexedExportedServiceList{ + Services: map[string]structs.ServiceList{ + "peer1": []structs.ServiceName{fooSN, barSN}, + "peer2": []structs.ServiceName{girSN}, + }, + }, + }, + UpdateEvent{ + CorrelationID: "discovery-chain:" + fooSN.String(), + Result: &structs.DiscoveryChainResponse{ + Chain: fooChain, + }, + }, + UpdateEvent{ + CorrelationID: "discovery-chain:" + barSN.String(), + Result: &structs.DiscoveryChainResponse{ + Chain: barChain, + }, + }, + UpdateEvent{ + CorrelationID: "discovery-chain:" + girSN.String(), + Result: &structs.DiscoveryChainResponse{ + Chain: girChain, + }, + }, + ) case "federation-states": populateServices = true useFederationStates = true @@ -257,6 +304,12 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st CorrelationID: rootsWatchID, Result: roots, }, + { + CorrelationID: exportedServiceListWatchID, + Result: &structs.IndexedExportedServiceList{ + Services: nil, + }, + }, { CorrelationID: serviceListWatchID, Result: &structs.IndexedServiceList{ diff --git a/agent/rpc/peering/subscription_manager.go b/agent/rpc/peering/subscription_manager.go index 0c00a2ca75..42409fd14b 100644 --- a/agent/rpc/peering/subscription_manager.go +++ b/agent/rpc/peering/subscription_manager.go @@ -181,6 +181,13 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti // skip checks since we just generated one from scratch } + // Scrub raft indexes + for _, instance := range csn.Nodes { + instance.Node.RaftIndex = nil + instance.Service.RaftIndex = nil + // skip checks since we just generated one from scratch + } + id := servicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedService) // Just ferry this one directly along to the destination. diff --git a/agent/structs/peering.go b/agent/structs/peering.go index 1cf74bdaad..70a59c486a 100644 --- a/agent/structs/peering.go +++ b/agent/structs/peering.go @@ -8,6 +8,11 @@ type PeeringToken struct { PeerID string } +type IndexedExportedServiceList struct { + Services map[string]ServiceList + QueryMeta +} + // NOTE: this is not serialized via msgpack so it can be changed without concern. type ExportedServiceList struct { // Services is a list of exported services that apply to both standard diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index bcbf28063a..2c66ba3a0e 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -1371,6 +1371,59 @@ func (s *ResourceGenerator) makeMeshGatewayListener(name, addr string, port int, l := makePortListener(name, addr, port, envoy_core_v3.TrafficDirection_UNSPECIFIED) l.ListenerFilters = []*envoy_listener_v3.ListenerFilter{tlsInspector} + // Add in TCP filter chains for plain peered passthrough. + // + // TODO(peering): make this work for L7 as well + // TODO(peering): make failover work + for _, svc := range cfgSnap.MeshGateway.ExportedServicesSlice { + peerNames, ok := cfgSnap.MeshGateway.ExportedServicesWithPeers[svc] + if !ok { + continue // not possible + } + chain, ok := cfgSnap.MeshGateway.DiscoveryChain[svc] + if !ok { + continue // ignore; not ready + } + + if structs.IsProtocolHTTPLike(chain.Protocol) { + continue // temporary skip + } + + target, err := simpleChainTarget(chain) + if err != nil { + return nil, err + } + clusterName := CustomizeClusterName(target.Name, chain) + + filterName := fmt.Sprintf("%s.%s.%s.%s", chain.ServiceName, chain.Namespace, chain.Partition, chain.Datacenter) + + dcTCPProxy, err := makeTCPProxyFilter(filterName, clusterName, "mesh_gateway_local_peered.") + if err != nil { + return nil, err + } + + var peeredServerNames []string + for _, peerName := range peerNames { + peeredSNI := connect.PeeredServiceSNI( + svc.Name, + svc.NamespaceOrDefault(), + svc.PartitionOrDefault(), + peerName, + cfgSnap.Roots.TrustDomain, + ) + peeredServerNames = append(peeredServerNames, peeredSNI) + } + + l.FilterChains = append(l.FilterChains, &envoy_listener_v3.FilterChain{ + FilterChainMatch: &envoy_listener_v3.FilterChainMatch{ + ServerNames: peeredServerNames, + }, + Filters: []*envoy_listener_v3.Filter{ + dcTCPProxy, + }, + }) + } + // We need 1 Filter Chain per remote cluster keys := cfgSnap.MeshGateway.GatewayKeys() for _, key := range keys { diff --git a/agent/xds/listeners_test.go b/agent/xds/listeners_test.go index 39d779d215..d1c3cdf165 100644 --- a/agent/xds/listeners_test.go +++ b/agent/xds/listeners_test.go @@ -476,6 +476,12 @@ func TestListenersFromSnapshot(t *testing.T) { return proxycfg.TestConfigSnapshotMeshGateway(t, "no-services", nil, nil) }, }, + { + name: "mesh-gateway-with-exported-peered-services", + create: func(t testinf.T) *proxycfg.ConfigSnapshot { + return proxycfg.TestConfigSnapshotMeshGateway(t, "peered-services", nil, nil) + }, + }, { name: "mesh-gateway-tagged-addresses", create: func(t testinf.T) *proxycfg.ConfigSnapshot { diff --git a/agent/xds/testdata/listeners/mesh-gateway-with-exported-peered-services.latest.golden b/agent/xds/testdata/listeners/mesh-gateway-with-exported-peered-services.latest.golden new file mode 100644 index 0000000000..287bc4aa5f --- /dev/null +++ b/agent/xds/testdata/listeners/mesh-gateway-with-exported-peered-services.latest.golden @@ -0,0 +1,147 @@ +{ + "versionInfo": "00000001", + "resources": [ + { + "@type": "type.googleapis.com/envoy.config.listener.v3.Listener", + "name": "default:1.2.3.4:8443", + "address": { + "socketAddress": { + "address": "1.2.3.4", + "portValue": 8443 + } + }, + "filterChains": [ + { + "filterChainMatch": { + "serverNames": [ + "bar.default.default.peer1.external.11111111-2222-3333-4444-555555555555.consul" + ] + }, + "filters": [ + { + "name": "envoy.filters.network.tcp_proxy", + "typedConfig": { + "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy", + "statPrefix": "mesh_gateway_local_peered.bar.default.default.dc1", + "cluster": "bar.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul" + } + } + ] + }, + { + "filterChainMatch": { + "serverNames": [ + "foo.default.default.peer1.external.11111111-2222-3333-4444-555555555555.consul" + ] + }, + "filters": [ + { + "name": "envoy.filters.network.tcp_proxy", + "typedConfig": { + "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy", + "statPrefix": "mesh_gateway_local_peered.foo.default.default.dc1", + "cluster": "foo.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul" + } + } + ] + }, + { + "filterChainMatch": { + "serverNames": [ + "gir.default.default.peer2.external.11111111-2222-3333-4444-555555555555.consul" + ] + }, + "filters": [ + { + "name": "envoy.filters.network.tcp_proxy", + "typedConfig": { + "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy", + "statPrefix": "mesh_gateway_local_peered.gir.default.default.dc1", + "cluster": "gir.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul" + } + } + ] + }, + { + "filterChainMatch": { + "serverNames": [ + "*.dc2.internal.11111111-2222-3333-4444-555555555555.consul" + ] + }, + "filters": [ + { + "name": "envoy.filters.network.tcp_proxy", + "typedConfig": { + "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy", + "statPrefix": "mesh_gateway_remote.default.dc2", + "cluster": "dc2.internal.11111111-2222-3333-4444-555555555555.consul" + } + } + ] + }, + { + "filterChainMatch": { + "serverNames": [ + "*.dc4.internal.11111111-2222-3333-4444-555555555555.consul" + ] + }, + "filters": [ + { + "name": "envoy.filters.network.tcp_proxy", + "typedConfig": { + "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy", + "statPrefix": "mesh_gateway_remote.default.dc4", + "cluster": "dc4.internal.11111111-2222-3333-4444-555555555555.consul" + } + } + ] + }, + { + "filterChainMatch": { + "serverNames": [ + "*.dc6.internal.11111111-2222-3333-4444-555555555555.consul" + ] + }, + "filters": [ + { + "name": "envoy.filters.network.tcp_proxy", + "typedConfig": { + "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy", + "statPrefix": "mesh_gateway_remote.default.dc6", + "cluster": "dc6.internal.11111111-2222-3333-4444-555555555555.consul" + } + } + ] + }, + { + "filters": [ + { + "name": "envoy.filters.network.sni_cluster", + "typedConfig": { + "@type": "type.googleapis.com/envoy.extensions.filters.network.sni_cluster.v3.SniCluster" + } + }, + { + "name": "envoy.filters.network.tcp_proxy", + "typedConfig": { + "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy", + "statPrefix": "mesh_gateway_local.default", + "cluster": "" + } + } + ] + } + ], + "listenerFilters": [ + { + "name": "envoy.filters.listener.tls_inspector", + "typedConfig": { + "@type": "type.googleapis.com/envoy.extensions.filters.listener.tls_inspector.v3.TlsInspector" + } + } + ] + } + ], + "typeUrl": "type.googleapis.com/envoy.config.listener.v3.Listener", + "nonce": "00000001" +} \ No newline at end of file