|
|
@ -98,6 +98,10 @@ type ServerManager struct {
|
|
|
|
// cluster and limit the rate at which it rebalances server
|
|
|
|
// cluster and limit the rate at which it rebalances server
|
|
|
|
// connections
|
|
|
|
// connections
|
|
|
|
clusterInfo ConsulClusterInfo
|
|
|
|
clusterInfo ConsulClusterInfo
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// notifyFailedServersBarrier is acts as a barrier to prevent
|
|
|
|
|
|
|
|
// queueing behind serverConfigLog and acts as a TryLock().
|
|
|
|
|
|
|
|
notifyFailedBarrier int32
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// AddServer takes out an internal write lock and adds a new server. If the
|
|
|
|
// AddServer takes out an internal write lock and adds a new server. If the
|
|
|
@ -207,10 +211,14 @@ func NewServerManager(logger *log.Logger, shutdownCh chan struct{}, cci ConsulCl
|
|
|
|
func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
|
|
|
|
func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
|
|
|
|
serverCfg := sm.getServerConfig()
|
|
|
|
serverCfg := sm.getServerConfig()
|
|
|
|
|
|
|
|
|
|
|
|
if len(serverCfg.servers) > 0 && serverCfg.servers[0] == server {
|
|
|
|
// Use atomic.CAS to emulate a TryLock().
|
|
|
|
|
|
|
|
if len(serverCfg.servers) > 0 && serverCfg.servers[0] == server &&
|
|
|
|
|
|
|
|
atomic.CompareAndSwapInt32(&sm.notifyFailedBarrier, 0, 1) {
|
|
|
|
|
|
|
|
defer atomic.StoreInt32(&sm.notifyFailedBarrier, 0)
|
|
|
|
|
|
|
|
|
|
|
|
// Grab a lock, retest, and take the hit of cycling the first
|
|
|
|
// Grab a lock, retest, and take the hit of cycling the first
|
|
|
|
// server to the end.
|
|
|
|
// server to the end.
|
|
|
|
sm.serverConfigLock.Lock() // FIXME(sean@): wtb TryLock
|
|
|
|
sm.serverConfigLock.Lock()
|
|
|
|
defer sm.serverConfigLock.Unlock()
|
|
|
|
defer sm.serverConfigLock.Unlock()
|
|
|
|
serverCfg = sm.getServerConfig()
|
|
|
|
serverCfg = sm.getServerConfig()
|
|
|
|
|
|
|
|
|
|
|
|