Updating vSphere Cloud Provider (VCP) to support k8s cluster spead across multiple ESXi clusters, datacenters or even vSphere vCenters

- vsphere.conf (cloud-config) is now needed only on master node
   - VCP uses OS hostname and not vSphere inventory name
   - VCP is now resilient to VM inventory name change and VM migration
pull/6/head
rohitjogvmw 2017-11-15 22:24:23 -08:00
parent 3ec7487c0f
commit 79e1da68d2
33 changed files with 1412 additions and 386 deletions

View File

@ -9,6 +9,7 @@ load(
go_library(
name = "go_default_library",
srcs = [
"nodemanager.go",
"vsphere.go",
"vsphere_util.go",
],
@ -21,13 +22,15 @@ go_library(
"//pkg/controller:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/vmware/govmomi:go_default_library",
"//vendor/github.com/vmware/govmomi/object:go_default_library",
"//vendor/github.com/vmware/govmomi/vim25:go_default_library",
"//vendor/github.com/vmware/govmomi/vim25/mo:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/gopkg.in/gcfg.v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)

View File

@ -0,0 +1,295 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vsphere
import (
"fmt"
"github.com/golang/glog"
"golang.org/x/net/context"
"k8s.io/api/core/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib"
"strings"
"sync"
)
// Stores info about the kubernetes node
type NodeInfo struct {
dataCenter *vclib.Datacenter
vm *vclib.VirtualMachine
vcServer string
}
type NodeManager struct {
// TODO: replace map with concurrent map when k8s supports go v1.9
// Maps the VC server to VSphereInstance
vsphereInstanceMap map[string]*VSphereInstance
// Maps node name to node info.
nodeInfoMap map[string]*NodeInfo
// Maps node name to node structure
registeredNodes map[string]*v1.Node
// Mutexes
registeredNodesLock sync.RWMutex
nodeInfoLock sync.RWMutex
}
type NodeDetails struct {
NodeName string
vm *vclib.VirtualMachine
}
// TODO: Make it configurable in vsphere.conf
const (
POOL_SIZE = 8
QUEUE_SIZE = POOL_SIZE * 10
)
func (nm *NodeManager) DiscoverNode(node *v1.Node) error {
type VmSearch struct {
vc string
datacenter *vclib.Datacenter
}
var mutex = &sync.Mutex{}
var globalErrMutex = &sync.Mutex{}
var queueChannel chan *VmSearch
var wg sync.WaitGroup
var globalErr *error
queueChannel = make(chan *VmSearch, QUEUE_SIZE)
nodeUUID := node.Status.NodeInfo.SystemUUID
vmFound := false
globalErr = nil
setGlobalErr := func(err error) {
globalErrMutex.Lock()
globalErr = &err
globalErrMutex.Unlock()
}
setVMFound := func(found bool) {
mutex.Lock()
vmFound = found
mutex.Unlock()
}
getVMFound := func() bool {
mutex.Lock()
found := vmFound
mutex.Unlock()
return found
}
go func() {
var datacenterObjs []*vclib.Datacenter
for vc, vsi := range nm.vsphereInstanceMap {
found := getVMFound()
if found == true {
break
}
// Create context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := vsi.conn.Connect(ctx)
if err != nil {
glog.V(4).Info("Discovering node error vc:", err)
setGlobalErr(err)
continue
}
if vsi.cfg.Datacenters == "" {
datacenterObjs, err = vclib.GetAllDatacenter(ctx, vsi.conn)
if err != nil {
glog.V(4).Info("Discovering node error dc:", err)
setGlobalErr(err)
continue
}
} else {
datacenters := strings.Split(vsi.cfg.Datacenters, ",")
for _, dc := range datacenters {
dc = strings.TrimSpace(dc)
if dc == "" {
continue
}
datacenterObj, err := vclib.GetDatacenter(ctx, vsi.conn, dc)
if err != nil {
glog.V(4).Info("Discovering node error dc:", err)
setGlobalErr(err)
continue
}
datacenterObjs = append(datacenterObjs, datacenterObj)
}
}
for _, datacenterObj := range datacenterObjs {
found := getVMFound()
if found == true {
break
}
glog.V(4).Infof("Finding node %s in vc=%s and datacenter=%s", node.Name, vc, datacenterObj.Name())
queueChannel <- &VmSearch{
vc: vc,
datacenter: datacenterObj,
}
}
}
close(queueChannel)
}()
for i := 0; i < POOL_SIZE; i++ {
go func() {
for res := range queueChannel {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vm, err := res.datacenter.GetVMByUUID(ctx, nodeUUID)
if err != nil {
glog.V(4).Infof("Error %q while looking for vm=%+v in vc=%s and datacenter=%s",
err, node.Name, vm, res.vc, res.datacenter.Name())
if err != vclib.ErrNoVMFound {
setGlobalErr(err)
} else {
glog.V(4).Infof("Did not find node %s in vc=%s and datacenter=%s",
node.Name, res.vc, res.datacenter.Name(), err)
}
continue
}
if vm != nil {
glog.V(4).Infof("Found node %s as vm=%+v in vc=%s and datacenter=%s",
node.Name, vm, res.vc, res.datacenter.Name())
nodeInfo := &NodeInfo{dataCenter: res.datacenter, vm: vm, vcServer: res.vc}
nm.addNodeInfo(node.ObjectMeta.Name, nodeInfo)
for range queueChannel {
}
setVMFound(true)
break
}
}
wg.Done()
}()
wg.Add(1)
}
wg.Wait()
if vmFound {
return nil
}
if globalErr != nil {
return *globalErr
}
glog.V(4).Infof("Discovery Node: %q vm not found", node.Name)
return vclib.ErrNoVMFound
}
func (nm *NodeManager) RegisterNode(node *v1.Node) error {
nm.addNode(node)
nm.DiscoverNode(node)
return nil
}
func (nm *NodeManager) UnRegisterNode(node *v1.Node) error {
nm.removeNode(node)
return nil
}
func (nm *NodeManager) RediscoverNode(nodeName k8stypes.NodeName) error {
node, err := nm.GetNode(nodeName)
if err != nil {
return err
}
return nm.DiscoverNode(&node)
}
func (nm *NodeManager) GetNode(nodeName k8stypes.NodeName) (v1.Node, error) {
nm.registeredNodesLock.RLock()
node := nm.registeredNodes[convertToString(nodeName)]
nm.registeredNodesLock.RUnlock()
if node == nil {
return v1.Node{}, vclib.ErrNoVMFound
}
return *node, nil
}
func (nm *NodeManager) addNode(node *v1.Node) {
nm.registeredNodesLock.Lock()
nm.registeredNodes[node.ObjectMeta.Name] = node
nm.registeredNodesLock.Unlock()
}
func (nm *NodeManager) removeNode(node *v1.Node) {
nm.registeredNodesLock.Lock()
delete(nm.registeredNodes, node.ObjectMeta.Name)
nm.registeredNodesLock.Unlock()
}
// GetNodeInfo returns a NodeInfo which datacenter, vm and vc server ip address.
// This method returns an error if it is unable find node VCs and DCs listed in vSphere.conf
// NodeInfo returned may not be updated to reflect current VM location.
func (nm *NodeManager) GetNodeInfo(nodeName k8stypes.NodeName) (NodeInfo, error) {
getNodeInfo := func(nodeName k8stypes.NodeName) *NodeInfo {
nm.nodeInfoLock.RLock()
nodeInfo := nm.nodeInfoMap[convertToString(nodeName)]
nm.nodeInfoLock.RUnlock()
return nodeInfo
}
nodeInfo := getNodeInfo(nodeName)
if nodeInfo == nil {
err := nm.RediscoverNode(nodeName)
if err != nil {
glog.V(4).Infof("error %q node info for node %q not found", err, convertToString(nodeName))
return NodeInfo{}, err
}
nodeInfo = getNodeInfo(nodeName)
}
return *nodeInfo, nil
}
func (nm *NodeManager) GetNodeDetails() []NodeDetails {
nm.nodeInfoLock.RLock()
defer nm.nodeInfoLock.RUnlock()
var nodeDetails []NodeDetails
for nodeName, nodeInfo := range nm.nodeInfoMap {
nodeDetails = append(nodeDetails, NodeDetails{nodeName, nodeInfo.vm})
}
return nodeDetails
}
func (nm *NodeManager) addNodeInfo(nodeName string, nodeInfo *NodeInfo) {
nm.nodeInfoLock.Lock()
nm.nodeInfoMap[nodeName] = nodeInfo
nm.nodeInfoLock.Unlock()
}
func (nm *NodeManager) GetVSphereInstance(nodeName k8stypes.NodeName) (VSphereInstance, error) {
nodeInfo, err := nm.GetNodeInfo(nodeName)
if err != nil {
glog.V(4).Infof("node info for node %q not found", convertToString(nodeName))
return VSphereInstance{}, err
}
vsphereInstance := nm.vsphereInstanceMap[nodeInfo.vcServer]
if vsphereInstance == nil {
return VSphereInstance{}, fmt.Errorf("vSphereInstance for vc server %q not found while looking for node %q", nodeInfo.vcServer, convertToString(nodeName))
}
return *vsphereInstance, nil
}

View File

@ -25,6 +25,7 @@ const (
NoDevicesFoundErrMsg = "No devices found"
DiskNotFoundErrMsg = "No vSphere disk ID found"
InvalidVolumeOptionsErrMsg = "VolumeOptions verification failed"
NoVMFoundErrMsg = "No VM found"
)
// Error constants
@ -34,4 +35,5 @@ var (
ErrNoDevicesFound = errors.New(NoDevicesFoundErrMsg)
ErrNoDiskIDFound = errors.New(DiskNotFoundErrMsg)
ErrInvalidVolumeOptions = errors.New(InvalidVolumeOptionsErrMsg)
ErrNoVMFound = errors.New(NoVMFoundErrMsg)
)

View File

@ -49,6 +49,22 @@ func GetDatacenter(ctx context.Context, connection *VSphereConnection, datacente
return &dc, nil
}
// GetAllDatacenter returns all the DataCenter Objects
func GetAllDatacenter(ctx context.Context, connection *VSphereConnection) ([]*Datacenter, error) {
var dc []*Datacenter
finder := find.NewFinder(connection.GoVmomiClient.Client, true)
datacenters, err := finder.DatacenterList(ctx, "*")
if err != nil {
glog.Errorf("Failed to find the datacenter. err: %+v", err)
return nil, err
}
for _, datacenter := range datacenters {
dc = append(dc, &(Datacenter{datacenter}))
}
return dc, nil
}
// GetVMByUUID gets the VM object from the given vmUUID
func (dc *Datacenter) GetVMByUUID(ctx context.Context, vmUUID string) (*VirtualMachine, error) {
s := object.NewSearchIndex(dc.Client())
@ -60,7 +76,7 @@ func (dc *Datacenter) GetVMByUUID(ctx context.Context, vmUUID string) (*VirtualM
}
if svm == nil {
glog.Errorf("Unable to find VM by UUID. VM UUID: %s", vmUUID)
return nil, fmt.Errorf("Failed to find VM by UUID: %s", vmUUID)
return nil, ErrNoVMFound
}
virtualMachine := VirtualMachine{object.NewVirtualMachine(dc.Client(), svm.Reference()), dc}
return &virtualMachine, nil
@ -79,6 +95,41 @@ func (dc *Datacenter) GetVMByPath(ctx context.Context, vmPath string) (*VirtualM
return &virtualMachine, nil
}
// GetAllDatastores gets the datastore URL to DatastoreInfo map for all the datastores in
// the datacenter.
func (dc *Datacenter) GetAllDatastores(ctx context.Context) (map[string]*DatastoreInfo, error) {
finder := getFinder(dc)
datastores, err := finder.DatastoreList(ctx, "*")
if err != nil {
glog.Errorf("Failed to get all the datastores. err: %+v", err)
return nil, err
}
var dsList []types.ManagedObjectReference
for _, ds := range datastores {
dsList = append(dsList, ds.Reference())
}
var dsMoList []mo.Datastore
pc := property.DefaultCollector(dc.Client())
properties := []string{DatastoreInfoProperty}
err = pc.Retrieve(ctx, dsList, properties, &dsMoList)
if err != nil {
glog.Errorf("Failed to get Datastore managed objects from datastore objects."+
" dsObjList: %+v, properties: %+v, err: %v", dsList, properties, err)
return nil, err
}
dsURLInfoMap := make(map[string]*DatastoreInfo)
for _, dsMo := range dsMoList {
dsURLInfoMap[dsMo.Info.GetDatastoreInfo().Url] = &DatastoreInfo{
&Datastore{object.NewDatastore(dc.Client(), dsMo.Reference()),
dc},
dsMo.Info.GetDatastoreInfo()}
}
glog.V(9).Infof("dsURLInfoMap : %+v", dsURLInfoMap)
return dsURLInfoMap, nil
}
// GetDatastoreByPath gets the Datastore object from the given vmDiskPath
func (dc *Datacenter) GetDatastoreByPath(ctx context.Context, vmDiskPath string) (*Datastore, error) {
datastorePathObj := new(object.DatastorePath)
@ -109,6 +160,23 @@ func (dc *Datacenter) GetDatastoreByName(ctx context.Context, name string) (*Dat
return &datastore, nil
}
// GetResourcePool gets the resource pool for the given path
func (dc *Datacenter) GetResourcePool(ctx context.Context, computePath string) (*object.ResourcePool, error) {
finder := getFinder(dc)
var computeResource *object.ComputeResource
var err error
if computePath == "" {
computeResource, err = finder.DefaultComputeResource(ctx)
} else {
computeResource, err = finder.ComputeResource(ctx, computePath)
}
if err != nil {
glog.Errorf("Failed to get the ResourcePool for computePath '%s'. err: %+v", computePath, err)
return nil, err
}
return computeResource.ResourcePool(ctx)
}
// GetFolderByPath gets the Folder Object from the given folder path
// folderPath should be the full path to folder
func (dc *Datacenter) GetFolderByPath(ctx context.Context, folderPath string) (*Folder, error) {

View File

@ -17,6 +17,7 @@ limitations under the License.
package vclib
import (
"fmt"
"github.com/golang/glog"
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/property"
@ -32,6 +33,16 @@ type Datastore struct {
Datacenter *Datacenter
}
// DatastoreInfo is a structure to store the Datastore and it's Info.
type DatastoreInfo struct {
*Datastore
Info *types.DatastoreInfo
}
func (di DatastoreInfo) String() string {
return fmt.Sprintf("Datastore: %+v, datastore URL: %s", di.Datastore, di.Info.Url)
}
// CreateDirectory creates the directory at location specified by directoryPath.
// If the intermediate level folders do not exist, and the parameter createParents is true, all the non-existent folders are created.
// directoryPath must be in the format "[vsanDatastore] kubevols"

View File

@ -70,13 +70,13 @@ func (diskManager virtualDiskManager) Create(ctx context.Context, datastore *vcl
}
// Delete implements Disk's Delete interface
func (diskManager virtualDiskManager) Delete(ctx context.Context, datastore *vclib.Datastore) error {
func (diskManager virtualDiskManager) Delete(ctx context.Context, datacenter *vclib.Datacenter) error {
// Create a virtual disk manager
virtualDiskManager := object.NewVirtualDiskManager(datastore.Client())
diskPath := vclib.RemoveClusterFromVDiskPath(diskManager.diskPath)
virtualDiskManager := object.NewVirtualDiskManager(datacenter.Client())
diskPath := vclib.RemoveStorageClusterORFolderNameFromVDiskPath(diskManager.diskPath)
requestTime := time.Now()
// Delete virtual disk
task, err := virtualDiskManager.DeleteVirtualDisk(ctx, diskPath, datastore.Datacenter.Datacenter)
task, err := virtualDiskManager.DeleteVirtualDisk(ctx, diskPath, datacenter.Datacenter)
if err != nil {
glog.Errorf("Failed to delete virtual disk. err: %v", err)
vclib.RecordvSphereMetric(vclib.APIDeleteVolume, requestTime, err)

View File

@ -40,7 +40,7 @@ const (
// VirtualDiskProvider defines interfaces for creating disk
type VirtualDiskProvider interface {
Create(ctx context.Context, datastore *vclib.Datastore) (string, error)
Delete(ctx context.Context, datastore *vclib.Datastore) error
Delete(ctx context.Context, datacenter *vclib.Datacenter) error
}
// getDiskManager returns vmDiskManager or vdmDiskManager based on given volumeoptions
@ -75,6 +75,6 @@ func (virtualDisk *VirtualDisk) Create(ctx context.Context, datastore *vclib.Dat
}
// Delete gets appropriate disk manager and calls respective delete method
func (virtualDisk *VirtualDisk) Delete(ctx context.Context, datastore *vclib.Datastore) error {
return getDiskManager(virtualDisk, VirtualDiskDeleteOperation).Delete(ctx, datastore)
func (virtualDisk *VirtualDisk) Delete(ctx context.Context, datacenter *vclib.Datacenter) error {
return getDiskManager(virtualDisk, VirtualDiskDeleteOperation).Delete(ctx, datacenter)
}

View File

@ -157,7 +157,7 @@ func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datasto
return vmdisk.diskPath, nil
}
func (vmdisk vmDiskManager) Delete(ctx context.Context, datastore *vclib.Datastore) error {
func (vmdisk vmDiskManager) Delete(ctx context.Context, datacenter *vclib.Datacenter) error {
return fmt.Errorf("vmDiskManager.Delete is not supported")
}

View File

@ -85,7 +85,7 @@ func (pbmClient *PbmClient) IsDatastoreCompatible(ctx context.Context, storagePo
// GetCompatibleDatastores filters and returns compatible list of datastores for given storage policy id
// For Non Compatible Datastores, fault message with the Datastore Name is also returned
func (pbmClient *PbmClient) GetCompatibleDatastores(ctx context.Context, storagePolicyID string, datastores []*Datastore) ([]*Datastore, string, error) {
func (pbmClient *PbmClient) GetCompatibleDatastores(ctx context.Context, dc *Datacenter, storagePolicyID string, datastores []*DatastoreInfo) ([]*DatastoreInfo, string, error) {
var (
dsMorNameMap = getDsMorNameMap(ctx, datastores)
localizedMessagesForNotCompatibleDatastores = ""
@ -96,7 +96,7 @@ func (pbmClient *PbmClient) GetCompatibleDatastores(ctx context.Context, storage
return nil, "", err
}
compatibleHubs := compatibilityResult.CompatibleDatastores()
var compatibleDatastoreList []*Datastore
var compatibleDatastoreList []*DatastoreInfo
for _, hub := range compatibleHubs {
compatibleDatastoreList = append(compatibleDatastoreList, getDatastoreFromPlacementHub(datastores, hub))
}
@ -121,7 +121,7 @@ func (pbmClient *PbmClient) GetCompatibleDatastores(ctx context.Context, storage
}
// GetPlacementCompatibilityResult gets placement compatibility result based on storage policy requirements.
func (pbmClient *PbmClient) GetPlacementCompatibilityResult(ctx context.Context, storagePolicyID string, datastore []*Datastore) (pbm.PlacementCompatibilityResult, error) {
func (pbmClient *PbmClient) GetPlacementCompatibilityResult(ctx context.Context, storagePolicyID string, datastore []*DatastoreInfo) (pbm.PlacementCompatibilityResult, error) {
var hubs []pbmtypes.PbmPlacementHub
for _, ds := range datastore {
hubs = append(hubs, pbmtypes.PbmPlacementHub{
@ -145,7 +145,7 @@ func (pbmClient *PbmClient) GetPlacementCompatibilityResult(ctx context.Context,
}
// getDataStoreForPlacementHub returns matching datastore associated with given pbmPlacementHub
func getDatastoreFromPlacementHub(datastore []*Datastore, pbmPlacementHub pbmtypes.PbmPlacementHub) *Datastore {
func getDatastoreFromPlacementHub(datastore []*DatastoreInfo, pbmPlacementHub pbmtypes.PbmPlacementHub) *DatastoreInfo {
for _, ds := range datastore {
if ds.Reference().Type == pbmPlacementHub.HubType && ds.Reference().Value == pbmPlacementHub.HubId {
return ds
@ -155,7 +155,7 @@ func getDatastoreFromPlacementHub(datastore []*Datastore, pbmPlacementHub pbmtyp
}
// getDsMorNameMap returns map of ds Mor and Datastore Object Name
func getDsMorNameMap(ctx context.Context, datastores []*Datastore) map[string]string {
func getDsMorNameMap(ctx context.Context, datastores []*DatastoreInfo) map[string]string {
dsMorNameMap := make(map[string]string)
for _, ds := range datastores {
dsObjectName, err := ds.ObjectName(ctx)

View File

@ -25,6 +25,8 @@ import (
"github.com/golang/glog"
"github.com/vmware/govmomi/find"
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/vim25/mo"
"github.com/vmware/govmomi/vim25/soap"
"github.com/vmware/govmomi/vim25/types"
)
@ -121,10 +123,10 @@ func getSCSIControllers(vmDevices object.VirtualDeviceList) []*types.VirtualCont
return scsiControllers
}
// RemoveClusterFromVDiskPath removes the cluster or folder path from the vDiskPath
// RemoveStorageClusterORFolderNameFromVDiskPath removes the cluster or folder path from the vDiskPath
// for vDiskPath [DatastoreCluster/sharedVmfs-0] kubevols/e2e-vmdk-1234.vmdk, return value is [sharedVmfs-0] kubevols/e2e-vmdk-1234.vmdk
// for vDiskPath [sharedVmfs-0] kubevols/e2e-vmdk-1234.vmdk, return value remains same [sharedVmfs-0] kubevols/e2e-vmdk-1234.vmdk
func RemoveClusterFromVDiskPath(vDiskPath string) string {
func RemoveStorageClusterORFolderNameFromVDiskPath(vDiskPath string) string {
datastore := regexp.MustCompile("\\[(.*?)\\]").FindStringSubmatch(vDiskPath)[1]
if filepath.Base(datastore) != datastore {
vDiskPath = strings.Replace(vDiskPath, datastore, filepath.Base(datastore), 1)
@ -172,3 +174,40 @@ func IsValidUUID(uuid string) bool {
r := regexp.MustCompile("^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$")
return r.MatchString(uuid)
}
// IsManagedObjectNotFoundError returns true if error is of type ManagedObjectNotFound
func IsManagedObjectNotFoundError(err error) bool {
isManagedObjectNotFoundError := false
if soap.IsSoapFault(err) {
_, isManagedObjectNotFoundError = soap.ToSoapFault(err).VimFault().(types.ManagedObjectNotFound)
}
return isManagedObjectNotFoundError
}
// VerifyVolumePathsForVM verifies if the volume paths (volPaths) are attached to VM.
func VerifyVolumePathsForVM(vmMo mo.VirtualMachine, volPaths []string, nodeName string, nodeVolumeMap map[string]map[string]bool) {
// Verify if the volume paths are present on the VM backing virtual disk devices
vmDevices := object.VirtualDeviceList(vmMo.Config.Hardware.Device)
VerifyVolumePathsForVMDevices(vmDevices, volPaths, nodeName, nodeVolumeMap)
}
// VerifyVolumePathsForVMDevices verifies if the volume paths (volPaths) are attached to VM.
func VerifyVolumePathsForVMDevices(vmDevices object.VirtualDeviceList, volPaths []string, nodeName string, nodeVolumeMap map[string]map[string]bool) {
volPathsMap := make(map[string]bool)
for _, volPath := range volPaths {
volPathsMap[volPath] = true
}
// Verify if the volume paths are present on the VM backing virtual disk devices
for _, device := range vmDevices {
if vmDevices.TypeName(device) == "VirtualDisk" {
virtualDevice := device.GetVirtualDevice()
if backing, ok := virtualDevice.Backing.(*types.VirtualDiskFlatVer2BackingInfo); ok {
if volPathsMap[backing.FileName] {
setNodeVolumeMap(nodeVolumeMap, backing.FileName, nodeName, true)
}
}
}
}
}

View File

@ -23,6 +23,7 @@ import (
"github.com/golang/glog"
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/property"
"github.com/vmware/govmomi/vim25/mo"
"github.com/vmware/govmomi/vim25/types"
)
@ -63,7 +64,7 @@ func (vm *VirtualMachine) AttachDisk(ctx context.Context, vmDiskPath string, vol
return "", fmt.Errorf("Not a valid SCSI Controller Type. Valid options are %q", SCSIControllerTypeValidOptions())
}
vmDiskPathCopy := vmDiskPath
vmDiskPath = RemoveClusterFromVDiskPath(vmDiskPath)
vmDiskPath = RemoveStorageClusterORFolderNameFromVDiskPath(vmDiskPath)
attached, err := vm.IsDiskAttached(ctx, vmDiskPath)
if err != nil {
glog.Errorf("Error occurred while checking if disk is attached on VM: %q. vmDiskPath: %q, err: %+v", vm.InventoryPath, vmDiskPath, err)
@ -75,6 +76,20 @@ func (vm *VirtualMachine) AttachDisk(ctx context.Context, vmDiskPath string, vol
return diskUUID, nil
}
if volumeOptions.StoragePolicyName != "" {
pbmClient, err := NewPbmClient(ctx, vm.Client())
if err != nil {
glog.Errorf("Error occurred while creating new pbmClient. err: %+v", err)
return "", err
}
volumeOptions.StoragePolicyID, err = pbmClient.ProfileIDByName(ctx, volumeOptions.StoragePolicyName)
if err != nil {
glog.Errorf("Failed to get Profile ID by name: %s. err: %+v", volumeOptions.StoragePolicyName, err)
return "", err
}
}
dsObj, err := vm.Datacenter.GetDatastoreByPath(ctx, vmDiskPathCopy)
if err != nil {
glog.Errorf("Failed to get datastore from vmDiskPath: %q. err: %+v", vmDiskPath, err)
@ -139,7 +154,7 @@ func (vm *VirtualMachine) AttachDisk(ctx context.Context, vmDiskPath string, vol
// DetachDisk detaches the disk specified by vmDiskPath
func (vm *VirtualMachine) DetachDisk(ctx context.Context, vmDiskPath string) error {
vmDiskPath = RemoveClusterFromVDiskPath(vmDiskPath)
vmDiskPath = RemoveStorageClusterORFolderNameFromVDiskPath(vmDiskPath)
device, err := vm.getVirtualDeviceByPath(ctx, vmDiskPath)
if err != nil {
glog.Errorf("Disk ID not found for VM: %q with diskPath: %q", vm.InventoryPath, vmDiskPath)
@ -186,7 +201,7 @@ func (vm *VirtualMachine) IsActive(ctx context.Context) (bool, error) {
}
// GetAllAccessibleDatastores gets the list of accessible Datastores for the given Virtual Machine
func (vm *VirtualMachine) GetAllAccessibleDatastores(ctx context.Context) ([]*Datastore, error) {
func (vm *VirtualMachine) GetAllAccessibleDatastores(ctx context.Context) ([]*DatastoreInfo, error) {
host, err := vm.HostSystem(ctx)
if err != nil {
glog.Errorf("Failed to get host system for VM: %q. err: %+v", vm.InventoryPath, err)
@ -199,9 +214,28 @@ func (vm *VirtualMachine) GetAllAccessibleDatastores(ctx context.Context) ([]*Da
glog.Errorf("Failed to retrieve datastores for host: %+v. err: %+v", host, err)
return nil, err
}
var dsObjList []*Datastore
var dsRefList []types.ManagedObjectReference
for _, dsRef := range hostSystemMo.Datastore {
dsObjList = append(dsObjList, &Datastore{object.NewDatastore(vm.Client(), dsRef), vm.Datacenter})
dsRefList = append(dsRefList, dsRef)
}
var dsMoList []mo.Datastore
pc := property.DefaultCollector(vm.Client())
properties := []string{DatastoreInfoProperty}
err = pc.Retrieve(ctx, dsRefList, properties, &dsMoList)
if err != nil {
glog.Errorf("Failed to get Datastore managed objects from datastore objects."+
" dsObjList: %+v, properties: %+v, err: %v", dsRefList, properties, err)
return nil, err
}
glog.V(9).Infof("Result dsMoList: %+v", dsMoList)
var dsObjList []*DatastoreInfo
for _, dsMo := range dsMoList {
dsObjList = append(dsObjList,
&DatastoreInfo{
&Datastore{object.NewDatastore(vm.Client(), dsMo.Reference()),
vm.Datacenter},
dsMo.Info.GetDatastoreInfo()})
}
return dsObjList, nil
}

File diff suppressed because it is too large Load Diff

View File

@ -39,7 +39,7 @@ func configFromEnv() (cfg VSphereConfig, ok bool) {
cfg.Global.Password = os.Getenv("VSPHERE_PASSWORD")
cfg.Global.Datacenter = os.Getenv("VSPHERE_DATACENTER")
cfg.Network.PublicNetwork = os.Getenv("VSPHERE_PUBLIC_NETWORK")
cfg.Global.Datastore = os.Getenv("VSPHERE_DATASTORE")
cfg.Global.DefaultDatastore = os.Getenv("VSPHERE_DATASTORE")
cfg.Disk.SCSIControllerType = os.Getenv("VSPHERE_SCSICONTROLLER_TYPE")
cfg.Global.WorkingDir = os.Getenv("VSPHERE_WORKING_DIR")
cfg.Global.VMName = os.Getenv("VSPHERE_VM_NAME")
@ -103,7 +103,7 @@ func TestNewVSphere(t *testing.T) {
t.Skipf("No config found in environment")
}
_, err := newVSphere(cfg)
_, err := newControllerNode(cfg)
if err != nil {
t.Fatalf("Failed to construct/authenticate vSphere: %s", err)
}
@ -116,7 +116,7 @@ func TestVSphereLogin(t *testing.T) {
}
// Create vSphere configuration object
vs, err := newVSphere(cfg)
vs, err := newControllerNode(cfg)
if err != nil {
t.Fatalf("Failed to construct/authenticate vSphere: %s", err)
}
@ -126,11 +126,16 @@ func TestVSphereLogin(t *testing.T) {
defer cancel()
// Create vSphere client
err = vs.conn.Connect(ctx)
var vcInstance *VSphereInstance
if vcInstance, ok = vs.vsphereInstanceMap[cfg.Global.VCenterIP]; !ok {
t.Fatalf("Couldn't get vSphere instance: %s", cfg.Global.VCenterIP)
}
err = vcInstance.conn.Connect(ctx)
if err != nil {
t.Errorf("Failed to connect to vSphere: %s", err)
}
defer vs.conn.GoVmomiClient.Logout(ctx)
defer vcInstance.conn.GoVmomiClient.Logout(ctx)
}
func TestZones(t *testing.T) {
@ -154,7 +159,7 @@ func TestInstances(t *testing.T) {
t.Skipf("No config found in environment")
}
vs, err := newVSphere(cfg)
vs, err := newControllerNode(cfg)
if err != nil {
t.Fatalf("Failed to construct/authenticate vSphere: %s", err)
}
@ -213,7 +218,7 @@ func TestVolumes(t *testing.T) {
t.Skipf("No config found in environment")
}
vs, err := newVSphere(cfg)
vs, err := newControllerNode(cfg)
if err != nil {
t.Fatalf("Failed to construct/authenticate vSphere: %s", err)
}

View File

@ -28,14 +28,16 @@ import (
"github.com/golang/glog"
"github.com/vmware/govmomi"
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/vim25"
"github.com/vmware/govmomi/vim25/mo"
"fmt"
"github.com/vmware/govmomi/vim25/mo"
"k8s.io/api/core/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib/diskmanagers"
"path/filepath"
)
const (
@ -55,10 +57,28 @@ func GetVSphere() (*VSphere, error) {
return nil, err
}
vSphereConn.GoVmomiClient = client
vsphereIns := &VSphereInstance{
conn: vSphereConn,
cfg: &VirtualCenterConfig{
User: cfg.Global.User,
Password: cfg.Global.Password,
VCenterPort: cfg.Global.VCenterPort,
Datacenters: cfg.Global.Datacenters,
RoundTripperCount: cfg.Global.RoundTripperCount,
},
}
vsphereInsMap := make(map[string]*VSphereInstance)
vsphereInsMap[cfg.Global.VCenterIP] = vsphereIns
// TODO: Initialize nodeManager and set it in VSphere.
vs := &VSphere{
conn: vSphereConn,
cfg: cfg,
localInstanceID: "",
vsphereInstanceMap: vsphereInsMap,
hostName: "",
cfg: cfg,
nodeManager: &NodeManager{
vsphereInstanceMap: vsphereInsMap,
nodeInfoMap: make(map[string]*NodeInfo),
registeredNodes: make(map[string]*v1.Node),
},
}
runtime.SetFinalizer(vs, logout)
return vs, nil
@ -70,14 +90,18 @@ func getVSphereConfig() *VSphereConfig {
cfg.Global.VCenterPort = os.Getenv("VSPHERE_VCENTER_PORT")
cfg.Global.User = os.Getenv("VSPHERE_USER")
cfg.Global.Password = os.Getenv("VSPHERE_PASSWORD")
cfg.Global.Datacenter = os.Getenv("VSPHERE_DATACENTER")
cfg.Global.Datastore = os.Getenv("VSPHERE_DATASTORE")
cfg.Global.Datacenters = os.Getenv("VSPHERE_DATACENTER")
cfg.Global.DefaultDatastore = os.Getenv("VSPHERE_DATASTORE")
cfg.Global.WorkingDir = os.Getenv("VSPHERE_WORKING_DIR")
cfg.Global.VMName = os.Getenv("VSPHERE_VM_NAME")
cfg.Global.InsecureFlag = false
if strings.ToLower(os.Getenv("VSPHERE_INSECURE")) == "true" {
cfg.Global.InsecureFlag = true
}
cfg.Workspace.VCenterIP = cfg.Global.VCenterIP
cfg.Workspace.Datacenter = cfg.Global.Datacenters
cfg.Workspace.DefaultDatastore = cfg.Global.DefaultDatastore
cfg.Workspace.Folder = cfg.Global.WorkingDir
return &cfg
}
@ -127,49 +151,83 @@ func getvmUUID() (string, error) {
return uuid, nil
}
// Get all datastores accessible for the virtual machine object.
func getSharedDatastoresInK8SCluster(ctx context.Context, folder *vclib.Folder) ([]*vclib.Datastore, error) {
vmList, err := folder.GetVirtualMachines(ctx)
// Returns the accessible datastores for the given node VM.
func getAccessibleDatastores(ctx context.Context, nodeVmDetail *NodeDetails, nodeManager *NodeManager) ([]*vclib.DatastoreInfo, error) {
accessibleDatastores, err := nodeVmDetail.vm.GetAllAccessibleDatastores(ctx)
if err != nil {
glog.Errorf("Failed to get virtual machines in the kubernetes cluster: %s, err: %+v", folder.InventoryPath, err)
return nil, err
// Check if the node VM is not found which indicates that the node info in the node manager is stale.
// If so, rediscover the node and retry.
if vclib.IsManagedObjectNotFoundError(err) {
glog.V(4).Infof("error %q ManagedObjectNotFound for node %q. Rediscovering...", err, nodeVmDetail.NodeName)
err = nodeManager.RediscoverNode(convertToK8sType(nodeVmDetail.NodeName))
if err == nil {
glog.V(4).Infof("Discovered node %s successfully", nodeVmDetail.NodeName)
nodeInfo, err := nodeManager.GetNodeInfo(convertToK8sType(nodeVmDetail.NodeName))
if err != nil {
glog.V(4).Infof("error %q getting node info for node %+v", err, nodeVmDetail)
return nil, err
}
accessibleDatastores, err = nodeInfo.vm.GetAllAccessibleDatastores(ctx)
if err != nil {
glog.V(4).Infof("error %q getting accessible datastores for node %+v", err, nodeVmDetail)
return nil, err
}
} else {
glog.V(4).Infof("error %q rediscovering node %+v", err, nodeVmDetail)
return nil, err
}
} else {
glog.V(4).Infof("error %q getting accessible datastores for node %+v", err, nodeVmDetail)
return nil, err
}
}
if vmList == nil || len(vmList) == 0 {
glog.Errorf("No virtual machines found in the kubernetes cluster: %s", folder.InventoryPath)
return nil, fmt.Errorf("No virtual machines found in the kubernetes cluster: %s", folder.InventoryPath)
return accessibleDatastores, nil
}
// Get all datastores accessible for the virtual machine object.
func getSharedDatastoresInK8SCluster(ctx context.Context, dc *vclib.Datacenter, nodeManager *NodeManager) ([]*vclib.DatastoreInfo, error) {
nodeVmDetails := nodeManager.GetNodeDetails()
if nodeVmDetails == nil || len(nodeVmDetails) == 0 {
msg := fmt.Sprintf("Kubernetes node nodeVmDetail details is empty. nodeVmDetails : %+v", nodeVmDetails)
glog.Error(msg)
return nil, fmt.Errorf(msg)
}
index := 0
var sharedDatastores []*vclib.Datastore
for _, vm := range vmList {
vmName, err := vm.ObjectName(ctx)
var sharedDatastores []*vclib.DatastoreInfo
for index, nodeVmDetail := range nodeVmDetails {
glog.V(9).Infof("Getting accessible datastores for node %s", nodeVmDetail.NodeName)
accessibleDatastores, err := getAccessibleDatastores(ctx, &nodeVmDetail, nodeManager)
if err != nil {
return nil, err
}
if !strings.HasPrefix(vmName, DummyVMPrefixName) {
accessibleDatastores, err := vm.GetAllAccessibleDatastores(ctx)
if err != nil {
return nil, err
if index == 0 {
sharedDatastores = accessibleDatastores
} else {
sharedDatastores = intersect(sharedDatastores, accessibleDatastores)
if len(sharedDatastores) == 0 {
return nil, fmt.Errorf("No shared datastores found in the Kubernetes cluster for nodeVmDetails: %+v", nodeVmDetails)
}
if index == 0 {
sharedDatastores = accessibleDatastores
} else {
sharedDatastores = intersect(sharedDatastores, accessibleDatastores)
if len(sharedDatastores) == 0 {
return nil, fmt.Errorf("No shared datastores found in the Kubernetes cluster: %s", folder.InventoryPath)
}
}
index++
}
}
glog.V(9).Infof("sharedDatastores : %+v", sharedDatastores)
sharedDatastores, err := getDatastoresForEndpointVC(ctx, dc, sharedDatastores)
if err != nil {
glog.Errorf("Failed to get shared datastores from endpoint VC. err: %+v", err)
return nil, err
}
glog.V(9).Infof("sharedDatastores at endpoint VC: %+v", sharedDatastores)
return sharedDatastores, nil
}
func intersect(list1 []*vclib.Datastore, list2 []*vclib.Datastore) []*vclib.Datastore {
var sharedDs []*vclib.Datastore
func intersect(list1 []*vclib.DatastoreInfo, list2 []*vclib.DatastoreInfo) []*vclib.DatastoreInfo {
glog.V(9).Infof("list1: %+v", list1)
glog.V(9).Infof("list2: %+v", list2)
var sharedDs []*vclib.DatastoreInfo
for _, val1 := range list1 {
// Check if val1 is found in list2
for _, val2 := range list2 {
if val1.Reference().Value == val2.Reference().Value {
// Intersection is performed based on the datastoreUrl as this uniquely identifies the datastore.
if val1.Info.Url == val2.Info.Url {
sharedDs = append(sharedDs, val1)
break
}
@ -178,46 +236,42 @@ func intersect(list1 []*vclib.Datastore, list2 []*vclib.Datastore) []*vclib.Data
return sharedDs
}
// Get the datastores accessible for the virtual machine object.
func getAllAccessibleDatastores(ctx context.Context, client *vim25.Client, vmMo mo.VirtualMachine) ([]string, error) {
host := vmMo.Summary.Runtime.Host
if host == nil {
return nil, errors.New("VM doesn't have a HostSystem")
}
var hostSystemMo mo.HostSystem
s := object.NewSearchIndex(client)
err := s.Properties(ctx, host.Reference(), []string{DatastoreProperty}, &hostSystemMo)
if err != nil {
return nil, err
}
var dsRefValues []string
for _, dsRef := range hostSystemMo.Datastore {
dsRefValues = append(dsRefValues, dsRef.Value)
}
return dsRefValues, nil
}
// getMostFreeDatastore gets the best fit compatible datastore by free space.
func getMostFreeDatastoreName(ctx context.Context, client *vim25.Client, dsObjList []*vclib.Datastore) (string, error) {
dsMoList, err := dsObjList[0].Datacenter.GetDatastoreMoList(ctx, dsObjList, []string{DatastoreInfoProperty})
if err != nil {
return "", err
}
func getMostFreeDatastoreName(ctx context.Context, client *vim25.Client, dsInfoList []*vclib.DatastoreInfo) (string, error) {
var curMax int64
curMax = -1
var index int
for i, dsMo := range dsMoList {
dsFreeSpace := dsMo.Info.GetDatastoreInfo().FreeSpace
for i, dsInfo := range dsInfoList {
dsFreeSpace := dsInfo.Info.GetDatastoreInfo().FreeSpace
if dsFreeSpace > curMax {
curMax = dsFreeSpace
index = i
}
}
return dsMoList[index].Info.GetDatastoreInfo().Name, nil
return dsInfoList[index].Info.GetDatastoreInfo().Name, nil
}
func getPbmCompatibleDatastore(ctx context.Context, client *vim25.Client, storagePolicyName string, folder *vclib.Folder) (string, error) {
pbmClient, err := vclib.NewPbmClient(ctx, client)
// Returns the datastores in the given datacenter by performing lookup based on datastore URL.
func getDatastoresForEndpointVC(ctx context.Context, dc *vclib.Datacenter, sharedDsInfos []*vclib.DatastoreInfo) ([]*vclib.DatastoreInfo, error) {
var datastores []*vclib.DatastoreInfo
allDsInfoMap, err := dc.GetAllDatastores(ctx)
if err != nil {
return nil, err
}
for _, sharedDsInfo := range sharedDsInfos {
dsInfo, ok := allDsInfoMap[sharedDsInfo.Info.Url]
if ok {
datastores = append(datastores, dsInfo)
} else {
glog.V(4).Infof("Warning: Shared datastore with URL %s does not exist in endpoint VC", sharedDsInfo.Info.Url)
}
}
glog.V(9).Infof("Datastore from endpoint VC: %+v", datastores)
return datastores, nil
}
func getPbmCompatibleDatastore(ctx context.Context, dc *vclib.Datacenter, storagePolicyName string, nodeManager *NodeManager) (string, error) {
pbmClient, err := vclib.NewPbmClient(ctx, dc.Client())
if err != nil {
return "", err
}
@ -226,35 +280,40 @@ func getPbmCompatibleDatastore(ctx context.Context, client *vim25.Client, storag
glog.Errorf("Failed to get Profile ID by name: %s. err: %+v", storagePolicyName, err)
return "", err
}
sharedDsList, err := getSharedDatastoresInK8SCluster(ctx, folder)
sharedDs, err := getSharedDatastoresInK8SCluster(ctx, dc, nodeManager)
if err != nil {
glog.Errorf("Failed to get shared datastores from kubernetes cluster: %s. err: %+v", folder.InventoryPath, err)
glog.Errorf("Failed to get shared datastores. err: %+v", err)
return "", err
}
compatibleDatastores, _, err := pbmClient.GetCompatibleDatastores(ctx, storagePolicyID, sharedDsList)
if len(sharedDs) == 0 {
msg := "No shared datastores found in the endpoint virtual center"
glog.Errorf(msg)
return "", errors.New(msg)
}
compatibleDatastores, _, err := pbmClient.GetCompatibleDatastores(ctx, dc, storagePolicyID, sharedDs)
if err != nil {
glog.Errorf("Failed to get compatible datastores from datastores : %+v with storagePolicy: %s. err: %+v", sharedDsList, storagePolicyID, err)
glog.Errorf("Failed to get compatible datastores from datastores : %+v with storagePolicy: %s. err: %+v",
sharedDs, storagePolicyID, err)
return "", err
}
datastore, err := getMostFreeDatastoreName(ctx, client, compatibleDatastores)
glog.V(9).Infof("compatibleDatastores : %+v", compatibleDatastores)
datastore, err := getMostFreeDatastoreName(ctx, dc.Client(), compatibleDatastores)
if err != nil {
glog.Errorf("Failed to get most free datastore from compatible datastores: %+v. err: %+v", compatibleDatastores, err)
return "", err
}
glog.V(4).Infof("Most free datastore : %+s", datastore)
return datastore, err
}
func (vs *VSphere) setVMOptions(ctx context.Context, dc *vclib.Datacenter) (*vclib.VMOptions, error) {
func (vs *VSphere) setVMOptions(ctx context.Context, dc *vclib.Datacenter, resourcePoolPath string) (*vclib.VMOptions, error) {
var vmOptions vclib.VMOptions
vm, err := dc.GetVMByPath(ctx, vs.cfg.Global.WorkingDir+"/"+vs.localInstanceID)
resourcePool, err := dc.GetResourcePool(ctx, resourcePoolPath)
if err != nil {
return nil, err
}
resourcePool, err := vm.GetResourcePool(ctx)
if err != nil {
return nil, err
}
folder, err := dc.GetFolderByPath(ctx, vs.cfg.Global.WorkingDir)
glog.V(9).Infof("Resource pool path %s, resourcePool %+v", resourcePoolPath, resourcePool)
folder, err := dc.GetFolderByPath(ctx, vs.cfg.Workspace.Folder)
if err != nil {
return nil, err
}
@ -270,28 +329,27 @@ func (vs *VSphere) cleanUpDummyVMs(dummyVMPrefix string) {
defer cancel()
for {
time.Sleep(CleanUpDummyVMRoutineInterval * time.Minute)
// Ensure client is logged in and session is valid
err := vs.conn.Connect(ctx)
vsi, err := vs.getVSphereInstanceForServer(vs.cfg.Workspace.VCenterIP, ctx)
if err != nil {
glog.V(4).Infof("Failed to connect to VC with err: %+v. Retrying again...", err)
glog.V(4).Infof("Failed to get VSphere instance with err: %+v. Retrying again...", err)
continue
}
dc, err := vclib.GetDatacenter(ctx, vs.conn, vs.cfg.Global.Datacenter)
dc, err := vclib.GetDatacenter(ctx, vsi.conn, vs.cfg.Workspace.Datacenter)
if err != nil {
glog.V(4).Infof("Failed to get the datacenter: %s from VC. err: %+v", vs.cfg.Global.Datacenter, err)
glog.V(4).Infof("Failed to get the datacenter: %s from VC. err: %+v", vs.cfg.Workspace.Datacenter, err)
continue
}
// Get the folder reference for global working directory where the dummy VM needs to be created.
vmFolder, err := dc.GetFolderByPath(ctx, vs.cfg.Global.WorkingDir)
vmFolder, err := dc.GetFolderByPath(ctx, vs.cfg.Workspace.Folder)
if err != nil {
glog.V(4).Infof("Unable to get the kubernetes folder: %q reference. err: %+v", vs.cfg.Global.WorkingDir, err)
glog.V(4).Infof("Unable to get the kubernetes folder: %q reference. err: %+v", vs.cfg.Workspace.Folder, err)
continue
}
// A write lock is acquired to make sure the cleanUp routine doesn't delete any VM's created by ongoing PVC requests.
defer cleanUpDummyVMLock.Lock()
err = diskmanagers.CleanUpDummyVMs(ctx, vmFolder, dc)
if err != nil {
glog.V(4).Infof("Unable to clean up dummy VM's in the kubernetes cluster: %q. err: %+v", vs.cfg.Global.WorkingDir, err)
glog.V(4).Infof("Unable to clean up dummy VM's in the kubernetes cluster: %q. err: %+v", vs.cfg.Workspace.Folder, err)
}
}
}
@ -353,3 +411,118 @@ func setdatastoreFolderIDMap(
}
folderNameIDMap[folderName] = folderID
}
func convertVolPathToDevicePath(ctx context.Context, dc *vclib.Datacenter, volPath string) (string, error) {
volPath = vclib.RemoveStorageClusterORFolderNameFromVDiskPath(volPath)
// Get the canonical volume path for volPath.
canonicalVolumePath, err := getcanonicalVolumePath(ctx, dc, volPath)
if err != nil {
glog.Errorf("Failed to get canonical vsphere volume path for volume: %s. err: %+v", volPath, err)
return "", err
}
// Check if the volume path contains .vmdk extension. If not, add the extension and update the nodeVolumes Map
if len(canonicalVolumePath) > 0 && filepath.Ext(canonicalVolumePath) != ".vmdk" {
canonicalVolumePath += ".vmdk"
}
return canonicalVolumePath, nil
}
// convertVolPathsToDevicePaths removes cluster or folder path from volPaths and convert to canonicalPath
func (vs *VSphere) convertVolPathsToDevicePaths(ctx context.Context, nodeVolumes map[k8stypes.NodeName][]string) (map[k8stypes.NodeName][]string, error) {
vmVolumes := make(map[k8stypes.NodeName][]string)
for nodeName, volPaths := range nodeVolumes {
nodeInfo, err := vs.nodeManager.GetNodeInfo(nodeName)
if err != nil {
return nil, err
}
_, err = vs.getVSphereInstanceForServer(nodeInfo.vcServer, ctx)
if err != nil {
return nil, err
}
for i, volPath := range volPaths {
deviceVolPath, err := convertVolPathToDevicePath(ctx, nodeInfo.dataCenter, volPath)
if err != nil {
glog.Errorf("Failed to convert vsphere volume path %s to device path for volume %s. err: %+v", volPath, deviceVolPath, err)
return nil, err
}
volPaths[i] = deviceVolPath
}
vmVolumes[nodeName] = volPaths
}
return vmVolumes, nil
}
// checkDiskAttached verifies volumes are attached to the VMs which are in same vCenter and Datacenter
// Returns nodes if exist any for which VM is not found in that vCenter and Datacenter
func (vs *VSphere) checkDiskAttached(ctx context.Context, nodes []k8stypes.NodeName, nodeVolumes map[k8stypes.NodeName][]string, attached map[string]map[string]bool, retry bool) ([]k8stypes.NodeName, error) {
var nodesToRetry []k8stypes.NodeName
var vmList []*vclib.VirtualMachine
var nodeInfo NodeInfo
var err error
for _, nodeName := range nodes {
nodeInfo, err = vs.nodeManager.GetNodeInfo(nodeName)
if err != nil {
return nodesToRetry, err
}
vmList = append(vmList, nodeInfo.vm)
}
// Making sure session is valid
_, err = vs.getVSphereInstanceForServer(nodeInfo.vcServer, ctx)
if err != nil {
return nodesToRetry, err
}
// If any of the nodes are not present property collector query will fail for entire operation
vmMoList, err := nodeInfo.dataCenter.GetVMMoList(ctx, vmList, []string{"config.hardware.device", "name", "config.uuid"})
if err != nil {
if vclib.IsManagedObjectNotFoundError(err) && !retry {
glog.V(4).Infof("checkDiskAttached: ManagedObjectNotFound for property collector query for nodes: %+v vms: %+v", nodes, vmList)
// Property Collector Query failed
// VerifyVolumePaths per VM
for _, nodeName := range nodes {
nodeInfo, err := vs.nodeManager.GetNodeInfo(nodeName)
if err != nil {
return nodesToRetry, err
}
devices, err := nodeInfo.vm.VirtualMachine.Device(ctx)
if err != nil {
if vclib.IsManagedObjectNotFoundError(err) {
glog.V(4).Infof("checkDiskAttached: ManagedObjectNotFound for Kubernetes node: %s with vSphere Virtual Machine reference: %v", nodeName, nodeInfo.vm)
nodesToRetry = append(nodesToRetry, nodeName)
continue
}
return nodesToRetry, err
}
glog.V(4).Infof("Verifying Volume Paths by devices for node %s and VM %s", nodeName, nodeInfo.vm)
vclib.VerifyVolumePathsForVMDevices(devices, nodeVolumes[nodeName], convertToString(nodeName), attached)
}
}
return nodesToRetry, err
}
vmMoMap := make(map[string]mo.VirtualMachine)
for _, vmMo := range vmMoList {
if vmMo.Config == nil {
glog.Errorf("Config is not available for VM: %q", vmMo.Name)
continue
}
glog.V(9).Infof("vmMoMap vmname: %q vmuuid: %s", vmMo.Name, strings.ToLower(vmMo.Config.Uuid))
vmMoMap[strings.ToLower(vmMo.Config.Uuid)] = vmMo
}
glog.V(9).Infof("vmMoMap: +%v", vmMoMap)
for _, nodeName := range nodes {
node, err := vs.nodeManager.GetNode(nodeName)
if err != nil {
return nodesToRetry, err
}
glog.V(9).Infof("Verifying volume for nodeName: %q with nodeuuid: %s", nodeName, node.Status.NodeInfo.SystemUUID, vmMoMap)
vclib.VerifyVolumePathsForVM(vmMoMap[strings.ToLower(node.Status.NodeInfo.SystemUUID)], nodeVolumes[nodeName], convertToString(nodeName), attached)
}
return nodesToRetry, nil
}

View File

@ -76,7 +76,7 @@ func (attacher *vsphereVMDKAttacher) Attach(spec *volume.Spec, nodeName types.No
// vsphereCloud.AttachDisk checks if disk is already attached to host and
// succeeds in that case, so no need to do that separately.
diskUUID, err := attacher.vsphereVolumes.AttachDisk(volumeSource.VolumePath, volumeSource.StoragePolicyID, nodeName)
diskUUID, err := attacher.vsphereVolumes.AttachDisk(volumeSource.VolumePath, volumeSource.StoragePolicyName, nodeName)
if err != nil {
glog.Errorf("Error attaching volume %q to node %q: %+v", volumeSource.VolumePath, nodeName, err)
return "", err

View File

@ -70,7 +70,7 @@ var _ = SIGDescribe("PersistentVolumes:vsphere", func() {
selector = metav1.SetAsLabelSelector(volLabel)
if vsp == nil {
vsp, err = vsphere.GetVSphere()
vsp, err = getVSphere(c)
Expect(err).NotTo(HaveOccurred())
}
if volumePath == "" {
@ -105,7 +105,7 @@ var _ = SIGDescribe("PersistentVolumes:vsphere", func() {
node = types.NodeName(clientPod.Spec.NodeName)
By("Verify disk should be attached to the node")
isAttached, err := verifyVSphereDiskAttached(vsp, volumePath, node)
isAttached, err := verifyVSphereDiskAttached(c, vsp, volumePath, node)
Expect(err).NotTo(HaveOccurred())
Expect(isAttached).To(BeTrue(), "disk is not attached with the node")
})
@ -133,7 +133,11 @@ var _ = SIGDescribe("PersistentVolumes:vsphere", func() {
framework.AddCleanupAction(func() {
// Cleanup actions will be called even when the tests are skipped and leaves namespace unset.
if len(ns) > 0 && len(volumePath) > 0 {
framework.ExpectNoError(waitForVSphereDiskToDetach(vsp, volumePath, node))
client, err := framework.LoadClientset()
if err != nil {
return
}
framework.ExpectNoError(waitForVSphereDiskToDetach(client, vsp, volumePath, node))
vsp.DeleteVolume(volumePath)
}
})
@ -213,6 +217,6 @@ var _ = SIGDescribe("PersistentVolumes:vsphere", func() {
Expect(err).NotTo(HaveOccurred())
By("Verifying Persistent Disk detaches")
waitForVSphereDiskToDetach(vsp, volumePath, node)
waitForVSphereDiskToDetach(c, vsp, volumePath, node)
})
})

View File

@ -56,7 +56,7 @@ var _ = SIGDescribe("PersistentVolumes [Feature:ReclaimPolicy]", func() {
})
AfterEach(func() {
vsp, err := vsphere.GetVSphere()
vsp, err := getVSphere(c)
Expect(err).NotTo(HaveOccurred())
testCleanupVSpherePersistentVolumeReclaim(vsp, c, ns, volumePath, pv, pvc)
})
@ -74,7 +74,7 @@ var _ = SIGDescribe("PersistentVolumes [Feature:ReclaimPolicy]", func() {
6. Verify PV is deleted automatically.
*/
It("should delete persistent volume when reclaimPolicy set to delete and associated claim is deleted", func() {
vsp, err := vsphere.GetVSphere()
vsp, err := getVSphere(c)
Expect(err).NotTo(HaveOccurred())
volumePath, pv, pvc, err = testSetupVSpherePersistentVolumeReclaim(vsp, c, ns, v1.PersistentVolumeReclaimDelete)
@ -104,7 +104,7 @@ var _ = SIGDescribe("PersistentVolumes [Feature:ReclaimPolicy]", func() {
9. Verify PV should be detached from the node and automatically deleted.
*/
It("should not detach and unmount PV when associated pvc with delete as reclaimPolicy is deleted when it is in use by the pod", func() {
vsp, err := vsphere.GetVSphere()
vsp, err := getVSphere(c)
Expect(err).NotTo(HaveOccurred())
volumePath, pv, pvc, err = testSetupVSpherePersistentVolumeReclaim(vsp, c, ns, v1.PersistentVolumeReclaimDelete)
@ -127,19 +127,19 @@ var _ = SIGDescribe("PersistentVolumes [Feature:ReclaimPolicy]", func() {
Expect(framework.WaitForPersistentVolumePhase(v1.VolumeFailed, c, pv.Name, 1*time.Second, 60*time.Second)).NotTo(HaveOccurred())
By("Verify the volume is attached to the node")
isVolumeAttached, verifyDiskAttachedError := verifyVSphereDiskAttached(vsp, pv.Spec.VsphereVolume.VolumePath, node)
isVolumeAttached, verifyDiskAttachedError := verifyVSphereDiskAttached(c, vsp, pv.Spec.VsphereVolume.VolumePath, node)
Expect(verifyDiskAttachedError).NotTo(HaveOccurred())
Expect(isVolumeAttached).To(BeTrue())
By("Verify the volume is accessible and available in the pod")
verifyVSphereVolumesAccessible(pod, []*v1.PersistentVolume{pv}, vsp)
verifyVSphereVolumesAccessible(c, pod, []*v1.PersistentVolume{pv}, vsp)
framework.Logf("Verified that Volume is accessible in the POD after deleting PV claim")
By("Deleting the Pod")
framework.ExpectNoError(framework.DeletePodWithWait(f, c, pod), "Failed to delete pod ", pod.Name)
By("Verify PV is detached from the node after Pod is deleted")
Expect(waitForVSphereDiskToDetach(vsp, pv.Spec.VsphereVolume.VolumePath, types.NodeName(pod.Spec.NodeName))).NotTo(HaveOccurred())
Expect(waitForVSphereDiskToDetach(c, vsp, pv.Spec.VsphereVolume.VolumePath, types.NodeName(pod.Spec.NodeName))).NotTo(HaveOccurred())
By("Verify PV should be deleted automatically")
framework.ExpectNoError(framework.WaitForPersistentVolumeDeleted(c, pv.Name, 1*time.Second, 30*time.Second))
@ -167,7 +167,7 @@ var _ = SIGDescribe("PersistentVolumes [Feature:ReclaimPolicy]", func() {
It("should retain persistent volume when reclaimPolicy set to retain when associated claim is deleted", func() {
var volumeFileContent = "hello from vsphere cloud provider, Random Content is :" + strconv.FormatInt(time.Now().UnixNano(), 10)
vsp, err := vsphere.GetVSphere()
vsp, err := getVSphere(c)
Expect(err).NotTo(HaveOccurred())
volumePath, pv, pvc, err = testSetupVSpherePersistentVolumeReclaim(vsp, c, ns, v1.PersistentVolumeReclaimRetain)

View File

@ -23,7 +23,6 @@ import (
. "github.com/onsi/gomega"
"k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
vsphere "k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere"
"k8s.io/kubernetes/test/e2e/framework"
)
@ -104,7 +103,7 @@ var _ = SIGDescribe("PersistentVolumes [Feature:LabelSelector]", func() {
func testSetupVSpherePVClabelselector(c clientset.Interface, ns string, ssdlabels map[string]string, vvollabels map[string]string) (volumePath string, pv_ssd *v1.PersistentVolume, pvc_ssd *v1.PersistentVolumeClaim, pvc_vvol *v1.PersistentVolumeClaim, err error) {
volumePath = ""
By("creating vmdk")
vsp, err := vsphere.GetVSphere()
vsp, err := getVSphere(c)
Expect(err).NotTo(HaveOccurred())
volumePath, err = createVSphereVolume(vsp, nil)
if err != nil {
@ -134,7 +133,7 @@ func testSetupVSpherePVClabelselector(c clientset.Interface, ns string, ssdlabel
func testCleanupVSpherePVClabelselector(c clientset.Interface, ns string, volumePath string, pv_ssd *v1.PersistentVolume, pvc_ssd *v1.PersistentVolumeClaim, pvc_vvol *v1.PersistentVolumeClaim) {
By("running testCleanupVSpherePVClabelselector")
if len(volumePath) > 0 {
vsp, err := vsphere.GetVSphere()
vsp, err := getVSphere(c)
Expect(err).NotTo(HaveOccurred())
vsp.DeleteVolume(volumePath)
}

View File

@ -53,7 +53,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/test/e2e/framework"
)
@ -507,7 +506,11 @@ var _ = SIGDescribe("Volumes", func() {
Prefix: "vsphere",
}
By("creating a test vsphere volume")
vsp, err := vsphere.GetVSphere()
c, err := framework.LoadClientset()
if err != nil {
return
}
vsp, err := getVSphere(c)
Expect(err).NotTo(HaveOccurred())
volumePath, err = createVSphereVolume(vsp, nil)

View File

@ -150,7 +150,7 @@ var _ = SIGDescribe("vcp at scale [Feature:vsphere] ", func() {
scArrays[index] = sc
}
vsp, err := vsphere.GetVSphere()
vsp, err := getVSphere(client)
Expect(err).NotTo(HaveOccurred())
volumeCountPerInstance := volumeCount / numberOfInstances
@ -176,7 +176,7 @@ var _ = SIGDescribe("vcp at scale [Feature:vsphere] ", func() {
Expect(err).NotTo(HaveOccurred())
}
By("Waiting for volumes to be detached from the node")
err = waitForVSphereDisksToDetach(vsp, nodeVolumeMap)
err = waitForVSphereDisksToDetach(client, vsp, nodeVolumeMap)
Expect(err).NotTo(HaveOccurred())
for _, pvcClaim := range pvcClaimList {
@ -228,7 +228,7 @@ func VolumeCreateAndAttach(client clientset.Interface, namespace string, sc []*s
nodeVolumeMap[pod.Spec.NodeName] = append(nodeVolumeMap[pod.Spec.NodeName], pv.Spec.VsphereVolume.VolumePath)
}
By("Verify the volume is accessible and available in the pod")
verifyVSphereVolumesAccessible(pod, persistentvolumes, vsp)
verifyVSphereVolumesAccessible(client, pod, persistentvolumes, vsp)
nodeSelectorIndex++
}
nodeVolumeMapChan <- nodeVolumeMap

View File

@ -24,7 +24,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere"
"k8s.io/kubernetes/test/e2e/framework"
)
@ -104,7 +103,7 @@ var _ = SIGDescribe("vsphere statefulset", func() {
Expect(scaledownErr).NotTo(HaveOccurred())
statefulsetTester.WaitForStatusReadyReplicas(statefulset, replicas-1)
vsp, err := vsphere.GetVSphere()
vsp, err := getVSphere(client)
Expect(err).NotTo(HaveOccurred())
// After scale down, verify vsphere volumes are detached from deleted pods
@ -117,7 +116,7 @@ var _ = SIGDescribe("vsphere statefulset", func() {
if volumespec.PersistentVolumeClaim != nil {
vSpherediskPath := getvSphereVolumePathFromClaim(client, statefulset.Namespace, volumespec.PersistentVolumeClaim.ClaimName)
framework.Logf("Waiting for Volume: %q to detach from Node: %q", vSpherediskPath, sspod.Spec.NodeName)
Expect(waitForVSphereDiskToDetach(vsp, vSpherediskPath, types.NodeName(sspod.Spec.NodeName))).NotTo(HaveOccurred())
Expect(waitForVSphereDiskToDetach(client, vsp, vSpherediskPath, types.NodeName(sspod.Spec.NodeName))).NotTo(HaveOccurred())
}
}
}
@ -146,7 +145,7 @@ var _ = SIGDescribe("vsphere statefulset", func() {
framework.Logf("Verify Volume: %q is attached to the Node: %q", vSpherediskPath, sspod.Spec.NodeName)
// Verify scale up has re-attached the same volumes and not introduced new volume
Expect(volumesBeforeScaleDown[vSpherediskPath] == "").To(BeFalse())
isVolumeAttached, verifyDiskAttachedError := verifyVSphereDiskAttached(vsp, vSpherediskPath, types.NodeName(sspod.Spec.NodeName))
isVolumeAttached, verifyDiskAttachedError := verifyVSphereDiskAttached(client, vsp, vSpherediskPath, types.NodeName(sspod.Spec.NodeName))
Expect(isVolumeAttached).To(BeTrue())
Expect(verifyDiskAttachedError).NotTo(HaveOccurred())
}

View File

@ -30,7 +30,6 @@ import (
"k8s.io/apimachinery/pkg/types"
k8stype "k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere"
"k8s.io/kubernetes/test/e2e/framework"
)
@ -135,9 +134,8 @@ var _ = SIGDescribe("vsphere cloud provider stress [Feature:vsphere]", func() {
func PerformVolumeLifeCycleInParallel(f *framework.Framework, client clientset.Interface, namespace string, instanceId string, sc *storageV1.StorageClass, iterations int, wg *sync.WaitGroup) {
defer wg.Done()
defer GinkgoRecover()
vsp, err := vsphere.GetVSphere()
vsp, err := getVSphere(f.ClientSet)
Expect(err).NotTo(HaveOccurred())
for iterationCount := 0; iterationCount < iterations; iterationCount++ {
logPrefix := fmt.Sprintf("Instance: [%v], Iteration: [%v] :", instanceId, iterationCount+1)
By(fmt.Sprintf("%v Creating PVC using the Storage Class: %v", logPrefix, sc.Name))
@ -164,19 +162,19 @@ func PerformVolumeLifeCycleInParallel(f *framework.Framework, client clientset.I
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("%v Verifing the volume: %v is attached to the node VM: %v", logPrefix, persistentvolumes[0].Spec.VsphereVolume.VolumePath, pod.Spec.NodeName))
isVolumeAttached, verifyDiskAttachedError := verifyVSphereDiskAttached(vsp, persistentvolumes[0].Spec.VsphereVolume.VolumePath, types.NodeName(pod.Spec.NodeName))
isVolumeAttached, verifyDiskAttachedError := verifyVSphereDiskAttached(client, vsp, persistentvolumes[0].Spec.VsphereVolume.VolumePath, types.NodeName(pod.Spec.NodeName))
Expect(isVolumeAttached).To(BeTrue())
Expect(verifyDiskAttachedError).NotTo(HaveOccurred())
By(fmt.Sprintf("%v Verifing the volume: %v is accessible in the pod: %v", logPrefix, persistentvolumes[0].Spec.VsphereVolume.VolumePath, pod.Name))
verifyVSphereVolumesAccessible(pod, persistentvolumes, vsp)
verifyVSphereVolumesAccessible(client, pod, persistentvolumes, vsp)
By(fmt.Sprintf("%v Deleting pod: %v", logPrefix, pod.Name))
err = framework.DeletePodWithWait(f, client, pod)
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("%v Waiting for volume: %v to be detached from the node: %v", logPrefix, persistentvolumes[0].Spec.VsphereVolume.VolumePath, pod.Spec.NodeName))
err = waitForVSphereDiskToDetach(vsp, persistentvolumes[0].Spec.VsphereVolume.VolumePath, k8stype.NodeName(pod.Spec.NodeName))
err = waitForVSphereDiskToDetach(client, vsp, persistentvolumes[0].Spec.VsphereVolume.VolumePath, k8stype.NodeName(pod.Spec.NodeName))
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("%v Deleting the Claim: %v", logPrefix, pvclaim.Name))

View File

@ -55,13 +55,13 @@ const (
)
// Sanity check for vSphere testing. Verify the persistent disk attached to the node.
func verifyVSphereDiskAttached(vsp *vsphere.VSphere, volumePath string, nodeName types.NodeName) (bool, error) {
func verifyVSphereDiskAttached(c clientset.Interface, vsp *vsphere.VSphere, volumePath string, nodeName types.NodeName) (bool, error) {
var (
isAttached bool
err error
)
if vsp == nil {
vsp, err = vsphere.GetVSphere()
vsp, err = getVSphere(c)
Expect(err).NotTo(HaveOccurred())
}
isAttached, err = vsp.DiskIsAttached(volumePath, nodeName)
@ -70,7 +70,7 @@ func verifyVSphereDiskAttached(vsp *vsphere.VSphere, volumePath string, nodeName
}
// Wait until vsphere volumes are detached from the list of nodes or time out after 5 minutes
func waitForVSphereDisksToDetach(vsp *vsphere.VSphere, nodeVolumes map[k8stype.NodeName][]string) error {
func waitForVSphereDisksToDetach(c clientset.Interface, vsp *vsphere.VSphere, nodeVolumes map[k8stype.NodeName][]string) error {
var (
err error
disksAttached = true
@ -78,7 +78,7 @@ func waitForVSphereDisksToDetach(vsp *vsphere.VSphere, nodeVolumes map[k8stype.N
detachPollTime = 10 * time.Second
)
if vsp == nil {
vsp, err = vsphere.GetVSphere()
vsp, err = getVSphere(c)
if err != nil {
return err
}
@ -110,7 +110,7 @@ func waitForVSphereDisksToDetach(vsp *vsphere.VSphere, nodeVolumes map[k8stype.N
}
// Wait until vsphere vmdk moves to expected state on the given node, or time out after 6 minutes
func waitForVSphereDiskStatus(vsp *vsphere.VSphere, volumePath string, nodeName types.NodeName, expectedState volumeState) error {
func waitForVSphereDiskStatus(c clientset.Interface, vsp *vsphere.VSphere, volumePath string, nodeName types.NodeName, expectedState volumeState) error {
var (
err error
diskAttached bool
@ -130,7 +130,7 @@ func waitForVSphereDiskStatus(vsp *vsphere.VSphere, volumePath string, nodeName
}
err = wait.Poll(pollTime, timeout, func() (bool, error) {
diskAttached, err = verifyVSphereDiskAttached(vsp, volumePath, nodeName)
diskAttached, err = verifyVSphereDiskAttached(c, vsp, volumePath, nodeName)
if err != nil {
return true, err
}
@ -154,13 +154,13 @@ func waitForVSphereDiskStatus(vsp *vsphere.VSphere, volumePath string, nodeName
}
// Wait until vsphere vmdk is attached from the given node or time out after 6 minutes
func waitForVSphereDiskToAttach(vsp *vsphere.VSphere, volumePath string, nodeName types.NodeName) error {
return waitForVSphereDiskStatus(vsp, volumePath, nodeName, volumeStateAttached)
func waitForVSphereDiskToAttach(c clientset.Interface, vsp *vsphere.VSphere, volumePath string, nodeName types.NodeName) error {
return waitForVSphereDiskStatus(c, vsp, volumePath, nodeName, volumeStateAttached)
}
// Wait until vsphere vmdk is detached from the given node or time out after 6 minutes
func waitForVSphereDiskToDetach(vsp *vsphere.VSphere, volumePath string, nodeName types.NodeName) error {
return waitForVSphereDiskStatus(vsp, volumePath, nodeName, volumeStateDetached)
func waitForVSphereDiskToDetach(c clientset.Interface, vsp *vsphere.VSphere, volumePath string, nodeName types.NodeName) error {
return waitForVSphereDiskStatus(c, vsp, volumePath, nodeName, volumeStateDetached)
}
// function to create vsphere volume spec with given VMDK volume path, Reclaim Policy and labels
@ -414,12 +414,12 @@ func createEmptyFilesOnVSphereVolume(namespace string, podName string, filePaths
}
// verify volumes are attached to the node and are accessible in pod
func verifyVSphereVolumesAccessible(pod *v1.Pod, persistentvolumes []*v1.PersistentVolume, vsp *vsphere.VSphere) {
func verifyVSphereVolumesAccessible(c clientset.Interface, pod *v1.Pod, persistentvolumes []*v1.PersistentVolume, vsp *vsphere.VSphere) {
nodeName := pod.Spec.NodeName
namespace := pod.Namespace
for index, pv := range persistentvolumes {
// Verify disks are attached to the node
isAttached, err := verifyVSphereDiskAttached(vsp, pv.Spec.VsphereVolume.VolumePath, k8stype.NodeName(nodeName))
isAttached, err := verifyVSphereDiskAttached(c, vsp, pv.Spec.VsphereVolume.VolumePath, k8stype.NodeName(nodeName))
Expect(err).NotTo(HaveOccurred())
Expect(isAttached).To(BeTrue(), fmt.Sprintf("disk %v is not attached with the node", pv.Spec.VsphereVolume.VolumePath))
// Verify Volumes are accessible
@ -437,3 +437,23 @@ func getvSphereVolumePathFromClaim(client clientset.Interface, namespace string,
Expect(err).NotTo(HaveOccurred())
return pv.Spec.VsphereVolume.VolumePath
}
func addNodesToVCP(vsp *vsphere.VSphere, c clientset.Interface) error {
nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
if err != nil {
return err
}
for _, node := range nodes.Items {
vsp.NodeAdded(&node)
}
return nil
}
func getVSphere(c clientset.Interface) (*vsphere.VSphere, error) {
vsp, err := vsphere.GetVSphere()
if err != nil {
return nil, err
}
addNodesToVCP(vsp, c)
return vsp, nil
}

View File

@ -25,7 +25,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib"
"k8s.io/kubernetes/test/e2e/framework"
)
@ -69,7 +68,7 @@ var _ = SIGDescribe("Volume Provisioning On Clustered Datastore [Feature:vsphere
It("verify static provisioning on clustered datastore", func() {
var volumePath string
vsp, err := vsphere.GetVSphere()
vsp, err := getVSphere(client)
Expect(err).NotTo(HaveOccurred())
By("creating a test vsphere volume")
@ -100,7 +99,7 @@ var _ = SIGDescribe("Volume Provisioning On Clustered Datastore [Feature:vsphere
nodeName := types.NodeName(pod.Spec.NodeName)
By("Verifying volume is attached")
isAttached, err := verifyVSphereDiskAttached(vsp, volumePath, nodeName)
isAttached, err := verifyVSphereDiskAttached(client, vsp, volumePath, nodeName)
Expect(err).NotTo(HaveOccurred())
Expect(isAttached).To(BeTrue(), fmt.Sprintf("disk: %s is not attached with the node: %v", volumePath, nodeName))
@ -109,7 +108,7 @@ var _ = SIGDescribe("Volume Provisioning On Clustered Datastore [Feature:vsphere
Expect(err).NotTo(HaveOccurred())
By("Waiting for volumes to be detached from the node")
err = waitForVSphereDiskToDetach(vsp, volumePath, nodeName)
err = waitForVSphereDiskToDetach(client, vsp, volumePath, nodeName)
Expect(err).NotTo(HaveOccurred())
})

View File

@ -68,7 +68,7 @@ var _ = SIGDescribe("Volume Provisioning on Datastore [Feature:vsphere]", func()
scParameters[DiskFormat] = ThinDisk
err := invokeInvalidDatastoreTestNeg(client, namespace, scParameters)
Expect(err).To(HaveOccurred())
errorMsg := `Failed to provision volume with StorageClass \"` + DatastoreSCName + `\": datastore '` + InvalidDatastore + `' not found`
errorMsg := `Failed to provision volume with StorageClass \"` + DatastoreSCName + `\": The specified datastore ` + InvalidDatastore + ` is not a shared datastore across node VMs`
if !strings.Contains(err.Error(), errorMsg) {
Expect(err).NotTo(HaveOccurred(), errorMsg)
}

View File

@ -145,9 +145,9 @@ func invokeTest(f *framework.Framework, client clientset.Interface, namespace st
pod, err := client.CoreV1().Pods(namespace).Create(podSpec)
Expect(err).NotTo(HaveOccurred())
vsp, err := vsphere.GetVSphere()
vsp, err := getVSphere(client)
Expect(err).NotTo(HaveOccurred())
verifyVSphereDiskAttached(vsp, pv.Spec.VsphereVolume.VolumePath, k8stype.NodeName(nodeName))
verifyVSphereDiskAttached(client, vsp, pv.Spec.VsphereVolume.VolumePath, k8stype.NodeName(nodeName))
By("Waiting for pod to be running")
Expect(framework.WaitForPodNameRunningInNamespace(client, pod.Name, namespace)).To(Succeed())

View File

@ -97,7 +97,7 @@ func invokeTestForFstype(f *framework.Framework, client clientset.Interface, nam
framework.Logf("Invoking Test for fstype: %s", fstype)
scParameters := make(map[string]string)
scParameters["fstype"] = fstype
vsp, err := vsphere.GetVSphere()
vsp, err := getVSphere(client)
Expect(err).NotTo(HaveOccurred())
// Create Persistent Volume
@ -117,7 +117,7 @@ func invokeTestForFstype(f *framework.Framework, client clientset.Interface, nam
func invokeTestForInvalidFstype(f *framework.Framework, client clientset.Interface, namespace string, fstype string) {
scParameters := make(map[string]string)
scParameters["fstype"] = fstype
vsp, err := vsphere.GetVSphere()
vsp, err := getVSphere(client)
Expect(err).NotTo(HaveOccurred())
// Create Persistent Volume
@ -170,12 +170,12 @@ func createPodAndVerifyVolumeAccessible(client clientset.Interface, namespace st
pvclaims = append(pvclaims, pvclaim)
By("Creating pod to attach PV to the node")
// Create pod to attach Volume to Node
pod, err := framework.CreatePod(client, namespace, nil, pvclaims, false, "")
pod, err := framework.CreatePod(client, namespace, nil, pvclaims, false, ExecCommand)
Expect(err).NotTo(HaveOccurred())
// Asserts: Right disk is attached to the pod
By("Verify the volume is accessible and available in the pod")
verifyVSphereVolumesAccessible(pod, persistentvolumes, vsp)
verifyVSphereVolumesAccessible(client, pod, persistentvolumes, vsp)
return pod
}
@ -184,7 +184,7 @@ func detachVolume(f *framework.Framework, client clientset.Interface, vsp *vsphe
framework.DeletePodWithWait(f, client, pod)
By("Waiting for volumes to be detached from the node")
waitForVSphereDiskToDetach(vsp, volPath, k8stype.NodeName(pod.Spec.NodeName))
waitForVSphereDiskToDetach(client, vsp, volPath, k8stype.NodeName(pod.Spec.NodeName))
}
func deleteVolume(client clientset.Interface, pvclaimName string, namespace string) {

View File

@ -27,7 +27,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere"
"k8s.io/kubernetes/test/e2e/framework"
)
@ -79,7 +78,7 @@ var _ = SIGDescribe("Volume Attach Verify [Feature:vsphere][Serial][Disruptive]"
})
It("verify volume remains attached after master kubelet restart", func() {
vsp, err := vsphere.GetVSphere()
vsp, err := getVSphere(client)
Expect(err).NotTo(HaveOccurred())
// Create pod on each node
@ -106,7 +105,7 @@ var _ = SIGDescribe("Volume Attach Verify [Feature:vsphere][Serial][Disruptive]"
nodeName := types.NodeName(pod.Spec.NodeName)
By(fmt.Sprintf("Verify volume %s is attached to the pod %v", volumePath, nodeName))
isAttached, err := verifyVSphereDiskAttached(vsp, volumePath, types.NodeName(nodeName))
isAttached, err := verifyVSphereDiskAttached(client, vsp, volumePath, types.NodeName(nodeName))
Expect(err).NotTo(HaveOccurred())
Expect(isAttached).To(BeTrue(), fmt.Sprintf("disk: %s is not attached with the node", volumePath))
@ -126,7 +125,7 @@ var _ = SIGDescribe("Volume Attach Verify [Feature:vsphere][Serial][Disruptive]"
nodeName := types.NodeName(pod.Spec.NodeName)
By(fmt.Sprintf("After master restart, verify volume %v is attached to the pod %v", volumePath, nodeName))
isAttached, err := verifyVSphereDiskAttached(vsp, volumePaths[i], types.NodeName(nodeName))
isAttached, err := verifyVSphereDiskAttached(client, vsp, volumePaths[i], types.NodeName(nodeName))
Expect(err).NotTo(HaveOccurred())
Expect(isAttached).To(BeTrue(), fmt.Sprintf("disk: %s is not attached with the node", volumePath))
@ -135,7 +134,7 @@ var _ = SIGDescribe("Volume Attach Verify [Feature:vsphere][Serial][Disruptive]"
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("Waiting for volume %s to be detached from the node %v", volumePath, nodeName))
err = waitForVSphereDiskToDetach(vsp, volumePath, types.NodeName(nodeName))
err = waitForVSphereDiskToDetach(client, vsp, volumePath, types.NodeName(nodeName))
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("Deleting volume %s", volumePath))

View File

@ -61,7 +61,7 @@ var _ = SIGDescribe("Node Poweroff [Feature:vsphere] [Slow] [Disruptive]", func(
nodeList := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
Expect(nodeList.Items).NotTo(BeEmpty(), "Unable to find ready and schedulable Node")
Expect(len(nodeList.Items) > 1).To(BeTrue(), "At least 2 nodes are required for this test")
vsp, err = vsphere.GetVSphere()
vsp, err = getVSphere(client)
Expect(err).NotTo(HaveOccurred())
workingDir = os.Getenv("VSPHERE_WORKING_DIR")
Expect(workingDir).NotTo(BeEmpty())
@ -112,7 +112,7 @@ var _ = SIGDescribe("Node Poweroff [Feature:vsphere] [Slow] [Disruptive]", func(
node1 := types.NodeName(pod.Spec.NodeName)
By(fmt.Sprintf("Verify disk is attached to the node: %v", node1))
isAttached, err := verifyVSphereDiskAttached(vsp, volumePath, node1)
isAttached, err := verifyVSphereDiskAttached(client, vsp, volumePath, node1)
Expect(err).NotTo(HaveOccurred())
Expect(isAttached).To(BeTrue(), "Disk is not attached to the node")
@ -139,11 +139,11 @@ var _ = SIGDescribe("Node Poweroff [Feature:vsphere] [Slow] [Disruptive]", func(
Expect(err).NotTo(HaveOccurred(), "Pod did not fail over to a different node")
By(fmt.Sprintf("Waiting for disk to be attached to the new node: %v", node2))
err = waitForVSphereDiskToAttach(vsp, volumePath, node2)
err = waitForVSphereDiskToAttach(client, vsp, volumePath, node2)
Expect(err).NotTo(HaveOccurred(), "Disk is not attached to the node")
By(fmt.Sprintf("Waiting for disk to be detached from the previous node: %v", node1))
err = waitForVSphereDiskToDetach(vsp, volumePath, node1)
err = waitForVSphereDiskToDetach(client, vsp, volumePath, node1)
Expect(err).NotTo(HaveOccurred(), "Disk is not detached from the node")
By(fmt.Sprintf("Power on the previous node: %v", node1))

View File

@ -75,7 +75,7 @@ var _ = SIGDescribe("Volume Operations Storm [Feature:vsphere]", func() {
volume_ops_scale = DEFAULT_VOLUME_OPS_SCALE
}
pvclaims = make([]*v1.PersistentVolumeClaim, volume_ops_scale)
vsp, err = vsphere.GetVSphere()
vsp, err = getVSphere(client)
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
@ -113,14 +113,14 @@ var _ = SIGDescribe("Volume Operations Storm [Feature:vsphere]", func() {
Expect(err).NotTo(HaveOccurred())
By("Verify all volumes are accessible and available in the pod")
verifyVSphereVolumesAccessible(pod, persistentvolumes, vsp)
verifyVSphereVolumesAccessible(client, pod, persistentvolumes, vsp)
By("Deleting pod")
framework.ExpectNoError(framework.DeletePodWithWait(f, client, pod))
By("Waiting for volumes to be detached from the node")
for _, pv := range persistentvolumes {
waitForVSphereDiskToDetach(vsp, pv.Spec.VsphereVolume.VolumePath, k8stype.NodeName(pod.Spec.NodeName))
waitForVSphereDiskToDetach(client, vsp, pv.Spec.VsphereVolume.VolumePath, k8stype.NodeName(pod.Spec.NodeName))
}
})
})

View File

@ -28,7 +28,6 @@ import (
storageV1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere"
"k8s.io/kubernetes/test/e2e/framework"
)
@ -214,11 +213,11 @@ func invokeVolumeLifeCyclePerformance(f *framework.Framework, client clientset.I
latency[AttachOp] = elapsed.Seconds()
// Verify access to the volumes
vsp, err := vsphere.GetVSphere()
vsp, err := getVSphere(client)
Expect(err).NotTo(HaveOccurred())
for i, pod := range totalpods {
verifyVSphereVolumesAccessible(pod, totalpvs[i], vsp)
verifyVSphereVolumesAccessible(client, pod, totalpvs[i], vsp)
}
By("Deleting pods")
@ -237,7 +236,7 @@ func invokeVolumeLifeCyclePerformance(f *framework.Framework, client clientset.I
}
}
err = waitForVSphereDisksToDetach(vsp, nodeVolumeMap)
err = waitForVSphereDisksToDetach(client, vsp, nodeVolumeMap)
Expect(err).NotTo(HaveOccurred())
By("Deleting the PVCs")

View File

@ -57,7 +57,7 @@ var _ = SIGDescribe("Volume Placement", func() {
isNodeLabeled = true
}
By("creating vmdk")
vsp, err = vsphere.GetVSphere()
vsp, err = getVSphere(c)
Expect(err).NotTo(HaveOccurred())
volumePath, err := createVSphereVolume(vsp, nil)
Expect(err).NotTo(HaveOccurred())
@ -285,7 +285,7 @@ var _ = SIGDescribe("Volume Placement", func() {
framework.ExpectNoError(framework.DeletePodWithWait(f, c, podB), "defer: Failed to delete pod ", podB.Name)
By(fmt.Sprintf("wait for volumes to be detached from the node: %v", node1Name))
for _, volumePath := range volumePaths {
framework.ExpectNoError(waitForVSphereDiskToDetach(vsp, volumePath, types.NodeName(node1Name)))
framework.ExpectNoError(waitForVSphereDiskToDetach(c, vsp, volumePath, types.NodeName(node1Name)))
}
}()
@ -362,7 +362,7 @@ func createPodWithVolumeAndNodeSelector(client clientset.Interface, namespace st
By(fmt.Sprintf("Verify volume is attached to the node:%v", nodeName))
for _, volumePath := range volumePaths {
isAttached, err := verifyVSphereDiskAttached(vsp, volumePath, types.NodeName(nodeName))
isAttached, err := verifyVSphereDiskAttached(client, vsp, volumePath, types.NodeName(nodeName))
Expect(err).NotTo(HaveOccurred())
Expect(isAttached).To(BeTrue(), "disk:"+volumePath+" is not attached with the node")
}
@ -385,6 +385,6 @@ func deletePodAndWaitForVolumeToDetach(f *framework.Framework, c clientset.Inter
By("Waiting for volume to be detached from the node")
for _, volumePath := range volumePaths {
framework.ExpectNoError(waitForVSphereDiskToDetach(vsp, volumePath, types.NodeName(nodeName)))
framework.ExpectNoError(waitForVSphereDiskToDetach(c, vsp, volumePath, types.NodeName(nodeName)))
}
}

View File

@ -295,16 +295,16 @@ func invokeValidPolicyTest(f *framework.Framework, client clientset.Interface, n
pod, err := framework.CreatePod(client, namespace, nil, pvclaims, false, "")
Expect(err).NotTo(HaveOccurred())
vsp, err := vsphere.GetVSphere()
vsp, err := getVSphere(client)
Expect(err).NotTo(HaveOccurred())
By("Verify the volume is accessible and available in the pod")
verifyVSphereVolumesAccessible(pod, persistentvolumes, vsp)
verifyVSphereVolumesAccessible(client, pod, persistentvolumes, vsp)
By("Deleting pod")
framework.DeletePodWithWait(f, client, pod)
By("Waiting for volumes to be detached from the node")
waitForVSphereDiskToDetach(vsp, persistentvolumes[0].Spec.VsphereVolume.VolumePath, k8stype.NodeName(pod.Spec.NodeName))
waitForVSphereDiskToDetach(client, vsp, persistentvolumes[0].Spec.VsphereVolume.VolumePath, k8stype.NodeName(pod.Spec.NodeName))
}
func invokeInvalidPolicyTestNeg(client clientset.Interface, namespace string, scParameters map[string]string) error {