diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 85cc74a826..81200ce4d9 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -244,6 +244,7 @@ func startVolumeExpandController(ctx ControllerContext) (http.Handler, bool, err ctx.ClientBuilder.ClientOrDie("expand-controller"), ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), ctx.InformerFactory.Core().V1().PersistentVolumes(), + ctx.InformerFactory.Storage().V1().StorageClasses(), ctx.Cloud, ProbeExpandableVolumePlugins(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)) diff --git a/pkg/controller/volume/expand/BUILD b/pkg/controller/volume/expand/BUILD index 5f3a8b7b5d..38c33353a3 100644 --- a/pkg/controller/volume/expand/BUILD +++ b/pkg/controller/volume/expand/BUILD @@ -1,6 +1,6 @@ package(default_visibility = ["//visibility:public"]) -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", @@ -10,6 +10,7 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/controller/volume/expand", deps = [ + "//pkg/apis/core/v1/helper:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/volume/events:go_default_library", "//pkg/controller/volume/expand/cache:go_default_library", @@ -27,14 +28,17 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library", + "//staging/src/k8s.io/csi-translation-lib:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) @@ -54,3 +58,29 @@ filegroup( ], tags = ["automanaged"], ) + +go_test( + name = "go_default_test", + srcs = ["expand_controller_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/controller:go_default_library", + "//pkg/controller/volume/attachdetach/testing:go_default_library", + "//pkg/features:go_default_library", + "//pkg/volume:go_default_library", + "//pkg/volume/awsebs:go_default_library", + "//pkg/volume/util/operationexecutor:go_default_library", + "//pkg/volume/util/types:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/storage/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/client-go/informers:go_default_library", + "//staging/src/k8s.io/client-go/testing:go_default_library", + "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", + "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library", + ], +) diff --git a/pkg/controller/volume/expand/expand_controller.go b/pkg/controller/volume/expand/expand_controller.go index 2332f322fa..4c305ff164 100644 --- a/pkg/controller/volume/expand/expand_controller.go +++ b/pkg/controller/volume/expand/expand_controller.go @@ -30,14 +30,19 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" coreinformers "k8s.io/client-go/informers/core/v1" + storageclassinformer "k8s.io/client-go/informers/storage/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" + storagelisters "k8s.io/client-go/listers/storage/v1" + "k8s.io/client-go/tools/cache" kcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" cloudprovider "k8s.io/cloud-provider" + csitranslation "k8s.io/csi-translation-lib" + v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/volume/events" "k8s.io/kubernetes/pkg/util/mount" @@ -72,6 +77,10 @@ type expandController struct { pvLister corelisters.PersistentVolumeLister pvSynced kcache.InformerSynced + // storageClass lister for fetching provisioner name + classLister storagelisters.StorageClassLister + classListerSynced cache.InformerSynced + // cloud provider used by volume host cloud cloudprovider.Interface @@ -90,17 +99,20 @@ func NewExpandController( kubeClient clientset.Interface, pvcInformer coreinformers.PersistentVolumeClaimInformer, pvInformer coreinformers.PersistentVolumeInformer, + scInformer storageclassinformer.StorageClassInformer, cloud cloudprovider.Interface, plugins []volume.VolumePlugin) (ExpandController, error) { expc := &expandController{ - kubeClient: kubeClient, - cloud: cloud, - pvcLister: pvcInformer.Lister(), - pvcsSynced: pvcInformer.Informer().HasSynced, - pvLister: pvInformer.Lister(), - pvSynced: pvInformer.Informer().HasSynced, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"), + kubeClient: kubeClient, + cloud: cloud, + pvcLister: pvcInformer.Lister(), + pvcsSynced: pvcInformer.Informer().HasSynced, + pvLister: pvInformer.Lister(), + pvSynced: pvInformer.Informer().HasSynced, + classLister: scInformer.Lister(), + classListerSynced: scInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"), } if err := expc.volumePluginMgr.InitPlugins(plugins, nil, expc); err != nil { @@ -182,6 +194,8 @@ func (expc *expandController) processNextWorkItem() bool { return true } +// syncHandler performs actual expansion of volume. If an error is returned +// from this function - PVC will be requeued for resizing. func (expc *expandController) syncHandler(key string) error { namespace, name, err := kcache.SplitMetaNamespaceKey(key) if err != nil { @@ -201,14 +215,29 @@ func (expc *expandController) syncHandler(key string) error { klog.V(5).Infof("Error getting Persistent Volume for PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err) return err } + if pv.Spec.ClaimRef == nil || pvc.Namespace != pv.Spec.ClaimRef.Namespace || pvc.UID != pv.Spec.ClaimRef.UID { err := fmt.Errorf("persistent Volume is not bound to PVC being updated : %s", util.ClaimToClaimKey(pvc)) klog.V(4).Infof("%v", err) return err } + claimClass := v1helper.GetPersistentVolumeClaimClass(pvc) + if claimClass == "" { + klog.V(4).Infof("volume expansion is disabled for PVC without StorageClasses: %s", util.ClaimToClaimKey(pvc)) + return nil + } + + class, err := expc.classLister.Get(claimClass) + if err != nil { + klog.V(4).Infof("failed to expand PVC: %s with error: %v", util.ClaimToClaimKey(pvc), err) + return nil + } + volumeSpec := volume.NewSpecFromPersistentVolume(pv, false) volumePlugin, err := expc.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec) + volumeResizerName := class.Provisioner + if err != nil || volumePlugin == nil { msg := fmt.Errorf("didn't find a plugin capable of expanding the volume; " + "waiting for an external controller to process this PVC") @@ -218,14 +247,35 @@ func (expc *expandController) syncHandler(key string) error { } expc.recorder.Event(pvc, eventType, events.ExternalExpanding, fmt.Sprintf("Ignoring the PVC: %v.", msg)) klog.Infof("Ignoring the PVC %q (uid: %q) : %v.", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, msg) - return err + // If we are expecting that an external plugin will handle resizing this volume then + // is no point in requeuing this PVC. + return nil } - return expc.expand(pvc, pv) + if volumePlugin.IsMigratedToCSI() { + msg := fmt.Sprintf("CSI migration enabled for %s; waiting for external resizer to expand the pvc", volumeResizerName) + expc.recorder.Event(pvc, v1.EventTypeNormal, events.ExternalExpanding, msg) + csiResizerName, err := csitranslation.GetCSINameFromInTreeName(class.Provisioner) + if err != nil { + errorMsg := fmt.Sprintf("error getting CSI driver name for pvc %s, with error %v", util.ClaimToClaimKey(pvc), err) + expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg) + return fmt.Errorf(errorMsg) + } + + pvc, err := util.SetClaimResizer(pvc, csiResizerName, expc.kubeClient) + if err != nil { + errorMsg := fmt.Sprintf("error setting resizer annotation to pvc %s, with error %v", util.ClaimToClaimKey(pvc), err) + expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg) + return fmt.Errorf(errorMsg) + } + return nil + } + + return expc.expand(pvc, pv, volumeResizerName) } -func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error { - pvc, err := util.MarkResizeInProgress(pvc, expc.kubeClient) +func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, resizerName string) error { + pvc, err := util.MarkResizeInProgressWithResizer(pvc, resizerName, expc.kubeClient) if err != nil { klog.V(5).Infof("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) return err @@ -250,7 +300,7 @@ func (expc *expandController) Run(stopCh <-chan struct{}) { klog.Infof("Starting expand controller") defer klog.Infof("Shutting down expand controller") - if !controller.WaitForCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced) { + if !controller.WaitForCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced, expc.classListerSynced) { return } diff --git a/pkg/controller/volume/expand/expand_controller_test.go b/pkg/controller/volume/expand/expand_controller_test.go new file mode 100644 index 0000000000..5226e94e89 --- /dev/null +++ b/pkg/controller/volume/expand/expand_controller_test.go @@ -0,0 +1,241 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package expand + +import ( + "encoding/json" + "fmt" + "reflect" + "regexp" + "testing" + + "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/informers" + coretesting "k8s.io/client-go/testing" + featuregatetesting "k8s.io/component-base/featuregate/testing" + csitranslationplugins "k8s.io/csi-translation-lib/plugins" + "k8s.io/kubernetes/pkg/controller" + controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/awsebs" + "k8s.io/kubernetes/pkg/volume/util/operationexecutor" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" +) + +func TestSyncHandler(t *testing.T) { + tests := []struct { + name string + csiMigrationEnabled bool + storageClass *storagev1.StorageClass + pvcKey string + pv *v1.PersistentVolume + pvc *v1.PersistentVolumeClaim + expansionCalled bool + hasError bool + expectedAnnotation map[string]string + }{ + { + name: "when pvc has no PV binding", + pvc: getFakePersistentVolumeClaim("no-pv-pvc", "", "", ""), + pvcKey: "default/no-pv-pvc", + hasError: true, + }, + { + name: "when pvc has no storageclass", + pv: getFakePersistentVolume("vol-1", csitranslationplugins.AWSEBSInTreePluginName, "no-sc-pvc-vol-1"), + pvc: getFakePersistentVolumeClaim("no-sc-pvc", "vol-1", "", "no-sc-pvc-vol-1"), + pvcKey: "default/no-sc-pvc", + }, + { + name: "when pvc storageclass is missing", + pv: getFakePersistentVolume("vol-2", csitranslationplugins.AWSEBSInTreePluginName, "missing-sc-pvc-vol-2"), + pvc: getFakePersistentVolumeClaim("missing-sc-pvc", "vol-2", "resizable", "missing-sc-pvc-vol-2"), + pvcKey: "default/missing-sc-pvc", + }, + { + name: "when pvc and pv has everything for in-tree plugin", + pv: getFakePersistentVolume("vol-3", csitranslationplugins.AWSEBSInTreePluginName, "good-pvc-vol-3"), + pvc: getFakePersistentVolumeClaim("good-pvc", "vol-3", "resizable2", "good-pvc-vol-3"), + storageClass: getFakeStorageClass("resizable2", csitranslationplugins.AWSEBSInTreePluginName), + pvcKey: "default/good-pvc", + expansionCalled: true, + expectedAnnotation: map[string]string{volumetypes.VolumeResizerKey: csitranslationplugins.AWSEBSInTreePluginName}, + }, + { + name: "when csi migration is enabled for a in-tree plugin", + csiMigrationEnabled: true, + pv: getFakePersistentVolume("vol-4", csitranslationplugins.AWSEBSInTreePluginName, "csi-pvc-vol-4"), + pvc: getFakePersistentVolumeClaim("csi-pvc", "vol-4", "resizable3", "csi-pvc-vol-4"), + storageClass: getFakeStorageClass("resizable3", csitranslationplugins.AWSEBSInTreePluginName), + pvcKey: "default/csi-pvc", + expectedAnnotation: map[string]string{volumetypes.VolumeResizerKey: csitranslationplugins.AWSEBSDriverName}, + }, + { + name: "for csi plugin without migration path", + pv: getFakePersistentVolume("vol-5", "com.csi.ceph", "ceph-csi-pvc-vol-5"), + pvc: getFakePersistentVolumeClaim("ceph-csi-pvc", "vol-5", "resizable4", "ceph-csi-pvc-vol-5"), + storageClass: getFakeStorageClass("resizable4", "com.csi.ceph"), + pvcKey: "default/ceph-csi-pvc", + expansionCalled: false, + hasError: false, + }, + } + + for _, tc := range tests { + test := tc + fakeKubeClient := controllervolumetesting.CreateTestClient() + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() + pvInformer := informerFactory.Core().V1().PersistentVolumes() + storageClassInformer := informerFactory.Storage().V1().StorageClasses() + + pvc := test.pvc + if tc.pv != nil { + informerFactory.Core().V1().PersistentVolumes().Informer().GetIndexer().Add(tc.pv) + } + + if tc.pvc != nil { + informerFactory.Core().V1().PersistentVolumeClaims().Informer().GetIndexer().Add(pvc) + } + allPlugins := []volume.VolumePlugin{} + allPlugins = append(allPlugins, awsebs.ProbeVolumePlugins()...) + if tc.storageClass != nil { + informerFactory.Storage().V1().StorageClasses().Informer().GetIndexer().Add(tc.storageClass) + } + expc, err := NewExpandController(fakeKubeClient, pvcInformer, pvInformer, storageClassInformer, nil, allPlugins) + if err != nil { + t.Fatalf("error creating expand controller : %v", err) + } + + if test.csiMigrationEnabled { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, true)() + } else { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, false)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, false)() + } + + var expController *expandController + expController, _ = expc.(*expandController) + var expansionCalled bool + expController.operationGenerator = operationexecutor.NewFakeOGCounter(func() (error, error) { + expansionCalled = true + return nil, nil + }) + + fakeKubeClient.AddReactor("patch", "persistentvolumeclaims", func(action coretesting.Action) (bool, runtime.Object, error) { + if action.GetSubresource() == "status" { + patchActionaction, _ := action.(coretesting.PatchAction) + pvc, err = applyPVCPatch(pvc, patchActionaction.GetPatch()) + if err != nil { + return false, nil, err + } + return true, pvc, nil + } + return true, pvc, nil + }) + + err = expController.syncHandler(test.pvcKey) + if err != nil && !test.hasError { + t.Fatalf("for: %s; unexpected error while running handler : %v", test.name, err) + } + + if err == nil && test.hasError { + t.Fatalf("for: %s; unexpected success", test.name) + } + if expansionCalled != test.expansionCalled { + t.Fatalf("for: %s; expected expansionCalled to be %v but was %v", test.name, test.expansionCalled, expansionCalled) + } + + if len(test.expectedAnnotation) != 0 && !reflect.DeepEqual(test.expectedAnnotation, pvc.Annotations) { + t.Fatalf("for: %s; expected %v annotations, got %v", test.name, test.expectedAnnotation, pvc.Annotations) + } + } +} + +func applyPVCPatch(originalPVC *v1.PersistentVolumeClaim, patch []byte) (*v1.PersistentVolumeClaim, error) { + pvcData, err := json.Marshal(originalPVC) + if err != nil { + return nil, fmt.Errorf("failed to marshal pvc with %v", err) + } + updated, err := strategicpatch.StrategicMergePatch(pvcData, patch, v1.PersistentVolumeClaim{}) + if err != nil { + return nil, fmt.Errorf("failed to apply patch on pvc %v", err) + } + updatedPVC := &v1.PersistentVolumeClaim{} + if err := json.Unmarshal(updated, updatedPVC); err != nil { + return nil, fmt.Errorf("failed to unmarshal updated pvc : %v", err) + } + return updatedPVC, nil +} + +func getFakePersistentVolume(volumeName, pluginName string, pvcUID types.UID) *v1.PersistentVolume { + pv := &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{Name: volumeName}, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{}, + ClaimRef: &v1.ObjectReference{ + Namespace: "default", + }, + }, + } + if pvcUID != "" { + pv.Spec.ClaimRef.UID = pvcUID + } + + if matched, _ := regexp.MatchString(`csi`, pluginName); matched { + pv.Spec.PersistentVolumeSource.CSI = &v1.CSIPersistentVolumeSource{ + Driver: pluginName, + VolumeHandle: volumeName, + } + } else { + pv.Spec.PersistentVolumeSource.AWSElasticBlockStore = &v1.AWSElasticBlockStoreVolumeSource{ + VolumeID: volumeName, + FSType: "ext4", + } + } + return pv +} + +func getFakePersistentVolumeClaim(pvcName, volumeName, scName string, uid types.UID) *v1.PersistentVolumeClaim { + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: "default", UID: uid}, + Spec: v1.PersistentVolumeClaimSpec{}, + } + if volumeName != "" { + pvc.Spec.VolumeName = volumeName + } + + if scName != "" { + pvc.Spec.StorageClassName = &scName + } + return pvc +} + +func getFakeStorageClass(scName, pluginName string) *storagev1.StorageClass { + return &storagev1.StorageClass{ + ObjectMeta: metav1.ObjectMeta{Name: scName}, + Provisioner: pluginName, + } +} diff --git a/pkg/volume/util/operationexecutor/BUILD b/pkg/volume/util/operationexecutor/BUILD index 8ae0ce393b..8c12938b53 100644 --- a/pkg/volume/util/operationexecutor/BUILD +++ b/pkg/volume/util/operationexecutor/BUILD @@ -9,6 +9,7 @@ load( go_library( name = "go_default_library", srcs = [ + "fakegenerator.go", "operation_executor.go", "operation_generator.go", ], diff --git a/pkg/volume/util/operationexecutor/fakegenerator.go b/pkg/volume/util/operationexecutor/fakegenerator.go new file mode 100644 index 0000000000..0dc2c9e414 --- /dev/null +++ b/pkg/volume/util/operationexecutor/fakegenerator.go @@ -0,0 +1,115 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operationexecutor + +import ( + "time" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/volume" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" +) + +// fakeOGCounter is a simple OperationGenerator which counts number of times a function +// has been caled +type fakeOGCounter struct { + // calledFuncs stores name and count of functions + calledFuncs map[string]int + opFunc func() (error, error) +} + +var _ OperationGenerator = &fakeOGCounter{} + +// NewFakeOGCounter returns a OperationGenerator +func NewFakeOGCounter(opFunc func() (error, error)) OperationGenerator { + return &fakeOGCounter{ + calledFuncs: map[string]int{}, + opFunc: opFunc, + } +} + +func (f *fakeOGCounter) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations { + return f.recordFuncCall("GenerateMountVolumeFunc") +} + +func (f *fakeOGCounter) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateUnmountVolumeFunc"), nil +} + +func (f *fakeOGCounter) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations { + return f.recordFuncCall("GenerateAttachVolumeFunc") +} + +func (f *fakeOGCounter) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateDetachVolumeFunc"), nil +} + +func (f *fakeOGCounter) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateVolumesAreAttachedFunc"), nil +} + +func (f *fakeOGCounter) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateUnmountDeviceFunc"), nil +} + +func (f *fakeOGCounter) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateVerifyControllerAttachedVolumeFunc"), nil +} + +func (f *fakeOGCounter) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateMapVolumeFunc"), nil +} + +func (f *fakeOGCounter) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateUnmapVolumeFunc"), nil +} + +func (f *fakeOGCounter) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateUnmapDeviceFunc"), nil +} + +func (f *fakeOGCounter) GetVolumePluginMgr() *volume.VolumePluginMgr { + return nil +} + +func (f *fakeOGCounter) GenerateBulkVolumeVerifyFunc( + map[types.NodeName][]*volume.Spec, + string, + map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateBulkVolumeVerifyFunc"), nil +} + +func (f *fakeOGCounter) GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateExpandVolumeFunc"), nil +} + +func (f *fakeOGCounter) GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateExpandVolumeFSWithoutUnmountingFunc"), nil +} + +func (f *fakeOGCounter) recordFuncCall(name string) volumetypes.GeneratedOperations { + if _, ok := f.calledFuncs[name]; ok { + f.calledFuncs[name]++ + } + ops := volumetypes.GeneratedOperations{ + OperationName: name, + OperationFunc: f.opFunc, + } + return ops +} diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 7300d84399..5704be5425 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -683,7 +683,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( // resizeFileSystem will resize the file system if user has requested a resize of // underlying persistent volume and is allowed to do so. - resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions, volumePluginName) + resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions) if resizeError != nil { klog.Errorf("MountVolume.resizeFileSystem failed with %v", resizeError) @@ -721,7 +721,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( // - Volume does not support DeviceMounter interface. // - In case of CSI the volume does not have node stage_unstage capability. if !resizeDone { - resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions, volumePluginName) + resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions) if resizeError != nil { klog.Errorf("MountVolume.resizeFileSystem failed with %v", resizeError) return volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError) @@ -760,7 +760,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( } } -func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, rsOpts volume.NodeResizeOptions, pluginName string) (bool, error) { +func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, rsOpts volume.NodeResizeOptions) (bool, error) { if !utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) { klog.V(4).Infof("Resizing is not enabled for this volume %s", volumeToMount.VolumeName) return true, nil @@ -1577,13 +1577,23 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { - volumePlugin, err := - og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) - if err != nil || volumePlugin == nil { - return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.FindPluginBySpec failed", err) - } fsResizeFunc := func() (error, error) { + // Need to translate the spec here if the plugin is migrated so that the metrics + // emitted show the correct (migrated) plugin + if useCSIPlugin(og.volumePluginMgr, volumeToMount.VolumeSpec) { + csiSpec, err := translateSpec(volumeToMount.VolumeSpec) + if err != nil { + return volumeToMount.GenerateError("VolumeFSResize.translateSpec failed", err) + } + volumeToMount.VolumeSpec = csiSpec + } + volumePlugin, err := + og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) + if err != nil || volumePlugin == nil { + return volumeToMount.GenerateError("VolumeFSResize.FindPluginBySpec failed", err) + } + var resizeDone bool var simpleErr, detailedErr error resizeOptions := volume.NodeResizeOptions{ @@ -1603,7 +1613,7 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( return volumeToMount.GenerateError("VolumeFSResize.GetDeviceMountPath failed", err) } resizeOptions.DeviceMountPath = dmp - resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions, volumePlugin.GetPluginName()) + resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions) if simpleErr != nil || detailedErr != nil { return simpleErr, detailedErr } @@ -1623,7 +1633,7 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( resizeOptions.DeviceMountPath = volumeMounter.GetPath() resizeOptions.CSIVolumePhase = volume.CSIVolumePublished - resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions, volumePlugin.GetPluginName()) + resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions) if simpleErr != nil || detailedErr != nil { return simpleErr, detailedErr } @@ -1631,7 +1641,7 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( return nil, nil } // This is a placeholder error - we should NEVER reach here. - err := fmt.Errorf("volume resizing failed for unknown reason") + err = fmt.Errorf("volume resizing failed for unknown reason") return volumeToMount.GenerateError("VolumeFSResize.resizeFileSystem failed to resize volume", err) } @@ -1641,6 +1651,24 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( } } + // Need to translate the spec here if the plugin is migrated so that the metrics + // emitted show the correct (migrated) plugin + if useCSIPlugin(og.volumePluginMgr, volumeToMount.VolumeSpec) { + csiSpec, err := translateSpec(volumeToMount.VolumeSpec) + if err == nil { + volumeToMount.VolumeSpec = csiSpec + } + // If we have an error here we ignore it, the metric emitted will then be for the + // in-tree plugin. This error case(skipped one) will also trigger an error + // while the generated function is executed. And those errors will be handled during the execution of the generated + // function with a back off policy. + } + volumePlugin, err := + og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) + if err != nil || volumePlugin == nil { + return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.FindPluginBySpec failed", err) + } + return volumetypes.GeneratedOperations{ OperationName: "volume_fs_resize", OperationFunc: fsResizeFunc, @@ -1651,9 +1679,8 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, - resizeOptions volume.NodeResizeOptions, - pluginName string) (bool, error, error) { - resizeDone, err := og.resizeFileSystem(volumeToMount, resizeOptions, pluginName) + resizeOptions volume.NodeResizeOptions) (bool, error, error) { + resizeDone, err := og.resizeFileSystem(volumeToMount, resizeOptions) if err != nil { klog.Errorf("VolumeFSResize.resizeFileSystem failed : %v", err) e1, e2 := volumeToMount.GenerateError("VolumeFSResize.resizeFileSystem failed", err) diff --git a/pkg/volume/util/resize_util.go b/pkg/volume/util/resize_util.go index 52398412e5..b35daba380 100644 --- a/pkg/volume/util/resize_util.go +++ b/pkg/volume/util/resize_util.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/resizefs" "k8s.io/kubernetes/pkg/volume" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) var ( @@ -81,9 +82,11 @@ func UpdatePVSize( return nil } -// MarkResizeInProgress marks cloudprovider resizing as in progress -func MarkResizeInProgress( +// MarkResizeInProgressWithResizer marks cloudprovider resizing as in progress +// and also annotates the PVC with the name of the resizer. +func MarkResizeInProgressWithResizer( pvc *v1.PersistentVolumeClaim, + resizerName string, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { // Mark PVC as Resize Started progressCondition := v1.PersistentVolumeClaimCondition{ @@ -94,9 +97,28 @@ func MarkResizeInProgress( conditions := []v1.PersistentVolumeClaimCondition{progressCondition} newPVC := pvc.DeepCopy() newPVC = MergeResizeConditionOnPVC(newPVC, conditions) + newPVC = setResizer(newPVC, resizerName) return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) } +// SetClaimResizer sets resizer annotation on PVC +func SetClaimResizer( + pvc *v1.PersistentVolumeClaim, + resizerName string, + kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { + newPVC := pvc.DeepCopy() + newPVC = setResizer(newPVC, resizerName) + return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) +} + +func setResizer(pvc *v1.PersistentVolumeClaim, resizerName string) *v1.PersistentVolumeClaim { + if val, ok := pvc.Annotations[volumetypes.VolumeResizerKey]; ok && val == resizerName { + return pvc + } + metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, volumetypes.VolumeResizerKey, resizerName) + return pvc +} + // MarkForFSResize marks file system resizing as pending func MarkForFSResize( pvc *v1.PersistentVolumeClaim, diff --git a/pkg/volume/util/types/types.go b/pkg/volume/util/types/types.go index 8afea9bac2..2811cd1dc6 100644 --- a/pkg/volume/util/types/types.go +++ b/pkg/volume/util/types/types.go @@ -50,3 +50,10 @@ func (o *GeneratedOperations) Run() (eventErr, detailedErr error) { defer runtime.RecoverFromPanic(&detailedErr) return o.OperationFunc() } + +const ( + // VolumeResizerKey is key that will be used to store resizer used + // for resizing PVC. The generated key/value pair will be added + // as a annotation to the PVC. + VolumeResizerKey = "volume.kubernetes.io/storage-resizer" +)