mirror of https://github.com/hashicorp/consul
consul: Pass protocol version for leader forwarding
parent
f16d13213c
commit
d737a2204a
|
@ -182,12 +182,22 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{},
|
|||
|
||||
// forwardLeader is used to forward an RPC call to the leader, or fail if no leader
|
||||
func (s *Server) forwardLeader(method string, args interface{}, reply interface{}) error {
|
||||
// Get the leader
|
||||
leader := s.raft.Leader()
|
||||
if leader == nil {
|
||||
return structs.ErrNoLeader
|
||||
}
|
||||
// TODO: Correct version
|
||||
return s.connPool.RPC(leader, 1, method, args, reply)
|
||||
|
||||
// Lookup the server
|
||||
s.localLock.RLock()
|
||||
server := s.localConsuls[leader.String()]
|
||||
s.localLock.RUnlock()
|
||||
|
||||
// Handle a missing server
|
||||
if server == nil {
|
||||
return structs.ErrNoLeader
|
||||
}
|
||||
return s.connPool.RPC(server.Addr, server.Version, method, args, reply)
|
||||
}
|
||||
|
||||
// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
|
||||
|
|
|
@ -24,7 +24,7 @@ func (s *Server) lanEventHandler() {
|
|||
case serf.EventMemberLeave:
|
||||
fallthrough
|
||||
case serf.EventMemberFailed:
|
||||
s.nodeFailed(e.(serf.MemberEvent))
|
||||
s.nodeFailed(e.(serf.MemberEvent), false)
|
||||
s.localMemberEvent(e.(serf.MemberEvent))
|
||||
|
||||
case serf.EventMemberReap:
|
||||
|
@ -54,7 +54,7 @@ func (s *Server) wanEventHandler() {
|
|||
case serf.EventMemberLeave:
|
||||
fallthrough
|
||||
case serf.EventMemberFailed:
|
||||
s.nodeFailed(e.(serf.MemberEvent))
|
||||
s.nodeFailed(e.(serf.MemberEvent), true)
|
||||
case serf.EventMemberUpdate: // Ignore
|
||||
case serf.EventMemberReap: // Ignore
|
||||
case serf.EventUser:
|
||||
|
@ -142,11 +142,18 @@ func (s *Server) nodeJoin(me serf.MemberEvent, wan bool) {
|
|||
s.remoteConsuls[parts.Datacenter] = append(existing, parts)
|
||||
}
|
||||
s.remoteLock.Unlock()
|
||||
|
||||
// Add to the local list as well
|
||||
if !wan {
|
||||
s.localLock.Lock()
|
||||
s.localConsuls[parts.Addr.String()] = parts
|
||||
s.localLock.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// nodeFailed is used to handle fail events on both the serf clustes
|
||||
func (s *Server) nodeFailed(me serf.MemberEvent) {
|
||||
func (s *Server) nodeFailed(me serf.MemberEvent, wan bool) {
|
||||
for _, m := range me.Members {
|
||||
ok, parts := isConsulServer(m)
|
||||
if !ok {
|
||||
|
@ -174,5 +181,12 @@ func (s *Server) nodeFailed(me serf.MemberEvent) {
|
|||
s.remoteConsuls[parts.Datacenter] = existing
|
||||
}
|
||||
s.remoteLock.Unlock()
|
||||
|
||||
// Remove from the local list as well
|
||||
if !wan {
|
||||
s.localLock.Lock()
|
||||
delete(s.localConsuls, parts.Addr.String())
|
||||
s.localLock.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,6 +68,11 @@ type Server struct {
|
|||
// Have we attempted to leave the cluster
|
||||
left bool
|
||||
|
||||
// localConsuls is used to track the known consuls
|
||||
// in the local data center. Used to do leader forwarding.
|
||||
localConsuls map[string]*serverParts
|
||||
localLock sync.RWMutex
|
||||
|
||||
// Logger uses the provided LogOutput
|
||||
logger *log.Logger
|
||||
|
||||
|
@ -161,6 +166,7 @@ func NewServer(config *Config) (*Server, error) {
|
|||
connPool: NewPool(serverRPCCache, serverMaxStreams, tlsConfig),
|
||||
eventChLAN: make(chan serf.Event, 256),
|
||||
eventChWAN: make(chan serf.Event, 256),
|
||||
localConsuls: make(map[string]*serverParts),
|
||||
logger: logger,
|
||||
reconcileCh: make(chan serf.Member, 32),
|
||||
remoteConsuls: make(map[string][]*serverParts),
|
||||
|
|
Loading…
Reference in New Issue