mirror of https://github.com/k3s-io/k3s
Merge pull request #65616 from cofyc/fix56163
Automatic merge from submit-queue (batch tested with PRs 65570, 65616). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Retry scheduling on StorageClass events **What this PR does / why we need it**: **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes #56163 **Special notes for your reviewer**: I have taken over #60006. It's hard to test in e2e, because we cannot know reschedule of pod is triggered by which event (periodically service/node events will move pods to active queue too). ~~I'll add integration tests for this functionality after [this PR](https://github.com/kubernetes/kubernetes/pull/65296) get merged.~~ (already added) **Release note**: ```release-note NONE ```pull/8/head
commit
f4d8220df5
|
@ -151,7 +151,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume
|
||||||
|
|
||||||
// Immediate claims should be bound
|
// Immediate claims should be bound
|
||||||
if len(unboundClaimsImmediate) > 0 {
|
if len(unboundClaimsImmediate) > 0 {
|
||||||
return false, false, fmt.Errorf("pod has unbound PersistentVolumeClaims")
|
return false, false, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check PV node affinity on bound volumes
|
// Check PV node affinity on bound volumes
|
||||||
|
|
|
@ -34,6 +34,7 @@ go_library(
|
||||||
"//pkg/scheduler/volumebinder:go_default_library",
|
"//pkg/scheduler/volumebinder:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
|
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/api/storage/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
|
||||||
|
|
|
@ -29,6 +29,7 @@ import (
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
"k8s.io/api/policy/v1beta1"
|
"k8s.io/api/policy/v1beta1"
|
||||||
|
storagev1 "k8s.io/api/storage/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
|
@ -300,6 +301,13 @@ func NewConfigFactory(
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
|
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
|
||||||
// Setup volume binder
|
// Setup volume binder
|
||||||
c.volumeBinder = volumebinder.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer)
|
c.volumeBinder = volumebinder.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer)
|
||||||
|
|
||||||
|
storageClassInformer.Informer().AddEventHandler(
|
||||||
|
cache.ResourceEventHandlerFuncs{
|
||||||
|
AddFunc: c.onStorageClassAdd,
|
||||||
|
DeleteFunc: c.onStorageClassDelete,
|
||||||
|
},
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Setup cache comparer
|
// Setup cache comparer
|
||||||
|
@ -384,6 +392,13 @@ func (c *configFactory) onPvAdd(obj interface{}) {
|
||||||
}
|
}
|
||||||
c.invalidatePredicatesForPv(pv)
|
c.invalidatePredicatesForPv(pv)
|
||||||
}
|
}
|
||||||
|
// Pods created when there are no PVs available will be stuck in
|
||||||
|
// unschedulable queue. But unbound PVs created for static provisioning and
|
||||||
|
// delay binding storage class are skipped in PV controller dynamic
|
||||||
|
// provisiong and binding process, will not trigger events to schedule pod
|
||||||
|
// again. So we need to move pods to active queue on PV add for this
|
||||||
|
// scenario.
|
||||||
|
c.podQueue.MoveAllToActiveQueue()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configFactory) onPvUpdate(old, new interface{}) {
|
func (c *configFactory) onPvUpdate(old, new interface{}) {
|
||||||
|
@ -400,6 +415,11 @@ func (c *configFactory) onPvUpdate(old, new interface{}) {
|
||||||
}
|
}
|
||||||
c.invalidatePredicatesForPvUpdate(oldPV, newPV)
|
c.invalidatePredicatesForPvUpdate(oldPV, newPV)
|
||||||
}
|
}
|
||||||
|
// Scheduler.bindVolumesWorker may fail to update assumed pod volume
|
||||||
|
// bindings due to conflicts if PVs are updated by PV controller or other
|
||||||
|
// parties, then scheduler will add pod back to unschedulable queue. We
|
||||||
|
// need to move pods to active queue on PV update for this scenario.
|
||||||
|
c.podQueue.MoveAllToActiveQueue()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.PersistentVolume) {
|
func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.PersistentVolume) {
|
||||||
|
@ -564,6 +584,59 @@ func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.Persistent
|
||||||
c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
|
c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *configFactory) onStorageClassAdd(obj interface{}) {
|
||||||
|
sc, ok := obj.(*storagev1.StorageClass)
|
||||||
|
if !ok {
|
||||||
|
glog.Errorf("cannot convert to *storagev1.StorageClass: %v", obj)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// CheckVolumeBindingPred fails if pod has unbound immediate PVCs. If these
|
||||||
|
// PVCs have specified StorageClass name, creating StorageClass objects
|
||||||
|
// with late binding will cause predicates to pass, so we need to move pods
|
||||||
|
// to active queue.
|
||||||
|
// We don't need to invalidate cached results because results will not be
|
||||||
|
// cached for pod that has unbound immediate PVCs.
|
||||||
|
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
|
||||||
|
c.podQueue.MoveAllToActiveQueue()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *configFactory) onStorageClassDelete(obj interface{}) {
|
||||||
|
if c.enableEquivalenceClassCache {
|
||||||
|
var sc *storagev1.StorageClass
|
||||||
|
switch t := obj.(type) {
|
||||||
|
case *storagev1.StorageClass:
|
||||||
|
sc = t
|
||||||
|
case cache.DeletedFinalStateUnknown:
|
||||||
|
var ok bool
|
||||||
|
sc, ok = t.Obj.(*storagev1.StorageClass)
|
||||||
|
if !ok {
|
||||||
|
glog.Errorf("cannot convert to *storagev1.StorageClass: %v", t.Obj)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
glog.Errorf("cannot convert to *storagev1.StorageClass: %v", t)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.invalidatePredicatesForStorageClass(sc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *configFactory) invalidatePredicatesForStorageClass(sc *storagev1.StorageClass) {
|
||||||
|
invalidPredicates := sets.NewString()
|
||||||
|
|
||||||
|
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
|
||||||
|
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
|
||||||
|
// Delete can cause predicates to fail
|
||||||
|
invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
|
||||||
|
invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *configFactory) onServiceAdd(obj interface{}) {
|
func (c *configFactory) onServiceAdd(obj interface{}) {
|
||||||
if c.enableEquivalenceClassCache {
|
if c.enableEquivalenceClassCache {
|
||||||
c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet)
|
c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet)
|
||||||
|
|
|
@ -30,6 +30,7 @@ go_test(
|
||||||
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
|
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
|
||||||
"//pkg/controller/nodelifecycle:go_default_library",
|
"//pkg/controller/nodelifecycle:go_default_library",
|
||||||
"//pkg/controller/volume/persistentvolume:go_default_library",
|
"//pkg/controller/volume/persistentvolume:go_default_library",
|
||||||
|
"//pkg/controller/volume/persistentvolume/options:go_default_library",
|
||||||
"//pkg/features:go_default_library",
|
"//pkg/features:go_default_library",
|
||||||
"//pkg/kubeapiserver/admission:go_default_library",
|
"//pkg/kubeapiserver/admission:go_default_library",
|
||||||
"//pkg/scheduler:go_default_library",
|
"//pkg/scheduler:go_default_library",
|
||||||
|
@ -38,6 +39,8 @@ go_test(
|
||||||
"//pkg/scheduler/api:go_default_library",
|
"//pkg/scheduler/api:go_default_library",
|
||||||
"//pkg/scheduler/cache:go_default_library",
|
"//pkg/scheduler/cache:go_default_library",
|
||||||
"//pkg/scheduler/factory:go_default_library",
|
"//pkg/scheduler/factory:go_default_library",
|
||||||
|
"//pkg/volume:go_default_library",
|
||||||
|
"//pkg/volume/testing:go_default_library",
|
||||||
"//plugin/pkg/admission/podtolerationrestriction:go_default_library",
|
"//plugin/pkg/admission/podtolerationrestriction:go_default_library",
|
||||||
"//plugin/pkg/admission/podtolerationrestriction/apis/podtolerationrestriction:go_default_library",
|
"//plugin/pkg/admission/podtolerationrestriction/apis/podtolerationrestriction:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
|
|
|
@ -142,7 +142,7 @@ func initTestScheduler(
|
||||||
) *TestContext {
|
) *TestContext {
|
||||||
// Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority
|
// Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority
|
||||||
// feature gate is enabled at the same time.
|
// feature gate is enabled at the same time.
|
||||||
return initTestSchedulerWithOptions(t, context, controllerCh, setPodInformer, policy, false)
|
return initTestSchedulerWithOptions(t, context, controllerCh, setPodInformer, policy, false, time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
// initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
|
// initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
|
||||||
|
@ -154,6 +154,7 @@ func initTestSchedulerWithOptions(
|
||||||
setPodInformer bool,
|
setPodInformer bool,
|
||||||
policy *schedulerapi.Policy,
|
policy *schedulerapi.Policy,
|
||||||
disablePreemption bool,
|
disablePreemption bool,
|
||||||
|
resyncPeriod time.Duration,
|
||||||
) *TestContext {
|
) *TestContext {
|
||||||
// Enable EnableEquivalenceClassCache for all integration tests.
|
// Enable EnableEquivalenceClassCache for all integration tests.
|
||||||
defer utilfeaturetesting.SetFeatureGateDuringTest(
|
defer utilfeaturetesting.SetFeatureGateDuringTest(
|
||||||
|
@ -162,7 +163,7 @@ func initTestSchedulerWithOptions(
|
||||||
features.EnableEquivalenceClassCache, true)()
|
features.EnableEquivalenceClassCache, true)()
|
||||||
|
|
||||||
// 1. Create scheduler
|
// 1. Create scheduler
|
||||||
context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, time.Second)
|
context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, resyncPeriod)
|
||||||
|
|
||||||
var podInformer coreinformers.PodInformer
|
var podInformer coreinformers.PodInformer
|
||||||
|
|
||||||
|
@ -254,7 +255,7 @@ func initTest(t *testing.T, nsPrefix string) *TestContext {
|
||||||
// configuration but with pod preemption disabled.
|
// configuration but with pod preemption disabled.
|
||||||
func initTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
|
func initTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
|
||||||
return initTestSchedulerWithOptions(
|
return initTestSchedulerWithOptions(
|
||||||
t, initTestMaster(t, nsPrefix, nil), nil, true, nil, true)
|
t, initTestMaster(t, nsPrefix, nil), nil, true, nil, true, time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
// cleanupTest deletes the scheduler and the test namespace. It should be called
|
// cleanupTest deletes the scheduler and the test namespace. It should be called
|
||||||
|
|
|
@ -34,8 +34,12 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/util/rand"
|
"k8s.io/apimachinery/pkg/util/rand"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
|
"k8s.io/client-go/informers"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
|
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
|
||||||
|
persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options"
|
||||||
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
|
volumetest "k8s.io/kubernetes/pkg/volume/testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
type testConfig struct {
|
type testConfig struct {
|
||||||
|
@ -63,6 +67,7 @@ const (
|
||||||
podLimit = 100
|
podLimit = 100
|
||||||
volsPerPod = 5
|
volsPerPod = 5
|
||||||
nodeAffinityLabelKey = "kubernetes.io/hostname"
|
nodeAffinityLabelKey = "kubernetes.io/hostname"
|
||||||
|
provisionerPluginName = "kubernetes.io/mock-provisioner"
|
||||||
)
|
)
|
||||||
|
|
||||||
type testPV struct {
|
type testPV struct {
|
||||||
|
@ -79,7 +84,11 @@ type testPVC struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestVolumeBinding(t *testing.T) {
|
func TestVolumeBinding(t *testing.T) {
|
||||||
config := setupCluster(t, "volume-scheduling", 2)
|
features := map[string]bool{
|
||||||
|
"VolumeScheduling": true,
|
||||||
|
"PersistentLocalVolumes": true,
|
||||||
|
}
|
||||||
|
config := setupCluster(t, "volume-scheduling", 2, features, 0)
|
||||||
defer config.teardown()
|
defer config.teardown()
|
||||||
|
|
||||||
cases := map[string]struct {
|
cases := map[string]struct {
|
||||||
|
@ -246,9 +255,124 @@ func TestVolumeBinding(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestVolumeBindingRescheduling tests scheduler will retry scheduling when needed.
|
||||||
|
func TestVolumeBindingRescheduling(t *testing.T) {
|
||||||
|
features := map[string]bool{
|
||||||
|
"VolumeScheduling": true,
|
||||||
|
"PersistentLocalVolumes": true,
|
||||||
|
"DynamicProvisioningScheduling": true,
|
||||||
|
}
|
||||||
|
config := setupCluster(t, "volume-scheduling", 2, features, 0)
|
||||||
|
defer config.teardown()
|
||||||
|
|
||||||
|
storageClassName := "local-storage"
|
||||||
|
|
||||||
|
cases := map[string]struct {
|
||||||
|
pod *v1.Pod
|
||||||
|
pvcs []*testPVC
|
||||||
|
trigger func(config *testConfig)
|
||||||
|
shouldFail bool
|
||||||
|
}{
|
||||||
|
"reschedule on WaitForFirstConsumer dynamic storage class add": {
|
||||||
|
pod: makePod("pod-reschedule-onclassadd-dynamic", config.ns, []string{"pvc-reschedule-onclassadd-dynamic"}),
|
||||||
|
pvcs: []*testPVC{
|
||||||
|
{"pvc-reschedule-onclassadd-dynamic", "", ""},
|
||||||
|
},
|
||||||
|
trigger: func(config *testConfig) {
|
||||||
|
sc := makeDynamicProvisionerStorageClass(storageClassName, &modeWait)
|
||||||
|
if _, err := config.client.StorageV1().StorageClasses().Create(sc); err != nil {
|
||||||
|
t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
shouldFail: false,
|
||||||
|
},
|
||||||
|
"reschedule on WaitForFirstConsumer static storage class add": {
|
||||||
|
pod: makePod("pod-reschedule-onclassadd-static", config.ns, []string{"pvc-reschedule-onclassadd-static"}),
|
||||||
|
pvcs: []*testPVC{
|
||||||
|
{"pvc-reschedule-onclassadd-static", "", ""},
|
||||||
|
},
|
||||||
|
trigger: func(config *testConfig) {
|
||||||
|
sc := makeStorageClass(storageClassName, &modeWait)
|
||||||
|
if _, err := config.client.StorageV1().StorageClasses().Create(sc); err != nil {
|
||||||
|
t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
|
||||||
|
}
|
||||||
|
// Create pv for this class to mock static provisioner behavior.
|
||||||
|
pv := makePV("pv-reschedule-onclassadd-static", storageClassName, "", "", node1)
|
||||||
|
if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil {
|
||||||
|
t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
shouldFail: false,
|
||||||
|
},
|
||||||
|
// TODO test rescheduling on PVC add/update
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, test := range cases {
|
||||||
|
glog.Infof("Running test %v", name)
|
||||||
|
|
||||||
|
if test.pod == nil {
|
||||||
|
t.Fatal("pod is required for this test")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create unbound pvc
|
||||||
|
for _, pvcConfig := range test.pvcs {
|
||||||
|
pvc := makePVC(pvcConfig.name, config.ns, &storageClassName, "")
|
||||||
|
if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(pvc); err != nil {
|
||||||
|
t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create pod
|
||||||
|
if _, err := config.client.CoreV1().Pods(config.ns).Create(test.pod); err != nil {
|
||||||
|
t.Fatalf("Failed to create Pod %q: %v", test.pod.Name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for pod is unschedulable.
|
||||||
|
glog.Infof("Waiting for pod is unschedulable")
|
||||||
|
if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
|
||||||
|
t.Errorf("Failed as Pod %s was not unschedulable: %v", test.pod.Name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Trigger
|
||||||
|
test.trigger(config)
|
||||||
|
|
||||||
|
// Wait for pod is scheduled or unscheduable.
|
||||||
|
if !test.shouldFail {
|
||||||
|
glog.Infof("Waiting for pod is scheduled")
|
||||||
|
if err := waitForPodToSchedule(config.client, test.pod); err != nil {
|
||||||
|
t.Errorf("Failed to schedule Pod %q: %v", test.pod.Name, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
glog.Infof("Waiting for pod is unschedulable")
|
||||||
|
if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
|
||||||
|
t.Errorf("Failed as Pod %s was not unschedulable: %v", test.pod.Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Force delete objects, but they still may not be immediately removed
|
||||||
|
deleteTestObjects(config.client, config.ns, deleteOption)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestVolumeBindingStress creates <podLimit> pods, each with <volsPerPod> unbound PVCs.
|
// TestVolumeBindingStress creates <podLimit> pods, each with <volsPerPod> unbound PVCs.
|
||||||
func TestVolumeBindingStress(t *testing.T) {
|
func TestVolumeBindingStress(t *testing.T) {
|
||||||
config := setupCluster(t, "volume-binding-stress", 1)
|
testVolumeBindingStress(t, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Like TestVolumeBindingStress but with scheduler resync. In real cluster,
|
||||||
|
// scheduler will schedule failed pod frequently due to various events, e.g.
|
||||||
|
// service/node update events.
|
||||||
|
// This is useful to detect possible race conditions.
|
||||||
|
func TestVolumeBindingStressWithSchedulerResync(t *testing.T) {
|
||||||
|
testVolumeBindingStress(t, time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration) {
|
||||||
|
features := map[string]bool{
|
||||||
|
"VolumeScheduling": true,
|
||||||
|
"PersistentLocalVolumes": true,
|
||||||
|
}
|
||||||
|
config := setupCluster(t, "volume-binding-stress", 1, features, schedulerResyncPeriod)
|
||||||
defer config.teardown()
|
defer config.teardown()
|
||||||
|
|
||||||
// Create enough PVs and PVCs for all the pods
|
// Create enough PVs and PVCs for all the pods
|
||||||
|
@ -303,7 +427,11 @@ func TestVolumeBindingStress(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPVAffinityConflict(t *testing.T) {
|
func TestPVAffinityConflict(t *testing.T) {
|
||||||
config := setupCluster(t, "volume-scheduling", 3)
|
features := map[string]bool{
|
||||||
|
"VolumeScheduling": true,
|
||||||
|
"PersistentLocalVolumes": true,
|
||||||
|
}
|
||||||
|
config := setupCluster(t, "volume-scheduling", 3, features, 0)
|
||||||
defer config.teardown()
|
defer config.teardown()
|
||||||
|
|
||||||
pv := makePV("local-pv", classImmediate, "", "", node1)
|
pv := makePV("local-pv", classImmediate, "", "", node1)
|
||||||
|
@ -361,30 +489,51 @@ func TestPVAffinityConflict(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupCluster(t *testing.T, nsName string, numberOfNodes int) *testConfig {
|
func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[string]bool, resyncPeriod time.Duration) *testConfig {
|
||||||
// Enable feature gates
|
oldFeatures := make(map[string]bool, len(features))
|
||||||
utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true,PersistentLocalVolumes=true")
|
for feature := range features {
|
||||||
|
oldFeatures[feature] = utilfeature.DefaultFeatureGate.Enabled(utilfeature.Feature(feature))
|
||||||
|
}
|
||||||
|
// Set feature gates
|
||||||
|
utilfeature.DefaultFeatureGate.SetFromMap(features)
|
||||||
|
|
||||||
controllerCh := make(chan struct{})
|
controllerCh := make(chan struct{})
|
||||||
|
|
||||||
context := initTestScheduler(t, initTestMaster(t, nsName, nil), controllerCh, false, nil)
|
context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), controllerCh, false, nil, false, resyncPeriod)
|
||||||
|
|
||||||
clientset := context.clientSet
|
clientset := context.clientSet
|
||||||
ns := context.ns.Name
|
ns := context.ns.Name
|
||||||
informers := context.informerFactory
|
// Informers factory for controllers, we disable resync period for testing.
|
||||||
|
informerFactory := informers.NewSharedInformerFactory(context.clientSet, 0)
|
||||||
|
|
||||||
// Start PV controller for volume binding.
|
// Start PV controller for volume binding.
|
||||||
|
host := volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil)
|
||||||
|
plugin := &volumetest.FakeVolumePlugin{
|
||||||
|
PluginName: provisionerPluginName,
|
||||||
|
Host: host,
|
||||||
|
Config: volume.VolumeConfig{},
|
||||||
|
LastProvisionerOptions: volume.VolumeOptions{},
|
||||||
|
NewAttacherCallCount: 0,
|
||||||
|
NewDetacherCallCount: 0,
|
||||||
|
Mounters: nil,
|
||||||
|
Unmounters: nil,
|
||||||
|
Attachers: nil,
|
||||||
|
Detachers: nil,
|
||||||
|
}
|
||||||
|
plugins := []volume.VolumePlugin{plugin}
|
||||||
|
|
||||||
|
controllerOptions := persistentvolumeoptions.NewPersistentVolumeControllerOptions()
|
||||||
params := persistentvolume.ControllerParameters{
|
params := persistentvolume.ControllerParameters{
|
||||||
KubeClient: clientset,
|
KubeClient: clientset,
|
||||||
SyncPeriod: time.Hour, // test shouldn't need to resync
|
SyncPeriod: controllerOptions.PVClaimBinderSyncPeriod,
|
||||||
VolumePlugins: nil, // TODO; need later for dynamic provisioning
|
VolumePlugins: plugins,
|
||||||
Cloud: nil,
|
Cloud: nil,
|
||||||
ClusterName: "volume-test-cluster",
|
ClusterName: "volume-test-cluster",
|
||||||
VolumeInformer: informers.Core().V1().PersistentVolumes(),
|
VolumeInformer: informerFactory.Core().V1().PersistentVolumes(),
|
||||||
ClaimInformer: informers.Core().V1().PersistentVolumeClaims(),
|
ClaimInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||||
ClassInformer: informers.Storage().V1().StorageClasses(),
|
ClassInformer: informerFactory.Storage().V1().StorageClasses(),
|
||||||
PodInformer: informers.Core().V1().Pods(),
|
PodInformer: informerFactory.Core().V1().Pods(),
|
||||||
NodeInformer: informers.Core().V1().Nodes(),
|
NodeInformer: informerFactory.Core().V1().Nodes(),
|
||||||
EnableDynamicProvisioning: true,
|
EnableDynamicProvisioning: true,
|
||||||
}
|
}
|
||||||
ctrl, err := persistentvolume.NewController(params)
|
ctrl, err := persistentvolume.NewController(params)
|
||||||
|
@ -392,6 +541,9 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int) *testConfig {
|
||||||
t.Fatalf("Failed to create PV controller: %v", err)
|
t.Fatalf("Failed to create PV controller: %v", err)
|
||||||
}
|
}
|
||||||
go ctrl.Run(controllerCh)
|
go ctrl.Run(controllerCh)
|
||||||
|
// Start informer factory after all controllers are configured and running.
|
||||||
|
informerFactory.Start(controllerCh)
|
||||||
|
informerFactory.WaitForCacheSync(controllerCh)
|
||||||
|
|
||||||
// Create shared objects
|
// Create shared objects
|
||||||
// Create nodes
|
// Create nodes
|
||||||
|
@ -439,7 +591,8 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int) *testConfig {
|
||||||
teardown: func() {
|
teardown: func() {
|
||||||
deleteTestObjects(clientset, ns, nil)
|
deleteTestObjects(clientset, ns, nil)
|
||||||
cleanupTest(t, context)
|
cleanupTest(t, context)
|
||||||
utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false,LocalPersistentVolumes=false")
|
// Restore feature gates
|
||||||
|
utilfeature.DefaultFeatureGate.SetFromMap(oldFeatures)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -461,6 +614,16 @@ func makeStorageClass(name string, mode *storagev1.VolumeBindingMode) *storagev1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func makeDynamicProvisionerStorageClass(name string, mode *storagev1.VolumeBindingMode) *storagev1.StorageClass {
|
||||||
|
return &storagev1.StorageClass{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: name,
|
||||||
|
},
|
||||||
|
Provisioner: provisionerPluginName,
|
||||||
|
VolumeBindingMode: mode,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func makePV(name, scName, pvcName, ns, node string) *v1.PersistentVolume {
|
func makePV(name, scName, pvcName, ns, node string) *v1.PersistentVolume {
|
||||||
pv := &v1.PersistentVolume{
|
pv := &v1.PersistentVolume{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
|
Loading…
Reference in New Issue