From 46a5272a8a3b4c90b151ea4130ed62971d765144 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Mon, 24 Nov 2014 00:36:03 -0800 Subject: [PATCH] agent: first pass at local service and check persistence --- command/agent/agent.go | 165 +++++++++++++++++++++++++++++++++++- command/agent/agent_test.go | 112 ++++++++++++++++++++++++ 2 files changed, 275 insertions(+), 2 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index bdd2197e6f..54b3f95456 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1,6 +1,7 @@ package agent import ( + "encoding/json" "fmt" "io" "log" @@ -15,6 +16,14 @@ import ( "github.com/hashicorp/serf/serf" ) +const ( + // Path to save agent service definitions + servicesDir = "services" + + // Path to save local agent checks + checksDir = "checks" +) + /* The agent is the long running process that is run on every machine. It exposes an RPC interface that is used by the CLI to control the @@ -132,6 +141,14 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { return nil, err } + // Load any persisted services and services + if err := agent.restoreServices(); err != nil { + return nil, err + } + if err := agent.restoreChecks(); err != nil { + return nil, err + } + // Start handling events go agent.handleEvents() @@ -472,6 +489,140 @@ func (a *Agent) ResumeSync() { a.state.Resume() } +// persistService saves a service definition to a JSON file in the data dir +func (a *Agent) persistService(service *structs.NodeService) error { + svcPath := filepath.Join(a.config.DataDir, servicesDir, service.ID) + if _, err := os.Stat(svcPath); os.IsNotExist(err) { + encoded, err := json.Marshal(service) + if err != nil { + return nil + } + if err := os.MkdirAll(filepath.Dir(svcPath), 0700); err != nil { + return err + } + fh, err := os.OpenFile(svcPath, os.O_CREATE|os.O_WRONLY, 0600) + if err != nil { + return err + } + defer fh.Close() + if _, err := fh.Write(encoded); err != nil { + return err + } + } + return nil +} + +// purgeService removes a persisted service definition file from the data dir +func (a *Agent) purgeService(serviceID string) error { + svcPath := filepath.Join(a.config.DataDir, servicesDir, serviceID) + if _, err := os.Stat(svcPath); err == nil { + return os.Remove(svcPath) + } + return nil +} + +// restoreServices is used to load previously persisted service definitions +// into the agent during startup. +func (a *Agent) restoreServices() error { + svcDir := filepath.Join(a.config.DataDir, servicesDir) + if _, err := os.Stat(svcDir); os.IsNotExist(err) { + return nil + } + + err := filepath.Walk(svcDir, func(path string, fi os.FileInfo, err error) error { + if err != nil { + return err + } + if fi.Name() == servicesDir { + return nil + } + fh, err := os.Open(filepath.Join(svcDir, fi.Name())) + if err != nil { + return err + } + content := make([]byte, fi.Size()) + if _, err := fh.Read(content); err != nil { + return err + } + + var svc *structs.NodeService + if err := json.Unmarshal(content, &svc); err != nil { + return err + } + + a.logger.Printf("[DEBUG] Restored service definition: %s", svc.ID) + return a.AddService(svc, nil) + }) + return err +} + +// persistCheck saves a check definition to the local agent's state directory +func (a *Agent) persistCheck(check *structs.HealthCheck) error { + checkPath := filepath.Join(a.config.DataDir, checksDir, check.CheckID) + if _, err := os.Stat(checkPath); os.IsNotExist(err) { + encoded, err := json.Marshal(check) + if err != nil { + return nil + } + if err := os.MkdirAll(filepath.Dir(checkPath), 0700); err != nil { + return err + } + fh, err := os.OpenFile(checkPath, os.O_CREATE|os.O_WRONLY, 0600) + if err != nil { + return err + } + defer fh.Close() + if _, err := fh.Write(encoded); err != nil { + return err + } + } + return nil +} + +// purgeCheck removes a persisted check definition file from the data dir +func (a *Agent) purgeCheck(checkID string) error { + checkPath := filepath.Join(a.config.DataDir, checksDir, checkID) + if _, err := os.Stat(checkPath); err == nil { + return os.Remove(checkPath) + } + return nil +} + +// restoreChecks is used to load previously persisted health check definitions +// into the agent during startup. +func (a *Agent) restoreChecks() error { + checkDir := filepath.Join(a.config.DataDir, checksDir) + if _, err := os.Stat(checkDir); os.IsNotExist(err) { + return nil + } + + err := filepath.Walk(checkDir, func(path string, fi os.FileInfo, err error) error { + if err != nil { + return err + } + if fi.Name() == checksDir { + return nil + } + fh, err := os.Open(filepath.Join(checkDir, fi.Name())) + if err != nil { + return err + } + content := make([]byte, fi.Size()) + if _, err := fh.Read(content); err != nil { + return err + } + + var check *structs.HealthCheck + if err := json.Unmarshal(content, &check); err != nil { + return err + } + + a.logger.Printf("[DEBUG] Restored health check: %s", check.CheckID) + return a.AddCheck(check, nil) + }) + return err +} + // AddService is used to add a service entry. // This entry is persistent and the agent will make a best effort to // ensure it is registered @@ -489,6 +640,9 @@ func (a *Agent) AddService(service *structs.NodeService, chkType *CheckType) err // Add the service a.state.AddService(service) + // Persist the service to a file + a.persistService(service) + // Create an associated health check if chkType != nil { check := &structs.HealthCheck{ @@ -520,6 +674,11 @@ func (a *Agent) RemoveService(serviceID string) error { // Remove service immeidately a.state.RemoveService(serviceID) + // Remove the service from the data dir + if err := a.purgeService(serviceID); err != nil { + return err + } + // Deregister any associated health checks checkID := fmt.Sprintf("service:%s", serviceID) return a.RemoveCheck(checkID) @@ -580,7 +739,9 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType) error { // Add to the local state for anti-entropy a.state.AddCheck(check) - return nil + + // Persist the check + return a.persistCheck(check) } // RemoveCheck is used to remove a health check. @@ -601,7 +762,7 @@ func (a *Agent) RemoveCheck(checkID string) error { check.Stop() delete(a.checkTTLs, checkID) } - return nil + return a.purgeCheck(checkID) } // UpdateCheck is used to update the status of a check. diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 547232431f..9829fedda0 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -1,6 +1,8 @@ package agent import ( + "bytes" + "encoding/json" "fmt" "io" "io/ioutil" @@ -380,3 +382,113 @@ func TestAgent_ConsulService(t *testing.T) { t.Fatalf("%s service should be in sync", consul.ConsulServiceID) } } + +func TestAgent_PersistService(t *testing.T) { + config := nextConfig() + dir, agent := makeAgent(t, config) + defer os.RemoveAll(dir) + + svc := &structs.NodeService{ + ID: "redis", + Service: "redis", + Tags: []string{"foo"}, + Port: 8000, + } + + if err := agent.AddService(svc, nil); err != nil { + t.Fatalf("err: %v", err) + } + + file := filepath.Join(agent.config.DataDir, servicesDir, svc.ID) + if _, err := os.Stat(file); err != nil { + t.Fatalf("err: %s", err) + } + + expected, err := json.Marshal(svc) + if err != nil { + t.Fatalf("err: %s", err) + } + content, err := ioutil.ReadFile(file) + if err != nil { + t.Fatalf("err: %s", err) + } + if !bytes.Equal(expected, content) { + t.Fatalf("bad: %s", string(content)) + } + agent.Shutdown() + + // Should load it back during later start + agent2, err := Create(config, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + defer agent2.Shutdown() + + if _, ok := agent2.state.services[svc.ID]; !ok { + t.Fatalf("bad: %#v", agent2.state.services) + } + + // Should remove the service file + if err := agent2.RemoveService(svc.ID); err != nil { + t.Fatalf("err: %s", err) + } + if _, err := os.Stat(file); !os.IsNotExist(err) { + t.Fatalf("err: %s", err) + } +} + +func TestAgent_PersistCheck(t *testing.T) { + config := nextConfig() + dir, agent := makeAgent(t, config) + defer os.RemoveAll(dir) + + check := &structs.HealthCheck{ + Node: config.NodeName, + CheckID: "service:redis1", + Name: "redischeck", + Status: structs.HealthPassing, + ServiceID: "redis", + ServiceName: "redis", + } + + if err := agent.AddCheck(check, nil); err != nil { + t.Fatalf("err: %v", err) + } + + file := filepath.Join(agent.config.DataDir, checksDir, check.CheckID) + if _, err := os.Stat(file); err != nil { + t.Fatalf("err: %s", err) + } + + expected, err := json.Marshal(check) + if err != nil { + t.Fatalf("err: %s", err) + } + content, err := ioutil.ReadFile(file) + if err != nil { + t.Fatalf("err: %s", err) + } + if !bytes.Equal(expected, content) { + t.Fatalf("bad: %s", string(content)) + } + agent.Shutdown() + + // Should load it back during later start + agent2, err := Create(config, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + defer agent2.Shutdown() + + if _, ok := agent2.state.checks[check.CheckID]; !ok { + t.Fatalf("bad: %#v", agent2.state.checks) + } + + // Should remove the service file + if err := agent2.RemoveCheck(check.CheckID); err != nil { + t.Fatalf("err: %s", err) + } + if _, err := os.Stat(file); !os.IsNotExist(err) { + t.Fatalf("err: %s", err) + } +}