Userspace proxy should remove conntrack entries

This changes the userspace proxy so that it cleans up its conntrack
settings when a service is removed (as the iptables proxy already
does).  This could theoretically cause problems when a UDP service
as deleted and recreated quickly (with the same IP address).  As
long as packets from the same UDP source IP and port were going to
the same destination IP and port, the the conntrack would apply and
the packets would be sent to the old destination.

This is astronomically unlikely if you did not specify the IP address
to use in the service, and even then, only happens with an "established"
UDP connection.  However, in cases where a service could be "switched"
between using the iptables proxy and the userspace proxy, this case
becomes much more frequent.
pull/6/head
Benjamin Bennett 2016-07-22 14:26:36 -04:00 committed by Solly Ross
parent 655b338256
commit 5447db3048
12 changed files with 309 additions and 150 deletions

View File

@ -272,6 +272,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
loadBalancer,
net.ParseIP(config.BindAddress),
iptInterface,
execer,
*utilnet.ParsePortRangeOrDie(config.PortRange),
config.IPTablesSyncPeriod.Duration,
config.IPTablesMinSyncPeriod.Duration,

View File

@ -35,6 +35,7 @@ filegroup(
"//pkg/proxy/healthcheck:all-srcs",
"//pkg/proxy/iptables:all-srcs",
"//pkg/proxy/userspace:all-srcs",
"//pkg/proxy/util:all-srcs",
"//pkg/proxy/winuserspace:all-srcs",
],
tags = ["automanaged"],

View File

@ -18,6 +18,7 @@ go_library(
"//pkg/features:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/proxy/healthcheck:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/util/exec:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/sysctl:go_default_library",

View File

@ -46,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
@ -552,7 +553,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
glog.V(4).Infof("Skipping proxy iptables rule sync on service update because nothing changed")
}
proxier.deleteServiceConnections(staleUDPServices.List())
utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List())
}
// Reconstruct the list of endpoint infos from the endpointIP list
@ -792,7 +793,7 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ
if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP {
endpointIP := strings.Split(epSvcPair.endpoint, ":")[0]
glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP)
err := proxier.execConntrackTool("-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp")
err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp")
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
@ -803,33 +804,6 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ
}
}
// deleteServiceConnection use conntrack-tool to delete UDP connection specified by service ip
func (proxier *Proxier) deleteServiceConnections(svcIPs []string) {
for _, ip := range svcIPs {
glog.V(2).Infof("Deleting connection tracking state for service IP %s", ip)
err := proxier.execConntrackTool("-D", "--orig-dst", ip, "-p", "udp")
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
// is expensive to baby sit all udp connections to kubernetes services.
glog.Errorf("conntrack return with error: %v", err)
}
}
}
//execConntrackTool executes conntrack tool using given parameters
func (proxier *Proxier) execConntrackTool(parameters ...string) error {
conntrackPath, err := proxier.exec.LookPath("conntrack")
if err != nil {
return fmt.Errorf("Error looking for path of conntrack: %v", err)
}
output, err := proxier.exec.Command(conntrackPath, parameters...).CombinedOutput()
if err != nil {
return fmt.Errorf("Conntrack command returned: %q, error message: %s", string(output), err)
}
return nil
}
// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// assumes proxier.mu is held
@ -1392,10 +1366,9 @@ func (proxier *Proxier) syncProxyRules() {
// https://github.com/docker/docker/issues/8795
// https://github.com/kubernetes/kubernetes/issues/31983
func (proxier *Proxier) clearUdpConntrackForPort(port int) {
var err error = nil
glog.V(2).Infof("Deleting conntrack entries for udp connections")
if port > 0 {
err = proxier.execConntrackTool("-D", "-p", "udp", "--dport", strconv.Itoa(port))
err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "-p", "udp", "--dport", strconv.Itoa(port))
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
glog.Errorf("conntrack return with error: %v", err)
}

View File

@ -172,57 +172,6 @@ func TestGetChainLinesMultipleTables(t *testing.T) {
checkAllLines(t, utiliptables.TableNAT, []byte(iptables_save), expected)
}
func TestExecConntrackTool(t *testing.T) {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) {
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
},
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
fakeProxier := Proxier{exec: &fexec}
testCases := [][]string{
{"-L", "-p", "udp"},
{"-D", "-p", "udp", "-d", "10.0.240.1"},
{"-D", "-p", "udp", "--orig-dst", "10.240.0.2", "--dst-nat", "10.0.10.2"},
}
expectErr := []bool{false, false, true}
for i := range testCases {
err := fakeProxier.execConntrackTool(testCases[i]...)
if expectErr[i] {
if err == nil {
t.Errorf("expected err, got %v", err)
}
} else {
if err != nil {
t.Errorf("expected success, got %v", err)
}
}
execCmd := strings.Join(fcmd.CombinedOutputLog[i], " ")
expectCmd := fmt.Sprintf("%s %s", "conntrack", strings.Join(testCases[i], " "))
if execCmd != expectCmd {
t.Errorf("expect execute command: %s, but got: %s", expectCmd, execCmd)
}
}
}
func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, port int, protocol api.Protocol, onlyNodeLocalEndpoints bool) *serviceInfo {
return &serviceInfo{
sessionAffinityType: api.ServiceAffinityNone, // default
@ -296,54 +245,6 @@ func TestDeleteEndpointConnections(t *testing.T) {
}
}
func TestDeleteServiceConnections(t *testing.T) {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) {
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
},
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
fakeProxier := Proxier{exec: &fexec}
testCases := [][]string{
{
"10.240.0.3",
"10.240.0.5",
},
{
"10.240.0.4",
},
}
svcCount := 0
for i := range testCases {
fakeProxier.deleteServiceConnections(testCases[i])
for _, ip := range testCases[i] {
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", ip)
execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ")
if expectCommand != execCommand {
t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand)
}
svcCount += 1
}
if svcCount != fexec.CommandCalls {
t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls)
}
}
}
type fakeClosable struct {
closed bool
}

