From f0354ad605c634f70d29beda1d3788cfa55f34c7 Mon Sep 17 00:00:00 2001 From: Masaki Kimura Date: Wed, 19 Sep 2018 18:58:12 +0000 Subject: [PATCH] Fix for adding block volume support to CSI RBD driver --- pkg/volume/csi/csi_block.go | 383 ++++++++++-------- pkg/volume/csi/csi_block_test.go | 249 +++++++++--- .../operationexecutor/operation_generator.go | 2 +- 3 files changed, 411 insertions(+), 223 deletions(-) diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go index 41129fd4fb..c6f4e399b9 100644 --- a/pkg/volume/csi/csi_block.go +++ b/pkg/volume/csi/csi_block.go @@ -21,15 +21,19 @@ import ( "errors" "fmt" "os" + "path" "path/filepath" "k8s.io/klog" "k8s.io/api/core/v1" + storage "k8s.io/api/storage/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" + kstrings "k8s.io/kubernetes/pkg/util/strings" "k8s.io/kubernetes/pkg/volume" + ioutil "k8s.io/kubernetes/pkg/volume/util" ) type csiBlockMapper struct { @@ -47,7 +51,7 @@ type csiBlockMapper struct { var _ volume.BlockVolumeMapper = &csiBlockMapper{} -// GetGlobalMapPath returns a path (on the node) to a device file which will be symlinked to +// GetGlobalMapPath returns a global map path (on the node) to a device file which will be symlinked to // Example: plugins/kubernetes.io/csi/volumeDevices/{volumeID}/dev func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) { dir := getVolumeDevicePluginDir(spec.Name(), m.plugin.host) @@ -55,72 +59,54 @@ func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) { return dir, nil } +// getStagingPath returns a staging path (on the node) to a device file which will be bind mounted to +// Example: plugins/kubernetes.io/csi/volumeDevices/staging/{volumeID} +func (m *csiBlockMapper) getStagingPath() string { + sanitizedSpecVolID := kstrings.EscapeQualifiedNameForDisk(m.specName) + return path.Join(m.plugin.host.GetVolumeDevicePluginDir(csiPluginName), "staging", sanitizedSpecVolID) +} + +// getPublishPath returns a publish path (on the node) to a device file which will be bind mounted to +// Example: plugins/kubernetes.io/csi/volumeDevices/publish/{volumeID} +func (m *csiBlockMapper) getPublishPath() string { + sanitizedSpecVolID := kstrings.EscapeQualifiedNameForDisk(m.specName) + return path.Join(m.plugin.host.GetVolumeDevicePluginDir(csiPluginName), "publish", sanitizedSpecVolID) +} + // GetPodDeviceMapPath returns pod's device file which will be mapped to a volume -// returns: pods/{podUid}/volumeDevices/kubernetes.io~csi/{volumeID}/dev, {volumeID} +// returns: pods/{podUid}/volumeDevices/kubernetes.io~csi, {volumeID} func (m *csiBlockMapper) GetPodDeviceMapPath() (string, string) { - path := filepath.Join(m.plugin.host.GetPodVolumeDeviceDir(m.podUID, csiPluginName), m.specName, "dev") + path := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, kstrings.EscapeQualifiedNameForDisk(csiPluginName)) specName := m.specName klog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath [path=%s; name=%s]", path, specName)) return path, specName } -// SetUpDevice ensures the device is attached returns path where the device is located. -func (m *csiBlockMapper) SetUpDevice() (string, error) { - if !m.plugin.blockEnabled { - return "", errors.New("CSIBlockVolume feature not enabled") - } +// stageVolumeForBlock makes a block volume visible via the staging path +func (m *csiBlockMapper) stageVolumeForBlock( + ctx context.Context, + csi csiClient, + accessMode v1.PersistentVolumeAccessMode, + csiSource *v1.CSIPersistentVolumeSource, + attachment *storage.VolumeAttachment, +) (string, error) { + klog.V(4).Infof(log("blockMapper.stageVolumeForBlock called")) - klog.V(4).Infof(log("blockMapper.SetupDevice called")) - - if m.spec == nil { - klog.Error(log("blockMapper.Map spec is nil")) - return "", fmt.Errorf("spec is nil") - } - csiSource, err := getCSISourceFromSpec(m.spec) - if err != nil { - klog.Error(log("blockMapper.SetupDevice failed to get CSI persistent source: %v", err)) - return "", err - } - - globalMapPath, err := m.GetGlobalMapPath(m.spec) - if err != nil { - klog.Error(log("blockMapper.SetupDevice failed to get global map path: %v", err)) - return "", err - } - - globalMapPathBlockFile := filepath.Join(globalMapPath, "file") - klog.V(4).Infof(log("blockMapper.SetupDevice global device map path file set [%s]", globalMapPathBlockFile)) - - csi := m.csiClient - ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) - defer cancel() + stagingPath := m.getStagingPath() + klog.V(4).Infof(log("blockMapper.stageVolumeForBlock stagingPath set [%s]", stagingPath)) // Check whether "STAGE_UNSTAGE_VOLUME" is set stageUnstageSet, err := hasStageUnstageCapability(ctx, csi) if err != nil { - klog.Error(log("blockMapper.SetupDevice failed to check STAGE_UNSTAGE_VOLUME capability: %v", err)) + klog.Error(log("blockMapper.stageVolumeForBlock failed to check STAGE_UNSTAGE_VOLUME capability: %v", err)) return "", err } if !stageUnstageSet { - klog.Infof(log("blockMapper.SetupDevice STAGE_UNSTAGE_VOLUME capability not set. Skipping MountDevice...")) + klog.Infof(log("blockMapper.stageVolumeForBlock STAGE_UNSTAGE_VOLUME capability not set. Skipping MountDevice...")) return "", nil } // Start MountDevice - nodeName := string(m.plugin.host.GetNodeName()) - attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName) - - // search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName - attachment, err := m.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{}) - if err != nil { - klog.Error(log("blockMapper.SetupDevice failed to get volume attachment [id=%v]: %v", attachID, err)) - return "", err - } - - if attachment == nil { - klog.Error(log("blockMapper.SetupDevice unable to find VolumeAttachment [id=%s]", attachID)) - return "", errors.New("no existing VolumeAttachment found") - } publishVolumeInfo := attachment.Status.AttachmentMetadata nodeStageSecrets := map[string]string{} @@ -132,145 +118,98 @@ func (m *csiBlockMapper) SetUpDevice() (string, error) { } } - // setup path globalMapPath and block file before call to NodeStageVolume - if err := os.MkdirAll(globalMapPath, 0750); err != nil { - klog.Error(log("blockMapper.SetupDevice failed to create dir %s: %v", globalMapPath, err)) + // setup path directory for stagingPath before call to NodeStageVolume + stagingDir := filepath.Dir(stagingPath) + if err := os.MkdirAll(stagingDir, 0750); err != nil { + klog.Error(log("blockMapper.stageVolumeForBlock failed to create dir %s: %v", stagingDir, err)) return "", err } - klog.V(4).Info(log("blockMapper.SetupDevice created global device map path successfully [%s]", globalMapPath)) + klog.V(4).Info(log("blockMapper.stageVolumeForBlock created directory for stagingPath successfully [%s]", stagingDir)) - // create block device file - blockFile, err := os.OpenFile(globalMapPathBlockFile, os.O_CREATE|os.O_RDWR, 0750) + // create an empty file on staging path where block device is bind mounted to + stagingPathFile, err := os.OpenFile(stagingPath, os.O_CREATE|os.O_RDWR, 0750) if err != nil { - klog.Error(log("blockMapper.SetupDevice failed to create dir %s: %v", globalMapPathBlockFile, err)) + klog.Error(log("blockMapper.stageVolumeForBlock failed to create file %s: %v", stagingPathFile, err)) return "", err } - if err := blockFile.Close(); err != nil { - klog.Error(log("blockMapper.SetupDevice failed to close file %s: %v", globalMapPathBlockFile, err)) + if err := stagingPathFile.Close(); err != nil { + klog.Error(log("blockMapper.stageVolumeForBlock failed to close file %s: %v", stagingPathFile, err)) return "", err } - klog.V(4).Info(log("blockMapper.SetupDevice created global map path block device file successfully [%s]", globalMapPathBlockFile)) - - //TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI - accessMode := v1.ReadWriteOnce - if m.spec.PersistentVolume.Spec.AccessModes != nil { - accessMode = m.spec.PersistentVolume.Spec.AccessModes[0] - } + klog.V(4).Info(log("blockMapper.stageVolumeForBlock created an empty file on staging path successfully [%s]", stagingPathFile)) + // Request to attach the device to the node and to bind mount the device to stagingPath. err = csi.NodeStageVolume(ctx, csiSource.VolumeHandle, publishVolumeInfo, - globalMapPathBlockFile, + stagingPath, fsTypeBlockName, accessMode, nodeStageSecrets, csiSource.VolumeAttributes) if err != nil { - klog.Error(log("blockMapper.SetupDevice failed: %v", err)) - if err := os.RemoveAll(globalMapPath); err != nil { - klog.Error(log("blockMapper.SetupDevice failed to remove dir after a NodeStageVolume() error [%s]: %v", globalMapPath, err)) - } + klog.Error(log("blockMapper.stageVolumeForBlock failed: %v", err)) return "", err } - klog.V(4).Infof(log("blockMapper.SetupDevice successfully requested NodeStageVolume [%s]", globalMapPathBlockFile)) - return globalMapPathBlockFile, nil + klog.V(4).Infof(log("blockMapper.stageVolumeForBlock successfully requested NodeStageVolume [%s]", stagingPath)) + return stagingPath, nil } -func (m *csiBlockMapper) MapDevice(devicePath, globalMapPath, volumeMapPath, volumeMapName string, podUID types.UID) error { - if !m.plugin.blockEnabled { - return errors.New("CSIBlockVolume feature not enabled") - } +// publishVolumeForBlock makes a block volume visible via the publish path +func (m *csiBlockMapper) publishVolumeForBlock( + ctx context.Context, + csi csiClient, + accessMode v1.PersistentVolumeAccessMode, + csiSource *v1.CSIPersistentVolumeSource, + attachment *storage.VolumeAttachment, + stagingPath string, +) (string, error) { + klog.V(4).Infof(log("blockMapper.publishVolumeForBlock called")) - klog.V(4).Infof(log("blockMapper.MapDevice mapping block device %s", devicePath)) - - if m.spec == nil { - klog.Error(log("blockMapper.MapDevice spec is nil")) - return fmt.Errorf("spec is nil") - } - - csiSource, err := getCSISourceFromSpec(m.spec) - if err != nil { - klog.Error(log("blockMapper.MapDevice failed to get CSI persistent source: %v", err)) - return err - } - - csi := m.csiClient - ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) - defer cancel() - - globalMapPathBlockFile := devicePath - dir, _ := m.GetPodDeviceMapPath() - targetBlockFilePath := filepath.Join(dir, "file") - klog.V(4).Infof(log("blockMapper.MapDevice target volume map file path %s", targetBlockFilePath)) - - stageCapable, err := hasStageUnstageCapability(ctx, csi) - if err != nil { - klog.Error(log("blockMapper.MapDevice failed to check for STAGE_UNSTAGE_VOLUME capabilty: %v", err)) - return err - } - - if !stageCapable { - globalMapPathBlockFile = "" - } - - nodeName := string(m.plugin.host.GetNodeName()) - attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName) - - // search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName - attachment, err := m.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{}) - if err != nil { - klog.Error(log("blockMapper.MapDevice failed to get volume attachment [id=%v]: %v", attachID, err)) - return err - } - - if attachment == nil { - klog.Error(log("blockMapper.MapDevice unable to find VolumeAttachment [id=%s]", attachID)) - return errors.New("no existing VolumeAttachment found") - } publishVolumeInfo := attachment.Status.AttachmentMetadata nodePublishSecrets := map[string]string{} + var err error if csiSource.NodePublishSecretRef != nil { nodePublishSecrets, err = getCredentialsFromSecret(m.k8s, csiSource.NodePublishSecretRef) if err != nil { - klog.Errorf("blockMapper.MapDevice failed to get NodePublishSecretRef %s/%s: %v", + klog.Errorf("blockMapper.publishVolumeForBlock failed to get NodePublishSecretRef %s/%s: %v", csiSource.NodePublishSecretRef.Namespace, csiSource.NodePublishSecretRef.Name, err) - return err + return "", err } } - if err := os.MkdirAll(dir, 0750); err != nil { - klog.Error(log("blockMapper.MapDevice failed to create dir %s: %v", dir, err)) - return err + publishPath := m.getPublishPath() + // setup path directory for stagingPath before call to NodeStageVolume + publishDir := filepath.Dir(publishPath) + if err := os.MkdirAll(publishDir, 0750); err != nil { + klog.Error(log("blockMapper.publishVolumeForBlock failed to create dir %s: %v", publishDir, err)) + return "", err } - klog.V(4).Info(log("blockMapper.MapDevice created target volume map path successfully [%s]", dir)) + klog.V(4).Info(log("blockMapper.publishVolumeForBlock created directory for publishPath successfully [%s]", publishDir)) - // create target map volume block file - targetBlockFile, err := os.OpenFile(targetBlockFilePath, os.O_CREATE|os.O_RDWR, 0750) + // create an empty file on publish path where block device is bind mounted to + publishPathFile, err := os.OpenFile(publishPath, os.O_CREATE|os.O_RDWR, 0750) if err != nil { - klog.Error(log("blockMapper.MapDevice failed to create file %s: %v", targetBlockFilePath, err)) - return err + klog.Error(log("blockMapper.publishVolumeForBlock failed to create file %s: %v", publishPathFile, err)) + return "", err } - if err := targetBlockFile.Close(); err != nil { - klog.Error(log("blockMapper.MapDevice failed to close file %s: %v", targetBlockFilePath, err)) - return err - } - klog.V(4).Info(log("blockMapper.MapDevice created target volume map file successfully [%s]", targetBlockFilePath)) - - //TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI - accessMode := v1.ReadWriteOnce - if m.spec.PersistentVolume.Spec.AccessModes != nil { - accessMode = m.spec.PersistentVolume.Spec.AccessModes[0] + if err := publishPathFile.Close(); err != nil { + klog.Error(log("blockMapper.publishVolumeForBlock failed to close file %s: %v", publishPathFile, err)) + return "", err } + klog.V(4).Info(log("blockMapper.publishVolumeForBlock created an empty file on publish path successfully [%s]", publishPathFile)) + // Request to bind mount the device to publishPath. + // If driver doesn't implement NodeStageVolume, attaching the device to the node is required, here. err = csi.NodePublishVolume( ctx, m.volumeID, m.readOnly, - globalMapPathBlockFile, - targetBlockFilePath, + stagingPath, + publishPath, accessMode, publishVolumeInfo, csiSource.VolumeAttributes, @@ -280,17 +219,123 @@ func (m *csiBlockMapper) MapDevice(devicePath, globalMapPath, volumeMapPath, vol ) if err != nil { - klog.Errorf(log("blockMapper.MapDevice failed: %v", err)) - if err := os.RemoveAll(dir); err != nil { - klog.Error(log("blockMapper.MapDevice failed to remove mapped dir after a NodePublish() error [%s]: %v", dir, err)) - } + klog.Errorf(log("blockMapper.publishVolumeForBlock failed: %v", err)) + return "", err + } + + return publishPath, nil +} + +// SetUpDevice ensures the device is attached returns path where the device is located. +func (m *csiBlockMapper) SetUpDevice() (string, error) { + if !m.plugin.blockEnabled { + return "", errors.New("CSIBlockVolume feature not enabled") + } + klog.V(4).Infof(log("blockMapper.SetUpDevice called")) + + // Get csiSource from spec + if m.spec == nil { + klog.Error(log("blockMapper.SetUpDevice spec is nil")) + return "", fmt.Errorf("spec is nil") + } + + csiSource, err := getCSISourceFromSpec(m.spec) + if err != nil { + klog.Error(log("blockMapper.SetUpDevice failed to get CSI persistent source: %v", err)) + return "", err + } + + // Search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName + nodeName := string(m.plugin.host.GetNodeName()) + attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName) + attachment, err := m.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{}) + if err != nil { + klog.Error(log("blockMapper.SetupDevice failed to get volume attachment [id=%v]: %v", attachID, err)) + return "", err + } + + if attachment == nil { + klog.Error(log("blockMapper.SetupDevice unable to find VolumeAttachment [id=%s]", attachID)) + return "", errors.New("no existing VolumeAttachment found") + } + + //TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI + accessMode := v1.ReadWriteOnce + if m.spec.PersistentVolume.Spec.AccessModes != nil { + accessMode = m.spec.PersistentVolume.Spec.AccessModes[0] + } + + ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + defer cancel() + + // Call NodeStageVolume + stagingPath, err := m.stageVolumeForBlock(ctx, m.csiClient, accessMode, csiSource, attachment) + if err != nil { + return "", err + } + + // Call NodePublishVolume + publishPath, err := m.publishVolumeForBlock(ctx, m.csiClient, accessMode, csiSource, attachment, stagingPath) + if err != nil { + return "", err + } + + return publishPath, nil +} + +func (m *csiBlockMapper) MapDevice(devicePath, globalMapPath, volumeMapPath, volumeMapName string, podUID types.UID) error { + return ioutil.MapBlockVolume(devicePath, globalMapPath, volumeMapPath, volumeMapName, podUID) +} + +var _ volume.BlockVolumeUnmapper = &csiBlockMapper{} + +// unpublishVolumeForBlock makes a block volume invisible via the publish path +func (m *csiBlockMapper) unpublishVolumeForBlock(ctx context.Context, csi csiClient, publishPath string) error { + // Request to unmount bind mount to publishPath and to detach the device from the node. + // If driver doesn't implement NodeUnstageVolume, detaching the device from the node is required, here. + if err := csi.NodeUnpublishVolume(ctx, m.volumeID, publishPath); err != nil { + klog.Error(log("blockMapper.unpublishVolumeForBlock failed: %v", err)) + return err + } + klog.V(4).Infof(log("blockMapper.unpublishVolumeForBlock NodeUnpublished successfully [%s]", publishPath)) + + // Remove publishPath + if err := os.Remove(publishPath); err != nil { + klog.Error(log("blockMapper.unpublishVolumeForBlock failed to remove staging path after NodeUnpublishVolume() error [%s]: %v", publishPath, err)) return err } return nil } -var _ volume.BlockVolumeUnmapper = &csiBlockMapper{} +// unstageVolumeForBlock makes a block volume invisible via the staging path +func (m *csiBlockMapper) unstageVolumeForBlock(ctx context.Context, csi csiClient, stagingPath string) error { + // Check whether "STAGE_UNSTAGE_VOLUME" is set + stageUnstageSet, err := hasStageUnstageCapability(ctx, csi) + if err != nil { + klog.Error(log("blockMapper.unstageVolumeForBlock failed to check STAGE_UNSTAGE_VOLUME capability: %v", err)) + return err + } + if !stageUnstageSet { + klog.Infof(log("blockMapper.unstageVolumeForBlock STAGE_UNSTAGE_VOLUME capability not set. Skipping unstageVolumeForBlock ...")) + return nil + } + + // Request to unmount bind mount to stagingPath and to detach the device from the node. + if err := csi.NodeUnstageVolume(ctx, m.volumeID, stagingPath); err != nil { + klog.Errorf(log("blockMapper.unstageVolumeForBlock failed: %v", err)) + return err + } + klog.V(4).Infof(log("blockMapper.unstageVolumeForBlock NodeUnstageVolume successfully [%s]", stagingPath)) + + // Remove stagingPath + if err := os.Remove(stagingPath); err != nil { + klog.Error(log("blockMapper.unstageVolumeForBlock failed to remove staging path after NodeUnstageVolume() error [%s]: %v", stagingPath, err)) + return err + } + + return nil +} // TearDownDevice removes traces of the SetUpDevice. func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error { @@ -300,26 +345,38 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error klog.V(4).Infof(log("unmapper.TearDownDevice(globalMapPath=%s; devicePath=%s)", globalMapPath, devicePath)) - csi := m.csiClient ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) defer cancel() - // unmap global device map path - if err := csi.NodeUnstageVolume(ctx, m.volumeID, globalMapPath); err != nil { - klog.Errorf(log("blockMapper.TearDownDevice failed: %v", err)) - return err - } - klog.V(4).Infof(log("blockMapper.TearDownDevice NodeUnstageVolume successfully [%s]", globalMapPath)) - - // request to remove pod volume map path also - podVolumePath, volumeName := m.GetPodDeviceMapPath() - podVolumeMapPath := filepath.Join(podVolumePath, volumeName) - if err := csi.NodeUnpublishVolume(ctx, m.volumeID, podVolumeMapPath); err != nil { - klog.Error(log("blockMapper.TearDownDevice failed: %v", err)) - return err + // Call NodeUnpublishVolume + publishPath := m.getPublishPath() + if _, err := os.Stat(publishPath); err != nil { + if os.IsNotExist(err) { + klog.V(4).Infof(log("blockMapper.TearDownDevice publishPath(%s) has already been deleted, skip calling NodeUnpublishVolume", publishPath)) + } else { + return err + } + } else { + err := m.unpublishVolumeForBlock(ctx, m.csiClient, publishPath) + if err != nil { + return err + } } - klog.V(4).Infof(log("blockMapper.TearDownDevice NodeUnpublished successfully [%s]", podVolumeMapPath)) + // Call NodeUnstageVolume + stagingPath := m.getStagingPath() + if _, err := os.Stat(stagingPath); err != nil { + if os.IsNotExist(err) { + klog.V(4).Infof(log("blockMapper.TearDownDevice stagingPath(%s) has already been deleted, skip calling NodeUnstageVolume", stagingPath)) + } else { + return err + } + } else { + err := m.unstageVolumeForBlock(ctx, m.csiClient, stagingPath) + if err != nil { + return err + } + } return nil } diff --git a/pkg/volume/csi/csi_block_test.go b/pkg/volume/csi/csi_block_test.go index b9de55609a..8bff6b7f85 100644 --- a/pkg/volume/csi/csi_block_test.go +++ b/pkg/volume/csi/csi_block_test.go @@ -30,6 +30,21 @@ import ( volumetest "k8s.io/kubernetes/pkg/volume/testing" ) +func prepareBlockMapperTest(plug *csiPlugin, specVolumeName string) (*csiBlockMapper, *volume.Spec, *api.PersistentVolume, error) { + pv := makeTestPV(specVolumeName, 10, testDriver, testVol) + spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) + mapper, err := plug.NewBlockVolumeMapper( + spec, + &api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}}, + volume.VolumeOptions{}, + ) + if err != nil { + return nil, nil, nil, fmt.Errorf("Failed to make a new Mapper: %v", err) + } + csiMapper := mapper.(*csiBlockMapper) + return csiMapper, spec, pv, nil +} + func TestBlockMapperGetGlobalMapPath(t *testing.T) { plug, tmpDir := newTestPlugin(t, nil, nil) defer os.RemoveAll(tmpDir) @@ -53,17 +68,10 @@ func TestBlockMapperGetGlobalMapPath(t *testing.T) { } for _, tc := range testCases { t.Logf("test case: %s", tc.name) - pv := makeTestPV(tc.specVolumeName, 10, testDriver, testVol) - spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) - mapper, err := plug.NewBlockVolumeMapper( - spec, - &api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}}, - volume.VolumeOptions{}, - ) + csiMapper, spec, _, err := prepareBlockMapperTest(plug, tc.specVolumeName) if err != nil { t.Fatalf("Failed to make a new Mapper: %v", err) } - csiMapper := mapper.(*csiBlockMapper) path, err := csiMapper.GetGlobalMapPath(spec) if err != nil { @@ -76,6 +84,115 @@ func TestBlockMapperGetGlobalMapPath(t *testing.T) { } } +func TestBlockMapperGetStagingPath(t *testing.T) { + plug, tmpDir := newTestPlugin(t, nil, nil) + defer os.RemoveAll(tmpDir) + + testCases := []struct { + name string + specVolumeName string + path string + }{ + { + name: "simple specName", + specVolumeName: "spec-0", + path: path.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/staging/%s", "spec-0")), + }, + { + name: "specName with dots", + specVolumeName: "test.spec.1", + path: path.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/staging/%s", "test.spec.1")), + }, + } + for _, tc := range testCases { + t.Logf("test case: %s", tc.name) + csiMapper, _, _, err := prepareBlockMapperTest(plug, tc.specVolumeName) + if err != nil { + t.Fatalf("Failed to make a new Mapper: %v", err) + } + + path := csiMapper.getStagingPath() + + if tc.path != path { + t.Errorf("expecting path %s, got %s", tc.path, path) + } + } +} + +func TestBlockMapperGetPublishPath(t *testing.T) { + plug, tmpDir := newTestPlugin(t, nil, nil) + defer os.RemoveAll(tmpDir) + + testCases := []struct { + name string + specVolumeName string + path string + }{ + { + name: "simple specName", + specVolumeName: "spec-0", + path: path.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/publish/%s", "spec-0")), + }, + { + name: "specName with dots", + specVolumeName: "test.spec.1", + path: path.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/publish/%s", "test.spec.1")), + }, + } + for _, tc := range testCases { + t.Logf("test case: %s", tc.name) + csiMapper, _, _, err := prepareBlockMapperTest(plug, tc.specVolumeName) + if err != nil { + t.Fatalf("Failed to make a new Mapper: %v", err) + } + + path := csiMapper.getPublishPath() + + if tc.path != path { + t.Errorf("expecting path %s, got %s", tc.path, path) + } + } +} + +func TestBlockMapperGetDeviceMapPath(t *testing.T) { + plug, tmpDir := newTestPlugin(t, nil, nil) + defer os.RemoveAll(tmpDir) + + testCases := []struct { + name string + specVolumeName string + path string + }{ + { + name: "simple specName", + specVolumeName: "spec-0", + path: path.Join(tmpDir, fmt.Sprintf("pods/%s/volumeDevices/kubernetes.io~csi", testPodUID)), + }, + { + name: "specName with dots", + specVolumeName: "test.spec.1", + path: path.Join(tmpDir, fmt.Sprintf("pods/%s/volumeDevices/kubernetes.io~csi", testPodUID)), + }, + } + for _, tc := range testCases { + t.Logf("test case: %s", tc.name) + csiMapper, _, _, err := prepareBlockMapperTest(plug, tc.specVolumeName) + if err != nil { + t.Fatalf("Failed to make a new Mapper: %v", err) + } + + path, volName := csiMapper.GetPodDeviceMapPath() + + if tc.path != path { + t.Errorf("expecting path %s, got %s", tc.path, path) + } + + if tc.specVolumeName != volName { + t.Errorf("expecting volName %s, got %s", tc.specVolumeName, volName) + } + } +} + func TestBlockMapperSetupDevice(t *testing.T) { plug, tmpDir := newTestPlugin(t, nil, nil) defer os.RemoveAll(tmpDir) @@ -88,21 +205,15 @@ func TestBlockMapperSetupDevice(t *testing.T) { "fakeNode", ) plug.host = host - pv := makeTestPV("test-pv", 10, testDriver, testVol) + + csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv") + if err != nil { + t.Fatalf("Failed to make a new Mapper: %v", err) + } + pvName := pv.GetName() nodeName := string(plug.host.GetNodeName()) - spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) - // MapDevice - mapper, err := plug.NewBlockVolumeMapper( - spec, - &api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}}, - volume.VolumeOptions{}, - ) - if err != nil { - t.Fatalf("failed to create new mapper: %v", err) - } - csiMapper := mapper.(*csiBlockMapper) csiMapper.csiClient = setupClient(t, true) attachID := getAttachmentName(csiMapper.volumeID, csiMapper.driverName, string(nodeName)) @@ -119,22 +230,31 @@ func TestBlockMapperSetupDevice(t *testing.T) { t.Fatalf("mapper failed to SetupDevice: %v", err) } - globalMapPath, err := csiMapper.GetGlobalMapPath(spec) - if err != nil { - t.Fatalf("mapper failed to GetGlobalMapPath: %v", err) + // Check if SetUpDevice returns the right path + publishPath := csiMapper.getPublishPath() + if devicePath != publishPath { + t.Fatalf("mapper.SetupDevice returned unexpected path %s instead of %v", devicePath, publishPath) } - if devicePath != filepath.Join(globalMapPath, "file") { - t.Fatalf("mapper.SetupDevice returned unexpected path %s instead of %v", devicePath, globalMapPath) + // Check if NodeStageVolume staged to the right path + stagingPath := csiMapper.getStagingPath() + svols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes() + svol, ok := svols[csiMapper.volumeID] + if !ok { + t.Error("csi server may not have received NodeStageVolume call") + } + if svol.Path != stagingPath { + t.Errorf("csi server expected device path %s, got %s", stagingPath, svol.Path) } - vols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes() - vol, ok := vols[csiMapper.volumeID] + // Check if NodePublishVolume published to the right path + pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() + pvol, ok := pvols[csiMapper.volumeID] if !ok { t.Error("csi server may not have received NodePublishVolume call") } - if vol.Path != devicePath { - t.Errorf("csi server expected device path %s, got %s", devicePath, vol.Path) + if pvol.Path != publishPath { + t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path) } } @@ -150,21 +270,15 @@ func TestBlockMapperMapDevice(t *testing.T) { "fakeNode", ) plug.host = host - pv := makeTestPV("test-pv", 10, testDriver, testVol) + + csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv") + if err != nil { + t.Fatalf("Failed to make a new Mapper: %v", err) + } + pvName := pv.GetName() nodeName := string(plug.host.GetNodeName()) - spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) - // MapDevice - mapper, err := plug.NewBlockVolumeMapper( - spec, - &api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}}, - volume.VolumeOptions{}, - ) - if err != nil { - t.Fatalf("failed to create new mapper: %v", err) - } - csiMapper := mapper.(*csiBlockMapper) csiMapper.csiClient = setupClient(t, true) attachID := getAttachmentName(csiMapper.volumeID, csiMapper.driverName, string(nodeName)) @@ -185,6 +299,16 @@ func TestBlockMapperMapDevice(t *testing.T) { t.Fatalf("mapper failed to GetGlobalMapPath: %v", err) } + // Actual SetupDevice should create a symlink to or a bind mout of device in devicePath. + // Create dummy file there before calling MapDevice to test it properly. + fd, err := os.Create(devicePath) + if err != nil { + t.Fatalf("mapper failed to create dummy file in devicePath: %v", err) + } + if err := fd.Close(); err != nil { + t.Fatalf("mapper failed to close dummy file in devicePath: %v", err) + } + // Map device to global and pod device map path volumeMapPath, volName := csiMapper.GetPodDeviceMapPath() err = csiMapper.MapDevice(devicePath, globalMapPath, volumeMapPath, volName, csiMapper.podUID) @@ -192,22 +316,26 @@ func TestBlockMapperMapDevice(t *testing.T) { t.Fatalf("mapper failed to GetGlobalMapPath: %v", err) } - podVolumeBlockFilePath := filepath.Join(volumeMapPath, "file") - if _, err := os.Stat(podVolumeBlockFilePath); err != nil { + // Check if symlink {globalMapPath}/{podUID} exists + globalMapFilePath := filepath.Join(globalMapPath, string(csiMapper.podUID)) + if _, err := os.Stat(globalMapFilePath); err != nil { if os.IsNotExist(err) { - t.Errorf("mapper.MapDevice failed, volume path not created: %v", err) + t.Errorf("mapper.MapDevice failed, symlink in globalMapPath not created: %v", err) + t.Errorf("mapper.MapDevice devicePath:%v, globalMapPath: %v, globalMapFilePath: %v", + devicePath, globalMapPath, globalMapFilePath) } else { t.Errorf("mapper.MapDevice failed: %v", err) } } - pubs := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() - vol, ok := pubs[csiMapper.volumeID] - if !ok { - t.Error("csi server may not have received NodePublishVolume call") - } - if vol.Path != podVolumeBlockFilePath { - t.Errorf("csi server expected path %s, got %s", podVolumeBlockFilePath, vol.Path) + // Check if symlink {volumeMapPath}/{volName} exists + volumeMapFilePath := filepath.Join(volumeMapPath, volName) + if _, err := os.Stat(volumeMapFilePath); err != nil { + if os.IsNotExist(err) { + t.Errorf("mapper.MapDevice failed, symlink in volumeMapPath not created: %v", err) + } else { + t.Errorf("mapper.MapDevice failed: %v", err) + } } } @@ -223,8 +351,11 @@ func TestBlockMapperTearDownDevice(t *testing.T) { "fakeNode", ) plug.host = host - pv := makeTestPV("test-pv", 10, testDriver, testVol) - spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) + + _, spec, pv, err := prepareBlockMapperTest(plug, "test-pv") + if err != nil { + t.Fatalf("Failed to make a new Mapper: %v", err) + } // save volume data dir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host) @@ -262,15 +393,15 @@ func TestBlockMapperTearDownDevice(t *testing.T) { t.Fatal(err) } - // ensure csi client call and node unstaged - vols := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes() - if _, ok := vols[csiUnmapper.volumeID]; ok { - t.Error("csi server may not have received NodeUnstageVolume call") - } - // ensure csi client call and node unpblished pubs := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() if _, ok := pubs[csiUnmapper.volumeID]; ok { t.Error("csi server may not have received NodeUnpublishVolume call") } + + // ensure csi client call and node unstaged + vols := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes() + if _, ok := vols[csiUnmapper.volumeID]; ok { + t.Error("csi server may not have received NodeUnstageVolume call") + } } diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 26c5a712a0..36c10b6775 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -1045,7 +1045,7 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( } blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper( - string(deviceToDetach.VolumeName), + deviceToDetach.VolumeSpec.Name(), "" /* podUID */) if newUnmapperErr != nil { return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewUnmapper failed", newUnmapperErr)