Merge pull request #1894 from hashicorp/f-rename-server-manager

F rename server manager
pull/1583/merge
Sean Chittenden 2016-03-29 17:44:26 -07:00
commit cff05be5b4
13 changed files with 448 additions and 440 deletions

View File

@ -1,4 +1,9 @@
package server_details // Package agent provides a logical endpoint for Consul agents in the
// network. agent data originates from Serf gossip and is primarily used to
// communicate Consul server information. Gossiped information that ends up
// in Server contains the necessary metadata required for servers.Manager to
// select which server an RPC request should be routed to.
package agent
import ( import (
"fmt" "fmt"
@ -18,8 +23,8 @@ func (k *Key) Equal(x *Key) bool {
return k.name == x.name return k.name == x.name
} }
// ServerDetails is used to return details of a consul server // Server is used to return details of a consul server
type ServerDetails struct { type Server struct {
Name string Name string
Datacenter string Datacenter string
Port int Port int
@ -30,14 +35,14 @@ type ServerDetails struct {
} }
// Key returns the corresponding Key // Key returns the corresponding Key
func (s *ServerDetails) Key() *Key { func (s *Server) Key() *Key {
return &Key{ return &Key{
name: s.Name, name: s.Name,
} }
} }
// String returns a string representation of ServerDetails // String returns a string representation of Server
func (s *ServerDetails) String() string { func (s *Server) String() string {
var addrStr, networkStr string var addrStr, networkStr string
if s.Addr != nil { if s.Addr != nil {
addrStr = s.Addr.String() addrStr = s.Addr.String()
@ -47,9 +52,9 @@ func (s *ServerDetails) String() string {
return fmt.Sprintf("%s (Addr: %s/%s) (DC: %s)", s.Name, networkStr, addrStr, s.Datacenter) return fmt.Sprintf("%s (Addr: %s/%s) (DC: %s)", s.Name, networkStr, addrStr, s.Datacenter)
} }
// IsConsulServer returns true if a serf member is a consul server. Returns a // IsConsulServer returns true if a serf member is a consul server
// bool and a pointer to the ServerDetails. // agent. Returns a bool and a pointer to the Server.
func IsConsulServer(m serf.Member) (bool, *ServerDetails) { func IsConsulServer(m serf.Member) (bool, *Server) {
if m.Tags["role"] != "consul" { if m.Tags["role"] != "consul" {
return false, nil return false, nil
} }
@ -81,7 +86,7 @@ func IsConsulServer(m serf.Member) (bool, *ServerDetails) {
addr := &net.TCPAddr{IP: m.Addr, Port: port} addr := &net.TCPAddr{IP: m.Addr, Port: port}
parts := &ServerDetails{ parts := &Server{
Name: m.Name, Name: m.Name,
Datacenter: datacenter, Datacenter: datacenter,
Port: port, Port: port,

View File

@ -1,10 +1,10 @@
package server_details package agent
import ( import (
"testing" "testing"
) )
func TestServerDetails_Key_Equal(t *testing.T) { func TestServer_Key_Equal(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
k1 *Key k1 *Key
@ -47,16 +47,16 @@ func TestServerDetails_Key_Equal(t *testing.T) {
} }
} }
func TestServerDetails_Key(t *testing.T) { func TestServer_Key(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
sd *ServerDetails sd *Server
k *Key k *Key
equal bool equal bool
}{ }{
{ {
name: "Key equality", name: "Key equality",
sd: &ServerDetails{ sd: &Server{
Name: "s1", Name: "s1",
}, },
k: &Key{ k: &Key{
@ -66,7 +66,7 @@ func TestServerDetails_Key(t *testing.T) {
}, },
{ {
name: "Key inequality", name: "Key inequality",
sd: &ServerDetails{ sd: &Server{
Name: "s1", Name: "s1",
}, },
k: &Key{ k: &Key{

View File

@ -1,32 +1,32 @@
package server_details_test package agent_test
import ( import (
"net" "net"
"testing" "testing"
"github.com/hashicorp/consul/consul/server_details" "github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
) )
func TestServerDetails_Key_params(t *testing.T) { func TestServer_Key_params(t *testing.T) {
ipv4a := net.ParseIP("127.0.0.1") ipv4a := net.ParseIP("127.0.0.1")
ipv4b := net.ParseIP("1.2.3.4") ipv4b := net.ParseIP("1.2.3.4")
tests := []struct { tests := []struct {
name string name string
sd1 *server_details.ServerDetails sd1 *agent.Server
sd2 *server_details.ServerDetails sd2 *agent.Server
equal bool equal bool
}{ }{
{ {
name: "Addr inequality", name: "Addr inequality",
sd1: &server_details.ServerDetails{ sd1: &agent.Server{
Name: "s1", Name: "s1",
Datacenter: "dc1", Datacenter: "dc1",
Port: 8300, Port: 8300,
Addr: &net.IPAddr{IP: ipv4a}, Addr: &net.IPAddr{IP: ipv4a},
}, },
sd2: &server_details.ServerDetails{ sd2: &agent.Server{
Name: "s1", Name: "s1",
Datacenter: "dc1", Datacenter: "dc1",
Port: 8300, Port: 8300,
@ -42,7 +42,7 @@ func TestServerDetails_Key_params(t *testing.T) {
} }
// Test Key to make sure it actually works as a key // Test Key to make sure it actually works as a key
m := make(map[server_details.Key]bool) m := make(map[agent.Key]bool)
m[*test.sd1.Key()] = true m[*test.sd1.Key()] = true
if _, found := m[*test.sd2.Key()]; found != test.equal { if _, found := m[*test.sd2.Key()]; found != test.equal {
t.Errorf("Expected a %v result from map test %s", test.equal, test.name) t.Errorf("Expected a %v result from map test %s", test.equal, test.name)
@ -61,7 +61,7 @@ func TestIsConsulServer(t *testing.T) {
"vsn": "1", "vsn": "1",
}, },
} }
ok, parts := server_details.IsConsulServer(m) ok, parts := agent.IsConsulServer(m)
if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 { if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 {
t.Fatalf("bad: %v %v", ok, parts) t.Fatalf("bad: %v %v", ok, parts)
} }
@ -76,7 +76,7 @@ func TestIsConsulServer(t *testing.T) {
} }
m.Tags["bootstrap"] = "1" m.Tags["bootstrap"] = "1"
m.Tags["disabled"] = "1" m.Tags["disabled"] = "1"
ok, parts = server_details.IsConsulServer(m) ok, parts = agent.IsConsulServer(m)
if !ok { if !ok {
t.Fatalf("expected a valid consul server") t.Fatalf("expected a valid consul server")
} }
@ -92,7 +92,7 @@ func TestIsConsulServer(t *testing.T) {
m.Tags["expect"] = "3" m.Tags["expect"] = "3"
delete(m.Tags, "bootstrap") delete(m.Tags, "bootstrap")
delete(m.Tags, "disabled") delete(m.Tags, "disabled")
ok, parts = server_details.IsConsulServer(m) ok, parts = agent.IsConsulServer(m)
if !ok || parts.Expect != 3 { if !ok || parts.Expect != 3 {
t.Fatalf("bad: %v", parts.Expect) t.Fatalf("bad: %v", parts.Expect)
} }
@ -101,7 +101,7 @@ func TestIsConsulServer(t *testing.T) {
} }
delete(m.Tags, "role") delete(m.Tags, "role")
ok, parts = server_details.IsConsulServer(m) ok, parts = agent.IsConsulServer(m)
if ok { if ok {
t.Fatalf("unexpected ok server") t.Fatalf("unexpected ok server")
} }

View File

@ -10,8 +10,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/hashicorp/consul/consul/server_details" "github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/server_manager" "github.com/hashicorp/consul/consul/servers"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
@ -58,9 +58,9 @@ type Client struct {
// Connection pool to consul servers // Connection pool to consul servers
connPool *ConnPool connPool *ConnPool
// serverMgr is responsible for the selection and maintenance of // servers is responsible for the selection and maintenance of
// Consul servers this agent uses for RPC requests // Consul servers this agent uses for RPC requests
serverMgr *server_manager.ServerManager servers *servers.Manager
// eventCh is used to receive events from the // eventCh is used to receive events from the
// serf cluster in the datacenter // serf cluster in the datacenter
@ -130,9 +130,9 @@ func NewClient(config *Config) (*Client, error) {
return nil, fmt.Errorf("Failed to start lan serf: %v", err) return nil, fmt.Errorf("Failed to start lan serf: %v", err)
} }
// Start maintenance task for server_manager // Start maintenance task for servers
c.serverMgr = server_manager.New(c.logger, c.shutdownCh, c.serf, c.connPool) c.servers = servers.New(c.logger, c.shutdownCh, c.serf, c.connPool)
go c.serverMgr.Start() go c.servers.Start()
return c, nil return c, nil
} }
@ -261,7 +261,7 @@ func (c *Client) lanEventHandler() {
// nodeJoin is used to handle join events on the serf cluster // nodeJoin is used to handle join events on the serf cluster
func (c *Client) nodeJoin(me serf.MemberEvent) { func (c *Client) nodeJoin(me serf.MemberEvent) {
for _, m := range me.Members { for _, m := range me.Members {
ok, parts := server_details.IsConsulServer(m) ok, parts := agent.IsConsulServer(m)
if !ok { if !ok {
continue continue
} }
@ -271,7 +271,7 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
continue continue
} }
c.logger.Printf("[INFO] consul: adding server %s", parts) c.logger.Printf("[INFO] consul: adding server %s", parts)
c.serverMgr.AddServer(parts) c.servers.AddServer(parts)
// Trigger the callback // Trigger the callback
if c.config.ServerUp != nil { if c.config.ServerUp != nil {
@ -283,12 +283,12 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
// nodeFail is used to handle fail events on the serf cluster // nodeFail is used to handle fail events on the serf cluster
func (c *Client) nodeFail(me serf.MemberEvent) { func (c *Client) nodeFail(me serf.MemberEvent) {
for _, m := range me.Members { for _, m := range me.Members {
ok, parts := server_details.IsConsulServer(m) ok, parts := agent.IsConsulServer(m)
if !ok { if !ok {
continue continue
} }
c.logger.Printf("[INFO] consul: removing server %s", parts) c.logger.Printf("[INFO] consul: removing server %s", parts)
c.serverMgr.RemoveServer(parts) c.servers.RemoveServer(parts)
} }
} }
@ -322,14 +322,14 @@ func (c *Client) localEvent(event serf.UserEvent) {
// RPC is used to forward an RPC call to a consul server, or fail if no servers // RPC is used to forward an RPC call to a consul server, or fail if no servers
func (c *Client) RPC(method string, args interface{}, reply interface{}) error { func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
server := c.serverMgr.FindServer() server := c.servers.FindServer()
if server == nil { if server == nil {
return structs.ErrNoServers return structs.ErrNoServers
} }
// Forward to remote Consul // Forward to remote Consul
if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil { if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil {
c.serverMgr.NotifyFailedServer(server) c.servers.NotifyFailedServer(server)
c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err) c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err)
return err return err
} }
@ -340,7 +340,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// Stats is used to return statistics for debugging and insight // Stats is used to return statistics for debugging and insight
// for various sub-systems // for various sub-systems
func (c *Client) Stats() map[string]map[string]string { func (c *Client) Stats() map[string]map[string]string {
numServers := c.serverMgr.NumServers() numServers := c.servers.NumServers()
toString := func(v uint64) string { toString := func(v uint64) string {
return strconv.FormatUint(v, 10) return strconv.FormatUint(v, 10)

View File

@ -84,7 +84,7 @@ func TestClient_JoinLAN(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
return c1.serverMgr.NumServers() == 1, nil return c1.servers.NumServers() == 1, nil
}, func(err error) { }, func(err error) {
t.Fatalf("expected consul server") t.Fatalf("expected consul server")
}) })
@ -100,7 +100,7 @@ func TestClient_JoinLAN(t *testing.T) {
// Check we have a new consul // Check we have a new consul
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
return c1.serverMgr.NumServers() == 1, nil return c1.servers.NumServers() == 1, nil
}, func(err error) { }, func(err error) {
t.Fatalf("expected consul server") t.Fatalf("expected consul server")
}) })
@ -270,7 +270,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
// Sleep to allow Serf to sync, shuffle, and let the shuffle complete // Sleep to allow Serf to sync, shuffle, and let the shuffle complete
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
c.serverMgr.ResetRebalanceTimer() c.servers.ResetRebalanceTimer()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
if len(c.LANMembers()) != numServers+numClients { if len(c.LANMembers()) != numServers+numClients {
@ -286,7 +286,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
var pingCount int var pingCount int
for range servers { for range servers {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
s := c.serverMgr.FindServer() s := c.servers.FindServer()
ok, err := c.connPool.PingConsulServer(s) ok, err := c.connPool.PingConsulServer(s)
if !ok { if !ok {
t.Errorf("Unable to ping server %v: %s", s.String(), err) t.Errorf("Unable to ping server %v: %s", s.String(), err)
@ -295,7 +295,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
// Artificially fail the server in order to rotate the server // Artificially fail the server in order to rotate the server
// list // list
c.serverMgr.NotifyFailedServer(s) c.servers.NotifyFailedServer(s)
} }
if pingCount != numServers { if pingCount != numServers {

View File

@ -8,7 +8,7 @@ import (
"time" "time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/server_details" "github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
@ -350,7 +350,7 @@ func (s *Server) shouldHandleMember(member serf.Member) bool {
if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter { if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter {
return true return true
} }
if valid, parts := server_details.IsConsulServer(member); valid && parts.Datacenter == s.config.Datacenter { if valid, parts := agent.IsConsulServer(member); valid && parts.Datacenter == s.config.Datacenter {
return true return true
} }
return false return false
@ -361,7 +361,7 @@ func (s *Server) shouldHandleMember(member serf.Member) bool {
func (s *Server) handleAliveMember(member serf.Member) error { func (s *Server) handleAliveMember(member serf.Member) error {
// Register consul service if a server // Register consul service if a server
var service *structs.NodeService var service *structs.NodeService
if valid, parts := server_details.IsConsulServer(member); valid { if valid, parts := agent.IsConsulServer(member); valid {
service = &structs.NodeService{ service = &structs.NodeService{
ID: ConsulServiceID, ID: ConsulServiceID,
Service: ConsulServiceName, Service: ConsulServiceName,
@ -497,7 +497,7 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
} }
// Remove from Raft peers if this was a server // Remove from Raft peers if this was a server
if valid, parts := server_details.IsConsulServer(member); valid { if valid, parts := agent.IsConsulServer(member); valid {
if err := s.removeConsulServer(member, parts.Port); err != nil { if err := s.removeConsulServer(member, parts.Port); err != nil {
return err return err
} }
@ -524,7 +524,7 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
} }
// joinConsulServer is used to try to join another consul server // joinConsulServer is used to try to join another consul server
func (s *Server) joinConsulServer(m serf.Member, parts *server_details.ServerDetails) error { func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
// Do not join ourself // Do not join ourself
if m.Name == s.config.NodeName { if m.Name == s.config.NodeName {
return nil return nil
@ -534,7 +534,7 @@ func (s *Server) joinConsulServer(m serf.Member, parts *server_details.ServerDet
if parts.Bootstrap { if parts.Bootstrap {
members := s.serfLAN.Members() members := s.serfLAN.Members()
for _, member := range members { for _, member := range members {
valid, p := server_details.IsConsulServer(member) valid, p := agent.IsConsulServer(member)
if valid && member.Name != m.Name && p.Bootstrap { if valid && member.Name != m.Name && p.Bootstrap {
s.logger.Printf("[ERR] consul: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer.", m.Name, member.Name) s.logger.Printf("[ERR] consul: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer.", m.Name, member.Name)
return nil return nil

View File

@ -3,7 +3,7 @@ package consul
import ( import (
"fmt" "fmt"
"github.com/hashicorp/consul/consul/server_details" "github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
) )
@ -25,7 +25,7 @@ func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error {
continue continue
} }
ok, parts := server_details.IsConsulServer(*m) ok, parts := agent.IsConsulServer(*m)
if ok && parts.Datacenter != md.dc { if ok && parts.Datacenter != md.dc {
return fmt.Errorf("Member '%s' part of wrong datacenter '%s'", return fmt.Errorf("Member '%s' part of wrong datacenter '%s'",
m.Name, parts.Datacenter) m.Name, parts.Datacenter)
@ -42,7 +42,7 @@ type wanMergeDelegate struct {
func (md *wanMergeDelegate) NotifyMerge(members []*serf.Member) error { func (md *wanMergeDelegate) NotifyMerge(members []*serf.Member) error {
for _, m := range members { for _, m := range members {
ok, _ := server_details.IsConsulServer(*m) ok, _ := agent.IsConsulServer(*m)
if !ok { if !ok {
return fmt.Errorf("Member '%s' is not a server", m.Name) return fmt.Errorf("Member '%s' is not a server", m.Name)
} }

View File

@ -10,7 +10,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/hashicorp/consul/consul/server_details" "github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/yamux" "github.com/hashicorp/yamux"
@ -408,7 +408,7 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, arg
// PingConsulServer sends a Status.Ping message to the specified server and // PingConsulServer sends a Status.Ping message to the specified server and
// returns true if healthy, false if an error occurred // returns true if healthy, false if an error occurred
func (p *ConnPool) PingConsulServer(s *server_details.ServerDetails) (bool, error) { func (p *ConnPool) PingConsulServer(s *agent.Server) (bool, error) {
// Get a usable client // Get a usable client
conn, sc, err := p.getClient(s.Datacenter, s.Addr, s.Version) conn, sc, err := p.getClient(s.Datacenter, s.Addr, s.Version)
if err != nil { if err != nil {

View File

@ -4,7 +4,7 @@ import (
"net" "net"
"strings" "strings"
"github.com/hashicorp/consul/consul/server_details" "github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
) )
@ -141,7 +141,7 @@ func (s *Server) localEvent(event serf.UserEvent) {
// lanNodeJoin is used to handle join events on the LAN pool. // lanNodeJoin is used to handle join events on the LAN pool.
func (s *Server) lanNodeJoin(me serf.MemberEvent) { func (s *Server) lanNodeJoin(me serf.MemberEvent) {
for _, m := range me.Members { for _, m := range me.Members {
ok, parts := server_details.IsConsulServer(m) ok, parts := agent.IsConsulServer(m)
if !ok { if !ok {
continue continue
} }
@ -164,7 +164,7 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
// wanNodeJoin is used to handle join events on the WAN pool. // wanNodeJoin is used to handle join events on the WAN pool.
func (s *Server) wanNodeJoin(me serf.MemberEvent) { func (s *Server) wanNodeJoin(me serf.MemberEvent) {
for _, m := range me.Members { for _, m := range me.Members {
ok, parts := server_details.IsConsulServer(m) ok, parts := agent.IsConsulServer(m)
if !ok { if !ok {
s.logger.Printf("[WARN] consul: non-server in WAN pool: %s", m.Name) s.logger.Printf("[WARN] consul: non-server in WAN pool: %s", m.Name)
continue continue
@ -210,7 +210,7 @@ func (s *Server) maybeBootstrap() {
members := s.serfLAN.Members() members := s.serfLAN.Members()
addrs := make([]string, 0) addrs := make([]string, 0)
for _, member := range members { for _, member := range members {
valid, p := server_details.IsConsulServer(member) valid, p := agent.IsConsulServer(member)
if !valid { if !valid {
continue continue
} }
@ -248,7 +248,7 @@ func (s *Server) maybeBootstrap() {
// lanNodeFailed is used to handle fail events on the LAN pool. // lanNodeFailed is used to handle fail events on the LAN pool.
func (s *Server) lanNodeFailed(me serf.MemberEvent) { func (s *Server) lanNodeFailed(me serf.MemberEvent) {
for _, m := range me.Members { for _, m := range me.Members {
ok, parts := server_details.IsConsulServer(m) ok, parts := agent.IsConsulServer(m)
if !ok { if !ok {
continue continue
} }
@ -263,7 +263,7 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) {
// wanNodeFailed is used to handle fail events on the WAN pool. // wanNodeFailed is used to handle fail events on the WAN pool.
func (s *Server) wanNodeFailed(me serf.MemberEvent) { func (s *Server) wanNodeFailed(me serf.MemberEvent) {
for _, m := range me.Members { for _, m := range me.Members {
ok, parts := server_details.IsConsulServer(m) ok, parts := agent.IsConsulServer(m)
if !ok { if !ok {
continue continue
} }

View File

@ -15,7 +15,7 @@ import (
"time" "time"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/server_details" "github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
@ -98,7 +98,7 @@ type Server struct {
// localConsuls is used to track the known consuls // localConsuls is used to track the known consuls
// in the local datacenter. Used to do leader forwarding. // in the local datacenter. Used to do leader forwarding.
localConsuls map[string]*server_details.ServerDetails localConsuls map[string]*agent.Server
localLock sync.RWMutex localLock sync.RWMutex
// Logger uses the provided LogOutput // Logger uses the provided LogOutput
@ -120,7 +120,7 @@ type Server struct {
// remoteConsuls is used to track the known consuls in // remoteConsuls is used to track the known consuls in
// remote datacenters. Used to do DC forwarding. // remote datacenters. Used to do DC forwarding.
remoteConsuls map[string][]*server_details.ServerDetails remoteConsuls map[string][]*agent.Server
remoteLock sync.RWMutex remoteLock sync.RWMutex
// rpcListener is used to listen for incoming connections // rpcListener is used to listen for incoming connections
@ -217,10 +217,10 @@ func NewServer(config *Config) (*Server, error) {
connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap), connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
eventChLAN: make(chan serf.Event, 256), eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256), eventChWAN: make(chan serf.Event, 256),
localConsuls: make(map[string]*server_details.ServerDetails), localConsuls: make(map[string]*agent.Server),
logger: logger, logger: logger,
reconcileCh: make(chan serf.Member, 32), reconcileCh: make(chan serf.Member, 32),
remoteConsuls: make(map[string][]*server_details.ServerDetails), remoteConsuls: make(map[string][]*agent.Server),
rpcServer: rpc.NewServer(), rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS, rpcTLS: incomingTLS,
tombstoneGC: gc, tombstoneGC: gc,

View File

@ -1,4 +1,9 @@
package server_manager // Package servers provides a Manager that maintains a list of
// agent.Server objects. The servers package manages servers from a Consul
// client's perspective (i.e. a list of servers that a client talks with for
// RPCs). The servers package does not provide any API guarantees and should
// be called only by `hashicorp/consul`.
package servers
import ( import (
"log" "log"
@ -7,7 +12,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/hashicorp/consul/consul/server_details" "github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
) )
@ -56,25 +61,24 @@ type ConsulClusterInfo interface {
// Pinger is an interface wrapping client.ConnPool to prevent a // Pinger is an interface wrapping client.ConnPool to prevent a
// cyclic import dependency // cyclic import dependency
type Pinger interface { type Pinger interface {
PingConsulServer(server *server_details.ServerDetails) (bool, error) PingConsulServer(s *agent.Server) (bool, error)
} }
// serverConfig is the thread-safe configuration struct used to maintain the // serverList is a local copy of the struct used to maintain the list of
// list of Consul servers in ServerManager. // Consul servers used by Manager.
// //
// NOTE(sean@): We are explicitly relying on the fact that serverConfig will // NOTE(sean@): We are explicitly relying on the fact that serverList will
// be copied onto the stack. Please keep this structure light. // be copied onto the stack. Please keep this structure light.
type serverConfig struct { type serverList struct {
// servers tracks the locally known servers. List membership is // servers tracks the locally known servers. List membership is
// maintained by Serf. // maintained by Serf.
servers []*server_details.ServerDetails servers []*agent.Server
} }
type ServerManager struct { type Manager struct {
// serverConfig provides the necessary load/store semantics for the // listValue manages the atomic load/store of a Manager's serverList
// server list. listValue atomic.Value
serverConfigValue atomic.Value listLock sync.Mutex
serverConfigLock sync.Mutex
// rebalanceTimer controls the duration of the rebalance interval // rebalanceTimer controls the duration of the rebalance interval
rebalanceTimer *time.Timer rebalanceTimer *time.Timer
@ -95,7 +99,7 @@ type ServerManager struct {
connPoolPinger Pinger connPoolPinger Pinger
// notifyFailedBarrier acts as a barrier to prevent queuing behind // notifyFailedBarrier acts as a barrier to prevent queuing behind
// serverConfigLock and acts as a TryLock(). // listLock and acts as a TryLock().
notifyFailedBarrier int32 notifyFailedBarrier int32
} }
@ -104,23 +108,23 @@ type ServerManager struct {
// begin seeing use after the rebalance timer fires or enough servers fail // begin seeing use after the rebalance timer fires or enough servers fail
// organically. If the server is already known, merge the new server // organically. If the server is already known, merge the new server
// details. // details.
func (sm *ServerManager) AddServer(server *server_details.ServerDetails) { func (m *Manager) AddServer(s *agent.Server) {
sm.serverConfigLock.Lock() m.listLock.Lock()
defer sm.serverConfigLock.Unlock() defer m.listLock.Unlock()
sc := sm.getServerConfig() l := m.getServerList()
// Check if this server is known // Check if this server is known
found := false found := false
for idx, existing := range sc.servers { for idx, existing := range l.servers {
if existing.Name == server.Name { if existing.Name == s.Name {
newServers := make([]*server_details.ServerDetails, len(sc.servers)) newServers := make([]*agent.Server, len(l.servers))
copy(newServers, sc.servers) copy(newServers, l.servers)
// Overwrite the existing server details in order to // Overwrite the existing server details in order to
// possibly update metadata (e.g. server version) // possibly update metadata (e.g. server version)
newServers[idx] = server newServers[idx] = s
sc.servers = newServers l.servers = newServers
found = true found = true
break break
} }
@ -128,53 +132,53 @@ func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
// Add to the list if not known // Add to the list if not known
if !found { if !found {
newServers := make([]*server_details.ServerDetails, len(sc.servers), len(sc.servers)+1) newServers := make([]*agent.Server, len(l.servers), len(l.servers)+1)
copy(newServers, sc.servers) copy(newServers, l.servers)
newServers = append(newServers, server) newServers = append(newServers, s)
sc.servers = newServers l.servers = newServers
} }
sm.saveServerConfig(sc) m.saveServerList(l)
} }
// cycleServer returns a new list of servers that has dequeued the first // cycleServer returns a new list of servers that has dequeued the first
// server and enqueued it at the end of the list. cycleServer assumes the // server and enqueued it at the end of the list. cycleServer assumes the
// caller is holding the serverConfigLock. cycleServer does not test or ping // caller is holding the listLock. cycleServer does not test or ping
// the next server inline. cycleServer may be called when the environment // the next server inline. cycleServer may be called when the environment
// has just entered an unhealthy situation and blocking on a server test is // has just entered an unhealthy situation and blocking on a server test is
// less desirable than just returning the next server in the firing line. If // less desirable than just returning the next server in the firing line. If
// the next server fails, it will fail fast enough and cycleServer will be // the next server fails, it will fail fast enough and cycleServer will be
// called again. // called again.
func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) { func (l *serverList) cycleServer() (servers []*agent.Server) {
numServers := len(sc.servers) numServers := len(l.servers)
if numServers < 2 { if numServers < 2 {
return servers // No action required return servers // No action required
} }
newServers := make([]*server_details.ServerDetails, 0, numServers) newServers := make([]*agent.Server, 0, numServers)
newServers = append(newServers, sc.servers[1:]...) newServers = append(newServers, l.servers[1:]...)
newServers = append(newServers, sc.servers[0]) newServers = append(newServers, l.servers[0])
return newServers return newServers
} }
// removeServerByKey performs an inline removal of the first matching server // removeServerByKey performs an inline removal of the first matching server
func (sc *serverConfig) removeServerByKey(targetKey *server_details.Key) { func (l *serverList) removeServerByKey(targetKey *agent.Key) {
for i, s := range sc.servers { for i, s := range l.servers {
if targetKey.Equal(s.Key()) { if targetKey.Equal(s.Key()) {
copy(sc.servers[i:], sc.servers[i+1:]) copy(l.servers[i:], l.servers[i+1:])
sc.servers[len(sc.servers)-1] = nil l.servers[len(l.servers)-1] = nil
sc.servers = sc.servers[:len(sc.servers)-1] l.servers = l.servers[:len(l.servers)-1]
return return
} }
} }
} }
// shuffleServers shuffles the server list in place // shuffleServers shuffles the server list in place
func (sc *serverConfig) shuffleServers() { func (l *serverList) shuffleServers() {
for i := len(sc.servers) - 1; i > 0; i-- { for i := len(l.servers) - 1; i > 0; i-- {
j := rand.Int31n(int32(i + 1)) j := rand.Int31n(int32(i + 1))
sc.servers[i], sc.servers[j] = sc.servers[j], sc.servers[i] l.servers[i], l.servers[j] = l.servers[j], l.servers[i]
} }
} }
@ -184,52 +188,52 @@ func (sc *serverConfig) shuffleServers() {
// server list. If the server at the front of the list has failed or fails // server list. If the server at the front of the list has failed or fails
// during an RPC call, it is rotated to the end of the list. If there are no // during an RPC call, it is rotated to the end of the list. If there are no
// servers available, return nil. // servers available, return nil.
func (sm *ServerManager) FindServer() *server_details.ServerDetails { func (m *Manager) FindServer() *agent.Server {
sc := sm.getServerConfig() l := m.getServerList()
numServers := len(sc.servers) numServers := len(l.servers)
if numServers == 0 { if numServers == 0 {
sm.logger.Printf("[WARN] server manager: No servers available") m.logger.Printf("[WARN] manager: No servers available")
return nil return nil
} else { } else {
// Return whatever is at the front of the list because it is // Return whatever is at the front of the list because it is
// assumed to be the oldest in the server list (unless - // assumed to be the oldest in the server list (unless -
// hypothetically - the server list was rotated right after a // hypothetically - the server list was rotated right after a
// server was added). // server was added).
return sc.servers[0] return l.servers[0]
} }
} }
// getServerConfig is a convenience method which hides the locking semantics // getServerList is a convenience method which hides the locking semantics
// of atomic.Value from the caller. // of atomic.Value from the caller.
func (sm *ServerManager) getServerConfig() serverConfig { func (m *Manager) getServerList() serverList {
return sm.serverConfigValue.Load().(serverConfig) return m.listValue.Load().(serverList)
} }
// saveServerConfig is a convenience method which hides the locking semantics // saveServerList is a convenience method which hides the locking semantics
// of atomic.Value from the caller. // of atomic.Value from the caller.
func (sm *ServerManager) saveServerConfig(sc serverConfig) { func (m *Manager) saveServerList(l serverList) {
sm.serverConfigValue.Store(sc) m.listValue.Store(l)
} }
// New is the only way to safely create a new ServerManager struct. // New is the only way to safely create a new Manager struct.
func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo, connPoolPinger Pinger) (sm *ServerManager) { func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo, connPoolPinger Pinger) (m *Manager) {
sm = new(ServerManager) m = new(Manager)
sm.logger = logger m.logger = logger
sm.clusterInfo = clusterInfo // can't pass *consul.Client: import cycle m.clusterInfo = clusterInfo // can't pass *consul.Client: import cycle
sm.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle m.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle
sm.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration) m.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration)
sm.shutdownCh = shutdownCh m.shutdownCh = shutdownCh
sc := serverConfig{} l := serverList{}
sc.servers = make([]*server_details.ServerDetails, 0) l.servers = make([]*agent.Server, 0)
sm.saveServerConfig(sc) m.saveServerList(l)
return sm return m
} }
// NotifyFailedServer marks the passed in server as "failed" by rotating it // NotifyFailedServer marks the passed in server as "failed" by rotating it
// to the end of the server list. // to the end of the server list.
func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) { func (m *Manager) NotifyFailedServer(s *agent.Server) {
sc := sm.getServerConfig() l := m.getServerList()
// If the server being failed is not the first server on the list, // If the server being failed is not the first server on the list,
// this is a noop. If, however, the server is failed and first on // this is a noop. If, however, the server is failed and first on
@ -237,30 +241,29 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails
// the server to the end of the list. // the server to the end of the list.
// Only rotate the server list when there is more than one server // Only rotate the server list when there is more than one server
if len(sc.servers) > 1 && sc.servers[0] == server && if len(l.servers) > 1 && l.servers[0] == s &&
// Use atomic.CAS to emulate a TryLock(). // Use atomic.CAS to emulate a TryLock().
atomic.CompareAndSwapInt32(&sm.notifyFailedBarrier, 0, 1) { atomic.CompareAndSwapInt32(&m.notifyFailedBarrier, 0, 1) {
defer atomic.StoreInt32(&sm.notifyFailedBarrier, 0) defer atomic.StoreInt32(&m.notifyFailedBarrier, 0)
// Grab a lock, retest, and take the hit of cycling the first // Grab a lock, retest, and take the hit of cycling the first
// server to the end. // server to the end.
sm.serverConfigLock.Lock() m.listLock.Lock()
defer sm.serverConfigLock.Unlock() defer m.listLock.Unlock()
sc = sm.getServerConfig() l = m.getServerList()
if len(sc.servers) > 1 && sc.servers[0] == server { if len(l.servers) > 1 && l.servers[0] == s {
sc.servers = sc.cycleServer() l.servers = l.cycleServer()
sm.saveServerConfig(sc) m.saveServerList(l)
} }
} }
} }
// NumServers takes out an internal "read lock" and returns the number of // NumServers takes out an internal "read lock" and returns the number of
// servers. numServers includes both healthy and unhealthy servers. // servers. numServers includes both healthy and unhealthy servers.
func (sm *ServerManager) NumServers() (numServers int) { func (m *Manager) NumServers() int {
sc := sm.getServerConfig() l := m.getServerList()
numServers = len(sc.servers) return len(l.servers)
return numServers
} }
// RebalanceServers shuffles the list of servers on this agent. The server // RebalanceServers shuffles the list of servers on this agent. The server
@ -275,46 +278,46 @@ func (sm *ServerManager) NumServers() (numServers int) {
// Unhealthy servers are removed when serf notices the server has been // Unhealthy servers are removed when serf notices the server has been
// deregistered. Before the newly shuffled server list is saved, the new // deregistered. Before the newly shuffled server list is saved, the new
// remote endpoint is tested to ensure it's responsive. // remote endpoint is tested to ensure it's responsive.
func (sm *ServerManager) RebalanceServers() { func (m *Manager) RebalanceServers() {
// Obtain a copy of the current serverConfig // Obtain a copy of the current serverList
sc := sm.getServerConfig() l := m.getServerList()
// Early abort if there is no value to shuffling // Early abort if there is no value to shuffling
if len(sc.servers) < 2 { if len(l.servers) < 2 {
return return
} }
sc.shuffleServers() l.shuffleServers()
// Iterate through the shuffled server list to find a healthy server. // Iterate through the shuffled server list to find a healthy server.
// Don't iterate on the list directly, this loop mutates the server // Don't iterate on the list directly, this loop mutates the server
// list. // list.
var foundHealthyServer bool var foundHealthyServer bool
for i := 0; i < len(sc.servers); i++ { for i := 0; i < len(l.servers); i++ {
// Always test the first server. Failed servers are cycled // Always test the first server. Failed servers are cycled
// while Serf detects the node has failed. // while Serf detects the node has failed.
selectedServer := sc.servers[0] selectedServer := l.servers[0]
ok, err := sm.connPoolPinger.PingConsulServer(selectedServer) ok, err := m.connPoolPinger.PingConsulServer(selectedServer)
if ok { if ok {
foundHealthyServer = true foundHealthyServer = true
break break
} }
sm.logger.Printf(`[DEBUG] server manager: pinging server "%s" failed: %s`, selectedServer.String(), err) m.logger.Printf(`[DEBUG] manager: pinging server "%s" failed: %s`, selectedServer.String(), err)
sc.cycleServer() l.cycleServer()
} }
// If no healthy servers were found, sleep and wait for Serf to make // If no healthy servers were found, sleep and wait for Serf to make
// the world a happy place again. // the world a happy place again.
if !foundHealthyServer { if !foundHealthyServer {
sm.logger.Printf("[DEBUG] server manager: No healthy servers during rebalance, aborting") m.logger.Printf("[DEBUG] manager: No healthy servers during rebalance, aborting")
return return
} }
// Verify that all servers are present // Verify that all servers are present
if sm.reconcileServerList(&sc) { if m.reconcileServerList(&l) {
sm.logger.Printf("[DEBUG] server manager: Rebalanced %d servers, next active server is %s", len(sc.servers), sc.servers[0].String()) m.logger.Printf("[DEBUG] manager: Rebalanced %d servers, next active server is %s", len(l.servers), l.servers[0].String())
} else { } else {
// reconcileServerList failed because Serf removed the server // reconcileServerList failed because Serf removed the server
// that was at the front of the list that had successfully // that was at the front of the list that had successfully
@ -330,36 +333,36 @@ func (sm *ServerManager) RebalanceServers() {
return return
} }
// reconcileServerList returns true when the first server in serverConfig // reconcileServerList returns true when the first server in serverList
// exists in the receiver's serverConfig. If true, the merged serverConfig // exists in the receiver's serverList. If true, the merged serverList
// is stored as the receiver's serverConfig. Returns false if the first // is stored as the receiver's serverList. Returns false if the first
// server does not exist in the list (i.e. was removed by Serf during a // server does not exist in the list (i.e. was removed by Serf during a
// PingConsulServer() call). Newly added servers are appended to the list and // PingConsulServer() call). Newly added servers are appended to the list and
// other missing servers are removed from the list. // other missing servers are removed from the list.
func (sm *ServerManager) reconcileServerList(sc *serverConfig) bool { func (m *Manager) reconcileServerList(l *serverList) bool {
sm.serverConfigLock.Lock() m.listLock.Lock()
defer sm.serverConfigLock.Unlock() defer m.listLock.Unlock()
// newServerCfg is a serverConfig that has been kept up to date with // newServerCfg is a serverList that has been kept up to date with
// Serf node join and node leave events. // Serf node join and node leave events.
newServerCfg := sm.getServerConfig() newServerCfg := m.getServerList()
// If Serf has removed all nodes, or there is no selected server // If Serf has removed all nodes, or there is no selected server
// (zero nodes in sc), abort early. // (zero nodes in l), abort early.
if len(newServerCfg.servers) == 0 || len(sc.servers) == 0 { if len(newServerCfg.servers) == 0 || len(l.servers) == 0 {
return false return false
} }
type targetServer struct { type targetServer struct {
server *server_details.ServerDetails server *agent.Server
// 'b' == both // 'b' == both
// 'o' == original // 'o' == original
// 'n' == new // 'n' == new
state byte state byte
} }
mergedList := make(map[server_details.Key]*targetServer, len(sc.servers)) mergedList := make(map[agent.Key]*targetServer, len(l.servers))
for _, s := range sc.servers { for _, s := range l.servers {
mergedList[*s.Key()] = &targetServer{server: s, state: 'o'} mergedList[*s.Key()] = &targetServer{server: s, state: 'o'}
} }
for _, s := range newServerCfg.servers { for _, s := range newServerCfg.servers {
@ -373,7 +376,7 @@ func (sm *ServerManager) reconcileServerList(sc *serverConfig) bool {
} }
// Ensure the selected server has not been removed by Serf // Ensure the selected server has not been removed by Serf
selectedServerKey := sc.servers[0].Key() selectedServerKey := l.servers[0].Key()
if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' { if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' {
return false return false
} }
@ -385,63 +388,63 @@ func (sm *ServerManager) reconcileServerList(sc *serverConfig) bool {
// Do nothing, server exists in both // Do nothing, server exists in both
case 'o': case 'o':
// Server has been removed // Server has been removed
sc.removeServerByKey(&k) l.removeServerByKey(&k)
case 'n': case 'n':
// Server added // Server added
sc.servers = append(sc.servers, v.server) l.servers = append(l.servers, v.server)
default: default:
panic("unknown merge list state") panic("unknown merge list state")
} }
} }
sm.saveServerConfig(*sc) m.saveServerList(*l)
return true return true
} }
// RemoveServer takes out an internal write lock and removes a server from // RemoveServer takes out an internal write lock and removes a server from
// the server list. // the server list.
func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) { func (m *Manager) RemoveServer(s *agent.Server) {
sm.serverConfigLock.Lock() m.listLock.Lock()
defer sm.serverConfigLock.Unlock() defer m.listLock.Unlock()
sc := sm.getServerConfig() l := m.getServerList()
// Remove the server if known // Remove the server if known
for i, _ := range sc.servers { for i, _ := range l.servers {
if sc.servers[i].Name == server.Name { if l.servers[i].Name == s.Name {
newServers := make([]*server_details.ServerDetails, 0, len(sc.servers)-1) newServers := make([]*agent.Server, 0, len(l.servers)-1)
newServers = append(newServers, sc.servers[:i]...) newServers = append(newServers, l.servers[:i]...)
newServers = append(newServers, sc.servers[i+1:]...) newServers = append(newServers, l.servers[i+1:]...)
sc.servers = newServers l.servers = newServers
sm.saveServerConfig(sc) m.saveServerList(l)
return return
} }
} }
} }
// refreshServerRebalanceTimer is only called once sm.rebalanceTimer expires. // refreshServerRebalanceTimer is only called once m.rebalanceTimer expires.
func (sm *ServerManager) refreshServerRebalanceTimer() time.Duration { func (m *Manager) refreshServerRebalanceTimer() time.Duration {
sc := sm.getServerConfig() l := m.getServerList()
numConsulServers := len(sc.servers) numConsulServers := len(l.servers)
// Limit this connection's life based on the size (and health) of the // Limit this connection's life based on the size (and health) of the
// cluster. Never rebalance a connection more frequently than // cluster. Never rebalance a connection more frequently than
// connReuseLowWatermarkDuration, and make sure we never exceed // connReuseLowWatermarkDuration, and make sure we never exceed
// clusterWideRebalanceConnsPerSec operations/s across numLANMembers. // clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer) clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer)
connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction) connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
numLANMembers := sm.clusterInfo.NumNodes() numLANMembers := m.clusterInfo.NumNodes()
connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers) connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
sm.rebalanceTimer.Reset(connRebalanceTimeout) m.rebalanceTimer.Reset(connRebalanceTimeout)
return connRebalanceTimeout return connRebalanceTimeout
} }
// ResetRebalanceTimer resets the rebalance timer. This method primarily // ResetRebalanceTimer resets the rebalance timer. This method primarily
// exists for testing and should not be used directly. // exists for testing and should not be used directly.
func (sm *ServerManager) ResetRebalanceTimer() { func (m *Manager) ResetRebalanceTimer() {
sm.serverConfigLock.Lock() m.listLock.Lock()
defer sm.serverConfigLock.Unlock() defer m.listLock.Unlock()
sm.rebalanceTimer.Reset(clientRPCMinReuseDuration) m.rebalanceTimer.Reset(clientRPCMinReuseDuration)
} }
// Start is used to start and manage the task of automatically shuffling and // Start is used to start and manage the task of automatically shuffling and
@ -450,15 +453,15 @@ func (sm *ServerManager) ResetRebalanceTimer() {
// automatically cycled to the end of the list. New servers are appended to // automatically cycled to the end of the list. New servers are appended to
// the list. The order of the server list must be shuffled periodically to // the list. The order of the server list must be shuffled periodically to
// distribute load across all known and available consul servers. // distribute load across all known and available consul servers.
func (sm *ServerManager) Start() { func (m *Manager) Start() {
for { for {
select { select {
case <-sm.rebalanceTimer.C: case <-m.rebalanceTimer.C:
sm.RebalanceServers() m.RebalanceServers()
sm.refreshServerRebalanceTimer() m.refreshServerRebalanceTimer()
case <-sm.shutdownCh: case <-m.shutdownCh:
sm.logger.Printf("[INFO] server manager: shutting down") m.logger.Printf("[INFO] manager: shutting down")
return return
} }
} }

View File

@ -1,4 +1,4 @@
package server_manager package servers
import ( import (
"bytes" "bytes"
@ -9,7 +9,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/hashicorp/consul/consul/server_details" "github.com/hashicorp/consul/consul/agent"
) )
var ( var (
@ -31,7 +31,7 @@ type fauxConnPool struct {
failPct float64 failPct float64
} }
func (cp *fauxConnPool) PingConsulServer(server *server_details.ServerDetails) (bool, error) { func (cp *fauxConnPool) PingConsulServer(server *agent.Server) (bool, error) {
var success bool var success bool
successProb := rand.Float64() successProb := rand.Float64()
if successProb > cp.failPct { if successProb > cp.failPct {
@ -48,108 +48,108 @@ func (s *fauxSerf) NumNodes() int {
return s.numNodes return s.numNodes
} }
func testServerManager() (sm *ServerManager) { func testManager() (m *Manager) {
logger := GetBufferedLogger() logger := GetBufferedLogger()
shutdownCh := make(chan struct{}) shutdownCh := make(chan struct{})
sm = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{}) m = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{})
return sm return m
} }
func testServerManagerFailProb(failPct float64) (sm *ServerManager) { func testManagerFailProb(failPct float64) (m *Manager) {
logger := GetBufferedLogger() logger := GetBufferedLogger()
logger = log.New(os.Stderr, "", log.LstdFlags) logger = log.New(os.Stderr, "", log.LstdFlags)
shutdownCh := make(chan struct{}) shutdownCh := make(chan struct{})
sm = New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}) m = New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct})
return sm return m
} }
// func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) { // func (l *serverList) cycleServer() (servers []*agent.Server) {
func TestServerManagerInternal_cycleServer(t *testing.T) { func TestManagerInternal_cycleServer(t *testing.T) {
sm := testServerManager() m := testManager()
sc := sm.getServerConfig() l := m.getServerList()
server0 := &server_details.ServerDetails{Name: "server1"} server0 := &agent.Server{Name: "server1"}
server1 := &server_details.ServerDetails{Name: "server2"} server1 := &agent.Server{Name: "server2"}
server2 := &server_details.ServerDetails{Name: "server3"} server2 := &agent.Server{Name: "server3"}
sc.servers = append(sc.servers, server0, server1, server2) l.servers = append(l.servers, server0, server1, server2)
sm.saveServerConfig(sc) m.saveServerList(l)
sc = sm.getServerConfig() l = m.getServerList()
if len(sc.servers) != 3 { if len(l.servers) != 3 {
t.Fatalf("server length incorrect: %d/3", len(sc.servers)) t.Fatalf("server length incorrect: %d/3", len(l.servers))
} }
if sc.servers[0] != server0 && if l.servers[0] != server0 &&
sc.servers[1] != server1 && l.servers[1] != server1 &&
sc.servers[2] != server2 { l.servers[2] != server2 {
t.Fatalf("initial server ordering not correct") t.Fatalf("initial server ordering not correct")
} }
sc.servers = sc.cycleServer() l.servers = l.cycleServer()
if len(sc.servers) != 3 { if len(l.servers) != 3 {
t.Fatalf("server length incorrect: %d/3", len(sc.servers)) t.Fatalf("server length incorrect: %d/3", len(l.servers))
} }
if sc.servers[0] != server1 && if l.servers[0] != server1 &&
sc.servers[1] != server2 && l.servers[1] != server2 &&
sc.servers[2] != server0 { l.servers[2] != server0 {
t.Fatalf("server ordering after one cycle not correct") t.Fatalf("server ordering after one cycle not correct")
} }
sc.servers = sc.cycleServer() l.servers = l.cycleServer()
if len(sc.servers) != 3 { if len(l.servers) != 3 {
t.Fatalf("server length incorrect: %d/3", len(sc.servers)) t.Fatalf("server length incorrect: %d/3", len(l.servers))
} }
if sc.servers[0] != server2 && if l.servers[0] != server2 &&
sc.servers[1] != server0 && l.servers[1] != server0 &&
sc.servers[2] != server1 { l.servers[2] != server1 {
t.Fatalf("server ordering after two cycles not correct") t.Fatalf("server ordering after two cycles not correct")
} }
sc.servers = sc.cycleServer() l.servers = l.cycleServer()
if len(sc.servers) != 3 { if len(l.servers) != 3 {
t.Fatalf("server length incorrect: %d/3", len(sc.servers)) t.Fatalf("server length incorrect: %d/3", len(l.servers))
} }
if sc.servers[0] != server0 && if l.servers[0] != server0 &&
sc.servers[1] != server1 && l.servers[1] != server1 &&
sc.servers[2] != server2 { l.servers[2] != server2 {
t.Fatalf("server ordering after three cycles not correct") t.Fatalf("server ordering after three cycles not correct")
} }
} }
// func (sm *ServerManager) getServerConfig() serverConfig { // func (m *Manager) getServerList() serverList {
func TestServerManagerInternal_getServerConfig(t *testing.T) { func TestManagerInternal_getServerList(t *testing.T) {
sm := testServerManager() m := testManager()
sc := sm.getServerConfig() l := m.getServerList()
if sc.servers == nil { if l.servers == nil {
t.Fatalf("serverConfig.servers nil") t.Fatalf("serverList.servers nil")
} }
if len(sc.servers) != 0 { if len(l.servers) != 0 {
t.Fatalf("serverConfig.servers length not zero") t.Fatalf("serverList.servers length not zero")
} }
} }
// func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (sm *ServerManager) { // func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (m *Manager) {
func TestServerManagerInternal_New(t *testing.T) { func TestManagerInternal_New(t *testing.T) {
sm := testServerManager() m := testManager()
if sm == nil { if m == nil {
t.Fatalf("ServerManager nil") t.Fatalf("Manager nil")
} }
if sm.clusterInfo == nil { if m.clusterInfo == nil {
t.Fatalf("ServerManager.clusterInfo nil") t.Fatalf("Manager.clusterInfo nil")
} }
if sm.logger == nil { if m.logger == nil {
t.Fatalf("ServerManager.logger nil") t.Fatalf("Manager.logger nil")
} }
if sm.shutdownCh == nil { if m.shutdownCh == nil {
t.Fatalf("ServerManager.shutdownCh nil") t.Fatalf("Manager.shutdownCh nil")
} }
} }
// func (sm *ServerManager) reconcileServerList(sc *serverConfig) bool { // func (m *Manager) reconcileServerList(l *serverList) bool {
func TestServerManagerInternal_reconcileServerList(t *testing.T) { func TestManagerInternal_reconcileServerList(t *testing.T) {
tests := []int{0, 1, 2, 3, 4, 5, 10, 100} tests := []int{0, 1, 2, 3, 4, 5, 10, 100}
for _, n := range tests { for _, n := range tests {
ok, err := test_reconcileServerList(n) ok, err := test_reconcileServerList(n)
@ -164,22 +164,22 @@ func test_reconcileServerList(maxServers int) (bool, error) {
// missing, the added have been added, and the original server is // missing, the added have been added, and the original server is
// present. // present.
const failPct = 0.5 const failPct = 0.5
sm := testServerManagerFailProb(failPct) m := testManagerFailProb(failPct)
var failedServers, healthyServers []*server_details.ServerDetails var failedServers, healthyServers []*agent.Server
for i := 0; i < maxServers; i++ { for i := 0; i < maxServers; i++ {
nodeName := fmt.Sprintf("s%02d", i) nodeName := fmt.Sprintf("s%02d", i)
node := &server_details.ServerDetails{Name: nodeName} node := &agent.Server{Name: nodeName}
// Add 66% of servers to ServerManager // Add 66% of servers to Manager
if rand.Float64() > 0.33 { if rand.Float64() > 0.33 {
sm.AddServer(node) m.AddServer(node)
// Of healthy servers, (ab)use connPoolPinger to // Of healthy servers, (ab)use connPoolPinger to
// fail failPct of the servers for the reconcile. This // fail failPct of the servers for the reconcile. This
// allows for the selected server to no longer be // allows for the selected server to no longer be
// healthy for the reconcile below. // healthy for the reconcile below.
if ok, _ := sm.connPoolPinger.PingConsulServer(node); ok { if ok, _ := m.connPoolPinger.PingConsulServer(node); ok {
// Will still be present // Will still be present
healthyServers = append(healthyServers, node) healthyServers = append(healthyServers, node)
} else { } else {
@ -192,9 +192,9 @@ func test_reconcileServerList(maxServers int) (bool, error) {
} }
} }
// Randomize ServerManager's server list // Randomize Manager's server list
sm.RebalanceServers() m.RebalanceServers()
selectedServer := sm.FindServer() selectedServer := m.FindServer()
var selectedServerFailed bool var selectedServerFailed bool
for _, s := range failedServers { for _, s := range failedServers {
@ -204,39 +204,39 @@ func test_reconcileServerList(maxServers int) (bool, error) {
} }
} }
// Update ServerManager's server list to be "healthy" based on Serf. // Update Manager's server list to be "healthy" based on Serf.
// Reconcile this with origServers, which is shuffled and has a live // Reconcile this with origServers, which is shuffled and has a live
// connection, but possibly out of date. // connection, but possibly out of date.
origServers := sm.getServerConfig() origServers := m.getServerList()
sm.saveServerConfig(serverConfig{servers: healthyServers}) m.saveServerList(serverList{servers: healthyServers})
// This should always succeed with non-zero server lists // This should always succeed with non-zero server lists
if !selectedServerFailed && !sm.reconcileServerList(&origServers) && if !selectedServerFailed && !m.reconcileServerList(&origServers) &&
len(sm.getServerConfig().servers) != 0 && len(m.getServerList().servers) != 0 &&
len(origServers.servers) != 0 { len(origServers.servers) != 0 {
// If the random gods are unfavorable and we end up with zero // If the random gods are unfavorable and we end up with zero
// length lists, expect things to fail and retry the test. // length lists, expect things to fail and retry the test.
return false, fmt.Errorf("Expected reconcile to succeed: %v %d %d", return false, fmt.Errorf("Expected reconcile to succeed: %v %d %d",
selectedServerFailed, selectedServerFailed,
len(sm.getServerConfig().servers), len(m.getServerList().servers),
len(origServers.servers)) len(origServers.servers))
} }
// If we have zero-length server lists, test succeeded in degenerate // If we have zero-length server lists, test succeeded in degenerate
// case. // case.
if len(sm.getServerConfig().servers) == 0 && if len(m.getServerList().servers) == 0 &&
len(origServers.servers) == 0 { len(origServers.servers) == 0 {
// Failed as expected w/ zero length list // Failed as expected w/ zero length list
return true, nil return true, nil
} }
resultingServerMap := make(map[server_details.Key]bool) resultingServerMap := make(map[agent.Key]bool)
for _, s := range sm.getServerConfig().servers { for _, s := range m.getServerList().servers {
resultingServerMap[*s.Key()] = true resultingServerMap[*s.Key()] = true
} }
// Test to make sure no failed servers are in the ServerManager's // Test to make sure no failed servers are in the Manager's
// list. Error if there are any failedServers in sc.servers // list. Error if there are any failedServers in l.servers
for _, s := range failedServers { for _, s := range failedServers {
_, ok := resultingServerMap[*s.Key()] _, ok := resultingServerMap[*s.Key()]
if ok { if ok {
@ -245,7 +245,7 @@ func test_reconcileServerList(maxServers int) (bool, error) {
} }
// Test to make sure all healthy servers are in the healthy list. // Test to make sure all healthy servers are in the healthy list.
if len(healthyServers) != len(sm.getServerConfig().servers) { if len(healthyServers) != len(m.getServerList().servers) {
return false, fmt.Errorf("Expected healthy map and servers to match: %d/%d", len(healthyServers), len(healthyServers)) return false, fmt.Errorf("Expected healthy map and servers to match: %d/%d", len(healthyServers), len(healthyServers))
} }
@ -259,8 +259,8 @@ func test_reconcileServerList(maxServers int) (bool, error) {
return true, nil return true, nil
} }
// func (sc *serverConfig) refreshServerRebalanceTimer() { // func (l *serverList) refreshServerRebalanceTimer() {
func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) { func TestManagerInternal_refreshServerRebalanceTimer(t *testing.T) {
type clusterSizes struct { type clusterSizes struct {
numNodes int numNodes int
numServers int numServers int
@ -299,54 +299,54 @@ func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) {
shutdownCh := make(chan struct{}) shutdownCh := make(chan struct{})
for _, s := range clusters { for _, s := range clusters {
sm := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{}) m := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{})
for i := 0; i < s.numServers; i++ { for i := 0; i < s.numServers; i++ {
nodeName := fmt.Sprintf("s%02d", i) nodeName := fmt.Sprintf("s%02d", i)
sm.AddServer(&server_details.ServerDetails{Name: nodeName}) m.AddServer(&agent.Server{Name: nodeName})
} }
d := sm.refreshServerRebalanceTimer() d := m.refreshServerRebalanceTimer()
if d < s.minRebalance { if d < s.minRebalance {
t.Errorf("duration too short for cluster of size %d and %d servers (%s < %s)", s.numNodes, s.numServers, d, s.minRebalance) t.Errorf("duration too short for cluster of size %d and %d servers (%s < %s)", s.numNodes, s.numServers, d, s.minRebalance)
} }
} }
} }
// func (sm *ServerManager) saveServerConfig(sc serverConfig) { // func (m *Manager) saveServerList(l serverList) {
func TestServerManagerInternal_saveServerConfig(t *testing.T) { func TestManagerInternal_saveServerList(t *testing.T) {
sm := testServerManager() m := testManager()
// Initial condition // Initial condition
func() { func() {
sc := sm.getServerConfig() l := m.getServerList()
if len(sc.servers) != 0 { if len(l.servers) != 0 {
t.Fatalf("ServerManager.saveServerConfig failed to load init config") t.Fatalf("Manager.saveServerList failed to load init config")
} }
newServer := new(server_details.ServerDetails) newServer := new(agent.Server)
sc.servers = append(sc.servers, newServer) l.servers = append(l.servers, newServer)
sm.saveServerConfig(sc) m.saveServerList(l)
}() }()
// Test that save works // Test that save works
func() { func() {
sc1 := sm.getServerConfig() l1 := m.getServerList()
t1NumServers := len(sc1.servers) t1NumServers := len(l1.servers)
if t1NumServers != 1 { if t1NumServers != 1 {
t.Fatalf("ServerManager.saveServerConfig failed to save mutated config") t.Fatalf("Manager.saveServerList failed to save mutated config")
} }
}() }()
// Verify mutation w/o a save doesn't alter the original // Verify mutation w/o a save doesn't alter the original
func() { func() {
newServer := new(server_details.ServerDetails) newServer := new(agent.Server)
sc := sm.getServerConfig() l := m.getServerList()
sc.servers = append(sc.servers, newServer) l.servers = append(l.servers, newServer)
sc_orig := sm.getServerConfig() l_orig := m.getServerList()
origNumServers := len(sc_orig.servers) origNumServers := len(l_orig.servers)
if origNumServers >= len(sc.servers) { if origNumServers >= len(l.servers) {
t.Fatalf("ServerManager.saveServerConfig unsaved config overwrote original") t.Fatalf("Manager.saveServerList unsaved config overwrote original")
} }
}() }()
} }

View File

@ -1,4 +1,4 @@
package server_manager_test package servers_test
import ( import (
"bytes" "bytes"
@ -9,8 +9,8 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/hashicorp/consul/consul/server_details" "github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/server_manager" "github.com/hashicorp/consul/consul/servers"
) )
var ( var (
@ -32,7 +32,7 @@ type fauxConnPool struct {
failPct float64 failPct float64
} }
func (cp *fauxConnPool) PingConsulServer(server *server_details.ServerDetails) (bool, error) { func (cp *fauxConnPool) PingConsulServer(server *agent.Server) (bool, error) {
var success bool var success bool
successProb := rand.Float64() successProb := rand.Float64()
if successProb > cp.failPct { if successProb > cp.failPct {
@ -48,66 +48,66 @@ func (s *fauxSerf) NumNodes() int {
return 16384 return 16384
} }
func testServerManager() (sm *server_manager.ServerManager) { func testManager() (m *servers.Manager) {
logger := GetBufferedLogger() logger := GetBufferedLogger()
logger = log.New(os.Stderr, "", log.LstdFlags) logger = log.New(os.Stderr, "", log.LstdFlags)
shutdownCh := make(chan struct{}) shutdownCh := make(chan struct{})
sm = server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}) m = servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
return sm return m
} }
func testServerManagerFailProb(failPct float64) (sm *server_manager.ServerManager) { func testManagerFailProb(failPct float64) (m *servers.Manager) {
logger := GetBufferedLogger() logger := GetBufferedLogger()
logger = log.New(os.Stderr, "", log.LstdFlags) logger = log.New(os.Stderr, "", log.LstdFlags)
shutdownCh := make(chan struct{}) shutdownCh := make(chan struct{})
sm = server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}) m = servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct})
return sm return m
} }
// func (sm *ServerManager) AddServer(server *server_details.ServerDetails) { // func (m *Manager) AddServer(server *agent.Server) {
func TestServerManager_AddServer(t *testing.T) { func TestServers_AddServer(t *testing.T) {
sm := testServerManager() m := testManager()
var num int var num int
num = sm.NumServers() num = m.NumServers()
if num != 0 { if num != 0 {
t.Fatalf("Expected zero servers to start") t.Fatalf("Expected zero servers to start")
} }
s1 := &server_details.ServerDetails{Name: "s1"} s1 := &agent.Server{Name: "s1"}
sm.AddServer(s1) m.AddServer(s1)
num = sm.NumServers() num = m.NumServers()
if num != 1 { if num != 1 {
t.Fatalf("Expected one server") t.Fatalf("Expected one server")
} }
sm.AddServer(s1) m.AddServer(s1)
num = sm.NumServers() num = m.NumServers()
if num != 1 { if num != 1 {
t.Fatalf("Expected one server (still)") t.Fatalf("Expected one server (still)")
} }
s2 := &server_details.ServerDetails{Name: "s2"} s2 := &agent.Server{Name: "s2"}
sm.AddServer(s2) m.AddServer(s2)
num = sm.NumServers() num = m.NumServers()
if num != 2 { if num != 2 {
t.Fatalf("Expected two servers") t.Fatalf("Expected two servers")
} }
} }
// func (sm *ServerManager) FindServer() (server *server_details.ServerDetails) { // func (m *Manager) FindServer() (server *agent.Server) {
func TestServerManager_FindServer(t *testing.T) { func TestServers_FindServer(t *testing.T) {
sm := testServerManager() m := testManager()
if sm.FindServer() != nil { if m.FindServer() != nil {
t.Fatalf("Expected nil return") t.Fatalf("Expected nil return")
} }
sm.AddServer(&server_details.ServerDetails{Name: "s1"}) m.AddServer(&agent.Server{Name: "s1"})
if sm.NumServers() != 1 { if m.NumServers() != 1 {
t.Fatalf("Expected one server") t.Fatalf("Expected one server")
} }
s1 := sm.FindServer() s1 := m.FindServer()
if s1 == nil { if s1 == nil {
t.Fatalf("Expected non-nil server") t.Fatalf("Expected non-nil server")
} }
@ -115,118 +115,118 @@ func TestServerManager_FindServer(t *testing.T) {
t.Fatalf("Expected s1 server") t.Fatalf("Expected s1 server")
} }
s1 = sm.FindServer() s1 = m.FindServer()
if s1 == nil || s1.Name != "s1" { if s1 == nil || s1.Name != "s1" {
t.Fatalf("Expected s1 server (still)") t.Fatalf("Expected s1 server (still)")
} }
sm.AddServer(&server_details.ServerDetails{Name: "s2"}) m.AddServer(&agent.Server{Name: "s2"})
if sm.NumServers() != 2 { if m.NumServers() != 2 {
t.Fatalf("Expected two servers") t.Fatalf("Expected two servers")
} }
s1 = sm.FindServer() s1 = m.FindServer()
if s1 == nil || s1.Name != "s1" { if s1 == nil || s1.Name != "s1" {
t.Fatalf("Expected s1 server (still)") t.Fatalf("Expected s1 server (still)")
} }
sm.NotifyFailedServer(s1) m.NotifyFailedServer(s1)
s2 := sm.FindServer() s2 := m.FindServer()
if s2 == nil || s2.Name != "s2" { if s2 == nil || s2.Name != "s2" {
t.Fatalf("Expected s2 server") t.Fatalf("Expected s2 server")
} }
sm.NotifyFailedServer(s2) m.NotifyFailedServer(s2)
s1 = sm.FindServer() s1 = m.FindServer()
if s1 == nil || s1.Name != "s1" { if s1 == nil || s1.Name != "s1" {
t.Fatalf("Expected s1 server") t.Fatalf("Expected s1 server")
} }
} }
// func New(logger *log.Logger, shutdownCh chan struct{}) (sm *ServerManager) { // func New(logger *log.Logger, shutdownCh chan struct{}) (m *Manager) {
func TestServerManager_New(t *testing.T) { func TestServers_New(t *testing.T) {
logger := GetBufferedLogger() logger := GetBufferedLogger()
logger = log.New(os.Stderr, "", log.LstdFlags) logger = log.New(os.Stderr, "", log.LstdFlags)
shutdownCh := make(chan struct{}) shutdownCh := make(chan struct{})
sm := server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}) m := servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
if sm == nil { if m == nil {
t.Fatalf("ServerManager nil") t.Fatalf("Manager nil")
} }
} }
// func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) { // func (m *Manager) NotifyFailedServer(server *agent.Server) {
func TestServerManager_NotifyFailedServer(t *testing.T) { func TestServers_NotifyFailedServer(t *testing.T) {
sm := testServerManager() m := testManager()
if sm.NumServers() != 0 { if m.NumServers() != 0 {
t.Fatalf("Expected zero servers to start") t.Fatalf("Expected zero servers to start")
} }
s1 := &server_details.ServerDetails{Name: "s1"} s1 := &agent.Server{Name: "s1"}
s2 := &server_details.ServerDetails{Name: "s2"} s2 := &agent.Server{Name: "s2"}
// Try notifying for a server that is not part of the server manager // Try notifying for a server that is not managed by Manager
sm.NotifyFailedServer(s1) m.NotifyFailedServer(s1)
if sm.NumServers() != 0 { if m.NumServers() != 0 {
t.Fatalf("Expected zero servers to start") t.Fatalf("Expected zero servers to start")
} }
sm.AddServer(s1) m.AddServer(s1)
// Test again w/ a server not in the list // Test again w/ a server not in the list
sm.NotifyFailedServer(s2) m.NotifyFailedServer(s2)
if sm.NumServers() != 1 { if m.NumServers() != 1 {
t.Fatalf("Expected one server") t.Fatalf("Expected one server")
} }
sm.AddServer(s2) m.AddServer(s2)
if sm.NumServers() != 2 { if m.NumServers() != 2 {
t.Fatalf("Expected two servers") t.Fatalf("Expected two servers")
} }
s1 = sm.FindServer() s1 = m.FindServer()
if s1 == nil || s1.Name != "s1" { if s1 == nil || s1.Name != "s1" {
t.Fatalf("Expected s1 server") t.Fatalf("Expected s1 server")
} }
sm.NotifyFailedServer(s2) m.NotifyFailedServer(s2)
s1 = sm.FindServer() s1 = m.FindServer()
if s1 == nil || s1.Name != "s1" { if s1 == nil || s1.Name != "s1" {
t.Fatalf("Expected s1 server (still)") t.Fatalf("Expected s1 server (still)")
} }
sm.NotifyFailedServer(s1) m.NotifyFailedServer(s1)
s2 = sm.FindServer() s2 = m.FindServer()
if s2 == nil || s2.Name != "s2" { if s2 == nil || s2.Name != "s2" {
t.Fatalf("Expected s2 server") t.Fatalf("Expected s2 server")
} }
sm.NotifyFailedServer(s2) m.NotifyFailedServer(s2)
s1 = sm.FindServer() s1 = m.FindServer()
if s1 == nil || s1.Name != "s1" { if s1 == nil || s1.Name != "s1" {
t.Fatalf("Expected s1 server") t.Fatalf("Expected s1 server")
} }
} }
// func (sm *ServerManager) NumServers() (numServers int) { // func (m *Manager) NumServers() (numServers int) {
func TestServerManager_NumServers(t *testing.T) { func TestServers_NumServers(t *testing.T) {
sm := testServerManager() m := testManager()
var num int var num int
num = sm.NumServers() num = m.NumServers()
if num != 0 { if num != 0 {
t.Fatalf("Expected zero servers to start") t.Fatalf("Expected zero servers to start")
} }
s := &server_details.ServerDetails{} s := &agent.Server{}
sm.AddServer(s) m.AddServer(s)
num = sm.NumServers() num = m.NumServers()
if num != 1 { if num != 1 {
t.Fatalf("Expected one server after AddServer") t.Fatalf("Expected one server after AddServer")
} }
} }
// func (sm *ServerManager) RebalanceServers() { // func (m *Manager) RebalanceServers() {
func TestServerManager_RebalanceServers(t *testing.T) { func TestServers_RebalanceServers(t *testing.T) {
const failPct = 0.5 const failPct = 0.5
sm := testServerManagerFailProb(failPct) m := testManagerFailProb(failPct)
const maxServers = 100 const maxServers = 100
const numShuffleTests = 100 const numShuffleTests = 100
const uniquePassRate = 0.5 const uniquePassRate = 0.5
@ -234,18 +234,18 @@ func TestServerManager_RebalanceServers(t *testing.T) {
// Make a huge list of nodes. // Make a huge list of nodes.
for i := 0; i < maxServers; i++ { for i := 0; i < maxServers; i++ {
nodeName := fmt.Sprintf("s%02d", i) nodeName := fmt.Sprintf("s%02d", i)
sm.AddServer(&server_details.ServerDetails{Name: nodeName}) m.AddServer(&agent.Server{Name: nodeName})
} }
// Keep track of how many unique shuffles we get. // Keep track of how many unique shuffles we get.
uniques := make(map[string]struct{}, maxServers) uniques := make(map[string]struct{}, maxServers)
for i := 0; i < numShuffleTests; i++ { for i := 0; i < numShuffleTests; i++ {
sm.RebalanceServers() m.RebalanceServers()
var names []string var names []string
for j := 0; j < maxServers; j++ { for j := 0; j < maxServers; j++ {
server := sm.FindServer() server := m.FindServer()
sm.NotifyFailedServer(server) m.NotifyFailedServer(server)
names = append(names, server.Name) names = append(names, server.Name)
} }
key := strings.Join(names, "|") key := strings.Join(names, "|")
@ -260,48 +260,48 @@ func TestServerManager_RebalanceServers(t *testing.T) {
} }
} }
// func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) { // func (m *Manager) RemoveServer(server *agent.Server) {
func TestServerManager_RemoveServer(t *testing.T) { func TestManager_RemoveServer(t *testing.T) {
const nodeNameFmt = "s%02d" const nodeNameFmt = "s%02d"
sm := testServerManager() m := testManager()
if sm.NumServers() != 0 { if m.NumServers() != 0 {
t.Fatalf("Expected zero servers to start") t.Fatalf("Expected zero servers to start")
} }
// Test removing server before its added // Test removing server before its added
nodeName := fmt.Sprintf(nodeNameFmt, 1) nodeName := fmt.Sprintf(nodeNameFmt, 1)
s1 := &server_details.ServerDetails{Name: nodeName} s1 := &agent.Server{Name: nodeName}
sm.RemoveServer(s1) m.RemoveServer(s1)
sm.AddServer(s1) m.AddServer(s1)
nodeName = fmt.Sprintf(nodeNameFmt, 2) nodeName = fmt.Sprintf(nodeNameFmt, 2)
s2 := &server_details.ServerDetails{Name: nodeName} s2 := &agent.Server{Name: nodeName}
sm.RemoveServer(s2) m.RemoveServer(s2)
sm.AddServer(s2) m.AddServer(s2)
const maxServers = 19 const maxServers = 19
servers := make([]*server_details.ServerDetails, maxServers) servers := make([]*agent.Server, maxServers)
// Already added two servers above // Already added two servers above
for i := maxServers; i > 2; i-- { for i := maxServers; i > 2; i-- {
nodeName := fmt.Sprintf(nodeNameFmt, i) nodeName := fmt.Sprintf(nodeNameFmt, i)
server := &server_details.ServerDetails{Name: nodeName} server := &agent.Server{Name: nodeName}
servers = append(servers, server) servers = append(servers, server)
sm.AddServer(server) m.AddServer(server)
} }
if sm.NumServers() != maxServers { if m.NumServers() != maxServers {
t.Fatalf("Expected %d servers, received %d", maxServers, sm.NumServers()) t.Fatalf("Expected %d servers, received %d", maxServers, m.NumServers())
} }
sm.RebalanceServers() m.RebalanceServers()
if sm.NumServers() != maxServers { if m.NumServers() != maxServers {
t.Fatalf("Expected %d servers, received %d", maxServers, sm.NumServers()) t.Fatalf("Expected %d servers, received %d", maxServers, m.NumServers())
} }
findServer := func(server *server_details.ServerDetails) bool { findServer := func(server *agent.Server) bool {
for i := sm.NumServers(); i > 0; i-- { for i := m.NumServers(); i > 0; i-- {
s := sm.FindServer() s := m.FindServer()
if s == server { if s == server {
return true return true
} }
@ -310,18 +310,18 @@ func TestServerManager_RemoveServer(t *testing.T) {
} }
expectedNumServers := maxServers expectedNumServers := maxServers
removedServers := make([]*server_details.ServerDetails, 0, maxServers) removedServers := make([]*agent.Server, 0, maxServers)
// Remove servers from the front of the list // Remove servers from the front of the list
for i := 3; i > 0; i-- { for i := 3; i > 0; i-- {
server := sm.FindServer() server := m.FindServer()
if server == nil { if server == nil {
t.Fatalf("FindServer returned nil") t.Fatalf("FindServer returned nil")
} }
sm.RemoveServer(server) m.RemoveServer(server)
expectedNumServers-- expectedNumServers--
if sm.NumServers() != expectedNumServers { if m.NumServers() != expectedNumServers {
t.Fatalf("Expected %d servers (got %d)", expectedNumServers, sm.NumServers()) t.Fatalf("Expected %d servers (got %d)", expectedNumServers, m.NumServers())
} }
if findServer(server) == true { if findServer(server) == true {
t.Fatalf("Did not expect to find server %s after removal from the front", server.Name) t.Fatalf("Did not expect to find server %s after removal from the front", server.Name)
@ -331,12 +331,12 @@ func TestServerManager_RemoveServer(t *testing.T) {
// Remove server from the end of the list // Remove server from the end of the list
for i := 3; i > 0; i-- { for i := 3; i > 0; i-- {
server := sm.FindServer() server := m.FindServer()
sm.NotifyFailedServer(server) m.NotifyFailedServer(server)
sm.RemoveServer(server) m.RemoveServer(server)
expectedNumServers-- expectedNumServers--
if sm.NumServers() != expectedNumServers { if m.NumServers() != expectedNumServers {
t.Fatalf("Expected %d servers (got %d)", expectedNumServers, sm.NumServers()) t.Fatalf("Expected %d servers (got %d)", expectedNumServers, m.NumServers())
} }
if findServer(server) == true { if findServer(server) == true {
t.Fatalf("Did not expect to find server %s", server.Name) t.Fatalf("Did not expect to find server %s", server.Name)
@ -346,15 +346,15 @@ func TestServerManager_RemoveServer(t *testing.T) {
// Remove server from the middle of the list // Remove server from the middle of the list
for i := 3; i > 0; i-- { for i := 3; i > 0; i-- {
server := sm.FindServer() server := m.FindServer()
sm.NotifyFailedServer(server) m.NotifyFailedServer(server)
server2 := sm.FindServer() server2 := m.FindServer()
sm.NotifyFailedServer(server2) // server2 now at end of the list m.NotifyFailedServer(server2) // server2 now at end of the list
sm.RemoveServer(server) m.RemoveServer(server)
expectedNumServers-- expectedNumServers--
if sm.NumServers() != expectedNumServers { if m.NumServers() != expectedNumServers {
t.Fatalf("Expected %d servers (got %d)", expectedNumServers, sm.NumServers()) t.Fatalf("Expected %d servers (got %d)", expectedNumServers, m.NumServers())
} }
if findServer(server) == true { if findServer(server) == true {
t.Fatalf("Did not expect to find server %s", server.Name) t.Fatalf("Did not expect to find server %s", server.Name)
@ -362,21 +362,21 @@ func TestServerManager_RemoveServer(t *testing.T) {
removedServers = append(removedServers, server) removedServers = append(removedServers, server)
} }
if sm.NumServers()+len(removedServers) != maxServers { if m.NumServers()+len(removedServers) != maxServers {
t.Fatalf("Expected %d+%d=%d servers", sm.NumServers(), len(removedServers), maxServers) t.Fatalf("Expected %d+%d=%d servers", m.NumServers(), len(removedServers), maxServers)
} }
// Drain the remaining servers from the middle // Drain the remaining servers from the middle
for i := sm.NumServers(); i > 0; i-- { for i := m.NumServers(); i > 0; i-- {
server := sm.FindServer() server := m.FindServer()
sm.NotifyFailedServer(server) m.NotifyFailedServer(server)
server2 := sm.FindServer() server2 := m.FindServer()
sm.NotifyFailedServer(server2) // server2 now at end of the list m.NotifyFailedServer(server2) // server2 now at end of the list
sm.RemoveServer(server) m.RemoveServer(server)
removedServers = append(removedServers, server) removedServers = append(removedServers, server)
} }
if sm.NumServers() != 0 { if m.NumServers() != 0 {
t.Fatalf("Expected an empty server list") t.Fatalf("Expected an empty server list")
} }
if len(removedServers) != maxServers { if len(removedServers) != maxServers {
@ -384,4 +384,4 @@ func TestServerManager_RemoveServer(t *testing.T) {
} }
} }
// func (sm *ServerManager) Start() { // func (m *Manager) Start() {