k3s/pkg/proxy/proxier_test.go

580 lines
18 KiB
Go
Raw Normal View History

2014-06-06 23:40:48 +00:00
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"fmt"
2014-08-13 21:07:14 +00:00
"io/ioutil"
2014-06-06 23:40:48 +00:00
"net"
2014-08-13 21:07:14 +00:00
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"sync/atomic"
2014-06-06 23:40:48 +00:00
"testing"
"time"
2014-06-06 23:40:48 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
2014-11-11 06:18:01 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
2014-09-18 23:03:34 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables"
2014-06-06 23:40:48 +00:00
)
2014-09-18 23:03:34 +00:00
// joinHostPort renders host and a numeric port as a single "host:port"
// network address by way of net.JoinHostPort.
func joinHostPort(host string, port int) string {
	portText := fmt.Sprint(port)
	return net.JoinHostPort(host, portText)
}
func waitForClosedPortTCP(p *Proxier, proxyPort int) error {
2014-08-04 19:35:03 +00:00
for i := 0; i < 50; i++ {
2014-09-18 23:03:34 +00:00
conn, err := net.Dial("tcp", joinHostPort("", proxyPort))
2014-08-04 19:35:03 +00:00
if err != nil {
return nil
}
2014-09-11 16:00:06 +00:00
conn.Close()
time.Sleep(1 * time.Millisecond)
}
2014-09-18 23:03:34 +00:00
return fmt.Errorf("port %d still open", proxyPort)
2014-09-11 16:00:06 +00:00
}
2014-09-18 23:03:34 +00:00
func waitForClosedPortUDP(p *Proxier, proxyPort int) error {
2014-09-11 16:00:06 +00:00
for i := 0; i < 50; i++ {
2014-09-18 23:03:34 +00:00
conn, err := net.Dial("udp", joinHostPort("", proxyPort))
2014-09-11 16:00:06 +00:00
if err != nil {
return nil
}
conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
// To detect a closed UDP port write, then read.
_, err = conn.Write([]byte("x"))
if err != nil {
if e, ok := err.(net.Error); ok && !e.Timeout() {
return nil
}
}
var buf [4]byte
_, err = conn.Read(buf[0:])
if err != nil {
if e, ok := err.(net.Error); ok && !e.Timeout() {
return nil
}
}
conn.Close()
2014-08-04 19:35:03 +00:00
time.Sleep(1 * time.Millisecond)
}
2014-09-18 23:03:34 +00:00
return fmt.Errorf("port %d still open", proxyPort)
}
// The iptables logic has to be tested in a proper end-to-end test, so this just stubs everything out.
type fakeIptables struct{}
func (fake *fakeIptables) EnsureChain(table iptables.Table, chain iptables.Chain) (bool, error) {
return false, nil
}
func (fake *fakeIptables) DeleteChain(table iptables.Table, chain iptables.Chain) error {
return nil
}
2014-09-18 23:03:34 +00:00
func (fake *fakeIptables) FlushChain(table iptables.Table, chain iptables.Chain) error {
return nil
}
func (fake *fakeIptables) EnsureRule(table iptables.Table, chain iptables.Chain, args ...string) (bool, error) {
return false, nil
}
func (fake *fakeIptables) DeleteRule(table iptables.Table, chain iptables.Chain, args ...string) error {
return nil
2014-08-04 19:35:03 +00:00
}
2014-11-03 16:04:42 +00:00
func (fake *fakeIptables) IsIpv6() bool {
return false
}
// Ports of the TCP and UDP echo backends used by the tests; both are
// assigned in init.
var (
	tcpServerPort int
	udpServerPort int
)
2014-08-13 21:07:14 +00:00
func init() {
2014-11-11 06:18:01 +00:00
// Don't handle panics
util.ReallyCrash = true
2014-09-11 16:00:06 +00:00
// TCP setup.
tcp := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2014-08-13 21:07:14 +00:00
w.WriteHeader(http.StatusOK)
w.Write([]byte(r.URL.Path[1:]))
}))
2014-09-11 16:00:06 +00:00
u, err := url.Parse(tcp.URL)
2014-06-06 23:40:48 +00:00
if err != nil {
2014-08-13 21:07:14 +00:00
panic(fmt.Sprintf("failed to parse: %v", err))
}
_, port, err := net.SplitHostPort(u.Host)
2014-08-13 21:07:14 +00:00
if err != nil {
panic(fmt.Sprintf("failed to parse: %v", err))
2014-06-06 23:40:48 +00:00
}
tcpServerPort, err = strconv.Atoi(port)
if err != nil {
panic(fmt.Sprintf("failed to atoi(%s): %v", port, err))
}
2014-09-11 16:00:06 +00:00
// UDP setup.
udp, err := newUDPEchoServer()
if err != nil {
panic(fmt.Sprintf("failed to make a UDP server: %v", err))
}
_, port, err = net.SplitHostPort(udp.LocalAddr().String())
2014-09-11 16:00:06 +00:00
if err != nil {
panic(fmt.Sprintf("failed to parse: %v", err))
}
udpServerPort, err = strconv.Atoi(port)
if err != nil {
panic(fmt.Sprintf("failed to atoi(%s): %v", port, err))
}
2014-09-11 16:00:06 +00:00
go udp.Loop()
}
2014-09-18 23:03:34 +00:00
// testEchoTCP sends an HTTP GET for a fixed path to address:port and checks
// that the response body is exactly that path. A failure to connect aborts
// the calling test; a read error or a body mismatch is reported with Errorf.
func testEchoTCP(t *testing.T, address string, port int) {
	path := "aaaaa"
	// Build the authority with net.JoinHostPort rather than by raw
	// concatenation so that IPv6 literals are bracketed and the resulting
	// URL stays valid.
	hostPort := net.JoinHostPort(address, strconv.Itoa(port))
	res, err := http.Get("http://" + hostPort + "/" + path)
	if err != nil {
		t.Fatalf("error connecting to server: %v", err)
	}
	defer res.Body.Close()
	data, err := ioutil.ReadAll(res.Body)
	if err != nil {
		t.Errorf("error reading data: %v %v", err, string(data))
	}
	if string(data) != path {
		t.Errorf("expected: %s, got %s", path, string(data))
	}
}
2014-09-18 23:03:34 +00:00
func testEchoUDP(t *testing.T, address string, port int) {
2014-09-11 16:00:06 +00:00
data := "abc123"
2014-09-18 23:03:34 +00:00
conn, err := net.Dial("udp", joinHostPort(address, port))
2014-09-11 16:00:06 +00:00
if err != nil {
t.Fatalf("error connecting to server: %v", err)
}
if _, err := conn.Write([]byte(data)); err != nil {
t.Fatalf("error sending to server: %v", err)
}
var resp [1024]byte
n, err := conn.Read(resp[0:])
if err != nil {
t.Errorf("error receiving data: %v", err)
}
if string(resp[0:n]) != data {
t.Errorf("expected: %s, got %s", data, string(resp[0:n]))
}
}
// waitForNumProxyLoops polls p.numProxyLoops (read atomically) until it
// equals want. It checks up to four times, sleeping 500ms after every
// mismatch, and reports a test error with the last observed value if the
// counter never matched.
func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) {
	const (
		attempts = 4
		interval = 500 * time.Millisecond
	)

	var got int32
	for remaining := attempts; remaining > 0; remaining-- {
		if got = atomic.LoadInt32(&p.numProxyLoops); got == want {
			return
		}
		time.Sleep(interval)
	}
	t.Errorf("expected %d ProxyLoops running, got %d", want, got)
}
func TestTCPProxy(t *testing.T) {
2014-06-06 23:40:48 +00:00
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
2015-02-23 21:53:21 +00:00
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
2014-06-06 23:40:48 +00:00
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
2014-09-18 23:03:34 +00:00
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
}
2014-09-11 16:00:06 +00:00
func TestUDPProxy(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
2014-09-11 16:00:06 +00:00
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
2015-02-23 21:53:21 +00:00
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
2014-09-11 16:00:06 +00:00
},
})
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
2014-09-11 16:00:06 +00:00
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
2014-09-11 16:00:06 +00:00
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
2014-09-18 23:03:34 +00:00
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
2014-09-11 16:00:06 +00:00
}
2014-09-20 18:38:05 +00:00
// Helper: Stops the proxy for the named service.
func stopProxyByName(proxier *Proxier, service types.NamespacedName) error {
2014-09-20 18:38:05 +00:00
info, found := proxier.getServiceInfo(service)
if !found {
return fmt.Errorf("unknown service: %s", service)
}
return proxier.stopProxy(service, info)
}
func TestTCPProxyStop(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
2015-02-23 21:53:21 +00:00
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
2014-06-06 23:40:48 +00:00
}
2014-09-18 23:03:34 +00:00
conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
2014-06-06 23:40:48 +00:00
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
waitForNumProxyLoops(t, p, 1)
stopProxyByName(p, service)
2014-07-30 13:56:42 +00:00
// Wait for the port to really close.
2014-09-18 23:03:34 +00:00
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
2014-08-04 19:35:03 +00:00
t.Fatalf(err.Error())
2014-07-29 12:15:43 +00:00
}
waitForNumProxyLoops(t, p, 0)
}
2014-09-11 16:00:06 +00:00
func TestUDPProxyStop(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
2014-09-11 16:00:06 +00:00
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
2015-02-23 21:53:21 +00:00
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
2014-09-11 16:00:06 +00:00
},
})
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
2014-09-11 16:00:06 +00:00
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
2014-09-11 16:00:06 +00:00
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
2014-09-18 23:03:34 +00:00
conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
2014-09-11 16:00:06 +00:00
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
waitForNumProxyLoops(t, p, 1)
2014-09-11 16:00:06 +00:00
stopProxyByName(p, service)
2014-09-11 16:00:06 +00:00
// Wait for the port to really close.
2014-09-18 23:03:34 +00:00
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
2014-09-11 16:00:06 +00:00
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
2014-09-11 16:00:06 +00:00
}
func TestTCPProxyUpdateDelete(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
2015-02-23 21:53:21 +00:00
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
2014-09-18 23:03:34 +00:00
conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{})
2014-09-18 23:03:34 +00:00
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
2014-08-04 19:35:03 +00:00
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
}
2014-09-11 16:00:06 +00:00
func TestUDPProxyUpdateDelete(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
2014-09-11 16:00:06 +00:00
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
2015-02-23 21:53:21 +00:00
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
2014-09-11 16:00:06 +00:00
},
})
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
2014-09-11 16:00:06 +00:00
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
2014-09-11 16:00:06 +00:00
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
2014-09-18 23:03:34 +00:00
conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
2014-09-11 16:00:06 +00:00
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
waitForNumProxyLoops(t, p, 1)
2014-09-11 16:00:06 +00:00
p.OnUpdate([]api.Service{})
2014-09-18 23:03:34 +00:00
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
2014-09-11 16:00:06 +00:00
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
2014-09-11 16:00:06 +00:00
}
func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
2015-02-23 21:53:21 +00:00
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
2014-09-18 23:03:34 +00:00
conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{})
2014-09-18 23:03:34 +00:00
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "TCP", PortalIP: "1.2.3.4"}, Status: api.ServiceStatus{}},
})
svcInfo, exists := p.getServiceInfo(service)
if !exists {
t.Fatalf("can't find serviceInfo for %s", service)
}
2014-09-18 23:03:34 +00:00
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
}
2014-09-11 16:00:06 +00:00
func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
2014-09-11 16:00:06 +00:00
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
2015-02-23 21:53:21 +00:00
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
2014-09-11 16:00:06 +00:00
},
})
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
2014-09-11 16:00:06 +00:00
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
2014-09-11 16:00:06 +00:00
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
2014-09-18 23:03:34 +00:00
conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
2014-09-11 16:00:06 +00:00
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
waitForNumProxyLoops(t, p, 1)
2014-09-11 16:00:06 +00:00
p.OnUpdate([]api.Service{})
2014-09-18 23:03:34 +00:00
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
2014-09-11 16:00:06 +00:00
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
2014-09-11 16:00:06 +00:00
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "UDP", PortalIP: "1.2.3.4"}, Status: api.ServiceStatus{}},
2014-09-11 16:00:06 +00:00
})
svcInfo, exists := p.getServiceInfo(service)
if !exists {
t.Fatalf("can't find serviceInfo")
}
2014-09-18 23:03:34 +00:00
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
2014-09-11 16:00:06 +00:00
}
func TestTCPProxyUpdatePort(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
2015-02-23 21:53:21 +00:00
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: 99, Protocol: "TCP", PortalIP: "1.2.3.4"}, Status: api.ServiceStatus{}},
})
// Wait for the socket to actually get free.
2014-09-18 23:03:34 +00:00
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
2014-08-04 19:35:03 +00:00
t.Fatalf(err.Error())
2014-06-06 23:40:48 +00:00
}
svcInfo, exists := p.getServiceInfo(service)
if !exists {
t.Fatalf("can't find serviceInfo")
}
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
// This is a bit async, but this should be sufficient.
time.Sleep(500 * time.Millisecond)
waitForNumProxyLoops(t, p, 1)
}
2014-09-11 16:00:06 +00:00
func TestUDPProxyUpdatePort(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
2014-09-11 16:00:06 +00:00
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
2015-02-23 21:53:21 +00:00
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
2014-09-11 16:00:06 +00:00
},
})
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
2014-09-11 16:00:06 +00:00
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
2014-09-11 16:00:06 +00:00
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
waitForNumProxyLoops(t, p, 1)
2014-09-11 16:00:06 +00:00
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: 99, Protocol: "UDP", PortalIP: "1.2.3.4"}, Status: api.ServiceStatus{}},
2014-09-11 16:00:06 +00:00
})
// Wait for the socket to actually get free.
2014-09-18 23:03:34 +00:00
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
2014-09-11 16:00:06 +00:00
t.Fatalf(err.Error())
}
svcInfo, exists := p.getServiceInfo(service)
if !exists {
t.Fatalf("can't find serviceInfo")
2014-09-11 16:00:06 +00:00
}
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
2014-09-11 16:00:06 +00:00
}
2014-09-11 16:50:20 +00:00
// TestProxyUpdatePortal checks how OnUpdate treats a service's PortalIP:
// after an update with the field unset, empty, or "None" the proxier must no
// longer know the service, while an update with a concrete IP must make it
// known again and serving traffic through one proxy loop.
func TestProxyUpdatePortal(t *testing.T) {
	lb := NewLoadBalancerRR()
	service := types.NewNamespacedNameOrDie("testnamespace", "echo")
	lb.OnUpdate([]api.Endpoints{{
		ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
		Endpoints:  []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
	}})

	p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
	waitForNumProxyLoops(t, p, 0)

	svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
	if err != nil {
		t.Fatalf("error adding new service: %#v", err)
	}
	testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
	waitForNumProxyLoops(t, p, 1)

	// pushUpdate sends a single-service update carrying the given portal IP.
	// It reads svcInfo.proxyPort at call time, as each inline update did.
	pushUpdate := func(portalIP string) {
		p.OnUpdate([]api.Service{{
			ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
			Spec:       api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "TCP", PortalIP: portalIP},
			Status:     api.ServiceStatus{},
		}})
	}

	// Portal IP values after which the service must not be proxied. The
	// first entry stands for the field being left unset, which is the same
	// zero value as the explicitly empty string in the second entry.
	rejected := []struct {
		portalIP string
		failure  string
	}{
		{"", "service without portalIP should not be included in the proxy"},
		{"", "service with empty portalIP should not be included in the proxy"},
		{"None", "service with 'None' as portalIP should not be included in the proxy"},
	}
	for _, tc := range rejected {
		pushUpdate(tc.portalIP)
		if _, exists := p.getServiceInfo(service); exists {
			t.Fatal(tc.failure)
		}
	}

	pushUpdate("1.2.3.4")
	var exists bool
	svcInfo, exists = p.getServiceInfo(service)
	if !exists {
		t.Fatalf("service with portalIP set not found in the proxy")
	}
	testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
	waitForNumProxyLoops(t, p, 1)
}
2014-09-11 16:50:20 +00:00
// TODO: Test UDP timeouts.