diff --git a/pkg/controller/persistentvolume/controller.go b/pkg/controller/persistentvolume/controller.go index 6b7d31dfeb..0dea18d4aa 100644 --- a/pkg/controller/persistentvolume/controller.go +++ b/pkg/controller/persistentvolume/controller.go @@ -108,8 +108,10 @@ const createProvisionedPVInterval = 10 * time.Second type PersistentVolumeController struct { volumeController *framework.Controller volumeControllerStopCh chan struct{} + volumeSource cache.ListerWatcher claimController *framework.Controller claimControllerStopCh chan struct{} + claimSource cache.ListerWatcher kubeClient clientset.Interface eventRecorder record.EventRecorder cloud cloudprovider.Interface diff --git a/pkg/controller/persistentvolume/controller_base.go b/pkg/controller/persistentvolume/controller_base.go index fcd1fda662..9949d27293 100644 --- a/pkg/controller/persistentvolume/controller_base.go +++ b/pkg/controller/persistentvolume/controller_base.go @@ -89,6 +89,7 @@ func NewPersistentVolumeController( }, } } + controller.volumeSource = volumeSource if claimSource == nil { claimSource = &cache.ListWatch{ @@ -100,6 +101,7 @@ func NewPersistentVolumeController( }, } } + controller.claimSource = claimSource _, controller.volumeController = framework.NewIndexerInformer( volumeSource, @@ -125,6 +127,40 @@ func NewPersistentVolumeController( return controller } +// initalizeCaches fills all controller caches with initial data from etcd in +// order to have the caches already filled when first addClaim/addVolume to +// perform initial synchronization of the controller. +func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSource cache.ListerWatcher) { + volumeListObj, err := volumeSource.List(api.ListOptions{}) + if err != nil { + glog.Errorf("PersistentVolumeController can't initialize caches: %v", err) + return + } + volumeList, ok := volumeListObj.(*api.List) + if !ok { + glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %+v", volumeListObj) + return + } + for _, volume := range volumeList.Items { + storeObjectUpdate(ctrl.volumes.store, volume, "volume") + } + + claimListObj, err := claimSource.List(api.ListOptions{}) + if err != nil { + glog.Errorf("PersistentVolumeController can't initialize caches: %v", err) + return + } + claimList, ok := claimListObj.(*api.List) + if !ok { + glog.Errorf("PersistentVolumeController can't initialize caches, expected list of claims, got: %+v", volumeListObj) + return + } + for _, claim := range claimList.Items { + storeObjectUpdate(ctrl.claims, claim, "claim") + } + glog.V(4).Infof("controller initialized") +} + // addVolume is callback from framework.Controller watching PersistentVolume // events. func (ctrl *PersistentVolumeController) addVolume(obj interface{}) { @@ -138,10 +174,6 @@ func (ctrl *PersistentVolumeController) addVolume(obj interface{}) { return } - if !ctrl.isFullySynced() { - return - } - pv, ok := obj.(*api.PersistentVolume) if !ok { glog.Errorf("expected PersistentVolume but handler received %+v", obj) @@ -171,10 +203,6 @@ func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) return } - if !ctrl.isFullySynced() { - return - } - newVolume, ok := newObj.(*api.PersistentVolume) if !ok { glog.Errorf("Expected PersistentVolume but handler received %+v", newObj) @@ -196,10 +224,6 @@ func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) { _ = ctrl.volumes.store.Delete(obj) - if !ctrl.isFullySynced() { - return - } - var volume *api.PersistentVolume var ok bool volume, ok = obj.(*api.PersistentVolume) @@ -220,6 +244,8 @@ func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) { return } + glog.V(4).Infof("volume %q deleted", volume.Name) + if claimObj, exists, _ := ctrl.claims.GetByKey(claimrefToClaimKey(volume.Spec.ClaimRef)); exists { if claim, ok := claimObj.(*api.PersistentVolumeClaim); ok && claim != nil { // sync the claim when its volume is deleted. Explicitly syncing the @@ -254,10 +280,6 @@ func (ctrl *PersistentVolumeController) addClaim(obj interface{}) { return } - if !ctrl.isFullySynced() { - return - } - claim, ok := obj.(*api.PersistentVolumeClaim) if !ok { glog.Errorf("Expected PersistentVolumeClaim but addClaim received %+v", obj) @@ -287,10 +309,6 @@ func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{}) return } - if !ctrl.isFullySynced() { - return - } - newClaim, ok := newObj.(*api.PersistentVolumeClaim) if !ok { glog.Errorf("Expected PersistentVolumeClaim but updateClaim received %+v", newObj) @@ -312,10 +330,6 @@ func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{}) func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) { _ = ctrl.claims.Delete(obj) - if !ctrl.isFullySynced() { - return - } - var volume *api.PersistentVolume var claim *api.PersistentVolumeClaim var ok bool @@ -337,6 +351,7 @@ func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) { if !ok || claim == nil { return } + glog.V(4).Infof("claim %q deleted", claimToClaimKey(claim)) if pvObj, exists, _ := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName); exists { if volume, ok = pvObj.(*api.PersistentVolume); ok { @@ -365,6 +380,8 @@ func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) { func (ctrl *PersistentVolumeController) Run() { glog.V(4).Infof("starting PersistentVolumeController") + ctrl.initializeCaches(ctrl.volumeSource, ctrl.claimSource) + if ctrl.volumeControllerStopCh == nil { ctrl.volumeControllerStopCh = make(chan struct{}) go ctrl.volumeController.Run(ctrl.volumeControllerStopCh) @@ -383,14 +400,6 @@ func (ctrl *PersistentVolumeController) Stop() { close(ctrl.claimControllerStopCh) } -// isFullySynced returns true, if both volume and claim caches are fully loaded -// after startup. -// We do not want to process events with not fully loaded caches - e.g. we might -// recycle/delete PVs that don't have corresponding claim in the cache yet. -func (ctrl *PersistentVolumeController) isFullySynced() bool { - return ctrl.volumeController.HasSynced() && ctrl.claimController.HasSynced() -} - // Stateless functions func hasAnnotation(obj api.ObjectMeta, ann string) bool { diff --git a/pkg/controller/persistentvolume/controller_test.go b/pkg/controller/persistentvolume/controller_test.go index 0dd4f0c466..7ce093e22f 100644 --- a/pkg/controller/persistentvolume/controller_test.go +++ b/pkg/controller/persistentvolume/controller_test.go @@ -146,9 +146,10 @@ func TestControllerSync(t *testing.T) { go ctrl.Run() // Wait for the controller to pass initial sync. - for !ctrl.isFullySynced() { + for !ctrl.volumeController.HasSynced() || !ctrl.claimController.HasSynced() { time.Sleep(10 * time.Millisecond) } + glog.V(4).Infof("controller synced, starting test") count := reactor.getChangeCount()