diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 6ce3199dd3..f46ed98e04 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -26,7 +26,6 @@ import ( "encoding/base32" "fmt" "net" - "reflect" "strconv" "strings" "sync" @@ -37,7 +36,6 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/record" apiservice "k8s.io/kubernetes/pkg/api/service" @@ -157,37 +155,8 @@ type serviceInfo struct { serviceLBChainName utiliptables.Chain } -// internal struct for endpoints information -type endpointsInfo struct { - endpoint string // TODO: should be an endpointString type - isLocal bool - // The following fields we lazily compute and store here for performance - // reasons. If the protocol is the same as you expect it to be, then the - // chainName can be reused, otherwise it should be recomputed. - protocol string - chainName utiliptables.Chain -} - -// IPPart returns just the IP part of the endpoint. -func (e *endpointsInfo) IPPart() string { - return utilproxy.IPPart(e.endpoint) -} - -// Returns the endpoint chain name for a given endpointsInfo. -func (e *endpointsInfo) endpointChain(svcNameString, protocol string) utiliptables.Chain { - if e.protocol != protocol { - e.protocol = protocol - e.chainName = servicePortEndpointChainName(svcNameString, protocol, e.endpoint) - } - return e.chainName -} - -func (e *endpointsInfo) String() string { - return fmt.Sprintf("%v", *e) -} - -// returns a new serviceInfo struct -func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo { +// returns a new proxy.ServicePort which abstracts a serviceInfo +func newServiceInfo(port *api.ServicePort, service *api.Service) proxy.ServicePort { onlyNodeLocalEndpoints := false if apiservice.RequestsOnlyLocalTraffic(service) { onlyNodeLocalEndpoints = true @@ -214,10 +183,13 @@ func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, se copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) copy(info.externalIPs, service.Spec.ExternalIPs) + svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name} + if apiservice.NeedsHealthCheck(service) { p := service.Spec.HealthCheckNodePort if p == 0 { - glog.Errorf("Service %q has no healthcheck nodeport", svcPortName.NamespacedName.String()) + glog.Errorf("Service %q has no healthcheck nodeport", svcName.String()) } else { info.healthCheckNodePort = int(p) } @@ -233,134 +205,90 @@ func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, se return info } -type endpointsChange struct { - previous proxyEndpointsMap - current proxyEndpointsMap +// ClusterIP is part of proxy.ServicePort interface. +func (info *serviceInfo) ClusterIP() string { + return info.clusterIP.String() } -type endpointsChangeMap struct { - lock sync.Mutex - hostname string - items map[types.NamespacedName]*endpointsChange +// Port is part of proxy.ServicePort interface. +func (info *serviceInfo) Port() int { + return info.port } -type serviceChange struct { - previous proxyServiceMap - current proxyServiceMap +// Protocol is part of proxy.ServicePort interface. +func (info *serviceInfo) Protocol() api.Protocol { + return info.protocol } -type serviceChangeMap struct { - lock sync.Mutex - items map[types.NamespacedName]*serviceChange +// String is part of proxy.ServicePort interface. +func (info *serviceInfo) String() string { + return fmt.Sprintf("%s:%d/%s", info.clusterIP, info.port, info.protocol) } -type updateEndpointMapResult struct { - hcEndpoints map[types.NamespacedName]int - staleEndpoints map[endpointServicePair]bool - staleServiceNames map[proxy.ServicePortName]bool +// HealthCheckNodePort is part of proxy.ServicePort interface. +func (info *serviceInfo) HealthCheckNodePort() int { + return info.healthCheckNodePort } -type updateServiceMapResult struct { - hcServices map[types.NamespacedName]uint16 - staleServices sets.String +var _ proxy.ServicePort = &serviceInfo{} + +// internal struct for endpoints information +type endpointsInfo struct { + endpoint string // TODO: should be an endpointString type + isLocal bool + // The following fields we lazily compute and store here for performance + // reasons. If the protocol is the same as you expect it to be, then the + // chainName can be reused, otherwise it should be recomputed. + protocol string + chainName utiliptables.Chain } -type proxyServiceMap map[proxy.ServicePortName]*serviceInfo -type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo - -func newEndpointsChangeMap(hostname string) endpointsChangeMap { - return endpointsChangeMap{ - hostname: hostname, - items: make(map[types.NamespacedName]*endpointsChange), +// returns a new proxy.Endpoint which abstracts a endpointsInfo +func newEndpointsInfo(IP string, port int, isLocal bool) proxy.Endpoint { + return &endpointsInfo{ + endpoint: net.JoinHostPort(IP, strconv.Itoa(port)), + isLocal: isLocal, } } -func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) bool { - ecm.lock.Lock() - defer ecm.lock.Unlock() - - change, exists := ecm.items[*namespacedName] - if !exists { - change = &endpointsChange{} - change.previous = endpointsToEndpointsMap(previous, ecm.hostname) - ecm.items[*namespacedName] = change - } - change.current = endpointsToEndpointsMap(current, ecm.hostname) - if reflect.DeepEqual(change.previous, change.current) { - delete(ecm.items, *namespacedName) - } - return len(ecm.items) > 0 +// IsLocal is part of proxy.Endpoint interface. +func (e *endpointsInfo) IsLocal() bool { + return e.isLocal } -func newServiceChangeMap() serviceChangeMap { - return serviceChangeMap{ - items: make(map[types.NamespacedName]*serviceChange), - } +// IP is part of proxy.Endpoint interface. +func (e *endpointsInfo) IP() string { + return utilproxy.IPPart(e.endpoint) } -func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) bool { - scm.lock.Lock() - defer scm.lock.Unlock() - - change, exists := scm.items[*namespacedName] - if !exists { - change = &serviceChange{} - change.previous = serviceToServiceMap(previous) - scm.items[*namespacedName] = change +// Equal is part of proxy.Endpoint interface. +func (e *endpointsInfo) Equal(other proxy.Endpoint) bool { + o, ok := other.(*endpointsInfo) + if !ok { + glog.Errorf("Failed to cast endpointsInfo") + return false } - change.current = serviceToServiceMap(current) - if reflect.DeepEqual(change.previous, change.current) { - delete(scm.items, *namespacedName) - } - return len(scm.items) > 0 + return e.endpoint == o.endpoint && + e.isLocal == o.isLocal && + e.protocol == o.protocol && + e.chainName == o.chainName } -func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String { - existingPorts := sets.NewString() - for svcPortName, info := range other { - port := strconv.Itoa(info.port) - clusterIPPort := net.JoinHostPort(info.clusterIP.String(), port) - existingPorts.Insert(svcPortName.Port) - _, exists := (*sm)[svcPortName] - if !exists { - glog.V(1).Infof("Adding new service port %q at %s/%s", svcPortName, clusterIPPort, info.protocol) - } else { - glog.V(1).Infof("Updating existing service port %q at %s/%s", svcPortName, clusterIPPort, info.protocol) - } - (*sm)[svcPortName] = info - } - return existingPorts +// String is part of proxy.Endpoint interface. +func (e *endpointsInfo) String() string { + return e.endpoint } -func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String) { - for svcPortName := range other { - if existingPorts.Has(svcPortName.Port) { - continue - } - info, exists := (*sm)[svcPortName] - if exists { - glog.V(1).Infof("Removing service port %q", svcPortName) - if info.protocol == api.ProtocolUDP { - staleServices.Insert(info.clusterIP.String()) - } - delete(*sm, svcPortName) - } else { - glog.Errorf("Service port %q removed, but doesn't exists", svcPortName) - } +// Returns the endpoint chain name for a given endpointsInfo. +func (e *endpointsInfo) endpointChain(svcNameString, protocol string) utiliptables.Chain { + if e.protocol != protocol { + e.protocol = protocol + e.chainName = servicePortEndpointChainName(svcNameString, protocol, e.endpoint) } + return e.chainName } -func (em proxyEndpointsMap) merge(other proxyEndpointsMap) { - for svcPortName := range other { - em[svcPortName] = other[svcPortName] - } -} - -func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) { - for svcPortName := range other { - delete(em, svcPortName) - } -} +var _ proxy.Endpoint = &endpointsInfo{} // Proxier is an iptables based proxy for connections between a localhost:lport // and services that provide the actual backends. @@ -369,12 +297,12 @@ type Proxier struct { // services that happened since iptables was synced. For a single object, // changes are accumulated, i.e. previous is state from before all of them, // current is state after applying all of those. - endpointsChanges endpointsChangeMap - serviceChanges serviceChangeMap + endpointsChanges *proxy.EndpointChangeTracker + serviceChanges *proxy.ServiceChangeTracker mu sync.Mutex // protects the following fields - serviceMap proxyServiceMap - endpointsMap proxyEndpointsMap + serviceMap proxy.ServiceMap + endpointsMap proxy.EndpointsMap portsMap map[utilproxy.LocalPort]utilproxy.Closeable // endpointsSynced and servicesSynced are set to true when corresponding // objects are synced after startup. This is used to avoid updating iptables @@ -469,10 +397,10 @@ func NewProxier(ipt utiliptables.Interface, proxier := &Proxier{ portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), - serviceMap: make(proxyServiceMap), - serviceChanges: newServiceChangeMap(), - endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(hostname), + serviceMap: make(proxy.ServiceMap), + serviceChanges: proxy.NewServiceChangeTracker(), + endpointsMap: make(proxy.EndpointsMap), + endpointsChanges: proxy.NewEndpointChangeTracker(hostname), iptables: ipt, masqueradeAll: masqueradeAll, masqueradeMark: masqueradeMark, @@ -660,22 +588,19 @@ func (proxier *Proxier) isInitialized() bool { } func (proxier *Proxier) OnServiceAdd(service *api.Service) { - namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() { + if proxier.serviceChanges.Update(nil, service, newServiceInfo) && proxier.isInitialized() { proxier.syncRunner.Run() } } func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { - namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() { + if proxier.serviceChanges.Update(oldService, service, newServiceInfo) && proxier.isInitialized() { proxier.syncRunner.Run() } } func (proxier *Proxier) OnServiceDelete(service *api.Service) { - namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() { + if proxier.serviceChanges.Update(service, nil, newServiceInfo) && proxier.isInitialized() { proxier.syncRunner.Run() } } @@ -690,52 +615,20 @@ func (proxier *Proxier) OnServiceSynced() { proxier.syncProxyRules() } -// is updated by this function (based on the given changes). -// map is cleared after applying them. -func updateServiceMap( - serviceMap proxyServiceMap, - changes *serviceChangeMap) (result updateServiceMapResult) { - result.staleServices = sets.NewString() - - func() { - changes.lock.Lock() - defer changes.lock.Unlock() - for _, change := range changes.items { - existingPorts := serviceMap.merge(change.current) - serviceMap.unmerge(change.previous, existingPorts, result.staleServices) - } - changes.items = make(map[types.NamespacedName]*serviceChange) - }() - - // TODO: If this will appear to be computationally expensive, consider - // computing this incrementally similarly to serviceMap. - result.hcServices = make(map[types.NamespacedName]uint16) - for svcPortName, info := range serviceMap { - if info.healthCheckNodePort != 0 { - result.hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort) - } - } - - return result -} - func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() { + if proxier.endpointsChanges.Update(nil, endpoints, newEndpointsInfo) && proxier.isInitialized() { proxier.syncRunner.Run() } } func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() { + if proxier.endpointsChanges.Update(oldEndpoints, endpoints, newEndpointsInfo) && proxier.isInitialized() { proxier.syncRunner.Run() } } func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() { + if proxier.endpointsChanges.Update(endpoints, nil, newEndpointsInfo) && proxier.isInitialized() { proxier.syncRunner.Run() } } @@ -750,152 +643,6 @@ func (proxier *Proxier) OnEndpointsSynced() { proxier.syncProxyRules() } -// is updated by this function (based on the given changes). -// map is cleared after applying them. -func updateEndpointsMap( - endpointsMap proxyEndpointsMap, - changes *endpointsChangeMap, - hostname string) (result updateEndpointMapResult) { - result.staleEndpoints = make(map[endpointServicePair]bool) - result.staleServiceNames = make(map[proxy.ServicePortName]bool) - - func() { - changes.lock.Lock() - defer changes.lock.Unlock() - for _, change := range changes.items { - endpointsMap.unmerge(change.previous) - endpointsMap.merge(change.current) - detectStaleConnections(change.previous, change.current, result.staleEndpoints, result.staleServiceNames) - } - changes.items = make(map[types.NamespacedName]*endpointsChange) - }() - - // TODO: If this will appear to be computationally expensive, consider - // computing this incrementally similarly to endpointsMap. - result.hcEndpoints = make(map[types.NamespacedName]int) - localIPs := getLocalIPs(endpointsMap) - for nsn, ips := range localIPs { - result.hcEndpoints[nsn] = len(ips) - } - - return result -} - -// and are modified by this function with detected stale connections. -func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) { - for svcPortName, epList := range oldEndpointsMap { - for _, ep := range epList { - stale := true - for i := range newEndpointsMap[svcPortName] { - if *newEndpointsMap[svcPortName][i] == *ep { - stale = false - break - } - } - if stale { - glog.V(4).Infof("Stale endpoint %v -> %v", svcPortName, ep.endpoint) - staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPortName}] = true - } - } - } - - for svcPortName, epList := range newEndpointsMap { - // For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service. - if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 { - staleServiceNames[svcPortName] = true - } - } -} - -func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String { - localIPs := make(map[types.NamespacedName]sets.String) - for svcPortName := range endpointsMap { - for _, ep := range endpointsMap[svcPortName] { - if ep.isLocal { - // If the endpoint has a bad format, utilproxy.IPPart() will log an - // error and ep.IPPart() will return a null string. - if ip := ep.IPPart(); ip != "" { - nsn := svcPortName.NamespacedName - if localIPs[nsn] == nil { - localIPs[nsn] = sets.NewString() - } - localIPs[nsn].Insert(ip) - } - } - } - } - return localIPs -} - -// Translates single Endpoints object to proxyEndpointsMap. -// This function is used for incremental updated of endpointsMap. -// -// NOTE: endpoints object should NOT be modified. -func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEndpointsMap { - if endpoints == nil { - return nil - } - - endpointsMap := make(proxyEndpointsMap) - // We need to build a map of portname -> all ip:ports for that - // portname. Explode Endpoints.Subsets[*] into this structure. - for i := range endpoints.Subsets { - ss := &endpoints.Subsets[i] - for i := range ss.Ports { - port := &ss.Ports[i] - if port.Port == 0 { - glog.Warningf("ignoring invalid endpoint port %s", port.Name) - continue - } - svcPortName := proxy.ServicePortName{ - NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, - Port: port.Name, - } - for i := range ss.Addresses { - addr := &ss.Addresses[i] - if addr.IP == "" { - glog.Warningf("ignoring invalid endpoint port %s with empty host", port.Name) - continue - } - epInfo := &endpointsInfo{ - endpoint: net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))), - isLocal: addr.NodeName != nil && *addr.NodeName == hostname, - } - endpointsMap[svcPortName] = append(endpointsMap[svcPortName], epInfo) - } - if glog.V(3) { - newEPList := []string{} - for _, ep := range endpointsMap[svcPortName] { - newEPList = append(newEPList, ep.endpoint) - } - glog.Infof("Setting endpoints for %q to %+v", svcPortName, newEPList) - } - } - } - return endpointsMap -} - -// Translates single Service object to proxyServiceMap. -// -// NOTE: service object should NOT be modified. -func serviceToServiceMap(service *api.Service) proxyServiceMap { - if service == nil { - return nil - } - svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if utilproxy.ShouldSkipService(svcName, service) { - return nil - } - - serviceMap := make(proxyServiceMap) - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} - serviceMap[svcPortName] = newServiceInfo(svcPortName, servicePort, service) - } - return serviceMap -} - // portProtoHash takes the ServicePortName and protocol for a service // returns the associated 16 character hash. This is computed by hashing (sha256) // then encoding to base32 and truncating to 16 chars. We do this because IPTables @@ -936,25 +683,17 @@ func servicePortEndpointChainName(servicePortName string, protocol string, endpo return utiliptables.Chain("KUBE-SEP-" + encoded[:16]) } -type endpointServicePair struct { - endpoint string - servicePortName proxy.ServicePortName -} - -func (esp *endpointServicePair) IPPart() string { - return utilproxy.IPPart(esp.endpoint) -} - // After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we // risk sending more traffic to it, all of which will be lost (because UDP). // This assumes the proxier mutex is held -func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServicePair]bool) { - for epSvcPair := range connectionMap { - if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP { - endpointIP := utilproxy.IPPart(epSvcPair.endpoint) - err := utilproxy.ClearUDPConntrackForPeers(proxier.exec, svcInfo.clusterIP.String(), endpointIP) +// TODO: move it to util +func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { + for _, epSvcPair := range connectionMap { + if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == api.ProtocolUDP { + endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) + err := utilproxy.ClearUDPConntrackForPeers(proxier.exec, svcInfo.ClusterIP(), endpointIP) if err != nil { - glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.servicePortName.String(), err) + glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) } } } @@ -981,17 +720,15 @@ func (proxier *Proxier) syncProxyRules() { // We assume that if this was called, we really want to sync them, // even if nothing changed in the meantime. In other words, callers are // responsible for detecting no-op changes and not calling this function. - serviceUpdateResult := updateServiceMap( - proxier.serviceMap, &proxier.serviceChanges) - endpointUpdateResult := updateEndpointsMap( - proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname) + serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges) + endpointUpdateResult := proxy.UpdateEndpointsMap(proxier.endpointsMap, proxier.endpointsChanges) - staleServices := serviceUpdateResult.staleServices + staleServices := serviceUpdateResult.UDPStaleClusterIP // merge stale services gathered from updateEndpointsMap - for svcPortName := range endpointUpdateResult.staleServiceNames { - if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == api.ProtocolUDP { - glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String()) - staleServices.Insert(svcInfo.clusterIP.String()) + for _, svcPortName := range endpointUpdateResult.StaleServiceNames { + if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == api.ProtocolUDP { + glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP()) + staleServices.Insert(svcInfo.ClusterIP()) } } @@ -1164,7 +901,12 @@ func (proxier *Proxier) syncProxyRules() { // Build rules for each service. var svcNameString string - for svcName, svcInfo := range proxier.serviceMap { + for svcName, svc := range proxier.serviceMap { + svcInfo, ok := svc.(*serviceInfo) + if !ok { + glog.Errorf("Failed to cast serviceInfo %q", svcName.String()) + continue + } isIPv6 := utilproxy.IsIPv6(svcInfo.clusterIP) protocol := strings.ToLower(string(svcInfo.protocol)) svcNameString = svcInfo.serviceNameString @@ -1224,7 +966,7 @@ func (proxier *Proxier) syncProxyRules() { lp := utilproxy.LocalPort{ Description: "externalIP for " + svcNameString, IP: externalIP, - Port: svcInfo.port, + Port: svcInfo.Port(), Protocol: protocol, } if proxier.portsMap[lp] != nil { @@ -1448,8 +1190,13 @@ func (proxier *Proxier) syncProxyRules() { endpointChains = endpointChains[:0] var endpointChain utiliptables.Chain for _, ep := range proxier.endpointsMap[svcName] { - endpoints = append(endpoints, ep) - endpointChain = ep.endpointChain(svcNameString, protocol) + epInfo, ok := ep.(*endpointsInfo) + if !ok { + glog.Errorf("Failed to cast endpointsInfo %q", ep.String()) + continue + } + endpoints = append(endpoints, epInfo) + endpointChain = epInfo.endpointChain(svcNameString, protocol) endpointChains = append(endpointChains, endpointChain) // Create the endpoint chain, retaining counters if possible. @@ -1476,7 +1223,7 @@ func (proxier *Proxier) syncProxyRules() { // Now write loadbalancing & DNAT rules. n := len(endpointChains) for i, endpointChain := range endpointChains { - epIP := endpoints[i].IPPart() + epIP := endpoints[i].IP() if epIP == "" { // Error parsing this endpoint has been logged. Skip to next endpoint. continue @@ -1687,10 +1434,10 @@ func (proxier *Proxier) syncProxyRules() { // Update healthchecks. The endpoints list might include services that are // not "OnlyLocal", but the services list will not, and the healthChecker // will just drop those endpoints. - if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil { + if err := proxier.healthChecker.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { glog.Errorf("Error syncing healtcheck services: %v", err) } - if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil { + if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil { glog.Errorf("Error syncing healthcheck endoints: %v", err) } @@ -1701,7 +1448,7 @@ func (proxier *Proxier) syncProxyRules() { glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err) } } - proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints) + proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) } // Join all words with spaces, terminate with newline and write to buf. diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 6308d4cc0f..441a79f019 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -18,22 +18,19 @@ package iptables import ( "bytes" + "fmt" + "net" "reflect" "strconv" + "strings" "testing" "time" - "github.com/davecgh/go-spew/spew" "github.com/golang/glog" - "fmt" - "net" - "strings" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/apimachinery/pkg/util/sets" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/proxy" utilproxy "k8s.io/kubernetes/pkg/proxy/util" @@ -200,8 +197,8 @@ func TestDeleteEndpointConnections(t *testing.T) { svcIP string svcPort int protocol api.Protocol - endpoint string // IP:port endpoint - epSvcPair endpointServicePair // Will be generated by test + endpoint string // IP:port endpoint + epSvcPair proxy.ServiceEndpoint // Will be generated by test simulatedErr string }{ { @@ -253,16 +250,16 @@ func TestDeleteEndpointConnections(t *testing.T) { // Create a service map that has service info entries for all test cases // and generate an endpoint service pair for each test case - serviceMap := make(map[proxy.ServicePortName]*serviceInfo) + serviceMap := make(map[proxy.ServicePortName]proxy.ServicePort) for i, tc := range testCases { svc := proxy.ServicePortName{ NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName}, Port: "p80", } serviceMap[svc] = newFakeServiceInfo(svc, net.ParseIP(tc.svcIP), 80, tc.protocol, false) - testCases[i].epSvcPair = endpointServicePair{ - endpoint: tc.endpoint, - servicePortName: svc, + testCases[i].epSvcPair = proxy.ServiceEndpoint{ + Endpoint: tc.endpoint, + ServicePortName: svc, } } @@ -298,7 +295,7 @@ func TestDeleteEndpointConnections(t *testing.T) { priorExecs := fexec.CommandCalls priorGlogErrs := glog.Stats.Error.Lines() - input := map[endpointServicePair]bool{tc.epSvcPair: true} + input := []proxy.ServiceEndpoint{tc.epSvcPair} fakeProxier.deleteEndpointConnections(input) // For UDP connections, check the executed conntrack command @@ -391,10 +388,10 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { // invocation into a Run() method. p := &Proxier{ exec: &fakeexec.FakeExec{}, - serviceMap: make(proxyServiceMap), - serviceChanges: newServiceChangeMap(), - endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(testHostname), + serviceMap: make(proxy.ServiceMap), + serviceChanges: proxy.NewServiceChangeTracker(), + endpointsMap: make(proxy.EndpointsMap), + endpointsChanges: proxy.NewEndpointChangeTracker(testHostname), iptables: ipt, clusterCIDR: "10.0.0.0/24", hostname: testHostname, @@ -720,7 +717,6 @@ func TestLoadBalancer(t *testing.T) { proto := strings.ToLower(string(api.ProtocolTCP)) fwChain := string(serviceFirewallChainName(svcPortName.String(), proto)) svcChain := string(servicePortChainName(svcPortName.String(), proto)) - //lbChain := string(serviceLBChainName(svcPortName.String(), proto)) kubeSvcRules := ipt.GetRules(string(kubeServicesChain)) if !hasJump(kubeSvcRules, fwChain, svcLBIP, svcPort) { @@ -1111,24 +1107,24 @@ func TestBuildServiceMapAddRemove(t *testing.T) { for i := range services { fp.OnServiceAdd(services[i]) } - result := updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges) if len(fp.serviceMap) != 8 { t.Errorf("expected service map length 8, got %v", fp.serviceMap) } // The only-local-loadbalancer ones get added - if len(result.hcServices) != 1 { - t.Errorf("expected 1 healthcheck port, got %v", result.hcServices) + if len(result.HCServiceNodePorts) != 1 { + t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts) } else { nsn := makeNSN("somewhere", "only-local-load-balancer") - if port, found := result.hcServices[nsn]; !found || port != 345 { - t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.hcServices) + if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 { + t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts) } } - if len(result.staleServices) != 0 { + if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } // Remove some stuff @@ -1144,24 +1140,24 @@ func TestBuildServiceMapAddRemove(t *testing.T) { fp.OnServiceDelete(services[2]) fp.OnServiceDelete(services[3]) - result = updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges) if len(fp.serviceMap) != 1 { t.Errorf("expected service map length 1, got %v", fp.serviceMap) } - if len(result.hcServices) != 0 { - t.Errorf("expected 0 healthcheck ports, got %v", result.hcServices) + if len(result.HCServiceNodePorts) != 0 { + t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts) } // All services but one were deleted. While you'd expect only the ClusterIPs // from the three deleted services here, we still have the ClusterIP for // the not-deleted service, because one of it's ServicePorts was deleted. expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"} - if len(result.staleServices) != len(expectedStaleUDPServices) { - t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.staleServices.UnsortedList()) + if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) { + t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.UnsortedList()) } for _, ip := range expectedStaleUDPServices { - if !result.staleServices.Has(ip) { + if !result.UDPStaleClusterIP.Has(ip) { t.Errorf("expected stale UDP service service %s", ip) } } @@ -1184,18 +1180,18 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { ) // Headless service should be ignored - result := updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges) if len(fp.serviceMap) != 0 { t.Errorf("expected service map length 0, got %d", len(fp.serviceMap)) } // No proxied services, so no healthchecks - if len(result.hcServices) != 0 { - t.Errorf("expected healthcheck ports length 0, got %d", len(result.hcServices)) + if len(result.HCServiceNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts)) } - if len(result.staleServices) != 0 { - t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices)) + if len(result.UDPStaleClusterIP) != 0 { + t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } } @@ -1212,16 +1208,16 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { }), ) - result := updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges) if len(fp.serviceMap) != 0 { t.Errorf("expected service map length 0, got %v", fp.serviceMap) } // No proxied services, so no healthchecks - if len(result.hcServices) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices) + if len(result.HCServiceNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) } - if len(result.staleServices) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", result.staleServices) + if len(result.UDPStaleClusterIP) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP) } } @@ -1252,328 +1248,57 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { fp.OnServiceAdd(servicev1) - result := updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } - if len(result.hcServices) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices) + if len(result.HCServiceNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) } - if len(result.staleServices) != 0 { + if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } // Change service to load-balancer fp.OnServiceUpdate(servicev1, servicev2) - result = updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } - if len(result.hcServices) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", result.hcServices) + if len(result.HCServiceNodePorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) } - if len(result.staleServices) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", result.staleServices.UnsortedList()) + if len(result.UDPStaleClusterIP) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList()) } // No change; make sure the service map stays the same and there are // no health-check changes fp.OnServiceUpdate(servicev2, servicev2) - result = updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } - if len(result.hcServices) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", result.hcServices) + if len(result.HCServiceNodePorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) } - if len(result.staleServices) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", result.staleServices.UnsortedList()) + if len(result.UDPStaleClusterIP) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList()) } // And back to ClusterIP fp.OnServiceUpdate(servicev2, servicev1) - result = updateServiceMap(fp.serviceMap, &fp.serviceChanges) + result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges) if len(fp.serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.serviceMap) } - if len(result.hcServices) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices) + if len(result.HCServiceNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) } - if len(result.staleServices) != 0 { + if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices)) - } -} - -func Test_getLocalIPs(t *testing.T) { - testCases := []struct { - endpointsMap map[proxy.ServicePortName][]*endpointsInfo - expected map[types.NamespacedName]sets.String - }{{ - // Case[0]: nothing - endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{}, - expected: map[types.NamespacedName]sets.String{}, - }, { - // Case[1]: unnamed port - endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: false}, - }, - }, - expected: map[types.NamespacedName]sets.String{}, - }, { - // Case[2]: unnamed port local - endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: true}, - }, - }, - expected: map[types.NamespacedName]sets.String{ - {Namespace: "ns1", Name: "ep1"}: sets.NewString("1.1.1.1"), - }, - }, { - // Case[3]: named local and non-local ports for the same IP. - endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - {endpoint: "1.1.1.2:11", isLocal: true}, - }, - makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: false}, - {endpoint: "1.1.1.2:12", isLocal: true}, - }, - }, - expected: map[types.NamespacedName]sets.String{ - {Namespace: "ns1", Name: "ep1"}: sets.NewString("1.1.1.2"), - }, - }, { - // Case[4]: named local and non-local ports for different IPs. - endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - }, - makeServicePortName("ns2", "ep2", "p22"): { - {endpoint: "2.2.2.2:22", isLocal: true}, - {endpoint: "2.2.2.22:22", isLocal: true}, - }, - makeServicePortName("ns2", "ep2", "p23"): { - {endpoint: "2.2.2.3:23", isLocal: true}, - }, - makeServicePortName("ns4", "ep4", "p44"): { - {endpoint: "4.4.4.4:44", isLocal: true}, - {endpoint: "4.4.4.5:44", isLocal: false}, - }, - makeServicePortName("ns4", "ep4", "p45"): { - {endpoint: "4.4.4.6:45", isLocal: true}, - }, - }, - expected: map[types.NamespacedName]sets.String{ - {Namespace: "ns2", Name: "ep2"}: sets.NewString("2.2.2.2", "2.2.2.22", "2.2.2.3"), - {Namespace: "ns4", Name: "ep4"}: sets.NewString("4.4.4.4", "4.4.4.6"), - }, - }, { - // Case[5]: named port local and bad endpoints IP - endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "bad ip:11", isLocal: true}, - }, - }, - expected: map[types.NamespacedName]sets.String{}, - }} - - for tci, tc := range testCases { - // outputs - localIPs := getLocalIPs(tc.endpointsMap) - - if !reflect.DeepEqual(localIPs, tc.expected) { - t.Errorf("[%d] expected %#v, got %#v", tci, tc.expected, localIPs) - } - } -} - -// This is a coarse test, but it offers some modicum of confidence as the code is evolved. -func Test_endpointsToEndpointsMap(t *testing.T) { - testCases := []struct { - newEndpoints *api.Endpoints - expected map[proxy.ServicePortName][]*endpointsInfo - }{{ - // Case[0]: nothing - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), - expected: map[proxy.ServicePortName][]*endpointsInfo{}, - }, { - // Case[1]: no changes, unnamed port - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{ - { - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Name: "", - Port: 11, - }}, - }, - } - }), - expected: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: false}, - }, - }, - }, { - // Case[2]: no changes, named port - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{ - { - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Name: "port", - Port: 11, - }}, - }, - } - }), - expected: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "port"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - }, - }, - }, { - // Case[3]: new port - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{ - { - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Port: 11, - }}, - }, - } - }), - expected: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: false}, - }, - }, - }, { - // Case[4]: remove port - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), - expected: map[proxy.ServicePortName][]*endpointsInfo{}, - }, { - // Case[5]: new IP and port - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{ - { - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }, { - IP: "2.2.2.2", - }}, - Ports: []api.EndpointPort{{ - Name: "p1", - Port: 11, - }, { - Name: "p2", - Port: 22, - }}, - }, - } - }), - expected: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "p1"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - {endpoint: "2.2.2.2:11", isLocal: false}, - }, - makeServicePortName("ns1", "ep1", "p2"): { - {endpoint: "1.1.1.1:22", isLocal: false}, - {endpoint: "2.2.2.2:22", isLocal: false}, - }, - }, - }, { - // Case[6]: remove IP and port - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{ - { - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Name: "p1", - Port: 11, - }}, - }, - } - }), - expected: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "p1"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - }, - }, - }, { - // Case[7]: rename port - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{ - { - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Name: "p2", - Port: 11, - }}, - }, - } - }), - expected: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "p2"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - }, - }, - }, { - // Case[8]: renumber port - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { - ept.Subsets = []api.EndpointSubset{ - { - Addresses: []api.EndpointAddress{{ - IP: "1.1.1.1", - }}, - Ports: []api.EndpointPort{{ - Name: "p1", - Port: 22, - }}, - }, - } - }), - expected: map[proxy.ServicePortName][]*endpointsInfo{ - makeServicePortName("ns1", "ep1", "p1"): { - {endpoint: "1.1.1.1:22", isLocal: false}, - }, - }, - }} - - for tci, tc := range testCases { - // outputs - newEndpoints := endpointsToEndpointsMap(tc.newEndpoints, "host") - - if len(newEndpoints) != len(tc.expected) { - t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expected), len(newEndpoints), spew.Sdump(newEndpoints)) - } - for x := range tc.expected { - if len(newEndpoints[x]) != len(tc.expected[x]) { - t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(tc.expected[x]), x, len(newEndpoints[x])) - } else { - for i := range newEndpoints[x] { - if *(newEndpoints[x][i]) != *(tc.expected[x][i]) { - t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, tc.expected[x][i], *(newEndpoints[x][i])) - } - } - } - } + t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } } @@ -1619,7 +1344,7 @@ func makeServiceMap(proxier *Proxier, allServices ...*api.Service) { proxier.servicesSynced = true } -func compareEndpointsMaps(t *testing.T, tci int, newMap, expected map[proxy.ServicePortName][]*endpointsInfo) { +func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointsInfo) { if len(newMap) != len(expected) { t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap) } @@ -1628,8 +1353,13 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap, expected map[proxy.Serv t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x])) } else { for i := range expected[x] { - if *(newMap[x][i]) != *(expected[x][i]) { - t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newMap[x][i]) + newEp, ok := newMap[x][i].(*endpointsInfo) + if !ok { + t.Errorf("Failed to cast endpointsInfo") + continue + } + if *newEp != *(expected[x][i]) { + t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp) } } } @@ -1950,14 +1680,14 @@ func Test_updateEndpointsMap(t *testing.T) { currentEndpoints []*api.Endpoints oldEndpoints map[proxy.ServicePortName][]*endpointsInfo expectedResult map[proxy.ServicePortName][]*endpointsInfo - expectedStaleEndpoints []endpointServicePair + expectedStaleEndpoints []proxy.ServiceEndpoint expectedStaleServiceNames map[proxy.ServicePortName]bool expectedHealthchecks map[types.NamespacedName]int }{{ // Case[0]: nothing oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, }, { @@ -1978,7 +1708,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, }, { @@ -1999,7 +1729,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: true}, }, }, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, @@ -2028,7 +1758,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.2:12", isLocal: false}, }, }, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, }, { @@ -2061,7 +1791,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.3:13", isLocal: false}, }, }, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, @@ -2128,7 +1858,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "2.2.2.2:22", isLocal: true}, }, }, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, @@ -2148,7 +1878,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: true}, }, }, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", ""): true, }, @@ -2169,9 +1899,9 @@ func Test_updateEndpointsMap(t *testing.T) { }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedStaleEndpoints: []endpointServicePair{{ - endpoint: "1.1.1.1:11", - servicePortName: makeServicePortName("ns1", "ep1", ""), + expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + Endpoint: "1.1.1.1:11", + ServicePortName: makeServicePortName("ns1", "ep1", ""), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, @@ -2198,7 +1928,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.2:12", isLocal: true}, }, }, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12"): true, }, @@ -2228,15 +1958,15 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedStaleEndpoints: []endpointServicePair{{ - endpoint: "1.1.1.2:11", - servicePortName: makeServicePortName("ns1", "ep1", "p11"), + expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + Endpoint: "1.1.1.2:11", + ServicePortName: makeServicePortName("ns1", "ep1", "p11"), }, { - endpoint: "1.1.1.1:12", - servicePortName: makeServicePortName("ns1", "ep1", "p12"), + Endpoint: "1.1.1.1:12", + ServicePortName: makeServicePortName("ns1", "ep1", "p12"), }, { - endpoint: "1.1.1.2:12", - servicePortName: makeServicePortName("ns1", "ep1", "p12"), + Endpoint: "1.1.1.2:12", + ServicePortName: makeServicePortName("ns1", "ep1", "p12"), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, @@ -2261,7 +1991,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.2:12", isLocal: true}, }, }, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12"): true, }, @@ -2289,9 +2019,9 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedStaleEndpoints: []endpointServicePair{{ - endpoint: "1.1.1.2:12", - servicePortName: makeServicePortName("ns1", "ep1", "p12"), + expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + Endpoint: "1.1.1.2:12", + ServicePortName: makeServicePortName("ns1", "ep1", "p12"), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, @@ -2313,9 +2043,9 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedStaleEndpoints: []endpointServicePair{{ - endpoint: "1.1.1.1:11", - servicePortName: makeServicePortName("ns1", "ep1", "p11"), + expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + Endpoint: "1.1.1.1:11", + ServicePortName: makeServicePortName("ns1", "ep1", "p11"), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11-2"): true, @@ -2339,9 +2069,9 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:22", isLocal: false}, }, }, - expectedStaleEndpoints: []endpointServicePair{{ - endpoint: "1.1.1.1:11", - servicePortName: makeServicePortName("ns1", "ep1", "p11"), + expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + Endpoint: "1.1.1.1:11", + ServicePortName: makeServicePortName("ns1", "ep1", "p11"), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, @@ -2396,21 +2126,21 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "4.4.4.4:44", isLocal: true}, }, }, - expectedStaleEndpoints: []endpointServicePair{{ - endpoint: "2.2.2.2:22", - servicePortName: makeServicePortName("ns2", "ep2", "p22"), + expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + Endpoint: "2.2.2.2:22", + ServicePortName: makeServicePortName("ns2", "ep2", "p22"), }, { - endpoint: "2.2.2.22:22", - servicePortName: makeServicePortName("ns2", "ep2", "p22"), + Endpoint: "2.2.2.22:22", + ServicePortName: makeServicePortName("ns2", "ep2", "p22"), }, { - endpoint: "2.2.2.3:23", - servicePortName: makeServicePortName("ns2", "ep2", "p23"), + Endpoint: "2.2.2.3:23", + ServicePortName: makeServicePortName("ns2", "ep2", "p23"), }, { - endpoint: "4.4.4.5:44", - servicePortName: makeServicePortName("ns4", "ep4", "p44"), + Endpoint: "4.4.4.5:44", + ServicePortName: makeServicePortName("ns4", "ep4", "p44"), }, { - endpoint: "4.4.4.6:45", - servicePortName: makeServicePortName("ns4", "ep4", "p45"), + Endpoint: "4.4.4.6:45", + ServicePortName: makeServicePortName("ns4", "ep4", "p45"), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12"): true, @@ -2434,7 +2164,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedStaleEndpoints: []endpointServicePair{}, + expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", ""): true, }, @@ -2454,7 +2184,7 @@ func Test_updateEndpointsMap(t *testing.T) { fp.OnEndpointsAdd(tc.previousEndpoints[i]) } } - updateEndpointsMap(fp.endpointsMap, &fp.endpointsChanges, fp.hostname) + proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges) compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints) // Now let's call appropriate handlers to get to state we want to be. @@ -2474,27 +2204,40 @@ func Test_updateEndpointsMap(t *testing.T) { fp.OnEndpointsUpdate(prev, curr) } } - result := updateEndpointsMap(fp.endpointsMap, &fp.endpointsChanges, fp.hostname) + result := proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges) newMap := fp.endpointsMap compareEndpointsMaps(t, tci, newMap, tc.expectedResult) - if len(result.staleEndpoints) != len(tc.expectedStaleEndpoints) { - t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.staleEndpoints), result.staleEndpoints) + if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) { + t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints) } for _, x := range tc.expectedStaleEndpoints { - if result.staleEndpoints[x] != true { - t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.staleEndpoints) + found := false + for _, stale := range result.StaleEndpoints { + if stale == x { + found = true + break + } + } + if !found { + t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.StaleEndpoints) } } - if len(result.staleServiceNames) != len(tc.expectedStaleServiceNames) { - t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.staleServiceNames), result.staleServiceNames) + if len(result.StaleServiceNames) != len(tc.expectedStaleServiceNames) { + t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.StaleServiceNames), result.StaleServiceNames) } for svcName := range tc.expectedStaleServiceNames { - if result.staleServiceNames[svcName] != true { - t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.staleServiceNames) + found := false + for _, stale := range result.StaleServiceNames { + if stale == svcName { + found = true + } + } + if !found { + t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames) } } - if !reflect.DeepEqual(result.hcEndpoints, tc.expectedHealthchecks) { - t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.hcEndpoints) + if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) { + t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize) } } }