ipvs part implementation

pull/6/head
m1093782566 2018-01-17 10:30:33 +08:00
parent ddfa04e8f4
commit c537ff54e7
3 changed files with 62 additions and 109 deletions

View File

@ -170,6 +170,11 @@ type Proxier struct {
// lbWhiteListIPSet is the hash:ip,port,net type ipset where stores all service load balancer ingress IP:Port,sourceCIDR pair, any packets
// from the source CIDR visit ingress IP:Port can pass through.
lbWhiteListCIDRSet *IPSet
// Values are as a parameter to select the interfaces where nodeport works.
nodePortAddresses []string
// networkInterfacer defines an interface for several net library functions.
// Inject for test purpose.
networkInterfacer utilproxy.NetworkInterfacer
}
// IPGetter helps get node network interface IP
@ -253,6 +258,7 @@ func NewProxier(ipt utiliptables.Interface,
recorder record.EventRecorder,
healthzServer healthcheck.HealthzUpdater,
scheduler string,
nodePortAddresses []string,
) (*Proxier, error) {
// Set the route_localnet sysctl we need for
if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil {
@ -336,6 +342,8 @@ func NewProxier(ipt utiliptables.Interface,
lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, isIPv6),
nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false),
nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false),
nodePortAddresses: nodePortAddresses,
networkInterfacer: utilproxy.RealNetwork{},
}
burstSyncs := 2
glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
@ -1134,6 +1142,7 @@ func (proxier *Proxier) syncProxyRules() {
}
var nodePortSet *IPSet
switch protocol {
case "tcp":
nodePortSet = proxier.nodePortSetTCP
case "udp":
@ -1152,31 +1161,43 @@ func (proxier *Proxier) syncProxyRules() {
}
// Build ipvs kernel routes for each node ip address
nodeIPs, err := proxier.ipGetter.NodeIPs()
nodeIPs := make([]net.IP, 0)
addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
if err != nil {
glog.Errorf("Failed to get node IP, err: %v", err)
} else {
for _, nodeIP := range nodeIPs {
// ipvs call
serv := &utilipvs.VirtualServer{
Address: nodeIP,
Port: uint16(svcInfo.nodePort),
Protocol: string(svcInfo.protocol),
Scheduler: proxier.ipvsScheduler,
}
if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.stickyMaxAgeSeconds)
}
// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
if err := proxier.syncService(svcNameString, serv, false); err == nil {
activeIPVSServices[serv.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil {
glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
}
} else {
glog.Errorf("Failed to sync service: %v, err: %v", serv, err)
glog.Errorf("Failed to get node ip address matching nodeport cidr")
continue
}
for address := range addresses {
if !utilproxy.IsZeroCIDR(address) {
nodeIPs = append(nodeIPs, net.ParseIP(address))
continue
}
// zero cidr
nodeIPs, err = proxier.ipGetter.NodeIPs()
if err != nil {
glog.Errorf("Failed to list all node IPs from host, err: %v", err)
}
}
for _, nodeIP := range nodeIPs {
// ipvs call
serv := &utilipvs.VirtualServer{
Address: nodeIP,
Port: uint16(svcInfo.nodePort),
Protocol: string(svcInfo.protocol),
Scheduler: proxier.ipvsScheduler,
}
if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.stickyMaxAgeSeconds)
}
// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
if err := proxier.syncService(svcNameString, serv, false); err == nil {
activeIPVSServices[serv.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil {
glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
}
} else {
glog.Errorf("Failed to sync service: %v, err: %v", serv, err)
}
}
}

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing"
utilipset "k8s.io/kubernetes/pkg/util/ipset"
ipsettest "k8s.io/kubernetes/pkg/util/ipset/testing"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
@ -146,6 +147,8 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, false),
nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false),
nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false),
nodePortAddresses: make([]string, 0),
networkInterfacer: proxyutiltest.NewFakeNetwork(),
}
}
@ -386,6 +389,8 @@ func TestNodePort(t *testing.T) {
}),
)
fp.nodePortAddresses = []string{"0.0.0.0/0"}
fp.syncProxyRules()
// Check ipvs service and destinations
@ -445,6 +450,8 @@ func TestNodePortNoEndpoint(t *testing.T) {
)
makeEndpointsMap(fp)
fp.nodePortAddresses = []string{"0.0.0.0/0"}
fp.syncProxyRules()
// Check ipvs service and destinations
@ -847,15 +854,23 @@ func TestOnlyLocalNodePorts(t *testing.T) {
}),
)
itf := net.Interface{Index: 0, MTU: 0, Name: "eth0", HardwareAddr: nil, Flags: 0}
addrs := []net.Addr{proxyutiltest.AddrStruct{Val: "100.101.102.103/24"}}
itf1 := net.Interface{Index: 1, MTU: 0, Name: "eth1", HardwareAddr: nil, Flags: 0}
addrs1 := []net.Addr{proxyutiltest.AddrStruct{Val: "2001:db8::0/64"}}
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs)
fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1)
fp.nodePortAddresses = []string{"100.101.102.0/24", "2001:db8::0/64"}
fp.syncProxyRules()
// Expect 2 services and 1 destination
// Expect 3 services and 1 destination
services, err := ipvs.GetVirtualServers()
if err != nil {
t.Errorf("Failed to get ipvs services, err: %v", err)
}
if len(services) != 2 {
t.Errorf("Expect 2 ipvs services, got %d", len(services))
if len(services) != 3 {
t.Errorf("Expect 3 ipvs services, got %d", len(services))
}
found := false
for _, svc := range services {

View File

@ -1,83 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"net"
)
// NetworkInterface defines an interface for several net library functions. Production
// code will forward to net library functions, and unit tests will override the methods
// for testing purposes.
type NetworkInterface interface {
Addrs(intf *net.Interface) ([]net.Addr, error)
Interfaces() ([]net.Interface, error)
}
// RealNetwork implements the NetworkInterface interface for production code, just
// wrapping the underlying net library function calls.
type RealNetwork struct{}
// Addrs wraps net.Interface.Addrs()
func (_ RealNetwork) Addrs(intf *net.Interface) ([]net.Addr, error) {
return intf.Addrs()
}
// Interfaces wraps net.Interfaces()
func (_ RealNetwork) Interfaces() ([]net.Interface, error) {
return net.Interfaces()
}
// RealNetwork implements the NetworkInterface interface for production code, just
// wrapping the underlying net library function calls.
type FakeNetwork struct {
NetworkInterfaces []net.Interface
// The key of map Addrs is interface name
Address map[string][]net.Addr
}
func NewFakeNetwork() *FakeNetwork {
return &FakeNetwork{
NetworkInterfaces: make([]net.Interface, 0),
Address: make(map[string][]net.Addr),
}
}
// AddInterfaceAddr create an interface and its associated addresses for FakeNetwork implementation.
func (f *FakeNetwork) AddInterfaceAddr(intf *net.Interface, addrs []net.Addr) {
f.NetworkInterfaces = append(f.NetworkInterfaces, *intf)
f.Address[intf.Name] = addrs
}
// Addrs is part of FakeNetwork interface.
func (f *FakeNetwork) Addrs(intf *net.Interface) ([]net.Addr, error) {
return f.Address[intf.Name], nil
}
// Interfaces is part of FakeNetwork interface.
func (f *FakeNetwork) Interfaces() ([]net.Interface, error) {
return f.NetworkInterfaces, nil
}
type AddrStruct struct{ Val string }
func (a AddrStruct) Network() string {
return a.Val
}
func (a AddrStruct) String() string {
return a.Val
}