@ -43,7 +43,6 @@ import (
const (
testPeerID = "caf067a6-f112-4907-9101-d45857d2b149"
testActiveStreamSecretID = "e778c518-f0db-473a-9224-24b357da971d"
testPendingStreamSecretID = "522c0daf-2ef2-4dab-bc78-5e04e3daf552"
testEstablishmentSecretID = "f6569d37-1c5b-4415-aae5-26f4594f7f60"
)
@ -126,7 +125,7 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
// Receive a subscription from a peer. This message arrives while the
// server is a leader and should work.
testutil . RunStep ( t , "send subscription request to leader and consume its three requests" , func ( t * testing . T ) {
testutil . RunStep ( t , "send subscription request to leader and consume its four requests" , func ( t * testing . T ) {
sub := & pbpeerstream . ReplicationMessage {
Payload : & pbpeerstream . ReplicationMessage_Open_ {
Open : & pbpeerstream . ReplicationMessage_Open {
@ -149,6 +148,10 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
msg3 , err := client . Recv ( )
require . NoError ( t , err )
require . NotEmpty ( t , msg3 )
msg4 , err := client . Recv ( )
require . NoError ( t , err )
require . NotEmpty ( t , msg4 )
} )
// The ACK will be a new request but at this point the server is not the
@ -514,13 +517,7 @@ func TestStreamResources_Server_Terminate(t *testing.T) {
client := makeClient ( t , srv , testPeerID )
// TODO(peering): test fails if we don't drain the stream with this call because the
// server gets blocked sending the termination message. Figure out a way to let
// messages queue and filter replication messages.
receiveRoots , err := client . Recv ( )
require . NoError ( t , err )
require . NotNil ( t , receiveRoots . GetResponse ( ) )
require . Equal ( t , pbpeerstream . TypeURLPeeringTrustBundle , receiveRoots . GetResponse ( ) . ResourceURL )
client . DrainStream ( t )
testutil . RunStep ( t , "new stream gets tracked" , func ( t * testing . T ) {
retry . Run ( t , func ( r * retry . R ) {
@ -559,7 +556,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
srv . Tracker . setClock ( it . Now )
// Set the initial roots and CA configuration.
_ , rootA := writeInitialRootsAndCA ( t , store )
writeInitialRootsAndCA ( t , store )
p := writePeeringToBeDialed ( t , store , 1 , "my-peer" )
require . Empty ( t , p . PeerID , "should be empty if being dialed" )
@ -575,6 +572,14 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
} )
var lastSendAck time . Time
var lastSendSuccess time . Time
client . DrainStream ( t )
// Manually grab the last success time from sending the trust bundle or exported services list.
status , ok := srv . StreamStatus ( testPeerID )
require . True ( t , ok )
lastSendSuccess = status . LastSendSuccess
testutil . RunStep ( t , "ack tracked as success" , func ( t * testing . T ) {
ack := & pbpeerstream . ReplicationMessage {
@ -590,12 +595,15 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
}
lastSendAck = it . FutureNow ( 1 )
err := client . Send ( ack )
require . NoError ( t , err )
expect := Status {
Connected : true ,
LastAck : lastSendAck ,
Connected : true ,
LastSendSuccess : lastSendSuccess ,
LastAck : lastSendAck ,
ExportedServices : [ ] string { } ,
}
retry . Run ( t , func ( r * retry . R ) {
@ -630,10 +638,12 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
lastNackMsg = "client peer was unable to apply resource: bad bad not good"
expect := Status {
Connected : true ,
LastAck : lastSendAck ,
LastNack : lastNack ,
LastNackMessage : lastNackMsg ,
Connected : true ,
LastSendSuccess : lastSendSuccess ,
LastAck : lastSendAck ,
LastNack : lastNack ,
LastNackMessage : lastNackMsg ,
ExportedServices : [ ] string { } ,
}
retry . Run ( t , func ( r * retry . R ) {
@ -661,27 +671,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
err := client . Send ( resp )
require . NoError ( t , err )
expectRoots := & pbpeerstream . ReplicationMessage {
Payload : & pbpeerstream . ReplicationMessage_Response_ {
Response : & pbpeerstream . ReplicationMessage_Response {
ResourceURL : pbpeerstream . TypeURLPeeringTrustBundle ,
ResourceID : "roots" ,
Resource : makeAnyPB ( t , & pbpeering . PeeringTrustBundle {
TrustDomain : connect . TestTrustDomain ,
RootPEMs : [ ] string { rootA . RootCert } ,
} ) ,
Operation : pbpeerstream . Operation_OPERATION_UPSERT ,
} ,
} ,
}
roots , err := client . Recv ( )
require . NoError ( t , err )
prototest . AssertDeepEqual ( t , expectRoots , roots )
ack , err := client . Recv ( )
require . NoError ( t , err )
expectAck := & pbpeerstream . ReplicationMessage {
Payload : & pbpeerstream . ReplicationMessage_Request_ {
Request : & pbpeerstream . ReplicationMessage_Request {
@ -690,19 +679,24 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
} ,
} ,
}
prototest . AssertDeepEqual ( t , expectAck , ack )
api := structs . NewServiceName ( "api" , nil )
retry . Run ( t , func ( r * retry . R ) {
msg , err := client . Recv ( )
require . NoError ( r , err )
req := msg . GetRequest ( )
require . NotNil ( r , req )
require . Equal ( r , pbpeerstream . TypeURLExportedService , req . ResourceURL )
prototest . AssertDeepEqual ( t , expectAck , msg )
} )
expect := Status {
Connected : true ,
LastSendSuccess : lastSendSuccess ,
LastAck : lastSendAck ,
LastNack : lastNack ,
LastNackMessage : lastNackMsg ,
LastRecvResourceSuccess : lastRecvResourceSuccess ,
ImportedServices : map [ string ] struct { } {
api . String ( ) : { } ,
} ,
ExportedServices : [ ] string { } ,
}
retry . Run ( t , func ( r * retry . R ) {
@ -751,19 +745,16 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
lastRecvErrorMsg = ` unsupported operation: "OPERATION_UNSPECIFIED" `
api := structs . NewServiceName ( "api" , nil )
expect := Status {
Connected : true ,
LastSendSuccess : lastSendSuccess ,
LastAck : lastSendAck ,
LastNack : lastNack ,
LastNackMessage : lastNackMsg ,
LastRecvResourceSuccess : lastRecvResourceSuccess ,
LastRecvError : lastRecvError ,
LastRecvErrorMessage : lastRecvErrorMsg ,
ImportedServices : map [ string ] struct { } {
api . String ( ) : { } ,
} ,
ExportedServices : [ ] string { } ,
}
retry . Run ( t , func ( r * retry . R ) {
@ -783,10 +774,10 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
lastRecvHeartbeat = it . FutureNow ( 1 )
err := client . Send ( resp )
require . NoError ( t , err )
api := structs . NewServiceName ( "api" , nil )
expect := Status {
Connected : true ,
LastSendSuccess : lastSendSuccess ,
LastAck : lastSendAck ,
LastNack : lastNack ,
LastNackMessage : lastNackMsg ,
@ -794,9 +785,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
LastRecvError : lastRecvError ,
LastRecvErrorMessage : lastRecvErrorMsg ,
LastRecvHeartbeat : lastRecvHeartbeat ,
ImportedServices : map [ string ] struct { } {
api . String ( ) : { } ,
} ,
ExportedServices : [ ] string { } ,
}
retry . Run ( t , func ( r * retry . R ) {
@ -813,11 +802,10 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
client . Close ( )
api := structs . NewServiceName ( "api" , nil )
expect := Status {
Connected : false ,
DisconnectErrorMessage : lastRecvErrorMsg ,
LastSendSuccess : lastSendSuccess ,
LastAck : lastSendAck ,
LastNack : lastNack ,
LastNackMessage : lastNackMsg ,
@ -826,9 +814,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
LastRecvError : lastRecvError ,
LastRecvErrorMessage : lastRecvErrorMsg ,
LastRecvHeartbeat : lastRecvHeartbeat ,
ImportedServices : map [ string ] struct { } {
api . String ( ) : { } ,
} ,
ExportedServices : [ ] string { } ,
}
retry . Run ( t , func ( r * retry . R ) {
@ -890,14 +876,14 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
{
Name : "mysql" ,
Consumers : [ ] structs . ServiceConsumer {
{ PeerName : "my-peering" } ,
{ Peer : "my-peering" } ,
} ,
} ,
{
// Mongo does not get pushed because it does not have instances registered.
Name : "mongo" ,
Consumers : [ ] structs . ServiceConsumer {
{ PeerName : "my-peering" } ,
{ Peer : "my-peering" } ,
} ,
} ,
} ,
@ -908,6 +894,9 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
require . NoError ( t , store . EnsureConfigEntry ( lastIdx , entry ) )
expectReplEvents ( t , client ,
func ( t * testing . T , msg * pbpeerstream . ReplicationMessage ) {
require . Equal ( t , pbpeerstream . TypeURLPeeringServerAddresses , msg . GetRequest ( ) . ResourceURL )
} ,
func ( t * testing . T , msg * pbpeerstream . ReplicationMessage ) {
require . Equal ( t , pbpeerstream . TypeURLPeeringTrustBundle , msg . GetResponse ( ) . ResourceURL )
// Roots tested in TestStreamResources_Server_CARootUpdates
@ -916,15 +905,21 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
// no mongo instances exist
require . Equal ( t , pbpeerstream . TypeURLExportedService , msg . GetResponse ( ) . ResourceURL )
require . Equal ( t , mongoSN , msg . GetResponse ( ) . ResourceID )
require . Equal ( t , pbpeerstream . Operation_OPERATION_DELETE , msg . GetResponse ( ) . Operation )
require . Nil ( t , msg . GetResponse ( ) . Resource )
require . Equal ( t , pbpeerstream . Operation_OPERATION_UPSERT , msg . GetResponse ( ) . Operation )
var nodes pbpeerstream . ExportedService
require . NoError ( t , msg . GetResponse ( ) . Resource . UnmarshalTo ( & nodes ) )
require . Len ( t , nodes . Nodes , 0 )
} ,
func ( t * testing . T , msg * pbpeerstream . ReplicationMessage ) {
// proxies can't export because no mesh gateway exists yet
require . Equal ( t , pbpeerstream . TypeURLExportedService , msg . GetResponse ( ) . ResourceURL )
require . Equal ( t , mongoProxySN , msg . GetResponse ( ) . ResourceID )
require . Equal ( t , pbpeerstream . Operation_OPERATION_DELETE , msg . GetResponse ( ) . Operation )
require . Nil ( t , msg . GetResponse ( ) . Resource )
require . Equal ( t , pbpeerstream . Operation_OPERATION_UPSERT , msg . GetResponse ( ) . Operation )
var nodes pbpeerstream . ExportedService
require . NoError ( t , msg . GetResponse ( ) . Resource . UnmarshalTo ( & nodes ) )
require . Len ( t , nodes . Nodes , 0 )
} ,
func ( t * testing . T , msg * pbpeerstream . ReplicationMessage ) {
require . Equal ( t , pbpeerstream . TypeURLExportedService , msg . GetResponse ( ) . ResourceURL )
@ -939,8 +934,33 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
// proxies can't export because no mesh gateway exists yet
require . Equal ( t , pbpeerstream . TypeURLExportedService , msg . GetResponse ( ) . ResourceURL )
require . Equal ( t , mysqlProxySN , msg . GetResponse ( ) . ResourceID )
require . Equal ( t , pbpeerstream . Operation_OPERATION_DELETE , msg . GetResponse ( ) . Operation )
require . Nil ( t , msg . GetResponse ( ) . Resource )
require . Equal ( t , pbpeerstream . Operation_OPERATION_UPSERT , msg . GetResponse ( ) . Operation )
var nodes pbpeerstream . ExportedService
require . NoError ( t , msg . GetResponse ( ) . Resource . UnmarshalTo ( & nodes ) )
require . Len ( t , nodes . Nodes , 0 )
} ,
// This event happens because this is the first test case and there are
// no exported services when replication is initially set up.
func ( t * testing . T , msg * pbpeerstream . ReplicationMessage ) {
require . Equal ( t , pbpeerstream . TypeURLExportedServiceList , msg . GetResponse ( ) . ResourceURL )
require . Equal ( t , subExportedServiceList , msg . GetResponse ( ) . ResourceID )
require . Equal ( t , pbpeerstream . Operation_OPERATION_UPSERT , msg . GetResponse ( ) . Operation )
var exportedServices pbpeerstream . ExportedServiceList
require . NoError ( t , msg . GetResponse ( ) . Resource . UnmarshalTo ( & exportedServices ) )
require . ElementsMatch ( t , [ ] string { } , exportedServices . Services )
} ,
func ( t * testing . T , msg * pbpeerstream . ReplicationMessage ) {
require . Equal ( t , pbpeerstream . TypeURLExportedServiceList , msg . GetResponse ( ) . ResourceURL )
require . Equal ( t , subExportedServiceList , msg . GetResponse ( ) . ResourceID )
require . Equal ( t , pbpeerstream . Operation_OPERATION_UPSERT , msg . GetResponse ( ) . Operation )
var exportedServices pbpeerstream . ExportedServiceList
require . NoError ( t , msg . GetResponse ( ) . Resource . UnmarshalTo ( & exportedServices ) )
require . ElementsMatch ( t ,
[ ] string { structs . ServiceName { Name : "mongo" } . String ( ) , structs . ServiceName { Name : "mysql" } . String ( ) } ,
exportedServices . Services )
} ,
)
} )
@ -1019,7 +1039,7 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
} )
} )
testutil . RunStep ( t , "un-exporting mysql leads to a DELETE event for mysql " , func ( t * testing . T ) {
testutil . RunStep ( t , "un-exporting mysql leads to an exported service list update " , func ( t * testing . T ) {
entry := & structs . ExportedServicesConfigEntry {
Name : "default" ,
Services : [ ] structs . ExportedService {
@ -1027,7 +1047,7 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
Name : "mongo" ,
Consumers : [ ] structs . ServiceConsumer {
{
PeerName : "my-peering" ,
Peer : "my-peering" ,
} ,
} ,
} ,
@ -1042,23 +1062,30 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
retry . Run ( t , func ( r * retry . R ) {
msg , err := client . RecvWithTimeout ( 100 * time . Millisecond )
require . NoError ( r , err )
require . Equal ( r , pbpeerstream . Operation_OPERATION_DELETE , msg . GetResponse ( ) . Operation )
require . Equal ( r , mysql . Service . CompoundServiceName ( ) . String ( ) , msg . GetResponse ( ) . ResourceID )
require . Nil ( r , msg . GetResponse ( ) . Resource )
require . Equal ( r , pbpeerstream . TypeURLExportedServiceList , msg . GetResponse ( ) . ResourceURL )
require . Equal ( t , subExportedServiceList , msg . GetResponse ( ) . ResourceID )
require . Equal ( t , pbpeerstream . Operation_OPERATION_UPSERT , msg . GetResponse ( ) . Operation )
var exportedServices pbpeerstream . ExportedServiceList
require . NoError ( t , msg . GetResponse ( ) . Resource . UnmarshalTo ( & exportedServices ) )
require . Equal ( t , [ ] string { structs . ServiceName { Name : "mongo" } . String ( ) } , exportedServices . Services )
} )
} )
testutil . RunStep ( t , "deleting the config entry leads to a DELETE event for mongo" , func ( t * testing . T ) {
lastIdx ++
err := store . DeleteConfigEntry ( lastIdx , structs . ExportedServices , "default" , nil )
require . NoError ( t , err )
retry . Run ( t , func ( r * retry . R ) {
msg , err := client . RecvWithTimeout ( 100 * time . Millisecond )
require . NoError ( r , err )
require . Equal ( r , pbpeerstream . Operation_OPERATION_DELETE , msg . GetResponse ( ) . Operation )
require . Equal ( r , mongo . Service . CompoundServiceName ( ) . String ( ) , msg . GetResponse ( ) . ResourceID )
require . Nil ( r , msg . GetResponse ( ) . Resource )
require . Equal ( r , pbpeerstream . TypeURLExportedServiceList , msg . GetResponse ( ) . ResourceURL )
require . Equal ( t , subExportedServiceList , msg . GetResponse ( ) . ResourceID )
require . Equal ( t , pbpeerstream . Operation_OPERATION_UPSERT , msg . GetResponse ( ) . Operation )
var exportedServices pbpeerstream . ExportedServiceList
require . NoError ( t , msg . GetResponse ( ) . Resource . UnmarshalTo ( & exportedServices ) )
require . Len ( t , exportedServices . Services , 0 )
} )
} )
}
@ -1078,6 +1105,9 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) {
testutil . RunStep ( t , "initial CA Roots replication" , func ( t * testing . T ) {
expectReplEvents ( t , client ,
func ( t * testing . T , msg * pbpeerstream . ReplicationMessage ) {
require . Equal ( t , pbpeerstream . TypeURLPeeringServerAddresses , msg . GetRequest ( ) . ResourceURL )
} ,
func ( t * testing . T , msg * pbpeerstream . ReplicationMessage ) {
require . Equal ( t , pbpeerstream . TypeURLPeeringTrustBundle , msg . GetResponse ( ) . ResourceURL )
require . Equal ( t , "roots" , msg . GetResponse ( ) . ResourceID )
@ -1090,6 +1120,15 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) {
expect := connect . SpiffeIDSigningForCluster ( clusterID ) . Host ( )
require . Equal ( t , expect , trustBundle . TrustDomain )
} ,
func ( t * testing . T , msg * pbpeerstream . ReplicationMessage ) {
require . Equal ( t , pbpeerstream . TypeURLExportedServiceList , msg . GetResponse ( ) . ResourceURL )
require . Equal ( t , subExportedServiceList , msg . GetResponse ( ) . ResourceID )
require . Equal ( t , pbpeerstream . Operation_OPERATION_UPSERT , msg . GetResponse ( ) . Operation )
var exportedServices pbpeerstream . ExportedServiceList
require . NoError ( t , msg . GetResponse ( ) . Resource . UnmarshalTo ( & exportedServices ) )
require . ElementsMatch ( t , [ ] string { } , exportedServices . Services )
} ,
)
} )
@ -1142,13 +1181,7 @@ func TestStreamResources_Server_DisconnectsOnHeartbeatTimeout(t *testing.T) {
client := makeClient ( t , srv , testPeerID )
// TODO(peering): test fails if we don't drain the stream with this call because the
// server gets blocked sending the termination message. Figure out a way to let
// messages queue and filter replication messages.
receiveRoots , err := client . Recv ( )
require . NoError ( t , err )
require . NotNil ( t , receiveRoots . GetResponse ( ) )
require . Equal ( t , pbpeerstream . TypeURLPeeringTrustBundle , receiveRoots . GetResponse ( ) . ResourceURL )
client . DrainStream ( t )
testutil . RunStep ( t , "new stream gets tracked" , func ( t * testing . T ) {
retry . Run ( t , func ( r * retry . R ) {
@ -1190,16 +1223,10 @@ func TestStreamResources_Server_SendsHeartbeats(t *testing.T) {
client := makeClient ( t , srv , testPeerID )
// TODO(peering): test fails if we don't drain the stream with this call because the
// server gets blocked sending the termination message. Figure out a way to let
// messages queue and filter replication messages.
receiveRoots , err := client . Recv ( )
require . NoError ( t , err )
require . NotNil ( t , receiveRoots . GetResponse ( ) )
require . Equal ( t , pbpeerstream . TypeURLPeeringTrustBundle , receiveRoots . GetResponse ( ) . ResourceURL )
testutil . RunStep ( t , "new stream gets tracked" , func ( t * testing . T ) {
retry . Run ( t , func ( r * retry . R ) {
_ , err := client . Recv ( )
require . NoError ( r , err )
status , ok := srv . StreamStatus ( testPeerID )
require . True ( r , ok )
require . True ( r , status . Connected )
@ -1212,8 +1239,8 @@ func TestStreamResources_Server_SendsHeartbeats(t *testing.T) {
Wait : outgoingHeartbeatInterval / 2 ,
} , t , func ( r * retry . R ) {
heartbeat , err := client . Recv ( )
require . NoError ( t , err )
require . NotNil ( t , heartbeat . GetHeartbeat ( ) )
require . NoError ( r , err )
require . NotNil ( r , heartbeat . GetHeartbeat ( ) )
} )
} )
@ -1223,8 +1250,8 @@ func TestStreamResources_Server_SendsHeartbeats(t *testing.T) {
Wait : outgoingHeartbeatInterval / 2 ,
} , t , func ( r * retry . R ) {
heartbeat , err := client . Recv ( )
require . NoError ( t , err )
require . NotNil ( t , heartbeat . GetHeartbeat ( ) )
require . NoError ( r , err )
require . NotNil ( r , heartbeat . GetHeartbeat ( ) )
} )
} )
}
@ -1249,13 +1276,7 @@ func TestStreamResources_Server_KeepsConnectionOpenWithHeartbeat(t *testing.T) {
client := makeClient ( t , srv , testPeerID )
// TODO(peering): test fails if we don't drain the stream with this call because the
// server gets blocked sending the termination message. Figure out a way to let
// messages queue and filter replication messages.
receiveRoots , err := client . Recv ( )
require . NoError ( t , err )
require . NotNil ( t , receiveRoots . GetResponse ( ) )
require . Equal ( t , pbpeerstream . TypeURLPeeringTrustBundle , receiveRoots . GetResponse ( ) . ResourceURL )
client . DrainStream ( t )
testutil . RunStep ( t , "new stream gets tracked" , func ( t * testing . T ) {
retry . Run ( t , func ( r * retry . R ) {
@ -1494,7 +1515,7 @@ func (b *testStreamBackend) CatalogDeregister(req *structs.DeregisterRequest) er
return nil
}
func Test_makeServiceResponse_ ExportedServicesCount ( t * testing . T ) {
func Test_ExportedServicesCount ( t * testing . T ) {
peerName := "billing"
peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5"
@ -1510,37 +1531,17 @@ func Test_makeServiceResponse_ExportedServicesCount(t *testing.T) {
mst , err := srv . Tracker . Connected ( peerID )
require . NoError ( t , err )
testutil . RunStep ( t , "simulate an update to export a service" , func ( t * testing . T ) {
update := cache . UpdateEvent {
CorrelationID : subExportedService + "api" ,
Result : & pbservice . IndexedCheckServiceNodes {
Nodes : [ ] * pbservice . CheckServiceNode {
{
Service : & pbservice . NodeService {
ID : "api-1" ,
Service : "api" ,
PeerName : peerName ,
} ,
} ,
} ,
} }
_ , err := makeServiceResponse ( mst , update )
require . NoError ( t , err )
require . Equal ( t , 1 , mst . GetExportedServicesCount ( ) )
} )
testutil . RunStep ( t , "simulate a delete for an exported service" , func ( t * testing . T ) {
update := cache . UpdateEvent {
CorrelationID : subExportedService + "api" ,
Result : & pbservice . IndexedCheckServiceNodes {
Nodes : [ ] * pbservice . CheckServiceNode { } ,
} }
_ , err := makeServiceResponse ( mst , update )
require . NoError ( t , err )
require . Equal ( t , 0 , mst . GetExportedServicesCount ( ) )
} )
services := [ ] string { "web" , "api" , "mongo" }
update := cache . UpdateEvent {
CorrelationID : subExportedServiceList ,
Result : & pbpeerstream . ExportedServiceList {
Services : services ,
} }
_ , err = makeExportedServiceListResponse ( mst , update )
require . NoError ( t , err )
// Test the count and contents separately to ensure the count code path is hit.
require . Equal ( t , 3 , mst . GetExportedServicesCount ( ) )
require . ElementsMatch ( t , services , mst . ExportedServices )
}
func Test_processResponse_Validation ( t * testing . T ) {
@ -1596,24 +1597,6 @@ func Test_processResponse_Validation(t *testing.T) {
} ,
wantErr : false ,
} ,
{
name : "valid delete" ,
in : & pbpeerstream . ReplicationMessage_Response {
ResourceURL : pbpeerstream . TypeURLExportedService ,
ResourceID : "api" ,
Nonce : "1" ,
Operation : pbpeerstream . Operation_OPERATION_DELETE ,
} ,
expect : & pbpeerstream . ReplicationMessage {
Payload : & pbpeerstream . ReplicationMessage_Request_ {
Request : & pbpeerstream . ReplicationMessage_Request {
ResourceURL : pbpeerstream . TypeURLExportedService ,
ResponseNonce : "1" ,
} ,
} ,
} ,
wantErr : false ,
} ,
{
name : "invalid resource url" ,
in : & pbpeerstream . ReplicationMessage_Response {
@ -1831,7 +1814,7 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test
}
}
func Test_processResponse_handleUpsert_handleDelete ( t * testing . T ) {
func Test_processResponse_ExportedServiceUpdates ( t * testing . T ) {
srv , store := newTestServer ( t , func ( c * Config ) {
backend := c . Backend . ( * testStreamBackend )
backend . leader = func ( ) bool {
@ -1840,11 +1823,11 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
} )
type testCase struct {
name string
seed [ ] * structs . RegisterRequest
input * pbpeerstream . ExportedService
expect map [ string ] structs . CheckServiceNodes
expectedImportedServicesCount int
name string
seed [ ] * structs . RegisterRequest
input * pbpeerstream . ExportedService
expect map [ string ] structs . CheckServiceNodes
exportedServices [ ] string
}
peerName := "billing"
@ -1871,24 +1854,20 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
run := func ( t * testing . T , tc testCase ) {
// Seed the local catalog with some data to reconcile against.
// and increment the tracker's imported services count
var serviceNames [ ] structs . ServiceName
for _ , reg := range tc . seed {
require . NoError ( t , srv . Backend . CatalogRegister ( reg ) )
mst . TrackImportedService ( reg . Service . CompoundServiceName ( ) )
}
var op pbpeerstream . Operation
if len ( tc . input . Nodes ) == 0 {
op = pbpeerstream . Operation_OPERATION_DELETE
} else {
op = pbpeerstream . Operation_OPERATION_UPSERT
sn := reg . Service . CompoundServiceName ( )
serviceNames = append ( serviceNames , sn )
}
mst . SetImportedServices ( serviceNames )
in := & pbpeerstream . ReplicationMessage_Response {
ResourceURL : pbpeerstream . TypeURLExportedService ,
ResourceID : apiSN . String ( ) ,
Nonce : "1" ,
Operation : op ,
Operation : pbpeerstream . Operation_OPERATION_UPSERT ,
Resource : makeAnyPB ( t , tc . input ) ,
}
@ -1896,6 +1875,32 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
_ , err = srv . processResponse ( peerName , acl . DefaultPartitionName , mst , in )
require . NoError ( t , err )
if len ( tc . exportedServices ) > 0 {
resp := & pbpeerstream . ReplicationMessage_Response {
ResourceURL : pbpeerstream . TypeURLExportedServiceList ,
ResourceID : subExportedServiceList ,
Operation : pbpeerstream . Operation_OPERATION_UPSERT ,
Resource : makeAnyPB ( t , & pbpeerstream . ExportedServiceList { Services : tc . exportedServices } ) ,
}
// Simulate an update arriving for billing/api.
_ , err = srv . processResponse ( peerName , acl . DefaultPartitionName , mst , resp )
require . NoError ( t , err )
// Test the count and contents separately to ensure the count code path is hit.
require . Equal ( t , mst . GetImportedServicesCount ( ) , len ( tc . exportedServices ) )
require . ElementsMatch ( t , mst . ImportedServices , tc . exportedServices )
}
_ , allServices , err := srv . GetStore ( ) . ServiceList ( nil , & defaultMeta , peerName )
require . NoError ( t , err )
// This ensures that only services specified under tc.expect are stored. It includes
// all exported services plus their sidecar proxies.
for _ , svc := range allServices {
_ , ok := tc . expect [ svc . Name ]
require . True ( t , ok )
}
for svc , expect := range tc . expect {
t . Run ( svc , func ( t * testing . T ) {
_ , got , err := srv . GetStore ( ) . CheckServiceNodes ( nil , svc , & defaultMeta , peerName )
@ -1903,14 +1908,12 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
requireEqualInstances ( t , expect , got )
} )
}
// assert the imported services count modifications
require . Equal ( t , tc . expectedImportedServicesCount , mst . GetImportedServicesCount ( ) )
}
tt := [ ] testCase {
{
name : "upsert two service instances to the same node" ,
name : "upsert two service instances to the same node" ,
exportedServices : [ ] string { "api" } ,
input : & pbpeerstream . ExportedService {
Nodes : [ ] * pbservice . CheckServiceNode {
{
@ -2039,10 +2042,44 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
} ,
} ,
} ,
expectedImportedServicesCount : 1 ,
} ,
{
name : "upsert two service instances to different nodes" ,
name : "deleting a service with an empty exported service event" ,
exportedServices : [ ] string { "api" } ,
seed : [ ] * structs . RegisterRequest {
{
ID : types . NodeID ( "af913374-68ea-41e5-82e8-6ffd3dffc461" ) ,
Node : "node-foo" ,
PeerName : peerName ,
Service : & structs . NodeService {
ID : "api-2" ,
Service : "api" ,
EnterpriseMeta : defaultMeta ,
PeerName : peerName ,
} ,
Checks : structs . HealthChecks {
{
Node : "node-foo" ,
ServiceID : "api-2" ,
CheckID : types . CheckID ( "api-2-check" ) ,
PeerName : peerName ,
} ,
{
Node : "node-foo" ,
CheckID : types . CheckID ( "node-foo-check" ) ,
PeerName : peerName ,
} ,
} ,
} ,
} ,
input : & pbpeerstream . ExportedService { } ,
expect : map [ string ] structs . CheckServiceNodes {
"api" : { } ,
} ,
} ,
{
name : "upsert two service instances to different nodes" ,
exportedServices : [ ] string { "api" } ,
input : & pbpeerstream . ExportedService {
Nodes : [ ] * pbservice . CheckServiceNode {
{
@ -2171,31 +2208,31 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
} ,
} ,
} ,
expectedImportedServicesCount : 1 ,
} ,
{
name : "receiving a nil input leads to deleting data in the catalog" ,
name : "deleting one service name from a node does not delete other service names" ,
exportedServices : [ ] string { "api" , "redis" } ,
seed : [ ] * structs . RegisterRequest {
{
ID : types . NodeID ( "c0f97de9-4e1b-4e80-a1c6-cd8725835ab2 " ) ,
Node : "node-bar " ,
ID : types . NodeID ( "af913374-68ea-41e5-82e8-6ffd3dffc461 " ) ,
Node : "node-foo " ,
PeerName : peerName ,
Service : & structs . NodeService {
ID : "api -2" ,
Service : "api " ,
ID : "redis -2" ,
Service : "redis " ,
EnterpriseMeta : defaultMeta ,
PeerName : peerName ,
} ,
Checks : structs . HealthChecks {
{
Node : "node-bar " ,
ServiceID : "api -2" ,
CheckID : types . CheckID ( "api -2-check" ) ,
Node : "node-foo " ,
ServiceID : "redis -2" ,
CheckID : types . CheckID ( "redis -2-check" ) ,
PeerName : peerName ,
} ,
{
Node : "node-bar " ,
CheckID : types . CheckID ( "node-bar -check" ) ,
Node : "node-foo " ,
CheckID : types . CheckID ( "node-foo -check" ) ,
PeerName : peerName ,
} ,
} ,
@ -2225,14 +2262,46 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
} ,
} ,
} ,
// Nil input is for the "api" service.
input : & pbpeerstream . ExportedService { } ,
expect : map [ string ] structs . CheckServiceNodes {
"api" : { } ,
// Existing redis service was not affected by deletion.
"redis" : {
{
Node : & structs . Node {
ID : "af913374-68ea-41e5-82e8-6ffd3dffc461" ,
Node : "node-foo" ,
Partition : defaultMeta . PartitionOrEmpty ( ) ,
PeerName : peerName ,
} ,
Service : & structs . NodeService {
ID : "redis-2" ,
Service : "redis" ,
EnterpriseMeta : defaultMeta ,
PeerName : peerName ,
} ,
Checks : [ ] * structs . HealthCheck {
{
CheckID : "node-foo-check" ,
Node : "node-foo" ,
EnterpriseMeta : defaultMeta ,
PeerName : peerName ,
} ,
{
CheckID : "redis-2-check" ,
ServiceID : "redis-2" ,
Node : "node-foo" ,
EnterpriseMeta : defaultMeta ,
PeerName : peerName ,
} ,
} ,
} ,
} ,
} ,
expectedImportedServicesCount : 0 ,
} ,
{
name : "deleting one service name from a node does not delete other service names" ,
name : "unexporting a service does not delete other servic es" ,
seed : [ ] * structs . RegisterRequest {
{
ID : types . NodeID ( "af913374-68ea-41e5-82e8-6ffd3dffc461" ) ,
@ -2258,6 +2327,30 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
} ,
} ,
} ,
{
ID : types . NodeID ( "af913374-68ea-41e5-82e8-6ffd3dffc461" ) ,
Node : "node-foo" ,
PeerName : peerName ,
Service : & structs . NodeService {
ID : "redis-2-sidecar-proxy" ,
Service : "redis-sidecar-proxy" ,
EnterpriseMeta : defaultMeta ,
PeerName : peerName ,
} ,
Checks : structs . HealthChecks {
{
Node : "node-foo" ,
ServiceID : "redis-2-sidecar-proxy" ,
CheckID : types . CheckID ( "redis-2-sidecar-proxy-check" ) ,
PeerName : peerName ,
} ,
{
Node : "node-foo" ,
CheckID : types . CheckID ( "node-foo-check" ) ,
PeerName : peerName ,
} ,
} ,
} ,
{
ID : types . NodeID ( "af913374-68ea-41e5-82e8-6ffd3dffc461" ) ,
Node : "node-foo" ,
@ -2282,11 +2375,36 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
} ,
} ,
} ,
{
ID : types . NodeID ( "af913374-68ea-41e5-82e8-6ffd3dffc461" ) ,
Node : "node-foo" ,
PeerName : peerName ,
Service : & structs . NodeService {
ID : "api-1-sidecar-proxy" ,
Service : "api-sidecar-proxy" ,
EnterpriseMeta : defaultMeta ,
PeerName : peerName ,
} ,
Checks : structs . HealthChecks {
{
Node : "node-foo" ,
ServiceID : "api-1-sidecar-proxy" ,
CheckID : types . CheckID ( "api-1-check" ) ,
PeerName : peerName ,
} ,
{
Node : "node-foo" ,
CheckID : types . CheckID ( "node-foo-sidecar-proxy-check" ) ,
ServiceID : "api-1-sidecar-proxy" ,
PeerName : peerName ,
} ,
} ,
} ,
} ,
// Nil input is for the "api" service.
input : & pbpeerstream . ExportedService { } ,
input : & pbpeerstream . ExportedService { } ,
exportedServices : [ ] string { "redis" } ,
expect : map [ string ] structs . CheckServiceNodes {
"api" : { } ,
// Existing redis service was not affected by deletion.
"redis" : {
{
@ -2319,11 +2437,42 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
} ,
} ,
} ,
"redis-sidecar-proxy" : {
{
Node : & structs . Node {
ID : "af913374-68ea-41e5-82e8-6ffd3dffc461" ,
Node : "node-foo" ,
Partition : defaultMeta . PartitionOrEmpty ( ) ,
PeerName : peerName ,
} ,
Service : & structs . NodeService {
ID : "redis-2-sidecar-proxy" ,
Service : "redis-sidecar-proxy" ,
EnterpriseMeta : defaultMeta ,
PeerName : peerName ,
} ,
Checks : [ ] * structs . HealthCheck {
{
CheckID : "node-foo-check" ,
Node : "node-foo" ,
EnterpriseMeta : defaultMeta ,
PeerName : peerName ,
} ,
{
CheckID : "redis-2-sidecar-proxy-check" ,
ServiceID : "redis-2-sidecar-proxy" ,
Node : "node-foo" ,
EnterpriseMeta : defaultMeta ,
PeerName : peerName ,
} ,
} ,
} ,
} ,
} ,
expectedImportedServicesCount : 1 ,
} ,
{
name : "service checks are cleaned up when not present in a response" ,
name : "service checks are cleaned up when not present in a response" ,
exportedServices : [ ] string { "api" } ,
seed : [ ] * structs . RegisterRequest {
{
ID : types . NodeID ( "af913374-68ea-41e5-82e8-6ffd3dffc461" ) ,
@ -2391,10 +2540,10 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
} ,
} ,
} ,
expectedImportedServicesCount : 2 ,
} ,
{
name : "node checks are cleaned up when not present in a response" ,
name : "node checks are cleaned up when not present in a response" ,
exportedServices : [ ] string { "api" , "redis" } ,
seed : [ ] * structs . RegisterRequest {
{
ID : types . NodeID ( "af913374-68ea-41e5-82e8-6ffd3dffc461" ) ,
@ -2526,10 +2675,10 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
} ,
} ,
} ,
expectedImportedServicesCount : 2 ,
} ,
{
name : "replacing a service instance on a node cleans up the old instance" ,
name : "replacing a service instance on a node cleans up the old instance" ,
exportedServices : [ ] string { "api" , "redis" } ,
seed : [ ] * structs . RegisterRequest {
{
ID : types . NodeID ( "af913374-68ea-41e5-82e8-6ffd3dffc461" ) ,
@ -2674,7 +2823,6 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
} ,
} ,
} ,
expectedImportedServicesCount : 2 ,
} ,
}