mirror of https://github.com/hashicorp/consul
Prevent peering acceptor from subscribing to addr updates. (#15214)
parent
05e93f7569
commit
bd1019fadb
|
@ -277,7 +277,7 @@ type HandleStreamRequest struct {
|
|||
Stream BidirectionalStream
|
||||
}
|
||||
|
||||
func (r HandleStreamRequest) WasDialed() bool {
|
||||
func (r HandleStreamRequest) IsAcceptor() bool {
|
||||
return r.RemoteID == ""
|
||||
}
|
||||
|
||||
|
@ -316,7 +316,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
|
|||
logger := s.Logger.Named("stream").
|
||||
With("peer_name", streamReq.PeerName).
|
||||
With("peer_id", streamReq.LocalID).
|
||||
With("dialed", streamReq.WasDialed())
|
||||
With("dailer", !streamReq.IsAcceptor())
|
||||
logger.Trace("handling stream for peer")
|
||||
|
||||
// handleStreamCtx is local to this function.
|
||||
|
@ -380,13 +380,18 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Subscribe to all relevant resource types.
|
||||
for _, resourceURL := range []string{
|
||||
resources := []string{
|
||||
pbpeerstream.TypeURLExportedService,
|
||||
pbpeerstream.TypeURLExportedServiceList,
|
||||
pbpeerstream.TypeURLPeeringTrustBundle,
|
||||
pbpeerstream.TypeURLPeeringServerAddresses,
|
||||
} {
|
||||
}
|
||||
// Acceptors should not subscribe to server address updates, because they should always have an empty list.
|
||||
if !streamReq.IsAcceptor() {
|
||||
resources = append(resources, pbpeerstream.TypeURLPeeringServerAddresses)
|
||||
}
|
||||
|
||||
// Subscribe to all relevant resource types.
|
||||
for _, resourceURL := range resources {
|
||||
sub := makeReplicationRequest(&pbpeerstream.ReplicationMessage_Request{
|
||||
ResourceURL: resourceURL,
|
||||
PeerID: streamReq.RemoteID,
|
||||
|
@ -558,7 +563,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
|
|||
// This must be a new subscription request to add a new
|
||||
// resource type, vet it like a new request.
|
||||
|
||||
if !streamReq.WasDialed() {
|
||||
if !streamReq.IsAcceptor() {
|
||||
if req.PeerID != "" && req.PeerID != streamReq.RemoteID {
|
||||
// Not necessary after the first request from the dialer,
|
||||
// but if provided must match.
|
||||
|
|
|
@ -125,7 +125,7 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
|
|||
|
||||
// Receive a subscription from a peer. This message arrives while the
|
||||
// server is a leader and should work.
|
||||
testutil.RunStep(t, "send subscription request to leader and consume its four requests", func(t *testing.T) {
|
||||
testutil.RunStep(t, "send subscription request to leader and consume its three requests", func(t *testing.T) {
|
||||
sub := &pbpeerstream.ReplicationMessage{
|
||||
Payload: &pbpeerstream.ReplicationMessage_Open_{
|
||||
Open: &pbpeerstream.ReplicationMessage_Open{
|
||||
|
@ -148,10 +148,6 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
|
|||
msg3, err := client.Recv()
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, msg3)
|
||||
|
||||
msg4, err := client.Recv()
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, msg4)
|
||||
})
|
||||
|
||||
// The ACK will be a new request but at this point the server is not the
|
||||
|
@ -551,6 +547,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
|||
it := incrementalTime{
|
||||
base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC),
|
||||
}
|
||||
waitUntil := it.FutureNow(6)
|
||||
|
||||
srv, store := newTestServer(t, nil)
|
||||
srv.Tracker.setClock(it.Now)
|
||||
|
@ -576,6 +573,11 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
|||
|
||||
client.DrainStream(t)
|
||||
|
||||
// Wait for async workflows to complete.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Equal(r, waitUntil, it.FutureNow(1))
|
||||
})
|
||||
|
||||
// Manually grab the last success time from sending the trust bundle or exported services list.
|
||||
status, ok := srv.StreamStatus(testPeerID)
|
||||
require.True(t, ok)
|
||||
|
@ -605,7 +607,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
|||
LastAck: lastSendAck,
|
||||
ExportedServices: []string{},
|
||||
}
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
rStatus, ok := srv.StreamStatus(testPeerID)
|
||||
require.True(r, ok)
|
||||
|
@ -894,9 +895,6 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
|
|||
require.NoError(t, store.EnsureConfigEntry(lastIdx, entry))
|
||||
|
||||
expectReplEvents(t, client,
|
||||
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
||||
require.Equal(t, pbpeerstream.TypeURLPeeringServerAddresses, msg.GetRequest().ResourceURL)
|
||||
},
|
||||
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
||||
require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL)
|
||||
// Roots tested in TestStreamResources_Server_CARootUpdates
|
||||
|
@ -1105,9 +1103,6 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) {
|
|||
|
||||
testutil.RunStep(t, "initial CA Roots replication", func(t *testing.T) {
|
||||
expectReplEvents(t, client,
|
||||
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
||||
require.Equal(t, pbpeerstream.TypeURLPeeringServerAddresses, msg.GetRequest().ResourceURL)
|
||||
},
|
||||
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
||||
require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL)
|
||||
require.Equal(t, "roots", msg.GetResponse().ResourceID)
|
||||
|
|
Loading…
Reference in New Issue