diff --git a/hack/.golint_failures b/hack/.golint_failures index 0345369d29..46426eb91a 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -377,7 +377,6 @@ pkg/util/initsystem pkg/util/ipconfig pkg/util/iptables pkg/util/iptables/testing -pkg/util/keymutex pkg/util/labels pkg/util/mount pkg/util/netsh/testing diff --git a/pkg/util/keymutex/BUILD b/pkg/util/keymutex/BUILD index bc278f010a..256ed34181 100644 --- a/pkg/util/keymutex/BUILD +++ b/pkg/util/keymutex/BUILD @@ -8,7 +8,10 @@ load( go_library( name = "go_default_library", - srcs = ["keymutex.go"], + srcs = [ + "hashed.go", + "keymutex.go", + ], importpath = "k8s.io/kubernetes/pkg/util/keymutex", deps = ["//vendor/github.com/golang/glog:go_default_library"], ) diff --git a/pkg/util/keymutex/hashed.go b/pkg/util/keymutex/hashed.go new file mode 100644 index 0000000000..5fe9a025c2 --- /dev/null +++ b/pkg/util/keymutex/hashed.go @@ -0,0 +1,64 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package keymutex + +import ( + "hash/fnv" + "runtime" + "sync" + + "github.com/golang/glog" +) + +// NewHashed returns a new instance of KeyMutex which hashes arbitrary keys to +// a fixed set of locks. `n` specifies number of locks, if n <= 0, we use +// number of cpus. +// Note that because it uses fixed set of locks, different keys may share same +// lock, so it's possible to wait on same lock. +func NewHashed(n int) KeyMutex { + if n <= 0 { + n = runtime.NumCPU() + } + return &hashedKeyMutex{ + mutexes: make([]sync.Mutex, n), + } +} + +type hashedKeyMutex struct { + mutexes []sync.Mutex +} + +// Acquires a lock associated with the specified ID. +func (km *hashedKeyMutex) LockKey(id string) { + glog.V(5).Infof("hashedKeyMutex.LockKey(...) called for id %q\r\n", id) + km.mutexes[km.hash(id)%len(km.mutexes)].Lock() + glog.V(5).Infof("hashedKeyMutex.LockKey(...) for id %q completed.\r\n", id) +} + +// Releases the lock associated with the specified ID. +func (km *hashedKeyMutex) UnlockKey(id string) error { + glog.V(5).Infof("hashedKeyMutex.UnlockKey(...) called for id %q\r\n", id) + km.mutexes[km.hash(id)%len(km.mutexes)].Unlock() + glog.V(5).Infof("hashedKeyMutex.UnlockKey(...) for id %q completed.\r\n", id) + return nil +} + +func (km *hashedKeyMutex) hash(id string) int { + h := fnv.New32a() + h.Write([]byte(id)) + return int(h.Sum32()) +} diff --git a/pkg/util/keymutex/keymutex.go b/pkg/util/keymutex/keymutex.go index 6955d2bb70..89dc022397 100644 --- a/pkg/util/keymutex/keymutex.go +++ b/pkg/util/keymutex/keymutex.go @@ -16,13 +16,6 @@ limitations under the License. package keymutex -import ( - "fmt" - "sync" - - "github.com/golang/glog" -) - // KeyMutex is a thread-safe interface for acquiring locks on arbitrary strings. type KeyMutex interface { // Acquires a lock associated with the specified ID, creates the lock if one doesn't already exist. @@ -32,52 +25,3 @@ type KeyMutex interface { // Returns an error if the specified ID doesn't exist. UnlockKey(id string) error } - -// Returns a new instance of a key mutex. -func NewKeyMutex() KeyMutex { - return &keyMutex{ - mutexMap: make(map[string]*sync.Mutex), - } -} - -type keyMutex struct { - sync.RWMutex - mutexMap map[string]*sync.Mutex -} - -// Acquires a lock associated with the specified ID (creates the lock if one doesn't already exist). -func (km *keyMutex) LockKey(id string) { - glog.V(5).Infof("LockKey(...) called for id %q\r\n", id) - mutex := km.getOrCreateLock(id) - mutex.Lock() - glog.V(5).Infof("LockKey(...) for id %q completed.\r\n", id) -} - -// Releases the lock associated with the specified ID. -// Returns an error if the specified ID doesn't exist. -func (km *keyMutex) UnlockKey(id string) error { - glog.V(5).Infof("UnlockKey(...) called for id %q\r\n", id) - km.RLock() - defer km.RUnlock() - mutex, exists := km.mutexMap[id] - if !exists { - return fmt.Errorf("id %q not found", id) - } - glog.V(5).Infof("UnlockKey(...) for id. Mutex found, trying to unlock it. %q\r\n", id) - - mutex.Unlock() - glog.V(5).Infof("UnlockKey(...) for id %q completed.\r\n", id) - return nil -} - -// Returns lock associated with the specified ID, or creates the lock if one doesn't already exist. -func (km *keyMutex) getOrCreateLock(id string) *sync.Mutex { - km.Lock() - defer km.Unlock() - - if _, exists := km.mutexMap[id]; !exists { - km.mutexMap[id] = &sync.Mutex{} - } - - return km.mutexMap[id] -} diff --git a/pkg/util/keymutex/keymutex_test.go b/pkg/util/keymutex/keymutex_test.go index c155a18af5..ce2f567bb2 100644 --- a/pkg/util/keymutex/keymutex_test.go +++ b/pkg/util/keymutex/keymutex_test.go @@ -25,46 +25,58 @@ const ( callbackTimeout = 1 * time.Second ) +func newKeyMutexes() []KeyMutex { + return []KeyMutex{ + NewHashed(0), + NewHashed(1), + NewHashed(2), + NewHashed(4), + } +} + func Test_SingleLock_NoUnlock(t *testing.T) { - // Arrange - km := NewKeyMutex() - key := "fakeid" - callbackCh := make(chan interface{}) + for _, km := range newKeyMutexes() { + // Arrange + key := "fakeid" + callbackCh := make(chan interface{}) - // Act - go lockAndCallback(km, key, callbackCh) + // Act + go lockAndCallback(km, key, callbackCh) - // Assert - verifyCallbackHappens(t, callbackCh) + // Assert + verifyCallbackHappens(t, callbackCh) + } } func Test_SingleLock_SingleUnlock(t *testing.T) { - // Arrange - km := NewKeyMutex() - key := "fakeid" - callbackCh := make(chan interface{}) + for _, km := range newKeyMutexes() { + // Arrange + key := "fakeid" + callbackCh := make(chan interface{}) - // Act & Assert - go lockAndCallback(km, key, callbackCh) - verifyCallbackHappens(t, callbackCh) - km.UnlockKey(key) + // Act & Assert + go lockAndCallback(km, key, callbackCh) + verifyCallbackHappens(t, callbackCh) + km.UnlockKey(key) + } } func Test_DoubleLock_DoubleUnlock(t *testing.T) { - // Arrange - km := NewKeyMutex() - key := "fakeid" - callbackCh1stLock := make(chan interface{}) - callbackCh2ndLock := make(chan interface{}) + for _, km := range newKeyMutexes() { + // Arrange + key := "fakeid" + callbackCh1stLock := make(chan interface{}) + callbackCh2ndLock := make(chan interface{}) - // Act & Assert - go lockAndCallback(km, key, callbackCh1stLock) - verifyCallbackHappens(t, callbackCh1stLock) - go lockAndCallback(km, key, callbackCh2ndLock) - verifyCallbackDoesntHappens(t, callbackCh2ndLock) - km.UnlockKey(key) - verifyCallbackHappens(t, callbackCh2ndLock) - km.UnlockKey(key) + // Act & Assert + go lockAndCallback(km, key, callbackCh1stLock) + verifyCallbackHappens(t, callbackCh1stLock) + go lockAndCallback(km, key, callbackCh2ndLock) + verifyCallbackDoesntHappens(t, callbackCh2ndLock) + km.UnlockKey(key) + verifyCallbackHappens(t, callbackCh2ndLock) + km.UnlockKey(key) + } } func lockAndCallback(km KeyMutex, id string, callbackCh chan<- interface{}) { diff --git a/pkg/volume/azure_dd/attacher.go b/pkg/volume/azure_dd/attacher.go index 556496b24a..6633ed8a59 100644 --- a/pkg/volume/azure_dd/attacher.go +++ b/pkg/volume/azure_dd/attacher.go @@ -56,7 +56,7 @@ var _ volume.DeviceMounter = &azureDiskAttacher{} var _ volume.DeviceUnmounter = &azureDiskDetacher{} // acquire lock to get an lun number -var getLunMutex = keymutex.NewKeyMutex() +var getLunMutex = keymutex.NewHashed(0) // Attach attaches a volume.Spec to an Azure VM referenced by NodeName, returning the disk's LUN func (a *azureDiskAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) { diff --git a/pkg/volume/cinder/cinder.go b/pkg/volume/cinder/cinder.go index d799d4b3a6..7086cc06f8 100644 --- a/pkg/volume/cinder/cinder.go +++ b/pkg/volume/cinder/cinder.go @@ -87,7 +87,7 @@ func getPath(uid types.UID, volName string, host volume.VolumeHost) string { func (plugin *cinderPlugin) Init(host volume.VolumeHost) error { plugin.host = host - plugin.volumeLocks = keymutex.NewKeyMutex() + plugin.volumeLocks = keymutex.NewHashed(0) return nil } diff --git a/pkg/volume/iscsi/iscsi.go b/pkg/volume/iscsi/iscsi.go index 8e003b7849..f7f2cc2b13 100644 --- a/pkg/volume/iscsi/iscsi.go +++ b/pkg/volume/iscsi/iscsi.go @@ -55,7 +55,7 @@ const ( func (plugin *iscsiPlugin) Init(host volume.VolumeHost) error { plugin.host = host - plugin.targetLocks = keymutex.NewKeyMutex() + plugin.targetLocks = keymutex.NewHashed(0) return nil } diff --git a/pkg/volume/local/local.go b/pkg/volume/local/local.go index 5a3e2ea64c..c8ec8ed14d 100644 --- a/pkg/volume/local/local.go +++ b/pkg/volume/local/local.go @@ -59,7 +59,7 @@ const ( func (plugin *localVolumePlugin) Init(host volume.VolumeHost) error { plugin.host = host - plugin.volumeLocks = keymutex.NewKeyMutex() + plugin.volumeLocks = keymutex.NewHashed(0) plugin.recorder = host.GetEventRecorder() return nil } diff --git a/pkg/volume/scaleio/sio_plugin.go b/pkg/volume/scaleio/sio_plugin.go index 6eb91b79bc..367c9b9638 100644 --- a/pkg/volume/scaleio/sio_plugin.go +++ b/pkg/volume/scaleio/sio_plugin.go @@ -50,7 +50,7 @@ var _ volume.VolumePlugin = &sioPlugin{} func (p *sioPlugin) Init(host volume.VolumeHost) error { p.host = host - p.volumeMtx = keymutex.NewKeyMutex() + p.volumeMtx = keymutex.NewHashed(0) return nil } diff --git a/pkg/volume/vsphere_volume/attacher.go b/pkg/volume/vsphere_volume/attacher.go index c49be010c3..9f292e348f 100644 --- a/pkg/volume/vsphere_volume/attacher.go +++ b/pkg/volume/vsphere_volume/attacher.go @@ -46,7 +46,7 @@ var _ volume.AttachableVolumePlugin = &vsphereVolumePlugin{} var _ volume.DeviceMountableVolumePlugin = &vsphereVolumePlugin{} // Singleton key mutex for keeping attach operations for the same host atomic -var attachdetachMutex = keymutex.NewKeyMutex() +var attachdetachMutex = keymutex.NewHashed(0) func (plugin *vsphereVolumePlugin) NewAttacher() (volume.Attacher, error) { vsphereCloud, err := getCloudProvider(plugin.host.GetCloudProvider())