From eabff257d797177a24ffdec4d5f0d1a94f8ef4c4 Mon Sep 17 00:00:00 2001 From: Derek Menteer <105233703+hashi-derek@users.noreply.github.com> Date: Fri, 15 Mar 2024 13:10:48 -0500 Subject: [PATCH] Various bug-fixes and improvements (#20866) * Shuffle the list of servers returned by `pbserverdiscovery.WatchServers`. This randomizes the list of servers to help reduce the chance of clients all connecting to the same server simultaneously. Consul-dataplane is one such client that does not randomize its own list of servers. * Fix potential goroutine leak in xDS recv loop. This commit ensures that the goroutine which receives xDS messages from proxies will not block forever if the stream's context is cancelled but the `processDelta()` function never consumes the message (due to being terminated). * Add changelog. --- .changelog/20866.txt | 7 +++++++ .../services/serverdiscovery/watch_servers.go | 5 +++++ .../services/serverdiscovery/watch_servers_test.go | 6 +++--- agent/xds/delta.go | 5 ++++- 4 files changed, 19 insertions(+), 4 deletions(-) create mode 100644 .changelog/20866.txt diff --git a/.changelog/20866.txt b/.changelog/20866.txt new file mode 100644 index 0000000000..20c41f02bc --- /dev/null +++ b/.changelog/20866.txt @@ -0,0 +1,7 @@ +```release-note:improvement +api: Randomize the returned server list for the WatchServers gRPC endpoint. +``` + +```release-note:bug +connect: Fix potential goroutine leak in xDS stream handling. +``` diff --git a/agent/grpc-external/services/serverdiscovery/watch_servers.go b/agent/grpc-external/services/serverdiscovery/watch_servers.go index 94ed7ac58a..24960336c8 100644 --- a/agent/grpc-external/services/serverdiscovery/watch_servers.go +++ b/agent/grpc-external/services/serverdiscovery/watch_servers.go @@ -6,6 +6,7 @@ package serverdiscovery import ( "context" "errors" + "math/rand" "github.com/hashicorp/go-hclog" "google.golang.org/grpc/codes" @@ -130,6 +131,10 @@ func eventToResponse(req *pbserverdiscovery.WatchServersRequest, event stream.Ev }) } + // Shuffle servers so that consul-dataplane doesn't consistently choose the same connections on startup. + rand.Shuffle(len(servers), func(i, j int) { + servers[i], servers[j] = servers[j], servers[i] + }) return &pbserverdiscovery.WatchServersResponse{ Servers: servers, }, nil diff --git a/agent/grpc-external/services/serverdiscovery/watch_servers_test.go b/agent/grpc-external/services/serverdiscovery/watch_servers_test.go index 0df48f3bb3..85daa9b606 100644 --- a/agent/grpc-external/services/serverdiscovery/watch_servers_test.go +++ b/agent/grpc-external/services/serverdiscovery/watch_servers_test.go @@ -166,7 +166,7 @@ func TestWatchServers_StreamLifeCycle(t *testing.T) { // 4. See the corresponding message sent back through the stream. rsp = mustGetServers(t, rspCh) require.NotNil(t, rsp) - prototest.AssertDeepEqual(t, twoServerResponse, rsp) + prototest.AssertElementsMatch(t, twoServerResponse.Servers, rsp.Servers) // 5. Send a NewCloseSubscriptionEvent for the token secret. publisher.Publish([]stream.Event{ @@ -176,7 +176,7 @@ func TestWatchServers_StreamLifeCycle(t *testing.T) { // 6. Observe another snapshot message rsp = mustGetServers(t, rspCh) require.NotNil(t, rsp) - prototest.AssertDeepEqual(t, twoServerResponse, rsp) + prototest.AssertElementsMatch(t, twoServerResponse.Servers, rsp.Servers) // 7. Publish another event to move to 3 servers. publisher.Publish([]stream.Event{ @@ -192,7 +192,7 @@ func TestWatchServers_StreamLifeCycle(t *testing.T) { // seen after stream reinitialization. rsp = mustGetServers(t, rspCh) require.NotNil(t, rsp) - prototest.AssertDeepEqual(t, threeServerResponse, rsp) + prototest.AssertElementsMatch(t, threeServerResponse.Servers, rsp.Servers) } func TestWatchServers_ACLToken_AnonymousToken(t *testing.T) { diff --git a/agent/xds/delta.go b/agent/xds/delta.go index 70b65afe27..cd4d4fb164 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -79,7 +79,10 @@ func (s *Server) DeltaAggregatedResources(stream ADSDeltaStream) error { close(reqCh) return } - reqCh <- req + select { + case <-stream.Context().Done(): + case reqCh <- req: + } } }()