Fix a race in the ready logic

pull/5700/head
Kyle Havlovitz 2019-04-24 06:46:30 -07:00
parent c269369760
commit cba47aa0ca
2 changed files with 18 additions and 19 deletions

View File

@ -243,6 +243,8 @@ type Agent struct {
// directly. // directly.
proxyConfig *proxycfg.Manager proxyConfig *proxycfg.Manager
// serviceManager is the manager for combining local service registrations with
// the centrally configured proxy/service defaults.
serviceManager *ServiceManager serviceManager *ServiceManager
// xdsServer is the Server instance that serves xDS gRPC API. // xdsServer is the Server instance that serves xDS gRPC API.

View File

@ -34,6 +34,9 @@ func NewServiceManager(agent *Agent) *ServiceManager {
// to fetch the merged global defaults that apply to the service in order to compose the // to fetch the merged global defaults that apply to the service in order to compose the
// initial registration. // initial registration.
func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
s.lock.Lock()
defer s.lock.Unlock()
reg := serviceRegistration{ reg := serviceRegistration{
service: service, service: service,
chkTypes: chkTypes, chkTypes: chkTypes,
@ -44,9 +47,7 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st
// If a service watch already exists, update the registration. Otherwise, // If a service watch already exists, update the registration. Otherwise,
// start a new config watcher. // start a new config watcher.
s.lock.Lock()
watch, ok := s.services[service.ID] watch, ok := s.services[service.ID]
s.lock.Unlock()
if ok { if ok {
if err := watch.updateRegistration(&reg); err != nil { if err := watch.updateRegistration(&reg); err != nil {
return err return err
@ -75,9 +76,7 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st
return err return err
} }
s.lock.Lock()
s.services[service.ID] = watch s.services[service.ID] = watch
s.lock.Unlock()
s.agent.logger.Printf("[DEBUG] agent.manager: added local registration for service %q", service.ID) s.agent.logger.Printf("[DEBUG] agent.manager: added local registration for service %q", service.ID)
} }
@ -117,10 +116,8 @@ type serviceConfigWatch struct {
agent *Agent agent *Agent
// readyCh is used for ReadyWait in order to block until the first update // readyCh is used for ReadyWait in order to block until the first update
// for the resolved service config is received from the cache. Both this // for the resolved service config is received from the cache.
// and ready are protected the lock.
readyCh chan error readyCh chan error
ready bool
updateCh chan cache.UpdateEvent updateCh chan cache.UpdateEvent
ctx context.Context ctx context.Context
@ -156,14 +153,16 @@ func (s *serviceConfigWatch) ReadyWait() error {
// runWatch handles any update events from the cache.Notify until the // runWatch handles any update events from the cache.Notify until the
// config watch is shut down. // config watch is shut down.
func (s *serviceConfigWatch) runWatch() { func (s *serviceConfigWatch) runWatch() {
firstRun := true
for { for {
select { select {
case <-s.ctx.Done(): case <-s.ctx.Done():
return return
case event := <-s.updateCh: case event := <-s.updateCh:
if err := s.handleUpdate(event, false); err != nil { if err := s.handleUpdate(event, false, firstRun); err != nil {
s.agent.logger.Printf("[ERR] agent.manager: error handling service update: %v", err) s.agent.logger.Printf("[ERR] agent.manager: error handling service update: %v", err)
} }
firstRun = false
} }
} }
} }
@ -172,14 +171,13 @@ func (s *serviceConfigWatch) runWatch() {
// global config defaults, updates the local state and re-registers the service with // global config defaults, updates the local state and re-registers the service with
// the newly merged config. This function takes the serviceConfigWatch lock to ensure // the newly merged config. This function takes the serviceConfigWatch lock to ensure
// only one update can be happening at a time. // only one update can be happening at a time.
func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) error { func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked, firstRun bool) error {
// Take the agent state lock if needed. This is done before the local config watch // Take the agent state lock if needed. This is done before the local config watch
// lock in order to prevent a race between this config watch and others - the config // lock in order to prevent a race between this config watch and others - the config
// watch lock is the inner lock and the agent stateLock is the outer lock. In the case // watch lock is the inner lock and the agent stateLock is the outer lock. If this is the
// where s.ready == false we assume the lock is already held, since this update is being // first run we also don't need to take the stateLock, as this is being waited on
// waited on for the initial registration by a call from the agent that already holds // synchronously by a caller that already holds it.
// the lock. if !locked && !firstRun {
if !locked && s.ready {
s.agent.stateLock.Lock() s.agent.stateLock.Lock()
defer s.agent.stateLock.Unlock() defer s.agent.stateLock.Unlock()
} }
@ -189,7 +187,7 @@ func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool)
// If we got an error, log a warning if this is the first update; otherwise return the error. // If we got an error, log a warning if this is the first update; otherwise return the error.
// We want the initial update to cause a service registration no matter what. // We want the initial update to cause a service registration no matter what.
if event.Err != nil { if event.Err != nil {
if !s.ready { if firstRun {
s.agent.logger.Printf("[WARN] could not retrieve initial service_defaults config for service %q: %v", s.agent.logger.Printf("[WARN] could not retrieve initial service_defaults config for service %q: %v",
s.registration.service.ID, event.Err) s.registration.service.ID, event.Err)
} else { } else {
@ -212,16 +210,15 @@ func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool)
if err != nil { if err != nil {
// If this is the initial registration, return the error through the readyCh // If this is the initial registration, return the error through the readyCh
// so it can be passed back to the original caller. // so it can be passed back to the original caller.
if !s.ready { if firstRun {
s.readyCh <- err s.readyCh <- err
} }
return fmt.Errorf("error updating service registration: %v", err) return fmt.Errorf("error updating service registration: %v", err)
} }
// If this is the first registration, set the ready status by closing the channel. // If this is the first registration, set the ready status by closing the channel.
if !s.ready { if firstRun {
close(s.readyCh) close(s.readyCh)
s.ready = true
} }
return nil return nil
@ -250,7 +247,7 @@ func (s *serviceConfigWatch) startConfigWatch() error {
func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) error { func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) error {
return s.handleUpdate(cache.UpdateEvent{ return s.handleUpdate(cache.UpdateEvent{
Result: registration, Result: registration,
}, true) }, true, false)
} }
// mergeServiceConfig returns the final effective config for the watched service, // mergeServiceConfig returns the final effective config for the watched service,