View File

@ -23,6 +23,8 @@ go_library(
deps = [
"//pkg/api:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/util/exec:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/slice:go_default_library",
"//vendor:github.com/golang/glog",
@ -30,6 +32,7 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/util/errors",
"//vendor:k8s.io/apimachinery/pkg/util/net",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
],
)
@ -46,6 +49,7 @@ go_test(
deps = [
"//pkg/api:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/util/exec:go_default_library",
"//pkg/util/iptables/testing:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/types",

View File

@ -33,6 +33,9 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
utilexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/iptables"
)
@ -106,6 +109,7 @@ type Proxier struct {
hostIP net.IP
proxyPorts PortAllocator
makeProxySocket ProxySocketFunc
exec utilexec.Interface
}
// assert Proxier is a ProxyProvider
@ -150,15 +154,15 @@ func IsProxyLocked(err error) bool {
// if iptables fails to update or acquire the initial lock. Once a proxier is
// created, it will keep iptables up to date in the background and will not
// terminate if a particular iptables call fails.
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
return NewCustomProxier(loadBalancer, listenIP, iptables, pr, syncPeriod, minSyncPeriod, udpIdleTimeout, newProxySocket)
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, exec utilexec.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
return NewCustomProxier(loadBalancer, listenIP, iptables, exec, pr, syncPeriod, minSyncPeriod, udpIdleTimeout, newProxySocket)
}
// NewCustomProxier functions similarly to NewProxier, returing a new Proxier
// for the given LoadBalancer and address. The new proxier is constructed using
// the ProxySocket constructor provided, however, instead of constructing the
// default ProxySockets.
func NewCustomProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) {
func NewCustomProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, exec utilexec.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) {
if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
return nil, ErrProxyOnLocalhost
}
@ -176,10 +180,10 @@ func NewCustomProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptab
proxyPorts := newPortAllocator(pr)
glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP)
return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout, makeProxySocket)
return createProxier(loadBalancer, listenIP, iptables, exec, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout, makeProxySocket)
}
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) {
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, exec utilexec.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) {
// convenient to pass nil for tests..
if proxyPorts == nil {
proxyPorts = newPortAllocator(utilnet.PortRange{})
@ -206,6 +210,7 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
hostIP: hostIP,
proxyPorts: proxyPorts,
makeProxySocket: makeProxySocket,
exec: exec,
}, nil
}
@ -469,11 +474,18 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes)
}
}
staleUDPServices := sets.NewString()
proxier.mu.Lock()
defer proxier.mu.Unlock()
for name, info := range proxier.serviceMap {
if !activeServices[name] {
glog.V(1).Infof("Stopping service %q", name)
if proxier.serviceMap[name].protocol == api.ProtocolUDP {
staleUDPServices.Insert(proxier.serviceMap[name].portal.ip.String())
}
err := proxier.closePortal(name, info)
if err != nil {
glog.Errorf("Failed to close portal for %q: %v", name, err)
@ -485,6 +497,8 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
proxier.loadBalancer.DeleteService(name)
}
}
utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List())
}
func sameConfig(info *ServiceInfo, service *api.Service, port *api.ServicePort) bool {

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/util/exec"
ipttest "k8s.io/kubernetes/pkg/util/iptables/testing"
)
@ -211,7 +212,9 @@ func TestTCPProxy(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
@ -239,7 +242,9 @@ func TestUDPProxy(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
@ -267,7 +272,9 @@ func TestUDPProxyTimeout(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
@ -304,7 +311,9 @@ func TestMultiPortProxy(t *testing.T) {
}},
}})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
@ -333,7 +342,9 @@ func TestMultiPortOnServiceUpdate(t *testing.T) {
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"}
serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"}
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
@ -396,7 +407,9 @@ func TestTCPProxyStop(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
@ -441,7 +454,9 @@ func TestUDPProxyStop(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
@ -480,7 +495,9 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
@ -518,7 +535,9 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
@ -555,7 +574,9 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
}
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
@ -609,7 +630,9 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
}
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
@ -664,7 +687,9 @@ func TestTCPProxyUpdatePort(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
@ -713,7 +738,9 @@ func TestUDPProxyUpdatePort(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
@ -759,7 +786,9 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
@ -811,7 +840,9 @@ func TestProxyUpdatePortal(t *testing.T) {
}
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
@ -868,4 +899,18 @@ func TestProxyUpdatePortal(t *testing.T) {
waitForNumProxyLoops(t, p, 1)
}
func makeFakeExec() *exec.FakeExec {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
},
}
return &exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
}
// TODO(justinsb): Add test for nodePort conflict detection, once we have nodePort wired in

40
pkg/proxy/util/BUILD Normal file
View File

@ -0,0 +1,40 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["conntrack.go"],
tags = ["automanaged"],
deps = [
"//pkg/util/exec:go_default_library",
"//vendor:github.com/golang/glog",
],
)
go_test(
name = "go_default_test",
srcs = ["conntrack_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = ["//pkg/util/exec:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -0,0 +1,58 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"strings"
"k8s.io/kubernetes/pkg/util/exec"
"github.com/golang/glog"
)
// Utilities for dealing with conntrack
const noConnectionToDelete = "0 flow entries have been deleted"
// DeleteServiceConnection uses the conntrack tool to delete the conntrack entries
// for the UDP connections specified by the given service IPs
func DeleteServiceConnections(execer exec.Interface, svcIPs []string) {
for _, ip := range svcIPs {
glog.V(2).Infof("Deleting connection tracking state for service IP %s", ip)
err := ExecConntrackTool(execer, "-D", "--orig-dst", ip, "-p", "udp")
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
// is expensive to baby-sit all udp connections to kubernetes services.
glog.Errorf("conntrack returned error: %v", err)
}
}
}
// ExecConntrackTool executes the conntrack tool using the given parameters
func ExecConntrackTool(execer exec.Interface, parameters ...string) error {
conntrackPath, err := execer.LookPath("conntrack")
if err != nil {
return fmt.Errorf("error looking for path of conntrack: %v", err)
}
output, err := execer.Command(conntrackPath, parameters...).CombinedOutput()
if err != nil {
return fmt.Errorf("conntrack command returned: %q, error message: %s", string(output), err)
}
return nil
}

