Walks back the changes to change pool address interface into strings.

pull/2801/head
James Phillips 2017-03-15 09:06:21 -07:00
parent f1acda4238
commit 5626d3526c
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
7 changed files with 36 additions and 33 deletions

View File

@ -361,6 +361,6 @@ func (s *Server) getServerHealth(id string) *structs.ServerHealth {
func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, error) { func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, error) {
var args struct{} var args struct{}
var reply structs.ServerStats var reply structs.ServerStats
err := s.connPool.RPC(s.config.Datacenter, server.Addr.String(), server.Version, "Status.RaftStats", &args, &reply) err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply)
return reply, err return reply, err
} }

View File

@ -330,7 +330,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
} }
// Forward to remote Consul // Forward to remote Consul
if err := c.connPool.RPC(c.config.Datacenter, server.Addr.String(), server.Version, method, args, reply); err != nil { if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil {
c.servers.NotifyFailedServer(server) c.servers.NotifyFailedServer(server)
c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err) c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err)
return err return err
@ -357,7 +357,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io
// Request the operation. // Request the operation.
var reply structs.SnapshotResponse var reply structs.SnapshotResponse
snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.Addr.String(), args, in, &reply) snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.Addr, args, in, &reply)
if err != nil { if err != nil {
return err return err
} }

View File

