Browse Source

peering: replicate all SpiffeID values necessary for the importing side to do SAN validation (#13612)

When traversing an exported peered service, the discovery chain
evaluation at the other side may re-route the request to a variety of
endpoints. Furthermore we intend to terminate mTLS at the mesh gateway
for arriving peered traffic that is http-like (L7), so the caller needs
to know the mesh gateway's SpiffeID in that case as well.

The following new SpiffeID values will be shipped back in the peerstream
replication:

- tcp: all possible SpiffeIDs resulting from the service-resolver
        component of the exported discovery chain

- http-like: the SpiffeID of the mesh gateway
pull/13614/head
R.B. Boyer 2 years ago committed by GitHub
parent
commit
0fa828db76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      agent/consul/state/config_entry.go
  2. 94
      agent/consul/state/peering.go
  3. 136
      agent/consul/state/peering_test.go
  4. 6
      agent/rpc/peering/service.go
  5. 9
      agent/rpc/peering/service_test.go
  6. 33
      agent/rpc/peering/stream_test.go
  7. 2
      agent/rpc/peering/subscription_blocking.go
  8. 53
      agent/rpc/peering/subscription_manager.go
  9. 161
      agent/rpc/peering/subscription_manager_test.go
  10. 4
      agent/rpc/peering/subscription_state.go
  11. 5
      agent/structs/discovery_chain.go
  12. 46
      agent/structs/peering.go
  13. 11
      lib/maps/maps.go
  14. 34
      lib/maps/maps_test.go

29
agent/consul/state/config_entry.go

@ -506,20 +506,13 @@ var serviceGraphKinds = []string{
// discoveryChainTargets will return a list of services listed as a target for the input's discovery chain
func (s *Store) discoveryChainTargetsTxn(tx ReadTxn, ws memdb.WatchSet, dc, service string, entMeta *acl.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
source := structs.NewServiceName(service, entMeta)
req := discoverychain.CompileRequest{
ServiceName: source.Name,
EvaluateInNamespace: source.NamespaceOrDefault(),
EvaluateInPartition: source.PartitionOrDefault(),
EvaluateInDatacenter: dc,
}
idx, chain, _, err := s.serviceDiscoveryChainTxn(tx, ws, source.Name, entMeta, req)
idx, targets, err := s.discoveryChainOriginalTargetsTxn(tx, ws, dc, service, entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", source.String(), err)
return 0, nil, err
}
var resp []structs.ServiceName
for _, t := range chain.Targets {
for _, t := range targets {
em := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), t.Namespace)
target := structs.NewServiceName(t.Service, &em)
@ -531,6 +524,22 @@ func (s *Store) discoveryChainTargetsTxn(tx ReadTxn, ws memdb.WatchSet, dc, serv
return idx, resp, nil
}
func (s *Store) discoveryChainOriginalTargetsTxn(tx ReadTxn, ws memdb.WatchSet, dc, service string, entMeta *acl.EnterpriseMeta) (uint64, []*structs.DiscoveryTarget, error) {
source := structs.NewServiceName(service, entMeta)
req := discoverychain.CompileRequest{
ServiceName: source.Name,
EvaluateInNamespace: source.NamespaceOrDefault(),
EvaluateInPartition: source.PartitionOrDefault(),
EvaluateInDatacenter: dc,
}
idx, chain, _, err := s.serviceDiscoveryChainTxn(tx, ws, source.Name, entMeta, req)
if err != nil {
return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", source.String(), err)
}
return idx, maps.SliceOfValues(chain.Targets), nil
}
// discoveryChainSourcesTxn will return a list of services whose discovery chains have the given service as a target
func (s *Store) discoveryChainSourcesTxn(tx ReadTxn, ws memdb.WatchSet, dc string, destination structs.ServiceName) (uint64, []structs.ServiceName, error) {
seenLink := map[structs.ServiceName]bool{destination: true}

94
agent/consul/state/peering.go

@ -3,6 +3,7 @@ package state
import (
"errors"
"fmt"
"sort"
"strings"
"github.com/golang/protobuf/proto"
@ -309,7 +310,7 @@ func (s *Store) PeeringTerminateByID(idx uint64, id string) error {
// gateway's config entry, which we wouldn't want to replicate. How would
// client peers know to route through terminating gateways when they're not
// dialing through a remote mesh gateway?
func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error) {
func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string, dc string) (uint64, *structs.ExportedServiceList, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
@ -321,7 +322,7 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6
return 0, &structs.ExportedServiceList{}, nil
}
return s.exportedServicesForPeerTxn(ws, tx, peering)
return s.exportedServicesForPeerTxn(ws, tx, peering, dc)
}
func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) {
@ -335,7 +336,7 @@ func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl
out := make(map[string]structs.ServiceList)
for _, peering := range peerings {
idx, list, err := s.exportedServicesForPeerTxn(ws, tx, peering)
idx, list, err := s.exportedServicesForPeerTxn(ws, tx, peering, "")
if err != nil {
return 0, nil, fmt.Errorf("failed to list exported services for peer %q: %w", peering.ID, err)
}
@ -351,7 +352,11 @@ func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl
return maxIdx, out, nil
}
func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peering *pbpeering.Peering) (uint64, *structs.ExportedServiceList, error) {
// exportedServicesForPeerTxn will find all services that are exported to a
// specific peering, and optionally include information about discovery chain
// reachable targets for these exported services if the "dc" parameter is
// specified.
func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peering *pbpeering.Peering, dc string) (uint64, *structs.ExportedServiceList, error) {
maxIdx := peering.ModifyIndex
entMeta := structs.NodeEnterpriseMetaInPartition(peering.Partition)
@ -437,42 +442,63 @@ func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peerin
normal := maps.SliceOfKeys(normalSet)
disco := maps.SliceOfKeys(discoSet)
structs.ServiceList(normal).Sort()
structs.ServiceList(disco).Sort()
serviceProtocols := make(map[structs.ServiceName]string)
populateProtocol := func(svc structs.ServiceName) error {
if _, ok := serviceProtocols[svc]; ok {
chainInfo := make(map[structs.ServiceName]structs.ExportedDiscoveryChainInfo)
populateChainInfo := func(svc structs.ServiceName) error {
if _, ok := chainInfo[svc]; ok {
return nil // already processed
}
var info structs.ExportedDiscoveryChainInfo
idx, protocol, err := protocolForService(tx, ws, svc)
if err != nil {
return fmt.Errorf("failed to get protocol for service: %w", err)
return fmt.Errorf("failed to get protocol for service %q: %w", svc, err)
}
if idx > maxIdx {
maxIdx = idx
}
info.Protocol = protocol
if dc != "" && !structs.IsProtocolHTTPLike(protocol) {
// We only need to populate the targets for replication purposes for L4 protocols, which
// do not ultimately get intercepted by the mesh gateways.
idx, targets, err := s.discoveryChainOriginalTargetsTxn(tx, ws, dc, svc.Name, &svc.EnterpriseMeta)
if err != nil {
return fmt.Errorf("failed to get discovery chain targets for service %q: %w", svc, err)
}
if idx > maxIdx {
maxIdx = idx
}
sort.Slice(targets, func(i, j int) bool {
return targets[i].ID < targets[j].ID
})
info.TCPTargets = targets
}
serviceProtocols[svc] = protocol
chainInfo[svc] = info
return nil
}
for _, svc := range normal {
if err := populateProtocol(svc); err != nil {
if err := populateChainInfo(svc); err != nil {
return 0, nil, err
}
}
for _, svc := range disco {
if err := populateProtocol(svc); err != nil {
if err := populateChainInfo(svc); err != nil {
return 0, nil, err
}
}
structs.ServiceList(normal).Sort()
list := &structs.ExportedServiceList{
Services: normal,
DiscoChains: disco,
ConnectProtocol: serviceProtocols,
Services: normal,
DiscoChains: chainInfo,
}
return maxIdx, list, nil
@ -521,25 +547,44 @@ func peeringsForServiceTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, en
return maxIdx, peerings, nil
}
// TrustBundleListByService returns the trust bundles for all peers that the given service is exported to.
func (s *Store) TrustBundleListByService(ws memdb.WatchSet, service string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) {
// TrustBundleListByService returns the trust bundles for all peers that the
// given service is exported to, via a discovery chain target.
func (s *Store) TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
maxIdx, peers, err := peeringsForServiceTxn(tx, ws, service, entMeta)
realSvc := structs.NewServiceName(service, &entMeta)
maxIdx, chainNames, err := s.discoveryChainSourcesTxn(tx, ws, dc, realSvc)
if err != nil {
return 0, nil, fmt.Errorf("failed to get peers for service %s: %v", service, err)
return 0, nil, fmt.Errorf("failed to list all discovery chains referring to %q: %w", realSvc, err)
}
peerNames := make(map[string]struct{})
for _, chainSvc := range chainNames {
idx, peers, err := peeringsForServiceTxn(tx, ws, chainSvc.Name, chainSvc.EnterpriseMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get peers for service %s: %v", chainSvc, err)
}
if idx > maxIdx {
maxIdx = idx
}
for _, peer := range peers {
peerNames[peer.Name] = struct{}{}
}
}
peerNamesSlice := maps.SliceOfKeys(peerNames)
sort.Strings(peerNamesSlice)
var resp []*pbpeering.PeeringTrustBundle
for _, peer := range peers {
for _, peerName := range peerNamesSlice {
pq := Query{
Value: strings.ToLower(peer.Name),
Value: strings.ToLower(peerName),
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(entMeta.PartitionOrDefault()),
}
idx, trustBundle, err := peeringTrustBundleReadTxn(tx, ws, pq)
if err != nil {
return 0, nil, fmt.Errorf("failed to read trust bundle for peer %s: %v", peer.Name, err)
return 0, nil, fmt.Errorf("failed to read trust bundle for peer %s: %v", peerName, err)
}
if idx > maxIdx {
maxIdx = idx
@ -548,6 +593,7 @@ func (s *Store) TrustBundleListByService(ws memdb.WatchSet, service string, entM
resp = append(resp, trustBundle)
}
}
return maxIdx, resp, nil
}

136
agent/consul/state/peering_test.go

@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/prototest"
@ -674,6 +675,13 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
var lastIdx uint64
ca := &structs.CAConfiguration{
Provider: "consul",
ClusterID: connect.TestClusterID,
}
lastIdx++
require.NoError(t, s.CASetConfig(lastIdx, ca))
lastIdx++
require.NoError(t, s.PeeringWrite(lastIdx, &pbpeering.Peering{
ID: testUUID(),
@ -705,10 +713,18 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
require.NoError(t, s.EnsureConfigEntry(lastIdx, entry))
}
newTarget := func(service, serviceSubset, datacenter string) *structs.DiscoveryTarget {
t := structs.NewDiscoveryTarget(service, serviceSubset, "default", "default", datacenter)
t.SNI = connect.TargetSNI(t, connect.TestTrustDomain)
t.Name = t.SNI
t.ConnectTimeout = 5 * time.Second // default
return t
}
testutil.RunStep(t, "no exported services", func(t *testing.T) {
expect := &structs.ExportedServiceList{}
idx, got, err := s.ExportedServicesForPeer(ws, id)
idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1")
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.Equal(t, expect, got)
@ -754,13 +770,23 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
EnterpriseMeta: *defaultEntMeta,
},
},
ConnectProtocol: map[structs.ServiceName]string{
newSN("mysql"): "tcp",
newSN("redis"): "tcp",
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
newSN("mysql"): {
Protocol: "tcp",
TCPTargets: []*structs.DiscoveryTarget{
newTarget("mysql", "", "dc1"),
},
},
newSN("redis"): {
Protocol: "tcp",
TCPTargets: []*structs.DiscoveryTarget{
newTarget("redis", "", "dc1"),
},
},
},
}
idx, got, err := s.ExportedServicesForPeer(ws, id)
idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1")
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.Equal(t, expect, got)
@ -800,11 +826,16 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
EnterpriseMeta: *defaultEntMeta,
},
},
ConnectProtocol: map[structs.ServiceName]string{
newSN("billing"): "tcp",
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
newSN("billing"): {
Protocol: "tcp",
TCPTargets: []*structs.DiscoveryTarget{
newTarget("billing", "", "dc1"),
},
},
},
}
idx, got, err := s.ExportedServicesForPeer(ws, id)
idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1")
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.Equal(t, expect, got)
@ -869,29 +900,25 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
},
// NOTE: no payments-proxy here
},
DiscoChains: []structs.ServiceName{
{
Name: "resolver",
EnterpriseMeta: *defaultEntMeta,
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
newSN("billing"): {
Protocol: "http",
},
{
Name: "router",
EnterpriseMeta: *defaultEntMeta,
newSN("payments"): {
Protocol: "http",
},
{
Name: "splitter",
EnterpriseMeta: *defaultEntMeta,
newSN("resolver"): {
Protocol: "http",
},
newSN("router"): {
Protocol: "http",
},
newSN("splitter"): {
Protocol: "http",
},
},
ConnectProtocol: map[structs.ServiceName]string{
newSN("billing"): "http",
newSN("payments"): "http",
newSN("resolver"): "http",
newSN("router"): "http",
newSN("splitter"): "http",
},
}
idx, got, err := s.ExportedServicesForPeer(ws, id)
idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1")
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.Equal(t, expect, got)
@ -915,23 +942,19 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
},
// NOTE: no payments-proxy here
},
DiscoChains: []structs.ServiceName{
{
Name: "resolver",
EnterpriseMeta: *defaultEntMeta,
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
newSN("payments"): {
Protocol: "http",
},
{
Name: "router",
EnterpriseMeta: *defaultEntMeta,
newSN("resolver"): {
Protocol: "http",
},
newSN("router"): {
Protocol: "http",
},
},
ConnectProtocol: map[structs.ServiceName]string{
newSN("payments"): "http",
newSN("resolver"): "http",
newSN("router"): "http",
},
}
idx, got, err := s.ExportedServicesForPeer(ws, id)
idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1")
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.Equal(t, expect, got)
@ -941,7 +964,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
expect := &structs.ExportedServiceList{}
require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", defaultEntMeta))
idx, got, err := s.ExportedServicesForPeer(ws, id)
idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1")
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.Equal(t, expect, got)
@ -1218,15 +1241,22 @@ func TestStore_TrustBundleListByService(t *testing.T) {
entMeta := *acl.DefaultEnterpriseMeta()
var lastIdx uint64
ws := memdb.NewWatchSet()
ca := &structs.CAConfiguration{
Provider: "consul",
ClusterID: connect.TestClusterID,
}
lastIdx++
require.NoError(t, store.CASetConfig(lastIdx, ca))
var (
peerID1 = testUUID()
peerID2 = testUUID()
)
ws := memdb.NewWatchSet()
testutil.RunStep(t, "no results on initial setup", func(t *testing.T) {
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta)
idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.Len(t, resp, 0)
@ -1248,7 +1278,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
require.False(t, watchFired(ws))
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta)
idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err)
require.Len(t, resp, 0)
require.Equal(t, lastIdx-2, idx)
@ -1264,10 +1294,10 @@ func TestStore_TrustBundleListByService(t *testing.T) {
// The peering is only watched after the service is exported via config entry.
require.False(t, watchFired(ws))
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta)
idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err)
require.Equal(t, uint64(0), idx)
require.Len(t, resp, 0)
require.Equal(t, lastIdx-3, idx)
})
testutil.RunStep(t, "exporting the service does not yield trust bundles", func(t *testing.T) {
@ -1290,7 +1320,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
require.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta)
idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.Len(t, resp, 0)
@ -1307,7 +1337,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
require.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta)
idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.Len(t, resp, 1)
@ -1321,7 +1351,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
require.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta)
idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.Len(t, resp, 0)
@ -1346,7 +1376,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
require.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta)
idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.Len(t, resp, 1)
@ -1371,7 +1401,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
require.False(t, watchFired(ws))
ws = memdb.NewWatchSet()
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta)
idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err)
require.Equal(t, lastIdx-2, idx)
require.Len(t, resp, 1)
@ -1400,7 +1430,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
require.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta)
idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.Len(t, resp, 2)
@ -1419,7 +1449,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
require.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta)
idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.Len(t, resp, 1)
@ -1432,7 +1462,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
require.False(t, watchFired(ws))
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta)
idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err)
require.Equal(t, lastIdx-1, idx)
require.Len(t, resp, 1)

