From be661fddb49087af113f03b106e827bec9bc4efe Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Fri, 14 Sep 2018 16:59:54 -0700 Subject: [PATCH] shutdown schedulingQueue gracefully - add Close() to interface SchedulingQueue - implement Close() for FIFO and PriorityQueue - add unit test --- pkg/scheduler/core/scheduling_queue.go | 43 ++++++++++++++++++--- pkg/scheduler/core/scheduling_queue_test.go | 38 ++++++++++++++++++ pkg/scheduler/factory/factory.go | 1 + pkg/scheduler/scheduler.go | 4 ++ 4 files changed, 80 insertions(+), 6 deletions(-) diff --git a/pkg/scheduler/core/scheduling_queue.go b/pkg/scheduler/core/scheduling_queue.go index 3c87aa246e..bd91a5381b 100644 --- a/pkg/scheduler/core/scheduling_queue.go +++ b/pkg/scheduler/core/scheduling_queue.go @@ -43,6 +43,10 @@ import ( "k8s.io/kubernetes/pkg/scheduler/util" ) +var ( + queueClosed = "scheduling queue is closed" +) + // SchedulingQueue is an interface for a queue to store pods waiting to be scheduled. // The interface follows a pattern similar to cache.FIFO and cache.Heap and // makes it easy to use those data structures as a SchedulingQueue. @@ -50,6 +54,8 @@ type SchedulingQueue interface { Add(pod *v1.Pod) error AddIfNotPresent(pod *v1.Pod) error AddUnschedulableIfNotPresent(pod *v1.Pod) error + // Pop removes the head of the queue and returns it. It blocks if the + // queue is empty and waits until a new item is added to the queue. Pop() (*v1.Pod, error) Update(oldPod, newPod *v1.Pod) error Delete(pod *v1.Pod) error @@ -58,6 +64,9 @@ type SchedulingQueue interface { AssignedPodUpdated(pod *v1.Pod) WaitingPodsForNode(nodeName string) []*v1.Pod WaitingPods() []*v1.Pod + // Close closes the SchedulingQueue so that the goroutine which is + // waiting to pop items can exit gracefully. + Close() } // NewSchedulingQueue initializes a new scheduling queue. If pod priority is @@ -109,12 +118,11 @@ func (f *FIFO) Delete(pod *v1.Pod) error { // shouldn't be used in production code, but scheduler has always been using it. // This function does minimal error checking. func (f *FIFO) Pop() (*v1.Pod, error) { - var result interface{} - f.FIFO.Pop(func(obj interface{}) error { - result = obj - return nil - }) - return result.(*v1.Pod), nil + result, err := f.FIFO.Pop(func(obj interface{}) error { return nil }) + if err == cache.FIFOClosedError { + return nil, fmt.Errorf(queueClosed) + } + return result.(*v1.Pod), err } // WaitingPods returns all the waiting pods in the queue. @@ -144,6 +152,11 @@ func (f *FIFO) WaitingPodsForNode(nodeName string) []*v1.Pod { return nil } +// Close closes the FIFO queue. +func (f *FIFO) Close() { + f.FIFO.Close() +} + // NewFIFO creates a FIFO object. func NewFIFO() *FIFO { return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)} @@ -179,6 +192,10 @@ type PriorityQueue struct { // pod was in flight (we were trying to schedule it). In such a case, we put // the pod back into the activeQ if it is determined unschedulable. receivedMoveRequest bool + + // closed indicates that the queue is closed. + // It is mainly used to let Pop() exit its control loop while waiting for an item. + closed bool } // Making sure that PriorityQueue implements SchedulingQueue. @@ -312,6 +329,12 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) { p.lock.Lock() defer p.lock.Unlock() for len(p.activeQ.data.queue) == 0 { + // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. + // When Close() is called, the p.closed is set and the condition is broadcast, + // which causes this loop to continue and return from the Pop(). + if p.closed { + return nil, fmt.Errorf(queueClosed) + } p.cond.Wait() } obj, err := p.activeQ.Pop() @@ -485,6 +508,14 @@ func (p *PriorityQueue) WaitingPods() []*v1.Pod { return result } +// Close closes the priority queue. +func (p *PriorityQueue) Close() { + p.lock.Lock() + defer p.lock.Unlock() + p.closed = true + p.cond.Broadcast() +} + // UnschedulablePodsMap holds pods that cannot be scheduled. This data structure // is used to implement unschedulableQ. type UnschedulablePodsMap struct { diff --git a/pkg/scheduler/core/scheduling_queue_test.go b/pkg/scheduler/core/scheduling_queue_test.go index 38aa087419..9233c9969e 100644 --- a/pkg/scheduler/core/scheduling_queue_test.go +++ b/pkg/scheduler/core/scheduling_queue_test.go @@ -17,6 +17,7 @@ limitations under the License. package core import ( + "fmt" "reflect" "sync" "testing" @@ -473,3 +474,40 @@ func TestUnschedulablePodsMap(t *testing.T) { }) } } + +func TestSchedulingQueue_Close(t *testing.T) { + tests := []struct { + name string + q SchedulingQueue + expectedErr error + }{ + { + name: "FIFO close", + q: NewFIFO(), + expectedErr: fmt.Errorf(queueClosed), + }, + { + name: "PriorityQueue close", + q: NewPriorityQueue(), + expectedErr: fmt.Errorf(queueClosed), + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + pod, err := test.q.Pop() + if err.Error() != test.expectedErr.Error() { + t.Errorf("Expected err %q from Pop() if queue is closed, but got %q", test.expectedErr.Error(), err.Error()) + } + if pod != nil { + t.Errorf("Expected pod nil from Pop() if queue is closed, but got: %v", pod) + } + }() + test.q.Close() + wg.Wait() + }) + } +} diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 1995f84df1..a9ba328953 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -332,6 +332,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator { for { select { case <-c.StopEverything: + c.podQueue.Close() return case <-ch: comparer.Compare() diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 2171a49814..c29e8c85e9 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -396,6 +396,10 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error { // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting. func (sched *Scheduler) scheduleOne() { pod := sched.config.NextPod() + // pod could be nil when schedulerQueue is closed + if pod == nil { + return + } if pod.DeletionTimestamp != nil { sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) glog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)