Browse Source

peering: replicate discovery chains information to importing peers

Treat each exported service as a "discovery chain" and replicate one
synthetic CheckServiceNode for each chain and remote mesh gateway.

The health will be a flattened generated check of the checks for that
mesh gateway node.
pull/13150/head
R.B. Boyer 3 years ago
parent
commit
3e4a522882
  1. 31
      agent/consul/state/config_entry.go
  2. 80
      agent/consul/state/peering.go
  3. 202
      agent/consul/state/peering_test.go
  4. 8
      agent/rpc/peering/service.go
  5. 9
      agent/rpc/peering/service_test.go
  6. 108
      agent/rpc/peering/subscription_blocking.go
  7. 414
      agent/rpc/peering/subscription_manager.go
  8. 646
      agent/rpc/peering/subscription_manager_test.go
  9. 165
      agent/rpc/peering/subscription_state.go
  10. 200
      agent/rpc/peering/subscription_state_test.go
  11. 18
      agent/rpc/peering/subscription_view.go
  12. 105
      agent/rpc/peering/subscription_view_test.go
  13. 27
      agent/structs/peering.go
  14. 12
      lib/maps/maps.go
  15. 41
      lib/maps/maps_test.go

31
agent/consul/state/config_entry.go

@ -12,6 +12,7 @@ import (
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/maps"
)
type ConfigEntryLinkIndex struct {
@ -137,6 +138,36 @@ func (s *Store) ConfigEntriesByKind(ws memdb.WatchSet, kind string, entMeta *acl
return configEntriesByKindTxn(tx, ws, kind, entMeta)
}
// listDiscoveryChainNamesTxn returns the sorted, de-duplicated set of service
// names that have at least one service-router, service-splitter, or
// service-resolver config entry within entMeta.
//
// The returned index is whatever maxIndexWatchTxn reports for the config
// entries table. Each kind's iterator watch channel is added to ws so callers
// are woken when the underlying entries change.
func listDiscoveryChainNamesTxn(tx ReadTxn, ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
	// Get the index and watch for updates
	idx := maxIndexWatchTxn(tx, ws, tableConfigEntries)

	// List all discovery chain top nodes. A name may appear under several
	// kinds, so collect into a set first.
	seen := make(map[structs.ServiceName]struct{})
	for _, kind := range []string{
		structs.ServiceRouter,
		structs.ServiceSplitter,
		structs.ServiceResolver,
	} {
		iter, err := getConfigEntryKindsWithTxn(tx, kind, &entMeta)
		if err != nil {
			// Wrap (rather than stringify) so callers can still inspect the
			// underlying error with errors.Is / errors.As.
			return 0, nil, fmt.Errorf("failed config entry lookup: %w", err)
		}
		ws.Add(iter.WatchCh())

		for v := iter.Next(); v != nil; v = iter.Next() {
			entry := v.(structs.ConfigEntry)
			sn := structs.NewServiceName(entry.GetName(), entry.GetEnterpriseMeta())
			seen[sn] = struct{}{}
		}
	}

	// Map iteration order is random; sort so the output is deterministic.
	results := maps.SliceOfKeys(seen)
	structs.ServiceList(results).Sort()
	return idx, results, nil
}
func configEntriesByKindTxn(tx ReadTxn, ws memdb.WatchSet, kind string, entMeta *acl.EnterpriseMeta) (uint64, []structs.ConfigEntry, error) {
// Get the index and watch for updates
idx := maxIndexWatchTxn(tx, ws, tableConfigEntries)

80
agent/consul/state/peering.go

@ -9,6 +9,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/maps"
"github.com/hashicorp/consul/proto/pbpeering"
)
@ -140,7 +141,10 @@ func peeringReadTxn(tx ReadTxn, ws memdb.WatchSet, q Query) (uint64, *pbpeering.
// PeeringList opens a read transaction on the store and returns whatever
// peeringListTxn produces for the given watch set and enterprise meta. The
// transaction is aborted when the call returns.
func (s *Store) PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
// The listing logic lives in peeringListTxn so it can run inside a caller-supplied transaction.
return s.peeringListTxn(ws, tx, entMeta)
}
func (s *Store) peeringListTxn(ws memdb.WatchSet, tx ReadTxn, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) {
var (
iter memdb.ResultIterator
err error
@ -281,12 +285,16 @@ func (s *Store) PeeringTerminateByID(idx uint64, id string) error {
return tx.Commit()
}
// ExportedServicesForPeer returns the list of typical and proxy services exported to a peer.
// TODO(peering): What to do about terminating gateways? Sometimes terminating gateways are the appropriate destination
// to dial for an upstream mesh service. However, that information is handled by observing the terminating gateway's
// config entry, which we wouldn't want to replicate. How would client peers know to route through terminating gateways
// when they're not dialing through a remote mesh gateway?
func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, []structs.ServiceName, error) {
// ExportedServicesForPeer returns the list of typical and proxy services
// exported to a peer.
//
// TODO(peering): What to do about terminating gateways? Sometimes terminating
// gateways are the appropriate destination to dial for an upstream mesh
// service. However, that information is handled by observing the terminating
// gateway's config entry, which we wouldn't want to replicate. How would
// client peers know to route through terminating gateways when they're not
// dialing through a remote mesh gateway?
func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
@ -295,9 +303,13 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6
return 0, nil, fmt.Errorf("failed to read peering: %w", err)
}
if peering == nil {
return 0, nil, nil
return 0, &structs.ExportedServiceList{}, nil
}
return s.exportedServicesForPeerTxn(ws, tx, peering)
}
func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peering *pbpeering.Peering) (uint64, *structs.ExportedServiceList, error) {
maxIdx := peering.ModifyIndex
entMeta := structs.NodeEnterpriseMetaInPartition(peering.Partition)
@ -309,14 +321,28 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6
maxIdx = idx
}
if raw == nil {
return maxIdx, nil, nil
return maxIdx, &structs.ExportedServiceList{}, nil
}
conf, ok := raw.(*structs.ExportedServicesConfigEntry)
if !ok {
return 0, nil, fmt.Errorf("expected type *structs.ExportedServicesConfigEntry, got %T", raw)
}
set := make(map[structs.ServiceName]struct{})
var (
normalSet = make(map[structs.ServiceName]struct{})
discoSet = make(map[structs.ServiceName]struct{})
)
// TODO(peering): filter the disco chain portion of the results to only be
// things reachable over the mesh to avoid replicating some clutter.
//
// At least one of the following should be true for a name for it to
// replicate:
//
// - are a discovery chain by definition (service-router, service-splitter, service-resolver)
// - have an explicit sidecar kind=connect-proxy
// - use connect native mode
for _, svc := range conf.Services {
svcMeta := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), svc.Namespace)
@ -325,7 +351,7 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6
for _, consumer := range svc.Consumers {
name := structs.NewServiceName(svc.Name, &svcMeta)
if _, ok := set[name]; ok {
if _, ok := normalSet[name]; ok {
// Service was covered by a wildcard that was already accounted for
continue
}
@ -335,43 +361,47 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6
sawPeer = true
if svc.Name != structs.WildcardSpecifier {
set[name] = struct{}{}
normalSet[name] = struct{}{}
}
}
// If the target peer is a consumer, and all services in the namespace are exported, query those service names.
if sawPeer && svc.Name == structs.WildcardSpecifier {
var typicalServices []*KindServiceName
idx, typicalServices, err = serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical, svcMeta)
idx, typicalServices, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical, svcMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get service names: %w", err)
return 0, nil, fmt.Errorf("failed to get typical service names: %w", err)
}
if idx > maxIdx {
maxIdx = idx
}
for _, s := range typicalServices {
set[s.Service] = struct{}{}
normalSet[s.Service] = struct{}{}
}
var proxyServices []*KindServiceName
idx, proxyServices, err = serviceNamesOfKindTxn(tx, ws, structs.ServiceKindConnectProxy, svcMeta)
// list all config entries of kind service-resolver, service-router, service-splitter?
idx, discoChains, err := listDiscoveryChainNamesTxn(tx, ws, svcMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get service names: %w", err)
return 0, nil, fmt.Errorf("failed to get discovery chain names: %w", err)
}
if idx > maxIdx {
maxIdx = idx
}
for _, s := range proxyServices {
set[s.Service] = struct{}{}
for _, sn := range discoChains {
discoSet[sn] = struct{}{}
}
}
}
var resp []structs.ServiceName
for svc := range set {
resp = append(resp, svc)
}
return maxIdx, resp, nil
normal := maps.SliceOfKeys(normalSet)
disco := maps.SliceOfKeys(discoSet)
structs.ServiceList(normal).Sort()
structs.ServiceList(disco).Sort()
return maxIdx, &structs.ExportedServiceList{
Services: normal,
DiscoChains: disco,
}, nil
}
// PeeringsForService returns the list of peerings that are associated with the service name provided in the query.

202
agent/consul/state/peering_test.go

