Merge pull request #26518 from jsafrane/initial-sync

Automatic merge from submit-queue

Fill controller caches on startup

The controller needs to fill its caches before it starts binding/recycling/ deleting or provisioning volumes and claims. This was done using blocking initial 'xxx added' from going through syncClaim/syncVolume. However, when the caches were full, the controller waited for the next sync period to do actual binding/recycling etc.

In this patch, the controller fills its caches directly from etcd and then processes initial 'xxx added' events to reconcile the world and bind/recycle/ delete/provision stuff, resulting in faster binding after startup.

Fixes #25967 (properly)
pull/6/head
k8s-merge-robot 2016-06-02 21:44:56 -07:00
commit a41d84408c
3 changed files with 45 additions and 33 deletions

View File

@ -108,8 +108,10 @@ const createProvisionedPVInterval = 10 * time.Second
type PersistentVolumeController struct {
volumeController *framework.Controller
volumeControllerStopCh chan struct{}
volumeSource cache.ListerWatcher
claimController *framework.Controller
claimControllerStopCh chan struct{}
claimSource cache.ListerWatcher
kubeClient clientset.Interface
eventRecorder record.EventRecorder
cloud cloudprovider.Interface

View File

@ -89,6 +89,7 @@ func NewPersistentVolumeController(
},
}
}
controller.volumeSource = volumeSource
if claimSource == nil {
claimSource = &cache.ListWatch{
@ -100,6 +101,7 @@ func NewPersistentVolumeController(
},
}
}
controller.claimSource = claimSource
_, controller.volumeController = framework.NewIndexerInformer(
volumeSource,
@ -125,6 +127,40 @@ func NewPersistentVolumeController(
return controller
}
// initalizeCaches fills all controller caches with initial data from etcd in
// order to have the caches already filled when first addClaim/addVolume to
// perform initial synchronization of the controller.
func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSource cache.ListerWatcher) {
volumeListObj, err := volumeSource.List(api.ListOptions{})
if err != nil {
glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
return
}
volumeList, ok := volumeListObj.(*api.List)
if !ok {
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %+v", volumeListObj)
return
}
for _, volume := range volumeList.Items {
storeObjectUpdate(ctrl.volumes.store, volume, "volume")
}
claimListObj, err := claimSource.List(api.ListOptions{})
if err != nil {
glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
return
}
claimList, ok := claimListObj.(*api.List)
if !ok {
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of claims, got: %+v", volumeListObj)
return
}
for _, claim := range claimList.Items {
storeObjectUpdate(ctrl.claims, claim, "claim")
}
glog.V(4).Infof("controller initialized")
}
// addVolume is callback from framework.Controller watching PersistentVolume
// events.
func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
@ -138,10 +174,6 @@ func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
return
}
if !ctrl.isFullySynced() {
return
}
pv, ok := obj.(*api.PersistentVolume)
if !ok {
glog.Errorf("expected PersistentVolume but handler received %+v", obj)
@ -171,10 +203,6 @@ func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{})
return
}
if !ctrl.isFullySynced() {
return
}
newVolume, ok := newObj.(*api.PersistentVolume)
if !ok {
glog.Errorf("Expected PersistentVolume but handler received %+v", newObj)
@ -196,10 +224,6 @@ func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{})
func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) {
_ = ctrl.volumes.store.Delete(obj)
if !ctrl.isFullySynced() {
return
}
var volume *api.PersistentVolume
var ok bool
volume, ok = obj.(*api.PersistentVolume)
@ -220,6 +244,8 @@ func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) {
return
}
glog.V(4).Infof("volume %q deleted", volume.Name)
if claimObj, exists, _ := ctrl.claims.GetByKey(claimrefToClaimKey(volume.Spec.ClaimRef)); exists {
if claim, ok := claimObj.(*api.PersistentVolumeClaim); ok && claim != nil {
// sync the claim when its volume is deleted. Explicitly syncing the
@ -254,10 +280,6 @@ func (ctrl *PersistentVolumeController) addClaim(obj interface{}) {
return
}
if !ctrl.isFullySynced() {
return
}
claim, ok := obj.(*api.PersistentVolumeClaim)
if !ok {
glog.Errorf("Expected PersistentVolumeClaim but addClaim received %+v", obj)
@ -287,10 +309,6 @@ func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{})
return
}
if !ctrl.isFullySynced() {
return
}
newClaim, ok := newObj.(*api.PersistentVolumeClaim)
if !ok {
glog.Errorf("Expected PersistentVolumeClaim but updateClaim received %+v", newObj)
@ -312,10 +330,6 @@ func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{})
func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
_ = ctrl.claims.Delete(obj)
if !ctrl.isFullySynced() {
return
}
var volume *api.PersistentVolume
var claim *api.PersistentVolumeClaim
var ok bool
@ -337,6 +351,7 @@ func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
if !ok || claim == nil {
return
}
glog.V(4).Infof("claim %q deleted", claimToClaimKey(claim))
if pvObj, exists, _ := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName); exists {
if volume, ok = pvObj.(*api.PersistentVolume); ok {
@ -365,6 +380,8 @@ func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
func (ctrl *PersistentVolumeController) Run() {
glog.V(4).Infof("starting PersistentVolumeController")
ctrl.initializeCaches(ctrl.volumeSource, ctrl.claimSource)
if ctrl.volumeControllerStopCh == nil {
ctrl.volumeControllerStopCh = make(chan struct{})
go ctrl.volumeController.Run(ctrl.volumeControllerStopCh)
@ -383,14 +400,6 @@ func (ctrl *PersistentVolumeController) Stop() {
close(ctrl.claimControllerStopCh)
}
// isFullySynced returns true, if both volume and claim caches are fully loaded
// after startup.
// We do not want to process events with not fully loaded caches - e.g. we might
// recycle/delete PVs that don't have corresponding claim in the cache yet.
func (ctrl *PersistentVolumeController) isFullySynced() bool {
return ctrl.volumeController.HasSynced() && ctrl.claimController.HasSynced()
}
// Stateless functions
func hasAnnotation(obj api.ObjectMeta, ann string) bool {

View File

@ -112,9 +112,10 @@ func TestControllerSync(t *testing.T) {
go ctrl.Run()
// Wait for the controller to pass initial sync.
for !ctrl.isFullySynced() {
for !ctrl.volumeController.HasSynced() || !ctrl.claimController.HasSynced() {
time.Sleep(10 * time.Millisecond)
}
glog.V(4).Infof("controller synced, starting test")
count := reactor.getChangeCount()