diff --git a/pkg/scheduler/factory/BUILD b/pkg/scheduler/factory/BUILD
index 7be893b060..f7332ccdca 100644
--- a/pkg/scheduler/factory/BUILD
+++ b/pkg/scheduler/factory/BUILD
@@ -73,6 +73,7 @@ go_test(
         "//pkg/scheduler/testing:go_default_library",
         "//pkg/scheduler/util:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go
index aa420bcb17..6242f3b485 100644
--- a/pkg/scheduler/factory/factory.go
+++ b/pkg/scheduler/factory/factory.go
@@ -992,7 +992,14 @@ func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) {
 	}
 
 	c.invalidateCachedPredicatesOnNodeUpdate(newNode, oldNode)
-	c.podQueue.MoveAllToActiveQueue()
+	// Only activate unschedulable pods if the node became more schedulable.
+	// We skip the node property comparison when there are no unschedulable pods in the queue
+	// to save processing cycles. We still trigger a move to active queue to cover the case
+	// that a pod being processed by the scheduler is determined unschedulable. We want this
+	// pod to be reevaluated when a change in the cluster happens.
+	if c.podQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) {
+		c.podQueue.MoveAllToActiveQueue()
+	}
 }
 
 func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) {
@@ -1064,6 +1071,65 @@ func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node,
 	}
 }
 