@ -630,25 +630,38 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
var lastIdx uint64
lastIdx++
err := s.PeeringWrite(lastIdx, &pbpeering.Peering{
require.NoError(t, s.PeeringWrite(lastIdx, &pbpeering.Peering{
Name: "my-peering",
})
require.NoError(t, err)
}))
q := Query{Value: "my-peering"}
_, p, err := s.PeeringRead(nil, q)
_, p, err := s.PeeringRead(nil, Query{
Value: "my-peering",
})
require.NoError(t, err)
require.NotNil(t, p)
id := p.ID
defaultEntMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
ws := memdb.NewWatchSet()
ensureConfigEntry := func(t *testing.T, entry structs.ConfigEntry) {
t.Helper()
require.NoError(t, entry.Normalize())
require.NoError(t, entry.Validate())
lastIdx++
require.NoError(t, s.EnsureConfigEntry(lastIdx, entry))
}
testutil.RunStep(t, "no exported services", func(t *testing.T) {
idx, exported, err := s.ExportedServicesForPeer(ws, id)
expect := &structs.ExportedServiceList{}
idx, got, err := s.ExportedServicesForPeer(ws, id)
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.Empty(t, exported)
require.Equal(t, expect, got)
})
testutil.RunStep(t, "config entry with exact service names", func(t *testing.T) {
@ -658,58 +671,57 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
{
Name: "mysql",
Consumers: []structs.ServiceConsumer{
{
PeerName: "my-peering",
},
{PeerName: "my-peering"},
},
},
{
Name: "redis",
Consumers: []structs.ServiceConsumer{
{
PeerName: "my-peering",
},
{PeerName: "my-peering"},
},
},
{
Name: "mongo",
Consumers: []structs.ServiceConsumer{
{
PeerName: "my-other-peering",
},
{PeerName: "my-other-peering"},
},
},
},
}
lastIdx++
err = s.EnsureConfigEntry(lastIdx, entry)
require.NoError(t, err)
ensureConfigEntry(t, entry)
require.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
expect := []structs.ServiceName{
{
Name: "mysql",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
{
Name: "redis",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
expect := &structs.ExportedServiceList{
Services: []structs.ServiceName{
{
Name: "mysql",
EnterpriseMeta: *defaultEntMeta,
},
{
Name: "redis",
EnterpriseMeta: *defaultEntMeta,
},
},
}
idx, got, err := s.ExportedServicesForPeer(ws, id)
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.ElementsMatch(t, expect, got)
require.Equal(t, expect, got)
})
testutil.RunStep(t, "config entry with wildcard service name picks up existing service", func(t *testing.T) {
lastIdx++
require.NoError(t, s.EnsureNode(lastIdx, &structs.Node{Node: "foo", Address: "127.0.0.1"}))
require.NoError(t, s.EnsureNode(lastIdx, &structs.Node{
Node: "foo", Address: "127.0.0.1",
}))
lastIdx++
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ID: "billing", Service: "billing", Port: 5000}))
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
ID: "billing", Service: "billing", Port: 5000,
}))
entry := &structs.ExportedServicesConfigEntry{
Name: "default",
@ -717,24 +729,22 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
{
Name: "*",
Consumers: []structs.ServiceConsumer{
{
PeerName: "my-peering",
},
{PeerName: "my-peering"},
},
},
},
}
lastIdx++
err = s.EnsureConfigEntry(lastIdx, entry)
require.NoError(t, err)
ensureConfigEntry(t, entry)
require.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
expect := []structs.ServiceName{
{
Name: "billing",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
expect := &structs.ExportedServiceList{
Services: []structs.ServiceName{
{
Name: "billing",
EnterpriseMeta: *defaultEntMeta,
},
},
}
idx, got, err := s.ExportedServicesForPeer(ws, id)
@ -745,69 +755,127 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
testutil.RunStep(t, "config entry with wildcard service names picks up new registrations", func(t *testing.T) {
lastIdx++
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ID: "payments", Service: "payments", Port: 5000}))
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
ID: "payments", Service: "payments", Port: 5000,
}))
// The proxy will be ignored.
lastIdx++
proxy := structs.NodeService{
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "payments-proxy",
Service: "payments-proxy",
Port: 5000,
}
require.NoError(t, s.EnsureService(lastIdx, "foo", &proxy))
}))
// Ensure everything is L7-capable.
ensureConfigEntry(t, &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
EnterpriseMeta: *defaultEntMeta,
})
ensureConfigEntry(t, &structs.ServiceRouterConfigEntry{
Kind: structs.ServiceRouter,
Name: "router",
EnterpriseMeta: *defaultEntMeta,
})
ensureConfigEntry(t, &structs.ServiceSplitterConfigEntry{
Kind: structs.ServiceSplitter,
Name: "splitter",
EnterpriseMeta: *defaultEntMeta,
Splits: []structs.ServiceSplit{{Weight: 100}},
})
ensureConfigEntry(t, &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "resolver",
EnterpriseMeta: *defaultEntMeta,
})
require.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
expect := []structs.ServiceName{
{
Name: "billing",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
{
Name: "payments",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
expect := &structs.ExportedServiceList{
Services: []structs.ServiceName{
{
Name: "billing",
EnterpriseMeta: *defaultEntMeta,
},
{
Name: "payments",
EnterpriseMeta: *defaultEntMeta,
},
// NOTE: no payments-proxy here
},
{
Name: "payments-proxy",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
DiscoChains: []structs.ServiceName{
{
Name: "resolver",
EnterpriseMeta: *defaultEntMeta,
},
{
Name: "router",
EnterpriseMeta: *defaultEntMeta,
},
{
Name: "splitter",
EnterpriseMeta: *defaultEntMeta,
},
},
}
idx, got, err := s.ExportedServicesForPeer(ws, id)
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.ElementsMatch(t, expect, got)
require.Equal(t, expect, got)
})
testutil.RunStep(t, "config entry with wildcard service names picks up service deletions", func(t *testing.T) {
lastIdx++
require.NoError(t, s.DeleteService(lastIdx, "foo", "billing", nil, ""))
lastIdx++
require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ServiceSplitter, "splitter", nil))
require.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
expect := []structs.ServiceName{
{
Name: "payments",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
expect := &structs.ExportedServiceList{
Services: []structs.ServiceName{
{
Name: "payments",
EnterpriseMeta: *defaultEntMeta,
},
// NOTE: no payments-proxy here
},
{
Name: "payments-proxy",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
DiscoChains: []structs.ServiceName{
{
Name: "resolver",
EnterpriseMeta: *defaultEntMeta,
},
{
Name: "router",
EnterpriseMeta: *defaultEntMeta,
},
},
}
idx, got, err := s.ExportedServicesForPeer(ws, id)
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.ElementsMatch(t, expect, got)
require.Equal(t, expect, got)
})
testutil.RunStep(t, "deleting the config entry clears exported services", func(t *testing.T) {
require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", structs.DefaultEnterpriseMetaInDefaultPartition()))
idx, exported, err := s.ExportedServicesForPeer(ws, id)
expect := &structs.ExportedServiceList{}
require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", defaultEntMeta))
idx, got, err := s.ExportedServicesForPeer(ws, id)
require.NoError(t, err)
require.Equal(t, lastIdx, idx)
require.Empty(t, exported)
require.Equal(t, expect, got)
})
}

8
agent/rpc/peering/service.go

@ -101,8 +101,9 @@ type Store interface {
PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeering.Peering, error)
PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, []structs.ServiceName, error)
ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error)
PeeringsForService(ws memdb.WatchSet, serviceName string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
AbandonCh() <-chan struct{}
}
@ -503,7 +504,7 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
defer s.streams.disconnected(req.LocalID)
mgr := newSubscriptionManager(req.Stream.Context(), logger, s.Backend)
subCh := mgr.subscribe(req.Stream.Context(), req.LocalID)
subCh := mgr.subscribe(req.Stream.Context(), req.LocalID, req.Partition)
sub := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
@ -635,7 +636,8 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
if err := pushServiceResponse(logger, req.Stream, status, update); err != nil {
return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err)
}
case strings.HasPrefix(update.CorrelationID, subMeshGateway):
//TODO(Peering): figure out how to sync this separately
default:
logger.Warn("unrecognized update type from subscription manager: " + update.CorrelationID)
continue

9
agent/rpc/peering/service_test.go

@ -1005,19 +1005,12 @@ func Test_StreamHandler_UpsertServices(t *testing.T) {
},
}
for _, tc := range tt {
runStep(t, tc.name, func(t *testing.T) {
testutil.RunStep(t, tc.name, func(t *testing.T) {
run(t, tc)
})
}
}
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
t.FailNow()
}
}
// newTestServer is copied from partition/service_test.go, with the addition of certs/cas.
// TODO(peering): these are endpoint tests and should live in the agent/consul
// package. Instead, these can be written around a mock client (see testing.go)

108
agent/rpc/peering/subscription_blocking.go

@ -0,0 +1,108 @@
package peering
import (
"context"
"errors"
"fmt"
"time"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/consul/proto/pbservice"
)
// This file contains direct state store functions that need additional
// management to have them emit events. Ideally these would go through
// streaming machinery instead to be cheaper.
// notifyExportedServicesForPeerID runs syncViaBlockingQuery with a query that
// reads the exported service list for peerID from the state store. Each
// successful result is delivered on state.updateCh with the correlation ID
// subExportedServiceList. The call does not return until syncViaBlockingQuery
// does, so it is started in its own goroutine by the caller.
func (m *subscriptionManager) notifyExportedServicesForPeerID(ctx context.Context, state *subscriptionState, peerID string) {
// The query function below is re-run by syncViaBlockingQuery; the watch set
// it is handed is populated by ExportedServicesForPeer.
m.syncViaBlockingQuery(ctx, "exported-services", func(ctx context.Context, store Store, ws memdb.WatchSet) (interface{}, error) {
// Get exported services for peer id. The returned index is discarded.
_, list, err := store.ExportedServicesForPeer(ws, peerID)
if err != nil {
return nil, fmt.Errorf("failed to watch exported services for peer %q: %w", peerID, err)
}
return list, nil
}, subExportedServiceList, state.updateCh)
}
// notifyMeshGatewaysForPartition runs syncViaBlockingQuery with a query that
// dumps all mesh gateway instances in the given partition and converts them to
// their protobuf form. Each successful result is delivered on state.updateCh
// with the correlation ID subMeshGateway+partition.
//
// TODO: add a new streaming subscription type to list-by-kind-and-partition since we're getting evictions
func (m *subscriptionManager) notifyMeshGatewaysForPartition(ctx context.Context, state *subscriptionState, partition string) {
	m.syncViaBlockingQuery(ctx, "mesh-gateways", func(ctx context.Context, store Store, ws memdb.WatchSet) (interface{}, error) {
		// Fetch our current list of all mesh gateways.
		entMeta := structs.DefaultEnterpriseMetaInPartition(partition)
		idx, nodes, err := store.ServiceDump(ws, structs.ServiceKindMeshGateway, true, entMeta, structs.DefaultPeerKeyword)
		if err != nil {
			return nil, fmt.Errorf("failed to watch mesh gateways services for partition %q: %w", partition, err)
		}
		// Never report an index of zero in the result.
		if idx == 0 {
			idx = 1
		}

		// convert back to a protobuf flavor
		result := &pbservice.IndexedCheckServiceNodes{
			Index: idx,
			Nodes: make([]*pbservice.CheckServiceNode, len(nodes)),
		}
		// Index into the slice instead of taking the address of the range
		// variable: before Go 1.22 that variable is shared by every iteration,
		// and ranging by value also copies each element.
		for i := range nodes {
			result.Nodes[i] = pbservice.NewCheckServiceNodeFromStructs(&nodes[i])
		}

		return result, nil
	}, subMeshGateway+partition, state.updateCh)
}
// syncViaBlockingQuery repeatedly runs queryFn against the backend's state
// store and publishes each successful result on updateCh, tagged with
// correlationID. It only returns once ctx is done.
//
// Parameters:
//   - queryType: label attached to log lines as "queryType"; may be empty.
//   - queryFn: performs the read. It receives a fresh watch set (already
//     containing the store's abandon channel and ctx.Done()) and is expected
//     to register whatever it reads so the loop can block until that changes.
//   - correlationID: set on every emitted cache.UpdateEvent.
//   - updateCh: destination for results.
//
// After a successful publish the loop blocks on the watch set; after a failed
// query it logs the error. In both cases it then goes through waiter.Wait
// before the next iteration.
func (m *subscriptionManager) syncViaBlockingQuery(
	ctx context.Context,
	queryType string,
	queryFn func(ctx context.Context, store Store, ws memdb.WatchSet) (interface{}, error),
	correlationID string,
	updateCh chan<- cache.UpdateEvent,
) {
	waiter := &retry.Waiter{
		MinFailures: 1,
		Factor:      500 * time.Millisecond,
		MaxWait:     60 * time.Second,
		Jitter:      retry.NewJitter(100),
	}

	logger := m.logger
	if queryType != "" {
		logger = m.logger.With("queryType", queryType)
	}

	store := m.backend.Store()

	for {
		ws := memdb.NewWatchSet()
		ws.Add(store.AbandonCh())
		ws.Add(ctx.Done())

		if result, err := queryFn(ctx, store, ws); err != nil {
			logger.Error("failed to sync from query", "error", err)
		} else {
			// Publish the result, but give up if the context ends first. The
			// consumer stops reading when ctx is cancelled, so an
			// unconditional send here could block this goroutine forever.
			select {
			case updateCh <- cache.UpdateEvent{
				CorrelationID: correlationID,
				Result:        result,
			}:
			case <-ctx.Done():
				return
			}

			// Block for any changes to the state store.
			ws.WatchCtx(ctx)
		}

		if err := waiter.Wait(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
			logger.Error("failed to wait before re-trying sync", "error", err)
		}

		// Stop promptly once the context has ended instead of re-querying.
		select {
		case <-ctx.Done():
			return
		default:
		}
	}
}

414
agent/rpc/peering/subscription_manager.go

@ -2,17 +2,18 @@ package peering
import (
"context"
"errors"
"fmt"
"time"
"strings"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbservice"
)
@ -31,9 +32,6 @@ type subscriptionManager struct {
logger hclog.Logger
viewStore MaterializedViewStore
backend SubscriptionBackend
// watchedServices is a map of exported services to a cancel function for their subscription notifier.
watchedServices map[structs.ServiceName]context.CancelFunc
}
// TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering.
@ -43,61 +41,187 @@ func newSubscriptionManager(ctx context.Context, logger hclog.Logger, backend Su
go store.Run(ctx)
return &subscriptionManager{
logger: logger,
viewStore: store,
backend: backend,
watchedServices: make(map[structs.ServiceName]context.CancelFunc),
logger: logger,
viewStore: store,
backend: backend,
}
}
// subscribe returns a channel that will contain updates to exported service instances for a given peer.
func (m *subscriptionManager) subscribe(ctx context.Context, peerID string) <-chan cache.UpdateEvent {
updateCh := make(chan cache.UpdateEvent, 1)
go m.syncSubscriptions(ctx, peerID, updateCh)
func (m *subscriptionManager) subscribe(ctx context.Context, peerID, partition string) <-chan cache.UpdateEvent {
var (
updateCh = make(chan cache.UpdateEvent, 1)
publicUpdateCh = make(chan cache.UpdateEvent, 1)
)
state := newSubscriptionState(partition)
state.publicUpdateCh = publicUpdateCh
state.updateCh = updateCh
// Wrap our bare state store queries in goroutines that emit events.
go m.notifyExportedServicesForPeerID(ctx, state, peerID)
go m.notifyMeshGatewaysForPartition(ctx, state, state.partition)
// This goroutine is the only one allowed to manipulate protected
// subscriptionManager fields.
go m.handleEvents(ctx, state, updateCh)
return updateCh
return publicUpdateCh
}
func (m *subscriptionManager) syncSubscriptions(ctx context.Context, peerID string, updateCh chan<- cache.UpdateEvent) {
waiter := &retry.Waiter{
MinFailures: 1,
Factor: 500 * time.Millisecond,
MaxWait: 60 * time.Second,
Jitter: retry.NewJitter(100),
// handleEvents drains updateCh until ctx is done, passing every received
// event to handleEvent. A failure to handle one event is logged and does not
// stop the loop.
func (m *subscriptionManager) handleEvents(ctx context.Context, state *subscriptionState, updateCh <-chan cache.UpdateEvent) {
	for {
		// TODO(peering): exponential backoff

		select {
		case <-ctx.Done():
			return

		case evt := <-updateCh:
			handleErr := m.handleEvent(ctx, state, evt)
			if handleErr == nil {
				continue
			}
			m.logger.Error("Failed to handle update from watch",
				"id", evt.CorrelationID, "error", handleErr,
			)
		}
	}
}
for {
if err := m.syncSubscriptionsAndBlock(ctx, peerID, updateCh); err != nil {
m.logger.Error("failed to sync subscriptions", "error", err)
func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscriptionState, u cache.UpdateEvent) error {
if u.Err != nil {
return fmt.Errorf("received error event: %w", u.Err)
}
// TODO(peering): on initial stream setup, transmit the list of exported
// services for use in differential DELETE/UPSERT. Akin to streaming's snapshot start/end.
switch {
case u.CorrelationID == subExportedServiceList:
// Everything starts with the exported service list coming from
// our state store watchset loop.
evt, ok := u.Result.(*structs.ExportedServiceList)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
if err := waiter.Wait(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
m.logger.Error("failed to wait before re-trying sync", "error", err)
state.exportList = evt
pending := &pendingPayload{}
m.syncNormalServices(ctx, state, pending, evt.Services)
m.syncDiscoveryChains(ctx, state, pending, evt.ListAllDiscoveryChains())
state.sendPendingEvents(ctx, m.logger, pending)
// cleanup event versions too
state.cleanupEventVersions(m.logger)
case strings.HasPrefix(u.CorrelationID, subExportedService):
csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
select {
case <-ctx.Done():
return
default:
// TODO(peering): is it safe to edit these protobufs in place?
// Clear this raft index before exporting.
csn.Index = 0
// Ensure that connect things are scrubbed so we don't mix-and-match
// with the synthetic entries that point to mesh gateways.
filterConnectReferences(csn)
// Flatten health checks
for _, instance := range csn.Nodes {
instance.Checks = flattenChecks(
instance.Node.Node,
instance.Service.ID,
instance.Service.Service,
instance.Service.EnterpriseMeta,
instance.Checks,
)
}
id := servicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedService)
// Just ferry this one directly along to the destination.
pending := &pendingPayload{}
if err := pending.Add(id, u.CorrelationID, csn); err != nil {
return err
}
state.sendPendingEvents(ctx, m.logger, pending)
case strings.HasPrefix(u.CorrelationID, subMeshGateway):
csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
partition := strings.TrimPrefix(u.CorrelationID, subMeshGateway)
if !acl.EqualPartitions(partition, state.partition) {
return nil // ignore event
}
// Clear this raft index before exporting.
csn.Index = 0
state.meshGateway = csn
pending := &pendingPayload{}
// Directly replicate information about our mesh gateways to the consuming side.
// TODO(peering): should we scrub anything before replicating this?
if err := pending.Add(meshGatewayPayloadID, u.CorrelationID, csn); err != nil {
return err
}
if state.exportList != nil {
// Trigger public events for all synthetic discovery chain replies.
for chainName := range state.connectServices {
m.emitEventForDiscoveryChain(ctx, state, pending, chainName)
}
}
// TODO(peering): should we ship this down verbatim to the consumer?
state.sendPendingEvents(ctx, m.logger, pending)
default:
return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID)
}
return nil
}
// syncSubscriptionsAndBlock ensures that the subscriptions to the subscription backend
// match the list of services exported to the peer.
func (m *subscriptionManager) syncSubscriptionsAndBlock(ctx context.Context, peerID string, updateCh chan<- cache.UpdateEvent) error {
store := m.backend.Store()
func filterConnectReferences(orig *pbservice.IndexedCheckServiceNodes) {
newNodes := make([]*pbservice.CheckServiceNode, 0, len(orig.Nodes))
for i := range orig.Nodes {
csn := orig.Nodes[i]
ws := memdb.NewWatchSet()
ws.Add(store.AbandonCh())
ws.Add(ctx.Done())
if csn.Service.Kind != string(structs.ServiceKindTypical) {
continue // skip non-typical services
}
// Get exported services for peer id
_, services, err := store.ExportedServicesForPeer(ws, peerID)
if err != nil {
return fmt.Errorf("failed to watch exported services for peer %q: %w", peerID, err)
if strings.HasSuffix(csn.Service.Service, syntheticProxyNameSuffix) {
// Skip things that might LOOK like a proxy so we don't get a
// collision with the ones we generate.
continue
}
// Remove connect things like native mode.
if csn.Service.Connect != nil || csn.Service.Proxy != nil {
csn = proto.Clone(csn).(*pbservice.CheckServiceNode)
csn.Service.Connect = nil
csn.Service.Proxy = nil
}
newNodes = append(newNodes, csn)
}
orig.Nodes = newNodes
}
func (m *subscriptionManager) syncNormalServices(
ctx context.Context,
state *subscriptionState,
pending *pendingPayload,
services []structs.ServiceName,
) {
// seen contains the set of exported service names and is used to reconcile the list of watched services.
seen := make(map[structs.ServiceName]struct{})
@ -105,45 +229,223 @@ func (m *subscriptionManager) syncSubscriptionsAndBlock(ctx context.Context, pee
for _, svc := range services {
seen[svc] = struct{}{}
if _, ok := m.watchedServices[svc]; ok {
if _, ok := state.watchedServices[svc]; ok {
// Exported service is already being watched, nothing to do.
continue
}
notifyCtx, cancel := context.WithCancel(ctx)
m.watchedServices[svc] = cancel
if err := m.Notify(notifyCtx, svc, updateCh); err != nil {
if err := m.NotifyStandardService(notifyCtx, svc, state.updateCh); err != nil {
cancel()
m.logger.Error("failed to subscribe to service", "service", svc.String())
continue
}
state.watchedServices[svc] = cancel
}
// For every subscription without an exported service, call the associated cancel fn.
for svc, cancel := range m.watchedServices {
for svc, cancel := range state.watchedServices {
if _, ok := seen[svc]; !ok {
cancel()
delete(state.watchedServices, svc)
// Send an empty event to the stream handler to trigger sending a DELETE message.
// Cancelling the subscription context above is necessary, but does not yield a useful signal on its own.
updateCh <- cache.UpdateEvent{
CorrelationID: subExportedService + svc.String(),
Result: &pbservice.IndexedCheckServiceNodes{},
err := pending.Add(
servicePayloadIDPrefix+svc.String(),
subExportedService+svc.String(),
&pbservice.IndexedCheckServiceNodes{},
)
if err != nil {
m.logger.Error("failed to send event for service", "service", svc.String(), "error", err)
continue
}
}
}
}
// Block for any changes to the state store.
ws.WatchCtx(ctx)
return nil
// syncDiscoveryChains reconciles state.connectServices against chainsByName,
// the currently exported set of discovery chain names.
//
// Names that are new to the state are recorded and handed to
// emitEventForDiscoveryChain. Names no longer present are forgotten and, when
// mesh gateway data has been received, an empty IndexedCheckServiceNodes
// payload is queued on pending for the chain's generated proxy name.
func (m *subscriptionManager) syncDiscoveryChains(
	ctx context.Context,
	state *subscriptionState,
	pending *pendingPayload,
	chainsByName map[structs.ServiceName]struct{},
) {
	// Additions: anything exported that we are not yet tracking.
	for name := range chainsByName {
		if _, tracked := state.connectServices[name]; !tracked {
			state.connectServices[name] = struct{}{}
			m.emitEventForDiscoveryChain(ctx, state, pending, name)
		}
	}

	// Removals: anything we track that is no longer exported.
	for name := range state.connectServices {
		if _, stillExported := chainsByName[name]; stillExported {
			continue
		}

		delete(state.connectServices, name)

		// Without mesh gateway data nothing was ever emitted for this chain,
		// so there is nothing to retract.
		if state.meshGateway == nil {
			continue
		}

		proxyName := generateProxyNameForDiscoveryChain(name)
		payloadID := discoveryChainPayloadIDPrefix + name.String()
		correlationID := subExportedService + proxyName.String()

		if err := pending.Add(payloadID, correlationID, &pbservice.IndexedCheckServiceNodes{}); err != nil {
			m.logger.Error("failed to send event for discovery chain", "service", name.String(), "error", err)
		}
	}
}
// emitEventForDiscoveryChain queues a synthetic health payload for the given
// discovery chain onto pending.
//
// Nothing is queued when the chain is not tracked in state.connectServices, or
// when either the export list or the mesh gateway data has not been populated
// yet. A failure to queue the event is logged and otherwise ignored.
func (m *subscriptionManager) emitEventForDiscoveryChain(
	ctx context.Context,
	state *subscriptionState,
	pending *pendingPayload,
	chainName structs.ServiceName,
) {
	_, tracked := state.connectServices[chainName]
	switch {
	case !tracked:
		return // not found
	case state.exportList == nil, state.meshGateway == nil:
		return // skip because we don't have the data to do it yet
	}

	// Emit event with fake data derived from the mesh gateway instances.
	var (
		proxySN = generateProxyNameForDiscoveryChain(chainName)
		id      = discoveryChainPayloadIDPrefix + chainName.String()
		corrID  = subExportedService + proxySN.String()
		payload = createDiscoChainHealth(chainName, state.meshGateway)
	)
	if err := pending.Add(id, corrID, payload); err != nil {
		m.logger.Error("failed to send event for discovery chain", "service", chainName.String(), "error", err)
	}
}
// createDiscoChainHealth builds a synthetic set of connect-proxy instances for
// the discovery chain named by sn, one per mesh gateway instance found in pb.
//
// Each synthetic instance is named sn.Name plus syntheticProxyNameSuffix,
// borrows its node, address, tagged addresses, port, socket path and weights
// from the corresponding gateway instance, and carries a single flattened
// health check derived from that gateway instance's checks.
func createDiscoChainHealth(sn structs.ServiceName, pb *pbservice.IndexedCheckServiceNodes) *pbservice.IndexedCheckServiceNodes {
	proxyName := sn.Name + syntheticProxyNameSuffix

	synthetic := make([]*pbservice.CheckServiceNode, 0, len(pb.Nodes))
	for idx, gw := range pb.Nodes {
		// A fresh enterprise meta value is created per instance.
		entMeta := pbcommon.NewEnterpriseMetaFromStructs(sn.EnterpriseMeta)

		proxyID := proxyName
		if gw.Service.ID != "" {
			// This is only going to be relevant if multiple mesh gateways are
			// on the same exporting node.
			proxyID = fmt.Sprintf("%s-instance-%d", proxyName, idx)
		}

		proxySvc := &pbservice.NodeService{
			Kind:           string(structs.ServiceKindConnectProxy),
			Service:        proxyName,
			ID:             proxyID,
			EnterpriseMeta: entMeta,
			PeerName:       structs.DefaultPeerKeyword,
			Proxy: &pbservice.ConnectProxyConfig{
				DestinationServiceName: sn.Name,
				DestinationServiceID:   sn.Name,
			},
			// The remaining fields are copied directly from the gateway.
			Address:         gw.Service.Address,
			TaggedAddresses: gw.Service.TaggedAddresses,
			Port:            gw.Service.Port,
			SocketPath:      gw.Service.SocketPath,
			Weights:         gw.Service.Weights,
		}

		synthetic = append(synthetic, &pbservice.CheckServiceNode{
			Node:    gw.Node,
			Service: proxySvc,
			Checks:  flattenChecks(gw.Node.Node, proxyID, proxyName, entMeta, gw.Checks),
		})
	}

	// Index is intentionally left at its zero value.
	return &pbservice.IndexedCheckServiceNodes{
		Nodes: synthetic,
	}
}
// flattenChecks collapses a list of health checks into a single synthetic
// "overall-check" attributed to the given node and service.
//
// It returns nil when checks is empty. Otherwise the returned slice holds
// exactly one check whose status is the worst status found in checks:
// critical outranks any other status, an unrecognized non-passing status
// outranks warning, and warning outranks passing. This makes the result
// independent of the order in which the checks are listed; previously the last
// non-passing check won, so a critical check followed by a warning check was
// reported as merely warning.
//
// If serviceID is empty, serviceName is used in its place.
func flattenChecks(
	nodeName string,
	serviceID string,
	serviceName string,
	entMeta *pbcommon.EnterpriseMeta,
	checks []*pbservice.HealthCheck,
) []*pbservice.HealthCheck {
	if len(checks) == 0 {
		return nil
	}

	// severity ranks a check status so that statuses can be compared.
	// Unrecognized statuses are treated as worse than warning (they are not
	// known to be healthy) but never mask a critical check.
	severity := func(status string) int {
		switch status {
		case api.HealthPassing:
			return 0
		case api.HealthWarning:
			return 1
		case api.HealthCritical:
			return 3
		default:
			return 2
		}
	}

	healthStatus := api.HealthPassing
	for _, chk := range checks {
		// ">=" keeps the historical "last one wins" behavior among statuses
		// of equal severity.
		if severity(chk.Status) >= severity(healthStatus) {
			healthStatus = chk.Status
		}
	}

	if serviceID == "" {
		serviceID = serviceName
	}

	return []*pbservice.HealthCheck{
		{
			CheckID:        serviceID + ":overall-check",
			Name:           "overall-check",
			Status:         healthStatus,
			Node:           nodeName,
			ServiceID:      serviceID,
			ServiceName:    serviceName,
			EnterpriseMeta: entMeta,
			PeerName:       structs.DefaultPeerKeyword,
		},
	}
}
const (
subExportedService = "exported-service:"
subExportedServiceList = "exported-service-list"
subExportedService = "exported-service:"
subMeshGateway = "mesh-gateway:"
)
// Notify the given channel when there are updates to the requested service.
func (m *subscriptionManager) Notify(ctx context.Context, svc structs.ServiceName, updateCh chan<- cache.UpdateEvent) error {
sr := newExportedServiceRequest(m.logger, svc, m.backend)
// NotifyStandardService will notify the given channel when there are updates
// to the requested service of the same name in the catalog.
func (m *subscriptionManager) NotifyStandardService(
ctx context.Context,
svc structs.ServiceName,
updateCh chan<- cache.UpdateEvent,
) error {
sr := newExportedStandardServiceRequest(m.logger, svc, m.backend)
return m.viewStore.Notify(ctx, sr, subExportedService+svc.String(), updateCh)
}
// syntheticProxyNameSuffix is the suffix to add to synthetic proxies we
// replicate to route traffic to an exported discovery chain through the mesh
// gateways.
//
// This name was chosen to match existing "sidecar service" generation logic
// and similar logic in the Service Identity synthetic ACL policies.
const syntheticProxyNameSuffix = "-sidecar-proxy"
// generateProxyNameForDiscoveryChain returns the service name of the synthetic
// proxy for the discovery chain sn: the chain's name with
// syntheticProxyNameSuffix appended, in the same enterprise meta as sn.
func generateProxyNameForDiscoveryChain(sn structs.ServiceName) structs.ServiceName {
	proxyName := sn.Name + syntheticProxyNameSuffix
	return structs.NewServiceName(proxyName, &sn.EnterpriseMeta)
}

646
agent/rpc/peering/subscription_manager_test.go

@ -2,81 +2,77 @@ package peering
import (
"context"
"sort"
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
)
type testSubscriptionBackend struct {
state.EventPublisher
store *state.Store
}
func (b *testSubscriptionBackend) Store() Store {
return b.store
}
func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
publisher := stream.NewEventPublisher(10 * time.Second)
store := newStateStore(t, publisher)
backend := newTestSubscriptionBackend(t)
// initialCatalogIdx := backend.lastIdx
backend := testSubscriptionBackend{
EventPublisher: publisher,
store: store,
}
ctx := context.Background()
mgr := newSubscriptionManager(ctx, hclog.New(nil), &backend)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create a peering
var lastIdx uint64 = 1
err := store.PeeringWrite(lastIdx, &pbpeering.Peering{
Name: "my-peering",
})
require.NoError(t, err)
_, id := backend.ensurePeering(t, "my-peering")
partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
_, p, err := store.PeeringRead(nil, state.Query{Value: "my-peering"})
require.NoError(t, err)
require.NotNil(t, p)
mgr := newSubscriptionManager(ctx, testutil.Logger(t), backend)
subCh := mgr.subscribe(ctx, id, partition)
id := p.ID
var (
gatewayCorrID = subMeshGateway + partition
subCh := mgr.subscribe(ctx, id)
mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String()
entry := &structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{
{
Name: "mysql",
Consumers: []structs.ServiceConsumer{
{
PeerName: "my-peering",
mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String()
)
// Expect just the empty mesh gateway event to replicate.
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, gatewayCorrID, 0)
})
testutil.RunStep(t, "initial export syncs empty instance lists", func(t *testing.T) {
backend.ensureConfigEntry(t, &structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{
{
Name: "mysql",
Consumers: []structs.ServiceConsumer{
{PeerName: "my-peering"},
},
},
},
{
Name: "mongo",
Consumers: []structs.ServiceConsumer{
{
PeerName: "my-other-peering",
{
Name: "mongo",
Consumers: []structs.ServiceConsumer{
{PeerName: "my-other-peering"},
},
},
},
},
}
lastIdx++
err = store.EnsureConfigEntry(lastIdx, entry)
require.NoError(t, err)
})
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mysqlCorrID, 0)
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mysqlProxyCorrID, 0)
},
)
})
mysql1 := &structs.CheckServiceNode{
Node: &structs.Node{Node: "foo", Address: "10.0.0.1"},
@ -87,34 +83,40 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
}
testutil.RunStep(t, "registering exported service instance yields update", func(t *testing.T) {
backend.ensureNode(t, mysql1.Node)
backend.ensureService(t, "foo", mysql1.Service)
// We get one update for the service
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, mysqlCorrID, got.CorrelationID)
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 1)
node := res.Nodes[0]
require.NotNil(t, node.Node)
require.Equal(t, "foo", node.Node.Node)
require.NotNil(t, node.Service)
require.Equal(t, "mysql-1", node.Service.ID)
require.Len(t, node.Checks, 0)
})
lastIdx++
require.NoError(t, store.EnsureNode(lastIdx, mysql1.Node))
lastIdx++
require.NoError(t, store.EnsureService(lastIdx, "foo", mysql1.Service))
lastIdx++
require.NoError(t, store.EnsureCheck(lastIdx, mysql1.Checks[0]))
// Receive in a retry loop so that eventually we converge onto the expected CheckServiceNode.
retry.Run(t, func(r *retry.R) {
select {
case update := <-subCh:
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
require.True(r, ok)
require.Equal(r, uint64(5), nodes.Index)
require.Len(r, nodes.Nodes, 1)
require.Equal(r, "foo", nodes.Nodes[0].Node.Node)
require.Equal(r, "mysql-1", nodes.Nodes[0].Service.ID)
require.Len(r, nodes.Nodes[0].Checks, 1)
require.Equal(r, "mysql-check", nodes.Nodes[0].Checks[0].CheckID)
default:
r.Fatalf("invalid update")
}
backend.ensureCheck(t, mysql1.Checks[0])
// and one for the check
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, mysqlCorrID, got.CorrelationID)
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 1)
node := res.Nodes[0]
require.NotNil(t, node.Node)
require.Equal(t, "foo", node.Node.Node)
require.NotNil(t, node.Service)
require.Equal(t, "mysql-1", node.Service.ID)
require.Len(t, node.Checks, 1)
require.Equal(t, "mysql-1:overall-check", node.Checks[0].CheckID)
})
})
@ -127,237 +129,409 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
}
testutil.RunStep(t, "additional instances are returned when registered", func(t *testing.T) {
lastIdx++
require.NoError(t, store.EnsureNode(lastIdx, mysql2.Node))
lastIdx++
require.NoError(t, store.EnsureService(lastIdx, "bar", mysql2.Service))
lastIdx++
require.NoError(t, store.EnsureCheck(lastIdx, mysql2.Checks[0]))
retry.Run(t, func(r *retry.R) {
select {
case update := <-subCh:
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
require.True(r, ok)
require.Equal(r, uint64(8), nodes.Index)
backend.ensureNode(t, mysql2.Node)
backend.ensureService(t, "bar", mysql2.Service)
require.Len(r, nodes.Nodes, 2)
require.Equal(r, "bar", nodes.Nodes[0].Node.Node)
require.Equal(r, "mysql-2", nodes.Nodes[0].Service.ID)
// We get one update for the service
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, mysqlCorrID, got.CorrelationID)
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
require.Len(r, nodes.Nodes[0].Checks, 1)
require.Equal(r, "mysql-2-check", nodes.Nodes[0].Checks[0].CheckID)
require.Len(t, res.Nodes, 2)
{
node := res.Nodes[0]
require.NotNil(t, node.Node)
require.Equal(t, "bar", node.Node.Node)
require.NotNil(t, node.Service)
require.Equal(t, "mysql-2", node.Service.ID)
require.Len(t, node.Checks, 0)
}
{
node := res.Nodes[1]
require.NotNil(t, node.Node)
require.Equal(t, "foo", node.Node.Node)
require.NotNil(t, node.Service)
require.Len(t, node.Checks, 1)
require.Equal(t, "mysql-1:overall-check", node.Checks[0].CheckID)
}
})
require.Equal(r, "foo", nodes.Nodes[1].Node.Node)
require.Equal(r, "mysql-1", nodes.Nodes[1].Service.ID)
backend.ensureCheck(t, mysql2.Checks[0])
require.Len(r, nodes.Nodes[1].Checks, 1)
require.Equal(r, "mysql-check", nodes.Nodes[1].Checks[0].CheckID)
// and one for the check
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, mysqlCorrID, got.CorrelationID)
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
default:
r.Fatalf("invalid update")
require.Len(t, res.Nodes, 2)
{
node := res.Nodes[0]
require.NotNil(t, node.Node)
require.Equal(t, "bar", node.Node.Node)
require.NotNil(t, node.Service)
require.Equal(t, "mysql-2", node.Service.ID)
require.Len(t, node.Checks, 1)
require.Equal(t, "mysql-2:overall-check", node.Checks[0].CheckID)
}
{
node := res.Nodes[1]
require.NotNil(t, node.Node)
require.Equal(t, "foo", node.Node.Node)
require.NotNil(t, node.Service)
require.Len(t, node.Checks, 1)
require.Equal(t, "mysql-1:overall-check", node.Checks[0].CheckID)
}
})
})
testutil.RunStep(t, "no updates are received for services not exported to my-peering", func(t *testing.T) {
mongo := &structs.CheckServiceNode{
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"},
Service: &structs.NodeService{ID: "mongo", Service: "mongo", Port: 5000},
Checks: structs.HealthChecks{
&structs.HealthCheck{CheckID: "mongo-check", ServiceID: "mongo", Node: "zip"},
},
}
lastIdx++
require.NoError(t, store.EnsureNode(lastIdx, mongo.Node))
lastIdx++
require.NoError(t, store.EnsureService(lastIdx, "zip", mongo.Service))
lastIdx++
require.NoError(t, store.EnsureCheck(lastIdx, mongo.Checks[0]))
// Receive from subCh times out. The retry in the last step already consumed all the mysql events.
select {
case update := <-subCh:
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
mongo := &structs.CheckServiceNode{
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"},
Service: &structs.NodeService{ID: "mongo", Service: "mongo", Port: 5000},
Checks: structs.HealthChecks{
&structs.HealthCheck{CheckID: "mongo-check", ServiceID: "mongo", Node: "zip"},
},
}
if ok && len(nodes.Nodes) > 0 && nodes.Nodes[0].Node.Node == "zip" {
t.Fatalf("received update for mongo node zip")
}
testutil.RunStep(t, "no updates are received for services not exported to my-peering", func(t *testing.T) {
backend.ensureNode(t, mongo.Node)
backend.ensureService(t, "zip", mongo.Service)
backend.ensureCheck(t, mongo.Checks[0])
case <-time.After(100 * time.Millisecond):
// Expect this to fire
}
// Receive from subCh times out.
expectEvents(t, subCh)
})
testutil.RunStep(t, "deregister an instance and it gets removed from the output", func(t *testing.T) {
lastIdx++
require.NoError(t, store.DeleteService(lastIdx, "foo", mysql1.Service.ID, nil, ""))
select {
case update := <-subCh:
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
require.True(t, ok)
require.Equal(t, uint64(12), nodes.Index)
require.Len(t, nodes.Nodes, 1)
require.Equal(t, "bar", nodes.Nodes[0].Node.Node)
require.Equal(t, "mysql-2", nodes.Nodes[0].Service.ID)
require.Len(t, nodes.Nodes[0].Checks, 1)
require.Equal(t, "mysql-2-check", nodes.Nodes[0].Checks[0].CheckID)
case <-time.After(100 * time.Millisecond):
t.Fatalf("timed out waiting for update")
}
backend.deleteService(t, "foo", mysql1.Service.ID)
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, mysqlCorrID, got.CorrelationID)
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 1)
node := res.Nodes[0]
require.NotNil(t, node.Node)
require.Equal(t, "bar", node.Node.Node)
require.NotNil(t, node.Service)
require.Equal(t, "mysql-2", node.Service.ID)
require.Len(t, node.Checks, 1)
require.Equal(t, "mysql-2:overall-check", node.Checks[0].CheckID)
})
})
testutil.RunStep(t, "deregister the last instance and the output is empty", func(t *testing.T) {
lastIdx++
require.NoError(t, store.DeleteService(lastIdx, "bar", mysql2.Service.ID, nil, ""))
backend.deleteService(t, "bar", mysql2.Service.ID)
select {
case update := <-subCh:
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
require.True(t, ok)
require.Equal(t, uint64(13), nodes.Index)
require.Len(t, nodes.Nodes, 0)
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, mysqlCorrID, got.CorrelationID)
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
case <-time.After(100 * time.Millisecond):
t.Fatalf("timed out waiting for update")
}
require.Len(t, res.Nodes, 0)
})
})
}
func TestSubscriptionManager_InitialSnapshot(t *testing.T) {
publisher := stream.NewEventPublisher(10 * time.Second)
store := newStateStore(t, publisher)
backend := newTestSubscriptionBackend(t)
// initialCatalogIdx := backend.lastIdx
backend := testSubscriptionBackend{
EventPublisher: publisher,
store: store,
}
ctx := context.Background()
mgr := newSubscriptionManager(ctx, hclog.New(nil), &backend)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create a peering
var lastIdx uint64 = 1
err := store.PeeringWrite(lastIdx, &pbpeering.Peering{
Name: "my-peering",
})
require.NoError(t, err)
_, id := backend.ensurePeering(t, "my-peering")
partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
_, p, err := store.PeeringRead(nil, state.Query{Value: "my-peering"})
require.NoError(t, err)
require.NotNil(t, p)
id := p.ID
subCh := mgr.subscribe(ctx, id)
mgr := newSubscriptionManager(ctx, testutil.Logger(t), backend)
subCh := mgr.subscribe(ctx, id, partition)
// Register two services that are not yet exported
mysql := &structs.CheckServiceNode{
Node: &structs.Node{Node: "foo", Address: "10.0.0.1"},
Service: &structs.NodeService{ID: "mysql-1", Service: "mysql", Port: 5000},
}
lastIdx++
require.NoError(t, store.EnsureNode(lastIdx, mysql.Node))
lastIdx++
require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service))
backend.ensureNode(t, mysql.Node)
backend.ensureService(t, "foo", mysql.Service)
mongo := &structs.CheckServiceNode{
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"},
Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000},
}
backend.ensureNode(t, mongo.Node)
backend.ensureService(t, "zip", mongo.Service)
lastIdx++
require.NoError(t, store.EnsureNode(lastIdx, mongo.Node))
var (
gatewayCorrID = subMeshGateway + partition
lastIdx++
require.NoError(t, store.EnsureService(lastIdx, "zip", mongo.Service))
mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String()
mongoCorrID = subExportedService + structs.NewServiceName("mongo", nil).String()
chainCorrID = subExportedService + structs.NewServiceName("chain", nil).String()
// No updates should be received, because neither service is exported.
select {
case update := <-subCh:
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
if ok && len(nodes.Nodes) > 0 {
t.Fatalf("received unexpected update")
}
mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String()
mongoProxyCorrID = subExportedService + structs.NewServiceName("mongo-sidecar-proxy", nil).String()
chainProxyCorrID = subExportedService + structs.NewServiceName("chain-sidecar-proxy", nil).String()
)
case <-time.After(100 * time.Millisecond):
// Expect this to fire
}
// Expect just the empty mesh gateway event to replicate.
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, gatewayCorrID, 0)
})
// At this point in time we'll have a mesh-gateway notification with no
// content stored and handled.
testutil.RunStep(t, "exporting the two services yields an update for both", func(t *testing.T) {
entry := &structs.ExportedServicesConfigEntry{
backend.ensureConfigEntry(t, &structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{
{
Name: "mysql",
Consumers: []structs.ServiceConsumer{
{
PeerName: "my-peering",
},
{PeerName: "my-peering"},
},
},
{
Name: "mongo",
Consumers: []structs.ServiceConsumer{
{
PeerName: "my-peering",
},
{PeerName: "my-peering"},
},
},
{
Name: "chain",
Consumers: []structs.ServiceConsumer{
{PeerName: "my-peering"},
},
},
},
})
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, chainCorrID, 0)
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, chainProxyCorrID, 0)
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mongoCorrID, 1, "mongo", string(structs.ServiceKindTypical))
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mongoProxyCorrID, 0)
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mysqlCorrID, 1, "mysql", string(structs.ServiceKindTypical))
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mysqlProxyCorrID, 0)
},
)
})
testutil.RunStep(t, "registering a mesh gateway triggers connect replies", func(t *testing.T) {
gateway := &structs.CheckServiceNode{
Node: &structs.Node{Node: "mgw", Address: "10.1.1.1"},
Service: &structs.NodeService{ID: "gateway-1", Kind: structs.ServiceKindMeshGateway, Service: "gateway", Port: 8443},
// TODO: checks
}
lastIdx++
err = store.EnsureConfigEntry(lastIdx, entry)
require.NoError(t, err)
backend.ensureNode(t, gateway.Node)
backend.ensureService(t, "mgw", gateway.Service)
var (
sawMySQL bool
sawMongo bool
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, chainProxyCorrID, 1, "chain-sidecar-proxy", string(structs.ServiceKindConnectProxy))
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mongoProxyCorrID, 1, "mongo-sidecar-proxy", string(structs.ServiceKindConnectProxy))
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mysqlProxyCorrID, 1, "mysql-sidecar-proxy", string(structs.ServiceKindConnectProxy))
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, gatewayCorrID, 1, "gateway", string(structs.ServiceKindMeshGateway))
},
)
})
}
retry.Run(t, func(r *retry.R) {
select {
case update := <-subCh:
nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
require.True(r, ok)
require.Len(r, nodes.Nodes, 1)
switch nodes.Nodes[0].Service.Service {
case "mongo":
sawMongo = true
case "mysql":
sawMySQL = true
}
if !sawMySQL || !sawMongo {
r.Fatalf("missing an update")
}
default:
r.Fatalf("invalid update")
}
})
// testSubscriptionBackend is the backend handed to the subscription manager in
// tests. It pairs an event publisher with a state store and tracks the index
// used for writes made through its helper methods.
type testSubscriptionBackend struct {
	state.EventPublisher
	// store is the state store the helper methods write to.
	store *state.Store
	// lastIdx is the index used by the most recent write; each helper
	// increments it before writing to the store.
	lastIdx uint64
}
// newTestSubscriptionBackend creates a backend backed by a fresh event
// publisher and state store, and seeds the catalog with a placeholder node and
// service so that later writes happen at an index greater than zero.
func newTestSubscriptionBackend(t *testing.T) *testSubscriptionBackend {
	pub := stream.NewEventPublisher(10 * time.Second)

	b := &testSubscriptionBackend{
		EventPublisher: pub,
		store:          newStateStore(t, pub),
	}

	// Create some placeholder data to ensure raft index > 0
	//
	// TODO(peering): is there some extremely subtle max-index table reading bug in play?
	b.ensureNode(t, &structs.Node{Node: "placeholder", Address: "10.0.0.1"})
	b.ensureService(t, "placeholder", &structs.NodeService{
		ID:      "placeholder-1",
		Service: "placeholder",
		Port:    5000,
	})

	return b
}
// Store returns the state store backing this test backend.
func (b *testSubscriptionBackend) Store() Store {
	return b.store
}
// ensurePeering writes a peering with the given name at the next index and
// returns that index together with the ID of the stored peering.
func (b *testSubscriptionBackend) ensurePeering(t *testing.T, name string) (uint64, string) {
	b.lastIdx++
	idx := b.lastIdx
	peerID := setupTestPeering(t, b.store, name, idx)
	return idx, peerID
}
// ensureConfigEntry normalizes and validates entry, writes it to the store at
// the next index, and returns that index. Any failure aborts the test.
func (b *testSubscriptionBackend) ensureConfigEntry(t *testing.T, entry structs.ConfigEntry) uint64 {
	require.NoError(t, entry.Normalize())
	require.NoError(t, entry.Validate())

	b.lastIdx++
	idx := b.lastIdx
	require.NoError(t, b.store.EnsureConfigEntry(idx, entry))
	return idx
}
// ensureNode writes node to the store at the next index and returns that
// index. A write failure aborts the test.
func (b *testSubscriptionBackend) ensureNode(t *testing.T, node *structs.Node) uint64 {
	b.lastIdx++
	idx := b.lastIdx
	require.NoError(t, b.store.EnsureNode(idx, node))
	return idx
}
// ensureService writes svc for the named node to the store at the next index
// and returns that index. A write failure aborts the test.
func (b *testSubscriptionBackend) ensureService(t *testing.T, node string, svc *structs.NodeService) uint64 {
	b.lastIdx++
	idx := b.lastIdx
	require.NoError(t, b.store.EnsureService(idx, node, svc))
	return idx
}
// ensureCheck writes the health check hc to the store at the next index and
// returns that index. A write failure aborts the test.
func (b *testSubscriptionBackend) ensureCheck(t *testing.T, hc *structs.HealthCheck) uint64 {
	b.lastIdx++
	idx := b.lastIdx
	require.NoError(t, b.store.EnsureCheck(idx, hc))
	return idx
}
// deleteService removes the service instance serviceID from node nodeName at
// the next index and returns that index. A failure aborts the test.
func (b *testSubscriptionBackend) deleteService(t *testing.T, nodeName, serviceID string) uint64 {
	b.lastIdx++
	idx := b.lastIdx
	require.NoError(t, b.store.DeleteService(idx, nodeName, serviceID, nil, ""))
	return idx
}
// setupTestPeering writes a peering called name to store at the given index,
// reads it back, and returns the ID of the stored peering. Any failure, or a
// missing peering on read-back, aborts the test.
func setupTestPeering(t *testing.T, store *state.Store, name string, index uint64) string {
	writeErr := store.PeeringWrite(index, &pbpeering.Peering{Name: name})
	require.NoError(t, writeErr)

	_, peering, readErr := store.PeeringRead(nil, state.Query{Value: name})
	require.NoError(t, readErr)
	require.NotNil(t, peering)

	return peering.ID
}
func newStateStore(t *testing.T, publisher *stream.EventPublisher) *state.Store {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
gc, err := state.NewTombstoneGC(time.Second, time.Millisecond)
require.NoError(t, err)
store := state.NewStateStoreWithEventPublisher(gc, publisher)
require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealth, store.ServiceHealthSnapshot))
require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealthConnect, store.ServiceHealthSnapshot))
go publisher.Run(context.Background())
go publisher.Run(ctx)
return store
}
// expectEvents reads events from ch and verifies them with checkFns.
//
// With no checkFns it asserts that nothing arrives on ch within 100ms.
// Otherwise it collects exactly len(checkFns) events (failing after 10s),
// asserts that no further event arrives within 100ms, sorts the collected
// events by CorrelationID, and then applies checkFns[i] to the i-th event in
// that sorted order.
func expectEvents(
	t *testing.T,
	ch <-chan cache.UpdateEvent,
	checkFns ...func(t *testing.T, got cache.UpdateEvent),
) {
	t.Helper()

	const (
		quietPeriod = 100 * time.Millisecond
		timeout     = 10 * time.Second
	)

	want := len(checkFns)

	if want == 0 {
		// No updates should be received.
		select {
		case <-time.After(quietPeriod):
			// Expect this to fire
		case <-ch:
			t.Fatalf("received unexpected update")
		}
		return
	}

	received := make([]cache.UpdateEvent, 0, want)

	// Gather the expected number of events, bounded by an overall deadline.
	deadline := time.After(timeout)
	for len(received) < want {
		select {
		case evt := <-ch:
			received = append(received, evt)
		case <-deadline:
			t.Fatalf("timed out with %d of %d events after %v", len(received), want, timeout)
		}
	}

	// Make sure nothing extra trails behind the expected events.
	select {
	case evt := <-ch:
		t.Fatalf("expected only %d events but got more; prev %+v; next %+v;", want, received, evt)
	case <-time.After(quietPeriod):
	}

	require.Len(t, received, want)

	// Arrival order is not asserted; checks are matched by CorrelationID order.
	sort.SliceStable(received, func(a, b int) bool {
		return received[a].CorrelationID < received[b].CorrelationID
	})

	for i, check := range checkFns {
		check(t, received[i])
	}
}
// checkEvent asserts that got carries the given correlation ID and an
// IndexedCheckServiceNodes result with Index 0 and exactly expectNodes nodes.
//
// serviceKindPairs must hold two strings per expected node, in node order: the
// expected service name followed by the expected service kind.
func checkEvent(
	t *testing.T,
	got cache.UpdateEvent,
	correlationID string,
	expectNodes int,
	serviceKindPairs ...string) {
	t.Helper()

	require.True(t, len(serviceKindPairs) == 2*expectNodes, "sanity check")
	require.Equal(t, correlationID, got.CorrelationID)

	res := got.Result.(*pbservice.IndexedCheckServiceNodes)
	require.Equal(t, uint64(0), res.Index)
	require.Len(t, res.Nodes, expectNodes)

	// The length assertion above guarantees one name/kind pair per node.
	for i, node := range res.Nodes {
		wantName, wantKind := serviceKindPairs[2*i], serviceKindPairs[2*i+1]
		require.Equal(t, wantName, node.Service.Service)
		require.Equal(t, wantKind, node.Service.Kind)
	}
}

