Merge pull request #10396 from hashicorp/dnephin/fix-more-data-races

Fix some data races
pull/10639/head
Daniel Nephin 2021-07-16 18:21:58 -04:00 committed by GitHub
commit 499250cbf1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 122 additions and 55 deletions

View File

@ -18,6 +18,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"testing"
"time"
@ -3729,10 +3730,27 @@ func TestAgent_SecurityChecks(t *testing.T) {
defer a.Shutdown()
data := make([]byte, 0, 8192)
bytesBuffer := bytes.NewBuffer(data)
a.LogOutput = bytesBuffer
buf := &syncBuffer{b: bytes.NewBuffer(data)}
a.LogOutput = buf
assert.NoError(t, a.Start(t))
assert.Contains(t, bytesBuffer.String(), "using enable-script-checks without ACLs and without allow_write_http_from is DANGEROUS")
assert.Contains(t, buf.String(), "using enable-script-checks without ACLs and without allow_write_http_from is DANGEROUS")
}
type syncBuffer struct {
lock sync.RWMutex
b *bytes.Buffer
}
func (b *syncBuffer) Write(data []byte) (int, error) {
b.lock.Lock()
defer b.lock.Unlock()
return b.b.Write(data)
}
func (b *syncBuffer) String() string {
b.lock.Lock()
defer b.lock.Unlock()
return b.b.String()
}
func TestAgent_ReloadConfigOutgoingRPCConfig(t *testing.T) {

View File

@ -310,43 +310,51 @@ func testIdentityForToken(token string) (bool, structs.ACLIdentity, error) {
func testPolicyForID(policyID string) (bool, *structs.ACLPolicy, error) {
switch policyID {
case "acl-ro":
return true, &structs.ACLPolicy{
p := &structs.ACLPolicy{
ID: "acl-ro",
Name: "acl-ro",
Description: "acl-ro",
Rules: `acl = "read"`,
Syntax: acl.SyntaxCurrent,
RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2},
}, nil
}
p.SetHash(false)
return true, p, nil
case "acl-wr":
return true, &structs.ACLPolicy{
p := &structs.ACLPolicy{
ID: "acl-wr",
Name: "acl-wr",
Description: "acl-wr",
Rules: `acl = "write"`,
Syntax: acl.SyntaxCurrent,
RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2},
}, nil
}
p.SetHash(false)
return true, p, nil
case "service-ro":
return true, &structs.ACLPolicy{
p := &structs.ACLPolicy{
ID: "service-ro",
Name: "service-ro",
Description: "service-ro",
Rules: `service_prefix "" { policy = "read" }`,
Syntax: acl.SyntaxCurrent,
RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2},
}, nil
}
p.SetHash(false)
return true, p, nil
case "service-wr":
return true, &structs.ACLPolicy{
p := &structs.ACLPolicy{
ID: "service-wr",
Name: "service-wr",
Description: "service-wr",
Rules: `service_prefix "" { policy = "write" }`,
Syntax: acl.SyntaxCurrent,
RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2},
}, nil
}
p.SetHash(false)
return true, p, nil
case "node-wr":
return true, &structs.ACLPolicy{
p := &structs.ACLPolicy{
ID: "node-wr",
Name: "node-wr",
Description: "node-wr",
@ -354,9 +362,11 @@ func testPolicyForID(policyID string) (bool, *structs.ACLPolicy, error) {
Syntax: acl.SyntaxCurrent,
Datacenters: []string{"dc1"},
RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2},
}, nil
}
p.SetHash(false)
return true, p, nil
case "dc2-key-wr":
return true, &structs.ACLPolicy{
p := &structs.ACLPolicy{
ID: "dc2-key-wr",
Name: "dc2-key-wr",
Description: "dc2-key-wr",
@ -364,7 +374,9 @@ func testPolicyForID(policyID string) (bool, *structs.ACLPolicy, error) {
Syntax: acl.SyntaxCurrent,
Datacenters: []string{"dc2"},
RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2},
}, nil
}
p.SetHash(false)
return true, p, nil
default:
return testPolicyForIDEnterprise(policyID)
}

View File

