Merge pull request #73700 from denkensk/no-updae-timestamp-each-scheduling-attempt

Don't update the Pod object after each scheduling attempt
Kubernetes Prow Robot 2019-02-22 01:47:48 -08:00 committed by GitHub
commit bf3b5e5563
3 changed files with 355 additions and 163 deletions


@@ -260,15 +260,29 @@ func podTimestamp(pod *v1.Pod) *metav1.Time {
return &condition.LastProbeTime
}
// podInfo is the minimal unit stored in the scheduling queue.
type podInfo struct {
pod *v1.Pod
// The time the pod was added to the scheduling queue.
timestamp time.Time
}
// newPodInfoNoTimestamp builds a podInfo object without a timestamp.
func newPodInfoNoTimestamp(pod *v1.Pod) *podInfo {
return &podInfo{
pod: pod,
}
}
// activeQComp is the function used by the activeQ heap algorithm to sort pods.
// It sorts pods based on their priority. When priorities are equal, it uses
// podTimestamp.
func activeQComp(pod1, pod2 interface{}) bool {
p1 := pod1.(*v1.Pod)
p2 := pod2.(*v1.Pod)
prio1 := util.GetPodPriority(p1)
prio2 := util.GetPodPriority(p2)
return (prio1 > prio2) || (prio1 == prio2 && podTimestamp(p1).Before(podTimestamp(p2)))
// podInfo.timestamp.
func activeQComp(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*podInfo)
pInfo2 := podInfo2.(*podInfo)
prio1 := util.GetPodPriority(pInfo1.pod)
prio2 := util.GetPodPriority(pInfo2.pod)
return (prio1 > prio2) || (prio1 == prio2 && pInfo1.timestamp.Before(pInfo2.timestamp))
}
// NewPriorityQueue creates a PriorityQueue object.
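The comparator above is the heart of the change: ordering now comes from the in-memory enqueue timestamp rather than from a pod condition. As a standalone illustration (not part of the PR), the following sketch restates that ordering with the pod type and priority accessor stubbed out; the real code operates on *v1.Pod through util.GetPodPriority:

package main

import (
	"fmt"
	"time"
)

type stubPod struct{ priority int32 } // stand-in for *v1.Pod

type podInfo struct {
	pod       *stubPod
	timestamp time.Time // when the pod entered the scheduling queue
}

// activeQComp mirrors the new comparator: higher priority pops first,
// and for equal priorities the earlier enqueue timestamp wins.
func activeQComp(a, b *podInfo) bool {
	if a.pod.priority != b.pod.priority {
		return a.pod.priority > b.pod.priority
	}
	return a.timestamp.Before(b.timestamp)
}

func main() {
	now := time.Now()
	older := &podInfo{pod: &stubPod{priority: 10}, timestamp: now}
	newer := &podInfo{pod: &stubPod{priority: 10}, timestamp: now.Add(time.Second)}
	fmt.Println(activeQComp(older, newer)) // true: equal priority, earlier enqueue wins
}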
@@ -282,13 +296,13 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *Priority
clock: clock,
stop: stop,
podBackoff: util.CreatePodBackoffWithClock(1*time.Second, 10*time.Second, clock),
activeQ: util.NewHeap(cache.MetaNamespaceKeyFunc, activeQComp),
unschedulableQ: newUnschedulablePodsMap(),
activeQ: util.NewHeap(podInfoKeyFunc, activeQComp),
unschedulableQ: newUnschedulablePodsMap(clock),
nominatedPods: newNominatedPodMap(),
moveRequestCycle: -1,
}
pq.cond.L = &pq.lock
pq.podBackoffQ = util.NewHeap(cache.MetaNamespaceKeyFunc, pq.podsCompareBackoffCompleted)
pq.podBackoffQ = util.NewHeap(podInfoKeyFunc, pq.podsCompareBackoffCompleted)
pq.run()
@@ -306,7 +320,8 @@ func (p *PriorityQueue) run() {
func (p *PriorityQueue) Add(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
if err := p.activeQ.Add(pod); err != nil {
pInfo := p.newPodInfo(pod)
if err := p.activeQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
return err
}
@@ -315,7 +330,7 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
p.unschedulableQ.delete(pod)
}
// Delete pod from backoffQ if it is backing off
if err := p.podBackoffQ.Delete(pod); err == nil {
if err := p.podBackoffQ.Delete(pInfo); err == nil {
klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name)
}
p.nominatedPods.add(pod, "")
@@ -332,13 +347,15 @@ func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
if p.unschedulableQ.get(pod) != nil {
return nil
}
if _, exists, _ := p.activeQ.Get(pod); exists {
pInfo := p.newPodInfo(pod)
if _, exists, _ := p.activeQ.Get(pInfo); exists {
return nil
}
if _, exists, _ := p.podBackoffQ.Get(pod); exists {
if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
return nil
}
err := p.activeQ.Add(pod)
err := p.activeQ.Add(pInfo)
if err != nil {
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
} else {
@@ -405,22 +422,24 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingC
if p.unschedulableQ.get(pod) != nil {
return fmt.Errorf("pod is already present in unschedulableQ")
}
if _, exists, _ := p.activeQ.Get(pod); exists {
pInfo := p.newPodInfo(pod)
if _, exists, _ := p.activeQ.Get(pInfo); exists {
return fmt.Errorf("pod is already present in the activeQ")
}
if _, exists, _ := p.podBackoffQ.Get(pod); exists {
if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
return fmt.Errorf("pod is already present in the backoffQ")
}
if podSchedulingCycle > p.moveRequestCycle && isPodUnschedulable(pod) {
p.backoffPod(pod)
p.unschedulableQ.addOrUpdate(pod)
p.unschedulableQ.addOrUpdate(pInfo)
p.nominatedPods.add(pod, "")
return nil
}
// If a move request has been received and the pod is subject to backoff, move it to the BackoffQ.
if p.isPodBackingOff(pod) && isPodUnschedulable(pod) {
err := p.podBackoffQ.Add(pod)
err := p.podBackoffQ.Add(pInfo)
if err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
} else {
@@ -429,7 +448,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingC
return err
}
err := p.activeQ.Add(pod)
err := p.activeQ.Add(pInfo)
if err == nil {
p.nominatedPods.add(pod, "")
p.cond.Broadcast()
@@ -443,16 +462,16 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
defer p.lock.Unlock()
for {
rawPod := p.podBackoffQ.Peek()
if rawPod == nil {
rawPodInfo := p.podBackoffQ.Peek()
if rawPodInfo == nil {
return
}
pod := rawPod.(*v1.Pod)
pod := rawPodInfo.(*podInfo).pod
boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
if !found {
klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod))
p.podBackoffQ.Pop()
p.activeQ.Add(pod)
p.activeQ.Add(rawPodInfo)
defer p.cond.Broadcast()
continue
}
@@ -465,7 +484,7 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
klog.Errorf("Unable to pop pod %v from backoffQ despite backoff completion.", nsNameForPod(pod))
return
}
p.activeQ.Add(pod)
p.activeQ.Add(rawPodInfo)
defer p.cond.Broadcast()
}
}
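Note that both activeQ.Add calls above now hand over the original rawPodInfo, so a pod keeps its first enqueue timestamp while moving between queues. And because podBackoffQ is ordered by backoff-completion time (see podsCompareBackoffCompleted further down), the flush can stop at the first entry whose deadline is still pending. A standalone sketch of that loop shape, with a sorted slice standing in for the heap and the missing-backoff-record branch omitted:

package main

import (
	"fmt"
	"time"
)

type entry struct {
	name   string
	boTime time.Time // backoff completion deadline
}

// flush moves every entry whose backoff has completed; because the
// queue is sorted by boTime, it can return at the first pending one.
func flush(q []entry, now time.Time) (moved, rest []entry) {
	for i, e := range q {
		if e.boTime.After(now) {
			return moved, q[i:]
		}
		moved = append(moved, e)
	}
	return moved, nil
}

func main() {
	now := time.Now()
	q := []entry{
		{name: "done", boTime: now.Add(-time.Second)},
		{name: "pending", boTime: now.Add(time.Minute)},
	}
	moved, rest := flush(q, now)
	fmt.Println(len(moved), len(rest)) // 1 1
}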
@@ -476,12 +495,12 @@ func (p *PriorityQueue) flushUnschedulableQLeftover() {
p.lock.Lock()
defer p.lock.Unlock()
var podsToMove []*v1.Pod
var podsToMove []*podInfo
currentTime := p.clock.Now()
for _, pod := range p.unschedulableQ.pods {
lastScheduleTime := podTimestamp(pod)
if !lastScheduleTime.IsZero() && currentTime.Sub(lastScheduleTime.Time) > unschedulableQTimeInterval {
podsToMove = append(podsToMove, pod)
for _, pInfo := range p.unschedulableQ.podInfoMap {
lastScheduleTime := pInfo.timestamp
if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
podsToMove = append(podsToMove, pInfo)
}
}
@@ -509,9 +528,9 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) {
if err != nil {
return nil, err
}
pod := obj.(*v1.Pod)
pInfo := obj.(*podInfo)
p.schedulingCycle++
return pod, err
return pInfo.pod, err
}
// isPodUpdated checks if the pod is updated in a way that it may have become
@@ -536,18 +555,23 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
defer p.lock.Unlock()
if oldPod != nil {
oldPodInfo := newPodInfoNoTimestamp(oldPod)
// If the pod is already in the active queue, just update it there.
if _, exists, _ := p.activeQ.Get(oldPod); exists {
if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
p.nominatedPods.update(oldPod, newPod)
err := p.activeQ.Update(newPod)
newPodInfo := newPodInfoNoTimestamp(newPod)
newPodInfo.timestamp = oldPodInfo.(*podInfo).timestamp
err := p.activeQ.Update(newPodInfo)
return err
}
// If the pod is in the backoff queue, update it there.
if _, exists, _ := p.podBackoffQ.Get(oldPod); exists {
if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
p.nominatedPods.update(oldPod, newPod)
p.podBackoffQ.Delete(newPod)
err := p.activeQ.Add(newPod)
p.podBackoffQ.Delete(newPodInfoNoTimestamp(oldPod))
newPodInfo := newPodInfoNoTimestamp(newPod)
newPodInfo.timestamp = oldPodInfo.(*podInfo).timestamp
err := p.activeQ.Add(newPodInfo)
if err == nil {
p.cond.Broadcast()
}
@@ -556,24 +580,26 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
}
// If the pod is in the unschedulable queue, updating it may make it schedulable.
if usPod := p.unschedulableQ.get(newPod); usPod != nil {
if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil {
p.nominatedPods.update(oldPod, newPod)
newPodInfo := newPodInfoNoTimestamp(newPod)
newPodInfo.timestamp = usPodInfo.timestamp
if isPodUpdated(oldPod, newPod) {
// If the pod is updated, reset its backoff
p.clearPodBackoff(newPod)
p.unschedulableQ.delete(usPod)
err := p.activeQ.Add(newPod)
p.unschedulableQ.delete(usPodInfo.pod)
err := p.activeQ.Add(newPodInfo)
if err == nil {
p.cond.Broadcast()
}
return err
}
// Pod is already in the unschedulable queue and hasn't been updated; no need to back off again
p.unschedulableQ.addOrUpdate(newPod)
p.unschedulableQ.addOrUpdate(newPodInfo)
return nil
}
// If pod is not in any of the queues, we put it in the active queue.
err := p.activeQ.Add(newPod)
err := p.activeQ.Add(p.newPodInfo(newPod))
if err == nil {
p.nominatedPods.add(newPod, "")
p.cond.Broadcast()
@@ -587,10 +613,10 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
p.nominatedPods.delete(pod)
err := p.activeQ.Delete(pod)
err := p.activeQ.Delete(newPodInfoNoTimestamp(pod))
if err != nil { // The item was probably not found in the activeQ.
p.clearPodBackoff(pod)
p.podBackoffQ.Delete(pod)
p.podBackoffQ.Delete(newPodInfoNoTimestamp(pod))
p.unschedulableQ.delete(pod)
}
return nil
@@ -619,13 +645,14 @@ func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
func (p *PriorityQueue) MoveAllToActiveQueue() {
p.lock.Lock()
defer p.lock.Unlock()
for _, pod := range p.unschedulableQ.pods {
for _, pInfo := range p.unschedulableQ.podInfoMap {
pod := pInfo.pod
if p.isPodBackingOff(pod) {
if err := p.podBackoffQ.Add(pod); err != nil {
if err := p.podBackoffQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
}
} else {
if err := p.activeQ.Add(pod); err != nil {
if err := p.activeQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
}
}
@@ -636,14 +663,15 @@ func (p *PriorityQueue) MoveAllToActiveQueue() {
}
// NOTE: this function assumes lock has been acquired in caller
func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
for _, pod := range pods {
func (p *PriorityQueue) movePodsToActiveQueue(podInfoList []*podInfo) {
for _, pInfo := range podInfoList {
pod := pInfo.pod
if p.isPodBackingOff(pod) {
if err := p.podBackoffQ.Add(pod); err != nil {
if err := p.podBackoffQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
}
} else {
if err := p.activeQ.Add(pod); err != nil {
if err := p.activeQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
}
}
@@ -656,9 +684,10 @@ func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
// getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
// any affinity term that matches "pod".
// NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*v1.Pod {
var podsToMove []*v1.Pod
for _, up := range p.unschedulableQ.pods {
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*podInfo {
var podsToMove []*podInfo
for _, pInfo := range p.unschedulableQ.podInfoMap {
up := pInfo.pod
affinity := up.Spec.Affinity
if affinity != nil && affinity.PodAffinity != nil {
terms := predicates.GetPodAffinityTerms(affinity.PodAffinity)
@@ -669,7 +698,7 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod
klog.Errorf("Error getting label selectors for pod: %v.", up.Name)
}
if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
podsToMove = append(podsToMove, up)
podsToMove = append(podsToMove, pInfo)
break
}
}
@@ -692,16 +721,15 @@ func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod {
func (p *PriorityQueue) PendingPods() []*v1.Pod {
p.lock.Lock()
defer p.lock.Unlock()
result := []*v1.Pod{}
for _, pod := range p.activeQ.List() {
result = append(result, pod.(*v1.Pod))
for _, pInfo := range p.activeQ.List() {
result = append(result, pInfo.(*podInfo).pod)
}
for _, pod := range p.podBackoffQ.List() {
result = append(result, pod.(*v1.Pod))
for _, pInfo := range p.podBackoffQ.List() {
result = append(result, pInfo.(*podInfo).pod)
}
for _, pod := range p.unschedulableQ.pods {
result = append(result, pod)
for _, pInfo := range p.unschedulableQ.podInfoMap {
result = append(result, pInfo.pod)
}
return result
}
@@ -731,9 +759,11 @@ func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
p.lock.Unlock()
}
func (p *PriorityQueue) podsCompareBackoffCompleted(p1, p2 interface{}) bool {
bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(p1.(*v1.Pod)))
bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(p2.(*v1.Pod)))
func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*podInfo)
pInfo2 := podInfo2.(*podInfo)
bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo1.pod))
bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo2.pod))
return bo1.Before(bo2)
}
@@ -741,47 +771,61 @@ func (p *PriorityQueue) podsCompareBackoffCompleted(p1, p2 interface{}) bool {
func (p *PriorityQueue) NumUnschedulablePods() int {
p.lock.RLock()
defer p.lock.RUnlock()
return len(p.unschedulableQ.pods)
return len(p.unschedulableQ.podInfoMap)
}
// newPodInfo builds a podInfo object.
func (p *PriorityQueue) newPodInfo(pod *v1.Pod) *podInfo {
if p.clock == nil {
return &podInfo{
pod: pod,
}
}
return &podInfo{
pod: pod,
timestamp: p.clock.Now(),
}
}
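newPodInfo is where timestamps are minted, and the clock it reads is the one injected through NewPriorityQueueWithClock and newUnschedulablePodsMap above; that injection is what lets tests pin timestamps deterministically. A minimal sketch of the idea, assuming a Now()-only interface in the spirit of util.Clock (names here are illustrative, not the PR's code):

package main

import (
	"fmt"
	"time"
)

// Clock is the Now()-only slice of what util.Clock provides.
type Clock interface{ Now() time.Time }

// fixedClock returns a preset instant, giving tests deterministic
// podInfo timestamps instead of wall-clock values.
type fixedClock struct{ t time.Time }

func (c fixedClock) Now() time.Time { return c.t }

type podInfo struct{ timestamp time.Time }

// newPodInfo mirrors the nil-clock guard in the diff: no clock, no timestamp.
func newPodInfo(c Clock) *podInfo {
	if c == nil {
		return &podInfo{}
	}
	return &podInfo{timestamp: c.Now()}
}

func main() {
	epoch := time.Unix(0, 0).UTC()
	fmt.Println(newPodInfo(fixedClock{t: epoch}).timestamp) // 1970-01-01 00:00:00 +0000 UTC
	fmt.Println(newPodInfo(nil).timestamp.IsZero())         // true
}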
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
// is used to implement unschedulableQ.
type UnschedulablePodsMap struct {
// pods is a map keyed by a pod's full-name; the value is a pointer to the pod.
pods map[string]*v1.Pod
keyFunc func(*v1.Pod) string
// podInfoMap is a map keyed by a pod's full-name; the value is a pointer to the podInfo.
podInfoMap map[string]*podInfo
keyFunc func(*v1.Pod) string
}
// Add adds a pod to the unschedulable pods.
func (u *UnschedulablePodsMap) addOrUpdate(pod *v1.Pod) {
u.pods[u.keyFunc(pod)] = pod
// Add adds a pod to the unschedulable podInfoMap.
func (u *UnschedulablePodsMap) addOrUpdate(pInfo *podInfo) {
u.podInfoMap[u.keyFunc(pInfo.pod)] = pInfo
}
// Delete deletes a pod from the unschedulable pods.
// Delete deletes a pod from the unschedulable podInfoMap.
func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
delete(u.pods, u.keyFunc(pod))
delete(u.podInfoMap, u.keyFunc(pod))
}
// Get returns the pod if a pod with the same key as the key of the given "pod"
// Get returns the podInfo if a pod with the same key as the key of the given "pod"
// is found in the map. It returns nil otherwise.
func (u *UnschedulablePodsMap) get(pod *v1.Pod) *v1.Pod {
func (u *UnschedulablePodsMap) get(pod *v1.Pod) *podInfo {
podKey := u.keyFunc(pod)
if p, exists := u.pods[podKey]; exists {
return p
if pInfo, exists := u.podInfoMap[podKey]; exists {
return pInfo
}
return nil
}
// Clear removes all the entries from the unschedulable maps.
// Clear removes all the entries from the unschedulable podInfoMap.
func (u *UnschedulablePodsMap) clear() {
u.pods = make(map[string]*v1.Pod)
u.podInfoMap = make(map[string]*podInfo)
}
// newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap.
func newUnschedulablePodsMap() *UnschedulablePodsMap {
func newUnschedulablePodsMap(clock util.Clock) *UnschedulablePodsMap {
return &UnschedulablePodsMap{
pods: make(map[string]*v1.Pod),
keyFunc: util.GetPodFullName,
podInfoMap: make(map[string]*podInfo),
keyFunc: util.GetPodFullName,
}
}
@@ -872,3 +916,7 @@ func MakeNextPodFunc(queue SchedulingQueue) func() *v1.Pod {
return nil
}
}
func podInfoKeyFunc(obj interface{}) (string, error) {
return cache.MetaNamespaceKeyFunc(obj.(*podInfo).pod)
}
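podInfoKeyFunc means both heaps key entries through the embedded pod and ignore the timestamp entirely. That is why lookups and deletions throughout this file, and in the tests below, can probe with newPodInfoNoTimestamp. A standalone sketch of the key derivation, assuming the usual namespace/name convention of cache.MetaNamespaceKeyFunc:

package main

import "fmt"

type pod struct{ namespace, name string } // stand-in for *v1.Pod

type podInfo struct {
	pod *pod
	// timestamp omitted: it plays no part in the key
}

// podInfoKeyFunc delegates to the embedded pod and builds the
// conventional "namespace/name" key.
func podInfoKeyFunc(pi *podInfo) string {
	return pi.pod.namespace + "/" + pi.pod.name
}

func main() {
	pi := &podInfo{pod: &pod{namespace: "ns1", name: "test-pod-1"}}
	fmt.Println(podInfoKeyFunc(pi)) // ns1/test-pod-1
}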


@@ -98,13 +98,17 @@ var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1.
func addOrUpdateUnschedulablePod(p *PriorityQueue, pod *v1.Pod) {
p.lock.Lock()
defer p.lock.Unlock()
p.unschedulableQ.addOrUpdate(pod)
p.unschedulableQ.addOrUpdate(newPodInfoNoTimestamp(pod))
}
func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
p.lock.Lock()
defer p.lock.Unlock()
return p.unschedulableQ.get(pod)
pInfo := p.unschedulableQ.get(pod)
if pInfo != nil {
return pInfo.pod
}
return nil
}
func TestPriorityQueue_Add(t *testing.T) {
@@ -275,7 +279,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Async(t *testing.T) {
wg.Wait()
// all other pods should be in active queue again
for i := 1; i < totalNum; i++ {
if _, exists, _ := q.activeQ.Get(&expectedPods[i]); !exists {
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&expectedPods[i])); !exists {
t.Errorf("Expected %v to be added to activeQ.", expectedPods[i].Name)
}
}
@@ -301,7 +305,7 @@ func TestPriorityQueue_Pop(t *testing.T) {
func TestPriorityQueue_Update(t *testing.T) {
q := NewPriorityQueue(nil)
q.Update(nil, &highPriorityPod)
if _, exists, _ := q.activeQ.Get(&highPriorityPod); !exists {
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriorityPod)); !exists {
t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name)
}
if len(q.nominatedPods.nominatedPods) != 0 {
@@ -318,15 +322,15 @@ func TestPriorityQueue_Update(t *testing.T) {
// Updating an unschedulable pod which is not in any of the two queues, should
// add the pod to activeQ.
q.Update(&unschedulablePod, &unschedulablePod)
if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists {
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&unschedulablePod)); !exists {
t.Errorf("Expected %v to be added to activeQ.", unschedulablePod.Name)
}
// Updating a pod that is already in activeQ, should not change it.
q.Update(&unschedulablePod, &unschedulablePod)
if len(q.unschedulableQ.pods) != 0 {
if len(q.unschedulableQ.podInfoMap) != 0 {
t.Error("Expected unschedulableQ to be empty.")
}
if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists {
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&unschedulablePod)); !exists {
t.Errorf("Expected: %v to be added to activeQ.", unschedulablePod.Name)
}
if p, err := q.Pop(); err != nil || p != &highPriNominatedPod {
@@ -341,10 +345,10 @@ func TestPriorityQueue_Delete(t *testing.T) {
if err := q.Delete(&highPriNominatedPod); err != nil {
t.Errorf("delete failed: %v", err)
}
if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists {
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&unschedulablePod)); !exists {
t.Errorf("Expected %v to be in activeQ.", unschedulablePod.Name)
}
if _, exists, _ := q.activeQ.Get(&highPriNominatedPod); exists {
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriNominatedPod)); exists {
t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name)
}
if len(q.nominatedPods.nominatedPods) != 1 {
@@ -416,7 +420,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
if getUnschedulablePod(q, affinityPod) != nil {
t.Error("affinityPod is still in the unschedulableQ.")
}
if _, exists, _ := q.activeQ.Get(affinityPod); !exists {
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(affinityPod)); !exists {
t.Error("affinityPod is not moved to activeQ.")
}
// Check that the other pod is still in the unschedulableQ.
@@ -588,98 +592,98 @@ func TestUnschedulablePodsMap(t *testing.T) {
tests := []struct {
name string
podsToAdd []*v1.Pod
expectedMapAfterAdd map[string]*v1.Pod
expectedMapAfterAdd map[string]*podInfo
podsToUpdate []*v1.Pod
expectedMapAfterUpdate map[string]*v1.Pod
expectedMapAfterUpdate map[string]*podInfo
podsToDelete []*v1.Pod
expectedMapAfterDelete map[string]*v1.Pod
expectedMapAfterDelete map[string]*podInfo
}{
{
name: "create, update, delete subset of pods",
podsToAdd: []*v1.Pod{pods[0], pods[1], pods[2], pods[3]},
expectedMapAfterAdd: map[string]*v1.Pod{
util.GetPodFullName(pods[0]): pods[0],
util.GetPodFullName(pods[1]): pods[1],
util.GetPodFullName(pods[2]): pods[2],
util.GetPodFullName(pods[3]): pods[3],
expectedMapAfterAdd: map[string]*podInfo{
util.GetPodFullName(pods[0]): {pod: pods[0]},
util.GetPodFullName(pods[1]): {pod: pods[1]},
util.GetPodFullName(pods[2]): {pod: pods[2]},
util.GetPodFullName(pods[3]): {pod: pods[3]},
},
podsToUpdate: []*v1.Pod{updatedPods[0]},
expectedMapAfterUpdate: map[string]*v1.Pod{
util.GetPodFullName(pods[0]): updatedPods[0],
util.GetPodFullName(pods[1]): pods[1],
util.GetPodFullName(pods[2]): pods[2],
util.GetPodFullName(pods[3]): pods[3],
expectedMapAfterUpdate: map[string]*podInfo{
util.GetPodFullName(pods[0]): {pod: updatedPods[0]},
util.GetPodFullName(pods[1]): {pod: pods[1]},
util.GetPodFullName(pods[2]): {pod: pods[2]},
util.GetPodFullName(pods[3]): {pod: pods[3]},
},
podsToDelete: []*v1.Pod{pods[0], pods[1]},
expectedMapAfterDelete: map[string]*v1.Pod{
util.GetPodFullName(pods[2]): pods[2],
util.GetPodFullName(pods[3]): pods[3],
expectedMapAfterDelete: map[string]*podInfo{
util.GetPodFullName(pods[2]): {pod: pods[2]},
util.GetPodFullName(pods[3]): {pod: pods[3]},
},
},
{
name: "create, update, delete all",
podsToAdd: []*v1.Pod{pods[0], pods[3]},
expectedMapAfterAdd: map[string]*v1.Pod{
util.GetPodFullName(pods[0]): pods[0],
util.GetPodFullName(pods[3]): pods[3],
expectedMapAfterAdd: map[string]*podInfo{
util.GetPodFullName(pods[0]): {pod: pods[0]},
util.GetPodFullName(pods[3]): {pod: pods[3]},
},
podsToUpdate: []*v1.Pod{updatedPods[3]},
expectedMapAfterUpdate: map[string]*v1.Pod{
util.GetPodFullName(pods[0]): pods[0],
util.GetPodFullName(pods[3]): updatedPods[3],
expectedMapAfterUpdate: map[string]*podInfo{
util.GetPodFullName(pods[0]): {pod: pods[0]},
util.GetPodFullName(pods[3]): {pod: updatedPods[3]},
},
podsToDelete: []*v1.Pod{pods[0], pods[3]},
expectedMapAfterDelete: map[string]*v1.Pod{},
expectedMapAfterDelete: map[string]*podInfo{},
},
{
name: "delete non-existing and existing pods",
podsToAdd: []*v1.Pod{pods[1], pods[2]},
expectedMapAfterAdd: map[string]*v1.Pod{
util.GetPodFullName(pods[1]): pods[1],
util.GetPodFullName(pods[2]): pods[2],
expectedMapAfterAdd: map[string]*podInfo{
util.GetPodFullName(pods[1]): {pod: pods[1]},
util.GetPodFullName(pods[2]): {pod: pods[2]},
},
podsToUpdate: []*v1.Pod{updatedPods[1]},
expectedMapAfterUpdate: map[string]*v1.Pod{
util.GetPodFullName(pods[1]): updatedPods[1],
util.GetPodFullName(pods[2]): pods[2],
expectedMapAfterUpdate: map[string]*podInfo{
util.GetPodFullName(pods[1]): {pod: updatedPods[1]},
util.GetPodFullName(pods[2]): {pod: pods[2]},
},
podsToDelete: []*v1.Pod{pods[2], pods[3]},
expectedMapAfterDelete: map[string]*v1.Pod{
util.GetPodFullName(pods[1]): updatedPods[1],
expectedMapAfterDelete: map[string]*podInfo{
util.GetPodFullName(pods[1]): {pod: updatedPods[1]},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
upm := newUnschedulablePodsMap()
upm := newUnschedulablePodsMap(nil)
for _, p := range test.podsToAdd {
upm.addOrUpdate(p)
upm.addOrUpdate(newPodInfoNoTimestamp(p))
}
if !reflect.DeepEqual(upm.pods, test.expectedMapAfterAdd) {
if !reflect.DeepEqual(upm.podInfoMap, test.expectedMapAfterAdd) {
t.Errorf("Unexpected map after adding pods. Expected: %v, got: %v",
test.expectedMapAfterAdd, upm.pods)
test.expectedMapAfterAdd, upm.podInfoMap)
}
if len(test.podsToUpdate) > 0 {
for _, p := range test.podsToUpdate {
upm.addOrUpdate(p)
upm.addOrUpdate(newPodInfoNoTimestamp(p))
}
if !reflect.DeepEqual(upm.pods, test.expectedMapAfterUpdate) {
if !reflect.DeepEqual(upm.podInfoMap, test.expectedMapAfterUpdate) {
t.Errorf("Unexpected map after updating pods. Expected: %v, got: %v",
test.expectedMapAfterUpdate, upm.pods)
test.expectedMapAfterUpdate, upm.podInfoMap)
}
}
for _, p := range test.podsToDelete {
upm.delete(p)
}
if !reflect.DeepEqual(upm.pods, test.expectedMapAfterDelete) {
if !reflect.DeepEqual(upm.podInfoMap, test.expectedMapAfterDelete) {
t.Errorf("Unexpected map after deleting pods. Expected: %v, got: %v",
test.expectedMapAfterDelete, upm.pods)
test.expectedMapAfterDelete, upm.podInfoMap)
}
upm.clear()
if len(upm.pods) != 0 {
t.Errorf("Expected the map to be empty, but has %v elements.", len(upm.pods))
if len(upm.podInfoMap) != 0 {
t.Errorf("Expected the map to be empty, but has %v elements.", len(upm.podInfoMap))
}
})
}
@@ -799,14 +803,15 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
NominatedNodeName: "node1",
},
}
// Update pod condition to unschedulable.
podutil.UpdatePodCondition(&unschedulablePod.Status, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
Message: "fake scheduling failure",
LastProbeTime: metav1.Now(),
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
Message: "fake scheduling failure",
})
// Put in the unschedulable queue
q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle())
// Clear its backoff to simulate its backoff expiration
@@ -844,12 +849,12 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// And then unschedulablePod was determined as unschedulable AGAIN.
podutil.UpdatePodCondition(&unschedulablePod.Status, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
Message: "fake scheduling failure",
LastProbeTime: metav1.Now(),
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
Message: "fake scheduling failure",
})
// And then, put unschedulable pod to the unschedulable queue
q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle())
// Clear its backoff to simulate its backoff expiration
@@ -962,27 +967,27 @@ func TestHighProirotyFlushUnschedulableQLeftover(t *testing.T) {
},
}
addOrUpdateUnschedulablePod(q, &highPod)
addOrUpdateUnschedulablePod(q, &midPod)
// Update highPod's condition to unschedulable.
podutil.UpdatePodCondition(&highPod.Status, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
Message: "fake scheduling failure",
LastProbeTime: metav1.Time{Time: time.Now().Add(-1 * unschedulableQTimeInterval)},
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
Message: "fake scheduling failure",
})
// Update midPod's condition to unschedulable.
podutil.UpdatePodCondition(&midPod.Status, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
Message: "fake scheduling failure",
LastProbeTime: metav1.Time{Time: time.Now().Add(-1 * unschedulableQTimeInterval)},
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
Message: "fake scheduling failure",
})
q.unschedulableQ.addOrUpdate(newPodInfoNoTimestamp(&highPod))
q.unschedulableQ.addOrUpdate(newPodInfoNoTimestamp(&midPod))
q.unschedulableQ.podInfoMap[util.GetPodFullName(&highPod)].timestamp = time.Now().Add(-1 * unschedulableQTimeInterval)
q.unschedulableQ.podInfoMap[util.GetPodFullName(&midPod)].timestamp = time.Now().Add(-1 * unschedulableQTimeInterval)
if p, err := q.Pop(); err != nil || p != &highPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
}
@@ -990,3 +995,143 @@ func TestHighProirotyFlushUnschedulableQLeftover(t *testing.T) {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
}
}
// TestPodTimestamp tests the operations related to podInfo.
func TestPodTimestamp(t *testing.T) {
pod1 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod-1",
Namespace: "ns1",
UID: types.UID("tp-1"),
},
Status: v1.PodStatus{
NominatedNodeName: "node1",
},
}
pod2 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod-2",
Namespace: "ns2",
UID: types.UID("tp-2"),
},
Status: v1.PodStatus{
NominatedNodeName: "node2",
},
}
pInfo1 := &podInfo{
pod: pod1,
timestamp: util.RealClock{}.Now(),
}
pInfo2 := &podInfo{
pod: pod2,
timestamp: util.RealClock{}.Now().Add(1 * time.Second),
}
var queue *PriorityQueue
type operation = func()
addPodActiveQ := func(pInfo *podInfo) operation {
return func() {
queue.lock.Lock()
defer queue.lock.Unlock()
queue.activeQ.Add(pInfo)
}
}
updatePodActiveQ := func(pInfo *podInfo) operation {
return func() {
queue.lock.Lock()
defer queue.lock.Unlock()
queue.activeQ.Update(pInfo)
}
}
addPodUnschedulableQ := func(pInfo *podInfo) operation {
return func() {
queue.lock.Lock()
defer queue.lock.Unlock()
// Update pod condition to unschedulable.
podutil.UpdatePodCondition(&pInfo.pod.Status, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
Message: "fake scheduling failure",
})
queue.unschedulableQ.addOrUpdate(pInfo)
}
}
addPodBackoffQ := func(pInfo *podInfo) operation {
return func() {
queue.lock.Lock()
defer queue.lock.Unlock()
queue.podBackoffQ.Add(pInfo)
}
}
moveAllToActiveQ := func() operation {
return func() {
queue.MoveAllToActiveQueue()
}
}
flushBackoffQ := func() operation {
return func() {
queue.flushBackoffQCompleted()
}
}
tests := []struct {
name string
operations []operation
expected []*podInfo
}{
{
name: "add two pod to activeQ and sort them by the timestamp",
operations: []operation{
addPodActiveQ(pInfo2), addPodActiveQ(pInfo1),
},
expected: []*podInfo{pInfo1, pInfo2},
},
{
name: "update two pod to activeQ and sort them by the timestamp",
operations: []operation{
updatePodActiveQ(pInfo2), updatePodActiveQ(pInfo1),
},
expected: []*podInfo{pInfo1, pInfo2},
},
{
name: "add two pod to unschedulableQ then move them to activeQ and sort them by the timestamp",
operations: []operation{
addPodUnschedulableQ(pInfo2), addPodUnschedulableQ(pInfo1), moveAllToActiveQ(),
},
expected: []*podInfo{pInfo1, pInfo2},
},
{
name: "add one pod to BackoffQ and move it to activeQ",
operations: []operation{
addPodActiveQ(pInfo2), addPodBackoffQ(pInfo1), flushBackoffQ(), moveAllToActiveQ(),
},
expected: []*podInfo{pInfo1, pInfo2},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
queue = NewPriorityQueue(nil)
var podInfoList []*podInfo
for _, op := range test.operations {
op()
}
for i := 0; i < len(test.expected); i++ {
if pInfo, err := queue.activeQ.Pop(); err != nil {
t.Errorf("Error while popping the head of the queue: %v", err)
} else {
podInfoList = append(podInfoList, pInfo.(*podInfo))
}
}
if !reflect.DeepEqual(test.expected, podInfoList) {
t.Errorf("Unexpected podInfo list. Expected: %v, got: %v",
test.expected, podInfoList)
}
})
}
}


@@ -265,11 +265,10 @@ func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason s
sched.config.Error(pod, err)
sched.config.Recorder.Event(pod, v1.EventTypeWarning, "FailedScheduling", message)
sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
LastProbeTime: metav1.Now(),
Reason: reason,
Message: err.Error(),
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: reason,
Message: err.Error(),
})
}
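This final hunk is what gives the PR its title. With LastProbeTime stamped on every failure, the PodScheduled condition differed on every attempt and forced a status write per scheduling attempt; without it, a repeated identical failure can be recognized and skipped. A hedged sketch of that effect, assuming a change check in the spirit of podutil.UpdatePodCondition (types and field names here are illustrative, not the real API):

package main

import (
	"fmt"
	"time"
)

type condition struct {
	condType, status, reason, message string
	lastProbeTime                     time.Time
}

// needsUpdate ignores timestamps, so a repeated identical failure
// becomes a no-op instead of a Pod object update per attempt.
func needsUpdate(prev, next condition) bool {
	return prev.condType != next.condType ||
		prev.status != next.status ||
		prev.reason != next.reason ||
		prev.message != next.message
}

func main() {
	first := condition{
		condType: "PodScheduled", status: "False",
		reason: "Unschedulable", message: "fake scheduling failure",
		lastProbeTime: time.Now(),
	}
	repeat := first
	repeat.lastProbeTime = time.Now() // only the probe time moved
	fmt.Println(needsUpdate(first, repeat)) // false: no Pod update needed
}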