Merge pull request #76886 from andyzhangx/automated-cherry-pick-of-#76573-upstream-release-1.14

Automated cherry pick of #76573: refactor detach azure disk retry operation
pull/564/head
Kubernetes Prow Robot 2019-04-30 17:27:09 -07:00 committed by GitHub
commit ba432573fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 158 additions and 93 deletions

View File

@ -8,7 +8,8 @@
"k8s.io/utils/io",
"k8s.io/utils/strings",
"k8s.io/utils/exec",
"k8s.io/utils/path"
"k8s.io/utils/path",
"k8s.io/utils/keymutex"
]
},
{

View File

@ -69,6 +69,7 @@ go_library(
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/github.com/rubiojr/go-vhd/vhd:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/keymutex:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library",
],
)
@ -78,6 +79,7 @@ go_test(
srcs = [
"azure_backoff_test.go",
"azure_cache_test.go",
"azure_controller_common_test.go",
"azure_instances_test.go",
"azure_loadbalancer_test.go",
"azure_metrics_test.go",

View File

@ -17,6 +17,7 @@ limitations under the License.
package azure
import (
"context"
"fmt"
"time"
@ -26,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/types"
kwait "k8s.io/apimachinery/pkg/util/wait"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/utils/keymutex"
)
const (
@ -50,6 +52,9 @@ var defaultBackOff = kwait.Backoff{
Jitter: 0.0,
}
// acquire lock to attach/detach disk in one node
var diskOpMutex = keymutex.NewHashed(0)
type controllerCommon struct {
subscriptionID string
location string
@ -85,24 +90,72 @@ func (c *controllerCommon) getNodeVMSet(nodeName types.NodeName) (VMSet, error)
return ss, nil
}
// AttachDisk attaches a vhd to vm. The vhd must exist, can be identified by diskName, diskURI, and lun.
func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error {
// AttachDisk attaches a vhd to vm. The vhd must exist, can be identified by diskName, diskURI.
func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, cachingMode compute.CachingTypes) error {
vmset, err := c.getNodeVMSet(nodeName)
if err != nil {
return err
}
instanceid, err := c.cloud.InstanceID(context.TODO(), nodeName)
if err != nil {
klog.Warningf("failed to get azure instance id (%v)", err)
return fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err)
}
diskOpMutex.LockKey(instanceid)
defer diskOpMutex.UnlockKey(instanceid)
lun, err := c.GetNextDiskLun(nodeName)
if err != nil {
klog.Warningf("no LUN available for instance %q (%v)", nodeName, err)
return fmt.Errorf("all LUNs are used, cannot attach volume (%s, %s) to instance %q (%v)", diskName, diskURI, instanceid, err)
}
klog.V(2).Infof("Trying to attach volume %q lun %d to node %q.", diskURI, lun, nodeName)
return vmset.AttachDisk(isManagedDisk, diskName, diskURI, nodeName, lun, cachingMode)
}
// DetachDiskByName detaches a vhd from host. The vhd can be identified by diskName or diskURI.
func (c *controllerCommon) DetachDiskByName(diskName, diskURI string, nodeName types.NodeName) error {
// DetachDisk detaches a disk from host. The vhd can be identified by diskName or diskURI.
func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.NodeName) error {
vmset, err := c.getNodeVMSet(nodeName)
if err != nil {
return err
}
return vmset.DetachDiskByName(diskName, diskURI, nodeName)
instanceid, err := c.cloud.InstanceID(context.TODO(), nodeName)
if err != nil {
klog.Warningf("failed to get azure instance id (%v)", err)
return fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err)
}
klog.V(2).Infof("detach %v from node %q", diskURI, nodeName)
// make the lock here as small as possible
diskOpMutex.LockKey(instanceid)
resp, err := vmset.DetachDisk(diskName, diskURI, nodeName)
diskOpMutex.UnlockKey(instanceid)
if c.cloud.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
klog.V(2).Infof("azureDisk - update backing off: detach disk(%s, %s), err: %v", diskName, diskURI, err)
retryErr := kwait.ExponentialBackoff(c.cloud.requestBackoff(), func() (bool, error) {
diskOpMutex.LockKey(instanceid)
resp, err := vmset.DetachDisk(diskName, diskURI, nodeName)
diskOpMutex.UnlockKey(instanceid)
return c.cloud.processHTTPRetryResponse(nil, "", resp, err)
})
if retryErr != nil {
err = retryErr
klog.V(2).Infof("azureDisk - update abort backoff: detach disk(%s, %s), err: %v", diskName, diskURI, err)
}
}
if err != nil {
klog.Errorf("azureDisk - detach disk(%s, %s) failed, err: %v", diskName, diskURI, err)
} else {
klog.V(2).Infof("azureDisk - detach disk(%s, %s) succeeded", diskName, diskURI)
}
return err
}
// getNodeDataDisks invokes vmSet interfaces to get data disks for the node.

