diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 792644bc4b..0371ec4755 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -20,6 +20,7 @@ import ( "sync" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/config" "github.com/golang/glog" ) @@ -81,7 +82,7 @@ type EndpointsConfig struct { // It immediately runs the created EndpointsConfig. func NewEndpointsConfig() *EndpointsConfig { updates := make(chan struct{}) - store := &endpointsStore{updates: updates, endpoints: make(map[string]map[string]api.Endpoints)} + store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]api.Endpoints)} mux := config.NewMux(store) bcaster := config.NewBroadcaster() go watchForUpdates(bcaster, store, updates) @@ -112,7 +113,7 @@ func (c *EndpointsConfig) Config() []api.Endpoints { type endpointsStore struct { endpointLock sync.RWMutex - endpoints map[string]map[string]api.Endpoints + endpoints map[string]map[types.NamespacedName]api.Endpoints updates chan<- struct{} } @@ -120,26 +121,29 @@ func (s *endpointsStore) Merge(source string, change interface{}) error { s.endpointLock.Lock() endpoints := s.endpoints[source] if endpoints == nil { - endpoints = make(map[string]api.Endpoints) + endpoints = make(map[types.NamespacedName]api.Endpoints) } update := change.(EndpointsUpdate) switch update.Op { case ADD: glog.V(4).Infof("Adding new endpoint from source %s : %+v", source, update.Endpoints) for _, value := range update.Endpoints { - endpoints[value.Name] = value + name := types.NamespacedName{value.Namespace, value.Name} + endpoints[name] = value } case REMOVE: glog.V(4).Infof("Removing an endpoint %+v", update) for _, value := range update.Endpoints { - delete(endpoints, value.Name) + name := types.NamespacedName{value.Namespace, value.Name} + delete(endpoints, name) } case SET: glog.V(4).Infof("Setting endpoints %+v", update) // Clear the old map entries by just creating a new map - endpoints = make(map[string]api.Endpoints) + endpoints = make(map[types.NamespacedName]api.Endpoints) for _, value := range update.Endpoints { - endpoints[value.Name] = value + name := types.NamespacedName{value.Namespace, value.Name} + endpoints[name] = value } default: glog.V(4).Infof("Received invalid update type: %v", update) @@ -176,7 +180,7 @@ type ServiceConfig struct { // It immediately runs the created ServiceConfig. func NewServiceConfig() *ServiceConfig { updates := make(chan struct{}) - store := &serviceStore{updates: updates, services: make(map[string]map[string]api.Service)} + store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]api.Service)} mux := config.NewMux(store) bcaster := config.NewBroadcaster() go watchForUpdates(bcaster, store, updates) @@ -207,7 +211,7 @@ func (c *ServiceConfig) Config() []api.Service { type serviceStore struct { serviceLock sync.RWMutex - services map[string]map[string]api.Service + services map[string]map[types.NamespacedName]api.Service updates chan<- struct{} } @@ -215,26 +219,29 @@ func (s *serviceStore) Merge(source string, change interface{}) error { s.serviceLock.Lock() services := s.services[source] if services == nil { - services = make(map[string]api.Service) + services = make(map[types.NamespacedName]api.Service) } update := change.(ServiceUpdate) switch update.Op { case ADD: glog.V(4).Infof("Adding new service from source %s : %+v", source, update.Services) for _, value := range update.Services { - services[value.Name] = value + name := types.NamespacedName{value.Namespace, value.Name} + services[name] = value } case REMOVE: glog.V(4).Infof("Removing a service %+v", update) for _, value := range update.Services { - delete(services, value.Name) + name := types.NamespacedName{value.Namespace, value.Name} + delete(services, name) } case SET: glog.V(4).Infof("Setting services %+v", update) // Clear the old map entries by just creating a new map - services = make(map[string]api.Service) + services = make(map[types.NamespacedName]api.Service) for _, value := range update.Services { - services[value.Name] = value + name := types.NamespacedName{value.Namespace, value.Name} + services[name] = value } default: glog.V(4).Infof("Received invalid update type: %v", update) diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index 3481266d58..582365f587 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -136,7 +136,7 @@ func TestNewServiceAddedAndNotified(t *testing.T) { handler := NewServiceHandlerMock() handler.Wait(1) config.RegisterHandler(handler) - serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}}) + serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}}) channel <- serviceUpdate handler.ValidateServices(t, serviceUpdate.Services) @@ -147,24 +147,24 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) { channel := config.Channel("one") handler := NewServiceHandlerMock() config.RegisterHandler(handler) - serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}}) + serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}}) handler.Wait(1) channel <- serviceUpdate handler.ValidateServices(t, serviceUpdate.Services) - serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}, Spec: api.ServiceSpec{Port: 20}}) + serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Port: 20}}) handler.Wait(1) channel <- serviceUpdate2 services := []api.Service{serviceUpdate2.Services[0], serviceUpdate.Services[0]} handler.ValidateServices(t, services) - serviceUpdate3 := CreateServiceUpdate(REMOVE, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}}) + serviceUpdate3 := CreateServiceUpdate(REMOVE, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}}) handler.Wait(1) channel <- serviceUpdate3 services = []api.Service{serviceUpdate2.Services[0]} handler.ValidateServices(t, services) - serviceUpdate4 := CreateServiceUpdate(SET, api.Service{ObjectMeta: api.ObjectMeta{Name: "foobar"}, Spec: api.ServiceSpec{Port: 99}}) + serviceUpdate4 := CreateServiceUpdate(SET, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foobar"}, Spec: api.ServiceSpec{Port: 99}}) handler.Wait(1) channel <- serviceUpdate4 services = []api.Service{serviceUpdate4.Services[0]} @@ -180,8 +180,8 @@ func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) { } handler := NewServiceHandlerMock() config.RegisterHandler(handler) - serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}}) - serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}, Spec: api.ServiceSpec{Port: 20}}) + serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}}) + serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Port: 20}}) handler.Wait(2) channelOne <- serviceUpdate1 channelTwo <- serviceUpdate2 @@ -197,8 +197,8 @@ func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T handler2 := NewServiceHandlerMock() config.RegisterHandler(handler) config.RegisterHandler(handler2) - serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}}) - serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}, Spec: api.ServiceSpec{Port: 20}}) + serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}}) + serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Port: 20}}) handler.Wait(2) handler2.Wait(2) channelOne <- serviceUpdate1 @@ -217,11 +217,11 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing. config.RegisterHandler(handler) config.RegisterHandler(handler2) endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Endpoints: []api.Endpoint{{IP: "endpoint1"}, {IP: "endpoint2"}}, }) endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "bar"}, + ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Endpoints: []api.Endpoint{{IP: "endpoint3"}, {IP: "endpoint4"}}, }) handler.Wait(2) @@ -243,11 +243,11 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t config.RegisterHandler(handler) config.RegisterHandler(handler2) endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Endpoints: []api.Endpoint{{IP: "endpoint1"}, {IP: "endpoint2"}}, }) endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "bar"}, + ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Endpoints: []api.Endpoint{{IP: "endpoint3"}, {IP: "endpoint4"}}, }) handler.Wait(2) @@ -261,7 +261,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t // Add one more endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foobar"}, + ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foobar"}, Endpoints: []api.Endpoint{{IP: "endpoint5"}, {IP: "endpoint6"}}, }) handler.Wait(1) @@ -273,7 +273,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t // Update the "foo" service with new endpoints endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Endpoints: []api.Endpoint{{IP: "endpoint7"}}, }) handler.Wait(1) @@ -284,7 +284,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t handler2.ValidateEndpoints(t, endpoints) // Remove "bar" service - endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}}) + endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, api.Endpoints{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}}) handler.Wait(1) handler2.Wait(1) channelTwo <- endpointsUpdate2 diff --git a/pkg/proxy/loadbalancer.go b/pkg/proxy/loadbalancer.go index a94665383d..f0e921258b 100644 --- a/pkg/proxy/loadbalancer.go +++ b/pkg/proxy/loadbalancer.go @@ -20,13 +20,14 @@ import ( "net" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" ) // LoadBalancer is an interface for distributing incoming requests to service endpoints. type LoadBalancer interface { // NextEndpoint returns the endpoint to handle a request for the given // service and source address. - NextEndpoint(service string, srcAddr net.Addr) (string, error) - NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error - CleanupStaleStickySessions(service string) + NextEndpoint(service types.NamespacedName, srcAddr net.Addr) (string, error) + NewService(service types.NamespacedName, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error + CleanupStaleStickySessions(service types.NamespacedName) } diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 864036dfed..9907ac36a4 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -27,6 +27,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables" @@ -58,7 +59,7 @@ type proxySocket interface { // while sessions are active. Close() error // ProxyLoop proxies incoming connections for the specified service to the service endpoints. - ProxyLoop(service string, info *serviceInfo, proxier *Proxier) + ProxyLoop(service types.NamespacedName, info *serviceInfo, proxier *Proxier) } // tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called, @@ -67,7 +68,7 @@ type tcpProxySocket struct { net.Listener } -func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { +func tryConnect(service types.NamespacedName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { for _, retryTimeout := range endpointDialTimeout { endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr) if err != nil { @@ -87,7 +88,7 @@ func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Prox return nil, fmt.Errorf("failed to connect to an endpoint.") } -func (tcp *tcpProxySocket) ProxyLoop(service string, myInfo *serviceInfo, proxier *Proxier) { +func (tcp *tcpProxySocket) ProxyLoop(service types.NamespacedName, myInfo *serviceInfo, proxier *Proxier) { for { if info, exists := proxier.getServiceInfo(service); !exists || info != myInfo { // The service port was closed or replaced. @@ -162,7 +163,7 @@ func newClientCache() *clientCache { return &clientCache{clients: map[string]net.Conn{}} } -func (udp *udpProxySocket) ProxyLoop(service string, myInfo *serviceInfo, proxier *Proxier) { +func (udp *udpProxySocket) ProxyLoop(service types.NamespacedName, myInfo *serviceInfo, proxier *Proxier) { activeClients := newClientCache() var buffer [4096]byte // 4KiB should be enough for most whole-packets for { @@ -207,7 +208,7 @@ func (udp *udpProxySocket) ProxyLoop(service string, myInfo *serviceInfo, proxie } } -func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service string, timeout time.Duration) (net.Conn, error) { +func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service types.NamespacedName, timeout time.Duration) (net.Conn, error) { activeClients.mu.Lock() defer activeClients.mu.Unlock() @@ -303,7 +304,7 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er type Proxier struct { loadBalancer LoadBalancer mu sync.Mutex // protects serviceMap - serviceMap map[string]*serviceInfo + serviceMap map[types.NamespacedName]*serviceInfo numProxyLoops int32 // use atomic ops to access this; mostly for testing listenIP net.IP iptables iptables.Interface @@ -345,7 +346,7 @@ func CreateProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables } return &Proxier{ loadBalancer: loadBalancer, - serviceMap: make(map[string]*serviceInfo), + serviceMap: make(map[types.NamespacedName]*serviceInfo), listenIP: listenIP, iptables: iptables, hostIP: hostIP, @@ -393,26 +394,26 @@ func (proxier *Proxier) cleanupStaleStickySessions() { } // This assumes proxier.mu is not locked. -func (proxier *Proxier) stopProxy(service string, info *serviceInfo) error { +func (proxier *Proxier) stopProxy(service types.NamespacedName, info *serviceInfo) error { proxier.mu.Lock() defer proxier.mu.Unlock() return proxier.stopProxyInternal(service, info) } // This assumes proxier.mu is locked. -func (proxier *Proxier) stopProxyInternal(service string, info *serviceInfo) error { +func (proxier *Proxier) stopProxyInternal(service types.NamespacedName, info *serviceInfo) error { delete(proxier.serviceMap, service) return info.socket.Close() } -func (proxier *Proxier) getServiceInfo(service string) (*serviceInfo, bool) { +func (proxier *Proxier) getServiceInfo(service types.NamespacedName) (*serviceInfo, bool) { proxier.mu.Lock() defer proxier.mu.Unlock() info, ok := proxier.serviceMap[service] return info, ok } -func (proxier *Proxier) setServiceInfo(service string, info *serviceInfo) { +func (proxier *Proxier) setServiceInfo(service types.NamespacedName, info *serviceInfo) { proxier.mu.Lock() defer proxier.mu.Unlock() proxier.serviceMap[service] = info @@ -421,7 +422,7 @@ func (proxier *Proxier) setServiceInfo(service string, info *serviceInfo) { // addServiceOnPort starts listening for a new service, returning the serviceInfo. // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP // connections, for now. -func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) { +func (proxier *Proxier) addServiceOnPort(service types.NamespacedName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) { sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort) if err != nil { return nil, err @@ -447,7 +448,7 @@ func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol, proxier.setServiceInfo(service, si) glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum) - go func(service string, proxier *Proxier) { + go func(service types.NamespacedName, proxier *Proxier) { defer util.HandleCrash() atomic.AddInt32(&proxier.numProxyLoops, 1) sock.ProxyLoop(service, si, proxier) @@ -465,30 +466,31 @@ const udpIdleTimeout = 1 * time.Minute // shutdown if missing from the update set. func (proxier *Proxier) OnUpdate(services []api.Service) { glog.V(4).Infof("Received update notice: %+v", services) - activeServices := util.StringSet{} + activeServices := make(map[types.NamespacedName]bool) // use a map as a set for _, service := range services { - activeServices.Insert(service.Name) - info, exists := proxier.getServiceInfo(service.Name) + serviceName := types.NamespacedName{service.Namespace, service.Name} + activeServices[serviceName] = true + info, exists := proxier.getServiceInfo(serviceName) serviceIP := net.ParseIP(service.Spec.PortalIP) // TODO: check health of the socket? What if ProxyLoop exited? if exists && info.portalPort == service.Spec.Port && info.portalIP.Equal(serviceIP) { continue } if exists && (info.portalPort != service.Spec.Port || !info.portalIP.Equal(serviceIP) || !ipsEqual(service.Spec.PublicIPs, info.publicIP)) { - glog.V(4).Infof("Something changed for service %q: stopping it", service.Name) - err := proxier.closePortal(service.Name, info) + glog.V(4).Infof("Something changed for service %q: stopping it", serviceName.String()) + err := proxier.closePortal(serviceName, info) if err != nil { - glog.Errorf("Failed to close portal for %q: %v", service.Name, err) + glog.Errorf("Failed to close portal for %q: %v", serviceName, err) } - err = proxier.stopProxy(service.Name, info) + err = proxier.stopProxy(serviceName, info) if err != nil { - glog.Errorf("Failed to stop service %q: %v", service.Name, err) + glog.Errorf("Failed to stop service %q: %v", serviceName, err) } } - glog.V(1).Infof("Adding new service %q at %s:%d/%s", service.Name, serviceIP, service.Spec.Port, service.Spec.Protocol) - info, err := proxier.addServiceOnPort(service.Name, service.Spec.Protocol, 0, udpIdleTimeout) + glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, service.Spec.Port, service.Spec.Protocol) + info, err := proxier.addServiceOnPort(serviceName, service.Spec.Protocol, 0, udpIdleTimeout) if err != nil { - glog.Errorf("Failed to start proxy for %q: %v", service.Name, err) + glog.Errorf("Failed to start proxy for %q: %v", serviceName, err) continue } info.portalIP = serviceIP @@ -499,16 +501,16 @@ func (proxier *Proxier) OnUpdate(services []api.Service) { info.stickyMaxAgeMinutes = 180 glog.V(4).Infof("info: %+v", info) - err = proxier.openPortal(service.Name, info) + err = proxier.openPortal(serviceName, info) if err != nil { - glog.Errorf("Failed to open portal for %q: %v", service.Name, err) + glog.Errorf("Failed to open portal for %q: %v", serviceName, err) } - proxier.loadBalancer.NewService(service.Name, info.sessionAffinityType, info.stickyMaxAgeMinutes) + proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes) } proxier.mu.Lock() defer proxier.mu.Unlock() for name, info := range proxier.serviceMap { - if !activeServices.Has(name) { + if !activeServices[name] { glog.V(1).Infof("Stopping service %q", name) err := proxier.closePortal(name, info) if err != nil { @@ -534,7 +536,7 @@ func ipsEqual(lhs, rhs []string) bool { return true } -func (proxier *Proxier) openPortal(service string, info *serviceInfo) error { +func (proxier *Proxier) openPortal(service types.NamespacedName, info *serviceInfo) error { err := proxier.openOnePortal(info.portalIP, info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service) if err != nil { return err @@ -548,7 +550,7 @@ func (proxier *Proxier) openPortal(service string, info *serviceInfo) error { return nil } -func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name string) error { +func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name types.NamespacedName) error { // Handle traffic from containers. args := proxier.iptablesContainerPortalArgs(portalIP, portalPort, protocol, proxyIP, proxyPort, name) existed, err := proxier.iptables.EnsureRule(iptables.TableNAT, iptablesContainerPortalChain, args...) @@ -573,7 +575,7 @@ func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol return nil } -func (proxier *Proxier) closePortal(service string, info *serviceInfo) error { +func (proxier *Proxier) closePortal(service types.NamespacedName, info *serviceInfo) error { // Collect errors and report them all at the end. el := proxier.closeOnePortal(info.portalIP, info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service) for _, publicIP := range info.publicIP { @@ -587,7 +589,7 @@ func (proxier *Proxier) closePortal(service string, info *serviceInfo) error { return errors.NewAggregate(el) } -func (proxier *Proxier) closeOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name string) []error { +func (proxier *Proxier) closeOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name types.NamespacedName) []error { el := []error{} // Handle traffic from containers. @@ -666,7 +668,7 @@ var zeroIPv6 = net.ParseIP("::0") var localhostIPv6 = net.ParseIP("::1") // Build a slice of iptables args that are common to from-container and from-host portal rules. -func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, service string) []string { +func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, service types.NamespacedName) []string { // This list needs to include all fields as they are eventually spit out // by iptables-save. This is because some systems do not support the // 'iptables -C' arg, and so fall back on parsing iptables-save output. @@ -677,7 +679,7 @@ func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol // iptables versions. args := []string{ "-m", "comment", - "--comment", service, + "--comment", service.String(), "-p", strings.ToLower(string(protocol)), "-m", strings.ToLower(string(protocol)), "-d", fmt.Sprintf("%s/32", destIP.String()), @@ -687,7 +689,7 @@ func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol } // Build a slice of iptables args for a from-container portal rule. -func (proxier *Proxier) iptablesContainerPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service string) []string { +func (proxier *Proxier) iptablesContainerPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service types.NamespacedName) []string { args := iptablesCommonPortalArgs(destIP, destPort, protocol, service) // This is tricky. @@ -734,7 +736,7 @@ func (proxier *Proxier) iptablesContainerPortalArgs(destIP net.IP, destPort int, } // Build a slice of iptables args for a from-host portal rule. -func (proxier *Proxier) iptablesHostPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service string) []string { +func (proxier *Proxier) iptablesHostPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service types.NamespacedName) []string { args := iptablesCommonPortalArgs(destIP, destPort, protocol, service) // This is tricky. diff --git a/pkg/proxy/proxier_test.go b/pkg/proxy/proxier_test.go index 6f23084e47..0c4f4ba067 100644 --- a/pkg/proxy/proxier_test.go +++ b/pkg/proxy/proxier_test.go @@ -29,6 +29,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables" ) @@ -194,9 +195,10 @@ func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) { func TestTCPProxy(t *testing.T) { lb := NewLoadBalancerRR() + service := types.NewNamespacedNameOrDie("testnamespace", "echo") lb.OnUpdate([]api.Endpoints{ { - ObjectMeta: api.ObjectMeta{Name: "echo"}, + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, }, }) @@ -204,7 +206,7 @@ func TestTCPProxy(t *testing.T) { p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -214,9 +216,10 @@ func TestTCPProxy(t *testing.T) { func TestUDPProxy(t *testing.T) { lb := NewLoadBalancerRR() + service := types.NewNamespacedNameOrDie("testnamespace", "echo") lb.OnUpdate([]api.Endpoints{ { - ObjectMeta: api.ObjectMeta{Name: "echo"}, + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, }, }) @@ -224,7 +227,7 @@ func TestUDPProxy(t *testing.T) { p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -233,7 +236,7 @@ func TestUDPProxy(t *testing.T) { } // Helper: Stops the proxy for the named service. -func stopProxyByName(proxier *Proxier, service string) error { +func stopProxyByName(proxier *Proxier, service types.NamespacedName) error { info, found := proxier.getServiceInfo(service) if !found { return fmt.Errorf("unknown service: %s", service) @@ -243,9 +246,10 @@ func stopProxyByName(proxier *Proxier, service string) error { func TestTCPProxyStop(t *testing.T) { lb := NewLoadBalancerRR() + service := types.NewNamespacedNameOrDie("testnamespace", "echo") lb.OnUpdate([]api.Endpoints{ { - ObjectMeta: api.ObjectMeta{Name: "echo"}, + ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, }, }) @@ -253,7 +257,7 @@ func TestTCPProxyStop(t *testing.T) { p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -264,7 +268,7 @@ func TestTCPProxyStop(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - stopProxyByName(p, "echo") + stopProxyByName(p, service) // Wait for the port to really close. if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) @@ -274,9 +278,10 @@ func TestTCPProxyStop(t *testing.T) { func TestUDPProxyStop(t *testing.T) { lb := NewLoadBalancerRR() + service := types.NewNamespacedNameOrDie("testnamespace", "echo") lb.OnUpdate([]api.Endpoints{ { - ObjectMeta: api.ObjectMeta{Name: "echo"}, + ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, }, }) @@ -284,7 +289,7 @@ func TestUDPProxyStop(t *testing.T) { p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -295,7 +300,7 @@ func TestUDPProxyStop(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - stopProxyByName(p, "echo") + stopProxyByName(p, service) // Wait for the port to really close. if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) @@ -305,9 +310,10 @@ func TestUDPProxyStop(t *testing.T) { func TestTCPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() + service := types.NewNamespacedNameOrDie("testnamespace", "echo") lb.OnUpdate([]api.Endpoints{ { - ObjectMeta: api.ObjectMeta{Name: "echo"}, + ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, }, }) @@ -315,7 +321,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -335,9 +341,10 @@ func TestTCPProxyUpdateDelete(t *testing.T) { func TestUDPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() + service := types.NewNamespacedNameOrDie("testnamespace", "echo") lb.OnUpdate([]api.Endpoints{ { - ObjectMeta: api.ObjectMeta{Name: "echo"}, + ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, }, }) @@ -345,7 +352,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -365,9 +372,10 @@ func TestUDPProxyUpdateDelete(t *testing.T) { func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { lb := NewLoadBalancerRR() + service := types.NewNamespacedNameOrDie("testnamespace", "echo") lb.OnUpdate([]api.Endpoints{ { - ObjectMeta: api.ObjectMeta{Name: "echo"}, + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, }, }) @@ -375,7 +383,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -392,11 +400,11 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { } waitForNumProxyLoops(t, p, 0) p.OnUpdate([]api.Service{ - {ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "TCP"}, Status: api.ServiceStatus{}}, + {ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "TCP"}, Status: api.ServiceStatus{}}, }) - svcInfo, exists := p.getServiceInfo("echo") + svcInfo, exists := p.getServiceInfo(service) if !exists { - t.Fatalf("can't find serviceInfo") + t.Fatalf("can't find serviceInfo for %s", service) } testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) waitForNumProxyLoops(t, p, 1) @@ -404,9 +412,10 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { lb := NewLoadBalancerRR() + service := types.NewNamespacedNameOrDie("testnamespace", "echo") lb.OnUpdate([]api.Endpoints{ { - ObjectMeta: api.ObjectMeta{Name: "echo"}, + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, }, }) @@ -414,7 +423,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -431,9 +440,9 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { } waitForNumProxyLoops(t, p, 0) p.OnUpdate([]api.Service{ - {ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "UDP"}, Status: api.ServiceStatus{}}, + {ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: svcInfo.proxyPort, Protocol: "UDP"}, Status: api.ServiceStatus{}}, }) - svcInfo, exists := p.getServiceInfo("echo") + svcInfo, exists := p.getServiceInfo(service) if !exists { t.Fatalf("can't find serviceInfo") } @@ -443,9 +452,10 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { func TestTCPProxyUpdatePort(t *testing.T) { lb := NewLoadBalancerRR() + service := types.NewNamespacedNameOrDie("testnamespace", "echo") lb.OnUpdate([]api.Endpoints{ { - ObjectMeta: api.ObjectMeta{Name: "echo"}, + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, }, }) @@ -453,7 +463,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } @@ -461,13 +471,13 @@ func TestTCPProxyUpdatePort(t *testing.T) { waitForNumProxyLoops(t, p, 1) p.OnUpdate([]api.Service{ - {ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: 99, Protocol: "TCP"}, Status: api.ServiceStatus{}}, + {ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: 99, Protocol: "TCP"}, Status: api.ServiceStatus{}}, }) // Wait for the socket to actually get free. if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } - svcInfo, exists := p.getServiceInfo("echo") + svcInfo, exists := p.getServiceInfo(service) if !exists { t.Fatalf("can't find serviceInfo") } @@ -479,9 +489,10 @@ func TestTCPProxyUpdatePort(t *testing.T) { func TestUDPProxyUpdatePort(t *testing.T) { lb := NewLoadBalancerRR() + service := types.NewNamespacedNameOrDie("testnamespace", "echo") lb.OnUpdate([]api.Endpoints{ { - ObjectMeta: api.ObjectMeta{Name: "echo"}, + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, }, }) @@ -489,20 +500,20 @@ func TestUDPProxyUpdatePort(t *testing.T) { p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1")) waitForNumProxyLoops(t, p, 0) - svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) + svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } waitForNumProxyLoops(t, p, 1) p.OnUpdate([]api.Service{ - {ObjectMeta: api.ObjectMeta{Name: "echo"}, Spec: api.ServiceSpec{Port: 99, Protocol: "UDP"}, Status: api.ServiceStatus{}}, + {ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: 99, Protocol: "UDP"}, Status: api.ServiceStatus{}}, }) // Wait for the socket to actually get free. if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } - svcInfo, exists := p.getServiceInfo("echo") + svcInfo, exists := p.getServiceInfo(service) if !exists { t.Fatalf("can't find serviceInfo") } diff --git a/pkg/proxy/roundrobin.go b/pkg/proxy/roundrobin.go index 1afc9f2762..4c8bc5cba9 100644 --- a/pkg/proxy/roundrobin.go +++ b/pkg/proxy/roundrobin.go @@ -26,6 +26,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/slice" "github.com/golang/glog" ) @@ -49,13 +50,10 @@ type affinityPolicy struct { ttlMinutes int } -// balancerKey is a string that the balancer uses to key stored state. -type balancerKey string - // LoadBalancerRR is a round-robin load balancer. type LoadBalancerRR struct { lock sync.RWMutex - services map[balancerKey]*balancerState + services map[types.NamespacedName]*balancerState } type balancerState struct { @@ -75,11 +73,11 @@ func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityP // NewLoadBalancerRR returns a new LoadBalancerRR. func NewLoadBalancerRR() *LoadBalancerRR { return &LoadBalancerRR{ - services: map[balancerKey]*balancerState{}, + services: map[types.NamespacedName]*balancerState{}, } } -func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityType, ttlMinutes int) error { +func (lb *LoadBalancerRR) NewService(service types.NamespacedName, affinityType api.AffinityType, ttlMinutes int) error { lb.lock.Lock() defer lb.lock.Unlock() @@ -88,17 +86,16 @@ func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityTy } // This assumes that lb.lock is already held. -func (lb *LoadBalancerRR) newServiceInternal(service string, affinityType api.AffinityType, ttlMinutes int) *balancerState { +func (lb *LoadBalancerRR) newServiceInternal(service types.NamespacedName, affinityType api.AffinityType, ttlMinutes int) *balancerState { if ttlMinutes == 0 { ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead???? } - key := balancerKey(service) - if _, exists := lb.services[key]; !exists { - lb.services[key] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)} + if _, exists := lb.services[service]; !exists { + lb.services[service] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)} glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", service) } - return lb.services[key] + return lb.services[service] } // return true if this service is using some form of session affinity. @@ -112,13 +109,13 @@ func isSessionAffinity(affinity *affinityPolicy) bool { // NextEndpoint returns a service endpoint. // The service endpoint is chosen using the round-robin algorithm. -func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string, error) { +func (lb *LoadBalancerRR) NextEndpoint(service types.NamespacedName, srcAddr net.Addr) (string, error) { // Coarse locking is simple. We can get more fine-grained if/when we // can prove it matters. lb.lock.Lock() defer lb.lock.Unlock() - key := balancerKey(service) + key := service state, exists := lb.services[key] if !exists || state == nil { return "", ErrMissingServiceEntry @@ -185,7 +182,7 @@ func filterValidEndpoints(endpoints []api.Endpoint) []string { } // Remove any session affinity records associated to a particular endpoint (for example when a pod goes down). -func removeSessionAffinityByEndpoint(state *balancerState, service balancerKey, endpoint string) { +func removeSessionAffinityByEndpoint(state *balancerState, service types.NamespacedName, endpoint string) { for _, affinity := range state.affinity.affinityMap { if affinity.endpoint == endpoint { glog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, service) @@ -197,7 +194,7 @@ func removeSessionAffinityByEndpoint(state *balancerState, service balancerKey, // Loop through the valid endpoints and then the endpoints associated with the Load Balancer. // Then remove any session affinity records that are not in both lists. // This assumes the lb.lock is held. -func (lb *LoadBalancerRR) updateAffinityMap(service balancerKey, newEndpoints []string) { +func (lb *LoadBalancerRR) updateAffinityMap(service types.NamespacedName, newEndpoints []string) { allEndpoints := map[string]int{} for _, newEndpoint := range newEndpoints { allEndpoints[newEndpoint] = 1 @@ -221,13 +218,14 @@ func (lb *LoadBalancerRR) updateAffinityMap(service balancerKey, newEndpoints [] // Registered endpoints are updated if found in the update set or // unregistered if missing from the update set. func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) { - registeredEndpoints := make(map[balancerKey]bool) + registeredEndpoints := make(map[types.NamespacedName]bool) lb.lock.Lock() defer lb.lock.Unlock() // Update endpoints for services. for _, svcEndpoints := range allEndpoints { - key := balancerKey(svcEndpoints.Name) + name := types.NamespacedName{svcEndpoints.Namespace, svcEndpoints.Name} + key := name state, exists := lb.services[key] curEndpoints := []string{} if state != nil { @@ -240,7 +238,7 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) { // On update can be called without NewService being called externally. // To be safe we will call it here. A new service will only be created // if one does not already exist. - state = lb.newServiceInternal(svcEndpoints.Name, api.AffinityTypeNone, 0) + state = lb.newServiceInternal(name, api.AffinityTypeNone, 0) state.endpoints = slice.ShuffleStrings(newEndpoints) // Reset the round-robin index. @@ -268,11 +266,11 @@ func slicesEquiv(lhs, rhs []string) bool { return false } -func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) { +func (lb *LoadBalancerRR) CleanupStaleStickySessions(service types.NamespacedName) { lb.lock.Lock() defer lb.lock.Unlock() - key := balancerKey(service) + key := service state, exists := lb.services[key] if !exists { glog.Warning("CleanupStaleStickySessions called for non-existent balancer key %q", service) diff --git a/pkg/proxy/roundrobin_test.go b/pkg/proxy/roundrobin_test.go index c47e07eab0..099843bc56 100644 --- a/pkg/proxy/roundrobin_test.go +++ b/pkg/proxy/roundrobin_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" ) func TestValidateWorks(t *testing.T) { @@ -66,7 +67,8 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() var endpoints []api.Endpoints loadBalancer.OnUpdate(endpoints) - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + service := types.NewNamespacedNameOrDie("testnamespace", "foo") + endpoint, err := loadBalancer.NextEndpoint(service, nil) if err == nil { t.Errorf("Didn't fail with non-existent service") } @@ -75,7 +77,7 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { } } -func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string, expected string, netaddr net.Addr) { +func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service types.NamespacedName, expected string, netaddr net.Addr) { endpoint, err := loadBalancer.NextEndpoint(service, netaddr) if err != nil { t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err) @@ -87,31 +89,33 @@ func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string, func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) { loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + service := types.NewNamespacedNameOrDie("testnamespace", "foo") + endpoint, err := loadBalancer.NextEndpoint(service, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{{IP: "endpoint1", Port: 40}}, } loadBalancer.OnUpdate(endpoints) - expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) - expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) - expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) - expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) + expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) + expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) + expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) + expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) } func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + service := types.NewNamespacedNameOrDie("testnamespace", "foo") + endpoint, err := loadBalancer.NextEndpoint(service, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{ {IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 2}, @@ -119,22 +123,23 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) + shuffledEndpoints := loadBalancer.services[service].endpoints + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], nil) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil) } func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + service := types.NewNamespacedNameOrDie("testnamespace", "foo") + endpoint, err := loadBalancer.NextEndpoint(service, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{ {IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 2}, @@ -142,31 +147,31 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) + shuffledEndpoints := loadBalancer.services[service].endpoints + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], nil) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil) // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again - endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, + endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{ {IP: "endpoint", Port: 8}, {IP: "endpoint", Port: 9}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) + shuffledEndpoints = loadBalancer.services[service].endpoints + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil) // Clear endpoints - endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}} + endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{}} loadBalancer.OnUpdate(endpoints) - endpoint, err = loadBalancer.NextEndpoint("foo", nil) + endpoint, err = loadBalancer.NextEndpoint(service, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -174,13 +179,15 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + fooService := types.NewNamespacedNameOrDie("testnamespace", "foo") + barService := types.NewNamespacedNameOrDie("testnamespace", "bar") + endpoint, err := loadBalancer.NextEndpoint(fooService, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } endpoints := make([]api.Endpoints, 2) endpoints[0] = api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace}, Endpoints: []api.Endpoint{ {IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 2}, @@ -188,60 +195,61 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, } endpoints[1] = api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "bar"}, + ObjectMeta: api.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace}, Endpoints: []api.Endpoint{ {IP: "endpoint", Port: 4}, {IP: "endpoint", Port: 5}, }, } loadBalancer.OnUpdate(endpoints) - shuffledFooEndpoints := loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil) + shuffledFooEndpoints := loadBalancer.services[fooService].endpoints + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], nil) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], nil) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], nil) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], nil) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], nil) - shuffledBarEndpoints := loadBalancer.services["bar"].endpoints - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) + shuffledBarEndpoints := loadBalancer.services[barService].endpoints + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil) // Then update the configuration by removing foo loadBalancer.OnUpdate(endpoints[1:]) - endpoint, err = loadBalancer.NextEndpoint("foo", nil) + endpoint, err = loadBalancer.NextEndpoint(fooService, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } // but bar is still there, and we continue RR from where we left off. - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil) } func TestStickyLoadBalanceWorksWithSingleEndpoint(t *testing.T) { client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + service := types.NewNamespacedNameOrDie("testnamespace", "foo") + endpoint, err := loadBalancer.NextEndpoint(service, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) + loadBalancer.NewService(service, api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{{IP: "endpoint", Port: 1}}, } loadBalancer.OnUpdate(endpoints) - expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) - expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) - expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2) - expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2) + expectEndpoint(t, loadBalancer, service, "endpoint:1", client1) + expectEndpoint(t, loadBalancer, service, "endpoint:1", client1) + expectEndpoint(t, loadBalancer, service, "endpoint:1", client2) + expectEndpoint(t, loadBalancer, service, "endpoint:1", client2) } func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) { @@ -249,15 +257,16 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) { client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + service := types.NewNamespacedNameOrDie("testnamespace", "foo") + endpoint, err := loadBalancer.NextEndpoint(service, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) + loadBalancer.NewService(service, api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{ {IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 2}, @@ -265,15 +274,15 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) + shuffledEndpoints := loadBalancer.services[service].endpoints + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) } func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) { @@ -281,15 +290,16 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) { client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + service := types.NewNamespacedNameOrDie("testnamespace", "foo") + endpoint, err := loadBalancer.NextEndpoint(service, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", api.AffinityTypeNone, 0) + loadBalancer.NewService(service, api.AffinityTypeNone, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{ {IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 2}, @@ -297,15 +307,16 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1) + + shuffledEndpoints := loadBalancer.services[service].endpoints + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client2) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client2) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client3) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client1) } func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { @@ -316,15 +327,16 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { client5 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 5), Port: 0} client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + service := types.NewNamespacedNameOrDie("testnamespace", "foo") + endpoint, err := loadBalancer.NextEndpoint(service, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) + loadBalancer.NewService(service, api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{ {IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 2}, @@ -332,25 +344,25 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) + shuffledEndpoints := loadBalancer.services[service].endpoints + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) client1Endpoint := shuffledEndpoints[0] - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) client2Endpoint := shuffledEndpoints[1] - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3) client3Endpoint := shuffledEndpoints[2] endpoints[0] = api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{ {IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 2}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.services["foo"].endpoints + shuffledEndpoints = loadBalancer.services[service].endpoints if client1Endpoint == "endpoint:3" { client1Endpoint = shuffledEndpoints[0] } else if client2Endpoint == "endpoint:3" { @@ -358,12 +370,12 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { } else if client3Endpoint == "endpoint:3" { client3Endpoint = shuffledEndpoints[0] } - expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1) - expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2) - expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3) + expectEndpoint(t, loadBalancer, service, client1Endpoint, client1) + expectEndpoint(t, loadBalancer, service, client2Endpoint, client2) + expectEndpoint(t, loadBalancer, service, client3Endpoint, client3) endpoints[0] = api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{ {IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 2}, @@ -371,13 +383,13 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1) - expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2) - expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client4) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client5) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client6) + shuffledEndpoints = loadBalancer.services[service].endpoints + expectEndpoint(t, loadBalancer, service, client1Endpoint, client1) + expectEndpoint(t, loadBalancer, service, client2Endpoint, client2) + expectEndpoint(t, loadBalancer, service, client3Endpoint, client3) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client4) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client5) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client6) } func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { @@ -385,15 +397,16 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + service := types.NewNamespacedNameOrDie("testnamespace", "foo") + endpoint, err := loadBalancer.NextEndpoint(service, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) + loadBalancer.NewService(service, api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{ {IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 2}, @@ -401,35 +414,35 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) + shuffledEndpoints := loadBalancer.services[service].endpoints + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again - endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, + endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{ {IP: "endpoint", Port: 4}, {IP: "endpoint", Port: 5}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) + shuffledEndpoints = loadBalancer.services[service].endpoints + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) // Clear endpoints - endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}} + endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{}} loadBalancer.OnUpdate(endpoints) - endpoint, err = loadBalancer.NextEndpoint("foo", nil) + endpoint, err = loadBalancer.NextEndpoint(service, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -440,58 +453,61 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + fooService := types.NewNamespacedNameOrDie("testnamespace", "foo") + endpoint, err := loadBalancer.NextEndpoint(fooService, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) + loadBalancer.NewService(fooService, api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 2) endpoints[0] = api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace}, Endpoints: []api.Endpoint{ {IP: "endpoint", Port: 1}, {IP: "endpoint", Port: 2}, {IP: "endpoint", Port: 3}, }, } - loadBalancer.NewService("bar", api.AffinityTypeClientIP, 0) + barService := types.NewNamespacedNameOrDie("testnamespace", "bar") + loadBalancer.NewService(barService, api.AffinityTypeClientIP, 0) endpoints[1] = api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "bar"}, + ObjectMeta: api.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace}, Endpoints: []api.Endpoint{ {IP: "endpoint", Port: 5}, {IP: "endpoint", Port: 5}, }, } loadBalancer.OnUpdate(endpoints) - shuffledFooEndpoints := loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) - shuffledBarEndpoints := loadBalancer.services["bar"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) + shuffledFooEndpoints := loadBalancer.services[fooService].endpoints + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2) + + shuffledBarEndpoints := loadBalancer.services[barService].endpoints + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1) // Then update the configuration by removing foo loadBalancer.OnUpdate(endpoints[1:]) - endpoint, err = loadBalancer.NextEndpoint("foo", nil) + endpoint, err = loadBalancer.NextEndpoint(fooService, nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } // but bar is still there, and we continue RR from where we left off. - shuffledBarEndpoints = loadBalancer.services["bar"].endpoints - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) + shuffledBarEndpoints = loadBalancer.services[barService].endpoints + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1) } diff --git a/pkg/types/namespacedname.go b/pkg/types/namespacedname.go new file mode 100644 index 0000000000..7a0614bc86 --- /dev/null +++ b/pkg/types/namespacedname.go @@ -0,0 +1,35 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package types + +// NamespacedName comprises a resource name, with a mandatory namespace, +// rendered as "/". Being a type captures intent and +// helps make sure that UIDs, namespaced names and non-namespaced names +// do not get conflated in code. For most use cases, namespace and name +// will already have been format validated at the API entry point, so we +// don't do that here. Where that's not the case (e.g. in testing), +// consider using NamespacedNameOrDie() in testing.go in this package. + +type NamespacedName struct { + Namespace string + Name string +} + +// String returns the general purpose string representation +func (n NamespacedName) String() string { + return n.Namespace + "/" + n.Name +} diff --git a/pkg/types/testing.go b/pkg/types/testing.go new file mode 100644 index 0000000000..1a8c87c529 --- /dev/null +++ b/pkg/types/testing.go @@ -0,0 +1,26 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package types + +import "fmt" + +func NewNamespacedNameOrDie(namespace, name string) (ret NamespacedName) { + if len(namespace) == 0 || len(name) == 0 { + panic(fmt.Sprintf("invalid call to NewNamespacedNameOrDie(%q, %q)", namespace, name)) + } + return NamespacedName{namespace, name} +} diff --git a/test/e2e/service.go b/test/e2e/service.go index 22e566929e..af003e5137 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -18,6 +18,7 @@ package e2e import ( "fmt" + "sort" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -178,8 +179,8 @@ var _ = Describe("Services", func() { Expect(foundRO).To(Equal(true)) }) - It("should serve basic a endpoint from pods", func(done Done) { - serviceName := "endpoint-test" + It("should serve a basic endpoint from pods", func(done Done) { + serviceName := "endpoint-test2" ns := api.NamespaceDefault labels := map[string]string{ "foo": "bar", @@ -243,9 +244,59 @@ var _ = Describe("Services", func() { defer func() { close(done) }() - }, 120.0) + }, 240.0) + It("should correctly serve identically named services in different namespaces on different external IP addresses", func(done Done) { + serviceNames := []string{"services-namespace-test0"} // Could add more here, but then it takes longer. + namespaces := []string{"namespace0", "namespace1"} // As above. + labels := map[string]string{ + "key0": "value0", + "key1": "value1", + } + service := &api.Service{ + ObjectMeta: api.ObjectMeta{}, + Spec: api.ServiceSpec{ + Port: 80, + Selector: labels, + ContainerPort: util.NewIntOrStringFromInt(80), + CreateExternalLoadBalancer: true, + }, + } + publicIPs := []string{} + // We defer Gingko pieces that may Fail, so clean up at the end. + defer func() { + close(done) + }() + for _, namespace := range namespaces { + for _, serviceName := range serviceNames { + service.ObjectMeta.Name = serviceName + service.ObjectMeta.Namespace = namespace + By("creating service " + serviceName + " in namespace " + namespace) + result, err := c.Services(namespace).Create(service) + Expect(err).NotTo(HaveOccurred()) + defer func(namespace, serviceName string) { // clean up when we're done + By("deleting service " + serviceName + " in namespace " + namespace) + err := c.Services(namespace).Delete(serviceName) + Expect(err).NotTo(HaveOccurred()) + }(namespace, serviceName) + publicIPs = append(publicIPs, result.Spec.PublicIPs...) // Save 'em to check uniqueness + } + } + validateUniqueOrFail(publicIPs) + }, 240.0) }) +func validateUniqueOrFail(s []string) { + By(fmt.Sprintf("validating unique: %v", s)) + sort.Strings(s) + var prev string + for i, elem := range s { + if i > 0 && elem == prev { + Fail("duplicate found: " + elem) + } + prev = elem + } +} + func validateIPsOrFail(c *client.Client, ns string, expectedPort int, expectedEndpoints []string, endpoints *api.Endpoints) { ips := util.StringSet{} for _, ep := range endpoints.Endpoints { @@ -263,7 +314,10 @@ func validateIPsOrFail(c *client.Client, ns string, expectedPort int, expectedEn if !ips.Has(pod.Status.PodIP) { Failf("ip validation failed, expected: %v, saw: %v", ips, pod.Status.PodIP) } + By(fmt.Sprintf("")) } + By(fmt.Sprintf("successfully validated IPs %v against expected endpoints %v port %d on namespace %s", ips, expectedEndpoints, expectedPort, ns)) + } func validateEndpointsOrFail(c *client.Client, ns, serviceName string, expectedPort int, expectedEndpoints []string) { @@ -274,17 +328,18 @@ func validateEndpointsOrFail(c *client.Client, ns, serviceName string, expectedP validateIPsOrFail(c, ns, expectedPort, expectedEndpoints, endpoints) return } else { - By(fmt.Sprintf("Unexpected endpoints: %v, expected %v", endpoints.Endpoints, expectedEndpoints)) + By(fmt.Sprintf("Unexpected number of endpoints: found %v, expected %v (ignoring for 1 second)", endpoints.Endpoints, expectedEndpoints)) } } else { - By(fmt.Sprintf("Failed to get endpoints: %v (ignoring for 1s)", err)) + By(fmt.Sprintf("Failed to get endpoints: %v (ignoring for 1 second)", err)) } time.Sleep(time.Second) } + By(fmt.Sprintf("successfully validated endpoints %v port %d on service %s/%s", expectedEndpoints, expectedPort, ns, serviceName)) } func addEndpointPodOrFail(c *client.Client, ns, name string, labels map[string]string) { - By(fmt.Sprintf("Adding pod %v", name)) + By(fmt.Sprintf("Adding pod %v in namespace %v", name, ns)) pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: name,