diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index d68082ac0c..2f449a039c 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -12,11 +12,13 @@ import ( "github.com/hashicorp/consul/types" memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-uuid" + "github.com/mitchellh/copystructure" ) const ( servicesTableName = "services" gatewayServicesTableName = "gateway-services" + topologyTableName = "mesh-topology" // serviceLastExtinctionIndexName keeps track of the last raft index when the last instance // of any service was unregistered. This is used by blocking queries on missing services. @@ -103,6 +105,47 @@ func gatewayServicesTableNameSchema() *memdb.TableSchema { } } +// topologyTableNameSchema returns a new table schema used to store information +// relating upstream and downstream services +func topologyTableNameSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: topologyTableName, + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + AllowMissing: false, + Unique: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &ServiceNameIndex{ + Field: "Upstream", + }, + &ServiceNameIndex{ + Field: "Downstream", + }, + }, + }, + }, + "upstream": { + Name: "upstream", + AllowMissing: true, + Unique: false, + Indexer: &ServiceNameIndex{ + Field: "Upstream", + }, + }, + "downstream": { + Name: "downstream", + AllowMissing: false, + Unique: false, + Indexer: &ServiceNameIndex{ + Field: "Downstream", + }, + }, + }, + } +} + type ServiceNameIndex struct { Field string } @@ -164,6 +207,7 @@ func init() { registerSchema(servicesTableSchema) registerSchema(checksTableSchema) registerSchema(gatewayServicesTableNameSchema) + registerSchema(topologyTableNameSchema) } const ( @@ -782,10 +826,16 @@ func ensureServiceTxn(tx *txn, idx uint64, node string, preserveIndexes bool, sv } // Check if this service is covered by a gateway's wildcard specifier - err = checkGatewayWildcardsAndUpdate(tx, idx, svc) - if err != nil { + if err = checkGatewayWildcardsAndUpdate(tx, idx, svc); err != nil { return fmt.Errorf("failed updating gateway mapping: %s", err) } + // Update upstream/downstream mappings if it's a connect service + // TODO (freddy) What to do about Connect native services that don't define upstreams? + if svc.Kind == structs.ServiceKindConnectProxy { + if err = updateMeshTopology(tx, idx, node, svc, existing); err != nil { + return fmt.Errorf("failed updating upstream/downstream association") + } + } // Create the service node entry and populate the indexes. Note that // conversion doesn't populate any of the node-specific information. @@ -1485,9 +1535,14 @@ func (s *Store) deleteServiceTxn(tx *txn, idx uint64, nodeName, serviceID string } svc := service.(*structs.ServiceNode) + name := svc.CompoundServiceName() + if err := catalogUpdateServiceKindIndexes(tx, svc.ServiceKind, idx, &svc.EnterpriseMeta); err != nil { return err } + if err := cleanupMeshTopology(tx, idx, svc); err != nil { + return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err) + } if _, remainingService, err := firstWatchWithTxn(tx, "services", "service", svc.ServiceName, entMeta); err == nil { if remainingService != nil { @@ -1508,26 +1563,8 @@ func (s *Store) deleteServiceTxn(tx *txn, idx uint64, nodeName, serviceID string if err := catalogUpdateServiceExtinctionIndex(tx, idx, entMeta); err != nil { return err } - - // Clean up association between service name and gateways if needed - gateways, err := serviceGateways(tx, svc.ServiceName, &svc.EnterpriseMeta) - if err != nil { - return fmt.Errorf("failed gateway lookup for %q: %s", svc.ServiceName, err) - } - for mapping := gateways.Next(); mapping != nil; mapping = gateways.Next() { - if gs, ok := mapping.(*structs.GatewayService); ok && gs != nil { - // Only delete if association was created by a wildcard specifier. - // Otherwise the service was specified in the config entry, and the association should be maintained - // for when the service is re-registered - if gs.FromWildcard { - if err := tx.Delete(gatewayServicesTableName, gs); err != nil { - return fmt.Errorf("failed to truncate gateway services table: %v", err) - } - if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil { - return fmt.Errorf("failed updating gateway-services index: %v", err) - } - } - } + if err := cleanupGatewayWildcards(tx, idx, svc); err != nil { + return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err) } } } else { @@ -2702,6 +2739,30 @@ func checkGatewayWildcardsAndUpdate(tx *txn, idx uint64, svc *structs.NodeServic return nil } +func cleanupGatewayWildcards(tx *txn, idx uint64, svc *structs.ServiceNode) error { + // Clean up association between service name and gateways if needed + gateways, err := serviceGateways(tx, svc.ServiceName, &svc.EnterpriseMeta) + if err != nil { + return fmt.Errorf("failed gateway lookup for %q: %s", svc.ServiceName, err) + } + for mapping := gateways.Next(); mapping != nil; mapping = gateways.Next() { + if gs, ok := mapping.(*structs.GatewayService); ok && gs != nil { + // Only delete if association was created by a wildcard specifier. + // Otherwise the service was specified in the config entry, and the association should be maintained + // for when the service is re-registered + if gs.FromWildcard { + if err := tx.Delete(gatewayServicesTableName, gs); err != nil { + return fmt.Errorf("failed to truncate gateway services table: %v", err) + } + if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil { + return fmt.Errorf("failed updating gateway-services index: %v", err) + } + } + } + } + return nil +} + // serviceGateways returns all GatewayService entries with the given service name. This effectively looks up // all the gateways mapped to this service. func serviceGateways(tx *txn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { @@ -2820,3 +2881,166 @@ func checkProtocolMatch(tx ReadTxn, ws memdb.WatchSet, svc *structs.GatewayServi return idx, svc.Protocol == protocol, nil } + +// upstreamsFromRegistration returns the ServiceNames of the upstreams defined across instances of the input +func upstreamsFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.ServiceName) (uint64, []structs.ServiceName, error) { + return linkedFromRegistration(ws, tx, sn, false) +} + +// downstreamsFromRegistration returns the ServiceNames of downstream services based on registrations across instances of the input +func downstreamsFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.ServiceName) (uint64, []structs.ServiceName, error) { + return linkedFromRegistration(ws, tx, sn, true) +} + +func linkedFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.ServiceName, downstreams bool) (uint64, []structs.ServiceName, error) { + // To fetch upstreams we query services that have the input listed as a downstream + // To fetch downstreams we query services that have the input listed as an upstream + index := "downstream" + if downstreams { + index = "upstream" + } + + iter, err := tx.Get(topologyTableName, index, sn) + if err != nil { + return 0, nil, fmt.Errorf("%q lookup failed: %v", topologyTableName, err) + } + ws.Add(iter.WatchCh()) + + var ( + idx uint64 + resp []structs.ServiceName + ) + for raw := iter.Next(); raw != nil; raw = iter.Next() { + entry := raw.(*structs.UpstreamDownstream) + if entry.ModifyIndex > idx { + idx = entry.ModifyIndex + } + + linked := entry.Upstream + if downstreams { + linked = entry.Downstream + } + resp = append(resp, linked) + } + + // TODO (freddy) This needs a tombstone to avoid the index sliding back on mapping deletion + // Using the table index here means that blocking queries will wake up more often than they should + tableIdx := maxIndexTxn(tx, topologyTableName) + if tableIdx > idx { + idx = tableIdx + } + return idx, resp, nil +} + +// updateMeshTopology creates associations between the input service and its upstreams in the topology table +func updateMeshTopology(tx *txn, idx uint64, node string, svc *structs.NodeService, existing interface{}) error { + oldUpstreams := make(map[structs.ServiceName]bool) + if e, ok := existing.(*structs.ServiceNode); ok { + for _, u := range e.ServiceProxy.Upstreams { + upstreamMeta := structs.EnterpriseMetaInitializer(u.DestinationNamespace) + sn := structs.NewServiceName(u.DestinationName, &upstreamMeta) + + oldUpstreams[sn] = true + } + } + + // Despite the name "destination", this service name is downstream of the proxy + downstream := structs.NewServiceName(svc.Proxy.DestinationServiceName, &svc.EnterpriseMeta) + inserted := make(map[structs.ServiceName]bool) + for _, u := range svc.Proxy.Upstreams { + upstreamMeta := structs.EnterpriseMetaInitializer(u.DestinationNamespace) + upstream := structs.NewServiceName(u.DestinationName, &upstreamMeta) + + obj, err := tx.First(topologyTableName, "id", upstream, downstream) + if err != nil { + return fmt.Errorf("%q lookup failed: %v", topologyTableName, err) + } + sid := svc.CompoundServiceID() + uid := structs.UniqueID(node, sid.String()) + + var mapping *structs.UpstreamDownstream + if existing, ok := obj.(*structs.UpstreamDownstream); ok { + rawCopy, err := copystructure.Copy(existing) + if err != nil { + return fmt.Errorf("failed to copy existing topology mapping: %v", err) + } + mapping, ok = rawCopy.(*structs.UpstreamDownstream) + if !ok { + return fmt.Errorf("unexpected topology type %T", rawCopy) + } + mapping.Refs[uid] = true + mapping.ModifyIndex = idx + + inserted[upstream] = true + } + if mapping == nil { + mapping = &structs.UpstreamDownstream{ + Upstream: upstream, + Downstream: downstream, + Refs: map[string]bool{uid: true}, + RaftIndex: structs.RaftIndex{ + CreateIndex: idx, + ModifyIndex: idx, + }, + } + } + if err := tx.Insert(topologyTableName, mapping); err != nil { + return fmt.Errorf("failed inserting %s mapping: %s", topologyTableName, err) + } + if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil { + return fmt.Errorf("failed updating %s index: %v", topologyTableName, err) + } + inserted[upstream] = true + } + + for u := range oldUpstreams { + if !inserted[u] { + if _, err := tx.DeleteAll(topologyTableName, "id", u, downstream); err != nil { + return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err) + } + if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil { + return fmt.Errorf("failed updating %s index: %v", topologyTableName, err) + } + } + } + return nil +} + +// cleanupMeshTopology removes a service from the mesh topology table +// This is only safe to call when there are no more known instances of this proxy +func cleanupMeshTopology(tx *txn, idx uint64, service *structs.ServiceNode) error { + if service.ServiceKind != structs.ServiceKindConnectProxy { + return nil + } + sn := structs.NewServiceName(service.ServiceProxy.DestinationServiceName, &service.EnterpriseMeta) + + sid := service.CompoundServiceID() + uid := structs.UniqueID(service.Node, sid.String()) + + iter, err := tx.Get(topologyTableName, "downstream", sn) + if err != nil { + return fmt.Errorf("%q lookup failed: %v", topologyTableName, err) + } + for raw := iter.Next(); raw != nil; raw = iter.Next() { + entry := raw.(*structs.UpstreamDownstream) + rawCopy, err := copystructure.Copy(entry) + if err != nil { + return fmt.Errorf("failed to copy existing topology mapping: %v", err) + } + copy, ok := rawCopy.(*structs.UpstreamDownstream) + if !ok { + return fmt.Errorf("unexpected topology type %T", rawCopy) + } + delete(copy.Refs, uid) + + if len(copy.Refs) == 0 { + if err := tx.Delete(topologyTableName, entry); err != nil { + return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err) + } + if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil { + return fmt.Errorf("failed updating %s index: %v", topologyTableName, err) + } + } + } + return nil +} diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index ca7340adde..cf16c98f4f 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -6114,3 +6114,596 @@ func TestStateStore_DumpGatewayServices(t *testing.T) { assert.Len(t, out, 0) }) } + +func TestCatalog_catalogDownstreams_Watches(t *testing.T) { + type expect struct { + idx uint64 + names []structs.ServiceName + } + + s := testStateStore(t) + + require.NoError(t, s.EnsureNode(0, &structs.Node{ + ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c", + Node: "foo", + })) + + defaultMeta := structs.DefaultEnterpriseMeta() + + admin := structs.NewServiceName("admin", defaultMeta) + cache := structs.NewServiceName("cache", defaultMeta) + + // Watch should fire since the admin <-> web-proxy pairing was inserted into the topology table + ws := memdb.NewWatchSet() + tx := s.db.ReadTxn() + idx, names, err := downstreamsFromRegistration(ws, tx, admin) + require.NoError(t, err) + assert.Zero(t, idx) + assert.Len(t, names, 0) + + svc := structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "web-proxy", + Service: "web-proxy", + Address: "127.0.0.2", + Port: 443, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "web", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationName: "db", + }, + structs.Upstream{ + DestinationName: "admin", + }, + }, + }, + EnterpriseMeta: *defaultMeta, + } + require.NoError(t, s.EnsureService(1, "foo", &svc)) + assert.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + tx = s.db.ReadTxn() + idx, names, err = downstreamsFromRegistration(ws, tx, admin) + require.NoError(t, err) + + exp := expect{ + idx: 1, + names: []structs.ServiceName{ + {Name: "web", EnterpriseMeta: *defaultMeta}, + }, + } + require.Equal(t, exp.idx, idx) + require.ElementsMatch(t, exp.names, names) + + // Now replace the admin upstream to verify watch fires and mapping is removed + svc.Proxy.Upstreams = structs.Upstreams{ + structs.Upstream{ + DestinationName: "db", + }, + structs.Upstream{ + DestinationName: "not-admin", + }, + structs.Upstream{ + DestinationName: "cache", + }, + } + require.NoError(t, s.EnsureService(2, "foo", &svc)) + assert.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + tx = s.db.ReadTxn() + idx, names, err = downstreamsFromRegistration(ws, tx, admin) + require.NoError(t, err) + + exp = expect{ + // Expect index where the upstream was replaced + idx: 2, + } + require.Equal(t, exp.idx, idx) + require.Empty(t, exp.names) + + // Should still be able to get downstream for one of the other upstreams + ws = memdb.NewWatchSet() + tx = s.db.ReadTxn() + idx, names, err = downstreamsFromRegistration(ws, tx, cache) + require.NoError(t, err) + + exp = expect{ + idx: 2, + names: []structs.ServiceName{ + {Name: "web", EnterpriseMeta: *defaultMeta}, + }, + } + require.Equal(t, exp.idx, idx) + require.ElementsMatch(t, exp.names, names) + + // Now delete the web-proxy service and the result should be empty + require.NoError(t, s.DeleteService(3, "foo", "web-proxy", defaultMeta)) + assert.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + tx = s.db.ReadTxn() + idx, names, err = downstreamsFromRegistration(ws, tx, cache) + + require.NoError(t, err) + + exp = expect{ + // Expect deletion index + idx: 3, + } + require.Equal(t, exp.idx, idx) + require.Empty(t, exp.names) +} + +func TestCatalog_catalogDownstreams(t *testing.T) { + defaultMeta := structs.DefaultEnterpriseMeta() + + type expect struct { + idx uint64 + names []structs.ServiceName + } + tt := []struct { + name string + services []*structs.NodeService + expect expect + }{ + { + name: "single proxy with multiple upstreams", + services: []*structs.NodeService{ + { + Kind: structs.ServiceKindConnectProxy, + ID: "api-proxy", + Service: "api-proxy", + Address: "127.0.0.1", + Port: 443, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationName: "cache", + }, + structs.Upstream{ + DestinationName: "db", + }, + structs.Upstream{ + DestinationName: "admin", + }, + }, + }, + EnterpriseMeta: *defaultMeta, + }, + }, + expect: expect{ + idx: 1, + names: []structs.ServiceName{ + {Name: "api", EnterpriseMeta: *defaultMeta}, + }, + }, + }, + { + name: "multiple proxies with multiple upstreams", + services: []*structs.NodeService{ + { + Kind: structs.ServiceKindConnectProxy, + ID: "api-proxy", + Service: "api-proxy", + Address: "127.0.0.1", + Port: 443, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationName: "cache", + }, + structs.Upstream{ + DestinationName: "db", + }, + structs.Upstream{ + DestinationName: "admin", + }, + }, + }, + EnterpriseMeta: *defaultMeta, + }, + { + Kind: structs.ServiceKindConnectProxy, + ID: "web-proxy", + Service: "web-proxy", + Address: "127.0.0.2", + Port: 443, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "web", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationName: "db", + }, + structs.Upstream{ + DestinationName: "admin", + }, + }, + }, + EnterpriseMeta: *defaultMeta, + }, + }, + expect: expect{ + idx: 2, + names: []structs.ServiceName{ + {Name: "api", EnterpriseMeta: *defaultMeta}, + {Name: "web", EnterpriseMeta: *defaultMeta}, + }, + }, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + s := testStateStore(t) + ws := memdb.NewWatchSet() + + require.NoError(t, s.EnsureNode(0, &structs.Node{ + ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c", + Node: "foo", + })) + + var i uint64 = 1 + for _, svc := range tc.services { + require.NoError(t, s.EnsureService(i, "foo", svc)) + i++ + } + + tx := s.db.ReadTxn() + idx, names, err := downstreamsFromRegistration(ws, tx, structs.NewServiceName("admin", structs.DefaultEnterpriseMeta())) + require.NoError(t, err) + + require.Equal(t, tc.expect.idx, idx) + require.ElementsMatch(t, tc.expect.names, names) + }) + } +} + +func TestCatalog_upstreamsFromRegistration(t *testing.T) { + defaultMeta := structs.DefaultEnterpriseMeta() + + type expect struct { + idx uint64 + names []structs.ServiceName + } + tt := []struct { + name string + services []*structs.NodeService + expect expect + }{ + { + name: "single proxy with multiple upstreams", + services: []*structs.NodeService{ + { + Kind: structs.ServiceKindConnectProxy, + ID: "api-proxy", + Service: "api-proxy", + Address: "127.0.0.1", + Port: 443, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationName: "cache", + }, + structs.Upstream{ + DestinationName: "db", + }, + structs.Upstream{ + DestinationName: "admin", + }, + }, + }, + EnterpriseMeta: *defaultMeta, + }, + }, + expect: expect{ + idx: 1, + names: []structs.ServiceName{ + {Name: "cache", EnterpriseMeta: *defaultMeta}, + {Name: "db", EnterpriseMeta: *defaultMeta}, + {Name: "admin", EnterpriseMeta: *defaultMeta}, + }, + }, + }, + { + name: "multiple proxies with multiple upstreams", + services: []*structs.NodeService{ + { + Kind: structs.ServiceKindConnectProxy, + ID: "api-proxy", + Service: "api-proxy", + Address: "127.0.0.1", + Port: 443, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationName: "cache", + }, + structs.Upstream{ + DestinationName: "db", + }, + structs.Upstream{ + DestinationName: "admin", + }, + }, + }, + EnterpriseMeta: *defaultMeta, + }, + { + Kind: structs.ServiceKindConnectProxy, + ID: "api-proxy-2", + Service: "api-proxy", + Address: "127.0.0.2", + Port: 443, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationName: "cache", + }, + structs.Upstream{ + DestinationName: "db", + }, + structs.Upstream{ + DestinationName: "new-admin", + }, + }, + }, + EnterpriseMeta: *defaultMeta, + }, + { + Kind: structs.ServiceKindConnectProxy, + ID: "different-api-proxy", + Service: "different-api-proxy", + Address: "127.0.0.4", + Port: 443, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationName: "elasticache", + }, + structs.Upstream{ + DestinationName: "db", + }, + structs.Upstream{ + DestinationName: "admin", + }, + }, + }, + EnterpriseMeta: *defaultMeta, + }, + { + Kind: structs.ServiceKindConnectProxy, + ID: "web-proxy", + Service: "web-proxy", + Address: "127.0.0.3", + Port: 80, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "web", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationName: "db", + }, + structs.Upstream{ + DestinationName: "billing", + }, + }, + }, + EnterpriseMeta: *defaultMeta, + }, + }, + expect: expect{ + idx: 4, + names: []structs.ServiceName{ + {Name: "cache", EnterpriseMeta: *defaultMeta}, + {Name: "db", EnterpriseMeta: *defaultMeta}, + {Name: "admin", EnterpriseMeta: *defaultMeta}, + {Name: "new-admin", EnterpriseMeta: *defaultMeta}, + {Name: "elasticache", EnterpriseMeta: *defaultMeta}, + }, + }, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + s := testStateStore(t) + ws := memdb.NewWatchSet() + + require.NoError(t, s.EnsureNode(0, &structs.Node{ + ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c", + Node: "foo", + })) + + var i uint64 = 1 + for _, svc := range tc.services { + require.NoError(t, s.EnsureService(i, "foo", svc)) + i++ + } + + tx := s.db.ReadTxn() + idx, names, err := upstreamsFromRegistration(ws, tx, structs.NewServiceName("api", structs.DefaultEnterpriseMeta())) + require.NoError(t, err) + + require.Equal(t, tc.expect.idx, idx) + require.ElementsMatch(t, tc.expect.names, names) + }) + } +} + +func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) { + type expect struct { + idx uint64 + names []structs.ServiceName + } + + s := testStateStore(t) + + require.NoError(t, s.EnsureNode(0, &structs.Node{ + ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c", + Node: "foo", + })) + + defaultMeta := structs.DefaultEnterpriseMeta() + web := structs.NewServiceName("web", defaultMeta) + + ws := memdb.NewWatchSet() + tx := s.db.ReadTxn() + idx, names, err := upstreamsFromRegistration(ws, tx, web) + require.NoError(t, err) + assert.Zero(t, idx) + assert.Len(t, names, 0) + + // Watch should fire since the admin <-> web pairing was inserted into the topology table + svc := structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "web-proxy", + Service: "web-proxy", + Address: "127.0.0.2", + Port: 443, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "web", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationName: "db", + }, + structs.Upstream{ + DestinationName: "admin", + }, + }, + }, + EnterpriseMeta: *defaultMeta, + } + require.NoError(t, s.EnsureService(1, "foo", &svc)) + assert.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + tx = s.db.ReadTxn() + idx, names, err = upstreamsFromRegistration(ws, tx, web) + require.NoError(t, err) + + exp := expect{ + idx: 1, + names: []structs.ServiceName{ + {Name: "db", EnterpriseMeta: *defaultMeta}, + {Name: "admin", EnterpriseMeta: *defaultMeta}, + }, + } + require.Equal(t, exp.idx, idx) + require.ElementsMatch(t, exp.names, names) + + // Now edit the upstreams list to verify watch fires and mapping is removed + svc.Proxy.Upstreams = structs.Upstreams{ + structs.Upstream{ + DestinationName: "db", + }, + structs.Upstream{ + DestinationName: "not-admin", + }, + } + require.NoError(t, s.EnsureService(2, "foo", &svc)) + assert.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + tx = s.db.ReadTxn() + idx, names, err = upstreamsFromRegistration(ws, tx, web) + require.NoError(t, err) + + exp = expect{ + // Expect index where the upstream was replaced + idx: 2, + names: []structs.ServiceName{ + {Name: "db", EnterpriseMeta: *defaultMeta}, + {Name: "not-admin", EnterpriseMeta: *defaultMeta}, + }, + } + require.Equal(t, exp.idx, idx) + require.ElementsMatch(t, exp.names, names) + + // Adding a new instance with distinct upstreams should result in a list that joins both + svc = structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "web-proxy-2", + Service: "web-proxy", + Address: "127.0.0.3", + Port: 443, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "web", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationName: "db", + }, + structs.Upstream{ + DestinationName: "also-not-admin", + }, + structs.Upstream{ + DestinationName: "cache", + }, + }, + }, + EnterpriseMeta: *defaultMeta, + } + require.NoError(t, s.EnsureService(3, "foo", &svc)) + assert.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + tx = s.db.ReadTxn() + idx, names, err = upstreamsFromRegistration(ws, tx, web) + require.NoError(t, err) + + exp = expect{ + idx: 3, + names: []structs.ServiceName{ + {Name: "db", EnterpriseMeta: *defaultMeta}, + {Name: "not-admin", EnterpriseMeta: *defaultMeta}, + {Name: "also-not-admin", EnterpriseMeta: *defaultMeta}, + {Name: "cache", EnterpriseMeta: *defaultMeta}, + }, + } + require.Equal(t, exp.idx, idx) + require.ElementsMatch(t, exp.names, names) + + // Now delete the web-proxy service and the result should mirror the one of the remaining instance + require.NoError(t, s.DeleteService(4, "foo", "web-proxy", defaultMeta)) + assert.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + tx = s.db.ReadTxn() + idx, names, err = upstreamsFromRegistration(ws, tx, web) + require.NoError(t, err) + + exp = expect{ + idx: 4, + names: []structs.ServiceName{ + {Name: "db", EnterpriseMeta: *defaultMeta}, + {Name: "also-not-admin", EnterpriseMeta: *defaultMeta}, + {Name: "cache", EnterpriseMeta: *defaultMeta}, + }, + } + require.Equal(t, exp.idx, idx) + require.ElementsMatch(t, exp.names, names) + + // Now delete the last web-proxy instance and the mappings should be cleared + require.NoError(t, s.DeleteService(5, "foo", "web-proxy-2", defaultMeta)) + assert.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + tx = s.db.ReadTxn() + idx, names, err = upstreamsFromRegistration(ws, tx, web) + + require.NoError(t, err) + + exp = expect{ + // Expect deletion index + idx: 5, + } + require.Equal(t, exp.idx, idx) + require.Empty(t, exp.names) +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 907d4ef95e..b42136049b 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -1031,6 +1031,12 @@ func (ns *NodeService) CompoundServiceName() ServiceName { } } +// UniqueID is a unique identifier for a service instance within a datacenter by encoding: +// node/namespace/service_id +func UniqueID(node string, compoundID string) string { + return fmt.Sprintf("%s/%s", node, compoundID) +} + // ServiceConnect are the shared Connect settings between all service // definitions from the agent to the state store. type ServiceConnect struct { @@ -2391,3 +2397,11 @@ func (r *KeyringResponses) Add(v interface{}) { func (r *KeyringResponses) New() interface{} { return new(KeyringResponses) } + +type UpstreamDownstream struct { + Upstream ServiceName + Downstream ServiceName + Refs map[string]bool + + RaftIndex +}