Fix race in setting nominated node

pull/564/head
Bobby (Babak) Salamat 2018-12-17 23:41:53 -08:00
parent 5581497846
commit 7044145920
4 changed files with 231 additions and 93 deletions

View File

@ -362,7 +362,7 @@ func (g *genericScheduler) processPreemptionWithExtenders(
// worth the complexity, especially because we generally expect to have a very
// small number of nominated pods per node.
func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
pods := g.schedulingQueue.WaitingPodsForNode(nodeName)
pods := g.schedulingQueue.NominatedPodsForNode(nodeName)
if len(pods) == 0 {
return nil
@ -509,7 +509,7 @@ func addNominatedPods(pod *v1.Pod, meta predicates.PredicateMetadata,
// This may happen only in tests.
return false, meta, nodeInfo
}
nominatedPods := queue.WaitingPodsForNode(nodeInfo.Node().Name)
nominatedPods := queue.NominatedPodsForNode(nodeInfo.Node().Name)
if nominatedPods == nil || len(nominatedPods) == 0 {
return false, meta, nodeInfo
}

View File

@ -64,11 +64,14 @@ type SchedulingQueue interface {
MoveAllToActiveQueue()
AssignedPodAdded(pod *v1.Pod)
AssignedPodUpdated(pod *v1.Pod)
WaitingPodsForNode(nodeName string) []*v1.Pod
NominatedPodsForNode(nodeName string) []*v1.Pod
PendingPods() []*v1.Pod
// Close closes the SchedulingQueue so that the goroutine which is
// waiting to pop items can exit gracefully.
Close()
// UpdateNominatedPodForNode adds the given pod to the nominated pod map or
// updates it if it already exists.
UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
// DeleteNominatedPodIfExists deletes nominatedPod from internal cache
DeleteNominatedPodIfExists(pod *v1.Pod)
// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
@ -152,9 +155,9 @@ func (f *FIFO) AssignedPodUpdated(pod *v1.Pod) {}
// MoveAllToActiveQueue does nothing in FIFO as all pods are always in the active queue.
func (f *FIFO) MoveAllToActiveQueue() {}
// WaitingPodsForNode returns pods that are nominated to run on the given node,
// NominatedPodsForNode returns pods that are nominated to run on the given node,
// but FIFO does not support it.
func (f *FIFO) WaitingPodsForNode(nodeName string) []*v1.Pod {
func (f *FIFO) NominatedPodsForNode(nodeName string) []*v1.Pod {
return nil
}
@ -166,6 +169,9 @@ func (f *FIFO) Close() {
// DeleteNominatedPodIfExists does nothing in FIFO.
func (f *FIFO) DeleteNominatedPodIfExists(pod *v1.Pod) {}
// UpdateNominatedPodForNode does nothing in FIFO.
func (f *FIFO) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {}
// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
func (f *FIFO) NumUnschedulablePods() int {
return 0
@ -204,10 +210,9 @@ type PriorityQueue struct {
podBackoffQ *util.Heap
// unschedulableQ holds pods that have been tried and determined unschedulable.
unschedulableQ *UnschedulablePodsMap
// nominatedPods is a map keyed by a node name and the value is a list of
// pods which are nominated to run on the node. These are pods which can be in
// the activeQ or unschedulableQ.
nominatedPods map[string][]*v1.Pod
// nominatedPods is a structures that stores pods which are nominated to run
// on nodes.
nominatedPods *nominatedPodMap
// receivedMoveRequest is set to true whenever we receive a request to move a
// pod from the unschedulableQ to the activeQ, and is set to false, when we pop
// a pod from the activeQ. It indicates if we received a move request when a
@ -257,7 +262,7 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *Priority
podBackoff: util.CreatePodBackoffWithClock(1*time.Second, 10*time.Second, clock),
activeQ: util.NewHeap(cache.MetaNamespaceKeyFunc, activeQComp),
unschedulableQ: newUnschedulablePodsMap(),
nominatedPods: map[string][]*v1.Pod{},
nominatedPods: newNominatedPodMap(),
}
pq.cond.L = &pq.lock
pq.podBackoffQ = util.NewHeap(cache.MetaNamespaceKeyFunc, pq.podsCompareBackoffCompleted)
@ -272,49 +277,6 @@ func (p *PriorityQueue) run() {
go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
}
// addNominatedPodIfNeeded adds a pod to nominatedPods if it has a NominatedNodeName and it does not
// already exist in the map. Adding an existing pod is not going to update the pod.
func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
nnn := NominatedNodeName(pod)
if len(nnn) <= 0 {
return
}
for _, np := range p.nominatedPods[nnn] {
if np.UID == pod.UID {
klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name)
return
}
}
p.nominatedPods[nnn] = append(p.nominatedPods[nnn], pod)
}
// deleteNominatedPodIfExists deletes a pod from the nominatedPods.
// NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) deleteNominatedPodIfExists(pod *v1.Pod) {
nnn := NominatedNodeName(pod)
if len(nnn) <= 0 {
return
}
for i, np := range p.nominatedPods[nnn] {
if np.UID != pod.UID {
continue
}
p.nominatedPods[nnn] = append(p.nominatedPods[nnn][:i], p.nominatedPods[nnn][i+1:]...)
if len(p.nominatedPods[nnn]) == 0 {
delete(p.nominatedPods, nnn)
}
break
}
}
// updateNominatedPod updates a pod in the nominatedPods.
func (p *PriorityQueue) updateNominatedPod(oldPod, newPod *v1.Pod) {
// Even if the nominated node name of the Pod is not changed, we must delete and add it again
// to ensure that its pointer is updated.
p.deleteNominatedPodIfExists(oldPod)
p.addNominatedPodIfNeeded(newPod)
}
// Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in active/unschedulable/backoff queues
func (p *PriorityQueue) Add(pod *v1.Pod) error {
@ -326,14 +288,13 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
}
if p.unschedulableQ.get(pod) != nil {
klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
p.deleteNominatedPodIfExists(pod)
p.unschedulableQ.delete(pod)
}
// Delete pod from backoffQ if it is backing off
if err := p.podBackoffQ.Delete(pod); err == nil {
klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name)
}
p.addNominatedPodIfNeeded(pod)
p.nominatedPods.add(pod, "")
p.cond.Broadcast()
return nil
@ -357,7 +318,7 @@ func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
if err != nil {
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
} else {
p.addNominatedPodIfNeeded(pod)
p.nominatedPods.add(pod, "")
p.cond.Broadcast()
}
return err
@ -420,7 +381,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
if !p.receivedMoveRequest && isPodUnschedulable(pod) {
p.backoffPod(pod)
p.unschedulableQ.addOrUpdate(pod)
p.addNominatedPodIfNeeded(pod)
p.nominatedPods.add(pod, "")
return nil
}
@ -430,14 +391,14 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
if err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
} else {
p.addNominatedPodIfNeeded(pod)
p.nominatedPods.add(pod, "")
}
return err
}
err := p.activeQ.Add(pod)
if err == nil {
p.addNominatedPodIfNeeded(pod)
p.nominatedPods.add(pod, "")
p.cond.Broadcast()
}
return err
@ -523,14 +484,14 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
if oldPod != nil {
// If the pod is already in the active queue, just update it there.
if _, exists, _ := p.activeQ.Get(oldPod); exists {
p.updateNominatedPod(oldPod, newPod)
p.nominatedPods.update(oldPod, newPod)
err := p.activeQ.Update(newPod)
return err
}
// If the pod is in the backoff queue, update it there.
if _, exists, _ := p.podBackoffQ.Get(oldPod); exists {
p.updateNominatedPod(oldPod, newPod)
p.nominatedPods.update(oldPod, newPod)
p.podBackoffQ.Delete(newPod)
err := p.activeQ.Add(newPod)
if err == nil {
@ -542,7 +503,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
// If the pod is in the unschedulable queue, updating it may make it schedulable.
if usPod := p.unschedulableQ.get(newPod); usPod != nil {
p.updateNominatedPod(oldPod, newPod)
p.nominatedPods.update(oldPod, newPod)
if isPodUpdated(oldPod, newPod) {
// If the pod is updated reset backoff
p.clearPodBackoff(newPod)
@ -560,7 +521,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
// If pod is not in any of the two queue, we put it in the active queue.
err := p.activeQ.Add(newPod)
if err == nil {
p.addNominatedPodIfNeeded(newPod)
p.nominatedPods.add(newPod, "")
p.cond.Broadcast()
}
return err
@ -571,7 +532,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
p.deleteNominatedPodIfExists(pod)
p.nominatedPods.delete(pod)
err := p.activeQ.Delete(pod)
if err != nil { // The item was probably not found in the activeQ.
p.clearPodBackoff(pod)
@ -663,16 +624,13 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod
return podsToMove
}
// WaitingPodsForNode returns pods that are nominated to run on the given node,
// NominatedPodsForNode returns pods that are nominated to run on the given node,
// but they are waiting for other pods to be removed from the node before they
// can be actually scheduled.
func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod {
func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod {
p.lock.RLock()
defer p.lock.RUnlock()
if list, ok := p.nominatedPods[nodeName]; ok {
return list
}
return nil
return p.nominatedPods.podsForNode(nodeName)
}
// PendingPods returns all the pending pods in the queue. This function is
@ -702,10 +660,20 @@ func (p *PriorityQueue) Close() {
p.cond.Broadcast()
}
// DeleteNominatedPodIfExists deletes pod from internal cache if it's a nominatedPod
// DeleteNominatedPodIfExists deletes pod nominatedPods.
func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
p.lock.Lock()
p.deleteNominatedPodIfExists(pod)
p.nominatedPods.delete(pod)
p.lock.Unlock()
}
// UpdateNominatedPodForNode adds a pod to the nominated pods of the given node.
// This is called during the preemption process after a node is nominated to run
// the pod. We update the structure before sending a request to update the pod
// object to avoid races with the following scheduling cycles.
func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {
p.lock.Lock()
p.nominatedPods.add(pod, nodeName)
p.lock.Unlock()
}
@ -762,3 +730,77 @@ func newUnschedulablePodsMap() *UnschedulablePodsMap {
keyFunc: util.GetPodFullName,
}
}
// nominatedPodMap is a structure that stores pods nominated to run on nodes.
// It exists because nominatedNodeName of pod objects stored in the structure
// may be different than what scheduler has here. We should be able to find pods
// by their UID and update/delete them.
type nominatedPodMap struct {
// nominatedPods is a map keyed by a node name and the value is a list of
// pods which are nominated to run on the node. These are pods which can be in
// the activeQ or unschedulableQ.
nominatedPods map[string][]*v1.Pod
// nominatedPodToNode is map keyed by a Pod UID to the node name where it is
// nominated.
nominatedPodToNode map[ktypes.UID]string
}
func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
// always delete the pod if it already exist, to ensure we never store more than
// one instance of the pod.
npm.delete(p)
nnn := nodeName
if len(nnn) == 0 {
nnn = NominatedNodeName(p)
if len(nnn) == 0 {
return
}
}
npm.nominatedPodToNode[p.UID] = nnn
for _, np := range npm.nominatedPods[nnn] {
if np.UID == p.UID {
klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", p.Namespace, p.Name)
return
}
}
npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p)
}
func (npm *nominatedPodMap) delete(p *v1.Pod) {
nnn, ok := npm.nominatedPodToNode[p.UID]
if !ok {
return
}
for i, np := range npm.nominatedPods[nnn] {
if np.UID == p.UID {
npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
if len(npm.nominatedPods[nnn]) == 0 {
delete(npm.nominatedPods, nnn)
}
break
}
}
delete(npm.nominatedPodToNode, p.UID)
}
func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) {
// We update irrespective of the nominatedNodeName changed or not, to ensure
// that pod pointer is updated.
npm.delete(oldPod)
npm.add(newPod, "")
}
func (npm *nominatedPodMap) podsForNode(nodeName string) []*v1.Pod {
if list, ok := npm.nominatedPods[nodeName]; ok {
return list
}
return nil
}
func newNominatedPodMap() *nominatedPodMap {
return &nominatedPodMap{
nominatedPods: make(map[string][]*v1.Pod),
nominatedPodToNode: make(map[ktypes.UID]string),
}
}

View File

@ -105,8 +105,14 @@ func TestPriorityQueue_Add(t *testing.T) {
if err := q.Add(&highPriorityPod); err != nil {
t.Errorf("add failed: %v", err)
}
expectedNominatedPods := map[string][]*v1.Pod{
"node1": {&medPriorityPod, &unschedulablePod},
expectedNominatedPods := &nominatedPodMap{
nominatedPodToNode: map[types.UID]string{
medPriorityPod.UID: "node1",
unschedulablePod.UID: "node1",
},
nominatedPods: map[string][]*v1.Pod{
"node1": {&medPriorityPod, &unschedulablePod},
},
}
if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods)
@ -120,8 +126,8 @@ func TestPriorityQueue_Add(t *testing.T) {
if p, err := q.Pop(); err != nil || p != &unschedulablePod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name)
}
if len(q.nominatedPods["node1"]) != 2 {
t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods["node1"])
if len(q.nominatedPods.nominatedPods["node1"]) != 2 {
t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods.nominatedPods["node1"])
}
}
@ -131,8 +137,14 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) {
q.AddIfNotPresent(&highPriNominatedPod) // Must not add anything.
q.AddIfNotPresent(&medPriorityPod)
q.AddIfNotPresent(&unschedulablePod)
expectedNominatedPods := map[string][]*v1.Pod{
"node1": {&medPriorityPod, &unschedulablePod},
expectedNominatedPods := &nominatedPodMap{
nominatedPodToNode: map[types.UID]string{
medPriorityPod.UID: "node1",
unschedulablePod.UID: "node1",
},
nominatedPods: map[string][]*v1.Pod{
"node1": {&medPriorityPod, &unschedulablePod},
},
}
if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods)
@ -143,8 +155,8 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) {
if p, err := q.Pop(); err != nil || p != &unschedulablePod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name)
}
if len(q.nominatedPods["node1"]) != 2 {
t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods["node1"])
if len(q.nominatedPods.nominatedPods["node1"]) != 2 {
t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods.nominatedPods["node1"])
}
if q.unschedulableQ.get(&highPriNominatedPod) != &highPriNominatedPod {
t.Errorf("Pod %v was not found in the unschedulableQ.", highPriNominatedPod.Name)
@ -157,8 +169,15 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
q.AddUnschedulableIfNotPresent(&highPriNominatedPod) // Must not add anything.
q.AddUnschedulableIfNotPresent(&medPriorityPod) // This should go to activeQ.
q.AddUnschedulableIfNotPresent(&unschedulablePod)
expectedNominatedPods := map[string][]*v1.Pod{
"node1": {&highPriNominatedPod, &medPriorityPod, &unschedulablePod},
expectedNominatedPods := &nominatedPodMap{
nominatedPodToNode: map[types.UID]string{
medPriorityPod.UID: "node1",
unschedulablePod.UID: "node1",
highPriNominatedPod.UID: "node1",
},
nominatedPods: map[string][]*v1.Pod{
"node1": {&highPriNominatedPod, &medPriorityPod, &unschedulablePod},
},
}
if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods)
@ -169,7 +188,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
if p, err := q.Pop(); err != nil || p != &medPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
}
if len(q.nominatedPods) != 1 {
if len(q.nominatedPods.nominatedPods) != 1 {
t.Errorf("Expected nomindatePods to have one element: %v", q.nominatedPods)
}
if q.unschedulableQ.get(&unschedulablePod) != &unschedulablePod {
@ -186,8 +205,8 @@ func TestPriorityQueue_Pop(t *testing.T) {
if p, err := q.Pop(); err != nil || p != &medPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
}
if len(q.nominatedPods["node1"]) != 1 {
t.Errorf("Expected medPriorityPod to be present in nomindatePods: %v", q.nominatedPods["node1"])
if len(q.nominatedPods.nominatedPods["node1"]) != 1 {
t.Errorf("Expected medPriorityPod to be present in nomindatePods: %v", q.nominatedPods.nominatedPods["node1"])
}
}()
q.Add(&medPriorityPod)
@ -200,7 +219,7 @@ func TestPriorityQueue_Update(t *testing.T) {
if _, exists, _ := q.activeQ.Get(&highPriorityPod); !exists {
t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name)
}
if len(q.nominatedPods) != 0 {
if len(q.nominatedPods.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
}
// Update highPriorityPod and add a nominatedNodeName to it.
@ -208,7 +227,7 @@ func TestPriorityQueue_Update(t *testing.T) {
if q.activeQ.Len() != 1 {
t.Error("Expected only one item in activeQ.")
}
if len(q.nominatedPods) != 1 {
if len(q.nominatedPods.nominatedPods) != 1 {
t.Errorf("Expected one item in nomindatePods map: %v", q.nominatedPods)
}
// Updating an unschedulable pod which is not in any of the two queues, should
@ -243,13 +262,13 @@ func TestPriorityQueue_Delete(t *testing.T) {
if _, exists, _ := q.activeQ.Get(&highPriNominatedPod); exists {
t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name)
}
if len(q.nominatedPods) != 1 {
t.Errorf("Expected nomindatePods to have only 'unschedulablePod': %v", q.nominatedPods)
if len(q.nominatedPods.nominatedPods) != 1 {
t.Errorf("Expected nomindatePods to have only 'unschedulablePod': %v", q.nominatedPods.nominatedPods)
}
if err := q.Delete(&unschedulablePod); err != nil {
t.Errorf("delete failed: %v", err)
}
if len(q.nominatedPods) != 0 {
if len(q.nominatedPods.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
}
}
@ -321,7 +340,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
}
}
func TestPriorityQueue_WaitingPodsForNode(t *testing.T) {
func TestPriorityQueue_NominatedPodsForNode(t *testing.T) {
q := NewPriorityQueue(nil)
q.Add(&medPriorityPod)
q.Add(&unschedulablePod)
@ -330,10 +349,10 @@ func TestPriorityQueue_WaitingPodsForNode(t *testing.T) {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
}
expectedList := []*v1.Pod{&medPriorityPod, &unschedulablePod}
if !reflect.DeepEqual(expectedList, q.WaitingPodsForNode("node1")) {
if !reflect.DeepEqual(expectedList, q.NominatedPodsForNode("node1")) {
t.Error("Unexpected list of nominated Pods for node.")
}
if q.WaitingPodsForNode("node2") != nil {
if q.NominatedPodsForNode("node2") != nil {
t.Error("Expected list of nominated Pods for node2 to be empty.")
}
}
@ -354,6 +373,75 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
}
}
func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
q := NewPriorityQueue(nil)
if err := q.Add(&medPriorityPod); err != nil {
t.Errorf("add failed: %v", err)
}
// Update unschedulablePod on a different node than specified in the pod.
q.UpdateNominatedPodForNode(&unschedulablePod, "node5")
// Update nominated node name of a pod on a node that is not specified in the pod object.
q.UpdateNominatedPodForNode(&highPriorityPod, "node2")
expectedNominatedPods := &nominatedPodMap{
nominatedPodToNode: map[types.UID]string{
medPriorityPod.UID: "node1",
highPriorityPod.UID: "node2",
unschedulablePod.UID: "node5",
},
nominatedPods: map[string][]*v1.Pod{
"node1": {&medPriorityPod},
"node2": {&highPriorityPod},
"node5": {&unschedulablePod},
},
}
if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods)
}
if p, err := q.Pop(); err != nil || p != &medPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
}
// List of nominated pods shouldn't change after popping them from the queue.
if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
t.Errorf("Unexpected nominated map after popping pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods)
}
// Update one of the nominated pods that doesn't have nominatedNodeName in the
// pod object. It should be updated correctly.
q.UpdateNominatedPodForNode(&highPriorityPod, "node4")
expectedNominatedPods = &nominatedPodMap{
nominatedPodToNode: map[types.UID]string{
medPriorityPod.UID: "node1",
highPriorityPod.UID: "node4",
unschedulablePod.UID: "node5",
},
nominatedPods: map[string][]*v1.Pod{
"node1": {&medPriorityPod},
"node4": {&highPriorityPod},
"node5": {&unschedulablePod},
},
}
if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
t.Errorf("Unexpected nominated map after updating pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods)
}
// Delete a nominated pod that doesn't have nominatedNodeName in the pod
// object. It should be deleted.
q.DeleteNominatedPodIfExists(&highPriorityPod)
expectedNominatedPods = &nominatedPodMap{
nominatedPodToNode: map[types.UID]string{
medPriorityPod.UID: "node1",
unschedulablePod.UID: "node5",
},
nominatedPods: map[string][]*v1.Pod{
"node1": {&medPriorityPod},
"node5": {&unschedulablePod},
},
}
if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
t.Errorf("Unexpected nominated map after deleting pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods)
}
}
func TestUnschedulablePodsMap(t *testing.T) {
var pods = []*v1.Pod{
{

View File

@ -320,11 +320,19 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e
var nodeName = ""
if node != nil {
nodeName = node.Name
// Update the scheduling queue with the nominated pod information. Without
// this, there would be a race condition between the next scheduling cycle
// and the time the scheduler receives a Pod Update for the nominated pod.
sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
// Make a call to update nominated node name of the pod on the API server.
err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
if err != nil {
klog.Errorf("Error in preemption process. Cannot update pod %v/%v annotations: %v", preemptor.Namespace, preemptor.Name, err)
sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
return "", err
}
for _, victim := range victims {
if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)