support querying upstreams/downstreams from registrations

pull/8772/head
freddygv 2020-09-27 12:38:32 -06:00
parent a86cf88a4a
commit b012d8374e
3 changed files with 853 additions and 22 deletions

View File

@ -12,11 +12,13 @@ import (
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
"github.com/mitchellh/copystructure"
) )
const ( const (
servicesTableName = "services" servicesTableName = "services"
gatewayServicesTableName = "gateway-services" gatewayServicesTableName = "gateway-services"
topologyTableName = "mesh-topology"
// serviceLastExtinctionIndexName keeps track of the last raft index when the last instance // serviceLastExtinctionIndexName keeps track of the last raft index when the last instance
// of any service was unregistered. This is used by blocking queries on missing services. // of any service was unregistered. This is used by blocking queries on missing services.
@ -103,6 +105,47 @@ func gatewayServicesTableNameSchema() *memdb.TableSchema {
} }
} }
// topologyTableNameSchema returns a new table schema used to store information
// relating upstream and downstream services
func topologyTableNameSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: topologyTableName,
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&ServiceNameIndex{
Field: "Upstream",
},
&ServiceNameIndex{
Field: "Downstream",
},
},
},
},
"upstream": {
Name: "upstream",
AllowMissing: true,
Unique: false,
Indexer: &ServiceNameIndex{
Field: "Upstream",
},
},
"downstream": {
Name: "downstream",
AllowMissing: false,
Unique: false,
Indexer: &ServiceNameIndex{
Field: "Downstream",
},
},
},
}
}
type ServiceNameIndex struct { type ServiceNameIndex struct {
Field string Field string
} }
@ -164,6 +207,7 @@ func init() {
registerSchema(servicesTableSchema) registerSchema(servicesTableSchema)
registerSchema(checksTableSchema) registerSchema(checksTableSchema)
registerSchema(gatewayServicesTableNameSchema) registerSchema(gatewayServicesTableNameSchema)
registerSchema(topologyTableNameSchema)
} }
const ( const (
@ -782,10 +826,16 @@ func ensureServiceTxn(tx *txn, idx uint64, node string, preserveIndexes bool, sv
} }
// Check if this service is covered by a gateway's wildcard specifier // Check if this service is covered by a gateway's wildcard specifier
err = checkGatewayWildcardsAndUpdate(tx, idx, svc) if err = checkGatewayWildcardsAndUpdate(tx, idx, svc); err != nil {
if err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err) return fmt.Errorf("failed updating gateway mapping: %s", err)
} }
// Update upstream/downstream mappings if it's a connect service
// TODO (freddy) What to do about Connect native services that don't define upstreams?
if svc.Kind == structs.ServiceKindConnectProxy {
if err = updateMeshTopology(tx, idx, node, svc, existing); err != nil {
return fmt.Errorf("failed updating upstream/downstream association")
}
}
// Create the service node entry and populate the indexes. Note that // Create the service node entry and populate the indexes. Note that
// conversion doesn't populate any of the node-specific information. // conversion doesn't populate any of the node-specific information.
@ -1485,9 +1535,14 @@ func (s *Store) deleteServiceTxn(tx *txn, idx uint64, nodeName, serviceID string
} }
svc := service.(*structs.ServiceNode) svc := service.(*structs.ServiceNode)
name := svc.CompoundServiceName()
if err := catalogUpdateServiceKindIndexes(tx, svc.ServiceKind, idx, &svc.EnterpriseMeta); err != nil { if err := catalogUpdateServiceKindIndexes(tx, svc.ServiceKind, idx, &svc.EnterpriseMeta); err != nil {
return err return err
} }
if err := cleanupMeshTopology(tx, idx, svc); err != nil {
return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err)
}
if _, remainingService, err := firstWatchWithTxn(tx, "services", "service", svc.ServiceName, entMeta); err == nil { if _, remainingService, err := firstWatchWithTxn(tx, "services", "service", svc.ServiceName, entMeta); err == nil {
if remainingService != nil { if remainingService != nil {
@ -1508,26 +1563,8 @@ func (s *Store) deleteServiceTxn(tx *txn, idx uint64, nodeName, serviceID string
if err := catalogUpdateServiceExtinctionIndex(tx, idx, entMeta); err != nil { if err := catalogUpdateServiceExtinctionIndex(tx, idx, entMeta); err != nil {
return err return err
} }
if err := cleanupGatewayWildcards(tx, idx, svc); err != nil {
// Clean up association between service name and gateways if needed return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err)
gateways, err := serviceGateways(tx, svc.ServiceName, &svc.EnterpriseMeta)
if err != nil {
return fmt.Errorf("failed gateway lookup for %q: %s", svc.ServiceName, err)
}
for mapping := gateways.Next(); mapping != nil; mapping = gateways.Next() {
if gs, ok := mapping.(*structs.GatewayService); ok && gs != nil {
// Only delete if association was created by a wildcard specifier.
// Otherwise the service was specified in the config entry, and the association should be maintained
// for when the service is re-registered
if gs.FromWildcard {
if err := tx.Delete(gatewayServicesTableName, gs); err != nil {
return fmt.Errorf("failed to truncate gateway services table: %v", err)
}
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
return fmt.Errorf("failed updating gateway-services index: %v", err)
}
}
}
} }
} }
} else { } else {
@ -2702,6 +2739,30 @@ func checkGatewayWildcardsAndUpdate(tx *txn, idx uint64, svc *structs.NodeServic
return nil return nil
} }
func cleanupGatewayWildcards(tx *txn, idx uint64, svc *structs.ServiceNode) error {
// Clean up association between service name and gateways if needed
gateways, err := serviceGateways(tx, svc.ServiceName, &svc.EnterpriseMeta)
if err != nil {
return fmt.Errorf("failed gateway lookup for %q: %s", svc.ServiceName, err)
}
for mapping := gateways.Next(); mapping != nil; mapping = gateways.Next() {
if gs, ok := mapping.(*structs.GatewayService); ok && gs != nil {
// Only delete if association was created by a wildcard specifier.
// Otherwise the service was specified in the config entry, and the association should be maintained
// for when the service is re-registered
if gs.FromWildcard {
if err := tx.Delete(gatewayServicesTableName, gs); err != nil {
return fmt.Errorf("failed to truncate gateway services table: %v", err)
}
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
return fmt.Errorf("failed updating gateway-services index: %v", err)
}
}
}
}
return nil
}
// serviceGateways returns all GatewayService entries with the given service name. This effectively looks up // serviceGateways returns all GatewayService entries with the given service name. This effectively looks up
// all the gateways mapped to this service. // all the gateways mapped to this service.
func serviceGateways(tx *txn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { func serviceGateways(tx *txn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
@ -2820,3 +2881,166 @@ func checkProtocolMatch(tx ReadTxn, ws memdb.WatchSet, svc *structs.GatewayServi
return idx, svc.Protocol == protocol, nil return idx, svc.Protocol == protocol, nil
} }
// upstreamsFromRegistration returns the ServiceNames of the upstreams defined across instances of the input
func upstreamsFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.ServiceName) (uint64, []structs.ServiceName, error) {
return linkedFromRegistration(ws, tx, sn, false)
}
// downstreamsFromRegistration returns the ServiceNames of downstream services based on registrations across instances of the input
func downstreamsFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.ServiceName) (uint64, []structs.ServiceName, error) {
return linkedFromRegistration(ws, tx, sn, true)
}
func linkedFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.ServiceName, downstreams bool) (uint64, []structs.ServiceName, error) {
// To fetch upstreams we query services that have the input listed as a downstream
// To fetch downstreams we query services that have the input listed as an upstream
index := "downstream"
if downstreams {
index = "upstream"
}
iter, err := tx.Get(topologyTableName, index, sn)
if err != nil {
return 0, nil, fmt.Errorf("%q lookup failed: %v", topologyTableName, err)
}
ws.Add(iter.WatchCh())
var (
idx uint64
resp []structs.ServiceName
)
for raw := iter.Next(); raw != nil; raw = iter.Next() {
entry := raw.(*structs.UpstreamDownstream)
if entry.ModifyIndex > idx {
idx = entry.ModifyIndex
}
linked := entry.Upstream
if downstreams {
linked = entry.Downstream
}
resp = append(resp, linked)
}
// TODO (freddy) This needs a tombstone to avoid the index sliding back on mapping deletion
// Using the table index here means that blocking queries will wake up more often than they should
tableIdx := maxIndexTxn(tx, topologyTableName)
if tableIdx > idx {
idx = tableIdx
}
return idx, resp, nil
}
// updateMeshTopology creates associations between the input service and its upstreams in the topology table
func updateMeshTopology(tx *txn, idx uint64, node string, svc *structs.NodeService, existing interface{}) error {
oldUpstreams := make(map[structs.ServiceName]bool)
if e, ok := existing.(*structs.ServiceNode); ok {
for _, u := range e.ServiceProxy.Upstreams {
upstreamMeta := structs.EnterpriseMetaInitializer(u.DestinationNamespace)
sn := structs.NewServiceName(u.DestinationName, &upstreamMeta)
oldUpstreams[sn] = true
}
}
// Despite the name "destination", this service name is downstream of the proxy
downstream := structs.NewServiceName(svc.Proxy.DestinationServiceName, &svc.EnterpriseMeta)
inserted := make(map[structs.ServiceName]bool)
for _, u := range svc.Proxy.Upstreams {
upstreamMeta := structs.EnterpriseMetaInitializer(u.DestinationNamespace)
upstream := structs.NewServiceName(u.DestinationName, &upstreamMeta)
obj, err := tx.First(topologyTableName, "id", upstream, downstream)
if err != nil {
return fmt.Errorf("%q lookup failed: %v", topologyTableName, err)
}
sid := svc.CompoundServiceID()
uid := structs.UniqueID(node, sid.String())
var mapping *structs.UpstreamDownstream
if existing, ok := obj.(*structs.UpstreamDownstream); ok {
rawCopy, err := copystructure.Copy(existing)
if err != nil {
return fmt.Errorf("failed to copy existing topology mapping: %v", err)
}
mapping, ok = rawCopy.(*structs.UpstreamDownstream)
if !ok {
return fmt.Errorf("unexpected topology type %T", rawCopy)
}
mapping.Refs[uid] = true
mapping.ModifyIndex = idx
inserted[upstream] = true
}
if mapping == nil {
mapping = &structs.UpstreamDownstream{
Upstream: upstream,
Downstream: downstream,
Refs: map[string]bool{uid: true},
RaftIndex: structs.RaftIndex{
CreateIndex: idx,
ModifyIndex: idx,
},
}
}
if err := tx.Insert(topologyTableName, mapping); err != nil {
return fmt.Errorf("failed inserting %s mapping: %s", topologyTableName, err)
}
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
}
inserted[upstream] = true
}
for u := range oldUpstreams {
if !inserted[u] {
if _, err := tx.DeleteAll(topologyTableName, "id", u, downstream); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
}
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
}
}
}
return nil
}
// cleanupMeshTopology removes a service from the mesh topology table
// This is only safe to call when there are no more known instances of this proxy
func cleanupMeshTopology(tx *txn, idx uint64, service *structs.ServiceNode) error {
if service.ServiceKind != structs.ServiceKindConnectProxy {
return nil
}
sn := structs.NewServiceName(service.ServiceProxy.DestinationServiceName, &service.EnterpriseMeta)
sid := service.CompoundServiceID()
uid := structs.UniqueID(service.Node, sid.String())
iter, err := tx.Get(topologyTableName, "downstream", sn)
if err != nil {
return fmt.Errorf("%q lookup failed: %v", topologyTableName, err)
}
for raw := iter.Next(); raw != nil; raw = iter.Next() {
entry := raw.(*structs.UpstreamDownstream)
rawCopy, err := copystructure.Copy(entry)
if err != nil {
return fmt.Errorf("failed to copy existing topology mapping: %v", err)
}
copy, ok := rawCopy.(*structs.UpstreamDownstream)
if !ok {
return fmt.Errorf("unexpected topology type %T", rawCopy)
}
delete(copy.Refs, uid)
if len(copy.Refs) == 0 {
if err := tx.Delete(topologyTableName, entry); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
}
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
}
}
}
return nil
}

