Change type of scheduling queue from cache.FIFO to a new interface

pull/6/head
Bobby (Babak) Salamat 2017-10-23 19:26:59 -07:00
parent 0829376d8b
commit 5497e893ae
5 changed files with 78 additions and 11 deletions

View File

@ -0,0 +1,61 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This file contains structures that implement scheduling queue types.
// Scheduling queues hold pending pods waiting to be scheduled.
package core
import (
"k8s.io/client-go/tools/cache"
)
// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
// makes it easy to use those data structures as a SchedulingQueue.
type SchedulingQueue interface {
Add(obj interface{}) error
AddIfNotPresent(obj interface{}) error
Pop() (interface{}, error)
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
}
// FIFO is only used to add a Pop() method to cache.FIFO so that it can be
// used as a SchedulingQueue interface.
type FIFO struct {
*cache.FIFO
}
// Pop removes the head of FIFO and returns it.
// This is just a copy/paste of cache.Pop(queue Queue) from fifo.go that scheduler
// has always been using. There is a comment in that file saying that this method
// shouldn't be used in production code, but scheduler has always been using it.
// This function does minimal error checking.
func (f *FIFO) Pop() (interface{}, error) {
var result interface{}
f.FIFO.Pop(func(obj interface{}) error {
result = obj
return nil
})
return result, nil
}
var _ = SchedulingQueue(&FIFO{}) // Making sure that FIFO implements SchedulingQueue.

View File

@ -77,7 +77,7 @@ var (
type configFactory struct {
client clientset.Interface
// queue for pods that need scheduling
podQueue *cache.FIFO
podQueue core.SchedulingQueue
// a means to list all known scheduled pods.
scheduledPodLister corelisters.PodLister
// a means to list all known scheduled pods and pods assumed to have been scheduled.
@ -142,10 +142,11 @@ func NewConfigFactory(
stopEverything := make(chan struct{})
schedulerCache := schedulercache.New(30*time.Second, stopEverything)
schedulingQueue := &core.FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
c := &configFactory{
client: client,
podLister: schedulerCache,
podQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
podQueue: schedulingQueue,
pVLister: pvInformer.Lister(),
pVCLister: pvcInformer.Lister(),
serviceLister: serviceInformer.Lister(),
@ -901,9 +902,14 @@ func (f *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
}
func (f *configFactory) getNextPod() *v1.Pod {
pod := cache.Pop(f.podQueue).(*v1.Pod)
glog.V(4).Infof("About to try and schedule pod %v", pod.Name)
return pod
if obj, err := f.podQueue.Pop(); err == nil {
pod := obj.(*v1.Pod)
glog.V(4).Infof("About to try and schedule pod %v", pod.Name)
return pod
} else {
glog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
return nil
}
}
// unassignedNonTerminatedPod selects pods that are unassigned and non-terminal.
@ -1011,7 +1017,7 @@ func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration, sche
}
}
func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) {
func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue core.SchedulingQueue) func(pod *v1.Pod, err error) {
return func(pod *v1.Pod, err error) {
if err == core.ErrNoNodesAvailable {
glog.V(4).Infof("Unable to schedule %v %v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name)

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
"k8s.io/kubernetes/plugin/pkg/scheduler/core"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
@ -280,7 +281,7 @@ func TestDefaultErrorFunc(t *testing.T) {
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
queue := &core.FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
podBackoff := util.CreatePodBackoff(1*time.Millisecond, 1*time.Second)
errFunc := factory.MakeDefaultErrorFunc(podBackoff, queue)

View File

@ -26,7 +26,6 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
@ -80,7 +79,7 @@ type Configurator interface {
GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error)
GetHardPodAffinitySymmetricWeight() int
GetSchedulerName() string
MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error)
MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue core.SchedulingQueue) func(pod *v1.Pod, err error)
// Needs to be exposed for things like integration tests where we want to make fake nodes.
GetNodeLister() corelisters.NodeLister

View File

@ -23,9 +23,9 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/core"
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
)
@ -65,7 +65,7 @@ func (fc *FakeConfigurator) GetSchedulerName() string {
}
// MakeDefaultErrorFunc is not implemented yet.
func (fc *FakeConfigurator) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) {
func (fc *FakeConfigurator) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue core.SchedulingQueue) func(pod *v1.Pod, err error) {
return nil
}