From 1d6dbdd9d2cd96f6a802fbed91c93fb9532d507d Mon Sep 17 00:00:00 2001 From: Matthew Wong Date: Thu, 25 Aug 2016 21:55:23 -0400 Subject: [PATCH] Use PV shared informer in PV controller --- .../app/controllermanager.go | 2 +- .../controllermanager/controllermanager.go | 2 +- .../volume/persistentvolume/controller.go | 11 +- .../persistentvolume/controller_base.go | 105 +++++++++++------- .../volume/persistentvolume/framework_test.go | 2 +- .../persistent_volumes_test.go | 2 +- 6 files changed, 80 insertions(+), 44 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 2ebbc0b8fd..f19345a680 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -430,7 +430,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration), cloud, s.ClusterName, - nil, // volumeSource + sharedInformers.PersistentVolumes().Informer(), nil, // claimSource nil, // classSource nil, // eventRecorder diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index 9129fbe6ec..ac5baea601 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -289,7 +289,7 @@ func (s *CMServer) Run(_ []string) error { if err != nil { glog.Fatalf("An backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err) } - volumeController := persistentvolumecontroller.NewPersistentVolumeController( + volumeController := persistentvolumecontroller.NewPersistentVolumeControllerFromClient( clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-binder")), s.PVClaimBinderSyncPeriod.Duration, alphaProvisioner, diff --git a/pkg/controller/volume/persistentvolume/controller.go b/pkg/controller/volume/persistentvolume/controller.go index 44f3edfc3a..5f686d7614 100644 --- a/pkg/controller/volume/persistentvolume/controller.go +++ b/pkg/controller/volume/persistentvolume/controller.go @@ -153,8 +153,8 @@ const createProvisionedPVInterval = 10 * time.Second // framework.Controllers that watch PersistentVolume and PersistentVolumeClaim // changes. type PersistentVolumeController struct { - volumeController *framework.Controller - volumeSource cache.ListerWatcher + volumeController framework.ControllerInterface + pvInformer framework.SharedIndexInformer claimController *framework.Controller claimSource cache.ListerWatcher classReflector *cache.Reflector @@ -175,6 +175,13 @@ type PersistentVolumeController struct { claims cache.Store classes cache.Store + // isInformerInternal is true if the informer we hold is a personal informer, + // false if it is a shared informer. If we're using a normal shared informer, + // then the informer will be started for us. If we have a personal informer, + // we must start it ourselves. If you start the controller using + // NewPersistentVolumeController(passing SharedInformer), this will be false. + isInformerInternal bool + // Map of scheduled/running operations. runningOperations goroutinemap.GoRoutineMap diff --git a/pkg/controller/volume/persistentvolume/controller_base.go b/pkg/controller/volume/persistentvolume/controller_base.go index cd9704abf4..e29071ce89 100644 --- a/pkg/controller/volume/persistentvolume/controller_base.go +++ b/pkg/controller/volume/persistentvolume/controller_base.go @@ -38,6 +38,8 @@ import ( "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" + "k8s.io/kubernetes/pkg/controller/framework/informers" + "k8s.io/kubernetes/pkg/util/wait" ) // This file contains the controller base functionality, i.e. framework to @@ -52,7 +54,8 @@ func NewPersistentVolumeController( volumePlugins []vol.VolumePlugin, cloud cloudprovider.Interface, clusterName string, - volumeSource, claimSource, classSource cache.ListerWatcher, + pvInformer framework.SharedIndexInformer, + claimSource, classSource cache.ListerWatcher, eventRecorder record.EventRecorder, enableDynamicProvisioning bool, ) *PersistentVolumeController { @@ -84,17 +87,8 @@ func NewPersistentVolumeController( } } - if volumeSource == nil { - volumeSource = &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return kubeClient.Core().PersistentVolumes().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return kubeClient.Core().PersistentVolumes().Watch(options) - }, - } - } - controller.volumeSource = volumeSource + controller.pvInformer = pvInformer + controller.isInformerInternal = false if claimSource == nil { claimSource = &cache.ListWatch{ @@ -120,17 +114,8 @@ func NewPersistentVolumeController( } controller.classSource = classSource - _, controller.volumeController = framework.NewIndexerInformer( - volumeSource, - &api.PersistentVolume{}, - syncPeriod, - framework.ResourceEventHandlerFuncs{ - AddFunc: controller.addVolume, - UpdateFunc: controller.updateVolume, - DeleteFunc: controller.deleteVolume, - }, - cache.Indexers{"accessmodes": accessModesIndexFunc}, - ) + controller.volumeController = pvInformer.GetController() + _, controller.claimController = framework.NewInformer( claimSource, &api.PersistentVolumeClaim{}, @@ -154,25 +139,55 @@ func NewPersistentVolumeController( return controller } +// NewPersistentVolumeControllerFromClient returns a new +// *PersistentVolumeController that runs its own informer. +func NewPersistentVolumeControllerFromClient( + kubeClient clientset.Interface, + syncPeriod time.Duration, + alphaProvisioner vol.ProvisionableVolumePlugin, + volumePlugins []vol.VolumePlugin, + cloud cloudprovider.Interface, + clusterName string, + volumeSource, claimSource, classSource cache.ListerWatcher, + eventRecorder record.EventRecorder, + enableDynamicProvisioning bool, +) *PersistentVolumeController { + pvInformer := informers.NewPVInformer(kubeClient, syncPeriod) + if volumeSource != nil { + pvInformer = framework.NewSharedIndexInformer(volumeSource, &api.PersistentVolume{}, syncPeriod, cache.Indexers{"accessmodes": accessModesIndexFunc}) + } + ctrl := NewPersistentVolumeController( + kubeClient, + syncPeriod, + alphaProvisioner, + volumePlugins, + cloud, + clusterName, + pvInformer, + claimSource, + classSource, + eventRecorder, + enableDynamicProvisioning, + ) + ctrl.isInformerInternal = true + + return ctrl +} + // initializeCaches fills all controller caches with initial data from etcd in // order to have the caches already filled when first addClaim/addVolume to // perform initial synchronization of the controller. -func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSource cache.ListerWatcher) { - volumeListObj, err := volumeSource.List(api.ListOptions{}) - if err != nil { - glog.Errorf("PersistentVolumeController can't initialize caches: %v", err) - return - } - volumeList, ok := volumeListObj.(*api.PersistentVolumeList) - if !ok { - glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %#v", volumeListObj) - return - } - for _, volume := range volumeList.Items { +func (ctrl *PersistentVolumeController) initializeCaches(volumeStore cache.Store, claimSource cache.ListerWatcher) { + volumeList := volumeStore.List() + for _, obj := range volumeList { + volume, ok := obj.(*api.PersistentVolume) + if !ok { + glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %#v", obj) + } // Ignore template volumes from kubernetes 1.2 - deleted := ctrl.upgradeVolumeFrom1_2(&volume) + deleted := ctrl.upgradeVolumeFrom1_2(volume) if !deleted { - clone, err := conversion.NewCloner().DeepCopy(&volume) + clone, err := conversion.NewCloner().DeepCopy(volume) if err != nil { glog.Errorf("error cloning volume %q: %v", volume.Name, err) continue @@ -444,7 +459,21 @@ func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) { // Run starts all of this controller's control loops func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) { glog.V(4).Infof("starting PersistentVolumeController") - ctrl.initializeCaches(ctrl.volumeSource, ctrl.claimSource) + if ctrl.isInformerInternal { + go ctrl.pvInformer.Run(stopCh) + // Wait to avoid data race between Run and AddEventHandler in tests + wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + return ctrl.pvInformer.HasSynced(), nil + }) + } + ctrl.initializeCaches(ctrl.pvInformer.GetStore(), ctrl.claimSource) + // AddEventHandler will send synthetic add events which we don't want until + // we have initialized the caches + ctrl.pvInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{ + AddFunc: ctrl.addVolume, + UpdateFunc: ctrl.updateVolume, + DeleteFunc: ctrl.deleteVolume, + }) go ctrl.volumeController.Run(stopCh) go ctrl.claimController.Run(stopCh) go ctrl.classReflector.RunUntil(stopCh) diff --git a/pkg/controller/volume/persistentvolume/framework_test.go b/pkg/controller/volume/persistentvolume/framework_test.go index d505eb752f..de4fe8267e 100644 --- a/pkg/controller/volume/persistentvolume/framework_test.go +++ b/pkg/controller/volume/persistentvolume/framework_test.go @@ -594,7 +594,7 @@ func newTestController(kubeClient clientset.Interface, volumeSource, claimSource if classSource == nil { classSource = framework.NewFakeControllerSource() } - ctrl := NewPersistentVolumeController( + ctrl := NewPersistentVolumeControllerFromClient( kubeClient, 5*time.Second, // sync period nil, // alpha provisioner diff --git a/test/integration/persistentvolumes/persistent_volumes_test.go b/test/integration/persistentvolumes/persistent_volumes_test.go index 760b3e03ee..67628a0a58 100644 --- a/test/integration/persistentvolumes/persistent_volumes_test.go +++ b/test/integration/persistentvolumes/persistent_volumes_test.go @@ -1126,7 +1126,7 @@ func createClients(ns *api.Namespace, t *testing.T, s *httptest.Server, syncPeri cloud := &fake_cloud.FakeCloud{} syncPeriod = getSyncPeriod(syncPeriod) - ctrl := persistentvolumecontroller.NewPersistentVolumeController( + ctrl := persistentvolumecontroller.NewPersistentVolumeControllerFromClient( binderClient, syncPeriod, nil, // alpha provisioner