mirror of https://github.com/k3s-io/k3s

Merge pull request #70898 from Huang-Wei/preemption-issue

ensure scheduler preemptor behaves in an efficient/correct path

commit 1f3057b7fb
@@ -494,7 +494,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
 // addNominatedPods adds pods with equal or greater priority which are nominated
 // to run on the node given in nodeInfo to meta and nodeInfo. It returns 1) whether
 // any pod was found, 2) augmented meta data, 3) augmented nodeInfo.
-func addNominatedPods(podPriority int32, meta algorithm.PredicateMetadata,
+func addNominatedPods(pod *v1.Pod, meta algorithm.PredicateMetadata,
 	nodeInfo *schedulercache.NodeInfo, queue internalqueue.SchedulingQueue) (bool, algorithm.PredicateMetadata,
 	*schedulercache.NodeInfo) {
 	if queue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
@@ -511,7 +511,7 @@ func addNominatedPods(podPriority int32, meta algorithm.PredicateMetadata,
 	}
 	nodeInfoOut := nodeInfo.Clone()
 	for _, p := range nominatedPods {
-		if util.GetPodPriority(p) >= podPriority {
+		if util.GetPodPriority(p) >= util.GetPodPriority(pod) && p.UID != pod.UID {
 			nodeInfoOut.AddPod(p)
 			if metaOut != nil {
 				metaOut.AddPod(p, nodeInfoOut)
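The two hunks above carry the core fix: addNominatedPods now receives the pod itself rather than only its priority, so besides the priority comparison it can check UIDs and skip the preemptor's own nominated copy; without that guard, a pod evaluated on its nominated node competed against itself. The call site in podFitsOnNode (next hunk) is updated to match. A minimal standalone sketch of the new filter, using simplified types rather than the upstream API:

```go
// fakePod stands in for *v1.Pod; only the fields the guard needs.
type fakePod struct {
	UID      string
	Priority int32
}

// nominatedToAdd mirrors the new condition: nominated pods of equal or
// greater priority count against the node, except the pod being scheduled.
func nominatedToAdd(pod fakePod, nominated []fakePod) []fakePod {
	var out []fakePod
	for _, p := range nominated {
		if p.Priority >= pod.Priority && p.UID != pod.UID {
			out = append(out, p)
		}
	}
	return out
}
```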
@@ -569,7 +569,7 @@ func podFitsOnNode(
 		metaToUse := meta
 		nodeInfoToUse := info
 		if i == 0 {
-			podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(util.GetPodPriority(pod), meta, info, queue)
+			podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
 		} else if !podsAdded || len(failedPredicates) != 0 {
 			break
 		}
@@ -135,6 +135,9 @@ type Config struct {
 
 	// Disable pod preemption or not.
 	DisablePreemption bool
+
+	// SchedulingQueue holds pods to be scheduled
+	SchedulingQueue internalqueue.SchedulingQueue
 }
 
 // PodPreemptor has methods needed to delete a pod and to update
@@ -1264,6 +1267,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
 		Error:           c.MakeDefaultErrorFunc(podBackoff, c.podQueue),
 		StopEverything:  c.StopEverything,
 		VolumeBinder:    c.volumeBinder,
+		SchedulingQueue: c.podQueue,
 	}, nil
 }
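These two hunks are plumbing: Config gains a SchedulingQueue field, and CreateFromKeys fills it with the factory's podQueue, so the scheduler proper can reach back into the queue once a pod is assumed (see the final hunk of this diff). A rough sketch of the shape, with illustrative names only:

```go
// Illustrative only: the queue the scheduler pops pods from is the same
// instance exposed on its config, so later stages can clear nomination
// state on it.
type nominator interface {
	DeleteNominatedPodIfExists(uid string)
}

type config struct {
	schedulingQueue nominator
}

func createFromKeys(podQueue nominator) *config {
	return &config{schedulingQueue: podQueue}
}
```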
@@ -67,6 +67,8 @@ type SchedulingQueue interface {
 	// Close closes the SchedulingQueue so that the goroutine which is
 	// waiting to pop items can exit gracefully.
 	Close()
+	// DeleteNominatedPodIfExists deletes nominatedPod from internal cache
+	DeleteNominatedPodIfExists(pod *v1.Pod)
 }
 
 // NewSchedulingQueue initializes a new scheduling queue. If pod priority is
@@ -157,6 +159,9 @@ func (f *FIFO) Close() {
 	f.FIFO.Close()
 }
 
+// DeleteNominatedPodIfExists does nothing in FIFO.
+func (f *FIFO) DeleteNominatedPodIfExists(pod *v1.Pod) {}
+
 // NewFIFO creates a FIFO object.
 func NewFIFO() *FIFO {
 	return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
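FIFO has no notion of priority or nomination, so it satisfies the widened SchedulingQueue interface with a no-op. A compile-time assertion is the usual Go idiom for catching an implementation that missed the new method; something like the following (not part of this diff) would surface the break at build time:

```go
// Compile-time checks that both implementations still satisfy the
// interface after DeleteNominatedPodIfExists was added to it.
var _ SchedulingQueue = &FIFO{}
var _ SchedulingQueue = &PriorityQueue{}
```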
@@ -219,7 +224,7 @@ func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
 	if len(nnn) > 0 {
 		for _, np := range p.nominatedPods[nnn] {
 			if np.UID == pod.UID {
-				klog.Errorf("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name)
+				klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name)
 				return
 			}
 		}
@@ -228,6 +233,7 @@ func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
 }
 
 // deleteNominatedPodIfExists deletes a pod from the nominatedPods.
+// NOTE: this function assumes lock has been acquired in caller.
 func (p *PriorityQueue) deleteNominatedPodIfExists(pod *v1.Pod) {
 	nnn := NominatedNodeName(pod)
 	if len(nnn) > 0 {
@@ -342,7 +348,6 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) {
 		return nil, err
 	}
 	pod := obj.(*v1.Pod)
-	p.deleteNominatedPodIfExists(pod)
 	p.receivedMoveRequest = false
 	return pod, err
 }
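Dropping deleteNominatedPodIfExists from Pop changes the lifetime of a nomination: before, the entry disappeared the moment the pod was popped for a scheduling attempt, so while the attempt was in flight, or failing and being retried, other pods no longer saw the preemptor's claim on its nominated node and could be scheduled into the space it had freed by preemption. After this change the entry survives until the pod is actually assumed onto a node (the assume hunk at the end of this diff). It also explains the earlier log-level change: re-adding a pod whose nomination is already recorded is now an expected event, not an error. In outline, with simplified names:

```go
// before: nomination cleared too early
//	pod := q.Pop()         // entry removed here
//	schedule(pod)          // concurrent pods may ignore pod's claim
//
// after: nomination cleared only on success
//	pod := q.Pop()         // entry kept
//	host := schedule(pod)  // claim stays visible while this pod retries
//	assume(pod, host)      // -> SchedulingQueue.DeleteNominatedPodIfExists(pod)
```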
@@ -411,13 +416,17 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
 // AssignedPodAdded is called when a bound pod is added. Creation of this pod
 // may make pending pods with matching affinity terms schedulable.
 func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
+	p.lock.Lock()
 	p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
+	p.lock.Unlock()
 }
 
 // AssignedPodUpdated is called when a bound pod is updated. Change of labels
 // may make pending pods with matching affinity terms schedulable.
 func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
+	p.lock.Lock()
 	p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
+	p.lock.Unlock()
 }
 
 // MoveAllToActiveQueue moves all pods from unschedulableQ to activeQ. This
@@ -441,9 +450,8 @@ func (p *PriorityQueue) MoveAllToActiveQueue() {
 	p.cond.Broadcast()
 }
 
+// NOTE: this function assumes lock has been acquired in caller
 func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
-	p.lock.Lock()
-	defer p.lock.Unlock()
 	for _, pod := range pods {
 		if err := p.activeQ.Add(pod); err == nil {
 			p.unschedulableQ.delete(pod)
@@ -457,9 +465,8 @@ func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
 
 // getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
 // any affinity term that matches "pod".
+// NOTE: this function assumes lock has been acquired in caller.
 func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*v1.Pod {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
 	var podsToMove []*v1.Pod
 	for _, up := range p.unschedulableQ.pods {
 		affinity := up.Spec.Affinity
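The three hunks above move locking one level up: previously AssignedPodAdded and AssignedPodUpdated ran two separate critical sections (a read-locked scan in getUnschedulablePodsWithMatchingAffinityTerm, then a write-locked move in movePodsToActiveQueue), so the queue could change between the scan and the move. Now the exported methods take the lock once and the unexported helpers document that they assume it is held; the new DeleteNominatedPodIfExists below follows the same wrapper pattern. A standalone sketch of the idiom, not the upstream code:

```go
package queue

import "sync"

type store struct {
	mu      sync.Mutex
	pending map[string]int
	active  map[string]int
}

// Promote is exported and owns the lock, so scan and move are atomic.
func (s *store) Promote(threshold int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, k := range s.matching(threshold) {
		s.move(k)
	}
}

// NOTE: assumes s.mu is held by the caller.
func (s *store) matching(threshold int) []string {
	var keys []string
	for k, v := range s.pending {
		if v >= threshold {
			keys = append(keys, k)
		}
	}
	return keys
}

// NOTE: assumes s.mu is held by the caller.
func (s *store) move(k string) {
	s.active[k] = s.pending[k]
	delete(s.pending, k)
}
```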
@@ -516,6 +523,13 @@ func (p *PriorityQueue) Close() {
 	p.cond.Broadcast()
 }
 
+// DeleteNominatedPodIfExists deletes pod from internal cache if it's a nominatedPod
+func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
+	p.lock.Lock()
+	p.deleteNominatedPodIfExists(pod)
+	p.lock.Unlock()
+}
+
 // UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
 // is used to implement unschedulableQ.
 type UnschedulablePodsMap struct {
@@ -112,8 +112,8 @@ func TestPriorityQueue_Add(t *testing.T) {
 	if p, err := q.Pop(); err != nil || p != &unschedulablePod {
 		t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name)
 	}
-	if len(q.nominatedPods) != 0 {
-		t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
+	if len(q.nominatedPods["node1"]) != 2 {
+		t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods["node1"])
 	}
 }
@@ -135,8 +135,8 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) {
 	if p, err := q.Pop(); err != nil || p != &unschedulablePod {
 		t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name)
 	}
-	if len(q.nominatedPods) != 0 {
-		t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
+	if len(q.nominatedPods["node1"]) != 2 {
+		t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods["node1"])
 	}
 	if q.unschedulableQ.get(&highPriNominatedPod) != &highPriNominatedPod {
 		t.Errorf("Pod %v was not found in the unschedulableQ.", highPriNominatedPod.Name)
@@ -178,8 +178,8 @@ func TestPriorityQueue_Pop(t *testing.T) {
 		if p, err := q.Pop(); err != nil || p != &medPriorityPod {
 			t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
 		}
-		if len(q.nominatedPods) != 0 {
-			t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
+		if len(q.nominatedPods["node1"]) != 1 {
+			t.Errorf("Expected medPriorityPod to be present in nomindatePods: %v", q.nominatedPods["node1"])
 		}
 	}()
 	q.Add(&medPriorityPod)
@@ -447,6 +447,10 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
 		})
 		return err
 	}
+	// if "assumed" is a nominated pod, we should remove it from internal cache
+	if sched.config.SchedulingQueue != nil {
+		sched.config.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
+	}
 
 	// Optimistically assume that the binding will succeed, so we need to invalidate affected
 	// predicates in equivalence cache.