kube-proxy: simplify endpoints updates

We don't need the svcPortToInfoMap.  Its only purpose was to
send "valid" local endpoints (those with valid IP and >0 port) to the
health checker.  But we shouldn't be sending invalid endpoints to
the health checker anyway, because it can't do anything with them.

If we exclude invalid endpoints earlier, then we don't need
flattenValidEndpoints().

And if we don't need flattenValidEndpoints() it makes no sense to have
svcPortToInfoMap store hostPortInfo, since endpointsInfo is the same
thing as hostPortInfo except with a combined host:port.

And if svcPortToInfoMap now only stores valid endpointsInfos, it is
exactly the same thing as newEndpoints.
pull/6/head
Dan Williams 2017-03-07 13:32:02 -06:00
parent 0fad9ce5e2
commit 76a7d690db
2 changed files with 21 additions and 77 deletions

View File

@ -556,32 +556,6 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List())
}
// Reconstruct the list of endpoint infos from the endpointIP list
// Use the slice of endpointIPs to rebuild a slice of corresponding {endpointIP, localEndpointOnly} infos
// from the full []hostPortInfo slice.
//
// For e.g. if input is
// endpoints = []hostPortInfo{ {host="1.1.1.1", port=22, localEndpointOnly=<bool>}, {host="2.2.2.2", port=80, localEndpointOnly=<bool>} }
// endpointIPs = []string{ "2.2.2.2:80" }
//
// then output will be
//
// []endpointsInfo{ {"2.2.2.2:80", localEndpointOnly=<bool>} }
func buildEndpointInfoList(endPoints []hostPortInfo, endpointIPs []string) []*endpointsInfo {
lookupSet := sets.NewString()
for _, ip := range endpointIPs {
lookupSet.Insert(ip)
}
var filteredEndpoints []*endpointsInfo
for _, hpp := range endPoints {
key := net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port))
if lookupSet.Has(key) {
filteredEndpoints = append(filteredEndpoints, &endpointsInfo{endpoint: key, isLocal: hpp.isLocal})
}
}
return filteredEndpoints
}
// OnEndpointsUpdate takes in a slice of updated endpoints.
func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
proxier.mu.Lock()
@ -611,12 +585,9 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN
newMap = make(map[proxy.ServicePortName][]*endpointsInfo)
staleSet = make(map[endpointServicePair]bool)
// local
svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo)
// Update endpoints for services.
for i := range allEndpoints {
accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap, &svcPortToInfoMap)
accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap)
}
// Check stale connections against endpoints missing from the update.
// TODO: we should really only mark a connection stale if the proto was UDP
@ -646,7 +617,7 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN
allSvcPorts[svcPort] = true
}
for svcPort := range allSvcPorts {
updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort], healthChecker)
updateHealthCheckEntries(svcPort.NamespacedName, newMap[svcPort], healthChecker)
}
return newMap, staleSet
@ -663,8 +634,7 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN
// - naming is poor and responsibilities are muddled
func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string,
curEndpoints map[proxy.ServicePortName][]*endpointsInfo,
newEndpoints *map[proxy.ServicePortName][]*endpointsInfo,
svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo) {
newEndpoints *map[proxy.ServicePortName][]*endpointsInfo) {
// We need to build a map of portname -> all ip:ports for that
// portname. Explode Endpoints.Subsets[*] into this structure.
@ -672,70 +642,45 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string,
ss := &endpoints.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]
if port.Port == 0 {
glog.Warningf("ignoring invalid endpoint port %s", port.Name)
continue
}
svcPort := proxy.ServicePortName{
NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name},
Port: port.Name,
}
for i := range ss.Addresses {
addr := &ss.Addresses[i]
hostPortObject := hostPortInfo{
host: addr.IP,
port: int(port.Port),
if addr.IP == "" {
glog.Warningf("ignoring invalid endpoint port %s with empty host", port.Name)
continue
}
epInfo := &endpointsInfo{
endpoint: net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))),
isLocal: addr.NodeName != nil && *addr.NodeName == hostname,
}
(*svcPortToInfoMap)[svcPort] = append((*svcPortToInfoMap)[svcPort], hostPortObject)
(*newEndpoints)[svcPort] = append((*newEndpoints)[svcPort], epInfo)
}
}
}
// Decompose the lists of endpoints into details of what was changed for the caller.
for svcPort, hostPortInfos := range *svcPortToInfoMap {
newEPList := flattenValidEndpoints(hostPortInfos)
glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEPList)
// Once the set operations using the list of ips are complete, build the list of endpoint infos
(*newEndpoints)[svcPort] = buildEndpointInfoList(hostPortInfos, newEPList)
}
}
// updateHealthCheckEntries - send the new set of local endpoints to the health checker
func updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo, healthChecker healthChecker) {
func updateHealthCheckEntries(name types.NamespacedName, endpoints []*endpointsInfo, healthChecker healthChecker) {
if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
return
}
// Use a set instead of a slice to provide deduplication
endpoints := sets.NewString()
for _, portInfo := range hostPorts {
epSet := sets.NewString()
for _, portInfo := range endpoints {
if portInfo.isLocal {
// kube-proxy health check only needs local endpoints
endpoints.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name))
epSet.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name))
}
}
healthChecker.UpdateEndpoints(name, endpoints)
}
// used in OnEndpointsUpdate
type hostPortInfo struct {
host string
port int
isLocal bool
}
func isValidEndpoint(hpp *hostPortInfo) bool {
return hpp.host != "" && hpp.port > 0
}
func flattenValidEndpoints(endpoints []hostPortInfo) []string {
// Convert Endpoint objects into strings for easier use later.
var result []string
for i := range endpoints {
hpp := &endpoints[i]
if isValidEndpoint(hpp) {
result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)))
} else {
glog.Warningf("got invalid endpoint: %+v", *hpp)
}
}
return result
healthChecker.UpdateEndpoints(name, epSet)
}
// portProtoHash takes the ServicePortName and protocol for a service

View File

@ -1337,8 +1337,7 @@ func Test_accumulateEndpointsMap(t *testing.T) {
for tci, tc := range testCases {
// outputs
newEndpoints := map[proxy.ServicePortName][]*endpointsInfo{}
svcPortToInfoMap := map[proxy.ServicePortName][]hostPortInfo{}
accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, &newEndpoints, &svcPortToInfoMap)
accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, &newEndpoints)
if len(newEndpoints) != len(tc.expectedNew) {
t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expectedNew), len(newEndpoints), spew.Sdump(newEndpoints))