Merge pull request #77559 from ahg-g/permit-extension-point

Implement the permit extension point in scheduler.
k3s-v1.15.3
Kubernetes Prow Robot 2019-05-13 16:59:08 -07:00 committed by GitHub
commit c54b664b1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 496 additions and 8 deletions

View File

@ -7,6 +7,7 @@ go_library(
"framework.go",
"interface.go",
"registry.go",
"waiting_pods_map.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1",
visibility = ["//visibility:public"],
@ -14,6 +15,7 @@ go_library(
"//pkg/scheduler/internal/cache:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -18,9 +18,11 @@ package v1alpha1
import (
"fmt"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
)
@ -30,12 +32,19 @@ import (
type framework struct {
registry Registry
nodeInfoSnapshot *cache.NodeInfoSnapshot
waitingPods *waitingPodsMap
plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance.
reservePlugins []ReservePlugin
prebindPlugins []PrebindPlugin
unreservePlugins []UnreservePlugin
permitPlugins []PermitPlugin
}
const (
// Specifies the maximum timeout a permit plugin can return.
maxTimeout time.Duration = 15 * time.Minute
)
var _ = Framework(&framework{})
// NewFramework initializes plugins given the configuration and the registry.
@ -44,6 +53,7 @@ func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) {
registry: r,
nodeInfoSnapshot: cache.NewNodeInfoSnapshot(),
plugins: make(map[string]Plugin),
waitingPods: newWaitingPodsMap(),
}
// TODO: The framework needs to read the scheduler config and initialize only
@ -68,6 +78,9 @@ func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) {
if up, ok := p.(UnreservePlugin); ok {
f.unreservePlugins = append(f.unreservePlugins, up)
}
if pr, ok := p.(PermitPlugin); ok {
f.permitPlugins = append(f.permitPlugins, pr)
}
}
return f, nil
}
@ -117,6 +130,69 @@ func (f *framework) RunUnreservePlugins(
}
}
// RunPermitPlugins runs the set of configured permit plugins. If any of these
// plugins returns a status other than "Success" or "Wait", it does not continue
// running the remaining plugins and returns an error. Otherwise, if any of the
// plugins returns "Wait", then this function will block for the timeout period
// returned by the plugin, if the time expires, then it will return an error.
// Note that if multiple plugins asked to wait, then we wait for the minimum
// timeout duration.
func (f *framework) RunPermitPlugins(
pc *PluginContext, pod *v1.Pod, nodeName string) *Status {
timeout := maxTimeout
statusCode := Success
for _, pl := range f.permitPlugins {
status, d := pl.Permit(pc, pod, nodeName)
if !status.IsSuccess() {
if status.Code() == Unschedulable {
msg := fmt.Sprintf("rejected by %v at permit: %v", pl.Name(), status.Message())
klog.V(4).Infof(msg)
return NewStatus(status.Code(), msg)
}
if status.Code() == Wait {
// Use the minimum timeout duration.
if timeout > d {
timeout = d
}
statusCode = Wait
} else {
msg := fmt.Sprintf("error while running %v permit plugin for pod %v: %v", pl.Name(), pod.Name, status.Message())
klog.Error(msg)
return NewStatus(Error, msg)
}
}
}
// We now wait for the minimum duration if at least one plugin asked to
// wait (and no plugin rejected the pod)
if statusCode == Wait {
w := newWaitingPod(pod)
f.waitingPods.add(w)
defer f.waitingPods.remove(pod.UID)
timer := time.NewTimer(timeout)
klog.V(4).Infof("waiting for %v for pod %v at permit", timeout, pod.Name)
select {
case <-timer.C:
msg := fmt.Sprintf("pod %v rejected due to timeout after waiting %v at permit", pod.Name, timeout)
klog.V(4).Infof(msg)
return NewStatus(Unschedulable, msg)
case s := <-w.s:
if !s.IsSuccess() {
if s.Code() == Unschedulable {
msg := fmt.Sprintf("rejected while waiting at permit: %v", s.Message())
klog.V(4).Infof(msg)
return NewStatus(s.Code(), msg)
}
msg := fmt.Sprintf("error received while waiting at permit for pod %v: %v", pod.Name, s.Message())
klog.Error(msg)
return NewStatus(Error, msg)
}
}
}
return nil
}
// NodeInfoSnapshot returns the latest NodeInfo snapshot. The snapshot
// is taken at the beginning of a scheduling cycle and remains unchanged until a
// pod finishes "Reserve". There is no guarantee that the information remains
@ -124,3 +200,13 @@ func (f *framework) RunUnreservePlugins(
func (f *framework) NodeInfoSnapshot() *cache.NodeInfoSnapshot {
return f.nodeInfoSnapshot
}
// IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map.
func (f *framework) IterateOverWaitingPods(callback func(WaitingPod)) {
f.waitingPods.iterate(callback)
}
// GetWaitingPod returns a reference to a WaitingPod given its UID.
func (f *framework) GetWaitingPod(uid types.UID) WaitingPod {
return f.waitingPods.get(uid)
}

View File

@ -20,8 +20,10 @@ package v1alpha1
import (
"errors"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
)
@ -38,6 +40,8 @@ const (
// Unschedulable is used when a plugin finds a pod unschedulable.
// The accompanying status message should explain why the pod is unschedulable.
Unschedulable Code = 2
// Wait is used when a permit plugin finds a pod scheduling should wait.
Wait Code = 3
)
// Status indicates the result of running a plugin. It consists of a code and a
@ -86,6 +90,18 @@ func NewStatus(code Code, msg string) *Status {
}
}
// WaitingPod represents a pod currently waiting in the permit phase.
type WaitingPod interface {
// GetPod returns a reference to the waiting pod.
GetPod() *v1.Pod
// Allow the waiting pod to be scheduled. Returns true if the allow signal was
// successfully delivered, false otherwise.
Allow() bool
// Reject declares the waiting pod unschedulable. Returns true if the allow signal
// was successfully delivered, false otherwise.
Reject(msg string) bool
}
// Plugin is the parent type for all the scheduling framework plugins.
type Plugin interface {
Name() string
@ -105,7 +121,7 @@ type ReservePlugin interface {
}
// PrebindPlugin is an interface that must be implemented by "prebind" plugins.
// These plugins are called before a pod being scheduled
// These plugins are called before a pod being scheduled.
type PrebindPlugin interface {
Plugin
// Prebind is called before binding a pod. All prebind plugins must return
@ -124,6 +140,19 @@ type UnreservePlugin interface {
Unreserve(pc *PluginContext, p *v1.Pod, nodeName string)
}
// PermitPlugin is an interface that must be implemented by "permit" plugins.
// These plugins are called before a pod is bound to a node.
type PermitPlugin interface {
Plugin
// Permit is called before binding a pod (and before prebind plugins). Permit
// plugins are used to prevent or delay the binding of a Pod. A permit plugin
// must return success or wait with timeout duration, or the pod will be rejected.
// The pod will also be rejected if the wait timeout or the pod is rejected while
// waiting. Note that if the plugin returns "wait", the framework will wait only
// after running the remaining plugins given that no other plugin rejects the pod.
Permit(pc *PluginContext, p *v1.Pod, nodeName string) (*Status, time.Duration)
}
// Framework manages the set of plugins in use by the scheduling framework.
// Configured plugins are called at specified points in a scheduling context.
type Framework interface {
@ -142,6 +171,15 @@ type Framework interface {
// RunUnreservePlugins runs the set of configured unreserve plugins.
RunUnreservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string)
// RunPermitPlugins runs the set of configured permit plugins. If any of these
// plugins returns a status other than "Success" or "Wait", it does not continue
// running the remaining plugins and returns an error. Otherwise, if any of the
// plugins returns "Wait", then this function will block for the timeout period
// returned by the plugin, if the time expires, then it will return an error.
// Note that if multiple plugins asked to wait, then we wait for the minimum
// timeout duration.
RunPermitPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
}
// FrameworkHandle provides data and some tools that plugins can use. It is
@ -153,4 +191,10 @@ type FrameworkHandle interface {
// a pod finishes "Reserve" point. There is no guarantee that the information
// remains unchanged in the binding phase of scheduling.
NodeInfoSnapshot() *internalcache.NodeInfoSnapshot
// IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map.
IterateOverWaitingPods(callback func(WaitingPod))
// GetWaitingPod returns a waiting pod given its UID.
GetWaitingPod(uid types.UID) WaitingPod
}

View File

@ -0,0 +1,109 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
"sync"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)
// waitingPodsMap a thread-safe map used to maintain pods waiting in the permit phase.
type waitingPodsMap struct {
pods map[types.UID]WaitingPod
mu sync.RWMutex
}
// newWaitingPodsMap returns a new waitingPodsMap.
func newWaitingPodsMap() *waitingPodsMap {
return &waitingPodsMap{
pods: make(map[types.UID]WaitingPod),
}
}
// add a new WaitingPod to the map.
func (m *waitingPodsMap) add(wp WaitingPod) {
m.mu.Lock()
defer m.mu.Unlock()
m.pods[wp.GetPod().UID] = wp
}
// remove a WaitingPod from the map.
func (m *waitingPodsMap) remove(uid types.UID) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.pods, uid)
}
// get a WaitingPod from the map.
func (m *waitingPodsMap) get(uid types.UID) WaitingPod {
m.mu.RLock()
defer m.mu.RUnlock()
return m.pods[uid]
}
// iterate acquires a read lock and iterates over the WaitingPods map.
func (m *waitingPodsMap) iterate(callback func(WaitingPod)) {
m.mu.RLock()
defer m.mu.RUnlock()
for _, v := range m.pods {
callback(v)
}
}
// waitingPod represents a pod waiting in the permit phase.
type waitingPod struct {
pod *v1.Pod
s chan *Status
}
// newWaitingPod returns a new waitingPod instance.
func newWaitingPod(pod *v1.Pod) *waitingPod {
return &waitingPod{
pod: pod,
s: make(chan *Status),
}
}
// GetPod returns a reference to the waiting pod.
func (w *waitingPod) GetPod() *v1.Pod {
return w.pod
}
// Allow the waiting pod to be scheduled. Returns true if the allow signal was
// successfully delivered, false otherwise.
func (w *waitingPod) Allow() bool {
select {
case w.s <- NewStatus(Success, ""):
return true
default:
return false
}
}
// Reject declares the waiting pod unschedulable. Returns true if the allow signal
// was successfully delivered, false otherwise.
func (w *waitingPod) Reject(msg string) bool {
select {
case w.s <- NewStatus(Unschedulable, msg):
return true
default:
return false
}
}

