mirror of https://github.com/k3s-io/k3s
Fill controller caches on startup
The controller needs to fill its caches before it starts binding/recycling/ deleting or provisioning volumes and claims. This was done using blocking initial 'xxx added' from going through syncClaim/syncVolume. However, when the caches were full, the controller waited for the next sync period to do actual binding/recycling etc. In this patch, the controller fills its caches directly from etcd and then processes initial 'xxx added' events to reconcile the world and bind/recycle/ delete/provision stuff, resulting in faster binding after startup. Fixes #25967 (properly)pull/6/head
parent
5643b7498f
commit
df161c3a7e
|
@ -108,8 +108,10 @@ const createProvisionedPVInterval = 10 * time.Second
|
|||
type PersistentVolumeController struct {
|
||||
volumeController *framework.Controller
|
||||
volumeControllerStopCh chan struct{}
|
||||
volumeSource cache.ListerWatcher
|
||||
claimController *framework.Controller
|
||||
claimControllerStopCh chan struct{}
|
||||
claimSource cache.ListerWatcher
|
||||
kubeClient clientset.Interface
|
||||
eventRecorder record.EventRecorder
|
||||
cloud cloudprovider.Interface
|
||||
|
|
|
@ -89,6 +89,7 @@ func NewPersistentVolumeController(
|
|||
},
|
||||
}
|
||||
}
|
||||
controller.volumeSource = volumeSource
|
||||
|
||||
if claimSource == nil {
|
||||
claimSource = &cache.ListWatch{
|
||||
|
@ -100,6 +101,7 @@ func NewPersistentVolumeController(
|
|||
},
|
||||
}
|
||||
}
|
||||
controller.claimSource = claimSource
|
||||
|
||||
_, controller.volumeController = framework.NewIndexerInformer(
|
||||
volumeSource,
|
||||
|
@ -125,6 +127,40 @@ func NewPersistentVolumeController(
|
|||
return controller
|
||||
}
|
||||
|
||||
// initalizeCaches fills all controller caches with initial data from etcd in
|
||||
// order to have the caches already filled when first addClaim/addVolume to
|
||||
// perform initial synchronization of the controller.
|
||||
func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSource cache.ListerWatcher) {
|
||||
volumeListObj, err := volumeSource.List(api.ListOptions{})
|
||||
if err != nil {
|
||||
glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
|
||||
return
|
||||
}
|
||||
volumeList, ok := volumeListObj.(*api.List)
|
||||
if !ok {
|
||||
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %+v", volumeListObj)
|
||||
return
|
||||
}
|
||||
for _, volume := range volumeList.Items {
|
||||
storeObjectUpdate(ctrl.volumes.store, volume, "volume")
|
||||
}
|
||||
|
||||
claimListObj, err := claimSource.List(api.ListOptions{})
|
||||
if err != nil {
|
||||
glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
|
||||
return
|
||||
}
|
||||
claimList, ok := claimListObj.(*api.List)
|
||||
if !ok {
|
||||
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of claims, got: %+v", volumeListObj)
|
||||
return
|
||||
}
|
||||
for _, claim := range claimList.Items {
|
||||
storeObjectUpdate(ctrl.claims, claim, "claim")
|
||||
}
|
||||
glog.V(4).Infof("controller initialized")
|
||||
}
|
||||
|
||||
// addVolume is callback from framework.Controller watching PersistentVolume
|
||||
// events.
|
||||
func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
|
||||
|
@ -138,10 +174,6 @@ func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
|
|||
return
|
||||
}
|
||||
|
||||
if !ctrl.isFullySynced() {
|
||||
return
|
||||
}
|
||||
|
||||
pv, ok := obj.(*api.PersistentVolume)
|
||||
if !ok {
|
||||
glog.Errorf("expected PersistentVolume but handler received %+v", obj)
|
||||
|
@ -171,10 +203,6 @@ func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{})
|
|||
return
|
||||
}
|
||||
|
||||
if !ctrl.isFullySynced() {
|
||||
return
|
||||
}
|
||||
|
||||
newVolume, ok := newObj.(*api.PersistentVolume)
|
||||
if !ok {
|
||||
glog.Errorf("Expected PersistentVolume but handler received %+v", newObj)
|
||||
|
@ -196,10 +224,6 @@ func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{})
|
|||
func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) {
|
||||
_ = ctrl.volumes.store.Delete(obj)
|
||||
|
||||
if !ctrl.isFullySynced() {
|
||||
return
|
||||
}
|
||||
|
||||
var volume *api.PersistentVolume
|
||||
var ok bool
|
||||
volume, ok = obj.(*api.PersistentVolume)
|
||||
|
@ -220,6 +244,8 @@ func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) {
|
|||
return
|
||||
}
|
||||
|
||||
glog.V(4).Infof("volume %q deleted", volume.Name)
|
||||
|
||||
if claimObj, exists, _ := ctrl.claims.GetByKey(claimrefToClaimKey(volume.Spec.ClaimRef)); exists {
|
||||
if claim, ok := claimObj.(*api.PersistentVolumeClaim); ok && claim != nil {
|
||||
// sync the claim when its volume is deleted. Explicitly syncing the
|
||||
|
@ -254,10 +280,6 @@ func (ctrl *PersistentVolumeController) addClaim(obj interface{}) {
|
|||
return
|
||||
}
|
||||
|
||||
if !ctrl.isFullySynced() {
|
||||
return
|
||||
}
|
||||
|
||||
claim, ok := obj.(*api.PersistentVolumeClaim)
|
||||
if !ok {
|
||||
glog.Errorf("Expected PersistentVolumeClaim but addClaim received %+v", obj)
|
||||
|
@ -287,10 +309,6 @@ func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{})
|
|||
return
|
||||
}
|
||||
|
||||
if !ctrl.isFullySynced() {
|
||||
return
|
||||
}
|
||||
|
||||
newClaim, ok := newObj.(*api.PersistentVolumeClaim)
|
||||
if !ok {
|
||||
glog.Errorf("Expected PersistentVolumeClaim but updateClaim received %+v", newObj)
|
||||
|
@ -312,10 +330,6 @@ func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{})
|
|||
func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
|
||||
_ = ctrl.claims.Delete(obj)
|
||||
|
||||
if !ctrl.isFullySynced() {
|
||||
return
|
||||
}
|
||||
|
||||
var volume *api.PersistentVolume
|
||||
var claim *api.PersistentVolumeClaim
|
||||
var ok bool
|
||||
|
@ -337,6 +351,7 @@ func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
|
|||
if !ok || claim == nil {
|
||||
return
|
||||
}
|
||||
glog.V(4).Infof("claim %q deleted", claimToClaimKey(claim))
|
||||
|
||||
if pvObj, exists, _ := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName); exists {
|
||||
if volume, ok = pvObj.(*api.PersistentVolume); ok {
|
||||
|
@ -365,6 +380,8 @@ func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
|
|||
func (ctrl *PersistentVolumeController) Run() {
|
||||
glog.V(4).Infof("starting PersistentVolumeController")
|
||||
|
||||
ctrl.initializeCaches(ctrl.volumeSource, ctrl.claimSource)
|
||||
|
||||
if ctrl.volumeControllerStopCh == nil {
|
||||
ctrl.volumeControllerStopCh = make(chan struct{})
|
||||
go ctrl.volumeController.Run(ctrl.volumeControllerStopCh)
|
||||
|
@ -383,14 +400,6 @@ func (ctrl *PersistentVolumeController) Stop() {
|
|||
close(ctrl.claimControllerStopCh)
|
||||
}
|
||||
|
||||
// isFullySynced returns true, if both volume and claim caches are fully loaded
|
||||
// after startup.
|
||||
// We do not want to process events with not fully loaded caches - e.g. we might
|
||||
// recycle/delete PVs that don't have corresponding claim in the cache yet.
|
||||
func (ctrl *PersistentVolumeController) isFullySynced() bool {
|
||||
return ctrl.volumeController.HasSynced() && ctrl.claimController.HasSynced()
|
||||
}
|
||||
|
||||
// Stateless functions
|
||||
|
||||
func hasAnnotation(obj api.ObjectMeta, ann string) bool {
|
||||
|
|
|
@ -146,9 +146,10 @@ func TestControllerSync(t *testing.T) {
|
|||
go ctrl.Run()
|
||||
|
||||
// Wait for the controller to pass initial sync.
|
||||
for !ctrl.isFullySynced() {
|
||||
for !ctrl.volumeController.HasSynced() || !ctrl.claimController.HasSynced() {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
glog.V(4).Infof("controller synced, starting test")
|
||||
|
||||
count := reactor.getChangeCount()
|
||||
|
||||
|
|
Loading…
Reference in New Issue