diff --git a/command/agent/agent.go b/command/agent/agent.go index af872ef224..ba0b24cd29 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -3,6 +3,7 @@ package agent import ( "fmt" "github.com/hashicorp/consul/consul" + "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/serf/serf" "io" "log" @@ -306,3 +307,140 @@ func (a *Agent) StartSync() { // Start the anti entropy routine go a.state.antiEntropy(a.shutdownCh) } + +// AddService is used to add a service entry. +// This entry is persistent and the agent will make a best effort to +// ensure it is registered +func (a *Agent) AddService(service *structs.NodeService, chkType *CheckType) error { + if service.Service == "" { + return fmt.Errorf("Service name missing") + } + if service.ID == "" && service.Service != "" { + service.ID = service.Service + } + if chkType != nil && !chkType.Valid() { + return fmt.Errorf("Check type is not valid") + } + + // Add the service + a.state.AddService(service) + + // Create an associated health check + if chkType != nil { + check := &structs.HealthCheck{ + Node: a.config.NodeName, + CheckID: fmt.Sprintf("service:%s", service.ID), + Name: fmt.Sprintf("Service '%s' check", service.Service), + Status: structs.HealthUnknown, + Notes: "Initializing", + ServiceID: service.ID, + ServiceName: service.Service, + } + if err := a.AddCheck(check, chkType); err != nil { + a.state.RemoveService(service.ID) + return err + } + } + return nil +} + +// RemoveService is used to remove a service entry. +// The agent will make a best effort to ensure it is deregistered +func (a *Agent) RemoveService(serviceID string) error { + // Remove service immeidately + a.state.RemoveService(serviceID) + + // Deregister any associated health checks + checkID := fmt.Sprintf("service:%s", serviceID) + return a.RemoveCheck(checkID) +} + +// AddCheck is used to add a health check to the agent. +// This entry is persistent and the agent will make a best effort to +// ensure it is registered. The Check may include a CheckType which +// is used to automatically update the check status +func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType) error { + if check.CheckID == "" { + return fmt.Errorf("CheckID missing") + } + if chkType != nil && !chkType.Valid() { + return fmt.Errorf("Check type is not valid") + } + + a.checkLock.Lock() + defer a.checkLock.Unlock() + + // Check if already registered + if chkType != nil { + if chkType.IsTTL() { + if _, ok := a.checkTTLs[check.CheckID]; ok { + return fmt.Errorf("CheckID is already registered") + } + + ttl := &CheckTTL{ + Notify: &a.state, + CheckID: check.CheckID, + TTL: chkType.TTL, + Logger: a.logger, + } + ttl.Start() + a.checkTTLs[check.CheckID] = ttl + + } else { + if _, ok := a.checkMonitors[check.CheckID]; ok { + return fmt.Errorf("CheckID is already registered") + } + + monitor := &CheckMonitor{ + Notify: &a.state, + CheckID: check.CheckID, + Script: chkType.Script, + Interval: chkType.Interval, + Logger: a.logger, + } + monitor.Start() + a.checkMonitors[check.CheckID] = monitor + } + } + + // Add to the local state for anti-entropy + a.state.AddCheck(check) + return nil +} + +// RemoveCheck is used to remove a health check. +// The agent will make a best effort to ensure it is deregistered +func (a *Agent) RemoveCheck(checkID string) error { + // Add to the local state for anti-entropy + a.state.RemoveCheck(checkID) + + a.checkLock.Lock() + defer a.checkLock.Unlock() + + // Stop any monitors + if check, ok := a.checkMonitors[checkID]; ok { + check.Stop() + delete(a.checkMonitors, checkID) + } + if check, ok := a.checkTTLs[checkID]; ok { + check.Stop() + delete(a.checkTTLs, checkID) + } + return nil +} + +// UpdateCheck is used to update the status of a check. +// This can only be used with checks of the TTL type. +func (a *Agent) UpdateCheck(checkID, status, output string) error { + a.checkLock.Lock() + defer a.checkLock.Unlock() + + check, ok := a.checkTTLs[checkID] + if !ok { + return fmt.Errorf("CheckID does not have associated TTL") + } + + // Set the status through CheckTTL to reset the TTL + check.SetStatus(status, output) + return nil +} diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 8687c37a75..8dde02690b 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -3,6 +3,7 @@ package agent import ( "fmt" "github.com/hashicorp/consul/consul" + "github.com/hashicorp/consul/consul/structs" "io" "io/ioutil" "os" @@ -98,3 +99,183 @@ func TestAgent_RPCPing(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestAgent_AddService(t *testing.T) { + dir, agent := makeAgent(t, nextConfig()) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + srv := &structs.NodeService{ + ID: "redis", + Service: "redis", + Tag: "foo", + Port: 8000, + } + chk := &CheckType{TTL: time.Minute} + err := agent.AddService(srv, chk) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure we have a state mapping + if _, ok := agent.state.Services()["redis"]; !ok { + t.Fatalf("missing redis service") + } + + // Ensure we have a check mapping + if _, ok := agent.state.Checks()["service:redis"]; !ok { + t.Fatalf("missing redis check") + } + + // Ensure a TTL is setup + if _, ok := agent.checkTTLs["service:redis"]; !ok { + t.Fatalf("missing redis check ttl") + } +} + +func TestAgent_RemoveService(t *testing.T) { + dir, agent := makeAgent(t, nextConfig()) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + // Remove a service that doesn't exist + if err := agent.RemoveService("redis"); err != nil { + t.Fatalf("err: %v", err) + } + + srv := &structs.NodeService{ + ID: "redis", + Service: "redis", + Port: 8000, + } + chk := &CheckType{TTL: time.Minute} + if err := agent.AddService(srv, chk); err != nil { + t.Fatalf("err: %v", err) + } + + // Remove the service + if err := agent.RemoveService("redis"); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure we have a state mapping + if _, ok := agent.state.Services()["redis"]; ok { + t.Fatalf("have redis service") + } + + // Ensure we have a check mapping + if _, ok := agent.state.Checks()["service:redis"]; ok { + t.Fatalf("have redis check") + } + + // Ensure a TTL is setup + if _, ok := agent.checkTTLs["service:redis"]; ok { + t.Fatalf("have redis check ttl") + } +} + +func TestAgent_AddCheck(t *testing.T) { + dir, agent := makeAgent(t, nextConfig()) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + health := &structs.HealthCheck{ + Node: "foo", + CheckID: "mem", + Name: "memory util", + Status: structs.HealthUnknown, + } + chk := &CheckType{ + Script: "exit 0", + Interval: 15 * time.Second, + } + err := agent.AddCheck(health, chk) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure we have a check mapping + if _, ok := agent.state.Checks()["mem"]; !ok { + t.Fatalf("missing mem check") + } + + // Ensure a TTL is setup + if _, ok := agent.checkMonitors["mem"]; !ok { + t.Fatalf("missing mem monitor") + } +} + +func TestAgent_RemoveCheck(t *testing.T) { + dir, agent := makeAgent(t, nextConfig()) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + // Remove check that doesn't exist + if err := agent.RemoveCheck("mem"); err != nil { + t.Fatalf("err: %v", err) + } + + health := &structs.HealthCheck{ + Node: "foo", + CheckID: "mem", + Name: "memory util", + Status: structs.HealthUnknown, + } + chk := &CheckType{ + Script: "exit 0", + Interval: 15 * time.Second, + } + err := agent.AddCheck(health, chk) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Remove check + if err := agent.RemoveCheck("mem"); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure we have a check mapping + if _, ok := agent.state.Checks()["mem"]; ok { + t.Fatalf("have mem check") + } + + // Ensure a TTL is setup + if _, ok := agent.checkMonitors["mem"]; ok { + t.Fatalf("have mem monitor") + } +} + +func TestAgent_UpdateCheck(t *testing.T) { + dir, agent := makeAgent(t, nextConfig()) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + health := &structs.HealthCheck{ + Node: "foo", + CheckID: "mem", + Name: "memory util", + Status: structs.HealthUnknown, + } + chk := &CheckType{ + TTL: 15 * time.Second, + } + err := agent.AddCheck(health, chk) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Remove check + if err := agent.UpdateCheck("mem", structs.HealthPassing, "foo"); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure we have a check mapping + status := agent.state.Checks()["mem"] + if status.Status != structs.HealthPassing { + t.Fatalf("bad: %v", status) + } + if status.Notes != "foo" { + t.Fatalf("bad: %v", status) + } +} diff --git a/command/agent/check.go b/command/agent/check.go index b86aa454a7..b358399a8d 100644 --- a/command/agent/check.go +++ b/command/agent/check.go @@ -2,6 +2,7 @@ package agent import ( "bytes" + "fmt" "github.com/hashicorp/consul/consul/structs" "log" "os/exec" @@ -120,7 +121,16 @@ func (c *CheckMonitor) check() { } // Wait for the check to complete - err := cmd.Wait() + errCh := make(chan error, 2) + go func() { + errCh <- cmd.Wait() + }() + go func() { + time.Sleep(30 * time.Second) + errCh <- fmt.Errorf("Timed out running check '%s'", c.Script) + }() + err := <-errCh + notes := string(output.Bytes()) c.Logger.Printf("[DEBUG] agent: check '%s' script '%s' output: %s", c.CheckID, c.Script, notes) diff --git a/command/agent/rpc.go b/command/agent/rpc.go index 5057165597..0c5936a5fb 100644 --- a/command/agent/rpc.go +++ b/command/agent/rpc.go @@ -126,7 +126,7 @@ type Member struct { Name string Addr net.IP Port uint16 - Role string + Tags map[string]string Status string ProtocolMin uint8 ProtocolMax uint8 @@ -436,7 +436,7 @@ func formatMembers(raw []serf.Member, client *rpcClient, seq uint64) error { Name: m.Name, Addr: m.Addr, Port: m.Port, - Role: m.Role, + Tags: m.Tags, Status: m.Status.String(), ProtocolMin: m.ProtocolMin, ProtocolMax: m.ProtocolMax,