@ -373,7 +373,10 @@ func TestLeader_Vault_PrimaryCA_IntermediateRenew(t *testing.T) {
}
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
defer func() {
s1.Shutdown()
s1.leaderRoutineManager.Wait()
}()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
@ -482,7 +485,10 @@ func TestLeader_SecondaryCA_IntermediateRenew(t *testing.T) {
}
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
defer func() {
s1.Shutdown()
s1.leaderRoutineManager.Wait()
}()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
@ -493,7 +499,10 @@ func TestLeader_SecondaryCA_IntermediateRenew(t *testing.T) {
c.Build = "1.6.0"
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
defer func() {
s2.Shutdown()
s2.leaderRoutineManager.Wait()
}()
// Create the WAN link
joinWAN(t, s2, s1)

View File

@ -20,6 +20,11 @@ import (
"time"
"github.com/NYTimes/gziphandler"
cleanhttp "github.com/hashicorp/go-cleanhttp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/http2"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/structs"
tokenStore "github.com/hashicorp/consul/agent/token"
@ -27,10 +32,6 @@ import (
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
cleanhttp "github.com/hashicorp/go-cleanhttp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/http2"
)
func TestHTTPServer_UnixSocket(t *testing.T) {
@ -632,7 +633,7 @@ func TestHTTP_wrap_obfuscateLog(t *testing.T) {
}
t.Parallel()
buf := new(bytes.Buffer)
buf := &syncBuffer{b: new(bytes.Buffer)}
a := StartTestAgent(t, TestAgent{LogOutput: buf})
defer a.Shutdown()

View File

@ -8,11 +8,12 @@ import (
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestServiceManager_RegisterService(t *testing.T) {
@ -330,26 +331,27 @@ func TestServiceManager_PersistService_API(t *testing.T) {
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Now register a sidecar proxy via the API.
svc := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-sidecar-proxy",
Service: "web-sidecar-proxy",
Port: 21000,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web",
LocalServiceAddress: "127.0.0.1",
LocalServicePort: 8000,
Upstreams: structs.Upstreams{
{
DestinationName: "redis",
DestinationNamespace: "default",
LocalBindPort: 5000,
newNodeService := func() *structs.NodeService {
return &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-sidecar-proxy",
Service: "web-sidecar-proxy",
Port: 21000,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web",
LocalServiceAddress: "127.0.0.1",
LocalServicePort: 8000,
Upstreams: structs.Upstreams{
{
DestinationName: "redis",
DestinationNamespace: "default",
LocalBindPort: 5000,
},
},
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
}
expectState := &structs.NodeService{
@ -385,6 +387,7 @@ func TestServiceManager_PersistService_API(t *testing.T) {
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
svc := newNodeService()
svcID := svc.CompoundServiceID()
svcFile := filepath.Join(a.Config.DataDir, servicesDir, svcID.StringHash())
@ -443,8 +446,15 @@ func TestServiceManager_PersistService_API(t *testing.T) {
}
// Updates service definition on disk
svc = newNodeService()
svc.Proxy.LocalServicePort = 8001
require.NoError(a.addServiceFromSource(svc, nil, true, "mytoken", ConfigSourceRemote))
err = a.AddService(AddServiceRequest{
Service: svc,
persist: true,
token: "mytoken",
Source: ConfigSourceRemote,
})
require.NoError(err)
requireFileIsPresent(t, svcFile)
requireFileIsPresent(t, configFile)

View File

@ -53,6 +53,8 @@ type TestAgent struct {
Config *config.RuntimeConfig
// LogOutput is the sink for the logs. If nil, logs are written to os.Stderr.
// The io.Writer must allow concurrent reads and writes. Note that
// bytes.Buffer is not safe for concurrent reads and writes.
LogOutput io.Writer
// DataDir may be set to a directory which exists. If is it not set,
@ -343,8 +345,8 @@ func (a *TestAgent) Client() *api.Client {
// DNSDisableCompression disables compression for all started DNS servers.
func (a *TestAgent) DNSDisableCompression(b bool) {
for _, srv := range a.dnsServers {
cfg := srv.config.Load().(*dnsConfig)
cfg.DisableCompression = b
a.config.DNSDisableCompression = b
srv.ReloadConfig(a.config)
}
}

2
go.mod
View File

@ -55,7 +55,7 @@ require (
github.com/hashicorp/raft v1.3.1
github.com/hashicorp/raft-autopilot v0.1.5
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
github.com/hashicorp/serf v0.9.5
github.com/hashicorp/serf v0.9.6-0.20210609195804-2b5dd0cd2de9
github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086
github.com/hashicorp/yamux v0.0.0-20200609203250-aecfd211c9ce
github.com/imdario/mergo v0.3.6

3
go.sum
View File

@ -291,8 +291,9 @@ github.com/hashicorp/raft-autopilot v0.1.5 h1:onEfMH5uHVdXQqtas36zXUHEZxLdsJVu/n
github.com/hashicorp/raft-autopilot v0.1.5/go.mod h1:Af4jZBwaNOI+tXfIqIdbcAnh/UyyqIMj/pOISIfhArw=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
github.com/hashicorp/serf v0.9.5 h1:EBWvyu9tcRszt3Bxp3KNssBMP1KuHWyO51lz9+786iM=
github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk=
github.com/hashicorp/serf v0.9.6-0.20210609195804-2b5dd0cd2de9 h1:lCZfMBDn/Puwg9VosHMf/9p9jNDYYkbzVjb4jYjVfqU=
github.com/hashicorp/serf v0.9.6-0.20210609195804-2b5dd0cd2de9/go.mod h1:qapjppkpNXHYTyzx+HqkyWGGkmUxafHjuspm/Bqb2Jc=
github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086 h1:OKsyxKi2sNmqm1Gv93adf2AID2FOBFdCbbZn9fGtIdg=
github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086/go.mod h1:R3Umvhlxi2TN7Ex2hzOowyeNb+SfbVWI973N+ctaFMk=
github.com/hashicorp/vault/sdk v0.1.14-0.20200519221838-e0cfd64bc267 h1:e1ok06zGrWJW91rzRroyl5nRNqraaBe4d5hiKcVZuHM=

View File

@ -24,6 +24,10 @@ func (r *routineTracker) running() bool {
}
}
func (r *routineTracker) wait() {
<-r.stoppedCh
}
type Manager struct {
lock sync.RWMutex
logger hclog.Logger
@ -131,6 +135,8 @@ func (m *Manager) stopInstance(name string) *routineTracker {
return instance
}
// StopAll goroutines. Once StopAll is called, it is no longer safe to add no
// goroutines to the Manager.
func (m *Manager) StopAll() {
m.lock.Lock()
defer m.lock.Unlock()
@ -142,7 +148,14 @@ func (m *Manager) StopAll() {
m.logger.Debug("stopping routine", "routine", name)
routine.cancel()
}
// just wipe out the entire map
m.routines = make(map[string]*routineTracker)
}
// Wait for all goroutines to stop after StopAll is called.
func (m *Manager) Wait() {
m.lock.Lock()
defer m.lock.Unlock()
for _, routine := range m.routines {
routine.wait()
}
}

View File

@ -958,7 +958,7 @@ func (s *Serf) handleNodeJoin(n *memberlist.Node) {
member.Status = StatusAlive
member.leaveTime = time.Time{}
member.Addr = net.IP(n.Addr)
member.Addr = n.Addr
member.Port = n.Port
member.Tags = s.decodeTags(n.Meta)
}
@ -1088,6 +1088,7 @@ func (s *Serf) handleNodeUpdate(n *memberlist.Node) {
// handleNodeLeaveIntent is called when an intent to leave is received.
func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
state := s.State()
// Witness a potentially newer time
s.clock.Witness(leaveMsg.LTime)
@ -1108,7 +1109,7 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
// Refute us leaving if we are in the alive state
// Must be done in another goroutine since we have the memberLock
if leaveMsg.Node == s.config.NodeName && s.state == SerfAlive {
if leaveMsg.Node == s.config.NodeName && state == SerfAlive {
s.logger.Printf("[DEBUG] serf: Refuting an older leave intent")
go s.broadcastJoin(s.clock.Time())
return false
@ -1639,7 +1640,6 @@ func (s *Serf) reconnect() {
// Select a random member to try and join
idx := rand.Int31n(int32(n))
mem := s.failedMembers[idx]
s.memberLock.RUnlock()
// Format the addr
addr := net.UDPAddr{IP: mem.Addr, Port: int(mem.Port)}
@ -1649,6 +1649,7 @@ func (s *Serf) reconnect() {
if mem.Name != "" {
joinAddr = mem.Name + "/" + addr.String()
}
s.memberLock.RUnlock()
// Attempt to join at the memberlist level
s.memberlist.Join([]string{joinAddr})

2
vendor/modules.txt vendored
View File

@ -489,7 +489,7 @@ github.com/hashicorp/raft
github.com/hashicorp/raft-autopilot
# github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
github.com/hashicorp/raft-boltdb
# github.com/hashicorp/serf v0.9.5
# github.com/hashicorp/serf v0.9.6-0.20210609195804-2b5dd0cd2de9
github.com/hashicorp/serf/coordinate
github.com/hashicorp/serf/serf
# github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086