mirror of https://github.com/k3s-io/k3s
CSI implementation of raw block volume support
parent
b672d2ee3c
commit
5044a3d12c
|
@ -331,6 +331,12 @@ const (
|
||||||
//
|
//
|
||||||
// Enable resource quota scope selectors
|
// Enable resource quota scope selectors
|
||||||
ResourceQuotaScopeSelectors utilfeature.Feature = "ResourceQuotaScopeSelectors"
|
ResourceQuotaScopeSelectors utilfeature.Feature = "ResourceQuotaScopeSelectors"
|
||||||
|
|
||||||
|
// owner: @vladimirvivien
|
||||||
|
// alpha: v1.11
|
||||||
|
//
|
||||||
|
// Enables CSI to use raw block storage volumes
|
||||||
|
CSIBlockVolume utilfeature.Feature = "CSIBlockVolume"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -387,6 +393,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
|
||||||
VolumeSubpathEnvExpansion: {Default: false, PreRelease: utilfeature.Alpha},
|
VolumeSubpathEnvExpansion: {Default: false, PreRelease: utilfeature.Alpha},
|
||||||
KubeletPluginsWatcher: {Default: false, PreRelease: utilfeature.Alpha},
|
KubeletPluginsWatcher: {Default: false, PreRelease: utilfeature.Alpha},
|
||||||
ResourceQuotaScopeSelectors: {Default: false, PreRelease: utilfeature.Alpha},
|
ResourceQuotaScopeSelectors: {Default: false, PreRelease: utilfeature.Alpha},
|
||||||
|
CSIBlockVolume: {Default: false, PreRelease: utilfeature.Alpha},
|
||||||
|
|
||||||
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
|
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
|
||||||
// unintentionally on either side:
|
// unintentionally on either side:
|
||||||
|
|
|
@ -4,6 +4,7 @@ go_library(
|
||||||
name = "go_default_library",
|
name = "go_default_library",
|
||||||
srcs = [
|
srcs = [
|
||||||
"csi_attacher.go",
|
"csi_attacher.go",
|
||||||
|
"csi_block.go",
|
||||||
"csi_client.go",
|
"csi_client.go",
|
||||||
"csi_mounter.go",
|
"csi_mounter.go",
|
||||||
"csi_plugin.go",
|
"csi_plugin.go",
|
||||||
|
@ -36,6 +37,7 @@ go_test(
|
||||||
name = "go_default_test",
|
name = "go_default_test",
|
||||||
srcs = [
|
srcs = [
|
||||||
"csi_attacher_test.go",
|
"csi_attacher_test.go",
|
||||||
|
"csi_block_test.go",
|
||||||
"csi_client_test.go",
|
"csi_client_test.go",
|
||||||
"csi_mounter_test.go",
|
"csi_mounter_test.go",
|
||||||
"csi_plugin_test.go",
|
"csi_plugin_test.go",
|
||||||
|
@ -55,6 +57,7 @@ go_test(
|
||||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||||
|
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/testing:go_default_library",
|
"//vendor/k8s.io/client-go/testing:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/util/testing:go_default_library",
|
"//vendor/k8s.io/client-go/util/testing:go_default_library",
|
||||||
|
|
|
@ -0,0 +1,283 @@
|
||||||
|
/*
|
||||||
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package csi
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
|
||||||
|
"k8s.io/api/core/v1"
|
||||||
|
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/types"
|
||||||
|
"k8s.io/client-go/kubernetes"
|
||||||
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
|
)
|
||||||
|
|
||||||
|
type csiBlockMapper struct {
|
||||||
|
k8s kubernetes.Interface
|
||||||
|
csiClient csiClient
|
||||||
|
plugin *csiPlugin
|
||||||
|
driverName string
|
||||||
|
specName string
|
||||||
|
volumeID string
|
||||||
|
readOnly bool
|
||||||
|
spec *volume.Spec
|
||||||
|
podUID types.UID
|
||||||
|
volumeInfo map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ volume.BlockVolumeMapper = &csiBlockMapper{}
|
||||||
|
|
||||||
|
// GetGlobalMapPath returns a path (on the node) where the devicePath will be symlinked to
|
||||||
|
// Example: plugins/kubernetes.io/csi/volumeDevices/{volumeID}
|
||||||
|
func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) {
|
||||||
|
dir := getVolumeDevicePluginDir(spec.Name(), m.plugin.host)
|
||||||
|
glog.V(4).Infof(log("blockMapper.GetGlobalMapPath = %s", dir))
|
||||||
|
return dir, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetPodDeviceMapPath returns pod's device map path and volume name
|
||||||
|
// path: pods/{podUid}/volumeDevices/kubernetes.io~csi/, {volumeID}
|
||||||
|
func (m *csiBlockMapper) GetPodDeviceMapPath() (string, string) {
|
||||||
|
path, specName := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, csiPluginName), m.specName
|
||||||
|
glog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath = %s", path))
|
||||||
|
return path, specName
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetUpDevice ensures the device is attached returns path where the device is located.
|
||||||
|
func (m *csiBlockMapper) SetUpDevice() (string, error) {
|
||||||
|
if !m.plugin.blockEnabled {
|
||||||
|
return "", errors.New("CSIBlockVolume feature not enabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(4).Infof(log("blockMapper.SetupDevice called"))
|
||||||
|
|
||||||
|
if m.spec == nil {
|
||||||
|
glog.Error(log("blockMapper.Map spec is nil"))
|
||||||
|
return "", fmt.Errorf("spec is nil")
|
||||||
|
}
|
||||||
|
csiSource, err := getCSISourceFromSpec(m.spec)
|
||||||
|
if err != nil {
|
||||||
|
glog.Error(log("blockMapper.SetupDevice failed to get CSI persistent source: %v", err))
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
globalMapPath, err := m.GetGlobalMapPath(m.spec)
|
||||||
|
if err != nil {
|
||||||
|
glog.Error(log("blockMapper.SetupDevice failed to get global map path: %v", err))
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
csi := m.csiClient
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Check whether "STAGE_UNSTAGE_VOLUME" is set
|
||||||
|
stageUnstageSet, err := hasStageUnstageCapability(ctx, csi)
|
||||||
|
if err != nil {
|
||||||
|
glog.Error(log("blockMapper.SetupDevice failed to check STAGE_UNSTAGE_VOLUME capability: %v", err))
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
if !stageUnstageSet {
|
||||||
|
glog.Infof(log("blockMapper.SetupDevice STAGE_UNSTAGE_VOLUME capability not set. Skipping MountDevice..."))
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start MountDevice
|
||||||
|
nodeName := string(m.plugin.host.GetNodeName())
|
||||||
|
attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
|
||||||
|
|
||||||
|
// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
|
||||||
|
attachment, err := m.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
glog.Error(log("blockMapper.SetupDevice failed to get volume attachment [id=%v]: %v", attachID, err))
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if attachment == nil {
|
||||||
|
glog.Error(log("blockMapper.SetupDevice unable to find VolumeAttachment [id=%s]", attachID))
|
||||||
|
return "", errors.New("no existing VolumeAttachment found")
|
||||||
|
}
|
||||||
|
publishVolumeInfo := attachment.Status.AttachmentMetadata
|
||||||
|
|
||||||
|
nodeStageSecrets := map[string]string{}
|
||||||
|
if csiSource.NodeStageSecretRef != nil {
|
||||||
|
nodeStageSecrets, err = getCredentialsFromSecret(m.k8s, csiSource.NodeStageSecretRef)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed to get NodeStageSecretRef %s/%s: %v",
|
||||||
|
csiSource.NodeStageSecretRef.Namespace, csiSource.NodeStageSecretRef.Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// create globalMapPath before call to NodeStageVolume
|
||||||
|
if err := os.MkdirAll(globalMapPath, 0750); err != nil {
|
||||||
|
glog.Error(log("blockMapper.SetupDevice failed to create dir %s: %v", globalMapPath, err))
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
glog.V(4).Info(log("blockMapper.SetupDevice created global device map path successfully [%s]", globalMapPath))
|
||||||
|
|
||||||
|
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
|
||||||
|
accessMode := v1.ReadWriteOnce
|
||||||
|
if m.spec.PersistentVolume.Spec.AccessModes != nil {
|
||||||
|
accessMode = m.spec.PersistentVolume.Spec.AccessModes[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
err = csi.NodeStageVolume(ctx,
|
||||||
|
csiSource.VolumeHandle,
|
||||||
|
publishVolumeInfo,
|
||||||
|
globalMapPath,
|
||||||
|
fsTypeBlockName,
|
||||||
|
accessMode,
|
||||||
|
nodeStageSecrets,
|
||||||
|
csiSource.VolumeAttributes)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
glog.Error(log("blockMapper.SetupDevice failed: %v", err))
|
||||||
|
if err := os.RemoveAll(globalMapPath); err != nil {
|
||||||
|
glog.Error(log("blockMapper.SetupDevice failed to remove dir after a NodeStageVolume() error [%s]: %v", globalMapPath, err))
|
||||||
|
}
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(4).Infof(log("blockMapper.SetupDevice successfully requested NodeStageVolume [%s]", globalMapPath))
|
||||||
|
return globalMapPath, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *csiBlockMapper) MapDevice(devicePath, globalMapPath, volumeMapPath, volumeMapName string, podUID types.UID) error {
|
||||||
|
if !m.plugin.blockEnabled {
|
||||||
|
return errors.New("CSIBlockVolume feature not enabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(4).Infof(log("blockMapper.MapDevice mapping block device %s", devicePath))
|
||||||
|
|
||||||
|
if m.spec == nil {
|
||||||
|
glog.Error(log("blockMapper.MapDevice spec is nil"))
|
||||||
|
return fmt.Errorf("spec is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
csiSource, err := getCSISourceFromSpec(m.spec)
|
||||||
|
if err != nil {
|
||||||
|
glog.Error(log("blockMapper.Map failed to get CSI persistent source: %v", err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
dir := filepath.Join(volumeMapPath, volumeMapName)
|
||||||
|
csi := m.csiClient
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
nodeName := string(m.plugin.host.GetNodeName())
|
||||||
|
attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
|
||||||
|
|
||||||
|
// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
|
||||||
|
attachment, err := m.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
glog.Error(log("blockMapper.MapDevice failed to get volume attachment [id=%v]: %v", attachID, err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if attachment == nil {
|
||||||
|
glog.Error(log("blockMapper.MapDevice unable to find VolumeAttachment [id=%s]", attachID))
|
||||||
|
return errors.New("no existing VolumeAttachment found")
|
||||||
|
}
|
||||||
|
publishVolumeInfo := attachment.Status.AttachmentMetadata
|
||||||
|
|
||||||
|
nodePublishSecrets := map[string]string{}
|
||||||
|
if csiSource.NodePublishSecretRef != nil {
|
||||||
|
nodePublishSecrets, err = getCredentialsFromSecret(m.k8s, csiSource.NodePublishSecretRef)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("blockMapper.MapDevice failed to get NodePublishSecretRef %s/%s: %v",
|
||||||
|
csiSource.NodePublishSecretRef.Namespace, csiSource.NodePublishSecretRef.Name, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.MkdirAll(dir, 0750); err != nil {
|
||||||
|
glog.Error(log("blockMapper.MapDevice failed to create dir %#v: %v", dir, err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
glog.V(4).Info(log("blockMapper.MapDevice created NodePublish path [%s]", dir))
|
||||||
|
|
||||||
|
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
|
||||||
|
accessMode := v1.ReadWriteOnce
|
||||||
|
if m.spec.PersistentVolume.Spec.AccessModes != nil {
|
||||||
|
accessMode = m.spec.PersistentVolume.Spec.AccessModes[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
err = csi.NodePublishVolume(
|
||||||
|
ctx,
|
||||||
|
m.volumeID,
|
||||||
|
m.readOnly,
|
||||||
|
globalMapPath,
|
||||||
|
dir,
|
||||||
|
accessMode,
|
||||||
|
publishVolumeInfo,
|
||||||
|
csiSource.VolumeAttributes,
|
||||||
|
nodePublishSecrets,
|
||||||
|
fsTypeBlockName,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf(log("blockMapper.MapDevice failed: %v", err))
|
||||||
|
if err := os.RemoveAll(dir); err != nil {
|
||||||
|
glog.Error(log("blockMapper.MapDevice failed to remove mount dir after a NodePublish() error [%s]: %v", dir, err))
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ volume.BlockVolumeUnmapper = &csiBlockMapper{}
|
||||||
|
|
||||||
|
// TearDownDevice removes traces of the SetUpDevice.
|
||||||
|
func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error {
|
||||||
|
if !m.plugin.blockEnabled {
|
||||||
|
return errors.New("CSIBlockVolume feature not enabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(4).Infof(log("unmapper.TearDownDevice(globalMapPath=%s; devicePath=%s)", globalMapPath, devicePath))
|
||||||
|
|
||||||
|
csi := m.csiClient
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// unmap global device map path
|
||||||
|
if err := csi.NodeUnstageVolume(ctx, m.volumeID, globalMapPath); err != nil {
|
||||||
|
glog.Errorf(log("blockMapper.TearDownDevice failed: %v", err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
glog.V(4).Infof(log("blockMapper.TearDownDevice NodeUnstageVolume successfully [%s]", globalMapPath))
|
||||||
|
|
||||||
|
// request to remove pod volume map path also
|
||||||
|
podVolumePath, volumeName := m.GetPodDeviceMapPath()
|
||||||
|
podVolumeMapPath := filepath.Join(podVolumePath, volumeName)
|
||||||
|
if err := csi.NodeUnpublishVolume(ctx, m.volumeID, podVolumeMapPath); err != nil {
|
||||||
|
glog.Error(log("blockMapper.TearDownDevice failed: %v", err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(4).Infof(log("blockMapper.TearDownDevice NodeUnpublished successfully [%s]", podVolumeMapPath))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,264 @@
|
||||||
|
/*
|
||||||
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package csi
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
api "k8s.io/api/core/v1"
|
||||||
|
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
fakeclient "k8s.io/client-go/kubernetes/fake"
|
||||||
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
|
volumetest "k8s.io/kubernetes/pkg/volume/testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBlockMapperGetGlobalMapPath(t *testing.T) {
|
||||||
|
plug, tmpDir := newTestPlugin(t)
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
// TODO (vladimirvivien) specName with slashes will not work
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
specVolumeName string
|
||||||
|
path string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "simple specName",
|
||||||
|
specVolumeName: "spec-0",
|
||||||
|
path: path.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/%s/%s", "spec-0", "dev")),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "specName with dots",
|
||||||
|
specVolumeName: "test.spec.1",
|
||||||
|
path: path.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/%s/%s", "test.spec.1", "dev")),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Logf("test case: %s", tc.name)
|
||||||
|
pv := makeTestPV(tc.specVolumeName, 10, testDriver, testVol)
|
||||||
|
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
|
||||||
|
mapper, err := plug.NewBlockVolumeMapper(
|
||||||
|
spec,
|
||||||
|
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
|
||||||
|
volume.VolumeOptions{},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to make a new Mapper: %v", err)
|
||||||
|
}
|
||||||
|
csiMapper := mapper.(*csiBlockMapper)
|
||||||
|
|
||||||
|
path, err := csiMapper.GetGlobalMapPath(spec)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("mapper GetGlobalMapPath failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if tc.path != path {
|
||||||
|
t.Errorf("expecting path %s, got %s", tc.path, path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBlockMapperSetupDevice(t *testing.T) {
|
||||||
|
plug, tmpDir := newTestPlugin(t)
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
fakeClient := fakeclient.NewSimpleClientset()
|
||||||
|
host := volumetest.NewFakeVolumeHostWithNodeName(
|
||||||
|
tmpDir,
|
||||||
|
fakeClient,
|
||||||
|
nil,
|
||||||
|
"fakeNode",
|
||||||
|
)
|
||||||
|
plug.host = host
|
||||||
|
pv := makeTestPV("test-pv", 10, testDriver, testVol)
|
||||||
|
pvName := pv.GetName()
|
||||||
|
nodeName := string(plug.host.GetNodeName())
|
||||||
|
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
|
||||||
|
|
||||||
|
// MapDevice
|
||||||
|
mapper, err := plug.NewBlockVolumeMapper(
|
||||||
|
spec,
|
||||||
|
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
|
||||||
|
volume.VolumeOptions{},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create new mapper: %v", err)
|
||||||
|
}
|
||||||
|
csiMapper := mapper.(*csiBlockMapper)
|
||||||
|
csiMapper.csiClient = setupClient(t, true)
|
||||||
|
|
||||||
|
attachID := getAttachmentName(csiMapper.volumeID, csiMapper.driverName, string(nodeName))
|
||||||
|
attachment := makeTestAttachment(attachID, nodeName, pvName)
|
||||||
|
attachment.Status.Attached = true
|
||||||
|
_, err = csiMapper.k8s.StorageV1beta1().VolumeAttachments().Create(attachment)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to setup VolumeAttachment: %v", err)
|
||||||
|
}
|
||||||
|
t.Log("created attachement ", attachID)
|
||||||
|
|
||||||
|
devicePath, err := csiMapper.SetUpDevice()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("mapper failed to SetupDevice: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
globalMapPath, err := csiMapper.GetGlobalMapPath(spec)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if devicePath != globalMapPath {
|
||||||
|
t.Fatalf("mapper.SetupDevice returned unexpected path %s instead of %v", devicePath, globalMapPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
vols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
|
||||||
|
if vols[csiMapper.volumeID] != devicePath {
|
||||||
|
t.Error("csi server may not have received NodePublishVolume call")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBlockMapperMapDevice(t *testing.T) {
|
||||||
|
plug, tmpDir := newTestPlugin(t)
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
fakeClient := fakeclient.NewSimpleClientset()
|
||||||
|
host := volumetest.NewFakeVolumeHostWithNodeName(
|
||||||
|
tmpDir,
|
||||||
|
fakeClient,
|
||||||
|
nil,
|
||||||
|
"fakeNode",
|
||||||
|
)
|
||||||
|
plug.host = host
|
||||||
|
pv := makeTestPV("test-pv", 10, testDriver, testVol)
|
||||||
|
pvName := pv.GetName()
|
||||||
|
nodeName := string(plug.host.GetNodeName())
|
||||||
|
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
|
||||||
|
|
||||||
|
// MapDevice
|
||||||
|
mapper, err := plug.NewBlockVolumeMapper(
|
||||||
|
spec,
|
||||||
|
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
|
||||||
|
volume.VolumeOptions{},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create new mapper: %v", err)
|
||||||
|
}
|
||||||
|
csiMapper := mapper.(*csiBlockMapper)
|
||||||
|
csiMapper.csiClient = setupClient(t, true)
|
||||||
|
|
||||||
|
attachID := getAttachmentName(csiMapper.volumeID, csiMapper.driverName, string(nodeName))
|
||||||
|
attachment := makeTestAttachment(attachID, nodeName, pvName)
|
||||||
|
attachment.Status.Attached = true
|
||||||
|
_, err = csiMapper.k8s.StorageV1beta1().VolumeAttachments().Create(attachment)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to setup VolumeAttachment: %v", err)
|
||||||
|
}
|
||||||
|
t.Log("created attachement ", attachID)
|
||||||
|
|
||||||
|
devicePath, err := csiMapper.SetUpDevice()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("mapper failed to SetupDevice: %v", err)
|
||||||
|
}
|
||||||
|
globalMapPath, err := csiMapper.GetGlobalMapPath(csiMapper.spec)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Map device to global and pod device map path
|
||||||
|
volumeMapPath, volName := csiMapper.GetPodDeviceMapPath()
|
||||||
|
err = csiMapper.MapDevice(devicePath, globalMapPath, volumeMapPath, volName, csiMapper.podUID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := os.Stat(filepath.Join(volumeMapPath, volName)); err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
t.Errorf("mapper.MapDevice failed, volume path not created: %s", volumeMapPath)
|
||||||
|
} else {
|
||||||
|
t.Errorf("mapper.MapDevice failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pubs := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
|
||||||
|
if pubs[csiMapper.volumeID] != volumeMapPath {
|
||||||
|
t.Error("csi server may not have received NodePublishVolume call")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBlockMapperTearDownDevice(t *testing.T) {
|
||||||
|
plug, tmpDir := newTestPlugin(t)
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
fakeClient := fakeclient.NewSimpleClientset()
|
||||||
|
host := volumetest.NewFakeVolumeHostWithNodeName(
|
||||||
|
tmpDir,
|
||||||
|
fakeClient,
|
||||||
|
nil,
|
||||||
|
"fakeNode",
|
||||||
|
)
|
||||||
|
plug.host = host
|
||||||
|
pv := makeTestPV("test-pv", 10, testDriver, testVol)
|
||||||
|
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
|
||||||
|
|
||||||
|
// save volume data
|
||||||
|
dir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
|
||||||
|
if err := os.MkdirAll(dir, 0755); err != nil && !os.IsNotExist(err) {
|
||||||
|
t.Errorf("failed to create dir [%s]: %v", dir, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := saveVolumeData(
|
||||||
|
dir,
|
||||||
|
volDataFileName,
|
||||||
|
map[string]string{
|
||||||
|
volDataKey.specVolID: pv.ObjectMeta.Name,
|
||||||
|
volDataKey.driverName: testDriver,
|
||||||
|
volDataKey.volHandle: testVol,
|
||||||
|
},
|
||||||
|
); err != nil {
|
||||||
|
t.Fatalf("failed to save volume data: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
unmapper, err := plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, testPodUID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to make a new Unmapper: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
csiUnmapper := unmapper.(*csiBlockMapper)
|
||||||
|
csiUnmapper.csiClient = setupClient(t, true)
|
||||||
|
|
||||||
|
globalMapPath, err := csiUnmapper.GetGlobalMapPath(spec)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unmapper failed to GetGlobalMapPath: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = csiUnmapper.TearDownDevice(globalMapPath, "/dev/test")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensure csi client call and node unstaged
|
||||||
|
vols := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
|
||||||
|
if _, ok := vols[csiUnmapper.volumeID]; ok {
|
||||||
|
t.Error("csi server may not have received NodeUnstageVolume call")
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensure csi client call and node unpblished
|
||||||
|
pubs := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
|
||||||
|
if _, ok := pubs[csiUnmapper.volumeID]; ok {
|
||||||
|
t.Error("csi server may not have received NodeUnpublishVolume call")
|
||||||
|
}
|
||||||
|
}
|
|
@ -113,17 +113,24 @@ func (c *csiDriverClient) NodePublishVolume(
|
||||||
AccessMode: &csipb.VolumeCapability_AccessMode{
|
AccessMode: &csipb.VolumeCapability_AccessMode{
|
||||||
Mode: asCSIAccessMode(accessMode),
|
Mode: asCSIAccessMode(accessMode),
|
||||||
},
|
},
|
||||||
AccessType: &csipb.VolumeCapability_Mount{
|
|
||||||
Mount: &csipb.VolumeCapability_MountVolume{
|
|
||||||
FsType: fsType,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if stagingTargetPath != "" {
|
if stagingTargetPath != "" {
|
||||||
req.StagingTargetPath = stagingTargetPath
|
req.StagingTargetPath = stagingTargetPath
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if fsType == fsTypeBlockName {
|
||||||
|
req.VolumeCapability.AccessType = &csipb.VolumeCapability_Block{
|
||||||
|
Block: &csipb.VolumeCapability_BlockVolume{},
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
req.VolumeCapability.AccessType = &csipb.VolumeCapability_Mount{
|
||||||
|
Mount: &csipb.VolumeCapability_MountVolume{
|
||||||
|
FsType: fsType,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
_, err = nodeClient.NodePublishVolume(ctx, req)
|
_, err = nodeClient.NodePublishVolume(ctx, req)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -185,16 +192,23 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
|
||||||
AccessMode: &csipb.VolumeCapability_AccessMode{
|
AccessMode: &csipb.VolumeCapability_AccessMode{
|
||||||
Mode: asCSIAccessMode(accessMode),
|
Mode: asCSIAccessMode(accessMode),
|
||||||
},
|
},
|
||||||
AccessType: &csipb.VolumeCapability_Mount{
|
|
||||||
Mount: &csipb.VolumeCapability_MountVolume{
|
|
||||||
FsType: fsType,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
NodeStageSecrets: nodeStageSecrets,
|
NodeStageSecrets: nodeStageSecrets,
|
||||||
VolumeAttributes: volumeAttribs,
|
VolumeAttributes: volumeAttribs,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if fsType == fsTypeBlockName {
|
||||||
|
req.VolumeCapability.AccessType = &csipb.VolumeCapability_Block{
|
||||||
|
Block: &csipb.VolumeCapability_BlockVolume{},
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
req.VolumeCapability.AccessType = &csipb.VolumeCapability_Mount{
|
||||||
|
Mount: &csipb.VolumeCapability_MountVolume{
|
||||||
|
FsType: fsType,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
_, err = nodeClient.NodeStageVolume(ctx, req)
|
_, err = nodeClient.NodeStageVolume(ctx, req)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,6 @@ package csi
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
@ -294,45 +293,6 @@ func (c *csiMountMgr) TearDownAt(dir string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// saveVolumeData persists parameter data as json file at the provided location
|
|
||||||
func saveVolumeData(dir string, fileName string, data map[string]string) error {
|
|
||||||
dataFilePath := path.Join(dir, fileName)
|
|
||||||
glog.V(4).Info(log("saving volume data file [%s]", dataFilePath))
|
|
||||||
file, err := os.Create(dataFilePath)
|
|
||||||
if err != nil {
|
|
||||||
glog.Error(log("failed to save volume data file %s: %v", dataFilePath, err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer file.Close()
|
|
||||||
if err := json.NewEncoder(file).Encode(data); err != nil {
|
|
||||||
glog.Error(log("failed to save volume data file %s: %v", dataFilePath, err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
glog.V(4).Info(log("volume data file saved successfully [%s]", dataFilePath))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// loadVolumeData loads volume info from specified json file/location
|
|
||||||
func loadVolumeData(dir string, fileName string) (map[string]string, error) {
|
|
||||||
// remove /mount at the end
|
|
||||||
dataFileName := path.Join(dir, fileName)
|
|
||||||
glog.V(4).Info(log("loading volume data file [%s]", dataFileName))
|
|
||||||
|
|
||||||
file, err := os.Open(dataFileName)
|
|
||||||
if err != nil {
|
|
||||||
glog.Error(log("failed to open volume data file [%s]: %v", dataFileName, err))
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer file.Close()
|
|
||||||
data := map[string]string{}
|
|
||||||
if err := json.NewDecoder(file).Decode(&data); err != nil {
|
|
||||||
glog.Error(log("failed to parse volume data file [%s]: %v", dataFileName, err))
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return data, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// isDirMounted returns the !notMounted result from IsLikelyNotMountPoint check
|
// isDirMounted returns the !notMounted result from IsLikelyNotMountPoint check
|
||||||
func isDirMounted(plug *csiPlugin, dir string) (bool, error) {
|
func isDirMounted(plug *csiPlugin, dir string) (bool, error) {
|
||||||
mounter := plug.host.GetMounter(plug.GetPluginName())
|
mounter := plug.host.GetMounter(plug.GetPluginName())
|
||||||
|
|
|
@ -29,6 +29,8 @@ import (
|
||||||
api "k8s.io/api/core/v1"
|
api "k8s.io/api/core/v1"
|
||||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
|
"k8s.io/kubernetes/pkg/features"
|
||||||
"k8s.io/kubernetes/pkg/util/mount"
|
"k8s.io/kubernetes/pkg/util/mount"
|
||||||
"k8s.io/kubernetes/pkg/volume"
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
"k8s.io/kubernetes/pkg/volume/csi/labelmanager"
|
"k8s.io/kubernetes/pkg/volume/csi/labelmanager"
|
||||||
|
@ -45,16 +47,19 @@ const (
|
||||||
csiTimeout = 15 * time.Second
|
csiTimeout = 15 * time.Second
|
||||||
volNameSep = "^"
|
volNameSep = "^"
|
||||||
volDataFileName = "vol_data.json"
|
volDataFileName = "vol_data.json"
|
||||||
|
fsTypeBlockName = "block"
|
||||||
)
|
)
|
||||||
|
|
||||||
type csiPlugin struct {
|
type csiPlugin struct {
|
||||||
host volume.VolumeHost
|
host volume.VolumeHost
|
||||||
|
blockEnabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProbeVolumePlugins returns implemented plugins
|
// ProbeVolumePlugins returns implemented plugins
|
||||||
func ProbeVolumePlugins() []volume.VolumePlugin {
|
func ProbeVolumePlugins() []volume.VolumePlugin {
|
||||||
p := &csiPlugin{
|
p := &csiPlugin{
|
||||||
host: nil,
|
host: nil,
|
||||||
|
blockEnabled: utilfeature.DefaultFeatureGate.Enabled(features.CSIBlockVolume),
|
||||||
}
|
}
|
||||||
return []volume.VolumePlugin{p}
|
return []volume.VolumePlugin{p}
|
||||||
}
|
}
|
||||||
|
@ -307,25 +312,132 @@ func (p *csiPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error)
|
||||||
return mount.GetMountRefs(m, deviceMountPath)
|
return mount.GetMountRefs(m, deviceMountPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getCSISourceFromSpec(spec *volume.Spec) (*api.CSIPersistentVolumeSource, error) {
|
// BlockVolumePlugin methods
|
||||||
if spec.PersistentVolume != nil &&
|
var _ volume.BlockVolumePlugin = &csiPlugin{}
|
||||||
spec.PersistentVolume.Spec.CSI != nil {
|
|
||||||
return spec.PersistentVolume.Spec.CSI, nil
|
func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opts volume.VolumeOptions) (volume.BlockVolumeMapper, error) {
|
||||||
|
if !p.blockEnabled {
|
||||||
|
return nil, errors.New("CSIBlockVolume feature not enabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, fmt.Errorf("CSIPersistentVolumeSource not defined in spec")
|
pvSource, err := getCSISourceFromSpec(spec)
|
||||||
}
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
func getReadOnlyFromSpec(spec *volume.Spec) (bool, error) {
|
}
|
||||||
if spec.PersistentVolume != nil &&
|
readOnly, err := getReadOnlyFromSpec(spec)
|
||||||
spec.PersistentVolume.Spec.CSI != nil {
|
if err != nil {
|
||||||
return spec.ReadOnly, nil
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return false, fmt.Errorf("CSIPersistentVolumeSource not defined in spec")
|
glog.V(4).Info(log("setting up block mapper for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver))
|
||||||
|
client := newCsiDriverClient(pvSource.Driver)
|
||||||
|
|
||||||
|
k8s := p.host.GetKubeClient()
|
||||||
|
if k8s == nil {
|
||||||
|
glog.Error(log("failed to get a kubernetes client"))
|
||||||
|
return nil, errors.New("failed to get a Kubernetes client")
|
||||||
|
}
|
||||||
|
|
||||||
|
mapper := &csiBlockMapper{
|
||||||
|
csiClient: client,
|
||||||
|
k8s: k8s,
|
||||||
|
plugin: p,
|
||||||
|
volumeID: pvSource.VolumeHandle,
|
||||||
|
driverName: pvSource.Driver,
|
||||||
|
readOnly: readOnly,
|
||||||
|
spec: spec,
|
||||||
|
podUID: podRef.UID,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save volume info in pod dir
|
||||||
|
dataDir := getVolumeDeviceDataDir(spec.Name(), p.host)
|
||||||
|
|
||||||
|
if err := os.MkdirAll(dataDir, 0750); err != nil {
|
||||||
|
glog.Error(log("failed to create data dir %s: %v", dataDir, err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
glog.V(4).Info(log("created path successfully [%s]", dataDir))
|
||||||
|
|
||||||
|
// persist volume info data for teardown
|
||||||
|
node := string(p.host.GetNodeName())
|
||||||
|
attachID := getAttachmentName(pvSource.VolumeHandle, pvSource.Driver, node)
|
||||||
|
volData := map[string]string{
|
||||||
|
volDataKey.specVolID: spec.Name(),
|
||||||
|
volDataKey.volHandle: pvSource.VolumeHandle,
|
||||||
|
volDataKey.driverName: pvSource.Driver,
|
||||||
|
volDataKey.nodeName: node,
|
||||||
|
volDataKey.attachmentID: attachID,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := saveVolumeData(dataDir, volDataFileName, volData); err != nil {
|
||||||
|
glog.Error(log("failed to save volume info data: %v", err))
|
||||||
|
if err := os.RemoveAll(dataDir); err != nil {
|
||||||
|
glog.Error(log("failed to remove dir after error [%s]: %v", dataDir, err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return mapper, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// log prepends log string with `kubernetes.io/csi`
|
func (p *csiPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) {
|
||||||
func log(msg string, parts ...interface{}) string {
|
if !p.blockEnabled {
|
||||||
return fmt.Sprintf(fmt.Sprintf("%s: %s", csiPluginName, msg), parts...)
|
return nil, errors.New("CSIBlockVolume feature not enabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(4).Infof(log("setting up block unmapper for [Spec=%v, podUID=%v]", volName, podUID))
|
||||||
|
unmapper := &csiBlockMapper{
|
||||||
|
plugin: p,
|
||||||
|
podUID: podUID,
|
||||||
|
specName: volName,
|
||||||
|
}
|
||||||
|
|
||||||
|
// load volume info from file
|
||||||
|
dataDir := getVolumeDeviceDataDir(unmapper.specName, p.host)
|
||||||
|
data, err := loadVolumeData(dataDir, volDataFileName)
|
||||||
|
if err != nil {
|
||||||
|
glog.Error(log("unmapper failed to load volume data file [%s]: %v", dataDir, err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
unmapper.driverName = data[volDataKey.driverName]
|
||||||
|
unmapper.volumeID = data[volDataKey.volHandle]
|
||||||
|
unmapper.csiClient = newCsiDriverClient(unmapper.driverName)
|
||||||
|
|
||||||
|
return unmapper, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *csiPlugin) ConstructBlockVolumeSpec(podUID types.UID, specVolName, mapPath string) (*volume.Spec, error) {
|
||||||
|
if !p.blockEnabled {
|
||||||
|
return nil, errors.New("CSIBlockVolume feature not enabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(4).Infof("plugin.ConstructBlockVolumeSpec [podUID=%s, specVolName=%s, path=%s]", string(podUID), specVolName, mapPath)
|
||||||
|
|
||||||
|
dataDir := getVolumeDeviceDataDir(specVolName, p.host)
|
||||||
|
volData, err := loadVolumeData(dataDir, volDataFileName)
|
||||||
|
if err != nil {
|
||||||
|
glog.Error(log("plugin.ConstructBlockVolumeSpec failed loading volume data using [%s]: %v", mapPath, err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(4).Info(log("plugin.ConstructBlockVolumeSpec extracted [%#v]", volData))
|
||||||
|
|
||||||
|
blockMode := api.PersistentVolumeBlock
|
||||||
|
pv := &api.PersistentVolume{
|
||||||
|
ObjectMeta: meta.ObjectMeta{
|
||||||
|
Name: volData[volDataKey.specVolID],
|
||||||
|
},
|
||||||
|
Spec: api.PersistentVolumeSpec{
|
||||||
|
PersistentVolumeSource: api.PersistentVolumeSource{
|
||||||
|
CSI: &api.CSIPersistentVolumeSource{
|
||||||
|
Driver: volData[volDataKey.driverName],
|
||||||
|
VolumeHandle: volData[volDataKey.volHandle],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
VolumeMode: &blockMode,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
return volume.NewSpecFromPersistentVolume(pv, false), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,12 +20,14 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
api "k8s.io/api/core/v1"
|
api "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
fakeclient "k8s.io/client-go/kubernetes/fake"
|
fakeclient "k8s.io/client-go/kubernetes/fake"
|
||||||
utiltesting "k8s.io/client-go/util/testing"
|
utiltesting "k8s.io/client-go/util/testing"
|
||||||
"k8s.io/kubernetes/pkg/volume"
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
|
@ -34,6 +36,11 @@ import (
|
||||||
|
|
||||||
// create a plugin mgr to load plugins and setup a fake client
|
// create a plugin mgr to load plugins and setup a fake client
|
||||||
func newTestPlugin(t *testing.T) (*csiPlugin, string) {
|
func newTestPlugin(t *testing.T) (*csiPlugin, string) {
|
||||||
|
err := utilfeature.DefaultFeatureGate.Set("CSIBlockVolume=true")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to enable feature gate for CSIBlockVolume: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
tmpDir, err := utiltesting.MkTmpdir("csi-test")
|
tmpDir, err := utiltesting.MkTmpdir("csi-test")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("can't create temp dir: %v", err)
|
t.Fatalf("can't create temp dir: %v", err)
|
||||||
|
@ -215,7 +222,21 @@ func TestPluginNewMounter(t *testing.T) {
|
||||||
t.Error("mounter pod not set")
|
t.Error("mounter pod not set")
|
||||||
}
|
}
|
||||||
if csiMounter.podUID == types.UID("") {
|
if csiMounter.podUID == types.UID("") {
|
||||||
t.Error("mounter podUID mot set")
|
t.Error("mounter podUID not set")
|
||||||
|
}
|
||||||
|
if csiMounter.csiClient == nil {
|
||||||
|
t.Error("mounter csiClient is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensure data file is created
|
||||||
|
dataDir := path.Dir(mounter.GetPath())
|
||||||
|
dataFile := filepath.Join(dataDir, volDataFileName)
|
||||||
|
if _, err := os.Stat(dataFile); err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
t.Errorf("data file not created %s", dataFile)
|
||||||
|
} else {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -259,6 +280,9 @@ func TestPluginNewUnmounter(t *testing.T) {
|
||||||
t.Error("podUID not set")
|
t.Error("podUID not set")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if csiUnmounter.csiClient == nil {
|
||||||
|
t.Error("unmounter csiClient is nil")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPluginNewAttacher(t *testing.T) {
|
func TestPluginNewAttacher(t *testing.T) {
|
||||||
|
@ -296,3 +320,156 @@ func TestPluginNewDetacher(t *testing.T) {
|
||||||
t.Error("Kubernetes client not set for detacher")
|
t.Error("Kubernetes client not set for detacher")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPluginNewBlockMapper(t *testing.T) {
|
||||||
|
plug, tmpDir := newTestPlugin(t)
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
pv := makeTestPV("test-block-pv", 10, testDriver, testVol)
|
||||||
|
mounter, err := plug.NewBlockVolumeMapper(
|
||||||
|
volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly),
|
||||||
|
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
|
||||||
|
volume.VolumeOptions{},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to make a new BlockMapper: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if mounter == nil {
|
||||||
|
t.Fatal("failed to create CSI BlockMapper, mapper is nill")
|
||||||
|
}
|
||||||
|
csiMapper := mounter.(*csiBlockMapper)
|
||||||
|
|
||||||
|
// validate mounter fields
|
||||||
|
if csiMapper.driverName != testDriver {
|
||||||
|
t.Error("CSI block mapper missing driver name")
|
||||||
|
}
|
||||||
|
if csiMapper.volumeID != testVol {
|
||||||
|
t.Error("CSI block mapper missing volumeID")
|
||||||
|
}
|
||||||
|
|
||||||
|
if csiMapper.podUID == types.UID("") {
|
||||||
|
t.Error("CSI block mapper missing pod.UID")
|
||||||
|
}
|
||||||
|
if csiMapper.csiClient == nil {
|
||||||
|
t.Error("mapper csiClient is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensure data file is created
|
||||||
|
dataFile := getVolumeDeviceDataDir(csiMapper.spec.Name(), plug.host)
|
||||||
|
if _, err := os.Stat(dataFile); err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
t.Errorf("data file not created %s", dataFile)
|
||||||
|
} else {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPluginNewUnmapper(t *testing.T) {
|
||||||
|
plug, tmpDir := newTestPlugin(t)
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
pv := makeTestPV("test-pv", 10, testDriver, testVol)
|
||||||
|
|
||||||
|
// save the data file to re-create client
|
||||||
|
dir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
|
||||||
|
if err := os.MkdirAll(dir, 0755); err != nil && !os.IsNotExist(err) {
|
||||||
|
t.Errorf("failed to create dir [%s]: %v", dir, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := saveVolumeData(
|
||||||
|
dir,
|
||||||
|
volDataFileName,
|
||||||
|
map[string]string{
|
||||||
|
volDataKey.specVolID: pv.ObjectMeta.Name,
|
||||||
|
volDataKey.driverName: testDriver,
|
||||||
|
volDataKey.volHandle: testVol,
|
||||||
|
},
|
||||||
|
); err != nil {
|
||||||
|
t.Fatalf("failed to save volume data: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// test unmounter
|
||||||
|
unmapper, err := plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, testPodUID)
|
||||||
|
csiUnmapper := unmapper.(*csiBlockMapper)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to make a new Unmounter: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if csiUnmapper == nil {
|
||||||
|
t.Fatal("failed to create CSI Unmounter")
|
||||||
|
}
|
||||||
|
|
||||||
|
if csiUnmapper.podUID != testPodUID {
|
||||||
|
t.Error("podUID not set")
|
||||||
|
}
|
||||||
|
|
||||||
|
if csiUnmapper.specName != pv.ObjectMeta.Name {
|
||||||
|
t.Error("specName not set")
|
||||||
|
}
|
||||||
|
|
||||||
|
if csiUnmapper.csiClient == nil {
|
||||||
|
t.Error("unmapper csiClient is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
// test loaded vol data
|
||||||
|
if csiUnmapper.driverName != testDriver {
|
||||||
|
t.Error("unmapper driverName not set")
|
||||||
|
}
|
||||||
|
if csiUnmapper.volumeID != testVol {
|
||||||
|
t.Error("unmapper volumeHandle not set")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPluginConstructBlockVolumeSpec(t *testing.T) {
|
||||||
|
plug, tmpDir := newTestPlugin(t)
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
specVolID string
|
||||||
|
data map[string]string
|
||||||
|
shouldFail bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "valid spec name",
|
||||||
|
specVolID: "test.vol.id",
|
||||||
|
data: map[string]string{volDataKey.specVolID: "test.vol.id", volDataKey.volHandle: "test-vol0", volDataKey.driverName: "test-driver0"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Logf("test case: %s", tc.name)
|
||||||
|
deviceDataDir := getVolumeDeviceDataDir(tc.specVolID, plug.host)
|
||||||
|
|
||||||
|
// create data file in csi plugin dir
|
||||||
|
if tc.data != nil {
|
||||||
|
if err := os.MkdirAll(deviceDataDir, 0755); err != nil && !os.IsNotExist(err) {
|
||||||
|
t.Errorf("failed to create dir [%s]: %v", deviceDataDir, err)
|
||||||
|
}
|
||||||
|
if err := saveVolumeData(deviceDataDir, volDataFileName, tc.data); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// rebuild spec
|
||||||
|
spec, err := plug.ConstructBlockVolumeSpec("test-podUID", tc.specVolID, getVolumeDevicePluginDir(tc.specVolID, plug.host))
|
||||||
|
if tc.shouldFail {
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expecting ConstructVolumeSpec to fail, but got nil error")
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
volHandle := spec.PersistentVolume.Spec.CSI.VolumeHandle
|
||||||
|
if volHandle != tc.data[volDataKey.volHandle] {
|
||||||
|
t.Errorf("expected volID %s, got volID %s", tc.data[volDataKey.volHandle], volHandle)
|
||||||
|
}
|
||||||
|
|
||||||
|
if spec.Name() != tc.specVolID {
|
||||||
|
t.Errorf("Unexpected spec name %s", spec.Name())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -17,10 +17,17 @@ limitations under the License.
|
||||||
package csi
|
package csi
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
api "k8s.io/api/core/v1"
|
api "k8s.io/api/core/v1"
|
||||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
|
kstrings "k8s.io/kubernetes/pkg/util/strings"
|
||||||
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
)
|
)
|
||||||
|
|
||||||
func getCredentialsFromSecret(k8s kubernetes.Interface, secretRef *api.SecretReference) (map[string]string, error) {
|
func getCredentialsFromSecret(k8s kubernetes.Interface, secretRef *api.SecretReference) (map[string]string, error) {
|
||||||
|
@ -36,3 +43,81 @@ func getCredentialsFromSecret(k8s kubernetes.Interface, secretRef *api.SecretRef
|
||||||
|
|
||||||
return credentials, nil
|
return credentials, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// saveVolumeData persists parameter data as json file at the provided location
|
||||||
|
func saveVolumeData(dir string, fileName string, data map[string]string) error {
|
||||||
|
dataFilePath := path.Join(dir, fileName)
|
||||||
|
glog.V(4).Info(log("saving volume data file [%s]", dataFilePath))
|
||||||
|
file, err := os.Create(dataFilePath)
|
||||||
|
if err != nil {
|
||||||
|
glog.Error(log("failed to save volume data file %s: %v", dataFilePath, err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
if err := json.NewEncoder(file).Encode(data); err != nil {
|
||||||
|
glog.Error(log("failed to save volume data file %s: %v", dataFilePath, err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
glog.V(4).Info(log("volume data file saved successfully [%s]", dataFilePath))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadVolumeData loads volume info from specified json file/location
|
||||||
|
func loadVolumeData(dir string, fileName string) (map[string]string, error) {
|
||||||
|
// remove /mount at the end
|
||||||
|
dataFileName := path.Join(dir, fileName)
|
||||||
|
glog.V(4).Info(log("loading volume data file [%s]", dataFileName))
|
||||||
|
|
||||||
|
file, err := os.Open(dataFileName)
|
||||||
|
if err != nil {
|
||||||
|
glog.Error(log("failed to open volume data file [%s]: %v", dataFileName, err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
data := map[string]string{}
|
||||||
|
if err := json.NewDecoder(file).Decode(&data); err != nil {
|
||||||
|
glog.Error(log("failed to parse volume data file [%s]: %v", dataFileName, err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getCSISourceFromSpec(spec *volume.Spec) (*api.CSIPersistentVolumeSource, error) {
|
||||||
|
if spec.PersistentVolume != nil &&
|
||||||
|
spec.PersistentVolume.Spec.CSI != nil {
|
||||||
|
return spec.PersistentVolume.Spec.CSI, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("CSIPersistentVolumeSource not defined in spec")
|
||||||
|
}
|
||||||
|
|
||||||
|
func getReadOnlyFromSpec(spec *volume.Spec) (bool, error) {
|
||||||
|
if spec.PersistentVolume != nil &&
|
||||||
|
spec.PersistentVolume.Spec.CSI != nil {
|
||||||
|
return spec.ReadOnly, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return false, fmt.Errorf("CSIPersistentVolumeSource not defined in spec")
|
||||||
|
}
|
||||||
|
|
||||||
|
// log prepends log string with `kubernetes.io/csi`
|
||||||
|
func log(msg string, parts ...interface{}) string {
|
||||||
|
return fmt.Sprintf(fmt.Sprintf("%s: %s", csiPluginName, msg), parts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getVolumeDevicePluginDir returns the path where the CSI plugin keeps the
|
||||||
|
// symlink for a block device associated with a given specVolumeID.
|
||||||
|
// path: plugins/kubernetes.io/csi/volumeDevices/{specVolumeID}/dev
|
||||||
|
func getVolumeDevicePluginDir(specVolID string, host volume.VolumeHost) string {
|
||||||
|
sanitizedSpecVolID := kstrings.EscapeQualifiedNameForDisk(specVolID)
|
||||||
|
return path.Join(host.GetVolumeDevicePluginDir(csiPluginName), sanitizedSpecVolID, "dev")
|
||||||
|
}
|
||||||
|
|
||||||
|
// getVolumeDeviceDataDir returns the path where the CSI plugin keeps the
|
||||||
|
// volume data for a block device associated with a given specVolumeID.
|
||||||
|
// path: plugins/kubernetes.io/csi/volumeDevices/{specVolumeID}/data
|
||||||
|
func getVolumeDeviceDataDir(specVolID string, host volume.VolumeHost) string {
|
||||||
|
sanitizedSpecVolID := kstrings.EscapeQualifiedNameForDisk(specVolID)
|
||||||
|
return path.Join(host.GetVolumeDevicePluginDir(csiPluginName), sanitizedSpecVolID, "data")
|
||||||
|
}
|
||||||
|
|
|
@ -105,7 +105,7 @@ func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePubli
|
||||||
if req.GetTargetPath() == "" {
|
if req.GetTargetPath() == "" {
|
||||||
return nil, errors.New("missing target path")
|
return nil, errors.New("missing target path")
|
||||||
}
|
}
|
||||||
fsTypes := "ext4|xfs|zfs"
|
fsTypes := "block|ext4|xfs|zfs"
|
||||||
fsType := req.GetVolumeCapability().GetMount().GetFsType()
|
fsType := req.GetVolumeCapability().GetMount().GetFsType()
|
||||||
if !strings.Contains(fsTypes, fsType) {
|
if !strings.Contains(fsTypes, fsType) {
|
||||||
return nil, errors.New("invalid fstype")
|
return nil, errors.New("invalid fstype")
|
||||||
|
@ -144,7 +144,7 @@ func (f *NodeClient) NodeStageVolume(ctx context.Context, req *csipb.NodeStageVo
|
||||||
}
|
}
|
||||||
|
|
||||||
fsType := ""
|
fsType := ""
|
||||||
fsTypes := "ext4|xfs|zfs"
|
fsTypes := "block|ext4|xfs|zfs"
|
||||||
mounted := req.GetVolumeCapability().GetMount()
|
mounted := req.GetVolumeCapability().GetMount()
|
||||||
if mounted != nil {
|
if mounted != nil {
|
||||||
fsType = mounted.GetFsType()
|
fsType = mounted.GetFsType()
|
||||||
|
|
Loading…
Reference in New Issue