Modify CSI to handle both 0.3 and 1.0

Modify the CSI volume plugin to handle CSI version 0.x as well as 1.x
pull/58/head
saad-ali 2018-11-21 03:02:15 -08:00
parent d1b44857ad
commit aa8244beb5
21 changed files with 5858 additions and 143 deletions
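For orientation before the per-file diffs: the heart of this change is a check on the registered driver's highest supported CSI version — drivers reporting a 0.x version are driven through the old CSI 0.3 protobuf API (pkg/volume/csi/csiv0), everything else through the 1.0 API. Below is a minimal, standalone sketch of that decision only; the package layout and main function are illustrative and not part of this commit, while the helper mirrors the versionRequiresV0Client function added in csi_client.go further down.

package main

import (
	"fmt"

	utilversion "k8s.io/apimachinery/pkg/util/version"
)

// versionRequiresV0Client mirrors the helper introduced in this commit:
// a driver whose highest supported CSI version has major == 0 must be
// called through the CSI 0.x node client.
func versionRequiresV0Client(version *utilversion.Version) bool {
	return version != nil && version.Major() == 0
}

func main() {
	// Two example driver versions, one per supported spec generation.
	for _, v := range []string{"0.3.0", "1.0.0"} {
		parsed, err := utilversion.ParseGeneric(v)
		if err != nil {
			panic(err)
		}
		fmt.Printf("driver version %s -> use v0 client: %v\n", v, versionRequiresV0Client(parsed))
	}
}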

View File

@ -394,6 +394,7 @@ pkg/version/verflag
pkg/volume
pkg/volume/azure_dd
pkg/volume/azure_file
pkg/volume/csi/csiv0
pkg/volume/csi/fake
pkg/volume/git_repo
pkg/volume/host_path

View File

@ -16,6 +16,7 @@ go_library(
"//pkg/features:go_default_library",
"//pkg/util/strings:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/csi/csiv0:go_default_library",
"//pkg/volume/csi/nodeinfomanager:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@ -341,7 +341,11 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
}()
if c.csiClient == nil {
c.csiClient = newCsiDriverClient(csiSource.Driver)
c.csiClient, err = newCsiDriverClient(csiDriverName(csiSource.Driver))
if err != nil {
klog.Errorf(log("attacher.MountDevice failed to create newCsiDriverClient: %v", err))
return err
}
}
csi := c.csiClient
@ -360,7 +364,7 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
// Start MountDevice
nodeName := string(c.plugin.host.GetNodeName())
publishVolumeInfo, err := c.plugin.getPublishVolumeInfo(c.k8s, csiSource.VolumeHandle, csiSource.Driver, nodeName)
publishContext, err := c.plugin.getPublishContext(c.k8s, csiSource.VolumeHandle, csiSource.Driver, nodeName)
nodeStageSecrets := map[string]string{}
if csiSource.NodeStageSecretRef != nil {
@ -381,7 +385,7 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
fsType := csiSource.FSType
err = csi.NodeStageVolume(ctx,
csiSource.VolumeHandle,
publishVolumeInfo,
publishContext,
deviceMountPath,
fsType,
accessMode,
@ -521,7 +525,11 @@ func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
}
if c.csiClient == nil {
c.csiClient = newCsiDriverClient(driverName)
c.csiClient, err = newCsiDriverClient(csiDriverName(driverName))
if err != nil {
klog.Errorf(log("attacher.UnmountDevice failed to create newCsiDriverClient: %v", err))
return err
}
}
csi := c.csiClient

View File

@ -40,7 +40,7 @@ type csiBlockMapper struct {
k8s kubernetes.Interface
csiClient csiClient
plugin *csiPlugin
driverName string
driverName csiDriverName
specName string
volumeID string
readOnly bool

View File

@ -33,7 +33,8 @@ import (
volumetest "k8s.io/kubernetes/pkg/volume/testing"
)
func prepareBlockMapperTest(plug *csiPlugin, specVolumeName string) (*csiBlockMapper, *volume.Spec, *api.PersistentVolume, error) {
func prepareBlockMapperTest(plug *csiPlugin, specVolumeName string, t *testing.T) (*csiBlockMapper, *volume.Spec, *api.PersistentVolume, error) {
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
pv := makeTestPV(specVolumeName, 10, testDriver, testVol)
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
mapper, err := plug.NewBlockVolumeMapper(
@ -73,7 +74,7 @@ func TestBlockMapperGetGlobalMapPath(t *testing.T) {
}
for _, tc := range testCases {
t.Logf("test case: %s", tc.name)
csiMapper, spec, _, err := prepareBlockMapperTest(plug, tc.specVolumeName)
csiMapper, spec, _, err := prepareBlockMapperTest(plug, tc.specVolumeName, t)
if err != nil {
t.Fatalf("Failed to make a new Mapper: %v", err)
}
@ -113,7 +114,7 @@ func TestBlockMapperGetStagingPath(t *testing.T) {
}
for _, tc := range testCases {
t.Logf("test case: %s", tc.name)
csiMapper, _, _, err := prepareBlockMapperTest(plug, tc.specVolumeName)
csiMapper, _, _, err := prepareBlockMapperTest(plug, tc.specVolumeName, t)
if err != nil {
t.Fatalf("Failed to make a new Mapper: %v", err)
}
@ -150,7 +151,7 @@ func TestBlockMapperGetPublishPath(t *testing.T) {
}
for _, tc := range testCases {
t.Logf("test case: %s", tc.name)
csiMapper, _, _, err := prepareBlockMapperTest(plug, tc.specVolumeName)
csiMapper, _, _, err := prepareBlockMapperTest(plug, tc.specVolumeName, t)
if err != nil {
t.Fatalf("Failed to make a new Mapper: %v", err)
}
@ -187,7 +188,7 @@ func TestBlockMapperGetDeviceMapPath(t *testing.T) {
}
for _, tc := range testCases {
t.Logf("test case: %s", tc.name)
csiMapper, _, _, err := prepareBlockMapperTest(plug, tc.specVolumeName)
csiMapper, _, _, err := prepareBlockMapperTest(plug, tc.specVolumeName, t)
if err != nil {
t.Fatalf("Failed to make a new Mapper: %v", err)
}
@ -219,7 +220,7 @@ func TestBlockMapperSetupDevice(t *testing.T) {
)
plug.host = host
csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv")
csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
if err != nil {
t.Fatalf("Failed to make a new Mapper: %v", err)
}
@ -229,7 +230,7 @@ func TestBlockMapperSetupDevice(t *testing.T) {
csiMapper.csiClient = setupClient(t, true)
attachID := getAttachmentName(csiMapper.volumeID, csiMapper.driverName, string(nodeName))
attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName))
attachment := makeTestAttachment(attachID, nodeName, pvName)
attachment.Status.Attached = true
_, err = csiMapper.k8s.StorageV1beta1().VolumeAttachments().Create(attachment)
@ -286,7 +287,7 @@ func TestBlockMapperMapDevice(t *testing.T) {
)
plug.host = host
csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv")
csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
if err != nil {
t.Fatalf("Failed to make a new Mapper: %v", err)
}
@ -296,7 +297,7 @@ func TestBlockMapperMapDevice(t *testing.T) {
csiMapper.csiClient = setupClient(t, true)
attachID := getAttachmentName(csiMapper.volumeID, csiMapper.driverName, string(nodeName))
attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName))
attachment := makeTestAttachment(attachID, nodeName, pvName)
attachment.Status.Attached = true
_, err = csiMapper.k8s.StorageV1beta1().VolumeAttachments().Create(attachment)
@ -369,7 +370,7 @@ func TestBlockMapperTearDownDevice(t *testing.T) {
)
plug.host = host
_, spec, pv, err := prepareBlockMapperTest(plug, "test-pv")
_, spec, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
if err != nil {
t.Fatalf("Failed to make a new Mapper: %v", err)
}

View File

@ -24,12 +24,14 @@ import (
"net"
"time"
csipb "github.com/container-storage-interface/spec/lib/go/csi"
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
api "k8s.io/api/core/v1"
utilversion "k8s.io/apimachinery/pkg/util/version"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
csipbv0 "k8s.io/kubernetes/pkg/volume/csi/csiv0"
)
type csiClient interface {
@ -45,7 +47,7 @@ type csiClient interface {
stagingTargetPath string,
targetPath string,
accessMode api.PersistentVolumeAccessMode,
volumeInfo map[string]string,
publishContext map[string]string,
volumeContext map[string]string,
secrets map[string]string,
fsType string,
@ -69,42 +71,104 @@ type csiClient interface {
NodeSupportsStageUnstage(ctx context.Context) (bool, error)
}
// Strongly typed address
type csiAddr string
// Strongly typed driver name
type csiDriverName string
// csiClient encapsulates all csi-plugin methods
type csiDriverClient struct {
driverName string
nodeClientCreator nodeClientCreator
driverName csiDriverName
addr csiAddr
nodeV1ClientCreator nodeV1ClientCreator
nodeV0ClientCreator nodeV0ClientCreator
}
var _ csiClient = &csiDriverClient{}
type nodeClientCreator func(driverName string) (
nodeClient csipb.NodeClient,
type nodeV1ClientCreator func(addr csiAddr) (
nodeClient csipbv1.NodeClient,
closer io.Closer,
err error,
)
// newNodeClient creates a new NodeClient with the internally used gRPC
type nodeV0ClientCreator func(addr csiAddr) (
nodeClient csipbv0.NodeClient,
closer io.Closer,
err error,
)
// newV1NodeClient creates a new NodeClient with the internally used gRPC
// connection set up. It also returns a closer which must be called to close
// the gRPC connection when the NodeClient is not used anymore.
// This is the default implementation for the nodeClientCreator, used in
// This is the default implementation for the nodeV1ClientCreator, used in
// newCsiDriverClient.
func newNodeClient(driverName string) (nodeClient csipb.NodeClient, closer io.Closer, err error) {
func newV1NodeClient(addr csiAddr) (nodeClient csipbv1.NodeClient, closer io.Closer, err error) {
var conn *grpc.ClientConn
conn, err = newGrpcConn(driverName)
conn, err = newGrpcConn(addr)
if err != nil {
return nil, nil, err
}
nodeClient = csipb.NewNodeClient(conn)
nodeClient = csipbv1.NewNodeClient(conn)
return nodeClient, conn, nil
}
func newCsiDriverClient(driverName string) *csiDriverClient {
c := &csiDriverClient{
driverName: driverName,
nodeClientCreator: newNodeClient,
// newV0NodeClient creates a new NodeClient with the internally used gRPC
// connection set up. It also returns a closer which must be called to close
// the gRPC connection when the NodeClient is not used anymore.
// This is the default implementation for the nodeV0ClientCreator, used in
// newCsiDriverClient.
func newV0NodeClient(addr csiAddr) (nodeClient csipbv0.NodeClient, closer io.Closer, err error) {
var conn *grpc.ClientConn
conn, err = newGrpcConn(addr)
if err != nil {
return nil, nil, err
}
return c
nodeClient = csipbv0.NewNodeClient(conn)
return nodeClient, conn, nil
}
func newCsiDriverClient(driverName csiDriverName) (*csiDriverClient, error) {
if driverName == "" {
return nil, fmt.Errorf("driver name is empty")
}
addr := fmt.Sprintf(csiAddrTemplate, driverName)
requiresV0Client := true
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPluginsWatcher) {
var existingDriver csiDriver
driverExists := false
func() {
csiDrivers.RLock()
defer csiDrivers.RUnlock()
existingDriver, driverExists = csiDrivers.driversMap[string(driverName)]
}()
if !driverExists {
return nil, fmt.Errorf("driver name %s not found in the list of registered CSI drivers", driverName)
}
addr = existingDriver.driverEndpoint
requiresV0Client = versionRequiresV0Client(existingDriver.highestSupportedVersion)
}
nodeV1ClientCreator := newV1NodeClient
nodeV0ClientCreator := newV0NodeClient
if requiresV0Client {
nodeV1ClientCreator = nil
} else {
nodeV0ClientCreator = nil
}
return &csiDriverClient{
driverName: driverName,
addr: csiAddr(addr),
nodeV1ClientCreator: nodeV1ClientCreator,
nodeV0ClientCreator: nodeV0ClientCreator,
}, nil
}
func (c *csiDriverClient) NodeGetInfo(ctx context.Context) (
@ -113,14 +177,54 @@ func (c *csiDriverClient) NodeGetInfo(ctx context.Context) (
accessibleTopology map[string]string,
err error) {
klog.V(4).Info(log("calling NodeGetInfo rpc"))
if c.nodeV1ClientCreator != nil {
return c.nodeGetInfoV1(ctx)
} else if c.nodeV0ClientCreator != nil {
return c.nodeGetInfoV0(ctx)
}
nodeClient, closer, err := c.nodeClientCreator(c.driverName)
err = fmt.Errorf("failed to call NodeGetInfo. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
return nodeID, maxVolumePerNode, accessibleTopology, err
}
func (c *csiDriverClient) nodeGetInfoV1(ctx context.Context) (
nodeID string,
maxVolumePerNode int64,
accessibleTopology map[string]string,
err error) {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return "", 0, nil, err
}
defer closer.Close()
res, err := nodeClient.NodeGetInfo(ctx, &csipb.NodeGetInfoRequest{})
res, err := nodeClient.NodeGetInfo(ctx, &csipbv1.NodeGetInfoRequest{})
if err != nil {
return "", 0, nil, err
}
topology := res.GetAccessibleTopology()
if topology != nil {
accessibleTopology = topology.Segments
}
return res.GetNodeId(), res.GetMaxVolumesPerNode(), accessibleTopology, nil
}
func (c *csiDriverClient) nodeGetInfoV0(ctx context.Context) (
nodeID string,
maxVolumePerNode int64,
accessibleTopology map[string]string,
err error) {
nodeClient, closer, err := c.nodeV0ClientCreator(c.addr)
if err != nil {
return "", 0, nil, err
}
defer closer.Close()
res, err := nodeClient.NodeGetInfo(ctx, &csipbv0.NodeGetInfoRequest{})
if err != nil {
return "", 0, nil, err
}
@ -139,7 +243,7 @@ func (c *csiDriverClient) NodePublishVolume(
stagingTargetPath string,
targetPath string,
accessMode api.PersistentVolumeAccessMode,
volumeInfo map[string]string,
publishContext map[string]string,
volumeContext map[string]string,
secrets map[string]string,
fsType string,
@ -152,23 +256,69 @@ func (c *csiDriverClient) NodePublishVolume(
if targetPath == "" {
return errors.New("missing target path")
}
if c.nodeV1ClientCreator != nil {
return c.nodePublishVolumeV1(
ctx,
volID,
readOnly,
stagingTargetPath,
targetPath,
accessMode,
publishContext,
volumeContext,
secrets,
fsType,
mountOptions,
)
} else if c.nodeV0ClientCreator != nil {
return c.nodePublishVolumeV0(
ctx,
volID,
readOnly,
stagingTargetPath,
targetPath,
accessMode,
publishContext,
volumeContext,
secrets,
fsType,
mountOptions,
)
}
nodeClient, closer, err := c.nodeClientCreator(c.driverName)
return fmt.Errorf("failed to call NodePublishVolume. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
}
func (c *csiDriverClient) nodePublishVolumeV1(
ctx context.Context,
volID string,
readOnly bool,
stagingTargetPath string,
targetPath string,
accessMode api.PersistentVolumeAccessMode,
publishContext map[string]string,
volumeContext map[string]string,
secrets map[string]string,
fsType string,
mountOptions []string,
) error {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return err
}
defer closer.Close()
req := &csipb.NodePublishVolumeRequest{
req := &csipbv1.NodePublishVolumeRequest{
VolumeId: volID,
TargetPath: targetPath,
Readonly: readOnly,
PublishContext: volumeInfo,
PublishContext: publishContext,
VolumeContext: volumeContext,
Secrets: secrets,
VolumeCapability: &csipb.VolumeCapability{
AccessMode: &csipb.VolumeCapability_AccessMode{
Mode: asCSIAccessMode(accessMode),
VolumeCapability: &csipbv1.VolumeCapability{
AccessMode: &csipbv1.VolumeCapability_AccessMode{
Mode: asCSIAccessModeV1(accessMode),
},
},
}
@ -177,12 +327,65 @@ func (c *csiDriverClient) NodePublishVolume(
}
if fsType == fsTypeBlockName {
req.VolumeCapability.AccessType = &csipb.VolumeCapability_Block{
Block: &csipb.VolumeCapability_BlockVolume{},
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
Block: &csipbv1.VolumeCapability_BlockVolume{},
}
} else {
req.VolumeCapability.AccessType = &csipb.VolumeCapability_Mount{
Mount: &csipb.VolumeCapability_MountVolume{
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
Mount: &csipbv1.VolumeCapability_MountVolume{
FsType: fsType,
MountFlags: mountOptions,
},
}
}
_, err = nodeClient.NodePublishVolume(ctx, req)
return err
}
func (c *csiDriverClient) nodePublishVolumeV0(
ctx context.Context,
volID string,
readOnly bool,
stagingTargetPath string,
targetPath string,
accessMode api.PersistentVolumeAccessMode,
publishContext map[string]string,
volumeContext map[string]string,
secrets map[string]string,
fsType string,
mountOptions []string,
) error {
nodeClient, closer, err := c.nodeV0ClientCreator(c.addr)
if err != nil {
return err
}
defer closer.Close()
req := &csipbv0.NodePublishVolumeRequest{
VolumeId: volID,
TargetPath: targetPath,
Readonly: readOnly,
PublishInfo: publishContext,
VolumeAttributes: volumeContext,
NodePublishSecrets: secrets,
VolumeCapability: &csipbv0.VolumeCapability{
AccessMode: &csipbv0.VolumeCapability_AccessMode{
Mode: asCSIAccessModeV0(accessMode),
},
},
}
if stagingTargetPath != "" {
req.StagingTargetPath = stagingTargetPath
}
if fsType == fsTypeBlockName {
req.VolumeCapability.AccessType = &csipbv0.VolumeCapability_Block{
Block: &csipbv0.VolumeCapability_BlockVolume{},
}
} else {
req.VolumeCapability.AccessType = &csipbv0.VolumeCapability_Mount{
Mount: &csipbv0.VolumeCapability_MountVolume{
FsType: fsType,
MountFlags: mountOptions,
},
@ -202,13 +405,39 @@ func (c *csiDriverClient) NodeUnpublishVolume(ctx context.Context, volID string,
return errors.New("missing target path")
}
nodeClient, closer, err := c.nodeClientCreator(c.driverName)
if c.nodeV1ClientCreator != nil {
return c.nodeUnpublishVolumeV1(ctx, volID, targetPath)
} else if c.nodeV0ClientCreator != nil {
return c.nodeUnpublishVolumeV0(ctx, volID, targetPath)
}
return fmt.Errorf("failed to call NodeUnpublishVolume. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
}
func (c *csiDriverClient) nodeUnpublishVolumeV1(ctx context.Context, volID string, targetPath string) error {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return err
}
defer closer.Close()
req := &csipb.NodeUnpublishVolumeRequest{
req := &csipbv1.NodeUnpublishVolumeRequest{
VolumeId: volID,
TargetPath: targetPath,
}
_, err = nodeClient.NodeUnpublishVolume(ctx, req)
return err
}
func (c *csiDriverClient) nodeUnpublishVolumeV0(ctx context.Context, volID string, targetPath string) error {
nodeClient, closer, err := c.nodeV0ClientCreator(c.addr)
if err != nil {
return err
}
defer closer.Close()
req := &csipbv0.NodeUnpublishVolumeRequest{
VolumeId: volID,
TargetPath: targetPath,
}
@ -234,19 +463,38 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
return errors.New("missing staging target path")
}
nodeClient, closer, err := c.nodeClientCreator(c.driverName)
if c.nodeV1ClientCreator != nil {
return c.nodeStageVolumeV1(ctx, volID, publishContext, stagingTargetPath, fsType, accessMode, secrets, volumeContext)
} else if c.nodeV0ClientCreator != nil {
return c.nodeStageVolumeV0(ctx, volID, publishContext, stagingTargetPath, fsType, accessMode, secrets, volumeContext)
}
return fmt.Errorf("failed to call NodeStageVolume. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
}
func (c *csiDriverClient) nodeStageVolumeV1(
ctx context.Context,
volID string,
publishContext map[string]string,
stagingTargetPath string,
fsType string,
accessMode api.PersistentVolumeAccessMode,
secrets map[string]string,
volumeContext map[string]string,
) error {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return err
}
defer closer.Close()
req := &csipb.NodeStageVolumeRequest{
req := &csipbv1.NodeStageVolumeRequest{
VolumeId: volID,
PublishContext: publishContext,
StagingTargetPath: stagingTargetPath,
VolumeCapability: &csipb.VolumeCapability{
AccessMode: &csipb.VolumeCapability_AccessMode{
Mode: asCSIAccessMode(accessMode),
VolumeCapability: &csipbv1.VolumeCapability{
AccessMode: &csipbv1.VolumeCapability_AccessMode{
Mode: asCSIAccessModeV1(accessMode),
},
},
Secrets: secrets,
@ -254,12 +502,57 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
}
if fsType == fsTypeBlockName {
req.VolumeCapability.AccessType = &csipb.VolumeCapability_Block{
Block: &csipb.VolumeCapability_BlockVolume{},
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
Block: &csipbv1.VolumeCapability_BlockVolume{},
}
} else {
req.VolumeCapability.AccessType = &csipb.VolumeCapability_Mount{
Mount: &csipb.VolumeCapability_MountVolume{
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
Mount: &csipbv1.VolumeCapability_MountVolume{
FsType: fsType,
},
}
}
_, err = nodeClient.NodeStageVolume(ctx, req)
return err
}
func (c *csiDriverClient) nodeStageVolumeV0(
ctx context.Context,
volID string,
publishContext map[string]string,
stagingTargetPath string,
fsType string,
accessMode api.PersistentVolumeAccessMode,
secrets map[string]string,
volumeContext map[string]string,
) error {
nodeClient, closer, err := c.nodeV0ClientCreator(c.addr)
if err != nil {
return err
}
defer closer.Close()
req := &csipbv0.NodeStageVolumeRequest{
VolumeId: volID,
PublishInfo: publishContext,
StagingTargetPath: stagingTargetPath,
VolumeCapability: &csipbv0.VolumeCapability{
AccessMode: &csipbv0.VolumeCapability_AccessMode{
Mode: asCSIAccessModeV0(accessMode),
},
},
NodeStageSecrets: secrets,
VolumeAttributes: volumeContext,
}
if fsType == fsTypeBlockName {
req.VolumeCapability.AccessType = &csipbv0.VolumeCapability_Block{
Block: &csipbv0.VolumeCapability_BlockVolume{},
}
} else {
req.VolumeCapability.AccessType = &csipbv0.VolumeCapability_Mount{
Mount: &csipbv0.VolumeCapability_MountVolume{
FsType: fsType,
},
}
@ -278,13 +571,38 @@ func (c *csiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stagingT
return errors.New("missing staging target path")
}
nodeClient, closer, err := c.nodeClientCreator(c.driverName)
if c.nodeV1ClientCreator != nil {
return c.nodeUnstageVolumeV1(ctx, volID, stagingTargetPath)
} else if c.nodeV0ClientCreator != nil {
return c.nodeUnstageVolumeV0(ctx, volID, stagingTargetPath)
}
return fmt.Errorf("failed to call NodeUnstageVolume. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
}
func (c *csiDriverClient) nodeUnstageVolumeV1(ctx context.Context, volID, stagingTargetPath string) error {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return err
}
defer closer.Close()
req := &csipb.NodeUnstageVolumeRequest{
req := &csipbv1.NodeUnstageVolumeRequest{
VolumeId: volID,
StagingTargetPath: stagingTargetPath,
}
_, err = nodeClient.NodeUnstageVolume(ctx, req)
return err
}
func (c *csiDriverClient) nodeUnstageVolumeV0(ctx context.Context, volID, stagingTargetPath string) error {
nodeClient, closer, err := c.nodeV0ClientCreator(c.addr)
if err != nil {
return err
}
defer closer.Close()
req := &csipbv0.NodeUnstageVolumeRequest{
VolumeId: volID,
StagingTargetPath: stagingTargetPath,
}
@ -295,13 +613,23 @@ func (c *csiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stagingT
func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) {
klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if NodeSupportsStageUnstage"))
nodeClient, closer, err := c.nodeClientCreator(c.driverName)
if c.nodeV1ClientCreator != nil {
return c.nodeSupportsStageUnstageV1(ctx)
} else if c.nodeV0ClientCreator != nil {
return c.nodeSupportsStageUnstageV0(ctx)
}
return false, fmt.Errorf("failed to call NodeSupportsStageUnstage. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
}
func (c *csiDriverClient) nodeSupportsStageUnstageV1(ctx context.Context) (bool, error) {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return false, err
}
defer closer.Close()
req := &csipb.NodeGetCapabilitiesRequest{}
req := &csipbv1.NodeGetCapabilitiesRequest{}
resp, err := nodeClient.NodeGetCapabilities(ctx, req)
if err != nil {
return false, err
@ -314,49 +642,81 @@ func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, e
return false, nil
}
for _, capability := range capabilities {
if capability.GetRpc().GetType() == csipb.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME {
if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME {
stageUnstageSet = true
}
}
return stageUnstageSet, nil
}
func asCSIAccessMode(am api.PersistentVolumeAccessMode) csipb.VolumeCapability_AccessMode_Mode {
switch am {
case api.ReadWriteOnce:
return csipb.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
case api.ReadOnlyMany:
return csipb.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
case api.ReadWriteMany:
return csipb.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
func (c *csiDriverClient) nodeSupportsStageUnstageV0(ctx context.Context) (bool, error) {
nodeClient, closer, err := c.nodeV0ClientCreator(c.addr)
if err != nil {
return false, err
}
return csipb.VolumeCapability_AccessMode_UNKNOWN
defer closer.Close()
req := &csipbv0.NodeGetCapabilitiesRequest{}
resp, err := nodeClient.NodeGetCapabilities(ctx, req)
if err != nil {
return false, err
}
capabilities := resp.GetCapabilities()
stageUnstageSet := false
if capabilities == nil {
return false, nil
}
for _, capability := range capabilities {
if capability.GetRpc().GetType() == csipbv0.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME {
stageUnstageSet = true
}
}
return stageUnstageSet, nil
}
func newGrpcConn(driverName string) (*grpc.ClientConn, error) {
if driverName == "" {
return nil, fmt.Errorf("driver name is empty")
func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode {
switch am {
case api.ReadWriteOnce:
return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
case api.ReadOnlyMany:
return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
case api.ReadWriteMany:
return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
}
addr := fmt.Sprintf(csiAddrTemplate, driverName)
// TODO once KubeletPluginsWatcher graduates to beta, remove FeatureGate check
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPluginsWatcher) {
csiDrivers.RLock()
driver, ok := csiDrivers.driversMap[driverName]
csiDrivers.RUnlock()
return csipbv1.VolumeCapability_AccessMode_UNKNOWN
}
if !ok {
return nil, fmt.Errorf("driver name %s not found in the list of registered CSI drivers", driverName)
}
addr = driver.driverEndpoint
func asCSIAccessModeV0(am api.PersistentVolumeAccessMode) csipbv0.VolumeCapability_AccessMode_Mode {
switch am {
case api.ReadWriteOnce:
return csipbv0.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
case api.ReadOnlyMany:
return csipbv0.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
case api.ReadWriteMany:
return csipbv0.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
}
return csipbv0.VolumeCapability_AccessMode_UNKNOWN
}
func newGrpcConn(addr csiAddr) (*grpc.ClientConn, error) {
network := "unix"
klog.V(4).Infof(log("creating new gRPC connection for [%s://%s]", network, addr))
return grpc.Dial(
addr,
string(addr),
grpc.WithInsecure(),
grpc.WithDialer(func(target string, timeout time.Duration) (net.Conn, error) {
return net.Dial(network, target)
}),
)
}
func versionRequiresV0Client(version *utilversion.Version) bool {
if version != nil && version.Major() == 0 {
return true
}
return false
}

View File

@ -23,7 +23,7 @@ import (
"reflect"
"testing"
csipb "github.com/container-storage-interface/spec/lib/go/csi"
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
api "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/volume/csi/fake"
)
@ -45,7 +45,7 @@ func (c *fakeCsiDriverClient) NodeGetInfo(ctx context.Context) (
maxVolumePerNode int64,
accessibleTopology map[string]string,
err error) {
resp, err := c.nodeClient.NodeGetInfo(ctx, &csipb.NodeGetInfoRequest{})
resp, err := c.nodeClient.NodeGetInfo(ctx, &csipbv1.NodeGetInfoRequest{})
topology := resp.GetAccessibleTopology()
if topology != nil {
accessibleTopology = topology.Segments
@ -60,26 +60,26 @@ func (c *fakeCsiDriverClient) NodePublishVolume(
stagingTargetPath string,
targetPath string,
accessMode api.PersistentVolumeAccessMode,
volumeInfo map[string]string,
publishContext map[string]string,
volumeContext map[string]string,
secrets map[string]string,
fsType string,
mountOptions []string,
) error {
c.t.Log("calling fake.NodePublishVolume...")
req := &csipb.NodePublishVolumeRequest{
req := &csipbv1.NodePublishVolumeRequest{
VolumeId: volID,
TargetPath: targetPath,
Readonly: readOnly,
PublishContext: volumeInfo,
PublishContext: publishContext,
VolumeContext: volumeContext,
Secrets: secrets,
VolumeCapability: &csipb.VolumeCapability{
AccessMode: &csipb.VolumeCapability_AccessMode{
Mode: asCSIAccessMode(accessMode),
VolumeCapability: &csipbv1.VolumeCapability{
AccessMode: &csipbv1.VolumeCapability_AccessMode{
Mode: asCSIAccessModeV1(accessMode),
},
AccessType: &csipb.VolumeCapability_Mount{
Mount: &csipb.VolumeCapability_MountVolume{
AccessType: &csipbv1.VolumeCapability_Mount{
Mount: &csipbv1.VolumeCapability_MountVolume{
FsType: fsType,
MountFlags: mountOptions,
},
@ -93,7 +93,7 @@ func (c *fakeCsiDriverClient) NodePublishVolume(
func (c *fakeCsiDriverClient) NodeUnpublishVolume(ctx context.Context, volID string, targetPath string) error {
c.t.Log("calling fake.NodeUnpublishVolume...")
req := &csipb.NodeUnpublishVolumeRequest{
req := &csipbv1.NodeUnpublishVolumeRequest{
VolumeId: volID,
TargetPath: targetPath,
}
@ -112,16 +112,16 @@ func (c *fakeCsiDriverClient) NodeStageVolume(ctx context.Context,
volumeContext map[string]string,
) error {
c.t.Log("calling fake.NodeStageVolume...")
req := &csipb.NodeStageVolumeRequest{
req := &csipbv1.NodeStageVolumeRequest{
VolumeId: volID,
PublishContext: publishContext,
StagingTargetPath: stagingTargetPath,
VolumeCapability: &csipb.VolumeCapability{
AccessMode: &csipb.VolumeCapability_AccessMode{
Mode: asCSIAccessMode(accessMode),
VolumeCapability: &csipbv1.VolumeCapability{
AccessMode: &csipbv1.VolumeCapability_AccessMode{
Mode: asCSIAccessModeV1(accessMode),
},
AccessType: &csipb.VolumeCapability_Mount{
Mount: &csipb.VolumeCapability_MountVolume{
AccessType: &csipbv1.VolumeCapability_Mount{
Mount: &csipbv1.VolumeCapability_MountVolume{
FsType: fsType,
},
},
@ -136,7 +136,7 @@ func (c *fakeCsiDriverClient) NodeStageVolume(ctx context.Context,
func (c *fakeCsiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error {
c.t.Log("calling fake.NodeUnstageVolume...")
req := &csipb.NodeUnstageVolumeRequest{
req := &csipbv1.NodeUnstageVolumeRequest{
VolumeId: volID,
StagingTargetPath: stagingTargetPath,
}
@ -146,7 +146,7 @@ func (c *fakeCsiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stag
func (c *fakeCsiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) {
c.t.Log("calling fake.NodeGetCapabilities for NodeSupportsStageUnstage...")
req := &csipb.NodeGetCapabilitiesRequest{}
req := &csipbv1.NodeGetCapabilitiesRequest{}
resp, err := c.nodeClient.NodeGetCapabilities(ctx, req)
if err != nil {
return false, err
@ -159,7 +159,7 @@ func (c *fakeCsiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (boo
return false, nil
}
for _, capability := range capabilities {
if capability.GetRpc().GetType() == csipb.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME {
if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME {
stageUnstageSet = true
}
}
@ -212,13 +212,13 @@ func TestClientNodeGetInfo(t *testing.T) {
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeClientCreator: func(driverName string) (csipb.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
nodeClient.SetNodeGetInfoResp(&csipb.NodeGetInfoResponse{
nodeClient.SetNodeGetInfoResp(&csipbv1.NodeGetInfoResponse{
NodeId: tc.expectedNodeID,
MaxVolumesPerNode: tc.expectedMaxVolumePerNode,
AccessibleTopology: &csipb.Topology{
AccessibleTopology: &csipbv1.Topology{
Segments: tc.expectedAccessibleTopology,
},
})
@ -268,7 +268,7 @@ func TestClientNodePublishVolume(t *testing.T) {
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeClientCreator: func(driverName string) (csipb.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
return nodeClient, fakeCloser, nil
@ -315,7 +315,7 @@ func TestClientNodeUnpublishVolume(t *testing.T) {
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeClientCreator: func(driverName string) (csipb.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
return nodeClient, fakeCloser, nil
@ -353,7 +353,7 @@ func TestClientNodeStageVolume(t *testing.T) {
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeClientCreator: func(driverName string) (csipb.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
return nodeClient, fakeCloser, nil
@ -397,7 +397,7 @@ func TestClientNodeUnstageVolume(t *testing.T) {
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeClientCreator: func(driverName string) (csipb.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
return nodeClient, fakeCloser, nil

View File

@ -55,18 +55,18 @@ var (
)
type csiMountMgr struct {
csiClient csiClient
k8s kubernetes.Interface
plugin *csiPlugin
driverName string
volumeID string
specVolumeID string
readOnly bool
spec *volume.Spec
pod *api.Pod
podUID types.UID
options volume.VolumeOptions
volumeInfo map[string]string
csiClient csiClient
k8s kubernetes.Interface
plugin *csiPlugin
driverName csiDriverName
volumeID string
specVolumeID string
readOnly bool
spec *volume.Spec
pod *api.Pod
podUID types.UID
options volume.VolumeOptions
publishContext map[string]string
volume.MetricsNil
}
@ -135,9 +135,9 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
}
}
// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
if c.volumeInfo == nil {
if c.publishContext == nil {
nodeName := string(c.plugin.host.GetNodeName())
c.volumeInfo, err = c.plugin.getPublishVolumeInfo(c.k8s, c.volumeID, c.driverName, nodeName)
c.publishContext, err = c.plugin.getPublishContext(c.k8s, c.volumeID, string(c.driverName), nodeName)
if err != nil {
return err
}
@ -191,7 +191,7 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
deviceMountPath,
dir,
accessMode,
c.volumeInfo,
c.publishContext,
attribs,
nodePublishSecrets,
fsType,
@ -239,7 +239,7 @@ func (c *csiMountMgr) podAttributes() (map[string]string, error) {
return nil, errors.New("CSIDriver lister does not exist")
}
csiDriver, err := c.plugin.csiDriverLister.Get(c.driverName)
csiDriver, err := c.plugin.csiDriverLister.Get(string(c.driverName))
if err != nil {
if apierrs.IsNotFound(err) {
klog.V(4).Infof(log("CSIDriver %q not found, not adding pod information", c.driverName))

View File

@ -75,6 +75,7 @@ func TestMounterGetPath(t *testing.T) {
}
for _, tc := range testCases {
t.Logf("test case: %s", tc.name)
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
pv := makeTestPV(tc.specVolumeName, 10, testDriver, testVol)
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
mounter, err := plug.NewMounter(
@ -161,6 +162,7 @@ func MounterSetUpTests(t *testing.T, podInfoEnabled bool) {
})
}
registerFakePlugin(test.driver, "endpoint", []string{"1.0.0"}, t)
pv := makeTestPV("test-pv", 10, test.driver, testVol)
pv.Spec.CSI.VolumeAttributes = test.volumeContext
pv.Spec.MountOptions = []string{"foo=bar", "baz=qux"}
@ -187,7 +189,7 @@ func MounterSetUpTests(t *testing.T, podInfoEnabled bool) {
csiMounter := mounter.(*csiMountMgr)
csiMounter.csiClient = setupClient(t, true)
attachID := getAttachmentName(csiMounter.volumeID, csiMounter.driverName, string(plug.host.GetNodeName()))
attachID := getAttachmentName(csiMounter.volumeID, string(csiMounter.driverName), string(plug.host.GetNodeName()))
attachment := &storage.VolumeAttachment{
ObjectMeta: meta.ObjectMeta{
@ -331,6 +333,7 @@ func TestMounterSetUpWithFSGroup(t *testing.T) {
t.Logf("Running test %s", tc.name)
volName := fmt.Sprintf("test-vol-%d", i)
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
pv := makeTestPV("test-pv", 10, testDriver, volName)
pv.Spec.AccessModes = tc.accessModes
pvName := pv.GetName()
@ -357,7 +360,7 @@ func TestMounterSetUpWithFSGroup(t *testing.T) {
csiMounter := mounter.(*csiMountMgr)
csiMounter.csiClient = setupClient(t, true)
attachID := getAttachmentName(csiMounter.volumeID, csiMounter.driverName, string(plug.host.GetNodeName()))
attachID := getAttachmentName(csiMounter.volumeID, string(csiMounter.driverName), string(plug.host.GetNodeName()))
attachment := makeTestAttachment(attachID, "test-node", pvName)
_, err = csiMounter.k8s.StorageV1beta1().VolumeAttachments().Create(attachment)
@ -392,6 +395,7 @@ func TestMounterSetUpWithFSGroup(t *testing.T) {
func TestUnmounterTeardown(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
// save the data file prior to unmount

View File

@ -154,7 +154,11 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
}()
// Get node info from the driver.
csi := newCsiDriverClient(pluginName)
csi, err := newCsiDriverClient(csiDriverName(pluginName))
if err != nil {
return err
}
// TODO (verult) retry with exponential backoff, possibly added in csi client library.
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
@ -298,7 +302,10 @@ func (p *csiPlugin) NewMounter(
return nil, errors.New("failed to get a Kubernetes client")
}
csi := newCsiDriverClient(pvSource.Driver)
csi, err := newCsiDriverClient(csiDriverName(pvSource.Driver))
if err != nil {
return nil, err
}
mounter := &csiMountMgr{
plugin: p,
@ -306,7 +313,7 @@ func (p *csiPlugin) NewMounter(
spec: spec,
pod: pod,
podUID: pod.UID,
driverName: pvSource.Driver,
driverName: csiDriverName(pvSource.Driver),
volumeID: pvSource.VolumeHandle,
specVolumeID: spec.Name(),
csiClient: csi,
@ -365,9 +372,12 @@ func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmo
klog.Error(log("unmounter failed to load volume data file [%s]: %v", dir, err))
return nil, err
}
unmounter.driverName = data[volDataKey.driverName]
unmounter.driverName = csiDriverName(data[volDataKey.driverName])
unmounter.volumeID = data[volDataKey.volHandle]
unmounter.csiClient = newCsiDriverClient(unmounter.driverName)
unmounter.csiClient, err = newCsiDriverClient(unmounter.driverName)
if err != nil {
return nil, err
}
return unmounter, nil
}
@ -479,7 +489,10 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt
}
klog.V(4).Info(log("setting up block mapper for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver))
client := newCsiDriverClient(pvSource.Driver)
client, err := newCsiDriverClient(csiDriverName(pvSource.Driver))
if err != nil {
return nil, err
}
k8s := p.host.GetKubeClient()
if k8s == nil {
@ -492,7 +505,7 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt
k8s: k8s,
plugin: p,
volumeID: pvSource.VolumeHandle,
driverName: pvSource.Driver,
driverName: csiDriverName(pvSource.Driver),
readOnly: readOnly,
spec: spec,
specName: spec.Name(),
@ -550,9 +563,12 @@ func (p *csiPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (vo
klog.Error(log("unmapper failed to load volume data file [%s]: %v", dataDir, err))
return nil, err
}
unmapper.driverName = data[volDataKey.driverName]
unmapper.driverName = csiDriverName(data[volDataKey.driverName])
unmapper.volumeID = data[volDataKey.volHandle]
unmapper.csiClient = newCsiDriverClient(unmapper.driverName)
unmapper.csiClient, err = newCsiDriverClient(unmapper.driverName)
if err != nil {
return nil, err
}
return unmapper, nil
}
@ -613,7 +629,7 @@ func (p *csiPlugin) skipAttach(driver string) (bool, error) {
return false, nil
}
func (p *csiPlugin) getPublishVolumeInfo(client clientset.Interface, handle, driver, nodeName string) (map[string]string, error) {
func (p *csiPlugin) getPublishContext(client clientset.Interface, handle, driver, nodeName string) (map[string]string, error) {
skip, err := p.skipAttach(driver)
if err != nil {
return nil, err

View File

@ -104,6 +104,16 @@ func makeTestPV(name string, sizeGig int, driverName, volID string) *api.Persist
}
}
func registerFakePlugin(pluginName, endpoint string, versions []string, t *testing.T) {
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
highestSupportedVersions, err := highestSupportedVersion(versions)
if err != nil {
t.Fatalf("unexpected error parsing versions (%v) for pluginName % q endpoint %q: %#v", versions, pluginName, endpoint, err)
}
csiDrivers.driversMap[pluginName] = csiDriver{driverName: pluginName, driverEndpoint: endpoint, highestSupportedVersion: highestSupportedVersions}
}
func TestPluginGetPluginName(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
@ -133,6 +143,7 @@ func TestPluginGetVolumeName(t *testing.T) {
for _, tc := range testCases {
t.Logf("testing: %s", tc.name)
registerFakePlugin(tc.driverName, "endpoint", []string{"0.3.0"}, t)
pv := makeTestPV("test-pv", 10, tc.driverName, tc.volName)
spec := volume.NewSpecFromPersistentVolume(pv, false)
name, err := plug.GetVolumeName(spec)
@ -151,6 +162,7 @@ func TestPluginCanSupport(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
spec := volume.NewSpecFromPersistentVolume(pv, false)
@ -227,6 +239,7 @@ func TestPluginNewMounter(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
registerFakePlugin(testDriver, "endpoint", []string{"1.2.0"}, t)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
mounter, err := plug.NewMounter(
volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly),
@ -243,7 +256,7 @@ func TestPluginNewMounter(t *testing.T) {
csiMounter := mounter.(*csiMountMgr)
// validate mounter fields
if csiMounter.driverName != testDriver {
if string(csiMounter.driverName) != testDriver {
t.Error("mounter driver name not set")
}
if csiMounter.volumeID != testVol {
@ -277,6 +290,7 @@ func TestPluginNewUnmounter(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
// save the data file to re-create client
@ -364,6 +378,7 @@ func TestPluginNewBlockMapper(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
pv := makeTestPV("test-block-pv", 10, testDriver, testVol)
mounter, err := plug.NewBlockVolumeMapper(
volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly),
@ -380,7 +395,7 @@ func TestPluginNewBlockMapper(t *testing.T) {
csiMapper := mounter.(*csiBlockMapper)
// validate mounter fields
if csiMapper.driverName != testDriver {
if string(csiMapper.driverName) != testDriver {
t.Error("CSI block mapper missing driver name")
}
if csiMapper.volumeID != testVol {
@ -411,6 +426,7 @@ func TestPluginNewUnmapper(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil, nil)
defer os.RemoveAll(tmpDir)
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
// save the data file to re-create client
@ -456,7 +472,7 @@ func TestPluginNewUnmapper(t *testing.T) {
}
// test loaded vol data
if csiUnmapper.driverName != testDriver {
if string(csiUnmapper.driverName) != testDriver {
t.Error("unmapper driverName not set")
}
if csiUnmapper.volumeID != testVol {

View File

@ -0,0 +1,28 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["csi.pb.go"],
importpath = "k8s.io/kubernetes/pkg/volume/csi/csiv0",
visibility = ["//visibility:public"],
deps = [
"//vendor/github.com/golang/protobuf/proto:go_default_library",
"//vendor/github.com/golang/protobuf/ptypes/wrappers:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

File diff suppressed because it is too large

View File

@ -47,6 +47,7 @@ var csiTestDrivers = []func() drivers.TestDriver{
drivers.InitHostPathCSIDriver,
drivers.InitGcePDCSIDriver,
drivers.InitGcePDExternalCSIDriver,
drivers.InitHostV0PathCSIDriver,
}
// List of testSuites to be executed in below loop

View File

@ -134,6 +134,92 @@ func (h *hostpathCSIDriver) CleanupDriver() {
}
}
// hostpathV0CSIDriver
type hostpathV0CSIDriver struct {
cleanup func()
driverInfo DriverInfo
}
var _ TestDriver = &hostpathV0CSIDriver{}
var _ DynamicPVTestDriver = &hostpathV0CSIDriver{}
// InitHostV0PathCSIDriver returns hostpathV0CSIDriver that implements the TestDriver interface
func InitHostV0PathCSIDriver() TestDriver {
return &hostpathV0CSIDriver{
driverInfo: DriverInfo{
Name: "csi-hostpath-v0",
FeatureTag: "",
MaxFileSize: testpatterns.FileSizeMedium,
SupportedFsType: sets.NewString(
"", // Default fsType
),
IsPersistent: true,
IsFsGroupSupported: false,
IsBlockSupported: false,
},
}
}
func (h *hostpathV0CSIDriver) GetDriverInfo() *DriverInfo {
return &h.driverInfo
}
func (h *hostpathV0CSIDriver) SkipUnsupportedTest(pattern testpatterns.TestPattern) {
}
func (h *hostpathV0CSIDriver) GetDynamicProvisionStorageClass(fsType string) *storagev1.StorageClass {
provisioner := GetUniqueDriverName(h)
parameters := map[string]string{}
ns := h.driverInfo.Framework.Namespace.Name
suffix := fmt.Sprintf("%s-sc", provisioner)
return getStorageClass(provisioner, parameters, nil, ns, suffix)
}
func (h *hostpathV0CSIDriver) CreateDriver() {
By("deploying csi hostpath v0 driver")
f := h.driverInfo.Framework
cs := f.ClientSet
// pods should be scheduled on the node
nodes := framework.GetReadySchedulableNodesOrDie(cs)
node := nodes.Items[rand.Intn(len(nodes.Items))]
h.driverInfo.Config.ClientNodeName = node.Name
h.driverInfo.Config.ServerNodeName = node.Name
// TODO (?): the storage.csi.image.version and storage.csi.image.registry
// settings are ignored for this test. We could patch the image definitions.
o := utils.PatchCSIOptions{
OldDriverName: h.driverInfo.Name,
NewDriverName: GetUniqueDriverName(h),
DriverContainerName: "hostpath",
ProvisionerContainerName: "csi-provisioner-v0",
NodeName: h.driverInfo.Config.ServerNodeName,
}
cleanup, err := h.driverInfo.Framework.CreateFromManifests(func(item interface{}) error {
return utils.PatchCSIDeployment(h.driverInfo.Framework, o, item)
},
"test/e2e/testing-manifests/storage-csi/driver-registrar/rbac.yaml",
"test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml",
"test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml",
"test/e2e/testing-manifests/storage-csi/hostpath/hostpath-v0/csi-hostpath-attacher.yaml",
"test/e2e/testing-manifests/storage-csi/hostpath/hostpath-v0/csi-hostpath-provisioner.yaml",
"test/e2e/testing-manifests/storage-csi/hostpath/hostpath-v0/csi-hostpathplugin.yaml",
"test/e2e/testing-manifests/storage-csi/hostpath/hostpath-v0/e2e-test-rbac.yaml",
)
h.cleanup = cleanup
if err != nil {
framework.Failf("deploying csi hostpath v0 driver: %v", err)
}
}
func (h *hostpathV0CSIDriver) CleanupDriver() {
if h.cleanup != nil {
By("uninstalling csi hostpath v0 driver")
h.cleanup()
}
}
// gce-pd
type gcePDCSIDriver struct {
cleanup func()

View File

@ -14,7 +14,7 @@ spec:
serviceAccountName: csi-node-sa
containers:
- name: csi-driver-registrar
image: gcr.io/gke-release/csi-driver-registrar:v1.0.0-gke.0
image: gcr.io/gke-release/csi-driver-registrar:v1.0.1-gke.0
args:
- "--v=5"
- "--csi-address=/csi/csi.sock"

View File

@ -0,0 +1,48 @@
kind: Service
apiVersion: v1
metadata:
name: csi-hostpath-attacher
labels:
app: csi-hostpath-attacher
spec:
selector:
app: csi-hostpath-attacher
ports:
- name: dummy
port: 12345
---
kind: StatefulSet
apiVersion: apps/v1
metadata:
name: csi-hostpath-attacher
spec:
serviceName: "csi-hostpath-attacher"
replicas: 1
selector:
matchLabels:
app: csi-hostpath-attacher
template:
metadata:
labels:
app: csi-hostpath-attacher
spec:
serviceAccountName: csi-attacher
containers:
- name: csi-attacher
image: gcr.io/gke-release/csi-attacher:v0.4.1-gke.0
args:
- --v=5
- --csi-address=$(ADDRESS)
env:
- name: ADDRESS
value: /csi/csi.sock
imagePullPolicy: Always
volumeMounts:
- mountPath: /csi
name: socket-dir
volumes:
- hostPath:
path: /var/lib/kubelet/plugins/csi-hostpath-v0
type: DirectoryOrCreate
name: socket-dir

View File

@ -0,0 +1,49 @@
kind: Service
apiVersion: v1
metadata:
name: csi-hostpath-provisioner
labels:
app: csi-hostpath-provisioner
spec:
selector:
app: csi-hostpath-provisioner
ports:
- name: dummy
port: 12345
---
kind: StatefulSet
apiVersion: apps/v1
metadata:
name: csi-hostpath-provisioner
spec:
serviceName: "csi-hostpath-provisioner"
replicas: 1
selector:
matchLabels:
app: csi-hostpath-provisioner
template:
metadata:
labels:
app: csi-hostpath-provisioner
spec:
serviceAccountName: csi-provisioner
containers:
- name: csi-provisioner-v0
image: gcr.io/gke-release/csi-provisioner:v0.4.1-gke.0
args:
- "--provisioner=csi-hostpath-v0"
- "--csi-address=$(ADDRESS)"
- "--connection-timeout=15s"
env:
- name: ADDRESS
value: /csi/csi.sock
imagePullPolicy: Always
volumeMounts:
- mountPath: /csi
name: socket-dir
volumes:
- hostPath:
path: /var/lib/kubelet/plugins/csi-hostpath-v0
type: DirectoryOrCreate
name: socket-dir

View File

@ -0,0 +1,70 @@
kind: DaemonSet
apiVersion: apps/v1
metadata:
name: csi-hostpathplugin
spec:
selector:
matchLabels:
app: csi-hostpathplugin
template:
metadata:
labels:
app: csi-hostpathplugin
spec:
serviceAccountName: csi-node-sa
hostNetwork: true
containers:
- name: driver-registrar
image: gcr.io/gke-release/csi-driver-registrar:v0.4.1-gke.0
args:
- --v=5
- --csi-address=/csi/csi.sock
- --kubelet-registration-path=/var/lib/kubelet/plugins/csi-hostpath-v0/csi.sock
env:
- name: KUBE_NODE_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
imagePullPolicy: Always
volumeMounts:
- mountPath: /csi
name: socket-dir
- mountPath: /registration
name: registration-dir
- name: hostpath
image: quay.io/k8scsi/hostpathplugin:v0.4.1
args:
- "--v=5"
- "--endpoint=$(CSI_ENDPOINT)"
- "--nodeid=$(KUBE_NODE_NAME)"
env:
- name: CSI_ENDPOINT
value: unix:///csi/csi.sock
- name: KUBE_NODE_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
imagePullPolicy: Always
securityContext:
privileged: true
volumeMounts:
- mountPath: /csi
name: socket-dir
- mountPath: /var/lib/kubelet/pods
mountPropagation: Bidirectional
name: mountpoint-dir
volumes:
- hostPath:
path: /var/lib/kubelet/plugins/csi-hostpath-v0
type: DirectoryOrCreate
name: socket-dir
- hostPath:
path: /var/lib/kubelet/pods
type: DirectoryOrCreate
name: mountpoint-dir
- hostPath:
path: /var/lib/kubelet/plugins
type: Directory
name: registration-dir

View File

@ -0,0 +1,19 @@
# privileged Pod Security Policy, previously defined just for gcePD via PrivilegedTestPSPClusterRoleBinding()
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: psp-csi-hostpath-role
subjects:
- kind: ServiceAccount
name: csi-attacher
namespace: default
- kind: ServiceAccount
name: csi-node-sa
namespace: default
- kind: ServiceAccount
name: csi-provisioner
namespace: default
roleRef:
kind: ClusterRole
name: e2e-test-privileged-psp
apiGroup: rbac.authorization.k8s.io

View File

@ -15,7 +15,7 @@ spec:
hostNetwork: true
containers:
- name: driver-registrar
image: gcr.io/gke-release/csi-driver-registrar:v1.0.0-gke.0
image: gcr.io/gke-release/csi-driver-registrar:v1.0.1-gke.0
args:
- --v=5
- --csi-address=/csi/csi.sock