Merge pull request #62083 from rramkumar1/ipvs-exclude-cidrs-flag

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Add --ipvs-exclude-cidrs flag to kube-proxy. 

**What this PR does / why we need it**:
Add a flag to kube-proxy called --ipvs-exclude-cidrs. This flag allows a user to specify a list of CIDR ranges that should not be included in the cleanup of IPVS rules. 

Fixes: #59507

**Release note**:
```
Use --ipvs-exclude-cidrs to specify a list of CIDR's which the IPVS proxier should not touch when cleaning up IPVS rules.
```
/assign @m1093782566
pull/8/head
Kubernetes Submit Queue 2018-04-24 11:13:14 -07:00 committed by GitHub
commit c0d1ab8e99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 217 additions and 12 deletions

View File

@ -149,6 +149,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.config.IPTables.MinSyncPeriod.Duration, "iptables-min-sync-period", o.config.IPTables.MinSyncPeriod.Duration, "The minimum interval of how often the iptables rules can be refreshed as endpoints and services change (e.g. '5s', '1m', '2h22m').")
fs.DurationVar(&o.config.IPVS.SyncPeriod.Duration, "ipvs-sync-period", o.config.IPVS.SyncPeriod.Duration, "The maximum interval of how often ipvs rules are refreshed (e.g. '5s', '1m', '2h22m'). Must be greater than 0.")
fs.DurationVar(&o.config.IPVS.MinSyncPeriod.Duration, "ipvs-min-sync-period", o.config.IPVS.MinSyncPeriod.Duration, "The minimum interval of how often the ipvs rules can be refreshed as endpoints and services change (e.g. '5s', '1m', '2h22m').")
fs.StringSliceVar(&o.config.IPVS.ExcludeCIDRs, "ipvs-exclude-cidrs", o.config.IPVS.ExcludeCIDRs, "A comma-separated list of CIDR's which the ipvs proxier should not touch when cleaning up IPVS rules.")
fs.DurationVar(&o.config.ConfigSyncPeriod.Duration, "config-sync-period", o.config.ConfigSyncPeriod.Duration, "How often configuration from the apiserver is refreshed. Must be greater than 0.")
fs.BoolVar(&o.config.IPTables.MasqueradeAll, "masquerade-all", o.config.IPTables.MasqueradeAll, "If using the pure iptables proxy, SNAT all traffic sent via Service cluster IPs (this not commonly needed)")
fs.StringVar(&o.config.ClusterCIDR, "cluster-cidr", o.config.ClusterCIDR, "The CIDR range of pods in the cluster. When configured, traffic sent to a Service cluster IP from outside this range will be masqueraded and traffic sent from pods to an external LoadBalancer IP will be directed to the respective cluster IP instead")

View File

