Merge pull request #44766 from wojtek-t/better_locking_mechanism

Automatic merge from submit-queue (batch tested with PRs 44044, 44766, 44930, 45109, 43301)

Reduce lock contention in KubeProxy
pull/6/head
Kubernetes Submit Queue 2017-04-28 17:49:08 -07:00 committed by GitHub
commit 10f90b66dc
2 changed files with 82 additions and 96 deletions

View File

@ -199,16 +199,62 @@ type endpointsChange struct {
current *api.Endpoints
}
// endpointsChangeMap tracks accumulated endpoint changes between syncs,
// guarded by an embedded mutex so producers and the sync loop can touch
// it without holding the Proxier-wide lock.
type endpointsChangeMap struct {
sync.Mutex
items map[types.NamespacedName]*endpointsChange
}
// serviceChange holds the before/after states of a single Service object.
type serviceChange struct {
previous *api.Service
current *api.Service
}
// NOTE(review): the two alias declarations below are the PRE-change
// definitions from the unified diff (the '-' lines with their markers
// stripped); they conflict with the struct definitions above/below and
// are not part of the post-change file.
type endpointsChangeMap map[types.NamespacedName]*endpointsChange
type serviceChangeMap map[types.NamespacedName]*serviceChange
// serviceChangeMap is the service counterpart of endpointsChangeMap:
// accumulated service changes plus their own mutex.
type serviceChangeMap struct {
sync.Mutex
items map[types.NamespacedName]*serviceChange
}
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo
// newEndpointsChangeMap constructs an endpointsChangeMap with an empty,
// initialized items map, ready to accumulate endpoint changes.
func newEndpointsChangeMap() endpointsChangeMap {
	ecm := endpointsChangeMap{}
	ecm.items = make(map[types.NamespacedName]*endpointsChange)
	return ecm
}
// update records that the Endpoints object named by namespacedName changed
// from previous to current. Changes accumulate: the first update for an
// object pins its previous state, and later updates only advance current,
// so previous always describes the state before the whole series of changes.
func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) {
	ecm.Lock()
	defer ecm.Unlock()

	key := *namespacedName
	if _, ok := ecm.items[key]; !ok {
		// First change seen for this object since the last sync.
		ecm.items[key] = &endpointsChange{previous: previous}
	}
	ecm.items[key].current = current
}
// newServiceChangeMap constructs a serviceChangeMap with an empty,
// initialized items map, ready to accumulate service changes.
func newServiceChangeMap() serviceChangeMap {
	scm := serviceChangeMap{}
	scm.items = make(map[types.NamespacedName]*serviceChange)
	return scm
}
// update records that the Service named by namespacedName changed from
// previous to current. Changes accumulate: the first update for an object
// pins its previous state, and later updates only advance current, so
// previous always describes the state before the whole series of changes.
func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) {
	scm.Lock()
	defer scm.Unlock()

	key := *namespacedName
	if _, ok := scm.items[key]; !ok {
		// First change seen for this object since the last sync.
		scm.items[key] = &serviceChange{previous: previous}
	}
	scm.items[key].current = current
}
func (em proxyEndpointsMap) merge(other proxyEndpointsMap) {
for svcPort := range other {
em[svcPort] = other[svcPort]
@ -224,24 +270,17 @@ func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) {
// Proxier is an iptables based proxy for connections between a localhost:lport
// and services that provide the actual backends.
type Proxier struct {
mu sync.Mutex // protects the following fields
serviceMap proxyServiceMap
// serviceChanges contains all changes to services that happened since
// last syncProxyRules call. For a single object, changes are accumulated,
// i.e. previous is state from before all of them, current is state after
// applying all of those.
serviceChanges serviceChangeMap
endpointsMap proxyEndpointsMap
// endpointsChanges contains all changes to endpoints that happened since
// last syncProxyRules call. For a single object, changes are accumulated,
// i.e. previous is state from before all of them, current is state after
// applying all of those.
// endpointsChanges and serviceChanges contains all changes to endpoints and
// services that happened since last syncProxyRules call. For a single object,
// changes are accumulated, i.e. previous is state from before all of them,
// current is state after applying all of those.
endpointsChanges endpointsChangeMap
serviceChanges serviceChangeMap
portsMap map[localPort]closeable
mu sync.Mutex // protects the following fields
serviceMap proxyServiceMap
endpointsMap proxyEndpointsMap
portsMap map[localPort]closeable
// endpointsSynced and servicesSynced are set to true when corresponding
// objects are synced after startup. This is used to avoid updating iptables
// with some partial data after kube-proxy restart.
@ -360,9 +399,9 @@ func NewProxier(ipt utiliptables.Interface,
return &Proxier{
portsMap: make(map[localPort]closeable),
serviceMap: make(proxyServiceMap),
serviceChanges: make(serviceChangeMap),
serviceChanges: newServiceChangeMap(),
endpointsMap: make(proxyEndpointsMap),
endpointsChanges: make(endpointsChangeMap),
endpointsChanges: newEndpointsChangeMap(),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
throttle: throttle,
@ -468,8 +507,6 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
// Sync is called to immediately synchronize the proxier state to iptables
// NOTE(review): this span is a unified diff with the +/- markers stripped.
// The Lock/defer-Unlock pair below is the pre-change code that this PR
// removes; after the change, syncProxyRules acquires proxier.mu itself
// (see the `proxier.mu.Lock()` added at the top of syncProxyRules in a
// later hunk) — keeping both would deadlock.
func (proxier *Proxier) Sync() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.syncProxyRules(syncReasonForce)
}
@ -486,59 +523,30 @@ func (proxier *Proxier) SyncLoop() {
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
proxier.mu.Lock()
defer proxier.mu.Unlock()
change, exists := proxier.serviceChanges[namespacedName]
if !exists {
change = &serviceChange{}
change.previous = nil
proxier.serviceChanges[namespacedName] = change
}
change.current = service
proxier.serviceChanges.update(&namespacedName, nil, service)
proxier.syncProxyRules(syncReasonServices)
}
func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
proxier.mu.Lock()
defer proxier.mu.Unlock()
change, exists := proxier.serviceChanges[namespacedName]
if !exists {
change = &serviceChange{}
change.previous = oldService
proxier.serviceChanges[namespacedName] = change
}
change.current = service
proxier.serviceChanges.update(&namespacedName, oldService, service)
proxier.syncProxyRules(syncReasonServices)
}
func (proxier *Proxier) OnServiceDelete(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
proxier.mu.Lock()
defer proxier.mu.Unlock()
change, exists := proxier.serviceChanges[namespacedName]
if !exists {
change = &serviceChange{}
change.previous = service
proxier.serviceChanges[namespacedName] = change
}
change.current = nil
proxier.serviceChanges.update(&namespacedName, service, nil)
proxier.syncProxyRules(syncReasonServices)
}
// OnServiceSynced marks the initial service list as fully delivered and
// triggers a sync of the proxy rules.
// NOTE(review): this span is a unified diff with the +/- markers stripped.
// Both the removed `defer proxier.mu.Unlock()` and the added explicit
// `proxier.mu.Unlock()` appear; only one belongs to each version. In the
// post-change code the mutex is released before calling syncProxyRules,
// which now takes proxier.mu itself.
func (proxier *Proxier) OnServiceSynced() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.servicesSynced = true
proxier.mu.Unlock()
proxier.syncProxyRules(syncReasonServices)
}
@ -625,12 +633,12 @@ func updateServiceMap(
syncRequired = false
staleServices = sets.NewString()
for _, change := range *changes {
for _, change := range changes.items {
mergeSyncRequired, existingPorts := serviceMap.mergeService(change.current)
unmergeSyncRequired := serviceMap.unmergeService(change.previous, existingPorts, staleServices)
syncRequired = syncRequired || mergeSyncRequired || unmergeSyncRequired
}
*changes = make(serviceChangeMap)
changes.items = make(map[types.NamespacedName]*serviceChange)
// TODO: If this will appear to be computationally expensive, consider
// computing this incrementally similarly to serviceMap.
@ -652,59 +660,30 @@ func updateServiceMap(
func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.mu.Lock()
defer proxier.mu.Unlock()
change, exists := proxier.endpointsChanges[namespacedName]
if !exists {
change = &endpointsChange{}
change.previous = nil
proxier.endpointsChanges[namespacedName] = change
}
change.current = endpoints
proxier.endpointsChanges.update(&namespacedName, nil, endpoints)
proxier.syncProxyRules(syncReasonEndpoints)
}
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.mu.Lock()
defer proxier.mu.Unlock()
change, exists := proxier.endpointsChanges[namespacedName]
if !exists {
change = &endpointsChange{}
change.previous = oldEndpoints
proxier.endpointsChanges[namespacedName] = change
}
change.current = endpoints
proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints)
proxier.syncProxyRules(syncReasonEndpoints)
}
func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.mu.Lock()
defer proxier.mu.Unlock()
change, exists := proxier.endpointsChanges[namespacedName]
if !exists {
change = &endpointsChange{}
change.previous = endpoints
proxier.endpointsChanges[namespacedName] = change
}
change.current = nil
proxier.endpointsChanges.update(&namespacedName, endpoints, nil)
proxier.syncProxyRules(syncReasonEndpoints)
}
// OnEndpointsSynced marks the initial endpoints list as fully delivered and
// triggers a sync of the proxy rules.
// NOTE(review): this span is a unified diff with the +/- markers stripped.
// Both the removed `defer proxier.mu.Unlock()` and the added explicit
// `proxier.mu.Unlock()` appear; only one belongs to each version. In the
// post-change code the mutex is released before calling syncProxyRules,
// which now takes proxier.mu itself.
func (proxier *Proxier) OnEndpointsSynced() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.endpointsSynced = true
proxier.mu.Unlock()
proxier.syncProxyRules(syncReasonEndpoints)
}
@ -716,7 +695,7 @@ func updateEndpointsMap(
hostname string) (syncRequired bool, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
syncRequired = false
staleSet = make(map[endpointServicePair]bool)
for _, change := range *changes {
for _, change := range changes.items {
oldEndpointsMap := endpointsToEndpointsMap(change.previous, hostname)
newEndpointsMap := endpointsToEndpointsMap(change.current, hostname)
if !reflect.DeepEqual(oldEndpointsMap, newEndpointsMap) {
@ -726,7 +705,7 @@ func updateEndpointsMap(
syncRequired = true
}
}
*changes = make(endpointsChangeMap)
changes.items = make(map[types.NamespacedName]*endpointsChange)
if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
return
@ -904,6 +883,9 @@ const syncReasonForce syncReason = "Force"
// The only other iptables rules are those that are setup in iptablesInit()
// assumes proxier.mu is held
func (proxier *Proxier) syncProxyRules(reason syncReason) {
proxier.mu.Lock()
defer proxier.mu.Unlock()
if proxier.throttle != nil {
proxier.throttle.Accept()
}
@ -918,8 +900,10 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
}
// Figure out the new services we need to activate.
proxier.serviceChanges.Lock()
serviceSyncRequired, hcServices, staleServices := updateServiceMap(
proxier.serviceMap, &proxier.serviceChanges)
proxier.serviceChanges.Unlock()
// If this was called because of a services update, but nothing actionable has changed, skip it.
if reason == syncReasonServices && !serviceSyncRequired {
@ -927,8 +911,10 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
return
}
proxier.endpointsChanges.Lock()
endpointsSyncRequired, hcEndpoints, staleEndpoints := updateEndpointsMap(
proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
proxier.endpointsChanges.Unlock()
// If this was called because of an endpoints update, but nothing actionable has changed, skip it.
if reason == syncReasonEndpoints && !endpointsSyncRequired {

View File

@ -386,9 +386,9 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
return &Proxier{
exec: &exec.FakeExec{},
serviceMap: make(proxyServiceMap),
serviceChanges: make(serviceChangeMap),
serviceChanges: newServiceChangeMap(),
endpointsMap: make(proxyEndpointsMap),
endpointsChanges: make(endpointsChangeMap),
endpointsChanges: newEndpointsChangeMap(),
iptables: ipt,
clusterCIDR: "10.0.0.0/24",
hostname: testHostname,