|
|
|
@ -10,8 +10,13 @@ import (
|
|
|
|
|
|
|
|
|
|
type Routine func(ctx context.Context) error |
|
|
|
|
|
|
|
|
|
// cancelCh is the ctx.Done()
|
|
|
|
|
// When cancel() is called, if the routine is running a blocking call (e.g. some ACL replication RPCs),
|
|
|
|
|
// stoppedCh won't be closed till the blocking call returns, while cancelCh will be closed immediately.
|
|
|
|
|
// cancelCh is used to properly detect routine running status between cancel() and close(stoppedCh)
|
|
|
|
|
type routineTracker struct { |
|
|
|
|
cancel context.CancelFunc |
|
|
|
|
cancelCh <-chan struct{} // closed when ctx is done
|
|
|
|
|
stoppedCh chan struct{} // closed when no longer running
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -19,6 +24,8 @@ func (r *routineTracker) running() bool {
|
|
|
|
|
select { |
|
|
|
|
case <-r.stoppedCh: |
|
|
|
|
return false |
|
|
|
|
case <-r.cancelCh: |
|
|
|
|
return false |
|
|
|
|
default: |
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
@ -74,6 +81,7 @@ func (m *Manager) Start(ctx context.Context, name string, routine Routine) error
|
|
|
|
|
rtCtx, cancel := context.WithCancel(ctx) |
|
|
|
|
instance := &routineTracker{ |
|
|
|
|
cancel: cancel, |
|
|
|
|
cancelCh: ctx.Done(), |
|
|
|
|
stoppedCh: make(chan struct{}), |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -97,10 +105,14 @@ func (m *Manager) execute(ctx context.Context, name string, routine Routine, don
|
|
|
|
|
"error", err, |
|
|
|
|
) |
|
|
|
|
} else { |
|
|
|
|
m.logger.Debug("stopped routine", "routine", name) |
|
|
|
|
m.logger.Info("stopped routine", "routine", name) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Caveat: The returned stoppedCh indicates that the routine is completed
|
|
|
|
|
// It's possible that ctx is canceled, but stoppedCh not yet closed
|
|
|
|
|
// Use mgr.IsRunning(name) than this stoppedCh to tell whether the
|
|
|
|
|
// instance is still running (not cancelled or completed).
|
|
|
|
|
func (m *Manager) Stop(name string) <-chan struct{} { |
|
|
|
|
instance := m.stopInstance(name) |
|
|
|
|
if instance == nil { |
|
|
|
@ -127,7 +139,7 @@ func (m *Manager) stopInstance(name string) *routineTracker {
|
|
|
|
|
return instance |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
m.logger.Debug("stopping routine", "routine", name) |
|
|
|
|
m.logger.Info("stopping routine", "routine", name) |
|
|
|
|
instance.cancel() |
|
|
|
|
|
|
|
|
|
delete(m.routines, name) |
|
|
|
@ -145,7 +157,7 @@ func (m *Manager) StopAll() {
|
|
|
|
|
if !routine.running() { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
m.logger.Debug("stopping routine", "routine", name) |
|
|
|
|
m.logger.Info("stopping routine", "routine", name) |
|
|
|
|
routine.cancel() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|