From 098a4467fea04397dca83b552920f781830af96f Mon Sep 17 00:00:00 2001 From: Pavithra Ramesh Date: Wed, 31 Jan 2018 18:20:22 -0800 Subject: [PATCH] Remove conntrack entry on udp rule add. Moved conntrack util outside of proxy pkg Added warning message if conntrack binary is not found Addressed review comments. ran gofmt --- pkg/kubelet/network/hostport/BUILD | 3 ++ .../network/hostport/hostport_manager.go | 40 +++++++++++++-- .../network/hostport/hostport_manager_test.go | 2 + pkg/proxy/iptables/BUILD | 2 + pkg/proxy/iptables/proxier.go | 9 ++-- pkg/proxy/iptables/proxier_test.go | 5 +- pkg/proxy/ipvs/BUILD | 1 + pkg/proxy/ipvs/proxier.go | 11 +++-- pkg/proxy/userspace/BUILD | 2 + pkg/proxy/userspace/proxier.go | 4 +- pkg/proxy/util/BUILD | 5 -- pkg/util/BUILD | 1 + pkg/util/conntrack/BUILD | 41 ++++++++++++++++ .../util => util/conntrack}/conntrack.go | 49 ++++++++++++------- .../util => util/conntrack}/conntrack_test.go | 11 +++-- 15 files changed, 142 insertions(+), 44 deletions(-) create mode 100644 pkg/util/conntrack/BUILD rename pkg/{proxy/util => util/conntrack}/conntrack.go (66%) rename pkg/{proxy/util => util/conntrack}/conntrack_test.go (96%) diff --git a/pkg/kubelet/network/hostport/BUILD b/pkg/kubelet/network/hostport/BUILD index 4f748e75a3..839f72904c 100644 --- a/pkg/kubelet/network/hostport/BUILD +++ b/pkg/kubelet/network/hostport/BUILD @@ -17,11 +17,13 @@ go_library( importpath = "k8s.io/kubernetes/pkg/kubelet/network/hostport", deps = [ "//pkg/proxy/iptables:go_default_library", + "//pkg/util/conntrack:go_default_library", "//pkg/util/iptables:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/utils/exec:go_default_library", ], ) @@ -38,6 +40,7 @@ go_test( "//pkg/util/iptables:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/utils/exec:go_default_library", ], ) diff --git a/pkg/kubelet/network/hostport/hostport_manager.go b/pkg/kubelet/network/hostport/hostport_manager.go index b355dbbb2e..9d9100a014 100644 --- a/pkg/kubelet/network/hostport/hostport_manager.go +++ b/pkg/kubelet/network/hostport/hostport_manager.go @@ -26,9 +26,12 @@ import ( "sync" "github.com/golang/glog" + "k8s.io/api/core/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables" + "k8s.io/kubernetes/pkg/util/conntrack" utiliptables "k8s.io/kubernetes/pkg/util/iptables" + "k8s.io/utils/exec" ) // HostPortManager is an interface for adding and removing hostport for a given pod sandbox. @@ -44,18 +47,26 @@ type HostPortManager interface { } type hostportManager struct { - hostPortMap map[hostport]closeable - iptables utiliptables.Interface - portOpener hostportOpener - mu sync.Mutex + hostPortMap map[hostport]closeable + execer exec.Interface + conntrackFound bool + iptables utiliptables.Interface + portOpener hostportOpener + mu sync.Mutex } func NewHostportManager(iptables utiliptables.Interface) HostPortManager { - return &hostportManager{ + h := &hostportManager{ hostPortMap: make(map[hostport]closeable), + execer: exec.New(), iptables: iptables, portOpener: openLocalPort, } + h.conntrackFound = conntrack.Exists(h.execer) + if !h.conntrackFound { + glog.Warningf("The binary conntrack is not installed, this can cause failures in network connection cleanup.") + } + return h } func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInterfaceName string) (err error) { @@ -103,10 +114,14 @@ func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInt } newChains := []utiliptables.Chain{} + conntrackPortsToRemove := []int{} for _, pm := range hostportMappings { protocol := strings.ToLower(string(pm.Protocol)) chain := getHostportChain(id, pm) newChains = append(newChains, chain) + if pm.Protocol == v1.ProtocolUDP { + conntrackPortsToRemove = append(conntrackPortsToRemove, int(pm.HostPort)) + } // Add new hostport chain writeLine(natChains, utiliptables.MakeChainLine(chain)) @@ -150,6 +165,21 @@ func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInt // clean up opened host port if encounter any error return utilerrors.NewAggregate([]error{err, hm.closeHostports(hostportMappings)}) } + isIpv6 := conntrack.IsIPv6(podPortMapping.IP) + + // Remove conntrack entries just after adding the new iptables rules. If the conntrack entry is removed along with + // the IP tables rule, it can be the case that the packets received by the node after iptables rule removal will + // create a new conntrack entry without any DNAT. That will result in blackhole of the traffic even after correct + // iptables rules have been added back. + if hm.execer != nil && hm.conntrackFound { + glog.Infof("Starting to delete udp conntrack entries: %v, isIPv6 - %v", conntrackPortsToRemove, isIpv6) + for _, port := range conntrackPortsToRemove { + err = conntrack.ClearEntriesForPort(hm.execer, port, isIpv6, v1.ProtocolUDP) + if err != nil { + glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", port, err) + } + } + } return nil } diff --git a/pkg/kubelet/network/hostport/hostport_manager_test.go b/pkg/kubelet/network/hostport/hostport_manager_test.go index 289d3b3171..019d36e78d 100644 --- a/pkg/kubelet/network/hostport/hostport_manager_test.go +++ b/pkg/kubelet/network/hostport/hostport_manager_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/api/core/v1" utiliptables "k8s.io/kubernetes/pkg/util/iptables" + "k8s.io/utils/exec" ) func TestHostportManager(t *testing.T) { @@ -34,6 +35,7 @@ func TestHostportManager(t *testing.T) { hostPortMap: make(map[hostport]closeable), iptables: iptables, portOpener: portOpener.openFakeSocket, + execer: exec.New(), } testCases := []struct { diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 796bdcb85e..24adfcb059 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -21,6 +21,7 @@ go_library( "//pkg/proxy/metrics:go_default_library", "//pkg/proxy/util:go_default_library", "//pkg/util/async:go_default_library", + "//pkg/util/conntrack:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/sysctl:go_default_library", "//pkg/util/version:go_default_library", @@ -42,6 +43,7 @@ go_test( "//pkg/proxy:go_default_library", "//pkg/proxy/util:go_default_library", "//pkg/util/async:go_default_library", + "//pkg/util/conntrack:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/iptables/testing:go_default_library", "//vendor/github.com/golang/glog:go_default_library", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 5595318257..7e49f72c7a 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -46,6 +46,7 @@ import ( "k8s.io/kubernetes/pkg/proxy/metrics" utilproxy "k8s.io/kubernetes/pkg/proxy/util" "k8s.io/kubernetes/pkg/util/async" + "k8s.io/kubernetes/pkg/util/conntrack" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" utilversion "k8s.io/kubernetes/pkg/util/version" @@ -682,7 +683,7 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE for _, epSvcPair := range connectionMap { if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == api.ProtocolUDP { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) - err := utilproxy.ClearUDPConntrackForPeers(proxier.exec, svcInfo.ClusterIP(), endpointIP) + err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP(), endpointIP, v1.ProtocolUDP) if err != nil { glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) } @@ -839,7 +840,7 @@ func (proxier *Proxier) syncProxyRules() { glog.Errorf("Failed to cast serviceInfo %q", svcName.String()) continue } - isIPv6 := utilproxy.IsIPv6(svcInfo.clusterIP) + isIPv6 := conntrack.IsIPv6(svcInfo.clusterIP) protocol := strings.ToLower(string(svcInfo.protocol)) svcNameString = svcInfo.serviceNameString @@ -1052,7 +1053,7 @@ func (proxier *Proxier) syncProxyRules() { // This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services. // This only affects UDP connections, which are not common. // See issue: https://github.com/kubernetes/kubernetes/issues/49881 - err := utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port, isIPv6) + err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP) if err != nil { glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err) } @@ -1376,7 +1377,7 @@ func (proxier *Proxier) syncProxyRules() { // Finish housekeeping. // TODO: these could be made more consistent. for _, svcIP := range staleServices.UnsortedList() { - if err := utilproxy.ClearUDPConntrackForIP(proxier.exec, svcIP); err != nil { + if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil { glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err) } } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index d52acfd67d..046411d745 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/proxy" utilproxy "k8s.io/kubernetes/pkg/proxy/util" "k8s.io/kubernetes/pkg/util/async" + "k8s.io/kubernetes/pkg/util/conntrack" utiliptables "k8s.io/kubernetes/pkg/util/iptables" iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" "k8s.io/utils/exec" @@ -222,7 +223,7 @@ func TestDeleteEndpointConnections(t *testing.T) { svcPort: 80, protocol: UDP, endpoint: "10.240.0.3:80", - simulatedErr: utilproxy.NoConnectionToDelete, + simulatedErr: conntrack.NoConnectionToDelete, }, { description: "V4 UDP, unexpected error, should be glogged", svcName: "v4-udp-simulated-error", @@ -328,7 +329,7 @@ func TestDeleteEndpointConnections(t *testing.T) { // Check the number of new glog errors var expGlogErrs int64 - if tc.simulatedErr != "" && tc.simulatedErr != utilproxy.NoConnectionToDelete { + if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete { expGlogErrs = 1 } glogErrs := glog.Stats.Error.Lines() - priorGlogErrs diff --git a/pkg/proxy/ipvs/BUILD b/pkg/proxy/ipvs/BUILD index e6c93255ab..ae5d09d083 100644 --- a/pkg/proxy/ipvs/BUILD +++ b/pkg/proxy/ipvs/BUILD @@ -85,6 +85,7 @@ go_library( "//pkg/proxy/metrics:go_default_library", "//pkg/proxy/util:go_default_library", "//pkg/util/async:go_default_library", + "//pkg/util/conntrack:go_default_library", "//pkg/util/ipset:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/ipvs:go_default_library", diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index c3933f01be..40e50680f5 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -45,6 +45,7 @@ import ( "k8s.io/kubernetes/pkg/proxy/metrics" utilproxy "k8s.io/kubernetes/pkg/proxy/util" "k8s.io/kubernetes/pkg/util/async" + "k8s.io/kubernetes/pkg/util/conntrack" utilipset "k8s.io/kubernetes/pkg/util/ipset" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilipvs "k8s.io/kubernetes/pkg/util/ipvs" @@ -295,7 +296,7 @@ func NewProxier(ipt utiliptables.Interface, healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps - isIPv6 := utilproxy.IsIPv6(nodeIP) + isIPv6 := conntrack.IsIPv6(nodeIP) glog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6) @@ -1116,8 +1117,8 @@ func (proxier *Proxier) syncProxyRules() { continue } if lp.Protocol == "udp" { - isIPv6 := utilproxy.IsIPv6(svcInfo.clusterIP) - utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port, isIPv6) + isIPv6 := conntrack.IsIPv6(svcInfo.clusterIP) + conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, clientv1.ProtocolUDP) } replacementPortsMap[lp] = socket } // We're holding the port, so it's OK to install ipvs rules. @@ -1349,7 +1350,7 @@ func (proxier *Proxier) syncProxyRules() { // Finish housekeeping. // TODO: these could be made more consistent. for _, svcIP := range staleServices.UnsortedList() { - if err := utilproxy.ClearUDPConntrackForIP(proxier.exec, svcIP); err != nil { + if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, clientv1.ProtocolUDP); err != nil { glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err) } } @@ -1363,7 +1364,7 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE for _, epSvcPair := range connectionMap { if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == api.ProtocolUDP { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) - err := utilproxy.ClearUDPConntrackForPeers(proxier.exec, svcInfo.ClusterIP(), endpointIP) + err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP(), endpointIP, clientv1.ProtocolUDP) if err != nil { glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) } diff --git a/pkg/proxy/userspace/BUILD b/pkg/proxy/userspace/BUILD index 95c29711e9..715e176fb9 100644 --- a/pkg/proxy/userspace/BUILD +++ b/pkg/proxy/userspace/BUILD @@ -57,9 +57,11 @@ go_library( "//pkg/apis/core/helper:go_default_library", "//pkg/proxy:go_default_library", "//pkg/proxy/util:go_default_library", + "//pkg/util/conntrack:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/slice:go_default_library", "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library", diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 3d11391a95..139a366536 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -26,6 +26,7 @@ import ( "time" "github.com/golang/glog" + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" utilnet "k8s.io/apimachinery/pkg/util/net" api "k8s.io/kubernetes/pkg/apis/core" @@ -36,6 +37,7 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" proxyutil "k8s.io/kubernetes/pkg/proxy/util" + "k8s.io/kubernetes/pkg/util/conntrack" "k8s.io/kubernetes/pkg/util/iptables" utilexec "k8s.io/utils/exec" ) @@ -507,7 +509,7 @@ func (proxier *Proxier) unmergeService(service *api.Service, existingPorts sets. proxier.loadBalancer.DeleteService(serviceName) } for _, svcIP := range staleUDPServices.UnsortedList() { - if err := proxyutil.ClearUDPConntrackForIP(proxier.exec, svcIP); err != nil { + if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil { glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err) } } diff --git a/pkg/proxy/util/BUILD b/pkg/proxy/util/BUILD index 712edc76e8..681c510bdf 100644 --- a/pkg/proxy/util/BUILD +++ b/pkg/proxy/util/BUILD @@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ - "conntrack.go", "endpoints.go", "port.go", "utils.go", @@ -15,14 +14,12 @@ go_library( "//pkg/apis/core/helper:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", - "//vendor/k8s.io/utils/exec:go_default_library", ], ) go_test( name = "go_default_test", srcs = [ - "conntrack_test.go", "endpoints_test.go", "port_test.go", "utils_test.go", @@ -32,8 +29,6 @@ go_test( "//pkg/apis/core:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", - "//vendor/k8s.io/utils/exec:go_default_library", - "//vendor/k8s.io/utils/exec/testing:go_default_library", ], ) diff --git a/pkg/util/BUILD b/pkg/util/BUILD index 67d1d92a46..1622fe7150 100644 --- a/pkg/util/BUILD +++ b/pkg/util/BUILD @@ -15,6 +15,7 @@ filegroup( "//pkg/util/bandwidth:all-srcs", "//pkg/util/config:all-srcs", "//pkg/util/configz:all-srcs", + "//pkg/util/conntrack:all-srcs", "//pkg/util/dbus:all-srcs", "//pkg/util/ebtables:all-srcs", "//pkg/util/env:all-srcs", diff --git a/pkg/util/conntrack/BUILD b/pkg/util/conntrack/BUILD new file mode 100644 index 0000000000..b71382a38d --- /dev/null +++ b/pkg/util/conntrack/BUILD @@ -0,0 +1,41 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "conntrack.go", + ], + importpath = "k8s.io/kubernetes/pkg/util/conntrack", + visibility = ["//visibility:public"], + deps = [ + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/utils/exec:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = [ + "conntrack_test.go", + ], + embed = [":go_default_library"], + deps = [ + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/utils/exec:go_default_library", + "//vendor/k8s.io/utils/exec/testing:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/proxy/util/conntrack.go b/pkg/util/conntrack/conntrack.go similarity index 66% rename from pkg/proxy/util/conntrack.go rename to pkg/util/conntrack/conntrack.go index d99d61b5db..bc08c2d6d7 100644 --- a/pkg/proxy/util/conntrack.go +++ b/pkg/util/conntrack/conntrack.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package conntrack import ( "fmt" @@ -22,22 +22,30 @@ import ( "strconv" "strings" + "k8s.io/api/core/v1" "k8s.io/utils/exec" ) // Utilities for dealing with conntrack +// NoConnectionToDelete is the error string returned by conntrack when no matching connections are found const NoConnectionToDelete = "0 flow entries have been deleted" +// IsIPv6 returns true if the given ip address is a valid ipv6 address func IsIPv6(netIP net.IP) bool { return netIP != nil && netIP.To4() == nil } +// IsIPv6String returns true if the given string is a valid ipv6 address func IsIPv6String(ip string) bool { netIP := net.ParseIP(ip) return IsIPv6(netIP) } +func protoStr(proto v1.Protocol) string { + return strings.ToLower(string(proto)) +} + func parametersWithFamily(isIPv6 bool, parameters ...string) []string { if isIPv6 { parameters = append(parameters, "-f", "ipv6") @@ -45,11 +53,11 @@ func parametersWithFamily(isIPv6 bool, parameters ...string) []string { return parameters } -// ClearUDPConntrackForIP uses the conntrack tool to delete the conntrack entries +// ClearEntriesForIP uses the conntrack tool to delete the conntrack entries // for the UDP connections specified by the given service IP -func ClearUDPConntrackForIP(execer exec.Interface, ip string) error { - parameters := parametersWithFamily(IsIPv6String(ip), "-D", "--orig-dst", ip, "-p", "udp") - err := ExecConntrackTool(execer, parameters...) +func ClearEntriesForIP(execer exec.Interface, ip string, protocol v1.Protocol) error { + parameters := parametersWithFamily(IsIPv6String(ip), "-D", "--orig-dst", ip, "-p", protoStr(protocol)) + err := Exec(execer, parameters...) if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it @@ -59,8 +67,8 @@ func ClearUDPConntrackForIP(execer exec.Interface, ip string) error { return nil } -// ExecConntrackTool executes the conntrack tool using the given parameters -func ExecConntrackTool(execer exec.Interface, parameters ...string) error { +// Exec executes the conntrack tool using the given parameters +func Exec(execer exec.Interface, parameters ...string) error { conntrackPath, err := execer.LookPath("conntrack") if err != nil { return fmt.Errorf("error looking for path of conntrack: %v", err) @@ -72,29 +80,36 @@ func ExecConntrackTool(execer exec.Interface, parameters ...string) error { return nil } -// ClearUDPConntrackForPort uses the conntrack tool to delete the conntrack entries -// for the UDP connections specified by the port. +// Exists returns true if conntrack binary is installed. +func Exists(execer exec.Interface) bool { + _, err := execer.LookPath("conntrack") + return err == nil +} + +// ClearEntriesForPort uses the conntrack tool to delete the conntrack entries +// for connections specified by the port. // When a packet arrives, it will not go through NAT table again, because it is not "the first" packet. // The solution is clearing the conntrack. Known issues: // https://github.com/docker/docker/issues/8795 // https://github.com/kubernetes/kubernetes/issues/31983 -func ClearUDPConntrackForPort(execer exec.Interface, port int, isIPv6 bool) error { +func ClearEntriesForPort(execer exec.Interface, port int, isIPv6 bool, protocol v1.Protocol) error { if port <= 0 { return fmt.Errorf("Wrong port number. The port number must be greater than zero") } - parameters := parametersWithFamily(isIPv6, "-D", "-p", "udp", "--dport", strconv.Itoa(port)) - err := ExecConntrackTool(execer, parameters...) + parameters := parametersWithFamily(isIPv6, "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port)) + err := Exec(execer, parameters...) if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err) } return nil } -// ClearUDPConntrackForPeers uses the conntrack tool to delete the conntrack entries -// for the UDP connections specified by the {origin, dest} IP pair. -func ClearUDPConntrackForPeers(execer exec.Interface, origin, dest string) error { - parameters := parametersWithFamily(IsIPv6String(origin), "-D", "--orig-dst", origin, "--dst-nat", dest, "-p", "udp") - err := ExecConntrackTool(execer, parameters...) +// ClearEntriesForNAT uses the conntrack tool to delete the conntrack entries +// for connections specified by the {origin, dest} IP pair. +func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1.Protocol) error { + parameters := parametersWithFamily(IsIPv6String(origin), "-D", "--orig-dst", origin, "--dst-nat", dest, + "-p", protoStr(protocol)) + err := Exec(execer, parameters...) if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it diff --git a/pkg/proxy/util/conntrack_test.go b/pkg/util/conntrack/conntrack_test.go similarity index 96% rename from pkg/proxy/util/conntrack_test.go rename to pkg/util/conntrack/conntrack_test.go index f3d8a14526..c11fac76fc 100644 --- a/pkg/proxy/util/conntrack_test.go +++ b/pkg/util/conntrack/conntrack_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package conntrack import ( "fmt" @@ -22,6 +22,7 @@ import ( "strings" "testing" + "k8s.io/api/core/v1" "k8s.io/utils/exec" fakeexec "k8s.io/utils/exec/testing" ) @@ -61,7 +62,7 @@ func TestExecConntrackTool(t *testing.T) { expectErr := []bool{false, false, true} for i := range testCases { - err := ExecConntrackTool(&fexec, testCases[i]...) + err := Exec(&fexec, testCases[i]...) if expectErr[i] { if err == nil { @@ -115,7 +116,7 @@ func TestClearUDPConntrackForIP(t *testing.T) { svcCount := 0 for _, tc := range testCases { - if err := ClearUDPConntrackForIP(&fexec, tc.ip); err != nil { + if err := ClearEntriesForIP(&fexec, tc.ip, v1.ProtocolUDP); err != nil { t.Errorf("%s test case:, Unexpected error: %v", tc.name, err) } expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.ip) + familyParamStr(IsIPv6String(tc.ip)) @@ -160,7 +161,7 @@ func TestClearUDPConntrackForPort(t *testing.T) { } svcCount := 0 for _, tc := range testCases { - err := ClearUDPConntrackForPort(&fexec, tc.port, tc.isIPv6) + err := ClearEntriesForPort(&fexec, tc.port, tc.isIPv6, v1.ProtocolUDP) if err != nil { t.Errorf("%s test case: Unexpected error: %v", tc.name, err) } @@ -218,7 +219,7 @@ func TestDeleteUDPConnections(t *testing.T) { } svcCount := 0 for i, tc := range testCases { - err := ClearUDPConntrackForPeers(&fexec, tc.origin, tc.dest) + err := ClearEntriesForNAT(&fexec, tc.origin, tc.dest, v1.ProtocolUDP) if err != nil { t.Errorf("%s test case: unexpected error: %v", tc.name, err) }