Merge pull request #76301 from xiuqiaoli/master

Remove FIFO scheduling queue and old pod backoff logic
k3s-v1.15.3
Kubernetes Prow Robot 2019-04-24 19:36:44 -07:00 committed by GitHub
commit a0d8278346
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 120 additions and 193 deletions

View File

@ -21,7 +21,6 @@ go_library(
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/plugins:go_default_library",
"//pkg/scheduler/plugins/v1alpha1:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
@ -68,6 +67,7 @@ go_test(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",

View File

@ -56,7 +56,6 @@ import (
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/plugins"
pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
@ -456,7 +455,6 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
c.percentageOfNodesToScore,
)
podBackoff := internalqueue.NewPodBackoffMap(1*time.Second, 60*time.Second)
return &Config{
SchedulerCache: c.schedulerCache,
// The scheduler only needs to consider schedulable nodes.
@ -470,7 +468,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
},
NextPod: internalqueue.MakeNextPodFunc(c.podQueue),
Error: MakeDefaultErrorFunc(c.client, podBackoff, c.podQueue, c.schedulerCache, c.StopEverything),
Error: MakeDefaultErrorFunc(c.client, c.podQueue, c.schedulerCache, c.StopEverything),
StopEverything: c.StopEverything,
VolumeBinder: c.volumeBinder,
SchedulingQueue: c.podQueue,
@ -639,7 +637,7 @@ func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) core
}
// MakeDefaultErrorFunc construct a function to handle pod scheduler error
func MakeDefaultErrorFunc(client clientset.Interface, backoff *internalqueue.PodBackoffMap, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache, stopEverything <-chan struct{}) func(pod *v1.Pod, err error) {
func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache, stopEverything <-chan struct{}) func(pod *v1.Pod, err error) {
return func(pod *v1.Pod, err error) {
if err == core.ErrNoNodesAvailable {
klog.V(4).Infof("Unable to schedule %v/%v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name)
@ -662,7 +660,6 @@ func MakeDefaultErrorFunc(client clientset.Interface, backoff *internalqueue.Pod
}
}
backoff.CleanupPodsCompletesBackingoff()
podSchedulingCycle := podQueue.SchedulingCycle()
// Retry asynchronously.
// Note that this is extremely rudimentary and we need a more real error handling path.
@ -673,16 +670,9 @@ func MakeDefaultErrorFunc(client clientset.Interface, backoff *internalqueue.Pod
Name: pod.Name,
}
// When pod priority is enabled, we would like to place an unschedulable
// pod in the unschedulable queue. This ensures that if the pod is nominated
// to run on a node, scheduler takes the pod into account when running
// predicates for the node.
if !util.PodPriorityEnabled() {
if !backoff.TryBackoffAndWait(podID, stopEverything) {
klog.Warningf("Request for pod %v already in flight, abandoning", podID)
return
}
}
// An unschedulable pod will be placed in the unschedulable queue.
// This ensures that if the pod is nominated to run on a node,
// scheduler takes the pod into account when running predicates for the node.
// Get the pod again; it may have changed/been scheduled already.
getBackoff := initialGetBackoff
for {

View File

@ -26,6 +26,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
@ -252,48 +253,125 @@ func TestDefaultErrorFunc(t *testing.T) {
client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}})
stopCh := make(chan struct{})
defer close(stopCh)
queue := &internalqueue.FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
schedulerCache := internalcache.New(30*time.Second, stopCh)
podBackoff := internalqueue.NewPodBackoffMap(1*time.Second, 60*time.Second)
errFunc := MakeDefaultErrorFunc(client, podBackoff, queue, schedulerCache, stopCh)
timestamp := time.Now()
queue := internalqueue.NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp))
schedulerCache := internalcache.New(30*time.Second, stopCh)
errFunc := MakeDefaultErrorFunc(client, queue, schedulerCache, stopCh)
// Trigger error handling again to put the pod in unschedulable queue
errFunc(testPod, nil)
for {
// This is a terrible way to do this but I plan on replacing this
// whole error handling system in the future. The test will time
// out if something doesn't work.
time.Sleep(10 * time.Millisecond)
got, exists, _ := queue.Get(testPod)
if !exists {
// Try up to a minute to retrieve the error pod from priority queue
foundPodFlag := false
maxIterations := 10 * 60
for i := 0; i < maxIterations; i++ {
time.Sleep(100 * time.Millisecond)
got := getPodfromPriorityQueue(queue, testPod)
if got == nil {
continue
}
requestReceived := false
actions := client.Actions()
for _, a := range actions {
if a.GetVerb() == "get" {
getAction, ok := a.(clienttesting.GetAction)
if !ok {
t.Errorf("Can't cast action object to GetAction interface")
break
}
name := getAction.GetName()
ns := a.GetNamespace()
if name != "foo" || ns != "bar" {
t.Errorf("Expected name %s namespace %s, got %s %s",
"foo", "bar", name, ns)
}
requestReceived = true
}
}
if !requestReceived {
t.Errorf("Get pod request not received")
}
testClientGetPodRequest(client, t, testPod.Namespace, testPod.Name)
if e, a := testPod, got; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
foundPodFlag = true
break
}
if !foundPodFlag {
t.Errorf("Failed to get pod from the unschedulable queue after waiting for a minute: %v", testPod)
}
// Remove the pod from priority queue to test putting error
// pod in backoff queue.
queue.Delete(testPod)
// Trigger a move request
queue.MoveAllToActiveQueue()
// Trigger error handling again to put the pod in backoff queue
errFunc(testPod, nil)
foundPodFlag = false
for i := 0; i < maxIterations; i++ {
time.Sleep(100 * time.Millisecond)
// The pod should be found from backoff queue at this time
got := getPodfromPriorityQueue(queue, testPod)
if got == nil {
continue
}
testClientGetPodRequest(client, t, testPod.Namespace, testPod.Name)
if e, a := testPod, got; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
foundPodFlag = true
break
}
if !foundPodFlag {
t.Errorf("Failed to get pod from the backoff queue after waiting for a minute: %v", testPod)
}
}
// getPodfromPriorityQueue is the function used in the TestDefaultErrorFunc test to get
// the specific pod from the given priority queue. It returns the found pod in the priority queue.
func getPodfromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v1.Pod {
podList := queue.PendingPods()
if len(podList) == 0 {
return nil
}
queryPodKey, err := cache.MetaNamespaceKeyFunc(pod)
if err != nil {
return nil
}
for _, foundPod := range podList {
foundPodKey, err := cache.MetaNamespaceKeyFunc(foundPod)
if err != nil {
return nil
}
if foundPodKey == queryPodKey {
return foundPod
}
}
return nil
}
// testClientGetPodRequest function provides a routine used by TestDefaultErrorFunc test.
// It tests whether the fake client can receive request and correctly "get" the namespace
// and name of the error pod.
func testClientGetPodRequest(client *fake.Clientset, t *testing.T, podNs string, podName string) {
requestReceived := false
actions := client.Actions()
for _, a := range actions {
if a.GetVerb() == "get" {
getAction, ok := a.(clienttesting.GetAction)
if !ok {
t.Errorf("Can't cast action object to GetAction interface")
break
}
name := getAction.GetName()
ns := a.GetNamespace()
if name != podName || ns != podNs {
t.Errorf("Expected name %s namespace %s, got %s %s",
podName, podNs, name, ns)
}
requestReceived = true
}
}
if !requestReceived {
t.Errorf("Get pod request not received")
}
}
func TestBind(t *testing.T) {

View File

@ -60,20 +60,6 @@ func (pbm *PodBackoffMap) GetBackoffTime(nsPod ktypes.NamespacedName) (time.Time
return backoffTime, true
}
// TryBackoffAndWait tries to perform backoff for a non-preempting pod.
// it is invoked from factory.go if util.PodPriorityEnabled() returns false.
func (pbm *PodBackoffMap) TryBackoffAndWait(nsPod ktypes.NamespacedName, stop <-chan struct{}) bool {
pbm.lock.RLock()
defer pbm.lock.RUnlock()
backoffDuration := pbm.calculateBackoffDuration(nsPod)
select {
case <-time.After(backoffDuration):
return true
case <-stop:
return false
}
}
// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (pbm *PodBackoffMap) calculateBackoffDuration(nsPod ktypes.NamespacedName) time.Duration {

View File

@ -92,18 +92,3 @@ func TestClearPodBackoff(t *testing.T) {
t.Errorf("Expected backoff of 1s for pod %s, got %s", podID, duration.String())
}
}
func TestTryBackoffAndWait(t *testing.T) {
bpm := NewPodBackoffMap(1*time.Second, 60*time.Second)
stopCh := make(chan struct{})
podID := ktypes.NamespacedName{Namespace: "ns", Name: "pod"}
if !bpm.TryBackoffAndWait(podID, stopCh) {
t.Error("Expected TryBackoffAndWait success for new pod, got failure.")
}
close(stopCh)
if bpm.TryBackoffAndWait(podID, stopCh) {
t.Error("Expected TryBackoffAndWait failure with closed stopCh, got success.")
}
}

View File

@ -15,14 +15,11 @@ limitations under the License.
*/
// This file contains structures that implement scheduling queue types.
// Scheduling queues hold pods waiting to be scheduled. This file has two types
// of scheduling queue: 1) a FIFO, which is mostly the same as cache.FIFO, 2) a
// Scheduling queues hold pods waiting to be scheduled. This file implements a
// priority queue which has two sub queues. One sub-queue holds pods that are
// being considered for scheduling. This is called activeQ. Another queue holds
// pods that are already tried and are determined to be unschedulable. The latter
// is called unschedulableQ.
// FIFO is here for flag-gating purposes and allows us to use the traditional
// scheduling queue when util.PodPriorityEnabled() returns false.
package queue
@ -89,113 +86,9 @@ type SchedulingQueue interface {
NumUnschedulablePods() int
}
// NewSchedulingQueue initializes a new scheduling queue. If pod priority is
// enabled a priority queue is returned. If it is disabled, a FIFO is returned.
// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
func NewSchedulingQueue(stop <-chan struct{}) SchedulingQueue {
if util.PodPriorityEnabled() {
return NewPriorityQueue(stop)
}
return NewFIFO()
}
// FIFO is basically a simple wrapper around cache.FIFO to make it compatible
// with the SchedulingQueue interface.
type FIFO struct {
*cache.FIFO
}
var _ = SchedulingQueue(&FIFO{}) // Making sure that FIFO implements SchedulingQueue.
// Add adds a pod to the FIFO.
func (f *FIFO) Add(pod *v1.Pod) error {
return f.FIFO.Add(pod)
}
// AddIfNotPresent adds a pod to the FIFO if it is absent in the FIFO.
func (f *FIFO) AddIfNotPresent(pod *v1.Pod) error {
return f.FIFO.AddIfNotPresent(pod)
}
// AddUnschedulableIfNotPresent adds an unschedulable pod back to the queue. In
// FIFO it is added to the end of the queue.
func (f *FIFO) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
return f.FIFO.AddIfNotPresent(pod)
}
// SchedulingCycle implements SchedulingQueue.SchedulingCycle interface.
func (f *FIFO) SchedulingCycle() int64 {
return 0
}
// Update updates a pod in the FIFO.
func (f *FIFO) Update(oldPod, newPod *v1.Pod) error {
return f.FIFO.Update(newPod)
}
// Delete deletes a pod in the FIFO.
func (f *FIFO) Delete(pod *v1.Pod) error {
return f.FIFO.Delete(pod)
}
// Pop removes the head of FIFO and returns it.
// This is just a copy/paste of cache.Pop(queue Queue) from fifo.go that scheduler
// has always been using. There is a comment in that file saying that this method
// shouldn't be used in production code, but scheduler has always been using it.
// This function does minimal error checking.
func (f *FIFO) Pop() (*v1.Pod, error) {
result, err := f.FIFO.Pop(func(obj interface{}) error { return nil })
if err == cache.FIFOClosedError {
return nil, fmt.Errorf(queueClosed)
}
return result.(*v1.Pod), err
}
// PendingPods returns all the pods in the queue.
func (f *FIFO) PendingPods() []*v1.Pod {
result := []*v1.Pod{}
for _, pod := range f.FIFO.List() {
result = append(result, pod.(*v1.Pod))
}
return result
}
// FIFO does not need to react to events, as all pods are always in the active
// scheduling queue anyway.
// AssignedPodAdded does nothing here.
func (f *FIFO) AssignedPodAdded(pod *v1.Pod) {}
// AssignedPodUpdated does nothing here.
func (f *FIFO) AssignedPodUpdated(pod *v1.Pod) {}
// MoveAllToActiveQueue does nothing in FIFO as all pods are always in the active queue.
func (f *FIFO) MoveAllToActiveQueue() {}
// NominatedPodsForNode returns pods that are nominated to run on the given node,
// but FIFO does not support it.
func (f *FIFO) NominatedPodsForNode(nodeName string) []*v1.Pod {
return nil
}
// Close closes the FIFO queue.
func (f *FIFO) Close() {
f.FIFO.Close()
}
// DeleteNominatedPodIfExists does nothing in FIFO.
func (f *FIFO) DeleteNominatedPodIfExists(pod *v1.Pod) {}
// UpdateNominatedPodForNode does nothing in FIFO.
func (f *FIFO) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {}
// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
func (f *FIFO) NumUnschedulablePods() int {
return 0
}
// NewFIFO creates a FIFO object.
func NewFIFO() *FIFO {
return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
return NewPriorityQueue(stop)
}
// NominatedNodeName returns nominated node name of a Pod.
@ -203,7 +96,7 @@ func NominatedNodeName(pod *v1.Pod) string {
return pod.Status.NominatedNodeName
}
// PriorityQueue implements a scheduling queue. It is an alternative to FIFO.
// PriorityQueue implements a scheduling queue.
// The head of PriorityQueue is the highest priority pending pod. This structure
// has three sub queues. One sub-queue holds pods that are being considered for
// scheduling. This is called activeQ and is a Heap. Another queue holds

View File

@ -688,11 +688,6 @@ func TestSchedulingQueue_Close(t *testing.T) {
q SchedulingQueue
expectedErr error
}{
{
name: "FIFO close",
q: NewFIFO(),
expectedErr: fmt.Errorf(queueClosed),
},
{
name: "PriorityQueue close",
q: NewPriorityQueue(nil),