Fix issue with peer stream node cleanup.

This commit encompasses a few problems that are closely related due to their
proximity in the code.

1. The peerstream utilizes node IDs in several locations to determine which
nodes / services / checks should be cleaned up or created. While VM deployments
with agents will likely always have a node ID, agentless uses synthetic nodes
and does not populate the field. This means that for consul-k8s deployments, all
services were likely bundled together into the same synthetic node in some code
paths (but not all), resulting in strange behavior. The Node.Node field should
be used instead as a unique identifier, as it should always be populated.

2. The peerstream cleanup process for unused nodes uses an incorrect query for
node deregistration. This query is NOT namespace aware and results in the node
(and corresponding services) being deregistered prematurely whenever it has zero
default-namespace services and 1+ non-default-namespace services registered on
it. This issue is tricky to find due to the incorrect logic mentioned in #1,
combined with the fact that the affected services must be co-located on the same
node as the currently deregistering service for this to be encountered.

3. The stream tracker did not understand differences between services in
different namespaces and could therefore report incorrect numbers. It was
updated to utilize the full service name to avoid conflicts and return proper
results.
pull/17235/head
Derek Menteer 2023-05-08 08:16:28 -05:00
parent 166d7a39e8
commit 61a281a4d8
4 changed files with 198 additions and 168 deletions

View File

