mirror of https://github.com/hashicorp/consul
Merge pull request #850 from hashicorp/f-ae-missing
Anti-entropy sync services/checks missing entirely from Consulpull/858/head
commit
585fd2a044
|
@ -310,24 +310,47 @@ func (l *localState) setSyncState() error {
|
|||
if err := l.iface.RPC("Health.NodeChecks", &req, &out2); err != nil {
|
||||
return err
|
||||
}
|
||||
services := out1.NodeServices
|
||||
checks := out2.HealthChecks
|
||||
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
if services != nil {
|
||||
for id, service := range services.Services {
|
||||
// If we don't have the service locally, deregister it
|
||||
existing, ok := l.services[id]
|
||||
if !ok {
|
||||
l.serviceStatus[id] = syncStatus{remoteDelete: true}
|
||||
continue
|
||||
}
|
||||
services := make(map[string]*structs.NodeService)
|
||||
if out1.NodeServices != nil {
|
||||
services = out1.NodeServices.Services
|
||||
}
|
||||
|
||||
// If our definition is different, we need to update it
|
||||
equal := reflect.DeepEqual(existing, service)
|
||||
l.serviceStatus[id] = syncStatus{inSync: equal}
|
||||
for id, _ := range l.services {
|
||||
// If the local service doesn't exist remotely, then sync it
|
||||
if _, ok := services[id]; !ok {
|
||||
l.serviceStatus[id] = syncStatus{inSync: false}
|
||||
}
|
||||
}
|
||||
|
||||
for id, service := range services {
|
||||
// If we don't have the service locally, deregister it
|
||||
existing, ok := l.services[id]
|
||||
if !ok {
|
||||
l.serviceStatus[id] = syncStatus{remoteDelete: true}
|
||||
continue
|
||||
}
|
||||
|
||||
// If our definition is different, we need to update it
|
||||
equal := reflect.DeepEqual(existing, service)
|
||||
l.serviceStatus[id] = syncStatus{inSync: equal}
|
||||
}
|
||||
|
||||
for id, _ := range l.checks {
|
||||
// Sync any check which doesn't exist on the remote side
|
||||
found := false
|
||||
for _, check := range checks {
|
||||
if check.CheckID == id {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
l.checkStatus[id] = syncStatus{inSync: false}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -88,6 +88,16 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
|||
}
|
||||
agent.state.AddService(srv5)
|
||||
|
||||
// Exists local, in sync, remote missing (create)
|
||||
srv6 := &structs.NodeService{
|
||||
ID: "cache",
|
||||
Service: "cache",
|
||||
Tags: []string{},
|
||||
Port: 11211,
|
||||
}
|
||||
agent.state.AddService(srv6)
|
||||
agent.state.serviceStatus["cache"] = syncStatus{inSync: true}
|
||||
|
||||
srv5_mod := new(structs.NodeService)
|
||||
*srv5_mod = *srv5
|
||||
srv5_mod.Address = "127.0.0.1"
|
||||
|
@ -110,8 +120,8 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// We should have 5 services (consul included)
|
||||
if len(services.NodeServices.Services) != 5 {
|
||||
// We should have 6 services (consul included)
|
||||
if len(services.NodeServices.Services) != 6 {
|
||||
t.Fatalf("bad: %v", services.NodeServices.Services)
|
||||
}
|
||||
|
||||
|
@ -134,6 +144,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
|||
if !reflect.DeepEqual(serv, srv5) {
|
||||
t.Fatalf("bad: %v %v", serv, srv5)
|
||||
}
|
||||
case "cache":
|
||||
if !reflect.DeepEqual(serv, srv6) {
|
||||
t.Fatalf("bad: %v %v", serv, srv6)
|
||||
}
|
||||
case "consul":
|
||||
// ignore
|
||||
default:
|
||||
|
@ -142,10 +156,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
|||
}
|
||||
|
||||
// Check the local state
|
||||
if len(agent.state.services) != 5 {
|
||||
if len(agent.state.services) != 6 {
|
||||
t.Fatalf("bad: %v", agent.state.services)
|
||||
}
|
||||
if len(agent.state.serviceStatus) != 5 {
|
||||
if len(agent.state.serviceStatus) != 6 {
|
||||
t.Fatalf("bad: %v", agent.state.serviceStatus)
|
||||
}
|
||||
for name, status := range agent.state.serviceStatus {
|
||||
|
@ -439,6 +453,16 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Exists local, in sync, remote missing (create)
|
||||
chk5 := &structs.HealthCheck{
|
||||
Node: agent.config.NodeName,
|
||||
CheckID: "cache",
|
||||
Name: "cache",
|
||||
Status: structs.HealthPassing,
|
||||
}
|
||||
agent.state.AddCheck(chk5)
|
||||
agent.state.checkStatus["cache"] = syncStatus{inSync: true}
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
agent.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
@ -453,8 +477,8 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// We should have 4 services (serf included)
|
||||
if len(checks.HealthChecks) != 4 {
|
||||
// We should have 5 checks (serf included)
|
||||
if len(checks.HealthChecks) != 5 {
|
||||
t.Fatalf("bad: %v", checks)
|
||||
}
|
||||
|
||||
|
@ -473,6 +497,10 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
|||
if !reflect.DeepEqual(chk, chk3) {
|
||||
t.Fatalf("bad: %v %v", chk, chk3)
|
||||
}
|
||||
case "cache":
|
||||
if !reflect.DeepEqual(chk, chk5) {
|
||||
t.Fatalf("bad: %v %v", chk, chk5)
|
||||
}
|
||||
case "serfHealth":
|
||||
// ignore
|
||||
default:
|
||||
|
@ -481,10 +509,10 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
|||
}
|
||||
|
||||
// Check the local state
|
||||
if len(agent.state.checks) != 3 {
|
||||
if len(agent.state.checks) != 4 {
|
||||
t.Fatalf("bad: %v", agent.state.checks)
|
||||
}
|
||||
if len(agent.state.checkStatus) != 3 {
|
||||
if len(agent.state.checkStatus) != 4 {
|
||||
t.Fatalf("bad: %v", agent.state.checkStatus)
|
||||
}
|
||||
for name, status := range agent.state.checkStatus {
|
||||
|
|
Loading…
Reference in New Issue