Merge pull request #51686 from choury/fix_dup_unbind

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Fix duplicate unbind action in kube-proxy

**What this PR does / why we need it**:
Fix duplicate unbind action in kube-proxy. It will generate unnecessary error info If unbind multi-ports on one service .

**Which issue this PR fixes**:
fixes #51694

**Release-note**:
```release-note
NONE
```
pull/6/head
Kubernetes Submit Queue 2017-10-15 17:38:45 -07:00 committed by GitHub
commit 02f0d92160
14 changed files with 244 additions and 183 deletions

View File

@ -13,6 +13,7 @@ go_test(
deps = [
"//pkg/api:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/proxy/ipvs/testing:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/iptables/testing:go_default_library",
@ -30,7 +31,16 @@ go_test(
go_library(
name = "go_default_library",
srcs = ["proxier.go"],
srcs = [
"netlink.go",
"netlink_unsupported.go",
"proxier.go",
] + select({
"@io_bazel_rules_go//go/platform:linux_amd64": [
"netlink_linux.go",
],
"//conditions:default": [],
}),
deps = [
"//pkg/api:go_default_library",
"//pkg/api/helper:go_default_library",
@ -52,7 +62,12 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
],
] + select({
"@io_bazel_rules_go//go/platform:linux_amd64": [
"//vendor/github.com/vishvananda/netlink:go_default_library",
],
"//conditions:default": [],
}),
)
filegroup(
@ -64,6 +79,9 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//pkg/proxy/ipvs/testing:all-srcs",
],
tags = ["automanaged"],
)

25
pkg/proxy/ipvs/netlink.go Normal file
View File

@ -0,0 +1,25 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipvs
// NetLinkHandle for revoke netlink interface
type NetLinkHandle interface {
// EnsureAddressBind checks if address is bound to the interface and, if not, binds it. If the address is already bound, return true.
EnsureAddressBind(address, devName string) (exist bool, err error)
// UnbindAddress unbind address from the interface
UnbindAddress(address, devName string) error
}

View File

@ -0,0 +1,72 @@
// +build linux
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipvs
import (
"fmt"
"net"
"syscall"
"github.com/vishvananda/netlink"
)
type netlinkHandle struct {
netlink.Handle
}
// NewNetLinkHandle will crate a new netlinkHandle
func NewNetLinkHandle() NetLinkHandle {
return &netlinkHandle{netlink.Handle{}}
}
// EnsureAddressBind checks if address is bound to the interface and, if not, binds it. If the address is already bound, return true.
func (h *netlinkHandle) EnsureAddressBind(address, devName string) (exist bool, err error) {
dev, err := h.LinkByName(devName)
if err != nil {
return false, fmt.Errorf("error get interface: %s, err: %v", devName, err)
}
addr := net.ParseIP(address)
if addr == nil {
return false, fmt.Errorf("error parse ip address: %s", address)
}
if err := h.AddrAdd(dev, &netlink.Addr{IPNet: netlink.NewIPNet(addr)}); err != nil {
// "EEXIST" will be returned if the address is already bound to device
if err == syscall.Errno(syscall.EEXIST) {
return true, nil
}
return false, fmt.Errorf("error bind address: %s to interface: %s, err: %v", address, devName, err)
}
return false, nil
}
// UnbindAddress unbind address from the interface
func (h *netlinkHandle) UnbindAddress(address, devName string) error {
dev, err := h.LinkByName(devName)
if err != nil {
return fmt.Errorf("error get interface: %s, err: %v", devName, err)
}
addr := net.ParseIP(address)
if addr == nil {
return fmt.Errorf("error parse ip address: %s", address)
}
if err := h.AddrDel(dev, &netlink.Addr{IPNet: netlink.NewIPNet(addr)}); err != nil {
return fmt.Errorf("error unbind address: %s from interface: %s, err: %v", address, devName, err)
}
return nil
}

View File

@ -0,0 +1,41 @@
// +build !linux
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipvs
import (
"fmt"
)
type emptyHandle struct {
}
// NewNetLinkHandle will create an EmptyHandle
func NewNetLinkHandle() NetLinkHandle {
return &emptyHandle{}
}
// EnsureAddressBind checks if address is bound to the interface and, if not, binds it. If the address is already bound, return true.
func (h *emptyHandle) EnsureAddressBind(address, devName string) (exist bool, err error) {
return false, fmt.Errorf("netlink not supported for this platform")
}
// UnbindAddress unbind address from the interface
func (h *emptyHandle) UnbindAddress(address, devName string) error {
return fmt.Errorf("netlink not supported for this platform")
}

View File

@ -134,6 +134,8 @@ type Proxier struct {
iptablesData *bytes.Buffer
natChains *bytes.Buffer
natRules *bytes.Buffer
// Added as a member to the struct to allow injection for testing.
netlinkHandle NetLinkHandle
}
// IPGetter helps get node network interface IP
@ -270,6 +272,7 @@ func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface,
iptablesData: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
netlinkHandle: NewNetLinkHandle(),
}
burstSyncs := 2
glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
@ -1292,7 +1295,7 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer,
// bind service address to dummy interface even if service not changed,
// in case that service IP was removed by other processes
if bindAddr {
_, err := proxier.ipvs.EnsureVirtualServerAddressBind(vs, DefaultDummyDevice)
_, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
if err != nil {
glog.Errorf("Failed to bind service address to dummy device %q: %v", svcName, err)
return err
@ -1381,6 +1384,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
}
func (proxier *Proxier) cleanLegacyService(atciveServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer) {
unbindIPAddr := sets.NewString()
for cS := range currentServices {
if !atciveServices[cS] {
svc := currentServices[cS]
@ -1388,10 +1392,14 @@ func (proxier *Proxier) cleanLegacyService(atciveServices map[string]bool, curre
if err != nil {
glog.Errorf("Failed to delete service, error: %v", err)
}
err = proxier.ipvs.UnbindVirtualServerAddress(svc, DefaultDummyDevice)
if err != nil {
glog.Errorf("Failed to unbind service from dummy interface, error: %v", err)
}
unbindIPAddr.Insert(svc.Address.String())
}
}
for _, addr := range unbindIPAddr.UnsortedList() {
err := proxier.netlinkHandle.UnbindAddress(addr, DefaultDummyDevice)
if err != nil {
glog.Errorf("Failed to unbind service from dummy interface, error: %v", err)
}
}
}

View File

@ -31,6 +31,7 @@ import (
fakeexec "k8s.io/utils/exec/testing"
"k8s.io/apimachinery/pkg/util/sets"
netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
@ -114,6 +115,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, nodeIPs
iptablesData: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
netlinkHandle: netlinktest.NewFakeNetlinkHandle(),
}
}

View File

@ -0,0 +1,32 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = select({
"@io_bazel_rules_go//go/platform:linux_amd64": [
"fake.go",
],
"//conditions:default": [],
}),
tags = ["automanaged"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -0,0 +1,38 @@
// +build linux
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
//FakeNetlinkHandle mock implementation of proxy NetlinkHandle
type FakeNetlinkHandle struct {
}
//NewFakeNetlinkHandle will create a new FakeNetlinkHandle
func NewFakeNetlinkHandle() *FakeNetlinkHandle {
return &FakeNetlinkHandle{}
}
//EnsureAddressBind is a mock implementation
func (h *FakeNetlinkHandle) EnsureAddressBind(address, devName string) (exist bool, err error) {
return false, nil
}
//UnbindAddress is a mock implementation
func (h *FakeNetlinkHandle) UnbindAddress(address, devName string) error {
return nil
}

View File

@ -20,9 +20,6 @@ go_test(
deps = select({
"@io_bazel_rules_go//go/platform:linux_amd64": [
"//vendor/github.com/docker/libnetwork/ipvs:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/k8s.io/utils/exec/testing:go_default_library",
],
"//conditions:default": [],
}),

View File

@ -25,10 +25,6 @@ import (
type Interface interface {
// Flush clears all virtual servers in system. return occurred error immediately.
Flush() error
// EnsureVirtualServerAddressBind checks if virtual server's address is bound to dummy interface and, if not, binds it. If the address is already bound, return true.
EnsureVirtualServerAddressBind(serv *VirtualServer, dev string) (exist bool, err error)
// UnbindVirtualServerAddress checks if virtual server's address is bound to dummy interface and, if so, unbinds it.
UnbindVirtualServerAddress(serv *VirtualServer, dev string) error
// AddVirtualServer creates the specified virtual server.
AddVirtualServer(*VirtualServer) error
// UpdateVirtualServer updates an already existing virtual server. If the virtual server does not exist, return error.

View File

@ -30,8 +30,6 @@ import (
utilexec "k8s.io/utils/exec"
)
const cmdIP = "ip"
// runner implements Interface.
type runner struct {
exec utilexec.Interface
@ -51,34 +49,6 @@ func New(exec utilexec.Interface) Interface {
}
}
// EnsureVirtualServerAddressBind is part of Interface.
func (runner *runner) EnsureVirtualServerAddressBind(vs *VirtualServer, dummyDev string) (exist bool, err error) {
addr := vs.Address.String()
args := []string{"addr", "add", addr, "dev", dummyDev}
out, err := runner.exec.Command(cmdIP, args...).CombinedOutput()
if err != nil {
// "exit status 2" will be returned if the address is already bound to dummy device
if ee, ok := err.(utilexec.ExitError); ok {
if ee.Exited() && ee.ExitStatus() == 2 {
return true, nil
}
}
return false, fmt.Errorf("error bind address: %s to dummy interface: %s, err: %v: %s", vs.Address.String(), dummyDev, err, out)
}
return false, nil
}
// UnbindVirtualServerAddress is part of Interface.
func (runner *runner) UnbindVirtualServerAddress(vs *VirtualServer, dummyDev string) error {
addr := vs.Address.String()
args := []string{"addr", "del", addr, "dev", dummyDev}
out, err := runner.exec.Command(cmdIP, args...).CombinedOutput()
if err != nil {
return fmt.Errorf("error unbind address: %s from dummy interface: %s, err: %v: %s", vs.Address.String(), dummyDev, err, out)
}
return nil
}
// AddVirtualServer is part of Interface.
func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
eSvc, err := toBackendService(vs)

View File

@ -25,129 +25,9 @@ import (
"syscall"
"testing"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/exec"
fakeexec "k8s.io/utils/exec/testing"
"github.com/docker/libnetwork/ipvs"
)
const dummyDevice = "kube-ipvs0"
func TestEnsureVirtualServerAddressBind(t *testing.T) {
tests := []VirtualServer{
{
Address: net.ParseIP("10.20.30.40"),
Port: uint16(1234),
Protocol: string("TCP"),
},
{
Address: net.ParseIP("2012::beef"),
Port: uint16(5678),
Protocol: string("UDP"),
},
}
for i := range tests {
vs := &tests[i]
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
// Success.
func() ([]byte, error) { return []byte{}, nil },
// Exists.
func() ([]byte, error) { return nil, &fakeexec.FakeExitError{Status: 2} },
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec)
// Success.
exists, err := runner.EnsureVirtualServerAddressBind(vs, dummyDevice)
if err != nil {
t.Errorf("expected success, got %v", err)
}
if exists {
t.Errorf("expected exists = false")
}
if fcmd.CombinedOutputCalls != 1 {
t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls)
}
IP := tests[i].Address.String()
if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ip", "addr", "add", IP, "dev", dummyDevice) {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0])
}
// Exists.
exists, err = runner.EnsureVirtualServerAddressBind(vs, dummyDevice)
if err != nil {
t.Errorf("expected success, got %v", err)
}
if !exists {
t.Errorf("expected exists = true")
}
}
}
func TestUnbindVirtualServerAddress(t *testing.T) {
tests := []VirtualServer{
{
Address: net.ParseIP("2012::beef"),
Port: uint16(5678),
Protocol: string("UDP"),
},
{
Address: net.ParseIP("10.20.30.40"),
Port: uint16(1234),
Protocol: string("TCP"),
},
}
for i := range tests {
vs := &tests[i]
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
// Success.
func() ([]byte, error) {
return []byte{}, nil
},
// Failure.
func() ([]byte, error) {
return nil, &fakeexec.FakeExitError{Status: 2}
},
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd {
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
},
func(cmd string, args ...string) exec.Cmd {
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
},
},
}
runner := New(&fexec)
// Success.
err := runner.UnbindVirtualServerAddress(vs, dummyDevice)
if err != nil {
t.Errorf("expected success, got %v", err)
}
if fcmd.CombinedOutputCalls != 1 {
t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls)
}
IP := tests[i].Address.String()
if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ip", "addr", "del", IP, "dev", dummyDevice) {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0])
}
// Failure.
err = runner.UnbindVirtualServerAddress(vs, dummyDevice)
if err == nil {
t.Errorf("expected failure")
}
}
}
func Test_toVirtualServer(t *testing.T) {
Tests := []struct {
ipvsService ipvs.Service

View File

@ -36,14 +36,6 @@ func (runner *runner) Flush() error {
return fmt.Errorf("IPVS not supported for this platform")
}
func (runner *runner) EnsureVirtualServerAddressBind(*VirtualServer, string) (bool, error) {
return false, fmt.Errorf("IPVS not supported for this platform")
}
func (runner *runner) UnbindVirtualServerAddress(*VirtualServer, string) error {
return fmt.Errorf("IPVS not supported for this platform")
}
func (runner *runner) AddVirtualServer(*VirtualServer) error {
return fmt.Errorf("IPVS not supported for this platform")
}

View File

@ -55,16 +55,6 @@ func toServiceKey(serv *utilipvs.VirtualServer) serviceKey {
}
}
//EnsureVirtualServerAddressBind is an empty implementation
func (*FakeIPVS) EnsureVirtualServerAddressBind(serv *utilipvs.VirtualServer, dev string) (exist bool, err error) {
return true, nil
}
//UnbindVirtualServerAddress is an empty implementation
func (*FakeIPVS) UnbindVirtualServerAddress(serv *utilipvs.VirtualServer, dev string) error {
return nil
}
//AddVirtualServer is a fake implementation, it simply adds the VirtualServer into the cache store.
func (f *FakeIPVS) AddVirtualServer(serv *utilipvs.VirtualServer) error {
if serv == nil {