Fix scheduler Pod informers to receive events when pods are scheduled by other schedulers.

pull/8/head
Bobby (Babak) Salamat 2018-04-23 04:17:25 -07:00
parent 719a56fa67
commit a073dfdbd9
3 changed files with 11 additions and 7 deletions

View File

@ -476,7 +476,7 @@ func NewSchedulerServer(config *componentconfig.KubeSchedulerConfiguration, mast
SchedulerName: config.SchedulerName, SchedulerName: config.SchedulerName,
Client: client, Client: client,
InformerFactory: informers.NewSharedInformerFactory(client, 0), InformerFactory: informers.NewSharedInformerFactory(client, 0),
PodInformer: factory.NewPodInformer(client, 0, config.SchedulerName), PodInformer: factory.NewPodInformer(client, 0),
AlgorithmSource: config.AlgorithmSource, AlgorithmSource: config.AlgorithmSource,
HardPodAffinitySymmetricWeight: config.HardPodAffinitySymmetricWeight, HardPodAffinitySymmetricWeight: config.HardPodAffinitySymmetricWeight,
EventClient: eventClient, EventClient: eventClient,

View File

@ -218,10 +218,10 @@ func NewConfigFactory(
FilterFunc: func(obj interface{}) bool { FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) { switch t := obj.(type) {
case *v1.Pod: case *v1.Pod:
return unassignedNonTerminatedPod(t) return unassignedNonTerminatedPod(t) && responsibleForPod(t, schedulerName)
case cache.DeletedFinalStateUnknown: case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok { if pod, ok := t.Obj.(*v1.Pod); ok {
return unassignedNonTerminatedPod(pod) return unassignedNonTerminatedPod(pod) && responsibleForPod(pod, schedulerName)
} }
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c)) runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
return false return false
@ -1198,6 +1198,11 @@ func assignedNonTerminatedPod(pod *v1.Pod) bool {
return true return true
} }
// responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler.
func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
return schedulerName == pod.Spec.SchedulerName
}
// assignedPodLister filters the pods returned from a PodLister to // assignedPodLister filters the pods returned from a PodLister to
// only include those that have a node name set. // only include those that have a node name set.
type assignedPodLister struct { type assignedPodLister struct {
@ -1270,10 +1275,9 @@ func (i *podInformer) Lister() corelisters.PodLister {
} }
// NewPodInformer creates a shared index informer that returns only non-terminal pods. // NewPodInformer creates a shared index informer that returns only non-terminal pods.
func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration, schedulerName string) coreinformers.PodInformer { func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) coreinformers.PodInformer {
selector := fields.ParseSelectorOrDie( selector := fields.ParseSelectorOrDie(
"spec.schedulerName=" + schedulerName + "status.phase!=" + string(v1.PodSucceeded) +
",status.phase!=" + string(v1.PodSucceeded) +
",status.phase!=" + string(v1.PodFailed)) ",status.phase!=" + string(v1.PodFailed))
lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), string(v1.ResourcePods), metav1.NamespaceAll, selector) lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), string(v1.ResourcePods), metav1.NamespaceAll, selector)
return &podInformer{ return &podInformer{

View File

@ -164,7 +164,7 @@ func initTestSchedulerWithOptions(
// create independent pod informer if required // create independent pod informer if required
if setPodInformer { if setPodInformer {
podInformer = factory.NewPodInformer(context.clientSet, 12*time.Hour, v1.DefaultSchedulerName) podInformer = factory.NewPodInformer(context.clientSet, 12*time.Hour)
} else { } else {
podInformer = context.informerFactory.Core().V1().Pods() podInformer = context.informerFactory.Core().V1().Pods()
} }