@@ -14,27 +14,6 @@ import (
type consulServerEventTypes int
const (
	// consulServersNodeJoin is used to notify of a new consulServer.
	// The primary effect of this is a reshuffling of consulServers and
	// finding a new preferredServer.
	consulServersNodeJoin = iota
	// consulServersRebalance is used to signal we should rebalance our
	// connection load across servers
	consulServersRebalance
	// consulServersRefreshRebalanceDuration is used to signal when we
	// should reset the rebalance duration because the server list has
	// changed and we don't need to proactively change our connection
	consulServersRefreshRebalanceDuration
	// consulServersRPCError is used to signal when a server has either
	// timed out or returned an error and we would like to have the
	// server manager find a new preferredServer.
	consulServersRPCError
)
const (
	// clientRPCJitterFraction determines the amount of jitter added to
	// clientRPCMinReuseDuration before a connection is expired and a new
@@ -149,34 +128,9 @@ func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
		copy(newServers, serverCfg.servers)
		newServers = append(newServers, server)
		serverCfg.servers = newServers
		// Notify the server maintenance task of a new server
		sm.consulServersCh <- consulServersNodeJoin
	}
	sm.saveServerConfig(serverCfg)
}
// CycleFailedServers takes out an internal write lock and dequeues all
// failed servers and re-enqueues them. This method does not reshuffle the
// server list; instead, it requests that the rebalance duration be
// refreshed/reset further into the future.
func (sm *ServerManager) CycleFailedServers() {
	sm.serverConfigLock.Lock()
	defer sm.serverConfigLock.Unlock()
	serverCfg := sm.getServerConfig()
	for i := range serverCfg.servers {
		failCount := atomic.LoadUint64(&(serverCfg.servers[i].Disabled))
		if failCount == 0 {
			break
		} else if failCount > 0 {
			serverCfg.servers = serverCfg.cycleServer()
		}
	}
	sm.saveServerConfig(serverCfg)
	sm.requestRefreshRebalanceDuration()
}
// cycleServers returns a new list of servers that has dequeued the first
@@ -197,27 +151,16 @@ func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails)
// FindHealthyServer takes out an internal "read lock" and searches through
// the list of servers to find a healthy server.
func (sm *ServerManager) FindHealthyServer() (server *server_details.ServerDetails) {
func (sm *ServerManager) FindHealthyServer() *server_details.ServerDetails {
	serverCfg := sm.getServerConfig()
	numServers := len(serverCfg.servers)
	if numServers == 0 {
		sm.logger.Printf("[ERR] consul: No servers found in the server config")
		return nil
	} else {
		// Return whatever is at the front of the list
		return serverCfg.servers[0]
	}
	// Find the first non-failing server in the server list. If this is
	// not the first server, a prior RPC call marked the first server as
	// failed and we're waiting for the server management task to reorder
	// a working server to the front of the list.
	for i := range serverCfg.servers {
		failCount := atomic.LoadUint64(&(serverCfg.servers[i].Disabled))
		if failCount == 0 {
			server = serverCfg.servers[i]
			break
		}
	}
	return server
}
// GetNumServers takes out an internal "read lock" and returns the number of
@@ -254,13 +197,25 @@ func NewServerManager(logger *log.Logger, shutdownCh chan struct{}, serf *serf.S
// NotifyFailedServer is an exported convenience function that allows callers
// to pass in a server that has failed an RPC request and mark it as failed.
// This will initiate a background task that will optimize the failed server
// to the end of the server list. No locks are required here because we are
// bypassing the serverConfig and sending a message to ServerManager's
// channel.
// If the server being failed is not the first server on the list, this is a
// noop. If, however, the server is failed and first on the list, acquire
// the lock, retest, and take the penalty of moving the server to the end of
// the list.
func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
	atomic.AddUint64(&server.Disabled, 1)
	sm.consulServersCh <- consulServersRPCError
	serverCfg := sm.getServerConfig()
	if len(serverCfg.servers) > 0 && serverCfg.servers[0] == server {
		// Grab a lock, retest, and take the hit of cycling the first
		// server to the end.
		sm.serverConfigLock.Lock() // FIXME(sean@): wtb TryLock
		defer sm.serverConfigLock.Unlock()
		serverCfg = sm.getServerConfig()
		if len(serverCfg.servers) > 0 && serverCfg.servers[0] == server {
			serverCfg.servers = serverCfg.cycleServer()
			sm.saveServerConfig(serverCfg)
		}
	}
}
// RebalanceServers takes out an internal write lock and shuffles the list of
@@ -287,7 +242,6 @@ func (sm *ServerManager) RebalanceServers() {
	serverCfg.servers = newServers
	sm.saveServerConfig(serverCfg)
	sm.requestRefreshRebalanceDuration()
}
// RemoveServer takes out an internal write lock and removes a server from
@@ -324,12 +278,6 @@ func (sm *ServerManager) requestRefreshRebalanceDuration() {
	sm.refreshRebalanceDurationCh <- true
}
// requestServerRebalance sends a message which causes a background thread
// to reshuffle the list of servers.
func (sm *ServerManager) requestServerRebalance() {
	sm.consulServersCh <- consulServersRebalance
}
// refreshServerRebalanceTimer is called
func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) {
	serverCfg := sm.getServerConfig()
@@ -359,11 +307,10 @@ func (sm *ServerManager) saveServerConfig(sc serverConfig) {
	sm.serverConfigValue.Store(sc)
}
// StartServerManager is used to start and manage the task of automatically
// shuffling and rebalancing the list of consul servers. This maintenance
// happens either when a new server is added or when a duration has been
// exceeded.
func (sm *ServerManager) StartServerManager() {
// Start is used to start and manage the task of automatically shuffling and
// rebalancing the list of consul servers. This maintenance happens either
// when a new server is added or when a duration has been exceeded.
func (sm *ServerManager) Start() {
	var rebalanceTimer *time.Timer = time.NewTimer(time.Duration(initialRebalanceTimeoutHours * time.Hour))
	var rebalanceTaskDispatched int32
@@ -382,51 +329,22 @@ func (sm *ServerManager) StartServerManager() {
	for {
		select {
		case e := <-sm.consulServersCh:
			switch e {
			case consulServersNodeJoin:
				sm.logger.Printf("[INFO] server manager: new node joined cluster")
				// rebalance on new server
				sm.requestServerRebalance()
			case consulServersRebalance:
				sm.logger.Printf("[INFO] server manager: rebalancing servers by request")
				sm.RebalanceServers()
			case consulServersRPCError:
				sm.logger.Printf("[INFO] server manager: need to find a new server to talk with")
				sm.CycleFailedServers()
				// FIXME(sean@): wtb preemptive Status.Ping
				// of servers, ideally parallel fan-out of N
				// nodes, then settle on the first node which
				// responds successfully.
				//
				// Is there a distinction between slow and
				// offline? Do we run the Status.Ping with a
				// fixed timeout (say 30s) that way we can
				// alert administrators that they've set
				// their RPC time too low even though the
				// Ping did return successfully?
			default:
				sm.logger.Printf("[WARN] server manager: unhandled LAN Serf Event: %#v", e)
			}
		case <-sm.refreshRebalanceDurationCh:
			chanLen := len(sm.refreshRebalanceDurationCh)
			// Drain all messages from the rebalance channel
			for i := 0; i < chanLen; i++ {
				<-sm.refreshRebalanceDurationCh
			}
		case <-rebalanceTimer.C:
			sm.logger.Printf("[INFO] server manager: server rebalance timeout")
			sm.RebalanceServers()
			// Only run one rebalance task at a time, but do
			// allow for the channel to be drained
			if atomic.CompareAndSwapInt32(&rebalanceTaskDispatched, 0, 1) {
				sm.logger.Printf("[INFO] server manager: Launching rebalance duration task")
				go func() {
					defer atomic.StoreInt32(&rebalanceTaskDispatched, 0)
					sm.refreshServerRebalanceTimer(rebalanceTimer)
				}()
			}
		case <-rebalanceTimer.C:
			sm.logger.Printf("[INFO] consul: server rebalance timeout")
			sm.RebalanceServers()
		case <-sm.shutdownCh:
			sm.logger.Printf("[INFO] server manager: shutting down")
			return
		}
	}
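
Illustrative note (not part of the patch): with these hunks applied, an RPC caller would consume the ServerManager roughly as sketched below. FindHealthyServer and NotifyFailedServer are the functions touched above; the exampleRPC name, the doRPC parameter, and the error text are hypothetical stand-ins, and the snippet assumes the usual fmt and server_details imports.

	// Hypothetical caller sketch only; doRPC stands in for the real RPC path.
	func exampleRPC(sm *ServerManager, doRPC func(*server_details.ServerDetails) error) error {
		server := sm.FindHealthyServer()
		if server == nil {
			return fmt.Errorf("no healthy consul server available")
		}
		if err := doRPC(server); err != nil {
			// Report the failure so the manager cycles this server to the
			// end of the list and a different server is tried next time.
			sm.NotifyFailedServer(server)
			return err
		}
		return nil
	}

Start is presumably launched once in its own goroutine (go sm.Start()) so that the rebalance timer and the shutdown channel keep being serviced.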