View File

@ -533,6 +533,25 @@ func (sched *Scheduler) scheduleOne() {
}
}
// Run "permit" plugins.
permitStatus := fwk.RunPermitPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
if !permitStatus.IsSuccess() {
var reason string
if permitStatus.Code() == framework.Unschedulable {
reason = v1.PodReasonUnschedulable
} else {
metrics.PodScheduleErrors.Inc()
reason = SchedulerError
}
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
sched.recordSchedulingFailure(assumedPod, permitStatus.AsError(), reason, permitStatus.Message())
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
return
}
// Run "prebind" plugins.
prebindStatus := fwk.RunPrebindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
if !prebindStatus.IsSuccess() {

View File

@ -30,12 +30,18 @@ import (
// TesterPlugin is common ancestor for a test plugin that allows injection of
// failures and some other test functionalities.
type TesterPlugin struct {
numReserveCalled int
numPrebindCalled int
numUnreserveCalled int
failReserve bool
failPrebind bool
rejectPrebind bool
numReserveCalled int
numPrebindCalled int
numUnreserveCalled int
failReserve bool
failPrebind bool
rejectPrebind bool
numPermitCalled int
failPermit bool
rejectPermit bool
timeoutPermit bool
waitAndRejectPermit bool
waitAndAllowPermit bool
}
type ReservePlugin struct {
@ -50,15 +56,22 @@ type UnreservePlugin struct {
TesterPlugin
}
type PermitPlugin struct {
TesterPlugin
fh framework.FrameworkHandle
}
const (
reservePluginName = "reserve-plugin"
prebindPluginName = "prebind-plugin"
unreservePluginName = "unreserve-plugin"
permitPluginName = "permit-plugin"
)
var _ = framework.ReservePlugin(&ReservePlugin{})
var _ = framework.PrebindPlugin(&PrebindPlugin{})
var _ = framework.UnreservePlugin(&UnreservePlugin{})
var _ = framework.PermitPlugin(&PermitPlugin{})
// Name returns name of the plugin.
func (rp *ReservePlugin) Name() string {
@ -134,6 +147,55 @@ func NewUnreservePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framew
return unresPlugin, nil
}
var perPlugin = &PermitPlugin{}
// Name returns name of the plugin.
func (pp *PermitPlugin) Name() string {
return permitPluginName
}
// Permit implements the permit test plugin.
func (pp *PermitPlugin) Permit(pc *framework.PluginContext, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
pp.numPermitCalled++
if pp.failPermit {
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)), 0
}
if pp.rejectPermit {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0
}
if pp.timeoutPermit {
return framework.NewStatus(framework.Wait, ""), 3 * time.Second
}
if pp.waitAndRejectPermit || pp.waitAndAllowPermit {
if pod.Name == "waiting-pod" {
return framework.NewStatus(framework.Wait, ""), 30 * time.Second
}
// This is the signalling pod, wait until the waiting-pod is actually waiting and then either reject or allow it.
wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
w := false
pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true })
return w, nil
})
if pp.waitAndRejectPermit {
pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) {
wp.Reject(fmt.Sprintf("reject pod %v", wp.GetPod().Name))
})
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0
}
if pp.waitAndAllowPermit {
pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Allow() })
return nil, 0
}
}
return nil, 0
}
// NewPermitPlugin is the factory for permit plugin.
func NewPermitPlugin(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) {
perPlugin.fh = fh
return perPlugin, nil
}
// TestReservePlugin tests invocation of reserve plugins.
func TestReservePlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a reserve plugin.
@ -181,7 +243,7 @@ func TestReservePlugin(t *testing.T) {
// TestPrebindPlugin tests invocation of prebind plugins.
func TestPrebindPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a reserve plugin.
// Create a plugin registry for testing. Register only a prebind plugin.
registry := framework.Registry{prebindPluginName: NewPrebindPlugin}
// Create the master and the scheduler with the test plugin set.
@ -336,3 +398,169 @@ func TestUnreservePlugin(t *testing.T) {
cleanupPods(cs, t, []*v1.Pod{pod})
}
}
// TestPermitPlugin tests invocation of permit plugins.
func TestPermitPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a permit plugin.
registry := framework.Registry{permitPluginName: NewPermitPlugin}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "permit-plugin", nil),
false, nil, registry, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
// Add a few nodes.
_, err := createNodes(cs, "test-node", nil, 2)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}
tests := []struct {
fail bool
reject bool
timeout bool
}{
{
fail: false,
reject: false,
timeout: false,
},
{
fail: true,
reject: false,
timeout: false,
},
{
fail: false,
reject: true,
timeout: false,
},
{
fail: true,
reject: true,
timeout: false,
},
{
fail: false,
reject: false,
timeout: true,
},
{
fail: false,
reject: false,
timeout: true,
},
}
for i, test := range tests {
perPlugin.failPermit = test.fail
perPlugin.rejectPermit = test.reject
perPlugin.timeoutPermit = test.timeout
perPlugin.waitAndRejectPermit = false
perPlugin.waitAndAllowPermit = false
// Create a best effort pod.
pod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
if test.fail {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil {
t.Errorf("test #%v: Expected a scheduling error, but didn't get it. error: %v", i, err)
}
} else {
if test.reject || test.timeout {
if err = waitForPodUnschedulable(cs, pod); err != nil {
t.Errorf("test #%v: Didn't expect the pod to be scheduled. error: %v", i, err)
}
} else {
if err = waitForPodToSchedule(cs, pod); err != nil {
t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err)
}
}
}
if perPlugin.numPermitCalled == 0 {
t.Errorf("Expected the permit plugin to be called.")
}
cleanupPods(cs, t, []*v1.Pod{pod})
}
}
// TestCoSchedulingWithPermitPlugin tests invocation of permit plugins.
func TestCoSchedulingWithPermitPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a permit plugin.
registry := framework.Registry{permitPluginName: NewPermitPlugin}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "permit-plugin", nil),
false, nil, registry, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
// Add a few nodes.
_, err := createNodes(cs, "test-node", nil, 2)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}
tests := []struct {
waitReject bool
waitAllow bool
}{
{
waitReject: true,
waitAllow: false,
},
{
waitReject: false,
waitAllow: true,
},
}
for i, test := range tests {
perPlugin.failPermit = false
perPlugin.rejectPermit = false
perPlugin.timeoutPermit = false
perPlugin.waitAndRejectPermit = test.waitReject
perPlugin.waitAndAllowPermit = test.waitAllow
// Create two pods.
waitingPod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "waiting-pod", Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating the waiting pod: %v", err)
}
signallingPod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "signalling-pod", Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating the signalling pod: %v", err)
}
if test.waitReject {
if err = waitForPodUnschedulable(cs, waitingPod); err != nil {
t.Errorf("test #%v: Didn't expect the waiting pod to be scheduled. error: %v", i, err)
}
if err = waitForPodUnschedulable(cs, signallingPod); err != nil {
t.Errorf("test #%v: Didn't expect the signalling pod to be scheduled. error: %v", i, err)
}
} else {
if err = waitForPodToSchedule(cs, waitingPod); err != nil {
t.Errorf("test #%v: Expected the waiting pod to be scheduled. error: %v", i, err)
}
if err = waitForPodToSchedule(cs, signallingPod); err != nil {
t.Errorf("test #%v: Expected the signalling pod to be scheduled. error: %v", i, err)
}
}
if perPlugin.numPermitCalled == 0 {
t.Errorf("Expected the permit plugin to be called.")
}
cleanupPods(cs, t, []*v1.Pod{waitingPod, signallingPod})
}
}