Test rescheduling on various events.

- Add a resyncPeriod parameter to setupCluster so the scheduler's resync
  period is configurable (see the usage sketch below).
- Add a test case for static provisioning with a delay-binding storage
  class. Move pods into the active queue on PV add/update events.
- Add a stress test with scheduler resync enabled to detect possible race
  conditions.
Yecheng Fu 2018-07-20 09:54:46 +08:00
parent 8f0373792f
commit c868b5bc88
3 changed files with 191 additions and 24 deletions
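For reference, a minimal sketch of how a test opts into the two new knobs (setupCluster, the features map, and the resync parameter follow the diff below; the test name and namespace here are illustrative):

func TestExample(t *testing.T) {
	// Feature gates to enable for this test; setupCluster records the old
	// values and restores them in config.teardown().
	features := map[string]bool{
		"VolumeScheduling":       true,
		"PersistentLocalVolumes": true,
	}
	// A non-zero resync period makes the scheduler's informers replay their
	// caches periodically, approximating the event churn of a real cluster.
	config := setupCluster(t, "example", 1, features, time.Second)
	defer config.teardown()
}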


@@ -30,6 +30,7 @@ go_test(
         "//pkg/client/informers/informers_generated/internalversion:go_default_library",
         "//pkg/controller/nodelifecycle:go_default_library",
         "//pkg/controller/volume/persistentvolume:go_default_library",
+        "//pkg/controller/volume/persistentvolume/options:go_default_library",
         "//pkg/features:go_default_library",
         "//pkg/kubeapiserver/admission:go_default_library",
         "//pkg/scheduler:go_default_library",
@@ -38,6 +39,8 @@ go_test(
         "//pkg/scheduler/api:go_default_library",
         "//pkg/scheduler/cache:go_default_library",
         "//pkg/scheduler/factory:go_default_library",
+        "//pkg/volume:go_default_library",
+        "//pkg/volume/testing:go_default_library",
         "//plugin/pkg/admission/podtolerationrestriction:go_default_library",
         "//plugin/pkg/admission/podtolerationrestriction/apis/podtolerationrestriction:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",


@@ -141,7 +141,7 @@ func initTestScheduler(
 ) *TestContext {
	// Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority
	// feature gate is enabled at the same time.
-	return initTestSchedulerWithOptions(t, context, controllerCh, setPodInformer, policy, false)
+	return initTestSchedulerWithOptions(t, context, controllerCh, setPodInformer, policy, false, time.Second)
 }

 // initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
@@ -153,6 +153,7 @@ func initTestSchedulerWithOptions(
	setPodInformer bool,
	policy *schedulerapi.Policy,
	disablePreemption bool,
+	resyncPeriod time.Duration,
 ) *TestContext {
	// Enable EnableEquivalenceClassCache for all integration tests.
	defer utilfeaturetesting.SetFeatureGateDuringTest(
@@ -161,7 +162,7 @@ func initTestSchedulerWithOptions(
		features.EnableEquivalenceClassCache, true)()

	// 1. Create scheduler
-	context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, time.Second)
+	context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, resyncPeriod)

	var podInformer coreinformers.PodInformer
@@ -253,7 +254,7 @@ func initTest(t *testing.T, nsPrefix string) *TestContext {
 // configuration but with pod preemption disabled.
 func initTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
	return initTestSchedulerWithOptions(
-		t, initTestMaster(t, nsPrefix, nil), nil, true, nil, true)
+		t, initTestMaster(t, nsPrefix, nil), nil, true, nil, true, time.Second)
 }

 // cleanupTest deletes the scheduler and the test namespace. It should be called
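A note on what the resyncPeriod threaded through above actually controls: a shared informer's resync replays every object in its local cache to registered handlers as an update event; it does not re-list from the API server. A minimal client-go sketch (the function name and handler body are illustrative):

import (
	"time"

	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// startPodInformer illustrates the resync knob: every second, cached pods are
// redelivered to UpdateFunc with identical old and new objects.
func startPodInformer(client clientset.Interface, stopCh <-chan struct{}) {
	factory := informers.NewSharedInformerFactory(client, time.Second)
	factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(oldObj, newObj interface{}) {
			// Fires on real updates and on periodic resyncs.
		},
	})
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)
}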


@@ -34,8 +34,12 @@ import (
	"k8s.io/apimachinery/pkg/util/rand"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
+	persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options"
+	"k8s.io/kubernetes/pkg/volume"
+	volumetest "k8s.io/kubernetes/pkg/volume/testing"
 )

 type testConfig struct {
@@ -58,11 +62,12 @@ var (
 )

 const (
	node1                 = "node-1"
	node2                 = "node-2"
	podLimit              = 100
	volsPerPod            = 5
	nodeAffinityLabelKey  = "kubernetes.io/hostname"
+	provisionerPluginName = "kubernetes.io/mock-provisioner"
 )

 type testPV struct {
@@ -79,7 +84,11 @@ type testPVC struct {
 }

 func TestVolumeBinding(t *testing.T) {
-	config := setupCluster(t, "volume-scheduling", 2)
+	features := map[string]bool{
+		"VolumeScheduling":       true,
+		"PersistentLocalVolumes": true,
+	}
+	config := setupCluster(t, "volume-scheduling", 2, features, 0)
	defer config.teardown()

	cases := map[string]struct {
@@ -246,9 +255,124 @@ func TestVolumeBinding(t *testing.T) {
	}
 }

+// TestVolumeBindingRescheduling tests that the scheduler retries scheduling when needed.
+func TestVolumeBindingRescheduling(t *testing.T) {
+	features := map[string]bool{
+		"VolumeScheduling":              true,
+		"PersistentLocalVolumes":        true,
+		"DynamicProvisioningScheduling": true,
+	}
+	config := setupCluster(t, "volume-scheduling", 2, features, 0)
+	defer config.teardown()
+
+	storageClassName := "local-storage"
+
+	cases := map[string]struct {
+		pod        *v1.Pod
+		pvcs       []*testPVC
+		trigger    func(config *testConfig)
+		shouldFail bool
+	}{
+		"reschedule on WaitForFirstConsumer dynamic storage class add": {
+			pod: makePod("pod-reschedule-onclassadd-dynamic", config.ns, []string{"pvc-reschedule-onclassadd-dynamic"}),
+			pvcs: []*testPVC{
+				{"pvc-reschedule-onclassadd-dynamic", "", ""},
+			},
+			trigger: func(config *testConfig) {
+				sc := makeDynamicProvisionerStorageClass(storageClassName, &modeWait)
+				if _, err := config.client.StorageV1().StorageClasses().Create(sc); err != nil {
+					t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
+				}
+			},
+			shouldFail: false,
+		},
+		"reschedule on WaitForFirstConsumer static storage class add": {
+			pod: makePod("pod-reschedule-onclassadd-static", config.ns, []string{"pvc-reschedule-onclassadd-static"}),
+			pvcs: []*testPVC{
+				{"pvc-reschedule-onclassadd-static", "", ""},
+			},
+			trigger: func(config *testConfig) {
+				sc := makeStorageClass(storageClassName, &modeWait)
+				if _, err := config.client.StorageV1().StorageClasses().Create(sc); err != nil {
+					t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
+				}
+				// Create a PV for this class to mock static provisioner behavior.
+				pv := makePV("pv-reschedule-onclassadd-static", storageClassName, "", "", node1)
+				if _, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil {
+					t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
+				}
+			},
+			shouldFail: false,
+		},
+		// TODO: test rescheduling on PVC add/update events.
+	}
+
+	for name, test := range cases {
+		glog.Infof("Running test %v", name)
+
+		if test.pod == nil {
+			t.Fatal("pod is required for this test")
+		}
+
+		// Create unbound PVCs.
+		for _, pvcConfig := range test.pvcs {
+			pvc := makePVC(pvcConfig.name, config.ns, &storageClassName, "")
+			if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(pvc); err != nil {
+				t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
+			}
+		}
+
+		// Create the pod.
+		if _, err := config.client.CoreV1().Pods(config.ns).Create(test.pod); err != nil {
+			t.Fatalf("Failed to create Pod %q: %v", test.pod.Name, err)
+		}
+
+		// Wait for the pod to become unschedulable.
+		glog.Infof("Waiting for pod to become unschedulable")
+		if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
+			t.Errorf("Failed as Pod %s was not unschedulable: %v", test.pod.Name, err)
+		}
+
+		// Trigger the event that should cause rescheduling.
+		test.trigger(config)
+
+		// Wait for the pod to be scheduled, or to remain unschedulable.
+		if !test.shouldFail {
+			glog.Infof("Waiting for pod to be scheduled")
+			if err := waitForPodToSchedule(config.client, test.pod); err != nil {
+				t.Errorf("Failed to schedule Pod %q: %v", test.pod.Name, err)
+			}
+		} else {
+			glog.Infof("Waiting for pod to remain unschedulable")
+			if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
+				t.Errorf("Failed as Pod %s was not unschedulable: %v", test.pod.Name, err)
+			}
+		}
+
+		// Force delete objects; they may not be removed immediately.
+		deleteTestObjects(config.client, config.ns, deleteOption)
+	}
+}
+
 // TestVolumeBindingStress creates <podLimit> pods, each with <volsPerPod> unbound PVCs.
 func TestVolumeBindingStress(t *testing.T) {
-	config := setupCluster(t, "volume-binding-stress", 1)
+	testVolumeBindingStress(t, 0)
+}
+
+// Like TestVolumeBindingStress, but with scheduler resync enabled. In a real
+// cluster the scheduler retries failed pods frequently due to various events,
+// e.g. service/node updates, so this is useful for detecting possible race
+// conditions.
+func TestVolumeBindingStressWithSchedulerResync(t *testing.T) {
+	testVolumeBindingStress(t, time.Second)
+}
+
+func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration) {
+	features := map[string]bool{
+		"VolumeScheduling":       true,
+		"PersistentLocalVolumes": true,
+	}
+	config := setupCluster(t, "volume-binding-stress", 1, features, schedulerResyncPeriod)
	defer config.teardown()

	// Create enough PVs and PVCs for all the pods
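These test hunks exercise the commit-message item "Move pods into active queue on PV add/update events"; the scheduler-side wiring lives in the scheduler factory. A minimal sketch of that wiring, assuming a queue exposing the MoveAllToActiveQueue method of this era's SchedulingQueue interface (coreinformers is k8s.io/client-go/informers/core/v1; the helper names are mine):

// schedulingQueue is the subset of the scheduler's queue used in this sketch.
type schedulingQueue interface {
	MoveAllToActiveQueue()
}

// wirePVEvents is a sketch, not the factory's exact code: a PV add or update
// can make a previously unschedulable pod schedulable, so all unschedulable
// pods are moved back to the active queue for another scheduling attempt.
func wirePVEvents(pvInformer coreinformers.PersistentVolumeInformer, queue schedulingQueue) {
	pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { queue.MoveAllToActiveQueue() },
		UpdateFunc: func(oldObj, newObj interface{}) { queue.MoveAllToActiveQueue() },
	})
}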
@@ -303,7 +427,11 @@
 }

 func TestPVAffinityConflict(t *testing.T) {
-	config := setupCluster(t, "volume-scheduling", 3)
+	features := map[string]bool{
+		"VolumeScheduling":       true,
+		"PersistentLocalVolumes": true,
+	}
+	config := setupCluster(t, "volume-scheduling", 3, features, 0)
	defer config.teardown()

	pv := makePV("local-pv", classImmediate, "", "", node1)
@@ -361,30 +489,51 @@
	}
 }

-func setupCluster(t *testing.T, nsName string, numberOfNodes int) *testConfig {
-	// Enable feature gates
-	utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true,PersistentLocalVolumes=true")
+func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[string]bool, resyncPeriod time.Duration) *testConfig {
+	oldFeatures := make(map[string]bool, len(features))
+	for feature := range features {
+		oldFeatures[feature] = utilfeature.DefaultFeatureGate.Enabled(utilfeature.Feature(feature))
+	}
+	// Set feature gates.
+	utilfeature.DefaultFeatureGate.SetFromMap(features)

	controllerCh := make(chan struct{})

-	context := initTestScheduler(t, initTestMaster(t, nsName, nil), controllerCh, false, nil)
+	context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), controllerCh, false, nil, false, resyncPeriod)
	clientset := context.clientSet
	ns := context.ns.Name
-	informers := context.informerFactory
+	// Informer factory for the controllers; resync is disabled for testing.
+	informerFactory := informers.NewSharedInformerFactory(context.clientSet, 0)

	// Start PV controller for volume binding.
+	host := volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil)
+	plugin := &volumetest.FakeVolumePlugin{
+		PluginName:             provisionerPluginName,
+		Host:                   host,
+		Config:                 volume.VolumeConfig{},
+		LastProvisionerOptions: volume.VolumeOptions{},
+		NewAttacherCallCount:   0,
+		NewDetacherCallCount:   0,
+		Mounters:               nil,
+		Unmounters:             nil,
+		Attachers:              nil,
+		Detachers:              nil,
+	}
+	plugins := []volume.VolumePlugin{plugin}
+	controllerOptions := persistentvolumeoptions.NewPersistentVolumeControllerOptions()
	params := persistentvolume.ControllerParameters{
		KubeClient:                clientset,
-		SyncPeriod:                time.Hour, // test shouldn't need to resync
-		VolumePlugins:             nil,       // TODO; need later for dynamic provisioning
+		SyncPeriod:                controllerOptions.PVClaimBinderSyncPeriod,
+		VolumePlugins:             plugins,
		Cloud:                     nil,
		ClusterName:               "volume-test-cluster",
-		VolumeInformer:            informers.Core().V1().PersistentVolumes(),
-		ClaimInformer:             informers.Core().V1().PersistentVolumeClaims(),
-		ClassInformer:             informers.Storage().V1().StorageClasses(),
-		PodInformer:               informers.Core().V1().Pods(),
-		NodeInformer:              informers.Core().V1().Nodes(),
+		VolumeInformer:            informerFactory.Core().V1().PersistentVolumes(),
+		ClaimInformer:             informerFactory.Core().V1().PersistentVolumeClaims(),
+		ClassInformer:             informerFactory.Storage().V1().StorageClasses(),
+		PodInformer:               informerFactory.Core().V1().Pods(),
+		NodeInformer:              informerFactory.Core().V1().Nodes(),
		EnableDynamicProvisioning: true,
	}
	ctrl, err := persistentvolume.NewController(params)
@@ -392,6 +541,9 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int) *testConfig {
		t.Fatalf("Failed to create PV controller: %v", err)
	}
	go ctrl.Run(controllerCh)
+	// Start informer factory after all controllers are configured and running.
+	informerFactory.Start(controllerCh)
+	informerFactory.WaitForCacheSync(controllerCh)

	// Create shared objects
	// Create nodes
@@ -439,7 +591,8 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int) *testConfig {
		teardown: func() {
			deleteTestObjects(clientset, ns, nil)
			cleanupTest(t, context)
-			utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false,LocalPersistentVolumes=false")
+			// Restore feature gates.
+			utilfeature.DefaultFeatureGate.SetFromMap(oldFeatures)
		},
	}
 }
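Why PluginName must equal provisionerPluginName above: the PV controller selects a provisioner by matching a claim's StorageClass.Provisioner against the names of its registered plugins (GetPluginName is part of the volume.VolumePlugin interface). A conceptual sketch of that lookup, not the controller's actual code:

// findProvisioner returns the plugin whose name matches the class's
// Provisioner field, which is why the test's dynamic storage classes are
// created with provisionerPluginName.
func findProvisioner(plugins []volume.VolumePlugin, class *storagev1.StorageClass) volume.VolumePlugin {
	for _, plugin := range plugins {
		if plugin.GetPluginName() == class.Provisioner {
			return plugin
		}
	}
	return nil
}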
@@ -461,6 +614,16 @@ func makeStorageClass(name string, mode *storagev1.VolumeBindingMode) *storagev1.StorageClass {
	}
 }

+func makeDynamicProvisionerStorageClass(name string, mode *storagev1.VolumeBindingMode) *storagev1.StorageClass {
+	return &storagev1.StorageClass{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: name,
+		},
+		Provisioner:       provisionerPluginName,
+		VolumeBindingMode: mode,
+	}
+}
+
 func makePV(name, scName, pvcName, ns, node string) *v1.PersistentVolume {
	pv := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{