From 119f6a1ed7fef0dfbf88c5961a49fa8c29ed77a1 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Wed, 28 Jun 2017 12:46:25 +0200 Subject: [PATCH] rpc: monkey patch fix for data races for localState The tests that use the localState of the agent access the internal variables and call methods which are not guarded by locks creating data races in tests. While the use of internal variables is somewhat easy to spot the fact that not all methods are thread-safe is a surprise. A proper fix requires the localState struct to be moved into its own package so that tests in the agent can only access the external interface. However, the localState is currently dependent on the agent.Config which would create a circular dependency. Therefore, the Config struct needs to be moved first for this to happen. This patch literally monkey patches the use of the lock around the cases which have data races and marks them with a // todo(fs): data race comment. --- agent/agent_test.go | 14 +- agent/catalog_endpoint_test.go | 26 ++-- agent/local_test.go | 252 ++++++++++++++++++++++----------- 3 files changed, 193 insertions(+), 99 deletions(-) diff --git a/agent/agent_test.go b/agent/agent_test.go index 5a4e6c5ff2..66e8f3154a 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -893,10 +893,16 @@ func TestAgent_ConsulService(t *testing.T) { t.Fatalf("%s service should be registered", consul.ConsulServiceID) } - // Perform anti-entropy on consul service - if err := a.state.syncService(consul.ConsulServiceID); err != nil { - t.Fatalf("err: %s", err) - } + // todo(fs): data race + func() { + a.state.Lock() + defer a.state.Unlock() + + // Perform anti-entropy on consul service + if err := a.state.syncService(consul.ConsulServiceID); err != nil { + t.Fatalf("err: %s", err) + } + }() // Consul service should be in sync if !a.state.serviceStatus[consul.ConsulServiceID].inSync { diff --git a/agent/catalog_endpoint_test.go b/agent/catalog_endpoint_test.go index fe85d1fb91..aa08fcf267 100644 --- a/agent/catalog_endpoint_test.go +++ b/agent/catalog_endpoint_test.go @@ -33,16 +33,22 @@ func TestCatalogRegister(t *testing.T) { t.Fatalf("bad: %v", res) } - // Service should be in sync - if err := a.state.syncService("foo"); err != nil { - t.Fatalf("err: %s", err) - } - if _, ok := a.state.serviceStatus["foo"]; !ok { - t.Fatalf("bad: %#v", a.state.serviceStatus) - } - if !a.state.serviceStatus["foo"].inSync { - t.Fatalf("should be in sync") - } + // todo(fs): data race + func() { + a.state.Lock() + defer a.state.Unlock() + + // Service should be in sync + if err := a.state.syncService("foo"); err != nil { + t.Fatalf("err: %s", err) + } + if _, ok := a.state.serviceStatus["foo"]; !ok { + t.Fatalf("bad: %#v", a.state.serviceStatus) + } + if !a.state.serviceStatus["foo"].inSync { + t.Fatalf("should be in sync") + } + }() } func TestCatalogRegister_Service_InvalidAddress(t *testing.T) { diff --git a/agent/local_test.go b/agent/local_test.go index 9a501b8b48..57e639cf08 100644 --- a/agent/local_test.go +++ b/agent/local_test.go @@ -102,7 +102,11 @@ func TestAgentAntiEntropy_Services(t *testing.T) { Port: 11211, } a.state.AddService(srv6, "") + + // todo(fs): data race + a.state.Lock() a.state.serviceStatus["cache"] = syncStatus{inSync: true} + a.state.Unlock() // Trigger anti-entropy run and wait a.StartSync() @@ -164,6 +168,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) { } } + // todo(fs): data race + a.state.RLock() + defer a.state.RUnlock() + // Check the local state if len(a.state.services) != 6 { r.Fatalf("bad: %v", a.state.services) @@ -221,6 +229,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) { } } + // todo(fs): data race + a.state.RLock() + defer a.state.RUnlock() + // Check the local state if len(a.state.services) != 5 { r.Fatalf("bad: %v", a.state.services) @@ -325,6 +337,10 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { } } + // todo(fs): data race + a.state.RLock() + defer a.state.RUnlock() + for name, status := range a.state.serviceStatus { if !status.inSync { r.Fatalf("should be in sync: %v %v", name, status) @@ -357,10 +373,16 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { } a.state.AddCheck(chk, "") - // Sync the service once - if err := a.state.syncService("mysql"); err != nil { - t.Fatalf("err: %s", err) - } + // todo(fs): data race + func() { + a.state.RLock() + defer a.state.RUnlock() + + // Sync the service once + if err := a.state.syncService("mysql"); err != nil { + t.Fatalf("err: %s", err) + } + }() // We should have 2 services (consul included) svcReq := structs.NodeSpecificRequest{ @@ -417,10 +439,16 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { } a.state.AddCheck(chk2, "") - // Sync the service once - if err := a.state.syncService("redis"); err != nil { - t.Fatalf("err: %s", err) - } + // todo(fs): data race + func() { + a.state.RLock() + defer a.state.RUnlock() + + // Sync the service once + if err := a.state.syncService("redis"); err != nil { + t.Fatalf("err: %s", err) + } + }() // We should have 3 services (consul included) svcReq := structs.NodeSpecificRequest{ @@ -551,18 +579,24 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { } } - // Check the local state - if len(a.state.services) != 3 { - t.Fatalf("bad: %v", a.state.services) - } - if len(a.state.serviceStatus) != 3 { - t.Fatalf("bad: %v", a.state.serviceStatus) - } - for name, status := range a.state.serviceStatus { - if !status.inSync { - t.Fatalf("should be in sync: %v %v", name, status) + // todo(fs): data race + func() { + a.state.RLock() + defer a.state.RUnlock() + + // Check the local state + if len(a.state.services) != 3 { + t.Fatalf("bad: %v", a.state.services) } - } + if len(a.state.serviceStatus) != 3 { + t.Fatalf("bad: %v", a.state.serviceStatus) + } + for name, status := range a.state.serviceStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } + } + }() } // Now remove the service and re-sync @@ -604,18 +638,24 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) { } } - // Check the local state - if len(a.state.services) != 2 { - t.Fatalf("bad: %v", a.state.services) - } - if len(a.state.serviceStatus) != 2 { - t.Fatalf("bad: %v", a.state.serviceStatus) - } - for name, status := range a.state.serviceStatus { - if !status.inSync { - t.Fatalf("should be in sync: %v %v", name, status) + // todo(fs): data race + func() { + a.state.RLock() + defer a.state.RUnlock() + + // Check the local state + if len(a.state.services) != 2 { + t.Fatalf("bad: %v", a.state.services) } - } + if len(a.state.serviceStatus) != 2 { + t.Fatalf("bad: %v", a.state.serviceStatus) + } + for name, status := range a.state.serviceStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } + } + }() } // Make sure the token got cleaned up. @@ -697,7 +737,11 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { Status: api.HealthPassing, } a.state.AddCheck(chk5, "") + + // todo(fs): data race + a.state.Lock() a.state.checkStatus["cache"] = syncStatus{inSync: true} + a.state.Unlock() // Trigger anti-entropy run and wait a.StartSync() @@ -747,18 +791,24 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { } }) - // Check the local state - if len(a.state.checks) != 4 { - t.Fatalf("bad: %v", a.state.checks) - } - if len(a.state.checkStatus) != 4 { - t.Fatalf("bad: %v", a.state.checkStatus) - } - for name, status := range a.state.checkStatus { - if !status.inSync { - t.Fatalf("should be in sync: %v %v", name, status) + // todo(fs): data race + func() { + a.state.RLock() + defer a.state.RUnlock() + + // Check the local state + if len(a.state.checks) != 4 { + t.Fatalf("bad: %v", a.state.checks) } - } + if len(a.state.checkStatus) != 4 { + t.Fatalf("bad: %v", a.state.checkStatus) + } + for name, status := range a.state.checkStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } + } + }() // Make sure we sent along our node info addresses when we synced. { @@ -822,18 +872,24 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { } }) - // Check the local state - if len(a.state.checks) != 3 { - t.Fatalf("bad: %v", a.state.checks) - } - if len(a.state.checkStatus) != 3 { - t.Fatalf("bad: %v", a.state.checkStatus) - } - for name, status := range a.state.checkStatus { - if !status.inSync { - t.Fatalf("should be in sync: %v %v", name, status) + // todo(fs): data race + func() { + a.state.RLock() + defer a.state.RUnlock() + + // Check the local state + if len(a.state.checks) != 3 { + t.Fatalf("bad: %v", a.state.checks) } - } + if len(a.state.checkStatus) != 3 { + t.Fatalf("bad: %v", a.state.checkStatus) + } + for name, status := range a.state.checkStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } + } + }() } func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { @@ -923,18 +979,24 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { } } - // Check the local state - if len(a.state.services) != 3 { - t.Fatalf("bad: %v", a.state.services) - } - if len(a.state.serviceStatus) != 3 { - t.Fatalf("bad: %v", a.state.serviceStatus) - } - for name, status := range a.state.serviceStatus { - if !status.inSync { - t.Fatalf("should be in sync: %v %v", name, status) + // todo(fs): data race + func() { + a.state.RLock() + defer a.state.RUnlock() + + // Check the local state + if len(a.state.services) != 3 { + t.Fatalf("bad: %v", a.state.services) } - } + if len(a.state.serviceStatus) != 3 { + t.Fatalf("bad: %v", a.state.serviceStatus) + } + for name, status := range a.state.serviceStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } + } + }() } // This check won't be allowed. @@ -1002,18 +1064,24 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { } }) - // Check the local state. - if len(a.state.checks) != 2 { - t.Fatalf("bad: %v", a.state.checks) - } - if len(a.state.checkStatus) != 2 { - t.Fatalf("bad: %v", a.state.checkStatus) - } - for name, status := range a.state.checkStatus { - if !status.inSync { - t.Fatalf("should be in sync: %v %v", name, status) + // todo(fs): data race + func() { + a.state.RLock() + defer a.state.RUnlock() + + // Check the local state. + if len(a.state.checks) != 2 { + t.Fatalf("bad: %v", a.state.checks) } - } + if len(a.state.checkStatus) != 2 { + t.Fatalf("bad: %v", a.state.checkStatus) + } + for name, status := range a.state.checkStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } + } + }() // Now delete the check and wait for sync. a.state.RemoveCheck("api-check") @@ -1054,18 +1122,24 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { } }) - // Check the local state. - if len(a.state.checks) != 1 { - t.Fatalf("bad: %v", a.state.checks) - } - if len(a.state.checkStatus) != 1 { - t.Fatalf("bad: %v", a.state.checkStatus) - } - for name, status := range a.state.checkStatus { - if !status.inSync { - t.Fatalf("should be in sync: %v %v", name, status) + // todo(fs): data race + func() { + a.state.RLock() + defer a.state.RUnlock() + + // Check the local state. + if len(a.state.checks) != 1 { + t.Fatalf("bad: %v", a.state.checks) } - } + if len(a.state.checkStatus) != 1 { + t.Fatalf("bad: %v", a.state.checkStatus) + } + for name, status := range a.state.checkStatus { + if !status.inSync { + t.Fatalf("should be in sync: %v %v", name, status) + } + } + }() // Make sure the token got cleaned up. if token := a.state.CheckToken("api-check"); token != "" { @@ -1320,6 +1394,10 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) { func TestAgentAntiEntropy_deleteService_fails(t *testing.T) { t.Parallel() l := new(localState) + + // todo(fs): data race + l.Lock() + defer l.Unlock() if err := l.deleteService(""); err == nil { t.Fatalf("should have failed") } @@ -1328,6 +1406,10 @@ func TestAgentAntiEntropy_deleteService_fails(t *testing.T) { func TestAgentAntiEntropy_deleteCheck_fails(t *testing.T) { t.Parallel() l := new(localState) + + // todo(fs): data race + l.Lock() + defer l.Unlock() if err := l.deleteCheck(""); err == nil { t.Fatalf("should have errored") }