Merge pull request #2276 from hashicorp/f-deregister-critical

Adds ability to deregister a service based on critical check state longer than a timeout.
pull/2282/head
James Phillips 2016-08-16 12:54:41 -07:00 committed by GitHub
commit 7ca76a613c
12 changed files with 440 additions and 54 deletions

View File

@ -62,8 +62,7 @@ type AgentCheckRegistration struct {
AgentServiceCheck AgentServiceCheck
} }
// AgentServiceCheck is used to create an associated // AgentServiceCheck is used to define a node or service level check
// check for a service
type AgentServiceCheck struct { type AgentServiceCheck struct {
Script string `json:",omitempty"` Script string `json:",omitempty"`
DockerContainerID string `json:",omitempty"` DockerContainerID string `json:",omitempty"`
@ -74,6 +73,14 @@ type AgentServiceCheck struct {
HTTP string `json:",omitempty"` HTTP string `json:",omitempty"`
TCP string `json:",omitempty"` TCP string `json:",omitempty"`
Status string `json:",omitempty"` Status string `json:",omitempty"`
// In Consul 0.7 and later, checks that are associated with a service
// may also contain this optional DeregisterCriticalServiceAfter field,
// which is a timeout in the same Go time format as Interval and TTL. If
// a check is in the critical state for more than this configured value,
// then its associated service (and all of its associated checks) will
// automatically be deregistered.
DeregisterCriticalServiceAfter string `json:",omitempty"`
} }
type AgentServiceChecks []*AgentServiceCheck type AgentServiceChecks []*AgentServiceCheck

View File

@ -455,6 +455,13 @@ func TestAgent_Checks_serviceBound(t *testing.T) {
ServiceID: "redis", ServiceID: "redis",
} }
reg.TTL = "15s" reg.TTL = "15s"
reg.DeregisterCriticalServiceAfter = "nope"
err := agent.CheckRegister(reg)
if err == nil || !strings.Contains(err.Error(), "invalid duration") {
t.Fatalf("err: %v", err)
}
reg.DeregisterCriticalServiceAfter = "90m"
if err := agent.CheckRegister(reg); err != nil { if err := agent.CheckRegister(reg); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }

View File

