Use PV shared informer in PV controller

pull/6/head
Matthew Wong 2016-08-25 21:55:23 -04:00
parent d836b248b2
commit 1d6dbdd9d2
6 changed files with 80 additions and 44 deletions

View File

@@ -430,7 +430,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration),
cloud,
s.ClusterName,
nil, // volumeSource
sharedInformers.PersistentVolumes().Informer(),
nil, // claimSource
nil, // classSource
nil, // eventRecorder
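For orientation, a minimal sketch of how this call site is typically wired: one shared informer factory is built, its PV informer is handed to the controller in place of the old nil volumeSource, and the factory is started after the controllers have registered their handlers. Apart from sharedInformers.PersistentVolumes().Informer(), every name and argument below is an assumption, not part of this commit.

	// Hypothetical wiring in the controller manager (illustrative only).
	sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod)

	volumeController := persistentvolumecontroller.NewPersistentVolumeController(
		kubeClient,
		s.PVClaimBinderSyncPeriod.Duration,
		alphaProvisioner,
		ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration),
		cloud,
		s.ClusterName,
		sharedInformers.PersistentVolumes().Informer(), // shared PV informer instead of a private ListerWatcher
		nil, // claimSource
		nil, // classSource
		nil, // eventRecorder
		enableDynamicProvisioning,
	)
	volumeController.Run(stop)

	// Shared informers are started once, after all controllers have registered
	// their event handlers with them.
	sharedInformers.Start(stop)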

View File

@@ -289,7 +289,7 @@ func (s *CMServer) Run(_ []string) error {
if err != nil {
glog.Fatalf("A backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err)
}
volumeController := persistentvolumecontroller.NewPersistentVolumeController(
volumeController := persistentvolumecontroller.NewPersistentVolumeControllerFromClient(
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-binder")),
s.PVClaimBinderSyncPeriod.Duration,
alphaProvisioner,

View File

@@ -153,8 +153,8 @@ const createProvisionedPVInterval = 10 * time.Second
// framework.Controllers that watch PersistentVolume and PersistentVolumeClaim
// changes.
type PersistentVolumeController struct {
volumeController *framework.Controller
volumeSource cache.ListerWatcher
volumeController framework.ControllerInterface
pvInformer framework.SharedIndexInformer
claimController *framework.Controller
claimSource cache.ListerWatcher
classReflector *cache.Reflector
@@ -175,6 +175,13 @@ type PersistentVolumeController struct {
claims cache.Store
classes cache.Store
// isInformerInternal is true if the informer we hold is a personal informer
// and false if it is a shared informer. If the informer is shared, it will be
// started for us; if it is personal, we must start it ourselves. Constructing
// the controller with NewPersistentVolumeController (passing a SharedInformer)
// leaves this false; NewPersistentVolumeControllerFromClient sets it to true.
isInformerInternal bool
// Map of scheduled/running operations.
runningOperations goroutinemap.GoRoutineMap
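To make the ownership rule above concrete: NewPersistentVolumeController (which now takes a framework.SharedIndexInformer, see the next file) leaves isInformerInternal false and never starts the informer itself, while NewPersistentVolumeControllerFromClient builds a personal informer, sets the flag to true, and Run starts it. A minimal sketch of the personal-informer path follows; the argument values are placeholders, not taken from this commit.

	ctrl := NewPersistentVolumeControllerFromClient(
		client,         // clientset.Interface
		30*time.Second, // syncPeriod
		nil,            // alphaProvisioner
		nil,            // volumePlugins
		nil,            // cloud
		"",             // clusterName
		nil,            // volumeSource: nil means list/watch real PersistentVolumes via the client
		nil,            // claimSource
		nil,            // classSource
		nil,            // eventRecorder
		false,          // enableDynamicProvisioning
	)
	ctrl.Run(stopCh) // starts ctrl.pvInformer internally because isInformerInternal is true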

View File

@@ -38,6 +38,8 @@ import (
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/util/wait"
)
// This file contains the controller base functionality, i.e. framework to
@@ -52,7 +54,8 @@ func NewPersistentVolumeController(
volumePlugins []vol.VolumePlugin,
cloud cloudprovider.Interface,
clusterName string,
volumeSource, claimSource, classSource cache.ListerWatcher,
pvInformer framework.SharedIndexInformer,
claimSource, classSource cache.ListerWatcher,
eventRecorder record.EventRecorder,
enableDynamicProvisioning bool,
) *PersistentVolumeController {
@@ -84,17 +87,8 @@
}
}
if volumeSource == nil {
volumeSource = &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return kubeClient.Core().PersistentVolumes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return kubeClient.Core().PersistentVolumes().Watch(options)
},
}
}
controller.volumeSource = volumeSource
controller.pvInformer = pvInformer
controller.isInformerInternal = false
if claimSource == nil {
claimSource = &cache.ListWatch{
@@ -120,17 +114,8 @@
}
controller.classSource = classSource
_, controller.volumeController = framework.NewIndexerInformer(
volumeSource,
&api.PersistentVolume{},
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: controller.addVolume,
UpdateFunc: controller.updateVolume,
DeleteFunc: controller.deleteVolume,
},
cache.Indexers{"accessmodes": accessModesIndexFunc},
)
controller.volumeController = pvInformer.GetController()
_, controller.claimController = framework.NewInformer(
claimSource,
&api.PersistentVolumeClaim{},
@@ -154,25 +139,55 @@
return controller
}
// NewPersistentVolumeControllerFromClient returns a new
// *PersistentVolumeController that runs its own informer.
func NewPersistentVolumeControllerFromClient(
kubeClient clientset.Interface,
syncPeriod time.Duration,
alphaProvisioner vol.ProvisionableVolumePlugin,
volumePlugins []vol.VolumePlugin,
cloud cloudprovider.Interface,
clusterName string,
volumeSource, claimSource, classSource cache.ListerWatcher,
eventRecorder record.EventRecorder,
enableDynamicProvisioning bool,
) *PersistentVolumeController {
pvInformer := informers.NewPVInformer(kubeClient, syncPeriod)
if volumeSource != nil {
pvInformer = framework.NewSharedIndexInformer(volumeSource, &api.PersistentVolume{}, syncPeriod, cache.Indexers{"accessmodes": accessModesIndexFunc})
}
ctrl := NewPersistentVolumeController(
kubeClient,
syncPeriod,
alphaProvisioner,
volumePlugins,
cloud,
clusterName,
pvInformer,
claimSource,
classSource,
eventRecorder,
enableDynamicProvisioning,
)
ctrl.isInformerInternal = true
return ctrl
}
// initializeCaches fills all controller caches with initial data from etcd so
// that the caches are already populated when the first addClaim/addVolume
// events arrive and the controller performs its initial synchronization.
func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSource cache.ListerWatcher) {
volumeListObj, err := volumeSource.List(api.ListOptions{})
if err != nil {
glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
return
}
volumeList, ok := volumeListObj.(*api.PersistentVolumeList)
if !ok {
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %#v", volumeListObj)
return
}
for _, volume := range volumeList.Items {
func (ctrl *PersistentVolumeController) initializeCaches(volumeStore cache.Store, claimSource cache.ListerWatcher) {
volumeList := volumeStore.List()
for _, obj := range volumeList {
volume, ok := obj.(*api.PersistentVolume)
if !ok {
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %#v", obj)
continue
}
// Ignore template volumes from kubernetes 1.2
deleted := ctrl.upgradeVolumeFrom1_2(&volume)
deleted := ctrl.upgradeVolumeFrom1_2(volume)
if !deleted {
clone, err := conversion.NewCloner().DeepCopy(&volume)
clone, err := conversion.NewCloner().DeepCopy(volume)
if err != nil {
glog.Errorf("error cloning volume %q: %v", volume.Name, err)
continue
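NewPersistentVolumeControllerFromClient relies on informers.NewPVInformer, whose body is not shown in this excerpt. A plausible shape for it, inferred from the ListWatch removed above and from the indexer used in the test-only branch, might look like the following. This is an assumption, not the committed implementation, and it presumes an accessModesIndexFunc equivalent is available to that package.

	// Hypothetical sketch of informers.NewPVInformer (assumed, not shown in this diff).
	func NewPVInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
		return framework.NewSharedIndexInformer(
			&cache.ListWatch{
				ListFunc: func(options api.ListOptions) (runtime.Object, error) {
					return client.Core().PersistentVolumes().List(options)
				},
				WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
					return client.Core().PersistentVolumes().Watch(options)
				},
			},
			&api.PersistentVolume{},
			resyncPeriod,
			cache.Indexers{"accessmodes": accessModesIndexFunc},
		)
	}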
@@ -444,7 +459,21 @@ func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
// Run starts all of this controller's control loops
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
glog.V(4).Infof("starting PersistentVolumeController")
ctrl.initializeCaches(ctrl.volumeSource, ctrl.claimSource)
if ctrl.isInformerInternal {
go ctrl.pvInformer.Run(stopCh)
// Wait to avoid data race between Run and AddEventHandler in tests
wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
return ctrl.pvInformer.HasSynced(), nil
})
}
ctrl.initializeCaches(ctrl.pvInformer.GetStore(), ctrl.claimSource)
// AddEventHandler will send synthetic add events which we don't want until
// we have initialized the caches
ctrl.pvInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
AddFunc: ctrl.addVolume,
UpdateFunc: ctrl.updateVolume,
DeleteFunc: ctrl.deleteVolume,
})
go ctrl.volumeController.Run(stopCh)
go ctrl.claimController.Run(stopCh)
go ctrl.classReflector.RunUntil(stopCh)
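The ordering above matters because, as the comment notes, registering a handler on a shared informer delivers a synthetic add for every object already in its store. A small illustrative sketch; the informer variable and the handler body are placeholders.

	// Handlers registered after the informer has synced still see every existing
	// PV once, as a synthetic add; initializing the controller caches first means
	// those adds find the caches already populated.
	informer.AddEventHandler(framework.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pv, ok := obj.(*api.PersistentVolume)
			if !ok {
				return
			}
			glog.V(4).Infof("add event (possibly synthetic) for PV %q", pv.Name)
		},
	})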

View File

@@ -594,7 +594,7 @@ func newTestController(kubeClient clientset.Interface, volumeSource, claimSource
if classSource == nil {
classSource = framework.NewFakeControllerSource()
}
ctrl := NewPersistentVolumeController(
ctrl := NewPersistentVolumeControllerFromClient(
kubeClient,
5*time.Second, // sync period
nil, // alpha provisioner

View File

@@ -1126,7 +1126,7 @@ func createClients(ns *api.Namespace, t *testing.T, s *httptest.Server, syncPeri
cloud := &fake_cloud.FakeCloud{}
syncPeriod = getSyncPeriod(syncPeriod)
ctrl := persistentvolumecontroller.NewPersistentVolumeController(
ctrl := persistentvolumecontroller.NewPersistentVolumeControllerFromClient(
binderClient,
syncPeriod,
nil, // alpha provisioner