From 5581497846b89b8e385439bd80c4b9e99895e706 Mon Sep 17 00:00:00 2001 From: "Bobby (Babak) Salamat" Date: Mon, 17 Dec 2018 17:44:01 -0800 Subject: [PATCH 1/3] Add a test that reproduces the race condition between setting nominated node name of a pod and scheduling cycle of other pods --- test/integration/scheduler/preemption_test.go | 150 +++++++++++++++--- test/integration/scheduler/util.go | 13 +- 2 files changed, 144 insertions(+), 19 deletions(-) diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go index ffa0e4237e..70b590b479 100644 --- a/test/integration/scheduler/preemption_test.go +++ b/test/integration/scheduler/preemption_test.go @@ -33,6 +33,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing" clientset "k8s.io/client-go/kubernetes" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/features" _ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" testutils "k8s.io/kubernetes/test/utils" @@ -73,7 +74,7 @@ func TestPreemption(t *testing.T) { defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI)}, + v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)}, } tests := []struct { @@ -91,7 +92,7 @@ func TestPreemption(t *testing.T) { Priority: &lowPriority, Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)}, + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, }, }), }, @@ -101,7 +102,7 @@ func TestPreemption(t *testing.T) { Priority: &highPriority, Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)}, + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, }, }), preemptedPodIndexes: map[int]struct{}{0: {}}, @@ -237,7 +238,7 @@ func TestPreemption(t *testing.T) { nodeRes := &v1.ResourceList{ v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI), + v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI), } node, err := createNode(context.clientSet, "node1", nodeRes) if err != nil { @@ -313,7 +314,7 @@ func TestDisablePreemption(t *testing.T) { Priority: &lowPriority, Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)}, + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, }, }), }, @@ -323,7 +324,7 @@ func TestDisablePreemption(t *testing.T) { Priority: &highPriority, Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)}, + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, }, }), }, @@ -333,7 +334,7 @@ func TestDisablePreemption(t *testing.T) { nodeRes := &v1.ResourceList{ v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), v1.ResourceCPU: *resource.NewMilliQuantity(500, 
resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI), + v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI), } _, err := createNode(context.clientSet, "node1", nodeRes) if err != nil { @@ -375,7 +376,7 @@ func TestDisablePreemption(t *testing.T) { func mkPriorityPodWithGrace(tc *TestContext, name string, priority int32, grace int64) *v1.Pod { defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI)}, + v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)}, } pod := initPausePod(tc.clientSet, &pausePodConfig{ Name: name, @@ -420,7 +421,7 @@ func TestPreemptionStarvation(t *testing.T) { Priority: &highPriority, Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)}, + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, }, }), }, @@ -430,7 +431,7 @@ func TestPreemptionStarvation(t *testing.T) { nodeRes := &v1.ResourceList{ v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI), + v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI), } _, err := createNode(context.clientSet, "node1", nodeRes) if err != nil { @@ -490,6 +491,119 @@ func TestPreemptionStarvation(t *testing.T) { } } +// TestPreemptionRaces tests that other scheduling events and operations do not +// race with the preemption process. +func TestPreemptionRaces(t *testing.T) { + // Initialize scheduler. + context := initTest(t, "preemption-race") + defer cleanupTest(t, context) + cs := context.clientSet + + tests := []struct { + description string + numInitialPods int // Pods created and executed before running preemptor + numAdditionalPods int // Pods created after creating the preemptor + numRepetitions int // Repeat the tests to check races + preemptor *v1.Pod + }{ + { + // This test ensures that while the preempting pod is waiting for the victims + // terminate, other lower priority pods are not scheduled in the room created + // after preemption and while the higher priority pods is not scheduled yet. + description: "ensures that other pods are not scheduled while preemptor is being marked as nominated (issue #72124)", + numInitialPods: 2, + numAdditionalPods: 50, + numRepetitions: 10, + preemptor: initPausePod(cs, &pausePodConfig{ + Name: "preemptor-pod", + Namespace: context.ns.Name, + Priority: &highPriority, + Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(4900, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(4900, resource.DecimalSI)}, + }, + }), + }, + } + + // Create a node with some resources and a label. 
+ nodeRes := &v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(100, resource.DecimalSI), + v1.ResourceCPU: *resource.NewMilliQuantity(5000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(5000, resource.DecimalSI), + } + _, err := createNode(context.clientSet, "node1", nodeRes) + if err != nil { + t.Fatalf("Error creating nodes: %v", err) + } + + for _, test := range tests { + if test.numRepetitions <= 0 { + test.numRepetitions = 1 + } + for n := 0; n < test.numRepetitions; n++ { + initialPods := make([]*v1.Pod, test.numInitialPods) + additionalPods := make([]*v1.Pod, test.numAdditionalPods) + // Create and run existingPods. + for i := 0; i < test.numInitialPods; i++ { + initialPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(context, fmt.Sprintf("rpod-%v", i), mediumPriority, 0)) + if err != nil { + t.Fatalf("Test [%v]: Error creating pause pod: %v", test.description, err) + } + } + // make sure that initial Pods are all scheduled. + for _, p := range initialPods { + if err := waitForPodToSchedule(cs, p); err != nil { + t.Fatalf("Pod %v/%v didn't get scheduled: %v", p.Namespace, p.Name, err) + } + } + // Create the preemptor. + klog.Info("Creating the preemptor pod...") + preemptor, err := createPausePod(cs, test.preemptor) + if err != nil { + t.Errorf("Error while creating the preempting pod: %v", err) + } + + klog.Info("Creating additional pods...") + for i := 0; i < test.numAdditionalPods; i++ { + additionalPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(context, fmt.Sprintf("ppod-%v", i), mediumPriority, 0)) + if err != nil { + t.Fatalf("Test [%v]: Error creating pending pod: %v", test.description, err) + } + } + // Check that the preemptor pod gets nominated node name. + if err := waitForNominatedNodeName(cs, preemptor); err != nil { + t.Errorf("Test [%v]: NominatedNodeName annotation was not set for pod %v/%v: %v", test.description, preemptor.Namespace, preemptor.Name, err) + } + // Make sure that preemptor is scheduled after preemptions. + if err := waitForPodToScheduleWithTimeout(cs, preemptor, 60*time.Second); err != nil { + t.Errorf("Preemptor pod %v didn't get scheduled: %v", preemptor.Name, err) + } + + klog.Info("Check unschedulable pods still exists and were never scheduled...") + for _, p := range additionalPods { + pod, err := cs.CoreV1().Pods(p.Namespace).Get(p.Name, metav1.GetOptions{}) + if err != nil { + t.Errorf("Error in getting Pod %v/%v info: %v", p.Namespace, p.Name, err) + } + if len(pod.Spec.NodeName) > 0 { + t.Errorf("Pod %v/%v is already scheduled", p.Namespace, p.Name) + } + _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled) + if cond != nil && cond.Status != v1.ConditionFalse { + t.Errorf("Pod %v/%v is no longer unschedulable: %v", p.Namespace, p.Name, err) + } + } + // Cleanup + klog.Info("Cleaning up all pods...") + allPods := additionalPods + allPods = append(allPods, initialPods...) + allPods = append(allPods, preemptor) + cleanupPods(cs, t, allPods) + } + } +} + // TestNominatedNodeCleanUp checks that when there are nominated pods on a // node and a higher priority pod is nominated to run on the node, the nominated // node name of the lower priority pods is cleared. 
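
The nominated-node check in TestPreemptionRaces above relies on waitForNominatedNodeName from util.go, which this patch does not touch. As a reading aid, here is a minimal sketch of what such a wait amounts to; the function name, poll interval, and use of wait.ForeverTestTimeout are illustrative assumptions rather than the util.go implementation. It simply polls pod.Status.NominatedNodeName until the scheduler records a nomination.

// Minimal sketch only; not the waitForNominatedNodeName helper from util.go.
// Assumes the standard integration-test imports shown elsewhere in this patch.
package scheduler

import (
	"time"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
)

// waitForNominatedNodeNameSketch returns once the scheduler has recorded a
// nominated node for the pod, or errors out after the test timeout.
func waitForNominatedNodeNameSketch(cs clientset.Interface, pod *v1.Pod) error {
	return wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
		p, err := cs.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		return len(p.Status.NominatedNodeName) > 0, nil
	})
}
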
@@ -515,7 +629,7 @@ func TestNominatedNodeCleanUp(t *testing.T) { nodeRes := &v1.ResourceList{ v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI), + v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI), } _, err := createNode(context.clientSet, "node1", nodeRes) if err != nil { @@ -543,7 +657,7 @@ func TestNominatedNodeCleanUp(t *testing.T) { Priority: &mediumPriority, Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(400, resource.BinarySI)}, + v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)}, }, }) medPriPod, err := createPausePod(cs, podConf) @@ -561,7 +675,7 @@ func TestNominatedNodeCleanUp(t *testing.T) { Priority: &highPriority, Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)}, + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, }, }) highPriPod, err := createPausePod(cs, podConf) @@ -626,12 +740,12 @@ func TestPDBInPreemption(t *testing.T) { defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI)}, + v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)}, } defaultNodeRes := &v1.ResourceList{ v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI), + v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI), } type nodeConfig struct { @@ -683,7 +797,7 @@ func TestPDBInPreemption(t *testing.T) { Priority: &highPriority, Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)}, + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, }, }), preemptedPodIndexes: map[int]struct{}{2: {}}, @@ -721,7 +835,7 @@ func TestPDBInPreemption(t *testing.T) { Priority: &highPriority, Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)}, + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, }, }), preemptedPodIndexes: map[int]struct{}{1: {}}, @@ -801,7 +915,7 @@ func TestPDBInPreemption(t *testing.T) { Priority: &highPriority, Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(400, resource.BinarySI)}, + v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)}, }, }), // The third node is chosen because PDB is not violated for node 3 and the victims have lower priority than node-2. 
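
The util.go hunk below adds a podSchedulableCondition helper that fetches a pod's PodScheduled condition. A short usage sketch follows; the wrapper name, the poll interval/timeout, and the Reason check are assumptions for illustration, not part of this patch. It is assumed to live in the same test package as util.go, next to podSchedulableCondition, and to reuse that file's v1, wait, and clientset imports.

// Sketch of how a test could poll the podSchedulableCondition helper added
// below to assert that a pod is actively reported unschedulable, rather than
// merely not yet scheduled. Names and timings here are illustrative.
func waitForPodUnschedulableSketch(cs clientset.Interface, pod *v1.Pod) error {
	return wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
		cond, err := podSchedulableCondition(cs, pod.Namespace, pod.Name)
		if err != nil {
			return false, err
		}
		// PodScheduled=False with reason Unschedulable means the scheduler has
		// tried the pod and could not place it.
		return cond != nil && cond.Status == v1.ConditionFalse &&
			cond.Reason == v1.PodReasonUnschedulable, nil
	})
}
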
diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 11e990c665..c35502376d 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -582,6 +582,17 @@ func podScheduled(c clientset.Interface, podNamespace, podName string) wait.Cond } } +// podUnschedulable returns a condition function that returns true if the given pod +// gets unschedulable status. +func podSchedulableCondition(c clientset.Interface, podNamespace, podName string) (*v1.PodCondition, error) { + pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled) + return cond, nil +} + // podUnschedulable returns a condition function that returns true if the given pod // gets unschedulable status. func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { @@ -710,7 +721,7 @@ func cleanupPods(cs clientset.Interface, t *testing.T, pods []*v1.Pod) { } } for _, p := range pods { - if err := wait.Poll(time.Second, wait.ForeverTestTimeout, + if err := wait.Poll(time.Millisecond, wait.ForeverTestTimeout, podDeleted(cs, p.Namespace, p.Name)); err != nil { t.Errorf("error while waiting for pod %v/%v to get deleted: %v", p.Namespace, p.Name, err) } From 704414592037dee57de1071cf8489373b6ddd5d0 Mon Sep 17 00:00:00 2001 From: "Bobby (Babak) Salamat" Date: Mon, 17 Dec 2018 23:41:53 -0800 Subject: [PATCH 2/3] Fix race in setting nominated node --- pkg/scheduler/core/generic_scheduler.go | 4 +- .../internal/queue/scheduling_queue.go | 182 +++++++++++------- .../internal/queue/scheduling_queue_test.go | 130 +++++++++++-- pkg/scheduler/scheduler.go | 8 + 4 files changed, 231 insertions(+), 93 deletions(-) diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 36ab65d104..4b0f8ea523 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -362,7 +362,7 @@ func (g *genericScheduler) processPreemptionWithExtenders( // worth the complexity, especially because we generally expect to have a very // small number of nominated pods per node. func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod { - pods := g.schedulingQueue.WaitingPodsForNode(nodeName) + pods := g.schedulingQueue.NominatedPodsForNode(nodeName) if len(pods) == 0 { return nil @@ -509,7 +509,7 @@ func addNominatedPods(pod *v1.Pod, meta predicates.PredicateMetadata, // This may happen only in tests. return false, meta, nodeInfo } - nominatedPods := queue.WaitingPodsForNode(nodeInfo.Node().Name) + nominatedPods := queue.NominatedPodsForNode(nodeInfo.Node().Name) if nominatedPods == nil || len(nominatedPods) == 0 { return false, meta, nodeInfo } diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index a72aced91e..22bccc5e83 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -64,11 +64,14 @@ type SchedulingQueue interface { MoveAllToActiveQueue() AssignedPodAdded(pod *v1.Pod) AssignedPodUpdated(pod *v1.Pod) - WaitingPodsForNode(nodeName string) []*v1.Pod + NominatedPodsForNode(nodeName string) []*v1.Pod PendingPods() []*v1.Pod // Close closes the SchedulingQueue so that the goroutine which is // waiting to pop items can exit gracefully. 
Close() + // UpdateNominatedPodForNode adds the given pod to the nominated pod map or + // updates it if it already exists. + UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) // DeleteNominatedPodIfExists deletes nominatedPod from internal cache DeleteNominatedPodIfExists(pod *v1.Pod) // NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue. @@ -152,9 +155,9 @@ func (f *FIFO) AssignedPodUpdated(pod *v1.Pod) {} // MoveAllToActiveQueue does nothing in FIFO as all pods are always in the active queue. func (f *FIFO) MoveAllToActiveQueue() {} -// WaitingPodsForNode returns pods that are nominated to run on the given node, +// NominatedPodsForNode returns pods that are nominated to run on the given node, // but FIFO does not support it. -func (f *FIFO) WaitingPodsForNode(nodeName string) []*v1.Pod { +func (f *FIFO) NominatedPodsForNode(nodeName string) []*v1.Pod { return nil } @@ -166,6 +169,9 @@ func (f *FIFO) Close() { // DeleteNominatedPodIfExists does nothing in FIFO. func (f *FIFO) DeleteNominatedPodIfExists(pod *v1.Pod) {} +// UpdateNominatedPodForNode does nothing in FIFO. +func (f *FIFO) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {} + // NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue. func (f *FIFO) NumUnschedulablePods() int { return 0 @@ -204,10 +210,9 @@ type PriorityQueue struct { podBackoffQ *util.Heap // unschedulableQ holds pods that have been tried and determined unschedulable. unschedulableQ *UnschedulablePodsMap - // nominatedPods is a map keyed by a node name and the value is a list of - // pods which are nominated to run on the node. These are pods which can be in - // the activeQ or unschedulableQ. - nominatedPods map[string][]*v1.Pod + // nominatedPods is a structures that stores pods which are nominated to run + // on nodes. + nominatedPods *nominatedPodMap // receivedMoveRequest is set to true whenever we receive a request to move a // pod from the unschedulableQ to the activeQ, and is set to false, when we pop // a pod from the activeQ. It indicates if we received a move request when a @@ -257,7 +262,7 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *Priority podBackoff: util.CreatePodBackoffWithClock(1*time.Second, 10*time.Second, clock), activeQ: util.NewHeap(cache.MetaNamespaceKeyFunc, activeQComp), unschedulableQ: newUnschedulablePodsMap(), - nominatedPods: map[string][]*v1.Pod{}, + nominatedPods: newNominatedPodMap(), } pq.cond.L = &pq.lock pq.podBackoffQ = util.NewHeap(cache.MetaNamespaceKeyFunc, pq.podsCompareBackoffCompleted) @@ -272,49 +277,6 @@ func (p *PriorityQueue) run() { go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop) } -// addNominatedPodIfNeeded adds a pod to nominatedPods if it has a NominatedNodeName and it does not -// already exist in the map. Adding an existing pod is not going to update the pod. -func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) { - nnn := NominatedNodeName(pod) - if len(nnn) <= 0 { - return - } - for _, np := range p.nominatedPods[nnn] { - if np.UID == pod.UID { - klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name) - return - } - } - p.nominatedPods[nnn] = append(p.nominatedPods[nnn], pod) -} - -// deleteNominatedPodIfExists deletes a pod from the nominatedPods. -// NOTE: this function assumes lock has been acquired in caller. 
-func (p *PriorityQueue) deleteNominatedPodIfExists(pod *v1.Pod) { - nnn := NominatedNodeName(pod) - if len(nnn) <= 0 { - return - } - for i, np := range p.nominatedPods[nnn] { - if np.UID != pod.UID { - continue - } - p.nominatedPods[nnn] = append(p.nominatedPods[nnn][:i], p.nominatedPods[nnn][i+1:]...) - if len(p.nominatedPods[nnn]) == 0 { - delete(p.nominatedPods, nnn) - } - break - } -} - -// updateNominatedPod updates a pod in the nominatedPods. -func (p *PriorityQueue) updateNominatedPod(oldPod, newPod *v1.Pod) { - // Even if the nominated node name of the Pod is not changed, we must delete and add it again - // to ensure that its pointer is updated. - p.deleteNominatedPodIfExists(oldPod) - p.addNominatedPodIfNeeded(newPod) -} - // Add adds a pod to the active queue. It should be called only when a new pod // is added so there is no chance the pod is already in active/unschedulable/backoff queues func (p *PriorityQueue) Add(pod *v1.Pod) error { @@ -326,14 +288,13 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error { } if p.unschedulableQ.get(pod) != nil { klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name) - p.deleteNominatedPodIfExists(pod) p.unschedulableQ.delete(pod) } // Delete pod from backoffQ if it is backing off if err := p.podBackoffQ.Delete(pod); err == nil { klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name) } - p.addNominatedPodIfNeeded(pod) + p.nominatedPods.add(pod, "") p.cond.Broadcast() return nil @@ -357,7 +318,7 @@ func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error { if err != nil { klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err) } else { - p.addNominatedPodIfNeeded(pod) + p.nominatedPods.add(pod, "") p.cond.Broadcast() } return err @@ -420,7 +381,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error { if !p.receivedMoveRequest && isPodUnschedulable(pod) { p.backoffPod(pod) p.unschedulableQ.addOrUpdate(pod) - p.addNominatedPodIfNeeded(pod) + p.nominatedPods.add(pod, "") return nil } @@ -430,14 +391,14 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error { if err != nil { klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err) } else { - p.addNominatedPodIfNeeded(pod) + p.nominatedPods.add(pod, "") } return err } err := p.activeQ.Add(pod) if err == nil { - p.addNominatedPodIfNeeded(pod) + p.nominatedPods.add(pod, "") p.cond.Broadcast() } return err @@ -523,14 +484,14 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { if oldPod != nil { // If the pod is already in the active queue, just update it there. if _, exists, _ := p.activeQ.Get(oldPod); exists { - p.updateNominatedPod(oldPod, newPod) + p.nominatedPods.update(oldPod, newPod) err := p.activeQ.Update(newPod) return err } // If the pod is in the backoff queue, update it there. if _, exists, _ := p.podBackoffQ.Get(oldPod); exists { - p.updateNominatedPod(oldPod, newPod) + p.nominatedPods.update(oldPod, newPod) p.podBackoffQ.Delete(newPod) err := p.activeQ.Add(newPod) if err == nil { @@ -542,7 +503,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { // If the pod is in the unschedulable queue, updating it may make it schedulable. 
if usPod := p.unschedulableQ.get(newPod); usPod != nil { - p.updateNominatedPod(oldPod, newPod) + p.nominatedPods.update(oldPod, newPod) if isPodUpdated(oldPod, newPod) { // If the pod is updated reset backoff p.clearPodBackoff(newPod) @@ -560,7 +521,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { // If pod is not in any of the two queue, we put it in the active queue. err := p.activeQ.Add(newPod) if err == nil { - p.addNominatedPodIfNeeded(newPod) + p.nominatedPods.add(newPod, "") p.cond.Broadcast() } return err @@ -571,7 +532,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { func (p *PriorityQueue) Delete(pod *v1.Pod) error { p.lock.Lock() defer p.lock.Unlock() - p.deleteNominatedPodIfExists(pod) + p.nominatedPods.delete(pod) err := p.activeQ.Delete(pod) if err != nil { // The item was probably not found in the activeQ. p.clearPodBackoff(pod) @@ -663,16 +624,13 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod return podsToMove } -// WaitingPodsForNode returns pods that are nominated to run on the given node, +// NominatedPodsForNode returns pods that are nominated to run on the given node, // but they are waiting for other pods to be removed from the node before they // can be actually scheduled. -func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod { +func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod { p.lock.RLock() defer p.lock.RUnlock() - if list, ok := p.nominatedPods[nodeName]; ok { - return list - } - return nil + return p.nominatedPods.podsForNode(nodeName) } // PendingPods returns all the pending pods in the queue. This function is @@ -702,10 +660,20 @@ func (p *PriorityQueue) Close() { p.cond.Broadcast() } -// DeleteNominatedPodIfExists deletes pod from internal cache if it's a nominatedPod +// DeleteNominatedPodIfExists deletes pod nominatedPods. func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) { p.lock.Lock() - p.deleteNominatedPodIfExists(pod) + p.nominatedPods.delete(pod) + p.lock.Unlock() +} + +// UpdateNominatedPodForNode adds a pod to the nominated pods of the given node. +// This is called during the preemption process after a node is nominated to run +// the pod. We update the structure before sending a request to update the pod +// object to avoid races with the following scheduling cycles. +func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) { + p.lock.Lock() + p.nominatedPods.add(pod, nodeName) p.lock.Unlock() } @@ -762,3 +730,77 @@ func newUnschedulablePodsMap() *UnschedulablePodsMap { keyFunc: util.GetPodFullName, } } + +// nominatedPodMap is a structure that stores pods nominated to run on nodes. +// It exists because nominatedNodeName of pod objects stored in the structure +// may be different than what scheduler has here. We should be able to find pods +// by their UID and update/delete them. +type nominatedPodMap struct { + // nominatedPods is a map keyed by a node name and the value is a list of + // pods which are nominated to run on the node. These are pods which can be in + // the activeQ or unschedulableQ. + nominatedPods map[string][]*v1.Pod + // nominatedPodToNode is map keyed by a Pod UID to the node name where it is + // nominated. + nominatedPodToNode map[ktypes.UID]string +} + +func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) { + // always delete the pod if it already exist, to ensure we never store more than + // one instance of the pod. 
+ npm.delete(p) + + nnn := nodeName + if len(nnn) == 0 { + nnn = NominatedNodeName(p) + if len(nnn) == 0 { + return + } + } + npm.nominatedPodToNode[p.UID] = nnn + for _, np := range npm.nominatedPods[nnn] { + if np.UID == p.UID { + klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", p.Namespace, p.Name) + return + } + } + npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p) +} + +func (npm *nominatedPodMap) delete(p *v1.Pod) { + nnn, ok := npm.nominatedPodToNode[p.UID] + if !ok { + return + } + for i, np := range npm.nominatedPods[nnn] { + if np.UID == p.UID { + npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...) + if len(npm.nominatedPods[nnn]) == 0 { + delete(npm.nominatedPods, nnn) + } + break + } + } + delete(npm.nominatedPodToNode, p.UID) +} + +func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) { + // We update irrespective of the nominatedNodeName changed or not, to ensure + // that pod pointer is updated. + npm.delete(oldPod) + npm.add(newPod, "") +} + +func (npm *nominatedPodMap) podsForNode(nodeName string) []*v1.Pod { + if list, ok := npm.nominatedPods[nodeName]; ok { + return list + } + return nil +} + +func newNominatedPodMap() *nominatedPodMap { + return &nominatedPodMap{ + nominatedPods: make(map[string][]*v1.Pod), + nominatedPodToNode: make(map[ktypes.UID]string), + } +} diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 7ac5fc1fbd..a77e756d12 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -105,8 +105,14 @@ func TestPriorityQueue_Add(t *testing.T) { if err := q.Add(&highPriorityPod); err != nil { t.Errorf("add failed: %v", err) } - expectedNominatedPods := map[string][]*v1.Pod{ - "node1": {&medPriorityPod, &unschedulablePod}, + expectedNominatedPods := &nominatedPodMap{ + nominatedPodToNode: map[types.UID]string{ + medPriorityPod.UID: "node1", + unschedulablePod.UID: "node1", + }, + nominatedPods: map[string][]*v1.Pod{ + "node1": {&medPriorityPod, &unschedulablePod}, + }, } if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods) @@ -120,8 +126,8 @@ func TestPriorityQueue_Add(t *testing.T) { if p, err := q.Pop(); err != nil || p != &unschedulablePod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name) } - if len(q.nominatedPods["node1"]) != 2 { - t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods["node1"]) + if len(q.nominatedPods.nominatedPods["node1"]) != 2 { + t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods.nominatedPods["node1"]) } } @@ -131,8 +137,14 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) { q.AddIfNotPresent(&highPriNominatedPod) // Must not add anything. 
q.AddIfNotPresent(&medPriorityPod) q.AddIfNotPresent(&unschedulablePod) - expectedNominatedPods := map[string][]*v1.Pod{ - "node1": {&medPriorityPod, &unschedulablePod}, + expectedNominatedPods := &nominatedPodMap{ + nominatedPodToNode: map[types.UID]string{ + medPriorityPod.UID: "node1", + unschedulablePod.UID: "node1", + }, + nominatedPods: map[string][]*v1.Pod{ + "node1": {&medPriorityPod, &unschedulablePod}, + }, } if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods) @@ -143,8 +155,8 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) { if p, err := q.Pop(); err != nil || p != &unschedulablePod { t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name) } - if len(q.nominatedPods["node1"]) != 2 { - t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods["node1"]) + if len(q.nominatedPods.nominatedPods["node1"]) != 2 { + t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods.nominatedPods["node1"]) } if q.unschedulableQ.get(&highPriNominatedPod) != &highPriNominatedPod { t.Errorf("Pod %v was not found in the unschedulableQ.", highPriNominatedPod.Name) @@ -157,8 +169,15 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { q.AddUnschedulableIfNotPresent(&highPriNominatedPod) // Must not add anything. q.AddUnschedulableIfNotPresent(&medPriorityPod) // This should go to activeQ. q.AddUnschedulableIfNotPresent(&unschedulablePod) - expectedNominatedPods := map[string][]*v1.Pod{ - "node1": {&highPriNominatedPod, &medPriorityPod, &unschedulablePod}, + expectedNominatedPods := &nominatedPodMap{ + nominatedPodToNode: map[types.UID]string{ + medPriorityPod.UID: "node1", + unschedulablePod.UID: "node1", + highPriNominatedPod.UID: "node1", + }, + nominatedPods: map[string][]*v1.Pod{ + "node1": {&highPriNominatedPod, &medPriorityPod, &unschedulablePod}, + }, } if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { t.Errorf("Unexpected nominated map after adding pods. 
Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods) @@ -169,7 +188,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { if p, err := q.Pop(); err != nil || p != &medPriorityPod { t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name) } - if len(q.nominatedPods) != 1 { + if len(q.nominatedPods.nominatedPods) != 1 { t.Errorf("Expected nomindatePods to have one element: %v", q.nominatedPods) } if q.unschedulableQ.get(&unschedulablePod) != &unschedulablePod { @@ -186,8 +205,8 @@ func TestPriorityQueue_Pop(t *testing.T) { if p, err := q.Pop(); err != nil || p != &medPriorityPod { t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name) } - if len(q.nominatedPods["node1"]) != 1 { - t.Errorf("Expected medPriorityPod to be present in nomindatePods: %v", q.nominatedPods["node1"]) + if len(q.nominatedPods.nominatedPods["node1"]) != 1 { + t.Errorf("Expected medPriorityPod to be present in nomindatePods: %v", q.nominatedPods.nominatedPods["node1"]) } }() q.Add(&medPriorityPod) @@ -200,7 +219,7 @@ func TestPriorityQueue_Update(t *testing.T) { if _, exists, _ := q.activeQ.Get(&highPriorityPod); !exists { t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name) } - if len(q.nominatedPods) != 0 { + if len(q.nominatedPods.nominatedPods) != 0 { t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods) } // Update highPriorityPod and add a nominatedNodeName to it. @@ -208,7 +227,7 @@ func TestPriorityQueue_Update(t *testing.T) { if q.activeQ.Len() != 1 { t.Error("Expected only one item in activeQ.") } - if len(q.nominatedPods) != 1 { + if len(q.nominatedPods.nominatedPods) != 1 { t.Errorf("Expected one item in nomindatePods map: %v", q.nominatedPods) } // Updating an unschedulable pod which is not in any of the two queues, should @@ -243,13 +262,13 @@ func TestPriorityQueue_Delete(t *testing.T) { if _, exists, _ := q.activeQ.Get(&highPriNominatedPod); exists { t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name) } - if len(q.nominatedPods) != 1 { - t.Errorf("Expected nomindatePods to have only 'unschedulablePod': %v", q.nominatedPods) + if len(q.nominatedPods.nominatedPods) != 1 { + t.Errorf("Expected nomindatePods to have only 'unschedulablePod': %v", q.nominatedPods.nominatedPods) } if err := q.Delete(&unschedulablePod); err != nil { t.Errorf("delete failed: %v", err) } - if len(q.nominatedPods) != 0 { + if len(q.nominatedPods.nominatedPods) != 0 { t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods) } } @@ -321,7 +340,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { } } -func TestPriorityQueue_WaitingPodsForNode(t *testing.T) { +func TestPriorityQueue_NominatedPodsForNode(t *testing.T) { q := NewPriorityQueue(nil) q.Add(&medPriorityPod) q.Add(&unschedulablePod) @@ -330,10 +349,10 @@ func TestPriorityQueue_WaitingPodsForNode(t *testing.T) { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name) } expectedList := []*v1.Pod{&medPriorityPod, &unschedulablePod} - if !reflect.DeepEqual(expectedList, q.WaitingPodsForNode("node1")) { + if !reflect.DeepEqual(expectedList, q.NominatedPodsForNode("node1")) { t.Error("Unexpected list of nominated Pods for node.") } - if q.WaitingPodsForNode("node2") != nil { + if q.NominatedPodsForNode("node2") != nil { t.Error("Expected list of nominated Pods for node2 to be empty.") } } @@ -354,6 +373,75 @@ func TestPriorityQueue_PendingPods(t *testing.T) { } } +func TestPriorityQueue_UpdateNominatedPodForNode(t 
*testing.T) { + q := NewPriorityQueue(nil) + if err := q.Add(&medPriorityPod); err != nil { + t.Errorf("add failed: %v", err) + } + // Update unschedulablePod on a different node than specified in the pod. + q.UpdateNominatedPodForNode(&unschedulablePod, "node5") + + // Update nominated node name of a pod on a node that is not specified in the pod object. + q.UpdateNominatedPodForNode(&highPriorityPod, "node2") + expectedNominatedPods := &nominatedPodMap{ + nominatedPodToNode: map[types.UID]string{ + medPriorityPod.UID: "node1", + highPriorityPod.UID: "node2", + unschedulablePod.UID: "node5", + }, + nominatedPods: map[string][]*v1.Pod{ + "node1": {&medPriorityPod}, + "node2": {&highPriorityPod}, + "node5": {&unschedulablePod}, + }, + } + if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { + t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods) + } + if p, err := q.Pop(); err != nil || p != &medPriorityPod { + t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name) + } + // List of nominated pods shouldn't change after popping them from the queue. + if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { + t.Errorf("Unexpected nominated map after popping pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods) + } + // Update one of the nominated pods that doesn't have nominatedNodeName in the + // pod object. It should be updated correctly. + q.UpdateNominatedPodForNode(&highPriorityPod, "node4") + expectedNominatedPods = &nominatedPodMap{ + nominatedPodToNode: map[types.UID]string{ + medPriorityPod.UID: "node1", + highPriorityPod.UID: "node4", + unschedulablePod.UID: "node5", + }, + nominatedPods: map[string][]*v1.Pod{ + "node1": {&medPriorityPod}, + "node4": {&highPriorityPod}, + "node5": {&unschedulablePod}, + }, + } + if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { + t.Errorf("Unexpected nominated map after updating pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods) + } + + // Delete a nominated pod that doesn't have nominatedNodeName in the pod + // object. It should be deleted. + q.DeleteNominatedPodIfExists(&highPriorityPod) + expectedNominatedPods = &nominatedPodMap{ + nominatedPodToNode: map[types.UID]string{ + medPriorityPod.UID: "node1", + unschedulablePod.UID: "node5", + }, + nominatedPods: map[string][]*v1.Pod{ + "node1": {&medPriorityPod}, + "node5": {&unschedulablePod}, + }, + } + if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) { + t.Errorf("Unexpected nominated map after deleting pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods) + } +} + func TestUnschedulablePodsMap(t *testing.T) { var pods = []*v1.Pod{ { diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index a9e2d12ec6..ce35a58cbc 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -320,11 +320,19 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e var nodeName = "" if node != nil { nodeName = node.Name + // Update the scheduling queue with the nominated pod information. Without + // this, there would be a race condition between the next scheduling cycle + // and the time the scheduler receives a Pod Update for the nominated pod. + sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName) + + // Make a call to update nominated node name of the pod on the API server. 
err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName) if err != nil { klog.Errorf("Error in preemption process. Cannot update pod %v/%v annotations: %v", preemptor.Namespace, preemptor.Name, err) + sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor) return "", err } + for _, victim := range victims { if err := sched.config.PodPreemptor.DeletePod(victim); err != nil { klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err) From b75672c4bae487c4b3e2dd7db7692864368ed68a Mon Sep 17 00:00:00 2001 From: "Bobby (Babak) Salamat" Date: Thu, 20 Dec 2018 17:34:40 -0800 Subject: [PATCH 3/3] autogenerated files --- test/integration/scheduler/BUILD | 1 + 1 file changed, 1 insertion(+) diff --git a/test/integration/scheduler/BUILD b/test/integration/scheduler/BUILD index e031f7d2c9..c9ad35323b 100644 --- a/test/integration/scheduler/BUILD +++ b/test/integration/scheduler/BUILD @@ -26,6 +26,7 @@ go_test( "//cmd/kube-scheduler/app:go_default_library", "//cmd/kube-scheduler/app/config:go_default_library", "//pkg/api/legacyscheme:go_default_library", + "//pkg/api/v1/pod:go_default_library", "//pkg/controller/nodelifecycle:go_default_library", "//pkg/controller/volume/persistentvolume:go_default_library", "//pkg/controller/volume/persistentvolume/options:go_default_library",
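
For reference, the essence of the scheduler.go change in PATCH 2/3 is the ordering between the in-memory queue update and the API call. The condensed sketch below restates that ordering with the surrounding preemption logic elided; the helper name and the two narrow interfaces exist only for the sketch (they stand in for the SchedulingQueue and PodPreemptor fields on the scheduler's config), and it assumes v1 = "k8s.io/api/core/v1".

// nominator and nodeNameSetter are declared only for this sketch; their method
// signatures mirror the queue and PodPreemptor methods used in the hunk above.
type nominator interface {
	UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
	DeleteNominatedPodIfExists(pod *v1.Pod)
}

type nodeNameSetter interface {
	SetNominatedNodeName(pod *v1.Pod, nodeName string) error
}

// nominateAndRecord captures the ordering the patch establishes.
func nominateAndRecord(q nominator, p nodeNameSetter, preemptor *v1.Pod, nodeName string) error {
	// 1) Record the nomination in the scheduler's in-memory queue first, so the
	//    very next scheduling cycle already treats the freed resources on
	//    nodeName as claimed by the preemptor instead of handing them to a
	//    lower-priority pod.
	q.UpdateNominatedPodForNode(preemptor, nodeName)
	// 2) Only then persist status.nominatedNodeName through the API server.
	if err := p.SetNominatedNodeName(preemptor, nodeName); err != nil {
		// 3) Roll back the in-memory nomination if the API update fails, so the
		//    queue does not keep state the rest of the system never saw.
		q.DeleteNominatedPodIfExists(preemptor)
		return err
	}
	return nil
}

Doing the queue update before the API round trip is what closes the window exploited in issue #72124, and the rollback on error keeps the in-memory view consistent with what was actually persisted.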