Fix services namespace clash.

Serve identically names services in different namespaces on different external IP addresses.
pull/6/head
Quinton Hoole 2015-02-18 11:30:18 -08:00 committed by Quinton Hoole
parent fa23519387
commit cc72eaec3a
10 changed files with 429 additions and 278 deletions

View File

@ -20,6 +20,7 @@ import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/config"
"github.com/golang/glog"
)
@ -81,7 +82,7 @@ type EndpointsConfig struct {
// It immediately runs the created EndpointsConfig.
func NewEndpointsConfig() *EndpointsConfig {
updates := make(chan struct{})
store := &endpointsStore{updates: updates, endpoints: make(map[string]map[string]api.Endpoints)}
store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]api.Endpoints)}
mux := config.NewMux(store)
bcaster := config.NewBroadcaster()
go watchForUpdates(bcaster, store, updates)
@ -112,7 +113,7 @@ func (c *EndpointsConfig) Config() []api.Endpoints {
type endpointsStore struct {
endpointLock sync.RWMutex
endpoints map[string]map[string]api.Endpoints
endpoints map[string]map[types.NamespacedName]api.Endpoints
updates chan<- struct{}
}
@ -120,26 +121,29 @@ func (s *endpointsStore) Merge(source string, change interface{}) error {
s.endpointLock.Lock()
endpoints := s.endpoints[source]
if endpoints == nil {
endpoints = make(map[string]api.Endpoints)
endpoints = make(map[types.NamespacedName]api.Endpoints)
}
update := change.(EndpointsUpdate)
switch update.Op {
case ADD:
glog.V(4).Infof("Adding new endpoint from source %s : %+v", source, update.Endpoints)
for _, value := range update.Endpoints {
endpoints[value.Name] = value
name := types.NamespacedName{value.Namespace, value.Name}
endpoints[name] = value
}
case REMOVE:
glog.V(4).Infof("Removing an endpoint %+v", update)
for _, value := range update.Endpoints {
delete(endpoints, value.Name)
name := types.NamespacedName{value.Namespace, value.Name}
delete(endpoints, name)
}
case SET:
glog.V(4).Infof("Setting endpoints %+v", update)
// Clear the old map entries by just creating a new map
endpoints = make(map[string]api.Endpoints)
endpoints = make(map[types.NamespacedName]api.Endpoints)
for _, value := range update.Endpoints {
endpoints[value.Name] = value
name := types.NamespacedName{value.Namespace, value.Name}
endpoints[name] = value
}
default:
glog.V(4).Infof("Received invalid update type: %v", update)
@ -176,7 +180,7 @@ type ServiceConfig struct {
// It immediately runs the created ServiceConfig.
func NewServiceConfig() *ServiceConfig {
updates := make(chan struct{})
store := &serviceStore{updates: updates, services: make(map[string]map[string]api.Service)}
store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]api.Service)}
mux := config.NewMux(store)
bcaster := config.NewBroadcaster()
go watchForUpdates(bcaster, store, updates)
@ -207,7 +211,7 @@ func (c *ServiceConfig) Config() []api.Service {
type serviceStore struct {
serviceLock sync.RWMutex
services map[string]map[string]api.Service
services map[string]map[types.NamespacedName]api.Service
updates chan<- struct{}
}
@ -215,26 +219,29 @@ func (s *serviceStore) Merge(source string, change interface{}) error {
s.serviceLock.Lock()
services := s.services[source]
if services == nil {
services = make(map[string]api.Service)
services = make(map[types.NamespacedName]api.Service)
}
update := change.(ServiceUpdate)
switch update.Op {
case ADD:
glog.V(4).Infof("Adding new service from source %s : %+v", source, update.Services)
for _, value := range update.Services {
services[value.Name] = value
name := types.NamespacedName{value.Namespace, value.Name}
services[name] = value
}
case REMOVE:
glog.V(4).Infof("Removing a service %+v", update)
for _, value := range update.Services {
delete(services, value.Name)
name := types.NamespacedName{value.Namespace, value.Name}
delete(services, name)
}
case SET:
glog.V(4).Infof("Setting services %+v", update)
// Clear the old map entries by just creating a new map
services = make(map[string]api.Service)
services = make(map[types.NamespacedName]api.Service)
for _, value := range update.Services {
services[value.Name] = value
name := types.NamespacedName{value.Namespace, value.Name}
services[name] = value
}
default:
glog.V(4).Infof("Received invalid update type: %v", update)

View File

@ -136,7 +136,7 @@ func TestNewServiceAddedAndNotified(t *testing.T) {
handler := NewServiceHandlerMock()
handler.Wait(1)
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
channel <- serviceUpdate
handler.ValidateServices(t, serviceUpdate.Services)
@ -147,24 +147,24 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
channel := config.Channel("one")
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
handler.Wait(1)
channel <- serviceUpdate
handler.ValidateServices(t, serviceUpdate.Services)
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
handler.Wait(1)
channel <- serviceUpdate2
services := []api.Service{serviceUpdate2.Services[0], serviceUpdate.Services[0]}
handler.ValidateServices(t, services)
serviceUpdate3 := CreateServiceUpdate(REMOVE, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}})
serviceUpdate3 := CreateServiceUpdate(REMOVE, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}})
handler.Wait(1)
channel <- serviceUpdate3
services = []api.Service{serviceUpdate2.Services[0]}
handler.ValidateServices(t, services)
serviceUpdate4 := CreateServiceUpdate(SET, api.Service{ObjectMeta: api.ObjectMeta{Name: "foobar"}, Spec: api.ServiceSpec{Port: 99}})
serviceUpdate4 := CreateServiceUpdate(SET, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foobar"}, Spec: api.ServiceSpec{Port: 99}})
handler.Wait(1)
channel <- serviceUpdate4
services = []api.Service{serviceUpdate4.Services[0]}
@ -180,8 +180,8 @@ func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) {
}
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
handler.Wait(2)
channelOne <- serviceUpdate1
channelTwo <- serviceUpdate2
@ -197,8 +197,8 @@ func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T
handler2 := NewServiceHandlerMock()
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
handler.Wait(2)
handler2.Wait(2)
channelOne <- serviceUpdate1
@ -217,11 +217,11 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Endpoints: []api.Endpoint{{IP: "endpoint1"}, {IP: "endpoint2"}},
})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Endpoints: []api.Endpoint{{IP: "endpoint3"}, {IP: "endpoint4"}},
})
handler.Wait(2)
@ -243,11 +243,11 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Endpoints: []api.Endpoint{{IP: "endpoint1"}, {IP: "endpoint2"}},
})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Endpoints: []api.Endpoint{{IP: "endpoint3"}, {IP: "endpoint4"}},
})
handler.Wait(2)
@ -261,7 +261,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
// Add one more
endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foobar"},
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foobar"},
Endpoints: []api.Endpoint{{IP: "endpoint5"}, {IP: "endpoint6"}},
})
handler.Wait(1)
@ -273,7 +273,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
// Update the "foo" service with new endpoints
endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Endpoints: []api.Endpoint{{IP: "endpoint7"}},
})
handler.Wait(1)
@ -284,7 +284,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
handler2.ValidateEndpoints(t, endpoints)
// Remove "bar" service
endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}})
endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, api.Endpoints{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}})
handler.Wait(1)
handler2.Wait(1)
channelTwo <- endpointsUpdate2