@ -74,6 +74,10 @@ type Agent struct {
// services and checks. Used for anti-entropy. // services and checks. Used for anti-entropy.
state localState state localState
// checkReapAfter maps the check ID to a timeout after which we should
// reap its associated service
checkReapAfter map[types.CheckID]time.Duration
// checkMonitors maps the check ID to an associated monitor // checkMonitors maps the check ID to an associated monitor
checkMonitors map[types.CheckID]*CheckMonitor checkMonitors map[types.CheckID]*CheckMonitor
@ -175,24 +179,25 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
} }
agent := &Agent{ agent := &Agent{
config: config, config: config,
logger: log.New(logOutput, "", log.LstdFlags), logger: log.New(logOutput, "", log.LstdFlags),
logOutput: logOutput, logOutput: logOutput,
checkMonitors: make(map[types.CheckID]*CheckMonitor), checkReapAfter: make(map[types.CheckID]time.Duration),
checkTTLs: make(map[types.CheckID]*CheckTTL), checkMonitors: make(map[types.CheckID]*CheckMonitor),
checkHTTPs: make(map[types.CheckID]*CheckHTTP), checkTTLs: make(map[types.CheckID]*CheckTTL),
checkTCPs: make(map[types.CheckID]*CheckTCP), checkHTTPs: make(map[types.CheckID]*CheckHTTP),
checkDockers: make(map[types.CheckID]*CheckDocker), checkTCPs: make(map[types.CheckID]*CheckTCP),
eventCh: make(chan serf.UserEvent, 1024), checkDockers: make(map[types.CheckID]*CheckDocker),
eventBuf: make([]*UserEvent, 256), eventCh: make(chan serf.UserEvent, 1024),
shutdownCh: make(chan struct{}), eventBuf: make([]*UserEvent, 256),
endpoints: make(map[string]string), shutdownCh: make(chan struct{}),
endpoints: make(map[string]string),
} }
// Initialize the local state // Initialize the local state.
agent.state.Init(config, agent.logger) agent.state.Init(config, agent.logger)
// Setup either the client or the server // Setup either the client or the server.
var err error var err error
if config.Server { if config.Server {
err = agent.setupServer() err = agent.setupServer()
@ -214,7 +219,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
return nil, err return nil, err
} }
// Load checks/services // Load checks/services.
if err := agent.loadServices(config); err != nil { if err := agent.loadServices(config); err != nil {
return nil, err return nil, err
} }
@ -222,7 +227,11 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
return nil, err return nil, err
} }
// Start handling events // Start watching for critical services to deregister, based on their
// checks.
go agent.reapServices()
// Start handling events.
go agent.handleEvents() go agent.handleEvents()
// Start sending network coordinate to the server. // Start sending network coordinate to the server.
@ -230,7 +239,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
go agent.sendCoordinate() go agent.sendCoordinate()
} }
// Write out the PID file if necessary // Write out the PID file if necessary.
err = agent.storePid() err = agent.storePid()
if err != nil { if err != nil {
return nil, err return nil, err
@ -665,6 +674,53 @@ func (a *Agent) sendCoordinate() {
} }
} }
// reapServicesInternal does a single pass over the currently critical checks,
// looking for services to reap. A service is reaped when one of its checks has
// a timeout recorded in checkReapAfter and has been critical for longer than
// that timeout. Each service is attempted at most once per pass, even if
// several of its checks qualify.
func (a *Agent) reapServicesInternal() {
	reaped := make(map[string]struct{})
	for checkID, check := range a.state.CriticalChecks() {
		// There's nothing to do if there's no service.
		serviceID := check.Check.ServiceID
		if serviceID == "" {
			continue
		}

		// There might be multiple checks for one service, so
		// we don't need to reap multiple times.
		if _, ok := reaped[serviceID]; ok {
			continue
		}

		// See if there's a timeout. checkReapAfter is read under checkLock,
		// and the lock is released again before the service is removed.
		a.checkLock.Lock()
		timeout, ok := a.checkReapAfter[checkID]
		a.checkLock.Unlock()
		if !ok || check.CriticalFor <= timeout {
			continue
		}

		// Reap. We keep track of which service this is so that we won't
		// try to remove it again during this pass, whether or not the
		// removal succeeds.
		reaped[serviceID] = struct{}{}
		if err := a.RemoveService(serviceID, true); err != nil {
			// Don't claim success if the removal failed; the next pass
			// will see the check as still critical and try again.
			a.logger.Printf("[ERR] agent: unable to deregister service %q after check %q has been critical for too long: %s",
				serviceID, checkID, err)
			continue
		}
		a.logger.Printf("[INFO] agent: Check %q for service %q has been critical for too long; deregistered service",
			checkID, serviceID)
	}
}
// reapServices is a long running goroutine that periodically looks for checks
// that have been critical for too long and deregisters their associated
// services. It waits config.CheckReapInterval between passes and exits once
// shutdownCh is signaled.
func (a *Agent) reapServices() {
	for {
		// A new timer is armed on every iteration, so the interval is
		// measured from the end of the previous pass.
		nextPass := time.After(a.config.CheckReapInterval)

		select {
		case <-a.shutdownCh:
			return
		case <-nextPass:
			a.reapServicesInternal()
		}
	}
}
// persistService saves a service definition to a JSON file in the data dir // persistService saves a service definition to a JSON file in the data dir
func (a *Agent) persistService(service *structs.NodeService) error { func (a *Agent) persistService(service *structs.NodeService) error {
svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID)) svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID))
@ -988,6 +1044,18 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist
} else { } else {
return fmt.Errorf("Check type is not valid") return fmt.Errorf("Check type is not valid")
} }
if chkType.DeregisterCriticalServiceAfter > 0 {
timeout := chkType.DeregisterCriticalServiceAfter
if timeout < a.config.CheckDeregisterIntervalMin {
timeout = a.config.CheckDeregisterIntervalMin
a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has deregister interval below minimum of %v",
check.CheckID, a.config.CheckDeregisterIntervalMin))
}
a.checkReapAfter[check.CheckID] = timeout
} else {
delete(a.checkReapAfter, check.CheckID)
}
} }
// Add to the local state for anti-entropy // Add to the local state for anti-entropy
@ -1016,6 +1084,7 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
defer a.checkLock.Unlock() defer a.checkLock.Unlock()
// Stop any monitors // Stop any monitors
delete(a.checkReapAfter, checkID)
if check, ok := a.checkMonitors[checkID]; ok { if check, ok := a.checkMonitors[checkID]; ok {
check.Stop() check.Stop()
delete(a.checkMonitors, checkID) delete(a.checkMonitors, checkID)
@ -1044,25 +1113,27 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
return nil return nil
} }
// UpdateCheck is used to update the status of a check. // updateTTLCheck is used to update the status of a TTL check via the Agent API.
// This can only be used with checks of the TTL type. func (a *Agent) updateTTLCheck(checkID types.CheckID, status, output string) error {
func (a *Agent) UpdateCheck(checkID types.CheckID, status, output string) error {
a.checkLock.Lock() a.checkLock.Lock()
defer a.checkLock.Unlock() defer a.checkLock.Unlock()
// Grab the TTL check.
check, ok := a.checkTTLs[checkID] check, ok := a.checkTTLs[checkID]
if !ok { if !ok {
return fmt.Errorf("CheckID %q does not have associated TTL", checkID) return fmt.Errorf("CheckID %q does not have associated TTL", checkID)
} }
// Set the status through CheckTTL to reset the TTL // Set the status through CheckTTL to reset the TTL.
check.SetStatus(status, output) check.SetStatus(status, output)
// We don't write any files in dev mode so bail here.
if a.config.DevMode { if a.config.DevMode {
return nil return nil
} }
// Always persist the state for TTL checks // Persist the state so the TTL check can come up in a good state after
// an agent restart, especially with long TTL values.
if err := a.persistCheckState(check, status, output); err != nil { if err := a.persistCheckState(check, status, output); err != nil {
return fmt.Errorf("failed persisting state for check %q: %s", checkID, err) return fmt.Errorf("failed persisting state for check %q: %s", checkID, err)
} }

View File

@ -144,7 +144,7 @@ func (s *HTTPServer) AgentDeregisterCheck(resp http.ResponseWriter, req *http.Re
func (s *HTTPServer) AgentCheckPass(resp http.ResponseWriter, req *http.Request) (interface{}, error) { func (s *HTTPServer) AgentCheckPass(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/pass/")) checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/pass/"))
note := req.URL.Query().Get("note") note := req.URL.Query().Get("note")
if err := s.agent.UpdateCheck(checkID, structs.HealthPassing, note); err != nil { if err := s.agent.updateTTLCheck(checkID, structs.HealthPassing, note); err != nil {
return nil, err return nil, err
} }
s.syncChanges() s.syncChanges()
@ -154,7 +154,7 @@ func (s *HTTPServer) AgentCheckPass(resp http.ResponseWriter, req *http.Request)
func (s *HTTPServer) AgentCheckWarn(resp http.ResponseWriter, req *http.Request) (interface{}, error) { func (s *HTTPServer) AgentCheckWarn(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/warn/")) checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/warn/"))
note := req.URL.Query().Get("note") note := req.URL.Query().Get("note")
if err := s.agent.UpdateCheck(checkID, structs.HealthWarning, note); err != nil { if err := s.agent.updateTTLCheck(checkID, structs.HealthWarning, note); err != nil {
return nil, err return nil, err
} }
s.syncChanges() s.syncChanges()
@ -164,7 +164,7 @@ func (s *HTTPServer) AgentCheckWarn(resp http.ResponseWriter, req *http.Request)
func (s *HTTPServer) AgentCheckFail(resp http.ResponseWriter, req *http.Request) (interface{}, error) { func (s *HTTPServer) AgentCheckFail(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/fail/")) checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/fail/"))
note := req.URL.Query().Get("note") note := req.URL.Query().Get("note")
if err := s.agent.UpdateCheck(checkID, structs.HealthCritical, note); err != nil { if err := s.agent.updateTTLCheck(checkID, structs.HealthCritical, note); err != nil {
return nil, err return nil, err
} }
s.syncChanges() s.syncChanges()
@ -216,7 +216,7 @@ func (s *HTTPServer) AgentCheckUpdate(resp http.ResponseWriter, req *http.Reques
} }
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/update/")) checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/update/"))
if err := s.agent.UpdateCheck(checkID, update.Status, update.Output); err != nil { if err := s.agent.updateTTLCheck(checkID, update.Status, update.Output); err != nil {
return nil, err return nil, err
} }
s.syncChanges() s.syncChanges()

