mirror of https://github.com/k3s-io/k3s
CSI - Multiple bug fixes for NodeProbe, vol data file, mount dir create
- NodeProbe rpc before node attach - Teardown fix using volume info data file stored on node - Pre-create the mount prior to calling nodepublishpull/6/head
parent
a6741ea743
commit
23d59cbe54
|
@ -14,6 +14,7 @@ go_library(
|
|||
"//pkg/util/mount:go_default_library",
|
||||
"//pkg/util/strings:go_default_library",
|
||||
"//pkg/volume:go_default_library",
|
||||
"//pkg/volume/util:go_default_library",
|
||||
"//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/golang.org/x/net/context:go_default_library",
|
||||
|
@ -38,7 +39,6 @@ go_test(
|
|||
importpath = "k8s.io/kubernetes/pkg/volume/csi",
|
||||
library = ":go_default_library",
|
||||
deps = [
|
||||
"//pkg/util/strings:go_default_library",
|
||||
"//pkg/volume:go_default_library",
|
||||
"//pkg/volume/csi/fake:go_default_library",
|
||||
"//pkg/volume/testing:go_default_library",
|
||||
|
|
|
@ -32,6 +32,7 @@ import (
|
|||
|
||||
type csiClient interface {
|
||||
AssertSupportedVersion(ctx grpctx.Context, ver *csipb.Version) error
|
||||
NodeProbe(ctx grpctx.Context, ver *csipb.Version) error
|
||||
NodePublishVolume(
|
||||
ctx grpctx.Context,
|
||||
volumeid string,
|
||||
|
@ -135,6 +136,13 @@ func (c *csiDriverClient) AssertSupportedVersion(ctx grpctx.Context, ver *csipb.
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *csiDriverClient) NodeProbe(ctx grpctx.Context, ver *csipb.Version) error {
|
||||
glog.V(4).Info(log("sending NodeProbe rpc call to csi driver: [version %v]", ver))
|
||||
req := &csipb.NodeProbeRequest{Version: ver}
|
||||
_, err := c.nodeClient.NodeProbe(ctx, req)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *csiDriverClient) NodePublishVolume(
|
||||
ctx grpctx.Context,
|
||||
volID string,
|
||||
|
@ -145,7 +153,7 @@ func (c *csiDriverClient) NodePublishVolume(
|
|||
volumeAttribs map[string]string,
|
||||
fsType string,
|
||||
) error {
|
||||
|
||||
glog.V(4).Info(log("calling NodePublishVolume rpc [volid=%s,target_path=%s]", volID, targetPath))
|
||||
if volID == "" {
|
||||
return errors.New("missing volume id")
|
||||
}
|
||||
|
@ -182,7 +190,7 @@ func (c *csiDriverClient) NodePublishVolume(
|
|||
}
|
||||
|
||||
func (c *csiDriverClient) NodeUnpublishVolume(ctx grpctx.Context, volID string, targetPath string) error {
|
||||
|
||||
glog.V(4).Info(log("calling NodeUnpublishVolume rpc: [volid=%s, target_path=%s", volID, targetPath))
|
||||
if volID == "" {
|
||||
return errors.New("missing volume id")
|
||||
}
|
||||
|
|
|
@ -62,6 +62,28 @@ func TestClientAssertSupportedVersion(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestClientNodeProbe(t *testing.T) {
|
||||
testCases := []struct {
|
||||
testName string
|
||||
ver *csipb.Version
|
||||
mustFail bool
|
||||
err error
|
||||
}{
|
||||
{testName: "supported version", ver: &csipb.Version{Major: 0, Minor: 1, Patch: 0}},
|
||||
{testName: "grpc error", ver: &csipb.Version{Major: 0, Minor: 1, Patch: 0}, mustFail: true, err: errors.New("grpc error")},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Log("case: ", tc.testName)
|
||||
client := setupClient(t)
|
||||
client.nodeClient.(*fake.NodeClient).SetNextError(tc.err)
|
||||
err := client.NodeProbe(grpctx.Background(), tc.ver)
|
||||
if tc.mustFail && err == nil {
|
||||
t.Error("must fail, but err = nil")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientNodePublishVolume(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
@ -30,20 +31,39 @@ import (
|
|||
"k8s.io/client-go/kubernetes"
|
||||
kstrings "k8s.io/kubernetes/pkg/util/strings"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/util"
|
||||
)
|
||||
|
||||
//TODO (vladimirvivien) move this in a central loc later
|
||||
var (
|
||||
volDataKey = struct {
|
||||
specVolID,
|
||||
volHandle,
|
||||
driverName,
|
||||
nodeName,
|
||||
attachmentID string
|
||||
}{
|
||||
"specVolID",
|
||||
"volumeHandle",
|
||||
"driverName",
|
||||
"nodeName",
|
||||
"attachmentID",
|
||||
}
|
||||
)
|
||||
|
||||
type csiMountMgr struct {
|
||||
k8s kubernetes.Interface
|
||||
csiClient csiClient
|
||||
plugin *csiPlugin
|
||||
driverName string
|
||||
volumeID string
|
||||
readOnly bool
|
||||
spec *volume.Spec
|
||||
pod *api.Pod
|
||||
podUID types.UID
|
||||
options volume.VolumeOptions
|
||||
volumeInfo map[string]string
|
||||
k8s kubernetes.Interface
|
||||
csiClient csiClient
|
||||
plugin *csiPlugin
|
||||
driverName string
|
||||
volumeID string
|
||||
specVolumeID string
|
||||
readOnly bool
|
||||
spec *volume.Spec
|
||||
pod *api.Pod
|
||||
podUID types.UID
|
||||
options volume.VolumeOptions
|
||||
volumeInfo map[string]string
|
||||
volume.MetricsNil
|
||||
}
|
||||
|
||||
|
@ -51,14 +71,14 @@ type csiMountMgr struct {
|
|||
var _ volume.Volume = &csiMountMgr{}
|
||||
|
||||
func (c *csiMountMgr) GetPath() string {
|
||||
return getTargetPath(c.podUID, c.driverName, c.volumeID, c.plugin.host)
|
||||
dir := path.Join(getTargetPath(c.podUID, c.specVolumeID, c.plugin.host), "/mount")
|
||||
glog.V(4).Info(log("mounter.GetPath generated [%s]", dir))
|
||||
return dir
|
||||
}
|
||||
|
||||
func getTargetPath(uid types.UID, driverName string, volID string, host volume.VolumeHost) string {
|
||||
// driverName validated at Mounter creation
|
||||
// sanitize (replace / with ~) in volumeID before it's appended to path:w
|
||||
driverPath := fmt.Sprintf("%s/%s", driverName, kstrings.EscapeQualifiedNameForDisk(volID))
|
||||
return host.GetPodVolumeDir(uid, kstrings.EscapeQualifiedNameForDisk(csiPluginName), driverPath)
|
||||
func getTargetPath(uid types.UID, specVolumeID string, host volume.VolumeHost) string {
|
||||
specVolID := kstrings.EscapeQualifiedNameForDisk(specVolumeID)
|
||||
return host.GetPodVolumeDir(uid, kstrings.EscapeQualifiedNameForDisk(csiPluginName), specVolID)
|
||||
}
|
||||
|
||||
// volume.Mounter methods
|
||||
|
@ -77,6 +97,17 @@ func (c *csiMountMgr) SetUp(fsGroup *int64) error {
|
|||
func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
|
||||
glog.V(4).Infof(log("Mounter.SetUpAt(%s)", dir))
|
||||
|
||||
mounted, err := isDirMounted(c.plugin, dir)
|
||||
if err != nil {
|
||||
glog.Error(log("mounter.SetUpAt failed while checking mount status for dir [%s]", dir))
|
||||
return err
|
||||
}
|
||||
|
||||
if mounted {
|
||||
glog.V(4).Info(log("mounter.SetUpAt skipping mount, dir already mounted [%s]", dir))
|
||||
return nil
|
||||
}
|
||||
|
||||
csiSource, err := getCSISourceFromSpec(c.spec)
|
||||
if err != nil {
|
||||
glog.Error(log("mounter.SetupAt failed to get CSI persistent source: %v", err))
|
||||
|
@ -92,13 +123,19 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
|
|||
|
||||
// ensure version is supported
|
||||
if err := csi.AssertSupportedVersion(ctx, csiVersion); err != nil {
|
||||
glog.Errorf(log("failed to assert version: %v", err))
|
||||
glog.Error(log("mounter.SetUpAt failed to assert version: %v", err))
|
||||
return err
|
||||
}
|
||||
|
||||
// probe driver
|
||||
// TODO (vladimirvivien) move probe call where it is done only when it is needed.
|
||||
if err := csi.NodeProbe(ctx, csiVersion); err != nil {
|
||||
glog.Error(log("mounter.SetUpAt failed to probe driver: %v", err))
|
||||
return err
|
||||
}
|
||||
|
||||
// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
|
||||
if c.volumeInfo == nil {
|
||||
|
||||
attachment, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{})
|
||||
if err != nil {
|
||||
glog.Error(log("mounter.SetupAt failed while getting volume attachment [id=%v]: %v", attachID, err))
|
||||
|
@ -121,6 +158,31 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// create target_dir before call to NodePublish
|
||||
if err := os.MkdirAll(dir, 0750); err != nil {
|
||||
glog.Error(log("mouter.SetUpAt failed to create dir %#v: %v", dir, err))
|
||||
return err
|
||||
}
|
||||
glog.V(4).Info(log("created target path successfully [%s]", dir))
|
||||
|
||||
// persist volume info data for teardown
|
||||
volData := map[string]string{
|
||||
volDataKey.specVolID: c.spec.Name(),
|
||||
volDataKey.volHandle: csiSource.VolumeHandle,
|
||||
volDataKey.driverName: csiSource.Driver,
|
||||
volDataKey.nodeName: nodeName,
|
||||
volDataKey.attachmentID: attachID,
|
||||
}
|
||||
|
||||
if err := saveVolumeData(c.plugin, c.podUID, c.spec.Name(), volData); err != nil {
|
||||
glog.Error(log("mounter.SetUpAt failed to save volume info data: %v", err))
|
||||
if err := removeMountDir(c.plugin, dir); err != nil {
|
||||
glog.Error(log("mounter.SetUpAt failed to remove mount dir after a saveVolumeData() error [%s]: %v", dir, err))
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
|
||||
accessMode := api.ReadWriteOnce
|
||||
if c.spec.PersistentVolume.Spec.AccessModes != nil {
|
||||
|
@ -139,11 +201,15 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
|
|||
)
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf(log("Mounter.SetupAt failed: %v", err))
|
||||
glog.Errorf(log("mounter.SetupAt failed: %v", err))
|
||||
if err := removeMountDir(c.plugin, dir); err != nil {
|
||||
glog.Error(log("mounter.SetuAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, err))
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
glog.V(4).Infof(log("successfully mounted %s", dir))
|
||||
|
||||
glog.V(4).Infof(log("mounter.SetUp successfully requested NodePublish [%s]", dir))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -164,10 +230,30 @@ func (c *csiMountMgr) TearDown() error {
|
|||
func (c *csiMountMgr) TearDownAt(dir string) error {
|
||||
glog.V(4).Infof(log("Unmounter.TearDown(%s)", dir))
|
||||
|
||||
// extract driverName and volID from path
|
||||
base, volID := path.Split(dir)
|
||||
volID = kstrings.UnescapeQualifiedNameForDisk(volID)
|
||||
driverName := path.Base(base)
|
||||
// is dir even mounted ?
|
||||
// TODO (vladimirvivien) this check may not work for an emptyDir or local storage
|
||||
// see https://github.com/kubernetes/kubernetes/pull/56836#discussion_r155834524
|
||||
mounted, err := isDirMounted(c.plugin, dir)
|
||||
if err != nil {
|
||||
glog.Error(log("unmounter.Teardown failed while checking mount status for dir [%s]: %v", dir, err))
|
||||
return err
|
||||
}
|
||||
|
||||
if !mounted {
|
||||
glog.V(4).Info(log("unmounter.Teardown skipping unmout, dir not mounted [%s]", dir))
|
||||
return nil
|
||||
}
|
||||
|
||||
// load volume info from file
|
||||
dataDir := path.Dir(dir) // dropoff /mount at end
|
||||
data, err := loadVolumeData(dataDir, volDataFileName)
|
||||
if err != nil {
|
||||
glog.Error(log("unmounter.Teardown failed to load volume data file using dir [%s]: %v", dir, err))
|
||||
return err
|
||||
}
|
||||
|
||||
volID := data[volDataKey.volHandle]
|
||||
driverName := data[volDataKey.driverName]
|
||||
|
||||
if c.csiClient == nil {
|
||||
addr := fmt.Sprintf(csiAddrTemplate, driverName)
|
||||
|
@ -183,18 +269,21 @@ func (c *csiMountMgr) TearDownAt(dir string) error {
|
|||
|
||||
// TODO make all assertion calls private within the client itself
|
||||
if err := csi.AssertSupportedVersion(ctx, csiVersion); err != nil {
|
||||
glog.Errorf(log("failed to assert version: %v", err))
|
||||
glog.Errorf(log("mounter.SetUpAt failed to assert version: %v", err))
|
||||
return err
|
||||
}
|
||||
|
||||
err := csi.NodeUnpublishVolume(ctx, volID, dir)
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf(log("Mounter.Setup failed: %v", err))
|
||||
if err := csi.NodeUnpublishVolume(ctx, volID, dir); err != nil {
|
||||
glog.Errorf(log("mounter.SetUpAt failed: %v", err))
|
||||
return err
|
||||
}
|
||||
|
||||
glog.V(4).Infof(log("successfully unmounted %s", dir))
|
||||
// clean mount point dir
|
||||
if err := removeMountDir(c.plugin, dir); err != nil {
|
||||
glog.Error(log("mounter.SetUpAt failed to clean mount dir [%s]: %v", dir, err))
|
||||
return err
|
||||
}
|
||||
glog.V(4).Infof(log("mounte.SetUpAt successfully unmounted dir [%s]", dir))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -221,3 +310,92 @@ func getVolAttribsFromSpec(spec *volume.Spec) (map[string]string, error) {
|
|||
}
|
||||
return attribs, nil
|
||||
}
|
||||
|
||||
// saveVolumeData persists parameter data as json file using the locagion
|
||||
// generated by /var/lib/kubelet/pods/<podID>/volumes/kubernetes.io~csi/<specVolId>/volume_data.json
|
||||
func saveVolumeData(p *csiPlugin, podUID types.UID, specVolID string, data map[string]string) error {
|
||||
dir := getTargetPath(podUID, specVolID, p.host)
|
||||
dataFilePath := path.Join(dir, volDataFileName)
|
||||
|
||||
file, err := os.Create(dataFilePath)
|
||||
if err != nil {
|
||||
glog.Error(log("failed to save volume data file %s: %v", dataFilePath, err))
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
if err := json.NewEncoder(file).Encode(data); err != nil {
|
||||
glog.Error(log("failed to save volume data file %s: %v", dataFilePath, err))
|
||||
return err
|
||||
}
|
||||
glog.V(4).Info(log("volume data file saved successfully [%s]", dataFilePath))
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadVolumeData uses the directory returned by mounter.GetPath with value
|
||||
// /var/lib/kubelet/pods/<podID>/volumes/kubernetes.io~csi/<specVolumeId>/mount.
|
||||
// The function extracts specVolumeID and uses it to load the json data file from dir
|
||||
// /var/lib/kubelet/pods/<podID>/volumes/kubernetes.io~csi/<specVolId>/volume_data.json
|
||||
func loadVolumeData(dir string, fileName string) (map[string]string, error) {
|
||||
// remove /mount at the end
|
||||
dataFileName := path.Join(dir, fileName)
|
||||
glog.V(4).Info(log("loading volume data file [%s]", dataFileName))
|
||||
|
||||
file, err := os.Open(dataFileName)
|
||||
if err != nil {
|
||||
glog.Error(log("failed to open volume data file [%s]: %v", dataFileName, err))
|
||||
return nil, err
|
||||
}
|
||||
defer file.Close()
|
||||
data := map[string]string{}
|
||||
if err := json.NewDecoder(file).Decode(&data); err != nil {
|
||||
glog.Error(log("failed to parse volume data file [%s]: %v", dataFileName, err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// isDirMounted returns the !notMounted result from IsLikelyNotMountPoint check
|
||||
func isDirMounted(plug *csiPlugin, dir string) (bool, error) {
|
||||
mounter := plug.host.GetMounter(plug.GetPluginName())
|
||||
notMnt, err := mounter.IsLikelyNotMountPoint(dir)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
glog.Error(log("isDirMounted IsLikelyNotMountPoint test failed for dir [%v]", dir))
|
||||
return false, err
|
||||
}
|
||||
return !notMnt, nil
|
||||
}
|
||||
|
||||
// removeMountDir cleans the mount dir when dir is not mounted and removed the volume data file in dir
|
||||
func removeMountDir(plug *csiPlugin, mountPath string) error {
|
||||
glog.V(4).Info(log("removing mount path [%s]", mountPath))
|
||||
if pathExists, pathErr := util.PathExists(mountPath); pathErr != nil {
|
||||
glog.Error(log("failed while checking mount path stat [%s]", pathErr))
|
||||
return pathErr
|
||||
} else if !pathExists {
|
||||
glog.Warning(log("skipping mount dir removal, path does not exist [%v]", mountPath))
|
||||
return nil
|
||||
}
|
||||
|
||||
mounter := plug.host.GetMounter(plug.GetPluginName())
|
||||
notMnt, err := mounter.IsLikelyNotMountPoint(mountPath)
|
||||
if err != nil {
|
||||
glog.Error(log("mount dir removal failed [%s]: %v", mountPath, err))
|
||||
return err
|
||||
}
|
||||
if notMnt {
|
||||
glog.V(4).Info(log("dir not mounted, deleting it [%s]", mountPath))
|
||||
if err := os.Remove(mountPath); err != nil && !os.IsNotExist(err) {
|
||||
glog.Error(log("failed to remove dir [%s]: %v", mountPath, err))
|
||||
return err
|
||||
}
|
||||
// remove volume data file as well
|
||||
dataFile := path.Join(path.Dir(mountPath), volDataFileName)
|
||||
glog.V(4).Info(log("also deleting volume info data file [%s]", dataFile))
|
||||
if err := os.Remove(dataFile); err != nil && !os.IsNotExist(err) {
|
||||
glog.Error(log("failed to delete volume data file [%s]: %v", dataFile, err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -17,7 +17,10 @@ limitations under the License.
|
|||
package csi
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
|
@ -43,28 +46,44 @@ func TestMounterGetPath(t *testing.T) {
|
|||
plug, tmpDir := newTestPlugin(t)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
pv := makeTestPV("test-pv", 10, testDriver, testVol)
|
||||
|
||||
mounter, err := plug.NewMounter(
|
||||
volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly),
|
||||
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
|
||||
volume.VolumeOptions{},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to make a new Mounter: %v", err)
|
||||
}
|
||||
csiMounter := mounter.(*csiMountMgr)
|
||||
expectedPath := path.Join(tmpDir, fmt.Sprintf(
|
||||
"pods/%s/volumes/kubernetes.io~csi/%s/%s",
|
||||
testPodUID,
|
||||
csiMounter.driverName,
|
||||
csiMounter.volumeID,
|
||||
))
|
||||
mountPath := csiMounter.GetPath()
|
||||
if mountPath != expectedPath {
|
||||
t.Errorf("Got unexpected path: %s", mountPath)
|
||||
// TODO (vladimirvivien) specName with slashes will not work
|
||||
testCases := []struct {
|
||||
name string
|
||||
specVolumeName string
|
||||
path string
|
||||
}{
|
||||
{
|
||||
name: "simple specName",
|
||||
specVolumeName: "spec-0",
|
||||
path: path.Join(tmpDir, fmt.Sprintf("pods/%s/volumes/kubernetes.io~csi/%s/%s", testPodUID, "spec-0", "/mount")),
|
||||
},
|
||||
{
|
||||
name: "specName with dots",
|
||||
specVolumeName: "test.spec.1",
|
||||
path: path.Join(tmpDir, fmt.Sprintf("pods/%s/volumes/kubernetes.io~csi/%s/%s", testPodUID, "test.spec.1", "/mount")),
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Log("test case:", tc.name)
|
||||
pv := makeTestPV(tc.specVolumeName, 10, testDriver, testVol)
|
||||
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
|
||||
mounter, err := plug.NewMounter(
|
||||
spec,
|
||||
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
|
||||
volume.VolumeOptions{},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to make a new Mounter: %v", err)
|
||||
}
|
||||
csiMounter := mounter.(*csiMountMgr)
|
||||
|
||||
path := csiMounter.GetPath()
|
||||
t.Log("*** GetPath: ", path)
|
||||
|
||||
if tc.path != path {
|
||||
t.Errorf("expecting path %s, got %s", tc.path, path)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMounterSetUp(t *testing.T) {
|
||||
|
@ -125,6 +144,14 @@ func TestMounterSetUp(t *testing.T) {
|
|||
if err := csiMounter.SetUp(nil); err != nil {
|
||||
t.Fatalf("mounter.Setup failed: %v", err)
|
||||
}
|
||||
path := csiMounter.GetPath()
|
||||
if _, err := os.Stat(path); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
t.Errorf("SetUp() failed, volume path not created: %s", path)
|
||||
} else {
|
||||
t.Errorf("SetUp() failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ensure call went all the way
|
||||
pubs := csiMounter.csiClient.(*csiDriverClient).nodeClient.(*fake.NodeClient).GetNodePublishedVolumes()
|
||||
|
@ -149,6 +176,19 @@ func TestUnmounterTeardown(t *testing.T) {
|
|||
|
||||
dir := csiUnmounter.GetPath()
|
||||
|
||||
// save the data file prior to unmount
|
||||
if err := os.MkdirAll(dir, 0755); err != nil && !os.IsNotExist(err) {
|
||||
t.Errorf("failed to create dir [%s]: %v", dir, err)
|
||||
}
|
||||
if err := saveVolumeData(
|
||||
plug,
|
||||
testPodUID,
|
||||
"test-pv",
|
||||
map[string]string{volDataKey.specVolID: "test-pv", volDataKey.driverName: "driver", volDataKey.volHandle: "vol-handle"},
|
||||
); err != nil {
|
||||
t.Fatal("failed to save volume data:", err)
|
||||
}
|
||||
|
||||
err = csiUnmounter.TearDownAt(dir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -208,3 +248,51 @@ func TestGetVolAttribsFromSpec(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveVolumeData(t *testing.T) {
|
||||
plug, tmpDir := newTestPlugin(t)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
testCases := []struct {
|
||||
name string
|
||||
data map[string]string
|
||||
shouldFail bool
|
||||
}{
|
||||
{name: "test with data ok", data: map[string]string{"key0": "val0", "_key1": "val1", "key2": "val2"}},
|
||||
{name: "test with data ok 2 ", data: map[string]string{"_key0_": "val0", "&key1": "val1", "key2": "val2"}},
|
||||
}
|
||||
|
||||
for i, tc := range testCases {
|
||||
t.Log("test case:", tc.name)
|
||||
specVolID := fmt.Sprintf("spec-volid-%d", i)
|
||||
mountDir := path.Join(getTargetPath(testPodUID, specVolID, plug.host), "/mount")
|
||||
if err := os.MkdirAll(mountDir, 0755); err != nil && !os.IsNotExist(err) {
|
||||
t.Errorf("failed to create dir [%s]: %v", mountDir, err)
|
||||
}
|
||||
|
||||
err := saveVolumeData(plug, testPodUID, specVolID, tc.data)
|
||||
|
||||
if !tc.shouldFail && err != nil {
|
||||
t.Error("unexpected failure: ", err)
|
||||
}
|
||||
// did file get created
|
||||
dataDir := getTargetPath(testPodUID, specVolID, plug.host)
|
||||
file := path.Join(dataDir, volDataFileName)
|
||||
if _, err := os.Stat(file); err != nil {
|
||||
t.Error("failed to create data dir:", err)
|
||||
}
|
||||
|
||||
// validate content
|
||||
data, err := ioutil.ReadFile(file)
|
||||
if !tc.shouldFail && err != nil {
|
||||
t.Error("failed to read data file:", err)
|
||||
}
|
||||
|
||||
jsonData := new(bytes.Buffer)
|
||||
if err := json.NewEncoder(jsonData).Encode(tc.data); err != nil {
|
||||
t.Error("failed to encode json:", err)
|
||||
}
|
||||
if string(data) != jsonData.String() {
|
||||
t.Errorf("expecting encoded data %v, got %v", string(data), jsonData)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@ package csi
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"path"
|
||||
"regexp"
|
||||
"time"
|
||||
|
||||
|
@ -29,7 +28,6 @@ import (
|
|||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
kstrings "k8s.io/kubernetes/pkg/util/strings"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
)
|
||||
|
||||
|
@ -44,6 +42,7 @@ const (
|
|||
csiAddrTemplate = "/var/lib/kubelet/plugins/%v/csi.sock"
|
||||
csiTimeout = 15 * time.Second
|
||||
volNameSep = "^"
|
||||
volDataFileName = "vol_data.json"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -134,14 +133,15 @@ func (p *csiPlugin) NewMounter(
|
|||
}
|
||||
|
||||
mounter := &csiMountMgr{
|
||||
plugin: p,
|
||||
k8s: k8s,
|
||||
spec: spec,
|
||||
pod: pod,
|
||||
podUID: pod.UID,
|
||||
driverName: pvSource.Driver,
|
||||
volumeID: pvSource.VolumeHandle,
|
||||
csiClient: client,
|
||||
plugin: p,
|
||||
k8s: k8s,
|
||||
spec: spec,
|
||||
pod: pod,
|
||||
podUID: pod.UID,
|
||||
driverName: pvSource.Driver,
|
||||
volumeID: pvSource.VolumeHandle,
|
||||
specVolumeID: spec.Name(),
|
||||
csiClient: client,
|
||||
}
|
||||
return mounter, nil
|
||||
}
|
||||
|
@ -149,37 +149,33 @@ func (p *csiPlugin) NewMounter(
|
|||
func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmounter, error) {
|
||||
glog.V(4).Infof(log("setting up unmounter for [name=%v, podUID=%v]", specName, podUID))
|
||||
unmounter := &csiMountMgr{
|
||||
plugin: p,
|
||||
podUID: podUID,
|
||||
plugin: p,
|
||||
podUID: podUID,
|
||||
specVolumeID: specName,
|
||||
}
|
||||
return unmounter, nil
|
||||
}
|
||||
|
||||
func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
|
||||
glog.V(4).Infof(log("constructing volume spec [pv.Name=%v, path=%v]", volumeName, mountPath))
|
||||
glog.V(4).Info(log("plugin.ConstructVolumeSpec [pv.Name=%v, path=%v]", volumeName, mountPath))
|
||||
|
||||
// extract driverName/volumeId from end of mountPath
|
||||
dir, volID := path.Split(mountPath)
|
||||
volID = kstrings.UnescapeQualifiedNameForDisk(volID)
|
||||
driverName := path.Base(dir)
|
||||
|
||||
// TODO (vladimirvivien) consider moving this check in API validation
|
||||
if !isDriverNameValid(driverName) {
|
||||
glog.Error(log("failed while reconstructing volume spec csi: driver name extracted from path is invalid: [path=%s; driverName=%s]", mountPath, driverName))
|
||||
return nil, errors.New("invalid csi driver name from path")
|
||||
volData, err := loadVolumeData(mountPath, volDataFileName)
|
||||
if err != nil {
|
||||
glog.Error(log("plugin.ConstructVolumeSpec failed loading volume data using [%s]: %v", mountPath, err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
glog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [volumeID=%s; driverName=%s]", volID, driverName))
|
||||
glog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [%#v]", volData))
|
||||
|
||||
pv := &api.PersistentVolume{
|
||||
ObjectMeta: meta.ObjectMeta{
|
||||
Name: volumeName,
|
||||
Name: volData[volDataKey.specVolID],
|
||||
},
|
||||
Spec: api.PersistentVolumeSpec{
|
||||
PersistentVolumeSource: api.PersistentVolumeSource{
|
||||
CSI: &api.CSIPersistentVolumeSource{
|
||||
Driver: driverName,
|
||||
VolumeHandle: volID,
|
||||
Driver: volData[volDataKey.driverName],
|
||||
VolumeHandle: volData[volDataKey.volHandle],
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
|
@ -19,6 +19,7 @@ package csi
|
|||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
|
||||
api "k8s.io/api/core/v1"
|
||||
|
@ -27,7 +28,6 @@ import (
|
|||
"k8s.io/apimachinery/pkg/types"
|
||||
fakeclient "k8s.io/client-go/kubernetes/fake"
|
||||
utiltesting "k8s.io/client-go/util/testing"
|
||||
kstrings "k8s.io/kubernetes/pkg/util/strings"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
volumetest "k8s.io/kubernetes/pkg/volume/testing"
|
||||
)
|
||||
|
@ -140,17 +140,31 @@ func TestPluginConstructVolumeSpec(t *testing.T) {
|
|||
|
||||
testCases := []struct {
|
||||
name string
|
||||
driverName string
|
||||
volID string
|
||||
specVolID string
|
||||
data map[string]string
|
||||
shouldFail bool
|
||||
}{
|
||||
{"valid driver and vol", "test.csi-driver", "abc-cde", false},
|
||||
{"valid driver + vol with slash", "test.csi-driver", "a/b/c/d", false},
|
||||
{"invalid driver name", "_test.csi.driver>", "a/b/c/d", true},
|
||||
{
|
||||
name: "valid spec name",
|
||||
specVolID: "test.vol.id",
|
||||
data: map[string]string{volDataKey.specVolID: "test.vol.id", volDataKey.volHandle: "test-vol0", volDataKey.driverName: "test-driver0"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
dir := getTargetPath(testPodUID, tc.driverName, tc.volID, plug.host)
|
||||
t.Logf("test case: %s", tc.name)
|
||||
dir := getTargetPath(testPodUID, tc.specVolID, plug.host)
|
||||
|
||||
// create the data file
|
||||
if tc.data != nil {
|
||||
mountDir := path.Join(getTargetPath(testPodUID, tc.specVolID, plug.host), "/mount")
|
||||
if err := os.MkdirAll(mountDir, 0755); err != nil && !os.IsNotExist(err) {
|
||||
t.Errorf("failed to create dir [%s]: %v", mountDir, err)
|
||||
}
|
||||
if err := saveVolumeData(plug, testPodUID, tc.specVolID, tc.data); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// rebuild spec
|
||||
spec, err := plug.ConstructVolumeSpec("test-pv", dir)
|
||||
|
@ -161,13 +175,12 @@ func TestPluginConstructVolumeSpec(t *testing.T) {
|
|||
continue
|
||||
}
|
||||
|
||||
volID := spec.PersistentVolume.Spec.CSI.VolumeHandle
|
||||
unsanitizedVolID := kstrings.UnescapeQualifiedNameForDisk(tc.volID)
|
||||
if volID != unsanitizedVolID {
|
||||
t.Errorf("expected unsanitized volID %s, got volID %s", unsanitizedVolID, volID)
|
||||
volHandle := spec.PersistentVolume.Spec.CSI.VolumeHandle
|
||||
if volHandle != tc.data[volDataKey.volHandle] {
|
||||
t.Errorf("expected volID %s, got volID %s", tc.data[volDataKey.volHandle], volHandle)
|
||||
}
|
||||
|
||||
if spec.Name() != "test-pv" {
|
||||
if spec.Name() != tc.specVolID {
|
||||
t.Errorf("Unexpected spec name %s", spec.Name())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -109,6 +109,17 @@ func (f *NodeClient) NodePublishVolume(ctx grpctx.Context, req *csipb.NodePublis
|
|||
return &csipb.NodePublishVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// NodeProbe implements csi NodeProbe
|
||||
func (f *NodeClient) NodeProbe(ctx context.Context, req *csipb.NodeProbeRequest, opts ...grpc.CallOption) (*csipb.NodeProbeResponse, error) {
|
||||
if f.nextErr != nil {
|
||||
return nil, f.nextErr
|
||||
}
|
||||
if req.Version == nil {
|
||||
return nil, errors.New("missing version")
|
||||
}
|
||||
return &csipb.NodeProbeResponse{}, nil
|
||||
}
|
||||
|
||||
// NodeUnpublishVolume implements csi method
|
||||
func (f *NodeClient) NodeUnpublishVolume(ctx context.Context, req *csipb.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeUnpublishVolumeResponse, error) {
|
||||
if f.nextErr != nil {
|
||||
|
@ -130,11 +141,6 @@ func (f *NodeClient) GetNodeID(ctx context.Context, in *csipb.GetNodeIDRequest,
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
// NodeProbe implements csi method
|
||||
func (f *NodeClient) NodeProbe(ctx context.Context, in *csipb.NodeProbeRequest, opts ...grpc.CallOption) (*csipb.NodeProbeResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// NodeGetCapabilities implements csi method
|
||||
func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.NodeGetCapabilitiesResponse, error) {
|
||||
return nil, nil
|
||||
|
|
Loading…
Reference in New Issue