diff --git a/agent/consul/acl.go b/agent/consul/acl.go index 4a3e312815..7796c3756c 100644 --- a/agent/consul/acl.go +++ b/agent/consul/acl.go @@ -1410,7 +1410,7 @@ func (f *aclFilter) filterCheckServiceNodes(nodes *structs.CheckServiceNodes) { } // filterServiceTopology is used to filter upstreams/downstreams based on ACL rules. -// this filter is unlike other in that it also returns whether the result was filtered by ACLs +// this filter is unlike others in that it also returns whether the result was filtered by ACLs func (f *aclFilter) filterServiceTopology(topology *structs.ServiceTopology) bool { numUp := len(topology.Upstreams) numDown := len(topology.Downstreams) @@ -1418,10 +1418,7 @@ func (f *aclFilter) filterServiceTopology(topology *structs.ServiceTopology) boo f.filterCheckServiceNodes(&topology.Upstreams) f.filterCheckServiceNodes(&topology.Downstreams) - if numUp != len(topology.Upstreams) || numDown != len(topology.Downstreams) { - return true - } - return false + return numUp != len(topology.Upstreams) || numDown != len(topology.Downstreams) } // filterDatacenterCheckServiceNodes is used to filter nodes based on ACL rules. diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 534f313aa1..24cc611da8 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -830,7 +830,6 @@ func ensureServiceTxn(tx *txn, idx uint64, node string, preserveIndexes bool, sv return fmt.Errorf("failed updating gateway mapping: %s", err) } // Update upstream/downstream mappings if it's a connect service - // TODO (freddy) What to do about Connect native services that don't define upstreams? if svc.Kind == structs.ServiceKindConnectProxy { if err = updateMeshTopology(tx, idx, node, svc, existing); err != nil { return fmt.Errorf("failed updating upstream/downstream association") @@ -1541,7 +1540,7 @@ func (s *Store) deleteServiceTxn(tx *txn, idx uint64, nodeName, serviceID string return err } if err := cleanupMeshTopology(tx, idx, svc); err != nil { - return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err) + return fmt.Errorf("failed to clean up mesh-topology associations for %q: %v", name.String(), err) } if _, remainingService, err := firstWatchWithTxn(tx, "services", "service", svc.ServiceName, entMeta); err == nil { @@ -2914,12 +2913,22 @@ func (s *Store) ServiceTopology( dc, service string, entMeta *structs.EnterpriseMeta, ) (uint64, *structs.ServiceTopology, error) { + tx := s.db.ReadTxn() + defer tx.Abort() var ( maxIdx uint64 sn = structs.NewServiceName(service, entMeta) ) - idx, upstreamNames, err := s.UpstreamsForService(ws, dc, sn) + + idx, upstreamNames, err := s.upstreamsForServiceTxn(tx, ws, dc, sn) + if err != nil { + return 0, nil, err + } + if idx > maxIdx { + maxIdx = idx + } + idx, upstreams, err := s.combinedServiceNodesTxn(tx, ws, upstreamNames) if err != nil { return 0, nil, fmt.Errorf("failed to get upstreams for %q: %v", sn.String(), err) } @@ -2927,29 +2936,14 @@ func (s *Store) ServiceTopology( maxIdx = idx } - var upstreams structs.CheckServiceNodes - for _, u := range upstreamNames { - // Collect both typical and connect endpoints, this allows aggregating check statuses across both - idx, csn, err := s.CheckServiceNodes(ws, u.Name, &u.EnterpriseMeta) - if err != nil { - return 0, nil, fmt.Errorf("failed to get upstream nodes for %q: %v", sn.String(), err) - } - if idx > maxIdx { - maxIdx = idx - } - upstreams = append(upstreams, csn...) - - idx, csn, err = s.CheckConnectServiceNodes(ws, u.Name, &u.EnterpriseMeta) - if err != nil { - return 0, nil, fmt.Errorf("failed to get upstream connect nodes for %q: %v", sn.String(), err) - } - if idx > maxIdx { - maxIdx = idx - } - upstreams = append(upstreams, csn...) + idx, downstreamNames, err := s.downstreamsForServiceTxn(tx, ws, dc, sn) + if err != nil { + return 0, nil, err } - - idx, downstreamNames, err := s.DownstreamsForService(ws, dc, sn) + if idx > maxIdx { + maxIdx = idx + } + idx, downstreams, err := s.combinedServiceNodesTxn(tx, ws, downstreamNames) if err != nil { return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", sn.String(), err) } @@ -2957,43 +2951,49 @@ func (s *Store) ServiceTopology( maxIdx = idx } - var downstreams structs.CheckServiceNodes - for _, u := range downstreamNames { - // Collect both typical and connect endpoints, this allows aggregating check statuses across both - idx, csn, err := s.CheckServiceNodes(ws, u.Name, &u.EnterpriseMeta) - if err != nil { - return 0, nil, fmt.Errorf("failed to get downstream nodes for %q: %v", sn.String(), err) - } - if idx > maxIdx { - maxIdx = idx - } - downstreams = append(downstreams, csn...) - - idx, csn, err = s.CheckConnectServiceNodes(ws, u.Name, &u.EnterpriseMeta) - if err != nil { - return 0, nil, fmt.Errorf("failed to get downstream connect nodes for %q: %v", sn.String(), err) - } - if idx > maxIdx { - maxIdx = idx - } - downstreams = append(downstreams, csn...) - } - resp := &structs.ServiceTopology{ Upstreams: upstreams, Downstreams: downstreams, } - return 0, resp, nil + return maxIdx, resp, nil } -// UpstreamsForService will find all upstream services that the input could route traffic to. +// combinedServiceNodesTxn returns typical and connect endpoints for a list of services. +// This enabled aggregating checks statuses across both. +func (s *Store) combinedServiceNodesTxn(tx *txn, ws memdb.WatchSet, names []structs.ServiceName) (uint64, structs.CheckServiceNodes, error) { + var ( + maxIdx uint64 + resp structs.CheckServiceNodes + ) + for _, u := range names { + // Collect typical then connect instances + idx, csn, err := checkServiceNodesTxn(tx, ws, u.Name, false, &u.EnterpriseMeta) + if err != nil { + return 0, nil, err + } + if idx > maxIdx { + maxIdx = idx + } + resp = append(resp, csn...) + + idx, csn, err = checkServiceNodesTxn(tx, ws, u.Name, true, &u.EnterpriseMeta) + if err != nil { + return 0, nil, err + } + if idx > maxIdx { + maxIdx = idx + } + resp = append(resp, csn...) + } + return maxIdx, resp, nil +} + +// upstreamsForServiceTxn will find all upstream services that the input could route traffic to. // There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams. // TODO (freddy): Account for ingress gateways -func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc string, sn structs.ServiceName) (uint64, []structs.ServiceName, error) { - tx := s.db.ReadTxn() - defer tx.Abort() - - idx, upstreams, err := upstreamsFromRegistration(ws, tx, sn) +// TODO (freddy): Account for multi-dc upstreams +func (s *Store) upstreamsForServiceTxn(tx ReadTxn, ws memdb.WatchSet, dc string, sn structs.ServiceName) (uint64, []structs.ServiceName, error) { + idx, upstreams, err := upstreamsFromRegistrationTxn(tx, ws, sn) if err != nil { return 0, nil, fmt.Errorf("failed to get registration upstreams for %q: %v", sn.String(), err) } @@ -3009,7 +3009,7 @@ func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc string, sn structs.Ser ) for _, u := range upstreams { // Evaluate the targets from the upstream's discovery chain - idx, targets, err := s.discoveryChainTargets(ws, dc, u.Name, &u.EnterpriseMeta) + idx, targets, err := s.discoveryChainTargetsTxn(tx, ws, dc, u.Name, &u.EnterpriseMeta) if err != nil { return 0, nil, fmt.Errorf("failed to get discovery chain targets for %q: %v", u.String(), err) } @@ -3030,15 +3030,12 @@ func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc string, sn structs.Ser return maxIdx, resp, nil } -// DownstreamsForService will find all downstream services that could route traffic to the input service. +// downstreamsForServiceTxn will find all downstream services that could route traffic to the input service. // There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams. // TODO (freddy): Account for ingress gateways -func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc string, service structs.ServiceName) (uint64, []structs.ServiceName, error) { - tx := s.db.ReadTxn() - defer tx.Abort() - - // First fetch services with discovery chains that list the input as a target - idx, sources, err := s.discoveryChainSources(ws, tx, dc, service) +func (s *Store) downstreamsForServiceTxn(tx ReadTxn, ws memdb.WatchSet, dc string, service structs.ServiceName) (uint64, []structs.ServiceName, error) { + // First fetch services that have discovery chains that eventually route to the target service + idx, sources, err := s.discoveryChainSourcesTxn(tx, ws, dc, service) if err != nil { return 0, nil, fmt.Errorf("failed to get sources for discovery chain target %q: %v", service.String(), err) } @@ -3053,8 +3050,8 @@ func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc string, service stru seen = make(map[structs.ServiceName]bool) ) for _, s := range sources { - // We then follow these discovery chain sources one level down to the services defining them as an upstream. - idx, downstreams, err := downstreamsFromRegistration(ws, tx, s) + // We then follow these sources one level down to the services defining them as an upstream. + idx, downstreams, err := downstreamsFromRegistrationTxn(tx, ws, s) if err != nil { return 0, nil, fmt.Errorf("failed to get registration downstreams for %q: %v", s.String(), err) } @@ -3068,35 +3065,20 @@ func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc string, service stru } } } - - // Also append services that directly listed the input as an upstream - idx, downstreams, err := downstreamsFromRegistration(ws, tx, service) - if err != nil { - return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", service.String(), err) - } - if idx > maxIdx { - maxIdx = idx - } - for _, d := range downstreams { - if !seen[d] { - resp = append(resp, d) - seen[d] = true - } - } return maxIdx, resp, nil } -// upstreamsFromRegistration returns the ServiceNames of the upstreams defined across instances of the input -func upstreamsFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.ServiceName) (uint64, []structs.ServiceName, error) { - return linkedFromRegistration(ws, tx, sn, false) +// upstreamsFromRegistrationTxn returns the ServiceNames of the upstreams defined across instances of the input +func upstreamsFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, sn structs.ServiceName) (uint64, []structs.ServiceName, error) { + return linkedFromRegistrationTxn(tx, ws, sn, false) } -// downstreamsFromRegistration returns the ServiceNames of downstream services based on registrations across instances of the input -func downstreamsFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.ServiceName) (uint64, []structs.ServiceName, error) { - return linkedFromRegistration(ws, tx, sn, true) +// downstreamsFromRegistrationTxn returns the ServiceNames of downstream services based on registrations across instances of the input +func downstreamsFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, sn structs.ServiceName) (uint64, []structs.ServiceName, error) { + return linkedFromRegistrationTxn(tx, ws, sn, true) } -func linkedFromRegistration(ws memdb.WatchSet, tx ReadTxn, service structs.ServiceName, downstreams bool) (uint64, []structs.ServiceName, error) { +func linkedFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, service structs.ServiceName, downstreams bool) (uint64, []structs.ServiceName, error) { // To fetch upstreams we query services that have the input listed as a downstream // To fetch downstreams we query services that have the input listed as an upstream index := "downstream" @@ -3177,7 +3159,7 @@ func updateMeshTopology(tx *txn, idx uint64, node string, svc *structs.NodeServi if !ok { return fmt.Errorf("unexpected topology type %T", rawCopy) } - mapping.Refs[uid] = true + mapping.Refs[uid] = struct{}{} mapping.ModifyIndex = idx inserted[upstream] = true @@ -3186,7 +3168,7 @@ func updateMeshTopology(tx *txn, idx uint64, node string, svc *structs.NodeServi mapping = &structs.UpstreamDownstream{ Upstream: upstream, Downstream: downstream, - Refs: map[string]bool{uid: true}, + Refs: map[string]struct{}{uid: {}}, RaftIndex: structs.RaftIndex{ CreateIndex: idx, ModifyIndex: idx, diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index e0cdc40791..07d746f91a 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -6136,7 +6136,7 @@ func TestCatalog_catalogDownstreams_Watches(t *testing.T) { // Watch should fire since the admin <-> web-proxy pairing was inserted into the topology table ws := memdb.NewWatchSet() tx := s.db.ReadTxn() - idx, names, err := downstreamsFromRegistration(ws, tx, admin) + idx, names, err := downstreamsFromRegistrationTxn(tx, ws, admin) require.NoError(t, err) assert.Zero(t, idx) assert.Len(t, names, 0) @@ -6165,7 +6165,7 @@ func TestCatalog_catalogDownstreams_Watches(t *testing.T) { ws = memdb.NewWatchSet() tx = s.db.ReadTxn() - idx, names, err = downstreamsFromRegistration(ws, tx, admin) + idx, names, err = downstreamsFromRegistrationTxn(tx, ws, admin) require.NoError(t, err) exp := expect{ @@ -6194,7 +6194,7 @@ func TestCatalog_catalogDownstreams_Watches(t *testing.T) { ws = memdb.NewWatchSet() tx = s.db.ReadTxn() - idx, _, err = downstreamsFromRegistration(ws, tx, admin) + idx, _, err = downstreamsFromRegistrationTxn(tx, ws, admin) require.NoError(t, err) exp = expect{ @@ -6207,7 +6207,7 @@ func TestCatalog_catalogDownstreams_Watches(t *testing.T) { // Should still be able to get downstream for one of the other upstreams ws = memdb.NewWatchSet() tx = s.db.ReadTxn() - idx, names, err = downstreamsFromRegistration(ws, tx, cache) + idx, names, err = downstreamsFromRegistrationTxn(tx, ws, cache) require.NoError(t, err) exp = expect{ @@ -6225,7 +6225,7 @@ func TestCatalog_catalogDownstreams_Watches(t *testing.T) { ws = memdb.NewWatchSet() tx = s.db.ReadTxn() - idx, _, err = downstreamsFromRegistration(ws, tx, cache) + idx, _, err = downstreamsFromRegistrationTxn(tx, ws, cache) require.NoError(t, err) @@ -6354,7 +6354,7 @@ func TestCatalog_catalogDownstreams(t *testing.T) { } tx := s.db.ReadTxn() - idx, names, err := downstreamsFromRegistration(ws, tx, structs.NewServiceName("admin", structs.DefaultEnterpriseMeta())) + idx, names, err := downstreamsFromRegistrationTxn(tx, ws, structs.NewServiceName("admin", structs.DefaultEnterpriseMeta())) require.NoError(t, err) require.Equal(t, tc.expect.idx, idx) @@ -6529,7 +6529,7 @@ func TestCatalog_upstreamsFromRegistration(t *testing.T) { } tx := s.db.ReadTxn() - idx, names, err := upstreamsFromRegistration(ws, tx, structs.NewServiceName("api", structs.DefaultEnterpriseMeta())) + idx, names, err := upstreamsFromRegistrationTxn(tx, ws, structs.NewServiceName("api", structs.DefaultEnterpriseMeta())) require.NoError(t, err) require.Equal(t, tc.expect.idx, idx) @@ -6556,7 +6556,7 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) { ws := memdb.NewWatchSet() tx := s.db.ReadTxn() - idx, names, err := upstreamsFromRegistration(ws, tx, web) + idx, names, err := upstreamsFromRegistrationTxn(tx, ws, web) require.NoError(t, err) assert.Zero(t, idx) assert.Len(t, names, 0) @@ -6586,7 +6586,7 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) { ws = memdb.NewWatchSet() tx = s.db.ReadTxn() - idx, names, err = upstreamsFromRegistration(ws, tx, web) + idx, names, err = upstreamsFromRegistrationTxn(tx, ws, web) require.NoError(t, err) exp := expect{ @@ -6613,7 +6613,7 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) { ws = memdb.NewWatchSet() tx = s.db.ReadTxn() - idx, names, err = upstreamsFromRegistration(ws, tx, web) + idx, names, err = upstreamsFromRegistrationTxn(tx, ws, web) require.NoError(t, err) exp = expect{ @@ -6655,7 +6655,7 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) { ws = memdb.NewWatchSet() tx = s.db.ReadTxn() - idx, names, err = upstreamsFromRegistration(ws, tx, web) + idx, names, err = upstreamsFromRegistrationTxn(tx, ws, web) require.NoError(t, err) exp = expect{ @@ -6676,7 +6676,7 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) { ws = memdb.NewWatchSet() tx = s.db.ReadTxn() - idx, names, err = upstreamsFromRegistration(ws, tx, web) + idx, names, err = upstreamsFromRegistrationTxn(tx, ws, web) require.NoError(t, err) exp = expect{ @@ -6696,7 +6696,7 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) { ws = memdb.NewWatchSet() tx = s.db.ReadTxn() - idx, _, err = upstreamsFromRegistration(ws, tx, web) + idx, _, err = upstreamsFromRegistrationTxn(tx, ws, web) require.NoError(t, err) @@ -6860,9 +6860,12 @@ func TestCatalog_UpstreamsForService(t *testing.T) { i++ } + tx := s.db.ReadTxn() + defer tx.Abort() + ws := memdb.NewWatchSet() sn := structs.NewServiceName("api", structs.DefaultEnterpriseMeta()) - idx, names, err := s.UpstreamsForService(ws, "dc1", sn) + idx, names, err := s.upstreamsForServiceTxn(tx, ws, "dc1", sn) require.NoError(t, err) require.Equal(t, tc.expect.idx, idx) @@ -6957,9 +6960,9 @@ func TestCatalog_DownstreamsForService(t *testing.T) { expect: expect{ idx: 4, names: []structs.ServiceName{ - // get web from old-admin routing to admin and web listing old-admin as an upstream + // get web from listing admin directly as an upstream {Name: "web", EnterpriseMeta: *defaultMeta}, - // get api from listing admin directly as an upstream + // get api from old-admin routing to admin and web listing old-admin as an upstream {Name: "api", EnterpriseMeta: *defaultMeta}, }, }, @@ -6993,9 +6996,12 @@ func TestCatalog_DownstreamsForService(t *testing.T) { i++ } + tx := s.db.ReadTxn() + defer tx.Abort() + ws := memdb.NewWatchSet() sn := structs.NewServiceName("admin", structs.DefaultEnterpriseMeta()) - idx, names, err := s.DownstreamsForService(ws, "dc1", sn) + idx, names, err := s.downstreamsForServiceTxn(tx, ws, "dc1", sn) require.NoError(t, err) require.Equal(t, tc.expect.idx, idx) @@ -7003,3 +7009,130 @@ func TestCatalog_DownstreamsForService(t *testing.T) { }) } } + +func TestCatalog_DownstreamsForService_Updates(t *testing.T) { + var ( + defaultMeta = structs.DefaultEnterpriseMeta() + target = structs.NewServiceName("admin", defaultMeta) + ) + + s := testStateStore(t) + ca := &structs.CAConfiguration{ + Provider: "consul", + } + err := s.CASetConfig(1, ca) + require.NoError(t, err) + + require.NoError(t, s.EnsureNode(2, &structs.Node{ + ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c", + Node: "foo", + })) + + // Register a service with our target as an upstream, and it should show up as a downstream + web := structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "web-proxy", + Service: "web-proxy", + Address: "127.0.0.2", + Port: 443, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "web", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationName: "db", + }, + structs.Upstream{ + DestinationName: "admin", + }, + }, + }, + EnterpriseMeta: *defaultMeta, + } + require.NoError(t, s.EnsureService(3, "foo", &web)) + + ws := memdb.NewWatchSet() + tx := s.db.ReadTxn() + idx, names, err := s.downstreamsForServiceTxn(tx, ws, "dc1", target) + require.NoError(t, err) + tx.Abort() + + expect := []structs.ServiceName{ + {Name: "web", EnterpriseMeta: *defaultMeta}, + } + require.Equal(t, uint64(3), idx) + require.ElementsMatch(t, expect, names) + + // Register a service WITHOUT our target as an upstream, and the watch should not fire + api := structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "api-proxy", + Service: "api-proxy", + Address: "127.0.0.1", + Port: 443, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationName: "cache", + }, + structs.Upstream{ + DestinationName: "db", + }, + structs.Upstream{ + DestinationName: "old-admin", + }, + }, + }, + EnterpriseMeta: *defaultMeta, + } + require.NoError(t, s.EnsureService(4, "foo", &api)) + require.False(t, watchFired(ws)) + + // Update the routing so that api's upstream routes to our target and watches should fire + defaults := structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "http", + }, + } + require.NoError(t, defaults.Normalize()) + require.NoError(t, s.EnsureConfigEntry(5, &defaults, nil)) + + router := structs.ServiceRouterConfigEntry{ + Kind: structs.ServiceRouter, + Name: "old-admin", + Routes: []structs.ServiceRoute{ + { + Match: &structs.ServiceRouteMatch{ + HTTP: &structs.ServiceRouteHTTPMatch{ + PathExact: "/v2", + }, + }, + Destination: &structs.ServiceRouteDestination{ + Service: "admin", + }, + }, + }, + } + require.NoError(t, router.Normalize()) + require.NoError(t, s.EnsureConfigEntry(6, &router, nil)) + + // We updated a relevant config entry + require.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + tx = s.db.ReadTxn() + idx, names, err = s.downstreamsForServiceTxn(tx, ws, "dc1", target) + require.NoError(t, err) + tx.Abort() + + expect = []structs.ServiceName{ + // get web from listing admin directly as an upstream + {Name: "web", EnterpriseMeta: *defaultMeta}, + // get api from old-admin routing to admin and web listing old-admin as an upstream + {Name: "api", EnterpriseMeta: *defaultMeta}, + } + require.Equal(t, uint64(6), idx) + require.ElementsMatch(t, expect, names) +} diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index 98c0e2430d..2a22f65296 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -374,17 +374,15 @@ var serviceGraphKinds = []string{ } // discoveryChainTargets will return a list of services listed as a target for the input's discovery chain -func (s *Store) discoveryChainTargets(ws memdb.WatchSet, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) { +func (s *Store) discoveryChainTargetsTxn(tx ReadTxn, ws memdb.WatchSet, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) { source := structs.NewServiceName(service, entMeta) req := discoverychain.CompileRequest{ - ServiceName: source.Name, - EvaluateInNamespace: source.NamespaceOrDefault(), - - // TODO(freddy) : Should these be anything other than the known DC? + ServiceName: source.Name, + EvaluateInNamespace: source.NamespaceOrDefault(), EvaluateInDatacenter: dc, UseInDatacenter: dc, } - idx, chain, err := s.ServiceDiscoveryChain(ws, source.Name, entMeta, req) + idx, chain, err := s.serviceDiscoveryChainTxn(tx, ws, source.Name, entMeta, req) if err != nil { return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", source.String(), err) } @@ -402,11 +400,11 @@ func (s *Store) discoveryChainTargets(ws memdb.WatchSet, dc, service string, ent return idx, resp, nil } -// discoveryChainSources will return a list of services whose discovery chains have the input service as a target -func (s *Store) discoveryChainSources(ws memdb.WatchSet, tx ReadTxn, dc string, destination structs.ServiceName) (uint64, []structs.ServiceName, error) { - queue := []structs.ServiceName{destination} +// discoveryChainSourcesTxn will return a list of services whose discovery chains have the given service as a target +func (s *Store) discoveryChainSourcesTxn(tx ReadTxn, ws memdb.WatchSet, dc string, destination structs.ServiceName) (uint64, []structs.ServiceName, error) { + seenLink := map[structs.ServiceName]bool{destination: true} - seenLink := make(map[structs.ServiceName]bool) + queue := []structs.ServiceName{destination} for len(queue) > 0 { // The "link" index returns config entries that reference a service iter, err := tx.Get(configTableName, "link", queue[0].ToServiceID()) @@ -428,22 +426,20 @@ func (s *Store) discoveryChainSources(ws memdb.WatchSet, tx ReadTxn, dc string, } var ( - maxIdx uint64 + maxIdx uint64 = 1 resp []structs.ServiceName ) - // Only return the services that directly target the destination + // Only return the services that target the destination anywhere in their discovery chains. seenSource := make(map[structs.ServiceName]bool) for sn := range seenLink { req := discoverychain.CompileRequest{ - ServiceName: sn.Name, - EvaluateInNamespace: sn.NamespaceOrDefault(), - - // TODO(freddy) : Should these be anything other than the known DC? + ServiceName: sn.Name, + EvaluateInNamespace: sn.NamespaceOrDefault(), EvaluateInDatacenter: dc, UseInDatacenter: dc, } - idx, chain, err := s.ServiceDiscoveryChain(ws, sn.Name, &sn.EnterpriseMeta, req) + idx, chain, err := s.serviceDiscoveryChainTxn(tx, ws, sn.Name, &sn.EnterpriseMeta, req) if err != nil { return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", sn.String(), err) } @@ -657,8 +653,21 @@ func (s *Store) ServiceDiscoveryChain( entMeta *structs.EnterpriseMeta, req discoverychain.CompileRequest, ) (uint64, *structs.CompiledDiscoveryChain, error) { + tx := s.db.ReadTxn() + defer tx.Abort() - index, entries, err := s.readDiscoveryChainConfigEntries(ws, serviceName, nil, entMeta) + return s.serviceDiscoveryChainTxn(tx, ws, serviceName, entMeta, req) +} + +func (s *Store) serviceDiscoveryChainTxn( + tx ReadTxn, + ws memdb.WatchSet, + serviceName string, + entMeta *structs.EnterpriseMeta, + req discoverychain.CompileRequest, +) (uint64, *structs.CompiledDiscoveryChain, error) { + + index, entries, err := readDiscoveryChainConfigEntriesTxn(tx, ws, serviceName, nil, entMeta) if err != nil { return 0, nil, err } diff --git a/agent/consul/state/config_entry_test.go b/agent/consul/state/config_entry_test.go index 4d9bb4900e..e94e622e1e 100644 --- a/agent/consul/state/config_entry_test.go +++ b/agent/consul/state/config_entry_test.go @@ -1457,14 +1457,24 @@ func TestSourcesForTarget(t *testing.T) { defaultMeta := *structs.DefaultEnterpriseMeta() type expect struct { - idx uint64 - ids []structs.ServiceName + idx uint64 + names []structs.ServiceName } tt := []struct { name string entries []structs.ConfigEntry expect expect }{ + { + name: "no relevant config entries", + entries: []structs.ConfigEntry{}, + expect: expect{ + idx: 1, + names: []structs.ServiceName{ + {Name: "sink", EnterpriseMeta: defaultMeta}, + }, + }, + }, { name: "from route match", entries: []structs.ConfigEntry{ @@ -1494,8 +1504,9 @@ func TestSourcesForTarget(t *testing.T) { }, expect: expect{ idx: 2, - ids: []structs.ServiceName{ + names: []structs.ServiceName{ {Name: "web", EnterpriseMeta: defaultMeta}, + {Name: "sink", EnterpriseMeta: defaultMeta}, }, }, }, @@ -1519,8 +1530,9 @@ func TestSourcesForTarget(t *testing.T) { }, expect: expect{ idx: 2, - ids: []structs.ServiceName{ + names: []structs.ServiceName{ {Name: "web", EnterpriseMeta: defaultMeta}, + {Name: "sink", EnterpriseMeta: defaultMeta}, }, }, }, @@ -1547,8 +1559,9 @@ func TestSourcesForTarget(t *testing.T) { }, expect: expect{ idx: 2, - ids: []structs.ServiceName{ + names: []structs.ServiceName{ {Name: "web", EnterpriseMeta: defaultMeta}, + {Name: "sink", EnterpriseMeta: defaultMeta}, }, }, }, @@ -1573,8 +1586,9 @@ func TestSourcesForTarget(t *testing.T) { }, expect: expect{ idx: 2, - ids: []structs.ServiceName{ + names: []structs.ServiceName{ {Name: "web", EnterpriseMeta: defaultMeta}, + {Name: "sink", EnterpriseMeta: defaultMeta}, }, }, }, @@ -1614,9 +1628,10 @@ func TestSourcesForTarget(t *testing.T) { }, expect: expect{ idx: 3, - ids: []structs.ServiceName{ + names: []structs.ServiceName{ {Name: "source", EnterpriseMeta: defaultMeta}, {Name: "routed", EnterpriseMeta: defaultMeta}, + {Name: "sink", EnterpriseMeta: defaultMeta}, }, }, }, @@ -1682,11 +1697,12 @@ func TestSourcesForTarget(t *testing.T) { }, expect: expect{ idx: 6, - ids: []structs.ServiceName{ + names: []structs.ServiceName{ {Name: "split", EnterpriseMeta: defaultMeta}, {Name: "failed-over", EnterpriseMeta: defaultMeta}, {Name: "redirected", EnterpriseMeta: defaultMeta}, {Name: "routed", EnterpriseMeta: defaultMeta}, + {Name: "sink", EnterpriseMeta: defaultMeta}, }, }, }, @@ -1714,11 +1730,11 @@ func TestSourcesForTarget(t *testing.T) { defer tx.Abort() sn := structs.NewServiceName("sink", structs.DefaultEnterpriseMeta()) - idx, ids, err := s.discoveryChainSources(ws, tx, "dc1", sn) + idx, names, err := s.discoveryChainSourcesTxn(tx, ws, "dc1", sn) require.NoError(t, err) require.Equal(t, tc.expect.idx, idx) - require.ElementsMatch(t, tc.expect.ids, ids) + require.ElementsMatch(t, tc.expect.names, names) }) } } @@ -1915,7 +1931,7 @@ func TestTargetsForSource(t *testing.T) { tx := s.db.ReadTxn() defer tx.Abort() - idx, ids, err := s.discoveryChainTargets(ws, "dc1", "web", nil) + idx, ids, err := s.discoveryChainTargetsTxn(tx, ws, "dc1", "web", nil) require.NoError(t, err) require.Equal(t, tc.expect.idx, idx) diff --git a/agent/structs/structs.go b/agent/structs/structs.go index f0af193b6b..4527e68f4a 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -1033,6 +1033,8 @@ func (ns *NodeService) CompoundServiceName() ServiceName { // UniqueID is a unique identifier for a service instance within a datacenter by encoding: // node/namespace/service_id +// +// Note: We do not have strict character restrictions in all node names, so this should NOT be split on / to retrieve components. func UniqueID(node string, compoundID string) string { return fmt.Sprintf("%s/%s", node, compoundID) } @@ -2412,7 +2414,14 @@ func (r *KeyringResponses) New() interface{} { type UpstreamDownstream struct { Upstream ServiceName Downstream ServiceName - Refs map[string]bool + + // Upstream/Downstream pairs come from individual service registrations, and they can be updated individually. + // Refs stores the registrations that contain this pairing. + // When there are no remaining Refs, the UpstreamDownstream can be deleted. + // + // Note: This map must be treated as immutable when accessed in MemDB. + // The entire UpstreamDownstream structure must be deep copied on updates. + Refs map[string]struct{} RaftIndex } diff --git a/agent/ui_endpoint.go b/agent/ui_endpoint.go index 8f141d85c0..637384970d 100644 --- a/agent/ui_endpoint.go +++ b/agent/ui_endpoint.go @@ -255,6 +255,9 @@ RPC: return prepSummaryOutput(summaries, false), nil } +// UIServiceTopology returns the list of upstreams and downstreams for a Connect enabled service. +// - Downstreams are services that may route to the input service. +// - Upstreams are the upstreams defined in the target service's proxy registrations func (s *HTTPHandlers) UIServiceTopology(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Parse arguments args := structs.ServiceSpecificRequest{}