Merge pull request #77529 from draveness/feature/add-queuesort-extension-point

feat: implement "queue-sort" extension point for scheduling framework
k3s-v1.15.3
Kubernetes Prow Robot 2019-05-16 01:09:42 -07:00 committed by GitHub
commit 796ecb9391
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 260 additions and 162 deletions

View File

@ -536,7 +536,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
for _, name := range test.nodes { for _, name := range test.nodes {
cache.AddNode(createNode(name)) cache.AddNode(createNode(name))
} }
queue := internalqueue.NewSchedulingQueue(nil) queue := internalqueue.NewSchedulingQueue(nil, nil)
scheduler := NewGenericScheduler( scheduler := NewGenericScheduler(
cache, cache,
queue, queue,

View File

@ -452,7 +452,7 @@ func TestGenericScheduler(t *testing.T) {
scheduler := NewGenericScheduler( scheduler := NewGenericScheduler(
cache, cache,
internalqueue.NewSchedulingQueue(nil), internalqueue.NewSchedulingQueue(nil, nil),
test.predicates, test.predicates,
algorithmpredicates.EmptyPredicateMetadataProducer, algorithmpredicates.EmptyPredicateMetadataProducer,
test.prioritizers, test.prioritizers,
@ -488,7 +488,7 @@ func makeScheduler(predicates map[string]algorithmpredicates.FitPredicate, nodes
s := NewGenericScheduler( s := NewGenericScheduler(
cache, cache,
internalqueue.NewSchedulingQueue(nil), internalqueue.NewSchedulingQueue(nil, nil),
predicates, predicates,
algorithmpredicates.EmptyPredicateMetadataProducer, algorithmpredicates.EmptyPredicateMetadataProducer,
prioritizers, prioritizers,
@ -1491,7 +1491,7 @@ func TestPreempt(t *testing.T) {
} }
scheduler := NewGenericScheduler( scheduler := NewGenericScheduler(
cache, cache,
internalqueue.NewSchedulingQueue(nil), internalqueue.NewSchedulingQueue(nil, nil),
map[string]algorithmpredicates.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, map[string]algorithmpredicates.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
algorithmpredicates.EmptyPredicateMetadataProducer, algorithmpredicates.EmptyPredicateMetadataProducer,
[]priorities.PriorityConfig{{Function: numericPriority, Weight: 1}}, []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},

View File

@ -262,7 +262,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
c := &configFactory{ c := &configFactory{
client: args.Client, client: args.Client,
podLister: schedulerCache, podLister: schedulerCache,
podQueue: internalqueue.NewSchedulingQueue(stopEverything), podQueue: internalqueue.NewSchedulingQueue(stopEverything, framework),
nodeLister: args.NodeInformer.Lister(), nodeLister: args.NodeInformer.Lister(),
pVLister: args.PvInformer.Lister(), pVLister: args.PvInformer.Lister(),
pVCLister: args.PvcInformer.Lister(), pVCLister: args.PvcInformer.Lister(),

View File

@ -256,7 +256,7 @@ func TestDefaultErrorFunc(t *testing.T) {
defer close(stopCh) defer close(stopCh)
timestamp := time.Now() timestamp := time.Now()
queue := internalqueue.NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp)) queue := internalqueue.NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp), nil)
schedulerCache := internalcache.New(30*time.Second, stopCh) schedulerCache := internalcache.New(30*time.Second, stopCh)
errFunc := MakeDefaultErrorFunc(client, queue, schedulerCache, stopCh) errFunc := MakeDefaultErrorFunc(client, queue, schedulerCache, stopCh)

View File

@ -34,6 +34,7 @@ type framework struct {
nodeInfoSnapshot *cache.NodeInfoSnapshot nodeInfoSnapshot *cache.NodeInfoSnapshot
waitingPods *waitingPodsMap waitingPods *waitingPodsMap
plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance. plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance.
queueSortPlugins []QueueSortPlugin
reservePlugins []ReservePlugin reservePlugins []ReservePlugin
prebindPlugins []PrebindPlugin prebindPlugins []PrebindPlugin
unreservePlugins []UnreservePlugin unreservePlugins []UnreservePlugin
@ -69,6 +70,10 @@ func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) {
// TODO: For now, we assume any plugins that implements an extension // TODO: For now, we assume any plugins that implements an extension
// point wants to be called at that extension point. We should change this // point wants to be called at that extension point. We should change this
// later and add these plugins based on the configuration. // later and add these plugins based on the configuration.
if qsp, ok := p.(QueueSortPlugin); ok {
f.queueSortPlugins = append(f.queueSortPlugins, qsp)
}
if rp, ok := p.(ReservePlugin); ok { if rp, ok := p.(ReservePlugin); ok {
f.reservePlugins = append(f.reservePlugins, rp) f.reservePlugins = append(f.reservePlugins, rp)
} }
@ -85,6 +90,16 @@ func NewFramework(r Registry, _ *runtime.Unknown) (Framework, error) {
return f, nil return f, nil
} }
// QueueSortFunc returns the function to sort pods in scheduling queue
func (f *framework) QueueSortFunc() LessFunc {
if len(f.queueSortPlugins) == 0 {
return nil
}
// Only one QueueSort plugin can be enabled.
return f.queueSortPlugins[0].Less
}
// RunPrebindPlugins runs the set of configured prebind plugins. It returns a // RunPrebindPlugins runs the set of configured prebind plugins. It returns a
// failure (bool) if any of the plugins returns an error. It also returns an // failure (bool) if any of the plugins returns an error. It also returns an
// error containing the rejection message or the error occurred in the plugin. // error containing the rejection message or the error occurred in the plugin.

View File

@ -107,6 +107,25 @@ type Plugin interface {
Name() string Name() string
} }
// PodInfo is minimum cell in the scheduling queue.
type PodInfo struct {
Pod *v1.Pod
// The time pod added to the scheduling queue.
Timestamp time.Time
}
// LessFunc is the function to sort pod info
type LessFunc func(podInfo1, podInfo2 *PodInfo) bool
// QueueSortPlugin is an interface that must be implemented by "QueueSort" plugins.
// These plugins are used to sort pods in the scheduling queue. Only one queue sort
// plugin may be enabled at a time.
type QueueSortPlugin interface {
Plugin
// Less are used to sort pods in the scheduling queue.
Less(*PodInfo, *PodInfo) bool
}
// ReservePlugin is an interface for Reserve plugins. These plugins are called // ReservePlugin is an interface for Reserve plugins. These plugins are called
// at the reservation point. These are meant to update the state of the plugin. // at the reservation point. These are meant to update the state of the plugin.
// This concept used to be called 'assume' in the original scheduler. // This concept used to be called 'assume' in the original scheduler.
@ -157,6 +176,9 @@ type PermitPlugin interface {
// Configured plugins are called at specified points in a scheduling context. // Configured plugins are called at specified points in a scheduling context.
type Framework interface { type Framework interface {
FrameworkHandle FrameworkHandle
// QueueSortFunc returns the function to sort pods in scheduling queue
QueueSortFunc() LessFunc
// RunPrebindPlugins runs the set of configured prebind plugins. It returns // RunPrebindPlugins runs the set of configured prebind plugins. It returns
// *Status and its code is set to non-success if any of the plugins returns // *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If the Status code is "Unschedulable", it is // anything but Success. If the Status code is "Unschedulable", it is

View File

@ -11,6 +11,7 @@ go_library(
deps = [ deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library", "//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/util:go_default_library", "//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
@ -31,6 +32,8 @@ go_test(
embed = [":go_default_library"], embed = [":go_default_library"],
deps = [ deps = [
"//pkg/api/v1/pod:go_default_library", "//pkg/api/v1/pod:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/util:go_default_library", "//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@ -38,6 +38,7 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/util"
) )
@ -87,8 +88,8 @@ type SchedulingQueue interface {
} }
// NewSchedulingQueue initializes a priority queue as a new scheduling queue. // NewSchedulingQueue initializes a priority queue as a new scheduling queue.
func NewSchedulingQueue(stop <-chan struct{}) SchedulingQueue { func NewSchedulingQueue(stop <-chan struct{}, fwk framework.Framework) SchedulingQueue {
return NewPriorityQueue(stop) return NewPriorityQueue(stop, fwk)
} }
// NominatedNodeName returns nominated node name of a Pod. // NominatedNodeName returns nominated node name of a Pod.
@ -140,43 +141,48 @@ type PriorityQueue struct {
// Making sure that PriorityQueue implements SchedulingQueue. // Making sure that PriorityQueue implements SchedulingQueue.
var _ = SchedulingQueue(&PriorityQueue{}) var _ = SchedulingQueue(&PriorityQueue{})
// podInfo is minimum cell in the scheduling queue. // newPodInfoNoTimestamp builds a PodInfo object without timestamp.
type podInfo struct { func newPodInfoNoTimestamp(pod *v1.Pod) *framework.PodInfo {
pod *v1.Pod return &framework.PodInfo{
// The time pod added to the scheduling queue. Pod: pod,
timestamp time.Time
}
// newPodInfoNoTimestamp builds a podInfo object without timestamp.
func newPodInfoNoTimestamp(pod *v1.Pod) *podInfo {
return &podInfo{
pod: pod,
} }
} }
// activeQComp is the function used by the activeQ heap algorithm to sort pods. // activeQComp is the function used by the activeQ heap algorithm to sort pods.
// It sorts pods based on their priority. When priorities are equal, it uses // It sorts pods based on their priority. When priorities are equal, it uses
// podInfo.timestamp. // PodInfo.timestamp.
func activeQComp(podInfo1, podInfo2 interface{}) bool { func activeQComp(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*podInfo) pInfo1 := podInfo1.(*framework.PodInfo)
pInfo2 := podInfo2.(*podInfo) pInfo2 := podInfo2.(*framework.PodInfo)
prio1 := util.GetPodPriority(pInfo1.pod) prio1 := util.GetPodPriority(pInfo1.Pod)
prio2 := util.GetPodPriority(pInfo2.pod) prio2 := util.GetPodPriority(pInfo2.Pod)
return (prio1 > prio2) || (prio1 == prio2 && pInfo1.timestamp.Before(pInfo2.timestamp)) return (prio1 > prio2) || (prio1 == prio2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
} }
// NewPriorityQueue creates a PriorityQueue object. // NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(stop <-chan struct{}) *PriorityQueue { func NewPriorityQueue(stop <-chan struct{}, fwk framework.Framework) *PriorityQueue {
return NewPriorityQueueWithClock(stop, util.RealClock{}) return NewPriorityQueueWithClock(stop, util.RealClock{}, fwk)
} }
// NewPriorityQueueWithClock creates a PriorityQueue which uses the passed clock for time. // NewPriorityQueueWithClock creates a PriorityQueue which uses the passed clock for time.
func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *PriorityQueue { func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock, fwk framework.Framework) *PriorityQueue {
comp := activeQComp
if fwk != nil {
if queueSortFunc := fwk.QueueSortFunc(); queueSortFunc != nil {
comp = func(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*framework.PodInfo)
pInfo2 := podInfo2.(*framework.PodInfo)
return queueSortFunc(pInfo1, pInfo2)
}
}
}
pq := &PriorityQueue{ pq := &PriorityQueue{
clock: clock, clock: clock,
stop: stop, stop: stop,
podBackoff: NewPodBackoffMap(1*time.Second, 10*time.Second), podBackoff: NewPodBackoffMap(1*time.Second, 10*time.Second),
activeQ: util.NewHeapWithRecorder(podInfoKeyFunc, activeQComp, metrics.NewActivePodsRecorder()), activeQ: util.NewHeapWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()), unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
nominatedPods: newNominatedPodMap(), nominatedPods: newNominatedPodMap(),
moveRequestCycle: -1, moveRequestCycle: -1,
@ -334,7 +340,7 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
if rawPodInfo == nil { if rawPodInfo == nil {
return return
} }
pod := rawPodInfo.(*podInfo).pod pod := rawPodInfo.(*framework.PodInfo).Pod
boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod)) boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
if !found { if !found {
klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod)) klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod))
@ -363,10 +369,10 @@ func (p *PriorityQueue) flushUnschedulableQLeftover() {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
var podsToMove []*podInfo var podsToMove []*framework.PodInfo
currentTime := p.clock.Now() currentTime := p.clock.Now()
for _, pInfo := range p.unschedulableQ.podInfoMap { for _, pInfo := range p.unschedulableQ.podInfoMap {
lastScheduleTime := pInfo.timestamp lastScheduleTime := pInfo.Timestamp
if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval { if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
podsToMove = append(podsToMove, pInfo) podsToMove = append(podsToMove, pInfo)
} }
@ -396,9 +402,9 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
pInfo := obj.(*podInfo) pInfo := obj.(*framework.PodInfo)
p.schedulingCycle++ p.schedulingCycle++
return pInfo.pod, err return pInfo.Pod, err
} }
// isPodUpdated checks if the pod is updated in a way that it may have become // isPodUpdated checks if the pod is updated in a way that it may have become
@ -428,7 +434,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists { if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
p.nominatedPods.update(oldPod, newPod) p.nominatedPods.update(oldPod, newPod)
newPodInfo := newPodInfoNoTimestamp(newPod) newPodInfo := newPodInfoNoTimestamp(newPod)
newPodInfo.timestamp = oldPodInfo.(*podInfo).timestamp newPodInfo.Timestamp = oldPodInfo.(*framework.PodInfo).Timestamp
err := p.activeQ.Update(newPodInfo) err := p.activeQ.Update(newPodInfo)
return err return err
} }
@ -438,7 +444,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
p.nominatedPods.update(oldPod, newPod) p.nominatedPods.update(oldPod, newPod)
p.podBackoffQ.Delete(newPodInfoNoTimestamp(oldPod)) p.podBackoffQ.Delete(newPodInfoNoTimestamp(oldPod))
newPodInfo := newPodInfoNoTimestamp(newPod) newPodInfo := newPodInfoNoTimestamp(newPod)
newPodInfo.timestamp = oldPodInfo.(*podInfo).timestamp newPodInfo.Timestamp = oldPodInfo.(*framework.PodInfo).Timestamp
err := p.activeQ.Add(newPodInfo) err := p.activeQ.Add(newPodInfo)
if err == nil { if err == nil {
p.cond.Broadcast() p.cond.Broadcast()
@ -451,11 +457,11 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil { if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil {
p.nominatedPods.update(oldPod, newPod) p.nominatedPods.update(oldPod, newPod)
newPodInfo := newPodInfoNoTimestamp(newPod) newPodInfo := newPodInfoNoTimestamp(newPod)
newPodInfo.timestamp = usPodInfo.timestamp newPodInfo.Timestamp = usPodInfo.Timestamp
if isPodUpdated(oldPod, newPod) { if isPodUpdated(oldPod, newPod) {
// If the pod is updated reset backoff // If the pod is updated reset backoff
p.clearPodBackoff(newPod) p.clearPodBackoff(newPod)
p.unschedulableQ.delete(usPodInfo.pod) p.unschedulableQ.delete(usPodInfo.Pod)
err := p.activeQ.Add(newPodInfo) err := p.activeQ.Add(newPodInfo)
if err == nil { if err == nil {
p.cond.Broadcast() p.cond.Broadcast()
@ -514,7 +520,7 @@ func (p *PriorityQueue) MoveAllToActiveQueue() {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
for _, pInfo := range p.unschedulableQ.podInfoMap { for _, pInfo := range p.unschedulableQ.podInfoMap {
pod := pInfo.pod pod := pInfo.Pod
if p.isPodBackingOff(pod) { if p.isPodBackingOff(pod) {
if err := p.podBackoffQ.Add(pInfo); err != nil { if err := p.podBackoffQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err) klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
@ -531,9 +537,9 @@ func (p *PriorityQueue) MoveAllToActiveQueue() {
} }
// NOTE: this function assumes lock has been acquired in caller // NOTE: this function assumes lock has been acquired in caller
func (p *PriorityQueue) movePodsToActiveQueue(podInfoList []*podInfo) { func (p *PriorityQueue) movePodsToActiveQueue(podInfoList []*framework.PodInfo) {
for _, pInfo := range podInfoList { for _, pInfo := range podInfoList {
pod := pInfo.pod pod := pInfo.Pod
if p.isPodBackingOff(pod) { if p.isPodBackingOff(pod) {
if err := p.podBackoffQ.Add(pInfo); err != nil { if err := p.podBackoffQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err) klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
@ -552,10 +558,10 @@ func (p *PriorityQueue) movePodsToActiveQueue(podInfoList []*podInfo) {
// getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have // getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
// any affinity term that matches "pod". // any affinity term that matches "pod".
// NOTE: this function assumes lock has been acquired in caller. // NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*podInfo { func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.PodInfo {
var podsToMove []*podInfo var podsToMove []*framework.PodInfo
for _, pInfo := range p.unschedulableQ.podInfoMap { for _, pInfo := range p.unschedulableQ.podInfoMap {
up := pInfo.pod up := pInfo.Pod
affinity := up.Spec.Affinity affinity := up.Spec.Affinity
if affinity != nil && affinity.PodAffinity != nil { if affinity != nil && affinity.PodAffinity != nil {
terms := predicates.GetPodAffinityTerms(affinity.PodAffinity) terms := predicates.GetPodAffinityTerms(affinity.PodAffinity)
@ -591,13 +597,13 @@ func (p *PriorityQueue) PendingPods() []*v1.Pod {
defer p.lock.RUnlock() defer p.lock.RUnlock()
result := []*v1.Pod{} result := []*v1.Pod{}
for _, pInfo := range p.activeQ.List() { for _, pInfo := range p.activeQ.List() {
result = append(result, pInfo.(*podInfo).pod) result = append(result, pInfo.(*framework.PodInfo).Pod)
} }
for _, pInfo := range p.podBackoffQ.List() { for _, pInfo := range p.podBackoffQ.List() {
result = append(result, pInfo.(*podInfo).pod) result = append(result, pInfo.(*framework.PodInfo).Pod)
} }
for _, pInfo := range p.unschedulableQ.podInfoMap { for _, pInfo := range p.unschedulableQ.podInfoMap {
result = append(result, pInfo.pod) result = append(result, pInfo.Pod)
} }
return result return result
} }
@ -628,10 +634,10 @@ func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
} }
func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool { func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*podInfo) pInfo1 := podInfo1.(*framework.PodInfo)
pInfo2 := podInfo2.(*podInfo) pInfo2 := podInfo2.(*framework.PodInfo)
bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo1.pod)) bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo1.Pod))
bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo2.pod)) bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo2.Pod))
return bo1.Before(bo2) return bo1.Before(bo2)
} }
@ -642,25 +648,25 @@ func (p *PriorityQueue) NumUnschedulablePods() int {
return len(p.unschedulableQ.podInfoMap) return len(p.unschedulableQ.podInfoMap)
} }
// newPodInfo builds a podInfo object. // newPodInfo builds a PodInfo object.
func (p *PriorityQueue) newPodInfo(pod *v1.Pod) *podInfo { func (p *PriorityQueue) newPodInfo(pod *v1.Pod) *framework.PodInfo {
if p.clock == nil { if p.clock == nil {
return &podInfo{ return &framework.PodInfo{
pod: pod, Pod: pod,
} }
} }
return &podInfo{ return &framework.PodInfo{
pod: pod, Pod: pod,
timestamp: p.clock.Now(), Timestamp: p.clock.Now(),
} }
} }
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure // UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
// is used to implement unschedulableQ. // is used to implement unschedulableQ.
type UnschedulablePodsMap struct { type UnschedulablePodsMap struct {
// podInfoMap is a map key by a pod's full-name and the value is a pointer to the podInfo. // podInfoMap is a map key by a pod's full-name and the value is a pointer to the PodInfo.
podInfoMap map[string]*podInfo podInfoMap map[string]*framework.PodInfo
keyFunc func(*v1.Pod) string keyFunc func(*v1.Pod) string
// metricRecorder updates the counter when elements of an unschedulablePodsMap // metricRecorder updates the counter when elements of an unschedulablePodsMap
// get added or removed, and it does nothing if it's nil // get added or removed, and it does nothing if it's nil
@ -668,8 +674,8 @@ type UnschedulablePodsMap struct {
} }
// Add adds a pod to the unschedulable podInfoMap. // Add adds a pod to the unschedulable podInfoMap.
func (u *UnschedulablePodsMap) addOrUpdate(pInfo *podInfo) { func (u *UnschedulablePodsMap) addOrUpdate(pInfo *framework.PodInfo) {
podID := u.keyFunc(pInfo.pod) podID := u.keyFunc(pInfo.Pod)
if _, exists := u.podInfoMap[podID]; !exists && u.metricRecorder != nil { if _, exists := u.podInfoMap[podID]; !exists && u.metricRecorder != nil {
u.metricRecorder.Inc() u.metricRecorder.Inc()
} }
@ -685,9 +691,9 @@ func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
delete(u.podInfoMap, podID) delete(u.podInfoMap, podID)
} }
// Get returns the podInfo if a pod with the same key as the key of the given "pod" // Get returns the PodInfo if a pod with the same key as the key of the given "pod"
// is found in the map. It returns nil otherwise. // is found in the map. It returns nil otherwise.
func (u *UnschedulablePodsMap) get(pod *v1.Pod) *podInfo { func (u *UnschedulablePodsMap) get(pod *v1.Pod) *framework.PodInfo {
podKey := u.keyFunc(pod) podKey := u.keyFunc(pod)
if pInfo, exists := u.podInfoMap[podKey]; exists { if pInfo, exists := u.podInfoMap[podKey]; exists {
return pInfo return pInfo
@ -697,7 +703,7 @@ func (u *UnschedulablePodsMap) get(pod *v1.Pod) *podInfo {
// Clear removes all the entries from the unschedulable podInfoMap. // Clear removes all the entries from the unschedulable podInfoMap.
func (u *UnschedulablePodsMap) clear() { func (u *UnschedulablePodsMap) clear() {
u.podInfoMap = make(map[string]*podInfo) u.podInfoMap = make(map[string]*framework.PodInfo)
if u.metricRecorder != nil { if u.metricRecorder != nil {
u.metricRecorder.Clear() u.metricRecorder.Clear()
} }
@ -706,7 +712,7 @@ func (u *UnschedulablePodsMap) clear() {
// newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap. // newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap.
func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *UnschedulablePodsMap { func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *UnschedulablePodsMap {
return &UnschedulablePodsMap{ return &UnschedulablePodsMap{
podInfoMap: make(map[string]*podInfo), podInfoMap: make(map[string]*framework.PodInfo),
keyFunc: util.GetPodFullName, keyFunc: util.GetPodFullName,
metricRecorder: metricRecorder, metricRecorder: metricRecorder,
} }
@ -801,5 +807,5 @@ func MakeNextPodFunc(queue SchedulingQueue) func() *v1.Pod {
} }
func podInfoKeyFunc(obj interface{}) (string, error) { func podInfoKeyFunc(obj interface{}) (string, error) {
return cache.MetaNamespaceKeyFunc(obj.(*podInfo).pod) return cache.MetaNamespaceKeyFunc(obj.(*framework.PodInfo).Pod)
} }

View File

@ -29,6 +29,8 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/clock"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/util"
) )
@ -109,13 +111,13 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
defer p.lock.Unlock() defer p.lock.Unlock()
pInfo := p.unschedulableQ.get(pod) pInfo := p.unschedulableQ.get(pod)
if pInfo != nil { if pInfo != nil {
return pInfo.pod return pInfo.Pod
} }
return nil return nil
} }
func TestPriorityQueue_Add(t *testing.T) { func TestPriorityQueue_Add(t *testing.T) {
q := NewPriorityQueue(nil) q := NewPriorityQueue(nil, nil)
if err := q.Add(&medPriorityPod); err != nil { if err := q.Add(&medPriorityPod); err != nil {
t.Errorf("add failed: %v", err) t.Errorf("add failed: %v", err)
} }
@ -151,8 +153,58 @@ func TestPriorityQueue_Add(t *testing.T) {
} }
} }
type fakeFramework struct{}
func (*fakeFramework) QueueSortFunc() framework.LessFunc {
return func(podInfo1, podInfo2 *framework.PodInfo) bool {
prio1 := util.GetPodPriority(podInfo1.Pod)
prio2 := util.GetPodPriority(podInfo2.Pod)
return prio1 < prio2
}
}
func (*fakeFramework) NodeInfoSnapshot() *internalcache.NodeInfoSnapshot {
return nil
}
func (*fakeFramework) RunPrebindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
return nil
}
func (*fakeFramework) RunReservePlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
return nil
}
func (*fakeFramework) RunUnreservePlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) {}
func (*fakeFramework) RunPermitPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
return nil
}
func (*fakeFramework) IterateOverWaitingPods(callback func(framework.WaitingPod)) {}
func (*fakeFramework) GetWaitingPod(uid types.UID) framework.WaitingPod {
return nil
}
func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
q := NewPriorityQueue(nil, &fakeFramework{})
if err := q.Add(&medPriorityPod); err != nil {
t.Errorf("add failed: %v", err)
}
if err := q.Add(&highPriorityPod); err != nil {
t.Errorf("add failed: %v", err)
}
if p, err := q.Pop(); err != nil || p != &medPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
}
if p, err := q.Pop(); err != nil || p != &highPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
}
}
func TestPriorityQueue_AddIfNotPresent(t *testing.T) { func TestPriorityQueue_AddIfNotPresent(t *testing.T) {
q := NewPriorityQueue(nil) q := NewPriorityQueue(nil, nil)
addOrUpdateUnschedulablePod(q, &highPriNominatedPod) addOrUpdateUnschedulablePod(q, &highPriNominatedPod)
q.AddIfNotPresent(&highPriNominatedPod) // Must not add anything. q.AddIfNotPresent(&highPriNominatedPod) // Must not add anything.
q.AddIfNotPresent(&medPriorityPod) q.AddIfNotPresent(&medPriorityPod)
@ -184,7 +236,7 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) {
} }
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
q := NewPriorityQueue(nil) q := NewPriorityQueue(nil, nil)
q.Add(&highPriNominatedPod) q.Add(&highPriNominatedPod)
q.AddUnschedulableIfNotPresent(&highPriNominatedPod, q.SchedulingCycle()) // Must not add anything. q.AddUnschedulableIfNotPresent(&highPriNominatedPod, q.SchedulingCycle()) // Must not add anything.
q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(&unschedulablePod, q.SchedulingCycle())
@ -216,7 +268,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
// current scheduling cycle will be put back to activeQueue if we were trying // current scheduling cycle will be put back to activeQueue if we were trying
// to schedule them when we received move request. // to schedule them when we received move request.
func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
q := NewPriorityQueue(nil) q := NewPriorityQueue(nil, nil)
totalNum := 10 totalNum := 10
expectedPods := make([]v1.Pod, 0, totalNum) expectedPods := make([]v1.Pod, 0, totalNum)
for i := 0; i < totalNum; i++ { for i := 0; i < totalNum; i++ {
@ -279,7 +331,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
} }
func TestPriorityQueue_Pop(t *testing.T) { func TestPriorityQueue_Pop(t *testing.T) {
q := NewPriorityQueue(nil) q := NewPriorityQueue(nil, nil)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(1) wg.Add(1)
go func() { go func() {
@ -296,7 +348,7 @@ func TestPriorityQueue_Pop(t *testing.T) {
} }
func TestPriorityQueue_Update(t *testing.T) { func TestPriorityQueue_Update(t *testing.T) {
q := NewPriorityQueue(nil) q := NewPriorityQueue(nil, nil)
q.Update(nil, &highPriorityPod) q.Update(nil, &highPriorityPod)
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriorityPod)); !exists { if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriorityPod)); !exists {
t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name) t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name)
@ -332,7 +384,7 @@ func TestPriorityQueue_Update(t *testing.T) {
} }
func TestPriorityQueue_Delete(t *testing.T) { func TestPriorityQueue_Delete(t *testing.T) {
q := NewPriorityQueue(nil) q := NewPriorityQueue(nil, nil)
q.Update(&highPriorityPod, &highPriNominatedPod) q.Update(&highPriorityPod, &highPriNominatedPod)
q.Add(&unschedulablePod) q.Add(&unschedulablePod)
if err := q.Delete(&highPriNominatedPod); err != nil { if err := q.Delete(&highPriNominatedPod); err != nil {
@ -356,7 +408,7 @@ func TestPriorityQueue_Delete(t *testing.T) {
} }
func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) { func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) {
q := NewPriorityQueue(nil) q := NewPriorityQueue(nil, nil)
q.Add(&medPriorityPod) q.Add(&medPriorityPod)
addOrUpdateUnschedulablePod(q, &unschedulablePod) addOrUpdateUnschedulablePod(q, &unschedulablePod)
addOrUpdateUnschedulablePod(q, &highPriorityPod) addOrUpdateUnschedulablePod(q, &highPriorityPod)
@ -402,7 +454,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
Spec: v1.PodSpec{NodeName: "machine1"}, Spec: v1.PodSpec{NodeName: "machine1"},
} }
q := NewPriorityQueue(nil) q := NewPriorityQueue(nil, nil)
q.Add(&medPriorityPod) q.Add(&medPriorityPod)
// Add a couple of pods to the unschedulableQ. // Add a couple of pods to the unschedulableQ.
addOrUpdateUnschedulablePod(q, &unschedulablePod) addOrUpdateUnschedulablePod(q, &unschedulablePod)
@ -423,7 +475,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
} }
func TestPriorityQueue_NominatedPodsForNode(t *testing.T) { func TestPriorityQueue_NominatedPodsForNode(t *testing.T) {
q := NewPriorityQueue(nil) q := NewPriorityQueue(nil, nil)
q.Add(&medPriorityPod) q.Add(&medPriorityPod)
q.Add(&unschedulablePod) q.Add(&unschedulablePod)
q.Add(&highPriorityPod) q.Add(&highPriorityPod)
@ -448,7 +500,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
return pendingSet return pendingSet
} }
q := NewPriorityQueue(nil) q := NewPriorityQueue(nil, nil)
q.Add(&medPriorityPod) q.Add(&medPriorityPod)
addOrUpdateUnschedulablePod(q, &unschedulablePod) addOrUpdateUnschedulablePod(q, &unschedulablePod)
addOrUpdateUnschedulablePod(q, &highPriorityPod) addOrUpdateUnschedulablePod(q, &highPriorityPod)
@ -464,7 +516,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
} }
func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
q := NewPriorityQueue(nil) q := NewPriorityQueue(nil, nil)
if err := q.Add(&medPriorityPod); err != nil { if err := q.Add(&medPriorityPod); err != nil {
t.Errorf("add failed: %v", err) t.Errorf("add failed: %v", err)
} }
@ -585,64 +637,64 @@ func TestUnschedulablePodsMap(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
podsToAdd []*v1.Pod podsToAdd []*v1.Pod
expectedMapAfterAdd map[string]*podInfo expectedMapAfterAdd map[string]*framework.PodInfo
podsToUpdate []*v1.Pod podsToUpdate []*v1.Pod
expectedMapAfterUpdate map[string]*podInfo expectedMapAfterUpdate map[string]*framework.PodInfo
podsToDelete []*v1.Pod podsToDelete []*v1.Pod
expectedMapAfterDelete map[string]*podInfo expectedMapAfterDelete map[string]*framework.PodInfo
}{ }{
{ {
name: "create, update, delete subset of pods", name: "create, update, delete subset of pods",
podsToAdd: []*v1.Pod{pods[0], pods[1], pods[2], pods[3]}, podsToAdd: []*v1.Pod{pods[0], pods[1], pods[2], pods[3]},
expectedMapAfterAdd: map[string]*podInfo{ expectedMapAfterAdd: map[string]*framework.PodInfo{
util.GetPodFullName(pods[0]): {pod: pods[0]}, util.GetPodFullName(pods[0]): {Pod: pods[0]},
util.GetPodFullName(pods[1]): {pod: pods[1]}, util.GetPodFullName(pods[1]): {Pod: pods[1]},
util.GetPodFullName(pods[2]): {pod: pods[2]}, util.GetPodFullName(pods[2]): {Pod: pods[2]},
util.GetPodFullName(pods[3]): {pod: pods[3]}, util.GetPodFullName(pods[3]): {Pod: pods[3]},
}, },
podsToUpdate: []*v1.Pod{updatedPods[0]}, podsToUpdate: []*v1.Pod{updatedPods[0]},
expectedMapAfterUpdate: map[string]*podInfo{ expectedMapAfterUpdate: map[string]*framework.PodInfo{
util.GetPodFullName(pods[0]): {pod: updatedPods[0]}, util.GetPodFullName(pods[0]): {Pod: updatedPods[0]},
util.GetPodFullName(pods[1]): {pod: pods[1]}, util.GetPodFullName(pods[1]): {Pod: pods[1]},
util.GetPodFullName(pods[2]): {pod: pods[2]}, util.GetPodFullName(pods[2]): {Pod: pods[2]},
util.GetPodFullName(pods[3]): {pod: pods[3]}, util.GetPodFullName(pods[3]): {Pod: pods[3]},
}, },
podsToDelete: []*v1.Pod{pods[0], pods[1]}, podsToDelete: []*v1.Pod{pods[0], pods[1]},
expectedMapAfterDelete: map[string]*podInfo{ expectedMapAfterDelete: map[string]*framework.PodInfo{
util.GetPodFullName(pods[2]): {pod: pods[2]}, util.GetPodFullName(pods[2]): {Pod: pods[2]},
util.GetPodFullName(pods[3]): {pod: pods[3]}, util.GetPodFullName(pods[3]): {Pod: pods[3]},
}, },
}, },
{ {
name: "create, update, delete all", name: "create, update, delete all",
podsToAdd: []*v1.Pod{pods[0], pods[3]}, podsToAdd: []*v1.Pod{pods[0], pods[3]},
expectedMapAfterAdd: map[string]*podInfo{ expectedMapAfterAdd: map[string]*framework.PodInfo{
util.GetPodFullName(pods[0]): {pod: pods[0]}, util.GetPodFullName(pods[0]): {Pod: pods[0]},
util.GetPodFullName(pods[3]): {pod: pods[3]}, util.GetPodFullName(pods[3]): {Pod: pods[3]},
}, },
podsToUpdate: []*v1.Pod{updatedPods[3]}, podsToUpdate: []*v1.Pod{updatedPods[3]},
expectedMapAfterUpdate: map[string]*podInfo{ expectedMapAfterUpdate: map[string]*framework.PodInfo{
util.GetPodFullName(pods[0]): {pod: pods[0]}, util.GetPodFullName(pods[0]): {Pod: pods[0]},
util.GetPodFullName(pods[3]): {pod: updatedPods[3]}, util.GetPodFullName(pods[3]): {Pod: updatedPods[3]},
}, },
podsToDelete: []*v1.Pod{pods[0], pods[3]}, podsToDelete: []*v1.Pod{pods[0], pods[3]},
expectedMapAfterDelete: map[string]*podInfo{}, expectedMapAfterDelete: map[string]*framework.PodInfo{},
}, },
{ {
name: "delete non-existing and existing pods", name: "delete non-existing and existing pods",
podsToAdd: []*v1.Pod{pods[1], pods[2]}, podsToAdd: []*v1.Pod{pods[1], pods[2]},
expectedMapAfterAdd: map[string]*podInfo{ expectedMapAfterAdd: map[string]*framework.PodInfo{
util.GetPodFullName(pods[1]): {pod: pods[1]}, util.GetPodFullName(pods[1]): {Pod: pods[1]},
util.GetPodFullName(pods[2]): {pod: pods[2]}, util.GetPodFullName(pods[2]): {Pod: pods[2]},
}, },
podsToUpdate: []*v1.Pod{updatedPods[1]}, podsToUpdate: []*v1.Pod{updatedPods[1]},
expectedMapAfterUpdate: map[string]*podInfo{ expectedMapAfterUpdate: map[string]*framework.PodInfo{
util.GetPodFullName(pods[1]): {pod: updatedPods[1]}, util.GetPodFullName(pods[1]): {Pod: updatedPods[1]},
util.GetPodFullName(pods[2]): {pod: pods[2]}, util.GetPodFullName(pods[2]): {Pod: pods[2]},
}, },
podsToDelete: []*v1.Pod{pods[2], pods[3]}, podsToDelete: []*v1.Pod{pods[2], pods[3]},
expectedMapAfterDelete: map[string]*podInfo{ expectedMapAfterDelete: map[string]*framework.PodInfo{
util.GetPodFullName(pods[1]): {pod: updatedPods[1]}, util.GetPodFullName(pods[1]): {Pod: updatedPods[1]},
}, },
}, },
} }
@ -690,7 +742,7 @@ func TestSchedulingQueue_Close(t *testing.T) {
}{ }{
{ {
name: "PriorityQueue close", name: "PriorityQueue close",
q: NewPriorityQueue(nil), q: NewPriorityQueue(nil, nil),
expectedErr: fmt.Errorf(queueClosed), expectedErr: fmt.Errorf(queueClosed),
}, },
} }
@ -719,7 +771,7 @@ func TestSchedulingQueue_Close(t *testing.T) {
// ensures that an unschedulable pod does not block head of the queue when there // ensures that an unschedulable pod does not block head of the queue when there
// are frequent events that move pods to the active queue. // are frequent events that move pods to the active queue.
func TestRecentlyTriedPodsGoBack(t *testing.T) { func TestRecentlyTriedPodsGoBack(t *testing.T) {
q := NewPriorityQueue(nil) q := NewPriorityQueue(nil, nil)
// Add a few pods to priority queue. // Add a few pods to priority queue.
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
p := v1.Pod{ p := v1.Pod{
@ -773,7 +825,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
// This behavior ensures that an unschedulable pod does not block head of the queue when there // This behavior ensures that an unschedulable pod does not block head of the queue when there
// are frequent events that move pods to the active queue. // are frequent events that move pods to the active queue.
func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
q := NewPriorityQueue(nil) q := NewPriorityQueue(nil, nil)
// Add an unschedulable pod to a priority queue. // Add an unschedulable pod to a priority queue.
// This makes a situation that the pod was tried to schedule // This makes a situation that the pod was tried to schedule
@ -864,7 +916,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// TestHighPriorityBackoff tests that a high priority pod does not block // TestHighPriorityBackoff tests that a high priority pod does not block
// other pods if it is unschedulable // other pods if it is unschedulable
func TestHighProirotyBackoff(t *testing.T) { func TestHighProirotyBackoff(t *testing.T) {
q := NewPriorityQueue(nil) q := NewPriorityQueue(nil, nil)
midPod := v1.Pod{ midPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -927,7 +979,7 @@ func TestHighProirotyBackoff(t *testing.T) {
// TestHighProirotyFlushUnschedulableQLeftover tests that pods will be moved to // TestHighProirotyFlushUnschedulableQLeftover tests that pods will be moved to
// activeQ after one minutes if it is in unschedulableQ // activeQ after one minutes if it is in unschedulableQ
func TestHighProirotyFlushUnschedulableQLeftover(t *testing.T) { func TestHighProirotyFlushUnschedulableQLeftover(t *testing.T) {
q := NewPriorityQueue(nil) q := NewPriorityQueue(nil, nil)
midPod := v1.Pod{ midPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "test-midpod", Name: "test-midpod",
@ -973,8 +1025,8 @@ func TestHighProirotyFlushUnschedulableQLeftover(t *testing.T) {
addOrUpdateUnschedulablePod(q, &highPod) addOrUpdateUnschedulablePod(q, &highPod)
addOrUpdateUnschedulablePod(q, &midPod) addOrUpdateUnschedulablePod(q, &midPod)
q.unschedulableQ.podInfoMap[util.GetPodFullName(&highPod)].timestamp = time.Now().Add(-1 * unschedulableQTimeInterval) q.unschedulableQ.podInfoMap[util.GetPodFullName(&highPod)].Timestamp = time.Now().Add(-1 * unschedulableQTimeInterval)
q.unschedulableQ.podInfoMap[util.GetPodFullName(&midPod)].timestamp = time.Now().Add(-1 * unschedulableQTimeInterval) q.unschedulableQ.podInfoMap[util.GetPodFullName(&midPod)].Timestamp = time.Now().Add(-1 * unschedulableQTimeInterval)
if p, err := q.Pop(); err != nil || p != &highPod { if p, err := q.Pop(); err != nil || p != &highPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name) t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
@ -984,23 +1036,23 @@ func TestHighProirotyFlushUnschedulableQLeftover(t *testing.T) {
} }
} }
type operation func(queue *PriorityQueue, pInfo *podInfo) type operation func(queue *PriorityQueue, pInfo *framework.PodInfo)
var ( var (
addPodActiveQ = func(queue *PriorityQueue, pInfo *podInfo) { addPodActiveQ = func(queue *PriorityQueue, pInfo *framework.PodInfo) {
queue.lock.Lock() queue.lock.Lock()
queue.activeQ.Add(pInfo) queue.activeQ.Add(pInfo)
queue.lock.Unlock() queue.lock.Unlock()
} }
updatePodActiveQ = func(queue *PriorityQueue, pInfo *podInfo) { updatePodActiveQ = func(queue *PriorityQueue, pInfo *framework.PodInfo) {
queue.lock.Lock() queue.lock.Lock()
queue.activeQ.Update(pInfo) queue.activeQ.Update(pInfo)
queue.lock.Unlock() queue.lock.Unlock()
} }
addPodUnschedulableQ = func(queue *PriorityQueue, pInfo *podInfo) { addPodUnschedulableQ = func(queue *PriorityQueue, pInfo *framework.PodInfo) {
queue.lock.Lock() queue.lock.Lock()
// Update pod condition to unschedulable. // Update pod condition to unschedulable.
podutil.UpdatePodCondition(&pInfo.pod.Status, &v1.PodCondition{ podutil.UpdatePodCondition(&pInfo.Pod.Status, &v1.PodCondition{
Type: v1.PodScheduled, Type: v1.PodScheduled,
Status: v1.ConditionFalse, Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable, Reason: v1.PodReasonUnschedulable,
@ -1009,24 +1061,24 @@ var (
queue.unschedulableQ.addOrUpdate(pInfo) queue.unschedulableQ.addOrUpdate(pInfo)
queue.lock.Unlock() queue.lock.Unlock()
} }
addPodBackoffQ = func(queue *PriorityQueue, pInfo *podInfo) { addPodBackoffQ = func(queue *PriorityQueue, pInfo *framework.PodInfo) {
queue.lock.Lock() queue.lock.Lock()
queue.podBackoffQ.Add(pInfo) queue.podBackoffQ.Add(pInfo)
queue.lock.Unlock() queue.lock.Unlock()
} }
moveAllToActiveQ = func(queue *PriorityQueue, _ *podInfo) { moveAllToActiveQ = func(queue *PriorityQueue, _ *framework.PodInfo) {
queue.MoveAllToActiveQueue() queue.MoveAllToActiveQueue()
} }
backoffPod = func(queue *PriorityQueue, pInfo *podInfo) { backoffPod = func(queue *PriorityQueue, pInfo *framework.PodInfo) {
queue.backoffPod(pInfo.pod) queue.backoffPod(pInfo.Pod)
} }
flushBackoffQ = func(queue *PriorityQueue, _ *podInfo) { flushBackoffQ = func(queue *PriorityQueue, _ *framework.PodInfo) {
queue.clock.(*clock.FakeClock).Step(2 * time.Second) queue.clock.(*clock.FakeClock).Step(2 * time.Second)
queue.flushBackoffQCompleted() queue.flushBackoffQCompleted()
} }
) )
// TestPodTimestamp tests the operations related to podInfo. // TestPodTimestamp tests the operations related to PodInfo.
func TestPodTimestamp(t *testing.T) { func TestPodTimestamp(t *testing.T) {
pod1 := &v1.Pod{ pod1 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -1051,20 +1103,20 @@ func TestPodTimestamp(t *testing.T) {
} }
var timestamp = time.Now() var timestamp = time.Now()
pInfo1 := &podInfo{ pInfo1 := &framework.PodInfo{
pod: pod1, Pod: pod1,
timestamp: timestamp, Timestamp: timestamp,
} }
pInfo2 := &podInfo{ pInfo2 := &framework.PodInfo{
pod: pod2, Pod: pod2,
timestamp: timestamp.Add(time.Second), Timestamp: timestamp.Add(time.Second),
} }
tests := []struct { tests := []struct {
name string name string
operations []operation operations []operation
operands []*podInfo operands []*framework.PodInfo
expected []*podInfo expected []*framework.PodInfo
}{ }{
{ {
name: "add two pod to activeQ and sort them by the timestamp", name: "add two pod to activeQ and sort them by the timestamp",
@ -1072,8 +1124,8 @@ func TestPodTimestamp(t *testing.T) {
addPodActiveQ, addPodActiveQ,
addPodActiveQ, addPodActiveQ,
}, },
operands: []*podInfo{pInfo2, pInfo1}, operands: []*framework.PodInfo{pInfo2, pInfo1},
expected: []*podInfo{pInfo1, pInfo2}, expected: []*framework.PodInfo{pInfo1, pInfo2},
}, },
{ {
name: "update two pod to activeQ and sort them by the timestamp", name: "update two pod to activeQ and sort them by the timestamp",
@ -1081,8 +1133,8 @@ func TestPodTimestamp(t *testing.T) {
updatePodActiveQ, updatePodActiveQ,
updatePodActiveQ, updatePodActiveQ,
}, },
operands: []*podInfo{pInfo2, pInfo1}, operands: []*framework.PodInfo{pInfo2, pInfo1},
expected: []*podInfo{pInfo1, pInfo2}, expected: []*framework.PodInfo{pInfo1, pInfo2},
}, },
{ {
name: "add two pod to unschedulableQ then move them to activeQ and sort them by the timestamp", name: "add two pod to unschedulableQ then move them to activeQ and sort them by the timestamp",
@ -1091,8 +1143,8 @@ func TestPodTimestamp(t *testing.T) {
addPodUnschedulableQ, addPodUnschedulableQ,
moveAllToActiveQ, moveAllToActiveQ,
}, },
operands: []*podInfo{pInfo2, pInfo1, nil}, operands: []*framework.PodInfo{pInfo2, pInfo1, nil},
expected: []*podInfo{pInfo1, pInfo2}, expected: []*framework.PodInfo{pInfo1, pInfo2},
}, },
{ {
name: "add one pod to BackoffQ and move it to activeQ", name: "add one pod to BackoffQ and move it to activeQ",
@ -1103,15 +1155,15 @@ func TestPodTimestamp(t *testing.T) {
flushBackoffQ, flushBackoffQ,
moveAllToActiveQ, moveAllToActiveQ,
}, },
operands: []*podInfo{pInfo2, pInfo1, pInfo1, nil, nil}, operands: []*framework.PodInfo{pInfo2, pInfo1, pInfo1, nil, nil},
expected: []*podInfo{pInfo1, pInfo2}, expected: []*framework.PodInfo{pInfo1, pInfo2},
}, },
} }
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
queue := NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp)) queue := NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp), nil)
var podInfoList []*podInfo var podInfoList []*framework.PodInfo
for i, op := range test.operations { for i, op := range test.operations {
op(queue, test.operands[i]) op(queue, test.operands[i])
@ -1121,12 +1173,12 @@ func TestPodTimestamp(t *testing.T) {
if pInfo, err := queue.activeQ.Pop(); err != nil { if pInfo, err := queue.activeQ.Pop(); err != nil {
t.Errorf("Error while popping the head of the queue: %v", err) t.Errorf("Error while popping the head of the queue: %v", err)
} else { } else {
podInfoList = append(podInfoList, pInfo.(*podInfo)) podInfoList = append(podInfoList, pInfo.(*framework.PodInfo))
} }
} }
if !reflect.DeepEqual(test.expected, podInfoList) { if !reflect.DeepEqual(test.expected, podInfoList) {
t.Errorf("Unexpected podInfo list. Expected: %v, got: %v", t.Errorf("Unexpected PodInfo list. Expected: %v, got: %v",
test.expected, podInfoList) test.expected, podInfoList)
} }
}) })
@ -1137,24 +1189,24 @@ func TestPodTimestamp(t *testing.T) {
func TestPendingPodsMetric(t *testing.T) { func TestPendingPodsMetric(t *testing.T) {
total := 50 total := 50
timestamp := time.Now() timestamp := time.Now()
var pInfos = make([]*podInfo, 0, total) var pInfos = make([]*framework.PodInfo, 0, total)
for i := 1; i <= total; i++ { for i := 1; i <= total; i++ {
p := &podInfo{ p := &framework.PodInfo{
pod: &v1.Pod{ Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("test-pod-%d", i), Name: fmt.Sprintf("test-pod-%d", i),
Namespace: fmt.Sprintf("ns%d", i), Namespace: fmt.Sprintf("ns%d", i),
UID: types.UID(fmt.Sprintf("tp-%d", i)), UID: types.UID(fmt.Sprintf("tp-%d", i)),
}, },
}, },
timestamp: timestamp, Timestamp: timestamp,
} }
pInfos = append(pInfos, p) pInfos = append(pInfos, p)
} }
tests := []struct { tests := []struct {
name string name string
operations []operation operations []operation
operands [][]*podInfo operands [][]*framework.PodInfo
expected []int64 expected []int64
}{ }{
{ {
@ -1163,7 +1215,7 @@ func TestPendingPodsMetric(t *testing.T) {
addPodActiveQ, addPodActiveQ,
addPodUnschedulableQ, addPodUnschedulableQ,
}, },
operands: [][]*podInfo{ operands: [][]*framework.PodInfo{
pInfos[:30], pInfos[:30],
pInfos[30:], pInfos[30:],
}, },
@ -1177,7 +1229,7 @@ func TestPendingPodsMetric(t *testing.T) {
addPodBackoffQ, addPodBackoffQ,
addPodUnschedulableQ, addPodUnschedulableQ,
}, },
operands: [][]*podInfo{ operands: [][]*framework.PodInfo{
pInfos[:15], pInfos[:15],
pInfos[15:40], pInfos[15:40],
pInfos[15:40], pInfos[15:40],
@ -1191,7 +1243,7 @@ func TestPendingPodsMetric(t *testing.T) {
addPodUnschedulableQ, addPodUnschedulableQ,
moveAllToActiveQ, moveAllToActiveQ,
}, },
operands: [][]*podInfo{ operands: [][]*framework.PodInfo{
pInfos[:total], pInfos[:total],
{nil}, {nil},
}, },
@ -1204,7 +1256,7 @@ func TestPendingPodsMetric(t *testing.T) {
addPodUnschedulableQ, addPodUnschedulableQ,
moveAllToActiveQ, moveAllToActiveQ,
}, },
operands: [][]*podInfo{ operands: [][]*framework.PodInfo{
pInfos[:20], pInfos[:20],
pInfos[:total], pInfos[:total],
{nil}, {nil},
@ -1220,7 +1272,7 @@ func TestPendingPodsMetric(t *testing.T) {
moveAllToActiveQ, moveAllToActiveQ,
flushBackoffQ, flushBackoffQ,
}, },
operands: [][]*podInfo{ operands: [][]*framework.PodInfo{
pInfos[:20], pInfos[:20],
pInfos[:40], pInfos[:40],
pInfos[40:], pInfos[40:],
@ -1240,7 +1292,7 @@ func TestPendingPodsMetric(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
resetMetrics() resetMetrics()
queue := NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp)) queue := NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp), nil)
for i, op := range test.operations { for i, op := range test.operations {
for _, pInfo := range test.operands[i] { for _, pInfo := range test.operands[i] {
op(queue, pInfo) op(queue, pInfo)

View File

@ -641,7 +641,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
framework, _ := framework.NewFramework(EmptyPluginRegistry, nil) framework, _ := framework.NewFramework(EmptyPluginRegistry, nil)
algo := core.NewGenericScheduler( algo := core.NewGenericScheduler(
scache, scache,
internalqueue.NewSchedulingQueue(nil), internalqueue.NewSchedulingQueue(nil, nil),
predicateMap, predicateMap,
predicates.EmptyPredicateMetadataProducer, predicates.EmptyPredicateMetadataProducer,
[]priorities.PriorityConfig{}, []priorities.PriorityConfig{},
@ -694,7 +694,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
framework, _ := framework.NewFramework(EmptyPluginRegistry, nil) framework, _ := framework.NewFramework(EmptyPluginRegistry, nil)
algo := core.NewGenericScheduler( algo := core.NewGenericScheduler(
scache, scache,
internalqueue.NewSchedulingQueue(nil), internalqueue.NewSchedulingQueue(nil, nil),
predicateMap, predicateMap,
predicates.EmptyPredicateMetadataProducer, predicates.EmptyPredicateMetadataProducer,
[]priorities.PriorityConfig{}, []priorities.PriorityConfig{},