Proxy infrastructure for NodePorts

A service with a NodePort set will listen on that port, on every node.

This is both handy for some load balancers (AWS ELB) and for people
that want to expose a service without using a load balancer.
pull/6/head
Justin Santa Barbara 2015-05-22 17:19:45 -04:00
parent 295d0564a2
commit 1ad4549f5f
4 changed files with 231 additions and 15 deletions

View File

@ -41,6 +41,7 @@ type serviceInfo struct {
socket proxySocket
timeout time.Duration
publicIPs []string // TODO: make this net.IP
nodePort int
sessionAffinityType api.ServiceAffinity
stickyMaxAgeMinutes int
}
@ -61,12 +62,24 @@ type Proxier struct {
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[ServicePortName]*serviceInfo
portMapMutex sync.Mutex
portMap map[portMapKey]ServicePortName
numProxyLoops int32 // use atomic ops to access this; mostly for testing
listenIP net.IP
iptables iptables.Interface
hostIP net.IP
}
// A key for the portMap
type portMapKey struct {
port int
protocol api.Protocol
}
func (k *portMapKey) String() string {
return fmt.Sprintf("%s/%d", k.protocol, k.port)
}
var (
// ErrProxyOnLocalhost is returned by NewProxier if the user requests a proxier on
// the loopback address. May be checked for by callers of NewProxier to know whether
@ -113,6 +126,7 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
return &Proxier{
loadBalancer: loadBalancer,
serviceMap: make(map[ServicePortName]*serviceInfo),
portMap: make(map[portMapKey]ServicePortName),
listenIP: listenIP,
iptables: iptables,
hostIP: hostIP,
@ -274,6 +288,8 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
info.portalIP = serviceIP
info.portalPort = servicePort.Port
info.publicIPs = service.Spec.PublicIPs
// TODO(justinsb): switch to servicePort.NodePort when that lands
info.nodePort = 0
info.sessionAffinityType = service.Spec.SessionAffinity
glog.V(4).Infof("info: %+v", info)
@ -302,7 +318,8 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
}
func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool {
if info.protocol != port.Protocol || info.portalPort != port.Port {
// TODO(justinsb): switch to port.NodePort when that lands
if info.protocol != port.Protocol || info.portalPort != port.Port || info.nodePort != 0 /*port.NodePort*/ {
return false
}
if !info.portalIP.Equal(net.ParseIP(service.Spec.PortalIP)) {
@ -340,13 +357,19 @@ func (proxier *Proxier) openPortal(service ServicePortName, info *serviceInfo) e
return err
}
}
if info.nodePort != 0 {
err = proxier.openNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service)
if err != nil {
return err
}
}
return nil
}
func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name ServicePortName) error {
// Handle traffic from containers.
args := proxier.iptablesContainerPortalArgs(portalIP, portalPort, protocol, proxyIP, proxyPort, name)
existed, err := proxier.iptables.EnsureRule(iptables.TableNAT, iptablesContainerPortalChain, args...)
existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerPortalChain, args...)
if err != nil {
glog.Errorf("Failed to install iptables %s rule for service %q", iptablesContainerPortalChain, name)
return err
@ -357,7 +380,7 @@ func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol
// Handle traffic from the host.
args = proxier.iptablesHostPortalArgs(portalIP, portalPort, protocol, proxyIP, proxyPort, name)
existed, err = proxier.iptables.EnsureRule(iptables.TableNAT, iptablesHostPortalChain, args...)
existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostPortalChain, args...)
if err != nil {
glog.Errorf("Failed to install iptables %s rule for service %q", iptablesHostPortalChain, name)
return err
@ -368,12 +391,89 @@ func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol
return nil
}
// Marks a port as being owned by a particular service, or returns error if already claimed.
// Idempotent: reclaiming with the same owner is not an error
func (proxier *Proxier) claimPort(port int, protocol api.Protocol, owner ServicePortName) error {
proxier.portMapMutex.Lock()
defer proxier.portMapMutex.Unlock()
// TODO: We could pre-populate some reserved ports into portMap and/or blacklist some well-known ports
key := portMapKey{port: port, protocol: protocol}
existing, found := proxier.portMap[key]
if !found {
proxier.portMap[key] = owner
return nil
}
if existing == owner {
// We are idempotent
return nil
}
return fmt.Errorf("Port conflict detected on port %v. %v vs %v", key, owner, existing)
}
// Release a claim on a port. Returns an error if the owner does not match the claim.
// Tolerates release on an unclaimed port, to simplify .
func (proxier *Proxier) releasePort(port int, protocol api.Protocol, owner ServicePortName) error {
proxier.portMapMutex.Lock()
defer proxier.portMapMutex.Unlock()
key := portMapKey{port: port, protocol: protocol}
existing, found := proxier.portMap[key]
if !found {
// We tolerate this, it happens if we are cleaning up a failed allocation
glog.Infof("Ignoring release on unowned port: %v", key)
return nil
}
if existing != owner {
return fmt.Errorf("Port conflict detected on port %v (unowned unlock). %v vs %v", key, owner, existing)
}
delete(proxier.portMap, key)
return nil
}
func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name ServicePortName) error {
// TODO: Do we want to allow containers to access public services? Probably yes.
// TODO: We could refactor this to be the same code as portal, but with IP == nil
err := proxier.claimPort(nodePort, protocol, name)
if err != nil {
return err
}
// Handle traffic from containers.
args := proxier.iptablesContainerNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name)
existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerNodePortChain, args...)
if err != nil {
glog.Errorf("Failed to install iptables %s rule for service %q", iptablesContainerNodePortChain, name)
return err
}
if !existed {
glog.Infof("Opened iptables from-containers public port for service %q on %s port %d", name, protocol, nodePort)
}
// Handle traffic from the host.
args = proxier.iptablesHostNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name)
existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostNodePortChain, args...)
if err != nil {
glog.Errorf("Failed to install iptables %s rule for service %q", iptablesHostNodePortChain, name)
return err
}
if !existed {
glog.Infof("Opened iptables from-host public port for service %q on %s port %d", name, protocol, nodePort)
}
return nil
}
func (proxier *Proxier) closePortal(service ServicePortName, info *serviceInfo) error {
// Collect errors and report them all at the end.
el := proxier.closeOnePortal(info.portalIP, info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service)
for _, publicIP := range info.publicIPs {
el = append(el, proxier.closeOnePortal(net.ParseIP(publicIP), info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service)...)
}
if info.nodePort != 0 {
el = append(el, proxier.closeNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service)...)
}
if len(el) == 0 {
glog.V(3).Infof("Closed iptables portals for service %q", service)
} else {
@ -402,28 +502,94 @@ func (proxier *Proxier) closeOnePortal(portalIP net.IP, portalPort int, protocol
return el
}
func (proxier *Proxier) closeNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name ServicePortName) []error {
el := []error{}
// Handle traffic from containers.
args := proxier.iptablesContainerNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name)
if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesContainerNodePortChain, args...); err != nil {
glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesContainerNodePortChain, name)
el = append(el, err)
}
// Handle traffic from the host.
args = proxier.iptablesHostNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name)
if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesHostNodePortChain, args...); err != nil {
glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesHostNodePortChain, name)
el = append(el, err)
}
if err := proxier.releasePort(nodePort, protocol, name); err != nil {
el = append(el, err)
}
return el
}
// See comments in the *PortalArgs() functions for some details about why we
// use two chains.
// use two chains for portals.
var iptablesContainerPortalChain iptables.Chain = "KUBE-PORTALS-CONTAINER"
var iptablesHostPortalChain iptables.Chain = "KUBE-PORTALS-HOST"
// Chains for NodePort services
var iptablesContainerNodePortChain iptables.Chain = "KUBE-NODEPORT-CONTAINER"
var iptablesHostNodePortChain iptables.Chain = "KUBE-NODEPORT-HOST"
// Ensure that the iptables infrastructure we use is set up. This can safely be called periodically.
func iptablesInit(ipt iptables.Interface) error {
// TODO: There is almost certainly room for optimization here. E.g. If
// we knew the portal_net CIDR we could fast-track outbound packets not
// destined for a service. There's probably more, help wanted.
// Danger - order of these rules matters here:
//
// We match portal rules first, then NodePort rules. For NodePort rules, we filter primarily on --dst-type LOCAL,
// because we want to listen on all local addresses, but don't match internet traffic with the same dst port number.
//
// There is one complication (per thockin):
// -m addrtype --dst-type LOCAL is what we want except that it is broken (by intent without foresight to our usecase)
// on at least GCE. Specifically, GCE machines have a daemon which learns what external IPs are forwarded to that
// machine, and configure a local route for that IP, making a match for --dst-type LOCAL when we don't want it to.
// Removing the route gives correct behavior until the daemon recreates it.
// Killing the daemon is an option, but means that any non-kubernetes use of the machine with external IP will be broken.
//
// This applies to IPs on GCE that are actually from a load-balancer; they will be categorized as LOCAL.
// _If_ the chains were in the wrong order, and the LB traffic had dst-port == a NodePort on some other service,
// the NodePort would take priority (incorrectly).
// This is unlikely (and would only affect outgoing traffic from the cluster to the load balancer, which seems
// doubly-unlikely), but we need to be careful to keep the rules in the right order.
args := []string{ /* portal_net matching could go here */ }
args = append(args, "-m", "comment", "--comment", "handle Portals; NOTE: this must be before the NodePort rules")
if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesContainerPortalChain); err != nil {
return err
}
if _, err := ipt.EnsureRule(iptables.TableNAT, iptables.ChainPrerouting, "-j", string(iptablesContainerPortalChain)); err != nil {
if _, err := ipt.EnsureRule(iptables.Prepend, iptables.TableNAT, iptables.ChainPrerouting, append(args, "-j", string(iptablesContainerPortalChain))...); err != nil {
return err
}
if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesHostPortalChain); err != nil {
return err
}
if _, err := ipt.EnsureRule(iptables.TableNAT, iptables.ChainOutput, "-j", string(iptablesHostPortalChain)); err != nil {
if _, err := ipt.EnsureRule(iptables.Prepend, iptables.TableNAT, iptables.ChainOutput, append(args, "-j", string(iptablesHostPortalChain))...); err != nil {
return err
}
// This set of rules matches broadly (addrtype & destination port), and therefore must come after the portal rules
args = []string{"-m", "addrtype", "--dst-type", "LOCAL"}
args = append(args, "-m", "comment", "--comment", "handle service NodePorts; NOTE: this must be the last rule in the chain")
if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesContainerNodePortChain); err != nil {
return err
}
if _, err := ipt.EnsureRule(iptables.Append, iptables.TableNAT, iptables.ChainPrerouting, append(args, "-j", string(iptablesContainerNodePortChain))...); err != nil {
return err
}
if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesHostNodePortChain); err != nil {
return err
}
if _, err := ipt.EnsureRule(iptables.Append, iptables.TableNAT, iptables.ChainOutput, append(args, "-j", string(iptablesHostNodePortChain))...); err != nil {
return err
}
// TODO: Verify order of rules.
return nil
}
@ -436,6 +602,12 @@ func iptablesFlush(ipt iptables.Interface) error {
if err := ipt.FlushChain(iptables.TableNAT, iptablesHostPortalChain); err != nil {
el = append(el, err)
}
if err := ipt.FlushChain(iptables.TableNAT, iptablesContainerNodePortChain); err != nil {
el = append(el, err)
}
if err := ipt.FlushChain(iptables.TableNAT, iptablesHostNodePortChain); err != nil {
el = append(el, err)
}
if len(el) != 0 {
glog.Errorf("Some errors flushing old iptables portals: %v", el)
}
@ -464,9 +636,13 @@ func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol
"--comment", service.String(),
"-p", strings.ToLower(string(protocol)),
"-m", strings.ToLower(string(protocol)),
"-d", fmt.Sprintf("%s/32", destIP.String()),
"--dport", fmt.Sprintf("%d", destPort),
}
if destIP != nil {
args = append(args, "-d", fmt.Sprintf("%s/32", destIP.String()))
}
return args
}
@ -550,6 +726,37 @@ func (proxier *Proxier) iptablesHostPortalArgs(destIP net.IP, destPort int, prot
return args
}
// Build a slice of iptables args for a from-container public-port rule.
// See iptablesContainerPortalArgs
// TODO: Should we just reuse iptablesContainerPortalArgs?
func (proxier *Proxier) iptablesContainerNodePortArgs(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service ServicePortName) []string {
args := iptablesCommonPortalArgs(nil, nodePort, protocol, service)
if proxyIP.Equal(zeroIPv4) || proxyIP.Equal(zeroIPv6) {
// TODO: Can we REDIRECT with IPv6?
args = append(args, "-j", "REDIRECT", "--to-ports", fmt.Sprintf("%d", proxyPort))
} else {
// TODO: Can we DNAT with IPv6?
args = append(args, "-j", "DNAT", "--to-destination", net.JoinHostPort(proxyIP.String(), strconv.Itoa(proxyPort)))
}
return args
}
// Build a slice of iptables args for a from-host public-port rule.
// See iptablesHostPortalArgs
// TODO: Should we just reuse iptablesHostPortalArgs?
func (proxier *Proxier) iptablesHostNodePortArgs(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service ServicePortName) []string {
args := iptablesCommonPortalArgs(nil, nodePort, protocol, service)
if proxyIP.Equal(zeroIPv4) || proxyIP.Equal(zeroIPv6) {
proxyIP = proxier.hostIP
}
// TODO: Can we DNAT with IPv6?
args = append(args, "-j", "DNAT", "--to-destination", net.JoinHostPort(proxyIP.String(), strconv.Itoa(proxyPort)))
return args
}
func isTooManyFDsError(err error) bool {
return strings.Contains(err.Error(), "too many open files")
}