165
agent/rpc/peering/subscription_state.go

@ -0,0 +1,165 @@
package peering
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"strings"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbservice"
)
// subscriptionState is a collection of working state tied to a peerID subscription.
type subscriptionState struct {
	// partition is immutable
	partition string

	// plain data

	// exportList is treated as "not yet known" while nil: discovery chain
	// events are not emitted until it is set.
	exportList *structs.ExportedServiceList

	// watchedServices maps each exported service currently being watched to
	// the cancel function of its notification context.
	watchedServices map[structs.ServiceName]context.CancelFunc
	// connectServices is the set of discovery chain names currently tracked
	// for this subscription.
	connectServices map[structs.ServiceName]struct{}

	// eventVersions is a duplicate event suppression system keyed by the "id"
	// not the "correlationID"
	//
	// The value is the version (content hash) of the pending event recorded
	// for that id.
	eventVersions map[string]string

	// meshGateway holds the mesh gateway instances that synthetic discovery
	// chain proxies are derived from. It is treated as "not yet known" while
	// nil.
	meshGateway *pbservice.IndexedCheckServiceNodes

	// updateCh is an internal implementation detail for the machinery of the
	// manager.
	updateCh chan<- cache.UpdateEvent

	// publicUpdateCh is the channel the manager uses to pass data back to the
	// caller.
	publicUpdateCh chan<- cache.UpdateEvent
}
// newSubscriptionState returns a subscriptionState for the given partition
// with all of its maps initialized and empty. The export list, mesh gateway
// data and channels are left unset.
func newSubscriptionState(partition string) *subscriptionState {
	s := &subscriptionState{partition: partition}
	s.watchedServices = make(map[structs.ServiceName]context.CancelFunc)
	s.connectServices = make(map[structs.ServiceName]struct{})
	s.eventVersions = make(map[string]string)
	return s
}
// sendPendingEvents delivers the events queued in pending to publicUpdateCh,
// in order, skipping any event whose version matches the version last recorded
// for the same event ID in eventVersions.
//
// An event's version is recorded only once the event has actually been handed
// to publicUpdateCh, so an event that was never delivered is not later
// mistaken for a duplicate. If ctx is cancelled while waiting to send, the
// method returns immediately and leaves the remaining events unsent.
func (s *subscriptionState) sendPendingEvents(
	ctx context.Context,
	logger hclog.Logger,
	pending *pendingPayload,
) {
	for _, pendingEvt := range pending.Events {
		cID := pendingEvt.CorrelationID
		newVersion := pendingEvt.Version

		oldVersion, ok := s.eventVersions[pendingEvt.ID]
		if ok && newVersion == oldVersion {
			logger.Trace("skipping send of duplicate public event", "correlationID", cID)
			continue
		}

		logger.Trace("sending public event", "correlationID", cID)

		evt := cache.UpdateEvent{
			CorrelationID: cID,
			Result:        pendingEvt.Result,
		}

		select {
		case s.publicUpdateCh <- evt:
			// Delivered: remember the version for duplicate suppression.
			s.eventVersions[pendingEvt.ID] = newVersion
		case <-ctx.Done():
			// Nobody is listening any more; stop instead of racing through
			// the rest of the queue.
			return
		}
	}
}
// cleanupEventVersions drops entries from eventVersions that no longer refer
// to anything being tracked.
//
// The mesh gateway entry is always kept. A service entry is kept while the
// service is in watchedServices, and a discovery chain entry is kept while the
// chain is in connectServices. Every other entry is deleted.
func (s *subscriptionState) cleanupEventVersions(logger hclog.Logger) {
	// stillReferenced reports whether the event id belongs to something the
	// subscription is still tracking.
	stillReferenced := func(id string) bool {
		if id == meshGatewayPayloadID {
			return true
		}
		if strings.HasPrefix(id, servicePayloadIDPrefix) {
			sn := structs.ServiceNameFromString(strings.TrimPrefix(id, servicePayloadIDPrefix))
			_, watched := s.watchedServices[sn]
			return watched
		}
		if strings.HasPrefix(id, discoveryChainPayloadIDPrefix) {
			sn := structs.ServiceNameFromString(strings.TrimPrefix(id, discoveryChainPayloadIDPrefix))
			_, tracked := s.connectServices[sn]
			return tracked
		}
		return false
	}

	for id := range s.eventVersions {
		if stillReferenced(id) {
			continue
		}
		logger.Trace("cleaning up unreferenced event id version", "id", id)
		delete(s.eventVersions, id)
	}
}
// pendingPayload accumulates events that are waiting to be sent.
type pendingPayload struct {
	// Events holds the queued events in the order they were added.
	Events []pendingEvent
}
// pendingEvent is a single queued event together with the metadata used for
// duplicate suppression.
type pendingEvent struct {
	// ID is the key under which the event's version is tracked for duplicate
	// suppression.
	ID string
	// CorrelationID is the correlation ID placed on the outgoing update event.
	CorrelationID string
	// Result is the protobuf payload of the event.
	Result proto.Message
	// Version is a hash of Result; an event whose Version equals the one last
	// recorded for its ID is skipped.
	Version string
}
// Event IDs (and ID prefixes) used as keys for duplicate event suppression.
const (
	// meshGatewayPayloadID is the event ID for the mesh gateway payload.
	meshGatewayPayloadID = "mesh-gateway"
	// servicePayloadIDPrefix is prepended to a service name to form the event
	// ID for an exported service.
	servicePayloadIDPrefix = "service:"
	// discoveryChainPayloadIDPrefix is prepended to a discovery chain name to
	// form the event ID for that chain.
	discoveryChainPayloadIDPrefix = "chain:"
)
// Add queues an event with the given id and correlationID.
//
// raw must be a proto.Message; its hash becomes the event's version. An error
// is returned, and nothing is queued, if raw has the wrong type or cannot be
// hashed.
func (p *pendingPayload) Add(id string, correlationID string, raw interface{}) error {
	msg, isProto := raw.(proto.Message)
	if !isProto {
		return fmt.Errorf("invalid type for %q event: %T", correlationID, raw)
	}

	digest, err := hashProtobuf(msg)
	if err != nil {
		return fmt.Errorf("error hashing %q event: %w", correlationID, err)
	}

	evt := pendingEvent{
		ID:            id,
		CorrelationID: correlationID,
		Result:        msg,
		Version:       digest,
	}
	p.Events = append(p.Events, evt)
	return nil
}
// hashProtobuf returns the hex-encoded SHA-256 digest of res, marshaled with
// deterministic encoding enabled. It returns an error if marshaling fails.
func hashProtobuf(res proto.Message) (string, error) {
	enc := proto.NewBuffer(nil)
	enc.SetDeterministic(true)

	if err := enc.Marshal(res); err != nil {
		return "", err
	}

	digest := sha256.Sum256(enc.Bytes())
	return hex.EncodeToString(digest[:]), nil
}

