diff --git a/consul/acl_test.go b/consul/acl_test.go
index c738d7e3fe..1a74564736 100644
--- a/consul/acl_test.go
+++ b/consul/acl_test.go
@@ -223,8 +223,8 @@ func TestACL_NonAuthority_NotFound(t *testing.T) {
 	}
 
 	testutil.WaitForResult(func() (bool, error) {
-		p1, _ := s1.raftPeers.Peers()
-		return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
+		p1, _ := s1.numPeers()
+		return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
 	}, func(err error) {
 		t.Fatalf("should have 2 peers: %v", err)
 	})
@@ -275,8 +275,8 @@ func TestACL_NonAuthority_Found(t *testing.T) {
 	}
 
 	testutil.WaitForResult(func() (bool, error) {
-		p1, _ := s1.raftPeers.Peers()
-		return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
+		p1, _ := s1.numPeers()
+		return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
 	}, func(err error) {
 		t.Fatalf("should have 2 peers: %v", err)
 	})
@@ -351,8 +351,8 @@ func TestACL_NonAuthority_Management(t *testing.T) {
 	}
 
 	testutil.WaitForResult(func() (bool, error) {
-		p1, _ := s1.raftPeers.Peers()
-		return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
+		p1, _ := s1.numPeers()
+		return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
 	}, func(err error) {
 		t.Fatalf("should have 2 peers: %v", err)
 	})
@@ -408,8 +408,8 @@ func TestACL_DownPolicy_Deny(t *testing.T) {
 	}
 
 	testutil.WaitForResult(func() (bool, error) {
-		p1, _ := s1.raftPeers.Peers()
-		return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
+		p1, _ := s1.numPeers()
+		return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
 	}, func(err error) {
 		t.Fatalf("should have 2 peers: %v", err)
 	})
@@ -482,8 +482,8 @@ func TestACL_DownPolicy_Allow(t *testing.T) {
 	}
 
 	testutil.WaitForResult(func() (bool, error) {
-		p1, _ := s1.raftPeers.Peers()
-		return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
+		p1, _ := s1.numPeers()
+		return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
 	}, func(err error) {
 		t.Fatalf("should have 2 peers: %v", err)
 	})
@@ -558,8 +558,8 @@ func TestACL_DownPolicy_ExtendCache(t *testing.T) {
 	}
 
 	testutil.WaitForResult(func() (bool, error) {
-		p1, _ := s1.raftPeers.Peers()
-		return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
+		p1, _ := s1.numPeers()
+		return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
 	}, func(err error) {
 		t.Fatalf("should have 2 peers: %v", err)
 	})
diff --git a/consul/config.go b/consul/config.go
index 8e252b6351..5168ab573c 100644
--- a/consul/config.go
+++ b/consul/config.go
@@ -310,6 +310,9 @@ func DefaultConfig() *Config {
 	conf.SerfLANConfig.MemberlistConfig.BindPort = DefaultLANSerfPort
 	conf.SerfWANConfig.MemberlistConfig.BindPort = DefaultWANSerfPort
 
+	// Enable interoperability with unversioned Raft library
+	conf.RaftConfig.ProtocolVersion = 0
+
 	// Disable shutdown on removal
 	conf.RaftConfig.ShutdownOnRemove = false
 
diff --git a/consul/leader.go b/consul/leader.go
index 790c3c330a..0d4b8c4f42 100644
--- a/consul/leader.go
+++ b/consul/leader.go
@@ -543,10 +543,25 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
 		}
 	}
 
+	// TODO (slackpad) - This will need to be changed once we support node IDs.
+	addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
+
+	// See if it's already in the configuration. It's harmless to re-add it
+	// but we want to avoid doing that if possible.
+	configFuture := s.raft.GetConfiguration()
+	if err := configFuture.Error(); err != nil {
+		s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
+		return err
+	}
+	for _, server := range configFuture.Configuration().Servers {
+		if server.Address == raft.ServerAddress(addr) {
+			return nil
+		}
+	}
+
 	// Attempt to add as a peer
