mirror of https://github.com/hashicorp/consul
agent: move agent/consul/servers to agent/router
parent
1acff3533e
commit
c395599cea
|
@ -12,8 +12,8 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/agent"
|
||||
"github.com/hashicorp/consul/agent/consul/servers"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
|
@ -52,9 +52,9 @@ type Client struct {
|
|||
// Connection pool to consul servers
|
||||
connPool *pool.ConnPool
|
||||
|
||||
// servers is responsible for the selection and maintenance of
|
||||
// routers is responsible for the selection and maintenance of
|
||||
// Consul servers this agent uses for RPC requests
|
||||
servers *servers.Manager
|
||||
routers *router.Manager
|
||||
|
||||
// eventCh is used to receive events from the
|
||||
// serf cluster in the datacenter
|
||||
|
@ -140,8 +140,8 @@ func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) {
|
|||
}
|
||||
|
||||
// Start maintenance task for servers
|
||||
c.servers = servers.New(c.logger, c.shutdownCh, c.serf, c.connPool)
|
||||
go c.servers.Start()
|
||||
c.routers = router.New(c.logger, c.shutdownCh, c.serf, c.connPool)
|
||||
go c.routers.Start()
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
@ -285,7 +285,7 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
|
|||
continue
|
||||
}
|
||||
c.logger.Printf("[INFO] consul: adding server %s", parts)
|
||||
c.servers.AddServer(parts)
|
||||
c.routers.AddServer(parts)
|
||||
|
||||
// Trigger the callback
|
||||
if c.config.ServerUp != nil {
|
||||
|
@ -302,7 +302,7 @@ func (c *Client) nodeFail(me serf.MemberEvent) {
|
|||
continue
|
||||
}
|
||||
c.logger.Printf("[INFO] consul: removing server %s", parts)
|
||||
c.servers.RemoveServer(parts)
|
||||
c.routers.RemoveServer(parts)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -336,14 +336,14 @@ func (c *Client) localEvent(event serf.UserEvent) {
|
|||
|
||||
// RPC is used to forward an RPC call to a consul server, or fail if no servers
|
||||
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
|
||||
server := c.servers.FindServer()
|
||||
server := c.routers.FindServer()
|
||||
if server == nil {
|
||||
return structs.ErrNoServers
|
||||
}
|
||||
|
||||
// Forward to remote Consul
|
||||
if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, server.UseTLS, args, reply); err != nil {
|
||||
c.servers.NotifyFailedServer(server)
|
||||
c.routers.NotifyFailedServer(server)
|
||||
c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err)
|
||||
return err
|
||||
}
|
||||
|
@ -358,7 +358,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io
|
|||
replyFn structs.SnapshotReplyFn) error {
|
||||
|
||||
// Locate a server to make the request to.
|
||||
server := c.servers.FindServer()
|
||||
server := c.routers.FindServer()
|
||||
if server == nil {
|
||||
return structs.ErrNoServers
|
||||
}
|
||||
|
@ -395,7 +395,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io
|
|||
// Stats is used to return statistics for debugging and insight
|
||||
// for various sub-systems
|
||||
func (c *Client) Stats() map[string]map[string]string {
|
||||
numServers := c.servers.NumServers()
|
||||
numServers := c.routers.NumServers()
|
||||
|
||||
toString := func(v uint64) string {
|
||||
return strconv.FormatUint(v, 10)
|
||||
|
@ -412,7 +412,7 @@ func (c *Client) Stats() map[string]map[string]string {
|
|||
}
|
||||
|
||||
func (c *Client) ServerAddrs() map[string]string {
|
||||
return c.servers.GetServerAddrs()
|
||||
return c.routers.GetServerAddrs()
|
||||
}
|
||||
|
||||
// GetLANCoordinate returns the network coordinate of the current node, as
|
||||
|
|
|
@ -84,7 +84,7 @@ func TestClient_JoinLAN(t *testing.T) {
|
|||
// Try to join
|
||||
joinLAN(t, c1, s1)
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if got, want := c1.servers.NumServers(), 1; got != want {
|
||||
if got, want := c1.routers.NumServers(), 1; got != want {
|
||||
r.Fatalf("got %d servers want %d", got, want)
|
||||
}
|
||||
if got, want := len(s1.LANMembers()), 2; got != want {
|
||||
|
@ -254,7 +254,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
|
|||
}
|
||||
|
||||
// Sleep to allow Serf to sync, shuffle, and let the shuffle complete
|
||||
c.servers.ResetRebalanceTimer()
|
||||
c.routers.ResetRebalanceTimer()
|
||||
time.Sleep(time.Second)
|
||||
|
||||
if len(c.LANMembers()) != numServers+numClients {
|
||||
|
@ -270,7 +270,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
|
|||
var pingCount int
|
||||
for range servers {
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
s := c.servers.FindServer()
|
||||
s := c.routers.FindServer()
|
||||
ok, err := c.connPool.Ping(s.Datacenter, s.Addr, s.Version, s.UseTLS)
|
||||
if !ok {
|
||||
t.Errorf("Unable to ping server %v: %s", s.String(), err)
|
||||
|
@ -279,7 +279,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
|
|||
|
||||
// Artificially fail the server in order to rotate the server
|
||||
// list
|
||||
c.servers.NotifyFailedServer(s)
|
||||
c.routers.NotifyFailedServer(s)
|
||||
}
|
||||
|
||||
if pingCount != numServers {
|
||||
|
@ -354,7 +354,7 @@ func TestClient_SnapshotRPC(t *testing.T) {
|
|||
|
||||
// Wait until we've got a healthy server.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if got, want := c1.servers.NumServers(), 1; got != want {
|
||||
if got, want := c1.routers.NumServers(), 1; got != want {
|
||||
r.Fatalf("got %d servers want %d", got, want)
|
||||
}
|
||||
})
|
||||
|
@ -413,7 +413,7 @@ func TestClient_SnapshotRPC_TLS(t *testing.T) {
|
|||
}
|
||||
|
||||
// Wait until we've got a healthy server.
|
||||
if got, want := c1.servers.NumServers(), 1; got != want {
|
||||
if got, want := c1.routers.NumServers(), 1; got != want {
|
||||
r.Fatalf("got %d servers want %d", got, want)
|
||||
}
|
||||
})
|
||||
|
|
|
@ -3,7 +3,7 @@ package consul
|
|||
import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/servers"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
|
@ -24,7 +24,7 @@ func (s *Server) FloodNotify() {
|
|||
// Flood is a long-running goroutine that floods servers from the LAN to the
|
||||
// given global Serf instance, such as the WAN. This will exit once either of
|
||||
// the Serf instances are shut down.
|
||||
func (s *Server) Flood(portFn servers.FloodPortFn, global *serf.Serf) {
|
||||
func (s *Server) Flood(portFn router.FloodPortFn, global *serf.Serf) {
|
||||
s.floodLock.Lock()
|
||||
floodCh := make(chan struct{})
|
||||
s.floodCh = append(s.floodCh, floodCh)
|
||||
|
@ -61,6 +61,6 @@ func (s *Server) Flood(portFn servers.FloodPortFn, global *serf.Serf) {
|
|||
}
|
||||
|
||||
FLOOD:
|
||||
servers.FloodJoins(s.logger, portFn, s.config.Datacenter, s.serfLAN, global)
|
||||
router.FloodJoins(s.logger, portFn, s.config.Datacenter, s.serfLAN, global)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,9 +19,9 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/agent"
|
||||
"github.com/hashicorp/consul/agent/consul/servers"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
|
@ -154,7 +154,7 @@ type Server struct {
|
|||
|
||||
// router is used to map out Consul servers in the WAN and in Consul
|
||||
// Enterprise user-defined areas.
|
||||
router *servers.Router
|
||||
router *router.Router
|
||||
|
||||
// Listener is used to listen for incoming connections
|
||||
Listener net.Listener
|
||||
|
@ -181,7 +181,7 @@ type Server struct {
|
|||
sessionTimers *SessionTimers
|
||||
|
||||
// statsFetcher is used by autopilot to check the status of the other
|
||||
// Consul servers.
|
||||
// Consul router.
|
||||
statsFetcher *StatsFetcher
|
||||
|
||||
// reassertLeaderCh is used to signal the leader loop should re-run
|
||||
|
@ -298,7 +298,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
|
|||
localConsuls: make(map[raft.ServerAddress]*agent.Server),
|
||||
logger: logger,
|
||||
reconcileCh: make(chan serf.Member, 32),
|
||||
router: servers.NewRouter(logger, config.Datacenter),
|
||||
router: router.NewRouter(logger, config.Datacenter),
|
||||
rpcServer: rpc.NewServer(),
|
||||
rpcTLS: incomingTLS,
|
||||
reassertLeaderCh: make(chan chan error),
|
||||
|
@ -382,7 +382,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
|
|||
s.Shutdown()
|
||||
return nil, fmt.Errorf("Failed to add WAN serf route: %v", err)
|
||||
}
|
||||
go servers.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)
|
||||
go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)
|
||||
|
||||
// Fire up the LAN <-> WAN join flooder.
|
||||
portFn := func(s *agent.Server) (int, bool) {
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
// client's perspective (i.e. a list of servers that a client talks with for
|
||||
// RPCs). The servers package does not provide any API guarantees and should
|
||||
// be called only by `hashicorp/consul`.
|
||||
package servers
|
||||
package router
|
||||
|
||||
import (
|
||||
"log"
|
|
@ -1,4 +1,4 @@
|
|||
package servers
|
||||
package router
|
||||
|
||||
import (
|
||||
"bytes"
|
|
@ -1,4 +1,4 @@
|
|||
package servers_test
|
||||
package router_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
@ -10,7 +10,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/agent"
|
||||
"github.com/hashicorp/consul/agent/consul/servers"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
)
|
||||
|
||||
type fauxConnPool struct {
|
||||
|
@ -34,17 +34,17 @@ func (s *fauxSerf) NumNodes() int {
|
|||
return 16384
|
||||
}
|
||||
|
||||
func testManager() (m *servers.Manager) {
|
||||
func testManager() (m *router.Manager) {
|
||||
logger := log.New(os.Stderr, "", log.LstdFlags)
|
||||
shutdownCh := make(chan struct{})
|
||||
m = servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
|
||||
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
|
||||
return m
|
||||
}
|
||||
|
||||
func testManagerFailProb(failPct float64) (m *servers.Manager) {
|
||||
func testManagerFailProb(failPct float64) (m *router.Manager) {
|
||||
logger := log.New(os.Stderr, "", log.LstdFlags)
|
||||
shutdownCh := make(chan struct{})
|
||||
m = servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct})
|
||||
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct})
|
||||
return m
|
||||
}
|
||||
|
||||
|
@ -169,7 +169,7 @@ func TestServers_FindServer(t *testing.T) {
|
|||
func TestServers_New(t *testing.T) {
|
||||
logger := log.New(os.Stderr, "", log.LstdFlags)
|
||||
shutdownCh := make(chan struct{})
|
||||
m := servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
|
||||
m := router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
|
||||
if m == nil {
|
||||
t.Fatalf("Manager nil")
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package servers
|
||||
package router
|
||||
|
||||
import (
|
||||
"fmt"
|
|
@ -1,4 +1,4 @@
|
|||
package servers
|
||||
package router
|
||||
|
||||
import (
|
||||
"fmt"
|
|
@ -1,4 +1,4 @@
|
|||
package servers
|
||||
package router
|
||||
|
||||
import (
|
||||
"log"
|
|
@ -1,4 +1,4 @@
|
|||
package servers
|
||||
package router
|
||||
|
||||
import (
|
||||
"fmt"
|
Loading…
Reference in New Issue