diff --git a/test/integration/framework/util.go b/test/integration/framework/util.go new file mode 100644 index 0000000000..1826da257f --- /dev/null +++ b/test/integration/framework/util.go @@ -0,0 +1,51 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "strings" + + clientset "k8s.io/client-go/kubernetes" +) + +const ( + // When these values are updated, also update cmd/kubelet/app/options/options.go + // A copy of these values exist in e2e/framework/util.go. + currentPodInfraContainerImageName = "gcr.io/google_containers/pause" + currentPodInfraContainerImageVersion = "3.0" +) + +// GetServerArchitecture fetches the architecture of the cluster's apiserver. +func GetServerArchitecture(c clientset.Interface) string { + arch := "" + sVer, err := c.Discovery().ServerVersion() + if err != nil || sVer.Platform == "" { + // If we failed to get the server version for some reason, default to amd64. + arch = "amd64" + } else { + // Split the platform string into OS and Arch separately. + // The platform string may for example be "linux/amd64", "linux/arm" or "windows/amd64". + osArchArray := strings.Split(sVer.Platform, "/") + arch = osArchArray[1] + } + return arch +} + +// GetPauseImageName fetches the pause image name for the same architecture as the apiserver. +func GetPauseImageName(c clientset.Interface) string { + return currentPodInfraContainerImageName + "-" + GetServerArchitecture(c) + ":" + currentPodInfraContainerImageVersion +} diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 414cf45b4e..3dc70d1abf 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -25,7 +25,6 @@ import ( "k8s.io/api/core/v1" clientv1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -46,7 +45,6 @@ import ( schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" "k8s.io/kubernetes/plugin/pkg/scheduler/factory" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" - e2e "k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/integration/framework" ) @@ -134,10 +132,10 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { informerFactory.Core().V1().Services(), eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: v1.DefaultSchedulerName}), ) - if err != nil { t.Fatalf("Error creating scheduler: %v", err) } + defer close(sched.Config().StopEverything) // Verify that the config is applied correctly. schedPredicates := sched.Config().Algorithm.Predicates() @@ -152,8 +150,6 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { if schedPrioritizers[1].Function == nil || schedPrioritizers[1].Weight != 5 { t.Errorf("Unexpected prioritizer: func: %v, weight: %v", schedPrioritizers[1].Function, schedPrioritizers[1].Weight) } - - defer close(sched.Config().StopEverything) } // TestSchedulerCreationFromNonExistentConfigMap ensures that creation of the @@ -203,11 +199,11 @@ func TestSchedulerCreationInLegacyMode(t *testing.T) { defer framework.DeleteTestingNamespace(ns, s, t) clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}}) - defer clientSet.Core().Nodes().DeleteCollection(nil, metav1.ListOptions{}) + defer clientSet.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{}) informerFactory := informers.NewSharedInformerFactory(clientSet, 0) eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet.Core().RESTClient()).Events("")}) + eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet.CoreV1().RESTClient()).Events("")}) ss := options.NewSchedulerServer() ss.HardPodAffinitySymmetricWeight = v1.DefaultHardPodAffinitySymmetricWeight @@ -225,107 +221,37 @@ func TestSchedulerCreationInLegacyMode(t *testing.T) { informerFactory.Core().V1().Services(), eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: v1.DefaultSchedulerName}), ) - if err != nil { t.Fatalf("Creation of scheduler in legacy mode failed: %v", err) } - informerFactory.Start(sched.Config().StopEverything) defer close(sched.Config().StopEverything) - sched.Run() - DoTestUnschedulableNodes(t, clientSet, ns, informerFactory.Core().V1().Nodes().Lister()) + + _, err = createNode(clientSet, "test-node") + if err != nil { + t.Fatalf("Failed to create node: %v", err) + } + pod, err := createDefaultPausePod(clientSet, "test-pod", "configmap") + if err != nil { + t.Fatalf("Failed to create pod: %v", err) + } + err = waitForPodToSchedule(clientSet, pod) + if err != nil { + t.Errorf("Failed to schedule a pod: %v", err) + } else { + t.Logf("Pod got scheduled on a node.") + } } func TestUnschedulableNodes(t *testing.T) { - _, s, closeFn := framework.RunAMaster(nil) - defer closeFn() + context := initTest(t, "unschedulable-nodes") + defer cleanupTest(t, context) - ns := framework.CreateTestingNamespace("unschedulable-nodes", s, t) - defer framework.DeleteTestingNamespace(ns, s, t) - - clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}}) - informerFactory := informers.NewSharedInformerFactory(clientSet, 0) - - schedulerConfigFactory := factory.NewConfigFactory( - v1.DefaultSchedulerName, - clientSet, - informerFactory.Core().V1().Nodes(), - informerFactory.Core().V1().Pods(), - informerFactory.Core().V1().PersistentVolumes(), - informerFactory.Core().V1().PersistentVolumeClaims(), - informerFactory.Core().V1().ReplicationControllers(), - informerFactory.Extensions().V1beta1().ReplicaSets(), - informerFactory.Apps().V1beta1().StatefulSets(), - informerFactory.Core().V1().Services(), - v1.DefaultHardPodAffinitySymmetricWeight, - enableEquivalenceCache, - ) - schedulerConfig, err := schedulerConfigFactory.Create() - if err != nil { - t.Fatalf("Couldn't create scheduler config: %v", err) - } - eventBroadcaster := record.NewBroadcaster() - schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: v1.DefaultSchedulerName}) - eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet.Core().RESTClient()).Events("")}) - informerFactory.Start(schedulerConfig.StopEverything) - sched, _ := scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{Config: schedulerConfig}, nil...) - sched.Run() - defer close(schedulerConfig.StopEverything) - - DoTestUnschedulableNodes(t, clientSet, ns, schedulerConfigFactory.GetNodeLister()) -} - -func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { - return func() (bool, error) { - pod, err := c.Core().Pods(podNamespace).Get(podName, metav1.GetOptions{}) - if errors.IsNotFound(err) { - return false, nil - } - if err != nil { - // This could be a connection error so we want to retry. - return false, nil - } - if pod.Spec.NodeName == "" { - return false, nil - } - return true, nil - } -} - -// Wait till the passFunc confirms that the object it expects to see is in the store. -// Used to observe reflected events. -func waitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string, passFunc func(n interface{}) bool) error { - nodes := []*v1.Node{} - err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) { - n, err := nodeLister.Get(key) - - switch { - case err == nil && passFunc(n): - return true, nil - case errors.IsNotFound(err): - nodes = append(nodes, nil) - case err != nil: - t.Errorf("Unexpected error: %v", err) - default: - nodes = append(nodes, n) - } - - return false, nil - }) - if err != nil { - t.Logf("Logging consecutive node versions received from store:") - for i, n := range nodes { - t.Logf("%d: %#v", i, n) - } - } - return err -} - -func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Namespace, nodeLister corelisters.NodeLister) { + nodeLister := context.schedulerConfigFactory.GetNodeLister() // NOTE: This test cannot run in parallel, because it is creating and deleting // non-namespaced objects (Nodes). - defer cs.Core().Nodes().DeleteCollection(nil, metav1.ListOptions{}) + defer context.clientSet.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{}) goodCondition := v1.NodeCondition{ Type: v1.NodeReady, @@ -370,7 +296,7 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names { makeUnSchedulable: func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) { n.Spec.Unschedulable = true - if _, err := c.Core().Nodes().Update(n); err != nil { + if _, err := c.CoreV1().Nodes().Update(n); err != nil { t.Fatalf("Failed to update node with unschedulable=true: %v", err) } err = waitForReflection(t, nodeLister, nodeKey, func(node interface{}) bool { @@ -386,7 +312,7 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names }, makeSchedulable: func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) { n.Spec.Unschedulable = false - if _, err := c.Core().Nodes().Update(n); err != nil { + if _, err := c.CoreV1().Nodes().Update(n); err != nil { t.Fatalf("Failed to update node with unschedulable=false: %v", err) } err = waitForReflection(t, nodeLister, nodeKey, func(node interface{}) bool { @@ -406,7 +332,7 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names }, Conditions: []v1.NodeCondition{badCondition}, } - if _, err = c.Core().Nodes().UpdateStatus(n); err != nil { + if _, err = c.CoreV1().Nodes().UpdateStatus(n); err != nil { t.Fatalf("Failed to update node with bad status condition: %v", err) } err = waitForReflection(t, nodeLister, nodeKey, func(node interface{}) bool { @@ -423,7 +349,7 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names }, Conditions: []v1.NodeCondition{goodCondition}, } - if _, err = c.Core().Nodes().UpdateStatus(n); err != nil { + if _, err = c.CoreV1().Nodes().UpdateStatus(n); err != nil { t.Fatalf("Failed to update node with healthy status condition: %v", err) } err = waitForReflection(t, nodeLister, nodeKey, func(node interface{}) bool { @@ -437,29 +363,23 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names } for i, mod := range nodeModifications { - unSchedNode, err := cs.Core().Nodes().Create(node) + unSchedNode, err := context.clientSet.Core().Nodes().Create(node) if err != nil { t.Fatalf("Failed to create node: %v", err) } // Apply the unschedulable modification to the node, and wait for the reflection - mod.makeUnSchedulable(t, unSchedNode, nodeLister, cs) + mod.makeUnSchedulable(t, unSchedNode, nodeLister, context.clientSet) // Create the new pod, note that this needs to happen post unschedulable // modification or we have a race in the test. - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "node-scheduling-test-pod"}, - Spec: v1.PodSpec{ - Containers: []v1.Container{{Name: "container", Image: e2e.GetPauseImageName(cs)}}, - }, - } - myPod, err := cs.Core().Pods(ns.Name).Create(pod) + myPod, err := createDefaultPausePod(context.clientSet, "node-scheduling-test-pod", context.ns.Name) if err != nil { t.Fatalf("Failed to create pod: %v", err) } // There are no schedulable nodes - the pod shouldn't be scheduled. - err = wait.Poll(time.Second, wait.ForeverTestTimeout, podScheduled(cs, myPod.Namespace, myPod.Name)) + err = waitForPodToSchedule(context.clientSet, myPod) if err == nil { t.Errorf("Pod scheduled successfully on unschedulable nodes") } @@ -470,25 +390,23 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names } // Apply the schedulable modification to the node, and wait for the reflection - schedNode, err := cs.Core().Nodes().Get(unSchedNode.Name, metav1.GetOptions{}) + schedNode, err := context.clientSet.CoreV1().Nodes().Get(unSchedNode.Name, metav1.GetOptions{}) if err != nil { t.Fatalf("Failed to get node: %v", err) } - mod.makeSchedulable(t, schedNode, nodeLister, cs) + mod.makeSchedulable(t, schedNode, nodeLister, context.clientSet) // Wait until the pod is scheduled. - err = wait.Poll(time.Second, wait.ForeverTestTimeout, podScheduled(cs, myPod.Namespace, myPod.Name)) - if err != nil { + if err := waitForPodToSchedule(context.clientSet, myPod); err != nil { t.Errorf("Test %d: failed to schedule a pod: %v", i, err) } else { t.Logf("Test %d: Pod got scheduled on a schedulable node", i) } - - err = cs.Core().Pods(ns.Name).Delete(myPod.Name, metav1.NewDeleteOptions(0)) - if err != nil { + // Clean up. + if err := deletePod(context.clientSet, myPod.Name, myPod.Namespace); err != nil { t.Errorf("Failed to delete pod: %v", err) } - err = cs.Core().Nodes().Delete(schedNode.Name, nil) + err = context.clientSet.CoreV1().Nodes().Delete(schedNode.Name, nil) if err != nil { t.Errorf("Failed to delete node: %v", err) } @@ -496,14 +414,6 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names } func TestMultiScheduler(t *testing.T) { - _, s, _ := framework.RunAMaster(nil) - // TODO: Uncomment when fix #19254 - // This seems to be a different issue - it still doesn't work. - // defer s.Close() - - ns := framework.CreateTestingNamespace("multi-scheduler", s, t) - defer framework.DeleteTestingNamespace(ns, s, t) - /* This integration tests the multi-scheduler feature in the following way: 1. create a default scheduler @@ -523,39 +433,10 @@ func TestMultiScheduler(t *testing.T) { 9. **check point-3**: - testPodNoAnnotation2 and testPodWithAnnotationFitsDefault2 should NOT be scheduled */ + // 1. create and start default-scheduler - clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}}) - - // NOTE: This test cannot run in parallel, because it is creating and deleting - // non-namespaced objects (Nodes). - defer clientSet.Core().Nodes().DeleteCollection(nil, metav1.ListOptions{}) - - informerFactory := informers.NewSharedInformerFactory(clientSet, 0) - schedulerConfigFactory := factory.NewConfigFactory( - v1.DefaultSchedulerName, - clientSet, - informerFactory.Core().V1().Nodes(), - informerFactory.Core().V1().Pods(), - informerFactory.Core().V1().PersistentVolumes(), - informerFactory.Core().V1().PersistentVolumeClaims(), - informerFactory.Core().V1().ReplicationControllers(), - informerFactory.Extensions().V1beta1().ReplicaSets(), - informerFactory.Apps().V1beta1().StatefulSets(), - informerFactory.Core().V1().Services(), - v1.DefaultHardPodAffinitySymmetricWeight, - enableEquivalenceCache, - ) - schedulerConfig, err := schedulerConfigFactory.Create() - if err != nil { - t.Fatalf("Couldn't create scheduler config: %v", err) - } - eventBroadcaster := record.NewBroadcaster() - schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: v1.DefaultSchedulerName}) - eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet.Core().RESTClient()).Events("")}) - informerFactory.Start(schedulerConfig.StopEverything) - sched, _ := scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{Config: schedulerConfig}, nil...) - sched.Run() - // default-scheduler will be stopped later + context := initTest(t, "multi-scheduler") + defer cleanupTest(t, context) // 2. create a node node := &v1.Node{ @@ -567,25 +448,22 @@ func TestMultiScheduler(t *testing.T) { }, }, } - clientSet.Core().Nodes().Create(node) + context.clientSet.Core().Nodes().Create(node) // 3. create 3 pods for testing - podWithoutSchedulerName := createPod(clientSet, "pod-without-scheduler-name", "") - testPod, err := clientSet.Core().Pods(ns.Name).Create(podWithoutSchedulerName) + testPod, err := createDefaultPausePod(context.clientSet, "pod-without-scheduler-name", context.ns.Name) if err != nil { t.Fatalf("Failed to create pod: %v", err) } - schedulerFitsDefault := "default-scheduler" - podFitsDefault := createPod(clientSet, "pod-fits-default", schedulerFitsDefault) - testPodFitsDefault, err := clientSet.Core().Pods(ns.Name).Create(podFitsDefault) + defaultScheduler := "default-scheduler" + testPodFitsDefault, err := createPausePod(context.clientSet, &pausePodConfig{Name: "pod-fits-default", Namespace: context.ns.Name, SchedulerName: defaultScheduler}) if err != nil { t.Fatalf("Failed to create pod: %v", err) } - schedulerFitsFoo := "foo-scheduler" - podFitsFoo := createPod(clientSet, "pod-fits-foo", schedulerFitsFoo) - testPodFitsFoo, err := clientSet.Core().Pods(ns.Name).Create(podFitsFoo) + fooScheduler := "foo-scheduler" + testPodFitsFoo, err := createPausePod(context.clientSet, &pausePodConfig{Name: "pod-fits-foo", Namespace: context.ns.Name, SchedulerName: fooScheduler}) if err != nil { t.Fatalf("Failed to create pod: %v", err) } @@ -593,42 +471,39 @@ func TestMultiScheduler(t *testing.T) { // 4. **check point-1**: // - testPod, testPodFitsDefault should be scheduled // - testPodFitsFoo should NOT be scheduled - err = wait.Poll(time.Second, time.Second*5, podScheduled(clientSet, testPod.Namespace, testPod.Name)) - if err != nil { + if err := waitForPodToSchedule(context.clientSet, testPod); err != nil { t.Errorf("Test MultiScheduler: %s Pod not scheduled: %v", testPod.Name, err) } else { t.Logf("Test MultiScheduler: %s Pod scheduled", testPod.Name) } - err = wait.Poll(time.Second, time.Second*5, podScheduled(clientSet, testPodFitsDefault.Namespace, testPodFitsDefault.Name)) - if err != nil { + if err := waitForPodToSchedule(context.clientSet, testPodFitsDefault); err != nil { t.Errorf("Test MultiScheduler: %s Pod not scheduled: %v", testPodFitsDefault.Name, err) } else { t.Logf("Test MultiScheduler: %s Pod scheduled", testPodFitsDefault.Name) } - err = wait.Poll(time.Second, time.Second*5, podScheduled(clientSet, testPodFitsFoo.Namespace, testPodFitsFoo.Name)) - if err == nil { + if err := waitForPodToScheduleWithTimeout(context.clientSet, testPodFitsFoo, time.Second*5); err == nil { t.Errorf("Test MultiScheduler: %s Pod got scheduled, %v", testPodFitsFoo.Name, err) } else { t.Logf("Test MultiScheduler: %s Pod not scheduled", testPodFitsFoo.Name) } // 5. create and start a scheduler with name "foo-scheduler" - clientSet2 := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}}) - informerFactory2 := informers.NewSharedInformerFactory(clientSet, 0) + clientSet2 := clientset.NewForConfigOrDie(&restclient.Config{Host: context.httpServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}}) + informerFactory2 := informers.NewSharedInformerFactory(context.clientSet, 0) schedulerConfigFactory2 := factory.NewConfigFactory( - "foo-scheduler", + fooScheduler, clientSet2, - informerFactory.Core().V1().Nodes(), - informerFactory.Core().V1().Pods(), - informerFactory.Core().V1().PersistentVolumes(), - informerFactory.Core().V1().PersistentVolumeClaims(), - informerFactory.Core().V1().ReplicationControllers(), - informerFactory.Extensions().V1beta1().ReplicaSets(), - informerFactory.Apps().V1beta1().StatefulSets(), - informerFactory.Core().V1().Services(), + informerFactory2.Core().V1().Nodes(), + informerFactory2.Core().V1().Pods(), + informerFactory2.Core().V1().PersistentVolumes(), + informerFactory2.Core().V1().PersistentVolumeClaims(), + informerFactory2.Core().V1().ReplicationControllers(), + informerFactory2.Extensions().V1beta1().ReplicaSets(), + informerFactory2.Apps().V1beta1().StatefulSets(), + informerFactory2.Core().V1().Services(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) @@ -637,8 +512,8 @@ func TestMultiScheduler(t *testing.T) { t.Errorf("Couldn't create scheduler config: %v", err) } eventBroadcaster2 := record.NewBroadcaster() - schedulerConfig2.Recorder = eventBroadcaster2.NewRecorder(api.Scheme, clientv1.EventSource{Component: "foo-scheduler"}) - eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet2.Core().RESTClient()).Events("")}) + schedulerConfig2.Recorder = eventBroadcaster2.NewRecorder(api.Scheme, clientv1.EventSource{Component: fooScheduler}) + eventBroadcaster2.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet2.CoreV1().RESTClient()).Events("")}) informerFactory2.Start(schedulerConfig2.StopEverything) sched2, _ := scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{Config: schedulerConfig2}, nil...) @@ -647,7 +522,7 @@ func TestMultiScheduler(t *testing.T) { // 6. **check point-2**: // - testPodWithAnnotationFitsFoo should be scheduled - err = wait.Poll(time.Second, time.Second*5, podScheduled(clientSet, testPodFitsFoo.Namespace, testPodFitsFoo.Name)) + err = waitForPodToSchedule(context.clientSet, testPodFitsFoo) if err != nil { t.Errorf("Test MultiScheduler: %s Pod not scheduled, %v", testPodFitsFoo.Name, err) } else { @@ -655,12 +530,10 @@ func TestMultiScheduler(t *testing.T) { } // 7. delete the pods that were scheduled by the default scheduler, and stop the default scheduler - err = clientSet.Core().Pods(ns.Name).Delete(testPod.Name, metav1.NewDeleteOptions(0)) - if err != nil { + if err := deletePod(context.clientSet, testPod.Name, context.ns.Name); err != nil { t.Errorf("Failed to delete pod: %v", err) } - err = clientSet.Core().Pods(ns.Name).Delete(testPodFitsDefault.Name, metav1.NewDeleteOptions(0)) - if err != nil { + if err := deletePod(context.clientSet, testPodFitsDefault.Name, context.ns.Name); err != nil { t.Errorf("Failed to delete pod: %v", err) } @@ -703,103 +576,35 @@ func TestMultiScheduler(t *testing.T) { */ } -func createPod(client clientset.Interface, name string, scheduler string) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: name}, - Spec: v1.PodSpec{ - Containers: []v1.Container{{Name: "container", Image: e2e.GetPauseImageName(client)}}, - SchedulerName: scheduler, - }, - } -} - // This test will verify scheduler can work well regardless of whether kubelet is allocatable aware or not. func TestAllocatable(t *testing.T) { - _, s, closeFn := framework.RunAMaster(nil) - defer closeFn() - - ns := framework.CreateTestingNamespace("allocatable", s, t) - defer framework.DeleteTestingNamespace(ns, s, t) - - // 1. create and start default-scheduler - clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}}) - informerFactory := informers.NewSharedInformerFactory(clientSet, 0) - - // NOTE: This test cannot run in parallel, because it is creating and deleting - // non-namespaced objects (Nodes). - defer clientSet.Core().Nodes().DeleteCollection(nil, metav1.ListOptions{}) - - schedulerConfigFactory := factory.NewConfigFactory( - v1.DefaultSchedulerName, - clientSet, - informerFactory.Core().V1().Nodes(), - informerFactory.Core().V1().Pods(), - informerFactory.Core().V1().PersistentVolumes(), - informerFactory.Core().V1().PersistentVolumeClaims(), - informerFactory.Core().V1().ReplicationControllers(), - informerFactory.Extensions().V1beta1().ReplicaSets(), - informerFactory.Apps().V1beta1().StatefulSets(), - informerFactory.Core().V1().Services(), - v1.DefaultHardPodAffinitySymmetricWeight, - enableEquivalenceCache, - ) - schedulerConfig, err := schedulerConfigFactory.Create() - if err != nil { - t.Fatalf("Couldn't create scheduler config: %v", err) - } - eventBroadcaster := record.NewBroadcaster() - schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: v1.DefaultSchedulerName}) - eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet.Core().RESTClient()).Events("")}) - informerFactory.Start(schedulerConfig.StopEverything) - sched, _ := scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{Config: schedulerConfig}, nil...) - sched.Run() - // default-scheduler will be stopped later - defer close(schedulerConfig.StopEverything) + context := initTest(t, "allocatable") + defer cleanupTest(t, context) // 2. create a node without allocatable awareness - node := &v1.Node{ - ObjectMeta: metav1.ObjectMeta{Name: "node-allocatable-scheduler-test-node"}, - Spec: v1.NodeSpec{Unschedulable: false}, - Status: v1.NodeStatus{ - Capacity: v1.ResourceList{ - v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), - v1.ResourceCPU: *resource.NewMilliQuantity(30, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(30, resource.BinarySI), - }, - }, + nodeRes := &v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), + v1.ResourceCPU: *resource.NewMilliQuantity(30, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(30, resource.BinarySI), } - - allocNode, err := clientSet.Core().Nodes().Create(node) + allocNode, err := createNodeWithResource(context.clientSet, "node-allocatable-scheduler-test-node", nodeRes) if err != nil { t.Fatalf("Failed to create node: %v", err) } // 3. create resource pod which requires less than Capacity - podResource := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "pod-test-allocatable"}, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "container", - Image: e2e.GetPauseImageName(clientSet), - Resources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(20, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(20, resource.BinarySI), - }, - }, - }, - }, - }, + podName := "pod-test-allocatable" + podRes := &v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(20, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(20, resource.BinarySI), } - - testAllocPod, err := clientSet.Core().Pods(ns.Name).Create(podResource) + testAllocPod, err := createPausePodWithResource(context.clientSet, podName, context.ns.Name, podRes) if err != nil { t.Fatalf("Test allocatable unawareness failed to create pod: %v", err) } // 4. Test: this test pod should be scheduled since api-server will use Capacity as Allocatable - err = wait.Poll(time.Second, time.Second*5, podScheduled(clientSet, testAllocPod.Namespace, testAllocPod.Name)) + err = waitForPodToScheduleWithTimeout(context.clientSet, testAllocPod, time.Second*5) if err != nil { t.Errorf("Test allocatable unawareness: %s Pod not scheduled: %v", testAllocPod.Name, err) } else { @@ -820,24 +625,23 @@ func TestAllocatable(t *testing.T) { }, } - if _, err := clientSet.Core().Nodes().UpdateStatus(allocNode); err != nil { + if _, err := context.clientSet.CoreV1().Nodes().UpdateStatus(allocNode); err != nil { t.Fatalf("Failed to update node with Status.Allocatable: %v", err) } - if err := clientSet.Core().Pods(ns.Name).Delete(podResource.Name, &metav1.DeleteOptions{}); err != nil { - t.Fatalf("Failed to remove first resource pod: %v", err) + if err := deletePod(context.clientSet, testAllocPod.Name, context.ns.Name); err != nil { + t.Fatalf("Failed to remove the first pod: %v", err) } // 6. Make another pod with different name, same resource request - podResource.ObjectMeta.Name = "pod-test-allocatable2" - testAllocPod2, err := clientSet.Core().Pods(ns.Name).Create(podResource) + podName2 := "pod-test-allocatable2" + testAllocPod2, err := createPausePodWithResource(context.clientSet, podName2, context.ns.Name, podRes) if err != nil { t.Fatalf("Test allocatable awareness failed to create pod: %v", err) } // 7. Test: this test pod should not be scheduled since it request more than Allocatable - err = wait.Poll(time.Second, time.Second*5, podScheduled(clientSet, testAllocPod2.Namespace, testAllocPod2.Name)) - if err == nil { + if err := waitForPodToScheduleWithTimeout(context.clientSet, testAllocPod2, time.Second*5); err == nil { t.Errorf("Test allocatable awareness: %s Pod got scheduled unexpectedly, %v", testAllocPod2.Name, err) } else { t.Logf("Test allocatable awareness: %s Pod not scheduled as expected", testAllocPod2.Name) diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go new file mode 100644 index 0000000000..1ba9852035 --- /dev/null +++ b/test/integration/scheduler/util.go @@ -0,0 +1,334 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "fmt" + "testing" + "time" + + "k8s.io/api/core/v1" + clientv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/apimachinery/pkg/util/wait" + informers "k8s.io/client-go/informers" + clientset "k8s.io/client-go/kubernetes" + clientv1core "k8s.io/client-go/kubernetes/typed/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/plugin/pkg/scheduler" + _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" + "k8s.io/kubernetes/plugin/pkg/scheduler/factory" + "k8s.io/kubernetes/test/integration/framework" + + "net/http/httptest" +) + +type TestContext struct { + closeFn framework.CloseFunc + httpServer *httptest.Server + ns *v1.Namespace + clientSet *clientset.Clientset + informerFactory informers.SharedInformerFactory + schedulerConfigFactory scheduler.Configurator + schedulerConfig *scheduler.Config + scheduler *scheduler.Scheduler +} + +// initTest initializes a test environment and creates a scheduler with default +// configuration. +func initTest(t *testing.T, nsPrefix string) *TestContext { + var context TestContext + _, context.httpServer, context.closeFn = framework.RunAMaster(nil) + + context.ns = framework.CreateTestingNamespace(nsPrefix+string(uuid.NewUUID()), context.httpServer, t) + + context.clientSet = clientset.NewForConfigOrDie(&restclient.Config{Host: context.httpServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}}) + context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, 0) + + context.schedulerConfigFactory = factory.NewConfigFactory( + v1.DefaultSchedulerName, + context.clientSet, + context.informerFactory.Core().V1().Nodes(), + context.informerFactory.Core().V1().Pods(), + context.informerFactory.Core().V1().PersistentVolumes(), + context.informerFactory.Core().V1().PersistentVolumeClaims(), + context.informerFactory.Core().V1().ReplicationControllers(), + context.informerFactory.Extensions().V1beta1().ReplicaSets(), + context.informerFactory.Apps().V1beta1().StatefulSets(), + context.informerFactory.Core().V1().Services(), + v1.DefaultHardPodAffinitySymmetricWeight, + true, + ) + var err error + context.schedulerConfig, err = context.schedulerConfigFactory.Create() + if err != nil { + t.Fatalf("Couldn't create scheduler config: %v", err) + } + eventBroadcaster := record.NewBroadcaster() + context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: v1.DefaultSchedulerName}) + eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(context.clientSet.CoreV1().RESTClient()).Events("")}) + context.informerFactory.Start(context.schedulerConfig.StopEverything) + context.scheduler, err = scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{Config: context.schedulerConfig}, nil...) + if err != nil { + t.Fatalf("Couldn't create scheduler: %v", err) + } + context.scheduler.Run() + return &context +} + +// cleanupTest deletes the scheduler and the test namespace. It should be called +// at the end of a test. +func cleanupTest(t *testing.T, context *TestContext) { + // Kill the scheduler. + close(context.schedulerConfig.StopEverything) + // Cleanup nodes. + context.clientSet.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{}) + framework.DeleteTestingNamespace(context.ns, context.httpServer, t) + context.closeFn() +} + +// waitForReflection waits till the passFunc confirms that the object it expects +// to see is in the store. Used to observe reflected events. +func waitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string, passFunc func(n interface{}) bool) error { + nodes := []*v1.Node{} + err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) { + n, err := nodeLister.Get(key) + + switch { + case err == nil && passFunc(n): + return true, nil + case errors.IsNotFound(err): + nodes = append(nodes, nil) + case err != nil: + t.Errorf("Unexpected error: %v", err) + default: + nodes = append(nodes, n) + } + + return false, nil + }) + if err != nil { + t.Logf("Logging consecutive node versions received from store:") + for i, n := range nodes { + t.Logf("%d: %#v", i, n) + } + } + return err +} + +// nodeHasLabels returns a function that checks if a node has all the given labels. +func nodeHasLabels(cs clientset.Interface, nodeName string, labels map[string]string) wait.ConditionFunc { + return func() (bool, error) { + node, err := cs.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) + if errors.IsNotFound(err) { + return false, nil + } + if err != nil { + // This could be a connection error so we want to retry. + return false, nil + } + for k, v := range labels { + if node.Labels == nil || node.Labels[k] != v { + return false, nil + } + } + return true, nil + } +} + +// waitForNodeLabels waits for the given node to have all the given labels. +func waitForNodeLabels(cs clientset.Interface, nodeName string, labels map[string]string) error { + return wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, nodeHasLabels(cs, nodeName, labels)) +} + +// createNodeWithResource creates a node with the given resource list and +// returns a pointer and error status. +func createNodeWithResource(cs clientset.Interface, name string, res *v1.ResourceList) (*v1.Node, error) { + n := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Spec: v1.NodeSpec{Unschedulable: false}, + Status: v1.NodeStatus{ + Capacity: *res, + }, + } + return cs.CoreV1().Nodes().Create(n) +} + +// createNode creates a node with predefined resources and returns a pointer and +// error status. +func createNode(cs clientset.Interface, name string) (*v1.Node, error) { + res := v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), + } + return createNodeWithResource(cs, name, &res) +} + +// createNodes created `numNodes` nodes. The created node names will be in the +// form of "`prefix`-X" where X is an ordinal. +func createNodes(cs clientset.Interface, prefix string, numNodes int) ([]*v1.Node, error) { + nodes := make([]*v1.Node, numNodes) + for i := 0; i < numNodes; i++ { + nodeName := fmt.Sprintf("%v-%d", prefix, i) + node, err := createNode(cs, nodeName) + if err != nil { + return nodes[:], err + } + nodes[i] = node + } + return nodes[:], nil +} + +type pausePodConfig struct { + Name string + Namespace string + Affinity *v1.Affinity + Annotations, Labels, NodeSelector map[string]string + Resources *v1.ResourceRequirements + Tolerations []v1.Toleration + NodeName string + SchedulerName string +} + +// initPausePod initializes a pod API object from the given config. It is used +// mainly in pod creation process. +func initPausePod(cs clientset.Interface, conf *pausePodConfig) *v1.Pod { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: conf.Name, + Labels: conf.Labels, + Annotations: conf.Annotations, + }, + Spec: v1.PodSpec{ + NodeSelector: conf.NodeSelector, + Affinity: conf.Affinity, + Containers: []v1.Container{ + { + Name: conf.Name, + Image: framework.GetPauseImageName(cs), + }, + }, + Tolerations: conf.Tolerations, + NodeName: conf.NodeName, + SchedulerName: conf.SchedulerName, + }, + } + if conf.Resources != nil { + pod.Spec.Containers[0].Resources = *conf.Resources + } + return pod +} + +// createPausePod creates a pod with "Pause" image and the given config and +// return its pointer and error status. +func createPausePod(cs clientset.Interface, conf *pausePodConfig) (*v1.Pod, error) { + p := initPausePod(cs, conf) + return cs.CoreV1().Pods(conf.Namespace).Create(p) +} + +// createPausePodWithResource creates a pod with "Pause" image and the given +// resources and return its pointer and error status. +func createPausePodWithResource(cs clientset.Interface, podName string, nsName string, res *v1.ResourceList) (*v1.Pod, error) { + conf := pausePodConfig{ + Name: podName, + Namespace: nsName, + Resources: &v1.ResourceRequirements{ + Requests: *res, + }, + } + return createPausePod(cs, &conf) +} + +// createDefaultPausePod creates a pod with "Pause" image and return its pointer +// and error status. +func createDefaultPausePod(cs clientset.Interface, podName string, nsName string) (*v1.Pod, error) { + conf := pausePodConfig{ + Name: podName, + Namespace: nsName, + } + return createPausePod(cs, &conf) +} + +// runPausePod creates a pod with "Pause" image and the given config and waits +// until it is scheduled. It returns its pointer and error status. +func runPausePod(cs clientset.Interface, conf *pausePodConfig) (*v1.Pod, error) { + p := initPausePod(cs, conf) + pod, err := cs.CoreV1().Pods(conf.Namespace).Create(p) + if err != nil { + return nil, fmt.Errorf("Error creating pause pod: %v", err) + } + if err = waitForPodToSchedule(cs, pod); err != nil { + return pod, fmt.Errorf("Pod %v didn't schedule successfully. Error: %v", pod.Name, err) + } + if pod, err = cs.CoreV1().Pods(conf.Namespace).Get(conf.Name, metav1.GetOptions{}); err != nil { + return pod, fmt.Errorf("Error getting pod %v info: %v", conf.Name, err) + } + return pod, nil +} + +// podScheduled returns true if a node is assigned to the given pod. +func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { + return func() (bool, error) { + pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{}) + if errors.IsNotFound(err) { + return false, nil + } + if err != nil { + // This could be a connection error so we want to retry. + return false, nil + } + if pod.Spec.NodeName == "" { + return false, nil + } + return true, nil + } +} + +// waitForPodToScheduleWithTimeout waits for a pod to get scheduled and returns +// an error if it does not scheduled within the given timeout. +func waitForPodToScheduleWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error { + return wait.Poll(time.Second, timeout, podScheduled(cs, pod.Namespace, pod.Name)) +} + +// waitForPodToSchedule waits for a pod to get scheduled and returns an error if +// it does not scheduled within the timeout duration (30 seconds). +func waitForPodToSchedule(cs clientset.Interface, pod *v1.Pod) error { + return waitForPodToScheduleWithTimeout(cs, pod, wait.ForeverTestTimeout) +} + +// deletePod deletes the given pod in the given namespace. +func deletePod(cs clientset.Interface, podName string, nsName string) error { + return cs.CoreV1().Pods(nsName).Delete(podName, metav1.NewDeleteOptions(0)) +} + +// printAllPods prints a list of all the pods and their node names. This is used +// for debugging. +func printAllPods(t *testing.T, cs clientset.Interface, nsName string) { + podList, err := cs.CoreV1().Pods(nsName).List(metav1.ListOptions{}) + if err != nil { + t.Logf("Error getting pods: %v", err) + } + for _, pod := range podList.Items { + t.Logf("Pod:\n\tName:%v\n\tNamespace:%v\n\tNode Name:%v\n", pod.Name, pod.Namespace, pod.Spec.NodeName) + } +}