From 5044a3d12cd8a7c01045651646d6a76864661027 Mon Sep 17 00:00:00 2001 From: Vladimir Vivien Date: Wed, 9 May 2018 14:28:53 -0400 Subject: [PATCH] CSI implementation of raw block volume support --- pkg/features/kube_features.go | 7 + pkg/volume/csi/BUILD | 3 + pkg/volume/csi/csi_block.go | 283 +++++++++++++++++++++++++++++ pkg/volume/csi/csi_block_test.go | 264 +++++++++++++++++++++++++++ pkg/volume/csi/csi_client.go | 34 +++- pkg/volume/csi/csi_mounter.go | 40 ---- pkg/volume/csi/csi_plugin.go | 146 +++++++++++++-- pkg/volume/csi/csi_plugin_test.go | 179 +++++++++++++++++- pkg/volume/csi/csi_util.go | 85 +++++++++ pkg/volume/csi/fake/fake_client.go | 4 +- 10 files changed, 975 insertions(+), 70 deletions(-) create mode 100644 pkg/volume/csi/csi_block.go create mode 100644 pkg/volume/csi/csi_block_test.go diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 491c7c0586..d29fa06d21 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -331,6 +331,12 @@ const ( // // Enable resource quota scope selectors ResourceQuotaScopeSelectors utilfeature.Feature = "ResourceQuotaScopeSelectors" + + // owner: @vladimirvivien + // alpha: v1.11 + // + // Enables CSI to use raw block storage volumes + CSIBlockVolume utilfeature.Feature = "CSIBlockVolume" ) func init() { @@ -387,6 +393,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS VolumeSubpathEnvExpansion: {Default: false, PreRelease: utilfeature.Alpha}, KubeletPluginsWatcher: {Default: false, PreRelease: utilfeature.Alpha}, ResourceQuotaScopeSelectors: {Default: false, PreRelease: utilfeature.Alpha}, + CSIBlockVolume: {Default: false, PreRelease: utilfeature.Alpha}, // inherited features from generic apiserver, relisted here to get a conflict if it is changed // unintentionally on either side: diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index c8e7efc1f9..54a2574eb7 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -4,6 +4,7 @@ go_library( name = "go_default_library", srcs = [ "csi_attacher.go", + "csi_block.go", "csi_client.go", "csi_mounter.go", "csi_plugin.go", @@ -36,6 +37,7 @@ go_test( name = "go_default_test", srcs = [ "csi_attacher_test.go", + "csi_block_test.go", "csi_client_test.go", "csi_mounter_test.go", "csi_plugin_test.go", @@ -55,6 +57,7 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", "//vendor/k8s.io/client-go/testing:go_default_library", "//vendor/k8s.io/client-go/util/testing:go_default_library", diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go new file mode 100644 index 0000000000..502747d28c --- /dev/null +++ b/pkg/volume/csi/csi_block.go @@ -0,0 +1,283 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csi + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + + "github.com/golang/glog" + + "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/pkg/volume" +) + +type csiBlockMapper struct { + k8s kubernetes.Interface + csiClient csiClient + plugin *csiPlugin + driverName string + specName string + volumeID string + readOnly bool + spec *volume.Spec + podUID types.UID + volumeInfo map[string]string +} + +var _ volume.BlockVolumeMapper = &csiBlockMapper{} + +// GetGlobalMapPath returns a path (on the node) where the devicePath will be symlinked to +// Example: plugins/kubernetes.io/csi/volumeDevices/{volumeID} +func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) { + dir := getVolumeDevicePluginDir(spec.Name(), m.plugin.host) + glog.V(4).Infof(log("blockMapper.GetGlobalMapPath = %s", dir)) + return dir, nil +} + +// GetPodDeviceMapPath returns pod's device map path and volume name +// path: pods/{podUid}/volumeDevices/kubernetes.io~csi/, {volumeID} +func (m *csiBlockMapper) GetPodDeviceMapPath() (string, string) { + path, specName := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, csiPluginName), m.specName + glog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath = %s", path)) + return path, specName +} + +// SetUpDevice ensures the device is attached returns path where the device is located. +func (m *csiBlockMapper) SetUpDevice() (string, error) { + if !m.plugin.blockEnabled { + return "", errors.New("CSIBlockVolume feature not enabled") + } + + glog.V(4).Infof(log("blockMapper.SetupDevice called")) + + if m.spec == nil { + glog.Error(log("blockMapper.Map spec is nil")) + return "", fmt.Errorf("spec is nil") + } + csiSource, err := getCSISourceFromSpec(m.spec) + if err != nil { + glog.Error(log("blockMapper.SetupDevice failed to get CSI persistent source: %v", err)) + return "", err + } + + globalMapPath, err := m.GetGlobalMapPath(m.spec) + if err != nil { + glog.Error(log("blockMapper.SetupDevice failed to get global map path: %v", err)) + return "", err + } + + csi := m.csiClient + ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + defer cancel() + + // Check whether "STAGE_UNSTAGE_VOLUME" is set + stageUnstageSet, err := hasStageUnstageCapability(ctx, csi) + if err != nil { + glog.Error(log("blockMapper.SetupDevice failed to check STAGE_UNSTAGE_VOLUME capability: %v", err)) + return "", err + } + if !stageUnstageSet { + glog.Infof(log("blockMapper.SetupDevice STAGE_UNSTAGE_VOLUME capability not set. Skipping MountDevice...")) + return "", nil + } + + // Start MountDevice + nodeName := string(m.plugin.host.GetNodeName()) + attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName) + + // search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName + attachment, err := m.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{}) + if err != nil { + glog.Error(log("blockMapper.SetupDevice failed to get volume attachment [id=%v]: %v", attachID, err)) + return "", err + } + + if attachment == nil { + glog.Error(log("blockMapper.SetupDevice unable to find VolumeAttachment [id=%s]", attachID)) + return "", errors.New("no existing VolumeAttachment found") + } + publishVolumeInfo := attachment.Status.AttachmentMetadata + + nodeStageSecrets := map[string]string{} + if csiSource.NodeStageSecretRef != nil { + nodeStageSecrets, err = getCredentialsFromSecret(m.k8s, csiSource.NodeStageSecretRef) + if err != nil { + return "", fmt.Errorf("failed to get NodeStageSecretRef %s/%s: %v", + csiSource.NodeStageSecretRef.Namespace, csiSource.NodeStageSecretRef.Name, err) + } + } + + // create globalMapPath before call to NodeStageVolume + if err := os.MkdirAll(globalMapPath, 0750); err != nil { + glog.Error(log("blockMapper.SetupDevice failed to create dir %s: %v", globalMapPath, err)) + return "", err + } + glog.V(4).Info(log("blockMapper.SetupDevice created global device map path successfully [%s]", globalMapPath)) + + //TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI + accessMode := v1.ReadWriteOnce + if m.spec.PersistentVolume.Spec.AccessModes != nil { + accessMode = m.spec.PersistentVolume.Spec.AccessModes[0] + } + + err = csi.NodeStageVolume(ctx, + csiSource.VolumeHandle, + publishVolumeInfo, + globalMapPath, + fsTypeBlockName, + accessMode, + nodeStageSecrets, + csiSource.VolumeAttributes) + + if err != nil { + glog.Error(log("blockMapper.SetupDevice failed: %v", err)) + if err := os.RemoveAll(globalMapPath); err != nil { + glog.Error(log("blockMapper.SetupDevice failed to remove dir after a NodeStageVolume() error [%s]: %v", globalMapPath, err)) + } + return "", err + } + + glog.V(4).Infof(log("blockMapper.SetupDevice successfully requested NodeStageVolume [%s]", globalMapPath)) + return globalMapPath, nil +} + +func (m *csiBlockMapper) MapDevice(devicePath, globalMapPath, volumeMapPath, volumeMapName string, podUID types.UID) error { + if !m.plugin.blockEnabled { + return errors.New("CSIBlockVolume feature not enabled") + } + + glog.V(4).Infof(log("blockMapper.MapDevice mapping block device %s", devicePath)) + + if m.spec == nil { + glog.Error(log("blockMapper.MapDevice spec is nil")) + return fmt.Errorf("spec is nil") + } + + csiSource, err := getCSISourceFromSpec(m.spec) + if err != nil { + glog.Error(log("blockMapper.Map failed to get CSI persistent source: %v", err)) + return err + } + + dir := filepath.Join(volumeMapPath, volumeMapName) + csi := m.csiClient + + ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + defer cancel() + + nodeName := string(m.plugin.host.GetNodeName()) + attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName) + + // search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName + attachment, err := m.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{}) + if err != nil { + glog.Error(log("blockMapper.MapDevice failed to get volume attachment [id=%v]: %v", attachID, err)) + return err + } + + if attachment == nil { + glog.Error(log("blockMapper.MapDevice unable to find VolumeAttachment [id=%s]", attachID)) + return errors.New("no existing VolumeAttachment found") + } + publishVolumeInfo := attachment.Status.AttachmentMetadata + + nodePublishSecrets := map[string]string{} + if csiSource.NodePublishSecretRef != nil { + nodePublishSecrets, err = getCredentialsFromSecret(m.k8s, csiSource.NodePublishSecretRef) + if err != nil { + glog.Errorf("blockMapper.MapDevice failed to get NodePublishSecretRef %s/%s: %v", + csiSource.NodePublishSecretRef.Namespace, csiSource.NodePublishSecretRef.Name, err) + return err + } + } + + if err := os.MkdirAll(dir, 0750); err != nil { + glog.Error(log("blockMapper.MapDevice failed to create dir %#v: %v", dir, err)) + return err + } + glog.V(4).Info(log("blockMapper.MapDevice created NodePublish path [%s]", dir)) + + //TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI + accessMode := v1.ReadWriteOnce + if m.spec.PersistentVolume.Spec.AccessModes != nil { + accessMode = m.spec.PersistentVolume.Spec.AccessModes[0] + } + + err = csi.NodePublishVolume( + ctx, + m.volumeID, + m.readOnly, + globalMapPath, + dir, + accessMode, + publishVolumeInfo, + csiSource.VolumeAttributes, + nodePublishSecrets, + fsTypeBlockName, + ) + + if err != nil { + glog.Errorf(log("blockMapper.MapDevice failed: %v", err)) + if err := os.RemoveAll(dir); err != nil { + glog.Error(log("blockMapper.MapDevice failed to remove mount dir after a NodePublish() error [%s]: %v", dir, err)) + } + return err + } + + return nil +} + +var _ volume.BlockVolumeUnmapper = &csiBlockMapper{} + +// TearDownDevice removes traces of the SetUpDevice. +func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error { + if !m.plugin.blockEnabled { + return errors.New("CSIBlockVolume feature not enabled") + } + + glog.V(4).Infof(log("unmapper.TearDownDevice(globalMapPath=%s; devicePath=%s)", globalMapPath, devicePath)) + + csi := m.csiClient + ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + defer cancel() + + // unmap global device map path + if err := csi.NodeUnstageVolume(ctx, m.volumeID, globalMapPath); err != nil { + glog.Errorf(log("blockMapper.TearDownDevice failed: %v", err)) + return err + } + glog.V(4).Infof(log("blockMapper.TearDownDevice NodeUnstageVolume successfully [%s]", globalMapPath)) + + // request to remove pod volume map path also + podVolumePath, volumeName := m.GetPodDeviceMapPath() + podVolumeMapPath := filepath.Join(podVolumePath, volumeName) + if err := csi.NodeUnpublishVolume(ctx, m.volumeID, podVolumeMapPath); err != nil { + glog.Error(log("blockMapper.TearDownDevice failed: %v", err)) + return err + } + + glog.V(4).Infof(log("blockMapper.TearDownDevice NodeUnpublished successfully [%s]", podVolumeMapPath)) + + return nil +} diff --git a/pkg/volume/csi/csi_block_test.go b/pkg/volume/csi/csi_block_test.go new file mode 100644 index 0000000000..af34014788 --- /dev/null +++ b/pkg/volume/csi/csi_block_test.go @@ -0,0 +1,264 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csi + +import ( + "fmt" + "os" + "path" + "path/filepath" + "testing" + + api "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + fakeclient "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubernetes/pkg/volume" + volumetest "k8s.io/kubernetes/pkg/volume/testing" +) + +func TestBlockMapperGetGlobalMapPath(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + + // TODO (vladimirvivien) specName with slashes will not work + testCases := []struct { + name string + specVolumeName string + path string + }{ + { + name: "simple specName", + specVolumeName: "spec-0", + path: path.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/%s/%s", "spec-0", "dev")), + }, + { + name: "specName with dots", + specVolumeName: "test.spec.1", + path: path.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/%s/%s", "test.spec.1", "dev")), + }, + } + for _, tc := range testCases { + t.Logf("test case: %s", tc.name) + pv := makeTestPV(tc.specVolumeName, 10, testDriver, testVol) + spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) + mapper, err := plug.NewBlockVolumeMapper( + spec, + &api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}}, + volume.VolumeOptions{}, + ) + if err != nil { + t.Fatalf("Failed to make a new Mapper: %v", err) + } + csiMapper := mapper.(*csiBlockMapper) + + path, err := csiMapper.GetGlobalMapPath(spec) + if err != nil { + t.Errorf("mapper GetGlobalMapPath failed: %v", err) + } + + if tc.path != path { + t.Errorf("expecting path %s, got %s", tc.path, path) + } + } +} + +func TestBlockMapperSetupDevice(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + fakeClient := fakeclient.NewSimpleClientset() + host := volumetest.NewFakeVolumeHostWithNodeName( + tmpDir, + fakeClient, + nil, + "fakeNode", + ) + plug.host = host + pv := makeTestPV("test-pv", 10, testDriver, testVol) + pvName := pv.GetName() + nodeName := string(plug.host.GetNodeName()) + spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) + + // MapDevice + mapper, err := plug.NewBlockVolumeMapper( + spec, + &api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}}, + volume.VolumeOptions{}, + ) + if err != nil { + t.Fatalf("failed to create new mapper: %v", err) + } + csiMapper := mapper.(*csiBlockMapper) + csiMapper.csiClient = setupClient(t, true) + + attachID := getAttachmentName(csiMapper.volumeID, csiMapper.driverName, string(nodeName)) + attachment := makeTestAttachment(attachID, nodeName, pvName) + attachment.Status.Attached = true + _, err = csiMapper.k8s.StorageV1beta1().VolumeAttachments().Create(attachment) + if err != nil { + t.Fatalf("failed to setup VolumeAttachment: %v", err) + } + t.Log("created attachement ", attachID) + + devicePath, err := csiMapper.SetUpDevice() + if err != nil { + t.Fatalf("mapper failed to SetupDevice: %v", err) + } + + globalMapPath, err := csiMapper.GetGlobalMapPath(spec) + if err != nil { + t.Fatalf("mapper failed to GetGlobalMapPath: %v", err) + } + + if devicePath != globalMapPath { + t.Fatalf("mapper.SetupDevice returned unexpected path %s instead of %v", devicePath, globalMapPath) + } + + vols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes() + if vols[csiMapper.volumeID] != devicePath { + t.Error("csi server may not have received NodePublishVolume call") + } +} + +func TestBlockMapperMapDevice(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + fakeClient := fakeclient.NewSimpleClientset() + host := volumetest.NewFakeVolumeHostWithNodeName( + tmpDir, + fakeClient, + nil, + "fakeNode", + ) + plug.host = host + pv := makeTestPV("test-pv", 10, testDriver, testVol) + pvName := pv.GetName() + nodeName := string(plug.host.GetNodeName()) + spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) + + // MapDevice + mapper, err := plug.NewBlockVolumeMapper( + spec, + &api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}}, + volume.VolumeOptions{}, + ) + if err != nil { + t.Fatalf("failed to create new mapper: %v", err) + } + csiMapper := mapper.(*csiBlockMapper) + csiMapper.csiClient = setupClient(t, true) + + attachID := getAttachmentName(csiMapper.volumeID, csiMapper.driverName, string(nodeName)) + attachment := makeTestAttachment(attachID, nodeName, pvName) + attachment.Status.Attached = true + _, err = csiMapper.k8s.StorageV1beta1().VolumeAttachments().Create(attachment) + if err != nil { + t.Fatalf("failed to setup VolumeAttachment: %v", err) + } + t.Log("created attachement ", attachID) + + devicePath, err := csiMapper.SetUpDevice() + if err != nil { + t.Fatalf("mapper failed to SetupDevice: %v", err) + } + globalMapPath, err := csiMapper.GetGlobalMapPath(csiMapper.spec) + if err != nil { + t.Fatalf("mapper failed to GetGlobalMapPath: %v", err) + } + + // Map device to global and pod device map path + volumeMapPath, volName := csiMapper.GetPodDeviceMapPath() + err = csiMapper.MapDevice(devicePath, globalMapPath, volumeMapPath, volName, csiMapper.podUID) + if err != nil { + t.Fatalf("mapper failed to GetGlobalMapPath: %v", err) + } + + if _, err := os.Stat(filepath.Join(volumeMapPath, volName)); err != nil { + if os.IsNotExist(err) { + t.Errorf("mapper.MapDevice failed, volume path not created: %s", volumeMapPath) + } else { + t.Errorf("mapper.MapDevice failed: %v", err) + } + } + + pubs := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() + if pubs[csiMapper.volumeID] != volumeMapPath { + t.Error("csi server may not have received NodePublishVolume call") + } +} + +func TestBlockMapperTearDownDevice(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + fakeClient := fakeclient.NewSimpleClientset() + host := volumetest.NewFakeVolumeHostWithNodeName( + tmpDir, + fakeClient, + nil, + "fakeNode", + ) + plug.host = host + pv := makeTestPV("test-pv", 10, testDriver, testVol) + spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) + + // save volume data + dir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host) + if err := os.MkdirAll(dir, 0755); err != nil && !os.IsNotExist(err) { + t.Errorf("failed to create dir [%s]: %v", dir, err) + } + + if err := saveVolumeData( + dir, + volDataFileName, + map[string]string{ + volDataKey.specVolID: pv.ObjectMeta.Name, + volDataKey.driverName: testDriver, + volDataKey.volHandle: testVol, + }, + ); err != nil { + t.Fatalf("failed to save volume data: %v", err) + } + + unmapper, err := plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, testPodUID) + if err != nil { + t.Fatalf("failed to make a new Unmapper: %v", err) + } + + csiUnmapper := unmapper.(*csiBlockMapper) + csiUnmapper.csiClient = setupClient(t, true) + + globalMapPath, err := csiUnmapper.GetGlobalMapPath(spec) + if err != nil { + t.Fatalf("unmapper failed to GetGlobalMapPath: %v", err) + } + + err = csiUnmapper.TearDownDevice(globalMapPath, "/dev/test") + if err != nil { + t.Fatal(err) + } + + // ensure csi client call and node unstaged + vols := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes() + if _, ok := vols[csiUnmapper.volumeID]; ok { + t.Error("csi server may not have received NodeUnstageVolume call") + } + + // ensure csi client call and node unpblished + pubs := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() + if _, ok := pubs[csiUnmapper.volumeID]; ok { + t.Error("csi server may not have received NodeUnpublishVolume call") + } +} diff --git a/pkg/volume/csi/csi_client.go b/pkg/volume/csi/csi_client.go index c6d11abb8c..e12772d7a9 100644 --- a/pkg/volume/csi/csi_client.go +++ b/pkg/volume/csi/csi_client.go @@ -113,17 +113,24 @@ func (c *csiDriverClient) NodePublishVolume( AccessMode: &csipb.VolumeCapability_AccessMode{ Mode: asCSIAccessMode(accessMode), }, - AccessType: &csipb.VolumeCapability_Mount{ - Mount: &csipb.VolumeCapability_MountVolume{ - FsType: fsType, - }, - }, }, } if stagingTargetPath != "" { req.StagingTargetPath = stagingTargetPath } + if fsType == fsTypeBlockName { + req.VolumeCapability.AccessType = &csipb.VolumeCapability_Block{ + Block: &csipb.VolumeCapability_BlockVolume{}, + } + } else { + req.VolumeCapability.AccessType = &csipb.VolumeCapability_Mount{ + Mount: &csipb.VolumeCapability_MountVolume{ + FsType: fsType, + }, + } + } + _, err = nodeClient.NodePublishVolume(ctx, req) return err } @@ -185,16 +192,23 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context, AccessMode: &csipb.VolumeCapability_AccessMode{ Mode: asCSIAccessMode(accessMode), }, - AccessType: &csipb.VolumeCapability_Mount{ - Mount: &csipb.VolumeCapability_MountVolume{ - FsType: fsType, - }, - }, }, NodeStageSecrets: nodeStageSecrets, VolumeAttributes: volumeAttribs, } + if fsType == fsTypeBlockName { + req.VolumeCapability.AccessType = &csipb.VolumeCapability_Block{ + Block: &csipb.VolumeCapability_BlockVolume{}, + } + } else { + req.VolumeCapability.AccessType = &csipb.VolumeCapability_Mount{ + Mount: &csipb.VolumeCapability_MountVolume{ + FsType: fsType, + }, + } + } + _, err = nodeClient.NodeStageVolume(ctx, req) return err } diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go index cb3de6475f..b6d86956d1 100644 --- a/pkg/volume/csi/csi_mounter.go +++ b/pkg/volume/csi/csi_mounter.go @@ -18,7 +18,6 @@ package csi import ( "context" - "encoding/json" "errors" "fmt" "os" @@ -294,45 +293,6 @@ func (c *csiMountMgr) TearDownAt(dir string) error { return nil } -// saveVolumeData persists parameter data as json file at the provided location -func saveVolumeData(dir string, fileName string, data map[string]string) error { - dataFilePath := path.Join(dir, fileName) - glog.V(4).Info(log("saving volume data file [%s]", dataFilePath)) - file, err := os.Create(dataFilePath) - if err != nil { - glog.Error(log("failed to save volume data file %s: %v", dataFilePath, err)) - return err - } - defer file.Close() - if err := json.NewEncoder(file).Encode(data); err != nil { - glog.Error(log("failed to save volume data file %s: %v", dataFilePath, err)) - return err - } - glog.V(4).Info(log("volume data file saved successfully [%s]", dataFilePath)) - return nil -} - -// loadVolumeData loads volume info from specified json file/location -func loadVolumeData(dir string, fileName string) (map[string]string, error) { - // remove /mount at the end - dataFileName := path.Join(dir, fileName) - glog.V(4).Info(log("loading volume data file [%s]", dataFileName)) - - file, err := os.Open(dataFileName) - if err != nil { - glog.Error(log("failed to open volume data file [%s]: %v", dataFileName, err)) - return nil, err - } - defer file.Close() - data := map[string]string{} - if err := json.NewDecoder(file).Decode(&data); err != nil { - glog.Error(log("failed to parse volume data file [%s]: %v", dataFileName, err)) - return nil, err - } - - return data, nil -} - // isDirMounted returns the !notMounted result from IsLikelyNotMountPoint check func isDirMounted(plug *csiPlugin, dir string) (bool, error) { mounter := plug.host.GetMounter(plug.GetPluginName()) diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 48b2a9835a..d10457686a 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -29,6 +29,8 @@ import ( api "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csi/labelmanager" @@ -45,16 +47,19 @@ const ( csiTimeout = 15 * time.Second volNameSep = "^" volDataFileName = "vol_data.json" + fsTypeBlockName = "block" ) type csiPlugin struct { - host volume.VolumeHost + host volume.VolumeHost + blockEnabled bool } // ProbeVolumePlugins returns implemented plugins func ProbeVolumePlugins() []volume.VolumePlugin { p := &csiPlugin{ - host: nil, + host: nil, + blockEnabled: utilfeature.DefaultFeatureGate.Enabled(features.CSIBlockVolume), } return []volume.VolumePlugin{p} } @@ -307,25 +312,132 @@ func (p *csiPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) return mount.GetMountRefs(m, deviceMountPath) } -func getCSISourceFromSpec(spec *volume.Spec) (*api.CSIPersistentVolumeSource, error) { - if spec.PersistentVolume != nil && - spec.PersistentVolume.Spec.CSI != nil { - return spec.PersistentVolume.Spec.CSI, nil +// BlockVolumePlugin methods +var _ volume.BlockVolumePlugin = &csiPlugin{} + +func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opts volume.VolumeOptions) (volume.BlockVolumeMapper, error) { + if !p.blockEnabled { + return nil, errors.New("CSIBlockVolume feature not enabled") } - return nil, fmt.Errorf("CSIPersistentVolumeSource not defined in spec") -} - -func getReadOnlyFromSpec(spec *volume.Spec) (bool, error) { - if spec.PersistentVolume != nil && - spec.PersistentVolume.Spec.CSI != nil { - return spec.ReadOnly, nil + pvSource, err := getCSISourceFromSpec(spec) + if err != nil { + return nil, err + } + readOnly, err := getReadOnlyFromSpec(spec) + if err != nil { + return nil, err } - return false, fmt.Errorf("CSIPersistentVolumeSource not defined in spec") + glog.V(4).Info(log("setting up block mapper for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver)) + client := newCsiDriverClient(pvSource.Driver) + + k8s := p.host.GetKubeClient() + if k8s == nil { + glog.Error(log("failed to get a kubernetes client")) + return nil, errors.New("failed to get a Kubernetes client") + } + + mapper := &csiBlockMapper{ + csiClient: client, + k8s: k8s, + plugin: p, + volumeID: pvSource.VolumeHandle, + driverName: pvSource.Driver, + readOnly: readOnly, + spec: spec, + podUID: podRef.UID, + } + + // Save volume info in pod dir + dataDir := getVolumeDeviceDataDir(spec.Name(), p.host) + + if err := os.MkdirAll(dataDir, 0750); err != nil { + glog.Error(log("failed to create data dir %s: %v", dataDir, err)) + return nil, err + } + glog.V(4).Info(log("created path successfully [%s]", dataDir)) + + // persist volume info data for teardown + node := string(p.host.GetNodeName()) + attachID := getAttachmentName(pvSource.VolumeHandle, pvSource.Driver, node) + volData := map[string]string{ + volDataKey.specVolID: spec.Name(), + volDataKey.volHandle: pvSource.VolumeHandle, + volDataKey.driverName: pvSource.Driver, + volDataKey.nodeName: node, + volDataKey.attachmentID: attachID, + } + + if err := saveVolumeData(dataDir, volDataFileName, volData); err != nil { + glog.Error(log("failed to save volume info data: %v", err)) + if err := os.RemoveAll(dataDir); err != nil { + glog.Error(log("failed to remove dir after error [%s]: %v", dataDir, err)) + return nil, err + } + return nil, err + } + + return mapper, nil } -// log prepends log string with `kubernetes.io/csi` -func log(msg string, parts ...interface{}) string { - return fmt.Sprintf(fmt.Sprintf("%s: %s", csiPluginName, msg), parts...) +func (p *csiPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) { + if !p.blockEnabled { + return nil, errors.New("CSIBlockVolume feature not enabled") + } + + glog.V(4).Infof(log("setting up block unmapper for [Spec=%v, podUID=%v]", volName, podUID)) + unmapper := &csiBlockMapper{ + plugin: p, + podUID: podUID, + specName: volName, + } + + // load volume info from file + dataDir := getVolumeDeviceDataDir(unmapper.specName, p.host) + data, err := loadVolumeData(dataDir, volDataFileName) + if err != nil { + glog.Error(log("unmapper failed to load volume data file [%s]: %v", dataDir, err)) + return nil, err + } + unmapper.driverName = data[volDataKey.driverName] + unmapper.volumeID = data[volDataKey.volHandle] + unmapper.csiClient = newCsiDriverClient(unmapper.driverName) + + return unmapper, nil +} + +func (p *csiPlugin) ConstructBlockVolumeSpec(podUID types.UID, specVolName, mapPath string) (*volume.Spec, error) { + if !p.blockEnabled { + return nil, errors.New("CSIBlockVolume feature not enabled") + } + + glog.V(4).Infof("plugin.ConstructBlockVolumeSpec [podUID=%s, specVolName=%s, path=%s]", string(podUID), specVolName, mapPath) + + dataDir := getVolumeDeviceDataDir(specVolName, p.host) + volData, err := loadVolumeData(dataDir, volDataFileName) + if err != nil { + glog.Error(log("plugin.ConstructBlockVolumeSpec failed loading volume data using [%s]: %v", mapPath, err)) + return nil, err + } + + glog.V(4).Info(log("plugin.ConstructBlockVolumeSpec extracted [%#v]", volData)) + + blockMode := api.PersistentVolumeBlock + pv := &api.PersistentVolume{ + ObjectMeta: meta.ObjectMeta{ + Name: volData[volDataKey.specVolID], + }, + Spec: api.PersistentVolumeSpec{ + PersistentVolumeSource: api.PersistentVolumeSource{ + CSI: &api.CSIPersistentVolumeSource{ + Driver: volData[volDataKey.driverName], + VolumeHandle: volData[volDataKey.volHandle], + }, + }, + VolumeMode: &blockMode, + }, + } + + return volume.NewSpecFromPersistentVolume(pv, false), nil } diff --git a/pkg/volume/csi/csi_plugin_test.go b/pkg/volume/csi/csi_plugin_test.go index ed7a92ffe5..ce76592e4e 100644 --- a/pkg/volume/csi/csi_plugin_test.go +++ b/pkg/volume/csi/csi_plugin_test.go @@ -20,12 +20,14 @@ import ( "fmt" "os" "path" + "path/filepath" "testing" api "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + utilfeature "k8s.io/apiserver/pkg/util/feature" fakeclient "k8s.io/client-go/kubernetes/fake" utiltesting "k8s.io/client-go/util/testing" "k8s.io/kubernetes/pkg/volume" @@ -34,6 +36,11 @@ import ( // create a plugin mgr to load plugins and setup a fake client func newTestPlugin(t *testing.T) (*csiPlugin, string) { + err := utilfeature.DefaultFeatureGate.Set("CSIBlockVolume=true") + if err != nil { + t.Fatalf("Failed to enable feature gate for CSIBlockVolume: %v", err) + } + tmpDir, err := utiltesting.MkTmpdir("csi-test") if err != nil { t.Fatalf("can't create temp dir: %v", err) @@ -215,7 +222,21 @@ func TestPluginNewMounter(t *testing.T) { t.Error("mounter pod not set") } if csiMounter.podUID == types.UID("") { - t.Error("mounter podUID mot set") + t.Error("mounter podUID not set") + } + if csiMounter.csiClient == nil { + t.Error("mounter csiClient is nil") + } + + // ensure data file is created + dataDir := path.Dir(mounter.GetPath()) + dataFile := filepath.Join(dataDir, volDataFileName) + if _, err := os.Stat(dataFile); err != nil { + if os.IsNotExist(err) { + t.Errorf("data file not created %s", dataFile) + } else { + t.Fatal(err) + } } } @@ -259,6 +280,9 @@ func TestPluginNewUnmounter(t *testing.T) { t.Error("podUID not set") } + if csiUnmounter.csiClient == nil { + t.Error("unmounter csiClient is nil") + } } func TestPluginNewAttacher(t *testing.T) { @@ -296,3 +320,156 @@ func TestPluginNewDetacher(t *testing.T) { t.Error("Kubernetes client not set for detacher") } } + +func TestPluginNewBlockMapper(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + + pv := makeTestPV("test-block-pv", 10, testDriver, testVol) + mounter, err := plug.NewBlockVolumeMapper( + volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly), + &api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}}, + volume.VolumeOptions{}, + ) + if err != nil { + t.Fatalf("Failed to make a new BlockMapper: %v", err) + } + + if mounter == nil { + t.Fatal("failed to create CSI BlockMapper, mapper is nill") + } + csiMapper := mounter.(*csiBlockMapper) + + // validate mounter fields + if csiMapper.driverName != testDriver { + t.Error("CSI block mapper missing driver name") + } + if csiMapper.volumeID != testVol { + t.Error("CSI block mapper missing volumeID") + } + + if csiMapper.podUID == types.UID("") { + t.Error("CSI block mapper missing pod.UID") + } + if csiMapper.csiClient == nil { + t.Error("mapper csiClient is nil") + } + + // ensure data file is created + dataFile := getVolumeDeviceDataDir(csiMapper.spec.Name(), plug.host) + if _, err := os.Stat(dataFile); err != nil { + if os.IsNotExist(err) { + t.Errorf("data file not created %s", dataFile) + } else { + t.Fatal(err) + } + } +} + +func TestPluginNewUnmapper(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + + pv := makeTestPV("test-pv", 10, testDriver, testVol) + + // save the data file to re-create client + dir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host) + if err := os.MkdirAll(dir, 0755); err != nil && !os.IsNotExist(err) { + t.Errorf("failed to create dir [%s]: %v", dir, err) + } + + if err := saveVolumeData( + dir, + volDataFileName, + map[string]string{ + volDataKey.specVolID: pv.ObjectMeta.Name, + volDataKey.driverName: testDriver, + volDataKey.volHandle: testVol, + }, + ); err != nil { + t.Fatalf("failed to save volume data: %v", err) + } + + // test unmounter + unmapper, err := plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, testPodUID) + csiUnmapper := unmapper.(*csiBlockMapper) + + if err != nil { + t.Fatalf("Failed to make a new Unmounter: %v", err) + } + + if csiUnmapper == nil { + t.Fatal("failed to create CSI Unmounter") + } + + if csiUnmapper.podUID != testPodUID { + t.Error("podUID not set") + } + + if csiUnmapper.specName != pv.ObjectMeta.Name { + t.Error("specName not set") + } + + if csiUnmapper.csiClient == nil { + t.Error("unmapper csiClient is nil") + } + + // test loaded vol data + if csiUnmapper.driverName != testDriver { + t.Error("unmapper driverName not set") + } + if csiUnmapper.volumeID != testVol { + t.Error("unmapper volumeHandle not set") + } +} + +func TestPluginConstructBlockVolumeSpec(t *testing.T) { + plug, tmpDir := newTestPlugin(t) + defer os.RemoveAll(tmpDir) + + testCases := []struct { + name string + specVolID string + data map[string]string + shouldFail bool + }{ + { + name: "valid spec name", + specVolID: "test.vol.id", + data: map[string]string{volDataKey.specVolID: "test.vol.id", volDataKey.volHandle: "test-vol0", volDataKey.driverName: "test-driver0"}, + }, + } + + for _, tc := range testCases { + t.Logf("test case: %s", tc.name) + deviceDataDir := getVolumeDeviceDataDir(tc.specVolID, plug.host) + + // create data file in csi plugin dir + if tc.data != nil { + if err := os.MkdirAll(deviceDataDir, 0755); err != nil && !os.IsNotExist(err) { + t.Errorf("failed to create dir [%s]: %v", deviceDataDir, err) + } + if err := saveVolumeData(deviceDataDir, volDataFileName, tc.data); err != nil { + t.Fatal(err) + } + } + + // rebuild spec + spec, err := plug.ConstructBlockVolumeSpec("test-podUID", tc.specVolID, getVolumeDevicePluginDir(tc.specVolID, plug.host)) + if tc.shouldFail { + if err == nil { + t.Fatal("expecting ConstructVolumeSpec to fail, but got nil error") + } + continue + } + + volHandle := spec.PersistentVolume.Spec.CSI.VolumeHandle + if volHandle != tc.data[volDataKey.volHandle] { + t.Errorf("expected volID %s, got volID %s", tc.data[volDataKey.volHandle], volHandle) + } + + if spec.Name() != tc.specVolID { + t.Errorf("Unexpected spec name %s", spec.Name()) + } + } +} diff --git a/pkg/volume/csi/csi_util.go b/pkg/volume/csi/csi_util.go index 81a973e44d..6a0b67212e 100644 --- a/pkg/volume/csi/csi_util.go +++ b/pkg/volume/csi/csi_util.go @@ -17,10 +17,17 @@ limitations under the License. package csi import ( + "encoding/json" + "fmt" + "os" + "path" + "github.com/golang/glog" api "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + kstrings "k8s.io/kubernetes/pkg/util/strings" + "k8s.io/kubernetes/pkg/volume" ) func getCredentialsFromSecret(k8s kubernetes.Interface, secretRef *api.SecretReference) (map[string]string, error) { @@ -36,3 +43,81 @@ func getCredentialsFromSecret(k8s kubernetes.Interface, secretRef *api.SecretRef return credentials, nil } + +// saveVolumeData persists parameter data as json file at the provided location +func saveVolumeData(dir string, fileName string, data map[string]string) error { + dataFilePath := path.Join(dir, fileName) + glog.V(4).Info(log("saving volume data file [%s]", dataFilePath)) + file, err := os.Create(dataFilePath) + if err != nil { + glog.Error(log("failed to save volume data file %s: %v", dataFilePath, err)) + return err + } + defer file.Close() + if err := json.NewEncoder(file).Encode(data); err != nil { + glog.Error(log("failed to save volume data file %s: %v", dataFilePath, err)) + return err + } + glog.V(4).Info(log("volume data file saved successfully [%s]", dataFilePath)) + return nil +} + +// loadVolumeData loads volume info from specified json file/location +func loadVolumeData(dir string, fileName string) (map[string]string, error) { + // remove /mount at the end + dataFileName := path.Join(dir, fileName) + glog.V(4).Info(log("loading volume data file [%s]", dataFileName)) + + file, err := os.Open(dataFileName) + if err != nil { + glog.Error(log("failed to open volume data file [%s]: %v", dataFileName, err)) + return nil, err + } + defer file.Close() + data := map[string]string{} + if err := json.NewDecoder(file).Decode(&data); err != nil { + glog.Error(log("failed to parse volume data file [%s]: %v", dataFileName, err)) + return nil, err + } + + return data, nil +} + +func getCSISourceFromSpec(spec *volume.Spec) (*api.CSIPersistentVolumeSource, error) { + if spec.PersistentVolume != nil && + spec.PersistentVolume.Spec.CSI != nil { + return spec.PersistentVolume.Spec.CSI, nil + } + + return nil, fmt.Errorf("CSIPersistentVolumeSource not defined in spec") +} + +func getReadOnlyFromSpec(spec *volume.Spec) (bool, error) { + if spec.PersistentVolume != nil && + spec.PersistentVolume.Spec.CSI != nil { + return spec.ReadOnly, nil + } + + return false, fmt.Errorf("CSIPersistentVolumeSource not defined in spec") +} + +// log prepends log string with `kubernetes.io/csi` +func log(msg string, parts ...interface{}) string { + return fmt.Sprintf(fmt.Sprintf("%s: %s", csiPluginName, msg), parts...) +} + +// getVolumeDevicePluginDir returns the path where the CSI plugin keeps the +// symlink for a block device associated with a given specVolumeID. +// path: plugins/kubernetes.io/csi/volumeDevices/{specVolumeID}/dev +func getVolumeDevicePluginDir(specVolID string, host volume.VolumeHost) string { + sanitizedSpecVolID := kstrings.EscapeQualifiedNameForDisk(specVolID) + return path.Join(host.GetVolumeDevicePluginDir(csiPluginName), sanitizedSpecVolID, "dev") +} + +// getVolumeDeviceDataDir returns the path where the CSI plugin keeps the +// volume data for a block device associated with a given specVolumeID. +// path: plugins/kubernetes.io/csi/volumeDevices/{specVolumeID}/data +func getVolumeDeviceDataDir(specVolID string, host volume.VolumeHost) string { + sanitizedSpecVolID := kstrings.EscapeQualifiedNameForDisk(specVolID) + return path.Join(host.GetVolumeDevicePluginDir(csiPluginName), sanitizedSpecVolID, "data") +} diff --git a/pkg/volume/csi/fake/fake_client.go b/pkg/volume/csi/fake/fake_client.go index 805f73ca20..b4341e7151 100644 --- a/pkg/volume/csi/fake/fake_client.go +++ b/pkg/volume/csi/fake/fake_client.go @@ -105,7 +105,7 @@ func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePubli if req.GetTargetPath() == "" { return nil, errors.New("missing target path") } - fsTypes := "ext4|xfs|zfs" + fsTypes := "block|ext4|xfs|zfs" fsType := req.GetVolumeCapability().GetMount().GetFsType() if !strings.Contains(fsTypes, fsType) { return nil, errors.New("invalid fstype") @@ -144,7 +144,7 @@ func (f *NodeClient) NodeStageVolume(ctx context.Context, req *csipb.NodeStageVo } fsType := "" - fsTypes := "ext4|xfs|zfs" + fsTypes := "block|ext4|xfs|zfs" mounted := req.GetVolumeCapability().GetMount() if mounted != nil { fsType = mounted.GetFsType()