Use bind address as source for outgoing connections (#2822)

This patch configures consul to use the bind address as the
source address for outgoing connections.

Fixes #2822
pull/3006/head
Frank Schroeder 2017-05-03 12:57:11 +02:00 committed by Frank Schröder
parent fb83790cb9
commit 8c2b261c61
6 changed files with 29 additions and 10 deletions

View File

@ -439,6 +439,11 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
base.RPCAdvertise = base.RPCAddr
}
// set the src address for outgoing rpc connections
// to RPCAdvertise with port 0 so that outgoing
// connections use a random port.
base.RPCSrcAddr = &net.TCPAddr{IP: base.RPCAdvertise.IP}
// Format the build string
revision := a.config.Revision
if len(revision) > 8 {

View File

@ -115,7 +115,7 @@ func NewClient(config *Config) (*Client, error) {
// Create server
c := &Client{
config: config,
connPool: NewPool(config.LogOutput, clientRPCConnMaxIdle, clientMaxStreams, tlsWrap),
connPool: NewPool(config.RPCSrcAddr, config.LogOutput, clientRPCConnMaxIdle, clientMaxStreams, tlsWrap),
eventCh: make(chan serf.Event, serfEventBacklog),
logger: logger,
shutdownCh: make(chan struct{}),

View File

@ -95,6 +95,10 @@ type Config struct {
// reachable
RPCAdvertise *net.TCPAddr
// RPCSrcAddr is the source address for outgoing RPC connections.
// It is RPCAdvertise with the port set to zero.
RPCSrcAddr *net.TCPAddr
// SerfLANConfig is the configuration for the intra-dc serf
SerfLANConfig *serf.Config

View File

@ -120,6 +120,9 @@ func (c *Conn) markForUse() {
type ConnPool struct {
sync.Mutex
// src is the source address for outgoing connections.
src *net.TCPAddr
// LogOutput is used to control logging
logOutput io.Writer
@ -129,7 +132,7 @@ type ConnPool struct {
// The maximum number of open streams to keep
maxStreams int
// Pool maps an address to a open connection
// pool maps an address to a open connection
pool map[string]*Conn
// limiter is used to throttle the number of connect attempts
@ -151,8 +154,9 @@ type ConnPool struct {
// Set maxTime to 0 to disable reaping. maxStreams is used to control
// the number of idle streams allowed.
// If TLS settings are provided outgoing connections use TLS.
func NewPool(logOutput io.Writer, maxTime time.Duration, maxStreams int, tlsWrap tlsutil.DCWrapper) *ConnPool {
func NewPool(src *net.TCPAddr, logOutput io.Writer, maxTime time.Duration, maxStreams int, tlsWrap tlsutil.DCWrapper) *ConnPool {
pool := &ConnPool{
src: src,
logOutput: logOutput,
maxTime: maxTime,
maxStreams: maxStreams,
@ -265,7 +269,8 @@ type HalfCloser interface {
// given connection timeout.
func (p *ConnPool) DialTimeout(dc string, addr net.Addr, timeout time.Duration) (net.Conn, HalfCloser, error) {
// Try to dial the conn
conn, err := net.DialTimeout("tcp", addr.String(), defaultDialTimeout)
d := &net.Dialer{LocalAddr: p.src, Timeout: timeout}
conn, err := d.Dial("tcp", addr.String())
if err != nil {
return nil, nil, err
}

View File

@ -13,10 +13,13 @@ import (
// RaftLayer implements the raft.StreamLayer interface,
// so that we can use a single RPC layer for Raft and Consul
type RaftLayer struct {
// Addr is the listener address to return
// src is the address for outgoing connections.
src net.Addr
// addr is the listener address to return.
addr net.Addr
// connCh is used to accept connections
// connCh is used to accept connections.
connCh chan net.Conn
// TLS wrapper
@ -31,8 +34,9 @@ type RaftLayer struct {
// NewRaftLayer is used to initialize a new RaftLayer which can
// be used as a StreamLayer for Raft. If a tlsConfig is provided,
// then the connection will use TLS.
func NewRaftLayer(addr net.Addr, tlsWrap tlsutil.Wrapper) *RaftLayer {
func NewRaftLayer(src, addr net.Addr, tlsWrap tlsutil.Wrapper) *RaftLayer {
layer := &RaftLayer{
src: src,
addr: addr,
connCh: make(chan net.Conn),
tlsWrap: tlsWrap,
@ -82,7 +86,8 @@ func (l *RaftLayer) Addr() net.Addr {
// Dial is used to create a new outgoing connection
func (l *RaftLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net.Conn, error) {
conn, err := net.DialTimeout("tcp", string(address), timeout)
d := &net.Dialer{LocalAddr: l.src, Timeout: timeout}
conn, err := d.Dial("tcp", string(address))
if err != nil {
return nil, err
}

View File

@ -257,7 +257,7 @@ func NewServer(config *Config) (*Server, error) {
autopilotRemoveDeadCh: make(chan struct{}),
autopilotShutdownCh: make(chan struct{}),
config: config,
connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
connPool: NewPool(config.RPCSrcAddr, config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256),
localConsuls: make(map[raft.ServerAddress]*agent.Server),
@ -613,7 +613,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
// Provide a DC specific wrapper. Raft replication is only
// ever done in the same datacenter, so we can provide it as a constant.
wrapper := tlsutil.SpecificDC(s.config.Datacenter, tlsWrap)
s.raftLayer = NewRaftLayer(s.config.RPCAdvertise, wrapper)
s.raftLayer = NewRaftLayer(s.config.RPCSrcAddr, s.config.RPCAdvertise, wrapper)
return nil
}