Merge pull request #1273 from thockin/proxy_udp

Proxy udp
pull/6/head
Tim Hockin 2014-09-12 16:52:49 -07:00
commit c383233c49
7 changed files with 713 additions and 160 deletions

View File

@ -111,7 +111,7 @@ type Port struct {
HostPort int `yaml:"hostPort,omitempty" json:"hostPort,omitempty"`
// Required: This must be a valid port number, 0 < x < 65536.
ContainerPort int `yaml:"containerPort" json:"containerPort"`
// Optional: Defaults to "TCP".
// Optional: Supports "TCP" and "UDP". Defaults to "TCP".
Protocol string `yaml:"protocol,omitempty" json:"protocol,omitempty"`
// Optional: What host IP to bind the external port to.
HostIP string `yaml:"hostIP,omitempty" json:"hostIP,omitempty"`
@ -389,7 +389,11 @@ func (*ServiceList) IsAnAPIObject() {}
// will answer requests sent through the proxy.
type Service struct {
JSONBase `json:",inline" yaml:",inline"`
Port int `json:"port,omitempty" yaml:"port,omitempty"`
// Required.
Port int `json:"port" yaml:"port"`
// Optional: Supports "TCP" and "UDP". Defaults to "TCP".
Protocol string `yaml:"protocol,omitempty" json:"protocol,omitempty"`
// This service's labels.
Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`

View File

@ -111,7 +111,7 @@ type Port struct {
HostPort int `yaml:"hostPort,omitempty" json:"hostPort,omitempty"`
// Required: This must be a valid port number, 0 < x < 65536.
ContainerPort int `yaml:"containerPort" json:"containerPort"`
// Optional: Defaults to "TCP".
// Optional: Supports "TCP" and "UDP". Defaults to "TCP".
Protocol string `yaml:"protocol,omitempty" json:"protocol,omitempty"`
// Optional: What host IP to bind the external port to.
HostIP string `yaml:"hostIP,omitempty" json:"hostIP,omitempty"`
@ -401,7 +401,11 @@ func (*ServiceList) IsAnAPIObject() {}
// will answer requests sent through the proxy.
type Service struct {
JSONBase `json:",inline" yaml:",inline"`
Port int `json:"port,omitempty" yaml:"port,omitempty"`
// Required.
Port int `json:"port" yaml:"port"`
// Optional: Supports "TCP" and "UDP". Defaults to "TCP".
Protocol string `yaml:"protocol,omitempty" json:"protocol,omitempty"`
// This service's labels.
Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`

View File

@ -282,6 +282,11 @@ func ValidateService(service *api.Service) errs.ErrorList {
if !util.IsValidPortNum(service.Port) {
allErrs = append(allErrs, errs.NewFieldInvalid("Service.Port", service.Port))
}
if len(service.Protocol) == 0 {
service.Protocol = "TCP"
} else if !supportedPortProtocols.Has(strings.ToUpper(service.Protocol)) {
allErrs = append(allErrs, errs.NewFieldNotSupported("protocol", service.Protocol))
}
if labels.Set(service.Selector).AsSelector().Empty() {
allErrs = append(allErrs, errs.NewFieldRequired("selector", service.Selector))
}

View File

@ -365,50 +365,118 @@ func TestValidatePod(t *testing.T) {
}
func TestValidateService(t *testing.T) {
// This test should fail because the port number is invalid i.e.
// the Port field has a default value of 0.
errs := ValidateService(&api.Service{
JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{
"foo": "bar",
testCases := []struct {
name string
svc api.Service
numErrs int
}{
{
name: "missing id",
svc: api.Service{
Port: 8675,
Selector: map[string]string{"foo": "bar"},
},
// Should fail because the ID is missing.
numErrs: 1,
},
{
name: "invalid id",
svc: api.Service{
JSONBase: api.JSONBase{ID: "123abc"},
Port: 8675,
Selector: map[string]string{"foo": "bar"},
},
// Should fail because the ID is invalid.
numErrs: 1,
},
{
name: "missing port",
svc: api.Service{
JSONBase: api.JSONBase{ID: "abc123"},
Selector: map[string]string{"foo": "bar"},
},
// Should fail because the port number is missing/invalid.
numErrs: 1,
},
{
name: "invalid port",
svc: api.Service{
JSONBase: api.JSONBase{ID: "abc123"},
Port: 65536,
Selector: map[string]string{"foo": "bar"},
},
// Should fail because the port number is invalid.
numErrs: 1,
},
{
name: "invalid protocol",
svc: api.Service{
JSONBase: api.JSONBase{ID: "abc123"},
Port: 8675,
Protocol: "INVALID",
Selector: map[string]string{"foo": "bar"},
},
// Should fail because the protocol is invalid.
numErrs: 1,
},
{
name: "missing selector",
svc: api.Service{
JSONBase: api.JSONBase{ID: "foo"},
Port: 8675,
},
// Should fail because the selector is missing.
numErrs: 1,
},
{
name: "valid 1",
svc: api.Service{
JSONBase: api.JSONBase{ID: "abc123"},
Port: 1,
Protocol: "TCP",
Selector: map[string]string{"foo": "bar"},
},
numErrs: 0,
},
{
name: "valid 2",
svc: api.Service{
JSONBase: api.JSONBase{ID: "abc123"},
Port: 65535,
Protocol: "UDP",
Selector: map[string]string{"foo": "bar"},
},
numErrs: 0,
},
{
name: "valid 3",
svc: api.Service{
JSONBase: api.JSONBase{ID: "abc123"},
Port: 80,
Selector: map[string]string{"foo": "bar"},
},
numErrs: 0,
},
})
if len(errs) != 1 {
t.Errorf("Unexpected error list: %#v", errs)
}
errs = ValidateService(&api.Service{
for _, tc := range testCases {
errs := ValidateService(&tc.svc)
if len(errs) != tc.numErrs {
t.Errorf("Unexpected error list for case %q: %+v", tc.name, errs)
}
}
svc := api.Service{
Port: 6502,
JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{
"foo": "bar",
},
})
Selector: map[string]string{"foo": "bar"},
}
errs := ValidateService(&svc)
if len(errs) != 0 {
t.Errorf("Unexpected non-zero error list: %#v", errs)
}
errs = ValidateService(&api.Service{
Port: 6502,
Selector: map[string]string{
"foo": "bar",
},
})
if len(errs) != 1 {
t.Errorf("Unexpected error list: %#v", errs)
}
errs = ValidateService(&api.Service{
Port: 6502,
JSONBase: api.JSONBase{ID: "foo"},
})
if len(errs) != 1 {
t.Errorf("Unexpected error list: %#v", errs)
}
errs = ValidateService(&api.Service{})
if len(errs) != 3 {
t.Errorf("Unexpected error list: %#v", errs)
if svc.Protocol != "TCP" {
t.Errorf("Expected default protocol of 'TCP': %#v", errs)
}
}

View File

@ -21,6 +21,7 @@ import (
"io"
"net"
"strconv"
"strings"
"sync"
"time"
@ -32,11 +33,256 @@ import (
type serviceInfo struct {
name string
port int
listener net.Listener
protocol string
socket proxySocket
timeout time.Duration
mu sync.Mutex // protects active
active bool
}
func (si *serviceInfo) isActive() bool {
si.mu.Lock()
defer si.mu.Unlock()
return si.active
}
func (si *serviceInfo) setActive(val bool) bool {
si.mu.Lock()
defer si.mu.Unlock()
tmp := si.active
si.active = val
return tmp
}
// How long we wait for a connection to a backend.
const endpointDialTimeout = 5 * time.Second
// Abstraction over TCP/UDP sockets which are proxied.
type proxySocket interface {
// Addr gets the net.Addr for a proxySocket.
Addr() net.Addr
// Close stops the proxySocket from accepting incoming connections. Each implementation should comment
// on the impact of calling Close while sessions are active.
Close() error
// ProxyLoop proxies incoming connections for the specified service to the service endpoints.
ProxyLoop(service string, proxier *Proxier)
}
// tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called,
// no new connections are allowed but existing connections are left untouched.
type tcpProxySocket struct {
net.Listener
}
func (tcp *tcpProxySocket) ProxyLoop(service string, proxier *Proxier) {
info, found := proxier.getServiceInfo(service)
if !found {
glog.Errorf("Failed to find service: %s", service)
return
}
for {
if !info.isActive() {
break
}
// Block until a connection is made.
inConn, err := tcp.Accept()
if err != nil {
glog.Errorf("Accept failed: %v", err)
continue
}
glog.Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr())
endpoint, err := proxier.loadBalancer.NextEndpoint(service, inConn.RemoteAddr())
if err != nil {
glog.Errorf("Couldn't find an endpoint for %s %v", service, err)
inConn.Close()
continue
}
glog.Infof("Mapped service %s to endpoint %s", service, endpoint)
// TODO: This could spin up a new goroutine to make the outbound connection,
// and keep accepting inbound traffic.
outConn, err := net.DialTimeout("tcp", endpoint, endpointDialTimeout)
if err != nil {
// TODO: Try another endpoint?
glog.Errorf("Dial failed: %v", err)
inConn.Close()
continue
}
// Spin up an async copy loop.
proxyTCP(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
}
}
// proxyTCP proxies data bi-directionally between in and out.
func proxyTCP(in, out *net.TCPConn) {
glog.Infof("Creating proxy between %v <-> %v <-> %v <-> %v",
in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
go copyBytes(in, out)
go copyBytes(out, in)
}
// udpProxySocket implements proxySocket. Close() is implemented by net.UDPConn. When Close() is called,
// no new connections are allowed and existing connections are broken.
// TODO: We could lame-duck this ourselves, if it becomes important.
type udpProxySocket struct {
*net.UDPConn
}
func (udp *udpProxySocket) Addr() net.Addr {
return udp.LocalAddr()
}
// Holds all the known UDP clients that have not timed out.
type clientCache struct {
mu sync.Mutex
clients map[string]net.Conn // addr string -> connection
}
func newClientCache() *clientCache {
return &clientCache{clients: map[string]net.Conn{}}
}
func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
info, found := proxier.getServiceInfo(service)
if !found {
glog.Errorf("Failed to find service: %s", service)
return
}
activeClients := newClientCache()
var buffer [4096]byte // 4KiB should be enough for most whole-packets
for {
if !info.isActive() {
break
}
// Block until data arrives.
// TODO: Accumulate a histogram of n or something, to fine tune the buffer size.
n, cliAddr, err := udp.ReadFrom(buffer[0:])
if err != nil {
if e, ok := err.(net.Error); ok {
if e.Temporary() {
glog.Infof("ReadFrom had a temporary failure: %v", err)
continue
}
}
glog.Errorf("ReadFrom failed, exiting ProxyLoop: %v", err)
break
}
// If this is a client we know already, reuse the connection and goroutine.
svrConn, err := udp.getBackendConn(activeClients, cliAddr, proxier, service, info.timeout)
if err != nil {
continue
}
// TODO: It would be nice to let the goroutine handle this write, but we don't
// really want to copy the buffer. We could do a pool of buffers or something.
_, err = svrConn.Write(buffer[0:n])
if err != nil {
if !logTimeout(err) {
glog.Errorf("Write failed: %v", err)
// TODO: Maybe tear down the goroutine for this client/server pair?
}
continue
}
svrConn.SetDeadline(time.Now().Add(info.timeout))
if err != nil {
glog.Errorf("SetDeadline failed: %v", err)
continue
}
}
}
func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service string, timeout time.Duration) (net.Conn, error) {
activeClients.mu.Lock()
defer activeClients.mu.Unlock()
svrConn, found := activeClients.clients[cliAddr.String()]
if !found {
// TODO: This could spin up a new goroutine to make the outbound connection,
// and keep accepting inbound traffic.
glog.Infof("New UDP connection from %s", cliAddr)
endpoint, err := proxier.loadBalancer.NextEndpoint(service, cliAddr)
if err != nil {
glog.Errorf("Couldn't find an endpoint for %s %v", service, err)
return nil, err
}
glog.Infof("Mapped service %s to endpoint %s", service, endpoint)
svrConn, err = net.DialTimeout("udp", endpoint, endpointDialTimeout)
if err != nil {
// TODO: Try another endpoint?
glog.Errorf("Dial failed: %v", err)
return nil, err
}
activeClients.clients[cliAddr.String()] = svrConn
go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) {
defer util.HandleCrash()
udp.proxyClient(cliAddr, svrConn, activeClients, timeout)
}(cliAddr, svrConn, activeClients, timeout)
}
return svrConn, nil
}
// This function is expected to be called as a goroutine.
func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) {
defer svrConn.Close()
var buffer [4096]byte
for {
n, err := svrConn.Read(buffer[0:])
if err != nil {
if !logTimeout(err) {
glog.Errorf("Read failed: %v", err)
}
break
}
svrConn.SetDeadline(time.Now().Add(timeout))
if err != nil {
glog.Errorf("SetDeadline failed: %v", err)
break
}
n, err = udp.WriteTo(buffer[0:n], cliAddr)
if err != nil {
if !logTimeout(err) {
glog.Errorf("WriteTo failed: %v", err)
}
break
}
}
activeClients.mu.Lock()
delete(activeClients.clients, cliAddr.String())
activeClients.mu.Unlock()
}
func logTimeout(err error) bool {
if e, ok := err.(net.Error); ok {
if e.Timeout() {
glog.Infof("connection to endpoint closed due to inactivity")
return true
}
}
return false
}
func newProxySocket(protocol string, host string, port int) (proxySocket, error) {
switch strings.ToUpper(protocol) {
case "TCP":
listener, err := net.Listen("tcp", net.JoinHostPort(host, strconv.Itoa(port)))
if err != nil {
return nil, err
}
return &tcpProxySocket{listener}, nil
case "UDP":
addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(host, strconv.Itoa(port)))
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}
return &udpProxySocket{conn}, nil
}
return nil, fmt.Errorf("Unknown protocol %q", protocol)
}
// Proxier is a simple proxy for TCP connections between a localhost:lport
// and services that provide the actual implementations.
type Proxier struct {
@ -66,14 +312,6 @@ func copyBytes(in, out *net.TCPConn) {
out.CloseWrite()
}
// proxyConnection proxies data bidirectionally between in and out.
func proxyConnection(in, out *net.TCPConn) {
glog.Infof("Creating proxy between %v <-> %v <-> %v <-> %v",
in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
go copyBytes(in, out)
go copyBytes(out, in)
}
// StopProxy stops the proxy for the named service.
func (proxier *Proxier) StopProxy(service string) error {
// TODO: delete from map here?
@ -85,14 +323,11 @@ func (proxier *Proxier) StopProxy(service string) error {
}
func (proxier *Proxier) stopProxyInternal(info *serviceInfo) error {
info.mu.Lock()
defer info.mu.Unlock()
if !info.active {
if !info.setActive(false) {
return nil
}
glog.Infof("Removing service: %s", info.name)
info.active = false
return info.listener.Close()
return info.socket.Close()
}
func (proxier *Proxier) getServiceInfo(service string) (*serviceInfo, bool) {
@ -109,57 +344,20 @@ func (proxier *Proxier) setServiceInfo(service string, info *serviceInfo) {
proxier.serviceMap[service] = info
}
// AcceptHandler proxies incoming connections for the specified service
// to the load-balanced service endpoints.
func (proxier *Proxier) AcceptHandler(service string, listener net.Listener) {
info, found := proxier.getServiceInfo(service)
if !found {
glog.Errorf("Failed to find service: %s", service)
return
}
for {
info.mu.Lock()
if !info.active {
info.mu.Unlock()
break
}
info.mu.Unlock()
inConn, err := listener.Accept()
if err != nil {
glog.Errorf("Accept failed: %v", err)
continue
}
glog.Infof("Accepted connection from: %v to %v", inConn.RemoteAddr(), inConn.LocalAddr())
endpoint, err := proxier.loadBalancer.NextEndpoint(service, inConn.RemoteAddr())
if err != nil {
glog.Errorf("Couldn't find an endpoint for %s %v", service, err)
inConn.Close()
continue
}
glog.Infof("Mapped service %s to endpoint %s", service, endpoint)
outConn, err := net.DialTimeout("tcp", endpoint, time.Duration(5)*time.Second)
if err != nil {
glog.Errorf("Dial failed: %v", err)
inConn.Close()
continue
}
proxyConnection(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
}
}
// used to globally lock around unused ports. Only used in testing.
var unusedPortLock sync.Mutex
// addServiceOnUnusedPort starts listening for a new service, returning the
// port it's using. For testing on a system with unknown ports used.
func (proxier *Proxier) addServiceOnUnusedPort(service string) (string, error) {
// port it's using. For testing on a system with unknown ports used. The timeout only applies to UDP
// connections, for now.
func (proxier *Proxier) addServiceOnUnusedPort(service, protocol string, timeout time.Duration) (string, error) {
unusedPortLock.Lock()
defer unusedPortLock.Unlock()
l, err := net.Listen("tcp", net.JoinHostPort(proxier.address, "0"))
sock, err := newProxySocket(protocol, proxier.address, 0)
if err != nil {
return "", err
}
_, port, err := net.SplitHostPort(l.Addr().String())
_, port, err := net.SplitHostPort(sock.Addr().String())
if err != nil {
return "", err
}
@ -169,18 +367,26 @@ func (proxier *Proxier) addServiceOnUnusedPort(service string) (string, error) {
}
proxier.setServiceInfo(service, &serviceInfo{
port: portNum,
protocol: protocol,
active: true,
listener: l,
socket: sock,
timeout: timeout,
})
proxier.startAccepting(service, l)
proxier.startAccepting(service, sock)
return port, nil
}
func (proxier *Proxier) startAccepting(service string, l net.Listener) {
glog.Infof("Listening for %s on %s", service, l.Addr().String())
go proxier.AcceptHandler(service, l)
func (proxier *Proxier) startAccepting(service string, sock proxySocket) {
glog.Infof("Listening for %s on %s:%s", service, sock.Addr().Network(), sock.Addr().String())
go func(service string, proxier *Proxier) {
defer util.HandleCrash()
sock.ProxyLoop(service, proxier)
}(service, proxier)
}
// How long we leave idle UDP connections open.
const udpIdleTimeout = 1 * time.Minute
// OnUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or
// shutdown if missing from the update set.
@ -190,30 +396,39 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
for _, service := range services {
activeServices.Insert(service.ID)
info, exists := proxier.getServiceInfo(service.ID)
if exists && info.active && info.port == service.Port {
// TODO: check health of the socket? What if ProxyLoop exited?
if exists && info.isActive() && info.port == service.Port {
continue
}
if exists && info.port != service.Port {
proxier.StopProxy(service.ID)
}
glog.Infof("Adding a new service %s on port %d", service.ID, service.Port)
listener, err := net.Listen("tcp", net.JoinHostPort(proxier.address, strconv.Itoa(service.Port)))
err := proxier.stopProxyInternal(info)
if err != nil {
glog.Infof("Failed to start listening for %s on %d", service.ID, service.Port)
glog.Errorf("error stopping %s: %v", info.name, err)
}
}
glog.Infof("Adding a new service %s on %s port %d", service.ID, service.Protocol, service.Port)
sock, err := newProxySocket(service.Protocol, proxier.address, service.Port)
if err != nil {
glog.Errorf("Failed to get a socket for %s: %+v", service.ID, err)
continue
}
proxier.setServiceInfo(service.ID, &serviceInfo{
port: service.Port,
protocol: service.Protocol,
active: true,
listener: listener,
socket: sock,
timeout: udpIdleTimeout,
})
proxier.startAccepting(service.ID, listener)
proxier.startAccepting(service.ID, sock)
}
proxier.mu.Lock()
defer proxier.mu.Unlock()
for name, info := range proxier.serviceMap {
if !activeServices.Has(name) {
proxier.stopProxyInternal(info)
err := proxier.stopProxyInternal(info)
if err != nil {
glog.Errorf("error stopping %s: %v", info.name, err)
}
}
}
}

View File

@ -30,35 +30,76 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
func waitForClosedPort(p *Proxier, proxyPort string) error {
func waitForClosedPortTCP(p *Proxier, proxyPort string) error {
for i := 0; i < 50; i++ {
_, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort))
conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort))
if err != nil {
return nil
}
conn.Close()
time.Sleep(1 * time.Millisecond)
}
return fmt.Errorf("port %s still open", proxyPort)
}
var port string
func waitForClosedPortUDP(p *Proxier, proxyPort string) error {
for i := 0; i < 50; i++ {
conn, err := net.Dial("udp", net.JoinHostPort("127.0.0.1", proxyPort))
if err != nil {
return nil
}
conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
// To detect a closed UDP port write, then read.
_, err = conn.Write([]byte("x"))
if err != nil {
if e, ok := err.(net.Error); ok && !e.Timeout() {
return nil
}
}
var buf [4]byte
_, err = conn.Read(buf[0:])
if err != nil {
if e, ok := err.(net.Error); ok && !e.Timeout() {
return nil
}
}
conn.Close()
time.Sleep(1 * time.Millisecond)
}
return fmt.Errorf("port %s still open", proxyPort)
}
var tcpServerPort string
var udpServerPort string
func init() {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// TCP setup.
tcp := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(r.URL.Path[1:]))
}))
u, err := url.Parse(ts.URL)
u, err := url.Parse(tcp.URL)
if err != nil {
panic(fmt.Sprintf("failed to parse: %v", err))
}
_, port, err = net.SplitHostPort(u.Host)
_, tcpServerPort, err = net.SplitHostPort(u.Host)
if err != nil {
panic(fmt.Sprintf("failed to parse: %v", err))
}
}
func testEchoConnection(t *testing.T, address, port string) {
// UDP setup.
udp, err := newUDPEchoServer()
if err != nil {
panic(fmt.Sprintf("failed to make a UDP server: %v", err))
}
_, udpServerPort, err = net.SplitHostPort(udp.LocalAddr().String())
if err != nil {
panic(fmt.Sprintf("failed to parse: %v", err))
}
go udp.Loop()
}
func testEchoTCP(t *testing.T, address, port string) {
path := "aaaaa"
res, err := http.Get("http://" + address + ":" + port + "/" + path)
if err != nil {
@ -74,27 +115,74 @@ func testEchoConnection(t *testing.T, address, port string) {
}
}
func TestProxy(t *testing.T) {
func testEchoUDP(t *testing.T, address, port string) {
data := "abc123"
conn, err := net.Dial("udp", net.JoinHostPort(address, port))
if err != nil {
t.Fatalf("error connecting to server: %v", err)
}
if _, err := conn.Write([]byte(data)); err != nil {
t.Fatalf("error sending to server: %v", err)
}
var resp [1024]byte
n, err := conn.Read(resp[0:])
if err != nil {
t.Errorf("error receiving data: %v", err)
}
if string(resp[0:n]) != data {
t.Errorf("expected: %s, got %s", data, string(resp[0:n]))
}
}
func TestTCPProxy(t *testing.T) {
lb := NewLoadBalancerRR()
lb.OnUpdate([]api.Endpoints{
{JSONBase: api.JSONBase{ID: "echo"}, Endpoints: []string{net.JoinHostPort("127.0.0.1", port)}}})
{
JSONBase: api.JSONBase{ID: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)},
},
})
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo")
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoConnection(t, "127.0.0.1", proxyPort)
testEchoTCP(t, "127.0.0.1", proxyPort)
}
func TestProxyStop(t *testing.T) {
func TestUDPProxy(t *testing.T) {
lb := NewLoadBalancerRR()
lb.OnUpdate([]api.Endpoints{{JSONBase: api.JSONBase{ID: "echo"}, Endpoints: []string{net.JoinHostPort("127.0.0.1", port)}}})
lb.OnUpdate([]api.Endpoints{
{
JSONBase: api.JSONBase{ID: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)},
},
})
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo")
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoUDP(t, "127.0.0.1", proxyPort)
}
func TestTCPProxyStop(t *testing.T) {
lb := NewLoadBalancerRR()
lb.OnUpdate([]api.Endpoints{
{
JSONBase: api.JSONBase{ID: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)},
},
})
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -106,18 +194,51 @@ func TestProxyStop(t *testing.T) {
p.StopProxy("echo")
// Wait for the port to really close.
if err := waitForClosedPort(p, proxyPort); err != nil {
if err := waitForClosedPortTCP(p, proxyPort); err != nil {
t.Fatalf(err.Error())
}
}
func TestProxyUpdateDelete(t *testing.T) {
func TestUDPProxyStop(t *testing.T) {
lb := NewLoadBalancerRR()
lb.OnUpdate([]api.Endpoints{{JSONBase: api.JSONBase{ID: "echo"}, Endpoints: []string{net.JoinHostPort("127.0.0.1", port)}}})
lb.OnUpdate([]api.Endpoints{
{
JSONBase: api.JSONBase{ID: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)},
},
})
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo")
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
conn, err := net.Dial("udp", net.JoinHostPort("127.0.0.1", proxyPort))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
p.StopProxy("echo")
// Wait for the port to really close.
if err := waitForClosedPortUDP(p, proxyPort); err != nil {
t.Fatalf(err.Error())
}
}
func TestTCPProxyUpdateDelete(t *testing.T) {
lb := NewLoadBalancerRR()
lb.OnUpdate([]api.Endpoints{
{
JSONBase: api.JSONBase{ID: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)},
},
})
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -128,18 +249,50 @@ func TestProxyUpdateDelete(t *testing.T) {
conn.Close()
p.OnUpdate([]api.Service{})
if err := waitForClosedPort(p, proxyPort); err != nil {
if err := waitForClosedPortTCP(p, proxyPort); err != nil {
t.Fatalf(err.Error())
}
}
func TestProxyUpdateDeleteUpdate(t *testing.T) {
func TestUDPProxyUpdateDelete(t *testing.T) {
lb := NewLoadBalancerRR()
lb.OnUpdate([]api.Endpoints{{JSONBase: api.JSONBase{ID: "echo"}, Endpoints: []string{net.JoinHostPort("127.0.0.1", port)}}})
lb.OnUpdate([]api.Endpoints{
{
JSONBase: api.JSONBase{ID: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)},
},
})
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo")
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
conn, err := net.Dial("udp", net.JoinHostPort("127.0.0.1", proxyPort))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
p.OnUpdate([]api.Service{})
if err := waitForClosedPortUDP(p, proxyPort); err != nil {
t.Fatalf(err.Error())
}
}
func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
lb := NewLoadBalancerRR()
lb.OnUpdate([]api.Endpoints{
{
JSONBase: api.JSONBase{ID: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)},
},
})
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -150,23 +303,60 @@ func TestProxyUpdateDeleteUpdate(t *testing.T) {
conn.Close()
p.OnUpdate([]api.Service{})
if err := waitForClosedPort(p, proxyPort); err != nil {
if err := waitForClosedPortTCP(p, proxyPort); err != nil {
t.Fatalf(err.Error())
}
proxyPortNum, _ := strconv.Atoi(proxyPort)
p.OnUpdate([]api.Service{
{JSONBase: api.JSONBase{ID: "echo"}, Port: proxyPortNum},
{JSONBase: api.JSONBase{ID: "echo"}, Port: proxyPortNum, Protocol: "TCP"},
})
testEchoConnection(t, "127.0.0.1", proxyPort)
testEchoTCP(t, "127.0.0.1", proxyPort)
}
func TestProxyUpdatePort(t *testing.T) {
func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
lb := NewLoadBalancerRR()
lb.OnUpdate([]api.Endpoints{{JSONBase: api.JSONBase{ID: "echo"}, Endpoints: []string{net.JoinHostPort("127.0.0.1", port)}}})
lb.OnUpdate([]api.Endpoints{
{
JSONBase: api.JSONBase{ID: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)},
},
})
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo")
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
conn, err := net.Dial("udp", net.JoinHostPort("127.0.0.1", proxyPort))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
p.OnUpdate([]api.Service{})
if err := waitForClosedPortUDP(p, proxyPort); err != nil {
t.Fatalf(err.Error())
}
proxyPortNum, _ := strconv.Atoi(proxyPort)
p.OnUpdate([]api.Service{
{JSONBase: api.JSONBase{ID: "echo"}, Port: proxyPortNum, Protocol: "UDP"},
})
testEchoUDP(t, "127.0.0.1", proxyPort)
}
func TestTCPProxyUpdatePort(t *testing.T) {
lb := NewLoadBalancerRR()
lb.OnUpdate([]api.Endpoints{
{
JSONBase: api.JSONBase{ID: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)},
},
})
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -174,62 +364,75 @@ func TestProxyUpdatePort(t *testing.T) {
// add a new dummy listener in order to get a port that is free
l, _ := net.Listen("tcp", ":0")
_, newPort, _ := net.SplitHostPort(l.Addr().String())
portNum, _ := strconv.Atoi(newPort)
newPortNum, _ := strconv.Atoi(newPort)
l.Close()
// Wait for the socket to actually get free.
if err := waitForClosedPort(p, newPort); err != nil {
if err := waitForClosedPortTCP(p, newPort); err != nil {
t.Fatalf(err.Error())
}
if proxyPort == newPort {
t.Errorf("expected difference, got %s %s", newPort, proxyPort)
}
p.OnUpdate([]api.Service{
{JSONBase: api.JSONBase{ID: "echo"}, Port: portNum},
{JSONBase: api.JSONBase{ID: "echo"}, Port: newPortNum, Protocol: "TCP"},
})
if err := waitForClosedPort(p, proxyPort); err != nil {
if err := waitForClosedPortTCP(p, proxyPort); err != nil {
t.Fatalf(err.Error())
}
testEchoConnection(t, "127.0.0.1", newPort)
testEchoTCP(t, "127.0.0.1", newPort)
// Ensure the old port is released and re-usable.
l, err = net.Listen("tcp", net.JoinHostPort("", proxyPort))
if err != nil {
t.Fatalf("can't claim released port: %s", err)
}
l.Close()
}
func TestProxyUpdatePortLetsGoOfOldPort(t *testing.T) {
func TestUDPProxyUpdatePort(t *testing.T) {
lb := NewLoadBalancerRR()
lb.OnUpdate([]api.Endpoints{{JSONBase: api.JSONBase{ID: "echo"}, Endpoints: []string{net.JoinHostPort("127.0.0.1", port)}}})
lb.OnUpdate([]api.Endpoints{
{
JSONBase: api.JSONBase{ID: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)},
},
})
p := NewProxier(lb, "127.0.0.1")
proxyPort, err := p.addServiceOnUnusedPort("echo")
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
// add a new dummy listener in order to get a port that is free
l, _ := net.Listen("tcp", ":0")
_, newPort, _ := net.SplitHostPort(l.Addr().String())
portNum, _ := strconv.Atoi(newPort)
l.Close()
pc, _ := net.ListenPacket("udp", ":0")
_, newPort, _ := net.SplitHostPort(pc.LocalAddr().String())
newPortNum, _ := strconv.Atoi(newPort)
pc.Close()
// Wait for the socket to actually get free.
if err := waitForClosedPort(p, newPort); err != nil {
if err := waitForClosedPortUDP(p, newPort); err != nil {
t.Fatalf(err.Error())
}
if proxyPort == newPort {
t.Errorf("expected difference, got %s %s", newPort, proxyPort)
}
p.OnUpdate([]api.Service{
{JSONBase: api.JSONBase{ID: "echo"}, Port: portNum},
{JSONBase: api.JSONBase{ID: "echo"}, Port: newPortNum, Protocol: "UDP"},
})
if err := waitForClosedPort(p, proxyPort); err != nil {
if err := waitForClosedPortUDP(p, proxyPort); err != nil {
t.Fatalf(err.Error())
}
testEchoConnection(t, "127.0.0.1", newPort)
proxyPortNum, _ := strconv.Atoi(proxyPort)
p.OnUpdate([]api.Service{
{JSONBase: api.JSONBase{ID: "echo"}, Port: proxyPortNum},
})
if err := waitForClosedPort(p, newPort); err != nil {
t.Fatalf(err.Error())
testEchoUDP(t, "127.0.0.1", newPort)
// Ensure the old port is released and re-usable.
pc, err = net.ListenPacket("udp", net.JoinHostPort("", proxyPort))
if err != nil {
t.Fatalf("can't claim released port: %s", err)
}
testEchoConnection(t, "127.0.0.1", proxyPort)
pc.Close()
}
// TODO: Test UDP timeouts.

54
pkg/proxy/udp_server.go Normal file
View File

@ -0,0 +1,54 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"fmt"
"net"
)
// udpEchoServer is a simple echo server in UDP, intended for testing the proxy.
type udpEchoServer struct {
net.PacketConn
}
func (r *udpEchoServer) Loop() {
var buffer [4096]byte
for {
n, cliAddr, err := r.ReadFrom(buffer[0:])
if err != nil {
fmt.Printf("ReadFrom failed: %#v\n", err)
continue
}
r.WriteTo(buffer[0:n], cliAddr)
}
}
func newUDPEchoServer() (*udpEchoServer, error) {
packetconn, err := net.ListenPacket("udp", ":0")
if err != nil {
return nil, err
}
return &udpEchoServer{packetconn}, nil
}
/*
func main() {
r,_ := newUDPEchoServer()
r.Loop()
}
*/