From dc04075010adc59e4d67c21c46182661dfbd4f27 Mon Sep 17 00:00:00 2001 From: Derek Menteer Date: Tue, 16 Jan 2024 08:21:51 -0600 Subject: [PATCH] Fix race condition in reconcilePeering. This resolves an issue where a peering object in the state store was incorrectly mutated by a function, resulting in the test being flagged as failing when the -race flag was used. --- agent/rpc/peering/service.go | 8 ++--- agent/rpc/peering/service_test.go | 56 ++++++++++++++++++++----------- 2 files changed, 39 insertions(+), 25 deletions(-) diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index 7f59c2941c..2c6655be66 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -764,16 +764,15 @@ func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequ // -- ImportedServicesCount and ExportedServicesCount // NOTE: we return a new peering with this additional data func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering { + cp := copyPeering(peering) streamState, found := s.Tracker.StreamStatus(peering.ID) if !found { // TODO(peering): this may be noise on non-leaders s.Logger.Warn("did not find peer in stream tracker; cannot populate imported and"+ " exported services count or reconcile peering state", "peerID", peering.ID) - peering.StreamStatus = &pbpeering.StreamStatus{} - return peering + cp.StreamStatus = &pbpeering.StreamStatus{} + return cp } else { - cp := copyPeering(peering) - // reconcile pbpeering.PeeringState_Active if streamState.Connected { cp.State = pbpeering.PeeringState_ACTIVE @@ -1160,6 +1159,5 @@ func validatePeer(peering *pbpeering.Peering, shouldDial bool) error { func copyPeering(p *pbpeering.Peering) *pbpeering.Peering { var copyP pbpeering.Peering proto.Merge(©P, p) - return ©P } diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index b2b2075157..5d78588a34 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -754,6 +754,7 @@ func TestPeeringService_Read(t *testing.T) { PeerCAPems: nil, PeerServerName: "test", PeerServerAddresses: []string{"addr1"}, + StreamStatus: &pbpeering.StreamStatus{}, } err := s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: p}) require.NoError(t, err) @@ -815,6 +816,7 @@ func TestPeeringService_Read_ACLEnforcement(t *testing.T) { PeerCAPems: nil, PeerServerName: "test", PeerServerAddresses: []string{"addr1"}, + StreamStatus: &pbpeering.StreamStatus{}, } err := s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: p}) require.NoError(t, err) @@ -879,8 +881,10 @@ func TestPeeringService_Read_Blocking(t *testing.T) { PeerCAPems: nil, PeerServerName: "test", PeerServerAddresses: []string{"addr1"}, + StreamStatus: &pbpeering.StreamStatus{}, } - err := s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: p}) + toWrite := proto.Clone(p).(*pbpeering.Peering) + err := s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: toWrite}) require.NoError(t, err) client := pbpeering.NewPeeringServiceClient(s.ClientConn(t)) @@ -891,37 +895,44 @@ func TestPeeringService_Read_Blocking(t *testing.T) { options := structs.QueryOptions{ MinQueryIndex: lastIdx, - MaxQueryTime: 1 * time.Second, + MaxQueryTime: 10 * time.Second, } ctx, err = external.ContextWithQueryOptions(ctx, options) require.NoError(t, err) // Mutate the original peering - p = proto.Clone(p).(*pbpeering.Peering) p.PeerServerAddresses = append(p.PeerServerAddresses, "addr2") // Async change to trigger update - marker := time.Now() + recvChan := make(chan *pbpeering.PeeringReadResponse) + errChan := make(chan error) + var header metadata.MD go func() { - time.Sleep(100 * time.Millisecond) - lastIdx++ - require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: p})) + resp, err := client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "foo"}, gogrpc.Header(&header)) + if err != nil { + errChan <- err + return + } + recvChan <- resp }() - var header metadata.MD - resp, err := client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "foo"}, gogrpc.Header(&header)) - require.NoError(t, err) + lastIdx++ + toWrite = proto.Clone(p).(*pbpeering.Peering) + require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: toWrite})) - // The query should return after the async change, but before the timeout - require.True(t, time.Since(marker) >= 100*time.Millisecond) - require.True(t, time.Since(marker) < 1*time.Second) - - // Verify query results - meta, err := external.QueryMetaFromGRPCMeta(header) - require.NoError(t, err) - require.Equal(t, lastIdx, meta.Index) - - prototest.AssertDeepEqual(t, p, resp.Peering) + select { + case err := <-errChan: + require.NoError(t, err) + case resp := <-recvChan: + meta, err := external.QueryMetaFromGRPCMeta(header) + require.NoError(t, err) + require.Equal(t, lastIdx, meta.Index) + resp.Peering.CreateIndex = 0 + resp.Peering.ModifyIndex = 0 + prototest.AssertDeepEqual(t, p, resp.Peering) + case <-time.After(2 * time.Second): + t.Error("blocking query timed out while waiting") + } } func TestPeeringService_Delete(t *testing.T) { @@ -1064,6 +1075,7 @@ func TestPeeringService_List(t *testing.T) { PeerCAPems: nil, PeerServerName: "fooservername", PeerServerAddresses: []string{"addr1"}, + StreamStatus: &pbpeering.StreamStatus{}, } require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: foo})) @@ -1075,6 +1087,7 @@ func TestPeeringService_List(t *testing.T) { PeerCAPems: nil, PeerServerName: "barservername", PeerServerAddresses: []string{"addr1"}, + StreamStatus: &pbpeering.StreamStatus{}, } require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: bar})) @@ -1120,6 +1133,7 @@ func TestPeeringService_List(t *testing.T) { PeerCAPems: nil, PeerServerName: "bazservername", PeerServerAddresses: []string{"addr1"}, + StreamStatus: &pbpeering.StreamStatus{}, } go func() { time.Sleep(100 * time.Millisecond) @@ -1166,6 +1180,7 @@ func TestPeeringService_List_ACLEnforcement(t *testing.T) { PeerCAPems: nil, PeerServerName: "fooservername", PeerServerAddresses: []string{"addr1"}, + StreamStatus: &pbpeering.StreamStatus{}, } require.NoError(t, s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: foo})) bar := &pbpeering.Peering{ @@ -1175,6 +1190,7 @@ func TestPeeringService_List_ACLEnforcement(t *testing.T) { PeerCAPems: nil, PeerServerName: "barservername", PeerServerAddresses: []string{"addr1"}, + StreamStatus: &pbpeering.StreamStatus{}, } require.NoError(t, s.Server.FSM().State().PeeringWrite(15, &pbpeering.PeeringWriteRequest{Peering: bar}))