mirror of https://github.com/k3s-io/k3s
Improve scheduling queue's logic
parent
c93cc804c4
commit
bba9b12d0c
|
@ -380,10 +380,10 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
|
|||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
p.deleteNominatedPodIfExists(pod)
|
||||
if _, exists, _ := p.activeQ.Get(pod); exists {
|
||||
return p.activeQ.Delete(pod)
|
||||
err := p.activeQ.Delete(pod)
|
||||
if err != nil { // The item was probably not found in the activeQ.
|
||||
p.unschedulableQ.Delete(pod)
|
||||
}
|
||||
p.unschedulableQ.Delete(pod)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -410,11 +410,11 @@ func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
|
|||
func (p *PriorityQueue) MoveAllToActiveQueue() {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
var unschedulablePods []interface{}
|
||||
for _, pod := range p.unschedulableQ.pods {
|
||||
unschedulablePods = append(unschedulablePods, pod)
|
||||
if err := p.activeQ.Add(pod); err != nil {
|
||||
glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
|
||||
}
|
||||
}
|
||||
p.activeQ.BulkAdd(unschedulablePods)
|
||||
p.unschedulableQ.Clear()
|
||||
p.receivedMoveRequest = true
|
||||
p.cond.Broadcast()
|
||||
|
@ -424,8 +424,11 @@ func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
|
|||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
for _, pod := range pods {
|
||||
p.activeQ.Add(pod)
|
||||
p.unschedulableQ.Delete(pod)
|
||||
if err := p.activeQ.Add(pod); err == nil {
|
||||
p.unschedulableQ.Delete(pod)
|
||||
} else {
|
||||
glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
|
||||
}
|
||||
}
|
||||
p.receivedMoveRequest = true
|
||||
p.cond.Broadcast()
|
||||
|
@ -449,6 +452,7 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod
|
|||
}
|
||||
if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
|
||||
podsToMove = append(podsToMove, up)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -645,23 +649,6 @@ func (h *Heap) Add(obj interface{}) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// BulkAdd adds all the items in the list to the queue.
|
||||
func (h *Heap) BulkAdd(list []interface{}) error {
|
||||
for _, obj := range list {
|
||||
key, err := h.data.keyFunc(obj)
|
||||
if err != nil {
|
||||
return cache.KeyError{Obj: obj, Err: err}
|
||||
}
|
||||
if _, exists := h.data.items[key]; exists {
|
||||
h.data.items[key].obj = obj
|
||||
heap.Fix(h.data, h.data.items[key].index)
|
||||
} else {
|
||||
heap.Push(h.data, &itemKeyValue{key, obj})
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddIfNotPresent inserts an item, and puts it in the queue. If an item with
|
||||
// the key is present in the map, no changes is made to the item.
|
||||
func (h *Heap) AddIfNotPresent(obj interface{}) error {
|
||||
|
|
Loading…
Reference in New Issue