Document the various functions and their locking

pull/1873/head
Sean Chittenden 9 years ago
parent 9eb6481d73
commit b4db49a62e

@@ -93,6 +93,9 @@ type ServerManager struct {
	logger *log.Logger
}
// AddServer takes out an internal write lock and adds a new server. If the
// server is not known, it adds the new server and schedules a rebalance. If
// it is known, we merge the new server details.
func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
	sm.serverConfigLock.Lock()
	defer sm.serverConfigLock.Unlock()
@@ -123,6 +126,10 @@ func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
	sm.serverConfigValue.Store(serverCfg)
}
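For reference, the write path described in the AddServer comment follows a copy-on-write pattern: writers serialize on the mutex, build a new config from the old snapshot, and publish it with an atomic store so readers never block. A minimal self-contained sketch of that pattern follows; the type and field names (serverDetails, serverManager, configLock, configValue) are illustrative stand-ins, not the real Consul definitions.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type serverDetails struct{ Name string }

type serverConfig struct{ servers []*serverDetails }

type serverManager struct {
	configLock  sync.Mutex   // serializes writers only
	configValue atomic.Value // holds the current *serverConfig snapshot
}

// addServer copies the current snapshot, merges or appends the server, and
// atomically publishes the new snapshot.
func (m *serverManager) addServer(s *serverDetails) {
	m.configLock.Lock()
	defer m.configLock.Unlock()

	old := m.configValue.Load().(*serverConfig)
	next := &serverConfig{servers: make([]*serverDetails, 0, len(old.servers)+1)}
	merged := false
	for _, existing := range old.servers {
		if existing.Name == s.Name {
			next.servers = append(next.servers, s) // known server: merge new details
			merged = true
			continue
		}
		next.servers = append(next.servers, existing)
	}
	if !merged {
		next.servers = append(next.servers, s) // unknown server: add it
	}
	m.configValue.Store(next)
}

func main() {
	m := &serverManager{}
	m.configValue.Store(&serverConfig{})
	m.addServer(&serverDetails{Name: "consul1"})
	m.addServer(&serverDetails{Name: "consul2"})
	fmt.Println(len(m.configValue.Load().(*serverConfig).servers)) // 2
}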
// CycleFailedServers takes out an internal write lock and dequeues all
// failed servers and re-enqueues them. This method does not reshuffle the
// server list. Because this changes the order of servers, we push out the
// time at which a rebalance occurs.
func (sm *ServerManager) CycleFailedServers() {
	sm.serverConfigLock.Lock()
	defer sm.serverConfigLock.Unlock()
@@ -141,6 +148,9 @@ func (sm *ServerManager) CycleFailedServers() {
	sm.serverConfigValue.Store(serverCfg)
}
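One way to express the "dequeue failed servers and re-enqueue them without reshuffling" behavior described above is a stable partition over the server list; the real method would do this on a copied snapshot while holding serverConfigLock. A sketch, reusing the illustrative serverDetails type from the AddServer sketch; the failed predicate is an assumption standing in for however the real code marks a server as having failed an RPC.

// stableRequeueFailed returns a new slice with every failed server moved to
// the back while preserving the relative order of both groups (no shuffle).
func stableRequeueFailed(servers []*serverDetails, failed func(*serverDetails) bool) []*serverDetails {
	healthy := make([]*serverDetails, 0, len(servers))
	var requeued []*serverDetails
	for _, s := range servers {
		if failed(s) {
			requeued = append(requeued, s)
			continue
		}
		healthy = append(healthy, s)
	}
	return append(healthy, requeued...)
}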
// cycleServer returns a new list of servers that has dequeued the first
// server and enqueued it at the end of the list. cycleServer assumes the
// caller is holding the serverConfigLock.
func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) {
	numServers := len(servers)
	if numServers < 2 {
@@ -155,6 +165,8 @@ func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails)
	return servers
}
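The behavior the cycleServer comment describes is a rotation of the slice by one: pop the head, push it on the tail, and do nothing when there are fewer than two servers. A minimal sketch of that rotation, again using the illustrative serverDetails type from the AddServer sketch rather than the real server_details.ServerDetails.

// rotateFront returns a new slice with the first server moved to the end.
// With fewer than two servers there is nothing to rotate.
func rotateFront(servers []*serverDetails) []*serverDetails {
	if len(servers) < 2 {
		return servers
	}
	rotated := make([]*serverDetails, 0, len(servers))
	rotated = append(rotated, servers[1:]...)
	rotated = append(rotated, servers[0])
	return rotated
}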
// FindHealthyServer takes out an internal "read lock" and searches through
// the list of servers to find a healthy server.
func (sm *ServerManager) FindHealthyServer() (server *server_details.ServerDetails) {
	serverCfg := sm.getServerConfig()
	numServers := len(serverCfg.servers)
@@ -178,6 +190,8 @@ func (sm *ServerManager) FindHealthyServer() (server *server_details.ServerDetai
	return server
}
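The quoted "read lock" in these comments appears to be a lock-free snapshot read rather than a mutex: getServerConfig loads the current immutable *serverConfig from the atomic value published by writers, and readers walk that snapshot without blocking. A sketch of the read side, continuing the illustrative types from the AddServer sketch; the healthy predicate is a stand-in, since the real health check is not shown in this diff.

// getConfig is the lock-free "read lock": it loads the current immutable
// snapshot that writers published via configValue.Store.
func (m *serverManager) getConfig() *serverConfig {
	return m.configValue.Load().(*serverConfig)
}

// findHealthyServer walks the snapshot and returns the first server that
// passes the (stand-in) healthy predicate, or nil if none does.
func (m *serverManager) findHealthyServer(healthy func(*serverDetails) bool) *serverDetails {
	for _, s := range m.getConfig().servers {
		if healthy(s) {
			return s
		}
	}
	return nil
}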
// GetNumServers takes out an internal "read lock" and returns the number of
// servers. numServers includes both healthy and unhealthy servers.
func (sm *ServerManager) GetNumServers() (numServers int) {
	serverCfg := sm.getServerConfig()
	numServers = len(serverCfg.servers)
@@ -205,11 +219,23 @@ func NewServerManager(logger *log.Logger, shutdownCh chan struct{}) (sm *ServerM
	return sm
}
// NotifyFailedServer is an exported convenience function that allows callers
// to pass in a server that has failed an RPC request and mark it as failed.
// This will initiate a background task that moves the failed server to the
// end of the server list. No locks are required here because we are
// bypassing the serverConfig and sending a message to ServerManager's
// channel.
func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
	atomic.AddUint64(&server.Disabled, 1)
	sm.consulServersCh <- consulServersRPCError
}
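As the comment says, this path needs no lock: the caller only bumps an atomic counter on the server and sends a message on the manager's channel, and the background goroutine that owns the channel does any reordering. A sketch of that pattern, assuming the sync/atomic import; the event type, channel, and struct names below are illustrative, with only the Disabled field and the overall shape taken from the diff.

type rpcEvent int

const rpcError rpcEvent = iota // stand-in for the consulServersRPCError message

type notifyingManager struct {
	eventCh chan rpcEvent // drained by a long-running maintenance goroutine
}

type failableServer struct {
	Disabled uint64 // cumulative failure count, only ever touched atomically
}

// notifyFailedServer mirrors the locking story above: no mutex, just an
// atomic increment plus a notification to the loop that owns the server list.
func (m *notifyingManager) notifyFailedServer(s *failableServer) {
	atomic.AddUint64(&s.Disabled, 1)
	m.eventCh <- rpcError
}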
// RebalanceServers takes out an internal write lock and shuffles the list of
// servers on this agent. This allows for a redistribution of work across
// consul servers and guarantees that the list of ServerDetails is not in a
// fixed order, therefore we can sequentially walk the array to pick a server
// without all agents in the cluster dog-piling on a single node.
func (sm *ServerManager) RebalanceServers() {
	sm.serverConfigLock.Lock()
	defer sm.serverConfigLock.Unlock()
@@ -227,6 +253,12 @@ func (sm *ServerManager) RebalanceServers() {
	sm.serverConfigValue.Store(serverCfg)
}
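The rebalance described above holds the write lock, shuffles a copy of the server list, and publishes it, so each agent walks the servers in its own random order instead of every agent hammering the same node. A sketch continuing the illustrative types from the AddServer sketch; the math/rand Fisher-Yates shuffle is an assumption, and the real code may seed or shuffle differently.

// rebalanceServers shuffles a copy of the current snapshot under the write
// lock and publishes it, decorrelating this agent's walk order from every
// other agent's.
func (m *serverManager) rebalanceServers() {
	m.configLock.Lock()
	defer m.configLock.Unlock()

	old := m.configValue.Load().(*serverConfig)
	next := &serverConfig{servers: make([]*serverDetails, len(old.servers))}
	copy(next.servers, old.servers)

	rand.Shuffle(len(next.servers), func(i, j int) {
		next.servers[i], next.servers[j] = next.servers[j], next.servers[i]
	})

	m.configValue.Store(next)
}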
// RemoveServer takes out an internal write lock and removes a server from
// the server list. No rebalancing happens as a result of the removed server
// because we do not want a network partition that separates a server from
// this agent to cause an increase in work. Instead we rely on the existing
// failure-detection semantics to handle a server after it has been removed.
func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
	sm.serverConfigLock.Lock()
	defer sm.serverConfigLock.Unlock()
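Removal is the same copy-and-publish dance as AddServer, deliberately without a rebalance, per the comment above. A sketch continuing the same illustrative types; matching servers by Name is an assumption about how the real code identifies the server to drop.

// removeServer filters the server out of a copied snapshot and publishes the
// result; intentionally no reshuffle of the remaining servers.
func (m *serverManager) removeServer(target *serverDetails) {
	m.configLock.Lock()
	defer m.configLock.Unlock()

	old := m.configValue.Load().(*serverConfig)
	next := &serverConfig{servers: make([]*serverDetails, 0, len(old.servers))}
	for _, s := range old.servers {
		if s.Name == target.Name {
			continue // drop the removed server
		}
		next.servers = append(next.servers, s)
	}
	m.configValue.Store(next)
}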
