Register and deregister services and their checks atomically in the local state (#5012)

Prevent race between register and deregister requests by saving them
together in the local state on registration.
Also adds more cleanup in case of failure when registering services
/ checks.
pull/5428/head
Aestek 2019-03-04 15:34:05 +01:00 committed by Matt Keeler
parent 219e9f8297
commit 2aac4d5168
3 changed files with 257 additions and 88 deletions

View File

@ -178,8 +178,8 @@ type Agent struct {
// checkAliases maps the check ID to an associated Alias checks // checkAliases maps the check ID to an associated Alias checks
checkAliases map[types.CheckID]*checks.CheckAlias checkAliases map[types.CheckID]*checks.CheckAlias
// checkLock protects updates to the check* maps // stateLock protects the agent state
checkLock sync.Mutex stateLock sync.Mutex
// dockerClient is the client for performing docker health checks. // dockerClient is the client for performing docker health checks.
dockerClient *checks.DockerClient dockerClient *checks.DockerClient
@ -236,10 +236,6 @@ type Agent struct {
// proxyManager is the proxy process manager for managed Connect proxies. // proxyManager is the proxy process manager for managed Connect proxies.
proxyManager *proxyprocess.Manager proxyManager *proxyprocess.Manager
// proxyLock protects _managed_ proxy information in the local state from
// concurrent modification. It is not needed to work with proxyConfig state.
proxyLock sync.Mutex
// proxyConfig is the manager for proxy service (Kind = connect-proxy) // proxyConfig is the manager for proxy service (Kind = connect-proxy)
// configuration state. This ensures all state needed by a proxy registration // configuration state. This ensures all state needed by a proxy registration
// is maintained in cache and handles pushing updates to that state into XDS // is maintained in cache and handles pushing updates to that state into XDS
@ -342,6 +338,9 @@ func (a *Agent) setupProxyManager() error {
} }
func (a *Agent) Start() error { func (a *Agent) Start() error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
c := a.config c := a.config
logOutput := a.LogOutput logOutput := a.LogOutput
@ -1439,8 +1438,8 @@ func (a *Agent) ShutdownAgent() error {
a.logger.Println("[INFO] agent: Requesting shutdown") a.logger.Println("[INFO] agent: Requesting shutdown")
// Stop all the checks // Stop all the checks
a.checkLock.Lock() a.stateLock.Lock()
defer a.checkLock.Unlock() defer a.stateLock.Unlock()
for _, chk := range a.checkMonitors { for _, chk := range a.checkMonitors {
chk.Stop() chk.Stop()
} }
@ -1754,17 +1753,21 @@ func (a *Agent) reapServicesInternal() {
// See if there's a timeout. // See if there's a timeout.
// todo(fs): this looks fishy... why is there another data structure in the agent with its own lock? // todo(fs): this looks fishy... why is there another data structure in the agent with its own lock?
a.checkLock.Lock() a.stateLock.Lock()
timeout := a.checkReapAfter[checkID] timeout := a.checkReapAfter[checkID]
a.checkLock.Unlock() a.stateLock.Unlock()
// Reap, if necessary. We keep track of which service // Reap, if necessary. We keep track of which service
// this is so that we won't try to remove it again. // this is so that we won't try to remove it again.
if timeout > 0 && cs.CriticalFor() > timeout { if timeout > 0 && cs.CriticalFor() > timeout {
reaped[serviceID] = true reaped[serviceID] = true
a.RemoveService(serviceID, true) if err := a.RemoveService(serviceID, true); err != nil {
a.logger.Printf("[INFO] agent: Check %q for service %q has been critical for too long; deregistered service", a.logger.Printf("[ERR] agent: unable to deregister service %q after check %q has been critical for too long: %s",
checkID, serviceID) serviceID, checkID, err)
} else {
a.logger.Printf("[INFO] agent: Check %q for service %q has been critical for too long; deregistered service",
checkID, serviceID)
}
} }
} }
} }
@ -1886,6 +1889,12 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error {
// This entry is persistent and the agent will make a best effort to // This entry is persistent and the agent will make a best effort to
// ensure it is registered // ensure it is registered
func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
return a.addServiceLocked(service, chkTypes, persist, token, source)
}
func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
if service.Service == "" { if service.Service == "" {
return fmt.Errorf("Service name missing") return fmt.Errorf("Service name missing")
} }
@ -1932,15 +1941,12 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che
a.PauseSync() a.PauseSync()
defer a.ResumeSync() defer a.ResumeSync()
// Add the service // Take a snapshot of the current state of checks (if any), and
a.State.AddService(service, token) // restore them before resuming anti-entropy.
snap := a.snapshotCheckState()
defer a.restoreCheckState(snap)
// Persist the service to a file var checks []*structs.HealthCheck
if persist && a.config.DataDir != "" {
if err := a.persistService(service); err != nil {
return err
}
}
// Create an associated health check // Create an associated health check
for i, chkType := range chkTypes { for i, chkType := range chkTypes {
@ -1968,7 +1974,51 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che
if chkType.Status != "" { if chkType.Status != "" {
check.Status = chkType.Status check.Status = chkType.Status
} }
if err := a.AddCheck(check, chkType, persist, token, source); err != nil {
checks = append(checks, check)
}
// cleanup, store the ids of services and checks that weren't previously
// registered so we clean them up if something fails halfway through the
// process.
var cleanupServices []string
var cleanupChecks []types.CheckID
if s := a.State.Service(service.ID); s == nil {
cleanupServices = append(cleanupServices, service.ID)
}
for _, check := range checks {
if c := a.State.Check(check.CheckID); c == nil {
cleanupChecks = append(cleanupChecks, check.CheckID)
}
}
err := a.State.AddServiceWithChecks(service, checks, token)
if err != nil {
a.cleanupRegistration(cleanupServices, cleanupChecks)
return err
}
for i := range checks {
if err := a.addCheck(checks[i], chkTypes[i], service, persist, token, source); err != nil {
a.cleanupRegistration(cleanupServices, cleanupChecks)
return err
}
if persist && a.config.DataDir != "" {
if err := a.persistCheck(checks[i], chkTypes[i]); err != nil {
a.cleanupRegistration(cleanupServices, cleanupChecks)
return err
}
}
}
// Persist the service to a file
if persist && a.config.DataDir != "" {
if err := a.persistService(service); err != nil {
a.cleanupRegistration(cleanupServices, cleanupChecks)
return err return err
} }
} }
@ -1976,16 +2026,53 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che
return nil return nil
} }
// cleanupRegistration is called when a registration fails part-way through, to
// ensure there are no leftovers from the partial attempt. For every given
// service ID and check ID it removes the entry from the local state and purges
// its file. Each step is best effort: a failure is logged and the cleanup
// carries on with the remaining IDs.
func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.CheckID) {
	for i := range serviceIDs {
		svcID := serviceIDs[i]

		// Local state first, then the service file.
		if rmErr := a.State.RemoveService(svcID); rmErr != nil {
			a.logger.Printf("[ERR] consul: service registration: cleanup: failed to remove service %s: %s", svcID, rmErr)
		}
		if purgeErr := a.purgeService(svcID); purgeErr != nil {
			a.logger.Printf("[ERR] consul: service registration: cleanup: failed to purge service %s file: %s", svcID, purgeErr)
		}
	}

	for i := range checksIDs {
		chkID := checksIDs[i]

		// Cancel the check's monitors before dropping it from the local state
		// and purging its file.
		a.cancelCheckMonitors(chkID)

		if rmErr := a.State.RemoveCheck(chkID); rmErr != nil {
			a.logger.Printf("[ERR] consul: service registration: cleanup: failed to remove check %s: %s", chkID, rmErr)
		}
		if purgeErr := a.purgeCheck(chkID); purgeErr != nil {
			a.logger.Printf("[ERR] consul: service registration: cleanup: failed to purge check %s file: %s", chkID, purgeErr)
		}
	}
}
// RemoveService is used to remove a service entry. // RemoveService is used to remove a service entry.
// The agent will make a best effort to ensure it is deregistered // The agent will make a best effort to ensure it is deregistered
func (a *Agent) RemoveService(serviceID string, persist bool) error { func (a *Agent) RemoveService(serviceID string, persist bool) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
return a.removeServiceLocked(serviceID, persist)
}
// removeServiceLocked is used to remove a service entry.
// The agent will make a best effort to ensure it is deregistered
func (a *Agent) removeServiceLocked(serviceID string, persist bool) error {
// Validate ServiceID // Validate ServiceID
if serviceID == "" { if serviceID == "" {
return fmt.Errorf("ServiceID missing") return fmt.Errorf("ServiceID missing")
} }
checks := a.State.Checks()
var checkIDs []types.CheckID
for id := range checks {
checkIDs = append(checkIDs, id)
}
// Remove service immediately // Remove service immediately
if err := a.State.RemoveService(serviceID); err != nil { if err := a.State.RemoveServiceWithChecks(serviceID, checkIDs); err != nil {
a.logger.Printf("[WARN] agent: Failed to deregister service %q: %s", serviceID, err) a.logger.Printf("[WARN] agent: Failed to deregister service %q: %s", serviceID, err)
return nil return nil
} }
@ -1998,11 +2085,11 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
} }
// Deregister any associated health checks // Deregister any associated health checks
for checkID, check := range a.State.Checks() { for checkID, check := range checks {
if check.ServiceID != serviceID { if check.ServiceID != serviceID {
continue continue
} }
if err := a.RemoveCheck(checkID, persist); err != nil { if err := a.removeCheckLocked(checkID, persist); err != nil {
return err return err
} }
} }
@ -2010,7 +2097,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
// Remove the associated managed proxy if it exists // Remove the associated managed proxy if it exists
for proxyID, p := range a.State.Proxies() { for proxyID, p := range a.State.Proxies() {
if p.Proxy.TargetServiceID == serviceID { if p.Proxy.TargetServiceID == serviceID {
if err := a.RemoveProxy(proxyID, true); err != nil { if err := a.removeProxyLocked(proxyID, true); err != nil {
return err return err
} }
} }
@ -2024,7 +2111,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
// this from a sidecar. // this from a sidecar.
if sidecar.LocallyRegisteredAsSidecar { if sidecar.LocallyRegisteredAsSidecar {
// Remove it! // Remove it!
err := a.RemoveService(a.sidecarServiceID(serviceID), persist) err := a.removeServiceLocked(a.sidecarServiceID(serviceID), persist)
if err != nil { if err != nil {
return err return err
} }
@ -2039,6 +2126,50 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
// ensure it is registered. The Check may include a CheckType which // ensure it is registered. The Check may include a CheckType which
// is used to automatically update the check status // is used to automatically update the check status
func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, persist bool, token string, source configSource) error { func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, persist bool, token string, source configSource) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
return a.addCheckLocked(check, chkType, persist, token, source)
}
// addCheckLocked registers a health check with the agent and adds it to the
// local state. It does not take a.stateLock itself; AddCheck acquires the lock
// and then delegates here.
//
// If the check references a service (check.ServiceID != ""), that service must
// already exist in the local state, otherwise an error is returned. When
// persist is true and a data directory is configured, the check definition is
// also persisted.
func (a *Agent) addCheckLocked(check *structs.HealthCheck, chkType *structs.CheckType, persist bool, token string, source configSource) error {
	// Resolve the owning service up front; addCheck needs it to fill in the
	// service name and tags on the check.
	var service *structs.NodeService
	if check.ServiceID != "" {
		service = a.State.Service(check.ServiceID)
		if service == nil {
			return fmt.Errorf("ServiceID %q does not exist", check.ServiceID)
		}
	}

	// snapshot the current state of the health check to avoid potential flapping
	existing := a.State.Check(check.CheckID)
	defer func() {
		if existing != nil {
			a.State.UpdateCheck(check.CheckID, existing.Status, existing.Output)
		}
	}()

	if err := a.addCheck(check, chkType, service, persist, token, source); err != nil {
		// Best-effort removal from the local state. The returned error is
		// deliberately ignored: RemoveCheck reports an error when the check
		// does not exist, which is the normal case for a brand-new check.
		a.State.RemoveCheck(check.CheckID)
		return err
	}

	// Add to the local state for anti-entropy
	if err := a.State.AddCheck(check, token); err != nil {
		// addCheck has already set the check up on the agent side; cancel its
		// monitors so nothing keeps running for a check that never made it
		// into the local state.
		a.cancelCheckMonitors(check.CheckID)
		return err
	}

	// Persist the check
	if persist && a.config.DataDir != "" {
		return a.persistCheck(check, chkType)
	}

	return nil
}
func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType, service *structs.NodeService, persist bool, token string, source configSource) error {
if check.CheckID == "" { if check.CheckID == "" {
return fmt.Errorf("CheckID missing") return fmt.Errorf("CheckID missing")
} }
@ -2060,25 +2191,10 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
} }
if check.ServiceID != "" { if check.ServiceID != "" {
s := a.State.Service(check.ServiceID) check.ServiceName = service.Service
if s == nil { check.ServiceTags = service.Tags
return fmt.Errorf("ServiceID %q does not exist", check.ServiceID)
}
check.ServiceName = s.Service
check.ServiceTags = s.Tags
} }
a.checkLock.Lock()
defer a.checkLock.Unlock()
// snapshot the current state of the health check to avoid potential flapping
existing := a.State.Check(check.CheckID)
defer func() {
if existing != nil {
a.State.UpdateCheck(check.CheckID, existing.Status, existing.Output)
}
}()
// Check if already registered // Check if already registered
if chkType != nil { if chkType != nil {
switch { switch {
@ -2296,37 +2412,28 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
} }
} }
// Add to the local state for anti-entropy
err := a.State.AddCheck(check, token)
if err != nil {
a.cancelCheckMonitors(check.CheckID)
return err
}
// Persist the check
if persist && a.config.DataDir != "" {
return a.persistCheck(check, chkType)
}
return nil return nil
} }
// RemoveCheck is used to remove a health check. // RemoveCheck is used to remove a health check.
// The agent will make a best effort to ensure it is deregistered // The agent will make a best effort to ensure it is deregistered
func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error { func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
return a.removeCheckLocked(checkID, persist)
}
// removeCheckLocked is used to remove a health check.
// The agent will make a best effort to ensure it is deregistered
func (a *Agent) removeCheckLocked(checkID types.CheckID, persist bool) error {
// Validate CheckID // Validate CheckID
if checkID == "" { if checkID == "" {
return fmt.Errorf("CheckID missing") return fmt.Errorf("CheckID missing")
} }
// Add to the local state for anti-entropy
a.State.RemoveCheck(checkID)
a.tlsConfigurator.RemoveCheck(string(checkID)) a.tlsConfigurator.RemoveCheck(string(checkID))
a.checkLock.Lock()
defer a.checkLock.Unlock()
a.cancelCheckMonitors(checkID) a.cancelCheckMonitors(checkID)
a.State.RemoveCheck(checkID)
if persist { if persist {
if err := a.purgeCheck(checkID); err != nil { if err := a.purgeCheck(checkID); err != nil {
@ -2400,7 +2507,7 @@ func (a *Agent) addProxyLocked(proxy *structs.ConnectManagedProxy, persist, From
} }
} }
err = a.AddService(proxyService, chkTypes, persist, token, source) err = a.addServiceLocked(proxyService, chkTypes, persist, token, source)
if err != nil { if err != nil {
// Remove the state too // Remove the state too
a.State.RemoveProxy(proxyService.ID) a.State.RemoveProxy(proxyService.ID)
@ -2431,8 +2538,8 @@ func (a *Agent) addProxyLocked(proxy *structs.ConnectManagedProxy, persist, From
// running proxies that already had that credential injected. // running proxies that already had that credential injected.
func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist, FromFile bool, func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist, FromFile bool,
restoredProxyToken string, source configSource) error { restoredProxyToken string, source configSource) error {
a.proxyLock.Lock() a.stateLock.Lock()
defer a.proxyLock.Unlock() defer a.stateLock.Unlock()
return a.addProxyLocked(proxy, persist, FromFile, restoredProxyToken, source) return a.addProxyLocked(proxy, persist, FromFile, restoredProxyToken, source)
} }
@ -2595,7 +2702,7 @@ func (a *Agent) removeProxyLocked(proxyID string, persist bool) error {
// Remove the proxy service as well. The proxy ID is also the ID // Remove the proxy service as well. The proxy ID is also the ID
// of the servie, but we might as well use the service pointer. // of the servie, but we might as well use the service pointer.
if err := a.RemoveService(p.Proxy.ProxyService.ID, persist); err != nil { if err := a.removeServiceLocked(p.Proxy.ProxyService.ID, persist); err != nil {
return err return err
} }
@ -2608,8 +2715,8 @@ func (a *Agent) removeProxyLocked(proxyID string, persist bool) error {
// RemoveProxy stops and removes a local proxy instance. // RemoveProxy stops and removes a local proxy instance.
func (a *Agent) RemoveProxy(proxyID string, persist bool) error { func (a *Agent) RemoveProxy(proxyID string, persist bool) error {
a.proxyLock.Lock() a.stateLock.Lock()
defer a.proxyLock.Unlock() defer a.stateLock.Unlock()
return a.removeProxyLocked(proxyID, persist) return a.removeProxyLocked(proxyID, persist)
} }
@ -2717,8 +2824,8 @@ func (a *Agent) cancelCheckMonitors(checkID types.CheckID) {
// updateTTLCheck is used to update the status of a TTL check via the Agent API. // updateTTLCheck is used to update the status of a TTL check via the Agent API.
func (a *Agent) updateTTLCheck(checkID types.CheckID, status, output string) error { func (a *Agent) updateTTLCheck(checkID types.CheckID, status, output string) error {
a.checkLock.Lock() a.stateLock.Lock()
defer a.checkLock.Unlock() defer a.stateLock.Unlock()
// Grab the TTL check. // Grab the TTL check.
check, ok := a.checkTTLs[checkID] check, ok := a.checkTTLs[checkID]
@ -2921,13 +3028,13 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
// syntax sugar and shouldn't be persisted in local or server state. // syntax sugar and shouldn't be persisted in local or server state.
ns.Connect.SidecarService = nil ns.Connect.SidecarService = nil
if err := a.AddService(ns, chkTypes, false, service.Token, ConfigSourceLocal); err != nil { if err := a.addServiceLocked(ns, chkTypes, false, service.Token, ConfigSourceLocal); err != nil {
return fmt.Errorf("Failed to register service %q: %v", service.Name, err) return fmt.Errorf("Failed to register service %q: %v", service.Name, err)
} }
// If there is a sidecar service, register that too. // If there is a sidecar service, register that too.
if sidecar != nil { if sidecar != nil {
if err := a.AddService(sidecar, sidecarChecks, false, sidecarToken, ConfigSourceLocal); err != nil { if err := a.addServiceLocked(sidecar, sidecarChecks, false, sidecarToken, ConfigSourceLocal); err != nil {
return fmt.Errorf("Failed to register sidecar for service %q: %v", service.Name, err) return fmt.Errorf("Failed to register sidecar for service %q: %v", service.Name, err)
} }
} }
@ -2990,7 +3097,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
} else { } else {
a.logger.Printf("[DEBUG] agent: restored service definition %q from %q", a.logger.Printf("[DEBUG] agent: restored service definition %q from %q",
serviceID, file) serviceID, file)
if err := a.AddService(p.Service, nil, false, p.Token, ConfigSourceLocal); err != nil { if err := a.addServiceLocked(p.Service, nil, false, p.Token, ConfigSourceLocal); err != nil {
return fmt.Errorf("failed adding service %q: %s", serviceID, err) return fmt.Errorf("failed adding service %q: %s", serviceID, err)
} }
} }
@ -3002,7 +3109,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
// unloadServices will deregister all services. // unloadServices will deregister all services.
func (a *Agent) unloadServices() error { func (a *Agent) unloadServices() error {
for id := range a.State.Services() { for id := range a.State.Services() {
if err := a.RemoveService(id, false); err != nil { if err := a.removeServiceLocked(id, false); err != nil {
return fmt.Errorf("Failed deregistering service '%s': %v", id, err) return fmt.Errorf("Failed deregistering service '%s': %v", id, err)
} }
} }
@ -3016,7 +3123,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error {
for _, check := range conf.Checks { for _, check := range conf.Checks {
health := check.HealthCheck(conf.NodeName) health := check.HealthCheck(conf.NodeName)
chkType := check.CheckType() chkType := check.CheckType()
if err := a.AddCheck(health, chkType, false, check.Token, ConfigSourceLocal); err != nil { if err := a.addCheckLocked(health, chkType, false, check.Token, ConfigSourceLocal); err != nil {
return fmt.Errorf("Failed to register check '%s': %v %v", check.Name, err, check) return fmt.Errorf("Failed to register check '%s': %v %v", check.Name, err, check)
} }
} }
@ -3071,7 +3178,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error {
// services into the active pool // services into the active pool
p.Check.Status = api.HealthCritical p.Check.Status = api.HealthCritical
if err := a.AddCheck(p.Check, p.ChkType, false, p.Token, ConfigSourceLocal); err != nil { if err := a.addCheckLocked(p.Check, p.ChkType, false, p.Token, ConfigSourceLocal); err != nil {
// Purge the check if it is unable to be restored. // Purge the check if it is unable to be restored.
a.logger.Printf("[WARN] agent: Failed to restore check %q: %s", a.logger.Printf("[WARN] agent: Failed to restore check %q: %s",
checkID, err) checkID, err)
@ -3090,7 +3197,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error {
// unloadChecks will deregister all checks known to the local agent. // unloadChecks will deregister all checks known to the local agent.
func (a *Agent) unloadChecks() error { func (a *Agent) unloadChecks() error {
for id := range a.State.Checks() { for id := range a.State.Checks() {
if err := a.RemoveCheck(id, false); err != nil { if err := a.removeCheckLocked(id, false); err != nil {
return fmt.Errorf("Failed deregistering check '%s': %s", id, err) return fmt.Errorf("Failed deregistering check '%s': %s", id, err)
} }
} }
@ -3153,9 +3260,6 @@ func (a *Agent) loadPersistedProxies() (map[string]persistedProxy, error) {
// loadProxies will load connect proxy definitions from configuration and // loadProxies will load connect proxy definitions from configuration and
// persisted definitions on disk, and load them into the local agent. // persisted definitions on disk, and load them into the local agent.
func (a *Agent) loadProxies(conf *config.RuntimeConfig) error { func (a *Agent) loadProxies(conf *config.RuntimeConfig) error {
a.proxyLock.Lock()
defer a.proxyLock.Unlock()
persistedProxies, persistenceErr := a.loadPersistedProxies() persistedProxies, persistenceErr := a.loadPersistedProxies()
for _, svc := range conf.Services { for _, svc := range conf.Services {
@ -3287,8 +3391,6 @@ func (a *Agent) loadTokens(conf *config.RuntimeConfig) error {
// unloadProxies will deregister all proxies known to the local agent. // unloadProxies will deregister all proxies known to the local agent.
func (a *Agent) unloadProxies() error { func (a *Agent) unloadProxies() error {
a.proxyLock.Lock()
defer a.proxyLock.Unlock()
for id := range a.State.Proxies() { for id := range a.State.Proxies() {
if err := a.removeProxyLocked(id, false); err != nil { if err := a.removeProxyLocked(id, false); err != nil {
return fmt.Errorf("Failed deregistering proxy '%s': %s", id, err) return fmt.Errorf("Failed deregistering proxy '%s': %s", id, err)
@ -3432,6 +3534,9 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
a.PauseSync() a.PauseSync()
defer a.ResumeSync() defer a.ResumeSync()
a.stateLock.Lock()
defer a.stateLock.Unlock()
// Snapshot the current state, and restore it afterwards // Snapshot the current state, and restore it afterwards
snap := a.snapshotCheckState() snap := a.snapshotCheckState()
defer a.restoreCheckState(snap) defer a.restoreCheckState(snap)

View File

@ -1859,7 +1859,7 @@ func TestAgent_PersistCheck(t *testing.T) {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if !bytes.Equal(expected, content) { if !bytes.Equal(expected, content) {
t.Fatalf("bad: %s", string(content)) t.Fatalf("bad: %s != %s", string(content), expected)
} }
// Updates the check definition on disk // Updates the check definition on disk

View File

@ -11,7 +11,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/armon/go-metrics" metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -19,7 +19,7 @@ import (
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
"github.com/hashicorp/go-uuid" uuid "github.com/hashicorp/go-uuid"
) )
// Config is the configuration for the State. // Config is the configuration for the State.
@ -265,6 +265,12 @@ func (l *State) serviceToken(id string) string {
// This entry is persistent and the agent will make a best effort to // This entry is persistent and the agent will make a best effort to
// ensure it is registered // ensure it is registered
func (l *State) AddService(service *structs.NodeService, token string) error { func (l *State) AddService(service *structs.NodeService, token string) error {
l.Lock()
defer l.Unlock()
return l.addServiceLocked(service, token)
}
func (l *State) addServiceLocked(service *structs.NodeService, token string) error {
if service == nil { if service == nil {
return fmt.Errorf("no service") return fmt.Errorf("no service")
} }
@ -274,18 +280,58 @@ func (l *State) AddService(service *structs.NodeService, token string) error {
service.ID = service.Service service.ID = service.Service
} }
l.SetServiceState(&ServiceState{ l.setServiceStateLocked(&ServiceState{
Service: service, Service: service,
Token: token, Token: token,
}) })
return nil return nil
} }
// AddServiceWithChecks adds a service and its checks to the local state
// atomically: the state lock is held for the whole operation. The service is
// added first, then each check in order. The first error aborts the operation
// and is returned; entries added before the failure are not rolled back here.
func (l *State) AddServiceWithChecks(service *structs.NodeService, checks []*structs.HealthCheck, token string) error {
	l.Lock()
	defer l.Unlock()

	err := l.addServiceLocked(service, token)
	if err != nil {
		return err
	}

	for i := range checks {
		if err = l.addCheckLocked(checks[i], token); err != nil {
			return err
		}
	}

	return nil
}
// RemoveService is used to remove a service entry from the local state. // RemoveService is used to remove a service entry from the local state.
// The agent will make a best effort to ensure it is deregistered. // The agent will make a best effort to ensure it is deregistered.
func (l *State) RemoveService(id string) error { func (l *State) RemoveService(id string) error {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
return l.removeServiceLocked(id)
}
// RemoveServiceWithChecks removes a service and the given checks from the
// local state atomically: the state lock is held for the whole operation. The
// service is removed first, then each check ID in order. The first error
// aborts the operation and is returned; removals already performed are kept.
func (l *State) RemoveServiceWithChecks(serviceID string, checkIDs []types.CheckID) error {
	l.Lock()
	defer l.Unlock()

	err := l.removeServiceLocked(serviceID)
	if err != nil {
		return err
	}

	for i := range checkIDs {
		if err = l.removeCheckLocked(checkIDs[i]); err != nil {
			return err
		}
	}

	return nil
}
func (l *State) removeServiceLocked(id string) error {
s := l.services[id] s := l.services[id]
if s == nil || s.Deleted { if s == nil || s.Deleted {
@ -358,6 +404,10 @@ func (l *State) SetServiceState(s *ServiceState) {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
l.setServiceStateLocked(s)
}
func (l *State) setServiceStateLocked(s *ServiceState) {
s.WatchCh = make(chan struct{}) s.WatchCh = make(chan struct{})
old, hasOld := l.services[s.Service.ID] old, hasOld := l.services[s.Service.ID]
@ -414,6 +464,13 @@ func (l *State) checkToken(id types.CheckID) string {
// This entry is persistent and the agent will make a best effort to // This entry is persistent and the agent will make a best effort to
// ensure it is registered // ensure it is registered
func (l *State) AddCheck(check *structs.HealthCheck, token string) error { func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
l.Lock()
defer l.Unlock()
return l.addCheckLocked(check, token)
}
func (l *State) addCheckLocked(check *structs.HealthCheck, token string) error {
if check == nil { if check == nil {
return fmt.Errorf("no check") return fmt.Errorf("no check")
} }
@ -427,14 +484,14 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
// if there is a serviceID associated with the check, make sure it exists before adding it // if there is a serviceID associated with the check, make sure it exists before adding it
// NOTE - This logic may be moved to be handled within the Agent's Addcheck method after a refactor // NOTE - This logic may be moved to be handled within the Agent's Addcheck method after a refactor
if check.ServiceID != "" && l.Service(check.ServiceID) == nil { if _, ok := l.services[check.ServiceID]; check.ServiceID != "" && !ok {
return fmt.Errorf("Check %q refers to non-existent service %q", check.CheckID, check.ServiceID) return fmt.Errorf("Check %q refers to non-existent service %q", check.CheckID, check.ServiceID)
} }
// hard-set the node name // hard-set the node name
check.Node = l.config.NodeName check.Node = l.config.NodeName
l.SetCheckState(&CheckState{ l.setCheckStateLocked(&CheckState{
Check: check, Check: check,
Token: token, Token: token,
}) })
@ -482,7 +539,10 @@ func (l *State) RemoveAliasCheck(checkID types.CheckID, srcServiceID string) {
func (l *State) RemoveCheck(id types.CheckID) error { func (l *State) RemoveCheck(id types.CheckID) error {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
return l.removeCheckLocked(id)
}
func (l *State) removeCheckLocked(id types.CheckID) error {
c := l.checks[id] c := l.checks[id]
if c == nil || c.Deleted { if c == nil || c.Deleted {
return fmt.Errorf("Check %q does not exist", id) return fmt.Errorf("Check %q does not exist", id)
@ -620,6 +680,10 @@ func (l *State) SetCheckState(c *CheckState) {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
l.setCheckStateLocked(c)
}
func (l *State) setCheckStateLocked(c *CheckState) {
l.checks[c.Check.CheckID] = c l.checks[c.Check.CheckID] = c
l.TriggerSyncChanges() l.TriggerSyncChanges()
} }