mirror of https://github.com/k3s-io/k3s

Adding support for Block Volume to rbd plugin

parent 8971a516ed
commit 335c5d959f
@@ -31,6 +31,7 @@ go_library(
        "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
        "//vendor/k8s.io/client-go/kubernetes:go_default_library",
    ],
)
@@ -37,12 +37,16 @@ import (
type diskManager interface {
	// MakeGlobalPDName creates global persistent disk path.
	MakeGlobalPDName(disk rbd) string
	// MakeGlobalVDPDName creates global block disk path.
	MakeGlobalVDPDName(disk rbd) string
	// Attaches the disk to the kubelet's host machine.
	// If it successfully attaches, the path to the device
	// is returned. Otherwise, an error will be returned.
	AttachDisk(disk rbdMounter) (string, error)
	// Detaches the disk from the kubelet's host machine.
	DetachDisk(plugin *rbdPlugin, deviceMountPath string, device string) error
	// Detaches the block disk from the kubelet's host machine.
	DetachBlockDisk(disk rbdDiskUnmapper, mntPath string) error
	// Creates a rbd image.
	CreateImage(provisioner *rbdVolumeProvisioner) (r *v1.RBDPersistentVolumeSource, volumeSizeGB int, err error)
	// Deletes a rbd image.
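For orientation, a minimal sketch (not part of the commit) of how a caller drives this interface. The diskManager methods and the rbdMounter type come from the diff; the helper itself is hypothetical. RBDUtil implements the interface for real nodes and fakeDiskManager (later in this diff) implements it for tests.

// Hypothetical helper: maps an RBD image via a diskManager and reports the
// global block-device path the kubelet will use.
func attachAndDescribe(manager diskManager, m rbdMounter) (string, error) {
	devicePath, err := manager.AttachDisk(m) // e.g. returns "/dev/rbd0"
	if err != nil {
		return "", err
	}
	// Global map path for block volumes, shaped like
	// .../plugins/kubernetes.io/rbd/volumeDevices/{pool}-image-{image}
	return devicePath + " mapped at " + manager.MakeGlobalVDPDName(*m.rbd), nil
}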
@@ -18,6 +18,8 @@ package rbd

import (
	"fmt"
	"os"
	"path/filepath"
	dstrings "strings"

	"github.com/golang/glog"
@@ -55,6 +57,7 @@ var _ volume.DeletableVolumePlugin = &rbdPlugin{}
var _ volume.ProvisionableVolumePlugin = &rbdPlugin{}
var _ volume.AttachableVolumePlugin = &rbdPlugin{}
var _ volume.ExpandableVolumePlugin = &rbdPlugin{}
var _ volume.BlockVolumePlugin = &rbdPlugin{}

const (
	rbdPluginName = "kubernetes.io/rbd"
@@ -368,6 +371,127 @@ func (plugin *rbdPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*vol
	return volume.NewSpecFromVolume(rbdVolume), nil
}

func (plugin *rbdPlugin) ConstructBlockVolumeSpec(podUID types.UID, volumeName, mapPath string) (*volume.Spec, error) {
	pluginDir := plugin.host.GetVolumeDevicePluginDir(rbdPluginName)
	blkutil := volutil.NewBlockVolumePathHandler()

	globalMapPathUUID, err := blkutil.FindGlobalMapPathUUIDFromPod(pluginDir, mapPath, podUID)
	if err != nil {
		return nil, err
	}
	glog.V(5).Infof("globalMapPathUUID: %v", globalMapPathUUID)
	globalMapPath := filepath.Dir(globalMapPathUUID)
	if len(globalMapPath) == 1 {
		return nil, fmt.Errorf("failed to retrieve volume plugin information from globalMapPathUUID: %v", globalMapPathUUID)
	}
	return getVolumeSpecFromGlobalMapPath(globalMapPath)
}

func getVolumeSpecFromGlobalMapPath(globalMapPath string) (*volume.Spec, error) {
	// Retrieve volume spec information from globalMapPath
	// globalMapPath example:
	//   plugins/kubernetes.io/{PluginName}/{DefaultKubeletVolumeDevicesDirName}/{volumePluginDependentPath}
	pool, image, err := getPoolAndImageFromMapPath(globalMapPath)
	if err != nil {
		return nil, err
	}
	block := v1.PersistentVolumeBlock
	rbdVolume := &v1.PersistentVolume{
		Spec: v1.PersistentVolumeSpec{
			PersistentVolumeSource: v1.PersistentVolumeSource{
				RBD: &v1.RBDPersistentVolumeSource{
					RBDImage: image,
					RBDPool:  pool,
				},
			},
			VolumeMode: &block,
		},
	}

	return volume.NewSpecFromPersistentVolume(rbdVolume, true), nil
}
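A hedged illustration (path and names invented) of what this reconstruction yields under the path convention documented above:

// A global map path whose last component is "{pool}-image-{image}"
// comes back as a PV spec in Block mode naming that pool and image.
spec, err := getVolumeSpecFromGlobalMapPath(
	"/var/lib/kubelet/plugins/kubernetes.io/rbd/volumeDevices/kube-image-foo")
if err == nil {
	rbdSrc := spec.PersistentVolume.Spec.RBD
	_ = rbdSrc // rbdSrc.RBDPool == "kube", rbdSrc.RBDImage == "foo"
}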

func (plugin *rbdPlugin) NewBlockVolumeMapper(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.BlockVolumeMapper, error) {
	var uid types.UID
	if pod != nil {
		uid = pod.UID
	}
	secret := ""
	if pod != nil {
		secretName, secretNs, err := getSecretNameAndNamespace(spec, pod.Namespace)
		if err != nil {
			return nil, err
		}
		if len(secretName) > 0 && len(secretNs) > 0 {
			// if secret is provided, retrieve it
			kubeClient := plugin.host.GetKubeClient()
			if kubeClient == nil {
				return nil, fmt.Errorf("Cannot get kube client")
			}
			secrets, err := kubeClient.Core().Secrets(secretNs).Get(secretName, metav1.GetOptions{})
			if err != nil {
				err = fmt.Errorf("Couldn't get secret %v/%v err: %v", secretNs, secretName, err)
				return nil, err
			}
			for _, data := range secrets.Data {
				secret = string(data)
			}
		}
	}

	return plugin.newBlockVolumeMapperInternal(spec, uid, &RBDUtil{}, secret, plugin.host.GetMounter(plugin.GetPluginName()), plugin.host.GetExec(plugin.GetPluginName()))
}

func (plugin *rbdPlugin) newBlockVolumeMapperInternal(spec *volume.Spec, podUID types.UID, manager diskManager, secret string, mounter mount.Interface, exec mount.Exec) (volume.BlockVolumeMapper, error) {
	mon, err := getVolumeSourceMonitors(spec)
	if err != nil {
		return nil, err
	}
	img, err := getVolumeSourceImage(spec)
	if err != nil {
		return nil, err
	}
	pool, err := getVolumeSourcePool(spec)
	if err != nil {
		return nil, err
	}
	id, err := getVolumeSourceUser(spec)
	if err != nil {
		return nil, err
	}
	keyring, err := getVolumeSourceKeyRing(spec)
	if err != nil {
		return nil, err
	}
	ro, err := getVolumeSourceReadOnly(spec)
	if err != nil {
		return nil, err
	}

	return &rbdDiskMapper{
		rbd:     newRBD(podUID, spec.Name(), img, pool, ro, plugin, manager),
		mon:     mon,
		id:      id,
		keyring: keyring,
		secret:  secret,
	}, nil
}
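For context, a hedged test-style usage. Only newBlockVolumeMapperInternal and fakeDiskManager (defined later in this diff) come from the commit; pv, mounter, exec, and the secret value are hypothetical scaffolding such as a fake volume host would provide.

// "fake-secret" stands in for the Ceph secret that NewBlockVolumeMapper
// would normally read from the API server before delegating here.
mapper, err := plugin.newBlockVolumeMapperInternal(
	volume.NewSpecFromPersistentVolume(pv, true),
	types.UID("test-pod-uid"),
	&fakeDiskManager{},
	"fake-secret",
	mounter,
	exec,
)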

func (plugin *rbdPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) {
	return plugin.newUnmapperInternal(volName, podUID, &RBDUtil{})
}

func (plugin *rbdPlugin) newUnmapperInternal(volName string, podUID types.UID, manager diskManager) (volume.BlockVolumeUnmapper, error) {
	return &rbdDiskUnmapper{
		rbdDiskMapper: &rbdDiskMapper{
			rbd: newRBD(podUID, volName, "", "", false, plugin, manager),
			mon: make([]string, 0),
		},
	}, nil
}

func (plugin *rbdPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
	if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.RBD == nil {
		return nil, fmt.Errorf("spec.PersistentVolumeSource.Spec.RBD is nil")
@@ -661,6 +785,134 @@ func (c *rbdUnmounter) TearDownAt(dir string) error {
	return nil
}

var _ volume.BlockVolumeMapper = &rbdDiskMapper{}

type rbdDiskMapper struct {
	*rbd
	mon           []string
	id            string
	keyring       string
	secret        string
	adminSecret   string
	adminId       string
	imageFormat   string
	imageFeatures []string
}

var _ volume.BlockVolumeUnmapper = &rbdDiskUnmapper{}

// GetGlobalMapPath returns global map path and error
// path: plugins/kubernetes.io/{PluginName}/volumeDevices/{rbd pool}-image-{rbd image-name}/{podUid}
func (rbd *rbd) GetGlobalMapPath(spec *volume.Spec) (string, error) {
	return rbd.rbdGlobalMapPath(spec)
}

// GetPodDeviceMapPath returns pod device map path and volume name
// path: pods/{podUid}/volumeDevices/kubernetes.io~rbd
// volumeName: pv0001
func (rbd *rbd) GetPodDeviceMapPath() (string, string) {
	return rbd.rbdPodDeviceMapPath()
}

func (rbd *rbdDiskMapper) SetUpDevice() (string, error) {
	return "", nil
}

func (rbd *rbd) rbdGlobalMapPath(spec *volume.Spec) (string, error) {
	mon, err := getVolumeSourceMonitors(spec)
	if err != nil {
		return "", err
	}
	img, err := getVolumeSourceImage(spec)
	if err != nil {
		return "", err
	}
	pool, err := getVolumeSourcePool(spec)
	if err != nil {
		return "", err
	}
	ro, err := getVolumeSourceReadOnly(spec)
	if err != nil {
		return "", err
	}

	mounter := &rbdMounter{
		rbd: newRBD("", spec.Name(), img, pool, ro, rbd.plugin, &RBDUtil{}),
		Mon: mon,
	}
	return rbd.manager.MakeGlobalVDPDName(*mounter.rbd), nil
}

func (rbd *rbd) rbdPodDeviceMapPath() (string, string) {
	name := rbdPluginName
	return rbd.plugin.host.GetPodVolumeDeviceDir(rbd.podUID, strings.EscapeQualifiedNameForDisk(name)), rbd.volName
}

type rbdDiskUnmapper struct {
	*rbdDiskMapper
}

func getPoolAndImageFromMapPath(mapPath string) (string, string, error) {
	pathParts := dstrings.Split(mapPath, "/")
	if len(pathParts) < 2 {
		return "", "", fmt.Errorf("corrupted mapPath")
	}
	rbdParts := dstrings.Split(pathParts[len(pathParts)-1], "-image-")

	if len(rbdParts) < 2 {
		return "", "", fmt.Errorf("corrupted mapPath")
	}
	return rbdParts[0], rbdParts[1], nil
}
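A hedged usage example (the path is illustrative). The function only inspects the final path component and splits it on the literal "-image-" separator, so a pool or image name containing that substring would parse ambiguously:

pool, image, err := getPoolAndImageFromMapPath(
	"/var/lib/kubelet/plugins/kubernetes.io/rbd/volumeDevices/kube-image-foo")
// pool == "kube", image == "foo", err == nil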

func getBlockVolumeDevice(mapPath string) (string, error) {
	pool, image, err := getPoolAndImageFromMapPath(mapPath)
	if err != nil {
		return "", err
	}
	// Getting full device path
	device, found := getDevFromImageAndPool(pool, image)
	if !found {
		// Return an explicit error: falling through with a nil error here
		// would hand the caller an empty device path that looks like success.
		return "", fmt.Errorf("rbd: failed to get device from pool %s and image %s", pool, image)
	}
	return device, nil
}

func (rbd *rbdDiskUnmapper) TearDownDevice(mapPath, _ string) error {
	device, err := getBlockVolumeDevice(mapPath)
	if err != nil {
		return fmt.Errorf("rbd: failed to get device from mapPath: %v, err: %v", mapPath, err)
	}
	blkUtil := volutil.NewBlockVolumePathHandler()
	loop, err := blkUtil.GetLoopDevice(device)
	if err != nil {
		return fmt.Errorf("rbd: failed to get loopback for device: %v, err: %v", device, err)
	}
	// Remove loop device before detaching volume since volume detach operation gets busy if volume is opened by loopback.
	err = blkUtil.RemoveLoopDevice(loop)
	if err != nil {
		return fmt.Errorf("rbd: failed to remove loopback: %v, err: %v", loop, err)
	}
	glog.V(4).Infof("rbd: successfully removed loop device: %s", loop)

	err = rbd.manager.DetachBlockDisk(*rbd, mapPath)
	if err != nil {
		return fmt.Errorf("rbd: failed to detach disk: %s\nError: %v", mapPath, err)
	}
	glog.V(4).Infof("rbd: %q is unmapped, deleting the directory", mapPath)

	err = os.RemoveAll(mapPath)
	if err != nil {
		return fmt.Errorf("rbd: failed to delete the directory: %s\nError: %v", mapPath, err)
	}
	glog.V(4).Infof("rbd: successfully detached disk: %s", mapPath)

	return nil
}
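The ordering above matters. A condensed sketch of the same loop-device cleanup (the device path is invented; both handler methods are the ones the diff already calls):

blkUtil := volutil.NewBlockVolumePathHandler()
if loop, err := blkUtil.GetLoopDevice("/dev/rbd0"); err == nil {
	// `rbd unmap` reports the device as busy while a loopback still holds
	// it, so the loop device must be removed before DetachBlockDisk runs.
	_ = blkUtil.RemoveLoopDevice(loop)
}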

func getVolumeSourceMonitors(spec *volume.Spec) ([]string, error) {
	if spec.Volume != nil && spec.Volume.RBD != nil {
		return spec.Volume.RBD.CephMonitors, nil
@@ -81,10 +81,14 @@ func (fake *fakeDiskManager) MakeGlobalPDName(rbd rbd) string {
	return makePDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image)
}

func (fake *fakeDiskManager) MakeGlobalVDPDName(rbd rbd) string {
	return makePDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image)
}

func (fake *fakeDiskManager) AttachDisk(b rbdMounter) (string, error) {
	fake.mutex.Lock()
	defer fake.mutex.Unlock()
	fake.rbdMapIndex += 1
	fake.rbdMapIndex++
	devicePath := fmt.Sprintf("/dev/rbd%d", fake.rbdMapIndex)
	fake.rbdDevices[devicePath] = true
	return devicePath, nil
@@ -101,6 +105,17 @@ func (fake *fakeDiskManager) DetachDisk(r *rbdPlugin, deviceMountPath string, de
	return nil
}

func (fake *fakeDiskManager) DetachBlockDisk(r rbdDiskUnmapper, device string) error {
	fake.mutex.Lock()
	defer fake.mutex.Unlock()
	ok := fake.rbdDevices[device]
	if !ok {
		return fmt.Errorf("rbd: failed to detach device %s, it does not exist", device)
	}
	delete(fake.rbdDevices, device)
	return nil
}

func (fake *fakeDiskManager) CreateImage(provisioner *rbdVolumeProvisioner) (r *v1.RBDPersistentVolumeSource, volumeSizeGB int, err error) {
	return nil, 0, fmt.Errorf("not implemented")
}
@@ -35,6 +35,7 @@ import (
	"github.com/golang/glog"
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/util/wait"
	fileutil "k8s.io/kubernetes/pkg/util/file"
	"k8s.io/kubernetes/pkg/util/node"
	"k8s.io/kubernetes/pkg/volume"
@@ -46,6 +47,11 @@ const (
	imageSizeStr  = "size "
	sizeDivStr    = " MB in"
	kubeLockMagic = "kubelet_lock_magic_"
	// The following three values are used for 30 seconds timeout
	// while waiting for RBD Watcher to expire.
	rbdImageWatcherInitDelay = 1 * time.Second
	rbdImageWatcherFactor    = 1.4
	rbdImageWatcherSteps     = 10
)

// search /sys/bus for rbd device that matches given pool and image
@@ -109,6 +115,11 @@ func makePDNameInternal(host volume.VolumeHost, pool string, image string) strin
	return path.Join(host.GetPluginDir(rbdPluginName), "rbd", pool+"-image-"+image)
}

// make a directory like /var/lib/kubelet/plugins/kubernetes.io/rbd/volumeDevices/pool-image-image
func makeVDPDNameInternal(host volume.VolumeHost, pool string, image string) string {
	return path.Join(host.GetVolumeDevicePluginDir(rbdPluginName), pool+"-image-"+image)
}
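Concretely (illustrative values, assuming the default kubelet root directory):

// Hypothetical call; host is the volume.VolumeHost used throughout this file.
p := makeVDPDNameInternal(host, "kube", "foo")
// p == "/var/lib/kubelet/plugins/kubernetes.io/rbd/volumeDevices/kube-image-foo"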

// RBDUtil implements diskManager interface.
type RBDUtil struct{}
@@ -118,6 +129,10 @@ func (util *RBDUtil) MakeGlobalPDName(rbd rbd) string {
	return makePDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image)
}

func (util *RBDUtil) MakeGlobalVDPDName(rbd rbd) string {
	return makeVDPDNameInternal(rbd.plugin.host, rbd.Pool, rbd.Image)
}

func rbdErrors(runErr, resultErr error) error {
	if err, ok := runErr.(*exec.Error); ok {
		if err.Err == exec.ErrNotFound {
@@ -217,13 +232,27 @@ func (util *RBDUtil) AttachDisk(b rbdMounter) (string, error) {

	// Currently, we don't acquire advisory lock on image, but for backward
	// compatibility, we need to check if the image is being used by nodes running old kubelet.
	found, rbdOutput, err := util.rbdStatus(&b)
	if err != nil {
		return "", fmt.Errorf("error: %v, rbd output: %v", err, rbdOutput)
	// osd_client_watch_timeout defaults to 30 seconds, if the watcher stays active longer than 30 seconds,
	// rbd image does not get mounted and failure message gets generated.
	backoff := wait.Backoff{
		Duration: rbdImageWatcherInitDelay,
		Factor:   rbdImageWatcherFactor,
		Steps:    rbdImageWatcherSteps,
	}
	if found {
		glog.Infof("rbd image %s/%s is still being used", b.Pool, b.Image)
		return "", fmt.Errorf("rbd image %s/%s is still being used. rbd output: %s", b.Pool, b.Image, rbdOutput)
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		used, rbdOutput, err := util.rbdStatus(&b)
		if err != nil {
			return false, fmt.Errorf("fail to check rbd image status with: (%v), rbd output: (%s)", err, rbdOutput)
		}
		return !used, nil
	})
	// return error if rbd image has not become available for the specified timeout
	if err == wait.ErrWaitTimeout {
		return "", fmt.Errorf("rbd image %s/%s is still being used", b.Pool, b.Image)
	}
	// return error if any other errors were encountered while waiting for the image to become available
	if err != nil {
		return "", err
	}

	mon := util.kernelRBDMonitorsOpt(b.Mon)
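For reference, a hedged standalone sketch of the retry primitive used above. wait.ExponentialBackoff (from k8s.io/apimachinery/pkg/util/wait) runs the condition up to Steps times, sleeping Duration (multiplied by Factor each round) between attempts, and returns wait.ErrWaitTimeout if the condition never becomes true; the condition body here is hypothetical:

backoff := wait.Backoff{Duration: 1 * time.Second, Factor: 1.4, Steps: 10}
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
	// true, nil  -> stop, success
	// false, nil -> sleep and retry
	// _, err     -> stop, propagate err
	return imageIsFree(), nil // hypothetical check
})
if err == wait.ErrWaitTimeout {
	// image stayed busy for the whole backoff budget
}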
@@ -281,6 +310,35 @@ func (util *RBDUtil) DetachDisk(plugin *rbdPlugin, deviceMountPath string, devic
	return nil
}

// DetachBlockDisk detaches the disk from the node.
func (util *RBDUtil) DetachBlockDisk(disk rbdDiskUnmapper, mapPath string) error {
	if pathExists, pathErr := volutil.PathExists(mapPath); pathErr != nil {
		return fmt.Errorf("Error checking if path exists: %v", pathErr)
	} else if !pathExists {
		glog.Warningf("Warning: Unmap skipped because path does not exist: %v", mapPath)
		return nil
	}
	// If we arrive here, the device is no longer used; see if we need to log out of the target
	device, err := getBlockVolumeDevice(mapPath)
	if err != nil {
		return err
	}

	if len(device) == 0 {
		return fmt.Errorf("DetachBlockDisk failed, device is empty")
	}
	// rbd unmap
	exec := disk.plugin.host.GetExec(disk.plugin.GetPluginName())
	output, err := exec.Run("rbd", "unmap", device)
	if err != nil {
		return rbdErrors(err, fmt.Errorf("rbd: failed to unmap device %s, error %v, rbd output: %s", device, err, string(output)))
	}
	glog.V(3).Infof("rbd: successfully unmapped device %s", device)

	return nil
}

// cleanOldRBDFile reads rbd info from the rbd.json file and removes the lock if found.
// At last, it removes the rbd.json file.
func (util *RBDUtil) cleanOldRBDFile(plugin *rbdPlugin, rbdFile string) error {