Merge pull request #59286 from prameshj/udp-conntrack

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Delete stale UDP conntrack entries that use hostPort

**What this PR does / why we need it**:
This PR introduces a change to delete stale conntrack entries for UDP connections, specifically for udp connections that use hostPort. When the pod listening on that udp port get updated/restarted(and gets a new ip address), these entries need to be flushed so that ongoing udp connections can recover once the pod is back and the new iptables rules have been installed. 
**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #59033

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```
pull/6/head
Kubernetes Submit Queue 2018-02-23 19:54:08 -08:00 committed by GitHub
commit c1a73ea685
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 142 additions and 44 deletions

View File

@ -17,11 +17,13 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/kubelet/network/hostport", importpath = "k8s.io/kubernetes/pkg/kubelet/network/hostport",
deps = [ deps = [
"//pkg/proxy/iptables:go_default_library", "//pkg/proxy/iptables:go_default_library",
"//pkg/util/conntrack:go_default_library",
"//pkg/util/iptables:go_default_library", "//pkg/util/iptables:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
], ],
) )
@ -38,6 +40,7 @@ go_test(
"//pkg/util/iptables:go_default_library", "//pkg/util/iptables:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
], ],
) )

View File

@ -26,9 +26,12 @@ import (
"sync" "sync"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/api/core/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables" iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables"
"k8s.io/kubernetes/pkg/util/conntrack"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
"k8s.io/utils/exec"
) )
// HostPortManager is an interface for adding and removing hostport for a given pod sandbox. // HostPortManager is an interface for adding and removing hostport for a given pod sandbox.
@ -44,18 +47,26 @@ type HostPortManager interface {
} }
type hostportManager struct { type hostportManager struct {
hostPortMap map[hostport]closeable hostPortMap map[hostport]closeable
iptables utiliptables.Interface execer exec.Interface
portOpener hostportOpener conntrackFound bool
mu sync.Mutex iptables utiliptables.Interface
portOpener hostportOpener
mu sync.Mutex
} }
func NewHostportManager(iptables utiliptables.Interface) HostPortManager { func NewHostportManager(iptables utiliptables.Interface) HostPortManager {
return &hostportManager{ h := &hostportManager{
hostPortMap: make(map[hostport]closeable), hostPortMap: make(map[hostport]closeable),
execer: exec.New(),
iptables: iptables, iptables: iptables,
portOpener: openLocalPort, portOpener: openLocalPort,
} }
h.conntrackFound = conntrack.Exists(h.execer)
if !h.conntrackFound {
glog.Warningf("The binary conntrack is not installed, this can cause failures in network connection cleanup.")
}
return h
} }
func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInterfaceName string) (err error) { func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInterfaceName string) (err error) {
@ -103,10 +114,14 @@ func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInt
} }
newChains := []utiliptables.Chain{} newChains := []utiliptables.Chain{}
conntrackPortsToRemove := []int{}
for _, pm := range hostportMappings { for _, pm := range hostportMappings {
protocol := strings.ToLower(string(pm.Protocol)) protocol := strings.ToLower(string(pm.Protocol))
chain := getHostportChain(id, pm) chain := getHostportChain(id, pm)
newChains = append(newChains, chain) newChains = append(newChains, chain)
if pm.Protocol == v1.ProtocolUDP {
conntrackPortsToRemove = append(conntrackPortsToRemove, int(pm.HostPort))
}
// Add new hostport chain // Add new hostport chain
writeLine(natChains, utiliptables.MakeChainLine(chain)) writeLine(natChains, utiliptables.MakeChainLine(chain))
@ -150,6 +165,21 @@ func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInt
// clean up opened host port if encounter any error // clean up opened host port if encounter any error
return utilerrors.NewAggregate([]error{err, hm.closeHostports(hostportMappings)}) return utilerrors.NewAggregate([]error{err, hm.closeHostports(hostportMappings)})
} }
isIpv6 := conntrack.IsIPv6(podPortMapping.IP)
// Remove conntrack entries just after adding the new iptables rules. If the conntrack entry is removed along with
// the IP tables rule, it can be the case that the packets received by the node after iptables rule removal will
// create a new conntrack entry without any DNAT. That will result in blackhole of the traffic even after correct
// iptables rules have been added back.
if hm.execer != nil && hm.conntrackFound {
glog.Infof("Starting to delete udp conntrack entries: %v, isIPv6 - %v", conntrackPortsToRemove, isIpv6)
for _, port := range conntrackPortsToRemove {
err = conntrack.ClearEntriesForPort(hm.execer, port, isIpv6, v1.ProtocolUDP)
if err != nil {
glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", port, err)
}
}
}
return nil return nil
} }

View File

@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
"k8s.io/utils/exec"
) )
func TestHostportManager(t *testing.T) { func TestHostportManager(t *testing.T) {
@ -34,6 +35,7 @@ func TestHostportManager(t *testing.T) {
hostPortMap: make(map[hostport]closeable), hostPortMap: make(map[hostport]closeable),
iptables: iptables, iptables: iptables,
portOpener: portOpener.openFakeSocket, portOpener: portOpener.openFakeSocket,
execer: exec.New(),
} }
testCases := []struct { testCases := []struct {

View File

@ -21,6 +21,7 @@ go_library(
"//pkg/proxy/metrics:go_default_library", "//pkg/proxy/metrics:go_default_library",
"//pkg/proxy/util:go_default_library", "//pkg/proxy/util:go_default_library",
"//pkg/util/async:go_default_library", "//pkg/util/async:go_default_library",
"//pkg/util/conntrack:go_default_library",
"//pkg/util/iptables:go_default_library", "//pkg/util/iptables:go_default_library",
"//pkg/util/sysctl:go_default_library", "//pkg/util/sysctl:go_default_library",
"//pkg/util/version:go_default_library", "//pkg/util/version:go_default_library",
@ -42,6 +43,7 @@ go_test(
"//pkg/proxy:go_default_library", "//pkg/proxy:go_default_library",
"//pkg/proxy/util:go_default_library", "//pkg/proxy/util:go_default_library",
"//pkg/util/async:go_default_library", "//pkg/util/async:go_default_library",
"//pkg/util/conntrack:go_default_library",
"//pkg/util/iptables:go_default_library", "//pkg/util/iptables:go_default_library",
"//pkg/util/iptables/testing:go_default_library", "//pkg/util/iptables/testing:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",

View File

@ -46,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/proxy/metrics" "k8s.io/kubernetes/pkg/proxy/metrics"
utilproxy "k8s.io/kubernetes/pkg/proxy/util" utilproxy "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/async" "k8s.io/kubernetes/pkg/util/async"
"k8s.io/kubernetes/pkg/util/conntrack"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
utilversion "k8s.io/kubernetes/pkg/util/version" utilversion "k8s.io/kubernetes/pkg/util/version"
@ -682,7 +683,7 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE
for _, epSvcPair := range connectionMap { for _, epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == api.ProtocolUDP { if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == api.ProtocolUDP {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
err := utilproxy.ClearUDPConntrackForPeers(proxier.exec, svcInfo.ClusterIP(), endpointIP) err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP(), endpointIP, v1.ProtocolUDP)
if err != nil { if err != nil {
glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
} }
@ -838,7 +839,7 @@ func (proxier *Proxier) syncProxyRules() {
glog.Errorf("Failed to cast serviceInfo %q", svcName.String()) glog.Errorf("Failed to cast serviceInfo %q", svcName.String())
continue continue
} }
isIPv6 := utilproxy.IsIPv6(svcInfo.clusterIP) isIPv6 := conntrack.IsIPv6(svcInfo.clusterIP)
protocol := strings.ToLower(string(svcInfo.protocol)) protocol := strings.ToLower(string(svcInfo.protocol))
svcNameString := svcInfo.serviceNameString svcNameString := svcInfo.serviceNameString
hasEndpoints := len(proxier.endpointsMap[svcName]) > 0 hasEndpoints := len(proxier.endpointsMap[svcName]) > 0
@ -1064,7 +1065,7 @@ func (proxier *Proxier) syncProxyRules() {
// This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services. // This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services.
// This only affects UDP connections, which are not common. // This only affects UDP connections, which are not common.
// See issue: https://github.com/kubernetes/kubernetes/issues/49881 // See issue: https://github.com/kubernetes/kubernetes/issues/49881
err := utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port, isIPv6) err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
if err != nil { if err != nil {
glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err) glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err)
} }
@ -1373,7 +1374,7 @@ func (proxier *Proxier) syncProxyRules() {
// Finish housekeeping. // Finish housekeeping.
// TODO: these could be made more consistent. // TODO: these could be made more consistent.
for _, svcIP := range staleServices.UnsortedList() { for _, svcIP := range staleServices.UnsortedList() {
if err := utilproxy.ClearUDPConntrackForIP(proxier.exec, svcIP); err != nil { if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err) glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
} }
} }

View File

@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
utilproxy "k8s.io/kubernetes/pkg/proxy/util" utilproxy "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/async" "k8s.io/kubernetes/pkg/util/async"
"k8s.io/kubernetes/pkg/util/conntrack"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
"k8s.io/utils/exec" "k8s.io/utils/exec"
@ -222,7 +223,7 @@ func TestDeleteEndpointConnections(t *testing.T) {
svcPort: 80, svcPort: 80,
protocol: UDP, protocol: UDP,
endpoint: "10.240.0.3:80", endpoint: "10.240.0.3:80",
simulatedErr: utilproxy.NoConnectionToDelete, simulatedErr: conntrack.NoConnectionToDelete,
}, { }, {
description: "V4 UDP, unexpected error, should be glogged", description: "V4 UDP, unexpected error, should be glogged",
svcName: "v4-udp-simulated-error", svcName: "v4-udp-simulated-error",
@ -328,7 +329,7 @@ func TestDeleteEndpointConnections(t *testing.T) {
// Check the number of new glog errors // Check the number of new glog errors
var expGlogErrs int64 var expGlogErrs int64
if tc.simulatedErr != "" && tc.simulatedErr != utilproxy.NoConnectionToDelete { if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete {
expGlogErrs = 1 expGlogErrs = 1
} }
glogErrs := glog.Stats.Error.Lines() - priorGlogErrs glogErrs := glog.Stats.Error.Lines() - priorGlogErrs

View File

@ -85,6 +85,7 @@ go_library(
"//pkg/proxy/metrics:go_default_library", "//pkg/proxy/metrics:go_default_library",
"//pkg/proxy/util:go_default_library", "//pkg/proxy/util:go_default_library",
"//pkg/util/async:go_default_library", "//pkg/util/async:go_default_library",
"//pkg/util/conntrack:go_default_library",
"//pkg/util/ipset:go_default_library", "//pkg/util/ipset:go_default_library",
"//pkg/util/iptables:go_default_library", "//pkg/util/iptables:go_default_library",
"//pkg/util/ipvs:go_default_library", "//pkg/util/ipvs:go_default_library",

View File

@ -45,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/proxy/metrics" "k8s.io/kubernetes/pkg/proxy/metrics"
utilproxy "k8s.io/kubernetes/pkg/proxy/util" utilproxy "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/async" "k8s.io/kubernetes/pkg/util/async"
"k8s.io/kubernetes/pkg/util/conntrack"
utilipset "k8s.io/kubernetes/pkg/util/ipset" utilipset "k8s.io/kubernetes/pkg/util/ipset"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilipvs "k8s.io/kubernetes/pkg/util/ipvs" utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
@ -295,7 +296,7 @@ func NewProxier(ipt utiliptables.Interface,
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
isIPv6 := utilproxy.IsIPv6(nodeIP) isIPv6 := conntrack.IsIPv6(nodeIP)
glog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6) glog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6)
@ -1116,8 +1117,8 @@ func (proxier *Proxier) syncProxyRules() {
continue continue
} }
if lp.Protocol == "udp" { if lp.Protocol == "udp" {
isIPv6 := utilproxy.IsIPv6(svcInfo.clusterIP) isIPv6 := conntrack.IsIPv6(svcInfo.clusterIP)
utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port, isIPv6) conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, clientv1.ProtocolUDP)
} }
replacementPortsMap[lp] = socket replacementPortsMap[lp] = socket
} // We're holding the port, so it's OK to install ipvs rules. } // We're holding the port, so it's OK to install ipvs rules.
@ -1349,7 +1350,7 @@ func (proxier *Proxier) syncProxyRules() {
// Finish housekeeping. // Finish housekeeping.
// TODO: these could be made more consistent. // TODO: these could be made more consistent.
for _, svcIP := range staleServices.UnsortedList() { for _, svcIP := range staleServices.UnsortedList() {
if err := utilproxy.ClearUDPConntrackForIP(proxier.exec, svcIP); err != nil { if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, clientv1.ProtocolUDP); err != nil {
glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err) glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
} }
} }
@ -1363,7 +1364,7 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE
for _, epSvcPair := range connectionMap { for _, epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == api.ProtocolUDP { if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == api.ProtocolUDP {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
err := utilproxy.ClearUDPConntrackForPeers(proxier.exec, svcInfo.ClusterIP(), endpointIP) err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP(), endpointIP, clientv1.ProtocolUDP)
if err != nil { if err != nil {
glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
} }

View File

@ -57,9 +57,11 @@ go_library(
"//pkg/apis/core/helper:go_default_library", "//pkg/apis/core/helper:go_default_library",
"//pkg/proxy:go_default_library", "//pkg/proxy:go_default_library",
"//pkg/proxy/util:go_default_library", "//pkg/proxy/util:go_default_library",
"//pkg/util/conntrack:go_default_library",
"//pkg/util/iptables:go_default_library", "//pkg/util/iptables:go_default_library",
"//pkg/util/slice:go_default_library", "//pkg/util/slice:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",

View File

@ -26,6 +26,7 @@ import (
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net" utilnet "k8s.io/apimachinery/pkg/util/net"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
@ -36,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
proxyutil "k8s.io/kubernetes/pkg/proxy/util" proxyutil "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/conntrack"
"k8s.io/kubernetes/pkg/util/iptables" "k8s.io/kubernetes/pkg/util/iptables"
utilexec "k8s.io/utils/exec" utilexec "k8s.io/utils/exec"
) )
@ -507,7 +509,7 @@ func (proxier *Proxier) unmergeService(service *api.Service, existingPorts sets.
proxier.loadBalancer.DeleteService(serviceName) proxier.loadBalancer.DeleteService(serviceName)
} }
for _, svcIP := range staleUDPServices.UnsortedList() { for _, svcIP := range staleUDPServices.UnsortedList() {
if err := proxyutil.ClearUDPConntrackForIP(proxier.exec, svcIP); err != nil { if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err) glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
} }
} }

View File

@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"conntrack.go",
"endpoints.go", "endpoints.go",
"port.go", "port.go",
"utils.go", "utils.go",
@ -15,14 +14,12 @@ go_library(
"//pkg/apis/core/helper:go_default_library", "//pkg/apis/core/helper:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
], ],
) )
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = [
"conntrack_test.go",
"endpoints_test.go", "endpoints_test.go",
"port_test.go", "port_test.go",
"utils_test.go", "utils_test.go",
@ -32,8 +29,6 @@ go_test(
"//pkg/apis/core:go_default_library", "//pkg/apis/core:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/k8s.io/utils/exec/testing:go_default_library",
], ],
) )

View File

@ -15,6 +15,7 @@ filegroup(
"//pkg/util/bandwidth:all-srcs", "//pkg/util/bandwidth:all-srcs",
"//pkg/util/config:all-srcs", "//pkg/util/config:all-srcs",
"//pkg/util/configz:all-srcs", "//pkg/util/configz:all-srcs",
"//pkg/util/conntrack:all-srcs",
"//pkg/util/dbus:all-srcs", "//pkg/util/dbus:all-srcs",
"//pkg/util/ebtables:all-srcs", "//pkg/util/ebtables:all-srcs",
"//pkg/util/env:all-srcs", "//pkg/util/env:all-srcs",

41
pkg/util/conntrack/BUILD Normal file
View File

@ -0,0 +1,41 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"conntrack.go",
],
importpath = "k8s.io/kubernetes/pkg/util/conntrack",
visibility = ["//visibility:public"],
deps = [
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"conntrack_test.go",
],
embed = [":go_default_library"],
deps = [
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/k8s.io/utils/exec/testing:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package util package conntrack
import ( import (
"fmt" "fmt"
@ -22,22 +22,30 @@ import (
"strconv" "strconv"
"strings" "strings"
"k8s.io/api/core/v1"
"k8s.io/utils/exec" "k8s.io/utils/exec"
) )
// Utilities for dealing with conntrack // Utilities for dealing with conntrack
// NoConnectionToDelete is the error string returned by conntrack when no matching connections are found
const NoConnectionToDelete = "0 flow entries have been deleted" const NoConnectionToDelete = "0 flow entries have been deleted"
// IsIPv6 returns true if the given ip address is a valid ipv6 address
func IsIPv6(netIP net.IP) bool { func IsIPv6(netIP net.IP) bool {
return netIP != nil && netIP.To4() == nil return netIP != nil && netIP.To4() == nil
} }
// IsIPv6String returns true if the given string is a valid ipv6 address
func IsIPv6String(ip string) bool { func IsIPv6String(ip string) bool {
netIP := net.ParseIP(ip) netIP := net.ParseIP(ip)
return IsIPv6(netIP) return IsIPv6(netIP)
} }
func protoStr(proto v1.Protocol) string {
return strings.ToLower(string(proto))
}
func parametersWithFamily(isIPv6 bool, parameters ...string) []string { func parametersWithFamily(isIPv6 bool, parameters ...string) []string {
if isIPv6 { if isIPv6 {
parameters = append(parameters, "-f", "ipv6") parameters = append(parameters, "-f", "ipv6")
@ -45,11 +53,11 @@ func parametersWithFamily(isIPv6 bool, parameters ...string) []string {
return parameters return parameters
} }
// ClearUDPConntrackForIP uses the conntrack tool to delete the conntrack entries // ClearEntriesForIP uses the conntrack tool to delete the conntrack entries
// for the UDP connections specified by the given service IP // for the UDP connections specified by the given service IP
func ClearUDPConntrackForIP(execer exec.Interface, ip string) error { func ClearEntriesForIP(execer exec.Interface, ip string, protocol v1.Protocol) error {
parameters := parametersWithFamily(IsIPv6String(ip), "-D", "--orig-dst", ip, "-p", "udp") parameters := parametersWithFamily(IsIPv6String(ip), "-D", "--orig-dst", ip, "-p", protoStr(protocol))
err := ExecConntrackTool(execer, parameters...) err := Exec(execer, parameters...)
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
@ -59,8 +67,8 @@ func ClearUDPConntrackForIP(execer exec.Interface, ip string) error {
return nil return nil
} }
// ExecConntrackTool executes the conntrack tool using the given parameters // Exec executes the conntrack tool using the given parameters
func ExecConntrackTool(execer exec.Interface, parameters ...string) error { func Exec(execer exec.Interface, parameters ...string) error {
conntrackPath, err := execer.LookPath("conntrack") conntrackPath, err := execer.LookPath("conntrack")
if err != nil { if err != nil {
return fmt.Errorf("error looking for path of conntrack: %v", err) return fmt.Errorf("error looking for path of conntrack: %v", err)
@ -72,29 +80,36 @@ func ExecConntrackTool(execer exec.Interface, parameters ...string) error {
return nil return nil
} }
// ClearUDPConntrackForPort uses the conntrack tool to delete the conntrack entries // Exists returns true if conntrack binary is installed.
// for the UDP connections specified by the port. func Exists(execer exec.Interface) bool {
_, err := execer.LookPath("conntrack")
return err == nil
}
// ClearEntriesForPort uses the conntrack tool to delete the conntrack entries
// for connections specified by the port.
// When a packet arrives, it will not go through NAT table again, because it is not "the first" packet. // When a packet arrives, it will not go through NAT table again, because it is not "the first" packet.
// The solution is clearing the conntrack. Known issues: // The solution is clearing the conntrack. Known issues:
// https://github.com/docker/docker/issues/8795 // https://github.com/docker/docker/issues/8795
// https://github.com/kubernetes/kubernetes/issues/31983 // https://github.com/kubernetes/kubernetes/issues/31983
func ClearUDPConntrackForPort(execer exec.Interface, port int, isIPv6 bool) error { func ClearEntriesForPort(execer exec.Interface, port int, isIPv6 bool, protocol v1.Protocol) error {
if port <= 0 { if port <= 0 {
return fmt.Errorf("Wrong port number. The port number must be greater than zero") return fmt.Errorf("Wrong port number. The port number must be greater than zero")
} }
parameters := parametersWithFamily(isIPv6, "-D", "-p", "udp", "--dport", strconv.Itoa(port)) parameters := parametersWithFamily(isIPv6, "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port))
err := ExecConntrackTool(execer, parameters...) err := Exec(execer, parameters...)
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err) return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err)
} }
return nil return nil
} }
// ClearUDPConntrackForPeers uses the conntrack tool to delete the conntrack entries // ClearEntriesForNAT uses the conntrack tool to delete the conntrack entries
// for the UDP connections specified by the {origin, dest} IP pair. // for connections specified by the {origin, dest} IP pair.
func ClearUDPConntrackForPeers(execer exec.Interface, origin, dest string) error { func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1.Protocol) error {
parameters := parametersWithFamily(IsIPv6String(origin), "-D", "--orig-dst", origin, "--dst-nat", dest, "-p", "udp") parameters := parametersWithFamily(IsIPv6String(origin), "-D", "--orig-dst", origin, "--dst-nat", dest,
err := ExecConntrackTool(execer, parameters...) "-p", protoStr(protocol))
err := Exec(execer, parameters...)
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package util package conntrack
import ( import (
"fmt" "fmt"
@ -22,6 +22,7 @@ import (
"strings" "strings"
"testing" "testing"
"k8s.io/api/core/v1"
"k8s.io/utils/exec" "k8s.io/utils/exec"
fakeexec "k8s.io/utils/exec/testing" fakeexec "k8s.io/utils/exec/testing"
) )
@ -61,7 +62,7 @@ func TestExecConntrackTool(t *testing.T) {
expectErr := []bool{false, false, true} expectErr := []bool{false, false, true}
for i := range testCases { for i := range testCases {
err := ExecConntrackTool(&fexec, testCases[i]...) err := Exec(&fexec, testCases[i]...)
if expectErr[i] { if expectErr[i] {
if err == nil { if err == nil {
@ -115,7 +116,7 @@ func TestClearUDPConntrackForIP(t *testing.T) {
svcCount := 0 svcCount := 0
for _, tc := range testCases { for _, tc := range testCases {
if err := ClearUDPConntrackForIP(&fexec, tc.ip); err != nil { if err := ClearEntriesForIP(&fexec, tc.ip, v1.ProtocolUDP); err != nil {
t.Errorf("%s test case:, Unexpected error: %v", tc.name, err) t.Errorf("%s test case:, Unexpected error: %v", tc.name, err)
} }
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.ip) + familyParamStr(IsIPv6String(tc.ip)) expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.ip) + familyParamStr(IsIPv6String(tc.ip))
@ -160,7 +161,7 @@ func TestClearUDPConntrackForPort(t *testing.T) {
} }
svcCount := 0 svcCount := 0
for _, tc := range testCases { for _, tc := range testCases {
err := ClearUDPConntrackForPort(&fexec, tc.port, tc.isIPv6) err := ClearEntriesForPort(&fexec, tc.port, tc.isIPv6, v1.ProtocolUDP)
if err != nil { if err != nil {
t.Errorf("%s test case: Unexpected error: %v", tc.name, err) t.Errorf("%s test case: Unexpected error: %v", tc.name, err)
} }
@ -218,7 +219,7 @@ func TestDeleteUDPConnections(t *testing.T) {
} }
svcCount := 0 svcCount := 0
for i, tc := range testCases { for i, tc := range testCases {
err := ClearUDPConntrackForPeers(&fexec, tc.origin, tc.dest) err := ClearEntriesForNAT(&fexec, tc.origin, tc.dest, v1.ProtocolUDP)
if err != nil { if err != nil {
t.Errorf("%s test case: unexpected error: %v", tc.name, err) t.Errorf("%s test case: unexpected error: %v", tc.name, err)
} }