View File

@ -6114,3 +6114,596 @@ func TestStateStore_DumpGatewayServices(t *testing.T) {
assert.Len(t, out, 0) assert.Len(t, out, 0)
}) })
} }
func TestCatalog_catalogDownstreams_Watches(t *testing.T) {
type expect struct {
idx uint64
names []structs.ServiceName
}
s := testStateStore(t)
require.NoError(t, s.EnsureNode(0, &structs.Node{
ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c",
Node: "foo",
}))
defaultMeta := structs.DefaultEnterpriseMeta()
admin := structs.NewServiceName("admin", defaultMeta)
cache := structs.NewServiceName("cache", defaultMeta)
// Watch should fire since the admin <-> web-proxy pairing was inserted into the topology table
ws := memdb.NewWatchSet()
tx := s.db.ReadTxn()
idx, names, err := downstreamsFromRegistration(ws, tx, admin)
require.NoError(t, err)
assert.Zero(t, idx)
assert.Len(t, names, 0)
svc := structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Address: "127.0.0.2",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
}
require.NoError(t, s.EnsureService(1, "foo", &svc))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = downstreamsFromRegistration(ws, tx, admin)
require.NoError(t, err)
exp := expect{
idx: 1,
names: []structs.ServiceName{
{Name: "web", EnterpriseMeta: *defaultMeta},
},
}
require.Equal(t, exp.idx, idx)
require.ElementsMatch(t, exp.names, names)
// Now replace the admin upstream to verify watch fires and mapping is removed
svc.Proxy.Upstreams = structs.Upstreams{
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "not-admin",
},
structs.Upstream{
DestinationName: "cache",
},
}
require.NoError(t, s.EnsureService(2, "foo", &svc))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = downstreamsFromRegistration(ws, tx, admin)
require.NoError(t, err)
exp = expect{
// Expect index where the upstream was replaced
idx: 2,
}
require.Equal(t, exp.idx, idx)
require.Empty(t, exp.names)
// Should still be able to get downstream for one of the other upstreams
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = downstreamsFromRegistration(ws, tx, cache)
require.NoError(t, err)
exp = expect{
idx: 2,
names: []structs.ServiceName{
{Name: "web", EnterpriseMeta: *defaultMeta},
},
}
require.Equal(t, exp.idx, idx)
require.ElementsMatch(t, exp.names, names)
// Now delete the web-proxy service and the result should be empty
require.NoError(t, s.DeleteService(3, "foo", "web-proxy", defaultMeta))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = downstreamsFromRegistration(ws, tx, cache)
require.NoError(t, err)
exp = expect{
// Expect deletion index
idx: 3,
}
require.Equal(t, exp.idx, idx)
require.Empty(t, exp.names)
}
func TestCatalog_catalogDownstreams(t *testing.T) {
defaultMeta := structs.DefaultEnterpriseMeta()
type expect struct {
idx uint64
names []structs.ServiceName
}
tt := []struct {
name string
services []*structs.NodeService
expect expect
}{
{
name: "single proxy with multiple upstreams",
services: []*structs.NodeService{
{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "127.0.0.1",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "cache",
},
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
},
},
expect: expect{
idx: 1,
names: []structs.ServiceName{
{Name: "api", EnterpriseMeta: *defaultMeta},
},
},
},
{
name: "multiple proxies with multiple upstreams",
services: []*structs.NodeService{
{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "127.0.0.1",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "cache",
},
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
},
{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Address: "127.0.0.2",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
},
},
expect: expect{
idx: 2,
names: []structs.ServiceName{
{Name: "api", EnterpriseMeta: *defaultMeta},
{Name: "web", EnterpriseMeta: *defaultMeta},
},
},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
s := testStateStore(t)
ws := memdb.NewWatchSet()
require.NoError(t, s.EnsureNode(0, &structs.Node{
ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c",
Node: "foo",
}))
var i uint64 = 1
for _, svc := range tc.services {
require.NoError(t, s.EnsureService(i, "foo", svc))
i++
}
tx := s.db.ReadTxn()
idx, names, err := downstreamsFromRegistration(ws, tx, structs.NewServiceName("admin", structs.DefaultEnterpriseMeta()))
require.NoError(t, err)
require.Equal(t, tc.expect.idx, idx)
require.ElementsMatch(t, tc.expect.names, names)
})
}
}
func TestCatalog_upstreamsFromRegistration(t *testing.T) {
defaultMeta := structs.DefaultEnterpriseMeta()
type expect struct {
idx uint64
names []structs.ServiceName
}
tt := []struct {
name string
services []*structs.NodeService
expect expect
}{
{
name: "single proxy with multiple upstreams",
services: []*structs.NodeService{
{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "127.0.0.1",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "cache",
},
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
},
},
expect: expect{
idx: 1,
names: []structs.ServiceName{
{Name: "cache", EnterpriseMeta: *defaultMeta},
{Name: "db", EnterpriseMeta: *defaultMeta},
{Name: "admin", EnterpriseMeta: *defaultMeta},
},
},
},
{
name: "multiple proxies with multiple upstreams",
services: []*structs.NodeService{
{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "127.0.0.1",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "cache",
},
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
},
{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy-2",
Service: "api-proxy",
Address: "127.0.0.2",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "cache",
},
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "new-admin",
},
},
},
EnterpriseMeta: *defaultMeta,
},
{
Kind: structs.ServiceKindConnectProxy,
ID: "different-api-proxy",
Service: "different-api-proxy",
Address: "127.0.0.4",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "elasticache",
},
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
},
{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Address: "127.0.0.3",
Port: 80,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "billing",
},
},
},
EnterpriseMeta: *defaultMeta,
},
},
expect: expect{
idx: 4,
names: []structs.ServiceName{
{Name: "cache", EnterpriseMeta: *defaultMeta},
{Name: "db", EnterpriseMeta: *defaultMeta},
{Name: "admin", EnterpriseMeta: *defaultMeta},
{Name: "new-admin", EnterpriseMeta: *defaultMeta},
{Name: "elasticache", EnterpriseMeta: *defaultMeta},
},
},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
s := testStateStore(t)
ws := memdb.NewWatchSet()
require.NoError(t, s.EnsureNode(0, &structs.Node{
ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c",
Node: "foo",
}))
var i uint64 = 1
for _, svc := range tc.services {
require.NoError(t, s.EnsureService(i, "foo", svc))
i++
}
tx := s.db.ReadTxn()
idx, names, err := upstreamsFromRegistration(ws, tx, structs.NewServiceName("api", structs.DefaultEnterpriseMeta()))
require.NoError(t, err)
require.Equal(t, tc.expect.idx, idx)
require.ElementsMatch(t, tc.expect.names, names)
})
}
}
func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) {
type expect struct {
idx uint64
names []structs.ServiceName
}
s := testStateStore(t)
require.NoError(t, s.EnsureNode(0, &structs.Node{
ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c",
Node: "foo",
}))
defaultMeta := structs.DefaultEnterpriseMeta()
web := structs.NewServiceName("web", defaultMeta)
ws := memdb.NewWatchSet()
tx := s.db.ReadTxn()
idx, names, err := upstreamsFromRegistration(ws, tx, web)
require.NoError(t, err)
assert.Zero(t, idx)
assert.Len(t, names, 0)
// Watch should fire since the admin <-> web pairing was inserted into the topology table
svc := structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Address: "127.0.0.2",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
}
require.NoError(t, s.EnsureService(1, "foo", &svc))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistration(ws, tx, web)
require.NoError(t, err)
exp := expect{
idx: 1,
names: []structs.ServiceName{
{Name: "db", EnterpriseMeta: *defaultMeta},
{Name: "admin", EnterpriseMeta: *defaultMeta},
},
}
require.Equal(t, exp.idx, idx)
require.ElementsMatch(t, exp.names, names)
// Now edit the upstreams list to verify watch fires and mapping is removed
svc.Proxy.Upstreams = structs.Upstreams{
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "not-admin",
},
}
require.NoError(t, s.EnsureService(2, "foo", &svc))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistration(ws, tx, web)
require.NoError(t, err)
exp = expect{
// Expect index where the upstream was replaced
idx: 2,
names: []structs.ServiceName{
{Name: "db", EnterpriseMeta: *defaultMeta},
{Name: "not-admin", EnterpriseMeta: *defaultMeta},
},
}
require.Equal(t, exp.idx, idx)
require.ElementsMatch(t, exp.names, names)
// Adding a new instance with distinct upstreams should result in a list that joins both
svc = structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy-2",
Service: "web-proxy",
Address: "127.0.0.3",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "also-not-admin",
},
structs.Upstream{
DestinationName: "cache",
},
},
},
EnterpriseMeta: *defaultMeta,
}
require.NoError(t, s.EnsureService(3, "foo", &svc))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistration(ws, tx, web)
require.NoError(t, err)
exp = expect{
idx: 3,
names: []structs.ServiceName{
{Name: "db", EnterpriseMeta: *defaultMeta},
{Name: "not-admin", EnterpriseMeta: *defaultMeta},
{Name: "also-not-admin", EnterpriseMeta: *defaultMeta},
{Name: "cache", EnterpriseMeta: *defaultMeta},
},
}
require.Equal(t, exp.idx, idx)
require.ElementsMatch(t, exp.names, names)
// Now delete the web-proxy service and the result should mirror the one of the remaining instance
require.NoError(t, s.DeleteService(4, "foo", "web-proxy", defaultMeta))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistration(ws, tx, web)
require.NoError(t, err)
exp = expect{
idx: 4,
names: []structs.ServiceName{
{Name: "db", EnterpriseMeta: *defaultMeta},
{Name: "also-not-admin", EnterpriseMeta: *defaultMeta},
{Name: "cache", EnterpriseMeta: *defaultMeta},
},
}
require.Equal(t, exp.idx, idx)
require.ElementsMatch(t, exp.names, names)
// Now delete the last web-proxy instance and the mappings should be cleared
require.NoError(t, s.DeleteService(5, "foo", "web-proxy-2", defaultMeta))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistration(ws, tx, web)
require.NoError(t, err)
exp = expect{
// Expect deletion index
idx: 5,
}
require.Equal(t, exp.idx, idx)
require.Empty(t, exp.names)
}

View File

@ -1031,6 +1031,12 @@ func (ns *NodeService) CompoundServiceName() ServiceName {
} }
} }
// UniqueID is a unique identifier for a service instance within a datacenter by encoding:
// node/namespace/service_id
func UniqueID(node string, compoundID string) string {
return fmt.Sprintf("%s/%s", node, compoundID)
}
// ServiceConnect are the shared Connect settings between all service // ServiceConnect are the shared Connect settings between all service
// definitions from the agent to the state store. // definitions from the agent to the state store.
type ServiceConnect struct { type ServiceConnect struct {
@ -2391,3 +2397,11 @@ func (r *KeyringResponses) Add(v interface{}) {
func (r *KeyringResponses) New() interface{} { func (r *KeyringResponses) New() interface{} {
return new(KeyringResponses) return new(KeyringResponses)
} }
type UpstreamDownstream struct {
Upstream ServiceName
Downstream ServiceName
Refs map[string]bool
RaftIndex
}