From 144280e7a7efcb15f4801b4cd8fb74d5b4311c57 Mon Sep 17 00:00:00 2001 From: Jacob Tanenbaum Date: Thu, 29 Nov 2018 11:46:12 -0500 Subject: [PATCH] Correctly Clear conntrack entry on endpoint changes when using nodeport When using NodePort to connect to an endpoint using UDP, if the endpoint is deleted, traffic does not flow once the endpoint is restored. This happens because conntrack holds the state of the connection and the proxy does not correctly clear the conntrack entry for the stale endpoint. Introduced a new function to conntrack ClearEntriesForPortNAT that uses the endpointIP and NodePort to remove the stale conntrack entry and allow traffic to resume when the endpoint is restored. Signed-off-by: Jacob Tanenbaum --- pkg/proxy/iptables/proxier.go | 8 ++++- pkg/proxy/service.go | 5 +++ pkg/proxy/types.go | 2 ++ pkg/util/conntrack/conntrack.go | 16 ++++++++++ pkg/util/conntrack/conntrack_test.go | 47 ++++++++++++++++++++++++++++ 5 files changed, 77 insertions(+), 1 deletion(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 1fb994be32..4734de066d 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -607,7 +607,13 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE for _, epSvcPair := range connectionMap { if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.GetProtocol() == v1.ProtocolUDP { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) - err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, v1.ProtocolUDP) + nodePort := svcInfo.GetNodePort() + var err error + if nodePort != 0 { + err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP) + } else { + err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, v1.ProtocolUDP) + } if err != nil { klog.Errorf("Failed to delete %s endpoint connections, error: %v", 
epSvcPair.ServicePortName.String(), err) } diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index 8386e62c0e..307c3d1b3b 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -74,6 +74,11 @@ func (info *BaseServiceInfo) GetHealthCheckNodePort() int { return info.HealthCheckNodePort } +// GetNodePort is part of the ServicePort interface. +func (info *BaseServiceInfo) GetNodePort() int { + return info.NodePort +} + func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo { onlyNodeLocalEndpoints := false if apiservice.RequestsOnlyLocalTraffic(service) { diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index f38937068c..f77f9ed0f3 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -54,6 +54,8 @@ type ServicePort interface { GetProtocol() v1.Protocol // GetHealthCheckNodePort returns service health check node port if present. If return 0, it means not present. GetHealthCheckNodePort() int + // GetNodePort returns a service Node port if present. If return 0, it means not present. + GetNodePort() int } // Endpoint in an interface which abstracts information about an endpoint. diff --git a/pkg/util/conntrack/conntrack.go b/pkg/util/conntrack/conntrack.go index 353bc0d0c2..5569b411dc 100644 --- a/pkg/util/conntrack/conntrack.go +++ b/pkg/util/conntrack/conntrack.go @@ -107,3 +107,19 @@ func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1. } return nil } + +// ClearEntriesForPortNAT uses the conntrack tool to delete the conntrack entries +// for connections specified by the {dest IP, port} pair. +// Known issue: +// https://github.com/kubernetes/kubernetes/issues/59368 +func ClearEntriesForPortNAT(execer exec.Interface, dest string, port int, protocol v1.Protocol) error { + if port <= 0 { + return fmt.Errorf("Wrong port number. 
The port number must be greater then zero") + } + parameters := parametersWithFamily(utilnet.IsIPv6String(dest), "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port), "--dst-nat", dest) + err := Exec(execer, parameters...) + if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { + return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err) + } + return nil +} diff --git a/pkg/util/conntrack/conntrack_test.go b/pkg/util/conntrack/conntrack_test.go index 6e1c18735f..af05a65e12 100644 --- a/pkg/util/conntrack/conntrack_test.go +++ b/pkg/util/conntrack/conntrack_test.go @@ -234,3 +234,50 @@ func TestDeleteUDPConnections(t *testing.T) { t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls) } } + +func TestClearUDPConntrackForPortNAT(t *testing.T) { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + func() ([]byte, error) { + return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted") + }, + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) 
}, + }, + LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, + } + testCases := []struct { + name string + port int + dest string + }{ + { + name: "IPv4 success", + port: 30211, + dest: "1.2.3.4", + }, + } + svcCount := 0 + for i, tc := range testCases { + err := ClearEntriesForPortNAT(&fexec, tc.dest, tc.port, v1.ProtocolUDP) + if err != nil { + t.Errorf("%s test case: unexpected error: %v", tc.name, err) + } + expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %d --dst-nat %s", tc.port, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.dest)) + execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ") + if expectCommand != execCommand { + t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand) + } + svcCount++ + } + if svcCount != fexec.CommandCalls { + t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls) + } +}