diff --git a/pkg/scheduler/factory/BUILD b/pkg/scheduler/factory/BUILD index 2017f8a476..0121dd4774 100644 --- a/pkg/scheduler/factory/BUILD +++ b/pkg/scheduler/factory/BUILD @@ -64,6 +64,7 @@ go_test( "//pkg/scheduler/algorithm/priorities:go_default_library", "//pkg/scheduler/api:go_default_library", "//pkg/scheduler/api/latest:go_default_library", + "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache/fake:go_default_library", "//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 91432afbbc..8357ff75bc 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -150,8 +150,6 @@ type PodPreemptor interface { type Configurator interface { // Exposed for testing GetHardPodAffinitySymmetricWeight() int32 - // Exposed for testing - MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error) // Predicate related accessors to be exposed for use by k8s.io/autoscaler/cluster-autoscaler GetPredicateMetadataProducer() (predicates.PredicateMetadataProducer, error) @@ -883,7 +881,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced) }, NextPod: internalqueue.MakeNextPodFunc(c.podQueue), - Error: c.MakeDefaultErrorFunc(podBackoff, c.podQueue), + Error: MakeDefaultErrorFunc(c.client, podBackoff, c.podQueue, c.schedulerCache, c.StopEverything), StopEverything: c.StopEverything, VolumeBinder: c.volumeBinder, SchedulingQueue: c.podQueue, @@ -1061,7 +1059,8 @@ func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) core } } -func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error) { +// MakeDefaultErrorFunc construct a function to handle pod scheduler error +func MakeDefaultErrorFunc(client clientset.Interface, backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue, schedulerCache schedulerinternalcache.Cache, stopEverything <-chan struct{}) func(pod *v1.Pod, err error) { return func(pod *v1.Pod, err error) { if err == core.ErrNoNodesAvailable { klog.V(4).Infof("Unable to schedule %v/%v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name) @@ -1073,10 +1072,10 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue nodeName := errStatus.Status().Details.Name // when node is not found, We do not remove the node right away. Trying again to get // the node and if the node is still not found, then remove it from the scheduler cache. - _, err := c.client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) + _, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) if err != nil && errors.IsNotFound(err) { node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} - c.schedulerCache.RemoveNode(&node) + schedulerCache.RemoveNode(&node) } } } else { @@ -1099,7 +1098,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue // to run on a node, scheduler takes the pod into account when running // predicates for the node. if !util.PodPriorityEnabled() { - if !backoff.TryBackoffAndWait(podID, c.StopEverything) { + if !backoff.TryBackoffAndWait(podID, stopEverything) { klog.Warningf("Request for pod %v already in flight, abandoning", podID) return } @@ -1107,7 +1106,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue // Get the pod again; it may have changed/been scheduled already. getBackoff := initialGetBackoff for { - pod, err := c.client.CoreV1().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{}) + pod, err := client.CoreV1().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{}) if err == nil { if len(pod.Spec.NodeName) == 0 { podQueue.AddUnschedulableIfNotPresent(pod) diff --git a/pkg/scheduler/factory/factory_test.go b/pkg/scheduler/factory/factory_test.go index bed89bed7d..395e6bcf8c 100644 --- a/pkg/scheduler/factory/factory_test.go +++ b/pkg/scheduler/factory/factory_test.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest" + schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" @@ -254,10 +255,10 @@ func TestDefaultErrorFunc(t *testing.T) { client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}}) stopCh := make(chan struct{}) defer close(stopCh) - factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh) queue := &internalqueue.FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)} + schedulerCache := schedulerinternalcache.New(30*time.Second, stopCh) podBackoff := util.CreatePodBackoff(1*time.Millisecond, 1*time.Second) - errFunc := factory.MakeDefaultErrorFunc(podBackoff, queue) + errFunc := MakeDefaultErrorFunc(client, podBackoff, queue, schedulerCache, stopCh) errFunc(testPod, nil)