diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 13ba06aa32..4d996c70be 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -194,13 +194,14 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se } type endpointsChange struct { - previous *api.Endpoints - current *api.Endpoints + previous proxyEndpointsMap + current proxyEndpointsMap } type endpointsChangeMap struct { sync.Mutex - items map[types.NamespacedName]*endpointsChange + hostname string + items map[types.NamespacedName]*endpointsChange } type serviceChange struct { @@ -216,23 +217,34 @@ type serviceChangeMap struct { type proxyServiceMap map[proxy.ServicePortName]*serviceInfo type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo -func newEndpointsChangeMap() endpointsChangeMap { +func newEndpointsChangeMap(hostname string) endpointsChangeMap { return endpointsChangeMap{ - items: make(map[types.NamespacedName]*endpointsChange), + hostname: hostname, + items: make(map[types.NamespacedName]*endpointsChange), } } -func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) { +func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) bool { ecm.Lock() defer ecm.Unlock() change, exists := ecm.items[*namespacedName] if !exists { change = &endpointsChange{} - change.previous = previous + change.previous = endpointsToEndpointsMap(previous, ecm.hostname) ecm.items[*namespacedName] = change } - change.current = current + change.current = endpointsToEndpointsMap(current, ecm.hostname) + if reflect.DeepEqual(change.previous, change.current) { + delete(ecm.items, *namespacedName) + return false + } + // TODO: Instead of returning true/false, we should consider returning whether + // the map contains some element or not. Currently, if the change is + // "reverting" some previous endpoints update, but there are still some other + // modified endpoints, we will return false, even though there are some change + // to apply. + return true } func newServiceChangeMap() serviceChangeMap { @@ -410,7 +422,7 @@ func NewProxier(ipt utiliptables.Interface, serviceMap: make(proxyServiceMap), serviceChanges: newServiceChangeMap(), endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(), + endpointsChanges: newEndpointsChangeMap(hostname), syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, throttle: throttle, @@ -677,30 +689,48 @@ func updateServiceMap( func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - proxier.endpointsChanges.update(&namespacedName, nil, endpoints) - - proxier.syncProxyRules(syncReasonEndpoints) + if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) { + // TODO(wojtek-t): If the initial sync of informer either for endpoints or + // services is not finished, it doesn't make sense to call syncProxyRules + // because it will early-return (to avoid resyncing iptables with partial + // state right after kube-proxy restart). This can eat a token for calling + // syncProxyRules, but is not that critical since it can happen only + // after kube-proxy was (re)started. + proxier.syncProxyRules(syncReasonEndpoints) + } } func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) - - proxier.syncProxyRules(syncReasonEndpoints) + if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) { + // TODO(wojtek-t): If the initial sync of informer either for endpoints or + // services is not finished, it doesn't make sense to call syncProxyRules + // because it will early-return (to avoid resyncing iptables with partial + // state right after kube-proxy restart). This can eat a token for calling + // syncProxyRules, but is not that critical since it can happen only + // after kube-proxy was (re)started. + proxier.syncProxyRules(syncReasonEndpoints) + } } func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - proxier.endpointsChanges.update(&namespacedName, endpoints, nil) - - proxier.syncProxyRules(syncReasonEndpoints) + if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) { + // TODO(wojtek-t): If the initial sync of informer either for endpoints or + // services is not finished, it doesn't make sense to call syncProxyRules + // because it will early-return (to avoid resyncing iptables with partial + // state right after kube-proxy restart). This can eat a token for calling + // syncProxyRules, but is not that critical since it can happen only + // after kube-proxy was (re)started. + proxier.syncProxyRules(syncReasonEndpoints) + } } func (proxier *Proxier) OnEndpointsSynced() { proxier.mu.Lock() proxier.endpointsSynced = true proxier.mu.Unlock() - + // Call it unconditionally - this is called once per lifetime. proxier.syncProxyRules(syncReasonEndpoints) } @@ -717,14 +747,10 @@ func updateEndpointsMap( changes.Lock() defer changes.Unlock() for _, change := range changes.items { - oldEndpointsMap := endpointsToEndpointsMap(change.previous, hostname) - newEndpointsMap := endpointsToEndpointsMap(change.current, hostname) - if !reflect.DeepEqual(oldEndpointsMap, newEndpointsMap) { - endpointsMap.unmerge(oldEndpointsMap) - endpointsMap.merge(newEndpointsMap) - detectStaleConnections(oldEndpointsMap, newEndpointsMap, staleSet) - syncRequired = true - } + endpointsMap.unmerge(change.previous) + endpointsMap.merge(change.current) + detectStaleConnections(change.previous, change.current, staleSet) + syncRequired = true } changes.items = make(map[types.NamespacedName]*endpointsChange) }() diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index aba2e3c827..0d11b7cc40 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -388,7 +388,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { serviceMap: make(proxyServiceMap), serviceChanges: newServiceChangeMap(), endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(), + endpointsChanges: newEndpointsChangeMap(testHostname), iptables: ipt, clusterCIDR: "10.0.0.0/24", hostname: testHostname, @@ -1611,7 +1611,7 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap, expected map[proxy.Serv } func Test_updateEndpointsMap(t *testing.T) { - var nodeName = "host" + var nodeName = testHostname unnamedPort := func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{