Merge pull request #68700 from Huang-Wei/schedulingQ-graceful-shutdown

shutdown schedulingQueue gracefully
pull/58/head
k8s-ci-robot 2018-09-28 00:46:14 -07:00 committed by GitHub
commit db1d1c8674
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 80 additions and 6 deletions

View File

@ -43,6 +43,10 @@ import (
"k8s.io/kubernetes/pkg/scheduler/util"
)
var (
queueClosed = "scheduling queue is closed"
)
// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
// makes it easy to use those data structures as a SchedulingQueue.
@ -50,6 +54,8 @@ type SchedulingQueue interface {
Add(pod *v1.Pod) error
AddIfNotPresent(pod *v1.Pod) error
AddUnschedulableIfNotPresent(pod *v1.Pod) error
// Pop removes the head of the queue and returns it. It blocks if the
// queue is empty and waits until a new item is added to the queue.
Pop() (*v1.Pod, error)
Update(oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error
@ -58,6 +64,9 @@ type SchedulingQueue interface {
AssignedPodUpdated(pod *v1.Pod)
WaitingPodsForNode(nodeName string) []*v1.Pod
WaitingPods() []*v1.Pod
// Close closes the SchedulingQueue so that the goroutine which is
// waiting to pop items can exit gracefully.
Close()
}
// NewSchedulingQueue initializes a new scheduling queue. If pod priority is
@ -109,12 +118,11 @@ func (f *FIFO) Delete(pod *v1.Pod) error {
// shouldn't be used in production code, but scheduler has always been using it.
// This function does minimal error checking.
func (f *FIFO) Pop() (*v1.Pod, error) {
var result interface{}
f.FIFO.Pop(func(obj interface{}) error {
result = obj
return nil
})
return result.(*v1.Pod), nil
result, err := f.FIFO.Pop(func(obj interface{}) error { return nil })
if err == cache.FIFOClosedError {
return nil, fmt.Errorf(queueClosed)
}
return result.(*v1.Pod), err
}
// WaitingPods returns all the waiting pods in the queue.
@ -144,6 +152,11 @@ func (f *FIFO) WaitingPodsForNode(nodeName string) []*v1.Pod {
return nil
}
// Close closes the FIFO queue.
func (f *FIFO) Close() {
f.FIFO.Close()
}
// NewFIFO creates a FIFO object.
func NewFIFO() *FIFO {
return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
@ -179,6 +192,10 @@ type PriorityQueue struct {
// pod was in flight (we were trying to schedule it). In such a case, we put
// the pod back into the activeQ if it is determined unschedulable.
receivedMoveRequest bool
// closed indicates that the queue is closed.
// It is mainly used to let Pop() exit its control loop while waiting for an item.
closed bool
}
// Making sure that PriorityQueue implements SchedulingQueue.
@ -312,6 +329,12 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) {
p.lock.Lock()
defer p.lock.Unlock()
for len(p.activeQ.data.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the p.closed is set and the condition is broadcast,
// which causes this loop to continue and return from the Pop().
if p.closed {
return nil, fmt.Errorf(queueClosed)
}
p.cond.Wait()
}
obj, err := p.activeQ.Pop()
@ -485,6 +508,14 @@ func (p *PriorityQueue) WaitingPods() []*v1.Pod {
return result
}
// Close closes the priority queue.
func (p *PriorityQueue) Close() {
p.lock.Lock()
defer p.lock.Unlock()
p.closed = true
p.cond.Broadcast()
}
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
// is used to implement unschedulableQ.
type UnschedulablePodsMap struct {

View File

@ -17,6 +17,7 @@ limitations under the License.
package core
import (
"fmt"
"reflect"
"sync"
"testing"
@ -473,3 +474,40 @@ func TestUnschedulablePodsMap(t *testing.T) {
})
}
}
func TestSchedulingQueue_Close(t *testing.T) {
tests := []struct {
name string
q SchedulingQueue
expectedErr error
}{
{
name: "FIFO close",
q: NewFIFO(),
expectedErr: fmt.Errorf(queueClosed),
},
{
name: "PriorityQueue close",
q: NewPriorityQueue(),
expectedErr: fmt.Errorf(queueClosed),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
pod, err := test.q.Pop()
if err.Error() != test.expectedErr.Error() {
t.Errorf("Expected err %q from Pop() if queue is closed, but got %q", test.expectedErr.Error(), err.Error())
}
if pod != nil {
t.Errorf("Expected pod nil from Pop() if queue is closed, but got: %v", pod)
}
}()
test.q.Close()
wg.Wait()
})
}
}

View File

@ -318,6 +318,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator {
for {
select {
case <-c.StopEverything:
c.podQueue.Close()
return
case <-ch:
comparer.Compare()

View File

@ -396,6 +396,10 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
pod := sched.config.NextPod()
// pod could be nil when schedulerQueue is closed
if pod == nil {
return
}
if pod.DeletionTimestamp != nil {
sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
glog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)