consul: Conn pool clean, spare existing streams

pull/179/head
Armon Dadgar 11 years ago
parent 22f548c338
commit 5124428ba7

@ -42,6 +42,8 @@ type StreamClient struct {
// Conn is a pooled connection to a Consul server // Conn is a pooled connection to a Consul server
type Conn struct { type Conn struct {
refCount int32 refCount int32
shouldClose int32
addr net.Addr addr net.Addr
session muxSession session muxSession
lastUsed time.Time lastUsed time.Time
@ -93,7 +95,7 @@ func (c *Conn) getClient() (*StreamClient, error) {
func (c *Conn) returnClient(client *StreamClient) { func (c *Conn) returnClient(client *StreamClient) {
didSave := false didSave := false
c.clientLock.Lock() c.clientLock.Lock()
if c.clients.Len() < c.pool.maxStreams { if c.clients.Len() < c.pool.maxStreams && atomic.LoadInt32(&c.shouldClose) == 0 {
c.clients.PushFront(client) c.clients.PushFront(client)
didSave = true didSave = true
} }
@ -184,14 +186,12 @@ func (p *ConnPool) acquire(addr net.Addr, version int) (*Conn, error) {
// getPooled is used to return a pooled connection // getPooled is used to return a pooled connection
func (p *ConnPool) getPooled(addr net.Addr, version int) *Conn { func (p *ConnPool) getPooled(addr net.Addr, version int) *Conn {
p.Lock() p.Lock()
defer p.Unlock()
// Look for an existing connection
c := p.pool[addr.String()] c := p.pool[addr.String()]
if c != nil { if c != nil {
c.lastUsed = time.Now() c.lastUsed = time.Now()
atomic.AddInt32(&c.refCount, 1) atomic.AddInt32(&c.refCount, 1)
} }
p.Unlock()
return c return c
} }
@ -261,29 +261,41 @@ func (p *ConnPool) getNewConn(addr net.Addr, version int) (*Conn, error) {
// Track this connection, handle potential race condition // Track this connection, handle potential race condition
p.Lock() p.Lock()
defer p.Unlock()
if existing := p.pool[addr.String()]; existing != nil { if existing := p.pool[addr.String()]; existing != nil {
session.Close() c.Close()
p.Unlock()
return existing, nil return existing, nil
} else { } else {
p.pool[addr.String()] = c p.pool[addr.String()] = c
p.Unlock()
return c, nil return c, nil
} }
} }
// clearConn is used to clear any cached connection, potentially in response to an erro // clearConn is used to clear any cached connection, potentially in response to an erro
func (p *ConnPool) clearConn(addr net.Addr) { func (p *ConnPool) clearConn(conn *Conn) {
// Ensure returned streams are closed
atomic.StoreInt32(&conn.shouldClose, 1)
// Clear from the cache
p.Lock() p.Lock()
defer p.Unlock() if c, ok := p.pool[conn.addr.String()]; ok && c == conn {
if conn, ok := p.pool[addr.String()]; ok { delete(p.pool, conn.addr.String())
}
p.Unlock()
// Close down immediately if idle
if refCount := atomic.LoadInt32(&conn.shouldClose); refCount == 0 {
conn.Close() conn.Close()
delete(p.pool, addr.String())
} }
} }
// releaseConn is invoked when we are done with a conn to reduce the ref count // releaseConn is invoked when we are done with a conn to reduce the ref count
func (p *ConnPool) releaseConn(conn *Conn) { func (p *ConnPool) releaseConn(conn *Conn) {
atomic.AddInt32(&conn.refCount, -1) refCount := atomic.AddInt32(&conn.refCount, -1)
if refCount == 0 && atomic.LoadInt32(&conn.shouldClose) == 1 {
conn.Close()
}
} }
// getClient is used to get a usable client for an address and protocol version // getClient is used to get a usable client for an address and protocol version
@ -299,7 +311,8 @@ START:
// Get a client // Get a client
client, err := conn.getClient() client, err := conn.getClient()
if err != nil { if err != nil {
p.clearConn(addr) p.clearConn(conn)
p.releaseConn(conn)
// Try to redial, possible that the TCP session closed due to timeout // Try to redial, possible that the TCP session closed due to timeout
if retries == 0 { if retries == 0 {
@ -313,23 +326,24 @@ START:
// RPC is used to make an RPC call to a remote host // RPC is used to make an RPC call to a remote host
func (p *ConnPool) RPC(addr net.Addr, version int, method string, args interface{}, reply interface{}) error { func (p *ConnPool) RPC(addr net.Addr, version int, method string, args interface{}, reply interface{}) error {
// Get a usable client
conn, sc, err := p.getClient(addr, version) conn, sc, err := p.getClient(addr, version)
defer func() { if err != nil {
conn.returnClient(sc) return fmt.Errorf("rpc error: %v", err)
p.releaseConn(conn) }
}()
// Make the RPC call // Make the RPC call
err = sc.client.Call(method, args, reply) err = sc.client.Call(method, args, reply)
if err != nil {
// Fast path the non-error case p.clearConn(conn)
if err == nil { p.releaseConn(conn)
return nil return fmt.Errorf("rpc error: %v", err)
} }
// Do-not re-use as a pre-caution // Done with the connection
p.clearConn(addr) conn.returnClient(sc)
return fmt.Errorf("rpc error: %v", err) p.releaseConn(conn)
return nil
} }
// Reap is used to close conns open over maxTime // Reap is used to close conns open over maxTime

Loading…
Cancel
Save