From a86cf88a4a38ebdfc492ec87dcd65cd8f80ad936 Mon Sep 17 00:00:00 2001 From: freddygv Date: Tue, 22 Sep 2020 16:05:09 -0600 Subject: [PATCH] Add method for downstreams from disco chain --- agent/consul/discovery_chain_endpoint.go | 31 +-- agent/consul/state/config_entry.go | 106 +++++++++ agent/consul/state/config_entry_test.go | 271 ++++++++++++++++++++++- 3 files changed, 379 insertions(+), 29 deletions(-) diff --git a/agent/consul/discovery_chain_endpoint.go b/agent/consul/discovery_chain_endpoint.go index a9933fa453..23f545ef02 100644 --- a/agent/consul/discovery_chain_endpoint.go +++ b/agent/consul/discovery_chain_endpoint.go @@ -1,13 +1,11 @@ package consul import ( - "errors" "fmt" "time" metrics "github.com/armon/go-metrics" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" @@ -53,39 +51,16 @@ func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs &args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { - index, entries, err := state.ReadDiscoveryChainConfigEntries(ws, args.Name, entMeta) - if err != nil { - return err - } - - _, config, err := state.CAConfig(ws) - if err != nil { - return err - } else if config == nil { - return errors.New("no cluster ca config setup") - } - - // Build TrustDomain based on the ClusterID stored. - signingID := connect.SpiffeIDSigningForCluster(config) - if signingID == nil { - // If CA is bootstrapped at all then this should never happen but be - // defensive. - return errors.New("no cluster trust domain setup") - } - currentTrustDomain := signingID.Host() - - // Then we compile it into something useful. - chain, err := discoverychain.Compile(discoverychain.CompileRequest{ + req := discoverychain.CompileRequest{ ServiceName: args.Name, EvaluateInNamespace: entMeta.NamespaceOrDefault(), EvaluateInDatacenter: evalDC, - EvaluateInTrustDomain: currentTrustDomain, UseInDatacenter: c.srv.config.Datacenter, OverrideMeshGateway: args.OverrideMeshGateway, OverrideProtocol: args.OverrideProtocol, OverrideConnectTimeout: args.OverrideConnectTimeout, - Entries: entries, - }) + } + index, chain, err := state.ServiceDiscoveryChain(ws, args.Name, entMeta, req) if err != nil { return err } diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index f19205bbee..21107413c4 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -1,8 +1,10 @@ package state import ( + "errors" "fmt" + "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" @@ -371,6 +373,72 @@ var serviceGraphKinds = []string{ structs.ServiceResolver, } +// sourcesForTarget will return a list of services whose discovery chains have the input service as a target +func (s *Store) sourcesForTarget(ws memdb.WatchSet, tx ReadTxn, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) { + destination := structs.NewServiceName(service, entMeta) + queue := []structs.ServiceName{destination} + + seenLink := make(map[structs.ServiceName]bool) + for len(queue) > 0 { + // The "link" index returns config entries that reference a service + iter, err := tx.Get(configTableName, "link", queue[0].ToServiceID()) + if err != nil { + return 0, nil, err + } + ws.Add(iter.WatchCh()) + + for raw := iter.Next(); raw != nil; raw = iter.Next() { + entry := raw.(structs.ConfigEntry) + + sn := structs.NewServiceName(entry.GetName(), entry.GetEnterpriseMeta()) + if !seenLink[sn] { + seenLink[sn] = true + queue = append(queue, sn) + } + } + queue = queue[1:] + } + + var ( + maxIdx uint64 + resp []structs.ServiceName + ) + + // Only return the services that directly target the destination + seenSource := make(map[structs.ServiceName]bool) + for sn, _ := range seenLink { + req := discoverychain.CompileRequest{ + ServiceName: sn.Name, + EvaluateInNamespace: sn.NamespaceOrDefault(), + + // TODO(freddy) : Should these be anything other than the known DC? + EvaluateInDatacenter: dc, + UseInDatacenter: dc, + } + idx, chain, err := s.ServiceDiscoveryChain(ws, sn.Name, entMeta, req) + if err != nil { + return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", sn.String(), err) + } + + for _, t := range chain.Targets { + em := structs.EnterpriseMetaInitializer(t.Namespace) + candidate := structs.NewServiceName(t.Service, &em) + + if !candidate.Matches(&destination) { + continue + } + if idx > maxIdx { + maxIdx = idx + } + if !seenSource[sn] { + seenSource[sn] = true + resp = append(resp, sn) + } + } + } + return maxIdx, resp, nil +} + func validateProposedConfigEntryInServiceGraph( tx ReadTxn, kind, name string, @@ -555,6 +623,44 @@ func testCompileDiscoveryChain( return chain.Protocol, chain.Nodes[chain.StartNode], nil } +func (s *Store) ServiceDiscoveryChain( + ws memdb.WatchSet, + serviceName string, + entMeta *structs.EnterpriseMeta, + req discoverychain.CompileRequest, +) (uint64, *structs.CompiledDiscoveryChain, error) { + + index, entries, err := s.readDiscoveryChainConfigEntries(ws, serviceName, nil, entMeta) + if err != nil { + return 0, nil, err + } + req.Entries = entries + + _, config, err := s.CAConfig(ws) + if err != nil { + return 0, nil, err + } else if config == nil { + return 0, nil, errors.New("no cluster ca config setup") + } + + // Build TrustDomain based on the ClusterID stored. + signingID := connect.SpiffeIDSigningForCluster(config) + if signingID == nil { + // If CA is bootstrapped at all then this should never happen but be + // defensive. + return 0, nil, errors.New("no cluster trust domain setup") + } + req.EvaluateInTrustDomain = signingID.Host() + + // Then we compile it into something useful. + chain, err := discoverychain.Compile(req) + if err != nil { + return 0, nil, fmt.Errorf("failed to compile discovery chain: %v", err) + } + + return index, chain, nil +} + // ReadDiscoveryChainConfigEntries will query for the full discovery chain for // the provided service name. All relevant config entries will be recursively // fetched and included in the result. diff --git a/agent/consul/state/config_entry_test.go b/agent/consul/state/config_entry_test.go index fcf7624a5b..887441b8a5 100644 --- a/agent/consul/state/config_entry_test.go +++ b/agent/consul/state/config_entry_test.go @@ -1246,7 +1246,7 @@ func TestStore_ReadDiscoveryChainConfigEntries_SubsetSplit(t *testing.T) { require.NoError(t, s.EnsureConfigEntry(0, entry, nil)) } - _, entrySet, err := s.ReadDiscoveryChainConfigEntries(nil, "main", nil) + _, entrySet, err := s.readDiscoveryChainConfigEntries(nil, "main", nil, nil) require.NoError(t, err) require.Len(t, entrySet.Routers, 0) @@ -1452,3 +1452,272 @@ func TestStore_ValidateIngressGatewayErrorOnMismatchedProtocols(t *testing.T) { require.NoError(t, s.DeleteConfigEntry(5, structs.IngressGateway, "gateway", nil)) }) } + +func TestSourcesForTarget(t *testing.T) { + defaultMeta := *structs.DefaultEnterpriseMeta() + + type expect struct { + idx uint64 + ids []structs.ServiceName + } + tt := []struct { + name string + entries []structs.ConfigEntry + expect expect + }{ + { + name: "from route match", + entries: []structs.ConfigEntry{ + &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + &structs.ServiceRouterConfigEntry{ + Kind: structs.ServiceRouter, + Name: "web", + Routes: []structs.ServiceRoute{ + { + Match: &structs.ServiceRouteMatch{ + HTTP: &structs.ServiceRouteHTTPMatch{ + PathExact: "/sink", + }, + }, + Destination: &structs.ServiceRouteDestination{ + Service: "sink", + }, + }, + }, + }, + }, + expect: expect{ + idx: 2, + ids: []structs.ServiceName{ + {Name: "web", EnterpriseMeta: defaultMeta}, + }, + }, + }, + { + name: "from redirect", + entries: []structs.ConfigEntry{ + &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "web", + Redirect: &structs.ServiceResolverRedirect{ + Service: "sink", + }, + }, + }, + expect: expect{ + idx: 2, + ids: []structs.ServiceName{ + {Name: "web", EnterpriseMeta: defaultMeta}, + }, + }, + }, + { + name: "from failover", + entries: []structs.ConfigEntry{ + &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "web", + Failover: map[string]structs.ServiceResolverFailover{ + "*": { + Service: "sink", + Datacenters: []string{"dc2", "dc3"}, + }, + }, + }, + }, + expect: expect{ + idx: 2, + ids: []structs.ServiceName{ + {Name: "web", EnterpriseMeta: defaultMeta}, + }, + }, + }, + { + name: "from splitter", + entries: []structs.ConfigEntry{ + &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + &structs.ServiceSplitterConfigEntry{ + Kind: structs.ServiceSplitter, + Name: "web", + Splits: []structs.ServiceSplit{ + {Weight: 90, Service: "web"}, + {Weight: 10, Service: "sink"}, + }, + }, + }, + expect: expect{ + idx: 2, + ids: []structs.ServiceName{ + {Name: "web", EnterpriseMeta: defaultMeta}, + }, + }, + }, + { + name: "chained route redirect", + entries: []structs.ConfigEntry{ + &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + &structs.ServiceRouterConfigEntry{ + Kind: structs.ServiceRouter, + Name: "source", + Routes: []structs.ServiceRoute{ + { + Match: &structs.ServiceRouteMatch{ + HTTP: &structs.ServiceRouteHTTPMatch{ + PathExact: "/route", + }, + }, + Destination: &structs.ServiceRouteDestination{ + Service: "routed", + }, + }, + }, + }, + &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "routed", + Redirect: &structs.ServiceResolverRedirect{ + Service: "sink", + }, + }, + }, + expect: expect{ + idx: 3, + ids: []structs.ServiceName{ + {Name: "source", EnterpriseMeta: defaultMeta}, + {Name: "routed", EnterpriseMeta: defaultMeta}, + }, + }, + }, + { + name: "kitchen sink with multiple services referencing sink directly", + entries: []structs.ConfigEntry{ + &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + &structs.ServiceRouterConfigEntry{ + Kind: structs.ServiceRouter, + Name: "routed", + Routes: []structs.ServiceRoute{ + { + Match: &structs.ServiceRouteMatch{ + HTTP: &structs.ServiceRouteHTTPMatch{ + PathExact: "/sink", + }, + }, + Destination: &structs.ServiceRouteDestination{ + Service: "sink", + }, + }, + }, + }, + &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "redirected", + Redirect: &structs.ServiceResolverRedirect{ + Service: "sink", + }, + }, + &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "failed-over", + Failover: map[string]structs.ServiceResolverFailover{ + "*": { + Service: "sink", + Datacenters: []string{"dc2", "dc3"}, + }, + }, + }, + &structs.ServiceSplitterConfigEntry{ + Kind: structs.ServiceSplitter, + Name: "split", + Splits: []structs.ServiceSplit{ + {Weight: 90, Service: "no-op"}, + {Weight: 10, Service: "sink"}, + }, + }, + &structs.ServiceSplitterConfigEntry{ + Kind: structs.ServiceSplitter, + Name: "unrelated", + Splits: []structs.ServiceSplit{ + {Weight: 90, Service: "zip"}, + {Weight: 10, Service: "zop"}, + }, + }, + }, + expect: expect{ + idx: 6, + ids: []structs.ServiceName{ + {Name: "split", EnterpriseMeta: defaultMeta}, + {Name: "failed-over", EnterpriseMeta: defaultMeta}, + {Name: "redirected", EnterpriseMeta: defaultMeta}, + {Name: "routed", EnterpriseMeta: defaultMeta}, + }, + }, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + s := testStateStore(t) + ws := memdb.NewWatchSet() + + ca := &structs.CAConfiguration{ + Provider: "consul", + } + err := s.CASetConfig(0, ca) + require.NoError(t, err) + + var i uint64 = 1 + for _, entry := range tc.entries { + require.NoError(t, entry.Normalize()) + require.NoError(t, s.EnsureConfigEntry(i, entry, nil)) + i++ + } + + tx := s.db.ReadTxn() + defer tx.Abort() + + idx, ids, err := s.sourcesForTarget(ws, tx, "dc1", "sink", nil) + require.NoError(t, err) + + require.Equal(t, tc.expect.idx, idx) + require.ElementsMatch(t, tc.expect.ids, ids) + }) + } +}