Browse Source

local state: fix anti-entropy state tests

The anti-entropy tests relied on the side-effect of the StartSync()
method to perform a full sync instead of a partial sync. This led to
multiple anti-entropy go routines being started and unnecessary retry loops.

This change makes the tests perform synchronous full syncs when
necessary, removing the need for all of the time.Sleep calls and most
of the retry loops.
pull/3585/head
Frank Schroeder 7 years ago committed by Frank Schröder
parent
commit
f8e20cd996
  1. 16
      agent/ae/ae.go
  2. 22
      agent/local/state.go
  3. 116
      agent/local/state_test.go
  4. 12
      agent/testagent.go

16
agent/ae/ae.go

@ -51,8 +51,8 @@ type StateSyncer struct {
// State contains the data that needs to be synchronized.
State interface {
UpdateSyncState() error
SyncChanges() error
SyncFull() error
}
// Interval is the time between two regular sync runs.
@ -91,15 +91,15 @@ func (s *StateSyncer) Run() {
return lib.RandomStagger(time.Duration(f) * d)
}
Sync:
FullSync:
for {
switch err := s.State.UpdateSyncState(); {
switch err := s.State.SyncFull(); {
// update sync status failed
// full sync failed
case err != nil:
s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
// retry updating sync status after some time or when a consul
// retry full sync after some time or when a consul
// server was added.
select {
@ -121,10 +121,8 @@ Sync:
return
}
// update sync status OK
// full sync OK
default:
// force-trigger sync to pickup any changes
s.triggerSync()
// do partial syncs until it is time for a full sync again
for {
@ -140,7 +138,7 @@ Sync:
// }
case <-time.After(s.Interval + stagger(s.Interval)):
continue Sync
continue FullSync
case <-s.TriggerCh:
if s.Paused() {

22
agent/local/state.go

@ -538,9 +538,9 @@ func (l *State) Metadata() map[string]string {
return m
}
// UpdateSyncState does a read of the server state, and updates
// updateSyncState does a read of the server state, and updates
// the local sync status as appropriate
func (l *State) UpdateSyncState() error {
func (l *State) updateSyncState() error {
// 1. get all checks and services from the master
req := structs.NodeSpecificRequest{
Datacenter: l.config.Datacenter,
@ -631,7 +631,6 @@ func (l *State) UpdateSyncState() error {
}
for id, rc := range remoteChecks {
lc := l.checks[id]
// If we don't have the check locally, deregister it
@ -639,7 +638,7 @@ func (l *State) UpdateSyncState() error {
// The Serf check is created automatically and does not
// need to be deregistered.
if id == structs.SerfCheckID {
l.logger.Printf("Skipping remote check %q since it is managed automatically", id)
l.logger.Printf("[DEBUG] Skipping remote check %q since it is managed automatically", id)
continue
}
@ -683,6 +682,21 @@ func (l *State) UpdateSyncState() error {
return nil
}
// SyncFull determines the delta between the local and remote state
// and synchronizes the changes.
func (l *State) SyncFull() error {
// note that we do not acquire the lock here since the methods
// we are calling will do that themself.
// todo(fs): is it an issue that we do not hold the lock for the entire time?
// todo(fs): IMO, this doesn't matter since SyncChanges will sync whatever
// todo(fs): was determined in the update step.
if err := l.updateSyncState(); err != nil {
return err
}
return l.SyncChanges()
}
// SyncChanges is used to scan the status our local services and checks
// and update any that are out of sync with the server
func (l *State) SyncChanges() error {

116
agent/local/state_test.go

@ -7,8 +7,8 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
@ -20,7 +20,7 @@ import (
func TestAgentAntiEntropy_Services(t *testing.T) {
t.Parallel()
a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
a := &agent.TestAgent{Name: t.Name()}
a.Start()
defer a.Shutdown()
@ -113,8 +113,9 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
InSync: true,
})
// Trigger anti-entropy run and wait
a.StartSync()
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
}
var services structs.IndexedNodeServices
req := structs.NodeSpecificRequest{
@ -180,8 +181,9 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
// Remove one of the services
a.State.RemoveService("api")
// Trigger anti-entropy run and wait
a.StartSync()
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
}
retry.Run(t, func(r *retry.R) {
if err := a.RPC("Catalog.NodeServices", &req, &services); err != nil {
@ -228,7 +230,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
t.Parallel()
a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
a := &agent.TestAgent{Name: t.Name()}
a.Start()
defer a.Shutdown()
@ -275,8 +277,9 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Trigger anti-entropy run and wait
a.StartSync()
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
}
req := structs.NodeSpecificRequest{
Datacenter: "dc1",
@ -348,18 +351,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
}
a.State.AddCheck(chk, "")
// todo(fs): data race
// func() {
// a.State.RLock()
// defer a.State.RUnlock()
// // Sync the service once
// if err := a.State.syncService("mysql"); err != nil {
// t.Fatalf("err: %s", err)
// }
// }()
// todo(fs): is this correct?
if err := a.State.SyncChanges(); err != nil {
if err := a.State.SyncFull(); err != nil {
t.Fatal("sync failed: ", err)
}
@ -418,18 +410,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
}
a.State.AddCheck(chk2, "")
// todo(fs): data race
// func() {
// a.State.RLock()
// defer a.State.RUnlock()
// // Sync the service once
// if err := a.State.syncService("redis"); err != nil {
// t.Fatalf("err: %s", err)
// }
// }()
// todo(fs): is this correct?
if err := a.State.SyncChanges(); err != nil {
if err := a.State.SyncFull(); err != nil {
t.Fatal("sync failed: ", err)
}
@ -522,9 +503,9 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
}
a.State.AddService(srv2, token)
// Trigger anti-entropy run and wait
a.StartSync()
time.Sleep(200 * time.Millisecond)
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
}
// Verify that we are in sync
{
@ -569,8 +550,9 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
// Now remove the service and re-sync
a.State.RemoveService("api")
a.StartSync()
time.Sleep(200 * time.Millisecond)
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
}
// Verify that we are in sync
{
@ -619,7 +601,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
func TestAgentAntiEntropy_Checks(t *testing.T) {
t.Parallel()
a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
a := &agent.TestAgent{Name: t.Name()}
a.Start()
defer a.Shutdown()
@ -694,8 +676,9 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
InSync: true,
})
// Trigger anti-entropy run and wait
a.StartSync()
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
}
req := structs.NodeSpecificRequest{
Datacenter: "dc1",
@ -769,8 +752,9 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
// Remove one of the checks
a.State.RemoveCheck("redis")
// Trigger anti-entropy run and wait
a.StartSync()
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
}
// Verify that we are in sync
retry.Run(t, func(r *retry.R) {
@ -857,9 +841,9 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
}
a.State.AddService(srv2, "root")
// Trigger anti-entropy run and wait
a.StartSync()
time.Sleep(200 * time.Millisecond)
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
}
// Verify that we are in sync
{
@ -928,9 +912,9 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
}
a.State.AddCheck(chk2, token)
// Trigger anti-entropy run and wait.
a.StartSync()
time.Sleep(200 * time.Millisecond)
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
}
// Verify that we are in sync
retry.Run(t, func(r *retry.R) {
@ -975,8 +959,10 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
// Now delete the check and wait for sync.
a.State.RemoveCheck("api-check")
a.StartSync()
time.Sleep(200 * time.Millisecond)
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
}
// Verify that we are in sync
retry.Run(t, func(r *retry.R) {
req := structs.NodeSpecificRequest{
@ -1090,8 +1076,9 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
}
a.State.AddCheck(check, "")
// Trigger anti-entropy run and wait
a.StartSync()
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
}
// Verify that we are in sync
req := structs.NodeSpecificRequest{
@ -1172,9 +1159,9 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
}
}
// Trigger anti-entropy run and wait.
a.StartSync()
time.Sleep(200 * time.Millisecond)
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
}
// Verify that the output was synced back to the agent's value.
if err := a.RPC("Health.NodeChecks", &req, &checks); err != nil {
@ -1210,9 +1197,9 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
// Now make an update that should be deferred.
a.State.UpdateCheck("web", api.HealthPassing, "deferred")
// Trigger anti-entropy run and wait.
a.StartSync()
time.Sleep(200 * time.Millisecond)
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
}
// Verify that the output is still out of sync since there's a deferred
// update pending.
@ -1272,8 +1259,9 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Trigger anti-entropy run and wait
a.StartSync()
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
}
req := structs.NodeSpecificRequest{
Datacenter: "dc1",
@ -1304,8 +1292,10 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Trigger anti-entropy run and wait
a.StartSync()
if err := a.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
}
// Wait for the sync - this should have been a sync of just the node info
retry.Run(t, func(r *retry.R) {
if err := a.RPC("Catalog.NodeServices", &req, &services); err != nil {

12
agent/testagent.go

@ -65,10 +65,6 @@ type TestAgent struct {
// Key is the optional encryption key for the LAN and WAN keyring.
Key string
// NoInitialSync determines whether an anti-entropy run
// will be scheduled after the agent started.
NoInitialSync bool
// dns is a reference to the first started DNS endpoint.
// It is valid after Start().
dns *DNSServer
@ -175,9 +171,9 @@ func (a *TestAgent) Start() *TestAgent {
}
}
}
if !a.NoInitialSync {
a.Agent.StartSync()
}
// Start the anti-entropy syncer
a.Agent.StartSync()
var out structs.IndexedNodes
retry.Run(&panicFailer{}, func(r *retry.R) {
@ -200,7 +196,7 @@ func (a *TestAgent) Start() *TestAgent {
r.Fatal(a.Name, "No leader")
}
if out.Index == 0 {
r.Fatal(a.Name, "Consul index is 0")
r.Fatal(a.Name, ": Consul index is 0")
}
} else {
req, _ := http.NewRequest("GET", "/v1/agent/self", nil)

Loading…
Cancel
Save