move disk lock process to azure cloud provider

fix comments

fix import keymux check error

add unit test for attach/detach disk funcs
k3s-v1.15.3
andyzhangx 2019-04-14 03:09:06 +00:00
parent 39c239c308
commit 6c70ca61be
7 changed files with 113 additions and 40 deletions

View File

@ -8,7 +8,8 @@
"k8s.io/utils/io",
"k8s.io/utils/strings",
"k8s.io/utils/exec",
"k8s.io/utils/path"
"k8s.io/utils/path",
"k8s.io/utils/keymutex"
]
},
{

View File

@ -69,6 +69,7 @@ go_library(
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/github.com/rubiojr/go-vhd/vhd:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/keymutex:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library",
],
)
@ -78,6 +79,7 @@ go_test(
srcs = [
"azure_backoff_test.go",
"azure_cache_test.go",
"azure_controller_common_test.go",
"azure_instances_test.go",
"azure_loadbalancer_test.go",
"azure_metrics_test.go",

View File

@ -17,6 +17,7 @@ limitations under the License.
package azure
import (
"context"
"fmt"
"time"
@ -26,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/types"
kwait "k8s.io/apimachinery/pkg/util/wait"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/utils/keymutex"
)
const (
@ -50,6 +52,9 @@ var defaultBackOff = kwait.Backoff{
Jitter: 0.0,
}
// acquire lock to attach/detach disk in one node
var diskOpMutex = keymutex.NewHashed(0)
type controllerCommon struct {
subscriptionID string
location string
@ -85,13 +90,29 @@ func (c *controllerCommon) getNodeVMSet(nodeName types.NodeName) (VMSet, error)
return ss, nil
}
// AttachDisk attaches a vhd to vm. The vhd must exist, can be identified by diskName, diskURI, and lun.
func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error {
// AttachDisk attaches a vhd to vm. The vhd must exist, can be identified by diskName, diskURI.
func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, cachingMode compute.CachingTypes) error {
vmset, err := c.getNodeVMSet(nodeName)
if err != nil {
return err
}
instanceid, err := c.cloud.InstanceID(context.TODO(), nodeName)
if err != nil {
klog.Warningf("failed to get azure instance id (%v)", err)
return fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err)
}
diskOpMutex.LockKey(instanceid)
defer diskOpMutex.UnlockKey(instanceid)
lun, err := c.GetNextDiskLun(nodeName)
if err != nil {
klog.Warningf("no LUN available for instance %q (%v)", nodeName, err)
return fmt.Errorf("all LUNs are used, cannot attach volume (%s, %s) to instance %q (%v)", diskName, diskURI, instanceid, err)
}
klog.V(2).Infof("Trying to attach volume %q lun %d to node %q.", diskURI, lun, nodeName)
return vmset.AttachDisk(isManagedDisk, diskName, diskURI, nodeName, lun, cachingMode)
}
@ -102,11 +123,25 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N
return err
}
instanceid, err := c.cloud.InstanceID(context.TODO(), nodeName)
if err != nil {
klog.Warningf("failed to get azure instance id (%v)", err)
return fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err)
}
klog.V(2).Infof("detach %v from node %q", diskURI, nodeName)
// make the lock here as small as possible
diskOpMutex.LockKey(instanceid)
resp, err := vmset.DetachDisk(diskName, diskURI, nodeName)
diskOpMutex.UnlockKey(instanceid)
if c.cloud.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
klog.V(2).Infof("azureDisk - update backing off: detach disk(%s, %s), err: %v", diskName, diskURI, err)
retryErr := kwait.ExponentialBackoff(c.cloud.requestBackoff(), func() (bool, error) {
diskOpMutex.LockKey(instanceid)
resp, err := vmset.DetachDisk(diskName, diskURI, nodeName)
diskOpMutex.UnlockKey(instanceid)
return c.cloud.processHTTPRetryResponse(nil, "", resp, err)
})
if retryErr != nil {

View File

@ -0,0 +1,66 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package azure
import (
"fmt"
"testing"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-10-01/compute"
)
func TestAttachDisk(t *testing.T) {
c := getTestCloud()
common := &controllerCommon{
location: c.Location,
storageEndpointSuffix: c.Environment.StorageEndpointSuffix,
resourceGroup: c.ResourceGroup,
subscriptionID: c.SubscriptionID,
cloud: c,
}
diskURI := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/disks/disk-name", c.SubscriptionID, c.ResourceGroup)
err := common.AttachDisk(true, "", diskURI, "node1", compute.CachingTypesReadOnly)
if err != nil {
fmt.Printf("TestAttachDisk return expected error: %v", err)
} else {
t.Errorf("TestAttachDisk unexpected nil err")
}
}
func TestDetachDisk(t *testing.T) {
c := getTestCloud()
common := &controllerCommon{
location: c.Location,
storageEndpointSuffix: c.Environment.StorageEndpointSuffix,
resourceGroup: c.ResourceGroup,
subscriptionID: c.SubscriptionID,
cloud: c,
}
diskURI := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/disks/disk-name", c.SubscriptionID, c.ResourceGroup)
err := common.DetachDisk("", diskURI, "node1")
if err != nil {
fmt.Printf("TestAttachDisk return expected error: %v", err)
} else {
t.Errorf("TestAttachDisk unexpected nil err")
}
}

View File

@ -40,7 +40,6 @@ go_library(
"//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-10-01/compute:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-07-01/storage:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/keymutex:go_default_library",
"//vendor/k8s.io/utils/strings:go_default_library",
],
)

View File

@ -17,7 +17,6 @@ limitations under the License.
package azure_dd
import (
"context"
"fmt"
"os"
"path/filepath"
@ -36,7 +35,6 @@ import (
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/utils/keymutex"
)
type azureDiskDetacher struct {
@ -55,9 +53,6 @@ var _ volume.Detacher = &azureDiskDetacher{}
var _ volume.DeviceMounter = &azureDiskAttacher{}
var _ volume.DeviceUnmounter = &azureDiskDetacher{}
// acquire lock to get an lun number
var getLunMutex = keymutex.NewHashed(0)
// Attach attaches a volume.Spec to an Azure VM referenced by NodeName, returning the disk's LUN
func (a *azureDiskAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
volumeSource, _, err := getVolumeSource(spec)
@ -66,12 +61,6 @@ func (a *azureDiskAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (
return "", err
}
instanceid, err := a.cloud.InstanceID(context.TODO(), nodeName)
if err != nil {
klog.Warningf("failed to get azure instance id (%v)", err)
return "", fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err)
}
diskController, err := getDiskController(a.plugin.host)
if err != nil {
return "", err
@ -82,30 +71,22 @@ func (a *azureDiskAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (
// Log error and continue with attach
klog.Warningf(
"Error checking if volume is already attached to current node (%q). Will continue and try attach anyway. err=%v",
instanceid, err)
nodeName, err)
}
if err == nil {
// Volume is already attached to node.
klog.V(2).Infof("Attach operation is successful. volume %q is already attached to node %q at lun %d.", volumeSource.DiskName, instanceid, lun)
klog.V(2).Infof("Attach operation is successful. volume %q is already attached to node %q at lun %d.", volumeSource.DiskName, nodeName, lun)
} else {
klog.V(2).Infof("GetDiskLun returned: %v. Initiating attaching volume %q to node %q.", err, volumeSource.DataDiskURI, nodeName)
getLunMutex.LockKey(instanceid)
defer getLunMutex.UnlockKey(instanceid)
lun, err = diskController.GetNextDiskLun(nodeName)
if err != nil {
klog.Warningf("no LUN available for instance %q (%v)", nodeName, err)
return "", fmt.Errorf("all LUNs are used, cannot attach volume %q to instance %q (%v)", volumeSource.DiskName, instanceid, err)
}
klog.V(2).Infof("Trying to attach volume %q lun %d to node %q.", volumeSource.DataDiskURI, lun, nodeName)
isManagedDisk := (*volumeSource.Kind == v1.AzureManagedDisk)
err = diskController.AttachDisk(isManagedDisk, volumeSource.DiskName, volumeSource.DataDiskURI, nodeName, lun, compute.CachingTypes(*volumeSource.CachingMode))
err = diskController.AttachDisk(isManagedDisk, volumeSource.DiskName, volumeSource.DataDiskURI, nodeName, compute.CachingTypes(*volumeSource.CachingMode))
if err == nil {
klog.V(2).Infof("Attach operation successful: volume %q attached to node %q.", volumeSource.DataDiskURI, nodeName)
} else {
klog.V(2).Infof("Attach volume %q to instance %q failed with %v", volumeSource.DataDiskURI, instanceid, err)
return "", fmt.Errorf("Attach volume %q to instance %q failed with %v", volumeSource.DiskName, instanceid, err)
klog.V(2).Infof("Attach volume %q to instance %q failed with %v", volumeSource.DataDiskURI, nodeName, err)
return "", fmt.Errorf("Attach volume %q to instance %q failed with %v", volumeSource.DiskName, nodeName, err)
}
}
@ -285,22 +266,11 @@ func (d *azureDiskDetacher) Detach(diskURI string, nodeName types.NodeName) erro
return fmt.Errorf("invalid disk to detach: %q", diskURI)
}
instanceid, err := d.cloud.InstanceID(context.TODO(), nodeName)
if err != nil {
klog.Warningf("no instance id for node %q, skip detaching (%v)", nodeName, err)
return nil
}
klog.V(2).Infof("detach %v from node %q", diskURI, nodeName)
diskController, err := getDiskController(d.plugin.host)
if err != nil {
return err
}
getLunMutex.LockKey(instanceid)
defer getLunMutex.UnlockKey(instanceid)
err = diskController.DetachDisk("", diskURI, nodeName)
if err != nil {
klog.Errorf("failed to detach azure disk %q, err %v", diskURI, err)

View File

@ -44,7 +44,7 @@ type DiskController interface {
DeleteManagedDisk(diskURI string) error
// Attaches the disk to the host machine.
AttachDisk(isManagedDisk bool, diskName, diskUri string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error
AttachDisk(isManagedDisk bool, diskName, diskUri string, nodeName types.NodeName, cachingMode compute.CachingTypes) error
// Detaches the disk, identified by disk name or uri, from the host machine.
DetachDisk(diskName, diskUri string, nodeName types.NodeName) error