diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 003fa1ff75..2b0b6690a1 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -144,13 +144,31 @@ func newServiceInfo(service proxy.ServicePortName) *serviceInfo { // Proxier is an iptables based proxy for connections between a localhost:lport // and services that provide the actual backends. type Proxier struct { - mu sync.Mutex // protects serviceMap + mu sync.Mutex // protects the following fields serviceMap map[proxy.ServicePortName]*serviceInfo - syncPeriod time.Duration - iptables utiliptables.Interface + portsMap map[localPort]closeable haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event - MasqueradeAll bool + + // These are effectively const and do not need the mutex to be held. + syncPeriod time.Duration + iptables utiliptables.Interface + masqueradeAll bool +} + +type localPort struct { + desc string + ip string + port int + protocol string +} + +func (lp *localPort) String() string { + return fmt.Sprintf("%q (%s:%d/%s)", lp.desc, lp.ip, lp.port, lp.protocol) +} + +type closeable interface { + Close() error } // Proxier implements ProxyProvider @@ -161,8 +179,7 @@ var _ proxy.ProxyProvider = &Proxier{} // An error will be returned if iptables fails to update or acquire the initial lock. // Once a proxier is created, it will keep iptables up to date in the background and // will not terminate if a particular iptables call fails. 
-func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod time.Duration, MasqueradeAll bool) (*Proxier, error) { - +func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod time.Duration, masqueradeAll bool) (*Proxier, error) { // Set the route_localnet sysctl we need for if err := setSysctl(sysctlRouteLocalnet, 1); err != nil { return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err) @@ -178,9 +195,10 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod return &Proxier{ serviceMap: make(map[proxy.ServicePortName]*serviceInfo), + portsMap: make(map[localPort]closeable), syncPeriod: syncPeriod, iptables: ipt, - MasqueradeAll: MasqueradeAll, + masqueradeAll: masqueradeAll, }, nil } @@ -241,9 +259,7 @@ func (proxier *Proxier) SyncLoop() { func() { proxier.mu.Lock() defer proxier.mu.Unlock() - if err := proxier.syncProxyRules(); err != nil { - glog.Errorf("Failed to sync iptables rules: %v", err) - } + proxier.syncProxyRules() }() } } @@ -259,17 +275,24 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { for i := range allServices { service := &allServices[i] + svcName := types.NamespacedName{ + Namespace: service.Namespace, + Name: service.Name, + } // if ClusterIP is "None" or empty, skip proxying if !api.IsServiceIPSet(service) { - glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP) + glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP) continue } for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] - serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name} + serviceName := proxy.ServicePortName{ + NamespacedName: svcName, + Port: servicePort.Name, + } activeServices[serviceName] = true info, exists 
:= proxier.serviceMap[serviceName] if exists && proxier.sameConfig(info, service, servicePort) { @@ -308,9 +331,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { } } - if err := proxier.syncProxyRules(); err != nil { - glog.Errorf("Failed to sync iptables rules: %v", err) - } + proxier.syncProxyRules() } // OnEndpointsUpdate takes in a slice of updated endpoints. @@ -371,9 +392,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { } } - if err := proxier.syncProxyRules(); err != nil { - glog.Errorf("Failed to sync iptables rules: %v", err) - } + proxier.syncProxyRules() } // used in OnEndpointsUpdate @@ -404,25 +423,26 @@ func flattenValidEndpoints(endpoints []hostPortPair) []string { hpp := &endpoints[i] if isValidEndpoint(hpp) { result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port))) + } else { + glog.Warningf("got invalid endpoint: %+v", *hpp) } } return result } -// servicePortToServiceChain takes the ServicePortName for a service and +// servicePortChainName takes the ServicePortName for a service and // returns the associated iptables chain. This is computed by hashing (sha256) // then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do // this because Iptables Chain Names must be <= 28 chars long, and the longer // they are the harder they are to read. -func servicePortToServiceChain(s proxy.ServicePortName, protocol string) utiliptables.Chain { +func servicePortChainName(s proxy.ServicePortName, protocol string) utiliptables.Chain { hash := sha256.Sum256([]byte(s.String() + protocol)) encoded := base32.StdEncoding.EncodeToString(hash[:]) return utiliptables.Chain("KUBE-SVC-" + encoded[:16]) } -// This is the same as servicePortToServiceChain but with the endpoint -// included. -func servicePortAndEndpointToServiceChain(s proxy.ServicePortName, protocol string, endpoint string) utiliptables.Chain { +// This is the same as servicePortChainName but with the endpoint included. 
+func servicePortEndpointChainName(s proxy.ServicePortName, protocol string, endpoint string) utiliptables.Chain { hash := sha256.Sum256([]byte(s.String() + protocol + endpoint)) encoded := base32.StdEncoding.EncodeToString(hash[:]) return utiliptables.Chain("KUBE-SEP-" + encoded[:16]) @@ -431,11 +451,11 @@ func servicePortAndEndpointToServiceChain(s proxy.ServicePortName, protocol stri // This is where all of the iptables-save/restore calls happen. // The only other iptables rules are those that are setup in iptablesInit() // assumes proxier.mu is held -func (proxier *Proxier) syncProxyRules() error { +func (proxier *Proxier) syncProxyRules() { // don't sync rules till we've received services and endpoints if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate { glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master") - return nil + return } glog.V(3).Infof("Syncing iptables rules") @@ -444,12 +464,14 @@ func (proxier *Proxier) syncProxyRules() error { // Link the services chain. for _, chain := range inputChains { if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, iptablesServicesChain); err != nil { - return err + glog.Errorf("Failed to ensure that chain %s exists: %v", iptablesServicesChain, err) + return } comment := "kubernetes service portals" args := []string{"-m", "comment", "--comment", comment, "-j", string(iptablesServicesChain)} if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, chain, args...); err != nil { - return err + glog.Errorf("Failed to ensure that chain %s jumps to %s: %v", chain, iptablesServicesChain, err) + return } } // Link the output rules. 
@@ -457,7 +479,8 @@ func (proxier *Proxier) syncProxyRules() error { comment := "kubernetes service traffic requiring SNAT" args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "--mark", iptablesMasqueradeMark, "-j", "MASQUERADE"} if _, err := proxier.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { - return err + glog.Errorf("Failed to ensure that chain %s obeys MASQUERADE mark: %v", utiliptables.ChainPostrouting, err) + return } } @@ -491,14 +514,17 @@ func (proxier *Proxier) syncProxyRules() error { } // Accumulate chains to keep. - activeChains := make(map[utiliptables.Chain]bool) // use a map as a set + activeChains := map[utiliptables.Chain]bool{} // use a map as a set + + // Accumulate new local ports that we have opened. + newLocalPorts := map[localPort]closeable{} // Build rules for each service. - for name, info := range proxier.serviceMap { - protocol := strings.ToLower(string(info.protocol)) + for svcName, svcInfo := range proxier.serviceMap { + protocol := strings.ToLower(string(svcInfo.protocol)) // Create the per-service chain, retaining counters if possible. - svcChain := servicePortToServiceChain(name, protocol) + svcChain := servicePortChainName(svcName, protocol) if chain, ok := existingChains[svcChain]; ok { writeLine(chainsLines, chain) } else { @@ -509,12 +535,12 @@ func (proxier *Proxier) syncProxyRules() error { // Capture the clusterIP. 
args := []string{ "-A", string(iptablesServicesChain), - "-m", "comment", "--comment", fmt.Sprintf("\"%s cluster IP\"", name.String()), + "-m", "comment", "--comment", fmt.Sprintf("\"%s cluster IP\"", svcName.String()), "-m", protocol, "-p", protocol, - "-d", fmt.Sprintf("%s/32", info.clusterIP.String()), - "--dport", fmt.Sprintf("%d", info.port), + "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), + "--dport", fmt.Sprintf("%d", svcInfo.port), } - if proxier.MasqueradeAll { + if proxier.masqueradeAll { writeLine(rulesLines, append(args, "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...) } @@ -522,13 +548,36 @@ func (proxier *Proxier) syncProxyRules() error { "-j", string(svcChain))...) // Capture externalIPs. - for _, externalIP := range info.externalIPs { + for _, externalIP := range svcInfo.externalIPs { + // If the "external" IP happens to be an IP that is local to this + // machine, hold the local port open so no other process can open it + // (because the socket might open but it would never work). + if local, err := isLocalIP(externalIP); err != nil { + glog.Errorf("can't determine if IP is local, assuming not: %v", err) + } else if local { + lp := localPort{ + desc: "externalIP for " + svcName.String(), + ip: externalIP, + port: svcInfo.port, + protocol: protocol, + } + if proxier.portsMap[lp] != nil { + newLocalPorts[lp] = proxier.portsMap[lp] + } else { + socket, err := openLocalPort(&lp) + if err != nil { + glog.Errorf("can't open %s, skipping this externalIP: %v", lp.String(), err) + continue + } + newLocalPorts[lp] = socket + } + } // We're holding the port, so it's OK to install iptables rules. 
args := []string{ "-A", string(iptablesServicesChain), - "-m", "comment", "--comment", fmt.Sprintf("\"%s external IP\"", name.String()), + "-m", "comment", "--comment", fmt.Sprintf("\"%s external IP\"", svcName.String()), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", externalIP), - "--dport", fmt.Sprintf("%d", info.port), + "--dport", fmt.Sprintf("%d", svcInfo.port), } // We have to SNAT packets to external IPs. writeLine(rulesLines, append(args, @@ -551,14 +600,14 @@ func (proxier *Proxier) syncProxyRules() error { } // Capture load-balancer ingress. - for _, ingress := range info.loadBalancerStatus.Ingress { + for _, ingress := range svcInfo.loadBalancerStatus.Ingress { if ingress.IP != "" { args := []string{ "-A", string(iptablesServicesChain), - "-m", "comment", "--comment", fmt.Sprintf("\"%s loadbalancer IP\"", name.String()), + "-m", "comment", "--comment", fmt.Sprintf("\"%s loadbalancer IP\"", svcName.String()), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", ingress.IP), - "--dport", fmt.Sprintf("%d", info.port), + "--dport", fmt.Sprintf("%d", svcInfo.port), } // We have to SNAT packets from external IPs. writeLine(rulesLines, append(args, @@ -571,20 +620,38 @@ func (proxier *Proxier) syncProxyRules() error { // Capture nodeports. If we had more than 2 rules it might be // worthwhile to make a new per-service chain for nodeport rules, but // with just 2 rules it ends up being a waste and a cognitive burden. - if info.nodePort != 0 { + if svcInfo.nodePort != 0 { + // Hold the local port open so no other process can open it + // (because the socket might open but it would never work). 
+ lp := localPort{ + desc: "nodePort for " + svcName.String(), + ip: "", + port: svcInfo.nodePort, + protocol: protocol, + } + if proxier.portsMap[lp] != nil { + newLocalPorts[lp] = proxier.portsMap[lp] + } else { + socket, err := openLocalPort(&lp) + if err != nil { + glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) + continue + } + newLocalPorts[lp] = socket + } // We're holding the port, so it's OK to install iptables rules. // Nodeports need SNAT. writeLine(rulesLines, "-A", string(iptablesNodePortsChain), - "-m", "comment", "--comment", name.String(), + "-m", "comment", "--comment", svcName.String(), "-m", protocol, "-p", protocol, - "--dport", fmt.Sprintf("%d", info.nodePort), + "--dport", fmt.Sprintf("%d", svcInfo.nodePort), "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark)) // Jump to the service chain. writeLine(rulesLines, "-A", string(iptablesNodePortsChain), - "-m", "comment", "--comment", name.String(), + "-m", "comment", "--comment", svcName.String(), "-m", protocol, "-p", protocol, - "--dport", fmt.Sprintf("%d", info.nodePort), + "--dport", fmt.Sprintf("%d", svcInfo.nodePort), "-j", string(svcChain)) } @@ -592,9 +659,9 @@ func (proxier *Proxier) syncProxyRules() error { // can group rules together. endpoints := make([]string, 0) endpointChains := make([]utiliptables.Chain, 0) - for _, ep := range info.endpoints { + for _, ep := range svcInfo.endpoints { endpoints = append(endpoints, ep) - endpointChain := servicePortAndEndpointToServiceChain(name, protocol, ep) + endpointChain := servicePortEndpointChainName(svcName, protocol, ep) endpointChains = append(endpointChains, endpointChain) // Create the endpoint chain, retaining counters if possible. @@ -607,13 +674,13 @@ func (proxier *Proxier) syncProxyRules() error { } // First write session affinity rules, if applicable. 
-		if info.sessionAffinityType == api.ServiceAffinityClientIP {
+		if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
 			for _, endpointChain := range endpointChains {
 				writeLine(rulesLines,
 					"-A", string(svcChain),
-					"-m", "comment", "--comment", name.String(),
+					"-m", "comment", "--comment", svcName.String(),
 					"-m", "recent", "--name", string(endpointChain),
-					"--rcheck", "--seconds", fmt.Sprintf("%d", info.stickyMaxAgeSeconds), "--reap",
+					"--rcheck", "--seconds", fmt.Sprintf("%d", svcInfo.stickyMaxAgeSeconds), "--reap",
 					"-j", string(endpointChain))
 			}
 		}
