consul/agent/grpc-external/services/peerstream/stream_resources.go

package peerstream

import (
	"context"
	"fmt"
	"io"
	"strings"

	"github.com/golang/protobuf/jsonpb"
	"github.com/golang/protobuf/proto"
	"github.com/hashicorp/go-hclog"
	"google.golang.org/genproto/googleapis/rpc/code"
	"google.golang.org/grpc/codes"
	grpcstatus "google.golang.org/grpc/status"

	"github.com/hashicorp/consul/agent/connect"
	external "github.com/hashicorp/consul/agent/grpc-external"
	"github.com/hashicorp/consul/proto/pbpeering"
	"github.com/hashicorp/consul/proto/pbpeerstream"
)

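// BidirectionalStream describes the behavior this package needs from a peering
// replication stream: sending and receiving replication messages, and access
// to the stream's context.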
type BidirectionalStream interface {
	Send(*pbpeerstream.ReplicationMessage) error
	Recv() (*pbpeerstream.ReplicationMessage, error)
	Context() context.Context
}

// StreamResources handles incoming streaming connections.
func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamResourcesServer) error {
	logger := s.Logger.Named("stream-resources").With("request_id", external.TraceID())

	logger.Trace("Started processing request")
	defer logger.Trace("Finished processing request")

	// NOTE: this code should have similar error handling to the new-request
	// handling code in HandleStream()

	if !s.Backend.IsLeader() {
		// we are not the leader so we will hang up on the dialer
		logger.Error("cannot establish a peering stream on a follower node")

		st, err := grpcstatus.New(codes.FailedPrecondition,
			"cannot establish a peering stream on a follower node").WithDetails(
			&pbpeerstream.LeaderAddress{Address: s.Backend.GetLeaderAddress()})
		if err != nil {
			logger.Error(fmt.Sprintf("failed to marshal the leader address in response; err: %v", err))
			return grpcstatus.Error(codes.FailedPrecondition, "cannot establish a peering stream on a follower node")
		} else {
			return st.Err()
		}
	}

	// Initial message on a new stream must be a new subscription request.
	first, err := stream.Recv()
	if err != nil {
		logger.Error("failed to establish stream", "error", err)
		return err
	}

	// TODO(peering) Make request contain a list of resources, so that roots and services can be
	// subscribed to with a single request. See:
	// https://github.com/envoyproxy/data-plane-api/blob/main/envoy/service/discovery/v3/discovery.proto#L46
	req := first.GetRequest()
	if req == nil {
		return grpcstatus.Error(codes.InvalidArgument, "first message when initiating a peering must be a subscription request")
	}
	logger.Trace("received initial replication request from peer")
	logTraceRecv(logger, req)

	if req.PeerID == "" {
		return grpcstatus.Error(codes.InvalidArgument, "initial subscription request must specify a PeerID")
	}
	if req.ResponseNonce != "" {
		return grpcstatus.Error(codes.InvalidArgument, "initial subscription request must not contain a nonce")
	}
	if req.Error != nil {
		return grpcstatus.Error(codes.InvalidArgument, "initial subscription request must not contain an error")
	}
	if !pbpeerstream.KnownTypeURL(req.ResourceURL) {
		return grpcstatus.Errorf(codes.InvalidArgument, "subscription request to unknown resource URL: %s", req.ResourceURL)
	}

	_, p, err := s.GetStore().PeeringReadByID(nil, req.PeerID)
	if err != nil {
		logger.Error("failed to look up peer", "peer_id", req.PeerID, "error", err)
		return grpcstatus.Error(codes.Internal, "failed to find PeerID: "+req.PeerID)
	}
	if p == nil {
		return grpcstatus.Error(codes.InvalidArgument, "initial subscription for unknown PeerID: "+req.PeerID)
	}

	// TODO(peering): If the peering is marked as deleted, send a Terminated message and return
	// TODO(peering): Store subscription request so that an event publisher can separately handle pushing messages for it
	logger.Info("accepted initial replication request from peer", "peer_id", p.ID)

	if p.PeerID != "" {
		return grpcstatus.Error(codes.InvalidArgument, "expected PeerID to be empty; the wrong end of peering is being dialed")
	}

	streamReq := HandleStreamRequest{
		LocalID:   p.ID,
		RemoteID:  "",
		PeerName:  p.Name,
		Partition: p.Partition,
		Stream:    stream,
	}
	err = s.HandleStream(streamReq)
	// A nil error indicates that the peering was deleted and the stream needs to be gracefully shut down.
	if err == nil {
		s.DrainStream(streamReq)
		return nil
	}

	logger.Error("error handling stream", "peer_name", p.Name, "peer_id", req.PeerID, "error", err)
	return err
}

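// HandleStreamRequest carries the parameters needed to handle a replication
// stream for a single peering.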
type HandleStreamRequest struct {
	// LocalID is the UUID for the peering in the local Consul datacenter.
	LocalID string

	// RemoteID is the UUID for the peering from the perspective of the peer.
	RemoteID string

	// PeerName is the name of the peering.
	PeerName string

	// Partition is the local partition associated with the peer.
	Partition string

	// Stream is the open stream to the peer cluster.
	Stream BidirectionalStream
}

func (r HandleStreamRequest) WasDialed() bool {
	return r.RemoteID == ""
}

// DrainStream attempts to gracefully drain the stream when the connection is going to be torn down.
// Tearing down the connection too quickly can lead to our peer receiving a context cancellation error
// before the stream termination message.
// Handling the termination message is important to set the expectation that the peering will not be
// reestablished unless recreated.
func (s *Server) DrainStream(req HandleStreamRequest) {
	for {
		// Ensure that we read until an error, or the peer has nothing more to send.
		if _, err := req.Stream.Recv(); err != nil {
			if err != io.EOF {
				s.Logger.Warn("failed to tear down stream gracefully: peer may not have received termination message",
					"peer_name", req.PeerName, "peer_id", req.LocalID, "error", err)
			}
			break
		}
		// Since the peering is being torn down we discard all replication messages without an error.
		// We want to avoid importing new data at this point.
	}
}

// HandleStream processes replication messages for a single peering.
//
// The LocalID provided is the locally-generated identifier for the peering.
// The RemoteID is an identifier that the remote peer recognizes for the peering.
func (s *Server) HandleStream(streamReq HandleStreamRequest) error {
	// TODO: pass logger down from caller?
	logger := s.Logger.Named("stream").
		With("peer_name", streamReq.PeerName).
		With("peer_id", streamReq.LocalID).
		With("dialed", streamReq.WasDialed())
	logger.Trace("handling stream for peer")

	status, err := s.Tracker.Connected(streamReq.LocalID)
	if err != nil {
		return fmt.Errorf("failed to register stream: %v", err)
	}

	// TODO(peering) Also need to clear subscriptions associated with the peer
	defer s.Tracker.Disconnected(streamReq.LocalID)

	var trustDomain string
	if s.ConnectEnabled {
		// Read the TrustDomain up front - we do not allow users to change the ClusterID
		// so reading it once at the beginning of the stream is sufficient.
		trustDomain, err = getTrustDomain(s.GetStore(), logger)
		if err != nil {
			return err
		}
	}
	mgr := newSubscriptionManager(
		streamReq.Stream.Context(),
		logger,
		s.Config,
		trustDomain,
		s.Backend,
		s.GetStore,
	)
	subCh := mgr.subscribe(streamReq.Stream.Context(), streamReq.LocalID, streamReq.PeerName, streamReq.Partition)

	sub := makeReplicationRequest(&pbpeerstream.ReplicationMessage_Request{
		ResourceURL: pbpeerstream.TypeURLService,
		PeerID:      streamReq.RemoteID,
	})
	logTraceSend(logger, sub)

	if err := streamReq.Stream.Send(sub); err != nil {
		if err == io.EOF {
			logger.Info("stream ended by peer")
			status.TrackReceiveError(err.Error())
			return nil
		}
		// TODO(peering) Test error handling in calls to Send/Recv
		status.TrackSendError(err.Error())
		return fmt.Errorf("failed to send to stream: %v", err)
	}

	// TODO(peering): Should this be buffered?
	recvChan := make(chan *pbpeerstream.ReplicationMessage)
	go func() {
		defer close(recvChan)
		for {
			msg, err := streamReq.Stream.Recv()
			if err == nil {
				logTraceRecv(logger, msg)
				recvChan <- msg
				continue
			}

			if err == io.EOF {
				logger.Info("stream ended by peer")
				status.TrackReceiveError(err.Error())
				return
			}
			logger.Error("failed to receive from stream", "error", err)
			status.TrackReceiveError(err.Error())
			return
		}
	}()

	for {
		select {
		// When the doneCh is closed that means that the peering was deleted locally.
		case <-status.Done():
			logger.Info("ending stream")

			term := &pbpeerstream.ReplicationMessage{
				Payload: &pbpeerstream.ReplicationMessage_Terminated_{
					Terminated: &pbpeerstream.ReplicationMessage_Terminated{},
				},
			}
			logTraceSend(logger, term)

			if err := streamReq.Stream.Send(term); err != nil {
				status.TrackSendError(err.Error())
				return fmt.Errorf("failed to send to stream: %v", err)
			}

			logger.Trace("deleting stream status")
			s.Tracker.DeleteStatus(streamReq.LocalID)

			return nil

		case msg, open := <-recvChan:
			if !open {
				logger.Trace("no longer receiving data on the stream")
				return nil
			}

			// NOTE: this code should have similar error handling to the
			// initial handling code in StreamResources()

			if !s.Backend.IsLeader() {
				// we are not the leader anymore so we will hang up on the dialer
				logger.Error("node is not a leader anymore; cannot continue streaming")

				st, err := grpcstatus.New(codes.FailedPrecondition,
					"node is not a leader anymore; cannot continue streaming").WithDetails(
					&pbpeerstream.LeaderAddress{Address: s.Backend.GetLeaderAddress()})
				if err != nil {
					logger.Error(fmt.Sprintf("failed to marshal the leader address in response; err: %v", err))
					return grpcstatus.Error(codes.FailedPrecondition, "node is not a leader anymore; cannot continue streaming")
				} else {
					return st.Err()
				}
			}

			if req := msg.GetRequest(); req != nil {
				if !pbpeerstream.KnownTypeURL(req.ResourceURL) {
					return grpcstatus.Errorf(codes.InvalidArgument, "subscription request to unknown resource URL: %s", req.ResourceURL)
				}
				switch {
				case req.ResponseNonce == "":
					// TODO(peering): This can happen on a client peer since they don't try to receive subscriptions before entering HandleStream.
					// Should change that behavior or only allow it that one time.

				case req.Error != nil && (req.Error.Code != int32(code.Code_OK) || req.Error.Message != ""):
					logger.Warn("client peer was unable to apply resource", "code", req.Error.Code, "error", req.Error.Message)
					status.TrackNack(fmt.Sprintf("client peer was unable to apply resource: %s", req.Error.Message))

				default:
					status.TrackAck()
				}
				continue
			}

			if resp := msg.GetResponse(); resp != nil {
				// TODO(peering): Ensure there's a nonce
				reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, resp)
				if err != nil {
					logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID)
					status.TrackReceiveError(err.Error())
				} else {
					status.TrackReceiveSuccess()
				}

				logTraceSend(logger, reply)
				if err := streamReq.Stream.Send(reply); err != nil {
					status.TrackSendError(err.Error())
					return fmt.Errorf("failed to send to stream: %v", err)
				}

				continue
			}
			if term := msg.GetTerminated(); term != nil {
				logger.Info("peering was deleted by our peer: marking peering as terminated and cleaning up imported resources")

				// Once marked as terminated, a separate deferred deletion routine will clean up imported resources.
				if err := s.Backend.PeeringTerminateByID(&pbpeering.PeeringTerminateByIDRequest{ID: streamReq.LocalID}); err != nil {
					logger.Error("failed to mark peering as terminated", "error", err)
				}
				return nil
			}
		case update := <-subCh:
			var resp *pbpeerstream.ReplicationMessage_Response
			switch {
			case strings.HasPrefix(update.CorrelationID, subExportedService):
				resp, err = makeServiceResponse(logger, update)
				if err != nil {
					// Log the error and skip this response to avoid locking up peering due to a bad update event.
					logger.Error("failed to create service response", "error", err)
					continue
				}

			case strings.HasPrefix(update.CorrelationID, subMeshGateway):
				// TODO(Peering): figure out how to sync this separately

			case update.CorrelationID == subCARoot:
				resp, err = makeCARootsResponse(logger, update)
				if err != nil {
					// Log the error and skip this response to avoid locking up peering due to a bad update event.
					logger.Error("failed to create ca roots response", "error", err)
					continue
				}

			default:
				logger.Warn("unrecognized update type from subscription manager: " + update.CorrelationID)
				continue
			}
			if resp == nil {
				continue
			}

			replResp := makeReplicationResponse(resp)
			logTraceSend(logger, replResp)
			if err := streamReq.Stream.Send(replResp); err != nil {
				status.TrackSendError(err.Error())
				return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err)
			}
		}
	}
}

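// getTrustDomain returns the trust domain of the local cluster, derived from
// the ClusterID in the Connect CA configuration.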
func getTrustDomain(store StateStore, logger hclog.Logger) (string, error) {
	_, cfg, err := store.CAConfig(nil)
	switch {
	case err != nil:
		logger.Error("failed to read Connect CA Config", "error", err)
		return "", grpcstatus.Error(codes.Internal, "failed to read Connect CA Config")
	case cfg == nil:
		logger.Warn("cannot begin stream because Connect CA is not yet initialized")
		return "", grpcstatus.Error(codes.FailedPrecondition, "Connect CA is not yet initialized")
	}
	return connect.SpiffeIDSigningForCluster(cfg.ClusterID).Host(), nil
}

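// StreamStatus returns the status of the stream tracked for the given peer ID,
// and whether one was found.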
func (s *Server) StreamStatus(peer string) (resp Status, found bool) {
	return s.Tracker.StreamStatus(peer)
}

// ConnectedStreams returns a map of connected stream IDs to the corresponding channel for tearing them down.
func (s *Server) ConnectedStreams() map[string]chan struct{} {
	return s.Tracker.ConnectedStreams()
}

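// logTraceRecv logs a replication message received from the peer at trace level.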
func logTraceRecv(logger hclog.Logger, pb proto.Message) {
	logTraceProto(logger, pb, true)
}

// logTraceSend logs a replication message sent to the peer at trace level.
func logTraceSend(logger hclog.Logger, pb proto.Message) {
	logTraceProto(logger, pb, false)
}

// logTraceProto marshals the message to JSON and logs it at trace level,
// noting whether it was sent or received.
func logTraceProto(logger hclog.Logger, pb proto.Message, received bool) {
	if !logger.IsTrace() {
		return
	}

	dir := "sent"
	if received {
		dir = "received"
	}

	m := jsonpb.Marshaler{
		Indent: " ",
	}
	out, err := m.MarshalToString(pb)
	if err != nil {
		out = "<ERROR: " + err.Error() + ">"
	}

	logger.Trace("replication message", "direction", dir, "protobuf", out)
}