Merge pull request #59671 from bsalamat/sched_queue_perf

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Improve performance of scheduling queue by adding a hash map to track all pods with a nominatedNodeName

**What this PR does / why we need it**:
Our investigations show a performance regression in the new scheduling queue. This queue is not enabled by default; it is used only when "priority and preemption", an alpha feature, is enabled. This PR is an important performance improvement for those who want to use priority and preemption in larger clusters.
The PR adds a hash table to track nominated Pods so that finding such Pods will be faster.
Other than improving performance, we don't expect this PR to change the behavior of the scheduler.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #

ref/ #56032
ref/ #57471 

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```

/sig scheduling
pull/6/head
Kubernetes Submit Queue 2018-02-13 00:07:58 -08:00 committed by GitHub
commit ba791275ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 216 additions and 146 deletions

View File

@ -51,7 +51,7 @@ type SchedulingQueue interface {
AddIfNotPresent(pod *v1.Pod) error AddIfNotPresent(pod *v1.Pod) error
AddUnschedulableIfNotPresent(pod *v1.Pod) error AddUnschedulableIfNotPresent(pod *v1.Pod) error
Pop() (*v1.Pod, error) Pop() (*v1.Pod, error)
Update(pod *v1.Pod) error Update(oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error Delete(pod *v1.Pod) error
MoveAllToActiveQueue() MoveAllToActiveQueue()
AssignedPodAdded(pod *v1.Pod) AssignedPodAdded(pod *v1.Pod)
@ -93,8 +93,8 @@ func (f *FIFO) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
} }
// Update updates a pod in the FIFO. // Update updates a pod in the FIFO.
func (f *FIFO) Update(pod *v1.Pod) error { func (f *FIFO) Update(oldPod, newPod *v1.Pod) error {
return f.FIFO.Update(pod) return f.FIFO.Update(newPod)
} }
// Delete deletes a pod in the FIFO. // Delete deletes a pod in the FIFO.
@ -139,6 +139,11 @@ func NewFIFO() *FIFO {
return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)} return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
} }
// NominatedNodeName returns the nominated node name of a Pod, as stored in
// pod.Status.NominatedNodeName. Callers in this file treat an empty result as
// "the pod has no nominated node". pod must be non-nil; the field access
// dereferences it unconditionally.
func NominatedNodeName(pod *v1.Pod) string {
return pod.Status.NominatedNodeName
}
// UnschedulablePods is an interface for a queue that is used to keep unschedulable // UnschedulablePods is an interface for a queue that is used to keep unschedulable
// pods. These pods are not actively reevaluated for scheduling. They are moved // pods. These pods are not actively reevaluated for scheduling. They are moved
// to the active scheduling queue on certain events, such as termination of a pod // to the active scheduling queue on certain events, such as termination of a pod
@ -147,7 +152,6 @@ type UnschedulablePods interface {
Add(pod *v1.Pod) Add(pod *v1.Pod)
Delete(pod *v1.Pod) Delete(pod *v1.Pod)
Update(pod *v1.Pod) Update(pod *v1.Pod)
GetPodsWaitingForNode(nodeName string) []*v1.Pod
Get(pod *v1.Pod) *v1.Pod Get(pod *v1.Pod) *v1.Pod
Clear() Clear()
} }
@ -167,6 +171,10 @@ type PriorityQueue struct {
activeQ *Heap activeQ *Heap
// unschedulableQ holds pods that have been tried and determined unschedulable. // unschedulableQ holds pods that have been tried and determined unschedulable.
unschedulableQ *UnschedulablePodsMap unschedulableQ *UnschedulablePodsMap
// nominatedPods is a map keyed by a node name and the value is a list of
// pods which are nominated to run on the node. These are pods which can be in
// the activeQ or unschedulableQ.
nominatedPods map[string][]*v1.Pod
// receivedMoveRequest is set to true whenever we receive a request to move a // receivedMoveRequest is set to true whenever we receive a request to move a
// pod from the unschedulableQ to the activeQ, and is set to false, when we pop // pod from the unschedulableQ to the activeQ, and is set to false, when we pop
// a pod from the activeQ. It indicates if we received a move request when a // a pod from the activeQ. It indicates if we received a move request when a
@ -183,11 +191,51 @@ func NewPriorityQueue() *PriorityQueue {
pq := &PriorityQueue{ pq := &PriorityQueue{
activeQ: newHeap(cache.MetaNamespaceKeyFunc, util.HigherPriorityPod), activeQ: newHeap(cache.MetaNamespaceKeyFunc, util.HigherPriorityPod),
unschedulableQ: newUnschedulablePodsMap(), unschedulableQ: newUnschedulablePodsMap(),
nominatedPods: map[string][]*v1.Pod{},
} }
pq.cond.L = &pq.lock pq.cond.L = &pq.lock
return pq return pq
} }
// addNominatedPodIfNeeded records pod in nominatedPods under its nominated
// node name. Pods without a nominated node name are ignored. If a pod with the
// same UID is already recorded for that node, an error is logged and the
// stored entry is left as is (the existing pointer is not replaced).
func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
	nodeName := NominatedNodeName(pod)
	if len(nodeName) == 0 {
		// Nothing to track for pods that are not nominated to any node.
		return
	}
	existing := p.nominatedPods[nodeName]
	for i := range existing {
		if existing[i].UID == pod.UID {
			glog.Errorf("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name)
			return
		}
	}
	p.nominatedPods[nodeName] = append(existing, pod)
}
// deleteNominatedPodIfExists removes the pod that has the same UID as pod from
// the nominatedPods entry of pod's nominated node. It is a no-op when pod has
// no nominated node name or is not recorded for that node. When the last pod
// of a node is removed, the node's key is deleted from the map.
//
// The surviving pods are copied into a fresh slice instead of being shifted in
// place: WaitingPodsForNode returns the stored slice to its callers, and an
// in-place removal would rewrite the backing array they may still be reading
// after the queue lock has been released.
func (p *PriorityQueue) deleteNominatedPodIfExists(pod *v1.Pod) {
	nnn := NominatedNodeName(pod)
	if len(nnn) == 0 {
		return
	}
	pods := p.nominatedPods[nnn]
	for i, np := range pods {
		if np.UID != pod.UID {
			continue
		}
		if len(pods) == 1 {
			// Removing the only pod leaves nothing nominated for this node.
			delete(p.nominatedPods, nnn)
			return
		}
		remaining := make([]*v1.Pod, 0, len(pods)-1)
		remaining = append(remaining, pods[:i]...)
		remaining = append(remaining, pods[i+1:]...)
		p.nominatedPods[nnn] = remaining
		return
	}
}
// updateNominatedPod replaces the nominatedPods entry of oldPod with newPod.
//
// Even if the nominated node name of the Pod is not changed, the pod is
// deleted and added again so that the stored pointer refers to newPod.
//
// oldPod may be nil (Update can be invoked without a previous version of the
// pod). In that case the removal is keyed on newPod instead, which still
// refreshes the stored pointer when the nominated node name is unchanged and
// avoids dereferencing a nil pod.
func (p *PriorityQueue) updateNominatedPod(oldPod, newPod *v1.Pod) {
	stale := oldPod
	if stale == nil {
		stale = newPod
	}
	p.deleteNominatedPodIfExists(stale)
	p.addNominatedPodIfNeeded(newPod)
}
// Add adds a pod to the active queue. It should be called only when a new pod // Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in either queue. // is added so there is no chance the pod is already in either queue.
func (p *PriorityQueue) Add(pod *v1.Pod) error { func (p *PriorityQueue) Add(pod *v1.Pod) error {
@ -199,8 +247,10 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
} else { } else {
if p.unschedulableQ.Get(pod) != nil { if p.unschedulableQ.Get(pod) != nil {
glog.Errorf("Error: pod %v is already in the unschedulable queue.", pod.Name) glog.Errorf("Error: pod %v is already in the unschedulable queue.", pod.Name)
p.deleteNominatedPodIfExists(pod)
p.unschedulableQ.Delete(pod) p.unschedulableQ.Delete(pod)
} }
p.addNominatedPodIfNeeded(pod)
p.cond.Broadcast() p.cond.Broadcast()
} }
return err return err
@ -221,6 +271,7 @@ func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
if err != nil { if err != nil {
glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err) glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
} else { } else {
p.addNominatedPodIfNeeded(pod)
p.cond.Broadcast() p.cond.Broadcast()
} }
return err return err
@ -245,10 +296,12 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
} }
if !p.receivedMoveRequest && isPodUnschedulable(pod) { if !p.receivedMoveRequest && isPodUnschedulable(pod) {
p.unschedulableQ.Add(pod) p.unschedulableQ.Add(pod)
p.addNominatedPodIfNeeded(pod)
return nil return nil
} }
err := p.activeQ.Add(pod) err := p.activeQ.Add(pod)
if err == nil { if err == nil {
p.addNominatedPodIfNeeded(pod)
p.cond.Broadcast() p.cond.Broadcast()
} }
return err return err
@ -267,8 +320,10 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
pod := obj.(*v1.Pod)
p.deleteNominatedPodIfExists(pod)
p.receivedMoveRequest = false p.receivedMoveRequest = false
return obj.(*v1.Pod), err return pod, err
} }
// isPodUpdated checks if the pod is updated in a way that it may have become // isPodUpdated checks if the pod is updated in a way that it may have become
@ -287,30 +342,33 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool {
// Update updates a pod in the active queue if present. Otherwise, it removes // Update updates a pod in the active queue if present. Otherwise, it removes
// the item from the unschedulable queue and adds the updated one to the active // the item from the unschedulable queue and adds the updated one to the active
// queue. // queue.
func (p *PriorityQueue) Update(pod *v1.Pod) error { func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
// If the pod is already in the active queue, just update it there. // If the pod is already in the active queue, just update it there.
if _, exists, _ := p.activeQ.Get(pod); exists { if _, exists, _ := p.activeQ.Get(newPod); exists {
err := p.activeQ.Update(pod) p.updateNominatedPod(oldPod, newPod)
err := p.activeQ.Update(newPod)
return err return err
} }
// If the pod is in the unschedulable queue, updating it may make it schedulable. // If the pod is in the unschedulable queue, updating it may make it schedulable.
if oldPod := p.unschedulableQ.Get(pod); oldPod != nil { if usPod := p.unschedulableQ.Get(newPod); usPod != nil {
if isPodUpdated(oldPod, pod) { p.updateNominatedPod(oldPod, newPod)
p.unschedulableQ.Delete(oldPod) if isPodUpdated(oldPod, newPod) {
err := p.activeQ.Add(pod) p.unschedulableQ.Delete(usPod)
err := p.activeQ.Add(newPod)
if err == nil { if err == nil {
p.cond.Broadcast() p.cond.Broadcast()
} }
return err return err
} }
p.unschedulableQ.Update(pod) p.unschedulableQ.Update(newPod)
return nil return nil
} }
// If pod is not in any of the two queue, we put it in the active queue. // If pod is not in any of the two queue, we put it in the active queue.
err := p.activeQ.Add(pod) err := p.activeQ.Add(newPod)
if err == nil { if err == nil {
p.addNominatedPodIfNeeded(newPod)
p.cond.Broadcast() p.cond.Broadcast()
} }
return err return err
@ -321,6 +379,7 @@ func (p *PriorityQueue) Update(pod *v1.Pod) error {
func (p *PriorityQueue) Delete(pod *v1.Pod) error { func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
p.deleteNominatedPodIfExists(pod)
if _, exists, _ := p.activeQ.Get(pod); exists { if _, exists, _ := p.activeQ.Get(pod); exists {
return p.activeQ.Delete(pod) return p.activeQ.Delete(pod)
} }
@ -403,68 +462,34 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod
func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod { func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
pods := p.unschedulableQ.GetPodsWaitingForNode(nodeName) if list, ok := p.nominatedPods[nodeName]; ok {
for _, obj := range p.activeQ.List() { return list
pod := obj.(*v1.Pod)
if pod.Status.NominatedNodeName == nodeName {
pods = append(pods, pod)
}
} }
return pods return nil
} }
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure // UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
// is used to implement unschedulableQ. // is used to implement unschedulableQ.
type UnschedulablePodsMap struct { type UnschedulablePodsMap struct {
// pods is a map key by a pod's full-name and the value is a pointer to the pod. // pods is a map key by a pod's full-name and the value is a pointer to the pod.
pods map[string]*v1.Pod pods map[string]*v1.Pod
// nominatedPods is a map keyed by a node name and the value is a list of keyFunc func(*v1.Pod) string
// pods' full-names which are nominated to run on the node.
nominatedPods map[string][]string
keyFunc func(*v1.Pod) string
} }
var _ = UnschedulablePods(&UnschedulablePodsMap{}) var _ = UnschedulablePods(&UnschedulablePodsMap{})
// NominatedNodeName returns the nominated node name of a pod.
func NominatedNodeName(pod *v1.Pod) string {
return pod.Status.NominatedNodeName
}
// Add adds a pod to the unschedulable pods. // Add adds a pod to the unschedulable pods.
func (u *UnschedulablePodsMap) Add(pod *v1.Pod) { func (u *UnschedulablePodsMap) Add(pod *v1.Pod) {
podKey := u.keyFunc(pod) podKey := u.keyFunc(pod)
if _, exists := u.pods[podKey]; !exists { if _, exists := u.pods[podKey]; !exists {
u.pods[podKey] = pod u.pods[podKey] = pod
nominatedNodeName := NominatedNodeName(pod)
if len(nominatedNodeName) > 0 {
u.nominatedPods[nominatedNodeName] = append(u.nominatedPods[nominatedNodeName], podKey)
}
}
}
func (u *UnschedulablePodsMap) deleteFromNominated(pod *v1.Pod) {
nominatedNodeName := NominatedNodeName(pod)
if len(nominatedNodeName) > 0 {
podKey := u.keyFunc(pod)
nps := u.nominatedPods[nominatedNodeName]
for i, np := range nps {
if np == podKey {
u.nominatedPods[nominatedNodeName] = append(nps[:i], nps[i+1:]...)
if len(u.nominatedPods[nominatedNodeName]) == 0 {
delete(u.nominatedPods, nominatedNodeName)
}
break
}
}
} }
} }
// Delete deletes a pod from the unschedulable pods. // Delete deletes a pod from the unschedulable pods.
func (u *UnschedulablePodsMap) Delete(pod *v1.Pod) { func (u *UnschedulablePodsMap) Delete(pod *v1.Pod) {
podKey := u.keyFunc(pod) podKey := u.keyFunc(pod)
if p, exists := u.pods[podKey]; exists { if _, exists := u.pods[podKey]; exists {
u.deleteFromNominated(p)
delete(u.pods, podKey) delete(u.pods, podKey)
} }
} }
@ -472,20 +497,12 @@ func (u *UnschedulablePodsMap) Delete(pod *v1.Pod) {
// Update updates a pod in the unschedulable pods. // Update updates a pod in the unschedulable pods.
func (u *UnschedulablePodsMap) Update(pod *v1.Pod) { func (u *UnschedulablePodsMap) Update(pod *v1.Pod) {
podKey := u.keyFunc(pod) podKey := u.keyFunc(pod)
oldPod, exists := u.pods[podKey] _, exists := u.pods[podKey]
if !exists { if !exists {
u.Add(pod) u.Add(pod)
return return
} }
u.pods[podKey] = pod u.pods[podKey] = pod
oldNominateNodeName := NominatedNodeName(oldPod)
nominatedNodeName := NominatedNodeName(pod)
if oldNominateNodeName != nominatedNodeName {
u.deleteFromNominated(oldPod)
if len(nominatedNodeName) > 0 {
u.nominatedPods[nominatedNodeName] = append(u.nominatedPods[nominatedNodeName], podKey)
}
}
} }
// Get returns the pod if a pod with the same key as the key of the given "pod" // Get returns the pod if a pod with the same key as the key of the given "pod"
@ -498,28 +515,16 @@ func (u *UnschedulablePodsMap) Get(pod *v1.Pod) *v1.Pod {
return nil return nil
} }
// GetPodsWaitingForNode returns a list of unschedulable pods whose NominatedNodeNames
// are equal to the given nodeName.
func (u *UnschedulablePodsMap) GetPodsWaitingForNode(nodeName string) []*v1.Pod {
var pods []*v1.Pod
for _, key := range u.nominatedPods[nodeName] {
pods = append(pods, u.pods[key])
}
return pods
}
// Clear removes all the entries from the unschedulable maps. // Clear removes all the entries from the unschedulable maps.
func (u *UnschedulablePodsMap) Clear() { func (u *UnschedulablePodsMap) Clear() {
u.pods = make(map[string]*v1.Pod) u.pods = make(map[string]*v1.Pod)
u.nominatedPods = make(map[string][]string)
} }
// newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap. // newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap.
func newUnschedulablePodsMap() *UnschedulablePodsMap { func newUnschedulablePodsMap() *UnschedulablePodsMap {
return &UnschedulablePodsMap{ return &UnschedulablePodsMap{
pods: make(map[string]*v1.Pod), pods: make(map[string]*v1.Pod),
nominatedPods: make(map[string][]string), keyFunc: util.GetPodFullName,
keyFunc: util.GetPodFullName,
} }
} }

View File

@ -27,19 +27,34 @@ import (
) )
var mediumPriority = (lowPriority + highPriority) / 2 var mediumPriority = (lowPriority + highPriority) / 2
var highPriorityPod, medPriorityPod, unschedulablePod = v1.Pod{ var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "hpp", Name: "hpp",
Namespace: "ns1", Namespace: "ns1",
UID: "hppns1",
}, },
Spec: v1.PodSpec{ Spec: v1.PodSpec{
Priority: &highPriority, Priority: &highPriority,
}, },
}, },
v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "hpp",
Namespace: "ns1",
UID: "hppns1",
},
Spec: v1.PodSpec{
Priority: &highPriority,
},
Status: v1.PodStatus{
NominatedNodeName: "node1",
},
},
v1.Pod{ v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "mpp", Name: "mpp",
Namespace: "ns2", Namespace: "ns2",
UID: "mppns2",
Annotations: map[string]string{ Annotations: map[string]string{
"annot2": "val2", "annot2": "val2",
}, },
@ -55,6 +70,7 @@ var highPriorityPod, medPriorityPod, unschedulablePod = v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "up", Name: "up",
Namespace: "ns1", Namespace: "ns1",
UID: "upns1",
Annotations: map[string]string{ Annotations: map[string]string{
"annot2": "val2", "annot2": "val2",
}, },
@ -79,6 +95,12 @@ func TestPriorityQueue_Add(t *testing.T) {
q.Add(&medPriorityPod) q.Add(&medPriorityPod)
q.Add(&unschedulablePod) q.Add(&unschedulablePod)
q.Add(&highPriorityPod) q.Add(&highPriorityPod)
expectedNominatedPods := map[string][]*v1.Pod{
"node1": {&medPriorityPod, &unschedulablePod},
}
if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods)
}
if p, err := q.Pop(); err != nil || p != &highPriorityPod { if p, err := q.Pop(); err != nil || p != &highPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name) t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
} }
@ -88,6 +110,61 @@ func TestPriorityQueue_Add(t *testing.T) {
if p, err := q.Pop(); err != nil || p != &unschedulablePod { if p, err := q.Pop(); err != nil || p != &unschedulablePod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name) t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name)
} }
if len(q.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
}
}
// TestPriorityQueue_AddIfNotPresent verifies that AddIfNotPresent skips a pod
// that is already held in the unschedulable queue, and that the pods it does
// add are tracked in q.nominatedPods until they are popped.
func TestPriorityQueue_AddIfNotPresent(t *testing.T) {
q := NewPriorityQueue()
// Seed the unschedulable queue directly; this does not go through the
// PriorityQueue methods, so nothing is recorded in q.nominatedPods for it.
q.unschedulableQ.Add(&highPriNominatedPod)
q.AddIfNotPresent(&highPriNominatedPod) // Must not add anything.
q.AddIfNotPresent(&medPriorityPod)
q.AddIfNotPresent(&unschedulablePod)
// Only the two pods added through AddIfNotPresent are expected under "node1".
expectedNominatedPods := map[string][]*v1.Pod{
"node1": {&medPriorityPod, &unschedulablePod},
}
if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods)
}
if p, err := q.Pop(); err != nil || p != &medPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
}
if p, err := q.Pop(); err != nil || p != &unschedulablePod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name)
}
// Pop removes each popped pod from the nominated map, so it must be empty now.
if len(q.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
}
// The pod seeded at the start must still be in the unschedulable queue.
if q.unschedulableQ.Get(&highPriNominatedPod) != &highPriNominatedPod {
t.Errorf("Pod %v was not found in the unschedulableQ.", highPriNominatedPod.Name)
}
}
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
q := NewPriorityQueue()
q.Add(&highPriNominatedPod)
q.AddUnschedulableIfNotPresent(&highPriNominatedPod) // Must not add anything.
q.AddUnschedulableIfNotPresent(&medPriorityPod) // This should go to activeQ.
q.AddUnschedulableIfNotPresent(&unschedulablePod)
expectedNominatedPods := map[string][]*v1.Pod{
"node1": {&highPriNominatedPod, &medPriorityPod, &unschedulablePod},
}
if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods)
}
if p, err := q.Pop(); err != nil || p != &highPriNominatedPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPod.Name, p.Name)
}
if p, err := q.Pop(); err != nil || p != &medPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
}
if len(q.nominatedPods) != 1 {
t.Errorf("Expected nomindatePods to have one element: %v", q.nominatedPods)
}
if q.unschedulableQ.Get(&unschedulablePod) != &unschedulablePod {
t.Errorf("Pod %v was not found in the unschedulableQ.", unschedulablePod.Name)
}
} }
func TestPriorityQueue_Pop(t *testing.T) { func TestPriorityQueue_Pop(t *testing.T) {
@ -96,55 +173,71 @@ func TestPriorityQueue_Pop(t *testing.T) {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
if p, err := q.Pop(); err != nil || p != &highPriorityPod { if p, err := q.Pop(); err != nil || p != &medPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name) t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
}
if len(q.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
} }
}() }()
q.Add(&highPriorityPod) q.Add(&medPriorityPod)
wg.Wait() wg.Wait()
} }
func TestPriorityQueue_Update(t *testing.T) { func TestPriorityQueue_Update(t *testing.T) {
q := NewPriorityQueue() q := NewPriorityQueue()
q.Update(&highPriorityPod) q.Update(nil, &highPriorityPod)
if _, exists, _ := q.activeQ.Get(&highPriorityPod); !exists { if _, exists, _ := q.activeQ.Get(&highPriorityPod); !exists {
t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name) t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name)
} }
q.Update(&highPriorityPod) if len(q.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
}
// Update highPriorityPod and add a nominatedNodeName to it.
q.Update(&highPriorityPod, &highPriNominatedPod)
if q.activeQ.data.Len() != 1 { if q.activeQ.data.Len() != 1 {
t.Error("Expected only one item in activeQ.") t.Error("Expected only one item in activeQ.")
} }
if len(q.nominatedPods) != 1 {
t.Errorf("Expected one item in nomindatePods map: %v", q.nominatedPods)
}
// Updating an unschedulable pod which is not in any of the two queues, should // Updating an unschedulable pod which is not in any of the two queues, should
// add the pod to activeQ. // add the pod to activeQ.
q.Update(&unschedulablePod) q.Update(&unschedulablePod, &unschedulablePod)
if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists { if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists {
t.Errorf("Expected %v to be added to activeQ.", unschedulablePod.Name) t.Errorf("Expected %v to be added to activeQ.", unschedulablePod.Name)
} }
// Updating a pod that is already in unschedulableQ, should move the pod to // Updating a pod that is already in activeQ, should not change it.
// activeQ. q.Update(&unschedulablePod, &unschedulablePod)
q.Update(&unschedulablePod)
if len(q.unschedulableQ.pods) != 0 { if len(q.unschedulableQ.pods) != 0 {
t.Error("Expected unschedulableQ to be empty.") t.Error("Expected unschedulableQ to be empty.")
} }
if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists { if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists {
t.Errorf("Expected: %v to be added to activeQ.", unschedulablePod.Name) t.Errorf("Expected: %v to be added to activeQ.", unschedulablePod.Name)
} }
if p, err := q.Pop(); err != nil || p != &highPriorityPod { if p, err := q.Pop(); err != nil || p != &highPriNominatedPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name) t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
} }
} }
func TestPriorityQueue_Delete(t *testing.T) { func TestPriorityQueue_Delete(t *testing.T) {
q := NewPriorityQueue() q := NewPriorityQueue()
q.Update(&highPriorityPod) q.Update(&highPriorityPod, &highPriNominatedPod)
q.Add(&unschedulablePod) q.Add(&unschedulablePod)
q.Delete(&highPriorityPod) q.Delete(&highPriNominatedPod)
if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists { if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists {
t.Errorf("Expected %v to be in activeQ.", unschedulablePod.Name) t.Errorf("Expected %v to be in activeQ.", unschedulablePod.Name)
} }
if _, exists, _ := q.activeQ.Get(&highPriorityPod); exists { if _, exists, _ := q.activeQ.Get(&highPriNominatedPod); exists {
t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name) t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name)
} }
if len(q.nominatedPods) != 1 {
t.Errorf("Expected nomindatePods to have only 'unschedulablePod': %v", q.nominatedPods)
}
q.Delete(&unschedulablePod)
if len(q.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
}
} }
func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) { func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) {
@ -214,6 +307,23 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
} }
} }
// TestPriorityQueue_WaitingPodsForNode verifies that WaitingPodsForNode returns
// the pods recorded for a node name and nil for a node with no recorded pods.
func TestPriorityQueue_WaitingPodsForNode(t *testing.T) {
q := NewPriorityQueue()
q.Add(&medPriorityPod)
q.Add(&unschedulablePod)
q.Add(&highPriorityPod)
// The highest-priority pod is expected to come out first.
if p, err := q.Pop(); err != nil || p != &highPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
}
// The two pods still queued are expected for "node1", in insertion order.
expectedList := []*v1.Pod{&medPriorityPod, &unschedulablePod}
if !reflect.DeepEqual(expectedList, q.WaitingPodsForNode("node1")) {
t.Error("Unexpected list of nominated Pods for node.")
}
// A node name with no map entry must yield a nil slice.
if q.WaitingPodsForNode("node2") != nil {
t.Error("Expected list of nominated Pods for node2 to be empty.")
}
}
func TestUnschedulablePodsMap(t *testing.T) { func TestUnschedulablePodsMap(t *testing.T) {
var pods = []*v1.Pod{ var pods = []*v1.Pod{
{ {
@ -261,22 +371,16 @@ func TestUnschedulablePodsMap(t *testing.T) {
} }
var updatedPods = make([]*v1.Pod, len(pods)) var updatedPods = make([]*v1.Pod, len(pods))
updatedPods[0] = pods[0].DeepCopy() updatedPods[0] = pods[0].DeepCopy()
updatedPods[0].Status.NominatedNodeName = "node3"
updatedPods[1] = pods[1].DeepCopy() updatedPods[1] = pods[1].DeepCopy()
updatedPods[1].Status.NominatedNodeName = "node3"
updatedPods[3] = pods[3].DeepCopy() updatedPods[3] = pods[3].DeepCopy()
updatedPods[3].Status.NominatedNodeName = ""
tests := []struct { tests := []struct {
podsToAdd []*v1.Pod podsToAdd []*v1.Pod
expectedMapAfterAdd map[string]*v1.Pod expectedMapAfterAdd map[string]*v1.Pod
expectedNominatedAfterAdd map[string][]string podsToUpdate []*v1.Pod
podsToUpdate []*v1.Pod expectedMapAfterUpdate map[string]*v1.Pod
expectedMapAfterUpdate map[string]*v1.Pod podsToDelete []*v1.Pod
expectedNominatedAfterUpdate map[string][]string expectedMapAfterDelete map[string]*v1.Pod
podsToDelete []*v1.Pod
expectedMapAfterDelete map[string]*v1.Pod
expectedNominatedAfterDelete map[string][]string
}{ }{
{ {
podsToAdd: []*v1.Pod{pods[0], pods[1], pods[2], pods[3]}, podsToAdd: []*v1.Pod{pods[0], pods[1], pods[2], pods[3]},
@ -286,10 +390,6 @@ func TestUnschedulablePodsMap(t *testing.T) {
util.GetPodFullName(pods[2]): pods[2], util.GetPodFullName(pods[2]): pods[2],
util.GetPodFullName(pods[3]): pods[3], util.GetPodFullName(pods[3]): pods[3],
}, },
expectedNominatedAfterAdd: map[string][]string{
"node1": {util.GetPodFullName(pods[0]), util.GetPodFullName(pods[3])},
"node3": {util.GetPodFullName(pods[2])},
},
podsToUpdate: []*v1.Pod{updatedPods[0]}, podsToUpdate: []*v1.Pod{updatedPods[0]},
expectedMapAfterUpdate: map[string]*v1.Pod{ expectedMapAfterUpdate: map[string]*v1.Pod{
util.GetPodFullName(pods[0]): updatedPods[0], util.GetPodFullName(pods[0]): updatedPods[0],
@ -297,19 +397,11 @@ func TestUnschedulablePodsMap(t *testing.T) {
util.GetPodFullName(pods[2]): pods[2], util.GetPodFullName(pods[2]): pods[2],
util.GetPodFullName(pods[3]): pods[3], util.GetPodFullName(pods[3]): pods[3],
}, },
expectedNominatedAfterUpdate: map[string][]string{
"node1": {util.GetPodFullName(pods[3])},
"node3": {util.GetPodFullName(pods[2]), util.GetPodFullName(pods[0])},
},
podsToDelete: []*v1.Pod{pods[0], pods[1]}, podsToDelete: []*v1.Pod{pods[0], pods[1]},
expectedMapAfterDelete: map[string]*v1.Pod{ expectedMapAfterDelete: map[string]*v1.Pod{
util.GetPodFullName(pods[2]): pods[2], util.GetPodFullName(pods[2]): pods[2],
util.GetPodFullName(pods[3]): pods[3], util.GetPodFullName(pods[3]): pods[3],
}, },
expectedNominatedAfterDelete: map[string][]string{
"node1": {util.GetPodFullName(pods[3])},
"node3": {util.GetPodFullName(pods[2])},
},
}, },
{ {
podsToAdd: []*v1.Pod{pods[0], pods[3]}, podsToAdd: []*v1.Pod{pods[0], pods[3]},
@ -317,20 +409,13 @@ func TestUnschedulablePodsMap(t *testing.T) {
util.GetPodFullName(pods[0]): pods[0], util.GetPodFullName(pods[0]): pods[0],
util.GetPodFullName(pods[3]): pods[3], util.GetPodFullName(pods[3]): pods[3],
}, },
expectedNominatedAfterAdd: map[string][]string{
"node1": {util.GetPodFullName(pods[0]), util.GetPodFullName(pods[3])},
},
podsToUpdate: []*v1.Pod{updatedPods[3]}, podsToUpdate: []*v1.Pod{updatedPods[3]},
expectedMapAfterUpdate: map[string]*v1.Pod{ expectedMapAfterUpdate: map[string]*v1.Pod{
util.GetPodFullName(pods[0]): pods[0], util.GetPodFullName(pods[0]): pods[0],
util.GetPodFullName(pods[3]): updatedPods[3], util.GetPodFullName(pods[3]): updatedPods[3],
}, },
expectedNominatedAfterUpdate: map[string][]string{ podsToDelete: []*v1.Pod{pods[0], pods[3]},
"node1": {util.GetPodFullName(pods[0])}, expectedMapAfterDelete: map[string]*v1.Pod{},
},
podsToDelete: []*v1.Pod{pods[0], pods[3]},
expectedMapAfterDelete: map[string]*v1.Pod{},
expectedNominatedAfterDelete: map[string][]string{},
}, },
{ {
podsToAdd: []*v1.Pod{pods[1], pods[2]}, podsToAdd: []*v1.Pod{pods[1], pods[2]},
@ -338,24 +423,15 @@ func TestUnschedulablePodsMap(t *testing.T) {
util.GetPodFullName(pods[1]): pods[1], util.GetPodFullName(pods[1]): pods[1],
util.GetPodFullName(pods[2]): pods[2], util.GetPodFullName(pods[2]): pods[2],
}, },
expectedNominatedAfterAdd: map[string][]string{
"node3": {util.GetPodFullName(pods[2])},
},
podsToUpdate: []*v1.Pod{updatedPods[1]}, podsToUpdate: []*v1.Pod{updatedPods[1]},
expectedMapAfterUpdate: map[string]*v1.Pod{ expectedMapAfterUpdate: map[string]*v1.Pod{
util.GetPodFullName(pods[1]): updatedPods[1], util.GetPodFullName(pods[1]): updatedPods[1],
util.GetPodFullName(pods[2]): pods[2], util.GetPodFullName(pods[2]): pods[2],
}, },
expectedNominatedAfterUpdate: map[string][]string{
"node3": {util.GetPodFullName(pods[2]), util.GetPodFullName(updatedPods[1])},
},
podsToDelete: []*v1.Pod{pods[2], pods[3]}, podsToDelete: []*v1.Pod{pods[2], pods[3]},
expectedMapAfterDelete: map[string]*v1.Pod{ expectedMapAfterDelete: map[string]*v1.Pod{
util.GetPodFullName(pods[1]): updatedPods[1], util.GetPodFullName(pods[1]): updatedPods[1],
}, },
expectedNominatedAfterDelete: map[string][]string{
"node3": {util.GetPodFullName(updatedPods[1])},
},
}, },
} }
@ -368,10 +444,7 @@ func TestUnschedulablePodsMap(t *testing.T) {
t.Errorf("#%d: Unexpected map after adding pods. Expected: %v, got: %v", t.Errorf("#%d: Unexpected map after adding pods. Expected: %v, got: %v",
i, test.expectedMapAfterAdd, upm.pods) i, test.expectedMapAfterAdd, upm.pods)
} }
if !reflect.DeepEqual(upm.nominatedPods, test.expectedNominatedAfterAdd) {
t.Errorf("#%d: Unexpected nominated map after adding pods. Expected: %v, got: %v",
i, test.expectedNominatedAfterAdd, upm.nominatedPods)
}
if len(test.podsToUpdate) > 0 { if len(test.podsToUpdate) > 0 {
for _, p := range test.podsToUpdate { for _, p := range test.podsToUpdate {
upm.Update(p) upm.Update(p)
@ -380,10 +453,6 @@ func TestUnschedulablePodsMap(t *testing.T) {
t.Errorf("#%d: Unexpected map after updating pods. Expected: %v, got: %v", t.Errorf("#%d: Unexpected map after updating pods. Expected: %v, got: %v",
i, test.expectedMapAfterUpdate, upm.pods) i, test.expectedMapAfterUpdate, upm.pods)
} }
if !reflect.DeepEqual(upm.nominatedPods, test.expectedNominatedAfterUpdate) {
t.Errorf("#%d: Unexpected nominated map after updating pods. Expected: %v, got: %v",
i, test.expectedNominatedAfterUpdate, upm.nominatedPods)
}
} }
for _, p := range test.podsToDelete { for _, p := range test.podsToDelete {
upm.Delete(p) upm.Delete(p)
@ -392,10 +461,6 @@ func TestUnschedulablePodsMap(t *testing.T) {
t.Errorf("#%d: Unexpected map after deleting pods. Expected: %v, got: %v", t.Errorf("#%d: Unexpected map after deleting pods. Expected: %v, got: %v",
i, test.expectedMapAfterDelete, upm.pods) i, test.expectedMapAfterDelete, upm.pods)
} }
if !reflect.DeepEqual(upm.nominatedPods, test.expectedNominatedAfterDelete) {
t.Errorf("#%d: Unexpected nominated map after deleting pods. Expected: %v, got: %v",
i, test.expectedNominatedAfterDelete, upm.nominatedPods)
}
upm.Clear() upm.Clear()
if len(upm.pods) != 0 { if len(upm.pods) != 0 {
t.Errorf("Expected the map to be empty, but has %v elements.", len(upm.pods)) t.Errorf("Expected the map to be empty, but has %v elements.", len(upm.pods))

View File

@ -591,7 +591,7 @@ func (c *configFactory) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
if c.skipPodUpdate(pod) { if c.skipPodUpdate(pod) {
return return
} }
if err := c.podQueue.Update(pod); err != nil { if err := c.podQueue.Update(oldObj.(*v1.Pod), pod); err != nil {
runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err)) runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
} }
} }