@@ -624,7 +691,7 @@ func (proxier *Proxier) syncProxyRules() error {
 			// Balancing rules in the per-service chain.
 			args := []string{
 				"-A", string(svcChain),
-				"-m", "comment", "--comment", name.String(),
+				"-m", "comment", "--comment", svcName.String(),
 			}
 			if i < (n - 1) {
 				// Each rule is a probabilistic match.
@@ -640,7 +707,7 @@ func (proxier *Proxier) syncProxyRules() error {
 			// Rules in the per-endpoint chain.
 			args = []string{
 				"-A", string(endpointChain),
-				"-m", "comment", "--comment", name.String(),
+				"-m", "comment", "--comment", svcName.String(),
 			}
 			// Handle traffic that loops back to the originator with SNAT.
 			// Technically we only need to do this if the endpoint is on this
@@ -652,7 +719,7 @@ func (proxier *Proxier) syncProxyRules() error {
 				"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
 
 			// Update client-affinity lists.
-			if info.sessionAffinityType == api.ServiceAffinityClientIP {
+			if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
 				args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
 			}
 			// DNAT to final destination.
@@ -694,7 +761,28 @@ func (proxier *Proxier) syncProxyRules() error {
 	// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
 	lines := append(chainsLines.Bytes(), rulesLines.Bytes()...)
 	glog.V(3).Infof("Syncing rules: %s", lines)
-	return proxier.iptables.Restore(utiliptables.TableNAT, lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
+	err = proxier.iptables.Restore(utiliptables.TableNAT, lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
+	if err != nil {
+		glog.Errorf("Failed to sync iptables rules: %v", err)
+		// Revert new local ports. Sockets that were carried over from
+		// proxier.portsMap must stay open: portsMap is left untouched on this
+		// path, so closing them would leave it tracking dead sockets and the
+		// ports would silently stop being held.
+		for k, v := range newLocalPorts {
+			if proxier.portsMap[k] == nil {
+				glog.Errorf("Closing local port %s", k.String())
+				v.Close()
+			}
+		}
+	} else {
+		// Close old local ports and save new ones.
+		for k, v := range proxier.portsMap {
+			if newLocalPorts[k] == nil {
+				v.Close()
+			}
+		}
+		proxier.portsMap = newLocalPorts
+	}
 }
 
 // Join all words with spaces, terminate with newline and write to buf.
@@ -736,3 +824,77 @@ func getChainLines(table utiliptables.Table, save []byte) map[utiliptables.Chain
 	}
 	return chainsMap
 }
+
+// isLocalIP reports whether ip is assigned to one of this machine's network
+// interfaces. It returns an error if ip is not a valid IP address or if the
+// interface addresses cannot be listed.
+func isLocalIP(ip string) (bool, error) {
+	// Parse once up front instead of once per interface address.
+	target := net.ParseIP(ip)
+	if target == nil {
+		return false, fmt.Errorf("invalid IP address %q", ip)
+	}
+	addrs, err := net.InterfaceAddrs()
+	if err != nil {
+		return false, err
+	}
+	for _, addr := range addrs {
+		// Read the IP from the concrete address type rather than re-parsing
+		// addr.String() as a CIDR, so that one address in an unexpected
+		// format cannot abort the whole scan.
+		var local net.IP
+		switch a := addr.(type) {
+		case *net.IPNet:
+			local = a.IP
+		case *net.IPAddr:
+			local = a.IP
+		default:
+			continue
+		}
+		if target.Equal(local) {
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
+// openLocalPort opens and holds a listening socket for lp so that no other
+// process on the node can claim the port. The caller owns the returned
+// closeable and must Close it once the port is no longer needed.
+func openLocalPort(lp *localPort) (closeable, error) {
+	// For ports on node IPs, open the actual port and hold it, even though we
+	// use iptables to redirect traffic.
+	// This ensures a) that it's safe to use that port and b) that (a) stays
+	// true. The risk is that some process on the node (e.g. sshd or kubelet)
+	// is using a port and we give that same port out to a Service. That would
+	// be bad because iptables would silently claim the traffic but the process
+	// would never know.
+	// NOTE: We should not need to have a real listen()ing socket - bind()
+	// should be enough, but I can't figure out a way to e2e test without
+	// it. Tools like 'ss' and 'netstat' do not show sockets that are
+	// bind()ed but not listen()ed, and at least the default debian netcat
+	// has no way to avoid about 10 seconds of retries.
+	var socket closeable
+	switch lp.protocol {
+	case "tcp":
+		listener, err := net.Listen("tcp", net.JoinHostPort(lp.ip, strconv.Itoa(lp.port)))
+		if err != nil {
+			return nil, err
+		}
+		socket = listener
+	case "udp":
+		addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(lp.ip, strconv.Itoa(lp.port)))
+		if err != nil {
+			return nil, err
+		}
+		conn, err := net.ListenUDP("udp", addr)
+		if err != nil {
+			return nil, err
+		}
+		socket = conn
+	default:
+		return nil, fmt.Errorf("unknown protocol %q", lp.protocol)
+	}
+	glog.V(2).Infof("Opened local port %s", lp.String())
+	return socket, nil
+}