k3s/vendor/k8s.io/kubernetes/pkg/scheduler/scheduler.go

744 lines
29 KiB
Go
Raw Normal View History

2019-01-12 04:58:27 +00:00
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
2019-12-12 01:27:03 +00:00
"context"
2019-01-12 04:58:27 +00:00
"fmt"
"io/ioutil"
2019-12-12 01:27:03 +00:00
"math/rand"
2019-01-12 04:58:27 +00:00
"os"
2020-08-10 17:43:49 +00:00
"strconv"
2019-01-12 04:58:27 +00:00
"time"
2019-09-27 21:51:53 +00:00
v1 "k8s.io/api/core/v1"
2019-01-12 04:58:27 +00:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
2019-01-12 04:58:27 +00:00
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
2019-12-12 01:27:03 +00:00
"k8s.io/client-go/informers"
2019-01-12 04:58:27 +00:00
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
2019-12-12 01:27:03 +00:00
"k8s.io/client-go/tools/cache"
2020-08-10 17:43:49 +00:00
"k8s.io/klog/v2"
"k8s.io/kube-scheduler/config/v1beta2"
2019-12-12 01:27:03 +00:00
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/core/validation"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
2019-12-12 01:27:03 +00:00
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
"k8s.io/kubernetes/pkg/scheduler/framework"
2019-12-12 01:27:03 +00:00
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
2020-08-10 17:43:49 +00:00
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
2019-08-30 18:33:25 +00:00
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
2019-09-27 21:51:53 +00:00
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
2019-01-12 04:58:27 +00:00
"k8s.io/kubernetes/pkg/scheduler/metrics"
2020-03-26 21:07:15 +00:00
"k8s.io/kubernetes/pkg/scheduler/profile"
2020-08-10 17:43:49 +00:00
"k8s.io/kubernetes/pkg/scheduler/util"
2019-01-12 04:58:27 +00:00
)
const (
2019-04-07 17:07:55 +00:00
// SchedulerError is the reason recorded for events when an error occurs during scheduling a pod.
SchedulerError = "SchedulerError"
2020-03-26 21:07:15 +00:00
// Percentage of plugin metrics to be sampled.
pluginMetricsSamplePercent = 10
2019-01-12 04:58:27 +00:00
)
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
2019-09-27 21:51:53 +00:00
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
SchedulerCache internalcache.Cache
Algorithm ScheduleAlgorithm
Extenders []framework.Extender
2019-09-27 21:51:53 +00:00
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
2020-08-10 17:43:49 +00:00
NextPod func() *framework.QueuedPodInfo
2019-09-27 21:51:53 +00:00
// Error is called if there is an error. It is passed the pod in
// question, and the error
2020-08-10 17:43:49 +00:00
Error func(*framework.QueuedPodInfo, error)
2019-09-27 21:51:53 +00:00
// Close this to shut down the scheduler.
StopEverything <-chan struct{}
// SchedulingQueue holds pods to be scheduled
SchedulingQueue internalqueue.SchedulingQueue
2019-12-12 01:27:03 +00:00
2020-03-26 21:07:15 +00:00
// Profiles are the scheduling profiles.
Profiles profile.Map
2019-12-12 01:27:03 +00:00
2020-08-10 17:43:49 +00:00
client clientset.Interface
2019-01-12 04:58:27 +00:00
}
type schedulerOptions struct {
componentConfigVersion string
kubeConfig *restclient.Config
legacyPolicySource *schedulerapi.SchedulerPolicySource
2020-03-26 21:07:15 +00:00
percentageOfNodesToScore int32
podInitialBackoffSeconds int64
podMaxBackoffSeconds int64
// Contains out-of-tree plugins to be merged with the in-tree registry.
2020-08-10 17:43:49 +00:00
frameworkOutOfTreeRegistry frameworkruntime.Registry
2020-03-26 21:07:15 +00:00
profiles []schedulerapi.KubeSchedulerProfile
extenders []schedulerapi.Extender
2020-08-10 17:43:49 +00:00
frameworkCapturer FrameworkCapturer
parallelism int32
applyDefaultProfile bool
2019-01-12 04:58:27 +00:00
}
// Option configures a Scheduler
type Option func(*schedulerOptions)
// WithComponentConfigVersion sets the component config version to the
// KubeSchedulerConfiguration version used. The string should be the full
// scheme group/version of the external type we converted from (for example
// "kubescheduler.config.k8s.io/v1beta2")
func WithComponentConfigVersion(apiVersion string) Option {
return func(o *schedulerOptions) {
o.componentConfigVersion = apiVersion
}
}
// WithKubeConfig sets the kube config for Scheduler.
func WithKubeConfig(cfg *restclient.Config) Option {
return func(o *schedulerOptions) {
o.kubeConfig = cfg
}
}
2020-03-26 21:07:15 +00:00
// WithProfiles sets profiles for Scheduler. By default, there is one profile
// with the name "default-scheduler".
func WithProfiles(p ...schedulerapi.KubeSchedulerProfile) Option {
2019-01-12 04:58:27 +00:00
return func(o *schedulerOptions) {
2020-03-26 21:07:15 +00:00
o.profiles = p
o.applyDefaultProfile = false
2019-01-12 04:58:27 +00:00
}
}
// WithParallelism sets the parallelism for all scheduler algorithms. Default is 16.
func WithParallelism(threads int32) Option {
return func(o *schedulerOptions) {
o.parallelism = threads
}
}
// WithLegacyPolicySource sets legacy policy config file source.
func WithLegacyPolicySource(source *schedulerapi.SchedulerPolicySource) Option {
2019-12-12 01:27:03 +00:00
return func(o *schedulerOptions) {
o.legacyPolicySource = source
2019-12-12 01:27:03 +00:00
}
}
2019-01-12 04:58:27 +00:00
// WithPercentageOfNodesToScore sets percentageOfNodesToScore for Scheduler, the default value is 50
func WithPercentageOfNodesToScore(percentageOfNodesToScore int32) Option {
return func(o *schedulerOptions) {
o.percentageOfNodesToScore = percentageOfNodesToScore
}
}
2019-12-12 01:27:03 +00:00
// WithFrameworkOutOfTreeRegistry sets the registry for out-of-tree plugins. Those plugins
// will be appended to the default registry.
2020-08-10 17:43:49 +00:00
func WithFrameworkOutOfTreeRegistry(registry frameworkruntime.Registry) Option {
2019-12-12 01:27:03 +00:00
return func(o *schedulerOptions) {
o.frameworkOutOfTreeRegistry = registry
}
}
// WithPodInitialBackoffSeconds sets podInitialBackoffSeconds for Scheduler, the default value is 1
func WithPodInitialBackoffSeconds(podInitialBackoffSeconds int64) Option {
return func(o *schedulerOptions) {
o.podInitialBackoffSeconds = podInitialBackoffSeconds
}
}
// WithPodMaxBackoffSeconds sets podMaxBackoffSeconds for Scheduler, the default value is 10
func WithPodMaxBackoffSeconds(podMaxBackoffSeconds int64) Option {
return func(o *schedulerOptions) {
o.podMaxBackoffSeconds = podMaxBackoffSeconds
}
}
2020-03-26 21:07:15 +00:00
// WithExtenders sets extenders for the Scheduler
func WithExtenders(e ...schedulerapi.Extender) Option {
return func(o *schedulerOptions) {
o.extenders = e
}
}
2020-08-10 17:43:49 +00:00
// FrameworkCapturer is used for registering a notify function in building framework.
type FrameworkCapturer func(schedulerapi.KubeSchedulerProfile)
// WithBuildFrameworkCapturer sets a notify function for getting buildFramework details.
func WithBuildFrameworkCapturer(fc FrameworkCapturer) Option {
return func(o *schedulerOptions) {
o.frameworkCapturer = fc
}
}
2019-01-12 04:58:27 +00:00
var defaultSchedulerOptions = schedulerOptions{
2020-03-26 21:07:15 +00:00
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
parallelism: int32(parallelize.DefaultParallelism),
// Ideally we would statically set the default profile here, but we can't because
// creating the default profile may require testing feature gates, which may get
// set dynamically in tests. Therefore, we delay creating it until New is actually
// invoked.
applyDefaultProfile: true,
2019-01-12 04:58:27 +00:00
}
// New returns a Scheduler
func New(client clientset.Interface,
2019-12-12 01:27:03 +00:00
informerFactory informers.SharedInformerFactory,
2020-03-26 21:07:15 +00:00
recorderFactory profile.RecorderFactory,
2019-01-12 04:58:27 +00:00
stopCh <-chan struct{},
2019-12-12 01:27:03 +00:00
opts ...Option) (*Scheduler, error) {
stopEverything := stopCh
if stopEverything == nil {
stopEverything = wait.NeverStop
}
2019-01-12 04:58:27 +00:00
options := defaultSchedulerOptions
for _, opt := range opts {
opt(&options)
}
2019-12-12 01:27:03 +00:00
if options.applyDefaultProfile {
var versionedCfg v1beta2.KubeSchedulerConfiguration
scheme.Scheme.Default(&versionedCfg)
cfg := config.KubeSchedulerConfiguration{}
if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
return nil, err
}
options.profiles = cfg.Profiles
}
2019-12-12 01:27:03 +00:00
schedulerCache := internalcache.New(30*time.Second, stopEverything)
2020-03-26 21:07:15 +00:00
registry := frameworkplugins.NewInTreeRegistry()
if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
return nil, err
2019-12-12 01:27:03 +00:00
}
2020-03-26 21:07:15 +00:00
snapshot := internalcache.NewEmptySnapshot()
clusterEventMap := make(map[framework.ClusterEvent]sets.String)
2019-12-12 01:27:03 +00:00
configurator := &Configurator{
componentConfigVersion: options.componentConfigVersion,
2020-03-26 21:07:15 +00:00
client: client,
kubeConfig: options.kubeConfig,
2020-03-26 21:07:15 +00:00
recorderFactory: recorderFactory,
informerFactory: informerFactory,
schedulerCache: schedulerCache,
StopEverything: stopEverything,
percentageOfNodesToScore: options.percentageOfNodesToScore,
podInitialBackoffSeconds: options.podInitialBackoffSeconds,
podMaxBackoffSeconds: options.podMaxBackoffSeconds,
profiles: append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...),
registry: registry,
nodeInfoSnapshot: snapshot,
extenders: options.extenders,
2020-08-10 17:43:49 +00:00
frameworkCapturer: options.frameworkCapturer,
parallellism: options.parallelism,
clusterEventMap: clusterEventMap,
2019-12-12 01:27:03 +00:00
}
2020-02-14 00:18:16 +00:00
metrics.Register()
2019-12-12 01:27:03 +00:00
var sched *Scheduler
if options.legacyPolicySource == nil {
// Create the config from component config
sc, err := configurator.create()
2019-01-12 04:58:27 +00:00
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler: %v", err)
2019-01-12 04:58:27 +00:00
}
2019-12-12 01:27:03 +00:00
sched = sc
} else {
2019-01-12 04:58:27 +00:00
// Create the config from a user specified policy source.
policy := &schedulerapi.Policy{}
switch {
case options.legacyPolicySource.File != nil:
if err := initPolicyFromFile(options.legacyPolicySource.File.Path, policy); err != nil {
2019-01-12 04:58:27 +00:00
return nil, err
}
case options.legacyPolicySource.ConfigMap != nil:
if err := initPolicyFromConfigMap(client, options.legacyPolicySource.ConfigMap, policy); err != nil {
2019-01-12 04:58:27 +00:00
return nil, err
}
}
2020-03-26 21:07:15 +00:00
// Set extenders on the configurator now that we've decoded the policy
// In this case, c.extenders should be nil since we're using a policy (and therefore not componentconfig,
// which would have set extenders in the above instantiation of Configurator from CC options)
configurator.extenders = policy.Extenders
sc, err := configurator.createFromPolicy(*policy)
2019-01-12 04:58:27 +00:00
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
}
2019-12-12 01:27:03 +00:00
sched = sc
2019-01-12 04:58:27 +00:00
}
2019-01-12 04:58:27 +00:00
// Additional tweaks to the config produced by the configurator.
2019-12-12 01:27:03 +00:00
sched.StopEverything = stopEverything
2020-08-10 17:43:49 +00:00
sched.client = client
2019-12-12 01:27:03 +00:00
// Build dynamic client and dynamic informer factory
var dynInformerFactory dynamicinformer.DynamicSharedInformerFactory
// options.kubeConfig can be nil in tests.
if options.kubeConfig != nil {
dynClient := dynamic.NewForConfigOrDie(options.kubeConfig)
dynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, v1.NamespaceAll, nil)
}
addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))
2019-01-12 04:58:27 +00:00
return sched, nil
}
func unionedGVKs(m map[framework.ClusterEvent]sets.String) map[framework.GVK]framework.ActionType {
gvkMap := make(map[framework.GVK]framework.ActionType)
for evt := range m {
if _, ok := gvkMap[evt.Resource]; ok {
gvkMap[evt.Resource] |= evt.ActionType
} else {
gvkMap[evt.Resource] = evt.ActionType
}
}
return gvkMap
}
2019-01-12 04:58:27 +00:00
// initPolicyFromFile initialize policy from file
func initPolicyFromFile(policyFile string, policy *schedulerapi.Policy) error {
// Use a policy serialized in a file.
_, err := os.Stat(policyFile)
if err != nil {
return fmt.Errorf("missing policy config file %s", policyFile)
}
data, err := ioutil.ReadFile(policyFile)
if err != nil {
return fmt.Errorf("couldn't read policy config: %v", err)
}
2019-12-12 01:27:03 +00:00
err = runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), []byte(data), policy)
2019-01-12 04:58:27 +00:00
if err != nil {
return fmt.Errorf("invalid policy: %v", err)
}
return nil
}
// initPolicyFromConfigMap initialize policy from configMap
2019-12-12 01:27:03 +00:00
func initPolicyFromConfigMap(client clientset.Interface, policyRef *schedulerapi.SchedulerPolicyConfigMapSource, policy *schedulerapi.Policy) error {
2019-01-12 04:58:27 +00:00
// Use a policy serialized in a config map value.
2020-03-26 21:07:15 +00:00
policyConfigMap, err := client.CoreV1().ConfigMaps(policyRef.Namespace).Get(context.TODO(), policyRef.Name, metav1.GetOptions{})
2019-01-12 04:58:27 +00:00
if err != nil {
return fmt.Errorf("couldn't get policy config map %s/%s: %v", policyRef.Namespace, policyRef.Name, err)
}
2019-12-12 01:27:03 +00:00
data, found := policyConfigMap.Data[schedulerapi.SchedulerPolicyConfigMapKey]
2019-01-12 04:58:27 +00:00
if !found {
2019-12-12 01:27:03 +00:00
return fmt.Errorf("missing policy config map value at key %q", schedulerapi.SchedulerPolicyConfigMapKey)
2019-01-12 04:58:27 +00:00
}
2019-12-12 01:27:03 +00:00
err = runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), []byte(data), policy)
2019-01-12 04:58:27 +00:00
if err != nil {
return fmt.Errorf("invalid policy: %v", err)
}
return nil
}
// Run begins watching and scheduling. It starts scheduling and blocked until the context is done.
2019-12-12 01:27:03 +00:00
func (sched *Scheduler) Run(ctx context.Context) {
2020-03-26 21:07:15 +00:00
sched.SchedulingQueue.Run()
2019-12-12 01:27:03 +00:00
wait.UntilWithContext(ctx, sched.scheduleOne, 0)
2020-03-26 21:07:15 +00:00
sched.SchedulingQueue.Close()
2019-01-12 04:58:27 +00:00
}
2020-08-10 17:43:49 +00:00
// recordSchedulingFailure records an event for the pod that indicates the
// pod has failed to schedule. Also, update the pod condition and nominated node name if set.
func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) {
2019-12-12 01:27:03 +00:00
sched.Error(podInfo, err)
2020-08-10 17:43:49 +00:00
// Update the scheduling queue with the nominated pod information. Without
// this, there would be a race condition between the next scheduling cycle
// and the time the scheduler receives a Pod Update for the nominated pod.
// Here we check for nil only for tests.
if sched.SchedulingQueue != nil {
sched.SchedulingQueue.AddNominatedPod(podInfo.PodInfo, nominatedNode)
2020-08-10 17:43:49 +00:00
}
2019-12-12 01:27:03 +00:00
pod := podInfo.Pod
msg := truncateMessage(err.Error())
fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
2020-08-10 17:43:49 +00:00
if err := updatePod(sched.client, pod, &v1.PodCondition{
2019-04-07 17:07:55 +00:00
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: reason,
Message: err.Error(),
2020-08-10 17:43:49 +00:00
}, nominatedNode); err != nil {
klog.ErrorS(err, "Error updating pod", "pod", klog.KObj(pod))
2019-09-27 21:51:53 +00:00
}
2019-04-07 17:07:55 +00:00
}
// truncateMessage truncates a message if it hits the NoteLengthLimit.
func truncateMessage(message string) string {
max := validation.NoteLengthLimit
if len(message) <= max {
return message
}
suffix := " ..."
return message[:max-len(suffix)] + suffix
}
2020-08-10 17:43:49 +00:00
func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatedNode string) error {
klog.V(3).InfoS("Updating pod condition", "pod", klog.KObj(pod), "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason)
podStatusCopy := pod.Status.DeepCopy()
2020-08-10 17:43:49 +00:00
// NominatedNodeName is updated only if we are trying to set it, and the value is
// different from the existing one.
if !podutil.UpdatePodCondition(podStatusCopy, condition) &&
2020-08-10 17:43:49 +00:00
(len(nominatedNode) == 0 || pod.Status.NominatedNodeName == nominatedNode) {
return nil
2019-01-12 04:58:27 +00:00
}
2020-08-10 17:43:49 +00:00
if nominatedNode != "" {
podStatusCopy.NominatedNodeName = nominatedNode
2019-01-12 04:58:27 +00:00
}
return util.PatchPodStatus(client, pod, podStatusCopy)
2019-01-12 04:58:27 +00:00
}
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
// Optimistically assume that the binding will succeed and send it to apiserver
// in the background.
// If the binding fails, scheduler will release resources allocated to assumed pod
// immediately.
assumed.Spec.NodeName = host
2019-04-07 17:07:55 +00:00
2019-09-27 21:51:53 +00:00
if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
klog.ErrorS(err, "scheduler cache AssumePod failed")
2019-01-12 04:58:27 +00:00
return err
}
// if "assumed" is a nominated pod, we should remove it from internal cache
2019-09-27 21:51:53 +00:00
if sched.SchedulingQueue != nil {
sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
2019-01-12 04:58:27 +00:00
}
return nil
}
2020-03-26 21:07:15 +00:00
// bind binds a pod to a given node defined in a binding object.
// The precedence for binding is: (1) extenders and (2) framework plugins.
// We expect this to run asynchronously, so we handle binding metrics internally.
func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) {
2020-03-26 21:07:15 +00:00
defer func() {
sched.finishBinding(fwk, assumed, targetNode, err)
2020-03-26 21:07:15 +00:00
}()
bound, err := sched.extendersBinding(assumed, targetNode)
if bound {
return err
}
bindStatus := fwk.RunBindPlugins(ctx, state, assumed, targetNode)
2020-03-26 21:07:15 +00:00
if bindStatus.IsSuccess() {
return nil
}
if bindStatus.Code() == framework.Error {
return bindStatus.AsError()
}
return fmt.Errorf("bind status: %s, %v", bindStatus.Code().String(), bindStatus.Message())
}
// TODO(#87159): Move this to a Plugin.
func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (bool, error) {
for _, extender := range sched.Extenders {
2020-03-26 21:07:15 +00:00
if !extender.IsBinder() || !extender.IsInterested(pod) {
continue
2019-09-27 21:51:53 +00:00
}
2020-03-26 21:07:15 +00:00
return true, extender.Bind(&v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},
Target: v1.ObjectReference{Kind: "Node", Name: node},
})
2019-09-27 21:51:53 +00:00
}
2020-03-26 21:07:15 +00:00
return false, nil
}
func (sched *Scheduler) finishBinding(fwk framework.Framework, assumed *v1.Pod, targetNode string, err error) {
2019-09-27 21:51:53 +00:00
if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil {
klog.ErrorS(finErr, "scheduler cache FinishBinding failed")
2019-01-12 04:58:27 +00:00
}
if err != nil {
klog.V(1).InfoS("Failed to bind pod", "pod", klog.KObj(assumed))
2020-03-26 21:07:15 +00:00
return
2019-01-12 04:58:27 +00:00
}
fwk.EventRecorder().Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
2019-01-12 04:58:27 +00:00
}
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
2019-12-12 01:27:03 +00:00
func (sched *Scheduler) scheduleOne(ctx context.Context) {
podInfo := sched.NextPod()
2019-01-12 04:58:27 +00:00
// pod could be nil when schedulerQueue is closed
2019-12-12 01:27:03 +00:00
if podInfo == nil || podInfo.Pod == nil {
2019-01-12 04:58:27 +00:00
return
}
2019-12-12 01:27:03 +00:00
pod := podInfo.Pod
fwk, err := sched.frameworkForPod(pod)
2020-03-26 21:07:15 +00:00
if err != nil {
// This shouldn't happen, because we only accept for scheduling the pods
// which specify a scheduler name that matches one of the profiles.
klog.ErrorS(err, "Error occurred")
2020-03-26 21:07:15 +00:00
return
}
if sched.skipPodSchedule(fwk, pod) {
2019-01-12 04:58:27 +00:00
return
}
klog.V(3).InfoS("Attempting to schedule pod", "pod", klog.KObj(pod))
2019-01-12 04:58:27 +00:00
// Synchronously attempt to find a fit for the pod.
start := time.Now()
2019-12-12 01:27:03 +00:00
state := framework.NewCycleState()
2020-03-26 21:07:15 +00:00
state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
podsToActivate := framework.NewPodsToActivate()
state.Write(framework.PodsToActivateKey, podsToActivate)
2019-12-12 01:27:03 +00:00
schedulingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, sched.Extenders, fwk, state, pod)
2019-01-12 04:58:27 +00:00
if err != nil {
2019-12-12 01:27:03 +00:00
// Schedule() may have failed because the pod would not fit on any host, so we try to
2019-01-12 04:58:27 +00:00
// preempt, with the expectation that the next time the pod is tried for scheduling it
// will fit due to the preemption. It is also possible that a different pod will schedule
// into the resources that were preempted, but this is harmless.
2020-08-10 17:43:49 +00:00
nominatedNode := ""
if fitError, ok := err.(*framework.FitError); ok {
if !fwk.HasPostFilterPlugins() {
klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
2019-04-07 17:07:55 +00:00
} else {
2020-08-10 17:43:49 +00:00
// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
2020-08-10 17:43:49 +00:00
if status.Code() == framework.Error {
klog.ErrorS(nil, "Status after running PostFilter plugins for pod", klog.KObj(pod), "status", status)
2020-08-10 17:43:49 +00:00
} else {
klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
2020-08-10 17:43:49 +00:00
}
if status.IsSuccess() && result != nil {
nominatedNode = result.NominatedNodeName
}
2019-04-07 17:07:55 +00:00
}
2019-01-12 04:58:27 +00:00
// Pod did not fit anywhere, so it is counted as a failure. If preemption
// succeeds, the pod should get counted as a success the next time we try to
// schedule it. (hopefully)
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
} else if err == ErrNoNodesAvailable {
2020-05-26 22:59:35 +00:00
// No nodes available is counted as unschedulable rather than an error.
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
2019-01-12 04:58:27 +00:00
} else {
2020-08-10 17:43:49 +00:00
klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
2019-01-12 04:58:27 +00:00
}
sched.recordSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
2019-01-12 04:58:27 +00:00
return
}
2019-04-07 17:07:55 +00:00
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
2019-01-12 04:58:27 +00:00
// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
// This allows us to keep scheduling without waiting on binding to occur.
2019-12-12 01:27:03 +00:00
assumedPodInfo := podInfo.DeepCopy()
assumedPod := assumedPodInfo.Pod
2019-04-07 17:07:55 +00:00
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
2019-01-12 04:58:27 +00:00
if err != nil {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
2019-12-12 01:27:03 +00:00
// This is most probably result of a BUG in retrying logic.
// We report an error here so that pod scheduling can be retried.
// This relies on the fact that Error will check if the pod has been bound
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, "")
2020-08-10 17:43:49 +00:00
return
}
// Run the Reserve method of reserve plugins.
if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
2020-08-10 17:43:49 +00:00
// trigger un-reserve to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
2020-08-10 17:43:49 +00:00
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, "")
2019-01-12 04:58:27 +00:00
return
}
2020-03-26 21:07:15 +00:00
// Run "permit" plugins.
runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
2020-03-26 21:07:15 +00:00
if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
var reason string
if runPermitStatus.IsUnschedulable() {
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
2020-03-26 21:07:15 +00:00
reason = v1.PodReasonUnschedulable
} else {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
2020-03-26 21:07:15 +00:00
reason = SchedulerError
}
2020-08-10 17:43:49 +00:00
// One of the plugins returned status different than success or wait.
fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
2020-03-26 21:07:15 +00:00
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, "")
2020-03-26 21:07:15 +00:00
return
}
// At the end of a successful scheduling cycle, pop and move up Pods if needed.
if len(podsToActivate.Map) != 0 {
sched.SchedulingQueue.Activate(podsToActivate.Map)
// Clear the entries after activation.
podsToActivate.Map = make(map[string]*v1.Pod)
}
2019-01-12 04:58:27 +00:00
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
2019-12-12 01:27:03 +00:00
bindingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()
2019-01-12 04:58:27 +00:00
waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
2020-03-26 21:07:15 +00:00
if !waitOnPermitStatus.IsSuccess() {
2019-08-30 18:33:25 +00:00
var reason string
2020-03-26 21:07:15 +00:00
if waitOnPermitStatus.IsUnschedulable() {
metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
2019-08-30 18:33:25 +00:00
reason = v1.PodReasonUnschedulable
} else {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
2019-08-30 18:33:25 +00:00
reason = SchedulerError
}
2020-08-10 17:43:49 +00:00
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
2019-08-30 18:33:25 +00:00
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
2019-08-30 18:33:25 +00:00
return
}
2019-04-07 17:07:55 +00:00
// Run "prebind" plugins.
preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
2019-09-27 21:51:53 +00:00
if !preBindStatus.IsSuccess() {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
2020-08-10 17:43:49 +00:00
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
2019-04-07 17:07:55 +00:00
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
2019-08-30 18:33:25 +00:00
return
2019-04-07 17:07:55 +00:00
}
err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
2019-01-12 04:58:27 +00:00
if err != nil {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
2019-08-30 18:33:25 +00:00
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
2020-08-10 17:43:49 +00:00
if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
klog.ErrorS(err, "scheduler cache ForgetPod failed")
2020-08-10 17:43:49 +00:00
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, "")
2019-01-12 04:58:27 +00:00
} else {
2019-09-27 21:51:53 +00:00
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
2020-08-10 17:43:49 +00:00
if klog.V(2).Enabled() {
klog.InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
2019-09-27 21:51:53 +00:00
}
metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
2019-12-12 01:27:03 +00:00
metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
2020-08-10 17:43:49 +00:00
metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
2019-08-30 18:33:25 +00:00
// Run "postbind" plugins.
fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
// At the end of a successful binding cycle, move up Pods if needed.
if len(podsToActivate.Map) != 0 {
sched.SchedulingQueue.Activate(podsToActivate.Map)
// Unlike the logic in scheduling cycle, we don't bother deleting the entries
// as `podsToActivate.Map` is no longer consumed.
}
2019-01-12 04:58:27 +00:00
}
}()
}
2019-09-27 21:51:53 +00:00
2020-08-10 17:43:49 +00:00
func getAttemptsLabel(p *framework.QueuedPodInfo) string {
// We breakdown the pod scheduling duration by attempts capped to a limit
// to avoid ending up with a high cardinality metric.
if p.Attempts >= 15 {
return "15+"
}
return strconv.Itoa(p.Attempts)
}
func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
fwk, ok := sched.Profiles[pod.Spec.SchedulerName]
2020-03-26 21:07:15 +00:00
if !ok {
return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
}
return fwk, nil
2020-03-26 21:07:15 +00:00
}
// skipPodSchedule returns true if we could skip scheduling the pod for specified cases.
func (sched *Scheduler) skipPodSchedule(fwk framework.Framework, pod *v1.Pod) bool {
2020-03-26 21:07:15 +00:00
// Case 1: pod is being deleted.
if pod.DeletionTimestamp != nil {
fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
klog.V(3).InfoS("Skip schedule deleting pod", "pod", klog.KObj(pod))
2020-03-26 21:07:15 +00:00
return true
}
// Case 2: pod has been assumed could be skipped.
2020-03-26 21:07:15 +00:00
// An assumed pod can be added again to the scheduling queue if it got an update event
// during its previous scheduling cycle but before getting assumed.
isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
return false
2020-03-26 21:07:15 +00:00
}
return isAssumed
2019-09-27 21:51:53 +00:00
}
// NewInformerFactory creates a SharedInformerFactory and initializes a scheduler specific
// in-place podInformer.
func NewInformerFactory(cs clientset.Interface, resyncPeriod time.Duration) informers.SharedInformerFactory {
informerFactory := informers.NewSharedInformerFactory(cs, resyncPeriod)
informerFactory.InformerFor(&v1.Pod{}, newPodInformer)
return informerFactory
}
// newPodInformer creates a shared index informer that returns only non-terminal pods.
func newPodInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
selector := fmt.Sprintf("status.phase!=%v,status.phase!=%v", v1.PodSucceeded, v1.PodFailed)
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = selector
}
return coreinformers.NewFilteredPodInformer(cs, metav1.NamespaceAll, resyncPeriod, nil, tweakListOptions)
}