mirror of https://github.com/hashicorp/consul
Add func to combine up+downstream queries
parent
5c913ec312
commit
f906b94351
|
@ -2017,6 +2017,33 @@ func (s *Store) deleteCheckTxn(tx *txn, idx uint64, node string, checkID types.C
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CombinedCheckServiceNodes is used to query all nodes and checks for both typical and Connect endpoints of a service
|
||||||
|
func (s *Store) CombinedCheckServiceNodes(ws memdb.WatchSet, service structs.ServiceName) (uint64, structs.CheckServiceNodes, error) {
|
||||||
|
var (
|
||||||
|
resp structs.CheckServiceNodes
|
||||||
|
maxIdx uint64
|
||||||
|
)
|
||||||
|
idx, csn, err := s.CheckServiceNodes(ws, service.Name, &service.EnterpriseMeta)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed to get downstream nodes for %q: %v", service, err)
|
||||||
|
}
|
||||||
|
if idx > maxIdx {
|
||||||
|
maxIdx = idx
|
||||||
|
}
|
||||||
|
resp = append(resp, csn...)
|
||||||
|
|
||||||
|
idx, csn, err = s.CheckConnectServiceNodes(ws, service.Name, &service.EnterpriseMeta)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed to get downstream connect nodes for %q: %v", service, err)
|
||||||
|
}
|
||||||
|
if idx > maxIdx {
|
||||||
|
maxIdx = idx
|
||||||
|
}
|
||||||
|
resp = append(resp, csn...)
|
||||||
|
|
||||||
|
return maxIdx, resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
// CheckServiceNodes is used to query all nodes and checks for a given service.
|
// CheckServiceNodes is used to query all nodes and checks for a given service.
|
||||||
func (s *Store) CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
|
func (s *Store) CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
|
||||||
return s.checkServiceNodes(ws, serviceName, false, entMeta)
|
return s.checkServiceNodes(ws, serviceName, false, entMeta)
|
||||||
|
@ -2882,17 +2909,93 @@ func checkProtocolMatch(tx ReadTxn, ws memdb.WatchSet, svc *structs.GatewayServi
|
||||||
return idx, svc.Protocol == protocol, nil
|
return idx, svc.Protocol == protocol, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Store) ServiceTopology(
|
||||||
|
ws memdb.WatchSet,
|
||||||
|
dc, service string,
|
||||||
|
entMeta *structs.EnterpriseMeta,
|
||||||
|
) (uint64, *structs.ServiceTopology, error) {
|
||||||
|
|
||||||
|
var (
|
||||||
|
maxIdx uint64
|
||||||
|
sn = structs.NewServiceName(service, entMeta)
|
||||||
|
)
|
||||||
|
idx, upstreamNames, err := s.UpstreamsForService(ws, dc, sn)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed to get upstreams for %q: %v", sn.String(), err)
|
||||||
|
}
|
||||||
|
if idx > maxIdx {
|
||||||
|
maxIdx = idx
|
||||||
|
}
|
||||||
|
|
||||||
|
var upstreams structs.CheckServiceNodes
|
||||||
|
for _, u := range upstreamNames {
|
||||||
|
// Collect both typical and connect endpoints, this allows aggregating check statuses across both
|
||||||
|
idx, csn, err := s.CheckServiceNodes(ws, u.Name, &u.EnterpriseMeta)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed to get upstream nodes for %q: %v", sn.String(), err)
|
||||||
|
}
|
||||||
|
if idx > maxIdx {
|
||||||
|
maxIdx = idx
|
||||||
|
}
|
||||||
|
upstreams = append(upstreams, csn...)
|
||||||
|
|
||||||
|
idx, csn, err = s.CheckConnectServiceNodes(ws, u.Name, &u.EnterpriseMeta)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed to get upstream connect nodes for %q: %v", sn.String(), err)
|
||||||
|
}
|
||||||
|
if idx > maxIdx {
|
||||||
|
maxIdx = idx
|
||||||
|
}
|
||||||
|
upstreams = append(upstreams, csn...)
|
||||||
|
}
|
||||||
|
|
||||||
|
idx, downstreamNames, err := s.DownstreamsForService(ws, dc, sn)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", sn.String(), err)
|
||||||
|
}
|
||||||
|
if idx > maxIdx {
|
||||||
|
maxIdx = idx
|
||||||
|
}
|
||||||
|
|
||||||
|
var downstreams structs.CheckServiceNodes
|
||||||
|
for _, u := range downstreamNames {
|
||||||
|
// Collect both typical and connect endpoints, this allows aggregating check statuses across both
|
||||||
|
idx, csn, err := s.CheckServiceNodes(ws, u.Name, &u.EnterpriseMeta)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed to get downstream nodes for %q: %v", sn.String(), err)
|
||||||
|
}
|
||||||
|
if idx > maxIdx {
|
||||||
|
maxIdx = idx
|
||||||
|
}
|
||||||
|
downstreams = append(downstreams, csn...)
|
||||||
|
|
||||||
|
idx, csn, err = s.CheckConnectServiceNodes(ws, u.Name, &u.EnterpriseMeta)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed to get downstream connect nodes for %q: %v", sn.String(), err)
|
||||||
|
}
|
||||||
|
if idx > maxIdx {
|
||||||
|
maxIdx = idx
|
||||||
|
}
|
||||||
|
downstreams = append(downstreams, csn...)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &structs.ServiceTopology{
|
||||||
|
Upstreams: upstreams,
|
||||||
|
Downstreams: downstreams,
|
||||||
|
}
|
||||||
|
return 0, resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
// UpstreamsForService will find all upstream services that the input could route traffic to.
|
// UpstreamsForService will find all upstream services that the input could route traffic to.
|
||||||
// There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams.
|
// There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams.
|
||||||
// TODO (freddy): Account for ingress gateways
|
// TODO (freddy): Account for ingress gateways
|
||||||
func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
|
func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc string, sn structs.ServiceName) (uint64, []structs.ServiceName, error) {
|
||||||
tx := s.db.ReadTxn()
|
tx := s.db.ReadTxn()
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
sn := structs.NewServiceName(service, entMeta)
|
|
||||||
idx, upstreams, err := upstreamsFromRegistration(ws, tx, sn)
|
idx, upstreams, err := upstreamsFromRegistration(ws, tx, sn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed to get upstreams for %q: %v", sn.String(), err)
|
return 0, nil, fmt.Errorf("failed to get registration upstreams for %q: %v", sn.String(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var maxIdx uint64
|
var maxIdx uint64
|
||||||
|
@ -2930,15 +3033,14 @@ func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc, service string, entMe
|
||||||
// DownstreamsForService will find all downstream services that could route traffic to the input service.
|
// DownstreamsForService will find all downstream services that could route traffic to the input service.
|
||||||
// There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams.
|
// There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams.
|
||||||
// TODO (freddy): Account for ingress gateways
|
// TODO (freddy): Account for ingress gateways
|
||||||
func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
|
func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc string, service structs.ServiceName) (uint64, []structs.ServiceName, error) {
|
||||||
tx := s.db.ReadTxn()
|
tx := s.db.ReadTxn()
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
// First fetch services with discovery chains that list the input as a target
|
// First fetch services with discovery chains that list the input as a target
|
||||||
sn := structs.NewServiceName(service, entMeta)
|
idx, sources, err := s.sourcesForTarget(ws, tx, dc, service)
|
||||||
idx, sources, err := s.sourcesForTarget(ws, tx, dc, service, entMeta)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed to get sources for discovery chain target %q: %v", sn.String(), err)
|
return 0, nil, fmt.Errorf("failed to get sources for discovery chain target %q: %v", service.String(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var maxIdx uint64
|
var maxIdx uint64
|
||||||
|
@ -2954,7 +3056,7 @@ func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc, service string, ent
|
||||||
// We then follow these discovery chain sources one level down to the services defining them as an upstream.
|
// We then follow these discovery chain sources one level down to the services defining them as an upstream.
|
||||||
idx, downstreams, err := downstreamsFromRegistration(ws, tx, s)
|
idx, downstreams, err := downstreamsFromRegistration(ws, tx, s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", s.String(), err)
|
return 0, nil, fmt.Errorf("failed to get registration downstreams for %q: %v", s.String(), err)
|
||||||
}
|
}
|
||||||
if idx > maxIdx {
|
if idx > maxIdx {
|
||||||
maxIdx = idx
|
maxIdx = idx
|
||||||
|
@ -2968,9 +3070,9 @@ func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc, service string, ent
|
||||||
}
|
}
|
||||||
|
|
||||||
// Also append services that directly listed the input as an upstream
|
// Also append services that directly listed the input as an upstream
|
||||||
idx, downstreams, err := downstreamsFromRegistration(ws, tx, sn)
|
idx, downstreams, err := downstreamsFromRegistration(ws, tx, service)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", sn.String(), err)
|
return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", service.String(), err)
|
||||||
}
|
}
|
||||||
if idx > maxIdx {
|
if idx > maxIdx {
|
||||||
maxIdx = idx
|
maxIdx = idx
|
||||||
|
@ -2994,7 +3096,7 @@ func downstreamsFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.Servi
|
||||||
return linkedFromRegistration(ws, tx, sn, true)
|
return linkedFromRegistration(ws, tx, sn, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func linkedFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.ServiceName, downstreams bool) (uint64, []structs.ServiceName, error) {
|
func linkedFromRegistration(ws memdb.WatchSet, tx ReadTxn, service structs.ServiceName, downstreams bool) (uint64, []structs.ServiceName, error) {
|
||||||
// To fetch upstreams we query services that have the input listed as a downstream
|
// To fetch upstreams we query services that have the input listed as a downstream
|
||||||
// To fetch downstreams we query services that have the input listed as an upstream
|
// To fetch downstreams we query services that have the input listed as an upstream
|
||||||
index := "downstream"
|
index := "downstream"
|
||||||
|
@ -3002,7 +3104,7 @@ func linkedFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.ServiceNam
|
||||||
index = "upstream"
|
index = "upstream"
|
||||||
}
|
}
|
||||||
|
|
||||||
iter, err := tx.Get(topologyTableName, index, sn)
|
iter, err := tx.Get(topologyTableName, index, service)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("%q lookup failed: %v", topologyTableName, err)
|
return 0, nil, fmt.Errorf("%q lookup failed: %v", topologyTableName, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -6861,7 +6861,8 @@ func TestCatalog_UpstreamsForService(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ws := memdb.NewWatchSet()
|
ws := memdb.NewWatchSet()
|
||||||
idx, names, err := s.UpstreamsForService(ws, "dc1", "api", structs.DefaultEnterpriseMeta())
|
sn := structs.NewServiceName("api", structs.DefaultEnterpriseMeta())
|
||||||
|
idx, names, err := s.UpstreamsForService(ws, "dc1", sn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, tc.expect.idx, idx)
|
require.Equal(t, tc.expect.idx, idx)
|
||||||
|
@ -6993,11 +6994,12 @@ func TestCatalog_DownstreamsForService(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ws := memdb.NewWatchSet()
|
ws := memdb.NewWatchSet()
|
||||||
idx, ids, err := s.DownstreamsForService(ws, "dc1", "admin", structs.DefaultEnterpriseMeta())
|
sn := structs.NewServiceName("api", structs.DefaultEnterpriseMeta())
|
||||||
|
idx, names, err := s.DownstreamsForService(ws, "dc1", sn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, tc.expect.idx, idx)
|
require.Equal(t, tc.expect.idx, idx)
|
||||||
require.ElementsMatch(t, tc.expect.names, ids)
|
require.ElementsMatch(t, tc.expect.names, names)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -403,8 +403,7 @@ func (s *Store) targetsForSource(ws memdb.WatchSet, tx ReadTxn, dc, service stri
|
||||||
}
|
}
|
||||||
|
|
||||||
// sourcesForTarget will return a list of services whose discovery chains have the input service as a target
|
// sourcesForTarget will return a list of services whose discovery chains have the input service as a target
|
||||||
func (s *Store) sourcesForTarget(ws memdb.WatchSet, tx ReadTxn, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
|
func (s *Store) sourcesForTarget(ws memdb.WatchSet, tx ReadTxn, dc string, destination structs.ServiceName) (uint64, []structs.ServiceName, error) {
|
||||||
destination := structs.NewServiceName(service, entMeta)
|
|
||||||
queue := []structs.ServiceName{destination}
|
queue := []structs.ServiceName{destination}
|
||||||
|
|
||||||
seenLink := make(map[structs.ServiceName]bool)
|
seenLink := make(map[structs.ServiceName]bool)
|
||||||
|
@ -444,7 +443,7 @@ func (s *Store) sourcesForTarget(ws memdb.WatchSet, tx ReadTxn, dc, service stri
|
||||||
EvaluateInDatacenter: dc,
|
EvaluateInDatacenter: dc,
|
||||||
UseInDatacenter: dc,
|
UseInDatacenter: dc,
|
||||||
}
|
}
|
||||||
idx, chain, err := s.ServiceDiscoveryChain(ws, sn.Name, entMeta, req)
|
idx, chain, err := s.ServiceDiscoveryChain(ws, sn.Name, &sn.EnterpriseMeta, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", sn.String(), err)
|
return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", sn.String(), err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1855,6 +1855,17 @@ type IndexedGatewayServices struct {
|
||||||
QueryMeta
|
QueryMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type IndexedServiceTopology struct {
|
||||||
|
ServiceTopology *ServiceTopology
|
||||||
|
FilteredByACLs bool
|
||||||
|
QueryMeta
|
||||||
|
}
|
||||||
|
|
||||||
|
type ServiceTopology struct {
|
||||||
|
Upstreams CheckServiceNodes
|
||||||
|
Downstreams CheckServiceNodes
|
||||||
|
}
|
||||||
|
|
||||||
// IndexedConfigEntries has its own encoding logic which differs from
|
// IndexedConfigEntries has its own encoding logic which differs from
|
||||||
// ConfigEntryRequest as it has to send a slice of ConfigEntry.
|
// ConfigEntryRequest as it has to send a slice of ConfigEntry.
|
||||||
type IndexedConfigEntries struct {
|
type IndexedConfigEntries struct {
|
||||||
|
|
Loading…
Reference in New Issue