mirror of https://github.com/hashicorp/consul
pool: remove version
The version field has been used to decide which multiplexing to use. It
was introduced in 2457293dce
. But this is
6y ago and there is no need for this differentiation anymore.
pull/7966/head
parent
4f2bff174d
commit
c45432014b
|
@ -109,7 +109,7 @@ func (c *Client) RequestAutoEncryptCerts(servers []string, port int, token strin
|
||||||
for _, ip := range ips {
|
for _, ip := range ips {
|
||||||
addr := net.TCPAddr{IP: ip, Port: port}
|
addr := net.TCPAddr{IP: ip, Port: port}
|
||||||
|
|
||||||
if err = c.connPool.RPC(c.config.Datacenter, c.config.NodeName, &addr, 0, "AutoEncrypt.Sign", &args, &reply); err == nil {
|
if err = c.connPool.RPC(c.config.Datacenter, c.config.NodeName, &addr, "AutoEncrypt.Sign", &args, &reply); err == nil {
|
||||||
return &reply, pkPEM, nil
|
return &reply, pkPEM, nil
|
||||||
} else {
|
} else {
|
||||||
c.logger.Warn("AutoEncrypt failed", "error", err)
|
c.logger.Warn("AutoEncrypt failed", "error", err)
|
||||||
|
|
|
@ -308,7 +308,7 @@ TRY:
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make the request.
|
// Make the request.
|
||||||
rpcErr := c.connPool.RPC(c.config.Datacenter, server.ShortName, server.Addr, server.Version, method, args, reply)
|
rpcErr := c.connPool.RPC(c.config.Datacenter, server.ShortName, server.Addr, method, args, reply)
|
||||||
if rpcErr == nil {
|
if rpcErr == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -425,7 +425,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
|
||||||
for range servers {
|
for range servers {
|
||||||
time.Sleep(200 * time.Millisecond)
|
time.Sleep(200 * time.Millisecond)
|
||||||
s := c.routers.FindServer()
|
s := c.routers.FindServer()
|
||||||
ok, err := c.connPool.Ping(s.Datacenter, s.ShortName, s.Addr, s.Version)
|
ok, err := c.connPool.Ping(s.Datacenter, s.ShortName, s.Addr)
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Errorf("Unable to ping server %v: %s", s.String(), err)
|
t.Errorf("Unable to ping server %v: %s", s.String(), err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -552,7 +552,7 @@ CHECK_LEADER:
|
||||||
rpcErr := structs.ErrNoLeader
|
rpcErr := structs.ErrNoLeader
|
||||||
if leader != nil {
|
if leader != nil {
|
||||||
rpcErr = s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr,
|
rpcErr = s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr,
|
||||||
leader.Version, method, args, reply)
|
method, args, reply)
|
||||||
if rpcErr != nil && canRetry(info, rpcErr) {
|
if rpcErr != nil && canRetry(info, rpcErr) {
|
||||||
goto RETRY
|
goto RETRY
|
||||||
}
|
}
|
||||||
|
@ -617,7 +617,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
|
||||||
|
|
||||||
metrics.IncrCounterWithLabels([]string{"rpc", "cross-dc"}, 1,
|
metrics.IncrCounterWithLabels([]string{"rpc", "cross-dc"}, 1,
|
||||||
[]metrics.Label{{Name: "datacenter", Value: dc}})
|
[]metrics.Label{{Name: "datacenter", Value: dc}})
|
||||||
if err := s.connPool.RPC(dc, server.ShortName, server.Addr, server.Version, method, args, reply); err != nil {
|
if err := s.connPool.RPC(dc, server.ShortName, server.Addr, method, args, reply); err != nil {
|
||||||
manager.NotifyFailedServer(server)
|
manager.NotifyFailedServer(server)
|
||||||
s.rpcLogger().Error("RPC failed to server in DC",
|
s.rpcLogger().Error("RPC failed to server in DC",
|
||||||
"server", server.Addr,
|
"server", server.Addr,
|
||||||
|
|
|
@ -355,7 +355,7 @@ func (s *Server) maybeBootstrap() {
|
||||||
|
|
||||||
// Retry with exponential backoff to get peer status from this server
|
// Retry with exponential backoff to get peer status from this server
|
||||||
for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
|
for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
|
||||||
if err := s.connPool.RPC(s.config.Datacenter, server.ShortName, server.Addr, server.Version,
|
if err := s.connPool.RPC(s.config.Datacenter, server.ShortName, server.Addr,
|
||||||
"Status.Peers", &structs.DCSpecificRequest{Datacenter: s.config.Datacenter}, &peers); err != nil {
|
"Status.Peers", &structs.DCSpecificRequest{Datacenter: s.config.Datacenter}, &peers); err != nil {
|
||||||
nextRetry := (1 << attempt) * time.Second
|
nextRetry := (1 << attempt) * time.Second
|
||||||
s.logger.Error("Failed to confirm peer status for server (will retry).",
|
s.logger.Error("Failed to confirm peer status for server (will retry).",
|
||||||
|
|
|
@ -1277,7 +1277,7 @@ func testVerifyRPC(s1, s2 *Server, t *testing.T) (bool, error) {
|
||||||
if leader == nil {
|
if leader == nil {
|
||||||
t.Fatal("no leader")
|
t.Fatal("no leader")
|
||||||
}
|
}
|
||||||
return s2.connPool.Ping(leader.Datacenter, leader.ShortName, leader.Addr, leader.Version)
|
return s2.connPool.Ping(leader.Datacenter, leader.ShortName, leader.Addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServer_TLSToNoTLS(t *testing.T) {
|
func TestServer_TLSToNoTLS(t *testing.T) {
|
||||||
|
|
|
@ -43,7 +43,7 @@ func NewStatsFetcher(logger hclog.Logger, pool *pool.ConnPool, datacenter string
|
||||||
func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *autopilot.ServerStats) {
|
func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *autopilot.ServerStats) {
|
||||||
var args struct{}
|
var args struct{}
|
||||||
var reply autopilot.ServerStats
|
var reply autopilot.ServerStats
|
||||||
err := f.pool.RPC(f.datacenter, server.ShortName, server.Addr, server.Version, "Status.RaftStats", &args, &reply)
|
err := f.pool.RPC(f.datacenter, server.ShortName, server.Addr, "Status.RaftStats", &args, &reply)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.logger.Warn("error getting server health from server",
|
f.logger.Warn("error getting server health from server",
|
||||||
"server", server.Name,
|
"server", server.Name,
|
||||||
|
|
|
@ -46,7 +46,6 @@ type Conn struct {
|
||||||
addr net.Addr
|
addr net.Addr
|
||||||
session muxSession
|
session muxSession
|
||||||
lastUsed time.Time
|
lastUsed time.Time
|
||||||
version int
|
|
||||||
|
|
||||||
pool *ConnPool
|
pool *ConnPool
|
||||||
|
|
||||||
|
@ -209,7 +208,7 @@ func (p *ConnPool) Shutdown() error {
|
||||||
// wait for an existing connection attempt to finish, if one if in progress,
|
// wait for an existing connection attempt to finish, if one if in progress,
|
||||||
// and will return that one if it succeeds. If all else fails, it will return a
|
// and will return that one if it succeeds. If all else fails, it will return a
|
||||||
// newly-created connection and add it to the pool.
|
// newly-created connection and add it to the pool.
|
||||||
func (p *ConnPool) acquire(dc string, nodeName string, addr net.Addr, version int, useTLS bool) (*Conn, error) {
|
func (p *ConnPool) acquire(dc string, nodeName string, addr net.Addr, useTLS bool) (*Conn, error) {
|
||||||
if nodeName == "" {
|
if nodeName == "" {
|
||||||
return nil, fmt.Errorf("pool: ConnPool.acquire requires a node name")
|
return nil, fmt.Errorf("pool: ConnPool.acquire requires a node name")
|
||||||
}
|
}
|
||||||
|
@ -244,7 +243,7 @@ func (p *ConnPool) acquire(dc string, nodeName string, addr net.Addr, version in
|
||||||
// If we are the lead thread, make the new connection and then wake
|
// If we are the lead thread, make the new connection and then wake
|
||||||
// everybody else up to see if we got it.
|
// everybody else up to see if we got it.
|
||||||
if isLeadThread {
|
if isLeadThread {
|
||||||
c, err := p.getNewConn(dc, nodeName, addr, version, useTLS)
|
c, err := p.getNewConn(dc, nodeName, addr, useTLS)
|
||||||
p.Lock()
|
p.Lock()
|
||||||
delete(p.limiter, addrStr)
|
delete(p.limiter, addrStr)
|
||||||
close(wait)
|
close(wait)
|
||||||
|
@ -497,17 +496,11 @@ func DialTimeoutWithRPCTypeViaMeshGateway(
|
||||||
}
|
}
|
||||||
|
|
||||||
// getNewConn is used to return a new connection
|
// getNewConn is used to return a new connection
|
||||||
func (p *ConnPool) getNewConn(dc string, nodeName string, addr net.Addr, version int, useTLS bool) (*Conn, error) {
|
func (p *ConnPool) getNewConn(dc string, nodeName string, addr net.Addr, useTLS bool) (*Conn, error) {
|
||||||
if nodeName == "" {
|
if nodeName == "" {
|
||||||
return nil, fmt.Errorf("pool: ConnPool.getNewConn requires a node name")
|
return nil, fmt.Errorf("pool: ConnPool.getNewConn requires a node name")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Switch the multiplexing based on version
|
|
||||||
var session muxSession
|
|
||||||
if version < 2 {
|
|
||||||
return nil, fmt.Errorf("cannot make client connection, unsupported protocol version %d", version)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get a new, raw connection and write the Consul multiplex byte to set the mode
|
// Get a new, raw connection and write the Consul multiplex byte to set the mode
|
||||||
conn, _, err := p.DialTimeout(dc, nodeName, addr, defaultDialTimeout, useTLS, RPCMultiplexV2)
|
conn, _, err := p.DialTimeout(dc, nodeName, addr, defaultDialTimeout, useTLS, RPCMultiplexV2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -519,7 +512,7 @@ func (p *ConnPool) getNewConn(dc string, nodeName string, addr net.Addr, version
|
||||||
conf.LogOutput = p.LogOutput
|
conf.LogOutput = p.LogOutput
|
||||||
|
|
||||||
// Create a multiplexed session
|
// Create a multiplexed session
|
||||||
session, _ = yamux.Client(conn, conf)
|
session, _ := yamux.Client(conn, conf)
|
||||||
|
|
||||||
// Wrap the connection
|
// Wrap the connection
|
||||||
c := &Conn{
|
c := &Conn{
|
||||||
|
@ -529,7 +522,6 @@ func (p *ConnPool) getNewConn(dc string, nodeName string, addr net.Addr, version
|
||||||
session: session,
|
session: session,
|
||||||
clients: list.New(),
|
clients: list.New(),
|
||||||
lastUsed: time.Now(),
|
lastUsed: time.Now(),
|
||||||
version: version,
|
|
||||||
pool: p,
|
pool: p,
|
||||||
}
|
}
|
||||||
return c, nil
|
return c, nil
|
||||||
|
@ -567,12 +559,12 @@ func (p *ConnPool) releaseConn(conn *Conn) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getClient is used to get a usable client for an address and protocol version
|
// getClient is used to get a usable client for an address
|
||||||
func (p *ConnPool) getClient(dc string, nodeName string, addr net.Addr, version int, useTLS bool) (*Conn, *StreamClient, error) {
|
func (p *ConnPool) getClient(dc string, nodeName string, addr net.Addr, useTLS bool) (*Conn, *StreamClient, error) {
|
||||||
retries := 0
|
retries := 0
|
||||||
START:
|
START:
|
||||||
// Try to get a conn first
|
// Try to get a conn first
|
||||||
conn, err := p.acquire(dc, nodeName, addr, version, useTLS)
|
conn, err := p.acquire(dc, nodeName, addr, useTLS)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("failed to get conn: %v", err)
|
return nil, nil, fmt.Errorf("failed to get conn: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -598,7 +590,6 @@ func (p *ConnPool) RPC(
|
||||||
dc string,
|
dc string,
|
||||||
nodeName string,
|
nodeName string,
|
||||||
addr net.Addr,
|
addr net.Addr,
|
||||||
version int,
|
|
||||||
method string,
|
method string,
|
||||||
args interface{},
|
args interface{},
|
||||||
reply interface{},
|
reply interface{},
|
||||||
|
@ -610,7 +601,7 @@ func (p *ConnPool) RPC(
|
||||||
if method == "AutoEncrypt.Sign" {
|
if method == "AutoEncrypt.Sign" {
|
||||||
return p.rpcInsecure(dc, nodeName, addr, method, args, reply)
|
return p.rpcInsecure(dc, nodeName, addr, method, args, reply)
|
||||||
} else {
|
} else {
|
||||||
return p.rpc(dc, nodeName, addr, version, method, args, reply)
|
return p.rpc(dc, nodeName, addr, method, args, reply)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -636,12 +627,12 @@ func (p *ConnPool) rpcInsecure(dc string, nodeName string, addr net.Addr, method
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, version int, method string, args interface{}, reply interface{}) error {
|
func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string, args interface{}, reply interface{}) error {
|
||||||
p.once.Do(p.init)
|
p.once.Do(p.init)
|
||||||
|
|
||||||
// Get a usable client
|
// Get a usable client
|
||||||
useTLS := p.TLSConfigurator.UseTLS(dc)
|
useTLS := p.TLSConfigurator.UseTLS(dc)
|
||||||
conn, sc, err := p.getClient(dc, nodeName, addr, version, useTLS)
|
conn, sc, err := p.getClient(dc, nodeName, addr, useTLS)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("rpc error getting client: %v", err)
|
return fmt.Errorf("rpc error getting client: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -671,9 +662,9 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, version int, m
|
||||||
|
|
||||||
// Ping sends a Status.Ping message to the specified server and
|
// Ping sends a Status.Ping message to the specified server and
|
||||||
// returns true if healthy, false if an error occurred
|
// returns true if healthy, false if an error occurred
|
||||||
func (p *ConnPool) Ping(dc string, nodeName string, addr net.Addr, version int) (bool, error) {
|
func (p *ConnPool) Ping(dc string, nodeName string, addr net.Addr) (bool, error) {
|
||||||
var out struct{}
|
var out struct{}
|
||||||
err := p.RPC(dc, nodeName, addr, version, "Status.Ping", struct{}{}, &out)
|
err := p.RPC(dc, nodeName, addr, "Status.Ping", struct{}{}, &out)
|
||||||
return err == nil, err
|
return err == nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -61,7 +61,7 @@ type ManagerSerfCluster interface {
|
||||||
// Pinger is an interface wrapping client.ConnPool to prevent a cyclic import
|
// Pinger is an interface wrapping client.ConnPool to prevent a cyclic import
|
||||||
// dependency.
|
// dependency.
|
||||||
type Pinger interface {
|
type Pinger interface {
|
||||||
Ping(dc, nodeName string, addr net.Addr, version int) (bool, error)
|
Ping(dc, nodeName string, addr net.Addr) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// serverList is a local copy of the struct used to maintain the list of
|
// serverList is a local copy of the struct used to maintain the list of
|
||||||
|
@ -350,7 +350,7 @@ func (m *Manager) RebalanceServers() {
|
||||||
if m.serverName != "" && srv.Name == m.serverName {
|
if m.serverName != "" && srv.Name == m.serverName {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ok, err := m.connPoolPinger.Ping(srv.Datacenter, srv.ShortName, srv.Addr, srv.Version)
|
ok, err := m.connPoolPinger.Ping(srv.Datacenter, srv.ShortName, srv.Addr)
|
||||||
if ok {
|
if ok {
|
||||||
foundHealthyServer = true
|
foundHealthyServer = true
|
||||||
break
|
break
|
||||||
|
|
|
@ -33,7 +33,7 @@ type fauxConnPool struct {
|
||||||
failPct float64
|
failPct float64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *fauxConnPool) Ping(string, string, net.Addr, int) (bool, error) {
|
func (cp *fauxConnPool) Ping(string, string, net.Addr) (bool, error) {
|
||||||
var success bool
|
var success bool
|
||||||
successProb := rand.Float64()
|
successProb := rand.Float64()
|
||||||
if successProb > cp.failPct {
|
if successProb > cp.failPct {
|
||||||
|
@ -179,7 +179,7 @@ func test_reconcileServerList(maxServers int) (bool, error) {
|
||||||
// failPct of the servers for the reconcile. This
|
// failPct of the servers for the reconcile. This
|
||||||
// allows for the selected server to no longer be
|
// allows for the selected server to no longer be
|
||||||
// healthy for the reconcile below.
|
// healthy for the reconcile below.
|
||||||
if ok, _ := m.connPoolPinger.Ping(node.Datacenter, node.ShortName, node.Addr, node.Version); ok {
|
if ok, _ := m.connPoolPinger.Ping(node.Datacenter, node.ShortName, node.Addr); ok {
|
||||||
// Will still be present
|
// Will still be present
|
||||||
healthyServers = append(healthyServers, node)
|
healthyServers = append(healthyServers, node)
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -32,7 +32,7 @@ type fauxConnPool struct {
|
||||||
failAddr net.Addr
|
failAddr net.Addr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *fauxConnPool) Ping(dc string, nodeName string, addr net.Addr, version int) (bool, error) {
|
func (cp *fauxConnPool) Ping(dc string, nodeName string, addr net.Addr) (bool, error) {
|
||||||
var success bool
|
var success bool
|
||||||
|
|
||||||
successProb := rand.Float64()
|
successProb := rand.Float64()
|
||||||
|
|
Loading…
Reference in New Issue