mirror of https://github.com/k3s-io/k3s
Merge pull request #22023 from mkulke/rackspace-improvements
Automatic merge from submit-queue Rackspace improvements (OpenStack Cinder) This adds PV support via Cinder on Rackspace clusters. Rackspace Cloud Block Storage is pretty much vanilla OpenStack Cinder, so there is no need for a separate Volume Plugin. Instead I refactored the Cinder/OpenStack interaction a bit (by introducing a CinderProvider Interface and moving the device path detection logic to the OpenStack part). Right now this is limited to `AttachDisk` and `DetachDisk`. Creation and deletion of Block Storage is not in scope of this PR. Also the `ExternalID` and `InstanceID` cloud provider methods have been implemented for Rackspace.pull/6/head
commit
06160b6abe
|
@ -23,6 +23,7 @@ import (
|
|||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"path"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
@ -468,6 +469,11 @@ func (i *Instances) ExternalID(name string) (string, error) {
|
|||
return srv.ID, nil
|
||||
}
|
||||
|
||||
// InstanceID returns the kubelet's cloud provider ID.
|
||||
func (os *OpenStack) InstanceID() (string, error) {
|
||||
return os.localInstanceID, nil
|
||||
}
|
||||
|
||||
// InstanceID returns the cloud provider ID of the specified instance.
|
||||
func (i *Instances) InstanceID(name string) (string, error) {
|
||||
srv, err := getServerByName(i.compute, name)
|
||||
|
@ -956,7 +962,7 @@ func (os *OpenStack) Routes() (cloudprovider.Routes, bool) {
|
|||
}
|
||||
|
||||
// Attaches given cinder volume to the compute running kubelet
|
||||
func (os *OpenStack) AttachDisk(diskName string) (string, error) {
|
||||
func (os *OpenStack) AttachDisk(instanceID string, diskName string) (string, error) {
|
||||
disk, err := os.getVolume(diskName)
|
||||
if err != nil {
|
||||
return "", err
|
||||
|
@ -970,8 +976,8 @@ func (os *OpenStack) AttachDisk(diskName string) (string, error) {
|
|||
}
|
||||
|
||||
if len(disk.Attachments) > 0 && disk.Attachments[0]["server_id"] != nil {
|
||||
if os.localInstanceID == disk.Attachments[0]["server_id"] {
|
||||
glog.V(4).Infof("Disk: %q is already attached to compute: %q", diskName, os.localInstanceID)
|
||||
if instanceID == disk.Attachments[0]["server_id"] {
|
||||
glog.V(4).Infof("Disk: %q is already attached to compute: %q", diskName, instanceID)
|
||||
return disk.ID, nil
|
||||
} else {
|
||||
errMsg := fmt.Sprintf("Disk %q is attached to a different compute: %q, should be detached before proceeding", diskName, disk.Attachments[0]["server_id"])
|
||||
|
@ -980,19 +986,19 @@ func (os *OpenStack) AttachDisk(diskName string) (string, error) {
|
|||
}
|
||||
}
|
||||
// add read only flag here if possible spothanis
|
||||
_, err = volumeattach.Create(cClient, os.localInstanceID, &volumeattach.CreateOpts{
|
||||
_, err = volumeattach.Create(cClient, instanceID, &volumeattach.CreateOpts{
|
||||
VolumeID: disk.ID,
|
||||
}).Extract()
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to attach %s volume to %s compute", diskName, os.localInstanceID)
|
||||
glog.Errorf("Failed to attach %s volume to %s compute", diskName, instanceID)
|
||||
return "", err
|
||||
}
|
||||
glog.V(2).Infof("Successfully attached %s volume to %s compute", diskName, os.localInstanceID)
|
||||
glog.V(2).Infof("Successfully attached %s volume to %s compute", diskName, instanceID)
|
||||
return disk.ID, nil
|
||||
}
|
||||
|
||||
// Detaches given cinder volume from the compute running kubelet
|
||||
func (os *OpenStack) DetachDisk(partialDiskId string) error {
|
||||
func (os *OpenStack) DetachDisk(instanceID string, partialDiskId string) error {
|
||||
disk, err := os.getVolume(partialDiskId)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -1004,17 +1010,17 @@ func (os *OpenStack) DetachDisk(partialDiskId string) error {
|
|||
glog.Errorf("Unable to initialize nova client for region: %s", os.region)
|
||||
return err
|
||||
}
|
||||
if len(disk.Attachments) > 0 && disk.Attachments[0]["server_id"] != nil && os.localInstanceID == disk.Attachments[0]["server_id"] {
|
||||
if len(disk.Attachments) > 0 && disk.Attachments[0]["server_id"] != nil && instanceID == disk.Attachments[0]["server_id"] {
|
||||
// This is a blocking call and effects kubelet's performance directly.
|
||||
// We should consider kicking it out into a separate routine, if it is bad.
|
||||
err = volumeattach.Delete(cClient, os.localInstanceID, disk.ID).ExtractErr()
|
||||
err = volumeattach.Delete(cClient, instanceID, disk.ID).ExtractErr()
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to delete volume %s from compute %s attached %v", disk.ID, os.localInstanceID, err)
|
||||
glog.Errorf("Failed to delete volume %s from compute %s attached %v", disk.ID, instanceID, err)
|
||||
return err
|
||||
}
|
||||
glog.V(2).Infof("Successfully detached volume: %s from compute: %s", disk.ID, os.localInstanceID)
|
||||
glog.V(2).Infof("Successfully detached volume: %s from compute: %s", disk.ID, instanceID)
|
||||
} else {
|
||||
errMsg := fmt.Sprintf("Disk: %s has no attachments or is not attached to compute: %s", disk.Name, os.localInstanceID)
|
||||
errMsg := fmt.Sprintf("Disk: %s has no attachments or is not attached to compute: %s", disk.Name, instanceID)
|
||||
glog.Errorf(errMsg)
|
||||
return errors.New(errMsg)
|
||||
}
|
||||
|
@ -1086,6 +1092,22 @@ func (os *OpenStack) CreateVolume(name string, size int, tags *map[string]string
|
|||
return vol.ID, err
|
||||
}
|
||||
|
||||
// GetDevicePath returns the path of an attached block storage volume, specified by its id.
|
||||
func (os *OpenStack) GetDevicePath(diskId string) string {
|
||||
files, _ := ioutil.ReadDir("/dev/disk/by-id/")
|
||||
for _, f := range files {
|
||||
if strings.Contains(f.Name(), "virtio-") {
|
||||
devid_prefix := f.Name()[len("virtio-"):len(f.Name())]
|
||||
if strings.Contains(diskId, devid_prefix) {
|
||||
glog.V(4).Infof("Found disk attached as %q; full devicepath: %s\n", f.Name(), path.Join("/dev/disk/by-id/", f.Name()))
|
||||
return path.Join("/dev/disk/by-id/", f.Name())
|
||||
}
|
||||
}
|
||||
}
|
||||
glog.Warningf("Failed to find device for the diskid: %q\n", diskId)
|
||||
return ""
|
||||
}
|
||||
|
||||
func (os *OpenStack) DeleteVolume(volumeName string) error {
|
||||
sClient, err := openstack.NewBlockStorageV1(os.provider, gophercloud.EndpointOpts{
|
||||
Region: os.region,
|
||||
|
|
|
@ -268,7 +268,7 @@ func TestVolumes(t *testing.T) {
|
|||
|
||||
WaitForVolumeStatus(t, os, vol, volumeAvailableStatus, volumeCreateTimeoutSeconds)
|
||||
|
||||
diskId, err := os.AttachDisk(vol)
|
||||
diskId, err := os.AttachDisk(os.localInstanceID, vol)
|
||||
if err != nil {
|
||||
t.Fatalf("Cannot AttachDisk Cinder volume %s: %v", vol, err)
|
||||
}
|
||||
|
@ -276,7 +276,7 @@ func TestVolumes(t *testing.T) {
|
|||
|
||||
WaitForVolumeStatus(t, os, vol, volumeInUseStatus, volumeCreateTimeoutSeconds)
|
||||
|
||||
err = os.DetachDisk(vol)
|
||||
err = os.DetachDisk(os.localInstanceID, vol)
|
||||
if err != nil {
|
||||
t.Fatalf("Cannot DetachDisk Cinder volume %s: %v", vol, err)
|
||||
}
|
||||
|
|
|
@ -17,18 +17,25 @@ limitations under the License.
|
|||
package rackspace
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/rackspace/gophercloud"
|
||||
osvolumeattach "github.com/rackspace/gophercloud/openstack/compute/v2/extensions/volumeattach"
|
||||
osservers "github.com/rackspace/gophercloud/openstack/compute/v2/servers"
|
||||
"github.com/rackspace/gophercloud/pagination"
|
||||
"github.com/rackspace/gophercloud/rackspace"
|
||||
"github.com/rackspace/gophercloud/rackspace/blockstorage/v1/volumes"
|
||||
"github.com/rackspace/gophercloud/rackspace/compute/v2/servers"
|
||||
"github.com/rackspace/gophercloud/rackspace/compute/v2/volumeattach"
|
||||
"gopkg.in/gcfg.v1"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
@ -37,6 +44,7 @@ import (
|
|||
)
|
||||
|
||||
const ProviderName = "rackspace"
|
||||
const metaDataPath = "/media/configdrive/openstack/latest/meta_data.json"
|
||||
|
||||
var ErrNotFound = errors.New("Failed to find object")
|
||||
var ErrMultipleResults = errors.New("Multiple results where only one expected")
|
||||
|
@ -57,6 +65,11 @@ func (d *MyDuration) UnmarshalText(text []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type MetaData struct {
|
||||
UUID string `json:"uuid"`
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
type LoadBalancerOpts struct {
|
||||
SubnetId string `gcfg:"subnet-id"` // required
|
||||
CreateMonitor bool `gcfg:"create-monitor"`
|
||||
|
@ -88,6 +101,64 @@ type Config struct {
|
|||
LoadBalancer LoadBalancerOpts
|
||||
}
|
||||
|
||||
func probeNodeAddress(compute *gophercloud.ServiceClient, name string) (string, error) {
|
||||
id, err := readInstanceID()
|
||||
if err == nil {
|
||||
srv, err := servers.Get(compute, id).Extract()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return getAddressByServer(srv)
|
||||
}
|
||||
|
||||
ip, err := getAddressByName(compute, name)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return ip, nil
|
||||
}
|
||||
|
||||
func probeInstanceID(client *gophercloud.ServiceClient, name string) (string, error) {
|
||||
// Attempt to read id from config drive.
|
||||
id, err := readInstanceID()
|
||||
if err == nil {
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// Attempt to get the server by the name from the API
|
||||
server, err := getServerByName(client, name)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return server.ID, nil
|
||||
}
|
||||
|
||||
func parseMetaData(file io.Reader) (string, error) {
|
||||
metaDataBytes, err := ioutil.ReadAll(file)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("Cannot read %s: %v", file, err)
|
||||
}
|
||||
|
||||
metaData := MetaData{}
|
||||
err = json.Unmarshal(metaDataBytes, &metaData)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("Cannot parse %s: %v", metaDataPath, err)
|
||||
}
|
||||
|
||||
return metaData.UUID, nil
|
||||
}
|
||||
|
||||
func readInstanceID() (string, error) {
|
||||
file, err := os.Open(metaDataPath)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("Cannot open %s: %v", metaDataPath, err)
|
||||
}
|
||||
|
||||
return parseMetaData(file)
|
||||
}
|
||||
|
||||
func init() {
|
||||
cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) {
|
||||
cfg, err := readConfig(config)
|
||||
|
@ -135,6 +206,7 @@ func newRackspace(cfg Config) (*Rackspace, error) {
|
|||
region: cfg.Global.Region,
|
||||
lbOpts: cfg.LoadBalancer,
|
||||
}
|
||||
|
||||
return &os, nil
|
||||
}
|
||||
|
||||
|
@ -146,9 +218,7 @@ type Instances struct {
|
|||
func (os *Rackspace) Instances() (cloudprovider.Instances, bool) {
|
||||
glog.V(2).Info("rackspace.Instances() called")
|
||||
|
||||
compute, err := rackspace.NewComputeV2(os.provider, gophercloud.EndpointOpts{
|
||||
Region: os.region,
|
||||
})
|
||||
compute, err := os.getComputeClient()
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to find compute endpoint: %v", err)
|
||||
return nil, false
|
||||
|
@ -295,12 +365,7 @@ func firstAddr(netblob interface{}) string {
|
|||
return addr
|
||||
}
|
||||
|
||||
func getAddressByName(api *gophercloud.ServiceClient, name string) (string, error) {
|
||||
srv, err := getServerByName(api, name)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
func getAddressByServer(srv *osservers.Server) (string, error) {
|
||||
var s string
|
||||
if s == "" {
|
||||
s = firstAddr(srv.Addresses["private"])
|
||||
|
@ -320,10 +385,19 @@ func getAddressByName(api *gophercloud.ServiceClient, name string) (string, erro
|
|||
return s, nil
|
||||
}
|
||||
|
||||
func getAddressByName(api *gophercloud.ServiceClient, name string) (string, error) {
|
||||
srv, err := getServerByName(api, name)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return getAddressByServer(srv)
|
||||
}
|
||||
|
||||
func (i *Instances) NodeAddresses(name string) ([]api.NodeAddress, error) {
|
||||
glog.V(2).Infof("NodeAddresses(%v) called", name)
|
||||
|
||||
ip, err := getAddressByName(i.compute, name)
|
||||
ip, err := probeNodeAddress(i.compute, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -336,12 +410,17 @@ func (i *Instances) NodeAddresses(name string) ([]api.NodeAddress, error) {
|
|||
|
||||
// ExternalID returns the cloud provider ID of the specified instance (deprecated).
|
||||
func (i *Instances) ExternalID(name string) (string, error) {
|
||||
return "", fmt.Errorf("unimplemented")
|
||||
return probeInstanceID(i.compute, name)
|
||||
}
|
||||
|
||||
// InstanceID returns the cloud provider ID of the kubelet's instance.
|
||||
func (rs *Rackspace) InstanceID() (string, error) {
|
||||
return readInstanceID()
|
||||
}
|
||||
|
||||
// InstanceID returns the cloud provider ID of the specified instance.
|
||||
func (i *Instances) InstanceID(name string) (string, error) {
|
||||
return "", nil
|
||||
return probeInstanceID(i.compute, name)
|
||||
}
|
||||
|
||||
// InstanceType returns the type of the specified instance.
|
||||
|
@ -355,6 +434,8 @@ func (i *Instances) AddSSHKeyToAllInstances(user string, keyData []byte) error {
|
|||
|
||||
// Implementation of Instances.CurrentNodeName
|
||||
func (i *Instances) CurrentNodeName(hostname string) (string, error) {
|
||||
// Beware when changing this, nodename == hostname assumption is crucial to
|
||||
// apiserver => kubelet communication.
|
||||
return hostname, nil
|
||||
}
|
||||
|
||||
|
@ -391,3 +472,143 @@ func (os *Rackspace) GetZone() (cloudprovider.Zone, error) {
|
|||
|
||||
return cloudprovider.Zone{Region: os.region}, nil
|
||||
}
|
||||
|
||||
// Create a volume of given size (in GiB)
|
||||
func (rs *Rackspace) CreateVolume(name string, size int, tags *map[string]string) (volumeName string, err error) {
|
||||
return "", errors.New("unimplemented")
|
||||
}
|
||||
|
||||
func (rs *Rackspace) DeleteVolume(volumeName string) error {
|
||||
return errors.New("unimplemented")
|
||||
}
|
||||
|
||||
// Attaches given cinder volume to the compute running kubelet
|
||||
func (rs *Rackspace) AttachDisk(instanceID string, diskName string) (string, error) {
|
||||
disk, err := rs.getVolume(diskName)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
compute, err := rs.getComputeClient()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if len(disk.Attachments) > 0 {
|
||||
if instanceID == disk.Attachments[0]["server_id"] {
|
||||
glog.V(4).Infof("Disk: %q is already attached to compute: %q", diskName, instanceID)
|
||||
return disk.ID, nil
|
||||
}
|
||||
|
||||
errMsg := fmt.Sprintf("Disk %q is attached to a different compute: %q, should be detached before proceeding", diskName, disk.Attachments[0]["server_id"])
|
||||
glog.Errorf(errMsg)
|
||||
return "", errors.New(errMsg)
|
||||
}
|
||||
|
||||
_, err = volumeattach.Create(compute, instanceID, &osvolumeattach.CreateOpts{
|
||||
VolumeID: disk.ID,
|
||||
}).Extract()
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to attach %s volume to %s compute", diskName, instanceID)
|
||||
return "", err
|
||||
}
|
||||
glog.V(2).Infof("Successfully attached %s volume to %s compute", diskName, instanceID)
|
||||
return disk.ID, nil
|
||||
}
|
||||
|
||||
// GetDevicePath returns the path of an attached block storage volume, specified by its id.
|
||||
func (rs *Rackspace) GetDevicePath(diskId string) string {
|
||||
volume, err := rs.getVolume(diskId)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
attachments := volume.Attachments
|
||||
if len(attachments) != 1 {
|
||||
glog.Warningf("Unexpected number of volume attachments on %s: %d", diskId, len(attachments))
|
||||
return ""
|
||||
}
|
||||
return attachments[0]["device"].(string)
|
||||
}
|
||||
|
||||
// Takes a partial/full disk id or diskname
|
||||
func (rs *Rackspace) getVolume(diskName string) (volumes.Volume, error) {
|
||||
sClient, err := rackspace.NewBlockStorageV1(rs.provider, gophercloud.EndpointOpts{
|
||||
Region: rs.region,
|
||||
})
|
||||
|
||||
var volume volumes.Volume
|
||||
if err != nil || sClient == nil {
|
||||
glog.Errorf("Unable to initialize cinder client for region: %s", rs.region)
|
||||
return volume, err
|
||||
}
|
||||
|
||||
err = volumes.List(sClient).EachPage(func(page pagination.Page) (bool, error) {
|
||||
vols, err := volumes.ExtractVolumes(page)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to extract volumes: %v", err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, v := range vols {
|
||||
glog.V(4).Infof("%s %s %v", v.ID, v.Name, v.Attachments)
|
||||
if v.Name == diskName || strings.Contains(v.ID, diskName) {
|
||||
volume = v
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
// if it reached here then no disk with the given name was found.
|
||||
errmsg := fmt.Sprintf("Unable to find disk: %s in region %s", diskName, rs.region)
|
||||
return false, errors.New(errmsg)
|
||||
})
|
||||
if err != nil {
|
||||
glog.Errorf("Error occured getting volume: %s", diskName)
|
||||
}
|
||||
return volume, err
|
||||
}
|
||||
|
||||
func (rs *Rackspace) getComputeClient() (*gophercloud.ServiceClient, error) {
|
||||
client, err := rackspace.NewComputeV2(rs.provider, gophercloud.EndpointOpts{
|
||||
Region: rs.region,
|
||||
})
|
||||
if err != nil || client == nil {
|
||||
glog.Errorf("Unable to initialize nova client for region: %s", rs.region)
|
||||
}
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// Detaches given cinder volume from the compute running kubelet
|
||||
func (rs *Rackspace) DetachDisk(instanceID string, partialDiskId string) error {
|
||||
disk, err := rs.getVolume(partialDiskId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
compute, err := rs.getComputeClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(disk.Attachments) > 1 {
|
||||
// Rackspace does not support "multiattach", this is a sanity check.
|
||||
errmsg := fmt.Sprintf("Volume %s is attached to multiple instances, which is not supported by this provider.", disk.ID)
|
||||
return errors.New(errmsg)
|
||||
}
|
||||
|
||||
if len(disk.Attachments) > 0 && instanceID == disk.Attachments[0]["server_id"] {
|
||||
// This is a blocking call and effects kubelet's performance directly.
|
||||
// We should consider kicking it out into a separate routine, if it is bad.
|
||||
err = volumeattach.Delete(compute, instanceID, disk.ID).ExtractErr()
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to delete volume %s from compute %s attached %v", disk.ID, instanceID, err)
|
||||
return err
|
||||
}
|
||||
glog.V(2).Infof("Successfully detached volume: %s from compute: %s", disk.ID, instanceID)
|
||||
} else {
|
||||
errMsg := fmt.Sprintf("Disk: %s has no attachments or is not attached to compute: %s", disk.Name, instanceID)
|
||||
glog.Errorf(errMsg)
|
||||
return errors.New(errMsg)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -107,6 +107,27 @@ func configFromEnv() (cfg Config, ok bool) {
|
|||
return
|
||||
}
|
||||
|
||||
func TestParseMetaData(t *testing.T) {
|
||||
_, err := parseMetaData(strings.NewReader(""))
|
||||
if err == nil {
|
||||
t.Errorf("Should fail when invalid meta data is provided: %s", err)
|
||||
}
|
||||
|
||||
id, err := parseMetaData(strings.NewReader(`
|
||||
{
|
||||
"UUID":"someuuid",
|
||||
"name":"somename",
|
||||
"project_id":"someprojectid"
|
||||
}
|
||||
`))
|
||||
if err != nil {
|
||||
t.Fatalf("Should succeed when valid meta data is provided: %s", err)
|
||||
}
|
||||
if id != "someuuid" {
|
||||
t.Errorf("incorrect uuid: %s", id)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewRackspace(t *testing.T) {
|
||||
cfg, ok := configFromEnv()
|
||||
if !ok {
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/resource"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider/providers/openstack"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider/providers/rackspace"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/exec"
|
||||
"k8s.io/kubernetes/pkg/util/keymutex"
|
||||
|
@ -39,6 +40,15 @@ func ProbeVolumePlugins() []volume.VolumePlugin {
|
|||
return []volume.VolumePlugin{&cinderPlugin{}}
|
||||
}
|
||||
|
||||
type CinderProvider interface {
|
||||
AttachDisk(instanceID string, diskName string) (string, error)
|
||||
DetachDisk(instanceID string, partialDiskId string) error
|
||||
DeleteVolume(volumeName string) error
|
||||
CreateVolume(name string, size int, tags *map[string]string) (volumeName string, err error)
|
||||
GetDevicePath(diskId string) string
|
||||
InstanceID() (string, error)
|
||||
}
|
||||
|
||||
type cinderPlugin struct {
|
||||
host volume.VolumeHost
|
||||
// Guarding SetUp and TearDown operations
|
||||
|
@ -153,18 +163,21 @@ func (plugin *cinderPlugin) newProvisionerInternal(options volume.VolumeOptions,
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (plugin *cinderPlugin) getCloudProvider() (*openstack.OpenStack, error) {
|
||||
func (plugin *cinderPlugin) getCloudProvider() (CinderProvider, error) {
|
||||
cloud := plugin.host.GetCloudProvider()
|
||||
if cloud == nil {
|
||||
glog.Errorf("Cloud provider not initialized properly")
|
||||
return nil, errors.New("Cloud provider not initialized properly")
|
||||
}
|
||||
|
||||
os := cloud.(*openstack.OpenStack)
|
||||
if os == nil {
|
||||
return nil, errors.New("Invalid cloud provider: expected OpenStack")
|
||||
switch cloud := cloud.(type) {
|
||||
case *rackspace.Rackspace:
|
||||
return cloud, nil
|
||||
case *openstack.OpenStack:
|
||||
return cloud, nil
|
||||
default:
|
||||
return nil, errors.New("Invalid cloud provider: expected OpenStack or Rackspace.")
|
||||
}
|
||||
return os, nil
|
||||
}
|
||||
|
||||
// Abstract interface to PD operations.
|
||||
|
|
|
@ -18,10 +18,7 @@ package cinder
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
@ -42,7 +39,11 @@ func (util *CinderDiskUtil) AttachDisk(b *cinderVolumeMounter, globalPDPath stri
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
diskid, err := cloud.AttachDisk(b.pdName)
|
||||
instanceid, err := cloud.InstanceID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
diskid, err := cloud.AttachDisk(instanceid, b.pdName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -50,8 +51,7 @@ func (util *CinderDiskUtil) AttachDisk(b *cinderVolumeMounter, globalPDPath stri
|
|||
var devicePath string
|
||||
numTries := 0
|
||||
for {
|
||||
devicePath = makeDevicePath(diskid)
|
||||
// probe the attached vol so that symlink in /dev/disk/by-id is created
|
||||
devicePath = cloud.GetDevicePath(diskid)
|
||||
probeAttachedVolume()
|
||||
|
||||
_, err := os.Stat(devicePath)
|
||||
|
@ -89,21 +89,6 @@ func (util *CinderDiskUtil) AttachDisk(b *cinderVolumeMounter, globalPDPath stri
|
|||
return nil
|
||||
}
|
||||
|
||||
func makeDevicePath(diskid string) string {
|
||||
files, _ := ioutil.ReadDir("/dev/disk/by-id/")
|
||||
for _, f := range files {
|
||||
if strings.Contains(f.Name(), "virtio-") {
|
||||
devid_prefix := f.Name()[len("virtio-"):len(f.Name())]
|
||||
if strings.Contains(diskid, devid_prefix) {
|
||||
glog.V(4).Infof("Found disk attached as %q; full devicepath: %s\n", f.Name(), path.Join("/dev/disk/by-id/", f.Name()))
|
||||
return path.Join("/dev/disk/by-id/", f.Name())
|
||||
}
|
||||
}
|
||||
}
|
||||
glog.Warningf("Failed to find device for the diskid: %q\n", diskid)
|
||||
return ""
|
||||
}
|
||||
|
||||
// Unmounts the device and detaches the disk from the kubelet's host machine.
|
||||
func (util *CinderDiskUtil) DetachDisk(cd *cinderVolumeUnmounter) error {
|
||||
globalPDPath := makeGlobalPDName(cd.plugin.host, cd.pdName)
|
||||
|
@ -119,8 +104,11 @@ func (util *CinderDiskUtil) DetachDisk(cd *cinderVolumeUnmounter) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = cloud.DetachDisk(cd.pdName); err != nil {
|
||||
instanceid, err := cloud.InstanceID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = cloud.DetachDisk(instanceid, cd.pdName); err != nil {
|
||||
return err
|
||||
}
|
||||
glog.V(2).Infof("Successfully detached cinder volume %s", cd.pdName)
|
||||
|
|
Loading…
Reference in New Issue