mirror of https://github.com/k3s-io/k3s

Merge pull request #69504 from krmayankk/refactor-sched1

Move informer event handlers to scheduler

commit 2e01637f86
@@ -3,12 +3,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
    name = "go_default_library",
    srcs = [
        "eventhandlers.go",
        "scheduler.go",
        "testutil.go",
    ],
    importpath = "k8s.io/kubernetes/pkg/scheduler",
    visibility = ["//visibility:public"],
    deps = [
        "//pkg/kubelet/apis:go_default_library",
        "//pkg/scheduler/algorithm:go_default_library",
        "//pkg/scheduler/algorithm/predicates:go_default_library",
        "//pkg/scheduler/api:go_default_library",

@@ -22,8 +24,10 @@ go_library(
        "//pkg/scheduler/plugins/v1alpha1:go_default_library",
        "//pkg/scheduler/util:go_default_library",
        "//staging/src/k8s.io/api/core/v1:go_default_library",
        "//staging/src/k8s.io/api/storage/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
        "//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",

@@ -32,6 +36,7 @@ go_library(
        "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
        "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
        "//staging/src/k8s.io/client-go/tools/record:go_default_library",
        "//vendor/k8s.io/klog:go_default_library",
    ],

@@ -39,7 +44,10 @@ go_library(

go_test(
    name = "go_default_test",
    srcs = ["scheduler_test.go"],
    srcs = [
        "eventhandlers_test.go",
        "scheduler_test.go",
    ],
    embed = [":go_default_library"],
    deps = [
        "//pkg/api/legacyscheme:go_default_library",

@@ -53,6 +61,7 @@ go_test(
        "//pkg/scheduler/factory:go_default_library",
        "//pkg/scheduler/internal/cache:go_default_library",
        "//pkg/scheduler/internal/cache/fake:go_default_library",
        "//pkg/scheduler/internal/queue:go_default_library",
        "//pkg/scheduler/nodeinfo:go_default_library",
        "//pkg/scheduler/volumebinder:go_default_library",
        "//staging/src/k8s.io/api/core/v1:go_default_library",

@@ -88,6 +97,7 @@ filegroup(
        "//pkg/scheduler/api:all-srcs",
        "//pkg/scheduler/apis/config:all-srcs",
        "//pkg/scheduler/core:all-srcs",
        "//pkg/scheduler/equivalence:all-srcs",
        "//pkg/scheduler/factory:all-srcs",
        "//pkg/scheduler/internal/cache:all-srcs",
        "//pkg/scheduler/internal/queue:all-srcs",

@@ -0,0 +1,13 @@
filegroup(
    name = "package-srcs",
    srcs = glob(["**"]),
    tags = ["automanaged"],
    visibility = ["//visibility:private"],
)

filegroup(
    name = "all-srcs",
    srcs = [":package-srcs"],
    tags = ["automanaged"],
    visibility = ["//visibility:public"],
)
@@ -0,0 +1,481 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
    "fmt"
    "k8s.io/klog"
    "reflect"

    "k8s.io/api/core/v1"
    storagev1 "k8s.io/api/storage/v1"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    appsinformers "k8s.io/client-go/informers/apps/v1"
    coreinformers "k8s.io/client-go/informers/core/v1"
    policyinformers "k8s.io/client-go/informers/policy/v1beta1"
    storageinformers "k8s.io/client-go/informers/storage/v1"
    "k8s.io/client-go/tools/cache"
    kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
)

func (sched *Scheduler) onPvAdd(obj interface{}) {
    // Pods created when there are no PVs available will be stuck in
    // unschedulable queue. But unbound PVs created for static provisioning and
    // delay binding storage class are skipped in PV controller dynamic
    // provisioning and binding process, will not trigger events to schedule pod
    // again. So we need to move pods to active queue on PV add for this
    // scenario.
    sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

func (sched *Scheduler) onPvUpdate(old, new interface{}) {
    // Scheduler.bindVolumesWorker may fail to update assumed pod volume
    // bindings due to conflicts if PVs are updated by PV controller or other
    // parties, then scheduler will add pod back to unschedulable queue. We
    // need to move pods to active queue on PV update for this scenario.
    sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

func (sched *Scheduler) onPvcAdd(obj interface{}) {
    sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

func (sched *Scheduler) onPvcUpdate(old, new interface{}) {
    sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
    sc, ok := obj.(*storagev1.StorageClass)
    if !ok {
        klog.Errorf("cannot convert to *storagev1.StorageClass: %v", obj)
        return
    }

    // CheckVolumeBindingPred fails if pod has unbound immediate PVCs. If these
    // PVCs have specified StorageClass name, creating StorageClass objects
    // with late binding will cause predicates to pass, so we need to move pods
    // to active queue.
    // We don't need to invalidate cached results because results will not be
    // cached for pod that has unbound immediate PVCs.
    if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
        sched.config.SchedulingQueue.MoveAllToActiveQueue()
    }
}

func (sched *Scheduler) onServiceAdd(obj interface{}) {
    sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

func (sched *Scheduler) onServiceUpdate(oldObj interface{}, newObj interface{}) {
    sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

func (sched *Scheduler) onServiceDelete(obj interface{}) {
    sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

func (sched *Scheduler) addNodeToCache(obj interface{}) {
    node, ok := obj.(*v1.Node)
    if !ok {
        klog.Errorf("cannot convert to *v1.Node: %v", obj)
        return
    }

    if err := sched.config.SchedulerCache.AddNode(node); err != nil {
        klog.Errorf("scheduler cache AddNode failed: %v", err)
    }

    sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
    oldNode, ok := oldObj.(*v1.Node)
    if !ok {
        klog.Errorf("cannot convert oldObj to *v1.Node: %v", oldObj)
        return
    }
    newNode, ok := newObj.(*v1.Node)
    if !ok {
        klog.Errorf("cannot convert newObj to *v1.Node: %v", newObj)
        return
    }

    if err := sched.config.SchedulerCache.UpdateNode(oldNode, newNode); err != nil {
        klog.Errorf("scheduler cache UpdateNode failed: %v", err)
    }

    // Only activate unschedulable pods if the node became more schedulable.
    // We skip the node property comparison when there is no unschedulable pods in the queue
    // to save processing cycles. We still trigger a move to active queue to cover the case
    // that a pod being processed by the scheduler is determined unschedulable. We want this
    // pod to be reevaluated when a change in the cluster happens.
    if sched.config.SchedulingQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) {
        sched.config.SchedulingQueue.MoveAllToActiveQueue()
    }
}

func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
    var node *v1.Node
    switch t := obj.(type) {
    case *v1.Node:
        node = t
    case cache.DeletedFinalStateUnknown:
        var ok bool
        node, ok = t.Obj.(*v1.Node)
        if !ok {
            klog.Errorf("cannot convert to *v1.Node: %v", t.Obj)
            return
        }
    default:
        klog.Errorf("cannot convert to *v1.Node: %v", t)
        return
    }
    // NOTE: Updates must be written to scheduler cache before invalidating
    // equivalence cache, because we could snapshot equivalence cache after the
    // invalidation and then snapshot the cache itself. If the cache is
    // snapshotted before updates are written, we would update equivalence
    // cache with stale information which is based on snapshot of old cache.
    if err := sched.config.SchedulerCache.RemoveNode(node); err != nil {
        klog.Errorf("scheduler cache RemoveNode failed: %v", err)
    }
}
func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
    if err := sched.config.SchedulingQueue.Add(obj.(*v1.Pod)); err != nil {
        utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
    }
}

func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
    pod := newObj.(*v1.Pod)
    if sched.skipPodUpdate(pod) {
        return
    }
    if err := sched.config.SchedulingQueue.Update(oldObj.(*v1.Pod), pod); err != nil {
        utilruntime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
    }
}

func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
    var pod *v1.Pod
    switch t := obj.(type) {
    case *v1.Pod:
        pod = obj.(*v1.Pod)
    case cache.DeletedFinalStateUnknown:
        var ok bool
        pod, ok = t.Obj.(*v1.Pod)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
            return
        }
    default:
        utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
        return
    }
    if err := sched.config.SchedulingQueue.Delete(pod); err != nil {
        utilruntime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
    }
    if sched.config.VolumeBinder != nil {
        // Volume binder only wants to keep unassigned pods
        sched.config.VolumeBinder.DeletePodBindings(pod)
    }
}

func (sched *Scheduler) addPodToCache(obj interface{}) {
    pod, ok := obj.(*v1.Pod)
    if !ok {
        klog.Errorf("cannot convert to *v1.Pod: %v", obj)
        return
    }

    if err := sched.config.SchedulerCache.AddPod(pod); err != nil {
        klog.Errorf("scheduler cache AddPod failed: %v", err)
    }

    sched.config.SchedulingQueue.AssignedPodAdded(pod)
}

func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
    oldPod, ok := oldObj.(*v1.Pod)
    if !ok {
        klog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj)
        return
    }
    newPod, ok := newObj.(*v1.Pod)
    if !ok {
        klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj)
        return
    }

    // NOTE: Updates must be written to scheduler cache before invalidating
    // equivalence cache, because we could snapshot equivalence cache after the
    // invalidation and then snapshot the cache itself. If the cache is
    // snapshotted before updates are written, we would update equivalence
    // cache with stale information which is based on snapshot of old cache.
    if err := sched.config.SchedulerCache.UpdatePod(oldPod, newPod); err != nil {
        klog.Errorf("scheduler cache UpdatePod failed: %v", err)
    }

    sched.config.SchedulingQueue.AssignedPodUpdated(newPod)
}

func (sched *Scheduler) deletePodFromCache(obj interface{}) {
    var pod *v1.Pod
    switch t := obj.(type) {
    case *v1.Pod:
        pod = t
    case cache.DeletedFinalStateUnknown:
        var ok bool
        pod, ok = t.Obj.(*v1.Pod)
        if !ok {
            klog.Errorf("cannot convert to *v1.Pod: %v", t.Obj)
            return
        }
    default:
        klog.Errorf("cannot convert to *v1.Pod: %v", t)
        return
    }
    // NOTE: Updates must be written to scheduler cache before invalidating
    // equivalence cache, because we could snapshot equivalence cache after the
    // invalidation and then snapshot the cache itself. If the cache is
    // snapshotted before updates are written, we would update equivalence
    // cache with stale information which is based on snapshot of old cache.
    if err := sched.config.SchedulerCache.RemovePod(pod); err != nil {
        klog.Errorf("scheduler cache RemovePod failed: %v", err)
    }

    sched.config.SchedulingQueue.MoveAllToActiveQueue()
}

// assignedPod selects pods that are assigned (scheduled and running).
func assignedPod(pod *v1.Pod) bool {
    return len(pod.Spec.NodeName) != 0
}

// responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler.
func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
    return schedulerName == pod.Spec.SchedulerName
}

// isZoneRegionLabel checks if the given label key is a zone or region label.
func isZoneRegionLabel(k string) bool {
    return k == kubeletapis.LabelZoneFailureDomain || k == kubeletapis.LabelZoneRegion
}

// skipPodUpdate checks whether the specified pod update should be ignored.
// This function will return true if
//   - The pod has already been assumed, AND
//   - The pod has only its ResourceVersion, Spec.NodeName and/or Annotations
//     updated.
func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool {
    // Non-assumed pods should never be skipped.
    isAssumed, err := sched.config.SchedulerCache.IsAssumedPod(pod)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
        return false
    }
    if !isAssumed {
        return false
    }

    // Gets the assumed pod from the cache.
    assumedPod, err := sched.config.SchedulerCache.GetPod(pod)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err))
        return false
    }

    // Compares the assumed pod in the cache with the pod update. If they are
    // equal (with certain fields excluded), this pod update will be skipped.
    f := func(pod *v1.Pod) *v1.Pod {
        p := pod.DeepCopy()
        // ResourceVersion must be excluded because each object update will
        // have a new resource version.
        p.ResourceVersion = ""
        // Spec.NodeName must be excluded because the pod assumed in the cache
        // is expected to have a node assigned while the pod update may or may
        // not have this field set.
        p.Spec.NodeName = ""
        // Annotations must be excluded for the reasons described in
        // https://github.com/kubernetes/kubernetes/issues/52914.
        p.Annotations = nil
        return p
    }
    assumedPodCopy, podCopy := f(assumedPod), f(pod)
    if !reflect.DeepEqual(assumedPodCopy, podCopy) {
        return false
    }
    klog.V(3).Infof("Skipping pod %s/%s update", pod.Namespace, pod.Name)
    return true
}

// AddAllEventHandlers is a helper function used in tests and in Scheduler
// to add event handlers for various informers.
func AddAllEventHandlers(
    sched *Scheduler,
    schedulerName string,
    nodeInformer coreinformers.NodeInformer,
    podInformer coreinformers.PodInformer,
    pvInformer coreinformers.PersistentVolumeInformer,
    pvcInformer coreinformers.PersistentVolumeClaimInformer,
    replicationControllerInformer coreinformers.ReplicationControllerInformer,
    replicaSetInformer appsinformers.ReplicaSetInformer,
    statefulSetInformer appsinformers.StatefulSetInformer,
    serviceInformer coreinformers.ServiceInformer,
    pdbInformer policyinformers.PodDisruptionBudgetInformer,
    storageClassInformer storageinformers.StorageClassInformer,
) {
    // scheduled pod cache
    podInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return assignedPod(t)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return assignedPod(pod)
                    }
                    utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
                    return false
                default:
                    utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    sched.addPodToCache,
                UpdateFunc: sched.updatePodInCache,
                DeleteFunc: sched.deletePodFromCache,
            },
        },
    )
    // unscheduled pod queue
    podInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return !assignedPod(t) && responsibleForPod(t, schedulerName)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return !assignedPod(pod) && responsibleForPod(pod, schedulerName)
                    }
                    utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
                    return false
                default:
                    utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    sched.addPodToSchedulingQueue,
                UpdateFunc: sched.updatePodInSchedulingQueue,
                DeleteFunc: sched.deletePodFromSchedulingQueue,
            },
        },
    )

    nodeInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    sched.addNodeToCache,
            UpdateFunc: sched.updateNodeInCache,
            DeleteFunc: sched.deleteNodeFromCache,
        },
    )

    // On add and delete of PVs, it will affect equivalence cache items
    // related to persistent volume
    pvInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            // MaxPDVolumeCountPredicate: since it relies on the counts of PV.
            AddFunc:    sched.onPvAdd,
            UpdateFunc: sched.onPvUpdate,
        },
    )

    // This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
    pvcInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    sched.onPvcAdd,
            UpdateFunc: sched.onPvcUpdate,
        },
    )

    // This is for ServiceAffinity: affected when the selector of the service is updated.
    // Also, if new service is added, equivalence cache will also become invalid since
    // existing pods may be "captured" by this service and change this predicate result.
    serviceInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    sched.onServiceAdd,
            UpdateFunc: sched.onServiceUpdate,
            DeleteFunc: sched.onServiceDelete,
        },
    )

    storageClassInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc: sched.onStorageClassAdd,
        },
    )
}

func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool {
    if nodeSpecUnschedulableChanged(newNode, oldNode) {
        return true
    }
    if nodeAllocatableChanged(newNode, oldNode) {
        return true
    }
    if nodeLabelsChanged(newNode, oldNode) {
        return true
    }
    if nodeTaintsChanged(newNode, oldNode) {
        return true
    }
    if nodeConditionsChanged(newNode, oldNode) {
        return true
    }

    return false
}

func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
    return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable)
}

func nodeLabelsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
    return !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels())
}

func nodeTaintsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
    return !reflect.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints)
}

func nodeConditionsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
    strip := func(conditions []v1.NodeCondition) map[v1.NodeConditionType]v1.ConditionStatus {
        conditionStatuses := make(map[v1.NodeConditionType]v1.ConditionStatus, len(conditions))
        for i := range conditions {
            conditionStatuses[conditions[i].Type] = conditions[i].Status
        }
        return conditionStatuses
    }
    return !reflect.DeepEqual(strip(oldNode.Status.Conditions), strip(newNode.Status.Conditions))
}

func nodeSpecUnschedulableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
    return newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && newNode.Spec.Unschedulable == false
}
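The handlers above only take effect once a caller passes the individual informers to the exported AddAllEventHandlers. As a rough illustration of what that wiring could look like, here is a minimal sketch that pulls every informer from a client-go SharedInformerFactory. The helper name wireHandlers, the resync period, and the assumption that it sits in the same scheduler package (so Scheduler and AddAllEventHandlers are in scope) are illustrative and not part of this commit.

package scheduler

import (
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

// wireHandlers is a hypothetical helper (not in this PR) showing one way the
// exported AddAllEventHandlers could be fed from a shared informer factory.
func wireHandlers(cfg *rest.Config, sched *Scheduler, stopCh <-chan struct{}) error {
    client, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        return err
    }
    // The resync period is an arbitrary choice for this sketch.
    informerFactory := informers.NewSharedInformerFactory(client, 30*time.Second)

    AddAllEventHandlers(
        sched,
        "default-scheduler", // schedulerName consulted by responsibleForPod
        informerFactory.Core().V1().Nodes(),
        informerFactory.Core().V1().Pods(),
        informerFactory.Core().V1().PersistentVolumes(),
        informerFactory.Core().V1().PersistentVolumeClaims(),
        informerFactory.Core().V1().ReplicationControllers(),
        informerFactory.Apps().V1().ReplicaSets(),
        informerFactory.Apps().V1().StatefulSets(),
        informerFactory.Core().V1().Services(),
        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
        informerFactory.Storage().V1().StorageClasses(),
    )

    // Start the informers only after the handlers are registered so no events are missed.
    informerFactory.Start(stopCh)
    return nil
}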
@@ -0,0 +1,265 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
    "reflect"
    "testing"

    "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/kubernetes/pkg/scheduler/factory"

    fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
)

func TestSkipPodUpdate(t *testing.T) {
    table := []struct {
        pod              *v1.Pod
        isAssumedPodFunc func(*v1.Pod) bool
        getPodFunc       func(*v1.Pod) *v1.Pod
        expected         bool
        name             string
    }{
        {
            name: "Non-assumed pod",
            pod: &v1.Pod{
                ObjectMeta: metav1.ObjectMeta{
                    Name: "pod-0",
                },
            },
            isAssumedPodFunc: func(*v1.Pod) bool { return false },
            getPodFunc: func(*v1.Pod) *v1.Pod {
                return &v1.Pod{
                    ObjectMeta: metav1.ObjectMeta{
                        Name: "pod-0",
                    },
                }
            },
            expected: false,
        },
        {
            name: "with changes on ResourceVersion, Spec.NodeName and/or Annotations",
            pod: &v1.Pod{
                ObjectMeta: metav1.ObjectMeta{
                    Name:            "pod-0",
                    Annotations:     map[string]string{"a": "b"},
                    ResourceVersion: "0",
                },
                Spec: v1.PodSpec{
                    NodeName: "node-0",
                },
            },
            isAssumedPodFunc: func(*v1.Pod) bool {
                return true
            },
            getPodFunc: func(*v1.Pod) *v1.Pod {
                return &v1.Pod{
                    ObjectMeta: metav1.ObjectMeta{
                        Name:            "pod-0",
                        Annotations:     map[string]string{"c": "d"},
                        ResourceVersion: "1",
                    },
                    Spec: v1.PodSpec{
                        NodeName: "node-1",
                    },
                }
            },
            expected: true,
        },
        {
            name: "with changes on Labels",
            pod: &v1.Pod{
                ObjectMeta: metav1.ObjectMeta{
                    Name:   "pod-0",
                    Labels: map[string]string{"a": "b"},
                },
            },
            isAssumedPodFunc: func(*v1.Pod) bool {
                return true
            },
            getPodFunc: func(*v1.Pod) *v1.Pod {
                return &v1.Pod{
                    ObjectMeta: metav1.ObjectMeta{
                        Name:   "pod-0",
                        Labels: map[string]string{"c": "d"},
                    },
                }
            },
            expected: false,
        },
    }
    for _, test := range table {
        t.Run(test.name, func(t *testing.T) {
            c := NewFromConfig(&factory.Config{
                SchedulerCache: &fakecache.Cache{
                    IsAssumedPodFunc: test.isAssumedPodFunc,
                    GetPodFunc:       test.getPodFunc,
                },
            },
            )
            got := c.skipPodUpdate(test.pod)
            if got != test.expected {
                t.Errorf("skipPodUpdate() = %t, expected = %t", got, test.expected)
            }
        })
    }
}

func TestNodeAllocatableChanged(t *testing.T) {
    newQuantity := func(value int64) resource.Quantity {
        return *resource.NewQuantity(value, resource.BinarySI)
    }
    for _, c := range []struct {
        Name           string
        Changed        bool
        OldAllocatable v1.ResourceList
        NewAllocatable v1.ResourceList
    }{
        {
            Name:           "no allocatable resources changed",
            Changed:        false,
            OldAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
            NewAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
        },
        {
            Name:           "new node has more allocatable resources",
            Changed:        true,
            OldAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
            NewAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024), v1.ResourceStorage: newQuantity(1024)},
        },
    } {
        oldNode := &v1.Node{Status: v1.NodeStatus{Allocatable: c.OldAllocatable}}
        newNode := &v1.Node{Status: v1.NodeStatus{Allocatable: c.NewAllocatable}}
        changed := nodeAllocatableChanged(newNode, oldNode)
        if changed != c.Changed {
            t.Errorf("nodeAllocatableChanged should be %t, got %t", c.Changed, changed)
        }
    }
}

func TestNodeLabelsChanged(t *testing.T) {
    for _, c := range []struct {
        Name      string
        Changed   bool
        OldLabels map[string]string
        NewLabels map[string]string
    }{
        {
            Name:      "no labels changed",
            Changed:   false,
            OldLabels: map[string]string{"foo": "bar"},
            NewLabels: map[string]string{"foo": "bar"},
        },
        // Labels changed.
        {
            Name:      "new node has more labels",
            Changed:   true,
            OldLabels: map[string]string{"foo": "bar"},
            NewLabels: map[string]string{"foo": "bar", "test": "value"},
        },
    } {
        oldNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: c.OldLabels}}
        newNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: c.NewLabels}}
        changed := nodeLabelsChanged(newNode, oldNode)
        if changed != c.Changed {
            t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed)
        }
    }
}

func TestNodeTaintsChanged(t *testing.T) {
    for _, c := range []struct {
        Name      string
        Changed   bool
        OldTaints []v1.Taint
        NewTaints []v1.Taint
    }{
        {
            Name:      "no taint changed",
            Changed:   false,
            OldTaints: []v1.Taint{{Key: "key", Value: "value"}},
            NewTaints: []v1.Taint{{Key: "key", Value: "value"}},
        },
        {
            Name:      "taint value changed",
            Changed:   true,
            OldTaints: []v1.Taint{{Key: "key", Value: "value1"}},
            NewTaints: []v1.Taint{{Key: "key", Value: "value2"}},
        },
    } {
        oldNode := &v1.Node{Spec: v1.NodeSpec{Taints: c.OldTaints}}
        newNode := &v1.Node{Spec: v1.NodeSpec{Taints: c.NewTaints}}
        changed := nodeTaintsChanged(newNode, oldNode)
        if changed != c.Changed {
            t.Errorf("Test case %q failed: should be %t, not %t", c.Name, c.Changed, changed)
        }
    }
}

func TestNodeConditionsChanged(t *testing.T) {
    nodeConditionType := reflect.TypeOf(v1.NodeCondition{})
    if nodeConditionType.NumField() != 6 {
        t.Errorf("NodeCondition type has changed. The nodeConditionsChanged() function must be reevaluated.")
    }

    for _, c := range []struct {
        Name          string
        Changed       bool
        OldConditions []v1.NodeCondition
        NewConditions []v1.NodeCondition
    }{
        {
            Name:          "no condition changed",
            Changed:       false,
            OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
            NewConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
        },
        {
            Name:          "only LastHeartbeatTime changed",
            Changed:       false,
            OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(1, 0)}},
            NewConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(2, 0)}},
        },
        {
            Name:          "new node has more healthy conditions",
            Changed:       true,
            OldConditions: []v1.NodeCondition{},
            NewConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
        },
        {
            Name:          "new node has less unhealthy conditions",
            Changed:       true,
            OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
            NewConditions: []v1.NodeCondition{},
        },
        {
            Name:          "condition status changed",
            Changed:       true,
            OldConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}},
            NewConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
        },
    } {
        oldNode := &v1.Node{Status: v1.NodeStatus{Conditions: c.OldConditions}}
        newNode := &v1.Node{Status: v1.NodeStatus{Conditions: c.NewConditions}}
        changed := nodeConditionsChanged(newNode, oldNode)
        if changed != c.Changed {
            t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed)
        }
    }
}
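The table tests above cover the allocatable, label, taint, and condition comparisons but not nodeSpecUnschedulableChanged. The short sketch below is an addition for illustration, not part of the commit; it assumes it sits next to the tests above and spells out the behavior that function encodes: only the unschedulable-to-schedulable transition counts as a change, so cordoning a node does not flush the unschedulable queue.

// Hypothetical companion test (not in this PR), same package as the tests above.
func TestNodeSpecUnschedulableChangedSketch(t *testing.T) {
    cordoned := &v1.Node{Spec: v1.NodeSpec{Unschedulable: true}}
    schedulable := &v1.Node{Spec: v1.NodeSpec{Unschedulable: false}}

    // Uncordoning (unschedulable -> schedulable) should be reported as a change.
    if !nodeSpecUnschedulableChanged(schedulable, cordoned) {
        t.Errorf("expected a change when the node becomes schedulable")
    }
    // Cordoning does not make any pending pod more schedulable, so it is ignored.
    if nodeSpecUnschedulableChanged(cordoned, schedulable) {
        t.Errorf("did not expect a change when the node is cordoned")
    }
}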
@@ -10,7 +10,6 @@ go_library(
    visibility = ["//visibility:public"],
    deps = [
        "//pkg/api/v1/pod:go_default_library",
        "//pkg/kubelet/apis:go_default_library",
        "//pkg/scheduler/algorithm:go_default_library",
        "//pkg/scheduler/algorithm/predicates:go_default_library",
        "//pkg/scheduler/algorithm/priorities:go_default_library",

@@ -25,7 +24,6 @@ go_library(
        "//pkg/scheduler/util:go_default_library",
        "//pkg/scheduler/volumebinder:go_default_library",
        "//staging/src/k8s.io/api/core/v1:go_default_library",
        "//staging/src/k8s.io/api/storage/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",

@@ -65,12 +63,10 @@ go_test(
        "//pkg/scheduler/api:go_default_library",
        "//pkg/scheduler/api/latest:go_default_library",
        "//pkg/scheduler/internal/cache:go_default_library",
        "//pkg/scheduler/internal/cache/fake:go_default_library",
        "//pkg/scheduler/internal/queue:go_default_library",
        "//pkg/scheduler/nodeinfo:go_default_library",
        "//pkg/scheduler/util:go_default_library",
        "//staging/src/k8s.io/api/core/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
@@ -20,11 +20,9 @@ package factory

import (
    "fmt"
    "reflect"
    "time"

    v1 "k8s.io/api/core/v1"
    storagev1 "k8s.io/api/storage/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/fields"

@@ -47,7 +45,6 @@ import (
    "k8s.io/client-go/tools/record"
    "k8s.io/klog"
    podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
    "k8s.io/kubernetes/pkg/scheduler/algorithm"
    "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
    "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"

@@ -68,14 +65,6 @@ const (
    maximalGetBackoff = time.Minute
)

var (
    serviceAffinitySet            = sets.NewString(predicates.CheckServiceAffinityPred)
    matchInterPodAffinitySet      = sets.NewString(predicates.MatchInterPodAffinityPred)
    generalPredicatesSets         = sets.NewString(predicates.GeneralPred)
    noDiskConflictSet             = sets.NewString(predicates.NoDiskConflictPred)
    maxPDVolumeCountPredicateKeys = []string{predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred, predicates.MaxEBSVolumeCountPred, predicates.MaxCinderVolumeCountPred}
)

// Binder knows how to write a binding.
type Binder interface {
    Bind(binding *v1.Binding) error

@@ -172,8 +161,6 @@ type Configurator interface {
// configFactory is the default implementation of the scheduler.Configurator interface.
type configFactory struct {
    client clientset.Interface
    // queue for pods that need scheduling
    podQueue internalqueue.SchedulingQueue
    // a means to list all known scheduled pods.
    scheduledPodLister corelisters.PodLister
    // a means to list all known scheduled pods and pods assumed to have been scheduled.

@@ -226,6 +213,10 @@ type configFactory struct {

    // percentageOfNodesToScore specifies percentage of all nodes to score in each scheduling cycle.
    percentageOfNodesToScore int32

    bindTimeoutSeconds int64
    // queue for pods that need scheduling
    podQueue internalqueue.SchedulingQueue
}

// ConfigFactoryArgs is a set of arguments passed to NewConfigFactory.

@@ -282,105 +273,15 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
        hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
        disablePreemption:              args.DisablePreemption,
        percentageOfNodesToScore:       args.PercentageOfNodesToScore,
        bindTimeoutSeconds:             args.BindTimeoutSeconds,
    }

    // Setup volume binder
    c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
    c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
    // scheduled pod cache
    args.PodInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return assignedPod(t)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return assignedPod(pod)
                    }
                    runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod for filtering scheduledPod in %T", obj, c))
                    return false
                default:
                    runtime.HandleError(fmt.Errorf("unable to handle object for filtering scheduledPod in %T: %T", c, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addPodToCache,
                UpdateFunc: c.updatePodInCache,
                DeleteFunc: c.deletePodFromCache,
            },
        },
    )
    // unscheduled pod queue
    args.PodInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return !assignedPod(t) && responsibleForPod(t, args.SchedulerName)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return !assignedPod(pod) && responsibleForPod(pod, args.SchedulerName)
                    }
                    runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod for filtering unscheduledPod in %T", obj, c))
                    return false
                default:
                    runtime.HandleError(fmt.Errorf("unable to handle object for filtering unscheduledPod in %T: %T", c, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addPodToSchedulingQueue,
                UpdateFunc: c.updatePodInSchedulingQueue,
                DeleteFunc: c.deletePodFromSchedulingQueue,
            },
        },
    )
    // ScheduledPodLister is something we provide to plug-in functions that
    // they may need to call.
    c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}

    args.NodeInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    c.addNodeToCache,
            UpdateFunc: c.updateNodeInCache,
            DeleteFunc: c.deleteNodeFromCache,
        },
    )

    args.PvInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            // MaxPDVolumeCountPredicate: since it relies on the counts of PV.
            AddFunc:    c.onPvAdd,
            UpdateFunc: c.onPvUpdate,
        },
    )

    // This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
    args.PvcInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    c.onPvcAdd,
            UpdateFunc: c.onPvcUpdate,
        },
    )

    // This is for ServiceAffinity: affected when the selector of the service is updated.
    args.ServiceInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    c.onServiceAdd,
            UpdateFunc: c.onServiceUpdate,
            DeleteFunc: c.onServiceDelete,
        },
    )

    // Setup volume binder
    c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)

    args.StorageClassInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc: c.onStorageClassAdd,
        },
    )

    // Setup cache debugger
    debugger := cachedebugger.New(
        args.NodeInformer.Lister(),

@@ -394,116 +295,9 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
        <-c.StopEverything
        c.podQueue.Close()
    }()

    return c
}

// skipPodUpdate checks whether the specified pod update should be ignored.
// This function will return true if
//   - The pod has already been assumed, AND
//   - The pod has only its ResourceVersion, Spec.NodeName and/or Annotations
//     updated.
func (c *configFactory) skipPodUpdate(pod *v1.Pod) bool {
    // Non-assumed pods should never be skipped.
    isAssumed, err := c.schedulerCache.IsAssumedPod(pod)
    if err != nil {
        runtime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
        return false
    }
    if !isAssumed {
        return false
    }

    // Gets the assumed pod from the cache.
    assumedPod, err := c.schedulerCache.GetPod(pod)
    if err != nil {
        runtime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err))
        return false
    }

    // Compares the assumed pod in the cache with the pod update. If they are
    // equal (with certain fields excluded), this pod update will be skipped.
    f := func(pod *v1.Pod) *v1.Pod {
        p := pod.DeepCopy()
        // ResourceVersion must be excluded because each object update will
        // have a new resource version.
        p.ResourceVersion = ""
        // Spec.NodeName must be excluded because the pod assumed in the cache
        // is expected to have a node assigned while the pod update may or may
        // not have this field set.
        p.Spec.NodeName = ""
        // Annotations must be excluded for the reasons described in
        // https://github.com/kubernetes/kubernetes/issues/52914.
        p.Annotations = nil
        return p
    }
    assumedPodCopy, podCopy := f(assumedPod), f(pod)
    if !reflect.DeepEqual(assumedPodCopy, podCopy) {
        return false
    }
    klog.V(3).Infof("Skipping pod %s/%s update", pod.Namespace, pod.Name)
    return true
}

func (c *configFactory) onPvAdd(obj interface{}) {
    // Pods created when there are no PVs available will be stuck in
    // unschedulable queue. But unbound PVs created for static provisioning and
    // delay binding storage class are skipped in PV controller dynamic
    // provisioning and binding process, will not trigger events to schedule pod
    // again. So we need to move pods to active queue on PV add for this
    // scenario.
    c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onPvUpdate(old, new interface{}) {
    // Scheduler.bindVolumesWorker may fail to update assumed pod volume
    // bindings due to conflicts if PVs are updated by PV controller or other
    // parties, then scheduler will add pod back to unschedulable queue. We
    // need to move pods to active queue on PV update for this scenario.
    c.podQueue.MoveAllToActiveQueue()
}

// isZoneRegionLabel checks if the given label key is a zone or region label.
func isZoneRegionLabel(k string) bool {
    return k == kubeletapis.LabelZoneFailureDomain || k == kubeletapis.LabelZoneRegion
}

func (c *configFactory) onPvcAdd(obj interface{}) {
    c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onPvcUpdate(old, new interface{}) {
    c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onStorageClassAdd(obj interface{}) {
    sc, ok := obj.(*storagev1.StorageClass)
    if !ok {
        klog.Errorf("cannot convert to *storagev1.StorageClass for storageClassAdd: %v", obj)
        return
    }

    // CheckVolumeBindingPred fails if pod has unbound immediate PVCs. If these
    // PVCs have specified StorageClass name, creating StorageClass objects
    // with late binding will cause predicates to pass, so we need to move pods
    // to active queue.
    if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
        c.podQueue.MoveAllToActiveQueue()
    }
}

func (c *configFactory) onServiceAdd(obj interface{}) {
    c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) {
    c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onServiceDelete(obj interface{}) {
    c.podQueue.MoveAllToActiveQueue()
}

// GetNodeStore provides the cache to the nodes, mostly internal use, but may also be called by mock-tests.
func (c *configFactory) GetNodeLister() corelisters.NodeLister {
    return c.nodeLister

@@ -527,213 +321,6 @@ func (c *configFactory) GetScheduledPodLister() corelisters.PodLister {
    return c.scheduledPodLister
}

func (c *configFactory) addPodToCache(obj interface{}) {
    pod, ok := obj.(*v1.Pod)
    if !ok {
        klog.Errorf("cannot convert to *v1.Pod: %v", obj)
        return
    }

    if err := c.schedulerCache.AddPod(pod); err != nil {
        klog.Errorf("scheduler cache AddPod failed: %v", err)
    }

    c.podQueue.AssignedPodAdded(pod)
}

func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
    oldPod, ok := oldObj.(*v1.Pod)
    if !ok {
        klog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj)
        return
    }
    newPod, ok := newObj.(*v1.Pod)
    if !ok {
        klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj)
        return
    }

    if err := c.schedulerCache.UpdatePod(oldPod, newPod); err != nil {
        klog.Errorf("scheduler cache UpdatePod failed: %v", err)
    }

    c.podQueue.AssignedPodUpdated(newPod)
}

func (c *configFactory) addPodToSchedulingQueue(obj interface{}) {
    if err := c.podQueue.Add(obj.(*v1.Pod)); err != nil {
        runtime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
    }
}

func (c *configFactory) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
    pod := newObj.(*v1.Pod)
    if c.skipPodUpdate(pod) {
        return
    }
    if err := c.podQueue.Update(oldObj.(*v1.Pod), pod); err != nil {
        runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
    }
}

func (c *configFactory) deletePodFromSchedulingQueue(obj interface{}) {
    var pod *v1.Pod
    switch t := obj.(type) {
    case *v1.Pod:
        pod = obj.(*v1.Pod)
    case cache.DeletedFinalStateUnknown:
        var ok bool
        pod, ok = t.Obj.(*v1.Pod)
        if !ok {
            runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
            return
        }
    default:
        runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
        return
    }
    if err := c.podQueue.Delete(pod); err != nil {
        runtime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
    }
    if c.volumeBinder != nil {
        // Volume binder only wants to keep unassigned pods
        c.volumeBinder.DeletePodBindings(pod)
    }
}

func (c *configFactory) deletePodFromCache(obj interface{}) {
    var pod *v1.Pod
    switch t := obj.(type) {
    case *v1.Pod:
        pod = t
    case cache.DeletedFinalStateUnknown:
        var ok bool
        pod, ok = t.Obj.(*v1.Pod)
        if !ok {
            klog.Errorf("cannot convert DeletedFinalStateUnknown obj to *v1.Pod: %v", t.Obj)
            return
        }
    default:
        klog.Errorf("cannot convert to *v1.Pod: %v", t)
        return
    }

    if err := c.schedulerCache.RemovePod(pod); err != nil {
        klog.Errorf("scheduler cache RemovePod failed: %v", err)
    }

    c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) addNodeToCache(obj interface{}) {
    node, ok := obj.(*v1.Node)
    if !ok {
        klog.Errorf("cannot convert to *v1.Node: %v", obj)
        return
    }

    if err := c.schedulerCache.AddNode(node); err != nil {
        klog.Errorf("scheduler cache AddNode failed: %v", err)
    }

    c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) {
    oldNode, ok := oldObj.(*v1.Node)
    if !ok {
        klog.Errorf("cannot convert oldObj to *v1.Node: %v", oldObj)
        return
    }
    newNode, ok := newObj.(*v1.Node)
    if !ok {
        klog.Errorf("cannot convert newObj to *v1.Node: %v", newObj)
        return
    }

    if err := c.schedulerCache.UpdateNode(oldNode, newNode); err != nil {
        klog.Errorf("scheduler cache UpdateNode failed: %v", err)
    }

    // Only activate unschedulable pods if the node became more schedulable.
    // We skip the node property comparison when there is no unschedulable pods in the queue
    // to save processing cycles. We still trigger a move to active queue to cover the case
    // that a pod being processed by the scheduler is determined unschedulable. We want this
    // pod to be reevaluated when a change in the cluster happens.
    if c.podQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) {
        c.podQueue.MoveAllToActiveQueue()
    }
}

func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool {
    if nodeSpecUnschedulableChanged(newNode, oldNode) {
        return true
    }
    if nodeAllocatableChanged(newNode, oldNode) {
        return true
    }
    if nodeLabelsChanged(newNode, oldNode) {
        return true
    }
    if nodeTaintsChanged(newNode, oldNode) {
        return true
    }
    if nodeConditionsChanged(newNode, oldNode) {
        return true
    }

    return false
}

func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
    return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable)
}

func nodeLabelsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
    return !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels())
}

func nodeTaintsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
    return !reflect.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints)
}

func nodeConditionsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
    strip := func(conditions []v1.NodeCondition) map[v1.NodeConditionType]v1.ConditionStatus {
        conditionStatuses := make(map[v1.NodeConditionType]v1.ConditionStatus, len(conditions))
        for i := range conditions {
            conditionStatuses[conditions[i].Type] = conditions[i].Status
        }
        return conditionStatuses
    }
    return !reflect.DeepEqual(strip(oldNode.Status.Conditions), strip(newNode.Status.Conditions))
}

func nodeSpecUnschedulableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
    return newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && newNode.Spec.Unschedulable == false
}

func (c *configFactory) deleteNodeFromCache(obj interface{}) {
    var node *v1.Node
    switch t := obj.(type) {
    case *v1.Node:
        node = t
    case cache.DeletedFinalStateUnknown:
        var ok bool
        node, ok = t.Obj.(*v1.Node)
        if !ok {
            klog.Errorf("cannot convert DeletedFinalStateUnknown obj to *v1.Node: %v", t.Obj)
            return
        }
    default:
        klog.Errorf("cannot convert to *v1.Node: %v", t)
        return
    }

    if err := c.schedulerCache.RemoveNode(node); err != nil {
        klog.Errorf("scheduler cache RemoveNode failed: %v", err)
    }
}

// Create creates a scheduler with the default algorithm provider.
func (c *configFactory) Create() (*Config, error) {
    return c.CreateFromProvider(DefaultProvider)

@@ -968,16 +555,6 @@ func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
    }, nil
}

// assignedPod selects pods that are assigned (scheduled and running).
func assignedPod(pod *v1.Pod) bool {
    return len(pod.Spec.NodeName) != 0
}

// responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler.
func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
    return schedulerName == pod.Spec.SchedulerName
}

// assignedPodLister filters the pods returned from a PodLister to
// only include those that have a node name set.
type assignedPodLister struct {
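Taken together, the removals above mean the factory no longer registers any informer callbacks itself; it only assembles the Config (scheduling queue, scheduler cache, volume binder, cache debugger). A plausible caller-side flow after this change, sketched from the NewFromConfig usage visible in the new test file and not spelled out in this diff, would be:

// Sketch only: exact call sites are an assumption, not part of this diff.
sched := NewFromConfig(config) // config is the *factory.Config produced by the Configurator
AddAllEventHandlers(sched, schedulerName,
    nodeInformer, podInformer, pvInformer, pvcInformer,
    replicationControllerInformer, replicaSetInformer, statefulSetInformer,
    serviceInformer, pdbInformer, storageClassInformer)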
@ -24,7 +24,6 @@ import (
|
|||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
|
@ -40,7 +39,6 @@ import (
|
|||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
|
||||
latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
|
||||
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
||||
fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
|
||||
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
|
||||
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
|
||||
"k8s.io/kubernetes/pkg/scheduler/util"
|
||||
|
@ -425,98 +423,6 @@ func TestInvalidFactoryArgs(t *testing.T) {
|
|||
|
||||
}
|
||||
|
||||
func TestSkipPodUpdate(t *testing.T) {
|
||||
table := []struct {
|
||||
pod *v1.Pod
|
||||
isAssumedPodFunc func(*v1.Pod) bool
|
||||
getPodFunc func(*v1.Pod) *v1.Pod
|
||||
expected bool
|
||||
name string
|
||||
}{
|
||||
{
|
||||
name: "Non-assumed pod",
|
||||
pod: &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-0",
|
||||
},
|
||||
},
|
||||
isAssumedPodFunc: func(*v1.Pod) bool { return false },
|
||||
getPodFunc: func(*v1.Pod) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-0",
|
||||
},
|
||||
}
|
||||
},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "with changes on ResourceVersion, Spec.NodeName and/or Annotations",
|
||||
pod: &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-0",
|
||||
Annotations: map[string]string{"a": "b"},
|
||||
ResourceVersion: "0",
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: "node-0",
|
||||
},
|
||||
},
|
||||
isAssumedPodFunc: func(*v1.Pod) bool {
|
||||
return true
|
||||
},
|
||||
getPodFunc: func(*v1.Pod) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-0",
|
||||
Annotations: map[string]string{"c": "d"},
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: "node-1",
|
||||
},
|
||||
}
|
||||
},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "with changes on Labels",
|
||||
pod: &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-0",
|
||||
Labels: map[string]string{"a": "b"},
|
||||
},
|
||||
},
|
||||
isAssumedPodFunc: func(*v1.Pod) bool {
|
||||
return true
|
||||
},
|
||||
getPodFunc: func(*v1.Pod) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-0",
|
||||
Labels: map[string]string{"c": "d"},
|
||||
},
|
||||
}
|
||||
},
|
||||
expected: false,
|
||||
},
|
||||
}
|
||||
for _, test := range table {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
c := &configFactory{
|
||||
schedulerCache: &fakecache.Cache{
|
||||
IsAssumedPodFunc: test.isAssumedPodFunc,
|
||||
GetPodFunc: test.getPodFunc,
|
||||
},
|
||||
}
|
||||
got := c.skipPodUpdate(test.pod)
|
||||
if got != test.expected {
|
||||
t.Errorf("skipPodUpdate() = %t, expected = %t", got, test.expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{}) Configurator {
|
||||
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
||||
return NewConfigFactory(&ConfigFactoryArgs{
|
||||
|
@ -654,146 +560,3 @@ func testGetBinderFunc(expectedBinderType, podName string, extenders []algorithm
|
|||
t.Errorf("Expected binder %q but got %q", expectedBinderType, binderType)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNodeAllocatableChanged(t *testing.T) {
	newQuantity := func(value int64) resource.Quantity {
		return *resource.NewQuantity(value, resource.BinarySI)
	}
	for _, c := range []struct {
		Name           string
		Changed        bool
		OldAllocatable v1.ResourceList
		NewAllocatable v1.ResourceList
	}{
		{
			Name:           "no allocatable resources changed",
			Changed:        false,
			OldAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
			NewAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
		},
		{
			Name:           "new node has more allocatable resources",
			Changed:        true,
			OldAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
			NewAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024), v1.ResourceStorage: newQuantity(1024)},
		},
	} {
		oldNode := &v1.Node{Status: v1.NodeStatus{Allocatable: c.OldAllocatable}}
		newNode := &v1.Node{Status: v1.NodeStatus{Allocatable: c.NewAllocatable}}
		changed := nodeAllocatableChanged(newNode, oldNode)
		if changed != c.Changed {
			t.Errorf("nodeAllocatableChanged should be %t, got %t", c.Changed, changed)
		}
	}
}

func TestNodeLabelsChanged(t *testing.T) {
	for _, c := range []struct {
		Name      string
		Changed   bool
		OldLabels map[string]string
		NewLabels map[string]string
	}{
		{
			Name:      "no labels changed",
			Changed:   false,
			OldLabels: map[string]string{"foo": "bar"},
			NewLabels: map[string]string{"foo": "bar"},
		},
		// Labels changed.
		{
			Name:      "new node has more labels",
			Changed:   true,
			OldLabels: map[string]string{"foo": "bar"},
			NewLabels: map[string]string{"foo": "bar", "test": "value"},
		},
	} {
		oldNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: c.OldLabels}}
		newNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: c.NewLabels}}
		changed := nodeLabelsChanged(newNode, oldNode)
		if changed != c.Changed {
			t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed)
		}
	}
}

func TestNodeTaintsChanged(t *testing.T) {
	for _, c := range []struct {
		Name      string
		Changed   bool
		OldTaints []v1.Taint
		NewTaints []v1.Taint
	}{
		{
			Name:      "no taint changed",
			Changed:   false,
			OldTaints: []v1.Taint{{Key: "key", Value: "value"}},
			NewTaints: []v1.Taint{{Key: "key", Value: "value"}},
		},
		{
			Name:      "taint value changed",
			Changed:   true,
			OldTaints: []v1.Taint{{Key: "key", Value: "value1"}},
			NewTaints: []v1.Taint{{Key: "key", Value: "value2"}},
		},
	} {
		oldNode := &v1.Node{Spec: v1.NodeSpec{Taints: c.OldTaints}}
		newNode := &v1.Node{Spec: v1.NodeSpec{Taints: c.NewTaints}}
		changed := nodeTaintsChanged(newNode, oldNode)
		if changed != c.Changed {
			t.Errorf("Test case %q failed: should be %t, not %t", c.Name, c.Changed, changed)
		}
	}
}

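The three tests above all reduce to field-scoped equality checks on the node object, which decide whether a node update is worth re-queuing unschedulable pods for. A deep comparison of the relevant field is enough for the cases they exercise; a sketch under that assumption (function names are illustrative, not necessarily the exact helpers in eventhandlers.go):

package main

import (
	"fmt"
	"reflect"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// nodeAllocatableChangedSketch reports whether Status.Allocatable differs
// between the old and new node objects.
func nodeAllocatableChangedSketch(newNode, oldNode *v1.Node) bool {
	return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable)
}

// nodeLabelsChangedSketch reports whether the node's labels differ.
func nodeLabelsChangedSketch(newNode, oldNode *v1.Node) bool {
	return !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels())
}

// nodeTaintsChangedSketch reports whether Spec.Taints differs.
func nodeTaintsChangedSketch(newNode, oldNode *v1.Node) bool {
	return !reflect.DeepEqual(oldNode.Spec.Taints, newNode.Spec.Taints)
}

func main() {
	oldNode := &v1.Node{Status: v1.NodeStatus{Allocatable: v1.ResourceList{
		v1.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
	}}}
	newNode := oldNode.DeepCopy()
	newNode.Status.Allocatable[v1.ResourceStorage] = *resource.NewQuantity(1024, resource.BinarySI)
	fmt.Println(nodeAllocatableChangedSketch(newNode, oldNode)) // true
	fmt.Println(nodeLabelsChangedSketch(newNode, oldNode))      // false
	fmt.Println(nodeTaintsChangedSketch(newNode, oldNode))      // false
}
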
func TestNodeConditionsChanged(t *testing.T) {
	nodeConditionType := reflect.TypeOf(v1.NodeCondition{})
	if nodeConditionType.NumField() != 6 {
		t.Errorf("NodeCondition type has changed. The nodeConditionsChanged() function must be reevaluated.")
	}

	for _, c := range []struct {
		Name          string
		Changed       bool
		OldConditions []v1.NodeCondition
		NewConditions []v1.NodeCondition
	}{
		{
			Name:          "no condition changed",
			Changed:       false,
			OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
			NewConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
		},
		{
			Name:          "only LastHeartbeatTime changed",
			Changed:       false,
			OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(1, 0)}},
			NewConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(2, 0)}},
		},
		{
			Name:          "new node has more healthy conditions",
			Changed:       true,
			OldConditions: []v1.NodeCondition{},
			NewConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
		},
		{
			Name:          "new node has less unhealthy conditions",
			Changed:       true,
			OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
			NewConditions: []v1.NodeCondition{},
		},
		{
			Name:          "condition status changed",
			Changed:       true,
			OldConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}},
			NewConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
		},
	} {
		oldNode := &v1.Node{Status: v1.NodeStatus{Conditions: c.OldConditions}}
		newNode := &v1.Node{Status: v1.NodeStatus{Conditions: c.NewConditions}}
		changed := nodeConditionsChanged(newNode, oldNode)
		if changed != c.Changed {
			t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed)
		}
	}
}

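Conditions need one extra step that the second test case encodes: kubelets refresh LastHeartbeatTime constantly, so a heartbeat-only update must not count as a change. The reflect.NumField() guard above exists so this comparison is revisited if v1.NodeCondition ever gains fields. A sketch of the strip-then-compare idea (illustrative, not the PR's exact code):

package main

import (
	"fmt"
	"reflect"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// stripHeartbeats zeroes LastHeartbeatTime so that heartbeat-only updates
// compare as equal.
func stripHeartbeats(conditions []v1.NodeCondition) []v1.NodeCondition {
	out := make([]v1.NodeCondition, len(conditions))
	for i, c := range conditions {
		c.LastHeartbeatTime = metav1.Time{}
		out[i] = c
	}
	return out
}

// nodeConditionsChangedSketch reports whether anything other than heartbeat
// timestamps changed in the node's conditions.
func nodeConditionsChangedSketch(newNode, oldNode *v1.Node) bool {
	return !reflect.DeepEqual(
		stripHeartbeats(oldNode.Status.Conditions),
		stripHeartbeats(newNode.Status.Conditions),
	)
}

func main() {
	oldNode := &v1.Node{Status: v1.NodeStatus{Conditions: []v1.NodeCondition{
		{Type: v1.NodeReady, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(1, 0)},
	}}}
	newNode := oldNode.DeepCopy()
	newNode.Status.Conditions[0].LastHeartbeatTime = metav1.Unix(2, 0)
	fmt.Println(nodeConditionsChangedSketch(newNode, oldNode)) // false

	newNode.Status.Conditions[0].Status = v1.ConditionFalse
	fmt.Println(nodeConditionsChangedSketch(newNode, oldNode)) // true
}
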
@@ -138,7 +138,6 @@ func New(client clientset.Interface,
	for _, opt := range opts {
		opt(&options)
	}

	// Set up the configurator which can create schedulers from configs.
	configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
		SchedulerName: options.schedulerName,
@@ -193,8 +192,11 @@ func New(client clientset.Interface,
	config.Recorder = recorder
	config.DisablePreemption = options.disablePreemption
	config.StopEverything = stopCh

	// Create the scheduler.
	sched := NewFromConfig(config)

	AddAllEventHandlers(sched, options.schedulerName, nodeInformer, podInformer, pvInformer, pvcInformer, replicationControllerInformer, replicaSetInformer, statefulSetInformer, serviceInformer, pdbInformer, storageClassInformer)
	return sched, nil
}

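With this change New() builds the Scheduler first and only then wires every informer event handler onto it through the new AddAllEventHandlers helper; the body of eventhandlers.go is not part of this excerpt. The underlying client-go pattern is an informer plus a filtered handler, sketched below in a self-contained form (the fake client, filter and callbacks are illustrative, not the scheduler's actual handlers):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Illustrative wiring only: a fake client, a shared informer factory and a
	// filtered pod handler, mirroring the shape of what AddAllEventHandlers
	// sets up for the real scheduler.
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 0)
	stopCh := make(chan struct{})
	defer close(stopCh)

	factory.Core().V1().Pods().Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			// Only react to pods that name this (hypothetical) scheduler.
			FilterFunc: func(obj interface{}) bool {
				pod, ok := obj.(*v1.Pod)
				return ok && pod.Spec.SchedulerName == "my-scheduler"
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc:    func(obj interface{}) { fmt.Println("pod added") },
				UpdateFunc: func(oldObj, newObj interface{}) { fmt.Println("pod updated") },
				DeleteFunc: func(obj interface{}) { fmt.Println("pod deleted") },
			},
		},
	)

	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)
}

Registering handlers before the informer factory is started is the conventional ordering, which is also why the test hunks below move AddAllEventHandlers ahead of informerFactory.Start.
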
@@ -48,6 +48,7 @@ import (
	"k8s.io/kubernetes/pkg/scheduler/factory"
	schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
	fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
	"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)

@@ -633,7 +634,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) {
	algo := core.NewGenericScheduler(
		scache,
		nil,
		internalqueue.NewSchedulingQueue(nil),
		predicateMap,
		predicates.EmptyPredicateMetadataProducer,
		[]priorities.PriorityConfig{},
@@ -645,7 +646,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
		informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
		false,
		false,
		api.DefaultPercentageOfNodesToScore)
		api.DefaultPercentageOfNodesToScore,
	)
	bindingChan := make(chan *v1.Binding, 1)
	errChan := make(chan error, 1)

@@ -684,7 +686,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
	algo := core.NewGenericScheduler(
		scache,
		nil,
		internalqueue.NewSchedulingQueue(nil),
		predicateMap,
		predicates.EmptyPredicateMetadataProducer,
		[]priorities.PriorityConfig{},
@@ -696,7 +698,8 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
		informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
		false,
		false,
		api.DefaultPercentageOfNodesToScore)
		api.DefaultPercentageOfNodesToScore,
	)
	bindingChan := make(chan *v1.Binding, 2)

	sched := NewFromConfig(&factory.Config{

@@ -114,14 +114,29 @@ func setupScheduler(
		HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
		DisablePreemption:              false,
		PercentageOfNodesToScore:       100,
		StopCh:                         stopCh,
	})

	schedulerConfig, err := schedulerConfigFactory.Create()
	if err != nil {
		t.Fatalf("Couldn't create scheduler config: %v", err)
	}

	schedulerConfig.StopEverything = stopCh
	// TODO: Replace NewFromConfig and AddAllEventHandlers with scheduler.New() in
	// all test/integration tests.
	sched := scheduler.NewFromConfig(schedulerConfig)
	scheduler.AddAllEventHandlers(sched,
		v1.DefaultSchedulerName,
		informerFactory.Core().V1().Nodes(),
		informerFactory.Core().V1().Pods(),
		informerFactory.Core().V1().PersistentVolumes(),
		informerFactory.Core().V1().PersistentVolumeClaims(),
		informerFactory.Core().V1().ReplicationControllers(),
		informerFactory.Apps().V1().ReplicaSets(),
		informerFactory.Apps().V1().StatefulSets(),
		informerFactory.Core().V1().Services(),
		informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
		informerFactory.Storage().V1().StorageClasses(),
	)

	eventBroadcaster := record.NewBroadcaster()
	schedulerConfig.Recorder = eventBroadcaster.NewRecorder(
@@ -132,8 +147,6 @@ func setupScheduler(
		Interface: cs.CoreV1().Events(""),
	})

	sched := scheduler.NewFromConfig(schedulerConfig)

	algorithmprovider.ApplyFeatureGates()

	go sched.Run()

@@ -513,12 +526,12 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) {
		stopCh := make(chan struct{})
		defer close(stopCh)

		informers.Start(stopCh)
		go dc.Run(5, stopCh)

		// Start Scheduler
		setupScheduler(t, clientset, informers, stopCh)

		informers.Start(stopCh)
		go dc.Run(5, stopCh)

		ds := newDaemonSet("foo", ns.Name)
		ds.Spec.UpdateStrategy = *strategy
		_, err := dsClient.Create(ds)

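The two DaemonSet test hunks above and below only reorder the setup: setupScheduler now registers the scheduler's event handlers (via AddAllEventHandlers) before informers.Start and the controller's Run are called, so the handlers are already in place when the shared informers begin delivering events. The set of informers passed to the scheduler is unchanged.
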
@@ -924,11 +937,11 @@ func TestTaintedNode(t *testing.T) {
		stopCh := make(chan struct{})
		defer close(stopCh)

		informers.Start(stopCh)
		go dc.Run(5, stopCh)

		// Start Scheduler
		setupScheduler(t, clientset, informers, stopCh)
		informers.Start(stopCh)

		go dc.Run(5, stopCh)

		ds := newDaemonSet("foo", ns.Name)
		ds.Spec.UpdateStrategy = *strategy

@@ -539,10 +539,24 @@ func TestMultiScheduler(t *testing.T) {
	eventBroadcaster2 := record.NewBroadcaster()
	schedulerConfig2.Recorder = eventBroadcaster2.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: fooScheduler})
	eventBroadcaster2.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet2.CoreV1().Events("")})
	go podInformer2.Informer().Run(stopCh)
	informerFactory2.Start(stopCh)

	sched2 := scheduler.NewFromConfig(schedulerConfig2)
	scheduler.AddAllEventHandlers(sched2,
		fooScheduler,
		context.informerFactory.Core().V1().Nodes(),
		podInformer2,
		context.informerFactory.Core().V1().PersistentVolumes(),
		context.informerFactory.Core().V1().PersistentVolumeClaims(),
		context.informerFactory.Core().V1().ReplicationControllers(),
		context.informerFactory.Apps().V1().ReplicaSets(),
		context.informerFactory.Apps().V1().StatefulSets(),
		context.informerFactory.Core().V1().Services(),
		context.informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
		context.informerFactory.Storage().V1().StorageClasses(),
	)

	go podInformer2.Informer().Run(stopCh)
	informerFactory2.Start(stopCh)
	sched2.Run()

	// 6. **check point-2**:

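In the multi-scheduler test, the second scheduler differs from the default one only in the name passed to AddAllEventHandlers (fooScheduler) and in having its own pod informer; the remaining informers are shared. A pod opts into a particular scheduler through spec.schedulerName; a standalone illustration (the names below are made up, not taken from the test):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A pod selects a non-default scheduler by name; the default scheduler's
	// filtered event handlers will ignore it.
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "pod-for-foo-scheduler"},
		Spec: v1.PodSpec{
			SchedulerName: "foo-scheduler",
			Containers:    []v1.Container{{Name: "c", Image: "k8s.gcr.io/pause:3.1"}},
		},
	}
	fmt.Println(pod.Spec.SchedulerName)
}
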
@@ -189,6 +189,22 @@ func initTestSchedulerWithOptions(
	// set DisablePreemption option
	context.schedulerConfig.DisablePreemption = disablePreemption

	context.scheduler = scheduler.NewFromConfig(context.schedulerConfig)

	scheduler.AddAllEventHandlers(context.scheduler,
		v1.DefaultSchedulerName,
		context.informerFactory.Core().V1().Nodes(),
		podInformer,
		context.informerFactory.Core().V1().PersistentVolumes(),
		context.informerFactory.Core().V1().PersistentVolumeClaims(),
		context.informerFactory.Core().V1().ReplicationControllers(),
		context.informerFactory.Apps().V1().ReplicaSets(),
		context.informerFactory.Apps().V1().StatefulSets(),
		context.informerFactory.Core().V1().Services(),
		context.informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
		context.informerFactory.Storage().V1().StorageClasses(),
	)

	// set setPodInformer if provided.
	if setPodInformer {
		go podInformer.Informer().Run(context.schedulerConfig.StopEverything)
@@ -212,7 +228,6 @@ func initTestSchedulerWithOptions(
	context.informerFactory.Start(context.schedulerConfig.StopEverything)
	context.informerFactory.WaitForCacheSync(context.schedulerConfig.StopEverything)

	context.scheduler = scheduler.NewFromConfig(context.schedulerConfig)
	context.scheduler.Run()
	return context
}

@@ -74,6 +74,20 @@ func StartScheduler(clientSet clientset.Interface) (factory.Configurator, Shutdo
	config.Recorder = evtBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"})

	sched := scheduler.NewFromConfig(config)
	scheduler.AddAllEventHandlers(sched,
		v1.DefaultSchedulerName,
		informerFactory.Core().V1().Nodes(),
		informerFactory.Core().V1().Pods(),
		informerFactory.Core().V1().PersistentVolumes(),
		informerFactory.Core().V1().PersistentVolumeClaims(),
		informerFactory.Core().V1().ReplicationControllers(),
		informerFactory.Apps().V1().ReplicaSets(),
		informerFactory.Apps().V1().StatefulSets(),
		informerFactory.Core().V1().Services(),
		informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
		informerFactory.Storage().V1().StorageClasses(),
	)

	informerFactory.Start(stopCh)
	sched.Run()