mirror of https://github.com/k3s-io/k3s
Merge pull request #71488 from bsalamat/queue-sort
Change sort function of the scheduling queue to avoid starvation
pull/564/head
commit
82abbdc11a
|
@ -22,9 +22,11 @@ go_test(
|
||||||
srcs = ["scheduling_queue_test.go"],
|
srcs = ["scheduling_queue_test.go"],
|
||||||
embed = [":go_default_library"],
|
embed = [":go_default_library"],
|
||||||
deps = [
|
deps = [
|
||||||
|
"//pkg/api/v1/pod:go_default_library",
|
||||||
"//pkg/scheduler/util:go_default_library",
|
"//pkg/scheduler/util:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -206,10 +206,31 @@ type PriorityQueue struct {
|
||||||
// Making sure that PriorityQueue implements SchedulingQueue.
|
// Making sure that PriorityQueue implements SchedulingQueue.
|
||||||
var _ = SchedulingQueue(&PriorityQueue{})
|
var _ = SchedulingQueue(&PriorityQueue{})
|
||||||
|
|
||||||
|
// podTimeStamp returns pod's last schedule time or its creation time if the
|
||||||
|
// scheduler has never tried scheduling it.
|
||||||
|
func podTimestamp(pod *v1.Pod) *metav1.Time {
|
||||||
|
_, condition := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
|
||||||
|
if condition == nil {
|
||||||
|
return &pod.CreationTimestamp
|
||||||
|
}
|
||||||
|
return &condition.LastTransitionTime
|
||||||
|
}
|
||||||
|
|
||||||
|
// activeQComp is the function used by the activeQ heap algorithm to sort pods.
|
||||||
|
// It sorts pods based on their priority. When priorities are equal, it uses
|
||||||
|
// podTimestamp.
|
||||||
|
func activeQComp(pod1, pod2 interface{}) bool {
|
||||||
|
p1 := pod1.(*v1.Pod)
|
||||||
|
p2 := pod2.(*v1.Pod)
|
||||||
|
prio1 := util.GetPodPriority(p1)
|
||||||
|
prio2 := util.GetPodPriority(p2)
|
||||||
|
return (prio1 > prio2) || (prio1 == prio2 && podTimestamp(p1).Before(podTimestamp(p2)))
|
||||||
|
}
|
||||||
|
|
||||||
// NewPriorityQueue creates a PriorityQueue object.
|
// NewPriorityQueue creates a PriorityQueue object.
|
||||||
func NewPriorityQueue() *PriorityQueue {
|
func NewPriorityQueue() *PriorityQueue {
|
||||||
pq := &PriorityQueue{
|
pq := &PriorityQueue{
|
||||||
activeQ: newHeap(cache.MetaNamespaceKeyFunc, util.HigherPriorityPod),
|
activeQ: newHeap(cache.MetaNamespaceKeyFunc, activeQComp),
|
||||||
unschedulableQ: newUnschedulablePodsMap(),
|
unschedulableQ: newUnschedulablePodsMap(),
|
||||||
nominatedPods: map[string][]*v1.Pod{},
|
nominatedPods: map[string][]*v1.Pod{},
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,8 @@ import (
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/types"
|
||||||
|
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/util"
|
"k8s.io/kubernetes/pkg/scheduler/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -512,3 +514,56 @@ func TestSchedulingQueue_Close(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestRecentlyTriedPodsGoBack tests that pods which are recently tried and are
|
||||||
|
// unschedulable go behind other pods with the same priority. This behavior
|
||||||
|
// ensures that an unschedulable pod does not block head of the queue when there
|
||||||
|
// are frequent events that move pods to the active queue.
|
||||||
|
func TestRecentlyTriedPodsGoBack(t *testing.T) {
|
||||||
|
q := NewPriorityQueue()
|
||||||
|
// Add a few pods to priority queue.
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
p := v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: fmt.Sprintf("test-pod-%v", i),
|
||||||
|
Namespace: "ns1",
|
||||||
|
UID: types.UID(fmt.Sprintf("tp00%v", i)),
|
||||||
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
Priority: &highPriority,
|
||||||
|
},
|
||||||
|
Status: v1.PodStatus{
|
||||||
|
NominatedNodeName: "node1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
q.Add(&p)
|
||||||
|
}
|
||||||
|
// Simulate a pod being popped by the scheduler, determined unschedulable, and
|
||||||
|
// then moved back to the active queue.
|
||||||
|
p1, err := q.Pop()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error while popping the head of the queue: %v", err)
|
||||||
|
}
|
||||||
|
// Update pod condition to unschedulable.
|
||||||
|
podutil.UpdatePodCondition(&p1.Status, &v1.PodCondition{
|
||||||
|
Type: v1.PodScheduled,
|
||||||
|
Status: v1.ConditionFalse,
|
||||||
|
Reason: v1.PodReasonUnschedulable,
|
||||||
|
Message: "fake scheduling failure",
|
||||||
|
})
|
||||||
|
// Put in the unschedulable queue.
|
||||||
|
q.AddUnschedulableIfNotPresent(p1)
|
||||||
|
// Move all unschedulable pods to the active queue.
|
||||||
|
q.MoveAllToActiveQueue()
|
||||||
|
// Simulation is over. Now let's pop all pods. The pod popped first should be
|
||||||
|
// the last one we pop here.
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
p, err := q.Pop()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error while popping pods from the queue: %v", err)
|
||||||
|
}
|
||||||
|
if (i == 4) != (p1 == p) {
|
||||||
|
t.Errorf("A pod tried before is not the last pod popped: i: %v, pod name: %v", i, p.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue