mirror of https://github.com/hashicorp/consul
parent
fca0df59fb
commit
67cdfc038e
|
@ -51,8 +51,8 @@ type StateSyncer struct {
|
|||
|
||||
// State contains the data that needs to be synchronized.
|
||||
State interface {
|
||||
UpdateSyncState() error
|
||||
SyncChanges() error
|
||||
SyncFull() error
|
||||
}
|
||||
|
||||
// Interval is the time between two regular sync runs.
|
||||
|
@ -91,15 +91,15 @@ func (s *StateSyncer) Run() {
|
|||
return lib.RandomStagger(time.Duration(f) * d)
|
||||
}
|
||||
|
||||
FullSync:
|
||||
Sync:
|
||||
for {
|
||||
switch err := s.State.SyncFull(); {
|
||||
switch err := s.State.UpdateSyncState(); {
|
||||
|
||||
// full sync failed
|
||||
// update sync status failed
|
||||
case err != nil:
|
||||
s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
|
||||
|
||||
// retry full sync after some time or when a consul
|
||||
// retry updating sync status after some time or when a consul
|
||||
// server was added.
|
||||
select {
|
||||
|
||||
|
@ -121,8 +121,10 @@ FullSync:
|
|||
return
|
||||
}
|
||||
|
||||
// full sync OK
|
||||
// update sync status OK
|
||||
default:
|
||||
// force-trigger sync to pickup any changes
|
||||
s.triggerSync()
|
||||
|
||||
// do partial syncs until it is time for a full sync again
|
||||
for {
|
||||
|
@ -138,7 +140,7 @@ FullSync:
|
|||
// }
|
||||
|
||||
case <-time.After(s.Interval + stagger(s.Interval)):
|
||||
continue FullSync
|
||||
continue Sync
|
||||
|
||||
case <-s.TriggerCh:
|
||||
if s.Paused() {
|
||||
|
|
|
@ -538,9 +538,9 @@ func (l *State) Metadata() map[string]string {
|
|||
return m
|
||||
}
|
||||
|
||||
// updateSyncState does a read of the server state, and updates
|
||||
// UpdateSyncState does a read of the server state, and updates
|
||||
// the local sync status as appropriate
|
||||
func (l *State) updateSyncState() error {
|
||||
func (l *State) UpdateSyncState() error {
|
||||
// 1. get all checks and services from the master
|
||||
req := structs.NodeSpecificRequest{
|
||||
Datacenter: l.config.Datacenter,
|
||||
|
@ -631,6 +631,7 @@ func (l *State) updateSyncState() error {
|
|||
}
|
||||
|
||||
for id, rc := range remoteChecks {
|
||||
|
||||
lc := l.checks[id]
|
||||
|
||||
// If we don't have the check locally, deregister it
|
||||
|
@ -638,7 +639,7 @@ func (l *State) updateSyncState() error {
|
|||
// The Serf check is created automatically and does not
|
||||
// need to be deregistered.
|
||||
if id == structs.SerfCheckID {
|
||||
l.logger.Printf("[DEBUG] Skipping remote check %q since it is managed automatically", id)
|
||||
l.logger.Printf("Skipping remote check %q since it is managed automatically", id)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -682,21 +683,6 @@ func (l *State) updateSyncState() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SyncFull determines the delta between the local and remote state
|
||||
// and synchronizes the changes.
|
||||
func (l *State) SyncFull() error {
|
||||
// note that we do not acquire the lock here since the methods
|
||||
// we are calling will do that themself.
|
||||
|
||||
// todo(fs): is it an issue that we do not hold the lock for the entire time?
|
||||
// todo(fs): IMO, this doesn't matter since SyncChanges will sync whatever
|
||||
// todo(fs): was determined in the update step.
|
||||
if err := l.updateSyncState(); err != nil {
|
||||
return err
|
||||
}
|
||||
return l.SyncChanges()
|
||||
}
|
||||
|
||||
// SyncChanges is used to scan the status our local services and checks
|
||||
// and update any that are out of sync with the server
|
||||
func (l *State) SyncChanges() error {
|
||||
|
|
|
@ -7,8 +7,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent"
|
||||
"github.com/hashicorp/consul/agent/local"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
|
@ -20,7 +20,7 @@ import (
|
|||
|
||||
func TestAgentAntiEntropy_Services(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := &agent.TestAgent{Name: t.Name()}
|
||||
a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
|
||||
a.Start()
|
||||
defer a.Shutdown()
|
||||
|
||||
|
@ -113,9 +113,8 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
|||
InSync: true,
|
||||
})
|
||||
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
|
||||
var services structs.IndexedNodeServices
|
||||
req := structs.NodeSpecificRequest{
|
||||
|
@ -181,9 +180,8 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
|||
// Remove one of the services
|
||||
a.State.RemoveService("api")
|
||||
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if err := a.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||
|
@ -230,7 +228,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
|||
|
||||
func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := &agent.TestAgent{Name: t.Name()}
|
||||
a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
|
||||
a.Start()
|
||||
defer a.Shutdown()
|
||||
|
||||
|
@ -277,9 +275,8 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
|
||||
req := structs.NodeSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -351,7 +348,18 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
|
|||
}
|
||||
a.State.AddCheck(chk, "")
|
||||
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
// todo(fs): data race
|
||||
// func() {
|
||||
// a.State.RLock()
|
||||
// defer a.State.RUnlock()
|
||||
|
||||
// // Sync the service once
|
||||
// if err := a.State.syncService("mysql"); err != nil {
|
||||
// t.Fatalf("err: %s", err)
|
||||
// }
|
||||
// }()
|
||||
// todo(fs): is this correct?
|
||||
if err := a.State.SyncChanges(); err != nil {
|
||||
t.Fatal("sync failed: ", err)
|
||||
}
|
||||
|
||||
|
@ -410,7 +418,18 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
|
|||
}
|
||||
a.State.AddCheck(chk2, "")
|
||||
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
// todo(fs): data race
|
||||
// func() {
|
||||
// a.State.RLock()
|
||||
// defer a.State.RUnlock()
|
||||
|
||||
// // Sync the service once
|
||||
// if err := a.State.syncService("redis"); err != nil {
|
||||
// t.Fatalf("err: %s", err)
|
||||
// }
|
||||
// }()
|
||||
// todo(fs): is this correct?
|
||||
if err := a.State.SyncChanges(); err != nil {
|
||||
t.Fatal("sync failed: ", err)
|
||||
}
|
||||
|
||||
|
@ -503,9 +522,9 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
|
|||
}
|
||||
a.State.AddService(srv2, token)
|
||||
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Verify that we are in sync
|
||||
{
|
||||
|
@ -550,9 +569,8 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
|
|||
|
||||
// Now remove the service and re-sync
|
||||
a.State.RemoveService("api")
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
a.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Verify that we are in sync
|
||||
{
|
||||
|
@ -601,7 +619,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
|
|||
|
||||
func TestAgentAntiEntropy_Checks(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := &agent.TestAgent{Name: t.Name()}
|
||||
a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
|
||||
a.Start()
|
||||
defer a.Shutdown()
|
||||
|
||||
|
@ -676,9 +694,8 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
|||
InSync: true,
|
||||
})
|
||||
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
|
||||
req := structs.NodeSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -752,9 +769,8 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
|||
// Remove one of the checks
|
||||
a.State.RemoveCheck("redis")
|
||||
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
|
||||
// Verify that we are in sync
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
|
@ -841,9 +857,9 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
|
|||
}
|
||||
a.State.AddService(srv2, "root")
|
||||
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Verify that we are in sync
|
||||
{
|
||||
|
@ -912,9 +928,9 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
|
|||
}
|
||||
a.State.AddCheck(chk2, token)
|
||||
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
// Trigger anti-entropy run and wait.
|
||||
a.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Verify that we are in sync
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
|
@ -959,10 +975,8 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
|
|||
|
||||
// Now delete the check and wait for sync.
|
||||
a.State.RemoveCheck("api-check")
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
a.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
// Verify that we are in sync
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
req := structs.NodeSpecificRequest{
|
||||
|
@ -1076,9 +1090,8 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
|||
}
|
||||
a.State.AddCheck(check, "")
|
||||
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
|
||||
// Verify that we are in sync
|
||||
req := structs.NodeSpecificRequest{
|
||||
|
@ -1159,9 +1172,9 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
// Trigger anti-entropy run and wait.
|
||||
a.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Verify that the output was synced back to the agent's value.
|
||||
if err := a.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||
|
@ -1197,9 +1210,9 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
|||
// Now make an update that should be deferred.
|
||||
a.State.UpdateCheck("web", api.HealthPassing, "deferred")
|
||||
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
// Trigger anti-entropy run and wait.
|
||||
a.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Verify that the output is still out of sync since there's a deferred
|
||||
// update pending.
|
||||
|
@ -1259,9 +1272,8 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
|
||||
req := structs.NodeSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -1292,10 +1304,8 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if err := a.State.SyncFull(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
a.StartSync()
|
||||
// Wait for the sync - this should have been a sync of just the node info
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if err := a.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||
|
|
|
@ -65,6 +65,10 @@ type TestAgent struct {
|
|||
// Key is the optional encryption key for the LAN and WAN keyring.
|
||||
Key string
|
||||
|
||||
// NoInitialSync determines whether an anti-entropy run
|
||||
// will be scheduled after the agent started.
|
||||
NoInitialSync bool
|
||||
|
||||
// dns is a reference to the first started DNS endpoint.
|
||||
// It is valid after Start().
|
||||
dns *DNSServer
|
||||
|
@ -171,9 +175,9 @@ func (a *TestAgent) Start() *TestAgent {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Start the anti-entropy syncer
|
||||
a.Agent.StartSync()
|
||||
if !a.NoInitialSync {
|
||||
a.Agent.StartSync()
|
||||
}
|
||||
|
||||
var out structs.IndexedNodes
|
||||
retry.Run(&panicFailer{}, func(r *retry.R) {
|
||||
|
@ -196,7 +200,7 @@ func (a *TestAgent) Start() *TestAgent {
|
|||
r.Fatal(a.Name, "No leader")
|
||||
}
|
||||
if out.Index == 0 {
|
||||
r.Fatal(a.Name, ": Consul index is 0")
|
||||
r.Fatal(a.Name, "Consul index is 0")
|
||||
}
|
||||
} else {
|
||||
req, _ := http.NewRequest("GET", "/v1/agent/self", nil)
|
||||
|
|
Loading…
Reference in New Issue