Inject UDP timeout to enable testing

pull/6/head
Tim Hockin 2014-09-11 09:50:20 -07:00
parent 38416f6a23
commit 86d12681f2
2 changed files with 25 additions and 19 deletions

View File

@ -35,6 +35,7 @@ type serviceInfo struct {
port int
protocol string
socket proxySocket
timeout time.Duration
mu sync.Mutex // protects active
active bool
}
@ -130,9 +131,6 @@ func newClientCache() *clientCache {
return &clientCache{clients: map[string]net.Conn{}}
}
// How long we leave idle UDP connections open.
const udpIdleTimeout = 1 * time.Minute
func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
info, found := proxier.getServiceInfo(service)
if !found {
@ -184,7 +182,7 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
continue
}
activeClients.clients[cliAddr.String()] = svrConn
go udp.proxyClient(cliAddr, svrConn, activeClients)
go udp.proxyClient(cliAddr, svrConn, activeClients, info.timeout)
}
activeClients.mu.Unlock()
// TODO: It would be nice to let the goroutine handle this write, but we don't
@ -197,7 +195,7 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
}
continue
}
svrConn.SetDeadline(time.Now().Add(udpIdleTimeout))
svrConn.SetDeadline(time.Now().Add(info.timeout))
if err != nil {
glog.Errorf("SetDeadline failed: %v", err)
continue
@ -206,7 +204,7 @@ func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
}
// This function is expected to be called as a goroutine.
func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache) {
func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) {
defer svrConn.Close()
var buffer [4096]byte
for {
@ -217,7 +215,7 @@ func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activ
}
break
}
svrConn.SetDeadline(time.Now().Add(udpIdleTimeout))
svrConn.SetDeadline(time.Now().Add(timeout))
if err != nil {
glog.Errorf("SetDeadline failed: %v", err)
break
@ -334,8 +332,9 @@ func (proxier *Proxier) setServiceInfo(service string, info *serviceInfo) {
var unusedPortLock sync.Mutex
// addServiceOnUnusedPort starts listening for a new service, returning the
// port it's using. For testing on a system with unknown ports used.
func (proxier *Proxier) addServiceOnUnusedPort(service, protocol string) (string, error) {
// port it's using. For testing on a system with unknown ports used. The timeout only applies to UDP
// connections, for now.
func (proxier *Proxier) addServiceOnUnusedPort(service, protocol string, timeout time.Duration) (string, error) {
unusedPortLock.Lock()
defer unusedPortLock.Unlock()
sock, err := newProxySocket(protocol, proxier.address, 0)
@ -355,6 +354,7 @@ func (proxier *Proxier) addServiceOnUnusedPort(service, protocol string) (string
protocol: protocol,
active: true,
socket: sock,
timeout: timeout,
})
proxier.startAccepting(service, sock)
return port, nil
@ -365,6 +365,9 @@ func (proxier *Proxier) startAccepting(service string, sock proxySocket) {
go sock.ProxyLoop(service, proxier)
}
// How long we leave idle UDP connections open.
const udpIdleTimeout = 1 * time.Minute
// OnUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or
// shutdown if missing from the update set.
@ -395,6 +398,7 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
protocol: service.Protocol,
active: true,
socket: sock,
timeout: udpIdleTimeout,
})
proxier.startAccepting(service.ID, sock)
}

View File

@ -146,7 +146,7 @@ func TestTCPProxy(t *testing.T) {
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP")
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -164,7 +164,7 @@ func TestUDPProxy(t *testing.T) {
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP")
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -182,7 +182,7 @@ func TestTCPProxyStop(t *testing.T) {
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP")
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -210,7 +210,7 @@ func TestUDPProxyStop(t *testing.T) {
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP")
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -238,7 +238,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP")
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -265,7 +265,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP")
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -292,7 +292,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP")
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -324,7 +324,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP")
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -356,7 +356,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP")
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -401,7 +401,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP")
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -434,3 +434,5 @@ func TestUDPProxyUpdatePort(t *testing.T) {
}
pc.Close()
}
// TODO: Test UDP timeouts.