200
agent/rpc/peering/subscription_state_test.go

@ -0,0 +1,200 @@
package peering
import (
"context"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/sdk/testutil"
)
// TestSubscriptionState_Events pushes a series of pending payloads through
// sendPendingEvents and checks which of them are published on publicUpdateCh.
//
// All steps share one subscriptionState, so they depend on each other and
// run in order via testutil.RunStep: a payload that repeats what an earlier
// step already sent for the same ID is expected to be omitted, while new or
// changed content is expected to be published.
func TestSubscriptionState_Events(t *testing.T) {
	var (
		logger    = hclog.NewNullLogger()
		partition = acl.DefaultEnterpriseMeta().PartitionOrEmpty()
		state     = newSubscriptionState(partition)
	)

	// Fixtures shared by the steps below.
	meshNode1 := &pbservice.CheckServiceNode{
		Node:    &pbservice.Node{Node: "foo"},
		Service: &pbservice.NodeService{ID: "mgw-1", Service: "mgw", Kind: "mesh-gateway"},
	}
	meshNode2 := &pbservice.CheckServiceNode{
		Node:    &pbservice.Node{Node: "bar"},
		Service: &pbservice.NodeService{ID: "mgw-2", Service: "mgw", Kind: "mesh-gateway"},
	}
	webNode1 := &pbservice.CheckServiceNode{
		Node:    &pbservice.Node{Node: "zim"},
		Service: &pbservice.NodeService{ID: "web-1", Service: "web"},
	}
	webSN := structs.NewServiceName("web", nil)

	meshCorrelationID := subMeshGateway + partition
	webCorrelationID := subExportedService + webSN.String()

	// snapshot wraps fresh clones of the given nodes so that every step hands
	// the state its own copies of the fixtures.
	snapshot := func(nodes ...*pbservice.CheckServiceNode) *pbservice.IndexedCheckServiceNodes {
		cloned := make([]*pbservice.CheckServiceNode, 0, len(nodes))
		for _, node := range nodes {
			cloned = append(cloned, proto.Clone(node).(*pbservice.CheckServiceNode))
		}
		return &pbservice.IndexedCheckServiceNodes{Nodes: cloned}
	}

	// flush points the state at a fresh update channel, sends the pending
	// payload from a goroutine, closes the channel once sending returns, and
	// collects everything that was published.
	flush := func(t *testing.T, pending *pendingPayload) []cache.UpdateEvent {
		updates := make(chan cache.UpdateEvent, 1)
		state.publicUpdateCh = updates
		go func() {
			state.sendPendingEvents(context.Background(), logger, pending)
			close(updates)
		}()
		return drainEvents(t, updates)
	}

	testutil.RunStep(t, "empty", func(t *testing.T) {
		got := flush(t, &pendingPayload{})
		require.Len(t, got, 0)
	})

	testutil.RunStep(t, "one", func(t *testing.T) {
		pending := &pendingPayload{}
		require.NoError(t, pending.Add(
			meshGatewayPayloadID,
			meshCorrelationID,
			snapshot(meshNode1),
		))

		got := flush(t, pending)
		require.Len(t, got, 1)

		evt := got[0]
		require.Equal(t, subMeshGateway+partition, evt.CorrelationID)
		require.Len(t, evt.Result.(*pbservice.IndexedCheckServiceNodes).Nodes, 1)
	})

	testutil.RunStep(t, "a duplicate is omitted", func(t *testing.T) {
		pending := &pendingPayload{}
		require.NoError(t, pending.Add(
			meshGatewayPayloadID,
			meshCorrelationID,
			snapshot(meshNode1),
		))

		got := flush(t, pending)
		require.Len(t, got, 0)
	})

	testutil.RunStep(t, "a duplicate is omitted even if mixed", func(t *testing.T) {
		pending := &pendingPayload{}
		require.NoError(t, pending.Add(
			meshGatewayPayloadID,
			meshCorrelationID,
			snapshot(meshNode1),
		))
		require.NoError(t, pending.Add(
			servicePayloadIDPrefix+webSN.String(),
			webCorrelationID,
			snapshot(webNode1),
		))

		got := flush(t, pending)
		require.Len(t, got, 1)

		evt := got[0]
		require.Equal(t, subExportedService+webSN.String(), evt.CorrelationID)
		require.Len(t, evt.Result.(*pbservice.IndexedCheckServiceNodes).Nodes, 1)
	})

	testutil.RunStep(t, "an update to an existing item is published", func(t *testing.T) {
		pending := &pendingPayload{}
		require.NoError(t, pending.Add(
			meshGatewayPayloadID,
			meshCorrelationID,
			snapshot(meshNode1, meshNode2),
		))

		got := flush(t, pending)
		require.Len(t, got, 1)

		evt := got[0]
		require.Equal(t, subMeshGateway+partition, evt.CorrelationID)
		require.Len(t, evt.Result.(*pbservice.IndexedCheckServiceNodes).Nodes, 2)
	})
}
// drainEvents collects every event received on ch until the channel is closed
// and returns them in arrival order (nil if none arrived).
//
// The test is failed with t.Fatalf if neither an event nor the close arrives
// within 100ms; the timeout is re-armed for each receive.
func drainEvents(t *testing.T, ch <-chan cache.UpdateEvent) []cache.UpdateEvent {
	var collected []cache.UpdateEvent
	for {
		timeout := time.After(100 * time.Millisecond)
		select {
		case <-timeout:
			t.Fatalf("channel did not close in time")
		case evt, open := <-ch:
			if !open {
				return collected
			}
			collected = append(collected, evt)
		}
	}
}
// testNewSubscriptionState builds a subscriptionState for partition whose
// publicUpdateCh is a new channel with a buffer of one, and returns the state
// together with that channel so a test can read what gets published.
func testNewSubscriptionState(partition string) (
	*subscriptionState,
	chan cache.UpdateEvent,
) {
	updateCh := make(chan cache.UpdateEvent, 1)

	st := newSubscriptionState(partition)
	st.publicUpdateCh = updateCh

	return st, updateCh
}

