support AWS and Cinder attacher

Signed-off-by: Huamin Chen <hchen@redhat.com>
pull/6/head
Huamin Chen 2016-05-10 19:02:41 +00:00
parent 015bc3d60d
commit d1e0a13924
11 changed files with 695 additions and 410 deletions

View File

@ -216,6 +216,13 @@ type Volumes interface {
// Get labels to apply to volume on creation
GetVolumeLabels(volumeName string) (map[string]string, error)
// Get the volume's disk path from the volume name;
// returns the device path where the volume is attached
GetDiskPath(volumeName string) (string, error)
// Check if the volume is already attached to the instance
DiskIsAttached(diskName, instanceID string) (bool, error)
}
// InstanceGroups is an interface for managing cloud-managed instance groups / autoscaling instance groups
@ -1454,6 +1461,39 @@ func (c *AWSCloud) GetVolumeLabels(volumeName string) (map[string]string, error)
return labels, nil
}
// Implement Volumes.GetDiskPath
func (c *AWSCloud) GetDiskPath(volumeName string) (string, error) {
awsDisk, err := newAWSDisk(c, volumeName)
if err != nil {
return "", err
}
info, err := awsDisk.describeVolume()
if err != nil {
return "", err
}
if len(info.Attachments) == 0 {
return "", fmt.Errorf("No attachement to volume %s", volumeName)
}
return aws.StringValue(info.Attachments[0].Device), nil
}
// Implement Volumes.DiskIsAttached
func (c *AWSCloud) DiskIsAttached(diskName, instanceID string) (bool, error) {
awsInstance, err := c.getAwsInstance(instanceID)
if err != nil {
return false, err
}
info, err := awsInstance.describeInstance()
if err != nil {
return false, err
}
for _, blockDevice := range info.BlockDeviceMappings {
name := aws.StringValue(blockDevice.Ebs.VolumeId)
if name == diskName {
return true, nil
}
}
return false, nil
}
// Gets the current load balancer state
func (s *AWSCloud) describeLoadBalancer(name string) (*elb.LoadBalancerDescription, error) {
request := &elb.DescribeLoadBalancersInput{}
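For illustration only (not part of this commit), here is a minimal sketch of how a caller might combine the two Volumes methods added above. The helper name devicePathIfAttached and its error handling are assumptions, not existing Kubernetes code.

// Sketch: resolve the device path only when the cloud reports the volume
// as attached to the given instance.
func devicePathIfAttached(v Volumes, volumeName, instanceID string) (string, error) {
	attached, err := v.DiskIsAttached(volumeName, instanceID)
	if err != nil {
		return "", err
	}
	if !attached {
		return "", fmt.Errorf("volume %s is not attached to instance %s", volumeName, instanceID)
	}
	// The cloud reports the volume as attached; return the device path it recorded.
	return v.GetDiskPath(volumeName)
}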

View File

@ -1123,3 +1123,35 @@ func (os *OpenStack) DeleteVolume(volumeName string) error {
}
return err
}
// Get the device path of the volume attached to the compute instance running the kubelet
func (os *OpenStack) GetAttachmentDiskPath(instanceID string, diskName string) (string, error) {
disk, err := os.getVolume(diskName)
if err != nil {
return "", err
}
if len(disk.Attachments) > 0 && disk.Attachments[0]["server_id"] != nil {
if instanceID == disk.Attachments[0]["server_id"] {
// Attachment[0]["device"] points to the device path
// see http://developer.openstack.org/api-ref-blockstorage-v1.html
return disk.Attachments[0]["device"].(string), nil
} else {
errMsg := fmt.Sprintf("Disk %q is attached to a different compute: %q, should be detached before proceeding", diskName, disk.Attachments[0]["server_id"])
glog.Errorf(errMsg)
return "", errors.New(errMsg)
}
}
return "", fmt.Errorf("volume %s is not attached to %s", diskName, instanceID)
}
// Query whether a volume is attached to a compute instance
func (os *OpenStack) DiskIsAttached(diskName, instanceID string) (bool, error) {
disk, err := os.getVolume(diskName)
if err != nil {
return false, err
}
if len(disk.Attachments) > 0 && disk.Attachments[0]["server_id"] != nil && instanceID == disk.Attachments[0]["server_id"] {
return true, nil
}
return false, nil
}
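As a reading aid only, the following self-contained sketch models the attachment metadata that GetAttachmentDiskPath and DiskIsAttached above (and the identical Rackspace copies below) consult: the Cinder v1 API reports a volume's attachments as a list of maps keyed by "server_id" and "device". The UUID and device path here are made up.

package main

import "fmt"

func main() {
	// Hypothetical data mirroring what getVolume returns for an attached volume.
	instanceID := "3f6d3f24-5a3e-4b9d-b6a1-000000000000"
	attachments := []map[string]interface{}{
		{"server_id": instanceID, "device": "/dev/vdb"},
	}
	// Same checks as in the helpers above: match on the first attachment's
	// server ID, then hand back its device path.
	if len(attachments) > 0 && attachments[0]["server_id"] == instanceID {
		fmt.Println("attached at", attachments[0]["device"].(string))
	}
}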

View File

@ -612,3 +612,35 @@ func (rs *Rackspace) DetachDisk(instanceID string, partialDiskId string) error {
return nil
}
// Get the device path of the volume attached to the compute instance running the kubelet
func (rs *Rackspace) GetAttachmentDiskPath(instanceID string, diskName string) (string, error) {
disk, err := rs.getVolume(diskName)
if err != nil {
return "", err
}
if len(disk.Attachments) > 0 && disk.Attachments[0]["server_id"] != nil {
if instanceID == disk.Attachments[0]["server_id"] {
// Attachment[0]["device"] points to the device path
// see http://developer.openstack.org/api-ref-blockstorage-v1.html
return disk.Attachments[0]["device"].(string), nil
} else {
errMsg := fmt.Sprintf("Disk %q is attached to a different compute: %q, should be detached before proceeding", diskName, disk.Attachments[0]["server_id"])
glog.Errorf(errMsg)
return "", errors.New(errMsg)
}
}
return "", fmt.Errorf("volume %s is not attached to %s", diskName, instanceID)
}
// Query whether a volume is attached to a compute instance
func (rs *Rackspace) DiskIsAttached(diskName, instanceID string) (bool, error) {
disk, err := rs.getVolume(diskName)
if err != nil {
return false, err
}
if len(disk.Attachments) > 0 && disk.Attachments[0]["server_id"] != nil && instanceID == disk.Attachments[0]["server_id"] {
return true, nil
}
return false, nil
}

View File

@ -0,0 +1,256 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aws_ebs
import (
"fmt"
"os"
"path"
"strconv"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
)
type awsElasticBlockStoreAttacher struct {
host volume.VolumeHost
}
var _ volume.Attacher = &awsElasticBlockStoreAttacher{}
var _ volume.AttachableVolumePlugin = &awsElasticBlockStorePlugin{}
func (plugin *awsElasticBlockStorePlugin) NewAttacher() (volume.Attacher, error) {
return &awsElasticBlockStoreAttacher{host: plugin.host}, nil
}
func (plugin *awsElasticBlockStorePlugin) GetDeviceName(spec *volume.Spec) (string, error) {
volumeSource, _ := getVolumeSource(spec)
if volumeSource == nil {
return "", fmt.Errorf("Spec does not reference an EBS volume type")
}
return volumeSource.VolumeID, nil
}
func (attacher *awsElasticBlockStoreAttacher) Attach(spec *volume.Spec, hostName string) error {
volumeSource, readOnly := getVolumeSource(spec)
volumeID := volumeSource.VolumeID
awsCloud, err := getCloudProvider(attacher.host.GetCloudProvider())
if err != nil {
return err
}
attached, err := awsCloud.DiskIsAttached(volumeID, hostName)
if err != nil {
// Log error and continue with attach
glog.Errorf(
"Error checking if volume (%q) is already attached to current node (%q). Will continue and try attach anyway. err=%v",
volumeID, hostName, err)
}
if err == nil && attached {
// Volume is already attached to node.
glog.Infof("Attach operation is successful. volume %q is already attached to node %q.", volumeID, hostName)
return nil
}
if _, err = awsCloud.AttachDisk(volumeID, hostName, readOnly); err != nil {
glog.Errorf("Error attaching volume %q: %+v", volumeID, err)
return err
}
return nil
}
func (attacher *awsElasticBlockStoreAttacher) WaitForAttach(spec *volume.Spec, timeout time.Duration) (string, error) {
awsCloud, err := getCloudProvider(attacher.host.GetCloudProvider())
if err != nil {
return "", err
}
volumeSource, _ := getVolumeSource(spec)
volumeID := volumeSource.VolumeID
partition := ""
if volumeSource.Partition != 0 {
partition = strconv.Itoa(int(volumeSource.Partition))
}
devicePath := ""
if d, err := awsCloud.GetDiskPath(volumeID); err == nil {
devicePath = d
} else {
glog.Errorf("GetDiskPath %q gets error %v", volumeID, err)
}
ticker := time.NewTicker(checkSleepDuration)
defer ticker.Stop()
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-ticker.C:
glog.V(5).Infof("Checking AWS Volume %q is attached.", volumeID)
if devicePath == "" {
if d, err := awsCloud.GetDiskPath(volumeID); err == nil {
devicePath = d
} else {
glog.Errorf("GetDiskPath %q gets error %v", volumeID, err)
}
}
if devicePath != "" {
devicePaths := getDiskByIdPaths(partition, devicePath)
path, err := verifyDevicePath(devicePaths)
if err != nil {
// Log error, if any, and continue checking periodically. See issue #11321
glog.Errorf("Error verifying AWS Volume (%q) is attached: %v", volumeID, err)
} else if path != "" {
// A device path has successfully been created for the PD
glog.Infof("Successfully found attached AWS Volume %q.", volumeID)
return path, nil
}
} else {
glog.V(5).Infof("AWS Volume (%q) is not attached yet", volumeID)
}
case <-timer.C:
return "", fmt.Errorf("Could not find attached AWS Volume %q. Timeout waiting for mount paths to be created.", volumeID)
}
}
}
func (attacher *awsElasticBlockStoreAttacher) GetDeviceMountPath(spec *volume.Spec) string {
volumeSource, _ := getVolumeSource(spec)
return makeGlobalPDPath(attacher.host, volumeSource.VolumeID)
}
// FIXME: this method can be further pruned.
func (attacher *awsElasticBlockStoreAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string, mounter mount.Interface) error {
notMnt, err := mounter.IsLikelyNotMountPoint(deviceMountPath)
if err != nil {
if os.IsNotExist(err) {
if err := os.MkdirAll(deviceMountPath, 0750); err != nil {
return err
}
notMnt = true
} else {
return err
}
}
volumeSource, readOnly := getVolumeSource(spec)
options := []string{}
if readOnly {
options = append(options, "ro")
}
if notMnt {
diskMounter := &mount.SafeFormatAndMount{Interface: mounter, Runner: exec.New()}
err = diskMounter.FormatAndMount(devicePath, deviceMountPath, volumeSource.FSType, options)
if err != nil {
os.Remove(deviceMountPath)
return err
}
}
return nil
}
type awsElasticBlockStoreDetacher struct {
host volume.VolumeHost
}
var _ volume.Detacher = &awsElasticBlockStoreDetacher{}
func (plugin *awsElasticBlockStorePlugin) NewDetacher() (volume.Detacher, error) {
return &awsElasticBlockStoreDetacher{host: plugin.host}, nil
}
func (detacher *awsElasticBlockStoreDetacher) Detach(deviceMountPath string, hostName string) error {
volumeID := path.Base(deviceMountPath)
awsCloud, err := getCloudProvider(detacher.host.GetCloudProvider())
if err != nil {
return err
}
attached, err := awsCloud.DiskIsAttached(volumeID, hostName)
if err != nil {
// Log error and continue with detach
glog.Errorf(
"Error checking if volume (%q) is already attached to current node (%q). Will continue and try detach anyway. err=%v",
volumeID, hostName, err)
}
if err == nil && !attached {
// Volume is already detached from node.
glog.Infof("detach operation was successful. volume %q is already detached from node %q.", volumeID, hostName)
return nil
}
if _, err = awsCloud.DetachDisk(volumeID, hostName); err != nil {
glog.Errorf("Error detaching volumeID %q: %v", volumeID, err)
return err
}
return nil
}
func (detacher *awsElasticBlockStoreDetacher) WaitForDetach(devicePath string, timeout time.Duration) error {
ticker := time.NewTicker(checkSleepDuration)
defer ticker.Stop()
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-ticker.C:
glog.V(5).Infof("Checking device %q is detached.", devicePath)
if pathExists, err := pathExists(devicePath); err != nil {
return fmt.Errorf("Error checking if device path exists: %v", err)
} else if !pathExists {
return nil
}
case <-timer.C:
return fmt.Errorf("Timeout reached; PD Device %v is still attached", devicePath)
}
}
}
func (detacher *awsElasticBlockStoreDetacher) UnmountDevice(deviceMountPath string, mounter mount.Interface) error {
volume := path.Base(deviceMountPath)
if err := unmountPDAndRemoveGlobalPath(deviceMountPath, mounter); err != nil {
glog.Errorf("Error unmounting %q: %v", volume, err)
}
return nil
}
func getVolumeSource(spec *volume.Spec) (*api.AWSElasticBlockStoreVolumeSource, bool) {
var readOnly bool
var volumeSource *api.AWSElasticBlockStoreVolumeSource
if spec.Volume != nil && spec.Volume.AWSElasticBlockStore != nil {
volumeSource = spec.Volume.AWSElasticBlockStore
readOnly = volumeSource.ReadOnly
} else {
volumeSource = spec.PersistentVolume.Spec.AWSElasticBlockStore
readOnly = spec.ReadOnly
}
return volumeSource, readOnly
}
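To show how the pieces above fit together, here is an illustrative driver, not part of this commit, that walks the attacher through Attach, WaitForAttach, GetDeviceMountPath, and MountDevice. It assumes the volume.Attacher interface exposes exactly the four methods implemented above; the function name, the ten-minute timeout, and the way spec, hostName, and mounter are obtained are all assumptions.

// Sketch of the attach-then-mount sequence a volume manager might run.
func attachAndMount(plugin *awsElasticBlockStorePlugin, spec *volume.Spec, hostName string, mounter mount.Interface) error {
	attacher, err := plugin.NewAttacher()
	if err != nil {
		return err
	}
	// 1. Ask AWS to attach the EBS volume to the node.
	if err := attacher.Attach(spec, hostName); err != nil {
		return err
	}
	// 2. Poll until a local device path appears or the timeout expires.
	devicePath, err := attacher.WaitForAttach(spec, 10*time.Minute)
	if err != nil {
		return err
	}
	// 3. Format (if needed) and mount the device at the global mount path.
	deviceMountPath := attacher.GetDeviceMountPath(spec)
	return attacher.MountDevice(spec, devicePath, deviceMountPath, mounter)
}

The same sequence applies to the Cinder attacher introduced later in this commit; only the cloud calls behind each step differ.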

View File

@ -169,11 +169,6 @@ func (plugin *awsElasticBlockStorePlugin) newProvisionerInternal(options volume.
// Abstract interface to PD operations.
type ebsManager interface {
// Attaches the disk to the kubelet's host machine.
AttachAndMountDisk(b *awsElasticBlockStoreMounter, globalPDPath string) error
// Detaches the disk from the kubelet's host machine.
DetachDisk(c *awsElasticBlockStoreUnmounter) error
// Creates a volume
CreateVolume(provisioner *awsElasticBlockStoreProvisioner) (volumeID string, volumeSizeGB int, labels map[string]string, err error)
// Deletes a volume
DeleteVolume(deleter *awsElasticBlockStoreDeleter) error
@ -196,13 +191,6 @@ type awsElasticBlockStore struct {
volume.MetricsProvider
}
func detachDiskLogError(ebs *awsElasticBlockStore) {
err := ebs.manager.DetachDisk(&awsElasticBlockStoreUnmounter{ebs})
if err != nil {
glog.Warningf("Failed to detach disk: %v (%v)", ebs, err)
}
}
type awsElasticBlockStoreMounter struct {
*awsElasticBlockStore
// Filesystem type, optional.
@ -241,13 +229,8 @@ func (b *awsElasticBlockStoreMounter) SetUpAt(dir string, fsGroup *int64) error
}
globalPDPath := makeGlobalPDPath(b.plugin.host, b.volumeID)
if err := b.manager.AttachAndMountDisk(b, globalPDPath); err != nil {
return err
}
if err := os.MkdirAll(dir, 0750); err != nil {
// TODO: we should really eject the attach/detach out into its own control loop.
detachDiskLogError(b.awsElasticBlockStore)
return err
}
@ -280,8 +263,6 @@ func (b *awsElasticBlockStoreMounter) SetUpAt(dir string, fsGroup *int64) error
}
}
os.Remove(dir)
// TODO: we should really eject the attach/detach out into its own control loop.
detachDiskLogError(b.awsElasticBlockStore)
return err
}
@ -334,8 +315,7 @@ func (c *awsElasticBlockStoreUnmounter) TearDown() error {
return c.TearDownAt(c.GetPath())
}
// Unmounts the bind mount, and detaches the disk only if the PD
// resource was the last reference to that disk on the kubelet.
// Unmounts the bind mount
func (c *awsElasticBlockStoreUnmounter) TearDownAt(dir string) error {
notMnt, err := c.mounter.IsLikelyNotMountPoint(dir)
if err != nil {
@ -347,35 +327,11 @@ func (c *awsElasticBlockStoreUnmounter) TearDownAt(dir string) error {
return os.Remove(dir)
}
refs, err := mount.GetMountRefs(c.mounter, dir)
if err != nil {
glog.V(2).Info("Error getting mountrefs for ", dir, ": ", err)
return err
}
if len(refs) == 0 {
glog.Warning("Did not find pod-mount for ", dir, " during tear-down")
}
// Unmount the bind-mount inside this pod
if err := c.mounter.Unmount(dir); err != nil {
glog.V(2).Info("Error unmounting dir ", dir, ": ", err)
return err
}
// If len(refs) is 1, then all bind mounts have been removed, and the
// remaining reference is the global mount. It is safe to detach.
if len(refs) == 1 {
// c.volumeID is not initially set for volume-unmounters, so set it here.
c.volumeID, err = getVolumeIDFromGlobalMount(c.plugin.host, refs[0])
if err != nil {
glog.V(2).Info("Could not determine volumeID from mountpoint ", refs[0], ": ", err)
return err
}
if err := c.manager.DetachDisk(&awsElasticBlockStoreUnmounter{c.awsElasticBlockStore}); err != nil {
glog.V(2).Info("Error detaching disk ", c.volumeID, ": ", err)
return err
}
} else {
glog.V(2).Infof("Found multiple refs; won't detach EBS volume: %v", refs)
}
notMnt, mntErr := c.mounter.IsLikelyNotMountPoint(dir)
if mntErr != nil {
glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)

View File

@ -88,35 +88,10 @@ func contains(modes []api.PersistentVolumeAccessMode, mode api.PersistentVolumeA
}
type fakePDManager struct {
attachCalled bool
detachCalled bool
}
// TODO(jonesdl) To fully test this, we could create a loopback device
// and mount that instead.
func (fake *fakePDManager) AttachAndMountDisk(b *awsElasticBlockStoreMounter, globalPDPath string) error {
globalPath := makeGlobalPDPath(b.plugin.host, b.volumeID)
err := os.MkdirAll(globalPath, 0750)
if err != nil {
return err
}
fake.attachCalled = true
// Simulate the global mount so that the fakeMounter returns the
// expected number of mounts for the attached disk.
b.mounter.Mount(globalPath, globalPath, b.fsType, nil)
return nil
}
func (fake *fakePDManager) DetachDisk(c *awsElasticBlockStoreUnmounter) error {
globalPath := makeGlobalPDPath(c.plugin.host, c.volumeID)
err := os.RemoveAll(globalPath)
if err != nil {
return err
}
fake.detachCalled = true
return nil
}
func (fake *fakePDManager) CreateVolume(c *awsElasticBlockStoreProvisioner) (volumeID string, volumeSizeGB int, labels map[string]string, err error) {
labels = make(map[string]string)
labels["fakepdmanager"] = "yes"
@ -185,9 +160,6 @@ func TestPlugin(t *testing.T) {
t.Errorf("SetUp() failed: %v", err)
}
}
if !fakeManager.attachCalled {
t.Errorf("Attach watch not called")
}
fakeManager = &fakePDManager{}
unmounter, err := plug.(*awsElasticBlockStorePlugin).newUnmounterInternal("vol1", types.UID("poduid"), fakeManager, fakeMounter)
@ -206,9 +178,6 @@ func TestPlugin(t *testing.T) {
} else if !os.IsNotExist(err) {
t.Errorf("SetUp() failed: %v", err)
}
if !fakeManager.detachCalled {
t.Errorf("Detach watch not called")
}
// Test Provisioner
cap := resource.MustParse("100Mi")

View File

@ -19,14 +19,12 @@ package aws_ebs
import (
"fmt"
"os"
"path/filepath"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
"k8s.io/kubernetes/pkg/util/keymutex"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
)
@ -40,74 +38,10 @@ const (
errorSleepDuration = 5 * time.Second
)
// Singleton key mutex for keeping attach/detach operations for the same PD atomic
var attachDetachMutex = keymutex.NewKeyMutex()
type AWSDiskUtil struct{}
// Attaches a disk to the current kubelet.
// Mounts the disk to it's global path.
func (diskUtil *AWSDiskUtil) AttachAndMountDisk(b *awsElasticBlockStoreMounter, globalPDPath string) error {
glog.V(5).Infof("AttachAndMountDisk(...) called for PD %q. Will block for existing operations, if any. (globalPDPath=%q)\r\n", b.volumeID, globalPDPath)
// Block execution until any pending detach operations for this PD have completed
attachDetachMutex.LockKey(b.volumeID)
defer attachDetachMutex.UnlockKey(b.volumeID)
glog.V(5).Infof("AttachAndMountDisk(...) called for PD %q. Awake and ready to execute. (globalPDPath=%q)\r\n", b.volumeID, globalPDPath)
xvdBefore, err := filepath.Glob(diskXVDPattern)
if err != nil {
glog.Errorf("Error filepath.Glob(\"%s\"): %v\r\n", diskXVDPattern, err)
}
xvdBeforeSet := sets.NewString(xvdBefore...)
devicePath, err := attachDiskAndVerify(b, xvdBeforeSet)
if err != nil {
return err
}
// Only mount the PD globally once.
notMnt, err := b.mounter.IsLikelyNotMountPoint(globalPDPath)
if err != nil {
if os.IsNotExist(err) {
if err := os.MkdirAll(globalPDPath, 0750); err != nil {
return err
}
notMnt = true
} else {
return err
}
}
options := []string{}
if b.readOnly {
options = append(options, "ro")
}
if notMnt {
err = b.diskMounter.FormatAndMount(devicePath, globalPDPath, b.fsType, options)
if err != nil {
os.Remove(globalPDPath)
return err
}
}
return nil
}
// Unmounts the device and detaches the disk from the kubelet's host machine.
func (util *AWSDiskUtil) DetachDisk(c *awsElasticBlockStoreUnmounter) error {
glog.V(5).Infof("DetachDisk(...) for PD %q\r\n", c.volumeID)
if err := unmountPDAndRemoveGlobalPath(c); err != nil {
glog.Errorf("Error unmounting PD %q: %v", c.volumeID, err)
}
// Detach disk asynchronously so that the kubelet sync loop is not blocked.
go detachDiskAndVerify(c)
return nil
}
func (util *AWSDiskUtil) DeleteVolume(d *awsElasticBlockStoreDeleter) error {
cloud, err := getCloudProvider(d.awsElasticBlockStore.plugin)
cloud, err := getCloudProvider(d.awsElasticBlockStore.plugin.host.GetCloudProvider())
if err != nil {
return err
}
@ -128,7 +62,7 @@ func (util *AWSDiskUtil) DeleteVolume(d *awsElasticBlockStoreDeleter) error {
// CreateVolume creates an AWS EBS volume.
// Returns: volumeID, volumeSizeGB, labels, error
func (util *AWSDiskUtil) CreateVolume(c *awsElasticBlockStoreProvisioner) (string, int, map[string]string, error) {
cloud, err := getCloudProvider(c.awsElasticBlockStore.plugin)
cloud, err := getCloudProvider(c.awsElasticBlockStore.plugin.host.GetCloudProvider())
if err != nil {
return "", 0, nil, err
}
@ -166,60 +100,6 @@ func (util *AWSDiskUtil) CreateVolume(c *awsElasticBlockStoreProvisioner) (strin
return name, int(requestGB), labels, nil
}
// Attaches the specified persistent disk device to node, verifies that it is attached, and retries if it fails.
func attachDiskAndVerify(b *awsElasticBlockStoreMounter, xvdBeforeSet sets.String) (string, error) {
var awsCloud *aws.AWSCloud
var attachError error
for numRetries := 0; numRetries < maxRetries; numRetries++ {
var err error
if awsCloud == nil {
awsCloud, err = getCloudProvider(b.awsElasticBlockStore.plugin)
if err != nil || awsCloud == nil {
// Retry on error. See issue #11321
glog.Errorf("Error getting AWSCloudProvider while detaching PD %q: %v", b.volumeID, err)
time.Sleep(errorSleepDuration)
continue
}
}
if numRetries > 0 {
glog.Warningf("Retrying attach for EBS Disk %q (retry count=%v).", b.volumeID, numRetries)
}
var devicePath string
devicePath, attachError = awsCloud.AttachDisk(b.volumeID, "", b.readOnly)
if attachError != nil {
glog.Errorf("Error attaching PD %q: %v", b.volumeID, attachError)
time.Sleep(errorSleepDuration)
continue
}
devicePaths := getDiskByIdPaths(b.awsElasticBlockStore, devicePath)
for numChecks := 0; numChecks < maxChecks; numChecks++ {
path, err := verifyDevicePath(devicePaths)
if err != nil {
// Log error, if any, and continue checking periodically. See issue #11321
glog.Errorf("Error verifying EBS Disk (%q) is attached: %v", b.volumeID, err)
} else if path != "" {
// A device path has successfully been created for the PD
glog.Infof("Successfully attached EBS Disk %q.", b.volumeID)
return path, nil
}
// Sleep then check again
glog.V(3).Infof("Waiting for EBS Disk %q to attach.", b.volumeID)
time.Sleep(checkSleepDuration)
}
}
if attachError != nil {
return "", fmt.Errorf("Could not attach EBS Disk %q: %v", b.volumeID, attachError)
}
return "", fmt.Errorf("Could not attach EBS Disk %q. Timeout waiting for mount paths to be created.", b.volumeID)
}
// Returns the first path that exists, or empty string if none exist.
func verifyDevicePath(devicePaths []string) (string, error) {
for _, path := range devicePaths {
@ -233,72 +113,10 @@ func verifyDevicePath(devicePaths []string) (string, error) {
return "", nil
}
// Detaches the specified persistent disk device from node, verifies that it is detached, and retries if it fails.
// This function is intended to be called asynchronously as a go routine.
func detachDiskAndVerify(c *awsElasticBlockStoreUnmounter) {
glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Will block for pending operations", c.volumeID)
defer runtime.HandleCrash()
// Block execution until any pending attach/detach operations for this PD have completed
attachDetachMutex.LockKey(c.volumeID)
defer attachDetachMutex.UnlockKey(c.volumeID)
glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Awake and ready to execute.", c.volumeID)
var awsCloud *aws.AWSCloud
for numRetries := 0; numRetries < maxRetries; numRetries++ {
var err error
if awsCloud == nil {
awsCloud, err = getCloudProvider(c.awsElasticBlockStore.plugin)
if err != nil || awsCloud == nil {
// Retry on error. See issue #11321
glog.Errorf("Error getting AWSCloudProvider while detaching PD %q: %v", c.volumeID, err)
time.Sleep(errorSleepDuration)
continue
}
}
if numRetries > 0 {
glog.Warningf("Retrying detach for EBS Disk %q (retry count=%v).", c.volumeID, numRetries)
}
devicePath, err := awsCloud.DetachDisk(c.volumeID, "")
if err != nil {
glog.Errorf("Error detaching PD %q: %v", c.volumeID, err)
time.Sleep(errorSleepDuration)
continue
}
devicePaths := getDiskByIdPaths(c.awsElasticBlockStore, devicePath)
for numChecks := 0; numChecks < maxChecks; numChecks++ {
allPathsRemoved, err := verifyAllPathsRemoved(devicePaths)
if err != nil {
// Log error, if any, and continue checking periodically.
glog.Errorf("Error verifying EBS Disk (%q) is detached: %v", c.volumeID, err)
} else if allPathsRemoved {
// All paths to the PD have been successfully removed
unmountPDAndRemoveGlobalPath(c)
glog.Infof("Successfully detached EBS Disk %q.", c.volumeID)
return
}
// Sleep then check again
glog.V(3).Infof("Waiting for EBS Disk %q to detach.", c.volumeID)
time.Sleep(checkSleepDuration)
}
}
glog.Errorf("Failed to detach EBS Disk %q. One or more mount paths was not removed.", c.volumeID)
}
// Unmount the global PD mount, which should be the only one, and delete it.
func unmountPDAndRemoveGlobalPath(c *awsElasticBlockStoreUnmounter) error {
globalPDPath := makeGlobalPDPath(c.plugin.host, c.volumeID)
err := c.mounter.Unmount(globalPDPath)
os.Remove(globalPDPath)
// Unmount the global mount path, which should be the only one, and delete it.
func unmountPDAndRemoveGlobalPath(globalMountPath string, mounter mount.Interface) error {
err := mounter.Unmount(globalMountPath)
os.Remove(globalMountPath)
return err
}
@ -319,15 +137,15 @@ func verifyAllPathsRemoved(devicePaths []string) (bool, error) {
// Returns list of all paths for given EBS mount
// This is more interesting on GCE (where we are able to identify volumes under /dev/disk-by-id)
// Here it is mostly about applying the partition path
func getDiskByIdPaths(d *awsElasticBlockStore, devicePath string) []string {
func getDiskByIdPaths(partition string, devicePath string) []string {
devicePaths := []string{}
if devicePath != "" {
devicePaths = append(devicePaths, devicePath)
}
if d.partition != "" {
if partition != "" {
for i, path := range devicePaths {
devicePaths[i] = path + diskPartitionSuffix + d.partition
devicePaths[i] = path + diskPartitionSuffix + partition
}
}
@ -347,18 +165,10 @@ func pathExists(path string) (bool, error) {
}
// Return cloud provider
func getCloudProvider(plugin *awsElasticBlockStorePlugin) (*aws.AWSCloud, error) {
if plugin == nil {
return nil, fmt.Errorf("Failed to get AWS Cloud Provider. plugin object is nil.")
}
if plugin.host == nil {
return nil, fmt.Errorf("Failed to get AWS Cloud Provider. plugin.host object is nil.")
}
cloudProvider := plugin.host.GetCloudProvider()
func getCloudProvider(cloudProvider cloudprovider.Interface) (*aws.AWSCloud, error) {
awsCloudProvider, ok := cloudProvider.(*aws.AWSCloud)
if !ok || awsCloudProvider == nil {
return nil, fmt.Errorf("Failed to get AWS Cloud Provider. plugin.host.GetCloudProvider returned %v instead", cloudProvider)
return nil, fmt.Errorf("Failed to get AWS Cloud Provider. GetCloudProvider returned %v instead", cloudProvider)
}
return awsCloudProvider, nil

View File

@ -0,0 +1,298 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cinder
import (
"fmt"
"os"
"path"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
)
type cinderDiskAttacher struct {
host volume.VolumeHost
}
var _ volume.Attacher = &cinderDiskAttacher{}
var _ volume.AttachableVolumePlugin = &cinderPlugin{}
const (
checkSleepDuration = time.Second
)
func (plugin *cinderPlugin) NewAttacher() (volume.Attacher, error) {
return &cinderDiskAttacher{host: plugin.host}, nil
}
func (plugin *cinderPlugin) GetDeviceName(spec *volume.Spec) (string, error) {
volumeSource, _ := getVolumeSource(spec)
if volumeSource == nil {
return "", fmt.Errorf("Spec does not reference a Cinder volume type")
}
return volumeSource.VolumeID, nil
}
func (attacher *cinderDiskAttacher) Attach(spec *volume.Spec, hostName string) error {
volumeSource, _ := getVolumeSource(spec)
volumeID := volumeSource.VolumeID
cloud, err := getCloudProvider(attacher.host.GetCloudProvider())
if err != nil {
return err
}
instances, res := cloud.Instances()
if !res {
return fmt.Errorf("failed to list openstack instances")
}
instanceid, err := instances.InstanceID(hostName)
if err != nil {
return err
}
if ind := strings.LastIndex(instanceid, "/"); ind >= 0 {
instanceid = instanceid[(ind + 1):]
}
attached, err := cloud.DiskIsAttached(volumeID, instanceid)
if err != nil {
// Log error and continue with attach
glog.Errorf(
"Error checking if volume (%q) is already attached to current node (%q). Will continue and try attach anyway. err=%v",
volumeID, instanceid, err)
}
if err == nil && attached {
// Volume is already attached to node.
glog.Infof("Attach operation is successful. volume %q is already attached to node %q.", volumeID, instanceid)
return nil
}
_, err = cloud.AttachDisk(instanceid, volumeID)
if err != nil {
glog.Errorf("Error attaching volume %q to instance %q: %v", volumeID, instanceid, err)
return err
}
glog.Infof("Attached volume %q to instance %q", volumeID, instanceid)
return nil
}
func (attacher *cinderDiskAttacher) WaitForAttach(spec *volume.Spec, timeout time.Duration) (string, error) {
cloud, err := getCloudProvider(attacher.host.GetCloudProvider())
if err != nil {
return "", err
}
volumeSource, _ := getVolumeSource(spec)
volumeID := volumeSource.VolumeID
instanceid, err := cloud.InstanceID()
if err != nil {
return "", err
}
devicePath := ""
if d, err := cloud.GetAttachmentDiskPath(instanceid, volumeID); err == nil {
devicePath = d
} else {
glog.Errorf("%q GetAttachmentDiskPath (%q) gets error %v", instanceid, volumeID, err)
}
ticker := time.NewTicker(checkSleepDuration)
defer ticker.Stop()
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
probeAttachedVolume()
select {
case <-ticker.C:
glog.V(5).Infof("Checking Cinder disk %q is attached.", volumeID)
if devicePath == "" {
if d, err := cloud.GetAttachmentDiskPath(instanceid, volumeID); err == nil {
devicePath = d
} else {
glog.Errorf("%q GetAttachmentDiskPath (%q) gets error %v", instanceid, volumeID, err)
}
}
if devicePath == "" {
glog.V(5).Infof("Cinder disk (%q) is not attached yet", volumeID)
} else {
probeAttachedVolume()
exists, err := pathExists(devicePath)
if exists && err == nil {
glog.Infof("Successfully found attached Cinder disk %q.", volumeID)
return devicePath, nil
} else {
// Log error, if any, and continue checking periodically
glog.Errorf("Error checking if Cinder disk %q is attached at %q: %v", volumeID, devicePath, err)
}
}
case <-timer.C:
return "", fmt.Errorf("Could not find attached Cinder disk %q. Timeout waiting for mount paths to be created.", volumeID)
}
}
}
func (attacher *cinderDiskAttacher) GetDeviceMountPath(spec *volume.Spec) string {
volumeSource, _ := getVolumeSource(spec)
return makeGlobalPDName(attacher.host, volumeSource.VolumeID)
}
// FIXME: this method can be further pruned.
func (attacher *cinderDiskAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string, mounter mount.Interface) error {
notMnt, err := mounter.IsLikelyNotMountPoint(deviceMountPath)
if err != nil {
if os.IsNotExist(err) {
if err := os.MkdirAll(deviceMountPath, 0750); err != nil {
return err
}
notMnt = true
} else {
return err
}
}
volumeSource, readOnly := getVolumeSource(spec)
options := []string{}
if readOnly {
options = append(options, "ro")
}
if notMnt {
diskMounter := &mount.SafeFormatAndMount{Interface: mounter, Runner: exec.New()}
err = diskMounter.FormatAndMount(devicePath, deviceMountPath, volumeSource.FSType, options)
if err != nil {
os.Remove(deviceMountPath)
return err
}
}
return nil
}
type cinderDiskDetacher struct {
host volume.VolumeHost
}
var _ volume.Detacher = &cinderDiskDetacher{}
func (plugin *cinderPlugin) NewDetacher() (volume.Detacher, error) {
return &cinderDiskDetacher{host: plugin.host}, nil
}
func (detacher *cinderDiskDetacher) Detach(deviceMountPath string, hostName string) error {
volumeID := path.Base(deviceMountPath)
cloud, err := getCloudProvider(detacher.host.GetCloudProvider())
if err != nil {
return err
}
instances, res := cloud.Instances()
if !res {
return fmt.Errorf("failed to list openstack instances")
}
instanceid, err := instances.InstanceID(hostName)
if err != nil {
return err
}
if ind := strings.LastIndex(instanceid, "/"); ind >= 0 {
instanceid = instanceid[(ind + 1):]
}
attached, err := cloud.DiskIsAttached(volumeID, instanceid)
if err != nil {
// Log error and continue with detach
glog.Errorf(
"Error checking if volume (%q) is already attached to current node (%q). Will continue and try detach anyway. err=%v",
volumeID, hostName, err)
}
if err == nil && !attached {
// Volume is already detached from node.
glog.Infof("detach operation was successful. volume %q is already detached from node %q.", volumeID, hostName)
return nil
}
if err = cloud.DetachDisk(instanceid, volumeID); err != nil {
glog.Errorf("Error detaching volume %q: %v", volumeID, err)
return err
}
glog.Infof("detatached volume %q from instance %q", volumeID, instanceid)
return nil
}
func (detacher *cinderDiskDetacher) WaitForDetach(devicePath string, timeout time.Duration) error {
ticker := time.NewTicker(checkSleepDuration)
defer ticker.Stop()
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-ticker.C:
glog.V(5).Infof("Checking device %q is detached.", devicePath)
if pathExists, err := pathExists(devicePath); err != nil {
return fmt.Errorf("Error checking if device path exists: %v", err)
} else if !pathExists {
return nil
}
case <-timer.C:
return fmt.Errorf("Timeout reached; PD Device %v is still attached", devicePath)
}
}
}
func (detacher *cinderDiskDetacher) UnmountDevice(deviceMountPath string, mounter mount.Interface) error {
volume := path.Base(deviceMountPath)
if err := unmountPDAndRemoveGlobalPath(deviceMountPath, mounter); err != nil {
glog.Errorf("Error unmounting %q: %v", volume, err)
}
return nil
}
func getVolumeSource(spec *volume.Spec) (*api.CinderVolumeSource, bool) {
var readOnly bool
var volumeSource *api.CinderVolumeSource
if spec.Volume != nil && spec.Volume.Cinder != nil {
volumeSource = spec.Volume.Cinder
readOnly = volumeSource.ReadOnly
} else {
volumeSource = spec.PersistentVolume.Spec.Cinder
readOnly = spec.ReadOnly
}
return volumeSource, readOnly
}
// Checks if the specified path exists
func pathExists(path string) (bool, error) {
_, err := os.Stat(path)
if err == nil {
return true, nil
} else if os.IsNotExist(err) {
return false, nil
} else {
return false, err
}
}
// Unmount the global mount path, which should be the only one, and delete it.
func unmountPDAndRemoveGlobalPath(globalMountPath string, mounter mount.Interface) error {
err := mounter.Unmount(globalMountPath)
os.Remove(globalMountPath)
return err
}
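One detail worth noting: both Attach and Detach above trim everything up to the last "/" from the ID returned by instances.InstanceID, keeping only the final segment (presumably the bare server ID the Cinder calls expect). A hypothetical helper, not in this commit, captures the same logic in one place:

// normalizeInstanceID mirrors the inline strings.LastIndex trimming used in
// Attach and Detach; illustrative only.
func normalizeInstanceID(id string) string {
	if ind := strings.LastIndex(id, "/"); ind >= 0 {
		return id[ind+1:]
	}
	return id
}

Attach and Detach could then call normalizeInstanceID(instanceid) instead of repeating the index arithmetic.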

View File

@ -25,6 +25,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/openstack"
"k8s.io/kubernetes/pkg/cloudprovider/providers/rackspace"
"k8s.io/kubernetes/pkg/types"
@ -47,6 +48,9 @@ type CinderProvider interface {
CreateVolume(name string, size int, tags *map[string]string) (volumeName string, err error)
GetDevicePath(diskId string) string
InstanceID() (string, error)
GetAttachmentDiskPath(instanceID string, diskName string) (string, error)
DiskIsAttached(diskName, instanceID string) (bool, error)
Instances() (cloudprovider.Instances, bool)
}
type cinderPlugin struct {
@ -163,6 +167,16 @@ func (plugin *cinderPlugin) newProvisionerInternal(options volume.VolumeOptions,
}, nil
}
func getCloudProvider(cloudProvider cloudprovider.Interface) (CinderProvider, error) {
if cloud, ok := cloudProvider.(*rackspace.Rackspace); ok && cloud != nil {
return cloud, nil
}
if cloud, ok := cloudProvider.(*openstack.OpenStack); ok && cloud != nil {
return cloud, nil
}
return nil, fmt.Errorf("wrong cloud type")
}
func (plugin *cinderPlugin) getCloudProvider() (CinderProvider, error) {
cloud := plugin.host.GetCloudProvider()
if cloud == nil {
@ -243,7 +257,7 @@ func (b *cinderVolumeMounter) SetUp(fsGroup *int64) error {
return b.SetUpAt(b.GetPath(), fsGroup)
}
// SetUp attaches the disk and bind mounts to the volume path.
// SetUp bind mounts to the volume path.
func (b *cinderVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
glog.V(5).Infof("Cinder SetUp %s to %s", b.pdName, dir)
@ -261,11 +275,6 @@ func (b *cinderVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
return nil
}
globalPDPath := makeGlobalPDName(b.plugin.host, b.pdName)
if err := b.manager.AttachDisk(b, globalPDPath); err != nil {
glog.V(4).Infof("AttachDisk failed: %v", err)
return err
}
glog.V(3).Infof("Cinder volume %s attached", b.pdName)
options := []string{"bind"}
if b.readOnly {
@ -382,15 +391,6 @@ func (c *cinderVolumeUnmounter) TearDownAt(dir string) error {
}
glog.V(3).Infof("Successfully unmounted: %s\n", dir)
// If refCount is 1, then all bind mounts have been removed, and the
// remaining reference is the global mount. It is safe to detach.
if len(refs) == 1 {
if err := c.manager.DetachDisk(c); err != nil {
glog.V(4).Infof("DetachDisk failed: %v", err)
return err
}
glog.V(3).Infof("Volume %s detached", c.pdName)
}
notmnt, mntErr := c.mounter.IsLikelyNotMountPoint(dir)
if mntErr != nil {
glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)

View File

@ -18,15 +18,11 @@ package cinder
import (
"fmt"
"io/ioutil"
"os"
"path"
"sync/atomic"
"testing"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/types"
@ -236,115 +232,3 @@ func TestPlugin(t *testing.T) {
t.Errorf("Deleter() failed: %v", err)
}
}
// Test a race when a volume is simultaneously SetUp and TearedDown
func TestAttachDetachRace(t *testing.T) {
tmpDir, err := ioutil.TempDir(os.TempDir(), "cinderTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
plugMgr := volume.VolumePluginMgr{}
host := volumetest.NewFakeVolumeHost(tmpDir, nil, nil)
plugMgr.InitPlugins(ProbeVolumePlugins(), host)
plug, err := plugMgr.FindPluginByName("kubernetes.io/cinder")
if err != nil {
t.Errorf("Can't find the plugin by name")
}
spec := &api.Volume{
Name: "vol1",
VolumeSource: api.VolumeSource{
Cinder: &api.CinderVolumeSource{
VolumeID: "pd",
FSType: "ext4",
},
},
}
fakeMounter := &mount.FakeMounter{}
// SetUp the volume for 1st time
mounter, err := plug.(*cinderPlugin).newMounterInternal(volume.NewSpecFromVolume(spec), types.UID("poduid"), &fakePDManager{time.Second}, fakeMounter)
if err != nil {
t.Errorf("Failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Errorf("Got a nil Mounter")
}
if err := mounter.SetUp(nil); err != nil {
t.Errorf("Expected success, got: %v", err)
}
path := mounter.GetPath()
// TearDown the 1st volume and SetUp the 2nd volume (to different pod) at the same time
mounter, err = plug.(*cinderPlugin).newMounterInternal(volume.NewSpecFromVolume(spec), types.UID("poduid2"), &fakePDManager{time.Second}, fakeMounter)
if err != nil {
t.Errorf("Failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Errorf("Got a nil Mounter")
}
unmounter, err := plug.(*cinderPlugin).newUnmounterInternal("vol1", types.UID("poduid"), &fakePDManager{time.Second}, fakeMounter)
if err != nil {
t.Errorf("Failed to make a new Unmounter: %v", err)
}
var buildComplete uint32 = 0
go func() {
glog.Infof("Attaching volume")
if err := mounter.SetUp(nil); err != nil {
t.Errorf("Expected success, got: %v", err)
}
glog.Infof("Volume attached")
atomic.AddUint32(&buildComplete, 1)
}()
// mounter is attaching the volume, which takes 1 second. Detach it in the middle of this interval
time.Sleep(time.Second / 2)
glog.Infof("Detaching volume")
if err = unmounter.TearDown(); err != nil {
t.Errorf("Expected success, got: %v", err)
}
glog.Infof("Volume detached")
// wait for the mounter to finish
for atomic.LoadUint32(&buildComplete) == 0 {
time.Sleep(time.Millisecond * 100)
}
// The volume should still be attached
devicePath := getFakeDeviceName(host, "pd")
if _, err := os.Stat(devicePath); err != nil {
if os.IsNotExist(err) {
t.Errorf("SetUp() failed, volume detached by simultaneous TearDown: %s", path)
} else {
t.Errorf("SetUp() failed: %v", err)
}
}
// TearDown the 2nd volume
unmounter, err = plug.(*cinderPlugin).newUnmounterInternal("vol1", types.UID("poduid2"), &fakePDManager{0}, fakeMounter)
if err != nil {
t.Errorf("Failed to make a new Unmounter: %v", err)
}
if unmounter == nil {
t.Errorf("Got a nil Unmounter")
}
if err := unmounter.TearDown(); err != nil {
t.Errorf("Expected success, got: %v", err)
}
if _, err := os.Stat(path); err == nil {
t.Errorf("TearDown() failed, volume path still exists: %s", path)
} else if !os.IsNotExist(err) {
t.Errorf("SetUp() failed: %v", err)
}
if _, err := os.Stat(devicePath); err == nil {
t.Errorf("TearDown() failed, volume is still attached: %s", devicePath)
} else if !os.IsNotExist(err) {
t.Errorf("SetUp() failed: %v", err)
}
}

View File

@ -53,6 +53,14 @@ func (v *mockVolumes) GetVolumeLabels(volumeName string) (map[string]string, err
return v.volumeLabels, v.volumeLabelsError
}
func (c *mockVolumes) GetDiskPath(volumeName string) (string, error) {
return "", fmt.Errorf("not implemented")
}
func (c *mockVolumes) DiskIsAttached(volumeName, instanceID string) (bool, error) {
return false, fmt.Errorf("not implemented")
}
func mockVolumeFailure(err error) *mockVolumes {
return &mockVolumes{volumeLabelsError: err}
}