From 8bcd5040c74cdc35c034d9319f37e92b47856718 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 14 Oct 2020 16:47:16 -0400 Subject: [PATCH] agent/grpc: Add an integration test for ClientPool with TLS Also deregister the resolver.Builder in tests. --- agent/grpc/client_test.go | 57 +++++++++++++++++++++++++-------- agent/grpc/resolver/resolver.go | 13 -------- agent/grpc/server_test.go | 54 ++++++++++++++++++++++++++----- agent/setup.go | 17 +++++++++- 4 files changed, 106 insertions(+), 35 deletions(-) diff --git a/agent/grpc/client_test.go b/agent/grpc/client_test.go index 8504d088c6..38ecc40aa7 100644 --- a/agent/grpc/client_test.go +++ b/agent/grpc/client_test.go @@ -5,14 +5,17 @@ import ( "fmt" "net" "strings" + "sync/atomic" "testing" "time" + "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/require" "github.com/hashicorp/consul/agent/grpc/internal/testservice" "github.com/hashicorp/consul/agent/grpc/resolver" "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/tlsutil" ) func TestNewDialer_WithTLSWrapper(t *testing.T) { @@ -42,14 +45,43 @@ func TestNewDialer_WithTLSWrapper(t *testing.T) { require.True(t, called, "expected TLSWrapper to be called") } -// TODO: integration test TestNewDialer with TLS and rcp server, when the rpc -// exists as an isolated component. +func TestNewDialer_IntegrationWithTLSEnabledHandler(t *testing.T) { + res := resolver.NewServerResolverBuilder(newConfig(t)) + registerWithGRPC(t, res) + + srv := newTestServer(t, "server-1", "dc1") + tlsConf, err := tlsutil.NewConfigurator(tlsutil.Config{ + VerifyIncoming: true, + VerifyOutgoing: true, + CAFile: "../../test/hostname/CertAuth.crt", + CertFile: "../../test/hostname/Alice.crt", + KeyFile: "../../test/hostname/Alice.key", + }, hclog.New(nil)) + require.NoError(t, err) + srv.rpc.tlsConf = tlsConf + + res.AddServer(srv.Metadata()) + t.Cleanup(srv.shutdown) + + pool := NewClientConnPool(res, TLSWrapper(tlsConf.OutgoingRPCWrapper())) + + conn, err := pool.ClientConn("dc1") + require.NoError(t, err) + client := testservice.NewSimpleClient(conn) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + t.Cleanup(cancel) + + resp, err := client.Something(ctx, &testservice.Req{}) + require.NoError(t, err) + require.Equal(t, "server-1", resp.ServerName) + require.True(t, atomic.LoadInt32(&srv.rpc.tlsConnEstablished) > 0) +} func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) { count := 4 - cfg := resolver.Config{Scheme: newScheme(t.Name())} - res := resolver.NewServerResolverBuilder(cfg) - resolver.RegisterWithGRPC(res) + res := resolver.NewServerResolverBuilder(newConfig(t)) + registerWithGRPC(t, res) pool := NewClientConnPool(res, nil) for i := 0; i < count; i++ { @@ -76,17 +108,17 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) { require.NotEqual(t, resp.ServerName, first.ServerName) } -func newScheme(n string) string { +func newConfig(t *testing.T) resolver.Config { + n := t.Name() s := strings.Replace(n, "/", "", -1) s = strings.Replace(s, "_", "", -1) - return strings.ToLower(s) + return resolver.Config{Scheme: strings.ToLower(s)} } func TestClientConnPool_IntegrationWithGRPCResolver_Rebalance(t *testing.T) { count := 5 - cfg := resolver.Config{Scheme: newScheme(t.Name())} - res := resolver.NewServerResolverBuilder(cfg) - resolver.RegisterWithGRPC(res) + res := resolver.NewServerResolverBuilder(newConfig(t)) + registerWithGRPC(t, res) pool := NewClientConnPool(res, nil) for i := 0; i < count; i++ { @@ -134,9 +166,8 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Rebalance(t *testing.T) { func TestClientConnPool_IntegrationWithGRPCResolver_MultiDC(t *testing.T) { dcs := []string{"dc1", "dc2", "dc3"} - cfg := resolver.Config{Scheme: newScheme(t.Name())} - res := resolver.NewServerResolverBuilder(cfg) - resolver.RegisterWithGRPC(res) + res := resolver.NewServerResolverBuilder(newConfig(t)) + registerWithGRPC(t, res) pool := NewClientConnPool(res, nil) for _, dc := range dcs { diff --git a/agent/grpc/resolver/resolver.go b/agent/grpc/resolver/resolver.go index 4c6874ddca..76a2188d2f 100644 --- a/agent/grpc/resolver/resolver.go +++ b/agent/grpc/resolver/resolver.go @@ -12,19 +12,6 @@ import ( "github.com/hashicorp/consul/agent/metadata" ) -var registerLock sync.Mutex - -// RegisterWithGRPC registers the ServerResolverBuilder as a grpc/resolver. -// This function exists to synchronize registrations with a lock. -// grpc/resolver.Register expects all registration to happen at init and does -// not allow for concurrent registration. This function exists to support -// parallel testing. -func RegisterWithGRPC(b *ServerResolverBuilder) { - registerLock.Lock() - defer registerLock.Unlock() - resolver.Register(b) -} - // ServerResolverBuilder tracks the current server list and keeps any // ServerResolvers updated when changes occur. type ServerResolverBuilder struct { diff --git a/agent/grpc/server_test.go b/agent/grpc/server_test.go index 413ca34b70..b660a66a73 100644 --- a/agent/grpc/server_test.go +++ b/agent/grpc/server_test.go @@ -2,19 +2,23 @@ package grpc import ( "context" + "crypto/tls" "fmt" "io" "net" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "google.golang.org/grpc" + "google.golang.org/grpc/resolver" "github.com/hashicorp/consul/agent/grpc/internal/testservice" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" + "github.com/hashicorp/consul/tlsutil" ) type testServer struct { @@ -22,10 +26,16 @@ type testServer struct { name string dc string shutdown func() + rpc *fakeRPCListener } func (s testServer) Metadata() *metadata.Server { - return &metadata.Server{ID: s.name, Datacenter: s.dc, Addr: s.addr} + return &metadata.Server{ + ID: s.name, + Datacenter: s.dc, + Addr: s.addr, + UseTLS: s.rpc.tlsConf != nil, + } } func newTestServer(t *testing.T, name string, dc string) testServer { @@ -56,6 +66,7 @@ func newTestServer(t *testing.T, name string, dc string) testServer { addr: lis.Addr(), name: name, dc: dc, + rpc: rpc, shutdown: func() { rpc.shutdown = true if err := lis.Close(); err != nil { @@ -97,9 +108,11 @@ func (s *simple) Something(_ context.Context, _ *testservice.Req) (*testservice. // For now, since this logic is in agent/consul, we can't easily use Server.listen // so we fake it. type fakeRPCListener struct { - t *testing.T - handler *Handler - shutdown bool + t *testing.T + handler *Handler + shutdown bool + tlsConf *tlsutil.Configurator + tlsConnEstablished int32 } func (f *fakeRPCListener) listen(listener net.Listener) error { @@ -128,11 +141,36 @@ func (f *fakeRPCListener) handleConn(conn net.Conn) { } typ := pool.RPCType(buf[0]) - if typ == pool.RPCGRPC { + switch typ { + + case pool.RPCGRPC: f.handler.Handle(conn) return - } - fmt.Println("ERROR: unexpected byte", typ) - conn.Close() + case pool.RPCTLS: + // occasionally we see a test client connecting to an rpc listener that + // was created as part of another test, despite none of the tests running + // in parallel. + // Maybe some strange grpc behaviour? I'm not sure. + if f.tlsConf == nil { + fmt.Println("ERROR: tls is not configured") + conn.Close() + return + } + + atomic.AddInt32(&f.tlsConnEstablished, 1) + conn = tls.Server(conn, f.tlsConf.IncomingRPCConfig()) + f.handleConn(conn) + + default: + fmt.Println("ERROR: unexpected byte", typ) + conn.Close() + } +} + +func registerWithGRPC(t *testing.T, b resolver.Builder) { + resolver.Register(b) + t.Cleanup(func() { + resolver.UnregisterForTesting(b.Scheme()) + }) } diff --git a/agent/setup.go b/agent/setup.go index 7c65777c9c..c7fe7f523b 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -5,10 +5,12 @@ import ( "io" "net" "net/http" + "sync" "time" "github.com/hashicorp/go-hclog" "google.golang.org/grpc/grpclog" + grpcresolver "google.golang.org/grpc/resolver" autoconf "github.com/hashicorp/consul/agent/auto-config" "github.com/hashicorp/consul/agent/cache" @@ -88,7 +90,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator) builder := resolver.NewServerResolverBuilder(resolver.Config{}) - resolver.RegisterWithGRPC(builder) + registerWithGRPC(builder) d.GRPCConnPool = grpc.NewClientConnPool(builder, grpc.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper())) d.Router = router.NewRouter(d.Logger, cfg.Datacenter, fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Datacenter), builder) @@ -162,3 +164,16 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil } return pool } + +var registerLock sync.Mutex + +// registerWithGRPC registers the grpc/resolver.Builder as a grpc/resolver. +// This function exists to synchronize registrations with a lock. +// grpc/resolver.Register expects all registration to happen at init and does +// not allow for concurrent registration. This function exists to support +// parallel testing. +func registerWithGRPC(b grpcresolver.Builder) { + registerLock.Lock() + defer registerLock.Unlock() + grpcresolver.Register(b) +}