peering: expose IsLeader, hang up on dialer if follower (#13164)

Signed-off-by: acpana <8968914+acpana@users.noreply.github.com>

Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com>
parent 9e1f362499
commit 876f3bb971

@@ -15,6 +15,7 @@ import (
)
type peeringBackend struct {
// TODO(peering): accept a smaller interface; maybe just funcs from the server that we actually need: DC, IsLeader, etc
srv *Server
connPool GRPCClientConner
apply *peeringApply
@@ -31,6 +32,7 @@ func NewPeeringBackend(srv *Server, connPool GRPCClientConner) peering.Backend {
}
}
// Forward should not be used to initiate forwarding over bidirectional streams
func (b *peeringBackend) Forward(info structs.RPCInfo, f func(*grpc.ClientConn) error) (handled bool, err error) {
// Only forward the request if the dc in the request matches the server's datacenter.
if info.RequestDatacenter() != "" && info.RequestDatacenter() != b.srv.config.Datacenter {
@@ -103,6 +105,10 @@ func (b *peeringBackend) EnterpriseCheckPartitions(partition string) error {
return b.enterpriseCheckPartitions(partition)
}
func (b *peeringBackend) IsLeader() bool {
return b.srv.IsLeader()
}
type peeringApply struct {
srv *Server
}
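The TODO at the top of peeringBackend suggests depending on a smaller interface than *Server. A minimal sketch of what that narrowed dependency could look like; the leaderChecker name and its method set are illustrative assumptions, not part of this change:

type leaderChecker interface {
	// IsLeader reports whether this server currently holds leadership.
	IsLeader() bool
	// Datacenter returns the local datacenter, which Forward compares
	// against the request's datacenter before forwarding.
	Datacenter() string
}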

@@ -91,6 +91,9 @@ type Backend interface {
Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)
// IsLeader indicates whether this Consul server is currently the leader.
IsLeader() bool
Store() Store
Apply() Apply
}
@@ -423,6 +426,14 @@ type BidirectionalStream interface {
// StreamResources handles incoming streaming connections.
func (s *Service) StreamResources(stream pbpeering.PeeringService_StreamResourcesServer) error {
if !s.Backend.IsLeader() {
// we are not the leader, so we hang up on the dialer
// TODO(peering): in the future, indicate the address of the leader server as a message to the dialer (best-effort, non-blocking)
s.logger.Error("cannot establish a peering stream on a follower node")
return grpcstatus.Error(codes.FailedPrecondition, "cannot establish a peering stream on a follower node")
}
// Initial message on a new stream must be a new subscription request.
first, err := stream.Recv()
if err != nil {
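Because the follower hangs up with codes.FailedPrecondition, a dialer can detect that status, back off, and redial another server until it reaches the leader. A minimal client-side sketch using only the standard gRPC status helpers; shouldRedial is a hypothetical helper, not part of this change:

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// shouldRedial reports whether a stream error is the follower hang-up,
// in which case the dialer should back off and try another server.
func shouldRedial(err error) bool {
	return status.Code(err) == codes.FailedPrecondition
}

func main() {
	err := status.Error(codes.FailedPrecondition, "cannot establish a peering stream on a follower node")
	fmt.Println(shouldRedial(err)) // true
}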
@@ -586,6 +597,14 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
return nil
}
if !s.Backend.IsLeader() {
// we are no longer the leader, so we hang up on the dialer
// TODO(peering): in the future, indicate the address of the leader server as a message to the dialer (best-effort, non-blocking)
logger.Error("node is not a leader anymore; cannot continue streaming")
return grpcstatus.Error(codes.FailedPrecondition, "node is not a leader anymore; cannot continue streaming")
}
if req := msg.GetRequest(); req != nil {
switch {
case req.Nonce == "":

@@ -30,6 +30,107 @@ import (
"github.com/hashicorp/consul/sdk/testutil/retry"
)
func TestStreamResources_Server_Follower(t *testing.T) {
publisher := stream.NewEventPublisher(10 * time.Second)
store := newStateStore(t, publisher)
srv := NewService(testutil.Logger(t), &testStreamBackend{
store: store,
pub: publisher,
leader: func() bool {
return false
},
})
client := NewMockClient(context.Background())
errCh := make(chan error, 1)
client.ErrCh = errCh
go func() {
// Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
// This matches gRPC's behavior when an error is returned by a server.
err := srv.StreamResources(client.ReplicationStream)
if err != nil {
errCh <- err
}
}()
msg, err := client.Recv()
require.Nil(t, msg)
require.Error(t, err)
require.EqualError(t, err, "rpc error: code = FailedPrecondition desc = cannot establish a peering stream on a follower node")
}
// TestStreamResources_Server_LeaderBecomesFollower simulates a server that is the leader when the
// subscription request is sent but loses leadership for subsequent messages.
func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
publisher := stream.NewEventPublisher(10 * time.Second)
store := newStateStore(t, publisher)
first := true
leaderFunc := func() bool {
if first {
first = false
return true
}
return false
}
srv := NewService(testutil.Logger(t), &testStreamBackend{
store: store,
pub: publisher,
leader: leaderFunc,
})
client := NewMockClient(context.Background())
errCh := make(chan error, 1)
client.ErrCh = errCh
go func() {
err := srv.StreamResources(client.ReplicationStream)
if err != nil {
errCh <- err
}
}()
p := writeInitiatedPeering(t, store, 1, "my-peer")
peerID := p.ID
// Send a subscription request, as a dialing peer would
sub := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
PeerID: peerID,
ResourceURL: pbpeering.TypeURLService,
},
},
}
err := client.Send(sub)
require.NoError(t, err)
msg, err := client.Recv()
require.NoError(t, err)
require.NotEmpty(t, msg)
input2 := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
ResourceURL: pbpeering.TypeURLService,
Nonce: "1",
},
},
}
err2 := client.Send(input2)
require.NoError(t, err2)
msg2, err2 := client.Recv()
require.Nil(t, msg2)
require.Error(t, err2)
require.EqualError(t, err2, "rpc error: code = FailedPrecondition desc = node is not a leader anymore; cannot continue streaming")
}
func TestStreamResources_Server_FirstRequest(t *testing.T) {
type testCase struct {
name string
@@ -696,6 +797,14 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
type testStreamBackend struct {
pub state.EventPublisher
store *state.Store
leader func() bool
}
func (b *testStreamBackend) IsLeader() bool {
if b.leader != nil {
return b.leader()
}
return true
}
func (b *testStreamBackend) Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error) {
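Note the fallback when leader is nil: tests that never set the field behave as if the server were always the leader, so existing tests are unaffected. If a test needed to flip leadership from a different goroutine than the stream handler, a race-free variant of the leader func could use sync/atomic; a sketch under that assumption (atomic.Bool needs Go 1.19+):

var leader atomic.Bool
leader.Store(true) // start as leader

backend := &testStreamBackend{
	store:  store,
	pub:    publisher,
	leader: leader.Load, // method value satisfies func() bool and reads atomically
}

leader.Store(false) // later, from any goroutine: demote to follower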
