From d1b2e416802f39a0a83986eaf2c612ddf162f0f1 Mon Sep 17 00:00:00 2001 From: Ali Abbas Date: Wed, 26 Nov 2014 10:25:37 +0100 Subject: [PATCH] * Fix race condition on read/write of shutdown bool variable of server and connection pool. * In connection pool, there is no guarantee that .reap() cannot execute the same time as .Shutdown() is called. It also did not benefit to eval shutdown when a select is run on the shutdown channel. * In server, same principle applies to handleConsulConn. Since we also have a shutdown channel, it makes more to use this than to loop on a bool variable. --- consul/pool.go | 4 ++-- consul/rpc.go | 19 +++++++++++++------ 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/consul/pool.go b/consul/pool.go index 4ab75fbc6a..495ab2bd59 100644 --- a/consul/pool.go +++ b/consul/pool.go @@ -358,12 +358,12 @@ func (p *ConnPool) RPC(addr net.Addr, version int, method string, args interface // Reap is used to close conns open over maxTime func (p *ConnPool) reap() { - for !p.shutdown { + for { // Sleep for a while select { - case <-time.After(time.Second): case <-p.shutdownCh: return + case <-time.After(time.Second): } // Reap all old conns diff --git a/consul/rpc.go b/consul/rpc.go index 8956b0d0ff..c98ebb60c5 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -3,16 +3,17 @@ package consul import ( "crypto/tls" "fmt" - "github.com/armon/go-metrics" - "github.com/hashicorp/consul/consul/structs" - "github.com/hashicorp/go-msgpack/codec" - "github.com/hashicorp/yamux" - "github.com/inconshreveable/muxado" "io" "math/rand" "net" "strings" "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/yamux" + "github.com/inconshreveable/muxado" ) type RPCType byte @@ -149,7 +150,13 @@ func (s *Server) handleMultiplexV2(conn net.Conn) { func (s *Server) handleConsulConn(conn net.Conn) { defer conn.Close() rpcCodec := codec.GoRpc.ServerCodec(conn, msgpackHandle) - for !s.shutdown { + for { + select { + case <-s.shutdownCh: + return + default: + } + if err := s.rpcServer.ServeRequest(rpcCodec); err != nil { if err != io.EOF && !strings.Contains(err.Error(), "closed") { s.logger.Printf("[ERR] consul.rpc: RPC error: %v (%v)", err, conn)