mirror of https://github.com/hashicorp/consul
Merge pull request #2911 from hashicorp/serial-leader-loop
Switches to reliable Raft leader notifications.pull/2912/head
commit
c59f8fba5e
|
@ -5,6 +5,7 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
|
@ -29,18 +30,29 @@ const (
|
||||||
// as the leader in the Raft cluster. There is some work the leader is
|
// as the leader in the Raft cluster. There is some work the leader is
|
||||||
// expected to do, so we must react to changes
|
// expected to do, so we must react to changes
|
||||||
func (s *Server) monitorLeadership() {
|
func (s *Server) monitorLeadership() {
|
||||||
leaderCh := s.raft.LeaderCh()
|
// We use the notify channel we configured Raft with, NOT Raft's
|
||||||
|
// leaderCh, which is only notified best-effort. Doing this ensures
|
||||||
|
// that we get all notifications in order, which is required for
|
||||||
|
// cleanup and to ensure we never run multiple leader loops.
|
||||||
|
leaderCh := s.leaderCh
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
var stopCh chan struct{}
|
var stopCh chan struct{}
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case isLeader := <-leaderCh:
|
case isLeader := <-leaderCh:
|
||||||
if isLeader {
|
if isLeader {
|
||||||
stopCh = make(chan struct{})
|
stopCh = make(chan struct{})
|
||||||
go s.leaderLoop(stopCh)
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
s.leaderLoop(stopCh)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
s.logger.Printf("[INFO] consul: cluster leadership acquired")
|
s.logger.Printf("[INFO] consul: cluster leadership acquired")
|
||||||
} else if stopCh != nil {
|
} else if stopCh != nil {
|
||||||
close(stopCh)
|
close(stopCh)
|
||||||
stopCh = nil
|
stopCh = nil
|
||||||
|
wg.Wait()
|
||||||
s.logger.Printf("[INFO] consul: cluster leadership lost")
|
s.logger.Printf("[INFO] consul: cluster leadership lost")
|
||||||
}
|
}
|
||||||
case <-s.shutdownCh:
|
case <-s.shutdownCh:
|
||||||
|
|
|
@ -132,6 +132,10 @@ type Server struct {
|
||||||
raftTransport *raft.NetworkTransport
|
raftTransport *raft.NetworkTransport
|
||||||
raftInmem *raft.InmemStore
|
raftInmem *raft.InmemStore
|
||||||
|
|
||||||
|
// leaderCh set up by setupRaft() and ensures that we get reliable leader
|
||||||
|
// transition notifications from the Raft layer.
|
||||||
|
leaderCh <-chan bool
|
||||||
|
|
||||||
// reconcileCh is used to pass events from the serf handler
|
// reconcileCh is used to pass events from the serf handler
|
||||||
// into the leader manager, so that the strong state can be
|
// into the leader manager, so that the strong state can be
|
||||||
// updated
|
// updated
|
||||||
|
@ -554,6 +558,11 @@ func (s *Server) setupRaft() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set up a channel for reliable leader notifications.
|
||||||
|
leaderCh := make(chan bool, 1)
|
||||||
|
s.config.RaftConfig.NotifyCh = leaderCh
|
||||||
|
s.leaderCh = leaderCh
|
||||||
|
|
||||||
// Setup the Raft store.
|
// Setup the Raft store.
|
||||||
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
|
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in New Issue