Actually hold NodePorts open in kube-proxy

pull/6/head
Tim Hockin 2015-06-17 15:26:14 -07:00
parent 40ba1856bc
commit f5a9281a26
2 changed files with 54 additions and 13 deletions

View File

@ -73,7 +73,7 @@ type Proxier struct {
serviceMap map[proxy.ServicePortName]*serviceInfo
syncPeriod time.Duration
portMapMutex sync.Mutex
portMap map[portMapKey]proxy.ServicePortName
portMap map[portMapKey]*portMapValue
numProxyLoops int32 // use atomic ops to access this; mostly for testing
listenIP net.IP
iptables iptables.Interface
@ -94,6 +94,14 @@ func (k *portMapKey) String() string {
return fmt.Sprintf("%s/%d", k.protocol, k.port)
}
// A value for the portMap
type portMapValue struct {
owner proxy.ServicePortName
socket interface {
Close() error
}
}
var (
// ErrProxyOnLocalhost is returned by NewProxier if the user requests a proxier on
// the loopback address. May be checked for by callers of NewProxier to know whether
@ -146,7 +154,7 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
return &Proxier{
loadBalancer: loadBalancer,
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
portMap: make(map[portMapKey]proxy.ServicePortName),
portMap: make(map[portMapKey]*portMapValue),
syncPeriod: syncPeriod,
listenIP: listenIP,
iptables: iptables,
@ -499,7 +507,7 @@ func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, prox
// Marks a port as being owned by a particular service, or returns error if already claimed.
// Idempotent: reclaiming with the same owner is not an error
func (proxier *Proxier) claimPort(port int, protocol api.Protocol, owner proxy.ServicePortName) error {
func (proxier *Proxier) claimNodePort(port int, protocol api.Protocol, owner proxy.ServicePortName) error {
proxier.portMapMutex.Lock()
defer proxier.portMapMutex.Unlock()
@ -508,19 +516,30 @@ func (proxier *Proxier) claimPort(port int, protocol api.Protocol, owner proxy.S
key := portMapKey{port: port, protocol: protocol}
existing, found := proxier.portMap[key]
if !found {
proxier.portMap[key] = owner
// Hold the actual port open, even though we use iptables to redirect
// it. This ensures that a) it's safe to take and b) that stays true.
// NOTE: We should not need to have a real listen()ing socket - bind()
// should be enough, but I can't figure out a way to e2e test without
// it. Tools like 'ss' and 'netstat' do not show sockets that are
// bind()ed but not listen()ed, and at least the default debian netcat
// has no way to avoid about 10 seconds of retries.
socket, err := newProxySocket(protocol, net.ParseIP("127.0.0.1"), port)
if err != nil {
return fmt.Errorf("can't open node port for %s: %v", key.String(), err)
}
proxier.portMap[key] = &portMapValue{owner: owner, socket: socket}
return nil
}
if existing == owner {
if existing.owner == owner {
// We are idempotent
return nil
}
return fmt.Errorf("Port conflict detected on port %v. %v vs %v", key, owner, existing)
return fmt.Errorf("Port conflict detected on port %s. %v vs %v", key.String(), owner, existing)
}
// Release a claim on a port. Returns an error if the owner does not match the claim.
// Tolerates release on an unclaimed port, to simplify .
func (proxier *Proxier) releasePort(port int, protocol api.Protocol, owner proxy.ServicePortName) error {
func (proxier *Proxier) releaseNodePort(port int, protocol api.Protocol, owner proxy.ServicePortName) error {
proxier.portMapMutex.Lock()
defer proxier.portMapMutex.Unlock()
@ -531,10 +550,11 @@ func (proxier *Proxier) releasePort(port int, protocol api.Protocol, owner proxy
glog.Infof("Ignoring release on unowned port: %v", key)
return nil
}
if existing != owner {
if existing.owner != owner {
return fmt.Errorf("Port conflict detected on port %v (unowned unlock). %v vs %v", key, owner, existing)
}
delete(proxier.portMap, key)
existing.socket.Close()
return nil
}
@ -542,7 +562,7 @@ func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyI
// TODO: Do we want to allow containers to access public services? Probably yes.
// TODO: We could refactor this to be the same code as portal, but with IP == nil
err := proxier.claimPort(nodePort, protocol, name)
err := proxier.claimNodePort(nodePort, protocol, name)
if err != nil {
return err
}
@ -645,7 +665,7 @@ func (proxier *Proxier) closeNodePort(nodePort int, protocol api.Protocol, proxy
el = append(el, err)
}
if err := proxier.releasePort(nodePort, protocol, name); err != nil {
if err := proxier.releaseNodePort(nodePort, protocol, name); err != nil {
el = append(el, err)
}

View File

@ -69,6 +69,7 @@ var _ = Describe("Services", func() {
}
}
})
// TODO: We get coverage of TCP/UDP and multi-port services through the DNS test. We should have a simpler test for multi-port TCP here.
It("should provide secure master service", func() {
_, err := c.Services(api.NamespaceDefault).Get("kubernetes")
@ -460,6 +461,16 @@ var _ = Describe("Services", func() {
By("hitting the pod through the service's NodePort")
ip := pickMinionIP(c)
testReachable(ip, nodePort)
hosts, err := NodeSSHHosts(c)
if err != nil {
Expect(err).NotTo(HaveOccurred())
}
cmd := fmt.Sprintf(`test -n "$(ss -ant46 'sport = :%d' | tail -n +2 | grep LISTEN)"`, nodePort)
_, _, code, err := SSH(cmd, hosts[0], testContext.Provider)
if code != 0 {
Failf("expected node port (%d) to be in use", nodePort)
}
})
It("should be able to change the type and nodeport settings of a service", func() {
@ -878,16 +889,26 @@ var _ = Describe("Services", func() {
if !ServiceNodePortRange.Contains(port.NodePort) {
Failf("got unexpected (out-of-range) port for new service: %v", service)
}
port1 := port.NodePort
nodePort := port.NodePort
By("deleting original service " + serviceName)
err = t.DeleteService(serviceName)
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("creating service "+serviceName+" with same NodePort %d", port1))
hosts, err := NodeSSHHosts(c)
if err != nil {
Expect(err).NotTo(HaveOccurred())
}
cmd := fmt.Sprintf(`test -n "$(ss -ant46 'sport = :%d' | tail -n +2 | grep LISTEN)"`, nodePort)
_, _, code, err := SSH(cmd, hosts[0], testContext.Provider)
if code == 0 {
Failf("expected node port (%d) to not be in use", nodePort)
}
By(fmt.Sprintf("creating service "+serviceName+" with same NodePort %d", nodePort))
service = t.BuildServiceSpec()
service.Spec.Type = api.ServiceTypeNodePort
service.Spec.Ports[0].NodePort = port1
service.Spec.Ports[0].NodePort = nodePort
service, err = t.CreateService(service)
Expect(err).NotTo(HaveOccurred())
})