View File

@ -0,0 +1,66 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package azure
import (
"fmt"
"testing"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-10-01/compute"
)
func TestAttachDisk(t *testing.T) {
c := getTestCloud()
common := &controllerCommon{
location: c.Location,
storageEndpointSuffix: c.Environment.StorageEndpointSuffix,
resourceGroup: c.ResourceGroup,
subscriptionID: c.SubscriptionID,
cloud: c,
}
diskURI := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/disks/disk-name", c.SubscriptionID, c.ResourceGroup)
err := common.AttachDisk(true, "", diskURI, "node1", compute.CachingTypesReadOnly)
if err != nil {
fmt.Printf("TestAttachDisk return expected error: %v", err)
} else {
t.Errorf("TestAttachDisk unexpected nil err")
}
}
func TestDetachDisk(t *testing.T) {
c := getTestCloud()
common := &controllerCommon{
location: c.Location,
storageEndpointSuffix: c.Environment.StorageEndpointSuffix,
resourceGroup: c.ResourceGroup,
subscriptionID: c.SubscriptionID,
cloud: c,
}
diskURI := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/disks/disk-name", c.SubscriptionID, c.ResourceGroup)
err := common.DetachDisk("", diskURI, "node1")
if err != nil {
fmt.Printf("TestAttachDisk return expected error: %v", err)
} else {
t.Errorf("TestAttachDisk unexpected nil err")
}
}

View File

@ -18,6 +18,7 @@ package azure
import (
"fmt"
"net/http"
"strings"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-10-01/compute"
@ -90,7 +91,7 @@ func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI stri
if strings.Contains(detail, errLeaseFailed) || strings.Contains(detail, errDiskBlobNotFound) {
// if lease cannot be acquired or disk not found, immediately detach the disk and return the original error
klog.V(2).Infof("azureDisk - err %v, try detach disk(%s, %s)", err, diskName, diskURI)
as.DetachDiskByName(diskName, diskURI, nodeName)
as.DetachDisk(diskName, diskURI, nodeName)
}
} else {
klog.V(2).Infof("azureDisk - attach disk(%s, %s) succeeded", diskName, diskURI)
@ -98,20 +99,20 @@ func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI stri
return err
}
// DetachDiskByName detaches a vhd from host
// DetachDisk detaches a disk from host
// the vhd can be identified by diskName or diskURI
func (as *availabilitySet) DetachDiskByName(diskName, diskURI string, nodeName types.NodeName) error {
func (as *availabilitySet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error) {
vm, err := as.getVirtualMachine(nodeName)
if err != nil {
// if host doesn't exist, no need to detach
klog.Warningf("azureDisk - cannot find node %s, skip detaching disk(%s, %s)", nodeName, diskName, diskURI)
return nil
return nil, nil
}
vmName := mapNodeNameToVMName(nodeName)
nodeResourceGroup, err := as.GetNodeResourceGroup(vmName)
if err != nil {
return err
return nil, err
}
disks := make([]compute.DataDisk, len(*vm.StorageProfile.DataDisks))
@ -131,7 +132,7 @@ func (as *availabilitySet) DetachDiskByName(diskName, diskURI string, nodeName t
}
if !bFoundDisk {
return fmt.Errorf("detach azure disk failure, disk %s not found, diskURI: %s", diskName, diskURI)
return nil, fmt.Errorf("detach azure disk failure, disk %s not found, diskURI: %s", diskName, diskURI)
}
newVM := compute.VirtualMachine{
@ -150,22 +151,7 @@ func (as *availabilitySet) DetachDiskByName(diskName, diskURI string, nodeName t
// Invalidate the cache right after updating
defer as.cloud.vmCache.Delete(vmName)
resp, err := as.VirtualMachinesClient.CreateOrUpdate(ctx, nodeResourceGroup, vmName, newVM)
if as.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
klog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s) detach disk(%s, %s), err: %v", nodeResourceGroup, vmName, diskName, diskURI, err)
retryErr := as.CreateOrUpdateVMWithRetry(nodeResourceGroup, vmName, newVM)
if retryErr != nil {
err = retryErr
klog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s) detach disk(%s, %s), err: %v", nodeResourceGroup, vmName, diskName, diskURI, err)
}
}
if err != nil {
klog.Errorf("azureDisk - detach disk(%s, %s) failed, err: %v", diskName, diskURI, err)
} else {
klog.V(2).Infof("azureDisk - detach disk(%s, %s) succeeded", diskName, diskURI)
}
return err
return as.VirtualMachinesClient.CreateOrUpdate(ctx, nodeResourceGroup, vmName, newVM)
}
// GetDataDisks gets a list of data disks attached to the node.

