mirror of https://github.com/k3s-io/k3s
Merge pull request #38615 from jsafrane/worker-thread
Automatic merge from submit-queue (batch tested with PRs 39150, 38615) Add work queues to PV controller. The PV controller should not use Controller.Requeue, as it is not available in shared informers. We need to implement our own work queues instead, where we can enqueue volumes/claims as we want.
commit
76dfee04f5
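The core idea of this change — event handlers only compute a key and drop it on a queue, while a single worker goroutine per queue drains it and calls the non-reentrant sync function — can be sketched in isolation. The following is a minimal, self-contained Go illustration of that Add/Get/ShutDown pattern, not the PR's code; `stringQueue` and `worker` are hypothetical stand-ins for the `pkg/util/workqueue` Type and the volumeWorker/claimWorker loops in the diff below (the real queue also deduplicates keys and has a Done method, which this sketch omits).

package main

import (
	"fmt"
	"sync"
)

// stringQueue is a hypothetical stand-in for workqueue.Type: Add enqueues a
// key, Get blocks until a key is available or the queue is shut down.
type stringQueue struct {
	mu       sync.Mutex
	cond     *sync.Cond
	items    []string
	shutdown bool
}

func newStringQueue() *stringQueue {
	q := &stringQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

func (q *stringQueue) Add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, key)
	q.cond.Signal()
}

// Get returns the next key, or quit=true once the queue is shut down and
// drained.
func (q *stringQueue) Get() (key string, quit bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.shutdown {
		q.cond.Wait()
	}
	if len(q.items) == 0 {
		return "", true
	}
	key = q.items[0]
	q.items = q.items[1:]
	return key, false
}

func (q *stringQueue) ShutDown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.shutdown = true
	q.cond.Broadcast()
}

// worker drains the queue with exactly one goroutine, mirroring the
// volumeWorker/claimWorker loops added by this PR: syncClaim/syncVolume are
// not reentrant, so one worker per queue keeps them serialized.
func worker(q *stringQueue, syncFn func(key string)) {
	for {
		key, quit := q.Get()
		if quit {
			return
		}
		syncFn(key)
	}
}

func main() {
	q := newStringQueue()
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		worker(q, func(key string) { fmt.Println("syncing", key) })
	}()
	// Event handlers would enqueue namespace/name keys here.
	q.Add("default/claim-1")
	q.Add("default/claim-2")
	q.ShutDown()
	wg.Wait()
}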
@@ -405,7 +405,7 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
         EnableDynamicProvisioning: s.VolumeConfiguration.EnableDynamicProvisioning,
     }
     volumeController := persistentvolumecontroller.NewController(params)
-    volumeController.Run(stop)
+    go volumeController.Run(stop)
     time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

     attachDetachController, attachDetachControllerErr :=
@@ -30,12 +30,15 @@ go_library(
         "//pkg/client/clientset_generated/clientset/typed/core/v1:go_default_library",
         "//pkg/client/record:go_default_library",
         "//pkg/cloudprovider:go_default_library",
+        "//pkg/controller:go_default_library",
         "//pkg/labels:go_default_library",
         "//pkg/runtime:go_default_library",
         "//pkg/types:go_default_library",
         "//pkg/util/goroutinemap:go_default_library",
         "//pkg/util/io:go_default_library",
         "//pkg/util/mount:go_default_library",
+        "//pkg/util/wait:go_default_library",
+        "//pkg/util/workqueue:go_default_library",
         "//pkg/volume:go_default_library",
         "//pkg/watch:go_default_library",
         "//vendor:github.com/golang/glog",
@@ -32,6 +32,7 @@ import (
     "k8s.io/kubernetes/pkg/client/record"
     "k8s.io/kubernetes/pkg/cloudprovider"
     "k8s.io/kubernetes/pkg/util/goroutinemap"
+    "k8s.io/kubernetes/pkg/util/workqueue"
     vol "k8s.io/kubernetes/pkg/volume"

     "github.com/golang/glog"
@@ -146,8 +147,10 @@ const createProvisionedPVInterval = 10 * time.Second
 // changes.
 type PersistentVolumeController struct {
     volumeController *cache.Controller
+    volumeInformer   cache.Indexer
     volumeSource     cache.ListerWatcher
     claimController  *cache.Controller
+    claimInformer    cache.Store
     claimSource      cache.ListerWatcher
     classReflector   *cache.Reflector
     classSource      cache.ListerWatcher
@@ -163,10 +166,34 @@ type PersistentVolumeController struct {
     // must be cloned before any modification. These caches get updated both by
     // "xxx added/updated/deleted" events from etcd and by the controller when
     // it saves newer version to etcd.
+    // Why local cache: binding a volume to a claim generates 4 events, roughly
+    // in this order (depends on goroutine ordering):
+    // - volume.Spec update
+    // - volume.Status update
+    // - claim.Spec update
+    // - claim.Status update
+    // With these caches, the controller can check that it has already saved
+    // volume.Status and claim.Spec+Status and does not need to do anything
+    // when e.g. volume.Spec update event arrives before all the other events.
+    // Without this cache, it would see the old version of volume.Status and
+    // claim in the informers (it has not been updated from API server events
+    // yet) and it would try to fix these objects to be bound together.
+    // Any write to API server would fail with version conflict - these objects
+    // have been already written.
     volumes persistentVolumeOrderedIndex
     claims  cache.Store
     classes cache.Store

+    // Work queues of claims and volumes to process. Every queue should have
+    // exactly one worker thread, especially syncClaim() is not reentrant.
+    // Two syncClaims could bind two different claims to the same volume or one
+    // claim to two volumes. The controller would recover from this (due to
+    // version errors in API server and other checks in this controller),
+    // however overall speed of multi-worker controller would be lower than if
+    // it runs single thread only.
+    claimQueue  *workqueue.Type
+    volumeQueue *workqueue.Type
+
     // Map of scheduled/running operations.
     runningOperations goroutinemap.GoRoutineMap

@@ -463,19 +490,11 @@ func (ctrl *PersistentVolumeController) syncVolume(volume *v1.PersistentVolume)
             // In both cases, the volume is Bound and the claim is Pending.
             // Next syncClaim will fix it. To speed it up, we enqueue the claim
             // into the controller, which results in syncClaim to be called
-            // shortly (and in the right goroutine).
+            // shortly (and in the right worker goroutine).
             // This speeds up binding of provisioned volumes - provisioner saves
             // only the new PV and it expects that next syncClaim will bind the
             // claim to it.
-            clone, err := api.Scheme.DeepCopy(claim)
-            if err != nil {
-                return fmt.Errorf("error cloning claim %q: %v", claimToClaimKey(claim), err)
-            }
-            glog.V(5).Infof("requeueing claim %q for faster syncClaim", claimToClaimKey(claim))
-            err = ctrl.claimController.Requeue(clone)
-            if err != nil {
-                return fmt.Errorf("error enqueing claim %q for faster sync: %v", claimToClaimKey(claim), err)
-            }
+            ctrl.claimQueue.Add(claimToClaimKey(claim))
             return nil
         } else if claim.Spec.VolumeName == volume.Name {
             // Volume is bound to a claim properly, update status if necessary
@@ -31,8 +31,11 @@ import (
     unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
     "k8s.io/kubernetes/pkg/client/record"
     "k8s.io/kubernetes/pkg/cloudprovider"
+    "k8s.io/kubernetes/pkg/controller"
    "k8s.io/kubernetes/pkg/runtime"
     "k8s.io/kubernetes/pkg/util/goroutinemap"
+    "k8s.io/kubernetes/pkg/util/wait"
+    "k8s.io/kubernetes/pkg/util/workqueue"
     vol "k8s.io/kubernetes/pkg/volume"
     "k8s.io/kubernetes/pkg/watch"

@@ -78,6 +81,8 @@ func NewController(p ControllerParameters) *PersistentVolumeController {
         createProvisionedPVRetryCount: createProvisionedPVRetryCount,
         createProvisionedPVInterval:   createProvisionedPVInterval,
         alphaProvisioner:              p.AlphaProvisioner,
+        claimQueue:                    workqueue.NewNamed("claims"),
+        volumeQueue:                   workqueue.NewNamed("volumes"),
     }

     controller.volumePluginMgr.InitPlugins(p.VolumePlugins, controller)
@@ -126,25 +131,25 @@ func NewController(p ControllerParameters) *PersistentVolumeController {
     }
     controller.classSource = classSource

-    _, controller.volumeController = cache.NewIndexerInformer(
+    controller.volumeInformer, controller.volumeController = cache.NewIndexerInformer(
         volumeSource,
         &v1.PersistentVolume{},
         p.SyncPeriod,
         cache.ResourceEventHandlerFuncs{
-            AddFunc:    controller.addVolume,
-            UpdateFunc: controller.updateVolume,
-            DeleteFunc: controller.deleteVolume,
+            AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
+            UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
+            DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
         },
         cache.Indexers{"accessmodes": accessModesIndexFunc},
     )
-    _, controller.claimController = cache.NewInformer(
+    controller.claimInformer, controller.claimController = cache.NewInformer(
         claimSource,
         &v1.PersistentVolumeClaim{},
         p.SyncPeriod,
         cache.ResourceEventHandlerFuncs{
-            AddFunc:    controller.addClaim,
-            UpdateFunc: controller.updateClaim,
-            DeleteFunc: controller.deleteClaim,
+            AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
+            UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
+            DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
         },
     )

@@ -210,31 +215,40 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour
     glog.V(4).Infof("controller initialized")
 }

-func (ctrl *PersistentVolumeController) storeVolumeUpdate(volume *v1.PersistentVolume) (bool, error) {
+// enqueueWork adds volume or claim to given work queue.
+func (ctrl *PersistentVolumeController) enqueueWork(queue workqueue.Interface, obj interface{}) {
+    // Beware of "xxx deleted" events
+    if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
+        obj = unknown.Obj
+    }
+    objName, err := controller.KeyFunc(obj)
+    if err != nil {
+        glog.Errorf("failed to get key from object: %v", err)
+        return
+    }
+    glog.V(5).Infof("enqueued %q for sync", objName)
+    queue.Add(objName)
+}
+
+func (ctrl *PersistentVolumeController) storeVolumeUpdate(volume interface{}) (bool, error) {
     return storeObjectUpdate(ctrl.volumes.store, volume, "volume")
 }

-func (ctrl *PersistentVolumeController) storeClaimUpdate(claim *v1.PersistentVolumeClaim) (bool, error) {
+func (ctrl *PersistentVolumeController) storeClaimUpdate(claim interface{}) (bool, error) {
     return storeObjectUpdate(ctrl.claims, claim, "claim")
 }

-// addVolume is callback from cache.Controller watching PersistentVolume
-// events.
-func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
-    pv, ok := obj.(*v1.PersistentVolume)
-    if !ok {
-        glog.Errorf("expected PersistentVolume but handler received %#v", obj)
-        return
-    }
-
-    if ctrl.upgradeVolumeFrom1_2(pv) {
+// updateVolume runs in worker thread and handles "volume added",
+// "volume updated" and "periodic sync" events.
+func (ctrl *PersistentVolumeController) updateVolume(volume *v1.PersistentVolume) {
+    if deleted := ctrl.upgradeVolumeFrom1_2(volume); deleted {
         // volume deleted
         return
     }

     // Store the new volume version in the cache and do not process it if this
     // is an old version.
-    new, err := ctrl.storeVolumeUpdate(pv)
+    new, err := ctrl.storeVolumeUpdate(volume)
     if err != nil {
         glog.Errorf("%v", err)
     }
@@ -242,111 +256,39 @@ func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
         return
     }

-    if err := ctrl.syncVolume(pv); err != nil {
-        if errors.IsConflict(err) {
-            // Version conflict error happens quite often and the controller
-            // recovers from it easily.
-            glog.V(3).Infof("PersistentVolumeController could not add volume %q: %+v", pv.Name, err)
-        } else {
-            glog.Errorf("PersistentVolumeController could not add volume %q: %+v", pv.Name, err)
-        }
-    }
-}
-
-// updateVolume is callback from cache.Controller watching PersistentVolume
-// events.
-func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) {
-    newVolume, ok := newObj.(*v1.PersistentVolume)
-    if !ok {
-        glog.Errorf("Expected PersistentVolume but handler received %#v", newObj)
-        return
-    }
-
-    if ctrl.upgradeVolumeFrom1_2(newVolume) {
-        // volume deleted
-        return
-    }
-
-    // Store the new volume version in the cache and do not process it if this
-    // is an old version.
-    new, err := ctrl.storeVolumeUpdate(newVolume)
+    err = ctrl.syncVolume(volume)
     if err != nil {
-        glog.Errorf("%v", err)
-    }
-    if !new {
-        return
-    }
-
-    if err := ctrl.syncVolume(newVolume); err != nil {
         if errors.IsConflict(err) {
             // Version conflict error happens quite often and the controller
             // recovers from it easily.
-            glog.V(3).Infof("PersistentVolumeController could not update volume %q: %+v", newVolume.Name, err)
+            glog.V(3).Infof("could not sync volume %q: %+v", volume.Name, err)
         } else {
-            glog.Errorf("PersistentVolumeController could not update volume %q: %+v", newVolume.Name, err)
+            glog.Errorf("could not sync volume %q: %+v", volume.Name, err)
         }
     }
 }

-// deleteVolume is callback from cache.Controller watching PersistentVolume
-// events.
-func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) {
-    _ = ctrl.volumes.store.Delete(obj)
-
-    var volume *v1.PersistentVolume
-    var ok bool
-    volume, ok = obj.(*v1.PersistentVolume)
-    if !ok {
-        if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
-            volume, ok = unknown.Obj.(*v1.PersistentVolume)
-            if !ok {
-                glog.Errorf("Expected PersistentVolume but deleteVolume received %#v", unknown.Obj)
-                return
-            }
-        } else {
-            glog.Errorf("Expected PersistentVolume but deleteVolume received %+v", obj)
-            return
-        }
-    }
-
-    if volume == nil || volume.Spec.ClaimRef == nil {
-        return
-    }
-
+// deleteVolume runs in worker thread and handles "volume deleted" event.
+func (ctrl *PersistentVolumeController) deleteVolume(volume *v1.PersistentVolume) {
+    _ = ctrl.volumes.store.Delete(volume)
     glog.V(4).Infof("volume %q deleted", volume.Name)

-    if claimObj, exists, _ := ctrl.claims.GetByKey(claimrefToClaimKey(volume.Spec.ClaimRef)); exists {
-        if claim, ok := claimObj.(*v1.PersistentVolumeClaim); ok && claim != nil {
-            // sync the claim when its volume is deleted. Explicitly syncing the
-            // claim here in response to volume deletion prevents the claim from
-            // waiting until the next sync period for its Lost status.
-            err := ctrl.syncClaim(claim)
-            if err != nil {
-                if errors.IsConflict(err) {
-                    // Version conflict error happens quite often and the
-                    // controller recovers from it easily.
-                    glog.V(3).Infof("PersistentVolumeController could not update volume %q from deleteVolume handler: %+v", claimToClaimKey(claim), err)
-                } else {
-                    glog.Errorf("PersistentVolumeController could not update volume %q from deleteVolume handler: %+v", claimToClaimKey(claim), err)
-                }
-            }
-        } else {
-            glog.Errorf("Cannot convert object from claim cache to claim %q!?: %#v", claimrefToClaimKey(volume.Spec.ClaimRef), claimObj)
-        }
-    }
-}
-
-// addClaim is callback from cache.Controller watching PersistentVolumeClaim
-// events.
-func (ctrl *PersistentVolumeController) addClaim(obj interface{}) {
-    // Store the new claim version in the cache and do not process it if this is
-    // an old version.
-    claim, ok := obj.(*v1.PersistentVolumeClaim)
-    if !ok {
-        glog.Errorf("Expected PersistentVolumeClaim but addClaim received %+v", obj)
+    if volume.Spec.ClaimRef == nil {
         return
     }
+    // sync the claim when its volume is deleted. Explicitly syncing the
+    // claim here in response to volume deletion prevents the claim from
+    // waiting until the next sync period for its Lost status.
+    claimKey := claimrefToClaimKey(volume.Spec.ClaimRef)
+    glog.V(5).Infof("deleteVolume[%s]: scheduling sync of claim %q", volume.Name, claimKey)
+    ctrl.claimQueue.Add(claimKey)
+}
+
+// updateClaim runs in worker thread and handles "claim added",
+// "claim updated" and "periodic sync" events.
+func (ctrl *PersistentVolumeController) updateClaim(claim *v1.PersistentVolumeClaim) {
+    // Store the new claim version in the cache and do not process it if this is
+    // an old version.
     new, err := ctrl.storeClaimUpdate(claim)
     if err != nil {
         glog.Errorf("%v", err)
@@ -354,106 +296,162 @@ func (ctrl *PersistentVolumeController) addClaim(obj interface{}) {
     if !new {
         return
     }

-    if err := ctrl.syncClaim(claim); err != nil {
-        if errors.IsConflict(err) {
-            // Version conflict error happens quite often and the controller
-            // recovers from it easily.
-            glog.V(3).Infof("PersistentVolumeController could not add claim %q: %+v", claimToClaimKey(claim), err)
-        } else {
-            glog.Errorf("PersistentVolumeController could not add claim %q: %+v", claimToClaimKey(claim), err)
-        }
-    }
-}
-
-// updateClaim is callback from cache.Controller watching PersistentVolumeClaim
-// events.
-func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{}) {
-    // Store the new claim version in the cache and do not process it if this is
-    // an old version.
-    newClaim, ok := newObj.(*v1.PersistentVolumeClaim)
-    if !ok {
-        glog.Errorf("Expected PersistentVolumeClaim but updateClaim received %+v", newObj)
-        return
-    }
-
-    new, err := ctrl.storeClaimUpdate(newClaim)
+    err = ctrl.syncClaim(claim)
     if err != nil {
-        glog.Errorf("%v", err)
-    }
-    if !new {
-        return
-    }
-
-    if err := ctrl.syncClaim(newClaim); err != nil {
         if errors.IsConflict(err) {
             // Version conflict error happens quite often and the controller
             // recovers from it easily.
-            glog.V(3).Infof("PersistentVolumeController could not update claim %q: %+v", claimToClaimKey(newClaim), err)
+            glog.V(3).Infof("could not sync claim %q: %+v", claimToClaimKey(claim), err)
         } else {
-            glog.Errorf("PersistentVolumeController could not update claim %q: %+v", claimToClaimKey(newClaim), err)
+            glog.Errorf("could not sync volume %q: %+v", claimToClaimKey(claim), err)
         }
     }
 }

-// deleteClaim is callback from cache.Controller watching PersistentVolumeClaim
-// events.
-func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
-    _ = ctrl.claims.Delete(obj)
-
-    var volume *v1.PersistentVolume
-    var claim *v1.PersistentVolumeClaim
-    var ok bool
-
-    claim, ok = obj.(*v1.PersistentVolumeClaim)
-    if !ok {
-        if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
-            claim, ok = unknown.Obj.(*v1.PersistentVolumeClaim)
-            if !ok {
-                glog.Errorf("Expected PersistentVolumeClaim but deleteClaim received %#v", unknown.Obj)
-                return
-            }
-        } else {
-            glog.Errorf("Expected PersistentVolumeClaim but deleteClaim received %#v", obj)
-            return
-        }
-    }
-
-    if claim == nil {
-        return
-    }
+// deleteClaim runs in worker thread and handles "claim deleted" event.
+func (ctrl *PersistentVolumeController) deleteClaim(claim *v1.PersistentVolumeClaim) {
+    _ = ctrl.claims.Delete(claim)
     glog.V(4).Infof("claim %q deleted", claimToClaimKey(claim))

-    if pvObj, exists, _ := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName); exists {
-        if volume, ok = pvObj.(*v1.PersistentVolume); ok {
-            // sync the volume when its claim is deleted. Explicitly sync'ing the
-            // volume here in response to claim deletion prevents the volume from
-            // waiting until the next sync period for its Release.
-            if volume != nil {
-                err := ctrl.syncVolume(volume)
-                if err != nil {
-                    if errors.IsConflict(err) {
-                        // Version conflict error happens quite often and the
-                        // controller recovers from it easily.
-                        glog.V(3).Infof("PersistentVolumeController could not update volume %q from deleteClaim handler: %+v", volume.Name, err)
-                    } else {
-                        glog.Errorf("PersistentVolumeController could not update volume %q from deleteClaim handler: %+v", volume.Name, err)
-                    }
-                }
-            }
-        } else {
-            glog.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, pvObj)
-        }
-    }
-}
+    // sync the volume when its claim is deleted. Explicitly sync'ing the
+    // volume here in response to claim deletion prevents the volume from
+    // waiting until the next sync period for its Release.
+    volumeName := claim.Spec.VolumeName
+    glog.V(5).Infof("deleteClaim[%s]: scheduling sync of volume %q", claimToClaimKey(claim), volumeName)
+    ctrl.volumeQueue.Add(volumeName)
+}

 // Run starts all of this controller's control loops
 func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
-    glog.V(4).Infof("starting PersistentVolumeController")
+    glog.V(1).Infof("starting PersistentVolumeController")
     ctrl.initializeCaches(ctrl.volumeSource, ctrl.claimSource)
     go ctrl.volumeController.Run(stopCh)
     go ctrl.claimController.Run(stopCh)
     go ctrl.classReflector.RunUntil(stopCh)
+    go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
+    go wait.Until(ctrl.claimWorker, time.Second, stopCh)
+
+    <-stopCh
+
+    ctrl.claimQueue.ShutDown()
+    ctrl.volumeQueue.ShutDown()
+}
+
+// volumeWorker processes items from volumeQueue. It must run only once,
+// syncVolume is not assured to be reentrant.
+func (ctrl *PersistentVolumeController) volumeWorker() {
+    workFunc := func() bool {
+        keyObj, quit := ctrl.volumeQueue.Get()
+        if quit {
+            return true
+        }
+        defer ctrl.volumeQueue.Done(keyObj)
+        key := keyObj.(string)
+        glog.V(5).Infof("volumeWorker[%s]", key)
+
+        volumeObj, found, err := ctrl.volumeInformer.GetByKey(key)
+        if err != nil {
+            glog.V(2).Infof("error getting volume %q from informer: %v", key, err)
+            return false
+        }
+
+        if found {
+            // The volume still exists in informer cache, the event must have
+            // been add/update/sync
+            volume, ok := volumeObj.(*v1.PersistentVolume)
+            if !ok {
+                glog.Errorf("expected volume, got %+v", volumeObj)
+                return false
+            }
+            ctrl.updateVolume(volume)
+            return false
+        }
+
+        // The volume is not in informer cache, the event must have been
+        // "delete"
+        volumeObj, found, err = ctrl.volumes.store.GetByKey(key)
+        if err != nil {
+            glog.V(2).Infof("error getting volume %q from cache: %v", key, err)
+            return false
+        }
+        if !found {
+            // The controller has already processed the delete event and
+            // deleted the volume from its cache
+            glog.V(2).Infof("deletion of volume %q was already processed", key)
+            return false
+        }
+        volume, ok := volumeObj.(*v1.PersistentVolume)
+        if !ok {
+            glog.Errorf("expected volume, got %+v", volumeObj)
+            return false
+        }
+        ctrl.deleteVolume(volume)
+        return false
+    }
+    for {
+        if quit := workFunc(); quit {
+            glog.Infof("volume worker queue shutting down")
+            return
+        }
+    }
+}
+
+// claimWorker processes items from claimQueue. It must run only once,
+// syncClaim is not reentrant.
+func (ctrl *PersistentVolumeController) claimWorker() {
+    workFunc := func() bool {
+        keyObj, quit := ctrl.claimQueue.Get()
+        if quit {
+            return true
+        }
+        defer ctrl.claimQueue.Done(keyObj)
+        key := keyObj.(string)
+        glog.V(5).Infof("claimWorker[%s]", key)
+
+        claimObj, found, err := ctrl.claimInformer.GetByKey(key)
+        if err != nil {
+            glog.V(2).Infof("error getting claim %q from informer: %v", key, err)
+            return false
+        }
+
+        if found {
+            // The claim still exists in informer cache, the event must have
+            // been add/update/sync
+            claim, ok := claimObj.(*v1.PersistentVolumeClaim)
+            if !ok {
+                glog.Errorf("expected claim, got %+v", claimObj)
+                return false
+            }
+            ctrl.updateClaim(claim)
+            return false
+        }
+
+        // The claim is not in informer cache, the event must have been "delete"
+        claimObj, found, err = ctrl.claims.GetByKey(key)
+        if err != nil {
+            glog.V(2).Infof("error getting claim %q from cache: %v", key, err)
+            return false
+        }
+        if !found {
+            // The controller has already processed the delete event and
+            // deleted the claim from its cache
+            glog.V(2).Infof("deletion of claim %q was already processed", key)
+            return false
+        }
+        claim, ok := claimObj.(*v1.PersistentVolumeClaim)
+        if !ok {
+            glog.Errorf("expected claim, got %+v", claimObj)
+            return false
+        }
+        ctrl.deleteClaim(claim)
+        return false
+    }
+    for {
+        if quit := workFunc(); quit {
+            glog.Infof("claim worker queue shutting down")
+            return
+        }
+    }
 }

 const (
@@ -563,17 +561,20 @@ func isVolumeBoundToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolum
 // controller itself. Returns "true", if the cache was updated, false if the
 // object is an old version and should be ignored.
 func storeObjectUpdate(store cache.Store, obj interface{}, className string) (bool, error) {
-    objAccessor, err := meta.Accessor(obj)
+    objName, err := controller.KeyFunc(obj)
     if err != nil {
-        return false, fmt.Errorf("Error reading cache of %s: %v", className, err)
+        return false, fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)
     }
-    objName := objAccessor.GetNamespace() + "/" + objAccessor.GetName()

     oldObj, found, err := store.Get(obj)
     if err != nil {
         return false, fmt.Errorf("Error finding %s %q in controller cache: %v", className, objName, err)
     }

+    objAccessor, err := meta.Accessor(obj)
+    if err != nil {
+        return false, err
+    }
+
     if !found {
         // This is a new object
         glog.V(4).Infof("storeObjectUpdate: adding %s %q, version %s", className, objName, objAccessor.GetResourceVersion())
@@ -176,7 +176,7 @@ func TestControllerSync(t *testing.T) {

         // Start the controller
         stopCh := make(chan struct{})
-        ctrl.Run(stopCh)
+        go ctrl.Run(stopCh)

         // Wait for the controller to pass initial sync and fill its caches.
         for !ctrl.volumeController.HasSynced() ||
@@ -120,7 +120,7 @@ func TestPersistentVolumeRecycler(t *testing.T) {
     defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{})

     stopCh := make(chan struct{})
-    ctrl.Run(stopCh)
+    go ctrl.Run(stopCh)
     defer close(stopCh)

     // This PV will be claimed, released, and recycled.
@@ -174,7 +174,7 @@ func TestPersistentVolumeDeleter(t *testing.T) {
     defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{})

     stopCh := make(chan struct{})
-    ctrl.Run(stopCh)
+    go ctrl.Run(stopCh)
     defer close(stopCh)

     // This PV will be claimed, released, and deleted.
@@ -233,7 +233,7 @@ func TestPersistentVolumeBindRace(t *testing.T) {
     defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{})

     stopCh := make(chan struct{})
-    ctrl.Run(stopCh)
+    go ctrl.Run(stopCh)
     defer close(stopCh)

     pv := createPV("fake-pv-race", "/tmp/foo", "10G", []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, v1.PersistentVolumeReclaimRetain)
@@ -304,7 +304,7 @@ func TestPersistentVolumeClaimLabelSelector(t *testing.T) {
     defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{})

     stopCh := make(chan struct{})
-    controller.Run(stopCh)
+    go controller.Run(stopCh)
     defer close(stopCh)

     var (
@@ -384,7 +384,7 @@ func TestPersistentVolumeClaimLabelSelectorMatchExpressions(t *testing.T) {
     defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{})

     stopCh := make(chan struct{})
-    controller.Run(stopCh)
+    go controller.Run(stopCh)
     defer close(stopCh)

     var (
@@ -483,7 +483,7 @@ func TestPersistentVolumeMultiPVs(t *testing.T) {
     defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{})

     stopCh := make(chan struct{})
-    controller.Run(stopCh)
+    go controller.Run(stopCh)
     defer close(stopCh)

     maxPVs := getObjectCount()
@@ -572,7 +572,7 @@ func TestPersistentVolumeMultiPVsPVCs(t *testing.T) {
     defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{})

     controllerStopCh := make(chan struct{})
-    binder.Run(controllerStopCh)
+    go binder.Run(controllerStopCh)
     defer close(controllerStopCh)

     objCount := getObjectCount()
@@ -785,7 +785,7 @@ func TestPersistentVolumeControllerStartup(t *testing.T) {

     // Start the controller when all PVs and PVCs are already saved in etcd
     stopCh := make(chan struct{})
-    binder.Run(stopCh)
+    go binder.Run(stopCh)
     defer close(stopCh)

     // wait for at least two sync periods for changes. No volume should be
@@ -872,7 +872,7 @@ func TestPersistentVolumeProvisionMultiPVCs(t *testing.T) {
     testClient.Storage().StorageClasses().Create(&storageClass)

     stopCh := make(chan struct{})
-    binder.Run(stopCh)
+    go binder.Run(stopCh)
     defer close(stopCh)

     objCount := getObjectCount()
@@ -957,7 +957,7 @@ func TestPersistentVolumeMultiPVsDiffAccessModes(t *testing.T) {
     defer testClient.Core().PersistentVolumes().DeleteCollection(nil, v1.ListOptions{})

     stopCh := make(chan struct{})
-    controller.Run(stopCh)
+    go controller.Run(stopCh)
     defer close(stopCh)

     // This PV will be claimed, released, and deleted
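One consequence visible in all of the test updates above: Run now blocks on the stop channel so it can shut the work queues down on exit, which is why every call site changes from ctrl.Run(stopCh) to go ctrl.Run(stopCh). Below is a rough, self-contained Go sketch of that blocking-Run shape; Controller, newController and the channel-backed queue are hypothetical stand-ins for illustration, not the controller's real types.

package main

import "time"

// Controller is a hypothetical stand-in for a controller whose Run blocks
// until the stop channel closes, as the PV controller does after this PR.
type Controller struct{ queue chan string }

func newController() *Controller {
	return &Controller{queue: make(chan string, 10)}
}

// Run starts the single worker and then blocks; once stopCh closes it shuts
// the queue down (analogous to workqueue.ShutDown() in the diff above).
func (c *Controller) Run(stopCh <-chan struct{}) {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for key := range c.queue { // single worker drains the queue
			_ = key // a real controller would call its sync function here
		}
	}()
	<-stopCh
	close(c.queue) // stop the worker
	<-done         // wait for it to drain the remaining items
}

func main() {
	ctrl := newController()
	stopCh := make(chan struct{})
	go ctrl.Run(stopCh) // Run blocks, so tests and callers start it in a goroutine
	ctrl.queue <- "default/pv-1"
	time.Sleep(10 * time.Millisecond) // give the worker a moment (demo only)
	close(stopCh)
	time.Sleep(10 * time.Millisecond) // let Run finish shutting down (demo only)
}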