@ -40,7 +40,7 @@ type Conn struct {
refCount int32 refCount int32
shouldClose int32 shouldClose int32
addr string addr net.Addr
session muxSession session muxSession
lastUsed time.Time lastUsed time.Time
version int version int
@ -189,12 +189,14 @@ func (p *ConnPool) Shutdown() error {
// wait for an existing connection attempt to finish, if one if in progress, // wait for an existing connection attempt to finish, if one if in progress,
// and will return that one if it succeeds. If all else fails, it will return a // and will return that one if it succeeds. If all else fails, it will return a
// newly-created connection and add it to the pool. // newly-created connection and add it to the pool.
func (p *ConnPool) acquire(dc string, addr string, version int) (*Conn, error) { func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) {
addrStr := addr.String()
// Check to see if there's a pooled connection available. This is up // Check to see if there's a pooled connection available. This is up
// here since it should the the vastly more common case than the rest // here since it should the the vastly more common case than the rest
// of the code here. // of the code here.
p.Lock() p.Lock()
c := p.pool[addr] c := p.pool[addrStr]
if c != nil { if c != nil {
c.markForUse() c.markForUse()
p.Unlock() p.Unlock()
@ -206,9 +208,9 @@ func (p *ConnPool) acquire(dc string, addr string, version int) (*Conn, error) {
// attempt is done. // attempt is done.
var wait chan struct{} var wait chan struct{}
var ok bool var ok bool
if wait, ok = p.limiter[addr]; !ok { if wait, ok = p.limiter[addrStr]; !ok {
wait = make(chan struct{}) wait = make(chan struct{})
p.limiter[addr] = wait p.limiter[addrStr] = wait
} }
isLeadThread := !ok isLeadThread := !ok
p.Unlock() p.Unlock()
@ -218,14 +220,14 @@ func (p *ConnPool) acquire(dc string, addr string, version int) (*Conn, error) {
if isLeadThread { if isLeadThread {
c, err := p.getNewConn(dc, addr, version) c, err := p.getNewConn(dc, addr, version)
p.Lock() p.Lock()
delete(p.limiter, addr) delete(p.limiter, addrStr)
close(wait) close(wait)
if err != nil { if err != nil {
p.Unlock() p.Unlock()
return nil, err return nil, err
} }
p.pool[addr] = c p.pool[addrStr] = c
p.Unlock() p.Unlock()
return c, nil return c, nil
} }
@ -240,7 +242,7 @@ func (p *ConnPool) acquire(dc string, addr string, version int) (*Conn, error) {
// See if the lead thread was able to get us a connection. // See if the lead thread was able to get us a connection.
p.Lock() p.Lock()
if c := p.pool[addr]; c != nil { if c := p.pool[addrStr]; c != nil {
c.markForUse() c.markForUse()
p.Unlock() p.Unlock()
return c, nil return c, nil
@ -261,9 +263,9 @@ type HalfCloser interface {
// DialTimeout is used to establish a raw connection to the given server, with a // DialTimeout is used to establish a raw connection to the given server, with a
// given connection timeout. // given connection timeout.
func (p *ConnPool) DialTimeout(dc string, addr string, timeout time.Duration) (net.Conn, HalfCloser, error) { func (p *ConnPool) DialTimeout(dc string, addr net.Addr, timeout time.Duration) (net.Conn, HalfCloser, error) {
// Try to dial the conn // Try to dial the conn
conn, err := net.DialTimeout("tcp", addr, defaultDialTimeout) conn, err := net.DialTimeout("tcp", addr.String(), defaultDialTimeout)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -297,7 +299,7 @@ func (p *ConnPool) DialTimeout(dc string, addr string, timeout time.Duration) (n
} }
// getNewConn is used to return a new connection // getNewConn is used to return a new connection
func (p *ConnPool) getNewConn(dc string, addr string, version int) (*Conn, error) { func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int) (*Conn, error) {
// Get a new, raw connection. // Get a new, raw connection.
conn, _, err := p.DialTimeout(dc, addr, defaultDialTimeout) conn, _, err := p.DialTimeout(dc, addr, defaultDialTimeout)
if err != nil { if err != nil {
@ -343,9 +345,10 @@ func (p *ConnPool) clearConn(conn *Conn) {
atomic.StoreInt32(&conn.shouldClose, 1) atomic.StoreInt32(&conn.shouldClose, 1)
// Clear from the cache // Clear from the cache
addrStr := conn.addr.String()
p.Lock() p.Lock()
if c, ok := p.pool[conn.addr]; ok && c == conn { if c, ok := p.pool[addrStr]; ok && c == conn {
delete(p.pool, conn.addr) delete(p.pool, addrStr)
} }
p.Unlock() p.Unlock()
@ -364,7 +367,7 @@ func (p *ConnPool) releaseConn(conn *Conn) {
} }
// getClient is used to get a usable client for an address and protocol version // getClient is used to get a usable client for an address and protocol version
func (p *ConnPool) getClient(dc string, addr string, version int) (*Conn, *StreamClient, error) { func (p *ConnPool) getClient(dc string, addr net.Addr, version int) (*Conn, *StreamClient, error) {
retries := 0 retries := 0
START: START:
// Try to get a conn first // Try to get a conn first
@ -390,7 +393,7 @@ START:
} }
// RPC is used to make an RPC call to a remote host // RPC is used to make an RPC call to a remote host
func (p *ConnPool) RPC(dc string, addr string, version int, method string, args interface{}, reply interface{}) error { func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, args interface{}, reply interface{}) error {
// Get a usable client // Get a usable client
conn, sc, err := p.getClient(dc, addr, version) conn, sc, err := p.getClient(dc, addr, version)
if err != nil { if err != nil {
@ -415,7 +418,7 @@ func (p *ConnPool) RPC(dc string, addr string, version int, method string, args
// returns true if healthy, false if an error occurred // returns true if healthy, false if an error occurred
func (p *ConnPool) PingConsulServer(s *agent.Server) (bool, error) { func (p *ConnPool) PingConsulServer(s *agent.Server) (bool, error) {
// Get a usable client // Get a usable client
conn, sc, err := p.getClient(s.Datacenter, s.Addr.String(), s.Version) conn, sc, err := p.getClient(s.Datacenter, s.Addr, s.Version)
if err != nil { if err != nil {
return false, err return false, err
} }

View File

@ -262,7 +262,7 @@ func (s *Server) forwardLeader(server *agent.Server, method string, args interfa
if server == nil { if server == nil {
return structs.ErrNoLeader return structs.ErrNoLeader
} }
return s.connPool.RPC(s.config.Datacenter, server.Addr.String(), server.Version, method, args, reply) return s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, method, args, reply)
} }
// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers // forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
@ -274,7 +274,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
} }
metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1) metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1)
if err := s.connPool.RPC(dc, server.Addr.String(), server.Version, method, args, reply); err != nil { if err := s.connPool.RPC(dc, server.Addr, server.Version, method, args, reply); err != nil {
manager.NotifyFailedServer(server) manager.NotifyFailedServer(server)
s.logger.Printf("[ERR] consul: RPC failed to server %s in DC %q: %v", server.Addr, dc, err) s.logger.Printf("[ERR] consul: RPC failed to server %s in DC %q: %v", server.Addr, dc, err)
return err return err

View File

@ -195,7 +195,7 @@ func (s *Server) maybeBootstrap() {
// Retry with exponential backoff to get peer status from this server // Retry with exponential backoff to get peer status from this server
for attempt := uint(0); attempt < maxPeerRetries; attempt++ { for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
if err := s.connPool.RPC(s.config.Datacenter, server.Addr.String(), server.Version, if err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version,
"Status.Peers", &struct{}{}, &peers); err != nil { "Status.Peers", &struct{}{}, &peers); err != nil {
nextRetry := time.Duration((1 << attempt) * peerRetryBase) nextRetry := time.Duration((1 << attempt) * peerRetryBase)
s.logger.Printf("[ERR] consul: Failed to confirm peer status for %s: %v. Retrying in "+ s.logger.Printf("[ERR] consul: Failed to confirm peer status for %s: %v. Retrying in "+

View File

@ -35,7 +35,7 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re
return nil, structs.ErrNoDCPath return nil, structs.ErrNoDCPath
} }
snap, err := SnapshotRPC(s.connPool, dc, server.Addr.String(), args, in, reply) snap, err := SnapshotRPC(s.connPool, dc, server.Addr, args, in, reply)
if err != nil { if err != nil {
manager.NotifyFailedServer(server) manager.NotifyFailedServer(server)
return nil, err return nil, err
@ -50,7 +50,7 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re
if server == nil { if server == nil {
return nil, structs.ErrNoLeader return nil, structs.ErrNoLeader
} }
return SnapshotRPC(s.connPool, args.Datacenter, server.Addr.String(), args, in, reply) return SnapshotRPC(s.connPool, args.Datacenter, server.Addr, args, in, reply)
} }
} }
@ -160,7 +160,7 @@ RESPOND:
// the streaming output (for a snapshot). If the reply contains an error, this // the streaming output (for a snapshot). If the reply contains an error, this
// will always return an error as well, so you don't need to check the error // will always return an error as well, so you don't need to check the error
// inside the filled-in reply. // inside the filled-in reply.
func SnapshotRPC(pool *ConnPool, dc string, addr string, func SnapshotRPC(pool *ConnPool, dc string, addr net.Addr,
args *structs.SnapshotRequest, in io.Reader, reply *structs.SnapshotResponse) (io.ReadCloser, error) { args *structs.SnapshotRequest, in io.Reader, reply *structs.SnapshotResponse) (io.ReadCloser, error) {
conn, hc, err := pool.DialTimeout(dc, addr, 10*time.Second) conn, hc, err := pool.DialTimeout(dc, addr, 10*time.Second)

View File

@ -43,7 +43,7 @@ func verifySnapshot(t *testing.T, s *Server, dc, token string) {
Op: structs.SnapshotSave, Op: structs.SnapshotSave,
} }
var reply structs.SnapshotResponse var reply structs.SnapshotResponse
snap, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr.String(), snap, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply) &args, bytes.NewReader([]byte("")), &reply)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -115,7 +115,7 @@ func verifySnapshot(t *testing.T, s *Server, dc, token string) {
// Restore the snapshot. // Restore the snapshot.
args.Op = structs.SnapshotRestore args.Op = structs.SnapshotRestore
restore, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr.String(), restore, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr,
&args, snap, &reply) &args, snap, &reply)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -186,7 +186,7 @@ func TestSnapshot_LeaderState(t *testing.T) {
Op: structs.SnapshotSave, Op: structs.SnapshotSave,
} }
var reply structs.SnapshotResponse var reply structs.SnapshotResponse
snap, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr.String(), snap, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply) &args, bytes.NewReader([]byte("")), &reply)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -219,7 +219,7 @@ func TestSnapshot_LeaderState(t *testing.T) {
// Restore the snapshot. // Restore the snapshot.
args.Op = structs.SnapshotRestore args.Op = structs.SnapshotRestore
restore, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr.String(), restore, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr,
&args, snap, &reply) &args, snap, &reply)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -256,7 +256,7 @@ func TestSnapshot_ACLDeny(t *testing.T) {
Op: structs.SnapshotSave, Op: structs.SnapshotSave,
} }
var reply structs.SnapshotResponse var reply structs.SnapshotResponse
_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr.String(), _, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply) &args, bytes.NewReader([]byte("")), &reply)
if err == nil || !strings.Contains(err.Error(), permissionDenied) { if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -270,7 +270,7 @@ func TestSnapshot_ACLDeny(t *testing.T) {
Op: structs.SnapshotRestore, Op: structs.SnapshotRestore,
} }
var reply structs.SnapshotResponse var reply structs.SnapshotResponse
_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr.String(), _, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply) &args, bytes.NewReader([]byte("")), &reply)
if err == nil || !strings.Contains(err.Error(), permissionDenied) { if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -367,7 +367,7 @@ func TestSnapshot_AllowStale(t *testing.T) {
Op: structs.SnapshotSave, Op: structs.SnapshotSave,
} }
var reply structs.SnapshotResponse var reply structs.SnapshotResponse
_, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr.String(), _, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply) &args, bytes.NewReader([]byte("")), &reply)
if err == nil || !strings.Contains(err.Error(), structs.ErrNoLeader.Error()) { if err == nil || !strings.Contains(err.Error(), structs.ErrNoLeader.Error()) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -384,7 +384,7 @@ func TestSnapshot_AllowStale(t *testing.T) {
Op: structs.SnapshotSave, Op: structs.SnapshotSave,
} }
var reply structs.SnapshotResponse var reply structs.SnapshotResponse
_, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr.String(), _, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply) &args, bytes.NewReader([]byte("")), &reply)
if err == nil || !strings.Contains(err.Error(), "Raft error when taking snapshot") { if err == nil || !strings.Contains(err.Error(), "Raft error when taking snapshot") {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)