From b7587cac42849ee9738a8fb7550731976fccec17 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 25 Nov 2014 23:58:02 -0800 Subject: [PATCH] agent: allow config reload to modify checks/services persistence This change consolidates loading services and checks from both config and persisted state into methods on the agent. As part of this, we introduce optional persistence when calling RemoveCheck/RemoveService. Fixes a bug where config reloads would kill persisted services/checks. Also fixes an edge case: 1. A service or check is registered via the HTTP API 2. A new service or check definition with the same ID is added to config 3. Config is reloaded The desired behavior (which this implements) is: 1. All services and checks deregistered in memory 2. All services and checks in config are registered first 3. All persisted checks are restored using the same logic as the agent start sequence, which prioritizes config over persisted, and removes any persistence files if new config counterparts are present. --- command/agent/agent.go | 98 +++++++++++++++++++++++---------- command/agent/agent_endpoint.go | 4 +- command/agent/agent_test.go | 76 +++++++++++++++++++++---- command/agent/command.go | 34 +++--------- 4 files changed, 143 insertions(+), 69 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 0b5405da50..bb2c9d63cb 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -141,31 +141,11 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { return nil, err } - // Register the services from config - for _, service := range config.Services { - ns := service.NodeService() - chkType := service.CheckType() - if err := agent.AddService(ns, chkType, false); err != nil { - return nil, fmt.Errorf( - "Failed to register service '%s': %v", service.Name, err) - } - } - - // Register the checks from config - for _, check := range config.Checks { - health := check.HealthCheck(config.NodeName) - chkType := &check.CheckType - if err := agent.AddCheck(health, chkType, false); err != nil { - return nil, fmt.Errorf( - "Failed to register check '%s': %v %v", check.Name, err, check) - } - } - - // Load any persisted services or checks - if err := agent.restoreServices(); err != nil { + // Load checks/services + if err := agent.reloadServices(config); err != nil { return nil, err } - if err := agent.restoreChecks(); err != nil { + if err := agent.reloadChecks(config); err != nil { return nil, err } @@ -705,7 +685,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkType *CheckType, per // RemoveService is used to remove a service entry. // The agent will make a best effort to ensure it is deregistered -func (a *Agent) RemoveService(serviceID string) error { +func (a *Agent) RemoveService(serviceID string, persist bool) error { // Protect "consul" service from deletion by a user if a.server != nil && serviceID == consul.ConsulServiceID { return fmt.Errorf( @@ -717,13 +697,15 @@ func (a *Agent) RemoveService(serviceID string) error { a.state.RemoveService(serviceID) // Remove the service from the data dir - if err := a.purgeService(serviceID); err != nil { - return err + if persist { + if err := a.purgeService(serviceID); err != nil { + return err + } } // Deregister any associated health checks checkID := fmt.Sprintf("service:%s", serviceID) - return a.RemoveCheck(checkID) + return a.RemoveCheck(checkID, persist) } // AddCheck is used to add a health check to the agent. @@ -792,7 +774,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist // RemoveCheck is used to remove a health check. // The agent will make a best effort to ensure it is deregistered -func (a *Agent) RemoveCheck(checkID string) error { +func (a *Agent) RemoveCheck(checkID string, persist bool) error { // Add to the local state for anti-entropy a.state.RemoveCheck(checkID) @@ -808,7 +790,10 @@ func (a *Agent) RemoveCheck(checkID string) error { check.Stop() delete(a.checkTTLs, checkID) } - return a.purgeCheck(checkID) + if persist { + return a.purgeCheck(checkID) + } + return nil } // UpdateCheck is used to update the status of a check. @@ -904,3 +889,58 @@ func (a *Agent) deletePid() error { } return nil } + +// reloadServices reloads all known services from config and state. It is used +// at initial agent startup as well as during config reloads. +func (a *Agent) reloadServices(conf *Config) error { + for _, service := range a.state.Services() { + if service.ID == consul.ConsulServiceID { + continue + } + if err := a.RemoveService(service.ID, false); err != nil { + return fmt.Errorf("Failed deregistering service '%s': %v", service.ID, err) + } + } + + // Register the services from config + for _, service := range conf.Services { + ns := service.NodeService() + chkType := service.CheckType() + if err := a.AddService(ns, chkType, false); err != nil { + return fmt.Errorf("Failed to register service '%s': %v", service.ID, err) + } + } + + // Load any persisted services + if err := a.restoreServices(); err != nil { + return fmt.Errorf("Failed restoring services: %s", err) + } + + return nil +} + +// reloadChecks reloads all known checks from config and state. It can be used +// during initial agent start or for config reloads. +func (a *Agent) reloadChecks(conf *Config) error { + for _, check := range a.state.Checks() { + if err := a.RemoveCheck(check.CheckID, false); err != nil { + return fmt.Errorf("Failed deregistering check '%s': %s", check.CheckID, err) + } + } + + // Register the checks from config + for _, check := range conf.Checks { + health := check.HealthCheck(conf.NodeName) + chkType := &check.CheckType + if err := a.AddCheck(health, chkType, false); err != nil { + return fmt.Errorf("Failed to register check '%s': %v %v", check.Name, err, check) + } + } + + // Load any persisted checks + if err := a.restoreChecks(); err != nil { + return fmt.Errorf("Failed restoring checks: %s", err) + } + + return nil +} diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index de3c24157f..4ee70343fd 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -102,7 +102,7 @@ func (s *HTTPServer) AgentRegisterCheck(resp http.ResponseWriter, req *http.Requ func (s *HTTPServer) AgentDeregisterCheck(resp http.ResponseWriter, req *http.Request) (interface{}, error) { checkID := strings.TrimPrefix(req.URL.Path, "/v1/agent/check/deregister/") - return nil, s.agent.RemoveCheck(checkID) + return nil, s.agent.RemoveCheck(checkID, true) } func (s *HTTPServer) AgentCheckPass(resp http.ResponseWriter, req *http.Request) (interface{}, error) { @@ -174,5 +174,5 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re func (s *HTTPServer) AgentDeregisterService(resp http.ResponseWriter, req *http.Request) (interface{}, error) { serviceID := strings.TrimPrefix(req.URL.Path, "/v1/agent/service/deregister/") - return nil, s.agent.RemoveService(serviceID) + return nil, s.agent.RemoveService(serviceID, true) } diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 5200df3a87..b7571a0855 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -181,12 +181,12 @@ func TestAgent_RemoveService(t *testing.T) { defer agent.Shutdown() // Remove a service that doesn't exist - if err := agent.RemoveService("redis"); err != nil { + if err := agent.RemoveService("redis", false); err != nil { t.Fatalf("err: %v", err) } // Remove the consul service - if err := agent.RemoveService("consul"); err == nil { + if err := agent.RemoveService("consul", false); err == nil { t.Fatalf("should have errored") } @@ -201,7 +201,7 @@ func TestAgent_RemoveService(t *testing.T) { } // Remove the service - if err := agent.RemoveService("redis"); err != nil { + if err := agent.RemoveService("redis", false); err != nil { t.Fatalf("err: %v", err) } @@ -291,7 +291,7 @@ func TestAgent_RemoveCheck(t *testing.T) { defer agent.Shutdown() // Remove check that doesn't exist - if err := agent.RemoveCheck("mem"); err != nil { + if err := agent.RemoveCheck("mem", false); err != nil { t.Fatalf("err: %v", err) } @@ -311,7 +311,7 @@ func TestAgent_RemoveCheck(t *testing.T) { } // Remove check - if err := agent.RemoveCheck("mem"); err != nil { + if err := agent.RemoveCheck("mem", false); err != nil { t.Fatalf("err: %v", err) } @@ -438,14 +438,40 @@ func TestAgent_PersistService(t *testing.T) { if _, ok := agent2.state.services[svc.ID]; !ok { t.Fatalf("bad: %#v", agent2.state.services) } +} - // Should remove the service file - if err := agent2.RemoveService(svc.ID); err != nil { +func TestAgent_PurgeService(t *testing.T) { + config := nextConfig() + dir, agent := makeAgent(t, config) + defer os.RemoveAll(dir) + + svc := &structs.NodeService{ + ID: "redis", + Service: "redis", + Tags: []string{"foo"}, + Port: 8000, + } + + file := filepath.Join(agent.config.DataDir, servicesDir, svc.ID) + if err := agent.AddService(svc, nil, true); err != nil { + t.Fatalf("err: %v", err) + } + + // Not removed + if err := agent.RemoveService(svc.ID, false); err != nil { t.Fatalf("err: %s", err) } - if _, err := os.Stat(file); !os.IsNotExist(err) { + if _, err := os.Stat(file); err != nil { t.Fatalf("err: %s", err) } + + // Removed + if err := agent.RemoveService(svc.ID, true); err != nil { + t.Fatalf("err: %s", err) + } + if _, err := os.Stat(file); !os.IsNotExist(err) { + t.Fatalf("bad: %#v", err) + } } func TestAgent_PurgeServiceOnDuplicate(t *testing.T) { @@ -555,14 +581,42 @@ func TestAgent_PersistCheck(t *testing.T) { if result.Status != structs.HealthCritical { t.Fatalf("bad: %#v", result) } +} + +func TestAgent_PurgeCheck(t *testing.T) { + config := nextConfig() + dir, agent := makeAgent(t, config) + defer os.RemoveAll(dir) + + check := &structs.HealthCheck{ + Node: config.NodeName, + CheckID: "service:redis1", + Name: "redischeck", + Status: structs.HealthPassing, + ServiceID: "redis", + ServiceName: "redis", + } - // Should remove the service file - if err := agent2.RemoveCheck(check.CheckID); err != nil { + file := filepath.Join(agent.config.DataDir, checksDir, check.CheckID) + if err := agent.AddCheck(check, nil, true); err != nil { + t.Fatalf("err: %v", err) + } + + // Not removed + if err := agent.RemoveCheck(check.CheckID, false); err != nil { t.Fatalf("err: %s", err) } - if _, err := os.Stat(file); !os.IsNotExist(err) { + if _, err := os.Stat(file); err != nil { + t.Fatalf("err: %s", err) + } + + // Removed + if err := agent.RemoveCheck(check.CheckID, true); err != nil { t.Fatalf("err: %s", err) } + if _, err := os.Stat(file); !os.IsNotExist(err) { + t.Fatalf("bad: %#v", err) + } } func TestAgent_PurgeCheckOnDuplicate(t *testing.T) { diff --git a/command/agent/command.go b/command/agent/command.go index 03e92e11e2..c537dc3fa0 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -722,34 +722,14 @@ func (c *Command) handleReload(config *Config) *Config { c.agent.PauseSync() defer c.agent.ResumeSync() - // Deregister the old services - for _, service := range config.Services { - ns := service.NodeService() - c.agent.RemoveService(ns.ID) - } - - // Deregister the old checks - for _, check := range config.Checks { - health := check.HealthCheck(config.NodeName) - c.agent.RemoveCheck(health.CheckID) - } - - // Register the services - for _, service := range newConf.Services { - ns := service.NodeService() - chkType := service.CheckType() - if err := c.agent.AddService(ns, chkType, false); err != nil { - c.Ui.Error(fmt.Sprintf("Failed to register service '%s': %v", service.Name, err)) - } + // Reload services and check definitions + if err := c.agent.reloadServices(newConf); err != nil { + c.Ui.Error(fmt.Sprintf("Failed reloading services: %s", err)) + return nil } - - // Register the checks - for _, check := range newConf.Checks { - health := check.HealthCheck(config.NodeName) - chkType := &check.CheckType - if err := c.agent.AddCheck(health, chkType, false); err != nil { - c.Ui.Error(fmt.Sprintf("Failed to register check '%s': %v %v", check.Name, err, check)) - } + if err := c.agent.reloadChecks(newConf); err != nil { + c.Ui.Error(fmt.Sprintf("Failed reloading checks: %s", err)) + return nil } // Get the new client listener addr