diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 7d39910014..ec16307f7a 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -476,7 +476,7 @@ func NewSchedulerServer(config *componentconfig.KubeSchedulerConfiguration, mast SchedulerName: config.SchedulerName, Client: client, InformerFactory: informers.NewSharedInformerFactory(client, 0), - PodInformer: factory.NewPodInformer(client, 0, config.SchedulerName), + PodInformer: factory.NewPodInformer(client, 0), AlgorithmSource: config.AlgorithmSource, HardPodAffinitySymmetricWeight: config.HardPodAffinitySymmetricWeight, EventClient: eventClient, diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 0a5df6ed64..174c1f384c 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -218,10 +218,10 @@ func NewConfigFactory( FilterFunc: func(obj interface{}) bool { switch t := obj.(type) { case *v1.Pod: - return unassignedNonTerminatedPod(t) + return unassignedNonTerminatedPod(t) && responsibleForPod(t, schedulerName) case cache.DeletedFinalStateUnknown: if pod, ok := t.Obj.(*v1.Pod); ok { - return unassignedNonTerminatedPod(pod) + return unassignedNonTerminatedPod(pod) && responsibleForPod(pod, schedulerName) } runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c)) return false
@@ -1198,6 +1198,11 @@ func assignedNonTerminatedPod(pod *v1.Pod) bool {
 	return true
 }
 
+// responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler.
+func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
+	return schedulerName == pod.Spec.SchedulerName
+}
+
 // assignedPodLister filters the pods returned from a PodLister to
 // only include those that have a node name set.
 type assignedPodLister struct {
@@ -1270,10 +1275,11 @@ func (i *podInformer) Lister() corelisters.PodLister {
 }
 
 // NewPodInformer creates a shared index informer that returns only non-terminal pods.
-func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration, schedulerName string) coreinformers.PodInformer {
+// The field selector only excludes the Succeeded and Failed phases; pods are returned whatever
+// their spec.schedulerName is, so consumers must filter on it themselves (see responsibleForPod).
+func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) coreinformers.PodInformer {
 	selector := fields.ParseSelectorOrDie(
-		"spec.schedulerName=" + schedulerName +
-			",status.phase!=" + string(v1.PodSucceeded) +
+		"status.phase!=" + string(v1.PodSucceeded) +
 			",status.phase!=" + string(v1.PodFailed))
 	lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), string(v1.ResourcePods), metav1.NamespaceAll, selector)
 	return &podInformer{
diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 2d46a6a520..3946796201 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -195,7 +195,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, Client: clientSet, InformerFactory: informerFactory, - PodInformer: factory.NewPodInformer(clientSet, 0, v1.DefaultSchedulerName), + PodInformer: factory.NewPodInformer(clientSet, 0), EventClient: clientSet.CoreV1(), Recorder: eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}), Broadcaster: eventBroadcaster, @@ -254,7 +254,7 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) { HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, Client: clientSet, InformerFactory: informerFactory, - PodInformer: factory.NewPodInformer(clientSet, 0, v1.DefaultSchedulerName), + PodInformer: factory.NewPodInformer(clientSet, 0), EventClient: clientSet.CoreV1(), Recorder: eventBroadcaster.NewRecorder(legacyscheme.Scheme,
v1.EventSource{Component: v1.DefaultSchedulerName}), Broadcaster: eventBroadcaster, @@ -516,7 +516,7 @@ func TestMultiScheduler(t *testing.T) { // 5. create and start a scheduler with name "foo-scheduler" clientSet2 := clientset.NewForConfigOrDie(&restclient.Config{Host: context.httpServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}}) informerFactory2 := informers.NewSharedInformerFactory(context.clientSet, 0) - podInformer2 := factory.NewPodInformer(context.clientSet, 0, fooScheduler) + podInformer2 := factory.NewPodInformer(context.clientSet, 0) schedulerConfigFactory2 := createConfiguratorWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2) schedulerConfig2, err := schedulerConfigFactory2.Create()
@@ -749,3 +749,97 @@ func TestPDBCache(t *testing.T) {
 		t.Errorf("No PDB was deleted from the cache: %v", err)
 	}
 }
+
+// TestSchedulerInformers tests that the scheduler receives informer events and updates its cache when
+// pods are scheduled by other schedulers.
+func TestSchedulerInformers(t *testing.T) {
+	// Initialize scheduler.
+	context := initTest(t, "scheduler-informer")
+	defer cleanupTest(t, context)
+	cs := context.clientSet
+
+	// A pod requests 200m CPU and 200 memory while a node offers 500m CPU and 500 memory, so a
+	// node that already holds two such pods has no room for a third.
+	defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
+		v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
+		v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
+	}
+	defaultNodeRes := &v1.ResourceList{
+		v1.ResourcePods:   *resource.NewQuantity(32, resource.DecimalSI),
+		v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
+		v1.ResourceMemory: *resource.NewQuantity(500, resource.BinarySI),
+	}
+
+	type nodeConfig struct {
+		name string
+		res  *v1.ResourceList
+	}
+
+	tests := []struct {
+		description  string
+		nodes        []*nodeConfig
+		existingPods []*v1.Pod
+		pod          *v1.Pod
+	}{
+		{
+			description: "Pod cannot be scheduled when node is occupied by pods scheduled by other schedulers",
+			nodes:       []*nodeConfig{{name: "node-1", res: defaultNodeRes}},
+			existingPods: []*v1.Pod{
+				initPausePod(context.clientSet, &pausePodConfig{
+					Name:          "pod1",
+					Namespace:     context.ns.Name,
+					Resources:     defaultPodRes,
+					Labels:        map[string]string{"foo": "bar"},
+					NodeName:      "node-1",
+					SchedulerName: "foo-scheduler",
+				}),
+				initPausePod(context.clientSet, &pausePodConfig{
+					Name:          "pod2",
+					Namespace:     context.ns.Name,
+					Resources:     defaultPodRes,
+					Labels:        map[string]string{"foo": "bar"},
+					NodeName:      "node-1",
+					SchedulerName: "bar-scheduler",
+				}),
+			},
+			pod: initPausePod(cs, &pausePodConfig{
+				Name:      "unschedulable-pod",
+				Namespace: context.ns.Name,
+				Resources: defaultPodRes,
+			}),
+		},
+	}
+
+	for _, test := range tests {
+		for _, nodeConf := range test.nodes {
+			_, err := createNode(cs, nodeConf.name, nodeConf.res)
+			if err != nil {
+				t.Fatalf("Error creating node %v: %v", nodeConf.name, err)
+			}
+		}
+
+		pods := make([]*v1.Pod, len(test.existingPods))
+		var err error
+		// Create and run existingPods.
+		for i, p := range test.existingPods {
+			if pods[i], err = runPausePod(cs, p); err != nil {
+				t.Fatalf("Test [%v]: Error running pause pod: %v", test.description, err)
+			}
+		}
+		// Create the new "pod".
+		unschedulable, err := createPausePod(cs, test.pod)
+		if err != nil {
+			// Abort like the other setup failures: every remaining step needs the created pod.
+			t.Fatalf("Error while creating new pod: %v", err)
+		}
+		if err := waitForPodUnschedulable(cs, unschedulable); err != nil {
+			t.Errorf("Pod %v got scheduled: %v", unschedulable.Name, err)
+		}
+
+		// Cleanup
+		pods = append(pods, unschedulable)
+		cleanupPods(cs, t, pods)
+		cs.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).DeleteCollection(nil, metav1.ListOptions{})
+		cs.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{})
+	}
+}
diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 2a383a1fc3..c55e8d9428 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -164,7 +164,7 @@ func initTestSchedulerWithOptions( // create independent pod informer if required if setPodInformer { - podInformer = factory.NewPodInformer(context.clientSet, 12*time.Hour, v1.DefaultSchedulerName) + podInformer = factory.NewPodInformer(context.clientSet, 12*time.Hour) } else { podInformer = context.informerFactory.Core().V1().Pods() }