Integrates Consul with new version of Raft library.

pull/2222/head
James Phillips 2016-07-28 12:11:28 -07:00 committed by James Phillips
parent cc1f709333
commit b32578ccab
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
10 changed files with 258 additions and 181 deletions

View File

@ -223,8 +223,8 @@ func TestACL_NonAuthority_NotFound(t *testing.T) {
}
testutil.WaitForResult(func() (bool, error) {
p1, _ := s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ := s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
@ -275,8 +275,8 @@ func TestACL_NonAuthority_Found(t *testing.T) {
}
testutil.WaitForResult(func() (bool, error) {
p1, _ := s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ := s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
@ -351,8 +351,8 @@ func TestACL_NonAuthority_Management(t *testing.T) {
}
testutil.WaitForResult(func() (bool, error) {
p1, _ := s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ := s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
@ -408,8 +408,8 @@ func TestACL_DownPolicy_Deny(t *testing.T) {
}
testutil.WaitForResult(func() (bool, error) {
p1, _ := s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ := s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
@ -482,8 +482,8 @@ func TestACL_DownPolicy_Allow(t *testing.T) {
}
testutil.WaitForResult(func() (bool, error) {
p1, _ := s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ := s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
@ -558,8 +558,8 @@ func TestACL_DownPolicy_ExtendCache(t *testing.T) {
}
testutil.WaitForResult(func() (bool, error) {
p1, _ := s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ := s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})

View File

@ -310,6 +310,9 @@ func DefaultConfig() *Config {
conf.SerfLANConfig.MemberlistConfig.BindPort = DefaultLANSerfPort
conf.SerfWANConfig.MemberlistConfig.BindPort = DefaultWANSerfPort
// Enable interoperability with unversioned Raft library
conf.RaftConfig.ProtocolVersion = 0
// Disable shutdown on removal
conf.RaftConfig.ShutdownOnRemove = false

View File

@ -543,10 +543,25 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
}
}
// TODO (slackpad) - This will need to be changed once we support node IDs.
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
// See if it's already in the configuration. It's harmless to re-add it
// but we want to avoid doing that if possible.
configFuture := s.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
return err
}
for _, server := range configFuture.Configuration().Servers {
if server.Address == raft.ServerAddress(addr) {
return nil
}
}
// Attempt to add as a peer
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port}
future := s.raft.AddPeer(addr.String())
if err := future.Error(); err != nil && err != raft.ErrKnownPeer {
addFuture := s.raft.AddPeer(raft.ServerAddress(addr))
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
return err
}
@ -555,15 +570,30 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
// removeConsulServer is used to try to remove a consul server that has left
func (s *Server) removeConsulServer(m serf.Member, port int) error {
// Attempt to remove as peer
peer := &net.TCPAddr{IP: m.Addr, Port: port}
future := s.raft.RemovePeer(peer.String())
if err := future.Error(); err != nil && err != raft.ErrUnknownPeer {
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
peer, err)
// TODO (slackpad) - This will need to be changed once we support node IDs.
addr := (&net.TCPAddr{IP: m.Addr, Port: port}).String()
// See if it's already in the configuration. It's harmless to re-remove it
// but we want to avoid doing that if possible.
configFuture := s.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
return err
}
for _, server := range configFuture.Configuration().Servers {
if server.Address == raft.ServerAddress(addr) {
goto REMOVE
}
}
return nil
REMOVE:
// Attempt to remove as a peer.
future := s.raft.RemovePeer(raft.ServerAddress(addr))
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
addr, err)
return err
} else if err == nil {
s.logger.Printf("[INFO] consul: removed server '%s' as peer", m.Name)
}
return nil
}

View File

@ -341,8 +341,8 @@ func TestLeader_LeftServer(t *testing.T) {
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.raftPeers.Peers()
return len(peers) == 3, nil
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
@ -358,8 +358,8 @@ func TestLeader_LeftServer(t *testing.T) {
}
for _, s := range servers[1:] {
peers, _ := s.raftPeers.Peers()
return len(peers) == 2, errors.New(fmt.Sprintf("%v", peers))
peers, _ := s.numPeers()
return peers == 2, errors.New(fmt.Sprintf("%d", peers))
}
return true, nil
@ -394,8 +394,8 @@ func TestLeader_LeftLeader(t *testing.T) {
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.raftPeers.Peers()
return len(peers) == 3, nil
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
@ -423,8 +423,8 @@ func TestLeader_LeftLeader(t *testing.T) {
}
remain = s
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.raftPeers.Peers()
return len(peers) == 2, errors.New(fmt.Sprintf("%v", peers))
peers, _ := s.numPeers()
return peers == 2, errors.New(fmt.Sprintf("%d", peers))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
@ -472,8 +472,8 @@ func TestLeader_MultiBootstrap(t *testing.T) {
// Ensure we don't have multiple raft peers
for _, s := range servers {
peers, _ := s.raftPeers.Peers()
if len(peers) != 1 {
peers, _ := s.numPeers()
if peers != 1 {
t.Fatalf("should only have 1 raft peer!")
}
}
@ -505,8 +505,8 @@ func TestLeader_TombstoneGC_Reset(t *testing.T) {
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.raftPeers.Peers()
return len(peers) == 3, nil
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/raft"
)
// RaftLayer implements the raft.StreamLayer interface,
@ -80,8 +81,8 @@ func (l *RaftLayer) Addr() net.Addr {
}
// Dial is used to create a new outgoing connection
func (l *RaftLayer) Dial(address string, timeout time.Duration) (net.Conn, error) {
conn, err := net.DialTimeout("tcp", address, timeout)
func (l *RaftLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net.Conn, error) {
conn, err := net.DialTimeout("tcp", string(address), timeout)
if err != nil {
return nil, err
}

View File

@ -5,6 +5,7 @@ import (
"strings"
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)
@ -150,7 +151,7 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
// See if it's configured as part of our DC.
if parts.Datacenter == s.config.Datacenter {
s.localLock.Lock()
s.localConsuls[parts.Addr.String()] = parts
s.localConsuls[raft.ServerAddress(parts.Addr.String())] = parts
s.localLock.Unlock()
}
@ -193,20 +194,20 @@ func (s *Server) wanNodeJoin(me serf.MemberEvent) {
// maybeBootsrap is used to handle bootstrapping when a new consul server joins
func (s *Server) maybeBootstrap() {
// Bootstrap can only be done if there are no committed logs, remove our
// expectations of bootstrapping. This is slightly cheaper than the full
// check that BootstrapCluster will do, so this is a good pre-filter.
index, err := s.raftStore.LastIndex()
if err != nil {
s.logger.Printf("[ERR] consul: failed to read last raft index: %v", err)
return
}
// Bootstrap can only be done if there are no committed logs,
// remove our expectations of bootstrapping
if index != 0 {
s.config.BootstrapExpect = 0
return
}
// Scan for all the known servers
// Scan for all the known servers.
members := s.serfLAN.Members()
addrs := make([]string, 0)
for _, member := range members {
@ -230,18 +231,29 @@ func (s *Server) maybeBootstrap() {
addrs = append(addrs, addr.String())
}
// Skip if we haven't met the minimum expect count
// Skip if we haven't met the minimum expect count.
if len(addrs) < s.config.BootstrapExpect {
return
}
// Update the peer set
s.logger.Printf("[INFO] consul: Attempting bootstrap with nodes: %v", addrs)
if err := s.raft.SetPeers(addrs).Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to bootstrap peers: %v", err)
// Attempt a live bootstrap!
s.logger.Printf("[INFO] consul: found expected number of peers, attempting to bootstrap cluster...")
var configuration raft.Configuration
for _, addr := range addrs {
// TODO (slackpad) - This will need to be updated once we support
// node IDs.
server := raft.Server{
ID: raft.ServerID(addr),
Address: raft.ServerAddress(addr),
}
configuration.Servers = append(configuration.Servers, server)
}
future := s.raft.BootstrapCluster(configuration)
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to bootstrap cluster: %v", err)
}
// Bootstrapping complete, don't enter this again
// Bootstrapping complete, don't enter this again.
s.config.BootstrapExpect = 0
}
@ -255,7 +267,7 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) {
s.logger.Printf("[INFO] consul: removing LAN server %s", parts)
s.localLock.Lock()
delete(s.localConsuls, parts.Addr.String())
delete(s.localConsuls, raft.ServerAddress(parts.Addr.String()))
s.localLock.Unlock()
}
}

View File

@ -11,7 +11,6 @@ import (
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
"time"
@ -94,12 +93,9 @@ type Server struct {
// strong consistency.
fsm *consulFSM
// Have we attempted to leave the cluster
left bool
// localConsuls is used to track the known consuls
// in the local datacenter. Used to do leader forwarding.
localConsuls map[string]*agent.Server
localConsuls map[raft.ServerAddress]*agent.Server
localLock sync.RWMutex
// Logger uses the provided LogOutput
@ -109,7 +105,6 @@ type Server struct {
// DC to protect operations that require strong consistency
raft *raft.Raft
raftLayer *RaftLayer
raftPeers raft.PeerStore
raftStore *raftboltdb.BoltStore
raftTransport *raft.NetworkTransport
raftInmem *raft.InmemStore
@ -219,7 +214,7 @@ func NewServer(config *Config) (*Server, error) {
connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256),
localConsuls: make(map[string]*agent.Server),
localConsuls: make(map[raft.ServerAddress]*agent.Server),
logger: logger,
reconcileCh: make(chan serf.Member, 32),
remoteConsuls: make(map[string][]*agent.Server, 4),
@ -332,41 +327,43 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
// setupRaft is used to setup and initialize Raft
func (s *Server) setupRaft() error {
// If we are in bootstrap mode, enable a single node cluster
if s.config.Bootstrap || s.config.DevMode {
s.config.RaftConfig.EnableSingleNode = true
}
// If we have an unclean exit then attempt to close the Raft store.
defer func() {
if s.raft == nil && s.raftStore != nil {
if err := s.raftStore.Close(); err != nil {
s.logger.Printf("[ERR] consul: failed to close Raft store: %v", err)
}
}
}()
// Create the FSM
// Create the FSM.
var err error
s.fsm, err = NewFSM(s.tombstoneGC, s.config.LogOutput)
if err != nil {
return err
}
// Create a transport layer
// Create a transport layer.
trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second, s.config.LogOutput)
s.raftTransport = trans
var log raft.LogStore
var stable raft.StableStore
var snap raft.SnapshotStore
if s.config.DevMode {
store := raft.NewInmemStore()
s.raftInmem = store
stable = store
log = store
snap = raft.NewDiscardSnapshotStore()
s.raftPeers = &raft.StaticPeers{}
} else {
// Create the base raft path
// Create the base raft path.
path := filepath.Join(s.config.DataDir, raftState)
if err := ensurePath(path, true); err != nil {
return err
}
// Create the backend raft store for logs and stable storage
// Create the backend raft store for logs and stable storage.
store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
if err != nil {
return err
@ -374,55 +371,77 @@ func (s *Server) setupRaft() error {
s.raftStore = store
stable = store
// Wrap the store in a LogCache to improve performance
// Wrap the store in a LogCache to improve performance.
cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
if err != nil {
store.Close()
return err
}
log = cacheStore
// Create the snapshot store
// Create the snapshot store.
snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput)
if err != nil {
store.Close()
return err
}
snap = snapshots
// Setup the peer store
s.raftPeers = raft.NewJSONPeers(path, trans)
// If we see a peers.json file, attempt recovery based on it.
recovery, err := raft.NewPeersJSONRecovery(path)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("recovery failed to parse peers.json: %v", err)
}
if recovery != nil {
s.logger.Printf("[INFO] consul: found peers.json file, recovering Raft configuration...")
tmpFsm, err := NewFSM(s.tombstoneGC, s.config.LogOutput)
if err != nil {
return fmt.Errorf("recovery failed to make temp FSM: %v", err)
}
if err := raft.RecoverCluster(s.config.RaftConfig, tmpFsm,
log, stable, snap, recovery.Configuration); err != nil {
return fmt.Errorf("recovery failed: %v", err)
}
if err := recovery.Disarm(); err != nil {
return fmt.Errorf("recovery failed to delete peers.json, please delete manually: %v", err)
}
s.logger.Printf("[INFO] consul: deleted peers.json file after successful recovery")
}
}
// Ensure local host is always included if we are in bootstrap mode
if s.config.Bootstrap {
peerAddrs, err := s.raftPeers.Peers()
// If we are in bootstrap or dev mode and the state is clean then we can
// bootstrap now.
if s.config.Bootstrap || s.config.DevMode {
hasState, err := raft.HasExistingState(log, stable, snap)
if err != nil {
if s.raftStore != nil {
s.raftStore.Close()
}
return err
}
if !raft.PeerContained(peerAddrs, trans.LocalAddr()) {
s.raftPeers.SetPeers(raft.AddUniquePeer(peerAddrs, trans.LocalAddr()))
if !hasState {
// TODO (slackpad) - This will need to be updated when
// we add support for node IDs.
configuration := raft.Configuration{
Servers: []raft.Server{
raft.Server{
ID: raft.ServerID(trans.LocalAddr()),
Address: trans.LocalAddr(),
},
},
}
if err := raft.BootstrapCluster(s.config.RaftConfig,
log, stable, snap, trans, configuration); err != nil {
return err
}
}
}
// Make sure we set the LogOutput
// Make sure we set the LogOutput.
s.config.RaftConfig.LogOutput = s.config.LogOutput
// Setup the Raft store
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable,
snap, s.raftPeers, trans)
// Setup the Raft store.
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
if err != nil {
if s.raftStore != nil {
s.raftStore.Close()
}
trans.Close()
return err
}
// Start monitoring leadership
// Start monitoring leadership.
go s.monitorLeadership()
return nil
}
@ -516,11 +535,11 @@ func (s *Server) Shutdown() error {
s.raftStore.Close()
}
// Clear the peer set on a graceful leave to avoid
// triggering elections on a rejoin.
if s.left {
s.raftPeers.SetPeers(nil)
}
// TODO (slackpad) - We used to nerf the Raft configuration here
// if a leave had been done in order to prevent this from joining
// next time if we couldn't be removed after a leave. We wont't
// always get a confirmation from Raft (see comment in Leave). Are
// we losing anything by not doing this?
}
if s.rpcListener != nil {
@ -536,23 +555,26 @@ func (s *Server) Shutdown() error {
// Leave is used to prepare for a graceful shutdown of the server
func (s *Server) Leave() error {
s.logger.Printf("[INFO] consul: server starting leave")
s.left = true
// Check the number of known peers
numPeers, err := s.numOtherPeers()
numPeers, err := s.numPeers()
if err != nil {
s.logger.Printf("[ERR] consul: failed to check raft peers: %v", err)
return err
}
// TODO (slackpad) - This will need to be updated once we support node
// IDs.
addr := s.raftTransport.LocalAddr()
// If we are the current leader, and we have any other peers (cluster has multiple
// servers), we should do a RemovePeer to safely reduce the quorum size. If we are
// not the leader, then we should issue our leave intention and wait to be removed
// for some sane period of time.
isLeader := s.IsLeader()
if isLeader && numPeers > 0 {
future := s.raft.RemovePeer(s.raftTransport.LocalAddr())
if err := future.Error(); err != nil && err != raft.ErrUnknownPeer {
if isLeader && numPeers > 1 {
future := s.raft.RemovePeer(addr)
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to remove ourself as raft peer: %v", err)
}
}
@ -571,44 +593,54 @@ func (s *Server) Leave() error {
}
}
// If we were not leader, wait to be safely removed from the cluster.
// We must wait to allow the raft replication to take place, otherwise
// an immediate shutdown could cause a loss of quorum.
// If we were not leader, wait to be safely removed from the cluster. We
// must wait to allow the raft replication to take place, otherwise an
// immediate shutdown could cause a loss of quorum.
if !isLeader {
left := false
limit := time.Now().Add(raftRemoveGracePeriod)
for numPeers > 0 && time.Now().Before(limit) {
// Update the number of peers
numPeers, err = s.numOtherPeers()
if err != nil {
s.logger.Printf("[ERR] consul: failed to check raft peers: %v", err)
break
}
// Avoid the sleep if we are done
if numPeers == 0 {
break
}
// Sleep a while and check again
for !left && time.Now().Before(limit) {
// Sleep a while before we check.
time.Sleep(50 * time.Millisecond)
// Get the latest configuration.
future := s.raft.GetConfiguration()
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
break
}
// See if we are no longer included.
left = true
for _, server := range future.Configuration().Servers {
if server.Address == addr {
left = false
break
}
}
}
if numPeers != 0 {
s.logger.Printf("[WARN] consul: failed to leave raft peer set gracefully, timeout")
// TODO (slackpad) When we take a later new version of the Raft
// library it won't try to complete replication, so this peer
// may not realize that it has been removed. Need to revisit this
// and the warning here.
if !left {
s.logger.Printf("[WARN] consul: failed to leave raft configuration gracefully, timeout")
}
}
return nil
}
// numOtherPeers is used to check on the number of known peers
// excluding the local node
func (s *Server) numOtherPeers() (int, error) {
peers, err := s.raftPeers.Peers()
if err != nil {
// numPeers is used to check on the number of known peers, including the local
// node.
func (s *Server) numPeers() (int, error) {
future := s.raft.GetConfiguration()
if err := future.Error(); err != nil {
return 0, err
}
otherPeers := raft.ExcludePeer(peers, s.raftTransport.LocalAddr())
return len(otherPeers), nil
configuration := future.Configuration()
return len(configuration.Servers), nil
}
// JoinLAN is used to have Consul join the inner-DC pool
@ -738,7 +770,7 @@ func (s *Server) Stats() map[string]map[string]string {
"consul": map[string]string{
"server": "true",
"leader": fmt.Sprintf("%v", s.IsLeader()),
"leader_addr": s.raft.Leader(),
"leader_addr": string(s.raft.Leader()),
"bootstrap": fmt.Sprintf("%v", s.config.Bootstrap),
"known_datacenters": toString(uint64(numKnownDCs)),
},
@ -747,11 +779,6 @@ func (s *Server) Stats() map[string]map[string]string {
"serf_wan": s.serfWAN.Stats(),
"runtime": runtimeStats(),
}
if peers, err := s.raftPeers.Peers(); err == nil {
stats["raft"]["raft_peers"] = strings.Join(peers, ",")
} else {
s.logger.Printf("[DEBUG] server: error getting raft peers: %v", err)
}
return stats
}

View File

@ -336,19 +336,19 @@ func TestServer_LeaveLeader(t *testing.T) {
t.Fatalf("err: %v", err)
}
var p1 []string
var p2 []string
var p1 int
var p2 int
testutil.WaitForResult(func() (bool, error) {
p1, _ = s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ = s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
testutil.WaitForResult(func() (bool, error) {
p2, _ = s2.raftPeers.Peers()
return len(p2) == 2, errors.New(fmt.Sprintf("%v", p1))
p2, _ = s2.numPeers()
return p2 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
@ -366,8 +366,8 @@ func TestServer_LeaveLeader(t *testing.T) {
// Should lose a peer
for _, s := range []*Server{s1, s2} {
testutil.WaitForResult(func() (bool, error) {
p1, _ = s.raftPeers.Peers()
return len(p1) == 1, nil
p1, _ = s.numPeers()
return p1 == 1, nil
}, func(err error) {
t.Fatalf("should have 1 peer: %v", p1)
})
@ -391,19 +391,19 @@ func TestServer_Leave(t *testing.T) {
t.Fatalf("err: %v", err)
}
var p1 []string
var p2 []string
var p1 int
var p2 int
testutil.WaitForResult(func() (bool, error) {
p1, _ = s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ = s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
testutil.WaitForResult(func() (bool, error) {
p2, _ = s2.raftPeers.Peers()
return len(p2) == 2, errors.New(fmt.Sprintf("%v", p1))
p2, _ = s2.numPeers()
return p2 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
@ -421,8 +421,8 @@ func TestServer_Leave(t *testing.T) {
// Should lose a peer
for _, s := range []*Server{s1, s2} {
testutil.WaitForResult(func() (bool, error) {
p1, _ = s.raftPeers.Peers()
return len(p1) == 1, nil
p1, _ = s.numPeers()
return p1 == 1, nil
}, func(err error) {
t.Fatalf("should have 1 peer: %v", p1)
})
@ -486,13 +486,15 @@ func TestServer_JoinLAN_TLS(t *testing.T) {
// Verify Raft has established a peer
testutil.WaitForResult(func() (bool, error) {
return s1.Stats()["raft"]["num_peers"] == "1", nil
peers, _ := s1.numPeers()
return peers == 2, nil
}, func(err error) {
t.Fatalf("no peer established")
})
testutil.WaitForResult(func() (bool, error) {
return s2.Stats()["raft"]["num_peers"] == "1", nil
peers, _ := s2.numPeers()
return peers == 2, nil
}, func(err error) {
t.Fatalf("no peer established")
})
@ -519,20 +521,20 @@ func TestServer_Expect(t *testing.T) {
t.Fatalf("err: %v", err)
}
var p1 []string
var p2 []string
var p1 int
var p2 int
// should have no peers yet
testutil.WaitForResult(func() (bool, error) {
p1, _ = s1.raftPeers.Peers()
return len(p1) == 0, errors.New(fmt.Sprintf("%v", p1))
p1, _ = s1.numPeers()
return p1 == 0, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 0 peers: %v", err)
})
testutil.WaitForResult(func() (bool, error) {
p2, _ = s2.raftPeers.Peers()
return len(p2) == 0, errors.New(fmt.Sprintf("%v", p2))
p2, _ = s2.numPeers()
return p2 == 0, errors.New(fmt.Sprintf("%d", p2))
}, func(err error) {
t.Fatalf("should have 0 peers: %v", err)
})
@ -542,26 +544,26 @@ func TestServer_Expect(t *testing.T) {
t.Fatalf("err: %v", err)
}
var p3 []string
var p3 int
// should now have all three peers
testutil.WaitForResult(func() (bool, error) {
p1, _ = s1.raftPeers.Peers()
return len(p1) == 3, errors.New(fmt.Sprintf("%v", p1))
p1, _ = s1.numPeers()
return p1 == 3, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 3 peers: %v", err)
})
testutil.WaitForResult(func() (bool, error) {
p2, _ = s2.raftPeers.Peers()
return len(p2) == 3, errors.New(fmt.Sprintf("%v", p2))
p2, _ = s2.numPeers()
return p2 == 3, errors.New(fmt.Sprintf("%d", p2))
}, func(err error) {
t.Fatalf("should have 3 peers: %v", err)
})
testutil.WaitForResult(func() (bool, error) {
p3, _ = s3.raftPeers.Peers()
return len(p3) == 3, errors.New(fmt.Sprintf("%v", p3))
p3, _ = s3.numPeers()
return p3 == 3, errors.New(fmt.Sprintf("%d", p3))
}, func(err error) {
t.Fatalf("should have 3 peers: %v", err)
})
@ -593,20 +595,20 @@ func TestServer_BadExpect(t *testing.T) {
t.Fatalf("err: %v", err)
}
var p1 []string
var p2 []string
var p1 int
var p2 int
// should have no peers yet
testutil.WaitForResult(func() (bool, error) {
p1, _ = s1.raftPeers.Peers()
return len(p1) == 0, errors.New(fmt.Sprintf("%v", p1))
p1, _ = s1.numPeers()
return p1 == 0, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 0 peers: %v", err)
})
testutil.WaitForResult(func() (bool, error) {
p2, _ = s2.raftPeers.Peers()
return len(p2) == 0, errors.New(fmt.Sprintf("%v", p2))
p2, _ = s2.numPeers()
return p2 == 0, errors.New(fmt.Sprintf("%d", p2))
}, func(err error) {
t.Fatalf("should have 0 peers: %v", err)
})
@ -616,26 +618,26 @@ func TestServer_BadExpect(t *testing.T) {
t.Fatalf("err: %v", err)
}
var p3 []string
var p3 int
// should still have no peers (because s2 is in expect=2 mode)
testutil.WaitForResult(func() (bool, error) {
p1, _ = s1.raftPeers.Peers()
return len(p1) == 0, errors.New(fmt.Sprintf("%v", p1))
p1, _ = s1.numPeers()
return p1 == 0, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 0 peers: %v", err)
})
testutil.WaitForResult(func() (bool, error) {
p2, _ = s2.raftPeers.Peers()
return len(p2) == 0, errors.New(fmt.Sprintf("%v", p2))
p2, _ = s2.numPeers()
return p2 == 0, errors.New(fmt.Sprintf("%d", p2))
}, func(err error) {
t.Fatalf("should have 0 peers: %v", err)
})
testutil.WaitForResult(func() (bool, error) {
p3, _ = s3.raftPeers.Peers()
return len(p3) == 0, errors.New(fmt.Sprintf("%v", p3))
p3, _ = s3.numPeers()
return p3 == 0, errors.New(fmt.Sprintf("%d", p3))
}, func(err error) {
t.Fatalf("should have 0 peers: %v", err)
})

View File

@ -297,8 +297,8 @@ func TestServer_SessionTTL_Failover(t *testing.T) {
}
testutil.WaitForResult(func() (bool, error) {
peers, _ := s1.raftPeers.Peers()
return len(peers) == 3, nil
peers, _ := s1.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})

View File

@ -12,7 +12,7 @@ func (s *Status) Ping(args struct{}, reply *struct{}) error {
// Leader is used to get the address of the leader
func (s *Status) Leader(args struct{}, reply *string) error {
leader := s.server.raft.Leader()
leader := string(s.server.raft.Leader())
if leader != "" {
*reply = leader
} else {
@ -23,11 +23,13 @@ func (s *Status) Leader(args struct{}, reply *string) error {
// Peers is used to get all the Raft peers
func (s *Status) Peers(args struct{}, reply *[]string) error {
peers, err := s.server.raftPeers.Peers()
if err != nil {
future := s.server.raft.GetConfiguration()
if err := future.Error(); err != nil {
return err
}
*reply = peers
for _, server := range future.Configuration().Servers {
*reply = append(*reply, string(server.Address))
}
return nil
}