From 00f8ae35408e5d05dfdb3f7cf4f4f670060a5682 Mon Sep 17 00:00:00 2001 From: choury Date: Mon, 4 Sep 2017 13:41:44 +0800 Subject: [PATCH] fix duplicate unbind action --- pkg/proxy/ipvs/BUILD | 24 +++++- pkg/proxy/ipvs/netlink.go | 25 ++++++ pkg/proxy/ipvs/netlink_linux.go | 72 ++++++++++++++++ pkg/proxy/ipvs/netlink_unsupported.go | 41 +++++++++ pkg/proxy/ipvs/proxier.go | 18 ++-- pkg/proxy/ipvs/proxier_test.go | 2 + pkg/proxy/ipvs/testing/BUILD | 32 +++++++ pkg/proxy/ipvs/testing/fake.go | 38 ++++++++ pkg/util/ipvs/BUILD | 3 - pkg/util/ipvs/ipvs.go | 4 - pkg/util/ipvs/ipvs_linux.go | 30 ------- pkg/util/ipvs/ipvs_linux_test.go | 120 -------------------------- pkg/util/ipvs/ipvs_unsupported.go | 8 -- pkg/util/ipvs/testing/fake.go | 10 --- 14 files changed, 244 insertions(+), 183 deletions(-) create mode 100644 pkg/proxy/ipvs/netlink.go create mode 100644 pkg/proxy/ipvs/netlink_linux.go create mode 100644 pkg/proxy/ipvs/netlink_unsupported.go create mode 100644 pkg/proxy/ipvs/testing/BUILD create mode 100644 pkg/proxy/ipvs/testing/fake.go diff --git a/pkg/proxy/ipvs/BUILD b/pkg/proxy/ipvs/BUILD index 080ec19e3e..478844250d 100644 --- a/pkg/proxy/ipvs/BUILD +++ b/pkg/proxy/ipvs/BUILD @@ -13,6 +13,7 @@ go_test( deps = [ "//pkg/api:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/proxy/ipvs/testing:go_default_library", "//pkg/proxy/util:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/iptables/testing:go_default_library", @@ -30,7 +31,16 @@ go_test( go_library( name = "go_default_library", - srcs = ["proxier.go"], + srcs = [ + "netlink.go", + "netlink_unsupported.go", + "proxier.go", + ] + select({ + "@io_bazel_rules_go//go/platform:linux_amd64": [ + "netlink_linux.go", + ], + "//conditions:default": [], + }), deps = [ "//pkg/api:go_default_library", "//pkg/api/helper:go_default_library", @@ -52,7 +62,12 @@ go_library( "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", - ], + ] + select({ + "@io_bazel_rules_go//go/platform:linux_amd64": [ + "//vendor/github.com/vishvananda/netlink:go_default_library", + ], + "//conditions:default": [], + }), ) filegroup( @@ -64,6 +79,9 @@ filegroup( filegroup( name = "all-srcs", - srcs = [":package-srcs"], + srcs = [ + ":package-srcs", + "//pkg/proxy/ipvs/testing:all-srcs", + ], tags = ["automanaged"], ) diff --git a/pkg/proxy/ipvs/netlink.go b/pkg/proxy/ipvs/netlink.go new file mode 100644 index 0000000000..a4e859f2b2 --- /dev/null +++ b/pkg/proxy/ipvs/netlink.go @@ -0,0 +1,25 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipvs + +// NetLinkHandle for revoke netlink interface +type NetLinkHandle interface { + // EnsureAddressBind checks if address is bound to the interface and, if not, binds it. If the address is already bound, return true. + EnsureAddressBind(address, devName string) (exist bool, err error) + // UnbindAddress unbind address from the interface + UnbindAddress(address, devName string) error +} diff --git a/pkg/proxy/ipvs/netlink_linux.go b/pkg/proxy/ipvs/netlink_linux.go new file mode 100644 index 0000000000..df5b2f779a --- /dev/null +++ b/pkg/proxy/ipvs/netlink_linux.go @@ -0,0 +1,72 @@ +// +build linux + +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipvs + +import ( + "fmt" + "net" + "syscall" + + "github.com/vishvananda/netlink" +) + +type netlinkHandle struct { + netlink.Handle +} + +// NewNetLinkHandle will crate a new netlinkHandle +func NewNetLinkHandle() NetLinkHandle { + return &netlinkHandle{netlink.Handle{}} +} + +// EnsureAddressBind checks if address is bound to the interface and, if not, binds it. If the address is already bound, return true. +func (h *netlinkHandle) EnsureAddressBind(address, devName string) (exist bool, err error) { + dev, err := h.LinkByName(devName) + if err != nil { + return false, fmt.Errorf("error get interface: %s, err: %v", devName, err) + } + addr := net.ParseIP(address) + if addr == nil { + return false, fmt.Errorf("error parse ip address: %s", address) + } + if err := h.AddrAdd(dev, &netlink.Addr{IPNet: netlink.NewIPNet(addr)}); err != nil { + // "EEXIST" will be returned if the address is already bound to device + if err == syscall.Errno(syscall.EEXIST) { + return true, nil + } + return false, fmt.Errorf("error bind address: %s to interface: %s, err: %v", address, devName, err) + } + return false, nil +} + +// UnbindAddress unbind address from the interface +func (h *netlinkHandle) UnbindAddress(address, devName string) error { + dev, err := h.LinkByName(devName) + if err != nil { + return fmt.Errorf("error get interface: %s, err: %v", devName, err) + } + addr := net.ParseIP(address) + if addr == nil { + return fmt.Errorf("error parse ip address: %s", address) + } + if err := h.AddrDel(dev, &netlink.Addr{IPNet: netlink.NewIPNet(addr)}); err != nil { + return fmt.Errorf("error unbind address: %s from interface: %s, err: %v", address, devName, err) + } + return nil +} diff --git a/pkg/proxy/ipvs/netlink_unsupported.go b/pkg/proxy/ipvs/netlink_unsupported.go new file mode 100644 index 0000000000..6fec7de4a6 --- /dev/null +++ b/pkg/proxy/ipvs/netlink_unsupported.go @@ -0,0 +1,41 @@ +// +build !linux + +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipvs + +import ( + "fmt" +) + +type emptyHandle struct { +} + +// NewNetLinkHandle will create an EmptyHandle +func NewNetLinkHandle() NetLinkHandle { + return &emptyHandle{} +} + +// EnsureAddressBind checks if address is bound to the interface and, if not, binds it. If the address is already bound, return true. +func (h *emptyHandle) EnsureAddressBind(address, devName string) (exist bool, err error) { + return false, fmt.Errorf("netlink not supported for this platform") +} + +// UnbindAddress unbind address from the interface +func (h *emptyHandle) UnbindAddress(address, devName string) error { + return fmt.Errorf("netlink not supported for this platform") +} diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 6da4a0a485..59ece18578 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -134,6 +134,8 @@ type Proxier struct { iptablesData *bytes.Buffer natChains *bytes.Buffer natRules *bytes.Buffer + // Added as a member to the struct to allow injection for testing. + netlinkHandle NetLinkHandle } // IPGetter helps get node network interface IP @@ -270,6 +272,7 @@ func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, iptablesData: bytes.NewBuffer(nil), natChains: bytes.NewBuffer(nil), natRules: bytes.NewBuffer(nil), + netlinkHandle: NewNetLinkHandle(), } burstSyncs := 2 glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) @@ -1295,7 +1298,7 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, // bind service address to dummy interface even if service not changed, // in case that service IP was removed by other processes if bindAddr { - _, err := proxier.ipvs.EnsureVirtualServerAddressBind(vs, DefaultDummyDevice) + _, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice) if err != nil { glog.Errorf("Failed to bind service address to dummy device %q: %v", svcName, err) return err @@ -1384,6 +1387,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode } func (proxier *Proxier) cleanLegacyService(atciveServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer) { + unbindIPAddr := sets.NewString() for cS := range currentServices { if !atciveServices[cS] { svc := currentServices[cS] @@ -1391,10 +1395,14 @@ func (proxier *Proxier) cleanLegacyService(atciveServices map[string]bool, curre if err != nil { glog.Errorf("Failed to delete service, error: %v", err) } - err = proxier.ipvs.UnbindVirtualServerAddress(svc, DefaultDummyDevice) - if err != nil { - glog.Errorf("Failed to unbind service from dummy interface, error: %v", err) - } + unbindIPAddr.Insert(svc.Address.String()) + } + } + + for _, addr := range unbindIPAddr.UnsortedList() { + err := proxier.netlinkHandle.UnbindAddress(addr, DefaultDummyDevice) + if err != nil { + glog.Errorf("Failed to unbind service from dummy interface, error: %v", err) } } } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 251d9a90d9..8119f6d5ac 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -31,6 +31,7 @@ import ( fakeexec "k8s.io/utils/exec/testing" "k8s.io/apimachinery/pkg/util/sets" + netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing" proxyutil "k8s.io/kubernetes/pkg/proxy/util" utiliptables "k8s.io/kubernetes/pkg/util/iptables" iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" @@ -114,6 +115,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, nodeIPs iptablesData: bytes.NewBuffer(nil), natChains: bytes.NewBuffer(nil), natRules: bytes.NewBuffer(nil), + netlinkHandle: netlinktest.NewFakeNetlinkHandle(), } } diff --git a/pkg/proxy/ipvs/testing/BUILD b/pkg/proxy/ipvs/testing/BUILD new file mode 100644 index 0000000000..7efddaf19d --- /dev/null +++ b/pkg/proxy/ipvs/testing/BUILD @@ -0,0 +1,32 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = select({ + "@io_bazel_rules_go//go/platform:linux_amd64": [ + "fake.go", + ], + "//conditions:default": [], + }), + tags = ["automanaged"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/proxy/ipvs/testing/fake.go b/pkg/proxy/ipvs/testing/fake.go new file mode 100644 index 0000000000..e1dc62929e --- /dev/null +++ b/pkg/proxy/ipvs/testing/fake.go @@ -0,0 +1,38 @@ +// +build linux + +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +//FakeNetlinkHandle mock implementation of proxy NetlinkHandle +type FakeNetlinkHandle struct { +} + +//NewFakeNetlinkHandle will create a new FakeNetlinkHandle +func NewFakeNetlinkHandle() *FakeNetlinkHandle { + return &FakeNetlinkHandle{} +} + +//EnsureAddressBind is a mock implementation +func (h *FakeNetlinkHandle) EnsureAddressBind(address, devName string) (exist bool, err error) { + return false, nil +} + +//UnbindAddress is a mock implementation +func (h *FakeNetlinkHandle) UnbindAddress(address, devName string) error { + return nil +} diff --git a/pkg/util/ipvs/BUILD b/pkg/util/ipvs/BUILD index 5853ebb8d3..f78bd8f2f2 100644 --- a/pkg/util/ipvs/BUILD +++ b/pkg/util/ipvs/BUILD @@ -20,9 +20,6 @@ go_test( deps = select({ "@io_bazel_rules_go//go/platform:linux_amd64": [ "//vendor/github.com/docker/libnetwork/ipvs:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//vendor/k8s.io/utils/exec:go_default_library", - "//vendor/k8s.io/utils/exec/testing:go_default_library", ], "//conditions:default": [], }), diff --git a/pkg/util/ipvs/ipvs.go b/pkg/util/ipvs/ipvs.go index c474e921e4..9d15beb0c6 100644 --- a/pkg/util/ipvs/ipvs.go +++ b/pkg/util/ipvs/ipvs.go @@ -25,10 +25,6 @@ import ( type Interface interface { // Flush clears all virtual servers in system. return occurred error immediately. Flush() error - // EnsureVirtualServerAddressBind checks if virtual server's address is bound to dummy interface and, if not, binds it. If the address is already bound, return true. - EnsureVirtualServerAddressBind(serv *VirtualServer, dev string) (exist bool, err error) - // UnbindVirtualServerAddress checks if virtual server's address is bound to dummy interface and, if so, unbinds it. - UnbindVirtualServerAddress(serv *VirtualServer, dev string) error // AddVirtualServer creates the specified virtual server. AddVirtualServer(*VirtualServer) error // UpdateVirtualServer updates an already existing virtual server. If the virtual server does not exist, return error. diff --git a/pkg/util/ipvs/ipvs_linux.go b/pkg/util/ipvs/ipvs_linux.go index fb3043bab3..3652a95663 100644 --- a/pkg/util/ipvs/ipvs_linux.go +++ b/pkg/util/ipvs/ipvs_linux.go @@ -30,8 +30,6 @@ import ( utilexec "k8s.io/utils/exec" ) -const cmdIP = "ip" - // runner implements Interface. type runner struct { exec utilexec.Interface @@ -51,34 +49,6 @@ func New(exec utilexec.Interface) Interface { } } -// EnsureVirtualServerAddressBind is part of Interface. -func (runner *runner) EnsureVirtualServerAddressBind(vs *VirtualServer, dummyDev string) (exist bool, err error) { - addr := vs.Address.String() - args := []string{"addr", "add", addr, "dev", dummyDev} - out, err := runner.exec.Command(cmdIP, args...).CombinedOutput() - if err != nil { - // "exit status 2" will be returned if the address is already bound to dummy device - if ee, ok := err.(utilexec.ExitError); ok { - if ee.Exited() && ee.ExitStatus() == 2 { - return true, nil - } - } - return false, fmt.Errorf("error bind address: %s to dummy interface: %s, err: %v: %s", vs.Address.String(), dummyDev, err, out) - } - return false, nil -} - -// UnbindVirtualServerAddress is part of Interface. -func (runner *runner) UnbindVirtualServerAddress(vs *VirtualServer, dummyDev string) error { - addr := vs.Address.String() - args := []string{"addr", "del", addr, "dev", dummyDev} - out, err := runner.exec.Command(cmdIP, args...).CombinedOutput() - if err != nil { - return fmt.Errorf("error unbind address: %s from dummy interface: %s, err: %v: %s", vs.Address.String(), dummyDev, err, out) - } - return nil -} - // AddVirtualServer is part of Interface. func (runner *runner) AddVirtualServer(vs *VirtualServer) error { eSvc, err := toBackendService(vs) diff --git a/pkg/util/ipvs/ipvs_linux_test.go b/pkg/util/ipvs/ipvs_linux_test.go index 8326a9256a..9e6743b87f 100644 --- a/pkg/util/ipvs/ipvs_linux_test.go +++ b/pkg/util/ipvs/ipvs_linux_test.go @@ -25,129 +25,9 @@ import ( "syscall" "testing" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/utils/exec" - fakeexec "k8s.io/utils/exec/testing" - "github.com/docker/libnetwork/ipvs" ) -const dummyDevice = "kube-ipvs0" - -func TestEnsureVirtualServerAddressBind(t *testing.T) { - tests := []VirtualServer{ - { - Address: net.ParseIP("10.20.30.40"), - Port: uint16(1234), - Protocol: string("TCP"), - }, - { - Address: net.ParseIP("2012::beef"), - Port: uint16(5678), - Protocol: string("UDP"), - }, - } - for i := range tests { - vs := &tests[i] - fcmd := fakeexec.FakeCmd{ - CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ - // Success. - func() ([]byte, error) { return []byte{}, nil }, - // Exists. - func() ([]byte, error) { return nil, &fakeexec.FakeExitError{Status: 2} }, - }, - } - fexec := fakeexec.FakeExec{ - CommandScript: []fakeexec.FakeCommandAction{ - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - }, - } - runner := New(&fexec) - // Success. - exists, err := runner.EnsureVirtualServerAddressBind(vs, dummyDevice) - if err != nil { - t.Errorf("expected success, got %v", err) - } - if exists { - t.Errorf("expected exists = false") - } - if fcmd.CombinedOutputCalls != 1 { - t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) - } - IP := tests[i].Address.String() - if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ip", "addr", "add", IP, "dev", dummyDevice) { - t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0]) - } - // Exists. - exists, err = runner.EnsureVirtualServerAddressBind(vs, dummyDevice) - if err != nil { - t.Errorf("expected success, got %v", err) - } - if !exists { - t.Errorf("expected exists = true") - } - } -} - -func TestUnbindVirtualServerAddress(t *testing.T) { - tests := []VirtualServer{ - { - Address: net.ParseIP("2012::beef"), - Port: uint16(5678), - Protocol: string("UDP"), - }, - { - Address: net.ParseIP("10.20.30.40"), - Port: uint16(1234), - Protocol: string("TCP"), - }, - } - for i := range tests { - vs := &tests[i] - fcmd := fakeexec.FakeCmd{ - CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ - // Success. - func() ([]byte, error) { - return []byte{}, nil - }, - // Failure. - func() ([]byte, error) { - return nil, &fakeexec.FakeExitError{Status: 2} - }, - }, - } - fexec := fakeexec.FakeExec{ - CommandScript: []fakeexec.FakeCommandAction{ - func(cmd string, args ...string) exec.Cmd { - return fakeexec.InitFakeCmd(&fcmd, cmd, args...) - }, - func(cmd string, args ...string) exec.Cmd { - return fakeexec.InitFakeCmd(&fcmd, cmd, args...) - }, - }, - } - runner := New(&fexec) - // Success. - err := runner.UnbindVirtualServerAddress(vs, dummyDevice) - if err != nil { - t.Errorf("expected success, got %v", err) - } - if fcmd.CombinedOutputCalls != 1 { - t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) - } - IP := tests[i].Address.String() - if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ip", "addr", "del", IP, "dev", dummyDevice) { - t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0]) - } - // Failure. - err = runner.UnbindVirtualServerAddress(vs, dummyDevice) - if err == nil { - t.Errorf("expected failure") - } - } -} - func Test_toVirtualServer(t *testing.T) { Tests := []struct { ipvsService ipvs.Service diff --git a/pkg/util/ipvs/ipvs_unsupported.go b/pkg/util/ipvs/ipvs_unsupported.go index c2ccce9143..dd4d5b625b 100644 --- a/pkg/util/ipvs/ipvs_unsupported.go +++ b/pkg/util/ipvs/ipvs_unsupported.go @@ -36,14 +36,6 @@ func (runner *runner) Flush() error { return fmt.Errorf("IPVS not supported for this platform") } -func (runner *runner) EnsureVirtualServerAddressBind(*VirtualServer, string) (bool, error) { - return false, fmt.Errorf("IPVS not supported for this platform") -} - -func (runner *runner) UnbindVirtualServerAddress(*VirtualServer, string) error { - return fmt.Errorf("IPVS not supported for this platform") -} - func (runner *runner) AddVirtualServer(*VirtualServer) error { return fmt.Errorf("IPVS not supported for this platform") } diff --git a/pkg/util/ipvs/testing/fake.go b/pkg/util/ipvs/testing/fake.go index 455d26f6a8..b33f091213 100644 --- a/pkg/util/ipvs/testing/fake.go +++ b/pkg/util/ipvs/testing/fake.go @@ -55,16 +55,6 @@ func toServiceKey(serv *utilipvs.VirtualServer) serviceKey { } } -//EnsureVirtualServerAddressBind is an empty implementation -func (*FakeIPVS) EnsureVirtualServerAddressBind(serv *utilipvs.VirtualServer, dev string) (exist bool, err error) { - return true, nil -} - -//UnbindVirtualServerAddress is an empty implementation -func (*FakeIPVS) UnbindVirtualServerAddress(serv *utilipvs.VirtualServer, dev string) error { - return nil -} - //AddVirtualServer is a fake implementation, it simply adds the VirtualServer into the cache store. func (f *FakeIPVS) AddVirtualServer(serv *utilipvs.VirtualServer) error { if serv == nil {