Browse Source

Merge pull request #499 from alouche/fix_potential_race_condition_shutdown

Fix potential race condition on shutdown (pool.reap/server.handleConsulConn)
pull/511/head
Armon Dadgar 10 years ago
parent
commit
ab92a900d6
  1. 4
      consul/pool.go
  2. 19
      consul/rpc.go

4
consul/pool.go

@ -358,12 +358,12 @@ func (p *ConnPool) RPC(addr net.Addr, version int, method string, args interface
// Reap is used to close conns open over maxTime
func (p *ConnPool) reap() {
for !p.shutdown {
for {
// Sleep for a while
select {
case <-time.After(time.Second):
case <-p.shutdownCh:
return
case <-time.After(time.Second):
}
// Reap all old conns

19
consul/rpc.go

@ -3,16 +3,17 @@ package consul
import (
"crypto/tls"
"fmt"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/yamux"
"github.com/inconshreveable/muxado"
"io"
"math/rand"
"net"
"strings"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/yamux"
"github.com/inconshreveable/muxado"
)
type RPCType byte
@ -149,7 +150,13 @@ func (s *Server) handleMultiplexV2(conn net.Conn) {
func (s *Server) handleConsulConn(conn net.Conn) {
defer conn.Close()
rpcCodec := codec.GoRpc.ServerCodec(conn, msgpackHandle)
for !s.shutdown {
for {
select {
case <-s.shutdownCh:
return
default:
}
if err := s.rpcServer.ServeRequest(rpcCodec); err != nil {
if err != io.EOF && !strings.Contains(err.Error(), "closed") {
s.logger.Printf("[ERR] consul.rpc: RPC error: %v (%v)", err, conn)

Loading…
Cancel
Save