Browse Source

router: remove grpcServerTracker from managers

It only needs to be refereced from the Router, because there is only 1 instance, and the
Router can call AddServer/RemoveServer like it does on the Manager.
pull/8667/head
Daniel Nephin 4 years ago
parent
commit
07b4507f1e
  1. 17
      agent/router/manager.go
  2. 6
      agent/router/manager_internal_test.go
  3. 8
      agent/router/manager_test.go
  4. 4
      agent/router/router.go

17
agent/router/manager.go

@ -98,10 +98,6 @@ type Manager struct {
// client.ConnPool.
connPoolPinger Pinger
// grpcServerTracker is used to balance grpc connections across servers,
// and has callbacks for adding or removing a server.
grpcServerTracker ServerTracker
// serverName has the name of the managers's server. This is used to
// short-circuit pinging to itself.
serverName string
@ -123,7 +119,6 @@ type Manager struct {
func (m *Manager) AddServer(s *metadata.Server) {
m.listLock.Lock()
defer m.listLock.Unlock()
m.grpcServerTracker.AddServer(s)
l := m.getServerList()
// Check if this server is known
@ -256,11 +251,6 @@ func (m *Manager) CheckServers(fn func(srv *metadata.Server) bool) {
_ = m.checkServers(fn)
}
// Servers returns the current list of servers.
func (m *Manager) Servers() []*metadata.Server {
return m.getServerList().servers
}
// getServerList is a convenience method which hides the locking semantics
// of atomic.Value from the caller.
func (m *Manager) getServerList() serverList {
@ -277,19 +267,15 @@ func (m *Manager) saveServerList(l serverList) {
}
// New is the only way to safely create a new Manager struct.
func New(logger hclog.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCluster, connPoolPinger Pinger, tracker ServerTracker, serverName string) (m *Manager) {
func New(logger hclog.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCluster, connPoolPinger Pinger, serverName string) (m *Manager) {
if logger == nil {
logger = hclog.New(&hclog.LoggerOptions{})
}
if tracker == nil {
tracker = NoOpServerTracker{}
}
m = new(Manager)
m.logger = logger.Named(logging.Manager)
m.clusterInfo = clusterInfo // can't pass *consul.Client: import cycle
m.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle
m.grpcServerTracker = tracker
m.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration)
m.shutdownCh = shutdownCh
m.serverName = serverName
@ -492,7 +478,6 @@ func (m *Manager) reconcileServerList(l *serverList) bool {
func (m *Manager) RemoveServer(s *metadata.Server) {
m.listLock.Lock()
defer m.listLock.Unlock()
m.grpcServerTracker.RemoveServer(s)
l := m.getServerList()
// Remove the server if known

6
agent/router/manager_internal_test.go

@ -54,14 +54,14 @@ func (s *fauxSerf) NumNodes() int {
func testManager() (m *Manager) {
logger := GetBufferedLogger()
shutdownCh := make(chan struct{})
m = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{}, nil, "")
m = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{}, "")
return m
}
func testManagerFailProb(failPct float64) (m *Manager) {
logger := GetBufferedLogger()
shutdownCh := make(chan struct{})
m = New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}, nil, "")
m = New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}, "")
return m
}
@ -300,7 +300,7 @@ func TestManagerInternal_refreshServerRebalanceTimer(t *testing.T) {
shutdownCh := make(chan struct{})
for _, s := range clusters {
m := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{}, nil, "")
m := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{}, "")
for i := 0; i < s.numServers; i++ {
nodeName := fmt.Sprintf("s%02d", i)
m.AddServer(&metadata.Server{Name: nodeName})

8
agent/router/manager_test.go

@ -57,21 +57,21 @@ func (s *fauxSerf) NumNodes() int {
func testManager(t testing.TB) (m *router.Manager) {
logger := testutil.Logger(t)
shutdownCh := make(chan struct{})
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}, nil, "")
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}, "")
return m
}
func testManagerFailProb(t testing.TB, failPct float64) (m *router.Manager) {
logger := testutil.Logger(t)
shutdownCh := make(chan struct{})
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}, nil, "")
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}, "")
return m
}
func testManagerFailAddr(t testing.TB, failAddr net.Addr) (m *router.Manager) {
logger := testutil.Logger(t)
shutdownCh := make(chan struct{})
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failAddr: failAddr}, nil, "")
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failAddr: failAddr}, "")
return m
}
@ -195,7 +195,7 @@ func TestServers_FindServer(t *testing.T) {
func TestServers_New(t *testing.T) {
logger := testutil.Logger(t)
shutdownCh := make(chan struct{})
m := router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}, nil, "")
m := router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}, "")
if m == nil {
t.Fatalf("Manager nil")
}

4
agent/router/router.go

@ -259,7 +259,7 @@ func (r *Router) maybeInitializeManager(area *areaInfo, dc string) *Manager {
}
shutdownCh := make(chan struct{})
manager := New(r.logger, shutdownCh, area.cluster, area.pinger, nil, r.serverName)
manager := New(r.logger, shutdownCh, area.cluster, area.pinger, r.serverName)
info = &managerInfo{
manager: manager,
shutdownCh: shutdownCh,
@ -286,6 +286,7 @@ func (r *Router) addServer(area *areaInfo, s *metadata.Server) error {
}
manager.AddServer(s)
r.grpcServerTracker.AddServer(s)
return nil
}
@ -321,6 +322,7 @@ func (r *Router) RemoveServer(areaID types.AreaID, s *metadata.Server) error {
return nil
}
info.manager.RemoveServer(s)
r.grpcServerTracker.RemoveServer(s)
// If this manager is empty then remove it so we don't accumulate cruft
// and waste time during request routing.

Loading…
Cancel
Save