add scheduling framework configuration

update bazel build

fix get plugin config method

initialize only needed plugins

fix unit test

fix import duplicate package

update bazel

add docstrings

add weight field to plugin

add plugin to v1alpha1

add plugins at appropriate extension points

remove todo statement

fix import package file path

set plugin json schema

add plugin unit test to option

initial plugin in test integration

initialize only needed plugins

update bazel

rename func

change plugins needed logic

remove v1 alias

change the comment

fix alias shorter

remove blank line

change docstrings

fix map bool to struct

add some docstrings

add unreserve plugin

fix docstrings

move variable inside the for loop

make if else statement cleaner

remove plugin config from reserve plugin unit test

add plugin config and reduce unnecessary options for unit test

update bazel

fix race condition

fix permit plugin integration

change plugins to be pointer

change weight to int32

fix package alias

initial queue sort plugin

rename unreserve plugin

redesign plugin struct

update docstrings

check queue sort plugin amount

fix error message

fix condition

change plugin struct

add disabled plugin for unit test

fix docstrings

handle nil plugin set
k3s-v1.15.3
JieJhih Jhang 2019-05-04 18:29:30 +08:00
parent e1770e698e
commit 2cd5fc54a1
23 changed files with 996 additions and 52 deletions

View File

@ -70,6 +70,7 @@ go_test(
"//cmd/kube-scheduler/app/config:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library",

View File

@ -29,6 +29,7 @@ import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
apiserveroptions "k8s.io/apiserver/pkg/server/options"
componentbaseconfig "k8s.io/component-base/config"
@ -146,6 +147,31 @@ users:
t.Fatal(err)
}
// plugin config
pluginconfigFile := filepath.Join(tmpDir, "plugin.yaml")
if err := ioutil.WriteFile(pluginconfigFile, []byte(fmt.Sprintf(`
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
clientConnection:
kubeconfig: "%s"
plugins:
reserve:
enabled:
- name: foo
- name: bar
disabled:
- name: baz
preBind:
enabled:
- name: foo
disabled:
- name: baz
pluginConfig:
- name: foo
`, configKubeconfig)), os.FileMode(0600)); err != nil {
t.Fatal(err)
}
// Insulate this test from picking up in-cluster config when run inside a pod
// We can't assume we have permissions to write to /var/run/secrets/... from a unit test to mock in-cluster config for testing
originalHost := os.Getenv("KUBERNETES_SERVICE_HOST")
@ -224,6 +250,7 @@ users:
ContentType: "application/vnd.kubernetes.protobuf",
},
BindTimeoutSeconds: &defaultBindTimeoutSeconds,
Plugins: nil,
},
},
{
@ -334,6 +361,73 @@ users:
},
expectedUsername: "none, http",
},
{
name: "plugin config",
options: &Options{
ConfigFile: pluginconfigFile,
},
expectedUsername: "config",
expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{
SchedulerName: "default-scheduler",
AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &defaultSource},
HardPodAffinitySymmetricWeight: 1,
HealthzBindAddress: "0.0.0.0:10251",
MetricsBindAddress: "0.0.0.0:10251",
LeaderElection: kubeschedulerconfig.KubeSchedulerLeaderElectionConfiguration{
LeaderElectionConfiguration: componentbaseconfig.LeaderElectionConfiguration{
LeaderElect: true,
LeaseDuration: metav1.Duration{Duration: 15 * time.Second},
RenewDeadline: metav1.Duration{Duration: 10 * time.Second},
RetryPeriod: metav1.Duration{Duration: 2 * time.Second},
ResourceLock: "endpoints",
},
LockObjectNamespace: "kube-system",
LockObjectName: "kube-scheduler",
},
ClientConnection: componentbaseconfig.ClientConnectionConfiguration{
Kubeconfig: configKubeconfig,
QPS: 50,
Burst: 100,
ContentType: "application/vnd.kubernetes.protobuf",
},
BindTimeoutSeconds: &defaultBindTimeoutSeconds,
Plugins: &kubeschedulerconfig.Plugins{
Reserve: &kubeschedulerconfig.PluginSet{
Enabled: []kubeschedulerconfig.Plugin{
{
Name: "foo",
},
{
Name: "bar",
},
},
Disabled: []kubeschedulerconfig.Plugin{
{
Name: "baz",
},
},
},
PreBind: &kubeschedulerconfig.PluginSet{
Enabled: []kubeschedulerconfig.Plugin{
{
Name: "foo",
},
},
Disabled: []kubeschedulerconfig.Plugin{
{
Name: "baz",
},
},
},
},
PluginConfig: []kubeschedulerconfig.PluginConfig{
{
Name: "foo",
Args: runtime.Unknown{},
},
},
},
},
{
name: "no config",
options: &Options{},

View File

@ -176,6 +176,8 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error
cc.ComponentConfig.AlgorithmSource,
stopCh,
framework.NewRegistry(),
cc.ComponentConfig.Plugins,
cc.ComponentConfig.PluginConfig,
scheduler.WithName(cc.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),

View File

@ -18,6 +18,7 @@ package config
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
componentbaseconfig "k8s.io/component-base/config"
)
@ -86,6 +87,17 @@ type KubeSchedulerConfiguration struct {
// Value must be non-negative integer. The value zero indicates no waiting.
// If this value is nil, the default value will be used.
BindTimeoutSeconds *int64
// Plugins specify the set of plugins that should be enabled or disabled. Enabled plugins are the
// ones that should be enabled in addition to the default plugins. Disabled plugins are any of the
// default plugins that should be disabled.
// When no enabled or disabled plugin is specified for an extension point, default plugins for
// that extension point will be used if there is any.
Plugins *Plugins
// PluginConfig is an optional set of custom plugin arguments for each plugin.
// Omitting config args for a plugin is equivalent to using the default config for that plugin.
PluginConfig []PluginConfig
}
// SchedulerAlgorithmSource is the source of a scheduler algorithm. One source
@ -131,3 +143,76 @@ type KubeSchedulerLeaderElectionConfiguration struct {
// LockObjectName defines the lock object name
LockObjectName string
}
// Plugins include multiple extension points. When specified, the list of plugins for
// a particular extension point are the only ones enabled. If an extension point is
// omitted from the config, then the default set of plugins is used for that extension point.
// Enabled plugins are called in the order specified here, after default plugins. If they need to
// be invoked before default plugins, default plugins must be disabled and re-enabled here in desired order.
type Plugins struct {
// QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue.
QueueSort *PluginSet
// PreFilter is a list of plugins that should be invoked at "PreFilter" extension point of the scheduling framework.
PreFilter *PluginSet
// Filter is a list of plugins that should be invoked when filtering out nodes that cannot run the Pod.
Filter *PluginSet
// PostFilter is a list of plugins that are invoked after filtering out infeasible nodes.
PostFilter *PluginSet
// Score is a list of plugins that should be invoked when ranking nodes that have passed the filtering phase.
Score *PluginSet
// NormalizeScore is a list of plugins that should be invoked after the scoring phase to normalize scores.
NormalizeScore *PluginSet
// Reserve is a list of plugins invoked when reserving a node to run the pod.
Reserve *PluginSet
// Permit is a list of plugins that control binding of a Pod. These plugins can prevent or delay binding of a Pod.
Permit *PluginSet
// PreBind is a list of plugins that should be invoked before a pod is bound.
PreBind *PluginSet
// Bind is a list of plugins that should be invoked at "Bind" extension point of the scheduling framework.
// The scheduler call these plugins in order. Scheduler skips the rest of these plugins as soon as one returns success.
Bind *PluginSet
// PostBind is a list of plugins that should be invoked after a pod is successfully bound.
PostBind *PluginSet
// Unreserve is a list of plugins invoked when a pod that was previously reserved is rejected in a later phase.
Unreserve *PluginSet
}
// PluginSet specifies enabled and disabled plugins for an extension point.
// If an array is empty, missing, or nil, default plugins at that extension point will be used.
type PluginSet struct {
// Enabled specifies plugins that should be enabled in addition to default plugins.
// These are called after default plugins and in the same order specified here.
Enabled []Plugin
// Disabled specifies default plugins that should be disabled.
// When all default plugins need to be disabled, an array containing only one "*" should be provided.
Disabled []Plugin
}
// Plugin specifies a plugin name and its weight when applicable. Weight is used only for Score plugins.
type Plugin struct {
// Name defines the name of plugin
Name string
// Weight defines the weight of plugin, only used for Score plugins.
Weight int32
}
// PluginConfig specifies arguments that should be passed to a plugin at the time of initialization.
// A plugin that is invoked at multiple extension points is initialized once. Args can have arbitrary structure.
// It is up to the plugin to process these Args.
type PluginConfig struct {
// Name defines the name of plugin being configured
Name string
// Args defines the arguments passed to the plugins at the time of initialization. Args can have arbitrary structure.
Args runtime.Unknown
}