View File

@ -0,0 +1,120 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"strings"
"testing"
"k8s.io/kubernetes/pkg/util/exec"
)
func TestExecConntrackTool(t *testing.T) {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) {
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
},
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
testCases := [][]string{
{"-L", "-p", "udp"},
{"-D", "-p", "udp", "-d", "10.0.240.1"},
{"-D", "-p", "udp", "--orig-dst", "10.240.0.2", "--dst-nat", "10.0.10.2"},
}
expectErr := []bool{false, false, true}
for i := range testCases {
err := ExecConntrackTool(&fexec, testCases[i]...)
if expectErr[i] {
if err == nil {
t.Errorf("expected err, got %v", err)
}
} else {
if err != nil {
t.Errorf("expected success, got %v", err)
}
}
execCmd := strings.Join(fcmd.CombinedOutputLog[i], " ")
expectCmd := fmt.Sprintf("%s %s", "conntrack", strings.Join(testCases[i], " "))
if execCmd != expectCmd {
t.Errorf("expect execute command: %s, but got: %s", expectCmd, execCmd)
}
}
}
func TestDeleteServiceConnections(t *testing.T) {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) {
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
},
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
testCases := [][]string{
{
"10.240.0.3",
"10.240.0.5",
},
{
"10.240.0.4",
},
}
svcCount := 0
for i := range testCases {
DeleteServiceConnections(&fexec, testCases[i])
for _, ip := range testCases[i] {
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", ip)
execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ")
if expectCommand != execCommand {
t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand)
}
svcCount += 1
}
if svcCount != fexec.CommandCalls {
t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls)
}
}
}

View File

@ -768,6 +768,7 @@ k8s.io/kubernetes/pkg/proxy/config,ixdy,1,
k8s.io/kubernetes/pkg/proxy/healthcheck,rrati,0,
k8s.io/kubernetes/pkg/proxy/iptables,freehan,0,
k8s.io/kubernetes/pkg/proxy/userspace,luxas,1,
k8s.io/kubernetes/pkg/proxy/util,knobunc,0,
k8s.io/kubernetes/pkg/proxy/winuserspace,jbhurat,0,
k8s.io/kubernetes/pkg/quota,sttts,1,
k8s.io/kubernetes/pkg/quota/evaluator/core,yifan-gu,1,

1 name owner auto-assigned sig
768 k8s.io/kubernetes/pkg/proxy/healthcheck rrati 0
769 k8s.io/kubernetes/pkg/proxy/iptables freehan 0
770 k8s.io/kubernetes/pkg/proxy/userspace luxas 1
771 k8s.io/kubernetes/pkg/proxy/util knobunc 0
772 k8s.io/kubernetes/pkg/proxy/winuserspace jbhurat 0
773 k8s.io/kubernetes/pkg/quota sttts 1
774 k8s.io/kubernetes/pkg/quota/evaluator/core yifan-gu 1