Move informer event handlers to Scheduler

pull/564/head
Mayank Kumar 2018-10-01 14:57:17 -07:00
parent d57d606275
commit e0a7d96632
13 changed files with 856 additions and 690 deletions


@@ -3,12 +3,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"eventhandlers.go",
"scheduler.go",
"testutil.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/api:go_default_library",
@@ -22,8 +24,10 @@ go_library(
"//pkg/scheduler/plugins/v1alpha1:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
@@ -32,6 +36,7 @@ go_library(
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
@@ -39,7 +44,10 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["scheduler_test.go"],
srcs = [
"eventhandlers_test.go",
"scheduler_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/api/legacyscheme:go_default_library",
@@ -53,6 +61,7 @@ go_test(
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/fake:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -88,6 +97,7 @@ filegroup(
"//pkg/scheduler/api:all-srcs",
"//pkg/scheduler/apis/config:all-srcs",
"//pkg/scheduler/core:all-srcs",
"//pkg/scheduler/equivalence:all-srcs",
"//pkg/scheduler/factory:all-srcs",
"//pkg/scheduler/internal/cache:all-srcs",
"//pkg/scheduler/internal/queue:all-srcs",


@@ -0,0 +1,13 @@
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)


@@ -0,0 +1,481 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"fmt"
"reflect"

"k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
policyinformers "k8s.io/client-go/informers/policy/v1beta1"
storageinformers "k8s.io/client-go/informers/storage/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
)
func (sched *Scheduler) onPvAdd(obj interface{}) {
// Pods created when there are no PVs available will be stuck in the
// unschedulable queue. But unbound PVs created for static provisioning or
// with a delay-binding storage class are skipped by the PV controller's
// dynamic provisioning and binding process, and will not trigger events to
// schedule the pod again. So we need to move pods to the active queue on PV
// add for this scenario.
sched.config.SchedulingQueue.MoveAllToActiveQueue()
}
func (sched *Scheduler) onPvUpdate(old, new interface{}) {
// Scheduler.bindVolumesWorker may fail to update the assumed pod volume
// bindings due to conflicts if PVs are updated by the PV controller or other
// parties; the scheduler will then add the pod back to the unschedulable
// queue. We need to move pods to the active queue on PV update for this
// scenario.
sched.config.SchedulingQueue.MoveAllToActiveQueue()
}
func (sched *Scheduler) onPvcAdd(obj interface{}) {
sched.config.SchedulingQueue.MoveAllToActiveQueue()
}
func (sched *Scheduler) onPvcUpdate(old, new interface{}) {
sched.config.SchedulingQueue.MoveAllToActiveQueue()
}
func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
sc, ok := obj.(*storagev1.StorageClass)
if !ok {
klog.Errorf("cannot convert to *storagev1.StorageClass: %v", obj)
return
}
// CheckVolumeBindingPred fails if a pod has unbound immediate PVCs. If these
// PVCs have a specified StorageClass name, creating StorageClass objects
// with late binding will cause the predicates to pass, so we need to move
// pods to the active queue.
// We don't need to invalidate cached results because results will not be
// cached for a pod that has unbound immediate PVCs.
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
sched.config.SchedulingQueue.MoveAllToActiveQueue()
}
}
func (sched *Scheduler) onServiceAdd(obj interface{}) {
sched.config.SchedulingQueue.MoveAllToActiveQueue()
}
func (sched *Scheduler) onServiceUpdate(oldObj interface{}, newObj interface{}) {
sched.config.SchedulingQueue.MoveAllToActiveQueue()
}
func (sched *Scheduler) onServiceDelete(obj interface{}) {
sched.config.SchedulingQueue.MoveAllToActiveQueue()
}
func (sched *Scheduler) addNodeToCache(obj interface{}) {
node, ok := obj.(*v1.Node)
if !ok {
klog.Errorf("cannot convert to *v1.Node: %v", obj)
return
}
if err := sched.config.SchedulerCache.AddNode(node); err != nil {
klog.Errorf("scheduler cache AddNode failed: %v", err)
}
sched.config.SchedulingQueue.MoveAllToActiveQueue()
}
func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
oldNode, ok := oldObj.(*v1.Node)
if !ok {
klog.Errorf("cannot convert oldObj to *v1.Node: %v", oldObj)
return
}
newNode, ok := newObj.(*v1.Node)
if !ok {
klog.Errorf("cannot convert newObj to *v1.Node: %v", newObj)
return
}
if err := sched.config.SchedulerCache.UpdateNode(oldNode, newNode); err != nil {
klog.Errorf("scheduler cache UpdateNode failed: %v", err)
}
// Only activate unschedulable pods if the node became more schedulable.
// We skip the node property comparison when there are no unschedulable pods
// in the queue to save processing cycles. We still trigger a move to the
// active queue to cover the case that a pod being processed by the scheduler
// is determined unschedulable. We want this pod to be reevaluated when a
// change in the cluster happens.
if sched.config.SchedulingQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) {
sched.config.SchedulingQueue.MoveAllToActiveQueue()
}
}
func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
var node *v1.Node
switch t := obj.(type) {
case *v1.Node:
node = t
case cache.DeletedFinalStateUnknown:
var ok bool
node, ok = t.Obj.(*v1.Node)
if !ok {
klog.Errorf("cannot convert to *v1.Node: %v", t.Obj)
return
}
default:
klog.Errorf("cannot convert to *v1.Node: %v", t)
return
}
// NOTE: Updates must be written to scheduler cache before invalidating
// equivalence cache, because we could snapshot equivalence cache after the
// invalidation and then snapshot the cache itself. If the cache is
// snapshotted before updates are written, we would update equivalence
// cache with stale information which is based on snapshot of old cache.
if err := sched.config.SchedulerCache.RemoveNode(node); err != nil {
klog.Errorf("scheduler cache RemoveNode failed: %v", err)
}
}
func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
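// The unscheduled-pod filter registered in AddAllEventHandlers only passes
// *v1.Pod objects to this handler's AddFunc, so the type assertion below is
// safe.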
if err := sched.config.SchedulingQueue.Add(obj.(*v1.Pod)); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
}
}
func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
pod := newObj.(*v1.Pod)
if sched.skipPodUpdate(pod) {
return
}
if err := sched.config.SchedulingQueue.Update(oldObj.(*v1.Pod), pod); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
}
}
func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
var pod *v1.Pod
switch t := obj.(type) {
case *v1.Pod:
pod = t
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*v1.Pod)
if !ok {
utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
return
}
default:
utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
return
}
if err := sched.config.SchedulingQueue.Delete(pod); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
}
if sched.config.VolumeBinder != nil {
// Volume binder only wants to keep unassigned pods
sched.config.VolumeBinder.DeletePodBindings(pod)
}
}
func (sched *Scheduler) addPodToCache(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
klog.Errorf("cannot convert to *v1.Pod: %v", obj)
return
}
if err := sched.config.SchedulerCache.AddPod(pod); err != nil {
klog.Errorf("scheduler cache AddPod failed: %v", err)
}
sched.config.SchedulingQueue.AssignedPodAdded(pod)
}
func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
oldPod, ok := oldObj.(*v1.Pod)
if !ok {
klog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj)
return
}
newPod, ok := newObj.(*v1.Pod)
if !ok {
klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj)
return
}
// NOTE: Updates must be written to scheduler cache before invalidating
// equivalence cache, because we could snapshot equivalence cache after the
// invalidation and then snapshot the cache itself. If the cache is
// snapshotted before updates are written, we would update equivalence
// cache with stale information which is based on snapshot of old cache.
if err := sched.config.SchedulerCache.UpdatePod(oldPod, newPod); err != nil {
klog.Errorf("scheduler cache UpdatePod failed: %v", err)
}
sched.config.SchedulingQueue.AssignedPodUpdated(newPod)
}
func (sched *Scheduler) deletePodFromCache(obj interface{}) {
var pod *v1.Pod
switch t := obj.(type) {
case *v1.Pod:
pod = t
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*v1.Pod)
if !ok {
klog.Errorf("cannot convert to *v1.Pod: %v", t.Obj)
return
}
default:
klog.Errorf("cannot convert to *v1.Pod: %v", t)
return
}
// NOTE: Updates must be written to scheduler cache before invalidating
// equivalence cache, because we could snapshot equivalence cache after the
// invalidation and then snapshot the cache itself. If the cache is
// snapshotted before updates are written, we would update equivalence
// cache with stale information which is based on snapshot of old cache.
if err := sched.config.SchedulerCache.RemovePod(pod); err != nil {
klog.Errorf("scheduler cache RemovePod failed: %v", err)
}
sched.config.SchedulingQueue.MoveAllToActiveQueue()
}
// assignedPod selects pods that are assigned (scheduled and running).
func assignedPod(pod *v1.Pod) bool {
return len(pod.Spec.NodeName) != 0
}
// responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler.
func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
return schedulerName == pod.Spec.SchedulerName
}
// isZoneRegionLabel checks if the given label key is a zone or region label.
func isZoneRegionLabel(k string) bool {
return k == kubeletapis.LabelZoneFailureDomain || k == kubeletapis.LabelZoneRegion
}
// skipPodUpdate checks whether the specified pod update should be ignored.
// This function will return true if
// - The pod has already been assumed, AND
// - The pod has only its ResourceVersion, Spec.NodeName and/or Annotations
// updated.
func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool {
// Non-assumed pods should never be skipped.
isAssumed, err := sched.config.SchedulerCache.IsAssumedPod(pod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
return false
}
if !isAssumed {
return false
}
// Gets the assumed pod from the cache.
assumedPod, err := sched.config.SchedulerCache.GetPod(pod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err))
return false
}
// Compares the assumed pod in the cache with the pod update. If they are
// equal (with certain fields excluded), this pod update will be skipped.
f := func(pod *v1.Pod) *v1.Pod {
p := pod.DeepCopy()
// ResourceVersion must be excluded because each object update will
// have a new resource version.
p.ResourceVersion = ""
// Spec.NodeName must be excluded because the pod assumed in the cache
// is expected to have a node assigned while the pod update may or may
// not have this field set.
p.Spec.NodeName = ""
// Annotations must be excluded for the reasons described in
// https://github.com/kubernetes/kubernetes/issues/52914.
p.Annotations = nil
return p
}
assumedPodCopy, podCopy := f(assumedPod), f(pod)
if !reflect.DeepEqual(assumedPodCopy, podCopy) {
return false
}
klog.V(3).Infof("Skipping pod %s/%s update", pod.Namespace, pod.Name)
return true
}
// AddAllEventHandlers is a helper function used in tests and in Scheduler
// to add event handlers for various informers.
func AddAllEventHandlers(
sched *Scheduler,
schedulerName string,
nodeInformer coreinformers.NodeInformer,
podInformer coreinformers.PodInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
replicationControllerInformer coreinformers.ReplicationControllerInformer,
replicaSetInformer appsinformers.ReplicaSetInformer,
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
storageClassInformer storageinformers.StorageClassInformer,
) {
// scheduled pod cache
podInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return assignedPod(t)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return assignedPod(pod)
}
utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
return false
default:
utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sched.addPodToCache,
UpdateFunc: sched.updatePodInCache,
DeleteFunc: sched.deletePodFromCache,
},
},
)
// unscheduled pod queue
podInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return !assignedPod(t) && responsibleForPod(t, schedulerName)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return !assignedPod(pod) && responsibleForPod(pod, schedulerName)
}
utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
return false
default:
utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sched.addPodToSchedulingQueue,
UpdateFunc: sched.updatePodInSchedulingQueue,
DeleteFunc: sched.deletePodFromSchedulingQueue,
},
},
)
nodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.addNodeToCache,
UpdateFunc: sched.updateNodeInCache,
DeleteFunc: sched.deleteNodeFromCache,
},
)
// Adds and deletes of PVs will affect equivalence cache items
// related to persistent volumes.
pvInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
// MaxPDVolumeCountPredicate: it relies on the counts of PVs.
AddFunc: sched.onPvAdd,
UpdateFunc: sched.onPvUpdate,
},
)
// This is for MaxPDVolumeCountPredicate: adding/deleting a PVC will affect the PV counts once it is bound.
pvcInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.onPvcAdd,
UpdateFunc: sched.onPvcUpdate,
},
)
// This is for ServiceAffinity: the predicate result is affected when the
// selector of the service is updated. Also, if a new service is added, the
// equivalence cache will become invalid, since existing pods may be
// "captured" by this service and change the predicate result.
serviceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.onServiceAdd,
UpdateFunc: sched.onServiceUpdate,
DeleteFunc: sched.onServiceDelete,
},
)
storageClassInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.onStorageClassAdd,
},
)
}
func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool {
if nodeSpecUnschedulableChanged(newNode, oldNode) {
return true
}
if nodeAllocatableChanged(newNode, oldNode) {
return true
}
if nodeLabelsChanged(newNode, oldNode) {
return true
}
if nodeTaintsChanged(newNode, oldNode) {
return true
}
if nodeConditionsChanged(newNode, oldNode) {
return true
}
return false
}
func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable)
}
func nodeLabelsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
return !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels())
}
func nodeTaintsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
return !reflect.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints)
}
func nodeConditionsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
strip := func(conditions []v1.NodeCondition) map[v1.NodeConditionType]v1.ConditionStatus {
conditionStatuses := make(map[v1.NodeConditionType]v1.ConditionStatus, len(conditions))
for i := range conditions {
conditionStatuses[conditions[i].Type] = conditions[i].Status
}
return conditionStatuses
}
return !reflect.DeepEqual(strip(oldNode.Status.Conditions), strip(newNode.Status.Conditions))
}
func nodeSpecUnschedulableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
return newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && !newNode.Spec.Unschedulable
}
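For context, a minimal self-contained sketch of the cache.FilteringResourceEventHandler pattern that AddAllEventHandlers uses to route assigned pods to the scheduler cache and unassigned pods to the scheduling queue. It runs against a fake clientset; the pod names and printed output are illustrative, not part of this change:

package main

import (
	"fmt"
	"time"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// One pod already bound to a node, one still pending.
	bound := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bound"}, Spec: v1.PodSpec{NodeName: "node-1"}}
	pending := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pending"}}
	client := fake.NewSimpleClientset(bound, pending)

	stopCh := make(chan struct{})
	defer close(stopCh)

	factory := informers.NewSharedInformerFactory(client, 0)
	podInformer := factory.Core().V1().Pods().Informer()

	// Only pods with a node name pass the filter, mirroring the
	// "scheduled pod cache" handler registered above.
	podInformer.AddEventHandler(cache.FilteringResourceEventHandler{
		FilterFunc: func(obj interface{}) bool {
			pod, ok := obj.(*v1.Pod)
			return ok && len(pod.Spec.NodeName) != 0
		},
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				fmt.Printf("cache add: %s\n", obj.(*v1.Pod).Name)
			},
		},
	})

	factory.Start(stopCh)
	cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
	time.Sleep(100 * time.Millisecond) // give the add callback time to fire
}

This should print only "cache add: bound"; the pending pod would instead be routed to the unscheduled-pod handler by its complementary filter.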


@@ -0,0 +1,265 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"reflect"
"testing"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/factory"
fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
)
func TestSkipPodUpdate(t *testing.T) {
table := []struct {
pod *v1.Pod
isAssumedPodFunc func(*v1.Pod) bool
getPodFunc func(*v1.Pod) *v1.Pod
expected bool
name string
}{
{
name: "Non-assumed pod",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
},
},
isAssumedPodFunc: func(*v1.Pod) bool { return false },
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
},
}
},
expected: false,
},
{
name: "with changes on ResourceVersion, Spec.NodeName and/or Annotations",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Annotations: map[string]string{"a": "b"},
ResourceVersion: "0",
},
Spec: v1.PodSpec{
NodeName: "node-0",
},
},
isAssumedPodFunc: func(*v1.Pod) bool {
return true
},
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Annotations: map[string]string{"c": "d"},
ResourceVersion: "1",
},
Spec: v1.PodSpec{
NodeName: "node-1",
},
}
},
expected: true,
},
{
name: "with changes on Labels",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Labels: map[string]string{"a": "b"},
},
},
isAssumedPodFunc: func(*v1.Pod) bool {
return true
},
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Labels: map[string]string{"c": "d"},
},
}
},
expected: false,
},
}
for _, test := range table {
t.Run(test.name, func(t *testing.T) {
c := NewFromConfig(&factory.Config{
SchedulerCache: &fakecache.Cache{
IsAssumedPodFunc: test.isAssumedPodFunc,
GetPodFunc: test.getPodFunc,
},
},
)
got := c.skipPodUpdate(test.pod)
if got != test.expected {
t.Errorf("skipPodUpdate() = %t, expected = %t", got, test.expected)
}
})
}
}
func TestNodeAllocatableChanged(t *testing.T) {
newQuantity := func(value int64) resource.Quantity {
return *resource.NewQuantity(value, resource.BinarySI)
}
for _, c := range []struct {
Name string
Changed bool
OldAllocatable v1.ResourceList
NewAllocatable v1.ResourceList
}{
{
Name: "no allocatable resources changed",
Changed: false,
OldAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
NewAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
},
{
Name: "new node has more allocatable resources",
Changed: true,
OldAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
NewAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024), v1.ResourceStorage: newQuantity(1024)},
},
} {
oldNode := &v1.Node{Status: v1.NodeStatus{Allocatable: c.OldAllocatable}}
newNode := &v1.Node{Status: v1.NodeStatus{Allocatable: c.NewAllocatable}}
changed := nodeAllocatableChanged(newNode, oldNode)
if changed != c.Changed {
t.Errorf("Test case %q failed: nodeAllocatableChanged should be %t, got %t", c.Name, c.Changed, changed)
}
}
}
func TestNodeLabelsChanged(t *testing.T) {
for _, c := range []struct {
Name string
Changed bool
OldLabels map[string]string
NewLabels map[string]string
}{
{
Name: "no labels changed",
Changed: false,
OldLabels: map[string]string{"foo": "bar"},
NewLabels: map[string]string{"foo": "bar"},
},
// Labels changed.
{
Name: "new node has more labels",
Changed: true,
OldLabels: map[string]string{"foo": "bar"},
NewLabels: map[string]string{"foo": "bar", "test": "value"},
},
} {
oldNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: c.OldLabels}}
newNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: c.NewLabels}}
changed := nodeLabelsChanged(newNode, oldNode)
if changed != c.Changed {
t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed)
}
}
}
func TestNodeTaintsChanged(t *testing.T) {
for _, c := range []struct {
Name string
Changed bool
OldTaints []v1.Taint
NewTaints []v1.Taint
}{
{
Name: "no taint changed",
Changed: false,
OldTaints: []v1.Taint{{Key: "key", Value: "value"}},
NewTaints: []v1.Taint{{Key: "key", Value: "value"}},
},
{
Name: "taint value changed",
Changed: true,
OldTaints: []v1.Taint{{Key: "key", Value: "value1"}},
NewTaints: []v1.Taint{{Key: "key", Value: "value2"}},
},
} {
oldNode := &v1.Node{Spec: v1.NodeSpec{Taints: c.OldTaints}}
newNode := &v1.Node{Spec: v1.NodeSpec{Taints: c.NewTaints}}
changed := nodeTaintsChanged(newNode, oldNode)
if changed != c.Changed {
t.Errorf("Test case %q failed: should be %t, not %t", c.Name, c.Changed, changed)
}
}
}
func TestNodeConditionsChanged(t *testing.T) {
nodeConditionType := reflect.TypeOf(v1.NodeCondition{})
if nodeConditionType.NumField() != 6 {
t.Errorf("NodeCondition type has changed. The nodeConditionsChanged() function must be reevaluated.")
}
for _, c := range []struct {
Name string
Changed bool
OldConditions []v1.NodeCondition
NewConditions []v1.NodeCondition
}{
{
Name: "no condition changed",
Changed: false,
OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
NewConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
},
{
Name: "only LastHeartbeatTime changed",
Changed: false,
OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(1, 0)}},
NewConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(2, 0)}},
},
{
Name: "new node has more healthy conditions",
Changed: true,
OldConditions: []v1.NodeCondition{},
NewConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
},
{
Name: "new node has fewer unhealthy conditions",
Changed: true,
OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
NewConditions: []v1.NodeCondition{},
},
{
Name: "condition status changed",
Changed: true,
OldConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}},
NewConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
},
} {
oldNode := &v1.Node{Status: v1.NodeStatus{Conditions: c.OldConditions}}
newNode := &v1.Node{Status: v1.NodeStatus{Conditions: c.NewConditions}}
changed := nodeConditionsChanged(newNode, oldNode)
if changed != c.Changed {
t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed)
}
}
}
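The delete handlers above (deletePodFromSchedulingQueue, deletePodFromCache, deleteNodeFromCache) all unwrap cache.DeletedFinalStateUnknown, the tombstone an informer delivers when it missed the actual delete event, for example across a relist. A minimal standalone sketch of that pattern; the extractPod helper is illustrative and not part of this change:

package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

// extractPod unwraps the object handed to a DeleteFunc, which is either the
// deleted *v1.Pod itself or a tombstone wrapping its last known state.
func extractPod(obj interface{}) (*v1.Pod, error) {
	switch t := obj.(type) {
	case *v1.Pod:
		return t, nil
	case cache.DeletedFinalStateUnknown:
		pod, ok := t.Obj.(*v1.Pod)
		if !ok {
			return nil, fmt.Errorf("tombstone contained %T, expected *v1.Pod", t.Obj)
		}
		return pod, nil
	default:
		return nil, fmt.Errorf("unexpected object type %T", obj)
	}
}

func main() {
	tombstone := cache.DeletedFinalStateUnknown{
		Key: "default/pod-0",
		Obj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-0"}},
	}
	if pod, err := extractPod(tombstone); err == nil {
		fmt.Printf("recovered %s from tombstone\n", pod.Name)
	}
}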


@@ -10,7 +10,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
@@ -25,7 +24,6 @@ go_library(
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
@@ -65,12 +63,10 @@ go_test(
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/api/latest:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/fake:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",


@@ -20,11 +20,9 @@ package factory
import (
"fmt"
"reflect"
"time"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
@@ -47,7 +45,6 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/klog"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
@@ -68,14 +65,6 @@ const (
maximalGetBackoff = time.Minute
)
var (
serviceAffinitySet = sets.NewString(predicates.CheckServiceAffinityPred)
matchInterPodAffinitySet = sets.NewString(predicates.MatchInterPodAffinityPred)
generalPredicatesSets = sets.NewString(predicates.GeneralPred)
noDiskConflictSet = sets.NewString(predicates.NoDiskConflictPred)
maxPDVolumeCountPredicateKeys = []string{predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred, predicates.MaxEBSVolumeCountPred, predicates.MaxCinderVolumeCountPred}
)
// Binder knows how to write a binding.
type Binder interface {
Bind(binding *v1.Binding) error
@@ -172,8 +161,6 @@ type Configurator interface {
// configFactory is the default implementation of the scheduler.Configurator interface.
type configFactory struct {
client clientset.Interface
// queue for pods that need scheduling
podQueue internalqueue.SchedulingQueue
// a means to list all known scheduled pods.
scheduledPodLister corelisters.PodLister
// a means to list all known scheduled pods and pods assumed to have been scheduled.
@@ -226,6 +213,10 @@ type configFactory struct {
// percentageOfNodesToScore specifies percentage of all nodes to score in each scheduling cycle.
percentageOfNodesToScore int32
bindTimeoutSeconds int64
// queue for pods that need scheduling
podQueue internalqueue.SchedulingQueue
}
// ConfigFactoryArgs is a set of arguments passed to NewConfigFactory.
@@ -282,105 +273,15 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
disablePreemption: args.DisablePreemption,
percentageOfNodesToScore: args.PercentageOfNodesToScore,
bindTimeoutSeconds: args.BindTimeoutSeconds,
}
// Setup volume binder
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
// scheduled pod cache
args.PodInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return assignedPod(t)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return assignedPod(pod)
}
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod for filtering scheduledPod in %T", obj, c))
return false
default:
runtime.HandleError(fmt.Errorf("unable to handle object for filtering scheduledPod in %T: %T", c, obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: c.addPodToCache,
UpdateFunc: c.updatePodInCache,
DeleteFunc: c.deletePodFromCache,
},
},
)
// unscheduled pod queue
args.PodInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return !assignedPod(t) && responsibleForPod(t, args.SchedulerName)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return !assignedPod(pod) && responsibleForPod(pod, args.SchedulerName)
}
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod for filtering unscheduledPod in %T", obj, c))
return false
default:
runtime.HandleError(fmt.Errorf("unable to handle object for filtering unscheduledPod in %T: %T", c, obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: c.addPodToSchedulingQueue,
UpdateFunc: c.updatePodInSchedulingQueue,
DeleteFunc: c.deletePodFromSchedulingQueue,
},
},
)
// ScheduledPodLister is something we provide to plug-in functions that
// they may need to call.
c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
args.NodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.addNodeToCache,
UpdateFunc: c.updateNodeInCache,
DeleteFunc: c.deleteNodeFromCache,
},
)
args.PvInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
// MaxPDVolumeCountPredicate: since it relies on the counts of PV.
AddFunc: c.onPvAdd,
UpdateFunc: c.onPvUpdate,
},
)
// This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
args.PvcInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onPvcAdd,
UpdateFunc: c.onPvcUpdate,
},
)
// This is for ServiceAffinity: affected by the selector of the service is updated.
args.ServiceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onServiceAdd,
UpdateFunc: c.onServiceUpdate,
DeleteFunc: c.onServiceDelete,
},
)
// Setup volume binder
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
args.StorageClassInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onStorageClassAdd,
},
)
// Setup cache debugger
debugger := cachedebugger.New(
args.NodeInformer.Lister(),
@@ -394,116 +295,9 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
<-c.StopEverything
c.podQueue.Close()
}()
return c
}
// skipPodUpdate checks whether the specified pod update should be ignored.
// This function will return true if
// - The pod has already been assumed, AND
// - The pod has only its ResourceVersion, Spec.NodeName and/or Annotations
// updated.
func (c *configFactory) skipPodUpdate(pod *v1.Pod) bool {
// Non-assumed pods should never be skipped.
isAssumed, err := c.schedulerCache.IsAssumedPod(pod)
if err != nil {
runtime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
return false
}
if !isAssumed {
return false
}
// Gets the assumed pod from the cache.
assumedPod, err := c.schedulerCache.GetPod(pod)
if err != nil {
runtime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err))
return false
}
// Compares the assumed pod in the cache with the pod update. If they are
// equal (with certain fields excluded), this pod update will be skipped.
f := func(pod *v1.Pod) *v1.Pod {
p := pod.DeepCopy()
// ResourceVersion must be excluded because each object update will
// have a new resource version.
p.ResourceVersion = ""
// Spec.NodeName must be excluded because the pod assumed in the cache
// is expected to have a node assigned while the pod update may nor may
// not have this field set.
p.Spec.NodeName = ""
// Annotations must be excluded for the reasons described in
// https://github.com/kubernetes/kubernetes/issues/52914.
p.Annotations = nil
return p
}
assumedPodCopy, podCopy := f(assumedPod), f(pod)
if !reflect.DeepEqual(assumedPodCopy, podCopy) {
return false
}
klog.V(3).Infof("Skipping pod %s/%s update", pod.Namespace, pod.Name)
return true
}
func (c *configFactory) onPvAdd(obj interface{}) {
// Pods created when there are no PVs available will be stuck in
// unschedulable queue. But unbound PVs created for static provisioning and
// delay binding storage class are skipped in PV controller dynamic
// provisioning and binding process, will not trigger events to schedule pod
// again. So we need to move pods to active queue on PV add for this
// scenario.
c.podQueue.MoveAllToActiveQueue()
}
func (c *configFactory) onPvUpdate(old, new interface{}) {
// Scheduler.bindVolumesWorker may fail to update assumed pod volume
// bindings due to conflicts if PVs are updated by PV controller or other
// parties, then scheduler will add pod back to unschedulable queue. We
// need to move pods to active queue on PV update for this scenario.
c.podQueue.MoveAllToActiveQueue()
}
// isZoneRegionLabel check if given key of label is zone or region label.
func isZoneRegionLabel(k string) bool {
return k == kubeletapis.LabelZoneFailureDomain || k == kubeletapis.LabelZoneRegion
}
func (c *configFactory) onPvcAdd(obj interface{}) {
c.podQueue.MoveAllToActiveQueue()
}
func (c *configFactory) onPvcUpdate(old, new interface{}) {
c.podQueue.MoveAllToActiveQueue()
}
func (c *configFactory) onStorageClassAdd(obj interface{}) {
sc, ok := obj.(*storagev1.StorageClass)
if !ok {
klog.Errorf("cannot convert to *storagev1.StorageClass for storageClassAdd: %v", obj)
return
}
// CheckVolumeBindingPred fails if pod has unbound immediate PVCs. If these
// PVCs have specified StorageClass name, creating StorageClass objects
// with late binding will cause predicates to pass, so we need to move pods
// to active queue.
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
c.podQueue.MoveAllToActiveQueue()
}
}
func (c *configFactory) onServiceAdd(obj interface{}) {
c.podQueue.MoveAllToActiveQueue()
}
func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) {
c.podQueue.MoveAllToActiveQueue()
}
func (c *configFactory) onServiceDelete(obj interface{}) {
c.podQueue.MoveAllToActiveQueue()
}
// GetNodeLister provides a lister for nodes; mostly for internal use, but it may also be called by mock-tests.
func (c *configFactory) GetNodeLister() corelisters.NodeLister {
return c.nodeLister
@@ -527,213 +321,6 @@ func (c *configFactory) GetScheduledPodLister() corelisters.PodLister {
return c.scheduledPodLister
}
func (c *configFactory) addPodToCache(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
klog.Errorf("cannot convert to *v1.Pod: %v", obj)
return
}
if err := c.schedulerCache.AddPod(pod); err != nil {
klog.Errorf("scheduler cache AddPod failed: %v", err)
}
c.podQueue.AssignedPodAdded(pod)
}
func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
oldPod, ok := oldObj.(*v1.Pod)
if !ok {
klog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj)
return
}
newPod, ok := newObj.(*v1.Pod)
if !ok {
klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj)
return
}
if err := c.schedulerCache.UpdatePod(oldPod, newPod); err != nil {
klog.Errorf("scheduler cache UpdatePod failed: %v", err)
}
c.podQueue.AssignedPodUpdated(newPod)
}
func (c *configFactory) addPodToSchedulingQueue(obj interface{}) {
if err := c.podQueue.Add(obj.(*v1.Pod)); err != nil {
runtime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
}
}
func (c *configFactory) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
pod := newObj.(*v1.Pod)
if c.skipPodUpdate(pod) {
return
}
if err := c.podQueue.Update(oldObj.(*v1.Pod), pod); err != nil {
runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
}
}
func (c *configFactory) deletePodFromSchedulingQueue(obj interface{}) {
var pod *v1.Pod
switch t := obj.(type) {
case *v1.Pod:
pod = obj.(*v1.Pod)
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*v1.Pod)
if !ok {
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
return
}
default:
runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
return
}
if err := c.podQueue.Delete(pod); err != nil {
runtime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
}
if c.volumeBinder != nil {
// Volume binder only wants to keep unassigned pods
c.volumeBinder.DeletePodBindings(pod)
}
}
func (c *configFactory) deletePodFromCache(obj interface{}) {
var pod *v1.Pod
switch t := obj.(type) {
case *v1.Pod:
pod = t
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*v1.Pod)
if !ok {
klog.Errorf("cannot convert DeletedFinalStateUnknown obj to *v1.Pod: %v", t.Obj)
return
}
default:
klog.Errorf("cannot convert to *v1.Pod: %v", t)
return
}
if err := c.schedulerCache.RemovePod(pod); err != nil {
klog.Errorf("scheduler cache RemovePod failed: %v", err)
}
c.podQueue.MoveAllToActiveQueue()
}
func (c *configFactory) addNodeToCache(obj interface{}) {
node, ok := obj.(*v1.Node)
if !ok {
klog.Errorf("cannot convert to *v1.Node: %v", obj)
return
}
if err := c.schedulerCache.AddNode(node); err != nil {
klog.Errorf("scheduler cache AddNode failed: %v", err)
}
c.podQueue.MoveAllToActiveQueue()
}
func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) {
oldNode, ok := oldObj.(*v1.Node)
if !ok {
klog.Errorf("cannot convert oldObj to *v1.Node: %v", oldObj)
return
}
newNode, ok := newObj.(*v1.Node)
if !ok {
klog.Errorf("cannot convert newObj to *v1.Node: %v", newObj)
return
}
if err := c.schedulerCache.UpdateNode(oldNode, newNode); err != nil {
klog.Errorf("scheduler cache UpdateNode failed: %v", err)
}
// Only activate unschedulable pods if the node became more schedulable.
// We skip the node property comparison when there is no unschedulable pods in the queue
// to save processing cycles. We still trigger a move to active queue to cover the case
// that a pod being processed by the scheduler is determined unschedulable. We want this
// pod to be reevaluated when a change in the cluster happens.
if c.podQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) {
c.podQueue.MoveAllToActiveQueue()
}
}
func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool {
if nodeSpecUnschedulableChanged(newNode, oldNode) {
return true
}
if nodeAllocatableChanged(newNode, oldNode) {
return true
}
if nodeLabelsChanged(newNode, oldNode) {
return true
}
if nodeTaintsChanged(newNode, oldNode) {
return true
}
if nodeConditionsChanged(newNode, oldNode) {
return true
}
return false
}
func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable)
}
func nodeLabelsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
return !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels())
}
func nodeTaintsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
return !reflect.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints)
}
func nodeConditionsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
strip := func(conditions []v1.NodeCondition) map[v1.NodeConditionType]v1.ConditionStatus {
conditionStatuses := make(map[v1.NodeConditionType]v1.ConditionStatus, len(conditions))
for i := range conditions {
conditionStatuses[conditions[i].Type] = conditions[i].Status
}
return conditionStatuses
}
return !reflect.DeepEqual(strip(oldNode.Status.Conditions), strip(newNode.Status.Conditions))
}
func nodeSpecUnschedulableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
return newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && newNode.Spec.Unschedulable == false
}
func (c *configFactory) deleteNodeFromCache(obj interface{}) {
var node *v1.Node
switch t := obj.(type) {
case *v1.Node:
node = t
case cache.DeletedFinalStateUnknown:
var ok bool
node, ok = t.Obj.(*v1.Node)
if !ok {
klog.Errorf("cannot convert DeletedFinalStateUnknown obj to *v1.Node: %v", t.Obj)
return
}
default:
klog.Errorf("cannot convert to *v1.Node: %v", t)
return
}
if err := c.schedulerCache.RemoveNode(node); err != nil {
klog.Errorf("scheduler cache RemoveNode failed: %v", err)
}
}
// Create creates a scheduler with the default algorithm provider.
func (c *configFactory) Create() (*Config, error) {
return c.CreateFromProvider(DefaultProvider)
@@ -968,16 +555,6 @@ func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
}, nil
}
// assignedPod selects pods that are assigned (scheduled and running).
func assignedPod(pod *v1.Pod) bool {
return len(pod.Spec.NodeName) != 0
}
// responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler.
func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
return schedulerName == pod.Spec.SchedulerName
}
// assignedPodLister filters the pods returned from a PodLister to
// only include those that have a node name set.
type assignedPodLister struct {


@@ -24,7 +24,6 @@ import (
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
@@ -40,7 +39,6 @@ import (
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/scheduler/util"
@@ -425,98 +423,6 @@ func TestInvalidFactoryArgs(t *testing.T) {
}
func TestSkipPodUpdate(t *testing.T) {
table := []struct {
pod *v1.Pod
isAssumedPodFunc func(*v1.Pod) bool
getPodFunc func(*v1.Pod) *v1.Pod
expected bool
name string
}{
{
name: "Non-assumed pod",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
},
},
isAssumedPodFunc: func(*v1.Pod) bool { return false },
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
},
}
},
expected: false,
},
{
name: "with changes on ResourceVersion, Spec.NodeName and/or Annotations",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Annotations: map[string]string{"a": "b"},
ResourceVersion: "0",
},
Spec: v1.PodSpec{
NodeName: "node-0",
},
},
isAssumedPodFunc: func(*v1.Pod) bool {
return true
},
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Annotations: map[string]string{"c": "d"},
ResourceVersion: "1",
},
Spec: v1.PodSpec{
NodeName: "node-1",
},
}
},
expected: true,
},
{
name: "with changes on Labels",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Labels: map[string]string{"a": "b"},
},
},
isAssumedPodFunc: func(*v1.Pod) bool {
return true
},
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Labels: map[string]string{"c": "d"},
},
}
},
expected: false,
},
}
for _, test := range table {
t.Run(test.name, func(t *testing.T) {
c := &configFactory{
schedulerCache: &fakecache.Cache{
IsAssumedPodFunc: test.isAssumedPodFunc,
GetPodFunc: test.getPodFunc,
},
}
got := c.skipPodUpdate(test.pod)
if got != test.expected {
t.Errorf("skipPodUpdate() = %t, expected = %t", got, test.expected)
}
})
}
}
func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{}) Configurator {
informerFactory := informers.NewSharedInformerFactory(client, 0)
return NewConfigFactory(&ConfigFactoryArgs{
@@ -654,146 +560,3 @@ func testGetBinderFunc(expectedBinderType, podName string, extenders []algorithm
t.Errorf("Expected binder %q but got %q", expectedBinderType, binderType)
}
}
func TestNodeAllocatableChanged(t *testing.T) {
newQuantity := func(value int64) resource.Quantity {
return *resource.NewQuantity(value, resource.BinarySI)
}
for _, c := range []struct {
Name string
Changed bool
OldAllocatable v1.ResourceList
NewAllocatable v1.ResourceList
}{
{
Name: "no allocatable resources changed",
Changed: false,
OldAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
NewAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
},
{
Name: "new node has more allocatable resources",
Changed: true,
OldAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
NewAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024), v1.ResourceStorage: newQuantity(1024)},
},
} {
oldNode := &v1.Node{Status: v1.NodeStatus{Allocatable: c.OldAllocatable}}
newNode := &v1.Node{Status: v1.NodeStatus{Allocatable: c.NewAllocatable}}
changed := nodeAllocatableChanged(newNode, oldNode)
if changed != c.Changed {
t.Errorf("nodeAllocatableChanged should be %t, got %t", c.Changed, changed)
}
}
}
func TestNodeLabelsChanged(t *testing.T) {
for _, c := range []struct {
Name string
Changed bool
OldLabels map[string]string
NewLabels map[string]string
}{
{
Name: "no labels changed",
Changed: false,
OldLabels: map[string]string{"foo": "bar"},
NewLabels: map[string]string{"foo": "bar"},
},
// Labels changed.
{
Name: "new node has more labels",
Changed: true,
OldLabels: map[string]string{"foo": "bar"},
NewLabels: map[string]string{"foo": "bar", "test": "value"},
},
} {
oldNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: c.OldLabels}}
newNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: c.NewLabels}}
changed := nodeLabelsChanged(newNode, oldNode)
if changed != c.Changed {
t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed)
}
}
}
func TestNodeTaintsChanged(t *testing.T) {
for _, c := range []struct {
Name string
Changed bool
OldTaints []v1.Taint
NewTaints []v1.Taint
}{
{
Name: "no taint changed",
Changed: false,
OldTaints: []v1.Taint{{Key: "key", Value: "value"}},
NewTaints: []v1.Taint{{Key: "key", Value: "value"}},
},
{
Name: "taint value changed",
Changed: true,
OldTaints: []v1.Taint{{Key: "key", Value: "value1"}},
NewTaints: []v1.Taint{{Key: "key", Value: "value2"}},
},
} {
oldNode := &v1.Node{Spec: v1.NodeSpec{Taints: c.OldTaints}}
newNode := &v1.Node{Spec: v1.NodeSpec{Taints: c.NewTaints}}
changed := nodeTaintsChanged(newNode, oldNode)
if changed != c.Changed {
t.Errorf("Test case %q failed: should be %t, not %t", c.Name, c.Changed, changed)
}
}
}
func TestNodeConditionsChanged(t *testing.T) {
nodeConditionType := reflect.TypeOf(v1.NodeCondition{})
if nodeConditionType.NumField() != 6 {
t.Errorf("NodeCondition type has changed. The nodeConditionsChanged() function must be reevaluated.")
}
for _, c := range []struct {
Name string
Changed bool
OldConditions []v1.NodeCondition
NewConditions []v1.NodeCondition
}{
{
Name: "no condition changed",
Changed: false,
OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
NewConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
},
{
Name: "only LastHeartbeatTime changed",
Changed: false,
OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(1, 0)}},
NewConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(2, 0)}},
},
{
Name: "new node has more healthy conditions",
Changed: true,
OldConditions: []v1.NodeCondition{},
NewConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
},
{
Name: "new node has less unhealthy conditions",
Changed: true,
OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
NewConditions: []v1.NodeCondition{},
},
{
Name: "condition status changed",
Changed: true,
OldConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}},
NewConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
},
} {
oldNode := &v1.Node{Status: v1.NodeStatus{Conditions: c.OldConditions}}
newNode := &v1.Node{Status: v1.NodeStatus{Conditions: c.NewConditions}}
changed := nodeConditionsChanged(newNode, oldNode)
if changed != c.Changed {
t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed)
}
}
}


@@ -138,7 +138,6 @@ func New(client clientset.Interface,
for _, opt := range opts {
opt(&options)
}
// Set up the configurator which can create schedulers from configs.
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: options.schedulerName,
@@ -193,8 +192,11 @@ func New(client clientset.Interface,
config.Recorder = recorder
config.DisablePreemption = options.disablePreemption
config.StopEverything = stopCh
// Create the scheduler.
sched := NewFromConfig(config)
AddAllEventHandlers(sched, options.schedulerName, nodeInformer, podInformer, pvInformer, pvcInformer, replicationControllerInformer, replicaSetInformer, statefulSetInformer, serviceInformer, pdbInformer, storageClassInformer)
return sched, nil
}
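Note the ordering New establishes here: event handlers are attached immediately after the Scheduler is constructed, and the shared informers are started later by the caller. This matters because a SharedInformerFactory's Start only runs informers that have already been requested via Informer(); one requested after Start sits idle until Start is called again. A small sketch of that property, using a fake clientset and illustrative names:

package main

import (
	"fmt"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	client := fake.NewSimpleClientset()
	stopCh := make(chan struct{})
	defer close(stopCh)

	factory := informers.NewSharedInformerFactory(client, 0)

	// Requesting the informer registers it with the factory, so the
	// Start call below actually runs it.
	nodeInformer := factory.Core().V1().Nodes().Informer()

	factory.Start(stopCh)
	synced := cache.WaitForCacheSync(stopCh, nodeInformer.HasSynced)
	fmt.Printf("node informer synced: %v\n", synced)

	// An informer first requested after Start, like this one, is not
	// running yet; a second factory.Start(stopCh) would be needed.
	_ = factory.Core().V1().Pods().Informer()
}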


@@ -48,6 +48,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/factory"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
@@ -633,7 +634,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) {
algo := core.NewGenericScheduler(
scache,
nil,
internalqueue.NewSchedulingQueue(nil),
predicateMap,
predicates.EmptyPredicateMetadataProducer,
[]priorities.PriorityConfig{},
@@ -645,7 +646,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
false,
false,
api.DefaultPercentageOfNodesToScore)
api.DefaultPercentageOfNodesToScore,
)
bindingChan := make(chan *v1.Binding, 1)
errChan := make(chan error, 1)
@@ -684,7 +686,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
algo := core.NewGenericScheduler(
scache,
nil,
internalqueue.NewSchedulingQueue(nil),
predicateMap,
predicates.EmptyPredicateMetadataProducer,
[]priorities.PriorityConfig{},
@@ -696,7 +698,8 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
false,
false,
api.DefaultPercentageOfNodesToScore)
api.DefaultPercentageOfNodesToScore,
)
bindingChan := make(chan *v1.Binding, 2)
sched := NewFromConfig(&factory.Config{


@@ -114,14 +114,29 @@ func setupScheduler(
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
DisablePreemption: false,
PercentageOfNodesToScore: 100,
StopCh: stopCh,
})
schedulerConfig, err := schedulerConfigFactory.Create()
if err != nil {
t.Fatalf("Couldn't create scheduler config: %v", err)
}
schedulerConfig.StopEverything = stopCh
// TODO: Replace NewFromConfig and AddAllEventHandlers with scheduler.New() in
// all test/integration tests.
sched := scheduler.NewFromConfig(schedulerConfig)
scheduler.AddAllEventHandlers(sched,
v1.DefaultSchedulerName,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
)
eventBroadcaster := record.NewBroadcaster()
schedulerConfig.Recorder = eventBroadcaster.NewRecorder(
@@ -132,8 +147,6 @@ func setupScheduler(
Interface: cs.CoreV1().Events(""),
})
sched := scheduler.NewFromConfig(schedulerConfig)
algorithmprovider.ApplyFeatureGates()
go sched.Run()
@@ -513,12 +526,12 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)
go dc.Run(5, stopCh)
// Start Scheduler
setupScheduler(t, clientset, informers, stopCh)
informers.Start(stopCh)
go dc.Run(5, stopCh)
ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy
_, err := dsClient.Create(ds)
@@ -924,11 +937,11 @@ func TestTaintedNode(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)
go dc.Run(5, stopCh)
// Start Scheduler
setupScheduler(t, clientset, informers, stopCh)
informers.Start(stopCh)
go dc.Run(5, stopCh)
ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy


@@ -539,10 +539,24 @@ func TestMultiScheduler(t *testing.T) {
eventBroadcaster2 := record.NewBroadcaster()
schedulerConfig2.Recorder = eventBroadcaster2.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: fooScheduler})
eventBroadcaster2.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet2.CoreV1().Events("")})
go podInformer2.Informer().Run(stopCh)
informerFactory2.Start(stopCh)
sched2 := scheduler.NewFromConfig(schedulerConfig2)
scheduler.AddAllEventHandlers(sched2,
fooScheduler,
context.informerFactory.Core().V1().Nodes(),
podInformer2,
context.informerFactory.Core().V1().PersistentVolumes(),
context.informerFactory.Core().V1().PersistentVolumeClaims(),
context.informerFactory.Core().V1().ReplicationControllers(),
context.informerFactory.Apps().V1().ReplicaSets(),
context.informerFactory.Apps().V1().StatefulSets(),
context.informerFactory.Core().V1().Services(),
context.informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
context.informerFactory.Storage().V1().StorageClasses(),
)
go podInformer2.Informer().Run(stopCh)
informerFactory2.Start(stopCh)
sched2.Run()
// 6. **check point-2**:


@@ -189,6 +189,22 @@ func initTestSchedulerWithOptions(
// set DisablePreemption option
context.schedulerConfig.DisablePreemption = disablePreemption
context.scheduler = scheduler.NewFromConfig(context.schedulerConfig)
scheduler.AddAllEventHandlers(context.scheduler,
v1.DefaultSchedulerName,
context.informerFactory.Core().V1().Nodes(),
podInformer,
context.informerFactory.Core().V1().PersistentVolumes(),
context.informerFactory.Core().V1().PersistentVolumeClaims(),
context.informerFactory.Core().V1().ReplicationControllers(),
context.informerFactory.Apps().V1().ReplicaSets(),
context.informerFactory.Apps().V1().StatefulSets(),
context.informerFactory.Core().V1().Services(),
context.informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
context.informerFactory.Storage().V1().StorageClasses(),
)
// set setPodInformer if provided.
if setPodInformer {
go podInformer.Informer().Run(context.schedulerConfig.StopEverything)
@@ -212,7 +228,6 @@ func initTestSchedulerWithOptions(
context.informerFactory.Start(context.schedulerConfig.StopEverything)
context.informerFactory.WaitForCacheSync(context.schedulerConfig.StopEverything)
context.scheduler = scheduler.NewFromConfig(context.schedulerConfig)
context.scheduler.Run()
return context
}


@@ -74,6 +74,20 @@ func StartScheduler(clientSet clientset.Interface) (factory.Configurator, Shutdo
config.Recorder = evtBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"})
sched := scheduler.NewFromConfig(config)
scheduler.AddAllEventHandlers(sched,
v1.DefaultSchedulerName,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
)
informerFactory.Start(stopCh)
sched.Run()