From d737a2204a4d78c8c475bbe9ecc94ab72d32cb12 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 27 May 2014 15:45:19 -0700 Subject: [PATCH] consul: Pass protocol version for leader forwarding --- consul/rpc.go | 14 ++++++++++++-- consul/serf.go | 20 +++++++++++++++++--- consul/server.go | 6 ++++++ 3 files changed, 35 insertions(+), 5 deletions(-) diff --git a/consul/rpc.go b/consul/rpc.go index 478bbd8ef9..60ad5c422e 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -182,12 +182,22 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, // forwardLeader is used to forward an RPC call to the leader, or fail if no leader func (s *Server) forwardLeader(method string, args interface{}, reply interface{}) error { + // Get the leader leader := s.raft.Leader() if leader == nil { return structs.ErrNoLeader } - // TODO: Correct version - return s.connPool.RPC(leader, 1, method, args, reply) + + // Lookup the server + s.localLock.RLock() + server := s.localConsuls[leader.String()] + s.localLock.RUnlock() + + // Handle a missing server + if server == nil { + return structs.ErrNoLeader + } + return s.connPool.RPC(server.Addr, server.Version, method, args, reply) } // forwardDC is used to forward an RPC call to a remote DC, or fail if no servers diff --git a/consul/serf.go b/consul/serf.go index c90bacbb79..11a48ee471 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -24,7 +24,7 @@ func (s *Server) lanEventHandler() { case serf.EventMemberLeave: fallthrough case serf.EventMemberFailed: - s.nodeFailed(e.(serf.MemberEvent)) + s.nodeFailed(e.(serf.MemberEvent), false) s.localMemberEvent(e.(serf.MemberEvent)) case serf.EventMemberReap: @@ -54,7 +54,7 @@ func (s *Server) wanEventHandler() { case serf.EventMemberLeave: fallthrough case serf.EventMemberFailed: - s.nodeFailed(e.(serf.MemberEvent)) + s.nodeFailed(e.(serf.MemberEvent), true) case serf.EventMemberUpdate: // Ignore case serf.EventMemberReap: // Ignore case serf.EventUser: @@ -142,11 +142,18 @@ func (s *Server) nodeJoin(me serf.MemberEvent, wan bool) { s.remoteConsuls[parts.Datacenter] = append(existing, parts) } s.remoteLock.Unlock() + + // Add to the local list as well + if !wan { + s.localLock.Lock() + s.localConsuls[parts.Addr.String()] = parts + s.localLock.Unlock() + } } } // nodeFailed is used to handle fail events on both the serf clustes -func (s *Server) nodeFailed(me serf.MemberEvent) { +func (s *Server) nodeFailed(me serf.MemberEvent, wan bool) { for _, m := range me.Members { ok, parts := isConsulServer(m) if !ok { @@ -174,5 +181,12 @@ func (s *Server) nodeFailed(me serf.MemberEvent) { s.remoteConsuls[parts.Datacenter] = existing } s.remoteLock.Unlock() + + // Remove from the local list as well + if !wan { + s.localLock.Lock() + delete(s.localConsuls, parts.Addr.String()) + s.localLock.Unlock() + } } } diff --git a/consul/server.go b/consul/server.go index ba2976192a..6b1603dd65 100644 --- a/consul/server.go +++ b/consul/server.go @@ -68,6 +68,11 @@ type Server struct { // Have we attempted to leave the cluster left bool + // localConsuls is used to track the known consuls + // in the local data center. Used to do leader forwarding. + localConsuls map[string]*serverParts + localLock sync.RWMutex + // Logger uses the provided LogOutput logger *log.Logger @@ -161,6 +166,7 @@ func NewServer(config *Config) (*Server, error) { connPool: NewPool(serverRPCCache, serverMaxStreams, tlsConfig), eventChLAN: make(chan serf.Event, 256), eventChWAN: make(chan serf.Event, 256), + localConsuls: make(map[string]*serverParts), logger: logger, reconcileCh: make(chan serf.Member, 32), remoteConsuls: make(map[string][]*serverParts),