View File

@ -57,6 +57,46 @@ func RegisterConversions(s *runtime.Scheme) error {
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*v1alpha1.Plugin)(nil), (*config.Plugin)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha1_Plugin_To_config_Plugin(a.(*v1alpha1.Plugin), b.(*config.Plugin), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*config.Plugin)(nil), (*v1alpha1.Plugin)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_config_Plugin_To_v1alpha1_Plugin(a.(*config.Plugin), b.(*v1alpha1.Plugin), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*v1alpha1.PluginConfig)(nil), (*config.PluginConfig)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha1_PluginConfig_To_config_PluginConfig(a.(*v1alpha1.PluginConfig), b.(*config.PluginConfig), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*config.PluginConfig)(nil), (*v1alpha1.PluginConfig)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_config_PluginConfig_To_v1alpha1_PluginConfig(a.(*config.PluginConfig), b.(*v1alpha1.PluginConfig), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*v1alpha1.PluginSet)(nil), (*config.PluginSet)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha1_PluginSet_To_config_PluginSet(a.(*v1alpha1.PluginSet), b.(*config.PluginSet), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*config.PluginSet)(nil), (*v1alpha1.PluginSet)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_config_PluginSet_To_v1alpha1_PluginSet(a.(*config.PluginSet), b.(*v1alpha1.PluginSet), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*v1alpha1.Plugins)(nil), (*config.Plugins)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha1_Plugins_To_config_Plugins(a.(*v1alpha1.Plugins), b.(*config.Plugins), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*config.Plugins)(nil), (*v1alpha1.Plugins)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_config_Plugins_To_v1alpha1_Plugins(a.(*config.Plugins), b.(*v1alpha1.Plugins), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*v1alpha1.SchedulerAlgorithmSource)(nil), (*config.SchedulerAlgorithmSource)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha1_SchedulerAlgorithmSource_To_config_SchedulerAlgorithmSource(a.(*v1alpha1.SchedulerAlgorithmSource), b.(*config.SchedulerAlgorithmSource), scope)
}); err != nil {
@ -120,6 +160,8 @@ func autoConvert_v1alpha1_KubeSchedulerConfiguration_To_config_KubeSchedulerConf
out.DisablePreemption = in.DisablePreemption
out.PercentageOfNodesToScore = in.PercentageOfNodesToScore
out.BindTimeoutSeconds = (*int64)(unsafe.Pointer(in.BindTimeoutSeconds))
out.Plugins = (*config.Plugins)(unsafe.Pointer(in.Plugins))
out.PluginConfig = *(*[]config.PluginConfig)(unsafe.Pointer(&in.PluginConfig))
return nil
}
@ -148,6 +190,8 @@ func autoConvert_config_KubeSchedulerConfiguration_To_v1alpha1_KubeSchedulerConf
out.DisablePreemption = in.DisablePreemption
out.PercentageOfNodesToScore = in.PercentageOfNodesToScore
out.BindTimeoutSeconds = (*int64)(unsafe.Pointer(in.BindTimeoutSeconds))
out.Plugins = (*v1alpha1.Plugins)(unsafe.Pointer(in.Plugins))
out.PluginConfig = *(*[]v1alpha1.PluginConfig)(unsafe.Pointer(&in.PluginConfig))
return nil
}
@ -184,6 +228,114 @@ func Convert_config_KubeSchedulerLeaderElectionConfiguration_To_v1alpha1_KubeSch
return autoConvert_config_KubeSchedulerLeaderElectionConfiguration_To_v1alpha1_KubeSchedulerLeaderElectionConfiguration(in, out, s)
}
func autoConvert_v1alpha1_Plugin_To_config_Plugin(in *v1alpha1.Plugin, out *config.Plugin, s conversion.Scope) error {
out.Name = in.Name
out.Weight = in.Weight
return nil
}
// Convert_v1alpha1_Plugin_To_config_Plugin is an autogenerated conversion function.
func Convert_v1alpha1_Plugin_To_config_Plugin(in *v1alpha1.Plugin, out *config.Plugin, s conversion.Scope) error {
return autoConvert_v1alpha1_Plugin_To_config_Plugin(in, out, s)
}
func autoConvert_config_Plugin_To_v1alpha1_Plugin(in *config.Plugin, out *v1alpha1.Plugin, s conversion.Scope) error {
out.Name = in.Name
out.Weight = in.Weight
return nil
}
// Convert_config_Plugin_To_v1alpha1_Plugin is an autogenerated conversion function.
func Convert_config_Plugin_To_v1alpha1_Plugin(in *config.Plugin, out *v1alpha1.Plugin, s conversion.Scope) error {
return autoConvert_config_Plugin_To_v1alpha1_Plugin(in, out, s)
}
func autoConvert_v1alpha1_PluginConfig_To_config_PluginConfig(in *v1alpha1.PluginConfig, out *config.PluginConfig, s conversion.Scope) error {
out.Name = in.Name
out.Args = in.Args
return nil
}
// Convert_v1alpha1_PluginConfig_To_config_PluginConfig is an autogenerated conversion function.
func Convert_v1alpha1_PluginConfig_To_config_PluginConfig(in *v1alpha1.PluginConfig, out *config.PluginConfig, s conversion.Scope) error {
return autoConvert_v1alpha1_PluginConfig_To_config_PluginConfig(in, out, s)
}
func autoConvert_config_PluginConfig_To_v1alpha1_PluginConfig(in *config.PluginConfig, out *v1alpha1.PluginConfig, s conversion.Scope) error {
out.Name = in.Name
out.Args = in.Args
return nil
}
// Convert_config_PluginConfig_To_v1alpha1_PluginConfig is an autogenerated conversion function.
func Convert_config_PluginConfig_To_v1alpha1_PluginConfig(in *config.PluginConfig, out *v1alpha1.PluginConfig, s conversion.Scope) error {
return autoConvert_config_PluginConfig_To_v1alpha1_PluginConfig(in, out, s)
}
func autoConvert_v1alpha1_PluginSet_To_config_PluginSet(in *v1alpha1.PluginSet, out *config.PluginSet, s conversion.Scope) error {
out.Enabled = *(*[]config.Plugin)(unsafe.Pointer(&in.Enabled))
out.Disabled = *(*[]config.Plugin)(unsafe.Pointer(&in.Disabled))
return nil
}
// Convert_v1alpha1_PluginSet_To_config_PluginSet is an autogenerated conversion function.
func Convert_v1alpha1_PluginSet_To_config_PluginSet(in *v1alpha1.PluginSet, out *config.PluginSet, s conversion.Scope) error {
return autoConvert_v1alpha1_PluginSet_To_config_PluginSet(in, out, s)
}
func autoConvert_config_PluginSet_To_v1alpha1_PluginSet(in *config.PluginSet, out *v1alpha1.PluginSet, s conversion.Scope) error {
out.Enabled = *(*[]v1alpha1.Plugin)(unsafe.Pointer(&in.Enabled))
out.Disabled = *(*[]v1alpha1.Plugin)(unsafe.Pointer(&in.Disabled))
return nil
}
// Convert_config_PluginSet_To_v1alpha1_PluginSet is an autogenerated conversion function.
func Convert_config_PluginSet_To_v1alpha1_PluginSet(in *config.PluginSet, out *v1alpha1.PluginSet, s conversion.Scope) error {
return autoConvert_config_PluginSet_To_v1alpha1_PluginSet(in, out, s)
}
func autoConvert_v1alpha1_Plugins_To_config_Plugins(in *v1alpha1.Plugins, out *config.Plugins, s conversion.Scope) error {
out.QueueSort = (*config.PluginSet)(unsafe.Pointer(in.QueueSort))
out.PreFilter = (*config.PluginSet)(unsafe.Pointer(in.PreFilter))
out.Filter = (*config.PluginSet)(unsafe.Pointer(in.Filter))
out.PostFilter = (*config.PluginSet)(unsafe.Pointer(in.PostFilter))
out.Score = (*config.PluginSet)(unsafe.Pointer(in.Score))
out.NormalizeScore = (*config.PluginSet)(unsafe.Pointer(in.NormalizeScore))
out.Reserve = (*config.PluginSet)(unsafe.Pointer(in.Reserve))
out.Permit = (*config.PluginSet)(unsafe.Pointer(in.Permit))
out.PreBind = (*config.PluginSet)(unsafe.Pointer(in.PreBind))
out.Bind = (*config.PluginSet)(unsafe.Pointer(in.Bind))
out.PostBind = (*config.PluginSet)(unsafe.Pointer(in.PostBind))
out.Unreserve = (*config.PluginSet)(unsafe.Pointer(in.Unreserve))
return nil
}
// Convert_v1alpha1_Plugins_To_config_Plugins is an autogenerated conversion function.
func Convert_v1alpha1_Plugins_To_config_Plugins(in *v1alpha1.Plugins, out *config.Plugins, s conversion.Scope) error {
return autoConvert_v1alpha1_Plugins_To_config_Plugins(in, out, s)
}
func autoConvert_config_Plugins_To_v1alpha1_Plugins(in *config.Plugins, out *v1alpha1.Plugins, s conversion.Scope) error {
out.QueueSort = (*v1alpha1.PluginSet)(unsafe.Pointer(in.QueueSort))
out.PreFilter = (*v1alpha1.PluginSet)(unsafe.Pointer(in.PreFilter))
out.Filter = (*v1alpha1.PluginSet)(unsafe.Pointer(in.Filter))
out.PostFilter = (*v1alpha1.PluginSet)(unsafe.Pointer(in.PostFilter))
out.Score = (*v1alpha1.PluginSet)(unsafe.Pointer(in.Score))
out.NormalizeScore = (*v1alpha1.PluginSet)(unsafe.Pointer(in.NormalizeScore))
out.Reserve = (*v1alpha1.PluginSet)(unsafe.Pointer(in.Reserve))
out.Permit = (*v1alpha1.PluginSet)(unsafe.Pointer(in.Permit))
out.PreBind = (*v1alpha1.PluginSet)(unsafe.Pointer(in.PreBind))
out.Bind = (*v1alpha1.PluginSet)(unsafe.Pointer(in.Bind))
out.PostBind = (*v1alpha1.PluginSet)(unsafe.Pointer(in.PostBind))
out.Unreserve = (*v1alpha1.PluginSet)(unsafe.Pointer(in.Unreserve))
return nil
}
// Convert_config_Plugins_To_v1alpha1_Plugins is an autogenerated conversion function.
func Convert_config_Plugins_To_v1alpha1_Plugins(in *config.Plugins, out *v1alpha1.Plugins, s conversion.Scope) error {
return autoConvert_config_Plugins_To_v1alpha1_Plugins(in, out, s)
}
func autoConvert_v1alpha1_SchedulerAlgorithmSource_To_config_SchedulerAlgorithmSource(in *v1alpha1.SchedulerAlgorithmSource, out *config.SchedulerAlgorithmSource, s conversion.Scope) error {
out.Policy = (*config.SchedulerPolicySource)(unsafe.Pointer(in.Policy))
out.Provider = (*string)(unsafe.Pointer(in.Provider))

View File

@ -37,6 +37,18 @@ func (in *KubeSchedulerConfiguration) DeepCopyInto(out *KubeSchedulerConfigurati
*out = new(int64)
**out = **in
}
if in.Plugins != nil {
in, out := &in.Plugins, &out.Plugins
*out = new(Plugins)
(*in).DeepCopyInto(*out)
}
if in.PluginConfig != nil {
in, out := &in.PluginConfig, &out.PluginConfig
*out = make([]PluginConfig, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}
@ -75,6 +87,141 @@ func (in *KubeSchedulerLeaderElectionConfiguration) DeepCopy() *KubeSchedulerLea
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Plugin) DeepCopyInto(out *Plugin) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Plugin.
func (in *Plugin) DeepCopy() *Plugin {
if in == nil {
return nil
}
out := new(Plugin)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *PluginConfig) DeepCopyInto(out *PluginConfig) {
*out = *in
in.Args.DeepCopyInto(&out.Args)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PluginConfig.
func (in *PluginConfig) DeepCopy() *PluginConfig {
if in == nil {
return nil
}
out := new(PluginConfig)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *PluginSet) DeepCopyInto(out *PluginSet) {
*out = *in
if in.Enabled != nil {
in, out := &in.Enabled, &out.Enabled
*out = make([]Plugin, len(*in))
copy(*out, *in)
}
if in.Disabled != nil {
in, out := &in.Disabled, &out.Disabled
*out = make([]Plugin, len(*in))
copy(*out, *in)
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PluginSet.
func (in *PluginSet) DeepCopy() *PluginSet {
if in == nil {
return nil
}
out := new(PluginSet)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Plugins) DeepCopyInto(out *Plugins) {
*out = *in
if in.QueueSort != nil {
in, out := &in.QueueSort, &out.QueueSort
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.PreFilter != nil {
in, out := &in.PreFilter, &out.PreFilter
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.Filter != nil {
in, out := &in.Filter, &out.Filter
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.PostFilter != nil {
in, out := &in.PostFilter, &out.PostFilter
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.Score != nil {
in, out := &in.Score, &out.Score
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.NormalizeScore != nil {
in, out := &in.NormalizeScore, &out.NormalizeScore
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.Reserve != nil {
in, out := &in.Reserve, &out.Reserve
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.Permit != nil {
in, out := &in.Permit, &out.Permit
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.PreBind != nil {
in, out := &in.PreBind, &out.PreBind
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.Bind != nil {
in, out := &in.Bind, &out.Bind
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.PostBind != nil {
in, out := &in.PostBind, &out.PostBind
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.Unreserve != nil {
in, out := &in.Unreserve, &out.Unreserve
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Plugins.
func (in *Plugins) DeepCopy() *Plugins {
if in == nil {
return nil
}
out := new(Plugins)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SchedulerAlgorithmSource) DeepCopyInto(out *SchedulerAlgorithmSource) {
*out = *in

View File

@ -48,6 +48,7 @@ go_test(
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",

View File

@ -29,7 +29,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@ -532,7 +531,6 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
extenders = append(extenders, &test.extenders[ii])
}
cache := internalcache.New(time.Duration(0), wait.NeverStop)
fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil)
for _, name := range test.nodes {
cache.AddNode(createNode(name))
}
@ -544,7 +542,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
predicates.EmptyPredicateMetadataProducer,
test.prioritizers,
priorities.EmptyPriorityMetadataProducer,
fwk,
emptyFramework,
extenders,
nil,
schedulertesting.FakePersistentVolumeClaimLister{},

View File

@ -38,6 +38,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
@ -136,7 +137,7 @@ func getNodeReducePriority(pod *v1.Pod, meta interface{}, nodeNameToInfo map[str
// EmptyPluginRegistry is a test plugin set used by the default scheduler.
var EmptyPluginRegistry = framework.Registry{}
var emptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil)
var emptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil, []schedulerconfig.PluginConfig{})
func makeNodeList(nodeNames []string) []*v1.Node {
result := make([]*v1.Node, 0, len(nodeNames))
@ -438,7 +439,6 @@ func TestGenericScheduler(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cache := internalcache.New(time.Duration(0), wait.NeverStop)
fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil)
for _, pod := range test.pods {
cache.AddPod(pod)
}
@ -457,7 +457,7 @@ func TestGenericScheduler(t *testing.T) {
algorithmpredicates.EmptyPredicateMetadataProducer,
test.prioritizers,
priorities.EmptyPriorityMetadataProducer,
fwk,
emptyFramework,
[]algorithm.SchedulerExtender{},
nil,
pvcLister,
@ -480,7 +480,6 @@ func TestGenericScheduler(t *testing.T) {
// makeScheduler makes a simple genericScheduler for testing.
func makeScheduler(predicates map[string]algorithmpredicates.FitPredicate, nodes []*v1.Node) *genericScheduler {
cache := internalcache.New(time.Duration(0), wait.NeverStop)
fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil)
for _, n := range nodes {
cache.AddNode(n)
}
@ -493,7 +492,7 @@ func makeScheduler(predicates map[string]algorithmpredicates.FitPredicate, nodes
algorithmpredicates.EmptyPredicateMetadataProducer,
prioritizers,
priorities.EmptyPriorityMetadataProducer,
fwk,
emptyFramework,
nil, nil, nil, nil, false, false,
schedulerapi.DefaultPercentageOfNodesToScore)
cache.UpdateNodeInfoSnapshot(s.(*genericScheduler).nodeInfoSnapshot)
@ -1469,7 +1468,6 @@ func TestPreempt(t *testing.T) {
t.Logf("===== Running test %v", t.Name())
stop := make(chan struct{})
cache := internalcache.New(time.Duration(0), stop)
fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil)
for _, pod := range test.pods {
cache.AddPod(pod)
}
@ -1496,7 +1494,7 @@ func TestPreempt(t *testing.T) {
algorithmpredicates.EmptyPredicateMetadataProducer,
[]priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
priorities.EmptyPriorityMetadataProducer,
fwk,
emptyFramework,
extenders,
nil,
schedulertesting.FakePersistentVolumeClaimLister{},

View File

@ -15,6 +15,7 @@ go_library(
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/api/validation:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
@ -60,6 +61,7 @@ go_test(
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/api/latest:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",

View File

@ -22,7 +22,7 @@ import (
"fmt"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
@ -50,6 +50,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/api/validation"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/core"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@ -238,6 +239,8 @@ type ConfigFactoryArgs struct {
BindTimeoutSeconds int64
StopCh <-chan struct{}
Registry framework.Registry
Plugins *config.Plugins
PluginConfig []config.PluginConfig
}
// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
@ -248,8 +251,8 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
stopEverything = wait.NeverStop
}
schedulerCache := internalcache.New(30*time.Second, stopEverything)
// TODO(bsalamat): config files should be passed to the framework.
framework, err := framework.NewFramework(args.Registry, nil)
framework, err := framework.NewFramework(args.Registry, args.Plugins, args.PluginConfig)
if err != nil {
klog.Fatalf("error initializing the scheduling framework: %v", err)
}

View File

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
@ -496,6 +497,8 @@ func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight
bindTimeoutSeconds,
stopCh,
framework.NewRegistry(),
nil,
[]config.PluginConfig{},
})
}

View File

@ -12,6 +12,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",

View File

@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
)
@ -49,44 +50,103 @@ const (
var _ = Framework(&framework{})
// NewFramework initializes plugins given the configuration and the registry.
func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) {
func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfig) (Framework, error) {
f := &framework{
registry: r,
nodeInfoSnapshot: cache.NewNodeInfoSnapshot(),
plugins: make(map[string]Plugin),
waitingPods: newWaitingPodsMap(),
}
if plugins == nil {
return f, nil
}
// TODO: The framework needs to read the scheduler config and initialize only
// needed plugins. In this initial version of the code, we initialize all.
// get needed plugins from config
pg := pluginsNeeded(plugins)
if len(pg) == 0 {
return f, nil
}
pluginConfig := pluginNameToConfig(args)
for name, factory := range r {
// TODO: 'nil' should be replaced by plugin config.
p, err := factory(nil, f)
// initialize only needed plugins
if _, ok := pg[name]; !ok {
continue
}
// find the config args of a plugin
pc := pluginConfig[name]
p, err := factory(pc, f)
if err != nil {
return nil, fmt.Errorf("error initializing plugin %v: %v", name, err)
}
f.plugins[name] = p
}
// TODO: For now, we assume any plugins that implements an extension
// point wants to be called at that extension point. We should change this
// later and add these plugins based on the configuration.
if qsp, ok := p.(QueueSortPlugin); ok {
f.queueSortPlugins = append(f.queueSortPlugins, qsp)
}
if rp, ok := p.(ReservePlugin); ok {
f.reservePlugins = append(f.reservePlugins, rp)
}
if pp, ok := p.(PrebindPlugin); ok {
f.prebindPlugins = append(f.prebindPlugins, pp)
}
if up, ok := p.(UnreservePlugin); ok {
f.unreservePlugins = append(f.unreservePlugins, up)
}
if pr, ok := p.(PermitPlugin); ok {
f.permitPlugins = append(f.permitPlugins, pr)
for _, r := range plugins.Reserve.Enabled {
if pg, ok := f.plugins[r.Name]; ok {
p, ok := pg.(ReservePlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend reserve plugin", r.Name)
}
f.reservePlugins = append(f.reservePlugins, p)
} else {
return nil, fmt.Errorf("reserve plugin %v does not exist", r.Name)
}
}
for _, pb := range plugins.PreBind.Enabled {
if pg, ok := f.plugins[pb.Name]; ok {
p, ok := pg.(PrebindPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend prebind plugin", pb.Name)
}
f.prebindPlugins = append(f.prebindPlugins, p)
} else {
return nil, fmt.Errorf("prebind plugin %v does not exist", pb.Name)
}
}
for _, ur := range plugins.Unreserve.Enabled {
if pg, ok := f.plugins[ur.Name]; ok {
p, ok := pg.(UnreservePlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend unreserve plugin", ur.Name)
}
f.unreservePlugins = append(f.unreservePlugins, p)
} else {
return nil, fmt.Errorf("unreserve plugin %v does not exist", ur.Name)
}
}
for _, pr := range plugins.Permit.Enabled {
if pg, ok := f.plugins[pr.Name]; ok {
p, ok := pg.(PermitPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend permit plugin", pr.Name)
}
f.permitPlugins = append(f.permitPlugins, p)
} else {
return nil, fmt.Errorf("permit plugin %v does not exist", pr.Name)
}
}
for _, qs := range plugins.QueueSort.Enabled {
if pg, ok := f.plugins[qs.Name]; ok {
p, ok := pg.(QueueSortPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend queue sort plugin", qs.Name)
}
f.queueSortPlugins = append(f.queueSortPlugins, p)
if len(f.queueSortPlugins) > 1 {
return nil, fmt.Errorf("only one queue sort plugin can be enabled")
}
} else {
return nil, fmt.Errorf("queue sort plugin %v does not exist", qs.Name)
}
}
return f, nil
}
@ -225,3 +285,42 @@ func (f *framework) IterateOverWaitingPods(callback func(WaitingPod)) {
func (f *framework) GetWaitingPod(uid types.UID) WaitingPod {
return f.waitingPods.get(uid)
}
func pluginNameToConfig(args []config.PluginConfig) map[string]*runtime.Unknown {
pc := make(map[string]*runtime.Unknown, 0)
for _, p := range args {
pc[p.Name] = &p.Args
}
return pc
}
func pluginsNeeded(plugins *config.Plugins) map[string]struct{} {
pgMap := make(map[string]struct{}, 0)
if plugins == nil {
return pgMap
}
find := func(pgs *config.PluginSet) {
if pgs == nil {
return
}
for _, pg := range pgs.Enabled {
pgMap[pg.Name] = struct{}{}
}
}
find(plugins.QueueSort)
find(plugins.PreFilter)
find(plugins.Filter)
find(plugins.PostFilter)
find(plugins.Score)
find(plugins.NormalizeScore)
find(plugins.Reserve)
find(plugins.Permit)
find(plugins.PreBind)
find(plugins.Bind)
find(plugins.PostBind)
find(plugins.Unreserve)
return pgMap
}

View File

@ -133,6 +133,8 @@ func New(client clientset.Interface,
schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
stopCh <-chan struct{},
registry framework.Registry,
plugins *kubeschedulerconfig.Plugins,
pluginConfig []kubeschedulerconfig.PluginConfig,
opts ...func(o *schedulerOptions)) (*Scheduler, error) {
options := defaultSchedulerOptions
@ -158,6 +160,8 @@ func New(client clientset.Interface,
PercentageOfNodesToScore: options.percentageOfNodesToScore,
BindTimeoutSeconds: options.bindTimeoutSeconds,
Registry: registry,
Plugins: plugins,
PluginConfig: pluginConfig,
})
var config *factory.Config
source := schedulerAlgorithmSource

View File

@ -57,6 +57,13 @@ import (
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
// EmptyFramework is an empty framework used in tests.
// Note: If the test runs in goroutine, please don't using this variable to avoid a race condition.
var EmptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil, EmptyPluginConfig)
// EmptyPluginConfig is an empty plugin config used in tests.
var EmptyPluginConfig = []kubeschedulerconfig.PluginConfig{}
type fakeBinder struct {
b func(binding *v1.Binding) error
}
@ -197,6 +204,8 @@ func TestSchedulerCreation(t *testing.T) {
kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &testSource},
stopCh,
EmptyPluginRegistry,
nil,
EmptyPluginConfig,
WithBindTimeoutSeconds(defaultBindTimeout))
if err != nil {
@ -274,7 +283,6 @@ func TestScheduler(t *testing.T) {
var gotAssumedPod *v1.Pod
var gotBinding *v1.Binding
fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil)
s := NewFromConfig(&factory.Config{
SchedulerCache: &fakecache.Cache{
ForgetFunc: func(pod *v1.Pod) {
@ -300,7 +308,7 @@ func TestScheduler(t *testing.T) {
NextPod: func() *v1.Pod {
return item.sendPod
},
Framework: fwk,
Framework: EmptyFramework,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "scheduler"}),
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
})
@ -638,7 +646,6 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
// queuedPodStore: pods queued before processing.
// scache: scheduler cache that might contain assumed pods.
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) {
framework, _ := framework.NewFramework(EmptyPluginRegistry, nil)
algo := core.NewGenericScheduler(
scache,
internalqueue.NewSchedulingQueue(nil, nil),
@ -646,7 +653,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
predicates.EmptyPredicateMetadataProducer,
[]priorities.PriorityConfig{},
priorities.EmptyPriorityMetadataProducer,
framework,
EmptyFramework,
[]algorithm.SchedulerExtender{},
nil,
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
@ -677,7 +684,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
Recorder: &record.FakeRecorder{},
PodConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
Framework: framework,
Framework: EmptyFramework,
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
}
@ -691,7 +698,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
}
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
framework, _ := framework.NewFramework(EmptyPluginRegistry, nil)
framework, _ := framework.NewFramework(EmptyPluginRegistry, nil, []kubeschedulerconfig.PluginConfig{})
algo := core.NewGenericScheduler(
scache,
internalqueue.NewSchedulingQueue(nil, nil),

View File

@ -18,6 +18,7 @@ package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
componentbaseconfigv1alpha1 "k8s.io/component-base/config/v1alpha1"
)
@ -82,6 +83,17 @@ type KubeSchedulerConfiguration struct {
// Value must be non-negative integer. The value zero indicates no waiting.
// If this value is nil, the default value will be used.
BindTimeoutSeconds *int64 `json:"bindTimeoutSeconds"`
// Plugins specify the set of plugins that should be enabled or disabled. Enabled plugins are the
// ones that should be enabled in addition to the default plugins. Disabled plugins are any of the
// default plugins that should be disabled.
// When no enabled or disabled plugin is specified for an extension point, default plugins for
// that extension point will be used if there is any.
Plugins *Plugins `json:"plugins,omitempty"`
// PluginConfig is an optional set of custom plugin arguments for each plugin.
// Omitting config args for a plugin is equivalent to using the default config for that plugin.
PluginConfig []PluginConfig `json:"pluginConfig,omitempty"`
}
// SchedulerAlgorithmSource is the source of a scheduler algorithm. One source
@ -127,3 +139,76 @@ type KubeSchedulerLeaderElectionConfiguration struct {
// LockObjectName defines the lock object name
LockObjectName string `json:"lockObjectName"`
}
// Plugins include multiple extension points. When specified, the list of plugins for
// a particular extension point are the only ones enabled. If an extension point is
// omitted from the config, then the default set of plugins is used for that extension point.
// Enabled plugins are called in the order specified here, after default plugins. If they need to
// be invoked before default plugins, default plugins must be disabled and re-enabled here in desired order.
type Plugins struct {
// QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue.
QueueSort *PluginSet `json:"queueSort,omitempty"`
// PreFilter is a list of plugins that should be invoked at "PreFilter" extension point of the scheduling framework.
PreFilter *PluginSet `json:"preFilter,omitempty"`
// Filter is a list of plugins that should be invoked when filtering out nodes that cannot run the Pod.
Filter *PluginSet `json:"filter,omitempty"`
// PostFilter is a list of plugins that are invoked after filtering out infeasible nodes.
PostFilter *PluginSet `json:"postFilter,omitempty"`
// Score is a list of plugins that should be invoked when ranking nodes that have passed the filtering phase.
Score *PluginSet `json:"score,omitempty"`
// NormalizeScore is a list of plugins that should be invoked after the scoring phase to normalize scores.
NormalizeScore *PluginSet `json:"normalizeScore,omitempty"`
// Reserve is a list of plugins invoked when reserving a node to run the pod.
Reserve *PluginSet `json:"reserve,omitempty"`
// Permit is a list of plugins that control binding of a Pod. These plugins can prevent or delay binding of a Pod.
Permit *PluginSet `json:"permit,omitempty"`
// PreBind is a list of plugins that should be invoked before a pod is bound.
PreBind *PluginSet `json:"preBind,omitempty"`
// Bind is a list of plugins that should be invoked at "Bind" extension point of the scheduling framework.
// The scheduler call these plugins in order. Scheduler skips the rest of these plugins as soon as one returns success.
Bind *PluginSet `json:"bind,omitempty"`
// PostBind is a list of plugins that should be invoked after a pod is successfully bound.
PostBind *PluginSet `json:"postBind,omitempty"`
// Unreserve is a list of plugins invoked when a pod that was previously reserved is rejected in a later phase.
Unreserve *PluginSet `json:"unreserve,omitempty"`
}
// PluginSet specifies enabled and disabled plugins for an extension point.
// If an array is empty, missing, or nil, default plugins at that extension point will be used.
type PluginSet struct {
// Enabled specifies plugins that should be enabled in addition to default plugins.
// These are called after default plugins and in the same order specified here.
Enabled []Plugin `json:"enabled,omitempty"`
// Disabled specifies default plugins that should be disabled.
// When all default plugins need to be disabled, an array containing only one "*" should be provided.
Disabled []Plugin `json:"disabled,omitempty"`
}
// Plugin specifies a plugin name and its weight when applicable. Weight is used only for Score plugins.
type Plugin struct {
// Name defines the name of plugin
Name string `json:"name"`
// Weight defines the weight of plugin, only used for Score plugins.
Weight int32 `json:"weight,omitempty"`
}
// PluginConfig specifies arguments that should be passed to a plugin at the time of initialization.
// A plugin that is invoked at multiple extension points is initialized once. Args can have arbitrary structure.
// It is up to the plugin to process these Args.
type PluginConfig struct {
// Name defines the name of plugin being configured
Name string `json:"name"`
// Args defines the arguments passed to the plugins at the time of initialization. Args can have arbitrary structure.
Args runtime.Unknown `json:"args,omitempty"`
}

View File

@ -37,6 +37,18 @@ func (in *KubeSchedulerConfiguration) DeepCopyInto(out *KubeSchedulerConfigurati
*out = new(int64)
**out = **in
}
if in.Plugins != nil {
in, out := &in.Plugins, &out.Plugins
*out = new(Plugins)
(*in).DeepCopyInto(*out)
}
if in.PluginConfig != nil {
in, out := &in.PluginConfig, &out.PluginConfig
*out = make([]PluginConfig, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}
@ -75,6 +87,141 @@ func (in *KubeSchedulerLeaderElectionConfiguration) DeepCopy() *KubeSchedulerLea
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Plugin) DeepCopyInto(out *Plugin) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Plugin.
func (in *Plugin) DeepCopy() *Plugin {
if in == nil {
return nil
}
out := new(Plugin)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *PluginConfig) DeepCopyInto(out *PluginConfig) {
*out = *in
in.Args.DeepCopyInto(&out.Args)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PluginConfig.
func (in *PluginConfig) DeepCopy() *PluginConfig {
if in == nil {
return nil
}
out := new(PluginConfig)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *PluginSet) DeepCopyInto(out *PluginSet) {
*out = *in
if in.Enabled != nil {
in, out := &in.Enabled, &out.Enabled
*out = make([]Plugin, len(*in))
copy(*out, *in)
}
if in.Disabled != nil {
in, out := &in.Disabled, &out.Disabled
*out = make([]Plugin, len(*in))
copy(*out, *in)
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PluginSet.
func (in *PluginSet) DeepCopy() *PluginSet {
if in == nil {
return nil
}
out := new(PluginSet)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Plugins) DeepCopyInto(out *Plugins) {
*out = *in
if in.QueueSort != nil {
in, out := &in.QueueSort, &out.QueueSort
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.PreFilter != nil {
in, out := &in.PreFilter, &out.PreFilter
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.Filter != nil {
in, out := &in.Filter, &out.Filter
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.PostFilter != nil {
in, out := &in.PostFilter, &out.PostFilter
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.Score != nil {
in, out := &in.Score, &out.Score
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.NormalizeScore != nil {
in, out := &in.NormalizeScore, &out.NormalizeScore
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.Reserve != nil {
in, out := &in.Reserve, &out.Reserve
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.Permit != nil {
in, out := &in.Permit, &out.Permit
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.PreBind != nil {
in, out := &in.PreBind, &out.PreBind
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.Bind != nil {
in, out := &in.Bind, &out.Bind
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.PostBind != nil {
in, out := &in.PostBind, &out.PostBind
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
if in.Unreserve != nil {
in, out := &in.Unreserve, &out.Unreserve
*out = new(PluginSet)
(*in).DeepCopyInto(*out)
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Plugins.
func (in *Plugins) DeepCopy() *Plugins {
if in == nil {
return nil
}
out := new(Plugins)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SchedulerAlgorithmSource) DeepCopyInto(out *SchedulerAlgorithmSource) {
*out = *in

View File

@ -95,6 +95,7 @@ go_library(
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/util/taints:go_default_library",

View File

@ -24,6 +24,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
@ -201,10 +202,23 @@ func TestReservePlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a reserve plugin.
registry := framework.Registry{reservePluginName: NewReservePlugin}
// Setup initial reserve plugin for testing.
reservePlugin := &schedulerconfig.Plugins{
Reserve: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: reservePluginName,
},
},
},
}
// Set empty plugin config for testing
emptyPluginConfig := []schedulerconfig.PluginConfig{}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "reserve-plugin", nil),
false, nil, registry, false, time.Second)
false, nil, registry, reservePlugin, emptyPluginConfig, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
@ -246,10 +260,27 @@ func TestPrebindPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a prebind plugin.
registry := framework.Registry{prebindPluginName: NewPrebindPlugin}
// Setup initial prebind plugin for testing.
preBindPlugin := &schedulerconfig.Plugins{
PreBind: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: prebindPluginName,
},
},
},
}
// Set reserve prebind config for testing
preBindPluginConfig := []schedulerconfig.PluginConfig{
{
Name: prebindPluginName,
Args: runtime.Unknown{},
},
}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "prebind-plugin", nil),
false, nil, registry, false, time.Second)
false, nil, registry, preBindPlugin, preBindPluginConfig, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
@ -323,10 +354,39 @@ func TestUnreservePlugin(t *testing.T) {
prebindPluginName: NewPrebindPlugin,
}
// Setup initial unreserve and prebind plugin for testing.
plugins := &schedulerconfig.Plugins{
Unreserve: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: unreservePluginName,
},
},
},
PreBind: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: prebindPluginName,
},
},
},
}
// Set unreserve and prebind plugin config for testing
pluginConfig := []schedulerconfig.PluginConfig{
{
Name: unreservePluginName,
Args: runtime.Unknown{},
},
{
Name: prebindPluginName,
Args: runtime.Unknown{},
},
}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "unreserve-plugin", nil),
false, nil, registry, false, time.Second)
false, nil, registry, plugins, pluginConfig, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
@ -404,10 +464,28 @@ func TestPermitPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a permit plugin.
registry := framework.Registry{permitPluginName: NewPermitPlugin}
// Setup initial permit plugin for testing.
plugins := &schedulerconfig.Plugins{
Permit: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: permitPluginName,
},
},
},
}
// Set permit plugin config for testing
pluginConfig := []schedulerconfig.PluginConfig{
{
Name: permitPluginName,
Args: runtime.Unknown{},
},
}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "permit-plugin", nil),
false, nil, registry, false, time.Second)
false, nil, registry, plugins, pluginConfig, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
@ -495,10 +573,28 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a permit plugin.
registry := framework.Registry{permitPluginName: NewPermitPlugin}
// Setup initial permit plugin for testing.
plugins := &schedulerconfig.Plugins{
Permit: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: permitPluginName,
},
},
},
}
// Set permit plugin config for testing
pluginConfig := []schedulerconfig.PluginConfig{
{
Name: permitPluginName,
Args: runtime.Unknown{},
},
}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "permit-plugin", nil),
false, nil, registry, false, time.Second)
false, nil, registry, plugins, pluginConfig, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet

View File

@ -265,6 +265,8 @@ priorities: []
},
nil,
schedulerframework.NewRegistry(),
nil,
[]kubeschedulerconfig.PluginConfig{},
scheduler.WithName(v1.DefaultSchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(v1.DefaultHardPodAffinitySymmetricWeight),
scheduler.WithBindTimeoutSeconds(defaultBindTimeout),
@ -334,6 +336,8 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) {
},
nil,
schedulerframework.NewRegistry(),
nil,
[]kubeschedulerconfig.PluginConfig{},
scheduler.WithName(v1.DefaultSchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(v1.DefaultHardPodAffinitySymmetricWeight),
scheduler.WithBindTimeoutSeconds(defaultBindTimeout))
@ -598,7 +602,8 @@ func TestMultiScheduler(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
schedulerConfigFactory2 := createConfiguratorWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2, schedulerframework.NewRegistry(), stopCh)
schedulerConfigFactory2 := createConfiguratorWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2, schedulerframework.NewRegistry(),
nil, []kubeschedulerconfig.PluginConfig{}, stopCh)
schedulerConfig2, err := schedulerConfigFactory2.Create()
if err != nil {
t.Errorf("Couldn't create scheduler config: %v", err)

View File

@ -45,6 +45,8 @@ import (
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/disruption"
"k8s.io/kubernetes/pkg/scheduler"
schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
// Register defaults in pkg/scheduler/algorithmprovider.
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
@ -74,6 +76,8 @@ func createConfiguratorWithPodInformer(
podInformer coreinformers.PodInformer,
informerFactory informers.SharedInformerFactory,
pluginRegistry schedulerframework.Registry,
plugins *schedulerconfig.Plugins,
pluginConfig []schedulerconfig.PluginConfig,
stopCh <-chan struct{},
) factory.Configurator {
return factory.NewConfigFactory(&factory.ConfigFactoryArgs{
@ -90,6 +94,8 @@ func createConfiguratorWithPodInformer(
PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
Registry: pluginRegistry,
Plugins: plugins,
PluginConfig: pluginConfig,
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
DisablePreemption: false,
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
@ -148,7 +154,8 @@ func initTestScheduler(
) *testContext {
// Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority
// feature gate is enabled at the same time.
return initTestSchedulerWithOptions(t, context, setPodInformer, policy, schedulerframework.NewRegistry(), false, time.Second)
return initTestSchedulerWithOptions(t, context, setPodInformer, policy, schedulerframework.NewRegistry(),
nil, []schedulerconfig.PluginConfig{}, false, time.Second)
}
// initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
@ -159,6 +166,8 @@ func initTestSchedulerWithOptions(
setPodInformer bool,
policy *schedulerapi.Policy,
pluginRegistry schedulerframework.Registry,
plugins *schedulerconfig.Plugins,
pluginConfig []schedulerconfig.PluginConfig,
disablePreemption bool,
resyncPeriod time.Duration,
) *testContext {
@ -175,7 +184,8 @@ func initTestSchedulerWithOptions(
}
context.schedulerConfigFactory = createConfiguratorWithPodInformer(
v1.DefaultSchedulerName, context.clientSet, podInformer, context.informerFactory, pluginRegistry, context.stopCh)
v1.DefaultSchedulerName, context.clientSet, podInformer, context.informerFactory, pluginRegistry, plugins,
pluginConfig, context.stopCh)
var err error
@ -257,7 +267,8 @@ func initTest(t *testing.T, nsPrefix string) *testContext {
func initTestDisablePreemption(t *testing.T, nsPrefix string) *testContext {
return initTestSchedulerWithOptions(
t, initTestMaster(t, nsPrefix, nil), true, nil,
schedulerframework.NewRegistry(), true, time.Second)
schedulerframework.NewRegistry(), nil, []schedulerconfig.PluginConfig{},
true, time.Second)
}
// cleanupTest deletes the scheduler and the test namespace. It should be called

View File

@ -43,6 +43,7 @@ import (
persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
imageutils "k8s.io/kubernetes/test/utils/image"
@ -882,7 +883,8 @@ func TestRescheduleProvisioning(t *testing.T) {
}
func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod time.Duration, provisionDelaySeconds int) *testConfig {
context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), false, nil, nil, false, resyncPeriod)
context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), false, nil, nil,
nil, []schedulerconfig.PluginConfig{}, false, resyncPeriod)
clientset := context.clientSet
ns := context.ns.Name