View File

@ -18,6 +18,7 @@ package azure
import (
"fmt"
"net/http"
"strings"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-10-01/compute"
@ -94,7 +95,7 @@ func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod
if strings.Contains(detail, errLeaseFailed) || strings.Contains(detail, errDiskBlobNotFound) {
// if lease cannot be acquired or disk not found, immediately detach the disk and return the original error
klog.Infof("azureDisk - err %s, try detach disk(%s, %s)", detail, diskName, diskURI)
ss.DetachDiskByName(diskName, diskURI, nodeName)
ss.DetachDisk(diskName, diskURI, nodeName)
}
} else {
klog.V(2).Infof("azureDisk - attach disk(%s, %s) succeeded", diskName, diskURI)
@ -102,18 +103,18 @@ func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod
return err
}
// DetachDiskByName detaches a vhd from host
// DetachDisk detaches a disk from host
// the vhd can be identified by diskName or diskURI
func (ss *scaleSet) DetachDiskByName(diskName, diskURI string, nodeName types.NodeName) error {
func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error) {
vmName := mapNodeNameToVMName(nodeName)
ssName, instanceID, vm, err := ss.getVmssVM(vmName)
if err != nil {
return err
return nil, err
}
nodeResourceGroup, err := ss.GetNodeResourceGroup(vmName)
if err != nil {
return err
return nil, err
}
disks := []compute.DataDisk{}
@ -135,7 +136,7 @@ func (ss *scaleSet) DetachDiskByName(diskName, diskURI string, nodeName types.No
}
if !bFoundDisk {
return fmt.Errorf("detach azure disk failure, disk %s not found, diskURI: %s", diskName, diskURI)
return nil, fmt.Errorf("detach azure disk failure, disk %s not found, diskURI: %s", diskName, diskURI)
}
newVM := compute.VirtualMachineScaleSetVM{
@ -158,22 +159,7 @@ func (ss *scaleSet) DetachDiskByName(diskName, diskURI string, nodeName types.No
defer ss.vmssVMCache.Delete(key)
klog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk(%s, %s)", nodeResourceGroup, nodeName, diskName, diskURI)
resp, err := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
klog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s) detach disk(%s, %s), err: %v", nodeResourceGroup, nodeName, diskName, diskURI, err)
retryErr := ss.UpdateVmssVMWithRetry(nodeResourceGroup, ssName, instanceID, newVM)
if retryErr != nil {
err = retryErr
klog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s) detach disk(%s, %s), err: %v", nodeResourceGroup, nodeName, diskName, diskURI, err)
}
}
if err != nil {
klog.Errorf("azureDisk - detach disk(%s, %s) from %s failed, err: %v", diskName, diskURI, nodeName, err)
} else {
klog.V(2).Infof("azureDisk - detach disk(%s, %s) succeeded", diskName, diskURI)
}
return err
return ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM)
}
// GetDataDisks gets a list of data disks attached to the node.

View File