View File

@ -92,7 +92,7 @@ func (fake *fakeIptables) FlushChain(table iptables.Table, chain iptables.Chain)
return nil
}
func (fake *fakeIptables) EnsureRule(table iptables.Table, chain iptables.Chain, args ...string) (bool, error) {
func (fake *fakeIptables) EnsureRule(position iptables.RulePosition, table iptables.Table, chain iptables.Chain, args ...string) (bool, error) {
return false, nil
}
@ -810,3 +810,5 @@ func TestProxyUpdatePortal(t *testing.T) {
}
// TODO: Test UDP timeouts.
// TODO(justinsb): Add test for nodePort conflict detection, once we have nodePort wired in

View File

@ -28,6 +28,13 @@ import (
"github.com/golang/glog"
)
type RulePosition string
const (
Prepend RulePosition = "-I"
Append RulePosition = "-A"
)
// An injectable interface for running iptables commands. Implementations must be goroutine-safe.
type Interface interface {
// EnsureChain checks if the specified chain exists and, if not, creates it. If the chain existed, return true.
@ -37,7 +44,7 @@ type Interface interface {
// DeleteChain deletes the specified chain. If the chain did not exist, return error.
DeleteChain(table Table, chain Chain) error
// EnsureRule checks if the specified rule is present and, if not, creates it. If the rule existed, return true.
EnsureRule(table Table, chain Chain, args ...string) (bool, error)
EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error)
// DeleteRule checks if the specified rule is present and, if so, deletes it.
DeleteRule(table Table, chain Chain, args ...string) error
// IsIpv6 returns true if this is managing ipv6 tables
@ -126,7 +133,7 @@ func (runner *runner) DeleteChain(table Table, chain Chain) error {
}
// EnsureRule is part of Interface.
func (runner *runner) EnsureRule(table Table, chain Chain, args ...string) (bool, error) {
func (runner *runner) EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error) {
fullArgs := makeFullArgs(table, chain, args...)
runner.mu.Lock()
@ -139,7 +146,7 @@ func (runner *runner) EnsureRule(table Table, chain Chain, args ...string) (bool
if exists {
return true, nil
}
out, err := runner.run(opAppendRule, fullArgs)
out, err := runner.run(operation(position), fullArgs)
if err != nil {
return false, fmt.Errorf("error appending rule: %v: %s", err, out)
}

View File

@ -176,7 +176,7 @@ func TestEnsureRuleAlreadyExists(t *testing.T) {
},
}
runner := New(&fexec, ProtocolIpv4)
exists, err := runner.EnsureRule(TableNAT, ChainOutput, "abc", "123")
exists, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123")
if err != nil {
t.Errorf("expected success, got %v", err)
}
@ -212,7 +212,7 @@ func TestEnsureRuleNew(t *testing.T) {
},
}
runner := New(&fexec, ProtocolIpv4)
exists, err := runner.EnsureRule(TableNAT, ChainOutput, "abc", "123")
exists, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123")
if err != nil {
t.Errorf("expected success, got %v", err)
}
@ -245,7 +245,7 @@ func TestEnsureRuleErrorChecking(t *testing.T) {
},
}
runner := New(&fexec, ProtocolIpv4)
_, err := runner.EnsureRule(TableNAT, ChainOutput, "abc", "123")
_, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123")
if err == nil {
t.Errorf("expected failure")
}
@ -275,7 +275,7 @@ func TestEnsureRuleErrorCreating(t *testing.T) {
},
}
runner := New(&fexec, ProtocolIpv4)
_, err := runner.EnsureRule(TableNAT, ChainOutput, "abc", "123")
_, err := runner.EnsureRule(Append, TableNAT, ChainOutput, "abc", "123")
if err == nil {
t.Errorf("expected failure")
}