-	var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port}
-	future := s.raft.AddPeer(addr.String())
-	if err := future.Error(); err != nil && err != raft.ErrKnownPeer {
+	addFuture := s.raft.AddPeer(raft.ServerAddress(addr))
+	if err := addFuture.Error(); err != nil {
 		s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
 		return err
 	}
@@ -555,15 +570,30 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
 
 // removeConsulServer is used to try to remove a consul server that has left
 func (s *Server) removeConsulServer(m serf.Member, port int) error {
-	// Attempt to remove as peer
-	peer := &net.TCPAddr{IP: m.Addr, Port: port}
-	future := s.raft.RemovePeer(peer.String())
-	if err := future.Error(); err != nil && err != raft.ErrUnknownPeer {
-		s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
-			peer, err)
+	// TODO (slackpad) - This will need to be changed once we support node IDs.
+	addr := (&net.TCPAddr{IP: m.Addr, Port: port}).String()
+
+	// See if it's already in the configuration. It's harmless to re-remove it
+	// but we want to avoid doing that if possible.
+	configFuture := s.raft.GetConfiguration()
+	if err := configFuture.Error(); err != nil {
+		s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
+		return err
+	}
+	for _, server := range configFuture.Configuration().Servers {
+		if server.Address == raft.ServerAddress(addr) {
+			goto REMOVE
+		}
+	}
+	return nil
+
+REMOVE:
+	// Attempt to remove as a peer.
+	future := s.raft.RemovePeer(raft.ServerAddress(addr))
+	if err := future.Error(); err != nil {
+		s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
+			addr, err)
 		return err
-	} else if err == nil {
-		s.logger.Printf("[INFO] consul: removed server '%s' as peer", m.Name)
 	}
 	return nil
 }
diff --git a/consul/leader_test.go b/consul/leader_test.go
index 900f39617d..6e0f6d5f33 100644
--- a/consul/leader_test.go
+++ b/consul/leader_test.go
@@ -341,8 +341,8 @@ func TestLeader_LeftServer(t *testing.T) {
 
 	for _, s := range servers {
 		testutil.WaitForResult(func() (bool, error) {
-			peers, _ := s.raftPeers.Peers()
-			return len(peers) == 3, nil
+			peers, _ := s.numPeers()
+			return peers == 3, nil
 		}, func(err error) {
 			t.Fatalf("should have 3 peers")
 		})
@@ -358,8 +358,8 @@ func TestLeader_LeftServer(t *testing.T) {
 		}
 
 		for _, s := range servers[1:] {
-			peers, _ := s.raftPeers.Peers()
-			return len(peers) == 2, errors.New(fmt.Sprintf("%v", peers))
+			peers, _ := s.numPeers()
+			return peers == 2, errors.New(fmt.Sprintf("%d", peers))
 		}
 
 		return true, nil
@@ -394,8 +394,8 @@ func TestLeader_LeftLeader(t *testing.T) {
 
 	for _, s := range servers {
 		testutil.WaitForResult(func() (bool, error) {
-			peers, _ := s.raftPeers.Peers()
-			return len(peers) == 3, nil
+			peers, _ := s.numPeers()
+			return peers == 3, nil
 		}, func(err error) {
 			t.Fatalf("should have 3 peers")
 		})
@@ -423,8 +423,8 @@ func TestLeader_LeftLeader(t *testing.T) {
 		}
 		remain = s
 		testutil.WaitForResult(func() (bool, error) {
-			peers, _ := s.raftPeers.Peers()
-			return len(peers) == 2, errors.New(fmt.Sprintf("%v", peers))
+			peers, _ := s.numPeers()
+			return peers == 2, errors.New(fmt.Sprintf("%d", peers))
 		}, func(err error) {
 			t.Fatalf("should have 2 peers: %v", err)
 		})
@@ -472,8 +472,8 @@ func TestLeader_MultiBootstrap(t *testing.T) {
 
 	// Ensure we don't have multiple raft peers
 	for _, s := range servers {
-		peers, _ := s.raftPeers.Peers()
-		if len(peers) != 1 {
+		peers, _ := s.numPeers()
+		if peers != 1 {
 			t.Fatalf("should only have 1 raft peer!")
 		}
 	}
@@ -505,8 +505,8 @@ func TestLeader_TombstoneGC_Reset(t *testing.T) {
 
 	for _, s := range servers {
 		testutil.WaitForResult(func() (bool, error) {
-			peers, _ := s.raftPeers.Peers()
-			return len(peers) == 3, nil
+			peers, _ := s.numPeers()
+			return peers == 3, nil
 		}, func(err error) {
 			t.Fatalf("should have 3 peers")
 		})
diff --git a/consul/raft_rpc.go b/consul/raft_rpc.go
index 545895e195..2c2a6f82ce 100644
--- a/consul/raft_rpc.go
+++ b/consul/raft_rpc.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/hashicorp/consul/tlsutil"
+	"github.com/hashicorp/raft"
 )
 
 // RaftLayer implements the raft.StreamLayer interface,
@@ -80,8 +81,8 @@ func (l *RaftLayer) Addr() net.Addr {
 }
 
 // Dial is used to create a new outgoing connection
-func (l *RaftLayer) Dial(address string, timeout time.Duration) (net.Conn, error) {
-	conn, err := net.DialTimeout("tcp", address, timeout)
+func (l *RaftLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net.Conn, error) {
+	conn, err := net.DialTimeout("tcp", string(address), timeout)
 	if err != nil {
 		return nil, err
 	}
diff --git a/consul/serf.go b/consul/serf.go
index 1fc4894001..a94efcc270 100644
--- a/consul/serf.go
+++ b/consul/serf.go
@@ -5,6 +5,7 @@ import (
 	"strings"
 
 	"github.com/hashicorp/consul/consul/agent"
+	"github.com/hashicorp/raft"
 	"github.com/hashicorp/serf/serf"
 )
 
@@ -150,7 +151,7 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
 		// See if it's configured as part of our DC.
 		if parts.Datacenter == s.config.Datacenter {
 			s.localLock.Lock()
-			s.localConsuls[parts.Addr.String()] = parts
+			s.localConsuls[raft.ServerAddress(parts.Addr.String())] = parts
 			s.localLock.Unlock()
 		}
 
@@ -193,20 +194,20 @@ func (s *Server) wanNodeJoin(me serf.MemberEvent) {
 
 // maybeBootsrap is used to handle bootstrapping when a new consul server joins
 func (s *Server) maybeBootstrap() {
+	// Bootstrap can only be done if there are no committed logs, remove our
+	// expectations of bootstrapping. This is slightly cheaper than the full
+	// check that BootstrapCluster will do, so this is a good pre-filter.
 	index, err := s.raftStore.LastIndex()
 	if err != nil {
 		s.logger.Printf("[ERR] consul: failed to read last raft index: %v", err)
 		return
 	}
-
-	// Bootstrap can only be done if there are no committed logs,
-	// remove our expectations of bootstrapping
 	if index != 0 {
 		s.config.BootstrapExpect = 0
 		return
 	}
 
-	// Scan for all the known servers
+	// Scan for all the known servers.
 	members := s.serfLAN.Members()
 	addrs := make([]string, 0)
 	for _, member := range members {
@@ -230,18 +231,29 @@ func (s *Server) maybeBootstrap() {
 		addrs = append(addrs, addr.String())
 	}
 
-	// Skip if we haven't met the minimum expect count
+	// Skip if we haven't met the minimum expect count.
 	if len(addrs) < s.config.BootstrapExpect {
 		return
 	}
 
-	// Update the peer set
-	s.logger.Printf("[INFO] consul: Attempting bootstrap with nodes: %v", addrs)
-	if err := s.raft.SetPeers(addrs).Error(); err != nil {
-		s.logger.Printf("[ERR] consul: failed to bootstrap peers: %v", err)
+	// Attempt a live bootstrap!
+	s.logger.Printf("[INFO] consul: found expected number of peers, attempting to bootstrap cluster...")
+	var configuration raft.Configuration
+	for _, addr := range addrs {
+		// TODO (slackpad) - This will need to be updated once we support
+		// node IDs.
+		server := raft.Server{
+			ID:      raft.ServerID(addr),
+			Address: raft.ServerAddress(addr),
+		}
+		configuration.Servers = append(configuration.Servers, server)
+	}
+	future := s.raft.BootstrapCluster(configuration)
+	if err := future.Error(); err != nil {
+		s.logger.Printf("[ERR] consul: failed to bootstrap cluster: %v", err)
 	}
 
-	// Bootstrapping complete, don't enter this again
+	// Bootstrapping complete, don't enter this again.
 	s.config.BootstrapExpect = 0
 }
 
@@ -255,7 +267,7 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) {
 		s.logger.Printf("[INFO] consul: removing LAN server %s", parts)
 
 		s.localLock.Lock()
-		delete(s.localConsuls, parts.Addr.String())
+		delete(s.localConsuls, raft.ServerAddress(parts.Addr.String()))
 		s.localLock.Unlock()
 	}
 }
diff --git a/consul/server.go b/consul/server.go
index 4138e2cc10..c9b3c52204 100644
--- a/consul/server.go
+++ b/consul/server.go
@@ -11,7 +11,6 @@ import (
 	"path/filepath"
 	"reflect"
 	"strconv"
-	"strings"
 	"sync"
 	"time"
 
@@ -94,12 +93,9 @@ type Server struct {
 	// strong consistency.
 	fsm *consulFSM
 
-	// Have we attempted to leave the cluster
-	left bool
-
 	// localConsuls is used to track the known consuls
 	// in the local datacenter. Used to do leader forwarding.
-	localConsuls map[string]*agent.Server
+	localConsuls map[raft.ServerAddress]*agent.Server
 	localLock    sync.RWMutex
 
 	// Logger uses the provided LogOutput
@@ -109,7 +105,6 @@ type Server struct {
 	// DC to protect operations that require strong consistency
 	raft          *raft.Raft
 	raftLayer     *RaftLayer
-	raftPeers     raft.PeerStore
 	raftStore     *raftboltdb.BoltStore
 	raftTransport *raft.NetworkTransport
 	raftInmem     *raft.InmemStore
@@ -219,7 +214,7 @@ func NewServer(config *Config) (*Server, error) {
 		connPool:      NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
 		eventChLAN:    make(chan serf.Event, 256),
 		eventChWAN:    make(chan serf.Event, 256),
-		localConsuls:  make(map[string]*agent.Server),
+		localConsuls:  make(map[raft.ServerAddress]*agent.Server),
 		logger:        logger,
 		reconcileCh:   make(chan serf.Member, 32),
 		remoteConsuls: make(map[string][]*agent.Server, 4),
@@ -332,41 +327,43 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
 
 // setupRaft is used to setup and initialize Raft
 func (s *Server) setupRaft() error {
-	// If we are in bootstrap mode, enable a single node cluster
-	if s.config.Bootstrap || s.config.DevMode {
-		s.config.RaftConfig.EnableSingleNode = true
-	}
+	// If we have an unclean exit then attempt to close the Raft store.
+	defer func() {
+		if s.raft == nil && s.raftStore != nil {
+			if err := s.raftStore.Close(); err != nil {
+				s.logger.Printf("[ERR] consul: failed to close Raft store: %v", err)
+			}
+		}
+	}()
 
-	// Create the FSM
+	// Create the FSM.
 	var err error
 	s.fsm, err = NewFSM(s.tombstoneGC, s.config.LogOutput)
 	if err != nil {
 		return err
 	}
 
-	// Create a transport layer
+	// Create a transport layer.
 	trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second, s.config.LogOutput)
 	s.raftTransport = trans
 
 	var log raft.LogStore
 	var stable raft.StableStore
 	var snap raft.SnapshotStore
-
 	if s.config.DevMode {
 		store := raft.NewInmemStore()
 		s.raftInmem = store
 		stable = store
 		log = store
 		snap = raft.NewDiscardSnapshotStore()
-		s.raftPeers = &raft.StaticPeers{}
 	} else {
-		// Create the base raft path
+		// Create the base raft path.
 		path := filepath.Join(s.config.DataDir, raftState)
 		if err := ensurePath(path, true); err != nil {
 			return err
 		}
 
-		// Create the backend raft store for logs and stable storage
+		// Create the backend raft store for logs and stable storage.
 		store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
 		if err != nil {
 			return err
@@ -374,55 +371,77 @@ func (s *Server) setupRaft() error {
 		s.raftStore = store
 		stable = store
 
-		// Wrap the store in a LogCache to improve performance
+		// Wrap the store in a LogCache to improve performance.
 		cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
 		if err != nil {
-			store.Close()
 			return err
 		}
 		log = cacheStore
 
-		// Create the snapshot store
+		// Create the snapshot store.
 		snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput)
 		if err != nil {
-			store.Close()
 			return err
 		}
 		snap = snapshots
 
-		// Setup the peer store
-		s.raftPeers = raft.NewJSONPeers(path, trans)
+		// If we see a peers.json file, attempt recovery based on it.
+		recovery, err := raft.NewPeersJSONRecovery(path)
+		if err != nil && !os.IsNotExist(err) {
+			return fmt.Errorf("recovery failed to parse peers.json: %v", err)
+		}
+		if recovery != nil {
+			s.logger.Printf("[INFO] consul: found peers.json file, recovering Raft configuration...")
+			tmpFsm, err := NewFSM(s.tombstoneGC, s.config.LogOutput)
+			if err != nil {
+				return fmt.Errorf("recovery failed to make temp FSM: %v", err)
+			}
+			if err := raft.RecoverCluster(s.config.RaftConfig, tmpFsm,
+				log, stable, snap, recovery.Configuration); err != nil {
+				return fmt.Errorf("recovery failed: %v", err)
+			}
+			if err := recovery.Disarm(); err != nil {
+				return fmt.Errorf("recovery failed to delete peers.json, please delete manually: %v", err)
+			}
+			s.logger.Printf("[INFO] consul: deleted peers.json file after successful recovery")
+		}
 	}
 
-	// Ensure local host is always included if we are in bootstrap mode
-	if s.config.Bootstrap {
-		peerAddrs, err := s.raftPeers.Peers()
+	// If we are in bootstrap or dev mode and the state is clean then we can
+	// bootstrap now.
+	if s.config.Bootstrap || s.config.DevMode {
+		hasState, err := raft.HasExistingState(log, stable, snap)
 		if err != nil {
-			if s.raftStore != nil {
-				s.raftStore.Close()
-			}
 			return err
 		}
-		if !raft.PeerContained(peerAddrs, trans.LocalAddr()) {
-			s.raftPeers.SetPeers(raft.AddUniquePeer(peerAddrs, trans.LocalAddr()))
+		if !hasState {
+			// TODO (slackpad) - This will need to be updated when
+			// we add support for node IDs.
+			configuration := raft.Configuration{
+				Servers: []raft.Server{
+					raft.Server{
+						ID:      raft.ServerID(trans.LocalAddr()),
+						Address: trans.LocalAddr(),
+					},
+				},
+			}
+			if err := raft.BootstrapCluster(s.config.RaftConfig,
+				log, stable, snap, trans, configuration); err != nil {
+				return err
+			}
 		}
 	}
 
-	// Make sure we set the LogOutput
+	// Make sure we set the LogOutput.
 	s.config.RaftConfig.LogOutput = s.config.LogOutput
 
-	// Setup the Raft store
-	s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable,
-		snap, s.raftPeers, trans)
+	// Setup the Raft store.
+	s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
 	if err != nil {
-		if s.raftStore != nil {
-			s.raftStore.Close()
-		}
-		trans.Close()
 		return err
 	}
 
-	// Start monitoring leadership
+	// Start monitoring leadership.
 	go s.monitorLeadership()
 	return nil
 }
@@ -516,11 +535,11 @@ func (s *Server) Shutdown() error {
 			s.raftStore.Close()
 		}
 
-		// Clear the peer set on a graceful leave to avoid
-		// triggering elections on a rejoin.
-		if s.left {
-			s.raftPeers.SetPeers(nil)
-		}
+		// TODO (slackpad) - We used to nerf the Raft configuration here
+		// if a leave had been done in order to prevent this from joining
+		// next time if we couldn't be removed after a leave. We won't
+		// always get a confirmation from Raft (see comment in Leave). Are
+		// we losing anything by not doing this?
 	}
 
 	if s.rpcListener != nil {
@@ -536,23 +555,26 @@ func (s *Server) Shutdown() error {
 // Leave is used to prepare for a graceful shutdown of the server
 func (s *Server) Leave() error {
 	s.logger.Printf("[INFO] consul: server starting leave")
-	s.left = true
 
 	// Check the number of known peers
-	numPeers, err := s.numOtherPeers()
+	numPeers, err := s.numPeers()
 	if err != nil {
 		s.logger.Printf("[ERR] consul: failed to check raft peers: %v", err)
 		return err
 	}
 
+	// TODO (slackpad) - This will need to be updated once we support node
+	// IDs.
+	addr := s.raftTransport.LocalAddr()
+
 	// If we are the current leader, and we have any other peers (cluster has multiple
 	// servers), we should do a RemovePeer to safely reduce the quorum size. If we are
 	// not the leader, then we should issue our leave intention and wait to be removed
 	// for some sane period of time.
 	isLeader := s.IsLeader()
-	if isLeader && numPeers > 0 {
-		future := s.raft.RemovePeer(s.raftTransport.LocalAddr())
-		if err := future.Error(); err != nil && err != raft.ErrUnknownPeer {
+	if isLeader && numPeers > 1 {
+		future := s.raft.RemovePeer(addr)
+		if err := future.Error(); err != nil {
 			s.logger.Printf("[ERR] consul: failed to remove ourself as raft peer: %v", err)
 		}
 	}
@@ -571,44 +593,54 @@ func (s *Server) Leave() error {
 		}
 	}
 
-	// If we were not leader, wait to be safely removed from the cluster.
-	// We must wait to allow the raft replication to take place, otherwise
-	// an immediate shutdown could cause a loss of quorum.
+	// If we were not leader, wait to be safely removed from the cluster. We
+	// must wait to allow the raft replication to take place, otherwise an
+	// immediate shutdown could cause a loss of quorum.
 	if !isLeader {
+		left := false
 		limit := time.Now().Add(raftRemoveGracePeriod)
-		for numPeers > 0 && time.Now().Before(limit) {
-			// Update the number of peers
-			numPeers, err = s.numOtherPeers()
-			if err != nil {
-				s.logger.Printf("[ERR] consul: failed to check raft peers: %v", err)
-				break
-			}
-
-			// Avoid the sleep if we are done
-			if numPeers == 0 {
-				break
-			}
-
-			// Sleep a while and check again
+		for !left && time.Now().Before(limit) {
+			// Sleep a while before we check.
 			time.Sleep(50 * time.Millisecond)
+
+			// Get the latest configuration.
+			future := s.raft.GetConfiguration()
+			if err := future.Error(); err != nil {
+				s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
+				break
+			}
+
+			// See if we are no longer included.
+			left = true
+			for _, server := range future.Configuration().Servers {
+				if server.Address == addr {
+					left = false
+					break
+				}
+			}
 		}
-		if numPeers != 0 {
-			s.logger.Printf("[WARN] consul: failed to leave raft peer set gracefully, timeout")
+
+		// TODO (slackpad) When we take a later new version of the Raft
+		// library it won't try to complete replication, so this peer
+		// may not realize that it has been removed. Need to revisit this
+		// and the warning here.
+		if !left {
+			s.logger.Printf("[WARN] consul: failed to leave raft configuration gracefully, timeout")
 		}
 	}
 
 	return nil
 }
 
-// numOtherPeers is used to check on the number of known peers
-// excluding the local node
-func (s *Server) numOtherPeers() (int, error) {
-	peers, err := s.raftPeers.Peers()
-	if err != nil {
+// numPeers is used to check on the number of known peers, including the local
+// node.
+func (s *Server) numPeers() (int, error) {
+	future := s.raft.GetConfiguration()
+	if err := future.Error(); err != nil {
 		return 0, err
 	}
-	otherPeers := raft.ExcludePeer(peers, s.raftTransport.LocalAddr())
-	return len(otherPeers), nil
+	configuration := future.Configuration()
+	return len(configuration.Servers), nil
 }
 
 // JoinLAN is used to have Consul join the inner-DC pool
@@ -738,7 +770,7 @@ func (s *Server) Stats() map[string]map[string]string {
 		"consul": map[string]string{
 			"server": "true",
 			"leader": fmt.Sprintf("%v", s.IsLeader()),
-			"leader_addr": s.raft.Leader(),
+			"leader_addr": string(s.raft.Leader()),
 			"bootstrap": fmt.Sprintf("%v", s.config.Bootstrap),
 			"known_datacenters": toString(uint64(numKnownDCs)),
 		},
@@ -747,11 +779,6 @@ func (s *Server) Stats() map[string]map[string]string {
 		"serf_wan": s.serfWAN.Stats(),
 		"runtime":  runtimeStats(),
 	}
-	if peers, err := s.raftPeers.Peers(); err == nil {
-		stats["raft"]["raft_peers"] = strings.Join(peers, ",")
-	} else {
-		s.logger.Printf("[DEBUG] server: error getting raft peers: %v", err)
-	}
 	return stats
 }
 
diff --git a/consul/server_test.go b/consul/server_test.go
index a4a3887dc4..a8d65d36fa 100644
--- a/consul/server_test.go
+++ b/consul/server_test.go
@@ -336,19 +336,19 @@ func TestServer_LeaveLeader(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}
 
-	var p1 []string
-	var p2 []string
+	var p1 int
+	var p2 int
 
 	testutil.WaitForResult(func() (bool, error) {
-		p1, _ = s1.raftPeers.Peers()
-		return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
+		p1, _ = s1.numPeers()
+		return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
 	}, func(err error) {
 		t.Fatalf("should have 2 peers: %v", err)
 	})
 
 	testutil.WaitForResult(func() (bool, error) {
-		p2, _ = s2.raftPeers.Peers()
-		return len(p2) == 2, errors.New(fmt.Sprintf("%v", p1))
+		p2, _ = s2.numPeers()
+		return p2 == 2, errors.New(fmt.Sprintf("%d", p1))
 	}, func(err error) {
 		t.Fatalf("should have 2 peers: %v", err)
 	})
@@ -366,8 +366,8 @@ func TestServer_LeaveLeader(t *testing.T) {
 	// Should lose a peer
 	for _, s := range []*Server{s1, s2} {
 		testutil.WaitForResult(func() (bool, error) {
-			p1, _ = s.raftPeers.Peers()
-			return len(p1) == 1, nil
+			p1, _ = s.numPeers()
+			return p1 == 1, nil
 		}, func(err error) {
 			t.Fatalf("should have 1 peer: %v", p1)
 		})
@@ -391,19 +391,19 @@ func TestServer_Leave(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}
 
-	var p1 []string
-	var p2 []string
+	var p1 int
+	var p2 int
 
 	testutil.WaitForResult(func() (bool, error) {
-		p1, _ = s1.raftPeers.Peers()
-		return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
+		p1, _ = s1.numPeers()
+		return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
 	}, func(err error) {
 		t.Fatalf("should have 2 peers: %v", err)
 	})
 
 	testutil.WaitForResult(func() (bool, error) {
-		p2, _ = s2.raftPeers.Peers()
-		return len(p2) == 2, errors.New(fmt.Sprintf("%v", p1))
+		p2, _ = s2.numPeers()
+		return p2 == 2, errors.New(fmt.Sprintf("%d", p1))
 	}, func(err error) {
 		t.Fatalf("should have 2 peers: %v", err)
 	})
@@ -421,8 +421,8 @@ func TestServer_Leave(t *testing.T) {
 	// Should lose a peer
 	for _, s := range []*Server{s1, s2} {
 		testutil.WaitForResult(func() (bool, error) {
-			p1, _ = s.raftPeers.Peers()
-			return len(p1) == 1, nil
+			p1, _ = s.numPeers()
+			return p1 == 1, nil
 		}, func(err error) {
 			t.Fatalf("should have 1 peer: %v", p1)
 		})
@@ -486,13 +486,15 @@ func TestServer_JoinLAN_TLS(t *testing.T) {
 
 	// Verify Raft has established a peer
 	testutil.WaitForResult(func() (bool, error) {
-		return s1.Stats()["raft"]["num_peers"] == "1", nil
+		peers, _ := s1.numPeers()
+		return peers == 2, nil
 	}, func(err error) {
 		t.Fatalf("no peer established")
 	})
 
 	testutil.WaitForResult(func() (bool, error) {
-		return s2.Stats()["raft"]["num_peers"] == "1", nil
+		peers, _ := s2.numPeers()
+		return peers == 2, nil
 	}, func(err error) {
 		t.Fatalf("no peer established")
 	})
@@ -519,20 +521,20 @@ func TestServer_Expect(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}
 
-	var p1 []string
-	var p2 []string
+	var p1 int
+	var p2 int
 
 	// should have no peers yet
 	testutil.WaitForResult(func() (bool, error) {
-		p1, _ = s1.raftPeers.Peers()
-		return len(p1) == 0, errors.New(fmt.Sprintf("%v", p1))
+		p1, _ = s1.numPeers()
+		return p1 == 0, errors.New(fmt.Sprintf("%d", p1))
 	}, func(err error) {
 		t.Fatalf("should have 0 peers: %v", err)
 	})
 
 	testutil.WaitForResult(func() (bool, error) {
-		p2, _ = s2.raftPeers.Peers()
-		return len(p2) == 0, errors.New(fmt.Sprintf("%v", p2))
+		p2, _ = s2.numPeers()
+		return p2 == 0, errors.New(fmt.Sprintf("%d", p2))
 	}, func(err error) {
 		t.Fatalf("should have 0 peers: %v", err)
 	})
@@ -542,26 +544,26 @@ func TestServer_Expect(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}
 
-	var p3 []string
+	var p3 int
 
 	// should now have all three peers
 	testutil.WaitForResult(func() (bool, error) {
-		p1, _ = s1.raftPeers.Peers()
-		return len(p1) == 3, errors.New(fmt.Sprintf("%v", p1))
+		p1, _ = s1.numPeers()
+		return p1 == 3, errors.New(fmt.Sprintf("%d", p1))
 	}, func(err error) {
 		t.Fatalf("should have 3 peers: %v", err)
 	})
 
 	testutil.WaitForResult(func() (bool, error) {
-		p2, _ = s2.raftPeers.Peers()
-		return len(p2) == 3, errors.New(fmt.Sprintf("%v", p2))
+		p2, _ = s2.numPeers()
+		return p2 == 3, errors.New(fmt.Sprintf("%d", p2))
 	}, func(err error) {
 		t.Fatalf("should have 3 peers: %v", err)
 	})
 
 	testutil.WaitForResult(func() (bool, error) {
-		p3, _ = s3.raftPeers.Peers()
-		return len(p3) == 3, errors.New(fmt.Sprintf("%v", p3))
+		p3, _ = s3.numPeers()
+		return p3 == 3, errors.New(fmt.Sprintf("%d", p3))
 	}, func(err error) {
 		t.Fatalf("should have 3 peers: %v", err)
 	})
@@ -593,20 +595,20 @@ func TestServer_BadExpect(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}
 
-	var p1 []string
-	var p2 []string
+	var p1 int
+	var p2 int
 
 	// should have no peers yet
 	testutil.WaitForResult(func() (bool, error) {
-		p1, _ = s1.raftPeers.Peers()
-		return len(p1) == 0, errors.New(fmt.Sprintf("%v", p1))
+		p1, _ = s1.numPeers()
+		return p1 == 0, errors.New(fmt.Sprintf("%d", p1))
 	}, func(err error) {
 		t.Fatalf("should have 0 peers: %v", err)
 	})
 
 	testutil.WaitForResult(func() (bool, error) {
-		p2, _ = s2.raftPeers.Peers()
-		return len(p2) == 0, errors.New(fmt.Sprintf("%v", p2))
+		p2, _ = s2.numPeers()
+		return p2 == 0, errors.New(fmt.Sprintf("%d", p2))
 	}, func(err error) {
 		t.Fatalf("should have 0 peers: %v", err)
 	})
@@ -616,26 +618,26 @@ func TestServer_BadExpect(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}
 
-	var p3 []string
+	var p3 int
 
 	// should still have no peers (because s2 is in expect=2 mode)
 	testutil.WaitForResult(func() (bool, error) {
-		p1, _ = s1.raftPeers.Peers()
-		return len(p1) == 0, errors.New(fmt.Sprintf("%v", p1))
+		p1, _ = s1.numPeers()
+		return p1 == 0, errors.New(fmt.Sprintf("%d", p1))
 	}, func(err error) {
 		t.Fatalf("should have 0 peers: %v", err)
 	})
 
 	testutil.WaitForResult(func() (bool, error) {
-		p2, _ = s2.raftPeers.Peers()
-		return len(p2) == 0, errors.New(fmt.Sprintf("%v", p2))
+		p2, _ = s2.numPeers()
+		return p2 == 0, errors.New(fmt.Sprintf("%d", p2))
 	}, func(err error) {
 		t.Fatalf("should have 0 peers: %v", err)
 	})
 
 	testutil.WaitForResult(func() (bool, error) {
-		p3, _ = s3.raftPeers.Peers()
-		return len(p3) == 0, errors.New(fmt.Sprintf("%v", p3))
+		p3, _ = s3.numPeers()
+		return p3 == 0, errors.New(fmt.Sprintf("%d", p3))
 	}, func(err error) {
 		t.Fatalf("should have 0 peers: %v", err)
 	})
diff --git a/consul/session_ttl_test.go b/consul/session_ttl_test.go
index aa09b6d70d..79bd094771 100644
--- a/consul/session_ttl_test.go
+++ b/consul/session_ttl_test.go
@@ -297,8 +297,8 @@ func TestServer_SessionTTL_Failover(t *testing.T) {
 	}
 
 	testutil.WaitForResult(func() (bool, error) {
-		peers, _ := s1.raftPeers.Peers()
-		return len(peers) == 3, nil
+		peers, _ := s1.numPeers()
+		return peers == 3, nil
 	}, func(err error) {
 		t.Fatalf("should have 3 peers")
 	})
diff --git a/consul/status_endpoint.go b/consul/status_endpoint.go
index ddcbed440e..2cac03a7f6 100644
--- a/consul/status_endpoint.go
+++ b/consul/status_endpoint.go
@@ -12,7 +12,7 @@ func (s *Status) Ping(args struct{}, reply *struct{}) error {
 
 // Leader is used to get the address of the leader
 func (s *Status) Leader(args struct{}, reply *string) error {
-	leader := s.server.raft.Leader()
+	leader := string(s.server.raft.Leader())
 	if leader != "" {
 		*reply = leader
 	} else {
@@ -23,11 +23,13 @@ func (s *Status) Leader(args struct{}, reply *string) error {
 
 // Peers is used to get all the Raft peers
 func (s *Status) Peers(args struct{}, reply *[]string) error {
-	peers, err := s.server.raftPeers.Peers()
-	if err != nil {
+	future := s.server.raft.GetConfiguration()
+	if err := future.Error(); err != nil {
 		return err
 	}
 
-	*reply = peers
+	for _, server := range future.Configuration().Servers {
+		*reply = append(*reply, string(server.Address))
+	}
 	return nil
 }