Merge pull request #30535 from abrarshivani/vsphere_attach_detach_interface

Automatic merge from submit-queue

Implements Attacher Plugin Interface for vSphere

This PR does the following:

Fixes #29028 (vSphere volume should implement the attacher interface): implements the Attacher plugin interface for vSphere.
See files:
pkg/volume/vsphere_volume/vsphere_volume.go - Removed attach and detach calls from SetUpAt and TearDownAt.
pkg/volume/vsphere_volume/attacher.go - Implements the Attacher & Detacher plugin interfaces for vSphere. (Ref: GCE PD and AWS attacher.go.)
pkg/cloudprovider/providers/vsphere/vsphere.go - Added the DiskIsAttached method.

The vSphere plugin code needs cleanup (e.g., the code for getting the vSphere instance is repeated in pkg/cloudprovider/providers/vsphere/vsphere.go). I will fix this in a follow-up PR.
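For reviewers new to the attacher framework: below is a sketch of the volume.Attacher and volume.Detacher interfaces this PR implements, with signatures matching the methods in the attacher.go diff further down (the canonical definitions live in pkg/volume; treat this as an outline, not the authoritative source).

// Sketch of the interfaces implemented by vsphereVMDKAttacher / vsphereVMDKDetacher.
type Attacher interface {
	Attach(spec *volume.Spec, hostName string) (string, error)
	WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error)
	GetDeviceMountPath(spec *volume.Spec) (string, error)
	MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error
}

type Detacher interface {
	Detach(deviceMountPath string, hostName string) error
	WaitForDetach(devicePath string, timeout time.Duration) error
	UnmountDevice(deviceMountPath string) error
}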
Kubernetes Submit Queue 2016-08-23 05:13:12 -07:00 committed by GitHub
commit c5d56ea356
7 changed files with 792 additions and 247 deletions

View File

@@ -59,6 +59,7 @@ func ProbeAttachableVolumePlugins(config componentconfig.VolumeConfiguration) []
allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, cinder.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, flexvolume.ProbeVolumePlugins(config.FlexVolumePluginDir)...)
allPlugins = append(allPlugins, vsphere_volume.ProbeVolumePlugins()...)
return allPlugins
}
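For context, the vsphere_volume.ProbeVolumePlugins constructor registered above follows the standard plugin-probe pattern; a minimal sketch (the real function may initialize extra fields):

// ProbeVolumePlugins is the entry point the controller-manager uses to
// discover the vSphere volume plugin (sketch).
func ProbeVolumePlugins() []volume.VolumePlugin {
	return []volume.VolumePlugin{&vsphereVolumePlugin{}}
}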

View File

@@ -66,6 +66,7 @@ var supportedSCSIControllerType = []string{"lsilogic-sas", "pvscsi"}
var ErrNoDiskUUIDFound = errors.New("No disk UUID found")
var ErrNoDiskIDFound = errors.New("No vSphere disk ID found")
var ErrNoDevicesFound = errors.New("No devices found")
var ErrNonSupportedControllerType = errors.New("Disk is attached to non-supported controller type")
// VSphere is an implementation of cloud provider Interface for VSphere.
type VSphere struct {
@@ -104,6 +105,27 @@ type VSphereConfig struct {
}
}
type Volumes interface {
// AttachDisk attaches the given disk to the given node. The current node
// is used when nodeName is an empty string.
AttachDisk(vmDiskPath string, nodeName string) (diskID string, diskUUID string, err error)
// DetachDisk detaches the given disk from the given node. The current node
// is used when nodeName is an empty string.
// Assumption: If node doesn't exist, disk is already detached from node.
DetachDisk(volPath string, nodeName string) error
// DiskIsAttached checks if a disk is attached to the given node.
// Assumption: If node doesn't exist, disk is not attached to the node.
DiskIsAttached(volPath, nodeName string) (bool, error)
// CreateVolume creates a new vmdk with specified parameters.
CreateVolume(name string, size int, tags *map[string]string) (volumePath string, err error)
// DeleteVolume deletes vmdk.
DeleteVolume(vmDiskPath string) error
}
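A hypothetical caller-side sketch of this interface (the helper name is invented for illustration). Since AttachDisk is made idempotent in this PR (see the AttachDisk hunk below), callers can attach unconditionally and still get the diskUUID back when the disk was already attached:

// ensureAttached is a hypothetical helper showing the intended call pattern.
func ensureAttached(v Volumes, vmDiskPath, nodeName string) (string, error) {
	// Idempotent: returns the existing diskUUID if already attached.
	_, diskUUID, err := v.AttachDisk(vmDiskPath, nodeName)
	if err != nil {
		return "", err
	}
	return diskUUID, nil
}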
// Parses vSphere cloud config file and stores it into VSphereConfig.
func readConfig(config io.Reader) (VSphereConfig, error) {
if config == nil {
@@ -582,11 +604,18 @@ func (vs *VSphere) AttachDisk(vmDiskPath string, nodeName string) (diskID string
}
// Get VM device list
vm, vmDevices, ds, _, err := getVirtualMachineDevices(vs.cfg, ctx, c, vSphereInstance)
vm, vmDevices, ds, dc, err := getVirtualMachineDevices(vs.cfg, ctx, c, vSphereInstance)
if err != nil {
return "", "", err
}
attached, _ := checkDiskAttached(vmDiskPath, vmDevices, dc, c)
if attached {
diskID, _ = getVirtualDiskID(vmDiskPath, vmDevices, dc, c)
diskUUID, _ = getVirtualDiskUUIDByPath(vmDiskPath, dc, c)
return diskID, diskUUID, nil
}
var diskControllerType = vs.cfg.Disk.SCSIControllerType
// find SCSI controller of particular type from VM devices
allSCSIControllers := getSCSIControllers(vmDevices)
@@ -768,6 +797,107 @@ func getAvailableSCSIController(scsiControllers []*types.VirtualController) *typ
return nil
}
// DiskIsAttached returns whether the disk is attached to the VM via a controller type supported by the plugin.
func (vs *VSphere) DiskIsAttached(volPath string, nodeName string) (bool, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create vSphere client
c, err := vsphereLogin(vs.cfg, ctx)
if err != nil {
glog.Errorf("Failed to create vSphere client. err: %s", err)
return false, err
}
defer c.Logout(ctx)
// Find virtual machine to attach disk to
var vSphereInstance string
if nodeName == "" {
vSphereInstance = vs.localInstanceID
} else {
vSphereInstance = nodeName
}
nodeExist, err := vs.NodeExists(c, vSphereInstance)
if err != nil {
glog.Errorf("Failed to check whether node exist. err: %s.", err)
return false, err
}
if !nodeExist {
glog.Warningf(
"Node %q does not exist. DiskIsAttached will assume vmdk %q is not attached to it.",
vSphereInstance,
volPath)
return false, nil
}
// Get VM device list
_, vmDevices, _, dc, err := getVirtualMachineDevices(vs.cfg, ctx, c, vSphereInstance)
if err != nil {
glog.Errorf("Failed to get VM devices for VM %#q. err: %s", vSphereInstance, err)
return false, err
}
attached, err := checkDiskAttached(volPath, vmDevices, dc, c)
return attached, err
}
func checkDiskAttached(volPath string, vmdevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (bool, error) {
virtualDiskControllerKey, err := getVirtualDiskControllerKey(volPath, vmdevices, dc, client)
if err != nil {
if err == ErrNoDevicesFound {
return false, nil
}
glog.Errorf("Failed to check whether disk is attached. err: %s", err)
return false, err
}
for _, controllerType := range supportedSCSIControllerType {
controllerkey, _ := getControllerKey(controllerType, vmdevices, dc, client)
if controllerkey == virtualDiskControllerKey {
return true, nil
}
}
return false, ErrNonSupportedControllerType
}
// Returns the object key that denotes the controller object to which vmdk is attached.
func getVirtualDiskControllerKey(volPath string, vmDevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (int32, error) {
volumeUUID, err := getVirtualDiskUUIDByPath(volPath, dc, client)
if err != nil {
glog.Errorf("disk uuid not found for %v. err: %s", volPath, err)
return -1, err
}
// filter vm devices to retrieve disk ID for the given vmdk file
for _, device := range vmDevices {
if vmDevices.TypeName(device) == "VirtualDisk" {
diskUUID, _ := getVirtualDiskUUID(device)
if diskUUID == volumeUUID {
return device.GetVirtualDevice().ControllerKey, nil
}
}
}
return -1, ErrNoDevicesFound
}
// Returns the key of the controller.
// The key is a unique ID that distinguishes a device from other devices in the same virtual machine.
func getControllerKey(scsiType string, vmDevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (int32, error) {
for _, device := range vmDevices {
devType := vmDevices.Type(device)
if devType == scsiType {
if c, ok := device.(types.BaseVirtualController); ok {
return c.GetVirtualController().Key, nil
}
}
}
return -1, ErrNoDevicesFound
}
// Returns formatted UUID for a virtual disk device.
func getVirtualDiskUUID(newDevice types.BaseVirtualDevice) (string, error) {
vd := newDevice.GetVirtualDevice()
@@ -859,6 +989,21 @@ func (vs *VSphere) DetachDisk(volPath string, nodeName string) error {
vSphereInstance = nodeName
}
nodeExist, err := vs.NodeExists(c, vSphereInstance)
if err != nil {
glog.Errorf("Failed to check whether node exist. err: %s.", err)
return err
}
if !nodeExist {
glog.Warningf(
"Node %q does not exist. DetachDisk will assume vmdk %q is not attached to it.",
vSphereInstance,
volPath)
return nil
}
vm, vmDevices, _, dc, err := getVirtualMachineDevices(vs.cfg, ctx, c, vSphereInstance)
if err != nil {
return err
@@ -962,3 +1107,43 @@ func (vs *VSphere) DeleteVolume(vmDiskPath string) error {
return task.Wait(ctx)
}
// NodeExists checks whether the node with the given nodeName exists.
// Returns false if the VM doesn't exist or is powered off.
func (vs *VSphere) NodeExists(c *govmomi.Client, nodeName string) (bool, error) {
if nodeName == "" {
return false, nil
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vm, err := getVirtualMachineByName(vs.cfg, ctx, c, nodeName)
if err != nil {
if _, ok := err.(*find.NotFoundError); ok {
return false, nil
}
glog.Errorf("Failed to get virtual machine object for node %+q. err %s", nodeName, err)
return false, err
}
var mvm mo.VirtualMachine
err = getVirtualMachineManagedObjectReference(ctx, c, vm, "summary", &mvm)
if err != nil {
glog.Errorf("Failed to get virtual machine object reference for node %+q. err %s", nodeName, err)
return false, err
}
if mvm.Summary.Runtime.PowerState == ActivePowerState {
return true, nil
}
if !mvm.Summary.Config.Template {
glog.Warningf("VM %s is not in %s state", nodeName, ActivePowerState)
} else {
glog.Warningf("VM %s is a template", nodeName)
}
return false, nil
}

View File

@@ -0,0 +1,243 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vsphere_volume
import (
"fmt"
"os"
"path"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere"
"k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/keymutex"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
type vsphereVMDKAttacher struct {
host volume.VolumeHost
vsphereVolumes vsphere.Volumes
}
var _ volume.Attacher = &vsphereVMDKAttacher{}
var _ volume.AttachableVolumePlugin = &vsphereVolumePlugin{}
// Singleton key mutex for keeping attach operations for the same host atomic
var attachdetachMutex = keymutex.NewKeyMutex()
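keymutex serializes critical sections per key rather than globally, so attaches to different hosts can proceed in parallel while operations on the same host stay ordered. A minimal usage sketch (the wrapper function is hypothetical, shown only to illustrate the lock pattern used below):

// doSerialized runs op while holding the per-host key mutex.
func doSerialized(hostName string, op func() error) error {
	attachdetachMutex.LockKey(hostName)
	defer attachdetachMutex.UnlockKey(hostName)
	return op() // only one attach/detach runs per host at a time
}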
func (plugin *vsphereVolumePlugin) NewAttacher() (volume.Attacher, error) {
vsphereCloud, err := getCloudProvider(plugin.host.GetCloudProvider())
if err != nil {
return nil, err
}
return &vsphereVMDKAttacher{
host: plugin.host,
vsphereVolumes: vsphereCloud,
}, nil
}
// Attaches the volume specified by the given spec to the given host.
// On success, returns the device path where the device was attached on the
// node.
// Callers are responsible for retrying on failure.
// Callers are responsible for thread safety between concurrent attach and
// detach operations.
func (attacher *vsphereVMDKAttacher) Attach(spec *volume.Spec, hostName string) (string, error) {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
return "", err
}
glog.V(4).Infof("vSphere: Attach disk called for host %s", hostName)
// Keeps concurrent attach operations to the same host atomic
attachdetachMutex.LockKey(hostName)
defer attachdetachMutex.UnlockKey(hostName)
// vsphereCloud.AttachDisk checks if disk is already attached to host and
// succeeds in that case, so no need to do that separately.
_, diskUUID, err := attacher.vsphereVolumes.AttachDisk(volumeSource.VolumePath, hostName)
if err != nil {
glog.Errorf("Error attaching volume %q: %+v", volumeSource.VolumePath, err)
return "", err
}
return path.Join(diskByIDPath, diskSCSIPrefix+diskUUID), nil
}
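The returned device path is assembled from the by-id constants defined in vsphere_util.go below (diskByIDPath = "/dev/disk/by-id/", diskSCSIPrefix = "wwn-0x"); for a hypothetical diskUUID:

devicePath := path.Join(diskByIDPath, diskSCSIPrefix+"6000c2989e7a3e2d")
// devicePath == "/dev/disk/by-id/wwn-0x6000c2989e7a3e2d"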
func (attacher *vsphereVMDKAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
return "", err
}
if devicePath == "" {
return "", fmt.Errorf("WaitForAttach failed for VMDK %q: devicePath is empty.", volumeSource.VolumePath)
}
ticker := time.NewTicker(checkSleepDuration)
defer ticker.Stop()
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-ticker.C:
glog.V(5).Infof("Checking VMDK %q is attached", volumeSource.VolumePath)
path, err := verifyDevicePath(devicePath)
if err != nil {
// Log error, if any, and continue checking periodically. See issue #11321
glog.Warningf("Error verifying VMDK (%q) is attached: %v", volumeSource.VolumePath, err)
} else if path != "" {
// A device path has successfully been created for the VMDK
glog.Infof("Successfully found attached VMDK %q.", volumeSource.VolumePath)
return path, nil
}
case <-timer.C:
return "", fmt.Errorf("Could not find attached VMDK %q. Timeout waiting for mount paths to be created.", volumeSource.VolumePath)
}
}
}
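A hypothetical sketch of how a caller chains the two steps (the function name and the 10-minute timeout are illustrative, not part of this PR):

func attachAndWait(a volume.Attacher, spec *volume.Spec, hostName string) (string, error) {
	devicePath, err := a.Attach(spec, hostName)
	if err != nil {
		return "", err
	}
	// Poll until the /dev/disk/by-id symlink shows up, or time out.
	return a.WaitForAttach(spec, devicePath, 10*time.Minute)
}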
// GetDeviceMountPath returns the global mount path for the device;
// individual volumes are bind mounted from this path.
func (attacher *vsphereVMDKAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
return "", err
}
return makeGlobalPDPath(attacher.host, volumeSource.VolumePath), nil
}
// GetDeviceMountRefs finds all other references to the device referenced
// by deviceMountPath; returns a list of paths.
func (plugin *vsphereVolumePlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
mounter := plugin.host.GetMounter()
return mount.GetMountRefs(mounter, deviceMountPath)
}
// MountDevice mounts device to global mount point.
func (attacher *vsphereVMDKAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
mounter := attacher.host.GetMounter()
notMnt, err := mounter.IsLikelyNotMountPoint(deviceMountPath)
if err != nil {
if os.IsNotExist(err) {
if err := os.MkdirAll(deviceMountPath, 0750); err != nil {
glog.Errorf("Failed to create directory at %#v. err: %s", deviceMountPath, err)
return err
}
notMnt = true
} else {
return err
}
}
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
return err
}
options := []string{}
if notMnt {
diskMounter := &mount.SafeFormatAndMount{Interface: mounter, Runner: exec.New()}
err = diskMounter.FormatAndMount(devicePath, deviceMountPath, volumeSource.FSType, options)
if err != nil {
os.Remove(deviceMountPath)
return err
}
glog.V(4).Infof("formatting spec %v devicePath %v deviceMountPath %v fs %v with options %+v", spec.Name(), devicePath, deviceMountPath, volumeSource.FSType, options)
}
return nil
}
type vsphereVMDKDetacher struct {
mounter mount.Interface
vsphereVolumes vsphere.Volumes
}
var _ volume.Detacher = &vsphereVMDKDetacher{}
func (plugin *vsphereVolumePlugin) NewDetacher() (volume.Detacher, error) {
vsphereCloud, err := getCloudProvider(plugin.host.GetCloudProvider())
if err != nil {
return nil, err
}
return &vsphereVMDKDetacher{
mounter: plugin.host.GetMounter(),
vsphereVolumes: vsphereCloud,
}, nil
}
// Detach the given device from the given host.
func (detacher *vsphereVMDKDetacher) Detach(deviceMountPath string, hostName string) error {
volPath := getVolPathfromDeviceMountPath(deviceMountPath)
attached, err := detacher.vsphereVolumes.DiskIsAttached(volPath, hostName)
if err != nil {
// Log error and continue with detach
glog.Errorf(
"Error checking if volume (%q) is already attached to current node (%q). Will continue and try detach anyway. err=%v",
volPath, hostName, err)
}
if err == nil && !attached {
// Volume is already detached from node.
glog.Infof("detach operation was successful. volume %q is already detached from node %q.", volPath, hostName)
return nil
}
attachdetachMutex.LockKey(hostName)
defer attachdetachMutex.UnlockKey(hostName)
if err := detacher.vsphereVolumes.DetachDisk(volPath, hostName); err != nil {
glog.Errorf("Error detaching volume %q: %v", volPath, err)
return err
}
return nil
}
func (detacher *vsphereVMDKDetacher) WaitForDetach(devicePath string, timeout time.Duration) error {
ticker := time.NewTicker(checkSleepDuration)
defer ticker.Stop()
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-ticker.C:
glog.V(5).Infof("Checking device %q is detached.", devicePath)
if pathExists, err := volumeutil.PathExists(devicePath); err != nil {
return fmt.Errorf("Error checking if device path exists: %v", err)
} else if !pathExists {
return nil
}
case <-timer.C:
return fmt.Errorf("Timeout reached; Device %v is still attached", devicePath)
}
}
}
func (detacher *vsphereVMDKDetacher) UnmountDevice(deviceMountPath string) error {
return volumeutil.UnmountPath(deviceMountPath, detacher.mounter)
}

View File

@@ -0,0 +1,314 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vsphere_volume
import (
"errors"
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"github.com/golang/glog"
)
func TestGetDeviceName_Volume(t *testing.T) {
plugin := newPlugin()
volPath := "[local] volumes/test"
spec := createVolSpec(volPath)
deviceName, err := plugin.GetVolumeName(spec)
if err != nil {
t.Errorf("GetDeviceName error: %v", err)
}
if deviceName != volPath {
t.Errorf("GetDeviceName error: expected %s, got %s", volPath, deviceName)
}
}
func TestGetDeviceName_PersistentVolume(t *testing.T) {
plugin := newPlugin()
volPath := "[local] volumes/test"
spec := createPVSpec(volPath)
deviceName, err := plugin.GetVolumeName(spec)
if err != nil {
t.Errorf("GetDeviceName error: %v", err)
}
if deviceName != volPath {
t.Errorf("GetDeviceName error: expected %s, got %s", volPath, deviceName)
}
}
// One testcase for TestAttachDetach table test below
type testcase struct {
name string
// For fake vSphere:
attach attachCall
detach detachCall
diskIsAttached diskIsAttachedCall
t *testing.T
// Actual test to run
test func(test *testcase) (string, error)
// Expected return of the test
expectedDevice string
expectedError error
}
func TestAttachDetach(t *testing.T) {
uuid := "00000000000000"
diskName := "[local] volumes/test"
hostName := "host"
spec := createVolSpec(diskName)
attachError := errors.New("Fake attach error")
detachError := errors.New("Fake detach error")
diskCheckError := errors.New("Fake DiskIsAttached error")
tests := []testcase{
// Successful Attach call
{
name: "Attach_Positive",
attach: attachCall{diskName, hostName, uuid, nil},
test: func(testcase *testcase) (string, error) {
attacher := newAttacher(testcase)
return attacher.Attach(spec, hostName)
},
expectedDevice: "/dev/disk/by-id/wwn-0x" + uuid,
},
// Attach call fails
{
name: "Attach_Negative",
attach: attachCall{diskName, hostName, "", attachError},
test: func(testcase *testcase) (string, error) {
attacher := newAttacher(testcase)
return attacher.Attach(spec, hostName)
},
expectedError: attachError,
},
// Detach succeeds
{
name: "Detach_Positive",
diskIsAttached: diskIsAttachedCall{diskName, hostName, true, nil},
detach: detachCall{diskName, hostName, nil},
test: func(testcase *testcase) (string, error) {
detacher := newDetacher(testcase)
return "", detacher.Detach(diskName, hostName)
},
},
// Disk is already detached
{
name: "Detach_Positive_AlreadyDetached",
diskIsAttached: diskIsAttachedCall{diskName, hostName, false, nil},
test: func(testcase *testcase) (string, error) {
detacher := newDetacher(testcase)
return "", detacher.Detach(diskName, hostName)
},
},
// Detach succeeds when DiskIsAttached fails
{
name: "Detach_Positive_CheckFails",
diskIsAttached: diskIsAttachedCall{diskName, hostName, false, diskCheckError},
detach: detachCall{diskName, hostName, nil},
test: func(testcase *testcase) (string, error) {
detacher := newDetacher(testcase)
return "", detacher.Detach(diskName, hostName)
},
},
// Detach fails
{
name: "Detach_Negative",
diskIsAttached: diskIsAttachedCall{diskName, hostName, false, diskCheckError},
detach: detachCall{diskName, hostName, detachError},
test: func(testcase *testcase) (string, error) {
detacher := newDetacher(testcase)
return "", detacher.Detach(diskName, hostName)
},
expectedError: detachError,
},
}
for _, testcase := range tests {
testcase.t = t
device, err := testcase.test(&testcase)
if err != testcase.expectedError {
// Format with %v; calling .Error() would panic when either error is nil.
t.Errorf("%s failed: expected err=%v, got %v", testcase.name, testcase.expectedError, err)
}
if device != testcase.expectedDevice {
t.Errorf("%s failed: expected device=%q, got %q", testcase.name, testcase.expectedDevice, device)
}
t.Logf("Test %q succeeded", testcase.name)
}
}
// newPlugin creates a new vsphereVolumePlugin with a fake volume host.
// NewAttacher and NewDetacher won't work, since the fake host has no cloud provider.
func newPlugin() *vsphereVolumePlugin {
host := volumetest.NewFakeVolumeHost("/tmp", nil, nil, "")
plugins := ProbeVolumePlugins()
plugin := plugins[0]
plugin.Init(host)
return plugin.(*vsphereVolumePlugin)
}
func newAttacher(testcase *testcase) *vsphereVMDKAttacher {
return &vsphereVMDKAttacher{
host: nil,
vsphereVolumes: testcase,
}
}
func newDetacher(testcase *testcase) *vsphereVMDKDetacher {
return &vsphereVMDKDetacher{
vsphereVolumes: testcase,
}
}
func createVolSpec(name string) *volume.Spec {
return &volume.Spec{
Volume: &api.Volume{
VolumeSource: api.VolumeSource{
VsphereVolume: &api.VsphereVirtualDiskVolumeSource{
VolumePath: name,
},
},
},
}
}
func createPVSpec(name string) *volume.Spec {
return &volume.Spec{
PersistentVolume: &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
PersistentVolumeSource: api.PersistentVolumeSource{
VsphereVolume: &api.VsphereVirtualDiskVolumeSource{
VolumePath: name,
},
},
},
},
}
}
// Fake vSphere implementation
type attachCall struct {
diskName string
hostName string
retDeviceUUID string
ret error
}
type detachCall struct {
diskName string
hostName string
ret error
}
type diskIsAttachedCall struct {
diskName, hostName string
isAttached bool
ret error
}
func (testcase *testcase) AttachDisk(diskName string, hostName string) (string, string, error) {
expected := &testcase.attach
if expected.diskName == "" && expected.hostName == "" {
// testcase.attach looks uninitialized, test did not expect to call
// AttachDisk
testcase.t.Errorf("Unexpected AttachDisk call!")
return "", "", errors.New("Unexpected AttachDisk call!")
}
if expected.diskName != diskName {
testcase.t.Errorf("Unexpected AttachDisk call: expected diskName %s, got %s", expected.diskName, diskName)
return "", "", errors.New("Unexpected AttachDisk call: wrong diskName")
}
if expected.hostName != hostName {
testcase.t.Errorf("Unexpected AttachDisk call: expected hostName %s, got %s", expected.hostName, hostName)
return "", "", errors.New("Unexpected AttachDisk call: wrong hostName")
}
glog.V(4).Infof("AttachDisk call: %s, %s, returning %q, %v", diskName, hostName, expected.retDeviceUUID, expected.ret)
return "", expected.retDeviceUUID, expected.ret
}
func (testcase *testcase) DetachDisk(diskName string, hostName string) error {
expected := &testcase.detach
if expected.diskName == "" && expected.hostName == "" {
// testcase.detach looks uninitialized, test did not expect to call
// DetachDisk
testcase.t.Errorf("Unexpected DetachDisk call!")
return errors.New("Unexpected DetachDisk call!")
}
if expected.diskName != diskName {
testcase.t.Errorf("Unexpected DetachDisk call: expected diskName %s, got %s", expected.diskName, diskName)
return errors.New("Unexpected DetachDisk call: wrong diskName")
}
if expected.hostName != hostName {
testcase.t.Errorf("Unexpected DetachDisk call: expected hostname %s, got %s", expected.hostName, hostName)
return errors.New("Unexpected DetachDisk call: wrong hostname")
}
glog.V(4).Infof("DetachDisk call: %s, %s, returning %v", diskName, hostName, expected.ret)
return expected.ret
}
func (testcase *testcase) DiskIsAttached(diskName, hostName string) (bool, error) {
expected := &testcase.diskIsAttached
if expected.diskName == "" && expected.hostName == "" {
// testcase.diskIsAttached looks uninitialized, test did not expect to
// call DiskIsAttached
testcase.t.Errorf("Unexpected DiskIsAttached call!")
return false, errors.New("Unexpected DiskIsAttached call!")
}
if expected.diskName != diskName {
testcase.t.Errorf("Unexpected DiskIsAttached call: expected diskName %s, got %s", expected.diskName, diskName)
return false, errors.New("Unexpected DiskIsAttached call: wrong diskName")
}
if expected.hostName != hostName {
testcase.t.Errorf("Unexpected DiskIsAttached call: expected hostName %s, got %s", expected.hostName, hostName)
return false, errors.New("Unexpected DiskIsAttached call: wrong hostName")
}
glog.V(4).Infof("DiskIsAttached call: %s, %s, returning %v, %v", diskName, hostName, expected.isAttached, expected.ret)
return expected.isAttached, expected.ret
}
func (testcase *testcase) CreateVolume(name string, size int, tags *map[string]string) (volumePath string, err error) {
return "", errors.New("Not implemented")
}
func (testcase *testcase) DeleteVolume(vmDiskPath string) error {
return errors.New("Not implemented")
}

View File

@@ -17,16 +17,13 @@ limitations under the License.
package vsphere_volume
import (
"errors"
"fmt"
"os"
"path"
"strings"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/mount"
@@ -121,20 +118,6 @@ func (plugin *vsphereVolumePlugin) newUnmounterInternal(volName string, podUID t
}}, nil
}
func (plugin *vsphereVolumePlugin) getCloudProvider() (*vsphere.VSphere, error) {
cloud := plugin.host.GetCloudProvider()
if cloud == nil {
glog.Errorf("Cloud provider not initialized properly")
return nil, errors.New("Cloud provider not initialized properly")
}
vs := cloud.(*vsphere.VSphere)
if vs == nil {
return nil, errors.New("Invalid cloud provider: expected vSphere")
}
return vs, nil
}
func (plugin *vsphereVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
vsphereVolume := &api.Volume{
Name: volumeName,
@@ -149,10 +132,6 @@ func (plugin *vsphereVolumePlugin) ConstructVolumeSpec(volumeName, mountPath str
// Abstract interface to disk operations.
type vdManager interface {
// Attaches the disk to the kubelet's host machine.
AttachDisk(mounter *vsphereVolumeMounter, globalPDPath string) error
// Detaches the disk from the kubelet's host machine.
DetachDisk(unmounter *vsphereVolumeUnmounter) error
// Creates a volume
CreateVolume(provisioner *vsphereVolumeProvisioner) (vmDiskPath string, volumeSizeGB int, err error)
// Deletes a volume
@@ -179,13 +158,6 @@ type vsphereVolume struct {
volume.MetricsNil
}
func detachDiskLogError(vv *vsphereVolume) {
err := vv.manager.DetachDisk(&vsphereVolumeUnmounter{vv})
if err != nil {
glog.Warningf("Failed to detach disk: %v (%v)", vv, err)
}
}
var _ volume.Mounter = &vsphereVolumeMounter{}
type vsphereVolumeMounter struct {
@@ -219,23 +191,16 @@ func (b *vsphereVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
glog.V(4).Infof("Something is already mounted to target %s", dir)
return nil
}
globalPDPath := makeGlobalPDPath(b.plugin.host, b.volPath)
if err := b.manager.AttachDisk(b, globalPDPath); err != nil {
glog.V(3).Infof("AttachDisk failed: %v", err)
if err := os.MkdirAll(dir, 0750); err != nil {
glog.V(4).Infof("Could not create directory %s: %v", dir, err)
return err
}
glog.V(3).Infof("vSphere volume %s attached", b.volPath)
options := []string{"bind"}
if err := os.MkdirAll(dir, 0750); err != nil {
// TODO: we should really eject the attach/detach out into its own control loop.
glog.V(4).Infof("Could not create directory %s: %v", dir, err)
detachDiskLogError(b.vsphereVolume)
return err
}
// Perform a bind mount to the full path to allow duplicate mounts of the same PD.
globalPDPath := makeGlobalPDPath(b.plugin.host, b.volPath)
err = b.mounter.Mount(globalPDPath, dir, "", options)
if err != nil {
notmnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
@@ -259,7 +224,6 @@ func (b *vsphereVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
}
}
os.Remove(dir)
detachDiskLogError(b.vsphereVolume)
return err
}
glog.V(3).Infof("vSphere volume %s mounted to %s", b.volPath, dir)
@@ -283,69 +247,25 @@ func (v *vsphereVolumeUnmounter) TearDown() error {
// resource was the last reference to that disk on the kubelet.
func (v *vsphereVolumeUnmounter) TearDownAt(dir string) error {
glog.V(5).Infof("vSphere Volume TearDown of %s", dir)
notmnt, err := v.mounter.IsLikelyNotMountPoint(dir)
notMnt, err := v.mounter.IsLikelyNotMountPoint(dir)
if err != nil {
glog.V(4).Infof("Error checking if mountpoint ", dir, ": ", err)
return err
}
if notmnt {
glog.V(4).Infof("Not mount point,deleting")
if notMnt {
return os.Remove(dir)
}
// Find vSphere volumeID to lock the right volume
refs, err := mount.GetMountRefs(v.mounter, dir)
if err != nil {
glog.V(4).Infof("Error getting mountrefs for ", dir, ": ", err)
return err
}
if len(refs) == 0 {
glog.V(4).Infof("Directory %s is not mounted", dir)
return fmt.Errorf("directory %s is not mounted", dir)
}
mountPath := refs[0]
// Assumption: No file or folder is named starting with '[' in datastore
volumePath := mountPath[strings.LastIndex(mountPath, "["):]
// space between datastore and vmdk name in volumePath is encoded as '\040' when returned by GetMountRefs().
// volumePath eg: "[local] xxx.vmdk" provided to attach/mount
// replacing \040 with space to match the actual volumePath
v.volPath = strings.Replace(volumePath, "\\040", " ", -1)
glog.V(4).Infof("Found volume %s mounted to %s", v.volPath, dir)
// Reload list of references, there might be SetUpAt finished in the meantime
refs, err = mount.GetMountRefs(v.mounter, dir)
if err != nil {
glog.V(4).Infof("GetMountRefs failed: %v", err)
return err
}
if err := v.mounter.Unmount(dir); err != nil {
glog.V(4).Infof("Unmount failed: %v", err)
return err
}
glog.V(3).Infof("Successfully unmounted: %s\n", dir)
// If refCount is 1, then all bind mounts have been removed, and the
// remaining reference is the global mount. It is safe to detach.
if len(refs) == 1 {
if err := v.manager.DetachDisk(v); err != nil {
glog.V(4).Infof("DetachDisk failed: %v", err)
return err
}
glog.V(3).Infof("Volume %s detached", v.volPath)
}
notmnt, mntErr := v.mounter.IsLikelyNotMountPoint(dir)
notMnt, mntErr := v.mounter.IsLikelyNotMountPoint(dir)
if mntErr != nil {
glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
return err
}
if notmnt {
if err := os.Remove(dir); err != nil {
glog.V(4).Infof("Failed to remove directory after unmount: %v", err)
return err
}
if notMnt {
return os.Remove(dir)
}
return nil
return fmt.Errorf("Failed to unmount volume dir")
}
func makeGlobalPDPath(host volume.VolumeHost, devName string) string {

View File

@@ -58,46 +58,12 @@ func TestCanSupport(t *testing.T) {
}
type fakePDManager struct {
attachCalled bool
detachCalled bool
}
func getFakeDeviceName(host volume.VolumeHost, volPath string) string {
return path.Join(host.GetPluginDir(vsphereVolumePluginName), "device", volPath)
}
func (fake *fakePDManager) AttachDisk(b *vsphereVolumeMounter, globalPDPath string) error {
fakeDeviceName := getFakeDeviceName(b.plugin.host, b.volPath)
err := os.MkdirAll(fakeDeviceName, 0750)
if err != nil {
return err
}
fake.attachCalled = true
// Simulate the global mount so that the fakeMounter returns the
// expected number of mounts for the attached disk.
err = b.mounter.Mount(fakeDeviceName, globalPDPath, "", []string{"bind"})
if err != nil {
return err
}
return nil
}
func (fake *fakePDManager) DetachDisk(v *vsphereVolumeUnmounter) error {
globalPath := makeGlobalPDPath(v.plugin.host, v.volPath)
fakeDeviceName := getFakeDeviceName(v.plugin.host, v.volPath)
err := v.mounter.Unmount(globalPath)
if err != nil {
return err
}
// "Detach" the fake "device"
err = os.RemoveAll(fakeDeviceName)
if err != nil {
return err
}
fake.detachCalled = true
return nil
}
func (fake *fakePDManager) CreateVolume(v *vsphereVolumeProvisioner) (vmDiskPath string, volumeSizeKB int, err error) {
return "[local] test-volume-name.vmdk", 100, nil
}
@@ -156,10 +122,6 @@ func TestPlugin(t *testing.T) {
t.Errorf("Expected success, got: %v", err)
}
if !fakeManager.attachCalled {
t.Errorf("Attach watch not called")
}
// Test Unmounter
fakeManager = &fakePDManager{}
unmounter, err := plug.(*vsphereVolumePlugin).newUnmounterInternal("vol1", types.UID("poduid"), fakeManager, fakeMounter)
@@ -178,9 +140,6 @@ func TestPlugin(t *testing.T) {
} else if !os.IsNotExist(err) {
t.Errorf("SetUp() failed: %v", err)
}
if !fakeManager.detachCalled {
t.Errorf("Detach watch not called")
}
// Test Provisioner
cap := resource.MustParse("100Mi")

View File

@@ -18,140 +18,41 @@ package vsphere_volume
import (
"errors"
"io/ioutil"
"os"
"path"
"fmt"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util/keymutex"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
const (
maxRetries = 10
maxRetries = 10
checkSleepDuration = time.Second
diskByIDPath = "/dev/disk/by-id/"
diskSCSIPrefix = "wwn-0x"
)
var ErrProbeVolume = errors.New("Error scanning attached volumes")
// Singleton key mutex for keeping attach/detach operations for the same PD atomic
var attachDetachMutex = keymutex.NewKeyMutex()
type VsphereDiskUtil struct{}
// Attaches a disk to the current kubelet.
// Mounts the disk to its global path.
func (util *VsphereDiskUtil) AttachDisk(vm *vsphereVolumeMounter, globalPDPath string) error {
options := []string{}
// Block execution until any pending attach/detach operations for this PD have completed
attachDetachMutex.LockKey(vm.volPath)
defer attachDetachMutex.UnlockKey(vm.volPath)
cloud, err := vm.plugin.getCloudProvider()
if err != nil {
return err
func verifyDevicePath(path string) (string, error) {
if pathExists, err := volumeutil.PathExists(path); err != nil {
return "", fmt.Errorf("Error checking if path exists: %v", err)
} else if pathExists {
return path, nil
}
diskID, diskUUID, attachError := cloud.AttachDisk(vm.volPath, "")
if attachError != nil {
return attachError
} else if diskUUID == "" {
return errors.New("Disk UUID has no value")
}
// diskID for detach Disk
vm.diskID = diskID
var devicePath string
numTries := 0
for {
devicePath = verifyDevicePath(diskUUID)
_, err := os.Stat(devicePath)
if err == nil {
break
}
if err != nil && !os.IsNotExist(err) {
return err
}
numTries++
if numTries == maxRetries {
return errors.New("Could not attach disk: Timeout after 60s")
}
time.Sleep(time.Second * 60)
}
notMnt, err := vm.mounter.IsLikelyNotMountPoint(globalPDPath)
if err != nil {
if os.IsNotExist(err) {
if err := os.MkdirAll(globalPDPath, 0750); err != nil {
return err
}
notMnt = true
} else {
return err
}
}
if notMnt {
err = vm.diskMounter.FormatAndMount(devicePath, globalPDPath, vm.fsType, options)
if err != nil {
os.Remove(globalPDPath)
return err
}
glog.V(2).Infof("Safe mount successful: %q\n", devicePath)
}
return nil
}
func verifyDevicePath(diskUUID string) string {
files, _ := ioutil.ReadDir("/dev/disk/by-id/")
for _, f := range files {
// TODO: should support other controllers
if strings.Contains(f.Name(), "scsi-") {
devID := f.Name()[len("scsi-"):len(f.Name())]
if strings.Contains(devID, diskUUID) {
glog.V(4).Infof("Found disk attached as %q; full devicepath: %s\n", f.Name(), path.Join("/dev/disk/by-id/", f.Name()))
return path.Join("/dev/disk/by-id/", f.Name())
}
}
}
glog.Warningf("Failed to find device for the diskid: %q\n", diskUUID)
return ""
}
// Unmounts the device and detaches the disk from the kubelet's host machine.
func (util *VsphereDiskUtil) DetachDisk(vu *vsphereVolumeUnmounter) error {
// Block execution until any pending attach/detach operations for this PD have completed
attachDetachMutex.LockKey(vu.volPath)
defer attachDetachMutex.UnlockKey(vu.volPath)
globalPDPath := makeGlobalPDPath(vu.plugin.host, vu.volPath)
if err := vu.mounter.Unmount(globalPDPath); err != nil {
return err
}
if err := os.Remove(globalPDPath); err != nil {
return err
}
glog.V(2).Infof("Successfully unmounted main device: %s\n", globalPDPath)
cloud, err := vu.plugin.getCloudProvider()
if err != nil {
return err
}
if err = cloud.DetachDisk(vu.volPath, ""); err != nil {
return err
}
glog.V(2).Infof("Successfully detached vSphere volume %s", vu.volPath)
return nil
return "", nil
}
// CreateVolume creates a vSphere volume.
func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner) (vmDiskPath string, volumeSizeKB int, err error) {
cloud, err := v.plugin.getCloudProvider()
cloud, err := getCloudProvider(v.plugin.host.GetCloudProvider())
if err != nil {
return "", 0, err
}
@@ -171,7 +72,7 @@ func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner) (vmDiskPa
// DeleteVolume deletes a vSphere volume.
func (util *VsphereDiskUtil) DeleteVolume(vd *vsphereVolumeDeleter) error {
cloud, err := vd.plugin.getCloudProvider()
cloud, err := getCloudProvider(vd.plugin.host.GetCloudProvider())
if err != nil {
return err
}
@@ -183,3 +84,25 @@ func (util *VsphereDiskUtil) DeleteVolume(vd *vsphereVolumeDeleter) error {
glog.V(2).Infof("Successfully deleted vsphere volume %s", vd.volPath)
return nil
}
func getVolPathfromDeviceMountPath(deviceMountPath string) string {
// Assumption: no file or folder name in the datastore starts with '['
volPath := deviceMountPath[strings.LastIndex(deviceMountPath, "["):]
// The space between the datastore and the vmdk name is encoded as '\040'
// when returned by GetMountRefs(); e.g. the volumePath "[local] xxx.vmdk"
// comes back as "[local]\040xxx.vmdk". Replace '\040' with a space to
// recover the actual volumePath.
return strings.Replace(volPath, "\\040", " ", -1)
}
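A worked example of the path recovery (the mount path is hypothetical; the plugin directory layout may differ):

p := "/var/lib/kubelet/plugins/kubernetes.io/vsphere-volume/mounts/[local]\\040test.vmdk"
volPath := getVolPathfromDeviceMountPath(p)
// volPath == "[local] test.vmdk"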
func getCloudProvider(cloud cloudprovider.Interface) (*vsphere.VSphere, error) {
if cloud == nil {
glog.Errorf("Cloud provider not initialized properly")
return nil, errors.New("Cloud provider not initialized properly")
}
// Comma-ok assertion: a non-vSphere provider returns an error instead of panicking.
vs, ok := cloud.(*vsphere.VSphere)
if !ok || vs == nil {
return nil, errors.New("Invalid cloud provider: expected vSphere")
}
return vs, nil
}