6
agent/rpc/peering/service.go

@ -130,12 +130,12 @@ type Store interface {
PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error)
ExportedServicesForPeer(ws memdb.WatchSet, peerID, dc string) (uint64, *structs.ExportedServiceList, error)
ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
NodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServices, error)
CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error)
TrustBundleListByService(ws memdb.WatchSet, service string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
AbandonCh() <-chan struct{}
}
@ -533,7 +533,7 @@ func (s *Service) TrustBundleListByService(ctx context.Context, req *pbpeering.T
switch {
case req.ServiceName != "":
idx, bundles, err = s.Backend.Store().TrustBundleListByService(nil, req.ServiceName, entMeta)
idx, bundles, err = s.Backend.Store().TrustBundleListByService(nil, req.ServiceName, s.config.Datacenter, entMeta)
case req.Kind == string(structs.ServiceKindMeshGateway):
idx, bundles, err = s.Backend.Store().PeeringTrustBundleList(nil, entMeta)
case req.Kind != "":

9
agent/rpc/peering/service_test.go

@ -490,8 +490,8 @@ func TestPeeringService_TrustBundleListByService(t *testing.T) {
resp, err := client.TrustBundleListByService(context.Background(), &req)
require.NoError(t, err)
require.Len(t, resp.Bundles, 2)
require.Equal(t, []string{"foo-root-1"}, resp.Bundles[0].RootPEMs)
require.Equal(t, []string{"bar-root-1"}, resp.Bundles[1].RootPEMs)
require.Equal(t, []string{"bar-root-1"}, resp.Bundles[0].RootPEMs)
require.Equal(t, []string{"foo-root-1"}, resp.Bundles[1].RootPEMs)
}
func Test_StreamHandler_UpsertServices(t *testing.T) {
@ -912,7 +912,10 @@ func newTestServer(t *testing.T, cb func(conf *consul.Config)) testingServer {
testrpc.WaitForLeader(t, server.RPC, conf.Datacenter)
backend := consul.NewPeeringBackend(server, deps.GRPCConnPool)
handler := &peering.Service{Backend: backend}
handler := peering.NewService(testutil.Logger(t), peering.Config{
Datacenter: "dc1",
ConnectEnabled: true,
}, backend)
grpcServer := gogrpc.NewServer()
pbpeering.RegisterPeeringServiceServer(grpcServer, handler)

33
agent/rpc/peering/stream_test.go

@ -655,6 +655,18 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
lastIdx++
require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service))
mongoSvcDefaults := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "mongo",
Protocol: "grpc",
}
require.NoError(t, mongoSvcDefaults.Normalize())
require.NoError(t, mongoSvcDefaults.Validate())
lastIdx++
require.NoError(t, store.EnsureConfigEntry(lastIdx, mongoSvcDefaults))
// NOTE: for this test we'll just live in a fantasy realm where we assume
// that mongo understands gRPC
var (
mongoSN = structs.NewServiceName("mongo", nil).String()
mongoProxySN = structs.NewServiceName("mongo-sidecar-proxy", nil).String()
@ -681,6 +693,8 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
},
},
}
require.NoError(t, entry.Normalize())
require.NoError(t, entry.Validate())
lastIdx++
require.NoError(t, store.EnsureConfigEntry(lastIdx, entry))
@ -744,8 +758,13 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes))
require.Len(t, nodes.Nodes, 1)
svid := "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo"
require.Equal(t, []string{svid}, nodes.Nodes[0].Service.Connect.PeerMeta.SpiffeID)
pm := nodes.Nodes[0].Service.Connect.PeerMeta
require.Equal(t, "grpc", pm.Protocol)
spiffeIDs := []string{
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo",
"spiffe://11111111-2222-3333-4444-555555555555.consul/gateway/mesh/dc/dc1",
}
require.Equal(t, spiffeIDs, pm.SpiffeID)
},
func(t *testing.T, msg *pbpeering.ReplicationMessage) {
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
@ -756,8 +775,12 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes))
require.Len(t, nodes.Nodes, 1)
svid := "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mysql"
require.Equal(t, []string{svid}, nodes.Nodes[0].Service.Connect.PeerMeta.SpiffeID)
pm := nodes.Nodes[0].Service.Connect.PeerMeta
require.Equal(t, "tcp", pm.Protocol)
spiffeIDs := []string{
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mysql",
}
require.Equal(t, spiffeIDs, pm.SpiffeID)
},
)
})
@ -800,6 +823,8 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
},
},
}
require.NoError(t, entry.Normalize())
require.NoError(t, entry.Validate())
lastIdx++
err := store.EnsureConfigEntry(lastIdx, entry)
require.NoError(t, err)

