diff --git a/agent/service_manager.go b/agent/service_manager.go
index 5a163f3596..915cf9dfd8 100644
--- a/agent/service_manager.go
+++ b/agent/service_manager.go
@@ -173,10 +173,10 @@ func (s *ServiceManager) AddService(req *addServiceRequest) error {
 	}
 
 	err := watch.RegisterAndStart(
+		s.ctx,
 		previousDefaults,
 		waitForCentralConfig,
 		persistServiceConfig,
-		s.ctx,
 		&s.running,
 	)
 	if err != nil {
@@ -232,17 +232,16 @@ type serviceConfigWatch struct {
 	// updateCh receives changes from cache watchers
 	updateCh chan cache.UpdateEvent
 
-	ctx        context.Context
 	cancelFunc func()
 	running    sync.WaitGroup
 }
 
 // NOTE: this is called while holding the Agent.stateLock
 func (w *serviceConfigWatch) RegisterAndStart(
+	ctx context.Context,
 	previousDefaults *structs.ServiceConfigResponse,
 	waitForCentralConfig bool,
 	persistServiceConfig bool,
-	ctx context.Context,
 	wg *sync.WaitGroup,
 ) error {
 	service := w.registration.service
@@ -314,7 +313,7 @@ func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) error {
 //
 // NOTE: this is called while holding the Agent.stateLock
 func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) error {
-	w.ctx, w.cancelFunc = context.WithCancel(ctx)
+	ctx, w.cancelFunc = context.WithCancel(ctx)
 
 	// Configure and start a cache.Notify goroutine to run a continuous
 	// blocking query on the resolved service config for this service.
@@ -328,7 +327,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
 	// the cacheKey allows us to ignore updates from the old cache watch and makes
 	// even this rare edge case safe.
 	err := w.agent.cache.Notify(
-		w.ctx,
+		ctx,
 		cachetype.ResolvedServiceConfigName,
 		req,
 		w.cacheKey,
@@ -341,7 +340,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
 
 	w.running.Add(1)
 	wg.Add(1)
-	go w.runWatch(wg)
+	go w.runWatch(ctx, wg)
 	return nil
 }
 
@@ -355,16 +354,16 @@ func (w *serviceConfigWatch) Stop() {
 // config watch is shut down.
 //
 // NOTE: the caller must NOT hold the Agent.stateLock!
-func (w *serviceConfigWatch) runWatch(wg *sync.WaitGroup) {
+func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup) {
 	defer wg.Done()
 	defer w.running.Done()
 
 	for {
 		select {
-		case <-w.ctx.Done():
+		case <-ctx.Done():
 			return
 		case event := <-w.updateCh:
-			if err := w.handleUpdate(event); err != nil {
+			if err := w.handleUpdate(ctx, event); err != nil {
 				w.agent.logger.Error("error handling service update", "error", err)
 			}
 		}
@@ -375,7 +374,7 @@ func (w *serviceConfigWatch) runWatch(wg *sync.WaitGroup) {
 // the local state and re-registers the service with the newly merged config.
 //
 // NOTE: the caller must NOT hold the Agent.stateLock!
-func (w *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) error {
+func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.UpdateEvent) error {
 	// If we got an error, log a warning if this is the first update; otherwise return the error.
 	// We want the initial update to cause a service registration no matter what.
 	if event.Err != nil {
@@ -406,7 +405,7 @@ func (w *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) error {
 	// While we were waiting on the agent state lock we may have been shutdown.
 	// So avoid doing a registration in that case.
 	select {
-	case <-w.ctx.Done():
+	case <-ctx.Done():
 		return nil
 	default:
 	}
@@ -427,13 +426,13 @@ func (w *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) error {
 	}
 
 	select {
-	case <-w.ctx.Done():
+	case <-ctx.Done():
 		return nil
 	case w.registerCh <- registerReq:
 	}
 
 	select {
-	case <-w.ctx.Done():
+	case <-ctx.Done():
 		return nil
 	case err := <-registerReq.Reply:
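
Note on the pattern: the diff removes the stored ctx field from serviceConfigWatch and instead threads a context.Context through RegisterAndStart, start, runWatch, and handleUpdate as an explicit argument, keeping only cancelFunc on the struct for shutdown. The standalone sketch below is not taken from the Consul codebase; the watcher type, its updateCh field, and the string event payload are invented purely to show the shape of the refactor, assuming the same requirement that a watch goroutine must exit either when its parent context is cancelled or when Stop is called.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// watcher keeps only the cancel func and a WaitGroup, never the context
// itself, mirroring the shape of serviceConfigWatch after this change.
// (Illustrative type only, not Consul code.)
type watcher struct {
	updateCh   chan string
	cancelFunc func()
	running    sync.WaitGroup
}

// Start derives a cancellable context and hands it straight to the goroutine,
// the way start() now passes ctx into runWatch.
func (w *watcher) Start(ctx context.Context, wg *sync.WaitGroup) {
	ctx, w.cancelFunc = context.WithCancel(ctx)

	w.running.Add(1)
	wg.Add(1)
	go w.runWatch(ctx, wg)
}

// Stop cancels the derived context and waits for the watch goroutine to exit.
func (w *watcher) Stop() {
	w.cancelFunc()
	w.running.Wait()
}

// runWatch receives the context as an argument, so cancellation is observed
// without reading a struct field shared with other goroutines.
func (w *watcher) runWatch(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	defer w.running.Done()

	for {
		select {
		case <-ctx.Done():
			return
		case ev := <-w.updateCh:
			w.handleUpdate(ctx, ev)
		}
	}
}

// handleUpdate re-checks for shutdown before doing further work, much as the
// real handleUpdate does before re-registering the service.
func (w *watcher) handleUpdate(ctx context.Context, ev string) {
	select {
	case <-ctx.Done():
		return
	default:
	}
	fmt.Println("update:", ev)
}

func main() {
	var wg sync.WaitGroup
	w := &watcher{updateCh: make(chan string)}

	w.Start(context.Background(), &wg)
	w.updateCh <- "event-1"
	time.Sleep(10 * time.Millisecond)

	w.Stop()
	wg.Wait()
}

Passing ctx as a parameter keeps the cancellation signal on the call path of the goroutine that consumes it, so Start/Stop and runWatch no longer share a context field that both sides would otherwise read and write.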