Merge pull request #70290 from tossmilestone/scheduler-test-refactor

Refactor scheduler_test.go to use Clientset
k8s-ci-robot 2018-10-29 22:07:52 -07:00 committed by GitHub
commit 2f175c1b41
2 changed files with 65 additions and 29 deletions
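
This commit replaces the hand-rolled fakes from pkg/scheduler/testing (FakeNodeLister, FakePersistentVolumeClaimLister, FakePDBLister) with real listers backed by a fake Clientset through a shared informer factory. Below is a minimal, self-contained sketch of that pattern; the standalone main wrapper and the node name are illustrative, not part of the diff:

	package main

	import (
		"fmt"

		v1 "k8s.io/api/core/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		"k8s.io/apimachinery/pkg/labels"
		"k8s.io/client-go/informers"
		clientsetfake "k8s.io/client-go/kubernetes/fake"
	)

	func main() {
		stop := make(chan struct{})
		defer close(stop)

		// Seed the fake clientset with whatever objects the test needs.
		node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}
		client := clientsetfake.NewSimpleClientset(node)

		// Real informers and listers, fed by the fake client.
		informerFactory := informers.NewSharedInformerFactory(client, 0)
		nodeLister := informerFactory.Core().V1().Nodes().Lister()

		// Informers are registered lazily, so request listers first,
		// then start the factory and wait for its caches to fill.
		informerFactory.Start(stop)
		informerFactory.WaitForCacheSync(stop)

		nodes, _ := nodeLister.List(labels.Everything())
		fmt.Println(len(nodes)) // prints 1
	}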

pkg/scheduler/BUILD

@@ -54,12 +54,12 @@ go_test(
         "//pkg/scheduler/factory:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
         "//pkg/scheduler/internal/cache/fake:go_default_library",
-        "//pkg/scheduler/testing:go_default_library",
         "//pkg/scheduler/volumebinder:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
@@ -67,6 +67,7 @@ go_test(
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/informers:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
+        "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//staging/src/k8s.io/client-go/tools/record:go_default_library",
     ],

pkg/scheduler/scheduler_test.go

@@ -27,6 +27,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/diff"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -34,6 +35,7 @@ import (
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/informers"
 	clientsetfake "k8s.io/client-go/kubernetes/fake"
+	corelister "k8s.io/client-go/listers/core/v1"
 	clientcache "k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/kubernetes/pkg/api/legacyscheme"
@@ -47,7 +49,6 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/factory"
 	schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
-	schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
 	"k8s.io/kubernetes/pkg/scheduler/volumebinder"
 )
 
@@ -81,12 +82,20 @@ func (fp fakePodPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
 	return nil
 }
 
+type nodeLister struct {
+	corelister.NodeLister
+}
+
+func (n *nodeLister) List() ([]*v1.Node, error) {
+	return n.NodeLister.List(labels.Everything())
+}
+
 func podWithID(id, desiredHost string) *v1.Pod {
 	return &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:     id,
 			UID:      types.UID(id),
-			SelfLink: schedulertesting.Test.SelfLink(string(v1.ResourcePods), id),
+			SelfLink: fmt.Sprintf("/api/v1/%s/%s", string(v1.ResourcePods), id),
 		},
 		Spec: v1.PodSpec{
 			NodeName: desiredHost,
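
The nodeLister wrapper added above bridges an interface gap: corelister.NodeLister.List takes a label selector, while the NodeLister consumed by factory.Config expects the parameterless List() ([]*v1.Node, error) shown in the wrapper's method, so it simply delegates with labels.Everything().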
@@ -100,8 +109,8 @@ func deletingPod(id string) *v1.Pod {
 		ObjectMeta: metav1.ObjectMeta{
 			Name:              id,
 			UID:               types.UID(id),
-			SelfLink:          schedulertesting.Test.SelfLink(string(v1.ResourcePods), id),
 			DeletionTimestamp: &deletionTimestamp,
+			SelfLink:          fmt.Sprintf("/api/v1/%s/%s", string(v1.ResourcePods), id),
 		},
 		Spec: v1.PodSpec{
 			NodeName: "",
@@ -239,6 +248,15 @@ func TestScheduler(t *testing.T) {
 		},
 	}
 
+	stop := make(chan struct{})
+	defer close(stop)
+	client := clientsetfake.NewSimpleClientset(&testNode)
+	informerFactory := informers.NewSharedInformerFactory(client, 0)
+	nl := informerFactory.Core().V1().Nodes().Lister()
+
+	informerFactory.Start(stop)
+	informerFactory.WaitForCacheSync(stop)
+
 	for _, item := range table {
 		t.Run(item.name, func(t *testing.T) {
 			var gotError error
@@ -256,10 +274,8 @@ func TestScheduler(t *testing.T) {
 						gotAssumedPod = pod
 					},
 				},
-				NodeLister: schedulertesting.FakeNodeLister(
-					[]*v1.Node{&testNode},
-				),
-				Algorithm: item.algo,
+				NodeLister: &nodeLister{nl},
+				Algorithm:  item.algo,
 				GetBinder: func(pod *v1.Pod) factory.Binder {
 					return fakeBinder{func(b *v1.Binding) error {
 						gotBinding = b
@@ -317,9 +333,10 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
 	pod := podWithPort("pod.Name", "", 8080)
 	node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
 	scache.AddNode(&node)
-	nodeLister := schedulertesting.FakeNodeLister([]*v1.Node{&node})
+	client := clientsetfake.NewSimpleClientset(&node)
+	informerFactory := informers.NewSharedInformerFactory(client, 0)
 	predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
-	scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, nodeLister, predicateMap, pod, &node)
+	scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, predicateMap, pod, &node)
 
 	waitPodExpireChan := make(chan struct{})
 	timeout := make(chan struct{})
@@ -375,9 +392,10 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
 	firstPod := podWithPort("pod.Name", "", 8080)
 	node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
 	scache.AddNode(&node)
-	nodeLister := schedulertesting.FakeNodeLister([]*v1.Node{&node})
+	client := clientsetfake.NewSimpleClientset(&node)
+	informerFactory := informers.NewSharedInformerFactory(client, 0)
 	predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
-	scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, nodeLister, predicateMap, firstPod, &node)
+	scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, predicateMap, firstPod, &node)
 
 	// We use conflicted pod ports to incur fit predicate failure.
 	secondPod := podWithPort("bar", "", 8080)
@@ -463,11 +481,16 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) {
 		node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
 		scache.AddNode(&node)
 
-		nodeLister := schedulertesting.FakeNodeLister([]*v1.Node{&node})
+		client := clientsetfake.NewSimpleClientset(&node)
+		informerFactory := informers.NewSharedInformerFactory(client, 0)
 		predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
 
 		scheduler, bindingChan := setupTestSchedulerLongBindingWithRetry(
-			queuedPodStore, scache, nodeLister, predicateMap, stop, test.BindingDuration)
+			queuedPodStore, scache, informerFactory, predicateMap, stop, test.BindingDuration)
+
+		informerFactory.Start(stop)
+		informerFactory.WaitForCacheSync(stop)
+
 		scheduler.Run()
 		queuedPodStore.Add(firstPod)
 		queuedPodStore.Add(conflictPod)
@@ -495,9 +518,12 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) {
 // queuedPodStore: pods queued before processing.
 // cache: scheduler cache that might contain assumed pods.
 func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache,
-	nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, pod *v1.Pod, node *v1.Node) (*Scheduler, chan *v1.Binding, chan error) {
+	informerFactory informers.SharedInformerFactory, stop chan struct{}, predicateMap map[string]algorithm.FitPredicate, pod *v1.Pod, node *v1.Node) (*Scheduler, chan *v1.Binding, chan error) {
 
-	scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap, nil)
+	scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, nil)
+
+	informerFactory.Start(stop)
+	informerFactory.WaitForCacheSync(stop)
 
 	queuedPodStore.Add(pod)
 	// queuedPodStore: [foo:8080]
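
Note the ordering in this helper: setupTestScheduler requests the listers from the factory first, and only then are Start and WaitForCacheSync called. Shared informer factories create informers lazily on first access, and Start only launches the informers already requested, so syncing before pods are queued ensures the node from the fake client is visible to the lister.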
@@ -540,7 +566,8 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
 	})
 
 	// create several nodes which cannot schedule the above pod
-	nodes := []*v1.Node{}
+	var nodes []*v1.Node
+	var objects []runtime.Object
 	for i := 0; i < 100; i++ {
 		uid := fmt.Sprintf("machine%v", i)
 		node := v1.Node{
@@ -559,8 +586,10 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
 		}
 		scache.AddNode(&node)
 		nodes = append(nodes, &node)
+		objects = append(objects, &node)
 	}
-	nodeLister := schedulertesting.FakeNodeLister(nodes)
+	client := clientsetfake.NewSimpleClientset(objects...)
+	informerFactory := informers.NewSharedInformerFactory(client, 0)
 	predicateMap := map[string]algorithm.FitPredicate{
 		"PodFitsResources": predicates.PodFitsResources,
 	}
@@ -573,7 +602,10 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
 			predicates.NewInsufficientResourceError(v1.ResourceMemory, 500, 0, 100),
 		}
 	}
-	scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap, nil)
+	scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, nil)
+
+	informerFactory.Start(stop)
+	informerFactory.WaitForCacheSync(stop)
 
 	queuedPodStore.Add(podWithTooBigResourceRequests)
 	scheduler.scheduleOne()
@@ -597,7 +629,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
 
 // queuedPodStore: pods queued before processing.
 // scache: scheduler cache that might contain assumed pods.
-func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) {
+func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]algorithm.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) {
 	algo := core.NewGenericScheduler(
 		scache,
 		nil,
@@ -608,8 +640,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
 		algorithm.EmptyPriorityMetadataProducer,
 		[]algorithm.SchedulerExtender{},
 		nil,
-		schedulertesting.FakePersistentVolumeClaimLister{},
-		schedulertesting.FakePDBLister{},
+		informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
+		informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
 		false,
 		false,
 		api.DefaultPercentageOfNodesToScore)
@@ -618,7 +650,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
 	configurator := &FakeConfigurator{
 		Config: &factory.Config{
 			SchedulerCache: scache,
-			NodeLister:     nodeLister,
+			NodeLister:     &nodeLister{informerFactory.Core().V1().Nodes().Lister()},
 			Algorithm:      algo,
 			GetBinder: func(pod *v1.Pod) factory.Binder {
 				return fakeBinder{func(b *v1.Binding) error {
@@ -648,7 +680,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
 	return sched, bindingChan, errChan
 }
 
-func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
+func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
 	algo := core.NewGenericScheduler(
 		scache,
 		nil,
@@ -659,8 +691,8 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
 		algorithm.EmptyPriorityMetadataProducer,
 		[]algorithm.SchedulerExtender{},
 		nil,
-		schedulertesting.FakePersistentVolumeClaimLister{},
-		schedulertesting.FakePDBLister{},
+		informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
+		informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
 		false,
 		false,
 		api.DefaultPercentageOfNodesToScore)
@@ -668,7 +700,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
 	configurator := &FakeConfigurator{
 		Config: &factory.Config{
 			SchedulerCache: scache,
-			NodeLister:     nodeLister,
+			NodeLister:     &nodeLister{informerFactory.Core().V1().Nodes().Lister()},
 			Algorithm:      algo,
 			GetBinder: func(pod *v1.Pod) factory.Binder {
 				return fakeBinder{func(b *v1.Binding) error {
@@ -701,18 +733,21 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
 func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBinder, stop <-chan struct{}, broadcaster record.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) {
 	testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
-	nodeLister := schedulertesting.FakeNodeLister([]*v1.Node{&testNode})
 	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
 	queuedPodStore.Add(podWithID("foo", ""))
 	scache := schedulerinternalcache.New(10*time.Minute, stop)
 	scache.AddNode(&testNode)
+	client := clientsetfake.NewSimpleClientset(&testNode)
+	informerFactory := informers.NewSharedInformerFactory(client, 0)
 
 	predicateMap := map[string]algorithm.FitPredicate{
 		predicates.CheckVolumeBindingPred: predicates.NewVolumeBindingPredicate(fakeVolumeBinder),
 	}
 
 	recorder := broadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"})
-	s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap, recorder)
+	s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, recorder)
+	informerFactory.Start(stop)
+	informerFactory.WaitForCacheSync(stop)
 
 	s.config.VolumeBinder = fakeVolumeBinder
 	return s, bindingChan, errChan
 }