diff --git a/pkg/cloudprovider/providers/openstack/openstack.go b/pkg/cloudprovider/providers/openstack/openstack.go index e19071e28f..059d3b47da 100644 --- a/pkg/cloudprovider/providers/openstack/openstack.go +++ b/pkg/cloudprovider/providers/openstack/openstack.go @@ -23,6 +23,7 @@ import ( "io" "io/ioutil" "net/http" + "path" "regexp" "strings" "time" @@ -468,6 +469,11 @@ func (i *Instances) ExternalID(name string) (string, error) { return srv.ID, nil } +// InstanceID returns the kubelet's cloud provider ID. +func (os *OpenStack) InstanceID() (string, error) { + return os.localInstanceID, nil +} + // InstanceID returns the cloud provider ID of the specified instance. func (i *Instances) InstanceID(name string) (string, error) { srv, err := getServerByName(i.compute, name) @@ -956,7 +962,7 @@ func (os *OpenStack) Routes() (cloudprovider.Routes, bool) { } // Attaches given cinder volume to the compute running kubelet -func (os *OpenStack) AttachDisk(diskName string) (string, error) { +func (os *OpenStack) AttachDisk(instanceID string, diskName string) (string, error) { disk, err := os.getVolume(diskName) if err != nil { return "", err @@ -970,8 +976,8 @@ func (os *OpenStack) AttachDisk(diskName string) (string, error) { } if len(disk.Attachments) > 0 && disk.Attachments[0]["server_id"] != nil { - if os.localInstanceID == disk.Attachments[0]["server_id"] { - glog.V(4).Infof("Disk: %q is already attached to compute: %q", diskName, os.localInstanceID) + if instanceID == disk.Attachments[0]["server_id"] { + glog.V(4).Infof("Disk: %q is already attached to compute: %q", diskName, instanceID) return disk.ID, nil } else { errMsg := fmt.Sprintf("Disk %q is attached to a different compute: %q, should be detached before proceeding", diskName, disk.Attachments[0]["server_id"]) @@ -980,19 +986,19 @@ func (os *OpenStack) AttachDisk(diskName string) (string, error) { } } // add read only flag here if possible spothanis - _, err = volumeattach.Create(cClient, os.localInstanceID, &volumeattach.CreateOpts{ + _, err = volumeattach.Create(cClient, instanceID, &volumeattach.CreateOpts{ VolumeID: disk.ID, }).Extract() if err != nil { - glog.Errorf("Failed to attach %s volume to %s compute", diskName, os.localInstanceID) + glog.Errorf("Failed to attach %s volume to %s compute", diskName, instanceID) return "", err } - glog.V(2).Infof("Successfully attached %s volume to %s compute", diskName, os.localInstanceID) + glog.V(2).Infof("Successfully attached %s volume to %s compute", diskName, instanceID) return disk.ID, nil } // Detaches given cinder volume from the compute running kubelet -func (os *OpenStack) DetachDisk(partialDiskId string) error { +func (os *OpenStack) DetachDisk(instanceID string, partialDiskId string) error { disk, err := os.getVolume(partialDiskId) if err != nil { return err @@ -1004,17 +1010,17 @@ func (os *OpenStack) DetachDisk(partialDiskId string) error { glog.Errorf("Unable to initialize nova client for region: %s", os.region) return err } - if len(disk.Attachments) > 0 && disk.Attachments[0]["server_id"] != nil && os.localInstanceID == disk.Attachments[0]["server_id"] { + if len(disk.Attachments) > 0 && disk.Attachments[0]["server_id"] != nil && instanceID == disk.Attachments[0]["server_id"] { // This is a blocking call and effects kubelet's performance directly. // We should consider kicking it out into a separate routine, if it is bad. - err = volumeattach.Delete(cClient, os.localInstanceID, disk.ID).ExtractErr() + err = volumeattach.Delete(cClient, instanceID, disk.ID).ExtractErr() if err != nil { - glog.Errorf("Failed to delete volume %s from compute %s attached %v", disk.ID, os.localInstanceID, err) + glog.Errorf("Failed to delete volume %s from compute %s attached %v", disk.ID, instanceID, err) return err } - glog.V(2).Infof("Successfully detached volume: %s from compute: %s", disk.ID, os.localInstanceID) + glog.V(2).Infof("Successfully detached volume: %s from compute: %s", disk.ID, instanceID) } else { - errMsg := fmt.Sprintf("Disk: %s has no attachments or is not attached to compute: %s", disk.Name, os.localInstanceID) + errMsg := fmt.Sprintf("Disk: %s has no attachments or is not attached to compute: %s", disk.Name, instanceID) glog.Errorf(errMsg) return errors.New(errMsg) } @@ -1086,6 +1092,22 @@ func (os *OpenStack) CreateVolume(name string, size int, tags *map[string]string return vol.ID, err } +// GetDevicePath returns the path of an attached block storage volume, specified by its id. +func (os *OpenStack) GetDevicePath(diskId string) string { + files, _ := ioutil.ReadDir("/dev/disk/by-id/") + for _, f := range files { + if strings.Contains(f.Name(), "virtio-") { + devid_prefix := f.Name()[len("virtio-"):len(f.Name())] + if strings.Contains(diskId, devid_prefix) { + glog.V(4).Infof("Found disk attached as %q; full devicepath: %s\n", f.Name(), path.Join("/dev/disk/by-id/", f.Name())) + return path.Join("/dev/disk/by-id/", f.Name()) + } + } + } + glog.Warningf("Failed to find device for the diskid: %q\n", diskId) + return "" +} + func (os *OpenStack) DeleteVolume(volumeName string) error { sClient, err := openstack.NewBlockStorageV1(os.provider, gophercloud.EndpointOpts{ Region: os.region, diff --git a/pkg/cloudprovider/providers/openstack/openstack_test.go b/pkg/cloudprovider/providers/openstack/openstack_test.go index 7584b8723e..06f618e454 100644 --- a/pkg/cloudprovider/providers/openstack/openstack_test.go +++ b/pkg/cloudprovider/providers/openstack/openstack_test.go @@ -268,7 +268,7 @@ func TestVolumes(t *testing.T) { WaitForVolumeStatus(t, os, vol, volumeAvailableStatus, volumeCreateTimeoutSeconds) - diskId, err := os.AttachDisk(vol) + diskId, err := os.AttachDisk(os.localInstanceID, vol) if err != nil { t.Fatalf("Cannot AttachDisk Cinder volume %s: %v", vol, err) } @@ -276,7 +276,7 @@ func TestVolumes(t *testing.T) { WaitForVolumeStatus(t, os, vol, volumeInUseStatus, volumeCreateTimeoutSeconds) - err = os.DetachDisk(vol) + err = os.DetachDisk(os.localInstanceID, vol) if err != nil { t.Fatalf("Cannot DetachDisk Cinder volume %s: %v", vol, err) } diff --git a/pkg/cloudprovider/providers/rackspace/rackspace.go b/pkg/cloudprovider/providers/rackspace/rackspace.go index 1988438162..35d4e1e3b9 100644 --- a/pkg/cloudprovider/providers/rackspace/rackspace.go +++ b/pkg/cloudprovider/providers/rackspace/rackspace.go @@ -17,18 +17,25 @@ limitations under the License. package rackspace import ( + "encoding/json" "errors" "fmt" "io" + "io/ioutil" "net" + "os" "regexp" + "strings" "time" "github.com/rackspace/gophercloud" + osvolumeattach "github.com/rackspace/gophercloud/openstack/compute/v2/extensions/volumeattach" osservers "github.com/rackspace/gophercloud/openstack/compute/v2/servers" "github.com/rackspace/gophercloud/pagination" "github.com/rackspace/gophercloud/rackspace" + "github.com/rackspace/gophercloud/rackspace/blockstorage/v1/volumes" "github.com/rackspace/gophercloud/rackspace/compute/v2/servers" + "github.com/rackspace/gophercloud/rackspace/compute/v2/volumeattach" "gopkg.in/gcfg.v1" "github.com/golang/glog" @@ -37,6 +44,7 @@ import ( ) const ProviderName = "rackspace" +const metaDataPath = "/media/configdrive/openstack/latest/meta_data.json" var ErrNotFound = errors.New("Failed to find object") var ErrMultipleResults = errors.New("Multiple results where only one expected") @@ -57,6 +65,11 @@ func (d *MyDuration) UnmarshalText(text []byte) error { return nil } +type MetaData struct { + UUID string `json:"uuid"` + Name string `json:"name"` +} + type LoadBalancerOpts struct { SubnetId string `gcfg:"subnet-id"` // required CreateMonitor bool `gcfg:"create-monitor"` @@ -88,6 +101,64 @@ type Config struct { LoadBalancer LoadBalancerOpts } +func probeNodeAddress(compute *gophercloud.ServiceClient, name string) (string, error) { + id, err := readInstanceID() + if err == nil { + srv, err := servers.Get(compute, id).Extract() + if err != nil { + return "", err + } + return getAddressByServer(srv) + } + + ip, err := getAddressByName(compute, name) + if err != nil { + return "", err + } + + return ip, nil +} + +func probeInstanceID(client *gophercloud.ServiceClient, name string) (string, error) { + // Attempt to read id from config drive. + id, err := readInstanceID() + if err == nil { + return id, nil + } + + // Attempt to get the server by the name from the API + server, err := getServerByName(client, name) + if err != nil { + return "", err + } + + return server.ID, nil +} + +func parseMetaData(file io.Reader) (string, error) { + metaDataBytes, err := ioutil.ReadAll(file) + if err != nil { + return "", fmt.Errorf("Cannot read %s: %v", file, err) + } + + metaData := MetaData{} + err = json.Unmarshal(metaDataBytes, &metaData) + if err != nil { + return "", fmt.Errorf("Cannot parse %s: %v", metaDataPath, err) + } + + return metaData.UUID, nil +} + +func readInstanceID() (string, error) { + file, err := os.Open(metaDataPath) + if err != nil { + return "", fmt.Errorf("Cannot open %s: %v", metaDataPath, err) + } + + return parseMetaData(file) +} + func init() { cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) { cfg, err := readConfig(config) @@ -135,6 +206,7 @@ func newRackspace(cfg Config) (*Rackspace, error) { region: cfg.Global.Region, lbOpts: cfg.LoadBalancer, } + return &os, nil } @@ -146,9 +218,7 @@ type Instances struct { func (os *Rackspace) Instances() (cloudprovider.Instances, bool) { glog.V(2).Info("rackspace.Instances() called") - compute, err := rackspace.NewComputeV2(os.provider, gophercloud.EndpointOpts{ - Region: os.region, - }) + compute, err := os.getComputeClient() if err != nil { glog.Warningf("Failed to find compute endpoint: %v", err) return nil, false @@ -295,12 +365,7 @@ func firstAddr(netblob interface{}) string { return addr } -func getAddressByName(api *gophercloud.ServiceClient, name string) (string, error) { - srv, err := getServerByName(api, name) - if err != nil { - return "", err - } - +func getAddressByServer(srv *osservers.Server) (string, error) { var s string if s == "" { s = firstAddr(srv.Addresses["private"]) @@ -320,10 +385,19 @@ func getAddressByName(api *gophercloud.ServiceClient, name string) (string, erro return s, nil } +func getAddressByName(api *gophercloud.ServiceClient, name string) (string, error) { + srv, err := getServerByName(api, name) + if err != nil { + return "", err + } + + return getAddressByServer(srv) +} + func (i *Instances) NodeAddresses(name string) ([]api.NodeAddress, error) { glog.V(2).Infof("NodeAddresses(%v) called", name) - ip, err := getAddressByName(i.compute, name) + ip, err := probeNodeAddress(i.compute, name) if err != nil { return nil, err } @@ -336,12 +410,17 @@ func (i *Instances) NodeAddresses(name string) ([]api.NodeAddress, error) { // ExternalID returns the cloud provider ID of the specified instance (deprecated). func (i *Instances) ExternalID(name string) (string, error) { - return "", fmt.Errorf("unimplemented") + return probeInstanceID(i.compute, name) +} + +// InstanceID returns the cloud provider ID of the kubelet's instance. +func (rs *Rackspace) InstanceID() (string, error) { + return readInstanceID() } // InstanceID returns the cloud provider ID of the specified instance. func (i *Instances) InstanceID(name string) (string, error) { - return "", nil + return probeInstanceID(i.compute, name) } // InstanceType returns the type of the specified instance. @@ -355,6 +434,8 @@ func (i *Instances) AddSSHKeyToAllInstances(user string, keyData []byte) error { // Implementation of Instances.CurrentNodeName func (i *Instances) CurrentNodeName(hostname string) (string, error) { + // Beware when changing this, nodename == hostname assumption is crucial to + // apiserver => kubelet communication. return hostname, nil } @@ -391,3 +472,143 @@ func (os *Rackspace) GetZone() (cloudprovider.Zone, error) { return cloudprovider.Zone{Region: os.region}, nil } + +// Create a volume of given size (in GiB) +func (rs *Rackspace) CreateVolume(name string, size int, tags *map[string]string) (volumeName string, err error) { + return "", errors.New("unimplemented") +} + +func (rs *Rackspace) DeleteVolume(volumeName string) error { + return errors.New("unimplemented") +} + +// Attaches given cinder volume to the compute running kubelet +func (rs *Rackspace) AttachDisk(instanceID string, diskName string) (string, error) { + disk, err := rs.getVolume(diskName) + if err != nil { + return "", err + } + + compute, err := rs.getComputeClient() + if err != nil { + return "", err + } + + if len(disk.Attachments) > 0 { + if instanceID == disk.Attachments[0]["server_id"] { + glog.V(4).Infof("Disk: %q is already attached to compute: %q", diskName, instanceID) + return disk.ID, nil + } + + errMsg := fmt.Sprintf("Disk %q is attached to a different compute: %q, should be detached before proceeding", diskName, disk.Attachments[0]["server_id"]) + glog.Errorf(errMsg) + return "", errors.New(errMsg) + } + + _, err = volumeattach.Create(compute, instanceID, &osvolumeattach.CreateOpts{ + VolumeID: disk.ID, + }).Extract() + if err != nil { + glog.Errorf("Failed to attach %s volume to %s compute", diskName, instanceID) + return "", err + } + glog.V(2).Infof("Successfully attached %s volume to %s compute", diskName, instanceID) + return disk.ID, nil +} + +// GetDevicePath returns the path of an attached block storage volume, specified by its id. +func (rs *Rackspace) GetDevicePath(diskId string) string { + volume, err := rs.getVolume(diskId) + if err != nil { + return "" + } + attachments := volume.Attachments + if len(attachments) != 1 { + glog.Warningf("Unexpected number of volume attachments on %s: %d", diskId, len(attachments)) + return "" + } + return attachments[0]["device"].(string) +} + +// Takes a partial/full disk id or diskname +func (rs *Rackspace) getVolume(diskName string) (volumes.Volume, error) { + sClient, err := rackspace.NewBlockStorageV1(rs.provider, gophercloud.EndpointOpts{ + Region: rs.region, + }) + + var volume volumes.Volume + if err != nil || sClient == nil { + glog.Errorf("Unable to initialize cinder client for region: %s", rs.region) + return volume, err + } + + err = volumes.List(sClient).EachPage(func(page pagination.Page) (bool, error) { + vols, err := volumes.ExtractVolumes(page) + if err != nil { + glog.Errorf("Failed to extract volumes: %v", err) + return false, err + } + + for _, v := range vols { + glog.V(4).Infof("%s %s %v", v.ID, v.Name, v.Attachments) + if v.Name == diskName || strings.Contains(v.ID, diskName) { + volume = v + return true, nil + } + } + + // if it reached here then no disk with the given name was found. + errmsg := fmt.Sprintf("Unable to find disk: %s in region %s", diskName, rs.region) + return false, errors.New(errmsg) + }) + if err != nil { + glog.Errorf("Error occured getting volume: %s", diskName) + } + return volume, err +} + +func (rs *Rackspace) getComputeClient() (*gophercloud.ServiceClient, error) { + client, err := rackspace.NewComputeV2(rs.provider, gophercloud.EndpointOpts{ + Region: rs.region, + }) + if err != nil || client == nil { + glog.Errorf("Unable to initialize nova client for region: %s", rs.region) + } + return client, nil +} + +// Detaches given cinder volume from the compute running kubelet +func (rs *Rackspace) DetachDisk(instanceID string, partialDiskId string) error { + disk, err := rs.getVolume(partialDiskId) + if err != nil { + return err + } + + compute, err := rs.getComputeClient() + if err != nil { + return err + } + + if len(disk.Attachments) > 1 { + // Rackspace does not support "multiattach", this is a sanity check. + errmsg := fmt.Sprintf("Volume %s is attached to multiple instances, which is not supported by this provider.", disk.ID) + return errors.New(errmsg) + } + + if len(disk.Attachments) > 0 && instanceID == disk.Attachments[0]["server_id"] { + // This is a blocking call and effects kubelet's performance directly. + // We should consider kicking it out into a separate routine, if it is bad. + err = volumeattach.Delete(compute, instanceID, disk.ID).ExtractErr() + if err != nil { + glog.Errorf("Failed to delete volume %s from compute %s attached %v", disk.ID, instanceID, err) + return err + } + glog.V(2).Infof("Successfully detached volume: %s from compute: %s", disk.ID, instanceID) + } else { + errMsg := fmt.Sprintf("Disk: %s has no attachments or is not attached to compute: %s", disk.Name, instanceID) + glog.Errorf(errMsg) + return errors.New(errMsg) + } + + return nil +} diff --git a/pkg/cloudprovider/providers/rackspace/rackspace_test.go b/pkg/cloudprovider/providers/rackspace/rackspace_test.go index bc37383108..1b00b13306 100644 --- a/pkg/cloudprovider/providers/rackspace/rackspace_test.go +++ b/pkg/cloudprovider/providers/rackspace/rackspace_test.go @@ -107,6 +107,27 @@ func configFromEnv() (cfg Config, ok bool) { return } +func TestParseMetaData(t *testing.T) { + _, err := parseMetaData(strings.NewReader("")) + if err == nil { + t.Errorf("Should fail when invalid meta data is provided: %s", err) + } + + id, err := parseMetaData(strings.NewReader(` + { + "UUID":"someuuid", + "name":"somename", + "project_id":"someprojectid" + } + `)) + if err != nil { + t.Fatalf("Should succeed when valid meta data is provided: %s", err) + } + if id != "someuuid" { + t.Errorf("incorrect uuid: %s", id) + } +} + func TestNewRackspace(t *testing.T) { cfg, ok := configFromEnv() if !ok { diff --git a/pkg/volume/cinder/cinder.go b/pkg/volume/cinder/cinder.go index f4db4d1d39..b4dcaedb8a 100644 --- a/pkg/volume/cinder/cinder.go +++ b/pkg/volume/cinder/cinder.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/cloudprovider/providers/openstack" + "k8s.io/kubernetes/pkg/cloudprovider/providers/rackspace" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/keymutex" @@ -39,6 +40,15 @@ func ProbeVolumePlugins() []volume.VolumePlugin { return []volume.VolumePlugin{&cinderPlugin{}} } +type CinderProvider interface { + AttachDisk(instanceID string, diskName string) (string, error) + DetachDisk(instanceID string, partialDiskId string) error + DeleteVolume(volumeName string) error + CreateVolume(name string, size int, tags *map[string]string) (volumeName string, err error) + GetDevicePath(diskId string) string + InstanceID() (string, error) +} + type cinderPlugin struct { host volume.VolumeHost // Guarding SetUp and TearDown operations @@ -153,18 +163,21 @@ func (plugin *cinderPlugin) newProvisionerInternal(options volume.VolumeOptions, }, nil } -func (plugin *cinderPlugin) getCloudProvider() (*openstack.OpenStack, error) { +func (plugin *cinderPlugin) getCloudProvider() (CinderProvider, error) { cloud := plugin.host.GetCloudProvider() if cloud == nil { glog.Errorf("Cloud provider not initialized properly") return nil, errors.New("Cloud provider not initialized properly") } - os := cloud.(*openstack.OpenStack) - if os == nil { - return nil, errors.New("Invalid cloud provider: expected OpenStack") + switch cloud := cloud.(type) { + case *rackspace.Rackspace: + return cloud, nil + case *openstack.OpenStack: + return cloud, nil + default: + return nil, errors.New("Invalid cloud provider: expected OpenStack or Rackspace.") } - return os, nil } // Abstract interface to PD operations. diff --git a/pkg/volume/cinder/cinder_util.go b/pkg/volume/cinder/cinder_util.go index 8159dfd6f5..8f513cb6a1 100644 --- a/pkg/volume/cinder/cinder_util.go +++ b/pkg/volume/cinder/cinder_util.go @@ -18,10 +18,7 @@ package cinder import ( "errors" - "io/ioutil" "os" - "path" - "strings" "time" "github.com/golang/glog" @@ -42,7 +39,11 @@ func (util *CinderDiskUtil) AttachDisk(b *cinderVolumeMounter, globalPDPath stri if err != nil { return err } - diskid, err := cloud.AttachDisk(b.pdName) + instanceid, err := cloud.InstanceID() + if err != nil { + return err + } + diskid, err := cloud.AttachDisk(instanceid, b.pdName) if err != nil { return err } @@ -50,8 +51,7 @@ func (util *CinderDiskUtil) AttachDisk(b *cinderVolumeMounter, globalPDPath stri var devicePath string numTries := 0 for { - devicePath = makeDevicePath(diskid) - // probe the attached vol so that symlink in /dev/disk/by-id is created + devicePath = cloud.GetDevicePath(diskid) probeAttachedVolume() _, err := os.Stat(devicePath) @@ -89,21 +89,6 @@ func (util *CinderDiskUtil) AttachDisk(b *cinderVolumeMounter, globalPDPath stri return nil } -func makeDevicePath(diskid string) string { - files, _ := ioutil.ReadDir("/dev/disk/by-id/") - for _, f := range files { - if strings.Contains(f.Name(), "virtio-") { - devid_prefix := f.Name()[len("virtio-"):len(f.Name())] - if strings.Contains(diskid, devid_prefix) { - glog.V(4).Infof("Found disk attached as %q; full devicepath: %s\n", f.Name(), path.Join("/dev/disk/by-id/", f.Name())) - return path.Join("/dev/disk/by-id/", f.Name()) - } - } - } - glog.Warningf("Failed to find device for the diskid: %q\n", diskid) - return "" -} - // Unmounts the device and detaches the disk from the kubelet's host machine. func (util *CinderDiskUtil) DetachDisk(cd *cinderVolumeUnmounter) error { globalPDPath := makeGlobalPDName(cd.plugin.host, cd.pdName) @@ -119,8 +104,11 @@ func (util *CinderDiskUtil) DetachDisk(cd *cinderVolumeUnmounter) error { if err != nil { return err } - - if err = cloud.DetachDisk(cd.pdName); err != nil { + instanceid, err := cloud.InstanceID() + if err != nil { + return err + } + if err = cloud.DetachDisk(instanceid, cd.pdName); err != nil { return err } glog.V(2).Infof("Successfully detached cinder volume %s", cd.pdName)