fix duplicate unbind action

pull/6/head
choury 2017-09-04 13:41:44 +08:00
parent 6e2249b784
commit 00f8ae3540
14 changed files with 244 additions and 183 deletions

View File

@ -13,6 +13,7 @@ go_test(
deps = [
"//pkg/api:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/proxy/ipvs/testing:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/iptables/testing:go_default_library",
@ -30,7 +31,16 @@ go_test(
go_library(
name = "go_default_library",
srcs = ["proxier.go"],
srcs = [
"netlink.go",
"netlink_unsupported.go",
"proxier.go",
] + select({
"@io_bazel_rules_go//go/platform:linux_amd64": [
"netlink_linux.go",
],
"//conditions:default": [],
}),
deps = [
"//pkg/api:go_default_library",
"//pkg/api/helper:go_default_library",
@ -52,7 +62,12 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
] + select({
"@io_bazel_rules_go//go/platform:linux_amd64": [
"//vendor/github.com/vishvananda/netlink:go_default_library",
],
"//conditions:default": [],
}),
)
filegroup(
@ -64,6 +79,9 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//pkg/proxy/ipvs/testing:all-srcs",
],
tags = ["automanaged"],
)

25
pkg/proxy/ipvs/netlink.go Normal file
View File

@ -0,0 +1,25 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipvs
// NetLinkHandle for revoke netlink interface
type NetLinkHandle interface {
// EnsureAddressBind checks if address is bound to the interface and, if not, binds it. If the address is already bound, return true.
EnsureAddressBind(address, devName string) (exist bool, err error)
// UnbindAddress unbind address from the interface
UnbindAddress(address, devName string) error
}

View File

@ -0,0 +1,72 @@
// +build linux
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipvs
import (
"fmt"
"net"
"syscall"
"github.com/vishvananda/netlink"
)
type netlinkHandle struct {
netlink.Handle
}
// NewNetLinkHandle will crate a new netlinkHandle
func NewNetLinkHandle() NetLinkHandle {
return &netlinkHandle{netlink.Handle{}}
}
// EnsureAddressBind checks if address is bound to the interface and, if not, binds it. If the address is already bound, return true.
func (h *netlinkHandle) EnsureAddressBind(address, devName string) (exist bool, err error) {
dev, err := h.LinkByName(devName)
if err != nil {
return false, fmt.Errorf("error get interface: %s, err: %v", devName, err)
}
addr := net.ParseIP(address)
if addr == nil {
return false, fmt.Errorf("error parse ip address: %s", address)
}
if err := h.AddrAdd(dev, &netlink.Addr{IPNet: netlink.NewIPNet(addr)}); err != nil {
// "EEXIST" will be returned if the address is already bound to device
if err == syscall.Errno(syscall.EEXIST) {
return true, nil
}
return false, fmt.Errorf("error bind address: %s to interface: %s, err: %v", address, devName, err)
}
return false, nil
}
// UnbindAddress unbind address from the interface
func (h *netlinkHandle) UnbindAddress(address, devName string) error {
dev, err := h.LinkByName(devName)
if err != nil {
return fmt.Errorf("error get interface: %s, err: %v", devName, err)
}
addr := net.ParseIP(address)
if addr == nil {
return fmt.Errorf("error parse ip address: %s", address)
}
if err := h.AddrDel(dev, &netlink.Addr{IPNet: netlink.NewIPNet(addr)}); err != nil {
return fmt.Errorf("error unbind address: %s from interface: %s, err: %v", address, devName, err)
}
return nil
}

View File

@ -0,0 +1,41 @@
// +build !linux
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipvs
import (
"fmt"
)
type emptyHandle struct {
}
// NewNetLinkHandle will create an EmptyHandle
func NewNetLinkHandle() NetLinkHandle {
return &emptyHandle{}
}
// EnsureAddressBind checks if address is bound to the interface and, if not, binds it. If the address is already bound, return true.
func (h *emptyHandle) EnsureAddressBind(address, devName string) (exist bool, err error) {
return false, fmt.Errorf("netlink not supported for this platform")
}
// UnbindAddress unbind address from the interface
func (h *emptyHandle) UnbindAddress(address, devName string) error {
return fmt.Errorf("netlink not supported for this platform")
}

View File

@ -134,6 +134,8 @@ type Proxier struct {
iptablesData *bytes.Buffer
natChains *bytes.Buffer
natRules *bytes.Buffer
// Added as a member to the struct to allow injection for testing.
netlinkHandle NetLinkHandle
}
// IPGetter helps get node network interface IP
@ -270,6 +272,7 @@ func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface,
iptablesData: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
netlinkHandle: NewNetLinkHandle(),
}
burstSyncs := 2
glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
@ -1295,7 +1298,7 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer,
// bind service address to dummy interface even if service not changed,
// in case that service IP was removed by other processes
if bindAddr {
_, err := proxier.ipvs.EnsureVirtualServerAddressBind(vs, DefaultDummyDevice)
_, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
if err != nil {
glog.Errorf("Failed to bind service address to dummy device %q: %v", svcName, err)
return err
@ -1384,6 +1387,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
}
func (proxier *Proxier) cleanLegacyService(atciveServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer) {
unbindIPAddr := sets.NewString()
for cS := range currentServices {
if !atciveServices[cS] {
svc := currentServices[cS]
@ -1391,13 +1395,17 @@ func (proxier *Proxier) cleanLegacyService(atciveServices map[string]bool, curre
if err != nil {
glog.Errorf("Failed to delete service, error: %v", err)
}
err = proxier.ipvs.UnbindVirtualServerAddress(svc, DefaultDummyDevice)
unbindIPAddr.Insert(svc.Address.String())
}
}
for _, addr := range unbindIPAddr.UnsortedList() {
err := proxier.netlinkHandle.UnbindAddress(addr, DefaultDummyDevice)
if err != nil {
glog.Errorf("Failed to unbind service from dummy interface, error: %v", err)
}
}
}
}
// linkKubeServiceChain will Create chain KUBE-SERVICES and link the chin in PREROUTING and OUTPUT
// If not specify masqueradeAll or clusterCIDR or LB source range, won't create them.

View File

@ -31,6 +31,7 @@ import (
fakeexec "k8s.io/utils/exec/testing"
"k8s.io/apimachinery/pkg/util/sets"
netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
@ -114,6 +115,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, nodeIPs
iptablesData: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
netlinkHandle: netlinktest.NewFakeNetlinkHandle(),
}
}

View File

@ -0,0 +1,32 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = select({
"@io_bazel_rules_go//go/platform:linux_amd64": [
"fake.go",
],
"//conditions:default": [],
}),
tags = ["automanaged"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -0,0 +1,38 @@
// +build linux
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
//FakeNetlinkHandle mock implementation of proxy NetlinkHandle
type FakeNetlinkHandle struct {
}
//NewFakeNetlinkHandle will create a new FakeNetlinkHandle
func NewFakeNetlinkHandle() *FakeNetlinkHandle {
return &FakeNetlinkHandle{}
}
//EnsureAddressBind is a mock implementation
func (h *FakeNetlinkHandle) EnsureAddressBind(address, devName string) (exist bool, err error) {
return false, nil
}
//UnbindAddress is a mock implementation
func (h *FakeNetlinkHandle) UnbindAddress(address, devName string) error {
return nil
}

View File

@ -20,9 +20,6 @@ go_test(
deps = select({
"@io_bazel_rules_go//go/platform:linux_amd64": [
"//vendor/github.com/docker/libnetwork/ipvs:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/k8s.io/utils/exec/testing:go_default_library",
],
"//conditions:default": [],
}),

View File

@ -25,10 +25,6 @@ import (
type Interface interface {
// Flush clears all virtual servers in system. return occurred error immediately.
Flush() error
// EnsureVirtualServerAddressBind checks if virtual server's address is bound to dummy interface and, if not, binds it. If the address is already bound, return true.
EnsureVirtualServerAddressBind(serv *VirtualServer, dev string) (exist bool, err error)
// UnbindVirtualServerAddress checks if virtual server's address is bound to dummy interface and, if so, unbinds it.
UnbindVirtualServerAddress(serv *VirtualServer, dev string) error
// AddVirtualServer creates the specified virtual server.
AddVirtualServer(*VirtualServer) error
// UpdateVirtualServer updates an already existing virtual server. If the virtual server does not exist, return error.

View File

@ -30,8 +30,6 @@ import (
utilexec "k8s.io/utils/exec"
)
const cmdIP = "ip"
// runner implements Interface.
type runner struct {
exec utilexec.Interface
@ -51,34 +49,6 @@ func New(exec utilexec.Interface) Interface {
}
}
// EnsureVirtualServerAddressBind is part of Interface.
func (runner *runner) EnsureVirtualServerAddressBind(vs *VirtualServer, dummyDev string) (exist bool, err error) {
addr := vs.Address.String()
args := []string{"addr", "add", addr, "dev", dummyDev}
out, err := runner.exec.Command(cmdIP, args...).CombinedOutput()
if err != nil {
// "exit status 2" will be returned if the address is already bound to dummy device
if ee, ok := err.(utilexec.ExitError); ok {
if ee.Exited() && ee.ExitStatus() == 2 {
return true, nil
}
}
return false, fmt.Errorf("error bind address: %s to dummy interface: %s, err: %v: %s", vs.Address.String(), dummyDev, err, out)
}
return false, nil
}
// UnbindVirtualServerAddress is part of Interface.
func (runner *runner) UnbindVirtualServerAddress(vs *VirtualServer, dummyDev string) error {
addr := vs.Address.String()
args := []string{"addr", "del", addr, "dev", dummyDev}
out, err := runner.exec.Command(cmdIP, args...).CombinedOutput()
if err != nil {
return fmt.Errorf("error unbind address: %s from dummy interface: %s, err: %v: %s", vs.Address.String(), dummyDev, err, out)
}
return nil
}
// AddVirtualServer is part of Interface.
func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
eSvc, err := toBackendService(vs)

View File

@ -25,129 +25,9 @@ import (
"syscall"
"testing"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/exec"
fakeexec "k8s.io/utils/exec/testing"
"github.com/docker/libnetwork/ipvs"
)
const dummyDevice = "kube-ipvs0"
func TestEnsureVirtualServerAddressBind(t *testing.T) {
tests := []VirtualServer{
{
Address: net.ParseIP("10.20.30.40"),
Port: uint16(1234),
Protocol: string("TCP"),
},
{
Address: net.ParseIP("2012::beef"),
Port: uint16(5678),
Protocol: string("UDP"),
},
}
for i := range tests {
vs := &tests[i]
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
// Success.
func() ([]byte, error) { return []byte{}, nil },
// Exists.
func() ([]byte, error) { return nil, &fakeexec.FakeExitError{Status: 2} },
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec)
// Success.
exists, err := runner.EnsureVirtualServerAddressBind(vs, dummyDevice)
if err != nil {
t.Errorf("expected success, got %v", err)
}
if exists {
t.Errorf("expected exists = false")
}
if fcmd.CombinedOutputCalls != 1 {
t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls)
}
IP := tests[i].Address.String()
if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ip", "addr", "add", IP, "dev", dummyDevice) {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0])
}
// Exists.
exists, err = runner.EnsureVirtualServerAddressBind(vs, dummyDevice)
if err != nil {
t.Errorf("expected success, got %v", err)
}
if !exists {
t.Errorf("expected exists = true")
}
}
}
func TestUnbindVirtualServerAddress(t *testing.T) {
tests := []VirtualServer{
{
Address: net.ParseIP("2012::beef"),
Port: uint16(5678),
Protocol: string("UDP"),
},
{
Address: net.ParseIP("10.20.30.40"),
Port: uint16(1234),
Protocol: string("TCP"),
},
}
for i := range tests {
vs := &tests[i]
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
// Success.
func() ([]byte, error) {
return []byte{}, nil
},
// Failure.
func() ([]byte, error) {
return nil, &fakeexec.FakeExitError{Status: 2}
},
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd {
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
},
func(cmd string, args ...string) exec.Cmd {
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
},
},
}
runner := New(&fexec)
// Success.
err := runner.UnbindVirtualServerAddress(vs, dummyDevice)
if err != nil {
t.Errorf("expected success, got %v", err)
}
if fcmd.CombinedOutputCalls != 1 {
t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls)
}
IP := tests[i].Address.String()
if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ip", "addr", "del", IP, "dev", dummyDevice) {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0])
}
// Failure.
err = runner.UnbindVirtualServerAddress(vs, dummyDevice)
if err == nil {
t.Errorf("expected failure")
}
}
}
func Test_toVirtualServer(t *testing.T) {
Tests := []struct {
ipvsService ipvs.Service

View File

@ -36,14 +36,6 @@ func (runner *runner) Flush() error {
return fmt.Errorf("IPVS not supported for this platform")
}
func (runner *runner) EnsureVirtualServerAddressBind(*VirtualServer, string) (bool, error) {
return false, fmt.Errorf("IPVS not supported for this platform")
}
func (runner *runner) UnbindVirtualServerAddress(*VirtualServer, string) error {
return fmt.Errorf("IPVS not supported for this platform")
}
func (runner *runner) AddVirtualServer(*VirtualServer) error {
return fmt.Errorf("IPVS not supported for this platform")
}

View File

@ -55,16 +55,6 @@ func toServiceKey(serv *utilipvs.VirtualServer) serviceKey {
}
}
//EnsureVirtualServerAddressBind is an empty implementation
func (*FakeIPVS) EnsureVirtualServerAddressBind(serv *utilipvs.VirtualServer, dev string) (exist bool, err error) {
return true, nil
}
//UnbindVirtualServerAddress is an empty implementation
func (*FakeIPVS) UnbindVirtualServerAddress(serv *utilipvs.VirtualServer, dev string) error {
return nil
}
//AddVirtualServer is a fake implementation, it simply adds the VirtualServer into the cache store.
func (f *FakeIPVS) AddVirtualServer(serv *utilipvs.VirtualServer) error {
if serv == nil {