diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index 98e4bbc140..3b75dcec5c 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -20,6 +20,7 @@ go_library( "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/metrics:go_default_library", + "//pkg/scheduler/plugins/v1alpha1:go_default_library", "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", @@ -96,6 +97,7 @@ filegroup( "//pkg/scheduler/internal/cache:all-srcs", "//pkg/scheduler/internal/queue:all-srcs", "//pkg/scheduler/metrics:all-srcs", + "//pkg/scheduler/plugins:all-srcs", "//pkg/scheduler/testing:all-srcs", "//pkg/scheduler/util:all-srcs", "//pkg/scheduler/volumebinder:all-srcs", diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD index 7df943d3f7..a1b0df7104 100644 --- a/pkg/scheduler/core/BUILD +++ b/pkg/scheduler/core/BUILD @@ -17,6 +17,7 @@ go_library( "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/metrics:go_default_library", + "//pkg/scheduler/plugins/v1alpha1:go_default_library", "//pkg/scheduler/util:go_default_library", "//pkg/scheduler/volumebinder:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", @@ -51,6 +52,7 @@ go_test( "//pkg/scheduler/core/equivalence:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/queue:go_default_library", + "//pkg/scheduler/plugins/v1alpha1:go_default_library", "//pkg/scheduler/testing:go_default_library", "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/apps/v1:go_default_library", diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index e71f8805d6..f1f17beb43 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go 
@@ -516,6 +516,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyPriorityMetadataProducer, + emptyPluginSet, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{}, diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 8d5a73d146..874ca53360 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -44,6 +44,7 @@ import ( schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/metrics" + pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/volumebinder" ) @@ -103,6 +104,7 @@ type genericScheduler struct { priorityMetaProducer algorithm.PriorityMetadataProducer predicateMetaProducer algorithm.PredicateMetadataProducer prioritizers []algorithm.PriorityConfig + pluginSet pluginsv1alpha1.PluginSet extenders []algorithm.SchedulerExtender lastNodeIndex uint64 alwaysCheckAllPredicates bool @@ -1152,6 +1154,7 @@ func NewGenericScheduler( predicateMetaProducer algorithm.PredicateMetadataProducer, prioritizers []algorithm.PriorityConfig, priorityMetaProducer algorithm.PriorityMetadataProducer, + pluginSet pluginsv1alpha1.PluginSet, extenders []algorithm.SchedulerExtender, volumeBinder *volumebinder.VolumeBinder, pvcLister corelisters.PersistentVolumeClaimLister, @@ -1168,6 +1171,7 @@ func NewGenericScheduler( predicateMetaProducer: predicateMetaProducer, prioritizers: prioritizers, priorityMetaProducer: priorityMetaProducer, + pluginSet: pluginSet, extenders: extenders, cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo), volumeBinder: volumeBinder, diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index d0152a9fbb..2b66879692 
100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -43,6 +43,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/core/equivalence" schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" + plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" ) @@ -135,6 +136,28 @@ func getNodeReducePriority(pod *v1.Pod, meta interface{}, nodeNameToInfo map[str return nil } +// EmptyPluginSet is a test plugin set used by the default scheduler. +type EmptyPluginSet struct{} + +var _ plugins.PluginSet = EmptyPluginSet{} + +// ReservePlugins returns a slice of default reserve plugins. +func (r EmptyPluginSet) ReservePlugins() []plugins.ReservePlugin { + return []plugins.ReservePlugin{} +} + +// PrebindPlugins returns a slice of default prebind plugins. +func (r EmptyPluginSet) PrebindPlugins() []plugins.PrebindPlugin { + return []plugins.PrebindPlugin{} +} + +// Data returns a pointer to PluginData. 
+func (r EmptyPluginSet) Data() *plugins.PluginData { + return &plugins.PluginData{} +} + +var emptyPluginSet = &EmptyPluginSet{} + func makeNodeList(nodeNames []string) []*v1.Node { result := make([]*v1.Node, 0, len(nodeNames)) for _, nodeName := range nodeNames { @@ -454,6 +477,7 @@ func TestGenericScheduler(t *testing.T) { algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyPriorityMetadataProducer, + emptyPluginSet, []algorithm.SchedulerExtender{}, nil, pvcLister, @@ -490,6 +514,7 @@ func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Nod algorithm.EmptyPredicateMetadataProducer, prioritizers, algorithm.EmptyPriorityMetadataProducer, + emptyPluginSet, nil, nil, nil, nil, false, false, schedulerapi.DefaultPercentageOfNodesToScore) cache.UpdateNodeNameToInfoMap(s.(*genericScheduler).cachedNodeInfoMap) @@ -1416,6 +1441,7 @@ func TestPreempt(t *testing.T) { algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyPriorityMetadataProducer, + emptyPluginSet, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{}, @@ -1543,6 +1569,7 @@ func TestCacheInvalidationRace(t *testing.T) { algorithm.EmptyPredicateMetadataProducer, prioritizers, algorithm.EmptyPriorityMetadataProducer, + emptyPluginSet, nil, nil, pvcLister, pdbLister, true, false, schedulerapi.DefaultPercentageOfNodesToScore) @@ -1626,6 +1653,7 @@ func TestCacheInvalidationRace2(t *testing.T) { algorithm.EmptyPredicateMetadataProducer, prioritizers, algorithm.EmptyPriorityMetadataProducer, + emptyPluginSet, nil, nil, pvcLister, pdbLister, true, false, schedulerapi.DefaultPercentageOfNodesToScore) diff --git a/pkg/scheduler/factory/BUILD b/pkg/scheduler/factory/BUILD index 7be893b060..9a2e20ce29 100644 --- a/pkg/scheduler/factory/BUILD +++ b/pkg/scheduler/factory/BUILD @@ -25,6 +25,8 @@ go_library( "//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/debugger:go_default_library", "//pkg/scheduler/internal/queue:go_default_library", + "//pkg/scheduler/plugins:go_default_library", + "//pkg/scheduler/plugins/v1alpha1:go_default_library", "//pkg/scheduler/util:go_default_library", "//pkg/scheduler/volumebinder:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index aa420bcb17..120806aef7 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -63,6 +63,8 @@ import ( schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" + "k8s.io/kubernetes/pkg/scheduler/plugins" + pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/volumebinder" ) @@ -109,6 +111,8 @@ type Config struct { PodConditionUpdater PodConditionUpdater // PodPreemptor is used to evict pods and update pod annotations. PodPreemptor PodPreemptor + // PluginSet has a set of plugins and data used to run them. + PluginSet pluginsv1alpha1.PluginSet // NextPod should be a function that blocks until the next pod // is available. We don't use a channel for this, because scheduling @@ -202,6 +206,8 @@ type configFactory struct { pdbLister policylisters.PodDisruptionBudgetLister // a means to list all StorageClasses storageClassLister storagelisters.StorageClassLister + // pluginSet has a set of plugins and the context used for running them. + pluginSet pluginsv1alpha1.PluginSet // Close this to stop all reflectors StopEverything <-chan struct{} @@ -1225,6 +1231,9 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, return nil, err } + // TODO(bsalamat): the default registrar should be able to process config files.
+ c.pluginSet = plugins.NewDefaultPluginSet(pluginsv1alpha1.NewPluginContext(), &c.schedulerCache) + // Init equivalence class cache if c.enableEquivalenceClassCache { c.equivalencePodCache = equivalence.NewCache(predicates.Ordering()) @@ -1239,6 +1248,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, predicateMetaProducer, priorityConfigs, priorityMetaProducer, + c.pluginSet, extenders, c.volumeBinder, c.pVCLister, @@ -1258,6 +1268,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, GetBinder: c.getBinderFunc(extenders), PodConditionUpdater: &podConditionUpdater{c.client}, PodPreemptor: &podPreemptor{c.client}, + PluginSet: c.pluginSet, WaitForCacheSync: func() bool { return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced) }, diff --git a/pkg/scheduler/plugins/BUILD b/pkg/scheduler/plugins/BUILD new file mode 100644 index 0000000000..8de218f021 --- /dev/null +++ b/pkg/scheduler/plugins/BUILD @@ -0,0 +1,30 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["registrar.go"], + importpath = "k8s.io/kubernetes/pkg/scheduler/plugins", + visibility = ["//visibility:public"], + deps = [ + "//pkg/scheduler/internal/cache:go_default_library", + "//pkg/scheduler/plugins/v1alpha1:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//pkg/scheduler/plugins/examples:all-srcs", + "//pkg/scheduler/plugins/v1alpha1:all-srcs", + ], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/scheduler/plugins/examples/BUILD b/pkg/scheduler/plugins/examples/BUILD new file mode 100644 index 0000000000..bc26941c8e --- /dev/null +++ b/pkg/scheduler/plugins/examples/BUILD @@ -0,0 +1,31 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + 
+go_library( + name = "go_default_library", + srcs = [ + "multipoint.go", + "prebind.go", + "stateful.go", + ], + importpath = "k8s.io/kubernetes/pkg/scheduler/plugins/examples", + visibility = ["//visibility:public"], + deps = [ + "//pkg/scheduler/plugins/v1alpha1:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/scheduler/plugins/examples/multipoint.go b/pkg/scheduler/plugins/examples/multipoint.go new file mode 100644 index 0000000000..3b82f21980 --- /dev/null +++ b/pkg/scheduler/plugins/examples/multipoint.go @@ -0,0 +1,62 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package examples + +import ( + "fmt" + + "k8s.io/api/core/v1" + plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" +) + +// MultipointCommunicatingPlugin is an example of a plugin that implements two +// extension points. It communicates through pluginContext with another function. +type MultipointCommunicatingPlugin struct{} + +var _ = plugins.ReservePlugin(MultipointCommunicatingPlugin{}) + +// Name returns name of the plugin. It is used in logs, etc. 
+func (mc MultipointCommunicatingPlugin) Name() string { + return "multipoint-communicating-plugin" +} + +// Reserve is the function invoked by the framework at the "reserve" extension point. It returns an error for a nil pod and, for the pod named "my-test-pod", writes "never bind" to the plugin context under the pod's name. +func (mc MultipointCommunicatingPlugin) Reserve(ps plugins.PluginSet, pod *v1.Pod, nodeName string) error { + if pod == nil { + return fmt.Errorf("pod cannot be nil") + } + if pod.Name == "my-test-pod" { + ps.Data().Ctx.SyncWrite(plugins.ContextKey(pod.Name), "never bind") + } + return nil +} + +// Prebind is the function invoked by the framework at the "prebind" extension point. It returns an error for a nil pod and returns false when the plugin context holds "never bind" under the pod's name; otherwise it returns true. +func (mc MultipointCommunicatingPlugin) Prebind(ps plugins.PluginSet, pod *v1.Pod, nodeName string) (bool, error) { + if pod == nil { + return false, fmt.Errorf("pod cannot be nil") + } + if v, e := ps.Data().Ctx.SyncRead(plugins.ContextKey(pod.Name)); e == nil && v == "never bind" { + return false, nil + } + return true, nil +} + +// NewMultipointCommunicatingPlugin initializes a new plugin and returns it. +func NewMultipointCommunicatingPlugin() *MultipointCommunicatingPlugin { + return &MultipointCommunicatingPlugin{} +} diff --git a/pkg/scheduler/plugins/examples/prebind.go b/pkg/scheduler/plugins/examples/prebind.go new file mode 100644 index 0000000000..13e71eb0bd --- /dev/null +++ b/pkg/scheduler/plugins/examples/prebind.go @@ -0,0 +1,48 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package examples + +import ( + "fmt" + + "k8s.io/api/core/v1" + plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" +) + +// StatelessPrebindExample is an example of a simple plugin that has no state +// and implements only one hook for prebind. +type StatelessPrebindExample struct{} + +var _ = plugins.PrebindPlugin(StatelessPrebindExample{}) + +// Name returns the name of the plugin. It is used in logs, etc. +func (sr StatelessPrebindExample) Name() string { + return "stateless-prebind-plugin-example" +} + +// Prebind is the function invoked by the framework at the "prebind" extension point. It returns an error for a nil pod and returns true for every other pod. +func (sr StatelessPrebindExample) Prebind(ps plugins.PluginSet, pod *v1.Pod, nodeName string) (bool, error) { + if pod == nil { + return false, fmt.Errorf("pod cannot be nil") + } + return true, nil +} + +// NewStatelessPrebindExample initializes a new plugin and returns it. +func NewStatelessPrebindExample() *StatelessPrebindExample { + return &StatelessPrebindExample{} +} diff --git a/pkg/scheduler/plugins/examples/stateful.go b/pkg/scheduler/plugins/examples/stateful.go new file mode 100644 index 0000000000..2b8b210305 --- /dev/null +++ b/pkg/scheduler/plugins/examples/stateful.go @@ -0,0 +1,68 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package examples + +import ( + "fmt" + "k8s.io/klog" + + "k8s.io/api/core/v1" + plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" +) + +// StatefulMultipointExample is an example plugin that is executed at multiple extension points. +// This plugin is stateful. It receives arguments at initialization (NewStatefulMultipointExample) +// and changes its state when it is executed. +type StatefulMultipointExample struct { + mpState map[int]string + numRuns int +} + +var _ = plugins.ReservePlugin(&StatefulMultipointExample{}) +var _ = plugins.PrebindPlugin(&StatefulMultipointExample{}) + +// Name returns the name of the plugin. It is used in logs, etc. +func (mp *StatefulMultipointExample) Name() string { + return "multipoint-plugin-example" +} + +// Reserve is the function invoked by the framework at the "reserve" extension point. It increments the run counter and always returns nil. +func (mp *StatefulMultipointExample) Reserve(ps plugins.PluginSet, pod *v1.Pod, nodeName string) error { + mp.numRuns++ + return nil +} + +// Prebind is the function invoked by the framework at the "prebind" extension point. It increments the run counter, returns an error for a nil pod, and returns true for every other pod. +func (mp *StatefulMultipointExample) Prebind(ps plugins.PluginSet, pod *v1.Pod, nodeName string) (bool, error) { + mp.numRuns++ + if pod == nil { + return false, fmt.Errorf("pod must not be nil") + } + return true, nil +} + +// NewStatefulMultipointExample initializes a new plugin and returns it. It logs an error and returns nil when called without arguments; the first argument must be a map[int]string, otherwise the type assertion panics. +func NewStatefulMultipointExample(initState ...interface{}) *StatefulMultipointExample { + if len(initState) == 0 { + klog.Error("StatefulMultipointExample needs exactly one argument for initialization") + return nil + } + mp := StatefulMultipointExample{ + mpState: initState[0].(map[int]string), + } + return &mp +} diff --git a/pkg/scheduler/plugins/registrar.go b/pkg/scheduler/plugins/registrar.go new file mode 100644 index 0000000000..4eab86ecc1 --- /dev/null +++ b/pkg/scheduler/plugins/registrar.go @@ -0,0 +1,77 @@ +/* +Copyright 2018 The Kubernetes Authors.
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugins + +import ( + "k8s.io/kubernetes/pkg/scheduler/internal/cache" + plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" +) + +// DefaultPluginSet is the default plugin registrar used by the default scheduler. +type DefaultPluginSet struct { + data *plugins.PluginData + reservePlugins []plugins.ReservePlugin + prebindPlugins []plugins.PrebindPlugin +} + +var _ = plugins.PluginSet(&DefaultPluginSet{}) + +// ReservePlugins returns a slice of default reserve plugins. +func (r *DefaultPluginSet) ReservePlugins() []plugins.ReservePlugin { + return r.reservePlugins +} + +// PrebindPlugins returns a slice of default prebind plugins. +func (r *DefaultPluginSet) PrebindPlugins() []plugins.PrebindPlugin { + return r.prebindPlugins +} + +// Data returns a pointer to PluginData. +func (r *DefaultPluginSet) Data() *plugins.PluginData { + return r.data +} + +// NewDefaultPluginSet initializes the default plugin set with the given context and scheduler cache, registers its reserve and prebind plugins, and returns its pointer. +func NewDefaultPluginSet(ctx *plugins.PluginContext, schedulerCache *cache.Cache) *DefaultPluginSet { + defaultRegistrar := DefaultPluginSet{ + data: &plugins.PluginData{ + Ctx: ctx, + SchedulerCache: schedulerCache, + }, + } + defaultRegistrar.registerReservePlugins() + defaultRegistrar.registerPrebindPlugins() + return &defaultRegistrar +} + +func (r *DefaultPluginSet) registerReservePlugins() { // pointer receiver: the slice must be stored on the caller's set, not on a copy + r.reservePlugins = []plugins.ReservePlugin{ + // Init functions of all reserve plugins go here.
They are called in the + // same order that they are registered. + // Example: + // examples.NewStatefulMultipointExample(map[int]string{1: "test1", 2: "test2"}), + } +} + +func (r *DefaultPluginSet) registerPrebindPlugins() { // pointer receiver: the slice must be stored on the caller's set, not on a copy + r.prebindPlugins = []plugins.PrebindPlugin{ + // Init functions of all prebind plugins go here. They are called in the + // same order that they are registered. + // Example: + // examples.NewStatelessPrebindExample(), + } +} diff --git a/pkg/scheduler/plugins/v1alpha1/BUILD b/pkg/scheduler/plugins/v1alpha1/BUILD new file mode 100644 index 0000000000..619bd4685a --- /dev/null +++ b/pkg/scheduler/plugins/v1alpha1/BUILD @@ -0,0 +1,29 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = [ + "context.go", + "interface.go", + ], + importpath = "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1", + visibility = ["//visibility:public"], + deps = [ + "//pkg/scheduler/internal/cache:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/scheduler/plugins/v1alpha1/context.go b/pkg/scheduler/plugins/v1alpha1/context.go new file mode 100644 index 0000000000..0631b5f0d8 --- /dev/null +++ b/pkg/scheduler/plugins/v1alpha1/context.go @@ -0,0 +1,94 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "errors" + "sync" +) + +const ( + // NotFound is the not found error message. + NotFound = "not found" +) + +// ContextData is a generic type for arbitrary data stored in PluginContext. +type ContextData interface{} + +// ContextKey is the type of keys stored in PluginContext. +type ContextKey string + +// PluginContext provides a mechanism for plugins to store and retrieve arbitrary data. +// ContextData stored by one plugin can be read, altered, or deleted by another plugin. +// PluginContext does not provide any data protection, as all plugins are assumed to be +// trusted. +type PluginContext struct { + Mx sync.RWMutex + storage map[ContextKey]ContextData +} + +// NewPluginContext initializes a new PluginContext and returns its pointer. +func NewPluginContext() *PluginContext { + return &PluginContext{ + storage: make(map[ContextKey]ContextData), + } +} + +// Read retrieves data with the given "key" from PluginContext. If the key is not +// present an error is returned. +func (c *PluginContext) Read(key ContextKey) (ContextData, error) { + if v, ok := c.storage[key]; ok { + return v, nil + } + return nil, errors.New(NotFound) +} + +// SyncRead is the thread safe version of Read(...). +func (c *PluginContext) SyncRead(key ContextKey) (ContextData, error) { + c.Mx.RLock() + defer c.Mx.RUnlock() + return c.Read(key) +} + +// Write stores the given "val" in PluginContext with the given "key". 
+func (c *PluginContext) Write(key ContextKey, val ContextData) { + c.storage[key] = val +} + +// SyncWrite is the thread-safe version of Write(...); it holds the write lock on Mx for the duration of the call. +func (c *PluginContext) SyncWrite(key ContextKey, val ContextData) { + c.Mx.Lock() + defer c.Mx.Unlock() + c.Write(key, val) +} + +// Delete deletes data with the given key from PluginContext. It does not acquire Mx. +func (c *PluginContext) Delete(key ContextKey) { + delete(c.storage, key) +} + +// SyncDelete is the thread-safe version of Delete(...); it holds the write lock on Mx for the duration of the call. +func (c *PluginContext) SyncDelete(key ContextKey) { + c.Mx.Lock() + defer c.Mx.Unlock() + c.Delete(key) +} + +// Reset removes all the information in the PluginContext by replacing the storage map. It does not acquire Mx. NOTE(review): confirm it cannot run concurrently with the Sync* methods. +func (c *PluginContext) Reset() { + c.storage = make(map[ContextKey]ContextData) +} diff --git a/pkg/scheduler/plugins/v1alpha1/interface.go b/pkg/scheduler/plugins/v1alpha1/interface.go new file mode 100644 index 0000000000..0d0c90b43e --- /dev/null +++ b/pkg/scheduler/plugins/v1alpha1/interface.go @@ -0,0 +1,63 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This file defines the scheduling framework plugin interfaces. + +package v1alpha1 + +import ( + "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/scheduler/internal/cache" +) + +// PluginData carries information that plugins may need. +type PluginData struct { + Ctx *PluginContext + SchedulerCache *cache.Cache + // We may want to add the scheduling queue here too. +} + +// Plugin is the parent type for all the scheduling framework plugins.
+type Plugin interface { + Name() string +} + +// ReservePlugin is an interface for Reserve plugins. These plugins are called +// at the reservation point, AKA "assume". These are meant to update the state +// of the plugin. They do not return any value (other than error). +type ReservePlugin interface { + Plugin + // Reserve is called by the scheduling framework when the scheduler cache is + // updated. + Reserve(ps PluginSet, p *v1.Pod, nodeName string) error +} + +// PrebindPlugin is an interface that must be implemented by "prebind" plugins. +// These plugins are called before a pod is bound. +type PrebindPlugin interface { + Plugin + // Prebind is called before binding a pod. All prebind plugins must return + // true or the pod will not be sent for binding. + Prebind(ps PluginSet, p *v1.Pod, nodeName string) (bool, error) +} + +// PluginSet registers plugins used by the scheduling framework. +// The plugins registered are called at specified points in a scheduling cycle. +type PluginSet interface { + Data() *PluginData + ReservePlugins() []ReservePlugin + PrebindPlugins() []PrebindPlugin +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 0237036b1c..8d63a99795 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -17,11 +17,14 @@ limitations under the License. package scheduler import ( + "errors" "fmt" "io/ioutil" "os" "time" + "k8s.io/klog" + "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -44,13 +47,13 @@ import ( schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" - - "k8s.io/klog" ) const ( // BindTimeoutSeconds defines the default bind timeout BindTimeoutSeconds = 100 + // SchedulerError is the pod condition reason recorded when an error occurs while scheduling a pod. + SchedulerError = "SchedulerError" ) // Scheduler watches for new unscheduled pods.
It attempts to find @@ -286,19 +289,26 @@ func (sched *Scheduler) Config() *factory.Config { return sched.config } +// recordSchedulingFailure reports that the pod has failed to schedule: it calls the configured Error +// handler, emits a "FailedScheduling" warning event carrying "message", and sets the PodScheduled condition to false with "reason" and err.Error(). +// NOTE: This function modifies "pod". "pod" should be copied before being passed. +func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string) { + sched.config.Error(pod, err) + sched.config.Recorder.Event(pod, v1.EventTypeWarning, "FailedScheduling", message) + sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{ + Type: v1.PodScheduled, + Status: v1.ConditionFalse, + Reason: reason, + Message: err.Error(), + }) +} + // schedule implements the scheduling algorithm and returns the suggested host. func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) { host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister) if err != nil { pod = pod.DeepCopy() - sched.config.Error(pod, err) - sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "%v", err) - sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{ - Type: v1.PodScheduled, - Status: v1.ConditionFalse, - Reason: v1.PodReasonUnschedulable, - Message: err.Error(), - }) + sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error()) return "", err } return host, err @@ -362,14 +372,8 @@ func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bo if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { allBound, err = sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host) if err != nil { - sched.config.Error(assumed, err) - sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePodVolumes failed: %v", err) - sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{ - Type: v1.PodScheduled, - Status: v1.ConditionFalse, - Reason: "SchedulerError", - Message: err.Error(), - }) +
sched.recordSchedulingFailure(assumed, err, SchedulerError, + fmt.Sprintf("AssumePodVolumes failed: %v", err)) } // Invalidate ecache because assumed volumes could have affected the cached // pvs for other pods @@ -387,9 +391,6 @@ func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bo // If binding errors, times out or gets undone, then an error will be returned to // retry scheduling. func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error { - var reason string - var eventType string - klog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name) err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed) if err != nil { @@ -404,15 +405,7 @@ func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error { // stale pod binding cache. sched.config.VolumeBinder.DeletePodBindings(assumed) - reason = "VolumeBindingFailed" - eventType = v1.EventTypeWarning - sched.config.Error(assumed, err) - sched.config.Recorder.Eventf(assumed, eventType, "FailedScheduling", "%v", err) - sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{ - Type: v1.PodScheduled, - Status: v1.ConditionFalse, - Reason: reason, - }) + sched.recordSchedulingFailure(assumed, err, "VolumeBindingFailed", err.Error()) return err } @@ -441,14 +434,8 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { // This relies on the fact that Error will check if the pod has been bound // to a node and if so will not add it back to the unscheduled pods queue // (otherwise this would cause an infinite loop). 
- sched.config.Error(assumed, err) - sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePod failed: %v", err) - sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{ - Type: v1.PodScheduled, - Status: v1.ConditionFalse, - Reason: "SchedulerError", - Message: err.Error(), - }) + sched.recordSchedulingFailure(assumed, err, SchedulerError, + fmt.Sprintf("AssumePod failed: %v", err)) return err } // if "assumed" is a nominated pod, we should remove it from internal cache @@ -480,13 +467,8 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error { if err := sched.config.SchedulerCache.ForgetPod(assumed); err != nil { klog.Errorf("scheduler cache ForgetPod failed: %v", err) } - sched.config.Error(assumed, err) - sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "Binding rejected: %v", err) - sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{ - Type: v1.PodScheduled, - Status: v1.ConditionFalse, - Reason: "BindingRejected", - }) + sched.recordSchedulingFailure(assumed, err, SchedulerError, + fmt.Sprintf("Binding rejected: %v", err)) return err } @@ -498,6 +480,12 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error { // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting. func (sched *Scheduler) scheduleOne() { + plugins := sched.config.PluginSet + // Remove all plugin context data at the beginning of a scheduling cycle. + if plugins.Data().Ctx != nil { + plugins.Data().Ctx.Reset() + } + pod := sched.config.NextPod() // pod could be nil when schedulerQueue is closed if pod == nil { @@ -554,6 +542,16 @@ func (sched *Scheduler) scheduleOne() { return } + // Run "reserve" plugins. 
+ for _, pl := range plugins.ReservePlugins() { + if err := pl.Reserve(plugins, assumedPod, suggestedHost); err != nil { + klog.Errorf("error while running %v reserve plugin for pod %v: %v", pl.Name(), assumedPod.Name, err) + sched.recordSchedulingFailure(assumedPod, err, SchedulerError, + fmt.Sprintf("reserve plugin %v failed", pl.Name())) + metrics.PodScheduleErrors.Inc() + return + } + } // assume modifies `assumedPod` by setting NodeName=suggestedHost err = sched.assume(assumedPod, suggestedHost) if err != nil { @@ -573,6 +571,30 @@ func (sched *Scheduler) scheduleOne() { } } + // Run "prebind" plugins. + for _, pl := range plugins.PrebindPlugins() { + approved, err := pl.Prebind(plugins, assumedPod, suggestedHost) + if err != nil { + approved = false + klog.Errorf("error while running %v prebind plugin for pod %v: %v", pl.Name(), assumedPod.Name, err) + metrics.PodScheduleErrors.Inc() + } + if !approved { + sched.Cache().ForgetPod(assumedPod) + var reason string + if err == nil { + msg := fmt.Sprintf("prebind plugin %v rejected pod %v.", pl.Name(), assumedPod.Name) + klog.V(4).Infof(msg) + err = errors.New(msg) + reason = v1.PodReasonUnschedulable + } else { + reason = SchedulerError + } + sched.recordSchedulingFailure(assumedPod, err, reason, err.Error()) + return + } + } + err := sched.bind(assumedPod, &v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID}, Target: v1.ObjectReference{ diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 4dac1a7bd2..0f2ea0b11f 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -295,6 +295,7 @@ func TestScheduler(t *testing.T) { NextPod: func() *v1.Pod { return item.sendPod }, + PluginSet: &EmptyPluginSet{}, Recorder: eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}), VolumeBinder: 
volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}), }, @@ -643,6 +644,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{}, algorithm.EmptyPriorityMetadataProducer, + &EmptyPluginSet{}, []algorithm.SchedulerExtender{}, nil, informerFactory.Core().V1().PersistentVolumeClaims().Lister(), @@ -672,6 +674,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern Recorder: &record.FakeRecorder{}, PodConditionUpdater: fakePodConditionUpdater{}, PodPreemptor: fakePodPreemptor{}, + PluginSet: &EmptyPluginSet{}, VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}), }, } @@ -694,6 +697,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{}, algorithm.EmptyPriorityMetadataProducer, + &EmptyPluginSet{}, []algorithm.SchedulerExtender{}, nil, informerFactory.Core().V1().PersistentVolumeClaims().Lister(), @@ -727,6 +731,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc PodConditionUpdater: fakePodConditionUpdater{}, PodPreemptor: fakePodPreemptor{}, StopEverything: stop, + PluginSet: &EmptyPluginSet{}, VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}), }, } diff --git a/pkg/scheduler/testutil.go b/pkg/scheduler/testutil.go index 0c101a5457..b495fdefa1 100644 --- a/pkg/scheduler/testutil.go +++ b/pkg/scheduler/testutil.go @@ -27,6 +27,7 @@ import ( schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" "k8s.io/kubernetes/pkg/scheduler/factory" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" + plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -89,3 +90,26 @@ func (fc *FakeConfigurator) CreateFromConfig(policy 
schedulerapi.Policy) (*facto func (fc *FakeConfigurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*factory.Config, error) { return fc.Config, nil } + +// EmptyPluginSet is the default plugin registrar used by the default scheduler. +type EmptyPluginSet struct{} + +var _ = plugins.PluginSet(EmptyPluginSet{}) + +// ReservePlugins returns a slice of default reserve plugins. +func (r EmptyPluginSet) ReservePlugins() []plugins.ReservePlugin { + return []plugins.ReservePlugin{} +} + +// PrebindPlugins returns a slice of default prebind plugins. +func (r EmptyPluginSet) PrebindPlugins() []plugins.PrebindPlugin { + return []plugins.PrebindPlugin{} +} + +// Data returns a pointer to PluginData. +func (r EmptyPluginSet) Data() *plugins.PluginData { + return &plugins.PluginData{ + Ctx: nil, + SchedulerCache: nil, + } +} diff --git a/test/integration/scheduler/BUILD b/test/integration/scheduler/BUILD index bce32d144b..f6124c23ca 100644 --- a/test/integration/scheduler/BUILD +++ b/test/integration/scheduler/BUILD @@ -12,6 +12,7 @@ go_test( srcs = [ "extender_test.go", "main_test.go", + "plugin_test.go", "predicates_test.go", "preemption_test.go", "priorities_test.go", @@ -37,6 +38,7 @@ go_test( "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/cache:go_default_library", "//pkg/scheduler/factory:go_default_library", + "//pkg/scheduler/plugins/v1alpha1:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/testing:go_default_library", "//plugin/pkg/admission/podtolerationrestriction:go_default_library", @@ -96,6 +98,7 @@ go_library( "//pkg/scheduler/algorithmprovider:go_default_library", "//pkg/scheduler/api:go_default_library", "//pkg/scheduler/factory:go_default_library", + "//pkg/scheduler/plugins/v1alpha1:go_default_library", "//pkg/util/taints:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", diff
--git a/test/integration/scheduler/plugin_test.go b/test/integration/scheduler/plugin_test.go new file mode 100644 index 0000000000..1fdf513d01 --- /dev/null +++ b/test/integration/scheduler/plugin_test.go @@ -0,0 +1,269 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "fmt" + "testing" + "time" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" +) + +// TesterPlugin is a test plugin that is executed at multiple extension points (reserve and prebind). +// This plugin is stateful. It counts how many times each extension point is invoked, +// and its fail*/reject* fields control the result it returns when it is executed. +type TesterPlugin struct { + numReserveCalled int + numPrebindCalled int + failReserve bool + failPrebind bool + rejectPrebind bool +} + +var _ = plugins.ReservePlugin(&TesterPlugin{}) +var _ = plugins.PrebindPlugin(&TesterPlugin{}) + +// Name returns the name of the plugin. +func (tp *TesterPlugin) Name() string { + return "tester-plugin" +} + +// Reserve is a test function that returns an error or nil, depending on the +// value of "failReserve". +func (tp *TesterPlugin) Reserve(ps plugins.PluginSet, pod *v1.Pod, nodeName string) error { + tp.numReserveCalled++ + if tp.failReserve { + return fmt.Errorf("injecting failure for pod %v", pod.Name) + } + return nil +} + +// Prebind is a test function that returns an error if "failPrebind" is set and false if "rejectPrebind" is set; otherwise it returns (true, nil).
+func (tp *TesterPlugin) Prebind(ps plugins.PluginSet, pod *v1.Pod, nodeName string) (bool, error) { + var err error = nil + tp.numPrebindCalled++ + if tp.failPrebind { + err = fmt.Errorf("injecting failure for pod %v", pod.Name) + } + if tp.rejectPrebind { + return false, err + } + return true, err +} + +// TestPluginSet is a plugin set used for testing purposes. +type TestPluginSet struct { + data *plugins.PluginData + reservePlugins []plugins.ReservePlugin + prebindPlugins []plugins.PrebindPlugin +} + +var _ = plugins.PluginSet(&TestPluginSet{}) + +// ReservePlugins returns a slice of default reserve plugins. +func (r *TestPluginSet) ReservePlugins() []plugins.ReservePlugin { + return r.reservePlugins +} + +// PrebindPlugins returns a slice of default prebind plugins. +func (r *TestPluginSet) PrebindPlugins() []plugins.PrebindPlugin { + return r.prebindPlugins +} + +// Data returns a pointer to PluginData. +func (r *TestPluginSet) Data() *plugins.PluginData { + return r.data +} + +// TestReservePlugin tests invocation of reserve plugins. +func TestReservePlugin(t *testing.T) { + // Create a plugin set for testing. Register only a reserve plugin. + testerPlugin := &TesterPlugin{} + testPluginSet := &TestPluginSet{ + data: &plugins.PluginData{ + Ctx: plugins.NewPluginContext(), + }, + reservePlugins: []plugins.ReservePlugin{testerPlugin}, + } + + // Create the master and the scheduler with the test plugin set. + context := initTestSchedulerWithOptions(t, + initTestMaster(t, "reserve-plugin", nil), + false, nil, testPluginSet, false, true, time.Second) + defer cleanupTest(t, context) + + cs := context.clientSet + // Add a few nodes. + _, err := createNodes(cs, "test-node", nil, 2) + if err != nil { + t.Fatalf("Cannot create nodes: %v", err) + } + + for _, fail := range []bool{false, true} { + testerPlugin.failReserve = fail + // Create a best effort pod. 
+ pod, err := createPausePod(cs, + initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name})) + if err != nil { + t.Errorf("Error while creating a test pod: %v", err) + } + + if fail { + if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil { + t.Errorf("Didn't expected the pod to be scheduled. error: %v", err) + } + } else { + if err = waitForPodToSchedule(cs, pod); err != nil { + t.Errorf("Expected the pod to be scheduled. error: %v", err) + } + } + + if testerPlugin.numReserveCalled == 0 { + t.Errorf("Expected the reserve plugin to be called.") + } + + cleanupPods(cs, t, []*v1.Pod{pod}) + } +} + +// TestPrebindPlugin tests invocation of prebind plugins. +func TestPrebindPlugin(t *testing.T) { + // Create a plugin set for testing. Register only a prebind plugin. + testerPlugin := &TesterPlugin{} + testPluginSet := &TestPluginSet{ + data: &plugins.PluginData{ + Ctx: plugins.NewPluginContext(), + }, + prebindPlugins: []plugins.PrebindPlugin{testerPlugin}, + } + + // Create the master and the scheduler with the test plugin set. + context := initTestSchedulerWithOptions(t, + initTestMaster(t, "prebind-plugin", nil), + false, nil, testPluginSet, false, true, time.Second) + defer cleanupTest(t, context) + + cs := context.clientSet + // Add a few nodes. + _, err := createNodes(cs, "test-node", nil, 2) + if err != nil { + t.Fatalf("Cannot create nodes: %v", err) + } + + tests := []struct { + fail bool + reject bool + }{ + { + fail: false, + reject: false, + }, + { + fail: true, + reject: false, + }, + { + fail: false, + reject: true, + }, + { + fail: true, + reject: true, + }, + } + + for i, test := range tests { + testerPlugin.failPrebind = test.fail + testerPlugin.rejectPrebind = test.reject + // Create a best effort pod. 
+ pod, err := createPausePod(cs, + initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name})) + if err != nil { + t.Errorf("Error while creating a test pod: %v", err) + } + + if test.fail { + if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil { + t.Errorf("test #%v: Expected a scheduling error, but didn't get it. error: %v", i, err) + } + } else { + if test.reject { + if err = waitForPodUnschedulable(cs, pod); err != nil { + t.Errorf("test #%v: Didn't expected the pod to be scheduled. error: %v", i, err) + } + } else { + if err = waitForPodToSchedule(cs, pod); err != nil { + t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err) + } + } + } + + if testerPlugin.numPrebindCalled == 0 { + t.Errorf("Expected the prebind plugin to be called.") + } + + cleanupPods(cs, t, []*v1.Pod{pod}) + } +} + +// TestContextCleanup tests that data inserted in the pluginContext is removed +// after a scheduling cycle is over. +func TestContextCleanup(t *testing.T) { + // Create a plugin set for testing. + testerPlugin := &TesterPlugin{} + testPluginSet := &TestPluginSet{ + data: &plugins.PluginData{ + Ctx: plugins.NewPluginContext(), + }, + reservePlugins: []plugins.ReservePlugin{testerPlugin}, + prebindPlugins: []plugins.PrebindPlugin{testerPlugin}, + } + + // Create the master and the scheduler with the test plugin set. + context := initTestSchedulerWithOptions(t, + initTestMaster(t, "plugin-context-cleanup", nil), + false, nil, testPluginSet, false, true, time.Second) + defer cleanupTest(t, context) + + cs := context.clientSet + // Add a few nodes. + _, err := createNodes(cs, "test-node", nil, 2) + if err != nil { + t.Fatalf("Cannot create nodes: %v", err) + } + + // Insert something in the plugin context. + testPluginSet.Data().Ctx.Write("test", "foo") + + // Create and schedule a best effort pod. 
+ pod, err := runPausePod(cs, + initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name})) + if err != nil { + t.Errorf("Error while creating or scheduling a test pod: %v", err) + } + + // Make sure the data inserted in the plugin context is removed. + _, err = testPluginSet.Data().Ctx.Read("test") + if err == nil || err.Error() != plugins.NotFound { + t.Errorf("Expected the plugin context to be cleaned up after a scheduling cycle. error: %v", err) + } + + cleanupPods(cs, t, []*v1.Pod{pod}) +} diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 71875405ee..f993d8342b 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -51,6 +51,7 @@ import ( _ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" "k8s.io/kubernetes/pkg/scheduler/factory" + plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" taintutils "k8s.io/kubernetes/pkg/util/taints" "k8s.io/kubernetes/test/integration/framework" imageutils "k8s.io/kubernetes/test/utils/image" @@ -148,7 +149,7 @@ func initTestScheduler( ) *TestContext { // Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority // feature gate is enabled at the same time. 
- return initTestSchedulerWithOptions(t, context, setPodInformer, policy, false, true, time.Second) + return initTestSchedulerWithOptions(t, context, setPodInformer, policy, nil, false, true, time.Second) } // initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default @@ -158,6 +159,7 @@ func initTestSchedulerWithOptions( context *TestContext, setPodInformer bool, policy *schedulerapi.Policy, + pluginSet plugins.PluginSet, disablePreemption bool, disableEquivalenceCache bool, resyncPeriod time.Duration, @@ -205,6 +207,11 @@ func initTestSchedulerWithOptions( controller.WaitForCacheSync("scheduler", context.schedulerConfig.StopEverything, podInformer.Informer().HasSynced) } + // Set pluginSet if provided. DefaultPluginSet is used if this is not specified. + if pluginSet != nil { + context.schedulerConfig.PluginSet = pluginSet + } + eventBroadcaster := record.NewBroadcaster() context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder( legacyscheme.Scheme, @@ -257,7 +264,7 @@ func initTest(t *testing.T, nsPrefix string) *TestContext { // configuration but with pod preemption disabled. func initTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext { return initTestSchedulerWithOptions( - t, initTestMaster(t, nsPrefix, nil), true, nil, true, true, time.Second) + t, initTestMaster(t, nsPrefix, nil), true, nil, nil, true, true, time.Second) } // cleanupTest deletes the scheduler and the test namespace. It should be called @@ -605,6 +612,25 @@ func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait. } } +// podSchedulingError returns a condition function that returns true if the given pod +// gets unschedulable status for reasons other than "Unschedulable". The scheduler +// records such reasons in case of error. 
+func podSchedulingError(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { + return func() (bool, error) { + pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{}) + if errors.IsNotFound(err) { + return false, nil + } + if err != nil { + // This could be a connection error so we want to retry. + return false, nil + } + _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled) + return cond != nil && cond.Status == v1.ConditionFalse && + cond.Reason != v1.PodReasonUnschedulable, nil + } +} + // waitForPodToScheduleWithTimeout waits for a pod to get scheduled and returns // an error if it does not scheduled within the given timeout. func waitForPodToScheduleWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error { diff --git a/test/integration/scheduler/volume_binding_test.go b/test/integration/scheduler/volume_binding_test.go index 114e23ae67..7fa38f7b75 100644 --- a/test/integration/scheduler/volume_binding_test.go +++ b/test/integration/scheduler/volume_binding_test.go @@ -890,7 +890,7 @@ func TestRescheduleProvisioning(t *testing.T) { } func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod time.Duration, provisionDelaySeconds int, disableEquivalenceCache bool) *testConfig { - context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), false, nil, false, disableEquivalenceCache, resyncPeriod) + context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), false, nil, nil, false, disableEquivalenceCache, resyncPeriod) clientset := context.clientSet ns := context.ns.Name