diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 3f0c43d944..3ac43c0a9e 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -219,7 +219,10 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "kube-proxy", Host: hostname}) var proxier proxy.ProxyProvider - var servicesHandler proxyconfig.ServiceConfigHandler + var serviceEventHandler proxyconfig.ServiceHandler + // TODO: Migrate all handlers to ServiceHandler types and + // get rid of this one. + var serviceHandler proxyconfig.ServiceConfigHandler var endpointsEventHandler proxyconfig.EndpointsHandler proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{}) @@ -246,7 +249,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err glog.Fatalf("Unable to create proxier: %v", err) } proxier = proxierIPTables - servicesHandler = proxierIPTables + serviceEventHandler = proxierIPTables endpointsEventHandler = proxierIPTables // No turning back. Remove artifacts that might still exist from the userspace Proxier. glog.V(0).Info("Tearing down userspace rules.") @@ -271,7 +274,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err if err != nil { glog.Fatalf("Unable to create proxier: %v", err) } - servicesHandler = proxierUserspace + serviceHandler = proxierUserspace proxier = proxierUserspace } else { // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for @@ -292,7 +295,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err if err != nil { glog.Fatalf("Unable to create proxier: %v", err) } - servicesHandler = proxierUserspace + serviceHandler = proxierUserspace proxier = proxierUserspace } // Remove artifacts from the pure-iptables Proxier, if not on Windows. @@ -314,7 +317,12 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err // only notify on changes, and the initial update (on process start) may be lost if no handlers // are registered yet. serviceConfig := proxyconfig.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), config.ConfigSyncPeriod) - serviceConfig.RegisterHandler(servicesHandler) + if serviceHandler != nil { + serviceConfig.RegisterHandler(serviceHandler) + } + if serviceEventHandler != nil { + serviceConfig.RegisterEventHandler(serviceEventHandler) + } go serviceConfig.Run(wait.NeverStop) endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), config.ConfigSyncPeriod) diff --git a/pkg/proxy/config/BUILD b/pkg/proxy/config/BUILD index 3918c4de27..b68c78286d 100644 --- a/pkg/proxy/config/BUILD +++ b/pkg/proxy/config/BUILD @@ -20,7 +20,6 @@ go_library( "//pkg/client/informers/informers_generated/internalversion/core/internalversion:go_default_library", "//pkg/client/listers/core/internalversion:go_default_library", "//pkg/controller:go_default_library", - "//pkg/util/config:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 6d8d0a68f0..afc42817d6 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -18,7 +18,6 @@ package config import ( "reflect" - "sort" "sync" "testing" "time" @@ -51,40 +50,34 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - ch := make(chan struct{}) - handler := newSvcHandler(t, nil, func() { ch <- struct{}{} }) + handler := NewServiceHandlerMock() sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) serviceConfig := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute) - serviceConfig.RegisterHandler(handler) + serviceConfig.RegisterEventHandler(handler) go sharedInformers.Start(stopCh) go serviceConfig.Run(stopCh) // Add the first service - handler.expected = []*api.Service{service1v1} fakeWatch.Add(service1v1) - <-ch + handler.ValidateServices(t, []*api.Service{service1v1}) // Add another service - handler.expected = []*api.Service{service1v1, service2} fakeWatch.Add(service2) - <-ch + handler.ValidateServices(t, []*api.Service{service1v1, service2}) // Modify service1 - handler.expected = []*api.Service{service1v2, service2} fakeWatch.Modify(service1v2) - <-ch + handler.ValidateServices(t, []*api.Service{service1v2, service2}) // Delete service1 - handler.expected = []*api.Service{service2} fakeWatch.Delete(service1v2) - <-ch + handler.ValidateServices(t, []*api.Service{service2}) // Delete service2 - handler.expected = []*api.Service{} fakeWatch.Delete(service2) - <-ch + handler.ValidateServices(t, []*api.Service{}) } func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { @@ -155,22 +148,17 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { handler.ValidateEndpoints(t, []*api.Endpoints{}) } -type svcHandler struct { - t *testing.T - expected []*api.Service - done func() -} - -func newSvcHandler(t *testing.T, svcs []*api.Service, done func()) *svcHandler { - return &svcHandler{t: t, expected: svcs, done: done} -} - -func (s *svcHandler) OnServiceUpdate(services []*api.Service) { - defer s.done() - sort.Sort(sortedServices(services)) - if !reflect.DeepEqual(s.expected, services) { - s.t.Errorf("Unexpected services: %#v, expected: %#v", services, s.expected) +func newSvcHandler(t *testing.T, svcs []*api.Service, done func()) ServiceHandler { + shm := &ServiceHandlerMock{ + state: make(map[types.NamespacedName]*api.Service), } + shm.process = func(services []*api.Service) { + defer done() + if !reflect.DeepEqual(services, svcs) { + t.Errorf("Unexpected services: %#v, expected: %#v", services, svcs) + } + } + return shm } func newEpsHandler(t *testing.T, eps []*api.Endpoints, done func()) EndpointsHandler { @@ -213,7 +201,7 @@ func TestInitialSync(t *testing.T) { svcConfig := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), 0) epsConfig := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), 0) svcHandler := newSvcHandler(t, []*api.Service{svc2, svc1}, wg.Done) - svcConfig.RegisterHandler(svcHandler) + svcConfig.RegisterEventHandler(svcHandler) epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done) epsConfig.RegisterEventHandler(epsHandler) diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 89cc3c1b63..3e99d1f9fa 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -28,10 +28,10 @@ import ( coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/core/internalversion" listers "k8s.io/kubernetes/pkg/client/listers/core/internalversion" "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/util/config" ) // ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services. +// DEPRECATED: Use ServiceHandler instead - this will be removed soon. type ServiceConfigHandler interface { // OnServiceUpdate gets called when a service is created, removed or changed // on any of the configuration sources. An example is when a new service @@ -46,7 +46,24 @@ type ServiceConfigHandler interface { OnServiceUpdate(services []*api.Service) } -// EndpointsHandler is an abstract interface o objects which receive +// ServiceHandler is an abstract interface of objects which receive +// notifications about service object changes. +type ServiceHandler interface { + // OnServiceAdd is called whenever creation of new service object + // is observed. + OnServiceAdd(service *api.Service) + // OnServiceUpdate is called whenever modification of an existing + // service object is observed. + OnServiceUpdate(oldService, service *api.Service) + // OnServiceDelete is called whenever deletion of an existing service + // object is observed. + OnServiceDelete(service *api.Service) + // OnServiceSynced is called once all the initial even handlers were + // called and the state is fully propagated to local cache. + OnServiceSynced() +} + +// EndpointsHandler is an abstract interface of objects which receive // notifications about endpoints object changes. type EndpointsHandler interface { // OnEndpointsAdd is called whenever creation of new endpoints object @@ -157,7 +174,7 @@ func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) { } } for i := range c.eventHandlers { - glog.V(4).Infof("Calling handler.OnEndpointsUpdate") + glog.V(4).Infof("Calling handler.OnEndpointsDelete") c.eventHandlers[i].OnEndpointsDelete(endpoints) } } @@ -165,9 +182,11 @@ func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) { // ServiceConfig tracks a set of service configurations. // It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change. type ServiceConfig struct { - lister listers.ServiceLister - listerSynced cache.InformerSynced - handlers []ServiceConfigHandler + lister listers.ServiceLister + listerSynced cache.InformerSynced + eventHandlers []ServiceHandler + // TODO: Remove as soon as we migrate everything to event handlers. + handlers []ServiceConfigHandler // updates channel is used to trigger registered handlers updates chan struct{} stop chan struct{} @@ -199,10 +218,16 @@ func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPerio } // RegisterHandler registers a handler which is called on every services change. +// DEPRECATED: Use RegisterEventHandler instead - this will be removed soon. func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { c.handlers = append(c.handlers, handler) } +// RegisterEventHandler registers a handler which is called on every service change. +func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) { + c.eventHandlers = append(c.eventHandlers, handler) +} + // Run starts the goroutine responsible for calling // registered handlers. func (c *ServiceConfig) Run(stopCh <-chan struct{}) { @@ -217,6 +242,10 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) { // We have synced informers. Now we can start delivering updates // to the registered handler. + for i := range c.eventHandlers { + glog.V(3).Infof("Calling handler.OnServiceSynced()") + c.eventHandlers[i].OnServiceSynced() + } go func() { defer utilruntime.HandleCrash() for { @@ -241,24 +270,60 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) { } } }() + // Close updates channel when stopCh is closed. - go func() { - <-stopCh - close(c.stop) - }() - <-stopCh + close(c.stop) } -func (c *ServiceConfig) handleAddService(_ interface{}) { +func (c *ServiceConfig) handleAddService(obj interface{}) { + service, ok := obj.(*api.Service) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + for i := range c.eventHandlers { + glog.V(4).Infof("Calling handler.OnServiceAdd") + c.eventHandlers[i].OnServiceAdd(service) + } c.dispatchUpdate() } -func (c *ServiceConfig) handleUpdateService(_, _ interface{}) { +func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) { + oldService, ok := oldObj.(*api.Service) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj)) + return + } + service, ok := newObj.(*api.Service) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj)) + return + } + for i := range c.eventHandlers { + glog.V(4).Infof("Calling handler.OnServiceUpdate") + c.eventHandlers[i].OnServiceUpdate(oldService, service) + } c.dispatchUpdate() } -func (c *ServiceConfig) handleDeleteService(_ interface{}) { +func (c *ServiceConfig) handleDeleteService(obj interface{}) { + service, ok := obj.(*api.Service) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + if service, ok = tombstone.Obj.(*api.Service); !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + } + for i := range c.eventHandlers { + glog.V(4).Infof("Calling handler.OnServiceDelete") + c.eventHandlers[i].OnServiceDelete(service) + } c.dispatchUpdate() } @@ -272,12 +337,3 @@ func (c *ServiceConfig) dispatchUpdate() { glog.V(4).Infof("Service handler already has a pending interrupt.") } } - -// watchForUpdates invokes bcaster.Notify() with the latest version of an object -// when changes occur. -func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) { - for true { - <-updates - bcaster.Notify(accessor.MergedState()) - } -} diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index d4a9a370e0..c64072d234 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -46,16 +46,66 @@ func (s sortedServices) Less(i, j int) bool { } type ServiceHandlerMock struct { + lock sync.Mutex + + state map[types.NamespacedName]*api.Service + synced bool updated chan []*api.Service + process func([]*api.Service) } func NewServiceHandlerMock() *ServiceHandlerMock { - return &ServiceHandlerMock{updated: make(chan []*api.Service, 5)} + shm := &ServiceHandlerMock{ + state: make(map[types.NamespacedName]*api.Service), + updated: make(chan []*api.Service, 5), + } + shm.process = func(services []*api.Service) { + shm.updated <- services + } + return shm } -func (h *ServiceHandlerMock) OnServiceUpdate(services []*api.Service) { +func (h *ServiceHandlerMock) OnServiceAdd(service *api.Service) { + h.lock.Lock() + defer h.lock.Unlock() + namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + h.state[namespacedName] = service + h.sendServices() +} + +func (h *ServiceHandlerMock) OnServiceUpdate(oldService, service *api.Service) { + h.lock.Lock() + defer h.lock.Unlock() + namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + h.state[namespacedName] = service + h.sendServices() +} + +func (h *ServiceHandlerMock) OnServiceDelete(service *api.Service) { + h.lock.Lock() + defer h.lock.Unlock() + namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + delete(h.state, namespacedName) + h.sendServices() +} + +func (h *ServiceHandlerMock) OnServiceSynced() { + h.lock.Lock() + defer h.lock.Unlock() + h.synced = true + h.sendServices() +} + +func (h *ServiceHandlerMock) sendServices() { + if !h.synced { + return + } + services := make([]*api.Service, 0, len(h.state)) + for _, svc := range h.state { + services = append(services, svc) + } sort.Sort(sortedServices(services)) - h.updated <- services + h.process(services) } func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []*api.Service) { @@ -185,7 +235,7 @@ func TestNewServiceAddedAndNotified(t *testing.T) { config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute) handler := NewServiceHandlerMock() - config.RegisterHandler(handler) + config.RegisterEventHandler(handler) go sharedInformers.Start(stopCh) go config.Run(stopCh) @@ -209,7 +259,7 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) { config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute) handler := NewServiceHandlerMock() - config.RegisterHandler(handler) + config.RegisterEventHandler(handler) go sharedInformers.Start(stopCh) go config.Run(stopCh) @@ -246,8 +296,8 @@ func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) { config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute) handler := NewServiceHandlerMock() handler2 := NewServiceHandlerMock() - config.RegisterHandler(handler) - config.RegisterHandler(handler2) + config.RegisterEventHandler(handler) + config.RegisterEventHandler(handler2) go sharedInformers.Start(stopCh) go config.Run(stopCh) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 9c6cf16acb..bd547b5aa6 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -196,6 +196,7 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se } type endpointsMap map[types.NamespacedName]*api.Endpoints +type serviceMap map[types.NamespacedName]*api.Service type proxyServiceMap map[proxy.ServicePortName]*serviceInfo type proxyEndpointMap map[proxy.ServicePortName][]*endpointsInfo @@ -211,13 +212,13 @@ type Proxier struct { // to not be modified in the meantime, but also require to be not modified // by Proxier. allEndpoints endpointsMap - // allServices is nil until we have seen an OnServiceUpdate event. - allServices []*api.Service + allServices serviceMap - // endpointsSynced is set to true when endpoints are synced after startup. - // This is used to avoid updating iptables with some partial data after - // kube-proxy restart. + // endpointsSynced and servicesSynced are set to true when corresponding + // objects are synced after startup. This is used to avoid updating iptables + // with some partial data after kube-proxy restart. endpointsSynced bool + servicesSynced bool throttle flowcontrol.RateLimiter @@ -333,6 +334,7 @@ func NewProxier(ipt utiliptables.Interface, endpointsMap: make(proxyEndpointMap), portsMap: make(map[localPort]closeable), allEndpoints: make(endpointsMap), + allServices: make(serviceMap), syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, throttle: throttle, @@ -457,7 +459,7 @@ func (proxier *Proxier) SyncLoop() { // Accepts a list of Services and the existing service map. Returns the new // service map, a map of healthcheck ports, and a set of stale UDP // services. -func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, map[types.NamespacedName]uint16, sets.String) { +func buildNewServiceMap(allServices serviceMap, oldServiceMap proxyServiceMap) (proxyServiceMap, map[types.NamespacedName]uint16, sets.String) { newServiceMap := make(proxyServiceMap) hcPorts := make(map[types.NamespacedName]uint16) @@ -525,15 +527,37 @@ func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMa return newServiceMap, hcPorts, staleUDPServices } -// OnServiceUpdate tracks the active set of service proxies. -// They will be synchronized using syncProxyRules() -func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) { +func (proxier *Proxier) OnServiceAdd(service *api.Service) { + namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + proxier.mu.Lock() defer proxier.mu.Unlock() - if proxier.allServices == nil { - glog.V(2).Info("Received first Services update") - } - proxier.allServices = allServices + proxier.allServices[namespacedName] = service + proxier.syncProxyRules(syncReasonServices) +} + +func (proxier *Proxier) OnServiceUpdate(_, service *api.Service) { + namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + + proxier.mu.Lock() + defer proxier.mu.Unlock() + proxier.allServices[namespacedName] = service + proxier.syncProxyRules(syncReasonServices) +} + +func (proxier *Proxier) OnServiceDelete(service *api.Service) { + namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + + proxier.mu.Lock() + defer proxier.mu.Unlock() + delete(proxier.allServices, namespacedName) + proxier.syncProxyRules(syncReasonServices) +} + +func (proxier *Proxier) OnServiceSynced() { + proxier.mu.Lock() + defer proxier.mu.Unlock() + proxier.servicesSynced = true proxier.syncProxyRules(syncReasonServices) } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 4a0d157598..72f7c4a6ab 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -388,7 +388,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { iptables: ipt, clusterCIDR: "10.0.0.0/24", allEndpoints: make(endpointsMap), - allServices: []*api.Service{}, + allServices: make(serviceMap), endpointsSynced: true, hostname: testHostname, portsMap: make(map[localPort]closeable), @@ -567,7 +567,7 @@ func TestClusterIPReject(t *testing.T) { Port: "p80", } - fp.allServices = []*api.Service{ + fp.allServices = makeServiceMap( makeTestService(svcPortName.Namespace, svcPortName.Namespace, func(svc *api.Service) { svc.Spec.ClusterIP = svcIP svc.Spec.Ports = []api.ServicePort{{ @@ -576,7 +576,7 @@ func TestClusterIPReject(t *testing.T) { Protocol: api.ProtocolTCP, }} }), - } + ) fp.syncProxyRules(syncReasonForce) svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP)))) @@ -600,7 +600,7 @@ func TestClusterIPEndpointsJump(t *testing.T) { Port: "p80", } - fp.allServices = []*api.Service{ + fp.allServices = makeServiceMap( makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) { svc.Spec.ClusterIP = svcIP svc.Spec.Ports = []api.ServicePort{{ @@ -609,7 +609,7 @@ func TestClusterIPEndpointsJump(t *testing.T) { Protocol: api.ProtocolTCP, }} }), - } + ) epIP := "10.180.0.1" fp.allEndpoints = makeEndpointsMap( @@ -659,7 +659,7 @@ func TestLoadBalancer(t *testing.T) { Port: "p80", } - fp.allServices = []*api.Service{ + fp.allServices = makeServiceMap( makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) { svc.Spec.Type = "LoadBalancer" svc.Spec.ClusterIP = svcIP @@ -673,7 +673,7 @@ func TestLoadBalancer(t *testing.T) { IP: svcLBIP, }} }), - } + ) epIP := "10.180.0.1" fp.allEndpoints = makeEndpointsMap( @@ -719,7 +719,7 @@ func TestNodePort(t *testing.T) { Port: "p80", } - fp.allServices = []*api.Service{ + fp.allServices = makeServiceMap( makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) { svc.Spec.Type = "NodePort" svc.Spec.ClusterIP = svcIP @@ -730,7 +730,7 @@ func TestNodePort(t *testing.T) { NodePort: int32(svcNodePort), }} }), - } + ) epIP := "10.180.0.1" fp.allEndpoints = makeEndpointsMap( @@ -769,7 +769,7 @@ func TestNodePortReject(t *testing.T) { Port: "p80", } - fp.allServices = []*api.Service{ + fp.allServices = makeServiceMap( makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) { svc.Spec.Type = "NodePort" svc.Spec.ClusterIP = svcIP @@ -780,7 +780,7 @@ func TestNodePortReject(t *testing.T) { NodePort: int32(svcNodePort), }} }), - } + ) fp.syncProxyRules(syncReasonForce) @@ -806,7 +806,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { Port: "p80", } - fp.allServices = []*api.Service{ + fp.allServices = makeServiceMap( makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) { svc.Spec.Type = "LoadBalancer" svc.Spec.ClusterIP = svcIP @@ -821,7 +821,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { }} svc.Annotations[service.BetaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal }), - } + ) epIP1 := "10.180.0.1" epIP2 := "10.180.2.1" @@ -900,7 +900,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable Port: "p80", } - fp.allServices = []*api.Service{ + fp.allServices = makeServiceMap( makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) { svc.Spec.Type = "NodePort" svc.Spec.ClusterIP = svcIP @@ -912,7 +912,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable }} svc.Annotations[service.BetaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal }), - } + ) epIP1 := "10.180.0.1" epIP2 := "10.180.2.1" @@ -992,7 +992,7 @@ func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, po } func TestBuildServiceMapAddRemove(t *testing.T) { - services := []*api.Service{ + services := makeServiceMap( makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) { svc.Spec.Type = api.ServiceTypeClusterIP svc.Spec.ClusterIP = "172.16.55.4" @@ -1033,7 +1033,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) { }, } }), - } + ) serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) if len(serviceMap) != 8 { @@ -1056,8 +1056,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) { } // Remove some stuff - services = []*api.Service{services[0]} - services[0].Spec.Ports = []api.ServicePort{services[0].Spec.Ports[1]} + oneService := services[makeNSN("somewhere-else", "cluster-ip")] + oneService.Spec.Ports = []api.ServicePort{oneService.Spec.Ports[1]} + services = makeServiceMap(oneService) serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(services, serviceMap) if len(serviceMap) != 1 { t.Errorf("expected service map length 1, got %v", serviceMap) @@ -1082,13 +1083,13 @@ func TestBuildServiceMapAddRemove(t *testing.T) { } func TestBuildServiceMapServiceHeadless(t *testing.T) { - services := []*api.Service{ + services := makeServiceMap( makeTestService("somewhere-else", "headless", func(svc *api.Service) { svc.Spec.Type = api.ServiceTypeClusterIP svc.Spec.ClusterIP = api.ClusterIPNone svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0) }), - } + ) // Headless service should be ignored serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) @@ -1107,14 +1108,14 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { } func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { - services := []*api.Service{ + services := makeServiceMap( makeTestService("somewhere-else", "external-name", func(svc *api.Service) { svc.Spec.Type = api.ServiceTypeExternalName svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored svc.Spec.ExternalName = "foo2.bar.com" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0) }), - } + ) serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) if len(serviceMap) != 0 { @@ -1130,16 +1131,16 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { } func TestBuildServiceMapServiceUpdate(t *testing.T) { - first := []*api.Service{ + first := makeServiceMap( makeTestService("somewhere", "some-service", func(svc *api.Service) { svc.Spec.Type = api.ServiceTypeClusterIP svc.Spec.ClusterIP = "172.16.55.4" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0) }), - } + ) - second := []*api.Service{ + second := makeServiceMap( makeTestService("somewhere", "some-service", func(svc *api.Service) { svc.ObjectMeta.Annotations = map[string]string{ service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal, @@ -1156,7 +1157,7 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { }, } }), - } + ) serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(first, make(proxyServiceMap)) if len(serviceMap) != 2 { @@ -1426,6 +1427,15 @@ func makeServicePortName(ns, name, port string) proxy.ServicePortName { } } +func makeServiceMap(allServices ...*api.Service) serviceMap { + result := make(serviceMap) + for _, service := range allServices { + namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + result[namespacedName] = service + } + return result +} + func Test_buildNewEndpointsMap(t *testing.T) { var nodeName = "host"