diff --git a/agent/grpc-external/services/peerstream/replication.go b/agent/grpc-external/services/peerstream/replication.go index 9b2a61a5ba..74b3278f21 100644 --- a/agent/grpc-external/services/peerstream/replication.go +++ b/agent/grpc-external/services/peerstream/replication.go @@ -54,11 +54,9 @@ func makeExportedServiceListResponse( return &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLExportedServiceList, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: subExportedServiceList, - Operation: pbpeerstream.Operation_OPERATION_UPSERT, - Resource: any, + ResourceID: subExportedServiceList, + Operation: pbpeerstream.Operation_OPERATION_UPSERT, + Resource: any, }, nil } @@ -86,11 +84,9 @@ func makeServiceResponse( return &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLExportedService, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: serviceName, - Operation: pbpeerstream.Operation_OPERATION_UPSERT, - Resource: any, + ResourceID: serviceName, + Operation: pbpeerstream.Operation_OPERATION_UPSERT, + Resource: any, }, nil } @@ -104,11 +100,9 @@ func makeCARootsResponse( return &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLPeeringTrustBundle, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: "roots", - Operation: pbpeerstream.Operation_OPERATION_UPSERT, - Resource: any, + ResourceID: "roots", + Operation: pbpeerstream.Operation_OPERATION_UPSERT, + Resource: any, }, nil } @@ -122,11 +116,9 @@ func makeServerAddrsResponse( return &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLPeeringServerAddresses, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: "server-addrs", - Operation: pbpeerstream.Operation_OPERATION_UPSERT, - Resource: any, + ResourceID: "server-addrs", + Operation: pbpeerstream.Operation_OPERATION_UPSERT, + Resource: any, }, nil } @@ -162,6 +154,15 @@ func (s *Server) processResponse( err.Error(), ), err } + if resp.Nonce == "" { + err := fmt.Errorf("received response without a nonce for: %s:%s", resp.ResourceURL, resp.ResourceID) + return makeNACKReply( + resp.ResourceURL, + resp.Nonce, + code.Code_INVALID_ARGUMENT, + err.Error(), + ), err + } switch resp.Operation { case pbpeerstream.Operation_OPERATION_UPSERT: diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index ce6a5a73ed..e045cfa16b 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -436,6 +436,9 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { incomingHeartbeatCtxCancel() }() + // The nonce is used to correlate response/(ack|nack) pairs. + var nonce uint64 + // The main loop that processes sends and receives. for { select { @@ -585,7 +588,6 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { } if resp := msg.GetResponse(); resp != nil { - // TODO(peering): Ensure there's a nonce reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, status, resp) if err != nil { logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID) @@ -669,6 +671,10 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { continue } + // Assign a new unique nonce to the response. + nonce++ + resp.Nonce = fmt.Sprintf("%08x", nonce) + replResp := makeReplicationResponse(resp) if err := streamSend(replResp); err != nil { // note: govet warns of context leak but it is cleaned up in a defer diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index baf437daa4..4674e34b37 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -1162,6 +1162,55 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) { }) } +func TestStreamResources_Server_AckNackNonce(t *testing.T) { + srv, store := newTestServer(t, func(c *Config) { + c.incomingHeartbeatTimeout = 50 * time.Millisecond + }) + + p := writePeeringToBeDialed(t, store, 1, "my-peer") + require.Empty(t, p.PeerID, "should be empty if being dialed") + + // Set the initial roots and CA configuration. + _, _ = writeInitialRootsAndCA(t, store) + + client := makeClient(t, srv, testPeerID) + client.DrainStream(t) + + testutil.RunStep(t, "ack contains nonce from response", func(t *testing.T) { + resp := &pbpeerstream.ReplicationMessage{ + Payload: &pbpeerstream.ReplicationMessage_Response_{ + Response: &pbpeerstream.ReplicationMessage_Response{ + ResourceURL: pbpeerstream.TypeURLExportedService, + Operation: pbpeerstream.Operation_OPERATION_UPSERT, + Nonce: "1234", + }, + }, + } + require.NoError(t, client.Send(resp)) + + msg, err := client.Recv() + require.NoError(t, err) + require.Equal(t, "1234", msg.GetRequest().ResponseNonce) + }) + + testutil.RunStep(t, "nack contains nonce from response", func(t *testing.T) { + resp := &pbpeerstream.ReplicationMessage{ + Payload: &pbpeerstream.ReplicationMessage_Response_{ + Response: &pbpeerstream.ReplicationMessage_Response{ + ResourceURL: pbpeerstream.TypeURLExportedService, + Operation: pbpeerstream.Operation_OPERATION_UNSPECIFIED, // Unspecified gets NACK + Nonce: "5678", + }, + }, + } + require.NoError(t, client.Send(resp)) + + msg, err := client.Recv() + require.NoError(t, err) + require.Equal(t, "5678", msg.GetRequest().ResponseNonce) + }) +} + // Test that when the client doesn't send a heartbeat in time, the stream is disconnected. func TestStreamResources_Server_DisconnectsOnHeartbeatTimeout(t *testing.T) { it := incrementalTime{ @@ -1618,6 +1667,28 @@ func Test_processResponse_Validation(t *testing.T) { }, wantErr: true, }, + { + name: "missing a nonce", + in: &pbpeerstream.ReplicationMessage_Response{ + ResourceURL: pbpeerstream.TypeURLExportedService, + ResourceID: "web", + Nonce: "", + Operation: pbpeerstream.Operation_OPERATION_UNSPECIFIED, + }, + expect: &pbpeerstream.ReplicationMessage{ + Payload: &pbpeerstream.ReplicationMessage_Request_{ + Request: &pbpeerstream.ReplicationMessage_Request{ + ResourceURL: pbpeerstream.TypeURLExportedService, + ResponseNonce: "", + Error: &pbstatus.Status{ + Code: int32(code.Code_INVALID_ARGUMENT), + Message: fmt.Sprintf(`received response without a nonce for: %s:web`, pbpeerstream.TypeURLExportedService), + }, + }, + }, + }, + wantErr: true, + }, { name: "unknown operation", in: &pbpeerstream.ReplicationMessage_Response{ @@ -1809,8 +1880,15 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test } }) + nonces := make(map[string]struct{}) for i := 0; i < num; i++ { checkFns[i](t, out[i]) + + // Ensure every nonce was unique. + if resp := out[i].GetResponse(); resp != nil { + require.NotContains(t, nonces, resp.Nonce) + nonces[resp.Nonce] = struct{}{} + } } } @@ -1879,6 +1957,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { resp := &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLExportedServiceList, ResourceID: subExportedServiceList, + Nonce: "2", Operation: pbpeerstream.Operation_OPERATION_UPSERT, Resource: makeAnyPB(t, &pbpeerstream.ExportedServiceList{Services: tc.exportedServices}), }