From c7353432df21f42193908dd129d4c04f4099d32c Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Tue, 18 Apr 2017 14:51:14 +0200 Subject: [PATCH] Don't rebuild service map in iptables kube-proxy all the time --- pkg/proxy/iptables/BUILD | 1 - pkg/proxy/iptables/proxier.go | 252 ++++++++++++++++++----------- pkg/proxy/iptables/proxier_test.go | 182 ++++++++++++--------- 3 files changed, 269 insertions(+), 166 deletions(-) diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 0a3de1effa..4a2d71e594 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -24,7 +24,6 @@ go_library( "//pkg/util/iptables:go_default_library", "//pkg/util/sysctl:go_default_library", "//pkg/util/version:go_default_library", - "//vendor/github.com/davecgh/go-spew/spew:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 74c155b400..8d133e3300 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -32,7 +32,6 @@ import ( "sync" "time" - "github.com/davecgh/go-spew/spew" "github.com/golang/glog" "k8s.io/apimachinery/pkg/types" @@ -199,8 +198,14 @@ type endpointsChange struct { previous *api.Endpoints current *api.Endpoints } + +type serviceChange struct { + previous *api.Service + current *api.Service +} + type endpointsChangeMap map[types.NamespacedName]*endpointsChange -type serviceMap map[types.NamespacedName]*api.Service +type serviceChangeMap map[types.NamespacedName]*serviceChange type proxyServiceMap map[proxy.ServicePortName]*serviceInfo type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo @@ -219,20 +224,23 @@ func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) { // Proxier is an iptables based proxy for connections between a localhost:lport // and services that provide the actual backends. type Proxier struct { - mu sync.Mutex // protects the following fields - serviceMap proxyServiceMap + mu sync.Mutex // protects the following fields + + serviceMap proxyServiceMap + // serviceChanges contains all changes to services that happened since + // last syncProxyRules call. For a single object, changes are accumulated, + // i.e. previous is state from before all of them, current is state after + // applying all of those. + serviceChanges serviceChangeMap + endpointsMap proxyEndpointsMap // endpointsChanges contains all changes to endpoints that happened since // last syncProxyRules call. For a single object, changes are accumulated, // i.e. previous is state from before all of them, current is state after // applying all of those. endpointsChanges endpointsChangeMap - portsMap map[localPort]closeable - // allServices should never be modified by proxier - the - // pointers are shared with higher layers of kube-proxy. They are guaranteed - // to not be modified in the meantime, but also require to be not modified - // by Proxier. - allServices serviceMap + + portsMap map[localPort]closeable // endpointsSynced and servicesSynced are set to true when corresponding // objects are synced after startup. 
This is used to avoid updating iptables @@ -350,11 +358,11 @@ func NewProxier(ipt utiliptables.Interface, } return &Proxier{ + portsMap: make(map[localPort]closeable), serviceMap: make(proxyServiceMap), + serviceChanges: make(serviceChangeMap), endpointsMap: make(proxyEndpointsMap), endpointsChanges: make(endpointsChangeMap), - portsMap: make(map[localPort]closeable), - allServices: make(serviceMap), syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, throttle: throttle, @@ -476,92 +484,37 @@ func (proxier *Proxier) SyncLoop() { } } -// Accepts a list of Services and the existing service map. Returns the new -// service map, a map of healthcheck ports, and a set of stale UDP -// services. -func buildNewServiceMap(allServices serviceMap, oldServiceMap proxyServiceMap) (proxyServiceMap, map[types.NamespacedName]uint16, sets.String) { - newServiceMap := make(proxyServiceMap) - hcPorts := make(map[types.NamespacedName]uint16) - - for _, service := range allServices { - svcName := types.NamespacedName{ - Namespace: service.Namespace, - Name: service.Name, - } - - // if ClusterIP is "None" or empty, skip proxying - if !helper.IsServiceIPSet(service) { - glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP) - continue - } - // Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied - if service.Spec.Type == api.ServiceTypeExternalName { - glog.V(3).Infof("Skipping service %s due to Type=ExternalName", svcName) - continue - } - - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - - serviceName := proxy.ServicePortName{ - NamespacedName: svcName, - Port: servicePort.Name, - } - - info := newServiceInfo(serviceName, servicePort, service) - oldInfo, exists := oldServiceMap[serviceName] - equal := reflect.DeepEqual(info, oldInfo) - if !exists { - glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol) - } else if !equal { - glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol) - } - - if info.onlyNodeLocalEndpoints { - hcPorts[svcName] = uint16(info.healthCheckNodePort) - } - - newServiceMap[serviceName] = info - glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info)) - } - } - - for nsn, port := range hcPorts { - if port == 0 { - glog.Errorf("Service %q has no healthcheck nodeport", nsn) - delete(hcPorts, nsn) - } - } - - staleUDPServices := sets.NewString() - // Remove serviceports missing from the update. 
-	for name, info := range oldServiceMap {
-		if _, exists := newServiceMap[name]; !exists {
-			glog.V(1).Infof("Removing service %q", name)
-			if info.protocol == api.ProtocolUDP {
-				staleUDPServices.Insert(info.clusterIP.String())
-			}
-		}
-	}
-
-	return newServiceMap, hcPorts, staleUDPServices
-}
-
 func (proxier *Proxier) OnServiceAdd(service *api.Service) {
 	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 
 	proxier.mu.Lock()
 	defer proxier.mu.Unlock()
-	proxier.allServices[namespacedName] = service
+
+	change, exists := proxier.serviceChanges[namespacedName]
+	if !exists {
+		change = &serviceChange{}
+		change.previous = nil
+		proxier.serviceChanges[namespacedName] = change
+	}
+	change.current = service
+
 	proxier.syncProxyRules(syncReasonServices)
 }
 
-func (proxier *Proxier) OnServiceUpdate(_, service *api.Service) {
+func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
 	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 
 	proxier.mu.Lock()
 	defer proxier.mu.Unlock()
-	proxier.allServices[namespacedName] = service
+
+	change, exists := proxier.serviceChanges[namespacedName]
+	if !exists {
+		change = &serviceChange{}
+		change.previous = oldService
+		proxier.serviceChanges[namespacedName] = change
+	}
+	change.current = service
+
 	proxier.syncProxyRules(syncReasonServices)
 }
 
@@ -570,7 +523,15 @@ func (proxier *Proxier) OnServiceDelete(service *api.Service) {
 
 	proxier.mu.Lock()
 	defer proxier.mu.Unlock()
-	delete(proxier.allServices, namespacedName)
+
+	change, exists := proxier.serviceChanges[namespacedName]
+	if !exists {
+		change = &serviceChange{}
+		change.previous = service
+		proxier.serviceChanges[namespacedName] = change
+	}
+	change.current = nil
+
 	proxier.syncProxyRules(syncReasonServices)
 }
 
@@ -581,6 +542,114 @@ func (proxier *Proxier) OnServiceSynced() {
 	proxier.syncProxyRules(syncReasonServices)
 }
 
+func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool {
+	// if ClusterIP is "None" or empty, skip proxying
+	if !helper.IsServiceIPSet(service) {
+		glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
+		return true
+	}
+	// Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied
+	if service.Spec.Type == api.ServiceTypeExternalName {
+		glog.V(3).Infof("Skipping service %s due to Type=ExternalName", svcName)
+		return true
+	}
+	return false
+}
+
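+// mergeService updates the service map with the ports of the given service
+// and reports whether anything changed (i.e. whether an iptables resync is
+// needed). The returned set holds the names of the ports seen, which
+// unmergeService below uses to detect ports removed by an update.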
+func (sm *proxyServiceMap) mergeService(service *api.Service) (bool, sets.String) {
+	if service == nil {
+		return false, nil
+	}
+	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
+	if shouldSkipService(svcName, service) {
+		return false, nil
+	}
+	syncRequired := false
+	existingPorts := sets.NewString()
+	for i := range service.Spec.Ports {
+		servicePort := &service.Spec.Ports[i]
+		serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
+		existingPorts.Insert(servicePort.Name)
+		info := newServiceInfo(serviceName, servicePort, service)
+		oldInfo, exists := (*sm)[serviceName]
+		equal := reflect.DeepEqual(info, oldInfo)
+		if !exists {
+			glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
+		} else if !equal {
+			glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
+		}
+		if !equal {
+			(*sm)[serviceName] = info
+			syncRequired = true
+		}
+	}
+	return syncRequired, existingPorts
+}
+
+// <staleServices> is modified by this function with detected stale services.
+func (sm *proxyServiceMap) unmergeService(service *api.Service, existingPorts, staleServices sets.String) bool {
+	if service == nil {
+		return false
+	}
+	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
+	if shouldSkipService(svcName, service) {
+		return false
+	}
+	syncRequired := false
+	for i := range service.Spec.Ports {
+		servicePort := &service.Spec.Ports[i]
+		if existingPorts.Has(servicePort.Name) {
+			continue
+		}
+		serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
+		info, exists := (*sm)[serviceName]
+		if exists {
+			glog.V(1).Infof("Removing service %q", serviceName)
+			if info.protocol == api.ProtocolUDP {
+				staleServices.Insert(info.clusterIP.String())
+			}
+			delete(*sm, serviceName)
+			syncRequired = true
+		} else {
+			glog.Errorf("Service %q removed, but doesn't exist", serviceName)
+		}
+	}
+	return syncRequired
+}
+
+// <serviceMap> is updated by this function (based on the given changes).
+// The <changes> map is cleared after applying them.
+func updateServiceMap(
+	serviceMap proxyServiceMap,
+	changes *serviceChangeMap) (syncRequired bool, hcServices map[types.NamespacedName]uint16, staleServices sets.String) {
+	syncRequired = false
+	staleServices = sets.NewString()
+
+	for _, change := range *changes {
+		mergeSyncRequired, existingPorts := serviceMap.mergeService(change.current)
+		unmergeSyncRequired := serviceMap.unmergeService(change.previous, existingPorts, staleServices)
+		syncRequired = syncRequired || mergeSyncRequired || unmergeSyncRequired
+	}
+	*changes = make(serviceChangeMap)
+
+	// TODO: If this turns out to be computationally expensive, consider
+	// computing it incrementally, similarly to serviceMap.
+	hcServices = make(map[types.NamespacedName]uint16)
+	for svcPort, info := range serviceMap {
+		if info.onlyNodeLocalEndpoints {
+			hcServices[svcPort.NamespacedName] = uint16(info.healthCheckNodePort)
+		}
+	}
+	for nsn, port := range hcServices {
+		if port == 0 {
+			glog.Errorf("Service %q has no healthcheck nodeport", nsn)
+			delete(hcServices, nsn)
+		}
+	}
+
+	return syncRequired, hcServices, staleServices
+}
+
 func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
 	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
 
@@ -849,10 +918,11 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 	}
 
 	// Figure out the new services we need to activate.
-	newServices, hcServices, staleServices := buildNewServiceMap(proxier.allServices, proxier.serviceMap)
+	serviceSyncRequired, hcServices, staleServices := updateServiceMap(
+		proxier.serviceMap, &proxier.serviceChanges)
 
 	// If this was called because of a services update, but nothing actionable has changed, skip it.
-	if reason == syncReasonServices && reflect.DeepEqual(newServices, proxier.serviceMap) {
+	if reason == syncReasonServices && !serviceSyncRequired {
 		glog.V(3).Infof("Skipping iptables sync because nothing changed")
 		return
 	}
@@ -996,7 +1066,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 	replacementPortsMap := map[localPort]closeable{}
 
 	// Build rules for each service.
-	for svcName, svcInfo := range newServices {
+	for svcName, svcInfo := range proxier.serviceMap {
 		protocol := strings.ToLower(string(svcInfo.protocol))
 		// Precompute svcNameString; with many services the many calls
 		// to ServicePortName.String() show up in CPU profiles.
@@ -1437,8 +1507,6 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 	}
 
 	// Finish housekeeping.
-	proxier.serviceMap = newServices
-
 	// TODO: these and clearUDPConntrackForPort() could be made more consistent.
 	utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List())
 	proxier.deleteEndpointConnections(staleEndpoints)
diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go
index a419f1c6d6..5d15de681e 100644
--- a/pkg/proxy/iptables/proxier_test.go
+++ b/pkg/proxy/iptables/proxier_test.go
@@ -386,12 +386,11 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
 	return &Proxier{
 		exec:             &exec.FakeExec{},
 		serviceMap:       make(proxyServiceMap),
+		serviceChanges:   make(serviceChangeMap),
 		endpointsMap:     make(proxyEndpointsMap),
 		endpointsChanges: make(endpointsChangeMap),
 		iptables:         ipt,
 		clusterCIDR:      "10.0.0.0/24",
-		allServices:      make(serviceMap),
-		servicesSynced:   true,
 		hostname:         testHostname,
 		portsMap:         make(map[localPort]closeable),
 		portMapper:       &fakePortOpener{[]*localPort{}},
@@ -569,7 +568,7 @@ func TestClusterIPReject(t *testing.T) {
 		Port:           "p80",
 	}
 
-	fp.allServices = makeServiceMap(
+	makeServiceMap(fp,
 		makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
 			svc.Spec.ClusterIP = svcIP
 			svc.Spec.Ports = []api.ServicePort{{
@@ -603,7 +602,7 @@ func TestClusterIPEndpointsJump(t *testing.T) {
 		Port:           "p80",
 	}
 
-	fp.allServices = makeServiceMap(
+	makeServiceMap(fp,
 		makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
 			svc.Spec.ClusterIP = svcIP
 			svc.Spec.Ports = []api.ServicePort{{
@@ -662,7 +661,7 @@ func TestLoadBalancer(t *testing.T) {
 		Port:           "p80",
 	}
 
-	fp.allServices = makeServiceMap(
+	makeServiceMap(fp,
 		makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
 			svc.Spec.Type = "LoadBalancer"
 			svc.Spec.ClusterIP = svcIP
@@ -722,7 +721,7 @@ func TestNodePort(t *testing.T) {
 		Port:           "p80",
 	}
 
-	fp.allServices = makeServiceMap(
+	makeServiceMap(fp,
 		makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
 			svc.Spec.Type = "NodePort"
 			svc.Spec.ClusterIP = svcIP
@@ -772,7 +771,7 @@ func TestNodePortReject(t *testing.T) {
 		Port:           "p80",
 	}
 
-	fp.allServices = makeServiceMap(
+	makeServiceMap(fp,
 		makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
 			svc.Spec.Type = "NodePort"
 			svc.Spec.ClusterIP = svcIP
@@ -810,7 +809,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
 		Port:           "p80",
 	}
 
-	fp.allServices = makeServiceMap(
+	makeServiceMap(fp,
 		makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
 			svc.Spec.Type = "LoadBalancer"
 			svc.Spec.ClusterIP = svcIP
@@ -904,7 +903,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
 		Port:           "p80",
 	}
 
-	fp.allServices = makeServiceMap(
+	makeServiceMap(fp,
 		makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
 			svc.Spec.Type = "NodePort"
 			svc.Spec.ClusterIP = svcIP
@@ -996,7 +995,10 @@ func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, po
 }
 
 func TestBuildServiceMapAddRemove(t *testing.T) {
-	services := makeServiceMap(
+	ipt := iptablestest.NewFake()
+	fp := NewFakeProxier(ipt)
+
+	services := []*api.Service{
 		makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) {
 			svc.Spec.Type = api.ServiceTypeClusterIP
 			svc.Spec.ClusterIP = "172.16.55.4"
@@ -1037,11 +1039,14 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
 			},
 			}
 		}),
-	)
+	}
 
-	serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap))
-	if len(serviceMap) != 8 {
-		t.Errorf("expected service map length 8, got %v", serviceMap)
+	for i := range services {
+		fp.OnServiceAdd(services[i])
+	}
+	_, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
+	if len(fp.serviceMap) != 8 {
+		t.Errorf("expected service map length 8, got %v", fp.serviceMap)
 	}
 
 	// The only-local-loadbalancer ones get added
@@ -1060,16 +1065,25 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
 	}
 
 	// Remove some stuff
-	oneService := services[makeNSN("somewhere-else", "cluster-ip")]
-	oneService.Spec.Ports = []api.ServicePort{oneService.Spec.Ports[1]}
-	services = makeServiceMap(oneService)
-	serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(services, serviceMap)
-	if len(serviceMap) != 1 {
-		t.Errorf("expected service map length 1, got %v", serviceMap)
+	// oneService is a modification of services[0] with its first port removed.
+	oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) {
+		svc.Spec.Type = api.ServiceTypeClusterIP
+		svc.Spec.ClusterIP = "172.16.55.4"
+		svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0)
+	})
+
+	fp.OnServiceUpdate(services[0], oneService)
+	fp.OnServiceDelete(services[1])
+	fp.OnServiceDelete(services[2])
+	fp.OnServiceDelete(services[3])
+
+	_, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
+	if len(fp.serviceMap) != 1 {
+		t.Errorf("expected service map length 1, got %v", fp.serviceMap)
 	}
 
 	if len(hcPorts) != 0 {
-		t.Errorf("expected healthcheck ports length 1, got %v", hcPorts)
+		t.Errorf("expected 0 healthcheck ports, got %v", hcPorts)
 	}
 
 	// All services but one were deleted. While you'd expect only the ClusterIPs
@@ -1087,7 +1101,10 @@
 }
 
 func TestBuildServiceMapServiceHeadless(t *testing.T) {
-	services := makeServiceMap(
+	ipt := iptablestest.NewFake()
+	fp := NewFakeProxier(ipt)
+
+	makeServiceMap(fp,
 		makeTestService("somewhere-else", "headless", func(svc *api.Service) {
 			svc.Spec.Type = api.ServiceTypeClusterIP
 			svc.Spec.ClusterIP = api.ClusterIPNone
 		}),
 	)
 
 	// Headless service should be ignored
-	serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap))
-	if len(serviceMap) != 0 {
-		t.Errorf("expected service map length 0, got %d", len(serviceMap))
+	_, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
+	if len(fp.serviceMap) != 0 {
+		t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
 	}
 
 	// No proxied services, so no healthchecks
@@ -1112,7 +1129,10 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
 }
 
 func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
-	services := makeServiceMap(
+	ipt := iptablestest.NewFake()
+	fp := NewFakeProxier(ipt)
+
+	makeServiceMap(fp,
 		makeTestService("somewhere-else", "external-name", func(svc *api.Service) {
 			svc.Spec.Type = api.ServiceTypeExternalName
 			svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored
 			svc.Spec.ExternalName = "foo2.bar.com"
 			svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0)
 		}),
 	)
 
-	serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap))
-	if len(serviceMap) != 0 {
-		t.Errorf("expected service map length 0, got %v", serviceMap)
+	_, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
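+	// An ExternalName service must never be proxied, so it should not show
+	// up in the service map even though its ClusterIP is set.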
+	if len(fp.serviceMap) != 0 {
+		t.Errorf("expected service map length 0, got %v", fp.serviceMap)
 	}
 
 	// No proxied services, so no healthchecks
 	if len(hcPorts) != 0 {
@@ -1135,37 +1155,40 @@
 }
 
 func TestBuildServiceMapServiceUpdate(t *testing.T) {
-	first := makeServiceMap(
-		makeTestService("somewhere", "some-service", func(svc *api.Service) {
-			svc.Spec.Type = api.ServiceTypeClusterIP
-			svc.Spec.ClusterIP = "172.16.55.4"
-			svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
-			svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0)
-		}),
-	)
+	ipt := iptablestest.NewFake()
+	fp := NewFakeProxier(ipt)
 
-	second := makeServiceMap(
-		makeTestService("somewhere", "some-service", func(svc *api.Service) {
-			svc.ObjectMeta.Annotations = map[string]string{
-				service.BetaAnnotationExternalTraffic:     service.AnnotationValueExternalTrafficLocal,
-				service.BetaAnnotationHealthCheckNodePort: "345",
-			}
-			svc.Spec.Type = api.ServiceTypeLoadBalancer
-			svc.Spec.ClusterIP = "172.16.55.4"
-			svc.Spec.LoadBalancerIP = "5.6.7.8"
-			svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 7002)
-			svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 7003)
-			svc.Status.LoadBalancer = api.LoadBalancerStatus{
-				Ingress: []api.LoadBalancerIngress{
-					{IP: "10.1.2.3"},
-				},
-			}
-		}),
-	)
+	servicev1 := makeTestService("somewhere", "some-service", func(svc *api.Service) {
+		svc.Spec.Type = api.ServiceTypeClusterIP
+		svc.Spec.ClusterIP = "172.16.55.4"
+		svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
+		svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0)
+	})
+	servicev2 := makeTestService("somewhere", "some-service", func(svc *api.Service) {
+		svc.ObjectMeta.Annotations = map[string]string{
+			service.BetaAnnotationExternalTraffic:     service.AnnotationValueExternalTrafficLocal,
+			service.BetaAnnotationHealthCheckNodePort: "345",
+		}
+		svc.Spec.Type = api.ServiceTypeLoadBalancer
+		svc.Spec.ClusterIP = "172.16.55.4"
+		svc.Spec.LoadBalancerIP = "5.6.7.8"
+		svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 7002)
+		svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 7003)
+		svc.Status.LoadBalancer = api.LoadBalancerStatus{
+			Ingress: []api.LoadBalancerIngress{
+				{IP: "10.1.2.3"},
+			},
+		}
+	})
 
-	serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(first, make(proxyServiceMap))
-	if len(serviceMap) != 2 {
-		t.Errorf("expected service map length 2, got %v", serviceMap)
+	fp.OnServiceAdd(servicev1)
+
+	syncRequired, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
+	if !syncRequired {
+		t.Errorf("expected sync required, got %t", syncRequired)
+	}
+	if len(fp.serviceMap) != 2 {
+		t.Errorf("expected service map length 2, got %v", fp.serviceMap)
 	}
 	if len(hcPorts) != 0 {
 		t.Errorf("expected healthcheck ports length 0, got %v", hcPorts)
@@ -1176,9 +1199,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
 	}
 
 	// Change service to load-balancer
-	serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(second, serviceMap)
-	if len(serviceMap) != 2 {
-		t.Errorf("expected service map length 2, got %v", serviceMap)
+	fp.OnServiceUpdate(servicev1, servicev2)
+	syncRequired, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
+	if !syncRequired {
+		t.Errorf("expected sync required, got %t", syncRequired)
+	}
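+	// The update changed the type, annotations, and node ports, but both
+	// service ports are still present, so the map size must stay at 2.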
+	if len(fp.serviceMap) != 2 {
+		t.Errorf("expected service map length 2, got %v", fp.serviceMap)
 	}
 	if len(hcPorts) != 1 {
 		t.Errorf("expected healthcheck ports length 1, got %v", hcPorts)
@@ -1189,9 +1216,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
 
 	// No change; make sure the service map stays the same and there are
 	// no health-check changes
-	serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(second, serviceMap)
-	if len(serviceMap) != 2 {
-		t.Errorf("expected service map length 2, got %v", serviceMap)
+	fp.OnServiceUpdate(servicev2, servicev2)
+	syncRequired, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
+	if syncRequired {
+		t.Errorf("expected no sync required, got %t", syncRequired)
+	}
+	if len(fp.serviceMap) != 2 {
+		t.Errorf("expected service map length 2, got %v", fp.serviceMap)
 	}
 	if len(hcPorts) != 1 {
 		t.Errorf("expected healthcheck ports length 1, got %v", hcPorts)
@@ -1201,9 +1232,13 @@
 	}
 
 	// And back to ClusterIP
-	serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(first, serviceMap)
-	if len(serviceMap) != 2 {
-		t.Errorf("expected service map length 2, got %v", serviceMap)
+	fp.OnServiceUpdate(servicev2, servicev1)
+	syncRequired, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
+	if !syncRequired {
+		t.Errorf("expected sync required, got %t", syncRequired)
+	}
+	if len(fp.serviceMap) != 2 {
+		t.Errorf("expected service map length 2, got %v", fp.serviceMap)
 	}
 	if len(hcPorts) != 0 {
 		t.Errorf("expected healthcheck ports length 0, got %v", hcPorts)
@@ -1509,13 +1544,14 @@ func makeServicePortName(ns, name, port string) proxy.ServicePortName {
 	}
 }
 
-func makeServiceMap(allServices ...*api.Service) serviceMap {
-	result := make(serviceMap)
-	for _, service := range allServices {
-		namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-		result[namespacedName] = service
+func makeServiceMap(proxier *Proxier, allServices ...*api.Service) {
+	for i := range allServices {
+		proxier.OnServiceAdd(allServices[i])
 	}
-	return result
+
+	proxier.mu.Lock()
+	defer proxier.mu.Unlock()
+	proxier.servicesSynced = true
 }
 
 func compareEndpointsMaps(t *testing.T, tci int, newMap, expected map[proxy.ServicePortName][]*endpointsInfo) {
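
The heart of this change is an accumulate-then-apply scheme: the event
handlers (OnServiceAdd/OnServiceUpdate/OnServiceDelete) only buffer a
(previous, current) pair per service in serviceChangeMap, and updateServiceMap
later folds the buffered deltas into serviceMap and clears the buffer, instead
of rebuilding the whole map from allServices on every sync. Below is a
minimal, self-contained sketch of that scheme. It is not kube-proxy code; the
names (Service, changeMap, record, apply) are invented for illustration.

package main

import "fmt"

// Service stands in for api.Service; only the fields the sketch needs.
type Service struct {
	Name string
	Port int
}

// change mirrors serviceChange: previous is the state before the first
// buffered event (nil for an add), current is the state after the last
// one (nil for a delete).
type change struct {
	previous *Service
	current  *Service
}

type changeMap map[string]*change

// record buffers one watch event, collapsing repeated events for the same
// object into a single (previous, current) pair, the way
// OnServiceAdd/OnServiceUpdate/OnServiceDelete fill serviceChanges.
func (cm changeMap) record(name string, oldObj, newObj *Service) {
	c, ok := cm[name]
	if !ok {
		c = &change{previous: oldObj}
		cm[name] = c
	}
	c.current = newObj
}

// apply folds all buffered changes into state and clears the buffer,
// the way updateServiceMap folds serviceChanges into serviceMap.
func (cm *changeMap) apply(state map[string]Service) {
	for name, c := range *cm {
		if c.current == nil {
			// The object was deleted since the last sync.
			delete(state, name)
			continue
		}
		// Add or update; the real code also diffs c.previous against
		// c.current to find stale ports and UDP conntrack entries.
		state[name] = *c.current
	}
	*cm = make(changeMap)
}

func main() {
	state := map[string]Service{}
	changes := changeMap{}

	// An add followed by an update collapses into one buffered change.
	changes.record("svc", nil, &Service{Name: "svc", Port: 80})
	changes.record("svc", &Service{Name: "svc", Port: 80}, &Service{Name: "svc", Port: 8080})
	changes.apply(state)
	fmt.Println(state) // map[svc:{svc 8080}]

	// A delete leaves current == nil, so apply removes the entry.
	changes.record("svc", &Service{Name: "svc", Port: 8080}, nil)
	changes.apply(state)
	fmt.Println(state) // map[]
}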