diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index f280b232ca..9a617e6db9 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -405,7 +405,7 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
 		EnableDynamicProvisioning: s.VolumeConfiguration.EnableDynamicProvisioning,
 	}
 	volumeController := persistentvolumecontroller.NewController(params)
-	volumeController.Run(stop)
+	go volumeController.Run(stop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	attachDetachController, attachDetachControllerErr :=
diff --git a/pkg/controller/volume/persistentvolume/BUILD b/pkg/controller/volume/persistentvolume/BUILD
index b2436b31b1..58887b75c4 100644
--- a/pkg/controller/volume/persistentvolume/BUILD
+++ b/pkg/controller/volume/persistentvolume/BUILD
@@ -30,12 +30,15 @@ go_library(
         "//pkg/client/clientset_generated/clientset/typed/core/v1:go_default_library",
         "//pkg/client/record:go_default_library",
         "//pkg/cloudprovider:go_default_library",
+        "//pkg/controller:go_default_library",
         "//pkg/labels:go_default_library",
         "//pkg/runtime:go_default_library",
         "//pkg/types:go_default_library",
         "//pkg/util/goroutinemap:go_default_library",
         "//pkg/util/io:go_default_library",
         "//pkg/util/mount:go_default_library",
+        "//pkg/util/wait:go_default_library",
+        "//pkg/util/workqueue:go_default_library",
         "//pkg/volume:go_default_library",
         "//pkg/watch:go_default_library",
         "//vendor:github.com/golang/glog",
diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go
index 649567af9b..6ea1bab045 100644
--- a/pkg/controller/volume/persistentvolume/pv_controller.go
+++ b/pkg/controller/volume/persistentvolume/pv_controller.go
@@ -32,6 +32,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	"k8s.io/kubernetes/pkg/util/goroutinemap"
+	"k8s.io/kubernetes/pkg/util/workqueue"
 	vol "k8s.io/kubernetes/pkg/volume"
 
 	"github.com/golang/glog"
@@ -146,8 +147,10 @@ const createProvisionedPVInterval = 10 * time.Second
 // changes.
 type PersistentVolumeController struct {
 	volumeController *cache.Controller
+	volumeInformer   cache.Indexer
 	volumeSource     cache.ListerWatcher
 	claimController  *cache.Controller
+	claimInformer    cache.Store
 	claimSource      cache.ListerWatcher
 	classReflector   *cache.Reflector
 	classSource      cache.ListerWatcher
@@ -163,10 +166,34 @@ type PersistentVolumeController struct {
 	// must be cloned before any modification. These caches get updated both by
 	// "xxx added/updated/deleted" events from etcd and by the controller when
 	// it saves newer version to etcd.
+	// Why local cache: binding a volume to a claim generates 4 events, roughly
+	// in this order (depends on goroutine ordering):
+	// - volume.Spec update
+	// - volume.Status update
+	// - claim.Spec update
+	// - claim.Status update
+	// With these caches, the controller can check that it has already saved
+	// volume.Status and claim.Spec+Status and does not need to do anything
+	// when e.g. a volume.Spec update event arrives before the other events.
+	// Without this cache, it would see the old version of volume.Status and
+	// claim in the informers (they have not been updated from API server
+	// events yet) and it would try to fix these objects to be bound together.
+	// Any write to the API server would fail with a version conflict - these
+	// objects have already been written.
 	volumes persistentVolumeOrderedIndex
 	claims  cache.Store
 	classes cache.Store
 
+	// Work queues of claims and volumes to process. Every queue should have
+	// exactly one worker thread; in particular, syncClaim() is not reentrant.
+	// Two syncClaims could bind two different claims to the same volume or one
+	// claim to two volumes. The controller would recover from this (due to
+	// version errors in the API server and other checks in this controller),
+	// however the overall speed of a multi-worker controller would be lower
+	// than that of a single-threaded one.
+	claimQueue  *workqueue.Type
+	volumeQueue *workqueue.Type
+
 	// Map of scheduled/running operations.
 	runningOperations goroutinemap.GoRoutineMap
@@ -463,19 +490,11 @@ func (ctrl *PersistentVolumeController) syncVolume(volume *v1.PersistentVolume)
 			// In both cases, the volume is Bound and the claim is Pending.
 			// Next syncClaim will fix it. To speed it up, we enqueue the claim
 			// into the controller, which results in syncClaim to be called
-			// shortly (and in the right goroutine).
+			// shortly (and in the right worker goroutine).
 			// This speeds up binding of provisioned volumes - provisioner saves
 			// only the new PV and it expects that next syncClaim will bind the
 			// claim to it.
-			clone, err := api.Scheme.DeepCopy(claim)
-			if err != nil {
-				return fmt.Errorf("error cloning claim %q: %v", claimToClaimKey(claim), err)
-			}
-			glog.V(5).Infof("requeueing claim %q for faster syncClaim", claimToClaimKey(claim))
-			err = ctrl.claimController.Requeue(clone)
-			if err != nil {
-				return fmt.Errorf("error enqueing claim %q for faster sync: %v", claimToClaimKey(claim), err)
-			}
+			ctrl.claimQueue.Add(claimToClaimKey(claim))
 			return nil
 		} else if claim.Spec.VolumeName == volume.Name {
 			// Volume is bound to a claim properly, update status if necessary
diff --git a/pkg/controller/volume/persistentvolume/pv_controller_base.go b/pkg/controller/volume/persistentvolume/pv_controller_base.go
index 1d41e4aad4..c056754050 100644
--- a/pkg/controller/volume/persistentvolume/pv_controller_base.go
+++ b/pkg/controller/volume/persistentvolume/pv_controller_base.go
@@ -31,8 +31,11 @@ import (
 	unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
 	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/cloudprovider"
+	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util/goroutinemap"
+	"k8s.io/kubernetes/pkg/util/wait"
+	"k8s.io/kubernetes/pkg/util/workqueue"
 	vol "k8s.io/kubernetes/pkg/volume"
 	"k8s.io/kubernetes/pkg/watch"
 
@@ -78,6 +81,8 @@ func NewController(p ControllerParameters) *PersistentVolumeController {
 		createProvisionedPVRetryCount: createProvisionedPVRetryCount,
 		createProvisionedPVInterval:   createProvisionedPVInterval,
 		alphaProvisioner:              p.AlphaProvisioner,
+		claimQueue:                    workqueue.NewNamed("claims"),
+		volumeQueue:                   workqueue.NewNamed("volumes"),
 	}
 
 	controller.volumePluginMgr.InitPlugins(p.VolumePlugins, controller)
@@ -126,25 +131,25 @@ func NewController(p ControllerParameters) *PersistentVolumeController {
 	}
 	controller.classSource = classSource
 
-	_, controller.volumeController = cache.NewIndexerInformer(
+	controller.volumeInformer, controller.volumeController = cache.NewIndexerInformer(
 		volumeSource,
 		&v1.PersistentVolume{},
 		p.SyncPeriod,
 		cache.ResourceEventHandlerFuncs{
-			AddFunc:    controller.addVolume,
-			UpdateFunc: controller.updateVolume,
-			DeleteFunc: controller.deleteVolume,
+			AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
+			UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
+			DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
 		},
 		cache.Indexers{"accessmodes": accessModesIndexFunc},
 	)
 
-	_, controller.claimController = cache.NewInformer(
+	controller.claimInformer, controller.claimController = cache.NewInformer(
 		claimSource,
 		&v1.PersistentVolumeClaim{},
 		p.SyncPeriod,
 		cache.ResourceEventHandlerFuncs{
-			AddFunc:    controller.addClaim,
-			UpdateFunc: controller.updateClaim,
-			DeleteFunc: controller.deleteClaim,
+			AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
+			UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
+			DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
 		},
 	)
 
@@ -210,31 +215,40 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour
 	glog.V(4).Infof("controller initialized")
 }
 
-func (ctrl *PersistentVolumeController) storeVolumeUpdate(volume *v1.PersistentVolume) (bool, error) {
+// enqueueWork adds a volume or claim to the given work queue.
+func (ctrl *PersistentVolumeController) enqueueWork(queue workqueue.Interface, obj interface{}) {
+	// Beware of "xxx deleted" events
+	if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
+		obj = unknown.Obj
+	}
+	objName, err := controller.KeyFunc(obj)
+	if err != nil {
+		glog.Errorf("failed to get key from object: %v", err)
+		return
+	}
+	glog.V(5).Infof("enqueued %q for sync", objName)
+	queue.Add(objName)
+}
+
+func (ctrl *PersistentVolumeController) storeVolumeUpdate(volume interface{}) (bool, error) {
 	return storeObjectUpdate(ctrl.volumes.store, volume, "volume")
 }
 
-func (ctrl *PersistentVolumeController) storeClaimUpdate(claim *v1.PersistentVolumeClaim) (bool, error) {
+func (ctrl *PersistentVolumeController) storeClaimUpdate(claim interface{}) (bool, error) {
 	return storeObjectUpdate(ctrl.claims, claim, "claim")
 }
 
-// addVolume is callback from cache.Controller watching PersistentVolume
-// events.
-func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
-	pv, ok := obj.(*v1.PersistentVolume)
-	if !ok {
-		glog.Errorf("expected PersistentVolume but handler received %#v", obj)
-		return
-	}
-
-	if ctrl.upgradeVolumeFrom1_2(pv) {
+// updateVolume runs in worker thread and handles "volume added",
+// "volume updated" and "periodic sync" events.
+func (ctrl *PersistentVolumeController) updateVolume(volume *v1.PersistentVolume) {
+	if deleted := ctrl.upgradeVolumeFrom1_2(volume); deleted {
 		// volume deleted
 		return
 	}
 
 	// Store the new volume version in the cache and do not process it if this
 	// is an old version.
-	new, err := ctrl.storeVolumeUpdate(pv)
+	new, err := ctrl.storeVolumeUpdate(volume)
 	if err != nil {
 		glog.Errorf("%v", err)
 	}
@@ -242,111 +256,39 @@
 		return
 	}
 
-	if err := ctrl.syncVolume(pv); err != nil {
-		if errors.IsConflict(err) {
-			// Version conflict error happens quite often and the controller
-			// recovers from it easily.
- glog.V(3).Infof("PersistentVolumeController could not add volume %q: %+v", pv.Name, err) - } else { - glog.Errorf("PersistentVolumeController could not add volume %q: %+v", pv.Name, err) - } - } -} - -// updateVolume is callback from cache.Controller watching PersistentVolume -// events. -func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) { - newVolume, ok := newObj.(*v1.PersistentVolume) - if !ok { - glog.Errorf("Expected PersistentVolume but handler received %#v", newObj) - return - } - - if ctrl.upgradeVolumeFrom1_2(newVolume) { - // volume deleted - return - } - - // Store the new volume version in the cache and do not process it if this - // is an old version. - new, err := ctrl.storeVolumeUpdate(newVolume) + err = ctrl.syncVolume(volume) if err != nil { - glog.Errorf("%v", err) - } - if !new { - return - } - - if err := ctrl.syncVolume(newVolume); err != nil { if errors.IsConflict(err) { // Version conflict error happens quite often and the controller // recovers from it easily. - glog.V(3).Infof("PersistentVolumeController could not update volume %q: %+v", newVolume.Name, err) + glog.V(3).Infof("could not sync volume %q: %+v", volume.Name, err) } else { - glog.Errorf("PersistentVolumeController could not update volume %q: %+v", newVolume.Name, err) + glog.Errorf("could not sync volume %q: %+v", volume.Name, err) } } } -// deleteVolume is callback from cache.Controller watching PersistentVolume -// events. -func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) { - _ = ctrl.volumes.store.Delete(obj) - - var volume *v1.PersistentVolume - var ok bool - volume, ok = obj.(*v1.PersistentVolume) - if !ok { - if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil { - volume, ok = unknown.Obj.(*v1.PersistentVolume) - if !ok { - glog.Errorf("Expected PersistentVolume but deleteVolume received %#v", unknown.Obj) - return - } - } else { - glog.Errorf("Expected PersistentVolume but deleteVolume received %+v", obj) - return - } - } - - if volume == nil || volume.Spec.ClaimRef == nil { - return - } - +// deleteVolume runs in worker thread and handles "volume deleted" event. +func (ctrl *PersistentVolumeController) deleteVolume(volume *v1.PersistentVolume) { + _ = ctrl.volumes.store.Delete(volume) glog.V(4).Infof("volume %q deleted", volume.Name) - if claimObj, exists, _ := ctrl.claims.GetByKey(claimrefToClaimKey(volume.Spec.ClaimRef)); exists { - if claim, ok := claimObj.(*v1.PersistentVolumeClaim); ok && claim != nil { - // sync the claim when its volume is deleted. Explicitly syncing the - // claim here in response to volume deletion prevents the claim from - // waiting until the next sync period for its Lost status. - err := ctrl.syncClaim(claim) - if err != nil { - if errors.IsConflict(err) { - // Version conflict error happens quite often and the - // controller recovers from it easily. - glog.V(3).Infof("PersistentVolumeController could not update volume %q from deleteVolume handler: %+v", claimToClaimKey(claim), err) - } else { - glog.Errorf("PersistentVolumeController could not update volume %q from deleteVolume handler: %+v", claimToClaimKey(claim), err) - } - } - } else { - glog.Errorf("Cannot convert object from claim cache to claim %q!?: %#v", claimrefToClaimKey(volume.Spec.ClaimRef), claimObj) - } - } -} - -// addClaim is callback from cache.Controller watching PersistentVolumeClaim -// events. 
-func (ctrl *PersistentVolumeController) addClaim(obj interface{}) {
-	// Store the new claim version in the cache and do not process it if this is
-	// an old version.
-	claim, ok := obj.(*v1.PersistentVolumeClaim)
-	if !ok {
-		glog.Errorf("Expected PersistentVolumeClaim but addClaim received %+v", obj)
+	if volume.Spec.ClaimRef == nil {
 		return
 	}
+	// sync the claim when its volume is deleted. Explicitly syncing the
+	// claim here in response to volume deletion prevents the claim from
+	// waiting until the next sync period for its Lost status.
+	claimKey := claimrefToClaimKey(volume.Spec.ClaimRef)
+	glog.V(5).Infof("deleteVolume[%s]: scheduling sync of claim %q", volume.Name, claimKey)
+	ctrl.claimQueue.Add(claimKey)
+}
 
+// updateClaim runs in worker thread and handles "claim added",
+// "claim updated" and "periodic sync" events.
+func (ctrl *PersistentVolumeController) updateClaim(claim *v1.PersistentVolumeClaim) {
+	// Store the new claim version in the cache and do not process it if this is
+	// an old version.
 	new, err := ctrl.storeClaimUpdate(claim)
 	if err != nil {
 		glog.Errorf("%v", err)
@@ -354,106 +296,162 @@ func (ctrl *PersistentVolumeController) addClaim(obj interface{}) {
 	if !new {
 		return
 	}
-
-	if err := ctrl.syncClaim(claim); err != nil {
-		if errors.IsConflict(err) {
-			// Version conflict error happens quite often and the controller
-			// recovers from it easily.
-			glog.V(3).Infof("PersistentVolumeController could not add claim %q: %+v", claimToClaimKey(claim), err)
-		} else {
-			glog.Errorf("PersistentVolumeController could not add claim %q: %+v", claimToClaimKey(claim), err)
-		}
-	}
-}
-
-// updateClaim is callback from cache.Controller watching PersistentVolumeClaim
-// events.
-func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{}) {
-	// Store the new claim version in the cache and do not process it if this is
-	// an old version.
-	newClaim, ok := newObj.(*v1.PersistentVolumeClaim)
-	if !ok {
-		glog.Errorf("Expected PersistentVolumeClaim but updateClaim received %+v", newObj)
-		return
-	}
-
-	new, err := ctrl.storeClaimUpdate(newClaim)
+	err = ctrl.syncClaim(claim)
 	if err != nil {
-		glog.Errorf("%v", err)
-	}
-	if !new {
-		return
-	}
-
-	if err := ctrl.syncClaim(newClaim); err != nil {
 		if errors.IsConflict(err) {
 			// Version conflict error happens quite often and the controller
 			// recovers from it easily.
-			glog.V(3).Infof("PersistentVolumeController could not update claim %q: %+v", claimToClaimKey(newClaim), err)
+			glog.V(3).Infof("could not sync claim %q: %+v", claimToClaimKey(claim), err)
 		} else {
-			glog.Errorf("PersistentVolumeController could not update claim %q: %+v", claimToClaimKey(newClaim), err)
+			glog.Errorf("could not sync claim %q: %+v", claimToClaimKey(claim), err)
 		}
 	}
 }
 
-// deleteClaim is callback from cache.Controller watching PersistentVolumeClaim
-// events.
-func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
-	_ = ctrl.claims.Delete(obj)
-
-	var volume *v1.PersistentVolume
-	var claim *v1.PersistentVolumeClaim
-	var ok bool
-
-	claim, ok = obj.(*v1.PersistentVolumeClaim)
-	if !ok {
-		if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
-			claim, ok = unknown.Obj.(*v1.PersistentVolumeClaim)
-			if !ok {
-				glog.Errorf("Expected PersistentVolumeClaim but deleteClaim received %#v", unknown.Obj)
-				return
-			}
-		} else {
-			glog.Errorf("Expected PersistentVolumeClaim but deleteClaim received %#v", obj)
-			return
-		}
-	}
-
-	if claim == nil {
-		return
-	}
+// deleteClaim runs in worker thread and handles "claim deleted" event.
+func (ctrl *PersistentVolumeController) deleteClaim(claim *v1.PersistentVolumeClaim) {
+	_ = ctrl.claims.Delete(claim)
 	glog.V(4).Infof("claim %q deleted", claimToClaimKey(claim))
-	if pvObj, exists, _ := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName); exists {
-		if volume, ok = pvObj.(*v1.PersistentVolume); ok {
-			// sync the volume when its claim is deleted. Explicitly sync'ing the
-			// volume here in response to claim deletion prevents the volume from
-			// waiting until the next sync period for its Release.
-			if volume != nil {
-				err := ctrl.syncVolume(volume)
-				if err != nil {
-					if errors.IsConflict(err) {
-						// Version conflict error happens quite often and the
-						// controller recovers from it easily.
-						glog.V(3).Infof("PersistentVolumeController could not update volume %q from deleteClaim handler: %+v", volume.Name, err)
-					} else {
-						glog.Errorf("PersistentVolumeController could not update volume %q from deleteClaim handler: %+v", volume.Name, err)
-					}
-				}
-			}
-		} else {
-			glog.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, pvObj)
-		}
-	}
+	// sync the volume when its claim is deleted. Explicitly syncing the
+	// volume here in response to claim deletion prevents the volume from
+	// waiting until the next sync period for its Release.
+	volumeName := claim.Spec.VolumeName
+	glog.V(5).Infof("deleteClaim[%s]: scheduling sync of volume %q", claimToClaimKey(claim), volumeName)
+	ctrl.volumeQueue.Add(volumeName)
 }
 
 // Run starts all of this controller's control loops
 func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
-	glog.V(4).Infof("starting PersistentVolumeController")
+	glog.V(1).Infof("starting PersistentVolumeController")
 	ctrl.initializeCaches(ctrl.volumeSource, ctrl.claimSource)
 	go ctrl.volumeController.Run(stopCh)
 	go ctrl.claimController.Run(stopCh)
 	go ctrl.classReflector.RunUntil(stopCh)
+	go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
+	go wait.Until(ctrl.claimWorker, time.Second, stopCh)
+
+	<-stopCh
+
+	ctrl.claimQueue.ShutDown()
+	ctrl.volumeQueue.ShutDown()
+}
+
+// volumeWorker processes items from volumeQueue. It must run as a single
+// goroutine, because syncVolume is not reentrant.
+func (ctrl *PersistentVolumeController) volumeWorker() {
+	workFunc := func() bool {
+		keyObj, quit := ctrl.volumeQueue.Get()
+		if quit {
+			return true
+		}
+		defer ctrl.volumeQueue.Done(keyObj)
+		key := keyObj.(string)
+		glog.V(5).Infof("volumeWorker[%s]", key)
+
+		volumeObj, found, err := ctrl.volumeInformer.GetByKey(key)
+		if err != nil {
+			glog.V(2).Infof("error getting volume %q from informer: %v", key, err)
+			return false
+		}
+
+		if found {
+			// The volume still exists in informer cache, the event must have
+			// been add/update/sync
+			volume, ok := volumeObj.(*v1.PersistentVolume)
+			if !ok {
+				glog.Errorf("expected volume, got %+v", volumeObj)
+				return false
+			}
+			ctrl.updateVolume(volume)
+			return false
+		}
+
+		// The volume is not in informer cache, the event must have been
+		// "delete"
+		volumeObj, found, err = ctrl.volumes.store.GetByKey(key)
+		if err != nil {
+			glog.V(2).Infof("error getting volume %q from cache: %v", key, err)
+			return false
+		}
+		if !found {
+			// The controller has already processed the delete event and
+			// deleted the volume from its cache
+			glog.V(2).Infof("deletion of volume %q was already processed", key)
+			return false
+		}
+		volume, ok := volumeObj.(*v1.PersistentVolume)
+		if !ok {
+			glog.Errorf("expected volume, got %+v", volumeObj)
+			return false
+		}
+		ctrl.deleteVolume(volume)
+		return false
+	}
+	for {
+		if quit := workFunc(); quit {
+			glog.Infof("volume worker queue shutting down")
+			return
+		}
+	}
+}
+
+// claimWorker processes items from claimQueue. It must run as a single
+// goroutine, because syncClaim is not reentrant.
+func (ctrl *PersistentVolumeController) claimWorker() {
+	workFunc := func() bool {
+		keyObj, quit := ctrl.claimQueue.Get()
+		if quit {
+			return true
+		}
+		defer ctrl.claimQueue.Done(keyObj)
+		key := keyObj.(string)
+		glog.V(5).Infof("claimWorker[%s]", key)
+
+		claimObj, found, err := ctrl.claimInformer.GetByKey(key)
+		if err != nil {
+			glog.V(2).Infof("error getting claim %q from informer: %v", key, err)
+			return false
+		}
+
+		if found {
+			// The claim still exists in informer cache, the event must have
+			// been add/update/sync
+			claim, ok := claimObj.(*v1.PersistentVolumeClaim)
+			if !ok {
+				glog.Errorf("expected claim, got %+v", claimObj)
+				return false
+			}
+			ctrl.updateClaim(claim)
+			return false
+		}
+
+		// The claim is not in informer cache, the event must have been "delete"
+		claimObj, found, err = ctrl.claims.GetByKey(key)
+		if err != nil {
+			glog.V(2).Infof("error getting claim %q from cache: %v", key, err)
+			return false
+		}
+		if !found {
+			// The controller has already processed the delete event and
+			// deleted the claim from its cache
+			glog.V(2).Infof("deletion of claim %q was already processed", key)
+			return false
+		}
+		claim, ok := claimObj.(*v1.PersistentVolumeClaim)
+		if !ok {
+			glog.Errorf("expected claim, got %+v", claimObj)
+			return false
+		}
+		ctrl.deleteClaim(claim)
+		return false
+	}
+	for {
+		if quit := workFunc(); quit {
+			glog.Infof("claim worker queue shutting down")
+			return
+		}
+	}
 }
 
 const (
@@ -563,17 +561,20 @@ func isVolumeBoundToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolum
 // controller itself. Returns "true", if the cache was updated, false if the
 // object is an old version and should be ignored.
 func storeObjectUpdate(store cache.Store, obj interface{}, className string) (bool, error) {
-	objAccessor, err := meta.Accessor(obj)
+	objName, err := controller.KeyFunc(obj)
 	if err != nil {
-		return false, fmt.Errorf("Error reading cache of %s: %v", className, err)
+		return false, fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)
 	}
-	objName := objAccessor.GetNamespace() + "/" + objAccessor.GetName()
-
 	oldObj, found, err := store.Get(obj)
 	if err != nil {
 		return false, fmt.Errorf("Error finding %s %q in controller cache: %v", className, objName, err)
 	}
 
+	objAccessor, err := meta.Accessor(obj)
+	if err != nil {
+		return false, err
+	}
+
 	if !found {
 		// This is a new object
 		glog.V(4).Infof("storeObjectUpdate: adding %s %q, version %s", className, objName, objAccessor.GetResourceVersion())
diff --git a/pkg/controller/volume/persistentvolume/pv_controller_test.go b/pkg/controller/volume/persistentvolume/pv_controller_test.go
index 59867e756d..ffc9aa0074 100644
--- a/pkg/controller/volume/persistentvolume/pv_controller_test.go
+++ b/pkg/controller/volume/persistentvolume/pv_controller_test.go
@@ -176,7 +176,7 @@ func TestControllerSync(t *testing.T) {
 
 		// Start the controller
 		stopCh := make(chan struct{})
-		ctrl.Run(stopCh)
+		go ctrl.Run(stopCh)
 
 		// Wait for the controller to pass initial sync and fill its caches.
 		for !ctrl.volumeController.HasSynced() ||
diff --git a/test/integration/volume/persistent_volumes_test.go b/test/integration/volume/persistent_volumes_test.go
index 351b13f219..0f1df45748 100644
--- a/test/integration/volume/persistent_volumes_test.go
+++ b/test/integration/volume/persistent_volumes_test.go
@@ -120,7 +120,7 @@ func TestPersistentVolumeRecycler(t *testing.T) {
 	defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{})
 
 	stopCh := make(chan struct{})
-	ctrl.Run(stopCh)
+	go ctrl.Run(stopCh)
 	defer close(stopCh)
 
 	// This PV will be claimed, released, and recycled.
@@ -174,7 +174,7 @@ func TestPersistentVolumeDeleter(t *testing.T) {
 	defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{})
 
 	stopCh := make(chan struct{})
-	ctrl.Run(stopCh)
+	go ctrl.Run(stopCh)
 	defer close(stopCh)
 
 	// This PV will be claimed, released, and deleted.
@@ -233,7 +233,7 @@ func TestPersistentVolumeBindRace(t *testing.T) {
 	defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{})
 
 	stopCh := make(chan struct{})
-	ctrl.Run(stopCh)
+	go ctrl.Run(stopCh)
 	defer close(stopCh)
 
 	pv := createPV("fake-pv-race", "/tmp/foo", "10G", []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, v1.PersistentVolumeReclaimRetain)
@@ -304,7 +304,7 @@ func TestPersistentVolumeClaimLabelSelector(t *testing.T) {
 	defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{})
 
 	stopCh := make(chan struct{})
-	controller.Run(stopCh)
+	go controller.Run(stopCh)
 	defer close(stopCh)
 
 	var (
@@ -384,7 +384,7 @@ func TestPersistentVolumeClaimLabelSelectorMatchExpressions(t *testing.T) {
 	defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{})
 
 	stopCh := make(chan struct{})
-	controller.Run(stopCh)
+	go controller.Run(stopCh)
 	defer close(stopCh)
 
 	var (
@@ -483,7 +483,7 @@ func TestPersistentVolumeMultiPVs(t *testing.T) {
 	defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{})
 
 	stopCh := make(chan struct{})
-	controller.Run(stopCh)
+	go controller.Run(stopCh)
 	defer close(stopCh)
 
 	maxPVs := getObjectCount()
@@ -572,7 +572,7 @@ func TestPersistentVolumeMultiPVsPVCs(t *testing.T) {
 	defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{})
 
 	controllerStopCh := make(chan struct{})
-	binder.Run(controllerStopCh)
+	go binder.Run(controllerStopCh)
 	defer close(controllerStopCh)
 
 	objCount := getObjectCount()
@@ -785,7 +785,7 @@ func TestPersistentVolumeControllerStartup(t *testing.T) {
 
 	// Start the controller when all PVs and PVCs are already saved in etcd
 	stopCh := make(chan struct{})
-	binder.Run(stopCh)
+	go binder.Run(stopCh)
 	defer close(stopCh)
 
 	// wait for at least two sync periods for changes. No volume should be
@@ -872,7 +872,7 @@ func TestPersistentVolumeProvisionMultiPVCs(t *testing.T) {
 	testClient.Storage().StorageClasses().Create(&storageClass)
 
 	stopCh := make(chan struct{})
-	binder.Run(stopCh)
+	go binder.Run(stopCh)
 	defer close(stopCh)
 
 	objCount := getObjectCount()
@@ -957,7 +957,7 @@ func TestPersistentVolumeMultiPVsDiffAccessModes(t *testing.T) {
 	defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{})
 
 	stopCh := make(chan struct{})
-	controller.Run(stopCh)
+	go controller.Run(stopCh)
 	defer close(stopCh)
 
 	// This PV will be claimed, released, and deleted
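---

The pattern this patch converges on is worth seeing in isolation: informer event handlers do nothing but push object keys onto a queue, and exactly one worker goroutine per queue pops keys and syncs them, so the non-reentrant syncVolume/syncClaim never run concurrently. Below is a minimal, self-contained sketch of that pattern, not part of the patch: the tiny FIFO is a hypothetical stand-in for workqueue.Type (it mimics only Add/Get/ShutDown and omits Done() and the key de-duplication the real queue performs), and the sync callback stands in for the informer lookup plus syncVolume/syncClaim.

package main

import (
	"fmt"
	"sync"
)

// queue is a tiny FIFO of string keys with the same Add/Get/ShutDown shape
// as the workqueue used by the controller (de-duplication omitted).
type queue struct {
	mu       sync.Mutex
	cond     *sync.Cond
	items    []string
	shutdown bool
}

func newQueue() *queue {
	q := &queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add enqueues a key and wakes up the worker.
func (q *queue) Add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, key)
	q.cond.Signal()
}

// Get blocks until a key is available; it returns quit=true only after
// ShutDown has been called and all remaining keys have been drained.
func (q *queue) Get() (key string, quit bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.shutdown {
		q.cond.Wait()
	}
	if len(q.items) == 0 {
		return "", true
	}
	key, q.items = q.items[0], q.items[1:]
	return key, false
}

// ShutDown marks the queue as closed and wakes up any blocked Get.
func (q *queue) ShutDown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.shutdown = true
	q.cond.Broadcast()
}

// worker drains the queue. Because exactly one worker runs per queue, the
// sync function never needs to be reentrant.
func worker(q *queue, syncFn func(key string), done chan<- struct{}) {
	defer close(done)
	for {
		key, quit := q.Get()
		if quit {
			return
		}
		syncFn(key)
	}
}

func main() {
	q := newQueue()
	done := make(chan struct{})
	go worker(q, func(key string) { fmt.Println("syncing", key) }, done)

	// Event handlers (add/update/delete) would call Add with the object key;
	// a duplicate key merely causes another sync, which must be idempotent.
	q.Add("default/claim-1")
	q.Add("default/claim-2")
	q.ShutDown()
	<-done
}

Queuing keys rather than objects means the worker always re-reads the newest object state (from the informer cache, or from the controller's local cache for deletes) at the moment it processes the key; the real workqueue additionally de-duplicates a key that is re-added while still queued, so a burst of events for one object collapses into fewer syncs.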