mirror of https://github.com/k3s-io/k3s
AWS: Update copy-paste of GCE PD code to latest version
We are (sadly) using a copy-and-paste of the GCE PD code for AWS EBS. This code hasn't been updated in a while, and it seems that the GCE code has some code to make volume mounting more robust that we should copy.pull/6/head
parent
0dc25f545f
commit
1ae1db6027
|
@ -158,11 +158,13 @@ type Volumes interface {
|
|||
AttachDisk(instanceName string, volumeName string, readOnly bool) (string, error)
|
||||
// Detach the disk from the specified instance
|
||||
// instanceName can be empty to mean "the instance on which we are running"
|
||||
DetachDisk(instanceName string, volumeName string) error
|
||||
// Returns the device where the volume was attached
|
||||
DetachDisk(instanceName string, volumeName string) (string, error)
|
||||
|
||||
// Create a volume with the specified options
|
||||
CreateVolume(volumeOptions *VolumeOptions) (volumeName string, err error)
|
||||
DeleteVolume(volumeName string) error
|
||||
CreateDisk(volumeOptions *VolumeOptions) (volumeName string, err error)
|
||||
// Delete the specified volume
|
||||
DeleteDisk(volumeName string) error
|
||||
|
||||
// Get labels to apply to volume on creation
|
||||
GetVolumeLabels(volumeName string) (map[string]string, error)
|
||||
|
@ -201,6 +203,8 @@ type AWSCloud struct {
|
|||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
var _ Volumes = &AWSCloud{}
|
||||
|
||||
type AWSCloudConfig struct {
|
||||
Global struct {
|
||||
// TODO: Is there any use for this? We can get it from the instance metadata service
|
||||
|
@ -901,7 +905,7 @@ func (self *awsInstance) getInfo() (*ec2.Instance, error) {
|
|||
// Gets the mountDevice already assigned to the volume, or assigns an unused mountDevice.
|
||||
// If the volume is already assigned, this will return the existing mountDevice with alreadyAttached=true.
|
||||
// Otherwise the mountDevice is assigned by finding the first available mountDevice, and it is returned with alreadyAttached=false.
|
||||
func (self *awsInstance) getMountDevice(volumeID string) (assigned mountDevice, alreadyAttached bool, err error) {
|
||||
func (self *awsInstance) getMountDevice(volumeID string, assign bool) (assigned mountDevice, alreadyAttached bool, err error) {
|
||||
instanceType := self.getInstanceType()
|
||||
if instanceType == nil {
|
||||
return "", false, fmt.Errorf("could not get instance type for instance: %s", self.awsID)
|
||||
|
@ -939,11 +943,17 @@ func (self *awsInstance) getMountDevice(volumeID string) (assigned mountDevice,
|
|||
// Check to see if this volume is already assigned a device on this machine
|
||||
for mountDevice, mappingVolumeID := range self.deviceMappings {
|
||||
if volumeID == mappingVolumeID {
|
||||
glog.Warningf("Got assignment call for already-assigned volume: %s@%s", mountDevice, mappingVolumeID)
|
||||
if assign {
|
||||
glog.Warningf("Got assignment call for already-assigned volume: %s@%s", mountDevice, mappingVolumeID)
|
||||
}
|
||||
return mountDevice, true, nil
|
||||
}
|
||||
}
|
||||
|
||||
if !assign {
|
||||
return mountDevice(""), false, nil
|
||||
}
|
||||
|
||||
// Check all the valid mountpoints to see if any of them are free
|
||||
valid := instanceType.getEBSMountDevices()
|
||||
chosen := mountDevice("")
|
||||
|
@ -1172,7 +1182,7 @@ func (c *AWSCloud) AttachDisk(instanceName string, diskName string, readOnly boo
|
|||
return "", errors.New("AWS volumes cannot be mounted read-only")
|
||||
}
|
||||
|
||||
mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID)
|
||||
mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID, true)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -1220,15 +1230,25 @@ func (c *AWSCloud) AttachDisk(instanceName string, diskName string, readOnly boo
|
|||
}
|
||||
|
||||
// Implements Volumes.DetachDisk
|
||||
func (aws *AWSCloud) DetachDisk(instanceName string, diskName string) error {
|
||||
func (aws *AWSCloud) DetachDisk(instanceName string, diskName string) (string, error) {
|
||||
disk, err := newAWSDisk(aws, diskName)
|
||||
if err != nil {
|
||||
return err
|
||||
return "", err
|
||||
}
|
||||
|
||||
awsInstance, err := aws.getAwsInstance(instanceName)
|
||||
if err != nil {
|
||||
return err
|
||||
return "", err
|
||||
}
|
||||
|
||||
mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID, false)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if !alreadyAttached {
|
||||
glog.Warning("DetachDisk called on non-attached disk: ", diskName)
|
||||
// TODO: Continue? Tolerate non-attached error in DetachVolume?
|
||||
}
|
||||
|
||||
request := ec2.DetachVolumeInput{
|
||||
|
@ -1238,12 +1258,16 @@ func (aws *AWSCloud) DetachDisk(instanceName string, diskName string) error {
|
|||
|
||||
response, err := aws.ec2.DetachVolume(&request)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error detaching EBS volume: %v", err)
|
||||
return "", fmt.Errorf("error detaching EBS volume: %v", err)
|
||||
}
|
||||
if response == nil {
|
||||
return errors.New("no response from DetachVolume")
|
||||
return "", errors.New("no response from DetachVolume")
|
||||
}
|
||||
|
||||
// TODO: Fix this - just remove the cache?
|
||||
// If we don't have a cache; we don't have to wait any more (the driver does it for us)
|
||||
// Also, maybe we could get the locally connected drivers from the AWS metadata service?
|
||||
|
||||
// At this point we are waiting for the volume being detached. This
|
||||
// releases the volume and invalidates the cache even when there is a timeout.
|
||||
//
|
||||
|
@ -1253,6 +1277,7 @@ func (aws *AWSCloud) DetachDisk(instanceName string, diskName string) error {
|
|||
// works though. An option would be to completely flush the cache upon timeouts.
|
||||
//
|
||||
defer func() {
|
||||
// TODO: Not thread safe?
|
||||
for mountDevice, existingVolumeID := range awsInstance.deviceMappings {
|
||||
if existingVolumeID == disk.awsID {
|
||||
awsInstance.releaseMountDevice(disk.awsID, mountDevice)
|
||||
|
@ -1263,14 +1288,15 @@ func (aws *AWSCloud) DetachDisk(instanceName string, diskName string) error {
|
|||
|
||||
err = disk.waitForAttachmentStatus("detached")
|
||||
if err != nil {
|
||||
return err
|
||||
return "", err
|
||||
}
|
||||
|
||||
return err
|
||||
hostDevicePath := "/dev/xvd" + string(mountDevice)
|
||||
return hostDevicePath, err
|
||||
}
|
||||
|
||||
// Implements Volumes.CreateVolume
|
||||
func (s *AWSCloud) CreateVolume(volumeOptions *VolumeOptions) (string, error) {
|
||||
func (s *AWSCloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
|
||||
// TODO: Should we tag this with the cluster id (so it gets deleted when the cluster does?)
|
||||
|
||||
request := &ec2.CreateVolumeInput{}
|
||||
|
@ -1302,7 +1328,7 @@ func (s *AWSCloud) CreateVolume(volumeOptions *VolumeOptions) (string, error) {
|
|||
tagRequest.Tags = tags
|
||||
if _, err := s.createTags(tagRequest); err != nil {
|
||||
// delete the volume and hope it succeeds
|
||||
delerr := s.DeleteVolume(volumeName)
|
||||
delerr := s.DeleteDisk(volumeName)
|
||||
if delerr != nil {
|
||||
// delete did not succeed, we have a stray volume!
|
||||
return "", fmt.Errorf("error tagging volume %s, could not delete the volume: %v", volumeName, delerr)
|
||||
|
@ -1313,8 +1339,8 @@ func (s *AWSCloud) CreateVolume(volumeOptions *VolumeOptions) (string, error) {
|
|||
return volumeName, nil
|
||||
}
|
||||
|
||||
// Implements Volumes.DeleteVolume
|
||||
func (aws *AWSCloud) DeleteVolume(volumeName string) error {
|
||||
// Implements Volumes.DeleteDisk
|
||||
func (aws *AWSCloud) DeleteDisk(volumeName string) error {
|
||||
awsDisk, err := newAWSDisk(aws, volumeName)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -27,7 +27,6 @@ import (
|
|||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/resource"
|
||||
awscloud "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/exec"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
|
@ -100,15 +99,15 @@ func (plugin *awsElasticBlockStorePlugin) newBuilderInternal(spec *volume.Spec,
|
|||
|
||||
return &awsElasticBlockStoreBuilder{
|
||||
awsElasticBlockStore: &awsElasticBlockStore{
|
||||
podUID: podUID,
|
||||
volName: spec.Name(),
|
||||
volumeID: volumeID,
|
||||
manager: manager,
|
||||
mounter: mounter,
|
||||
plugin: plugin,
|
||||
podUID: podUID,
|
||||
volName: spec.Name(),
|
||||
volumeID: volumeID,
|
||||
partition: partition,
|
||||
manager: manager,
|
||||
mounter: mounter,
|
||||
plugin: plugin,
|
||||
},
|
||||
fsType: fsType,
|
||||
partition: partition,
|
||||
readOnly: readOnly,
|
||||
diskMounter: &mount.SafeFormatAndMount{plugin.host.GetMounter(), exec.New()}}, nil
|
||||
}
|
||||
|
@ -181,6 +180,8 @@ type awsElasticBlockStore struct {
|
|||
podUID types.UID
|
||||
// Unique id of the PD, used to find the disk resource in the provider.
|
||||
volumeID string
|
||||
// Specifies the partition to mount
|
||||
partition string
|
||||
// Utility interface that provides API calls to the provider to attach/detach disks.
|
||||
manager ebsManager
|
||||
// Mounter interface that provides system calls to mount the global path to the pod local path.
|
||||
|
@ -196,22 +197,10 @@ func detachDiskLogError(ebs *awsElasticBlockStore) {
|
|||
}
|
||||
}
|
||||
|
||||
// getVolumeProvider returns the AWS Volumes interface
|
||||
func (ebs *awsElasticBlockStore) getVolumeProvider() (awscloud.Volumes, error) {
|
||||
cloud := ebs.plugin.host.GetCloudProvider()
|
||||
volumes, ok := cloud.(awscloud.Volumes)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Cloud provider does not support volumes")
|
||||
}
|
||||
return volumes, nil
|
||||
}
|
||||
|
||||
type awsElasticBlockStoreBuilder struct {
|
||||
*awsElasticBlockStore
|
||||
// Filesystem type, optional.
|
||||
fsType string
|
||||
// Specifies the partition to mount
|
||||
partition string
|
||||
// Specifies whether the disk will be attached as read-only.
|
||||
readOnly bool
|
||||
// diskMounter provides the interface that is used to mount the actual block device.
|
||||
|
@ -304,6 +293,7 @@ func makeGlobalPDPath(host volume.VolumeHost, volumeID string) string {
|
|||
return path.Join(host.GetPluginDir(awsElasticBlockStorePluginName), "mounts", name)
|
||||
}
|
||||
|
||||
// Reverses the mapping done in makeGlobalPDPath
|
||||
func getVolumeIDFromGlobalMount(host volume.VolumeHost, globalPath string) (string, error) {
|
||||
basePath := path.Join(host.GetPluginDir(awsElasticBlockStorePluginName), "mounts")
|
||||
rel, err := filepath.Rel(basePath, globalPath)
|
||||
|
|
|
@ -68,11 +68,12 @@ func TestGetAccessModes(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Errorf("Can't find the plugin by name")
|
||||
}
|
||||
|
||||
if !contains(plug.GetAccessModes(), api.ReadWriteOnce) {
|
||||
t.Errorf("Expected to find AccessMode: %s", api.ReadWriteOnce)
|
||||
t.Errorf("Expected to support AccessModeTypes: %s", api.ReadWriteOnce)
|
||||
}
|
||||
if len(plug.GetAccessModes()) != 1 {
|
||||
t.Errorf("Expected to find exactly one AccessMode")
|
||||
if contains(plug.GetAccessModes(), api.ReadOnlyMany) {
|
||||
t.Errorf("Expected not to support AccessModeTypes: %s", api.ReadOnlyMany)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -85,7 +86,10 @@ func contains(modes []api.PersistentVolumeAccessMode, mode api.PersistentVolumeA
|
|||
return false
|
||||
}
|
||||
|
||||
type fakePDManager struct{}
|
||||
type fakePDManager struct {
|
||||
attachCalled bool
|
||||
detachCalled bool
|
||||
}
|
||||
|
||||
// TODO(jonesdl) To fully test this, we could create a loopback device
|
||||
// and mount that instead.
|
||||
|
@ -95,6 +99,10 @@ func (fake *fakePDManager) AttachAndMountDisk(b *awsElasticBlockStoreBuilder, gl
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fake.attachCalled = true
|
||||
// Simulate the global mount so that the fakeMounter returns the
|
||||
// expected number of mounts for the attached disk.
|
||||
b.mounter.Mount(globalPath, globalPath, b.fsType, nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -104,6 +112,7 @@ func (fake *fakePDManager) DetachDisk(c *awsElasticBlockStoreCleaner) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fake.detachCalled = true
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -121,7 +130,7 @@ func (fake *fakePDManager) DeleteVolume(cd *awsElasticBlockStoreDeleter) error {
|
|||
func TestPlugin(t *testing.T) {
|
||||
tmpDir, err := utiltesting.MkTmpdir("awsebsTest")
|
||||
if err != nil {
|
||||
t.Fatalf("can't make a temp dir: %v")
|
||||
t.Fatalf("can't make a temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
plugMgr := volume.VolumePluginMgr{}
|
||||
|
@ -140,13 +149,16 @@ func TestPlugin(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
builder, err := plug.(*awsElasticBlockStorePlugin).newBuilderInternal(volume.NewSpecFromVolume(spec), types.UID("poduid"), &fakePDManager{}, &mount.FakeMounter{})
|
||||
fakeManager := &fakePDManager{}
|
||||
fakeMounter := &mount.FakeMounter{}
|
||||
builder, err := plug.(*awsElasticBlockStorePlugin).newBuilderInternal(volume.NewSpecFromVolume(spec), types.UID("poduid"), fakeManager, fakeMounter)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to make a new Builder: %v", err)
|
||||
}
|
||||
if builder == nil {
|
||||
t.Errorf("Got a nil Builder")
|
||||
}
|
||||
|
||||
volPath := path.Join(tmpDir, "pods/poduid/volumes/kubernetes.io~aws-ebs/vol1")
|
||||
path := builder.GetPath()
|
||||
if path != volPath {
|
||||
|
@ -170,8 +182,12 @@ func TestPlugin(t *testing.T) {
|
|||
t.Errorf("SetUp() failed: %v", err)
|
||||
}
|
||||
}
|
||||
if !fakeManager.attachCalled {
|
||||
t.Errorf("Attach watch not called")
|
||||
}
|
||||
|
||||
cleaner, err := plug.(*awsElasticBlockStorePlugin).newCleanerInternal("vol1", types.UID("poduid"), &fakePDManager{}, &mount.FakeMounter{})
|
||||
fakeManager = &fakePDManager{}
|
||||
cleaner, err := plug.(*awsElasticBlockStorePlugin).newCleanerInternal("vol1", types.UID("poduid"), fakeManager, fakeMounter)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to make a new Cleaner: %v", err)
|
||||
}
|
||||
|
@ -187,9 +203,12 @@ func TestPlugin(t *testing.T) {
|
|||
} else if !os.IsNotExist(err) {
|
||||
t.Errorf("SetUp() failed: %v", err)
|
||||
}
|
||||
if !fakeManager.detachCalled {
|
||||
t.Errorf("Detach watch not called")
|
||||
}
|
||||
|
||||
// Test Provisioner
|
||||
cap := resource.MustParse("100Gi")
|
||||
cap := resource.MustParse("100Mi")
|
||||
options := volume.VolumeOptions{
|
||||
Capacity: cap,
|
||||
AccessModes: []api.PersistentVolumeAccessMode{
|
||||
|
|
|
@ -17,47 +17,58 @@ limitations under the License.
|
|||
package aws_ebs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
aws_cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/exec"
|
||||
"k8s.io/kubernetes/pkg/util/keymutex"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
)
|
||||
|
||||
const (
|
||||
diskPartitionSuffix = ""
|
||||
diskXVDPath = "/dev/xvd"
|
||||
diskXVDPattern = "/dev/xvd*"
|
||||
maxChecks = 60
|
||||
maxRetries = 10
|
||||
checkSleepDuration = time.Second
|
||||
errorSleepDuration = 5 * time.Second
|
||||
)
|
||||
|
||||
// Singleton key mutex for keeping attach/detach operations for the same PD atomic
|
||||
var attachDetachMutex = keymutex.NewKeyMutex()
|
||||
|
||||
type AWSDiskUtil struct{}
|
||||
|
||||
// Attaches a disk specified by a volume.AWSElasticBlockStore to the current kubelet.
|
||||
// Attaches a disk to the current kubelet.
|
||||
// Mounts the disk to it's global path.
|
||||
func (util *AWSDiskUtil) AttachAndMountDisk(b *awsElasticBlockStoreBuilder, globalPDPath string) error {
|
||||
volumes, err := b.getVolumeProvider()
|
||||
func (diskUtil *AWSDiskUtil) AttachAndMountDisk(b *awsElasticBlockStoreBuilder, globalPDPath string) error {
|
||||
glog.V(5).Infof("AttachAndMountDisk(...) called for PD %q. Will block for existing operations, if any. (globalPDPath=%q)\r\n", b.volumeID, globalPDPath)
|
||||
|
||||
// Block execution until any pending detach operations for this PD have completed
|
||||
attachDetachMutex.LockKey(b.volumeID)
|
||||
defer attachDetachMutex.UnlockKey(b.volumeID)
|
||||
|
||||
glog.V(5).Infof("AttachAndMountDisk(...) called for PD %q. Awake and ready to execute. (globalPDPath=%q)\r\n", b.volumeID, globalPDPath)
|
||||
|
||||
xvdBefore, err := filepath.Glob(diskXVDPattern)
|
||||
if err != nil {
|
||||
glog.Errorf("Error filepath.Glob(\"%s\"): %v\r\n", diskXVDPattern, err)
|
||||
}
|
||||
xvdBeforeSet := sets.NewString(xvdBefore...)
|
||||
|
||||
devicePath, err := attachDiskAndVerify(b, xvdBeforeSet)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
devicePath, err := volumes.AttachDisk("", b.volumeID, b.readOnly)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if b.partition != "" {
|
||||
devicePath = devicePath + b.partition
|
||||
}
|
||||
//TODO(jonesdl) There should probably be better method than busy-waiting here.
|
||||
numTries := 0
|
||||
for {
|
||||
_, err := os.Stat(devicePath)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
numTries++
|
||||
if numTries == 10 {
|
||||
return errors.New("Could not attach disk: Timeout after 10s (" + devicePath + ")")
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
// Only mount the PD globally once.
|
||||
notMnt, err := b.mounter.IsLikelyNotMountPoint(globalPDPath)
|
||||
|
@ -87,64 +98,292 @@ func (util *AWSDiskUtil) AttachAndMountDisk(b *awsElasticBlockStoreBuilder, glob
|
|||
|
||||
// Unmounts the device and detaches the disk from the kubelet's host machine.
|
||||
func (util *AWSDiskUtil) DetachDisk(c *awsElasticBlockStoreCleaner) error {
|
||||
// Unmount the global PD mount, which should be the only one.
|
||||
globalPDPath := makeGlobalPDPath(c.plugin.host, c.volumeID)
|
||||
if err := c.mounter.Unmount(globalPDPath); err != nil {
|
||||
glog.V(2).Info("Error unmount dir ", globalPDPath, ": ", err)
|
||||
return err
|
||||
}
|
||||
if err := os.Remove(globalPDPath); err != nil {
|
||||
glog.V(2).Info("Error removing dir ", globalPDPath, ": ", err)
|
||||
return err
|
||||
}
|
||||
// Detach the disk
|
||||
volumes, err := c.getVolumeProvider()
|
||||
if err != nil {
|
||||
glog.V(2).Info("Error getting volume provider for volumeID ", c.volumeID, ": ", err)
|
||||
return err
|
||||
}
|
||||
if err := volumes.DetachDisk("", c.volumeID); err != nil {
|
||||
glog.V(2).Info("Error detaching disk ", c.volumeID, ": ", err)
|
||||
return err
|
||||
glog.V(5).Infof("DetachDisk(...) for PD %q\r\n", c.volumeID)
|
||||
|
||||
if err := unmountPDAndRemoveGlobalPath(c); err != nil {
|
||||
glog.Errorf("Error unmounting PD %q: %v", c.volumeID, err)
|
||||
}
|
||||
|
||||
// Detach disk asynchronously so that the kubelet sync loop is not blocked.
|
||||
go detachDiskAndVerify(c)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (util *AWSDiskUtil) DeleteVolume(d *awsElasticBlockStoreDeleter) error {
|
||||
volumes, err := d.getVolumeProvider()
|
||||
cloud, err := getCloudProvider()
|
||||
if err != nil {
|
||||
glog.V(2).Info("Error getting volume provider: ", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if err := volumes.DeleteVolume(d.volumeID); err != nil {
|
||||
glog.V(2).Infof("Error deleting AWS EBS volume %s: %v", d.volumeID, err)
|
||||
if err = cloud.DeleteDisk(d.volumeID); err != nil {
|
||||
glog.V(2).Infof("Error deleting EBS Disk volume %s: %v", d.volumeID, err)
|
||||
return err
|
||||
}
|
||||
glog.V(2).Infof("Successfully deleted AWS EBS volume %s", d.volumeID)
|
||||
glog.V(2).Infof("Successfully deleted EBS Disk volume %s", d.volumeID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (util *AWSDiskUtil) CreateVolume(c *awsElasticBlockStoreProvisioner) (volumeID string, volumeSizeGB int, err error) {
|
||||
volumes, err := c.getVolumeProvider()
|
||||
cloud, err := getCloudProvider()
|
||||
if err != nil {
|
||||
glog.V(2).Info("Error getting volume provider: ", err)
|
||||
return "", 0, err
|
||||
}
|
||||
|
||||
requestBytes := c.options.Capacity.Value()
|
||||
// AWS works with gigabytes, convert to GiB with rounding up
|
||||
requestGB := int(volume.RoundUpSize(requestBytes, 1024*1024*1024))
|
||||
volSpec := &aws_cloud.VolumeOptions{
|
||||
CapacityGB: requestGB,
|
||||
Tags: c.options.CloudTags,
|
||||
}
|
||||
// The cloud provider works with gigabytes, convert to GiB with rounding up
|
||||
requestGB := volume.RoundUpSize(requestBytes, 1024*1024*1024)
|
||||
|
||||
name, err := volumes.CreateVolume(volSpec)
|
||||
volumeOptions := &aws.VolumeOptions{}
|
||||
volumeOptions.CapacityGB = int(requestGB)
|
||||
|
||||
name, err := cloud.CreateDisk(volumeOptions)
|
||||
if err != nil {
|
||||
glog.V(2).Infof("Error creating AWS EBS volume: %v", err)
|
||||
glog.V(2).Infof("Error creating EBS Disk volume: %v", err)
|
||||
return "", 0, err
|
||||
}
|
||||
glog.V(2).Infof("Successfully created AWS EBS volume %s", name)
|
||||
return name, requestGB, nil
|
||||
glog.V(2).Infof("Successfully created EBS Disk volume %s", name)
|
||||
return name, int(requestGB), nil
|
||||
}
|
||||
|
||||
// Attaches the specified persistent disk device to node, verifies that it is attached, and retries if it fails.
|
||||
func attachDiskAndVerify(b *awsElasticBlockStoreBuilder, xvdBeforeSet sets.String) (string, error) {
|
||||
var awsCloud *aws.AWSCloud
|
||||
for numRetries := 0; numRetries < maxRetries; numRetries++ {
|
||||
var err error
|
||||
if awsCloud == nil {
|
||||
awsCloud, err = getCloudProvider()
|
||||
if err != nil || awsCloud == nil {
|
||||
// Retry on error. See issue #11321
|
||||
glog.Errorf("Error getting AWSCloudProvider while detaching PD %q: %v", b.volumeID, err)
|
||||
time.Sleep(errorSleepDuration)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if numRetries > 0 {
|
||||
glog.Warningf("Retrying attach for EBS Disk %q (retry count=%v).", b.volumeID, numRetries)
|
||||
}
|
||||
|
||||
devicePath, err := awsCloud.AttachDisk(b.volumeID, b.plugin.host.GetHostName(), b.readOnly)
|
||||
if err != nil {
|
||||
glog.Errorf("Error attaching PD %q: %v", b.volumeID, err)
|
||||
time.Sleep(errorSleepDuration)
|
||||
continue
|
||||
}
|
||||
|
||||
devicePaths := getDiskByIdPaths(b.awsElasticBlockStore, devicePath)
|
||||
|
||||
for numChecks := 0; numChecks < maxChecks; numChecks++ {
|
||||
path, err := verifyDevicePath(devicePaths, xvdBeforeSet)
|
||||
if err != nil {
|
||||
// Log error, if any, and continue checking periodically. See issue #11321
|
||||
glog.Errorf("Error verifying EBS Disk (%q) is attached: %v", b.volumeID, err)
|
||||
} else if path != "" {
|
||||
// A device path has successfully been created for the PD
|
||||
glog.Infof("Successfully attached EBS Disk %q.", b.volumeID)
|
||||
return path, nil
|
||||
}
|
||||
|
||||
// Sleep then check again
|
||||
glog.V(3).Infof("Waiting for EBS Disk %q to attach.", b.volumeID)
|
||||
time.Sleep(checkSleepDuration)
|
||||
}
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("Could not attach EBS Disk %q. Timeout waiting for mount paths to be created.", b.volumeID)
|
||||
}
|
||||
|
||||
// Returns the first path that exists, or empty string if none exist.
|
||||
func verifyDevicePath(devicePaths []string, xvdBeforeSet sets.String) (string, error) {
|
||||
if err := udevadmChangeToNewDrives(xvdBeforeSet); err != nil {
|
||||
// udevadm errors should not block disk detachment, log and continue
|
||||
glog.Errorf("udevadmChangeToNewDrives failed with: %v", err)
|
||||
}
|
||||
|
||||
for _, path := range devicePaths {
|
||||
if pathExists, err := pathExists(path); err != nil {
|
||||
return "", fmt.Errorf("Error checking if path exists: %v", err)
|
||||
} else if pathExists {
|
||||
return path, nil
|
||||
}
|
||||
}
|
||||
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// Detaches the specified persistent disk device from node, verifies that it is detached, and retries if it fails.
|
||||
// This function is intended to be called asynchronously as a go routine.
|
||||
func detachDiskAndVerify(c *awsElasticBlockStoreCleaner) {
|
||||
glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Will block for pending operations", c.volumeID)
|
||||
defer util.HandleCrash()
|
||||
|
||||
// Block execution until any pending attach/detach operations for this PD have completed
|
||||
attachDetachMutex.LockKey(c.volumeID)
|
||||
defer attachDetachMutex.UnlockKey(c.volumeID)
|
||||
|
||||
glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Awake and ready to execute.", c.volumeID)
|
||||
|
||||
var awsCloud *aws.AWSCloud
|
||||
for numRetries := 0; numRetries < maxRetries; numRetries++ {
|
||||
var err error
|
||||
if awsCloud == nil {
|
||||
awsCloud, err = getCloudProvider()
|
||||
if err != nil || awsCloud == nil {
|
||||
// Retry on error. See issue #11321
|
||||
glog.Errorf("Error getting AWSCloudProvider while detaching PD %q: %v", c.volumeID, err)
|
||||
time.Sleep(errorSleepDuration)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if numRetries > 0 {
|
||||
glog.Warningf("Retrying detach for EBS Disk %q (retry count=%v).", c.volumeID, numRetries)
|
||||
}
|
||||
|
||||
devicePath, err := awsCloud.DetachDisk(c.volumeID, c.plugin.host.GetHostName())
|
||||
if err != nil {
|
||||
glog.Errorf("Error detaching PD %q: %v", c.volumeID, err)
|
||||
time.Sleep(errorSleepDuration)
|
||||
continue
|
||||
}
|
||||
|
||||
devicePaths := getDiskByIdPaths(c.awsElasticBlockStore, devicePath)
|
||||
|
||||
for numChecks := 0; numChecks < maxChecks; numChecks++ {
|
||||
allPathsRemoved, err := verifyAllPathsRemoved(devicePaths)
|
||||
if err != nil {
|
||||
// Log error, if any, and continue checking periodically.
|
||||
glog.Errorf("Error verifying EBS Disk (%q) is detached: %v", c.volumeID, err)
|
||||
} else if allPathsRemoved {
|
||||
// All paths to the PD have been succefully removed
|
||||
unmountPDAndRemoveGlobalPath(c)
|
||||
glog.Infof("Successfully detached EBS Disk %q.", c.volumeID)
|
||||
return
|
||||
}
|
||||
|
||||
// Sleep then check again
|
||||
glog.V(3).Infof("Waiting for EBS Disk %q to detach.", c.volumeID)
|
||||
time.Sleep(checkSleepDuration)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
glog.Errorf("Failed to detach EBS Disk %q. One or more mount paths was not removed.", c.volumeID)
|
||||
}
|
||||
|
||||
// Unmount the global PD mount, which should be the only one, and delete it.
|
||||
func unmountPDAndRemoveGlobalPath(c *awsElasticBlockStoreCleaner) error {
|
||||
globalPDPath := makeGlobalPDPath(c.plugin.host, c.volumeID)
|
||||
|
||||
err := c.mounter.Unmount(globalPDPath)
|
||||
os.Remove(globalPDPath)
|
||||
return err
|
||||
}
|
||||
|
||||
// Returns the first path that exists, or empty string if none exist.
|
||||
func verifyAllPathsRemoved(devicePaths []string) (bool, error) {
|
||||
allPathsRemoved := true
|
||||
for _, path := range devicePaths {
|
||||
if err := udevadmChangeToDrive(path); err != nil {
|
||||
// udevadm errors should not block disk detachment, log and continue
|
||||
glog.Errorf("%v", err)
|
||||
}
|
||||
if exists, err := pathExists(path); err != nil {
|
||||
return false, fmt.Errorf("Error checking if path exists: %v", err)
|
||||
} else {
|
||||
allPathsRemoved = allPathsRemoved && !exists
|
||||
}
|
||||
}
|
||||
|
||||
return allPathsRemoved, nil
|
||||
}
|
||||
|
||||
// Returns list of all paths for given EBS mount
|
||||
// This is more interesting on GCE (where we are able to identify volumes under /dev/disk-by-id)
|
||||
// Here it is mostly about applying the partition path
|
||||
func getDiskByIdPaths(d *awsElasticBlockStore, devicePath string) []string {
|
||||
devicePaths := []string{}
|
||||
if devicePath != "" {
|
||||
devicePaths = append(devicePaths, devicePath)
|
||||
}
|
||||
|
||||
if d.partition != "" {
|
||||
for i, path := range devicePaths {
|
||||
devicePaths[i] = path + diskPartitionSuffix + d.partition
|
||||
}
|
||||
}
|
||||
|
||||
return devicePaths
|
||||
}
|
||||
|
||||
// Checks if the specified path exists
|
||||
func pathExists(path string) (bool, error) {
|
||||
_, err := os.Stat(path)
|
||||
if err == nil {
|
||||
return true, nil
|
||||
} else if os.IsNotExist(err) {
|
||||
return false, nil
|
||||
} else {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
// Return cloud provider
|
||||
func getCloudProvider() (*aws.AWSCloud, error) {
|
||||
awsCloudProvider, err := cloudprovider.GetCloudProvider("aws", nil)
|
||||
if err != nil || awsCloudProvider == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// The conversion must be safe otherwise bug in GetCloudProvider()
|
||||
return awsCloudProvider.(*aws.AWSCloud), nil
|
||||
}
|
||||
|
||||
// TODO: This udev code is copy-and-paste from the gce_pd provider; refactor
|
||||
|
||||
// Calls "udevadm trigger --action=change" for newly created "/dev/xvd*" drives (exist only in after set).
|
||||
// This is workaround for Issue #7972. Once the underlying issue has been resolved, this may be removed.
|
||||
func udevadmChangeToNewDrives(xvdBeforeSet sets.String) error {
|
||||
xvdAfter, err := filepath.Glob(diskXVDPattern)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error filepath.Glob(\"%s\"): %v\r\n", diskXVDPattern, err)
|
||||
}
|
||||
|
||||
for _, xvd := range xvdAfter {
|
||||
if !xvdBeforeSet.Has(xvd) {
|
||||
return udevadmChangeToDrive(xvd)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Calls "udevadm trigger --action=change" on the specified drive.
|
||||
// drivePath must be the the block device path to trigger on, in the format "/dev/sd*", or a symlink to it.
|
||||
// This is workaround for Issue #7972. Once the underlying issue has been resolved, this may be removed.
|
||||
func udevadmChangeToDrive(drivePath string) error {
|
||||
glog.V(5).Infof("udevadmChangeToDrive: drive=%q", drivePath)
|
||||
|
||||
// Evaluate symlink, if any
|
||||
drive, err := filepath.EvalSymlinks(drivePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("udevadmChangeToDrive: filepath.EvalSymlinks(%q) failed with %v.", drivePath, err)
|
||||
}
|
||||
glog.V(5).Infof("udevadmChangeToDrive: symlink path is %q", drive)
|
||||
|
||||
// Check to make sure input is "/dev/xvd*"
|
||||
if !strings.Contains(drive, diskXVDPath) {
|
||||
return fmt.Errorf("udevadmChangeToDrive: expected input in the form \"%s\" but drive is %q.", diskXVDPattern, drive)
|
||||
}
|
||||
|
||||
// Call "udevadm trigger --action=change --property-match=DEVNAME=/dev/sd..."
|
||||
_, err = exec.New().Command(
|
||||
"udevadm",
|
||||
"trigger",
|
||||
"--action=change",
|
||||
fmt.Sprintf("--property-match=DEVNAME=%s", drive)).CombinedOutput()
|
||||
if err != nil {
|
||||
return fmt.Errorf("udevadmChangeToDrive: udevadm trigger failed for drive %q with %v.", drive, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -37,11 +37,11 @@ func (v *mockVolumes) AttachDisk(instanceName string, volumeName string, readOnl
|
|||
return "", fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (v *mockVolumes) DetachDisk(instanceName string, volumeName string) error {
|
||||
return fmt.Errorf("not implemented")
|
||||
func (v *mockVolumes) DetachDisk(instanceName string, volumeName string) (string, error) {
|
||||
return "", fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (v *mockVolumes) CreateVolume(volumeOptions *aws.VolumeOptions) (volumeName string, err error) {
|
||||
func (v *mockVolumes) CreateDisk(volumeOptions *aws.VolumeOptions) (volumeName string, err error) {
|
||||
return "", fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
|
|
|
@ -326,7 +326,7 @@ func createPD() (string, error) {
|
|||
}
|
||||
volumeOptions := &awscloud.VolumeOptions{}
|
||||
volumeOptions.CapacityGB = 10
|
||||
return volumes.CreateVolume(volumeOptions)
|
||||
return volumes.CreateDisk(volumeOptions)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -353,7 +353,7 @@ func deletePD(pdName string) error {
|
|||
if !ok {
|
||||
return fmt.Errorf("Provider does not support volumes")
|
||||
}
|
||||
return volumes.DeleteVolume(pdName)
|
||||
return volumes.DeleteDisk(pdName)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -383,7 +383,8 @@ func detachPD(hostName, pdName string) error {
|
|||
if !ok {
|
||||
return fmt.Errorf("Provider does not support volumes")
|
||||
}
|
||||
return volumes.DetachDisk(hostName, pdName)
|
||||
_, err := volumes.DetachDisk(hostName, pdName)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue