diff --git a/agent/consul/peering_backend.go b/agent/consul/peering_backend.go index 85123e15e4..9ec6639c5f 100644 --- a/agent/consul/peering_backend.go +++ b/agent/consul/peering_backend.go @@ -15,6 +15,7 @@ import ( ) type peeringBackend struct { + // TODO(peering): accept a smaller interface; maybe just funcs from the server that we actually need: DC, IsLeader, etc srv *Server connPool GRPCClientConner apply *peeringApply @@ -31,6 +32,7 @@ func NewPeeringBackend(srv *Server, connPool GRPCClientConner) peering.Backend { } } +// Forward should not be used to initiate forwarding over bidirectional streams func (b *peeringBackend) Forward(info structs.RPCInfo, f func(*grpc.ClientConn) error) (handled bool, err error) { // Only forward the request if the dc in the request matches the server's datacenter. if info.RequestDatacenter() != "" && info.RequestDatacenter() != b.srv.config.Datacenter { @@ -103,6 +105,10 @@ func (b *peeringBackend) EnterpriseCheckPartitions(partition string) error { return b.enterpriseCheckPartitions(partition) } +func (b *peeringBackend) IsLeader() bool { + return b.srv.IsLeader() +} + type peeringApply struct { srv *Server } diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index 562f161461..57343efb59 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -91,6 +91,9 @@ type Backend interface { Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error) + // IsLeader indicates whether the consul server is in a leader state or not. + IsLeader() bool + Store() Store Apply() Apply } @@ -423,6 +426,14 @@ type BidirectionalStream interface { // StreamResources handles incoming streaming connections. func (s *Service) StreamResources(stream pbpeering.PeeringService_StreamResourcesServer) error { + if !s.Backend.IsLeader() { + // we are not the leader so we will hang up on the dialer + + // TODO(peering): in the future we want to indicate the address of the leader server as a message to the dialer (best effort, non blocking) + s.logger.Error("cannot establish a peering stream on a follower node") + return grpcstatus.Error(codes.FailedPrecondition, "cannot establish a peering stream on a follower node") + } + // Initial message on a new stream must be a new subscription request. first, err := stream.Recv() if err != nil { @@ -586,6 +597,14 @@ func (s *Service) HandleStream(req HandleStreamRequest) error { return nil } + if !s.Backend.IsLeader() { + // we are not the leader anymore so we will hang up on the dialer + + // TODO(peering): in the future we want to indicate the address of the leader server as a message to the dialer (best effort, non blocking) + logger.Error("node is not a leader anymore; cannot continue streaming") + return grpcstatus.Error(codes.FailedPrecondition, "node is not a leader anymore; cannot continue streaming") + } + if req := msg.GetRequest(); req != nil { switch { case req.Nonce == "": diff --git a/agent/rpc/peering/stream_test.go b/agent/rpc/peering/stream_test.go index 9af44b75d0..04c4c89a1c 100644 --- a/agent/rpc/peering/stream_test.go +++ b/agent/rpc/peering/stream_test.go @@ -30,6 +30,107 @@ import ( "github.com/hashicorp/consul/sdk/testutil/retry" ) +func TestStreamResources_Server_Follower(t *testing.T) { + publisher := stream.NewEventPublisher(10 * time.Second) + store := newStateStore(t, publisher) + + srv := NewService(testutil.Logger(t), &testStreamBackend{ + store: store, + pub: publisher, + leader: func() bool { + return false + }, + }) + + client := NewMockClient(context.Background()) + + errCh := make(chan error, 1) + client.ErrCh = errCh + + go func() { + // Pass errors from server handler into ErrCh so that they can be seen by the client on Recv(). + // This matches gRPC's behavior when an error is returned by a server. + err := srv.StreamResources(client.ReplicationStream) + if err != nil { + errCh <- err + } + }() + + msg, err := client.Recv() + require.Nil(t, msg) + require.Error(t, err) + require.EqualError(t, err, "rpc error: code = FailedPrecondition desc = cannot establish a peering stream on a follower node") +} + +// TestStreamResources_Server_LeaderBecomesFollower simulates a srv that is a leader when the +// subscription request is sent but loses leadership status for subsequent messages. +func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) { + publisher := stream.NewEventPublisher(10 * time.Second) + store := newStateStore(t, publisher) + + first := true + leaderFunc := func() bool { + if first { + first = false + return true + } + return false + } + srv := NewService(testutil.Logger(t), &testStreamBackend{ + store: store, + pub: publisher, + leader: leaderFunc, + }) + + client := NewMockClient(context.Background()) + + errCh := make(chan error, 1) + client.ErrCh = errCh + + go func() { + err := srv.StreamResources(client.ReplicationStream) + if err != nil { + errCh <- err + } + }() + + p := writeInitiatedPeering(t, store, 1, "my-peer") + peerID := p.ID + + // Receive a subscription from a peer + sub := &pbpeering.ReplicationMessage{ + Payload: &pbpeering.ReplicationMessage_Request_{ + Request: &pbpeering.ReplicationMessage_Request{ + PeerID: peerID, + ResourceURL: pbpeering.TypeURLService, + }, + }, + } + err := client.Send(sub) + require.NoError(t, err) + + msg, err := client.Recv() + require.NoError(t, err) + require.NotEmpty(t, msg) + + input2 := &pbpeering.ReplicationMessage{ + Payload: &pbpeering.ReplicationMessage_Request_{ + Request: &pbpeering.ReplicationMessage_Request{ + ResourceURL: pbpeering.TypeURLService, + Nonce: "1", + }, + }, + } + + err2 := client.Send(input2) + require.NoError(t, err2) + + msg2, err2 := client.Recv() + require.Nil(t, msg2) + require.Error(t, err2) + require.EqualError(t, err2, "rpc error: code = FailedPrecondition desc = node is not a leader anymore; cannot continue streaming") +} + func TestStreamResources_Server_FirstRequest(t *testing.T) { type testCase struct { name string @@ -694,8 +795,16 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { } type testStreamBackend struct { - pub state.EventPublisher - store *state.Store + pub state.EventPublisher + store *state.Store + leader func() bool +} + +func (b *testStreamBackend) IsLeader() bool { + if b.leader != nil { + return b.leader() + } + return true } func (b *testStreamBackend) Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error) {