18
agent/rpc/peering/subscription_view.go

@ -24,12 +24,10 @@ type exportedServiceRequest struct {
sub Subscriber
}
func newExportedServiceRequest(logger hclog.Logger, svc structs.ServiceName, sub Subscriber) *exportedServiceRequest {
func newExportedStandardServiceRequest(logger hclog.Logger, svc structs.ServiceName, sub Subscriber) *exportedServiceRequest {
req := structs.ServiceSpecificRequest{
// TODO(peering): Need to subscribe to both Connect and not
Connect: false,
ServiceName: svc.Name,
Connect: false,
EnterpriseMeta: svc.EnterpriseMeta,
}
return &exportedServiceRequest{
@ -46,10 +44,12 @@ func (e *exportedServiceRequest) CacheInfo() cache.RequestInfo {
// NewMaterializer implements submatview.Request
func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, error) {
if e.req.Connect {
return nil, fmt.Errorf("connect views are not supported")
}
reqFn := func(index uint64) *pbsubscribe.SubscribeRequest {
// TODO(peering): We need to be able to receive both connect proxies and typical service instances for a given name.
// Using the Topic_ServiceHealth will ignore proxies unless the ServiceName is a proxy name.
r := &pbsubscribe.SubscribeRequest{
return &pbsubscribe.SubscribeRequest{
// Using the Topic_ServiceHealth will ignore proxies unless the ServiceName is a proxy name.
Topic: pbsubscribe.Topic_ServiceHealth,
Key: e.req.ServiceName,
Token: e.req.Token,
@ -58,10 +58,6 @@ func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, err
Namespace: e.req.EnterpriseMeta.NamespaceOrEmpty(),
Partition: e.req.EnterpriseMeta.PartitionOrEmpty(),
}
if e.req.Connect {
r.Topic = pbsubscribe.Topic_ServiceHealthConnect
}
return r
}
deps := submatview.Deps{
View: newExportedServicesView(),

105
agent/rpc/peering/subscription_view_test.go

@ -2,6 +2,7 @@ package peering
import (
"context"
"fmt"
"math/rand"
"sort"
"sync"
@ -38,87 +39,36 @@ func TestExportedServiceSubscription(t *testing.T) {
apiSN := structs.NewServiceName("api", nil)
webSN := structs.NewServiceName("web", nil)
newRegisterHealthEvent := func(id, service string) stream.Event {
return stream.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
ID: id,
Service: service,
},
},
},
}
}
// List of updates to the state store:
// - api: {register api-1, register api-2, register api-3}
// - web: {register web-1, deregister web-1, register web-2}
events := []map[string]stream.Event{
{
apiSN.String(): stream.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
ID: "api-1",
Service: "api",
},
},
},
},
webSN.String(): stream.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
ID: "web-1",
Service: "web",
},
},
},
},
apiSN.String(): newRegisterHealthEvent("api-1", "api"),
webSN.String(): newRegisterHealthEvent("web-1", "web"),
},
{
apiSN.String(): stream.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
ID: "api-2",
Service: "api",
},
},
},
},
webSN.String(): stream.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Deregister,
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
ID: "web-1",
Service: "web",
},
},
},
},
apiSN.String(): newRegisterHealthEvent("api-2", "api"),
webSN.String(): newRegisterHealthEvent("web-1", "web"),
},
{
apiSN.String(): stream.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
ID: "api-3",
Service: "api",
},
},
},
},
webSN.String(): stream.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
ID: "web-2",
Service: "web",
},
},
},
},
apiSN.String(): newRegisterHealthEvent("api-3", "api"),
webSN.String(): newRegisterHealthEvent("web-2", "web"),
},
}
@ -224,9 +174,10 @@ func (s *store) simulateUpdates(ctx context.Context, events []map[string]stream.
switch payload.Op {
case pbsubscribe.CatalogOp_Register:
svcState.current[payload.Value.Service.ID] = payload.Value
default:
// If not a registration it must be a deregistration:
case pbsubscribe.CatalogOp_Deregister:
delete(svcState.current, payload.Value.Service.ID)
default:
panic(fmt.Sprintf("unable to handle op type %v", payload.Op))
}
svcState.idsByIndex[idx] = serviceIDsFromMap(svcState.current)
@ -305,7 +256,11 @@ func (c *consumer) consume(ctx context.Context, service string, countExpected in
updateCh := make(chan cache.UpdateEvent, 10)
group.Go(func() error {
sr := newExportedServiceRequest(hclog.New(nil), structs.NewServiceName(service, nil), c.publisher)
sr := newExportedStandardServiceRequest(
hclog.New(nil),
structs.NewServiceName(service, nil),
c.publisher,
)
return c.viewStore.Notify(gctx, sr, "", updateCh)
})
group.Go(func() error {

27
agent/structs/peering.go

@ -13,3 +13,30 @@ type PeeredService struct {
Name ServiceName
PeerName string
}
// ExportedServiceList groups exported service names by how they are exposed.
//
// NOTE: this is not serialized via msgpack so it can be changed without concern.
type ExportedServiceList struct {
// Services is a list of exported services that apply to both standard
// service discovery and service mesh.
Services []ServiceName
// DiscoChains is a list of exported services that ONLY apply to service mesh.
DiscoChains []ServiceName
}
// ListAllDiscoveryChains returns the set of all discovery chain names in the
// list, i.e. the de-duplicated union of Services and DiscoChains.
//
// The result is always a non-nil map; a nil receiver yields an empty one.
func (list *ExportedServiceList) ListAllDiscoveryChains() map[ServiceName]struct{} {
	union := make(map[ServiceName]struct{})
	if list != nil {
		for _, names := range [][]ServiceName{list.Services, list.DiscoChains} {
			for _, name := range names {
				union[name] = struct{}{}
			}
		}
	}
	return union
}

12
lib/maps/maps.go

@ -0,0 +1,12 @@
package maps
// SliceOfKeys returns the keys of m as a slice, in unspecified order (map
// iteration order). A nil or empty map yields a nil slice.
func SliceOfKeys[K comparable, V any](m map[K]V) []K {
	n := len(m)
	if n == 0 {
		return nil
	}

	keys := make([]K, n)
	next := 0
	for key := range m {
		keys[next] = key
		next++
	}
	return keys
}

41
lib/maps/maps_test.go

@ -0,0 +1,41 @@
package maps
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestSliceOfKeys checks SliceOfKeys against several key/value type
// combinations: an empty map must yield a nil slice, and a populated map must
// yield exactly its keys, in any order.
func TestSliceOfKeys(t *testing.T) {
	type blah struct {
		V string
	}
	type id struct {
		Name string
	}

	t.Run("string to int", func(t *testing.T) {
		counts := map[string]int{}
		require.Equal(t, []string(nil), SliceOfKeys(counts))

		counts["foo"], counts["bar"] = 5, 6
		require.ElementsMatch(t, []string{"foo", "bar"}, SliceOfKeys(counts))
	})

	t.Run("int to struct", func(t *testing.T) {
		byNumber := map[int]blah{}
		require.Equal(t, []int(nil), SliceOfKeys(byNumber))

		byNumber[5], byNumber[6] = blah{V: "foo"}, blah{V: "bar"}
		require.ElementsMatch(t, []int{5, 6}, SliceOfKeys(byNumber))
	})

	t.Run("struct to struct pointer", func(t *testing.T) {
		byID := map[id]*blah{}
		require.Equal(t, []id(nil), SliceOfKeys(byID))

		byID[id{Name: "foo"}] = &blah{V: "oof"}
		byID[id{Name: "bar"}] = &blah{V: "rab"}
		require.ElementsMatch(t, []id{{Name: "foo"}, {Name: "bar"}}, SliceOfKeys(byID))
	})
}
Loading…
Cancel
Save