CSI Inline Volume - Kubelet/Driver code impl

pull/564/head
Vladimir Vivien 2019-02-15 17:56:26 -05:00
parent d998fc8f0f
commit 923ad369c8
9 changed files with 1686 additions and 276 deletions

View File

@ -63,15 +63,15 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string
return "", errors.New("missing spec")
}
csiSource, err := getCSISourceFromSpec(spec)
pvSrc, err := getPVSourceFromSpec(spec)
if err != nil {
klog.Error(log("attacher.Attach failed to get CSI persistent source: %v", err))
klog.Error(log("attacher.Attach failed to get CSIPersistentVolumeSource: %v", err))
return "", err
}
node := string(nodeName)
pvName := spec.PersistentVolume.GetName()
attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, node)
attachID := getAttachmentName(pvSrc.VolumeHandle, pvSrc.Driver, node)
attachment := &storage.VolumeAttachment{
ObjectMeta: meta.ObjectMeta{
@ -79,7 +79,7 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string
},
Spec: storage.VolumeAttachmentSpec{
NodeName: node,
Attacher: csiSource.Driver,
Attacher: pvSrc.Driver,
Source: storage.VolumeAttachmentSource{
PersistentVolumeName: &pvName,
},
@ -97,12 +97,12 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string
}
if alreadyExist {
klog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attachID, csiSource.VolumeHandle))
klog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attachID, pvSrc.VolumeHandle))
} else {
klog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, csiSource.VolumeHandle))
klog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, pvSrc.VolumeHandle))
}
if _, err := c.waitForVolumeAttachment(csiSource.VolumeHandle, attachID, csiTimeout); err != nil {
if _, err := c.waitForVolumeAttachment(pvSrc.VolumeHandle, attachID, csiTimeout); err != nil {
return "", err
}
@ -113,7 +113,7 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string
}
func (c *csiAttacher) WaitForAttach(spec *volume.Spec, _ string, pod *v1.Pod, timeout time.Duration) (string, error) {
source, err := getCSISourceFromSpec(spec)
source, err := getPVSourceFromSpec(spec)
if err != nil {
klog.Error(log("attacher.WaitForAttach failed to extract CSI volume source: %v", err))
return "", err
@ -220,14 +220,18 @@ func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.No
klog.Error(log("attacher.VolumesAreAttached missing volume.Spec"))
return nil, errors.New("missing spec")
}
source, err := getCSISourceFromSpec(spec)
pvSrc, err := getPVSourceFromSpec(spec)
if err != nil {
klog.Error(log("attacher.VolumesAreAttached failed: %v", err))
attached[spec] = false
klog.Error(log("attacher.VolumesAreAttached failed to get CSIPersistentVolumeSource: %v", err))
continue
}
skip, err := c.plugin.skipAttach(source.Driver)
driverName := pvSrc.Driver
volumeHandle := pvSrc.VolumeHandle
skip, err := c.plugin.skipAttach(driverName)
if err != nil {
klog.Error(log("Failed to check CSIDriver for %s: %s", source.Driver, err))
klog.Error(log("Failed to check CSIDriver for %s: %s", driverName, err))
} else {
if skip {
// This volume is not attachable, pretend it's attached
@ -236,7 +240,7 @@ func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.No
}
}
attachID := getAttachmentName(source.VolumeHandle, source.Driver, string(nodeName))
attachID := getAttachmentName(volumeHandle, driverName, string(nodeName))
klog.V(4).Info(log("probing attachment status for VolumeAttachment %v", attachID))
attach, err := c.k8s.StorageV1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
@ -285,9 +289,9 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
if spec == nil {
return fmt.Errorf("attacher.MountDevice failed, spec is nil")
}
csiSource, err := getCSISourceFromSpec(spec)
csiSource, err := getPVSourceFromSpec(spec)
if err != nil {
klog.Error(log("attacher.MountDevice failed to get CSI persistent source: %v", err))
klog.Error(log("attacher.MountDevice failed to get CSIPersistentVolumeSource: %v", err))
return err
}

View File

@ -37,7 +37,6 @@ import (
fakeclient "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
@ -84,16 +83,17 @@ func markVolumeAttached(t *testing.T, client clientset.Interface, watch *watch.R
t.Error(err)
}
if attach != nil {
klog.Infof("stopping wait")
t.Logf("attachment found on try %d, stopping wait...", i)
break
}
}
klog.Infof("stopped wait")
t.Logf("stopped waiting for attachment")
if attach == nil {
t.Logf("attachment not found for id:%v", attachID)
} else {
attach.Status = status
t.Logf("updating attachment %s with attach status %v", attachID, status)
_, err := client.StorageV1().VolumeAttachments().Update(attach)
if err != nil {
t.Error(err)
@ -103,13 +103,13 @@ func markVolumeAttached(t *testing.T, client clientset.Interface, watch *watch.R
}
func TestAttacherAttach(t *testing.T) {
testCases := []struct {
name string
nodeName string
driverName string
volumeName string
attachID string
spec *volume.Spec
injectAttacherError bool
shouldFail bool
}{
@ -119,6 +119,7 @@ func TestAttacherAttach(t *testing.T) {
driverName: "testdriver-01",
volumeName: "testvol-01",
attachID: getAttachmentName("testvol-01", "testdriver-01", "testnode-01"),
spec: volume.NewSpecFromPersistentVolume(makeTestPV("pv01", 10, "testdriver-01", "testvol-01"), false),
},
{
name: "test ok 2",
@ -126,6 +127,7 @@ func TestAttacherAttach(t *testing.T) {
driverName: "driver02",
volumeName: "vol02",
attachID: getAttachmentName("vol02", "driver02", "node02"),
spec: volume.NewSpecFromPersistentVolume(makeTestPV("pv01", 10, "driver02", "vol02"), false),
},
{
name: "mismatch vol",
@ -133,6 +135,7 @@ func TestAttacherAttach(t *testing.T) {
driverName: "driver02",
volumeName: "vol01",
attachID: getAttachmentName("vol02", "driver02", "node02"),
spec: volume.NewSpecFromPersistentVolume(makeTestPV("pv01", 10, "driver02", "vol01"), false),
shouldFail: true,
},
{
@ -141,6 +144,7 @@ func TestAttacherAttach(t *testing.T) {
driverName: "driver000",
volumeName: "vol02",
attachID: getAttachmentName("vol02", "driver02", "node02"),
spec: volume.NewSpecFromPersistentVolume(makeTestPV("pv01", 10, "driver01", "vol02"), false),
shouldFail: true,
},
{
@ -149,6 +153,7 @@ func TestAttacherAttach(t *testing.T) {
driverName: "driver000",
volumeName: "vol02",
attachID: getAttachmentName("vol02", "driver02", "node02"),
spec: volume.NewSpecFromPersistentVolume(makeTestPV("pv01", 10, "driver02", "vol02"), false),
shouldFail: true,
},
{
@ -157,13 +162,31 @@ func TestAttacherAttach(t *testing.T) {
driverName: "driver02",
volumeName: "vol02",
attachID: getAttachmentName("vol02", "driver02", "node02"),
spec: volume.NewSpecFromPersistentVolume(makeTestPV("pv01", 10, "driver02", "vol02"), false),
injectAttacherError: true,
shouldFail: true,
},
{
name: "test with volume source",
nodeName: "node000",
driverName: "driver000",
volumeName: "vol02",
attachID: getAttachmentName("vol02", "driver02", "node02"),
spec: volume.NewSpecFromVolume(makeTestVol("pv01", "driver02")),
shouldFail: true, // csi not enabled
},
{
name: "missing spec",
nodeName: "node000",
driverName: "driver000",
volumeName: "vol02",
attachID: getAttachmentName("vol02", "driver02", "node02"),
shouldFail: true, // csi not enabled
},
}
// attacher loop
for i, tc := range testCases {
for _, tc := range testCases {
t.Logf("test case: %s", tc.name)
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
defer os.RemoveAll(tmpDir)
@ -175,9 +198,7 @@ func TestAttacherAttach(t *testing.T) {
csiAttacher := attacher.(*csiAttacher)
spec := volume.NewSpecFromPersistentVolume(makeTestPV(fmt.Sprintf("test-pv%d", i), 10, tc.driverName, tc.volumeName), false)
go func(id, nodename string, fail bool) {
go func(spec *volume.Spec, id, nodename string, fail bool) {
attachID, err := csiAttacher.Attach(spec, types.NodeName(nodename))
if !fail && err != nil {
t.Errorf("expecting no failure, but got err: %v", err)
@ -188,7 +209,83 @@ func TestAttacherAttach(t *testing.T) {
if attachID != id && !fail {
t.Errorf("expecting attachID %v, got %v", id, attachID)
}
}(tc.attachID, tc.nodeName, tc.shouldFail)
}(tc.spec, tc.attachID, tc.nodeName, tc.shouldFail)
var status storage.VolumeAttachmentStatus
if tc.injectAttacherError {
status.Attached = false
status.AttachError = &storage.VolumeError{
Message: "attacher error",
}
} else {
status.Attached = true
}
markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, tc.attachID, status)
}
}
func TestAttacherAttachWithInline(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)()
testCases := []struct {
name string
nodeName string
driverName string
volumeName string
attachID string
spec *volume.Spec
injectAttacherError bool
shouldFail bool
}{
{
name: "test ok 1 with PV",
nodeName: "node01",
attachID: getAttachmentName("vol01", "driver01", "node01"),
spec: volume.NewSpecFromPersistentVolume(makeTestPV("pv01", 10, "driver01", "vol01"), false),
},
{
name: "test failure, attach with volSrc",
nodeName: "node01",
attachID: getAttachmentName("vol01", "driver01", "node01"),
spec: volume.NewSpecFromVolume(makeTestVol("vol01", "driver01")),
shouldFail: true,
},
{
name: "attacher error",
nodeName: "node02",
attachID: getAttachmentName("vol02", "driver02", "node02"),
spec: volume.NewSpecFromPersistentVolume(makeTestPV("pv02", 10, "driver02", "vol02"), false),
injectAttacherError: true,
shouldFail: true,
},
{
name: "missing spec",
nodeName: "node02",
attachID: getAttachmentName("vol02", "driver02", "node02"),
shouldFail: true,
},
}
// attacher loop
for _, tc := range testCases {
t.Logf("test case: %s", tc.name)
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
go func(spec *volume.Spec, id, nodename string, fail bool) {
attachID, err := csiAttacher.Attach(spec, types.NodeName(nodename))
if fail != (err != nil) {
t.Errorf("expecting no failure, but got err: %v", err)
}
if attachID != id && !fail {
t.Errorf("expecting attachID %v, got %v", id, attachID)
}
}(tc.spec, tc.attachID, tc.nodeName, tc.shouldFail)
var status storage.VolumeAttachmentStatus
if tc.injectAttacherError {
@ -260,23 +357,25 @@ func TestAttacherWithCSIDriver(t *testing.T) {
return
}
expectedAttachID := getAttachmentName("test-vol", test.driver, "node")
status := storage.VolumeAttachmentStatus{
Attached: true,
}
if test.expectVolumeAttachment {
go markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, expectedAttachID, status)
}
go func(volSpec *volume.Spec, expectAttach bool) {
attachID, err := csiAttacher.Attach(volSpec, types.NodeName("node"))
if err != nil {
t.Errorf("Attach() failed: %s", err)
}
if expectAttach && attachID == "" {
t.Errorf("Expected attachID, got nothing")
}
if !expectAttach && attachID != "" {
t.Errorf("Expected empty attachID, got %q", attachID)
}
}(spec, test.expectVolumeAttachment)
attachID, err := csiAttacher.Attach(spec, types.NodeName("node"))
if err != nil {
t.Errorf("Attach() failed: %s", err)
}
if test.expectVolumeAttachment && attachID == "" {
t.Errorf("Expected attachID, got nothing")
}
if !test.expectVolumeAttachment && attachID != "" {
t.Errorf("Expected empty attachID, got %q", attachID)
if test.expectVolumeAttachment {
expectedAttachID := getAttachmentName("test-vol", test.driver, "node")
status := storage.VolumeAttachmentStatus{
Attached: true,
}
markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, expectedAttachID, status)
}
})
}
@ -354,6 +453,7 @@ func TestAttacherWaitForAttach(t *testing.T) {
name string
driver string
makeAttachment func() *storage.VolumeAttachment
spec *volume.Spec
expectedAttachID string
expectError bool
}{
@ -367,9 +467,22 @@ func TestAttacherWaitForAttach(t *testing.T) {
successfulAttachment.Status.Attached = true
return successfulAttachment
},
spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "attachable", "test-vol"), false),
expectedAttachID: getAttachmentName("test-vol", "attachable", "node"),
expectError: false,
},
{
name: "failed attach with vol source",
makeAttachment: func() *storage.VolumeAttachment {
testAttachID := getAttachmentName("test-vol", "attachable", "node")
successfulAttachment := makeTestAttachment(testAttachID, "node", "volSrc01")
successfulAttachment.Status.Attached = true
return successfulAttachment
},
spec: volume.NewSpecFromVolume(makeTestVol("volSrc01", "attachable")),
expectError: true,
},
{
name: "failed attach",
driver: "attachable",
@ -387,7 +500,6 @@ func TestAttacherWaitForAttach(t *testing.T) {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, test.driver, "test-vol"), false)
if test.makeAttachment != nil {
attachment := test.makeAttachment()
@ -402,7 +514,7 @@ func TestAttacherWaitForAttach(t *testing.T) {
t.Logf("created test VolumeAttachment %+v", gotAttachment)
}
attachID, err := csiAttacher.WaitForAttach(spec, "", nil, time.Second)
attachID, err := csiAttacher.WaitForAttach(test.spec, "", nil, time.Second)
if err != nil && !test.expectError {
t.Errorf("Unexpected error: %s", err)
}
@ -416,6 +528,86 @@ func TestAttacherWaitForAttach(t *testing.T) {
}
}
func TestAttacherWaitForAttachWithInline(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)()
tests := []struct {
name string
driver string
makeAttachment func() *storage.VolumeAttachment
spec *volume.Spec
expectedAttachID string
expectError bool
}{
{
name: "successful attach with PV",
makeAttachment: func() *storage.VolumeAttachment {
testAttachID := getAttachmentName("test-vol", "attachable", "node")
successfulAttachment := makeTestAttachment(testAttachID, "node", "test-pv")
successfulAttachment.Status.Attached = true
return successfulAttachment
},
spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "attachable", "test-vol"), false),
expectedAttachID: getAttachmentName("test-vol", "attachable", "node"),
expectError: false,
},
{
name: "failed attach with volSrc",
makeAttachment: func() *storage.VolumeAttachment {
testAttachID := getAttachmentName("test-vol", "attachable", "node")
successfulAttachment := makeTestAttachment(testAttachID, "node", "volSrc01")
successfulAttachment.Status.Attached = true
return successfulAttachment
},
spec: volume.NewSpecFromVolume(makeTestVol("volSrc01", "attachable")),
expectError: true,
},
{
name: "failed attach",
driver: "non-attachable",
spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "non-attachable", "test-vol"), false),
expectError: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
if test.makeAttachment != nil {
attachment := test.makeAttachment()
_, err = csiAttacher.k8s.StorageV1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to create VolumeAttachment: %v", err)
}
gotAttachment, err := csiAttacher.k8s.StorageV1().VolumeAttachments().Get(attachment.Name, meta.GetOptions{})
if err != nil {
t.Fatalf("failed to get created VolumeAttachment: %v", err)
}
t.Logf("created test VolumeAttachment %+v", gotAttachment)
}
attachID, err := csiAttacher.WaitForAttach(test.spec, "", nil, time.Second)
if test.expectError != (err != nil) {
t.Errorf("Unexpected error: %s", err)
return
}
if attachID != test.expectedAttachID {
t.Errorf("Expected attachID %q, got %q", test.expectedAttachID, attachID)
}
})
}
}
func TestAttacherWaitForVolumeAttachment(t *testing.T) {
nodeName := "test-node"
testCases := []struct {
@ -518,60 +710,165 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) {
}
func TestAttacherVolumesAreAttached(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
type attachedSpec struct {
volName string
spec *volume.Spec
attached bool
}
csiAttacher := attacher.(*csiAttacher)
nodeName := "test-node"
testCases := []struct {
name string
attachedStats map[string]bool
attachedSpecs []attachedSpec
}{
{"attach + detach", map[string]bool{"vol-01": true, "vol-02": true, "vol-03": false, "vol-04": false, "vol-05": true}},
{"all detached", map[string]bool{"vol-11": false, "vol-12": false, "vol-13": false, "vol-14": false, "vol-15": false}},
{"all attached", map[string]bool{"vol-21": true, "vol-22": true, "vol-23": true, "vol-24": true, "vol-25": true}},
{
"attach and detach",
[]attachedSpec{
{"vol0", volume.NewSpecFromPersistentVolume(makeTestPV("pv0", 10, testDriver, "vol0"), false), true},
{"vol1", volume.NewSpecFromPersistentVolume(makeTestPV("pv1", 20, testDriver, "vol1"), false), true},
{"vol2", volume.NewSpecFromPersistentVolume(makeTestPV("pv2", 10, testDriver, "vol2"), false), false},
{"vol3", volume.NewSpecFromPersistentVolume(makeTestPV("pv3", 10, testDriver, "vol3"), false), false},
{"vol4", volume.NewSpecFromPersistentVolume(makeTestPV("pv4", 20, testDriver, "vol4"), false), true},
},
},
{
"all detached",
[]attachedSpec{
{"vol0", volume.NewSpecFromPersistentVolume(makeTestPV("pv0", 10, testDriver, "vol0"), false), false},
{"vol1", volume.NewSpecFromPersistentVolume(makeTestPV("pv1", 20, testDriver, "vol1"), false), false},
{"vol2", volume.NewSpecFromPersistentVolume(makeTestPV("pv2", 10, testDriver, "vol2"), false), false},
},
},
{
"all attached",
[]attachedSpec{
{"vol0", volume.NewSpecFromPersistentVolume(makeTestPV("pv0", 10, testDriver, "vol0"), false), true},
{"vol1", volume.NewSpecFromPersistentVolume(makeTestPV("pv1", 20, testDriver, "vol1"), false), true},
},
},
{
"include non-attable",
[]attachedSpec{
{"vol0", volume.NewSpecFromPersistentVolume(makeTestPV("pv0", 10, testDriver, "vol0"), false), true},
{"vol1", volume.NewSpecFromVolume(makeTestVol("pv1", testDriver)), false},
},
},
}
for _, tc := range testCases {
var specs []*volume.Spec
// create and save volume attchments
for volName, stat := range tc.attachedStats {
pv := makeTestPV("test-pv", 10, testDriver, volName)
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
specs = append(specs, spec)
attachID := getAttachmentName(volName, testDriver, nodeName)
attachment := makeTestAttachment(attachID, nodeName, pv.GetName())
attachment.Status.Attached = stat
_, err := csiAttacher.k8s.StorageV1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to attach: %v", err)
}
}
t.Run(tc.name, func(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
// retrieve attached status
stats, err := csiAttacher.VolumesAreAttached(specs, types.NodeName(nodeName))
if err != nil {
t.Fatal(err)
}
if len(tc.attachedStats) != len(stats) {
t.Errorf("expecting %d attachment status, got %d", len(tc.attachedStats), len(stats))
}
// compare attachment status for each spec
for spec, stat := range stats {
source, err := getCSISourceFromSpec(spec)
attacher, err := plug.NewAttacher()
if err != nil {
t.Error(err)
t.Fatalf("failed to create new attacher: %v", err)
}
if stat != tc.attachedStats[source.VolumeHandle] {
t.Errorf("expecting volume attachment %t, got %t", tc.attachedStats[source.VolumeHandle], stat)
csiAttacher := attacher.(*csiAttacher)
nodeName := "test-node"
var specs []*volume.Spec
// create and save volume attchments
for _, attachedSpec := range tc.attachedSpecs {
specs = append(specs, attachedSpec.spec)
attachID := getAttachmentName(attachedSpec.volName, testDriver, nodeName)
attachment := makeTestAttachment(attachID, nodeName, attachedSpec.spec.Name())
attachment.Status.Attached = attachedSpec.attached
_, err := csiAttacher.k8s.StorageV1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to attach: %v", err)
}
}
}
// retrieve attached status
stats, err := csiAttacher.VolumesAreAttached(specs, types.NodeName(nodeName))
if err != nil {
t.Fatal(err)
}
if len(tc.attachedSpecs) != len(stats) {
t.Errorf("expecting %d attachment status, got %d", len(tc.attachedSpecs), len(stats))
}
// compare attachment status for each spec
for _, attached := range tc.attachedSpecs {
stat, ok := stats[attached.spec]
if attached.attached && !ok {
t.Error("failed to retrieve attached status for:", attached.spec)
}
if attached.attached != stat {
t.Errorf("expecting volume attachment %t, got %t", attached.attached, stat)
}
}
})
}
}
func TestAttacherVolumesAreAttachedWithInline(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)()
type attachedSpec struct {
volName string
spec *volume.Spec
attached bool
}
testCases := []struct {
name string
attachedSpecs []attachedSpec
}{
{
"attach and detach with volume sources",
[]attachedSpec{
{"vol0", volume.NewSpecFromPersistentVolume(makeTestPV("pv0", 10, testDriver, "vol0"), false), true},
{"vol1", volume.NewSpecFromVolume(makeTestVol("pv1", testDriver)), false},
{"vol2", volume.NewSpecFromPersistentVolume(makeTestPV("pv2", 10, testDriver, "vol2"), false), true},
{"vol3", volume.NewSpecFromVolume(makeTestVol("pv3", testDriver)), false},
{"vol4", volume.NewSpecFromPersistentVolume(makeTestPV("pv4", 20, testDriver, "vol4"), false), true},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
nodeName := "test-node"
var specs []*volume.Spec
// create and save volume attchments
for _, attachedSpec := range tc.attachedSpecs {
specs = append(specs, attachedSpec.spec)
attachID := getAttachmentName(attachedSpec.volName, testDriver, nodeName)
attachment := makeTestAttachment(attachID, nodeName, attachedSpec.spec.Name())
attachment.Status.Attached = attachedSpec.attached
_, err := csiAttacher.k8s.StorageV1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to attach: %v", err)
}
}
// retrieve attached status
stats, err := csiAttacher.VolumesAreAttached(specs, types.NodeName(nodeName))
if err != nil {
t.Fatal(err)
}
if len(tc.attachedSpecs) != len(stats) {
t.Errorf("expecting %d attachment status, got %d", len(tc.attachedSpecs), len(stats))
}
// compare attachment status for each spec
for _, attached := range tc.attachedSpecs {
stat, ok := stats[attached.spec]
if attached.attached && !ok {
t.Error("failed to retrieve attached status for:", attached.spec)
}
if attached.attached != stat {
t.Errorf("expecting volume attachment %t, got %t", attached.attached, stat)
}
}
})
}
}
@ -708,6 +1005,7 @@ func TestAttacherGetDeviceMountPath(t *testing.T) {
}
func TestAttacherMountDevice(t *testing.T) {
pvName := "test-pv"
testCases := []struct {
testName string
volName string
@ -715,13 +1013,15 @@ func TestAttacherMountDevice(t *testing.T) {
deviceMountPath string
stageUnstageSet bool
shouldFail bool
spec *volume.Spec
}{
{
testName: "normal",
testName: "normal PV",
volName: "test-vol1",
devicePath: "path1",
deviceMountPath: "path2",
stageUnstageSet: true,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false),
},
{
testName: "no vol name",
@ -730,6 +1030,7 @@ func TestAttacherMountDevice(t *testing.T) {
deviceMountPath: "path2",
stageUnstageSet: true,
shouldFail: true,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, ""), false),
},
{
testName: "no device path",
@ -738,6 +1039,7 @@ func TestAttacherMountDevice(t *testing.T) {
deviceMountPath: "path2",
stageUnstageSet: true,
shouldFail: false,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false),
},
{
testName: "no device mount path",
@ -746,6 +1048,7 @@ func TestAttacherMountDevice(t *testing.T) {
deviceMountPath: "",
stageUnstageSet: true,
shouldFail: true,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false),
},
{
testName: "stage_unstage cap not set",
@ -753,13 +1056,20 @@ func TestAttacherMountDevice(t *testing.T) {
devicePath: "path1",
deviceMountPath: "path2",
stageUnstageSet: false,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false),
},
{
testName: "failure with volume source",
volName: "test-vol1",
devicePath: "path1",
deviceMountPath: "path2",
shouldFail: true,
spec: volume.NewSpecFromVolume(makeTestVol(pvName, testDriver)),
},
}
for _, tc := range testCases {
t.Logf("Running test case: %s", tc.testName)
var spec *volume.Spec
pvName := "test-pv"
// Setup
// Create a new attacher
@ -777,11 +1087,6 @@ func TestAttacherMountDevice(t *testing.T) {
}
nodeName := string(csiAttacher.plugin.host.GetNodeName())
// Create spec
pv := makeTestPV(pvName, 10, testDriver, tc.volName)
spec = volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
attachID := getAttachmentName(tc.volName, testDriver, nodeName)
// Set up volume attachment
@ -795,7 +1100,147 @@ func TestAttacherMountDevice(t *testing.T) {
}()
// Run
err = csiAttacher.MountDevice(spec, tc.devicePath, tc.deviceMountPath)
err = csiAttacher.MountDevice(tc.spec, tc.devicePath, tc.deviceMountPath)
// Verify
if err != nil {
if !tc.shouldFail {
t.Errorf("test should not fail, but error occurred: %v", err)
}
continue
}
if err == nil && tc.shouldFail {
t.Errorf("test should fail, but no error occurred")
}
// Verify call goes through all the way
numStaged := 1
if !tc.stageUnstageSet {
numStaged = 0
}
cdc := csiAttacher.csiClient.(*fakeCsiDriverClient)
staged := cdc.nodeClient.GetNodeStagedVolumes()
if len(staged) != numStaged {
t.Errorf("got wrong number of staged volumes, expecting %v got: %v", numStaged, len(staged))
}
if tc.stageUnstageSet {
vol, ok := staged[tc.volName]
if !ok {
t.Errorf("could not find staged volume: %s", tc.volName)
}
if vol.Path != tc.deviceMountPath {
t.Errorf("expected mount path: %s. got: %s", tc.deviceMountPath, vol.Path)
}
}
}
}
func TestAttacherMountDeviceWithInline(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)()
pvName := "test-pv"
testCases := []struct {
testName string
volName string
devicePath string
deviceMountPath string
stageUnstageSet bool
shouldFail bool
spec *volume.Spec
}{
{
testName: "normal PV",
volName: "test-vol1",
devicePath: "path1",
deviceMountPath: "path2",
stageUnstageSet: true,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false),
},
{
testName: "failure with volSrc",
volName: "test-vol1",
devicePath: "path1",
deviceMountPath: "path2",
shouldFail: true,
spec: volume.NewSpecFromVolume(makeTestVol(pvName, testDriver)),
},
{
testName: "no vol name",
volName: "",
devicePath: "path1",
deviceMountPath: "path2",
stageUnstageSet: true,
shouldFail: true,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, ""), false),
},
{
testName: "no device path",
volName: "test-vol1",
devicePath: "",
deviceMountPath: "path2",
stageUnstageSet: true,
shouldFail: false,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false),
},
{
testName: "no device mount path",
volName: "test-vol1",
devicePath: "path1",
deviceMountPath: "",
stageUnstageSet: true,
shouldFail: true,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false),
},
{
testName: "stage_unstage cap not set",
volName: "test-vol1",
devicePath: "path1",
deviceMountPath: "path2",
stageUnstageSet: false,
spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false),
},
{
testName: "missing spec",
volName: "test-vol1",
devicePath: "path1",
deviceMountPath: "path2",
shouldFail: true,
},
}
for _, tc := range testCases {
t.Logf("Running test case: %s", tc.testName)
// Setup
// Create a new attacher
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
defer os.RemoveAll(tmpDir)
attacher, err0 := plug.NewAttacher()
if err0 != nil {
t.Fatalf("failed to create new attacher: %v", err0)
}
csiAttacher := attacher.(*csiAttacher)
csiAttacher.csiClient = setupClient(t, tc.stageUnstageSet)
if tc.deviceMountPath != "" {
tc.deviceMountPath = filepath.Join(tmpDir, tc.deviceMountPath)
}
nodeName := string(csiAttacher.plugin.host.GetNodeName())
attachID := getAttachmentName(tc.volName, testDriver, nodeName)
// Set up volume attachment
attachment := makeTestAttachment(attachID, nodeName, pvName)
_, err := csiAttacher.k8s.StorageV1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to attach: %v", err)
}
go func() {
fakeWatcher.Delete(attachment)
}()
// Run
err = csiAttacher.MountDevice(tc.spec, tc.devicePath, tc.deviceMountPath)
// Verify
if err != nil {

View File

@ -76,12 +76,13 @@ func (c *fakeCsiDriverClient) NodePublishVolume(
) error {
c.t.Log("calling fake.NodePublishVolume...")
req := &csipbv1.NodePublishVolumeRequest{
VolumeId: volID,
TargetPath: targetPath,
Readonly: readOnly,
PublishContext: publishContext,
VolumeContext: volumeContext,
Secrets: secrets,
VolumeId: volID,
TargetPath: targetPath,
StagingTargetPath: stagingTargetPath,
Readonly: readOnly,
PublishContext: publishContext,
VolumeContext: volumeContext,
Secrets: secrets,
VolumeCapability: &csipbv1.VolumeCapability{
AccessMode: &csipbv1.VolumeCapability_AccessMode{
Mode: asCSIAccessModeV1(accessMode),

View File

@ -18,6 +18,7 @@ package csi
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"os"
@ -42,13 +43,15 @@ var (
volHandle,
driverName,
nodeName,
attachmentID string
attachmentID,
driverMode string
}{
"specVolID",
"volumeHandle",
"driverName",
"nodeName",
"attachmentID",
"driverMode",
}
)
@ -57,6 +60,7 @@ type csiMountMgr struct {
k8s kubernetes.Interface
plugin *csiPlugin
driverName csiDriverName
driverMode driverMode
volumeID string
specVolumeID string
readOnly bool
@ -107,49 +111,97 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
return nil
}
csiSource, err := getCSISourceFromSpec(c.spec)
csi := c.csiClient
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
volSrc, pvSrc, err := getSourceFromSpec(c.spec)
if err != nil {
klog.Error(log("mounter.SetupAt failed to get CSI persistent source: %v", err))
return err
}
csi := c.csiClient
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
driverName := c.driverName
volumeHandle := c.volumeID
readOnly := c.readOnly
accessMode := api.ReadWriteOnce
// Check for STAGE_UNSTAGE_VOLUME set and populate deviceMountPath if so
deviceMountPath := ""
stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
if err != nil {
klog.Error(log("mounter.SetUpAt failed to check for STAGE_UNSTAGE_VOLUME capabilty: %v", err))
return err
}
var (
fsType string
volAttribs map[string]string
nodePublishSecrets map[string]string
publishContext map[string]string
mountOptions []string
deviceMountPath string
secretRef *api.SecretReference
)
if stageUnstageSet {
deviceMountPath, err = makeDeviceMountPath(c.plugin, c.spec)
switch {
case volSrc != nil:
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
return fmt.Errorf("CSIInlineVolume feature required")
}
if c.driverMode != ephemeralDriverMode {
return fmt.Errorf("unexpected driver mode: %s", c.driverMode)
}
if volSrc.FSType != nil {
fsType = *volSrc.FSType
}
volAttribs = volSrc.VolumeAttributes
if volSrc.NodePublishSecretRef != nil {
secretName := volSrc.NodePublishSecretRef.Name
ns := c.pod.Namespace
secretRef = &api.SecretReference{Name: secretName, Namespace: ns}
}
case pvSrc != nil:
if c.driverMode != persistentDriverMode {
return fmt.Errorf("unexpected driver mode: %s", c.driverMode)
}
fsType = pvSrc.FSType
volAttribs = pvSrc.VolumeAttributes
if pvSrc.NodePublishSecretRef != nil {
secretRef = pvSrc.NodePublishSecretRef
}
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
if c.spec.PersistentVolume.Spec.AccessModes != nil {
accessMode = c.spec.PersistentVolume.Spec.AccessModes[0]
}
mountOptions = c.spec.PersistentVolume.Spec.MountOptions
// Check for STAGE_UNSTAGE_VOLUME set and populate deviceMountPath if so
stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
if err != nil {
klog.Error(log("mounter.SetUpAt failed to make device mount path: %v", err))
klog.Error(log("mounter.SetUpAt failed to check for STAGE_UNSTAGE_VOLUME capabilty: %v", err))
return err
}
}
// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
if c.publishContext == nil {
nodeName := string(c.plugin.host.GetNodeName())
c.publishContext, err = c.plugin.getPublishContext(c.k8s, c.volumeID, string(c.driverName), nodeName)
if err != nil {
return err
}
}
attribs := csiSource.VolumeAttributes
nodePublishSecrets := map[string]string{}
if csiSource.NodePublishSecretRef != nil {
nodePublishSecrets, err = getCredentialsFromSecret(c.k8s, csiSource.NodePublishSecretRef)
if err != nil {
return fmt.Errorf("fetching NodePublishSecretRef %s/%s failed: %v",
csiSource.NodePublishSecretRef.Namespace, csiSource.NodePublishSecretRef.Name, err)
if stageUnstageSet {
deviceMountPath, err = makeDeviceMountPath(c.plugin, c.spec)
if err != nil {
klog.Error(log("mounter.SetUpAt failed to make device mount path: %v", err))
return err
}
}
// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
if c.publishContext == nil {
nodeName := string(c.plugin.host.GetNodeName())
c.publishContext, err = c.plugin.getPublishContext(c.k8s, volumeHandle, string(driverName), nodeName)
if err != nil {
return err
}
publishContext = c.publishContext
}
default:
return fmt.Errorf("volume source not found in volume.Spec")
}
// create target_dir before call to NodePublish
@ -159,10 +211,14 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
}
klog.V(4).Info(log("created target path successfully [%s]", dir))
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
accessMode := api.ReadWriteOnce
if c.spec.PersistentVolume.Spec.AccessModes != nil {
accessMode = c.spec.PersistentVolume.Spec.AccessModes[0]
nodePublishSecrets = map[string]string{}
if secretRef != nil {
nodePublishSecrets, err = getCredentialsFromSecret(c.k8s, secretRef)
if err != nil {
return fmt.Errorf("fetching NodePublishSecretRef %s/%s failed: %v",
secretRef.Namespace, secretRef.Name, err)
}
}
// Inject pod information into volume_attributes
@ -172,28 +228,27 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
return err
}
if podAttrs != nil {
if attribs == nil {
attribs = podAttrs
if volAttribs == nil {
volAttribs = podAttrs
} else {
for k, v := range podAttrs {
attribs[k] = v
volAttribs[k] = v
}
}
}
fsType := csiSource.FSType
err = csi.NodePublishVolume(
ctx,
c.volumeID,
c.readOnly,
volumeHandle,
readOnly,
deviceMountPath,
dir,
accessMode,
c.publishContext,
attribs,
publishContext,
volAttribs,
nodePublishSecrets,
fsType,
c.spec.PersistentVolume.Spec.MountOptions,
mountOptions,
)
if err != nil {
@ -387,3 +442,9 @@ func removeMountDir(plug *csiPlugin, mountPath string) error {
}
return nil
}
// makeVolumeHandle returns csi-<sha256(podUID,volSourceSpecName)>
func makeVolumeHandle(podUID, volSourceSpecName string) string {
result := sha256.Sum256([]byte(fmt.Sprintf("%s%s", podUID, volSourceSpecName)))
return fmt.Sprintf("csi-%x", result)
}

View File

@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path"
"testing"
@ -266,6 +267,266 @@ func TestMounterSetUp(t *testing.T) {
MounterSetUpTests(t, false)
})
}
func TestMounterSetUpSimple(t *testing.T) {
fakeClient := fakeclient.NewSimpleClientset()
plug, tmpDir := newTestPlugin(t, fakeClient)
defer os.RemoveAll(tmpDir)
testCases := []struct {
name string
podUID types.UID
mode driverMode
fsType string
options []string
spec func(string, []string) *volume.Spec
shouldFail bool
}{
{
name: "setup with vol source",
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
mode: ephemeralDriverMode,
fsType: "ext4",
shouldFail: true,
spec: func(fsType string, options []string) *volume.Spec {
volSrc := makeTestVol("pv1", testDriver)
volSrc.CSI.FSType = &fsType
return volume.NewSpecFromVolume(volSrc)
},
},
{
name: "setup with persistent source",
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
mode: persistentDriverMode,
fsType: "zfs",
spec: func(fsType string, options []string) *volume.Spec {
pvSrc := makeTestPV("pv1", 20, testDriver, "vol1")
pvSrc.Spec.CSI.FSType = fsType
pvSrc.Spec.MountOptions = options
return volume.NewSpecFromPersistentVolume(pvSrc, false)
},
},
{
name: "setup with persistent source without unspecified fstype and options",
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
mode: persistentDriverMode,
spec: func(fsType string, options []string) *volume.Spec {
return volume.NewSpecFromPersistentVolume(makeTestPV("pv1", 20, testDriver, "vol2"), false)
},
},
{
name: "setup with missing spec",
shouldFail: true,
spec: func(fsType string, options []string) *volume.Spec { return nil },
},
}
for _, tc := range testCases {
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
t.Run(tc.name, func(t *testing.T) {
mounter, err := plug.NewMounter(
tc.spec(tc.fsType, tc.options),
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: tc.podUID, Namespace: testns}},
volume.VolumeOptions{},
)
if tc.shouldFail && err != nil {
t.Log(err)
return
}
if !tc.shouldFail && err != nil {
t.Fatal("unexpected error:", err)
}
if mounter == nil {
t.Fatal("failed to create CSI mounter")
}
csiMounter := mounter.(*csiMountMgr)
csiMounter.csiClient = setupClient(t, true)
if csiMounter.driverMode != persistentDriverMode {
t.Fatal("unexpected driver mode: ", csiMounter.driverMode)
}
attachID := getAttachmentName(csiMounter.volumeID, string(csiMounter.driverName), string(plug.host.GetNodeName()))
attachment := makeTestAttachment(attachID, "test-node", csiMounter.spec.Name())
_, err = csiMounter.k8s.StorageV1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to setup VolumeAttachment: %v", err)
}
// Mounter.SetUp()
if err := csiMounter.SetUp(nil); err != nil {
t.Fatalf("mounter.Setup failed: %v", err)
}
// ensure call went all the way
pubs := csiMounter.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
vol, ok := pubs[csiMounter.volumeID]
if !ok {
t.Error("csi server may not have received NodePublishVolume call")
}
if vol.VolumeHandle != csiMounter.volumeID {
t.Error("volumeHandle not sent to CSI driver properly")
}
devicePath, err := makeDeviceMountPath(plug, csiMounter.spec)
if err != nil {
t.Fatal(err)
}
if vol.DeviceMountPath != devicePath {
t.Errorf("DeviceMountPath not sent properly to CSI driver: %s, %s", vol.DeviceMountPath, devicePath)
}
if !reflect.DeepEqual(vol.MountFlags, csiMounter.spec.PersistentVolume.Spec.MountOptions) {
t.Errorf("unexpected mount flags passed to driver: %+v", vol.MountFlags)
}
if vol.FSType != tc.fsType {
t.Error("unexpected FSType sent to driver:", vol.FSType)
}
if vol.Path != csiMounter.GetPath() {
t.Error("csi server may not have received NodePublishVolume call")
}
})
}
}
func TestMounterSetUpWithInline(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)()
fakeClient := fakeclient.NewSimpleClientset()
plug, tmpDir := newTestPlugin(t, fakeClient)
defer os.RemoveAll(tmpDir)
testCases := []struct {
name string
podUID types.UID
mode driverMode
fsType string
options []string
spec func(string, []string) *volume.Spec
shouldFail bool
}{
{
name: "setup with vol source",
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
mode: ephemeralDriverMode,
fsType: "ext4",
spec: func(fsType string, options []string) *volume.Spec {
volSrc := makeTestVol("pv1", testDriver)
volSrc.CSI.FSType = &fsType
return volume.NewSpecFromVolume(volSrc)
},
},
{
name: "setup with persistent source",
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
mode: persistentDriverMode,
fsType: "zfs",
spec: func(fsType string, options []string) *volume.Spec {
pvSrc := makeTestPV("pv1", 20, testDriver, "vol1")
pvSrc.Spec.CSI.FSType = fsType
pvSrc.Spec.MountOptions = options
return volume.NewSpecFromPersistentVolume(pvSrc, false)
},
},
{
name: "setup with persistent source without unspecified fstype and options",
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
mode: persistentDriverMode,
spec: func(fsType string, options []string) *volume.Spec {
return volume.NewSpecFromPersistentVolume(makeTestPV("pv1", 20, testDriver, "vol2"), false)
},
},
{
name: "setup with missing spec",
shouldFail: true,
spec: func(fsType string, options []string) *volume.Spec { return nil },
},
}
for _, tc := range testCases {
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
t.Run(tc.name, func(t *testing.T) {
mounter, err := plug.NewMounter(
tc.spec(tc.fsType, tc.options),
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: tc.podUID, Namespace: testns}},
volume.VolumeOptions{},
)
if tc.shouldFail && err != nil {
t.Log(err)
return
}
if !tc.shouldFail && err != nil {
t.Fatal("unexpected error:", err)
}
if mounter == nil {
t.Fatal("failed to create CSI mounter")
}
csiMounter := mounter.(*csiMountMgr)
csiMounter.csiClient = setupClient(t, true)
if csiMounter.driverMode != tc.mode {
t.Fatal("unexpected driver mode: ", csiMounter.driverMode)
}
if csiMounter.driverMode == ephemeralDriverMode && csiMounter.volumeID != makeVolumeHandle(string(tc.podUID), csiMounter.specVolumeID) {
t.Fatal("unexpected generated volumeHandle:", csiMounter.volumeID)
}
if csiMounter.driverMode == persistentDriverMode {
attachID := getAttachmentName(csiMounter.volumeID, string(csiMounter.driverName), string(plug.host.GetNodeName()))
attachment := makeTestAttachment(attachID, "test-node", csiMounter.spec.Name())
_, err = csiMounter.k8s.StorageV1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to setup VolumeAttachment: %v", err)
}
}
// Mounter.SetUp()
if err := csiMounter.SetUp(nil); err != nil {
t.Fatalf("mounter.Setup failed: %v", err)
}
// ensure call went all the way
pubs := csiMounter.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
vol, ok := pubs[csiMounter.volumeID]
if !ok {
t.Error("csi server may not have received NodePublishVolume call")
}
if vol.VolumeHandle != csiMounter.volumeID {
t.Error("volumeHandle not sent to CSI driver properly")
}
// validate stagingTargetPath
if tc.mode == ephemeralDriverMode && vol.DeviceMountPath != "" {
t.Errorf("unexpected devicePathTarget sent to driver: %s", vol.DeviceMountPath)
}
if tc.mode == persistentDriverMode {
devicePath, err := makeDeviceMountPath(plug, csiMounter.spec)
if err != nil {
t.Fatal(err)
}
if vol.DeviceMountPath != devicePath {
t.Errorf("DeviceMountPath not sent properly to CSI driver: %s, %s", vol.DeviceMountPath, devicePath)
}
if !reflect.DeepEqual(vol.MountFlags, csiMounter.spec.PersistentVolume.Spec.MountOptions) {
t.Errorf("unexpected mount flags passed to driver: %+v", vol.MountFlags)
}
}
if vol.FSType != tc.fsType {
t.Error("unexpected FSType sent to driver:", vol.FSType)
}
if vol.Path != csiMounter.GetPath() {
t.Error("csi server may not have received NodePublishVolume call")
}
})
}
}
func TestMounterSetUpWithFSGroup(t *testing.T) {
fakeClient := fakeclient.NewSimpleClientset()
plug, tmpDir := newTestPlugin(t, fakeClient)

View File

@ -74,6 +74,12 @@ type csiPlugin struct {
csiDriverInformer csiinformer.CSIDriverInformer
}
//TODO (vladimirvivien) add this type to storage api
type driverMode string
const persistentDriverMode driverMode = "persistent"
const ephemeralDriverMode driverMode = "ephemeral"
// ProbeVolumePlugins returns implemented plugins
func ProbeVolumePlugins() []volume.VolumePlugin {
p := &csiPlugin{
@ -303,7 +309,7 @@ func (p *csiPlugin) GetPluginName() string {
// GetvolumeName returns a concatenated string of CSIVolumeSource.Driver<volNameSe>CSIVolumeSource.VolumeHandle
// That string value is used in Detach() to extract driver name and volumeName.
func (p *csiPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
csi, err := getCSISourceFromSpec(spec)
csi, err := getPVSourceFromSpec(spec)
if err != nil {
klog.Error(log("plugin.GetVolumeName failed to extract volume source from spec: %v", err))
return "", err
@ -316,6 +322,14 @@ func (p *csiPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
func (p *csiPlugin) CanSupport(spec *volume.Spec) bool {
// TODO (vladimirvivien) CanSupport should also take into account
// the availability/registration of specified Driver in the volume source
if spec == nil {
return false
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil) ||
(spec.Volume != nil && spec.Volume.CSI != nil)
}
return spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil
}
@ -331,11 +345,34 @@ func (p *csiPlugin) NewMounter(
spec *volume.Spec,
pod *api.Pod,
_ volume.VolumeOptions) (volume.Mounter, error) {
pvSource, err := getCSISourceFromSpec(spec)
volSrc, pvSrc, err := getSourceFromSpec(spec)
if err != nil {
return nil, err
}
readOnly, err := getReadOnlyFromSpec(spec)
var (
driverName string
volumeHandle string
readOnly bool
)
switch {
case volSrc != nil && utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume):
volumeHandle = makeVolumeHandle(string(pod.UID), spec.Name())
driverName = volSrc.Driver
if volSrc.ReadOnly != nil {
readOnly = *volSrc.ReadOnly
}
case pvSrc != nil:
driverName = pvSrc.Driver
volumeHandle = pvSrc.VolumeHandle
readOnly = spec.ReadOnly
default:
return nil, fmt.Errorf("volume source not found in volume.Spec")
}
driverMode, err := p.getDriverMode(spec)
if err != nil {
return nil, err
}
@ -346,7 +383,7 @@ func (p *csiPlugin) NewMounter(
return nil, errors.New("failed to get a Kubernetes client")
}
csi, err := newCsiDriverClient(csiDriverName(pvSource.Driver))
csi, err := newCsiDriverClient(csiDriverName(driverName))
if err != nil {
return nil, err
}
@ -357,8 +394,9 @@ func (p *csiPlugin) NewMounter(
spec: spec,
pod: pod,
podUID: pod.UID,
driverName: csiDriverName(pvSource.Driver),
volumeID: pvSource.VolumeHandle,
driverName: csiDriverName(driverName),
driverMode: driverMode,
volumeID: volumeHandle,
specVolumeID: spec.Name(),
csiClient: csi,
readOnly: readOnly,
@ -376,15 +414,17 @@ func (p *csiPlugin) NewMounter(
// persist volume info data for teardown
node := string(p.host.GetNodeName())
attachID := getAttachmentName(pvSource.VolumeHandle, pvSource.Driver, node)
volData := map[string]string{
volDataKey.specVolID: spec.Name(),
volDataKey.volHandle: pvSource.VolumeHandle,
volDataKey.driverName: pvSource.Driver,
volDataKey.nodeName: node,
volDataKey.attachmentID: attachID,
volDataKey.specVolID: spec.Name(),
volDataKey.volHandle: volumeHandle,
volDataKey.driverName: driverName,
volDataKey.nodeName: node,
volDataKey.driverMode: string(driverMode),
}
attachID := getAttachmentName(volumeHandle, driverName, node)
volData[volDataKey.attachmentID] = attachID
if err := saveVolumeData(dataDir, volDataFileName, volData); err != nil {
klog.Error(log("failed to save volume info data: %v", err))
if err := os.RemoveAll(dataDir); err != nil {
@ -437,23 +477,58 @@ func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.S
klog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [%#v]", volData))
var spec *volume.Spec
inlineEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume)
if inlineEnabled {
mode := driverMode(volData[volDataKey.driverMode])
switch {
case mode == ephemeralDriverMode:
spec = p.constructVolSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName])
case mode == persistentDriverMode:
fallthrough
default:
spec = p.constructPVSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName], volData[volDataKey.volHandle])
}
} else {
spec = p.constructPVSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName], volData[volDataKey.volHandle])
}
return spec, nil
}
// constructVolSourceSpec constructs volume.Spec with CSIVolumeSource
func (p *csiPlugin) constructVolSourceSpec(volSpecName, driverName string) *volume.Spec {
vol := &api.Volume{
Name: volSpecName,
VolumeSource: api.VolumeSource{
CSI: &api.CSIVolumeSource{
Driver: driverName,
},
},
}
return volume.NewSpecFromVolume(vol)
}
//constructPVSourceSpec constructs volume.Spec with CSIPersistentVolumeSource
func (p *csiPlugin) constructPVSourceSpec(volSpecName, driverName, volumeHandle string) *volume.Spec {
fsMode := api.PersistentVolumeFilesystem
pv := &api.PersistentVolume{
ObjectMeta: meta.ObjectMeta{
Name: volData[volDataKey.specVolID],
Name: volSpecName,
},
Spec: api.PersistentVolumeSpec{
PersistentVolumeSource: api.PersistentVolumeSource{
CSI: &api.CSIPersistentVolumeSource{
Driver: volData[volDataKey.driverName],
VolumeHandle: volData[volDataKey.volHandle],
Driver: driverName,
VolumeHandle: volumeHandle,
},
},
VolumeMode: &fsMode,
},
}
return volume.NewSpecFromPersistentVolume(pv, false), nil
return volume.NewSpecFromPersistentVolume(pv, false)
}
func (p *csiPlugin) SupportsMountOption() bool {
@ -506,24 +581,31 @@ func (p *csiPlugin) NewDetacher() (volume.Detacher, error) {
}, nil
}
// TODO change CanAttach to return error to propagate ability
// to support Attachment or an error - see https://github.com/kubernetes/kubernetes/issues/74810
func (p *csiPlugin) CanAttach(spec *volume.Spec) bool {
if spec.PersistentVolume == nil {
klog.Error(log("plugin.CanAttach test failed, spec missing PersistentVolume"))
driverMode, err := p.getDriverMode(spec)
if err != nil {
return false
}
var driverName string
if spec.PersistentVolume.Spec.CSI != nil {
driverName = spec.PersistentVolume.Spec.CSI.Driver
} else {
klog.Error(log("plugin.CanAttach test failed, spec missing CSIPersistentVolume"))
if driverMode == ephemeralDriverMode {
klog.V(4).Info(log("driver ephemeral mode detected for spec %v", spec.Name))
return false
}
pvSrc, err := getCSISourceFromSpec(spec)
if err != nil {
klog.Error(log("plugin.CanAttach failed to get info from spec: %s", err))
return false
}
driverName := pvSrc.Driver
skipAttach, err := p.skipAttach(driverName)
if err != nil {
klog.Error(log("plugin.CanAttach error when calling plugin.skipAttach for driver %s: %s", driverName, err))
return false
}
return !skipAttach
@ -675,6 +757,8 @@ func (p *csiPlugin) ConstructBlockVolumeSpec(podUID types.UID, specVolName, mapP
return volume.NewSpecFromPersistentVolume(pv, false), nil
}
// skipAttach looks up CSIDriver object associated with driver name
// to determine if driver requies attachment volume operation
func (p *csiPlugin) skipAttach(driver string) (bool, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
return false, nil
@ -696,6 +780,26 @@ func (p *csiPlugin) skipAttach(driver string) (bool, error) {
return false, nil
}
// getDriverMode returns the driver mode for the specified spec: {persistent|ephemeral}.
// 1) If mode cannot be determined, it will default to "persistent".
// 2) If Mode cannot be resolved to either {persistent | ephemeral}, an error is returned
// See https://github.com/kubernetes/enhancements/blob/master/keps/sig-storage/20190122-csi-inline-volumes.md
func (p *csiPlugin) getDriverMode(spec *volume.Spec) (driverMode, error) {
// TODO (vladimirvivien) ultimately, mode will be retrieved from CSIDriver.Spec.Mode.
// However, in alpha version, mode is determined by the volume source:
// 1) if volume.Spec.Volume.CSI != nil -> mode is ephemeral
// 2) if volume.Spec.PersistentVolume.Spec.CSI != nil -> persistent
volSrc, _, err := getSourceFromSpec(spec)
if err != nil {
return "", err
}
if volSrc != nil && utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
return ephemeralDriverMode, nil
}
return persistentDriverMode, nil
}
func (p *csiPlugin) getPublishContext(client clientset.Interface, handle, driver, nodeName string) (map[string]string, error) {
skip, err := p.skipAttach(driver)
if err != nil {

View File

@ -18,6 +18,7 @@ package csi
import (
"fmt"
"math/rand"
"os"
"path"
"path/filepath"
@ -99,6 +100,19 @@ func makeTestPV(name string, sizeGig int, driverName, volID string) *api.Persist
}
}
func makeTestVol(name string, driverName string) *api.Volume {
ro := false
return &api.Volume{
Name: name,
VolumeSource: api.VolumeSource{
CSI: &api.CSIVolumeSource{
Driver: driverName,
ReadOnly: &ro,
},
},
}
}
func registerFakePlugin(pluginName, endpoint string, versions []string, t *testing.T) {
highestSupportedVersions, err := highestSupportedVersion(versions)
if err != nil {
@ -131,22 +145,105 @@ func TestPluginGetVolumeName(t *testing.T) {
name string
driverName string
volName string
spec *volume.Spec
shouldFail bool
}{
{"alphanum names", "testdr", "testvol", false},
{"mixchar driver", "test.dr.cc", "testvol", false},
{"mixchar volume", "testdr", "test-vol-name", false},
{"mixchars all", "test-driver", "test.vol.name", false},
{
name: "alphanum names",
driverName: "testdr",
volName: "testvol",
spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "testdr", "testvol"), false),
},
{
name: "mixchar driver",
driverName: "test.dr.cc",
volName: "testvol",
spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "test.dr.cc", "testvol"), false),
},
{
name: "mixchar volume",
driverName: "testdr",
volName: "test-vol-name",
spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "testdr", "test-vol-name"), false),
},
{
name: "mixchars all",
driverName: "test-driver",
volName: "test.vol.name",
spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "test-driver", "test.vol.name"), false),
},
{
name: "volume source with mixchars all",
driverName: "test-driver",
volName: "test.vol.name",
spec: volume.NewSpecFromVolume(makeTestVol("test-pv", "test-driver")),
shouldFail: true, // csi inline feature off
},
{
name: "missing spec",
shouldFail: true,
},
}
for _, tc := range testCases {
t.Logf("testing: %s", tc.name)
registerFakePlugin(tc.driverName, "endpoint", []string{"0.3.0"}, t)
pv := makeTestPV("test-pv", 10, tc.driverName, tc.volName)
spec := volume.NewSpecFromPersistentVolume(pv, false)
name, err := plug.GetVolumeName(spec)
if tc.shouldFail && err == nil {
t.Fatal("GetVolumeName should fail, but got err=nil")
name, err := plug.GetVolumeName(tc.spec)
if tc.shouldFail != (err != nil) {
t.Fatal("shouldFail does match expected error")
}
if tc.shouldFail && err != nil {
t.Log(err)
continue
}
if name != fmt.Sprintf("%s%s%s", tc.driverName, volNameSep, tc.volName) {
t.Errorf("unexpected volume name %s", name)
}
}
}
func TestPluginGetVolumeNameWithInline(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)()
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
testCases := []struct {
name string
driverName string
volName string
shouldFail bool
spec *volume.Spec
}{
{
name: "missing spec",
shouldFail: true,
},
{
name: "alphanum names for pv",
driverName: "testdr",
volName: "testvol",
spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "testdr", "testvol"), false),
},
{
name: "alphanum names for vol source",
driverName: "testdr",
volName: "testvol",
spec: volume.NewSpecFromVolume(makeTestVol("test-pv", "testdr")),
shouldFail: true,
},
}
for _, tc := range testCases {
t.Logf("testing: %s", tc.name)
registerFakePlugin(tc.driverName, "endpoint", []string{"0.3.0"}, t)
name, err := plug.GetVolumeName(tc.spec)
if tc.shouldFail != (err != nil) {
t.Fatal("shouldFail does match expected error")
}
if tc.shouldFail && err != nil {
t.Log(err)
continue
}
if name != fmt.Sprintf("%s%s%s", tc.driverName, volNameSep, tc.volName) {
t.Errorf("unexpected volume name %s", name)
@ -157,15 +254,79 @@ func TestPluginGetVolumeName(t *testing.T) {
func TestPluginCanSupport(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
tests := []struct {
name string
spec *volume.Spec
canSupport bool
}{
{
name: "no spec provided",
canSupport: false,
},
{
name: "can support volume source",
spec: volume.NewSpecFromVolume(makeTestVol("test-vol", testDriver)),
canSupport: false, // csi inline not enabled
},
{
name: "can support persistent volume source",
spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 20, testDriver, testVol), true),
canSupport: true,
},
}
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
spec := volume.NewSpecFromPersistentVolume(pv, false)
if !plug.CanSupport(spec) {
t.Errorf("should support CSI spec")
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
actual := plug.CanSupport(tc.spec)
if tc.canSupport != actual {
t.Errorf("expecting canSupport %t, got %t", tc.canSupport, actual)
}
})
}
}
func TestPluginCanSupportWithInline(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)()
tests := []struct {
name string
spec *volume.Spec
canSupport bool
}{
{
name: "no spec provided",
canSupport: false,
},
{
name: "can support volume source",
spec: volume.NewSpecFromVolume(makeTestVol("test-vol", testDriver)),
canSupport: true,
},
{
name: "can support persistent volume source",
spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 20, testDriver, testVol), true),
canSupport: true,
},
}
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
actual := plug.CanSupport(tc.spec)
if tc.canSupport != actual {
t.Errorf("expecting canSupport %t, got %t", tc.canSupport, actual)
}
})
}
}
@ -177,108 +338,437 @@ func TestPluginConstructVolumeSpec(t *testing.T) {
testCases := []struct {
name string
originSpec *volume.Spec
specVolID string
data map[string]string
volHandle string
podUID types.UID
shouldFail bool
}{
{
name: "valid spec name",
specVolID: "test.vol.id",
data: map[string]string{volDataKey.specVolID: "test.vol.id", volDataKey.volHandle: "test-vol0", volDataKey.driverName: "test-driver0"},
name: "construct spec1 from original persistent spec",
specVolID: "test.vol.id",
volHandle: "testvol-handle1",
originSpec: volume.NewSpecFromPersistentVolume(makeTestPV("test.vol.id", 20, testDriver, "testvol-handle1"), true),
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
},
{
name: "construct spec2 from original persistent spec",
specVolID: "spec2",
volHandle: "handle2",
originSpec: volume.NewSpecFromPersistentVolume(makeTestPV("spec2", 20, testDriver, "handle2"), true),
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
},
{
name: "construct spec from original volume spec",
specVolID: "volspec",
originSpec: volume.NewSpecFromVolume(makeTestVol("spec2", testDriver)),
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
shouldFail: true, // csi inline off
},
}
for _, tc := range testCases {
t.Logf("test case: %s", tc.name)
dir := getTargetPath(testPodUID, tc.specVolID, plug.host)
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
// create the data file
if tc.data != nil {
mountDir := path.Join(getTargetPath(testPodUID, tc.specVolID, plug.host), "/mount")
if err := os.MkdirAll(mountDir, 0755); err != nil && !os.IsNotExist(err) {
t.Errorf("failed to create dir [%s]: %v", mountDir, err)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mounter, err := plug.NewMounter(
tc.originSpec,
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: tc.podUID, Namespace: testns}},
volume.VolumeOptions{},
)
if tc.shouldFail && err != nil {
t.Log(err)
return
}
if err := saveVolumeData(path.Dir(mountDir), volDataFileName, tc.data); err != nil {
if !tc.shouldFail && err != nil {
t.Fatal(err)
}
}
// rebuild spec
spec, err := plug.ConstructVolumeSpec("test-pv", dir)
if tc.shouldFail {
if err == nil {
t.Fatal("expecting ConstructVolumeSpec to fail, but got nil error")
if mounter == nil {
t.Fatal("failed to create CSI mounter")
}
continue
}
csiMounter := mounter.(*csiMountMgr)
volHandle := spec.PersistentVolume.Spec.CSI.VolumeHandle
if volHandle != tc.data[volDataKey.volHandle] {
t.Errorf("expected volID %s, got volID %s", tc.data[volDataKey.volHandle], volHandle)
}
// rebuild spec
spec, err := plug.ConstructVolumeSpec("test-pv", path.Dir(csiMounter.GetPath()))
if err != nil {
t.Fatal(err)
}
if spec == nil {
t.Fatal("nil volume.Spec contstructed")
}
if spec.PersistentVolume.Spec.VolumeMode == nil {
t.Fatalf("Volume mode has not been set.")
}
// inspect spec
if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.CSI == nil {
t.Fatal("CSIPersistentVolume not found in constructed spec ")
}
if *spec.PersistentVolume.Spec.VolumeMode != api.PersistentVolumeFilesystem {
t.Errorf("Unexpected volume mode %q", *spec.PersistentVolume.Spec.VolumeMode)
}
volHandle := spec.PersistentVolume.Spec.CSI.VolumeHandle
if volHandle != tc.originSpec.PersistentVolume.Spec.CSI.VolumeHandle {
t.Error("unexpected volumeHandle constructed:", volHandle)
}
driverName := spec.PersistentVolume.Spec.CSI.Driver
if driverName != tc.originSpec.PersistentVolume.Spec.CSI.Driver {
t.Error("unexpected driverName constructed:", driverName)
}
if spec.Name() != tc.specVolID {
t.Errorf("Unexpected spec name %s", spec.Name())
}
if spec.PersistentVolume.Spec.VolumeMode == nil {
t.Fatalf("Volume mode has not been set.")
}
if *spec.PersistentVolume.Spec.VolumeMode != api.PersistentVolumeFilesystem {
t.Errorf("Unexpected volume mode %q", *spec.PersistentVolume.Spec.VolumeMode)
}
if spec.Name() != tc.specVolID {
t.Errorf("Unexpected spec name constructed %s", spec.Name())
}
})
}
}
func TestPluginConstructVolumeSpecWithInline(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)()
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
testCases := []struct {
name string
originSpec *volume.Spec
specVolID string
volHandle string
podUID types.UID
shouldFail bool
}{
{
name: "construct spec1 from persistent spec",
specVolID: "test.vol.id",
volHandle: "testvol-handle1",
originSpec: volume.NewSpecFromPersistentVolume(makeTestPV("test.vol.id", 20, testDriver, "testvol-handle1"), true),
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
},
{
name: "construct spec2 from persistent spec",
specVolID: "spec2",
volHandle: "handle2",
originSpec: volume.NewSpecFromPersistentVolume(makeTestPV("spec2", 20, testDriver, "handle2"), true),
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
},
{
name: "construct spec from volume spec",
specVolID: "volspec",
originSpec: volume.NewSpecFromVolume(makeTestVol("volspec", testDriver)),
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
},
{
name: "construct spec from volume spec2",
specVolID: "volspec2",
originSpec: volume.NewSpecFromVolume(makeTestVol("volspec2", testDriver)),
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
},
{
name: "missing spec",
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
shouldFail: true,
},
}
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mounter, err := plug.NewMounter(
tc.originSpec,
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: tc.podUID, Namespace: testns}},
volume.VolumeOptions{},
)
if tc.shouldFail && err != nil {
t.Log(err)
return
}
if !tc.shouldFail && err != nil {
t.Fatal(err)
}
if mounter == nil {
t.Fatal("failed to create CSI mounter")
}
csiMounter := mounter.(*csiMountMgr)
// rebuild spec
spec, err := plug.ConstructVolumeSpec("test-pv", path.Dir(csiMounter.GetPath()))
if err != nil {
t.Fatal(err)
}
if spec == nil {
t.Fatal("nil volume.Spec contstructed")
}
if spec.Name() != tc.specVolID {
t.Errorf("unexpected spec name constructed volume.Spec: %s", spec.Name())
}
switch {
case spec.Volume != nil:
if spec.Volume.CSI == nil {
t.Error("missing CSIVolumeSource in constructed volume.Spec")
}
if spec.Volume.CSI.Driver != tc.originSpec.Volume.CSI.Driver {
t.Error("unexpected driver in constructed volume source:", spec.Volume.CSI.Driver)
}
case spec.PersistentVolume != nil:
if spec.PersistentVolume.Spec.CSI == nil {
t.Fatal("missing CSIPersistentVolumeSource in constructed volume.spec")
}
volHandle := spec.PersistentVolume.Spec.CSI.VolumeHandle
if volHandle != tc.originSpec.PersistentVolume.Spec.CSI.VolumeHandle {
t.Error("unexpected volumeHandle constructed in persistent volume source:", volHandle)
}
driverName := spec.PersistentVolume.Spec.CSI.Driver
if driverName != tc.originSpec.PersistentVolume.Spec.CSI.Driver {
t.Error("unexpected driverName constructed in persistent volume source:", driverName)
}
if spec.PersistentVolume.Spec.VolumeMode == nil {
t.Fatalf("Volume mode has not been set.")
}
if *spec.PersistentVolume.Spec.VolumeMode != api.PersistentVolumeFilesystem {
t.Errorf("Unexpected volume mode %q", *spec.PersistentVolume.Spec.VolumeMode)
}
default:
t.Fatal("invalid volume.Spec constructed")
}
})
}
}
func TestPluginNewMounter(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
registerFakePlugin(testDriver, "endpoint", []string{"1.2.0"}, t)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
mounter, err := plug.NewMounter(
volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly),
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
volume.VolumeOptions{},
)
if err != nil {
t.Fatalf("Failed to make a new Mounter: %v", err)
tests := []struct {
name string
spec *volume.Spec
podUID types.UID
namespace string
driverMode driverMode
shouldFail bool
}{
{
name: "mounter from persistent volume source",
spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv1", 20, testDriver, testVol), true),
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
namespace: "test-ns1",
driverMode: persistentDriverMode,
},
{
name: "mounter from volume source",
spec: volume.NewSpecFromVolume(makeTestVol("test-vol1", testDriver)),
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
namespace: "test-ns2",
driverMode: ephemeralDriverMode,
shouldFail: true, // csi inline not enabled
},
{
name: "mounter from no spec provided",
shouldFail: true,
},
}
if mounter == nil {
t.Fatal("failed to create CSI mounter")
}
csiMounter := mounter.(*csiMountMgr)
for _, test := range tests {
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
// validate mounter fields
if string(csiMounter.driverName) != testDriver {
t.Error("mounter driver name not set")
registerFakePlugin(testDriver, "endpoint", []string{"1.2.0"}, t)
t.Run(test.name, func(t *testing.T) {
mounter, err := plug.NewMounter(
test.spec,
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: test.podUID, Namespace: test.namespace}},
volume.VolumeOptions{},
)
if test.shouldFail != (err != nil) {
t.Fatal("Unexpected error:", err)
}
if test.shouldFail && err != nil {
t.Log(err)
return
}
if mounter == nil {
t.Fatal("failed to create CSI mounter")
}
csiMounter := mounter.(*csiMountMgr)
// validate mounter fields
if string(csiMounter.driverName) != testDriver {
t.Error("mounter driver name not set")
}
if csiMounter.volumeID == "" {
t.Error("mounter volume id not set")
}
if csiMounter.pod == nil {
t.Error("mounter pod not set")
}
if string(csiMounter.podUID) != string(test.podUID) {
t.Error("mounter podUID not set")
}
if csiMounter.csiClient == nil {
t.Error("mounter csiClient is nil")
}
if csiMounter.driverMode != test.driverMode {
t.Error("unexpected driver mode:", csiMounter.driverMode)
}
// ensure data file is created
dataDir := path.Dir(mounter.GetPath())
dataFile := filepath.Join(dataDir, volDataFileName)
if _, err := os.Stat(dataFile); err != nil {
if os.IsNotExist(err) {
t.Errorf("data file not created %s", dataFile)
} else {
t.Fatal(err)
}
}
data, err := loadVolumeData(dataDir, volDataFileName)
if err != nil {
t.Fatal(err)
}
if data[volDataKey.specVolID] != csiMounter.spec.Name() {
t.Error("volume data file unexpected specVolID:", data[volDataKey.specVolID])
}
if data[volDataKey.volHandle] != csiMounter.volumeID {
t.Error("volume data file unexpected volHandle:", data[volDataKey.volHandle])
}
if data[volDataKey.driverName] != string(csiMounter.driverName) {
t.Error("volume data file unexpected driverName:", data[volDataKey.driverName])
}
if data[volDataKey.nodeName] != string(csiMounter.plugin.host.GetNodeName()) {
t.Error("volume data file unexpected nodeName:", data[volDataKey.nodeName])
}
if data[volDataKey.driverMode] != string(test.driverMode) {
t.Error("volume data file unexpected driverMode:", data[volDataKey.driverMode])
}
})
}
if csiMounter.volumeID != testVol {
t.Error("mounter volume id not set")
}
if csiMounter.pod == nil {
t.Error("mounter pod not set")
}
if csiMounter.podUID == types.UID("") {
t.Error("mounter podUID not set")
}
if csiMounter.csiClient == nil {
t.Error("mounter csiClient is nil")
}
func TestPluginNewMounterWithInline(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)()
tests := []struct {
name string
spec *volume.Spec
podUID types.UID
namespace string
driverMode driverMode
shouldFail bool
}{
{
name: "mounter with missing spec",
shouldFail: true,
},
{
name: "mounter with spec with both volSrc and pvSrc",
spec: &volume.Spec{
Volume: makeTestVol("test-vol1", testDriver),
PersistentVolume: makeTestPV("test-pv1", 20, testDriver, testVol),
ReadOnly: true,
},
shouldFail: true,
},
{
name: "mounter with persistent volume source",
spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv1", 20, testDriver, testVol), true),
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
namespace: "test-ns1",
driverMode: persistentDriverMode,
},
{
name: "mounter with volume source",
spec: volume.NewSpecFromVolume(makeTestVol("test-vol1", testDriver)),
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
namespace: "test-ns2",
driverMode: ephemeralDriverMode,
},
}
// ensure data file is created
dataDir := path.Dir(mounter.GetPath())
dataFile := filepath.Join(dataDir, volDataFileName)
if _, err := os.Stat(dataFile); err != nil {
if os.IsNotExist(err) {
t.Errorf("data file not created %s", dataFile)
} else {
t.Fatal(err)
}
for _, test := range tests {
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
registerFakePlugin(testDriver, "endpoint", []string{"1.2.0"}, t)
t.Run(test.name, func(t *testing.T) {
mounter, err := plug.NewMounter(
test.spec,
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: test.podUID, Namespace: test.namespace}},
volume.VolumeOptions{},
)
if test.shouldFail != (err != nil) {
t.Fatal("Unexpected error:", err)
}
if test.shouldFail && err != nil {
t.Log(err)
return
}
if mounter == nil {
t.Fatal("failed to create CSI mounter")
}
csiMounter := mounter.(*csiMountMgr)
// validate mounter fields
if string(csiMounter.driverName) != testDriver {
t.Error("mounter driver name not set")
}
if csiMounter.volumeID == "" {
t.Error("mounter volume id not set")
}
if csiMounter.pod == nil {
t.Error("mounter pod not set")
}
if string(csiMounter.podUID) != string(test.podUID) {
t.Error("mounter podUID not set")
}
if csiMounter.csiClient == nil {
t.Error("mounter csiClient is nil")
}
if csiMounter.driverMode != test.driverMode {
t.Error("unexpected driver mode:", csiMounter.driverMode)
}
// ensure data file is created
dataDir := path.Dir(mounter.GetPath())
dataFile := filepath.Join(dataDir, volDataFileName)
if _, err := os.Stat(dataFile); err != nil {
if os.IsNotExist(err) {
t.Errorf("data file not created %s", dataFile)
} else {
t.Fatal(err)
}
}
data, err := loadVolumeData(dataDir, volDataFileName)
if err != nil {
t.Fatal(err)
}
if data[volDataKey.specVolID] != csiMounter.spec.Name() {
t.Error("volume data file unexpected specVolID:", data[volDataKey.specVolID])
}
if data[volDataKey.volHandle] != csiMounter.volumeID {
t.Error("volume data file unexpected volHandle:", data[volDataKey.volHandle])
}
if data[volDataKey.driverName] != string(csiMounter.driverName) {
t.Error("volume data file unexpected driverName:", data[volDataKey.driverName])
}
if data[volDataKey.nodeName] != string(csiMounter.plugin.host.GetNodeName()) {
t.Error("volume data file unexpected nodeName:", data[volDataKey.nodeName])
}
if data[volDataKey.driverMode] != string(csiMounter.driverMode) {
t.Error("volume data file unexpected driverMode:", data[volDataKey.driverMode])
}
})
}
}
@ -371,25 +861,36 @@ func TestPluginNewDetacher(t *testing.T) {
}
func TestPluginCanAttach(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIDriverRegistry, true)()
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)()
tests := []struct {
name string
driverName string
spec *volume.Spec
canAttach bool
}{
{
name: "attachable",
driverName: "attachble-driver",
name: "non-attachable inline",
driverName: "attachable-inline",
spec: volume.NewSpecFromVolume(makeTestVol("test-vol", "attachable-inline")),
canAttach: false,
},
{
name: "attachable PV",
driverName: "attachable-pv",
spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-vol", 20, "attachable-pv", testVol), true),
canAttach: true,
},
}
for _, test := range tests {
csiDriver := getCSIDriver(test.driverName, nil, &test.canAttach)
t.Run(test.name, func(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil)
fakeCSIClient := fakeclient.NewSimpleClientset(csiDriver)
plug, tmpDir := newTestPlugin(t, fakeCSIClient)
defer os.RemoveAll(tmpDir)
spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, test.driverName, "test-vol"), false)
pluginCanAttach := plug.CanAttach(spec)
pluginCanAttach := plug.CanAttach(test.spec)
if pluginCanAttach != test.canAttach {
t.Fatalf("expecting plugin.CanAttach %t got %t", test.canAttach, pluginCanAttach)
return

View File

@ -25,8 +25,10 @@ import (
api "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
utilstrings "k8s.io/utils/strings"
)
@ -90,12 +92,7 @@ func loadVolumeData(dir string, fileName string) (map[string]string, error) {
}
func getCSISourceFromSpec(spec *volume.Spec) (*api.CSIPersistentVolumeSource, error) {
if spec.PersistentVolume != nil &&
spec.PersistentVolume.Spec.CSI != nil {
return spec.PersistentVolume.Spec.CSI, nil
}
return nil, fmt.Errorf("CSIPersistentVolumeSource not defined in spec")
return getPVSourceFromSpec(spec)
}
func getReadOnlyFromSpec(spec *volume.Spec) (bool, error) {
@ -140,3 +137,34 @@ func hasReadWriteOnce(modes []api.PersistentVolumeAccessMode) bool {
}
return false
}
// getSourceFromSpec returns either CSIVolumeSource or CSIPersistentVolumeSource, but not both
func getSourceFromSpec(spec *volume.Spec) (*api.CSIVolumeSource, *api.CSIPersistentVolumeSource, error) {
if spec == nil {
return nil, nil, fmt.Errorf("volume.Spec nil")
}
if spec.Volume != nil && spec.PersistentVolume != nil {
return nil, nil, fmt.Errorf("volume.Spec has both volume and persistent volume sources")
}
if spec.Volume != nil && spec.Volume.CSI != nil && utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
return spec.Volume.CSI, nil, nil
}
if spec.PersistentVolume != nil &&
spec.PersistentVolume.Spec.CSI != nil {
return nil, spec.PersistentVolume.Spec.CSI, nil
}
return nil, nil, fmt.Errorf("volume source not found in volume.Spec")
}
// getPVSourceFromSpec ensures only CSIPersistentVolumeSource is present in volume.Spec
func getPVSourceFromSpec(spec *volume.Spec) (*api.CSIPersistentVolumeSource, error) {
volSrc, pvSrc, err := getSourceFromSpec(spec)
if err != nil {
return nil, err
}
if volSrc != nil {
return nil, fmt.Errorf("unexpected api.CSIVolumeSource found in volume.Spec")
}
return pvSrc, nil
}

