diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go
index ffa0e4237e..70b590b479 100644
--- a/test/integration/scheduler/preemption_test.go
+++ b/test/integration/scheduler/preemption_test.go
@@ -33,6 +33,7 @@ import (
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
 	clientset "k8s.io/client-go/kubernetes"
+	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/features"
 	_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
 	testutils "k8s.io/kubernetes/test/utils"
@@ -73,7 +74,7 @@ func TestPreemption(t *testing.T) {
 
 	defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
 		v1.ResourceCPU:    *resource.NewMilliQuantity(100, resource.DecimalSI),
-		v1.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI)},
+		v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)},
 	}
 
 	tests := []struct {
@@ -91,7 +92,7 @@ func TestPreemption(t *testing.T) {
 				Priority:  &lowPriority,
 				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
 					v1.ResourceCPU:    *resource.NewMilliQuantity(400, resource.DecimalSI),
-					v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
+					v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
 				},
 			}),
 		},
@@ -101,7 +102,7 @@ func TestPreemption(t *testing.T) {
 			Priority:  &highPriority,
 			Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
 				v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
-				v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
+				v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
 			},
 		}),
 		preemptedPodIndexes: map[int]struct{}{0: {}},
@@ -237,7 +238,7 @@ func TestPreemption(t *testing.T) {
 	nodeRes := &v1.ResourceList{
 		v1.ResourcePods:   *resource.NewQuantity(32, resource.DecimalSI),
 		v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
-		v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI),
+		v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
 	}
 	node, err := createNode(context.clientSet, "node1", nodeRes)
 	if err != nil {
@@ -313,7 +314,7 @@ func TestDisablePreemption(t *testing.T) {
 				Priority:  &lowPriority,
 				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
 					v1.ResourceCPU:    *resource.NewMilliQuantity(400, resource.DecimalSI),
-					v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
+					v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
 				},
 			}),
 		},
@@ -323,7 +324,7 @@ func TestDisablePreemption(t *testing.T) {
 			Priority:  &highPriority,
 			Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
 				v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
-				v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
+				v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
 			},
 		}),
 	},
@@ -333,7 +334,7 @@ func TestDisablePreemption(t *testing.T) {
 	nodeRes := &v1.ResourceList{
 		v1.ResourcePods:   *resource.NewQuantity(32, resource.DecimalSI),
 		v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
-		v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI),
+		v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
 	}
 	_, err := createNode(context.clientSet, "node1", nodeRes)
 	if err != nil {
@@ -375,7 +376,7 @@ func TestDisablePreemption(t *testing.T) {
 func mkPriorityPodWithGrace(tc *TestContext, name string, priority int32, grace int64) *v1.Pod {
 	defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
 		v1.ResourceCPU:    *resource.NewMilliQuantity(100, resource.DecimalSI),
-		v1.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI)},
+		v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)},
 	}
 	pod := initPausePod(tc.clientSet, &pausePodConfig{
 		Name: name,
@@ -420,7 +421,7 @@ func TestPreemptionStarvation(t *testing.T) {
 			Priority:  &highPriority,
 			Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
 				v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
-				v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
+				v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
 			},
 		}),
 	},
@@ -430,7 +431,7 @@ func TestPreemptionStarvation(t *testing.T) {
 	nodeRes := &v1.ResourceList{
 		v1.ResourcePods:   *resource.NewQuantity(32, resource.DecimalSI),
 		v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
-		v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI),
+		v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
 	}
 	_, err := createNode(context.clientSet, "node1", nodeRes)
 	if err != nil {
@@ -490,6 +491,119 @@ func TestPreemptionStarvation(t *testing.T) {
 	}
 }
 
+// TestPreemptionRaces tests that other scheduling events and operations do not
+// race with the preemption process.
+func TestPreemptionRaces(t *testing.T) {
+	// Initialize scheduler.
+	context := initTest(t, "preemption-race")
+	defer cleanupTest(t, context)
+	cs := context.clientSet
+
+	tests := []struct {
+		description       string
+		numInitialPods    int // Pods created and scheduled before running the preemptor.
+		numAdditionalPods int // Pods created after the preemptor.
+		numRepetitions    int // Number of times to repeat the test to check for races.
+		preemptor         *v1.Pod
+	}{
+		{
+			// This test ensures that while the preempting pod is waiting for its victims
+			// to terminate, other lower priority pods are not scheduled into the room
+			// created by the preemption before the higher priority pod is scheduled.
+			description:       "ensures that other pods are not scheduled while preemptor is being marked as nominated (issue #72124)",
+			numInitialPods:    2,
+			numAdditionalPods: 50,
+			numRepetitions:    10,
+			preemptor: initPausePod(cs, &pausePodConfig{
+				Name:      "preemptor-pod",
+				Namespace: context.ns.Name,
+				Priority:  &highPriority,
+				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
+					v1.ResourceCPU:    *resource.NewMilliQuantity(4900, resource.DecimalSI),
+					v1.ResourceMemory: *resource.NewQuantity(4900, resource.DecimalSI)},
+				},
+			}),
+		},
+	}
+
+	// Create a node with some resources.
+	nodeRes := &v1.ResourceList{
+		v1.ResourcePods:   *resource.NewQuantity(100, resource.DecimalSI),
+		v1.ResourceCPU:    *resource.NewMilliQuantity(5000, resource.DecimalSI),
+		v1.ResourceMemory: *resource.NewQuantity(5000, resource.DecimalSI),
+	}
+	_, err := createNode(context.clientSet, "node1", nodeRes)
+	if err != nil {
+		t.Fatalf("Error creating node: %v", err)
+	}
+
+	for _, test := range tests {
+		if test.numRepetitions <= 0 {
+			test.numRepetitions = 1
+		}
+		for n := 0; n < test.numRepetitions; n++ {
+			initialPods := make([]*v1.Pod, test.numInitialPods)
+			additionalPods := make([]*v1.Pod, test.numAdditionalPods)
+			// Create and run the initial pods.
+			for i := 0; i < test.numInitialPods; i++ {
+				initialPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(context, fmt.Sprintf("rpod-%v", i), mediumPriority, 0))
+				if err != nil {
+					t.Fatalf("Test [%v]: Error creating pause pod: %v", test.description, err)
+				}
+			}
+			// Make sure that the initial pods are all scheduled.
+			for _, p := range initialPods {
+				if err := waitForPodToSchedule(cs, p); err != nil {
+					t.Fatalf("Pod %v/%v didn't get scheduled: %v", p.Namespace, p.Name, err)
+				}
+			}
+			// Create the preemptor.
+			klog.Info("Creating the preemptor pod...")
+			preemptor, err := createPausePod(cs, test.preemptor)
+			if err != nil {
+				t.Errorf("Error while creating the preempting pod: %v", err)
+			}
+
+			klog.Info("Creating additional pods...")
+			for i := 0; i < test.numAdditionalPods; i++ {
+				additionalPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(context, fmt.Sprintf("ppod-%v", i), mediumPriority, 0))
+				if err != nil {
+					t.Fatalf("Test [%v]: Error creating pending pod: %v", test.description, err)
+				}
+			}
+			// Check that the preemptor pod gets the nominated node name set.
+			if err := waitForNominatedNodeName(cs, preemptor); err != nil {
+				t.Errorf("Test [%v]: .status.nominatedNodeName was not set for pod %v/%v: %v", test.description, preemptor.Namespace, preemptor.Name, err)
+			}
+			// Make sure that the preemptor is scheduled after the preemptions.
+			if err := waitForPodToScheduleWithTimeout(cs, preemptor, 60*time.Second); err != nil {
+				t.Errorf("Preemptor pod %v didn't get scheduled: %v", preemptor.Name, err)
+			}
+
+			klog.Info("Checking that the unschedulable pods still exist and were never scheduled...")
+			for _, p := range additionalPods {
+				pod, err := cs.CoreV1().Pods(p.Namespace).Get(p.Name, metav1.GetOptions{})
+				if err != nil {
+					t.Errorf("Error in getting Pod %v/%v info: %v", p.Namespace, p.Name, err)
+				}
+				if len(pod.Spec.NodeName) > 0 {
+					t.Errorf("Pod %v/%v is already scheduled", p.Namespace, p.Name)
+				}
+				_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
+				if cond != nil && cond.Status != v1.ConditionFalse {
+					t.Errorf("Pod %v/%v is no longer unschedulable; PodScheduled condition: %+v", p.Namespace, p.Name, cond)
+				}
+			}
+			// Cleanup.
+			klog.Info("Cleaning up all pods...")
+			allPods := additionalPods
+			allPods = append(allPods, initialPods...)
+			allPods = append(allPods, preemptor)
+			cleanupPods(cs, t, allPods)
+		}
+	}
+}
+
 // TestNominatedNodeCleanUp checks that when there are nominated pods on a
 // node and a higher priority pod is nominated to run on the node, the nominated
 // node name of the lower priority pods is cleared.
@@ -515,7 +629,7 @@ func TestNominatedNodeCleanUp(t *testing.T) {
 	nodeRes := &v1.ResourceList{
 		v1.ResourcePods:   *resource.NewQuantity(32, resource.DecimalSI),
 		v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
-		v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI),
+		v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
 	}
 	_, err := createNode(context.clientSet, "node1", nodeRes)
 	if err != nil {
@@ -543,7 +657,7 @@ func TestNominatedNodeCleanUp(t *testing.T) {
 		Priority:  &mediumPriority,
 		Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
 			v1.ResourceCPU:    *resource.NewMilliQuantity(400, resource.DecimalSI),
-			v1.ResourceMemory: *resource.NewQuantity(400, resource.BinarySI)},
+			v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)},
 		},
 	})
 	medPriPod, err := createPausePod(cs, podConf)
@@ -561,7 +675,7 @@ func TestNominatedNodeCleanUp(t *testing.T) {
 		Priority:  &highPriority,
 		Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
 			v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
-			v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
+			v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
 		},
 	})
 	highPriPod, err := createPausePod(cs, podConf)
@@ -626,12 +740,12 @@ func TestPDBInPreemption(t *testing.T) {
 
 	defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
 		v1.ResourceCPU:    *resource.NewMilliQuantity(100, resource.DecimalSI),
-		v1.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI)},
+		v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)},
 	}
 	defaultNodeRes := &v1.ResourceList{
 		v1.ResourcePods:   *resource.NewQuantity(32, resource.DecimalSI),
 		v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
-		v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI),
+		v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
 	}
 
 	type nodeConfig struct {
@@ -683,7 +797,7 @@ func TestPDBInPreemption(t *testing.T) {
 			Priority:  &highPriority,
 			Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
 				v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
-				v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
+				v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
 			},
 		}),
 		preemptedPodIndexes: map[int]struct{}{2: {}},
@@ -721,7 +835,7 @@ func TestPDBInPreemption(t *testing.T) {
 			Priority:  &highPriority,
 			Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
 				v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
-				v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
+				v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
 			},
 		}),
 		preemptedPodIndexes: map[int]struct{}{1: {}},
@@ -801,7 +915,7 @@ func TestPDBInPreemption(t *testing.T) {
 			Priority:  &highPriority,
 			Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
 				v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
-				v1.ResourceMemory: *resource.NewQuantity(400, resource.BinarySI)},
+				v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)},
 			},
 		}),
 		// The third node is chosen because PDB is not violated for node 3 and the victims have lower priority than node-2.
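Note on the recurring BinarySI -> DecimalSI change above: the format argument to resource.NewQuantity only controls how a Quantity is serialized by String(); the numeric value, and therefore comparisons such as Cmp, are unchanged. The standalone sketch below illustrates the difference. It is not part of the patch and only assumes the public k8s.io/apimachinery/pkg/api/resource API.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Same kind of quantity, two serialization formats.
	dec := resource.NewQuantity(5000, resource.DecimalSI)
	bin := resource.NewQuantity(5120, resource.BinarySI)

	fmt.Println(dec.String()) // "5k"  - DecimalSI canonicalizes with powers of 10.
	fmt.Println(bin.String()) // "5Ki" - BinarySI canonicalizes with powers of 2 (1Ki = 1024).

	// Values that are not clean multiples print the same either way.
	fmt.Println(resource.NewQuantity(500, resource.DecimalSI).String()) // "500"
	fmt.Println(resource.NewQuantity(500, resource.BinarySI).String())  // "500"

	// The format never changes the value itself.
	fmt.Println(dec.Value(), bin.Value()) // 5000 5120
}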
diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go
index 11e990c665..c35502376d 100644
--- a/test/integration/scheduler/util.go
+++ b/test/integration/scheduler/util.go
@@ -582,6 +582,17 @@ func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
 	}
 }
 
+// podSchedulableCondition returns the PodScheduled condition of the given pod, or an
+// error if the pod could not be retrieved; the condition is nil if it has not been set.
+func podSchedulableCondition(c clientset.Interface, podNamespace, podName string) (*v1.PodCondition, error) {
+	pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
+	if err != nil {
+		return nil, err
+	}
+	_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
+	return cond, nil
+}
+
 // podUnschedulable returns a condition function that returns true if the given pod
 // gets unschedulable status.
 func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
@@ -710,7 +721,7 @@ func cleanupPods(cs clientset.Interface, t *testing.T, pods []*v1.Pod) {
 		}
 	}
 	for _, p := range pods {
-		if err := wait.Poll(time.Second, wait.ForeverTestTimeout,
+		if err := wait.Poll(time.Millisecond, wait.ForeverTestTimeout,
 			podDeleted(cs, p.Namespace, p.Name)); err != nil {
 			t.Errorf("error while waiting for pod %v/%v to get deleted: %v", p.Namespace, p.Name, err)
 		}
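As a usage note, the new podSchedulableCondition helper returns the raw PodScheduled condition rather than a wait.ConditionFunc, leaving the polling policy to the caller. A minimal sketch of how a test could wrap it follows; waitForPodUnschedulable is illustrative only, is not part of this patch, and assumes the imports already present in util.go (clientset, wait, v1, time).

// waitForPodUnschedulable polls podSchedulableCondition until the pod reports
// PodScheduled=False with reason Unschedulable, or the timeout expires.
// Illustrative helper; not added by this patch.
func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
	return wait.Poll(100*time.Millisecond, timeout, func() (bool, error) {
		cond, err := podSchedulableCondition(cs, pod.Namespace, pod.Name)
		if err != nil {
			return false, err
		}
		return cond != nil &&
			cond.Status == v1.ConditionFalse &&
			cond.Reason == v1.PodReasonUnschedulable, nil
	})
}

The package's existing podUnschedulable helper already covers the common polling case; the sketch only shows how exposing the condition itself allows finer-grained assertions on its Reason or Message.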