diff --git a/pkg/controller/volume/expand/BUILD b/pkg/controller/volume/expand/BUILD index c8e3e62f27..5f3a8b7b5d 100644 --- a/pkg/controller/volume/expand/BUILD +++ b/pkg/controller/volume/expand/BUILD @@ -7,14 +7,12 @@ go_library( srcs = [ "expand_controller.go", "pvc_populator.go", - "sync_volume_resize.go", ], importpath = "k8s.io/kubernetes/pkg/controller/volume/expand", deps = [ "//pkg/controller:go_default_library", "//pkg/controller/volume/events:go_default_library", "//pkg/controller/volume/expand/cache:go_default_library", - "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/util:go_default_library", @@ -23,7 +21,7 @@ go_library( "//pkg/volume/util/volumepathhandler:go_default_library", "//staging/src/k8s.io/api/authentication/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", @@ -35,6 +33,7 @@ go_library( "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", + "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], diff --git a/pkg/controller/volume/expand/cache/BUILD b/pkg/controller/volume/expand/cache/BUILD index a416cc859c..64ace31c7d 100644 --- a/pkg/controller/volume/expand/cache/BUILD +++ b/pkg/controller/volume/expand/cache/BUILD @@ -1,15 +1,10 @@ -package(default_visibility = ["//visibility:public"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", -) +load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", srcs = ["volume_resize_map.go"], importpath = "k8s.io/kubernetes/pkg/controller/volume/expand/cache", + visibility = ["//visibility:public"], deps = [ "//pkg/volume/util:go_default_library", "//pkg/volume/util/types:go_default_library", @@ -34,18 +29,5 @@ filegroup( name = "all-srcs", srcs = [":package-srcs"], tags = ["automanaged"], -) - -go_test( - name = "go_default_test", - srcs = ["volume_resize_map_test.go"], - embed = [":go_default_library"], - deps = [ - "//pkg/volume/util/types:go_default_library", - "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", - "//vendor/github.com/stretchr/testify/assert:go_default_library", - ], + visibility = ["//visibility:public"], ) diff --git a/pkg/controller/volume/expand/cache/volume_resize_map_test.go b/pkg/controller/volume/expand/cache/volume_resize_map_test.go deleted file mode 100644 index d048b3c94a..0000000000 --- a/pkg/controller/volume/expand/cache/volume_resize_map_test.go +++ /dev/null @@ -1,147 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cache - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/fake" - "k8s.io/kubernetes/pkg/volume/util/types" -) - -func Test_AddValidPVCUpdate(t *testing.T) { - claim := testVolumeClaim("foo", "ns", v1.PersistentVolumeClaimSpec{ - AccessModes: []v1.PersistentVolumeAccessMode{ - v1.ReadWriteOnce, - v1.ReadOnlyMany, - }, - Resources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceName(v1.ResourceStorage): resource.MustParse("12G"), - }, - }, - VolumeName: "foo", - }) - - unboundClaim := claim.DeepCopy() - unboundClaim.Status.Phase = v1.ClaimPending - - noResizeClaim := claim.DeepCopy() - noResizeClaim.Status.Capacity = v1.ResourceList{ - v1.ResourceName(v1.ResourceStorage): resource.MustParse("12G"), - } - - boundPV := getPersistentVolume("foo", resource.MustParse("10G"), claim) - unboundPV := getPersistentVolume("foo", resource.MustParse("10G"), nil) - misboundPV := getPersistentVolume("foo", resource.MustParse("10G"), nil) - misboundPV.Spec.ClaimRef = &v1.ObjectReference{ - Namespace: "someOtherNamespace", - Name: "someOtherName", - } - - tests := []struct { - name string - pvc *v1.PersistentVolumeClaim - pv *v1.PersistentVolume - expectedPVCs int - }{ - { - "validPVCUpdate", - claim, - boundPV, - 1, - }, - { - "noResizeRequired", - noResizeClaim, - boundPV, - 0, - }, - { - "unboundPVC", - unboundClaim, - boundPV, - 0, - }, - { - "unboundPV", - claim, - unboundPV, - 0, - }, - { - "misboundPV", - claim, - misboundPV, - 0, - }, - } - for _, test := range tests { - resizeMap := createTestVolumeResizeMap() - pvc := test.pvc.DeepCopy() - pv := test.pv.DeepCopy() - resizeMap.AddPVCUpdate(pvc, pv) - pvcr := resizeMap.GetPVCsWithResizeRequest() - if len(pvcr) != test.expectedPVCs { - t.Errorf("Test %q expected %d pvc resize request got %d", test.name, test.expectedPVCs, len(pvcr)) - } - if test.expectedPVCs > 0 { - assert.Equal(t, resource.MustParse("12G"), pvcr[0].ExpectedSize, test.name) - } - assert.Equal(t, 0, len(resizeMap.pvcrs), test.name) - } -} - -func createTestVolumeResizeMap() *volumeResizeMap { - fakeClient := &fake.Clientset{} - resizeMap := &volumeResizeMap{} - resizeMap.pvcrs = make(map[types.UniquePVCName]*PVCWithResizeRequest) - resizeMap.kubeClient = fakeClient - return resizeMap -} - -func testVolumeClaim(name string, namespace string, spec v1.PersistentVolumeClaimSpec) *v1.PersistentVolumeClaim { - return &v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}, - Spec: spec, - Status: v1.PersistentVolumeClaimStatus{ - Phase: v1.ClaimBound, - }, - } -} - -func getPersistentVolume(volumeName string, capacity resource.Quantity, pvc *v1.PersistentVolumeClaim) *v1.PersistentVolume { - volume := &v1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{Name: volumeName}, - Spec: v1.PersistentVolumeSpec{ - Capacity: v1.ResourceList{ - v1.ResourceName(v1.ResourceStorage): capacity, - }, - }, - } - if pvc != nil { - volume.Spec.ClaimRef = &v1.ObjectReference{ - Namespace: pvc.Namespace, - Name: pvc.Name, - } - } - return volume -} diff --git a/pkg/controller/volume/expand/expand_controller.go b/pkg/controller/volume/expand/expand_controller.go index 5356f66cd0..2332f322fa 100644 --- a/pkg/controller/volume/expand/expand_controller.go +++ b/pkg/controller/volume/expand/expand_controller.go @@ -14,9 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package expand implements interfaces that attempt to resize a pvc -// by adding pvc to a volume resize map from which PVCs are picked and -// resized package expand import ( @@ -28,8 +25,10 @@ import ( authenticationv1 "k8s.io/api/authentication/v1" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -37,10 +36,10 @@ import ( corelisters "k8s.io/client-go/listers/core/v1" kcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" cloudprovider "k8s.io/cloud-provider" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/volume/events" - "k8s.io/kubernetes/pkg/controller/volume/expand/cache" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util" @@ -50,10 +49,8 @@ import ( ) const ( - // How often resizing loop runs - syncLoopPeriod time.Duration = 400 * time.Millisecond - // How often pvc populator runs - populatorLoopPeriod time.Duration = 2 * time.Minute + // number of default volume expansion workers + defaultWorkerCount = 10 ) // ExpandController expands the pvs @@ -84,17 +81,9 @@ type expandController struct { // recorder is used to record events in the API server recorder record.EventRecorder - // Volume resize map of volumes that needs resizing - resizeMap cache.VolumeResizeMap + operationGenerator operationexecutor.OperationGenerator - // Worker goroutine to process resize requests from resizeMap - syncResize SyncVolumeResize - - // Operation executor - opExecutor operationexecutor.OperationExecutor - - // populator for periodically polling all PVCs - pvcPopulator PVCPopulator + queue workqueue.RateLimitingInterface } func NewExpandController( @@ -111,10 +100,11 @@ func NewExpandController( pvcsSynced: pvcInformer.Informer().HasSynced, pvLister: pvInformer.Lister(), pvSynced: pvInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"), } if err := expc.volumePluginMgr.InitPlugins(plugins, nil, expc); err != nil { - return nil, fmt.Errorf("Could not initialize volume plugins for Expand Controller : %+v", err) + return nil, fmt.Errorf("could not initialize volume plugins for Expand Controller : %+v", err) } eventBroadcaster := record.NewBroadcaster() @@ -123,33 +113,140 @@ func NewExpandController( expc.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "volume_expand"}) blkutil := volumepathhandler.NewBlockVolumePathHandler() - expc.opExecutor = operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( + expc.operationGenerator = operationexecutor.NewOperationGenerator( kubeClient, &expc.volumePluginMgr, expc.recorder, false, - blkutil)) - - expc.resizeMap = cache.NewVolumeResizeMap(expc.kubeClient) + blkutil) pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{ - UpdateFunc: expc.pvcUpdate, - DeleteFunc: expc.deletePVC, + AddFunc: expc.enqueuePVC, + UpdateFunc: func(old, new interface{}) { + oldPVC, ok := old.(*v1.PersistentVolumeClaim) + if !ok { + return + } + + oldSize := oldPVC.Spec.Resources.Requests[v1.ResourceStorage] + newPVC, ok := new.(*v1.PersistentVolumeClaim) + if !ok { + return + } + newSize := newPVC.Spec.Resources.Requests[v1.ResourceStorage] + if newSize.Cmp(oldSize) > 0 { + expc.enqueuePVC(new) + } + }, + DeleteFunc: expc.enqueuePVC, }) - expc.syncResize = NewSyncVolumeResize(syncLoopPeriod, expc.opExecutor, expc.resizeMap, kubeClient) - expc.pvcPopulator = NewPVCPopulator( - populatorLoopPeriod, - expc.resizeMap, - expc.pvcLister, - expc.pvLister, - &expc.volumePluginMgr, - kubeClient) return expc, nil } +func (expc *expandController) enqueuePVC(obj interface{}) { + pvc, ok := obj.(*v1.PersistentVolumeClaim) + if !ok { + return + } + + size := pvc.Spec.Resources.Requests[v1.ResourceStorage] + statusSize := pvc.Status.Capacity[v1.ResourceStorage] + + if pvc.Status.Phase == v1.ClaimBound && size.Cmp(statusSize) > 0 { + key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(pvc) + if err != nil { + runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pvc, err)) + return + } + expc.queue.Add(key) + } +} + +func (expc *expandController) processNextWorkItem() bool { + key, shutdown := expc.queue.Get() + if shutdown { + return false + } + defer expc.queue.Done(key) + + err := expc.syncHandler(key.(string)) + if err == nil { + expc.queue.Forget(key) + return true + } + + runtime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) + expc.queue.AddRateLimited(key) + + return true +} + +func (expc *expandController) syncHandler(key string) error { + namespace, name, err := kcache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + pvc, err := expc.pvcLister.PersistentVolumeClaims(namespace).Get(name) + if errors.IsNotFound(err) { + return nil + } + if err != nil { + klog.V(5).Infof("Error getting PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err) + return err + } + + pv, err := getPersistentVolume(pvc, expc.pvLister) + if err != nil { + klog.V(5).Infof("Error getting Persistent Volume for PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err) + return err + } + if pv.Spec.ClaimRef == nil || pvc.Namespace != pv.Spec.ClaimRef.Namespace || pvc.UID != pv.Spec.ClaimRef.UID { + err := fmt.Errorf("persistent Volume is not bound to PVC being updated : %s", util.ClaimToClaimKey(pvc)) + klog.V(4).Infof("%v", err) + return err + } + + volumeSpec := volume.NewSpecFromPersistentVolume(pv, false) + volumePlugin, err := expc.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec) + if err != nil || volumePlugin == nil { + msg := fmt.Errorf("didn't find a plugin capable of expanding the volume; " + + "waiting for an external controller to process this PVC") + eventType := v1.EventTypeNormal + if err != nil { + eventType = v1.EventTypeWarning + } + expc.recorder.Event(pvc, eventType, events.ExternalExpanding, fmt.Sprintf("Ignoring the PVC: %v.", msg)) + klog.Infof("Ignoring the PVC %q (uid: %q) : %v.", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, msg) + return err + } + + return expc.expand(pvc, pv) +} + +func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error { + pvc, err := util.MarkResizeInProgress(pvc, expc.kubeClient) + if err != nil { + klog.V(5).Infof("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) + return err + } + + generatedOperations, err := expc.operationGenerator.GenerateExpandVolumeFunc(pvc, pv) + if err != nil { + klog.Errorf("Error starting ExpandVolume for pvc %s with %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) + return err + } + klog.V(5).Infof("Starting ExpandVolume for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc)) + _, detailedErr := generatedOperations.Run() + + return detailedErr +} + +// TODO make concurrency configurable (workers/threadiness argument). previously, nestedpendingoperations spawned unlimited goroutines func (expc *expandController) Run(stopCh <-chan struct{}) { defer runtime.HandleCrash() + defer expc.queue.ShutDown() + klog.Infof("Starting expand controller") defer klog.Infof("Shutting down expand controller") @@ -157,73 +254,15 @@ func (expc *expandController) Run(stopCh <-chan struct{}) { return } - // Run volume sync work goroutine - go expc.syncResize.Run(stopCh) - // Start the pvc populator loop - go expc.pvcPopulator.Run(stopCh) + for i := 0; i < defaultWorkerCount; i++ { + go wait.Until(expc.runWorker, time.Second, stopCh) + } + <-stopCh } -func (expc *expandController) deletePVC(obj interface{}) { - pvc, ok := obj.(*v1.PersistentVolumeClaim) - if !ok { - tombstone, ok := obj.(kcache.DeletedFinalStateUnknown) - if !ok { - runtime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj)) - return - } - pvc, ok = tombstone.Obj.(*v1.PersistentVolumeClaim) - if !ok { - runtime.HandleError(fmt.Errorf("tombstone contained object that is not a pvc %#v", obj)) - return - } - } - - expc.resizeMap.DeletePVC(pvc) -} - -func (expc *expandController) pvcUpdate(oldObj, newObj interface{}) { - oldPVC, ok := oldObj.(*v1.PersistentVolumeClaim) - - if oldPVC == nil || !ok { - return - } - - newPVC, ok := newObj.(*v1.PersistentVolumeClaim) - - if newPVC == nil || !ok { - return - } - - newSize := newPVC.Spec.Resources.Requests[v1.ResourceStorage] - oldSize := oldPVC.Spec.Resources.Requests[v1.ResourceStorage] - - // We perform additional checks inside resizeMap.AddPVCUpdate function - // this check here exists to ensure - we do not consider every - // PVC update event for resizing, just those where the PVC size changes - if newSize.Cmp(oldSize) > 0 { - pv, err := getPersistentVolume(newPVC, expc.pvLister) - if err != nil { - klog.V(5).Infof("Error getting Persistent Volume for PVC %q : %v", newPVC.UID, err) - return - } - - volumeSpec := volume.NewSpecFromPersistentVolume(pv, false) - volumePlugin, err := expc.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec) - if err != nil || volumePlugin == nil { - retErr := fmt.Errorf("didn't find a plugin capable of expanding the volume; " + - "waiting for an external controller to process this PVC") - eventType := v1.EventTypeNormal - if err != nil { - eventType = v1.EventTypeWarning - } - expc.recorder.Event(newPVC, eventType, events.ExternalExpanding, - fmt.Sprintf("Ignoring the PVC: %v.", retErr)) - klog.V(3).Infof("Ignoring the PVC %q (uid: %q) : %v.", - util.GetPersistentVolumeClaimQualifiedName(newPVC), newPVC.UID, retErr) - return - } - expc.resizeMap.AddPVCUpdate(newPVC, pv) +func (expc *expandController) runWorker() { + for expc.processNextWorkItem() { } } diff --git a/pkg/controller/volume/expand/sync_volume_resize.go b/pkg/controller/volume/expand/sync_volume_resize.go deleted file mode 100644 index b59ff845ec..0000000000 --- a/pkg/controller/volume/expand/sync_volume_resize.go +++ /dev/null @@ -1,103 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package expand - -import ( - "time" - - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - clientset "k8s.io/client-go/kubernetes" - "k8s.io/klog" - "k8s.io/kubernetes/pkg/controller/volume/expand/cache" - "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" - "k8s.io/kubernetes/pkg/volume/util" - "k8s.io/kubernetes/pkg/volume/util/operationexecutor" -) - -type SyncVolumeResize interface { - Run(stopCh <-chan struct{}) -} - -type syncResize struct { - loopPeriod time.Duration - resizeMap cache.VolumeResizeMap - opsExecutor operationexecutor.OperationExecutor - kubeClient clientset.Interface -} - -// NewSyncVolumeResize returns actual volume resize handler -func NewSyncVolumeResize( - loopPeriod time.Duration, - opsExecutor operationexecutor.OperationExecutor, - resizeMap cache.VolumeResizeMap, - kubeClient clientset.Interface) SyncVolumeResize { - rc := &syncResize{ - loopPeriod: loopPeriod, - opsExecutor: opsExecutor, - resizeMap: resizeMap, - kubeClient: kubeClient, - } - return rc -} - -func (rc *syncResize) Run(stopCh <-chan struct{}) { - wait.Until(rc.Sync, rc.loopPeriod, stopCh) -} - -func (rc *syncResize) Sync() { - // Resize PVCs that require resize - for _, pvcWithResizeRequest := range rc.resizeMap.GetPVCsWithResizeRequest() { - uniqueVolumeKey := v1.UniqueVolumeName(pvcWithResizeRequest.UniquePVCKey()) - if rc.opsExecutor.IsOperationPending(uniqueVolumeKey, "") { - klog.V(10).Infof("Operation for PVC %s is already pending", pvcWithResizeRequest.QualifiedName()) - continue - } - - updatedClaim, err := markPVCResizeInProgress(pvcWithResizeRequest, rc.kubeClient) - if err != nil { - klog.V(5).Infof("Error setting PVC %s in progress with error : %v", pvcWithResizeRequest.QualifiedName(), err) - continue - } - if updatedClaim != nil { - pvcWithResizeRequest.PVC = updatedClaim - } - - growFuncError := rc.opsExecutor.ExpandVolume(pvcWithResizeRequest, rc.resizeMap) - if growFuncError != nil && !exponentialbackoff.IsExponentialBackoff(growFuncError) { - klog.Errorf("Error growing pvc %s with %v", pvcWithResizeRequest.QualifiedName(), growFuncError) - } - if growFuncError == nil { - klog.V(5).Infof("Started opsExecutor.ExpandVolume for volume %s", pvcWithResizeRequest.QualifiedName()) - } - } -} - -func markPVCResizeInProgress(pvcWithResizeRequest *cache.PVCWithResizeRequest, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { - // Mark PVC as Resize Started - progressCondition := v1.PersistentVolumeClaimCondition{ - Type: v1.PersistentVolumeClaimResizing, - Status: v1.ConditionTrue, - LastTransitionTime: metav1.Now(), - } - conditions := []v1.PersistentVolumeClaimCondition{progressCondition} - newPVC := pvcWithResizeRequest.PVC.DeepCopy() - newPVC = util.MergeResizeConditionOnPVC(newPVC, conditions) - - return util.PatchPVCStatus(pvcWithResizeRequest.PVC /*oldPVC*/, newPVC, kubeClient) -} diff --git a/pkg/volume/util/BUILD b/pkg/volume/util/BUILD index d6200ddc6e..950fb62166 100644 --- a/pkg/volume/util/BUILD +++ b/pkg/volume/util/BUILD @@ -29,8 +29,10 @@ go_library( "//pkg/volume/util/volumepathhandler:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go index ee9d11978a..193e362cd0 100644 --- a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go @@ -143,15 +143,7 @@ func (grm *nestedPendingOperations) Run( defer k8sRuntime.HandleCrash() // Handle completion of and error, if any, from operationFunc() defer grm.operationComplete(volumeName, podName, &detailedErr) - if generatedOperations.CompleteFunc != nil { - defer generatedOperations.CompleteFunc(&detailedErr) - } - if generatedOperations.EventRecorderFunc != nil { - defer generatedOperations.EventRecorderFunc(&eventErr) - } - // Handle panic, if any, from operationFunc() - defer k8sRuntime.RecoverFromPanic(&detailedErr) - return generatedOperations.OperationFunc() + return generatedOperations.Run() }() return nil diff --git a/pkg/volume/util/operationexecutor/BUILD b/pkg/volume/util/operationexecutor/BUILD index 314dcaa8c3..8ae0ce393b 100644 --- a/pkg/volume/util/operationexecutor/BUILD +++ b/pkg/volume/util/operationexecutor/BUILD @@ -14,7 +14,6 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/volume/util/operationexecutor", deps = [ - "//pkg/controller/volume/expand/cache:go_default_library", "//pkg/features:go_default_library", "//pkg/kubelet/events:go_default_library", "//pkg/util/mount:go_default_library", @@ -43,7 +42,6 @@ go_test( srcs = ["operation_executor_test.go"], embed = [":go_default_library"], deps = [ - "//pkg/controller/volume/expand/cache:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/util/types:go_default_library", diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 535c95c134..18668f8d29 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -28,7 +28,6 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" - expandcache "k8s.io/kubernetes/pkg/controller/volume/expand/cache" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util" @@ -140,8 +139,6 @@ type OperationExecutor interface { // IsOperationPending returns true if an operation for the given volumeName and podName is pending, // otherwise it returns false IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool - // Expand Volume will grow size available to PVC - ExpandVolume(*expandcache.PVCWithResizeRequest, expandcache.VolumeResizeMap) error // ExpandVolumeFSWithoutUnmounting will resize volume's file system to expected size without unmounting the volume. ExpandVolumeFSWithoutUnmounting(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error // ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin @@ -818,17 +815,6 @@ func (oe *operationExecutor) UnmountDevice( deviceToDetach.VolumeName, podName, generatedOperations) } -func (oe *operationExecutor) ExpandVolume(pvcWithResizeRequest *expandcache.PVCWithResizeRequest, resizeMap expandcache.VolumeResizeMap) error { - generatedOperations, err := oe.operationGenerator.GenerateExpandVolumeFunc(pvcWithResizeRequest, resizeMap) - - if err != nil { - return err - } - uniqueVolumeKey := v1.UniqueVolumeName(pvcWithResizeRequest.UniquePVCKey()) - - return oe.pendingOperations.Run(uniqueVolumeKey, "", generatedOperations) -} - func (oe *operationExecutor) ExpandVolumeFSWithoutUnmounting(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { generatedOperations, err := oe.operationGenerator.GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount, actualStateOfWorld) if err != nil { diff --git a/pkg/volume/util/operationexecutor/operation_executor_test.go b/pkg/volume/util/operationexecutor/operation_executor_test.go index 83372c5093..c5eb06e9fd 100644 --- a/pkg/volume/util/operationexecutor/operation_executor_test.go +++ b/pkg/volume/util/operationexecutor/operation_executor_test.go @@ -25,7 +25,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" - expandcache "k8s.io/kubernetes/pkg/controller/volume/expand/cache" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" volumetypes "k8s.io/kubernetes/pkg/volume/util/types" @@ -453,8 +452,7 @@ func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(v }, nil } -func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvcWithResizeRequest *expandcache.PVCWithResizeRequest, - resizeMap expandcache.VolumeResizeMap) (volumetypes.GeneratedOperations, error) { +func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) { opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) return nil, nil diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index caeda8f8a3..7300d84399 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -34,7 +34,6 @@ import ( volerr "k8s.io/cloud-provider/volume/errors" csilib "k8s.io/csi-translation-lib" "k8s.io/klog" - expandcache "k8s.io/kubernetes/pkg/controller/volume/expand/cache" "k8s.io/kubernetes/pkg/features" kevents "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/util/mount" @@ -129,7 +128,7 @@ type OperationGenerator interface { string, map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) - GenerateExpandVolumeFunc(*expandcache.PVCWithResizeRequest, expandcache.VolumeResizeMap) (volumetypes.GeneratedOperations, error) + GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) // Generates the volume file system resize function, which can resize volume's file system to expected size without unmounting the volume. GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) @@ -810,7 +809,7 @@ func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, rsOp og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) klog.Infof(detailedMsg) // File system resize succeeded, now update the PVC's Capacity to match the PV's - err = util.MarkFSResizeFinished(pvc, pv.Spec.Capacity, og.kubeClient) + err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient) if err != nil { // On retry, resizeFileSystem will be called again but do nothing return false, fmt.Errorf("MountVolume.resizeFileSystem update PVC status failed : %v", err) @@ -1494,47 +1493,47 @@ func (og *operationGenerator) verifyVolumeIsSafeToDetach( } func (og *operationGenerator) GenerateExpandVolumeFunc( - pvcWithResizeRequest *expandcache.PVCWithResizeRequest, - resizeMap expandcache.VolumeResizeMap) (volumetypes.GeneratedOperations, error) { + pvc *v1.PersistentVolumeClaim, + pv *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) { - volumeSpec := volume.NewSpecFromPersistentVolume(pvcWithResizeRequest.PersistentVolume, false) + volumeSpec := volume.NewSpecFromPersistentVolume(pv, false) volumePlugin, err := og.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec) - if err != nil { - return volumetypes.GeneratedOperations{}, fmt.Errorf("Error finding plugin for expanding volume: %q with error %v", pvcWithResizeRequest.QualifiedName(), err) + return volumetypes.GeneratedOperations{}, fmt.Errorf("Error finding plugin for expanding volume: %q with error %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) } if volumePlugin == nil { - return volumetypes.GeneratedOperations{}, fmt.Errorf("Can not find plugin for expanding volume: %q", pvcWithResizeRequest.QualifiedName()) + return volumetypes.GeneratedOperations{}, fmt.Errorf("Can not find plugin for expanding volume: %q", util.GetPersistentVolumeClaimQualifiedName(pvc)) } expandVolumeFunc := func() (error, error) { - newSize := pvcWithResizeRequest.ExpectedSize - pvSize := pvcWithResizeRequest.PersistentVolume.Spec.Capacity[v1.ResourceStorage] + newSize := pvc.Spec.Resources.Requests[v1.ResourceStorage] + statusSize := pvc.Status.Capacity[v1.ResourceStorage] + pvSize := pv.Spec.Capacity[v1.ResourceStorage] if pvSize.Cmp(newSize) < 0 { updatedSize, expandErr := volumePlugin.ExpandVolumeDevice( volumeSpec, - pvcWithResizeRequest.ExpectedSize, - pvcWithResizeRequest.CurrentSize) - + newSize, + statusSize) if expandErr != nil { - detailedErr := fmt.Errorf("error expanding volume %q of plugin %q: %v", pvcWithResizeRequest.QualifiedName(), volumePlugin.GetPluginName(), expandErr) + detailedErr := fmt.Errorf("error expanding volume %q of plugin %q: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), volumePlugin.GetPluginName(), expandErr) return detailedErr, detailedErr } - klog.Infof("ExpandVolume succeeded for volume %s", pvcWithResizeRequest.QualifiedName()) + klog.Infof("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc)) + newSize = updatedSize // k8s doesn't have transactions, we can't guarantee that after updating PV - updating PVC will be // successful, that is why all PVCs for which pvc.Spec.Size > pvc.Status.Size must be reprocessed // until they reflect user requested size in pvc.Status.Size - updateErr := resizeMap.UpdatePVSize(pvcWithResizeRequest, newSize) - + updateErr := util.UpdatePVSize(pv, newSize, og.kubeClient) if updateErr != nil { - detailedErr := fmt.Errorf("Error updating PV spec capacity for volume %q with : %v", pvcWithResizeRequest.QualifiedName(), updateErr) + detailedErr := fmt.Errorf("Error updating PV spec capacity for volume %q with : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), updateErr) return detailedErr, detailedErr } - klog.Infof("ExpandVolume.UpdatePV succeeded for volume %s", pvcWithResizeRequest.QualifiedName()) + + klog.Infof("ExpandVolume.UpdatePV succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc)) } fsVolume, _ := util.CheckVolumeModeFilesystem(volumeSpec) @@ -1542,19 +1541,18 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( // Rest of the volume expand controller code will assume PVC as *not* resized until pvc.Status.Size // reflects user requested size. if !volumePlugin.RequiresFSResize() || !fsVolume { - klog.V(4).Infof("Controller resizing done for PVC %s", pvcWithResizeRequest.QualifiedName()) - err := resizeMap.MarkAsResized(pvcWithResizeRequest, newSize) - + klog.V(4).Infof("Controller resizing done for PVC %s", util.GetPersistentVolumeClaimQualifiedName(pvc)) + err := util.MarkResizeFinished(pvc, newSize, og.kubeClient) if err != nil { - detailedErr := fmt.Errorf("Error marking pvc %s as resized : %v", pvcWithResizeRequest.QualifiedName(), err) + detailedErr := fmt.Errorf("Error marking pvc %s as resized : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) return detailedErr, detailedErr } - successMsg := fmt.Sprintf("ExpandVolume succeeded for volume %s", pvcWithResizeRequest.QualifiedName()) - og.recorder.Eventf(pvcWithResizeRequest.PVC, v1.EventTypeNormal, kevents.VolumeResizeSuccess, successMsg) + successMsg := fmt.Sprintf("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc)) + og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.VolumeResizeSuccess, successMsg) } else { - err := resizeMap.MarkForFSResize(pvcWithResizeRequest) + err := util.MarkForFSResize(pvc, og.kubeClient) if err != nil { - detailedErr := fmt.Errorf("Error updating pvc %s condition for fs resize : %v", pvcWithResizeRequest.QualifiedName(), err) + detailedErr := fmt.Errorf("Error updating pvc %s condition for fs resize : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) klog.Warning(detailedErr) return nil, nil } @@ -1564,7 +1562,7 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( eventRecorderFunc := func(err *error) { if *err != nil { - og.recorder.Eventf(pvcWithResizeRequest.PVC, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error()) + og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error()) } } diff --git a/pkg/volume/util/resize_util.go b/pkg/volume/util/resize_util.go index 9090ff8686..52398412e5 100644 --- a/pkg/volume/util/resize_util.go +++ b/pkg/volume/util/resize_util.go @@ -21,6 +21,10 @@ import ( "fmt" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" clientset "k8s.io/client-go/kubernetes" @@ -46,45 +50,154 @@ func ClaimToClaimKey(claim *v1.PersistentVolumeClaim) string { return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name) } +// UpdatePVSize updates just pv size after cloudprovider resizing is successful +func UpdatePVSize( + pv *v1.PersistentVolume, + newSize resource.Quantity, + kubeClient clientset.Interface) error { + pvClone := pv.DeepCopy() + + oldData, err := json.Marshal(pvClone) + if err != nil { + return fmt.Errorf("unexpected error marshaling old PV %q with error : %v", pvClone.Name, err) + } + + pvClone.Spec.Capacity[v1.ResourceStorage] = newSize + + newData, err := json.Marshal(pvClone) + if err != nil { + return fmt.Errorf("unexpected error marshaling new PV %q with error : %v", pvClone.Name, err) + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvClone) + if err != nil { + return fmt.Errorf("error Creating two way merge patch for PV %q with error : %v", pvClone.Name, err) + } + + _, err = kubeClient.CoreV1().PersistentVolumes().Patch(pvClone.Name, types.StrategicMergePatchType, patchBytes) + if err != nil { + return fmt.Errorf("error Patching PV %q with error : %v", pvClone.Name, err) + } + return nil +} + +// MarkResizeInProgress marks cloudprovider resizing as in progress +func MarkResizeInProgress( + pvc *v1.PersistentVolumeClaim, + kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { + // Mark PVC as Resize Started + progressCondition := v1.PersistentVolumeClaimCondition{ + Type: v1.PersistentVolumeClaimResizing, + Status: v1.ConditionTrue, + LastTransitionTime: metav1.Now(), + } + conditions := []v1.PersistentVolumeClaimCondition{progressCondition} + newPVC := pvc.DeepCopy() + newPVC = MergeResizeConditionOnPVC(newPVC, conditions) + return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) +} + +// MarkForFSResize marks file system resizing as pending +func MarkForFSResize( + pvc *v1.PersistentVolumeClaim, + kubeClient clientset.Interface) error { + pvcCondition := v1.PersistentVolumeClaimCondition{ + Type: v1.PersistentVolumeClaimFileSystemResizePending, + Status: v1.ConditionTrue, + LastTransitionTime: metav1.Now(), + Message: "Waiting for user to (re-)start a pod to finish file system resize of volume on node.", + } + conditions := []v1.PersistentVolumeClaimCondition{pvcCondition} + newPVC := pvc.DeepCopy() + newPVC = MergeResizeConditionOnPVC(newPVC, conditions) + _, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) + return err +} + +// MarkResizeFinished marks all resizing as done +func MarkResizeFinished( + pvc *v1.PersistentVolumeClaim, + newSize resource.Quantity, + kubeClient clientset.Interface) error { + return MarkFSResizeFinished(pvc, newSize, kubeClient) +} + // MarkFSResizeFinished marks file system resizing as done func MarkFSResizeFinished( pvc *v1.PersistentVolumeClaim, - capacity v1.ResourceList, + newSize resource.Quantity, kubeClient clientset.Interface) error { newPVC := pvc.DeepCopy() - newPVC.Status.Capacity = capacity + newPVC.Status.Capacity[v1.ResourceStorage] = newSize newPVC = MergeResizeConditionOnPVC(newPVC, []v1.PersistentVolumeClaimCondition{}) _, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) return err } // PatchPVCStatus updates PVC status using PATCH verb +// Don't use Update because this can be called from kubelet and if kubelet has an older client its +// Updates will overwrite new fields. And to avoid writing to a stale object, add ResourceVersion +// to the patch so that Patch will fail if the patch's RV != actual up-to-date RV like Update would func PatchPVCStatus( oldPVC *v1.PersistentVolumeClaim, newPVC *v1.PersistentVolumeClaim, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { - pvcName := oldPVC.Name + patchBytes, err := createPVCPatch(oldPVC, newPVC) + if err != nil { + return nil, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, err) + } + updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(oldPVC.Namespace). + Patch(oldPVC.Name, types.StrategicMergePatchType, patchBytes, "status") + if updateErr != nil { + return nil, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, updateErr) + } + return updatedClaim, nil +} + +func createPVCPatch( + oldPVC *v1.PersistentVolumeClaim, + newPVC *v1.PersistentVolumeClaim) ([]byte, error) { oldData, err := json.Marshal(oldPVC) if err != nil { - return nil, fmt.Errorf("PatchPVCStatus.Failed to marshal oldData for pvc %q with %v", pvcName, err) + return nil, fmt.Errorf("failed to marshal old data: %v", err) } newData, err := json.Marshal(newPVC) if err != nil { - return nil, fmt.Errorf("PatchPVCStatus.Failed to marshal newData for pvc %q with %v", pvcName, err) + return nil, fmt.Errorf("failed to marshal new data: %v", err) } patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPVC) if err != nil { - return nil, fmt.Errorf("PatchPVCStatus.Failed to CreateTwoWayMergePatch for pvc %q with %v ", pvcName, err) + return nil, fmt.Errorf("failed to create 2 way merge patch: %v", err) } - updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(oldPVC.Namespace). - Patch(pvcName, types.StrategicMergePatchType, patchBytes, "status") - if updateErr != nil { - return nil, fmt.Errorf("PatchPVCStatus.Failed to patch PVC %q with %v", pvcName, updateErr) + + patchBytes, err = addResourceVersion(patchBytes, oldPVC.ResourceVersion) + if err != nil { + return nil, fmt.Errorf("failed to add resource version: %v", err) } - return updatedClaim, nil + + return patchBytes, nil +} + +func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, error) { + var patchMap map[string]interface{} + err := json.Unmarshal(patchBytes, &patchMap) + if err != nil { + return nil, fmt.Errorf("error unmarshalling patch: %v", err) + } + u := unstructured.Unstructured{Object: patchMap} + a, err := meta.Accessor(&u) + if err != nil { + return nil, fmt.Errorf("error creating accessor: %v", err) + } + a.SetResourceVersion(resourceVersion) + versionBytes, err := json.Marshal(patchMap) + if err != nil { + return nil, fmt.Errorf("error marshalling json patch: %v", err) + } + return versionBytes, nil } // MergeResizeConditionOnPVC updates pvc with requested resize conditions diff --git a/pkg/volume/util/resize_util_test.go b/pkg/volume/util/resize_util_test.go index 28fb859eca..ac637382f3 100644 --- a/pkg/volume/util/resize_util_test.go +++ b/pkg/volume/util/resize_util_test.go @@ -17,6 +17,7 @@ limitations under the License. package util import ( + "encoding/json" "reflect" "testing" "time" @@ -141,6 +142,38 @@ func TestMergeResizeCondition(t *testing.T) { } +func TestCreatePVCPatch(t *testing.T) { + pvc1 := getPVC([]v1.PersistentVolumeClaimCondition{ + { + Type: v1.PersistentVolumeClaimFileSystemResizePending, + Status: v1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + }) + pvc1.SetResourceVersion("10") + pvc2 := pvc1.DeepCopy() + pvc2.Status.Capacity = v1.ResourceList{ + v1.ResourceName("size"): resource.MustParse("10G"), + } + patchBytes, err := createPVCPatch(pvc1, pvc2) + if err != nil { + t.Errorf("error creating patch bytes %v", err) + } + var patchMap map[string]interface{} + err = json.Unmarshal(patchBytes, &patchMap) + if err != nil { + t.Errorf("error unmarshalling json patch : %v", err) + } + metadata, ok := patchMap["metadata"].(map[string]interface{}) + if !ok { + t.Errorf("error converting metadata to version map") + } + resourceVersion, _ := metadata["resourceVersion"].(string) + if resourceVersion != "10" { + t.Errorf("expected resource version to 10 got %s", resourceVersion) + } +} + func getPVC(conditions []v1.PersistentVolumeClaimCondition) *v1.PersistentVolumeClaim { pvc := &v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "resize"}, diff --git a/pkg/volume/util/types/BUILD b/pkg/volume/util/types/BUILD index 919ee30fda..80d844ea02 100644 --- a/pkg/volume/util/types/BUILD +++ b/pkg/volume/util/types/BUILD @@ -9,7 +9,10 @@ go_library( name = "go_default_library", srcs = ["types.go"], importpath = "k8s.io/kubernetes/pkg/volume/util/types", - deps = ["//staging/src/k8s.io/apimachinery/pkg/types:go_default_library"], + deps = [ + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + ], ) filegroup( diff --git a/pkg/volume/util/types/types.go b/pkg/volume/util/types/types.go index b5f1009bee..8afea9bac2 100644 --- a/pkg/volume/util/types/types.go +++ b/pkg/volume/util/types/types.go @@ -17,7 +17,10 @@ limitations under the License. // Package types defines types used only by volume components package types -import "k8s.io/apimachinery/pkg/types" +import ( + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/runtime" +) // UniquePodName defines the type to key pods off of type UniquePodName types.UID @@ -34,3 +37,16 @@ type GeneratedOperations struct { EventRecorderFunc func(*error) CompleteFunc func(*error) } + +// Run executes the operations and its supporting functions +func (o *GeneratedOperations) Run() (eventErr, detailedErr error) { + if o.CompleteFunc != nil { + defer o.CompleteFunc(&detailedErr) + } + if o.EventRecorderFunc != nil { + defer o.EventRecorderFunc(&eventErr) + } + // Handle panic, if any, from operationFunc() + defer runtime.RecoverFromPanic(&detailedErr) + return o.OperationFunc() +}