diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index da6727d8e5..f934aca6ee 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -272,6 +272,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err loadBalancer, net.ParseIP(config.BindAddress), iptInterface, + execer, *utilnet.ParsePortRangeOrDie(config.PortRange), config.IPTablesSyncPeriod.Duration, config.IPTablesMinSyncPeriod.Duration, diff --git a/pkg/proxy/BUILD b/pkg/proxy/BUILD index 2c366011ef..a21f2bd8d5 100644 --- a/pkg/proxy/BUILD +++ b/pkg/proxy/BUILD @@ -35,6 +35,7 @@ filegroup( "//pkg/proxy/healthcheck:all-srcs", "//pkg/proxy/iptables:all-srcs", "//pkg/proxy/userspace:all-srcs", + "//pkg/proxy/util:all-srcs", "//pkg/proxy/winuserspace:all-srcs", ], tags = ["automanaged"], diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 821b14da8a..4b64995719 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -18,6 +18,7 @@ go_library( "//pkg/features:go_default_library", "//pkg/proxy:go_default_library", "//pkg/proxy/healthcheck:go_default_library", + "//pkg/proxy/util:go_default_library", "//pkg/util/exec:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/sysctl:go_default_library", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 6208dc0012..1d2bfe8cee 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -46,6 +46,7 @@ import ( "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" + utilproxy "k8s.io/kubernetes/pkg/proxy/util" utilexec "k8s.io/kubernetes/pkg/util/exec" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" @@ -552,7 +553,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { glog.V(4).Infof("Skipping proxy iptables rule sync on service update because nothing changed") } - proxier.deleteServiceConnections(staleUDPServices.List()) + utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List()) } // Reconstruct the list of endpoint infos from the endpointIP list @@ -792,7 +793,7 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP { endpointIP := strings.Split(epSvcPair.endpoint, ":")[0] glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP) - err := proxier.execConntrackTool("-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp") + err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp") if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it @@ -803,33 +804,6 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ } } -// deleteServiceConnection use conntrack-tool to delete UDP connection specified by service ip -func (proxier *Proxier) deleteServiceConnections(svcIPs []string) { - for _, ip := range svcIPs { - glog.V(2).Infof("Deleting connection tracking state for service IP %s", ip) - err := proxier.execConntrackTool("-D", "--orig-dst", ip, "-p", "udp") - if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { - // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. - // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it - // is expensive to baby sit all udp connections to kubernetes services. - glog.Errorf("conntrack return with error: %v", err) - } - } -} - -//execConntrackTool executes conntrack tool using given parameters -func (proxier *Proxier) execConntrackTool(parameters ...string) error { - conntrackPath, err := proxier.exec.LookPath("conntrack") - if err != nil { - return fmt.Errorf("Error looking for path of conntrack: %v", err) - } - output, err := proxier.exec.Command(conntrackPath, parameters...).CombinedOutput() - if err != nil { - return fmt.Errorf("Conntrack command returned: %q, error message: %s", string(output), err) - } - return nil -} - // This is where all of the iptables-save/restore calls happen. // The only other iptables rules are those that are setup in iptablesInit() // assumes proxier.mu is held @@ -1392,10 +1366,9 @@ func (proxier *Proxier) syncProxyRules() { // https://github.com/docker/docker/issues/8795 // https://github.com/kubernetes/kubernetes/issues/31983 func (proxier *Proxier) clearUdpConntrackForPort(port int) { - var err error = nil glog.V(2).Infof("Deleting conntrack entries for udp connections") if port > 0 { - err = proxier.execConntrackTool("-D", "-p", "udp", "--dport", strconv.Itoa(port)) + err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "-p", "udp", "--dport", strconv.Itoa(port)) if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { glog.Errorf("conntrack return with error: %v", err) } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 7fbc27c1d4..6fb4532174 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -172,57 +172,6 @@ func TestGetChainLinesMultipleTables(t *testing.T) { checkAllLines(t, utiliptables.TableNAT, []byte(iptables_save), expected) } -func TestExecConntrackTool(t *testing.T) { - fcmd := exec.FakeCmd{ - CombinedOutputScript: []exec.FakeCombinedOutputAction{ - func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, - func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, - func() ([]byte, error) { - return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") - }, - }, - } - fexec := exec.FakeExec{ - CommandScript: []exec.FakeCommandAction{ - func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, - }, - LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, - } - - fakeProxier := Proxier{exec: &fexec} - - testCases := [][]string{ - {"-L", "-p", "udp"}, - {"-D", "-p", "udp", "-d", "10.0.240.1"}, - {"-D", "-p", "udp", "--orig-dst", "10.240.0.2", "--dst-nat", "10.0.10.2"}, - } - - expectErr := []bool{false, false, true} - - for i := range testCases { - err := fakeProxier.execConntrackTool(testCases[i]...) - - if expectErr[i] { - if err == nil { - t.Errorf("expected err, got %v", err) - } - } else { - if err != nil { - t.Errorf("expected success, got %v", err) - } - } - - execCmd := strings.Join(fcmd.CombinedOutputLog[i], " ") - expectCmd := fmt.Sprintf("%s %s", "conntrack", strings.Join(testCases[i], " ")) - - if execCmd != expectCmd { - t.Errorf("expect execute command: %s, but got: %s", expectCmd, execCmd) - } - } -} - func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, port int, protocol api.Protocol, onlyNodeLocalEndpoints bool) *serviceInfo { return &serviceInfo{ sessionAffinityType: api.ServiceAffinityNone, // default @@ -296,54 +245,6 @@ func TestDeleteEndpointConnections(t *testing.T) { } } -func TestDeleteServiceConnections(t *testing.T) { - fcmd := exec.FakeCmd{ - CombinedOutputScript: []exec.FakeCombinedOutputAction{ - func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, - func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, - func() ([]byte, error) { - return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") - }, - }, - } - fexec := exec.FakeExec{ - CommandScript: []exec.FakeCommandAction{ - func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, - }, - LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, - } - - fakeProxier := Proxier{exec: &fexec} - - testCases := [][]string{ - { - "10.240.0.3", - "10.240.0.5", - }, - { - "10.240.0.4", - }, - } - - svcCount := 0 - for i := range testCases { - fakeProxier.deleteServiceConnections(testCases[i]) - for _, ip := range testCases[i] { - expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", ip) - execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ") - if expectCommand != execCommand { - t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand) - } - svcCount += 1 - } - if svcCount != fexec.CommandCalls { - t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls) - } - } -} - type fakeClosable struct { closed bool } diff --git a/pkg/proxy/userspace/BUILD b/pkg/proxy/userspace/BUILD index 2ed2504f08..d58780bc6f 100644 --- a/pkg/proxy/userspace/BUILD +++ b/pkg/proxy/userspace/BUILD @@ -23,6 +23,8 @@ go_library( deps = [ "//pkg/api:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/proxy/util:go_default_library", + "//pkg/util/exec:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/slice:go_default_library", "//vendor:github.com/golang/glog", @@ -30,6 +32,7 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/util/errors", "//vendor:k8s.io/apimachinery/pkg/util/net", "//vendor:k8s.io/apimachinery/pkg/util/runtime", + "//vendor:k8s.io/apimachinery/pkg/util/sets", "//vendor:k8s.io/apimachinery/pkg/util/wait", ], ) @@ -46,6 +49,7 @@ go_test( deps = [ "//pkg/api:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/util/exec:go_default_library", "//pkg/util/iptables/testing:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/types", diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 7b375739b8..79b9b61a76 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -33,6 +33,9 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + utilproxy "k8s.io/kubernetes/pkg/proxy/util" + utilexec "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/iptables" ) @@ -106,6 +109,7 @@ type Proxier struct { hostIP net.IP proxyPorts PortAllocator makeProxySocket ProxySocketFunc + exec utilexec.Interface } // assert Proxier is a ProxyProvider @@ -150,15 +154,15 @@ func IsProxyLocked(err error) bool { // if iptables fails to update or acquire the initial lock. Once a proxier is // created, it will keep iptables up to date in the background and will not // terminate if a particular iptables call fails. -func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) { - return NewCustomProxier(loadBalancer, listenIP, iptables, pr, syncPeriod, minSyncPeriod, udpIdleTimeout, newProxySocket) +func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, exec utilexec.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) { + return NewCustomProxier(loadBalancer, listenIP, iptables, exec, pr, syncPeriod, minSyncPeriod, udpIdleTimeout, newProxySocket) } // NewCustomProxier functions similarly to NewProxier, returing a new Proxier // for the given LoadBalancer and address. The new proxier is constructed using // the ProxySocket constructor provided, however, instead of constructing the // default ProxySockets. -func NewCustomProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) { +func NewCustomProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, exec utilexec.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) { if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) { return nil, ErrProxyOnLocalhost } @@ -176,10 +180,10 @@ func NewCustomProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptab proxyPorts := newPortAllocator(pr) glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP) - return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout, makeProxySocket) + return createProxier(loadBalancer, listenIP, iptables, exec, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout, makeProxySocket) } -func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) { +func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, exec utilexec.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) { // convenient to pass nil for tests.. if proxyPorts == nil { proxyPorts = newPortAllocator(utilnet.PortRange{}) @@ -206,6 +210,7 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables hostIP: hostIP, proxyPorts: proxyPorts, makeProxySocket: makeProxySocket, + exec: exec, }, nil } @@ -469,11 +474,18 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) { proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes) } } + + staleUDPServices := sets.NewString() proxier.mu.Lock() defer proxier.mu.Unlock() for name, info := range proxier.serviceMap { if !activeServices[name] { glog.V(1).Infof("Stopping service %q", name) + + if proxier.serviceMap[name].protocol == api.ProtocolUDP { + staleUDPServices.Insert(proxier.serviceMap[name].portal.ip.String()) + } + err := proxier.closePortal(name, info) if err != nil { glog.Errorf("Failed to close portal for %q: %v", name, err) @@ -485,6 +497,8 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) { proxier.loadBalancer.DeleteService(name) } } + + utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List()) } func sameConfig(info *ServiceInfo, service *api.Service, port *api.ServicePort) bool { diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index c5f327667f..ed5080c202 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/util/exec" ipttest "k8s.io/kubernetes/pkg/util/iptables/testing" ) @@ -211,7 +212,9 @@ func TestTCPProxy(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -239,7 +242,9 @@ func TestUDPProxy(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -267,7 +272,9 @@ func TestUDPProxyTimeout(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -304,7 +311,9 @@ func TestMultiPortProxy(t *testing.T) { }}, }}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -333,7 +342,9 @@ func TestMultiPortOnServiceUpdate(t *testing.T) { serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"} serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"} - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -396,7 +407,9 @@ func TestTCPProxyStop(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -441,7 +454,9 @@ func TestUDPProxyStop(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -480,7 +495,9 @@ func TestTCPProxyUpdateDelete(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -518,7 +535,9 @@ func TestUDPProxyUpdateDelete(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -555,7 +574,9 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { } lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -609,7 +630,9 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { } lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -664,7 +687,9 @@ func TestTCPProxyUpdatePort(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -713,7 +738,9 @@ func TestUDPProxyUpdatePort(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -759,7 +786,9 @@ func TestProxyUpdatePublicIPs(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -811,7 +840,9 @@ func TestProxyUpdatePortal(t *testing.T) { } lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) + fexec := makeFakeExec() + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -868,4 +899,18 @@ func TestProxyUpdatePortal(t *testing.T) { waitForNumProxyLoops(t, p, 1) } +func makeFakeExec() *exec.FakeExec { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + }, + } + return &exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, + } +} + // TODO(justinsb): Add test for nodePort conflict detection, once we have nodePort wired in diff --git a/pkg/proxy/util/BUILD b/pkg/proxy/util/BUILD new file mode 100644 index 0000000000..24e16ffb3c --- /dev/null +++ b/pkg/proxy/util/BUILD @@ -0,0 +1,40 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_library( + name = "go_default_library", + srcs = ["conntrack.go"], + tags = ["automanaged"], + deps = [ + "//pkg/util/exec:go_default_library", + "//vendor:github.com/golang/glog", + ], +) + +go_test( + name = "go_default_test", + srcs = ["conntrack_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = ["//pkg/util/exec:go_default_library"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/proxy/util/conntrack.go b/pkg/proxy/util/conntrack.go new file mode 100644 index 0000000000..436045ecb3 --- /dev/null +++ b/pkg/proxy/util/conntrack.go @@ -0,0 +1,58 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "strings" + + "k8s.io/kubernetes/pkg/util/exec" + + "github.com/golang/glog" +) + +// Utilities for dealing with conntrack + +const noConnectionToDelete = "0 flow entries have been deleted" + +// DeleteServiceConnection uses the conntrack tool to delete the conntrack entries +// for the UDP connections specified by the given service IPs +func DeleteServiceConnections(execer exec.Interface, svcIPs []string) { + for _, ip := range svcIPs { + glog.V(2).Infof("Deleting connection tracking state for service IP %s", ip) + err := ExecConntrackTool(execer, "-D", "--orig-dst", ip, "-p", "udp") + if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { + // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. + // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it + // is expensive to baby-sit all udp connections to kubernetes services. + glog.Errorf("conntrack returned error: %v", err) + } + } +} + +// ExecConntrackTool executes the conntrack tool using the given parameters +func ExecConntrackTool(execer exec.Interface, parameters ...string) error { + conntrackPath, err := execer.LookPath("conntrack") + if err != nil { + return fmt.Errorf("error looking for path of conntrack: %v", err) + } + output, err := execer.Command(conntrackPath, parameters...).CombinedOutput() + if err != nil { + return fmt.Errorf("conntrack command returned: %q, error message: %s", string(output), err) + } + return nil +} diff --git a/pkg/proxy/util/conntrack_test.go b/pkg/proxy/util/conntrack_test.go new file mode 100644 index 0000000000..05729f51e3 --- /dev/null +++ b/pkg/proxy/util/conntrack_test.go @@ -0,0 +1,120 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "strings" + "testing" + + "k8s.io/kubernetes/pkg/util/exec" +) + +func TestExecConntrackTool(t *testing.T) { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + func() ([]byte, error) { + return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") + }, + }, + } + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, + } + + testCases := [][]string{ + {"-L", "-p", "udp"}, + {"-D", "-p", "udp", "-d", "10.0.240.1"}, + {"-D", "-p", "udp", "--orig-dst", "10.240.0.2", "--dst-nat", "10.0.10.2"}, + } + + expectErr := []bool{false, false, true} + + for i := range testCases { + err := ExecConntrackTool(&fexec, testCases[i]...) + + if expectErr[i] { + if err == nil { + t.Errorf("expected err, got %v", err) + } + } else { + if err != nil { + t.Errorf("expected success, got %v", err) + } + } + + execCmd := strings.Join(fcmd.CombinedOutputLog[i], " ") + expectCmd := fmt.Sprintf("%s %s", "conntrack", strings.Join(testCases[i], " ")) + + if execCmd != expectCmd { + t.Errorf("expect execute command: %s, but got: %s", expectCmd, execCmd) + } + } +} + +func TestDeleteServiceConnections(t *testing.T) { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + func() ([]byte, error) { + return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") + }, + }, + } + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, + } + + testCases := [][]string{ + { + "10.240.0.3", + "10.240.0.5", + }, + { + "10.240.0.4", + }, + } + + svcCount := 0 + for i := range testCases { + DeleteServiceConnections(&fexec, testCases[i]) + for _, ip := range testCases[i] { + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", ip) + execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ") + if expectCommand != execCommand { + t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand) + } + svcCount += 1 + } + if svcCount != fexec.CommandCalls { + t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls) + } + } +} diff --git a/test/test_owners.csv b/test/test_owners.csv index 4f82a186a9..d7a7bdcede 100644 --- a/test/test_owners.csv +++ b/test/test_owners.csv @@ -768,6 +768,7 @@ k8s.io/kubernetes/pkg/proxy/config,ixdy,1, k8s.io/kubernetes/pkg/proxy/healthcheck,rrati,0, k8s.io/kubernetes/pkg/proxy/iptables,freehan,0, k8s.io/kubernetes/pkg/proxy/userspace,luxas,1, +k8s.io/kubernetes/pkg/proxy/util,knobunc,0, k8s.io/kubernetes/pkg/proxy/winuserspace,jbhurat,0, k8s.io/kubernetes/pkg/quota,sttts,1, k8s.io/kubernetes/pkg/quota/evaluator/core,yifan-gu,1,