View File

@ -20,13 +20,14 @@ import (
"net"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
)
// LoadBalancer is an interface for distributing incoming requests to service endpoints.
type LoadBalancer interface {
// NextEndpoint returns the endpoint to handle a request for the given
// service and source address.
NextEndpoint(service string, srcAddr net.Addr) (string, error)
NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error
CleanupStaleStickySessions(service string)
NextEndpoint(service types.NamespacedName, srcAddr net.Addr) (string, error)
NewService(service types.NamespacedName, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error
CleanupStaleStickySessions(service types.NamespacedName)
}

View File

@ -27,6 +27,7 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables"
@ -58,7 +59,7 @@ type proxySocket interface {
// while sessions are active.
Close() error
// ProxyLoop proxies incoming connections for the specified service to the service endpoints.
ProxyLoop(service string, info *serviceInfo, proxier *Proxier)
ProxyLoop(service types.NamespacedName, info *serviceInfo, proxier *Proxier)
}
// tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called,
@ -67,7 +68,7 @@ type tcpProxySocket struct {
net.Listener
}
func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
func tryConnect(service types.NamespacedName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
for _, retryTimeout := range endpointDialTimeout {
endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr)
if err != nil {
@ -87,7 +88,7 @@ func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Prox
return nil, fmt.Errorf("failed to connect to an endpoint.")
}
func (tcp *tcpProxySocket) ProxyLoop(service string, myInfo *serviceInfo, proxier *Proxier) {
func (tcp *tcpProxySocket) ProxyLoop(service types.NamespacedName, myInfo *serviceInfo, proxier *Proxier) {
for {
if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo {
// The service port was closed or replaced.
@ -162,7 +163,7 @@ func newClientCache() *clientCache {
return &clientCache{clients: map[string]net.Conn{}}
}
func (udp *udpProxySocket) ProxyLoop(service string, myInfo *serviceInfo, proxier *Proxier) {
func (udp *udpProxySocket) ProxyLoop(service types.NamespacedName, myInfo *serviceInfo, proxier *Proxier) {
activeClients := newClientCache()
var buffer [4096]byte // 4KiB should be enough for most whole-packets
for {
@ -207,7 +208,7 @@ func (udp *udpProxySocket) ProxyLoop(service string, myInfo *serviceInfo, proxie
}
}
func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service string, timeout time.Duration) (net.Conn, error) {
func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service types.NamespacedName, timeout time.Duration) (net.Conn, error) {
activeClients.mu.Lock()
defer activeClients.mu.Unlock()
@ -303,7 +304,7 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er
type Proxier struct {
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[string]*serviceInfo
serviceMap map[types.NamespacedName]*serviceInfo
numProxyLoops int32 // use atomic ops to access this; mostly for testing
listenIP net.IP
iptables iptables.Interface
@ -345,7 +346,7 @@ func CreateProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
}
return &Proxier{
loadBalancer: loadBalancer,
serviceMap: make(map[string]*serviceInfo),
serviceMap: make(map[types.NamespacedName]*serviceInfo),
listenIP: listenIP,
iptables: iptables,
hostIP: hostIP,
@ -393,26 +394,26 @@ func (proxier *Proxier) cleanupStaleStickySessions() {
}
// This assumes proxier.mu is not locked.
func (proxier *Proxier) stopProxy(service string, info *serviceInfo) error {
func (proxier *Proxier) stopProxy(service types.NamespacedName, info *serviceInfo) error {
proxier.mu.Lock()
defer proxier.mu.Unlock()
return proxier.stopProxyInternal(service, info)
}
// This assumes proxier.mu is locked.
func (proxier *Proxier) stopProxyInternal(service string, info *serviceInfo) error {
func (proxier *Proxier) stopProxyInternal(service types.NamespacedName, info *serviceInfo) error {
delete(proxier.serviceMap, service)
return info.socket.Close()
}
func (proxier *Proxier) getServiceInfo(service string) (*serviceInfo, bool) {
func (proxier *Proxier) getServiceInfo(service types.NamespacedName) (*serviceInfo, bool) {
proxier.mu.Lock()
defer proxier.mu.Unlock()
info, ok := proxier.serviceMap[service]
return info, ok
}
func (proxier *Proxier) setServiceInfo(service string, info *serviceInfo) {
func (proxier *Proxier) setServiceInfo(service types.NamespacedName, info *serviceInfo) {
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.serviceMap[service] = info
@ -421,7 +422,7 @@ func (proxier *Proxier) setServiceInfo(service string, info *serviceInfo) {
// addServiceOnPort starts listening for a new service, returning the serviceInfo.
// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
// connections, for now.
func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) {
func (proxier *Proxier) addServiceOnPort(service types.NamespacedName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) {
sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort)
if err != nil {
return nil, err
@ -447,7 +448,7 @@ func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol,
proxier.setServiceInfo(service, si)
glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
go func(service string, proxier *Proxier) {
go func(service types.NamespacedName, proxier *Proxier) {
defer util.HandleCrash()
atomic.AddInt32(&proxier.numProxyLoops, 1)
sock.ProxyLoop(service, si, proxier)
@ -465,30 +466,31 @@ const udpIdleTimeout = 1 * time.Minute
// shutdown if missing from the update set.
func (proxier *Proxier) OnUpdate(services []api.Service) {
glog.V(4).Infof("Received update notice: %+v", services)
activeServices := util.StringSet{}
activeServices := make(map[types.NamespacedName]bool) // use a map as a set
for _, service := range services {
activeServices.Insert(service.Name)
info, exists := proxier.getServiceInfo(service.Name)
serviceName := types.NamespacedName{service.Namespace, service.Name}
activeServices[serviceName] = true
info, exists := proxier.getServiceInfo(serviceName)
serviceIP := net.ParseIP(service.Spec.PortalIP)
// TODO: check health of the socket? What if ProxyLoop exited?
if exists && info.portalPort == service.Spec.Port && info.portalIP.Equal(serviceIP) {
continue
}
if exists && (info.portalPort != service.Spec.Port || !info.portalIP.Equal(serviceIP) || !ipsEqual(service.Spec.PublicIPs, info.publicIP)) {
glog.V(4).Infof("Something changed for service %q: stopping it", service.Name)
err := proxier.closePortal(service.Name, info)
glog.V(4).Infof("Something changed for service %q: stopping it", serviceName.String())
err := proxier.closePortal(serviceName, info)
if err != nil {
glog.Errorf("Failed to close portal for %q: %v", service.Name, err)
glog.Errorf("Failed to close portal for %q: %v", serviceName, err)
}
err = proxier.stopProxy(service.Name, info)
err = proxier.stopProxy(serviceName, info)
if err != nil {
glog.Errorf("Failed to stop service %q: %v", service.Name, err)
glog.Errorf("Failed to stop service %q: %v", serviceName, err)
}
}
glog.V(1).Infof("Adding new service %q at %s:%d/%s", service.Name, serviceIP, service.Spec.Port, service.Spec.Protocol)
info, err := proxier.addServiceOnPort(service.Name, service.Spec.Protocol, 0, udpIdleTimeout)
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, service.Spec.Port, service.Spec.Protocol)
info, err := proxier.addServiceOnPort(serviceName, service.Spec.Protocol, 0, udpIdleTimeout)
if err != nil {
glog.Errorf("Failed to start proxy for %q: %v", service.Name, err)
glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
continue
}
info.portalIP = serviceIP
@ -499,16 +501,16 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
info.stickyMaxAgeMinutes = 180
glog.V(4).Infof("info: %+v", info)
err = proxier.openPortal(service.Name, info)
err = proxier.openPortal(serviceName, info)
if err != nil {
glog.Errorf("Failed to open portal for %q: %v", service.Name, err)
glog.Errorf("Failed to open portal for %q: %v", serviceName, err)
}
proxier.loadBalancer.NewService(service.Name, info.sessionAffinityType, info.stickyMaxAgeMinutes)
proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes)
}
proxier.mu.Lock()
defer proxier.mu.Unlock()
for name, info := range proxier.serviceMap {
if !activeServices.Has(name) {
if !activeServices[name] {
glog.V(1).Infof("Stopping service %q", name)
err := proxier.closePortal(name, info)
if err != nil {
@ -534,7 +536,7 @@ func ipsEqual(lhs, rhs []string) bool {
return true
}
func (proxier *Proxier) openPortal(service string, info *serviceInfo) error {
func (proxier *Proxier) openPortal(service types.NamespacedName, info *serviceInfo) error {
err := proxier.openOnePortal(info.portalIP, info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service)
if err != nil {
return err
@ -548,7 +550,7 @@ func (proxier *Proxier) openPortal(service string, info *serviceInfo) error {
return nil
}
func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name string) error {
func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name types.NamespacedName) error {
// Handle traffic from containers.
args := proxier.iptablesContainerPortalArgs(portalIP, portalPort, protocol, proxyIP, proxyPort, name)
existed, err := proxier.iptables.EnsureRule(iptables.TableNAT, iptablesContainerPortalChain, args...)
@ -573,7 +575,7 @@ func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol
return nil
}
func (proxier *Proxier) closePortal(service string, info *serviceInfo) error {
func (proxier *Proxier) closePortal(service types.NamespacedName, info *serviceInfo) error {
// Collect errors and report them all at the end.
el := proxier.closeOnePortal(info.portalIP, info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service)
for _, publicIP := range info.publicIP {
@ -587,7 +589,7 @@ func (proxier *Proxier) closePortal(service string, info *serviceInfo) error {
return errors.NewAggregate(el)
}
func (proxier *Proxier) closeOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name string) []error {
func (proxier *Proxier) closeOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name types.NamespacedName) []error {
el := []error{}
// Handle traffic from containers.
@ -666,7 +668,7 @@ var zeroIPv6 = net.ParseIP("::0")
var localhostIPv6 = net.ParseIP("::1")
// Build a slice of iptables args that are common to from-container and from-host portal rules.
func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, service string) []string {
func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, service types.NamespacedName) []string {
// This list needs to include all fields as they are eventually spit out
// by iptables-save. This is because some systems do not support the
// 'iptables -C' arg, and so fall back on parsing iptables-save output.
@ -677,7 +679,7 @@ func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol
// iptables versions.
args := []string{
"-m", "comment",
"--comment", service,
"--comment", service.String(),
"-p", strings.ToLower(string(protocol)),
"-m", strings.ToLower(string(protocol)),
"-d", fmt.Sprintf("%s/32", destIP.String()),
@ -687,7 +689,7 @@ func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol
}
// Build a slice of iptables args for a from-container portal rule.
func (proxier *Proxier) iptablesContainerPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service string) []string {
func (proxier *Proxier) iptablesContainerPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service types.NamespacedName) []string {
args := iptablesCommonPortalArgs(destIP, destPort, protocol, service)
// This is tricky.
@ -734,7 +736,7 @@ func (proxier *Proxier) iptablesContainerPortalArgs(destIP net.IP, destPort int,
}
// Build a slice of iptables args for a from-host portal rule.
func (proxier *Proxier) iptablesHostPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service string) []string {
func (proxier *Proxier) iptablesHostPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service types.NamespacedName) []string {
args := iptablesCommonPortalArgs(destIP, destPort, protocol, service)
// This is tricky.

View File

@ -29,6 +29,7 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables"
)
@ -194,9 +195,10 @@ func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) {
func TestTCPProxy(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@ -204,7 +206,7 @@ func TestTCPProxy(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -214,9 +216,10 @@ func TestTCPProxy(t *testing.T) {
func TestUDPProxy(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})
@ -224,7 +227,7 @@ func TestUDPProxy(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -233,7 +236,7 @@ func TestUDPProxy(t *testing.T) {
}
// Helper: Stops the proxy for the named service.
func stopProxyByName(proxier *Proxier, service string) error {
func stopProxyByName(proxier *Proxier, service types.NamespacedName) error {
info, found := proxier.getServiceInfo(service)
if !found {
return fmt.Errorf("unknown service: %s", service)
@ -243,9 +246,10 @@ func stopProxyByName(proxier *Proxier, service string) error {
func TestTCPProxyStop(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@ -253,7 +257,7 @@ func TestTCPProxyStop(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -264,7 +268,7 @@ func TestTCPProxyStop(t *testing.T) {
conn.Close()
waitForNumProxyLoops(t, p, 1)
stopProxyByName(p, "echo")
stopProxyByName(p, service)
// Wait for the port to really close.
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
@ -274,9 +278,10 @@ func TestTCPProxyStop(t *testing.T) {
func TestUDPProxyStop(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})
@ -284,7 +289,7 @@ func TestUDPProxyStop(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -295,7 +300,7 @@ func TestUDPProxyStop(t *testing.T) {
conn.Close()
waitForNumProxyLoops(t, p, 1)
stopProxyByName(p, "echo")
stopProxyByName(p, service)
// Wait for the port to really close.
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
@ -305,9 +310,10 @@ func TestUDPProxyStop(t *testing.T) {
func TestTCPProxyUpdateDelete(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@ -315,7 +321,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -335,9 +341,10 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
func TestUDPProxyUpdateDelete(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})
@ -345,7 +352,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -365,9 +372,10 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@ -375,7 +383,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -392,11 +400,11 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
}
waitForNumProxyLoops(t, p, 0)
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "TCP"}, Status: api.ServiceStatus{}},
{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "TCP"}, Status: api.ServiceStatus{}},
})
svcInfo, exists := p.getServiceInfo("echo")
svcInfo, exists := p.getServiceInfo(service)
if !exists {
t.Fatalf("can't find serviceInfo")
t.Fatalf("can't find serviceInfo for %s", service)
}
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)
@ -404,9 +412,10 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})
@ -414,7 +423,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -431,9 +440,9 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
}
waitForNumProxyLoops(t, p, 0)
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "UDP"}, Status: api.ServiceStatus{}},
{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "UDP"}, Status: api.ServiceStatus{}},
})
svcInfo, exists := p.getServiceInfo("echo")
svcInfo, exists := p.getServiceInfo(service)
if !exists {
t.Fatalf("can't find serviceInfo")
}
@ -443,9 +452,10 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
func TestTCPProxyUpdatePort(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@ -453,7 +463,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -461,13 +471,13 @@ func TestTCPProxyUpdatePort(t *testing.T) {
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: 99, Protocol: "TCP"}, Status: api.ServiceStatus{}},
{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: 99, Protocol: "TCP"}, Status: api.ServiceStatus{}},
})
// Wait for the socket to actually get free.
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
svcInfo, exists := p.getServiceInfo("echo")
svcInfo, exists := p.getServiceInfo(service)
if !exists {
t.Fatalf("can't find serviceInfo")
}
@ -479,9 +489,10 @@ func TestTCPProxyUpdatePort(t *testing.T) {
func TestUDPProxyUpdatePort(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})
@ -489,20 +500,20 @@ func TestUDPProxyUpdatePort(t *testing.T) {
p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
waitForNumProxyLoops(t, p, 1)
p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: 99, Protocol: "UDP"}, Status: api.ServiceStatus{}},
{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: 99, Protocol: "UDP"}, Status: api.ServiceStatus{}},
})
// Wait for the socket to actually get free.
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
svcInfo, exists := p.getServiceInfo("echo")
svcInfo, exists := p.getServiceInfo(service)
if !exists {
t.Fatalf("can't find serviceInfo")
}

View File

@ -26,6 +26,7 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/slice"
"github.com/golang/glog"
)
@ -49,13 +50,10 @@ type affinityPolicy struct {
ttlMinutes int
}
// balancerKey is a string that the balancer uses to key stored state.
type balancerKey string
// LoadBalancerRR is a round-robin load balancer.
type LoadBalancerRR struct {
lock sync.RWMutex
services map[balancerKey]*balancerState
services map[types.NamespacedName]*balancerState
}
type balancerState struct {
@ -75,11 +73,11 @@ func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityP
// NewLoadBalancerRR returns a new LoadBalancerRR.
func NewLoadBalancerRR() *LoadBalancerRR {
return &LoadBalancerRR{
services: map[balancerKey]*balancerState{},
services: map[types.NamespacedName]*balancerState{},
}
}
func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityType, ttlMinutes int) error {
func (lb *LoadBalancerRR) NewService(service types.NamespacedName, affinityType api.AffinityType, ttlMinutes int) error {
lb.lock.Lock()
defer lb.lock.Unlock()
@ -88,17 +86,16 @@ func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityTy
}
// This assumes that lb.lock is already held.
func (lb *LoadBalancerRR) newServiceInternal(service string, affinityType api.AffinityType, ttlMinutes int) *balancerState {
func (lb *LoadBalancerRR) newServiceInternal(service types.NamespacedName, affinityType api.AffinityType, ttlMinutes int) *balancerState {
if ttlMinutes == 0 {
ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead????
}
key := balancerKey(service)
if _, exists := lb.services[key]; !exists {
lb.services[key] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)}
if _, exists := lb.services[service]; !exists {
lb.services[service] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)}
glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", service)
}
return lb.services[key]
return lb.services[service]
}
// return true if this service is using some form of session affinity.
@ -112,13 +109,13 @@ func isSessionAffinity(affinity *affinityPolicy) bool {
// NextEndpoint returns a service endpoint.
// The service endpoint is chosen using the round-robin algorithm.
func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string, error) {
func (lb *LoadBalancerRR) NextEndpoint(service types.NamespacedName, srcAddr net.Addr) (string, error) {
// Coarse locking is simple. We can get more fine-grained if/when we
// can prove it matters.
lb.lock.Lock()
defer lb.lock.Unlock()
key := balancerKey(service)
key := service
state, exists := lb.services[key]
if !exists || state == nil {
return "", ErrMissingServiceEntry
@ -185,7 +182,7 @@ func filterValidEndpoints(endpoints []api.Endpoint) []string {
}
// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
func removeSessionAffinityByEndpoint(state *balancerState, service balancerKey, endpoint string) {
func removeSessionAffinityByEndpoint(state *balancerState, service types.NamespacedName, endpoint string) {
for _, affinity := range state.affinity.affinityMap {
if affinity.endpoint == endpoint {
glog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, service)
@ -197,7 +194,7 @@ func removeSessionAffinityByEndpoint(state *balancerState, service balancerKey,
// Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
// Then remove any session affinity records that are not in both lists.
// This assumes the lb.lock is held.
func (lb *LoadBalancerRR) updateAffinityMap(service balancerKey, newEndpoints []string) {
func (lb *LoadBalancerRR) updateAffinityMap(service types.NamespacedName, newEndpoints []string) {
allEndpoints := map[string]int{}
for _, newEndpoint := range newEndpoints {
allEndpoints[newEndpoint] = 1
@ -221,13 +218,14 @@ func (lb *LoadBalancerRR) updateAffinityMap(service balancerKey, newEndpoints []
// Registered endpoints are updated if found in the update set or
// unregistered if missing from the update set.
func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
registeredEndpoints := make(map[balancerKey]bool)
registeredEndpoints := make(map[types.NamespacedName]bool)
lb.lock.Lock()
defer lb.lock.Unlock()
// Update endpoints for services.
for _, svcEndpoints := range allEndpoints {
key := balancerKey(svcEndpoints.Name)
name := types.NamespacedName{svcEndpoints.Namespace, svcEndpoints.Name}
key := name
state, exists := lb.services[key]
curEndpoints := []string{}
if state != nil {
@ -240,7 +238,7 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
// On update can be called without NewService being called externally.
// To be safe we will call it here. A new service will only be created
// if one does not already exist.
state = lb.newServiceInternal(svcEndpoints.Name, api.AffinityTypeNone, 0)
state = lb.newServiceInternal(name, api.AffinityTypeNone, 0)
state.endpoints = slice.ShuffleStrings(newEndpoints)
// Reset the round-robin index.
@ -268,11 +266,11 @@ func slicesEquiv(lhs, rhs []string) bool {
return false
}
func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) {
func (lb *LoadBalancerRR) CleanupStaleStickySessions(service types.NamespacedName) {
lb.lock.Lock()
defer lb.lock.Unlock()
key := balancerKey(service)
key := service
state, exists := lb.services[key]
if !exists {
glog.Warning("CleanupStaleStickySessions called for non-existent balancer key %q", service)

View File

@ -21,6 +21,7 @@ import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
)
func TestValidateWorks(t *testing.T) {
@ -66,7 +67,8 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
var endpoints []api.Endpoints
loadBalancer.OnUpdate(endpoints)
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil {
t.Errorf("Didn't fail with non-existent service")
}
@ -75,7 +77,7 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
}
}
func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string, expected string, netaddr net.Addr) {
func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service types.NamespacedName, expected string, netaddr net.Addr) {
endpoint, err := loadBalancer.NextEndpoint(service, netaddr)
if err != nil {
t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
@ -87,31 +89,33 @@ func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string,
func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "endpoint1", Port: 40}},
}
loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
}
func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
@ -119,22 +123,23 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
}
func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
@ -142,31 +147,31 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
// Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"},
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 8},
{IP: "endpoint", Port: 9},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
shuffledEndpoints = loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
// Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}}
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{}}
loadBalancer.OnUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint("foo", nil)
endpoint, err = loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
@ -174,13 +179,15 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
fooService := types.NewNamespacedNameOrDie("testnamespace", "foo")
barService := types.NewNamespacedNameOrDie("testnamespace", "bar")
endpoint, err := loadBalancer.NextEndpoint(fooService, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]api.Endpoints, 2)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
@ -188,60 +195,61 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
},
}
endpoints[1] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
ObjectMeta: api.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 4},
{IP: "endpoint", Port: 5},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil)
shuffledFooEndpoints := loadBalancer.services[fooService].endpoints
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], nil)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], nil)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], nil)
shuffledBarEndpoints := loadBalancer.services["bar"].endpoints
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
shuffledBarEndpoints := loadBalancer.services[barService].endpoints
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil)
// Then update the configuration by removing foo
loadBalancer.OnUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint("foo", nil)
endpoint, err = loadBalancer.NextEndpoint(fooService, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
// but bar is still there, and we continue RR from where we left off.
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil)
}
func TestStickyLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0)
loadBalancer.NewService(service, api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "endpoint", Port: 1}},
}
loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2)
expectEndpoint(t, loadBalancer, service, "endpoint:1", client1)
expectEndpoint(t, loadBalancer, service, "endpoint:1", client1)
expectEndpoint(t, loadBalancer, service, "endpoint:1", client2)
expectEndpoint(t, loadBalancer, service, "endpoint:1", client2)
}
func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
@ -249,15 +257,16 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0)
loadBalancer.NewService(service, api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
@ -265,15 +274,15 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
}
func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
@ -281,15 +290,16 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", api.AffinityTypeNone, 0)
loadBalancer.NewService(service, api.AffinityTypeNone, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
@ -297,15 +307,16 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client1)
}
func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
@ -316,15 +327,16 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
client5 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 5), Port: 0}
client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0)
loadBalancer.NewService(service, api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
@ -332,25 +344,25 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
client1Endpoint := shuffledEndpoints[0]
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
client2Endpoint := shuffledEndpoints[1]
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
client3Endpoint := shuffledEndpoints[2]
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services["foo"].endpoints
shuffledEndpoints = loadBalancer.services[service].endpoints
if client1Endpoint == "endpoint:3" {
client1Endpoint = shuffledEndpoints[0]
} else if client2Endpoint == "endpoint:3" {
@ -358,12 +370,12 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
} else if client3Endpoint == "endpoint:3" {
client3Endpoint = shuffledEndpoints[0]
}
expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1)
expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2)
expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3)
expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
expectEndpoint(t, loadBalancer, service, client3Endpoint, client3)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
@ -371,13 +383,13 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1)
expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2)
expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client4)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client5)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client6)
shuffledEndpoints = loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
expectEndpoint(t, loadBalancer, service, client3Endpoint, client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client4)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client5)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client6)
}
func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
@ -385,15 +397,16 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0)
loadBalancer.NewService(service, api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
@ -401,35 +414,35 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
// Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"},
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 4},
{IP: "endpoint", Port: 5},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
shuffledEndpoints = loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
// Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}}
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{}}
loadBalancer.OnUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint("foo", nil)
endpoint, err = loadBalancer.NextEndpoint(service, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
@ -440,58 +453,61 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
fooService := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(fooService, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0)
loadBalancer.NewService(fooService, api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 2)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.NewService("bar", api.AffinityTypeClientIP, 0)
barService := types.NewNamespacedNameOrDie("testnamespace", "bar")
loadBalancer.NewService(barService, api.AffinityTypeClientIP, 0)
endpoints[1] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
ObjectMeta: api.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 5},
{IP: "endpoint", Port: 5},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
shuffledBarEndpoints := loadBalancer.services["bar"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
shuffledFooEndpoints := loadBalancer.services[fooService].endpoints
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
shuffledBarEndpoints := loadBalancer.services[barService].endpoints
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
// Then update the configuration by removing foo
loadBalancer.OnUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint("foo", nil)
endpoint, err = loadBalancer.NextEndpoint(fooService, nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
// but bar is still there, and we continue RR from where we left off.
shuffledBarEndpoints = loadBalancer.services["bar"].endpoints
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)
shuffledBarEndpoints = loadBalancer.services[barService].endpoints
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
}

