From 405d33eae4ebc4b7d1fe18ee0f34ffae8e6da7e5 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Thu, 16 May 2019 17:42:16 -0400 Subject: [PATCH 1/4] Add code to handle in-tree to CSI migration for resizing --- cmd/kube-controller-manager/app/core.go | 1 + pkg/controller/volume/expand/BUILD | 17 ++++- .../volume/expand/expand_controller.go | 67 ++++++++++++++++--- .../volume/expand/expand_controller_test.go | 47 +++++++++++++ pkg/volume/util/resize_util.go | 38 +++++++++++ pkg/volume/util/types/types.go | 7 ++ 6 files changed, 166 insertions(+), 11 deletions(-) create mode 100644 pkg/controller/volume/expand/expand_controller_test.go diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 99a4ac166a..b21feca75e 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -244,6 +244,7 @@ func startVolumeExpandController(ctx ControllerContext) (http.Handler, bool, err ctx.ClientBuilder.ClientOrDie("expand-controller"), ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), ctx.InformerFactory.Core().V1().PersistentVolumes(), + ctx.InformerFactory.Storage().V1().StorageClasses(), ctx.Cloud, ProbeExpandableVolumePlugins(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)) diff --git a/pkg/controller/volume/expand/BUILD b/pkg/controller/volume/expand/BUILD index 5f3a8b7b5d..185fcb75ca 100644 --- a/pkg/controller/volume/expand/BUILD +++ b/pkg/controller/volume/expand/BUILD @@ -1,6 +1,6 @@ package(default_visibility = ["//visibility:public"]) -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", @@ -10,6 +10,7 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/controller/volume/expand", deps = [ + "//pkg/apis/core/v1/helper:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/volume/events:go_default_library", "//pkg/controller/volume/expand/cache:go_default_library", @@ -27,14 +28,17 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library", + "//staging/src/k8s.io/csi-translation-lib:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) @@ -54,3 +58,14 @@ filegroup( ], tags = ["automanaged"], ) + +go_test( + name = "go_default_test", + srcs = ["expand_controller_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/controller:go_default_library", + "//pkg/controller/volume/attachdetach/testing:go_default_library", + "//staging/src/k8s.io/client-go/informers:go_default_library", + ], +) diff --git a/pkg/controller/volume/expand/expand_controller.go b/pkg/controller/volume/expand/expand_controller.go index 2332f322fa..29997d26eb 100644 --- a/pkg/controller/volume/expand/expand_controller.go +++ b/pkg/controller/volume/expand/expand_controller.go @@ -30,14 +30,19 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" coreinformers "k8s.io/client-go/informers/core/v1" + storageclassinformer "k8s.io/client-go/informers/storage/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" + storagelisters "k8s.io/client-go/listers/storage/v1" + "k8s.io/client-go/tools/cache" kcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" cloudprovider "k8s.io/cloud-provider" + csitranslation "k8s.io/csi-translation-lib" + v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/volume/events" "k8s.io/kubernetes/pkg/util/mount" @@ -72,6 +77,10 @@ type expandController struct { pvLister corelisters.PersistentVolumeLister pvSynced kcache.InformerSynced + // storageClass lister for fetching provisioner name + classLister storagelisters.StorageClassLister + classListerSynced cache.InformerSynced + // cloud provider used by volume host cloud cloudprovider.Interface @@ -90,17 +99,20 @@ func NewExpandController( kubeClient clientset.Interface, pvcInformer coreinformers.PersistentVolumeClaimInformer, pvInformer coreinformers.PersistentVolumeInformer, + scInformer storageclassinformer.StorageClassInformer, cloud cloudprovider.Interface, plugins []volume.VolumePlugin) (ExpandController, error) { expc := &expandController{ - kubeClient: kubeClient, - cloud: cloud, - pvcLister: pvcInformer.Lister(), - pvcsSynced: pvcInformer.Informer().HasSynced, - pvLister: pvInformer.Lister(), - pvSynced: pvInformer.Informer().HasSynced, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"), + kubeClient: kubeClient, + cloud: cloud, + pvcLister: pvcInformer.Lister(), + pvcsSynced: pvcInformer.Informer().HasSynced, + pvLister: pvInformer.Lister(), + pvSynced: pvInformer.Informer().HasSynced, + classLister: scInformer.Lister(), + classListerSynced: scInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"), } if err := expc.volumePluginMgr.InitPlugins(plugins, nil, expc); err != nil { @@ -201,14 +213,29 @@ func (expc *expandController) syncHandler(key string) error { klog.V(5).Infof("Error getting Persistent Volume for PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err) return err } + if pv.Spec.ClaimRef == nil || pvc.Namespace != pv.Spec.ClaimRef.Namespace || pvc.UID != pv.Spec.ClaimRef.UID { err := fmt.Errorf("persistent Volume is not bound to PVC being updated : %s", util.ClaimToClaimKey(pvc)) klog.V(4).Infof("%v", err) return err } + claimClass := v1helper.GetPersistentVolumeClaimClass(pvc) + if claimClass == "" { + klog.V(4).Infof("volume expansion is disabled for PVC without StorageClasses: %s", util.ClaimToClaimKey(pvc)) + return nil + } + + class, err := expc.classLister.Get(claimClass) + if err != nil { + klog.V(4).Infof("volume expansion is not supported for PVC: %s, can not find StorageClass %s", util.ClaimToClaimKey(pvc), claimClass) + return nil + } + volumeSpec := volume.NewSpecFromPersistentVolume(pv, false) volumePlugin, err := expc.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec) + volumeResizerName := class.Provisioner + if err != nil || volumePlugin == nil { msg := fmt.Errorf("didn't find a plugin capable of expanding the volume; " + "waiting for an external controller to process this PVC") @@ -221,11 +248,31 @@ func (expc *expandController) syncHandler(key string) error { return err } - return expc.expand(pvc, pv) + if volumePlugin.IsMigratedToCSI() { + msg := fmt.Sprintf("CSI migration enabled for %s; waiting for external resizer to expand the pvc", volumeResizerName) + expc.recorder.Event(pvc, v1.EventTypeNormal, events.ExternalExpanding, msg) + csiResizerName, err := csitranslation.GetCSINameFromInTreeName(class.Provisioner) + if err != nil { + errorMsg := fmt.Sprintf("error getting CSI driver name for pvc %s, with error %v", util.ClaimToClaimKey(pvc), err) + expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg) + return fmt.Errorf(errorMsg) + } + + pvc, err := util.SetClaimResizer(pvc, csiResizerName, expc.kubeClient) + if err != nil { + errorMsg := fmt.Sprintf("error setting resizer annotation to pvc %s, with error %v", util.ClaimToClaimKey(pvc), err) + expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg) + return fmt.Errorf(errorMsg) + + } + return nil + } + + return expc.expand(pvc, pv, volumeResizerName) } -func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error { - pvc, err := util.MarkResizeInProgress(pvc, expc.kubeClient) +func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, resizerName string) error { + pvc, err := util.MarkResizeInProgressWithResizer(pvc, resizerName, expc.kubeClient) if err != nil { klog.V(5).Infof("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) return err diff --git a/pkg/controller/volume/expand/expand_controller_test.go b/pkg/controller/volume/expand/expand_controller_test.go new file mode 100644 index 0000000000..c01d4f69f7 --- /dev/null +++ b/pkg/controller/volume/expand/expand_controller_test.go @@ -0,0 +1,47 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package expand + +import ( + "testing" + + "k8s.io/client-go/informers" + "k8s.io/kubernetes/pkg/controller" + controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing" +) + +func TestSyncHandler(t *testing.T) { + fakeKubeClient := controllervolumetesting.CreateTestClient() + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() + pvInformer := informerFactory.Core().V1().PersistentVolumes() + storageClassInformer := informerFactory.Storage().V1().StorageClasses() + + expc, err := NewExpandController(fakeKubeClient, pvcInformer, pvInformer, storageClassInformer, nil, nil) + if err != nil { + t.Fatalf("error creating expand controller : %v", err) + } + + var expController *expandController + expController, _ = expc.(*expandController) + + err = expController.syncHandler("default/foo") + if err != nil { + t.Fatalf("error running sync handler : %v", err) + } + +} diff --git a/pkg/volume/util/resize_util.go b/pkg/volume/util/resize_util.go index 52398412e5..f3ac367801 100644 --- a/pkg/volume/util/resize_util.go +++ b/pkg/volume/util/resize_util.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/resizefs" "k8s.io/kubernetes/pkg/volume" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) var ( @@ -97,6 +98,43 @@ func MarkResizeInProgress( return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) } +// MarkResizeInProgressWithResizer marks cloudprovider resizing as in progress +// and also annotates the PVC with the name of the resizer. +func MarkResizeInProgressWithResizer( + pvc *v1.PersistentVolumeClaim, + resizerName string, + kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { + // Mark PVC as Resize Started + progressCondition := v1.PersistentVolumeClaimCondition{ + Type: v1.PersistentVolumeClaimResizing, + Status: v1.ConditionTrue, + LastTransitionTime: metav1.Now(), + } + conditions := []v1.PersistentVolumeClaimCondition{progressCondition} + newPVC := pvc.DeepCopy() + newPVC = MergeResizeConditionOnPVC(newPVC, conditions) + newPVC = setResizer(newPVC, resizerName) + return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) +} + +// SetClaimResizer sets resizer annotation on PVC +func SetClaimResizer( + pvc *v1.PersistentVolumeClaim, + resizerName string, + kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { + newPVC := pvc.DeepCopy() + newPVC = setResizer(newPVC, resizerName) + return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) +} + +func setResizer(pvc *v1.PersistentVolumeClaim, resizerName string) *v1.PersistentVolumeClaim { + if val, ok := pvc.Annotations[volumetypes.VolumeResizerKey]; ok && val == resizerName { + return pvc + } + metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, volumetypes.VolumeResizerKey, resizerName) + return pvc +} + // MarkForFSResize marks file system resizing as pending func MarkForFSResize( pvc *v1.PersistentVolumeClaim, diff --git a/pkg/volume/util/types/types.go b/pkg/volume/util/types/types.go index 8afea9bac2..2811cd1dc6 100644 --- a/pkg/volume/util/types/types.go +++ b/pkg/volume/util/types/types.go @@ -50,3 +50,10 @@ func (o *GeneratedOperations) Run() (eventErr, detailedErr error) { defer runtime.RecoverFromPanic(&detailedErr) return o.OperationFunc() } + +const ( + // VolumeResizerKey is key that will be used to store resizer used + // for resizing PVC. The generated key/value pair will be added + // as a annotation to the PVC. + VolumeResizerKey = "volume.kubernetes.io/storage-resizer" +) From 33b95ebddb56202b704c19d362cc1fc5f75c031e Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Fri, 17 May 2019 15:14:17 -0400 Subject: [PATCH 2/4] add some tests for expand controller Update bazel file --- pkg/controller/volume/expand/BUILD | 11 ++ .../volume/expand/expand_controller.go | 4 +- .../volume/expand/expand_controller_test.go | 153 ++++++++++++++++-- pkg/volume/util/operationexecutor/BUILD | 1 + .../util/operationexecutor/fakegenerator.go | 115 +++++++++++++ .../operationexecutor/operation_generator.go | 27 +++- 6 files changed, 292 insertions(+), 19 deletions(-) create mode 100644 pkg/volume/util/operationexecutor/fakegenerator.go diff --git a/pkg/controller/volume/expand/BUILD b/pkg/controller/volume/expand/BUILD index 185fcb75ca..7034d0de45 100644 --- a/pkg/controller/volume/expand/BUILD +++ b/pkg/controller/volume/expand/BUILD @@ -66,6 +66,17 @@ go_test( deps = [ "//pkg/controller:go_default_library", "//pkg/controller/volume/attachdetach/testing:go_default_library", + "//pkg/features:go_default_library", + "//pkg/volume:go_default_library", + "//pkg/volume/awsebs:go_default_library", + "//pkg/volume/util/operationexecutor:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/storage/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", + "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", + "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library", ], ) diff --git a/pkg/controller/volume/expand/expand_controller.go b/pkg/controller/volume/expand/expand_controller.go index 29997d26eb..af278693b8 100644 --- a/pkg/controller/volume/expand/expand_controller.go +++ b/pkg/controller/volume/expand/expand_controller.go @@ -228,7 +228,7 @@ func (expc *expandController) syncHandler(key string) error { class, err := expc.classLister.Get(claimClass) if err != nil { - klog.V(4).Infof("volume expansion is not supported for PVC: %s, can not find StorageClass %s", util.ClaimToClaimKey(pvc), claimClass) + klog.V(4).Infof("volume expansion is not supported for PVC: %s;can not find StorageClass %s", util.ClaimToClaimKey(pvc), claimClass) return nil } @@ -297,7 +297,7 @@ func (expc *expandController) Run(stopCh <-chan struct{}) { klog.Infof("Starting expand controller") defer klog.Infof("Shutting down expand controller") - if !controller.WaitForCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced) { + if !controller.WaitForCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced, expc.classListerSynced) { return } diff --git a/pkg/controller/volume/expand/expand_controller_test.go b/pkg/controller/volume/expand/expand_controller_test.go index c01d4f69f7..517e23ac4c 100644 --- a/pkg/controller/volume/expand/expand_controller_test.go +++ b/pkg/controller/volume/expand/expand_controller_test.go @@ -17,11 +17,23 @@ limitations under the License. package expand import ( + "regexp" "testing" + "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" + featuregatetesting "k8s.io/component-base/featuregate/testing" + csitranslationplugins "k8s.io/csi-translation-lib/plugins" "k8s.io/kubernetes/pkg/controller" controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/awsebs" + "k8s.io/kubernetes/pkg/volume/util/operationexecutor" ) func TestSyncHandler(t *testing.T) { @@ -31,17 +43,140 @@ func TestSyncHandler(t *testing.T) { pvInformer := informerFactory.Core().V1().PersistentVolumes() storageClassInformer := informerFactory.Storage().V1().StorageClasses() - expc, err := NewExpandController(fakeKubeClient, pvcInformer, pvInformer, storageClassInformer, nil, nil) - if err != nil { - t.Fatalf("error creating expand controller : %v", err) + tests := []struct { + name string + csiMigrationEnabled bool + expansionCalled bool + storageClass *storagev1.StorageClass + pvcKey string + pv *v1.PersistentVolume + pvc *v1.PersistentVolumeClaim + hasError bool + }{ + { + name: "when pvc has no PV binding", + pvc: getFakePersistentVolumeClaim("no-pv-pvc", "", "", ""), + pvcKey: "default/no-pv-pvc", + hasError: true, + }, + { + name: "when pvc has no storageclass", + pv: getFakePersistentVolume("vol-1", csitranslationplugins.AWSEBSInTreePluginName, "no-sc-pvc-vol-1"), + pvc: getFakePersistentVolumeClaim("no-sc-pvc", "vol-1", "", "no-sc-pvc-vol-1"), + pvcKey: "default/no-sc-pvc", + }, + { + name: "when pvc storageclass is missing", + pv: getFakePersistentVolume("vol-2", csitranslationplugins.AWSEBSInTreePluginName, "missing-sc-pvc-vol-2"), + pvc: getFakePersistentVolumeClaim("missing-sc-pvc", "vol-2", "resizable", "missing-sc-pvc-vol-2"), + pvcKey: "default/missing-sc-pvc", + }, + { + name: "when pvc and pv has everything for in-tree plugin", + pv: getFakePersistentVolume("vol-3", csitranslationplugins.AWSEBSInTreePluginName, "good-pvc-vol-3"), + pvc: getFakePersistentVolumeClaim("good-pvc", "vol-3", "resizable2", "good-pvc-vol-3"), + storageClass: getFakeStorageClass("resizable2", csitranslationplugins.AWSEBSInTreePluginName), + pvcKey: "default/good-pvc", + expansionCalled: true, + }, + { + name: "when csi migration is enabled for a in-tree plugin", + csiMigrationEnabled: true, + pv: getFakePersistentVolume("vol-4", csitranslationplugins.AWSEBSInTreePluginName, "csi-pvc-vol-4"), + pvc: getFakePersistentVolumeClaim("csi-pvc", "vol-4", "resizable3", "csi-pvc-vol-4"), + storageClass: getFakeStorageClass("resizable3", csitranslationplugins.AWSEBSInTreePluginName), + pvcKey: "default/csi-pvc", + }, } - var expController *expandController - expController, _ = expc.(*expandController) + for _, tc := range tests { + test := tc + if tc.pv != nil { + informerFactory.Core().V1().PersistentVolumes().Informer().GetIndexer().Add(tc.pv) + } - err = expController.syncHandler("default/foo") - if err != nil { - t.Fatalf("error running sync handler : %v", err) + if tc.pvc != nil { + informerFactory.Core().V1().PersistentVolumeClaims().Informer().GetIndexer().Add(tc.pvc) + } + allPlugins := []volume.VolumePlugin{} + allPlugins = append(allPlugins, awsebs.ProbeVolumePlugins()...) + if tc.storageClass != nil { + informerFactory.Storage().V1().StorageClasses().Informer().GetIndexer().Add(tc.storageClass) + } + expc, err := NewExpandController(fakeKubeClient, pvcInformer, pvInformer, storageClassInformer, nil, allPlugins) + if err != nil { + t.Fatalf("error creating expand controller : %v", err) + } + + if test.csiMigrationEnabled { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, true)() + } + + var expController *expandController + expController, _ = expc.(*expandController) + var expansionCalled bool + expController.operationGenerator = operationexecutor.NewFakeOgCounter(func() (error, error) { + expansionCalled = true + return nil, nil + }) + + err = expController.syncHandler(test.pvcKey) + if err != nil && !test.hasError { + t.Fatalf("for: %s; unexpected error while running handler : %v", test.name, err) + } + if expansionCalled != test.expansionCalled { + t.Fatalf("for: %s; expected expansionCalled to be %v but was %v", test.name, test.expansionCalled, expansionCalled) + } + } +} + +func getFakePersistentVolume(volumeName, pluginName string, pvcUID types.UID) *v1.PersistentVolume { + pv := &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{Name: volumeName}, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{}, + ClaimRef: &v1.ObjectReference{ + Namespace: "default", + }, + }, + } + if pvcUID != "" { + pv.Spec.ClaimRef.UID = pvcUID + } + + if matched, _ := regexp.MatchString(`csi`, pluginName); matched { + pv.Spec.PersistentVolumeSource.CSI = &v1.CSIPersistentVolumeSource{ + Driver: pluginName, + VolumeHandle: volumeName, + } + } else { + pv.Spec.PersistentVolumeSource.AWSElasticBlockStore = &v1.AWSElasticBlockStoreVolumeSource{ + VolumeID: volumeName, + FSType: "ext4", + } + } + return pv +} + +func getFakePersistentVolumeClaim(pvcName, volumeName, scName string, uid types.UID) *v1.PersistentVolumeClaim { + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: "default", UID: uid}, + Spec: v1.PersistentVolumeClaimSpec{}, + } + if volumeName != "" { + pvc.Spec.VolumeName = volumeName + } + + if scName != "" { + pvc.Spec.StorageClassName = &scName + } + return pvc +} + +func getFakeStorageClass(scName, pluginName string) *storagev1.StorageClass { + return &storagev1.StorageClass{ + ObjectMeta: metav1.ObjectMeta{Name: scName}, + Provisioner: pluginName, } - } diff --git a/pkg/volume/util/operationexecutor/BUILD b/pkg/volume/util/operationexecutor/BUILD index 8ae0ce393b..8c12938b53 100644 --- a/pkg/volume/util/operationexecutor/BUILD +++ b/pkg/volume/util/operationexecutor/BUILD @@ -9,6 +9,7 @@ load( go_library( name = "go_default_library", srcs = [ + "fakegenerator.go", "operation_executor.go", "operation_generator.go", ], diff --git a/pkg/volume/util/operationexecutor/fakegenerator.go b/pkg/volume/util/operationexecutor/fakegenerator.go new file mode 100644 index 0000000000..03cf02e2ae --- /dev/null +++ b/pkg/volume/util/operationexecutor/fakegenerator.go @@ -0,0 +1,115 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operationexecutor + +import ( + "time" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/volume" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" +) + +// fakeOgCounter is a simple OperationGenerator which counts number of times a function +// has been caled +type fakeOgCounter struct { + // calledFuncs stores name and count of functions + calledFuncs map[string]int + opFunc func() (error, error) +} + +var _ OperationGenerator = &fakeOgCounter{} + +// NewFakeOgCounter returns a OperationGenerator +func NewFakeOgCounter(opFunc func() (error, error)) OperationGenerator { + return &fakeOgCounter{ + calledFuncs: map[string]int{}, + opFunc: opFunc, + } +} + +func (f *fakeOgCounter) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations { + return f.recordFuncCall("GenerateMountVolumeFunc") +} + +func (f *fakeOgCounter) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateUnmountVolumeFunc"), nil +} + +func (f *fakeOgCounter) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations { + return f.recordFuncCall("GenerateAttachVolumeFunc") +} + +func (f *fakeOgCounter) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateDetachVolumeFunc"), nil +} + +func (f *fakeOgCounter) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateVolumesAreAttachedFunc"), nil +} + +func (f *fakeOgCounter) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateUnmountDeviceFunc"), nil +} + +func (f *fakeOgCounter) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateVerifyControllerAttachedVolumeFunc"), nil +} + +func (f *fakeOgCounter) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateMapVolumeFunc"), nil +} + +func (f *fakeOgCounter) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateUnmapVolumeFunc"), nil +} + +func (f *fakeOgCounter) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateUnmapDeviceFunc"), nil +} + +func (f *fakeOgCounter) GetVolumePluginMgr() *volume.VolumePluginMgr { + return nil +} + +func (f *fakeOgCounter) GenerateBulkVolumeVerifyFunc( + map[types.NodeName][]*volume.Spec, + string, + map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateBulkVolumeVerifyFunc"), nil +} + +func (f *fakeOgCounter) GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateExpandVolumeFunc"), nil +} + +func (f *fakeOgCounter) GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateExpandVolumeFSWithoutUnmountingFunc"), nil +} + +func (f *fakeOgCounter) recordFuncCall(name string) volumetypes.GeneratedOperations { + if _, ok := f.calledFuncs[name]; ok { + f.calledFuncs[name]++ + } + ops := volumetypes.GeneratedOperations{ + OperationName: name, + OperationFunc: f.opFunc, + } + return ops +} diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 7300d84399..9ae816bffd 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -683,7 +683,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( // resizeFileSystem will resize the file system if user has requested a resize of // underlying persistent volume and is allowed to do so. - resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions, volumePluginName) + resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions) if resizeError != nil { klog.Errorf("MountVolume.resizeFileSystem failed with %v", resizeError) @@ -721,7 +721,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( // - Volume does not support DeviceMounter interface. // - In case of CSI the volume does not have node stage_unstage capability. if !resizeDone { - resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions, volumePluginName) + resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions) if resizeError != nil { klog.Errorf("MountVolume.resizeFileSystem failed with %v", resizeError) return volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError) @@ -760,7 +760,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( } } -func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, rsOpts volume.NodeResizeOptions, pluginName string) (bool, error) { +func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, rsOpts volume.NodeResizeOptions) (bool, error) { if !utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) { klog.V(4).Infof("Resizing is not enabled for this volume %s", volumeToMount.VolumeName) return true, nil @@ -1577,6 +1577,18 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + // Need to translate the spec here if the plugin is migrated so that the metrics + // emitted show the correct (migrated) plugin + if useCSIPlugin(og.volumePluginMgr, volumeToMount.VolumeSpec) { + csiSpec, err := translateSpec(volumeToMount.VolumeSpec) + if err == nil { + volumeToMount.VolumeSpec = csiSpec + } + // If we have an error here we ignore it, the metric emitted will then be for the + // in-tree plugin. This error case(skipped one) will also trigger an error + // while the generated function is executed. And those errors will be handled during the execution of the generated + // function with a back off policy. + } volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) if err != nil || volumePlugin == nil { @@ -1603,7 +1615,7 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( return volumeToMount.GenerateError("VolumeFSResize.GetDeviceMountPath failed", err) } resizeOptions.DeviceMountPath = dmp - resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions, volumePlugin.GetPluginName()) + resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions) if simpleErr != nil || detailedErr != nil { return simpleErr, detailedErr } @@ -1623,7 +1635,7 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( resizeOptions.DeviceMountPath = volumeMounter.GetPath() resizeOptions.CSIVolumePhase = volume.CSIVolumePublished - resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions, volumePlugin.GetPluginName()) + resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions) if simpleErr != nil || detailedErr != nil { return simpleErr, detailedErr } @@ -1651,9 +1663,8 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, - resizeOptions volume.NodeResizeOptions, - pluginName string) (bool, error, error) { - resizeDone, err := og.resizeFileSystem(volumeToMount, resizeOptions, pluginName) + resizeOptions volume.NodeResizeOptions) (bool, error, error) { + resizeDone, err := og.resizeFileSystem(volumeToMount, resizeOptions) if err != nil { klog.Errorf("VolumeFSResize.resizeFileSystem failed : %v", err) e1, e2 := volumeToMount.GenerateError("VolumeFSResize.resizeFileSystem failed", err) From d119899e2294c66ebb917e1f1306c635640064e0 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Tue, 28 May 2019 15:21:23 -0400 Subject: [PATCH 3/4] handle review comments --- pkg/controller/volume/expand/BUILD | 4 + .../volume/expand/expand_controller.go | 9 +- .../volume/expand/expand_controller_test.go | 86 +++++++++++++++---- .../util/operationexecutor/fakegenerator.go | 44 +++++----- pkg/volume/util/resize_util.go | 16 ---- 5 files changed, 103 insertions(+), 56 deletions(-) diff --git a/pkg/controller/volume/expand/BUILD b/pkg/controller/volume/expand/BUILD index 7034d0de45..38c33353a3 100644 --- a/pkg/controller/volume/expand/BUILD +++ b/pkg/controller/volume/expand/BUILD @@ -70,12 +70,16 @@ go_test( "//pkg/volume:go_default_library", "//pkg/volume/awsebs:go_default_library", "//pkg/volume/util/operationexecutor:go_default_library", + "//pkg/volume/util/types:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", + "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library", ], diff --git a/pkg/controller/volume/expand/expand_controller.go b/pkg/controller/volume/expand/expand_controller.go index af278693b8..4c305ff164 100644 --- a/pkg/controller/volume/expand/expand_controller.go +++ b/pkg/controller/volume/expand/expand_controller.go @@ -194,6 +194,8 @@ func (expc *expandController) processNextWorkItem() bool { return true } +// syncHandler performs actual expansion of volume. If an error is returned +// from this function - PVC will be requeued for resizing. func (expc *expandController) syncHandler(key string) error { namespace, name, err := kcache.SplitMetaNamespaceKey(key) if err != nil { @@ -228,7 +230,7 @@ func (expc *expandController) syncHandler(key string) error { class, err := expc.classLister.Get(claimClass) if err != nil { - klog.V(4).Infof("volume expansion is not supported for PVC: %s;can not find StorageClass %s", util.ClaimToClaimKey(pvc), claimClass) + klog.V(4).Infof("failed to expand PVC: %s with error: %v", util.ClaimToClaimKey(pvc), err) return nil } @@ -245,7 +247,9 @@ func (expc *expandController) syncHandler(key string) error { } expc.recorder.Event(pvc, eventType, events.ExternalExpanding, fmt.Sprintf("Ignoring the PVC: %v.", msg)) klog.Infof("Ignoring the PVC %q (uid: %q) : %v.", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, msg) - return err + // If we are expecting that an external plugin will handle resizing this volume then + // is no point in requeuing this PVC. + return nil } if volumePlugin.IsMigratedToCSI() { @@ -263,7 +267,6 @@ func (expc *expandController) syncHandler(key string) error { errorMsg := fmt.Sprintf("error setting resizer annotation to pvc %s, with error %v", util.ClaimToClaimKey(pvc), err) expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg) return fmt.Errorf(errorMsg) - } return nil } diff --git a/pkg/controller/volume/expand/expand_controller_test.go b/pkg/controller/volume/expand/expand_controller_test.go index 517e23ac4c..15cd524f30 100644 --- a/pkg/controller/volume/expand/expand_controller_test.go +++ b/pkg/controller/volume/expand/expand_controller_test.go @@ -17,15 +17,21 @@ limitations under the License. package expand import ( + "encoding/json" + "fmt" + "reflect" "regexp" "testing" "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" + coretesting "k8s.io/client-go/testing" featuregatetesting "k8s.io/component-base/featuregate/testing" csitranslationplugins "k8s.io/csi-translation-lib/plugins" "k8s.io/kubernetes/pkg/controller" @@ -34,24 +40,20 @@ import ( "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/awsebs" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) func TestSyncHandler(t *testing.T) { - fakeKubeClient := controllervolumetesting.CreateTestClient() - informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) - pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() - pvInformer := informerFactory.Core().V1().PersistentVolumes() - storageClassInformer := informerFactory.Storage().V1().StorageClasses() - tests := []struct { name string csiMigrationEnabled bool - expansionCalled bool storageClass *storagev1.StorageClass pvcKey string pv *v1.PersistentVolume pvc *v1.PersistentVolumeClaim + expansionCalled bool hasError bool + expectedAnnotation map[string]string }{ { name: "when pvc has no PV binding", @@ -72,12 +74,13 @@ func TestSyncHandler(t *testing.T) { pvcKey: "default/missing-sc-pvc", }, { - name: "when pvc and pv has everything for in-tree plugin", - pv: getFakePersistentVolume("vol-3", csitranslationplugins.AWSEBSInTreePluginName, "good-pvc-vol-3"), - pvc: getFakePersistentVolumeClaim("good-pvc", "vol-3", "resizable2", "good-pvc-vol-3"), - storageClass: getFakeStorageClass("resizable2", csitranslationplugins.AWSEBSInTreePluginName), - pvcKey: "default/good-pvc", - expansionCalled: true, + name: "when pvc and pv has everything for in-tree plugin", + pv: getFakePersistentVolume("vol-3", csitranslationplugins.AWSEBSInTreePluginName, "good-pvc-vol-3"), + pvc: getFakePersistentVolumeClaim("good-pvc", "vol-3", "resizable2", "good-pvc-vol-3"), + storageClass: getFakeStorageClass("resizable2", csitranslationplugins.AWSEBSInTreePluginName), + pvcKey: "default/good-pvc", + expansionCalled: true, + expectedAnnotation: map[string]string{volumetypes.VolumeResizerKey: csitranslationplugins.AWSEBSInTreePluginName}, }, { name: "when csi migration is enabled for a in-tree plugin", @@ -86,17 +89,34 @@ func TestSyncHandler(t *testing.T) { pvc: getFakePersistentVolumeClaim("csi-pvc", "vol-4", "resizable3", "csi-pvc-vol-4"), storageClass: getFakeStorageClass("resizable3", csitranslationplugins.AWSEBSInTreePluginName), pvcKey: "default/csi-pvc", + expectedAnnotation: map[string]string{volumetypes.VolumeResizerKey: csitranslationplugins.AWSEBSDriverName}, + }, + { + name: "for csi plugin without migration path", + pv: getFakePersistentVolume("vol-5", "com.csi.ceph", "ceph-csi-pvc-vol-5"), + pvc: getFakePersistentVolumeClaim("ceph-csi-pvc", "vol-5", "resizable4", "ceph-csi-pvc-vol-5"), + storageClass: getFakeStorageClass("resizable4", "com.csi.ceph"), + pvcKey: "default/ceph-csi-pvc", + expansionCalled: false, + hasError: false, }, } for _, tc := range tests { test := tc + fakeKubeClient := controllervolumetesting.CreateTestClient() + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() + pvInformer := informerFactory.Core().V1().PersistentVolumes() + storageClassInformer := informerFactory.Storage().V1().StorageClasses() + + pvc := test.pvc if tc.pv != nil { informerFactory.Core().V1().PersistentVolumes().Informer().GetIndexer().Add(tc.pv) } if tc.pvc != nil { - informerFactory.Core().V1().PersistentVolumeClaims().Informer().GetIndexer().Add(tc.pvc) + informerFactory.Core().V1().PersistentVolumeClaims().Informer().GetIndexer().Add(pvc) } allPlugins := []volume.VolumePlugin{} allPlugins = append(allPlugins, awsebs.ProbeVolumePlugins()...) @@ -116,21 +136,57 @@ func TestSyncHandler(t *testing.T) { var expController *expandController expController, _ = expc.(*expandController) var expansionCalled bool - expController.operationGenerator = operationexecutor.NewFakeOgCounter(func() (error, error) { + expController.operationGenerator = operationexecutor.NewFakeOGCounter(func() (error, error) { expansionCalled = true return nil, nil }) + fakeKubeClient.AddReactor("patch", "persistentvolumeclaims", func(action coretesting.Action) (bool, runtime.Object, error) { + if action.GetSubresource() == "status" { + patchActionaction, _ := action.(coretesting.PatchAction) + pvc, err = applyPVCPatch(pvc, patchActionaction.GetPatch()) + if err != nil { + return false, nil, err + } + return true, pvc, nil + } + return true, pvc, nil + }) + err = expController.syncHandler(test.pvcKey) if err != nil && !test.hasError { t.Fatalf("for: %s; unexpected error while running handler : %v", test.name, err) } + + if err == nil && test.hasError { + t.Fatalf("for: %s; unexpected success", test.name) + } if expansionCalled != test.expansionCalled { t.Fatalf("for: %s; expected expansionCalled to be %v but was %v", test.name, test.expansionCalled, expansionCalled) } + + if len(test.expectedAnnotation) != 0 && !reflect.DeepEqual(test.expectedAnnotation, pvc.Annotations) { + t.Fatalf("for: %s; expected %v annotations, got %v", test.name, test.expectedAnnotation, pvc.Annotations) + } } } +func applyPVCPatch(originalPVC *v1.PersistentVolumeClaim, patch []byte) (*v1.PersistentVolumeClaim, error) { + pvcData, err := json.Marshal(originalPVC) + if err != nil { + return nil, fmt.Errorf("failed to marshal pvc with %v", err) + } + updated, err := strategicpatch.StrategicMergePatch(pvcData, patch, v1.PersistentVolumeClaim{}) + if err != nil { + return nil, fmt.Errorf("failed to apply patch on pvc %v", err) + } + updatedPVC := &v1.PersistentVolumeClaim{} + if err := json.Unmarshal(updated, updatedPVC); err != nil { + return nil, fmt.Errorf("failed to unmarshal updated pvc : %v", err) + } + return updatedPVC, nil +} + func getFakePersistentVolume(volumeName, pluginName string, pvcUID types.UID) *v1.PersistentVolume { pv := &v1.PersistentVolume{ ObjectMeta: metav1.ObjectMeta{Name: volumeName}, diff --git a/pkg/volume/util/operationexecutor/fakegenerator.go b/pkg/volume/util/operationexecutor/fakegenerator.go index 03cf02e2ae..0dc2c9e414 100644 --- a/pkg/volume/util/operationexecutor/fakegenerator.go +++ b/pkg/volume/util/operationexecutor/fakegenerator.go @@ -1,5 +1,5 @@ /* -Copyright 2016 The Kubernetes Authors. +Copyright 2019 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -26,84 +26,84 @@ import ( volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) -// fakeOgCounter is a simple OperationGenerator which counts number of times a function +// fakeOGCounter is a simple OperationGenerator which counts number of times a function // has been caled -type fakeOgCounter struct { +type fakeOGCounter struct { // calledFuncs stores name and count of functions calledFuncs map[string]int opFunc func() (error, error) } -var _ OperationGenerator = &fakeOgCounter{} +var _ OperationGenerator = &fakeOGCounter{} -// NewFakeOgCounter returns a OperationGenerator -func NewFakeOgCounter(opFunc func() (error, error)) OperationGenerator { - return &fakeOgCounter{ +// NewFakeOGCounter returns a OperationGenerator +func NewFakeOGCounter(opFunc func() (error, error)) OperationGenerator { + return &fakeOGCounter{ calledFuncs: map[string]int{}, opFunc: opFunc, } } -func (f *fakeOgCounter) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations { +func (f *fakeOGCounter) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations { return f.recordFuncCall("GenerateMountVolumeFunc") } -func (f *fakeOgCounter) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateUnmountVolumeFunc"), nil } -func (f *fakeOgCounter) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations { +func (f *fakeOGCounter) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations { return f.recordFuncCall("GenerateAttachVolumeFunc") } -func (f *fakeOgCounter) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateDetachVolumeFunc"), nil } -func (f *fakeOgCounter) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateVolumesAreAttachedFunc"), nil } -func (f *fakeOgCounter) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateUnmountDeviceFunc"), nil } -func (f *fakeOgCounter) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateVerifyControllerAttachedVolumeFunc"), nil } -func (f *fakeOgCounter) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateMapVolumeFunc"), nil } -func (f *fakeOgCounter) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateUnmapVolumeFunc"), nil } -func (f *fakeOgCounter) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateUnmapDeviceFunc"), nil } -func (f *fakeOgCounter) GetVolumePluginMgr() *volume.VolumePluginMgr { +func (f *fakeOGCounter) GetVolumePluginMgr() *volume.VolumePluginMgr { return nil } -func (f *fakeOgCounter) GenerateBulkVolumeVerifyFunc( +func (f *fakeOGCounter) GenerateBulkVolumeVerifyFunc( map[types.NodeName][]*volume.Spec, string, map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateBulkVolumeVerifyFunc"), nil } -func (f *fakeOgCounter) GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateExpandVolumeFunc"), nil } -func (f *fakeOgCounter) GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateExpandVolumeFSWithoutUnmountingFunc"), nil } -func (f *fakeOgCounter) recordFuncCall(name string) volumetypes.GeneratedOperations { +func (f *fakeOGCounter) recordFuncCall(name string) volumetypes.GeneratedOperations { if _, ok := f.calledFuncs[name]; ok { f.calledFuncs[name]++ } diff --git a/pkg/volume/util/resize_util.go b/pkg/volume/util/resize_util.go index f3ac367801..b35daba380 100644 --- a/pkg/volume/util/resize_util.go +++ b/pkg/volume/util/resize_util.go @@ -82,22 +82,6 @@ func UpdatePVSize( return nil } -// MarkResizeInProgress marks cloudprovider resizing as in progress -func MarkResizeInProgress( - pvc *v1.PersistentVolumeClaim, - kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { - // Mark PVC as Resize Started - progressCondition := v1.PersistentVolumeClaimCondition{ - Type: v1.PersistentVolumeClaimResizing, - Status: v1.ConditionTrue, - LastTransitionTime: metav1.Now(), - } - conditions := []v1.PersistentVolumeClaimCondition{progressCondition} - newPVC := pvc.DeepCopy() - newPVC = MergeResizeConditionOnPVC(newPVC, conditions) - return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) -} - // MarkResizeInProgressWithResizer marks cloudprovider resizing as in progress // and also annotates the PVC with the name of the resizer. func MarkResizeInProgressWithResizer( From 7563b4d01b5e5dde148f746ccb1ff47c5b5b9272 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Wed, 29 May 2019 11:01:45 -0400 Subject: [PATCH 4/4] Address comments about retry in online resizing block --- .../volume/expand/expand_controller_test.go | 3 ++ .../operationexecutor/operation_generator.go | 52 ++++++++++++------- 2 files changed, 37 insertions(+), 18 deletions(-) diff --git a/pkg/controller/volume/expand/expand_controller_test.go b/pkg/controller/volume/expand/expand_controller_test.go index 15cd524f30..5226e94e89 100644 --- a/pkg/controller/volume/expand/expand_controller_test.go +++ b/pkg/controller/volume/expand/expand_controller_test.go @@ -131,6 +131,9 @@ func TestSyncHandler(t *testing.T) { if test.csiMigrationEnabled { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, true)() + } else { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, false)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, false)() } var expController *expandController diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 9ae816bffd..5704be5425 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -1577,25 +1577,23 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { - // Need to translate the spec here if the plugin is migrated so that the metrics - // emitted show the correct (migrated) plugin - if useCSIPlugin(og.volumePluginMgr, volumeToMount.VolumeSpec) { - csiSpec, err := translateSpec(volumeToMount.VolumeSpec) - if err == nil { - volumeToMount.VolumeSpec = csiSpec - } - // If we have an error here we ignore it, the metric emitted will then be for the - // in-tree plugin. This error case(skipped one) will also trigger an error - // while the generated function is executed. And those errors will be handled during the execution of the generated - // function with a back off policy. - } - volumePlugin, err := - og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) - if err != nil || volumePlugin == nil { - return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.FindPluginBySpec failed", err) - } fsResizeFunc := func() (error, error) { + // Need to translate the spec here if the plugin is migrated so that the metrics + // emitted show the correct (migrated) plugin + if useCSIPlugin(og.volumePluginMgr, volumeToMount.VolumeSpec) { + csiSpec, err := translateSpec(volumeToMount.VolumeSpec) + if err != nil { + return volumeToMount.GenerateError("VolumeFSResize.translateSpec failed", err) + } + volumeToMount.VolumeSpec = csiSpec + } + volumePlugin, err := + og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) + if err != nil || volumePlugin == nil { + return volumeToMount.GenerateError("VolumeFSResize.FindPluginBySpec failed", err) + } + var resizeDone bool var simpleErr, detailedErr error resizeOptions := volume.NodeResizeOptions{ @@ -1643,7 +1641,7 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( return nil, nil } // This is a placeholder error - we should NEVER reach here. - err := fmt.Errorf("volume resizing failed for unknown reason") + err = fmt.Errorf("volume resizing failed for unknown reason") return volumeToMount.GenerateError("VolumeFSResize.resizeFileSystem failed to resize volume", err) } @@ -1653,6 +1651,24 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( } } + // Need to translate the spec here if the plugin is migrated so that the metrics + // emitted show the correct (migrated) plugin + if useCSIPlugin(og.volumePluginMgr, volumeToMount.VolumeSpec) { + csiSpec, err := translateSpec(volumeToMount.VolumeSpec) + if err == nil { + volumeToMount.VolumeSpec = csiSpec + } + // If we have an error here we ignore it, the metric emitted will then be for the + // in-tree plugin. This error case(skipped one) will also trigger an error + // while the generated function is executed. And those errors will be handled during the execution of the generated + // function with a back off policy. + } + volumePlugin, err := + og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) + if err != nil || volumePlugin == nil { + return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.FindPluginBySpec failed", err) + } + return volumetypes.GeneratedOperations{ OperationName: "volume_fs_resize", OperationFunc: fsResizeFunc,