agent: Adding methods to register services and checks

pull/19/head
Armon Dadgar 2014-01-30 13:39:02 -08:00
parent 438c9537d3
commit 1eb0a76ee8
4 changed files with 332 additions and 3 deletions

View File

@ -3,6 +3,7 @@ package agent
import (
"fmt"
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/serf"
"io"
"log"
@ -306,3 +307,140 @@ func (a *Agent) StartSync() {
// Start the anti entropy routine
go a.state.antiEntropy(a.shutdownCh)
}
// AddService is used to add a service entry.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (a *Agent) AddService(service *structs.NodeService, chkType *CheckType) error {
if service.Service == "" {
return fmt.Errorf("Service name missing")
}
if service.ID == "" && service.Service != "" {
service.ID = service.Service
}
if chkType != nil && !chkType.Valid() {
return fmt.Errorf("Check type is not valid")
}
// Add the service
a.state.AddService(service)
// Create an associated health check
if chkType != nil {
check := &structs.HealthCheck{
Node: a.config.NodeName,
CheckID: fmt.Sprintf("service:%s", service.ID),
Name: fmt.Sprintf("Service '%s' check", service.Service),
Status: structs.HealthUnknown,
Notes: "Initializing",
ServiceID: service.ID,
ServiceName: service.Service,
}
if err := a.AddCheck(check, chkType); err != nil {
a.state.RemoveService(service.ID)
return err
}
}
return nil
}
// RemoveService is used to remove a service entry.
// The agent will make a best effort to ensure it is deregistered
func (a *Agent) RemoveService(serviceID string) error {
// Remove service immeidately
a.state.RemoveService(serviceID)
// Deregister any associated health checks
checkID := fmt.Sprintf("service:%s", serviceID)
return a.RemoveCheck(checkID)
}
// AddCheck is used to add a health check to the agent.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered. The Check may include a CheckType which
// is used to automatically update the check status
func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType) error {
if check.CheckID == "" {
return fmt.Errorf("CheckID missing")
}
if chkType != nil && !chkType.Valid() {
return fmt.Errorf("Check type is not valid")
}
a.checkLock.Lock()
defer a.checkLock.Unlock()
// Check if already registered
if chkType != nil {
if chkType.IsTTL() {
if _, ok := a.checkTTLs[check.CheckID]; ok {
return fmt.Errorf("CheckID is already registered")
}
ttl := &CheckTTL{
Notify: &a.state,
CheckID: check.CheckID,
TTL: chkType.TTL,
Logger: a.logger,
}
ttl.Start()
a.checkTTLs[check.CheckID] = ttl
} else {
if _, ok := a.checkMonitors[check.CheckID]; ok {
return fmt.Errorf("CheckID is already registered")
}
monitor := &CheckMonitor{
Notify: &a.state,
CheckID: check.CheckID,
Script: chkType.Script,
Interval: chkType.Interval,
Logger: a.logger,
}
monitor.Start()
a.checkMonitors[check.CheckID] = monitor
}
}
// Add to the local state for anti-entropy
a.state.AddCheck(check)
return nil
}
// RemoveCheck is used to remove a health check.
// The agent will make a best effort to ensure it is deregistered
func (a *Agent) RemoveCheck(checkID string) error {
// Add to the local state for anti-entropy
a.state.RemoveCheck(checkID)
a.checkLock.Lock()
defer a.checkLock.Unlock()
// Stop any monitors
if check, ok := a.checkMonitors[checkID]; ok {
check.Stop()
delete(a.checkMonitors, checkID)
}
if check, ok := a.checkTTLs[checkID]; ok {
check.Stop()
delete(a.checkTTLs, checkID)
}
return nil
}
// UpdateCheck is used to update the status of a check.
// This can only be used with checks of the TTL type.
func (a *Agent) UpdateCheck(checkID, status, output string) error {
a.checkLock.Lock()
defer a.checkLock.Unlock()
check, ok := a.checkTTLs[checkID]
if !ok {
return fmt.Errorf("CheckID does not have associated TTL")
}
// Set the status through CheckTTL to reset the TTL
check.SetStatus(status, output)
return nil
}

View File

