diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 6c4ad35724..07316a5f6c 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -28,6 +28,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/golang/glog" @@ -35,9 +36,9 @@ import ( clientv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/flowcontrol" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/helper" apiservice "k8s.io/kubernetes/pkg/api/service" @@ -45,6 +46,7 @@ import ( "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" utilproxy "k8s.io/kubernetes/pkg/proxy/util" + "k8s.io/kubernetes/pkg/util/async" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilipvs "k8s.io/kubernetes/pkg/util/ipvs" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" @@ -105,8 +107,8 @@ type Proxier struct { // with some partial data after kube-proxy restart. endpointsSynced bool servicesSynced bool - - throttle flowcontrol.RateLimiter + initialized int32 + syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules // These are effectively const and do not need the mutex to be held. syncPeriod time.Duration @@ -242,23 +244,14 @@ func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps - var throttle flowcontrol.RateLimiter - // Defaulting back to not limit sync rate when minSyncPeriod is 0. - if minSyncPeriod != 0 { - syncsPerSecond := float32(time.Second) / float32(minSyncPeriod) - // The average use case will process 2 updates in short succession - throttle = flowcontrol.NewTokenBucketRateLimiter(syncsPerSecond, 2) - } - - return &Proxier{ + proxier := &Proxier{ portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), serviceMap: make(proxyServiceMap), serviceChanges: newServiceChangeMap(), endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(), + endpointsChanges: newEndpointsChangeMap(hostname), syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, - throttle: throttle, iptables: ipt, masqueradeAll: masqueradeAll, masqueradeMark: masqueradeMark, @@ -276,7 +269,11 @@ func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, iptablesData: bytes.NewBuffer(nil), natChains: bytes.NewBuffer(nil), natRules: bytes.NewBuffer(nil), - }, nil + } + burstSyncs := 2 + glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) + proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) + return proxier, nil } type proxyServiceMap map[proxy.ServicePortName]*serviceInfo @@ -294,37 +291,41 @@ type serviceInfo struct { loadBalancerSourceRanges []string onlyNodeLocalEndpoints bool healthCheckNodePort int + // The following fields are computed and stored for performance reasons. + serviceNameString string } // is updated by this function (based on the given changes). // map is cleared after applying them. func updateServiceMap( serviceMap proxyServiceMap, - changes *serviceChangeMap) (syncRequired bool, hcServices map[types.NamespacedName]uint16, staleServices sets.String) { - syncRequired = false - staleServices = sets.NewString() + changes *serviceChangeMap) (result updateServiceMapResult) { + result.staleServices = sets.NewString() - for _, change := range changes.items { - mergeSyncRequired, existingPorts := serviceMap.mergeService(change.current) - unmergeSyncRequired := serviceMap.unmergeService(change.previous, existingPorts, staleServices) - syncRequired = syncRequired || mergeSyncRequired || unmergeSyncRequired - } - changes.items = make(map[types.NamespacedName]*serviceChange) + func() { + changes.lock.Lock() + defer changes.lock.Unlock() + for _, change := range changes.items { + existingPorts := serviceMap.merge(change.current) + serviceMap.unmerge(change.previous, existingPorts, result.staleServices) + } + changes.items = make(map[types.NamespacedName]*serviceChange) + }() // TODO: If this will appear to be computationally expensive, consider // computing this incrementally similarly to serviceMap. - hcServices = make(map[types.NamespacedName]uint16) - for svcPort, info := range serviceMap { + result.hcServices = make(map[types.NamespacedName]uint16) + for svcPortName, info := range serviceMap { if info.healthCheckNodePort != 0 { - hcServices[svcPort.NamespacedName] = uint16(info.healthCheckNodePort) + result.hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort) } } - return syncRequired, hcServices, staleServices + return result } // returns a new serviceInfo struct -func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo { +func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo { onlyNodeLocalEndpoints := false if utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) && apiservice.RequestsOnlyLocalTraffic(service) { @@ -347,90 +348,77 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)), onlyNodeLocalEndpoints: onlyNodeLocalEndpoints, } + copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) copy(info.externalIPs, service.Spec.ExternalIPs) if apiservice.NeedsHealthCheck(service) { p := service.Spec.HealthCheckNodePort if p == 0 { - glog.Errorf("Service %q has no healthcheck nodeport", serviceName) + glog.Errorf("Service %q has no healthcheck nodeport", svcPortName.NamespacedName.String()) } else { info.healthCheckNodePort = int(p) } } + // Store the following for performance reasons. + info.serviceNameString = svcPortName.String() + return info } -func (sm *proxyServiceMap) mergeService(service *api.Service) (bool, sets.String) { - if service == nil { - return false, nil - } - svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if utilproxy.ShouldSkipService(svcName, service) { - return false, nil - } - syncRequired := false +func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String { existingPorts := sets.NewString() - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} - existingPorts.Insert(servicePort.Name) - info := newServiceInfo(serviceName, servicePort, service) - oldInfo, exists := (*sm)[serviceName] - equal := reflect.DeepEqual(info, oldInfo) - if exists { - glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol) - } else if !equal { - glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol) - } - if !equal { - (*sm)[serviceName] = info - syncRequired = true + for svcPortName, info := range other { + existingPorts.Insert(svcPortName.Port) + _, exists := (*sm)[svcPortName] + if !exists { + glog.V(1).Infof("Adding new service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol) + } else { + glog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol) } + (*sm)[svcPortName] = info } - return syncRequired, existingPorts + return existingPorts } -// are modified by this function with detected stale services. -func (sm *proxyServiceMap) unmergeService(service *api.Service, existingPorts, staleServices sets.String) bool { - if service == nil { - return false - } - svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if utilproxy.ShouldSkipService(svcName, service) { - return false - } - syncRequired := false - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - if existingPorts.Has(servicePort.Name) { +func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String) { + for svcPortName := range other { + if existingPorts.Has(svcPortName.Port) { continue } - serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} - info, exists := (*sm)[serviceName] + info, exists := (*sm)[svcPortName] if exists { - glog.V(1).Infof("Removing service %q", serviceName) + glog.V(1).Infof("Removing service port %q", svcPortName) if info.protocol == api.ProtocolUDP { staleServices.Insert(info.clusterIP.String()) } - delete(*sm, serviceName) - syncRequired = true + delete(*sm, svcPortName) } else { - glog.Errorf("Service %q removed, but doesn't exists", serviceName) + glog.Errorf("Service port %q removed, but doesn't exists", svcPortName) } } - return syncRequired } type serviceChangeMap struct { - sync.Mutex + lock sync.Mutex items map[types.NamespacedName]*serviceChange } type serviceChange struct { - previous *api.Service - current *api.Service + previous proxyServiceMap + current proxyServiceMap +} + +type updateEndpointMapResult struct { + hcEndpoints map[types.NamespacedName]int + staleEndpoints map[endpointServicePair]bool + staleServiceNames map[proxy.ServicePortName]bool +} + +type updateServiceMapResult struct { + hcServices map[types.NamespacedName]uint16 + staleServices sets.String } func newServiceChangeMap() serviceChangeMap { @@ -439,17 +427,42 @@ func newServiceChangeMap() serviceChangeMap { } } -func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) { - scm.Lock() - defer scm.Unlock() +func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) bool { + scm.lock.Lock() + defer scm.lock.Unlock() change, exists := scm.items[*namespacedName] if !exists { change = &serviceChange{} - change.previous = previous + change.previous = serviceToServiceMap(previous) scm.items[*namespacedName] = change } - change.current = current + change.current = serviceToServiceMap(current) + if reflect.DeepEqual(change.previous, change.current) { + delete(scm.items, *namespacedName) + } + return len(scm.items) > 0 +} + +// Translates single Service object to proxyServiceMap. +// +// NOTE: service object should NOT be modified. +func serviceToServiceMap(service *api.Service) proxyServiceMap { + if service == nil { + return nil + } + svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + if utilproxy.ShouldSkipService(svcName, service) { + return nil + } + + serviceMap := make(proxyServiceMap) + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} + serviceMap[svcPortName] = newServiceInfo(svcPortName, servicePort, service) + } + return serviceMap } // internal struct for endpoints information @@ -462,6 +475,14 @@ func (e *endpointsInfo) String() string { return fmt.Sprintf("%v", *e) } +// IPPart returns just the IP part of the endpoint. +func (e *endpointsInfo) IPPart() string { + if index := strings.Index(e.endpoint, ":"); index != -1 { + return e.endpoint[0:index] + } + return e.endpoint +} + type endpointServicePair struct { endpoint string servicePortName proxy.ServicePortName @@ -470,33 +491,40 @@ type endpointServicePair struct { type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo type endpointsChange struct { - previous *api.Endpoints - current *api.Endpoints + previous proxyEndpointsMap + current proxyEndpointsMap } type endpointsChangeMap struct { - sync.Mutex - items map[types.NamespacedName]*endpointsChange + lock sync.Mutex + hostname string + items map[types.NamespacedName]*endpointsChange } -// are modified by this function with detected stale -// connections. -func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool) { - for svcPort, epList := range oldEndpointsMap { +// and are modified by this function with detected stale connections. +func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) { + for svcPortName, epList := range oldEndpointsMap { for _, ep := range epList { stale := true - for i := range newEndpointsMap[svcPort] { - if *newEndpointsMap[svcPort][i] == *ep { + for i := range newEndpointsMap[svcPortName] { + if *newEndpointsMap[svcPortName][i] == *ep { stale = false break } } if stale { - glog.V(4).Infof("Stale endpoint %v -> %v", svcPort, ep.endpoint) - staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true + glog.V(4).Infof("Stale endpoint %v -> %v", svcPortName, ep.endpoint) + staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPortName}] = true } } } + + for svcPortName, epList := range newEndpointsMap { + // For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service. + if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 { + staleServiceNames[svcPortName] = true + } + } } // is updated by this function (based on the given changes). @@ -504,20 +532,20 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, func updateEndpointsMap( endpointsMap proxyEndpointsMap, changes *endpointsChangeMap, - hostname string) (syncRequired bool, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) { - syncRequired = false - staleSet = make(map[endpointServicePair]bool) - for _, change := range changes.items { - oldEndpointsMap := endpointsToEndpointsMap(change.previous, hostname) - newEndpointsMap := endpointsToEndpointsMap(change.current, hostname) - if !reflect.DeepEqual(oldEndpointsMap, newEndpointsMap) { - endpointsMap.unmerge(oldEndpointsMap) - endpointsMap.merge(newEndpointsMap) - detectStaleConnections(oldEndpointsMap, newEndpointsMap, staleSet) - syncRequired = true + hostname string) (result updateEndpointMapResult) { + result.staleEndpoints = make(map[endpointServicePair]bool) + result.staleServiceNames = make(map[proxy.ServicePortName]bool) + + func() { + changes.lock.Lock() + defer changes.lock.Unlock() + for _, change := range changes.items { + endpointsMap.unmerge(change.previous) + endpointsMap.merge(change.current) + detectStaleConnections(change.previous, change.current, result.staleEndpoints, result.staleServiceNames) } - } - changes.items = make(map[types.NamespacedName]*endpointsChange) + changes.items = make(map[types.NamespacedName]*endpointsChange) + }() if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { return @@ -525,13 +553,13 @@ func updateEndpointsMap( // TODO: If this will appear to be computationally expensive, consider // computing this incrementally similarly to endpointsMap. - hcEndpoints = make(map[types.NamespacedName]int) + result.hcEndpoints = make(map[types.NamespacedName]int) localIPs := getLocalIPs(endpointsMap) for nsn, ips := range localIPs { - hcEndpoints[nsn] = len(ips) + result.hcEndpoints[nsn] = len(ips) } - return syncRequired, hcEndpoints, staleSet + return result } // Translates single Endpoints object to proxyEndpointsMap. @@ -582,23 +610,28 @@ func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEnd return endpointsMap } -func newEndpointsChangeMap() endpointsChangeMap { +func newEndpointsChangeMap(hostname string) endpointsChangeMap { return endpointsChangeMap{ - items: make(map[types.NamespacedName]*endpointsChange), + hostname: hostname, + items: make(map[types.NamespacedName]*endpointsChange), } } -func (ecm *endpointsChangeMap) Update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) { - ecm.Lock() - defer ecm.Unlock() +func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) bool { + ecm.lock.Lock() + defer ecm.lock.Unlock() change, exists := ecm.items[*namespacedName] if !exists { change = &endpointsChange{} - change.previous = previous + change.previous = endpointsToEndpointsMap(previous, ecm.hostname) ecm.items[*namespacedName] = change } - change.current = current + change.current = endpointsToEndpointsMap(current, ecm.hostname) + if reflect.DeepEqual(change.previous, change.current) { + delete(ecm.items, *namespacedName) + } + return len(ecm.items) > 0 } func (em proxyEndpointsMap) merge(other proxyEndpointsMap) { @@ -726,81 +759,89 @@ func CleanupLeftovers(execer utilexec.Interface, ipvs utilipvs.Interface, ipt ut return encounteredError } -// Sync is called to immediately synchronize the proxier state to iptables +// Sync is called to synchronize the proxier state to iptables and ipvs as soon as possible. func (proxier *Proxier) Sync() { - proxier.syncProxyRules(syncReasonForce) + proxier.syncRunner.Run() } // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. func (proxier *Proxier) SyncLoop() { - t := time.NewTicker(proxier.syncPeriod) - defer t.Stop() // Update healthz timestamp at beginning in case Sync() never succeeds. if proxier.healthzServer != nil { proxier.healthzServer.UpdateTimestamp() } - for { - <-t.C - glog.V(6).Infof("Periodic sync") - proxier.Sync() + proxier.syncRunner.Loop(wait.NeverStop) +} + +func (proxier *Proxier) setInitialized(value bool) { + var initialized int32 + if value { + initialized = 1 } + atomic.StoreInt32(&proxier.initialized, initialized) +} + +func (proxier *Proxier) isInitialized() bool { + return atomic.LoadInt32(&proxier.initialized) > 0 } // OnServiceAdd is called whenever creation of new service object is observed. func (proxier *Proxier) OnServiceAdd(service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - proxier.serviceChanges.update(&namespacedName, nil, service) - - proxier.syncProxyRules(syncReasonServices) + if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() { + proxier.syncRunner.Run() + } } // OnServiceUpdate is called whenever modification of an existing service object is observed. func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - proxier.serviceChanges.update(&namespacedName, oldService, service) - - proxier.syncProxyRules(syncReasonServices) + if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() { + proxier.syncRunner.Run() + } } // OnServiceDelete is called whenever deletion of an existing service object is observed. func (proxier *Proxier) OnServiceDelete(service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - proxier.serviceChanges.update(&namespacedName, service, nil) - - proxier.syncProxyRules(syncReasonServices) + if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() { + proxier.syncRunner.Run() + } } // OnServiceSynced is called once all the initial even handlers were called and the state is fully propagated to local cache. func (proxier *Proxier) OnServiceSynced() { proxier.mu.Lock() proxier.servicesSynced = true + proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced) proxier.mu.Unlock() - proxier.syncProxyRules(syncReasonServices) + // Sync unconditionally - this is called once per lifetime. + proxier.syncProxyRules() } // OnEndpointsAdd is called whenever creation of new endpoints object is observed. func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - proxier.endpointsChanges.Update(&namespacedName, nil, endpoints) - - proxier.syncProxyRules(syncReasonEndpoints) + if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() { + proxier.syncRunner.Run() + } } // OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed. func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - proxier.endpointsChanges.Update(&namespacedName, oldEndpoints, endpoints) - - proxier.syncProxyRules(syncReasonEndpoints) + if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() { + proxier.syncRunner.Run() + } } // OnEndpointsDelete is called whenever deletion of an existing endpoints object is observed. func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - proxier.endpointsChanges.Update(&namespacedName, endpoints, nil) - - proxier.syncProxyRules(syncReasonEndpoints) + if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() { + proxier.syncRunner.Run() + } } // OnEndpointsSynced is called once all the initial event handlers were called and the state is fully propagated to local cache. @@ -809,7 +850,7 @@ func (proxier *Proxier) OnEndpointsSynced() { proxier.endpointsSynced = true proxier.mu.Unlock() - proxier.syncProxyRules(syncReasonEndpoints) + proxier.syncProxyRules() } type syncReason string @@ -820,16 +861,13 @@ const syncReasonForce syncReason = "Force" // This is where all of the ipvs calls happen. // assumes proxier.mu is held -func (proxier *Proxier) syncProxyRules(reason syncReason) { +func (proxier *Proxier) syncProxyRules() { proxier.mu.Lock() defer proxier.mu.Unlock() - if proxier.throttle != nil { - proxier.throttle.Accept() - } start := time.Now() defer func() { - glog.V(4).Infof("syncProxyRules(%s) took %v", reason, time.Since(start)) + glog.V(4).Infof("syncProxyRules took %v", time.Since(start)) }() // don't sync rules till we've received services and endpoints if !proxier.endpointsSynced || !proxier.servicesSynced { @@ -837,27 +875,21 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { return } - // Figure out the new services we need to activate. - proxier.serviceChanges.Lock() - serviceSyncRequired, hcServices, staleServices := updateServiceMap( + // We assume that if this was called, we really want to sync them, + // even if nothing changed in the meantime. In other words, callers are + // responsible for detecting no-op changes and not calling this function. + serviceUpdateResult := updateServiceMap( proxier.serviceMap, &proxier.serviceChanges) - proxier.serviceChanges.Unlock() - - // If this was called because of a services update, but nothing actionable has changed, skip it. - if reason == syncReasonServices && !serviceSyncRequired { - glog.V(3).Infof("Skipping ipvs sync because nothing changed") - return - } - - proxier.endpointsChanges.Lock() - endpointsSyncRequired, hcEndpoints, staleEndpoints := updateEndpointsMap( + endpointUpdateResult := updateEndpointsMap( proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname) - proxier.endpointsChanges.Unlock() - // If this was called because of an endpoints update, but nothing actionable has changed, skip it. - if reason == syncReasonEndpoints && !endpointsSyncRequired { - glog.V(3).Infof("Skipping ipvs sync because nothing changed") - return + staleServices := serviceUpdateResult.staleServices + // merge stale services gathered from updateEndpointsMap + for svcPortName := range endpointUpdateResult.staleServiceNames { + if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == api.ProtocolUDP { + glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String()) + staleServices.Insert(svcInfo.clusterIP.String()) + } } glog.V(3).Infof("Syncing ipvs Proxier rules") @@ -1166,13 +1198,14 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { // Sync iptables rules. // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table. - natLines := append(proxier.natChains.Bytes(), proxier.natRules.Bytes()...) - lines := natLines + proxier.iptablesData.Reset() + proxier.iptablesData.Write(proxier.natChains.Bytes()) + proxier.iptablesData.Write(proxier.natRules.Bytes()) - glog.V(3).Infof("Restoring iptables rules: %s", lines) - err = proxier.iptables.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) + glog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes()) + err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters) if err != nil { - glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, lines) + glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, proxier.iptablesData.Bytes()) // Revert new local ports. utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap) return @@ -1197,18 +1230,18 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { } proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices) - // Update healthz timestamp if it is periodic sync. - if proxier.healthzServer != nil && reason == syncReasonForce { + // Update healthz timestamp + if proxier.healthzServer != nil { proxier.healthzServer.UpdateTimestamp() } // Update healthchecks. The endpoints list might include services that are // not "OnlyLocal", but the services list will not, and the healthChecker // will just drop those endpoints. - if err := proxier.healthChecker.SyncServices(hcServices); err != nil { + if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil { glog.Errorf("Error syncing healtcheck services: %v", err) } - if err := proxier.healthChecker.SyncEndpoints(hcEndpoints); err != nil { + if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil { glog.Errorf("Error syncing healthcheck endpoints: %v", err) } @@ -1219,7 +1252,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err) } } - proxier.deleteEndpointConnections(staleEndpoints) + proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints) } // After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we @@ -1408,7 +1441,15 @@ func (proxier *Proxier) linkKubeServiceChain(existingNATChains map[utiliptables. // Join all words with spaces, terminate with newline and write to buff. func writeLine(buf *bytes.Buffer, words ...string) { - buf.WriteString(strings.Join(words, " ") + "\n") + // We avoid strings.Join for performance reasons. + for i := range words { + buf.WriteString(words[i]) + if i < len(words)-1 { + buf.WriteByte(' ') + } else { + buf.WriteByte('\n') + } + } } func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String { @@ -1420,8 +1461,7 @@ func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.S if localIPs[nsn] == nil { localIPs[nsn] = sets.NewString() } - ip := strings.Split(ep.endpoint, ":")[0] // just the IP part - localIPs[nsn].Insert(ip) + localIPs[nsn].Insert(ep.IPPart()) // just the IP part } } } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index b02c6eae7e..27d1a0a228 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -103,7 +103,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, nodeIPs serviceMap: make(proxyServiceMap), serviceChanges: newServiceChangeMap(), endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(), + endpointsChanges: newEndpointsChangeMap(testHostname), iptables: ipt, ipvs: ipvs, clusterCIDR: "10.0.0.0/24", @@ -208,7 +208,7 @@ func TestNodePort(t *testing.T) { }), ) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() // Check ipvs service and destinations services, err := ipvs.GetVirtualServers() @@ -266,7 +266,7 @@ func TestNodePortNoEndpoint(t *testing.T) { ) makeEndpointsMap(fp) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() // Check ipvs service and destinations services, err := ipvs.GetVirtualServers() @@ -314,7 +314,7 @@ func TestClusterIPNoEndpoint(t *testing.T) { }), ) makeEndpointsMap(fp) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() // check ipvs service and destinations services, err := ipvs.GetVirtualServers() @@ -372,7 +372,7 @@ func TestClusterIP(t *testing.T) { }), ) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() // check ipvs service and destinations services, err := ipvs.GetVirtualServers() @@ -423,7 +423,7 @@ func TestExternalIPsNoEndpoint(t *testing.T) { makeEndpointsMap(fp) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() // check ipvs service and destinations services, err := ipvs.GetVirtualServers() @@ -490,7 +490,7 @@ func TestExternalIPs(t *testing.T) { }), ) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() // check ipvs service and destinations services, err := ipvs.GetVirtualServers() @@ -562,7 +562,7 @@ func TestLoadBalancer(t *testing.T) { }), ) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() } func strPtr(s string) *string { @@ -616,7 +616,7 @@ func TestOnlyLocalNodePorts(t *testing.T) { }), ) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() // Expect 2 services and 1 destination services, err := ipvs.GetVirtualServers() @@ -700,7 +700,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { }), ) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() } func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, port, nodeport int32, targetPort int) []api.ServicePort { @@ -763,24 +763,24 @@ func TestBuildServiceMapAddRemove(t *testing.T) { for i := range services { fp.OnServiceAdd(services[i]) } - _, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result := updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 8 { t.Errorf("expected service map length 8, got %v", fp.serviceMap) } // The only-local-loadbalancer ones get added - if len(hcPorts) != 1 { - t.Errorf("expected 1 healthcheck port, got %v", hcPorts) + if len(result.hcServices) != 1 { + t.Errorf("expected 1 healthcheck port, got %v", result.hcServices) } else { nsn := makeNSN("somewhere", "only-local-load-balancer") - if port, found := hcPorts[nsn]; !found || port != 345 { - t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, hcPorts) + if port, found := result.hcServices[nsn]; !found || port != 345 { + t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.hcServices) } } - if len(staleUDPServices) != 0 { + if len(result.staleServices) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices)) } // Remove some stuff @@ -796,24 +796,24 @@ func TestBuildServiceMapAddRemove(t *testing.T) { fp.OnServiceDelete(services[2]) fp.OnServiceDelete(services[3]) - _, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result = updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 1 { t.Errorf("expected service map length 1, got %v", fp.serviceMap) } - if len(hcPorts) != 0 { - t.Errorf("expected 0 healthcheck ports, got %v", hcPorts) + if len(result.hcServices) != 0 { + t.Errorf("expected 0 healthcheck ports, got %v", result.hcServices) } // All services but one were deleted. While you'd expect only the ClusterIPs // from the three deleted services here, we still have the ClusterIP for // the not-deleted service, because one of it's ServicePorts was deleted. expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"} - if len(staleUDPServices) != len(expectedStaleUDPServices) { - t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), staleUDPServices.List()) + if len(result.staleServices) != len(expectedStaleUDPServices) { + t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.staleServices.List()) } for _, ip := range expectedStaleUDPServices { - if !staleUDPServices.Has(ip) { + if !result.staleServices.Has(ip) { t.Errorf("expected stale UDP service service %s", ip) } } @@ -830,21 +830,25 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { svc.Spec.ClusterIP = api.ClusterIPNone svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0) }), + makeTestService("somewhere-else", "headless-without-port", func(svc *api.Service) { + svc.Spec.Type = api.ServiceTypeClusterIP + svc.Spec.ClusterIP = api.ClusterIPNone + }), ) // Headless service should be ignored - _, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result := updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 0 { t.Errorf("expected service map length 0, got %d", len(fp.serviceMap)) } // No proxied services, so no healthchecks - if len(hcPorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %d", len(hcPorts)) + if len(result.hcServices) != 0 { + t.Errorf("expected healthcheck ports length 0, got %d", len(result.hcServices)) } - if len(staleUDPServices) != 0 { - t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices)) + if len(result.staleServices) != 0 { + t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices)) } } @@ -862,16 +866,16 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { }), ) - _, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result := updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 0 { t.Errorf("expected service map length 0, got %v", fp.serviceMap) } // No proxied services, so no healthchecks - if len(hcPorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", hcPorts) + if len(result.hcServices) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices) } - if len(staleUDPServices) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices) + if len(result.staleServices) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.staleServices) } } @@ -903,69 +907,57 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { fp.OnServiceAdd(servicev1) - syncRequired, hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges) - if !syncRequired { - t.Errorf("expected sync required, got %t", syncRequired) - } + result := updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } - if len(hcPorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", hcPorts) + if len(result.hcServices) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices) } - if len(staleUDPServices) != 0 { + if len(result.staleServices) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices)) } // Change service to load-balancer fp.OnServiceUpdate(servicev1, servicev2) - syncRequired, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges) - if !syncRequired { - t.Errorf("expected sync required, got %t", syncRequired) - } + result = updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } - if len(hcPorts) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", hcPorts) + if len(result.hcServices) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", result.hcServices) } - if len(staleUDPServices) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List()) + if len(result.staleServices) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.staleServices.List()) } // No change; make sure the service map stays the same and there are // no health-check changes fp.OnServiceUpdate(servicev2, servicev2) - syncRequired, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges) - if syncRequired { - t.Errorf("not expected sync required, got %t", syncRequired) - } + result = updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } - if len(hcPorts) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", hcPorts) + if len(result.hcServices) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", result.hcServices) } - if len(staleUDPServices) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List()) + if len(result.staleServices) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.staleServices.List()) } // And back to ClusterIP fp.OnServiceUpdate(servicev2, servicev1) - syncRequired, hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges) - if !syncRequired { - t.Errorf("expected sync required, got %t", syncRequired) - } + result = updateServiceMap(fp.serviceMap, &fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } - if len(hcPorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", hcPorts) + if len(result.hcServices) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices) } - if len(staleUDPServices) != 0 { + if len(result.staleServices) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices)) } } @@ -1005,7 +997,7 @@ func TestSessionAffinity(t *testing.T) { ) makeEndpointsMap(fp) - fp.syncProxyRules(syncReasonForce) + fp.syncProxyRules() // check ipvs service and destinations services, err := ipvs.GetVirtualServers() @@ -1027,8 +1019,11 @@ func makeServicePortName(ns, name, port string) proxy.ServicePortName { } func Test_updateEndpointsMap(t *testing.T) { - var nodeName = "host" + var nodeName = testHostname + emptyEndpoint := func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{} + } unnamedPort := func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1333,18 +1328,20 @@ func Test_updateEndpointsMap(t *testing.T) { // previousEndpoints and currentEndpoints are used to call appropriate // handlers OnEndpoints* (based on whether corresponding values are nil // or non-nil) and must be of equal length. - previousEndpoints []*api.Endpoints - currentEndpoints []*api.Endpoints - oldEndpoints map[proxy.ServicePortName][]*endpointsInfo - expectedResult map[proxy.ServicePortName][]*endpointsInfo - expectedStale []endpointServicePair - expectedHealthchecks map[types.NamespacedName]int + previousEndpoints []*api.Endpoints + currentEndpoints []*api.Endpoints + oldEndpoints map[proxy.ServicePortName][]*endpointsInfo + expectedResult map[proxy.ServicePortName][]*endpointsInfo + expectedStaleEndpoints []endpointServicePair + expectedStaleServiceNames map[proxy.ServicePortName]bool + expectedHealthchecks map[types.NamespacedName]int }{{ // Case[0]: nothing - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedStale: []endpointServicePair{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[1]: no change, unnamed port previousEndpoints: []*api.Endpoints{ @@ -1355,16 +1352,17 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { - {"1.1.1.1:11", false}, + {endpoint: "1.1.1.1:11", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { - {"1.1.1.1:11", false}, + {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedStale: []endpointServicePair{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[2]: no change, named port, local previousEndpoints: []*api.Endpoints{ @@ -1375,15 +1373,16 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", true}, + {endpoint: "1.1.1.1:11", isLocal: true}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", true}, + {endpoint: "1.1.1.1:11", isLocal: true}, }, }, - expectedStale: []endpointServicePair{}, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -1397,22 +1396,23 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {endpoint: "1.1.1.1:11", isLocal: false}, }, makeServicePortName("ns1", "ep1", "p12"): { - {"1.1.1.2:12", false}, + {endpoint: "1.1.1.2:12", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {endpoint: "1.1.1.1:11", isLocal: false}, }, makeServicePortName("ns1", "ep1", "p12"): { - {"1.1.1.2:12", false}, + {endpoint: "1.1.1.2:12", isLocal: false}, }, }, - expectedStale: []endpointServicePair{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[4]: no change, multiple subsets, multiple ports, local previousEndpoints: []*api.Endpoints{ @@ -1423,32 +1423,33 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", true}, + {endpoint: "1.1.1.1:11", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {"1.1.1.1:12", true}, + {endpoint: "1.1.1.1:12", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p13"): { - {"1.1.1.3:13", false}, + {endpoint: "1.1.1.3:13", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", true}, + {endpoint: "1.1.1.1:11", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {"1.1.1.1:12", true}, + {endpoint: "1.1.1.1:12", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p13"): { - {"1.1.1.3:13", false}, + {endpoint: "1.1.1.3:13", isLocal: false}, }, }, - expectedStale: []endpointServicePair{}, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { - // Case[5]: no change, multiple Endpoints, subsets, IPs, and ports + // Case[5]: no change, multiple endpoints, subsets, IPs, and ports previousEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1), makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2), @@ -1459,57 +1460,58 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, - {"1.1.1.2:11", true}, + {endpoint: "1.1.1.1:11", isLocal: false}, + {endpoint: "1.1.1.2:11", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {"1.1.1.1:12", false}, - {"1.1.1.2:12", true}, + {endpoint: "1.1.1.1:12", isLocal: false}, + {endpoint: "1.1.1.2:12", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p13"): { - {"1.1.1.3:13", false}, - {"1.1.1.4:13", true}, + {endpoint: "1.1.1.3:13", isLocal: false}, + {endpoint: "1.1.1.4:13", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p14"): { - {"1.1.1.3:14", false}, - {"1.1.1.4:14", true}, + {endpoint: "1.1.1.3:14", isLocal: false}, + {endpoint: "1.1.1.4:14", isLocal: true}, }, makeServicePortName("ns2", "ep2", "p21"): { - {"2.2.2.1:21", false}, - {"2.2.2.2:21", true}, + {endpoint: "2.2.2.1:21", isLocal: false}, + {endpoint: "2.2.2.2:21", isLocal: true}, }, makeServicePortName("ns2", "ep2", "p22"): { - {"2.2.2.1:22", false}, - {"2.2.2.2:22", true}, + {endpoint: "2.2.2.1:22", isLocal: false}, + {endpoint: "2.2.2.2:22", isLocal: true}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, - {"1.1.1.2:11", true}, + {endpoint: "1.1.1.1:11", isLocal: false}, + {endpoint: "1.1.1.2:11", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {"1.1.1.1:12", false}, - {"1.1.1.2:12", true}, + {endpoint: "1.1.1.1:12", isLocal: false}, + {endpoint: "1.1.1.2:12", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p13"): { - {"1.1.1.3:13", false}, - {"1.1.1.4:13", true}, + {endpoint: "1.1.1.3:13", isLocal: false}, + {endpoint: "1.1.1.4:13", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p14"): { - {"1.1.1.3:14", false}, - {"1.1.1.4:14", true}, + {endpoint: "1.1.1.3:14", isLocal: false}, + {endpoint: "1.1.1.4:14", isLocal: true}, }, makeServicePortName("ns2", "ep2", "p21"): { - {"2.2.2.1:21", false}, - {"2.2.2.2:21", true}, + {endpoint: "2.2.2.1:21", isLocal: false}, + {endpoint: "2.2.2.2:21", isLocal: true}, }, makeServicePortName("ns2", "ep2", "p22"): { - {"2.2.2.1:22", false}, - {"2.2.2.2:22", true}, + {endpoint: "2.2.2.1:22", isLocal: false}, + {endpoint: "2.2.2.2:22", isLocal: true}, }, }, - expectedStale: []endpointServicePair{}, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, @@ -1525,10 +1527,13 @@ func Test_updateEndpointsMap(t *testing.T) { oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { - {"1.1.1.1:11", true}, + {endpoint: "1.1.1.1:11", isLocal: true}, }, }, - expectedStale: []endpointServicePair{}, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + makeServicePortName("ns1", "ep1", ""): true, + }, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -1542,15 +1547,16 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { - {"1.1.1.1:11", true}, + {endpoint: "1.1.1.1:11", isLocal: true}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedStale: []endpointServicePair{{ + expectedStaleEndpoints: []endpointServicePair{{ endpoint: "1.1.1.1:11", servicePortName: makeServicePortName("ns1", "ep1", ""), }}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[8]: add an IP and port previousEndpoints: []*api.Endpoints{ @@ -1561,20 +1567,23 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {endpoint: "1.1.1.1:11", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, - {"1.1.1.2:11", true}, + {endpoint: "1.1.1.1:11", isLocal: false}, + {endpoint: "1.1.1.2:11", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {"1.1.1.1:12", false}, - {"1.1.1.2:12", true}, + {endpoint: "1.1.1.1:12", isLocal: false}, + {endpoint: "1.1.1.2:12", isLocal: true}, }, }, - expectedStale: []endpointServicePair{}, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + makeServicePortName("ns1", "ep1", "p12"): true, + }, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -1588,20 +1597,20 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, - {"1.1.1.2:11", true}, + {endpoint: "1.1.1.1:11", isLocal: false}, + {endpoint: "1.1.1.2:11", isLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {"1.1.1.1:12", false}, - {"1.1.1.2:12", true}, + {endpoint: "1.1.1.1:12", isLocal: false}, + {endpoint: "1.1.1.2:12", isLocal: true}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedStale: []endpointServicePair{{ + expectedStaleEndpoints: []endpointServicePair{{ endpoint: "1.1.1.2:11", servicePortName: makeServicePortName("ns1", "ep1", "p11"), }, { @@ -1611,7 +1620,8 @@ func Test_updateEndpointsMap(t *testing.T) { endpoint: "1.1.1.2:12", servicePortName: makeServicePortName("ns1", "ep1", "p12"), }}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[10]: add a subset previousEndpoints: []*api.Endpoints{ @@ -1622,18 +1632,21 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {endpoint: "1.1.1.1:11", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {endpoint: "1.1.1.1:11", isLocal: false}, }, makeServicePortName("ns1", "ep1", "p12"): { - {"1.1.1.2:12", true}, + {endpoint: "1.1.1.2:12", isLocal: true}, }, }, - expectedStale: []endpointServicePair{}, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + makeServicePortName("ns1", "ep1", "p12"): true, + }, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -1647,22 +1660,23 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {endpoint: "1.1.1.1:11", isLocal: false}, }, makeServicePortName("ns1", "ep1", "p12"): { - {"1.1.1.2:12", false}, + {endpoint: "1.1.1.2:12", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedStale: []endpointServicePair{{ + expectedStaleEndpoints: []endpointServicePair{{ endpoint: "1.1.1.2:12", servicePortName: makeServicePortName("ns1", "ep1", "p12"), }}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[12]: rename a port previousEndpoints: []*api.Endpoints{ @@ -1673,18 +1687,21 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {endpoint: "1.1.1.1:11", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11-2"): { - {"1.1.1.1:11", false}, + {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedStale: []endpointServicePair{{ + expectedStaleEndpoints: []endpointServicePair{{ endpoint: "1.1.1.1:11", servicePortName: makeServicePortName("ns1", "ep1", "p11"), }}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + makeServicePortName("ns1", "ep1", "p11-2"): true, + }, expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[13]: renumber a port @@ -1696,19 +1713,20 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {endpoint: "1.1.1.1:11", isLocal: false}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:22", false}, + {endpoint: "1.1.1.1:22", isLocal: false}, }, }, - expectedStale: []endpointServicePair{{ + expectedStaleEndpoints: []endpointServicePair{{ endpoint: "1.1.1.1:11", servicePortName: makeServicePortName("ns1", "ep1", "p11"), }}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[14]: complex add and remove previousEndpoints: []*api.Endpoints{ @@ -1725,42 +1743,42 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, + {endpoint: "1.1.1.1:11", isLocal: false}, }, makeServicePortName("ns2", "ep2", "p22"): { - {"2.2.2.2:22", true}, - {"2.2.2.22:22", true}, + {endpoint: "2.2.2.2:22", isLocal: true}, + {endpoint: "2.2.2.22:22", isLocal: true}, }, makeServicePortName("ns2", "ep2", "p23"): { - {"2.2.2.3:23", true}, + {endpoint: "2.2.2.3:23", isLocal: true}, }, makeServicePortName("ns4", "ep4", "p44"): { - {"4.4.4.4:44", true}, - {"4.4.4.5:44", true}, + {endpoint: "4.4.4.4:44", isLocal: true}, + {endpoint: "4.4.4.5:44", isLocal: true}, }, makeServicePortName("ns4", "ep4", "p45"): { - {"4.4.4.6:45", true}, + {endpoint: "4.4.4.6:45", isLocal: true}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {"1.1.1.1:11", false}, - {"1.1.1.11:11", false}, + {endpoint: "1.1.1.1:11", isLocal: false}, + {endpoint: "1.1.1.11:11", isLocal: false}, }, makeServicePortName("ns1", "ep1", "p12"): { - {"1.1.1.2:12", false}, + {endpoint: "1.1.1.2:12", isLocal: false}, }, makeServicePortName("ns1", "ep1", "p122"): { - {"1.1.1.2:122", false}, + {endpoint: "1.1.1.2:122", isLocal: false}, }, makeServicePortName("ns3", "ep3", "p33"): { - {"3.3.3.3:33", false}, + {endpoint: "3.3.3.3:33", isLocal: false}, }, makeServicePortName("ns4", "ep4", "p44"): { - {"4.4.4.4:44", true}, + {endpoint: "4.4.4.4:44", isLocal: true}, }, }, - expectedStale: []endpointServicePair{{ + expectedStaleEndpoints: []endpointServicePair{{ endpoint: "2.2.2.2:22", servicePortName: makeServicePortName("ns2", "ep2", "p22"), }, { @@ -1776,10 +1794,35 @@ func Test_updateEndpointsMap(t *testing.T) { endpoint: "4.4.4.6:45", servicePortName: makeServicePortName("ns4", "ep4", "p45"), }}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + makeServicePortName("ns1", "ep1", "p12"): true, + makeServicePortName("ns1", "ep1", "p122"): true, + makeServicePortName("ns3", "ep3", "p33"): true, + }, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns4", "ep4"): 1, }, - }} + }, { + // Case[15]: change from 0 endpoint address to 1 unnamed port + previousEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", emptyEndpoint), + }, + currentEndpoints: []*api.Endpoints{ + makeTestEndpoints("ns1", "ep1", unnamedPort), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {endpoint: "1.1.1.1:11", isLocal: false}, + }, + }, + expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + makeServicePortName("ns1", "ep1", ""): true, + }, + expectedHealthchecks: map[types.NamespacedName]int{}, + }, + } for tci, tc := range testCases { ipt := iptablestest.NewFake() @@ -1787,7 +1830,7 @@ func Test_updateEndpointsMap(t *testing.T) { fp := NewFakeProxier(ipt, ipvs, nil) fp.hostname = nodeName - // First check that after adding all previous versions of Endpoints, + // First check that after adding all previous versions of endpoints, // the fp.oldEndpoints is as we expect. for i := range tc.previousEndpoints { if tc.previousEndpoints[i] != nil { @@ -1799,7 +1842,7 @@ func Test_updateEndpointsMap(t *testing.T) { // Now let's call appropriate handlers to get to state we want to be. if len(tc.previousEndpoints) != len(tc.currentEndpoints) { - t.Fatalf("[%d] different lengths of previous and current Endpoints", tci) + t.Fatalf("[%d] different lengths of previous and current endpoints", tci) continue } @@ -1814,19 +1857,27 @@ func Test_updateEndpointsMap(t *testing.T) { fp.OnEndpointsUpdate(prev, curr) } } - _, hcEndpoints, stale := updateEndpointsMap(fp.endpointsMap, &fp.endpointsChanges, fp.hostname) + result := updateEndpointsMap(fp.endpointsMap, &fp.endpointsChanges, fp.hostname) newMap := fp.endpointsMap compareEndpointsMaps(t, tci, newMap, tc.expectedResult) - if len(stale) != len(tc.expectedStale) { - t.Errorf("[%d] expected %d stale, got %d: %v", tci, len(tc.expectedStale), len(stale), stale) + if len(result.staleEndpoints) != len(tc.expectedStaleEndpoints) { + t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.staleEndpoints), result.staleEndpoints) } - for _, x := range tc.expectedStale { - if stale[x] != true { - t.Errorf("[%d] expected stale[%v], but didn't find it: %v", tci, x, stale) + for _, x := range tc.expectedStaleEndpoints { + if result.staleEndpoints[x] != true { + t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.staleEndpoints) } } - if !reflect.DeepEqual(hcEndpoints, tc.expectedHealthchecks) { - t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, hcEndpoints) + if len(result.staleServiceNames) != len(tc.expectedStaleServiceNames) { + t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.staleServiceNames), result.staleServiceNames) + } + for svcName := range tc.expectedStaleServiceNames { + if result.staleServiceNames[svcName] != true { + t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.staleServiceNames) + } + } + if !reflect.DeepEqual(result.hcEndpoints, tc.expectedHealthchecks) { + t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.hcEndpoints) } } }