iptables proxier part changes

pull/6/head
m1093782566 2017-12-10 15:12:23 +08:00
parent 6edcf02d9e
commit f3512cbbb9
2 changed files with 240 additions and 750 deletions

View File

@ -26,7 +26,6 @@ import (
"encoding/base32"
"fmt"
"net"
"reflect"
"strconv"
"strings"
"sync"
@ -37,7 +36,6 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
apiservice "k8s.io/kubernetes/pkg/api/service"
@ -157,37 +155,8 @@ type serviceInfo struct {
serviceLBChainName utiliptables.Chain
}
// internal struct for endpoints information
type endpointsInfo struct {
endpoint string // TODO: should be an endpointString type
isLocal bool
// The following fields we lazily compute and store here for performance
// reasons. If the protocol is the same as you expect it to be, then the
// chainName can be reused, otherwise it should be recomputed.
protocol string
chainName utiliptables.Chain
}
// IPPart returns just the IP part of the endpoint.
func (e *endpointsInfo) IPPart() string {
return utilproxy.IPPart(e.endpoint)
}
// Returns the endpoint chain name for a given endpointsInfo.
func (e *endpointsInfo) endpointChain(svcNameString, protocol string) utiliptables.Chain {
if e.protocol != protocol {
e.protocol = protocol
e.chainName = servicePortEndpointChainName(svcNameString, protocol, e.endpoint)
}
return e.chainName
}
func (e *endpointsInfo) String() string {
return fmt.Sprintf("%v", *e)
}
// returns a new serviceInfo struct
func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo {
// returns a new proxy.ServicePort which abstracts a serviceInfo
func newServiceInfo(port *api.ServicePort, service *api.Service) proxy.ServicePort {
onlyNodeLocalEndpoints := false
if apiservice.RequestsOnlyLocalTraffic(service) {
onlyNodeLocalEndpoints = true
@ -214,10 +183,13 @@ func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, se
copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
copy(info.externalIPs, service.Spec.ExternalIPs)
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
if apiservice.NeedsHealthCheck(service) {
p := service.Spec.HealthCheckNodePort
if p == 0 {
glog.Errorf("Service %q has no healthcheck nodeport", svcPortName.NamespacedName.String())
glog.Errorf("Service %q has no healthcheck nodeport", svcName.String())
} else {
info.healthCheckNodePort = int(p)
}
@ -233,134 +205,90 @@ func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, se
return info
}
type endpointsChange struct {
previous proxyEndpointsMap
current proxyEndpointsMap
// ClusterIP is part of proxy.ServicePort interface.
func (info *serviceInfo) ClusterIP() string {
return info.clusterIP.String()
}
type endpointsChangeMap struct {
lock sync.Mutex
hostname string
items map[types.NamespacedName]*endpointsChange
// Port is part of proxy.ServicePort interface.
func (info *serviceInfo) Port() int {
return info.port
}
type serviceChange struct {
previous proxyServiceMap
current proxyServiceMap
// Protocol is part of proxy.ServicePort interface.
func (info *serviceInfo) Protocol() api.Protocol {
return info.protocol
}
type serviceChangeMap struct {
lock sync.Mutex
items map[types.NamespacedName]*serviceChange
// String is part of proxy.ServicePort interface.
func (info *serviceInfo) String() string {
return fmt.Sprintf("%s:%d/%s", info.clusterIP, info.port, info.protocol)
}
type updateEndpointMapResult struct {
hcEndpoints map[types.NamespacedName]int
staleEndpoints map[endpointServicePair]bool
staleServiceNames map[proxy.ServicePortName]bool
// HealthCheckNodePort is part of proxy.ServicePort interface.
func (info *serviceInfo) HealthCheckNodePort() int {
return info.healthCheckNodePort
}
type updateServiceMapResult struct {
hcServices map[types.NamespacedName]uint16
staleServices sets.String
var _ proxy.ServicePort = &serviceInfo{}
// internal struct for endpoints information
type endpointsInfo struct {
endpoint string // TODO: should be an endpointString type
isLocal bool
// The following fields we lazily compute and store here for performance
// reasons. If the protocol is the same as you expect it to be, then the
// chainName can be reused, otherwise it should be recomputed.
protocol string
chainName utiliptables.Chain
}
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo
func newEndpointsChangeMap(hostname string) endpointsChangeMap {
return endpointsChangeMap{
hostname: hostname,
items: make(map[types.NamespacedName]*endpointsChange),
// returns a new proxy.Endpoint which abstracts a endpointsInfo
func newEndpointsInfo(IP string, port int, isLocal bool) proxy.Endpoint {
return &endpointsInfo{
endpoint: net.JoinHostPort(IP, strconv.Itoa(port)),
isLocal: isLocal,
}
}
func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) bool {
ecm.lock.Lock()
defer ecm.lock.Unlock()
change, exists := ecm.items[*namespacedName]
if !exists {
change = &endpointsChange{}
change.previous = endpointsToEndpointsMap(previous, ecm.hostname)
ecm.items[*namespacedName] = change
}
change.current = endpointsToEndpointsMap(current, ecm.hostname)
if reflect.DeepEqual(change.previous, change.current) {
delete(ecm.items, *namespacedName)
}
return len(ecm.items) > 0
// IsLocal is part of proxy.Endpoint interface.
func (e *endpointsInfo) IsLocal() bool {
return e.isLocal
}
func newServiceChangeMap() serviceChangeMap {
return serviceChangeMap{
items: make(map[types.NamespacedName]*serviceChange),
}
// IP is part of proxy.Endpoint interface.
func (e *endpointsInfo) IP() string {
return utilproxy.IPPart(e.endpoint)
}
func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) bool {
scm.lock.Lock()
defer scm.lock.Unlock()
change, exists := scm.items[*namespacedName]
if !exists {
change = &serviceChange{}
change.previous = serviceToServiceMap(previous)
scm.items[*namespacedName] = change
// Equal is part of proxy.Endpoint interface.
func (e *endpointsInfo) Equal(other proxy.Endpoint) bool {
o, ok := other.(*endpointsInfo)
if !ok {
glog.Errorf("Failed to cast endpointsInfo")
return false
}
change.current = serviceToServiceMap(current)
if reflect.DeepEqual(change.previous, change.current) {
delete(scm.items, *namespacedName)
}
return len(scm.items) > 0
return e.endpoint == o.endpoint &&
e.isLocal == o.isLocal &&
e.protocol == o.protocol &&
e.chainName == o.chainName
}
func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String {
existingPorts := sets.NewString()
for svcPortName, info := range other {
port := strconv.Itoa(info.port)
clusterIPPort := net.JoinHostPort(info.clusterIP.String(), port)
existingPorts.Insert(svcPortName.Port)
_, exists := (*sm)[svcPortName]
if !exists {
glog.V(1).Infof("Adding new service port %q at %s/%s", svcPortName, clusterIPPort, info.protocol)
} else {
glog.V(1).Infof("Updating existing service port %q at %s/%s", svcPortName, clusterIPPort, info.protocol)
}
(*sm)[svcPortName] = info
}
return existingPorts
// String is part of proxy.Endpoint interface.
func (e *endpointsInfo) String() string {
return e.endpoint
}
func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String) {
for svcPortName := range other {
if existingPorts.Has(svcPortName.Port) {
continue
}
info, exists := (*sm)[svcPortName]
if exists {
glog.V(1).Infof("Removing service port %q", svcPortName)
if info.protocol == api.ProtocolUDP {
staleServices.Insert(info.clusterIP.String())
}
delete(*sm, svcPortName)
} else {
glog.Errorf("Service port %q removed, but doesn't exists", svcPortName)
}
// Returns the endpoint chain name for a given endpointsInfo.
func (e *endpointsInfo) endpointChain(svcNameString, protocol string) utiliptables.Chain {
if e.protocol != protocol {
e.protocol = protocol
e.chainName = servicePortEndpointChainName(svcNameString, protocol, e.endpoint)
}
return e.chainName
}
func (em proxyEndpointsMap) merge(other proxyEndpointsMap) {
for svcPortName := range other {
em[svcPortName] = other[svcPortName]
}
}
func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) {
for svcPortName := range other {
delete(em, svcPortName)
}
}
var _ proxy.Endpoint = &endpointsInfo{}
// Proxier is an iptables based proxy for connections between a localhost:lport
// and services that provide the actual backends.
@ -369,12 +297,12 @@ type Proxier struct {
// services that happened since iptables was synced. For a single object,
// changes are accumulated, i.e. previous is state from before all of them,
// current is state after applying all of those.
endpointsChanges endpointsChangeMap
serviceChanges serviceChangeMap
endpointsChanges *proxy.EndpointChangeTracker
serviceChanges *proxy.ServiceChangeTracker
mu sync.Mutex // protects the following fields
serviceMap proxyServiceMap
endpointsMap proxyEndpointsMap
serviceMap proxy.ServiceMap
endpointsMap proxy.EndpointsMap
portsMap map[utilproxy.LocalPort]utilproxy.Closeable
// endpointsSynced and servicesSynced are set to true when corresponding
// objects are synced after startup. This is used to avoid updating iptables
@ -469,10 +397,10 @@ func NewProxier(ipt utiliptables.Interface,
proxier := &Proxier{
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxyServiceMap),
serviceChanges: newServiceChangeMap(),
endpointsMap: make(proxyEndpointsMap),
endpointsChanges: newEndpointsChangeMap(hostname),
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname),
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
@ -660,22 +588,19 @@ func (proxier *Proxier) isInitialized() bool {
}
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
if proxier.serviceChanges.Update(nil, service, newServiceInfo) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() {
if proxier.serviceChanges.Update(oldService, service, newServiceInfo) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
func (proxier *Proxier) OnServiceDelete(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() {
if proxier.serviceChanges.Update(service, nil, newServiceInfo) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
@ -690,52 +615,20 @@ func (proxier *Proxier) OnServiceSynced() {
proxier.syncProxyRules()
}
// <serviceMap> is updated by this function (based on the given changes).
// <changes> map is cleared after applying them.
func updateServiceMap(
serviceMap proxyServiceMap,
changes *serviceChangeMap) (result updateServiceMapResult) {
result.staleServices = sets.NewString()
func() {
changes.lock.Lock()
defer changes.lock.Unlock()
for _, change := range changes.items {
existingPorts := serviceMap.merge(change.current)
serviceMap.unmerge(change.previous, existingPorts, result.staleServices)
}
changes.items = make(map[types.NamespacedName]*serviceChange)
}()
// TODO: If this will appear to be computationally expensive, consider
// computing this incrementally similarly to serviceMap.
result.hcServices = make(map[types.NamespacedName]uint16)
for svcPortName, info := range serviceMap {
if info.healthCheckNodePort != 0 {
result.hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort)
}
}
return result
}
func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() {
if proxier.endpointsChanges.Update(nil, endpoints, newEndpointsInfo) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() {
if proxier.endpointsChanges.Update(oldEndpoints, endpoints, newEndpointsInfo) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() {
if proxier.endpointsChanges.Update(endpoints, nil, newEndpointsInfo) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
@ -750,152 +643,6 @@ func (proxier *Proxier) OnEndpointsSynced() {
proxier.syncProxyRules()
}
// <endpointsMap> is updated by this function (based on the given changes).
// <changes> map is cleared after applying them.
func updateEndpointsMap(
endpointsMap proxyEndpointsMap,
changes *endpointsChangeMap,
hostname string) (result updateEndpointMapResult) {
result.staleEndpoints = make(map[endpointServicePair]bool)
result.staleServiceNames = make(map[proxy.ServicePortName]bool)
func() {
changes.lock.Lock()
defer changes.lock.Unlock()
for _, change := range changes.items {
endpointsMap.unmerge(change.previous)
endpointsMap.merge(change.current)
detectStaleConnections(change.previous, change.current, result.staleEndpoints, result.staleServiceNames)
}
changes.items = make(map[types.NamespacedName]*endpointsChange)
}()
// TODO: If this will appear to be computationally expensive, consider
// computing this incrementally similarly to endpointsMap.
result.hcEndpoints = make(map[types.NamespacedName]int)
localIPs := getLocalIPs(endpointsMap)
for nsn, ips := range localIPs {
result.hcEndpoints[nsn] = len(ips)
}
return result
}
// <staleEndpoints> and <staleServices> are modified by this function with detected stale connections.
func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) {
for svcPortName, epList := range oldEndpointsMap {
for _, ep := range epList {
stale := true
for i := range newEndpointsMap[svcPortName] {
if *newEndpointsMap[svcPortName][i] == *ep {
stale = false
break
}
}
if stale {
glog.V(4).Infof("Stale endpoint %v -> %v", svcPortName, ep.endpoint)
staleEndpoints[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPortName}] = true
}
}
}
for svcPortName, epList := range newEndpointsMap {
// For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service.
if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 {
staleServiceNames[svcPortName] = true
}
}
}
func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String {
localIPs := make(map[types.NamespacedName]sets.String)
for svcPortName := range endpointsMap {
for _, ep := range endpointsMap[svcPortName] {
if ep.isLocal {
// If the endpoint has a bad format, utilproxy.IPPart() will log an
// error and ep.IPPart() will return a null string.
if ip := ep.IPPart(); ip != "" {
nsn := svcPortName.NamespacedName
if localIPs[nsn] == nil {
localIPs[nsn] = sets.NewString()
}
localIPs[nsn].Insert(ip)
}
}
}
}
return localIPs
}
// Translates single Endpoints object to proxyEndpointsMap.
// This function is used for incremental updated of endpointsMap.
//
// NOTE: endpoints object should NOT be modified.
func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEndpointsMap {
if endpoints == nil {
return nil
}
endpointsMap := make(proxyEndpointsMap)
// We need to build a map of portname -> all ip:ports for that
// portname. Explode Endpoints.Subsets[*] into this structure.
for i := range endpoints.Subsets {
ss := &endpoints.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]
if port.Port == 0 {
glog.Warningf("ignoring invalid endpoint port %s", port.Name)
continue
}
svcPortName := proxy.ServicePortName{
NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name},
Port: port.Name,
}
for i := range ss.Addresses {
addr := &ss.Addresses[i]
if addr.IP == "" {
glog.Warningf("ignoring invalid endpoint port %s with empty host", port.Name)
continue
}
epInfo := &endpointsInfo{
endpoint: net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))),
isLocal: addr.NodeName != nil && *addr.NodeName == hostname,
}
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], epInfo)
}
if glog.V(3) {
newEPList := []string{}
for _, ep := range endpointsMap[svcPortName] {
newEPList = append(newEPList, ep.endpoint)
}
glog.Infof("Setting endpoints for %q to %+v", svcPortName, newEPList)
}
}
}
return endpointsMap
}
// Translates single Service object to proxyServiceMap.
//
// NOTE: service object should NOT be modified.
func serviceToServiceMap(service *api.Service) proxyServiceMap {
if service == nil {
return nil
}
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if utilproxy.ShouldSkipService(svcName, service) {
return nil
}
serviceMap := make(proxyServiceMap)
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
serviceMap[svcPortName] = newServiceInfo(svcPortName, servicePort, service)
}
return serviceMap
}
// portProtoHash takes the ServicePortName and protocol for a service
// returns the associated 16 character hash. This is computed by hashing (sha256)
// then encoding to base32 and truncating to 16 chars. We do this because IPTables
@ -936,25 +683,17 @@ func servicePortEndpointChainName(servicePortName string, protocol string, endpo
return utiliptables.Chain("KUBE-SEP-" + encoded[:16])
}
type endpointServicePair struct {
endpoint string
servicePortName proxy.ServicePortName
}
func (esp *endpointServicePair) IPPart() string {
return utilproxy.IPPart(esp.endpoint)
}
// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// risk sending more traffic to it, all of which will be lost (because UDP).
// This assumes the proxier mutex is held
func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServicePair]bool) {
for epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP {
endpointIP := utilproxy.IPPart(epSvcPair.endpoint)
err := utilproxy.ClearUDPConntrackForPeers(proxier.exec, svcInfo.clusterIP.String(), endpointIP)
// TODO: move it to util
func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
for _, epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == api.ProtocolUDP {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
err := utilproxy.ClearUDPConntrackForPeers(proxier.exec, svcInfo.ClusterIP(), endpointIP)
if err != nil {
glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.servicePortName.String(), err)
glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
}
}
}
@ -981,17 +720,15 @@ func (proxier *Proxier) syncProxyRules() {
// We assume that if this was called, we really want to sync them,
// even if nothing changed in the meantime. In other words, callers are
// responsible for detecting no-op changes and not calling this function.
serviceUpdateResult := updateServiceMap(
proxier.serviceMap, &proxier.serviceChanges)
endpointUpdateResult := updateEndpointsMap(
proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
endpointUpdateResult := proxy.UpdateEndpointsMap(proxier.endpointsMap, proxier.endpointsChanges)
staleServices := serviceUpdateResult.staleServices
staleServices := serviceUpdateResult.UDPStaleClusterIP
// merge stale services gathered from updateEndpointsMap
for svcPortName := range endpointUpdateResult.staleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == api.ProtocolUDP {
glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String())
staleServices.Insert(svcInfo.clusterIP.String())
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == api.ProtocolUDP {
glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP())
staleServices.Insert(svcInfo.ClusterIP())
}
}
@ -1164,7 +901,12 @@ func (proxier *Proxier) syncProxyRules() {
// Build rules for each service.
var svcNameString string
for svcName, svcInfo := range proxier.serviceMap {
for svcName, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo)
if !ok {
glog.Errorf("Failed to cast serviceInfo %q", svcName.String())
continue
}
isIPv6 := utilproxy.IsIPv6(svcInfo.clusterIP)
protocol := strings.ToLower(string(svcInfo.protocol))
svcNameString = svcInfo.serviceNameString
@ -1224,7 +966,7 @@ func (proxier *Proxier) syncProxyRules() {
lp := utilproxy.LocalPort{
Description: "externalIP for " + svcNameString,
IP: externalIP,
Port: svcInfo.port,
Port: svcInfo.Port(),
Protocol: protocol,
}
if proxier.portsMap[lp] != nil {
@ -1448,8 +1190,13 @@ func (proxier *Proxier) syncProxyRules() {
endpointChains = endpointChains[:0]
var endpointChain utiliptables.Chain
for _, ep := range proxier.endpointsMap[svcName] {
endpoints = append(endpoints, ep)
endpointChain = ep.endpointChain(svcNameString, protocol)
epInfo, ok := ep.(*endpointsInfo)
if !ok {
glog.Errorf("Failed to cast endpointsInfo %q", ep.String())
continue
}
endpoints = append(endpoints, epInfo)
endpointChain = epInfo.endpointChain(svcNameString, protocol)
endpointChains = append(endpointChains, endpointChain)
// Create the endpoint chain, retaining counters if possible.
@ -1476,7 +1223,7 @@ func (proxier *Proxier) syncProxyRules() {
// Now write loadbalancing & DNAT rules.
n := len(endpointChains)
for i, endpointChain := range endpointChains {
epIP := endpoints[i].IPPart()
epIP := endpoints[i].IP()
if epIP == "" {
// Error parsing this endpoint has been logged. Skip to next endpoint.
continue
@ -1687,10 +1434,10 @@ func (proxier *Proxier) syncProxyRules() {
// Update healthchecks. The endpoints list might include services that are
// not "OnlyLocal", but the services list will not, and the healthChecker
// will just drop those endpoints.
if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil {
if err := proxier.healthChecker.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
glog.Errorf("Error syncing healtcheck services: %v", err)
}
if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil {
if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
glog.Errorf("Error syncing healthcheck endoints: %v", err)
}
@ -1701,7 +1448,7 @@ func (proxier *Proxier) syncProxyRules() {
glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
}
}
proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints)
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
}
// Join all words with spaces, terminate with newline and write to buf.

View File

@ -18,22 +18,19 @@ package iptables
import (
"bytes"
"fmt"
"net"
"reflect"
"strconv"
"strings"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/golang/glog"
"fmt"
"net"
"strings"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/proxy"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
@ -200,8 +197,8 @@ func TestDeleteEndpointConnections(t *testing.T) {
svcIP string
svcPort int
protocol api.Protocol
endpoint string // IP:port endpoint
epSvcPair endpointServicePair // Will be generated by test
endpoint string // IP:port endpoint
epSvcPair proxy.ServiceEndpoint // Will be generated by test
simulatedErr string
}{
{
@ -253,16 +250,16 @@ func TestDeleteEndpointConnections(t *testing.T) {
// Create a service map that has service info entries for all test cases
// and generate an endpoint service pair for each test case
serviceMap := make(map[proxy.ServicePortName]*serviceInfo)
serviceMap := make(map[proxy.ServicePortName]proxy.ServicePort)
for i, tc := range testCases {
svc := proxy.ServicePortName{
NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName},
Port: "p80",
}
serviceMap[svc] = newFakeServiceInfo(svc, net.ParseIP(tc.svcIP), 80, tc.protocol, false)
testCases[i].epSvcPair = endpointServicePair{
endpoint: tc.endpoint,
servicePortName: svc,
testCases[i].epSvcPair = proxy.ServiceEndpoint{
Endpoint: tc.endpoint,
ServicePortName: svc,
}
}
@ -298,7 +295,7 @@ func TestDeleteEndpointConnections(t *testing.T) {
priorExecs := fexec.CommandCalls
priorGlogErrs := glog.Stats.Error.Lines()
input := map[endpointServicePair]bool{tc.epSvcPair: true}
input := []proxy.ServiceEndpoint{tc.epSvcPair}
fakeProxier.deleteEndpointConnections(input)
// For UDP connections, check the executed conntrack command
@ -391,10 +388,10 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
// invocation into a Run() method.
p := &Proxier{
exec: &fakeexec.FakeExec{},
serviceMap: make(proxyServiceMap),
serviceChanges: newServiceChangeMap(),
endpointsMap: make(proxyEndpointsMap),
endpointsChanges: newEndpointsChangeMap(testHostname),
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname),
iptables: ipt,
clusterCIDR: "10.0.0.0/24",
hostname: testHostname,
@ -720,7 +717,6 @@ func TestLoadBalancer(t *testing.T) {
proto := strings.ToLower(string(api.ProtocolTCP))
fwChain := string(serviceFirewallChainName(svcPortName.String(), proto))
svcChain := string(servicePortChainName(svcPortName.String(), proto))
//lbChain := string(serviceLBChainName(svcPortName.String(), proto))
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
if !hasJump(kubeSvcRules, fwChain, svcLBIP, svcPort) {
@ -1111,24 +1107,24 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
for i := range services {
fp.OnServiceAdd(services[i])
}
result := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
if len(fp.serviceMap) != 8 {
t.Errorf("expected service map length 8, got %v", fp.serviceMap)
}
// The only-local-loadbalancer ones get added
if len(result.hcServices) != 1 {
t.Errorf("expected 1 healthcheck port, got %v", result.hcServices)
if len(result.HCServiceNodePorts) != 1 {
t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts)
} else {
nsn := makeNSN("somewhere", "only-local-load-balancer")
if port, found := result.hcServices[nsn]; !found || port != 345 {
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.hcServices)
if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 {
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts)
}
}
if len(result.staleServices) != 0 {
if len(result.UDPStaleClusterIP) != 0 {
// Services only added, so nothing stale yet
t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices))
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
}
// Remove some stuff
@ -1144,24 +1140,24 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
fp.OnServiceDelete(services[2])
fp.OnServiceDelete(services[3])
result = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
if len(fp.serviceMap) != 1 {
t.Errorf("expected service map length 1, got %v", fp.serviceMap)
}
if len(result.hcServices) != 0 {
t.Errorf("expected 0 healthcheck ports, got %v", result.hcServices)
if len(result.HCServiceNodePorts) != 0 {
t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts)
}
// All services but one were deleted. While you'd expect only the ClusterIPs
// from the three deleted services here, we still have the ClusterIP for
// the not-deleted service, because one of it's ServicePorts was deleted.
expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
if len(result.staleServices) != len(expectedStaleUDPServices) {
t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.staleServices.UnsortedList())
if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) {
t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.UnsortedList())
}
for _, ip := range expectedStaleUDPServices {
if !result.staleServices.Has(ip) {
if !result.UDPStaleClusterIP.Has(ip) {
t.Errorf("expected stale UDP service service %s", ip)
}
}
@ -1184,18 +1180,18 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
)
// Headless service should be ignored
result := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
if len(fp.serviceMap) != 0 {
t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
}
// No proxied services, so no healthchecks
if len(result.hcServices) != 0 {
t.Errorf("expected healthcheck ports length 0, got %d", len(result.hcServices))
if len(result.HCServiceNodePorts) != 0 {
t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts))
}
if len(result.staleServices) != 0 {
t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices))
if len(result.UDPStaleClusterIP) != 0 {
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
}
}
@ -1212,16 +1208,16 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
}),
)
result := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
if len(fp.serviceMap) != 0 {
t.Errorf("expected service map length 0, got %v", fp.serviceMap)
}
// No proxied services, so no healthchecks
if len(result.hcServices) != 0 {
t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices)
if len(result.HCServiceNodePorts) != 0 {
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
}
if len(result.staleServices) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", result.staleServices)
if len(result.UDPStaleClusterIP) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP)
}
}
@ -1252,328 +1248,57 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
fp.OnServiceAdd(servicev1)
result := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
}
if len(result.hcServices) != 0 {
t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices)
if len(result.HCServiceNodePorts) != 0 {
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
}
if len(result.staleServices) != 0 {
if len(result.UDPStaleClusterIP) != 0 {
// Services only added, so nothing stale yet
t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices))
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
}
// Change service to load-balancer
fp.OnServiceUpdate(servicev1, servicev2)
result = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
}
if len(result.hcServices) != 1 {
t.Errorf("expected healthcheck ports length 1, got %v", result.hcServices)
if len(result.HCServiceNodePorts) != 1 {
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
}
if len(result.staleServices) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", result.staleServices.UnsortedList())
if len(result.UDPStaleClusterIP) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
}
// No change; make sure the service map stays the same and there are
// no health-check changes
fp.OnServiceUpdate(servicev2, servicev2)
result = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
}
if len(result.hcServices) != 1 {
t.Errorf("expected healthcheck ports length 1, got %v", result.hcServices)
if len(result.HCServiceNodePorts) != 1 {
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
}
if len(result.staleServices) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", result.staleServices.UnsortedList())
if len(result.UDPStaleClusterIP) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
}
// And back to ClusterIP
fp.OnServiceUpdate(servicev2, servicev1)
result = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
if len(fp.serviceMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
}
if len(result.hcServices) != 0 {
t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices)
if len(result.HCServiceNodePorts) != 0 {
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
}
if len(result.staleServices) != 0 {
if len(result.UDPStaleClusterIP) != 0 {
// Services only added, so nothing stale yet
t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices))
}
}
func Test_getLocalIPs(t *testing.T) {
testCases := []struct {
endpointsMap map[proxy.ServicePortName][]*endpointsInfo
expected map[types.NamespacedName]sets.String
}{{
// Case[0]: nothing
endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{},
expected: map[types.NamespacedName]sets.String{},
}, {
// Case[1]: unnamed port
endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", ""): {
{endpoint: "1.1.1.1:11", isLocal: false},
},
},
expected: map[types.NamespacedName]sets.String{},
}, {
// Case[2]: unnamed port local
endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", ""): {
{endpoint: "1.1.1.1:11", isLocal: true},
},
},
expected: map[types.NamespacedName]sets.String{
{Namespace: "ns1", Name: "ep1"}: sets.NewString("1.1.1.1"),
},
}, {
// Case[3]: named local and non-local ports for the same IP.
endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{endpoint: "1.1.1.1:11", isLocal: false},
{endpoint: "1.1.1.2:11", isLocal: true},
},
makeServicePortName("ns1", "ep1", "p12"): {
{endpoint: "1.1.1.1:12", isLocal: false},
{endpoint: "1.1.1.2:12", isLocal: true},
},
},
expected: map[types.NamespacedName]sets.String{
{Namespace: "ns1", Name: "ep1"}: sets.NewString("1.1.1.2"),
},
}, {
// Case[4]: named local and non-local ports for different IPs.
endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{endpoint: "1.1.1.1:11", isLocal: false},
},
makeServicePortName("ns2", "ep2", "p22"): {
{endpoint: "2.2.2.2:22", isLocal: true},
{endpoint: "2.2.2.22:22", isLocal: true},
},
makeServicePortName("ns2", "ep2", "p23"): {
{endpoint: "2.2.2.3:23", isLocal: true},
},
makeServicePortName("ns4", "ep4", "p44"): {
{endpoint: "4.4.4.4:44", isLocal: true},
{endpoint: "4.4.4.5:44", isLocal: false},
},
makeServicePortName("ns4", "ep4", "p45"): {
{endpoint: "4.4.4.6:45", isLocal: true},
},
},
expected: map[types.NamespacedName]sets.String{
{Namespace: "ns2", Name: "ep2"}: sets.NewString("2.2.2.2", "2.2.2.22", "2.2.2.3"),
{Namespace: "ns4", Name: "ep4"}: sets.NewString("4.4.4.4", "4.4.4.6"),
},
}, {
// Case[5]: named port local and bad endpoints IP
endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): {
{endpoint: "bad ip:11", isLocal: true},
},
},
expected: map[types.NamespacedName]sets.String{},
}}
for tci, tc := range testCases {
// outputs
localIPs := getLocalIPs(tc.endpointsMap)
if !reflect.DeepEqual(localIPs, tc.expected) {
t.Errorf("[%d] expected %#v, got %#v", tci, tc.expected, localIPs)
}
}
}
// This is a coarse test, but it offers some modicum of confidence as the code is evolved.
func Test_endpointsToEndpointsMap(t *testing.T) {
testCases := []struct {
newEndpoints *api.Endpoints
expected map[proxy.ServicePortName][]*endpointsInfo
}{{
// Case[0]: nothing
newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}),
expected: map[proxy.ServicePortName][]*endpointsInfo{},
}, {
// Case[1]: no changes, unnamed port
newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{
IP: "1.1.1.1",
}},
Ports: []api.EndpointPort{{
Name: "",
Port: 11,
}},
},
}
}),
expected: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", ""): {
{endpoint: "1.1.1.1:11", isLocal: false},
},
},
}, {
// Case[2]: no changes, named port
newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{
IP: "1.1.1.1",
}},
Ports: []api.EndpointPort{{
Name: "port",
Port: 11,
}},
},
}
}),
expected: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "port"): {
{endpoint: "1.1.1.1:11", isLocal: false},
},
},
}, {
// Case[3]: new port
newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{
IP: "1.1.1.1",
}},
Ports: []api.EndpointPort{{
Port: 11,
}},
},
}
}),
expected: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", ""): {
{endpoint: "1.1.1.1:11", isLocal: false},
},
},
}, {
// Case[4]: remove port
newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}),
expected: map[proxy.ServicePortName][]*endpointsInfo{},
}, {
// Case[5]: new IP and port
newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{
IP: "1.1.1.1",
}, {
IP: "2.2.2.2",
}},
Ports: []api.EndpointPort{{
Name: "p1",
Port: 11,
}, {
Name: "p2",
Port: 22,
}},
},
}
}),
expected: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p1"): {
{endpoint: "1.1.1.1:11", isLocal: false},
{endpoint: "2.2.2.2:11", isLocal: false},
},
makeServicePortName("ns1", "ep1", "p2"): {
{endpoint: "1.1.1.1:22", isLocal: false},
{endpoint: "2.2.2.2:22", isLocal: false},
},
},
}, {
// Case[6]: remove IP and port
newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{
IP: "1.1.1.1",
}},
Ports: []api.EndpointPort{{
Name: "p1",
Port: 11,
}},
},
}
}),
expected: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p1"): {
{endpoint: "1.1.1.1:11", isLocal: false},
},
},
}, {
// Case[7]: rename port
newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{
IP: "1.1.1.1",
}},
Ports: []api.EndpointPort{{
Name: "p2",
Port: 11,
}},
},
}
}),
expected: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p2"): {
{endpoint: "1.1.1.1:11", isLocal: false},
},
},
}, {
// Case[8]: renumber port
newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{
IP: "1.1.1.1",
}},
Ports: []api.EndpointPort{{
Name: "p1",
Port: 22,
}},
},
}
}),
expected: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p1"): {
{endpoint: "1.1.1.1:22", isLocal: false},
},
},
}}
for tci, tc := range testCases {
// outputs
newEndpoints := endpointsToEndpointsMap(tc.newEndpoints, "host")
if len(newEndpoints) != len(tc.expected) {
t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expected), len(newEndpoints), spew.Sdump(newEndpoints))
}
for x := range tc.expected {
if len(newEndpoints[x]) != len(tc.expected[x]) {
t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(tc.expected[x]), x, len(newEndpoints[x]))
} else {
for i := range newEndpoints[x] {
if *(newEndpoints[x][i]) != *(tc.expected[x][i]) {
t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, tc.expected[x][i], *(newEndpoints[x][i]))
}
}
}
}
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
}
}
@ -1619,7 +1344,7 @@ func makeServiceMap(proxier *Proxier, allServices ...*api.Service) {
proxier.servicesSynced = true
}
func compareEndpointsMaps(t *testing.T, tci int, newMap, expected map[proxy.ServicePortName][]*endpointsInfo) {
func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointsInfo) {
if len(newMap) != len(expected) {
t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap)
}
@ -1628,8 +1353,13 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap, expected map[proxy.Serv
t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x]))
} else {
for i := range expected[x] {
if *(newMap[x][i]) != *(expected[x][i]) {
t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newMap[x][i])
newEp, ok := newMap[x][i].(*endpointsInfo)
if !ok {
t.Errorf("Failed to cast endpointsInfo")
continue
}
if *newEp != *(expected[x][i]) {
t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp)
}
}
}
@ -1950,14 +1680,14 @@ func Test_updateEndpointsMap(t *testing.T) {
currentEndpoints []*api.Endpoints
oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
expectedResult map[proxy.ServicePortName][]*endpointsInfo
expectedStaleEndpoints []endpointServicePair
expectedStaleEndpoints []proxy.ServiceEndpoint
expectedStaleServiceNames map[proxy.ServicePortName]bool
expectedHealthchecks map[types.NamespacedName]int
}{{
// Case[0]: nothing
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
expectedStaleEndpoints: []endpointServicePair{},
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
@ -1978,7 +1708,7 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.1:11", isLocal: false},
},
},
expectedStaleEndpoints: []endpointServicePair{},
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
@ -1999,7 +1729,7 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.1:11", isLocal: true},
},
},
expectedStaleEndpoints: []endpointServicePair{},
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
@ -2028,7 +1758,7 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.2:12", isLocal: false},
},
},
expectedStaleEndpoints: []endpointServicePair{},
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
}, {
@ -2061,7 +1791,7 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.3:13", isLocal: false},
},
},
expectedStaleEndpoints: []endpointServicePair{},
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
@ -2128,7 +1858,7 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "2.2.2.2:22", isLocal: true},
},
},
expectedStaleEndpoints: []endpointServicePair{},
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 2,
@ -2148,7 +1878,7 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.1:11", isLocal: true},
},
},
expectedStaleEndpoints: []endpointServicePair{},
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", ""): true,
},
@ -2169,9 +1899,9 @@ func Test_updateEndpointsMap(t *testing.T) {
},
},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
expectedStaleEndpoints: []endpointServicePair{{
endpoint: "1.1.1.1:11",
servicePortName: makeServicePortName("ns1", "ep1", ""),
expectedStaleEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "1.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", ""),
}},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
@ -2198,7 +1928,7 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.2:12", isLocal: true},
},
},
expectedStaleEndpoints: []endpointServicePair{},
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12"): true,
},
@ -2228,15 +1958,15 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.1:11", isLocal: false},
},
},
expectedStaleEndpoints: []endpointServicePair{{
endpoint: "1.1.1.2:11",
servicePortName: makeServicePortName("ns1", "ep1", "p11"),
expectedStaleEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "1.1.1.2:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11"),
}, {
endpoint: "1.1.1.1:12",
servicePortName: makeServicePortName("ns1", "ep1", "p12"),
Endpoint: "1.1.1.1:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12"),
}, {
endpoint: "1.1.1.2:12",
servicePortName: makeServicePortName("ns1", "ep1", "p12"),
Endpoint: "1.1.1.2:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12"),
}},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
@ -2261,7 +1991,7 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.2:12", isLocal: true},
},
},
expectedStaleEndpoints: []endpointServicePair{},
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12"): true,
},
@ -2289,9 +2019,9 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.1:11", isLocal: false},
},
},
expectedStaleEndpoints: []endpointServicePair{{
endpoint: "1.1.1.2:12",
servicePortName: makeServicePortName("ns1", "ep1", "p12"),
expectedStaleEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "1.1.1.2:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12"),
}},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
@ -2313,9 +2043,9 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.1:11", isLocal: false},
},
},
expectedStaleEndpoints: []endpointServicePair{{
endpoint: "1.1.1.1:11",
servicePortName: makeServicePortName("ns1", "ep1", "p11"),
expectedStaleEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "1.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11"),
}},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11-2"): true,
@ -2339,9 +2069,9 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.1:22", isLocal: false},
},
},
expectedStaleEndpoints: []endpointServicePair{{
endpoint: "1.1.1.1:11",
servicePortName: makeServicePortName("ns1", "ep1", "p11"),
expectedStaleEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "1.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11"),
}},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
@ -2396,21 +2126,21 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "4.4.4.4:44", isLocal: true},
},
},
expectedStaleEndpoints: []endpointServicePair{{
endpoint: "2.2.2.2:22",
servicePortName: makeServicePortName("ns2", "ep2", "p22"),
expectedStaleEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "2.2.2.2:22",
ServicePortName: makeServicePortName("ns2", "ep2", "p22"),
}, {
endpoint: "2.2.2.22:22",
servicePortName: makeServicePortName("ns2", "ep2", "p22"),
Endpoint: "2.2.2.22:22",
ServicePortName: makeServicePortName("ns2", "ep2", "p22"),
}, {
endpoint: "2.2.2.3:23",
servicePortName: makeServicePortName("ns2", "ep2", "p23"),
Endpoint: "2.2.2.3:23",
ServicePortName: makeServicePortName("ns2", "ep2", "p23"),
}, {
endpoint: "4.4.4.5:44",
servicePortName: makeServicePortName("ns4", "ep4", "p44"),
Endpoint: "4.4.4.5:44",
ServicePortName: makeServicePortName("ns4", "ep4", "p44"),
}, {
endpoint: "4.4.4.6:45",
servicePortName: makeServicePortName("ns4", "ep4", "p45"),
Endpoint: "4.4.4.6:45",
ServicePortName: makeServicePortName("ns4", "ep4", "p45"),
}},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12"): true,
@ -2434,7 +2164,7 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.1:11", isLocal: false},
},
},
expectedStaleEndpoints: []endpointServicePair{},
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", ""): true,
},
@ -2454,7 +2184,7 @@ func Test_updateEndpointsMap(t *testing.T) {
fp.OnEndpointsAdd(tc.previousEndpoints[i])
}
}
updateEndpointsMap(fp.endpointsMap, &fp.endpointsChanges, fp.hostname)
proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints)
// Now let's call appropriate handlers to get to state we want to be.
@ -2474,27 +2204,40 @@ func Test_updateEndpointsMap(t *testing.T) {
fp.OnEndpointsUpdate(prev, curr)
}
}
result := updateEndpointsMap(fp.endpointsMap, &fp.endpointsChanges, fp.hostname)
result := proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
newMap := fp.endpointsMap
compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
if len(result.staleEndpoints) != len(tc.expectedStaleEndpoints) {
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.staleEndpoints), result.staleEndpoints)
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints)
}
for _, x := range tc.expectedStaleEndpoints {
if result.staleEndpoints[x] != true {
t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.staleEndpoints)
found := false
for _, stale := range result.StaleEndpoints {
if stale == x {
found = true
break
}
}
if !found {
t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.StaleEndpoints)
}
}
if len(result.staleServiceNames) != len(tc.expectedStaleServiceNames) {
t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.staleServiceNames), result.staleServiceNames)
if len(result.StaleServiceNames) != len(tc.expectedStaleServiceNames) {
t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.StaleServiceNames), result.StaleServiceNames)
}
for svcName := range tc.expectedStaleServiceNames {
if result.staleServiceNames[svcName] != true {
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.staleServiceNames)
found := false
for _, stale := range result.StaleServiceNames {
if stale == svcName {
found = true
}
}
if !found {
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames)
}
}
if !reflect.DeepEqual(result.hcEndpoints, tc.expectedHealthchecks) {
t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.hcEndpoints)
if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) {
t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize)
}
}
}