Merge pull request #77883 from gnufied/mat-expand-workqueue

Use workqueues for volume expansion
k3s-v1.15.3
Kubernetes Prow Robot 2019-05-17 02:04:18 -07:00 committed by GitHub
commit da4d6f5829
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 350 additions and 441 deletions

View File

@ -7,14 +7,12 @@ go_library(
srcs = [
"expand_controller.go",
"pvc_populator.go",
"sync_volume_resize.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/volume/expand",
deps = [
"//pkg/controller:go_default_library",
"//pkg/controller/volume/events:go_default_library",
"//pkg/controller/volume/expand/cache:go_default_library",
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
@ -23,7 +21,7 @@ go_library(
"//pkg/volume/util/volumepathhandler:go_default_library",
"//staging/src/k8s.io/api/authentication/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
@ -35,6 +33,7 @@ go_library(
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],

View File

@ -1,15 +1,10 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["volume_resize_map.go"],
importpath = "k8s.io/kubernetes/pkg/controller/volume/expand/cache",
visibility = ["//visibility:public"],
deps = [
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/types:go_default_library",
@ -34,18 +29,5 @@ filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["volume_resize_map_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/volume/util/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
visibility = ["//visibility:public"],
)

View File

@ -1,147 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/volume/util/types"
)
func Test_AddValidPVCUpdate(t *testing.T) {
claim := testVolumeClaim("foo", "ns", v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,
v1.ReadOnlyMany,
},
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): resource.MustParse("12G"),
},
},
VolumeName: "foo",
})
unboundClaim := claim.DeepCopy()
unboundClaim.Status.Phase = v1.ClaimPending
noResizeClaim := claim.DeepCopy()
noResizeClaim.Status.Capacity = v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): resource.MustParse("12G"),
}
boundPV := getPersistentVolume("foo", resource.MustParse("10G"), claim)
unboundPV := getPersistentVolume("foo", resource.MustParse("10G"), nil)
misboundPV := getPersistentVolume("foo", resource.MustParse("10G"), nil)
misboundPV.Spec.ClaimRef = &v1.ObjectReference{
Namespace: "someOtherNamespace",
Name: "someOtherName",
}
tests := []struct {
name string
pvc *v1.PersistentVolumeClaim
pv *v1.PersistentVolume
expectedPVCs int
}{
{
"validPVCUpdate",
claim,
boundPV,
1,
},
{
"noResizeRequired",
noResizeClaim,
boundPV,
0,
},
{
"unboundPVC",
unboundClaim,
boundPV,
0,
},
{
"unboundPV",
claim,
unboundPV,
0,
},
{
"misboundPV",
claim,
misboundPV,
0,
},
}
for _, test := range tests {
resizeMap := createTestVolumeResizeMap()
pvc := test.pvc.DeepCopy()
pv := test.pv.DeepCopy()
resizeMap.AddPVCUpdate(pvc, pv)
pvcr := resizeMap.GetPVCsWithResizeRequest()
if len(pvcr) != test.expectedPVCs {
t.Errorf("Test %q expected %d pvc resize request got %d", test.name, test.expectedPVCs, len(pvcr))
}
if test.expectedPVCs > 0 {
assert.Equal(t, resource.MustParse("12G"), pvcr[0].ExpectedSize, test.name)
}
assert.Equal(t, 0, len(resizeMap.pvcrs), test.name)
}
}
func createTestVolumeResizeMap() *volumeResizeMap {
fakeClient := &fake.Clientset{}
resizeMap := &volumeResizeMap{}
resizeMap.pvcrs = make(map[types.UniquePVCName]*PVCWithResizeRequest)
resizeMap.kubeClient = fakeClient
return resizeMap
}
func testVolumeClaim(name string, namespace string, spec v1.PersistentVolumeClaimSpec) *v1.PersistentVolumeClaim {
return &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
Spec: spec,
Status: v1.PersistentVolumeClaimStatus{
Phase: v1.ClaimBound,
},
}
}
func getPersistentVolume(volumeName string, capacity resource.Quantity, pvc *v1.PersistentVolumeClaim) *v1.PersistentVolume {
volume := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{Name: volumeName},
Spec: v1.PersistentVolumeSpec{
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): capacity,
},
},
}
if pvc != nil {
volume.Spec.ClaimRef = &v1.ObjectReference{
Namespace: pvc.Namespace,
Name: pvc.Name,
}
}
return volume
}

