mirror of https://github.com/k3s-io/k3s
Fix weakness of current receivedMoveRequest
- add incremental scheduling cycle - instead of set a flag on move reqeust, we cache current scheduling cycle in moveRequestCycle - when unschedulable pods are added back, compare its cycle with moveRequestCycle to decide whether it should be added into active queue or notpull/564/head
parent
0210c0d869
commit
ba47beffd2
|
@ -1085,6 +1085,7 @@ func MakeDefaultErrorFunc(client clientset.Interface, backoff *util.PodBackoff,
|
||||||
}
|
}
|
||||||
|
|
||||||
backoff.Gc()
|
backoff.Gc()
|
||||||
|
podSchedulingCycle := podQueue.SchedulingCycle()
|
||||||
// Retry asynchronously.
|
// Retry asynchronously.
|
||||||
// Note that this is extremely rudimentary and we need a more real error handling path.
|
// Note that this is extremely rudimentary and we need a more real error handling path.
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -1110,7 +1111,7 @@ func MakeDefaultErrorFunc(client clientset.Interface, backoff *util.PodBackoff,
|
||||||
pod, err := client.CoreV1().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{})
|
pod, err := client.CoreV1().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if len(pod.Spec.NodeName) == 0 {
|
if len(pod.Spec.NodeName) == 0 {
|
||||||
podQueue.AddUnschedulableIfNotPresent(pod)
|
podQueue.AddUnschedulableIfNotPresent(pod, podSchedulingCycle)
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,7 +59,14 @@ const unschedulableQTimeInterval = 60 * time.Second
|
||||||
type SchedulingQueue interface {
|
type SchedulingQueue interface {
|
||||||
Add(pod *v1.Pod) error
|
Add(pod *v1.Pod) error
|
||||||
AddIfNotPresent(pod *v1.Pod) error
|
AddIfNotPresent(pod *v1.Pod) error
|
||||||
AddUnschedulableIfNotPresent(pod *v1.Pod) error
|
// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
|
||||||
|
// The podSchedulingCycle represents the current scheduling cycle number which can be
|
||||||
|
// returned by calling SchedulingCycle().
|
||||||
|
AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error
|
||||||
|
// SchedulingCycle returns the current number of scheduling cycle which is
|
||||||
|
// cached by scheduling queue. Normally, incrementing this number whenever
|
||||||
|
// a pod is popped (e.g. called Pop()) is enough.
|
||||||
|
SchedulingCycle() int64
|
||||||
// Pop removes the head of the queue and returns it. It blocks if the
|
// Pop removes the head of the queue and returns it. It blocks if the
|
||||||
// queue is empty and waits until a new item is added to the queue.
|
// queue is empty and waits until a new item is added to the queue.
|
||||||
Pop() (*v1.Pod, error)
|
Pop() (*v1.Pod, error)
|
||||||
|
@ -111,10 +118,15 @@ func (f *FIFO) AddIfNotPresent(pod *v1.Pod) error {
|
||||||
|
|
||||||
// AddUnschedulableIfNotPresent adds an unschedulable pod back to the queue. In
|
// AddUnschedulableIfNotPresent adds an unschedulable pod back to the queue. In
|
||||||
// FIFO it is added to the end of the queue.
|
// FIFO it is added to the end of the queue.
|
||||||
func (f *FIFO) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
|
func (f *FIFO) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
|
||||||
return f.FIFO.AddIfNotPresent(pod)
|
return f.FIFO.AddIfNotPresent(pod)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SchedulingCycle implements SchedulingQueue.SchedulingCycle interface.
|
||||||
|
func (f *FIFO) SchedulingCycle() int64 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
// Update updates a pod in the FIFO.
|
// Update updates a pod in the FIFO.
|
||||||
func (f *FIFO) Update(oldPod, newPod *v1.Pod) error {
|
func (f *FIFO) Update(oldPod, newPod *v1.Pod) error {
|
||||||
return f.FIFO.Update(newPod)
|
return f.FIFO.Update(newPod)
|
||||||
|
@ -218,12 +230,14 @@ type PriorityQueue struct {
|
||||||
// nominatedPods is a structures that stores pods which are nominated to run
|
// nominatedPods is a structures that stores pods which are nominated to run
|
||||||
// on nodes.
|
// on nodes.
|
||||||
nominatedPods *nominatedPodMap
|
nominatedPods *nominatedPodMap
|
||||||
// receivedMoveRequest is set to true whenever we receive a request to move a
|
// schedulingCycle represents sequence number of scheduling cycle and is incremented
|
||||||
// pod from the unschedulableQ to the activeQ, and is set to false, when we pop
|
// when a pod is popped.
|
||||||
// a pod from the activeQ. It indicates if we received a move request when a
|
schedulingCycle int64
|
||||||
// pod was in flight (we were trying to schedule it). In such a case, we put
|
// moveRequestCycle caches the sequence number of scheduling cycle when we
|
||||||
// the pod back into the activeQ if it is determined unschedulable.
|
// received a move request. Unscheduable pods in and before this scheduling
|
||||||
receivedMoveRequest bool
|
// cycle will be put back to activeQueue if we were trying to schedule them
|
||||||
|
// when we received move request.
|
||||||
|
moveRequestCycle int64
|
||||||
|
|
||||||
// closed indicates that the queue is closed.
|
// closed indicates that the queue is closed.
|
||||||
// It is mainly used to let Pop() exit its control loop while waiting for an item.
|
// It is mainly used to let Pop() exit its control loop while waiting for an item.
|
||||||
|
@ -271,6 +285,7 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *Priority
|
||||||
activeQ: util.NewHeap(cache.MetaNamespaceKeyFunc, activeQComp),
|
activeQ: util.NewHeap(cache.MetaNamespaceKeyFunc, activeQComp),
|
||||||
unschedulableQ: newUnschedulablePodsMap(),
|
unschedulableQ: newUnschedulablePodsMap(),
|
||||||
nominatedPods: newNominatedPodMap(),
|
nominatedPods: newNominatedPodMap(),
|
||||||
|
moveRequestCycle: -1,
|
||||||
}
|
}
|
||||||
pq.cond.L = &pq.lock
|
pq.cond.L = &pq.lock
|
||||||
pq.podBackoffQ = util.NewHeap(cache.MetaNamespaceKeyFunc, pq.podsCompareBackoffCompleted)
|
pq.podBackoffQ = util.NewHeap(cache.MetaNamespaceKeyFunc, pq.podsCompareBackoffCompleted)
|
||||||
|
@ -372,12 +387,19 @@ func (p *PriorityQueue) backoffPod(pod *v1.Pod) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SchedulingCycle returns current scheduling cycle.
|
||||||
|
func (p *PriorityQueue) SchedulingCycle() int64 {
|
||||||
|
p.lock.RLock()
|
||||||
|
defer p.lock.RUnlock()
|
||||||
|
return p.schedulingCycle
|
||||||
|
}
|
||||||
|
|
||||||
// AddUnschedulableIfNotPresent does nothing if the pod is present in any
|
// AddUnschedulableIfNotPresent does nothing if the pod is present in any
|
||||||
// queue. If pod is unschedulable, it adds pod to unschedulable queue if
|
// queue. If pod is unschedulable, it adds pod to unschedulable queue if
|
||||||
// p.receivedMoveRequest is false or to backoff queue if p.receivedMoveRequest
|
// p.moveRequestCycle > podSchedulingCycle or to backoff queue if p.moveRequestCycle
|
||||||
// is true but pod is subject to backoff. In other cases, it adds pod to active
|
// <= podSchedulingCycle but pod is subject to backoff. In other cases, it adds pod to
|
||||||
// queue and clears p.receivedMoveRequest.
|
// active queue.
|
||||||
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
|
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
defer p.lock.Unlock()
|
defer p.lock.Unlock()
|
||||||
if p.unschedulableQ.get(pod) != nil {
|
if p.unschedulableQ.get(pod) != nil {
|
||||||
|
@ -389,7 +411,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
|
||||||
if _, exists, _ := p.podBackoffQ.Get(pod); exists {
|
if _, exists, _ := p.podBackoffQ.Get(pod); exists {
|
||||||
return fmt.Errorf("pod is already present in the backoffQ")
|
return fmt.Errorf("pod is already present in the backoffQ")
|
||||||
}
|
}
|
||||||
if !p.receivedMoveRequest && isPodUnschedulable(pod) {
|
if podSchedulingCycle > p.moveRequestCycle && isPodUnschedulable(pod) {
|
||||||
p.backoffPod(pod)
|
p.backoffPod(pod)
|
||||||
p.unschedulableQ.addOrUpdate(pod)
|
p.unschedulableQ.addOrUpdate(pod)
|
||||||
p.nominatedPods.add(pod, "")
|
p.nominatedPods.add(pod, "")
|
||||||
|
@ -412,7 +434,6 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
|
||||||
p.nominatedPods.add(pod, "")
|
p.nominatedPods.add(pod, "")
|
||||||
p.cond.Broadcast()
|
p.cond.Broadcast()
|
||||||
}
|
}
|
||||||
p.receivedMoveRequest = false
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -470,7 +491,8 @@ func (p *PriorityQueue) flushUnschedulableQLeftover() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pop removes the head of the active queue and returns it. It blocks if the
|
// Pop removes the head of the active queue and returns it. It blocks if the
|
||||||
// activeQ is empty and waits until a new item is added to the queue.
|
// activeQ is empty and waits until a new item is added to the queue. It
|
||||||
|
// increments scheduling cycle when a pod is popped.
|
||||||
func (p *PriorityQueue) Pop() (*v1.Pod, error) {
|
func (p *PriorityQueue) Pop() (*v1.Pod, error) {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
defer p.lock.Unlock()
|
defer p.lock.Unlock()
|
||||||
|
@ -488,6 +510,7 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
pod := obj.(*v1.Pod)
|
pod := obj.(*v1.Pod)
|
||||||
|
p.schedulingCycle++
|
||||||
return pod, err
|
return pod, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -608,7 +631,7 @@ func (p *PriorityQueue) MoveAllToActiveQueue() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
p.unschedulableQ.clear()
|
p.unschedulableQ.clear()
|
||||||
p.receivedMoveRequest = true
|
p.moveRequestCycle = p.schedulingCycle
|
||||||
p.cond.Broadcast()
|
p.cond.Broadcast()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -626,7 +649,7 @@ func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
|
||||||
}
|
}
|
||||||
p.unschedulableQ.delete(pod)
|
p.unschedulableQ.delete(pod)
|
||||||
}
|
}
|
||||||
p.receivedMoveRequest = true
|
p.moveRequestCycle = p.schedulingCycle
|
||||||
p.cond.Broadcast()
|
p.cond.Broadcast()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -179,9 +179,9 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) {
|
||||||
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
|
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
|
||||||
q := NewPriorityQueue(nil)
|
q := NewPriorityQueue(nil)
|
||||||
q.Add(&highPriNominatedPod)
|
q.Add(&highPriNominatedPod)
|
||||||
q.AddUnschedulableIfNotPresent(&highPriNominatedPod) // Must not add anything.
|
q.AddUnschedulableIfNotPresent(&highPriNominatedPod, q.SchedulingCycle()) // Must not add anything.
|
||||||
q.AddUnschedulableIfNotPresent(&medPriorityPod) // This should go to activeQ.
|
q.AddUnschedulableIfNotPresent(&medPriorityPod, q.SchedulingCycle()) // This should go to activeQ.
|
||||||
q.AddUnschedulableIfNotPresent(&unschedulablePod)
|
q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle())
|
||||||
expectedNominatedPods := &nominatedPodMap{
|
expectedNominatedPods := &nominatedPodMap{
|
||||||
nominatedPodToNode: map[types.UID]string{
|
nominatedPodToNode: map[types.UID]string{
|
||||||
medPriorityPod.UID: "node1",
|
medPriorityPod.UID: "node1",
|
||||||
|
@ -209,6 +209,78 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestPriorityQueue_AddUnschedulableIfNotPresent_Async tests scenario when
|
||||||
|
// AddUnschedulableIfNotPresent is called asynchronously pods in and before
|
||||||
|
// current scheduling cycle will be put back to activeQueue if we were trying
|
||||||
|
// to schedule them when we received move request.
|
||||||
|
func TestPriorityQueue_AddUnschedulableIfNotPresent_Async(t *testing.T) {
|
||||||
|
q := NewPriorityQueue(nil)
|
||||||
|
totalNum := 10
|
||||||
|
expectedPods := make([]v1.Pod, 0, totalNum)
|
||||||
|
for i := 0; i < totalNum; i++ {
|
||||||
|
priority := int32(i)
|
||||||
|
p := v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: fmt.Sprintf("pod%d", i),
|
||||||
|
Namespace: fmt.Sprintf("ns%d", i),
|
||||||
|
UID: types.UID(fmt.Sprintf("upns%d", i)),
|
||||||
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
Priority: &priority,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
expectedPods = append(expectedPods, p)
|
||||||
|
// priority is to make pods ordered in the PriorityQueue
|
||||||
|
q.Add(&p)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pop all pods except for the first one
|
||||||
|
for i := totalNum - 1; i > 0; i-- {
|
||||||
|
p, _ := q.Pop()
|
||||||
|
if !reflect.DeepEqual(&expectedPods[i], p) {
|
||||||
|
t.Errorf("Unexpected pod. Expected: %v, got: %v", &expectedPods[i], p)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// move all pods to active queue when we were trying to schedule them
|
||||||
|
q.MoveAllToActiveQueue()
|
||||||
|
moveReqChan := make(chan struct{})
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(totalNum - 1)
|
||||||
|
// mark pods[1] ~ pods[totalNum-1] as unschedulable, fire goroutines to add them back later
|
||||||
|
for i := 1; i < totalNum; i++ {
|
||||||
|
unschedulablePod := expectedPods[i].DeepCopy()
|
||||||
|
unschedulablePod.Status = v1.PodStatus{
|
||||||
|
Conditions: []v1.PodCondition{
|
||||||
|
{
|
||||||
|
Type: v1.PodScheduled,
|
||||||
|
Status: v1.ConditionFalse,
|
||||||
|
Reason: v1.PodReasonUnschedulable,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
cycle := q.SchedulingCycle()
|
||||||
|
go func() {
|
||||||
|
<-moveReqChan
|
||||||
|
q.AddUnschedulableIfNotPresent(unschedulablePod, cycle)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
firstPod, _ := q.Pop()
|
||||||
|
if !reflect.DeepEqual(&expectedPods[0], firstPod) {
|
||||||
|
t.Errorf("Unexpected pod. Expected: %v, got: %v", &expectedPods[0], firstPod)
|
||||||
|
}
|
||||||
|
// close moveReqChan here to make sure q.AddUnschedulableIfNotPresent is called after another pod is popped
|
||||||
|
close(moveReqChan)
|
||||||
|
wg.Wait()
|
||||||
|
// all other pods should be in active queue again
|
||||||
|
for i := 1; i < totalNum; i++ {
|
||||||
|
if _, exists, _ := q.activeQ.Get(&expectedPods[i]); !exists {
|
||||||
|
t.Errorf("Expected %v to be added to activeQ.", expectedPods[i].Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestPriorityQueue_Pop(t *testing.T) {
|
func TestPriorityQueue_Pop(t *testing.T) {
|
||||||
q := NewPriorityQueue(nil)
|
q := NewPriorityQueue(nil)
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
|
@ -680,7 +752,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
|
||||||
LastProbeTime: metav1.Now(),
|
LastProbeTime: metav1.Now(),
|
||||||
})
|
})
|
||||||
// Put in the unschedulable queue.
|
// Put in the unschedulable queue.
|
||||||
q.AddUnschedulableIfNotPresent(p1)
|
q.AddUnschedulableIfNotPresent(p1, q.SchedulingCycle())
|
||||||
// Move all unschedulable pods to the active queue.
|
// Move all unschedulable pods to the active queue.
|
||||||
q.MoveAllToActiveQueue()
|
q.MoveAllToActiveQueue()
|
||||||
// Simulation is over. Now let's pop all pods. The pod popped first should be
|
// Simulation is over. Now let's pop all pods. The pod popped first should be
|
||||||
|
@ -728,7 +800,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
|
||||||
LastProbeTime: metav1.Now(),
|
LastProbeTime: metav1.Now(),
|
||||||
})
|
})
|
||||||
// Put in the unschedulable queue
|
// Put in the unschedulable queue
|
||||||
q.AddUnschedulableIfNotPresent(&unschedulablePod)
|
q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle())
|
||||||
// Clear its backoff to simulate backoff its expiration
|
// Clear its backoff to simulate backoff its expiration
|
||||||
q.clearPodBackoff(&unschedulablePod)
|
q.clearPodBackoff(&unschedulablePod)
|
||||||
// Move all unschedulable pods to the active queue.
|
// Move all unschedulable pods to the active queue.
|
||||||
|
@ -771,7 +843,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
|
||||||
LastProbeTime: metav1.Now(),
|
LastProbeTime: metav1.Now(),
|
||||||
})
|
})
|
||||||
// And then, put unschedulable pod to the unschedulable queue
|
// And then, put unschedulable pod to the unschedulable queue
|
||||||
q.AddUnschedulableIfNotPresent(&unschedulablePod)
|
q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle())
|
||||||
// Clear its backoff to simulate its backoff expiration
|
// Clear its backoff to simulate its backoff expiration
|
||||||
q.clearPodBackoff(&unschedulablePod)
|
q.clearPodBackoff(&unschedulablePod)
|
||||||
// Move all unschedulable pods to the active queue.
|
// Move all unschedulable pods to the active queue.
|
||||||
|
@ -838,7 +910,7 @@ func TestHighProirotyBackoff(t *testing.T) {
|
||||||
Message: "fake scheduling failure",
|
Message: "fake scheduling failure",
|
||||||
})
|
})
|
||||||
// Put in the unschedulable queue.
|
// Put in the unschedulable queue.
|
||||||
q.AddUnschedulableIfNotPresent(p)
|
q.AddUnschedulableIfNotPresent(p, q.SchedulingCycle())
|
||||||
// Move all unschedulable pods to the active queue.
|
// Move all unschedulable pods to the active queue.
|
||||||
q.MoveAllToActiveQueue()
|
q.MoveAllToActiveQueue()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue