mirror of https://github.com/hashicorp/consul
Implement AddressProvider and wire that up to raft transport layer to support server nodes changing their IP addresses in containerized environments
parent
5ded0fbf4e
commit
393ce1581b
|
@ -494,7 +494,7 @@ func (s *Server) setupRaft() error {
|
|||
}
|
||||
|
||||
// Create a transport layer.
|
||||
trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second, s.config.LogOutput)
|
||||
trans := raft.NewNetworkTransportWithServerAddressProvider(s.raftLayer, 3, 10*time.Second, s)
|
||||
s.raftTransport = trans
|
||||
|
||||
// Make sure we set the LogOutput.
|
||||
|
@ -1047,6 +1047,18 @@ func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) {
|
|||
return s.serfWAN.GetCoordinate()
|
||||
}
|
||||
|
||||
func (s *Server) ServerAddr(id raft.ServerID) raft.ServerAddress {
|
||||
if string(id) == string(s.config.NodeID) {
|
||||
return raft.ServerAddress(s.config.RPCAddr.String())
|
||||
}
|
||||
addr, err := s.router.GetServerAddressByID(s.config.Datacenter, string(id))
|
||||
if err != nil {
|
||||
s.logger.Println("[WARN] Unable to find address for raft server id %v", id)
|
||||
return raft.ServerAddress("")
|
||||
}
|
||||
return raft.ServerAddress(addr)
|
||||
}
|
||||
|
||||
// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
|
||||
func (s *Server) setConsistentReadReady() {
|
||||
atomic.StoreInt32(&s.readyForConsistentReads, 1)
|
||||
|
|
|
@ -79,6 +79,9 @@ type Manager struct {
|
|||
listValue atomic.Value
|
||||
listLock sync.Mutex
|
||||
|
||||
// idToAddress provides lookup of server address by id, and is maintained alongside listValue
|
||||
idToAddress atomic.Value
|
||||
|
||||
// rebalanceTimer controls the duration of the rebalance interval
|
||||
rebalanceTimer *time.Timer
|
||||
|
||||
|
@ -223,10 +226,30 @@ func (m *Manager) getServerList() serverList {
|
|||
return m.listValue.Load().(serverList)
|
||||
}
|
||||
|
||||
// GetServerAddress by ID returns a server address based on the id
|
||||
func (m *Manager) GetServerAddressByID(id string) string {
|
||||
idAddrMap := m.idToAddress.Load().(map[string]string)
|
||||
addr, ok := idAddrMap[id]
|
||||
if !ok {
|
||||
m.logger.Printf("[WARN] Unable to find address for node id %v", id)
|
||||
return ""
|
||||
}
|
||||
return addr
|
||||
}
|
||||
|
||||
// saveServerList is a convenience method which hides the locking semantics
|
||||
// of atomic.Value from the caller.
|
||||
func (m *Manager) saveServerList(l serverList) {
|
||||
m.listValue.Store(l)
|
||||
m.idToAddress.Store(makeIdAddrMap(l))
|
||||
}
|
||||
|
||||
func makeIdAddrMap(list serverList) map[string]string {
|
||||
ret := make(map[string]string)
|
||||
for _, server := range list.servers {
|
||||
ret[server.ID] = server.Addr.String()
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
// New is the only way to safely create a new Manager struct.
|
||||
|
|
|
@ -489,3 +489,28 @@ func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) {
|
|||
}
|
||||
return maps, nil
|
||||
}
|
||||
|
||||
func (r *Router) GetServerAddressByID(datacenter string, id string) (string, error) {
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
|
||||
// Get the list of managers for this datacenter. This will usually just
|
||||
// have one entry, but it's possible to have a user-defined area + WAN.
|
||||
managers, ok := r.managers[datacenter]
|
||||
if !ok {
|
||||
return "", fmt.Errorf("datacenter %v not found", datacenter)
|
||||
}
|
||||
|
||||
// loop over all the managers till we find a matching address for the id
|
||||
// there could be more than for if network areas are configured
|
||||
for _, manager := range managers {
|
||||
if manager.IsOffline() {
|
||||
continue
|
||||
}
|
||||
id := manager.GetServerAddressByID(id)
|
||||
if id != "" {
|
||||
return id, nil
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf("Unable to match id %v to any known servers ", id)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue