mirror of https://github.com/k3s-io/k3s
Merge pull request #72558 from denkensk/add-goroutine-move-unschedulablepods-to-activeq
Move unschedulable pods to the active queue if they are not retried for more than 1 minutepull/564/head
commit
d857790d36
|
@ -49,6 +49,10 @@ var (
|
||||||
queueClosed = "scheduling queue is closed"
|
queueClosed = "scheduling queue is closed"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// If the pod stays in unschedulableQ longer than the unschedulableQTimeInterval,
|
||||||
|
// the pod will be moved from unschedulableQ to activeQ.
|
||||||
|
const unschedulableQTimeInterval = 60 * time.Second
|
||||||
|
|
||||||
// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
|
// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
|
||||||
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
|
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
|
||||||
// makes it easy to use those data structures as a SchedulingQueue.
|
// makes it easy to use those data structures as a SchedulingQueue.
|
||||||
|
@ -279,6 +283,7 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *Priority
|
||||||
// run starts the goroutine to pump from podBackoffQ to activeQ
|
// run starts the goroutine to pump from podBackoffQ to activeQ
|
||||||
func (p *PriorityQueue) run() {
|
func (p *PriorityQueue) run() {
|
||||||
go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
|
go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
|
||||||
|
go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add adds a pod to the active queue. It should be called only when a new pod
|
// Add adds a pod to the active queue. It should be called only when a new pod
|
||||||
|
@ -443,6 +448,26 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// flushUnschedulableQLeftover moves pod which stays in unschedulableQ longer than the durationStayUnschedulableQ
|
||||||
|
// to activeQ.
|
||||||
|
func (p *PriorityQueue) flushUnschedulableQLeftover() {
|
||||||
|
p.lock.Lock()
|
||||||
|
defer p.lock.Unlock()
|
||||||
|
|
||||||
|
var podsToMove []*v1.Pod
|
||||||
|
currentTime := p.clock.Now()
|
||||||
|
for _, pod := range p.unschedulableQ.pods {
|
||||||
|
lastScheduleTime := podTimestamp(pod)
|
||||||
|
if !lastScheduleTime.IsZero() && currentTime.Sub(lastScheduleTime.Time) > unschedulableQTimeInterval {
|
||||||
|
podsToMove = append(podsToMove, pod)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(podsToMove) > 0 {
|
||||||
|
p.movePodsToActiveQueue(podsToMove)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Pop removes the head of the active queue and returns it. It blocks if the
|
// Pop removes the head of the active queue and returns it. It blocks if the
|
||||||
// activeQ is empty and waits until a new item is added to the queue. It also
|
// activeQ is empty and waits until a new item is added to the queue. It also
|
||||||
// clears receivedMoveRequest to mark the beginning of a new scheduling cycle.
|
// clears receivedMoveRequest to mark the beginning of a new scheduling cycle.
|
||||||
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
@ -837,3 +838,63 @@ func TestHighProirotyBackoff(t *testing.T) {
|
||||||
t.Errorf("Expected to get mid prority pod, got: %v", p)
|
t.Errorf("Expected to get mid prority pod, got: %v", p)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestHighProirotyFlushUnschedulableQLeftover tests that pods will be moved to
|
||||||
|
// activeQ after one minutes if it is in unschedulableQ
|
||||||
|
func TestHighProirotyFlushUnschedulableQLeftover(t *testing.T) {
|
||||||
|
q := NewPriorityQueue(nil)
|
||||||
|
midPod := v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "test-midpod",
|
||||||
|
Namespace: "ns1",
|
||||||
|
UID: types.UID("tp-mid"),
|
||||||
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
Priority: &midPriority,
|
||||||
|
},
|
||||||
|
Status: v1.PodStatus{
|
||||||
|
NominatedNodeName: "node1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
highPod := v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "test-highpod",
|
||||||
|
Namespace: "ns1",
|
||||||
|
UID: types.UID("tp-high"),
|
||||||
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
Priority: &highPriority,
|
||||||
|
},
|
||||||
|
Status: v1.PodStatus{
|
||||||
|
NominatedNodeName: "node1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
q.unschedulableQ.addOrUpdate(&highPod)
|
||||||
|
q.unschedulableQ.addOrUpdate(&midPod)
|
||||||
|
|
||||||
|
// Update pod condition to highPod.
|
||||||
|
podutil.UpdatePodCondition(&highPod.Status, &v1.PodCondition{
|
||||||
|
Type: v1.PodScheduled,
|
||||||
|
Status: v1.ConditionFalse,
|
||||||
|
Reason: v1.PodReasonUnschedulable,
|
||||||
|
Message: "fake scheduling failure",
|
||||||
|
LastProbeTime: metav1.Time{Time: time.Now().Add(-1 * unschedulableQTimeInterval)},
|
||||||
|
})
|
||||||
|
|
||||||
|
// Update pod condition to midPod.
|
||||||
|
podutil.UpdatePodCondition(&midPod.Status, &v1.PodCondition{
|
||||||
|
Type: v1.PodScheduled,
|
||||||
|
Status: v1.ConditionFalse,
|
||||||
|
Reason: v1.PodReasonUnschedulable,
|
||||||
|
Message: "fake scheduling failure",
|
||||||
|
LastProbeTime: metav1.Time{Time: time.Now().Add(-1 * unschedulableQTimeInterval)},
|
||||||
|
})
|
||||||
|
|
||||||
|
if p, err := q.Pop(); err != nil || p != &highPod {
|
||||||
|
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
|
||||||
|
}
|
||||||
|
if p, err := q.Pop(); err != nil || p != &midPod {
|
||||||
|
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue