diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 1fb994be32..4734de066d 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -607,7 +607,13 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE for _, epSvcPair := range connectionMap { if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.GetProtocol() == v1.ProtocolUDP { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) - err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, v1.ProtocolUDP) + nodePort := svcInfo.GetNodePort() + var err error + if nodePort != 0 { + err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP) + } else { + err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, v1.ProtocolUDP) + } if err != nil { klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) } diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index 8386e62c0e..307c3d1b3b 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -74,6 +74,11 @@ func (info *BaseServiceInfo) GetHealthCheckNodePort() int { return info.HealthCheckNodePort } +// GetNodePort is part of the ServicePort interface. +func (info *BaseServiceInfo) GetNodePort() int { + return info.NodePort +} + func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo { onlyNodeLocalEndpoints := false if apiservice.RequestsOnlyLocalTraffic(service) { diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index f38937068c..f77f9ed0f3 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -54,6 +54,8 @@ type ServicePort interface { GetProtocol() v1.Protocol // GetHealthCheckNodePort returns service health check node port if present. If return 0, it means not present. GetHealthCheckNodePort() int + // GetNodePort returns a service Node port if present. If return 0, it means not present. + GetNodePort() int } // Endpoint in an interface which abstracts information about an endpoint. diff --git a/pkg/util/conntrack/conntrack.go b/pkg/util/conntrack/conntrack.go index 353bc0d0c2..5569b411dc 100644 --- a/pkg/util/conntrack/conntrack.go +++ b/pkg/util/conntrack/conntrack.go @@ -107,3 +107,19 @@ func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1. } return nil } + +// ClearEntriesForPortNAT uses the conntrack tool to delete the contrack entries +// for connections specified by the {dest IP, port} pair. +// Known issue: +// https://github.com/kubernetes/kubernetes/issues/59368 +func ClearEntriesForPortNAT(execer exec.Interface, dest string, port int, protocol v1.Protocol) error { + if port <= 0 { + return fmt.Errorf("Wrong port number. The port number must be greater then zero") + } + parameters := parametersWithFamily(utilnet.IsIPv6String(dest), "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port), "--dst-nat", dest) + err := Exec(execer, parameters...) + if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { + return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err) + } + return nil +} diff --git a/pkg/util/conntrack/conntrack_test.go b/pkg/util/conntrack/conntrack_test.go index 6e1c18735f..af05a65e12 100644 --- a/pkg/util/conntrack/conntrack_test.go +++ b/pkg/util/conntrack/conntrack_test.go @@ -234,3 +234,50 @@ func TestDeleteUDPConnections(t *testing.T) { t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls) } } + +func TestClearUDPConntrackForPortNAT(t *testing.T) { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + func() ([]byte, error) { + return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted") + }, + func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, + } + testCases := []struct { + name string + port int + dest string + }{ + { + name: "IPv4 success", + port: 30211, + dest: "1.2.3.4", + }, + } + svcCount := 0 + for i, tc := range testCases { + err := ClearEntriesForPortNAT(&fexec, tc.dest, tc.port, v1.ProtocolUDP) + if err != nil { + t.Errorf("%s test case: unexpected error: %v", tc.name, err) + } + expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %d --dst-nat %s", tc.port, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.dest)) + execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ") + if expectCommand != execCommand { + t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand) + } + svcCount++ + } + if svcCount != fexec.CommandCalls { + t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls) + } +}