Merge pull request #54219 from m1093782566/ipset

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Using ipset doing SNAT and packet filter in IPVS kube-proxy

**What this PR does / why we need it**:

Try ipset in ipvs proxy mode.

**Which issue this PR fixes**: 

fixes #54203

xref: #53393, #53775

**Special notes for your reviewer**:

**Release note**:

```release-note
Using ipset doing SNAT and packet filtering in IPVS kube-proxy
```

/sig network

/area kube-proxy
pull/6/head
Kubernetes Submit Queue 2017-11-19 22:09:13 -08:00 committed by GitHub
commit 3df3c580b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1729 additions and 121 deletions

View File

@ -41,6 +41,7 @@ go_library(
"//pkg/proxy/userspace:go_default_library", "//pkg/proxy/userspace:go_default_library",
"//pkg/util/configz:go_default_library", "//pkg/util/configz:go_default_library",
"//pkg/util/dbus:go_default_library", "//pkg/util/dbus:go_default_library",
"//pkg/util/ipset:go_default_library",
"//pkg/util/iptables:go_default_library", "//pkg/util/iptables:go_default_library",
"//pkg/util/ipvs:go_default_library", "//pkg/util/ipvs:go_default_library",
"//pkg/util/mount:go_default_library", "//pkg/util/mount:go_default_library",

View File

@ -63,6 +63,7 @@ import (
"k8s.io/kubernetes/pkg/proxy/ipvs" "k8s.io/kubernetes/pkg/proxy/ipvs"
"k8s.io/kubernetes/pkg/proxy/userspace" "k8s.io/kubernetes/pkg/proxy/userspace"
"k8s.io/kubernetes/pkg/util/configz" "k8s.io/kubernetes/pkg/util/configz"
utilipset "k8s.io/kubernetes/pkg/util/ipset"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilipvs "k8s.io/kubernetes/pkg/util/ipvs" utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
utilnode "k8s.io/kubernetes/pkg/util/node" utilnode "k8s.io/kubernetes/pkg/util/node"
@ -356,6 +357,7 @@ type ProxyServer struct {
EventClient v1core.EventsGetter EventClient v1core.EventsGetter
IptInterface utiliptables.Interface IptInterface utiliptables.Interface
IpvsInterface utilipvs.Interface IpvsInterface utilipvs.Interface
IpsetInterface utilipset.Interface
execer exec.Interface execer exec.Interface
Proxier proxy.ProxyProvider Proxier proxy.ProxyProvider
Broadcaster record.EventBroadcaster Broadcaster record.EventBroadcaster
@ -422,7 +424,7 @@ func (s *ProxyServer) Run() error {
if s.CleanupAndExit { if s.CleanupAndExit {
encounteredError := userspace.CleanupLeftovers(s.IptInterface) encounteredError := userspace.CleanupLeftovers(s.IptInterface)
encounteredError = iptables.CleanupLeftovers(s.IptInterface) || encounteredError encounteredError = iptables.CleanupLeftovers(s.IptInterface) || encounteredError
encounteredError = ipvs.CleanupLeftovers(s.IpvsInterface, s.IptInterface) || encounteredError encounteredError = ipvs.CleanupLeftovers(s.IpvsInterface, s.IptInterface, s.IpsetInterface) || encounteredError
if encounteredError { if encounteredError {
return errors.New("encountered an error while tearing down rules.") return errors.New("encountered an error while tearing down rules.")
} }

View File

@ -43,6 +43,7 @@ import (
"k8s.io/kubernetes/pkg/proxy/userspace" "k8s.io/kubernetes/pkg/proxy/userspace"
"k8s.io/kubernetes/pkg/util/configz" "k8s.io/kubernetes/pkg/util/configz"
utildbus "k8s.io/kubernetes/pkg/util/dbus" utildbus "k8s.io/kubernetes/pkg/util/dbus"
utilipset "k8s.io/kubernetes/pkg/util/ipset"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilipvs "k8s.io/kubernetes/pkg/util/ipvs" utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
utilnode "k8s.io/kubernetes/pkg/util/node" utilnode "k8s.io/kubernetes/pkg/util/node"
@ -72,6 +73,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
var iptInterface utiliptables.Interface var iptInterface utiliptables.Interface
var ipvsInterface utilipvs.Interface var ipvsInterface utilipvs.Interface
var ipsetInterface utilipset.Interface
var dbus utildbus.Interface var dbus utildbus.Interface
// Create a iptables utils. // Create a iptables utils.
@ -80,6 +82,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
dbus = utildbus.New() dbus = utildbus.New()
iptInterface = utiliptables.New(execer, dbus, protocol) iptInterface = utiliptables.New(execer, dbus, protocol)
ipvsInterface = utilipvs.New(execer) ipvsInterface = utilipvs.New(execer)
ipsetInterface = utilipset.New(execer)
// We omit creation of pretty much everything if we run in cleanup mode // We omit creation of pretty much everything if we run in cleanup mode
if cleanupAndExit { if cleanupAndExit {
@ -87,6 +90,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
execer: execer, execer: execer,
IptInterface: iptInterface, IptInterface: iptInterface,
IpvsInterface: ipvsInterface, IpvsInterface: ipvsInterface,
IpsetInterface: ipsetInterface,
CleanupAndExit: cleanupAndExit, CleanupAndExit: cleanupAndExit,
}, nil }, nil
} }
@ -119,7 +123,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
var serviceEventHandler proxyconfig.ServiceHandler var serviceEventHandler proxyconfig.ServiceHandler
var endpointsEventHandler proxyconfig.EndpointsHandler var endpointsEventHandler proxyconfig.EndpointsHandler
proxyMode := getProxyMode(string(config.Mode), iptInterface, iptables.LinuxKernelCompatTester{}) proxyMode := getProxyMode(string(config.Mode), iptInterface, ipsetInterface, iptables.LinuxKernelCompatTester{})
if proxyMode == proxyModeIPTables { if proxyMode == proxyModeIPTables {
glog.V(0).Info("Using iptables Proxier.") glog.V(0).Info("Using iptables Proxier.")
nodeIP := net.ParseIP(config.BindAddress) nodeIP := net.ParseIP(config.BindAddress)
@ -159,12 +163,13 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
userspace.CleanupLeftovers(iptInterface) userspace.CleanupLeftovers(iptInterface)
// IPVS Proxier will generate some iptables rules, // IPVS Proxier will generate some iptables rules,
// need to clean them before switching to other proxy mode. // need to clean them before switching to other proxy mode.
ipvs.CleanupLeftovers(ipvsInterface, iptInterface) ipvs.CleanupLeftovers(ipvsInterface, iptInterface, ipsetInterface)
} else if proxyMode == proxyModeIPVS { } else if proxyMode == proxyModeIPVS {
glog.V(0).Info("Using ipvs Proxier.") glog.V(0).Info("Using ipvs Proxier.")
proxierIPVS, err := ipvs.NewProxier( proxierIPVS, err := ipvs.NewProxier(
iptInterface, iptInterface,
ipvsInterface, ipvsInterface,
ipsetInterface,
utilsysctl.New(), utilsysctl.New(),
execer, execer,
config.IPVS.SyncPeriod.Duration, config.IPVS.SyncPeriod.Duration,
@ -220,7 +225,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
iptables.CleanupLeftovers(iptInterface) iptables.CleanupLeftovers(iptInterface)
// IPVS Proxier will generate some iptables rules, // IPVS Proxier will generate some iptables rules,
// need to clean them before switching to other proxy mode. // need to clean them before switching to other proxy mode.
ipvs.CleanupLeftovers(ipvsInterface, iptInterface) ipvs.CleanupLeftovers(ipvsInterface, iptInterface, ipsetInterface)
} }
iptInterface.AddReloadFunc(proxier.Sync) iptInterface.AddReloadFunc(proxier.Sync)
@ -230,6 +235,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
EventClient: eventClient, EventClient: eventClient,
IptInterface: iptInterface, IptInterface: iptInterface,
IpvsInterface: ipvsInterface, IpvsInterface: ipvsInterface,
IpsetInterface: ipsetInterface,
execer: execer, execer: execer,
Proxier: proxier, Proxier: proxier,
Broadcaster: eventBroadcaster, Broadcaster: eventBroadcaster,
@ -249,7 +255,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
}, nil }, nil
} }
func getProxyMode(proxyMode string, iptver iptables.IPTablesVersioner, kcompat iptables.KernelCompatTester) string { func getProxyMode(proxyMode string, iptver iptables.IPTablesVersioner, ipsetver ipvs.IPSetVersioner, kcompat iptables.KernelCompatTester) string {
if proxyMode == proxyModeUserspace { if proxyMode == proxyModeUserspace {
return proxyModeUserspace return proxyModeUserspace
} }
@ -260,7 +266,7 @@ func getProxyMode(proxyMode string, iptver iptables.IPTablesVersioner, kcompat i
if utilfeature.DefaultFeatureGate.Enabled(features.SupportIPVSProxyMode) { if utilfeature.DefaultFeatureGate.Enabled(features.SupportIPVSProxyMode) {
if proxyMode == proxyModeIPVS { if proxyMode == proxyModeIPVS {
return tryIPVSProxy(iptver, kcompat) return tryIPVSProxy(iptver, ipsetver, kcompat)
} else { } else {
glog.Warningf("Can't use ipvs proxier, trying iptables proxier") glog.Warningf("Can't use ipvs proxier, trying iptables proxier")
return tryIPTablesProxy(iptver, kcompat) return tryIPTablesProxy(iptver, kcompat)
@ -270,10 +276,10 @@ func getProxyMode(proxyMode string, iptver iptables.IPTablesVersioner, kcompat i
return tryIPTablesProxy(iptver, kcompat) return tryIPTablesProxy(iptver, kcompat)
} }
func tryIPVSProxy(iptver iptables.IPTablesVersioner, kcompat iptables.KernelCompatTester) string { func tryIPVSProxy(iptver iptables.IPTablesVersioner, ipsetver ipvs.IPSetVersioner, kcompat iptables.KernelCompatTester) string {
// guaranteed false on error, error only necessary for debugging // guaranteed false on error, error only necessary for debugging
// IPVS Proxier relies on iptables // IPVS Proxier relies on ipset
useIPVSProxy, err := ipvs.CanUseIPVSProxier() useIPVSProxy, err := ipvs.CanUseIPVSProxier(ipsetver)
if err != nil { if err != nil {
// Try to fallback to iptables before falling back to userspace // Try to fallback to iptables before falling back to userspace
utilruntime.HandleError(fmt.Errorf("can't determine whether to use ipvs proxy, error: %v", err)) utilruntime.HandleError(fmt.Errorf("can't determine whether to use ipvs proxy, error: %v", err))
@ -282,8 +288,6 @@ func tryIPVSProxy(iptver iptables.IPTablesVersioner, kcompat iptables.KernelComp
return proxyModeIPVS return proxyModeIPVS
} }
// TODO: Check ipvs version
// Try to fallback to iptables before falling back to userspace // Try to fallback to iptables before falling back to userspace
glog.V(1).Infof("Can't use ipvs proxier, trying iptables proxier") glog.V(1).Infof("Can't use ipvs proxier, trying iptables proxier")
return tryIPTablesProxy(iptver, kcompat) return tryIPTablesProxy(iptver, kcompat)

View File

@ -52,6 +52,15 @@ func (fake *fakeIPTablesVersioner) GetVersion() (string, error) {
return fake.version, fake.err return fake.version, fake.err
} }
type fakeIPSetVersioner struct {
version string // what to return
err error // what to return
}
func (fake *fakeIPSetVersioner) GetVersion() (string, error) {
return fake.version, fake.err
}
type fakeKernelCompatTester struct { type fakeKernelCompatTester struct {
ok bool ok bool
} }
@ -72,8 +81,10 @@ func Test_getProxyMode(t *testing.T) {
annotationKey string annotationKey string
annotationVal string annotationVal string
iptablesVersion string iptablesVersion string
ipsetVersion string
kernelCompat bool kernelCompat bool
iptablesError error iptablesError error
ipsetError error
expected string expected string
}{ }{
{ // flag says userspace { // flag says userspace
@ -128,7 +139,8 @@ func Test_getProxyMode(t *testing.T) {
for i, c := range cases { for i, c := range cases {
versioner := &fakeIPTablesVersioner{c.iptablesVersion, c.iptablesError} versioner := &fakeIPTablesVersioner{c.iptablesVersion, c.iptablesError}
kcompater := &fakeKernelCompatTester{c.kernelCompat} kcompater := &fakeKernelCompatTester{c.kernelCompat}
r := getProxyMode(c.flag, versioner, kcompater) ipsetver := &fakeIPSetVersioner{c.ipsetVersion, c.ipsetError}
r := getProxyMode(c.flag, versioner, ipsetver, kcompater)
if r != c.expected { if r != c.expected {
t.Errorf("Case[%d] Expected %q, got %q", i, c.expected, r) t.Errorf("Case[%d] Expected %q, got %q", i, c.expected, r)
} }

View File

@ -9,6 +9,7 @@ load(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = [
"ipset_test.go",
"proxier_test.go", "proxier_test.go",
], ],
importpath = "k8s.io/kubernetes/pkg/proxy/ipvs", importpath = "k8s.io/kubernetes/pkg/proxy/ipvs",
@ -18,6 +19,8 @@ go_test(
"//pkg/proxy:go_default_library", "//pkg/proxy:go_default_library",
"//pkg/proxy/ipvs/testing:go_default_library", "//pkg/proxy/ipvs/testing:go_default_library",
"//pkg/proxy/util:go_default_library", "//pkg/proxy/util:go_default_library",
"//pkg/util/ipset:go_default_library",
"//pkg/util/ipset/testing:go_default_library",
"//pkg/util/iptables:go_default_library", "//pkg/util/iptables:go_default_library",
"//pkg/util/iptables/testing:go_default_library", "//pkg/util/iptables/testing:go_default_library",
"//pkg/util/ipvs:go_default_library", "//pkg/util/ipvs:go_default_library",
@ -35,6 +38,7 @@ go_test(
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"ipset.go",
"netlink.go", "netlink.go",
"netlink_unsupported.go", "netlink_unsupported.go",
"proxier.go", "proxier.go",
@ -55,9 +59,11 @@ go_library(
"//pkg/proxy/metrics:go_default_library", "//pkg/proxy/metrics:go_default_library",
"//pkg/proxy/util:go_default_library", "//pkg/proxy/util:go_default_library",
"//pkg/util/async:go_default_library", "//pkg/util/async:go_default_library",
"//pkg/util/ipset:go_default_library",
"//pkg/util/iptables:go_default_library", "//pkg/util/iptables:go_default_library",
"//pkg/util/ipvs:go_default_library", "//pkg/util/ipvs:go_default_library",
"//pkg/util/sysctl:go_default_library", "//pkg/util/sysctl:go_default_library",
"//pkg/util/version:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",

157
pkg/proxy/ipvs/ipset.go Normal file
View File

@ -0,0 +1,157 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipvs
import (
"k8s.io/apimachinery/pkg/util/sets"
utilipset "k8s.io/kubernetes/pkg/util/ipset"
utilversion "k8s.io/kubernetes/pkg/util/version"
"github.com/golang/glog"
)
const (
// MinIPSetCheckVersion is the min ipset version we need. IPv6 is supported in ipset 6.x
MinIPSetCheckVersion = "6.0"
// KubeLoopBackIPSet is used to store endpoints dst ip:port, source ip for solving hairpin purpose.
KubeLoopBackIPSet = "KUBE-LOOP-BACK"
// KubeClusterIPSet is used to store service cluster ip + port for masquerade purpose.
KubeClusterIPSet = "KUBE-CLUSTER-IP"
// KubeExternalIPSet is used to store service external ip + port for masquerade and filter purpose.
KubeExternalIPSet = "KUBE-EXTERNAL-IP"
// KubeLoadBalancerSet is used to store service load balancer ingress ip + port, it is the service lb portal.
KubeLoadBalancerSet = "KUBE-LOAD-BALANCER"
// KubeLoadBalancerMasqSet is used to store service load balancer ingress ip + port for masquerade purpose.
KubeLoadBalancerMasqSet = "KUBE-LOAD-BALANCER-MASQ"
// KubeLoadBalancerSourceIPSet is used to store service load balancer ingress ip + port + source IP for packet filter purpose.
KubeLoadBalancerSourceIPSet = "KUBE-LOAD-BALANCER-SOURCE-IP"
// KubeLoadBalancerSourceCIDRSet is used to store service load balancer ingress ip + port + source cidr for packet filter purpose.
KubeLoadBalancerSourceCIDRSet = "KUBE-LOAD-BALANCER-SOURCE-CIDR"
// KubeNodePortSetTCP is used to store nodeport TCP port for masquerade purpose.
KubeNodePortSetTCP = "KUBE-NODE-PORT-TCP"
// KubeNodePortSetUDP is used to store nodeport UDP port for masquerade purpose.
KubeNodePortSetUDP = "KUBE-NODE-PORT-UDP"
)
// IPSetVersioner can query the current ipset version.
type IPSetVersioner interface {
// returns "X.Y"
GetVersion() (string, error)
}
// IPSet wraps util/ipset which is used by IPVS proxier.
type IPSet struct {
utilipset.IPSet
// activeEntries is the current active entries of the ipset.
activeEntries sets.String
// handle is the util ipset interface handle.
handle utilipset.Interface
}
// NewIPSet initialize a new IPSet struct
func NewIPSet(handle utilipset.Interface, name string, setType utilipset.Type, isIPv6 bool) *IPSet {
hashFamily := utilipset.ProtocolFamilyIPV4
if isIPv6 {
hashFamily = utilipset.ProtocolFamilyIPV6
}
set := &IPSet{
IPSet: utilipset.IPSet{
Name: name,
SetType: setType,
HashFamily: hashFamily,
},
activeEntries: sets.NewString(),
handle: handle,
}
return set
}
func (set *IPSet) isEmpty() bool {
return len(set.activeEntries.UnsortedList()) == 0
}
func (set *IPSet) resetEntries() {
set.activeEntries = sets.NewString()
}
func (set *IPSet) syncIPSetEntries() {
appliedEntries, err := set.handle.ListEntries(set.Name)
if err != nil {
glog.Errorf("Failed to list ip set entries, error: %v", err)
return
}
// currentIPSetEntries represents Endpoints watched from API Server.
currentIPSetEntries := sets.NewString()
for _, appliedEntry := range appliedEntries {
currentIPSetEntries.Insert(appliedEntry)
}
if !set.activeEntries.Equal(currentIPSetEntries) {
// Clean legacy entries
for _, entry := range currentIPSetEntries.Difference(set.activeEntries).List() {
if err := set.handle.DelEntry(entry, set.Name); err != nil {
glog.Errorf("Failed to delete ip set entry: %s from ip set: %s, error: %v", entry, set.Name, err)
} else {
glog.V(3).Infof("Successfully delete legacy ip set entry: %s from ip set: %s", entry, set.Name)
}
}
// Create active entries
for _, entry := range set.activeEntries.Difference(currentIPSetEntries).List() {
if err := set.handle.AddEntry(entry, set.Name, true); err != nil {
glog.Errorf("Failed to add entry: %v to ip set: %s, error: %v", entry, set.Name, err)
} else {
glog.V(3).Infof("Successfully add entry: %v to ip set: %s", entry, set.Name)
}
}
}
}
func ensureIPSets(ipSets ...*IPSet) error {
for _, set := range ipSets {
if err := set.handle.CreateSet(&set.IPSet, true); err != nil {
glog.Errorf("Failed to make sure ip set: %v exist, error: %v", set, err)
return err
}
}
return nil
}
// checkMinVersion checks if ipset current version satisfies required min version
func checkMinVersion(vstring string) bool {
version, err := utilversion.ParseGeneric(vstring)
if err != nil {
glog.Errorf("vstring (%s) is not a valid version string: %v", vstring, err)
return false
}
minVersion, err := utilversion.ParseGeneric(MinIPSetCheckVersion)
if err != nil {
glog.Errorf("MinCheckVersion (%s) is not a valid version string: %v", MinIPSetCheckVersion, err)
return false
}
return !version.LessThan(minVersion)
}

View File

@ -0,0 +1,49 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipvs
import (
"testing"
)
func TestCheckIPSetVersion(t *testing.T) {
testCases := []struct {
vstring string
valid bool
}{
// version less than "6.0" is not valid.
{"4.0", false},
{"5.1", false},
{"5.1.2", false},
// "7" is not a valid version string.
{"7", false},
{"6.0", true},
{"6.1", true},
{"6.19", true},
{"7.0", true},
{"8.1.2", true},
{"9.3.4.0", true},
{"total junk", false},
}
for i := range testCases {
valid := checkMinVersion(testCases[i].vstring)
if testCases[i].valid != valid {
t.Errorf("Expected result: %v, Got result: %v", testCases[i].valid, valid)
}
}
}

View File

@ -49,6 +49,7 @@ import (
"k8s.io/kubernetes/pkg/proxy/metrics" "k8s.io/kubernetes/pkg/proxy/metrics"
utilproxy "k8s.io/kubernetes/pkg/proxy/util" utilproxy "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/async" "k8s.io/kubernetes/pkg/util/async"
utilipset "k8s.io/kubernetes/pkg/util/ipset"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilipvs "k8s.io/kubernetes/pkg/util/ipvs" utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
@ -59,6 +60,12 @@ const (
// kubeServicesChain is the services portal chain // kubeServicesChain is the services portal chain
kubeServicesChain utiliptables.Chain = "KUBE-SERVICES" kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
// KubeServiceIPSetsChain is the services access IP chain
KubeServiceIPSetsChain utiliptables.Chain = "KUBE-SVC-IPSETS"
// KubeFireWallChain is the kubernetes firewall chain.
KubeFireWallChain utiliptables.Chain = "KUBE-FIRE-WALL"
// kubePostroutingChain is the kubernetes postrouting chain // kubePostroutingChain is the kubernetes postrouting chain
kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING" kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
@ -67,11 +74,10 @@ const (
// KubeMarkDropChain is the mark-for-drop chain // KubeMarkDropChain is the mark-for-drop chain
KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP" KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP"
)
const (
// DefaultScheduler is the default ipvs scheduler algorithm - round robin. // DefaultScheduler is the default ipvs scheduler algorithm - round robin.
DefaultScheduler = "rr" DefaultScheduler = "rr"
// DefaultDummyDevice is the default dummy interface where ipvs service address will bind to it. // DefaultDummyDevice is the default dummy interface where ipvs service address will bind to it.
DefaultDummyDevice = "kube-ipvs0" DefaultDummyDevice = "kube-ipvs0"
) )
@ -117,6 +123,7 @@ type Proxier struct {
minSyncPeriod time.Duration minSyncPeriod time.Duration
iptables utiliptables.Interface iptables utiliptables.Interface
ipvs utilipvs.Interface ipvs utilipvs.Interface
ipset utilipset.Interface
exec utilexec.Interface exec utilexec.Interface
masqueradeAll bool masqueradeAll bool
masqueradeMark string masqueradeMark string
@ -137,6 +144,26 @@ type Proxier struct {
natRules *bytes.Buffer natRules *bytes.Buffer
// Added as a member to the struct to allow injection for testing. // Added as a member to the struct to allow injection for testing.
netlinkHandle NetLinkHandle netlinkHandle NetLinkHandle
// loopbackSet is the ipset where stores all endpoints IP:Port,IP for solving hairpin mode purpose.
loopbackSet *IPSet
// clusterIPSet is the ipset where stores all service ClusterIP:Port
clusterIPSet *IPSet
// nodePortSetTCP is the bitmap:port type ipset where stores all TCP node port
nodePortSetTCP *IPSet
// nodePortSetTCP is the bitmap:port type ipset where stores all UDP node port
nodePortSetUDP *IPSet
// externalIPSet is the hash:ip,port type ipset where stores all service ExternalIP:Port
externalIPSet *IPSet
// lbIngressSet is the hash:ip,port type ipset where stores all service load balancer ingress IP:Port.
lbIngressSet *IPSet
// lbMasqSet is the hash:ip,port type ipset where stores all service load balancer ingress IP:Port which needs masquerade.
lbMasqSet *IPSet
// lbWhiteListIPSet is the hash:ip,port,ip type ipset where stores all service load balancer ingress IP:Port,sourceIP pair, any packets
// with the source IP visit ingress IP:Port can pass through.
lbWhiteListIPSet *IPSet
// lbWhiteListIPSet is the hash:ip,port,net type ipset where stores all service load balancer ingress IP:Port,sourceCIDR pair, any packets
// from the source CIDR visit ingress IP:Port can pass through.
lbWhiteListCIDRSet *IPSet
} }
// IPGetter helps get node network interface IP // IPGetter helps get node network interface IP
@ -184,7 +211,9 @@ var _ proxy.ProxyProvider = &Proxier{}
// An error will be returned if it fails to update or acquire the initial lock. // An error will be returned if it fails to update or acquire the initial lock.
// Once a proxier is created, it will keep iptables and ipvs rules up to date in the background and // Once a proxier is created, it will keep iptables and ipvs rules up to date in the background and
// will not terminate if a particular iptables or ipvs call fails. // will not terminate if a particular iptables or ipvs call fails.
func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, func NewProxier(ipt utiliptables.Interface,
ipvs utilipvs.Interface,
ipset utilipset.Interface,
sysctl utilsysctl.Interface, sysctl utilsysctl.Interface,
exec utilexec.Interface, exec utilexec.Interface,
syncPeriod time.Duration, syncPeriod time.Duration,
@ -240,32 +269,46 @@ func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface,
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
isIPv6 := utilproxy.IsIPv6(nodeIP)
glog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6)
proxier := &Proxier{ proxier := &Proxier{
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxyServiceMap), serviceMap: make(proxyServiceMap),
serviceChanges: newServiceChangeMap(), serviceChanges: newServiceChangeMap(),
endpointsMap: make(proxyEndpointsMap), endpointsMap: make(proxyEndpointsMap),
endpointsChanges: newEndpointsChangeMap(hostname), endpointsChanges: newEndpointsChangeMap(hostname),
syncPeriod: syncPeriod, syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod, minSyncPeriod: minSyncPeriod,
iptables: ipt, iptables: ipt,
masqueradeAll: masqueradeAll, masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark, masqueradeMark: masqueradeMark,
exec: exec, exec: exec,
clusterCIDR: clusterCIDR, clusterCIDR: clusterCIDR,
hostname: hostname, hostname: hostname,
nodeIP: nodeIP, nodeIP: nodeIP,
portMapper: &listenPortOpener{}, portMapper: &listenPortOpener{},
recorder: recorder, recorder: recorder,
healthChecker: healthChecker, healthChecker: healthChecker,
healthzServer: healthzServer, healthzServer: healthzServer,
ipvs: ipvs, ipvs: ipvs,
ipvsScheduler: scheduler, ipvsScheduler: scheduler,
ipGetter: &realIPGetter{}, ipGetter: &realIPGetter{},
iptablesData: bytes.NewBuffer(nil), iptablesData: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil), natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil), natRules: bytes.NewBuffer(nil),
netlinkHandle: NewNetLinkHandle(), netlinkHandle: NewNetLinkHandle(),
ipset: ipset,
loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, isIPv6),
clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, isIPv6),
externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, isIPv6),
lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, isIPv6),
lbMasqSet: NewIPSet(ipset, KubeLoadBalancerMasqSet, utilipset.HashIPPort, isIPv6),
lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, isIPv6),
lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, isIPv6),
nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false),
nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false),
} }
burstSyncs := 2 burstSyncs := 2
glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
@ -477,6 +520,11 @@ func (e *endpointsInfo) IPPart() string {
return utilproxy.IPPart(e.endpoint) return utilproxy.IPPart(e.endpoint)
} }
// PortPart returns just the Port part of the endpoint.
func (e *endpointsInfo) PortPart() (int, error) {
return utilproxy.PortPart(e.endpoint)
}
type endpointServicePair struct { type endpointServicePair struct {
endpoint string endpoint string
servicePortName proxy.ServicePortName servicePortName proxy.ServicePortName
@ -644,7 +692,7 @@ func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) {
// This is determined by checking if all the required kernel modules can be loaded. It may // This is determined by checking if all the required kernel modules can be loaded. It may
// return an error if it fails to get the kernel modules information without error, in which // return an error if it fails to get the kernel modules information without error, in which
// case it will also return false. // case it will also return false.
func CanUseIPVSProxier() (bool, error) { func CanUseIPVSProxier(ipsetver IPSetVersioner) (bool, error) {
// Try to load IPVS required kernel modules using modprobe // Try to load IPVS required kernel modules using modprobe
for _, kmod := range ipvsModules { for _, kmod := range ipvsModules {
err := utilexec.New().Command("modprobe", "--", kmod).Run() err := utilexec.New().Command("modprobe", "--", kmod).Run()
@ -669,6 +717,15 @@ func CanUseIPVSProxier() (bool, error) {
if len(modules) != 0 { if len(modules) != 0 {
return false, fmt.Errorf("IPVS proxier will not be used because the following required kernel modules are not loaded: %v", modules) return false, fmt.Errorf("IPVS proxier will not be used because the following required kernel modules are not loaded: %v", modules)
} }
// Check ipset version
versionString, err := ipsetver.GetVersion()
if err != nil {
return false, fmt.Errorf("error getting ipset version, error: %v", err)
}
if !checkMinVersion(versionString) {
return false, fmt.Errorf("ipset version: %s is less than min required version: %s", versionString, MinIPSetCheckVersion)
}
return true, nil return true, nil
} }
@ -720,7 +777,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool
natRules := bytes.NewBuffer(nil) natRules := bytes.NewBuffer(nil)
writeLine(natChains, "*nat") writeLine(natChains, "*nat")
// Start with chains we know we need to remove. // Start with chains we know we need to remove.
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubePostroutingChain, KubeMarkMasqChain} { for _, chain := range []utiliptables.Chain{kubeServicesChain, kubePostroutingChain, KubeMarkMasqChain, KubeServiceIPSetsChain} {
if _, found := existingNATChains[chain]; found { if _, found := existingNATChains[chain]; found {
chainString := string(chain) chainString := string(chain)
writeLine(natChains, existingNATChains[chain]) // flush writeLine(natChains, existingNATChains[chain]) // flush
@ -740,7 +797,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool
} }
// CleanupLeftovers clean up all ipvs and iptables rules created by ipvs Proxier. // CleanupLeftovers clean up all ipvs and iptables rules created by ipvs Proxier.
func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface) (encounteredError bool) { func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset utilipset.Interface) (encounteredError bool) {
// Return immediately when ipvs interface is nil - Probably initialization failed in somewhere. // Return immediately when ipvs interface is nil - Probably initialization failed in somewhere.
if ipvs == nil { if ipvs == nil {
return true return true
@ -760,6 +817,16 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface) (enco
} }
// Clear iptables created by ipvs Proxier. // Clear iptables created by ipvs Proxier.
encounteredError = cleanupIptablesLeftovers(ipt) || encounteredError encounteredError = cleanupIptablesLeftovers(ipt) || encounteredError
// Destroy ip sets created by ipvs Proxier. We should call it after cleaning up
// iptables since we can NOT delete ip set which is still referenced by iptables.
ipSetsToDestroy := []string{KubeLoopBackIPSet, KubeClusterIPSet, KubeLoadBalancerSet, KubeNodePortSetTCP, KubeNodePortSetUDP,
KubeExternalIPSet, KubeLoadBalancerSourceIPSet, KubeLoadBalancerSourceCIDRSet, KubeLoadBalancerMasqSet}
for _, set := range ipSetsToDestroy {
err = ipset.DestroySet(set)
if err != nil {
encounteredError = true
}
}
return encounteredError return encounteredError
} }
@ -949,6 +1016,16 @@ func (proxier *Proxier) syncProxyRules() {
return return
} }
// make sure ip sets exists in the system.
ipSets := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.externalIPSet, proxier.nodePortSetUDP, proxier.nodePortSetTCP,
proxier.lbIngressSet, proxier.lbMasqSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet}
if err := ensureIPSets(ipSets...); err != nil {
return
}
for i := range ipSets {
ipSets[i].resetEntries()
}
// Accumulate the set of local ports that we will be holding open once this update is complete // Accumulate the set of local ports that we will be holding open once this update is complete
replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{} replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{}
// activeIPVSServices represents IPVS service successfully created in this round of sync // activeIPVSServices represents IPVS service successfully created in this round of sync
@ -968,6 +1045,17 @@ func (proxier *Proxier) syncProxyRules() {
// is just for efficiency, not correctness. // is just for efficiency, not correctness.
args := make([]string, 64) args := make([]string, 64)
// Kube service portal
if err := proxier.linkKubeServiceChain(existingNATChains, proxier.natChains); err != nil {
glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
return
}
// Kube service ipset
if err := proxier.createKubeFireWallChain(existingNATChains, proxier.natChains); err != nil {
glog.Errorf("Failed to create KUBE-FIRE-WALL chain: %v", err)
return
}
// Build IPVS rules for each service. // Build IPVS rules for each service.
for svcName, svcInfo := range proxier.serviceMap { for svcName, svcInfo := range proxier.serviceMap {
protocol := strings.ToLower(string(svcInfo.protocol)) protocol := strings.ToLower(string(svcInfo.protocol))
@ -975,7 +1063,41 @@ func (proxier *Proxier) syncProxyRules() {
// to ServicePortName.String() show up in CPU profiles. // to ServicePortName.String() show up in CPU profiles.
svcNameString := svcName.String() svcNameString := svcName.String()
// Handle traffic that loops back to the originator with SNAT.
for _, ep := range proxier.endpointsMap[svcName] {
epIP := ep.IPPart()
epPort, err := ep.PortPart()
// Error parsing this endpoint has been logged. Skip to next endpoint.
if epIP == "" || err != nil {
continue
}
entry := &utilipset.Entry{
IP: epIP,
Port: epPort,
Protocol: protocol,
IP2: epIP,
SetType: utilipset.HashIPPortIP,
}
proxier.loopbackSet.activeEntries.Insert(entry.String())
}
// Capture the clusterIP. // Capture the clusterIP.
// ipset call
entry := &utilipset.Entry{
IP: svcInfo.clusterIP.String(),
Port: svcInfo.port,
Protocol: protocol,
SetType: utilipset.HashIPPort,
}
// add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
// Install masquerade rules if 'masqueradeAll' or 'clusterCIDR' is specified.
if proxier.masqueradeAll {
proxier.clusterIPSet.activeEntries.Insert(entry.String())
} else if len(proxier.clusterCIDR) > 0 {
proxier.clusterIPSet.activeEntries.Insert(entry.String())
}
// ipvs call
serv := &utilipvs.VirtualServer{ serv := &utilipvs.VirtualServer{
Address: svcInfo.clusterIP, Address: svcInfo.clusterIP,
Port: uint16(svcInfo.port), Port: uint16(svcInfo.port),
@ -996,32 +1118,6 @@ func (proxier *Proxier) syncProxyRules() {
} else { } else {
glog.Errorf("Failed to sync service: %v, err: %v", serv, err) glog.Errorf("Failed to sync service: %v, err: %v", serv, err)
} }
// Install masquerade rules if 'masqueradeAll' or 'clusterCIDR' is specified.
args = append(args[:0],
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
"-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(svcInfo.clusterIP),
"--dport", strconv.Itoa(svcInfo.port),
)
if proxier.masqueradeAll {
err = proxier.linkKubeServiceChain(existingNATChains, proxier.natChains)
if err != nil {
glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
}
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
} else if len(proxier.clusterCIDR) > 0 {
// This masquerades off-cluster traffic to a service VIP. The idea
// is that you can establish a static route for your Service range,
// routing to any node, and that node will bridge into the Service
// for you. Since that might bounce off-node, we masquerade here.
// If/when we support "Local" policy for VIPs, we should update this.
err = proxier.linkKubeServiceChain(existingNATChains, proxier.natChains)
if err != nil {
glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
}
writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
}
// Capture externalIPs. // Capture externalIPs.
for _, externalIP := range svcInfo.externalIPs { for _, externalIP := range svcInfo.externalIPs {
@ -1056,6 +1152,17 @@ func (proxier *Proxier) syncProxyRules() {
} }
} // We're holding the port, so it's OK to install IPVS rules. } // We're holding the port, so it's OK to install IPVS rules.
// ipset call
entry := &utilipset.Entry{
IP: externalIP,
Port: svcInfo.port,
Protocol: protocol,
SetType: utilipset.HashIPPort,
}
// We have to SNAT packets to external IPs.
proxier.externalIPSet.activeEntries.Insert(entry.String())
// ipvs call
serv := &utilipvs.VirtualServer{ serv := &utilipvs.VirtualServer{
Address: net.ParseIP(externalIP), Address: net.ParseIP(externalIP),
Port: uint16(svcInfo.port), Port: uint16(svcInfo.port),
@ -1080,25 +1187,39 @@ func (proxier *Proxier) syncProxyRules() {
// Capture load-balancer ingress. // Capture load-balancer ingress.
for _, ingress := range svcInfo.loadBalancerStatus.Ingress { for _, ingress := range svcInfo.loadBalancerStatus.Ingress {
if ingress.IP != "" { if ingress.IP != "" {
// ipset call
entry = &utilipset.Entry{
IP: ingress.IP,
Port: svcInfo.port,
Protocol: protocol,
SetType: utilipset.HashIPPort,
}
// add service load balancer ingressIP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
// If we are proxying globally, we need to masquerade in case we cross nodes.
// If we are proxying only locally, we can retain the source IP.
if !svcInfo.onlyNodeLocalEndpoints {
proxier.lbMasqSet.activeEntries.Insert(entry.String())
}
if len(svcInfo.loadBalancerSourceRanges) != 0 { if len(svcInfo.loadBalancerSourceRanges) != 0 {
err = proxier.linkKubeServiceChain(existingNATChains, proxier.natChains)
if err != nil {
glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
}
// The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
// This currently works for loadbalancers that preserves source ips. // This currently works for loadbalancers that preserves source ips.
// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
args = append(args[:0], proxier.lbIngressSet.activeEntries.Insert(entry.String())
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
"-m", string(svcInfo.protocol), "-p", string(svcInfo.protocol),
"-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)),
"--dport", fmt.Sprintf("%d", svcInfo.port),
)
allowFromNode := false allowFromNode := false
for _, src := range svcInfo.loadBalancerSourceRanges { for _, src := range svcInfo.loadBalancerSourceRanges {
writeLine(proxier.natRules, append(args, "-s", src, "-j", "ACCEPT")...) // ipset call
entry = &utilipset.Entry{
IP: ingress.IP,
Port: svcInfo.port,
Protocol: protocol,
Net: src,
SetType: utilipset.HashIPPortNet,
}
// enumerate all white list source cidr
proxier.lbWhiteListCIDRSet.activeEntries.Insert(entry.String())
// ignore error because it has been validated // ignore error because it has been validated
_, cidr, _ := net.ParseCIDR(src) _, cidr, _ := net.ParseCIDR(src)
if cidr.Contains(proxier.nodeIP) { if cidr.Contains(proxier.nodeIP) {
@ -1109,14 +1230,19 @@ func (proxier *Proxier) syncProxyRules() {
// loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly. // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
// Need to add the following rule to allow request on host. // Need to add the following rule to allow request on host.
if allowFromNode { if allowFromNode {
writeLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), "-j", "ACCEPT")...) entry = &utilipset.Entry{
IP: ingress.IP,
Port: svcInfo.port,
Protocol: protocol,
IP2: ingress.IP,
SetType: utilipset.HashIPPortIP,
}
// enumerate all white list source ip
proxier.lbWhiteListIPSet.activeEntries.Insert(entry.String())
} }
// If the packet was able to reach the end of firewall chain, then it did not get DNATed.
// It means the packet cannot go through the firewall, then DROP it.
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
} }
// ipvs call
serv := &utilipvs.VirtualServer{ serv := &utilipvs.VirtualServer{
Address: net.ParseIP(ingress.IP), Address: net.ParseIP(ingress.IP),
Port: uint16(svcInfo.port), Port: uint16(svcInfo.port),
@ -1162,12 +1288,33 @@ func (proxier *Proxier) syncProxyRules() {
replacementPortsMap[lp] = socket replacementPortsMap[lp] = socket
} // We're holding the port, so it's OK to install ipvs rules. } // We're holding the port, so it's OK to install ipvs rules.
// Nodeports need SNAT, unless they're local.
// ipset call
if !svcInfo.onlyNodeLocalEndpoints {
entry = &utilipset.Entry{
// No need to provide ip info
Port: svcInfo.nodePort,
Protocol: protocol,
SetType: utilipset.BitmapPort,
}
switch protocol {
case "tcp":
proxier.nodePortSetTCP.activeEntries.Insert(entry.String())
case "udp":
proxier.nodePortSetUDP.activeEntries.Insert(entry.String())
default:
// It should never hit
glog.Errorf("Unsupported protocol type: %s", protocol)
}
}
// Build ipvs kernel routes for each node ip address // Build ipvs kernel routes for each node ip address
nodeIPs, err := proxier.ipGetter.NodeIPs() nodeIPs, err := proxier.ipGetter.NodeIPs()
if err != nil { if err != nil {
glog.Errorf("Failed to get node IP, err: %v", err) glog.Errorf("Failed to get node IP, err: %v", err)
} else { } else {
for _, nodeIP := range nodeIPs { for _, nodeIP := range nodeIPs {
// ipvs call
serv := &utilipvs.VirtualServer{ serv := &utilipvs.VirtualServer{
Address: nodeIP, Address: nodeIP,
Port: uint16(svcInfo.nodePort), Port: uint16(svcInfo.nodePort),
@ -1192,6 +1339,119 @@ func (proxier *Proxier) syncProxyRules() {
} }
} }
// sync ipset entries
ipsetsToSync := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.lbIngressSet, proxier.lbMasqSet, proxier.nodePortSetTCP,
proxier.nodePortSetUDP, proxier.externalIPSet, proxier.lbWhiteListIPSet, proxier.lbWhiteListCIDRSet}
for i := range ipsetsToSync {
ipsetsToSync[i].syncIPSetEntries()
}
// Tail call iptables rules for ipset, make sure only call iptables once
// in a single loop per ip set.
if !proxier.loopbackSet.isEmpty() {
args = append(args[:0],
"-A", string(kubePostroutingChain),
"-m", "set", "--match-set", proxier.loopbackSet.Name,
"dst,dst,src",
)
writeLine(proxier.natRules, append(args, "-j", "MASQUERADE")...)
}
if !proxier.clusterIPSet.isEmpty() {
args = append(args[:0],
"-A", string(kubeServicesChain),
"-m", "set", "--match-set", proxier.clusterIPSet.Name,
"dst,dst",
)
if proxier.masqueradeAll {
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
} else if len(proxier.clusterCIDR) > 0 {
// This masquerades off-cluster traffic to a service VIP. The idea
// is that you can establish a static route for your Service range,
// routing to any node, and that node will bridge into the Service
// for you. Since that might bounce off-node, we masquerade here.
// If/when we support "Local" policy for VIPs, we should update this.
writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
}
}
if !proxier.externalIPSet.isEmpty() {
// Build masquerade rules for packets to external IPs.
args = append(args[:0],
"-A", string(kubeServicesChain),
"-m", "set", "--match-set", proxier.externalIPSet.Name,
"dst,dst",
)
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
// Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
// nor from a local process to be forwarded to the service.
// This rule roughly translates to "all traffic from off-machine".
// This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later.
externalTrafficOnlyArgs := append(args,
"-m", "physdev", "!", "--physdev-is-in",
"-m", "addrtype", "!", "--src-type", "LOCAL")
writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", "ACCEPT")...)
dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
// Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
// This covers cases like GCE load-balancers which get added to the local routing table.
writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...)
}
if !proxier.lbMasqSet.isEmpty() {
// Build masquerade rules for packets which cross node visit load balancer ingress IPs.
args = append(args[:0],
"-A", string(kubeServicesChain),
"-m", "set", "--match-set", proxier.lbMasqSet.Name,
"dst,dst",
)
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
}
if !proxier.lbWhiteListCIDRSet.isEmpty() || !proxier.lbWhiteListIPSet.isEmpty() {
// link kube-services chain -> kube-fire-wall chain
args := []string{"-m", "set", "--match-set", proxier.lbIngressSet.Name, "dst,dst", "-j", string(KubeFireWallChain)}
if _, err := proxier.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, kubeServicesChain, args...); err != nil {
glog.Errorf("Failed to ensure that ipset %s chain %s jumps to %s: %v", proxier.lbIngressSet.Name, kubeServicesChain, KubeFireWallChain, err)
}
if !proxier.lbWhiteListCIDRSet.isEmpty() {
args = append(args[:0],
"-A", string(KubeFireWallChain),
"-m", "set", "--match-set", proxier.lbWhiteListCIDRSet.Name,
"dst,dst,src",
)
writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
}
if !proxier.lbWhiteListIPSet.isEmpty() {
args = append(args[:0],
"-A", string(KubeFireWallChain),
"-m", "set", "--match-set", proxier.lbWhiteListIPSet.Name,
"dst,dst,src",
)
writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
}
args = append(args[:0],
"-A", string(KubeFireWallChain),
)
// If the packet was able to reach the end of firewall chain, then it did not get DNATed.
// It means the packet cannot go thru the firewall, then mark it for DROP
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
}
if !proxier.nodePortSetTCP.isEmpty() {
// Build masquerade rules for packets which cross node visit nodeport.
args = append(args[:0],
"-A", string(kubeServicesChain),
"-m", "tcp", "-p", "tcp",
"-m", "set", "--match-set", proxier.nodePortSetTCP.Name,
"dst",
)
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
}
if !proxier.nodePortSetUDP.isEmpty() {
args = append(args[:0],
"-A", string(kubeServicesChain),
"-m", "udp", "-p", "udp",
"-m", "set", "--match-set", proxier.nodePortSetUDP.Name,
"dst",
)
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
}
// Write the end-of-table markers. // Write the end-of-table markers.
writeLine(proxier.natRules, "COMMIT") writeLine(proxier.natRules, "COMMIT")
@ -1403,7 +1663,6 @@ func (proxier *Proxier) cleanLegacyService(atciveServices map[string]bool, curre
} }
// linkKubeServiceChain will Create chain KUBE-SERVICES and link the chin in PREROUTING and OUTPUT // linkKubeServiceChain will Create chain KUBE-SERVICES and link the chin in PREROUTING and OUTPUT
// If not specify masqueradeAll or clusterCIDR or LB source range, won't create them.
// Chain PREROUTING (policy ACCEPT) // Chain PREROUTING (policy ACCEPT)
// target prot opt source destination // target prot opt source destination
@ -1443,6 +1702,55 @@ func (proxier *Proxier) linkKubeServiceChain(existingNATChains map[utiliptables.
return nil return nil
} }
//// linkKubeIPSetsChain will Create chain KUBE-SVC-IPSETS and link the chin in KUBE-SERVICES
//
//// Chain KUBE-SERVICES (policy ACCEPT)
//// target prot opt source destination
//// KUBE-SVC-IPSETS all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-SERVICE-ACCESS dst,dst
//
//// Chain KUBE-SVC-IPSETS (1 references)
//// target prot opt source destination
//// KUBE-MARK-MASQ all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-EXTERNAL-IP dst,dst
//// ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-EXTERNAL-IP dst,dst PHYSDEV match ! --physdev-is-in ADDRTYPE match src-type !LOCAL
//// ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-EXTERNAL-IP dst,dst ADDRTYPE match dst-type LOCAL
//// ...
//func (proxier *Proxier) linkKubeIPSetsChain(existingNATChains map[utiliptables.Chain]string, natChains *bytes.Buffer) error {
// if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, KubeServiceIPSetsChain); err != nil {
// return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, KubeServiceIPSetsChain, err)
// }
//
// // TODO: iptables comment message for ipset?
// // The hash:ip,port type of sets require two src/dst parameters of the set match and SET target kernel modules.
// args := []string{"-m", "set", "--match-set", proxier.kubeServiceAccessSet.Name, "dst,dst", "-j", string(KubeServiceIPSetsChain)}
// if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, kubeServicesChain, args...); err != nil {
// return fmt.Errorf("Failed to ensure that ipset %s chain %s jumps to %s: %v", proxier.kubeServiceAccessSet.Name, kubeServicesChain, KubeServiceIPSetsChain, err)
// }
//
// // equal to `iptables -t nat -N KUBE-SVC-IPSETS`
// // write `:KUBE-SERVICES - [0:0]` in nat table
// if chain, ok := existingNATChains[KubeServiceIPSetsChain]; ok {
// writeLine(natChains, chain)
// } else {
// writeLine(natChains, utiliptables.MakeChainLine(KubeServiceIPSetsChain))
// }
// return nil
//}
func (proxier *Proxier) createKubeFireWallChain(existingNATChains map[utiliptables.Chain]string, natChains *bytes.Buffer) error {
// `iptables -t nat -N KUBE-FIRE-WALL`
if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, KubeFireWallChain); err != nil {
return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, KubeFireWallChain, err)
}
// write `:KUBE-FIRE-WALL - [0:0]` in nat table
if chain, ok := existingNATChains[KubeFireWallChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, utiliptables.MakeChainLine(KubeFireWallChain))
}
return nil
}
// Join all words with spaces, terminate with newline and write to buff. // Join all words with spaces, terminate with newline and write to buff.
func writeLine(buf *bytes.Buffer, words ...string) { func writeLine(buf *bytes.Buffer, words ...string) {
// We avoid strings.Join for performance reasons. // We avoid strings.Join for performance reasons.

View File

@ -33,6 +33,8 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing" netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing"
proxyutil "k8s.io/kubernetes/pkg/proxy/util" proxyutil "k8s.io/kubernetes/pkg/proxy/util"
utilipset "k8s.io/kubernetes/pkg/util/ipset"
ipsettest "k8s.io/kubernetes/pkg/util/ipset/testing"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
utilipvs "k8s.io/kubernetes/pkg/util/ipvs" utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
@ -85,7 +87,7 @@ func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedNa
return nil return nil
} }
func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, nodeIPs []net.IP) *Proxier { func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []net.IP) *Proxier {
fcmd := fakeexec.FakeCmd{ fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("dummy device have been created"), nil }, func() ([]byte, error) { return []byte("dummy device have been created"), nil },
@ -98,24 +100,34 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, nodeIPs
LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
} }
return &Proxier{ return &Proxier{
exec: fexec, exec: fexec,
serviceMap: make(proxyServiceMap), serviceMap: make(proxyServiceMap),
serviceChanges: newServiceChangeMap(), serviceChanges: newServiceChangeMap(),
endpointsMap: make(proxyEndpointsMap), endpointsMap: make(proxyEndpointsMap),
endpointsChanges: newEndpointsChangeMap(testHostname), endpointsChanges: newEndpointsChangeMap(testHostname),
iptables: ipt, iptables: ipt,
ipvs: ipvs, ipvs: ipvs,
clusterCIDR: "10.0.0.0/24", ipset: ipset,
hostname: testHostname, clusterCIDR: "10.0.0.0/24",
portsMap: make(map[proxyutil.LocalPort]proxyutil.Closeable), hostname: testHostname,
portMapper: &fakePortOpener{[]*proxyutil.LocalPort{}}, portsMap: make(map[proxyutil.LocalPort]proxyutil.Closeable),
healthChecker: newFakeHealthChecker(), portMapper: &fakePortOpener{[]*proxyutil.LocalPort{}},
ipvsScheduler: DefaultScheduler, healthChecker: newFakeHealthChecker(),
ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, ipvsScheduler: DefaultScheduler,
iptablesData: bytes.NewBuffer(nil), ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},
natChains: bytes.NewBuffer(nil), iptablesData: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil), natChains: bytes.NewBuffer(nil),
netlinkHandle: netlinktest.NewFakeNetlinkHandle(), natRules: bytes.NewBuffer(nil),
netlinkHandle: netlinktest.NewFakeNetlinkHandle(),
loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, false),
clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, false),
externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, false),
lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, false),
lbMasqSet: NewIPSet(ipset, KubeLoadBalancerMasqSet, utilipset.HashIPPort, false),
lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, false),
lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, false),
nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false),
nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false),
} }
} }
@ -171,10 +183,11 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) *ap
func TestNodePort(t *testing.T) { func TestNodePort(t *testing.T) {
ipt := iptablestest.NewFake() ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake() ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake()
nodeIPv4 := net.ParseIP("100.101.102.103") nodeIPv4 := net.ParseIP("100.101.102.103")
nodeIPv6 := net.ParseIP("2001:db8::1:1") nodeIPv6 := net.ParseIP("2001:db8::1:1")
nodeIPs := sets.NewString(nodeIPv4.String(), nodeIPv6.String()) nodeIPs := sets.NewString(nodeIPv4.String(), nodeIPv6.String())
fp := NewFakeProxier(ipt, ipvs, []net.IP{nodeIPv4, nodeIPv6}) fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIPv4, nodeIPv6})
svcIP := "10.20.30.41" svcIP := "10.20.30.41"
svcPort := 80 svcPort := 80
svcNodePort := 3001 svcNodePort := 3001
@ -248,8 +261,9 @@ func TestNodePort(t *testing.T) {
func TestNodePortNoEndpoint(t *testing.T) { func TestNodePortNoEndpoint(t *testing.T) {
ipt := iptablestest.NewFake() ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake() ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake()
nodeIP := net.ParseIP("100.101.102.103") nodeIP := net.ParseIP("100.101.102.103")
fp := NewFakeProxier(ipt, ipvs, []net.IP{nodeIP}) fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP})
svcIP := "10.20.30.41" svcIP := "10.20.30.41"
svcPort := 80 svcPort := 80
svcNodePort := 3001 svcNodePort := 3001
@ -301,7 +315,8 @@ func TestNodePortNoEndpoint(t *testing.T) {
func TestClusterIPNoEndpoint(t *testing.T) { func TestClusterIPNoEndpoint(t *testing.T) {
ipt := iptablestest.NewFake() ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake() ipvs := ipvstest.NewFake()
fp := NewFakeProxier(ipt, ipvs, nil) ipset := ipsettest.NewFake()
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
svcIP := "10.20.30.41" svcIP := "10.20.30.41"
svcPort := 80 svcPort := 80
svcPortName := proxy.ServicePortName{ svcPortName := proxy.ServicePortName{
@ -344,7 +359,8 @@ func TestClusterIPNoEndpoint(t *testing.T) {
func TestClusterIP(t *testing.T) { func TestClusterIP(t *testing.T) {
ipt := iptablestest.NewFake() ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake() ipvs := ipvstest.NewFake()
fp := NewFakeProxier(ipt, ipvs, nil) ipset := ipsettest.NewFake()
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
svcIPv4 := "10.20.30.41" svcIPv4 := "10.20.30.41"
svcPortV4 := 80 svcPortV4 := 80
@ -450,7 +466,8 @@ func TestClusterIP(t *testing.T) {
func TestExternalIPsNoEndpoint(t *testing.T) { func TestExternalIPsNoEndpoint(t *testing.T) {
ipt := iptablestest.NewFake() ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake() ipvs := ipvstest.NewFake()
fp := NewFakeProxier(ipt, ipvs, nil) ipset := ipsettest.NewFake()
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
svcIP := "10.20.30.41" svcIP := "10.20.30.41"
svcPort := 80 svcPort := 80
svcExternalIPs := "50.60.70.81" svcExternalIPs := "50.60.70.81"
@ -504,7 +521,8 @@ func TestExternalIPsNoEndpoint(t *testing.T) {
func TestExternalIPs(t *testing.T) { func TestExternalIPs(t *testing.T) {
ipt := iptablestest.NewFake() ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake() ipvs := ipvstest.NewFake()
fp := NewFakeProxier(ipt, ipvs, nil) ipset := ipsettest.NewFake()
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
svcIP := "10.20.30.41" svcIP := "10.20.30.41"
svcPort := 80 svcPort := 80
svcExternalIPs := sets.NewString("50.60.70.81", "2012::51") svcExternalIPs := sets.NewString("50.60.70.81", "2012::51")
@ -573,7 +591,8 @@ func TestExternalIPs(t *testing.T) {
func TestLoadBalancer(t *testing.T) { func TestLoadBalancer(t *testing.T) {
ipt := iptablestest.NewFake() ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake() ipvs := ipvstest.NewFake()
fp := NewFakeProxier(ipt, ipvs, nil) ipset := ipsettest.NewFake()
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
svcIP := "10.20.30.41" svcIP := "10.20.30.41"
svcPort := 80 svcPort := 80
svcNodePort := 3001 svcNodePort := 3001
@ -624,8 +643,9 @@ func strPtr(s string) *string {
func TestOnlyLocalNodePorts(t *testing.T) { func TestOnlyLocalNodePorts(t *testing.T) {
ipt := iptablestest.NewFake() ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake() ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake()
nodeIP := net.ParseIP("100.101.102.103") nodeIP := net.ParseIP("100.101.102.103")
fp := NewFakeProxier(ipt, ipvs, []net.IP{nodeIP}) fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP})
svcIP := "10.20.30.41" svcIP := "10.20.30.41"
svcPort := 80 svcPort := 80
svcNodePort := 3001 svcNodePort := 3001
@ -705,7 +725,8 @@ func TestOnlyLocalNodePorts(t *testing.T) {
func TestOnlyLocalLoadBalancing(t *testing.T) { func TestOnlyLocalLoadBalancing(t *testing.T) {
ipt := iptablestest.NewFake() ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake() ipvs := ipvstest.NewFake()
fp := NewFakeProxier(ipt, ipvs, nil) ipset := ipsettest.NewFake()
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
svcIP := "10.20.30.41" svcIP := "10.20.30.41"
svcPort := 80 svcPort := 80
svcNodePort := 3001 svcNodePort := 3001
@ -769,7 +790,8 @@ func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, po
func TestBuildServiceMapAddRemove(t *testing.T) { func TestBuildServiceMapAddRemove(t *testing.T) {
ipt := iptablestest.NewFake() ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake() ipvs := ipvstest.NewFake()
fp := NewFakeProxier(ipt, ipvs, nil) ipset := ipsettest.NewFake()
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
services := []*api.Service{ services := []*api.Service{
makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) { makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) {
@ -874,7 +896,8 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
func TestBuildServiceMapServiceHeadless(t *testing.T) { func TestBuildServiceMapServiceHeadless(t *testing.T) {
ipt := iptablestest.NewFake() ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake() ipvs := ipvstest.NewFake()
fp := NewFakeProxier(ipt, ipvs, nil) ipset := ipsettest.NewFake()
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
makeServiceMap(fp, makeServiceMap(fp,
makeTestService("somewhere-else", "headless", func(svc *api.Service) { makeTestService("somewhere-else", "headless", func(svc *api.Service) {
@ -907,7 +930,8 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
ipt := iptablestest.NewFake() ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake() ipvs := ipvstest.NewFake()
fp := NewFakeProxier(ipt, ipvs, nil) ipset := ipsettest.NewFake()
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
makeServiceMap(fp, makeServiceMap(fp,
makeTestService("somewhere-else", "external-name", func(svc *api.Service) { makeTestService("somewhere-else", "external-name", func(svc *api.Service) {
@ -934,7 +958,8 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
func TestBuildServiceMapServiceUpdate(t *testing.T) { func TestBuildServiceMapServiceUpdate(t *testing.T) {
ipt := iptablestest.NewFake() ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake() ipvs := ipvstest.NewFake()
fp := NewFakeProxier(ipt, ipvs, nil) ipset := ipsettest.NewFake()
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
servicev1 := makeTestService("somewhere", "some-service", func(svc *api.Service) { servicev1 := makeTestService("somewhere", "some-service", func(svc *api.Service) {
svc.Spec.Type = api.ServiceTypeClusterIP svc.Spec.Type = api.ServiceTypeClusterIP
@ -1016,8 +1041,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
func TestSessionAffinity(t *testing.T) { func TestSessionAffinity(t *testing.T) {
ipt := iptablestest.NewFake() ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake() ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake()
nodeIP := net.ParseIP("100.101.102.103") nodeIP := net.ParseIP("100.101.102.103")
fp := NewFakeProxier(ipt, ipvs, []net.IP{nodeIP}) fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP})
svcIP := "10.20.30.41" svcIP := "10.20.30.41"
svcPort := 80 svcPort := 80
svcNodePort := 3001 svcNodePort := 3001
@ -1879,7 +1905,8 @@ func Test_updateEndpointsMap(t *testing.T) {
for tci, tc := range testCases { for tci, tc := range testCases {
ipt := iptablestest.NewFake() ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake() ipvs := ipvstest.NewFake()
fp := NewFakeProxier(ipt, ipvs, nil) ipset := ipsettest.NewFake()
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
fp.hostname = nodeName fp.hostname = nodeName
// First check that after adding all previous versions of endpoints, // First check that after adding all previous versions of endpoints,

View File

@ -19,6 +19,7 @@ package util
import ( import (
"fmt" "fmt"
"net" "net"
"strconv"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -46,6 +47,21 @@ func IPPart(s string) string {
return "" return ""
} }
func PortPart(s string) (int, error) {
// Must be IP:port
_, port, err := net.SplitHostPort(s)
if err != nil {
glog.Errorf("Error parsing '%s': %v", s, err)
return -1, err
}
portNumber, err := strconv.Atoi(port)
if err != nil {
glog.Errorf("Error parsing '%s': %v", port, err)
return -1, err
}
return portNumber, nil
}
// ToCIDR returns a host address of the form <ip-address>/32 for // ToCIDR returns a host address of the form <ip-address>/32 for
// IPv4 and <ip-address>/128 for IPv6 // IPv4 and <ip-address>/128 for IPv6
func ToCIDR(ip net.IP) string { func ToCIDR(ip net.IP) string {

View File

@ -27,6 +27,7 @@ filegroup(
"//pkg/util/interrupt:all-srcs", "//pkg/util/interrupt:all-srcs",
"//pkg/util/io:all-srcs", "//pkg/util/io:all-srcs",
"//pkg/util/ipconfig:all-srcs", "//pkg/util/ipconfig:all-srcs",
"//pkg/util/ipset:all-srcs",
"//pkg/util/iptables:all-srcs", "//pkg/util/iptables:all-srcs",
"//pkg/util/ipvs:all-srcs", "//pkg/util/ipvs:all-srcs",
"//pkg/util/keymutex:all-srcs", "//pkg/util/keymutex:all-srcs",

41
pkg/util/ipset/BUILD Normal file
View File

@ -0,0 +1,41 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"ipset.go",
"types.go",
],
importpath = "k8s.io/kubernetes/pkg/util/ipset",
visibility = ["//visibility:public"],
deps = ["//vendor/k8s.io/utils/exec:go_default_library"],
)
go_test(
name = "go_default_test",
srcs = ["ipset_test.go"],
importpath = "k8s.io/kubernetes/pkg/util/ipset",
library = ":go_default_library",
deps = [
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/k8s.io/utils/exec/testing:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/util/ipset/testing:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

325
pkg/util/ipset/ipset.go Normal file
View File

@ -0,0 +1,325 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipset
import (
"bytes"
"fmt"
"regexp"
"strconv"
"strings"
utilexec "k8s.io/utils/exec"
)
// Interface is an injectable interface for running ipset commands. Implementations must be goroutine-safe.
type Interface interface {
// FlushSet deletes all entries from a named set.
FlushSet(set string) error
// DestroySet deletes a named set.
DestroySet(set string) error
// DestroyAllSets deletes all sets.
DestroyAllSets() error
// CreateSet creates a new set, it will ignore error when the set already exists if ignoreExistErr=true.
CreateSet(set *IPSet, ignoreExistErr bool) error
// AddEntry adds a new entry to the named set.
AddEntry(entry string, set string, ignoreExistErr bool) error
// DelEntry deletes one entry from the named set
DelEntry(entry string, set string) error
// Test test if an entry exists in the named set
TestEntry(entry string, set string) (bool, error)
// ListEntries lists all the entries from a named set
ListEntries(set string) ([]string, error)
// ListSets list all set names from kernel
ListSets() ([]string, error)
// GetVersion returns the "X.Y" version string for ipset.
GetVersion() (string, error)
}
// IPSetCmd represents the ipset util. We use ipset command for ipset execute.
const IPSetCmd = "ipset"
// EntryMemberPattern is the regular expression pattern of ipset member list.
// The raw output of ipset command `ipset list {set}` is similar to,
//Name: foobar
//Type: hash:ip,port
//Revision: 2
//Header: family inet hashsize 1024 maxelem 65536
//Size in memory: 16592
//References: 0
//Members:
//192.168.1.2,tcp:8080
//192.168.1.1,udp:53
var EntryMemberPattern = "(?m)^(.*\n)*Members:\n"
// VersionPattern is the regular expression pattern of ipset version string.
// ipset version output is similar to "v6.10".
var VersionPattern = "v[0-9]+\\.[0-9]+"
// IPSet implements an Interface to an set.
type IPSet struct {
// Name is the set name.
Name string
// SetType specifies the ipset type.
SetType Type
// HashFamily specifies the protocol family of the IP addresses to be stored in the set.
// The default is inet, i.e IPv4. If users want to use IPv6, they should specify inet6.
HashFamily string
// HashSize specifies the hash table size of ipset.
HashSize int
// MaxElem specifies the max element number of ipset.
MaxElem int
// PortRange specifies the port range of bitmap:port type ipset.
PortRange string
}
// Entry represents a ipset entry.
type Entry struct {
// IP is the entry's IP. The IP address protocol corresponds to the HashFamily of IPSet.
// All entries' IP addresses in the same ip set has same the protocol, IPv4 or IPv6.
IP string
// Port is the entry's Port.
Port int
// Protocol is the entry's Protocol. The protocols of entries in the same ip set are all
// the same. The accepted protocols are TCP and UDP.
Protocol string
// Net is the entry's IP network address. Network address with zero prefix size can NOT
// be stored.
Net string
// IP2 is the entry's second IP. IP2 may not be empty for `hash:ip,port,ip` type ip set.
IP2 string
// SetType specifies the type of ip set where the entry exists.
SetType Type
}
func (e *Entry) String() string {
switch e.SetType {
case HashIPPort:
// Entry{192.168.1.1, udp, 53} -> 192.168.1.1,udp:53
// Entry{192.168.1.2, tcp, 8080} -> 192.168.1.2,tcp:8080
return fmt.Sprintf("%s,%s:%s", e.IP, e.Protocol, strconv.Itoa(e.Port))
case HashIPPortIP:
// Entry{192.168.1.1, udp, 53, 10.0.0.1} -> 192.168.1.1,udp:53,10.0.0.1
// Entry{192.168.1.2, tcp, 8080, 192.168.1.2} -> 192.168.1.2,tcp:8080,192.168.1.2
return fmt.Sprintf("%s,%s:%s,%s", e.IP, e.Protocol, strconv.Itoa(e.Port), e.IP2)
case HashIPPortNet:
// Entry{192.168.1.2, udp, 80, 10.0.1.0/24} -> 192.168.1.2,udp:80,10.0.1.0/24
// Entry{192.168.2,25, tcp, 8080, 10.1.0.0/16} -> 192.168.2,25,tcp:8080,10.1.0.0/16
return fmt.Sprintf("%s,%s:%s,%s", e.IP, e.Protocol, strconv.Itoa(e.Port), e.Net)
case BitmapPort:
// Entry{53} -> 53
// Entry{8080} -> 8080
return strconv.Itoa(e.Port)
}
return ""
}
type runner struct {
exec utilexec.Interface
}
// New returns a new Interface which will exec ipset.
func New(exec utilexec.Interface) Interface {
return &runner{
exec: exec,
}
}
// CreateSet creates a new set, it will ignore error when the set already exists if ignoreExistErr=true.
func (runner *runner) CreateSet(set *IPSet, ignoreExistErr bool) error {
// Using default values.
if set.HashSize == 0 {
set.HashSize = 1024
}
if set.MaxElem == 0 {
set.MaxElem = 65536
}
if set.HashFamily == "" {
set.HashFamily = ProtocolFamilyIPV4
}
if len(set.HashFamily) != 0 && set.HashFamily != ProtocolFamilyIPV4 && set.HashFamily != ProtocolFamilyIPV6 {
return fmt.Errorf("Currently supported protocol families are: %s and %s, %s is not supported", ProtocolFamilyIPV4, ProtocolFamilyIPV6, set.HashFamily)
}
// Default ipset type is "hash:ip,port"
if len(set.SetType) == 0 {
set.SetType = HashIPPort
}
// Check if setType is supported
if !IsValidIPSetType(set.SetType) {
return fmt.Errorf("Currently supported ipset types are: %v, %s is not supported", ValidIPSetTypes, set.SetType)
}
return runner.createSet(set, ignoreExistErr)
}
// If ignoreExistErr is set to true, then the -exist option of ipset will be specified, ipset ignores the error
// otherwise raised when the same set (setname and create parameters are identical) already exists.
func (runner *runner) createSet(set *IPSet, ignoreExistErr bool) error {
args := []string{"create", set.Name, string(set.SetType)}
if set.SetType == HashIPPortIP || set.SetType == HashIPPort {
args = append(args,
"family", set.HashFamily,
"hashsize", strconv.Itoa(set.HashSize),
"maxelem", strconv.Itoa(set.MaxElem),
)
}
if set.SetType == BitmapPort {
if len(set.PortRange) == 0 {
set.PortRange = DefaultPortRange
}
if !validatePortRange(set.PortRange) {
return fmt.Errorf("invalid port range for %s type ip set: %s, expect: a-b", BitmapPort, set.PortRange)
}
args = append(args, "range", set.PortRange)
}
if ignoreExistErr {
args = append(args, "-exist")
}
if _, err := runner.exec.Command(IPSetCmd, args...).CombinedOutput(); err != nil {
return fmt.Errorf("error creating ipset %s, error: %v", set.Name, err)
}
return nil
}
// AddEntry adds a new entry to the named set.
// If the -exist option is specified, ipset ignores the error otherwise raised when
// the same set (setname and create parameters are identical) already exists.
func (runner *runner) AddEntry(entry string, set string, ignoreExistErr bool) error {
args := []string{"add", set, entry}
if ignoreExistErr {
args = append(args, "-exist")
}
if _, err := runner.exec.Command(IPSetCmd, args...).CombinedOutput(); err != nil {
return fmt.Errorf("error adding entry %s, error: %v", entry, err)
}
return nil
}
// DelEntry is used to delete the specified entry from the set.
func (runner *runner) DelEntry(entry string, set string) error {
if _, err := runner.exec.Command(IPSetCmd, "del", set, entry).CombinedOutput(); err != nil {
return fmt.Errorf("error deleting entry %s: from set: %s, error: %v", entry, set, err)
}
return nil
}
// TestEntry is used to check whether the specified entry is in the set or not.
func (runner *runner) TestEntry(entry string, set string) (bool, error) {
if out, err := runner.exec.Command(IPSetCmd, "test", set, entry).CombinedOutput(); err == nil {
reg, e := regexp.Compile("NOT")
if e == nil && reg.MatchString(string(out)) {
return false, nil
} else if e == nil {
return true, nil
} else {
return false, fmt.Errorf("error testing entry: %s, error: %v", entry, e)
}
} else {
return false, fmt.Errorf("error testing entry %s: %v (%s)", entry, err, out)
}
}
// FlushSet deletes all entries from a named set.
func (runner *runner) FlushSet(set string) error {
if _, err := runner.exec.Command(IPSetCmd, "flush", set).CombinedOutput(); err != nil {
return fmt.Errorf("error flushing set: %s, error: %v", set, err)
}
return nil
}
// DestroySet is used to destroy a named set.
func (runner *runner) DestroySet(set string) error {
if _, err := runner.exec.Command(IPSetCmd, "destroy", set).CombinedOutput(); err != nil {
return fmt.Errorf("error destroying set %s:, error: %v", set, err)
}
return nil
}
// DestroyAllSets is used to destroy all sets.
func (runner *runner) DestroyAllSets() error {
if _, err := runner.exec.Command(IPSetCmd, "destroy").CombinedOutput(); err != nil {
return fmt.Errorf("error destroying all sets, error: %v", err)
}
return nil
}
// ListSets list all set names from kernel
func (runner *runner) ListSets() ([]string, error) {
out, err := runner.exec.Command(IPSetCmd, "list", "-n").CombinedOutput()
if err != nil {
return nil, fmt.Errorf("error listing all sets, error: %v", err)
}
return strings.Split(string(out), "\n"), nil
}
// ListEntries lists all the entries from a named set.
func (runner *runner) ListEntries(set string) ([]string, error) {
if len(set) == 0 {
return nil, fmt.Errorf("set name can't be nil")
}
out, err := runner.exec.Command(IPSetCmd, "list", set).CombinedOutput()
if err != nil {
return nil, fmt.Errorf("error listing set: %s, error: %v", set, err)
}
memberMatcher := regexp.MustCompile(EntryMemberPattern)
list := memberMatcher.ReplaceAllString(string(out[:]), "")
strs := strings.Split(list, "\n")
results := make([]string, 0)
for i := range strs {
if len(strs[i]) > 0 {
results = append(results, strs[i])
}
}
return results, nil
}
// GetVersion returns the version string.
func (runner *runner) GetVersion() (string, error) {
return getIPSetVersionString(runner.exec)
}
// getIPSetVersionString runs "ipset --version" to get the version string
// in the form of "X.Y", i.e "6.19"
func getIPSetVersionString(exec utilexec.Interface) (string, error) {
cmd := exec.Command(IPSetCmd, "--version")
cmd.SetStdin(bytes.NewReader([]byte{}))
bytes, err := cmd.CombinedOutput()
if err != nil {
return "", err
}
versionMatcher := regexp.MustCompile(VersionPattern)
match := versionMatcher.FindStringSubmatch(string(bytes))
if match == nil {
return "", fmt.Errorf("no ipset version found in string: %s", bytes)
}
return match[0], nil
}
func validatePortRange(portRange string) bool {
strs := strings.Split(portRange, "-")
if len(strs) != 2 {
return false
}
for i := range strs {
if _, err := strconv.Atoi(strs[i]); err != nil {
return false
}
}
return true
}
var _ = Interface(&runner{})

View File

@ -0,0 +1,483 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipset
import (
"reflect"
"testing"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/exec"
fakeexec "k8s.io/utils/exec/testing"
)
func TestCheckIPSetVersion(t *testing.T) {
testCases := []struct {
vstring string
Expect string
Err bool
}{
{"ipset v4.0, protocol version: 4", "v4.0", false},
{"ipset v5.1, protocol version: 5", "v5.1", false},
{"ipset v6.0, protocol version: 6", "v6.0", false},
{"ipset v6.1, protocol version: 6", "v6.1", false},
{"ipset v6.19, protocol version: 6", "v6.19", false},
{"total junk", "", true},
}
for i := range testCases {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
// ipset version response
func() ([]byte, error) { return []byte(testCases[i].vstring), nil },
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
gotVersion, err := getIPSetVersionString(&fexec)
if (err != nil) != testCases[i].Err {
t.Errorf("Expected error: %v, Got error: %v", testCases[i].Err, err)
}
if err == nil {
if testCases[i].Expect != gotVersion {
t.Errorf("Expected result: %v, Got result: %v", testCases[i].Expect, gotVersion)
}
}
}
}
func TestFlushSet(t *testing.T) {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
// Success
func() ([]byte, error) { return []byte{}, nil },
// Success
func() ([]byte, error) { return []byte{}, nil },
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec)
// Success.
err := runner.FlushSet("FOOBAR")
if err != nil {
t.Errorf("expected success, got %v", err)
}
if fcmd.CombinedOutputCalls != 1 {
t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls)
}
if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ipset", "flush", "FOOBAR") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0])
}
// Flush again
err = runner.FlushSet("FOOBAR")
if err != nil {
t.Errorf("expected success, got %v", err)
}
}
func TestDestroySet(t *testing.T) {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
// Success
func() ([]byte, error) { return []byte{}, nil },
// Failure
func() ([]byte, error) {
return []byte("ipset v6.19: The set with the given name does not exist"), &fakeexec.FakeExitError{Status: 1}
},
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec)
// Success
err := runner.DestroySet("FOOBAR")
if err != nil {
t.Errorf("expected success, got %v", err)
}
if fcmd.CombinedOutputCalls != 1 {
t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls)
}
if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ipset", "destroy", "FOOBAR") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0])
}
// Failure
err = runner.DestroySet("FOOBAR")
if err == nil {
t.Errorf("expected failure, got nil")
}
}
func TestDestroyAllSets(t *testing.T) {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
// Success
func() ([]byte, error) { return []byte{}, nil },
// Success
func() ([]byte, error) { return []byte{}, nil },
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec)
// Success
err := runner.DestroyAllSets()
if err != nil {
t.Errorf("expected success, got %v", err)
}
if fcmd.CombinedOutputCalls != 1 {
t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls)
}
if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ipset", "destroy") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0])
}
// Success
err = runner.DestroyAllSets()
if err != nil {
t.Errorf("Unexpected failure: %v", err)
}
}
func TestCreateSet(t *testing.T) {
testSet := IPSet{
Name: "FOOBAR",
SetType: HashIPPort,
HashFamily: ProtocolFamilyIPV4,
}
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
// Success
func() ([]byte, error) { return []byte{}, nil },
// Success
func() ([]byte, error) { return []byte{}, nil },
// Failure
func() ([]byte, error) {
return []byte("ipset v6.19: Set cannot be created: set with the same name already exists"), &fakeexec.FakeExitError{Status: 1}
},
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec)
// Create with ignoreExistErr = false, expect success
err := runner.CreateSet(&testSet, false)
if err != nil {
t.Errorf("expected success, got %v", err)
}
if fcmd.CombinedOutputCalls != 1 {
t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls)
}
if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ipset", "create", "FOOBAR", "hash:ip,port", "family", "inet", "hashsize", "1024", "maxelem", "65536") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0])
}
// Create with ignoreExistErr = true, expect success
err = runner.CreateSet(&testSet, true)
if err != nil {
t.Errorf("expected success, got %v", err)
}
if fcmd.CombinedOutputCalls != 2 {
t.Errorf("expected 2 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls)
}
if !sets.NewString(fcmd.CombinedOutputLog[1]...).HasAll("ipset", "create", "FOOBAR", "hash:ip,port", "family", "inet", "hashsize", "1024", "maxelem", "65536", "-exist") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[1])
}
// Create with ignoreExistErr = false, expect failure
err = runner.CreateSet(&testSet, false)
if err == nil {
t.Errorf("expected failure, got nil")
}
}
func TestAddEntry(t *testing.T) {
testEntry := &Entry{
IP: "192.168.1.1",
Port: 53,
Protocol: ProtocolUDP,
SetType: HashIPPort,
}
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
// Success
func() ([]byte, error) { return []byte{}, nil },
// Success
func() ([]byte, error) { return []byte{}, nil },
// Failure
func() ([]byte, error) {
return []byte("ipset v6.19: Set cannot be created: set with the same name already exists"), &fakeexec.FakeExitError{Status: 1}
},
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec)
// Create with ignoreExistErr = false, expect success
err := runner.AddEntry(testEntry.String(), "FOOBAR", false)
if err != nil {
t.Errorf("expected success, got %v", err)
}
if fcmd.CombinedOutputCalls != 1 {
t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls)
}
if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ipset", "add", "FOOBAR", "192.168.1.1,udp:53") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0])
}
// Create with ignoreExistErr = true, expect success
err = runner.AddEntry(testEntry.String(), "FOOBAR", true)
if err != nil {
t.Errorf("expected success, got %v", err)
}
if fcmd.CombinedOutputCalls != 2 {
t.Errorf("expected 3 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls)
}
if !sets.NewString(fcmd.CombinedOutputLog[1]...).HasAll("ipset", "add", "FOOBAR", "192.168.1.1,udp:53", "-exist") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[1])
}
// Create with ignoreExistErr = false, expect failure
err = runner.AddEntry(testEntry.String(), "FOOBAR", false)
if err == nil {
t.Errorf("expected failure, got nil")
}
}
func TestDelEntry(t *testing.T) {
// TODO: Test more set type
testEntry := &Entry{
IP: "192.168.1.1",
Port: 53,
Protocol: ProtocolUDP,
SetType: HashIPPort,
}
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
// Success
func() ([]byte, error) { return []byte{}, nil },
// Failure
func() ([]byte, error) {
return []byte("ipset v6.19: Element cannot be deleted from the set: it's not added"), &fakeexec.FakeExitError{Status: 1}
},
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec)
err := runner.DelEntry(testEntry.String(), "FOOBAR")
if err != nil {
t.Errorf("expected success, got %v", err)
}
if fcmd.CombinedOutputCalls != 1 {
t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls)
}
if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ipset", "del", "FOOBAR", "192.168.1.1,udp:53") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0])
}
err = runner.DelEntry(testEntry.String(), "FOOBAR")
if err == nil {
t.Errorf("expected failure, got nil")
}
}
func TestTestEntry(t *testing.T) {
// TODO: IPv6?
testEntry := &Entry{
IP: "10.120.7.100",
Port: 8080,
Protocol: ProtocolTCP,
SetType: HashIPPort,
}
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
// Success
func() ([]byte, error) { return []byte("10.120.7.100,tcp:8080 is in set FOOBAR."), nil },
// Failure
func() ([]byte, error) {
return []byte("192.168.1.3,tcp:8080 is NOT in set FOOBAR."), &fakeexec.FakeExitError{Status: 1}
},
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec)
// Success
ok, err := runner.TestEntry(testEntry.String(), "FOOBAR")
if err != nil {
t.Errorf("expected success, got %v", err)
}
if fcmd.CombinedOutputCalls != 1 {
t.Errorf("expected 2 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls)
}
if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ipset", "test", "FOOBAR", "10.120.7.100,tcp:8080") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0])
}
if !ok {
t.Errorf("expect entry exists in test set, got not")
}
// Failure
ok, err = runner.TestEntry(testEntry.String(), "FOOBAR")
if err == nil || ok {
t.Errorf("expect entry doesn't exist in test set")
}
}
func TestListEntries(t *testing.T) {
output := `Name: foobar
Type: hash:ip,port
Revision: 2
Header: family inet hashsize 1024 maxelem 65536
Size in memory: 16592
References: 0
Members:
192.168.1.2,tcp:8080
192.168.1.1,udp:53`
emptyOutput := `Name: KUBE-NODE-PORT
Type: bitmap:port
Revision: 1
Header: range 0-65535
Size in memory: 524432
References: 1
Members:
`
testCases := []struct {
output string
expected []string
}{
{
output: output,
expected: []string{"192.168.1.2,tcp:8080", "192.168.1.1,udp:53"},
},
{
output: emptyOutput,
expected: []string{},
},
}
for i := range testCases {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
// Success
func() ([]byte, error) {
return []byte(testCases[i].output), nil
},
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd {
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
},
},
}
runner := New(&fexec)
// Success
entries, err := runner.ListEntries("foobar")
if err != nil {
t.Errorf("expected success, got: %v", err)
}
if fcmd.CombinedOutputCalls != 1 {
t.Errorf("expected 1 CombinedOutput() calls, got: %d", fcmd.CombinedOutputCalls)
}
if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ipset", "list", "foobar") {
t.Errorf("wrong CombinedOutput() log, got: %s", fcmd.CombinedOutputLog[0])
}
if len(entries) != len(testCases[i].expected) {
t.Errorf("expected %d ipset entries, got: %d", len(testCases[i].expected), len(entries))
}
if !reflect.DeepEqual(entries, testCases[i].expected) {
t.Errorf("expected entries: %v, got: %v", testCases[i].expected, entries)
}
}
}
func TestListSets(t *testing.T) {
output := `foo
bar
baz`
expected := []string{"foo", "bar", "baz"}
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
// Success
func() ([]byte, error) { return []byte(output), nil },
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec)
// Success
list, err := runner.ListSets()
if err != nil {
t.Errorf("expected success, got: %v", err)
}
if fcmd.CombinedOutputCalls != 1 {
t.Errorf("expected 1 CombinedOutput() calls, got: %d", fcmd.CombinedOutputCalls)
}
if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ipset", "list", "-n") {
t.Errorf("wrong CombinedOutput() log, got: %s", fcmd.CombinedOutputLog[0])
}
if len(list) != len(expected) {
t.Errorf("expected %d sets, got: %d", len(expected), len(list))
}
if !reflect.DeepEqual(list, expected) {
t.Errorf("expected sets: %v, got: %v", expected, list)
}
}

View File

@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["fake.go"],
importpath = "k8s.io/kubernetes/pkg/util/ipset/testing",
visibility = ["//visibility:public"],
deps = ["//pkg/util/ipset:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,83 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"k8s.io/kubernetes/pkg/util/ipset"
)
// FakeIPSet is a no-op implementation of ipset Interface
type FakeIPSet struct {
Lines []byte
}
// NewFake create a new fake ipset interface.
func NewFake() *FakeIPSet {
return &FakeIPSet{}
}
// GetVersion is part of interface.
func (*FakeIPSet) GetVersion() (string, error) {
return "0.0", nil
}
// FlushSet is part of interface.
func (*FakeIPSet) FlushSet(set string) error {
return nil
}
// DestroySet is part of interface.
func (*FakeIPSet) DestroySet(set string) error {
return nil
}
// DestroyAllSets is part of interface.
func (*FakeIPSet) DestroyAllSets() error {
return nil
}
// CreateSet is part of interface.
func (*FakeIPSet) CreateSet(set *ipset.IPSet, ignoreExistErr bool) error {
return nil
}
// AddEntry is part of interface.
func (*FakeIPSet) AddEntry(entry string, set string, ignoreExistErr bool) error {
return nil
}
// DelEntry is part of interface.
func (*FakeIPSet) DelEntry(entry string, set string) error {
return nil
}
// TestEntry is part of interface.
func (*FakeIPSet) TestEntry(entry string, set string) (bool, error) {
return true, nil
}
// ListEntries is part of interface.
func (*FakeIPSet) ListEntries(set string) ([]string, error) {
return nil, nil
}
// ListSets is part of interface.
func (*FakeIPSet) ListSets() ([]string, error) {
return nil, nil
}
var _ = ipset.Interface(&FakeIPSet{})

70
pkg/util/ipset/types.go Normal file
View File

@ -0,0 +1,70 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipset
// Type represents the ipset type
type Type string
const (
// HashIPPort represents the `hash:ip,port` type ipset. The hash:ip,port is similar to hash:ip but
// you can store IP address and protocol-port pairs in it. TCP, SCTP, UDP, UDPLITE, ICMP and ICMPv6 are supported
// with port numbers/ICMP(v6) types and other protocol numbers without port information.
HashIPPort Type = "hash:ip,port"
// HashIPPortIP represents the `hash:ip,port,ip` type ipset. The hash:ip,port,ip set type uses a hash to store
// IP address, port number and a second IP address triples. The port number is interpreted together with a
// protocol (default TCP) and zero protocol number cannot be used.
HashIPPortIP Type = "hash:ip,port,ip"
// HashIPPortNet represents the `hash:ip,port,net` type ipset. The hash:ip,port,net set type uses a hash to store IP address, port number and IP network address triples. The port
// number is interpreted together with a protocol (default TCP) and zero protocol number cannot be used. Network address
// with zero prefix size cannot be stored either.
HashIPPortNet Type = "hash:ip,port,net"
// BitmapPort represents the `bitmap:port` type ipset. The bitmap:port set type uses a memory range, where each bit
// represents one TCP/UDP port. A bitmap:port type of set can store up to 65535 ports.
BitmapPort Type = "bitmap:port"
)
// DefaultPortRange defines the default bitmap:port valid port range.
const DefaultPortRange string = "0-65535"
const (
// ProtocolFamilyIPV4 represents IPv4 protocol.
ProtocolFamilyIPV4 = "inet"
// ProtocolFamilyIPV6 represents IPv6 protocol.
ProtocolFamilyIPV6 = "inet6"
// ProtocolTCP represents TCP protocol.
ProtocolTCP = "tcp"
// ProtocolUDP represents UDP protocol.
ProtocolUDP = "udp"
)
// ValidIPSetTypes defines the supported ip set type.
var ValidIPSetTypes = []Type{
HashIPPort,
HashIPPortIP,
BitmapPort,
HashIPPortNet,
}
// IsValidIPSetType checks if the given ipset type is valid.
func IsValidIPSetType(set Type) bool {
for _, valid := range ValidIPSetTypes {
if set == valid {
return true
}
}
return false
}