mirror of https://github.com/hashicorp/consul
Fix race condition in reconcilePeering.
This resolves an issue where a peering object in the state store was incorrectly mutated by a function, resulting in the test being flagged as failing when the -race flag was used.pull/20212/head
parent
91dfaad67b
commit
dc04075010
|
@ -764,16 +764,15 @@ func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequ
|
|||
// -- ImportedServicesCount and ExportedServicesCount
|
||||
// NOTE: we return a new peering with this additional data
|
||||
func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering {
|
||||
cp := copyPeering(peering)
|
||||
streamState, found := s.Tracker.StreamStatus(peering.ID)
|
||||
if !found {
|
||||
// TODO(peering): this may be noise on non-leaders
|
||||
s.Logger.Warn("did not find peer in stream tracker; cannot populate imported and"+
|
||||
" exported services count or reconcile peering state", "peerID", peering.ID)
|
||||
peering.StreamStatus = &pbpeering.StreamStatus{}
|
||||
return peering
|
||||
cp.StreamStatus = &pbpeering.StreamStatus{}
|
||||
return cp
|
||||
} else {
|
||||
cp := copyPeering(peering)
|
||||
|
||||
// reconcile pbpeering.PeeringState_Active
|
||||
if streamState.Connected {
|
||||
cp.State = pbpeering.PeeringState_ACTIVE
|
||||
|
@ -1160,6 +1159,5 @@ func validatePeer(peering *pbpeering.Peering, shouldDial bool) error {
|
|||
func copyPeering(p *pbpeering.Peering) *pbpeering.Peering {
|
||||
var copyP pbpeering.Peering
|
||||
proto.Merge(©P, p)
|
||||
|
||||
return ©P
|
||||
}
|
||||
|
|
|
@ -754,6 +754,7 @@ func TestPeeringService_Read(t *testing.T) {
|
|||
PeerCAPems: nil,
|
||||
PeerServerName: "test",
|
||||
PeerServerAddresses: []string{"addr1"},
|
||||
StreamStatus: &pbpeering.StreamStatus{},
|
||||
}
|
||||
err := s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: p})
|
||||
require.NoError(t, err)
|
||||
|
@ -815,6 +816,7 @@ func TestPeeringService_Read_ACLEnforcement(t *testing.T) {
|
|||
PeerCAPems: nil,
|
||||
PeerServerName: "test",
|
||||
PeerServerAddresses: []string{"addr1"},
|
||||
StreamStatus: &pbpeering.StreamStatus{},
|
||||
}
|
||||
err := s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: p})
|
||||
require.NoError(t, err)
|
||||
|
@ -879,8 +881,10 @@ func TestPeeringService_Read_Blocking(t *testing.T) {
|
|||
PeerCAPems: nil,
|
||||
PeerServerName: "test",
|
||||
PeerServerAddresses: []string{"addr1"},
|
||||
StreamStatus: &pbpeering.StreamStatus{},
|
||||
}
|
||||
err := s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: p})
|
||||
toWrite := proto.Clone(p).(*pbpeering.Peering)
|
||||
err := s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: toWrite})
|
||||
require.NoError(t, err)
|
||||
|
||||
client := pbpeering.NewPeeringServiceClient(s.ClientConn(t))
|
||||
|
@ -891,37 +895,44 @@ func TestPeeringService_Read_Blocking(t *testing.T) {
|
|||
|
||||
options := structs.QueryOptions{
|
||||
MinQueryIndex: lastIdx,
|
||||
MaxQueryTime: 1 * time.Second,
|
||||
MaxQueryTime: 10 * time.Second,
|
||||
}
|
||||
ctx, err = external.ContextWithQueryOptions(ctx, options)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Mutate the original peering
|
||||
p = proto.Clone(p).(*pbpeering.Peering)
|
||||
p.PeerServerAddresses = append(p.PeerServerAddresses, "addr2")
|
||||
|
||||
// Async change to trigger update
|
||||
marker := time.Now()
|
||||
recvChan := make(chan *pbpeering.PeeringReadResponse)
|
||||
errChan := make(chan error)
|
||||
var header metadata.MD
|
||||
go func() {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
lastIdx++
|
||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: p}))
|
||||
resp, err := client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "foo"}, gogrpc.Header(&header))
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
recvChan <- resp
|
||||
}()
|
||||
|
||||
var header metadata.MD
|
||||
resp, err := client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "foo"}, gogrpc.Header(&header))
|
||||
require.NoError(t, err)
|
||||
lastIdx++
|
||||
toWrite = proto.Clone(p).(*pbpeering.Peering)
|
||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: toWrite}))
|
||||
|
||||
// The query should return after the async change, but before the timeout
|
||||
require.True(t, time.Since(marker) >= 100*time.Millisecond)
|
||||
require.True(t, time.Since(marker) < 1*time.Second)
|
||||
|
||||
// Verify query results
|
||||
meta, err := external.QueryMetaFromGRPCMeta(header)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, lastIdx, meta.Index)
|
||||
|
||||
prototest.AssertDeepEqual(t, p, resp.Peering)
|
||||
select {
|
||||
case err := <-errChan:
|
||||
require.NoError(t, err)
|
||||
case resp := <-recvChan:
|
||||
meta, err := external.QueryMetaFromGRPCMeta(header)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, lastIdx, meta.Index)
|
||||
resp.Peering.CreateIndex = 0
|
||||
resp.Peering.ModifyIndex = 0
|
||||
prototest.AssertDeepEqual(t, p, resp.Peering)
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Error("blocking query timed out while waiting")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPeeringService_Delete(t *testing.T) {
|
||||
|
@ -1064,6 +1075,7 @@ func TestPeeringService_List(t *testing.T) {
|
|||
PeerCAPems: nil,
|
||||
PeerServerName: "fooservername",
|
||||
PeerServerAddresses: []string{"addr1"},
|
||||
StreamStatus: &pbpeering.StreamStatus{},
|
||||
}
|
||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: foo}))
|
||||
|
||||
|
@ -1075,6 +1087,7 @@ func TestPeeringService_List(t *testing.T) {
|
|||
PeerCAPems: nil,
|
||||
PeerServerName: "barservername",
|
||||
PeerServerAddresses: []string{"addr1"},
|
||||
StreamStatus: &pbpeering.StreamStatus{},
|
||||
}
|
||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: bar}))
|
||||
|
||||
|
@ -1120,6 +1133,7 @@ func TestPeeringService_List(t *testing.T) {
|
|||
PeerCAPems: nil,
|
||||
PeerServerName: "bazservername",
|
||||
PeerServerAddresses: []string{"addr1"},
|
||||
StreamStatus: &pbpeering.StreamStatus{},
|
||||
}
|
||||
go func() {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
@ -1166,6 +1180,7 @@ func TestPeeringService_List_ACLEnforcement(t *testing.T) {
|
|||
PeerCAPems: nil,
|
||||
PeerServerName: "fooservername",
|
||||
PeerServerAddresses: []string{"addr1"},
|
||||
StreamStatus: &pbpeering.StreamStatus{},
|
||||
}
|
||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: foo}))
|
||||
bar := &pbpeering.Peering{
|
||||
|
@ -1175,6 +1190,7 @@ func TestPeeringService_List_ACLEnforcement(t *testing.T) {
|
|||
PeerCAPems: nil,
|
||||
PeerServerName: "barservername",
|
||||
PeerServerAddresses: []string{"addr1"},
|
||||
StreamStatus: &pbpeering.StreamStatus{},
|
||||
}
|
||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(15, &pbpeering.PeeringWriteRequest{Peering: bar}))
|
||||
|
||||
|
|
Loading…
Reference in New Issue