@ -189,6 +189,7 @@ func newProxyServer(
execer,
config.IPVS.SyncPeriod.Duration,
config.IPVS.MinSyncPeriod.Duration,
config.IPVS.ExcludeCIDRs,
config.IPTables.MasqueradeAll,
int(*config.IPTables.MasqueradeBit),
config.ClusterCIDR,

View File

@ -201,6 +201,9 @@ iptables:
ipvs:
minSyncPeriod: 10s
syncPeriod: 60s
excludeCIDRs:
- "10.20.30.40/16"
- "fd00:1::0/64"
kind: KubeProxyConfiguration
metricsBindAddress: "%s"
mode: "%s"
@ -316,6 +319,7 @@ nodePortAddresses:
IPVS: kubeproxyconfig.KubeProxyIPVSConfiguration{
MinSyncPeriod: metav1.Duration{Duration: 10 * time.Second},
SyncPeriod: metav1.Duration{Duration: 60 * time.Second},
ExcludeCIDRs: []string{"10.20.30.40/16", "fd00:1::0/64"},
},
MetricsBindAddress: tc.metricsBindAddress,
Mode: kubeproxyconfig.ProxyMode(tc.mode),

View File

@ -67,6 +67,9 @@ type KubeProxyIPVSConfiguration struct {
MinSyncPeriod metav1.Duration
// ipvs scheduler
Scheduler string
// excludeCIDRs is a list of CIDR's which the ipvs proxier should not touch
// when cleaning up ipvs services.
ExcludeCIDRs []string
}
// KubeProxyConntrackConfiguration contains conntrack settings for

View File

@ -63,6 +63,9 @@ type KubeProxyIPVSConfiguration struct {
MinSyncPeriod metav1.Duration `json:"minSyncPeriod"`
// ipvs scheduler
Scheduler string `json:"scheduler"`
// excludeCIDRs is a list of CIDR's which the ipvs proxier should not touch
// when cleaning up ipvs services.
ExcludeCIDRs []string
}
// KubeProxyConntrackConfiguration contains conntrack settings for

View File

@ -206,6 +206,7 @@ func autoConvert_v1alpha1_KubeProxyIPVSConfiguration_To_kubeproxyconfig_KubeProx
out.SyncPeriod = in.SyncPeriod
out.MinSyncPeriod = in.MinSyncPeriod
out.Scheduler = in.Scheduler
out.ExcludeCIDRs = *(*[]string)(unsafe.Pointer(&in.ExcludeCIDRs))
return nil
}
@ -218,6 +219,7 @@ func autoConvert_kubeproxyconfig_KubeProxyIPVSConfiguration_To_v1alpha1_KubeProx
out.SyncPeriod = in.SyncPeriod
out.MinSyncPeriod = in.MinSyncPeriod
out.Scheduler = in.Scheduler
out.ExcludeCIDRs = *(*[]string)(unsafe.Pointer(&in.ExcludeCIDRs))
return nil
}

View File

@ -54,7 +54,7 @@ func (in *KubeProxyConfiguration) DeepCopyInto(out *KubeProxyConfiguration) {
}
out.ClientConnection = in.ClientConnection
in.IPTables.DeepCopyInto(&out.IPTables)
out.IPVS = in.IPVS
in.IPVS.DeepCopyInto(&out.IPVS)
if in.OOMScoreAdj != nil {
in, out := &in.OOMScoreAdj, &out.OOMScoreAdj
if *in == nil {
@ -186,6 +186,11 @@ func (in *KubeProxyIPVSConfiguration) DeepCopyInto(out *KubeProxyIPVSConfigurati
*out = *in
out.SyncPeriod = in.SyncPeriod
out.MinSyncPeriod = in.MinSyncPeriod
if in.ExcludeCIDRs != nil {
in, out := &in.ExcludeCIDRs, &out.ExcludeCIDRs
*out = make([]string, len(*in))
copy(*out, *in)
}
return
}

View File

@ -116,6 +116,7 @@ func validateKubeProxyIPVSConfiguration(config kubeproxyconfig.KubeProxyIPVSConf
}
allErrs = append(allErrs, validateIPVSSchedulerMethod(kubeproxyconfig.IPVSSchedulerMethod(config.Scheduler), fldPath.Child("Scheduler"))...)
allErrs = append(allErrs, validateIPVSExcludeCIDRs(config.ExcludeCIDRs, fldPath.Child("ExcludeCidrs"))...)
return allErrs
}
@ -253,3 +254,14 @@ func validateKubeProxyNodePortAddress(nodePortAddresses []string, fldPath *field
return allErrs
}
func validateIPVSExcludeCIDRs(excludeCIDRs []string, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
for i := range excludeCIDRs {
if _, _, err := net.ParseCIDR(excludeCIDRs[i]); err != nil {
allErrs = append(allErrs, field.Invalid(fldPath, excludeCIDRs, "must be a valid IP block"))
}
}
return allErrs
}

View File

@ -749,3 +749,75 @@ func TestValidateKubeProxyNodePortAddress(t *testing.T) {
}
}
}
func TestValidateKubeProxyExcludeCIDRs(t *testing.T) {
// TODO(rramkumar): This test is a copy of TestValidateKubeProxyNodePortAddress.
// Maybe some code can be shared?
newPath := field.NewPath("KubeProxyConfiguration")
successCases := []struct {
addresses []string
}{
{[]string{}},
{[]string{"127.0.0.0/8"}},
{[]string{"0.0.0.0/0"}},
{[]string{"::/0"}},
{[]string{"127.0.0.1/32", "1.2.3.0/24"}},
{[]string{"127.0.0.0/8"}},
{[]string{"127.0.0.1/32"}},
{[]string{"::1/128"}},
{[]string{"1.2.3.4/32"}},
{[]string{"10.20.30.0/24"}},
{[]string{"10.20.0.0/16", "100.200.0.0/16"}},
{[]string{"10.0.0.0/8"}},
{[]string{"2001:db8::/32"}},
}
for _, successCase := range successCases {
if errs := validateIPVSExcludeCIDRs(successCase.addresses, newPath.Child("ExcludeCIDRs")); len(errs) != 0 {
t.Errorf("expected success: %v", errs)
}
}
errorCases := []struct {
addresses []string
msg string
}{
{
addresses: []string{"foo"},
msg: "must be a valid IP block",
},
{
addresses: []string{"1.2.3"},
msg: "must be a valid IP block",
},
{
addresses: []string{""},
msg: "must be a valid IP block",
},
{
addresses: []string{"10.20.30.40"},
msg: "must be a valid IP block",
},
{
addresses: []string{"::1"},
msg: "must be a valid IP block",
},
{
addresses: []string{"2001:db8:1"},
msg: "must be a valid IP block",
},
{
addresses: []string{"2001:db8:xyz/64"},
msg: "must be a valid IP block",
},
}
for _, errorCase := range errorCases {
if errs := validateIPVSExcludeCIDRs(errorCase.addresses, newPath.Child("ExcludeCIDRs")); len(errs) == 0 {
t.Errorf("expected failure for %s", errorCase.msg)
} else if !strings.Contains(errs[0].Error(), errorCase.msg) {
t.Errorf("unexpected error: %v, expected: %s", errs[0], errorCase.msg)
}
}
}

