From ea9e1a4118141eb71b7c1484adde7b1fddcfd751 Mon Sep 17 00:00:00 2001
From: wangqingcan
Date: Fri, 22 Feb 2019 09:46:18 +0800
Subject: [PATCH] do not update timestamp for each scheduling attempt

---
 .../internal/queue/scheduling_queue.go      | 214 ++++++++-----
 .../internal/queue/scheduling_queue_test.go | 295 +++++++++++++-----
 pkg/scheduler/scheduler.go                  |   9 +-
 3 files changed, 355 insertions(+), 163 deletions(-)

diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go
index bc5130d481..97e98526e4 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue.go
@@ -260,15 +260,29 @@ func podTimestamp(pod *v1.Pod) *metav1.Time {
 	return &condition.LastProbeTime
 }
 
+// podInfo is the minimum unit stored in the scheduling queue.
+type podInfo struct {
+	pod *v1.Pod
+	// The time the pod was added to the scheduling queue.
+	timestamp time.Time
+}
+
+// newPodInfoNoTimestamp builds a podInfo object without a timestamp.
+func newPodInfoNoTimestamp(pod *v1.Pod) *podInfo {
+	return &podInfo{
+		pod: pod,
+	}
+}
+
 // activeQComp is the function used by the activeQ heap algorithm to sort pods.
 // It sorts pods based on their priority. When priorities are equal, it uses
-// podTimestamp.
-func activeQComp(pod1, pod2 interface{}) bool {
-	p1 := pod1.(*v1.Pod)
-	p2 := pod2.(*v1.Pod)
-	prio1 := util.GetPodPriority(p1)
-	prio2 := util.GetPodPriority(p2)
-	return (prio1 > prio2) || (prio1 == prio2 && podTimestamp(p1).Before(podTimestamp(p2)))
+// podInfo.timestamp.
+func activeQComp(podInfo1, podInfo2 interface{}) bool {
+	pInfo1 := podInfo1.(*podInfo)
+	pInfo2 := podInfo2.(*podInfo)
+	prio1 := util.GetPodPriority(pInfo1.pod)
+	prio2 := util.GetPodPriority(pInfo2.pod)
+	return (prio1 > prio2) || (prio1 == prio2 && pInfo1.timestamp.Before(pInfo2.timestamp))
 }
 
 // NewPriorityQueue creates a PriorityQueue object.
@@ -282,13 +296,13 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *Priority
 		clock:            clock,
 		stop:             stop,
 		podBackoff:       util.CreatePodBackoffWithClock(1*time.Second, 10*time.Second, clock),
-		activeQ:          util.NewHeap(cache.MetaNamespaceKeyFunc, activeQComp),
-		unschedulableQ:   newUnschedulablePodsMap(),
+		activeQ:          util.NewHeap(podInfoKeyFunc, activeQComp),
+		unschedulableQ:   newUnschedulablePodsMap(clock),
 		nominatedPods:    newNominatedPodMap(),
 		moveRequestCycle: -1,
 	}
 	pq.cond.L = &pq.lock
-	pq.podBackoffQ = util.NewHeap(cache.MetaNamespaceKeyFunc, pq.podsCompareBackoffCompleted)
+	pq.podBackoffQ = util.NewHeap(podInfoKeyFunc, pq.podsCompareBackoffCompleted)
 
 	pq.run()
 
@@ -306,7 +320,8 @@ func (p *PriorityQueue) run() {
 func (p *PriorityQueue) Add(pod *v1.Pod) error {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	if err := p.activeQ.Add(pod); err != nil {
+	pInfo := p.newPodInfo(pod)
+	if err := p.activeQ.Add(pInfo); err != nil {
 		klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
 		return err
 	}
@@ -315,7 +330,7 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
 		p.unschedulableQ.delete(pod)
 	}
 	// Delete pod from backoffQ if it is backing off
-	if err := p.podBackoffQ.Delete(pod); err == nil {
+	if err := p.podBackoffQ.Delete(pInfo); err == nil {
 		klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name)
 	}
 	p.nominatedPods.add(pod, "")
@@ -332,13 +347,15 @@ func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
 	if p.unschedulableQ.get(pod) != nil {
 		return nil
 	}
-	if _, exists, _ := p.activeQ.Get(pod); exists {
+
+	pInfo := p.newPodInfo(pod)
+	if _, exists, _ := p.activeQ.Get(pInfo); exists {
 		return nil
 	}
-	if _, exists, _ := p.podBackoffQ.Get(pod); exists {
+	if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
 		return nil
 	}
-	err := p.activeQ.Add(pod)
+	err := p.activeQ.Add(pInfo)
 	if err != nil {
 		klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
 	} else {
@@ -405,22 +422,24 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingC
 	if p.unschedulableQ.get(pod) != nil {
 		return fmt.Errorf("pod is already present in unschedulableQ")
 	}
-	if _, exists, _ := p.activeQ.Get(pod); exists {
+
+	pInfo := p.newPodInfo(pod)
+	if _, exists, _ := p.activeQ.Get(pInfo); exists {
 		return fmt.Errorf("pod is already present in the activeQ")
 	}
-	if _, exists, _ := p.podBackoffQ.Get(pod); exists {
+	if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
 		return fmt.Errorf("pod is already present in the backoffQ")
 	}
 	if podSchedulingCycle > p.moveRequestCycle && isPodUnschedulable(pod) {
 		p.backoffPod(pod)
-		p.unschedulableQ.addOrUpdate(pod)
+		p.unschedulableQ.addOrUpdate(pInfo)
 		p.nominatedPods.add(pod, "")
 		return nil
 	}
 	// If a move request has been received and the pod is subject to backoff, move it to the BackoffQ.
 	if p.isPodBackingOff(pod) && isPodUnschedulable(pod) {
-		err := p.podBackoffQ.Add(pod)
+		err := p.podBackoffQ.Add(pInfo)
 		if err != nil {
 			klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
 		} else {
@@ -429,7 +448,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingC
 		return err
 	}
 
-	err := p.activeQ.Add(pod)
+	err := p.activeQ.Add(pInfo)
 	if err == nil {
 		p.nominatedPods.add(pod, "")
 		p.cond.Broadcast()
@@ -443,16 +462,16 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
 	defer p.lock.Unlock()
 
 	for {
-		rawPod := p.podBackoffQ.Peek()
-		if rawPod == nil {
+		rawPodInfo := p.podBackoffQ.Peek()
+		if rawPodInfo == nil {
 			return
 		}
-		pod := rawPod.(*v1.Pod)
+		pod := rawPodInfo.(*podInfo).pod
 		boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
 		if !found {
 			klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod))
 			p.podBackoffQ.Pop()
-			p.activeQ.Add(pod)
+			p.activeQ.Add(rawPodInfo)
 			defer p.cond.Broadcast()
 			continue
 		}
@@ -465,7 +484,7 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
 			klog.Errorf("Unable to pop pod %v from backoffQ despite backoff completion.", nsNameForPod(pod))
 			return
 		}
-		p.activeQ.Add(pod)
+		p.activeQ.Add(rawPodInfo)
 		defer p.cond.Broadcast()
 	}
 }
@@ -476,12 +495,12 @@ func (p *PriorityQueue) flushUnschedulableQLeftover() {
 	p.lock.Lock()
 	defer p.lock.Unlock()
 
-	var podsToMove []*v1.Pod
+	var podsToMove []*podInfo
 	currentTime := p.clock.Now()
-	for _, pod := range p.unschedulableQ.pods {
-		lastScheduleTime := podTimestamp(pod)
-		if !lastScheduleTime.IsZero() && currentTime.Sub(lastScheduleTime.Time) > unschedulableQTimeInterval {
-			podsToMove = append(podsToMove, pod)
+	for _, pInfo := range p.unschedulableQ.podInfoMap {
+		lastScheduleTime := pInfo.timestamp
+		if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
+			podsToMove = append(podsToMove, pInfo)
 		}
 	}
 
@@ -509,9 +528,9 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) {
 	if err != nil {
 		return nil, err
 	}
-	pod := obj.(*v1.Pod)
+	pInfo := obj.(*podInfo)
 	p.schedulingCycle++
-	return pod, err
+	return pInfo.pod, err
 }
 
 // isPodUpdated checks if the pod is updated in a way that it may have become
@@ -536,18 +555,23 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
 	defer p.lock.Unlock()
 
 	if oldPod != nil {
+		oldPodInfo := newPodInfoNoTimestamp(oldPod)
 		// If the pod is already in the active queue, just update it there.
-		if _, exists, _ := p.activeQ.Get(oldPod); exists {
+		if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
 			p.nominatedPods.update(oldPod, newPod)
-			err := p.activeQ.Update(newPod)
+			newPodInfo := newPodInfoNoTimestamp(newPod)
+			newPodInfo.timestamp = oldPodInfo.(*podInfo).timestamp
+			err := p.activeQ.Update(newPodInfo)
 			return err
 		}
 		// If the pod is in the backoff queue, update it there.
-		if _, exists, _ := p.podBackoffQ.Get(oldPod); exists {
+		if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
 			p.nominatedPods.update(oldPod, newPod)
-			p.podBackoffQ.Delete(newPod)
-			err := p.activeQ.Add(newPod)
+			p.podBackoffQ.Delete(newPodInfoNoTimestamp(oldPod))
+			newPodInfo := newPodInfoNoTimestamp(newPod)
+			newPodInfo.timestamp = oldPodInfo.(*podInfo).timestamp
+			err := p.activeQ.Add(newPodInfo)
 			if err == nil {
 				p.cond.Broadcast()
 			}
@@ -556,24 +580,26 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
 	}
 
 	// If the pod is in the unschedulable queue, updating it may make it schedulable.
-	if usPod := p.unschedulableQ.get(newPod); usPod != nil {
+	if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil {
 		p.nominatedPods.update(oldPod, newPod)
+		newPodInfo := newPodInfoNoTimestamp(newPod)
+		newPodInfo.timestamp = usPodInfo.timestamp
 		if isPodUpdated(oldPod, newPod) {
 			// If the pod is updated, reset its backoff.
 			p.clearPodBackoff(newPod)
-			p.unschedulableQ.delete(usPod)
-			err := p.activeQ.Add(newPod)
+			p.unschedulableQ.delete(usPodInfo.pod)
+			err := p.activeQ.Add(newPodInfo)
 			if err == nil {
 				p.cond.Broadcast()
 			}
 			return err
 		}
 		// Pod is already in the unschedulable queue and hasn't been updated; no need to back it off again.
-		p.unschedulableQ.addOrUpdate(newPod)
+		p.unschedulableQ.addOrUpdate(newPodInfo)
 		return nil
 	}
 	// If pod is not in any of the queues, we put it in the active queue.
-	err := p.activeQ.Add(newPod)
+	err := p.activeQ.Add(p.newPodInfo(newPod))
 	if err == nil {
 		p.nominatedPods.add(newPod, "")
 		p.cond.Broadcast()
@@ -587,10 +613,10 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
 	p.lock.Lock()
 	defer p.lock.Unlock()
 	p.nominatedPods.delete(pod)
-	err := p.activeQ.Delete(pod)
+	err := p.activeQ.Delete(newPodInfoNoTimestamp(pod))
 	if err != nil { // The item was probably not found in the activeQ.
 		p.clearPodBackoff(pod)
-		p.podBackoffQ.Delete(pod)
+		p.podBackoffQ.Delete(newPodInfoNoTimestamp(pod))
 		p.unschedulableQ.delete(pod)
 	}
 	return nil
@@ -619,13 +645,14 @@ func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
 func (p *PriorityQueue) MoveAllToActiveQueue() {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	for _, pod := range p.unschedulableQ.pods {
+	for _, pInfo := range p.unschedulableQ.podInfoMap {
+		pod := pInfo.pod
 		if p.isPodBackingOff(pod) {
-			if err := p.podBackoffQ.Add(pod); err != nil {
+			if err := p.podBackoffQ.Add(pInfo); err != nil {
 				klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
 			}
 		} else {
-			if err := p.activeQ.Add(pod); err != nil {
+			if err := p.activeQ.Add(pInfo); err != nil {
 				klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
 			}
 		}
@@ -636,14 +663,15 @@ func (p *PriorityQueue) MoveAllToActiveQueue() {
 }
 
 // NOTE: this function assumes lock has been acquired in caller
-func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
-	for _, pod := range pods {
+func (p *PriorityQueue) movePodsToActiveQueue(podInfoList []*podInfo) {
+	for _, pInfo := range podInfoList {
+		pod := pInfo.pod
 		if p.isPodBackingOff(pod) {
-			if err := p.podBackoffQ.Add(pod); err != nil {
+			if err := p.podBackoffQ.Add(pInfo); err != nil {
 				klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
 			}
 		} else {
-			if err := p.activeQ.Add(pod); err != nil {
+			if err := p.activeQ.Add(pInfo); err != nil {
 				klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
 			}
 		}
@@ -656,9 +684,10 @@ func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
 // getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
 // any affinity term that matches "pod".
 // NOTE: this function assumes lock has been acquired in caller.
-func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*v1.Pod {
-	var podsToMove []*v1.Pod
-	for _, up := range p.unschedulableQ.pods {
+func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*podInfo {
+	var podsToMove []*podInfo
+	for _, pInfo := range p.unschedulableQ.podInfoMap {
+		up := pInfo.pod
 		affinity := up.Spec.Affinity
 		if affinity != nil && affinity.PodAffinity != nil {
 			terms := predicates.GetPodAffinityTerms(affinity.PodAffinity)
@@ -669,7 +698,7 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod
 					klog.Errorf("Error getting label selectors for pod: %v.", up.Name)
 				}
 				if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
-					podsToMove = append(podsToMove, up)
+					podsToMove = append(podsToMove, pInfo)
 					break
 				}
 			}
@@ -692,16 +721,15 @@ func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod {
 func (p *PriorityQueue) PendingPods() []*v1.Pod {
 	p.lock.Lock()
 	defer p.lock.Unlock()
 	result := []*v1.Pod{}
-	for _, pod := range p.activeQ.List() {
-		result = append(result, pod.(*v1.Pod))
+	for _, pInfo := range p.activeQ.List() {
+		result = append(result, pInfo.(*podInfo).pod)
 	}
-	for _, pod := range p.podBackoffQ.List() {
-		result = append(result, pod.(*v1.Pod))
+	for _, pInfo := range p.podBackoffQ.List() {
+		result = append(result, pInfo.(*podInfo).pod)
 	}
-	for _, pod := range p.unschedulableQ.pods {
-		result = append(result, pod)
+	for _, pInfo := range p.unschedulableQ.podInfoMap {
+		result = append(result, pInfo.pod)
 	}
 	return result
 }
@@ -731,9 +759,11 @@ func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
 	p.lock.Unlock()
 }
 
-func (p *PriorityQueue) podsCompareBackoffCompleted(p1, p2 interface{}) bool {
-	bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(p1.(*v1.Pod)))
-	bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(p2.(*v1.Pod)))
+func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
+	pInfo1 := podInfo1.(*podInfo)
+	pInfo2 := podInfo2.(*podInfo)
+	bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo1.pod))
+	bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo2.pod))
 	return bo1.Before(bo2)
 }
 
@@ -741,47 +771,61 @@ func (p *PriorityQueue) podsCompareBackoffCompleted(p1, p2 interface{}) bool {
 func (p *PriorityQueue) NumUnschedulablePods() int {
 	p.lock.RLock()
 	defer p.lock.RUnlock()
-	return len(p.unschedulableQ.pods)
+	return len(p.unschedulableQ.podInfoMap)
+}
+
+// newPodInfo builds a podInfo object.
+func (p *PriorityQueue) newPodInfo(pod *v1.Pod) *podInfo {
+	if p.clock == nil {
+		return &podInfo{
+			pod: pod,
+		}
+	}
+
+	return &podInfo{
+		pod:       pod,
+		timestamp: p.clock.Now(),
+	}
 }
 
 // UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
 // is used to implement unschedulableQ.
 type UnschedulablePodsMap struct {
-	// pods is a map key by a pod's full-name and the value is a pointer to the pod.
-	pods    map[string]*v1.Pod
-	keyFunc func(*v1.Pod) string
+	// podInfoMap is a map keyed by a pod's full name; the value is a pointer to the podInfo.
+	podInfoMap map[string]*podInfo
+	keyFunc    func(*v1.Pod) string
 }
 
-// Add adds a pod to the unschedulable pods.
-func (u *UnschedulablePodsMap) addOrUpdate(pod *v1.Pod) {
-	u.pods[u.keyFunc(pod)] = pod
+// Add adds a pod to the unschedulable podInfoMap.
+func (u *UnschedulablePodsMap) addOrUpdate(pInfo *podInfo) {
+	u.podInfoMap[u.keyFunc(pInfo.pod)] = pInfo
 }
 
-// Delete deletes a pod from the unschedulable pods.
+// Delete deletes a pod from the unschedulable podInfoMap.
 func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
-	delete(u.pods, u.keyFunc(pod))
+	delete(u.podInfoMap, u.keyFunc(pod))
 }
 
-// Get returns the pod if a pod with the same key as the key of the given "pod"
+// Get returns the podInfo if a pod with the same key as the key of the given "pod"
 // is found in the map. It returns nil otherwise.
-func (u *UnschedulablePodsMap) get(pod *v1.Pod) *v1.Pod {
+func (u *UnschedulablePodsMap) get(pod *v1.Pod) *podInfo {
 	podKey := u.keyFunc(pod)
-	if p, exists := u.pods[podKey]; exists {
-		return p
+	if pInfo, exists := u.podInfoMap[podKey]; exists {
+		return pInfo
 	}
 	return nil
 }
 
-// Clear removes all the entries from the unschedulable maps.
+// Clear removes all the entries from the unschedulable podInfoMap.
 func (u *UnschedulablePodsMap) clear() {
-	u.pods = make(map[string]*v1.Pod)
+	u.podInfoMap = make(map[string]*podInfo)
 }
 
 // newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap.
-func newUnschedulablePodsMap() *UnschedulablePodsMap {
+func newUnschedulablePodsMap(clock util.Clock) *UnschedulablePodsMap {
 	return &UnschedulablePodsMap{
-		pods:    make(map[string]*v1.Pod),
-		keyFunc: util.GetPodFullName,
+		podInfoMap: make(map[string]*podInfo),
+		keyFunc:    util.GetPodFullName,
 	}
 }
 
@@ -872,3 +916,7 @@ func MakeNextPodFunc(queue SchedulingQueue) func() *v1.Pod {
 		return nil
 	}
 }
+
+func podInfoKeyFunc(obj interface{}) (string, error) {
+	return cache.MetaNamespaceKeyFunc(obj.(*podInfo).pod)
+}
diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go
index d2f43f9b54..9f7599c86f 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue_test.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go
@@ -98,13 +98,17 @@ var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1.
 func addOrUpdateUnschedulablePod(p *PriorityQueue, pod *v1.Pod) {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	p.unschedulableQ.addOrUpdate(pod)
+	p.unschedulableQ.addOrUpdate(newPodInfoNoTimestamp(pod))
 }
 
 func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	return p.unschedulableQ.get(pod)
+	pInfo := p.unschedulableQ.get(pod)
+	if pInfo != nil {
+		return pInfo.pod
+	}
+	return nil
 }
 
 func TestPriorityQueue_Add(t *testing.T) {
@@ -275,7 +279,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Async(t *testing.T) {
 	wg.Wait()
 	// all other pods should be in active queue again
 	for i := 1; i < totalNum; i++ {
-		if _, exists, _ := q.activeQ.Get(&expectedPods[i]); !exists {
+		if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&expectedPods[i])); !exists {
 			t.Errorf("Expected %v to be added to activeQ.", expectedPods[i].Name)
 		}
 	}
@@ -301,7 +305,7 @@ func TestPriorityQueue_Pop(t *testing.T) {
 func TestPriorityQueue_Update(t *testing.T) {
 	q := NewPriorityQueue(nil)
 	q.Update(nil, &highPriorityPod)
-	if _, exists, _ := q.activeQ.Get(&highPriorityPod); !exists {
+	if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriorityPod)); !exists {
 		t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name)
 	}
 	if len(q.nominatedPods.nominatedPods) != 0 {
@@ -318,15 +322,15 @@ func TestPriorityQueue_Update(t *testing.T) {
 	// Updating an unschedulable pod which is not in any of the two queues, should
 	// add the pod to activeQ.
 	q.Update(&unschedulablePod, &unschedulablePod)
-	if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists {
+	if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&unschedulablePod)); !exists {
 		t.Errorf("Expected %v to be added to activeQ.", unschedulablePod.Name)
 	}
 	// Updating a pod that is already in activeQ, should not change it.
 	q.Update(&unschedulablePod, &unschedulablePod)
-	if len(q.unschedulableQ.pods) != 0 {
+	if len(q.unschedulableQ.podInfoMap) != 0 {
 		t.Error("Expected unschedulableQ to be empty.")
 	}
-	if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists {
+	if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&unschedulablePod)); !exists {
 		t.Errorf("Expected: %v to be added to activeQ.", unschedulablePod.Name)
 	}
 	if p, err := q.Pop(); err != nil || p != &highPriNominatedPod {
@@ -341,10 +345,10 @@ func TestPriorityQueue_Delete(t *testing.T) {
 	if err := q.Delete(&highPriNominatedPod); err != nil {
 		t.Errorf("delete failed: %v", err)
 	}
-	if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists {
+	if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&unschedulablePod)); !exists {
 		t.Errorf("Expected %v to be in activeQ.", unschedulablePod.Name)
 	}
-	if _, exists, _ := q.activeQ.Get(&highPriNominatedPod); exists {
+	if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriNominatedPod)); exists {
 		t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name)
 	}
 	if len(q.nominatedPods.nominatedPods) != 1 {
@@ -416,7 +420,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
 	if getUnschedulablePod(q, affinityPod) != nil {
 		t.Error("affinityPod is still in the unschedulableQ.")
 	}
-	if _, exists, _ := q.activeQ.Get(affinityPod); !exists {
+	if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(affinityPod)); !exists {
 		t.Error("affinityPod is not moved to activeQ.")
 	}
 	// Check that the other pod is still in the unschedulableQ.
@@ -588,98 +592,98 @@ func TestUnschedulablePodsMap(t *testing.T) {
 	tests := []struct {
 		name                   string
 		podsToAdd              []*v1.Pod
-		expectedMapAfterAdd    map[string]*v1.Pod
+		expectedMapAfterAdd    map[string]*podInfo
 		podsToUpdate           []*v1.Pod
-		expectedMapAfterUpdate map[string]*v1.Pod
+		expectedMapAfterUpdate map[string]*podInfo
 		podsToDelete           []*v1.Pod
-		expectedMapAfterDelete map[string]*v1.Pod
+		expectedMapAfterDelete map[string]*podInfo
 	}{
 		{
 			name:      "create, update, delete subset of pods",
 			podsToAdd: []*v1.Pod{pods[0], pods[1], pods[2], pods[3]},
-			expectedMapAfterAdd: map[string]*v1.Pod{
-				util.GetPodFullName(pods[0]): pods[0],
-				util.GetPodFullName(pods[1]): pods[1],
-				util.GetPodFullName(pods[2]): pods[2],
-				util.GetPodFullName(pods[3]): pods[3],
+			expectedMapAfterAdd: map[string]*podInfo{
+				util.GetPodFullName(pods[0]): {pod: pods[0]},
+				util.GetPodFullName(pods[1]): {pod: pods[1]},
+				util.GetPodFullName(pods[2]): {pod: pods[2]},
+				util.GetPodFullName(pods[3]): {pod: pods[3]},
 			},
 			podsToUpdate: []*v1.Pod{updatedPods[0]},
-			expectedMapAfterUpdate: map[string]*v1.Pod{
-				util.GetPodFullName(pods[0]): updatedPods[0],
-				util.GetPodFullName(pods[1]): pods[1],
-				util.GetPodFullName(pods[2]): pods[2],
-				util.GetPodFullName(pods[3]): pods[3],
+			expectedMapAfterUpdate: map[string]*podInfo{
+				util.GetPodFullName(pods[0]): {pod: updatedPods[0]},
+				util.GetPodFullName(pods[1]): {pod: pods[1]},
+				util.GetPodFullName(pods[2]): {pod: pods[2]},
+				util.GetPodFullName(pods[3]): {pod: pods[3]},
 			},
 			podsToDelete: []*v1.Pod{pods[0], pods[1]},
-			expectedMapAfterDelete: map[string]*v1.Pod{
-				util.GetPodFullName(pods[2]): pods[2],
-				util.GetPodFullName(pods[3]): pods[3],
+			expectedMapAfterDelete: map[string]*podInfo{
+				util.GetPodFullName(pods[2]): {pod: pods[2]},
+				util.GetPodFullName(pods[3]): {pod: pods[3]},
 			},
 		},
 		{
 			name:      "create, update, delete all",
 			podsToAdd: []*v1.Pod{pods[0], pods[3]},
-			expectedMapAfterAdd: map[string]*v1.Pod{
-				util.GetPodFullName(pods[0]): pods[0],
-				util.GetPodFullName(pods[3]): pods[3],
+			expectedMapAfterAdd: map[string]*podInfo{
+				util.GetPodFullName(pods[0]): {pod: pods[0]},
+				util.GetPodFullName(pods[3]): {pod: pods[3]},
 			},
 			podsToUpdate: []*v1.Pod{updatedPods[3]},
-			expectedMapAfterUpdate: map[string]*v1.Pod{
-				util.GetPodFullName(pods[0]): pods[0],
-				util.GetPodFullName(pods[3]): updatedPods[3],
+			expectedMapAfterUpdate: map[string]*podInfo{
+				util.GetPodFullName(pods[0]): {pod: pods[0]},
+				util.GetPodFullName(pods[3]): {pod: updatedPods[3]},
 			},
 			podsToDelete: []*v1.Pod{pods[0], pods[3]},
-			expectedMapAfterDelete: map[string]*v1.Pod{},
+			expectedMapAfterDelete: map[string]*podInfo{},
 		},
 		{
 			name:      "delete non-existing and existing pods",
 			podsToAdd: []*v1.Pod{pods[1], pods[2]},
-			expectedMapAfterAdd: map[string]*v1.Pod{
-				util.GetPodFullName(pods[1]): pods[1],
-				util.GetPodFullName(pods[2]): pods[2],
+			expectedMapAfterAdd: map[string]*podInfo{
+				util.GetPodFullName(pods[1]): {pod: pods[1]},
+				util.GetPodFullName(pods[2]): {pod: pods[2]},
 			},
 			podsToUpdate: []*v1.Pod{updatedPods[1]},
-			expectedMapAfterUpdate: map[string]*v1.Pod{
-				util.GetPodFullName(pods[1]): updatedPods[1],
-				util.GetPodFullName(pods[2]): pods[2],
+			expectedMapAfterUpdate: map[string]*podInfo{
+				util.GetPodFullName(pods[1]): {pod: updatedPods[1]},
+				util.GetPodFullName(pods[2]): {pod: pods[2]},
 			},
 			podsToDelete: []*v1.Pod{pods[2], pods[3]},
-			expectedMapAfterDelete: map[string]*v1.Pod{
-				util.GetPodFullName(pods[1]): updatedPods[1],
+			expectedMapAfterDelete: map[string]*podInfo{
+				util.GetPodFullName(pods[1]): {pod: updatedPods[1]},
 			},
 		},
 	}
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			upm := newUnschedulablePodsMap()
+			upm := newUnschedulablePodsMap(nil)
 			for _, p := range test.podsToAdd {
-				upm.addOrUpdate(p)
+				upm.addOrUpdate(newPodInfoNoTimestamp(p))
 			}
-			if !reflect.DeepEqual(upm.pods, test.expectedMapAfterAdd) {
+			if !reflect.DeepEqual(upm.podInfoMap, test.expectedMapAfterAdd) {
 				t.Errorf("Unexpected map after adding pods. Expected: %v, got: %v",
-					test.expectedMapAfterAdd, upm.pods)
+					test.expectedMapAfterAdd, upm.podInfoMap)
 			}
 
 			if len(test.podsToUpdate) > 0 {
 				for _, p := range test.podsToUpdate {
-					upm.addOrUpdate(p)
+					upm.addOrUpdate(newPodInfoNoTimestamp(p))
 				}
-				if !reflect.DeepEqual(upm.pods, test.expectedMapAfterUpdate) {
+				if !reflect.DeepEqual(upm.podInfoMap, test.expectedMapAfterUpdate) {
 					t.Errorf("Unexpected map after updating pods. Expected: %v, got: %v",
-						test.expectedMapAfterUpdate, upm.pods)
+						test.expectedMapAfterUpdate, upm.podInfoMap)
 				}
 			}
 			for _, p := range test.podsToDelete {
 				upm.delete(p)
 			}
-			if !reflect.DeepEqual(upm.pods, test.expectedMapAfterDelete) {
+			if !reflect.DeepEqual(upm.podInfoMap, test.expectedMapAfterDelete) {
 				t.Errorf("Unexpected map after deleting pods. Expected: %v, got: %v",
-					test.expectedMapAfterDelete, upm.pods)
+					test.expectedMapAfterDelete, upm.podInfoMap)
 			}
 			upm.clear()
-			if len(upm.pods) != 0 {
-				t.Errorf("Expected the map to be empty, but has %v elements.", len(upm.pods))
+			if len(upm.podInfoMap) != 0 {
+				t.Errorf("Expected the map to be empty, but has %v elements.", len(upm.podInfoMap))
 			}
 		})
 	}
@@ -799,14 +803,15 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
 			NominatedNodeName: "node1",
 		},
 	}
+	// Update pod condition to unschedulable.
 	podutil.UpdatePodCondition(&unschedulablePod.Status, &v1.PodCondition{
-		Type:          v1.PodScheduled,
-		Status:        v1.ConditionFalse,
-		Reason:        v1.PodReasonUnschedulable,
-		Message:       "fake scheduling failure",
-		LastProbeTime: metav1.Now(),
+		Type:    v1.PodScheduled,
+		Status:  v1.ConditionFalse,
+		Reason:  v1.PodReasonUnschedulable,
+		Message: "fake scheduling failure",
 	})
+	// Put it in the unschedulable queue.
 	q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle())
 	// Clear its backoff to simulate its backoff expiration
@@ -844,12 +849,12 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
 	// And then unschedulablePod was determined as unschedulable AGAIN.
 	podutil.UpdatePodCondition(&unschedulablePod.Status, &v1.PodCondition{
-		Type:          v1.PodScheduled,
-		Status:        v1.ConditionFalse,
-		Reason:        v1.PodReasonUnschedulable,
-		Message:       "fake scheduling failure",
-		LastProbeTime: metav1.Now(),
+		Type:    v1.PodScheduled,
+		Status:  v1.ConditionFalse,
+		Reason:  v1.PodReasonUnschedulable,
+		Message: "fake scheduling failure",
 	})
+	// And then put the unschedulable pod back into the unschedulable queue.
 	q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle())
 	// Clear its backoff to simulate its backoff expiration
@@ -962,27 +967,27 @@ func TestHighProirotyFlushUnschedulableQLeftover(t *testing.T) {
 		},
 	}
 
-	addOrUpdateUnschedulablePod(q, &highPod)
-	addOrUpdateUnschedulablePod(q, &midPod)
-
 	// Update pod condition to highPod.
 	podutil.UpdatePodCondition(&highPod.Status, &v1.PodCondition{
-		Type:          v1.PodScheduled,
-		Status:        v1.ConditionFalse,
-		Reason:        v1.PodReasonUnschedulable,
-		Message:       "fake scheduling failure",
-		LastProbeTime: metav1.Time{Time: time.Now().Add(-1 * unschedulableQTimeInterval)},
+		Type:    v1.PodScheduled,
+		Status:  v1.ConditionFalse,
+		Reason:  v1.PodReasonUnschedulable,
+		Message: "fake scheduling failure",
 	})
 
 	// Update pod condition to midPod.
 	podutil.UpdatePodCondition(&midPod.Status, &v1.PodCondition{
-		Type:          v1.PodScheduled,
-		Status:        v1.ConditionFalse,
-		Reason:        v1.PodReasonUnschedulable,
-		Message:       "fake scheduling failure",
-		LastProbeTime: metav1.Time{Time: time.Now().Add(-1 * unschedulableQTimeInterval)},
+		Type:    v1.PodScheduled,
+		Status:  v1.ConditionFalse,
+		Reason:  v1.PodReasonUnschedulable,
+		Message: "fake scheduling failure",
 	})
 
+	q.unschedulableQ.addOrUpdate(newPodInfoNoTimestamp(&highPod))
+	q.unschedulableQ.addOrUpdate(newPodInfoNoTimestamp(&midPod))
+	q.unschedulableQ.podInfoMap[util.GetPodFullName(&highPod)].timestamp = time.Now().Add(-1 * unschedulableQTimeInterval)
+	q.unschedulableQ.podInfoMap[util.GetPodFullName(&midPod)].timestamp = time.Now().Add(-1 * unschedulableQTimeInterval)
+
 	if p, err := q.Pop(); err != nil || p != &highPod {
 		t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
 	}
@@ -990,3 +995,143 @@ func TestHighProirotyFlushUnschedulableQLeftover(t *testing.T) {
 		t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
 	}
 }
+
+// TestPodTimestamp tests the operations related to podInfo.
+func TestPodTimestamp(t *testing.T) {
+	pod1 := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-pod-1",
+			Namespace: "ns1",
+			UID:       types.UID("tp-1"),
+		},
+		Status: v1.PodStatus{
+			NominatedNodeName: "node1",
+		},
+	}
+
+	pod2 := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-pod-2",
+			Namespace: "ns2",
+			UID:       types.UID("tp-2"),
+		},
+		Status: v1.PodStatus{
+			NominatedNodeName: "node2",
+		},
+	}
+
+	pInfo1 := &podInfo{
+		pod:       pod1,
+		timestamp: util.RealClock{}.Now(),
+	}
+	pInfo2 := &podInfo{
+		pod:       pod2,
+		timestamp: util.RealClock{}.Now().Add(1 * time.Second),
+	}
+
+	var queue *PriorityQueue
+	type operation = func()
+	addPodActiveQ := func(pInfo *podInfo) operation {
+		return func() {
+			queue.lock.Lock()
+			defer queue.lock.Unlock()
+			queue.activeQ.Add(pInfo)
+		}
+	}
+	updatePodActiveQ := func(pInfo *podInfo) operation {
+		return func() {
+			queue.lock.Lock()
+			defer queue.lock.Unlock()
+			queue.activeQ.Update(pInfo)
+		}
+	}
+	addPodUnschedulableQ := func(pInfo *podInfo) operation {
+		return func() {
+			queue.lock.Lock()
+			defer queue.lock.Unlock()
+			// Update pod condition to unschedulable.
+			podutil.UpdatePodCondition(&pInfo.pod.Status, &v1.PodCondition{
+				Type:    v1.PodScheduled,
+				Status:  v1.ConditionFalse,
+				Reason:  v1.PodReasonUnschedulable,
+				Message: "fake scheduling failure",
+			})
+			queue.unschedulableQ.addOrUpdate(pInfo)
+		}
+	}
+	addPodBackoffQ := func(pInfo *podInfo) operation {
+		return func() {
+			queue.lock.Lock()
+			defer queue.lock.Unlock()
+			queue.podBackoffQ.Add(pInfo)
+		}
+	}
+	moveAllToActiveQ := func() operation {
+		return func() {
+			queue.MoveAllToActiveQueue()
+		}
+	}
+	flushBackoffQ := func() operation {
+		return func() {
+			queue.flushBackoffQCompleted()
+		}
+	}
+	tests := []struct {
+		name       string
+		operations []operation
+		expected   []*podInfo
+	}{
+		{
+			name: "add two pods to activeQ and sort them by timestamp",
+			operations: []operation{
+				addPodActiveQ(pInfo2), addPodActiveQ(pInfo1),
+			},
+			expected: []*podInfo{pInfo1, pInfo2},
+		},
+		{
+			name: "update two pods in activeQ and sort them by timestamp",
+			operations: []operation{
+				updatePodActiveQ(pInfo2), updatePodActiveQ(pInfo1),
+			},
+			expected: []*podInfo{pInfo1, pInfo2},
+		},
+		{
+			name: "add two pods to unschedulableQ, then move them to activeQ and sort them by timestamp",
+			operations: []operation{
+				addPodUnschedulableQ(pInfo2), addPodUnschedulableQ(pInfo1), moveAllToActiveQ(),
+			},
+			expected: []*podInfo{pInfo1, pInfo2},
+		},
+		{
+			name: "add one pod to backoffQ and move it to activeQ",
+			operations: []operation{
+				addPodActiveQ(pInfo2), addPodBackoffQ(pInfo1), flushBackoffQ(), moveAllToActiveQ(),
+			},
+			expected: []*podInfo{pInfo1, pInfo2},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			queue = NewPriorityQueue(nil)
+			var podInfoList []*podInfo
+
+			for _, op := range test.operations {
+				op()
+			}
+
+			for i := 0; i < len(test.expected); i++ {
+				if pInfo, err := queue.activeQ.Pop(); err != nil {
+					t.Errorf("Error while popping the head of the queue: %v", err)
+				} else {
+					podInfoList = append(podInfoList, pInfo.(*podInfo))
+				}
+			}
+
+			if !reflect.DeepEqual(test.expected, podInfoList) {
+				t.Errorf("Unexpected podInfo list. Expected: %v, got: %v",
+					test.expected, podInfoList)
+			}
+		})
+	}
+}
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index b78bab7d7a..9b1560f3a8 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -265,11 +265,10 @@ func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason s
 	sched.config.Error(pod, err)
 	sched.config.Recorder.Event(pod, v1.EventTypeWarning, "FailedScheduling", message)
 	sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
-		Type:          v1.PodScheduled,
-		Status:        v1.ConditionFalse,
-		LastProbeTime: metav1.Now(),
-		Reason:        reason,
-		Message:       err.Error(),
+		Type:    v1.PodScheduled,
+		Status:  v1.ConditionFalse,
+		Reason:  reason,
+		Message: err.Error(),
 	})
 }
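---

Reviewer note (not part of the patch): the ordering change is easiest to see in isolation. Below is a minimal, self-contained Go sketch of the comparator semantics this patch introduces. It is a hedged illustration, not the real implementation: the podInfo here uses simplified stand-in fields (name, priority) instead of a *v1.Pod, and less mirrors activeQComp. The point is that the timestamp is assigned once when a pod enters the queue and is carried over on updates, rather than being refreshed from the PodScheduled condition's LastProbeTime on every failed attempt, so an equal-priority pod keeps its FIFO position after a retry.

package main

import (
	"fmt"
	"sort"
	"time"
)

// podInfo is a simplified stand-in for the struct added by this patch:
// the pod plus the time it first entered the scheduling queue.
type podInfo struct {
	name      string
	priority  int32
	timestamp time.Time
}

// less mirrors activeQComp: higher priority first; for equal priorities,
// the stable enqueue timestamp gives FIFO order.
func less(a, b *podInfo) bool {
	return a.priority > b.priority ||
		(a.priority == b.priority && a.timestamp.Before(b.timestamp))
}

func main() {
	t0 := time.Now()
	older := &podInfo{name: "older-pod", priority: 10, timestamp: t0}
	newer := &podInfo{name: "newer-pod", priority: 10, timestamp: t0.Add(time.Second)}

	// Suppose older-pod fails a scheduling attempt and is re-queued. Because
	// its timestamp is not refreshed on the retry, it still sorts ahead of
	// newer-pod. Under the old podTimestamp/LastProbeTime scheme, the retry
	// would have pushed it behind every pod queued in the meantime.
	queue := []*podInfo{newer, older}
	sort.Slice(queue, func(i, j int) bool { return less(queue[i], queue[j]) })
	for _, p := range queue {
		fmt.Println(p.name) // prints "older-pod", then "newer-pod"
	}
}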