@ -3,6 +3,7 @@ package agent
import (
"fmt"
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/structs"
"io"
"io/ioutil"
"os"
@ -98,3 +99,183 @@ func TestAgent_RPCPing(t *testing.T) {
t.Fatalf("err: %v", err)
}
}
func TestAgent_AddService(t *testing.T) {
dir, agent := makeAgent(t, nextConfig())
defer os.RemoveAll(dir)
defer agent.Shutdown()
srv := &structs.NodeService{
ID: "redis",
Service: "redis",
Tag: "foo",
Port: 8000,
}
chk := &CheckType{TTL: time.Minute}
err := agent.AddService(srv, chk)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure we have a state mapping
if _, ok := agent.state.Services()["redis"]; !ok {
t.Fatalf("missing redis service")
}
// Ensure we have a check mapping
if _, ok := agent.state.Checks()["service:redis"]; !ok {
t.Fatalf("missing redis check")
}
// Ensure a TTL is setup
if _, ok := agent.checkTTLs["service:redis"]; !ok {
t.Fatalf("missing redis check ttl")
}
}
func TestAgent_RemoveService(t *testing.T) {
dir, agent := makeAgent(t, nextConfig())
defer os.RemoveAll(dir)
defer agent.Shutdown()
// Remove a service that doesn't exist
if err := agent.RemoveService("redis"); err != nil {
t.Fatalf("err: %v", err)
}
srv := &structs.NodeService{
ID: "redis",
Service: "redis",
Port: 8000,
}
chk := &CheckType{TTL: time.Minute}
if err := agent.AddService(srv, chk); err != nil {
t.Fatalf("err: %v", err)
}
// Remove the service
if err := agent.RemoveService("redis"); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure we have a state mapping
if _, ok := agent.state.Services()["redis"]; ok {
t.Fatalf("have redis service")
}
// Ensure we have a check mapping
if _, ok := agent.state.Checks()["service:redis"]; ok {
t.Fatalf("have redis check")
}
// Ensure a TTL is setup
if _, ok := agent.checkTTLs["service:redis"]; ok {
t.Fatalf("have redis check ttl")
}
}
func TestAgent_AddCheck(t *testing.T) {
dir, agent := makeAgent(t, nextConfig())
defer os.RemoveAll(dir)
defer agent.Shutdown()
health := &structs.HealthCheck{
Node: "foo",
CheckID: "mem",
Name: "memory util",
Status: structs.HealthUnknown,
}
chk := &CheckType{
Script: "exit 0",
Interval: 15 * time.Second,
}
err := agent.AddCheck(health, chk)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure we have a check mapping
if _, ok := agent.state.Checks()["mem"]; !ok {
t.Fatalf("missing mem check")
}
// Ensure a TTL is setup
if _, ok := agent.checkMonitors["mem"]; !ok {
t.Fatalf("missing mem monitor")
}
}
func TestAgent_RemoveCheck(t *testing.T) {
dir, agent := makeAgent(t, nextConfig())
defer os.RemoveAll(dir)
defer agent.Shutdown()
// Remove check that doesn't exist
if err := agent.RemoveCheck("mem"); err != nil {
t.Fatalf("err: %v", err)
}
health := &structs.HealthCheck{
Node: "foo",
CheckID: "mem",
Name: "memory util",
Status: structs.HealthUnknown,
}
chk := &CheckType{
Script: "exit 0",
Interval: 15 * time.Second,
}
err := agent.AddCheck(health, chk)
if err != nil {
t.Fatalf("err: %v", err)
}
// Remove check
if err := agent.RemoveCheck("mem"); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure we have a check mapping
if _, ok := agent.state.Checks()["mem"]; ok {
t.Fatalf("have mem check")
}
// Ensure a TTL is setup
if _, ok := agent.checkMonitors["mem"]; ok {
t.Fatalf("have mem monitor")
}
}
func TestAgent_UpdateCheck(t *testing.T) {
dir, agent := makeAgent(t, nextConfig())
defer os.RemoveAll(dir)
defer agent.Shutdown()
health := &structs.HealthCheck{
Node: "foo",
CheckID: "mem",
Name: "memory util",
Status: structs.HealthUnknown,
}
chk := &CheckType{
TTL: 15 * time.Second,
}
err := agent.AddCheck(health, chk)
if err != nil {
t.Fatalf("err: %v", err)
}
// Remove check
if err := agent.UpdateCheck("mem", structs.HealthPassing, "foo"); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure we have a check mapping
status := agent.state.Checks()["mem"]
if status.Status != structs.HealthPassing {
t.Fatalf("bad: %v", status)
}
if status.Notes != "foo" {
t.Fatalf("bad: %v", status)
}
}

View File

@ -2,6 +2,7 @@ package agent
import (
"bytes"
"fmt"
"github.com/hashicorp/consul/consul/structs"
"log"
"os/exec"
@ -120,7 +121,16 @@ func (c *CheckMonitor) check() {
}
// Wait for the check to complete
err := cmd.Wait()
errCh := make(chan error, 2)
go func() {
errCh <- cmd.Wait()
}()
go func() {
time.Sleep(30 * time.Second)
errCh <- fmt.Errorf("Timed out running check '%s'", c.Script)
}()
err := <-errCh
notes := string(output.Bytes())
c.Logger.Printf("[DEBUG] agent: check '%s' script '%s' output: %s",
c.CheckID, c.Script, notes)

View File

@ -126,7 +126,7 @@ type Member struct {
Name string
Addr net.IP
Port uint16
Role string
Tags map[string]string
Status string
ProtocolMin uint8
ProtocolMax uint8
@ -436,7 +436,7 @@ func formatMembers(raw []serf.Member, client *rpcClient, seq uint64) error {
Name: m.Name,
Addr: m.Addr,
Port: m.Port,
Role: m.Role,
Tags: m.Tags,
Status: m.Status.String(),
ProtocolMin: m.ProtocolMin,
ProtocolMax: m.ProtocolMax,