From 48647fb9b5dce25411c5fbae511d2bf920a52f05 Mon Sep 17 00:00:00 2001 From: Anthony Howe Date: Fri, 3 Feb 2017 08:36:18 -0800 Subject: [PATCH] add tcp or udp proxy for service addresses --- cmd/kube-proxy/app/server.go | 15 +- pkg/proxy/types.go | 12 ++ pkg/proxy/winuserspace/proxier.go | 275 ++++++++++++++++---------- pkg/proxy/winuserspace/proxysocket.go | 14 +- pkg/util/netsh/netsh.go | 4 +- 5 files changed, 205 insertions(+), 115 deletions(-) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index da6727d8e5..d996dc7d9e 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -249,15 +249,15 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err userspace.CleanupLeftovers(iptInterface) } else { glog.V(0).Info("Using userspace Proxier.") - // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for - // our config.EndpointsConfigHandler. - loadBalancer := userspace.NewLoadBalancerRR() - // set EndpointsConfigHandler to our loadBalancer - endpointsHandler = loadBalancer var proxierUserspace proxy.ProxyProvider if runtime.GOOS == "windows" { + // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for + // our config.EndpointsConfigHandler. + loadBalancer := winuserspace.NewLoadBalancerRR() + // set EndpointsConfigHandler to our loadBalancer + endpointsHandler = loadBalancer proxierUserspace, err = winuserspace.NewProxier( loadBalancer, net.ParseIP(config.BindAddress), @@ -268,6 +268,11 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err config.UDPIdleTimeout.Duration, ) } else { + // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for + // our config.EndpointsConfigHandler. + loadBalancer := userspace.NewLoadBalancerRR() + // set EndpointsConfigHandler to our loadBalancer + endpointsHandler = loadBalancer proxierUserspace, err = userspace.NewProxier( loadBalancer, net.ParseIP(config.BindAddress), diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index d9ff569cbb..72f68afc7b 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -47,3 +47,15 @@ type ServicePortName struct { func (spn ServicePortName) String() string { return fmt.Sprintf("%s:%s", spn.NamespacedName.String(), spn.Port) } + +// ServicePortPortalName carries a namespace + name + portname + portalip. This is the unique +// identfier for a load-balanced service. +type ServicePortPortalName struct { + types.NamespacedName + Port string + PortalIPName string +} + +func (spn ServicePortPortalName) String() string { + return fmt.Sprintf("%s:%s:%s", spn.NamespacedName.String(), spn.Port, spn.PortalIPName) +} diff --git a/pkg/proxy/winuserspace/proxier.go b/pkg/proxy/winuserspace/proxier.go index c9d61e70d3..32c9198361 100644 --- a/pkg/proxy/winuserspace/proxier.go +++ b/pkg/proxy/winuserspace/proxier.go @@ -19,7 +19,6 @@ package winuserspace import ( "fmt" "net" - "strconv" "strings" "sync" "sync/atomic" @@ -28,7 +27,7 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/types" - utilerrors "k8s.io/apimachinery/pkg/util/errors" + //utilerrors "k8s.io/apimachinery/pkg/util/errors" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/kubernetes/pkg/api" @@ -37,25 +36,25 @@ import ( ) type portal struct { - ip net.IP + ip string port int isExternal bool } type serviceInfo struct { - isAliveAtomic int32 // Only access this with atomic ops - portal portal - protocol api.Protocol - proxyPort int - socket proxySocket - timeout time.Duration - activeClients *clientCache - nodePort int - loadBalancerStatus api.LoadBalancerStatus + isAliveAtomic int32 // Only access this with atomic ops + portal portal + protocol api.Protocol + proxyPort int + socket proxySocket + timeout time.Duration + activeClients *clientCache + //nodePort int + //loadBalancerStatus api.LoadBalancerStatus sessionAffinityType api.ServiceAffinity stickyMaxAgeMinutes int // Deprecated, but required for back-compat (including e2e) - externalIPs []string + //externalIPs []string } func (info *serviceInfo) setAlive(b bool) { @@ -85,16 +84,16 @@ func logTimeout(err error) bool { type Proxier struct { loadBalancer LoadBalancer mu sync.Mutex // protects serviceMap - serviceMap map[proxy.ServicePortName]*serviceInfo + serviceMap map[proxy.ServicePortPortalName]*serviceInfo syncPeriod time.Duration udpIdleTimeout time.Duration portMapMutex sync.Mutex portMap map[portMapKey]*portMapValue numProxyLoops int32 // use atomic ops to access this; mostly for testing - listenIP net.IP - netsh netsh.Interface - hostIP net.IP - proxyPorts PortAllocator + //listenIP net.IP + netsh netsh.Interface + hostIP net.IP + //proxyPorts PortAllocator } // assert Proxier is a ProxyProvider @@ -114,7 +113,7 @@ func (k *portMapKey) String() string { // A value for the portMap type portMapValue struct { - owner proxy.ServicePortName + owner proxy.ServicePortPortalName socket interface { Close() error } @@ -153,31 +152,23 @@ func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, netsh netsh.Interfac return nil, fmt.Errorf("failed to select a host interface: %v", err) } - proxyPorts := newPortAllocator(pr) - glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP) - return createProxier(loadBalancer, listenIP, netsh, hostIP, proxyPorts, syncPeriod, udpIdleTimeout) + return createProxier(loadBalancer, listenIP, netsh, hostIP, syncPeriod, udpIdleTimeout) } -func createProxier(loadBalancer LoadBalancer, listenIP net.IP, netsh netsh.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) { - // convenient to pass nil for tests.. - if proxyPorts == nil { - proxyPorts = newPortAllocator(utilnet.PortRange{}) - } +func createProxier(loadBalancer LoadBalancer, listenIP net.IP, netsh netsh.Interface, hostIP net.IP, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) { return &Proxier{ loadBalancer: loadBalancer, - serviceMap: make(map[proxy.ServicePortName]*serviceInfo), + serviceMap: make(map[proxy.ServicePortPortalName]*serviceInfo), portMap: make(map[portMapKey]*portMapValue), syncPeriod: syncPeriod, udpIdleTimeout: udpIdleTimeout, - listenIP: listenIP, netsh: netsh, hostIP: hostIP, - proxyPorts: proxyPorts, }, nil } -// Sync is called to immediately synchronize the proxier state to iptables +// Sync is called to immediately synchronize the proxier state func (proxier *Proxier) Sync() { proxier.ensurePortals() proxier.cleanupStaleStickySessions() @@ -196,7 +187,7 @@ func (proxier *Proxier) SyncLoop() { // Ensure that portals exist for all services. func (proxier *Proxier) ensurePortals() { - proxier.mu.Lock() + /*proxier.mu.Lock() defer proxier.mu.Unlock() // NB: This does not remove rules that should not be present. for name, info := range proxier.serviceMap { @@ -204,52 +195,56 @@ func (proxier *Proxier) ensurePortals() { if err != nil { glog.Errorf("Failed to ensure portal for %q: %v", name, err) } - } + }*/ } // cleanupStaleStickySessions cleans up any stale sticky session records in the hash map. func (proxier *Proxier) cleanupStaleStickySessions() { proxier.mu.Lock() defer proxier.mu.Unlock() - for name := range proxier.serviceMap { + spnMap := make(map[proxy.ServicePortName]bool) + for k := range proxier.serviceMap { + spn := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: k.Namespace, Name: k.Name}, Port: k.Port} + spnMap[spn] = true + } + for name := range spnMap { proxier.loadBalancer.CleanupStaleStickySessions(name) } } // This assumes proxier.mu is not locked. -func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *serviceInfo) error { +func (proxier *Proxier) stopProxy(service proxy.ServicePortPortalName, info *serviceInfo) error { proxier.mu.Lock() defer proxier.mu.Unlock() return proxier.stopProxyInternal(service, info) } // This assumes proxier.mu is locked. -func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *serviceInfo) error { +func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortPortalName, info *serviceInfo) error { delete(proxier.serviceMap, service) info.setAlive(false) err := info.socket.Close() - port := info.socket.ListenPort() - proxier.proxyPorts.Release(port) return err } -func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*serviceInfo, bool) { +func (proxier *Proxier) getServiceInfo(service proxy.ServicePortPortalName) (*serviceInfo, bool) { proxier.mu.Lock() defer proxier.mu.Unlock() info, ok := proxier.serviceMap[service] return info, ok } -func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *serviceInfo) { +func (proxier *Proxier) setServiceInfo(service proxy.ServicePortPortalName, info *serviceInfo) { proxier.mu.Lock() defer proxier.mu.Unlock() proxier.serviceMap[service] = info } +/* // addServiceOnPort starts listening for a new service, returning the serviceInfo. // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP // connections, for now. -func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) { +func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortPortalName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) { sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort) if err != nil { return nil, err @@ -286,13 +281,14 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol return si, nil } +*/ // OnServiceUpdate manages the active set of service proxies. // Active service proxies are reinitialized if found in the update set or // shutdown if missing from the update set. func (proxier *Proxier) OnServiceUpdate(services []api.Service) { glog.V(4).Infof("Received update notice: %+v", services) - activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set + activeServices := make(map[proxy.ServicePortPortalName]bool) // use a map as a set for i := range services { service := &services[i] @@ -304,84 +300,143 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) { for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] - serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name} - activeServices[serviceName] = true - serviceIP := net.ParseIP(service.Spec.ClusterIP) - info, exists := proxier.getServiceInfo(serviceName) - // TODO: check health of the socket? What if ProxyLoop exited? - if exists && sameConfig(info, service, servicePort) { - // Nothing changed. - continue + // create a slice of source IPs + var listenIPs []string + listenIPs = append(listenIPs, service.Spec.ClusterIP) + + /*for _, ip := range service.Spec.ExternalIPs { + listenIPs = append(listenIPs, ip) } - if exists { - glog.V(4).Infof("Something changed for service %q: stopping it", serviceName) - err := proxier.closePortal(serviceName, info) - if err != nil { - glog.Errorf("Failed to close portal for %q: %v", serviceName, err) + + for _, ip := range service.Status.LoadBalancer.Ingress { + listenIPs = append(listenIPs, ip) + } + + if int(service.Spec.Ports[i]) != 0 { + listenIPs = append(listenIPs, "") + }*/ + + for _, listenIP := range listenIPs { + serviceName := proxy.ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name, PortalIPName: listenIP} + activeServices[serviceName] = true + serviceIP := net.ParseIP(listenIP) + info, exists := proxier.getServiceInfo(serviceName) + if exists && sameConfig(info, service, servicePort) { + // Nothing changed. + continue } - err = proxier.stopProxy(serviceName, info) - if err != nil { - glog.Errorf("Failed to stop service %q: %v", serviceName, err) + if exists { + glog.V(4).Infof("Something changed for service %q: stopping it", serviceName) + + // turn off the proxy + err := proxier.stopProxy(serviceName, info) + if err != nil { + glog.Errorf("Failed to stop service %q: %v", serviceName, err) + } + + // close the PortalProxy if it is not a node port + if serviceIP != nil { + args := proxier.netshIpv4AddressDeleteArgs(serviceIP) + if err := proxier.netsh.DeleteIPAddress(args); err != nil { + glog.Errorf("Failed to delete IP address for service %q, error %s", serviceName, err.Error()) + } + } else { + // TODO(ajh) release the node port + } } - } - proxyPort, err := proxier.proxyPorts.AllocateNext() - if err != nil { - glog.Errorf("failed to allocate proxy port for service %q: %v", serviceName, err) - continue - } + glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol) - glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol) - info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout) - if err != nil { - glog.Errorf("Failed to start proxy for %q: %v", serviceName, err) - continue - } - info.portal.ip = serviceIP - info.portal.port = int(servicePort.Port) - info.externalIPs = service.Spec.ExternalIPs - // Deep-copy in case the service instance changes - info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer) - info.nodePort = int(servicePort.NodePort) - info.sessionAffinityType = service.Spec.SessionAffinity - glog.V(4).Infof("info: %#v", info) + // add the IP address if it is not a node port + if serviceIP != nil { + args := proxier.netshIpv4AddressAddArgs(serviceIP) + existed, err := proxier.netsh.EnsureIPAddress(args, serviceIP) + if err != nil { + glog.Errorf("Failed to add ip address for service %q, args:%v", serviceName, args) + continue + } + if !existed { + glog.V(3).Infof("Added ip address to fowarder interface for service %q on %s %s:%d", serviceName, servicePort.Protocol, serviceIP, int(servicePort.Port)) + } + } else { + // TODO(ajh) handle the node port + } - err = proxier.openPortal(serviceName, info) - if err != nil { - glog.Errorf("Failed to open portal for %q: %v", serviceName, err) + // add the listener, proxy + sock, err := newProxySocket(servicePort.Protocol, serviceIP, int(servicePort.Port)) + if err != nil { + glog.Errorf("failed to create a new proxy socket for service %q: %v", serviceName, err) + continue + } + si := &serviceInfo{ + isAliveAtomic: 1, + portal: portal{ip: listenIP, port: int(servicePort.Port), isExternal: false}, + protocol: servicePort.Protocol, + proxyPort: int(servicePort.Port), + socket: sock, + timeout: proxier.udpIdleTimeout, + activeClients: newClientCache(), + sessionAffinityType: service.Spec.SessionAffinity, // default + stickyMaxAgeMinutes: 180, // TODO: parameterize this in the API. + } + glog.V(4).Infof("info: %#v", si) + proxier.setServiceInfo(serviceName, si) + + glog.V(2).Infof("Proxying for service %q on %s port %d", serviceName, servicePort.Protocol, int(servicePort.Port)) + go func(service proxy.ServicePortPortalName, proxier *Proxier) { + defer runtime.HandleCrash() + atomic.AddInt32(&proxier.numProxyLoops, 1) + sock.ProxyLoop(service, si, proxier) + atomic.AddInt32(&proxier.numProxyLoops, -1) + }(serviceName, proxier) + } + if len(listenIPs) > 0 { + // only one loadbalancer per service endpoint + servicePortName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name} + proxier.loadBalancer.NewService(servicePortName, service.Spec.SessionAffinity, 180) } - proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes) } } proxier.mu.Lock() defer proxier.mu.Unlock() + + spnMap := make(map[proxy.ServicePortName]bool) + for name, info := range proxier.serviceMap { + spn := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: name.Namespace, Name: name.Name}, Port: name.Port} + spnMap[spn] = spnMap[spn] || activeServices[name] + if !activeServices[name] { + serviceIP := net.ParseIP(info.portal.ip) glog.V(1).Infof("Stopping service %q", name) - err := proxier.closePortal(name, info) - if err != nil { - glog.Errorf("Failed to close portal for %q: %v", name, err) - } - err = proxier.stopProxyInternal(name, info) + + // turn off the proxy + err := proxier.stopProxy(name, info) if err != nil { glog.Errorf("Failed to stop service %q: %v", name, err) } - proxier.loadBalancer.DeleteService(name) + + // close the PortalProxy if it is not a node port + if serviceIP != nil { + args := proxier.netshIpv4AddressDeleteArgs(serviceIP) + if err := proxier.netsh.DeleteIPAddress(args); err != nil { + glog.Errorf("Failed to stop service %q: %v", name, err) + } + } else { + // TODO(ajh) release the node port + } + } + } + // only delete spn if all listen ips show inactive + for k := range spnMap { + if !spnMap[k] { + proxier.loadBalancer.DeleteService(k) } } } func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool { - if info.protocol != port.Protocol || info.portal.port != int(port.Port) || info.nodePort != int(port.NodePort) { - return false - } - if !info.portal.ip.Equal(net.ParseIP(service.Spec.ClusterIP)) { - return false - } - if !ipsEqual(info.externalIPs, service.Spec.ExternalIPs) { - return false - } - if !api.LoadBalancerStatusEqual(&info.loadBalancerStatus, &service.Status.LoadBalancer) { + if info.protocol != port.Protocol || info.portal.port != int(port.Port) { return false } if info.sessionAffinityType != service.Spec.SessionAffinity { @@ -402,6 +457,7 @@ func ipsEqual(lhs, rhs []string) bool { return true } +/* func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *serviceInfo) error { err := proxier.openOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service) if err != nil { @@ -429,7 +485,9 @@ func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *serviceI } return nil } +*/ +/* func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error { if protocol == api.ProtocolUDP { glog.Warningf("Not adding rule for %q on %s:%d as UDP protocol is not supported by netsh portproxy", name, portal.ip, portal.port) @@ -461,7 +519,9 @@ func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, prox return nil } +*/ +/* // claimNodePort marks a port as being owned by a particular service, or returns error if already claimed. // Idempotent: reclaiming with the same owner is not an error func (proxier *Proxier) claimNodePort(ip net.IP, port int, protocol api.Protocol, owner proxy.ServicePortName) error { @@ -494,7 +554,9 @@ func (proxier *Proxier) claimNodePort(ip net.IP, port int, protocol api.Protocol } return fmt.Errorf("Port conflict detected on port %s. %v vs %v", key.String(), owner, existing) } +*/ +/* // releaseNodePort releases a claim on a port. Returns an error if the owner does not match the claim. // Tolerates release on an unclaimed port, to simplify . func (proxier *Proxier) releaseNodePort(ip net.IP, port int, protocol api.Protocol, owner proxy.ServicePortName) error { @@ -515,7 +577,9 @@ func (proxier *Proxier) releaseNodePort(ip net.IP, port int, protocol api.Protoc existing.socket.Close() return nil } +*/ +/* func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error { if protocol == api.ProtocolUDP { glog.Warningf("Not adding node port rule for %q on port %d as UDP protocol is not supported by netsh portproxy", name, nodePort) @@ -539,8 +603,9 @@ func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyI } return nil -} +}*/ +/* func (proxier *Proxier) closePortal(service proxy.ServicePortName, info *serviceInfo) error { // Collect errors and report them all at the end. el := proxier.closeOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service) @@ -561,8 +626,9 @@ func (proxier *Proxier) closePortal(service proxy.ServicePortName, info *service glog.Errorf("Some errors closing iptables portals for service %q", service) } return utilerrors.NewAggregate(el) -} +}*/ +/* func (proxier *Proxier) closeOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) []error { el := []error{} @@ -587,8 +653,9 @@ func (proxier *Proxier) closeOnePortal(portal portal, protocol api.Protocol, pro } return el -} +}*/ +/* func (proxier *Proxier) closeNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) []error { el := []error{} @@ -603,7 +670,7 @@ func (proxier *Proxier) closeNodePort(nodePort int, protocol api.Protocol, proxy } return el -} +}*/ func isLocalIP(ip net.IP) (bool, error) { addrs, err := net.InterfaceAddrs() @@ -633,6 +700,7 @@ func isClosedError(err error) bool { return strings.HasSuffix(err.Error(), "use of closed network connection") } +/* func (proxier *Proxier) netshPortProxyAddArgs(destIP net.IP, destPort int, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string { args := []string{ "interface", "portproxy", "set", "v4tov4", @@ -646,6 +714,7 @@ func (proxier *Proxier) netshPortProxyAddArgs(destIP net.IP, destPort int, proxy return args } +*/ func (proxier *Proxier) netshIpv4AddressAddArgs(destIP net.IP) []string { intName := proxier.netsh.GetInterfaceToAddIP() @@ -658,6 +727,7 @@ func (proxier *Proxier) netshIpv4AddressAddArgs(destIP net.IP) []string { return args } +/* func (proxier *Proxier) netshPortProxyDeleteArgs(destIP net.IP, destPort int, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string { args := []string{ "interface", "portproxy", "delete", "v4tov4", @@ -669,6 +739,7 @@ func (proxier *Proxier) netshPortProxyDeleteArgs(destIP net.IP, destPort int, pr return args } +*/ func (proxier *Proxier) netshIpv4AddressDeleteArgs(destIP net.IP) []string { intName := proxier.netsh.GetInterfaceToAddIP() diff --git a/pkg/proxy/winuserspace/proxysocket.go b/pkg/proxy/winuserspace/proxysocket.go index dbe0fb0a7a..63012387ca 100644 --- a/pkg/proxy/winuserspace/proxysocket.go +++ b/pkg/proxy/winuserspace/proxysocket.go @@ -26,6 +26,7 @@ import ( "time" "github.com/golang/glog" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/proxy" @@ -40,7 +41,7 @@ type proxySocket interface { // while sessions are active. Close() error // ProxyLoop proxies incoming connections for the specified service to the service endpoints. - ProxyLoop(service proxy.ServicePortName, info *serviceInfo, proxier *Proxier) + ProxyLoop(service proxy.ServicePortPortalName, info *serviceInfo, proxier *Proxier) // ListenPort returns the host port that the proxySocket is listening on ListenPort() int } @@ -86,10 +87,11 @@ func (tcp *tcpProxySocket) ListenPort() int { return tcp.port } -func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { +func tryConnect(service proxy.ServicePortPortalName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { sessionAffinityReset := false for _, dialTimeout := range endpointDialTimeout { - endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset) + servicePortName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: service.Port} + endpoint, err := proxier.loadBalancer.NextEndpoint(servicePortName, srcAddr, sessionAffinityReset) if err != nil { glog.Errorf("Couldn't find an endpoint for %s: %v", service, err) return nil, err @@ -111,7 +113,7 @@ func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string return nil, fmt.Errorf("failed to connect to an endpoint.") } -func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) { +func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) { for { if !myInfo.isAlive() { // The service port was closed or replaced. @@ -197,7 +199,7 @@ func newClientCache() *clientCache { return &clientCache{clients: map[string]net.Conn{}} } -func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) { +func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) { var buffer [4096]byte // 4KiB should be enough for most whole-packets for { if !myInfo.isAlive() { @@ -241,7 +243,7 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv } } -func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service proxy.ServicePortName, timeout time.Duration) (net.Conn, error) { +func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service proxy.ServicePortPortalName, timeout time.Duration) (net.Conn, error) { activeClients.mu.Lock() defer activeClients.mu.Unlock() diff --git a/pkg/util/netsh/netsh.go b/pkg/util/netsh/netsh.go index 59d1a18b61..39e2058460 100644 --- a/pkg/util/netsh/netsh.go +++ b/pkg/util/netsh/netsh.go @@ -167,12 +167,12 @@ func (runner *runner) DeleteIPAddress(args []string) error { // GetInterfaceToAddIP returns the interface name where Service IP needs to be added // IP Address needs to be added for netsh portproxy to redirect traffic -// Reads Environment variable INTERFACE_TO_ADD_SERVICE_IP, if it is not defined then "vEthernet (HNS Internal NIC)" is returned +// Reads Environment variable INTERFACE_TO_ADD_SERVICE_IP, if it is not defined then "vEthernet (forwarder)" is returned func (runner *runner) GetInterfaceToAddIP() string { if iface := os.Getenv("INTERFACE_TO_ADD_SERVICE_IP"); len(iface) > 0 { return iface } - return "vEthernet (HNS Internal NIC)" + return "vEthernet (forwarder)" } // Restore is part of Interface.