Merge pull request #62933 from bsalamat/fix_pdb

Automatic merge from submit-queue (batch tested with PRs 63914, 63887, 64116, 64026, 62933). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Fix preemption tests that use PDB

**What this PR does / why we need it**:
Scheduler integration tests that test preemption in presence of PDB had an issue causing PDB status not getting updated. This PR fixes the issue.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```

/sig scheduling

xref/ #57057
pull/8/head
Kubernetes Submit Queue 2018-05-22 17:36:22 -07:00 committed by GitHub
commit b5cd7d81bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 126 additions and 21 deletions

View File

@ -89,6 +89,8 @@ go_library(
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/api/v1/pod:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/disruption:go_default_library",
"//pkg/features:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
@ -97,9 +99,11 @@ go_library(
"//test/integration/framework:go_default_library",
"//test/utils/image:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/policy/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",

View File

@ -27,7 +27,6 @@ import (
policy "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
@ -601,6 +600,18 @@ func mkMinAvailablePDB(name, namespace string, uid types.UID, minAvailable int,
}
}
func addPodConditionReady(pod *v1.Pod) {
pod.Status = v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
},
}
}
// TestPDBInPreemption tests PodDisruptionBudget support in preemption.
func TestPDBInPreemption(t *testing.T) {
// Enable PodPriority feature gate.
@ -610,6 +621,8 @@ func TestPDBInPreemption(t *testing.T) {
defer cleanupTest(t, context)
cs := context.clientSet
initDisruptionController(context)
defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI)},
@ -629,6 +642,7 @@ func TestPDBInPreemption(t *testing.T) {
description string
nodes []*nodeConfig
pdbs []*policy.PodDisruptionBudget
pdbPodNum []int32
existingPods []*v1.Pod
pod *v1.Pod
preemptedPodIndexes map[int]struct{}
@ -639,6 +653,7 @@ func TestPDBInPreemption(t *testing.T) {
pdbs: []*policy.PodDisruptionBudget{
mkMinAvailablePDB("pdb-1", context.ns.Name, types.UID("pdb-1-uid"), 2, map[string]string{"foo": "bar"}),
},
pdbPodNum: []int32{2},
existingPods: []*v1.Pod{
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod1",
@ -681,6 +696,7 @@ func TestPDBInPreemption(t *testing.T) {
pdbs: []*policy.PodDisruptionBudget{
mkMinAvailablePDB("pdb-1", context.ns.Name, types.UID("pdb-1-uid"), 2, map[string]string{"foo": "bar"}),
},
pdbPodNum: []int32{1},
existingPods: []*v1.Pod{
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod1",
@ -720,6 +736,7 @@ func TestPDBInPreemption(t *testing.T) {
mkMinAvailablePDB("pdb-1", context.ns.Name, types.UID("pdb-1-uid"), 2, map[string]string{"foo1": "bar"}),
mkMinAvailablePDB("pdb-2", context.ns.Name, types.UID("pdb-2-uid"), 2, map[string]string{"foo2": "bar"}),
},
pdbPodNum: []int32{1, 5},
existingPods: []*v1.Pod{
initPausePod(context.clientSet, &pausePodConfig{
Name: "low-pod1",
@ -783,38 +800,22 @@ func TestPDBInPreemption(t *testing.T) {
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
v1.ResourceMemory: *resource.NewQuantity(400, resource.BinarySI)},
},
}),
preemptedPodIndexes: map[int]struct{}{0: {}, 1: {}},
// The third node is chosen because PDB is not violated for node 3 and the victims have lower priority than node-2.
preemptedPodIndexes: map[int]struct{}{4: {}, 5: {}, 6: {}},
},
}
for _, test := range tests {
t.Logf("================ Running test: %v\n", test.description)
for _, nodeConf := range test.nodes {
_, err := createNode(cs, nodeConf.name, nodeConf.res)
if err != nil {
t.Fatalf("Error creating node %v: %v", nodeConf.name, err)
}
}
// Create PDBs.
for _, pdb := range test.pdbs {
_, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Create(pdb)
if err != nil {
t.Fatalf("Failed to create PDB: %v", err)
}
}
// Wait for PDBs to show up in the scheduler's cache.
if err := wait.Poll(time.Second, 15*time.Second, func() (bool, error) {
cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything())
if err != nil {
t.Errorf("Error while polling for PDB: %v", err)
return false, err
}
return len(cachedPDBs) == len(test.pdbs), err
}); err != nil {
t.Fatalf("Not all PDBs were added to the cache: %v", err)
}
pods := make([]*v1.Pod, len(test.existingPods))
var err error
@ -823,7 +824,29 @@ func TestPDBInPreemption(t *testing.T) {
if pods[i], err = runPausePod(cs, p); err != nil {
t.Fatalf("Test [%v]: Error running pause pod: %v", test.description, err)
}
// Add pod condition ready so that PDB is updated.
addPodConditionReady(p)
if _, err := context.clientSet.CoreV1().Pods(context.ns.Name).UpdateStatus(p); err != nil {
t.Fatal(err)
}
}
// Wait for Pods to be stable in scheduler cache.
if err := waitCachedPodsStable(context, test.existingPods); err != nil {
t.Fatalf("Not all pods are stable in the cache: %v", err)
}
// Create PDBs.
for _, pdb := range test.pdbs {
_, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Create(pdb)
if err != nil {
t.Fatalf("Failed to create PDB: %v", err)
}
}
// Wait for PDBs to show up in the scheduler's cache and become stable.
if err := waitCachedPDBsStable(context, test.pdbs, test.pdbPodNum); err != nil {
t.Fatalf("Not all pdbs are stable in the cache: %v", err)
}
// Create the "pod".
preemptor, err := createPausePod(cs, test.pod)
if err != nil {

View File

@ -24,9 +24,11 @@ import (
"time"
"k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
@ -42,6 +44,8 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/legacyscheme"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/disruption"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
@ -194,6 +198,7 @@ func initTestSchedulerWithOptions(
// set setPodInformer if provided.
if setPodInformer {
go podInformer.Informer().Run(context.schedulerConfig.StopEverything)
controller.WaitForCacheSync("scheduler", context.schedulerConfig.StopEverything, podInformer.Informer().HasSynced)
}
eventBroadcaster := record.NewBroadcaster()
@ -218,6 +223,26 @@ func initTestSchedulerWithOptions(
return context
}
// initDisruptionController initializes and runs a Disruption Controller to properly
// update PodDisuptionBudget objects.
func initDisruptionController(context *TestContext) *disruption.DisruptionController {
informers := informers.NewSharedInformerFactory(context.clientSet, 12*time.Hour)
dc := disruption.NewDisruptionController(
informers.Core().V1().Pods(),
informers.Policy().V1beta1().PodDisruptionBudgets(),
informers.Core().V1().ReplicationControllers(),
informers.Extensions().V1beta1().ReplicaSets(),
informers.Extensions().V1beta1().Deployments(),
informers.Apps().V1beta1().StatefulSets(),
context.clientSet)
informers.Start(context.schedulerConfig.StopEverything)
informers.WaitForCacheSync(context.schedulerConfig.StopEverything)
go dc.Run(context.schedulerConfig.StopEverything)
return dc
}
// initTest initializes a test environment and creates master and scheduler with default
// configuration.
func initTest(t *testing.T, nsPrefix string) *TestContext {
@ -514,6 +539,59 @@ func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
return waitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second)
}
// waitCachedPDBsStable waits for PDBs in scheduler cache to have "CurrentHealthy" status equal to
// the expected values.
func waitCachedPDBsStable(context *TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error {
return wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything())
if err != nil {
return false, err
}
if len(cachedPDBs) != len(pdbs) {
return false, nil
}
for i, pdb := range pdbs {
found := false
for _, cpdb := range cachedPDBs {
if pdb.Name == cpdb.Name && pdb.Namespace == cpdb.Namespace {
found = true
if cpdb.Status.CurrentHealthy != pdbPodNum[i] {
return false, nil
}
}
}
if !found {
return false, nil
}
}
return true, nil
})
}
// waitCachedPodsStable waits until scheduler cache has the given pods.
func waitCachedPodsStable(context *TestContext, pods []*v1.Pod) error {
return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
cachedPods, err := context.scheduler.Config().SchedulerCache.List(labels.Everything())
if err != nil {
return false, err
}
if len(pods) != len(cachedPods) {
return false, nil
}
for _, p := range pods {
actualPod, err1 := context.clientSet.CoreV1().Pods(p.Namespace).Get(p.Name, metav1.GetOptions{})
if err1 != nil {
return false, err1
}
cachedPod, err2 := context.scheduler.Config().SchedulerCache.GetPod(actualPod)
if err2 != nil || cachedPod == nil {
return false, err2
}
}
return true, nil
})
}
// deletePod deletes the given pod in the given namespace.
func deletePod(cs clientset.Interface, podName string, nsName string) error {
return cs.CoreV1().Pods(nsName).Delete(podName, metav1.NewDeleteOptions(0))