View File

@ -14,9 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package expand implements interfaces that attempt to resize a pvc
// by adding pvc to a volume resize map from which PVCs are picked and
// resized
package expand
import (
@ -28,8 +25,10 @@ import (
authenticationv1 "k8s.io/api/authentication/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
@ -37,10 +36,10 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
kcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/events"
"k8s.io/kubernetes/pkg/controller/volume/expand/cache"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
@ -50,10 +49,8 @@ import (
)
const (
// How often resizing loop runs
syncLoopPeriod time.Duration = 400 * time.Millisecond
// How often pvc populator runs
populatorLoopPeriod time.Duration = 2 * time.Minute
// number of default volume expansion workers
defaultWorkerCount = 10
)
// ExpandController expands the pvs
@ -84,17 +81,9 @@ type expandController struct {
// recorder is used to record events in the API server
recorder record.EventRecorder
// Volume resize map of volumes that needs resizing
resizeMap cache.VolumeResizeMap
operationGenerator operationexecutor.OperationGenerator
// Worker goroutine to process resize requests from resizeMap
syncResize SyncVolumeResize
// Operation executor
opExecutor operationexecutor.OperationExecutor
// populator for periodically polling all PVCs
pvcPopulator PVCPopulator
queue workqueue.RateLimitingInterface
}
func NewExpandController(
@ -111,10 +100,11 @@ func NewExpandController(
pvcsSynced: pvcInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(),
pvSynced: pvInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"),
}
if err := expc.volumePluginMgr.InitPlugins(plugins, nil, expc); err != nil {
return nil, fmt.Errorf("Could not initialize volume plugins for Expand Controller : %+v", err)
return nil, fmt.Errorf("could not initialize volume plugins for Expand Controller : %+v", err)
}
eventBroadcaster := record.NewBroadcaster()
@ -123,33 +113,140 @@ func NewExpandController(
expc.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "volume_expand"})
blkutil := volumepathhandler.NewBlockVolumePathHandler()
expc.opExecutor = operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
expc.operationGenerator = operationexecutor.NewOperationGenerator(
kubeClient,
&expc.volumePluginMgr,
expc.recorder,
false,
blkutil))
expc.resizeMap = cache.NewVolumeResizeMap(expc.kubeClient)
blkutil)
pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
UpdateFunc: expc.pvcUpdate,
DeleteFunc: expc.deletePVC,
AddFunc: expc.enqueuePVC,
UpdateFunc: func(old, new interface{}) {
oldPVC, ok := old.(*v1.PersistentVolumeClaim)
if !ok {
return
}
oldSize := oldPVC.Spec.Resources.Requests[v1.ResourceStorage]
newPVC, ok := new.(*v1.PersistentVolumeClaim)
if !ok {
return
}
newSize := newPVC.Spec.Resources.Requests[v1.ResourceStorage]
if newSize.Cmp(oldSize) > 0 {
expc.enqueuePVC(new)
}
},
DeleteFunc: expc.enqueuePVC,
})
expc.syncResize = NewSyncVolumeResize(syncLoopPeriod, expc.opExecutor, expc.resizeMap, kubeClient)
expc.pvcPopulator = NewPVCPopulator(
populatorLoopPeriod,
expc.resizeMap,
expc.pvcLister,
expc.pvLister,
&expc.volumePluginMgr,
kubeClient)
return expc, nil
}
func (expc *expandController) enqueuePVC(obj interface{}) {
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
return
}
size := pvc.Spec.Resources.Requests[v1.ResourceStorage]
statusSize := pvc.Status.Capacity[v1.ResourceStorage]
if pvc.Status.Phase == v1.ClaimBound && size.Cmp(statusSize) > 0 {
key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(pvc)
if err != nil {
runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pvc, err))
return
}
expc.queue.Add(key)
}
}
func (expc *expandController) processNextWorkItem() bool {
key, shutdown := expc.queue.Get()
if shutdown {
return false
}
defer expc.queue.Done(key)
err := expc.syncHandler(key.(string))
if err == nil {
expc.queue.Forget(key)
return true
}
runtime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
expc.queue.AddRateLimited(key)
return true
}
func (expc *expandController) syncHandler(key string) error {
namespace, name, err := kcache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
pvc, err := expc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
if errors.IsNotFound(err) {
return nil
}
if err != nil {
klog.V(5).Infof("Error getting PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err)
return err
}
pv, err := getPersistentVolume(pvc, expc.pvLister)
if err != nil {
klog.V(5).Infof("Error getting Persistent Volume for PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err)
return err
}
if pv.Spec.ClaimRef == nil || pvc.Namespace != pv.Spec.ClaimRef.Namespace || pvc.UID != pv.Spec.ClaimRef.UID {
err := fmt.Errorf("persistent Volume is not bound to PVC being updated : %s", util.ClaimToClaimKey(pvc))
klog.V(4).Infof("%v", err)
return err
}
volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
volumePlugin, err := expc.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
if err != nil || volumePlugin == nil {
msg := fmt.Errorf("didn't find a plugin capable of expanding the volume; " +
"waiting for an external controller to process this PVC")
eventType := v1.EventTypeNormal
if err != nil {
eventType = v1.EventTypeWarning
}
expc.recorder.Event(pvc, eventType, events.ExternalExpanding, fmt.Sprintf("Ignoring the PVC: %v.", msg))
klog.Infof("Ignoring the PVC %q (uid: %q) : %v.", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, msg)
return err
}
return expc.expand(pvc, pv)
}
func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error {
pvc, err := util.MarkResizeInProgress(pvc, expc.kubeClient)
if err != nil {
klog.V(5).Infof("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
return err
}
generatedOperations, err := expc.operationGenerator.GenerateExpandVolumeFunc(pvc, pv)
if err != nil {
klog.Errorf("Error starting ExpandVolume for pvc %s with %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
return err
}
klog.V(5).Infof("Starting ExpandVolume for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
_, detailedErr := generatedOperations.Run()
return detailedErr
}
// TODO make concurrency configurable (workers/threadiness argument). previously, nestedpendingoperations spawned unlimited goroutines
func (expc *expandController) Run(stopCh <-chan struct{}) {
defer runtime.HandleCrash()
defer expc.queue.ShutDown()
klog.Infof("Starting expand controller")
defer klog.Infof("Shutting down expand controller")
@ -157,73 +254,15 @@ func (expc *expandController) Run(stopCh <-chan struct{}) {
return
}
// Run volume sync work goroutine
go expc.syncResize.Run(stopCh)
// Start the pvc populator loop
go expc.pvcPopulator.Run(stopCh)
for i := 0; i < defaultWorkerCount; i++ {
go wait.Until(expc.runWorker, time.Second, stopCh)
}
<-stopCh
}
func (expc *expandController) deletePVC(obj interface{}) {
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
tombstone, ok := obj.(kcache.DeletedFinalStateUnknown)
if !ok {
runtime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
return
}
pvc, ok = tombstone.Obj.(*v1.PersistentVolumeClaim)
if !ok {
runtime.HandleError(fmt.Errorf("tombstone contained object that is not a pvc %#v", obj))
return
}
}
expc.resizeMap.DeletePVC(pvc)
}
func (expc *expandController) pvcUpdate(oldObj, newObj interface{}) {
oldPVC, ok := oldObj.(*v1.PersistentVolumeClaim)
if oldPVC == nil || !ok {
return
}
newPVC, ok := newObj.(*v1.PersistentVolumeClaim)
if newPVC == nil || !ok {
return
}
newSize := newPVC.Spec.Resources.Requests[v1.ResourceStorage]
oldSize := oldPVC.Spec.Resources.Requests[v1.ResourceStorage]
// We perform additional checks inside resizeMap.AddPVCUpdate function
// this check here exists to ensure - we do not consider every
// PVC update event for resizing, just those where the PVC size changes
if newSize.Cmp(oldSize) > 0 {
pv, err := getPersistentVolume(newPVC, expc.pvLister)
if err != nil {
klog.V(5).Infof("Error getting Persistent Volume for PVC %q : %v", newPVC.UID, err)
return
}
volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
volumePlugin, err := expc.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
if err != nil || volumePlugin == nil {
retErr := fmt.Errorf("didn't find a plugin capable of expanding the volume; " +
"waiting for an external controller to process this PVC")
eventType := v1.EventTypeNormal
if err != nil {
eventType = v1.EventTypeWarning
}
expc.recorder.Event(newPVC, eventType, events.ExternalExpanding,
fmt.Sprintf("Ignoring the PVC: %v.", retErr))
klog.V(3).Infof("Ignoring the PVC %q (uid: %q) : %v.",
util.GetPersistentVolumeClaimQualifiedName(newPVC), newPVC.UID, retErr)
return
}
expc.resizeMap.AddPVCUpdate(newPVC, pv)
func (expc *expandController) runWorker() {
for expc.processNextWorkItem() {
}
}

View File

@ -1,103 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package expand
import (
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/controller/volume/expand/cache"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)
type SyncVolumeResize interface {
Run(stopCh <-chan struct{})
}
type syncResize struct {
loopPeriod time.Duration
resizeMap cache.VolumeResizeMap
opsExecutor operationexecutor.OperationExecutor
kubeClient clientset.Interface
}
// NewSyncVolumeResize returns actual volume resize handler
func NewSyncVolumeResize(
loopPeriod time.Duration,
opsExecutor operationexecutor.OperationExecutor,
resizeMap cache.VolumeResizeMap,
kubeClient clientset.Interface) SyncVolumeResize {
rc := &syncResize{
loopPeriod: loopPeriod,
opsExecutor: opsExecutor,
resizeMap: resizeMap,
kubeClient: kubeClient,
}
return rc
}
func (rc *syncResize) Run(stopCh <-chan struct{}) {
wait.Until(rc.Sync, rc.loopPeriod, stopCh)
}
func (rc *syncResize) Sync() {
// Resize PVCs that require resize
for _, pvcWithResizeRequest := range rc.resizeMap.GetPVCsWithResizeRequest() {
uniqueVolumeKey := v1.UniqueVolumeName(pvcWithResizeRequest.UniquePVCKey())
if rc.opsExecutor.IsOperationPending(uniqueVolumeKey, "") {
klog.V(10).Infof("Operation for PVC %s is already pending", pvcWithResizeRequest.QualifiedName())
continue
}
updatedClaim, err := markPVCResizeInProgress(pvcWithResizeRequest, rc.kubeClient)
if err != nil {
klog.V(5).Infof("Error setting PVC %s in progress with error : %v", pvcWithResizeRequest.QualifiedName(), err)
continue
}
if updatedClaim != nil {
pvcWithResizeRequest.PVC = updatedClaim
}
growFuncError := rc.opsExecutor.ExpandVolume(pvcWithResizeRequest, rc.resizeMap)
if growFuncError != nil && !exponentialbackoff.IsExponentialBackoff(growFuncError) {
klog.Errorf("Error growing pvc %s with %v", pvcWithResizeRequest.QualifiedName(), growFuncError)
}
if growFuncError == nil {
klog.V(5).Infof("Started opsExecutor.ExpandVolume for volume %s", pvcWithResizeRequest.QualifiedName())
}
}
}
func markPVCResizeInProgress(pvcWithResizeRequest *cache.PVCWithResizeRequest, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
// Mark PVC as Resize Started
progressCondition := v1.PersistentVolumeClaimCondition{
Type: v1.PersistentVolumeClaimResizing,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
}
conditions := []v1.PersistentVolumeClaimCondition{progressCondition}
newPVC := pvcWithResizeRequest.PVC.DeepCopy()
newPVC = util.MergeResizeConditionOnPVC(newPVC, conditions)
return util.PatchPVCStatus(pvcWithResizeRequest.PVC /*oldPVC*/, newPVC, kubeClient)
}

View File

@ -29,8 +29,10 @@ go_library(
"//pkg/volume/util/volumepathhandler:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",

View File

@ -143,15 +143,7 @@ func (grm *nestedPendingOperations) Run(
defer k8sRuntime.HandleCrash()
// Handle completion of and error, if any, from operationFunc()
defer grm.operationComplete(volumeName, podName, &detailedErr)
if generatedOperations.CompleteFunc != nil {
defer generatedOperations.CompleteFunc(&detailedErr)
}
if generatedOperations.EventRecorderFunc != nil {
defer generatedOperations.EventRecorderFunc(&eventErr)
}
// Handle panic, if any, from operationFunc()
defer k8sRuntime.RecoverFromPanic(&detailedErr)
return generatedOperations.OperationFunc()
return generatedOperations.Run()
}()
return nil

View File

@ -14,7 +14,6 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/volume/util/operationexecutor",
deps = [
"//pkg/controller/volume/expand/cache:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/events:go_default_library",
"//pkg/util/mount:go_default_library",
@ -43,7 +42,6 @@ go_test(
srcs = ["operation_executor_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/controller/volume/expand/cache:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util/types:go_default_library",

View File

@ -28,7 +28,6 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
expandcache "k8s.io/kubernetes/pkg/controller/volume/expand/cache"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
@ -140,8 +139,6 @@ type OperationExecutor interface {
// IsOperationPending returns true if an operation for the given volumeName and podName is pending,
// otherwise it returns false
IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool
// Expand Volume will grow size available to PVC
ExpandVolume(*expandcache.PVCWithResizeRequest, expandcache.VolumeResizeMap) error
// ExpandVolumeFSWithoutUnmounting will resize volume's file system to expected size without unmounting the volume.
ExpandVolumeFSWithoutUnmounting(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
// ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin
@ -818,17 +815,6 @@ func (oe *operationExecutor) UnmountDevice(
deviceToDetach.VolumeName, podName, generatedOperations)
}
func (oe *operationExecutor) ExpandVolume(pvcWithResizeRequest *expandcache.PVCWithResizeRequest, resizeMap expandcache.VolumeResizeMap) error {
generatedOperations, err := oe.operationGenerator.GenerateExpandVolumeFunc(pvcWithResizeRequest, resizeMap)
if err != nil {
return err
}
uniqueVolumeKey := v1.UniqueVolumeName(pvcWithResizeRequest.UniquePVCKey())
return oe.pendingOperations.Run(uniqueVolumeKey, "", generatedOperations)
}
func (oe *operationExecutor) ExpandVolumeFSWithoutUnmounting(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
generatedOperations, err := oe.operationGenerator.GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount, actualStateOfWorld)
if err != nil {

View File

@ -25,7 +25,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
expandcache "k8s.io/kubernetes/pkg/controller/volume/expand/cache"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
@ -453,8 +452,7 @@ func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(v
}, nil
}
func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvcWithResizeRequest *expandcache.PVCWithResizeRequest,
resizeMap expandcache.VolumeResizeMap) (volumetypes.GeneratedOperations, error) {
func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil, nil

View File

@ -34,7 +34,6 @@ import (
volerr "k8s.io/cloud-provider/volume/errors"
csilib "k8s.io/csi-translation-lib"
"k8s.io/klog"
expandcache "k8s.io/kubernetes/pkg/controller/volume/expand/cache"
"k8s.io/kubernetes/pkg/features"
kevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/util/mount"
@ -129,7 +128,7 @@ type OperationGenerator interface {
string,
map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error)
GenerateExpandVolumeFunc(*expandcache.PVCWithResizeRequest, expandcache.VolumeResizeMap) (volumetypes.GeneratedOperations, error)
GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume) (volumetypes.GeneratedOperations, error)
// Generates the volume file system resize function, which can resize volume's file system to expected size without unmounting the volume.
GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error)
@ -810,7 +809,7 @@ func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, rsOp
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
klog.Infof(detailedMsg)
// File system resize succeeded, now update the PVC's Capacity to match the PV's
err = util.MarkFSResizeFinished(pvc, pv.Spec.Capacity, og.kubeClient)
err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient)
if err != nil {
// On retry, resizeFileSystem will be called again but do nothing
return false, fmt.Errorf("MountVolume.resizeFileSystem update PVC status failed : %v", err)
@ -1494,47 +1493,47 @@ func (og *operationGenerator) verifyVolumeIsSafeToDetach(
}
func (og *operationGenerator) GenerateExpandVolumeFunc(
pvcWithResizeRequest *expandcache.PVCWithResizeRequest,
resizeMap expandcache.VolumeResizeMap) (volumetypes.GeneratedOperations, error) {
pvc *v1.PersistentVolumeClaim,
pv *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) {
volumeSpec := volume.NewSpecFromPersistentVolume(pvcWithResizeRequest.PersistentVolume, false)
volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
volumePlugin, err := og.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
if err != nil {
return volumetypes.GeneratedOperations{}, fmt.Errorf("Error finding plugin for expanding volume: %q with error %v", pvcWithResizeRequest.QualifiedName(), err)
return volumetypes.GeneratedOperations{}, fmt.Errorf("Error finding plugin for expanding volume: %q with error %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
}
if volumePlugin == nil {
return volumetypes.GeneratedOperations{}, fmt.Errorf("Can not find plugin for expanding volume: %q", pvcWithResizeRequest.QualifiedName())
return volumetypes.GeneratedOperations{}, fmt.Errorf("Can not find plugin for expanding volume: %q", util.GetPersistentVolumeClaimQualifiedName(pvc))
}
expandVolumeFunc := func() (error, error) {
newSize := pvcWithResizeRequest.ExpectedSize
pvSize := pvcWithResizeRequest.PersistentVolume.Spec.Capacity[v1.ResourceStorage]
newSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
statusSize := pvc.Status.Capacity[v1.ResourceStorage]
pvSize := pv.Spec.Capacity[v1.ResourceStorage]
if pvSize.Cmp(newSize) < 0 {
updatedSize, expandErr := volumePlugin.ExpandVolumeDevice(
volumeSpec,
pvcWithResizeRequest.ExpectedSize,
pvcWithResizeRequest.CurrentSize)
newSize,
statusSize)
if expandErr != nil {
detailedErr := fmt.Errorf("error expanding volume %q of plugin %q: %v", pvcWithResizeRequest.QualifiedName(), volumePlugin.GetPluginName(), expandErr)
detailedErr := fmt.Errorf("error expanding volume %q of plugin %q: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), volumePlugin.GetPluginName(), expandErr)
return detailedErr, detailedErr
}
klog.Infof("ExpandVolume succeeded for volume %s", pvcWithResizeRequest.QualifiedName())
klog.Infof("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
newSize = updatedSize
// k8s doesn't have transactions, we can't guarantee that after updating PV - updating PVC will be
// successful, that is why all PVCs for which pvc.Spec.Size > pvc.Status.Size must be reprocessed
// until they reflect user requested size in pvc.Status.Size
updateErr := resizeMap.UpdatePVSize(pvcWithResizeRequest, newSize)
updateErr := util.UpdatePVSize(pv, newSize, og.kubeClient)
if updateErr != nil {
detailedErr := fmt.Errorf("Error updating PV spec capacity for volume %q with : %v", pvcWithResizeRequest.QualifiedName(), updateErr)
detailedErr := fmt.Errorf("Error updating PV spec capacity for volume %q with : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), updateErr)
return detailedErr, detailedErr
}
klog.Infof("ExpandVolume.UpdatePV succeeded for volume %s", pvcWithResizeRequest.QualifiedName())
klog.Infof("ExpandVolume.UpdatePV succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
}
fsVolume, _ := util.CheckVolumeModeFilesystem(volumeSpec)
@ -1542,19 +1541,18 @@ func (og *operationGenerator) GenerateExpandVolumeFunc(
// Rest of the volume expand controller code will assume PVC as *not* resized until pvc.Status.Size
// reflects user requested size.
if !volumePlugin.RequiresFSResize() || !fsVolume {
klog.V(4).Infof("Controller resizing done for PVC %s", pvcWithResizeRequest.QualifiedName())
err := resizeMap.MarkAsResized(pvcWithResizeRequest, newSize)
klog.V(4).Infof("Controller resizing done for PVC %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
err := util.MarkResizeFinished(pvc, newSize, og.kubeClient)
if err != nil {
detailedErr := fmt.Errorf("Error marking pvc %s as resized : %v", pvcWithResizeRequest.QualifiedName(), err)
detailedErr := fmt.Errorf("Error marking pvc %s as resized : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
return detailedErr, detailedErr
}
successMsg := fmt.Sprintf("ExpandVolume succeeded for volume %s", pvcWithResizeRequest.QualifiedName())
og.recorder.Eventf(pvcWithResizeRequest.PVC, v1.EventTypeNormal, kevents.VolumeResizeSuccess, successMsg)
successMsg := fmt.Sprintf("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.VolumeResizeSuccess, successMsg)
} else {
err := resizeMap.MarkForFSResize(pvcWithResizeRequest)
err := util.MarkForFSResize(pvc, og.kubeClient)
if err != nil {
detailedErr := fmt.Errorf("Error updating pvc %s condition for fs resize : %v", pvcWithResizeRequest.QualifiedName(), err)
detailedErr := fmt.Errorf("Error updating pvc %s condition for fs resize : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
klog.Warning(detailedErr)
return nil, nil
}
@ -1564,7 +1562,7 @@ func (og *operationGenerator) GenerateExpandVolumeFunc(
eventRecorderFunc := func(err *error) {
if *err != nil {
og.recorder.Eventf(pvcWithResizeRequest.PVC, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error())
og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error())
}
}

View File

@ -21,6 +21,10 @@ import (
"fmt"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
clientset "k8s.io/client-go/kubernetes"
@ -46,45 +50,154 @@ func ClaimToClaimKey(claim *v1.PersistentVolumeClaim) string {
return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name)
}
// UpdatePVSize updates just pv size after cloudprovider resizing is successful
func UpdatePVSize(
pv *v1.PersistentVolume,
newSize resource.Quantity,
kubeClient clientset.Interface) error {
pvClone := pv.DeepCopy()
oldData, err := json.Marshal(pvClone)
if err != nil {
return fmt.Errorf("unexpected error marshaling old PV %q with error : %v", pvClone.Name, err)
}
pvClone.Spec.Capacity[v1.ResourceStorage] = newSize
newData, err := json.Marshal(pvClone)
if err != nil {
return fmt.Errorf("unexpected error marshaling new PV %q with error : %v", pvClone.Name, err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvClone)
if err != nil {
return fmt.Errorf("error Creating two way merge patch for PV %q with error : %v", pvClone.Name, err)
}
_, err = kubeClient.CoreV1().PersistentVolumes().Patch(pvClone.Name, types.StrategicMergePatchType, patchBytes)
if err != nil {
return fmt.Errorf("error Patching PV %q with error : %v", pvClone.Name, err)
}
return nil
}
// MarkResizeInProgress marks cloudprovider resizing as in progress
func MarkResizeInProgress(
pvc *v1.PersistentVolumeClaim,
kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
// Mark PVC as Resize Started
progressCondition := v1.PersistentVolumeClaimCondition{
Type: v1.PersistentVolumeClaimResizing,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
}
conditions := []v1.PersistentVolumeClaimCondition{progressCondition}
newPVC := pvc.DeepCopy()
newPVC = MergeResizeConditionOnPVC(newPVC, conditions)
return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
}
// MarkForFSResize marks file system resizing as pending
func MarkForFSResize(
pvc *v1.PersistentVolumeClaim,
kubeClient clientset.Interface) error {
pvcCondition := v1.PersistentVolumeClaimCondition{
Type: v1.PersistentVolumeClaimFileSystemResizePending,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Message: "Waiting for user to (re-)start a pod to finish file system resize of volume on node.",
}
conditions := []v1.PersistentVolumeClaimCondition{pvcCondition}
newPVC := pvc.DeepCopy()
newPVC = MergeResizeConditionOnPVC(newPVC, conditions)
_, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
return err
}
// MarkResizeFinished marks all resizing as done
func MarkResizeFinished(
pvc *v1.PersistentVolumeClaim,
newSize resource.Quantity,
kubeClient clientset.Interface) error {
return MarkFSResizeFinished(pvc, newSize, kubeClient)
}
// MarkFSResizeFinished marks file system resizing as done
func MarkFSResizeFinished(
pvc *v1.PersistentVolumeClaim,
capacity v1.ResourceList,
newSize resource.Quantity,
kubeClient clientset.Interface) error {
newPVC := pvc.DeepCopy()
newPVC.Status.Capacity = capacity
newPVC.Status.Capacity[v1.ResourceStorage] = newSize
newPVC = MergeResizeConditionOnPVC(newPVC, []v1.PersistentVolumeClaimCondition{})
_, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
return err
}
// PatchPVCStatus updates PVC status using PATCH verb
// Don't use Update because this can be called from kubelet and if kubelet has an older client its
// Updates will overwrite new fields. And to avoid writing to a stale object, add ResourceVersion
// to the patch so that Patch will fail if the patch's RV != actual up-to-date RV like Update would
func PatchPVCStatus(
oldPVC *v1.PersistentVolumeClaim,
newPVC *v1.PersistentVolumeClaim,
kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
pvcName := oldPVC.Name
patchBytes, err := createPVCPatch(oldPVC, newPVC)
if err != nil {
return nil, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, err)
}
updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).
Patch(oldPVC.Name, types.StrategicMergePatchType, patchBytes, "status")
if updateErr != nil {
return nil, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, updateErr)
}
return updatedClaim, nil
}
func createPVCPatch(
oldPVC *v1.PersistentVolumeClaim,
newPVC *v1.PersistentVolumeClaim) ([]byte, error) {
oldData, err := json.Marshal(oldPVC)
if err != nil {
return nil, fmt.Errorf("PatchPVCStatus.Failed to marshal oldData for pvc %q with %v", pvcName, err)
return nil, fmt.Errorf("failed to marshal old data: %v", err)
}
newData, err := json.Marshal(newPVC)
if err != nil {
return nil, fmt.Errorf("PatchPVCStatus.Failed to marshal newData for pvc %q with %v", pvcName, err)
return nil, fmt.Errorf("failed to marshal new data: %v", err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPVC)
if err != nil {
return nil, fmt.Errorf("PatchPVCStatus.Failed to CreateTwoWayMergePatch for pvc %q with %v ", pvcName, err)
return nil, fmt.Errorf("failed to create 2 way merge patch: %v", err)
}
updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).
Patch(pvcName, types.StrategicMergePatchType, patchBytes, "status")
if updateErr != nil {
return nil, fmt.Errorf("PatchPVCStatus.Failed to patch PVC %q with %v", pvcName, updateErr)
patchBytes, err = addResourceVersion(patchBytes, oldPVC.ResourceVersion)
if err != nil {
return nil, fmt.Errorf("failed to add resource version: %v", err)
}
return updatedClaim, nil
return patchBytes, nil
}
func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, error) {
var patchMap map[string]interface{}
err := json.Unmarshal(patchBytes, &patchMap)
if err != nil {
return nil, fmt.Errorf("error unmarshalling patch: %v", err)
}
u := unstructured.Unstructured{Object: patchMap}
a, err := meta.Accessor(&u)
if err != nil {
return nil, fmt.Errorf("error creating accessor: %v", err)
}
a.SetResourceVersion(resourceVersion)
versionBytes, err := json.Marshal(patchMap)
if err != nil {
return nil, fmt.Errorf("error marshalling json patch: %v", err)
}
return versionBytes, nil
}
// MergeResizeConditionOnPVC updates pvc with requested resize conditions

View File

@ -17,6 +17,7 @@ limitations under the License.
package util
import (
"encoding/json"
"reflect"
"testing"
"time"
@ -141,6 +142,38 @@ func TestMergeResizeCondition(t *testing.T) {
}
func TestCreatePVCPatch(t *testing.T) {
pvc1 := getPVC([]v1.PersistentVolumeClaimCondition{
{
Type: v1.PersistentVolumeClaimFileSystemResizePending,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
},
})
pvc1.SetResourceVersion("10")
pvc2 := pvc1.DeepCopy()
pvc2.Status.Capacity = v1.ResourceList{
v1.ResourceName("size"): resource.MustParse("10G"),
}
patchBytes, err := createPVCPatch(pvc1, pvc2)
if err != nil {
t.Errorf("error creating patch bytes %v", err)
}
var patchMap map[string]interface{}
err = json.Unmarshal(patchBytes, &patchMap)
if err != nil {
t.Errorf("error unmarshalling json patch : %v", err)
}
metadata, ok := patchMap["metadata"].(map[string]interface{})
if !ok {
t.Errorf("error converting metadata to version map")
}
resourceVersion, _ := metadata["resourceVersion"].(string)
if resourceVersion != "10" {
t.Errorf("expected resource version to 10 got %s", resourceVersion)
}
}
func getPVC(conditions []v1.PersistentVolumeClaimCondition) *v1.PersistentVolumeClaim {
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "resize"},

View File

@ -9,7 +9,10 @@ go_library(
name = "go_default_library",
srcs = ["types.go"],
importpath = "k8s.io/kubernetes/pkg/volume/util/types",
deps = ["//staging/src/k8s.io/apimachinery/pkg/types:go_default_library"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
],
)
filegroup(

View File

@ -17,7 +17,10 @@ limitations under the License.
// Package types defines types used only by volume components
package types
import "k8s.io/apimachinery/pkg/types"
import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
)
// UniquePodName defines the type to key pods off of
type UniquePodName types.UID
@ -34,3 +37,16 @@ type GeneratedOperations struct {
EventRecorderFunc func(*error)
CompleteFunc func(*error)
}
// Run executes the operations and its supporting functions
func (o *GeneratedOperations) Run() (eventErr, detailedErr error) {
if o.CompleteFunc != nil {
defer o.CompleteFunc(&detailedErr)
}
if o.EventRecorderFunc != nil {
defer o.EventRecorderFunc(&eventErr)
}
// Handle panic, if any, from operationFunc()
defer runtime.RecoverFromPanic(&detailedErr)
return o.OperationFunc()
}