Merge pull request #46004 from MrHohn/revert-remove-sync-reason

Revert "Remove reasons from iptables syncProxyRules"
pull/6/head
Dawn Chen 2017-05-17 21:45:13 -07:00 committed by GitHub
commit dddc6b863e
2 changed files with 50 additions and 36 deletions

View File

@ -199,7 +199,7 @@ type endpointsChange struct {
}
type endpointsChangeMap struct {
lock sync.Mutex
sync.Mutex
items map[types.NamespacedName]*endpointsChange
}
@ -209,7 +209,7 @@ type serviceChange struct {
}
type serviceChangeMap struct {
lock sync.Mutex
sync.Mutex
items map[types.NamespacedName]*serviceChange
}
@ -223,8 +223,8 @@ func newEndpointsChangeMap() endpointsChangeMap {
}
func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) {
ecm.lock.Lock()
defer ecm.lock.Unlock()
ecm.Lock()
defer ecm.Unlock()
change, exists := ecm.items[*namespacedName]
if !exists {
@ -242,8 +242,8 @@ func newServiceChangeMap() serviceChangeMap {
}
func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) {
scm.lock.Lock()
defer scm.lock.Unlock()
scm.Lock()
defer scm.Unlock()
change, exists := scm.items[*namespacedName]
if !exists {
@ -509,7 +509,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
// Sync is called to immediately synchronize the proxier state to iptables
func (proxier *Proxier) Sync() {
proxier.syncProxyRules()
proxier.syncProxyRules(syncReasonForce)
}
// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
@ -531,21 +531,21 @@ func (proxier *Proxier) OnServiceAdd(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
proxier.serviceChanges.update(&namespacedName, nil, service)
proxier.syncProxyRules()
proxier.syncProxyRules(syncReasonServices)
}
func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
proxier.serviceChanges.update(&namespacedName, oldService, service)
proxier.syncProxyRules()
proxier.syncProxyRules(syncReasonServices)
}
func (proxier *Proxier) OnServiceDelete(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
proxier.serviceChanges.update(&namespacedName, service, nil)
proxier.syncProxyRules()
proxier.syncProxyRules(syncReasonServices)
}
func (proxier *Proxier) OnServiceSynced() {
@ -553,7 +553,7 @@ func (proxier *Proxier) OnServiceSynced() {
proxier.servicesSynced = true
proxier.mu.Unlock()
proxier.syncProxyRules()
proxier.syncProxyRules(syncReasonServices)
}
func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool {
@ -587,7 +587,7 @@ func (sm *proxyServiceMap) mergeService(service *api.Service) (bool, sets.String
info := newServiceInfo(serviceName, servicePort, service)
oldInfo, exists := (*sm)[serviceName]
equal := reflect.DeepEqual(info, oldInfo)
if !exists {
if exists {
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
} else if !equal {
glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
@ -662,21 +662,21 @@ func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.endpointsChanges.update(&namespacedName, nil, endpoints)
proxier.syncProxyRules()
proxier.syncProxyRules(syncReasonEndpoints)
}
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints)
proxier.syncProxyRules()
proxier.syncProxyRules(syncReasonEndpoints)
}
func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.endpointsChanges.update(&namespacedName, endpoints, nil)
proxier.syncProxyRules()
proxier.syncProxyRules(syncReasonEndpoints)
}
func (proxier *Proxier) OnEndpointsSynced() {
@ -684,7 +684,7 @@ func (proxier *Proxier) OnEndpointsSynced() {
proxier.endpointsSynced = true
proxier.mu.Unlock()
proxier.syncProxyRules()
proxier.syncProxyRules(syncReasonEndpoints)
}
// <endpointsMap> is updated by this function (based on the given changes).
@ -873,10 +873,16 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ
}
}
type syncReason string
const syncReasonServices syncReason = "ServicesUpdate"
const syncReasonEndpoints syncReason = "EndpointsUpdate"
const syncReasonForce syncReason = "Force"
// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// assumes proxier.mu is held
func (proxier *Proxier) syncProxyRules() {
func (proxier *Proxier) syncProxyRules(reason syncReason) {
proxier.mu.Lock()
defer proxier.mu.Unlock()
@ -886,7 +892,7 @@ func (proxier *Proxier) syncProxyRules() {
start := time.Now()
defer func() {
SyncProxyRulesLatency.Observe(sinceInMicroseconds(start))
glog.V(4).Infof("syncProxyRules took %v", time.Since(start))
glog.V(4).Infof("syncProxyRules(%s) took %v", reason, time.Since(start))
}()
// don't sync rules till we've received services and endpoints
if !proxier.endpointsSynced || !proxier.servicesSynced {
@ -895,20 +901,28 @@ func (proxier *Proxier) syncProxyRules() {
}
// Figure out the new services we need to activate.
proxier.serviceChanges.lock.Lock()
proxier.serviceChanges.Lock()
serviceSyncRequired, hcServices, staleServices := updateServiceMap(
proxier.serviceMap, &proxier.serviceChanges)
proxier.serviceChanges.lock.Unlock()
proxier.serviceChanges.Unlock()
proxier.endpointsChanges.lock.Lock()
endpointsSyncRequired, hcEndpoints, staleEndpoints := updateEndpointsMap(
proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
proxier.endpointsChanges.lock.Unlock()
if !serviceSyncRequired && !endpointsSyncRequired {
// If this was called because of a services update, but nothing actionable has changed, skip it.
if reason == syncReasonServices && !serviceSyncRequired {
glog.V(3).Infof("Skipping iptables sync because nothing changed")
return
}
proxier.endpointsChanges.Lock()
endpointsSyncRequired, hcEndpoints, staleEndpoints := updateEndpointsMap(
proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
proxier.endpointsChanges.Unlock()
// If this was called because of an endpoints update, but nothing actionable has changed, skip it.
if reason == syncReasonEndpoints && !endpointsSyncRequired {
glog.V(3).Infof("Skipping iptables sync because nothing changed")
return
}
glog.V(3).Infof("Syncing iptables rules")
// Create and link the kube services chain.
@ -1482,8 +1496,8 @@ func (proxier *Proxier) syncProxyRules() {
}
proxier.portsMap = replacementPortsMap
// Update healthz timestamp.
if proxier.healthzServer != nil {
// Update healthz timestamp if it is periodic sync.
if proxier.healthzServer != nil && reason == syncReasonForce {
proxier.healthzServer.UpdateTimestamp()
}

View File

@ -578,7 +578,7 @@ func TestClusterIPReject(t *testing.T) {
}),
)
makeEndpointsMap(fp)
fp.syncProxyRules()
fp.syncProxyRules(syncReasonForce)
svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP))))
svcRules := ipt.GetRules(svcChain)
@ -627,7 +627,7 @@ func TestClusterIPEndpointsJump(t *testing.T) {
}),
)
fp.syncProxyRules()
fp.syncProxyRules(syncReasonForce)
epStr := fmt.Sprintf("%s:%d", epIP, svcPort)
svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP))))
@ -691,7 +691,7 @@ func TestLoadBalancer(t *testing.T) {
}),
)
fp.syncProxyRules()
fp.syncProxyRules(syncReasonForce)
proto := strings.ToLower(string(api.ProtocolTCP))
fwChain := string(serviceFirewallChainName(svcPortName.String(), proto))
@ -748,7 +748,7 @@ func TestNodePort(t *testing.T) {
}),
)
fp.syncProxyRules()
fp.syncProxyRules(syncReasonForce)
proto := strings.ToLower(string(api.ProtocolTCP))
svcChain := string(servicePortChainName(svcPortName.String(), proto))
@ -785,7 +785,7 @@ func TestExternalIPsReject(t *testing.T) {
)
makeEndpointsMap(fp)
fp.syncProxyRules()
fp.syncProxyRules(syncReasonForce)
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
if !hasJump(kubeSvcRules, iptablestest.Reject, svcExternalIPs, svcPort) {
@ -818,7 +818,7 @@ func TestNodePortReject(t *testing.T) {
)
makeEndpointsMap(fp)
fp.syncProxyRules()
fp.syncProxyRules(syncReasonForce)
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP, svcNodePort) {
@ -881,7 +881,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
}),
)
fp.syncProxyRules()
fp.syncProxyRules(syncReasonForce)
proto := strings.ToLower(string(api.ProtocolTCP))
fwChain := string(serviceFirewallChainName(svcPortName.String(), proto))
@ -972,7 +972,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
}),
)
fp.syncProxyRules()
fp.syncProxyRules(syncReasonForce)
proto := strings.ToLower(string(api.ProtocolTCP))
lbChain := string(serviceLBChainName(svcPortName.String(), proto))