View File

@ -0,0 +1,35 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
// NamespacedName comprises a resource name, with a mandatory namespace,
// rendered as "<namespace>/<name>". Being a type captures intent and
// helps make sure that UIDs, namespaced names and non-namespaced names
// do not get conflated in code. For most use cases, namespace and name
// will already have been format validated at the API entry point, so we
// don't do that here. Where that's not the case (e.g. in testing),
// consider using NamespacedNameOrDie() in testing.go in this package.
type NamespacedName struct {
Namespace string
Name string
}
// String returns the general purpose string representation
func (n NamespacedName) String() string {
return n.Namespace + "/" + n.Name
}

26
pkg/types/testing.go Normal file
View File

@ -0,0 +1,26 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import "fmt"
func NewNamespacedNameOrDie(namespace, name string) (ret NamespacedName) {
if len(namespace) == 0 || len(name) == 0 {
panic(fmt.Sprintf("invalid call to NewNamespacedNameOrDie(%q, %q)", namespace, name))
}
return NamespacedName{namespace, name}
}

View File

@ -18,6 +18,7 @@ package e2e
import (
"fmt"
"sort"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -178,8 +179,8 @@ var _ = Describe("Services", func() {
Expect(foundRO).To(Equal(true))
})
It("should serve basic a endpoint from pods", func(done Done) {
serviceName := "endpoint-test"
It("should serve a basic endpoint from pods", func(done Done) {
serviceName := "endpoint-test2"
ns := api.NamespaceDefault
labels := map[string]string{
"foo": "bar",
@ -243,9 +244,59 @@ var _ = Describe("Services", func() {
defer func() {
close(done)
}()
}, 120.0)
}, 240.0)
It("should correctly serve identically named services in different namespaces on different external IP addresses", func(done Done) {
serviceNames := []string{"services-namespace-test0"} // Could add more here, but then it takes longer.
namespaces := []string{"namespace0", "namespace1"} // As above.
labels := map[string]string{
"key0": "value0",
"key1": "value1",
}
service := &api.Service{
ObjectMeta: api.ObjectMeta{},
Spec: api.ServiceSpec{
Port: 80,
Selector: labels,
ContainerPort: util.NewIntOrStringFromInt(80),
CreateExternalLoadBalancer: true,
},
}
publicIPs := []string{}
// We defer Gingko pieces that may Fail, so clean up at the end.
defer func() {
close(done)
}()
for _, namespace := range namespaces {
for _, serviceName := range serviceNames {
service.ObjectMeta.Name = serviceName
service.ObjectMeta.Namespace = namespace
By("creating service " + serviceName + " in namespace " + namespace)
result, err := c.Services(namespace).Create(service)
Expect(err).NotTo(HaveOccurred())
defer func(namespace, serviceName string) { // clean up when we're done
By("deleting service " + serviceName + " in namespace " + namespace)
err := c.Services(namespace).Delete(serviceName)
Expect(err).NotTo(HaveOccurred())
}(namespace, serviceName)
publicIPs = append(publicIPs, result.Spec.PublicIPs...) // Save 'em to check uniqueness
}
}
validateUniqueOrFail(publicIPs)
}, 240.0)
})
func validateUniqueOrFail(s []string) {
By(fmt.Sprintf("validating unique: %v", s))
sort.Strings(s)
var prev string
for i, elem := range s {
if i > 0 && elem == prev {
Fail("duplicate found: " + elem)
}
prev = elem
}
}
func validateIPsOrFail(c *client.Client, ns string, expectedPort int, expectedEndpoints []string, endpoints *api.Endpoints) {
ips := util.StringSet{}
for _, ep := range endpoints.Endpoints {
@ -263,7 +314,10 @@ func validateIPsOrFail(c *client.Client, ns string, expectedPort int, expectedEn
if !ips.Has(pod.Status.PodIP) {
Failf("ip validation failed, expected: %v, saw: %v", ips, pod.Status.PodIP)
}
By(fmt.Sprintf(""))
}
By(fmt.Sprintf("successfully validated IPs %v against expected endpoints %v port %d on namespace %s", ips, expectedEndpoints, expectedPort, ns))
}
func validateEndpointsOrFail(c *client.Client, ns, serviceName string, expectedPort int, expectedEndpoints []string) {
@ -274,17 +328,18 @@ func validateEndpointsOrFail(c *client.Client, ns, serviceName string, expectedP
validateIPsOrFail(c, ns, expectedPort, expectedEndpoints, endpoints)
return
} else {
By(fmt.Sprintf("Unexpected endpoints: %v, expected %v", endpoints.Endpoints, expectedEndpoints))
By(fmt.Sprintf("Unexpected number of endpoints: found %v, expected %v (ignoring for 1 second)", endpoints.Endpoints, expectedEndpoints))
}
} else {
By(fmt.Sprintf("Failed to get endpoints: %v (ignoring for 1s)", err))
By(fmt.Sprintf("Failed to get endpoints: %v (ignoring for 1 second)", err))
}
time.Sleep(time.Second)
}
By(fmt.Sprintf("successfully validated endpoints %v port %d on service %s/%s", expectedEndpoints, expectedPort, ns, serviceName))
}
func addEndpointPodOrFail(c *client.Client, ns, name string, labels map[string]string) {
By(fmt.Sprintf("Adding pod %v", name))
By(fmt.Sprintf("Adding pod %v in namespace %v", name, ns))
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: name,