@ -910,8 +910,8 @@ func (f *fakeVMSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod
return fmt.Errorf("unimplemented")
}
func (f *fakeVMSet) DetachDiskByName(diskName, diskURI string, nodeName types.NodeName) error {
return fmt.Errorf("unimplemented")
func (f *fakeVMSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error) {
return nil, fmt.Errorf("unimplemented")
}
func (f *fakeVMSet) GetDataDisks(nodeName types.NodeName) ([]compute.DataDisk, error) {

View File

@ -17,6 +17,8 @@ limitations under the License.
package azure
import (
"net/http"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-10-01/compute"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2017-09-01/network"
@ -60,8 +62,8 @@ type VMSet interface {
// AttachDisk attaches a vhd to vm. The vhd must exist, can be identified by diskName, diskURI, and lun.
AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error
// DetachDiskByName detaches a vhd from host. The vhd can be identified by diskName or diskURI.
DetachDiskByName(diskName, diskURI string, nodeName types.NodeName) error
// DetachDisk detaches a vhd from host. The vhd can be identified by diskName or diskURI.
DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error)
// GetDataDisks gets a list of data disks attached to the node.
GetDataDisks(nodeName types.NodeName) ([]compute.DataDisk, error)

View File

@ -40,7 +40,6 @@ go_library(
"//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-10-01/compute:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-07-01/storage:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/keymutex:go_default_library",
"//vendor/k8s.io/utils/strings:go_default_library",
],
)

View File

@ -17,7 +17,6 @@ limitations under the License.
package azure_dd
import (
"context"
"fmt"
"os"
"path/filepath"
@ -36,7 +35,6 @@ import (
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/utils/keymutex"
)
type azureDiskDetacher struct {
@ -55,9 +53,6 @@ var _ volume.Detacher = &azureDiskDetacher{}
var _ volume.DeviceMounter = &azureDiskAttacher{}
var _ volume.DeviceUnmounter = &azureDiskDetacher{}
// acquire lock to get an lun number
var getLunMutex = keymutex.NewHashed(0)
// Attach attaches a volume.Spec to an Azure VM referenced by NodeName, returning the disk's LUN
func (a *azureDiskAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
volumeSource, _, err := getVolumeSource(spec)
@ -66,12 +61,6 @@ func (a *azureDiskAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (
return "", err
}
instanceid, err := a.cloud.InstanceID(context.TODO(), nodeName)
if err != nil {
klog.Warningf("failed to get azure instance id (%v)", err)
return "", fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err)
}
diskController, err := getDiskController(a.plugin.host)
if err != nil {
return "", err
@ -82,30 +71,22 @@ func (a *azureDiskAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (
// Log error and continue with attach
klog.Warningf(
"Error checking if volume is already attached to current node (%q). Will continue and try attach anyway. err=%v",
instanceid, err)
nodeName, err)
}
if err == nil {
// Volume is already attached to node.
klog.V(2).Infof("Attach operation is successful. volume %q is already attached to node %q at lun %d.", volumeSource.DiskName, instanceid, lun)
klog.V(2).Infof("Attach operation is successful. volume %q is already attached to node %q at lun %d.", volumeSource.DiskName, nodeName, lun)
} else {
klog.V(2).Infof("GetDiskLun returned: %v. Initiating attaching volume %q to node %q.", err, volumeSource.DataDiskURI, nodeName)
getLunMutex.LockKey(instanceid)
defer getLunMutex.UnlockKey(instanceid)
lun, err = diskController.GetNextDiskLun(nodeName)
if err != nil {
klog.Warningf("no LUN available for instance %q (%v)", nodeName, err)
return "", fmt.Errorf("all LUNs are used, cannot attach volume %q to instance %q (%v)", volumeSource.DiskName, instanceid, err)
}
klog.V(2).Infof("Trying to attach volume %q lun %d to node %q.", volumeSource.DataDiskURI, lun, nodeName)
isManagedDisk := (*volumeSource.Kind == v1.AzureManagedDisk)
err = diskController.AttachDisk(isManagedDisk, volumeSource.DiskName, volumeSource.DataDiskURI, nodeName, lun, compute.CachingTypes(*volumeSource.CachingMode))
err = diskController.AttachDisk(isManagedDisk, volumeSource.DiskName, volumeSource.DataDiskURI, nodeName, compute.CachingTypes(*volumeSource.CachingMode))
if err == nil {
klog.V(2).Infof("Attach operation successful: volume %q attached to node %q.", volumeSource.DataDiskURI, nodeName)
} else {
klog.V(2).Infof("Attach volume %q to instance %q failed with %v", volumeSource.DataDiskURI, instanceid, err)
return "", fmt.Errorf("Attach volume %q to instance %q failed with %v", volumeSource.DiskName, instanceid, err)
klog.V(2).Infof("Attach volume %q to instance %q failed with %v", volumeSource.DataDiskURI, nodeName, err)
return "", fmt.Errorf("Attach volume %q to instance %q failed with %v", volumeSource.DiskName, nodeName, err)
}
}
@ -285,23 +266,12 @@ func (d *azureDiskDetacher) Detach(diskURI string, nodeName types.NodeName) erro
return fmt.Errorf("invalid disk to detach: %q", diskURI)
}
instanceid, err := d.cloud.InstanceID(context.TODO(), nodeName)
if err != nil {
klog.Warningf("no instance id for node %q, skip detaching (%v)", nodeName, err)
return nil
}
klog.V(2).Infof("detach %v from node %q", diskURI, nodeName)
diskController, err := getDiskController(d.plugin.host)
if err != nil {
return err
}
getLunMutex.LockKey(instanceid)
defer getLunMutex.UnlockKey(instanceid)
err = diskController.DetachDiskByName("", diskURI, nodeName)
err = diskController.DetachDisk("", diskURI, nodeName)
if err != nil {
klog.Errorf("failed to detach azure disk %q, err %v", diskURI, err)
}

View File

@ -44,9 +44,9 @@ type DiskController interface {
DeleteManagedDisk(diskURI string) error
// Attaches the disk to the host machine.
AttachDisk(isManagedDisk bool, diskName, diskUri string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error
AttachDisk(isManagedDisk bool, diskName, diskUri string, nodeName types.NodeName, cachingMode compute.CachingTypes) error
// Detaches the disk, identified by disk name or uri, from the host machine.
DetachDiskByName(diskName, diskUri string, nodeName types.NodeName) error
DetachDisk(diskName, diskUri string, nodeName types.NodeName) error
// Check if a list of volumes are attached to the node with the specified NodeName
DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error)