From 22572470956e2c2d5e2c5fd080bdceb39a566066 Mon Sep 17 00:00:00 2001
From: Daniel Nephin
Date: Wed, 22 Jul 2020 19:57:29 -0400
Subject: [PATCH] server: add gRPC server for streaming events

Includes a stats handler and stream interceptor for grpc metrics.

Co-authored-by: Paul Banks
---
 agent/consul/config.go |   3 ++
 agent/consul/rpc.go    |   6 +++
 agent/consul/server.go |  33 +++++++++++-
 agent/grpc/handler.go  | 113 +++++++++++++++++++++++++++++++++++++++++
 agent/grpc/stats.go    |  82 ++++++++++++++++++++++++++++++
 agent/pool/conn.go     |  22 ++++----
 6 files changed, 247 insertions(+), 12 deletions(-)
 create mode 100644 agent/grpc/handler.go
 create mode 100644 agent/grpc/stats.go

diff --git a/agent/consul/config.go b/agent/consul/config.go
index 4316475651..c1b2451ab4 100644
--- a/agent/consul/config.go
+++ b/agent/consul/config.go
@@ -470,6 +470,9 @@ type Config struct {
 	// AutoEncrypt.Sign requests.
 	AutoEncryptAllowTLS bool
 
+	// TODO: godoc, set this value from Agent
+	EnableGRPCServer bool
+
 	// Embedded Consul Enterprise specific configuration
 	*EnterpriseConfig
 }
diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go
index 0a520dcee0..ac1096292b 100644
--- a/agent/consul/rpc.go
+++ b/agent/consul/rpc.go
@@ -188,6 +188,9 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
 		conn = tls.Server(conn, s.tlsConfigurator.IncomingInsecureRPCConfig())
 		s.handleInsecureConn(conn)
 
+	case pool.RPCGRPC:
+		s.grpcHandler.Handle(conn)
+
 	default:
 		if !s.handleEnterpriseRPCConn(typ, conn, isTLS) {
 			s.rpcLogger().Error("unrecognized RPC byte",
@@ -254,6 +257,9 @@ func (s *Server) handleNativeTLS(conn net.Conn) {
 	case pool.ALPN_RPCSnapshot:
 		s.handleSnapshotConn(tlsConn)
 
+	case pool.ALPN_RPCGRPC:
+		s.grpcHandler.Handle(tlsConn)
+
 	case pool.ALPN_WANGossipPacket:
 		if err := s.handleALPN_WANGossipPacketStream(tlsConn); err != nil && err != io.EOF {
 			s.rpcLogger().Error(
diff --git a/agent/consul/server.go b/agent/consul/server.go
index c1c1a6d76e..2a496d9624 100644
--- a/agent/consul/server.go
+++ b/agent/consul/server.go
@@ -26,6 +26,7 @@ import (
 	"github.com/hashicorp/consul/agent/consul/fsm"
 	"github.com/hashicorp/consul/agent/consul/state"
 	"github.com/hashicorp/consul/agent/consul/usagemetrics"
+	"github.com/hashicorp/consul/agent/grpc"
 	"github.com/hashicorp/consul/agent/metadata"
 	"github.com/hashicorp/consul/agent/pool"
 	"github.com/hashicorp/consul/agent/router"
@@ -239,8 +240,9 @@ type Server struct {
 	rpcConnLimiter connlimit.Limiter
 
 	// Listener is used to listen for incoming connections
-	Listener  net.Listener
-	rpcServer *rpc.Server
+	Listener    net.Listener
+	grpcHandler connHandler
+	rpcServer   *rpc.Server
 
 	// insecureRPCServer is an RPC server that is configured with
 	// IncomingInsecureRPCConfig to allow clients to call AutoEncrypt.Sign
@@ -314,6 +316,12 @@ type Server struct {
 	EnterpriseServer
 }
 
+type connHandler interface {
+	Run() error
+	Handle(conn net.Conn)
+	Shutdown() error
+}
+
 // NewServer is used to construct a new Consul server from the configuration
 // and extra options, potentially returning an error.
 func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
@@ -603,6 +611,8 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
 	}
 	go reporter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
 
+	s.grpcHandler = newGRPCHandlerFromConfig(logger, config)
+
 	// Initialize Autopilot. This must happen before starting leadership monitoring
 	// as establishing leadership could attempt to use autopilot and cause a panic.
 	s.initAutopilot(config)
@@ -612,6 +622,11 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
 	go s.monitorLeadership()
 
 	// Start listening for RPC requests.
+	go func() {
+		if err := s.grpcHandler.Run(); err != nil {
+			s.logger.Error("gRPC server failed", "error", err)
+		}
+	}()
 	go s.listen(s.Listener)
 
 	// Start listeners for any segments with separate RPC listeners.
@@ -625,6 +640,14 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
 	return s, nil
 }
 
+func newGRPCHandlerFromConfig(logger hclog.Logger, config *Config) connHandler {
+	if !config.EnableGRPCServer {
+		return grpc.NoOpHandler{Logger: logger}
+	}
+
+	return grpc.NewHandler(config.RPCAddr)
+}
+
 func (s *Server) connectCARootsMonitor(ctx context.Context) {
 	for {
 		ws := memdb.NewWatchSet()
@@ -949,6 +972,12 @@ func (s *Server) Shutdown() error {
 		s.Listener.Close()
 	}
 
+	if s.grpcHandler != nil {
+		if err := s.grpcHandler.Shutdown(); err != nil {
+			s.logger.Warn("failed to stop gRPC server", "error", err)
+		}
+	}
+
 	// Close the connection pool
 	if s.connPool != nil {
 		s.connPool.Shutdown()
diff --git a/agent/grpc/handler.go b/agent/grpc/handler.go
new file mode 100644
index 0000000000..82577ed699
--- /dev/null
+++ b/agent/grpc/handler.go
@@ -0,0 +1,113 @@
+/*
+Package grpc provides a Handler and client for agent gRPC connections.
+*/
+package grpc
+
+import (
+	"fmt"
+	"net"
+
+	"google.golang.org/grpc"
+)
+
+// NewHandler returns a Handler for addr.
+func NewHandler(addr net.Addr) *Handler {
+	conns := make(chan net.Conn)
+
+	// We don't need to pass tls.Config to the server since it's multiplexed
+	// behind the RPC listener, which already has TLS configured.
+	srv := grpc.NewServer(
+		grpc.StatsHandler(&statsHandler{}),
+		grpc.StreamInterceptor((&activeStreamCounter{}).Intercept),
+	)
+
+	// TODO(streaming): add gRPC services to srv here
+
+	return &Handler{
+		conns:    conns,
+		srv:      srv,
+		listener: &chanListener{addr: addr, conns: conns},
+	}
+}
+
+// Handler implements a handler for the rpc server listener, and the
+// agent.Component interface for managing the lifecycle of the grpc.Server.
+type Handler struct {
+	conns    chan net.Conn
+	srv      *grpc.Server
+	listener *chanListener
+}
+
+// Handle the connection by sending it to a channel for the grpc.Server to receive.
+func (h *Handler) Handle(conn net.Conn) {
+	h.conns <- conn
+}
+
+func (h *Handler) Run() error {
+	return h.srv.Serve(h.listener)
+}
+
+func (h *Handler) Shutdown() error {
+	h.srv.Stop()
+	return nil
+}
+
+// chanListener implements net.Listener for grpc.Server.
+type chanListener struct {
+	conns chan net.Conn
+	addr  net.Addr
+}
+
+// Accept blocks until a connection is received from Handle, and then returns the
+// connection. Accept implements part of the net.Listener interface for grpc.Server.
+func (l *chanListener) Accept() (net.Conn, error) {
+	return <-l.conns, nil
+}
+
+func (l *chanListener) Addr() net.Addr {
+	return l.addr
+}
+
+// Close does nothing. The connections are managed by the caller.
+func (l *chanListener) Close() error {
+	return nil
+}
+
+// NoOpHandler implements the same methods as Handler, but performs no handling.
+// It may be used in place of Handler to disable the grpc server.
+type NoOpHandler struct {
+	Logger Logger
+}
+
+type Logger interface {
+	Error(string, ...interface{})
+}
+
+func (h NoOpHandler) Handle(conn net.Conn) {
+	h.Logger.Error("gRPC conn opened but gRPC RPC is disabled, closing",
+		"conn", logConn(conn))
+	_ = conn.Close()
+}
+
+func (h NoOpHandler) Run() error {
+	return nil
+}
+
+func (h NoOpHandler) Shutdown() error {
+	return nil
+}
+
+// logConn is a local copy of github.com/hashicorp/memberlist.LogConn, to avoid
+// a large dependency for a minor formatting function.
+// logConn is used to keep log formatting consistent.
+func logConn(conn net.Conn) string {
+	if conn == nil {
+		return "from="
+	}
+	addr := conn.RemoteAddr()
+	if addr == nil {
+		return "from="
+	}
+
+	return fmt.Sprintf("from=%s", addr.String())
+}
diff --git a/agent/grpc/stats.go b/agent/grpc/stats.go
new file mode 100644
index 0000000000..40821244cf
--- /dev/null
+++ b/agent/grpc/stats.go
@@ -0,0 +1,82 @@
+package grpc
+
+import (
+	"context"
+	"sync/atomic"
+
+	"github.com/armon/go-metrics"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/stats"
+)
+
+// statsHandler is a grpc/stats.StatsHandler which emits connection and
+// request metrics to go-metrics.
+type statsHandler struct {
+	activeConns uint64 // must be 8-byte aligned for atomic access
+}
+
+// TagRPC implements grpcStats.StatsHandler
+func (c *statsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
+	// No-op
+	return ctx
+}
+
+// HandleRPC implements grpcStats.StatsHandler
+func (c *statsHandler) HandleRPC(_ context.Context, s stats.RPCStats) {
+	label := "server"
+	if s.IsClient() {
+		label = "client"
+	}
+	switch s.(type) {
+	case *stats.InHeader:
+		metrics.IncrCounter([]string{"grpc", label, "request"}, 1)
+	}
+}
+
+// TagConn implements grpcStats.StatsHandler
+func (c *statsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
+	// No-op
+	return ctx
+}
+
+// HandleConn implements grpcStats.StatsHandler
+func (c *statsHandler) HandleConn(_ context.Context, s stats.ConnStats) {
+	label := "server"
+	if s.IsClient() {
+		label = "client"
+	}
+	var count uint64
+	switch s.(type) {
+	case *stats.ConnBegin:
+		count = atomic.AddUint64(&c.activeConns, 1)
+	case *stats.ConnEnd:
+		// Decrement: adding ^uint64(0) (all bits set) wraps around to count-1.
+		count = atomic.AddUint64(&c.activeConns, ^uint64(0))
+	}
+	metrics.SetGauge([]string{"grpc", label, "active_conns"}, float32(count))
+}
+
+type activeStreamCounter struct {
+	// count of the number of open streaming RPCs on a server. It is accessed
+	// atomically.
+	count uint64
+}
+
+// Intercept implements a grpc.StreamServerInterceptor that emits a metric of
+// the count of open streams.
+func (i *activeStreamCounter) Intercept(
+	srv interface{},
+	ss grpc.ServerStream,
+	_ *grpc.StreamServerInfo,
+	handler grpc.StreamHandler,
+) error {
+	count := atomic.AddUint64(&i.count, 1)
+	metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count))
+	defer func() {
+		count := atomic.AddUint64(&i.count, ^uint64(0))
+		metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count))
+	}()
+
+	return handler(srv, ss)
+}
diff --git a/agent/pool/conn.go b/agent/pool/conn.go
index 8a046fa4ca..79731953b4 100644
--- a/agent/pool/conn.go
+++ b/agent/pool/conn.go
@@ -40,23 +40,24 @@ const (
 	// that is supported and it might be the only one there
 	// ever is.
 	RPCTLSInsecure = 7
+	RPCGRPC        = 8
 
-	// RPCMaxTypeValue is the maximum rpc type byte value currently used for
-	// the various protocols riding over our "rpc" port.
+	// RPCMaxTypeValue is the maximum rpc type byte value currently used for the
+	// various protocols riding over our "rpc" port.
 	//
-	// Currently our 0-7 values are mutually exclusive with any valid first
-	// byte of a TLS header. The first TLS header byte will begin with a TLS
-	// content type and the values 0-19 are all explicitly unassigned and
-	// marked as requiring coordination. RFC 7983 does the marking and goes
-	// into some details about multiplexing connections and identifying TLS.
+	// Currently our 0-8 values are mutually exclusive with any valid first byte
+	// of a TLS header. The first TLS header byte will begin with a TLS content
+	// type and the values 0-19 are all explicitly unassigned and marked as
+	// requiring coordination. RFC 7983 does the marking and goes into some
+	// details about multiplexing connections and identifying TLS.
 	//
 	// We use this value to determine if the incoming request is actual real
-	// native TLS (where we can demultiplex based on ALPN protocol) or our
-	// older type-byte system when new connections are established.
+	// native TLS (where we can de-multiplex based on ALPN protocol) or our older
+	// type-byte system when new connections are established.
 	//
 	// NOTE: if you add new RPCTypes beyond this value, you must similarly bump
 	// this value.
-	RPCMaxTypeValue = 7
+	RPCMaxTypeValue = 8
 )
 
 const (
@@ -66,6 +67,7 @@
 	ALPN_RPCMultiplexV2 = "consul/rpc-multi"    // RPCMultiplexV2
 	ALPN_RPCSnapshot    = "consul/rpc-snapshot" // RPCSnapshot
 	ALPN_RPCGossip      = "consul/rpc-gossip"   // RPCGossip
+	ALPN_RPCGRPC        = "consul/rpc-grpc"     // RPCGRPC
 
 	// wan federation additions
 	ALPN_WANGossipPacket = "consul/wan-gossip/packet"
 	ALPN_WANGossipStream = "consul/wan-gossip/stream"
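
A note for reviewers trying this out: because handleConn switches on the first byte, a gRPC client can share the agent's existing RPC port by writing the pool.RPCGRPC type byte before any HTTP/2 traffic. A minimal sketch of such a client-side dialer; the address, timeout, and plaintext transport here are illustrative assumptions, not part of this patch:

package main

import (
	"context"
	"log"
	"net"
	"time"

	"google.golang.org/grpc"
)

// rpcGRPC mirrors pool.RPCGRPC, the multiplexing type byte added above.
const rpcGRPC = 8

// dialGRPC connects to a server's RPC port and flags the connection as
// gRPC so the server routes it to grpcHandler.Handle.
func dialGRPC(addr string) (*grpc.ClientConn, error) {
	dialer := func(ctx context.Context, addr string) (net.Conn, error) {
		conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
		if err != nil {
			return nil, err
		}
		// The first byte selects the protocol on the multiplexed port.
		if _, err := conn.Write([]byte{rpcGRPC}); err != nil {
			conn.Close()
			return nil, err
		}
		return conn, nil
	}
	return grpc.DialContext(context.Background(), addr,
		grpc.WithInsecure(),
		grpc.WithContextDialer(dialer),
	)
}

func main() {
	conn, err := dialGRPC("127.0.0.1:8300") // assumed server RPC address
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	// conn can now be handed to a generated gRPC service client.
}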
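
On the stats handler: the ConnEnd branch decrements the unsigned gauge by adding ^uint64(0). All bits set is the two's-complement representation of -1, so the addition wraps around to count-1; this is the subtraction idiom documented for atomic.AddUint64. A standalone illustration, independent of this patch:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var active uint64
	atomic.AddUint64(&active, 1)                   // ConnBegin
	atomic.AddUint64(&active, 1)                   // ConnBegin
	after := atomic.AddUint64(&active, ^uint64(0)) // ConnEnd: wraps 2 down to 1
	fmt.Println(after)                             // prints 1
}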
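
On the handler lifecycle: Handle and Run cooperate through the unbuffered conns channel, so Serve blocks in chanListener.Accept until Handle sends it a connection, and Handle in turn blocks until the server is running. A rough sketch of a test exercising that handoff, assuming the package layout above; net.Pipe stands in for a real multiplexed connection and the sleep is a simplification:

package grpc

import (
	"net"
	"testing"
	"time"
)

func TestHandlerAcceptsHandedConn(t *testing.T) {
	addr := &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8300}
	h := NewHandler(addr)

	// Serve blocks in chanListener.Accept until Handle sends a conn.
	go h.Run()

	client, server := net.Pipe()
	go h.Handle(server) // unbuffered channel: blocks until Accept runs

	// The grpc.Server now owns the server side; close the client and stop.
	client.Close()
	time.Sleep(10 * time.Millisecond)
	if err := h.Shutdown(); err != nil {
		t.Fatal(err)
	}
}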