diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index b74a8aa9b6..ca1f0e867f 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -1085,6 +1085,7 @@ func MakeDefaultErrorFunc(client clientset.Interface, backoff *util.PodBackoff, } backoff.Gc() + podSchedulingCycle := podQueue.SchedulingCycle() // Retry asynchronously. // Note that this is extremely rudimentary and we need a more real error handling path. go func() { @@ -1110,7 +1111,7 @@ func MakeDefaultErrorFunc(client clientset.Interface, backoff *util.PodBackoff, pod, err := client.CoreV1().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{}) if err == nil { if len(pod.Spec.NodeName) == 0 { - podQueue.AddUnschedulableIfNotPresent(pod) + podQueue.AddUnschedulableIfNotPresent(pod, podSchedulingCycle) } break } diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 16756abf62..bc5130d481 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -59,7 +59,14 @@ const unschedulableQTimeInterval = 60 * time.Second type SchedulingQueue interface { Add(pod *v1.Pod) error AddIfNotPresent(pod *v1.Pod) error - AddUnschedulableIfNotPresent(pod *v1.Pod) error + // AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue. + // The podSchedulingCycle represents the current scheduling cycle number which can be + // returned by calling SchedulingCycle(). + AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error + // SchedulingCycle returns the current number of scheduling cycle which is + // cached by scheduling queue. Normally, incrementing this number whenever + // a pod is popped (e.g. called Pop()) is enough. + SchedulingCycle() int64 // Pop removes the head of the queue and returns it. It blocks if the // queue is empty and waits until a new item is added to the queue. Pop() (*v1.Pod, error) @@ -111,10 +118,15 @@ func (f *FIFO) AddIfNotPresent(pod *v1.Pod) error { // AddUnschedulableIfNotPresent adds an unschedulable pod back to the queue. In // FIFO it is added to the end of the queue. -func (f *FIFO) AddUnschedulableIfNotPresent(pod *v1.Pod) error { +func (f *FIFO) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error { return f.FIFO.AddIfNotPresent(pod) } +// SchedulingCycle implements SchedulingQueue.SchedulingCycle interface. +func (f *FIFO) SchedulingCycle() int64 { + return 0 +} + // Update updates a pod in the FIFO. func (f *FIFO) Update(oldPod, newPod *v1.Pod) error { return f.FIFO.Update(newPod) @@ -218,12 +230,14 @@ type PriorityQueue struct { // nominatedPods is a structures that stores pods which are nominated to run // on nodes. nominatedPods *nominatedPodMap - // receivedMoveRequest is set to true whenever we receive a request to move a - // pod from the unschedulableQ to the activeQ, and is set to false, when we pop - // a pod from the activeQ. It indicates if we received a move request when a - // pod was in flight (we were trying to schedule it). In such a case, we put - // the pod back into the activeQ if it is determined unschedulable. - receivedMoveRequest bool + // schedulingCycle represents sequence number of scheduling cycle and is incremented + // when a pod is popped. + schedulingCycle int64 + // moveRequestCycle caches the sequence number of scheduling cycle when we + // received a move request. Unscheduable pods in and before this scheduling + // cycle will be put back to activeQueue if we were trying to schedule them + // when we received move request. + moveRequestCycle int64 // closed indicates that the queue is closed. // It is mainly used to let Pop() exit its control loop while waiting for an item. @@ -265,12 +279,13 @@ func NewPriorityQueue(stop <-chan struct{}) *PriorityQueue { // NewPriorityQueueWithClock creates a PriorityQueue which uses the passed clock for time. func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *PriorityQueue { pq := &PriorityQueue{ - clock: clock, - stop: stop, - podBackoff: util.CreatePodBackoffWithClock(1*time.Second, 10*time.Second, clock), - activeQ: util.NewHeap(cache.MetaNamespaceKeyFunc, activeQComp), - unschedulableQ: newUnschedulablePodsMap(), - nominatedPods: newNominatedPodMap(), + clock: clock, + stop: stop, + podBackoff: util.CreatePodBackoffWithClock(1*time.Second, 10*time.Second, clock), + activeQ: util.NewHeap(cache.MetaNamespaceKeyFunc, activeQComp), + unschedulableQ: newUnschedulablePodsMap(), + nominatedPods: newNominatedPodMap(), + moveRequestCycle: -1, } pq.cond.L = &pq.lock pq.podBackoffQ = util.NewHeap(cache.MetaNamespaceKeyFunc, pq.podsCompareBackoffCompleted) @@ -372,12 +387,19 @@ func (p *PriorityQueue) backoffPod(pod *v1.Pod) { } } +// SchedulingCycle returns current scheduling cycle. +func (p *PriorityQueue) SchedulingCycle() int64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.schedulingCycle +} + // AddUnschedulableIfNotPresent does nothing if the pod is present in any // queue. If pod is unschedulable, it adds pod to unschedulable queue if -// p.receivedMoveRequest is false or to backoff queue if p.receivedMoveRequest -// is true but pod is subject to backoff. In other cases, it adds pod to active -// queue and clears p.receivedMoveRequest. -func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error { +// p.moveRequestCycle > podSchedulingCycle or to backoff queue if p.moveRequestCycle +// <= podSchedulingCycle but pod is subject to backoff. In other cases, it adds pod to +// active queue. +func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error { p.lock.Lock() defer p.lock.Unlock() if p.unschedulableQ.get(pod) != nil { @@ -389,7 +411,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error { if _, exists, _ := p.podBackoffQ.Get(pod); exists { return fmt.Errorf("pod is already present in the backoffQ") } - if !p.receivedMoveRequest && isPodUnschedulable(pod) { + if podSchedulingCycle > p.moveRequestCycle && isPodUnschedulable(pod) { p.backoffPod(pod) p.unschedulableQ.addOrUpdate(pod) p.nominatedPods.add(pod, "") @@ -412,7 +434,6 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error { p.nominatedPods.add(pod, "") p.cond.Broadcast() } - p.receivedMoveRequest = false return err } @@ -470,7 +491,8 @@ func (p *PriorityQueue) flushUnschedulableQLeftover() { } // Pop removes the head of the active queue and returns it. It blocks if the -// activeQ is empty and waits until a new item is added to the queue. +// activeQ is empty and waits until a new item is added to the queue. It +// increments scheduling cycle when a pod is popped. func (p *PriorityQueue) Pop() (*v1.Pod, error) { p.lock.Lock() defer p.lock.Unlock() @@ -488,6 +510,7 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) { return nil, err } pod := obj.(*v1.Pod) + p.schedulingCycle++ return pod, err } @@ -608,7 +631,7 @@ func (p *PriorityQueue) MoveAllToActiveQueue() { } } p.unschedulableQ.clear() - p.receivedMoveRequest = true + p.moveRequestCycle = p.schedulingCycle p.cond.Broadcast() } @@ -626,7 +649,7 @@ func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) { } p.unschedulableQ.delete(pod) } - p.receivedMoveRequest = true + p.moveRequestCycle = p.schedulingCycle p.cond.Broadcast() } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 15dca6bf2c..f0ca6751fe 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -179,9 +179,9 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) { func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { q := NewPriorityQueue(nil) q.Add(&highPriNominatedPod) - q.AddUnschedulableIfNotPresent(&highPriNominatedPod) // Must not add anything. - q.AddUnschedulableIfNotPresent(&medPriorityPod) // This should go to activeQ. - q.AddUnschedulableIfNotPresent(&unschedulablePod) + q.AddUnschedulableIfNotPresent(&highPriNominatedPod, q.SchedulingCycle()) // Must not add anything. + q.AddUnschedulableIfNotPresent(&medPriorityPod, q.SchedulingCycle()) // This should go to activeQ. + q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle()) expectedNominatedPods := &nominatedPodMap{ nominatedPodToNode: map[types.UID]string{ medPriorityPod.UID: "node1", @@ -209,6 +209,78 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { } } +// TestPriorityQueue_AddUnschedulableIfNotPresent_Async tests scenario when +// AddUnschedulableIfNotPresent is called asynchronously pods in and before +// current scheduling cycle will be put back to activeQueue if we were trying +// to schedule them when we received move request. +func TestPriorityQueue_AddUnschedulableIfNotPresent_Async(t *testing.T) { + q := NewPriorityQueue(nil) + totalNum := 10 + expectedPods := make([]v1.Pod, 0, totalNum) + for i := 0; i < totalNum; i++ { + priority := int32(i) + p := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod%d", i), + Namespace: fmt.Sprintf("ns%d", i), + UID: types.UID(fmt.Sprintf("upns%d", i)), + }, + Spec: v1.PodSpec{ + Priority: &priority, + }, + } + expectedPods = append(expectedPods, p) + // priority is to make pods ordered in the PriorityQueue + q.Add(&p) + } + + // Pop all pods except for the first one + for i := totalNum - 1; i > 0; i-- { + p, _ := q.Pop() + if !reflect.DeepEqual(&expectedPods[i], p) { + t.Errorf("Unexpected pod. Expected: %v, got: %v", &expectedPods[i], p) + } + } + + // move all pods to active queue when we were trying to schedule them + q.MoveAllToActiveQueue() + moveReqChan := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(totalNum - 1) + // mark pods[1] ~ pods[totalNum-1] as unschedulable, fire goroutines to add them back later + for i := 1; i < totalNum; i++ { + unschedulablePod := expectedPods[i].DeepCopy() + unschedulablePod.Status = v1.PodStatus{ + Conditions: []v1.PodCondition{ + { + Type: v1.PodScheduled, + Status: v1.ConditionFalse, + Reason: v1.PodReasonUnschedulable, + }, + }, + } + cycle := q.SchedulingCycle() + go func() { + <-moveReqChan + q.AddUnschedulableIfNotPresent(unschedulablePod, cycle) + wg.Done() + }() + } + firstPod, _ := q.Pop() + if !reflect.DeepEqual(&expectedPods[0], firstPod) { + t.Errorf("Unexpected pod. Expected: %v, got: %v", &expectedPods[0], firstPod) + } + // close moveReqChan here to make sure q.AddUnschedulableIfNotPresent is called after another pod is popped + close(moveReqChan) + wg.Wait() + // all other pods should be in active queue again + for i := 1; i < totalNum; i++ { + if _, exists, _ := q.activeQ.Get(&expectedPods[i]); !exists { + t.Errorf("Expected %v to be added to activeQ.", expectedPods[i].Name) + } + } +} + func TestPriorityQueue_Pop(t *testing.T) { q := NewPriorityQueue(nil) wg := sync.WaitGroup{} @@ -680,7 +752,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { LastProbeTime: metav1.Now(), }) // Put in the unschedulable queue. - q.AddUnschedulableIfNotPresent(p1) + q.AddUnschedulableIfNotPresent(p1, q.SchedulingCycle()) // Move all unschedulable pods to the active queue. q.MoveAllToActiveQueue() // Simulation is over. Now let's pop all pods. The pod popped first should be @@ -728,7 +800,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { LastProbeTime: metav1.Now(), }) // Put in the unschedulable queue - q.AddUnschedulableIfNotPresent(&unschedulablePod) + q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle()) // Clear its backoff to simulate backoff its expiration q.clearPodBackoff(&unschedulablePod) // Move all unschedulable pods to the active queue. @@ -771,7 +843,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { LastProbeTime: metav1.Now(), }) // And then, put unschedulable pod to the unschedulable queue - q.AddUnschedulableIfNotPresent(&unschedulablePod) + q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle()) // Clear its backoff to simulate its backoff expiration q.clearPodBackoff(&unschedulablePod) // Move all unschedulable pods to the active queue. @@ -838,7 +910,7 @@ func TestHighProirotyBackoff(t *testing.T) { Message: "fake scheduling failure", }) // Put in the unschedulable queue. - q.AddUnschedulableIfNotPresent(p) + q.AddUnschedulableIfNotPresent(p, q.SchedulingCycle()) // Move all unschedulable pods to the active queue. q.MoveAllToActiveQueue()