From 6c70ca61bee5ffc2e817c186ea3df9bd2205d768 Mon Sep 17 00:00:00 2001 From: andyzhangx Date: Sun, 14 Apr 2019 03:09:06 +0000 Subject: [PATCH] move disk lock process to azure cloud provider fix comments fix import keymux check error add unit test for attach/detach disk funcs --- .../providers/.import-restrictions | 3 +- pkg/cloudprovider/providers/azure/BUILD | 2 + .../azure/azure_controller_common.go | 39 ++++++++++- .../azure/azure_controller_common_test.go | 66 +++++++++++++++++++ pkg/volume/azure_dd/BUILD | 1 - pkg/volume/azure_dd/attacher.go | 40 ++--------- pkg/volume/azure_dd/azure_dd.go | 2 +- 7 files changed, 113 insertions(+), 40 deletions(-) create mode 100644 pkg/cloudprovider/providers/azure/azure_controller_common_test.go diff --git a/pkg/cloudprovider/providers/.import-restrictions b/pkg/cloudprovider/providers/.import-restrictions index 80b05fdd20..8371496c69 100644 --- a/pkg/cloudprovider/providers/.import-restrictions +++ b/pkg/cloudprovider/providers/.import-restrictions @@ -8,7 +8,8 @@ "k8s.io/utils/io", "k8s.io/utils/strings", "k8s.io/utils/exec", - "k8s.io/utils/path" + "k8s.io/utils/path", + "k8s.io/utils/keymutex" ] }, { diff --git a/pkg/cloudprovider/providers/azure/BUILD b/pkg/cloudprovider/providers/azure/BUILD index bd97289989..bbe2482526 100644 --- a/pkg/cloudprovider/providers/azure/BUILD +++ b/pkg/cloudprovider/providers/azure/BUILD @@ -69,6 +69,7 @@ go_library( "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", "//vendor/github.com/rubiojr/go-vhd/vhd:go_default_library", "//vendor/k8s.io/klog:go_default_library", + "//vendor/k8s.io/utils/keymutex:go_default_library", "//vendor/sigs.k8s.io/yaml:go_default_library", ], ) @@ -78,6 +79,7 @@ go_test( srcs = [ "azure_backoff_test.go", "azure_cache_test.go", + "azure_controller_common_test.go", "azure_instances_test.go", "azure_loadbalancer_test.go", "azure_metrics_test.go", diff --git a/pkg/cloudprovider/providers/azure/azure_controller_common.go b/pkg/cloudprovider/providers/azure/azure_controller_common.go index 1b5148d3d5..b4b2e53289 100644 --- a/pkg/cloudprovider/providers/azure/azure_controller_common.go +++ b/pkg/cloudprovider/providers/azure/azure_controller_common.go @@ -17,6 +17,7 @@ limitations under the License. package azure import ( + "context" "fmt" "time" @@ -26,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/types" kwait "k8s.io/apimachinery/pkg/util/wait" cloudprovider "k8s.io/cloud-provider" + "k8s.io/utils/keymutex" ) const ( @@ -50,6 +52,9 @@ var defaultBackOff = kwait.Backoff{ Jitter: 0.0, } +// acquire lock to attach/detach disk in one node +var diskOpMutex = keymutex.NewHashed(0) + type controllerCommon struct { subscriptionID string location string @@ -85,13 +90,29 @@ func (c *controllerCommon) getNodeVMSet(nodeName types.NodeName) (VMSet, error) return ss, nil } -// AttachDisk attaches a vhd to vm. The vhd must exist, can be identified by diskName, diskURI, and lun. -func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error { +// AttachDisk attaches a vhd to vm. The vhd must exist, can be identified by diskName, diskURI. +func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, cachingMode compute.CachingTypes) error { vmset, err := c.getNodeVMSet(nodeName) if err != nil { return err } + instanceid, err := c.cloud.InstanceID(context.TODO(), nodeName) + if err != nil { + klog.Warningf("failed to get azure instance id (%v)", err) + return fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err) + } + + diskOpMutex.LockKey(instanceid) + defer diskOpMutex.UnlockKey(instanceid) + + lun, err := c.GetNextDiskLun(nodeName) + if err != nil { + klog.Warningf("no LUN available for instance %q (%v)", nodeName, err) + return fmt.Errorf("all LUNs are used, cannot attach volume (%s, %s) to instance %q (%v)", diskName, diskURI, instanceid, err) + } + + klog.V(2).Infof("Trying to attach volume %q lun %d to node %q.", diskURI, lun, nodeName) return vmset.AttachDisk(isManagedDisk, diskName, diskURI, nodeName, lun, cachingMode) } @@ -102,11 +123,25 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N return err } + instanceid, err := c.cloud.InstanceID(context.TODO(), nodeName) + if err != nil { + klog.Warningf("failed to get azure instance id (%v)", err) + return fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err) + } + + klog.V(2).Infof("detach %v from node %q", diskURI, nodeName) + + // make the lock here as small as possible + diskOpMutex.LockKey(instanceid) resp, err := vmset.DetachDisk(diskName, diskURI, nodeName) + diskOpMutex.UnlockKey(instanceid) + if c.cloud.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) { klog.V(2).Infof("azureDisk - update backing off: detach disk(%s, %s), err: %v", diskName, diskURI, err) retryErr := kwait.ExponentialBackoff(c.cloud.requestBackoff(), func() (bool, error) { + diskOpMutex.LockKey(instanceid) resp, err := vmset.DetachDisk(diskName, diskURI, nodeName) + diskOpMutex.UnlockKey(instanceid) return c.cloud.processHTTPRetryResponse(nil, "", resp, err) }) if retryErr != nil { diff --git a/pkg/cloudprovider/providers/azure/azure_controller_common_test.go b/pkg/cloudprovider/providers/azure/azure_controller_common_test.go new file mode 100644 index 0000000000..e4b30d41cf --- /dev/null +++ b/pkg/cloudprovider/providers/azure/azure_controller_common_test.go @@ -0,0 +1,66 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "fmt" + "testing" + + "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-10-01/compute" +) + +func TestAttachDisk(t *testing.T) { + c := getTestCloud() + + common := &controllerCommon{ + location: c.Location, + storageEndpointSuffix: c.Environment.StorageEndpointSuffix, + resourceGroup: c.ResourceGroup, + subscriptionID: c.SubscriptionID, + cloud: c, + } + + diskURI := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/disks/disk-name", c.SubscriptionID, c.ResourceGroup) + + err := common.AttachDisk(true, "", diskURI, "node1", compute.CachingTypesReadOnly) + if err != nil { + fmt.Printf("TestAttachDisk return expected error: %v", err) + } else { + t.Errorf("TestAttachDisk unexpected nil err") + } +} + +func TestDetachDisk(t *testing.T) { + c := getTestCloud() + + common := &controllerCommon{ + location: c.Location, + storageEndpointSuffix: c.Environment.StorageEndpointSuffix, + resourceGroup: c.ResourceGroup, + subscriptionID: c.SubscriptionID, + cloud: c, + } + + diskURI := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/disks/disk-name", c.SubscriptionID, c.ResourceGroup) + + err := common.DetachDisk("", diskURI, "node1") + if err != nil { + fmt.Printf("TestAttachDisk return expected error: %v", err) + } else { + t.Errorf("TestAttachDisk unexpected nil err") + } +} diff --git a/pkg/volume/azure_dd/BUILD b/pkg/volume/azure_dd/BUILD index d45498cd24..a894b6ee53 100644 --- a/pkg/volume/azure_dd/BUILD +++ b/pkg/volume/azure_dd/BUILD @@ -40,7 +40,6 @@ go_library( "//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-10-01/compute:go_default_library", "//vendor/github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-07-01/storage:go_default_library", "//vendor/k8s.io/klog:go_default_library", - "//vendor/k8s.io/utils/keymutex:go_default_library", "//vendor/k8s.io/utils/strings:go_default_library", ], ) diff --git a/pkg/volume/azure_dd/attacher.go b/pkg/volume/azure_dd/attacher.go index b9453f7015..f7486e1e1d 100644 --- a/pkg/volume/azure_dd/attacher.go +++ b/pkg/volume/azure_dd/attacher.go @@ -17,7 +17,6 @@ limitations under the License. package azure_dd import ( - "context" "fmt" "os" "path/filepath" @@ -36,7 +35,6 @@ import ( "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util" - "k8s.io/utils/keymutex" ) type azureDiskDetacher struct { @@ -55,9 +53,6 @@ var _ volume.Detacher = &azureDiskDetacher{} var _ volume.DeviceMounter = &azureDiskAttacher{} var _ volume.DeviceUnmounter = &azureDiskDetacher{} -// acquire lock to get an lun number -var getLunMutex = keymutex.NewHashed(0) - // Attach attaches a volume.Spec to an Azure VM referenced by NodeName, returning the disk's LUN func (a *azureDiskAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) { volumeSource, _, err := getVolumeSource(spec) @@ -66,12 +61,6 @@ func (a *azureDiskAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) ( return "", err } - instanceid, err := a.cloud.InstanceID(context.TODO(), nodeName) - if err != nil { - klog.Warningf("failed to get azure instance id (%v)", err) - return "", fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err) - } - diskController, err := getDiskController(a.plugin.host) if err != nil { return "", err @@ -82,30 +71,22 @@ func (a *azureDiskAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) ( // Log error and continue with attach klog.Warningf( "Error checking if volume is already attached to current node (%q). Will continue and try attach anyway. err=%v", - instanceid, err) + nodeName, err) } if err == nil { // Volume is already attached to node. - klog.V(2).Infof("Attach operation is successful. volume %q is already attached to node %q at lun %d.", volumeSource.DiskName, instanceid, lun) + klog.V(2).Infof("Attach operation is successful. volume %q is already attached to node %q at lun %d.", volumeSource.DiskName, nodeName, lun) } else { klog.V(2).Infof("GetDiskLun returned: %v. Initiating attaching volume %q to node %q.", err, volumeSource.DataDiskURI, nodeName) - getLunMutex.LockKey(instanceid) - defer getLunMutex.UnlockKey(instanceid) - lun, err = diskController.GetNextDiskLun(nodeName) - if err != nil { - klog.Warningf("no LUN available for instance %q (%v)", nodeName, err) - return "", fmt.Errorf("all LUNs are used, cannot attach volume %q to instance %q (%v)", volumeSource.DiskName, instanceid, err) - } - klog.V(2).Infof("Trying to attach volume %q lun %d to node %q.", volumeSource.DataDiskURI, lun, nodeName) isManagedDisk := (*volumeSource.Kind == v1.AzureManagedDisk) - err = diskController.AttachDisk(isManagedDisk, volumeSource.DiskName, volumeSource.DataDiskURI, nodeName, lun, compute.CachingTypes(*volumeSource.CachingMode)) + err = diskController.AttachDisk(isManagedDisk, volumeSource.DiskName, volumeSource.DataDiskURI, nodeName, compute.CachingTypes(*volumeSource.CachingMode)) if err == nil { klog.V(2).Infof("Attach operation successful: volume %q attached to node %q.", volumeSource.DataDiskURI, nodeName) } else { - klog.V(2).Infof("Attach volume %q to instance %q failed with %v", volumeSource.DataDiskURI, instanceid, err) - return "", fmt.Errorf("Attach volume %q to instance %q failed with %v", volumeSource.DiskName, instanceid, err) + klog.V(2).Infof("Attach volume %q to instance %q failed with %v", volumeSource.DataDiskURI, nodeName, err) + return "", fmt.Errorf("Attach volume %q to instance %q failed with %v", volumeSource.DiskName, nodeName, err) } } @@ -285,22 +266,11 @@ func (d *azureDiskDetacher) Detach(diskURI string, nodeName types.NodeName) erro return fmt.Errorf("invalid disk to detach: %q", diskURI) } - instanceid, err := d.cloud.InstanceID(context.TODO(), nodeName) - if err != nil { - klog.Warningf("no instance id for node %q, skip detaching (%v)", nodeName, err) - return nil - } - - klog.V(2).Infof("detach %v from node %q", diskURI, nodeName) - diskController, err := getDiskController(d.plugin.host) if err != nil { return err } - getLunMutex.LockKey(instanceid) - defer getLunMutex.UnlockKey(instanceid) - err = diskController.DetachDisk("", diskURI, nodeName) if err != nil { klog.Errorf("failed to detach azure disk %q, err %v", diskURI, err) diff --git a/pkg/volume/azure_dd/azure_dd.go b/pkg/volume/azure_dd/azure_dd.go index 04e31a661a..811e04106a 100644 --- a/pkg/volume/azure_dd/azure_dd.go +++ b/pkg/volume/azure_dd/azure_dd.go @@ -44,7 +44,7 @@ type DiskController interface { DeleteManagedDisk(diskURI string) error // Attaches the disk to the host machine. - AttachDisk(isManagedDisk bool, diskName, diskUri string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error + AttachDisk(isManagedDisk bool, diskName, diskUri string, nodeName types.NodeName, cachingMode compute.CachingTypes) error // Detaches the disk, identified by disk name or uri, from the host machine. DetachDisk(diskName, diskUri string, nodeName types.NodeName) error