Bulk Verify Volumes Implementation for vSphere

pull/6/head
Balu Dontu 2017-09-18 12:01:48 -07:00
parent a63e3deec3
commit 2ea619902d
11 changed files with 351 additions and 122 deletions

View File

@ -19,6 +19,7 @@ package vclib
import (
"errors"
"fmt"
"path/filepath"
"strings"
"github.com/golang/glog"
@ -142,6 +143,23 @@ func (dc *Datacenter) GetVMMoList(ctx context.Context, vmObjList []*VirtualMachi
return vmMoList, nil
}
// GetVirtualDiskPage83Data gets the virtual disk UUID by diskPath
func (dc *Datacenter) GetVirtualDiskPage83Data(ctx context.Context, diskPath string) (string, error) {
if len(diskPath) > 0 && filepath.Ext(diskPath) != ".vmdk" {
diskPath += ".vmdk"
}
vdm := object.NewVirtualDiskManager(dc.Client())
// Returns uuid of vmdk virtual disk
diskUUID, err := vdm.QueryVirtualDiskUuid(ctx, diskPath, dc.Datacenter)
if err != nil {
glog.Warningf("QueryVirtualDiskUuid failed for diskPath: %q. err: %+v", diskPath, err)
return "", err
}
diskUUID = formatVirtualDiskUUID(diskUUID)
return diskUUID, nil
}
// GetDatastoreMoList gets the Datastore Managed Objects with the given properties from the datastore objects
func (dc *Datacenter) GetDatastoreMoList(ctx context.Context, dsObjList []*Datastore, properties []string) ([]mo.Datastore, error) {
var dsMoList []mo.Datastore
@ -162,3 +180,78 @@ func (dc *Datacenter) GetDatastoreMoList(ctx context.Context, dsObjList []*Datas
}
return dsMoList, nil
}
// CheckDisksAttached checks if the disk is attached to node.
// This is done by comparing the volume path with the backing.FilePath on the VM Virtual disk devices.
func (dc *Datacenter) CheckDisksAttached(ctx context.Context, nodeVolumes map[string][]string) (map[string]map[string]bool, error) {
attached := make(map[string]map[string]bool)
var vmList []*VirtualMachine
for nodeName, volPaths := range nodeVolumes {
for _, volPath := range volPaths {
setNodeVolumeMap(attached, volPath, nodeName, false)
}
vm, err := dc.GetVMByPath(ctx, nodeName)
if err != nil {
if IsNotFound(err) {
glog.Warningf("Node %q does not exist, vSphere CP will assume disks %v are not attached to it.", nodeName, volPaths)
}
continue
}
vmList = append(vmList, vm)
}
if len(vmList) == 0 {
glog.V(2).Infof("vSphere CP will assume no disks are attached to any node.")
return attached, nil
}
vmMoList, err := dc.GetVMMoList(ctx, vmList, []string{"config.hardware.device", "name"})
if err != nil {
// When there is an error fetching instance information
// it is safer to return nil and let volume information not be touched.
glog.Errorf("Failed to get VM Managed object for nodes: %+v. err: +%v", vmList, err)
return nil, err
}
for _, vmMo := range vmMoList {
if vmMo.Config == nil {
glog.Errorf("Config is not available for VM: %q", vmMo.Name)
continue
}
for nodeName, volPaths := range nodeVolumes {
if nodeName == vmMo.Name {
verifyVolumePathsForVM(vmMo, volPaths, attached)
}
}
}
return attached, nil
}
// VerifyVolumePathsForVM verifies if the volume paths (volPaths) are attached to VM.
func verifyVolumePathsForVM(vmMo mo.VirtualMachine, volPaths []string, nodeVolumeMap map[string]map[string]bool) {
// Verify if the volume paths are present on the VM backing virtual disk devices
for _, volPath := range volPaths {
vmDevices := object.VirtualDeviceList(vmMo.Config.Hardware.Device)
for _, device := range vmDevices {
if vmDevices.TypeName(device) == "VirtualDisk" {
virtualDevice := device.GetVirtualDevice()
if backing, ok := virtualDevice.Backing.(*types.VirtualDiskFlatVer2BackingInfo); ok {
if backing.FileName == volPath {
setNodeVolumeMap(nodeVolumeMap, volPath, vmMo.Name, true)
}
}
}
}
}
}
func setNodeVolumeMap(
nodeVolumeMap map[string]map[string]bool,
volumePath string,
nodeName string,
check bool) {
volumeMap := nodeVolumeMap[nodeName]
if volumeMap == nil {
volumeMap = make(map[string]bool)
nodeVolumeMap[nodeName] = volumeMap
}
volumeMap[volumePath] = check
}

View File

@ -35,7 +35,7 @@ type virtualDiskManager struct {
// Create implements Disk's Create interface
// Contains implementation of virtualDiskManager based Provisioning
func (diskManager virtualDiskManager) Create(ctx context.Context, datastore *vclib.Datastore) (err error) {
func (diskManager virtualDiskManager) Create(ctx context.Context, datastore *vclib.Datastore) (canonicalDiskPath string, err error) {
if diskManager.volumeOptions.SCSIControllerType == "" {
diskManager.volumeOptions.SCSIControllerType = vclib.LSILogicControllerType
}
@ -57,15 +57,16 @@ func (diskManager virtualDiskManager) Create(ctx context.Context, datastore *vcl
if err != nil {
vclib.RecordvSphereMetric(vclib.APICreateVolume, requestTime, err)
glog.Errorf("Failed to create virtual disk: %s. err: %+v", diskManager.diskPath, err)
return err
return "", err
}
err = task.Wait(ctx)
taskInfo, err := task.WaitForResult(ctx, nil)
vclib.RecordvSphereMetric(vclib.APICreateVolume, requestTime, err)
if err != nil {
glog.Errorf("Failed to create virtual disk: %s. err: %+v", diskManager.diskPath, err)
return err
glog.Errorf("Failed to complete virtual disk creation: %s. err: %+v", diskManager.diskPath, err)
return "", err
}
return nil
canonicalDiskPath = taskInfo.Result.(string)
return canonicalDiskPath, nil
}
// Delete implements Disk's Delete interface

View File

@ -39,7 +39,7 @@ const (
// VirtualDiskProvider defines interfaces for creating disk
type VirtualDiskProvider interface {
Create(ctx context.Context, datastore *vclib.Datastore) error
Create(ctx context.Context, datastore *vclib.Datastore) (string, error)
Delete(ctx context.Context, datastore *vclib.Datastore) error
}
@ -60,16 +60,16 @@ func getDiskManager(disk *VirtualDisk, diskOperation string) VirtualDiskProvider
}
// Create gets appropriate disk manager and calls respective create method
func (virtualDisk *VirtualDisk) Create(ctx context.Context, datastore *vclib.Datastore) error {
func (virtualDisk *VirtualDisk) Create(ctx context.Context, datastore *vclib.Datastore) (string, error) {
if virtualDisk.VolumeOptions.DiskFormat == "" {
virtualDisk.VolumeOptions.DiskFormat = vclib.ThinDiskType
}
if !virtualDisk.VolumeOptions.VerifyVolumeOptions() {
glog.Error("VolumeOptions verification failed. volumeOptions: ", virtualDisk.VolumeOptions)
return vclib.ErrInvalidVolumeOptions
return "", vclib.ErrInvalidVolumeOptions
}
if virtualDisk.VolumeOptions.StoragePolicyID != "" && virtualDisk.VolumeOptions.StoragePolicyName != "" {
return fmt.Errorf("Storage Policy ID and Storage Policy Name both set, Please set only one parameter")
return "", fmt.Errorf("Storage Policy ID and Storage Policy Name both set, Please set only one parameter")
}
return getDiskManager(virtualDisk, VirtualDiskCreateOperation).Create(ctx, datastore)
}

View File

@ -37,33 +37,33 @@ type vmDiskManager struct {
// Create implements Disk's Create interface
// Contains implementation of VM based Provisioning to provision disk with SPBM Policy or VSANStorageProfileData
func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datastore) (err error) {
func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datastore) (canonicalDiskPath string, err error) {
if vmdisk.volumeOptions.SCSIControllerType == "" {
vmdisk.volumeOptions.SCSIControllerType = vclib.PVSCSIControllerType
}
pbmClient, err := vclib.NewPbmClient(ctx, datastore.Client())
if err != nil {
glog.Errorf("Error occurred while creating new pbmClient, err: %+v", err)
return err
return "", err
}
if vmdisk.volumeOptions.StoragePolicyID == "" && vmdisk.volumeOptions.StoragePolicyName != "" {
vmdisk.volumeOptions.StoragePolicyID, err = pbmClient.ProfileIDByName(ctx, vmdisk.volumeOptions.StoragePolicyName)
if err != nil {
glog.Errorf("Error occurred while getting Profile Id from Profile Name: %s, err: %+v", vmdisk.volumeOptions.StoragePolicyName, err)
return err
return "", err
}
}
if vmdisk.volumeOptions.StoragePolicyID != "" {
compatible, faultMessage, err := datastore.IsCompatibleWithStoragePolicy(ctx, vmdisk.volumeOptions.StoragePolicyID)
if err != nil {
glog.Errorf("Error occurred while checking datastore compatibility with storage policy id: %s, err: %+v", vmdisk.volumeOptions.StoragePolicyID, err)
return err
return "", err
}
if !compatible {
glog.Errorf("Datastore: %s is not compatible with Policy: %s", datastore.Name(), vmdisk.volumeOptions.StoragePolicyName)
return fmt.Errorf("User specified datastore is not compatible with the storagePolicy: %q. Failed with faults: %+q", vmdisk.volumeOptions.StoragePolicyName, faultMessage)
return "", fmt.Errorf("User specified datastore is not compatible with the storagePolicy: %q. Failed with faults: %+q", vmdisk.volumeOptions.StoragePolicyName, faultMessage)
}
}
@ -76,11 +76,11 @@ func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datasto
// Check Datastore type - VSANStorageProfileData is only applicable to vSAN Datastore
dsType, err := datastore.GetType(ctx)
if err != nil {
return err
return "", err
}
if dsType != vclib.VSANDatastoreType {
glog.Errorf("The specified datastore: %q is not a VSAN datastore", datastore.Name())
return fmt.Errorf("The specified datastore: %q is not a VSAN datastore."+
return "", fmt.Errorf("The specified datastore: %q is not a VSAN datastore."+
" The policy parameters will work only with VSAN Datastore."+
" So, please specify a valid VSAN datastore in Storage class definition.", datastore.Name())
}
@ -91,7 +91,7 @@ func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datasto
}
} else {
glog.Errorf("Both volumeOptions.StoragePolicyID and volumeOptions.VSANStorageProfileData are not set. One of them should be set")
return fmt.Errorf("Both volumeOptions.StoragePolicyID and volumeOptions.VSANStorageProfileData are not set. One of them should be set")
return "", fmt.Errorf("Both volumeOptions.StoragePolicyID and volumeOptions.VSANStorageProfileData are not set. One of them should be set")
}
var dummyVM *vclib.VirtualMachine
// Check if VM already exist in the folder.
@ -106,7 +106,7 @@ func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datasto
dummyVM, err = vmdisk.createDummyVM(ctx, datastore.Datacenter, dummyVMFullName)
if err != nil {
glog.Errorf("Failed to create Dummy VM. err: %v", err)
return err
return "", err
}
}
@ -115,7 +115,7 @@ func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datasto
disk, _, err := dummyVM.CreateDiskSpec(ctx, vmdisk.diskPath, datastore, vmdisk.volumeOptions)
if err != nil {
glog.Errorf("Failed to create Disk Spec. err: %v", err)
return err
return "", err
}
deviceConfigSpec := &types.VirtualDeviceConfigSpec{
Device: disk,
@ -135,7 +135,7 @@ func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datasto
glog.V(vclib.LogLevel).Info("File: %v already exists", vmdisk.diskPath)
} else {
glog.Errorf("Failed to attach the disk to VM: %q with err: %+v", dummyVMFullName, err)
return err
return "", err
}
}
// Detach the disk from the dummy VM.
@ -146,7 +146,7 @@ func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datasto
glog.V(vclib.LogLevel).Info("File: %v is already detached", vmdisk.diskPath)
} else {
glog.Errorf("Failed to detach the disk: %q from VM: %q with err: %+v", vmdisk.diskPath, dummyVMFullName, err)
return err
return "", err
}
}
// Delete the dummy VM
@ -154,7 +154,7 @@ func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datasto
if err != nil {
glog.Errorf("Failed to destroy the vm: %q with err: %+v", dummyVMFullName, err)
}
return nil
return vmdisk.diskPath, nil
}
func (vmdisk vmDiskManager) Delete(ctx context.Context, datastore *vclib.Datastore) error {

View File

@ -22,6 +22,7 @@ import (
"regexp"
"strings"
"github.com/golang/glog"
"github.com/vmware/govmomi/find"
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/vim25/types"
@ -130,3 +131,44 @@ func RemoveClusterFromVDiskPath(vDiskPath string) string {
}
return vDiskPath
}
// GetPathFromVMDiskPath retrieves the path from VM Disk Path.
// Example: For vmDiskPath - [vsanDatastore] kubevols/volume.vmdk, the path is kubevols/volume.vmdk
func GetPathFromVMDiskPath(vmDiskPath string) string {
datastorePathObj := new(object.DatastorePath)
isSuccess := datastorePathObj.FromString(vmDiskPath)
if !isSuccess {
glog.Errorf("Failed to parse vmDiskPath: %s", vmDiskPath)
return ""
}
return datastorePathObj.Path
}
// GetDatastoreFromVMDiskPath retrieves the path from VM Disk Path.
// Example: For vmDiskPath - [vsanDatastore] kubevols/volume.vmdk, the path is vsanDatastore
func GetDatastoreFromVMDiskPath(vmDiskPath string) string {
datastorePathObj := new(object.DatastorePath)
isSuccess := datastorePathObj.FromString(vmDiskPath)
if !isSuccess {
glog.Errorf("Failed to parse vmDiskPath: %s", vmDiskPath)
return ""
}
return datastorePathObj.Datastore
}
//GetDatastorePathObjFromVMDiskPath gets the datastorePathObj from VM disk path.
func GetDatastorePathObjFromVMDiskPath(vmDiskPath string) (*object.DatastorePath, error) {
datastorePathObj := new(object.DatastorePath)
isSuccess := datastorePathObj.FromString(vmDiskPath)
if !isSuccess {
glog.Errorf("Failed to parse volPath: %s", vmDiskPath)
return nil, fmt.Errorf("Failed to parse volPath: %s", vmDiskPath)
}
return datastorePathObj, nil
}
//IsValidUUID checks if the string is a valid UUID.
func IsValidUUID(uuid string) bool {
r := regexp.MustCompile("^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$")
return r.MatchString(uuid)
}

View File

@ -19,7 +19,6 @@ package vclib
import (
"context"
"fmt"
"path/filepath"
"time"
"github.com/golang/glog"
@ -46,23 +45,6 @@ func (vm *VirtualMachine) IsDiskAttached(ctx context.Context, diskPath string) (
return false, nil
}
// GetVirtualDiskPage83Data gets the virtual disk UUID by diskPath
func (vm *VirtualMachine) GetVirtualDiskPage83Data(ctx context.Context, diskPath string) (string, error) {
if len(diskPath) > 0 && filepath.Ext(diskPath) != ".vmdk" {
diskPath += ".vmdk"
}
vdm := object.NewVirtualDiskManager(vm.Client())
// Returns uuid of vmdk virtual disk
diskUUID, err := vdm.QueryVirtualDiskUuid(ctx, diskPath, vm.Datacenter.Datacenter)
if err != nil {
glog.Errorf("QueryVirtualDiskUuid failed for diskPath: %q on VM: %q. err: %+v", diskPath, vm.InventoryPath, err)
return "", ErrNoDiskUUIDFound
}
diskUUID = formatVirtualDiskUUID(diskUUID)
return diskUUID, nil
}
// DeleteVM deletes the VM.
func (vm *VirtualMachine) DeleteVM(ctx context.Context) error {
destroyTask, err := vm.Destroy(ctx)
@ -89,7 +71,7 @@ func (vm *VirtualMachine) AttachDisk(ctx context.Context, vmDiskPath string, vol
}
// If disk is already attached, return the disk UUID
if attached {
diskUUID, _ := vm.GetVirtualDiskPage83Data(ctx, vmDiskPath)
diskUUID, _ := vm.Datacenter.GetVirtualDiskPage83Data(ctx, vmDiskPath)
return diskUUID, nil
}
@ -143,7 +125,7 @@ func (vm *VirtualMachine) AttachDisk(ctx context.Context, vmDiskPath string, vol
}
// Once disk is attached, get the disk UUID.
diskUUID, err := vm.GetVirtualDiskPage83Data(ctx, vmDiskPath)
diskUUID, err := vm.Datacenter.GetVirtualDiskPage83Data(ctx, vmDiskPath)
if err != nil {
glog.Errorf("Error occurred while getting Disk Info from VM: %q. err: %v", vm.InventoryPath, err)
vm.DetachDisk(ctx, vmDiskPath)
@ -285,6 +267,25 @@ func (vm *VirtualMachine) CreateDiskSpec(ctx context.Context, diskPath string, d
return disk, newSCSIController, nil
}
// GetVirtualDiskPath gets the first available virtual disk devicePath from the VM
func (vm *VirtualMachine) GetVirtualDiskPath(ctx context.Context) (string, error) {
vmDevices, err := vm.Device(ctx)
if err != nil {
glog.Errorf("Failed to get the devices for VM: %q. err: %+v", vm.InventoryPath, err)
return "", err
}
// filter vm devices to retrieve device for the given vmdk file identified by disk path
for _, device := range vmDevices {
if vmDevices.TypeName(device) == "VirtualDisk" {
virtualDevice := device.GetVirtualDevice()
if backing, ok := virtualDevice.Backing.(*types.VirtualDiskFlatVer2BackingInfo); ok {
return backing.FileName, nil
}
}
}
return "", nil
}
// createAndAttachSCSIController creates and attachs the SCSI controller to the VM.
func (vm *VirtualMachine) createAndAttachSCSIController(ctx context.Context, diskControllerType string) (types.BaseVirtualDevice, error) {
// Get VM device list
@ -322,24 +323,17 @@ func (vm *VirtualMachine) createAndAttachSCSIController(ctx context.Context, dis
// getVirtualDeviceByPath gets the virtual device by path
func (vm *VirtualMachine) getVirtualDeviceByPath(ctx context.Context, diskPath string) (types.BaseVirtualDevice, error) {
var diskUUID string
vmDevices, err := vm.Device(ctx)
if err != nil {
glog.Errorf("Failed to get the devices for VM: %q. err: %+v", vm.InventoryPath, err)
return nil, err
}
volumeUUID, err := vm.GetVirtualDiskPage83Data(ctx, diskPath)
if err != nil {
glog.Errorf("Failed to get disk UUID for path: %q on VM: %q. err: %+v", diskPath, vm.InventoryPath, err)
return nil, err
}
// filter vm devices to retrieve device for the given vmdk file identified by disk path
for _, device := range vmDevices {
if vmDevices.TypeName(device) == "VirtualDisk" {
virtualDevice := device.GetVirtualDevice()
if backing, ok := virtualDevice.Backing.(*types.VirtualDiskFlatVer2BackingInfo); ok {
diskUUID = formatVirtualDiskUUID(backing.Uuid)
if diskUUID == volumeUUID {
if backing.FileName == diskPath {
return device, nil
}
}

View File

@ -56,6 +56,7 @@ const (
)
var cleanUpRoutineInitialized = false
var datastoreFolderIDMap = make(map[string]map[string]string)
var clientLock sync.Mutex
var cleanUpRoutineInitLock sync.Mutex
@ -127,7 +128,7 @@ type Volumes interface {
// DisksAreAttached checks if a list disks are attached to the given node.
// Assumption: If node doesn't exist, disks are not attached to the node.
DisksAreAttached(volPath []string, nodeName k8stypes.NodeName) (map[string]bool, error)
DisksAreAttached(nodeVolumes map[k8stypes.NodeName][]string) (map[k8stypes.NodeName]map[string]bool, error)
// CreateVolume creates a new vmdk with specified parameters.
CreateVolume(volumeOptions *vclib.VolumeOptions) (volumePath string, err error)
@ -570,19 +571,12 @@ func (vs *VSphere) DiskIsAttached(volPath string, nodeName k8stypes.NodeName) (b
}
// DisksAreAttached returns if disks are attached to the VM using controllers supported by the plugin.
func (vs *VSphere) DisksAreAttached(volPaths []string, nodeName k8stypes.NodeName) (map[string]bool, error) {
disksAreAttachedInternal := func(volPaths []string, nodeName k8stypes.NodeName) (map[string]bool, error) {
attached := make(map[string]bool)
if len(volPaths) == 0 {
func (vs *VSphere) DisksAreAttached(nodeVolumes map[k8stypes.NodeName][]string) (map[k8stypes.NodeName]map[string]bool, error) {
disksAreAttachedInternal := func(nodeVolumes map[k8stypes.NodeName][]string) (map[k8stypes.NodeName]map[string]bool, error) {
attached := make(map[k8stypes.NodeName]map[string]bool)
if len(nodeVolumes) == 0 {
return attached, nil
}
var vSphereInstance string
if nodeName == "" {
vSphereInstance = vs.localInstanceID
nodeName = vmNameToNodeName(vSphereInstance)
} else {
vSphereInstance = nodeNameToVMName(nodeName)
}
// Create context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -591,41 +585,40 @@ func (vs *VSphere) DisksAreAttached(volPaths []string, nodeName k8stypes.NodeNam
if err != nil {
return nil, err
}
vm, err := vs.getVMByName(ctx, nodeName)
dc, err := vclib.GetDatacenter(ctx, vs.conn, vs.cfg.Global.Datacenter)
if err != nil {
if vclib.IsNotFound(err) {
glog.Warningf("Node %q does not exist, vsphere CP will assume all disks %v are not attached to it.", nodeName, volPaths)
// make all the disks as detached and return false without error.
attached := make(map[string]bool)
for _, volPath := range volPaths {
attached[volPath] = false
}
return attached, nil
}
glog.Errorf("Failed to get VM object for node: %q. err: +%v", vSphereInstance, err)
return nil, err
}
for _, volPath := range volPaths {
result, err := vm.IsDiskAttached(ctx, volPath)
if err == nil {
if result {
attached[volPath] = true
} else {
attached[volPath] = false
vmVolumes := make(map[string][]string)
for nodeName, volPaths := range nodeVolumes {
for i, volPath := range volPaths {
// Get the canonical volume path for volPath.
canonicalVolumePath, err := getcanonicalVolumePath(ctx, dc, volPath)
if err != nil {
glog.Errorf("Failed to get canonical vsphere volume path for volume: %s. err: %+v", volPath, err)
return nil, err
}
} else {
glog.Errorf("DisksAreAttached failed to determine whether disk %q from volPaths %+v is still attached on node %q",
volPath,
volPaths,
vSphereInstance)
return nil, err
// Check if the volume path contains .vmdk extension. If not, add the extension and update the nodeVolumes Map
if len(canonicalVolumePath) > 0 && filepath.Ext(canonicalVolumePath) != ".vmdk" {
canonicalVolumePath += ".vmdk"
}
volPaths[i] = canonicalVolumePath
}
vmVolumes[nodeNameToVMName(nodeName)] = volPaths
}
// Check if the disks are attached to their respective nodes
disksAttachedList, err := dc.CheckDisksAttached(ctx, vmVolumes)
if err != nil {
return nil, err
}
for vmName, volPaths := range disksAttachedList {
attached[vmNameToNodeName(vmName)] = volPaths
}
return attached, nil
}
requestTime := time.Now()
attached, err := disksAreAttachedInternal(volPaths, nodeName)
attached, err := disksAreAttachedInternal(nodeVolumes)
vclib.RecordvSphereMetric(vclib.OperationDisksAreAttached, requestTime, err)
return attached, err
}
@ -634,9 +627,9 @@ func (vs *VSphere) DisksAreAttached(volPaths []string, nodeName k8stypes.NodeNam
// If the volumeOptions.Datastore is part of datastore cluster for example - [DatastoreCluster/sharedVmfs-0] then
// return value will be [DatastoreCluster/sharedVmfs-0] kubevols/<volume-name>.vmdk
// else return value will be [sharedVmfs-0] kubevols/<volume-name>.vmdk
func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (volumePath string, err error) {
func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (canonicalVolumePath string, err error) {
glog.V(1).Infof("Starting to create a vSphere volume with volumeOptions: %+v", volumeOptions)
createVolumeInternal := func(volumeOptions *vclib.VolumeOptions) (volumePath string, err error) {
createVolumeInternal := func(volumeOptions *vclib.VolumeOptions) (canonicalVolumePath string, err error) {
var datastore string
// Default datastore is the datastore in the vSphere config file that is used to initialize vSphere cloud provider.
if volumeOptions.Datastore == "" {
@ -644,6 +637,7 @@ func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (volumePath
} else {
datastore = volumeOptions.Datastore
}
datastore = strings.TrimSpace(datastore)
// Create context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -694,27 +688,34 @@ func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (volumePath
glog.Errorf("Cannot create dir %#v. err %s", kubeVolsPath, err)
return "", err
}
volumePath = kubeVolsPath + volumeOptions.Name + ".vmdk"
volumePath := kubeVolsPath + volumeOptions.Name + ".vmdk"
disk := diskmanagers.VirtualDisk{
DiskPath: volumePath,
VolumeOptions: volumeOptions,
VMOptions: vmOptions,
}
err = disk.Create(ctx, ds)
volumePath, err = disk.Create(ctx, ds)
if err != nil {
glog.Errorf("Failed to create a vsphere volume with volumeOptions: %+v on datastore: %s. err: %+v", volumeOptions, datastore, err)
return "", err
}
// Get the canonical path for the volume path.
canonicalVolumePath, err = getcanonicalVolumePath(ctx, dc, volumePath)
if err != nil {
glog.Errorf("Failed to get canonical vsphere volume path for volume: %s with volumeOptions: %+v on datastore: %s. err: %+v", volumePath, volumeOptions, datastore, err)
return "", err
}
if filepath.Base(datastore) != datastore {
// If datastore is within cluster, add cluster path to the volumePath
volumePath = strings.Replace(volumePath, filepath.Base(datastore), datastore, 1)
canonicalVolumePath = strings.Replace(canonicalVolumePath, filepath.Base(datastore), datastore, 1)
}
return volumePath, nil
return canonicalVolumePath, nil
}
requestTime := time.Now()
volumePath, err = createVolumeInternal(volumeOptions)
canonicalVolumePath, err = createVolumeInternal(volumeOptions)
vclib.RecordCreateVolumeMetric(volumeOptions, requestTime, err)
return volumePath, err
glog.V(1).Infof("The canonical volume path for the newly created vSphere volume is %q", canonicalVolumePath)
return canonicalVolumePath, err
}
// DeleteVolume deletes a volume given volume name.

View File

@ -21,6 +21,7 @@ import (
"errors"
"io/ioutil"
"os"
"regexp"
"runtime"
"strings"
"time"
@ -42,6 +43,7 @@ const (
DatastoreInfoProperty = "info"
Folder = "Folder"
VirtualMachine = "VirtualMachine"
DummyDiskName = "kube-dummyDisk.vmdk"
)
// GetVSphere reads vSphere configuration from system environment and construct vSphere object
@ -293,3 +295,61 @@ func (vs *VSphere) cleanUpDummyVMs(dummyVMPrefix string) {
}
}
}
// Get canonical volume path for volume Path.
// Example1: The canonical path for volume path - [vsanDatastore] kubevols/volume.vmdk will be [vsanDatastore] 25d8b159-948c-4b73-e499-02001ad1b044/volume.vmdk
// Example2: The canonical path for volume path - [vsanDatastore] 25d8b159-948c-4b73-e499-02001ad1b044/volume.vmdk will be same as volume Path.
func getcanonicalVolumePath(ctx context.Context, dc *vclib.Datacenter, volumePath string) (string, error) {
var folderID string
var folderExists bool
canonicalVolumePath := volumePath
dsPathObj, err := vclib.GetDatastorePathObjFromVMDiskPath(volumePath)
if err != nil {
return "", err
}
dsPath := strings.Split(strings.TrimSpace(dsPathObj.Path), "/")
if len(dsPath) <= 1 {
return canonicalVolumePath, nil
}
datastore := dsPathObj.Datastore
dsFolder := dsPath[0]
folderNameIDMap, datastoreExists := datastoreFolderIDMap[datastore]
if datastoreExists {
folderID, folderExists = folderNameIDMap[dsFolder]
}
// Get the datastore folder ID if datastore or folder doesn't exist in datastoreFolderIDMap
if !datastoreExists || !folderExists {
if !vclib.IsValidUUID(dsFolder) {
dummyDiskVolPath := "[" + datastore + "] " + dsFolder + "/" + DummyDiskName
// Querying a non-existent dummy disk on the datastore folder.
// It would fail and return an folder ID in the error message.
_, err := dc.GetVirtualDiskPage83Data(ctx, dummyDiskVolPath)
if err != nil {
re := regexp.MustCompile("File (.*?) was not found")
match := re.FindStringSubmatch(err.Error())
canonicalVolumePath = match[1]
}
}
diskPath := vclib.GetPathFromVMDiskPath(canonicalVolumePath)
if diskPath == "" {
return "", fmt.Errorf("Failed to parse canonicalVolumePath: %s in getcanonicalVolumePath method", canonicalVolumePath)
}
folderID = strings.Split(strings.TrimSpace(diskPath), "/")[0]
setdatastoreFolderIDMap(datastoreFolderIDMap, datastore, dsFolder, folderID)
}
canonicalVolumePath = strings.Replace(volumePath, dsFolder, folderID, 1)
return canonicalVolumePath, nil
}
func setdatastoreFolderIDMap(
datastoreFolderIDMap map[string]map[string]string,
datastore string,
folderName string,
folderID string) {
folderNameIDMap := datastoreFolderIDMap[datastore]
if folderNameIDMap == nil {
folderNameIDMap = make(map[string]string)
datastoreFolderIDMap[datastore] = folderNameIDMap
}
folderNameIDMap[folderName] = folderID
}

View File

@ -86,34 +86,57 @@ func (attacher *vsphereVMDKAttacher) Attach(spec *volume.Spec, nodeName types.No
}
func (attacher *vsphereVMDKAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[*volume.Spec]bool)
volumeSpecMap := make(map[string]*volume.Spec)
volumePathList := []string{}
for _, spec := range specs {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
continue
}
volumePathList = append(volumePathList, volumeSource.VolumePath)
volumeSpecMap[volumeSource.VolumePath] = spec
glog.Warningf("Attacher.VolumesAreAttached called for node %q - Please use BulkVerifyVolumes for vSphere", nodeName)
volumeNodeMap := map[types.NodeName][]*volume.Spec{
nodeName: specs,
}
attachedResult, err := attacher.vsphereVolumes.DisksAreAttached(volumePathList, nodeName)
nodeVolumesResult := make(map[*volume.Spec]bool)
nodesVerificationMap, err := attacher.BulkVerifyVolumes(volumeNodeMap)
if err != nil {
glog.Errorf(
"Error checking if volumes (%v) are attached to current node (%q). err=%v",
volumePathList, nodeName, err)
return nil, err
glog.Errorf("Attacher.VolumesAreAttached - error checking volumes for node %q with %v", nodeName, err)
return nodeVolumesResult, err
}
if result, ok := nodesVerificationMap[nodeName]; ok {
return result, nil
}
return nodeVolumesResult, nil
}
func (attacher *vsphereVMDKAttacher) BulkVerifyVolumes(volumesByNode map[types.NodeName][]*volume.Spec) (map[types.NodeName]map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[types.NodeName]map[*volume.Spec]bool)
volumePathsByNode := make(map[types.NodeName][]string)
volumeSpecMap := make(map[string]*volume.Spec)
for nodeName, volumeSpecs := range volumesByNode {
for _, volumeSpec := range volumeSpecs {
volumeSource, _, err := getVolumeSource(volumeSpec)
if err != nil {
glog.Errorf("Error getting volume (%q) source : %v", volumeSpec.Name(), err)
continue
}
volPath := volumeSource.VolumePath
volumePathsByNode[nodeName] = append(volumePathsByNode[nodeName], volPath)
nodeVolume, nodeVolumeExists := volumesAttachedCheck[nodeName]
if !nodeVolumeExists {
nodeVolume = make(map[*volume.Spec]bool)
}
nodeVolume[volumeSpec] = true
volumeSpecMap[volPath] = volumeSpec
volumesAttachedCheck[nodeName] = nodeVolume
}
}
attachedResult, err := attacher.vsphereVolumes.DisksAreAttached(volumePathsByNode)
if err != nil {
glog.Errorf("Error checking if volumes are attached to nodes: %+v. err: %v", volumePathsByNode, err)
return volumesAttachedCheck, err
}
for volumePath, attached := range attachedResult {
spec := volumeSpecMap[volumePath]
if !attached {
volumesAttachedCheck[spec] = false
glog.V(2).Infof("VolumesAreAttached: volume %q (specName: %q) is no longer attached", volumePath, spec.Name())
} else {
volumesAttachedCheck[spec] = true
glog.V(2).Infof("VolumesAreAttached: volume %q (specName: %q) is attached", volumePath, spec.Name())
for nodeName, nodeVolumes := range attachedResult {
for volumePath, attached := range nodeVolumes {
if !attached {
spec := volumeSpecMap[volumePath]
setNodeVolume(volumesAttachedCheck, spec, nodeName, false)
}
}
}
return volumesAttachedCheck, nil
@ -257,3 +280,17 @@ func (detacher *vsphereVMDKDetacher) Detach(deviceMountPath string, nodeName typ
func (detacher *vsphereVMDKDetacher) UnmountDevice(deviceMountPath string) error {
return volumeutil.UnmountPath(deviceMountPath, detacher.mounter)
}
func setNodeVolume(
nodeVolumeMap map[types.NodeName]map[*volume.Spec]bool,
volumeSpec *volume.Spec,
nodeName types.NodeName,
check bool) {
volumeMap := nodeVolumeMap[nodeName]
if volumeMap == nil {
volumeMap = make(map[*volume.Spec]bool)
nodeVolumeMap[nodeName] = volumeMap
}
volumeMap[volumeSpec] = check
}

View File

@ -21,6 +21,7 @@ import (
"testing"
"k8s.io/api/core/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
@ -308,7 +309,7 @@ func (testcase *testcase) DiskIsAttached(diskName string, nodeName types.NodeNam
return expected.isAttached, expected.ret
}
func (testcase *testcase) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
func (testcase *testcase) DisksAreAttached(nodeVolumes map[k8stypes.NodeName][]string) (map[k8stypes.NodeName]map[string]bool, error) {
return nil, errors.New("Not implemented")
}

View File

@ -85,7 +85,7 @@ func (plugin *vsphereVolumePlugin) SupportsMountOption() bool {
}
func (plugin *vsphereVolumePlugin) SupportsBulkVolumeVerification() bool {
return false
return true
}
func (plugin *vsphereVolumePlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {