Merge pull request #70227 from bsalamat/reserve

Add plugin interfaces for reserve and prebind extension points of the scheduling framework
pull/564/head
Kubernetes Prow Robot 2018-12-01 05:25:12 -08:00 committed by GitHub
commit 2c322a2ff5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 949 additions and 48 deletions

View File

@ -20,6 +20,7 @@ go_library(
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/plugins/v1alpha1:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -96,6 +97,7 @@ filegroup(
"//pkg/scheduler/internal/cache:all-srcs",
"//pkg/scheduler/internal/queue:all-srcs",
"//pkg/scheduler/metrics:all-srcs",
"//pkg/scheduler/plugins:all-srcs",
"//pkg/scheduler/testing:all-srcs",
"//pkg/scheduler/util:all-srcs",
"//pkg/scheduler/volumebinder:all-srcs",

View File

@ -17,6 +17,7 @@ go_library(
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/plugins/v1alpha1:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
@ -51,6 +52,7 @@ go_test(
"//pkg/scheduler/core/equivalence:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/plugins/v1alpha1:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",

View File

@ -516,6 +516,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
algorithm.EmptyPredicateMetadataProducer,
test.prioritizers,
algorithm.EmptyPriorityMetadataProducer,
emptyPluginSet,
extenders,
nil,
schedulertesting.FakePersistentVolumeClaimLister{},

View File

@ -44,6 +44,7 @@ import (
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
@ -103,6 +104,7 @@ type genericScheduler struct {
priorityMetaProducer algorithm.PriorityMetadataProducer
predicateMetaProducer algorithm.PredicateMetadataProducer
prioritizers []algorithm.PriorityConfig
pluginSet pluginsv1alpha1.PluginSet
extenders []algorithm.SchedulerExtender
lastNodeIndex uint64
alwaysCheckAllPredicates bool
@ -1152,6 +1154,7 @@ func NewGenericScheduler(
predicateMetaProducer algorithm.PredicateMetadataProducer,
prioritizers []algorithm.PriorityConfig,
priorityMetaProducer algorithm.PriorityMetadataProducer,
pluginSet pluginsv1alpha1.PluginSet,
extenders []algorithm.SchedulerExtender,
volumeBinder *volumebinder.VolumeBinder,
pvcLister corelisters.PersistentVolumeClaimLister,
@ -1168,6 +1171,7 @@ func NewGenericScheduler(
predicateMetaProducer: predicateMetaProducer,
prioritizers: prioritizers,
priorityMetaProducer: priorityMetaProducer,
pluginSet: pluginSet,
extenders: extenders,
cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo),
volumeBinder: volumeBinder,

View File

@ -43,6 +43,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
)
@ -135,6 +136,28 @@ func getNodeReducePriority(pod *v1.Pod, meta interface{}, nodeNameToInfo map[str
return nil
}
// EmptyPluginSet is a test plugin set used by the default scheduler.
type EmptyPluginSet struct{}
var _ plugins.PluginSet = EmptyPluginSet{}
// ReservePlugins returns a slice of default reserve plugins.
func (r EmptyPluginSet) ReservePlugins() []plugins.ReservePlugin {
return []plugins.ReservePlugin{}
}
// PrebindPlugins returns a slice of default prebind plugins.
func (r EmptyPluginSet) PrebindPlugins() []plugins.PrebindPlugin {
return []plugins.PrebindPlugin{}
}
// Data returns a pointer to PluginData.
func (r EmptyPluginSet) Data() *plugins.PluginData {
return &plugins.PluginData{}
}
var emptyPluginSet = &EmptyPluginSet{}
func makeNodeList(nodeNames []string) []*v1.Node {
result := make([]*v1.Node, 0, len(nodeNames))
for _, nodeName := range nodeNames {
@ -454,6 +477,7 @@ func TestGenericScheduler(t *testing.T) {
algorithm.EmptyPredicateMetadataProducer,
test.prioritizers,
algorithm.EmptyPriorityMetadataProducer,
emptyPluginSet,
[]algorithm.SchedulerExtender{},
nil,
pvcLister,
@ -490,6 +514,7 @@ func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Nod
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
algorithm.EmptyPriorityMetadataProducer,
emptyPluginSet,
nil, nil, nil, nil, false, false,
schedulerapi.DefaultPercentageOfNodesToScore)
cache.UpdateNodeNameToInfoMap(s.(*genericScheduler).cachedNodeInfoMap)
@ -1416,6 +1441,7 @@ func TestPreempt(t *testing.T) {
algorithm.EmptyPredicateMetadataProducer,
[]algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}},
algorithm.EmptyPriorityMetadataProducer,
emptyPluginSet,
extenders,
nil,
schedulertesting.FakePersistentVolumeClaimLister{},
@ -1543,6 +1569,7 @@ func TestCacheInvalidationRace(t *testing.T) {
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
algorithm.EmptyPriorityMetadataProducer,
emptyPluginSet,
nil, nil, pvcLister, pdbLister,
true, false,
schedulerapi.DefaultPercentageOfNodesToScore)
@ -1626,6 +1653,7 @@ func TestCacheInvalidationRace2(t *testing.T) {
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
algorithm.EmptyPriorityMetadataProducer,
emptyPluginSet,
nil, nil, pvcLister, pdbLister, true, false,
schedulerapi.DefaultPercentageOfNodesToScore)

View File

@ -25,6 +25,8 @@ go_library(
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/debugger:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/plugins:go_default_library",
"//pkg/scheduler/plugins/v1alpha1:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@ -63,6 +63,8 @@ import (
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/plugins"
pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
@ -109,6 +111,8 @@ type Config struct {
PodConditionUpdater PodConditionUpdater
// PodPreemptor is used to evict pods and update pod annotations.
PodPreemptor PodPreemptor
// PlugingSet has a set of plugins and data used to run them.
PluginSet pluginsv1alpha1.PluginSet
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
@ -202,6 +206,8 @@ type configFactory struct {
pdbLister policylisters.PodDisruptionBudgetLister
// a means to list all StorageClasses
storageClassLister storagelisters.StorageClassLister
// pluginRunner has a set of plugins and the context used for running them.
pluginSet pluginsv1alpha1.PluginSet
// Close this to stop all reflectors
StopEverything <-chan struct{}
@ -1225,6 +1231,9 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
return nil, err
}
// TODO(bsalamat): the default registrar should be able to process config files.
c.pluginSet = plugins.NewDefaultPluginSet(pluginsv1alpha1.NewPluginContext(), &c.schedulerCache)
// Init equivalence class cache
if c.enableEquivalenceClassCache {
c.equivalencePodCache = equivalence.NewCache(predicates.Ordering())
@ -1239,6 +1248,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
predicateMetaProducer,
priorityConfigs,
priorityMetaProducer,
c.pluginSet,
extenders,
c.volumeBinder,
c.pVCLister,
@ -1258,6 +1268,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
GetBinder: c.getBinderFunc(extenders),
PodConditionUpdater: &podConditionUpdater{c.client},
PodPreemptor: &podPreemptor{c.client},
PluginSet: c.pluginSet,
WaitForCacheSync: func() bool {
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
},

View File

@ -0,0 +1,30 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["registrar.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/plugins",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/plugins/v1alpha1:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/scheduler/plugins/examples:all-srcs",
"//pkg/scheduler/plugins/v1alpha1:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,31 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"multipoint.go",
"prebind.go",
"stateful.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/plugins/examples",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/plugins/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,62 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package examples
import (
"fmt"
"k8s.io/api/core/v1"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
)
// MultipointCommunicatingPlugin is an example of a plugin that implements two
// extension points. It communicates through pluginContext with another function.
type MultipointCommunicatingPlugin struct{}
var _ = plugins.ReservePlugin(MultipointCommunicatingPlugin{})
// Name returns name of the plugin. It is used in logs, etc.
func (mc MultipointCommunicatingPlugin) Name() string {
return "multipoint-communicating-plugin"
}
// Reserve is the functions invoked by the framework at "reserve" extension point.
func (mc MultipointCommunicatingPlugin) Reserve(ps plugins.PluginSet, pod *v1.Pod, nodeName string) error {
if pod == nil {
return fmt.Errorf("pod cannot be nil")
}
if pod.Name == "my-test-pod" {
ps.Data().Ctx.SyncWrite(plugins.ContextKey(pod.Name), "never bind")
}
return nil
}
// Prebind is the functions invoked by the framework at "prebind" extension point.
func (mc MultipointCommunicatingPlugin) Prebind(ps plugins.PluginSet, pod *v1.Pod, nodeName string) (bool, error) {
if pod == nil {
return false, fmt.Errorf("pod cannot be nil")
}
if v, e := ps.Data().Ctx.SyncRead(plugins.ContextKey(pod.Name)); e == nil && v == "never bind" {
return false, nil
}
return true, nil
}
// NewMultipointCommunicatingPlugin initializes a new plugin and returns it.
func NewMultipointCommunicatingPlugin() *MultipointCommunicatingPlugin {
return &MultipointCommunicatingPlugin{}
}

View File

@ -0,0 +1,48 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package examples
import (
"fmt"
"k8s.io/api/core/v1"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
)
// StatelessPrebindExample is an example of a simple plugin that has no state
// and implements only one hook for prebind.
type StatelessPrebindExample struct{}
var _ = plugins.PrebindPlugin(StatelessPrebindExample{})
// Name returns name of the plugin. It is used in logs, etc.
func (sr StatelessPrebindExample) Name() string {
return "stateless-prebind-plugin-example"
}
// Prebind is the functions invoked by the framework at "prebind" extension point.
func (sr StatelessPrebindExample) Prebind(ps plugins.PluginSet, pod *v1.Pod, nodeName string) (bool, error) {
if pod == nil {
return false, fmt.Errorf("pod cannot be nil")
}
return true, nil
}
// NewStatelessPrebindExample initializes a new plugin and returns it.
func NewStatelessPrebindExample() *StatelessPrebindExample {
return &StatelessPrebindExample{}
}

View File

@ -0,0 +1,68 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package examples
import (
"fmt"
"k8s.io/klog"
"k8s.io/api/core/v1"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
)
// StatefulMultipointExample is an example plugin that is executed at multiple extension points.
// This plugin is stateful. It receives arguments at initialization (NewMultipointPlugin)
// and changes its state when it is executed.
type StatefulMultipointExample struct {
mpState map[int]string
numRuns int
}
var _ = plugins.ReservePlugin(&StatefulMultipointExample{})
var _ = plugins.PrebindPlugin(&StatefulMultipointExample{})
// Name returns name of the plugin. It is used in logs, etc.
func (mp *StatefulMultipointExample) Name() string {
return "multipoint-plugin-example"
}
// Reserve is the functions invoked by the framework at "reserve" extension point.
func (mp *StatefulMultipointExample) Reserve(ps plugins.PluginSet, pod *v1.Pod, nodeName string) error {
mp.numRuns++
return nil
}
// Prebind is the functions invoked by the framework at "prebind" extension point.
func (mp *StatefulMultipointExample) Prebind(ps plugins.PluginSet, pod *v1.Pod, nodeName string) (bool, error) {
mp.numRuns++
if pod == nil {
return false, fmt.Errorf("pod must not be nil")
}
return true, nil
}
// NewStatefulMultipointExample initializes a new plugin and returns it.
func NewStatefulMultipointExample(initState ...interface{}) *StatefulMultipointExample {
if len(initState) == 0 {
klog.Error("StatefulMultipointExample needs exactly one argument for initialization")
return nil
}
mp := StatefulMultipointExample{
mpState: initState[0].(map[int]string),
}
return &mp
}

View File

@ -0,0 +1,77 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugins
import (
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
)
// DefaultPluginSet is the default plugin registrar used by the default scheduler.
type DefaultPluginSet struct {
data *plugins.PluginData
reservePlugins []plugins.ReservePlugin
prebindPlugins []plugins.PrebindPlugin
}
var _ = plugins.PluginSet(&DefaultPluginSet{})
// ReservePlugins returns a slice of default reserve plugins.
func (r *DefaultPluginSet) ReservePlugins() []plugins.ReservePlugin {
return r.reservePlugins
}
// PrebindPlugins returns a slice of default prebind plugins.
func (r *DefaultPluginSet) PrebindPlugins() []plugins.PrebindPlugin {
return r.prebindPlugins
}
// Data returns a pointer to PluginData.
func (r *DefaultPluginSet) Data() *plugins.PluginData {
return r.data
}
// NewDefaultPluginSet initializes default plugin set and returns its pointer.
func NewDefaultPluginSet(ctx *plugins.PluginContext, schedulerCache *cache.Cache) *DefaultPluginSet {
defaultRegistrar := DefaultPluginSet{
data: &plugins.PluginData{
Ctx: ctx,
SchedulerCache: schedulerCache,
},
}
defaultRegistrar.registerReservePlugins()
defaultRegistrar.registerPrebindPlugins()
return &defaultRegistrar
}
func (r DefaultPluginSet) registerReservePlugins() {
r.reservePlugins = []plugins.ReservePlugin{
// Init functions of all reserve plugins go here. They are called in the
// same order that they are registered.
// Example:
// examples.NewStatefulMultipointExample(map[int]string{1: "test1", 2: "test2"}),
}
}
func (r DefaultPluginSet) registerPrebindPlugins() {
r.prebindPlugins = []plugins.PrebindPlugin{
// Init functions of all prebind plugins go here. They are called in the
// same order that they are registered.
// Example:
// examples.NewStatelessPrebindExample(),
}
}

View File

@ -0,0 +1,29 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"context.go",
"interface.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/internal/cache:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,94 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
"errors"
"sync"
)
const (
// NotFound is the not found error message.
NotFound = "not found"
)
// ContextData is a generic type for arbitrary data stored in PluginContext.
type ContextData interface{}
// ContextKey is the type of keys stored in PluginContext.
type ContextKey string
// PluginContext provides a mechanism for plugins to store and retrieve arbitrary data.
// ContextData stored by one plugin can be read, altered, or deleted by another plugin.
// PluginContext does not provide any data protection, as all plugins are assumed to be
// trusted.
type PluginContext struct {
Mx sync.RWMutex
storage map[ContextKey]ContextData
}
// NewPluginContext initializes a new PluginContext and returns its pointer.
func NewPluginContext() *PluginContext {
return &PluginContext{
storage: make(map[ContextKey]ContextData),
}
}
// Read retrieves data with the given "key" from PluginContext. If the key is not
// present an error is returned.
func (c *PluginContext) Read(key ContextKey) (ContextData, error) {
if v, ok := c.storage[key]; ok {
return v, nil
}
return nil, errors.New(NotFound)
}
// SyncRead is the thread safe version of Read(...).
func (c *PluginContext) SyncRead(key ContextKey) (ContextData, error) {
c.Mx.RLock()
defer c.Mx.RUnlock()
return c.Read(key)
}
// Write stores the given "val" in PluginContext with the given "key".
func (c *PluginContext) Write(key ContextKey, val ContextData) {
c.storage[key] = val
}
// SyncWrite is the thread safe version of Write(...).
func (c *PluginContext) SyncWrite(key ContextKey, val ContextData) {
c.Mx.Lock()
defer c.Mx.Unlock()
c.Write(key, val)
}
// Delete deletes data with the given key from PluginContext.
func (c *PluginContext) Delete(key ContextKey) {
delete(c.storage, key)
}
// SyncDelete is the thread safe version of Write(...).
func (c *PluginContext) SyncDelete(key ContextKey) {
c.Mx.Lock()
defer c.Mx.Unlock()
c.Delete(key)
}
// Reset removes all the information in the PluginContext.
func (c *PluginContext) Reset() {
c.storage = make(map[ContextKey]ContextData)
}

View File

@ -0,0 +1,63 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This file defines the scheduling framework plugin interfaces.
package v1alpha1
import (
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
)
// PluginData carries information that plugins may need.
type PluginData struct {
Ctx *PluginContext
SchedulerCache *cache.Cache
// We may want to add the scheduling queue here too.
}
// Plugin is the parent type for all the scheduling framework plugins.
type Plugin interface {
Name() string
}
// ReservePlugin is an interface for Reserve plugins. These plugins are called
// at the reservation point, AKA "assume". These are meant to updated the state
// of the plugin. They do not return any value (other than error).
type ReservePlugin interface {
Plugin
// Reserve is called by the scheduling framework when the scheduler cache is
// updated.
Reserve(ps PluginSet, p *v1.Pod, nodeName string) error
}
// PrebindPlugin is an interface that must be implemented by "prebind" plugins.
// These plugins are called before a pod being scheduled
type PrebindPlugin interface {
Plugin
// Prebind is called before binding a pod. All prebind plugins must return
// or the pod will not be sent for binding.
Prebind(ps PluginSet, p *v1.Pod, nodeName string) (bool, error)
}
// PluginSet registers plugins used by the scheduling framework.
// The plugins registered are called at specified points in an scheduling cycle.
type PluginSet interface {
Data() *PluginData
ReservePlugins() []ReservePlugin
PrebindPlugins() []PrebindPlugin
}

View File

@ -17,11 +17,14 @@ limitations under the License.
package scheduler
import (
"errors"
"fmt"
"io/ioutil"
"os"
"time"
"k8s.io/klog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -44,13 +47,13 @@ import (
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/klog"
)
const (
// BindTimeoutSeconds defines the default bind timeout
BindTimeoutSeconds = 100
// SchedulerError is the reason recorded for events when an error occurs during scheduling a pod.
SchedulerError = "SchedulerError"
)
// Scheduler watches for new unscheduled pods. It attempts to find
@ -286,19 +289,26 @@ func (sched *Scheduler) Config() *factory.Config {
return sched.config
}
// recordFailedSchedulingEvent records an event for the pod that indicates the
// pod has failed to schedule.
// NOTE: This function modifies "pod". "pod" should be copied before being passed.
func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string) {
sched.config.Error(pod, err)
sched.config.Recorder.Event(pod, v1.EventTypeWarning, "FailedScheduling", message)
sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: reason,
Message: err.Error(),
})
}
// schedule implements the scheduling algorithm and returns the suggested host.
func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
if err != nil {
pod = pod.DeepCopy()
sched.config.Error(pod, err)
sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "%v", err)
sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
Message: err.Error(),
})
sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error())
return "", err
}
return host, err
@ -362,14 +372,8 @@ func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bo
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
allBound, err = sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
if err != nil {
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePodVolumes failed: %v", err)
sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: "SchedulerError",
Message: err.Error(),
})
sched.recordSchedulingFailure(assumed, err, SchedulerError,
fmt.Sprintf("AssumePodVolumes failed: %v", err))
}
// Invalidate ecache because assumed volumes could have affected the cached
// pvs for other pods
@ -387,9 +391,6 @@ func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bo
// If binding errors, times out or gets undone, then an error will be returned to
// retry scheduling.
func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error {
var reason string
var eventType string
klog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed)
if err != nil {
@ -404,15 +405,7 @@ func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error {
// stale pod binding cache.
sched.config.VolumeBinder.DeletePodBindings(assumed)
reason = "VolumeBindingFailed"
eventType = v1.EventTypeWarning
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, eventType, "FailedScheduling", "%v", err)
sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: reason,
})
sched.recordSchedulingFailure(assumed, err, "VolumeBindingFailed", err.Error())
return err
}
@ -441,14 +434,8 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
// This relies on the fact that Error will check if the pod has been bound
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePod failed: %v", err)
sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: "SchedulerError",
Message: err.Error(),
})
sched.recordSchedulingFailure(assumed, err, SchedulerError,
fmt.Sprintf("AssumePod failed: %v", err))
return err
}
// if "assumed" is a nominated pod, we should remove it from internal cache
@ -480,13 +467,8 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
if err := sched.config.SchedulerCache.ForgetPod(assumed); err != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", err)
}
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "Binding rejected: %v", err)
sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: "BindingRejected",
})
sched.recordSchedulingFailure(assumed, err, SchedulerError,
fmt.Sprintf("Binding rejected: %v", err))
return err
}
@ -498,6 +480,12 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
plugins := sched.config.PluginSet
// Remove all plugin context data at the beginning of a scheduling cycle.
if plugins.Data().Ctx != nil {
plugins.Data().Ctx.Reset()
}
pod := sched.config.NextPod()
// pod could be nil when schedulerQueue is closed
if pod == nil {
@ -554,6 +542,16 @@ func (sched *Scheduler) scheduleOne() {
return
}
// Run "reserve" plugins.
for _, pl := range plugins.ReservePlugins() {
if err := pl.Reserve(plugins, assumedPod, suggestedHost); err != nil {
klog.Errorf("error while running %v reserve plugin for pod %v: %v", pl.Name(), assumedPod.Name, err)
sched.recordSchedulingFailure(assumedPod, err, SchedulerError,
fmt.Sprintf("reserve plugin %v failed", pl.Name()))
metrics.PodScheduleErrors.Inc()
return
}
}
// assume modifies `assumedPod` by setting NodeName=suggestedHost
err = sched.assume(assumedPod, suggestedHost)
if err != nil {
@ -573,6 +571,30 @@ func (sched *Scheduler) scheduleOne() {
}
}
// Run "prebind" plugins.
for _, pl := range plugins.PrebindPlugins() {
approved, err := pl.Prebind(plugins, assumedPod, suggestedHost)
if err != nil {
approved = false
klog.Errorf("error while running %v prebind plugin for pod %v: %v", pl.Name(), assumedPod.Name, err)
metrics.PodScheduleErrors.Inc()
}
if !approved {
sched.Cache().ForgetPod(assumedPod)
var reason string
if err == nil {
msg := fmt.Sprintf("prebind plugin %v rejected pod %v.", pl.Name(), assumedPod.Name)
klog.V(4).Infof(msg)
err = errors.New(msg)
reason = v1.PodReasonUnschedulable
} else {
reason = SchedulerError
}
sched.recordSchedulingFailure(assumedPod, err, reason, err.Error())
return
}
}
err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
Target: v1.ObjectReference{

View File

@ -295,6 +295,7 @@ func TestScheduler(t *testing.T) {
NextPod: func() *v1.Pod {
return item.sendPod
},
PluginSet: &EmptyPluginSet{},
Recorder: eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}),
VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
},
@ -643,6 +644,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
algorithm.EmptyPredicateMetadataProducer,
[]algorithm.PriorityConfig{},
algorithm.EmptyPriorityMetadataProducer,
&EmptyPluginSet{},
[]algorithm.SchedulerExtender{},
nil,
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
@ -672,6 +674,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
Recorder: &record.FakeRecorder{},
PodConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
PluginSet: &EmptyPluginSet{},
VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
},
}
@ -694,6 +697,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
algorithm.EmptyPredicateMetadataProducer,
[]algorithm.PriorityConfig{},
algorithm.EmptyPriorityMetadataProducer,
&EmptyPluginSet{},
[]algorithm.SchedulerExtender{},
nil,
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
@ -727,6 +731,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
PodConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
StopEverything: stop,
PluginSet: &EmptyPluginSet{},
VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
},
}

View File

@ -27,6 +27,7 @@ import (
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/factory"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/util"
)
@ -89,3 +90,26 @@ func (fc *FakeConfigurator) CreateFromConfig(policy schedulerapi.Policy) (*facto
func (fc *FakeConfigurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*factory.Config, error) {
return fc.Config, nil
}
// EmptyPluginSet is the default plugin restirar used by the default scheduler.
type EmptyPluginSet struct{}
var _ = plugins.PluginSet(EmptyPluginSet{})
// ReservePlugins returns a slice of default reserve plugins.
func (r EmptyPluginSet) ReservePlugins() []plugins.ReservePlugin {
return []plugins.ReservePlugin{}
}
// PrebindPlugins returns a slice of default prebind plugins.
func (r EmptyPluginSet) PrebindPlugins() []plugins.PrebindPlugin {
return []plugins.PrebindPlugin{}
}
// Data returns a pointer to PluginData.
func (r EmptyPluginSet) Data() *plugins.PluginData {
return &plugins.PluginData{
Ctx: nil,
SchedulerCache: nil,
}
}

View File

@ -12,6 +12,7 @@ go_test(
srcs = [
"extender_test.go",
"main_test.go",
"plugin_test.go",
"predicates_test.go",
"preemption_test.go",
"priorities_test.go",
@ -37,6 +38,7 @@ go_test(
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/plugins/v1alpha1:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/testing:go_default_library",
"//plugin/pkg/admission/podtolerationrestriction:go_default_library",
@ -96,6 +98,7 @@ go_library(
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/plugins/v1alpha1:go_default_library",
"//pkg/util/taints:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",

View File

@ -0,0 +1,269 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"fmt"
"testing"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
)
// StatefulMultipointExample is an example plugin that is executed at multiple extension points.
// This plugin is stateful. It receives arguments at initialization (NewMultipointPlugin)
// and changes its state when it is executed.
type TesterPlugin struct {
numReserveCalled int
numPrebindCalled int
failReserve bool
failPrebind bool
rejectPrebind bool
}
var _ = plugins.ReservePlugin(&TesterPlugin{})
var _ = plugins.PrebindPlugin(&TesterPlugin{})
// Name returns name of the plugin.
func (tp *TesterPlugin) Name() string {
return "tester-plugin"
}
// Reserve is a test function that returns an error or nil, depending on the
// value of "failReserve".
func (tp *TesterPlugin) Reserve(ps plugins.PluginSet, pod *v1.Pod, nodeName string) error {
tp.numReserveCalled++
if tp.failReserve {
return fmt.Errorf("injecting failure for pod %v", pod.Name)
}
return nil
}
// Prebind is a test function that returns (true, nil) or errors for testing.
func (tp *TesterPlugin) Prebind(ps plugins.PluginSet, pod *v1.Pod, nodeName string) (bool, error) {
var err error = nil
tp.numPrebindCalled++
if tp.failPrebind {
err = fmt.Errorf("injecting failure for pod %v", pod.Name)
}
if tp.rejectPrebind {
return false, err
}
return true, err
}
// TestPluginSet is a plugin set used for testing purposes.
type TestPluginSet struct {
data *plugins.PluginData
reservePlugins []plugins.ReservePlugin
prebindPlugins []plugins.PrebindPlugin
}
var _ = plugins.PluginSet(&TestPluginSet{})
// ReservePlugins returns a slice of default reserve plugins.
func (r *TestPluginSet) ReservePlugins() []plugins.ReservePlugin {
return r.reservePlugins
}
// PrebindPlugins returns a slice of default prebind plugins.
func (r *TestPluginSet) PrebindPlugins() []plugins.PrebindPlugin {
return r.prebindPlugins
}
// Data returns a pointer to PluginData.
func (r *TestPluginSet) Data() *plugins.PluginData {
return r.data
}
// TestReservePlugin tests invocation of reserve plugins.
func TestReservePlugin(t *testing.T) {
// Create a plugin set for testing. Register only a reserve plugin.
testerPlugin := &TesterPlugin{}
testPluginSet := &TestPluginSet{
data: &plugins.PluginData{
Ctx: plugins.NewPluginContext(),
},
reservePlugins: []plugins.ReservePlugin{testerPlugin},
}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "reserve-plugin", nil),
false, nil, testPluginSet, false, true, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
// Add a few nodes.
_, err := createNodes(cs, "test-node", nil, 2)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}
for _, fail := range []bool{false, true} {
testerPlugin.failReserve = fail
// Create a best effort pod.
pod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
if fail {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil {
t.Errorf("Didn't expected the pod to be scheduled. error: %v", err)
}
} else {
if err = waitForPodToSchedule(cs, pod); err != nil {
t.Errorf("Expected the pod to be scheduled. error: %v", err)
}
}
if testerPlugin.numReserveCalled == 0 {
t.Errorf("Expected the reserve plugin to be called.")
}
cleanupPods(cs, t, []*v1.Pod{pod})
}
}
// TestPrebindPlugin tests invocation of prebind plugins.
func TestPrebindPlugin(t *testing.T) {
// Create a plugin set for testing. Register only a prebind plugin.
testerPlugin := &TesterPlugin{}
testPluginSet := &TestPluginSet{
data: &plugins.PluginData{
Ctx: plugins.NewPluginContext(),
},
prebindPlugins: []plugins.PrebindPlugin{testerPlugin},
}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "prebind-plugin", nil),
false, nil, testPluginSet, false, true, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
// Add a few nodes.
_, err := createNodes(cs, "test-node", nil, 2)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}
tests := []struct {
fail bool
reject bool
}{
{
fail: false,
reject: false,
},
{
fail: true,
reject: false,
},
{
fail: false,
reject: true,
},
{
fail: true,
reject: true,
},
}
for i, test := range tests {
testerPlugin.failPrebind = test.fail
testerPlugin.rejectPrebind = test.reject
// Create a best effort pod.
pod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
if test.fail {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil {
t.Errorf("test #%v: Expected a scheduling error, but didn't get it. error: %v", i, err)
}
} else {
if test.reject {
if err = waitForPodUnschedulable(cs, pod); err != nil {
t.Errorf("test #%v: Didn't expected the pod to be scheduled. error: %v", i, err)
}
} else {
if err = waitForPodToSchedule(cs, pod); err != nil {
t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err)
}
}
}
if testerPlugin.numPrebindCalled == 0 {
t.Errorf("Expected the prebind plugin to be called.")
}
cleanupPods(cs, t, []*v1.Pod{pod})
}
}
// TestContextCleanup tests that data inserted in the pluginContext is removed
// after a scheduling cycle is over.
func TestContextCleanup(t *testing.T) {
// Create a plugin set for testing.
testerPlugin := &TesterPlugin{}
testPluginSet := &TestPluginSet{
data: &plugins.PluginData{
Ctx: plugins.NewPluginContext(),
},
reservePlugins: []plugins.ReservePlugin{testerPlugin},
prebindPlugins: []plugins.PrebindPlugin{testerPlugin},
}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "plugin-context-cleanup", nil),
false, nil, testPluginSet, false, true, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
// Add a few nodes.
_, err := createNodes(cs, "test-node", nil, 2)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}
// Insert something in the plugin context.
testPluginSet.Data().Ctx.Write("test", "foo")
// Create and schedule a best effort pod.
pod, err := runPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating or scheduling a test pod: %v", err)
}
// Make sure the data inserted in the plugin context is removed.
_, err = testPluginSet.Data().Ctx.Read("test")
if err == nil || err.Error() != plugins.NotFound {
t.Errorf("Expected the plugin context to be cleaned up after a scheduling cycle. error: %v", err)
}
cleanupPods(cs, t, []*v1.Pod{pod})
}

View File

@ -51,6 +51,7 @@ import (
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/factory"
plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
taintutils "k8s.io/kubernetes/pkg/util/taints"
"k8s.io/kubernetes/test/integration/framework"
imageutils "k8s.io/kubernetes/test/utils/image"
@ -148,7 +149,7 @@ func initTestScheduler(
) *TestContext {
// Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority
// feature gate is enabled at the same time.
return initTestSchedulerWithOptions(t, context, setPodInformer, policy, false, true, time.Second)
return initTestSchedulerWithOptions(t, context, setPodInformer, policy, nil, false, true, time.Second)
}
// initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
@ -158,6 +159,7 @@ func initTestSchedulerWithOptions(
context *TestContext,
setPodInformer bool,
policy *schedulerapi.Policy,
pluginSet plugins.PluginSet,
disablePreemption bool,
disableEquivalenceCache bool,
resyncPeriod time.Duration,
@ -205,6 +207,11 @@ func initTestSchedulerWithOptions(
controller.WaitForCacheSync("scheduler", context.schedulerConfig.StopEverything, podInformer.Informer().HasSynced)
}
// Set pluginSet if provided. DefaultPluginSet is used if this is not specified.
if pluginSet != nil {
context.schedulerConfig.PluginSet = pluginSet
}
eventBroadcaster := record.NewBroadcaster()
context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder(
legacyscheme.Scheme,
@ -257,7 +264,7 @@ func initTest(t *testing.T, nsPrefix string) *TestContext {
// configuration but with pod preemption disabled.
func initTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
return initTestSchedulerWithOptions(
t, initTestMaster(t, nsPrefix, nil), true, nil, true, true, time.Second)
t, initTestMaster(t, nsPrefix, nil), true, nil, nil, true, true, time.Second)
}
// cleanupTest deletes the scheduler and the test namespace. It should be called
@ -605,6 +612,25 @@ func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.
}
}
// podSchedulingError returns a condition function that returns true if the given pod
// gets unschedulable status for reasons other than "Unschedulable". The scheduler
// records such reasons in case of error.
func podSchedulingError(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
if errors.IsNotFound(err) {
return false, nil
}
if err != nil {
// This could be a connection error so we want to retry.
return false, nil
}
_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
return cond != nil && cond.Status == v1.ConditionFalse &&
cond.Reason != v1.PodReasonUnschedulable, nil
}
}
// waitForPodToScheduleWithTimeout waits for a pod to get scheduled and returns
// an error if it does not scheduled within the given timeout.
func waitForPodToScheduleWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {

View File

@ -890,7 +890,7 @@ func TestRescheduleProvisioning(t *testing.T) {
}
func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod time.Duration, provisionDelaySeconds int, disableEquivalenceCache bool) *testConfig {
context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), false, nil, false, disableEquivalenceCache, resyncPeriod)
context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), false, nil, nil, false, disableEquivalenceCache, resyncPeriod)
clientset := context.clientSet
ns := context.ns.Name