Merge pull request #74863 from gnufied/csi-volume-expansion

CSI volume expansion
pull/564/head
Kubernetes Prow Robot 2019-03-08 07:37:29 -08:00 committed by GitHub
commit 3624c74ce8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 1814 additions and 404 deletions

4
Godeps/Godeps.json generated
View File

@ -975,8 +975,8 @@
},
{
"ImportPath": "github.com/container-storage-interface/spec/lib/go/csi",
"Comment": "v1.0.0",
"Rev": "ed0bb0e1557548aa028307f48728767cfe8f6345"
"Comment": "v1.1.0-rc1",
"Rev": "c751be1edc7952e7aac35720d5b34b669e48f113"
},
{
"ImportPath": "github.com/containerd/console",

View File

@ -109,6 +109,11 @@ const (
// Ability to expand persistent volumes' file system without unmounting volumes.
ExpandInUsePersistentVolumes utilfeature.Feature = "ExpandInUsePersistentVolumes"
// owner: @gnufied
// alpha: v1.14
// Ability to expand CSI volumes
ExpandCSIVolumes utilfeature.Feature = "ExpandCSIVolumes"
// owner: @verb
// alpha: v1.10
//
@ -450,6 +455,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
QOSReserved: {Default: false, PreRelease: utilfeature.Alpha},
ExpandPersistentVolumes: {Default: true, PreRelease: utilfeature.Beta},
ExpandInUsePersistentVolumes: {Default: false, PreRelease: utilfeature.Alpha},
ExpandCSIVolumes: {Default: false, PreRelease: utilfeature.Alpha},
AttachVolumeLimit: {Default: true, PreRelease: utilfeature.Beta},
CPUManager: {Default: true, PreRelease: utilfeature.Beta},
CPUCFSQuotaPeriod: {Default: false, PreRelease: utilfeature.Alpha},

View File

@ -542,7 +542,7 @@ func (asw *actualStateOfWorld) MarkFSResizeRequired(
}
volumePlugin, err :=
asw.volumePluginMgr.FindExpandablePluginBySpec(podObj.volumeSpec)
asw.volumePluginMgr.FindNodeExpandablePluginBySpec(podObj.volumeSpec)
if err != nil || volumePlugin == nil {
// Log and continue processing
klog.Errorf(

View File

@ -317,12 +317,15 @@ func (plugin *awsElasticBlockStorePlugin) ExpandVolumeDevice(
return awsVolume.ResizeDisk(volumeID, oldSize, newSize)
}
func (plugin *awsElasticBlockStorePlugin) ExpandFS(spec *volume.Spec, devicePath, deviceMountPath string, _, _ resource.Quantity) error {
_, err := util.GenericResizeFS(plugin.host, plugin.GetPluginName(), devicePath, deviceMountPath)
return err
func (plugin *awsElasticBlockStorePlugin) NodeExpand(resizeOptions volume.NodeResizeOptions) (bool, error) {
_, err := util.GenericResizeFS(plugin.host, plugin.GetPluginName(), resizeOptions.DevicePath, resizeOptions.DeviceMountPath)
if err != nil {
return false, err
}
return true, nil
}
var _ volume.FSResizableVolumePlugin = &awsElasticBlockStorePlugin{}
var _ volume.NodeExpandableVolumePlugin = &awsElasticBlockStorePlugin{}
var _ volume.ExpandableVolumePlugin = &awsElasticBlockStorePlugin{}
var _ volume.VolumePluginWithAttachLimits = &awsElasticBlockStorePlugin{}

View File

@ -312,12 +312,15 @@ func (plugin *azureDataDiskPlugin) ExpandVolumeDevice(
return diskController.ResizeDisk(spec.PersistentVolume.Spec.AzureDisk.DataDiskURI, oldSize, newSize)
}
func (plugin *azureDataDiskPlugin) ExpandFS(spec *volume.Spec, devicePath, deviceMountPath string, _, _ resource.Quantity) error {
_, err := util.GenericResizeFS(plugin.host, plugin.GetPluginName(), devicePath, deviceMountPath)
return err
func (plugin *azureDataDiskPlugin) NodeExpand(resizeOptions volume.NodeResizeOptions) (bool, error) {
_, err := util.GenericResizeFS(plugin.host, plugin.GetPluginName(), resizeOptions.DevicePath, resizeOptions.DeviceMountPath)
if err != nil {
return false, err
}
return true, nil
}
var _ volume.FSResizableVolumePlugin = &azureDataDiskPlugin{}
var _ volume.NodeExpandableVolumePlugin = &azureDataDiskPlugin{}
func (plugin *azureDataDiskPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
mounter := plugin.host.GetMounter(plugin.GetPluginName())

View File

@ -304,12 +304,15 @@ func (plugin *cinderPlugin) ExpandVolumeDevice(spec *volume.Spec, newSize resour
return expandedSize, nil
}
func (plugin *cinderPlugin) ExpandFS(spec *volume.Spec, devicePath, deviceMountPath string, _, _ resource.Quantity) error {
_, err := util.GenericResizeFS(plugin.host, plugin.GetPluginName(), devicePath, deviceMountPath)
return err
func (plugin *cinderPlugin) NodeExpand(resizeOptions volume.NodeResizeOptions) (bool, error) {
_, err := util.GenericResizeFS(plugin.host, plugin.GetPluginName(), resizeOptions.DevicePath, resizeOptions.DeviceMountPath)
if err != nil {
return false, err
}
return true, nil
}
var _ volume.FSResizableVolumePlugin = &cinderPlugin{}
var _ volume.NodeExpandableVolumePlugin = &cinderPlugin{}
func (plugin *cinderPlugin) RequiresFSResize() bool {
return true

View File

@ -10,6 +10,7 @@ go_library(
"csi_mounter.go",
"csi_plugin.go",
"csi_util.go",
"expander.go",
],
importpath = "k8s.io/kubernetes/pkg/volume/csi",
visibility = ["//visibility:public"],
@ -22,6 +23,7 @@ go_library(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
@ -50,6 +52,7 @@ go_test(
"csi_drivers_store_test.go",
"csi_mounter_test.go",
"csi_plugin_test.go",
"expander_test.go",
],
embed = [":go_default_library"],
deps = [

View File

@ -28,6 +28,7 @@ import (
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -55,6 +56,7 @@ type csiClient interface {
fsType string,
mountOptions []string,
) error
NodeExpandVolume(ctx context.Context, volumeid, volumePath string, newSize resource.Quantity) (resource.Quantity, error)
NodeUnpublishVolume(
ctx context.Context,
volID string,
@ -71,6 +73,7 @@ type csiClient interface {
) error
NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error
NodeSupportsStageUnstage(ctx context.Context) (bool, error)
NodeSupportsNodeExpand(ctx context.Context) (bool, error)
}
// Strongly typed address
@ -304,6 +307,41 @@ func (c *csiDriverClient) NodePublishVolume(
}
func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, volumeID, volumePath string, newSize resource.Quantity) (resource.Quantity, error) {
if c.nodeV1ClientCreator == nil {
return newSize, fmt.Errorf("version of CSI driver does not support volume expansion")
}
if volumeID == "" {
return newSize, errors.New("missing volume id")
}
if volumePath == "" {
return newSize, errors.New("missing volume path")
}
if newSize.Value() < 0 {
return newSize, errors.New("size can not be less than 0")
}
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return newSize, err
}
defer closer.Close()
req := &csipbv1.NodeExpandVolumeRequest{
VolumeId: volumeID,
VolumePath: volumePath,
CapacityRange: &csipbv1.CapacityRange{RequiredBytes: newSize.Value()},
}
resp, err := nodeClient.NodeExpandVolume(ctx, req)
if err != nil {
return newSize, err
}
updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI)
return *updatedQuantity, nil
}
func (c *csiDriverClient) nodePublishVolumeV1(
ctx context.Context,
volID string,
@ -624,6 +662,41 @@ func (c *csiDriverClient) nodeUnstageVolumeV0(ctx context.Context, volID, stagin
return err
}
func (c *csiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, error) {
klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if Node has EXPAND_VOLUME capability"))
if c.nodeV1ClientCreator != nil {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return false, err
}
defer closer.Close()
req := &csipbv1.NodeGetCapabilitiesRequest{}
resp, err := nodeClient.NodeGetCapabilities(ctx, req)
if err != nil {
return false, err
}
capabilities := resp.GetCapabilities()
nodeExpandSet := false
if capabilities == nil {
return false, nil
}
for _, capability := range capabilities {
if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_EXPAND_VOLUME {
nodeExpandSet = true
}
}
return nodeExpandSet, nil
} else if c.nodeV0ClientCreator != nil {
return false, nil
}
return false, fmt.Errorf("failed to call NodeSupportsNodeExpand. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
}
func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) {
klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if NodeSupportsStageUnstage"))

View File

@ -25,6 +25,7 @@ import (
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/volume/csi/fake"
)
@ -40,6 +41,13 @@ func newFakeCsiDriverClient(t *testing.T, stagingCapable bool) *fakeCsiDriverCli
}
}
func newFakeCsiDriverClientWithExpansion(t *testing.T, stagingCapable bool, expansionSet bool) *fakeCsiDriverClient {
return &fakeCsiDriverClient{
t: t,
nodeClient: fake.NewNodeClientWithExpansion(stagingCapable, expansionSet),
}
}
func (c *fakeCsiDriverClient) NodeGetInfo(ctx context.Context) (
nodeID string,
maxVolumePerNode int64,
@ -144,6 +152,28 @@ func (c *fakeCsiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stag
return err
}
func (c *fakeCsiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, error) {
c.t.Log("calling fake.NodeSupportsNodeExpand...")
req := &csipbv1.NodeGetCapabilitiesRequest{}
resp, err := c.nodeClient.NodeGetCapabilities(ctx, req)
if err != nil {
return false, err
}
capabilities := resp.GetCapabilities()
if capabilities == nil {
return false, nil
}
for _, capability := range capabilities {
if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_EXPAND_VOLUME {
return true, nil
}
}
return false, nil
}
func (c *fakeCsiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) {
c.t.Log("calling fake.NodeGetCapabilities for NodeSupportsStageUnstage...")
req := &csipbv1.NodeGetCapabilitiesRequest{}
@ -166,10 +196,29 @@ func (c *fakeCsiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (boo
return stageUnstageSet, nil
}
func (c *fakeCsiDriverClient) NodeExpandVolume(ctx context.Context, volumeid, volumePath string, newSize resource.Quantity) (resource.Quantity, error) {
c.t.Log("calling fake.NodeExpandVolume")
req := &csipbv1.NodeExpandVolumeRequest{
VolumeId: volumeid,
VolumePath: volumePath,
CapacityRange: &csipbv1.CapacityRange{RequiredBytes: newSize.Value()},
}
resp, err := c.nodeClient.NodeExpandVolume(ctx, req)
if err != nil {
return newSize, err
}
updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI)
return *updatedQuantity, nil
}
func setupClient(t *testing.T, stageUnstageSet bool) csiClient {
return newFakeCsiDriverClient(t, stageUnstageSet)
}
func setupClientWithExpansion(t *testing.T, stageUnstageSet bool, expansionSet bool) csiClient {
return newFakeCsiDriverClientWithExpansion(t, stageUnstageSet, expansionSet)
}
func checkErr(t *testing.T, expectedAnError bool, actualError error) {
t.Helper()
@ -415,3 +464,59 @@ func TestClientNodeUnstageVolume(t *testing.T) {
}
}
}
func TestNodeExpandVolume(t *testing.T) {
testCases := []struct {
name string
volID string
volumePath string
newSize resource.Quantity
mustFail bool
err error
}{
{
name: "with all correct values",
volID: "vol-abcde",
volumePath: "/foo/bar",
newSize: resource.MustParse("10Gi"),
mustFail: false,
},
{
name: "with missing volume-id",
volumePath: "/foo/bar",
newSize: resource.MustParse("10Gi"),
mustFail: true,
},
{
name: "with missing volume path",
volID: "vol-1234",
newSize: resource.MustParse("10Gi"),
mustFail: true,
},
{
name: "with invalid quantity",
volID: "vol-1234",
volumePath: "/foo/bar",
newSize: *resource.NewQuantity(-10, resource.DecimalSI),
mustFail: true,
},
}
for _, tc := range testCases {
t.Logf("Running test cases : %s", tc.name)
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
return nodeClient, fakeCloser, nil
},
}
_, err := client.NodeExpandVolume(context.Background(), tc.volID, tc.volumePath, tc.newSize)
checkErr(t, tc.mustFail, err)
if !tc.mustFail {
fakeCloser.Check()
}
}
}

View File

@ -0,0 +1,96 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"context"
"fmt"
api "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
)
var _ volume.NodeExpandableVolumePlugin = &csiPlugin{}
func (c *csiPlugin) RequiresFSResize() bool {
// We could check plugin's node capability but we instead are going to rely on
// NodeExpand to do the right thing and return early if plugin does not have
// node expansion capability.
if !utilfeature.DefaultFeatureGate.Enabled(features.ExpandCSIVolumes) {
klog.V(4).Infof("Resizing is not enabled for CSI volume")
return false
}
return true
}
func (c *csiPlugin) NodeExpand(resizeOptions volume.NodeResizeOptions) (bool, error) {
klog.V(4).Infof(log("Expander.NodeExpand(%s)", resizeOptions.DeviceMountPath))
csiSource, err := getCSISourceFromSpec(resizeOptions.VolumeSpec)
if err != nil {
klog.Error(log("Expander.NodeExpand failed to get CSI persistent source: %v", err))
return false, err
}
csClient, err := newCsiDriverClient(csiDriverName(csiSource.Driver))
if err != nil {
return false, err
}
return c.nodeExpandWithClient(resizeOptions, csiSource, csClient)
}
func (c *csiPlugin) nodeExpandWithClient(
resizeOptions volume.NodeResizeOptions,
csiSource *api.CSIPersistentVolumeSource,
csClient csiClient) (bool, error) {
driverName := csiSource.Driver
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
nodeExpandSet, err := csClient.NodeSupportsNodeExpand(ctx)
if err != nil {
return false, fmt.Errorf("Expander.NodeExpand failed to check if node supports expansion : %v", err)
}
if !nodeExpandSet {
return false, fmt.Errorf("Expander.NodeExpand found CSI plugin %s/%s to not support node expansion", c.GetPluginName(), driverName)
}
// Check whether "STAGE_UNSTAGE_VOLUME" is set
stageUnstageSet, err := csClient.NodeSupportsStageUnstage(ctx)
if err != nil {
return false, fmt.Errorf("Expander.NodeExpand failed to check if plugins supports stage_unstage %v", err)
}
// if plugin does not support STAGE_UNSTAGE but CSI volume path is staged
// it must mean this was placeholder staging performed by k8s and not CSI staging
// in which case we should return from here so as volume can be node published
// before we can resize
if !stageUnstageSet && resizeOptions.CSIVolumePhase == volume.CSIVolumeStaged {
return false, nil
}
_, err = csClient.NodeExpandVolume(ctx, csiSource.VolumeHandle, resizeOptions.DeviceMountPath, resizeOptions.NewSize)
if err != nil {
return false, fmt.Errorf("Expander.NodeExpand failed to expand the volume : %v", err)
}
return true, nil
}

View File

@ -0,0 +1,94 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"os"
"testing"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/volume"
)
func TestNodeExpand(t *testing.T) {
tests := []struct {
name string
nodeExpansion bool
nodeStageSet bool
volumePhase volume.CSIVolumePhaseType
success bool
}{
{
name: "when node expansion is not set",
success: false,
},
{
name: "when nodeExpansion=on, nodeStage=on, volumePhase=staged",
nodeExpansion: true,
nodeStageSet: true,
volumePhase: volume.CSIVolumeStaged,
success: true,
},
{
name: "when nodeExpansion=on, nodeStage=off, volumePhase=staged",
nodeExpansion: true,
volumePhase: volume.CSIVolumeStaged,
success: false,
},
{
name: "when nodeExpansion=on, nodeStage=on, volumePhase=published",
nodeExpansion: true,
nodeStageSet: true,
volumePhase: volume.CSIVolumePublished,
success: true,
},
{
name: "when nodeExpansion=on, nodeStage=off, volumePhase=published",
nodeExpansion: true,
volumePhase: volume.CSIVolumePublished,
success: true,
},
}
for _, tc := range tests {
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "expandable", "test-vol"), false)
newSize, _ := resource.ParseQuantity("20Gi")
resizeOptions := volume.NodeResizeOptions{
VolumeSpec: spec,
NewSize: newSize,
DeviceMountPath: "/foo/bar",
CSIVolumePhase: tc.volumePhase,
}
csiSource, _ := getCSISourceFromSpec(resizeOptions.VolumeSpec)
csClient := setupClientWithExpansion(t, tc.nodeStageSet, tc.nodeExpansion)
ok, err := plug.nodeExpandWithClient(resizeOptions, csiSource, csClient)
if ok != tc.success {
if err != nil {
t.Errorf("For %s : expected %v got %v with %v", tc.name, tc.success, ok, err)
} else {
t.Errorf("For %s : expected %v got %v", tc.name, tc.success, ok)
}
}
}
}

View File

@ -67,6 +67,7 @@ type NodeClient struct {
nodePublishedVolumes map[string]CSIVolume
nodeStagedVolumes map[string]CSIVolume
stageUnstageSet bool
expansionSet bool
nodeGetInfoResp *csipb.NodeGetInfoResponse
nextErr error
}
@ -80,6 +81,15 @@ func NewNodeClient(stageUnstageSet bool) *NodeClient {
}
}
func NewNodeClientWithExpansion(stageUnstageSet bool, expansionSet bool) *NodeClient {
return &NodeClient{
nodePublishedVolumes: make(map[string]CSIVolume),
nodeStagedVolumes: make(map[string]CSIVolume),
stageUnstageSet: stageUnstageSet,
expansionSet: expansionSet,
}
}
// SetNextError injects next expected error
func (f *NodeClient) SetNextError(err error) {
f.nextErr = err
@ -195,6 +205,29 @@ func (f *NodeClient) NodeUnstageVolume(ctx context.Context, req *csipb.NodeUnsta
return &csipb.NodeUnstageVolumeResponse{}, nil
}
// NodeExpandVolume implements csi method
func (f *NodeClient) NodeExpandVolume(ctx context.Context, req *csipb.NodeExpandVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeExpandVolumeResponse, error) {
if f.nextErr != nil {
return nil, f.nextErr
}
if req.GetVolumeId() == "" {
return nil, errors.New("missing volume id")
}
if req.GetVolumePath() == "" {
return nil, errors.New("missing volume path")
}
if req.GetCapacityRange().RequiredBytes <= 0 {
return nil, errors.New("required bytes should be greater than 0")
}
resp := &csipb.NodeExpandVolumeResponse{
CapacityBytes: req.GetCapacityRange().RequiredBytes,
}
return resp, nil
}
// NodeGetId implements csi method
func (f *NodeClient) NodeGetInfo(ctx context.Context, in *csipb.NodeGetInfoRequest, opts ...grpc.CallOption) (*csipb.NodeGetInfoResponse, error) {
if f.nextErr != nil {
@ -206,20 +239,27 @@ func (f *NodeClient) NodeGetInfo(ctx context.Context, in *csipb.NodeGetInfoReque
// NodeGetCapabilities implements csi method
func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.NodeGetCapabilitiesResponse, error) {
resp := &csipb.NodeGetCapabilitiesResponse{
Capabilities: []*csipb.NodeServiceCapability{
{
Type: &csipb.NodeServiceCapability_Rpc{
Rpc: &csipb.NodeServiceCapability_RPC{
Type: csipb.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
},
},
},
},
Capabilities: []*csipb.NodeServiceCapability{},
}
if f.stageUnstageSet {
return resp, nil
resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
Type: &csipb.NodeServiceCapability_Rpc{
Rpc: &csipb.NodeServiceCapability_RPC{
Type: csipb.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
},
},
})
}
return nil, nil
if f.expansionSet {
resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
Type: &csipb.NodeServiceCapability_Rpc{
Rpc: &csipb.NodeServiceCapability_RPC{
Type: csipb.NodeServiceCapability_RPC_EXPAND_VOLUME,
},
},
})
}
return resp, nil
}
// NodeGetVolumeStats implements csi method

View File

@ -36,10 +36,13 @@ func (e *expanderDefaults) ExpandVolumeDevice(spec *volume.Spec, newSize resourc
return newSize, nil
}
// the defaults for ExpandFS return a generic resize indicator that will trigger the operation executor to go ahead with
// the defaults for NodeExpand return a generic resize indicator that will trigger the operation executor to go ahead with
// generic filesystem resize
func (e *expanderDefaults) ExpandFS(spec *volume.Spec, devicePath, deviceMountPath string, _, _ resource.Quantity) error {
klog.Warning(logPrefix(e.plugin), "using default filesystem resize for volume ", spec.Name(), ", at ", devicePath)
_, err := util.GenericResizeFS(e.plugin.host, e.plugin.GetPluginName(), devicePath, deviceMountPath)
return err
func (e *expanderDefaults) NodeExpand(rsOpt volume.NodeResizeOptions) (bool, error) {
klog.Warning(logPrefix(e.plugin), "using default filesystem resize for volume ", rsOpt.VolumeSpec.Name(), ", at ", rsOpt.DevicePath)
_, err := util.GenericResizeFS(e.plugin.host, e.plugin.GetPluginName(), rsOpt.DevicePath, rsOpt.DeviceMountPath)
if err != nil {
return false, err
}
return true, nil
}

View File

@ -18,9 +18,10 @@ package flexvolume
import (
"fmt"
"strconv"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/volume"
"strconv"
)
func (plugin *flexVolumePlugin) ExpandVolumeDevice(spec *volume.Spec, newSize resource.Quantity, oldSize resource.Quantity) (resource.Quantity, error) {
@ -42,26 +43,29 @@ func (plugin *flexVolumePlugin) ExpandVolumeDevice(spec *volume.Spec, newSize re
return newSize, err
}
func (plugin *flexVolumePlugin) ExpandFS(spec *volume.Spec, devicePath, deviceMountPath string, newSize, oldSize resource.Quantity) error {
func (plugin *flexVolumePlugin) NodeExpand(rsOpt volume.NodeResizeOptions) (bool, error) {
// This method is called after we spec.PersistentVolume.Spec.Capacity
// has been updated to the new size. The underlying driver thus sees
// the _new_ (requested) size and can find out the _current_ size from
// its underlying storage implementation
if spec.PersistentVolume == nil {
return fmt.Errorf("PersistentVolume not found for spec: %s", spec.Name())
if rsOpt.VolumeSpec.PersistentVolume == nil {
return false, fmt.Errorf("PersistentVolume not found for spec: %s", rsOpt.VolumeSpec.Name())
}
call := plugin.NewDriverCall(expandFSCmd)
call.AppendSpec(spec, plugin.host, nil)
call.Append(devicePath)
call.Append(deviceMountPath)
call.Append(strconv.FormatInt(newSize.Value(), 10))
call.Append(strconv.FormatInt(oldSize.Value(), 10))
call.AppendSpec(rsOpt.VolumeSpec, plugin.host, nil)
call.Append(rsOpt.DevicePath)
call.Append(rsOpt.DeviceMountPath)
call.Append(strconv.FormatInt(rsOpt.NewSize.Value(), 10))
call.Append(strconv.FormatInt(rsOpt.OldSize.Value(), 10))
_, err := call.Run()
if isCmdNotSupportedErr(err) {
return newExpanderDefaults(plugin).ExpandFS(spec, devicePath, deviceMountPath, newSize, oldSize)
return newExpanderDefaults(plugin).NodeExpand(rsOpt)
}
return err
if err != nil {
return false, err
}
return true, nil
}

View File

@ -56,7 +56,7 @@ type flexVolumeAttachablePlugin struct {
var _ volume.AttachableVolumePlugin = &flexVolumeAttachablePlugin{}
var _ volume.PersistentVolumePlugin = &flexVolumePlugin{}
var _ volume.FSResizableVolumePlugin = &flexVolumePlugin{}
var _ volume.NodeExpandableVolumePlugin = &flexVolumePlugin{}
var _ volume.ExpandableVolumePlugin = &flexVolumePlugin{}
var _ volume.DeviceMountableVolumePlugin = &flexVolumeAttachablePlugin{}

View File

@ -290,12 +290,15 @@ func (plugin *gcePersistentDiskPlugin) ExpandVolumeDevice(
return updatedQuantity, nil
}
func (plugin *gcePersistentDiskPlugin) ExpandFS(spec *volume.Spec, devicePath, deviceMountPath string, _, _ resource.Quantity) error {
_, err := util.GenericResizeFS(plugin.host, plugin.GetPluginName(), devicePath, deviceMountPath)
return err
func (plugin *gcePersistentDiskPlugin) NodeExpand(resizeOptions volume.NodeResizeOptions) (bool, error) {
_, err := util.GenericResizeFS(plugin.host, plugin.GetPluginName(), resizeOptions.DevicePath, resizeOptions.DeviceMountPath)
if err != nil {
return false, err
}
return true, nil
}
var _ volume.FSResizableVolumePlugin = &gcePersistentDiskPlugin{}
var _ volume.NodeExpandableVolumePlugin = &gcePersistentDiskPlugin{}
func (plugin *gcePersistentDiskPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
mounter := plugin.host.GetMounter(plugin.GetPluginName())

View File

@ -46,6 +46,9 @@ type ProbeEvent struct {
Op ProbeOperation // The operation to the plugin
}
// CSIVolumePhaseType stores information about CSI volume path.
type CSIVolumePhaseType string
const (
// Common parameter which can be specified in StorageClass to specify the desired FSType
// Provisioners SHOULD implement support for this if they are block device based
@ -55,6 +58,8 @@ const (
ProbeAddOrUpdate ProbeOperation = 1 << iota
ProbeRemove
CSIVolumeStaged CSIVolumePhaseType = "staged"
CSIVolumePublished CSIVolumePhaseType = "published"
)
// VolumeOptions contains option information about a volume.
@ -85,6 +90,26 @@ type VolumeOptions struct {
Parameters map[string]string
}
// NodeResizeOptions contain options to be passed for node expansion.
type NodeResizeOptions struct {
VolumeSpec *Spec
// DevicePath - location of actual device on the node. In case of CSI
// this just could be volumeID
DevicePath string
// DeviceMountPath location where device is mounted on the node. If volume type
// is attachable - this would be global mount path otherwise
// it would be location where volume was mounted for the pod
DeviceMountPath string
NewSize resource.Quantity
OldSize resource.Quantity
// CSIVolumePhase contains volume phase on the node
CSIVolumePhase CSIVolumePhaseType
}
type DynamicPluginProber interface {
Init() error
@ -219,18 +244,20 @@ type DeviceMountableVolumePlugin interface {
}
// ExpandableVolumePlugin is an extended interface of VolumePlugin and is used for volumes that can be
// expanded
// expanded via control-plane ExpandVolumeDevice call.
type ExpandableVolumePlugin interface {
VolumePlugin
ExpandVolumeDevice(spec *Spec, newSize resource.Quantity, oldSize resource.Quantity) (resource.Quantity, error)
RequiresFSResize() bool
}
// FSResizableVolumePlugin is an extension of ExpandableVolumePlugin and is used for volumes (flex)
// that require extra steps on nodes for expansion to complete
type FSResizableVolumePlugin interface {
ExpandableVolumePlugin
ExpandFS(spec *Spec, devicePath, deviceMountPath string, newSize, oldSize resource.Quantity) error
// NodeExpandableVolumePlugin is an expanded interface of VolumePlugin and is used for volumes that
// require expansion on the node via NodeExpand call.
type NodeExpandableVolumePlugin interface {
VolumePlugin
RequiresFSResize() bool
// NodeExpand expands volume on given deviceMountPath and returns true if resize is successful.
NodeExpand(resizeOptions NodeResizeOptions) (bool, error)
}
// VolumePluginWithAttachLimits is an extended interface of VolumePlugin that restricts number of
@ -951,26 +978,26 @@ func (pm *VolumePluginMgr) FindMapperPluginByName(name string) (BlockVolumePlugi
return nil, nil
}
// FindFSResizablePluginBySpec fetches a persistent volume plugin by spec
func (pm *VolumePluginMgr) FindFSResizablePluginBySpec(spec *Spec) (FSResizableVolumePlugin, error) {
// FindNodeExpandablePluginBySpec fetches a persistent volume plugin by spec
func (pm *VolumePluginMgr) FindNodeExpandablePluginBySpec(spec *Spec) (NodeExpandableVolumePlugin, error) {
volumePlugin, err := pm.FindPluginBySpec(spec)
if err != nil {
return nil, err
}
if fsResizablePlugin, ok := volumePlugin.(FSResizableVolumePlugin); ok {
if fsResizablePlugin, ok := volumePlugin.(NodeExpandableVolumePlugin); ok {
return fsResizablePlugin, nil
}
return nil, nil
}
// FindFSResizablePluginByName fetches a persistent volume plugin by name
func (pm *VolumePluginMgr) FindFSResizablePluginByName(name string) (FSResizableVolumePlugin, error) {
// FindNodeExpandablePluginByName fetches a persistent volume plugin by name
func (pm *VolumePluginMgr) FindNodeExpandablePluginByName(name string) (NodeExpandableVolumePlugin, error) {
volumePlugin, err := pm.FindPluginByName(name)
if err != nil {
return nil, err
}
if fsResizablePlugin, ok := volumePlugin.(FSResizableVolumePlugin); ok {
if fsResizablePlugin, ok := volumePlugin.(NodeExpandableVolumePlugin); ok {
return fsResizablePlugin, nil
}

View File

@ -201,12 +201,15 @@ func (plugin *rbdPlugin) ExpandVolumeDevice(spec *volume.Spec, newSize resource.
}
}
func (plugin *rbdPlugin) ExpandFS(spec *volume.Spec, devicePath, deviceMountPath string, _, _ resource.Quantity) error {
_, err := volutil.GenericResizeFS(plugin.host, plugin.GetPluginName(), devicePath, deviceMountPath)
return err
func (plugin *rbdPlugin) NodeExpand(resizeOptions volume.NodeResizeOptions) (bool, error) {
_, err := volutil.GenericResizeFS(plugin.host, plugin.GetPluginName(), resizeOptions.DevicePath, resizeOptions.DeviceMountPath)
if err != nil {
return false, err
}
return true, nil
}
var _ volume.FSResizableVolumePlugin = &rbdPlugin{}
var _ volume.NodeExpandableVolumePlugin = &rbdPlugin{}
func (expander *rbdVolumeExpander) ResizeImage(oldSize resource.Quantity, newSize resource.Quantity) (resource.Quantity, error) {
return expander.manager.ExpandImage(expander, oldSize, newSize)

View File

@ -280,7 +280,7 @@ var _ ProvisionableVolumePlugin = &FakeVolumePlugin{}
var _ AttachableVolumePlugin = &FakeVolumePlugin{}
var _ VolumePluginWithAttachLimits = &FakeVolumePlugin{}
var _ DeviceMountableVolumePlugin = &FakeVolumePlugin{}
var _ FSResizableVolumePlugin = &FakeVolumePlugin{}
var _ NodeExpandableVolumePlugin = &FakeVolumePlugin{}
func (plugin *FakeVolumePlugin) getFakeVolume(list *[]*FakeVolume) *FakeVolume {
volumeList := *list
@ -537,8 +537,8 @@ func (plugin *FakeVolumePlugin) RequiresFSResize() bool {
return true
}
func (plugin *FakeVolumePlugin) ExpandFS(spec *Spec, devicePath, deviceMountPath string, _, _ resource.Quantity) error {
return nil
func (plugin *FakeVolumePlugin) NodeExpand(resizeOptions NodeResizeOptions) (bool, error) {
return true, nil
}
func (plugin *FakeVolumePlugin) GetVolumeLimits() (map[string]int64, error) {

View File

@ -598,6 +598,12 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
klog.Infof(volumeToMount.GenerateMsgDetailed("MountVolume.WaitForAttach succeeded", fmt.Sprintf("DevicePath %q", devicePath)))
}
var resizeDone bool
var resizeError error
resizeOptions := volume.NodeResizeOptions{
DevicePath: devicePath,
}
if volumeDeviceMounter != nil {
deviceMountPath, err :=
volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec)
@ -626,12 +632,16 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
return volumeToMount.GenerateError("MountVolume.MarkDeviceAsMounted failed", markDeviceMountedErr)
}
resizeOptions.DeviceMountPath = deviceMountPath
resizeOptions.CSIVolumePhase = volume.CSIVolumeStaged
// resizeFileSystem will resize the file system if user has requested a resize of
// underlying persistent volume and is allowed to do so.
resizeSimpleError, resizeDetailedError := og.resizeFileSystem(volumeToMount, devicePath, deviceMountPath, volumePluginName)
resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions, volumePluginName)
if resizeSimpleError != nil || resizeDetailedError != nil {
return resizeSimpleError, resizeDetailedError
if resizeError != nil {
klog.Errorf("MountVolume.resizeFileSystem failed with %v", resizeError)
return volumeToMount.GenerateError("MountVolume.MountDevice failed while expanding volume", resizeError)
}
}
@ -657,6 +667,20 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
verbosity = klog.Level(4)
}
klog.V(verbosity).Infof(detailedMsg)
resizeOptions.DeviceMountPath = volumeMounter.GetPath()
resizeOptions.CSIVolumePhase = volume.CSIVolumePublished
// We need to call resizing here again in case resizing was not done during device mount. There could be
// two reasons of that:
// - Volume does not support DeviceMounter interface.
// - In case of CSI the volume does not have node stage_unstage capability.
if !resizeDone {
resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions, volumePluginName)
if resizeError != nil {
klog.Errorf("MountVolume.resizeFileSystem failed with %v", resizeError)
return volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError)
}
}
// Update actual state of world
markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(
@ -689,15 +713,15 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
}
}
func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, devicePath, deviceMountPath, pluginName string) (simpleErr, detailedErr error) {
func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, rsOpts volume.NodeResizeOptions, pluginName string) (bool, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) {
klog.V(4).Infof("Resizing is not enabled for this volume %s", volumeToMount.VolumeName)
return nil, nil
return true, nil
}
// Get expander, if possible
expandableVolumePlugin, _ :=
og.volumePluginMgr.FindFSResizablePluginBySpec(volumeToMount.VolumeSpec)
og.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeToMount.VolumeSpec)
if expandableVolumePlugin != nil &&
expandableVolumePlugin.RequiresFSResize() &&
@ -706,7 +730,7 @@ func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, devi
pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(pv.Spec.ClaimRef.Name, metav1.GetOptions{})
if err != nil {
// Return error rather than leave the file system un-resized, caller will log and retry
return volumeToMount.GenerateError("MountVolume.resizeFileSystem get PVC failed", err)
return false, fmt.Errorf("MountVolume.resizeFileSystem get PVC failed : %v", err)
}
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
@ -719,10 +743,20 @@ func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, devi
simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.resizeFileSystem failed", "requested read-only file system")
klog.Warningf(detailedMsg)
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
return nil, nil
return true, nil
}
if resizeErr := expandableVolumePlugin.ExpandFS(volumeToMount.VolumeSpec, devicePath, deviceMountPath, pvSpecCap, pvcStatusCap); resizeErr != nil {
return volumeToMount.GenerateError("MountVolume.resizeFileSystem failed", resizeErr)
rsOpts.VolumeSpec = volumeToMount.VolumeSpec
rsOpts.NewSize = pvSpecCap
rsOpts.OldSize = pvcStatusCap
resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts)
if resizeErr != nil {
return false, fmt.Errorf("MountVolume.resizeFileSystem failed : %v", resizeErr)
}
// Volume resizing is not done but it did not error out. This could happen if a CSI volume
// does not have node stage_unstage capability but was asked to resize the volume before
// node publish. In which case - we must retry resizing after node publish.
if !resizeDone {
return false, nil
}
simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.resizeFileSystem succeeded", "")
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
@ -731,12 +765,12 @@ func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, devi
err = util.MarkFSResizeFinished(pvc, pv.Spec.Capacity, og.kubeClient)
if err != nil {
// On retry, resizeFileSystem will be called again but do nothing
return volumeToMount.GenerateError("MountVolume.resizeFileSystem update PVC status failed", err)
return false, fmt.Errorf("MountVolume.resizeFileSystem update PVC status failed : %v", err)
}
return nil, nil
return true, nil
}
}
return nil, nil
return true, nil
}
func (og *operationGenerator) GenerateUnmountVolumeFunc(
@ -1446,41 +1480,58 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc(
return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.FindPluginBySpec failed", err)
}
attachableVolumePlugin, err :=
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
if err != nil || attachableVolumePlugin == nil {
if attachableVolumePlugin == nil {
err = fmt.Errorf("AttachableVolumePlugin is nil")
}
return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.FindAttachablePluginBySpec failed", err)
}
volumeAttacher, err := attachableVolumePlugin.NewAttacher()
if err != nil || volumeAttacher == nil {
if volumeAttacher == nil {
err = fmt.Errorf("VolumeAttacher is nil")
}
return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.NewAttacher failed", err)
}
deviceMountPath, err := volumeAttacher.GetDeviceMountPath(volumeToMount.VolumeSpec)
if err != nil {
return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.GetDeviceMountPath failed", err)
}
fsResizeFunc := func() (error, error) {
resizeSimpleError, resizeDetailedError := og.resizeFileSystem(volumeToMount, volumeToMount.DevicePath, deviceMountPath, volumePlugin.GetPluginName())
var resizeDone bool
var simpleErr, detailedErr error
resizeOptions := volume.NodeResizeOptions{
VolumeSpec: volumeToMount.VolumeSpec,
}
if resizeSimpleError != nil || resizeDetailedError != nil {
return resizeSimpleError, resizeDetailedError
attachableVolumePlugin, _ :=
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
if attachableVolumePlugin != nil {
volumeAttacher, _ := attachableVolumePlugin.NewAttacher()
if volumeAttacher != nil {
resizeOptions.CSIVolumePhase = volume.CSIVolumeStaged
resizeOptions.DevicePath = volumeToMount.DevicePath
dmp, err := volumeAttacher.GetDeviceMountPath(volumeToMount.VolumeSpec)
if err != nil {
return volumeToMount.GenerateError("VolumeFSResize.GetDeviceMountPath failed", err)
}
resizeOptions.DeviceMountPath = dmp
resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions, volumePlugin.GetPluginName())
if simpleErr != nil || detailedErr != nil {
return simpleErr, detailedErr
}
if resizeDone {
return nil, nil
}
}
}
markFSResizedErr := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.PodName, volumeToMount.VolumeName)
if markFSResizedErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("VolumeFSResize.MarkVolumeAsResized failed", markFSResizedErr)
// if we are here that means volume plugin does not support attach interface
volumeMounter, newMounterErr := volumePlugin.NewMounter(
volumeToMount.VolumeSpec,
volumeToMount.Pod,
volume.VolumeOptions{})
if newMounterErr != nil {
return volumeToMount.GenerateError("VolumeFSResize.NewMounter initialization failed", newMounterErr)
}
return nil, nil
resizeOptions.DeviceMountPath = volumeMounter.GetPath()
resizeOptions.CSIVolumePhase = volume.CSIVolumePublished
resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions, volumePlugin.GetPluginName())
if simpleErr != nil || detailedErr != nil {
return simpleErr, detailedErr
}
if resizeDone {
return nil, nil
}
// This is a placeholder error - we should NEVER reach here.
err := fmt.Errorf("volume resizing failed for unknown reason")
return volumeToMount.GenerateError("VolumeFSResize.resizeFileSystem failed to resize volume", err)
}
eventRecorderFunc := func(err *error) {
if *err != nil {
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error())
@ -1494,6 +1545,28 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc(
}, nil
}
func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
resizeOptions volume.NodeResizeOptions,
pluginName string) (bool, error, error) {
resizeDone, err := og.resizeFileSystem(volumeToMount, resizeOptions, pluginName)
if err != nil {
klog.Errorf("VolumeFSResize.resizeFileSystem failed : %v", err)
e1, e2 := volumeToMount.GenerateError("VolumeFSResize.resizeFileSystem failed", err)
return false, e1, e2
}
if resizeDone {
markFSResizedErr := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.PodName, volumeToMount.VolumeName)
if markFSResizedErr != nil {
// On failure, return error. Caller will log and retry.
e1, e2 := volumeToMount.GenerateError("VolumeFSResize.MarkVolumeAsResized failed", markFSResizedErr)
return false, e1, e2
}
return true, nil, nil
}
return false, nil, nil
}
func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error {
mountOptions := util.MountOptionFromSpec(volumeToMount.VolumeSpec)

View File

@ -28,6 +28,7 @@ import (
storage "k8s.io/api/storage/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
@ -50,16 +51,23 @@ type cleanupFuncs func()
const (
csiNodeLimitUpdateTimeout = 5 * time.Minute
csiPodUnschedulableTimeout = 5 * time.Minute
csiResizeWaitPeriod = 5 * time.Minute
// how long to wait for Resizing Condition on PVC to appear
csiResizingConditionWait = 2 * time.Minute
)
var _ = utils.SIGDescribe("CSI mock volume", func() {
type testParameters struct {
disableAttach bool
attachLimit int
registerDriver bool
podInfo *bool
scName string
nodeSelectorKey string
disableAttach bool
attachLimit int
registerDriver bool
podInfo *bool
scName string
nodeSelectorKey string
enableResizing bool // enable resizing for both CSI mock driver and storageClass.
enableNodeExpansion bool // enable node expansion for CSI mock driver
// just disable resizing on driver it overrides enableResizing flag for CSI mock driver
disableResizingOnDriver bool
}
type mockDriverSetup struct {
@ -87,8 +95,21 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
}
cs := f.ClientSet
var err error
driverOpts := drivers.CSIMockDriverOpts{
RegisterDriver: tp.registerDriver,
PodInfo: tp.podInfo,
AttachLimit: tp.attachLimit,
DisableAttach: tp.disableAttach,
EnableResizing: tp.enableResizing,
EnableNodeExpansion: tp.enableNodeExpansion,
}
m.driver = drivers.InitMockCSIDriver(tp.registerDriver, !tp.disableAttach, tp.podInfo, tp.attachLimit)
// this just disable resizing on driver, keeping resizing on SC enabled.
if tp.disableResizingOnDriver {
driverOpts.EnableResizing = false
}
m.driver = drivers.InitMockCSIDriver(driverOpts)
config, testCleanup := m.driver.PrepareTest(f)
m.testCleanups = append(m.testCleanups, testCleanup)
m.config = config
@ -127,6 +148,11 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
if m.tp.scName != "" {
scTest.StorageClassName = m.tp.scName
}
if m.tp.enableResizing {
scTest.AllowVolumeExpansion = true
}
nodeSelection := testsuites.NodeSelection{
// The mock driver only works when everything runs on a single node.
Name: nodeName,
@ -149,6 +175,23 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
return class, claim, pod
}
createPodWithPVC := func(pvc *v1.PersistentVolumeClaim) (*v1.Pod, error) {
nodeName := m.config.ClientNodeName
nodeSelection := testsuites.NodeSelection{
Name: nodeName,
}
if len(m.nodeLabel) > 0 {
nodeSelection = testsuites.NodeSelection{
Selector: m.nodeLabel,
}
}
pod, err := startPausePodWithClaim(m.cs, pvc, nodeSelection, f.Namespace.Name)
if pod != nil {
m.pods = append(m.pods, pod)
}
return pod, err
}
cleanup := func() {
cs := f.ClientSet
var errs []error
@ -340,6 +383,177 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
})
})
Context("CSI Volume expansion [Feature:ExpandCSIVolumes]", func() {
tests := []struct {
name string
nodeExpansionRequired bool
disableAttach bool
disableResizingOnDriver bool
expectFailure bool
}{
{
name: "should expand volume without restarting pod if nodeExpansion=off",
nodeExpansionRequired: false,
},
{
name: "should expand volume by restarting pod if attach=on, nodeExpansion=on",
nodeExpansionRequired: true,
},
{
name: "should expand volume by restarting pod if attach=off, nodeExpansion=on",
disableAttach: true,
nodeExpansionRequired: true,
},
{
name: "should not expand volume if resizingOnDriver=off, resizingOnSC=on",
disableResizingOnDriver: true,
expectFailure: true,
},
}
for _, t := range tests {
test := t
It(t.name, func() {
var err error
tp := testParameters{
enableResizing: true,
enableNodeExpansion: test.nodeExpansionRequired,
disableResizingOnDriver: test.disableResizingOnDriver,
}
// disabling attach requires drive registration feature
if test.disableAttach {
tp.disableAttach = true
tp.registerDriver = true
}
init(tp)
defer cleanup()
ns := f.Namespace.Name
sc, pvc, pod := createPod()
Expect(pod).NotTo(BeNil(), "while creating pod for resizing")
Expect(*sc.AllowVolumeExpansion).To(BeTrue(), "failed creating sc with allowed expansion")
err = framework.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
framework.ExpectNoError(err, "Failed to start pod1: %v", err)
By("Expanding current pvc")
newSize := resource.MustParse("6Gi")
pvc, err = expandPVCSize(pvc, newSize, m.cs)
Expect(err).NotTo(HaveOccurred(), "While updating pvc for more size")
Expect(pvc).NotTo(BeNil())
pvcSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
if pvcSize.Cmp(newSize) != 0 {
framework.Failf("error updating pvc size %q", pvc.Name)
}
if test.expectFailure {
err = waitForResizingCondition(pvc, m.cs, csiResizingConditionWait)
Expect(err).To(HaveOccurred(), "unexpected resizing condition on PVC")
return
}
By("Waiting for persistent volume resize to finish")
err = waitForControllerVolumeResize(pvc, m.cs, csiResizeWaitPeriod)
Expect(err).NotTo(HaveOccurred(), "While waiting for CSI PV resize to finish")
checkPVCSize := func() {
By("Waiting for PVC resize to finish")
pvc, err = waitForFSResize(pvc, m.cs)
Expect(err).NotTo(HaveOccurred(), "while waiting for PVC resize to finish")
pvcConditions := pvc.Status.Conditions
Expect(len(pvcConditions)).To(Equal(0), "pvc should not have conditions")
}
// if node expansion is not required PVC should be resized as well
if !test.nodeExpansionRequired {
checkPVCSize()
} else {
By("Checking for conditions on pvc")
pvc, err = m.cs.CoreV1().PersistentVolumeClaims(ns).Get(pvc.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred(), "While fetching pvc after controller resize")
inProgressConditions := pvc.Status.Conditions
if len(inProgressConditions) > 0 {
Expect(inProgressConditions[0].Type).To(Equal(v1.PersistentVolumeClaimFileSystemResizePending), "pvc must have fs resizing condition")
}
By("Deleting the previously created pod")
err = framework.DeletePodWithWait(f, m.cs, pod)
Expect(err).NotTo(HaveOccurred(), "while deleting pod for resizing")
By("Creating a new pod with same volume")
pod2, err := createPodWithPVC(pvc)
Expect(pod2).NotTo(BeNil(), "while creating pod for csi resizing")
Expect(err).NotTo(HaveOccurred(), "while recreating pod for resizing")
checkPVCSize()
}
})
}
})
Context("CSI online volume expansion [Feature:ExpandCSIVolumes][Feature:ExpandInUseVolumes]", func() {
tests := []struct {
name string
disableAttach bool
}{
{
name: "should expand volume without restarting pod if attach=on, nodeExpansion=on",
},
{
name: "should expand volume without restarting pod if attach=off, nodeExpansion=on",
disableAttach: true,
},
}
for _, t := range tests {
test := t
It(test.name, func() {
var err error
params := testParameters{enableResizing: true, enableNodeExpansion: true}
if test.disableAttach {
params.disableAttach = true
params.registerDriver = true
}
init(params)
defer cleanup()
sc, pvc, pod := createPod()
Expect(pod).NotTo(BeNil(), "while creating pod for resizing")
Expect(*sc.AllowVolumeExpansion).To(BeTrue(), "failed creating sc with allowed expansion")
err = framework.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
framework.ExpectNoError(err, "Failed to start pod1: %v", err)
By("Expanding current pvc")
newSize := resource.MustParse("6Gi")
pvc, err = expandPVCSize(pvc, newSize, m.cs)
Expect(err).NotTo(HaveOccurred(), "While updating pvc for more size")
Expect(pvc).NotTo(BeNil())
pvcSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
if pvcSize.Cmp(newSize) != 0 {
framework.Failf("error updating pvc size %q", pvc.Name)
}
By("Waiting for persistent volume resize to finish")
err = waitForControllerVolumeResize(pvc, m.cs, csiResizeWaitPeriod)
Expect(err).NotTo(HaveOccurred(), "While waiting for PV resize to finish")
By("Waiting for PVC resize to finish")
pvc, err = waitForFSResize(pvc, m.cs)
Expect(err).NotTo(HaveOccurred(), "while waiting for PVC to finish")
pvcConditions := pvc.Status.Conditions
Expect(len(pvcConditions)).To(Equal(0), "pvc should not have conditions")
})
}
})
})
func waitForMaxVolumeCondition(pod *v1.Pod, cs clientset.Interface) error {
@ -445,6 +659,49 @@ func startPausePod(cs clientset.Interface, t testsuites.StorageClassTest, node t
return class, claim, pod
}
func startPausePodWithClaim(cs clientset.Interface, pvc *v1.PersistentVolumeClaim, node testsuites.NodeSelection, ns string) (*v1.Pod, error) {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "pvc-volume-tester-",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "volume-tester",
Image: imageutils.GetE2EImage(imageutils.Pause),
VolumeMounts: []v1.VolumeMount{
{
Name: "my-volume",
MountPath: "/mnt/test",
},
},
},
},
RestartPolicy: v1.RestartPolicyNever,
Volumes: []v1.Volume{
{
Name: "my-volume",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc.Name,
ReadOnly: false,
},
},
},
},
},
}
if node.Name != "" {
pod.Spec.NodeName = node.Name
}
if len(node.Selector) != 0 {
pod.Spec.NodeSelector = node.Selector
}
return cs.CoreV1().Pods(ns).Create(pod)
}
// checkPodInfo tests that NodePublish was called with expected volume_context
func checkPodInfo(cs clientset.Interface, namespace, driverPodName, driverContainerName string, pod *v1.Pod, expectPodInfo bool) error {
expectedAttributes := map[string]string{
@ -508,7 +765,7 @@ func checkPodInfo(cs clientset.Interface, namespace, driverPodName, driverContai
}
func waitForCSIDriver(cs clientset.Interface, driverName string) error {
timeout := 2 * time.Minute
timeout := 4 * time.Minute
framework.Logf("waiting up to %v for CSIDriver %q", timeout, driverName)
for start := time.Now(); time.Since(start) < timeout; time.Sleep(framework.Poll) {

View File

@ -169,36 +169,52 @@ func (h *hostpathCSIDriver) PrepareTest(f *framework.Framework) (*testsuites.Per
// mockCSI
type mockCSIDriver struct {
driverInfo testsuites.DriverInfo
manifests []string
podInfo *bool
attachable bool
attachLimit int
driverInfo testsuites.DriverInfo
manifests []string
podInfo *bool
attachable bool
attachLimit int
enableNodeExpansion bool
}
// CSIMockDriverOpts defines options used for csi driver
type CSIMockDriverOpts struct {
RegisterDriver bool
DisableAttach bool
PodInfo *bool
AttachLimit int
EnableResizing bool
EnableNodeExpansion bool
}
var _ testsuites.TestDriver = &mockCSIDriver{}
var _ testsuites.DynamicPVTestDriver = &mockCSIDriver{}
// InitMockCSIDriver returns a mockCSIDriver that implements TestDriver interface
func InitMockCSIDriver(registerDriver, driverAttachable bool, podInfo *bool, attachLimit int) testsuites.TestDriver {
func InitMockCSIDriver(driverOpts CSIMockDriverOpts) testsuites.TestDriver {
driverManifests := []string{
"test/e2e/testing-manifests/storage-csi/cluster-driver-registrar/rbac.yaml",
"test/e2e/testing-manifests/storage-csi/driver-registrar/rbac.yaml",
"test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml",
"test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml",
"test/e2e/testing-manifests/storage-csi/external-resizer/rbac.yaml",
"test/e2e/testing-manifests/storage-csi/mock/csi-mock-rbac.yaml",
"test/e2e/testing-manifests/storage-csi/mock/csi-storageclass.yaml",
"test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml",
}
if registerDriver {
if driverOpts.RegisterDriver {
driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-cluster-driver-registrar.yaml")
}
if driverAttachable {
if !driverOpts.DisableAttach {
driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver-attacher.yaml")
}
if driverOpts.EnableResizing {
driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver-resizer.yaml")
}
return &mockCSIDriver{
driverInfo: testsuites.DriverInfo{
Name: "csi-mock",
@ -213,10 +229,11 @@ func InitMockCSIDriver(registerDriver, driverAttachable bool, podInfo *bool, att
testsuites.CapExec: false,
},
},
manifests: driverManifests,
podInfo: podInfo,
attachable: driverAttachable,
attachLimit: attachLimit,
manifests: driverManifests,
podInfo: driverOpts.PodInfo,
attachable: !driverOpts.DisableAttach,
attachLimit: driverOpts.AttachLimit,
enableNodeExpansion: driverOpts.EnableNodeExpansion,
}
}
@ -264,6 +281,10 @@ func (m *mockCSIDriver) PrepareTest(f *framework.Framework) (*testsuites.PerTest
containerArgs = append(containerArgs, "--attach-limit", strconv.Itoa(m.attachLimit))
}
if m.enableNodeExpansion {
containerArgs = append(containerArgs, "--node-expand-required=true")
}
// TODO (?): the storage.csi.image.version and storage.csi.image.registry
// settings are ignored for this test. We could patch the image definitions.
o := utils.PatchCSIOptions{

View File

@ -159,7 +159,7 @@ var _ = utils.SIGDescribe("Mounted flexvolume expand[Slow]", func() {
}
By("Waiting for cloudprovider resize to finish")
err = waitForControllerVolumeResize(pvc, c)
err = waitForControllerVolumeResize(pvc, c, totalResizeWaitPeriod)
Expect(err).NotTo(HaveOccurred(), "While waiting for pvc resize to finish")
By("Getting a pod from deployment")

View File

@ -164,7 +164,7 @@ var _ = utils.SIGDescribe("Mounted flexvolume volume expand [Slow] [Feature:Expa
}
By("Waiting for cloudprovider resize to finish")
err = waitForControllerVolumeResize(pvc, c)
err = waitForControllerVolumeResize(pvc, c, totalResizeWaitPeriod)
Expect(err).NotTo(HaveOccurred(), "While waiting for pvc resize to finish")
By("Waiting for file system resize to finish")

View File

@ -136,7 +136,7 @@ var _ = utils.SIGDescribe("Mounted volume expand", func() {
}
By("Waiting for cloudprovider resize to finish")
err = waitForControllerVolumeResize(pvc, c)
err = waitForControllerVolumeResize(pvc, c, totalResizeWaitPeriod)
Expect(err).NotTo(HaveOccurred(), "While waiting for pvc resize to finish")
By("Getting a pod from deployment")

View File

@ -136,7 +136,7 @@ var _ = utils.SIGDescribe("Volume expand", func() {
}
By("Waiting for cloudprovider resize to finish")
err = waitForControllerVolumeResize(pvc, c)
err = waitForControllerVolumeResize(pvc, c, totalResizeWaitPeriod)
Expect(err).NotTo(HaveOccurred(), "While waiting for pvc resize to finish")
By("Checking for conditions on pvc")
@ -198,9 +198,29 @@ func expandPVCSize(origPVC *v1.PersistentVolumeClaim, size resource.Quantity, c
return updatedPVC, waitErr
}
func waitForControllerVolumeResize(pvc *v1.PersistentVolumeClaim, c clientset.Interface) error {
func waitForResizingCondition(pvc *v1.PersistentVolumeClaim, c clientset.Interface, duration time.Duration) error {
waitErr := wait.PollImmediate(resizePollInterval, duration, func() (bool, error) {
var err error
updatedPVC, err := c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(pvc.Name, metav1.GetOptions{})
if err != nil {
return false, fmt.Errorf("error fetching pvc %q for checking for resize status : %v", pvc.Name, err)
}
pvcConditions := updatedPVC.Status.Conditions
if len(pvcConditions) > 0 {
if pvcConditions[0].Type == v1.PersistentVolumeClaimResizing {
return true, nil
}
}
return false, nil
})
return waitErr
}
func waitForControllerVolumeResize(pvc *v1.PersistentVolumeClaim, c clientset.Interface, duration time.Duration) error {
pvName := pvc.Spec.VolumeName
return wait.PollImmediate(resizePollInterval, totalResizeWaitPeriod, func() (bool, error) {
return wait.PollImmediate(resizePollInterval, duration, func() (bool, error) {
pvcSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
pv, err := c.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})

View File

@ -0,0 +1 @@
The original file is (or will be) https://github.com/kubernetes-csi/external-resizer/blob/master/deploy/kubernetes/rbac.yaml

View File

@ -0,0 +1,90 @@
# This YAML file contains all RBAC objects that are necessary to run external
# CSI resizer.
#
# In production, each CSI driver deployment has to be customized:
# - to avoid conflicts, use non-default namespace and different names
# for non-namespaced entities like the ClusterRole
# - decide whether the deployment replicates the external CSI
# resizer, in which case leadership election must be enabled;
# this influences the RBAC setup, see below
apiVersion: v1
kind: ServiceAccount
metadata:
name: csi-resizer
# replace with non-default namespace name
namespace: default
---
# Resizer must be able to work with PVCs, PVs, SCs.
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: external-resizer-runner
rules:
# The following rule should be uncommented for plugins that require secrets
# for provisioning.
# - apiGroups: [""]
# resources: ["secrets"]
# verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["persistentvolumes"]
verbs: ["get", "list", "watch", "update", "patch"]
- apiGroups: [""]
resources: ["persistentvolumeclaims"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["persistentvolumeclaims/status"]
verbs: ["update", "patch"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["events"]
verbs: ["list", "watch", "create", "update", "patch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: csi-resizer-role
subjects:
- kind: ServiceAccount
name: csi-resizer
# replace with non-default namespace name
namespace: default
roleRef:
kind: ClusterRole
name: external-resizer-runner
apiGroup: rbac.authorization.k8s.io
---
# Resizer must be able to work with end point in current namespace
# if (and only if) leadership election is enabled
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
# replace with non-default namespace name
namespace: default
name: external-resizer-cfg
rules:
- apiGroups: [""]
resources: ["endpoints"]
verbs: ["get", "watch", "list", "delete", "update", "create"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: csi-resizer-role-cfg
# replace with non-default namespace name
namespace: default
subjects:
- kind: ServiceAccount
name: csi-resizer
# replace with non-default namespace name
namespace: default
roleRef:
kind: Role
name: external-resizer-cfg
apiGroup: rbac.authorization.k8s.io

View File

@ -0,0 +1,33 @@
kind: StatefulSet
apiVersion: apps/v1
metadata:
name: csi-mockplugin-resizer
spec:
selector:
matchLabels:
app: csi-mockplugin-resizer
replicas: 1
template:
metadata:
labels:
app: csi-mockplugin-resizer
spec:
serviceAccountName: csi-mock
containers:
- name: csi-resizer
image: quay.io/k8scsi/csi-resizer:canary
args:
- "--v=5"
- "--csi-address=$(ADDRESS)"
env:
- name: ADDRESS
value: /csi/csi.sock
imagePullPolicy: Always
volumeMounts:
- mountPath: /csi
name: socket-dir
volumes:
- hostPath:
path: /var/lib/kubelet/plugins/csi-mock
type: DirectoryOrCreate
name: socket-dir

View File

@ -54,7 +54,7 @@ spec:
- name: mock
image: quay.io/k8scsi/mock-driver:v1.0.0-1
image: quay.io/k8scsi/mock-driver:v1.1.1
env:
- name: CSI_ENDPOINT
value: /csi/csi.sock

View File

@ -59,3 +59,16 @@ roleRef:
kind: ClusterRole
name: e2e-test-privileged-psp
apiGroup: rbac.authorization.k8s.io
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: csi-controller-resizer-role
subjects:
- kind: ServiceAccount
name: csi-mock
namespace: default
roleRef:
kind: ClusterRole
name: external-resizer-runner
apiGroup: rbac.authorization.k8s.io

File diff suppressed because it is too large Load Diff