Add plugin invocation for 'reserve' and 'prebind' plugins to the scheduler.

pull/564/head
Bobby (Babak) Salamat 2018-11-08 18:08:38 -08:00
parent f6f81fb1f3
commit f74b30868c
7 changed files with 140 additions and 45 deletions

View File

@ -516,6 +516,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
algorithm.EmptyPredicateMetadataProducer,
test.prioritizers,
algorithm.EmptyPriorityMetadataProducer,
emptyPluginSet,
extenders,
nil,
schedulertesting.FakePersistentVolumeClaimLister{},

View File

@ -44,6 +44,7 @@ import (
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
@ -103,6 +104,7 @@ type genericScheduler struct {
priorityMetaProducer algorithm.PriorityMetadataProducer
predicateMetaProducer algorithm.PredicateMetadataProducer
prioritizers []algorithm.PriorityConfig
pluginSet pluginsv1alpha1.PluginSet
extenders []algorithm.SchedulerExtender
lastNodeIndex uint64
alwaysCheckAllPredicates bool
@ -1152,6 +1154,7 @@ func NewGenericScheduler(
predicateMetaProducer algorithm.PredicateMetadataProducer,
prioritizers []algorithm.PriorityConfig,
priorityMetaProducer algorithm.PriorityMetadataProducer,
pluginSet pluginsv1alpha1.PluginSet,
extenders []algorithm.SchedulerExtender,
volumeBinder *volumebinder.VolumeBinder,
pvcLister corelisters.PersistentVolumeClaimLister,
@ -1168,6 +1171,7 @@ func NewGenericScheduler(
predicateMetaProducer: predicateMetaProducer,
prioritizers: prioritizers,
priorityMetaProducer: priorityMetaProducer,
pluginSet: pluginSet,
extenders: extenders,
cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo),
volumeBinder: volumeBinder,

View File

@ -43,6 +43,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
)
@ -135,6 +136,28 @@ func getNodeReducePriority(pod *v1.Pod, meta interface{}, nodeNameToInfo map[str
return nil
}
// EmptyPluginSet is a test plugin set used by the default scheduler.
type EmptyPluginSet struct{}
var _ plugins.PluginSet = EmptyPluginSet{}
// ReservePlugins returns a slice of default reserve plugins.
func (r EmptyPluginSet) ReservePlugins() []plugins.ReservePlugin {
return []plugins.ReservePlugin{}
}
// PrebindPlugins returns a slice of default prebind plugins.
func (r EmptyPluginSet) PrebindPlugins() []plugins.PrebindPlugin {
return []plugins.PrebindPlugin{}
}
// Data returns a pointer to PluginData.
func (r EmptyPluginSet) Data() *plugins.PluginData {
return &plugins.PluginData{}
}
var emptyPluginSet = &EmptyPluginSet{}
func makeNodeList(nodeNames []string) []*v1.Node {
result := make([]*v1.Node, 0, len(nodeNames))
for _, nodeName := range nodeNames {
@ -454,6 +477,7 @@ func TestGenericScheduler(t *testing.T) {
algorithm.EmptyPredicateMetadataProducer,
test.prioritizers,
algorithm.EmptyPriorityMetadataProducer,
emptyPluginSet,
[]algorithm.SchedulerExtender{},
nil,
pvcLister,
@ -490,6 +514,7 @@ func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Nod
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
algorithm.EmptyPriorityMetadataProducer,
emptyPluginSet,
nil, nil, nil, nil, false, false,
schedulerapi.DefaultPercentageOfNodesToScore)
cache.UpdateNodeNameToInfoMap(s.(*genericScheduler).cachedNodeInfoMap)
@ -1416,6 +1441,7 @@ func TestPreempt(t *testing.T) {
algorithm.EmptyPredicateMetadataProducer,
[]algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}},
algorithm.EmptyPriorityMetadataProducer,
emptyPluginSet,
extenders,
nil,
schedulertesting.FakePersistentVolumeClaimLister{},
@ -1543,6 +1569,7 @@ func TestCacheInvalidationRace(t *testing.T) {
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
algorithm.EmptyPriorityMetadataProducer,
emptyPluginSet,
nil, nil, pvcLister, pdbLister,
true, false,
schedulerapi.DefaultPercentageOfNodesToScore)
@ -1626,6 +1653,7 @@ func TestCacheInvalidationRace2(t *testing.T) {
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
algorithm.EmptyPriorityMetadataProducer,
emptyPluginSet,
nil, nil, pvcLister, pdbLister, true, false,
schedulerapi.DefaultPercentageOfNodesToScore)

View File

@ -63,6 +63,8 @@ import (
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/plugins"
pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
@ -109,6 +111,8 @@ type Config struct {
PodConditionUpdater PodConditionUpdater
// PodPreemptor is used to evict pods and update pod annotations.
PodPreemptor PodPreemptor
// PlugingSet has a set of plugins and data used to run them.
PluginSet pluginsv1alpha1.PluginSet
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
@ -202,6 +206,8 @@ type configFactory struct {
pdbLister policylisters.PodDisruptionBudgetLister
// a means to list all StorageClasses
storageClassLister storagelisters.StorageClassLister
// pluginRunner has a set of plugins and the context used for running them.
pluginSet pluginsv1alpha1.PluginSet
// Close this to stop all reflectors
StopEverything <-chan struct{}
@ -1225,6 +1231,9 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
return nil, err
}
// TODO(bsalamat): the default registrar should be able to process config files.
c.pluginSet = plugins.NewDefaultPluginSet(pluginsv1alpha1.NewPluginContext(), &c.schedulerCache)
// Init equivalence class cache
if c.enableEquivalenceClassCache {
c.equivalencePodCache = equivalence.NewCache(predicates.Ordering())
@ -1239,6 +1248,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
predicateMetaProducer,
priorityConfigs,
priorityMetaProducer,
c.pluginSet,
extenders,
c.volumeBinder,
c.pVCLister,
@ -1258,6 +1268,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
GetBinder: c.getBinderFunc(extenders),
PodConditionUpdater: &podConditionUpdater{c.client},
PodPreemptor: &podPreemptor{c.client},
PluginSet: c.pluginSet,
WaitForCacheSync: func() bool {
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
},

View File

@ -17,11 +17,14 @@ limitations under the License.
package scheduler
import (
"errors"
"fmt"
"io/ioutil"
"os"
"time"
"k8s.io/klog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -44,13 +47,13 @@ import (
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/klog"
)
const (
// BindTimeoutSeconds defines the default bind timeout
BindTimeoutSeconds = 100
// SchedulerError is the reason recorded for events when an error occurs during scheduling a pod.
SchedulerError = "SchedulerError"
)
// Scheduler watches for new unscheduled pods. It attempts to find
@ -286,19 +289,26 @@ func (sched *Scheduler) Config() *factory.Config {
return sched.config
}
// recordFailedSchedulingEvent records an event for the pod that indicates the
// pod has failed to schedule.
// NOTE: This function modifies "pod". "pod" should be copied before being passed.
func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string) {
sched.config.Error(pod, err)
sched.config.Recorder.Event(pod, v1.EventTypeWarning, "FailedScheduling", message)
sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: reason,
Message: err.Error(),
})
}
// schedule implements the scheduling algorithm and returns the suggested host.
func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
if err != nil {
pod = pod.DeepCopy()
sched.config.Error(pod, err)
sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "%v", err)
sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
Message: err.Error(),
})
sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error())
return "", err
}
return host, err
@ -362,14 +372,8 @@ func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bo
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
allBound, err = sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
if err != nil {
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePodVolumes failed: %v", err)
sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: "SchedulerError",
Message: err.Error(),
})
sched.recordSchedulingFailure(assumed, err, SchedulerError,
fmt.Sprintf("AssumePodVolumes failed: %v", err))
}
// Invalidate ecache because assumed volumes could have affected the cached
// pvs for other pods
@ -387,9 +391,6 @@ func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bo
// If binding errors, times out or gets undone, then an error will be returned to
// retry scheduling.
func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error {
var reason string
var eventType string
klog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed)
if err != nil {
@ -404,15 +405,7 @@ func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error {
// stale pod binding cache.
sched.config.VolumeBinder.DeletePodBindings(assumed)
reason = "VolumeBindingFailed"
eventType = v1.EventTypeWarning
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, eventType, "FailedScheduling", "%v", err)
sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: reason,
})
sched.recordSchedulingFailure(assumed, err, "VolumeBindingFailed", err.Error())
return err
}
@ -441,14 +434,8 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
// This relies on the fact that Error will check if the pod has been bound
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePod failed: %v", err)
sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: "SchedulerError",
Message: err.Error(),
})
sched.recordSchedulingFailure(assumed, err, SchedulerError,
fmt.Sprintf("AssumePod failed: %v", err))
return err
}
// if "assumed" is a nominated pod, we should remove it from internal cache
@ -480,13 +467,8 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
if err := sched.config.SchedulerCache.ForgetPod(assumed); err != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", err)
}
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "Binding rejected: %v", err)
sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: "BindingRejected",
})
sched.recordSchedulingFailure(assumed, err, SchedulerError,
fmt.Sprintf("Binding rejected: %v", err))
return err
}
@ -498,6 +480,12 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
plugins := sched.config.PluginSet
// Remove all plugin context data at the beginning of a scheduling cycle.
if plugins.Data().Ctx != nil {
plugins.Data().Ctx.Reset()
}
pod := sched.config.NextPod()
// pod could be nil when schedulerQueue is closed
if pod == nil {
@ -554,6 +542,16 @@ func (sched *Scheduler) scheduleOne() {
return
}
// Run "reserve" plugins.
for _, pl := range plugins.ReservePlugins() {
if err := pl.Reserve(plugins, assumedPod, suggestedHost); err != nil {
klog.Errorf("error while running %v reserve plugin for pod %v: %v", pl.Name(), assumedPod.Name, err)
sched.recordSchedulingFailure(assumedPod, err, SchedulerError,
fmt.Sprintf("reserve plugin %v failed", pl.Name()))
metrics.PodScheduleErrors.Inc()
return
}
}
// assume modifies `assumedPod` by setting NodeName=suggestedHost
err = sched.assume(assumedPod, suggestedHost)
if err != nil {
@ -573,6 +571,30 @@ func (sched *Scheduler) scheduleOne() {
}
}
// Run "prebind" plugins.
for _, pl := range plugins.PrebindPlugins() {
approved, err := pl.Prebind(plugins, assumedPod, suggestedHost)
if err != nil {
approved = false
klog.Errorf("error while running %v prebind plugin for pod %v: %v", pl.Name(), assumedPod.Name, err)
metrics.PodScheduleErrors.Inc()
}
if !approved {
sched.Cache().ForgetPod(assumedPod)
var reason string
if err == nil {
msg := fmt.Sprintf("prebind plugin %v rejected pod %v.", pl.Name(), assumedPod.Name)
klog.V(4).Infof(msg)
err = errors.New(msg)
reason = v1.PodReasonUnschedulable
} else {
reason = SchedulerError
}
sched.recordSchedulingFailure(assumedPod, err, reason, err.Error())
return
}
}
err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
Target: v1.ObjectReference{

View File

@ -295,6 +295,7 @@ func TestScheduler(t *testing.T) {
NextPod: func() *v1.Pod {
return item.sendPod
},
PluginSet: &EmptyPluginSet{},
Recorder: eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}),
VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
},
@ -643,6 +644,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
algorithm.EmptyPredicateMetadataProducer,
[]algorithm.PriorityConfig{},
algorithm.EmptyPriorityMetadataProducer,
&EmptyPluginSet{},
[]algorithm.SchedulerExtender{},
nil,
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
@ -672,6 +674,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
Recorder: &record.FakeRecorder{},
PodConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
PluginSet: &EmptyPluginSet{},
VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
},
}
@ -694,6 +697,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
algorithm.EmptyPredicateMetadataProducer,
[]algorithm.PriorityConfig{},
algorithm.EmptyPriorityMetadataProducer,
&EmptyPluginSet{},
[]algorithm.SchedulerExtender{},
nil,
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
@ -727,6 +731,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
PodConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
StopEverything: stop,
PluginSet: &EmptyPluginSet{},
VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
},
}

View File

@ -27,6 +27,7 @@ import (
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/factory"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/util"
)
@ -89,3 +90,26 @@ func (fc *FakeConfigurator) CreateFromConfig(policy schedulerapi.Policy) (*facto
func (fc *FakeConfigurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*factory.Config, error) {
return fc.Config, nil
}
// EmptyPluginSet is the default plugin restirar used by the default scheduler.
type EmptyPluginSet struct{}
var _ = plugins.PluginSet(EmptyPluginSet{})
// ReservePlugins returns a slice of default reserve plugins.
func (r EmptyPluginSet) ReservePlugins() []plugins.ReservePlugin {
return []plugins.ReservePlugin{}
}
// PrebindPlugins returns a slice of default prebind plugins.
func (r EmptyPluginSet) PrebindPlugins() []plugins.PrebindPlugin {
return []plugins.PrebindPlugin{}
}
// Data returns a pointer to PluginData.
func (r EmptyPluginSet) Data() *plugins.PluginData {
return &plugins.PluginData{
Ctx: nil,
SchedulerCache: nil,
}
}