diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder.go b/pkg/controller/volume/persistentvolume/scheduler_binder.go index bec90c1b8a..12a851cdfd 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder.go +++ b/pkg/controller/volume/persistentvolume/scheduler_binder.go @@ -151,7 +151,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume // Immediate claims should be bound if len(unboundClaimsImmediate) > 0 { - return false, false, fmt.Errorf("pod has unbound PersistentVolumeClaims") + return false, false, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims") } // Check PV node affinity on bound volumes diff --git a/pkg/scheduler/factory/BUILD b/pkg/scheduler/factory/BUILD index b7a094125c..971c59b272 100644 --- a/pkg/scheduler/factory/BUILD +++ b/pkg/scheduler/factory/BUILD @@ -34,6 +34,7 @@ go_library( "//pkg/scheduler/volumebinder:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", + "//staging/src/k8s.io/api/storage/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 93640d6f0e..bbfca036dc 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -29,6 +29,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/api/policy/v1beta1" + storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -300,6 +301,13 @@ func NewConfigFactory( if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { // Setup volume binder c.volumeBinder = volumebinder.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer) + + storageClassInformer.Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.onStorageClassAdd, + DeleteFunc: c.onStorageClassDelete, + }, + ) } // Setup cache comparer @@ -384,6 +392,13 @@ func (c *configFactory) onPvAdd(obj interface{}) { } c.invalidatePredicatesForPv(pv) } + // Pods created when there are no PVs available will be stuck in + // unschedulable queue. But unbound PVs created for static provisioning and + // delay binding storage class are skipped in PV controller dynamic + // provisiong and binding process, will not trigger events to schedule pod + // again. So we need to move pods to active queue on PV add for this + // scenario. + c.podQueue.MoveAllToActiveQueue() } func (c *configFactory) onPvUpdate(old, new interface{}) { @@ -400,6 +415,11 @@ func (c *configFactory) onPvUpdate(old, new interface{}) { } c.invalidatePredicatesForPvUpdate(oldPV, newPV) } + // Scheduler.bindVolumesWorker may fail to update assumed pod volume + // bindings due to conflicts if PVs are updated by PV controller or other + // parties, then scheduler will add pod back to unschedulable queue. We + // need to move pods to active queue on PV update for this scenario. + c.podQueue.MoveAllToActiveQueue() } func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.PersistentVolume) { @@ -562,6 +582,59 @@ func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.Persistent c.equivalencePodCache.InvalidatePredicates(invalidPredicates) } +func (c *configFactory) onStorageClassAdd(obj interface{}) { + sc, ok := obj.(*storagev1.StorageClass) + if !ok { + glog.Errorf("cannot convert to *storagev1.StorageClass: %v", obj) + return + } + + // CheckVolumeBindingPred fails if pod has unbound immediate PVCs. If these + // PVCs have specified StorageClass name, creating StorageClass objects + // with late binding will cause predicates to pass, so we need to move pods + // to active queue. + // We don't need to invalidate cached results because results will not be + // cached for pod that has unbound immediate PVCs. + if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer { + c.podQueue.MoveAllToActiveQueue() + } +} + +func (c *configFactory) onStorageClassDelete(obj interface{}) { + if c.enableEquivalenceClassCache { + var sc *storagev1.StorageClass + switch t := obj.(type) { + case *storagev1.StorageClass: + sc = t + case cache.DeletedFinalStateUnknown: + var ok bool + sc, ok = t.Obj.(*storagev1.StorageClass) + if !ok { + glog.Errorf("cannot convert to *storagev1.StorageClass: %v", t.Obj) + return + } + default: + glog.Errorf("cannot convert to *storagev1.StorageClass: %v", t) + return + } + c.invalidatePredicatesForStorageClass(sc) + } +} + +func (c *configFactory) invalidatePredicatesForStorageClass(sc *storagev1.StorageClass) { + invalidPredicates := sets.NewString() + + if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { + if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer { + // Delete can cause predicates to fail + invalidPredicates.Insert(predicates.CheckVolumeBindingPred) + invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred) + } + } + + c.equivalencePodCache.InvalidatePredicates(invalidPredicates) +} + func (c *configFactory) onServiceAdd(obj interface{}) { if c.enableEquivalenceClassCache { c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet)