Merge pull request #67328 from cofyc/fix65113-2

Automatic merge from submit-queue (batch tested with PRs 66916, 67252, 67794, 67619, 67328). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Using a fixed set of locks, then we don't need to free unused locks anymore.

**What this PR does / why we need it**:

Using a fixed set of locks, then we don't need to free unused locks anymore.
See kubernetes/kubernetes/pull/66442 for discussions.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #65113

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```

/assign @msau42 
/assign @thockin
pull/8/head
Kubernetes Submit Queue 2018-08-24 15:25:17 -07:00 committed by GitHub
commit b883c5905a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 115 additions and 93 deletions

View File

@ -377,7 +377,6 @@ pkg/util/initsystem
pkg/util/ipconfig pkg/util/ipconfig
pkg/util/iptables pkg/util/iptables
pkg/util/iptables/testing pkg/util/iptables/testing
pkg/util/keymutex
pkg/util/labels pkg/util/labels
pkg/util/mount pkg/util/mount
pkg/util/netsh/testing pkg/util/netsh/testing

View File

@ -8,7 +8,10 @@ load(
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = ["keymutex.go"], srcs = [
"hashed.go",
"keymutex.go",
],
importpath = "k8s.io/kubernetes/pkg/util/keymutex", importpath = "k8s.io/kubernetes/pkg/util/keymutex",
deps = ["//vendor/github.com/golang/glog:go_default_library"], deps = ["//vendor/github.com/golang/glog:go_default_library"],
) )

View File

@ -0,0 +1,64 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package keymutex
import (
"hash/fnv"
"runtime"
"sync"
"github.com/golang/glog"
)
// NewHashed returns a new instance of KeyMutex which hashes arbitrary keys to
// a fixed set of locks. `n` specifies number of locks, if n <= 0, we use
// number of cpus.
// Note that because it uses fixed set of locks, different keys may share same
// lock, so it's possible to wait on same lock.
func NewHashed(n int) KeyMutex {
if n <= 0 {
n = runtime.NumCPU()
}
return &hashedKeyMutex{
mutexes: make([]sync.Mutex, n),
}
}
type hashedKeyMutex struct {
mutexes []sync.Mutex
}
// Acquires a lock associated with the specified ID.
func (km *hashedKeyMutex) LockKey(id string) {
glog.V(5).Infof("hashedKeyMutex.LockKey(...) called for id %q\r\n", id)
km.mutexes[km.hash(id)%len(km.mutexes)].Lock()
glog.V(5).Infof("hashedKeyMutex.LockKey(...) for id %q completed.\r\n", id)
}
// Releases the lock associated with the specified ID.
func (km *hashedKeyMutex) UnlockKey(id string) error {
glog.V(5).Infof("hashedKeyMutex.UnlockKey(...) called for id %q\r\n", id)
km.mutexes[km.hash(id)%len(km.mutexes)].Unlock()
glog.V(5).Infof("hashedKeyMutex.UnlockKey(...) for id %q completed.\r\n", id)
return nil
}
func (km *hashedKeyMutex) hash(id string) int {
h := fnv.New32a()
h.Write([]byte(id))
return int(h.Sum32())
}

View File

@ -16,13 +16,6 @@ limitations under the License.
package keymutex package keymutex
import (
"fmt"
"sync"
"github.com/golang/glog"
)
// KeyMutex is a thread-safe interface for acquiring locks on arbitrary strings. // KeyMutex is a thread-safe interface for acquiring locks on arbitrary strings.
type KeyMutex interface { type KeyMutex interface {
// Acquires a lock associated with the specified ID, creates the lock if one doesn't already exist. // Acquires a lock associated with the specified ID, creates the lock if one doesn't already exist.
@ -32,52 +25,3 @@ type KeyMutex interface {
// Returns an error if the specified ID doesn't exist. // Returns an error if the specified ID doesn't exist.
UnlockKey(id string) error UnlockKey(id string) error
} }
// Returns a new instance of a key mutex.
func NewKeyMutex() KeyMutex {
return &keyMutex{
mutexMap: make(map[string]*sync.Mutex),
}
}
type keyMutex struct {
sync.RWMutex
mutexMap map[string]*sync.Mutex
}
// Acquires a lock associated with the specified ID (creates the lock if one doesn't already exist).
func (km *keyMutex) LockKey(id string) {
glog.V(5).Infof("LockKey(...) called for id %q\r\n", id)
mutex := km.getOrCreateLock(id)
mutex.Lock()
glog.V(5).Infof("LockKey(...) for id %q completed.\r\n", id)
}
// Releases the lock associated with the specified ID.
// Returns an error if the specified ID doesn't exist.
func (km *keyMutex) UnlockKey(id string) error {
glog.V(5).Infof("UnlockKey(...) called for id %q\r\n", id)
km.RLock()
defer km.RUnlock()
mutex, exists := km.mutexMap[id]
if !exists {
return fmt.Errorf("id %q not found", id)
}
glog.V(5).Infof("UnlockKey(...) for id. Mutex found, trying to unlock it. %q\r\n", id)
mutex.Unlock()
glog.V(5).Infof("UnlockKey(...) for id %q completed.\r\n", id)
return nil
}
// Returns lock associated with the specified ID, or creates the lock if one doesn't already exist.
func (km *keyMutex) getOrCreateLock(id string) *sync.Mutex {
km.Lock()
defer km.Unlock()
if _, exists := km.mutexMap[id]; !exists {
km.mutexMap[id] = &sync.Mutex{}
}
return km.mutexMap[id]
}

View File

@ -25,46 +25,58 @@ const (
callbackTimeout = 1 * time.Second callbackTimeout = 1 * time.Second
) )
func newKeyMutexes() []KeyMutex {
return []KeyMutex{
NewHashed(0),
NewHashed(1),
NewHashed(2),
NewHashed(4),
}
}
func Test_SingleLock_NoUnlock(t *testing.T) { func Test_SingleLock_NoUnlock(t *testing.T) {
// Arrange for _, km := range newKeyMutexes() {
km := NewKeyMutex() // Arrange
key := "fakeid" key := "fakeid"
callbackCh := make(chan interface{}) callbackCh := make(chan interface{})
// Act // Act
go lockAndCallback(km, key, callbackCh) go lockAndCallback(km, key, callbackCh)
// Assert // Assert
verifyCallbackHappens(t, callbackCh) verifyCallbackHappens(t, callbackCh)
}
} }
func Test_SingleLock_SingleUnlock(t *testing.T) { func Test_SingleLock_SingleUnlock(t *testing.T) {
// Arrange for _, km := range newKeyMutexes() {
km := NewKeyMutex() // Arrange
key := "fakeid" key := "fakeid"
callbackCh := make(chan interface{}) callbackCh := make(chan interface{})
// Act & Assert // Act & Assert
go lockAndCallback(km, key, callbackCh) go lockAndCallback(km, key, callbackCh)
verifyCallbackHappens(t, callbackCh) verifyCallbackHappens(t, callbackCh)
km.UnlockKey(key) km.UnlockKey(key)
}
} }
func Test_DoubleLock_DoubleUnlock(t *testing.T) { func Test_DoubleLock_DoubleUnlock(t *testing.T) {
// Arrange for _, km := range newKeyMutexes() {
km := NewKeyMutex() // Arrange
key := "fakeid" key := "fakeid"
callbackCh1stLock := make(chan interface{}) callbackCh1stLock := make(chan interface{})
callbackCh2ndLock := make(chan interface{}) callbackCh2ndLock := make(chan interface{})
// Act & Assert // Act & Assert
go lockAndCallback(km, key, callbackCh1stLock) go lockAndCallback(km, key, callbackCh1stLock)
verifyCallbackHappens(t, callbackCh1stLock) verifyCallbackHappens(t, callbackCh1stLock)
go lockAndCallback(km, key, callbackCh2ndLock) go lockAndCallback(km, key, callbackCh2ndLock)
verifyCallbackDoesntHappens(t, callbackCh2ndLock) verifyCallbackDoesntHappens(t, callbackCh2ndLock)
km.UnlockKey(key) km.UnlockKey(key)
verifyCallbackHappens(t, callbackCh2ndLock) verifyCallbackHappens(t, callbackCh2ndLock)
km.UnlockKey(key) km.UnlockKey(key)
}
} }
func lockAndCallback(km KeyMutex, id string, callbackCh chan<- interface{}) { func lockAndCallback(km KeyMutex, id string, callbackCh chan<- interface{}) {

View File

@ -56,7 +56,7 @@ var _ volume.DeviceMounter = &azureDiskAttacher{}
var _ volume.DeviceUnmounter = &azureDiskDetacher{} var _ volume.DeviceUnmounter = &azureDiskDetacher{}
// acquire lock to get an lun number // acquire lock to get an lun number
var getLunMutex = keymutex.NewKeyMutex() var getLunMutex = keymutex.NewHashed(0)
// Attach attaches a volume.Spec to an Azure VM referenced by NodeName, returning the disk's LUN // Attach attaches a volume.Spec to an Azure VM referenced by NodeName, returning the disk's LUN
func (a *azureDiskAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) { func (a *azureDiskAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {

View File

@ -87,7 +87,7 @@ func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
func (plugin *cinderPlugin) Init(host volume.VolumeHost) error { func (plugin *cinderPlugin) Init(host volume.VolumeHost) error {
plugin.host = host plugin.host = host
plugin.volumeLocks = keymutex.NewKeyMutex() plugin.volumeLocks = keymutex.NewHashed(0)
return nil return nil
} }

View File

@ -55,7 +55,7 @@ const (
func (plugin *iscsiPlugin) Init(host volume.VolumeHost) error { func (plugin *iscsiPlugin) Init(host volume.VolumeHost) error {
plugin.host = host plugin.host = host
plugin.targetLocks = keymutex.NewKeyMutex() plugin.targetLocks = keymutex.NewHashed(0)
return nil return nil
} }

View File

@ -59,7 +59,7 @@ const (
func (plugin *localVolumePlugin) Init(host volume.VolumeHost) error { func (plugin *localVolumePlugin) Init(host volume.VolumeHost) error {
plugin.host = host plugin.host = host
plugin.volumeLocks = keymutex.NewKeyMutex() plugin.volumeLocks = keymutex.NewHashed(0)
plugin.recorder = host.GetEventRecorder() plugin.recorder = host.GetEventRecorder()
return nil return nil
} }

View File

@ -50,7 +50,7 @@ var _ volume.VolumePlugin = &sioPlugin{}
func (p *sioPlugin) Init(host volume.VolumeHost) error { func (p *sioPlugin) Init(host volume.VolumeHost) error {
p.host = host p.host = host
p.volumeMtx = keymutex.NewKeyMutex() p.volumeMtx = keymutex.NewHashed(0)
return nil return nil
} }

View File

@ -46,7 +46,7 @@ var _ volume.AttachableVolumePlugin = &vsphereVolumePlugin{}
var _ volume.DeviceMountableVolumePlugin = &vsphereVolumePlugin{} var _ volume.DeviceMountableVolumePlugin = &vsphereVolumePlugin{}
// Singleton key mutex for keeping attach operations for the same host atomic // Singleton key mutex for keeping attach operations for the same host atomic
var attachdetachMutex = keymutex.NewKeyMutex() var attachdetachMutex = keymutex.NewHashed(0)
func (plugin *vsphereVolumePlugin) NewAttacher() (volume.Attacher, error) { func (plugin *vsphereVolumePlugin) NewAttacher() (volume.Attacher, error) {
vsphereCloud, err := getCloudProvider(plugin.host.GetCloudProvider()) vsphereCloud, err := getCloudProvider(plugin.host.GetCloudProvider())