From 01c73ee9ae7996293e65fb728bd8f1da573c7a10 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 20 Jan 2014 13:39:07 -1000 Subject: [PATCH] change isConsulServer to parse flags --- consul/client.go | 16 ++++++++-------- consul/leader.go | 14 +++++++------- consul/serf.go | 22 +++++++++++----------- consul/server.go | 6 +++++- consul/util.go | 18 +++++++++++++----- consul/util_test.go | 6 +++--- 6 files changed, 47 insertions(+), 35 deletions(-) diff --git a/consul/client.go b/consul/client.go index 9a06c2f89e..f1ec12e866 100644 --- a/consul/client.go +++ b/consul/client.go @@ -178,18 +178,18 @@ func (c *Client) lanEventHandler() { // nodeJoin is used to handle join events on the serf cluster func (c *Client) nodeJoin(me serf.MemberEvent) { for _, m := range me.Members { - ok, dc, port := isConsulServer(m) + ok, parts := isConsulServer(m) if !ok { continue } - if dc != c.config.Datacenter { + if parts.Datacenter != c.config.Datacenter { c.logger.Printf("[WARN] consul: server %s for datacenter %s has joined wrong cluster", - m.Name, dc) + m.Name, parts.Datacenter) continue } - var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port} - c.logger.Printf("[INFO] consul: adding server for datacenter: %s, addr: %s", dc, addr) + var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port} + c.logger.Printf("[INFO] consul: adding server for datacenter: %s, addr: %s", parts.Datacenter, addr) // Check if this server is known found := false @@ -212,12 +212,12 @@ func (c *Client) nodeJoin(me serf.MemberEvent) { // nodeFail is used to handle fail events on the serf cluster func (c *Client) nodeFail(me serf.MemberEvent) { for _, m := range me.Members { - ok, dc, port := isConsulServer(m) + ok, parts := isConsulServer(m) if !ok { continue } - var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port} - c.logger.Printf("[INFO] consul: removing server for datacenter: %s, addr: %s", dc, addr) + var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port} + c.logger.Printf("[INFO] consul: removing server for datacenter: %s, addr: %s", parts.Datacenter, addr) // Remove the server if known c.consulLock.Lock() diff --git a/consul/leader.go b/consul/leader.go index 7adab0c968..61a8b284a9 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -129,7 +129,7 @@ func (s *Server) shouldHandleMember(member serf.Member) bool { if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter { return true } - if valid, dc, _ := isConsulServer(member); valid && dc == s.config.Datacenter { + if valid, parts := isConsulServer(member); valid && parts.Datacenter == s.config.Datacenter { return true } return false @@ -142,15 +142,15 @@ func (s *Server) handleAliveMember(member serf.Member) error { // Register consul service if a server var service *structs.NodeService - if valid, _, port := isConsulServer(member); valid { + if valid, parts := isConsulServer(member); valid { service = &structs.NodeService{ ID: ConsulServiceID, Service: ConsulServiceName, - Port: port, + Port: parts.Port, } // Attempt to join the consul server - if err := s.joinConsulServer(member, port); err != nil { + if err := s.joinConsulServer(member, parts.Port); err != nil { return err } } @@ -247,8 +247,8 @@ func (s *Server) handleLeftMember(member serf.Member) error { s.logger.Printf("[INFO] consul: member '%s' left, deregistering", member.Name) // Remove from Raft peers if this was a server - if valid, _, port := isConsulServer(member); valid { - if err := s.removeConsulServer(member, port); err != nil { + if valid, parts := isConsulServer(member); valid { + if err := s.removeConsulServer(member, parts.Port); err != nil { return err } } @@ -279,7 +279,7 @@ func (s *Server) joinConsulServer(m serf.Member, port int) error { return nil } -// joinConsulServer is used to try to join another consul server +// removeConsulServer is used to try to remove a consul server that has left func (s *Server) removeConsulServer(m serf.Member, port int) error { // Do not remove ourself if m.Name == s.config.NodeName { diff --git a/consul/serf.go b/consul/serf.go index 26fd2c475e..3dff0f9e2e 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -71,18 +71,18 @@ func (s *Server) localMemberEvent(me serf.MemberEvent) { // remoteJoin is used to handle join events on the wan serf cluster func (s *Server) remoteJoin(me serf.MemberEvent) { for _, m := range me.Members { - ok, dc, port := isConsulServer(m) + ok, parts := isConsulServer(m) if !ok { s.logger.Printf("[WARN] consul: non-server in WAN pool: %s %s", m.Name) continue } - var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port} - s.logger.Printf("[INFO] consul: adding server for datacenter: %s, addr: %s", dc, addr) + var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port} + s.logger.Printf("[INFO] consul: adding server for datacenter: %s, addr: %s", parts.Datacenter, addr) // Check if this server is known found := false s.remoteLock.Lock() - existing := s.remoteConsuls[dc] + existing := s.remoteConsuls[parts.Datacenter] for _, e := range existing { if e.String() == addr.String() { found = true @@ -92,7 +92,7 @@ func (s *Server) remoteJoin(me serf.MemberEvent) { // Add ot the list if not known if !found { - s.remoteConsuls[dc] = append(existing, addr) + s.remoteConsuls[parts.Datacenter] = append(existing, addr) } s.remoteLock.Unlock() } @@ -101,16 +101,16 @@ func (s *Server) remoteJoin(me serf.MemberEvent) { // remoteFailed is used to handle fail events on the wan serf cluster func (s *Server) remoteFailed(me serf.MemberEvent) { for _, m := range me.Members { - ok, dc, port := isConsulServer(m) + ok, parts := isConsulServer(m) if !ok { continue } - var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port} - s.logger.Printf("[INFO] consul: removing server for datacenter: %s, addr: %s", dc, addr) + var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port} + s.logger.Printf("[INFO] consul: removing server for datacenter: %s, addr: %s", parts.Datacenter, addr) // Remove the server if known s.remoteLock.Lock() - existing := s.remoteConsuls[dc] + existing := s.remoteConsuls[parts.Datacenter] n := len(existing) for i := 0; i < n; i++ { if existing[i].String() == addr.String() { @@ -123,9 +123,9 @@ func (s *Server) remoteFailed(me serf.MemberEvent) { // Trim the list if all known consuls are dead if n == 0 { - delete(s.remoteConsuls, dc) + delete(s.remoteConsuls, parts.Datacenter) } else { - s.remoteConsuls[dc] = existing + s.remoteConsuls[parts.Datacenter] = existing } s.remoteLock.Unlock() } diff --git a/consul/server.go b/consul/server.go index e158580c76..7466413ea7 100644 --- a/consul/server.go +++ b/consul/server.go @@ -161,8 +161,12 @@ func NewServer(config *Config) (*Server, error) { // setupSerf is used to setup and initialize a Serf func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) { addr := s.rpcListener.Addr().(*net.TCPAddr) + flags := "" + if s.config.Bootstrap { + flags = "b" + } conf.NodeName = s.config.NodeName - conf.Role = fmt.Sprintf("consul:%s:%d", s.config.Datacenter, addr.Port) + conf.Role = fmt.Sprintf("consul:%s:%d:%s", s.config.Datacenter, addr.Port, flags) conf.MemberlistConfig.LogOutput = s.config.LogOutput conf.LogOutput = s.config.LogOutput conf.EventCh = ch diff --git a/consul/util.go b/consul/util.go index eb9f765d0b..22d8ccf8c0 100644 --- a/consul/util.go +++ b/consul/util.go @@ -19,6 +19,13 @@ import ( */ var privateBlocks []*net.IPNet +// serverparts is used to return the parts of a server role +type serverParts struct { + Datacenter string + Port int + Flags string +} + func init() { // Add each private block privateBlocks = make([]*net.IPNet, 3) @@ -61,21 +68,22 @@ func ensurePath(path string, dir bool) error { // Returns if a member is a consul server. Returns a bool, // the data center, and the rpc port -func isConsulServer(m serf.Member) (bool, string, int) { +func isConsulServer(m serf.Member) (bool, *serverParts) { role := m.Role if !strings.HasPrefix(role, "consul:") { - return false, "", 0 + return false, nil } - parts := strings.SplitN(role, ":", 3) + parts := strings.SplitN(role, ":", 4) datacenter := parts[1] port_str := parts[2] + flags := parts[3] port, err := strconv.Atoi(port_str) if err != nil { - return false, "", 0 + return false, nil } - return true, datacenter, port + return true, &serverParts{datacenter, port, flags} } // Returns if a member is a consul node. Returns a boo, diff --git a/consul/util_test.go b/consul/util_test.go index 7d24d458a4..9117684b17 100644 --- a/consul/util_test.go +++ b/consul/util_test.go @@ -37,9 +37,9 @@ func TestIsConsulServer(t *testing.T) { m := serf.Member{ Role: "consul:east-aws:10000", } - valid, dc, port := isConsulServer(m) - if !valid || dc != "east-aws" || port != 10000 { - t.Fatalf("bad: %v %v %v", valid, dc, port) + valid, parts := isConsulServer(m) + if !valid || parts.Datacenter != "east-aws" || parts.Port != 10000 { + t.Fatalf("bad: %v %v", valid, parts) } }