
Fixes handling of stop channel and failed barrier attempts. (#3546)

* Fixes handling of stop channel and failed barrier attempts.

There were two issues here. First, we must not exit the leader loop when there
is a timeout trying to write the barrier: Raft might not actually step down,
so we would be left as the Raft leader even though we had already run all of
the local step-down actions.
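
To make that concrete, here is a minimal, standalone sketch of the pattern
(not the Consul code; applyBarrier, leaderLoop, stopCh, and interval are
made-up names for illustration): on a failed barrier the loop jumps back to
its wait point and retries later, instead of returning and abandoning its
leader duties while Raft still considers the node the leader.

package main

import (
	"errors"
	"log"
	"time"
)

// applyBarrier stands in for a raft barrier write; under load it can time
// out even though this node is still the Raft leader.
func applyBarrier(timeout time.Duration) error {
	return errors.New("timed out enqueuing operation")
}

func leaderLoop(stopCh <-chan struct{}, interval time.Duration) {
RECONCILE:
	if err := applyBarrier(10 * time.Second); err != nil {
		log.Printf("[ERR] failed to wait for barrier: %v", err)
		// Returning here would stop doing leader work while Raft still
		// thinks we are the leader; wait and retry instead.
		goto WAIT
	}
	// ... leader establishment / reconciliation would happen here ...

WAIT:
	select {
	case <-stopCh:
		return
	case <-time.After(interval):
		goto RECONCILE
	}
}

func main() {
	stopCh := make(chan struct{})
	go func() {
		time.Sleep(3 * time.Second)
		close(stopCh) // simulate being told to step down
	}()
	leaderLoop(stopCh, time.Second)
}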

Second, we didn't close over stopCh correctly, so the monitor loop could nil
out the shared variable while the leaderLoop was still watching it, leaving
the leaderLoop blocked and never exiting. We now pass the channel into the
goroutine so it is captured properly, AND we nil out the shared variable only
AFTER the leaderLoop exits, for good measure, so the code is more robust.
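
Purely as an illustration of the difference (simplified names, not the Consul
code): passing the channel to the goroutine as an argument captures its value
at spawn time, so a later reassignment of the shared variable can never swap
a nil channel underneath the running loop, and the shared variable is only
cleared after Wait() confirms the loop has exited.

package main

import (
	"fmt"
	"sync"
	"time"
)

// watch blocks until ch is closed; if it were handed a nil channel it would
// block forever, which is exactly the failure mode being avoided.
func watch(name string, ch <-chan struct{}) {
	<-ch
	fmt.Println(name, "exited")
}

func main() {
	var leaderLoop sync.WaitGroup
	stopCh := make(chan struct{})

	leaderLoop.Add(1)
	// Pass the channel as an argument rather than capturing the shared
	// stopCh variable, so later writes to stopCh cannot affect the loop.
	go func(ch chan struct{}) {
		defer leaderLoop.Done()
		watch("leaderLoop", ch)
	}(stopCh)

	time.Sleep(100 * time.Millisecond)

	// Leadership lost: signal the loop, wait for it to exit, and only then
	// clear the shared variable.
	close(stopCh)
	leaderLoop.Wait()
	stopCh = nil
	fmt.Println("stopCh cleared:", stopCh == nil)
}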

Fixes #3545

* Cleans up based on code review feedback.

* Tweaks comments.

* Renames variables and removes comments.
James Phillips authored 7 years ago, committed by GitHub
commit 4dab70cb93
agent/consul/leader.go (51 changed lines)

@@ -33,25 +33,39 @@ func (s *Server) monitorLeadership() {
 	// cleanup and to ensure we never run multiple leader loops.
 	raftNotifyCh := s.raftNotifyCh
-	var wg sync.WaitGroup
-	var stopCh chan struct{}
+	var weAreLeaderCh chan struct{}
+	var leaderLoop sync.WaitGroup
 	for {
 		select {
 		case isLeader := <-raftNotifyCh:
-			if isLeader {
-				stopCh = make(chan struct{})
-				wg.Add(1)
-				go func() {
-					s.leaderLoop(stopCh)
-					wg.Done()
-				}()
+			switch {
+			case isLeader:
+				if weAreLeaderCh != nil {
+					s.logger.Printf("[ERR] consul: attempted to start the leader loop while running")
+					continue
+				}
+
+				weAreLeaderCh = make(chan struct{})
+				leaderLoop.Add(1)
+				go func(ch chan struct{}) {
+					defer leaderLoop.Done()
+					s.leaderLoop(ch)
+				}(weAreLeaderCh)
 				s.logger.Printf("[INFO] consul: cluster leadership acquired")
-			} else if stopCh != nil {
-				close(stopCh)
-				stopCh = nil
-				wg.Wait()
+
+			default:
+				if weAreLeaderCh == nil {
+					s.logger.Printf("[ERR] consul: attempted to stop the leader loop while not running")
+					continue
+				}
+
+				s.logger.Printf("[DEBUG] consul: shutting down leader loop")
+				close(weAreLeaderCh)
+				leaderLoop.Wait()
+				weAreLeaderCh = nil
 				s.logger.Printf("[INFO] consul: cluster leadership lost")
 			}
+
 		case <-s.shutdownCh:
 			return
 		}
 	}

@@ -97,7 +111,7 @@ RECONCILE:
 	barrier := s.raft.Barrier(barrierWriteTimeout)
 	if err := barrier.Error(); err != nil {
 		s.logger.Printf("[ERR] consul: failed to wait for barrier: %v", err)
-		return
+		goto WAIT
 	}
 	metrics.MeasureSince([]string{"consul", "leader", "barrier"}, start)
 	metrics.MeasureSince([]string{"leader", "barrier"}, start)

@@ -127,6 +141,15 @@ RECONCILE:
 	reconcileCh = s.reconcileCh

 WAIT:
+	// Poll the stop channel to give it priority so we don't waste time
+	// trying to perform the other operations if we have been asked to shut
+	// down.
+	select {
+	case <-stopCh:
+		return
+	default:
+	}
+
 	// Periodically reconcile as long as we are the leader,
 	// or when Serf events arrive
 	for {
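
One note on the new WAIT preamble above: a select with a default case is a
non-blocking poll, so checking the stop channel first gives shutdown priority
over the periodic work without ever waiting on it. A tiny standalone
illustration (stopRequested is a made-up helper, not part of Consul):

package main

import "fmt"

// stopRequested is a non-blocking poll of a stop channel: it returns true as
// soon as the channel has been closed, and false otherwise, without waiting.
func stopRequested(stopCh <-chan struct{}) bool {
	select {
	case <-stopCh:
		return true
	default:
		return false
	}
}

func main() {
	stopCh := make(chan struct{})
	fmt.Println(stopRequested(stopCh)) // false: keep doing leader work
	close(stopCh)
	fmt.Println(stopRequested(stopCh)) // true: time to shut down
}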