View File

@ -76,7 +76,7 @@ func (in *KubeProxyConfiguration) DeepCopyInto(out *KubeProxyConfiguration) {
}
out.ClientConnection = in.ClientConnection
in.IPTables.DeepCopyInto(&out.IPTables)
out.IPVS = in.IPVS
in.IPVS.DeepCopyInto(&out.IPVS)
if in.OOMScoreAdj != nil {
in, out := &in.OOMScoreAdj, &out.OOMScoreAdj
if *in == nil {
@ -208,6 +208,11 @@ func (in *KubeProxyIPVSConfiguration) DeepCopyInto(out *KubeProxyIPVSConfigurati
*out = *in
out.SyncPeriod = in.SyncPeriod
out.MinSyncPeriod = in.MinSyncPeriod
if in.ExcludeCIDRs != nil {
in, out := &in.ExcludeCIDRs, &out.ExcludeCIDRs
*out = make([]string, len(*in))
copy(*out, *in)
}
return
}

View File

@ -127,8 +127,10 @@ type Proxier struct {
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
// These are effectively const and do not need the mutex to be held.
syncPeriod time.Duration
minSyncPeriod time.Duration
syncPeriod time.Duration
minSyncPeriod time.Duration
// Values are CIDR's to exclude when cleaning up IPVS rules.
excludeCIDRs []string
iptables utiliptables.Interface
ipvs utilipvs.Interface
ipset utilipset.Interface
@ -258,6 +260,7 @@ func NewProxier(ipt utiliptables.Interface,
exec utilexec.Interface,
syncPeriod time.Duration,
minSyncPeriod time.Duration,
excludeCIDRs []string,
masqueradeAll bool,
masqueradeBit int,
clusterCIDR string,
@ -1542,16 +1545,28 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
return nil
}
func (proxier *Proxier) cleanLegacyService(atciveServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer) {
func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer) {
unbindIPAddr := sets.NewString()
for cS := range currentServices {
if !atciveServices[cS] {
svc := currentServices[cS]
err := proxier.ipvs.DeleteVirtualServer(svc)
if err != nil {
glog.Errorf("Failed to delete service, error: %v", err)
for cs := range currentServices {
svc := currentServices[cs]
if _, ok := activeServices[cs]; !ok {
// This service was not processed in the latest sync loop so before deleting it,
// make sure it does not fall within an excluded CIDR range.
okayToDelete := true
for _, excludedCIDR := range proxier.excludeCIDRs {
// Any validation of this CIDR already should have occurred.
_, n, _ := net.ParseCIDR(excludedCIDR)
if n.Contains(svc.Address) {
okayToDelete = false
break
}
}
if okayToDelete {
if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil {
glog.Errorf("Failed to delete service, error: %v", err)
}
unbindIPAddr.Insert(svc.Address.String())
}
unbindIPAddr.Insert(svc.Address.String())
}
}

View File

@ -125,6 +125,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil),
excludeCIDRs: make([]string, 0),
iptables: ipt,
ipvs: ipvs,
ipset: ipset,
@ -2395,6 +2396,87 @@ func Test_syncService(t *testing.T) {
}
}
func Test_cleanLegacyService(t *testing.T) {
// All ipvs services that were processed in the latest sync loop.
activeServices := map[string]bool{"ipvs0": true, "ipvs1": true}
// All ipvs services in the system.
currentServices := map[string]*utilipvs.VirtualServer{
// Created by kube-proxy.
"ipvs0": {
Address: net.ParseIP("1.1.1.1"),
Protocol: string(api.ProtocolUDP),
Port: 53,
Scheduler: "rr",
Flags: utilipvs.FlagHashed,
},
// Created by kube-proxy.
"ipvs1": {
Address: net.ParseIP("2.2.2.2"),
Protocol: string(api.ProtocolUDP),
Port: 54,
Scheduler: "rr",
Flags: utilipvs.FlagHashed,
},
// Created by an external party.
"ipvs2": {
Address: net.ParseIP("3.3.3.3"),
Protocol: string(api.ProtocolUDP),
Port: 55,
Scheduler: "rr",
Flags: utilipvs.FlagHashed,
},
// Created by an external party.
"ipvs3": {
Address: net.ParseIP("4.4.4.4"),
Protocol: string(api.ProtocolUDP),
Port: 56,
Scheduler: "rr",
Flags: utilipvs.FlagHashed,
},
// Created by an external party.
"ipvs4": {
Address: net.ParseIP("5.5.5.5"),
Protocol: string(api.ProtocolUDP),
Port: 57,
Scheduler: "rr",
Flags: utilipvs.FlagHashed,
},
// Created by kube-proxy, but now stale.
"ipvs5": {
Address: net.ParseIP("6.6.6.6"),
Protocol: string(api.ProtocolUDP),
Port: 58,
Scheduler: "rr",
Flags: utilipvs.FlagHashed,
},
}
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
proxier := NewFakeProxier(ipt, ipvs, ipset, nil)
// These CIDRs cover only ipvs2 and ipvs3.
proxier.excludeCIDRs = []string{"3.3.3.0/24", "4.4.4.0/24"}
for v := range currentServices {
proxier.ipvs.AddVirtualServer(currentServices[v])
}
proxier.cleanLegacyService(activeServices, currentServices)
// ipvs4 and ipvs5 should have been cleaned.
remainingVirtualServers, _ := proxier.ipvs.GetVirtualServers()
if len(remainingVirtualServers) != 4 {
t.Errorf("Expected number of remaining IPVS services after cleanup to be %v. Got %v", 4, len(remainingVirtualServers))
}
for _, vs := range remainingVirtualServers {
// Checking that ipvs4 and ipvs5 were removed.
if vs.Port == 57 {
t.Errorf("Expected ipvs4 to be removed after cleanup. It still remains")
}
if vs.Port == 58 {
t.Errorf("Expected ipvs5 to be removed after cleanup. It still remains")
}
}
}
func buildFakeProxier(nodeIP []net.IP) (*iptablestest.FakeIPTables, *Proxier) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()