From aca4d469b250720d75443fed6811a168386b2361 Mon Sep 17 00:00:00 2001 From: Zihong Zheng Date: Wed, 17 May 2017 16:33:13 -0700 Subject: [PATCH] Revert "Remove reasons from iptables syncProxyRules" This reverts commit 77624a12d38692e1cc06d8a375f67b6d48549ca9. --- pkg/proxy/iptables/proxier.go | 70 ++++++++++++++++++------------ pkg/proxy/iptables/proxier_test.go | 16 +++---- 2 files changed, 50 insertions(+), 36 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 1a6f6e62ac..c6aedbf144 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -199,7 +199,7 @@ type endpointsChange struct { } type endpointsChangeMap struct { - lock sync.Mutex + sync.Mutex items map[types.NamespacedName]*endpointsChange } @@ -209,7 +209,7 @@ type serviceChange struct { } type serviceChangeMap struct { - lock sync.Mutex + sync.Mutex items map[types.NamespacedName]*serviceChange } @@ -223,8 +223,8 @@ func newEndpointsChangeMap() endpointsChangeMap { } func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) { - ecm.lock.Lock() - defer ecm.lock.Unlock() + ecm.Lock() + defer ecm.Unlock() change, exists := ecm.items[*namespacedName] if !exists { @@ -242,8 +242,8 @@ func newServiceChangeMap() serviceChangeMap { } func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) { - scm.lock.Lock() - defer scm.lock.Unlock() + scm.Lock() + defer scm.Unlock() change, exists := scm.items[*namespacedName] if !exists { @@ -509,7 +509,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { // Sync is called to immediately synchronize the proxier state to iptables func (proxier *Proxier) Sync() { - proxier.syncProxyRules() + proxier.syncProxyRules(syncReasonForce) } // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. @@ -531,21 +531,21 @@ func (proxier *Proxier) OnServiceAdd(service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} proxier.serviceChanges.update(&namespacedName, nil, service) - proxier.syncProxyRules() + proxier.syncProxyRules(syncReasonServices) } func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} proxier.serviceChanges.update(&namespacedName, oldService, service) - proxier.syncProxyRules() + proxier.syncProxyRules(syncReasonServices) } func (proxier *Proxier) OnServiceDelete(service *api.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} proxier.serviceChanges.update(&namespacedName, service, nil) - proxier.syncProxyRules() + proxier.syncProxyRules(syncReasonServices) } func (proxier *Proxier) OnServiceSynced() { @@ -553,7 +553,7 @@ func (proxier *Proxier) OnServiceSynced() { proxier.servicesSynced = true proxier.mu.Unlock() - proxier.syncProxyRules() + proxier.syncProxyRules(syncReasonServices) } func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool { @@ -587,7 +587,7 @@ func (sm *proxyServiceMap) mergeService(service *api.Service) (bool, sets.String info := newServiceInfo(serviceName, servicePort, service) oldInfo, exists := (*sm)[serviceName] equal := reflect.DeepEqual(info, oldInfo) - if !exists { + if exists { glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol) } else if !equal { glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol) @@ -662,21 +662,21 @@ func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} proxier.endpointsChanges.update(&namespacedName, nil, endpoints) - proxier.syncProxyRules() + proxier.syncProxyRules(syncReasonEndpoints) } func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) - proxier.syncProxyRules() + proxier.syncProxyRules(syncReasonEndpoints) } func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} proxier.endpointsChanges.update(&namespacedName, endpoints, nil) - proxier.syncProxyRules() + proxier.syncProxyRules(syncReasonEndpoints) } func (proxier *Proxier) OnEndpointsSynced() { @@ -684,7 +684,7 @@ func (proxier *Proxier) OnEndpointsSynced() { proxier.endpointsSynced = true proxier.mu.Unlock() - proxier.syncProxyRules() + proxier.syncProxyRules(syncReasonEndpoints) } // is updated by this function (based on the given changes). @@ -873,10 +873,16 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ } } +type syncReason string + +const syncReasonServices syncReason = "ServicesUpdate" +const syncReasonEndpoints syncReason = "EndpointsUpdate" +const syncReasonForce syncReason = "Force" + // This is where all of the iptables-save/restore calls happen. // The only other iptables rules are those that are setup in iptablesInit() // assumes proxier.mu is held -func (proxier *Proxier) syncProxyRules() { +func (proxier *Proxier) syncProxyRules(reason syncReason) { proxier.mu.Lock() defer proxier.mu.Unlock() @@ -886,7 +892,7 @@ func (proxier *Proxier) syncProxyRules() { start := time.Now() defer func() { SyncProxyRulesLatency.Observe(sinceInMicroseconds(start)) - glog.V(4).Infof("syncProxyRules took %v", time.Since(start)) + glog.V(4).Infof("syncProxyRules(%s) took %v", reason, time.Since(start)) }() // don't sync rules till we've received services and endpoints if !proxier.endpointsSynced || !proxier.servicesSynced { @@ -895,20 +901,28 @@ func (proxier *Proxier) syncProxyRules() { } // Figure out the new services we need to activate. - proxier.serviceChanges.lock.Lock() + proxier.serviceChanges.Lock() serviceSyncRequired, hcServices, staleServices := updateServiceMap( proxier.serviceMap, &proxier.serviceChanges) - proxier.serviceChanges.lock.Unlock() + proxier.serviceChanges.Unlock() - proxier.endpointsChanges.lock.Lock() - endpointsSyncRequired, hcEndpoints, staleEndpoints := updateEndpointsMap( - proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname) - proxier.endpointsChanges.lock.Unlock() - - if !serviceSyncRequired && !endpointsSyncRequired { + // If this was called because of a services update, but nothing actionable has changed, skip it. + if reason == syncReasonServices && !serviceSyncRequired { glog.V(3).Infof("Skipping iptables sync because nothing changed") return } + + proxier.endpointsChanges.Lock() + endpointsSyncRequired, hcEndpoints, staleEndpoints := updateEndpointsMap( + proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname) + proxier.endpointsChanges.Unlock() + + // If this was called because of an endpoints update, but nothing actionable has changed, skip it. + if reason == syncReasonEndpoints && !endpointsSyncRequired { + glog.V(3).Infof("Skipping iptables sync because nothing changed") + return + } + glog.V(3).Infof("Syncing iptables rules") // Create and link the kube services chain. @@ -1482,8 +1496,8 @@ func (proxier *Proxier) syncProxyRules() { } proxier.portsMap = replacementPortsMap - // Update healthz timestamp. - if proxier.healthzServer != nil { + // Update healthz timestamp if it is periodic sync. + if proxier.healthzServer != nil && reason == syncReasonForce { proxier.healthzServer.UpdateTimestamp() } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 4d9cb31571..b49b1929b8 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -578,7 +578,7 @@ func TestClusterIPReject(t *testing.T) { }), ) makeEndpointsMap(fp) - fp.syncProxyRules() + fp.syncProxyRules(syncReasonForce) svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP)))) svcRules := ipt.GetRules(svcChain) @@ -627,7 +627,7 @@ func TestClusterIPEndpointsJump(t *testing.T) { }), ) - fp.syncProxyRules() + fp.syncProxyRules(syncReasonForce) epStr := fmt.Sprintf("%s:%d", epIP, svcPort) svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP)))) @@ -691,7 +691,7 @@ func TestLoadBalancer(t *testing.T) { }), ) - fp.syncProxyRules() + fp.syncProxyRules(syncReasonForce) proto := strings.ToLower(string(api.ProtocolTCP)) fwChain := string(serviceFirewallChainName(svcPortName.String(), proto)) @@ -748,7 +748,7 @@ func TestNodePort(t *testing.T) { }), ) - fp.syncProxyRules() + fp.syncProxyRules(syncReasonForce) proto := strings.ToLower(string(api.ProtocolTCP)) svcChain := string(servicePortChainName(svcPortName.String(), proto)) @@ -785,7 +785,7 @@ func TestExternalIPsReject(t *testing.T) { ) makeEndpointsMap(fp) - fp.syncProxyRules() + fp.syncProxyRules(syncReasonForce) kubeSvcRules := ipt.GetRules(string(kubeServicesChain)) if !hasJump(kubeSvcRules, iptablestest.Reject, svcExternalIPs, svcPort) { @@ -818,7 +818,7 @@ func TestNodePortReject(t *testing.T) { ) makeEndpointsMap(fp) - fp.syncProxyRules() + fp.syncProxyRules(syncReasonForce) kubeSvcRules := ipt.GetRules(string(kubeServicesChain)) if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP, svcNodePort) { @@ -881,7 +881,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { }), ) - fp.syncProxyRules() + fp.syncProxyRules(syncReasonForce) proto := strings.ToLower(string(api.ProtocolTCP)) fwChain := string(serviceFirewallChainName(svcPortName.String(), proto)) @@ -972,7 +972,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable }), ) - fp.syncProxyRules() + fp.syncProxyRules(syncReasonForce) proto := strings.ToLower(string(api.ProtocolTCP)) lbChain := string(serviceLBChainName(svcPortName.String(), proto))