@ -173,10 +173,10 @@ func (s *ServiceManager) AddService(req *addServiceRequest) error {
}
err := watch . RegisterAndStart (
s . ctx ,
previousDefaults ,
waitForCentralConfig ,
persistServiceConfig ,
s . ctx ,
& s . running ,
)
if err != nil {
@ -232,17 +232,16 @@ type serviceConfigWatch struct {
// updateCh receives changes from cache watchers
updateCh chan cache . UpdateEvent
ctx context . Context
cancelFunc func ( )
running sync . WaitGroup
}
// NOTE: this is called while holding the Agent.stateLock
func ( w * serviceConfigWatch ) RegisterAndStart (
ctx context . Context ,
previousDefaults * structs . ServiceConfigResponse ,
waitForCentralConfig bool ,
persistServiceConfig bool ,
ctx context . Context ,
wg * sync . WaitGroup ,
) error {
service := w . registration . service
@ -314,7 +313,7 @@ func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) error {
//
// NOTE: this is called while holding the Agent.stateLock
func ( w * serviceConfigWatch ) start ( ctx context . Context , wg * sync . WaitGroup ) error {
w . ctx , w . cancelFunc = context . WithCancel ( ctx )
ctx , w . cancelFunc = context . WithCancel ( ctx )
// Configure and start a cache.Notify goroutine to run a continuous
// blocking query on the resolved service config for this service.
@ -328,7 +327,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
// the cacheKey allows us to ignore updates from the old cache watch and makes
// even this rare edge case safe.
err := w . agent . cache . Notify (
w . ctx ,
ctx ,
cachetype . ResolvedServiceConfigName ,
req ,
w . cacheKey ,
@ -341,7 +340,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
w . running . Add ( 1 )
wg . Add ( 1 )
go w . runWatch ( wg )
go w . runWatch ( ctx , wg )
return nil
}
@ -355,16 +354,16 @@ func (w *serviceConfigWatch) Stop() {
// config watch is shut down.
//
// NOTE: the caller must NOT hold the Agent.stateLock!
func ( w * serviceConfigWatch ) runWatch ( wg * sync . WaitGroup ) {
func ( w * serviceConfigWatch ) runWatch ( ctx context . Context , wg * sync . WaitGroup ) {
defer wg . Done ( )
defer w . running . Done ( )
for {
select {
case <- w . ctx . Done ( ) :
case <- ctx . Done ( ) :
return
case event := <- w . updateCh :
if err := w . handleUpdate ( event ) ; err != nil {
if err := w . handleUpdate ( ctx , event ) ; err != nil {
w . agent . logger . Error ( "error handling service update" , "error" , err )
}
}
@ -375,7 +374,7 @@ func (w *serviceConfigWatch) runWatch(wg *sync.WaitGroup) {
// the local state and re-registers the service with the newly merged config.
//
// NOTE: the caller must NOT hold the Agent.stateLock!
func ( w * serviceConfigWatch ) handleUpdate ( event cache . UpdateEvent ) error {
func ( w * serviceConfigWatch ) handleUpdate ( ctx context . Context , event cache . UpdateEvent ) error {
// If we got an error, log a warning if this is the first update; otherwise return the error.
// We want the initial update to cause a service registration no matter what.
if event . Err != nil {
@ -406,7 +405,7 @@ func (w *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) error {
// While we were waiting on the agent state lock we may have been shutdown.
// So avoid doing a registration in that case.
select {
case <- w . ctx . Done ( ) :
case <- ctx . Done ( ) :
return nil
default :
}
@ -427,13 +426,13 @@ func (w *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) error {
}
select {
case <- w . ctx . Done ( ) :
case <- ctx . Done ( ) :
return nil
case w . registerCh <- registerReq :
}
select {
case <- w . ctx . Done ( ) :
case <- ctx . Done ( ) :
return nil
case err := <- registerReq . Reply :