From 0fd5f2028d3ef9a618725c97920677d4995ee5be Mon Sep 17 00:00:00 2001 From: Jan Safranek Date: Mon, 2 Jan 2017 15:17:24 +0100 Subject: [PATCH] Add work queues to PV controller PV controller should not use Controller.Requeue, as as it is not available in shared informers. We need to implement our own work queues instead where we can enqueue volumes/claims as we want. --- .../app/controllermanager.go | 2 +- pkg/controller/volume/persistentvolume/BUILD | 3 + .../volume/persistentvolume/pv_controller.go | 39 +- .../persistentvolume/pv_controller_base.go | 395 +++++++++--------- .../persistentvolume/pv_controller_test.go | 2 +- .../volume/persistent_volumes_test.go | 20 +- 6 files changed, 242 insertions(+), 219 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index f280b232ca..9a617e6db9 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -405,7 +405,7 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root EnableDynamicProvisioning: s.VolumeConfiguration.EnableDynamicProvisioning, } volumeController := persistentvolumecontroller.NewController(params) - volumeController.Run(stop) + go volumeController.Run(stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) attachDetachController, attachDetachControllerErr := diff --git a/pkg/controller/volume/persistentvolume/BUILD b/pkg/controller/volume/persistentvolume/BUILD index b2436b31b1..58887b75c4 100644 --- a/pkg/controller/volume/persistentvolume/BUILD +++ b/pkg/controller/volume/persistentvolume/BUILD @@ -30,12 +30,15 @@ go_library( "//pkg/client/clientset_generated/clientset/typed/core/v1:go_default_library", "//pkg/client/record:go_default_library", "//pkg/cloudprovider:go_default_library", + "//pkg/controller:go_default_library", "//pkg/labels:go_default_library", "//pkg/runtime:go_default_library", "//pkg/types:go_default_library", "//pkg/util/goroutinemap:go_default_library", "//pkg/util/io:go_default_library", "//pkg/util/mount:go_default_library", + "//pkg/util/wait:go_default_library", + "//pkg/util/workqueue:go_default_library", "//pkg/volume:go_default_library", "//pkg/watch:go_default_library", "//vendor:github.com/golang/glog", diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go index 649567af9b..6ea1bab045 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller.go +++ b/pkg/controller/volume/persistentvolume/pv_controller.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/util/goroutinemap" + "k8s.io/kubernetes/pkg/util/workqueue" vol "k8s.io/kubernetes/pkg/volume" "github.com/golang/glog" @@ -146,8 +147,10 @@ const createProvisionedPVInterval = 10 * time.Second // changes. type PersistentVolumeController struct { volumeController *cache.Controller + volumeInformer cache.Indexer volumeSource cache.ListerWatcher claimController *cache.Controller + claimInformer cache.Store claimSource cache.ListerWatcher classReflector *cache.Reflector classSource cache.ListerWatcher @@ -163,10 +166,34 @@ type PersistentVolumeController struct { // must be cloned before any modification. These caches get updated both by // "xxx added/updated/deleted" events from etcd and by the controller when // it saves newer version to etcd. + // Why local cache: binding a volume to a claim generates 4 events, roughly + // in this order (depends on goroutine ordering): + // - volume.Spec update + // - volume.Status update + // - claim.Spec update + // - claim.Status update + // With these caches, the controller can check that it has already saved + // volume.Status and claim.Spec+Status and does not need to do anything + // when e.g. volume.Spec update event arrives before all the other events. + // Without this cache, it would see the old version of volume.Status and + // claim in the informers (it has not been updated from API server events + // yet) and it would try to fix these objects to be bound together. + // Any write to API server would fail with version conflict - these objects + // have been already written. volumes persistentVolumeOrderedIndex claims cache.Store classes cache.Store + // Work queues of claims and volumes to process. Every queue should have + // exactly one worker thread, especially syncClaim() is not reentrant. + // Two syncClaims could bind two different claims to the same volume or one + // claim to two volumes. The controller would recover from this (due to + // version errors in API server and other checks in this controller), + // however overall speed of multi-worker controller would be lower than if + // it runs single thread only. + claimQueue *workqueue.Type + volumeQueue *workqueue.Type + // Map of scheduled/running operations. runningOperations goroutinemap.GoRoutineMap @@ -463,19 +490,11 @@ func (ctrl *PersistentVolumeController) syncVolume(volume *v1.PersistentVolume) // In both cases, the volume is Bound and the claim is Pending. // Next syncClaim will fix it. To speed it up, we enqueue the claim // into the controller, which results in syncClaim to be called - // shortly (and in the right goroutine). + // shortly (and in the right worker goroutine). // This speeds up binding of provisioned volumes - provisioner saves // only the new PV and it expects that next syncClaim will bind the // claim to it. - clone, err := api.Scheme.DeepCopy(claim) - if err != nil { - return fmt.Errorf("error cloning claim %q: %v", claimToClaimKey(claim), err) - } - glog.V(5).Infof("requeueing claim %q for faster syncClaim", claimToClaimKey(claim)) - err = ctrl.claimController.Requeue(clone) - if err != nil { - return fmt.Errorf("error enqueing claim %q for faster sync: %v", claimToClaimKey(claim), err) - } + ctrl.claimQueue.Add(claimToClaimKey(claim)) return nil } else if claim.Spec.VolumeName == volume.Name { // Volume is bound to a claim properly, update status if necessary diff --git a/pkg/controller/volume/persistentvolume/pv_controller_base.go b/pkg/controller/volume/persistentvolume/pv_controller_base.go index 1d41e4aad4..c056754050 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller_base.go +++ b/pkg/controller/volume/persistentvolume/pv_controller_base.go @@ -31,8 +31,11 @@ import ( unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/goroutinemap" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/util/workqueue" vol "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/watch" @@ -78,6 +81,8 @@ func NewController(p ControllerParameters) *PersistentVolumeController { createProvisionedPVRetryCount: createProvisionedPVRetryCount, createProvisionedPVInterval: createProvisionedPVInterval, alphaProvisioner: p.AlphaProvisioner, + claimQueue: workqueue.NewNamed("claims"), + volumeQueue: workqueue.NewNamed("volumes"), } controller.volumePluginMgr.InitPlugins(p.VolumePlugins, controller) @@ -126,25 +131,25 @@ func NewController(p ControllerParameters) *PersistentVolumeController { } controller.classSource = classSource - _, controller.volumeController = cache.NewIndexerInformer( + controller.volumeInformer, controller.volumeController = cache.NewIndexerInformer( volumeSource, &v1.PersistentVolume{}, p.SyncPeriod, cache.ResourceEventHandlerFuncs{ - AddFunc: controller.addVolume, - UpdateFunc: controller.updateVolume, - DeleteFunc: controller.deleteVolume, + AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) }, + UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) }, + DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) }, }, cache.Indexers{"accessmodes": accessModesIndexFunc}, ) - _, controller.claimController = cache.NewInformer( + controller.claimInformer, controller.claimController = cache.NewInformer( claimSource, &v1.PersistentVolumeClaim{}, p.SyncPeriod, cache.ResourceEventHandlerFuncs{ - AddFunc: controller.addClaim, - UpdateFunc: controller.updateClaim, - DeleteFunc: controller.deleteClaim, + AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) }, + UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) }, + DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) }, }, ) @@ -210,31 +215,40 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour glog.V(4).Infof("controller initialized") } -func (ctrl *PersistentVolumeController) storeVolumeUpdate(volume *v1.PersistentVolume) (bool, error) { +// enqueueWork adds volume or claim to given work queue. +func (ctrl *PersistentVolumeController) enqueueWork(queue workqueue.Interface, obj interface{}) { + // Beware of "xxx deleted" events + if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil { + obj = unknown.Obj + } + objName, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("failed to get key from object: %v", err) + return + } + glog.V(5).Infof("enqueued %q for sync", objName) + queue.Add(objName) +} + +func (ctrl *PersistentVolumeController) storeVolumeUpdate(volume interface{}) (bool, error) { return storeObjectUpdate(ctrl.volumes.store, volume, "volume") } -func (ctrl *PersistentVolumeController) storeClaimUpdate(claim *v1.PersistentVolumeClaim) (bool, error) { +func (ctrl *PersistentVolumeController) storeClaimUpdate(claim interface{}) (bool, error) { return storeObjectUpdate(ctrl.claims, claim, "claim") } -// addVolume is callback from cache.Controller watching PersistentVolume -// events. -func (ctrl *PersistentVolumeController) addVolume(obj interface{}) { - pv, ok := obj.(*v1.PersistentVolume) - if !ok { - glog.Errorf("expected PersistentVolume but handler received %#v", obj) - return - } - - if ctrl.upgradeVolumeFrom1_2(pv) { +// updateVolume runs in worker thread and handles "volume added", +// "volume updated" and "periodic sync" events. +func (ctrl *PersistentVolumeController) updateVolume(volume *v1.PersistentVolume) { + if deleted := ctrl.upgradeVolumeFrom1_2(volume); deleted { // volume deleted return } // Store the new volume version in the cache and do not process it if this // is an old version. - new, err := ctrl.storeVolumeUpdate(pv) + new, err := ctrl.storeVolumeUpdate(volume) if err != nil { glog.Errorf("%v", err) } @@ -242,111 +256,39 @@ func (ctrl *PersistentVolumeController) addVolume(obj interface{}) { return } - if err := ctrl.syncVolume(pv); err != nil { - if errors.IsConflict(err) { - // Version conflict error happens quite often and the controller - // recovers from it easily. - glog.V(3).Infof("PersistentVolumeController could not add volume %q: %+v", pv.Name, err) - } else { - glog.Errorf("PersistentVolumeController could not add volume %q: %+v", pv.Name, err) - } - } -} - -// updateVolume is callback from cache.Controller watching PersistentVolume -// events. -func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) { - newVolume, ok := newObj.(*v1.PersistentVolume) - if !ok { - glog.Errorf("Expected PersistentVolume but handler received %#v", newObj) - return - } - - if ctrl.upgradeVolumeFrom1_2(newVolume) { - // volume deleted - return - } - - // Store the new volume version in the cache and do not process it if this - // is an old version. - new, err := ctrl.storeVolumeUpdate(newVolume) + err = ctrl.syncVolume(volume) if err != nil { - glog.Errorf("%v", err) - } - if !new { - return - } - - if err := ctrl.syncVolume(newVolume); err != nil { if errors.IsConflict(err) { // Version conflict error happens quite often and the controller // recovers from it easily. - glog.V(3).Infof("PersistentVolumeController could not update volume %q: %+v", newVolume.Name, err) + glog.V(3).Infof("could not sync volume %q: %+v", volume.Name, err) } else { - glog.Errorf("PersistentVolumeController could not update volume %q: %+v", newVolume.Name, err) + glog.Errorf("could not sync volume %q: %+v", volume.Name, err) } } } -// deleteVolume is callback from cache.Controller watching PersistentVolume -// events. -func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) { - _ = ctrl.volumes.store.Delete(obj) - - var volume *v1.PersistentVolume - var ok bool - volume, ok = obj.(*v1.PersistentVolume) - if !ok { - if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil { - volume, ok = unknown.Obj.(*v1.PersistentVolume) - if !ok { - glog.Errorf("Expected PersistentVolume but deleteVolume received %#v", unknown.Obj) - return - } - } else { - glog.Errorf("Expected PersistentVolume but deleteVolume received %+v", obj) - return - } - } - - if volume == nil || volume.Spec.ClaimRef == nil { - return - } - +// deleteVolume runs in worker thread and handles "volume deleted" event. +func (ctrl *PersistentVolumeController) deleteVolume(volume *v1.PersistentVolume) { + _ = ctrl.volumes.store.Delete(volume) glog.V(4).Infof("volume %q deleted", volume.Name) - if claimObj, exists, _ := ctrl.claims.GetByKey(claimrefToClaimKey(volume.Spec.ClaimRef)); exists { - if claim, ok := claimObj.(*v1.PersistentVolumeClaim); ok && claim != nil { - // sync the claim when its volume is deleted. Explicitly syncing the - // claim here in response to volume deletion prevents the claim from - // waiting until the next sync period for its Lost status. - err := ctrl.syncClaim(claim) - if err != nil { - if errors.IsConflict(err) { - // Version conflict error happens quite often and the - // controller recovers from it easily. - glog.V(3).Infof("PersistentVolumeController could not update volume %q from deleteVolume handler: %+v", claimToClaimKey(claim), err) - } else { - glog.Errorf("PersistentVolumeController could not update volume %q from deleteVolume handler: %+v", claimToClaimKey(claim), err) - } - } - } else { - glog.Errorf("Cannot convert object from claim cache to claim %q!?: %#v", claimrefToClaimKey(volume.Spec.ClaimRef), claimObj) - } - } -} - -// addClaim is callback from cache.Controller watching PersistentVolumeClaim -// events. -func (ctrl *PersistentVolumeController) addClaim(obj interface{}) { - // Store the new claim version in the cache and do not process it if this is - // an old version. - claim, ok := obj.(*v1.PersistentVolumeClaim) - if !ok { - glog.Errorf("Expected PersistentVolumeClaim but addClaim received %+v", obj) + if volume.Spec.ClaimRef == nil { return } + // sync the claim when its volume is deleted. Explicitly syncing the + // claim here in response to volume deletion prevents the claim from + // waiting until the next sync period for its Lost status. + claimKey := claimrefToClaimKey(volume.Spec.ClaimRef) + glog.V(5).Infof("deleteVolume[%s]: scheduling sync of claim %q", volume.Name, claimKey) + ctrl.claimQueue.Add(claimKey) +} +// updateClaim runs in worker thread and handles "claim added", +// "claim updated" and "periodic sync" events. +func (ctrl *PersistentVolumeController) updateClaim(claim *v1.PersistentVolumeClaim) { + // Store the new claim version in the cache and do not process it if this is + // an old version. new, err := ctrl.storeClaimUpdate(claim) if err != nil { glog.Errorf("%v", err) @@ -354,106 +296,162 @@ func (ctrl *PersistentVolumeController) addClaim(obj interface{}) { if !new { return } - - if err := ctrl.syncClaim(claim); err != nil { - if errors.IsConflict(err) { - // Version conflict error happens quite often and the controller - // recovers from it easily. - glog.V(3).Infof("PersistentVolumeController could not add claim %q: %+v", claimToClaimKey(claim), err) - } else { - glog.Errorf("PersistentVolumeController could not add claim %q: %+v", claimToClaimKey(claim), err) - } - } -} - -// updateClaim is callback from cache.Controller watching PersistentVolumeClaim -// events. -func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{}) { - // Store the new claim version in the cache and do not process it if this is - // an old version. - newClaim, ok := newObj.(*v1.PersistentVolumeClaim) - if !ok { - glog.Errorf("Expected PersistentVolumeClaim but updateClaim received %+v", newObj) - return - } - - new, err := ctrl.storeClaimUpdate(newClaim) + err = ctrl.syncClaim(claim) if err != nil { - glog.Errorf("%v", err) - } - if !new { - return - } - - if err := ctrl.syncClaim(newClaim); err != nil { if errors.IsConflict(err) { // Version conflict error happens quite often and the controller // recovers from it easily. - glog.V(3).Infof("PersistentVolumeController could not update claim %q: %+v", claimToClaimKey(newClaim), err) + glog.V(3).Infof("could not sync claim %q: %+v", claimToClaimKey(claim), err) } else { - glog.Errorf("PersistentVolumeController could not update claim %q: %+v", claimToClaimKey(newClaim), err) + glog.Errorf("could not sync volume %q: %+v", claimToClaimKey(claim), err) } } } -// deleteClaim is callback from cache.Controller watching PersistentVolumeClaim -// events. -func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) { - _ = ctrl.claims.Delete(obj) - - var volume *v1.PersistentVolume - var claim *v1.PersistentVolumeClaim - var ok bool - - claim, ok = obj.(*v1.PersistentVolumeClaim) - if !ok { - if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil { - claim, ok = unknown.Obj.(*v1.PersistentVolumeClaim) - if !ok { - glog.Errorf("Expected PersistentVolumeClaim but deleteClaim received %#v", unknown.Obj) - return - } - } else { - glog.Errorf("Expected PersistentVolumeClaim but deleteClaim received %#v", obj) - return - } - } - - if claim == nil { - return - } +// deleteClaim runs in worker thread and handles "claim deleted" event. +func (ctrl *PersistentVolumeController) deleteClaim(claim *v1.PersistentVolumeClaim) { + _ = ctrl.claims.Delete(claim) glog.V(4).Infof("claim %q deleted", claimToClaimKey(claim)) - if pvObj, exists, _ := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName); exists { - if volume, ok = pvObj.(*v1.PersistentVolume); ok { - // sync the volume when its claim is deleted. Explicitly sync'ing the - // volume here in response to claim deletion prevents the volume from - // waiting until the next sync period for its Release. - if volume != nil { - err := ctrl.syncVolume(volume) - if err != nil { - if errors.IsConflict(err) { - // Version conflict error happens quite often and the - // controller recovers from it easily. - glog.V(3).Infof("PersistentVolumeController could not update volume %q from deleteClaim handler: %+v", volume.Name, err) - } else { - glog.Errorf("PersistentVolumeController could not update volume %q from deleteClaim handler: %+v", volume.Name, err) - } - } - } - } else { - glog.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, pvObj) - } - } + // sync the volume when its claim is deleted. Explicitly sync'ing the + // volume here in response to claim deletion prevents the volume from + // waiting until the next sync period for its Release. + volumeName := claim.Spec.VolumeName + glog.V(5).Infof("deleteClaim[%s]: scheduling sync of volume %q", claimToClaimKey(claim), volumeName) + ctrl.volumeQueue.Add(volumeName) } // Run starts all of this controller's control loops func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) { - glog.V(4).Infof("starting PersistentVolumeController") + glog.V(1).Infof("starting PersistentVolumeController") ctrl.initializeCaches(ctrl.volumeSource, ctrl.claimSource) go ctrl.volumeController.Run(stopCh) go ctrl.claimController.Run(stopCh) go ctrl.classReflector.RunUntil(stopCh) + go wait.Until(ctrl.volumeWorker, time.Second, stopCh) + go wait.Until(ctrl.claimWorker, time.Second, stopCh) + + <-stopCh + + ctrl.claimQueue.ShutDown() + ctrl.volumeQueue.ShutDown() +} + +// volumeWorker processes items from volumeQueue. It must run only once, +// syncVolume is not assured to be reentrant. +func (ctrl *PersistentVolumeController) volumeWorker() { + workFunc := func() bool { + keyObj, quit := ctrl.volumeQueue.Get() + if quit { + return true + } + defer ctrl.volumeQueue.Done(keyObj) + key := keyObj.(string) + glog.V(5).Infof("volumeWorker[%s]", key) + + volumeObj, found, err := ctrl.volumeInformer.GetByKey(key) + if err != nil { + glog.V(2).Infof("error getting volume %q from informer: %v", key, err) + return false + } + + if found { + // The volume still exists in informer cache, the event must have + // been add/update/sync + volume, ok := volumeObj.(*v1.PersistentVolume) + if !ok { + glog.Errorf("expected volume, got %+v", volumeObj) + return false + } + ctrl.updateVolume(volume) + return false + } + + // The volume is not in informer cache, the event must have been + // "delete" + volumeObj, found, err = ctrl.volumes.store.GetByKey(key) + if err != nil { + glog.V(2).Infof("error getting volume %q from cache: %v", key, err) + return false + } + if !found { + // The controller has already processed the delete event and + // deleted the volume from its cache + glog.V(2).Infof("deletion of volume %q was already processed", key) + return false + } + volume, ok := volumeObj.(*v1.PersistentVolume) + if !ok { + glog.Errorf("expected volume, got %+v", volumeObj) + return false + } + ctrl.deleteVolume(volume) + return false + } + for { + if quit := workFunc(); quit { + glog.Infof("volume worker queue shutting down") + return + } + } +} + +// claimWorker processes items from claimQueue. It must run only once, +// syncClaim is not reentrant. +func (ctrl *PersistentVolumeController) claimWorker() { + workFunc := func() bool { + keyObj, quit := ctrl.claimQueue.Get() + if quit { + return true + } + defer ctrl.claimQueue.Done(keyObj) + key := keyObj.(string) + glog.V(5).Infof("claimWorker[%s]", key) + + claimObj, found, err := ctrl.claimInformer.GetByKey(key) + if err != nil { + glog.V(2).Infof("error getting claim %q from informer: %v", key, err) + return false + } + + if found { + // The claim still exists in informer cache, the event must have + // been add/update/sync + claim, ok := claimObj.(*v1.PersistentVolumeClaim) + if !ok { + glog.Errorf("expected claim, got %+v", claimObj) + return false + } + ctrl.updateClaim(claim) + return false + } + + // The claim is not in informer cache, the event must have been "delete" + claimObj, found, err = ctrl.claims.GetByKey(key) + if err != nil { + glog.V(2).Infof("error getting claim %q from cache: %v", key, err) + return false + } + if !found { + // The controller has already processed the delete event and + // deleted the claim from its cache + glog.V(2).Infof("deletion of claim %q was already processed", key) + return false + } + claim, ok := claimObj.(*v1.PersistentVolumeClaim) + if !ok { + glog.Errorf("expected claim, got %+v", claimObj) + return false + } + ctrl.deleteClaim(claim) + return false + } + for { + if quit := workFunc(); quit { + glog.Infof("claim worker queue shutting down") + return + } + } } const ( @@ -563,17 +561,20 @@ func isVolumeBoundToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolum // controller itself. Returns "true", if the cache was updated, false if the // object is an old version and should be ignored. func storeObjectUpdate(store cache.Store, obj interface{}, className string) (bool, error) { - objAccessor, err := meta.Accessor(obj) + objName, err := controller.KeyFunc(obj) if err != nil { - return false, fmt.Errorf("Error reading cache of %s: %v", className, err) + return false, fmt.Errorf("Couldn't get key for object %+v: %v", obj, err) } - objName := objAccessor.GetNamespace() + "/" + objAccessor.GetName() - oldObj, found, err := store.Get(obj) if err != nil { return false, fmt.Errorf("Error finding %s %q in controller cache: %v", className, objName, err) } + objAccessor, err := meta.Accessor(obj) + if err != nil { + return false, err + } + if !found { // This is a new object glog.V(4).Infof("storeObjectUpdate: adding %s %q, version %s", className, objName, objAccessor.GetResourceVersion()) diff --git a/pkg/controller/volume/persistentvolume/pv_controller_test.go b/pkg/controller/volume/persistentvolume/pv_controller_test.go index 59867e756d..ffc9aa0074 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller_test.go +++ b/pkg/controller/volume/persistentvolume/pv_controller_test.go @@ -176,7 +176,7 @@ func TestControllerSync(t *testing.T) { // Start the controller stopCh := make(chan struct{}) - ctrl.Run(stopCh) + go ctrl.Run(stopCh) // Wait for the controller to pass initial sync and fill its caches. for !ctrl.volumeController.HasSynced() || diff --git a/test/integration/volume/persistent_volumes_test.go b/test/integration/volume/persistent_volumes_test.go index 351b13f219..0f1df45748 100644 --- a/test/integration/volume/persistent_volumes_test.go +++ b/test/integration/volume/persistent_volumes_test.go @@ -120,7 +120,7 @@ func TestPersistentVolumeRecycler(t *testing.T) { defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{}) stopCh := make(chan struct{}) - ctrl.Run(stopCh) + go ctrl.Run(stopCh) defer close(stopCh) // This PV will be claimed, released, and recycled. @@ -174,7 +174,7 @@ func TestPersistentVolumeDeleter(t *testing.T) { defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{}) stopCh := make(chan struct{}) - ctrl.Run(stopCh) + go ctrl.Run(stopCh) defer close(stopCh) // This PV will be claimed, released, and deleted. @@ -233,7 +233,7 @@ func TestPersistentVolumeBindRace(t *testing.T) { defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{}) stopCh := make(chan struct{}) - ctrl.Run(stopCh) + go ctrl.Run(stopCh) defer close(stopCh) pv := createPV("fake-pv-race", "/tmp/foo", "10G", []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, v1.PersistentVolumeReclaimRetain) @@ -304,7 +304,7 @@ func TestPersistentVolumeClaimLabelSelector(t *testing.T) { defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{}) stopCh := make(chan struct{}) - controller.Run(stopCh) + go controller.Run(stopCh) defer close(stopCh) var ( @@ -384,7 +384,7 @@ func TestPersistentVolumeClaimLabelSelectorMatchExpressions(t *testing.T) { defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{}) stopCh := make(chan struct{}) - controller.Run(stopCh) + go controller.Run(stopCh) defer close(stopCh) var ( @@ -483,7 +483,7 @@ func TestPersistentVolumeMultiPVs(t *testing.T) { defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{}) stopCh := make(chan struct{}) - controller.Run(stopCh) + go controller.Run(stopCh) defer close(stopCh) maxPVs := getObjectCount() @@ -572,7 +572,7 @@ func TestPersistentVolumeMultiPVsPVCs(t *testing.T) { defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{}) controllerStopCh := make(chan struct{}) - binder.Run(controllerStopCh) + go binder.Run(controllerStopCh) defer close(controllerStopCh) objCount := getObjectCount() @@ -785,7 +785,7 @@ func TestPersistentVolumeControllerStartup(t *testing.T) { // Start the controller when all PVs and PVCs are already saved in etcd stopCh := make(chan struct{}) - binder.Run(stopCh) + go binder.Run(stopCh) defer close(stopCh) // wait for at least two sync periods for changes. No volume should be @@ -872,7 +872,7 @@ func TestPersistentVolumeProvisionMultiPVCs(t *testing.T) { testClient.Storage().StorageClasses().Create(&storageClass) stopCh := make(chan struct{}) - binder.Run(stopCh) + go binder.Run(stopCh) defer close(stopCh) objCount := getObjectCount() @@ -957,7 +957,7 @@ func TestPersistentVolumeMultiPVsDiffAccessModes(t *testing.T) { defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{}) stopCh := make(chan struct{}) - controller.Run(stopCh) + go controller.Run(stopCh) defer close(stopCh) // This PV will be claimed, released, and deleted