diff --git a/agent/service_manager.go b/agent/service_manager.go index f86adf66c1..9e49748b79 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -10,6 +10,11 @@ import ( "golang.org/x/net/context" ) +// The ServiceManager is a layer for service registration in between the agent +// and the local state. Any services must be registered with the ServiceManager, +// which then maintains a long-running watch of any globally-set service or proxy +// configuration that applies to the service in order to register the final, merged +// service configuration locally in the agent state. type ServiceManager struct { services map[string]*serviceConfigWatch agent *Agent @@ -24,10 +29,11 @@ func NewServiceManager(agent *Agent) *ServiceManager { } } +// AddService starts a new serviceConfigWatch if the service has not been registered, and +// updates the existing registration if it has. For a new service, a call will also be made +// to fetch the merged global defaults that apply to the service in order to compose the +// initial registration. func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { - s.Lock() - defer s.Unlock() - reg := serviceRegistration{ service: service, chkTypes: chkTypes, @@ -38,12 +44,14 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st // If a service watch already exists, update the registration. Otherwise, // start a new config watcher. + s.Lock() watch, ok := s.services[service.ID] + s.Unlock() if ok { - s.agent.logger.Printf("[DEBUG] agent: updating local registration for service %q", service.ID) if err := watch.updateRegistration(®); err != nil { return err } + s.agent.logger.Printf("[DEBUG] agent.manager: updated local registration for service %q", service.ID) } else { // This is a new entry, so get the existing global config and do the initial // registration with the merged config. @@ -72,10 +80,13 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st return err } + s.Lock() s.services[service.ID] = watch + s.Unlock() if err := watch.Start(); err != nil { return err } + s.agent.logger.Printf("[DEBUG] agent.manager: adding local registration for service %q", service.ID) } return nil @@ -94,6 +105,7 @@ func (s *ServiceManager) RemoveService(serviceID string) { delete(s.services, serviceID) } +// serviceRegistration represents a locally registered service. type serviceRegistration struct { service *structs.NodeService chkTypes []*structs.CheckType @@ -102,6 +114,9 @@ type serviceRegistration struct { source configSource } +// serviceConfigWatch is a long running helper for composing the end config +// for a given service from both the local registration and the global +// service/proxy defaults. type serviceConfigWatch struct { registration *serviceRegistration config *structs.ServiceDefinition @@ -125,6 +140,12 @@ func (s *serviceConfigWatch) Start() error { return nil } +func (s *serviceConfigWatch) Stop() { + s.cancelFunc() +} + +// runWatch handles any update events from the cache.Notify until the +// config watch is shut down. func (s *serviceConfigWatch) runWatch() { for { select { @@ -132,14 +153,24 @@ func (s *serviceConfigWatch) runWatch() { return case event := <-s.updateCh: if err := s.handleUpdate(event, false); err != nil { - s.agent.logger.Printf("[ERR] agent: error handling service update: %v", err) - continue + s.agent.logger.Printf("[ERR] agent.manager: error handling service update: %v", err) } } } } +// handleUpdate receives an update event about either the service registration or the +// global config defaults, updates the local state and re-registers the service with +// the newly merged config. This function takes the serviceConfigWatch lock to ensure +// only one update can be happening at a time. func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) error { + // Take the agent state lock if needed. This is done before the local config watch + // lock in order to prevent a race between this config watch and others - the config + // watch lock is the inner lock and the agent stateLock is the outer lock. + if !locked { + s.agent.stateLock.Lock() + defer s.agent.stateLock.Unlock() + } s.Lock() defer s.Unlock() @@ -158,12 +189,6 @@ func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) } service := s.mergeServiceConfig() - - if !locked { - s.agent.stateLock.Lock() - defer s.agent.stateLock.Unlock() - } - err := s.agent.addServiceInternal(service, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source) if err != nil { return fmt.Errorf("error updating service registration: %v", err) @@ -172,6 +197,8 @@ func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) return nil } +// startConfigWatch starts a cache.Notify goroutine to run a continuous blocking query +// on the resolved service config for this service. func (s *serviceConfigWatch) startConfigWatch() error { name := s.registration.service.Service @@ -188,12 +215,16 @@ func (s *serviceConfigWatch) startConfigWatch() error { return err } +// updateRegistration does a synchronous update of the local service registration and +// returns the result. func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) error { return s.handleUpdate(cache.UpdateEvent{ Result: registration, }, true) } +// mergeServiceConfig returns the final effective config for the watched service, +// including the latest known global defaults from the servers. func (s *serviceConfigWatch) mergeServiceConfig() *structs.NodeService { if s.config == nil { return s.registration.service @@ -204,7 +235,3 @@ func (s *serviceConfigWatch) mergeServiceConfig() *structs.NodeService { return svc } - -func (s *serviceConfigWatch) Stop() { - s.cancelFunc() -}