agent/service_manager: remove 'updateCh' field from serviceConfigWatch

Passing the channel to the function which uses it significantly
reduces the scope of the variable, and makes its usage more explicit. It
also moves the initialization of the channel closer to where it is used.

Also includes a couple very small cleanups to remove a local var and
read the error from `ctx.Err()` directly instead of creating a channel
to check for an error.
pull/7675/head
Daniel Nephin 2020-04-17 17:04:58 -04:00
parent 26291a8482
commit 73cd0b6fac
1 changed files with 9 additions and 14 deletions

View File

@ -168,7 +168,6 @@ func (s *ServiceManager) AddService(req *addServiceRequest) error {
// merged config. // merged config.
watch := &serviceConfigWatch{ watch := &serviceConfigWatch{
registration: reg, registration: reg,
updateCh: make(chan cache.UpdateEvent, 1),
agent: s.agent, agent: s.agent,
registerCh: s.registerCh, registerCh: s.registerCh,
} }
@ -229,9 +228,6 @@ type serviceConfigWatch struct {
// we check to see if a new cache watch is needed. // we check to see if a new cache watch is needed.
cacheKey string cacheKey string
// updateCh receives changes from cache watchers
updateCh chan cache.UpdateEvent
cancelFunc func() cancelFunc func()
running sync.WaitGroup running sync.WaitGroup
} }
@ -244,8 +240,6 @@ func (w *serviceConfigWatch) RegisterAndStart(
persistServiceConfig bool, persistServiceConfig bool,
wg *sync.WaitGroup, wg *sync.WaitGroup,
) error { ) error {
service := w.registration.service
// Either we explicitly block waiting for defaults before registering, // Either we explicitly block waiting for defaults before registering,
// or we feed it some seed data (or NO data) and bypass the blocking // or we feed it some seed data (or NO data) and bypass the blocking
// operation. Either way the watcher will end up with something flagged // operation. Either way the watcher will end up with something flagged
@ -254,7 +248,8 @@ func (w *serviceConfigWatch) RegisterAndStart(
var err error var err error
serviceDefaults, err = w.fetchDefaults(ctx) serviceDefaults, err = w.fetchDefaults(ctx)
if err != nil { if err != nil {
return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", service.ID, err) return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v",
w.registration.service.ID, err)
} }
} }
@ -318,6 +313,8 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
req := makeConfigRequest(w.agent, w.registration) req := makeConfigRequest(w.agent, w.registration)
w.cacheKey = req.CacheInfo().Key w.cacheKey = req.CacheInfo().Key
updateCh := make(chan cache.UpdateEvent, 1)
// We use the cache key as the correlationID here. Notify in general will not // We use the cache key as the correlationID here. Notify in general will not
// respond on the updateCh after the context is cancelled however there could // respond on the updateCh after the context is cancelled however there could
// possible be a race where it has only just got an update and checked the // possible be a race where it has only just got an update and checked the
@ -329,7 +326,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
cachetype.ResolvedServiceConfigName, cachetype.ResolvedServiceConfigName,
req, req,
w.cacheKey, w.cacheKey,
w.updateCh, updateCh,
) )
if err != nil { if err != nil {
w.cancelFunc() w.cancelFunc()
@ -338,7 +335,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
w.running.Add(1) w.running.Add(1)
wg.Add(1) wg.Add(1)
go w.runWatch(ctx, wg) go w.runWatch(ctx, wg, updateCh)
return nil return nil
} }
@ -352,7 +349,7 @@ func (w *serviceConfigWatch) Stop() {
// config watch is shut down. // config watch is shut down.
// //
// NOTE: the caller must NOT hold the Agent.stateLock! // NOTE: the caller must NOT hold the Agent.stateLock!
func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup) { func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup, updateCh chan cache.UpdateEvent) {
defer wg.Done() defer wg.Done()
defer w.running.Done() defer w.running.Done()
@ -360,7 +357,7 @@ func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case event := <-w.updateCh: case event := <-updateCh:
if err := w.handleUpdate(ctx, event); err != nil { if err := w.handleUpdate(ctx, event); err != nil {
w.agent.logger.Error("error handling service update", "error", err) w.agent.logger.Error("error handling service update", "error", err)
} }
@ -401,10 +398,8 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
// While we were waiting on the agent state lock we may have been shutdown. // While we were waiting on the agent state lock we may have been shutdown.
// So avoid doing a registration in that case. // So avoid doing a registration in that case.
select { if err := ctx.Err(); err != nil {
case <-ctx.Done():
return nil return nil
default:
} }
registerReq := &asyncRegisterRequest{ registerReq := &asyncRegisterRequest{