Check whether endpoints change

pull/6/head
Wojciech Tyczynski 2017-05-15 09:55:15 +02:00
parent 37a6989c79
commit 05ffcccdc1
2 changed files with 55 additions and 29 deletions

View File

@ -194,12 +194,13 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se
} }
type endpointsChange struct { type endpointsChange struct {
previous *api.Endpoints previous proxyEndpointsMap
current *api.Endpoints current proxyEndpointsMap
} }
type endpointsChangeMap struct { type endpointsChangeMap struct {
sync.Mutex sync.Mutex
hostname string
items map[types.NamespacedName]*endpointsChange items map[types.NamespacedName]*endpointsChange
} }
@ -216,23 +217,34 @@ type serviceChangeMap struct {
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo
func newEndpointsChangeMap() endpointsChangeMap { func newEndpointsChangeMap(hostname string) endpointsChangeMap {
return endpointsChangeMap{ return endpointsChangeMap{
hostname: hostname,
items: make(map[types.NamespacedName]*endpointsChange), items: make(map[types.NamespacedName]*endpointsChange),
} }
} }
func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) { func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) bool {
ecm.Lock() ecm.Lock()
defer ecm.Unlock() defer ecm.Unlock()
change, exists := ecm.items[*namespacedName] change, exists := ecm.items[*namespacedName]
if !exists { if !exists {
change = &endpointsChange{} change = &endpointsChange{}
change.previous = previous change.previous = endpointsToEndpointsMap(previous, ecm.hostname)
ecm.items[*namespacedName] = change ecm.items[*namespacedName] = change
} }
change.current = current change.current = endpointsToEndpointsMap(current, ecm.hostname)
if reflect.DeepEqual(change.previous, change.current) {
delete(ecm.items, *namespacedName)
return false
}
// TODO: Instead of returning true/false, we should consider returning whether
// the map contains some element or not. Currently, if the change is
// "reverting" some previous endpoints update, but there are still some other
// modified endpoints, we will return false, even though there are some change
// to apply.
return true
} }
func newServiceChangeMap() serviceChangeMap { func newServiceChangeMap() serviceChangeMap {
@ -410,7 +422,7 @@ func NewProxier(ipt utiliptables.Interface,
serviceMap: make(proxyServiceMap), serviceMap: make(proxyServiceMap),
serviceChanges: newServiceChangeMap(), serviceChanges: newServiceChangeMap(),
endpointsMap: make(proxyEndpointsMap), endpointsMap: make(proxyEndpointsMap),
endpointsChanges: newEndpointsChangeMap(), endpointsChanges: newEndpointsChangeMap(hostname),
syncPeriod: syncPeriod, syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod, minSyncPeriod: minSyncPeriod,
throttle: throttle, throttle: throttle,
@ -677,30 +689,48 @@ func updateServiceMap(
func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.endpointsChanges.update(&namespacedName, nil, endpoints) if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) {
// TODO(wojtek-t): If the initial sync of informer either for endpoints or
// services is not finished, it doesn't make sense to call syncProxyRules
// because it will early-return (to avoid resyncing iptables with partial
// state right after kube-proxy restart). This can eat a token for calling
// syncProxyRules, but is not that critical since it can happen only
// after kube-proxy was (re)started.
proxier.syncProxyRules(syncReasonEndpoints) proxier.syncProxyRules(syncReasonEndpoints)
}
} }
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) {
// TODO(wojtek-t): If the initial sync of informer either for endpoints or
// services is not finished, it doesn't make sense to call syncProxyRules
// because it will early-return (to avoid resyncing iptables with partial
// state right after kube-proxy restart). This can eat a token for calling
// syncProxyRules, but is not that critical since it can happen only
// after kube-proxy was (re)started.
proxier.syncProxyRules(syncReasonEndpoints) proxier.syncProxyRules(syncReasonEndpoints)
}
} }
func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.endpointsChanges.update(&namespacedName, endpoints, nil) if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) {
// TODO(wojtek-t): If the initial sync of informer either for endpoints or
// services is not finished, it doesn't make sense to call syncProxyRules
// because it will early-return (to avoid resyncing iptables with partial
// state right after kube-proxy restart). This can eat a token for calling
// syncProxyRules, but is not that critical since it can happen only
// after kube-proxy was (re)started.
proxier.syncProxyRules(syncReasonEndpoints) proxier.syncProxyRules(syncReasonEndpoints)
}
} }
func (proxier *Proxier) OnEndpointsSynced() { func (proxier *Proxier) OnEndpointsSynced() {
proxier.mu.Lock() proxier.mu.Lock()
proxier.endpointsSynced = true proxier.endpointsSynced = true
proxier.mu.Unlock() proxier.mu.Unlock()
// Call it unconditionally - this is called once per lifetime.
proxier.syncProxyRules(syncReasonEndpoints) proxier.syncProxyRules(syncReasonEndpoints)
} }
@ -717,15 +747,11 @@ func updateEndpointsMap(
changes.Lock() changes.Lock()
defer changes.Unlock() defer changes.Unlock()
for _, change := range changes.items { for _, change := range changes.items {
oldEndpointsMap := endpointsToEndpointsMap(change.previous, hostname) endpointsMap.unmerge(change.previous)
newEndpointsMap := endpointsToEndpointsMap(change.current, hostname) endpointsMap.merge(change.current)
if !reflect.DeepEqual(oldEndpointsMap, newEndpointsMap) { detectStaleConnections(change.previous, change.current, staleSet)
endpointsMap.unmerge(oldEndpointsMap)
endpointsMap.merge(newEndpointsMap)
detectStaleConnections(oldEndpointsMap, newEndpointsMap, staleSet)
syncRequired = true syncRequired = true
} }
}
changes.items = make(map[types.NamespacedName]*endpointsChange) changes.items = make(map[types.NamespacedName]*endpointsChange)
}() }()

View File

@ -388,7 +388,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
serviceMap: make(proxyServiceMap), serviceMap: make(proxyServiceMap),
serviceChanges: newServiceChangeMap(), serviceChanges: newServiceChangeMap(),
endpointsMap: make(proxyEndpointsMap), endpointsMap: make(proxyEndpointsMap),
endpointsChanges: newEndpointsChangeMap(), endpointsChanges: newEndpointsChangeMap(testHostname),
iptables: ipt, iptables: ipt,
clusterCIDR: "10.0.0.0/24", clusterCIDR: "10.0.0.0/24",
hostname: testHostname, hostname: testHostname,
@ -1611,7 +1611,7 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap, expected map[proxy.Serv
} }
func Test_updateEndpointsMap(t *testing.T) { func Test_updateEndpointsMap(t *testing.T) {
var nodeName = "host" var nodeName = testHostname
unnamedPort := func(ept *api.Endpoints) { unnamedPort := func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{ ept.Subsets = []api.EndpointSubset{{