2
agent/rpc/peering/subscription_blocking.go

@ -23,7 +23,7 @@ func (m *subscriptionManager) notifyExportedServicesForPeerID(ctx context.Contex
// match the list of services exported to the peer.
m.syncViaBlockingQuery(ctx, "exported-services", func(ctx context.Context, store Store, ws memdb.WatchSet) (interface{}, error) {
// Get exported services for peer id
_, list, err := store.ExportedServicesForPeer(ws, peerID)
_, list, err := store.ExportedServicesForPeer(ws, peerID, m.config.Datacenter)
if err != nil {
return nil, fmt.Errorf("failed to watch exported services for peer %q: %w", peerID, err)
}

53
agent/rpc/peering/subscription_manager.go

@ -238,8 +238,8 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
if state.exportList != nil {
// Trigger public events for all synthetic discovery chain replies.
for chainName, protocol := range state.connectServices {
m.emitEventForDiscoveryChain(ctx, state, pending, chainName, protocol)
for chainName, info := range state.connectServices {
m.emitEventForDiscoveryChain(ctx, state, pending, chainName, info)
}
}
@ -456,17 +456,17 @@ func (m *subscriptionManager) syncDiscoveryChains(
ctx context.Context,
state *subscriptionState,
pending *pendingPayload,
chainsByName map[structs.ServiceName]string, // TODO(peering):rename variable
chainsByName map[structs.ServiceName]structs.ExportedDiscoveryChainInfo,
) {
// if it was newly added, then try to emit an UPDATE event
for chainName, protocol := range chainsByName {
if oldProtocol, ok := state.connectServices[chainName]; ok && protocol == oldProtocol {
for chainName, info := range chainsByName {
if oldInfo, ok := state.connectServices[chainName]; ok && info.Equal(oldInfo) {
continue
}
state.connectServices[chainName] = protocol
state.connectServices[chainName] = info
m.emitEventForDiscoveryChain(ctx, state, pending, chainName, protocol)
m.emitEventForDiscoveryChain(ctx, state, pending, chainName, info)
}
// if it was dropped, try to emit an DELETE event
@ -498,7 +498,7 @@ func (m *subscriptionManager) emitEventForDiscoveryChain(
state *subscriptionState,
pending *pendingPayload,
chainName structs.ServiceName,
protocol string,
info structs.ExportedDiscoveryChainInfo,
) {
if _, ok := state.connectServices[chainName]; !ok {
return // not found
@ -519,7 +519,7 @@ func (m *subscriptionManager) emitEventForDiscoveryChain(
m.config.Datacenter,
m.trustDomain,
chainName,
protocol,
info,
state.meshGateway,
),
)
@ -532,7 +532,7 @@ func createDiscoChainHealth(
peerName string,
datacenter, trustDomain string,
sn structs.ServiceName,
protocol string,
info structs.ExportedDiscoveryChainInfo,
pb *pbservice.IndexedCheckServiceNodes,
) *pbservice.IndexedCheckServiceNodes {
fakeProxyName := sn.Name + syntheticProxyNameSuffix
@ -546,6 +546,7 @@ func createDiscoChainHealth(
Datacenter: datacenter,
Service: sn.Name,
}
mainSpiffeIDString := spiffeID.URI().String()
sni := connect.PeeredServiceSNI(
sn.Name,
@ -559,9 +560,35 @@ func createDiscoChainHealth(
//
// TODO(peering): should this be replicated by service and not by instance?
peerMeta = &pbservice.PeeringServiceMeta{
SNI: []string{sni},
SpiffeID: []string{spiffeID.URI().String()},
Protocol: protocol,
SNI: []string{sni},
SpiffeID: []string{
mainSpiffeIDString,
},
Protocol: info.Protocol,
}
if structs.IsProtocolHTTPLike(info.Protocol) {
gwSpiffeID := connect.SpiffeIDMeshGateway{
Host: trustDomain,
Partition: sn.PartitionOrDefault(),
Datacenter: datacenter,
}
peerMeta.SpiffeID = append(peerMeta.SpiffeID, gwSpiffeID.URI().String())
} else {
for _, target := range info.TCPTargets {
targetSpiffeID := connect.SpiffeIDService{
Host: trustDomain,
Partition: target.Partition,
Namespace: target.Namespace,
Datacenter: target.Datacenter,
Service: target.Service,
}
targetSpiffeIDString := targetSpiffeID.URI().String()
if targetSpiffeIDString != mainSpiffeIDString {
peerMeta.SpiffeID = append(peerMeta.SpiffeID, targetSpiffeIDString)
}
}
}
}

161
agent/rpc/peering/subscription_manager_test.go

@ -281,6 +281,156 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
)
})
testutil.RunStep(t, "peer meta changes when L4 disco chain changes", func(t *testing.T) {
backend.ensureConfigEntry(t, &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "mysql",
Failover: map[string]structs.ServiceResolverFailover{
"*": {
Service: "failover",
Datacenters: []string{"dc2", "dc3"},
},
},
})
// ensure we get updated peer meta
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, mysqlProxyCorrID, got.CorrelationID)
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 1)
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("mgw", "10.1.1.1", partition),
Service: &pbservice.NodeService{
Kind: "connect-proxy",
ID: "mysql-sidecar-proxy-instance-0",
Service: "mysql-sidecar-proxy",
Port: 8443,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
Proxy: &pbservice.ConnectProxyConfig{
DestinationServiceID: "mysql-instance-0",
DestinationServiceName: "mysql",
},
Connect: &pbservice.ServiceConnect{
PeerMeta: &pbservice.PeeringServiceMeta{
SNI: []string{
"mysql.default.default.my-peering.external.11111111-2222-3333-4444-555555555555.consul",
},
SpiffeID: []string{
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mysql",
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc2/svc/failover",
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc3/svc/failover",
},
Protocol: "tcp",
},
},
},
}, res.Nodes[0])
},
)
// reset so the next subtest is valid
backend.deleteConfigEntry(t, structs.ServiceResolver, "mysql")
// ensure we get peer meta is restored
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, mysqlProxyCorrID, got.CorrelationID)
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 1)
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("mgw", "10.1.1.1", partition),
Service: &pbservice.NodeService{
Kind: "connect-proxy",
ID: "mysql-sidecar-proxy-instance-0",
Service: "mysql-sidecar-proxy",
Port: 8443,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
Proxy: &pbservice.ConnectProxyConfig{
DestinationServiceID: "mysql-instance-0",
DestinationServiceName: "mysql",
},
Connect: &pbservice.ServiceConnect{
PeerMeta: &pbservice.PeeringServiceMeta{
SNI: []string{
"mysql.default.default.my-peering.external.11111111-2222-3333-4444-555555555555.consul",
},
SpiffeID: []string{
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mysql",
},
Protocol: "tcp",
},
},
},
}, res.Nodes[0])
},
)
})
testutil.RunStep(t, "peer meta changes when protocol switches from L4 to L7", func(t *testing.T) {
// NOTE: for this test we'll just live in a fantasy realm where we assume
// that mysql understands gRPC
backend.ensureConfigEntry(t, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "mysql",
Protocol: "grpc",
})
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, mysqlProxyCorrID, got.CorrelationID)
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 1)
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("mgw", "10.1.1.1", partition),
Service: &pbservice.NodeService{
Kind: "connect-proxy",
ID: "mysql-sidecar-proxy-instance-0",
Service: "mysql-sidecar-proxy",
Port: 8443,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
Proxy: &pbservice.ConnectProxyConfig{
DestinationServiceID: "mysql-instance-0",
DestinationServiceName: "mysql",
},
Connect: &pbservice.ServiceConnect{
PeerMeta: &pbservice.PeeringServiceMeta{
SNI: []string{
"mysql.default.default.my-peering.external.11111111-2222-3333-4444-555555555555.consul",
},
SpiffeID: []string{
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mysql",
"spiffe://11111111-2222-3333-4444-555555555555.consul/gateway/mesh/dc/dc1",
},
Protocol: "grpc",
},
},
},
}, res.Nodes[0])
},
)
})
testutil.RunStep(t, "deregister the last instance and the output is empty", func(t *testing.T) {
backend.deleteService(t, "bar", mysql2.Service.ID)
@ -514,6 +664,11 @@ func newTestSubscriptionBackend(t *testing.T) *testSubscriptionBackend {
store: store,
}
backend.ensureCAConfig(t, &structs.CAConfiguration{
Provider: "consul",
ClusterID: connect.TestClusterID,
})
// Create some placeholder data to ensure raft index > 0
//
// TODO(peering): is there some extremely subtle max-index table reading bug in play?
@ -545,6 +700,12 @@ func (b *testSubscriptionBackend) ensureConfigEntry(t *testing.T, entry structs.
return b.lastIdx
}
func (b *testSubscriptionBackend) deleteConfigEntry(t *testing.T, kind, name string) uint64 {
b.lastIdx++
require.NoError(t, b.store.DeleteConfigEntry(b.lastIdx, kind, name, nil))
return b.lastIdx
}
func (b *testSubscriptionBackend) ensureNode(t *testing.T, node *structs.Node) uint64 {
b.lastIdx++
require.NoError(t, b.store.EnsureNode(b.lastIdx, node))

4
agent/rpc/peering/subscription_state.go

@ -26,7 +26,7 @@ type subscriptionState struct {
exportList *structs.ExportedServiceList
watchedServices map[structs.ServiceName]context.CancelFunc
connectServices map[structs.ServiceName]string // value:protocol
connectServices map[structs.ServiceName]structs.ExportedDiscoveryChainInfo
// eventVersions is a duplicate event suppression system keyed by the "id"
// not the "correlationID"
@ -48,7 +48,7 @@ func newSubscriptionState(peerName, partition string) *subscriptionState {
peerName: peerName,
partition: partition,
watchedServices: make(map[structs.ServiceName]context.CancelFunc),
connectServices: make(map[structs.ServiceName]string),
connectServices: make(map[structs.ServiceName]structs.ExportedDiscoveryChainInfo),
eventVersions: make(map[string]string),
}
}

5
agent/structs/discovery_chain.go

@ -6,7 +6,6 @@ import (
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/lib"
)
@ -290,3 +289,7 @@ func (t *DiscoveryTarget) String() string {
func (t *DiscoveryTarget) ServiceID() ServiceID {
return NewServiceID(t.Service, t.GetEnterpriseMetadata())
}
func (t *DiscoveryTarget) ServiceName() ServiceName {
return NewServiceName(t.Service, t.GetEnterpriseMetadata())
}

46
agent/structs/peering.go

@ -19,26 +19,54 @@ type ExportedServiceList struct {
// service discovery and service mesh.
Services []ServiceName
// DiscoChains is a list of exported service that ONLY apply to service mesh.
DiscoChains []ServiceName
// DiscoChains is a map of service names to their exported discovery chains
// for service mesh purposes as defined in the exported-services
// configuration entry.
DiscoChains map[ServiceName]ExportedDiscoveryChainInfo
}
// NOTE: this is not serialized via msgpack so it can be changed without concern.
type ExportedDiscoveryChainInfo struct {
// Protocol is the overall protocol associated with this discovery chain.
Protocol string
// TCPTargets is the list of discovery chain targets that are reachable by
// this discovery chain.
//
// NOTE: this is only populated if Protocol=tcp.
TCPTargets []*DiscoveryTarget
}
func (i ExportedDiscoveryChainInfo) Equal(o ExportedDiscoveryChainInfo) bool {
switch {
case i.Protocol != o.Protocol:
return false
case len(i.TCPTargets) != len(o.TCPTargets):
return false
}
for j := 0; j < len(i.TCPTargets); j++ {
if i.TCPTargets[j].ID != o.TCPTargets[j].ID {
return false
}
}
// TODO(peering): reduce duplication here in the response
ConnectProtocol map[ServiceName]string
return true
}
// ListAllDiscoveryChains returns all discovery chains (union of Services and
// DiscoChains).
func (list *ExportedServiceList) ListAllDiscoveryChains() map[ServiceName]string {
chainsByName := make(map[ServiceName]string)
func (list *ExportedServiceList) ListAllDiscoveryChains() map[ServiceName]ExportedDiscoveryChainInfo {
chainsByName := make(map[ServiceName]ExportedDiscoveryChainInfo)
if list == nil {
return chainsByName
}
for _, svc := range list.Services {
chainsByName[svc] = list.ConnectProtocol[svc]
chainsByName[svc] = list.DiscoChains[svc]
}
for _, chainName := range list.DiscoChains {
chainsByName[chainName] = list.ConnectProtocol[chainName]
for chainName, info := range list.DiscoChains {
chainsByName[chainName] = info
}
return chainsByName
}

11
lib/maps/maps.go

@ -10,3 +10,14 @@ func SliceOfKeys[K comparable, V any](m map[K]V) []K {
}
return res
}
func SliceOfValues[K comparable, V any](m map[K]V) []V {
if len(m) == 0 {
return nil
}
res := make([]V, 0, len(m))
for _, v := range m {
res = append(res, v)
}
return res
}

34
lib/maps/maps_test.go

@ -39,3 +39,37 @@ func TestSliceOfKeys(t *testing.T) {
require.ElementsMatch(t, []id{{Name: "foo"}, {Name: "bar"}}, SliceOfKeys(m))
})
}
func TestSliceOfValues(t *testing.T) {
t.Run("string to int", func(t *testing.T) {
m := make(map[string]int)
require.Equal(t, []int(nil), SliceOfValues(m))
m["foo"] = 5
m["bar"] = 6
require.ElementsMatch(t, []int{5, 6}, SliceOfValues(m))
})
type blah struct {
V string
}
t.Run("int to struct", func(t *testing.T) {
m := make(map[int]blah)
require.Equal(t, []blah(nil), SliceOfValues(m))
m[5] = blah{V: "foo"}
m[6] = blah{V: "bar"}
require.ElementsMatch(t, []blah{{V: "foo"}, {V: "bar"}}, SliceOfValues(m))
})
type id struct {
Name string
}
t.Run("struct to struct pointer", func(t *testing.T) {
m := make(map[id]*blah)
require.Equal(t, []*blah(nil), SliceOfValues(m))
m[id{Name: "foo"}] = &blah{V: "oof"}
m[id{Name: "bar"}] = &blah{V: "rab"}
require.ElementsMatch(t, []*blah{{V: "oof"}, {V: "rab"}}, SliceOfValues(m))
})
}

Loading…
Cancel
Save