mirror of https://github.com/hashicorp/consul
local state: fix anti-entropy state tests
The anti-entropy tests relied on the side-effect of the StartSync() method to perform a full sync instead of a partial sync. This lead to multiple anti-entropy go routines being started unnecessary retry loops. This change changes the behavior to perform synchronous full syncs when necessary removing the need for all of the time.Sleep and most of the retry loops.pull/3609/head
parent
37b95ef98e
commit
71c74e62c7
|
@ -51,8 +51,8 @@ type StateSyncer struct {
|
|||
|
||||
// State contains the data that needs to be synchronized.
|
||||
State interface {
|
||||
UpdateSyncState() error
|
||||
SyncChanges() error
|
||||
SyncFull() error
|
||||
}
|
||||
|
||||
// Interval is the time between two regular sync runs.
|
||||
|
@ -91,15 +91,15 @@ func (s *StateSyncer) Run() {
|
|||
return lib.RandomStagger(time.Duration(f) * d)
|
||||
}
|
||||
|
||||
Sync:
|
||||
FullSync:
|
||||
for {
|
||||
switch err := s.State.UpdateSyncState(); {
|
||||
switch err := s.State.SyncFull(); {
|
||||
|
||||
// update sync status failed
|
||||
// full sync failed
|
||||
case err != nil:
|
||||
s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
|
||||
|
||||
// retry updating sync status after some time or when a consul
|
||||
// retry full sync after some time or when a consul
|
||||
// server was added.
|
||||
select {
|
||||
|
||||
|
@ -121,10 +121,8 @@ Sync:
|
|||
return
|
||||
}
|
||||
|
||||
// update sync status OK
|
||||
// full sync OK
|
||||
default:
|
||||
// force-trigger sync to pickup any changes
|
||||
s.triggerSync()
|
||||
|
||||
// do partial syncs until it is time for a full sync again
|
||||
for {
|
||||
|
@ -140,7 +138,7 @@ Sync:
|
|||
// }
|
||||
|
||||
case <-time.After(s.Interval + stagger(s.Interval)):
|
||||
continue Sync
|
||||
continue FullSync
|
||||
|
||||
case <-s.TriggerCh:
|
||||
if s.Paused() {
|
||||
|
|
|
@ -538,9 +538,9 @@ func (l *State) Metadata() map[string]string {
|
|||
return m
|
||||
}
|
||||
|
||||
// UpdateSyncState does a read of the server state, and updates
|
||||
// updateSyncState does a read of the server state, and updates
|
||||
// the local sync status as appropriate
|
||||
func (l *State) UpdateSyncState() error {
|
||||
func (l *State) updateSyncState() error {
|
||||
// 1. get all checks and services from the master
|
||||
req := structs.NodeSpecificRequest{
|
||||
Datacenter: l.config.Datacenter,
|
||||
|
@ -631,7 +631,6 @@ func (l *State) UpdateSyncState() error {
|
|||
}
|
||||
|
||||
for id, rc := range remoteChecks {
|
||||
|
||||
lc := l.checks[id]
|
||||
|
||||
// If we don't have the check locally, deregister it
|
||||
|
@ -639,7 +638,7 @@ func (l *State) UpdateSyncState() error {
|
|||
// The Serf check is created automatically and does not
|
||||
// need to be deregistered.
|
||||
if id == structs.SerfCheckID {
|
||||
l.logger.Printf("Skipping remote check %q since it is managed automatically", id)
|
||||
l.logger.Printf("[DEBUG] Skipping remote check %q since it is managed automatically", id)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -683,6 +682,21 @@ func (l *State) UpdateSyncState() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SyncFull determines the delta between the local and remote state
|
||||
// and synchronizes the changes.
|
||||
func (l *State) SyncFull() error {
|
||||
// note that we do not acquire the lock here since the methods
|
||||
// we are calling will do that themself.
|
||||
|
||||
// todo(fs): is it an issue that we do not hold the lock for the entire time?
|
||||
// todo(fs): IMO, this doesn't matter since SyncChanges will sync whatever
|
||||
// todo(fs): was determined in the update step.
|
||||
if err := l.updateSyncState(); err != nil {
|
||||
return err
|
||||
}
|
||||
return l.SyncChanges()
|
||||
}
|
||||
|
||||
// SyncChanges is used to scan the status our local services and checks
|
||||
// and update any that are out of sync with the server
|
||||
func (l *State) SyncChanges() error {
|
||||
|
|
|
@ -7,8 +7,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/local"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
|
@ -20,7 +20,7 @@ import (
|
|||
|
||||
func TestAgentAntiEntropy_Services(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
|
||||
a := &agent.TestAgent{Name: t.Name()}
|
||||
a.Start()
|
||||
defer a.Shutdown()
|
||||
|
||||
|
@ -113,8 +113,9 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
|||
InSync: true,
|
||||
})
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
var services structs.IndexedNodeServices
|
||||
req := structs.NodeSpecificRequest{
|
||||
|
@ -180,8 +181,9 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
|||
// Remove one of the services
|
||||
a.State.RemoveService("api")
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if err := a.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||
|
@ -228,7 +230,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
|||
|
||||
func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
|
||||
a := &agent.TestAgent{Name: t.Name()}
|
||||
a.Start()
|
||||
defer a.Shutdown()
|
||||
|
||||
|
@ -275,8 +277,9 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
req := structs.NodeSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -348,18 +351,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
|
|||
}
|
||||
a.State.AddCheck(chk, "")
|
||||
|
||||
// todo(fs): data race
|
||||
// func() {
|
||||
// a.State.RLock()
|
||||
// defer a.State.RUnlock()
|
||||
|
||||
// // Sync the service once
|
||||
// if err := a.State.syncService("mysql"); err != nil {
|
||||
// t.Fatalf("err: %s", err)
|
||||
// }
|
||||
// }()
|
||||
// todo(fs): is this correct?
|
||||
if err := a.State.SyncChanges(); err != nil {
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatal("sync failed: ", err)
|
||||
}
|
||||
|
||||
|
@ -418,18 +410,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
|
|||
}
|
||||
a.State.AddCheck(chk2, "")
|
||||
|
||||
// todo(fs): data race
|
||||
// func() {
|
||||
// a.State.RLock()
|
||||
// defer a.State.RUnlock()
|
||||
|
||||
// // Sync the service once
|
||||
// if err := a.State.syncService("redis"); err != nil {
|
||||
// t.Fatalf("err: %s", err)
|
||||
// }
|
||||
// }()
|
||||
// todo(fs): is this correct?
|
||||
if err := a.State.SyncChanges(); err != nil {
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatal("sync failed: ", err)
|
||||
}
|
||||
|
||||
|
@ -522,9 +503,9 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
|
|||
}
|
||||
a.State.AddService(srv2, token)
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify that we are in sync
|
||||
{
|
||||
|
@ -569,8 +550,9 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
|
|||
|
||||
// Now remove the service and re-sync
|
||||
a.State.RemoveService("api")
|
||||
a.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify that we are in sync
|
||||
{
|
||||
|
@ -619,7 +601,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
|
|||
|
||||
func TestAgentAntiEntropy_Checks(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
|
||||
a := &agent.TestAgent{Name: t.Name()}
|
||||
a.Start()
|
||||
defer a.Shutdown()
|
||||
|
||||
|
@ -694,8 +676,9 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
|||
InSync: true,
|
||||
})
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
req := structs.NodeSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -769,8 +752,9 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
|||
// Remove one of the checks
|
||||
a.State.RemoveCheck("redis")
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify that we are in sync
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
|
@ -857,9 +841,9 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
|
|||
}
|
||||
a.State.AddService(srv2, "root")
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify that we are in sync
|
||||
{
|
||||
|
@ -928,9 +912,9 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
|
|||
}
|
||||
a.State.AddCheck(chk2, token)
|
||||
|
||||
// Trigger anti-entropy run and wait.
|
||||
a.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify that we are in sync
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
|
@ -975,8 +959,10 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
|
|||
|
||||
// Now delete the check and wait for sync.
|
||||
a.State.RemoveCheck("api-check")
|
||||
a.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify that we are in sync
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
req := structs.NodeSpecificRequest{
|
||||
|
@ -1090,8 +1076,9 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
|||
}
|
||||
a.State.AddCheck(check, "")
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify that we are in sync
|
||||
req := structs.NodeSpecificRequest{
|
||||
|
@ -1172,9 +1159,9 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Trigger anti-entropy run and wait.
|
||||
a.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify that the output was synced back to the agent's value.
|
||||
if err := a.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||
|
@ -1210,9 +1197,9 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
|||
// Now make an update that should be deferred.
|
||||
a.State.UpdateCheck("web", api.HealthPassing, "deferred")
|
||||
|
||||
// Trigger anti-entropy run and wait.
|
||||
a.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify that the output is still out of sync since there's a deferred
|
||||
// update pending.
|
||||
|
@ -1272,8 +1259,9 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
req := structs.NodeSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -1304,8 +1292,10 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Wait for the sync - this should have been a sync of just the node info
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if err := a.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||
|
|
|
@ -65,10 +65,6 @@ type TestAgent struct {
|
|||
// Key is the optional encryption key for the LAN and WAN keyring.
|
||||
Key string
|
||||
|
||||
// NoInitialSync determines whether an anti-entropy run
|
||||
// will be scheduled after the agent started.
|
||||
NoInitialSync bool
|
||||
|
||||
// dns is a reference to the first started DNS endpoint.
|
||||
// It is valid after Start().
|
||||
dns *DNSServer
|
||||
|
@ -175,9 +171,9 @@ func (a *TestAgent) Start() *TestAgent {
|
|||
}
|
||||
}
|
||||
}
|
||||
if !a.NoInitialSync {
|
||||
|
||||
// Start the anti-entropy syncer
|
||||
a.Agent.StartSync()
|
||||
}
|
||||
|
||||
var out structs.IndexedNodes
|
||||
retry.Run(&panicFailer{}, func(r *retry.R) {
|
||||
|
@ -200,7 +196,7 @@ func (a *TestAgent) Start() *TestAgent {
|
|||
r.Fatal(a.Name, "No leader")
|
||||
}
|
||||
if out.Index == 0 {
|
||||
r.Fatal(a.Name, "Consul index is 0")
|
||||
r.Fatal(a.Name, ": Consul index is 0")
|
||||
}
|
||||
} else {
|
||||
req, _ := http.NewRequest("GET", "/v1/agent/self", nil)
|
||||
|
|
Loading…
Reference in New Issue