From c94eaa4957d98415b2450220815c50bf89526db3 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 1 Jun 2021 17:49:31 -0400 Subject: [PATCH 1/2] submatview: improve a couple comments --- agent/submatview/store.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/agent/submatview/store.go b/agent/submatview/store.go index cf99857089..80e9f30b7d 100644 --- a/agent/submatview/store.go +++ b/agent/submatview/store.go @@ -30,7 +30,7 @@ type Store struct { // idleTTL is the duration of time an entry should remain in the Store after the // last request for that entry has been terminated. It is a field on the struct - // so that it can be patched in tests without need a lock. + // so that it can be patched in tests without needing a global lock. idleTTL time.Duration } @@ -122,8 +122,8 @@ func (s *Store) Get(ctx context.Context, req Request) (Result, error) { defer cancel() result, err := materializer.getFromView(ctx, info.MinIndex) - // context.DeadlineExceeded is translated to nil to match the behaviour of - // agent/cache.Cache.Get. + // context.DeadlineExceeded is translated to nil to match the timeout + // behaviour of agent/cache.Cache.Get. if err == nil || errors.Is(err, context.DeadlineExceeded) { return result, nil } From 29e93f63382779457b627c3ad1948bbc3262580b Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 1 Jun 2021 18:31:52 -0400 Subject: [PATCH 2/2] grpc: fix a data race by using a static resolver We have seen test flakes caused by 'concurrent map read and map write', and the race detector reports the problem as well (prevent us from running some tests with -race). The root of the problem is the grpc expects resolvers to be registered at init time before any requests are made, but we were using a separate resolver for each test. This commit introduces a resolver registry. The registry is registered as the single resolver for the consul scheme. Each test uses the Authority section of the target (instead of the scheme) to identify the resolver that should be used for the test. The scheme is used for lookup, which is why it can no longer be used as the unique key. This allows us to use a lock around the map of resolvers, preventing the data race. --- agent/consul/subscribe_backend_test.go | 21 +++------- agent/grpc/client.go | 6 +-- agent/grpc/client_test.go | 9 ++++- agent/grpc/resolver/registry.go | 54 ++++++++++++++++++++++++++ agent/grpc/resolver/resolver.go | 23 +++++------ agent/grpc/server_test.go | 8 ---- agent/setup.go | 16 +------- 7 files changed, 81 insertions(+), 56 deletions(-) create mode 100644 agent/grpc/resolver/registry.go diff --git a/agent/consul/subscribe_backend_test.go b/agent/consul/subscribe_backend_test.go index e7debf0ab5..5b412574e6 100644 --- a/agent/consul/subscribe_backend_test.go +++ b/agent/consul/subscribe_backend_test.go @@ -12,7 +12,6 @@ import ( "github.com/stretchr/testify/require" gogrpc "google.golang.org/grpc" - grpcresolver "google.golang.org/grpc/resolver" grpc "github.com/hashicorp/consul/agent/grpc" "github.com/hashicorp/consul/agent/grpc/resolver" @@ -338,8 +337,11 @@ func TestSubscribeBackend_IntegrationWithServer_DeliversAllMessages(t *testing.T } func newClientWithGRPCResolver(t *testing.T, ops ...func(*Config)) (*Client, *resolver.ServerResolverBuilder) { - builder := resolver.NewServerResolverBuilder(resolver.Config{Scheme: t.Name()}) - registerWithGRPC(builder) + builder := resolver.NewServerResolverBuilder(resolver.Config{Authority: t.Name()}) + resolver.Register(builder) + t.Cleanup(func() { + resolver.Deregister(builder.Authority()) + }) _, config := testClientConfig(t) for _, op := range ops { @@ -361,19 +363,6 @@ func newClientWithGRPCResolver(t *testing.T, ops ...func(*Config)) (*Client, *re return client, builder } -var grpcRegisterLock sync.Mutex - -// registerWithGRPC registers the grpc/resolver.Builder as a grpc/resolver. -// This function exists to synchronize registrations with a lock. -// grpc/resolver.Register expects all registration to happen at init and does -// not allow for concurrent registration. This function exists to support -// parallel testing. -func registerWithGRPC(b grpcresolver.Builder) { - grpcRegisterLock.Lock() - defer grpcRegisterLock.Unlock() - grpcresolver.Register(b) -} - type testLogger interface { Logf(format string, args ...interface{}) } diff --git a/agent/grpc/client.go b/agent/grpc/client.go index 8e43873828..e58d5c6496 100644 --- a/agent/grpc/client.go +++ b/agent/grpc/client.go @@ -25,10 +25,10 @@ type ClientConnPool struct { type ServerLocator interface { // ServerForAddr is used to look up server metadata from an address. ServerForAddr(addr string) (*metadata.Server, error) - // Scheme returns the url scheme to use to dial the server. This is primarily + // Authority returns the target authority to use to dial the server. This is primarily // needed for testing multiple agents in parallel, because gRPC requires the // resolver to be registered globally. - Scheme() string + Authority() string } // TLSWrapper wraps a non-TLS connection and returns a connection with TLS @@ -58,7 +58,7 @@ func (c *ClientConnPool) ClientConn(datacenter string) (*grpc.ClientConn, error) } conn, err := grpc.Dial( - fmt.Sprintf("%s:///server.%s", c.servers.Scheme(), datacenter), + fmt.Sprintf("consul://%s/server.%s", c.servers.Authority(), datacenter), // use WithInsecure mode here because we handle the TLS wrapping in the // custom dialer based on logic around whether the server has TLS enabled. grpc.WithInsecure(), diff --git a/agent/grpc/client_test.go b/agent/grpc/client_test.go index 5028e34fa9..49922a3098 100644 --- a/agent/grpc/client_test.go +++ b/agent/grpc/client_test.go @@ -117,7 +117,7 @@ func newConfig(t *testing.T) resolver.Config { n := t.Name() s := strings.Replace(n, "/", "", -1) s = strings.Replace(s, "_", "", -1) - return resolver.Config{Scheme: strings.ToLower(s)} + return resolver.Config{Authority: strings.ToLower(s)} } func TestClientConnPool_IntegrationWithGRPCResolver_Rebalance(t *testing.T) { @@ -195,3 +195,10 @@ func TestClientConnPool_IntegrationWithGRPCResolver_MultiDC(t *testing.T) { require.Equal(t, resp.Datacenter, dc) } } + +func registerWithGRPC(t *testing.T, b *resolver.ServerResolverBuilder) { + resolver.Register(b) + t.Cleanup(func() { + resolver.Deregister(b.Authority()) + }) +} diff --git a/agent/grpc/resolver/registry.go b/agent/grpc/resolver/registry.go new file mode 100644 index 0000000000..d305b607d7 --- /dev/null +++ b/agent/grpc/resolver/registry.go @@ -0,0 +1,54 @@ +package resolver + +import ( + "fmt" + "sync" + + "google.golang.org/grpc/resolver" +) + +// registry of ServerResolverBuilder. This type exists because grpc requires that +// resolvers are registered globally before any requests are made. This is +// incompatible with our resolver implementation and testing strategy, which +// requires a different Resolver for each test. +type registry struct { + lock sync.RWMutex + byAuthority map[string]*ServerResolverBuilder +} + +func (r *registry) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) { + r.lock.RLock() + defer r.lock.RUnlock() + res, ok := r.byAuthority[target.Authority] + if !ok { + return nil, fmt.Errorf("no resolver registered for %v", target.Authority) + } + return res.Build(target, cc, opts) +} + +func (r *registry) Scheme() string { + return "consul" +} + +var _ resolver.Builder = (*registry)(nil) + +var reg = ®istry{byAuthority: make(map[string]*ServerResolverBuilder)} + +func init() { + resolver.Register(reg) +} + +// Register a ServerResolverBuilder with the global registry. +func Register(res *ServerResolverBuilder) { + reg.lock.Lock() + defer reg.lock.Unlock() + reg.byAuthority[res.Authority()] = res +} + +// Deregister the ServerResolverBuilder associated with the authority. Only used +// for testing. +func Deregister(authority string) { + reg.lock.Lock() + defer reg.lock.Unlock() + delete(reg.byAuthority, authority) +} diff --git a/agent/grpc/resolver/resolver.go b/agent/grpc/resolver/resolver.go index 76a2188d2f..b3eae815ff 100644 --- a/agent/grpc/resolver/resolver.go +++ b/agent/grpc/resolver/resolver.go @@ -15,9 +15,7 @@ import ( // ServerResolverBuilder tracks the current server list and keeps any // ServerResolvers updated when changes occur. type ServerResolverBuilder struct { - // scheme used to query the server. Defaults to consul. Used to support - // parallel testing because gRPC registers resolvers globally. - scheme string + cfg Config // servers is an index of Servers by Server.ID. The map contains server IDs // for all datacenters. servers map[string]*metadata.Server @@ -28,25 +26,22 @@ type ServerResolverBuilder struct { lock sync.RWMutex } -var _ resolver.Builder = (*ServerResolverBuilder)(nil) - type Config struct { - // Scheme used to connect to the server. Defaults to consul. - Scheme string + // Authority used to query the server. Defaults to "". Used to support + // parallel testing because gRPC registers resolvers globally. + Authority string } func NewServerResolverBuilder(cfg Config) *ServerResolverBuilder { - if cfg.Scheme == "" { - cfg.Scheme = "consul" - } return &ServerResolverBuilder{ - scheme: cfg.Scheme, + cfg: cfg, servers: make(map[string]*metadata.Server), resolvers: make(map[resolver.ClientConn]*serverResolver), } } -// Rebalance shuffles the server list for resolvers in all datacenters. +// NewRebalancer returns a function which shuffles the server list for resolvers +// in all datacenters. func (s *ServerResolverBuilder) NewRebalancer(dc string) func() { shuffler := rand.New(rand.NewSource(time.Now().UnixNano())) return func() { @@ -112,7 +107,9 @@ func (s *ServerResolverBuilder) Build(target resolver.Target, cc resolver.Client return resolver, nil } -func (s *ServerResolverBuilder) Scheme() string { return s.scheme } +func (s *ServerResolverBuilder) Authority() string { + return s.cfg.Authority +} // AddServer updates the resolvers' states to include the new server's address. func (s *ServerResolverBuilder) AddServer(server *metadata.Server) { diff --git a/agent/grpc/server_test.go b/agent/grpc/server_test.go index b660a66a73..442b617d50 100644 --- a/agent/grpc/server_test.go +++ b/agent/grpc/server_test.go @@ -13,7 +13,6 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "google.golang.org/grpc" - "google.golang.org/grpc/resolver" "github.com/hashicorp/consul/agent/grpc/internal/testservice" "github.com/hashicorp/consul/agent/metadata" @@ -167,10 +166,3 @@ func (f *fakeRPCListener) handleConn(conn net.Conn) { conn.Close() } } - -func registerWithGRPC(t *testing.T, b resolver.Builder) { - resolver.Register(b) - t.Cleanup(func() { - resolver.UnregisterForTesting(b.Scheme()) - }) -} diff --git a/agent/setup.go b/agent/setup.go index 93fd9e6cc5..bfa4abfade 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -11,7 +11,6 @@ import ( "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/go-hclog" "google.golang.org/grpc/grpclog" - grpcresolver "google.golang.org/grpc/resolver" autoconf "github.com/hashicorp/consul/agent/auto-config" "github.com/hashicorp/consul/agent/cache" @@ -105,7 +104,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator) builder := resolver.NewServerResolverBuilder(resolver.Config{}) - registerWithGRPC(builder) + resolver.Register(builder) d.GRPCConnPool = grpc.NewClientConnPool(builder, grpc.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper()), d.TLSConfigurator.UseTLS) d.Router = router.NewRouter(d.Logger, cfg.Datacenter, fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Datacenter), builder) @@ -169,19 +168,6 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil return pool } -var registerLock sync.Mutex - -// registerWithGRPC registers the grpc/resolver.Builder as a grpc/resolver. -// This function exists to synchronize registrations with a lock. -// grpc/resolver.Register expects all registration to happen at init and does -// not allow for concurrent registration. This function exists to support -// parallel testing. -func registerWithGRPC(b grpcresolver.Builder) { - registerLock.Lock() - defer registerLock.Unlock() - grpcresolver.Register(b) -} - // getPrometheusDefs reaches into every slice of prometheus defs we've defined in each part of the agent, and appends // all of our slices into one nice slice of definitions per metric type for the Consul agent to pass to go-metrics. func getPrometheusDefs(cfg lib.TelemetryConfig) ([]prometheus.GaugeDefinition, []prometheus.CounterDefinition, []prometheus.SummaryDefinition) {