mirror of https://github.com/hashicorp/consul
Add topology RPC endpoint
parent
98c81976f5
commit
dbbf6b2e46
|
@ -559,3 +559,289 @@ func registerTestCatalogEntriesMap(t *testing.T, codec rpc.ClientCodec, registra
|
|||
require.NoError(t, err, "Failed catalog registration %q: %v", name, err)
|
||||
}
|
||||
}
|
||||
|
||||
func registerTestTopologyEntries(t *testing.T, codec rpc.ClientCodec, token string) {
|
||||
t.Helper()
|
||||
|
||||
// api and api-proxy on node foo - upstream: web
|
||||
// web and web-proxy on node bar - upstream: redis
|
||||
// web and web-proxy on node baz - upstream: redis
|
||||
// redis and redis-proxy on node zip
|
||||
registrations := map[string]*structs.RegisterRequest{
|
||||
"Node foo": {
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
ID: types.NodeID("e0155642-135d-4739-9853-a1ee6c9f945b"),
|
||||
Address: "127.0.0.2",
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "foo",
|
||||
CheckID: "foo:alive",
|
||||
Name: "foo-liveness",
|
||||
Status: api.HealthPassing,
|
||||
},
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: token},
|
||||
},
|
||||
"Service api on foo": {
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
SkipNodeUpdate: true,
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindTypical,
|
||||
ID: "api",
|
||||
Service: "api",
|
||||
Port: 9090,
|
||||
Address: "198.18.1.2",
|
||||
},
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "foo",
|
||||
CheckID: "foo:api",
|
||||
Name: "api-liveness",
|
||||
Status: api.HealthPassing,
|
||||
ServiceID: "api",
|
||||
ServiceName: "api",
|
||||
},
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: token},
|
||||
},
|
||||
"Service api-proxy": {
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
SkipNodeUpdate: true,
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: "api-proxy",
|
||||
Service: "api-proxy",
|
||||
Port: 8443,
|
||||
Address: "198.18.1.2",
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
DestinationServiceName: "api",
|
||||
Upstreams: structs.Upstreams{
|
||||
{
|
||||
DestinationName: "web",
|
||||
LocalBindPort: 8080,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "foo",
|
||||
CheckID: "foo:api-proxy",
|
||||
Name: "api proxy listening",
|
||||
Status: api.HealthPassing,
|
||||
ServiceID: "api-proxy",
|
||||
ServiceName: "api-proxy",
|
||||
},
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: token},
|
||||
},
|
||||
"Node bar": {
|
||||
Datacenter: "dc1",
|
||||
Node: "bar",
|
||||
ID: types.NodeID("c3e5fc07-3b2d-4c06-b8fc-a1a12432d459"),
|
||||
Address: "127.0.0.3",
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "bar",
|
||||
CheckID: "bar:alive",
|
||||
Name: "bar-liveness",
|
||||
Status: api.HealthPassing,
|
||||
},
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: token},
|
||||
},
|
||||
"Service web on bar": {
|
||||
Datacenter: "dc1",
|
||||
Node: "bar",
|
||||
SkipNodeUpdate: true,
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindTypical,
|
||||
ID: "web",
|
||||
Service: "web",
|
||||
Port: 80,
|
||||
Address: "198.18.1.20",
|
||||
},
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "bar",
|
||||
CheckID: "bar:web",
|
||||
Name: "web-liveness",
|
||||
Status: api.HealthWarning,
|
||||
ServiceID: "web",
|
||||
ServiceName: "web",
|
||||
},
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: token},
|
||||
},
|
||||
"Service web-proxy on bar": {
|
||||
Datacenter: "dc1",
|
||||
Node: "bar",
|
||||
SkipNodeUpdate: true,
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: "web-proxy",
|
||||
Service: "web-proxy",
|
||||
Port: 8443,
|
||||
Address: "198.18.1.20",
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
DestinationServiceName: "web",
|
||||
Upstreams: structs.Upstreams{
|
||||
{
|
||||
DestinationName: "redis",
|
||||
LocalBindPort: 123,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "bar",
|
||||
CheckID: "bar:web-proxy",
|
||||
Name: "web proxy listening",
|
||||
Status: api.HealthCritical,
|
||||
ServiceID: "web-proxy",
|
||||
ServiceName: "web-proxy",
|
||||
},
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: token},
|
||||
},
|
||||
"Node baz": {
|
||||
Datacenter: "dc1",
|
||||
Node: "baz",
|
||||
ID: types.NodeID("37ea7c44-a2a1-4764-ae28-7dfebeb54a22"),
|
||||
Address: "127.0.0.4",
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "baz",
|
||||
CheckID: "baz:alive",
|
||||
Name: "baz-liveness",
|
||||
Status: api.HealthPassing,
|
||||
},
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: token},
|
||||
},
|
||||
"Service web on baz": {
|
||||
Datacenter: "dc1",
|
||||
Node: "baz",
|
||||
SkipNodeUpdate: true,
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindTypical,
|
||||
ID: "web",
|
||||
Service: "web",
|
||||
Port: 80,
|
||||
Address: "198.18.1.40",
|
||||
},
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "baz",
|
||||
CheckID: "baz:web",
|
||||
Name: "web-liveness",
|
||||
Status: api.HealthPassing,
|
||||
ServiceID: "web",
|
||||
ServiceName: "web",
|
||||
},
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: token},
|
||||
},
|
||||
"Service web-proxy on baz": {
|
||||
Datacenter: "dc1",
|
||||
Node: "baz",
|
||||
SkipNodeUpdate: true,
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: "web-proxy",
|
||||
Service: "web-proxy",
|
||||
Port: 8443,
|
||||
Address: "198.18.1.40",
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
DestinationServiceName: "web",
|
||||
Upstreams: structs.Upstreams{
|
||||
{
|
||||
DestinationName: "redis",
|
||||
LocalBindPort: 123,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "baz",
|
||||
CheckID: "baz:web-proxy",
|
||||
Name: "web proxy listening",
|
||||
Status: api.HealthCritical,
|
||||
ServiceID: "web-proxy",
|
||||
ServiceName: "web-proxy",
|
||||
},
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: token},
|
||||
},
|
||||
"Node zip": {
|
||||
Datacenter: "dc1",
|
||||
Node: "zip",
|
||||
ID: types.NodeID("dc49fc8c-afc7-4a87-815d-74d144535075"),
|
||||
Address: "127.0.0.5",
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "zip",
|
||||
CheckID: "zip:alive",
|
||||
Name: "zip-liveness",
|
||||
Status: api.HealthPassing,
|
||||
},
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: token},
|
||||
},
|
||||
"Service redis on zip": {
|
||||
Datacenter: "dc1",
|
||||
Node: "zip",
|
||||
SkipNodeUpdate: true,
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindTypical,
|
||||
ID: "redis",
|
||||
Service: "redis",
|
||||
Port: 6379,
|
||||
Address: "198.18.1.60",
|
||||
},
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "zip",
|
||||
CheckID: "zip:redis",
|
||||
Name: "redis-liveness",
|
||||
Status: api.HealthPassing,
|
||||
ServiceID: "redis",
|
||||
ServiceName: "redis",
|
||||
},
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: token},
|
||||
},
|
||||
"Service redis-proxy on zip": {
|
||||
Datacenter: "dc1",
|
||||
Node: "zip",
|
||||
SkipNodeUpdate: true,
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: "redis-proxy",
|
||||
Service: "redis-proxy",
|
||||
Port: 8443,
|
||||
Address: "198.18.1.60",
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
DestinationServiceName: "redis",
|
||||
},
|
||||
},
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "zip",
|
||||
CheckID: "zip:redis-proxy",
|
||||
Name: "redis proxy listening",
|
||||
Status: api.HealthCritical,
|
||||
ServiceID: "redis-proxy",
|
||||
ServiceName: "redis-proxy",
|
||||
},
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: token},
|
||||
},
|
||||
}
|
||||
registerTestCatalogEntriesMap(t, codec, registrations)
|
||||
}
|
||||
|
|
|
@ -144,6 +144,45 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.
|
|||
})
|
||||
}
|
||||
|
||||
func (m *Internal) ServiceTopology(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceTopology) error {
|
||||
if done, err := m.srv.ForwardRPC("Internal.ServiceTopology", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
if args.ServiceName == "" {
|
||||
return fmt.Errorf("Must provide a service name")
|
||||
}
|
||||
|
||||
var authzContext acl.AuthorizerContext
|
||||
authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := m.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
|
||||
return err
|
||||
}
|
||||
if authz != nil && authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
|
||||
return acl.ErrPermissionDenied
|
||||
}
|
||||
|
||||
return m.srv.blockingQuery(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
func(ws memdb.WatchSet, state *state.Store) error {
|
||||
index, topology, err := state.ServiceTopology(ws, args.Datacenter, args.ServiceName, &args.EnterpriseMeta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reply.Index = index
|
||||
reply.ServiceTopology = topology
|
||||
|
||||
if err := m.srv.filterACL(args.Token, reply); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// GatewayServiceNodes returns all the nodes for services associated with a gateway along with their gateway config
|
||||
func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceDump) error {
|
||||
if done, err := m.srv.ForwardRPC("Internal.GatewayServiceDump", args, args, reply); done {
|
||||
|
|
|
@ -1605,3 +1605,145 @@ service_prefix "terminating-gateway" { policy = "read" }
|
|||
}
|
||||
assert.ElementsMatch(t, expected, actual)
|
||||
}
|
||||
|
||||
func TestInternal_ServiceTopology(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
// api and api-proxy on node foo - upstream: web
|
||||
// web and web-proxy on node bar - upstream: redis
|
||||
// web and web-proxy on node baz - upstream: redis
|
||||
// redis and redis-proxy on node zip
|
||||
registerTestTopologyEntries(t, codec, "")
|
||||
|
||||
t.Run("api", func(t *testing.T) {
|
||||
args := structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
ServiceName: "api",
|
||||
}
|
||||
var out structs.IndexedServiceTopology
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
|
||||
require.False(t, out.FilteredByACLs)
|
||||
|
||||
// bar/web, bar/web-proxy, baz/web, baz/web-proxy
|
||||
require.Len(t, out.ServiceTopology.Upstreams, 4)
|
||||
require.Len(t, out.ServiceTopology.Downstreams, 0)
|
||||
})
|
||||
|
||||
t.Run("web", func(t *testing.T) {
|
||||
args := structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
ServiceName: "web",
|
||||
}
|
||||
var out structs.IndexedServiceTopology
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
|
||||
require.False(t, out.FilteredByACLs)
|
||||
|
||||
// foo/api, foo/api-proxy
|
||||
require.Len(t, out.ServiceTopology.Upstreams, 2)
|
||||
|
||||
// zip/redis, zip/redis-proxy
|
||||
require.Len(t, out.ServiceTopology.Downstreams, 2)
|
||||
})
|
||||
|
||||
t.Run("redis", func(t *testing.T) {
|
||||
args := structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
ServiceName: "redis",
|
||||
}
|
||||
var out structs.IndexedServiceTopology
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
|
||||
require.False(t, out.FilteredByACLs)
|
||||
|
||||
require.Len(t, out.ServiceTopology.Upstreams, 0)
|
||||
|
||||
// bar/web, bar/web-proxy, baz/web, baz/web-proxy
|
||||
require.Len(t, out.ServiceTopology.Downstreams, 4)
|
||||
})
|
||||
}
|
||||
|
||||
func TestInternal_ServiceTopology_ACL(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLMasterToken = TestDefaultMasterToken
|
||||
c.ACLDefaultPolicy = "deny"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
// api and api-proxy on node foo - upstream: web
|
||||
// web and web-proxy on node bar - upstream: redis
|
||||
// web and web-proxy on node baz - upstream: redis
|
||||
// redis and redis-proxy on node zip
|
||||
registerTestTopologyEntries(t, codec, TestDefaultMasterToken)
|
||||
|
||||
// Token grants read to: foo/api, foo/api-proxy, bar/web, baz/web
|
||||
userToken, err := upsertTestTokenWithPolicyRules(codec, TestDefaultMasterToken, "dc1", `
|
||||
node_prefix "" { policy = "read" }
|
||||
service_prefix "api" { policy = "read" }
|
||||
service "web" { policy = "read" }
|
||||
`)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("api can't read web", func(t *testing.T) {
|
||||
args := structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
ServiceName: "api",
|
||||
QueryOptions: structs.QueryOptions{Token: userToken.SecretID},
|
||||
}
|
||||
var out structs.IndexedServiceTopology
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
|
||||
|
||||
require.True(t, out.FilteredByACLs)
|
||||
|
||||
// The web-proxy upstream gets filtered out from both bar and baz
|
||||
require.Len(t, out.ServiceTopology.Upstreams, 2)
|
||||
require.Equal(t, "web", out.ServiceTopology.Upstreams[0].Service.Service)
|
||||
require.Equal(t, "web", out.ServiceTopology.Upstreams[1].Service.Service)
|
||||
|
||||
require.Len(t, out.ServiceTopology.Downstreams, 0)
|
||||
})
|
||||
|
||||
t.Run("web can't read redis", func(t *testing.T) {
|
||||
args := structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
ServiceName: "web",
|
||||
QueryOptions: structs.QueryOptions{Token: userToken.SecretID},
|
||||
}
|
||||
var out structs.IndexedServiceTopology
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
|
||||
|
||||
require.True(t, out.FilteredByACLs)
|
||||
|
||||
// The redis upstream gets filtered out but the api and proxy downstream are returned
|
||||
require.Len(t, out.ServiceTopology.Upstreams, 0)
|
||||
require.Len(t, out.ServiceTopology.Downstreams, 2)
|
||||
})
|
||||
|
||||
t.Run("redis can't read self", func(t *testing.T) {
|
||||
args := structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
ServiceName: "redis",
|
||||
QueryOptions: structs.QueryOptions{Token: userToken.SecretID},
|
||||
}
|
||||
var out structs.IndexedServiceTopology
|
||||
err := msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out)
|
||||
|
||||
// Can't read self, fails fast
|
||||
require.True(t, acl.IsErrPermissionDenied(err))
|
||||
})
|
||||
}
|
||||
|
|
|
@ -3009,7 +3009,7 @@ func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc string, sn structs.Ser
|
|||
)
|
||||
for _, u := range upstreams {
|
||||
// Evaluate the targets from the upstream's discovery chain
|
||||
idx, targets, err := s.targetsForSource(ws, tx, dc, u.Name, &u.EnterpriseMeta)
|
||||
idx, targets, err := s.discoveryChainTargets(ws, dc, u.Name, &u.EnterpriseMeta)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed to get discovery chain targets for %q: %v", u.String(), err)
|
||||
}
|
||||
|
@ -3038,7 +3038,7 @@ func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc string, service stru
|
|||
defer tx.Abort()
|
||||
|
||||
// First fetch services with discovery chains that list the input as a target
|
||||
idx, sources, err := s.sourcesForTarget(ws, tx, dc, service)
|
||||
idx, sources, err := s.discoveryChainSources(ws, tx, dc, service)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed to get sources for discovery chain target %q: %v", service.String(), err)
|
||||
}
|
||||
|
|
|
@ -6194,7 +6194,7 @@ func TestCatalog_catalogDownstreams_Watches(t *testing.T) {
|
|||
|
||||
ws = memdb.NewWatchSet()
|
||||
tx = s.db.ReadTxn()
|
||||
idx, names, err = downstreamsFromRegistration(ws, tx, admin)
|
||||
idx, _, err = downstreamsFromRegistration(ws, tx, admin)
|
||||
require.NoError(t, err)
|
||||
|
||||
exp = expect{
|
||||
|
@ -6225,7 +6225,7 @@ func TestCatalog_catalogDownstreams_Watches(t *testing.T) {
|
|||
|
||||
ws = memdb.NewWatchSet()
|
||||
tx = s.db.ReadTxn()
|
||||
idx, names, err = downstreamsFromRegistration(ws, tx, cache)
|
||||
idx, _, err = downstreamsFromRegistration(ws, tx, cache)
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -6696,7 +6696,7 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) {
|
|||
|
||||
ws = memdb.NewWatchSet()
|
||||
tx = s.db.ReadTxn()
|
||||
idx, names, err = upstreamsFromRegistration(ws, tx, web)
|
||||
idx, _, err = upstreamsFromRegistration(ws, tx, web)
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -6994,7 +6994,7 @@ func TestCatalog_DownstreamsForService(t *testing.T) {
|
|||
}
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
sn := structs.NewServiceName("api", structs.DefaultEnterpriseMeta())
|
||||
sn := structs.NewServiceName("admin", structs.DefaultEnterpriseMeta())
|
||||
idx, names, err := s.DownstreamsForService(ws, "dc1", sn)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
|
|
@ -373,8 +373,8 @@ var serviceGraphKinds = []string{
|
|||
structs.ServiceResolver,
|
||||
}
|
||||
|
||||
// targetsForSource will return a list of services listed as a target for the input's discovery chain
|
||||
func (s *Store) targetsForSource(ws memdb.WatchSet, tx ReadTxn, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
|
||||
// discoveryChainTargets will return a list of services listed as a target for the input's discovery chain
|
||||
func (s *Store) discoveryChainTargets(ws memdb.WatchSet, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
|
||||
source := structs.NewServiceName(service, entMeta)
|
||||
req := discoverychain.CompileRequest{
|
||||
ServiceName: source.Name,
|
||||
|
@ -402,8 +402,8 @@ func (s *Store) targetsForSource(ws memdb.WatchSet, tx ReadTxn, dc, service stri
|
|||
return idx, resp, nil
|
||||
}
|
||||
|
||||
// sourcesForTarget will return a list of services whose discovery chains have the input service as a target
|
||||
func (s *Store) sourcesForTarget(ws memdb.WatchSet, tx ReadTxn, dc string, destination structs.ServiceName) (uint64, []structs.ServiceName, error) {
|
||||
// discoveryChainSources will return a list of services whose discovery chains have the input service as a target
|
||||
func (s *Store) discoveryChainSources(ws memdb.WatchSet, tx ReadTxn, dc string, destination structs.ServiceName) (uint64, []structs.ServiceName, error) {
|
||||
queue := []structs.ServiceName{destination}
|
||||
|
||||
seenLink := make(map[structs.ServiceName]bool)
|
||||
|
|
|
@ -1713,7 +1713,8 @@ func TestSourcesForTarget(t *testing.T) {
|
|||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
|
||||
idx, ids, err := s.sourcesForTarget(ws, tx, "dc1", "sink", nil)
|
||||
sn := structs.NewServiceName("sink", structs.DefaultEnterpriseMeta())
|
||||
idx, ids, err := s.discoveryChainSources(ws, tx, "dc1", sn)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, tc.expect.idx, idx)
|
||||
|
@ -1914,7 +1915,7 @@ func TestTargetsForSource(t *testing.T) {
|
|||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
|
||||
idx, ids, err := s.targetsForSource(ws, tx, "dc1", "web", nil)
|
||||
idx, ids, err := s.discoveryChainTargets(ws, "dc1", "web", nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, tc.expect.idx, idx)
|
||||
|
|
Loading…
Reference in New Issue