Update intention topology to use new table

pull/11738/head
freddygv 2021-12-02 09:06:39 -07:00
parent 55970c6ccd
commit f5b25401b3
4 changed files with 35 additions and 29 deletions

View File

@ -570,7 +570,7 @@ func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.In
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, services, err := state.ServiceList(ws, nil, &args.EnterpriseMeta)
index, services, err := state.ServiceList(ws, &args.EnterpriseMeta)
if err != nil {
return err
}

View File

@ -968,16 +968,14 @@ func (s *Store) Services(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (ui
return idx, results, nil
}
func (s *Store) ServiceList(ws memdb.WatchSet,
include func(svc *structs.ServiceNode) bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) {
func (s *Store) ServiceList(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) {
tx := s.db.Txn(false)
defer tx.Abort()
return serviceListTxn(tx, ws, include, entMeta)
return serviceListTxn(tx, ws, entMeta)
}
func serviceListTxn(tx ReadTxn, ws memdb.WatchSet,
include func(svc *structs.ServiceNode) bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) {
func serviceListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceList, error) {
idx := catalogServicesMaxIndex(tx, entMeta)
services, err := tx.Get(tableServices, indexID+"_prefix", entMeta)
@ -989,11 +987,7 @@ func serviceListTxn(tx ReadTxn, ws memdb.WatchSet,
unique := make(map[structs.ServiceName]struct{})
for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode)
// TODO (freddy) This is a hack to exclude certain kinds.
// Need a new index to query by kind and namespace, have to coordinate with consul foundations first
if include == nil || include(svc) {
unique[svc.CompoundServiceName()] = struct{}{}
}
unique[svc.CompoundServiceName()] = struct{}{}
}
results := make(structs.ServiceList, 0, len(unique))
@ -2532,10 +2526,14 @@ func (s *Store) VirtualIPForService(sn structs.ServiceName) (string, error) {
return result.String(), nil
}
func (s *Store) KindServiceNamesOfKind(ws memdb.WatchSet, kind structs.ServiceKind) (uint64, []*KindServiceName, error) {
func (s *Store) ServiceNamesOfKind(ws memdb.WatchSet, kind structs.ServiceKind) (uint64, []*KindServiceName, error) {
tx := s.db.Txn(false)
defer tx.Abort()
return serviceNamesOfKindTxn(tx, ws, kind)
}
func serviceNamesOfKindTxn(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind) (uint64, []*KindServiceName, error) {
var names []*KindServiceName
iter, err := tx.Get(tableKindServiceNames, indexKindOnly, kind)
if err != nil {

View File

@ -7714,7 +7714,7 @@ func TestStateStore_EnsureService_ServiceNames(t *testing.T) {
require.NoError(t, s.EnsureService(idx, "node1", &svc))
// Ensure the service name was stored for all of them under the appropriate kind
gotIdx, gotNames, err := s.KindServiceNamesOfKind(nil, svc.Kind)
gotIdx, gotNames, err := s.ServiceNamesOfKind(nil, svc.Kind)
require.NoError(t, err)
require.Equal(t, idx, gotIdx)
require.Len(t, gotNames, 1)
@ -7734,7 +7734,7 @@ func TestStateStore_EnsureService_ServiceNames(t *testing.T) {
idx++
require.NoError(t, s.EnsureService(idx, "node1", &newIngress))
gotIdx, got, err := s.KindServiceNamesOfKind(nil, structs.ServiceKindIngressGateway)
gotIdx, got, err := s.ServiceNamesOfKind(nil, structs.ServiceKindIngressGateway)
require.NoError(t, err)
require.Equal(t, idx, gotIdx)
@ -7779,7 +7779,7 @@ func TestStateStore_EnsureService_ServiceNames(t *testing.T) {
idx++
require.NoError(t, s.EnsureService(idx, "node1", &newMGW))
gotIdx, _, err = s.KindServiceNamesOfKind(nil, structs.ServiceKindMeshGateway)
gotIdx, _, err = s.ServiceNamesOfKind(nil, structs.ServiceKindMeshGateway)
require.NoError(t, err)
require.Equal(t, uint64(2), gotIdx)
@ -7787,7 +7787,7 @@ func TestStateStore_EnsureService_ServiceNames(t *testing.T) {
idx++
require.NoError(t, s.DeleteService(idx, "node1", "web", entMeta))
gotIdx, got, err = s.KindServiceNamesOfKind(nil, structs.ServiceKindTypical)
gotIdx, got, err = s.ServiceNamesOfKind(nil, structs.ServiceKindTypical)
require.NoError(t, err)
require.Equal(t, idx, gotIdx)
require.Empty(t, got)

View File

@ -995,23 +995,29 @@ func (s *Store) intentionTopologyTxn(tx ReadTxn, ws memdb.WatchSet,
maxIdx = index
}
index, allServices, err := serviceListTxn(tx, ws, func(svc *structs.ServiceNode) bool {
// Only include ingress gateways as downstreams, since they cannot receive service mesh traffic
// TODO(freddy): One remaining issue is that this includes non-Connect services (typical services without a proxy)
// Ideally those should be excluded as well, since they can't be upstreams/downstreams without a proxy.
// Maybe start tracking services represented by proxies? (both sidecar and ingress)
if svc.ServiceKind == structs.ServiceKindTypical || (svc.ServiceKind == structs.ServiceKindIngressGateway && downstreams) {
return true
}
return false
}, target.WithWildcardNamespace())
// TODO(tproxy): One remaining improvement is that this includes non-Connect services (typical services without a proxy)
// Ideally those should be excluded as well, since they can't be upstreams/downstreams without a proxy.
// Maybe narrow serviceNamesOfKindTxn to services represented by proxies? (ingress, sidecar-proxy, terminating)
index, services, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical)
if err != nil {
return index, nil, fmt.Errorf("failed to fetch catalog service list: %v", err)
return index, nil, fmt.Errorf("failed to list ingress service names: %v", err)
}
if index > maxIdx {
maxIdx = index
}
if downstreams {
// Ingress gateways can only ever be downstreams, since mesh services don't dial them.
index, ingress, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindIngressGateway)
if err != nil {
return index, nil, fmt.Errorf("failed to list ingress service names: %v", err)
}
if index > maxIdx {
maxIdx = index
}
services = append(services, ingress...)
}
// When checking authorization to upstreams, the match type for the decision is `destination` because we are deciding
// if upstream candidates are covered by intentions that have the target service as a source.
// The reverse is true for downstreams.
@ -1019,11 +1025,13 @@ func (s *Store) intentionTopologyTxn(tx ReadTxn, ws memdb.WatchSet,
if downstreams {
decisionMatchType = structs.IntentionMatchSource
}
result := make([]ServiceWithDecision, 0, len(allServices))
for _, candidate := range allServices {
result := make([]ServiceWithDecision, 0, len(services))
for _, svc := range services {
candidate := svc.Service
if candidate.Name == structs.ConsulServiceName {
continue
}
opts := IntentionDecisionOpts{
Target: candidate.Name,
Namespace: candidate.NamespaceOrDefault(),