From de8cfdcd79259def9a7bd9f56dc6b984d8045ceb Mon Sep 17 00:00:00 2001 From: wangqingcan Date: Wed, 16 Jan 2019 12:08:19 +0800 Subject: [PATCH] add goroutine to move unschedulablepods to activeq regularly --- .../internal/queue/scheduling_queue.go | 25 ++++++++ .../internal/queue/scheduling_queue_test.go | 61 +++++++++++++++++++ 2 files changed, 86 insertions(+) diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 438e53bb48..4b009ec3e6 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -49,6 +49,10 @@ var ( queueClosed = "scheduling queue is closed" ) +// If the pod stays in unschedulableQ longer than the unschedulableQTimeInterval, +// the pod will be moved from unschedulableQ to activeQ. +const unschedulableQTimeInterval = 60 * time.Second + // SchedulingQueue is an interface for a queue to store pods waiting to be scheduled. // The interface follows a pattern similar to cache.FIFO and cache.Heap and // makes it easy to use those data structures as a SchedulingQueue. @@ -279,6 +283,7 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *Priority // run starts the goroutine to pump from podBackoffQ to activeQ func (p *PriorityQueue) run() { go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop) + go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop) } // Add adds a pod to the active queue. It should be called only when a new pod @@ -443,6 +448,26 @@ func (p *PriorityQueue) flushBackoffQCompleted() { } } +// flushUnschedulableQLeftover moves pod which stays in unschedulableQ longer than the durationStayUnschedulableQ +// to activeQ. +func (p *PriorityQueue) flushUnschedulableQLeftover() { + p.lock.Lock() + defer p.lock.Unlock() + + var podsToMove []*v1.Pod + currentTime := p.clock.Now() + for _, pod := range p.unschedulableQ.pods { + lastScheduleTime := podTimestamp(pod) + if !lastScheduleTime.IsZero() && currentTime.Sub(lastScheduleTime.Time) > unschedulableQTimeInterval { + podsToMove = append(podsToMove, pod) + } + } + + if len(podsToMove) > 0 { + p.movePodsToActiveQueue(podsToMove) + } +} + // Pop removes the head of the active queue and returns it. It blocks if the // activeQ is empty and waits until a new item is added to the queue. It also // clears receivedMoveRequest to mark the beginning of a new scheduling cycle. diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 4ae72d16c6..a7fcd5881e 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -21,6 +21,7 @@ import ( "reflect" "sync" "testing" + "time" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -837,3 +838,63 @@ func TestHighProirotyBackoff(t *testing.T) { t.Errorf("Expected to get mid prority pod, got: %v", p) } } + +// TestHighProirotyFlushUnschedulableQLeftover tests that pods will be moved to +// activeQ after one minutes if it is in unschedulableQ +func TestHighProirotyFlushUnschedulableQLeftover(t *testing.T) { + q := NewPriorityQueue(nil) + midPod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-midpod", + Namespace: "ns1", + UID: types.UID("tp-mid"), + }, + Spec: v1.PodSpec{ + Priority: &midPriority, + }, + Status: v1.PodStatus{ + NominatedNodeName: "node1", + }, + } + highPod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-highpod", + Namespace: "ns1", + UID: types.UID("tp-high"), + }, + Spec: v1.PodSpec{ + Priority: &highPriority, + }, + Status: v1.PodStatus{ + NominatedNodeName: "node1", + }, + } + + q.unschedulableQ.addOrUpdate(&highPod) + q.unschedulableQ.addOrUpdate(&midPod) + + // Update pod condition to highPod. + podutil.UpdatePodCondition(&highPod.Status, &v1.PodCondition{ + Type: v1.PodScheduled, + Status: v1.ConditionFalse, + Reason: v1.PodReasonUnschedulable, + Message: "fake scheduling failure", + LastProbeTime: metav1.Time{Time: time.Now().Add(-1 * unschedulableQTimeInterval)}, + }) + + // Update pod condition to midPod. + podutil.UpdatePodCondition(&midPod.Status, &v1.PodCondition{ + Type: v1.PodScheduled, + Status: v1.ConditionFalse, + Reason: v1.PodReasonUnschedulable, + Message: "fake scheduling failure", + LastProbeTime: metav1.Time{Time: time.Now().Add(-1 * unschedulableQTimeInterval)}, + }) + + if p, err := q.Pop(); err != nil || p != &highPod { + t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name) + } + if p, err := q.Pop(); err != nil || p != &midPod { + t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name) + } +}