consul: Store the protocol version for a server

pull/179/head
Armon Dadgar 2014-05-27 15:07:31 -07:00
parent aa32e95139
commit 3a854b5952
6 changed files with 56 additions and 28 deletions

View File

@ -43,7 +43,7 @@ type Client struct {
connPool *ConnPool
// consuls tracks the locally known servers
consuls []net.Addr
consuls []*serverParts
consulLock sync.RWMutex
// eventCh is used to receive events from the
@ -52,7 +52,7 @@ type Client struct {
// lastServer is the last server we made an RPC call to,
// this is used to re-use the last connection
lastServer net.Addr
lastServer *serverParts
lastRPCTime time.Time
// Logger uses the provided LogOutput
@ -230,15 +230,14 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
m.Name, parts.Datacenter)
continue
}
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port}
c.logger.Printf("[INFO] consul: adding server for datacenter: %s, addr: %s", parts.Datacenter, addr)
c.logger.Printf("[INFO] consul: adding server %s", parts)
// Check if this server is known
found := false
c.consulLock.Lock()
for _, c := range c.consuls {
if c.String() == addr.String() {
for idx, existing := range c.consuls {
if existing.Name == parts.Name {
c.consuls[idx] = parts
found = true
break
}
@ -246,7 +245,7 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
// Add to the list if not known
if !found {
c.consuls = append(c.consuls, addr)
c.consuls = append(c.consuls, parts)
}
c.consulLock.Unlock()
@ -304,7 +303,7 @@ func (c *Client) localEvent(event serf.UserEvent) {
// RPC is used to forward an RPC call to a consul server, or fail if no servers
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// Check the last rpc time
var server net.Addr
var server *serverParts
if time.Now().Sub(c.lastRPCTime) < clientRPCCache {
server = c.lastServer
if server != nil {
@ -325,8 +324,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// Forward to remote Consul
TRY_RPC:
// TODO: Correct version
if err := c.connPool.RPC(server, 1, method, args, reply); err != nil {
if err := c.connPool.RPC(server.Addr, server.Version, method, args, reply); err != nil {
c.lastServer = nil
c.lastRPCTime = time.Time{}
return err

View File

@ -186,7 +186,8 @@ func (s *Server) forwardLeader(method string, args interface{}, reply interface{
if leader == nil {
return structs.ErrNoLeader
}
return s.connPool.RPC(leader, method, args, reply)
// TODO: Correct version
return s.connPool.RPC(leader, 1, method, args, reply)
}
// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
@ -207,7 +208,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
// Forward to remote Consul
metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1)
return s.connPool.RPC(server, method, args, reply)
return s.connPool.RPC(server.Addr, server.Version, method, args, reply)
}
// raftApply is used to encode a message, run it through raft, and return

View File

@ -2,7 +2,6 @@ package consul
import (
"github.com/hashicorp/serf/serf"
"net"
"strings"
)
@ -118,15 +117,15 @@ func (s *Server) remoteJoin(me serf.MemberEvent) {
s.logger.Printf("[WARN] consul: non-server in WAN pool: %s %s", m.Name)
continue
}
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port}
s.logger.Printf("[INFO] consul: adding server for datacenter: %s, addr: %s", parts.Datacenter, addr)
s.logger.Printf("[INFO] consul: adding server %s", parts)
// Check if this server is known
found := false
s.remoteLock.Lock()
existing := s.remoteConsuls[parts.Datacenter]
for _, e := range existing {
if e.String() == addr.String() {
for idx, e := range existing {
if e.Name == parts.Name {
existing[idx] = parts
found = true
break
}
@ -134,7 +133,7 @@ func (s *Server) remoteJoin(me serf.MemberEvent) {
// Add ot the list if not known
if !found {
s.remoteConsuls[parts.Datacenter] = append(existing, addr)
s.remoteConsuls[parts.Datacenter] = append(existing, parts)
}
s.remoteLock.Unlock()
}
@ -147,15 +146,14 @@ func (s *Server) remoteFailed(me serf.MemberEvent) {
if !ok {
continue
}
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port}
s.logger.Printf("[INFO] consul: removing server for datacenter: %s, addr: %s", parts.Datacenter, addr)
s.logger.Printf("[INFO] consul: removing server %s", parts)
// Remove the server if known
s.remoteLock.Lock()
existing := s.remoteConsuls[parts.Datacenter]
n := len(existing)
for i := 0; i < n; i++ {
if existing[i].String() == addr.String() {
if existing[i].Name == parts.Name {
existing[i], existing[n-1] = existing[n-1], nil
existing = existing[:n-1]
n--

View File

@ -86,7 +86,7 @@ type Server struct {
// remoteConsuls is used to track the known consuls in
// remote data centers. Used to do DC forwarding.
remoteConsuls map[string][]net.Addr
remoteConsuls map[string][]*serverParts
remoteLock sync.RWMutex
// rpcListener is used to listen for incoming connections
@ -164,7 +164,7 @@ func NewServer(config *Config) (*Server, error) {
eventChWAN: make(chan serf.Event, 256),
logger: logger,
reconcileCh: make(chan serf.Member, 32),
remoteConsuls: make(map[string][]net.Addr),
remoteConsuls: make(map[string][]*serverParts),
rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS,
shutdownCh: make(chan struct{}),
@ -522,8 +522,8 @@ func (s *Server) IsLeader() bool {
// RPC is used to make a local RPC call
func (s *Server) RPC(method string, args interface{}, reply interface{}) error {
addr := s.rpcListener.Addr()
// TODO: Correct version
return s.connPool.RPC(addr, 1, method, args, reply)
version := int(s.config.ProtocolVersion)
return s.connPool.RPC(addr, version, method, args, reply)
}
// Stats is used to return statistics for debugging and insight

View File

@ -22,12 +22,18 @@ var privateBlocks []*net.IPNet
// serverparts is used to return the parts of a server role
type serverParts struct {
Name string
Datacenter string
Port int
Bootstrap bool
Version int
Addr net.Addr
}
func (s *serverParts) String() string {
return fmt.Sprintf("%s (Addr: %s) (DC: %s)", s.Name, s.Addr, s.Datacenter)
}
func init() {
// Add each private block
privateBlocks = make([]*net.IPNet, 3)
@ -76,14 +82,31 @@ func isConsulServer(m serf.Member) (bool, *serverParts) {
}
datacenter := m.Tags["dc"]
port_str := m.Tags["port"]
_, bootstrap := m.Tags["bootstrap"]
port_str := m.Tags["port"]
port, err := strconv.Atoi(port_str)
if err != nil {
return false, nil
}
vsn_str := m.Tags["vsn"]
vsn, err := strconv.Atoi(vsn_str)
if err != nil {
return false, nil
}
addr := &net.TCPAddr{IP: m.Addr, Port: port}
return true, &serverParts{datacenter, port, bootstrap, addr}
parts := &serverParts{
Name: m.Name,
Datacenter: datacenter,
Port: port,
Bootstrap: bootstrap,
Addr: addr,
Version: vsn,
}
return true, parts
}
// Returns if a member is a consul node. Returns a boo,

View File

@ -37,17 +37,22 @@ func TestIsPrivateIP(t *testing.T) {
func TestIsConsulServer(t *testing.T) {
m := serf.Member{
Name: "foo",
Addr: net.IP([]byte{127, 0, 0, 1}),
Tags: map[string]string{
"role": "consul",
"dc": "east-aws",
"port": "10000",
"vsn": "1",
},
}
valid, parts := isConsulServer(m)
if !valid || parts.Datacenter != "east-aws" || parts.Port != 10000 {
t.Fatalf("bad: %v %v", valid, parts)
}
if parts.Name != "foo" {
t.Fatalf("bad: %v", parts)
}
if parts.Bootstrap {
t.Fatalf("unexpected bootstrap")
}
@ -59,6 +64,9 @@ func TestIsConsulServer(t *testing.T) {
if parts.Addr.String() != "127.0.0.1:10000" {
t.Fatalf("bad addr: %v", parts.Addr)
}
if parts.Version != 1 {
t.Fatalf("bad: %v", parts)
}
}
func TestIsConsulNode(t *testing.T) {