@ -13,6 +13,7 @@ import (
newproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
@ -341,7 +342,7 @@ func (s *Server) handleUpdateService(
for _, nodeSnap := range snap.Nodes {
// First register the node - skip the unchanged ones
changed := true
if storedNode, ok := storedNodesMap[nodeSnap.Node.ID]; ok {
if storedNode, ok := storedNodesMap[nodeSnap.Node.Node]; ok {
if storedNode.IsSame(nodeSnap.Node) {
changed = false
}
@ -357,7 +358,7 @@ func (s *Server) handleUpdateService(
// Then register all services on that node - skip the unchanged ones
for _, svcSnap := range nodeSnap.Services {
changed = true
if storedSvcInst, ok := storedSvcInstMap[makeNodeSvcInstID(nodeSnap.Node.ID, svcSnap.Service.ID)]; ok {
if storedSvcInst, ok := storedSvcInstMap[makeNodeSvcInstID(nodeSnap.Node.Node, svcSnap.Service.ID)]; ok {
if storedSvcInst.IsSame(svcSnap.Service) {
changed = false
}
@ -377,7 +378,7 @@ func (s *Server) handleUpdateService(
for _, svcSnap := range nodeSnap.Services {
for _, c := range svcSnap.Checks {
changed := true
if chk, ok := storedChecksMap[makeNodeCheckID(nodeSnap.Node.ID, svcSnap.Service.ID, c.CheckID)]; ok {
if chk, ok := storedChecksMap[makeNodeCheckID(nodeSnap.Node.Node, svcSnap.Service.ID, c.CheckID)]; ok {
if chk.IsSame(c) {
changed = false
}
@ -515,8 +516,10 @@ func (s *Server) handleUpdateService(
// Delete any nodes that do not have any other services registered on them.
for node := range unusedNodes {
nodeMeta := structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault())
_, ns, err := s.GetStore().NodeServices(nil, node, nodeMeta, peerName)
// The wildcard is used here so that all services, regardless of namespace are returned
// by the following query. Without this, the node might accidentally be cleaned up early.
wildcardNSMeta := acl.NewEnterpriseMetaWithPartition(sn.PartitionOrDefault(), acl.WildcardName)
_, ns, err := s.GetStore().NodeServiceList(nil, node, &wildcardNSMeta, peerName)
if err != nil {
return fmt.Errorf("failed to query services on node: %w", err)
}
@ -529,10 +532,10 @@ func (s *Server) handleUpdateService(
err = s.Backend.CatalogDeregister(&structs.DeregisterRequest{
Node: node,
PeerName: peerName,
EnterpriseMeta: *nodeMeta,
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault()),
})
if err != nil {
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s", nodeMeta.PartitionOrDefault(), peerName, node)
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s", sn.PartitionOrDefault(), peerName, node)
return fmt.Errorf("failed to deregister node %q: %w", ident, err)
}
}
@ -635,31 +638,35 @@ type nodeCheckIdentity struct {
checkID string
}
func makeNodeSvcInstID(nodeID types.NodeID, serviceID string) nodeSvcInstIdentity {
func makeNodeSvcInstID(node string, serviceID string) nodeSvcInstIdentity {
return nodeSvcInstIdentity{
nodeID: string(nodeID),
nodeID: node,
serviceID: serviceID,
}
}
func makeNodeCheckID(nodeID types.NodeID, serviceID string, checkID types.CheckID) nodeCheckIdentity {
func makeNodeCheckID(node string, serviceID string, checkID types.CheckID) nodeCheckIdentity {
return nodeCheckIdentity{
serviceID: serviceID,
checkID: string(checkID),
nodeID: string(nodeID),
nodeID: node,
}
}
func buildStoredMap(storedInstances structs.CheckServiceNodes) (map[types.NodeID]*structs.Node, map[nodeSvcInstIdentity]*structs.NodeService, map[nodeCheckIdentity]*structs.HealthCheck) {
nodesMap := map[types.NodeID]*structs.Node{}
func buildStoredMap(storedInstances structs.CheckServiceNodes) (
map[string]*structs.Node,
map[nodeSvcInstIdentity]*structs.NodeService,
map[nodeCheckIdentity]*structs.HealthCheck,
) {
nodesMap := map[string]*structs.Node{}
svcInstMap := map[nodeSvcInstIdentity]*structs.NodeService{}
checksMap := map[nodeCheckIdentity]*structs.HealthCheck{}
for _, csn := range storedInstances {
nodesMap[csn.Node.ID] = csn.Node
svcInstMap[makeNodeSvcInstID(csn.Node.ID, csn.Service.ID)] = csn.Service
nodesMap[csn.Node.Node] = csn.Node
svcInstMap[makeNodeSvcInstID(csn.Node.Node, csn.Service.ID)] = csn.Service
for _, chk := range csn.Checks {
checksMap[makeNodeCheckID(csn.Node.ID, csn.Service.ID, chk.CheckID)] = chk
checksMap[makeNodeCheckID(csn.Node.Node, csn.Service.ID, chk.CheckID)] = chk
}
}
return nodesMap, svcInstMap, checksMap

View File

@ -122,7 +122,7 @@ type StateStore interface {
ExportedServicesForPeer(ws memdb.WatchSet, peerID, dc string) (uint64, *structs.ExportedServiceList, error)
ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
NodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServices, error)
NodeServiceList(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServiceList, error)
CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error)
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
ServiceList(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.ServiceList, error)

View File

@ -1594,7 +1594,11 @@ func Test_ExportedServicesCount(t *testing.T) {
mst, err := srv.Tracker.Connected(peerID)
require.NoError(t, err)
services := []string{"web", "api", "mongo"}
services := []string{
structs.NewServiceName("web", nil).String(),
structs.NewServiceName("api", nil).String(),
structs.NewServiceName("mongo", nil).String(),
}
update := cache.UpdateEvent{
CorrelationID: subExportedServiceList,
Result: &pbpeerstream.ExportedServiceList{
@ -1938,36 +1942,30 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test
}
}
func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
srv, store := newTestServer(t, func(c *Config) {
backend := c.Backend.(*testStreamBackend)
backend.leader = func() bool {
return false
}
})
type testCase struct {
name string
seed []*structs.RegisterRequest
input *pbpeerstream.ExportedService
expect map[string]structs.CheckServiceNodes
exportedServices []string
}
peerName := "billing"
peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5"
remoteMeta := pbcommon.NewEnterpriseMetaFromStructs(*structs.DefaultEnterpriseMetaInPartition("billing-ap"))
// "api" service is imported from the billing-ap partition, corresponding to the billing peer.
// Locally it is stored to the default partition.
defaultMeta := *acl.DefaultEnterpriseMeta()
apiSN := structs.NewServiceName("api", &defaultMeta)
type PeeringProcessResponse_testCase struct {
name string
seed []*structs.RegisterRequest
inputServiceName structs.ServiceName
input *pbpeerstream.ExportedService
expect map[structs.ServiceName]structs.CheckServiceNodes
exportedServices []string
}
func processResponse_ExportedServiceUpdates(
t *testing.T,
srv *testServer,
store *state.Store,
localEntMeta acl.EnterpriseMeta,
peerName string,
tests []PeeringProcessResponse_testCase,
) {
// create a peering in the state store
peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5"
require.NoError(t, store.PeeringWrite(31, &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
ID: peerID,
Name: peerName,
ID: peerID,
Name: peerName,
Partition: localEntMeta.PartitionOrDefault(),
},
}))
@ -1975,7 +1973,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
mst, err := srv.Tracker.Connected(peerID)
require.NoError(t, err)
run := func(t *testing.T, tc testCase) {
run := func(t *testing.T, tc PeeringProcessResponse_testCase) {
// Seed the local catalog with some data to reconcile against.
// and increment the tracker's imported services count
var serviceNames []structs.ServiceName
@ -1989,14 +1987,14 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
in := &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLExportedService,
ResourceID: apiSN.String(),
ResourceID: tc.inputServiceName.String(),
Nonce: "1",
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: makeAnyPB(t, tc.input),
}
// Simulate an update arriving for billing/api.
_, err = srv.processResponse(peerName, acl.DefaultPartitionName, mst, in)
_, err = srv.processResponse(peerName, localEntMeta.PartitionOrDefault(), mst, in)
require.NoError(t, err)
if len(tc.exportedServices) > 0 {
@ -2009,63 +2007,81 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
}
// Simulate an update arriving for billing/api.
_, err = srv.processResponse(peerName, acl.DefaultPartitionName, mst, resp)
_, err = srv.processResponse(peerName, localEntMeta.PartitionOrDefault(), mst, resp)
require.NoError(t, err)
// Test the count and contents separately to ensure the count code path is hit.
require.Equal(t, mst.GetImportedServicesCount(), len(tc.exportedServices))
require.ElementsMatch(t, mst.ImportedServices, tc.exportedServices)
}
_, allServices, err := srv.GetStore().ServiceList(nil, &defaultMeta, peerName)
wildcardNS := acl.NewEnterpriseMetaWithPartition(localEntMeta.PartitionOrDefault(), acl.WildcardName)
_, allServices, err := srv.GetStore().ServiceList(nil, &wildcardNS, peerName)
require.NoError(t, err)
// This ensures that only services specified under tc.expect are stored. It includes
// all exported services plus their sidecar proxies.
for _, svc := range allServices {
_, ok := tc.expect[svc.Name]
_, ok := tc.expect[svc]
require.True(t, ok)
}
for svc, expect := range tc.expect {
t.Run(svc, func(t *testing.T) {
_, got, err := srv.GetStore().CheckServiceNodes(nil, svc, &defaultMeta, peerName)
t.Run(svc.String(), func(t *testing.T) {
_, got, err := srv.GetStore().CheckServiceNodes(nil, svc.Name, &svc.EnterpriseMeta, peerName)
require.NoError(t, err)
requireEqualInstances(t, expect, got)
})
}
}
tt := []testCase{
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
run(t, tc)
})
}
}
func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
peerName := "billing"
localEntMeta := *acl.DefaultEnterpriseMeta()
remoteMeta := *structs.DefaultEnterpriseMetaInPartition("billing-ap")
pbRemoteMeta := pbcommon.NewEnterpriseMetaFromStructs(remoteMeta)
apiLocalSN := structs.NewServiceName("api", &localEntMeta)
redisLocalSN := structs.NewServiceName("redis", &localEntMeta)
tests := []PeeringProcessResponse_testCase{
{
name: "upsert two service instances to the same node",
exportedServices: []string{"api"},
exportedServices: []string{apiLocalSN.String()},
inputServiceName: structs.NewServiceName("api", &remoteMeta),
input: &pbpeerstream.ExportedService{
Nodes: []*pbservice.CheckServiceNode{
{
Node: &pbservice.Node{
ID: "af913374-68ea-41e5-82e8-6ffd3dffc461",
Node: "node-foo",
Partition: remoteMeta.Partition,
Partition: pbRemoteMeta.Partition,
PeerName: peerName,
},
Service: &pbservice.NodeService{
ID: "api-1",
Service: "api",
EnterpriseMeta: remoteMeta,
EnterpriseMeta: pbRemoteMeta,
PeerName: peerName,
},
Checks: []*pbservice.HealthCheck{
{
CheckID: "node-foo-check",
Node: "node-foo",
EnterpriseMeta: remoteMeta,
EnterpriseMeta: pbRemoteMeta,
PeerName: peerName,
},
{
CheckID: "api-1-check",
ServiceID: "api-1",
Node: "node-foo",
EnterpriseMeta: remoteMeta,
EnterpriseMeta: pbRemoteMeta,
PeerName: peerName,
},
},
@ -2074,42 +2090,42 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
Node: &pbservice.Node{
ID: "af913374-68ea-41e5-82e8-6ffd3dffc461",
Node: "node-foo",
Partition: remoteMeta.Partition,
Partition: pbRemoteMeta.Partition,
PeerName: peerName,
},
Service: &pbservice.NodeService{
ID: "api-2",
Service: "api",
EnterpriseMeta: remoteMeta,
EnterpriseMeta: pbRemoteMeta,
PeerName: peerName,
},
Checks: []*pbservice.HealthCheck{
{
CheckID: "node-foo-check",
Node: "node-foo",
EnterpriseMeta: remoteMeta,
EnterpriseMeta: pbRemoteMeta,
PeerName: peerName,
},
{
CheckID: "api-2-check",
ServiceID: "api-2",
Node: "node-foo",
EnterpriseMeta: remoteMeta,
EnterpriseMeta: pbRemoteMeta,
PeerName: peerName,
},
},
},
},
},
expect: map[string]structs.CheckServiceNodes{
"api": {
expect: map[structs.ServiceName]structs.CheckServiceNodes{
structs.NewServiceName("api", &localEntMeta): {
{
Node: &structs.Node{
ID: "af913374-68ea-41e5-82e8-6ffd3dffc461",
Node: "node-foo",
// The remote billing-ap partition is overwritten for all resources with the local default.
Partition: defaultMeta.PartitionOrEmpty(),
Partition: localEntMeta.PartitionOrEmpty(),
// The name of the peer "billing" is attached as well.
PeerName: peerName,
@ -2117,21 +2133,21 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
Service: &structs.NodeService{
ID: "api-1",
Service: "api",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-foo-check",
Node: "node-foo",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
{
CheckID: "api-1-check",
ServiceID: "api-1",
Node: "node-foo",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
},
@ -2140,27 +2156,27 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
Node: &structs.Node{
ID: "af913374-68ea-41e5-82e8-6ffd3dffc461",
Node: "node-foo",
Partition: defaultMeta.PartitionOrEmpty(),
Partition: localEntMeta.PartitionOrEmpty(),
PeerName: peerName,
},
Service: &structs.NodeService{
ID: "api-2",
Service: "api",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-foo-check",
Node: "node-foo",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
{
CheckID: "api-2-check",
ServiceID: "api-2",
Node: "node-foo",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
},
@ -2170,7 +2186,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
},
{
name: "deleting a service with an empty exported service event",
exportedServices: []string{"api"},
exportedServices: []string{apiLocalSN.String()},
seed: []*structs.RegisterRequest{
{
ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"),
@ -2179,7 +2195,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
Service: &structs.NodeService{
ID: "api-2",
Service: "api",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: structs.HealthChecks{
@ -2197,41 +2213,43 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
},
},
},
input: &pbpeerstream.ExportedService{},
expect: map[string]structs.CheckServiceNodes{
"api": {},
inputServiceName: structs.NewServiceName("api", &remoteMeta),
input: &pbpeerstream.ExportedService{},
expect: map[structs.ServiceName]structs.CheckServiceNodes{
structs.NewServiceName("api", &localEntMeta): {},
},
},
{
name: "upsert two service instances to different nodes",
exportedServices: []string{"api"},
exportedServices: []string{apiLocalSN.String()},
inputServiceName: structs.NewServiceName("api", &remoteMeta),
input: &pbpeerstream.ExportedService{
Nodes: []*pbservice.CheckServiceNode{
{
Node: &pbservice.Node{
ID: "af913374-68ea-41e5-82e8-6ffd3dffc461",
Node: "node-foo",
Partition: remoteMeta.Partition,
Partition: pbRemoteMeta.Partition,
PeerName: peerName,
},
Service: &pbservice.NodeService{
ID: "api-1",
Service: "api",
EnterpriseMeta: remoteMeta,
EnterpriseMeta: pbRemoteMeta,
PeerName: peerName,
},
Checks: []*pbservice.HealthCheck{
{
CheckID: "node-foo-check",
Node: "node-foo",
EnterpriseMeta: remoteMeta,
EnterpriseMeta: pbRemoteMeta,
PeerName: peerName,
},
{
CheckID: "api-1-check",
ServiceID: "api-1",
Node: "node-foo",
EnterpriseMeta: remoteMeta,
EnterpriseMeta: pbRemoteMeta,
PeerName: peerName,
},
},
@ -2240,60 +2258,60 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
Node: &pbservice.Node{
ID: "c0f97de9-4e1b-4e80-a1c6-cd8725835ab2",
Node: "node-bar",
Partition: remoteMeta.Partition,
Partition: pbRemoteMeta.Partition,
PeerName: peerName,
},
Service: &pbservice.NodeService{
ID: "api-2",
Service: "api",
EnterpriseMeta: remoteMeta,
EnterpriseMeta: pbRemoteMeta,
PeerName: peerName,
},
Checks: []*pbservice.HealthCheck{
{
CheckID: "node-bar-check",
Node: "node-bar",
EnterpriseMeta: remoteMeta,
EnterpriseMeta: pbRemoteMeta,
PeerName: peerName,
},
{
CheckID: "api-2-check",
ServiceID: "api-2",
Node: "node-bar",
EnterpriseMeta: remoteMeta,
EnterpriseMeta: pbRemoteMeta,
PeerName: peerName,
},
},
},
},
},
expect: map[string]structs.CheckServiceNodes{
"api": {
expect: map[structs.ServiceName]structs.CheckServiceNodes{
structs.NewServiceName("api", &localEntMeta): {
{
Node: &structs.Node{
ID: "c0f97de9-4e1b-4e80-a1c6-cd8725835ab2",
Node: "node-bar",
Partition: defaultMeta.PartitionOrEmpty(),
Partition: localEntMeta.PartitionOrEmpty(),
PeerName: peerName,
},
Service: &structs.NodeService{
ID: "api-2",
Service: "api",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-bar-check",
Node: "node-bar",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
{
CheckID: "api-2-check",
ServiceID: "api-2",
Node: "node-bar",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
},
@ -2304,7 +2322,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
Node: "node-foo",
// The remote billing-ap partition is overwritten for all resources with the local default.
Partition: defaultMeta.PartitionOrEmpty(),
Partition: localEntMeta.PartitionOrEmpty(),
// The name of the peer "billing" is attached as well.
PeerName: peerName,
@ -2312,21 +2330,21 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
Service: &structs.NodeService{
ID: "api-1",
Service: "api",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-foo-check",
Node: "node-foo",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
{
CheckID: "api-1-check",
ServiceID: "api-1",
Node: "node-foo",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
},
@ -2336,7 +2354,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
},
{
name: "deleting one service name from a node does not delete other service names",
exportedServices: []string{"api", "redis"},
exportedServices: []string{apiLocalSN.String(), redisLocalSN.String()},
seed: []*structs.RegisterRequest{
{
ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"),
@ -2345,7 +2363,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
Service: &structs.NodeService{
ID: "redis-2",
Service: "redis",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: structs.HealthChecks{
@ -2369,7 +2387,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
Service: &structs.NodeService{
ID: "api-1",
Service: "api",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: structs.HealthChecks{
@ -2387,37 +2405,38 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
},
},
},
inputServiceName: structs.NewServiceName("api", &remoteMeta),
// Nil input is for the "api" service.
input: &pbpeerstream.ExportedService{},
expect: map[string]structs.CheckServiceNodes{
"api": {},
expect: map[structs.ServiceName]structs.CheckServiceNodes{
structs.NewServiceName("api", &localEntMeta): {},
// Existing redis service was not affected by deletion.
"redis": {
structs.NewServiceName("redis", &localEntMeta): {
{
Node: &structs.Node{
ID: "af913374-68ea-41e5-82e8-6ffd3dffc461",
Node: "node-foo",
Partition: defaultMeta.PartitionOrEmpty(),
Partition: localEntMeta.PartitionOrEmpty(),
PeerName: peerName,
},
Service: &structs.NodeService{
ID: "redis-2",
Service: "redis",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-foo-check",
Node: "node-foo",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
{
CheckID: "redis-2-check",
ServiceID: "redis-2",
Node: "node-foo",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
},
@ -2435,7 +2454,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
Service: &structs.NodeService{
ID: "redis-2",
Service: "redis",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: structs.HealthChecks{
@ -2459,7 +2478,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
Service: &structs.NodeService{
ID: "redis-2-sidecar-proxy",
Service: "redis-sidecar-proxy",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: structs.HealthChecks{
@ -2483,7 +2502,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
Service: &structs.NodeService{
ID: "api-1",
Service: "api",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: structs.HealthChecks{
@ -2507,7 +2526,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
Service: &structs.NodeService{
ID: "api-1-sidecar-proxy",
Service: "api-sidecar-proxy",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: structs.HealthChecks{
@ -2526,68 +2545,69 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
},
},
},
inputServiceName: structs.NewServiceName("api", &remoteMeta),
// Nil input is for the "api" service.
input: &pbpeerstream.ExportedService{},
exportedServices: []string{"redis"},
expect: map[string]structs.CheckServiceNodes{
exportedServices: []string{redisLocalSN.String()},
expect: map[structs.ServiceName]structs.CheckServiceNodes{
// Existing redis service was not affected by deletion.
"redis": {
structs.NewServiceName("redis", &localEntMeta): {
{
Node: &structs.Node{
ID: "af913374-68ea-41e5-82e8-6ffd3dffc461",
Node: "node-foo",
Partition: defaultMeta.PartitionOrEmpty(),
Partition: localEntMeta.PartitionOrEmpty(),
PeerName: peerName,
},
Service: &structs.NodeService{
ID: "redis-2",
Service: "redis",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-foo-check",
Node: "node-foo",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
{
CheckID: "redis-2-check",
ServiceID: "redis-2",
Node: "node-foo",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
},
},
},
"redis-sidecar-proxy": {
structs.NewServiceName("redis-sidecar-proxy", &localEntMeta): {
{
Node: &structs.Node{
ID: "af913374-68ea-41e5-82e8-6ffd3dffc461",
Node: "node-foo",
Partition: defaultMeta.PartitionOrEmpty(),
Partition: localEntMeta.PartitionOrEmpty(),
PeerName: peerName,
},
Service: &structs.NodeService{
ID: "redis-2-sidecar-proxy",
Service: "redis-sidecar-proxy",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-foo-check",
Node: "node-foo",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
{
CheckID: "redis-2-sidecar-proxy-check",
ServiceID: "redis-2-sidecar-proxy",
Node: "node-foo",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
},
@ -2597,7 +2617,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
},
{
name: "service checks are cleaned up when not present in a response",
exportedServices: []string{"api"},
exportedServices: []string{apiLocalSN.String()},
seed: []*structs.RegisterRequest{
{
ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"),
@ -2606,7 +2626,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
Service: &structs.NodeService{
ID: "api-1",
Service: "api",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: structs.HealthChecks{
@ -2624,19 +2644,20 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
},
},
},
inputServiceName: structs.NewServiceName("api", &remoteMeta),
input: &pbpeerstream.ExportedService{
Nodes: []*pbservice.CheckServiceNode{
{
Node: &pbservice.Node{
ID: "af913374-68ea-41e5-82e8-6ffd3dffc461",
Node: "node-foo",
Partition: remoteMeta.Partition,
Partition: pbRemoteMeta.Partition,
PeerName: peerName,
},
Service: &pbservice.NodeService{
ID: "api-1",
Service: "api",
EnterpriseMeta: remoteMeta,
EnterpriseMeta: pbRemoteMeta,
PeerName: peerName,
},
Checks: []*pbservice.HealthCheck{
@ -2645,20 +2666,20 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
},
},
},
expect: map[string]structs.CheckServiceNodes{
expect: map[structs.ServiceName]structs.CheckServiceNodes{
// Service check should be gone
"api": {
structs.NewServiceName("api", &localEntMeta): {
{
Node: &structs.Node{
ID: "af913374-68ea-41e5-82e8-6ffd3dffc461",
Node: "node-foo",
Partition: defaultMeta.PartitionOrEmpty(),
Partition: localEntMeta.PartitionOrEmpty(),
PeerName: peerName,
},
Service: &structs.NodeService{
ID: "api-1",
Service: "api",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: []*structs.HealthCheck{},
@ -2668,7 +2689,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
},
{
name: "node checks are cleaned up when not present in a response",
exportedServices: []string{"api", "redis"},
exportedServices: []string{apiLocalSN.String(), redisLocalSN.String()},
seed: []*structs.RegisterRequest{
{
ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"),
@ -2677,7 +2698,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
Service: &structs.NodeService{
ID: "redis-2",
Service: "redis",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: structs.HealthChecks{
@ -2701,7 +2722,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
Service: &structs.NodeService{
ID: "api-1",
Service: "api",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: structs.HealthChecks{
@ -2719,19 +2740,20 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
},
},
},
inputServiceName: structs.NewServiceName("api", &remoteMeta),
input: &pbpeerstream.ExportedService{
Nodes: []*pbservice.CheckServiceNode{
{
Node: &pbservice.Node{
ID: "af913374-68ea-41e5-82e8-6ffd3dffc461",
Node: "node-foo",
Partition: remoteMeta.Partition,
Partition: pbRemoteMeta.Partition,
PeerName: peerName,
},
Service: &pbservice.NodeService{
ID: "api-1",
Service: "api",
EnterpriseMeta: remoteMeta,
EnterpriseMeta: pbRemoteMeta,
PeerName: peerName,
},
Checks: []*pbservice.HealthCheck{
@ -2740,27 +2762,27 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
CheckID: "api-1-check",
ServiceID: "api-1",
Node: "node-foo",
EnterpriseMeta: remoteMeta,
EnterpriseMeta: pbRemoteMeta,
PeerName: peerName,
},
},
},
},
},
expect: map[string]structs.CheckServiceNodes{
expect: map[structs.ServiceName]structs.CheckServiceNodes{
// Node check should be gone
"api": {
structs.NewServiceName("api", &localEntMeta): {
{
Node: &structs.Node{
ID: "af913374-68ea-41e5-82e8-6ffd3dffc461",
Node: "node-foo",
Partition: defaultMeta.PartitionOrEmpty(),
Partition: localEntMeta.PartitionOrEmpty(),
PeerName: peerName,
},
Service: &structs.NodeService{
ID: "api-1",
Service: "api",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: []*structs.HealthCheck{
@ -2768,24 +2790,24 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
CheckID: "api-1-check",
ServiceID: "api-1",
Node: "node-foo",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
},
},
},
"redis": {
structs.NewServiceName("redis", &localEntMeta): {
{
Node: &structs.Node{
ID: "af913374-68ea-41e5-82e8-6ffd3dffc461",
Node: "node-foo",
Partition: defaultMeta.PartitionOrEmpty(),
Partition: localEntMeta.PartitionOrEmpty(),
PeerName: peerName,
},
Service: &structs.NodeService{
ID: "redis-2",
Service: "redis",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: []*structs.HealthCheck{
@ -2793,7 +2815,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
CheckID: "redis-2-check",
ServiceID: "redis-2",
Node: "node-foo",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
},
@ -2803,7 +2825,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
},
{
name: "replacing a service instance on a node cleans up the old instance",
exportedServices: []string{"api", "redis"},
exportedServices: []string{apiLocalSN.String(), redisLocalSN.String()},
seed: []*structs.RegisterRequest{
{
ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"),
@ -2812,7 +2834,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
Service: &structs.NodeService{
ID: "redis-2",
Service: "redis",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: structs.HealthChecks{
@ -2836,7 +2858,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
Service: &structs.NodeService{
ID: "api-1",
Service: "api",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: structs.HealthChecks{
@ -2854,20 +2876,21 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
},
},
},
inputServiceName: structs.NewServiceName("api", &remoteMeta),
input: &pbpeerstream.ExportedService{
Nodes: []*pbservice.CheckServiceNode{
{
Node: &pbservice.Node{
ID: "af913374-68ea-41e5-82e8-6ffd3dffc461",
Node: "node-foo",
Partition: remoteMeta.Partition,
Partition: pbRemoteMeta.Partition,
PeerName: peerName,
},
// New service ID and checks for the api service.
Service: &pbservice.NodeService{
ID: "new-api-v2",
Service: "api",
EnterpriseMeta: remoteMeta,
EnterpriseMeta: pbRemoteMeta,
PeerName: peerName,
},
Checks: []*pbservice.HealthCheck{
@ -2886,19 +2909,19 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
},
},
},
expect: map[string]structs.CheckServiceNodes{
"api": {
expect: map[structs.ServiceName]structs.CheckServiceNodes{
structs.NewServiceName("api", &localEntMeta): {
{
Node: &structs.Node{
ID: "af913374-68ea-41e5-82e8-6ffd3dffc461",
Node: "node-foo",
Partition: defaultMeta.PartitionOrEmpty(),
Partition: localEntMeta.PartitionOrEmpty(),
PeerName: peerName,
},
Service: &structs.NodeService{
ID: "new-api-v2",
Service: "api",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: []*structs.HealthCheck{
@ -2911,24 +2934,24 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
CheckID: "new-api-v2-check",
ServiceID: "new-api-v2",
Node: "node-foo",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
},
},
},
"redis": {
structs.NewServiceName("redis", &localEntMeta): {
{
Node: &structs.Node{
ID: "af913374-68ea-41e5-82e8-6ffd3dffc461",
Node: "node-foo",
Partition: defaultMeta.PartitionOrEmpty(),
Partition: localEntMeta.PartitionOrEmpty(),
PeerName: peerName,
},
Service: &structs.NodeService{
ID: "redis-2",
Service: "redis",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
Checks: []*structs.HealthCheck{
@ -2941,7 +2964,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
CheckID: "redis-2-check",
ServiceID: "redis-2",
Node: "node-foo",
EnterpriseMeta: defaultMeta,
EnterpriseMeta: localEntMeta,
PeerName: peerName,
},
},
@ -2950,12 +2973,13 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
run(t, tc)
})
}
srv, store := newTestServer(t, func(c *Config) {
backend := c.Backend.(*testStreamBackend)
backend.leader = func() bool {
return false
}
})
processResponse_ExportedServiceUpdates(t, srv, store, localEntMeta, peerName, tests)
}
// TestLogTraceProto tests that all PB trace log helpers redact the

View File

@ -372,9 +372,8 @@ func (s *MutableStatus) SetImportedServices(serviceNames []structs.ServiceName)
defer s.mu.Unlock()
s.ImportedServices = make([]string, len(serviceNames))
for i, sn := range serviceNames {
s.ImportedServices[i] = sn.Name
s.ImportedServices[i] = sn.String()
}
}
@ -392,7 +391,7 @@ func (s *MutableStatus) SetExportedServices(serviceNames []structs.ServiceName)
s.ExportedServices = make([]string, len(serviceNames))
for i, sn := range serviceNames {
s.ExportedServices[i] = sn.Name
s.ExportedServices[i] = sn.String()
}
}