From 5e4ccede4c9ad5e97f119113c883916f29412a68 Mon Sep 17 00:00:00 2001 From: Gregory Haynes Date: Tue, 24 Jul 2018 20:46:40 +0000 Subject: [PATCH] Reschedule with backoff With the alpha scheduling queue we move pods from unschedulable to active on certain events without a backoff. As a result we can cause starvation issues if high priority pods are in the unschedulable queue. Implement a backoff mechanism for pods being moved to active. Closes #56721 --- pkg/scheduler/core/extender_test.go | 2 +- pkg/scheduler/core/generic_scheduler_test.go | 10 +- pkg/scheduler/factory/factory.go | 2 +- pkg/scheduler/internal/queue/BUILD | 2 + .../internal/queue/scheduling_queue.go | 187 ++++++++++++++++-- .../internal/queue/scheduling_queue_test.go | 85 ++++++-- pkg/scheduler/util/BUILD | 2 + pkg/scheduler/util/backoff_utils.go | 29 ++- pkg/scheduler/util/backoff_utils_test.go | 30 ++- pkg/scheduler/util/clock.go | 34 ++++ 10 files changed, 340 insertions(+), 43 deletions(-) create mode 100644 pkg/scheduler/util/clock.go diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index f1f17beb43..fa3a95cc9f 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -507,7 +507,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { for _, name := range test.nodes { cache.AddNode(createNode(name)) } - queue := internalqueue.NewSchedulingQueue() + queue := internalqueue.NewSchedulingQueue(nil) scheduler := NewGenericScheduler( cache, nil, diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index 2b66879692..4a35013aff 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -472,7 +472,7 @@ func TestGenericScheduler(t *testing.T) { scheduler := NewGenericScheduler( cache, nil, - internalqueue.NewSchedulingQueue(), + internalqueue.NewSchedulingQueue(nil), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, @@ -509,7 +509,7 @@ func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Nod s := NewGenericScheduler( cache, nil, - internalqueue.NewSchedulingQueue(), + internalqueue.NewSchedulingQueue(nil), predicates, algorithm.EmptyPredicateMetadataProducer, prioritizers, @@ -1436,7 +1436,7 @@ func TestPreempt(t *testing.T) { scheduler := NewGenericScheduler( cache, nil, - internalqueue.NewSchedulingQueue(), + internalqueue.NewSchedulingQueue(nil), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, @@ -1564,7 +1564,7 @@ func TestCacheInvalidationRace(t *testing.T) { scheduler := NewGenericScheduler( mockCache, eCache, - internalqueue.NewSchedulingQueue(), + internalqueue.NewSchedulingQueue(nil), ps, algorithm.EmptyPredicateMetadataProducer, prioritizers, @@ -1648,7 +1648,7 @@ func TestCacheInvalidationRace2(t *testing.T) { scheduler := NewGenericScheduler( cache, eCache, - internalqueue.NewSchedulingQueue(), + internalqueue.NewSchedulingQueue(nil), ps, algorithm.EmptyPredicateMetadataProducer, prioritizers, diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 188a0069b6..9a41cbd805 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -283,7 +283,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator { c := &configFactory{ client: args.Client, podLister: schedulerCache, - 
podQueue: internalqueue.NewSchedulingQueue(), + podQueue: internalqueue.NewSchedulingQueue(stopEverything), nodeLister: args.NodeInformer.Lister(), pVLister: args.PvInformer.Lister(), pVCLister: args.PvcInformer.Lister(), diff --git a/pkg/scheduler/internal/queue/BUILD b/pkg/scheduler/internal/queue/BUILD index 37a8d1f525..ea7b66fe3f 100644 --- a/pkg/scheduler/internal/queue/BUILD +++ b/pkg/scheduler/internal/queue/BUILD @@ -12,6 +12,8 @@ go_library( "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 0b4728851b..22242a9ce3 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -30,11 +30,14 @@ import ( "fmt" "reflect" "sync" + "time" "k8s.io/klog" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ktypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" @@ -72,9 +75,9 @@ type SchedulingQueue interface { // NewSchedulingQueue initializes a new scheduling queue. If pod priority is // enabled a priority queue is returned. If it is disabled, a FIFO is returned. -func NewSchedulingQueue() SchedulingQueue { +func NewSchedulingQueue(stop <-chan struct{}) SchedulingQueue { if util.PodPriorityEnabled() { - return NewPriorityQueue() + return NewPriorityQueue(stop) } return NewFIFO() } @@ -178,12 +181,20 @@ func NominatedNodeName(pod *v1.Pod) string { // pods that are already tried and are determined to be unschedulable. The latter // is called unschedulableQ. type PriorityQueue struct { + stop <-chan struct{} + clock util.Clock + // podBackoff tracks backoff for pods attempting to be rescheduled + podBackoff *util.PodBackoff + lock sync.RWMutex cond sync.Cond // activeQ is heap structure that scheduler actively looks at to find pods to // schedule. Head of heap is the highest priority pod. activeQ *util.Heap + // podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff + // are popped from this heap before the scheduler looks at activeQ + podBackoffQ *util.Heap // unschedulableQ holds pods that have been tried and determined unschedulable. unschedulableQ *UnschedulablePodsMap // nominatedPods is a map keyed by a node name and the value is a list of @@ -227,16 +238,33 @@ func activeQComp(pod1, pod2 interface{}) bool { } // NewPriorityQueue creates a PriorityQueue object. -func NewPriorityQueue() *PriorityQueue { +func NewPriorityQueue(stop <-chan struct{}) *PriorityQueue { + return NewPriorityQueueWithClock(stop, util.RealClock{}) +} + +// NewPriorityQueueWithClock creates a PriorityQueue which uses the passed clock for time. 
+func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *PriorityQueue {
 	pq := &PriorityQueue{
+		clock:          clock,
+		stop:           stop,
+		podBackoff:     util.CreatePodBackoffWithClock(1*time.Second, 10*time.Second, clock),
 		activeQ:        util.NewHeap(cache.MetaNamespaceKeyFunc, activeQComp),
 		unschedulableQ: newUnschedulablePodsMap(),
 		nominatedPods:  map[string][]*v1.Pod{},
 	}
 	pq.cond.L = &pq.lock
+	pq.podBackoffQ = util.NewHeap(cache.MetaNamespaceKeyFunc, pq.podsCompareBackoffCompleted)
+
+	pq.run()
+
 	return pq
 }
 
+// run starts the goroutine to pump from podBackoffQ to activeQ
+func (p *PriorityQueue) run() {
+	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
+}
+
 // addNominatedPodIfNeeded adds a pod to nominatedPods if it has a NominatedNodeName and it does not
 // already exist in the map. Adding an existing pod is not going to update the pod.
 func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
@@ -278,7 +306,7 @@ func (p *PriorityQueue) updateNominatedPod(oldPod, newPod *v1.Pod) {
 }
 
 // Add adds a pod to the active queue. It should be called only when a new pod
-// is added so there is no chance the pod is already in either queue.
+// is added so there is no chance the pod is already in the active/unschedulable/backoff queues.
 func (p *PriorityQueue) Add(pod *v1.Pod) error {
 	p.lock.Lock()
 	defer p.lock.Unlock()
@@ -291,6 +319,10 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
 		p.deleteNominatedPodIfExists(pod)
 		p.unschedulableQ.delete(pod)
 	}
+	// Delete pod from backoffQ if it is backing off
+	if err = p.podBackoffQ.Delete(pod); err == nil {
+		klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name)
+	}
 	p.addNominatedPodIfNeeded(pod)
 	p.cond.Broadcast()
 }
@@ -308,6 +340,9 @@ func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
 	if _, exists, _ := p.activeQ.Get(pod); exists {
 		return nil
 	}
+	if _, exists, _ := p.podBackoffQ.Get(pod); exists {
+		return nil
+	}
 	err := p.activeQ.Add(pod)
 	if err != nil {
 		klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
@@ -323,6 +358,40 @@ func isPodUnschedulable(pod *v1.Pod) bool {
 	return cond != nil && cond.Status == v1.ConditionFalse && cond.Reason == v1.PodReasonUnschedulable
 }
 
+// nsNameForPod returns a NamespacedName for a pod
+func nsNameForPod(pod *v1.Pod) ktypes.NamespacedName {
+	return ktypes.NamespacedName{
+		Namespace: pod.Namespace,
+		Name:      pod.Name,
+	}
+}
+
+// clearPodBackoff clears all backoff state for a pod (resets expiry)
+func (p *PriorityQueue) clearPodBackoff(pod *v1.Pod) {
+	p.podBackoff.ClearPodBackoff(nsNameForPod(pod))
+}
+
+// isPodBackingOff returns whether a pod is currently undergoing backoff in the podBackoff structure
+func (p *PriorityQueue) isPodBackingOff(pod *v1.Pod) bool {
+	boTime, exists := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
+	if !exists {
+		return false
+	}
+	return boTime.After(p.clock.Now())
+}
+
+// backoffPod checks if pod is currently undergoing backoff. If it is not, it updates the backoff
+// timeout; otherwise it does nothing.
+func (p *PriorityQueue) backoffPod(pod *v1.Pod) {
+	p.podBackoff.Gc()
+
+	podID := nsNameForPod(pod)
+	boTime, found := p.podBackoff.GetBackoffTime(podID)
+	if !found || boTime.Before(p.clock.Now()) {
+		p.podBackoff.BackoffPod(podID)
+	}
+}
+
 // AddUnschedulableIfNotPresent does nothing if the pod is present in either
 // queue. Otherwise it adds the pod to the unschedulable queue if
 // p.receivedMoveRequest is false, and to the activeQ if p.receivedMoveRequest is true.
@@ -335,11 +404,27 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
 	if _, exists, _ := p.activeQ.Get(pod); exists {
 		return fmt.Errorf("pod is already present in the activeQ")
 	}
+	if _, exists, _ := p.podBackoffQ.Get(pod); exists {
+		return fmt.Errorf("pod is already present in the backoffQ")
+	}
 	if !p.receivedMoveRequest && isPodUnschedulable(pod) {
+		p.backoffPod(pod)
 		p.unschedulableQ.addOrUpdate(pod)
 		p.addNominatedPodIfNeeded(pod)
 		return nil
 	}
+
+	// If a move request has been received and the pod is subject to backoff, move it to the backoffQ.
+	if p.isPodBackingOff(pod) && isPodUnschedulable(pod) {
+		err := p.podBackoffQ.Add(pod)
+		if err != nil {
+			klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
+		} else {
+			p.addNominatedPodIfNeeded(pod)
+		}
+		return err
+	}
+
 	err := p.activeQ.Add(pod)
 	if err == nil {
 		p.addNominatedPodIfNeeded(pod)
@@ -348,6 +433,39 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
 	return err
 }
 
+// flushBackoffQCompleted moves all pods from backoffQ which have completed backoff into activeQ
+func (p *PriorityQueue) flushBackoffQCompleted() {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	for {
+		rawPod := p.podBackoffQ.Peek()
+		if rawPod == nil {
+			return
+		}
+		pod := rawPod.(*v1.Pod)
+		boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
+		if !found {
+			klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod))
+			p.podBackoffQ.Pop()
+			p.activeQ.Add(pod)
+			defer p.cond.Broadcast()
+			continue
+		}
+
+		if boTime.After(p.clock.Now()) {
+			return
+		}
+		_, err := p.podBackoffQ.Pop()
+		if err != nil {
+			klog.Errorf("Unable to pop pod %v from backoffQ despite backoff completion.", nsNameForPod(pod))
+			return
+		}
+		p.activeQ.Add(pod)
+		defer p.cond.Broadcast()
+	}
+}
+
 // Pop removes the head of the active queue and returns it. It blocks if the
 // activeQ is empty and waits until a new item is added to the queue. It also
 // clears receivedMoveRequest to mark the beginning of a new scheduling cycle.
@@ -391,16 +509,33 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool {
 func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	// If the pod is already in the active queue, just update it there.
-	if _, exists, _ := p.activeQ.Get(newPod); exists {
-		p.updateNominatedPod(oldPod, newPod)
-		err := p.activeQ.Update(newPod)
-		return err
+
+	if oldPod != nil {
+		// If the pod is already in the active queue, just update it there.
+		if _, exists, _ := p.activeQ.Get(oldPod); exists {
+			p.updateNominatedPod(oldPod, newPod)
+			err := p.activeQ.Update(newPod)
+			return err
+		}
+
+		// If the pod is in the backoff queue, update it there.
+		if _, exists, _ := p.podBackoffQ.Get(oldPod); exists {
+			p.updateNominatedPod(oldPod, newPod)
+			p.podBackoffQ.Delete(newPod)
+			err := p.activeQ.Add(newPod)
+			if err == nil {
+				p.cond.Broadcast()
+			}
+			return err
+		}
 	}
+
 	// If the pod is in the unschedulable queue, updating it may make it schedulable.
 	if usPod := p.unschedulableQ.get(newPod); usPod != nil {
 		p.updateNominatedPod(oldPod, newPod)
 		if isPodUpdated(oldPod, newPod) {
+			// If the pod is updated, reset its backoff
+			p.clearPodBackoff(newPod)
 			p.unschedulableQ.delete(usPod)
 			err := p.activeQ.Add(newPod)
 			if err == nil {
@@ -408,6 +543,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
 			}
 			return err
 		}
+		// Pod is already in the unschedulable queue and hasn't been updated; no need to back it off again
 		p.unschedulableQ.addOrUpdate(newPod)
 		return nil
 	}
@@ -428,6 +564,8 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
 	p.deleteNominatedPodIfExists(pod)
 	err := p.activeQ.Delete(pod)
 	if err != nil { // The item was probably not found in the activeQ.
+		p.clearPodBackoff(pod)
+		p.podBackoffQ.Delete(pod)
 		p.unschedulableQ.delete(pod)
 	}
 	return nil
@@ -453,16 +591,18 @@ func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
 // function adds all pods and then signals the condition variable to ensure that
 // if Pop() is waiting for an item, it receives it after all the pods are in the
 // queue and the head is the highest priority pod.
-// TODO(bsalamat): We should add a back-off mechanism here so that a high priority
-// pod which is unschedulable does not go to the head of the queue frequently. For
-// example in a cluster where a lot of pods being deleted, such a high priority
-// pod can deprive other pods from getting scheduled.
 func (p *PriorityQueue) MoveAllToActiveQueue() {
 	p.lock.Lock()
 	defer p.lock.Unlock()
 	for _, pod := range p.unschedulableQ.pods {
-		if err := p.activeQ.Add(pod); err != nil {
-			klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
+		if p.isPodBackingOff(pod) {
+			if err := p.podBackoffQ.Add(pod); err != nil {
+				klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
+			}
+		} else {
+			if err := p.activeQ.Add(pod); err != nil {
+				klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
+			}
 		}
 	}
 	p.unschedulableQ.clear()
@@ -473,11 +613,16 @@
 // NOTE: this function assumes lock has been acquired in caller
 func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
 	for _, pod := range pods {
-		if err := p.activeQ.Add(pod); err == nil {
-			p.unschedulableQ.delete(pod)
+		if p.isPodBackingOff(pod) {
+			if err := p.podBackoffQ.Add(pod); err != nil {
+				klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
+			}
 		} else {
-			klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
+			if err := p.activeQ.Add(pod); err != nil {
+				klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
+			}
 		}
+		p.unschedulableQ.delete(pod)
 	}
 	p.receivedMoveRequest = true
 	p.cond.Broadcast()
@@ -550,6 +695,12 @@ func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
 	p.lock.Unlock()
 }
 
+func (p *PriorityQueue) podsCompareBackoffCompleted(p1, p2 interface{}) bool {
+	bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(p1.(*v1.Pod)))
+	bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(p2.(*v1.Pod)))
+	return bo1.Before(bo2)
+}
+
 // UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
 // is used to implement unschedulableQ.
type UnschedulablePodsMap struct { diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index e7566e3299..cc0a057fdf 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -95,7 +95,7 @@ var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1. } func TestPriorityQueue_Add(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Add(&medPriorityPod) q.Add(&unschedulablePod) q.Add(&highPriorityPod) @@ -120,7 +120,7 @@ func TestPriorityQueue_Add(t *testing.T) { } func TestPriorityQueue_AddIfNotPresent(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.unschedulableQ.addOrUpdate(&highPriNominatedPod) q.AddIfNotPresent(&highPriNominatedPod) // Must not add anything. q.AddIfNotPresent(&medPriorityPod) @@ -146,7 +146,7 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) { } func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Add(&highPriNominatedPod) q.AddUnschedulableIfNotPresent(&highPriNominatedPod) // Must not add anything. q.AddUnschedulableIfNotPresent(&medPriorityPod) // This should go to activeQ. @@ -172,7 +172,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { } func TestPriorityQueue_Pop(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) wg := sync.WaitGroup{} wg.Add(1) go func() { @@ -189,7 +189,7 @@ func TestPriorityQueue_Pop(t *testing.T) { } func TestPriorityQueue_Update(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Update(nil, &highPriorityPod) if _, exists, _ := q.activeQ.Get(&highPriorityPod); !exists { t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name) @@ -225,7 +225,7 @@ func TestPriorityQueue_Update(t *testing.T) { } func TestPriorityQueue_Delete(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Update(&highPriorityPod, &highPriNominatedPod) q.Add(&unschedulablePod) q.Delete(&highPriNominatedPod) @@ -245,7 +245,7 @@ func TestPriorityQueue_Delete(t *testing.T) { } func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Add(&medPriorityPod) q.unschedulableQ.addOrUpdate(&unschedulablePod) q.unschedulableQ.addOrUpdate(&highPriorityPod) @@ -291,7 +291,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { Spec: v1.PodSpec{NodeName: "machine1"}, } - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Add(&medPriorityPod) // Add a couple of pods to the unschedulableQ. q.unschedulableQ.addOrUpdate(&unschedulablePod) @@ -312,7 +312,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { } func TestPriorityQueue_WaitingPodsForNode(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Add(&medPriorityPod) q.Add(&unschedulablePod) q.Add(&highPriorityPod) @@ -491,7 +491,7 @@ func TestSchedulingQueue_Close(t *testing.T) { }, { name: "PriorityQueue close", - q: NewPriorityQueue(), + q: NewPriorityQueue(nil), expectedErr: fmt.Errorf(queueClosed), }, } @@ -520,7 +520,7 @@ func TestSchedulingQueue_Close(t *testing.T) { // ensures that an unschedulable pod does not block head of the queue when there // are frequent events that move pods to the active queue. func TestRecentlyTriedPodsGoBack(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) // Add a few pods to priority queue. 
 	for i := 0; i < 5; i++ {
 		p := v1.Pod{
@@ -567,3 +567,66 @@
 		}
 	}
 }
+
+// TestHighPriorityBackoff tests that a high priority pod does not block
+// other pods if it is unschedulable.
+func TestHighPriorityBackoff(t *testing.T) {
+	q := NewPriorityQueue(nil)
+
+	midPod := v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-midpod",
+			Namespace: "ns1",
+			UID:       types.UID("tp-mid"),
+		},
+		Spec: v1.PodSpec{
+			Priority: &midPriority,
+		},
+		Status: v1.PodStatus{
+			NominatedNodeName: "node1",
+		},
+	}
+	highPod := v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-highpod",
+			Namespace: "ns1",
+			UID:       types.UID("tp-high"),
+		},
+		Spec: v1.PodSpec{
+			Priority: &highPriority,
+		},
+		Status: v1.PodStatus{
+			NominatedNodeName: "node1",
+		},
+	}
+	q.Add(&midPod)
+	q.Add(&highPod)
+	// Simulate a pod being popped by the scheduler, determined unschedulable, and
+	// then moved back to the active queue.
+	p, err := q.Pop()
+	if err != nil {
+		t.Errorf("Error while popping the head of the queue: %v", err)
+	}
+	if p != &highPod {
+		t.Errorf("Expected to get high priority pod, got: %v", p)
+	}
+	// Update pod condition to unschedulable.
+	podutil.UpdatePodCondition(&p.Status, &v1.PodCondition{
+		Type:    v1.PodScheduled,
+		Status:  v1.ConditionFalse,
+		Reason:  v1.PodReasonUnschedulable,
+		Message: "fake scheduling failure",
+	})
+	// Put in the unschedulable queue.
+	q.AddUnschedulableIfNotPresent(p)
+	// Move all unschedulable pods to the active queue.
+	q.MoveAllToActiveQueue()
+
+	p, err = q.Pop()
+	if err != nil {
+		t.Errorf("Error while popping the head of the queue: %v", err)
+	}
+	if p != &midPod {
+		t.Errorf("Expected to get mid priority pod, got: %v", p)
+	}
+}
diff --git a/pkg/scheduler/util/BUILD b/pkg/scheduler/util/BUILD
index 0f8c07e080..53649692f2 100644
--- a/pkg/scheduler/util/BUILD
+++ b/pkg/scheduler/util/BUILD
@@ -26,6 +26,7 @@ go_library(
     name = "go_default_library",
     srcs = [
         "backoff_utils.go",
+        "clock.go",
         "heap.go",
         "utils.go",
     ],
@@ -36,6 +37,7 @@ go_library(
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
+        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
     ],
 )
diff --git a/pkg/scheduler/util/backoff_utils.go b/pkg/scheduler/util/backoff_utils.go
index e77e808658..f800d9d5af 100644
--- a/pkg/scheduler/util/backoff_utils.go
+++ b/pkg/scheduler/util/backoff_utils.go
@@ -80,11 +80,6 @@ func (b *backoffEntry) getBackoff(maxDuration time.Duration) time.Duration {
 	return newDuration
 }
 
-// backoffAndWait Blocks until this entry has completed backoff
-func (b *backoffEntry) backoffAndWait(maxDuration time.Duration) {
-	time.Sleep(b.getBackoff(maxDuration))
-}
-
 // PodBackoff is used to restart a pod with back-off delay.
 type PodBackoff struct {
 	// expiryQ stores backoffEntry ordered by lastUpdate until they reach maxDuration and are GC'd
@@ -183,6 +178,30 @@ func (p *PodBackoff) Gc() {
 	}
 }
 
+// GetBackoffTime returns the time at which podID completes its backoff
+func (p *PodBackoff) GetBackoffTime(podID ktypes.NamespacedName) (time.Time, bool) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	rawBe, exists, _ := p.expiryQ.GetByKey(podID.String())
+	if !exists {
+		return time.Time{}, false
+	}
+	be := rawBe.(*backoffEntry)
+	return be.lastUpdate.Add(be.backoff), true
+}
+
+// ClearPodBackoff removes all tracking information for podID (clears expiry)
+func (p *PodBackoff) ClearPodBackoff(podID ktypes.NamespacedName) bool {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	entry, exists, _ := p.expiryQ.GetByKey(podID.String())
+	if exists {
+		err := p.expiryQ.Delete(entry)
+		return err == nil
+	}
+	return false
+}
+
 // backoffEntryKeyFunc is the keying function used for mapping a backoffEntry to string for heap
 func backoffEntryKeyFunc(b interface{}) (string, error) {
 	be := b.(*backoffEntry)
diff --git a/pkg/scheduler/util/backoff_utils_test.go b/pkg/scheduler/util/backoff_utils_test.go
index 8a618ebb32..7aa045da3b 100644
--- a/pkg/scheduler/util/backoff_utils_test.go
+++ b/pkg/scheduler/util/backoff_utils_test.go
@@ -31,7 +31,7 @@ func (f *fakeClock) Now() time.Time {
 	return f.t
 }
 
-func TestBackoff(t *testing.T) {
+func TestBackoffPod(t *testing.T) {
 	clock := fakeClock{}
 	backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock)
 	tests := []struct {
@@ -66,7 +66,10 @@ func TestBackoff(t *testing.T) {
 	for _, test := range tests {
 		duration := backoff.BackoffPod(test.podID)
 		if duration != test.expectedDuration {
-			t.Errorf("expected: %s, got %s for %s", test.expectedDuration.String(), duration.String(), test.podID)
+			t.Errorf("expected: %s, got %s for pod %s", test.expectedDuration.String(), duration.String(), test.podID)
+		}
+		if boTime, _ := backoff.GetBackoffTime(test.podID); boTime != clock.Now().Add(test.expectedDuration) {
+			t.Errorf("expected GetBackoffTime %s, got %s for pod %s", test.expectedDuration.String(), boTime.String(), test.podID)
 		}
 		clock.t = clock.t.Add(test.advanceClock)
 		backoff.Gc()
@@ -85,3 +88,26 @@ func TestBackoff(t *testing.T) {
 		t.Errorf("expected: 1, got %s", duration.String())
 	}
 }
+
+func TestClearPodBackoff(t *testing.T) {
+	clock := fakeClock{}
+	backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock)
+
+	if backoff.ClearPodBackoff(ktypes.NamespacedName{Namespace: "ns", Name: "nonexist"}) {
+		t.Error("Expected ClearPodBackoff failure for unknown pod, got success.")
+	}
+
+	podID := ktypes.NamespacedName{Namespace: "ns", Name: "foo"}
+	if dur := backoff.BackoffPod(podID); dur != 1*time.Second {
+		t.Errorf("Expected backoff of 1s for pod %s, got %s", podID, dur.String())
+	}
+
+	if !backoff.ClearPodBackoff(podID) {
+		t.Errorf("Failed to clear backoff for pod %v", podID)
+	}
+
+	expectBoTime := clock.Now()
+	if boTime, _ := backoff.GetBackoffTime(podID); boTime != expectBoTime {
+		t.Errorf("Expected backoff time for pod %s of %s, got %s", podID, expectBoTime, boTime)
	}
+}
diff --git a/pkg/scheduler/util/clock.go b/pkg/scheduler/util/clock.go
new file mode 100644
index 0000000000..e17c759dba
--- /dev/null
+++ b/pkg/scheduler/util/clock.go
@@ -0,0 +1,34 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "time" +) + +// Clock provides an interface for getting the current time +type Clock interface { + Now() time.Time +} + +// RealClock implements a clock using time +type RealClock struct{} + +// Now returns the current time with time.Now +func (RealClock) Now() time.Time { + return time.Now() +}
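
A rough, standalone illustration (not part of the commit) of the delay schedule the new queue imposes: NewPriorityQueueWithClock wires in util.CreatePodBackoffWithClock(1*time.Second, 10*time.Second, clock), and each further backoff period is assumed to roughly double a pod's delay up to that cap (the growth implemented by backoffEntry.getBackoff in backoff_utils.go). Under that assumption, a pod that keeps failing to schedule waits approximately the delays printed by this sketch before flushBackoffQCompleted returns it to activeQ:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors the parameters passed to util.CreatePodBackoffWithClock in
	// NewPriorityQueueWithClock: initial 1s backoff, capped at 10s.
	initial := 1 * time.Second
	maxDelay := 10 * time.Second

	delay := initial
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Printf("unschedulable attempt %d: wait %v before re-entering activeQ\n", attempt, delay)
		// Assumed growth: each additional backoff doubles the delay, capped at the maximum.
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
}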