agent: simplify anti-entropy of services with multiple checks, add tests

pull/591/head
Ryan Uber 2015-01-14 11:48:36 -08:00
parent 0c31e5851c
commit a4039aaa4d
2 changed files with 189 additions and 86 deletions

View File

@ -368,33 +368,7 @@ func (l *localState) syncChanges() error {
l.Lock()
defer l.Unlock()
// Sync the checks first. This allows registering the service in the
// same transaction as its checks.
var checkIDs []string
for id, status := range l.checkStatus {
if status.remoteDelete {
if err := l.deleteCheck(id); err != nil {
return err
}
} else if !status.inSync {
// Cancel a deferred sync
if timer, ok := l.deferCheck[id]; ok {
timer.Stop()
delete(l.deferCheck, id)
}
checkIDs = append(checkIDs, id)
} else {
l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id)
}
}
if len(checkIDs) > 0 {
if err := l.syncChecks(checkIDs); err != nil {
return err
}
}
// Sync any remaining services.
// Sync the services
for id, status := range l.serviceStatus {
if status.remoteDelete {
if err := l.deleteService(id); err != nil {
@ -409,6 +383,26 @@ func (l *localState) syncChanges() error {
}
}
// Sync the checks
for id, status := range l.checkStatus {
if status.remoteDelete {
if err := l.deleteCheck(id); err != nil {
return err
}
} else if !status.inSync {
// Cancel a deferred sync
if timer := l.deferCheck[id]; timer != nil {
timer.Stop()
delete(l.deferCheck, id)
}
if err := l.syncCheck(id); err != nil {
return err
}
} else {
l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id)
}
}
return nil
}
@ -455,79 +449,68 @@ func (l *localState) syncService(id string) error {
Service: l.services[id],
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
}
var checks structs.HealthChecks
for _, check := range l.checks {
if check.ServiceID == id {
if stat, ok := l.checkStatus[check.CheckID]; !ok || !stat.inSync {
checks = append(checks, check)
}
}
}
if len(checks) == 1 {
req.Check = checks[0]
} else {
req.Checks = checks
}
var out struct{}
err := l.iface.RPC("Catalog.Register", &req, &out)
if err == nil {
l.serviceStatus[id] = syncStatus{inSync: true}
l.logger.Printf("[INFO] agent: Synced service '%s'", id)
for _, check := range checks {
l.checkStatus[check.CheckID] = syncStatus{inSync: true}
}
} else if strings.Contains(err.Error(), permissionDenied) {
l.serviceStatus[id] = syncStatus{inSync: true}
l.logger.Printf("[WARN] agent: Service '%s' registration blocked by ACLs", id)
for _, check := range checks {
l.checkStatus[check.CheckID] = syncStatus{inSync: true}
}
return nil
}
return err
}
// syncChecks is used to sync checks to the server. If a check is associated
// with a service and the service is out of sync, it will piggyback with the
// sync so that it is updated as part of the same transaction.
func (l *localState) syncChecks(checkIDs []string) error {
checkMap := make(map[string]structs.HealthChecks)
for _, id := range checkIDs {
if check, ok := l.checks[id]; ok {
checkMap[check.ServiceID] = append(checkMap[check.ServiceID], check)
// syncCheck is used to sync a service to the server
func (l *localState) syncCheck(id string) error {
// Pull in the associated service if any
check := l.checks[id]
var service *structs.NodeService
if check.ServiceID != "" {
if serv, ok := l.services[check.ServiceID]; ok {
service = serv
}
}
for serviceID, checks := range checkMap {
// Create the sync request
req := structs.RegisterRequest{
Datacenter: l.config.Datacenter,
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
}
// Attach the service if it should also be synced
if service, ok := l.services[serviceID]; ok {
if status, ok := l.serviceStatus[serviceID]; ok && !status.inSync {
req.Service = service
}
}
// Send single Check element for backwards compat with 0.4.x
if len(checks) == 1 {
req.Check = checks[0]
} else {
req.Checks = checks
}
// Perform the sync
var out struct{}
if err := l.iface.RPC("Catalog.Register", &req, &out); err != nil {
if strings.Contains(err.Error(), permissionDenied) {
for _, check := range checks {
l.checkStatus[check.CheckID] = syncStatus{inSync: true}
l.logger.Printf(
"[WARN] agent: Check '%s' registration blocked by ACLs",
check.CheckID)
}
return nil
}
return err
}
// Mark the checks and services as synced
if req.Service != nil {
l.serviceStatus[serviceID] = syncStatus{inSync: true}
l.logger.Printf("[INFO] agent: Synced service '%s'", serviceID)
}
for _, check := range checks {
l.checkStatus[check.CheckID] = syncStatus{inSync: true}
l.logger.Printf("[INFO] agent: Synced check '%s'", check.CheckID)
}
req := structs.RegisterRequest{
Datacenter: l.config.Datacenter,
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
Service: service,
Check: l.checks[id],
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
}
return nil
var out struct{}
err := l.iface.RPC("Catalog.Register", &req, &out)
if err == nil {
l.checkStatus[id] = syncStatus{inSync: true}
l.logger.Printf("[INFO] agent: Synced check '%s'", id)
} else if strings.Contains(err.Error(), permissionDenied) {
l.checkStatus[id] = syncStatus{inSync: true}
l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id)
return nil
}
return err
}

View File

@ -155,6 +155,126 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
}
}
func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
conf := nextConfig()
dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir)
defer agent.Shutdown()
testutil.WaitForLeader(t, agent.RPC, "dc1")
{
// Single check
srv := &structs.NodeService{
ID: "mysql",
Service: "mysql",
Tags: []string{"master"},
Port: 5000,
}
agent.state.AddService(srv)
chk := &structs.HealthCheck{
Node: agent.config.NodeName,
CheckID: "mysql",
Name: "mysql",
ServiceID: "mysql",
Status: structs.HealthPassing,
}
agent.state.AddCheck(chk)
// Sync the service once
if err := agent.state.syncService("mysql"); err != nil {
t.Fatalf("err: %s", err)
}
// We should have 2 services (consul included)
svcReq := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: agent.config.NodeName,
}
var services structs.IndexedNodeServices
if err := agent.RPC("Catalog.NodeServices", &svcReq, &services); err != nil {
t.Fatalf("err: %v", err)
}
if len(services.NodeServices.Services) != 2 {
t.Fatalf("bad: %v", services.NodeServices.Services)
}
// We should have one health check
chkReq := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "mysql",
}
var checks structs.IndexedHealthChecks
if err := agent.RPC("Health.ServiceChecks", &chkReq, &checks); err != nil {
t.Fatalf("err: %v", err)
}
if len(checks.HealthChecks) != 1 {
t.Fatalf("bad: %v", checks)
}
}
{
// Multiple checks
srv := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"master"},
Port: 5000,
}
agent.state.AddService(srv)
chk1 := &structs.HealthCheck{
Node: agent.config.NodeName,
CheckID: "redis:1",
Name: "redis:1",
ServiceID: "redis",
Status: structs.HealthPassing,
}
agent.state.AddCheck(chk1)
chk2 := &structs.HealthCheck{
Node: agent.config.NodeName,
CheckID: "redis:2",
Name: "redis:2",
ServiceID: "redis",
Status: structs.HealthPassing,
}
agent.state.AddCheck(chk2)
// Sync the service once
if err := agent.state.syncService("redis"); err != nil {
t.Fatalf("err: %s", err)
}
// We should have 3 services (consul included)
svcReq := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: agent.config.NodeName,
}
var services structs.IndexedNodeServices
if err := agent.RPC("Catalog.NodeServices", &svcReq, &services); err != nil {
t.Fatalf("err: %v", err)
}
if len(services.NodeServices.Services) != 3 {
t.Fatalf("bad: %v", services.NodeServices.Services)
}
// We should have two health checks
chkReq := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "redis",
}
var checks structs.IndexedHealthChecks
if err := agent.RPC("Health.ServiceChecks", &chkReq, &checks); err != nil {
t.Fatalf("err: %v", err)
}
if len(checks.HealthChecks) != 2 {
t.Fatalf("bad: %v", checks)
}
}
}
func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
conf := nextConfig()
conf.ACLDatacenter = "dc1"