mirror of https://github.com/k3s-io/k3s
Retry scheduling on various events.
parent
c861ceb41a
commit
8f0373792f
|
@ -151,7 +151,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume
|
||||||
|
|
||||||
// Immediate claims should be bound
|
// Immediate claims should be bound
|
||||||
if len(unboundClaimsImmediate) > 0 {
|
if len(unboundClaimsImmediate) > 0 {
|
||||||
return false, false, fmt.Errorf("pod has unbound PersistentVolumeClaims")
|
return false, false, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check PV node affinity on bound volumes
|
// Check PV node affinity on bound volumes
|
||||||
|
|
|
@ -34,6 +34,7 @@ go_library(
|
||||||
"//pkg/scheduler/volumebinder:go_default_library",
|
"//pkg/scheduler/volumebinder:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
|
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/api/storage/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
|
||||||
|
|
|
@ -29,6 +29,7 @@ import (
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
"k8s.io/api/policy/v1beta1"
|
"k8s.io/api/policy/v1beta1"
|
||||||
|
storagev1 "k8s.io/api/storage/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
|
@ -300,6 +301,13 @@ func NewConfigFactory(
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
|
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
|
||||||
// Setup volume binder
|
// Setup volume binder
|
||||||
c.volumeBinder = volumebinder.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer)
|
c.volumeBinder = volumebinder.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer)
|
||||||
|
|
||||||
|
storageClassInformer.Informer().AddEventHandler(
|
||||||
|
cache.ResourceEventHandlerFuncs{
|
||||||
|
AddFunc: c.onStorageClassAdd,
|
||||||
|
DeleteFunc: c.onStorageClassDelete,
|
||||||
|
},
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Setup cache comparer
|
// Setup cache comparer
|
||||||
|
@ -384,6 +392,13 @@ func (c *configFactory) onPvAdd(obj interface{}) {
|
||||||
}
|
}
|
||||||
c.invalidatePredicatesForPv(pv)
|
c.invalidatePredicatesForPv(pv)
|
||||||
}
|
}
|
||||||
|
// Pods created when there are no PVs available will be stuck in
|
||||||
|
// unschedulable queue. But unbound PVs created for static provisioning and
|
||||||
|
// delay binding storage class are skipped in PV controller dynamic
|
||||||
|
// provisiong and binding process, will not trigger events to schedule pod
|
||||||
|
// again. So we need to move pods to active queue on PV add for this
|
||||||
|
// scenario.
|
||||||
|
c.podQueue.MoveAllToActiveQueue()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configFactory) onPvUpdate(old, new interface{}) {
|
func (c *configFactory) onPvUpdate(old, new interface{}) {
|
||||||
|
@ -400,6 +415,11 @@ func (c *configFactory) onPvUpdate(old, new interface{}) {
|
||||||
}
|
}
|
||||||
c.invalidatePredicatesForPvUpdate(oldPV, newPV)
|
c.invalidatePredicatesForPvUpdate(oldPV, newPV)
|
||||||
}
|
}
|
||||||
|
// Scheduler.bindVolumesWorker may fail to update assumed pod volume
|
||||||
|
// bindings due to conflicts if PVs are updated by PV controller or other
|
||||||
|
// parties, then scheduler will add pod back to unschedulable queue. We
|
||||||
|
// need to move pods to active queue on PV update for this scenario.
|
||||||
|
c.podQueue.MoveAllToActiveQueue()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.PersistentVolume) {
|
func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.PersistentVolume) {
|
||||||
|
@ -562,6 +582,59 @@ func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.Persistent
|
||||||
c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
|
c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *configFactory) onStorageClassAdd(obj interface{}) {
|
||||||
|
sc, ok := obj.(*storagev1.StorageClass)
|
||||||
|
if !ok {
|
||||||
|
glog.Errorf("cannot convert to *storagev1.StorageClass: %v", obj)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// CheckVolumeBindingPred fails if pod has unbound immediate PVCs. If these
|
||||||
|
// PVCs have specified StorageClass name, creating StorageClass objects
|
||||||
|
// with late binding will cause predicates to pass, so we need to move pods
|
||||||
|
// to active queue.
|
||||||
|
// We don't need to invalidate cached results because results will not be
|
||||||
|
// cached for pod that has unbound immediate PVCs.
|
||||||
|
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
|
||||||
|
c.podQueue.MoveAllToActiveQueue()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *configFactory) onStorageClassDelete(obj interface{}) {
|
||||||
|
if c.enableEquivalenceClassCache {
|
||||||
|
var sc *storagev1.StorageClass
|
||||||
|
switch t := obj.(type) {
|
||||||
|
case *storagev1.StorageClass:
|
||||||
|
sc = t
|
||||||
|
case cache.DeletedFinalStateUnknown:
|
||||||
|
var ok bool
|
||||||
|
sc, ok = t.Obj.(*storagev1.StorageClass)
|
||||||
|
if !ok {
|
||||||
|
glog.Errorf("cannot convert to *storagev1.StorageClass: %v", t.Obj)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
glog.Errorf("cannot convert to *storagev1.StorageClass: %v", t)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.invalidatePredicatesForStorageClass(sc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *configFactory) invalidatePredicatesForStorageClass(sc *storagev1.StorageClass) {
|
||||||
|
invalidPredicates := sets.NewString()
|
||||||
|
|
||||||
|
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
|
||||||
|
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
|
||||||
|
// Delete can cause predicates to fail
|
||||||
|
invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
|
||||||
|
invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *configFactory) onServiceAdd(obj interface{}) {
|
func (c *configFactory) onServiceAdd(obj interface{}) {
|
||||||
if c.enableEquivalenceClassCache {
|
if c.enableEquivalenceClassCache {
|
||||||
c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet)
|
c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet)
|
||||||
|
|
Loading…
Reference in New Issue