From 923ad369c8a65c8aac47b22457e7e18bbcc1f335 Mon Sep 17 00:00:00 2001 From: Vladimir Vivien Date: Fri, 15 Feb 2019 17:56:26 -0500 Subject: [PATCH] CSI Inline Volume - Kubelet/Driver code impl --- pkg/volume/csi/csi_attacher.go | 34 +- pkg/volume/csi/csi_attacher_test.go | 603 ++++++++++++++++++++---- pkg/volume/csi/csi_client_test.go | 13 +- pkg/volume/csi/csi_mounter.go | 151 ++++-- pkg/volume/csi/csi_mounter_test.go | 261 +++++++++++ pkg/volume/csi/csi_plugin.go | 154 ++++++- pkg/volume/csi/csi_plugin_test.go | 687 ++++++++++++++++++++++++---- pkg/volume/csi/csi_util.go | 40 +- pkg/volume/csi/fake/fake_client.go | 19 +- 9 files changed, 1686 insertions(+), 276 deletions(-) diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go index 328f595b19..d4b84b4e65 100644 --- a/pkg/volume/csi/csi_attacher.go +++ b/pkg/volume/csi/csi_attacher.go @@ -63,15 +63,15 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string return "", errors.New("missing spec") } - csiSource, err := getCSISourceFromSpec(spec) + pvSrc, err := getPVSourceFromSpec(spec) if err != nil { - klog.Error(log("attacher.Attach failed to get CSI persistent source: %v", err)) + klog.Error(log("attacher.Attach failed to get CSIPersistentVolumeSource: %v", err)) return "", err } node := string(nodeName) pvName := spec.PersistentVolume.GetName() - attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, node) + attachID := getAttachmentName(pvSrc.VolumeHandle, pvSrc.Driver, node) attachment := &storage.VolumeAttachment{ ObjectMeta: meta.ObjectMeta{ @@ -79,7 +79,7 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string }, Spec: storage.VolumeAttachmentSpec{ NodeName: node, - Attacher: csiSource.Driver, + Attacher: pvSrc.Driver, Source: storage.VolumeAttachmentSource{ PersistentVolumeName: &pvName, }, @@ -97,12 +97,12 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string } if alreadyExist { - klog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attachID, csiSource.VolumeHandle)) + klog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attachID, pvSrc.VolumeHandle)) } else { - klog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, csiSource.VolumeHandle)) + klog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, pvSrc.VolumeHandle)) } - if _, err := c.waitForVolumeAttachment(csiSource.VolumeHandle, attachID, csiTimeout); err != nil { + if _, err := c.waitForVolumeAttachment(pvSrc.VolumeHandle, attachID, csiTimeout); err != nil { return "", err } @@ -113,7 +113,7 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string } func (c *csiAttacher) WaitForAttach(spec *volume.Spec, _ string, pod *v1.Pod, timeout time.Duration) (string, error) { - source, err := getCSISourceFromSpec(spec) + source, err := getPVSourceFromSpec(spec) if err != nil { klog.Error(log("attacher.WaitForAttach failed to extract CSI volume source: %v", err)) return "", err @@ -220,14 +220,18 @@ func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.No klog.Error(log("attacher.VolumesAreAttached missing volume.Spec")) return nil, errors.New("missing spec") } - source, err := getCSISourceFromSpec(spec) + pvSrc, err := getPVSourceFromSpec(spec) if err != nil { - klog.Error(log("attacher.VolumesAreAttached failed: %v", err)) + attached[spec] = false + klog.Error(log("attacher.VolumesAreAttached failed to get CSIPersistentVolumeSource: %v", err)) continue } - skip, err := c.plugin.skipAttach(source.Driver) + driverName := pvSrc.Driver + volumeHandle := pvSrc.VolumeHandle + + skip, err := c.plugin.skipAttach(driverName) if err != nil { - klog.Error(log("Failed to check CSIDriver for %s: %s", source.Driver, err)) + klog.Error(log("Failed to check CSIDriver for %s: %s", driverName, err)) } else { if skip { // This volume is not attachable, pretend it's attached @@ -236,7 +240,7 @@ func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.No } } - attachID := getAttachmentName(source.VolumeHandle, source.Driver, string(nodeName)) + attachID := getAttachmentName(volumeHandle, driverName, string(nodeName)) klog.V(4).Info(log("probing attachment status for VolumeAttachment %v", attachID)) attach, err := c.k8s.StorageV1().VolumeAttachments().Get(attachID, meta.GetOptions{}) if err != nil { @@ -285,9 +289,9 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo if spec == nil { return fmt.Errorf("attacher.MountDevice failed, spec is nil") } - csiSource, err := getCSISourceFromSpec(spec) + csiSource, err := getPVSourceFromSpec(spec) if err != nil { - klog.Error(log("attacher.MountDevice failed to get CSI persistent source: %v", err)) + klog.Error(log("attacher.MountDevice failed to get CSIPersistentVolumeSource: %v", err)) return err } diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index 1ef325c996..1de09af066 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -37,7 +37,6 @@ import ( fakeclient "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" utiltesting "k8s.io/client-go/util/testing" - "k8s.io/klog" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" volumetest "k8s.io/kubernetes/pkg/volume/testing" @@ -84,16 +83,17 @@ func markVolumeAttached(t *testing.T, client clientset.Interface, watch *watch.R t.Error(err) } if attach != nil { - klog.Infof("stopping wait") + t.Logf("attachment found on try %d, stopping wait...", i) break } } - klog.Infof("stopped wait") + t.Logf("stopped waiting for attachment") if attach == nil { t.Logf("attachment not found for id:%v", attachID) } else { attach.Status = status + t.Logf("updating attachment %s with attach status %v", attachID, status) _, err := client.StorageV1().VolumeAttachments().Update(attach) if err != nil { t.Error(err) @@ -103,13 +103,13 @@ func markVolumeAttached(t *testing.T, client clientset.Interface, watch *watch.R } func TestAttacherAttach(t *testing.T) { - testCases := []struct { name string nodeName string driverName string volumeName string attachID string + spec *volume.Spec injectAttacherError bool shouldFail bool }{ @@ -119,6 +119,7 @@ func TestAttacherAttach(t *testing.T) { driverName: "testdriver-01", volumeName: "testvol-01", attachID: getAttachmentName("testvol-01", "testdriver-01", "testnode-01"), + spec: volume.NewSpecFromPersistentVolume(makeTestPV("pv01", 10, "testdriver-01", "testvol-01"), false), }, { name: "test ok 2", @@ -126,6 +127,7 @@ func TestAttacherAttach(t *testing.T) { driverName: "driver02", volumeName: "vol02", attachID: getAttachmentName("vol02", "driver02", "node02"), + spec: volume.NewSpecFromPersistentVolume(makeTestPV("pv01", 10, "driver02", "vol02"), false), }, { name: "mismatch vol", @@ -133,6 +135,7 @@ func TestAttacherAttach(t *testing.T) { driverName: "driver02", volumeName: "vol01", attachID: getAttachmentName("vol02", "driver02", "node02"), + spec: volume.NewSpecFromPersistentVolume(makeTestPV("pv01", 10, "driver02", "vol01"), false), shouldFail: true, }, { @@ -141,6 +144,7 @@ func TestAttacherAttach(t *testing.T) { driverName: "driver000", volumeName: "vol02", attachID: getAttachmentName("vol02", "driver02", "node02"), + spec: volume.NewSpecFromPersistentVolume(makeTestPV("pv01", 10, "driver01", "vol02"), false), shouldFail: true, }, { @@ -149,6 +153,7 @@ func TestAttacherAttach(t *testing.T) { driverName: "driver000", volumeName: "vol02", attachID: getAttachmentName("vol02", "driver02", "node02"), + spec: volume.NewSpecFromPersistentVolume(makeTestPV("pv01", 10, "driver02", "vol02"), false), shouldFail: true, }, { @@ -157,13 +162,31 @@ func TestAttacherAttach(t *testing.T) { driverName: "driver02", volumeName: "vol02", attachID: getAttachmentName("vol02", "driver02", "node02"), + spec: volume.NewSpecFromPersistentVolume(makeTestPV("pv01", 10, "driver02", "vol02"), false), injectAttacherError: true, shouldFail: true, }, + { + name: "test with volume source", + nodeName: "node000", + driverName: "driver000", + volumeName: "vol02", + attachID: getAttachmentName("vol02", "driver02", "node02"), + spec: volume.NewSpecFromVolume(makeTestVol("pv01", "driver02")), + shouldFail: true, // csi not enabled + }, + { + name: "missing spec", + nodeName: "node000", + driverName: "driver000", + volumeName: "vol02", + attachID: getAttachmentName("vol02", "driver02", "node02"), + shouldFail: true, // csi not enabled + }, } // attacher loop - for i, tc := range testCases { + for _, tc := range testCases { t.Logf("test case: %s", tc.name) plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil) defer os.RemoveAll(tmpDir) @@ -175,9 +198,7 @@ func TestAttacherAttach(t *testing.T) { csiAttacher := attacher.(*csiAttacher) - spec := volume.NewSpecFromPersistentVolume(makeTestPV(fmt.Sprintf("test-pv%d", i), 10, tc.driverName, tc.volumeName), false) - - go func(id, nodename string, fail bool) { + go func(spec *volume.Spec, id, nodename string, fail bool) { attachID, err := csiAttacher.Attach(spec, types.NodeName(nodename)) if !fail && err != nil { t.Errorf("expecting no failure, but got err: %v", err) @@ -188,7 +209,83 @@ func TestAttacherAttach(t *testing.T) { if attachID != id && !fail { t.Errorf("expecting attachID %v, got %v", id, attachID) } - }(tc.attachID, tc.nodeName, tc.shouldFail) + }(tc.spec, tc.attachID, tc.nodeName, tc.shouldFail) + + var status storage.VolumeAttachmentStatus + if tc.injectAttacherError { + status.Attached = false + status.AttachError = &storage.VolumeError{ + Message: "attacher error", + } + } else { + status.Attached = true + } + markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, tc.attachID, status) + } +} + +func TestAttacherAttachWithInline(t *testing.T) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)() + testCases := []struct { + name string + nodeName string + driverName string + volumeName string + attachID string + spec *volume.Spec + injectAttacherError bool + shouldFail bool + }{ + { + name: "test ok 1 with PV", + nodeName: "node01", + attachID: getAttachmentName("vol01", "driver01", "node01"), + spec: volume.NewSpecFromPersistentVolume(makeTestPV("pv01", 10, "driver01", "vol01"), false), + }, + { + name: "test failure, attach with volSrc", + nodeName: "node01", + attachID: getAttachmentName("vol01", "driver01", "node01"), + spec: volume.NewSpecFromVolume(makeTestVol("vol01", "driver01")), + shouldFail: true, + }, + { + name: "attacher error", + nodeName: "node02", + attachID: getAttachmentName("vol02", "driver02", "node02"), + spec: volume.NewSpecFromPersistentVolume(makeTestPV("pv02", 10, "driver02", "vol02"), false), + injectAttacherError: true, + shouldFail: true, + }, + { + name: "missing spec", + nodeName: "node02", + attachID: getAttachmentName("vol02", "driver02", "node02"), + shouldFail: true, + }, + } + + // attacher loop + for _, tc := range testCases { + t.Logf("test case: %s", tc.name) + plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil) + defer os.RemoveAll(tmpDir) + + attacher, err := plug.NewAttacher() + if err != nil { + t.Fatalf("failed to create new attacher: %v", err) + } + csiAttacher := attacher.(*csiAttacher) + + go func(spec *volume.Spec, id, nodename string, fail bool) { + attachID, err := csiAttacher.Attach(spec, types.NodeName(nodename)) + if fail != (err != nil) { + t.Errorf("expecting no failure, but got err: %v", err) + } + if attachID != id && !fail { + t.Errorf("expecting attachID %v, got %v", id, attachID) + } + }(tc.spec, tc.attachID, tc.nodeName, tc.shouldFail) var status storage.VolumeAttachmentStatus if tc.injectAttacherError { @@ -260,23 +357,25 @@ func TestAttacherWithCSIDriver(t *testing.T) { return } - expectedAttachID := getAttachmentName("test-vol", test.driver, "node") - status := storage.VolumeAttachmentStatus{ - Attached: true, - } - if test.expectVolumeAttachment { - go markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, expectedAttachID, status) - } + go func(volSpec *volume.Spec, expectAttach bool) { + attachID, err := csiAttacher.Attach(volSpec, types.NodeName("node")) + if err != nil { + t.Errorf("Attach() failed: %s", err) + } + if expectAttach && attachID == "" { + t.Errorf("Expected attachID, got nothing") + } + if !expectAttach && attachID != "" { + t.Errorf("Expected empty attachID, got %q", attachID) + } + }(spec, test.expectVolumeAttachment) - attachID, err := csiAttacher.Attach(spec, types.NodeName("node")) - if err != nil { - t.Errorf("Attach() failed: %s", err) - } - if test.expectVolumeAttachment && attachID == "" { - t.Errorf("Expected attachID, got nothing") - } - if !test.expectVolumeAttachment && attachID != "" { - t.Errorf("Expected empty attachID, got %q", attachID) + if test.expectVolumeAttachment { + expectedAttachID := getAttachmentName("test-vol", test.driver, "node") + status := storage.VolumeAttachmentStatus{ + Attached: true, + } + markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, expectedAttachID, status) } }) } @@ -354,6 +453,7 @@ func TestAttacherWaitForAttach(t *testing.T) { name string driver string makeAttachment func() *storage.VolumeAttachment + spec *volume.Spec expectedAttachID string expectError bool }{ @@ -367,9 +467,22 @@ func TestAttacherWaitForAttach(t *testing.T) { successfulAttachment.Status.Attached = true return successfulAttachment }, + spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "attachable", "test-vol"), false), expectedAttachID: getAttachmentName("test-vol", "attachable", "node"), expectError: false, }, + { + name: "failed attach with vol source", + makeAttachment: func() *storage.VolumeAttachment { + + testAttachID := getAttachmentName("test-vol", "attachable", "node") + successfulAttachment := makeTestAttachment(testAttachID, "node", "volSrc01") + successfulAttachment.Status.Attached = true + return successfulAttachment + }, + spec: volume.NewSpecFromVolume(makeTestVol("volSrc01", "attachable")), + expectError: true, + }, { name: "failed attach", driver: "attachable", @@ -387,7 +500,6 @@ func TestAttacherWaitForAttach(t *testing.T) { t.Fatalf("failed to create new attacher: %v", err) } csiAttacher := attacher.(*csiAttacher) - spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, test.driver, "test-vol"), false) if test.makeAttachment != nil { attachment := test.makeAttachment() @@ -402,7 +514,7 @@ func TestAttacherWaitForAttach(t *testing.T) { t.Logf("created test VolumeAttachment %+v", gotAttachment) } - attachID, err := csiAttacher.WaitForAttach(spec, "", nil, time.Second) + attachID, err := csiAttacher.WaitForAttach(test.spec, "", nil, time.Second) if err != nil && !test.expectError { t.Errorf("Unexpected error: %s", err) } @@ -416,6 +528,86 @@ func TestAttacherWaitForAttach(t *testing.T) { } } +func TestAttacherWaitForAttachWithInline(t *testing.T) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)() + + tests := []struct { + name string + driver string + makeAttachment func() *storage.VolumeAttachment + spec *volume.Spec + expectedAttachID string + expectError bool + }{ + { + name: "successful attach with PV", + makeAttachment: func() *storage.VolumeAttachment { + + testAttachID := getAttachmentName("test-vol", "attachable", "node") + successfulAttachment := makeTestAttachment(testAttachID, "node", "test-pv") + successfulAttachment.Status.Attached = true + return successfulAttachment + }, + spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "attachable", "test-vol"), false), + expectedAttachID: getAttachmentName("test-vol", "attachable", "node"), + expectError: false, + }, + { + name: "failed attach with volSrc", + makeAttachment: func() *storage.VolumeAttachment { + + testAttachID := getAttachmentName("test-vol", "attachable", "node") + successfulAttachment := makeTestAttachment(testAttachID, "node", "volSrc01") + successfulAttachment.Status.Attached = true + return successfulAttachment + }, + spec: volume.NewSpecFromVolume(makeTestVol("volSrc01", "attachable")), + expectError: true, + }, + { + name: "failed attach", + driver: "non-attachable", + spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "non-attachable", "test-vol"), false), + expectError: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + plug, _, tmpDir, _ := newTestWatchPlugin(t, nil) + defer os.RemoveAll(tmpDir) + + attacher, err := plug.NewAttacher() + if err != nil { + t.Fatalf("failed to create new attacher: %v", err) + } + csiAttacher := attacher.(*csiAttacher) + + if test.makeAttachment != nil { + attachment := test.makeAttachment() + _, err = csiAttacher.k8s.StorageV1().VolumeAttachments().Create(attachment) + if err != nil { + t.Fatalf("failed to create VolumeAttachment: %v", err) + } + gotAttachment, err := csiAttacher.k8s.StorageV1().VolumeAttachments().Get(attachment.Name, meta.GetOptions{}) + if err != nil { + t.Fatalf("failed to get created VolumeAttachment: %v", err) + } + t.Logf("created test VolumeAttachment %+v", gotAttachment) + } + + attachID, err := csiAttacher.WaitForAttach(test.spec, "", nil, time.Second) + if test.expectError != (err != nil) { + t.Errorf("Unexpected error: %s", err) + return + } + if attachID != test.expectedAttachID { + t.Errorf("Expected attachID %q, got %q", test.expectedAttachID, attachID) + } + }) + } +} + func TestAttacherWaitForVolumeAttachment(t *testing.T) { nodeName := "test-node" testCases := []struct { @@ -518,60 +710,165 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { } func TestAttacherVolumesAreAttached(t *testing.T) { - plug, tmpDir := newTestPlugin(t, nil) - defer os.RemoveAll(tmpDir) - - attacher, err := plug.NewAttacher() - if err != nil { - t.Fatalf("failed to create new attacher: %v", err) + type attachedSpec struct { + volName string + spec *volume.Spec + attached bool } - csiAttacher := attacher.(*csiAttacher) - nodeName := "test-node" - testCases := []struct { name string - attachedStats map[string]bool + attachedSpecs []attachedSpec }{ - {"attach + detach", map[string]bool{"vol-01": true, "vol-02": true, "vol-03": false, "vol-04": false, "vol-05": true}}, - {"all detached", map[string]bool{"vol-11": false, "vol-12": false, "vol-13": false, "vol-14": false, "vol-15": false}}, - {"all attached", map[string]bool{"vol-21": true, "vol-22": true, "vol-23": true, "vol-24": true, "vol-25": true}}, + { + "attach and detach", + []attachedSpec{ + {"vol0", volume.NewSpecFromPersistentVolume(makeTestPV("pv0", 10, testDriver, "vol0"), false), true}, + {"vol1", volume.NewSpecFromPersistentVolume(makeTestPV("pv1", 20, testDriver, "vol1"), false), true}, + {"vol2", volume.NewSpecFromPersistentVolume(makeTestPV("pv2", 10, testDriver, "vol2"), false), false}, + {"vol3", volume.NewSpecFromPersistentVolume(makeTestPV("pv3", 10, testDriver, "vol3"), false), false}, + {"vol4", volume.NewSpecFromPersistentVolume(makeTestPV("pv4", 20, testDriver, "vol4"), false), true}, + }, + }, + { + "all detached", + []attachedSpec{ + {"vol0", volume.NewSpecFromPersistentVolume(makeTestPV("pv0", 10, testDriver, "vol0"), false), false}, + {"vol1", volume.NewSpecFromPersistentVolume(makeTestPV("pv1", 20, testDriver, "vol1"), false), false}, + {"vol2", volume.NewSpecFromPersistentVolume(makeTestPV("pv2", 10, testDriver, "vol2"), false), false}, + }, + }, + { + "all attached", + []attachedSpec{ + {"vol0", volume.NewSpecFromPersistentVolume(makeTestPV("pv0", 10, testDriver, "vol0"), false), true}, + {"vol1", volume.NewSpecFromPersistentVolume(makeTestPV("pv1", 20, testDriver, "vol1"), false), true}, + }, + }, + { + "include non-attable", + []attachedSpec{ + {"vol0", volume.NewSpecFromPersistentVolume(makeTestPV("pv0", 10, testDriver, "vol0"), false), true}, + {"vol1", volume.NewSpecFromVolume(makeTestVol("pv1", testDriver)), false}, + }, + }, } for _, tc := range testCases { - var specs []*volume.Spec - // create and save volume attchments - for volName, stat := range tc.attachedStats { - pv := makeTestPV("test-pv", 10, testDriver, volName) - spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) - specs = append(specs, spec) - attachID := getAttachmentName(volName, testDriver, nodeName) - attachment := makeTestAttachment(attachID, nodeName, pv.GetName()) - attachment.Status.Attached = stat - _, err := csiAttacher.k8s.StorageV1().VolumeAttachments().Create(attachment) - if err != nil { - t.Fatalf("failed to attach: %v", err) - } - } + t.Run(tc.name, func(t *testing.T) { + plug, tmpDir := newTestPlugin(t, nil) + defer os.RemoveAll(tmpDir) - // retrieve attached status - stats, err := csiAttacher.VolumesAreAttached(specs, types.NodeName(nodeName)) - if err != nil { - t.Fatal(err) - } - if len(tc.attachedStats) != len(stats) { - t.Errorf("expecting %d attachment status, got %d", len(tc.attachedStats), len(stats)) - } - - // compare attachment status for each spec - for spec, stat := range stats { - source, err := getCSISourceFromSpec(spec) + attacher, err := plug.NewAttacher() if err != nil { - t.Error(err) + t.Fatalf("failed to create new attacher: %v", err) } - if stat != tc.attachedStats[source.VolumeHandle] { - t.Errorf("expecting volume attachment %t, got %t", tc.attachedStats[source.VolumeHandle], stat) + csiAttacher := attacher.(*csiAttacher) + nodeName := "test-node" + + var specs []*volume.Spec + // create and save volume attchments + for _, attachedSpec := range tc.attachedSpecs { + specs = append(specs, attachedSpec.spec) + attachID := getAttachmentName(attachedSpec.volName, testDriver, nodeName) + attachment := makeTestAttachment(attachID, nodeName, attachedSpec.spec.Name()) + attachment.Status.Attached = attachedSpec.attached + _, err := csiAttacher.k8s.StorageV1().VolumeAttachments().Create(attachment) + if err != nil { + t.Fatalf("failed to attach: %v", err) + } } - } + + // retrieve attached status + stats, err := csiAttacher.VolumesAreAttached(specs, types.NodeName(nodeName)) + if err != nil { + t.Fatal(err) + } + if len(tc.attachedSpecs) != len(stats) { + t.Errorf("expecting %d attachment status, got %d", len(tc.attachedSpecs), len(stats)) + } + + // compare attachment status for each spec + for _, attached := range tc.attachedSpecs { + stat, ok := stats[attached.spec] + if attached.attached && !ok { + t.Error("failed to retrieve attached status for:", attached.spec) + } + if attached.attached != stat { + t.Errorf("expecting volume attachment %t, got %t", attached.attached, stat) + } + } + }) + } +} + +func TestAttacherVolumesAreAttachedWithInline(t *testing.T) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)() + type attachedSpec struct { + volName string + spec *volume.Spec + attached bool + } + testCases := []struct { + name string + attachedSpecs []attachedSpec + }{ + { + "attach and detach with volume sources", + []attachedSpec{ + {"vol0", volume.NewSpecFromPersistentVolume(makeTestPV("pv0", 10, testDriver, "vol0"), false), true}, + {"vol1", volume.NewSpecFromVolume(makeTestVol("pv1", testDriver)), false}, + {"vol2", volume.NewSpecFromPersistentVolume(makeTestPV("pv2", 10, testDriver, "vol2"), false), true}, + {"vol3", volume.NewSpecFromVolume(makeTestVol("pv3", testDriver)), false}, + {"vol4", volume.NewSpecFromPersistentVolume(makeTestPV("pv4", 20, testDriver, "vol4"), false), true}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + plug, tmpDir := newTestPlugin(t, nil) + defer os.RemoveAll(tmpDir) + + attacher, err := plug.NewAttacher() + if err != nil { + t.Fatalf("failed to create new attacher: %v", err) + } + csiAttacher := attacher.(*csiAttacher) + nodeName := "test-node" + + var specs []*volume.Spec + // create and save volume attchments + for _, attachedSpec := range tc.attachedSpecs { + specs = append(specs, attachedSpec.spec) + attachID := getAttachmentName(attachedSpec.volName, testDriver, nodeName) + attachment := makeTestAttachment(attachID, nodeName, attachedSpec.spec.Name()) + attachment.Status.Attached = attachedSpec.attached + _, err := csiAttacher.k8s.StorageV1().VolumeAttachments().Create(attachment) + if err != nil { + t.Fatalf("failed to attach: %v", err) + } + } + + // retrieve attached status + stats, err := csiAttacher.VolumesAreAttached(specs, types.NodeName(nodeName)) + if err != nil { + t.Fatal(err) + } + if len(tc.attachedSpecs) != len(stats) { + t.Errorf("expecting %d attachment status, got %d", len(tc.attachedSpecs), len(stats)) + } + + // compare attachment status for each spec + for _, attached := range tc.attachedSpecs { + stat, ok := stats[attached.spec] + if attached.attached && !ok { + t.Error("failed to retrieve attached status for:", attached.spec) + } + if attached.attached != stat { + t.Errorf("expecting volume attachment %t, got %t", attached.attached, stat) + } + } + }) } } @@ -708,6 +1005,7 @@ func TestAttacherGetDeviceMountPath(t *testing.T) { } func TestAttacherMountDevice(t *testing.T) { + pvName := "test-pv" testCases := []struct { testName string volName string @@ -715,13 +1013,15 @@ func TestAttacherMountDevice(t *testing.T) { deviceMountPath string stageUnstageSet bool shouldFail bool + spec *volume.Spec }{ { - testName: "normal", + testName: "normal PV", volName: "test-vol1", devicePath: "path1", deviceMountPath: "path2", stageUnstageSet: true, + spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false), }, { testName: "no vol name", @@ -730,6 +1030,7 @@ func TestAttacherMountDevice(t *testing.T) { deviceMountPath: "path2", stageUnstageSet: true, shouldFail: true, + spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, ""), false), }, { testName: "no device path", @@ -738,6 +1039,7 @@ func TestAttacherMountDevice(t *testing.T) { deviceMountPath: "path2", stageUnstageSet: true, shouldFail: false, + spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false), }, { testName: "no device mount path", @@ -746,6 +1048,7 @@ func TestAttacherMountDevice(t *testing.T) { deviceMountPath: "", stageUnstageSet: true, shouldFail: true, + spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false), }, { testName: "stage_unstage cap not set", @@ -753,13 +1056,20 @@ func TestAttacherMountDevice(t *testing.T) { devicePath: "path1", deviceMountPath: "path2", stageUnstageSet: false, + spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false), + }, + { + testName: "failure with volume source", + volName: "test-vol1", + devicePath: "path1", + deviceMountPath: "path2", + shouldFail: true, + spec: volume.NewSpecFromVolume(makeTestVol(pvName, testDriver)), }, } for _, tc := range testCases { t.Logf("Running test case: %s", tc.testName) - var spec *volume.Spec - pvName := "test-pv" // Setup // Create a new attacher @@ -777,11 +1087,6 @@ func TestAttacherMountDevice(t *testing.T) { } nodeName := string(csiAttacher.plugin.host.GetNodeName()) - - // Create spec - pv := makeTestPV(pvName, 10, testDriver, tc.volName) - spec = volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) - attachID := getAttachmentName(tc.volName, testDriver, nodeName) // Set up volume attachment @@ -795,7 +1100,147 @@ func TestAttacherMountDevice(t *testing.T) { }() // Run - err = csiAttacher.MountDevice(spec, tc.devicePath, tc.deviceMountPath) + err = csiAttacher.MountDevice(tc.spec, tc.devicePath, tc.deviceMountPath) + + // Verify + if err != nil { + if !tc.shouldFail { + t.Errorf("test should not fail, but error occurred: %v", err) + } + continue + } + if err == nil && tc.shouldFail { + t.Errorf("test should fail, but no error occurred") + } + + // Verify call goes through all the way + numStaged := 1 + if !tc.stageUnstageSet { + numStaged = 0 + } + + cdc := csiAttacher.csiClient.(*fakeCsiDriverClient) + staged := cdc.nodeClient.GetNodeStagedVolumes() + if len(staged) != numStaged { + t.Errorf("got wrong number of staged volumes, expecting %v got: %v", numStaged, len(staged)) + } + if tc.stageUnstageSet { + vol, ok := staged[tc.volName] + if !ok { + t.Errorf("could not find staged volume: %s", tc.volName) + } + if vol.Path != tc.deviceMountPath { + t.Errorf("expected mount path: %s. got: %s", tc.deviceMountPath, vol.Path) + } + } + } +} + +func TestAttacherMountDeviceWithInline(t *testing.T) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)() + pvName := "test-pv" + testCases := []struct { + testName string + volName string + devicePath string + deviceMountPath string + stageUnstageSet bool + shouldFail bool + spec *volume.Spec + }{ + { + testName: "normal PV", + volName: "test-vol1", + devicePath: "path1", + deviceMountPath: "path2", + stageUnstageSet: true, + spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false), + }, + { + testName: "failure with volSrc", + volName: "test-vol1", + devicePath: "path1", + deviceMountPath: "path2", + shouldFail: true, + spec: volume.NewSpecFromVolume(makeTestVol(pvName, testDriver)), + }, + { + testName: "no vol name", + volName: "", + devicePath: "path1", + deviceMountPath: "path2", + stageUnstageSet: true, + shouldFail: true, + spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, ""), false), + }, + { + testName: "no device path", + volName: "test-vol1", + devicePath: "", + deviceMountPath: "path2", + stageUnstageSet: true, + shouldFail: false, + spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false), + }, + { + testName: "no device mount path", + volName: "test-vol1", + devicePath: "path1", + deviceMountPath: "", + stageUnstageSet: true, + shouldFail: true, + spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false), + }, + { + testName: "stage_unstage cap not set", + volName: "test-vol1", + devicePath: "path1", + deviceMountPath: "path2", + stageUnstageSet: false, + spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false), + }, + { + testName: "missing spec", + volName: "test-vol1", + devicePath: "path1", + deviceMountPath: "path2", + shouldFail: true, + }, + } + + for _, tc := range testCases { + t.Logf("Running test case: %s", tc.testName) + + // Setup + // Create a new attacher + plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil) + defer os.RemoveAll(tmpDir) + attacher, err0 := plug.NewAttacher() + if err0 != nil { + t.Fatalf("failed to create new attacher: %v", err0) + } + csiAttacher := attacher.(*csiAttacher) + csiAttacher.csiClient = setupClient(t, tc.stageUnstageSet) + + if tc.deviceMountPath != "" { + tc.deviceMountPath = filepath.Join(tmpDir, tc.deviceMountPath) + } + + nodeName := string(csiAttacher.plugin.host.GetNodeName()) + attachID := getAttachmentName(tc.volName, testDriver, nodeName) + + // Set up volume attachment + attachment := makeTestAttachment(attachID, nodeName, pvName) + _, err := csiAttacher.k8s.StorageV1().VolumeAttachments().Create(attachment) + if err != nil { + t.Fatalf("failed to attach: %v", err) + } + go func() { + fakeWatcher.Delete(attachment) + }() + + // Run + err = csiAttacher.MountDevice(tc.spec, tc.devicePath, tc.deviceMountPath) // Verify if err != nil { diff --git a/pkg/volume/csi/csi_client_test.go b/pkg/volume/csi/csi_client_test.go index 8e5c05ba8f..af8fe63f5d 100644 --- a/pkg/volume/csi/csi_client_test.go +++ b/pkg/volume/csi/csi_client_test.go @@ -76,12 +76,13 @@ func (c *fakeCsiDriverClient) NodePublishVolume( ) error { c.t.Log("calling fake.NodePublishVolume...") req := &csipbv1.NodePublishVolumeRequest{ - VolumeId: volID, - TargetPath: targetPath, - Readonly: readOnly, - PublishContext: publishContext, - VolumeContext: volumeContext, - Secrets: secrets, + VolumeId: volID, + TargetPath: targetPath, + StagingTargetPath: stagingTargetPath, + Readonly: readOnly, + PublishContext: publishContext, + VolumeContext: volumeContext, + Secrets: secrets, VolumeCapability: &csipbv1.VolumeCapability{ AccessMode: &csipbv1.VolumeCapability_AccessMode{ Mode: asCSIAccessModeV1(accessMode), diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go index 1630aa5076..f61ccc33d2 100644 --- a/pkg/volume/csi/csi_mounter.go +++ b/pkg/volume/csi/csi_mounter.go @@ -18,6 +18,7 @@ package csi import ( "context" + "crypto/sha256" "errors" "fmt" "os" @@ -42,13 +43,15 @@ var ( volHandle, driverName, nodeName, - attachmentID string + attachmentID, + driverMode string }{ "specVolID", "volumeHandle", "driverName", "nodeName", "attachmentID", + "driverMode", } ) @@ -57,6 +60,7 @@ type csiMountMgr struct { k8s kubernetes.Interface plugin *csiPlugin driverName csiDriverName + driverMode driverMode volumeID string specVolumeID string readOnly bool @@ -107,49 +111,97 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error { return nil } - csiSource, err := getCSISourceFromSpec(c.spec) + csi := c.csiClient + ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + defer cancel() + + volSrc, pvSrc, err := getSourceFromSpec(c.spec) if err != nil { klog.Error(log("mounter.SetupAt failed to get CSI persistent source: %v", err)) return err } - csi := c.csiClient - ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) - defer cancel() + driverName := c.driverName + volumeHandle := c.volumeID + readOnly := c.readOnly + accessMode := api.ReadWriteOnce - // Check for STAGE_UNSTAGE_VOLUME set and populate deviceMountPath if so - deviceMountPath := "" - stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx) - if err != nil { - klog.Error(log("mounter.SetUpAt failed to check for STAGE_UNSTAGE_VOLUME capabilty: %v", err)) - return err - } + var ( + fsType string + volAttribs map[string]string + nodePublishSecrets map[string]string + publishContext map[string]string + mountOptions []string + deviceMountPath string + secretRef *api.SecretReference + ) - if stageUnstageSet { - deviceMountPath, err = makeDeviceMountPath(c.plugin, c.spec) + switch { + case volSrc != nil: + if !utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) { + return fmt.Errorf("CSIInlineVolume feature required") + } + if c.driverMode != ephemeralDriverMode { + return fmt.Errorf("unexpected driver mode: %s", c.driverMode) + } + if volSrc.FSType != nil { + fsType = *volSrc.FSType + } + + volAttribs = volSrc.VolumeAttributes + + if volSrc.NodePublishSecretRef != nil { + secretName := volSrc.NodePublishSecretRef.Name + ns := c.pod.Namespace + secretRef = &api.SecretReference{Name: secretName, Namespace: ns} + } + case pvSrc != nil: + if c.driverMode != persistentDriverMode { + return fmt.Errorf("unexpected driver mode: %s", c.driverMode) + } + + fsType = pvSrc.FSType + + volAttribs = pvSrc.VolumeAttributes + + if pvSrc.NodePublishSecretRef != nil { + secretRef = pvSrc.NodePublishSecretRef + } + + //TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI + if c.spec.PersistentVolume.Spec.AccessModes != nil { + accessMode = c.spec.PersistentVolume.Spec.AccessModes[0] + } + + mountOptions = c.spec.PersistentVolume.Spec.MountOptions + + // Check for STAGE_UNSTAGE_VOLUME set and populate deviceMountPath if so + stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx) if err != nil { - klog.Error(log("mounter.SetUpAt failed to make device mount path: %v", err)) + klog.Error(log("mounter.SetUpAt failed to check for STAGE_UNSTAGE_VOLUME capabilty: %v", err)) return err } - } - // search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName - if c.publishContext == nil { - nodeName := string(c.plugin.host.GetNodeName()) - c.publishContext, err = c.plugin.getPublishContext(c.k8s, c.volumeID, string(c.driverName), nodeName) - if err != nil { - return err - } - } - attribs := csiSource.VolumeAttributes - - nodePublishSecrets := map[string]string{} - if csiSource.NodePublishSecretRef != nil { - nodePublishSecrets, err = getCredentialsFromSecret(c.k8s, csiSource.NodePublishSecretRef) - if err != nil { - return fmt.Errorf("fetching NodePublishSecretRef %s/%s failed: %v", - csiSource.NodePublishSecretRef.Namespace, csiSource.NodePublishSecretRef.Name, err) + if stageUnstageSet { + deviceMountPath, err = makeDeviceMountPath(c.plugin, c.spec) + if err != nil { + klog.Error(log("mounter.SetUpAt failed to make device mount path: %v", err)) + return err + } } + + // search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName + if c.publishContext == nil { + nodeName := string(c.plugin.host.GetNodeName()) + c.publishContext, err = c.plugin.getPublishContext(c.k8s, volumeHandle, string(driverName), nodeName) + if err != nil { + return err + } + publishContext = c.publishContext + } + + default: + return fmt.Errorf("volume source not found in volume.Spec") } // create target_dir before call to NodePublish @@ -159,10 +211,14 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error { } klog.V(4).Info(log("created target path successfully [%s]", dir)) - //TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI - accessMode := api.ReadWriteOnce - if c.spec.PersistentVolume.Spec.AccessModes != nil { - accessMode = c.spec.PersistentVolume.Spec.AccessModes[0] + nodePublishSecrets = map[string]string{} + if secretRef != nil { + nodePublishSecrets, err = getCredentialsFromSecret(c.k8s, secretRef) + if err != nil { + return fmt.Errorf("fetching NodePublishSecretRef %s/%s failed: %v", + secretRef.Namespace, secretRef.Name, err) + } + } // Inject pod information into volume_attributes @@ -172,28 +228,27 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error { return err } if podAttrs != nil { - if attribs == nil { - attribs = podAttrs + if volAttribs == nil { + volAttribs = podAttrs } else { for k, v := range podAttrs { - attribs[k] = v + volAttribs[k] = v } } } - fsType := csiSource.FSType err = csi.NodePublishVolume( ctx, - c.volumeID, - c.readOnly, + volumeHandle, + readOnly, deviceMountPath, dir, accessMode, - c.publishContext, - attribs, + publishContext, + volAttribs, nodePublishSecrets, fsType, - c.spec.PersistentVolume.Spec.MountOptions, + mountOptions, ) if err != nil { @@ -387,3 +442,9 @@ func removeMountDir(plug *csiPlugin, mountPath string) error { } return nil } + +// makeVolumeHandle returns csi- +func makeVolumeHandle(podUID, volSourceSpecName string) string { + result := sha256.Sum256([]byte(fmt.Sprintf("%s%s", podUID, volSourceSpecName))) + return fmt.Sprintf("csi-%x", result) +} diff --git a/pkg/volume/csi/csi_mounter_test.go b/pkg/volume/csi/csi_mounter_test.go index 2b02373482..e7f3edc741 100644 --- a/pkg/volume/csi/csi_mounter_test.go +++ b/pkg/volume/csi/csi_mounter_test.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math/rand" "os" "path" "testing" @@ -266,6 +267,266 @@ func TestMounterSetUp(t *testing.T) { MounterSetUpTests(t, false) }) } + +func TestMounterSetUpSimple(t *testing.T) { + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) + defer os.RemoveAll(tmpDir) + + testCases := []struct { + name string + podUID types.UID + mode driverMode + fsType string + options []string + spec func(string, []string) *volume.Spec + shouldFail bool + }{ + { + name: "setup with vol source", + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + mode: ephemeralDriverMode, + fsType: "ext4", + shouldFail: true, + spec: func(fsType string, options []string) *volume.Spec { + volSrc := makeTestVol("pv1", testDriver) + volSrc.CSI.FSType = &fsType + return volume.NewSpecFromVolume(volSrc) + }, + }, + { + name: "setup with persistent source", + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + mode: persistentDriverMode, + fsType: "zfs", + spec: func(fsType string, options []string) *volume.Spec { + pvSrc := makeTestPV("pv1", 20, testDriver, "vol1") + pvSrc.Spec.CSI.FSType = fsType + pvSrc.Spec.MountOptions = options + return volume.NewSpecFromPersistentVolume(pvSrc, false) + }, + }, + { + name: "setup with persistent source without unspecified fstype and options", + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + mode: persistentDriverMode, + spec: func(fsType string, options []string) *volume.Spec { + return volume.NewSpecFromPersistentVolume(makeTestPV("pv1", 20, testDriver, "vol2"), false) + }, + }, + { + name: "setup with missing spec", + shouldFail: true, + spec: func(fsType string, options []string) *volume.Spec { return nil }, + }, + } + + for _, tc := range testCases { + registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t) + t.Run(tc.name, func(t *testing.T) { + mounter, err := plug.NewMounter( + tc.spec(tc.fsType, tc.options), + &api.Pod{ObjectMeta: meta.ObjectMeta{UID: tc.podUID, Namespace: testns}}, + volume.VolumeOptions{}, + ) + if tc.shouldFail && err != nil { + t.Log(err) + return + } + if !tc.shouldFail && err != nil { + t.Fatal("unexpected error:", err) + } + if mounter == nil { + t.Fatal("failed to create CSI mounter") + } + + csiMounter := mounter.(*csiMountMgr) + csiMounter.csiClient = setupClient(t, true) + + if csiMounter.driverMode != persistentDriverMode { + t.Fatal("unexpected driver mode: ", csiMounter.driverMode) + } + + attachID := getAttachmentName(csiMounter.volumeID, string(csiMounter.driverName), string(plug.host.GetNodeName())) + attachment := makeTestAttachment(attachID, "test-node", csiMounter.spec.Name()) + _, err = csiMounter.k8s.StorageV1().VolumeAttachments().Create(attachment) + if err != nil { + t.Fatalf("failed to setup VolumeAttachment: %v", err) + } + + // Mounter.SetUp() + if err := csiMounter.SetUp(nil); err != nil { + t.Fatalf("mounter.Setup failed: %v", err) + } + + // ensure call went all the way + pubs := csiMounter.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() + vol, ok := pubs[csiMounter.volumeID] + if !ok { + t.Error("csi server may not have received NodePublishVolume call") + } + if vol.VolumeHandle != csiMounter.volumeID { + t.Error("volumeHandle not sent to CSI driver properly") + } + + devicePath, err := makeDeviceMountPath(plug, csiMounter.spec) + if err != nil { + t.Fatal(err) + } + if vol.DeviceMountPath != devicePath { + t.Errorf("DeviceMountPath not sent properly to CSI driver: %s, %s", vol.DeviceMountPath, devicePath) + } + + if !reflect.DeepEqual(vol.MountFlags, csiMounter.spec.PersistentVolume.Spec.MountOptions) { + t.Errorf("unexpected mount flags passed to driver: %+v", vol.MountFlags) + } + + if vol.FSType != tc.fsType { + t.Error("unexpected FSType sent to driver:", vol.FSType) + } + + if vol.Path != csiMounter.GetPath() { + t.Error("csi server may not have received NodePublishVolume call") + } + }) + } +} +func TestMounterSetUpWithInline(t *testing.T) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)() + + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) + defer os.RemoveAll(tmpDir) + + testCases := []struct { + name string + podUID types.UID + mode driverMode + fsType string + options []string + spec func(string, []string) *volume.Spec + shouldFail bool + }{ + { + name: "setup with vol source", + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + mode: ephemeralDriverMode, + fsType: "ext4", + spec: func(fsType string, options []string) *volume.Spec { + volSrc := makeTestVol("pv1", testDriver) + volSrc.CSI.FSType = &fsType + return volume.NewSpecFromVolume(volSrc) + }, + }, + { + name: "setup with persistent source", + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + mode: persistentDriverMode, + fsType: "zfs", + spec: func(fsType string, options []string) *volume.Spec { + pvSrc := makeTestPV("pv1", 20, testDriver, "vol1") + pvSrc.Spec.CSI.FSType = fsType + pvSrc.Spec.MountOptions = options + return volume.NewSpecFromPersistentVolume(pvSrc, false) + }, + }, + { + name: "setup with persistent source without unspecified fstype and options", + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + mode: persistentDriverMode, + spec: func(fsType string, options []string) *volume.Spec { + return volume.NewSpecFromPersistentVolume(makeTestPV("pv1", 20, testDriver, "vol2"), false) + }, + }, + { + name: "setup with missing spec", + shouldFail: true, + spec: func(fsType string, options []string) *volume.Spec { return nil }, + }, + } + + for _, tc := range testCases { + registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t) + t.Run(tc.name, func(t *testing.T) { + mounter, err := plug.NewMounter( + tc.spec(tc.fsType, tc.options), + &api.Pod{ObjectMeta: meta.ObjectMeta{UID: tc.podUID, Namespace: testns}}, + volume.VolumeOptions{}, + ) + if tc.shouldFail && err != nil { + t.Log(err) + return + } + if !tc.shouldFail && err != nil { + t.Fatal("unexpected error:", err) + } + if mounter == nil { + t.Fatal("failed to create CSI mounter") + } + + csiMounter := mounter.(*csiMountMgr) + csiMounter.csiClient = setupClient(t, true) + + if csiMounter.driverMode != tc.mode { + t.Fatal("unexpected driver mode: ", csiMounter.driverMode) + } + + if csiMounter.driverMode == ephemeralDriverMode && csiMounter.volumeID != makeVolumeHandle(string(tc.podUID), csiMounter.specVolumeID) { + t.Fatal("unexpected generated volumeHandle:", csiMounter.volumeID) + } + + if csiMounter.driverMode == persistentDriverMode { + attachID := getAttachmentName(csiMounter.volumeID, string(csiMounter.driverName), string(plug.host.GetNodeName())) + attachment := makeTestAttachment(attachID, "test-node", csiMounter.spec.Name()) + _, err = csiMounter.k8s.StorageV1().VolumeAttachments().Create(attachment) + if err != nil { + t.Fatalf("failed to setup VolumeAttachment: %v", err) + } + } + + // Mounter.SetUp() + if err := csiMounter.SetUp(nil); err != nil { + t.Fatalf("mounter.Setup failed: %v", err) + } + + // ensure call went all the way + pubs := csiMounter.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() + vol, ok := pubs[csiMounter.volumeID] + if !ok { + t.Error("csi server may not have received NodePublishVolume call") + } + if vol.VolumeHandle != csiMounter.volumeID { + t.Error("volumeHandle not sent to CSI driver properly") + } + + // validate stagingTargetPath + if tc.mode == ephemeralDriverMode && vol.DeviceMountPath != "" { + t.Errorf("unexpected devicePathTarget sent to driver: %s", vol.DeviceMountPath) + } + if tc.mode == persistentDriverMode { + devicePath, err := makeDeviceMountPath(plug, csiMounter.spec) + if err != nil { + t.Fatal(err) + } + if vol.DeviceMountPath != devicePath { + t.Errorf("DeviceMountPath not sent properly to CSI driver: %s, %s", vol.DeviceMountPath, devicePath) + } + + if !reflect.DeepEqual(vol.MountFlags, csiMounter.spec.PersistentVolume.Spec.MountOptions) { + t.Errorf("unexpected mount flags passed to driver: %+v", vol.MountFlags) + } + } + + if vol.FSType != tc.fsType { + t.Error("unexpected FSType sent to driver:", vol.FSType) + } + + if vol.Path != csiMounter.GetPath() { + t.Error("csi server may not have received NodePublishVolume call") + } + }) + } +} func TestMounterSetUpWithFSGroup(t *testing.T) { fakeClient := fakeclient.NewSimpleClientset() plug, tmpDir := newTestPlugin(t, fakeClient) diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 31cbdf7296..f1ac67aced 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -74,6 +74,12 @@ type csiPlugin struct { csiDriverInformer csiinformer.CSIDriverInformer } +//TODO (vladimirvivien) add this type to storage api +type driverMode string + +const persistentDriverMode driverMode = "persistent" +const ephemeralDriverMode driverMode = "ephemeral" + // ProbeVolumePlugins returns implemented plugins func ProbeVolumePlugins() []volume.VolumePlugin { p := &csiPlugin{ @@ -303,7 +309,7 @@ func (p *csiPlugin) GetPluginName() string { // GetvolumeName returns a concatenated string of CSIVolumeSource.DriverCSIVolumeSource.VolumeHandle // That string value is used in Detach() to extract driver name and volumeName. func (p *csiPlugin) GetVolumeName(spec *volume.Spec) (string, error) { - csi, err := getCSISourceFromSpec(spec) + csi, err := getPVSourceFromSpec(spec) if err != nil { klog.Error(log("plugin.GetVolumeName failed to extract volume source from spec: %v", err)) return "", err @@ -316,6 +322,14 @@ func (p *csiPlugin) GetVolumeName(spec *volume.Spec) (string, error) { func (p *csiPlugin) CanSupport(spec *volume.Spec) bool { // TODO (vladimirvivien) CanSupport should also take into account // the availability/registration of specified Driver in the volume source + if spec == nil { + return false + } + if utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) { + return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil) || + (spec.Volume != nil && spec.Volume.CSI != nil) + } + return spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil } @@ -331,11 +345,34 @@ func (p *csiPlugin) NewMounter( spec *volume.Spec, pod *api.Pod, _ volume.VolumeOptions) (volume.Mounter, error) { - pvSource, err := getCSISourceFromSpec(spec) + + volSrc, pvSrc, err := getSourceFromSpec(spec) if err != nil { return nil, err } - readOnly, err := getReadOnlyFromSpec(spec) + + var ( + driverName string + volumeHandle string + readOnly bool + ) + + switch { + case volSrc != nil && utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume): + volumeHandle = makeVolumeHandle(string(pod.UID), spec.Name()) + driverName = volSrc.Driver + if volSrc.ReadOnly != nil { + readOnly = *volSrc.ReadOnly + } + case pvSrc != nil: + driverName = pvSrc.Driver + volumeHandle = pvSrc.VolumeHandle + readOnly = spec.ReadOnly + default: + return nil, fmt.Errorf("volume source not found in volume.Spec") + } + + driverMode, err := p.getDriverMode(spec) if err != nil { return nil, err } @@ -346,7 +383,7 @@ func (p *csiPlugin) NewMounter( return nil, errors.New("failed to get a Kubernetes client") } - csi, err := newCsiDriverClient(csiDriverName(pvSource.Driver)) + csi, err := newCsiDriverClient(csiDriverName(driverName)) if err != nil { return nil, err } @@ -357,8 +394,9 @@ func (p *csiPlugin) NewMounter( spec: spec, pod: pod, podUID: pod.UID, - driverName: csiDriverName(pvSource.Driver), - volumeID: pvSource.VolumeHandle, + driverName: csiDriverName(driverName), + driverMode: driverMode, + volumeID: volumeHandle, specVolumeID: spec.Name(), csiClient: csi, readOnly: readOnly, @@ -376,15 +414,17 @@ func (p *csiPlugin) NewMounter( // persist volume info data for teardown node := string(p.host.GetNodeName()) - attachID := getAttachmentName(pvSource.VolumeHandle, pvSource.Driver, node) volData := map[string]string{ - volDataKey.specVolID: spec.Name(), - volDataKey.volHandle: pvSource.VolumeHandle, - volDataKey.driverName: pvSource.Driver, - volDataKey.nodeName: node, - volDataKey.attachmentID: attachID, + volDataKey.specVolID: spec.Name(), + volDataKey.volHandle: volumeHandle, + volDataKey.driverName: driverName, + volDataKey.nodeName: node, + volDataKey.driverMode: string(driverMode), } + attachID := getAttachmentName(volumeHandle, driverName, node) + volData[volDataKey.attachmentID] = attachID + if err := saveVolumeData(dataDir, volDataFileName, volData); err != nil { klog.Error(log("failed to save volume info data: %v", err)) if err := os.RemoveAll(dataDir); err != nil { @@ -437,23 +477,58 @@ func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.S klog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [%#v]", volData)) + var spec *volume.Spec + inlineEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) + + if inlineEnabled { + mode := driverMode(volData[volDataKey.driverMode]) + switch { + case mode == ephemeralDriverMode: + spec = p.constructVolSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName]) + + case mode == persistentDriverMode: + fallthrough + default: + spec = p.constructPVSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName], volData[volDataKey.volHandle]) + } + } else { + spec = p.constructPVSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName], volData[volDataKey.volHandle]) + } + + return spec, nil +} + +// constructVolSourceSpec constructs volume.Spec with CSIVolumeSource +func (p *csiPlugin) constructVolSourceSpec(volSpecName, driverName string) *volume.Spec { + vol := &api.Volume{ + Name: volSpecName, + VolumeSource: api.VolumeSource{ + CSI: &api.CSIVolumeSource{ + Driver: driverName, + }, + }, + } + return volume.NewSpecFromVolume(vol) +} + +//constructPVSourceSpec constructs volume.Spec with CSIPersistentVolumeSource +func (p *csiPlugin) constructPVSourceSpec(volSpecName, driverName, volumeHandle string) *volume.Spec { fsMode := api.PersistentVolumeFilesystem pv := &api.PersistentVolume{ ObjectMeta: meta.ObjectMeta{ - Name: volData[volDataKey.specVolID], + Name: volSpecName, }, Spec: api.PersistentVolumeSpec{ PersistentVolumeSource: api.PersistentVolumeSource{ CSI: &api.CSIPersistentVolumeSource{ - Driver: volData[volDataKey.driverName], - VolumeHandle: volData[volDataKey.volHandle], + Driver: driverName, + VolumeHandle: volumeHandle, }, }, VolumeMode: &fsMode, }, } - - return volume.NewSpecFromPersistentVolume(pv, false), nil + return volume.NewSpecFromPersistentVolume(pv, false) } func (p *csiPlugin) SupportsMountOption() bool { @@ -506,24 +581,31 @@ func (p *csiPlugin) NewDetacher() (volume.Detacher, error) { }, nil } +// TODO change CanAttach to return error to propagate ability +// to support Attachment or an error - see https://github.com/kubernetes/kubernetes/issues/74810 func (p *csiPlugin) CanAttach(spec *volume.Spec) bool { - if spec.PersistentVolume == nil { - klog.Error(log("plugin.CanAttach test failed, spec missing PersistentVolume")) + driverMode, err := p.getDriverMode(spec) + if err != nil { return false } - var driverName string - if spec.PersistentVolume.Spec.CSI != nil { - driverName = spec.PersistentVolume.Spec.CSI.Driver - } else { - klog.Error(log("plugin.CanAttach test failed, spec missing CSIPersistentVolume")) + if driverMode == ephemeralDriverMode { + klog.V(4).Info(log("driver ephemeral mode detected for spec %v", spec.Name)) return false } + pvSrc, err := getCSISourceFromSpec(spec) + if err != nil { + klog.Error(log("plugin.CanAttach failed to get info from spec: %s", err)) + return false + } + + driverName := pvSrc.Driver + skipAttach, err := p.skipAttach(driverName) - if err != nil { klog.Error(log("plugin.CanAttach error when calling plugin.skipAttach for driver %s: %s", driverName, err)) + return false } return !skipAttach @@ -675,6 +757,8 @@ func (p *csiPlugin) ConstructBlockVolumeSpec(podUID types.UID, specVolName, mapP return volume.NewSpecFromPersistentVolume(pv, false), nil } +// skipAttach looks up CSIDriver object associated with driver name +// to determine if driver requies attachment volume operation func (p *csiPlugin) skipAttach(driver string) (bool, error) { if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { return false, nil @@ -696,6 +780,26 @@ func (p *csiPlugin) skipAttach(driver string) (bool, error) { return false, nil } +// getDriverMode returns the driver mode for the specified spec: {persistent|ephemeral}. +// 1) If mode cannot be determined, it will default to "persistent". +// 2) If Mode cannot be resolved to either {persistent | ephemeral}, an error is returned +// See https://github.com/kubernetes/enhancements/blob/master/keps/sig-storage/20190122-csi-inline-volumes.md +func (p *csiPlugin) getDriverMode(spec *volume.Spec) (driverMode, error) { + // TODO (vladimirvivien) ultimately, mode will be retrieved from CSIDriver.Spec.Mode. + // However, in alpha version, mode is determined by the volume source: + // 1) if volume.Spec.Volume.CSI != nil -> mode is ephemeral + // 2) if volume.Spec.PersistentVolume.Spec.CSI != nil -> persistent + volSrc, _, err := getSourceFromSpec(spec) + if err != nil { + return "", err + } + + if volSrc != nil && utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) { + return ephemeralDriverMode, nil + } + return persistentDriverMode, nil +} + func (p *csiPlugin) getPublishContext(client clientset.Interface, handle, driver, nodeName string) (map[string]string, error) { skip, err := p.skipAttach(driver) if err != nil { diff --git a/pkg/volume/csi/csi_plugin_test.go b/pkg/volume/csi/csi_plugin_test.go index 7e957a0997..a45e0dcb99 100644 --- a/pkg/volume/csi/csi_plugin_test.go +++ b/pkg/volume/csi/csi_plugin_test.go @@ -18,6 +18,7 @@ package csi import ( "fmt" + "math/rand" "os" "path" "path/filepath" @@ -99,6 +100,19 @@ func makeTestPV(name string, sizeGig int, driverName, volID string) *api.Persist } } +func makeTestVol(name string, driverName string) *api.Volume { + ro := false + return &api.Volume{ + Name: name, + VolumeSource: api.VolumeSource{ + CSI: &api.CSIVolumeSource{ + Driver: driverName, + ReadOnly: &ro, + }, + }, + } +} + func registerFakePlugin(pluginName, endpoint string, versions []string, t *testing.T) { highestSupportedVersions, err := highestSupportedVersion(versions) if err != nil { @@ -131,22 +145,105 @@ func TestPluginGetVolumeName(t *testing.T) { name string driverName string volName string + spec *volume.Spec shouldFail bool }{ - {"alphanum names", "testdr", "testvol", false}, - {"mixchar driver", "test.dr.cc", "testvol", false}, - {"mixchar volume", "testdr", "test-vol-name", false}, - {"mixchars all", "test-driver", "test.vol.name", false}, + { + name: "alphanum names", + driverName: "testdr", + volName: "testvol", + spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "testdr", "testvol"), false), + }, + { + name: "mixchar driver", + driverName: "test.dr.cc", + volName: "testvol", + spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "test.dr.cc", "testvol"), false), + }, + { + name: "mixchar volume", + driverName: "testdr", + volName: "test-vol-name", + spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "testdr", "test-vol-name"), false), + }, + { + name: "mixchars all", + driverName: "test-driver", + volName: "test.vol.name", + spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "test-driver", "test.vol.name"), false), + }, + { + name: "volume source with mixchars all", + driverName: "test-driver", + volName: "test.vol.name", + spec: volume.NewSpecFromVolume(makeTestVol("test-pv", "test-driver")), + shouldFail: true, // csi inline feature off + }, + { + name: "missing spec", + shouldFail: true, + }, } for _, tc := range testCases { t.Logf("testing: %s", tc.name) registerFakePlugin(tc.driverName, "endpoint", []string{"0.3.0"}, t) - pv := makeTestPV("test-pv", 10, tc.driverName, tc.volName) - spec := volume.NewSpecFromPersistentVolume(pv, false) - name, err := plug.GetVolumeName(spec) - if tc.shouldFail && err == nil { - t.Fatal("GetVolumeName should fail, but got err=nil") + name, err := plug.GetVolumeName(tc.spec) + if tc.shouldFail != (err != nil) { + t.Fatal("shouldFail does match expected error") + } + if tc.shouldFail && err != nil { + t.Log(err) + continue + } + if name != fmt.Sprintf("%s%s%s", tc.driverName, volNameSep, tc.volName) { + t.Errorf("unexpected volume name %s", name) + } + } +} + +func TestPluginGetVolumeNameWithInline(t *testing.T) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)() + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)() + + plug, tmpDir := newTestPlugin(t, nil) + defer os.RemoveAll(tmpDir) + testCases := []struct { + name string + driverName string + volName string + shouldFail bool + spec *volume.Spec + }{ + { + name: "missing spec", + shouldFail: true, + }, + { + name: "alphanum names for pv", + driverName: "testdr", + volName: "testvol", + spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "testdr", "testvol"), false), + }, + { + name: "alphanum names for vol source", + driverName: "testdr", + volName: "testvol", + spec: volume.NewSpecFromVolume(makeTestVol("test-pv", "testdr")), + shouldFail: true, + }, + } + + for _, tc := range testCases { + t.Logf("testing: %s", tc.name) + registerFakePlugin(tc.driverName, "endpoint", []string{"0.3.0"}, t) + name, err := plug.GetVolumeName(tc.spec) + if tc.shouldFail != (err != nil) { + t.Fatal("shouldFail does match expected error") + } + if tc.shouldFail && err != nil { + t.Log(err) + continue } if name != fmt.Sprintf("%s%s%s", tc.driverName, volNameSep, tc.volName) { t.Errorf("unexpected volume name %s", name) @@ -157,15 +254,79 @@ func TestPluginGetVolumeName(t *testing.T) { func TestPluginCanSupport(t *testing.T) { defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)() + tests := []struct { + name string + spec *volume.Spec + canSupport bool + }{ + { + name: "no spec provided", + canSupport: false, + }, + { + name: "can support volume source", + spec: volume.NewSpecFromVolume(makeTestVol("test-vol", testDriver)), + canSupport: false, // csi inline not enabled + }, + { + name: "can support persistent volume source", + spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 20, testDriver, testVol), true), + canSupport: true, + }, + } + plug, tmpDir := newTestPlugin(t, nil) defer os.RemoveAll(tmpDir) - registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t) - pv := makeTestPV("test-pv", 10, testDriver, testVol) - spec := volume.NewSpecFromPersistentVolume(pv, false) - if !plug.CanSupport(spec) { - t.Errorf("should support CSI spec") + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + + actual := plug.CanSupport(tc.spec) + if tc.canSupport != actual { + t.Errorf("expecting canSupport %t, got %t", tc.canSupport, actual) + } + }) + } +} + +func TestPluginCanSupportWithInline(t *testing.T) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)() + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)() + + tests := []struct { + name string + spec *volume.Spec + canSupport bool + }{ + { + name: "no spec provided", + canSupport: false, + }, + { + name: "can support volume source", + spec: volume.NewSpecFromVolume(makeTestVol("test-vol", testDriver)), + canSupport: true, + }, + { + name: "can support persistent volume source", + spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 20, testDriver, testVol), true), + canSupport: true, + }, + } + + plug, tmpDir := newTestPlugin(t, nil) + defer os.RemoveAll(tmpDir) + registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t) + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + + actual := plug.CanSupport(tc.spec) + if tc.canSupport != actual { + t.Errorf("expecting canSupport %t, got %t", tc.canSupport, actual) + } + }) } } @@ -177,108 +338,437 @@ func TestPluginConstructVolumeSpec(t *testing.T) { testCases := []struct { name string + originSpec *volume.Spec specVolID string - data map[string]string + volHandle string + podUID types.UID shouldFail bool }{ { - name: "valid spec name", - specVolID: "test.vol.id", - data: map[string]string{volDataKey.specVolID: "test.vol.id", volDataKey.volHandle: "test-vol0", volDataKey.driverName: "test-driver0"}, + name: "construct spec1 from original persistent spec", + specVolID: "test.vol.id", + volHandle: "testvol-handle1", + originSpec: volume.NewSpecFromPersistentVolume(makeTestPV("test.vol.id", 20, testDriver, "testvol-handle1"), true), + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + }, + { + name: "construct spec2 from original persistent spec", + specVolID: "spec2", + volHandle: "handle2", + originSpec: volume.NewSpecFromPersistentVolume(makeTestPV("spec2", 20, testDriver, "handle2"), true), + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + }, + { + name: "construct spec from original volume spec", + specVolID: "volspec", + originSpec: volume.NewSpecFromVolume(makeTestVol("spec2", testDriver)), + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + shouldFail: true, // csi inline off }, } - for _, tc := range testCases { - t.Logf("test case: %s", tc.name) - dir := getTargetPath(testPodUID, tc.specVolID, plug.host) + registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t) - // create the data file - if tc.data != nil { - mountDir := path.Join(getTargetPath(testPodUID, tc.specVolID, plug.host), "/mount") - if err := os.MkdirAll(mountDir, 0755); err != nil && !os.IsNotExist(err) { - t.Errorf("failed to create dir [%s]: %v", mountDir, err) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mounter, err := plug.NewMounter( + tc.originSpec, + &api.Pod{ObjectMeta: meta.ObjectMeta{UID: tc.podUID, Namespace: testns}}, + volume.VolumeOptions{}, + ) + if tc.shouldFail && err != nil { + t.Log(err) + return } - if err := saveVolumeData(path.Dir(mountDir), volDataFileName, tc.data); err != nil { + if !tc.shouldFail && err != nil { t.Fatal(err) } - } - - // rebuild spec - spec, err := plug.ConstructVolumeSpec("test-pv", dir) - if tc.shouldFail { - if err == nil { - t.Fatal("expecting ConstructVolumeSpec to fail, but got nil error") + if mounter == nil { + t.Fatal("failed to create CSI mounter") } - continue - } + csiMounter := mounter.(*csiMountMgr) - volHandle := spec.PersistentVolume.Spec.CSI.VolumeHandle - if volHandle != tc.data[volDataKey.volHandle] { - t.Errorf("expected volID %s, got volID %s", tc.data[volDataKey.volHandle], volHandle) - } + // rebuild spec + spec, err := plug.ConstructVolumeSpec("test-pv", path.Dir(csiMounter.GetPath())) + if err != nil { + t.Fatal(err) + } + if spec == nil { + t.Fatal("nil volume.Spec contstructed") + } - if spec.PersistentVolume.Spec.VolumeMode == nil { - t.Fatalf("Volume mode has not been set.") - } + // inspect spec + if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.CSI == nil { + t.Fatal("CSIPersistentVolume not found in constructed spec ") + } - if *spec.PersistentVolume.Spec.VolumeMode != api.PersistentVolumeFilesystem { - t.Errorf("Unexpected volume mode %q", *spec.PersistentVolume.Spec.VolumeMode) - } + volHandle := spec.PersistentVolume.Spec.CSI.VolumeHandle + if volHandle != tc.originSpec.PersistentVolume.Spec.CSI.VolumeHandle { + t.Error("unexpected volumeHandle constructed:", volHandle) + } + driverName := spec.PersistentVolume.Spec.CSI.Driver + if driverName != tc.originSpec.PersistentVolume.Spec.CSI.Driver { + t.Error("unexpected driverName constructed:", driverName) + } - if spec.Name() != tc.specVolID { - t.Errorf("Unexpected spec name %s", spec.Name()) - } + if spec.PersistentVolume.Spec.VolumeMode == nil { + t.Fatalf("Volume mode has not been set.") + } + + if *spec.PersistentVolume.Spec.VolumeMode != api.PersistentVolumeFilesystem { + t.Errorf("Unexpected volume mode %q", *spec.PersistentVolume.Spec.VolumeMode) + } + + if spec.Name() != tc.specVolID { + t.Errorf("Unexpected spec name constructed %s", spec.Name()) + } + + }) + } +} + +func TestPluginConstructVolumeSpecWithInline(t *testing.T) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)() + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)() + + plug, tmpDir := newTestPlugin(t, nil) + defer os.RemoveAll(tmpDir) + + testCases := []struct { + name string + originSpec *volume.Spec + specVolID string + volHandle string + podUID types.UID + shouldFail bool + }{ + { + name: "construct spec1 from persistent spec", + specVolID: "test.vol.id", + volHandle: "testvol-handle1", + originSpec: volume.NewSpecFromPersistentVolume(makeTestPV("test.vol.id", 20, testDriver, "testvol-handle1"), true), + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + }, + { + name: "construct spec2 from persistent spec", + specVolID: "spec2", + volHandle: "handle2", + originSpec: volume.NewSpecFromPersistentVolume(makeTestPV("spec2", 20, testDriver, "handle2"), true), + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + }, + { + name: "construct spec from volume spec", + specVolID: "volspec", + originSpec: volume.NewSpecFromVolume(makeTestVol("volspec", testDriver)), + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + }, + { + name: "construct spec from volume spec2", + specVolID: "volspec2", + originSpec: volume.NewSpecFromVolume(makeTestVol("volspec2", testDriver)), + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + }, + { + name: "missing spec", + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + shouldFail: true, + }, + } + + registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t) + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mounter, err := plug.NewMounter( + tc.originSpec, + &api.Pod{ObjectMeta: meta.ObjectMeta{UID: tc.podUID, Namespace: testns}}, + volume.VolumeOptions{}, + ) + if tc.shouldFail && err != nil { + t.Log(err) + return + } + if !tc.shouldFail && err != nil { + t.Fatal(err) + } + if mounter == nil { + t.Fatal("failed to create CSI mounter") + } + csiMounter := mounter.(*csiMountMgr) + + // rebuild spec + spec, err := plug.ConstructVolumeSpec("test-pv", path.Dir(csiMounter.GetPath())) + if err != nil { + t.Fatal(err) + } + if spec == nil { + t.Fatal("nil volume.Spec contstructed") + } + + if spec.Name() != tc.specVolID { + t.Errorf("unexpected spec name constructed volume.Spec: %s", spec.Name()) + } + + switch { + case spec.Volume != nil: + if spec.Volume.CSI == nil { + t.Error("missing CSIVolumeSource in constructed volume.Spec") + } + if spec.Volume.CSI.Driver != tc.originSpec.Volume.CSI.Driver { + t.Error("unexpected driver in constructed volume source:", spec.Volume.CSI.Driver) + } + + case spec.PersistentVolume != nil: + if spec.PersistentVolume.Spec.CSI == nil { + t.Fatal("missing CSIPersistentVolumeSource in constructed volume.spec") + } + volHandle := spec.PersistentVolume.Spec.CSI.VolumeHandle + if volHandle != tc.originSpec.PersistentVolume.Spec.CSI.VolumeHandle { + t.Error("unexpected volumeHandle constructed in persistent volume source:", volHandle) + } + driverName := spec.PersistentVolume.Spec.CSI.Driver + if driverName != tc.originSpec.PersistentVolume.Spec.CSI.Driver { + t.Error("unexpected driverName constructed in persistent volume source:", driverName) + } + if spec.PersistentVolume.Spec.VolumeMode == nil { + t.Fatalf("Volume mode has not been set.") + } + if *spec.PersistentVolume.Spec.VolumeMode != api.PersistentVolumeFilesystem { + t.Errorf("Unexpected volume mode %q", *spec.PersistentVolume.Spec.VolumeMode) + } + default: + t.Fatal("invalid volume.Spec constructed") + } + + }) } } func TestPluginNewMounter(t *testing.T) { defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)() - plug, tmpDir := newTestPlugin(t, nil) - defer os.RemoveAll(tmpDir) - - registerFakePlugin(testDriver, "endpoint", []string{"1.2.0"}, t) - pv := makeTestPV("test-pv", 10, testDriver, testVol) - mounter, err := plug.NewMounter( - volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly), - &api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}}, - volume.VolumeOptions{}, - ) - if err != nil { - t.Fatalf("Failed to make a new Mounter: %v", err) + tests := []struct { + name string + spec *volume.Spec + podUID types.UID + namespace string + driverMode driverMode + shouldFail bool + }{ + { + name: "mounter from persistent volume source", + spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv1", 20, testDriver, testVol), true), + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + namespace: "test-ns1", + driverMode: persistentDriverMode, + }, + { + name: "mounter from volume source", + spec: volume.NewSpecFromVolume(makeTestVol("test-vol1", testDriver)), + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + namespace: "test-ns2", + driverMode: ephemeralDriverMode, + shouldFail: true, // csi inline not enabled + }, + { + name: "mounter from no spec provided", + shouldFail: true, + }, } - if mounter == nil { - t.Fatal("failed to create CSI mounter") - } - csiMounter := mounter.(*csiMountMgr) + for _, test := range tests { + plug, tmpDir := newTestPlugin(t, nil) + defer os.RemoveAll(tmpDir) - // validate mounter fields - if string(csiMounter.driverName) != testDriver { - t.Error("mounter driver name not set") + registerFakePlugin(testDriver, "endpoint", []string{"1.2.0"}, t) + + t.Run(test.name, func(t *testing.T) { + mounter, err := plug.NewMounter( + test.spec, + &api.Pod{ObjectMeta: meta.ObjectMeta{UID: test.podUID, Namespace: test.namespace}}, + volume.VolumeOptions{}, + ) + if test.shouldFail != (err != nil) { + t.Fatal("Unexpected error:", err) + } + if test.shouldFail && err != nil { + t.Log(err) + return + } + + if mounter == nil { + t.Fatal("failed to create CSI mounter") + } + csiMounter := mounter.(*csiMountMgr) + + // validate mounter fields + if string(csiMounter.driverName) != testDriver { + t.Error("mounter driver name not set") + } + if csiMounter.volumeID == "" { + t.Error("mounter volume id not set") + } + if csiMounter.pod == nil { + t.Error("mounter pod not set") + } + if string(csiMounter.podUID) != string(test.podUID) { + t.Error("mounter podUID not set") + } + if csiMounter.csiClient == nil { + t.Error("mounter csiClient is nil") + } + if csiMounter.driverMode != test.driverMode { + t.Error("unexpected driver mode:", csiMounter.driverMode) + } + + // ensure data file is created + dataDir := path.Dir(mounter.GetPath()) + dataFile := filepath.Join(dataDir, volDataFileName) + if _, err := os.Stat(dataFile); err != nil { + if os.IsNotExist(err) { + t.Errorf("data file not created %s", dataFile) + } else { + t.Fatal(err) + } + } + data, err := loadVolumeData(dataDir, volDataFileName) + if err != nil { + t.Fatal(err) + } + if data[volDataKey.specVolID] != csiMounter.spec.Name() { + t.Error("volume data file unexpected specVolID:", data[volDataKey.specVolID]) + } + if data[volDataKey.volHandle] != csiMounter.volumeID { + t.Error("volume data file unexpected volHandle:", data[volDataKey.volHandle]) + } + if data[volDataKey.driverName] != string(csiMounter.driverName) { + t.Error("volume data file unexpected driverName:", data[volDataKey.driverName]) + } + if data[volDataKey.nodeName] != string(csiMounter.plugin.host.GetNodeName()) { + t.Error("volume data file unexpected nodeName:", data[volDataKey.nodeName]) + } + if data[volDataKey.driverMode] != string(test.driverMode) { + t.Error("volume data file unexpected driverMode:", data[volDataKey.driverMode]) + } + }) } - if csiMounter.volumeID != testVol { - t.Error("mounter volume id not set") - } - if csiMounter.pod == nil { - t.Error("mounter pod not set") - } - if csiMounter.podUID == types.UID("") { - t.Error("mounter podUID not set") - } - if csiMounter.csiClient == nil { - t.Error("mounter csiClient is nil") +} + +func TestPluginNewMounterWithInline(t *testing.T) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)() + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)() + tests := []struct { + name string + spec *volume.Spec + podUID types.UID + namespace string + driverMode driverMode + shouldFail bool + }{ + { + name: "mounter with missing spec", + shouldFail: true, + }, + { + name: "mounter with spec with both volSrc and pvSrc", + spec: &volume.Spec{ + Volume: makeTestVol("test-vol1", testDriver), + PersistentVolume: makeTestPV("test-pv1", 20, testDriver, testVol), + ReadOnly: true, + }, + shouldFail: true, + }, + { + name: "mounter with persistent volume source", + spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv1", 20, testDriver, testVol), true), + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + namespace: "test-ns1", + driverMode: persistentDriverMode, + }, + { + name: "mounter with volume source", + spec: volume.NewSpecFromVolume(makeTestVol("test-vol1", testDriver)), + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + namespace: "test-ns2", + driverMode: ephemeralDriverMode, + }, } - // ensure data file is created - dataDir := path.Dir(mounter.GetPath()) - dataFile := filepath.Join(dataDir, volDataFileName) - if _, err := os.Stat(dataFile); err != nil { - if os.IsNotExist(err) { - t.Errorf("data file not created %s", dataFile) - } else { - t.Fatal(err) - } + for _, test := range tests { + plug, tmpDir := newTestPlugin(t, nil) + defer os.RemoveAll(tmpDir) + + registerFakePlugin(testDriver, "endpoint", []string{"1.2.0"}, t) + + t.Run(test.name, func(t *testing.T) { + mounter, err := plug.NewMounter( + test.spec, + &api.Pod{ObjectMeta: meta.ObjectMeta{UID: test.podUID, Namespace: test.namespace}}, + volume.VolumeOptions{}, + ) + if test.shouldFail != (err != nil) { + t.Fatal("Unexpected error:", err) + } + if test.shouldFail && err != nil { + t.Log(err) + return + } + + if mounter == nil { + t.Fatal("failed to create CSI mounter") + } + csiMounter := mounter.(*csiMountMgr) + + // validate mounter fields + if string(csiMounter.driverName) != testDriver { + t.Error("mounter driver name not set") + } + if csiMounter.volumeID == "" { + t.Error("mounter volume id not set") + } + if csiMounter.pod == nil { + t.Error("mounter pod not set") + } + if string(csiMounter.podUID) != string(test.podUID) { + t.Error("mounter podUID not set") + } + if csiMounter.csiClient == nil { + t.Error("mounter csiClient is nil") + } + if csiMounter.driverMode != test.driverMode { + t.Error("unexpected driver mode:", csiMounter.driverMode) + } + + // ensure data file is created + dataDir := path.Dir(mounter.GetPath()) + dataFile := filepath.Join(dataDir, volDataFileName) + if _, err := os.Stat(dataFile); err != nil { + if os.IsNotExist(err) { + t.Errorf("data file not created %s", dataFile) + } else { + t.Fatal(err) + } + } + data, err := loadVolumeData(dataDir, volDataFileName) + if err != nil { + t.Fatal(err) + } + if data[volDataKey.specVolID] != csiMounter.spec.Name() { + t.Error("volume data file unexpected specVolID:", data[volDataKey.specVolID]) + } + if data[volDataKey.volHandle] != csiMounter.volumeID { + t.Error("volume data file unexpected volHandle:", data[volDataKey.volHandle]) + } + if data[volDataKey.driverName] != string(csiMounter.driverName) { + t.Error("volume data file unexpected driverName:", data[volDataKey.driverName]) + } + if data[volDataKey.nodeName] != string(csiMounter.plugin.host.GetNodeName()) { + t.Error("volume data file unexpected nodeName:", data[volDataKey.nodeName]) + } + if data[volDataKey.driverMode] != string(csiMounter.driverMode) { + t.Error("volume data file unexpected driverMode:", data[volDataKey.driverMode]) + } + }) } } @@ -371,25 +861,36 @@ func TestPluginNewDetacher(t *testing.T) { } func TestPluginCanAttach(t *testing.T) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIDriverRegistry, true)() + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)() tests := []struct { name string driverName string + spec *volume.Spec canAttach bool }{ { - name: "attachable", - driverName: "attachble-driver", + name: "non-attachable inline", + driverName: "attachable-inline", + spec: volume.NewSpecFromVolume(makeTestVol("test-vol", "attachable-inline")), + canAttach: false, + }, + { + name: "attachable PV", + driverName: "attachable-pv", + spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-vol", 20, "attachable-pv", testVol), true), canAttach: true, }, } for _, test := range tests { + csiDriver := getCSIDriver(test.driverName, nil, &test.canAttach) t.Run(test.name, func(t *testing.T) { - plug, tmpDir := newTestPlugin(t, nil) + fakeCSIClient := fakeclient.NewSimpleClientset(csiDriver) + plug, tmpDir := newTestPlugin(t, fakeCSIClient) defer os.RemoveAll(tmpDir) - spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, test.driverName, "test-vol"), false) - pluginCanAttach := plug.CanAttach(spec) + pluginCanAttach := plug.CanAttach(test.spec) if pluginCanAttach != test.canAttach { t.Fatalf("expecting plugin.CanAttach %t got %t", test.canAttach, pluginCanAttach) return diff --git a/pkg/volume/csi/csi_util.go b/pkg/volume/csi/csi_util.go index 5d343e4c59..393a2cd527 100644 --- a/pkg/volume/csi/csi_util.go +++ b/pkg/volume/csi/csi_util.go @@ -25,8 +25,10 @@ import ( api "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" "k8s.io/klog" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" utilstrings "k8s.io/utils/strings" ) @@ -90,12 +92,7 @@ func loadVolumeData(dir string, fileName string) (map[string]string, error) { } func getCSISourceFromSpec(spec *volume.Spec) (*api.CSIPersistentVolumeSource, error) { - if spec.PersistentVolume != nil && - spec.PersistentVolume.Spec.CSI != nil { - return spec.PersistentVolume.Spec.CSI, nil - } - - return nil, fmt.Errorf("CSIPersistentVolumeSource not defined in spec") + return getPVSourceFromSpec(spec) } func getReadOnlyFromSpec(spec *volume.Spec) (bool, error) { @@ -140,3 +137,34 @@ func hasReadWriteOnce(modes []api.PersistentVolumeAccessMode) bool { } return false } + +// getSourceFromSpec returns either CSIVolumeSource or CSIPersistentVolumeSource, but not both +func getSourceFromSpec(spec *volume.Spec) (*api.CSIVolumeSource, *api.CSIPersistentVolumeSource, error) { + if spec == nil { + return nil, nil, fmt.Errorf("volume.Spec nil") + } + if spec.Volume != nil && spec.PersistentVolume != nil { + return nil, nil, fmt.Errorf("volume.Spec has both volume and persistent volume sources") + } + if spec.Volume != nil && spec.Volume.CSI != nil && utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) { + return spec.Volume.CSI, nil, nil + } + if spec.PersistentVolume != nil && + spec.PersistentVolume.Spec.CSI != nil { + return nil, spec.PersistentVolume.Spec.CSI, nil + } + + return nil, nil, fmt.Errorf("volume source not found in volume.Spec") +} + +// getPVSourceFromSpec ensures only CSIPersistentVolumeSource is present in volume.Spec +func getPVSourceFromSpec(spec *volume.Spec) (*api.CSIPersistentVolumeSource, error) { + volSrc, pvSrc, err := getSourceFromSpec(spec) + if err != nil { + return nil, err + } + if volSrc != nil { + return nil, fmt.Errorf("unexpected api.CSIVolumeSource found in volume.Spec") + } + return pvSrc, nil +} diff --git a/pkg/volume/csi/fake/fake_client.go b/pkg/volume/csi/fake/fake_client.go index f674fdd440..ac826aa1af 100644 --- a/pkg/volume/csi/fake/fake_client.go +++ b/pkg/volume/csi/fake/fake_client.go @@ -57,9 +57,12 @@ func (f *IdentityClient) Probe(ctx context.Context, in *csipb.ProbeRequest, opts } type CSIVolume struct { - VolumeContext map[string]string - Path string - MountFlags []string + VolumeHandle string + VolumeContext map[string]string + Path string + DeviceMountPath string + FSType string + MountFlags []string } // NodeClient returns CSI node client @@ -118,7 +121,6 @@ func (f *NodeClient) AddNodeStagedVolume(volID, deviceMountPath string, volumeCo // NodePublishVolume implements CSI NodePublishVolume func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipb.NodePublishVolumeResponse, error) { - if f.nextErr != nil { return nil, f.nextErr } @@ -135,9 +137,12 @@ func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePubli return nil, errors.New("invalid fstype") } f.nodePublishedVolumes[req.GetVolumeId()] = CSIVolume{ - Path: req.GetTargetPath(), - VolumeContext: req.GetVolumeContext(), - MountFlags: req.GetVolumeCapability().GetMount().MountFlags, + VolumeHandle: req.GetVolumeId(), + Path: req.GetTargetPath(), + DeviceMountPath: req.GetStagingTargetPath(), + VolumeContext: req.GetVolumeContext(), + FSType: req.GetVolumeCapability().GetMount().GetFsType(), + MountFlags: req.GetVolumeCapability().GetMount().MountFlags, } return &csipb.NodePublishVolumeResponse{}, nil }