fix race condition in transport

pull/432/head
Darien Raymond 2017-02-18 00:28:50 +01:00
parent ebed271a92
commit 7a97d73737
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
3 changed files with 26 additions and 12 deletions

View File

@ -31,7 +31,7 @@ func (ec *ExpiringConnection) Expired() bool {
// Pool is a connection pool. // Pool is a connection pool.
type Pool struct { type Pool struct {
sync.Mutex sync.RWMutex
connsByDest map[ConnectionID][]*ExpiringConnection connsByDest map[ConnectionID][]*ExpiringConnection
cleanupToken *signal.Semaphore cleanupToken *signal.Semaphore
} }
@ -74,10 +74,17 @@ func (p *Pool) Get(id ConnectionID) net.Conn {
return conn.conn return conn.conn
} }
func (p *Pool) isEmpty() bool {
p.RLock()
defer p.RUnlock()
return len(p.connsByDest) == 0
}
func (p *Pool) cleanup() { func (p *Pool) cleanup() {
defer p.cleanupToken.Signal() defer p.cleanupToken.Signal()
for len(p.connsByDest) > 0 { for !p.isEmpty() {
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
expiredConns := make([]net.Conn, 0, 16) expiredConns := make([]net.Conn, 0, 16)
p.Lock() p.Lock()

View File

@ -246,13 +246,14 @@ func (v *Listener) Accept() (internet.Connection, error) {
// Close stops listening on the UDP address. Already Accepted connections are not closed. // Close stops listening on the UDP address. Already Accepted connections are not closed.
func (v *Listener) Close() error { func (v *Listener) Close() error {
v.Lock()
defer v.Unlock()
select { select {
case <-v.closed: case <-v.closed:
return ErrClosedListener return ErrClosedListener
default: default:
} }
v.Lock()
defer v.Unlock()
close(v.closed) close(v.closed)
close(v.awaitingConns) close(v.awaitingConns)

View File

@ -52,7 +52,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
type Listener struct { type Listener struct {
sync.Mutex sync.Mutex
acccepting bool closed chan bool
awaitingConns chan *ConnectionWithError awaitingConns chan *ConnectionWithError
listener net.Listener listener net.Listener
tlsConfig *tls.Config tlsConfig *tls.Config
@ -67,7 +67,7 @@ func ListenWS(address v2net.Address, port v2net.Port, options internet.ListenOpt
wsSettings := networkSettings.(*Config) wsSettings := networkSettings.(*Config)
l := &Listener{ l := &Listener{
acccepting: true, closed: make(chan bool),
awaitingConns: make(chan *ConnectionWithError, 32), awaitingConns: make(chan *ConnectionWithError, 32),
config: wsSettings, config: wsSettings,
} }
@ -130,8 +130,10 @@ func converttovws(w http.ResponseWriter, r *http.Request) (*connection, error) {
} }
func (ln *Listener) Accept() (internet.Connection, error) { func (ln *Listener) Accept() (internet.Connection, error) {
for ln.acccepting { for {
select { select {
case <-ln.closed:
return nil, ErrClosedListener
case connErr, open := <-ln.awaitingConns: case connErr, open := <-ln.awaitingConns:
if !open { if !open {
return nil, ErrClosedListener return nil, ErrClosedListener
@ -143,14 +145,15 @@ func (ln *Listener) Accept() (internet.Connection, error) {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
} }
} }
return nil, ErrClosedListener
} }
func (ln *Listener) Put(id internal.ConnectionID, conn net.Conn) { func (ln *Listener) Put(id internal.ConnectionID, conn net.Conn) {
ln.Lock() ln.Lock()
defer ln.Unlock() defer ln.Unlock()
if !ln.acccepting { select {
case <-ln.closed:
return return
default:
} }
select { select {
case ln.awaitingConns <- &ConnectionWithError{conn: conn}: case ln.awaitingConns <- &ConnectionWithError{conn: conn}:
@ -166,10 +169,13 @@ func (ln *Listener) Addr() net.Addr {
func (ln *Listener) Close() error { func (ln *Listener) Close() error {
ln.Lock() ln.Lock()
defer ln.Unlock() defer ln.Unlock()
ln.acccepting = false select {
case <-ln.closed:
return ErrClosedListener
default:
}
close(ln.closed)
ln.listener.Close() ln.listener.Close()
close(ln.awaitingConns) close(ln.awaitingConns)
for connErr := range ln.awaitingConns { for connErr := range ln.awaitingConns {
if connErr.conn != nil { if connErr.conn != nil {