+// nodeSchedulingPropertiesChanged reports whether newNode differs from oldNode in any
+// of the properties checked here: Spec.Unschedulable being cleared, allocatable
+// resources, labels, taints, or condition statuses.
+func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool {
+	if nodeSpecUnschedulableChanged(newNode, oldNode) {
+		return true
+	}
+	if nodeAllocatableChanged(newNode, oldNode) {
+		return true
+	}
+	if nodeLabelsChanged(newNode, oldNode) {
+		return true
+	}
+	if nodeTaintsChanged(newNode, oldNode) {
+		return true
+	}
+	if nodeConditionsChanged(newNode, oldNode) {
+		return true
+	}
+
+	return false
+}
+
+// nodeAllocatableChanged reports whether Status.Allocatable differs between the two nodes.
+func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
+	return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable)
+}
+
+// nodeLabelsChanged reports whether the labels differ between the two nodes.
+func nodeLabelsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
+	return !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels())
+}
+
+// nodeTaintsChanged reports whether Spec.Taints differs between the two nodes.
+func nodeTaintsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
+	return !reflect.DeepEqual(oldNode.Spec.Taints, newNode.Spec.Taints)
+}
+
+// nodeConditionsChanged reports whether the set of condition types, or the Status of
+// any condition, differs between the two nodes. Other NodeCondition fields (for
+// example LastHeartbeatTime) are not compared.
+func nodeConditionsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
+	// strip reduces the conditions to a Type -> Status map so only those two fields are compared.
+	strip := func(conditions []v1.NodeCondition) map[v1.NodeConditionType]v1.ConditionStatus {
+		conditionStatuses := make(map[v1.NodeConditionType]v1.ConditionStatus, len(conditions))
+		for i := range conditions {
+			conditionStatuses[conditions[i].Type] = conditions[i].Status
+		}
+		return conditionStatuses
+	}
+	return !reflect.DeepEqual(strip(oldNode.Status.Conditions), strip(newNode.Status.Conditions))
+}
+
+// nodeSpecUnschedulableChanged reports whether Spec.Unschedulable went from true to
+// false. A node that becomes unschedulable is not reported as a change.
+func nodeSpecUnschedulableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
+	return newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && !newNode.Spec.Unschedulable
+}
+
 func (c *configFactory) deleteNodeFromCache(obj interface{}) {
 	var node *v1.Node
 	switch t := obj.(type) {
diff --git a/pkg/scheduler/factory/factory_test.go b/pkg/scheduler/factory/factory_test.go
index aa68c716fc..080930e22c 100644
--- a/pkg/scheduler/factory/factory_test.go
+++ b/pkg/scheduler/factory/factory_test.go
@@ -24,6 +24,7 @@ import (
 	"time"
 
 	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -657,3 +658,150 @@ func testGetBinderFunc(expectedBinderType, podName string, extenders []algorithm
 		t.Errorf("Expected binder %q but got %q", expectedBinderType, binderType)
 	}
 }
+
+// TestNodeAllocatableChanged checks nodeAllocatableChanged against equal and differing allocatable lists.
+func TestNodeAllocatableChanged(t *testing.T) {
+	newQuantity := func(value int64) resource.Quantity {
+		return *resource.NewQuantity(value, resource.BinarySI)
+	}
+	for _, c := range []struct {
+		Name           string
+		Changed        bool
+		OldAllocatable v1.ResourceList
+		NewAllocatable v1.ResourceList
+	}{
+		{
+			Name:           "no allocatable resources changed",
+			Changed:        false,
+			OldAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
+			NewAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
+		},
+		{
+			Name:           "new node has more allocatable resources",
+			Changed:        true,
+			OldAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
+			NewAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024), v1.ResourceStorage: newQuantity(1024)},
+		},
+	} {
+		oldNode := &v1.Node{Status: v1.NodeStatus{Allocatable: c.OldAllocatable}}
+		newNode := &v1.Node{Status: v1.NodeStatus{Allocatable: c.NewAllocatable}}
+		changed := nodeAllocatableChanged(newNode, oldNode)
+		if changed != c.Changed {
+			t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed)
+		}
+	}
+}
+
+// TestNodeLabelsChanged checks nodeLabelsChanged against equal and differing label sets.
+func TestNodeLabelsChanged(t *testing.T) {
+	for _, c := range []struct {
+		Name      string
+		Changed   bool
+		OldLabels map[string]string
+		NewLabels map[string]string
+	}{
+		{
+			Name:      "no labels changed",
+			Changed:   false,
+			OldLabels: map[string]string{"foo": "bar"},
+			NewLabels: map[string]string{"foo": "bar"},
+		},
+		// Labels changed.
+		{
+			Name:      "new node has more labels",
+			Changed:   true,
+			OldLabels: map[string]string{"foo": "bar"},
+			NewLabels: map[string]string{"foo": "bar", "test": "value"},
+		},
+	} {
+		oldNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: c.OldLabels}}
+		newNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: c.NewLabels}}
+		changed := nodeLabelsChanged(newNode, oldNode)
+		if changed != c.Changed {
+			t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed)
+		}
+	}
+}
+
+// TestNodeTaintsChanged checks nodeTaintsChanged against equal and differing taint lists.
+func TestNodeTaintsChanged(t *testing.T) {
+	for _, c := range []struct {
+		Name      string
+		Changed   bool
+		OldTaints []v1.Taint
+		NewTaints []v1.Taint
+	}{
+		{
+			Name:      "no taint changed",
+			Changed:   false,
+			OldTaints: []v1.Taint{{Key: "key", Value: "value"}},
+			NewTaints: []v1.Taint{{Key: "key", Value: "value"}},
+		},
+		{
+			Name:      "taint value changed",
+			Changed:   true,
+			OldTaints: []v1.Taint{{Key: "key", Value: "value1"}},
+			NewTaints: []v1.Taint{{Key: "key", Value: "value2"}},
+		},
+	} {
+		oldNode := &v1.Node{Spec: v1.NodeSpec{Taints: c.OldTaints}}
+		newNode := &v1.Node{Spec: v1.NodeSpec{Taints: c.NewTaints}}
+		changed := nodeTaintsChanged(newNode, oldNode)
+		if changed != c.Changed {
+			t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed)
+		}
+	}
+}
+
+// TestNodeConditionsChanged checks that nodeConditionsChanged reacts to condition type/status changes only.
+func TestNodeConditionsChanged(t *testing.T) {
+	nodeConditionType := reflect.TypeOf(v1.NodeCondition{})
+	if nodeConditionType.NumField() != 6 {
+		t.Errorf("NodeCondition type has changed. The nodeConditionsChanged() function must be reevaluated.")
+	}
+
+	for _, c := range []struct {
+		Name          string
+		Changed       bool
+		OldConditions []v1.NodeCondition
+		NewConditions []v1.NodeCondition
+	}{
+		{
+			Name:          "no condition changed",
+			Changed:       false,
+			OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
+			NewConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
+		},
+		{
+			Name:          "only LastHeartbeatTime changed",
+			Changed:       false,
+			OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(1, 0)}},
+			NewConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(2, 0)}},
+		},
+		{
+			Name:          "new node has more healthy conditions",
+			Changed:       true,
+			OldConditions: []v1.NodeCondition{},
+			NewConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
+		},
+		{
+			Name:          "new node has less unhealthy conditions",
+			Changed:       true,
+			OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
+			NewConditions: []v1.NodeCondition{},
+		},
+		{
+			Name:          "condition status changed",
+			Changed:       true,
+			OldConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}},
+			NewConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
+		},
+	} {
+		oldNode := &v1.Node{Status: v1.NodeStatus{Conditions: c.OldConditions}}
+		newNode := &v1.Node{Status: v1.NodeStatus{Conditions: c.NewConditions}}
+		changed := nodeConditionsChanged(newNode, oldNode)
+		if changed != c.Changed {
+			t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed)
+		}
+	}
+}
diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go
index 9bf40361b4..d56eb4adbf 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue.go
@@ -69,6 +69,8 @@ type SchedulingQueue interface {
 	Close()
 	// DeleteNominatedPodIfExists deletes nominatedPod from internal cache
 	DeleteNominatedPodIfExists(pod *v1.Pod)
+	// NumUnschedulablePods returns the number of unschedulable pods that exist in the SchedulingQueue.
+	NumUnschedulablePods() int
 }
 
 // NewSchedulingQueue initializes a new scheduling queue. If pod priority is
@@ -162,6 +164,11 @@ func (f *FIFO) Close() {
 // DeleteNominatedPodIfExists does nothing in FIFO.
 func (f *FIFO) DeleteNominatedPodIfExists(pod *v1.Pod) {}
 
+// NumUnschedulablePods always returns 0 in FIFO.
+func (f *FIFO) NumUnschedulablePods() int {
+	return 0
+}
+
 // NewFIFO creates a FIFO object.
 func NewFIFO() *FIFO {
 	return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
@@ -551,6 +558,13 @@ func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
 	p.lock.Unlock()
 }
 
+// NumUnschedulablePods returns the number of pods currently held in unschedulableQ.
+func (p *PriorityQueue) NumUnschedulablePods() int {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+	return len(p.unschedulableQ.pods)
+}
+
 // UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
 // is used to implement unschedulableQ.
 type UnschedulablePodsMap struct {