mirror of https://github.com/hashicorp/consul
change isConsulServer to parse flags
parent
be61bdc5b8
commit
01c73ee9ae
|
@ -178,18 +178,18 @@ func (c *Client) lanEventHandler() {
|
||||||
// nodeJoin is used to handle join events on the serf cluster
|
// nodeJoin is used to handle join events on the serf cluster
|
||||||
func (c *Client) nodeJoin(me serf.MemberEvent) {
|
func (c *Client) nodeJoin(me serf.MemberEvent) {
|
||||||
for _, m := range me.Members {
|
for _, m := range me.Members {
|
||||||
ok, dc, port := isConsulServer(m)
|
ok, parts := isConsulServer(m)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if dc != c.config.Datacenter {
|
if parts.Datacenter != c.config.Datacenter {
|
||||||
c.logger.Printf("[WARN] consul: server %s for datacenter %s has joined wrong cluster",
|
c.logger.Printf("[WARN] consul: server %s for datacenter %s has joined wrong cluster",
|
||||||
m.Name, dc)
|
m.Name, parts.Datacenter)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port}
|
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port}
|
||||||
c.logger.Printf("[INFO] consul: adding server for datacenter: %s, addr: %s", dc, addr)
|
c.logger.Printf("[INFO] consul: adding server for datacenter: %s, addr: %s", parts.Datacenter, addr)
|
||||||
|
|
||||||
// Check if this server is known
|
// Check if this server is known
|
||||||
found := false
|
found := false
|
||||||
|
@ -212,12 +212,12 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
|
||||||
// nodeFail is used to handle fail events on the serf cluster
|
// nodeFail is used to handle fail events on the serf cluster
|
||||||
func (c *Client) nodeFail(me serf.MemberEvent) {
|
func (c *Client) nodeFail(me serf.MemberEvent) {
|
||||||
for _, m := range me.Members {
|
for _, m := range me.Members {
|
||||||
ok, dc, port := isConsulServer(m)
|
ok, parts := isConsulServer(m)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port}
|
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port}
|
||||||
c.logger.Printf("[INFO] consul: removing server for datacenter: %s, addr: %s", dc, addr)
|
c.logger.Printf("[INFO] consul: removing server for datacenter: %s, addr: %s", parts.Datacenter, addr)
|
||||||
|
|
||||||
// Remove the server if known
|
// Remove the server if known
|
||||||
c.consulLock.Lock()
|
c.consulLock.Lock()
|
||||||
|
|
|
@ -129,7 +129,7 @@ func (s *Server) shouldHandleMember(member serf.Member) bool {
|
||||||
if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter {
|
if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if valid, dc, _ := isConsulServer(member); valid && dc == s.config.Datacenter {
|
if valid, parts := isConsulServer(member); valid && parts.Datacenter == s.config.Datacenter {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
|
@ -142,15 +142,15 @@ func (s *Server) handleAliveMember(member serf.Member) error {
|
||||||
|
|
||||||
// Register consul service if a server
|
// Register consul service if a server
|
||||||
var service *structs.NodeService
|
var service *structs.NodeService
|
||||||
if valid, _, port := isConsulServer(member); valid {
|
if valid, parts := isConsulServer(member); valid {
|
||||||
service = &structs.NodeService{
|
service = &structs.NodeService{
|
||||||
ID: ConsulServiceID,
|
ID: ConsulServiceID,
|
||||||
Service: ConsulServiceName,
|
Service: ConsulServiceName,
|
||||||
Port: port,
|
Port: parts.Port,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Attempt to join the consul server
|
// Attempt to join the consul server
|
||||||
if err := s.joinConsulServer(member, port); err != nil {
|
if err := s.joinConsulServer(member, parts.Port); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -247,8 +247,8 @@ func (s *Server) handleLeftMember(member serf.Member) error {
|
||||||
s.logger.Printf("[INFO] consul: member '%s' left, deregistering", member.Name)
|
s.logger.Printf("[INFO] consul: member '%s' left, deregistering", member.Name)
|
||||||
|
|
||||||
// Remove from Raft peers if this was a server
|
// Remove from Raft peers if this was a server
|
||||||
if valid, _, port := isConsulServer(member); valid {
|
if valid, parts := isConsulServer(member); valid {
|
||||||
if err := s.removeConsulServer(member, port); err != nil {
|
if err := s.removeConsulServer(member, parts.Port); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -279,7 +279,7 @@ func (s *Server) joinConsulServer(m serf.Member, port int) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// joinConsulServer is used to try to join another consul server
|
// removeConsulServer is used to try to remove a consul server that has left
|
||||||
func (s *Server) removeConsulServer(m serf.Member, port int) error {
|
func (s *Server) removeConsulServer(m serf.Member, port int) error {
|
||||||
// Do not remove ourself
|
// Do not remove ourself
|
||||||
if m.Name == s.config.NodeName {
|
if m.Name == s.config.NodeName {
|
||||||
|
|
|
@ -71,18 +71,18 @@ func (s *Server) localMemberEvent(me serf.MemberEvent) {
|
||||||
// remoteJoin is used to handle join events on the wan serf cluster
|
// remoteJoin is used to handle join events on the wan serf cluster
|
||||||
func (s *Server) remoteJoin(me serf.MemberEvent) {
|
func (s *Server) remoteJoin(me serf.MemberEvent) {
|
||||||
for _, m := range me.Members {
|
for _, m := range me.Members {
|
||||||
ok, dc, port := isConsulServer(m)
|
ok, parts := isConsulServer(m)
|
||||||
if !ok {
|
if !ok {
|
||||||
s.logger.Printf("[WARN] consul: non-server in WAN pool: %s %s", m.Name)
|
s.logger.Printf("[WARN] consul: non-server in WAN pool: %s %s", m.Name)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port}
|
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port}
|
||||||
s.logger.Printf("[INFO] consul: adding server for datacenter: %s, addr: %s", dc, addr)
|
s.logger.Printf("[INFO] consul: adding server for datacenter: %s, addr: %s", parts.Datacenter, addr)
|
||||||
|
|
||||||
// Check if this server is known
|
// Check if this server is known
|
||||||
found := false
|
found := false
|
||||||
s.remoteLock.Lock()
|
s.remoteLock.Lock()
|
||||||
existing := s.remoteConsuls[dc]
|
existing := s.remoteConsuls[parts.Datacenter]
|
||||||
for _, e := range existing {
|
for _, e := range existing {
|
||||||
if e.String() == addr.String() {
|
if e.String() == addr.String() {
|
||||||
found = true
|
found = true
|
||||||
|
@ -92,7 +92,7 @@ func (s *Server) remoteJoin(me serf.MemberEvent) {
|
||||||
|
|
||||||
// Add ot the list if not known
|
// Add ot the list if not known
|
||||||
if !found {
|
if !found {
|
||||||
s.remoteConsuls[dc] = append(existing, addr)
|
s.remoteConsuls[parts.Datacenter] = append(existing, addr)
|
||||||
}
|
}
|
||||||
s.remoteLock.Unlock()
|
s.remoteLock.Unlock()
|
||||||
}
|
}
|
||||||
|
@ -101,16 +101,16 @@ func (s *Server) remoteJoin(me serf.MemberEvent) {
|
||||||
// remoteFailed is used to handle fail events on the wan serf cluster
|
// remoteFailed is used to handle fail events on the wan serf cluster
|
||||||
func (s *Server) remoteFailed(me serf.MemberEvent) {
|
func (s *Server) remoteFailed(me serf.MemberEvent) {
|
||||||
for _, m := range me.Members {
|
for _, m := range me.Members {
|
||||||
ok, dc, port := isConsulServer(m)
|
ok, parts := isConsulServer(m)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port}
|
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port}
|
||||||
s.logger.Printf("[INFO] consul: removing server for datacenter: %s, addr: %s", dc, addr)
|
s.logger.Printf("[INFO] consul: removing server for datacenter: %s, addr: %s", parts.Datacenter, addr)
|
||||||
|
|
||||||
// Remove the server if known
|
// Remove the server if known
|
||||||
s.remoteLock.Lock()
|
s.remoteLock.Lock()
|
||||||
existing := s.remoteConsuls[dc]
|
existing := s.remoteConsuls[parts.Datacenter]
|
||||||
n := len(existing)
|
n := len(existing)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
if existing[i].String() == addr.String() {
|
if existing[i].String() == addr.String() {
|
||||||
|
@ -123,9 +123,9 @@ func (s *Server) remoteFailed(me serf.MemberEvent) {
|
||||||
|
|
||||||
// Trim the list if all known consuls are dead
|
// Trim the list if all known consuls are dead
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
delete(s.remoteConsuls, dc)
|
delete(s.remoteConsuls, parts.Datacenter)
|
||||||
} else {
|
} else {
|
||||||
s.remoteConsuls[dc] = existing
|
s.remoteConsuls[parts.Datacenter] = existing
|
||||||
}
|
}
|
||||||
s.remoteLock.Unlock()
|
s.remoteLock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
|
@ -161,8 +161,12 @@ func NewServer(config *Config) (*Server, error) {
|
||||||
// setupSerf is used to setup and initialize a Serf
|
// setupSerf is used to setup and initialize a Serf
|
||||||
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
|
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
|
||||||
addr := s.rpcListener.Addr().(*net.TCPAddr)
|
addr := s.rpcListener.Addr().(*net.TCPAddr)
|
||||||
|
flags := ""
|
||||||
|
if s.config.Bootstrap {
|
||||||
|
flags = "b"
|
||||||
|
}
|
||||||
conf.NodeName = s.config.NodeName
|
conf.NodeName = s.config.NodeName
|
||||||
conf.Role = fmt.Sprintf("consul:%s:%d", s.config.Datacenter, addr.Port)
|
conf.Role = fmt.Sprintf("consul:%s:%d:%s", s.config.Datacenter, addr.Port, flags)
|
||||||
conf.MemberlistConfig.LogOutput = s.config.LogOutput
|
conf.MemberlistConfig.LogOutput = s.config.LogOutput
|
||||||
conf.LogOutput = s.config.LogOutput
|
conf.LogOutput = s.config.LogOutput
|
||||||
conf.EventCh = ch
|
conf.EventCh = ch
|
||||||
|
|
|
@ -19,6 +19,13 @@ import (
|
||||||
*/
|
*/
|
||||||
var privateBlocks []*net.IPNet
|
var privateBlocks []*net.IPNet
|
||||||
|
|
||||||
|
// serverparts is used to return the parts of a server role
|
||||||
|
type serverParts struct {
|
||||||
|
Datacenter string
|
||||||
|
Port int
|
||||||
|
Flags string
|
||||||
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
// Add each private block
|
// Add each private block
|
||||||
privateBlocks = make([]*net.IPNet, 3)
|
privateBlocks = make([]*net.IPNet, 3)
|
||||||
|
@ -61,21 +68,22 @@ func ensurePath(path string, dir bool) error {
|
||||||
|
|
||||||
// Returns if a member is a consul server. Returns a bool,
|
// Returns if a member is a consul server. Returns a bool,
|
||||||
// the data center, and the rpc port
|
// the data center, and the rpc port
|
||||||
func isConsulServer(m serf.Member) (bool, string, int) {
|
func isConsulServer(m serf.Member) (bool, *serverParts) {
|
||||||
role := m.Role
|
role := m.Role
|
||||||
if !strings.HasPrefix(role, "consul:") {
|
if !strings.HasPrefix(role, "consul:") {
|
||||||
return false, "", 0
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
parts := strings.SplitN(role, ":", 3)
|
parts := strings.SplitN(role, ":", 4)
|
||||||
datacenter := parts[1]
|
datacenter := parts[1]
|
||||||
port_str := parts[2]
|
port_str := parts[2]
|
||||||
|
flags := parts[3]
|
||||||
port, err := strconv.Atoi(port_str)
|
port, err := strconv.Atoi(port_str)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, "", 0
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return true, datacenter, port
|
return true, &serverParts{datacenter, port, flags}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns if a member is a consul node. Returns a boo,
|
// Returns if a member is a consul node. Returns a boo,
|
||||||
|
|
|
@ -37,9 +37,9 @@ func TestIsConsulServer(t *testing.T) {
|
||||||
m := serf.Member{
|
m := serf.Member{
|
||||||
Role: "consul:east-aws:10000",
|
Role: "consul:east-aws:10000",
|
||||||
}
|
}
|
||||||
valid, dc, port := isConsulServer(m)
|
valid, parts := isConsulServer(m)
|
||||||
if !valid || dc != "east-aws" || port != 10000 {
|
if !valid || parts.Datacenter != "east-aws" || parts.Port != 10000 {
|
||||||
t.Fatalf("bad: %v %v %v", valid, dc, port)
|
t.Fatalf("bad: %v %v", valid, parts)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue