From 1ad4549f5fa0210c9528d88366793b04641a078e Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Fri, 22 May 2015 17:19:45 -0400 Subject: [PATCH] Proxy infrastructure for NodePorts A service with a NodePort set will listen on that port, on every node. This is both handy for some load balancers (AWS ELB) and for people that want to expose a service without using a load balancer. --- pkg/proxy/proxier.go | 221 ++++++++++++++++++++++++++++- pkg/proxy/proxier_test.go | 4 +- pkg/util/iptables/iptables.go | 13 +- pkg/util/iptables/iptables_test.go | 8 +- 4 files changed, 231 insertions(+), 15 deletions(-) diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 18717f84ce..31e568b1ff 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -41,6 +41,7 @@ type serviceInfo struct { socket proxySocket timeout time.Duration publicIPs []string // TODO: make this net.IP + nodePort int sessionAffinityType api.ServiceAffinity stickyMaxAgeMinutes int } @@ -61,12 +62,24 @@ type Proxier struct { loadBalancer LoadBalancer mu sync.Mutex // protects serviceMap serviceMap map[ServicePortName]*serviceInfo + portMapMutex sync.Mutex + portMap map[portMapKey]ServicePortName numProxyLoops int32 // use atomic ops to access this; mostly for testing listenIP net.IP iptables iptables.Interface hostIP net.IP } +// A key for the portMap +type portMapKey struct { + port int + protocol api.Protocol +} + +func (k *portMapKey) String() string { + return fmt.Sprintf("%s/%d", k.protocol, k.port) +} + var ( // ErrProxyOnLocalhost is returned by NewProxier if the user requests a proxier on // the loopback address. 
May be checked for by callers of NewProxier to know whether @@ -113,6 +126,7 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables return &Proxier{ loadBalancer: loadBalancer, serviceMap: make(map[ServicePortName]*serviceInfo), + portMap: make(map[portMapKey]ServicePortName), listenIP: listenIP, iptables: iptables, hostIP: hostIP, @@ -274,6 +288,8 @@ func (proxier *Proxier) OnUpdate(services []api.Service) { info.portalIP = serviceIP info.portalPort = servicePort.Port info.publicIPs = service.Spec.PublicIPs + // TODO(justinsb): switch to servicePort.NodePort when that lands + info.nodePort = 0 info.sessionAffinityType = service.Spec.SessionAffinity glog.V(4).Infof("info: %+v", info) @@ -302,7 +318,8 @@ func (proxier *Proxier) OnUpdate(services []api.Service) { } func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool { - if info.protocol != port.Protocol || info.portalPort != port.Port { + // TODO(justinsb): switch to port.NodePort when that lands + if info.protocol != port.Protocol || info.portalPort != port.Port || info.nodePort != 0 /*port.NodePort*/ { return false } if !info.portalIP.Equal(net.ParseIP(service.Spec.PortalIP)) { @@ -340,13 +357,19 @@ func (proxier *Proxier) openPortal(service ServicePortName, info *serviceInfo) e return err } } + if info.nodePort != 0 { + err = proxier.openNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service) + if err != nil { + return err + } + } return nil } func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name ServicePortName) error { // Handle traffic from containers. args := proxier.iptablesContainerPortalArgs(portalIP, portalPort, protocol, proxyIP, proxyPort, name) - existed, err := proxier.iptables.EnsureRule(iptables.TableNAT, iptablesContainerPortalChain, args...) 
+ existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerPortalChain, args...) if err != nil { glog.Errorf("Failed to install iptables %s rule for service %q", iptablesContainerPortalChain, name) return err @@ -357,7 +380,7 @@ func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol // Handle traffic from the host. args = proxier.iptablesHostPortalArgs(portalIP, portalPort, protocol, proxyIP, proxyPort, name) - existed, err = proxier.iptables.EnsureRule(iptables.TableNAT, iptablesHostPortalChain, args...) + existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostPortalChain, args...) if err != nil { glog.Errorf("Failed to install iptables %s rule for service %q", iptablesHostPortalChain, name) return err @@ -368,12 +391,89 @@ func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol return nil } +// Marks a port as being owned by a particular service, or returns an error if already claimed. +// Idempotent: reclaiming with the same owner is not an error +func (proxier *Proxier) claimPort(port int, protocol api.Protocol, owner ServicePortName) error { + proxier.portMapMutex.Lock() + defer proxier.portMapMutex.Unlock() + + // TODO: We could pre-populate some reserved ports into portMap and/or blacklist some well-known ports + + key := portMapKey{port: port, protocol: protocol} + existing, found := proxier.portMap[key] + if !found { + proxier.portMap[key] = owner + return nil + } + if existing == owner { + // We are idempotent + return nil + } + return fmt.Errorf("Port conflict detected on port %v. %v vs %v", key, owner, existing) +} + +// Release a claim on a port. Returns an error if the owner does not match the claim. +// Tolerates release on an unclaimed port, to simplify cleanup of failed allocations.
+func (proxier *Proxier) releasePort(port int, protocol api.Protocol, owner ServicePortName) error { + proxier.portMapMutex.Lock() + defer proxier.portMapMutex.Unlock() + + key := portMapKey{port: port, protocol: protocol} + existing, found := proxier.portMap[key] + if !found { + // We tolerate this, it happens if we are cleaning up a failed allocation + glog.Infof("Ignoring release on unowned port: %v", key) + return nil + } + if existing != owner { + return fmt.Errorf("Port conflict detected on port %v (unowned unlock). %v vs %v", key, owner, existing) + } + delete(proxier.portMap, key) + return nil +} + +func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name ServicePortName) error { + // TODO: Do we want to allow containers to access public services? Probably yes. + // TODO: We could refactor this to be the same code as portal, but with IP == nil + + err := proxier.claimPort(nodePort, protocol, name) + if err != nil { + return err + } + + // Handle traffic from containers. + args := proxier.iptablesContainerNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name) + existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerNodePortChain, args...) + if err != nil { + glog.Errorf("Failed to install iptables %s rule for service %q", iptablesContainerNodePortChain, name) + return err + } + if !existed { + glog.Infof("Opened iptables from-containers public port for service %q on %s port %d", name, protocol, nodePort) + } + + // Handle traffic from the host. + args = proxier.iptablesHostNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name) + existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostNodePortChain, args...) 
+ if err != nil { + glog.Errorf("Failed to install iptables %s rule for service %q", iptablesHostNodePortChain, name) + return err + } + if !existed { + glog.Infof("Opened iptables from-host public port for service %q on %s port %d", name, protocol, nodePort) + } + return nil +} + func (proxier *Proxier) closePortal(service ServicePortName, info *serviceInfo) error { // Collect errors and report them all at the end. el := proxier.closeOnePortal(info.portalIP, info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service) for _, publicIP := range info.publicIPs { el = append(el, proxier.closeOnePortal(net.ParseIP(publicIP), info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service)...) } + if info.nodePort != 0 { + el = append(el, proxier.closeNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service)...) + } if len(el) == 0 { glog.V(3).Infof("Closed iptables portals for service %q", service) } else { @@ -402,28 +502,94 @@ func (proxier *Proxier) closeOnePortal(portalIP net.IP, portalPort int, protocol return el } +func (proxier *Proxier) closeNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name ServicePortName) []error { + el := []error{} + + // Handle traffic from containers. + args := proxier.iptablesContainerNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name) + if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesContainerNodePortChain, args...); err != nil { + glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesContainerNodePortChain, name) + el = append(el, err) + } + + // Handle traffic from the host. 
+ args = proxier.iptablesHostNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name) + if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesHostNodePortChain, args...); err != nil { + glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesHostNodePortChain, name) + el = append(el, err) + } + + if err := proxier.releasePort(nodePort, protocol, name); err != nil { + el = append(el, err) + } + + return el +} + // See comments in the *PortalArgs() functions for some details about why we -// use two chains. +// use two chains for portals. var iptablesContainerPortalChain iptables.Chain = "KUBE-PORTALS-CONTAINER" var iptablesHostPortalChain iptables.Chain = "KUBE-PORTALS-HOST" +// Chains for NodePort services +var iptablesContainerNodePortChain iptables.Chain = "KUBE-NODEPORT-CONTAINER" +var iptablesHostNodePortChain iptables.Chain = "KUBE-NODEPORT-HOST" + // Ensure that the iptables infrastructure we use is set up. This can safely be called periodically. func iptablesInit(ipt iptables.Interface) error { // TODO: There is almost certainly room for optimization here. E.g. If // we knew the portal_net CIDR we could fast-track outbound packets not // destined for a service. There's probably more, help wanted. + + // Danger - order of these rules matters here: + // + // We match portal rules first, then NodePort rules. For NodePort rules, we filter primarily on --dst-type LOCAL, + // because we want to listen on all local addresses, but don't match internet traffic with the same dst port number. + // + // There is one complication (per thockin): + // -m addrtype --dst-type LOCAL is what we want except that it is broken (by intent without foresight to our usecase) + // on at least GCE. Specifically, GCE machines have a daemon which learns what external IPs are forwarded to that + // machine, and configure a local route for that IP, making a match for --dst-type LOCAL when we don't want it to. 
+ // Removing the route gives correct behavior until the daemon recreates it. + // Killing the daemon is an option, but means that any non-kubernetes use of the machine with external IP will be broken. + // + // This applies to IPs on GCE that are actually from a load-balancer; they will be categorized as LOCAL. + // _If_ the chains were in the wrong order, and the LB traffic had dst-port == a NodePort on some other service, + // the NodePort would take priority (incorrectly). + // This is unlikely (and would only affect outgoing traffic from the cluster to the load balancer, which seems + // doubly-unlikely), but we need to be careful to keep the rules in the right order. + args := []string{ /* portal_net matching could go here */ } + args = append(args, "-m", "comment", "--comment", "handle Portals; NOTE: this must be before the NodePort rules") if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesContainerPortalChain); err != nil { return err } - if _, err := ipt.EnsureRule(iptables.TableNAT, iptables.ChainPrerouting, "-j", string(iptablesContainerPortalChain)); err != nil { + if _, err := ipt.EnsureRule(iptables.Prepend, iptables.TableNAT, iptables.ChainPrerouting, append(args, "-j", string(iptablesContainerPortalChain))...); err != nil { return err } if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesHostPortalChain); err != nil { return err } - if _, err := ipt.EnsureRule(iptables.TableNAT, iptables.ChainOutput, "-j", string(iptablesHostPortalChain)); err != nil { + if _, err := ipt.EnsureRule(iptables.Prepend, iptables.TableNAT, iptables.ChainOutput, append(args, "-j", string(iptablesHostPortalChain))...); err != nil { return err } + + // This set of rules matches broadly (addrtype & destination port), and therefore must come after the portal rules + args = []string{"-m", "addrtype", "--dst-type", "LOCAL"} + args = append(args, "-m", "comment", "--comment", "handle service NodePorts; NOTE: this must be the last rule in the chain") + if _, err := 
ipt.EnsureChain(iptables.TableNAT, iptablesContainerNodePortChain); err != nil { + return err + } + if _, err := ipt.EnsureRule(iptables.Append, iptables.TableNAT, iptables.ChainPrerouting, append(args, "-j", string(iptablesContainerNodePortChain))...); err != nil { + return err + } + if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesHostNodePortChain); err != nil { + return err + } + if _, err := ipt.EnsureRule(iptables.Append, iptables.TableNAT, iptables.ChainOutput, append(args, "-j", string(iptablesHostNodePortChain))...); err != nil { + return err + } + + // TODO: Verify order of rules. return nil } @@ -436,6 +602,12 @@ func iptablesFlush(ipt iptables.Interface) error { if err := ipt.FlushChain(iptables.TableNAT, iptablesHostPortalChain); err != nil { el = append(el, err) } + if err := ipt.FlushChain(iptables.TableNAT, iptablesContainerNodePortChain); err != nil { + el = append(el, err) + } + if err := ipt.FlushChain(iptables.TableNAT, iptablesHostNodePortChain); err != nil { + el = append(el, err) + } if len(el) != 0 { glog.Errorf("Some errors flushing old iptables portals: %v", el) } @@ -464,9 +636,13 @@ func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol "--comment", service.String(), "-p", strings.ToLower(string(protocol)), "-m", strings.ToLower(string(protocol)), - "-d", fmt.Sprintf("%s/32", destIP.String()), "--dport", fmt.Sprintf("%d", destPort), } + + if destIP != nil { + args = append(args, "-d", fmt.Sprintf("%s/32", destIP.String())) + } + return args } @@ -550,6 +726,37 @@ func (proxier *Proxier) iptablesHostPortalArgs(destIP net.IP, destPort int, prot return args } +// Build a slice of iptables args for a from-container public-port rule. +// See iptablesContainerPortalArgs +// TODO: Should we just reuse iptablesContainerPortalArgs? 
+func (proxier *Proxier) iptablesContainerNodePortArgs(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service ServicePortName) []string { + args := iptablesCommonPortalArgs(nil, nodePort, protocol, service) + + if proxyIP.Equal(zeroIPv4) || proxyIP.Equal(zeroIPv6) { + // TODO: Can we REDIRECT with IPv6? + args = append(args, "-j", "REDIRECT", "--to-ports", fmt.Sprintf("%d", proxyPort)) + } else { + // TODO: Can we DNAT with IPv6? + args = append(args, "-j", "DNAT", "--to-destination", net.JoinHostPort(proxyIP.String(), strconv.Itoa(proxyPort))) + } + + return args +} + +// Build a slice of iptables args for a from-host public-port rule. +// See iptablesHostPortalArgs +// TODO: Should we just reuse iptablesHostPortalArgs? +func (proxier *Proxier) iptablesHostNodePortArgs(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service ServicePortName) []string { + args := iptablesCommonPortalArgs(nil, nodePort, protocol, service) + + if proxyIP.Equal(zeroIPv4) || proxyIP.Equal(zeroIPv6) { + proxyIP = proxier.hostIP + } + // TODO: Can we DNAT with IPv6? + args = append(args, "-j", "DNAT", "--to-destination", net.JoinHostPort(proxyIP.String(), strconv.Itoa(proxyPort))) + return args +} + func isTooManyFDsError(err error) bool { return strings.Contains(err.Error(), "too many open files") } diff --git a/pkg/proxy/proxier_test.go b/pkg/proxy/proxier_test.go index 7e9c52039f..350da4b3f0 100644 --- a/pkg/proxy/proxier_test.go +++ b/pkg/proxy/proxier_test.go @@ -92,7 +92,7 @@ func (fake *fakeIptables) FlushChain(table iptables.Table, chain iptables.Chain) return nil } -func (fake *fakeIptables) EnsureRule(table iptables.Table, chain iptables.Chain, args ...string) (bool, error) { +func (fake *fakeIptables) EnsureRule(position iptables.RulePosition, table iptables.Table, chain iptables.Chain, args ...string) (bool, error) { return false, nil } @@ -810,3 +810,5 @@ func TestProxyUpdatePortal(t *testing.T) { } // TODO: Test UDP timeouts. 
+ +// TODO(justinsb): Add test for nodePort conflict detection, once we have nodePort wired in diff --git a/pkg/util/iptables/iptables.go b/pkg/util/iptables/iptables.go index 467cd1cefc..b3da523c0f 100644 --- a/pkg/util/iptables/iptables.go +++ b/pkg/util/iptables/iptables.go @@ -28,6 +28,13 @@ import ( "github.com/golang/glog" ) +type RulePosition string + +const ( + Prepend RulePosition = "-I" + Append RulePosition = "-A" +) + // An injectable interface for running iptables commands. Implementations must be goroutine-safe. type Interface interface { // EnsureChain checks if the specified chain exists and, if not, creates it. If the chain existed, return true. @@ -37,7 +44,7 @@ type Interface interface { // DeleteChain deletes the specified chain. If the chain did not exist, return error. DeleteChain(table Table, chain Chain) error // EnsureRule checks if the specified rule is present and, if not, creates it. If the rule existed, return true. - EnsureRule(table Table, chain Chain, args ...string) (bool, error) + EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error) // DeleteRule checks if the specified rule is present and, if so, deletes it. DeleteRule(table Table, chain Chain, args ...string) error // IsIpv6 returns true if this is managing ipv6 tables @@ -126,7 +133,7 @@ func (runner *runner) DeleteChain(table Table, chain Chain) error { } // EnsureRule is part of Interface. -func (runner *runner) EnsureRule(table Table, chain Chain, args ...string) (bool, error) { +func (runner *runner) EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error) { fullArgs := makeFullArgs(table, chain, args...) 
runner.mu.Lock() @@ -139,7 +146,7 @@ func (runner *runner) EnsureRule(table Table, chain Chain, args ...string) (bool if exists { return true, nil } - out, err := runner.run(opAppendRule, fullArgs) + out, err := runner.run(operation(position), fullArgs) if err != nil { return false, fmt.Errorf("error appending rule: %v: %s", err, out) } diff --git a/pkg/util/iptables/iptables_test.go b/pkg/util/iptables/iptables_test.go index 92efd78db5..63478a000a 100644 --- a/pkg/util/iptables/iptables_test.go +++ b/pkg/util/iptables/iptables_test.go @@ -176,7 +176,7 @@ func TestEnsureRuleAlreadyExists(t *testing.T) { }, } runner := New(&fexec, ProtocolIpv4) - exists, err := runner.EnsureRule(TableNAT, ChainOutput, "abc", "123") + exists, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123") if err != nil { t.Errorf("expected success, got %v", err) } @@ -212,7 +212,7 @@ func TestEnsureRuleNew(t *testing.T) { }, } runner := New(&fexec, ProtocolIpv4) - exists, err := runner.EnsureRule(TableNAT, ChainOutput, "abc", "123") + exists, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123") if err != nil { t.Errorf("expected success, got %v", err) } @@ -245,7 +245,7 @@ func TestEnsureRuleErrorChecking(t *testing.T) { }, } runner := New(&fexec, ProtocolIpv4) - _, err := runner.EnsureRule(TableNAT, ChainOutput, "abc", "123") + _, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123") if err == nil { t.Errorf("expected failure") } @@ -275,7 +275,7 @@ func TestEnsureRuleErrorCreating(t *testing.T) { }, } runner := New(&fexec, ProtocolIpv4) - _, err := runner.EnsureRule(TableNAT, ChainOutput, "abc", "123") + _, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123") if err == nil { t.Errorf("expected failure") }