diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder.go b/pkg/controller/volume/persistentvolume/scheduler_binder.go
index bec90c1b8a..12a851cdfd 100644
--- a/pkg/controller/volume/persistentvolume/scheduler_binder.go
+++ b/pkg/controller/volume/persistentvolume/scheduler_binder.go
@@ -151,7 +151,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume
 
 	// Immediate claims should be bound
 	if len(unboundClaimsImmediate) > 0 {
-		return false, false, fmt.Errorf("pod has unbound PersistentVolumeClaims")
+		return false, false, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims")
 	}
 
 	// Check PV node affinity on bound volumes
diff --git a/pkg/scheduler/factory/BUILD b/pkg/scheduler/factory/BUILD
index b7a094125c..971c59b272 100644
--- a/pkg/scheduler/factory/BUILD
+++ b/pkg/scheduler/factory/BUILD
@@ -34,6 +34,7 @@ go_library(
         "//pkg/scheduler/volumebinder:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
+        "//staging/src/k8s.io/api/storage/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go
index e65ed29bcc..5c5dfa5b49 100644
--- a/pkg/scheduler/factory/factory.go
+++ b/pkg/scheduler/factory/factory.go
@@ -29,6 +29,7 @@ import (
 
 	"k8s.io/api/core/v1"
 	"k8s.io/api/policy/v1beta1"
+	storagev1 "k8s.io/api/storage/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
@@ -300,6 +301,13 @@ func NewConfigFactory(
 	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
 		// Setup volume binder
 		c.volumeBinder = volumebinder.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer)
+
+		storageClassInformer.Informer().AddEventHandler(
+			cache.ResourceEventHandlerFuncs{
+				AddFunc:    c.onStorageClassAdd,
+				DeleteFunc: c.onStorageClassDelete,
+			},
+		)
 	}
 
 	// Setup cache comparer
@@ -384,6 +392,13 @@ func (c *configFactory) onPvAdd(obj interface{}) {
 		}
 		c.invalidatePredicatesForPv(pv)
 	}
+	// Pods created when there are no PVs available will be stuck in the
+	// unschedulable queue. But unbound PVs created for static provisioning
+	// and storage classes with delayed binding are skipped by the PV
+	// controller's dynamic provisioning and binding process, and so will not
+	// trigger events that reschedule the pod. We therefore need to move pods
+	// to the active queue on PV add for this scenario.
+	c.podQueue.MoveAllToActiveQueue()
 }
 
 func (c *configFactory) onPvUpdate(old, new interface{}) {
@@ -400,6 +415,11 @@ func (c *configFactory) onPvUpdate(old, new interface{}) {
 		}
 		c.invalidatePredicatesForPvUpdate(oldPV, newPV)
 	}
+	// Scheduler.bindVolumesWorker may fail to update the assumed pod volume
+	// bindings due to conflicts if PVs are updated by the PV controller or
+	// other parties; the scheduler then adds the pod back to the unschedulable
+	// queue, so we need to move pods to the active queue on PV update too.
+	c.podQueue.MoveAllToActiveQueue()
 }
 
 func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.PersistentVolume) {
@@ -564,6 +584,59 @@ func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.Persistent
 	c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
 }
 
+func (c *configFactory) onStorageClassAdd(obj interface{}) {
+	sc, ok := obj.(*storagev1.StorageClass)
+	if !ok {
+		glog.Errorf("cannot convert to *storagev1.StorageClass: %v", obj)
+		return
+	}
+
+	// CheckVolumeBindingPred fails if the pod has unbound immediate PVCs. If
+	// those PVCs specify a StorageClass name, creating a StorageClass object
+	// with delayed binding will make the predicate pass, so we need to move
+	// pods to the active queue.
+	// We don't need to invalidate cached results because results will not be
+	// cached for a pod that has unbound immediate PVCs.
+	if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
+		c.podQueue.MoveAllToActiveQueue()
+	}
+}
+
+func (c *configFactory) onStorageClassDelete(obj interface{}) {
+	if c.enableEquivalenceClassCache {
+		var sc *storagev1.StorageClass
+		switch t := obj.(type) {
+		case *storagev1.StorageClass:
+			sc = t
+		case cache.DeletedFinalStateUnknown:
+			var ok bool
+			sc, ok = t.Obj.(*storagev1.StorageClass)
+			if !ok {
+				glog.Errorf("cannot convert to *storagev1.StorageClass: %v", t.Obj)
+				return
+			}
+		default:
+			glog.Errorf("cannot convert to *storagev1.StorageClass: %v", t)
+			return
+		}
+		c.invalidatePredicatesForStorageClass(sc)
+	}
+}
+
+func (c *configFactory) invalidatePredicatesForStorageClass(sc *storagev1.StorageClass) {
+	invalidPredicates := sets.NewString()
+
+	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
+		if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
+			// Delete can cause predicates to fail
+			invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
+			invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
+		}
+	}
+
+	c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
+}
+
 func (c *configFactory) onServiceAdd(obj interface{}) {
 	if c.enableEquivalenceClassCache {
 		c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet)
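Both new handlers gate on the same optional-pointer check against VolumeBindingWaitForFirstConsumer. A minimal sketch of that check factored into a helper (hypothetical; the patch inlines the comparison in both places), using only the storage/v1 API types:

	// isWaitForFirstConsumer reports whether sc delays volume binding until a
	// consuming pod is scheduled. VolumeBindingMode is an optional pointer, so
	// nil means the default mode (Immediate) and yields false here.
	func isWaitForFirstConsumer(sc *storagev1.StorageClass) bool {
		return sc.VolumeBindingMode != nil &&
			*sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer
	}
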
diff --git a/test/integration/scheduler/BUILD b/test/integration/scheduler/BUILD
index 7b80b65e61..bcc0f35c80 100644
--- a/test/integration/scheduler/BUILD
+++ b/test/integration/scheduler/BUILD
@@ -30,6 +30,7 @@ go_test(
         "//pkg/client/informers/informers_generated/internalversion:go_default_library",
         "//pkg/controller/nodelifecycle:go_default_library",
         "//pkg/controller/volume/persistentvolume:go_default_library",
+        "//pkg/controller/volume/persistentvolume/options:go_default_library",
         "//pkg/features:go_default_library",
         "//pkg/kubeapiserver/admission:go_default_library",
         "//pkg/scheduler:go_default_library",
@@ -38,6 +39,8 @@ go_test(
         "//pkg/scheduler/api:go_default_library",
         "//pkg/scheduler/cache:go_default_library",
         "//pkg/scheduler/factory:go_default_library",
+        "//pkg/volume:go_default_library",
+        "//pkg/volume/testing:go_default_library",
         "//plugin/pkg/admission/podtolerationrestriction:go_default_library",
         "//plugin/pkg/admission/podtolerationrestriction/apis/podtolerationrestriction:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go
index e40994b5d5..a2f954109f 100644
--- a/test/integration/scheduler/util.go
+++ b/test/integration/scheduler/util.go
@@ -142,7 +142,7 @@ func initTestScheduler(
 ) *TestContext {
 	// Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority
 	// feature gate is enabled at the same time.
-	return initTestSchedulerWithOptions(t, context, controllerCh, setPodInformer, policy, false)
+	return initTestSchedulerWithOptions(t, context, controllerCh, setPodInformer, policy, false, time.Second)
 }
 
 // initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
@@ -154,6 +154,7 @@ func initTestSchedulerWithOptions(
 	setPodInformer bool,
 	policy *schedulerapi.Policy,
 	disablePreemption bool,
+	resyncPeriod time.Duration,
 ) *TestContext {
 	// Enable EnableEquivalenceClassCache for all integration tests.
 	defer utilfeaturetesting.SetFeatureGateDuringTest(
@@ -162,7 +163,7 @@ func initTestSchedulerWithOptions(
 		features.EnableEquivalenceClassCache, true)()
 
 	// 1. Create scheduler
-	context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, time.Second)
+	context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, resyncPeriod)
 
 	var podInformer coreinformers.PodInformer
 
@@ -254,7 +255,7 @@ func initTest(t *testing.T, nsPrefix string) *TestContext {
 // configuration but with pod preemption disabled.
 func initTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
 	return initTestSchedulerWithOptions(
-		t, initTestMaster(t, nsPrefix, nil), nil, true, nil, true)
+		t, initTestMaster(t, nsPrefix, nil), nil, true, nil, true, time.Second)
 }
 
 // cleanupTest deletes the scheduler and the test namespace. It should be called
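For reference, a hypothetical caller of the widened initTestSchedulerWithOptions signature; only the parameter order comes from the patch, the argument values are illustrative:

	// Build a test context with a 1s informer resync period. Pass 0 to
	// disable resync entirely, as the volume binding tests below do.
	context := initTestSchedulerWithOptions(
		t, initTestMaster(t, "my-test", nil),
		nil,         // controllerCh
		true,        // setPodInformer
		nil,         // policy
		false,       // disablePreemption
		time.Second, // resyncPeriod
	)
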
diff --git a/test/integration/scheduler/volume_binding_test.go b/test/integration/scheduler/volume_binding_test.go
index 6a55459041..989df65870 100644
--- a/test/integration/scheduler/volume_binding_test.go
+++ b/test/integration/scheduler/volume_binding_test.go
@@ -34,8 +34,12 @@ import (
 	"k8s.io/apimachinery/pkg/util/rand"
 	"k8s.io/apimachinery/pkg/util/wait"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
+	persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options"
+	"k8s.io/kubernetes/pkg/volume"
+	volumetest "k8s.io/kubernetes/pkg/volume/testing"
 )
 
 type testConfig struct {
@@ -58,11 +62,12 @@ var (
 )
 
 const (
-	node1                = "node-1"
-	node2                = "node-2"
-	podLimit             = 100
-	volsPerPod           = 5
-	nodeAffinityLabelKey = "kubernetes.io/hostname"
+	node1                 = "node-1"
+	node2                 = "node-2"
+	podLimit              = 100
+	volsPerPod            = 5
+	nodeAffinityLabelKey  = "kubernetes.io/hostname"
+	provisionerPluginName = "kubernetes.io/mock-provisioner"
 )
 
 type testPV struct {
@@ -79,7 +84,11 @@ type testPVC struct {
 }
 
 func TestVolumeBinding(t *testing.T) {
-	config := setupCluster(t, "volume-scheduling", 2)
+	features := map[string]bool{
+		"VolumeScheduling":       true,
+		"PersistentLocalVolumes": true,
+	}
+	config := setupCluster(t, "volume-scheduling", 2, features, 0)
 	defer config.teardown()
 
 	cases := map[string]struct {
@@ -246,9 +255,124 @@ func TestVolumeBinding(t *testing.T) {
 	}
 }
 
+// TestVolumeBindingRescheduling tests that the scheduler retries scheduling when needed.
+func TestVolumeBindingRescheduling(t *testing.T) {
+	features := map[string]bool{
+		"VolumeScheduling":              true,
+		"PersistentLocalVolumes":        true,
+		"DynamicProvisioningScheduling": true,
+	}
+	config := setupCluster(t, "volume-scheduling", 2, features, 0)
+	defer config.teardown()
+
+	storageClassName := "local-storage"
+
+	cases := map[string]struct {
+		pod        *v1.Pod
+		pvcs       []*testPVC
+		trigger    func(config *testConfig)
+		shouldFail bool
+	}{
+		"reschedule on WaitForFirstConsumer dynamic storage class add": {
+			pod: makePod("pod-reschedule-onclassadd-dynamic", config.ns, []string{"pvc-reschedule-onclassadd-dynamic"}),
+			pvcs: []*testPVC{
+				{"pvc-reschedule-onclassadd-dynamic", "", ""},
+			},
+			trigger: func(config *testConfig) {
+				sc := makeDynamicProvisionerStorageClass(storageClassName, &modeWait)
+				if _, err := config.client.StorageV1().StorageClasses().Create(sc); err != nil {
+					t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
+				}
+			},
+			shouldFail: false,
+		},
+		"reschedule on WaitForFirstConsumer static storage class add": {
+			pod: makePod("pod-reschedule-onclassadd-static", config.ns, []string{"pvc-reschedule-onclassadd-static"}),
+			pvcs: []*testPVC{
+				{"pvc-reschedule-onclassadd-static", "", ""},
+			},
+			trigger: func(config *testConfig) {
+				sc := makeStorageClass(storageClassName, &modeWait)
+				if _, err := config.client.StorageV1().StorageClasses().Create(sc); err != nil {
+					t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
+				}
+				// Create a PV for this class to mock static provisioner behavior.
+				pv := makePV("pv-reschedule-onclassadd-static", storageClassName, "", "", node1)
+				if _, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil {
+					t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
+				}
+			},
+			shouldFail: false,
+		},
+		// TODO: test rescheduling on PVC add/update.
+	}
+
+	for name, test := range cases {
+		glog.Infof("Running test %v", name)
+
+		if test.pod == nil {
+			t.Fatal("pod is required for this test")
+		}
+
+		// Create unbound PVCs.
+		for _, pvcConfig := range test.pvcs {
+			pvc := makePVC(pvcConfig.name, config.ns, &storageClassName, "")
+			if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(pvc); err != nil {
+				t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
+			}
+		}
+
+		// Create the pod.
+		if _, err := config.client.CoreV1().Pods(config.ns).Create(test.pod); err != nil {
+			t.Fatalf("Failed to create Pod %q: %v", test.pod.Name, err)
+		}
+
+		// Wait for the pod to become unschedulable.
+		glog.Infof("Waiting for pod to become unschedulable")
+		if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
+			t.Errorf("Failed as Pod %s was not unschedulable: %v", test.pod.Name, err)
+		}
+
+		// Trigger
+		test.trigger(config)
+
+		// Wait for the pod to be scheduled or to remain unschedulable.
+		if !test.shouldFail {
+			glog.Infof("Waiting for pod to be scheduled")
+			if err := waitForPodToSchedule(config.client, test.pod); err != nil {
+				t.Errorf("Failed to schedule Pod %q: %v", test.pod.Name, err)
+			}
+		} else {
+			glog.Infof("Waiting for pod to remain unschedulable")
+			if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
+				t.Errorf("Failed as Pod %s was not unschedulable: %v", test.pod.Name, err)
+			}
+		}
+
+		// Force delete objects, but they still may not be immediately removed
+		deleteTestObjects(config.client, config.ns, deleteOption)
+	}
+}
+
 // TestVolumeBindingStress creates pods, each with unbound PVCs.
 func TestVolumeBindingStress(t *testing.T) {
-	config := setupCluster(t, "volume-binding-stress", 1)
+	testVolumeBindingStress(t, 0)
+}
+
+// Like TestVolumeBindingStress but with scheduler resync. In a real cluster,
+// the scheduler retries failed pods frequently due to various events, e.g.
+// service/node update events.
+// This is useful for detecting possible race conditions.
+func TestVolumeBindingStressWithSchedulerResync(t *testing.T) {
+	testVolumeBindingStress(t, time.Second)
+}
+
+func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration) {
+	features := map[string]bool{
+		"VolumeScheduling":       true,
+		"PersistentLocalVolumes": true,
+	}
+	config := setupCluster(t, "volume-binding-stress", 1, features, schedulerResyncPeriod)
 	defer config.teardown()
 
 	// Create enough PVs and PVCs for all the pods
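TestVolumeBindingRescheduling leans on the pre-existing waitForPodUnschedulable and waitForPodToSchedule helpers, which this patch does not touch. As a rough sketch (an assumption, not the actual helper), the unschedulable variant can be built from the client-go and apimachinery packages these tests already import:

	// Sketch only: poll until the pod reports PodScheduled=False with reason
	// Unschedulable, or give up after 30 seconds.
	func waitForPodUnschedulableSketch(cs clientset.Interface, pod *v1.Pod) error {
		return wait.PollImmediate(100*time.Millisecond, 30*time.Second, func() (bool, error) {
			p, err := cs.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
			if err != nil {
				return false, err
			}
			for _, cond := range p.Status.Conditions {
				if cond.Type == v1.PodScheduled && cond.Status == v1.ConditionFalse &&
					cond.Reason == v1.PodReasonUnschedulable {
					return true, nil
				}
			}
			return false, nil
		})
	}
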
@@ -303,7 +427,11 @@
 }
 
 func TestPVAffinityConflict(t *testing.T) {
-	config := setupCluster(t, "volume-scheduling", 3)
+	features := map[string]bool{
+		"VolumeScheduling":       true,
+		"PersistentLocalVolumes": true,
+	}
+	config := setupCluster(t, "volume-scheduling", 3, features, 0)
 	defer config.teardown()
 
 	pv := makePV("local-pv", classImmediate, "", "", node1)
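The setupCluster rewrite below saves the previous feature-gate values by hand and restores them in teardown. The same pattern as a small reusable helper, again hypothetical rather than part of this patch:

	// withFeatureGates enables the given gates and returns a function that
	// restores the previous values; callers typically defer the result.
	// Like the patch itself, this ignores the error returned by SetFromMap.
	func withFeatureGates(features map[string]bool) (restore func()) {
		old := make(map[string]bool, len(features))
		for f := range features {
			old[f] = utilfeature.DefaultFeatureGate.Enabled(utilfeature.Feature(f))
		}
		utilfeature.DefaultFeatureGate.SetFromMap(features)
		return func() { utilfeature.DefaultFeatureGate.SetFromMap(old) }
	}
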
@@ -361,30 +489,51 @@
 	}
 }
 
-func setupCluster(t *testing.T, nsName string, numberOfNodes int) *testConfig {
-	// Enable feature gates
-	utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true,PersistentLocalVolumes=true")
+func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[string]bool, resyncPeriod time.Duration) *testConfig {
+	oldFeatures := make(map[string]bool, len(features))
+	for feature := range features {
+		oldFeatures[feature] = utilfeature.DefaultFeatureGate.Enabled(utilfeature.Feature(feature))
+	}
+	// Set feature gates
+	utilfeature.DefaultFeatureGate.SetFromMap(features)
 
 	controllerCh := make(chan struct{})
 
-	context := initTestScheduler(t, initTestMaster(t, nsName, nil), controllerCh, false, nil)
+	context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), controllerCh, false, nil, false, resyncPeriod)
 
 	clientset := context.clientSet
 	ns := context.ns.Name
-	informers := context.informerFactory
+	// Informer factory for the controllers; resync is disabled for testing.
+	informerFactory := informers.NewSharedInformerFactory(context.clientSet, 0)
 
 	// Start PV controller for volume binding.
+	host := volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil)
+	plugin := &volumetest.FakeVolumePlugin{
+		PluginName:             provisionerPluginName,
+		Host:                   host,
+		Config:                 volume.VolumeConfig{},
+		LastProvisionerOptions: volume.VolumeOptions{},
+		NewAttacherCallCount:   0,
+		NewDetacherCallCount:   0,
+		Mounters:               nil,
+		Unmounters:             nil,
+		Attachers:              nil,
+		Detachers:              nil,
+	}
+	plugins := []volume.VolumePlugin{plugin}
+
+	controllerOptions := persistentvolumeoptions.NewPersistentVolumeControllerOptions()
 	params := persistentvolume.ControllerParameters{
 		KubeClient:                clientset,
-		SyncPeriod:                time.Hour, // test shouldn't need to resync
-		VolumePlugins:             nil,       // TODO; need later for dynamic provisioning
+		SyncPeriod:                controllerOptions.PVClaimBinderSyncPeriod,
+		VolumePlugins:             plugins,
 		Cloud:                     nil,
 		ClusterName:               "volume-test-cluster",
-		VolumeInformer:            informers.Core().V1().PersistentVolumes(),
-		ClaimInformer:             informers.Core().V1().PersistentVolumeClaims(),
-		ClassInformer:             informers.Storage().V1().StorageClasses(),
-		PodInformer:               informers.Core().V1().Pods(),
-		NodeInformer:              informers.Core().V1().Nodes(),
+		VolumeInformer:            informerFactory.Core().V1().PersistentVolumes(),
+		ClaimInformer:             informerFactory.Core().V1().PersistentVolumeClaims(),
+		ClassInformer:             informerFactory.Storage().V1().StorageClasses(),
+		PodInformer:               informerFactory.Core().V1().Pods(),
+		NodeInformer:              informerFactory.Core().V1().Nodes(),
 		EnableDynamicProvisioning: true,
 	}
 	ctrl, err := persistentvolume.NewController(params)
@@ -392,6 +541,9 @@
 		t.Fatalf("Failed to create PV controller: %v", err)
 	}
 	go ctrl.Run(controllerCh)
+	// Start informer factory after all controllers are configured and running.
+	informerFactory.Start(controllerCh)
+	informerFactory.WaitForCacheSync(controllerCh)
 
 	// Create shared objects
 	// Create nodes
@@ -439,7 +591,8 @@
 		teardown: func() {
 			deleteTestObjects(clientset, ns, nil)
 			cleanupTest(t, context)
-			utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false,LocalPersistentVolumes=false")
+			// Restore feature gates
+			utilfeature.DefaultFeatureGate.SetFromMap(oldFeatures)
 		},
 	}
 }
@@ -461,6 +614,16 @@ func makeStorageClass(name string, mode *storagev1.VolumeBindingMode) *storagev1
 	}
 }
 
+func makeDynamicProvisionerStorageClass(name string, mode *storagev1.VolumeBindingMode) *storagev1.StorageClass {
+	return &storagev1.StorageClass{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: name,
+		},
+		Provisioner:       provisionerPluginName,
+		VolumeBindingMode: mode,
+	}
+}
+
 func makePV(name, scName, pvcName, ns, node string) *v1.PersistentVolume {
 	pv := &v1.PersistentVolume{
 		ObjectMeta: metav1.ObjectMeta{
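A usage sketch of the new makeDynamicProvisionerStorageClass helper, mirroring the dynamic-provisioning case in TestVolumeBindingRescheduling; modeWait is assumed to be the WaitForFirstConsumer mode value already declared in this file's var block:

	// A WaitForFirstConsumer class backed by the mock provisioner: PVCs that
	// reference it stay Pending until a pod using them is scheduled.
	sc := makeDynamicProvisionerStorageClass("local-storage", &modeWait)
	if _, err := config.client.StorageV1().StorageClasses().Create(sc); err != nil {
		t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
	}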