mirror of https://github.com/k3s-io/k3s
Merge pull request #68635 from mkimuram/issue/68424
CSI block volume refactor to fix target pathpull/58/head
commit
cde4c9ebe1
|
@ -21,15 +21,19 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
|
storage "k8s.io/api/storage/v1beta1"
|
||||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
|
kstrings "k8s.io/kubernetes/pkg/util/strings"
|
||||||
"k8s.io/kubernetes/pkg/volume"
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
|
ioutil "k8s.io/kubernetes/pkg/volume/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
type csiBlockMapper struct {
|
type csiBlockMapper struct {
|
||||||
|
@ -47,80 +51,61 @@ type csiBlockMapper struct {
|
||||||
|
|
||||||
var _ volume.BlockVolumeMapper = &csiBlockMapper{}
|
var _ volume.BlockVolumeMapper = &csiBlockMapper{}
|
||||||
|
|
||||||
// GetGlobalMapPath returns a path (on the node) to a device file which will be symlinked to
|
// GetGlobalMapPath returns a global map path (on the node) to a device file which will be symlinked to
|
||||||
// Example: plugins/kubernetes.io/csi/volumeDevices/{volumeID}/dev
|
// Example: plugins/kubernetes.io/csi/volumeDevices/{pvname}/dev
|
||||||
func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) {
|
func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) {
|
||||||
dir := getVolumeDevicePluginDir(spec.Name(), m.plugin.host)
|
dir := getVolumeDevicePluginDir(spec.Name(), m.plugin.host)
|
||||||
klog.V(4).Infof(log("blockMapper.GetGlobalMapPath = %s", dir))
|
klog.V(4).Infof(log("blockMapper.GetGlobalMapPath = %s", dir))
|
||||||
return dir, nil
|
return dir, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getStagingPath returns a staging path for a directory (on the node) that should be used on NodeStageVolume/NodeUnstageVolume
|
||||||
|
// Example: plugins/kubernetes.io/csi/volumeDevices/staging/{pvname}
|
||||||
|
func (m *csiBlockMapper) getStagingPath() string {
|
||||||
|
sanitizedSpecVolID := kstrings.EscapeQualifiedNameForDisk(m.specName)
|
||||||
|
return path.Join(m.plugin.host.GetVolumeDevicePluginDir(csiPluginName), "staging", sanitizedSpecVolID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getPublishPath returns a publish path for a file (on the node) that should be used on NodePublishVolume/NodeUnpublishVolume
|
||||||
|
// Example: plugins/kubernetes.io/csi/volumeDevices/publish/{pvname}
|
||||||
|
func (m *csiBlockMapper) getPublishPath() string {
|
||||||
|
sanitizedSpecVolID := kstrings.EscapeQualifiedNameForDisk(m.specName)
|
||||||
|
return path.Join(m.plugin.host.GetVolumeDevicePluginDir(csiPluginName), "publish", sanitizedSpecVolID)
|
||||||
|
}
|
||||||
|
|
||||||
// GetPodDeviceMapPath returns pod's device file which will be mapped to a volume
|
// GetPodDeviceMapPath returns pod's device file which will be mapped to a volume
|
||||||
// returns: pods/{podUid}/volumeDevices/kubernetes.io~csi/{volumeID}/dev, {volumeID}
|
// returns: pods/{podUid}/volumeDevices/kubernetes.io~csi, {pvname}
|
||||||
func (m *csiBlockMapper) GetPodDeviceMapPath() (string, string) {
|
func (m *csiBlockMapper) GetPodDeviceMapPath() (string, string) {
|
||||||
path := filepath.Join(m.plugin.host.GetPodVolumeDeviceDir(m.podUID, csiPluginName), m.specName, "dev")
|
path := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, kstrings.EscapeQualifiedNameForDisk(csiPluginName))
|
||||||
specName := m.specName
|
specName := m.specName
|
||||||
klog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath [path=%s; name=%s]", path, specName))
|
klog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath [path=%s; name=%s]", path, specName))
|
||||||
return path, specName
|
return path, specName
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetUpDevice ensures the device is attached returns path where the device is located.
|
// stageVolumeForBlock stages a block volume to stagingPath
|
||||||
func (m *csiBlockMapper) SetUpDevice() (string, error) {
|
func (m *csiBlockMapper) stageVolumeForBlock(
|
||||||
if !m.plugin.blockEnabled {
|
ctx context.Context,
|
||||||
return "", errors.New("CSIBlockVolume feature not enabled")
|
csi csiClient,
|
||||||
}
|
accessMode v1.PersistentVolumeAccessMode,
|
||||||
|
csiSource *v1.CSIPersistentVolumeSource,
|
||||||
|
attachment *storage.VolumeAttachment,
|
||||||
|
) (string, error) {
|
||||||
|
klog.V(4).Infof(log("blockMapper.stageVolumeForBlock called"))
|
||||||
|
|
||||||
klog.V(4).Infof(log("blockMapper.SetupDevice called"))
|
stagingPath := m.getStagingPath()
|
||||||
|
klog.V(4).Infof(log("blockMapper.stageVolumeForBlock stagingPath set [%s]", stagingPath))
|
||||||
if m.spec == nil {
|
|
||||||
klog.Error(log("blockMapper.Map spec is nil"))
|
|
||||||
return "", fmt.Errorf("spec is nil")
|
|
||||||
}
|
|
||||||
csiSource, err := getCSISourceFromSpec(m.spec)
|
|
||||||
if err != nil {
|
|
||||||
klog.Error(log("blockMapper.SetupDevice failed to get CSI persistent source: %v", err))
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
globalMapPath, err := m.GetGlobalMapPath(m.spec)
|
|
||||||
if err != nil {
|
|
||||||
klog.Error(log("blockMapper.SetupDevice failed to get global map path: %v", err))
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
globalMapPathBlockFile := filepath.Join(globalMapPath, "file")
|
|
||||||
klog.V(4).Infof(log("blockMapper.SetupDevice global device map path file set [%s]", globalMapPathBlockFile))
|
|
||||||
|
|
||||||
csi := m.csiClient
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
// Check whether "STAGE_UNSTAGE_VOLUME" is set
|
// Check whether "STAGE_UNSTAGE_VOLUME" is set
|
||||||
stageUnstageSet, err := hasStageUnstageCapability(ctx, csi)
|
stageUnstageSet, err := hasStageUnstageCapability(ctx, csi)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Error(log("blockMapper.SetupDevice failed to check STAGE_UNSTAGE_VOLUME capability: %v", err))
|
klog.Error(log("blockMapper.stageVolumeForBlock failed to check STAGE_UNSTAGE_VOLUME capability: %v", err))
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
if !stageUnstageSet {
|
if !stageUnstageSet {
|
||||||
klog.Infof(log("blockMapper.SetupDevice STAGE_UNSTAGE_VOLUME capability not set. Skipping MountDevice..."))
|
klog.Infof(log("blockMapper.stageVolumeForBlock STAGE_UNSTAGE_VOLUME capability not set. Skipping MountDevice..."))
|
||||||
return "", nil
|
return "", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start MountDevice
|
|
||||||
nodeName := string(m.plugin.host.GetNodeName())
|
|
||||||
attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
|
|
||||||
|
|
||||||
// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
|
|
||||||
attachment, err := m.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{})
|
|
||||||
if err != nil {
|
|
||||||
klog.Error(log("blockMapper.SetupDevice failed to get volume attachment [id=%v]: %v", attachID, err))
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
if attachment == nil {
|
|
||||||
klog.Error(log("blockMapper.SetupDevice unable to find VolumeAttachment [id=%s]", attachID))
|
|
||||||
return "", errors.New("no existing VolumeAttachment found")
|
|
||||||
}
|
|
||||||
publishVolumeInfo := attachment.Status.AttachmentMetadata
|
publishVolumeInfo := attachment.Status.AttachmentMetadata
|
||||||
|
|
||||||
nodeStageSecrets := map[string]string{}
|
nodeStageSecrets := map[string]string{}
|
||||||
|
@ -132,145 +117,78 @@ func (m *csiBlockMapper) SetUpDevice() (string, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// setup path globalMapPath and block file before call to NodeStageVolume
|
// Creating a stagingPath directory before call to NodeStageVolume
|
||||||
if err := os.MkdirAll(globalMapPath, 0750); err != nil {
|
if err := os.MkdirAll(stagingPath, 0750); err != nil {
|
||||||
klog.Error(log("blockMapper.SetupDevice failed to create dir %s: %v", globalMapPath, err))
|
klog.Error(log("blockMapper.stageVolumeForBlock failed to create dir %s: %v", stagingPath, err))
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
klog.V(4).Info(log("blockMapper.SetupDevice created global device map path successfully [%s]", globalMapPath))
|
klog.V(4).Info(log("blockMapper.stageVolumeForBlock created stagingPath directory successfully [%s]", stagingPath))
|
||||||
|
|
||||||
// create block device file
|
|
||||||
blockFile, err := os.OpenFile(globalMapPathBlockFile, os.O_CREATE|os.O_RDWR, 0750)
|
|
||||||
if err != nil {
|
|
||||||
klog.Error(log("blockMapper.SetupDevice failed to create dir %s: %v", globalMapPathBlockFile, err))
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
if err := blockFile.Close(); err != nil {
|
|
||||||
klog.Error(log("blockMapper.SetupDevice failed to close file %s: %v", globalMapPathBlockFile, err))
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
klog.V(4).Info(log("blockMapper.SetupDevice created global map path block device file successfully [%s]", globalMapPathBlockFile))
|
|
||||||
|
|
||||||
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
|
|
||||||
accessMode := v1.ReadWriteOnce
|
|
||||||
if m.spec.PersistentVolume.Spec.AccessModes != nil {
|
|
||||||
accessMode = m.spec.PersistentVolume.Spec.AccessModes[0]
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// Request to stage a block volume to stagingPath.
|
||||||
|
// Expected implementation for driver is creating driver specific resource on stagingPath and
|
||||||
|
// attaching the block volume to the node.
|
||||||
err = csi.NodeStageVolume(ctx,
|
err = csi.NodeStageVolume(ctx,
|
||||||
csiSource.VolumeHandle,
|
csiSource.VolumeHandle,
|
||||||
publishVolumeInfo,
|
publishVolumeInfo,
|
||||||
globalMapPathBlockFile,
|
stagingPath,
|
||||||
fsTypeBlockName,
|
fsTypeBlockName,
|
||||||
accessMode,
|
accessMode,
|
||||||
nodeStageSecrets,
|
nodeStageSecrets,
|
||||||
csiSource.VolumeAttributes)
|
csiSource.VolumeAttributes)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Error(log("blockMapper.SetupDevice failed: %v", err))
|
klog.Error(log("blockMapper.stageVolumeForBlock failed: %v", err))
|
||||||
if err := os.RemoveAll(globalMapPath); err != nil {
|
|
||||||
klog.Error(log("blockMapper.SetupDevice failed to remove dir after a NodeStageVolume() error [%s]: %v", globalMapPath, err))
|
|
||||||
}
|
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(4).Infof(log("blockMapper.SetupDevice successfully requested NodeStageVolume [%s]", globalMapPathBlockFile))
|
klog.V(4).Infof(log("blockMapper.stageVolumeForBlock successfully requested NodeStageVolume [%s]", stagingPath))
|
||||||
return globalMapPathBlockFile, nil
|
return stagingPath, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *csiBlockMapper) MapDevice(devicePath, globalMapPath, volumeMapPath, volumeMapName string, podUID types.UID) error {
|
// publishVolumeForBlock publishes a block volume to publishPath
|
||||||
if !m.plugin.blockEnabled {
|
func (m *csiBlockMapper) publishVolumeForBlock(
|
||||||
return errors.New("CSIBlockVolume feature not enabled")
|
ctx context.Context,
|
||||||
}
|
csi csiClient,
|
||||||
|
accessMode v1.PersistentVolumeAccessMode,
|
||||||
|
csiSource *v1.CSIPersistentVolumeSource,
|
||||||
|
attachment *storage.VolumeAttachment,
|
||||||
|
stagingPath string,
|
||||||
|
) (string, error) {
|
||||||
|
klog.V(4).Infof(log("blockMapper.publishVolumeForBlock called"))
|
||||||
|
|
||||||
klog.V(4).Infof(log("blockMapper.MapDevice mapping block device %s", devicePath))
|
|
||||||
|
|
||||||
if m.spec == nil {
|
|
||||||
klog.Error(log("blockMapper.MapDevice spec is nil"))
|
|
||||||
return fmt.Errorf("spec is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
csiSource, err := getCSISourceFromSpec(m.spec)
|
|
||||||
if err != nil {
|
|
||||||
klog.Error(log("blockMapper.MapDevice failed to get CSI persistent source: %v", err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
csi := m.csiClient
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
globalMapPathBlockFile := devicePath
|
|
||||||
dir, _ := m.GetPodDeviceMapPath()
|
|
||||||
targetBlockFilePath := filepath.Join(dir, "file")
|
|
||||||
klog.V(4).Infof(log("blockMapper.MapDevice target volume map file path %s", targetBlockFilePath))
|
|
||||||
|
|
||||||
stageCapable, err := hasStageUnstageCapability(ctx, csi)
|
|
||||||
if err != nil {
|
|
||||||
klog.Error(log("blockMapper.MapDevice failed to check for STAGE_UNSTAGE_VOLUME capabilty: %v", err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !stageCapable {
|
|
||||||
globalMapPathBlockFile = ""
|
|
||||||
}
|
|
||||||
|
|
||||||
nodeName := string(m.plugin.host.GetNodeName())
|
|
||||||
attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
|
|
||||||
|
|
||||||
// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
|
|
||||||
attachment, err := m.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{})
|
|
||||||
if err != nil {
|
|
||||||
klog.Error(log("blockMapper.MapDevice failed to get volume attachment [id=%v]: %v", attachID, err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if attachment == nil {
|
|
||||||
klog.Error(log("blockMapper.MapDevice unable to find VolumeAttachment [id=%s]", attachID))
|
|
||||||
return errors.New("no existing VolumeAttachment found")
|
|
||||||
}
|
|
||||||
publishVolumeInfo := attachment.Status.AttachmentMetadata
|
publishVolumeInfo := attachment.Status.AttachmentMetadata
|
||||||
|
|
||||||
nodePublishSecrets := map[string]string{}
|
nodePublishSecrets := map[string]string{}
|
||||||
|
var err error
|
||||||
if csiSource.NodePublishSecretRef != nil {
|
if csiSource.NodePublishSecretRef != nil {
|
||||||
nodePublishSecrets, err = getCredentialsFromSecret(m.k8s, csiSource.NodePublishSecretRef)
|
nodePublishSecrets, err = getCredentialsFromSecret(m.k8s, csiSource.NodePublishSecretRef)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("blockMapper.MapDevice failed to get NodePublishSecretRef %s/%s: %v",
|
klog.Errorf("blockMapper.publishVolumeForBlock failed to get NodePublishSecretRef %s/%s: %v",
|
||||||
csiSource.NodePublishSecretRef.Namespace, csiSource.NodePublishSecretRef.Name, err)
|
csiSource.NodePublishSecretRef.Namespace, csiSource.NodePublishSecretRef.Name, err)
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := os.MkdirAll(dir, 0750); err != nil {
|
publishPath := m.getPublishPath()
|
||||||
klog.Error(log("blockMapper.MapDevice failed to create dir %s: %v", dir, err))
|
// Setup a parent directory for publishPath before call to NodePublishVolume
|
||||||
return err
|
publishDir := filepath.Dir(publishPath)
|
||||||
}
|
if err := os.MkdirAll(publishDir, 0750); err != nil {
|
||||||
klog.V(4).Info(log("blockMapper.MapDevice created target volume map path successfully [%s]", dir))
|
klog.Error(log("blockMapper.publishVolumeForBlock failed to create dir %s: %v", publishDir, err))
|
||||||
|
return "", err
|
||||||
// create target map volume block file
|
|
||||||
targetBlockFile, err := os.OpenFile(targetBlockFilePath, os.O_CREATE|os.O_RDWR, 0750)
|
|
||||||
if err != nil {
|
|
||||||
klog.Error(log("blockMapper.MapDevice failed to create file %s: %v", targetBlockFilePath, err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := targetBlockFile.Close(); err != nil {
|
|
||||||
klog.Error(log("blockMapper.MapDevice failed to close file %s: %v", targetBlockFilePath, err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
klog.V(4).Info(log("blockMapper.MapDevice created target volume map file successfully [%s]", targetBlockFilePath))
|
|
||||||
|
|
||||||
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
|
|
||||||
accessMode := v1.ReadWriteOnce
|
|
||||||
if m.spec.PersistentVolume.Spec.AccessModes != nil {
|
|
||||||
accessMode = m.spec.PersistentVolume.Spec.AccessModes[0]
|
|
||||||
}
|
}
|
||||||
|
klog.V(4).Info(log("blockMapper.publishVolumeForBlock created directory for publishPath successfully [%s]", publishDir))
|
||||||
|
|
||||||
|
// Request to publish a block volume to publishPath.
|
||||||
|
// Expectation for driver is to place a block volume on the publishPath, by bind-mounting the device file on the publishPath or
|
||||||
|
// creating device file on the publishPath.
|
||||||
|
// Parent directory for publishPath is created by k8s, but driver is responsible for creating publishPath itself.
|
||||||
|
// If driver doesn't implement NodeStageVolume, attaching the block volume to the node may be done, here.
|
||||||
err = csi.NodePublishVolume(
|
err = csi.NodePublishVolume(
|
||||||
ctx,
|
ctx,
|
||||||
m.volumeID,
|
m.volumeID,
|
||||||
m.readOnly,
|
m.readOnly,
|
||||||
globalMapPathBlockFile,
|
stagingPath,
|
||||||
targetBlockFilePath,
|
publishPath,
|
||||||
accessMode,
|
accessMode,
|
||||||
publishVolumeInfo,
|
publishVolumeInfo,
|
||||||
csiSource.VolumeAttributes,
|
csiSource.VolumeAttributes,
|
||||||
|
@ -280,18 +198,123 @@ func (m *csiBlockMapper) MapDevice(devicePath, globalMapPath, volumeMapPath, vol
|
||||||
)
|
)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf(log("blockMapper.MapDevice failed: %v", err))
|
klog.Errorf(log("blockMapper.publishVolumeForBlock failed: %v", err))
|
||||||
if err := os.RemoveAll(dir); err != nil {
|
return "", err
|
||||||
klog.Error(log("blockMapper.MapDevice failed to remove mapped dir after a NodePublish() error [%s]: %v", dir, err))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return publishPath, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetUpDevice ensures the device is attached returns path where the device is located.
|
||||||
|
func (m *csiBlockMapper) SetUpDevice() (string, error) {
|
||||||
|
if !m.plugin.blockEnabled {
|
||||||
|
return "", errors.New("CSIBlockVolume feature not enabled")
|
||||||
|
}
|
||||||
|
klog.V(4).Infof(log("blockMapper.SetUpDevice called"))
|
||||||
|
|
||||||
|
// Get csiSource from spec
|
||||||
|
if m.spec == nil {
|
||||||
|
klog.Error(log("blockMapper.SetUpDevice spec is nil"))
|
||||||
|
return "", fmt.Errorf("spec is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
csiSource, err := getCSISourceFromSpec(m.spec)
|
||||||
|
if err != nil {
|
||||||
|
klog.Error(log("blockMapper.SetUpDevice failed to get CSI persistent source: %v", err))
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
|
||||||
|
nodeName := string(m.plugin.host.GetNodeName())
|
||||||
|
attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
|
||||||
|
attachment, err := m.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
klog.Error(log("blockMapper.SetupDevice failed to get volume attachment [id=%v]: %v", attachID, err))
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if attachment == nil {
|
||||||
|
klog.Error(log("blockMapper.SetupDevice unable to find VolumeAttachment [id=%s]", attachID))
|
||||||
|
return "", errors.New("no existing VolumeAttachment found")
|
||||||
|
}
|
||||||
|
|
||||||
|
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
|
||||||
|
accessMode := v1.ReadWriteOnce
|
||||||
|
if m.spec.PersistentVolume.Spec.AccessModes != nil {
|
||||||
|
accessMode = m.spec.PersistentVolume.Spec.AccessModes[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Call NodeStageVolume
|
||||||
|
stagingPath, err := m.stageVolumeForBlock(ctx, m.csiClient, accessMode, csiSource, attachment)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call NodePublishVolume
|
||||||
|
publishPath, err := m.publishVolumeForBlock(ctx, m.csiClient, accessMode, csiSource, attachment, stagingPath)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return publishPath, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *csiBlockMapper) MapDevice(devicePath, globalMapPath, volumeMapPath, volumeMapName string, podUID types.UID) error {
|
||||||
|
return ioutil.MapBlockVolume(devicePath, globalMapPath, volumeMapPath, volumeMapName, podUID)
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ volume.BlockVolumeUnmapper = &csiBlockMapper{}
|
||||||
|
|
||||||
|
// unpublishVolumeForBlock unpublishes a block volume from publishPath
|
||||||
|
func (m *csiBlockMapper) unpublishVolumeForBlock(ctx context.Context, csi csiClient, publishPath string) error {
|
||||||
|
// Request to unpublish a block volume from publishPath.
|
||||||
|
// Expectation for driver is to remove block volume from the publishPath, by unmounting bind-mounted device file
|
||||||
|
// or deleting device file.
|
||||||
|
// Driver is responsible for deleting publishPath itself.
|
||||||
|
// If driver doesn't implement NodeUnstageVolume, detaching the block volume from the node may be done, here.
|
||||||
|
if err := csi.NodeUnpublishVolume(ctx, m.volumeID, publishPath); err != nil {
|
||||||
|
klog.Error(log("blockMapper.unpublishVolumeForBlock failed: %v", err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
klog.V(4).Infof(log("blockMapper.unpublishVolumeForBlock NodeUnpublished successfully [%s]", publishPath))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// unstageVolumeForBlock unstages a block volume from stagingPath
|
||||||
|
func (m *csiBlockMapper) unstageVolumeForBlock(ctx context.Context, csi csiClient, stagingPath string) error {
|
||||||
|
// Check whether "STAGE_UNSTAGE_VOLUME" is set
|
||||||
|
stageUnstageSet, err := hasStageUnstageCapability(ctx, csi)
|
||||||
|
if err != nil {
|
||||||
|
klog.Error(log("blockMapper.unstageVolumeForBlock failed to check STAGE_UNSTAGE_VOLUME capability: %v", err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !stageUnstageSet {
|
||||||
|
klog.Infof(log("blockMapper.unstageVolumeForBlock STAGE_UNSTAGE_VOLUME capability not set. Skipping unstageVolumeForBlock ..."))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Request to unstage a block volume from stagingPath.
|
||||||
|
// Expected implementation for driver is removing driver specific resource in stagingPath and
|
||||||
|
// detaching the block volume from the node.
|
||||||
|
if err := csi.NodeUnstageVolume(ctx, m.volumeID, stagingPath); err != nil {
|
||||||
|
klog.Errorf(log("blockMapper.unstageVolumeForBlock failed: %v", err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
klog.V(4).Infof(log("blockMapper.unstageVolumeForBlock NodeUnstageVolume successfully [%s]", stagingPath))
|
||||||
|
|
||||||
|
// Remove stagingPath directory and its contents
|
||||||
|
if err := os.RemoveAll(stagingPath); err != nil {
|
||||||
|
klog.Error(log("blockMapper.unstageVolumeForBlock failed to remove staging path after NodeUnstageVolume() error [%s]: %v", stagingPath, err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ volume.BlockVolumeUnmapper = &csiBlockMapper{}
|
|
||||||
|
|
||||||
// TearDownDevice removes traces of the SetUpDevice.
|
// TearDownDevice removes traces of the SetUpDevice.
|
||||||
func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error {
|
func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error {
|
||||||
if !m.plugin.blockEnabled {
|
if !m.plugin.blockEnabled {
|
||||||
|
@ -300,26 +323,38 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
|
||||||
|
|
||||||
klog.V(4).Infof(log("unmapper.TearDownDevice(globalMapPath=%s; devicePath=%s)", globalMapPath, devicePath))
|
klog.V(4).Infof(log("unmapper.TearDownDevice(globalMapPath=%s; devicePath=%s)", globalMapPath, devicePath))
|
||||||
|
|
||||||
csi := m.csiClient
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// unmap global device map path
|
// Call NodeUnpublishVolume
|
||||||
if err := csi.NodeUnstageVolume(ctx, m.volumeID, globalMapPath); err != nil {
|
publishPath := m.getPublishPath()
|
||||||
klog.Errorf(log("blockMapper.TearDownDevice failed: %v", err))
|
if _, err := os.Stat(publishPath); err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
klog.V(4).Infof(log("blockMapper.TearDownDevice publishPath(%s) has already been deleted, skip calling NodeUnpublishVolume", publishPath))
|
||||||
|
} else {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
klog.V(4).Infof(log("blockMapper.TearDownDevice NodeUnstageVolume successfully [%s]", globalMapPath))
|
} else {
|
||||||
|
err := m.unpublishVolumeForBlock(ctx, m.csiClient, publishPath)
|
||||||
// request to remove pod volume map path also
|
if err != nil {
|
||||||
podVolumePath, volumeName := m.GetPodDeviceMapPath()
|
|
||||||
podVolumeMapPath := filepath.Join(podVolumePath, volumeName)
|
|
||||||
if err := csi.NodeUnpublishVolume(ctx, m.volumeID, podVolumeMapPath); err != nil {
|
|
||||||
klog.Error(log("blockMapper.TearDownDevice failed: %v", err))
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
klog.V(4).Infof(log("blockMapper.TearDownDevice NodeUnpublished successfully [%s]", podVolumeMapPath))
|
// Call NodeUnstageVolume
|
||||||
|
stagingPath := m.getStagingPath()
|
||||||
|
if _, err := os.Stat(stagingPath); err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
klog.V(4).Infof(log("blockMapper.TearDownDevice stagingPath(%s) has already been deleted, skip calling NodeUnstageVolume", stagingPath))
|
||||||
|
} else {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err := m.unstageVolumeForBlock(ctx, m.csiClient, stagingPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,21 @@ import (
|
||||||
volumetest "k8s.io/kubernetes/pkg/volume/testing"
|
volumetest "k8s.io/kubernetes/pkg/volume/testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func prepareBlockMapperTest(plug *csiPlugin, specVolumeName string) (*csiBlockMapper, *volume.Spec, *api.PersistentVolume, error) {
|
||||||
|
pv := makeTestPV(specVolumeName, 10, testDriver, testVol)
|
||||||
|
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
|
||||||
|
mapper, err := plug.NewBlockVolumeMapper(
|
||||||
|
spec,
|
||||||
|
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
|
||||||
|
volume.VolumeOptions{},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, fmt.Errorf("Failed to make a new Mapper: %v", err)
|
||||||
|
}
|
||||||
|
csiMapper := mapper.(*csiBlockMapper)
|
||||||
|
return csiMapper, spec, pv, nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestBlockMapperGetGlobalMapPath(t *testing.T) {
|
func TestBlockMapperGetGlobalMapPath(t *testing.T) {
|
||||||
plug, tmpDir := newTestPlugin(t, nil, nil)
|
plug, tmpDir := newTestPlugin(t, nil, nil)
|
||||||
defer os.RemoveAll(tmpDir)
|
defer os.RemoveAll(tmpDir)
|
||||||
|
@ -53,17 +68,10 @@ func TestBlockMapperGetGlobalMapPath(t *testing.T) {
|
||||||
}
|
}
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
t.Logf("test case: %s", tc.name)
|
t.Logf("test case: %s", tc.name)
|
||||||
pv := makeTestPV(tc.specVolumeName, 10, testDriver, testVol)
|
csiMapper, spec, _, err := prepareBlockMapperTest(plug, tc.specVolumeName)
|
||||||
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
|
|
||||||
mapper, err := plug.NewBlockVolumeMapper(
|
|
||||||
spec,
|
|
||||||
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
|
|
||||||
volume.VolumeOptions{},
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to make a new Mapper: %v", err)
|
t.Fatalf("Failed to make a new Mapper: %v", err)
|
||||||
}
|
}
|
||||||
csiMapper := mapper.(*csiBlockMapper)
|
|
||||||
|
|
||||||
path, err := csiMapper.GetGlobalMapPath(spec)
|
path, err := csiMapper.GetGlobalMapPath(spec)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -76,6 +84,115 @@ func TestBlockMapperGetGlobalMapPath(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBlockMapperGetStagingPath(t *testing.T) {
|
||||||
|
plug, tmpDir := newTestPlugin(t, nil, nil)
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
specVolumeName string
|
||||||
|
path string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "simple specName",
|
||||||
|
specVolumeName: "spec-0",
|
||||||
|
path: path.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/staging/%s", "spec-0")),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "specName with dots",
|
||||||
|
specVolumeName: "test.spec.1",
|
||||||
|
path: path.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/staging/%s", "test.spec.1")),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Logf("test case: %s", tc.name)
|
||||||
|
csiMapper, _, _, err := prepareBlockMapperTest(plug, tc.specVolumeName)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to make a new Mapper: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
path := csiMapper.getStagingPath()
|
||||||
|
|
||||||
|
if tc.path != path {
|
||||||
|
t.Errorf("expecting path %s, got %s", tc.path, path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBlockMapperGetPublishPath(t *testing.T) {
|
||||||
|
plug, tmpDir := newTestPlugin(t, nil, nil)
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
specVolumeName string
|
||||||
|
path string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "simple specName",
|
||||||
|
specVolumeName: "spec-0",
|
||||||
|
path: path.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/publish/%s", "spec-0")),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "specName with dots",
|
||||||
|
specVolumeName: "test.spec.1",
|
||||||
|
path: path.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/publish/%s", "test.spec.1")),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Logf("test case: %s", tc.name)
|
||||||
|
csiMapper, _, _, err := prepareBlockMapperTest(plug, tc.specVolumeName)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to make a new Mapper: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
path := csiMapper.getPublishPath()
|
||||||
|
|
||||||
|
if tc.path != path {
|
||||||
|
t.Errorf("expecting path %s, got %s", tc.path, path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBlockMapperGetDeviceMapPath(t *testing.T) {
|
||||||
|
plug, tmpDir := newTestPlugin(t, nil, nil)
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
specVolumeName string
|
||||||
|
path string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "simple specName",
|
||||||
|
specVolumeName: "spec-0",
|
||||||
|
path: path.Join(tmpDir, fmt.Sprintf("pods/%s/volumeDevices/kubernetes.io~csi", testPodUID)),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "specName with dots",
|
||||||
|
specVolumeName: "test.spec.1",
|
||||||
|
path: path.Join(tmpDir, fmt.Sprintf("pods/%s/volumeDevices/kubernetes.io~csi", testPodUID)),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Logf("test case: %s", tc.name)
|
||||||
|
csiMapper, _, _, err := prepareBlockMapperTest(plug, tc.specVolumeName)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to make a new Mapper: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
path, volName := csiMapper.GetPodDeviceMapPath()
|
||||||
|
|
||||||
|
if tc.path != path {
|
||||||
|
t.Errorf("expecting path %s, got %s", tc.path, path)
|
||||||
|
}
|
||||||
|
|
||||||
|
if tc.specVolumeName != volName {
|
||||||
|
t.Errorf("expecting volName %s, got %s", tc.specVolumeName, volName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestBlockMapperSetupDevice(t *testing.T) {
|
func TestBlockMapperSetupDevice(t *testing.T) {
|
||||||
plug, tmpDir := newTestPlugin(t, nil, nil)
|
plug, tmpDir := newTestPlugin(t, nil, nil)
|
||||||
defer os.RemoveAll(tmpDir)
|
defer os.RemoveAll(tmpDir)
|
||||||
|
@ -88,21 +205,15 @@ func TestBlockMapperSetupDevice(t *testing.T) {
|
||||||
"fakeNode",
|
"fakeNode",
|
||||||
)
|
)
|
||||||
plug.host = host
|
plug.host = host
|
||||||
pv := makeTestPV("test-pv", 10, testDriver, testVol)
|
|
||||||
|
csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to make a new Mapper: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
pvName := pv.GetName()
|
pvName := pv.GetName()
|
||||||
nodeName := string(plug.host.GetNodeName())
|
nodeName := string(plug.host.GetNodeName())
|
||||||
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
|
|
||||||
|
|
||||||
// MapDevice
|
|
||||||
mapper, err := plug.NewBlockVolumeMapper(
|
|
||||||
spec,
|
|
||||||
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
|
|
||||||
volume.VolumeOptions{},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to create new mapper: %v", err)
|
|
||||||
}
|
|
||||||
csiMapper := mapper.(*csiBlockMapper)
|
|
||||||
csiMapper.csiClient = setupClient(t, true)
|
csiMapper.csiClient = setupClient(t, true)
|
||||||
|
|
||||||
attachID := getAttachmentName(csiMapper.volumeID, csiMapper.driverName, string(nodeName))
|
attachID := getAttachmentName(csiMapper.volumeID, csiMapper.driverName, string(nodeName))
|
||||||
|
@ -119,22 +230,31 @@ func TestBlockMapperSetupDevice(t *testing.T) {
|
||||||
t.Fatalf("mapper failed to SetupDevice: %v", err)
|
t.Fatalf("mapper failed to SetupDevice: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
globalMapPath, err := csiMapper.GetGlobalMapPath(spec)
|
// Check if SetUpDevice returns the right path
|
||||||
if err != nil {
|
publishPath := csiMapper.getPublishPath()
|
||||||
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
|
if devicePath != publishPath {
|
||||||
|
t.Fatalf("mapper.SetupDevice returned unexpected path %s instead of %v", devicePath, publishPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
if devicePath != filepath.Join(globalMapPath, "file") {
|
// Check if NodeStageVolume staged to the right path
|
||||||
t.Fatalf("mapper.SetupDevice returned unexpected path %s instead of %v", devicePath, globalMapPath)
|
stagingPath := csiMapper.getStagingPath()
|
||||||
|
svols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
|
||||||
|
svol, ok := svols[csiMapper.volumeID]
|
||||||
|
if !ok {
|
||||||
|
t.Error("csi server may not have received NodeStageVolume call")
|
||||||
|
}
|
||||||
|
if svol.Path != stagingPath {
|
||||||
|
t.Errorf("csi server expected device path %s, got %s", stagingPath, svol.Path)
|
||||||
}
|
}
|
||||||
|
|
||||||
vols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
|
// Check if NodePublishVolume published to the right path
|
||||||
vol, ok := vols[csiMapper.volumeID]
|
pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
|
||||||
|
pvol, ok := pvols[csiMapper.volumeID]
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Error("csi server may not have received NodePublishVolume call")
|
t.Error("csi server may not have received NodePublishVolume call")
|
||||||
}
|
}
|
||||||
if vol.Path != devicePath {
|
if pvol.Path != publishPath {
|
||||||
t.Errorf("csi server expected device path %s, got %s", devicePath, vol.Path)
|
t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,21 +270,15 @@ func TestBlockMapperMapDevice(t *testing.T) {
|
||||||
"fakeNode",
|
"fakeNode",
|
||||||
)
|
)
|
||||||
plug.host = host
|
plug.host = host
|
||||||
pv := makeTestPV("test-pv", 10, testDriver, testVol)
|
|
||||||
|
csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to make a new Mapper: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
pvName := pv.GetName()
|
pvName := pv.GetName()
|
||||||
nodeName := string(plug.host.GetNodeName())
|
nodeName := string(plug.host.GetNodeName())
|
||||||
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
|
|
||||||
|
|
||||||
// MapDevice
|
|
||||||
mapper, err := plug.NewBlockVolumeMapper(
|
|
||||||
spec,
|
|
||||||
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
|
|
||||||
volume.VolumeOptions{},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to create new mapper: %v", err)
|
|
||||||
}
|
|
||||||
csiMapper := mapper.(*csiBlockMapper)
|
|
||||||
csiMapper.csiClient = setupClient(t, true)
|
csiMapper.csiClient = setupClient(t, true)
|
||||||
|
|
||||||
attachID := getAttachmentName(csiMapper.volumeID, csiMapper.driverName, string(nodeName))
|
attachID := getAttachmentName(csiMapper.volumeID, csiMapper.driverName, string(nodeName))
|
||||||
|
@ -185,6 +299,16 @@ func TestBlockMapperMapDevice(t *testing.T) {
|
||||||
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
|
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Actual SetupDevice should create a symlink to or a bind mout of device in devicePath.
|
||||||
|
// Create dummy file there before calling MapDevice to test it properly.
|
||||||
|
fd, err := os.Create(devicePath)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("mapper failed to create dummy file in devicePath: %v", err)
|
||||||
|
}
|
||||||
|
if err := fd.Close(); err != nil {
|
||||||
|
t.Fatalf("mapper failed to close dummy file in devicePath: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Map device to global and pod device map path
|
// Map device to global and pod device map path
|
||||||
volumeMapPath, volName := csiMapper.GetPodDeviceMapPath()
|
volumeMapPath, volName := csiMapper.GetPodDeviceMapPath()
|
||||||
err = csiMapper.MapDevice(devicePath, globalMapPath, volumeMapPath, volName, csiMapper.podUID)
|
err = csiMapper.MapDevice(devicePath, globalMapPath, volumeMapPath, volName, csiMapper.podUID)
|
||||||
|
@ -192,22 +316,26 @@ func TestBlockMapperMapDevice(t *testing.T) {
|
||||||
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
|
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
podVolumeBlockFilePath := filepath.Join(volumeMapPath, "file")
|
// Check if symlink {globalMapPath}/{podUID} exists
|
||||||
if _, err := os.Stat(podVolumeBlockFilePath); err != nil {
|
globalMapFilePath := filepath.Join(globalMapPath, string(csiMapper.podUID))
|
||||||
|
if _, err := os.Stat(globalMapFilePath); err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
t.Errorf("mapper.MapDevice failed, volume path not created: %v", err)
|
t.Errorf("mapper.MapDevice failed, symlink in globalMapPath not created: %v", err)
|
||||||
|
t.Errorf("mapper.MapDevice devicePath:%v, globalMapPath: %v, globalMapFilePath: %v",
|
||||||
|
devicePath, globalMapPath, globalMapFilePath)
|
||||||
} else {
|
} else {
|
||||||
t.Errorf("mapper.MapDevice failed: %v", err)
|
t.Errorf("mapper.MapDevice failed: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pubs := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
|
// Check if symlink {volumeMapPath}/{volName} exists
|
||||||
vol, ok := pubs[csiMapper.volumeID]
|
volumeMapFilePath := filepath.Join(volumeMapPath, volName)
|
||||||
if !ok {
|
if _, err := os.Stat(volumeMapFilePath); err != nil {
|
||||||
t.Error("csi server may not have received NodePublishVolume call")
|
if os.IsNotExist(err) {
|
||||||
|
t.Errorf("mapper.MapDevice failed, symlink in volumeMapPath not created: %v", err)
|
||||||
|
} else {
|
||||||
|
t.Errorf("mapper.MapDevice failed: %v", err)
|
||||||
}
|
}
|
||||||
if vol.Path != podVolumeBlockFilePath {
|
|
||||||
t.Errorf("csi server expected path %s, got %s", podVolumeBlockFilePath, vol.Path)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -223,8 +351,11 @@ func TestBlockMapperTearDownDevice(t *testing.T) {
|
||||||
"fakeNode",
|
"fakeNode",
|
||||||
)
|
)
|
||||||
plug.host = host
|
plug.host = host
|
||||||
pv := makeTestPV("test-pv", 10, testDriver, testVol)
|
|
||||||
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
|
_, spec, pv, err := prepareBlockMapperTest(plug, "test-pv")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to make a new Mapper: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// save volume data
|
// save volume data
|
||||||
dir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
|
dir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
|
||||||
|
@ -262,15 +393,15 @@ func TestBlockMapperTearDownDevice(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensure csi client call and node unstaged
|
|
||||||
vols := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
|
|
||||||
if _, ok := vols[csiUnmapper.volumeID]; ok {
|
|
||||||
t.Error("csi server may not have received NodeUnstageVolume call")
|
|
||||||
}
|
|
||||||
|
|
||||||
// ensure csi client call and node unpblished
|
// ensure csi client call and node unpblished
|
||||||
pubs := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
|
pubs := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
|
||||||
if _, ok := pubs[csiUnmapper.volumeID]; ok {
|
if _, ok := pubs[csiUnmapper.volumeID]; ok {
|
||||||
t.Error("csi server may not have received NodeUnpublishVolume call")
|
t.Error("csi server may not have received NodeUnpublishVolume call")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ensure csi client call and node unstaged
|
||||||
|
vols := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
|
||||||
|
if _, ok := vols[csiUnmapper.volumeID]; ok {
|
||||||
|
t.Error("csi server may not have received NodeUnstageVolume call")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1045,7 +1045,7 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc(
|
||||||
}
|
}
|
||||||
|
|
||||||
blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper(
|
blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper(
|
||||||
string(deviceToDetach.VolumeName),
|
deviceToDetach.VolumeSpec.Name(),
|
||||||
"" /* podUID */)
|
"" /* podUID */)
|
||||||
if newUnmapperErr != nil {
|
if newUnmapperErr != nil {
|
||||||
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewUnmapper failed", newUnmapperErr)
|
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewUnmapper failed", newUnmapperErr)
|
||||||
|
|
Loading…
Reference in New Issue