From 73cd0b6fac4bec7901fd6d35e21f817f9c4d57b3 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 17 Apr 2020 17:04:58 -0400 Subject: [PATCH] agent/service_manager: remove 'updateCh' field from serviceConfigWatch Passing the channel to the function which uses it significantly reduces the scope of the variable, and makes its usage more explicit. It also moves the initialization of the channel closer to where it is used. Also includes a couple very small cleanups to remove a local var and read the error from `ctx.Err()` directly instead of creating a channel to check for an error. --- agent/service_manager.go | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/agent/service_manager.go b/agent/service_manager.go index 16d15255be..a20a00d198 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -168,7 +168,6 @@ func (s *ServiceManager) AddService(req *addServiceRequest) error { // merged config. watch := &serviceConfigWatch{ registration: reg, - updateCh: make(chan cache.UpdateEvent, 1), agent: s.agent, registerCh: s.registerCh, } @@ -229,9 +228,6 @@ type serviceConfigWatch struct { // we check to see if a new cache watch is needed. cacheKey string - // updateCh receives changes from cache watchers - updateCh chan cache.UpdateEvent - cancelFunc func() running sync.WaitGroup } @@ -244,8 +240,6 @@ func (w *serviceConfigWatch) RegisterAndStart( persistServiceConfig bool, wg *sync.WaitGroup, ) error { - service := w.registration.service - // Either we explicitly block waiting for defaults before registering, // or we feed it some seed data (or NO data) and bypass the blocking // operation. Either way the watcher will end up with something flagged @@ -254,7 +248,8 @@ func (w *serviceConfigWatch) RegisterAndStart( var err error serviceDefaults, err = w.fetchDefaults(ctx) if err != nil { - return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", service.ID, err) + return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", + w.registration.service.ID, err) } } @@ -318,6 +313,8 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro req := makeConfigRequest(w.agent, w.registration) w.cacheKey = req.CacheInfo().Key + updateCh := make(chan cache.UpdateEvent, 1) + // We use the cache key as the correlationID here. Notify in general will not // respond on the updateCh after the context is cancelled however there could // possible be a race where it has only just got an update and checked the @@ -329,7 +326,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro cachetype.ResolvedServiceConfigName, req, w.cacheKey, - w.updateCh, + updateCh, ) if err != nil { w.cancelFunc() @@ -338,7 +335,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro w.running.Add(1) wg.Add(1) - go w.runWatch(ctx, wg) + go w.runWatch(ctx, wg, updateCh) return nil } @@ -352,7 +349,7 @@ func (w *serviceConfigWatch) Stop() { // config watch is shut down. // // NOTE: the caller must NOT hold the Agent.stateLock! -func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup) { +func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup, updateCh chan cache.UpdateEvent) { defer wg.Done() defer w.running.Done() @@ -360,7 +357,7 @@ func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup) { select { case <-ctx.Done(): return - case event := <-w.updateCh: + case event := <-updateCh: if err := w.handleUpdate(ctx, event); err != nil { w.agent.logger.Error("error handling service update", "error", err) } @@ -401,10 +398,8 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat // While we were waiting on the agent state lock we may have been shutdown. // So avoid doing a registration in that case. - select { - case <-ctx.Done(): + if err := ctx.Err(); err != nil { return nil - default: } registerReq := &asyncRegisterRequest{