View File

@ -57,9 +57,12 @@ func (f *IdentityClient) Probe(ctx context.Context, in *csipb.ProbeRequest, opts
}
type CSIVolume struct {
VolumeContext map[string]string
Path string
MountFlags []string
VolumeHandle string
VolumeContext map[string]string
Path string
DeviceMountPath string
FSType string
MountFlags []string
}
// NodeClient returns CSI node client
@ -118,7 +121,6 @@ func (f *NodeClient) AddNodeStagedVolume(volID, deviceMountPath string, volumeCo
// NodePublishVolume implements CSI NodePublishVolume
func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipb.NodePublishVolumeResponse, error) {
if f.nextErr != nil {
return nil, f.nextErr
}
@ -135,9 +137,12 @@ func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePubli
return nil, errors.New("invalid fstype")
}
f.nodePublishedVolumes[req.GetVolumeId()] = CSIVolume{
Path: req.GetTargetPath(),
VolumeContext: req.GetVolumeContext(),
MountFlags: req.GetVolumeCapability().GetMount().MountFlags,
VolumeHandle: req.GetVolumeId(),
Path: req.GetTargetPath(),
DeviceMountPath: req.GetStagingTargetPath(),
VolumeContext: req.GetVolumeContext(),
FSType: req.GetVolumeCapability().GetMount().GetFsType(),
MountFlags: req.GetVolumeCapability().GetMount().MountFlags,
}
return &csipb.NodePublishVolumeResponse{}, nil
}