View File

@ -642,7 +642,7 @@ func TestAgent_RemoveCheck(t *testing.T) {
} }
} }
func TestAgent_UpdateCheck(t *testing.T) { func TestAgent_updateTTLCheck(t *testing.T) {
dir, agent := makeAgent(t, nextConfig()) dir, agent := makeAgent(t, nextConfig())
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer agent.Shutdown() defer agent.Shutdown()
@ -656,17 +656,17 @@ func TestAgent_UpdateCheck(t *testing.T) {
chk := &CheckType{ chk := &CheckType{
TTL: 15 * time.Second, TTL: 15 * time.Second,
} }
// Add check and update it.
err := agent.AddCheck(health, chk, false, "") err := agent.AddCheck(health, chk, false, "")
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if err := agent.updateTTLCheck("mem", structs.HealthPassing, "foo"); err != nil {
// Remove check
if err := agent.UpdateCheck("mem", structs.HealthPassing, "foo"); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Ensure we have a check mapping // Ensure we have a check mapping.
status := agent.state.Checks()["mem"] status := agent.state.Checks()["mem"]
if status.Status != structs.HealthPassing { if status.Status != structs.HealthPassing {
t.Fatalf("bad: %v", status) t.Fatalf("bad: %v", status)
@ -1248,7 +1248,7 @@ func TestAgent_unloadServices(t *testing.T) {
} }
} }
func TestAgent_ServiceMaintenanceMode(t *testing.T) { func TestAgent_Service_MaintenanceMode(t *testing.T) {
config := nextConfig() config := nextConfig()
dir, agent := makeAgent(t, config) dir, agent := makeAgent(t, config)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
@ -1313,6 +1313,133 @@ func TestAgent_ServiceMaintenanceMode(t *testing.T) {
} }
} }
// TestAgent_Service_Reap verifies that a service is deregistered once its TTL
// check has stayed critical for longer than the check's
// DeregisterCriticalServiceAfter timeout, and that the service is kept while
// the check is passing or has only just gone critical.
func TestAgent_Service_Reap(t *testing.T) {
config := nextConfig()
// Run the reaper every millisecond, and drop the minimum deregister
// interval to zero so the short timeout configured below is used as-is
// (AddCheck raises any timeout below CheckDeregisterIntervalMin up to it).
config.CheckReapInterval = time.Millisecond
config.CheckDeregisterIntervalMin = 0
dir, agent := makeAgent(t, config)
defer os.RemoveAll(dir)
defer agent.Shutdown()
svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
// A single TTL check that starts out passing, with a 10ms TTL and a 100ms
// reap timeout.
chkTypes := CheckTypes{
&CheckType{
Status: structs.HealthPassing,
TTL: 10 * time.Millisecond,
DeregisterCriticalServiceAfter: 100 * time.Millisecond,
},
}
// Register the service.
if err := agent.AddService(svc, chkTypes, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
// Make sure it's there and there's no critical check yet.
if _, ok := agent.state.Services()["redis"]; !ok {
t.Fatalf("should have redis service")
}
if checks := agent.state.CriticalChecks(); len(checks) > 0 {
t.Fatalf("should not have critical checks")
}
// Wait for the check TTL to fail. 30ms is past the 10ms TTL but still
// short of the 100ms reap timeout, so the service must still be present.
time.Sleep(30 * time.Millisecond)
if _, ok := agent.state.Services()["redis"]; !ok {
t.Fatalf("should have redis service")
}
if checks := agent.state.CriticalChecks(); len(checks) != 1 {
t.Fatalf("should have a critical check")
}
// Pass the TTL, which should clear the critical state.
// NOTE(review): "service:redis" is presumably the ID AddService assigns to
// the service's single check - confirm against AddService.
if err := agent.updateTTLCheck("service:redis", structs.HealthPassing, "foo"); err != nil {
t.Fatalf("err: %v", err)
}
if _, ok := agent.state.Services()["redis"]; !ok {
t.Fatalf("should have redis service")
}
if checks := agent.state.CriticalChecks(); len(checks) > 0 {
t.Fatalf("should not have critical checks")
}
// Wait for the check TTL to fail again.
time.Sleep(30 * time.Millisecond)
if _, ok := agent.state.Services()["redis"]; !ok {
t.Fatalf("should have redis service")
}
if checks := agent.state.CriticalChecks(); len(checks) != 1 {
t.Fatalf("should have a critical check")
}
// Wait for the reap. 300ms is well past the 100ms timeout, so the service
// and its critical check should both be gone afterwards.
time.Sleep(300 * time.Millisecond)
if _, ok := agent.state.Services()["redis"]; ok {
t.Fatalf("redis service should have been reaped")
}
if checks := agent.state.CriticalChecks(); len(checks) > 0 {
t.Fatalf("should not have critical checks")
}
}
// TestAgent_Service_NoReap verifies that a service whose check does not set
// DeregisterCriticalServiceAfter is left registered even though the check
// goes critical and stays that way.
func TestAgent_Service_NoReap(t *testing.T) {
config := nextConfig()
// Run the reaper every millisecond so it gets many chances to (wrongly)
// remove the service within the waits below.
config.CheckReapInterval = time.Millisecond
config.CheckDeregisterIntervalMin = 0
dir, agent := makeAgent(t, config)
defer os.RemoveAll(dir)
defer agent.Shutdown()
svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
// A TTL check that starts out passing; no DeregisterCriticalServiceAfter
// is set, so the field is left at its zero value.
chkTypes := CheckTypes{
&CheckType{
Status: structs.HealthPassing,
TTL: 10 * time.Millisecond,
},
}
// Register the service.
if err := agent.AddService(svc, chkTypes, false, ""); err != nil {
t.Fatalf("err: %v", err)
}
// Make sure it's there and there's no critical check yet.
if _, ok := agent.state.Services()["redis"]; !ok {
t.Fatalf("should have redis service")
}
if checks := agent.state.CriticalChecks(); len(checks) > 0 {
t.Fatalf("should not have critical checks")
}
// Wait for the check TTL to fail.
time.Sleep(30 * time.Millisecond)
if _, ok := agent.state.Services()["redis"]; !ok {
t.Fatalf("should have redis service")
}
if checks := agent.state.CriticalChecks(); len(checks) != 1 {
t.Fatalf("should have a critical check")
}
// Wait a while and make sure it doesn't reap: the service must still be
// registered and its check must still be reported as critical.
time.Sleep(300 * time.Millisecond)
if _, ok := agent.state.Services()["redis"]; !ok {
t.Fatalf("should have redis service")
}
if checks := agent.state.CriticalChecks(); len(checks) != 1 {
t.Fatalf("should have a critical check")
}
}
func TestAgent_addCheck_restoresSnapshot(t *testing.T) { func TestAgent_addCheck_restoresSnapshot(t *testing.T) {
config := nextConfig() config := nextConfig()
dir, agent := makeAgent(t, config) dir, agent := makeAgent(t, config)

View File

@ -35,12 +35,11 @@ const (
HttpUserAgent = "Consul Health Check" HttpUserAgent = "Consul Health Check"
) )
// CheckType is used to create either the CheckMonitor // CheckType is used to create either the CheckMonitor or the CheckTTL.
// or the CheckTTL. // Five types are supported: Script, HTTP, TCP, Docker and TTL. Script, HTTP,
// Five types are supported: Script, HTTP, TCP, Docker and TTL // Docker and TCP all require Interval. Only one of the types may be
// Script, HTTP, Docker and TCP all require Interval // provided: TTL or Script/Interval or HTTP/Interval or TCP/Interval or
// Only one of the types needs to be provided // Docker/Interval.
// TTL or Script/Interval or HTTP/Interval or TCP/Interval or Docker/Interval
type CheckType struct { type CheckType struct {
Script string Script string
HTTP string HTTP string
@ -52,6 +51,11 @@ type CheckType struct {
Timeout time.Duration Timeout time.Duration
TTL time.Duration TTL time.Duration
// DeregisterCriticalServiceAfter, if >0, will cause the associated
// service, if any, to be deregistered if this check is critical for
// longer than this duration.
DeregisterCriticalServiceAfter time.Duration
Status string Status string
Notes string Notes string

View File

@ -422,6 +422,14 @@ type Config struct {
CheckUpdateInterval time.Duration `mapstructure:"-"` CheckUpdateInterval time.Duration `mapstructure:"-"`
CheckUpdateIntervalRaw string `mapstructure:"check_update_interval" json:"-"` CheckUpdateIntervalRaw string `mapstructure:"check_update_interval" json:"-"`
// CheckReapInterval controls the interval on which we will look for
// failed checks and reap their associated services, if so configured.
CheckReapInterval time.Duration `mapstructure:"-"`
// CheckDeregisterIntervalMin is the smallest allowed interval to set
// a check's DeregisterCriticalServiceAfter value to.
CheckDeregisterIntervalMin time.Duration `mapstructure:"-"`
// ACLToken is the default token used to make requests if a per-request // ACLToken is the default token used to make requests if a per-request
// token is not provided. If not configured the 'anonymous' token is used. // token is not provided. If not configured the 'anonymous' token is used.
ACLToken string `mapstructure:"acl_token" json:"-"` ACLToken string `mapstructure:"acl_token" json:"-"`
@ -632,11 +640,13 @@ func DefaultConfig() *Config {
Telemetry: Telemetry{ Telemetry: Telemetry{
StatsitePrefix: "consul", StatsitePrefix: "consul",
}, },
SyslogFacility: "LOCAL0", SyslogFacility: "LOCAL0",
Protocol: consul.ProtocolVersion2Compatible, Protocol: consul.ProtocolVersion2Compatible,
CheckUpdateInterval: 5 * time.Minute, CheckUpdateInterval: 5 * time.Minute,
AEInterval: time.Minute, CheckDeregisterIntervalMin: time.Minute,
DisableCoordinates: false, CheckReapInterval: 30 * time.Second,
AEInterval: time.Minute,
DisableCoordinates: false,
// SyncCoordinateRateTarget is set based on the rate that we want // SyncCoordinateRateTarget is set based on the rate that we want
// the server to handle as an aggregate across the entire cluster. // the server to handle as an aggregate across the entire cluster.
@ -975,6 +985,7 @@ AFTER_FIX:
func FixupCheckType(raw interface{}) error { func FixupCheckType(raw interface{}) error {
var ttlKey, intervalKey, timeoutKey string var ttlKey, intervalKey, timeoutKey string
const deregisterKey = "DeregisterCriticalServiceAfter"
// Handle decoding of time durations // Handle decoding of time durations
rawMap, ok := raw.(map[string]interface{}) rawMap, ok := raw.(map[string]interface{})
@ -990,12 +1001,15 @@ func FixupCheckType(raw interface{}) error {
intervalKey = k intervalKey = k
case "timeout": case "timeout":
timeoutKey = k timeoutKey = k
case "deregister_critical_service_after":
rawMap[deregisterKey] = v
delete(rawMap, k)
case "service_id": case "service_id":
rawMap["serviceid"] = v rawMap["serviceid"] = v
delete(rawMap, "service_id") delete(rawMap, k)
case "docker_container_id": case "docker_container_id":
rawMap["DockerContainerID"] = v rawMap["DockerContainerID"] = v
delete(rawMap, "docker_container_id") delete(rawMap, k)
} }
} }
@ -1032,6 +1046,17 @@ func FixupCheckType(raw interface{}) error {
} }
} }
if deregister, ok := rawMap[deregisterKey]; ok {
timeoutS, ok := deregister.(string)
if ok {
if dur, err := time.ParseDuration(timeoutS); err != nil {
return err
} else {
rawMap[deregisterKey] = dur
}
}
}
return nil return nil
} }

View File

@ -1258,7 +1258,7 @@ func TestDecodeConfig_Multiples(t *testing.T) {
func TestDecodeConfig_Service(t *testing.T) { func TestDecodeConfig_Service(t *testing.T) {
// Basics // Basics
input := `{"service": {"id": "red1", "name": "redis", "tags": ["master"], "port":8000, "check": {"script": "/bin/check_redis", "interval": "10s", "ttl": "15s" }}}` input := `{"service": {"id": "red1", "name": "redis", "tags": ["master"], "port":8000, "check": {"script": "/bin/check_redis", "interval": "10s", "ttl": "15s", "DeregisterCriticalServiceAfter": "90m" }}}`
config, err := DecodeConfig(bytes.NewReader([]byte(input))) config, err := DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
@ -1296,11 +1296,15 @@ func TestDecodeConfig_Service(t *testing.T) {
if serv.Check.TTL != 15*time.Second { if serv.Check.TTL != 15*time.Second {
t.Fatalf("bad: %v", serv) t.Fatalf("bad: %v", serv)
} }
if serv.Check.DeregisterCriticalServiceAfter != 90*time.Minute {
t.Fatalf("bad: %v", serv)
}
} }
func TestDecodeConfig_Check(t *testing.T) { func TestDecodeConfig_Check(t *testing.T) {
// Basics // Basics
input := `{"check": {"id": "chk1", "name": "mem", "notes": "foobar", "script": "/bin/check_redis", "interval": "10s", "ttl": "15s", "shell": "/bin/bash", "docker_container_id": "redis" }}` input := `{"check": {"id": "chk1", "name": "mem", "notes": "foobar", "script": "/bin/check_redis", "interval": "10s", "ttl": "15s", "shell": "/bin/bash", "docker_container_id": "redis", "deregister_critical_service_after": "90s" }}`
config, err := DecodeConfig(bytes.NewReader([]byte(input))) config, err := DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
@ -1342,6 +1346,10 @@ func TestDecodeConfig_Check(t *testing.T) {
if chk.DockerContainerID != "redis" { if chk.DockerContainerID != "redis" {
t.Fatalf("bad: %v", chk) t.Fatalf("bad: %v", chk)
} }
if chk.DeregisterCriticalServiceAfter != 90*time.Second {
t.Fatalf("bad: %v", chk)
}
} }
func TestMergeConfig(t *testing.T) { func TestMergeConfig(t *testing.T) {

View File

@ -57,9 +57,10 @@ type localState struct {
serviceTokens map[string]string serviceTokens map[string]string
// Checks tracks the local checks // Checks tracks the local checks
checks map[types.CheckID]*structs.HealthCheck checks map[types.CheckID]*structs.HealthCheck
checkStatus map[types.CheckID]syncStatus checkStatus map[types.CheckID]syncStatus
checkTokens map[types.CheckID]string checkTokens map[types.CheckID]string
checkCriticalTime map[types.CheckID]time.Time
// Used to track checks that are being deferred // Used to track checks that are being deferred
deferCheck map[types.CheckID]*time.Timer deferCheck map[types.CheckID]*time.Timer
@ -83,6 +84,7 @@ func (l *localState) Init(config *Config, logger *log.Logger) {
l.checks = make(map[types.CheckID]*structs.HealthCheck) l.checks = make(map[types.CheckID]*structs.HealthCheck)
l.checkStatus = make(map[types.CheckID]syncStatus) l.checkStatus = make(map[types.CheckID]syncStatus)
l.checkTokens = make(map[types.CheckID]string) l.checkTokens = make(map[types.CheckID]string)
l.checkCriticalTime = make(map[types.CheckID]time.Time)
l.deferCheck = make(map[types.CheckID]*time.Timer) l.deferCheck = make(map[types.CheckID]*time.Timer)
l.consulCh = make(chan struct{}, 1) l.consulCh = make(chan struct{}, 1)
l.triggerCh = make(chan struct{}, 1) l.triggerCh = make(chan struct{}, 1)
@ -222,6 +224,7 @@ func (l *localState) AddCheck(check *structs.HealthCheck, token string) {
l.checks[check.CheckID] = check l.checks[check.CheckID] = check
l.checkStatus[check.CheckID] = syncStatus{} l.checkStatus[check.CheckID] = syncStatus{}
l.checkTokens[check.CheckID] = token l.checkTokens[check.CheckID] = token
delete(l.checkCriticalTime, check.CheckID)
l.changeMade() l.changeMade()
} }
@ -233,6 +236,7 @@ func (l *localState) RemoveCheck(checkID types.CheckID) {
delete(l.checks, checkID) delete(l.checks, checkID)
delete(l.checkTokens, checkID) delete(l.checkTokens, checkID)
delete(l.checkCriticalTime, checkID)
l.checkStatus[checkID] = syncStatus{remoteDelete: true} l.checkStatus[checkID] = syncStatus{remoteDelete: true}
l.changeMade() l.changeMade()
} }
@ -247,6 +251,17 @@ func (l *localState) UpdateCheck(checkID types.CheckID, status, output string) {
return return
} }
// Update the critical time tracking (this doesn't cause any server updates
// so we can always keep this up to date).
if status == structs.HealthCritical {
_, wasCritical := l.checkCriticalTime[checkID]
if !wasCritical {
l.checkCriticalTime[checkID] = time.Now()
}
} else {
delete(l.checkCriticalTime, checkID)
}
// Do nothing if update is idempotent // Do nothing if update is idempotent
if check.Status == status && check.Output == output { if check.Status == status && check.Output == output {
return return
@ -294,6 +309,34 @@ func (l *localState) Checks() map[types.CheckID]*structs.HealthCheck {
return checks return checks
} }
// CriticalCheck is used to return the duration a check has been critical along
// with its associated health check.
type CriticalCheck struct {
// CriticalFor is how long the check has been in the critical state.
CriticalFor time.Duration
// Check is the health check that is critical. CriticalChecks fills this
// with the pointer held in the local state's checks map, not a copy.
Check *structs.HealthCheck
}
// CriticalChecks reports the locally registered health checks that are
// currently tracked as critical, keyed by check ID. Each entry pairs the
// health check with how long it has been critical; all durations in one call
// are measured against the same clock reading. The result is a freshly built
// map, assembled while holding the read lock.
func (l *localState) CriticalChecks() map[types.CheckID]CriticalCheck {
	l.RLock()
	defer l.RUnlock()

	now := time.Now()
	critical := make(map[types.CheckID]CriticalCheck, len(l.checkCriticalTime))
	for id, since := range l.checkCriticalTime {
		critical[id] = CriticalCheck{
			CriticalFor: now.Sub(since),
			Check:       l.checks[id],
		}
	}
	return critical
}
// antiEntropy is a long running method used to perform anti-entropy // antiEntropy is a long running method used to perform anti-entropy
// between local and remote state. // between local and remote state.
func (l *localState) antiEntropy(shutdownCh chan struct{}) { func (l *localState) antiEntropy(shutdownCh chan struct{}) {
@ -546,7 +589,7 @@ func (l *localState) deleteService(id string) error {
return err return err
} }
// deleteCheck is used to delete a service from the server // deleteCheck is used to delete a check from the server
func (l *localState) deleteCheck(id types.CheckID) error { func (l *localState) deleteCheck(id types.CheckID) error {
if id == "" { if id == "" {
return fmt.Errorf("CheckID missing") return fmt.Errorf("CheckID missing")

View File

@ -9,6 +9,7 @@ import (
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/types"
) )
func TestAgentAntiEntropy_Services(t *testing.T) { func TestAgentAntiEntropy_Services(t *testing.T) {
@ -959,6 +960,66 @@ func TestAgent_checkTokens(t *testing.T) {
} }
} }
// TestAgent_checkCriticalTime exercises the local state's critical-time
// tracking directly: only the critical status is tracked, repeated critical
// updates keep the time of the first failure, and a non-critical update
// clears the tracking so a later failure starts counting from scratch.
func TestAgent_checkCriticalTime(t *testing.T) {
config := nextConfig()
// Use a bare localState rather than a full agent; it is initialized with a
// nil logger.
l := new(localState)
l.Init(config, nil)
// Add a passing check and make sure it's not critical.
checkID := types.CheckID("redis:1")
chk := &structs.HealthCheck{
Node: "node",
CheckID: checkID,
Name: "redis:1",
ServiceID: "redis",
Status: structs.HealthPassing,
}
l.AddCheck(chk, "")
if checks := l.CriticalChecks(); len(checks) > 0 {
t.Fatalf("should not have any critical checks")
}
// Set it to warning and make sure that doesn't show up as critical.
l.UpdateCheck(checkID, structs.HealthWarning, "")
if checks := l.CriticalChecks(); len(checks) > 0 {
t.Fatalf("should not have any critical checks")
}
// Fail the check and make sure the time looks reasonable: it was only just
// marked critical, so the reported duration should be under a millisecond.
l.UpdateCheck(checkID, structs.HealthCritical, "")
if crit, ok := l.CriticalChecks()[checkID]; !ok {
t.Fatalf("should have a critical check")
} else if crit.CriticalFor > time.Millisecond {
t.Fatalf("bad: %#v", crit)
}
// Wait a while, then fail it again and make sure the time keeps track
// of the initial failure, and doesn't reset here. The 5ms-15ms window
// brackets the 10ms sleep.
time.Sleep(10 * time.Millisecond)
l.UpdateCheck(chk.CheckID, structs.HealthCritical, "")
if crit, ok := l.CriticalChecks()[checkID]; !ok {
t.Fatalf("should have a critical check")
} else if crit.CriticalFor < 5*time.Millisecond ||
crit.CriticalFor > 15*time.Millisecond {
t.Fatalf("bad: %#v", crit)
}
// Set it passing again.
l.UpdateCheck(checkID, structs.HealthPassing, "")
if checks := l.CriticalChecks(); len(checks) > 0 {
t.Fatalf("should not have any critical checks")
}
// Fail the check and make sure the time looks like it started again
// from the latest failure, not the original one.
l.UpdateCheck(checkID, structs.HealthCritical, "")
if crit, ok := l.CriticalChecks()[checkID]; !ok {
t.Fatalf("should have a critical check")
} else if crit.CriticalFor > time.Millisecond {
t.Fatalf("bad: %#v", crit)
}
}
func TestAgent_nestedPauseResume(t *testing.T) { func TestAgent_nestedPauseResume(t *testing.T) {
l := new(localState) l := new(localState)
if l.isPaused() != false { if l.isPaused() != false {

View File

@ -169,6 +169,16 @@ parsed by Go's `time` package, and has the following
> optional fraction and a unit suffix, such as "300ms", "-1.5h" or "2h45m". > optional fraction and a unit suffix, such as "300ms", "-1.5h" or "2h45m".
> Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". > Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
In Consul 0.7 and later, checks that are associated with a service may also contain
an optional `deregister_critical_service_after` field, which is a timeout in the
same Go time format as `interval` and `ttl`. If a check is in the critical state
for more than this configured value, then its associated service (and all of its
associated checks) will automatically be deregistered. The minimum timeout is 1
minute, and the process that reaps critical services runs every 30 seconds, so it
may take slightly longer than the configured timeout to trigger the deregistration.
This should generally be configured with a timeout that's much, much longer than
any expected recoverable outage for the given service.
To configure a check, either provide it as a `-config-file` option to the To configure a check, either provide it as a `-config-file` option to the
agent or place it inside the `-config-dir` of the agent. The file must agent or place it inside the `-config-dir` of the agent. The file must
end in the ".json" extension to be loaded by Consul. Check definitions can end in the ".json" extension to be loaded by Consul. Check definitions can

View File

@ -243,13 +243,14 @@ body must look like:
"ID": "mem", "ID": "mem",
"Name": "Memory utilization", "Name": "Memory utilization",
"Notes": "Ensure we don't oversubscribe memory", "Notes": "Ensure we don't oversubscribe memory",
"DeregisterCriticalServiceAfter": "90m",
"Script": "/usr/local/bin/check_mem.py", "Script": "/usr/local/bin/check_mem.py",
"DockerContainerID": "f972c95ebf0e", "DockerContainerID": "f972c95ebf0e",
"Shell": "/bin/bash", "Shell": "/bin/bash",
"HTTP": "http://example.com", "HTTP": "http://example.com",
"TCP": "example.com:22", "TCP": "example.com:22",
"Interval": "10s", "Interval": "10s",
"TTL": "15s" "TTL": "15s"
} }
``` ```
@ -261,6 +262,16 @@ If an `ID` is not provided, it is set to `Name`. You cannot have duplicate
The `Notes` field is not used internally by Consul and is meant to be human-readable. The `Notes` field is not used internally by Consul and is meant to be human-readable.
In Consul 0.7 and later, checks that are associated with a service may also contain
an optional `DeregisterCriticalServiceAfter` field, which is a timeout in the same Go
time format as `Interval` and `TTL`. If a check is in the critical state for more than
this configured value, then its associated service (and all of its associated checks)
will automatically be deregistered. The minimum timeout is 1 minute, and the
process that reaps critical services runs every 30 seconds, so it may take slightly
longer than the configured timeout to trigger the deregistration. This should
generally be configured with a timeout that's much, much longer than any expected
recoverable outage for the given service.
If a `Script` is provided, the check type is a script, and Consul will If a `Script` is provided, the check type is a script, and Consul will
evaluate the script every `Interval` to update the status. evaluate the script every `Interval` to update the status.
@ -389,6 +400,7 @@ body must look like:
"Port": 8000, "Port": 8000,
"EnableTagOverride": false, "EnableTagOverride": false,
"Check": { "Check": {
"DeregisterCriticalServiceAfter": "90m",
"Script": "/usr/local/bin/check_redis.py", "Script": "/usr/local/bin/check_redis.py",
"HTTP": "http://localhost:5000/health", "HTTP": "http://localhost:5000/health",
"Interval": "10s", "Interval": "10s",
@ -413,6 +425,17 @@ information.
If `Check` is provided, only one of `Script`, `HTTP`, `TCP` or `TTL` should be specified. If `Check` is provided, only one of `Script`, `HTTP`, `TCP` or `TTL` should be specified.
`Script` and `HTTP` also require `Interval`. The created check will be named "service:\<ServiceId\>". `Script` and `HTTP` also require `Interval`. The created check will be named "service:\<ServiceId\>".
In Consul 0.7 and later, checks that are associated with a service may also contain
an optional `DeregisterCriticalServiceAfter` field, which is a timeout in the same Go time
format as `Interval` and `TTL`. If a check is in the critical state for more than this
configured value, then its associated service (and all of its associated checks)
will automatically be deregistered. The minimum timeout is 1 minute, and the
process that reaps critical services runs every 30 seconds, so it may take slightly
longer than the configured timeout to trigger the deregistration. This should
generally be configured with a timeout that's much, much longer than any expected
recoverable outage for the given service.
There is more information about checks [here](/docs/agent/checks.html). There is more information about checks [here](/docs/agent/checks.html).
`EnableTagOverride` can optionally be specified to disable the anti-entropy `EnableTagOverride` can optionally be specified to disable the anti-entropy