cleanup proxier

pull/6/head
Anthony Howe 2017-02-08 07:35:55 -08:00
parent 48647fb9b5
commit 0e37f0a890
9 changed files with 359 additions and 920 deletions

View File

@ -47,15 +47,3 @@ type ServicePortName struct {
func (spn ServicePortName) String() string {
return fmt.Sprintf("%s:%s", spn.NamespacedName.String(), spn.Port)
}
// ServicePortPortalName carries a namespace + name + port name + portal IP. This is the unique
// identifier for a load-balanced service.
type ServicePortPortalName struct {
types.NamespacedName
Port string
PortalIPName string
}
func (spn ServicePortPortalName) String() string {
return fmt.Sprintf("%s:%s:%s", spn.NamespacedName.String(), spn.Port, spn.PortalIPName)
}
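
As an aside, here is a minimal, self-contained sketch of how this composite key behaves; the type is re-declared locally so the snippet runs on its own, and the namespace, name, port, and portal IP values are made up for illustration.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

// ServicePortPortalName mirrors the type above: namespace/name plus port name plus portal IP.
type ServicePortPortalName struct {
	types.NamespacedName
	Port         string
	PortalIPName string
}

func (spn ServicePortPortalName) String() string {
	return fmt.Sprintf("%s:%s:%s", spn.NamespacedName.String(), spn.Port, spn.PortalIPName)
}

func main() {
	key := ServicePortPortalName{
		NamespacedName: types.NamespacedName{Namespace: "default", Name: "echo"},
		Port:           "p",
		PortalIPName:   "10.0.0.1",
	}
	fmt.Println(key) // default/echo:p:10.0.0.1 -- one key per (service, port, portal IP)
}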

View File

@ -12,10 +12,10 @@ go_library(
name = "go_default_library",
srcs = [
"loadbalancer.go",
"port_allocator.go",
"proxier.go",
"proxysocket.go",
"roundrobin.go",
"types.go",
"udp_server.go",
],
tags = ["automanaged"],
@ -36,7 +36,6 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"port_allocator_test.go",
"proxier_test.go",
"roundrobin_test.go",
],

View File

@ -1,158 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package winuserspace
import (
"errors"
"math/big"
"math/rand"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
)
var (
errPortRangeNoPortsRemaining = errors.New("port allocation failed; there are no remaining ports left to allocate in the accepted range")
)
type PortAllocator interface {
AllocateNext() (int, error)
Release(int)
}
// randomAllocator is a PortAllocator implementation that allocates random ports, yielding
// a port value of 0 for every call to AllocateNext().
type randomAllocator struct{}
// AllocateNext always returns 0
func (r *randomAllocator) AllocateNext() (int, error) {
return 0, nil
}
// Release is a noop
func (r *randomAllocator) Release(_ int) {
// noop
}
// newPortAllocator builds PortAllocator for a given PortRange. If the PortRange is empty
// then a random port allocator is returned; otherwise, a new range-based allocator
// is returned.
func newPortAllocator(r net.PortRange) PortAllocator {
if r.Base == 0 {
return &randomAllocator{}
}
return newPortRangeAllocator(r, true)
}
const (
portsBufSize = 16
nextFreePortCooldown = 500 * time.Millisecond
allocateNextTimeout = 1 * time.Second
)
type rangeAllocator struct {
net.PortRange
ports chan int
used big.Int
lock sync.Mutex
rand *rand.Rand
}
func newPortRangeAllocator(r net.PortRange, autoFill bool) PortAllocator {
if r.Base == 0 || r.Size == 0 {
panic("illegal argument: may not specify an empty port range")
}
ra := &rangeAllocator{
PortRange: r,
ports: make(chan int, portsBufSize),
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
}
if autoFill {
go wait.Forever(func() { ra.fillPorts() }, nextFreePortCooldown)
}
return ra
}
// fillPorts loops, always searching for the next free port and, if one is found, filling the ports buffer with it.
// This func blocks unless there are no remaining free ports.
func (r *rangeAllocator) fillPorts() {
for {
if !r.fillPortsOnce() {
return
}
}
}
func (r *rangeAllocator) fillPortsOnce() bool {
port := r.nextFreePort()
if port == -1 {
return false
}
r.ports <- port
return true
}
// nextFreePort finds a free port, first picking a random port. If that port is already in use,
// the port range is scanned sequentially until either a port is found or the scan completes
// unsuccessfully. An unsuccessful scan returns a port of -1.
func (r *rangeAllocator) nextFreePort() int {
r.lock.Lock()
defer r.lock.Unlock()
// choose random port
j := r.rand.Intn(r.Size)
if b := r.used.Bit(j); b == 0 {
r.used.SetBit(&r.used, j, 1)
return j + r.Base
}
// search sequentially
for i := j + 1; i < r.Size; i++ {
if b := r.used.Bit(i); b == 0 {
r.used.SetBit(&r.used, i, 1)
return i + r.Base
}
}
for i := 0; i < j; i++ {
if b := r.used.Bit(i); b == 0 {
r.used.SetBit(&r.used, i, 1)
return i + r.Base
}
}
return -1
}
func (r *rangeAllocator) AllocateNext() (port int, err error) {
select {
case port = <-r.ports:
case <-time.After(allocateNextTimeout):
err = errPortRangeNoPortsRemaining
}
return
}
func (r *rangeAllocator) Release(port int) {
port -= r.Base
if port < 0 || port >= r.Size {
return
}
r.lock.Lock()
defer r.lock.Unlock()
r.used.SetBit(&r.used, port, 0)
}
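
This file is removed by the commit. For reference, a simplified, standalone sketch of the random-then-sequential bitset scan it implemented; this is not the proxier's API, and the port range values are invented for the example.

package main

import (
	"fmt"
	"math/big"
	"math/rand"
)

// bitSetAllocator is a stripped-down stand-in for the range allocator above: it picks a
// random offset first, then scans the rest of the range (with wraparound) for the first
// free bit. nextFreePort returns -1 when the range is exhausted.
type bitSetAllocator struct {
	base, size int
	used       big.Int
	rand       *rand.Rand
}

func (a *bitSetAllocator) nextFreePort() int {
	j := a.rand.Intn(a.size)
	for k := 0; k < a.size; k++ {
		i := (j + k) % a.size
		if a.used.Bit(i) == 0 {
			a.used.SetBit(&a.used, i, 1)
			return i + a.base
		}
	}
	return -1
}

func (a *bitSetAllocator) release(port int) {
	if off := port - a.base; off >= 0 && off < a.size {
		a.used.SetBit(&a.used, off, 0)
	}
}

func main() {
	a := &bitSetAllocator{base: 36000, size: 4, rand: rand.New(rand.NewSource(1))}
	for i := 0; i < 5; i++ {
		fmt.Println(a.nextFreePort()) // five allocations from a range of four: the last prints -1
	}
	a.release(36001)
	fmt.Println(a.nextFreePort()) // 36001 is free again
}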

View File

@ -1,178 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package winuserspace
import (
"reflect"
"testing"
"k8s.io/apimachinery/pkg/util/net"
)
func TestRangeAllocatorEmpty(t *testing.T) {
r := &net.PortRange{}
r.Set("0-0")
defer func() {
if rv := recover(); rv == nil {
t.Fatalf("expected panic because of empty port range: %#v", r)
}
}()
_ = newPortRangeAllocator(*r, true)
}
func TestRangeAllocatorFullyAllocated(t *testing.T) {
r := &net.PortRange{}
r.Set("1-1")
// Don't auto-fill ports, we'll manually turn the crank
pra := newPortRangeAllocator(*r, false)
a := pra.(*rangeAllocator)
// Fill in the one available port
if !a.fillPortsOnce() {
t.Fatalf("Expected to be able to fill ports")
}
// There should be no ports available
if a.fillPortsOnce() {
t.Fatalf("Expected to be unable to fill ports")
}
p, err := a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if p != 1 {
t.Fatalf("unexpected allocated port: %d", p)
}
a.lock.Lock()
if bit := a.used.Bit(p - a.Base); bit != 1 {
a.lock.Unlock()
t.Fatalf("unexpected used bit for allocated port: %d", p)
}
a.lock.Unlock()
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range")
}
a.Release(p)
a.lock.Lock()
if bit := a.used.Bit(p - a.Base); bit != 0 {
a.lock.Unlock()
t.Fatalf("unexpected used bit for allocated port: %d", p)
}
a.lock.Unlock()
// Fill in the one available port
if !a.fillPortsOnce() {
t.Fatalf("Expected to be able to fill ports")
}
p, err = a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if p != 1 {
t.Fatalf("unexpected allocated port: %d", p)
}
a.lock.Lock()
if bit := a.used.Bit(p - a.Base); bit != 1 {
a.lock.Unlock()
t.Fatalf("unexpected used bit for allocated port: %d", p)
}
a.lock.Unlock()
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range")
}
}
func TestRangeAllocator_RandomishAllocation(t *testing.T) {
r := &net.PortRange{}
r.Set("1-100")
pra := newPortRangeAllocator(*r, false)
a := pra.(*rangeAllocator)
// allocate all the ports
var err error
ports := make([]int, 100, 100)
for i := 0; i < 100; i++ {
if !a.fillPortsOnce() {
t.Fatalf("Expected to be able to fill ports")
}
ports[i], err = a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if ports[i] < 1 || ports[i] > 100 {
t.Fatalf("unexpected allocated port: %d", ports[i])
}
a.lock.Lock()
if bit := a.used.Bit(ports[i] - a.Base); bit != 1 {
a.lock.Unlock()
t.Fatalf("unexpected used bit for allocated port: %d", ports[i])
}
a.lock.Unlock()
}
if a.fillPortsOnce() {
t.Fatalf("Expected to be unable to fill ports")
}
// release them all
for i := 0; i < 100; i++ {
a.Release(ports[i])
a.lock.Lock()
if bit := a.used.Bit(ports[i] - a.Base); bit != 0 {
a.lock.Unlock()
t.Fatalf("unexpected used bit for allocated port: %d", ports[i])
}
a.lock.Unlock()
}
// allocate the ports again
rports := make([]int, 100, 100)
for i := 0; i < 100; i++ {
if !a.fillPortsOnce() {
t.Fatalf("Expected to be able to fill ports")
}
rports[i], err = a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if rports[i] < 1 || rports[i] > 100 {
t.Fatalf("unexpected allocated port: %d", rports[i])
}
a.lock.Lock()
if bit := a.used.Bit(rports[i] - a.Base); bit != 1 {
a.lock.Unlock()
t.Fatalf("unexpected used bit for allocated port: %d", rports[i])
}
a.lock.Unlock()
}
if a.fillPortsOnce() {
t.Fatalf("Expected to be unable to fill ports")
}
if reflect.DeepEqual(ports, rports) {
t.Fatalf("expected re-allocated ports to be in a somewhat random order")
}
}

View File

@ -27,7 +27,6 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
//utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/kubernetes/pkg/api"
@ -35,6 +34,9 @@ import (
"k8s.io/kubernetes/pkg/util/netsh"
)
const allAvailableInterfaces string = ""
const stickyMaxAgeMinutes int = 180 // TODO: parameterize this in the API.
type portal struct {
ip string
port int
@ -42,19 +44,13 @@ type portal struct {
}
type serviceInfo struct {
isAliveAtomic int32 // Only access this with atomic ops
portal portal
protocol api.Protocol
proxyPort int
socket proxySocket
timeout time.Duration
activeClients *clientCache
//nodePort int
//loadBalancerStatus api.LoadBalancerStatus
isAliveAtomic int32 // Only access this with atomic ops
portal portal
protocol api.Protocol
socket proxySocket
timeout time.Duration
activeClients *clientCache
sessionAffinityType api.ServiceAffinity
stickyMaxAgeMinutes int
// Deprecated, but required for back-compat (including e2e)
//externalIPs []string
}
func (info *serviceInfo) setAlive(b bool) {
@ -84,16 +80,14 @@ func logTimeout(err error) bool {
type Proxier struct {
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[proxy.ServicePortPortalName]*serviceInfo
serviceMap map[ServicePortPortalName]*serviceInfo
syncPeriod time.Duration
udpIdleTimeout time.Duration
portMapMutex sync.Mutex
portMap map[portMapKey]*portMapValue
numProxyLoops int32 // use atomic ops to access this; mostly for testing
//listenIP net.IP
netsh netsh.Interface
hostIP net.IP
//proxyPorts PortAllocator
netsh netsh.Interface
hostIP net.IP
}
// assert Proxier is a ProxyProvider
@ -113,7 +107,7 @@ func (k *portMapKey) String() string {
// A value for the portMap
type portMapValue struct {
owner proxy.ServicePortPortalName
owner ServicePortPortalName
socket interface {
Close() error
}
@ -159,7 +153,7 @@ func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, netsh netsh.Interfac
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, netsh netsh.Interface, hostIP net.IP, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
return &Proxier{
loadBalancer: loadBalancer,
serviceMap: make(map[proxy.ServicePortPortalName]*serviceInfo),
serviceMap: make(map[ServicePortPortalName]*serviceInfo),
portMap: make(map[portMapKey]*portMapValue),
syncPeriod: syncPeriod,
udpIdleTimeout: udpIdleTimeout,
@ -170,7 +164,6 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, netsh netsh.Inter
// Sync is called to immediately synchronize the proxier state
func (proxier *Proxier) Sync() {
proxier.ensurePortals()
proxier.cleanupStaleStickySessions()
}
@ -185,110 +178,146 @@ func (proxier *Proxier) SyncLoop() {
}
}
// Ensure that portals exist for all services.
func (proxier *Proxier) ensurePortals() {
/*proxier.mu.Lock()
defer proxier.mu.Unlock()
// NB: This does not remove rules that should not be present.
for name, info := range proxier.serviceMap {
err := proxier.openPortal(name, info)
if err != nil {
glog.Errorf("Failed to ensure portal for %q: %v", name, err)
}
}*/
}
// cleanupStaleStickySessions cleans up any stale sticky session records in the hash map.
func (proxier *Proxier) cleanupStaleStickySessions() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
spnMap := make(map[proxy.ServicePortName]bool)
for k := range proxier.serviceMap {
spn := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: k.Namespace, Name: k.Name}, Port: k.Port}
spnMap[spn] = true
}
for name := range spnMap {
proxier.loadBalancer.CleanupStaleStickySessions(name)
servicePortNameMap := make(map[proxy.ServicePortName]bool)
for name := range proxier.serviceMap {
servicePortName := proxy.ServicePortName{
NamespacedName: types.NamespacedName{
Namespace: name.Namespace,
Name: name.Name,
},
Port: name.Port,
}
if servicePortNameMap[servicePortName] == false {
// ensure CleanupStaleStickySessions only gets called once per ServicePortName
servicePortNameMap[servicePortName] = true
proxier.loadBalancer.CleanupStaleStickySessions(servicePortName)
}
}
}
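
A standalone sketch of the dedup pattern in cleanupStaleStickySessions above: several portal keys that share a namespace/name/port collapse to one ServicePortName, so the load balancer cleanup runs once per name/port. The types here are simplified stand-ins (not the proxier's, which embed types.NamespacedName) and the data is invented.

package main

import "fmt"

type portalKey struct{ namespace, name, port, portalIP string }
type servicePortKey struct{ namespace, name, port string }

func main() {
	serviceMap := map[portalKey]bool{
		{"default", "echo", "p", "10.0.0.1"}: true,
		{"default", "echo", "p", "10.0.0.2"}: true, // same service port, second portal IP
		{"default", "echo", "q", ""}:         true, // node-port portal on all interfaces
	}
	seen := map[servicePortKey]bool{}
	for k := range serviceMap {
		spk := servicePortKey{k.namespace, k.name, k.port}
		if !seen[spk] {
			seen[spk] = true
			// In the proxier this is where CleanupStaleStickySessions(spk) would run,
			// exactly once per name/port even though two portals exist for "p".
			fmt.Println("cleanup once for", spk)
		}
	}
}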
// This assumes proxier.mu is not locked.
func (proxier *Proxier) stopProxy(service proxy.ServicePortPortalName, info *serviceInfo) error {
func (proxier *Proxier) stopProxy(service ServicePortPortalName, info *serviceInfo) error {
proxier.mu.Lock()
defer proxier.mu.Unlock()
return proxier.stopProxyInternal(service, info)
}
// This assumes proxier.mu is locked.
func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortPortalName, info *serviceInfo) error {
func (proxier *Proxier) stopProxyInternal(service ServicePortPortalName, info *serviceInfo) error {
delete(proxier.serviceMap, service)
info.setAlive(false)
err := info.socket.Close()
return err
}
func (proxier *Proxier) getServiceInfo(service proxy.ServicePortPortalName) (*serviceInfo, bool) {
func (proxier *Proxier) getServiceInfo(service ServicePortPortalName) (*serviceInfo, bool) {
proxier.mu.Lock()
defer proxier.mu.Unlock()
info, ok := proxier.serviceMap[service]
return info, ok
}
func (proxier *Proxier) setServiceInfo(service proxy.ServicePortPortalName, info *serviceInfo) {
func (proxier *Proxier) setServiceInfo(service ServicePortPortalName, info *serviceInfo) {
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.serviceMap[service] = info
}
/*
// addServiceOnPort starts listening for a new service, returning the serviceInfo.
// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
// connections, for now.
func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortPortalName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) {
sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort)
if err != nil {
return nil, err
// addServicePortPortal starts listening for a new service, returning the serviceInfo.
// The timeout only applies to UDP connections, for now.
func (proxier *Proxier) addServicePortPortal(servicePortPortalName ServicePortPortalName, protocol api.Protocol, listenIP string, port int, timeout time.Duration) (*serviceInfo, error) {
var serviceIP net.IP
if listenIP != allAvailableInterfaces {
if serviceIP = net.ParseIP(listenIP); serviceIP == nil {
return nil, fmt.Errorf("could not parse IP %q", listenIP)
}
// add the IP address. Node port binds to all interfaces.
args := proxier.netshIpv4AddressAddArgs(serviceIP)
if existed, err := proxier.netsh.EnsureIPAddress(args, serviceIP); err != nil {
return nil, err
} else if !existed {
glog.V(3).Infof("Added ip address to fowarder interface for service %q at %s:%d/%s", servicePortPortalName, listenIP, port, protocol)
}
}
_, portStr, err := net.SplitHostPort(sock.Addr().String())
// add the listener, proxy
sock, err := newProxySocket(protocol, serviceIP, port)
if err != nil {
sock.Close()
return nil, err
}
portNum, err := strconv.Atoi(portStr)
if err != nil {
sock.Close()
return nil, err
}
si := &serviceInfo{
isAliveAtomic: 1,
proxyPort: portNum,
isAliveAtomic: 1,
portal: portal{
ip: listenIP,
port: port,
isExternal: false,
},
protocol: protocol,
socket: sock,
timeout: timeout,
activeClients: newClientCache(),
sessionAffinityType: api.ServiceAffinityNone, // default
stickyMaxAgeMinutes: 180, // TODO: parameterize this in the API.
}
proxier.setServiceInfo(service, si)
proxier.setServiceInfo(servicePortPortalName, si)
glog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
go func(service proxy.ServicePortName, proxier *Proxier) {
glog.V(2).Infof("Proxying for service %q at %s:%d/%s", servicePortPortalName, listenIP, port, protocol)
go func(service ServicePortPortalName, proxier *Proxier) {
defer runtime.HandleCrash()
atomic.AddInt32(&proxier.numProxyLoops, 1)
sock.ProxyLoop(service, si, proxier)
atomic.AddInt32(&proxier.numProxyLoops, -1)
}(service, proxier)
}(servicePortPortalName, proxier)
return si, nil
}
*/
func (proxier *Proxier) closeServicePortPortal(servicePortPortalName ServicePortPortalName, info *serviceInfo) error {
// turn off the proxy
if err := proxier.stopProxy(servicePortPortalName, info); err != nil {
return err
}
// close the PortalProxy by deleting the service IP address
if info.portal.ip != allAvailableInterfaces {
serviceIP := net.ParseIP(info.portal.ip)
args := proxier.netshIpv4AddressDeleteArgs(serviceIP)
if err := proxier.netsh.DeleteIPAddress(args); err != nil {
return err
}
}
return nil
}
// getListenIPPortMap returns a map of listen IPs to ports for a service.
func getListenIPPortMap(service *api.Service, listenPort int, nodePort int) map[string]int {
listenIPPortMap := make(map[string]int)
listenIPPortMap[service.Spec.ClusterIP] = listenPort
for _, ip := range service.Spec.ExternalIPs {
listenIPPortMap[ip] = listenPort
}
for _, ingress := range service.Status.LoadBalancer.Ingress {
listenIPPortMap[ingress.IP] = listenPort
}
if nodePort != 0 {
listenIPPortMap[allAvailableInterfaces] = nodePort
}
return listenIPPortMap
}
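
A quick standalone sketch of the map this helper builds, using placeholder IPs and ports rather than an api.Service; the empty-string key stands in for allAvailableInterfaces and carries the node port.

package main

import "fmt"

func main() {
	clusterIP := "10.0.0.10"
	externalIPs := []string{"192.0.2.7"}
	ingressIPs := []string{"198.51.100.3"}
	listenPort, nodePort := 80, 30080

	m := map[string]int{clusterIP: listenPort}
	for _, ip := range externalIPs {
		m[ip] = listenPort
	}
	for _, ip := range ingressIPs {
		m[ip] = listenPort
	}
	if nodePort != 0 {
		m[""] = nodePort // "" == all interfaces, bound on the node port
	}
	fmt.Println(m) // one listener per IP on the service port, plus "" -> node port
}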
// OnServiceUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or
// shutdown if missing from the update set.
func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
glog.V(4).Infof("Received update notice: %+v", services)
activeServices := make(map[proxy.ServicePortPortalName]bool) // use a map as a set
activeServicePortPortals := make(map[ServicePortPortalName]bool) // use a map as a set
for i := range services {
service := &services[i]
@ -300,393 +329,91 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
// create a slice of source IPs
var listenIPs []string
listenIPs = append(listenIPs, service.Spec.ClusterIP)
// create a slice of all the source IPs to use for service port portals
listenIPPortMap := getListenIPPortMap(service, int(servicePort.Port), int(servicePort.NodePort))
protocol := servicePort.Protocol
/*for _, ip := range service.Spec.ExternalIPs {
listenIPs = append(listenIPs, ip)
}
for _, ip := range service.Status.LoadBalancer.Ingress {
listenIPs = append(listenIPs, ip)
}
if int(service.Spec.Ports[i]) != 0 {
listenIPs = append(listenIPs, "")
}*/
for _, listenIP := range listenIPs {
serviceName := proxy.ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name, PortalIPName: listenIP}
activeServices[serviceName] = true
serviceIP := net.ParseIP(listenIP)
info, exists := proxier.getServiceInfo(serviceName)
if exists && sameConfig(info, service, servicePort) {
for listenIP, listenPort := range listenIPPortMap {
servicePortPortalName := ServicePortPortalName{
NamespacedName: types.NamespacedName{
Namespace: service.Namespace,
Name: service.Name,
},
Port: servicePort.Name,
PortalIPName: listenIP,
}
activeServicePortPortals[servicePortPortalName] = true
info, exists := proxier.getServiceInfo(servicePortPortalName)
if exists && sameConfig(info, service, protocol, listenPort) {
// Nothing changed.
continue
}
if exists {
glog.V(4).Infof("Something changed for service %q: stopping it", serviceName)
// turn off the proxy
err := proxier.stopProxy(serviceName, info)
if err != nil {
glog.Errorf("Failed to stop service %q: %v", serviceName, err)
}
// close the PortalProxy if it is not a node port
if serviceIP != nil {
args := proxier.netshIpv4AddressDeleteArgs(serviceIP)
if err := proxier.netsh.DeleteIPAddress(args); err != nil {
glog.Errorf("Failed to delete IP address for service %q, error %s", serviceName, err.Error())
}
} else {
// TODO(ajh) release the node port
glog.V(4).Infof("Something changed for service %q: stopping it", servicePortPortalName)
if err := proxier.closeServicePortPortal(servicePortPortalName, info); err != nil {
glog.Errorf("Failed to close service port portal %q: %v", servicePortPortalName, err)
}
}
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
// add the IP address if it is not a node port
if serviceIP != nil {
args := proxier.netshIpv4AddressAddArgs(serviceIP)
existed, err := proxier.netsh.EnsureIPAddress(args, serviceIP)
if err != nil {
glog.Errorf("Failed to add ip address for service %q, args:%v", serviceName, args)
continue
}
if !existed {
glog.V(3).Infof("Added ip address to fowarder interface for service %q on %s %s:%d", serviceName, servicePort.Protocol, serviceIP, int(servicePort.Port))
}
} else {
// TODO(ajh) handle the node port
}
// add the listener, proxy
sock, err := newProxySocket(servicePort.Protocol, serviceIP, int(servicePort.Port))
glog.V(1).Infof("Adding new service %q at %s:%d/%s", servicePortPortalName, listenIP, listenPort, protocol)
info, err := proxier.addServicePortPortal(servicePortPortalName, protocol, listenIP, listenPort, proxier.udpIdleTimeout)
if err != nil {
glog.Errorf("failed to create a new proxy socket for service %q: %v", serviceName, err)
glog.Errorf("Failed to start proxy for %q: %v", servicePortPortalName, err)
continue
}
si := &serviceInfo{
isAliveAtomic: 1,
portal: portal{ip: listenIP, port: int(servicePort.Port), isExternal: false},
protocol: servicePort.Protocol,
proxyPort: int(servicePort.Port),
socket: sock,
timeout: proxier.udpIdleTimeout,
activeClients: newClientCache(),
sessionAffinityType: service.Spec.SessionAffinity, // default
stickyMaxAgeMinutes: 180, // TODO: parameterize this in the API.
}
glog.V(4).Infof("info: %#v", si)
proxier.setServiceInfo(serviceName, si)
glog.V(2).Infof("Proxying for service %q on %s port %d", serviceName, servicePort.Protocol, int(servicePort.Port))
go func(service proxy.ServicePortPortalName, proxier *Proxier) {
defer runtime.HandleCrash()
atomic.AddInt32(&proxier.numProxyLoops, 1)
sock.ProxyLoop(service, si, proxier)
atomic.AddInt32(&proxier.numProxyLoops, -1)
}(serviceName, proxier)
info.sessionAffinityType = service.Spec.SessionAffinity
glog.V(10).Infof("info: %#v", info)
}
if len(listenIPs) > 0 {
// only one loadbalancer per service endpoint
servicePortName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name}
proxier.loadBalancer.NewService(servicePortName, service.Spec.SessionAffinity, 180)
if len(listenIPPortMap) > 0 {
// only one loadbalancer per service port portal
servicePortName := proxy.ServicePortName{
NamespacedName: types.NamespacedName{
Namespace: service.Namespace,
Name: service.Name,
},
Port: servicePort.Name,
}
proxier.loadBalancer.NewService(servicePortName, service.Spec.SessionAffinity, stickyMaxAgeMinutes)
}
}
}
for name, info := range proxier.serviceMap {
if !activeServicePortPortals[name] {
glog.V(1).Infof("Stopping service %q", name)
if err := proxier.closeServicePortPortal(name, info); err != nil {
glog.Errorf("Failed to close service port portal %q: %v", name, err)
}
}
}
proxier.mu.Lock()
defer proxier.mu.Unlock()
spnMap := make(map[proxy.ServicePortName]bool)
for name, info := range proxier.serviceMap {
spn := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: name.Namespace, Name: name.Name}, Port: name.Port}
spnMap[spn] = spnMap[spn] || activeServices[name]
if !activeServices[name] {
serviceIP := net.ParseIP(info.portal.ip)
glog.V(1).Infof("Stopping service %q", name)
// turn off the proxy
err := proxier.stopProxy(name, info)
if err != nil {
glog.Errorf("Failed to stop service %q: %v", name, err)
}
// close the PortalProxy if it is not a node port
if serviceIP != nil {
args := proxier.netshIpv4AddressDeleteArgs(serviceIP)
if err := proxier.netsh.DeleteIPAddress(args); err != nil {
glog.Errorf("Failed to stop service %q: %v", name, err)
}
} else {
// TODO(ajh) release the node port
}
// servicePortNameMap tracks, for each name/port pair, whether any of its service port portals
// is still active; a value of true means at least one portal with that name/port remains.
servicePortNameMap := make(map[proxy.ServicePortName]bool)
for name := range proxier.serviceMap {
servicePortName := proxy.ServicePortName{
NamespacedName: types.NamespacedName{
Namespace: name.Namespace,
Name: name.Name,
},
Port: name.Port,
}
servicePortNameMap[servicePortName] = servicePortNameMap[servicePortName] || activeServicePortPortals[name]
}
// only delete spn if all listen ips show inactive
for k := range spnMap {
if !spnMap[k] {
proxier.loadBalancer.DeleteService(k)
// Only delete the load balancer entry if all listen IPs for a name/port are inactive.
for name := range servicePortNameMap {
if !servicePortNameMap[name] {
proxier.loadBalancer.DeleteService(name)
}
}
}
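
The final loops above OR per-portal activity into a per-name flag and only drop the load balancer entry when every portal for that name/port is gone. A standalone sketch of that keep-or-delete rule, with simplified key types and made-up data:

package main

import "fmt"

type portalKey struct{ name, port, portalIP string }
type nameKey struct{ name, port string }

func main() {
	// Two portals for echo:p -- only one is still active after the update.
	active := map[portalKey]bool{
		{"echo", "p", "10.0.0.1"}: true,
		{"echo", "p", "10.0.0.2"}: false,
		{"echo", "q", ""}:         false,
	}
	keep := map[nameKey]bool{}
	for k, isActive := range active {
		nk := nameKey{k.name, k.port}
		keep[nk] = keep[nk] || isActive // any live portal keeps the load balancer entry
	}
	for nk, kept := range keep {
		if !kept {
			fmt.Println("DeleteService", nk) // only echo:q loses its load balancer entry
		}
	}
}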
func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool {
if info.protocol != port.Protocol || info.portal.port != int(port.Port) {
return false
}
if info.sessionAffinityType != service.Spec.SessionAffinity {
return false
}
return true
}
func ipsEqual(lhs, rhs []string) bool {
if len(lhs) != len(rhs) {
return false
}
for i := range lhs {
if lhs[i] != rhs[i] {
return false
}
}
return true
}
/*
func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *serviceInfo) error {
err := proxier.openOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service)
if err != nil {
return err
}
for _, publicIP := range info.externalIPs {
err = proxier.openOnePortal(portal{net.ParseIP(publicIP), info.portal.port, true}, info.protocol, proxier.listenIP, info.proxyPort, service)
if err != nil {
return err
}
}
for _, ingress := range info.loadBalancerStatus.Ingress {
if ingress.IP != "" {
err = proxier.openOnePortal(portal{net.ParseIP(ingress.IP), info.portal.port, false}, info.protocol, proxier.listenIP, info.proxyPort, service)
if err != nil {
return err
}
}
}
if info.nodePort != 0 {
err = proxier.openNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service)
if err != nil {
return err
}
}
return nil
}
*/
/*
func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error {
if protocol == api.ProtocolUDP {
glog.Warningf("Not adding rule for %q on %s:%d as UDP protocol is not supported by netsh portproxy", name, portal.ip, portal.port)
return nil
}
// Add IP address to "vEthernet (HNSTransparent)" so that portproxy could be used to redirect the traffic
args := proxier.netshIpv4AddressAddArgs(portal.ip)
existed, err := proxier.netsh.EnsureIPAddress(args, portal.ip)
if err != nil {
glog.Errorf("Failed to add ip address for service %q, args:%v", name, args)
return err
}
if !existed {
glog.V(3).Infof("Added ip address to HNSTransparent interface for service %q on %s %s:%d", name, protocol, portal.ip, portal.port)
}
args = proxier.netshPortProxyAddArgs(portal.ip, portal.port, proxyIP, proxyPort, name)
existed, err = proxier.netsh.EnsurePortProxyRule(args)
if err != nil {
glog.Errorf("Failed to run portproxy rule for service %q, args:%v", name, args)
return err
}
if !existed {
glog.V(3).Infof("Added portproxy rule for service %q on %s %s:%d", name, protocol, portal.ip, portal.port)
}
return nil
}
*/
/*
// claimNodePort marks a port as being owned by a particular service, or returns error if already claimed.
// Idempotent: reclaiming with the same owner is not an error
func (proxier *Proxier) claimNodePort(ip net.IP, port int, protocol api.Protocol, owner proxy.ServicePortName) error {
proxier.portMapMutex.Lock()
defer proxier.portMapMutex.Unlock()
// TODO: We could pre-populate some reserved ports into portMap and/or blacklist some well-known ports
key := portMapKey{ip: ip.String(), port: port, protocol: protocol}
existing, found := proxier.portMap[key]
if !found {
// Hold the actual port open, even though we use iptables to redirect
// it. This ensures that a) it's safe to take and b) that stays true.
// NOTE: We should not need to have a real listen()ing socket - bind()
// should be enough, but I can't figure out a way to e2e test without
// it. Tools like 'ss' and 'netstat' do not show sockets that are
// bind()ed but not listen()ed, and at least the default debian netcat
// has no way to avoid about 10 seconds of retries.
socket, err := newProxySocket(protocol, ip, port)
if err != nil {
return fmt.Errorf("can't open node port for %s: %v", key.String(), err)
}
proxier.portMap[key] = &portMapValue{owner: owner, socket: socket}
glog.V(2).Infof("Claimed local port %s", key.String())
return nil
}
if existing.owner == owner {
// We are idempotent
return nil
}
return fmt.Errorf("Port conflict detected on port %s. %v vs %v", key.String(), owner, existing)
}
*/
/*
// releaseNodePort releases a claim on a port. Returns an error if the owner does not match the claim.
// Tolerates release on an unclaimed port, to simplify .
func (proxier *Proxier) releaseNodePort(ip net.IP, port int, protocol api.Protocol, owner proxy.ServicePortName) error {
proxier.portMapMutex.Lock()
defer proxier.portMapMutex.Unlock()
key := portMapKey{ip: ip.String(), port: port, protocol: protocol}
existing, found := proxier.portMap[key]
if !found {
// We tolerate this, it happens if we are cleaning up a failed allocation
glog.Infof("Ignoring release on unowned port: %v", key)
return nil
}
if existing.owner != owner {
return fmt.Errorf("Port conflict detected on port %v (unowned unlock). %v vs %v", key, owner, existing)
}
delete(proxier.portMap, key)
existing.socket.Close()
return nil
}
*/
/*
func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error {
if protocol == api.ProtocolUDP {
glog.Warningf("Not adding node port rule for %q on port %d as UDP protocol is not supported by netsh portproxy", name, nodePort)
return nil
}
err := proxier.claimNodePort(nil, nodePort, protocol, name)
if err != nil {
return err
}
args := proxier.netshPortProxyAddArgs(nil, nodePort, proxyIP, proxyPort, name)
existed, err := proxier.netsh.EnsurePortProxyRule(args)
if err != nil {
glog.Errorf("Failed to run portproxy rule for service %q", name)
return err
}
if !existed {
glog.Infof("Added portproxy rule for service %q on %s port %d", name, protocol, nodePort)
}
return nil
}*/
/*
func (proxier *Proxier) closePortal(service proxy.ServicePortName, info *serviceInfo) error {
// Collect errors and report them all at the end.
el := proxier.closeOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service)
for _, publicIP := range info.externalIPs {
el = append(el, proxier.closeOnePortal(portal{net.ParseIP(publicIP), info.portal.port, true}, info.protocol, proxier.listenIP, info.proxyPort, service)...)
}
for _, ingress := range info.loadBalancerStatus.Ingress {
if ingress.IP != "" {
el = append(el, proxier.closeOnePortal(portal{net.ParseIP(ingress.IP), info.portal.port, false}, info.protocol, proxier.listenIP, info.proxyPort, service)...)
}
}
if info.nodePort != 0 {
el = append(el, proxier.closeNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service)...)
}
if len(el) == 0 {
glog.V(3).Infof("Closed iptables portals for service %q", service)
} else {
glog.Errorf("Some errors closing iptables portals for service %q", service)
}
return utilerrors.NewAggregate(el)
}*/
/*
func (proxier *Proxier) closeOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) []error {
el := []error{}
if local, err := isLocalIP(portal.ip); err != nil {
el = append(el, fmt.Errorf("can't determine if IP is local, assuming not: %v", err))
} else if local {
if err := proxier.releaseNodePort(portal.ip, portal.port, protocol, name); err != nil {
el = append(el, err)
}
}
args := proxier.netshIpv4AddressDeleteArgs(portal.ip)
if err := proxier.netsh.DeleteIPAddress(args); err != nil {
glog.Errorf("Failed to delete IP address for service %q", name)
el = append(el, err)
}
args = proxier.netshPortProxyDeleteArgs(portal.ip, portal.port, proxyIP, proxyPort, name)
if err := proxier.netsh.DeletePortProxyRule(args); err != nil {
glog.Errorf("Failed to delete portproxy rule for service %q", name)
el = append(el, err)
}
return el
}*/
/*
func (proxier *Proxier) closeNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) []error {
el := []error{}
args := proxier.netshPortProxyDeleteArgs(nil, nodePort, proxyIP, proxyPort, name)
if err := proxier.netsh.DeletePortProxyRule(args); err != nil {
glog.Errorf("Failed to delete portproxy rule for service %q", name)
el = append(el, err)
}
if err := proxier.releaseNodePort(nil, nodePort, protocol, name); err != nil {
el = append(el, err)
}
return el
}*/
func isLocalIP(ip net.IP) (bool, error) {
addrs, err := net.InterfaceAddrs()
if err != nil {
return false, err
}
for i := range addrs {
intf, _, err := net.ParseCIDR(addrs[i].String())
if err != nil {
return false, err
}
if ip.Equal(intf) {
return true, nil
}
}
return false, nil
func sameConfig(info *serviceInfo, service *api.Service, protocol api.Protocol, listenPort int) bool {
return info.protocol == protocol && info.portal.port == listenPort && info.sessionAffinityType == service.Spec.SessionAffinity
}
func isTooManyFDsError(err error) bool {
@ -700,22 +427,6 @@ func isClosedError(err error) bool {
return strings.HasSuffix(err.Error(), "use of closed network connection")
}
/*
func (proxier *Proxier) netshPortProxyAddArgs(destIP net.IP, destPort int, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string {
args := []string{
"interface", "portproxy", "set", "v4tov4",
"listenPort=" + strconv.Itoa(destPort),
"connectaddress=" + proxyIP.String(),
"connectPort=" + strconv.Itoa(proxyPort),
}
if destIP != nil {
args = append(args, "listenaddress="+destIP.String())
}
return args
}
*/
func (proxier *Proxier) netshIpv4AddressAddArgs(destIP net.IP) []string {
intName := proxier.netsh.GetInterfaceToAddIP()
args := []string{
@ -727,20 +438,6 @@ func (proxier *Proxier) netshIpv4AddressAddArgs(destIP net.IP) []string {
return args
}
/*
func (proxier *Proxier) netshPortProxyDeleteArgs(destIP net.IP, destPort int, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string {
args := []string{
"interface", "portproxy", "delete", "v4tov4",
"listenPort=" + strconv.Itoa(destPort),
}
if destIP != nil {
args = append(args, "listenaddress="+destIP.String())
}
return args
}
*/
func (proxier *Proxier) netshIpv4AddressDeleteArgs(destIP net.IP) []string {
intName := proxier.netsh.GetInterfaceToAddIP()
args := []string{

View File

@ -198,6 +198,21 @@ func waitForNumProxyClients(t *testing.T, s *serviceInfo, want int, timeout time
t.Errorf("expected %d ProxyClients live, got %d", want, got)
}
func getPortNum(t *testing.T, addr string) int {
_, portStr, err := net.SplitHostPort(addr)
if err != nil {
t.Errorf("error getting port from %s", addr)
return 0
}
portNum, err := strconv.Atoi(portStr)
if err != nil {
t.Errorf("error getting port from %s", addr)
return 0
}
return portNum
}
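
For reference, the extraction getPortNum performs is just net.SplitHostPort plus strconv.Atoi; a tiny standalone check with a made-up listener address:

package main

import (
	"fmt"
	"net"
	"strconv"
)

func main() {
	addr := "127.0.0.1:54321"
	_, portStr, err := net.SplitHostPort(addr)
	if err != nil {
		panic(err)
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		panic(err)
	}
	fmt.Println(port) // 54321
}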
func TestTCPProxy(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
@ -211,17 +226,19 @@ func TestTCPProxy(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
listenIP := "0.0.0.0"
p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
waitForNumProxyLoops(t, p, 1)
}
@ -238,17 +255,19 @@ func TestUDPProxy(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
listenIP := "0.0.0.0"
p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
svcInfo, err := p.addServicePortPortal(servicePortPortalName, "UDP", listenIP, 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
testEchoUDP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
waitForNumProxyLoops(t, p, 1)
}
@ -265,18 +284,20 @@ func TestUDPProxyTimeout(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
listenIP := "0.0.0.0"
p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
svcInfo, err := p.addServicePortPortal(servicePortPortalName, "UDP", listenIP, 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
waitForNumProxyLoops(t, p, 1)
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
testEchoUDP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
// When connecting to a UDP service endpoint, there should be a Conn for proxy.
waitForNumProxyClients(t, svcInfo, 1, time.Second)
// If conn has no activity for serviceInfo.timeout since last Read/Write, it should be closed because of timeout.
@ -301,24 +322,27 @@ func TestMultiPortProxy(t *testing.T) {
}},
}})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
listenIP := "0.0.0.0"
p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second)
servicePortPortalNameP := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceP.Namespace, Name: serviceP.Name}, Port: serviceP.Port, PortalIPName: listenIP}
svcInfoP, err := p.addServicePortPortal(servicePortPortalNameP, "TCP", listenIP, 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoTCP(t, "127.0.0.1", svcInfoP.proxyPort)
testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfoP.socket.Addr().String()))
waitForNumProxyLoops(t, p, 1)
svcInfoQ, err := p.addServiceOnPort(serviceQ, "UDP", 0, time.Second)
servicePortPortalNameQ := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceQ.Namespace, Name: serviceQ.Name}, Port: serviceQ.Port, PortalIPName: listenIP}
svcInfoQ, err := p.addServicePortPortal(servicePortPortalNameQ, "UDP", listenIP, 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoUDP(t, "127.0.0.1", svcInfoQ.proxyPort)
testEchoUDP(t, "127.0.0.1", getPortNum(t, svcInfoQ.socket.Addr().String()))
waitForNumProxyLoops(t, p, 2)
}
@ -328,7 +352,8 @@ func TestMultiPortOnServiceUpdate(t *testing.T) {
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"}
serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"}
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
listenIP := "0.0.0.0"
p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
@ -336,41 +361,45 @@ func TestMultiPortOnServiceUpdate(t *testing.T) {
p.OnServiceUpdate([]api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Spec: api.ServiceSpec{ClusterIP: "0.0.0.0", Ports: []api.ServicePort{{
Name: "p",
Port: 80,
Port: 0,
Protocol: "TCP",
}, {
Name: "q",
Port: 81,
Port: 0,
Protocol: "UDP",
}}},
}})
waitForNumProxyLoops(t, p, 2)
svcInfo, exists := p.getServiceInfo(serviceP)
servicePortPortalNameP := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceP.Namespace, Name: serviceP.Name}, Port: serviceP.Port, PortalIPName: listenIP}
svcInfo, exists := p.getServiceInfo(servicePortPortalNameP)
if !exists {
t.Fatalf("can't find serviceInfo for %s", serviceP)
t.Fatalf("can't find serviceInfo for %s", servicePortPortalNameP)
}
if svcInfo.portal.ip.String() != "1.2.3.4" || svcInfo.portal.port != 80 || svcInfo.protocol != "TCP" {
if svcInfo.portal.ip != "0.0.0.0" || svcInfo.portal.port != 0 || svcInfo.protocol != "TCP" {
t.Errorf("unexpected serviceInfo for %s: %#v", serviceP, svcInfo)
}
svcInfo, exists = p.getServiceInfo(serviceQ)
servicePortPortalNameQ := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceQ.Namespace, Name: serviceQ.Name}, Port: serviceQ.Port, PortalIPName: listenIP}
svcInfo, exists = p.getServiceInfo(servicePortPortalNameQ)
if !exists {
t.Fatalf("can't find serviceInfo for %s", serviceQ)
t.Fatalf("can't find serviceInfo for %s", servicePortPortalNameQ)
}
if svcInfo.portal.ip.String() != "1.2.3.4" || svcInfo.portal.port != 81 || svcInfo.protocol != "UDP" {
if svcInfo.portal.ip != "0.0.0.0" || svcInfo.portal.port != 0 || svcInfo.protocol != "UDP" {
t.Errorf("unexpected serviceInfo for %s: %#v", serviceQ, svcInfo)
}
svcInfo, exists = p.getServiceInfo(serviceX)
servicePortPortalNameX := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceX.Namespace, Name: serviceX.Name}, Port: serviceX.Port, PortalIPName: listenIP}
svcInfo, exists = p.getServiceInfo(servicePortPortalNameX)
if exists {
t.Fatalf("found unwanted serviceInfo for %s: %#v", serviceX, svcInfo)
}
}
// Helper: Stops the proxy for the named service.
func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error {
func stopProxyByName(proxier *Proxier, service ServicePortPortalName) error {
info, found := proxier.getServiceInfo(service)
if !found {
return fmt.Errorf("unknown service: %s", service)
@ -391,32 +420,34 @@ func TestTCPProxyStop(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
listenIP := "0.0.0.0"
p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
if !svcInfo.isAlive() {
t.Fatalf("wrong value for isAlive(): expected true")
}
conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
conn, err := net.Dial("tcp", joinHostPort("", getPortNum(t, svcInfo.socket.Addr().String())))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
waitForNumProxyLoops(t, p, 1)
stopProxyByName(p, service)
stopProxyByName(p, servicePortPortalName)
if svcInfo.isAlive() {
t.Fatalf("wrong value for isAlive(): expected false")
}
// Wait for the port to really close.
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
@ -435,26 +466,28 @@ func TestUDPProxyStop(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
listenIP := "0.0.0.0"
p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
svcInfo, err := p.addServicePortPortal(servicePortPortalName, "UDP", listenIP, 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
conn, err := net.Dial("udp", joinHostPort("", getPortNum(t, svcInfo.socket.Addr().String())))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
waitForNumProxyLoops(t, p, 1)
stopProxyByName(p, service)
stopProxyByName(p, servicePortPortalName)
// Wait for the port to really close.
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
@ -473,17 +506,20 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
listenIP := "0.0.0.0"
p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
fmt.Println("here0")
conn, err := net.Dial("tcp", joinHostPort("", getPortNum(t, svcInfo.socket.Addr().String())))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
@ -491,7 +527,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{})
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
@ -510,17 +546,19 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
listenIP := "0.0.0.0"
p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
svcInfo, err := p.addServicePortPortal(servicePortPortalName, "UDP", listenIP, 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
conn, err := net.Dial("udp", joinHostPort("", getPortNum(t, svcInfo.socket.Addr().String())))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
@ -528,7 +566,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{})
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
@ -546,17 +584,19 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
}
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
listenIP := "0.0.0.0"
p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
conn, err := net.Dial("tcp", joinHostPort("", getPortNum(t, svcInfo.socket.Addr().String())))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
@ -564,7 +604,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{})
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
@ -573,17 +613,17 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
p.OnServiceUpdate([]api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p",
Port: int32(svcInfo.proxyPort),
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
Protocol: "TCP",
}}},
}})
svcInfo, exists := p.getServiceInfo(service)
svcInfo, exists := p.getServiceInfo(servicePortPortalName)
if !exists {
t.Fatalf("can't find serviceInfo for %s", service)
t.Fatalf("can't find serviceInfo for %s", servicePortPortalName)
}
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
waitForNumProxyLoops(t, p, 1)
}
@ -599,17 +639,19 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
}
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
listenIP := "0.0.0.0"
p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
svcInfo, err := p.addServicePortPortal(servicePortPortalName, "UDP", listenIP, 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
conn, err := net.Dial("udp", joinHostPort("", getPortNum(t, svcInfo.socket.Addr().String())))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
@ -617,7 +659,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{})
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
@ -626,17 +668,17 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
p.OnServiceUpdate([]api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p",
Port: int32(svcInfo.proxyPort),
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
Protocol: "UDP",
}}},
}})
svcInfo, exists := p.getServiceInfo(service)
svcInfo, exists := p.getServiceInfo(servicePortPortalName)
if !exists {
t.Fatalf("can't find serviceInfo")
t.Fatalf("can't find serviceInfo for %s", servicePortPortalName)
}
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
testEchoUDP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
waitForNumProxyLoops(t, p, 1)
}
@ -653,36 +695,38 @@ func TestTCPProxyUpdatePort(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
listenIP := "0.0.0.0"
p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p",
Port: 99,
Port: 0,
Protocol: "TCP",
}}},
}})
// Wait for the socket to actually get free.
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error())
}
svcInfo, exists := p.getServiceInfo(service)
svcInfo, exists := p.getServiceInfo(servicePortPortalName)
if !exists {
t.Fatalf("can't find serviceInfo")
t.Fatalf("can't find serviceInfo for %s", servicePortPortalName)
}
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
// This is a bit async, but this should be sufficient.
time.Sleep(500 * time.Millisecond)
waitForNumProxyLoops(t, p, 1)
@ -701,13 +745,15 @@ func TestUDPProxyUpdatePort(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
listenIP := "0.0.0.0"
p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
svcInfo, err := p.addServicePortPortal(servicePortPortalName, "UDP", listenIP, 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -715,21 +761,21 @@ func TestUDPProxyUpdatePort(t *testing.T) {
p.OnServiceUpdate([]api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p",
Port: 99,
Port: 0,
Protocol: "UDP",
}}},
}})
// Wait for the socket to actually get free.
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error())
}
svcInfo, exists := p.getServiceInfo(service)
svcInfo, exists := p.getServiceInfo(servicePortPortalName)
if !exists {
t.Fatalf("can't find serviceInfo")
t.Fatalf("can't find serviceInfo for %s", servicePortPortalName)
}
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
testEchoUDP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
waitForNumProxyLoops(t, p, 1)
}
@ -746,17 +792,19 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
listenIP := "0.0.0.0"
p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{{
@ -767,19 +815,19 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
Port: int32(svcInfo.portal.port),
Protocol: "TCP",
}},
ClusterIP: svcInfo.portal.ip.String(),
ExternalIPs: []string{"4.3.2.1"},
ClusterIP: svcInfo.portal.ip,
ExternalIPs: []string{"0.0.0.0"},
},
}})
// Wait for the socket to actually get free.
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error())
}
svcInfo, exists := p.getServiceInfo(service)
svcInfo, exists := p.getServiceInfo(servicePortPortalName)
if !exists {
t.Fatalf("can't find serviceInfo")
t.Fatalf("can't find serviceInfo for %s", servicePortPortalName)
}
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
// This is a bit async, but this should be sufficient.
time.Sleep(500 * time.Millisecond)
waitForNumProxyLoops(t, p, 1)
@ -797,28 +845,30 @@ func TestProxyUpdatePortal(t *testing.T) {
}
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
listenIP := "0.0.0.0"
p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
servicePortPortalName := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port, PortalIPName: listenIP}
svcInfo, err := p.addServicePortPortal(servicePortPortalName, "TCP", listenIP, 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{
Name: "p",
Port: int32(svcInfo.proxyPort),
Port: int32(svcInfo.portal.port),
Protocol: "TCP",
}}},
}})
_, exists := p.getServiceInfo(service)
_, exists := p.getServiceInfo(servicePortPortalName)
if exists {
t.Fatalf("service with empty ClusterIP should not be included in the proxy")
}
@ -827,29 +877,29 @@ func TestProxyUpdatePortal(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{
Name: "p",
Port: int32(svcInfo.proxyPort),
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
Protocol: "TCP",
}}},
}})
_, exists = p.getServiceInfo(service)
_, exists = p.getServiceInfo(servicePortPortalName)
if exists {
t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy")
}
p.OnServiceUpdate([]api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p",
Port: int32(svcInfo.proxyPort),
Port: int32(svcInfo.portal.port),
Protocol: "TCP",
}}},
}})
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
svcInfo, exists = p.getServiceInfo(service)
svcInfo, exists = p.getServiceInfo(servicePortPortalName)
if !exists {
t.Fatalf("service with ClusterIP set not found in the proxy")
}
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
waitForNumProxyLoops(t, p, 1)
}
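The two negative updates in this test pin down the portal guard: no portal may be created for a service whose ClusterIP is empty or "None". The proxier's check presumably reduces to something like the following sketch (not its exact code; api.ClusterIPNone is the constant "None"):

// Sketch only: a portal is created only when a real cluster IP is set.
func serviceNeedsPortal(service *api.Service) bool {
	return service.Spec.ClusterIP != "" && service.Spec.ClusterIP != api.ClusterIPNone
}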

View File

@ -41,7 +41,7 @@ type proxySocket interface {
// while sessions are active.
Close() error
// ProxyLoop proxies incoming connections for the specified service to the service endpoints.
ProxyLoop(service proxy.ServicePortPortalName, info *serviceInfo, proxier *Proxier)
ProxyLoop(service ServicePortPortalName, info *serviceInfo, proxier *Proxier)
// ListenPort returns the host port that the proxySocket is listening on
ListenPort() int
}
@ -87,10 +87,16 @@ func (tcp *tcpProxySocket) ListenPort() int {
return tcp.port
}
func tryConnect(service proxy.ServicePortPortalName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
func tryConnect(service ServicePortPortalName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
sessionAffinityReset := false
for _, dialTimeout := range endpointDialTimeout {
servicePortName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port}
servicePortName := proxy.ServicePortName{
NamespacedName: types.NamespacedName{
Namespace: service.Namespace,
Name: service.Name,
},
Port: service.Port,
}
endpoint, err := proxier.loadBalancer.NextEndpoint(servicePortName, srcAddr, sessionAffinityReset)
if err != nil {
glog.Errorf("Couldn't find an endpoint for %s: %v", service, err)
@ -113,7 +119,7 @@ func tryConnect(service proxy.ServicePortPortalName, srcAddr net.Addr, protocol
return nil, fmt.Errorf("failed to connect to an endpoint.")
}
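The servicePortName conversion a few lines up is where the portal-scoped key is collapsed back to the per-service key that the load balancer indexes endpoints by. For a purely hypothetical name, inside the winuserspace package, the relationship looks like this:

// Illustration only: the portal key carries the portal IP, the load-balancer key drops it.
portal := ServicePortPortalName{
	NamespacedName: types.NamespacedName{Namespace: "ns", Name: "echo"},
	Port:           "p",
	PortalIPName:   "10.0.0.1",
}
lbKey := proxy.ServicePortName{NamespacedName: portal.NamespacedName, Port: portal.Port}
fmt.Printf("%s -> %s\n", portal, lbKey) // prints: ns/echo:p:10.0.0.1 -> ns/echo:p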
func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) {
func (tcp *tcpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) {
for {
if !myInfo.isAlive() {
// The service port was closed or replaced.
@ -199,7 +205,7 @@ func newClientCache() *clientCache {
return &clientCache{clients: map[string]net.Conn{}}
}
func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) {
func (udp *udpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) {
var buffer [4096]byte // 4KiB should be enough for most whole-packets
for {
if !myInfo.isAlive() {
@ -243,7 +249,7 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortPortalName, myInfo
}
}
func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service proxy.ServicePortPortalName, timeout time.Duration) (net.Conn, error) {
func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service ServicePortPortalName, timeout time.Duration) (net.Conn, error) {
activeClients.mu.Lock()
defer activeClients.mu.Unlock()

View File

@ -0,0 +1,35 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package winuserspace

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

// ServicePortPortalName carries a namespace + name + portname + portalip. This is the unique
// identifier for a windows service port portal.
type ServicePortPortalName struct {
	types.NamespacedName
	Port         string
	PortalIPName string
}

func (spn ServicePortPortalName) String() string {
	return fmt.Sprintf("%s:%s:%s", spn.NamespacedName.String(), spn.Port, spn.PortalIPName)
}
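Since every (service, port, portal IP) combination now gets its own socket, this type is presumably what the proxier keys its per-portal bookkeeping on, the same way the generic userspace proxier keys on ServicePortName; the tests above look entries up by it via getServiceInfo. A hedged sketch of that shape (field and type names assumed, the real struct lives in proxier.go):

// Assumed shape only; the real proxier may differ in detail.
type portalTable struct {
	mu         sync.Mutex
	serviceMap map[ServicePortPortalName]*serviceInfo
}

func (pt *portalTable) get(name ServicePortPortalName) (*serviceInfo, bool) {
	pt.mu.Lock()
	defer pt.mu.Unlock()
	info, ok := pt.serviceMap[name]
	return info, ok
}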

View File

@ -167,12 +167,12 @@ func (runner *runner) DeleteIPAddress(args []string) error {
// GetInterfaceToAddIP returns the interface name where Service IP needs to be added
// IP Address needs to be added for netsh portproxy to redirect traffic
// Reads Environment variable INTERFACE_TO_ADD_SERVICE_IP, if it is not defined then "vEthernet (forwarder)" is returned
// Reads Environment variable INTERFACE_TO_ADD_SERVICE_IP, if it is not defined then "vEthernet (HNS Internal NIC)" is returned
func (runner *runner) GetInterfaceToAddIP() string {
if iface := os.Getenv("INTERFACE_TO_ADD_SERVICE_IP"); len(iface) > 0 {
return iface
}
return "vEthernet (forwarder)"
return "vEthernet (HNS Internal NIC)"
}
// Restore is part of Interface.
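The only behavioral change here is the fallback interface name; the INTERFACE_TO_ADD_SERVICE_IP override still wins. A self-contained paraphrase of the lookup order (not the package's API):

package main

import (
	"fmt"
	"os"
)

// interfaceToAddServiceIP mirrors GetInterfaceToAddIP's lookup for illustration:
// the environment override wins, otherwise the HNS internal NIC is used.
func interfaceToAddServiceIP() string {
	if iface := os.Getenv("INTERFACE_TO_ADD_SERVICE_IP"); len(iface) > 0 {
		return iface
	}
	return "vEthernet (HNS Internal NIC)"
}

func main() {
	fmt.Println(interfaceToAddServiceIP()) // vEthernet (HNS Internal NIC) when unset
	os.Setenv("INTERFACE_TO_ADD_SERVICE_IP", "vEthernet (custom)")
	fmt.Println(interfaceToAddServiceIP()) // vEthernet (custom)
}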