diff --git a/agent/ae/ae.go b/agent/ae/ae.go
index f3b6745036..b055f7aa17 100644
--- a/agent/ae/ae.go
+++ b/agent/ae/ae.go
@@ -51,8 +51,8 @@ type StateSyncer struct {
 	// State contains the data that needs to be synchronized.
 	State interface {
-		UpdateSyncState() error
 		SyncChanges() error
+		SyncFull() error
 	}

 	// Interval is the time between two regular sync runs.
@@ -91,15 +91,15 @@ func (s *StateSyncer) Run() {
 		return lib.RandomStagger(time.Duration(f) * d)
 	}

-Sync:
+FullSync:
 	for {
-		switch err := s.State.UpdateSyncState(); {
+		switch err := s.State.SyncFull(); {

-		// update sync status failed
+		// full sync failed
 		case err != nil:
 			s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)

-			// retry updating sync status after some time or when a consul
+			// retry full sync after some time or when a consul
 			// server was added.
 			select {
@@ -121,10 +121,8 @@ Sync:
 				return
 			}

-		// update sync status OK
+		// full sync OK
 		default:
-			// force-trigger sync to pickup any changes
-			s.triggerSync()

 			// do partial syncs until it is time for a full sync again
 			for {
@@ -140,7 +138,7 @@ Sync:
 				// }

 				case <-time.After(s.Interval + stagger(s.Interval)):
-					continue Sync
+					continue FullSync

 				case <-s.TriggerCh:
 					if s.Paused() {
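Note for reviewers: the reshaped control flow of StateSyncer.Run is easier to see outside the diff. The sketch below mirrors the new loop structure; it is not part of the patch. The Syncer type and stagger helper are simplified stand-ins for ae.StateSyncer and lib.RandomStagger, and the pause check, retry-on-new-server, and shutdown paths are elided.

    package sketch

    import (
        "log"
        "math/rand"
        "time"
    )

    // Syncer is a hypothetical, trimmed-down stand-in for ae.StateSyncer.
    type Syncer struct {
        State interface {
            SyncChanges() error
            SyncFull() error
        }
        Interval  time.Duration // assumed > 0
        TriggerCh chan struct{}
    }

    // stagger is simplified jitter; the real code uses lib.RandomStagger.
    func stagger(d time.Duration) time.Duration {
        return time.Duration(rand.Int63n(int64(d)))
    }

    // Run shows the new shape: a full sync (delta computation plus sync)
    // drives the outer loop, partial syncs drive the inner one.
    func (s *Syncer) Run() {
    FullSync:
        for {
            // full sync failed: log and retry after some time
            if err := s.State.SyncFull(); err != nil {
                log.Printf("[ERR] agent: failed to sync remote state: %v", err)
                time.Sleep(stagger(s.Interval))
                continue FullSync
            }

            // full sync OK: do partial syncs until it is time for a full sync again
            for {
                select {
                case <-time.After(s.Interval + stagger(s.Interval)):
                    continue FullSync
                case <-s.TriggerCh:
                    if err := s.State.SyncChanges(); err != nil {
                        log.Printf("[ERR] agent: failed to sync changes: %v", err)
                    }
                }
            }
        }
    }

Note also why the old post-success s.triggerSync() call disappears: SyncFull already pushes any pending changes, so there is nothing left to force-sync after a successful full sync.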
diff --git a/agent/local/state.go b/agent/local/state.go
index 5058eed0bf..b980c569f5 100644
--- a/agent/local/state.go
+++ b/agent/local/state.go
@@ -538,9 +538,9 @@ func (l *State) Metadata() map[string]string {
 	return m
 }

-// UpdateSyncState does a read of the server state, and updates
+// updateSyncState does a read of the server state, and updates
 // the local sync status as appropriate
-func (l *State) UpdateSyncState() error {
+func (l *State) updateSyncState() error {
 	// 1. get all checks and services from the master
 	req := structs.NodeSpecificRequest{
 		Datacenter: l.config.Datacenter,
@@ -631,7 +631,6 @@ func (l *State) UpdateSyncState() error {
 	}

 	for id, rc := range remoteChecks {
-
 		lc := l.checks[id]

 		// If we don't have the check locally, deregister it
@@ -639,7 +638,7 @@ func (l *State) UpdateSyncState() error {
 			// The Serf check is created automatically and does not
 			// need to be deregistered.
 			if id == structs.SerfCheckID {
-				l.logger.Printf("Skipping remote check %q since it is managed automatically", id)
+				l.logger.Printf("[DEBUG] Skipping remote check %q since it is managed automatically", id)
 				continue
 			}

@@ -683,6 +682,21 @@ func (l *State) UpdateSyncState() error {
 	return nil
 }

+// SyncFull determines the delta between the local and remote state
+// and synchronizes the changes.
+func (l *State) SyncFull() error {
+	// note that we do not acquire the lock here since the methods
+	// we are calling will do that themselves.
+
+	// todo(fs): is it an issue that we do not hold the lock for the entire time?
+	// todo(fs): IMO, this doesn't matter since SyncChanges will sync whatever
+	// todo(fs): was determined in the update step.
+
+	if err := l.updateSyncState(); err != nil {
+		return err
+	}
+	return l.SyncChanges()
+}
+
 // SyncChanges is used to scan the status our local services and checks
 // and update any that are out of sync with the server
 func (l *State) SyncChanges() error {
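Note for reviewers: every test change below follows the same pattern — the asynchronous trigger-and-sleep idiom is replaced by a direct, synchronous call to the new method. Schematically (a and t as in the tests below; an illustration, not patch content):

    // before: schedule an anti-entropy run and hope that 200ms is enough
    a.StartSync()
    time.Sleep(200 * time.Millisecond)

    // after: run the full sync inline and fail the test on any error
    if err := a.State.SyncFull(); err != nil {
        t.Fatalf("err: %v", err)
    }

This removes the timing dependence from the assertions and surfaces sync errors directly in the failing test.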
diff --git a/agent/local/state_test.go b/agent/local/state_test.go
index 89136e88fe..5646462d66 100644
--- a/agent/local/state_test.go
+++ b/agent/local/state_test.go
@@ -7,8 +7,8 @@ import (
 	"testing"
 	"time"

-	"github.com/hashicorp/consul/agent/config"
 	"github.com/hashicorp/consul/agent"
+	"github.com/hashicorp/consul/agent/config"
 	"github.com/hashicorp/consul/agent/local"
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/agent/token"
@@ -20,7 +20,7 @@ import (

 func TestAgentAntiEntropy_Services(t *testing.T) {
 	t.Parallel()
-	a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
+	a := &agent.TestAgent{Name: t.Name()}
 	a.Start()
 	defer a.Shutdown()
@@ -113,8 +113,9 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
 		InSync: true,
 	})

-	// Trigger anti-entropy run and wait
-	a.StartSync()
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	var services structs.IndexedNodeServices
 	req := structs.NodeSpecificRequest{
@@ -180,8 +181,9 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
 	// Remove one of the services
 	a.State.RemoveService("api")

-	// Trigger anti-entropy run and wait
-	a.StartSync()
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	retry.Run(t, func(r *retry.R) {
 		if err := a.RPC("Catalog.NodeServices", &req, &services); err != nil {
@@ -228,7 +230,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {

 func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
 	t.Parallel()
-	a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
+	a := &agent.TestAgent{Name: t.Name()}
 	a.Start()
 	defer a.Shutdown()
@@ -275,8 +277,9 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}

-	// Trigger anti-entropy run and wait
-	a.StartSync()
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	req := structs.NodeSpecificRequest{
 		Datacenter: "dc1",
@@ -348,18 +351,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
 	}
 	a.State.AddCheck(chk, "")

-	// todo(fs): data race
-	// func() {
-	// 	a.State.RLock()
-	// 	defer a.State.RUnlock()
-
-	// 	// Sync the service once
-	// 	if err := a.State.syncService("mysql"); err != nil {
-	// 		t.Fatalf("err: %s", err)
-	// 	}
-	// }()
-	// todo(fs): is this correct?
-	if err := a.State.SyncChanges(); err != nil {
+	if err := a.State.SyncFull(); err != nil {
 		t.Fatal("sync failed: ", err)
 	}
@@ -418,18 +410,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
 	}
 	a.State.AddCheck(chk2, "")

-	// todo(fs): data race
-	// func() {
-	// 	a.State.RLock()
-	// 	defer a.State.RUnlock()
-
-	// 	// Sync the service once
-	// 	if err := a.State.syncService("redis"); err != nil {
-	// 		t.Fatalf("err: %s", err)
-	// 	}
-	// }()
-	// todo(fs): is this correct?
-	if err := a.State.SyncChanges(); err != nil {
+	if err := a.State.SyncFull(); err != nil {
 		t.Fatal("sync failed: ", err)
 	}
@@ -522,9 +503,9 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
 	}
 	a.State.AddService(srv2, token)

-	// Trigger anti-entropy run and wait
-	a.StartSync()
-	time.Sleep(200 * time.Millisecond)
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	// Verify that we are in sync
 	{
@@ -569,8 +550,9 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
 	// Now remove the service and re-sync
 	a.State.RemoveService("api")

-	a.StartSync()
-	time.Sleep(200 * time.Millisecond)
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	// Verify that we are in sync
 	{
@@ -619,7 +601,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {

 func TestAgentAntiEntropy_Checks(t *testing.T) {
 	t.Parallel()
-	a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
+	a := &agent.TestAgent{Name: t.Name()}
 	a.Start()
 	defer a.Shutdown()
@@ -694,8 +676,9 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
 		InSync: true,
 	})

-	// Trigger anti-entropy run and wait
-	a.StartSync()
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	req := structs.NodeSpecificRequest{
 		Datacenter: "dc1",
@@ -769,8 +752,9 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
 	// Remove one of the checks
 	a.State.RemoveCheck("redis")

-	// Trigger anti-entropy run and wait
-	a.StartSync()
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	// Verify that we are in sync
 	retry.Run(t, func(r *retry.R) {
@@ -857,9 +841,9 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
 	}
 	a.State.AddService(srv2, "root")

-	// Trigger anti-entropy run and wait
-	a.StartSync()
-	time.Sleep(200 * time.Millisecond)
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	// Verify that we are in sync
 	{
@@ -928,9 +912,9 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
 	}
 	a.State.AddCheck(chk2, token)

-	// Trigger anti-entropy run and wait.
-	a.StartSync()
-	time.Sleep(200 * time.Millisecond)
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	// Verify that we are in sync
 	retry.Run(t, func(r *retry.R) {
@@ -975,8 +959,10 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
 	// Now delete the check and wait for sync.
 	a.State.RemoveCheck("api-check")

-	a.StartSync()
-	time.Sleep(200 * time.Millisecond)
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
 	// Verify that we are in sync
 	retry.Run(t, func(r *retry.R) {
 		req := structs.NodeSpecificRequest{
@@ -1090,8 +1076,9 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
 	}
 	a.State.AddCheck(check, "")

-	// Trigger anti-entropy run and wait
-	a.StartSync()
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	// Verify that we are in sync
 	req := structs.NodeSpecificRequest{
@@ -1172,9 +1159,9 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
 		}
 	}

-	// Trigger anti-entropy run and wait.
-	a.StartSync()
-	time.Sleep(200 * time.Millisecond)
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	// Verify that the output was synced back to the agent's value.
 	if err := a.RPC("Health.NodeChecks", &req, &checks); err != nil {
@@ -1210,9 +1197,9 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
 	// Now make an update that should be deferred.
 	a.State.UpdateCheck("web", api.HealthPassing, "deferred")

-	// Trigger anti-entropy run and wait.
-	a.StartSync()
-	time.Sleep(200 * time.Millisecond)
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	// Verify that the output is still out of sync since there's a deferred
 	// update pending.
@@ -1272,8 +1259,9 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}

-	// Trigger anti-entropy run and wait
-	a.StartSync()
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	req := structs.NodeSpecificRequest{
 		Datacenter: "dc1",
@@ -1304,8 +1292,10 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}

-	// Trigger anti-entropy run and wait
-	a.StartSync()
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
 	// Wait for the sync - this should have been a sync of just the node info
 	retry.Run(t, func(r *retry.R) {
 		if err := a.RPC("Catalog.NodeServices", &req, &services); err != nil {
diff --git a/agent/testagent.go b/agent/testagent.go
index 9b99cd6471..1c4806d36c 100644
--- a/agent/testagent.go
+++ b/agent/testagent.go
@@ -65,10 +65,6 @@ type TestAgent struct {
 	// Key is the optional encryption key for the LAN and WAN keyring.
 	Key string

-	// NoInitialSync determines whether an anti-entropy run
-	// will be scheduled after the agent started.
-	NoInitialSync bool
-
 	// dns is a reference to the first started DNS endpoint.
 	// It is valid after Start().
 	dns *DNSServer
@@ -175,9 +171,9 @@ func (a *TestAgent) Start() *TestAgent {
 			}
 		}
 	}
-	if !a.NoInitialSync {
-		a.Agent.StartSync()
-	}
+
+	// Start the anti-entropy syncer
+	a.Agent.StartSync()

 	var out structs.IndexedNodes
 	retry.Run(&panicFailer{}, func(r *retry.R) {
@@ -200,7 +196,7 @@ func (a *TestAgent) Start() *TestAgent {
 			r.Fatal(a.Name, "No leader")
 		}
 		if out.Index == 0 {
-			r.Fatal(a.Name, "Consul index is 0")
+			r.Fatal(a.Name, ": Consul index is 0")
 		}
 	} else {
 		req, _ := http.NewRequest("GET", "/v1/agent/self", nil)
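Note for reviewers: taken together, these changes leave *local.State implementing exactly the narrowed State interface that ae.StateSyncer declares. A compile-time assertion of that relationship would look like the following sketch (not part of the patch; the interface literal mirrors the anonymous interface field in ae.go):

    package sketch

    import "github.com/hashicorp/consul/agent/local"

    // compile-time check: *local.State provides both methods the
    // anti-entropy syncer requires after this change
    var _ interface {
        SyncChanges() error
        SyncFull() error
    } = (*local.State)(nil)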