Merge pull request #73568 from cofyc/automated-cherry-pick-of-#73309-upstream-release-1.13

Automated cherry pick of #73309: Should move all unscheduable pods when we received move request to active queue
k3s-v1.13.4
Kubernetes Prow Robot 2019-02-21 15:41:25 -08:00 committed by GitHub
commit c8e0153c89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 127 additions and 30 deletions

View File

@ -1521,6 +1521,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue
}
backoff.Gc()
podSchedulingCycle := podQueue.SchedulingCycle()
// Retry asynchronously.
// Note that this is extremely rudimentary and we need a more real error handling path.
go func() {
@ -1548,7 +1549,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue
pod, err := c.client.CoreV1().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{})
if err == nil {
if len(pod.Spec.NodeName) == 0 {
podQueue.AddUnschedulableIfNotPresent(pod)
podQueue.AddUnschedulableIfNotPresent(pod, podSchedulingCycle)
} else {
if c.volumeBinder != nil {
// Volume binder only wants to keep unassigned pods

View File

@ -60,7 +60,14 @@ const unschedulableQTimeInterval = 60 * time.Second
type SchedulingQueue interface {
Add(pod *v1.Pod) error
AddIfNotPresent(pod *v1.Pod) error
AddUnschedulableIfNotPresent(pod *v1.Pod) error
// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
// The podSchedulingCycle represents the current scheduling cycle number which can be
// returned by calling SchedulingCycle().
AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error
// SchedulingCycle returns the current number of scheduling cycle which is
// cached by scheduling queue. Normally, incrementing this number whenever
// a pod is popped (e.g. called Pop()) is enough.
SchedulingCycle() int64
// Pop removes the head of the queue and returns it. It blocks if the
// queue is empty and waits until a new item is added to the queue.
Pop() (*v1.Pod, error)
@ -112,10 +119,15 @@ func (f *FIFO) AddIfNotPresent(pod *v1.Pod) error {
// AddUnschedulableIfNotPresent adds an unschedulable pod back to the queue. In
// FIFO it is added to the end of the queue.
func (f *FIFO) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
func (f *FIFO) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
return f.FIFO.AddIfNotPresent(pod)
}
// SchedulingCycle implements SchedulingQueue.SchedulingCycle interface.
func (f *FIFO) SchedulingCycle() int64 {
return 0
}
// Update updates a pod in the FIFO.
func (f *FIFO) Update(oldPod, newPod *v1.Pod) error {
return f.FIFO.Update(newPod)
@ -212,12 +224,14 @@ type PriorityQueue struct {
// nominatedPods is a structures that stores pods which are nominated to run
// on nodes.
nominatedPods *nominatedPodMap
// receivedMoveRequest is set to true whenever we receive a request to move a
// pod from the unschedulableQ to the activeQ, and is set to false, when we pop
// a pod from the activeQ. It indicates if we received a move request when a
// pod was in flight (we were trying to schedule it). In such a case, we put
// the pod back into the activeQ if it is determined unschedulable.
receivedMoveRequest bool
// schedulingCycle represents sequence number of scheduling cycle and is incremented
// when a pod is popped.
schedulingCycle int64
// moveRequestCycle caches the sequence number of scheduling cycle when we
// received a move request. Unscheduable pods in and before this scheduling
// cycle will be put back to activeQueue if we were trying to schedule them
// when we received move request.
moveRequestCycle int64
// closed indicates that the queue is closed.
// It is mainly used to let Pop() exit its control loop while waiting for an item.
@ -254,11 +268,12 @@ func activeQComp(pod1, pod2 interface{}) bool {
// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(stop <-chan struct{}) *PriorityQueue {
pq := &PriorityQueue{
clock: util.RealClock{},
stop: stop,
activeQ: newHeap(cache.MetaNamespaceKeyFunc, activeQComp),
unschedulableQ: newUnschedulablePodsMap(),
nominatedPods: newNominatedPodMap(),
clock: util.RealClock{},
stop: stop,
activeQ: newHeap(cache.MetaNamespaceKeyFunc, activeQComp),
unschedulableQ: newUnschedulablePodsMap(),
nominatedPods: newNominatedPodMap(),
moveRequestCycle: -1,
}
pq.cond.L = &pq.lock
@ -316,10 +331,19 @@ func isPodUnschedulable(pod *v1.Pod) bool {
return cond != nil && cond.Status == v1.ConditionFalse && cond.Reason == v1.PodReasonUnschedulable
}
// AddUnschedulableIfNotPresent does nothing if the pod is present in either
// queue. Otherwise it adds the pod to the unschedulable queue if
// p.receivedMoveRequest is false, and to the activeQ if p.receivedMoveRequest is true.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
// SchedulingCycle returns current scheduling cycle.
func (p *PriorityQueue) SchedulingCycle() int64 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.schedulingCycle
}
// AddUnschedulableIfNotPresent does nothing if the pod is present in any
// queue. If pod is unschedulable, it adds pod to unschedulable queue if
// p.moveRequestCycle > podSchedulingCycle or to backoff queue if p.moveRequestCycle
// <= podSchedulingCycle but pod is subject to backoff. In other cases, it adds pod to
// active queue.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
p.lock.Lock()
defer p.lock.Unlock()
if p.unschedulableQ.get(pod) != nil {
@ -328,7 +352,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
if _, exists, _ := p.activeQ.Get(pod); exists {
return fmt.Errorf("pod is already present in the activeQ")
}
if !p.receivedMoveRequest && isPodUnschedulable(pod) {
if podSchedulingCycle > p.moveRequestCycle && isPodUnschedulable(pod) {
p.unschedulableQ.addOrUpdate(pod)
p.nominatedPods.add(pod, "")
return nil
@ -362,8 +386,8 @@ func (p *PriorityQueue) flushUnschedulableQLeftover() {
}
// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It also
// clears receivedMoveRequest to mark the beginning of a new scheduling cycle.
// activeQ is empty and waits until a new item is added to the queue. It
// increments scheduling cycle when a pod is popped.
func (p *PriorityQueue) Pop() (*v1.Pod, error) {
p.lock.Lock()
defer p.lock.Unlock()
@ -381,7 +405,7 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) {
return nil, err
}
pod := obj.(*v1.Pod)
p.receivedMoveRequest = false
p.schedulingCycle++
return pod, err
}
@ -479,7 +503,7 @@ func (p *PriorityQueue) MoveAllToActiveQueue() {
}
}
p.unschedulableQ.clear()
p.receivedMoveRequest = true
p.moveRequestCycle = p.schedulingCycle
p.cond.Broadcast()
}
@ -492,7 +516,7 @@ func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
}
}
p.receivedMoveRequest = true
p.moveRequestCycle = p.schedulingCycle
p.cond.Broadcast()
}

View File

@ -173,9 +173,9 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) {
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
q := NewPriorityQueue(nil)
q.Add(&highPriNominatedPod)
q.AddUnschedulableIfNotPresent(&highPriNominatedPod) // Must not add anything.
q.AddUnschedulableIfNotPresent(&medPriorityPod) // This should go to activeQ.
q.AddUnschedulableIfNotPresent(&unschedulablePod)
q.AddUnschedulableIfNotPresent(&highPriNominatedPod, q.SchedulingCycle()) // Must not add anything.
q.AddUnschedulableIfNotPresent(&medPriorityPod, q.SchedulingCycle()) // This should go to activeQ.
q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle())
expectedNominatedPods := &nominatedPodMap{
nominatedPodToNode: map[types.UID]string{
medPriorityPod.UID: "node1",
@ -203,6 +203,78 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
}
}
// TestPriorityQueue_AddUnschedulableIfNotPresent_Async tests scenario when
// AddUnschedulableIfNotPresent is called asynchronously pods in and before
// current scheduling cycle will be put back to activeQueue if we were trying
// to schedule them when we received move request.
func TestPriorityQueue_AddUnschedulableIfNotPresent_Async(t *testing.T) {
q := NewPriorityQueue(nil)
totalNum := 10
expectedPods := make([]v1.Pod, 0, totalNum)
for i := 0; i < totalNum; i++ {
priority := int32(i)
p := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod%d", i),
Namespace: fmt.Sprintf("ns%d", i),
UID: types.UID(fmt.Sprintf("upns%d", i)),
},
Spec: v1.PodSpec{
Priority: &priority,
},
}
expectedPods = append(expectedPods, p)
// priority is to make pods ordered in the PriorityQueue
q.Add(&p)
}
// Pop all pods except for the first one
for i := totalNum - 1; i > 0; i-- {
p, _ := q.Pop()
if !reflect.DeepEqual(&expectedPods[i], p) {
t.Errorf("Unexpected pod. Expected: %v, got: %v", &expectedPods[i], p)
}
}
// move all pods to active queue when we were trying to schedule them
q.MoveAllToActiveQueue()
moveReqChan := make(chan struct{})
var wg sync.WaitGroup
wg.Add(totalNum - 1)
// mark pods[1] ~ pods[totalNum-1] as unschedulable, fire goroutines to add them back later
for i := 1; i < totalNum; i++ {
unschedulablePod := expectedPods[i].DeepCopy()
unschedulablePod.Status = v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
},
},
}
cycle := q.SchedulingCycle()
go func() {
<-moveReqChan
q.AddUnschedulableIfNotPresent(unschedulablePod, cycle)
wg.Done()
}()
}
firstPod, _ := q.Pop()
if !reflect.DeepEqual(&expectedPods[0], firstPod) {
t.Errorf("Unexpected pod. Expected: %v, got: %v", &expectedPods[0], firstPod)
}
// close moveReqChan here to make sure q.AddUnschedulableIfNotPresent is called after another pod is popped
close(moveReqChan)
wg.Wait()
// all other pods should be in active queue again
for i := 1; i < totalNum; i++ {
if _, exists, _ := q.activeQ.Get(&expectedPods[i]); !exists {
t.Errorf("Expected %v to be added to activeQ.", expectedPods[i].Name)
}
}
}
func TestPriorityQueue_Pop(t *testing.T) {
q := NewPriorityQueue(nil)
wg := sync.WaitGroup{}
@ -656,7 +728,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
LastProbeTime: metav1.Now(),
})
// Put in the unschedulable queue.
q.AddUnschedulableIfNotPresent(p1)
q.AddUnschedulableIfNotPresent(p1, q.SchedulingCycle())
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveQueue()
// Simulation is over. Now let's pop all pods. The pod popped first should be
@ -704,7 +776,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
LastProbeTime: metav1.Now(),
})
// Put in the unschedulable queue
q.AddUnschedulableIfNotPresent(&unschedulablePod)
q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle())
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveQueue()
@ -745,7 +817,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
LastProbeTime: metav1.Now(),
})
// And then, put unschedulable pod to the unschedulable queue
q.AddUnschedulableIfNotPresent(&unschedulablePod)
q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle())
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveQueue()