diff --git a/consul/server_details/server_details.go b/consul/agent/server.go similarity index 67% rename from consul/server_details/server_details.go rename to consul/agent/server.go index 4f3dc794a6..8ba1f0c5da 100644 --- a/consul/server_details/server_details.go +++ b/consul/agent/server.go @@ -1,4 +1,9 @@ -package server_details +// Package agent provides a logical endpoint for Consul agents in the +// network. agent data originates from Serf gossip and is primarily used to +// communicate Consul server information. Gossiped information that ends up +// in Server contains the necessary metadata required for servers.Manager to +// select which server an RPC request should be routed to. +package agent import ( "fmt" @@ -18,8 +23,8 @@ func (k *Key) Equal(x *Key) bool { return k.name == x.name } -// ServerDetails is used to return details of a consul server -type ServerDetails struct { +// Server is used to return details of a consul server +type Server struct { Name string Datacenter string Port int @@ -30,14 +35,14 @@ type ServerDetails struct { } // Key returns the corresponding Key -func (s *ServerDetails) Key() *Key { +func (s *Server) Key() *Key { return &Key{ name: s.Name, } } -// String returns a string representation of ServerDetails -func (s *ServerDetails) String() string { +// String returns a string representation of Server +func (s *Server) String() string { var addrStr, networkStr string if s.Addr != nil { addrStr = s.Addr.String() @@ -47,9 +52,9 @@ func (s *ServerDetails) String() string { return fmt.Sprintf("%s (Addr: %s/%s) (DC: %s)", s.Name, networkStr, addrStr, s.Datacenter) } -// IsConsulServer returns true if a serf member is a consul server. Returns a -// bool and a pointer to the ServerDetails. -func IsConsulServer(m serf.Member) (bool, *ServerDetails) { +// IsConsulServer returns true if a serf member is a consul server +// agent. Returns a bool and a pointer to the Server. 
+func IsConsulServer(m serf.Member) (bool, *Server) { if m.Tags["role"] != "consul" { return false, nil } @@ -81,7 +86,7 @@ func IsConsulServer(m serf.Member) (bool, *ServerDetails) { addr := &net.TCPAddr{IP: m.Addr, Port: port} - parts := &ServerDetails{ + parts := &Server{ Name: m.Name, Datacenter: datacenter, Port: port, diff --git a/consul/server_details/server_details_internal_test.go b/consul/agent/server_internal_test.go similarity index 86% rename from consul/server_details/server_details_internal_test.go rename to consul/agent/server_internal_test.go index ee09d8f57f..3b4f3d7e1f 100644 --- a/consul/server_details/server_details_internal_test.go +++ b/consul/agent/server_internal_test.go @@ -1,10 +1,10 @@ -package server_details +package agent import ( "testing" ) -func TestServerDetails_Key_Equal(t *testing.T) { +func TestServer_Key_Equal(t *testing.T) { tests := []struct { name string k1 *Key @@ -47,16 +47,16 @@ func TestServerDetails_Key_Equal(t *testing.T) { } } -func TestServerDetails_Key(t *testing.T) { +func TestServer_Key(t *testing.T) { tests := []struct { name string - sd *ServerDetails + sd *Server k *Key equal bool }{ { name: "Key equality", - sd: &ServerDetails{ + sd: &Server{ Name: "s1", }, k: &Key{ @@ -66,7 +66,7 @@ func TestServerDetails_Key(t *testing.T) { }, { name: "Key inequality", - sd: &ServerDetails{ + sd: &Server{ Name: "s1", }, k: &Key{ diff --git a/consul/server_details/server_details_test.go b/consul/agent/server_test.go similarity index 79% rename from consul/server_details/server_details_test.go rename to consul/agent/server_test.go index b16cbf5a0a..0dda84f851 100644 --- a/consul/server_details/server_details_test.go +++ b/consul/agent/server_test.go @@ -1,32 +1,32 @@ -package server_details_test +package agent_test import ( "net" "testing" - "github.com/hashicorp/consul/consul/server_details" + "github.com/hashicorp/consul/consul/agent" "github.com/hashicorp/serf/serf" ) -func TestServerDetails_Key_params(t *testing.T) { +func 
TestServer_Key_params(t *testing.T) { ipv4a := net.ParseIP("127.0.0.1") ipv4b := net.ParseIP("1.2.3.4") tests := []struct { name string - sd1 *server_details.ServerDetails - sd2 *server_details.ServerDetails + sd1 *agent.Server + sd2 *agent.Server equal bool }{ { name: "Addr inequality", - sd1: &server_details.ServerDetails{ + sd1: &agent.Server{ Name: "s1", Datacenter: "dc1", Port: 8300, Addr: &net.IPAddr{IP: ipv4a}, }, - sd2: &server_details.ServerDetails{ + sd2: &agent.Server{ Name: "s1", Datacenter: "dc1", Port: 8300, @@ -42,7 +42,7 @@ func TestServerDetails_Key_params(t *testing.T) { } // Test Key to make sure it actually works as a key - m := make(map[server_details.Key]bool) + m := make(map[agent.Key]bool) m[*test.sd1.Key()] = true if _, found := m[*test.sd2.Key()]; found != test.equal { t.Errorf("Expected a %v result from map test %s", test.equal, test.name) @@ -61,7 +61,7 @@ func TestIsConsulServer(t *testing.T) { "vsn": "1", }, } - ok, parts := server_details.IsConsulServer(m) + ok, parts := agent.IsConsulServer(m) if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 { t.Fatalf("bad: %v %v", ok, parts) } @@ -76,7 +76,7 @@ func TestIsConsulServer(t *testing.T) { } m.Tags["bootstrap"] = "1" m.Tags["disabled"] = "1" - ok, parts = server_details.IsConsulServer(m) + ok, parts = agent.IsConsulServer(m) if !ok { t.Fatalf("expected a valid consul server") } @@ -92,7 +92,7 @@ func TestIsConsulServer(t *testing.T) { m.Tags["expect"] = "3" delete(m.Tags, "bootstrap") delete(m.Tags, "disabled") - ok, parts = server_details.IsConsulServer(m) + ok, parts = agent.IsConsulServer(m) if !ok || parts.Expect != 3 { t.Fatalf("bad: %v", parts.Expect) } @@ -101,7 +101,7 @@ func TestIsConsulServer(t *testing.T) { } delete(m.Tags, "role") - ok, parts = server_details.IsConsulServer(m) + ok, parts = agent.IsConsulServer(m) if ok { t.Fatalf("unexpected ok server") } diff --git a/consul/client.go b/consul/client.go index 82a4bd7fac..bd5132c663 100644 --- 
a/consul/client.go +++ b/consul/client.go @@ -10,8 +10,8 @@ import ( "sync" "time" - "github.com/hashicorp/consul/consul/server_details" - "github.com/hashicorp/consul/consul/server_manager" + "github.com/hashicorp/consul/consul/agent" + "github.com/hashicorp/consul/consul/servers" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" @@ -58,9 +58,9 @@ type Client struct { // Connection pool to consul servers connPool *ConnPool - // serverMgr is responsible for the selection and maintenance of + // servers is responsible for the selection and maintenance of // Consul servers this agent uses for RPC requests - serverMgr *server_manager.ServerManager + servers *servers.Manager // eventCh is used to receive events from the // serf cluster in the datacenter @@ -130,9 +130,9 @@ func NewClient(config *Config) (*Client, error) { return nil, fmt.Errorf("Failed to start lan serf: %v", err) } - // Start maintenance task for server_manager - c.serverMgr = server_manager.New(c.logger, c.shutdownCh, c.serf, c.connPool) - go c.serverMgr.Start() + // Start maintenance task for servers + c.servers = servers.New(c.logger, c.shutdownCh, c.serf, c.connPool) + go c.servers.Start() return c, nil } @@ -261,7 +261,7 @@ func (c *Client) lanEventHandler() { // nodeJoin is used to handle join events on the serf cluster func (c *Client) nodeJoin(me serf.MemberEvent) { for _, m := range me.Members { - ok, parts := server_details.IsConsulServer(m) + ok, parts := agent.IsConsulServer(m) if !ok { continue } @@ -271,7 +271,7 @@ func (c *Client) nodeJoin(me serf.MemberEvent) { continue } c.logger.Printf("[INFO] consul: adding server %s", parts) - c.serverMgr.AddServer(parts) + c.servers.AddServer(parts) // Trigger the callback if c.config.ServerUp != nil { @@ -283,12 +283,12 @@ func (c *Client) nodeJoin(me serf.MemberEvent) { // nodeFail is used to handle fail events on the serf cluster func (c *Client) nodeFail(me serf.MemberEvent) { 
for _, m := range me.Members { - ok, parts := server_details.IsConsulServer(m) + ok, parts := agent.IsConsulServer(m) if !ok { continue } c.logger.Printf("[INFO] consul: removing server %s", parts) - c.serverMgr.RemoveServer(parts) + c.servers.RemoveServer(parts) } } @@ -322,14 +322,14 @@ func (c *Client) localEvent(event serf.UserEvent) { // RPC is used to forward an RPC call to a consul server, or fail if no servers func (c *Client) RPC(method string, args interface{}, reply interface{}) error { - server := c.serverMgr.FindServer() + server := c.servers.FindServer() if server == nil { return structs.ErrNoServers } // Forward to remote Consul if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil { - c.serverMgr.NotifyFailedServer(server) + c.servers.NotifyFailedServer(server) c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err) return err } @@ -340,7 +340,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error { // Stats is used to return statistics for debugging and insight // for various sub-systems func (c *Client) Stats() map[string]map[string]string { - numServers := c.serverMgr.NumServers() + numServers := c.servers.NumServers() toString := func(v uint64) string { return strconv.FormatUint(v, 10) diff --git a/consul/client_test.go b/consul/client_test.go index 124a7fb811..3e4d7706cc 100644 --- a/consul/client_test.go +++ b/consul/client_test.go @@ -84,7 +84,7 @@ func TestClient_JoinLAN(t *testing.T) { t.Fatalf("err: %v", err) } testutil.WaitForResult(func() (bool, error) { - return c1.serverMgr.NumServers() == 1, nil + return c1.servers.NumServers() == 1, nil }, func(err error) { t.Fatalf("expected consul server") }) @@ -100,7 +100,7 @@ func TestClient_JoinLAN(t *testing.T) { // Check we have a new consul testutil.WaitForResult(func() (bool, error) { - return c1.serverMgr.NumServers() == 1, nil + return c1.servers.NumServers() == 1, nil }, func(err 
error) { t.Fatalf("expected consul server") }) @@ -270,7 +270,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) { // Sleep to allow Serf to sync, shuffle, and let the shuffle complete time.Sleep(1 * time.Second) - c.serverMgr.ResetRebalanceTimer() + c.servers.ResetRebalanceTimer() time.Sleep(1 * time.Second) if len(c.LANMembers()) != numServers+numClients { @@ -286,7 +286,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) { var pingCount int for range servers { time.Sleep(1 * time.Second) - s := c.serverMgr.FindServer() + s := c.servers.FindServer() ok, err := c.connPool.PingConsulServer(s) if !ok { t.Errorf("Unable to ping server %v: %s", s.String(), err) @@ -295,7 +295,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) { // Artificially fail the server in order to rotate the server // list - c.serverMgr.NotifyFailedServer(s) + c.servers.NotifyFailedServer(s) } if pingCount != numServers { diff --git a/consul/leader.go b/consul/leader.go index 3f4bc14844..8aa1f19948 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -8,7 +8,7 @@ import ( "time" "github.com/armon/go-metrics" - "github.com/hashicorp/consul/consul/server_details" + "github.com/hashicorp/consul/consul/agent" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" @@ -350,7 +350,7 @@ func (s *Server) shouldHandleMember(member serf.Member) bool { if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter { return true } - if valid, parts := server_details.IsConsulServer(member); valid && parts.Datacenter == s.config.Datacenter { + if valid, parts := agent.IsConsulServer(member); valid && parts.Datacenter == s.config.Datacenter { return true } return false @@ -361,7 +361,7 @@ func (s *Server) shouldHandleMember(member serf.Member) bool { func (s *Server) handleAliveMember(member serf.Member) error { // Register consul service if a server var service *structs.NodeService - if valid, parts := 
server_details.IsConsulServer(member); valid { + if valid, parts := agent.IsConsulServer(member); valid { service = &structs.NodeService{ ID: ConsulServiceID, Service: ConsulServiceName, @@ -497,7 +497,7 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error } // Remove from Raft peers if this was a server - if valid, parts := server_details.IsConsulServer(member); valid { + if valid, parts := agent.IsConsulServer(member); valid { if err := s.removeConsulServer(member, parts.Port); err != nil { return err } @@ -524,7 +524,7 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error } // joinConsulServer is used to try to join another consul server -func (s *Server) joinConsulServer(m serf.Member, parts *server_details.ServerDetails) error { +func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error { // Do not join ourself if m.Name == s.config.NodeName { return nil @@ -534,7 +534,7 @@ func (s *Server) joinConsulServer(m serf.Member, parts *server_details.ServerDet if parts.Bootstrap { members := s.serfLAN.Members() for _, member := range members { - valid, p := server_details.IsConsulServer(member) + valid, p := agent.IsConsulServer(member) if valid && member.Name != m.Name && p.Bootstrap { s.logger.Printf("[ERR] consul: '%v' and '%v' are both in bootstrap mode. 
Only one node should be in bootstrap mode, not adding Raft peer.", m.Name, member.Name) return nil diff --git a/consul/merge.go b/consul/merge.go index ed86c69843..defa7ef108 100644 --- a/consul/merge.go +++ b/consul/merge.go @@ -3,7 +3,7 @@ package consul import ( "fmt" - "github.com/hashicorp/consul/consul/server_details" + "github.com/hashicorp/consul/consul/agent" "github.com/hashicorp/serf/serf" ) @@ -25,7 +25,7 @@ func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error { continue } - ok, parts := server_details.IsConsulServer(*m) + ok, parts := agent.IsConsulServer(*m) if ok && parts.Datacenter != md.dc { return fmt.Errorf("Member '%s' part of wrong datacenter '%s'", m.Name, parts.Datacenter) @@ -42,7 +42,7 @@ type wanMergeDelegate struct { func (md *wanMergeDelegate) NotifyMerge(members []*serf.Member) error { for _, m := range members { - ok, _ := server_details.IsConsulServer(*m) + ok, _ := agent.IsConsulServer(*m) if !ok { return fmt.Errorf("Member '%s' is not a server", m.Name) } diff --git a/consul/pool.go b/consul/pool.go index cdfd2129df..fd42bf21fe 100644 --- a/consul/pool.go +++ b/consul/pool.go @@ -10,7 +10,7 @@ import ( "sync/atomic" "time" - "github.com/hashicorp/consul/consul/server_details" + "github.com/hashicorp/consul/consul/agent" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/yamux" @@ -408,7 +408,7 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, arg // PingConsulServer sends a Status.Ping message to the specified server and // returns true if healthy, false if an error occurred -func (p *ConnPool) PingConsulServer(s *server_details.ServerDetails) (bool, error) { +func (p *ConnPool) PingConsulServer(s *agent.Server) (bool, error) { // Get a usable client conn, sc, err := p.getClient(s.Datacenter, s.Addr, s.Version) if err != nil { diff --git a/consul/serf.go b/consul/serf.go index d9f27309c0..1fc4894001 100644 --- a/consul/serf.go +++ 
b/consul/serf.go @@ -4,7 +4,7 @@ import ( "net" "strings" - "github.com/hashicorp/consul/consul/server_details" + "github.com/hashicorp/consul/consul/agent" "github.com/hashicorp/serf/serf" ) @@ -141,7 +141,7 @@ func (s *Server) localEvent(event serf.UserEvent) { // lanNodeJoin is used to handle join events on the LAN pool. func (s *Server) lanNodeJoin(me serf.MemberEvent) { for _, m := range me.Members { - ok, parts := server_details.IsConsulServer(m) + ok, parts := agent.IsConsulServer(m) if !ok { continue } @@ -164,7 +164,7 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) { // wanNodeJoin is used to handle join events on the WAN pool. func (s *Server) wanNodeJoin(me serf.MemberEvent) { for _, m := range me.Members { - ok, parts := server_details.IsConsulServer(m) + ok, parts := agent.IsConsulServer(m) if !ok { s.logger.Printf("[WARN] consul: non-server in WAN pool: %s", m.Name) continue @@ -210,7 +210,7 @@ func (s *Server) maybeBootstrap() { members := s.serfLAN.Members() addrs := make([]string, 0) for _, member := range members { - valid, p := server_details.IsConsulServer(member) + valid, p := agent.IsConsulServer(member) if !valid { continue } @@ -248,7 +248,7 @@ func (s *Server) maybeBootstrap() { // lanNodeFailed is used to handle fail events on the LAN pool. func (s *Server) lanNodeFailed(me serf.MemberEvent) { for _, m := range me.Members { - ok, parts := server_details.IsConsulServer(m) + ok, parts := agent.IsConsulServer(m) if !ok { continue } @@ -263,7 +263,7 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) { // wanNodeFailed is used to handle fail events on the WAN pool. 
func (s *Server) wanNodeFailed(me serf.MemberEvent) { for _, m := range me.Members { - ok, parts := server_details.IsConsulServer(m) + ok, parts := agent.IsConsulServer(m) if !ok { continue } diff --git a/consul/server.go b/consul/server.go index 347cc6f26a..4d0005f088 100644 --- a/consul/server.go +++ b/consul/server.go @@ -15,7 +15,7 @@ import ( "time" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/consul/server_details" + "github.com/hashicorp/consul/consul/agent" "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/raft" @@ -98,7 +98,7 @@ type Server struct { // localConsuls is used to track the known consuls // in the local datacenter. Used to do leader forwarding. - localConsuls map[string]*server_details.ServerDetails + localConsuls map[string]*agent.Server localLock sync.RWMutex // Logger uses the provided LogOutput @@ -120,7 +120,7 @@ type Server struct { // remoteConsuls is used to track the known consuls in // remote datacenters. Used to do DC forwarding. 
- remoteConsuls map[string][]*server_details.ServerDetails + remoteConsuls map[string][]*agent.Server remoteLock sync.RWMutex // rpcListener is used to listen for incoming connections @@ -217,10 +217,10 @@ func NewServer(config *Config) (*Server, error) { connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap), eventChLAN: make(chan serf.Event, 256), eventChWAN: make(chan serf.Event, 256), - localConsuls: make(map[string]*server_details.ServerDetails), + localConsuls: make(map[string]*agent.Server), logger: logger, reconcileCh: make(chan serf.Member, 32), - remoteConsuls: make(map[string][]*server_details.ServerDetails), + remoteConsuls: make(map[string][]*agent.Server), rpcServer: rpc.NewServer(), rpcTLS: incomingTLS, tombstoneGC: gc, diff --git a/consul/server_manager/server_manager.go b/consul/servers/manager.go similarity index 62% rename from consul/server_manager/server_manager.go rename to consul/servers/manager.go index 24844b0f91..15814697e8 100644 --- a/consul/server_manager/server_manager.go +++ b/consul/servers/manager.go @@ -1,4 +1,9 @@ -package server_manager +// Package servers provides Manager, which maintains the list of +// agent.Server objects. The servers package manages servers from a Consul +// client's perspective (i.e. a list of servers that a client talks with for +// RPCs). The servers package does not provide any API guarantees and should +// be called only by `hashicorp/consul`. 
+package servers import ( "log" @@ -7,7 +12,7 @@ import ( "sync/atomic" "time" - "github.com/hashicorp/consul/consul/server_details" + "github.com/hashicorp/consul/consul/agent" "github.com/hashicorp/consul/lib" ) @@ -56,25 +61,24 @@ type ConsulClusterInfo interface { // Pinger is an interface wrapping client.ConnPool to prevent a // cyclic import dependency type Pinger interface { - PingConsulServer(server *server_details.ServerDetails) (bool, error) + PingConsulServer(s *agent.Server) (bool, error) } -// serverConfig is the thread-safe configuration struct used to maintain the -// list of Consul servers in ServerManager. +// serverList is a local copy of the struct used to maintain the list of +// Consul servers used by Manager. // -// NOTE(sean@): We are explicitly relying on the fact that serverConfig will +// NOTE(sean@): We are explicitly relying on the fact that serverList will // be copied onto the stack. Please keep this structure light. -type serverConfig struct { +type serverList struct { // servers tracks the locally known servers. List membership is // maintained by Serf. - servers []*server_details.ServerDetails + servers []*agent.Server } -type ServerManager struct { - // serverConfig provides the necessary load/store semantics for the - // server list. - serverConfigValue atomic.Value - serverConfigLock sync.Mutex +type Manager struct { + // listValue manages the atomic load/store of a Manager's serverList + listValue atomic.Value + listLock sync.Mutex // rebalanceTimer controls the duration of the rebalance interval rebalanceTimer *time.Timer @@ -95,7 +99,7 @@ type ServerManager struct { connPoolPinger Pinger // notifyFailedBarrier is acts as a barrier to prevent queuing behind - // serverConfigLog and acts as a TryLock(). + // listLock and acts as a TryLock(). notifyFailedBarrier int32 } @@ -104,23 +108,23 @@ type ServerManager struct { // begin seeing use after the rebalance timer fires or enough servers fail // organically. 
If the server is already known, merge the new server // details. -func (sm *ServerManager) AddServer(server *server_details.ServerDetails) { - sm.serverConfigLock.Lock() - defer sm.serverConfigLock.Unlock() - sc := sm.getServerConfig() +func (m *Manager) AddServer(s *agent.Server) { + m.listLock.Lock() + defer m.listLock.Unlock() + l := m.getServerList() // Check if this server is known found := false - for idx, existing := range sc.servers { - if existing.Name == server.Name { - newServers := make([]*server_details.ServerDetails, len(sc.servers)) - copy(newServers, sc.servers) + for idx, existing := range l.servers { + if existing.Name == s.Name { + newServers := make([]*agent.Server, len(l.servers)) + copy(newServers, l.servers) // Overwrite the existing server details in order to // possibly update metadata (e.g. server version) - newServers[idx] = server + newServers[idx] = s - sc.servers = newServers + l.servers = newServers found = true break } @@ -128,53 +132,53 @@ func (sm *ServerManager) AddServer(server *server_details.ServerDetails) { // Add to the list if not known if !found { - newServers := make([]*server_details.ServerDetails, len(sc.servers), len(sc.servers)+1) - copy(newServers, sc.servers) - newServers = append(newServers, server) - sc.servers = newServers + newServers := make([]*agent.Server, len(l.servers), len(l.servers)+1) + copy(newServers, l.servers) + newServers = append(newServers, s) + l.servers = newServers } - sm.saveServerConfig(sc) + m.saveServerList(l) } // cycleServers returns a new list of servers that has dequeued the first // server and enqueued it at the end of the list. cycleServers assumes the -// caller is holding the serverConfigLock. cycleServer does not test or ping +// caller is holding the listLock. cycleServer does not test or ping // the next server inline. 
cycleServer may be called when the environment // has just entered an unhealthy situation and blocking on a server test is // less desirable than just returning the next server in the firing line. If // the next server fails, it will fail fast enough and cycleServer will be // called again. -func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) { - numServers := len(sc.servers) +func (l *serverList) cycleServer() (servers []*agent.Server) { + numServers := len(l.servers) if numServers < 2 { return servers // No action required } - newServers := make([]*server_details.ServerDetails, 0, numServers) - newServers = append(newServers, sc.servers[1:]...) - newServers = append(newServers, sc.servers[0]) + newServers := make([]*agent.Server, 0, numServers) + newServers = append(newServers, l.servers[1:]...) + newServers = append(newServers, l.servers[0]) return newServers } // removeServerByKey performs an inline removal of the first matching server -func (sc *serverConfig) removeServerByKey(targetKey *server_details.Key) { - for i, s := range sc.servers { +func (l *serverList) removeServerByKey(targetKey *agent.Key) { + for i, s := range l.servers { if targetKey.Equal(s.Key()) { - copy(sc.servers[i:], sc.servers[i+1:]) - sc.servers[len(sc.servers)-1] = nil - sc.servers = sc.servers[:len(sc.servers)-1] + copy(l.servers[i:], l.servers[i+1:]) + l.servers[len(l.servers)-1] = nil + l.servers = l.servers[:len(l.servers)-1] return } } } // shuffleServers shuffles the server list in place -func (sc *serverConfig) shuffleServers() { - for i := len(sc.servers) - 1; i > 0; i-- { +func (l *serverList) shuffleServers() { + for i := len(l.servers) - 1; i > 0; i-- { j := rand.Int31n(int32(i + 1)) - sc.servers[i], sc.servers[j] = sc.servers[j], sc.servers[i] + l.servers[i], l.servers[j] = l.servers[j], l.servers[i] } } @@ -184,52 +188,52 @@ func (sc *serverConfig) shuffleServers() { // server list. 
If the server at the front of the list has failed or fails // during an RPC call, it is rotated to the end of the list. If there are no // servers available, return nil. -func (sm *ServerManager) FindServer() *server_details.ServerDetails { - sc := sm.getServerConfig() - numServers := len(sc.servers) +func (m *Manager) FindServer() *agent.Server { + l := m.getServerList() + numServers := len(l.servers) if numServers == 0 { - sm.logger.Printf("[WARN] server manager: No servers available") + m.logger.Printf("[WARN] manager: No servers available") return nil } else { // Return whatever is at the front of the list because it is // assumed to be the oldest in the server list (unless - // hypothetically - the server list was rotated right after a // server was added). - return sc.servers[0] + return l.servers[0] } } -// getServerConfig is a convenience method which hides the locking semantics +// getServerList is a convenience method which hides the locking semantics // of atomic.Value from the caller. -func (sm *ServerManager) getServerConfig() serverConfig { - return sm.serverConfigValue.Load().(serverConfig) +func (m *Manager) getServerList() serverList { + return m.listValue.Load().(serverList) } -// saveServerConfig is a convenience method which hides the locking semantics +// saveServerList is a convenience method which hides the locking semantics // of atomic.Value from the caller. -func (sm *ServerManager) saveServerConfig(sc serverConfig) { - sm.serverConfigValue.Store(sc) +func (m *Manager) saveServerList(l serverList) { + m.listValue.Store(l) } -// New is the only way to safely create a new ServerManager struct. 
-func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo, connPoolPinger Pinger) (sm *ServerManager) { - sm = new(ServerManager) - sm.logger = logger - sm.clusterInfo = clusterInfo // can't pass *consul.Client: import cycle - sm.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle - sm.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration) - sm.shutdownCh = shutdownCh +// New is the only way to safely create a new Manager struct. +func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo, connPoolPinger Pinger) (m *Manager) { + m = new(Manager) + m.logger = logger + m.clusterInfo = clusterInfo // can't pass *consul.Client: import cycle + m.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle + m.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration) + m.shutdownCh = shutdownCh - sc := serverConfig{} - sc.servers = make([]*server_details.ServerDetails, 0) - sm.saveServerConfig(sc) - return sm + l := serverList{} + l.servers = make([]*agent.Server, 0) + m.saveServerList(l) + return m } // NotifyFailedServer marks the passed in server as "failed" by rotating it // to the end of the server list. -func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) { - sc := sm.getServerConfig() +func (m *Manager) NotifyFailedServer(s *agent.Server) { + l := m.getServerList() // If the server being failed is not the first server on the list, // this is a noop. If, however, the server is failed and first on @@ -237,30 +241,29 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails // the server to the end of the list. // Only rotate the server list when there is more than one server - if len(sc.servers) > 1 && sc.servers[0] == server && + if len(l.servers) > 1 && l.servers[0] == s && // Use atomic.CAS to emulate a TryLock(). 
- atomic.CompareAndSwapInt32(&sm.notifyFailedBarrier, 0, 1) { - defer atomic.StoreInt32(&sm.notifyFailedBarrier, 0) + atomic.CompareAndSwapInt32(&m.notifyFailedBarrier, 0, 1) { + defer atomic.StoreInt32(&m.notifyFailedBarrier, 0) // Grab a lock, retest, and take the hit of cycling the first // server to the end. - sm.serverConfigLock.Lock() - defer sm.serverConfigLock.Unlock() - sc = sm.getServerConfig() + m.listLock.Lock() + defer m.listLock.Unlock() + l = m.getServerList() - if len(sc.servers) > 1 && sc.servers[0] == server { - sc.servers = sc.cycleServer() - sm.saveServerConfig(sc) + if len(l.servers) > 1 && l.servers[0] == s { + l.servers = l.cycleServer() + m.saveServerList(l) } } } // NumServers takes out an internal "read lock" and returns the number of // servers. numServers includes both healthy and unhealthy servers. -func (sm *ServerManager) NumServers() (numServers int) { - sc := sm.getServerConfig() - numServers = len(sc.servers) - return numServers +func (m *Manager) NumServers() int { + l := m.getServerList() + return len(l.servers) } // RebalanceServers shuffles the list of servers on this agent. The server @@ -275,46 +278,46 @@ func (sm *ServerManager) NumServers() (numServers int) { // Unhealthy servers are removed when serf notices the server has been // deregistered. Before the newly shuffled server list is saved, the new // remote endpoint is tested to ensure its responsive. -func (sm *ServerManager) RebalanceServers() { - // Obtain a copy of the current serverConfig - sc := sm.getServerConfig() +func (m *Manager) RebalanceServers() { + // Obtain a copy of the current serverList + l := m.getServerList() // Early abort if there is no value to shuffling - if len(sc.servers) < 2 { + if len(l.servers) < 2 { return } - sc.shuffleServers() + l.shuffleServers() // Iterate through the shuffled server list to find a healthy server. // Don't iterate on the list directly, this loop mutates the server // list. 
var foundHealthyServer bool - for i := 0; i < len(sc.servers); i++ { + for i := 0; i < len(l.servers); i++ { // Always test the first server. Failed servers are cycled // while Serf detects the node has failed. - selectedServer := sc.servers[0] + selectedServer := l.servers[0] - ok, err := sm.connPoolPinger.PingConsulServer(selectedServer) + ok, err := m.connPoolPinger.PingConsulServer(selectedServer) if ok { foundHealthyServer = true break } - sm.logger.Printf(`[DEBUG] server manager: pinging server "%s" failed: %s`, selectedServer.String(), err) + m.logger.Printf(`[DEBUG] manager: pinging server "%s" failed: %s`, selectedServer.String(), err) - sc.cycleServer() + l.cycleServer() } // If no healthy servers were found, sleep and wait for Serf to make // the world a happy place again. if !foundHealthyServer { - sm.logger.Printf("[DEBUG] server manager: No healthy servers during rebalance, aborting") + m.logger.Printf("[DEBUG] manager: No healthy servers during rebalance, aborting") return } // Verify that all servers are present - if sm.reconcileServerList(&sc) { - sm.logger.Printf("[DEBUG] server manager: Rebalanced %d servers, next active server is %s", len(sc.servers), sc.servers[0].String()) + if m.reconcileServerList(&l) { + m.logger.Printf("[DEBUG] manager: Rebalanced %d servers, next active server is %s", len(l.servers), l.servers[0].String()) } else { // reconcileServerList failed because Serf removed the server // that was at the front of the list that had successfully @@ -330,36 +333,36 @@ func (sm *ServerManager) RebalanceServers() { return } -// reconcileServerList returns true when the first server in serverConfig -// exists in the receiver's serverConfig. If true, the merged serverConfig -// is stored as the receiver's serverConfig. Returns false if the first +// reconcileServerList returns true when the first server in serverList +// exists in the receiver's serverList. If true, the merged serverList +// is stored as the receiver's serverList. 
Returns false if the first // server does not exist in the list (i.e. was removed by Serf during a // PingConsulServer() call. Newly added servers are appended to the list and // other missing servers are removed from the list. -func (sm *ServerManager) reconcileServerList(sc *serverConfig) bool { - sm.serverConfigLock.Lock() - defer sm.serverConfigLock.Unlock() +func (m *Manager) reconcileServerList(l *serverList) bool { + m.listLock.Lock() + defer m.listLock.Unlock() - // newServerCfg is a serverConfig that has been kept up to date with + // newServerCfg is a serverList that has been kept up to date with // Serf node join and node leave events. - newServerCfg := sm.getServerConfig() + newServerCfg := m.getServerList() // If Serf has removed all nodes, or there is no selected server - // (zero nodes in sc), abort early. - if len(newServerCfg.servers) == 0 || len(sc.servers) == 0 { + // (zero nodes in l), abort early. + if len(newServerCfg.servers) == 0 || len(l.servers) == 0 { return false } type targetServer struct { - server *server_details.ServerDetails + server *agent.Server // 'b' == both // 'o' == original // 'n' == new state byte } - mergedList := make(map[server_details.Key]*targetServer, len(sc.servers)) - for _, s := range sc.servers { + mergedList := make(map[agent.Key]*targetServer, len(l.servers)) + for _, s := range l.servers { mergedList[*s.Key()] = &targetServer{server: s, state: 'o'} } for _, s := range newServerCfg.servers { @@ -373,7 +376,7 @@ func (sm *ServerManager) reconcileServerList(sc *serverConfig) bool { } // Ensure the selected server has not been removed by Serf - selectedServerKey := sc.servers[0].Key() + selectedServerKey := l.servers[0].Key() if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' { return false } @@ -385,63 +388,63 @@ func (sm *ServerManager) reconcileServerList(sc *serverConfig) bool { // Do nothing, server exists in both case 'o': // Server has been removed - sc.removeServerByKey(&k) + 
l.removeServerByKey(&k) case 'n': // Server added - sc.servers = append(sc.servers, v.server) + l.servers = append(l.servers, v.server) default: panic("unknown merge list state") } } - sm.saveServerConfig(*sc) + m.saveServerList(*l) return true } // RemoveServer takes out an internal write lock and removes a server from // the server list. -func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) { - sm.serverConfigLock.Lock() - defer sm.serverConfigLock.Unlock() - sc := sm.getServerConfig() +func (m *Manager) RemoveServer(s *agent.Server) { + m.listLock.Lock() + defer m.listLock.Unlock() + l := m.getServerList() // Remove the server if known - for i, _ := range sc.servers { - if sc.servers[i].Name == server.Name { - newServers := make([]*server_details.ServerDetails, 0, len(sc.servers)-1) - newServers = append(newServers, sc.servers[:i]...) - newServers = append(newServers, sc.servers[i+1:]...) - sc.servers = newServers + for i, _ := range l.servers { + if l.servers[i].Name == s.Name { + newServers := make([]*agent.Server, 0, len(l.servers)-1) + newServers = append(newServers, l.servers[:i]...) + newServers = append(newServers, l.servers[i+1:]...) + l.servers = newServers - sm.saveServerConfig(sc) + m.saveServerList(l) return } } } -// refreshServerRebalanceTimer is only called once sm.rebalanceTimer expires. -func (sm *ServerManager) refreshServerRebalanceTimer() time.Duration { - sc := sm.getServerConfig() - numConsulServers := len(sc.servers) +// refreshServerRebalanceTimer is only called once m.rebalanceTimer expires. +func (m *Manager) refreshServerRebalanceTimer() time.Duration { + l := m.getServerList() + numConsulServers := len(l.servers) // Limit this connection's life based on the size (and health) of the // cluster. Never rebalance a connection more frequently than // connReuseLowWatermarkDuration, and make sure we never exceed // clusterWideRebalanceConnsPerSec operations/s across numLANMembers. 
clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer) connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction) - numLANMembers := sm.clusterInfo.NumNodes() + numLANMembers := m.clusterInfo.NumNodes() connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers) - sm.rebalanceTimer.Reset(connRebalanceTimeout) + m.rebalanceTimer.Reset(connRebalanceTimeout) return connRebalanceTimeout } // ResetRebalanceTimer resets the rebalance timer. This method primarily // exists for testing and should not be used directly. -func (sm *ServerManager) ResetRebalanceTimer() { - sm.serverConfigLock.Lock() - defer sm.serverConfigLock.Unlock() - sm.rebalanceTimer.Reset(clientRPCMinReuseDuration) +func (m *Manager) ResetRebalanceTimer() { + m.listLock.Lock() + defer m.listLock.Unlock() + m.rebalanceTimer.Reset(clientRPCMinReuseDuration) } // Start is used to start and manage the task of automatically shuffling and @@ -450,15 +453,15 @@ func (sm *ServerManager) ResetRebalanceTimer() { // automatically cycled to the end of the list. New servers are appended to // the list. The order of the server list must be shuffled periodically to // distribute load across all known and available consul servers. 
-func (sm *ServerManager) Start() { +func (m *Manager) Start() { for { select { - case <-sm.rebalanceTimer.C: - sm.RebalanceServers() - sm.refreshServerRebalanceTimer() + case <-m.rebalanceTimer.C: + m.RebalanceServers() + m.refreshServerRebalanceTimer() - case <-sm.shutdownCh: - sm.logger.Printf("[INFO] server manager: shutting down") + case <-m.shutdownCh: + m.logger.Printf("[INFO] manager: shutting down") return } } diff --git a/consul/server_manager/server_manager_internal_test.go b/consul/servers/manager_internal_test.go similarity index 52% rename from consul/server_manager/server_manager_internal_test.go rename to consul/servers/manager_internal_test.go index 4a5a6d43ab..6ca5759cec 100644 --- a/consul/server_manager/server_manager_internal_test.go +++ b/consul/servers/manager_internal_test.go @@ -1,4 +1,4 @@ -package server_manager +package servers import ( "bytes" @@ -9,7 +9,7 @@ import ( "testing" "time" - "github.com/hashicorp/consul/consul/server_details" + "github.com/hashicorp/consul/consul/agent" ) var ( @@ -31,7 +31,7 @@ type fauxConnPool struct { failPct float64 } -func (cp *fauxConnPool) PingConsulServer(server *server_details.ServerDetails) (bool, error) { +func (cp *fauxConnPool) PingConsulServer(server *agent.Server) (bool, error) { var success bool successProb := rand.Float64() if successProb > cp.failPct { @@ -48,108 +48,108 @@ func (s *fauxSerf) NumNodes() int { return s.numNodes } -func testServerManager() (sm *ServerManager) { +func testManager() (m *Manager) { logger := GetBufferedLogger() shutdownCh := make(chan struct{}) - sm = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{}) - return sm + m = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{}) + return m } -func testServerManagerFailProb(failPct float64) (sm *ServerManager) { +func testManagerFailProb(failPct float64) (m *Manager) { logger := GetBufferedLogger() logger = log.New(os.Stderr, "", log.LstdFlags) shutdownCh := make(chan struct{}) - sm = 
New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}) - return sm + m = New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}) + return m } -// func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) { -func TestServerManagerInternal_cycleServer(t *testing.T) { - sm := testServerManager() - sc := sm.getServerConfig() +// func (l *serverList) cycleServer() (servers []*agent.Server) { +func TestManagerInternal_cycleServer(t *testing.T) { + m := testManager() + l := m.getServerList() - server0 := &server_details.ServerDetails{Name: "server1"} - server1 := &server_details.ServerDetails{Name: "server2"} - server2 := &server_details.ServerDetails{Name: "server3"} - sc.servers = append(sc.servers, server0, server1, server2) - sm.saveServerConfig(sc) + server0 := &agent.Server{Name: "server1"} + server1 := &agent.Server{Name: "server2"} + server2 := &agent.Server{Name: "server3"} + l.servers = append(l.servers, server0, server1, server2) + m.saveServerList(l) - sc = sm.getServerConfig() - if len(sc.servers) != 3 { - t.Fatalf("server length incorrect: %d/3", len(sc.servers)) + l = m.getServerList() + if len(l.servers) != 3 { + t.Fatalf("server length incorrect: %d/3", len(l.servers)) } - if sc.servers[0] != server0 && - sc.servers[1] != server1 && - sc.servers[2] != server2 { + if l.servers[0] != server0 && + l.servers[1] != server1 && + l.servers[2] != server2 { t.Fatalf("initial server ordering not correct") } - sc.servers = sc.cycleServer() - if len(sc.servers) != 3 { - t.Fatalf("server length incorrect: %d/3", len(sc.servers)) + l.servers = l.cycleServer() + if len(l.servers) != 3 { + t.Fatalf("server length incorrect: %d/3", len(l.servers)) } - if sc.servers[0] != server1 && - sc.servers[1] != server2 && - sc.servers[2] != server0 { + if l.servers[0] != server1 && + l.servers[1] != server2 && + l.servers[2] != server0 { t.Fatalf("server ordering after one cycle not correct") } - sc.servers = sc.cycleServer() - if 
len(sc.servers) != 3 { - t.Fatalf("server length incorrect: %d/3", len(sc.servers)) + l.servers = l.cycleServer() + if len(l.servers) != 3 { + t.Fatalf("server length incorrect: %d/3", len(l.servers)) } - if sc.servers[0] != server2 && - sc.servers[1] != server0 && - sc.servers[2] != server1 { + if l.servers[0] != server2 && + l.servers[1] != server0 && + l.servers[2] != server1 { t.Fatalf("server ordering after two cycles not correct") } - sc.servers = sc.cycleServer() - if len(sc.servers) != 3 { - t.Fatalf("server length incorrect: %d/3", len(sc.servers)) + l.servers = l.cycleServer() + if len(l.servers) != 3 { + t.Fatalf("server length incorrect: %d/3", len(l.servers)) } - if sc.servers[0] != server0 && - sc.servers[1] != server1 && - sc.servers[2] != server2 { + if l.servers[0] != server0 && + l.servers[1] != server1 && + l.servers[2] != server2 { t.Fatalf("server ordering after three cycles not correct") } } -// func (sm *ServerManager) getServerConfig() serverConfig { -func TestServerManagerInternal_getServerConfig(t *testing.T) { - sm := testServerManager() - sc := sm.getServerConfig() - if sc.servers == nil { - t.Fatalf("serverConfig.servers nil") +// func (m *Manager) getServerList() serverList { +func TestManagerInternal_getServerList(t *testing.T) { + m := testManager() + l := m.getServerList() + if l.servers == nil { + t.Fatalf("serverList.servers nil") } - if len(sc.servers) != 0 { - t.Fatalf("serverConfig.servers length not zero") + if len(l.servers) != 0 { + t.Fatalf("serverList.servers length not zero") } } -// func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (sm *ServerManager) { -func TestServerManagerInternal_New(t *testing.T) { - sm := testServerManager() - if sm == nil { - t.Fatalf("ServerManager nil") +// func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (m *Manager) { +func TestManagerInternal_New(t *testing.T) { + m := testManager() + if m == nil { + t.Fatalf("Manager 
nil") } - if sm.clusterInfo == nil { - t.Fatalf("ServerManager.clusterInfo nil") + if m.clusterInfo == nil { + t.Fatalf("Manager.clusterInfo nil") } - if sm.logger == nil { - t.Fatalf("ServerManager.logger nil") + if m.logger == nil { + t.Fatalf("Manager.logger nil") } - if sm.shutdownCh == nil { - t.Fatalf("ServerManager.shutdownCh nil") + if m.shutdownCh == nil { + t.Fatalf("Manager.shutdownCh nil") } } -// func (sm *ServerManager) reconcileServerList(sc *serverConfig) bool { -func TestServerManagerInternal_reconcileServerList(t *testing.T) { +// func (m *Manager) reconcileServerList(l *serverList) bool { +func TestManagerInternal_reconcileServerList(t *testing.T) { tests := []int{0, 1, 2, 3, 4, 5, 10, 100} for _, n := range tests { ok, err := test_reconcileServerList(n) @@ -164,22 +164,22 @@ func test_reconcileServerList(maxServers int) (bool, error) { // missing, the added have been added, and the original server is // present. const failPct = 0.5 - sm := testServerManagerFailProb(failPct) + m := testManagerFailProb(failPct) - var failedServers, healthyServers []*server_details.ServerDetails + var failedServers, healthyServers []*agent.Server for i := 0; i < maxServers; i++ { nodeName := fmt.Sprintf("s%02d", i) - node := &server_details.ServerDetails{Name: nodeName} - // Add 66% of servers to ServerManager + node := &agent.Server{Name: nodeName} + // Add 66% of servers to Manager if rand.Float64() > 0.33 { - sm.AddServer(node) + m.AddServer(node) // Of healthy servers, (ab)use connPoolPinger to // failPct of the servers for the reconcile. This // allows for the selected server to no longer be // healthy for the reconcile below. 
- if ok, _ := sm.connPoolPinger.PingConsulServer(node); ok { + if ok, _ := m.connPoolPinger.PingConsulServer(node); ok { // Will still be present healthyServers = append(healthyServers, node) } else { @@ -192,9 +192,9 @@ func test_reconcileServerList(maxServers int) (bool, error) { } } - // Randomize ServerManager's server list - sm.RebalanceServers() - selectedServer := sm.FindServer() + // Randomize Manager's server list + m.RebalanceServers() + selectedServer := m.FindServer() var selectedServerFailed bool for _, s := range failedServers { @@ -204,39 +204,39 @@ func test_reconcileServerList(maxServers int) (bool, error) { } } - // Update ServerManager's server list to be "healthy" based on Serf. + // Update Manager's server list to be "healthy" based on Serf. // Reconcile this with origServers, which is shuffled and has a live // connection, but possibly out of date. - origServers := sm.getServerConfig() - sm.saveServerConfig(serverConfig{servers: healthyServers}) + origServers := m.getServerList() + m.saveServerList(serverList{servers: healthyServers}) // This should always succeed with non-zero server lists - if !selectedServerFailed && !sm.reconcileServerList(&origServers) && - len(sm.getServerConfig().servers) != 0 && + if !selectedServerFailed && !m.reconcileServerList(&origServers) && + len(m.getServerList().servers) != 0 && len(origServers.servers) != 0 { // If the random gods are unfavorable and we end up with zero // length lists, expect things to fail and retry the test. return false, fmt.Errorf("Expected reconcile to succeed: %v %d %d", selectedServerFailed, - len(sm.getServerConfig().servers), + len(m.getServerList().servers), len(origServers.servers)) } // If we have zero-length server lists, test succeeded in degenerate // case. 
- if len(sm.getServerConfig().servers) == 0 && + if len(m.getServerList().servers) == 0 && len(origServers.servers) == 0 { // Failed as expected w/ zero length list return true, nil } - resultingServerMap := make(map[server_details.Key]bool) - for _, s := range sm.getServerConfig().servers { + resultingServerMap := make(map[agent.Key]bool) + for _, s := range m.getServerList().servers { resultingServerMap[*s.Key()] = true } - // Test to make sure no failed servers are in the ServerManager's - // list. Error if there are any failedServers in sc.servers + // Test to make sure no failed servers are in the Manager's + // list. Error if there are any failedServers in l.servers for _, s := range failedServers { _, ok := resultingServerMap[*s.Key()] if ok { @@ -245,7 +245,7 @@ func test_reconcileServerList(maxServers int) (bool, error) { } // Test to make sure all healthy servers are in the healthy list. - if len(healthyServers) != len(sm.getServerConfig().servers) { + if len(healthyServers) != len(m.getServerList().servers) { return false, fmt.Errorf("Expected healthy map and servers to match: %d/%d", len(healthyServers), len(healthyServers)) } @@ -259,8 +259,8 @@ func test_reconcileServerList(maxServers int) (bool, error) { return true, nil } -// func (sc *serverConfig) refreshServerRebalanceTimer() { -func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) { +// func (l *serverList) refreshServerRebalanceTimer() { +func TestManagerInternal_refreshServerRebalanceTimer(t *testing.T) { type clusterSizes struct { numNodes int numServers int @@ -299,54 +299,54 @@ func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) { shutdownCh := make(chan struct{}) for _, s := range clusters { - sm := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{}) + m := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{}) for i := 0; i < s.numServers; i++ { nodeName := fmt.Sprintf("s%02d", i) - 
sm.AddServer(&server_details.ServerDetails{Name: nodeName}) + m.AddServer(&agent.Server{Name: nodeName}) } - d := sm.refreshServerRebalanceTimer() + d := m.refreshServerRebalanceTimer() if d < s.minRebalance { t.Errorf("duration too short for cluster of size %d and %d servers (%s < %s)", s.numNodes, s.numServers, d, s.minRebalance) } } } -// func (sm *ServerManager) saveServerConfig(sc serverConfig) { -func TestServerManagerInternal_saveServerConfig(t *testing.T) { - sm := testServerManager() +// func (m *Manager) saveServerList(l serverList) { +func TestManagerInternal_saveServerList(t *testing.T) { + m := testManager() // Initial condition func() { - sc := sm.getServerConfig() - if len(sc.servers) != 0 { - t.Fatalf("ServerManager.saveServerConfig failed to load init config") + l := m.getServerList() + if len(l.servers) != 0 { + t.Fatalf("Manager.saveServerList failed to load init config") } - newServer := new(server_details.ServerDetails) - sc.servers = append(sc.servers, newServer) - sm.saveServerConfig(sc) + newServer := new(agent.Server) + l.servers = append(l.servers, newServer) + m.saveServerList(l) }() // Test that save works func() { - sc1 := sm.getServerConfig() - t1NumServers := len(sc1.servers) + l1 := m.getServerList() + t1NumServers := len(l1.servers) if t1NumServers != 1 { - t.Fatalf("ServerManager.saveServerConfig failed to save mutated config") + t.Fatalf("Manager.saveServerList failed to save mutated config") } }() // Verify mutation w/o a save doesn't alter the original func() { - newServer := new(server_details.ServerDetails) - sc := sm.getServerConfig() - sc.servers = append(sc.servers, newServer) + newServer := new(agent.Server) + l := m.getServerList() + l.servers = append(l.servers, newServer) - sc_orig := sm.getServerConfig() - origNumServers := len(sc_orig.servers) - if origNumServers >= len(sc.servers) { - t.Fatalf("ServerManager.saveServerConfig unsaved config overwrote original") + l_orig := m.getServerList() + origNumServers := 
len(l_orig.servers) + if origNumServers >= len(l.servers) { + t.Fatalf("Manager.saveServerList unsaved config overwrote original") } }() } diff --git a/consul/server_manager/server_manager_test.go b/consul/servers/manager_test.go similarity index 50% rename from consul/server_manager/server_manager_test.go rename to consul/servers/manager_test.go index 25673140a4..5b467445d5 100644 --- a/consul/server_manager/server_manager_test.go +++ b/consul/servers/manager_test.go @@ -1,4 +1,4 @@ -package server_manager_test +package servers_test import ( "bytes" @@ -9,8 +9,8 @@ import ( "strings" "testing" - "github.com/hashicorp/consul/consul/server_details" - "github.com/hashicorp/consul/consul/server_manager" + "github.com/hashicorp/consul/consul/agent" + "github.com/hashicorp/consul/consul/servers" ) var ( @@ -32,7 +32,7 @@ type fauxConnPool struct { failPct float64 } -func (cp *fauxConnPool) PingConsulServer(server *server_details.ServerDetails) (bool, error) { +func (cp *fauxConnPool) PingConsulServer(server *agent.Server) (bool, error) { var success bool successProb := rand.Float64() if successProb > cp.failPct { @@ -48,66 +48,66 @@ func (s *fauxSerf) NumNodes() int { return 16384 } -func testServerManager() (sm *server_manager.ServerManager) { +func testManager() (m *servers.Manager) { logger := GetBufferedLogger() logger = log.New(os.Stderr, "", log.LstdFlags) shutdownCh := make(chan struct{}) - sm = server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}) - return sm + m = servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}) + return m } -func testServerManagerFailProb(failPct float64) (sm *server_manager.ServerManager) { +func testManagerFailProb(failPct float64) (m *servers.Manager) { logger := GetBufferedLogger() logger = log.New(os.Stderr, "", log.LstdFlags) shutdownCh := make(chan struct{}) - sm = server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}) - return sm + m = servers.New(logger, shutdownCh, 
&fauxSerf{}, &fauxConnPool{failPct: failPct}) + return m } -// func (sm *ServerManager) AddServer(server *server_details.ServerDetails) { -func TestServerManager_AddServer(t *testing.T) { - sm := testServerManager() +// func (m *Manager) AddServer(server *agent.Server) { +func TestServers_AddServer(t *testing.T) { + m := testManager() var num int - num = sm.NumServers() + num = m.NumServers() if num != 0 { t.Fatalf("Expected zero servers to start") } - s1 := &server_details.ServerDetails{Name: "s1"} - sm.AddServer(s1) - num = sm.NumServers() + s1 := &agent.Server{Name: "s1"} + m.AddServer(s1) + num = m.NumServers() if num != 1 { t.Fatalf("Expected one server") } - sm.AddServer(s1) - num = sm.NumServers() + m.AddServer(s1) + num = m.NumServers() if num != 1 { t.Fatalf("Expected one server (still)") } - s2 := &server_details.ServerDetails{Name: "s2"} - sm.AddServer(s2) - num = sm.NumServers() + s2 := &agent.Server{Name: "s2"} + m.AddServer(s2) + num = m.NumServers() if num != 2 { t.Fatalf("Expected two servers") } } -// func (sm *ServerManager) FindServer() (server *server_details.ServerDetails) { -func TestServerManager_FindServer(t *testing.T) { - sm := testServerManager() +// func (m *Manager) FindServer() (server *agent.Server) { +func TestServers_FindServer(t *testing.T) { + m := testManager() - if sm.FindServer() != nil { + if m.FindServer() != nil { t.Fatalf("Expected nil return") } - sm.AddServer(&server_details.ServerDetails{Name: "s1"}) - if sm.NumServers() != 1 { + m.AddServer(&agent.Server{Name: "s1"}) + if m.NumServers() != 1 { t.Fatalf("Expected one server") } - s1 := sm.FindServer() + s1 := m.FindServer() if s1 == nil { t.Fatalf("Expected non-nil server") } @@ -115,118 +115,118 @@ func TestServerManager_FindServer(t *testing.T) { t.Fatalf("Expected s1 server") } - s1 = sm.FindServer() + s1 = m.FindServer() if s1 == nil || s1.Name != "s1" { t.Fatalf("Expected s1 server (still)") } - sm.AddServer(&server_details.ServerDetails{Name: "s2"}) - if 
sm.NumServers() != 2 { + m.AddServer(&agent.Server{Name: "s2"}) + if m.NumServers() != 2 { t.Fatalf("Expected two servers") } - s1 = sm.FindServer() + s1 = m.FindServer() if s1 == nil || s1.Name != "s1" { t.Fatalf("Expected s1 server (still)") } - sm.NotifyFailedServer(s1) - s2 := sm.FindServer() + m.NotifyFailedServer(s1) + s2 := m.FindServer() if s2 == nil || s2.Name != "s2" { t.Fatalf("Expected s2 server") } - sm.NotifyFailedServer(s2) - s1 = sm.FindServer() + m.NotifyFailedServer(s2) + s1 = m.FindServer() if s1 == nil || s1.Name != "s1" { t.Fatalf("Expected s1 server") } } -// func New(logger *log.Logger, shutdownCh chan struct{}) (sm *ServerManager) { -func TestServerManager_New(t *testing.T) { +// func New(logger *log.Logger, shutdownCh chan struct{}) (m *Manager) { +func TestServers_New(t *testing.T) { logger := GetBufferedLogger() logger = log.New(os.Stderr, "", log.LstdFlags) shutdownCh := make(chan struct{}) - sm := server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}) - if sm == nil { - t.Fatalf("ServerManager nil") + m := servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}) + if m == nil { + t.Fatalf("Manager nil") } } -// func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) { -func TestServerManager_NotifyFailedServer(t *testing.T) { - sm := testServerManager() +// func (m *Manager) NotifyFailedServer(server *agent.Server) { +func TestServers_NotifyFailedServer(t *testing.T) { + m := testManager() - if sm.NumServers() != 0 { + if m.NumServers() != 0 { t.Fatalf("Expected zero servers to start") } - s1 := &server_details.ServerDetails{Name: "s1"} - s2 := &server_details.ServerDetails{Name: "s2"} + s1 := &agent.Server{Name: "s1"} + s2 := &agent.Server{Name: "s2"} - // Try notifying for a server that is not part of the server manager - sm.NotifyFailedServer(s1) - if sm.NumServers() != 0 { + // Try notifying for a server that is not managed by Manager + m.NotifyFailedServer(s1) + if m.NumServers() != 0 { 
t.Fatalf("Expected zero servers to start") } - sm.AddServer(s1) + m.AddServer(s1) // Test again w/ a server not in the list - sm.NotifyFailedServer(s2) - if sm.NumServers() != 1 { + m.NotifyFailedServer(s2) + if m.NumServers() != 1 { t.Fatalf("Expected one server") } - sm.AddServer(s2) - if sm.NumServers() != 2 { + m.AddServer(s2) + if m.NumServers() != 2 { t.Fatalf("Expected two servers") } - s1 = sm.FindServer() + s1 = m.FindServer() if s1 == nil || s1.Name != "s1" { t.Fatalf("Expected s1 server") } - sm.NotifyFailedServer(s2) - s1 = sm.FindServer() + m.NotifyFailedServer(s2) + s1 = m.FindServer() if s1 == nil || s1.Name != "s1" { t.Fatalf("Expected s1 server (still)") } - sm.NotifyFailedServer(s1) - s2 = sm.FindServer() + m.NotifyFailedServer(s1) + s2 = m.FindServer() if s2 == nil || s2.Name != "s2" { t.Fatalf("Expected s2 server") } - sm.NotifyFailedServer(s2) - s1 = sm.FindServer() + m.NotifyFailedServer(s2) + s1 = m.FindServer() if s1 == nil || s1.Name != "s1" { t.Fatalf("Expected s1 server") } } -// func (sm *ServerManager) NumServers() (numServers int) { -func TestServerManager_NumServers(t *testing.T) { - sm := testServerManager() +// func (m *Manager) NumServers() (numServers int) { +func TestServers_NumServers(t *testing.T) { + m := testManager() var num int - num = sm.NumServers() + num = m.NumServers() if num != 0 { t.Fatalf("Expected zero servers to start") } - s := &server_details.ServerDetails{} - sm.AddServer(s) - num = sm.NumServers() + s := &agent.Server{} + m.AddServer(s) + num = m.NumServers() if num != 1 { t.Fatalf("Expected one server after AddServer") } } -// func (sm *ServerManager) RebalanceServers() { -func TestServerManager_RebalanceServers(t *testing.T) { +// func (m *Manager) RebalanceServers() { +func TestServers_RebalanceServers(t *testing.T) { const failPct = 0.5 - sm := testServerManagerFailProb(failPct) + m := testManagerFailProb(failPct) const maxServers = 100 const numShuffleTests = 100 const uniquePassRate = 0.5 @@ -234,18 
+234,18 @@ func TestServerManager_RebalanceServers(t *testing.T) { // Make a huge list of nodes. for i := 0; i < maxServers; i++ { nodeName := fmt.Sprintf("s%02d", i) - sm.AddServer(&server_details.ServerDetails{Name: nodeName}) + m.AddServer(&agent.Server{Name: nodeName}) } // Keep track of how many unique shuffles we get. uniques := make(map[string]struct{}, maxServers) for i := 0; i < numShuffleTests; i++ { - sm.RebalanceServers() + m.RebalanceServers() var names []string for j := 0; j < maxServers; j++ { - server := sm.FindServer() - sm.NotifyFailedServer(server) + server := m.FindServer() + m.NotifyFailedServer(server) names = append(names, server.Name) } key := strings.Join(names, "|") @@ -260,48 +260,48 @@ func TestServerManager_RebalanceServers(t *testing.T) { } } -// func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) { -func TestServerManager_RemoveServer(t *testing.T) { +// func (m *Manager) RemoveServer(server *agent.Server) { +func TestManager_RemoveServer(t *testing.T) { const nodeNameFmt = "s%02d" - sm := testServerManager() + m := testManager() - if sm.NumServers() != 0 { + if m.NumServers() != 0 { t.Fatalf("Expected zero servers to start") } // Test removing server before its added nodeName := fmt.Sprintf(nodeNameFmt, 1) - s1 := &server_details.ServerDetails{Name: nodeName} - sm.RemoveServer(s1) - sm.AddServer(s1) + s1 := &agent.Server{Name: nodeName} + m.RemoveServer(s1) + m.AddServer(s1) nodeName = fmt.Sprintf(nodeNameFmt, 2) - s2 := &server_details.ServerDetails{Name: nodeName} - sm.RemoveServer(s2) - sm.AddServer(s2) + s2 := &agent.Server{Name: nodeName} + m.RemoveServer(s2) + m.AddServer(s2) const maxServers = 19 - servers := make([]*server_details.ServerDetails, maxServers) + servers := make([]*agent.Server, maxServers) // Already added two servers above for i := maxServers; i > 2; i-- { nodeName := fmt.Sprintf(nodeNameFmt, i) - server := &server_details.ServerDetails{Name: nodeName} + server := &agent.Server{Name: 
nodeName} servers = append(servers, server) - sm.AddServer(server) + m.AddServer(server) } - if sm.NumServers() != maxServers { - t.Fatalf("Expected %d servers, received %d", maxServers, sm.NumServers()) + if m.NumServers() != maxServers { + t.Fatalf("Expected %d servers, received %d", maxServers, m.NumServers()) } - sm.RebalanceServers() + m.RebalanceServers() - if sm.NumServers() != maxServers { - t.Fatalf("Expected %d servers, received %d", maxServers, sm.NumServers()) + if m.NumServers() != maxServers { + t.Fatalf("Expected %d servers, received %d", maxServers, m.NumServers()) } - findServer := func(server *server_details.ServerDetails) bool { - for i := sm.NumServers(); i > 0; i-- { - s := sm.FindServer() + findServer := func(server *agent.Server) bool { + for i := m.NumServers(); i > 0; i-- { + s := m.FindServer() if s == server { return true } @@ -310,18 +310,18 @@ func TestServerManager_RemoveServer(t *testing.T) { } expectedNumServers := maxServers - removedServers := make([]*server_details.ServerDetails, 0, maxServers) + removedServers := make([]*agent.Server, 0, maxServers) // Remove servers from the front of the list for i := 3; i > 0; i-- { - server := sm.FindServer() + server := m.FindServer() if server == nil { t.Fatalf("FindServer returned nil") } - sm.RemoveServer(server) + m.RemoveServer(server) expectedNumServers-- - if sm.NumServers() != expectedNumServers { - t.Fatalf("Expected %d servers (got %d)", expectedNumServers, sm.NumServers()) + if m.NumServers() != expectedNumServers { + t.Fatalf("Expected %d servers (got %d)", expectedNumServers, m.NumServers()) } if findServer(server) == true { t.Fatalf("Did not expect to find server %s after removal from the front", server.Name) @@ -331,12 +331,12 @@ func TestServerManager_RemoveServer(t *testing.T) { // Remove server from the end of the list for i := 3; i > 0; i-- { - server := sm.FindServer() - sm.NotifyFailedServer(server) - sm.RemoveServer(server) + server := m.FindServer() + 
m.NotifyFailedServer(server) + m.RemoveServer(server) expectedNumServers-- - if sm.NumServers() != expectedNumServers { - t.Fatalf("Expected %d servers (got %d)", expectedNumServers, sm.NumServers()) + if m.NumServers() != expectedNumServers { + t.Fatalf("Expected %d servers (got %d)", expectedNumServers, m.NumServers()) } if findServer(server) == true { t.Fatalf("Did not expect to find server %s", server.Name) @@ -346,15 +346,15 @@ func TestServerManager_RemoveServer(t *testing.T) { // Remove server from the middle of the list for i := 3; i > 0; i-- { - server := sm.FindServer() - sm.NotifyFailedServer(server) - server2 := sm.FindServer() - sm.NotifyFailedServer(server2) // server2 now at end of the list + server := m.FindServer() + m.NotifyFailedServer(server) + server2 := m.FindServer() + m.NotifyFailedServer(server2) // server2 now at end of the list - sm.RemoveServer(server) + m.RemoveServer(server) expectedNumServers-- - if sm.NumServers() != expectedNumServers { - t.Fatalf("Expected %d servers (got %d)", expectedNumServers, sm.NumServers()) + if m.NumServers() != expectedNumServers { + t.Fatalf("Expected %d servers (got %d)", expectedNumServers, m.NumServers()) } if findServer(server) == true { t.Fatalf("Did not expect to find server %s", server.Name) @@ -362,21 +362,21 @@ func TestServerManager_RemoveServer(t *testing.T) { removedServers = append(removedServers, server) } - if sm.NumServers()+len(removedServers) != maxServers { - t.Fatalf("Expected %d+%d=%d servers", sm.NumServers(), len(removedServers), maxServers) + if m.NumServers()+len(removedServers) != maxServers { + t.Fatalf("Expected %d+%d=%d servers", m.NumServers(), len(removedServers), maxServers) } // Drain the remaining servers from the middle - for i := sm.NumServers(); i > 0; i-- { - server := sm.FindServer() - sm.NotifyFailedServer(server) - server2 := sm.FindServer() - sm.NotifyFailedServer(server2) // server2 now at end of the list - sm.RemoveServer(server) + for i := m.NumServers(); i > 
0; i-- { + server := m.FindServer() + m.NotifyFailedServer(server) + server2 := m.FindServer() + m.NotifyFailedServer(server2) // server2 now at end of the list + m.RemoveServer(server) removedServers = append(removedServers, server) } - if sm.NumServers() != 0 { + if m.NumServers() != 0 { t.Fatalf("Expected an empty server list") } if len(removedServers) != maxServers { @@ -384,4 +384,4 @@ func TestServerManager_RemoveServer(t *testing.T) { } } -// func (sm *ServerManager) Start() { +// func (m *Manager) Start() {