From c821f2ed2f509435c37eabbc39e6b69e78169206 Mon Sep 17 00:00:00 2001 From: Gregory Haynes Date: Sat, 9 Dec 2017 23:09:48 +0000 Subject: [PATCH 1/4] Move scheduling Heap in to scheduler.core.utils The Heap data structure is useful for our backoff system in addition to scheduling queue. Move it to somewhere it can be consumed by both systems and properly export needed names. Also adding unit tests from client-go/tools/cache/heap.go. --- .../internal/queue/scheduling_queue.go | 204 +------------ .../internal/queue/scheduling_queue_test.go | 4 +- pkg/scheduler/util/BUILD | 4 + pkg/scheduler/util/heap.go | 223 ++++++++++++++ pkg/scheduler/util/heap_test.go | 271 ++++++++++++++++++ 5 files changed, 503 insertions(+), 203 deletions(-) create mode 100644 pkg/scheduler/util/heap.go create mode 100644 pkg/scheduler/util/heap_test.go diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 9bf40361b4..0b4728851b 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -27,7 +27,6 @@ limitations under the License. package queue import ( - "container/heap" "fmt" "reflect" "sync" @@ -184,7 +183,7 @@ type PriorityQueue struct { // activeQ is heap structure that scheduler actively looks at to find pods to // schedule. Head of heap is the highest priority pod. - activeQ *Heap + activeQ *util.Heap // unschedulableQ holds pods that have been tried and determined unschedulable. unschedulableQ *UnschedulablePodsMap // nominatedPods is a map keyed by a node name and the value is a list of @@ -230,7 +229,7 @@ func activeQComp(pod1, pod2 interface{}) bool { // NewPriorityQueue creates a PriorityQueue object. func NewPriorityQueue() *PriorityQueue { pq := &PriorityQueue{ - activeQ: newHeap(cache.MetaNamespaceKeyFunc, activeQComp), + activeQ: util.NewHeap(cache.MetaNamespaceKeyFunc, activeQComp), unschedulableQ: newUnschedulablePodsMap(), nominatedPods: map[string][]*v1.Pod{}, } @@ -355,7 +354,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error { func (p *PriorityQueue) Pop() (*v1.Pod, error) { p.lock.Lock() defer p.lock.Unlock() - for len(p.activeQ.data.queue) == 0 { + for p.activeQ.Len() == 0 { // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the p.closed is set and the condition is broadcast, // which causes this loop to continue and return from the Pop(). @@ -591,200 +590,3 @@ func newUnschedulablePodsMap() *UnschedulablePodsMap { keyFunc: util.GetPodFullName, } } - -// Below is the implementation of the a heap. The logic is pretty much the same -// as cache.heap, however, this heap does not perform synchronization. It leaves -// synchronization to the SchedulingQueue. - -// LessFunc is a function type to compare two objects. -type LessFunc func(interface{}, interface{}) bool - -// KeyFunc is a function type to get the key from an object. -type KeyFunc func(obj interface{}) (string, error) - -type heapItem struct { - obj interface{} // The object which is stored in the heap. - index int // The index of the object's key in the Heap.queue. -} - -type itemKeyValue struct { - key string - obj interface{} -} - -// heapData is an internal struct that implements the standard heap interface -// and keeps the data stored in the heap. -type heapData struct { - // items is a map from key of the objects to the objects and their index. 
- // We depend on the property that items in the map are in the queue and vice versa. - items map[string]*heapItem - // queue implements a heap data structure and keeps the order of elements - // according to the heap invariant. The queue keeps the keys of objects stored - // in "items". - queue []string - - // keyFunc is used to make the key used for queued item insertion and retrieval, and - // should be deterministic. - keyFunc KeyFunc - // lessFunc is used to compare two objects in the heap. - lessFunc LessFunc -} - -var ( - _ = heap.Interface(&heapData{}) // heapData is a standard heap -) - -// Less compares two objects and returns true if the first one should go -// in front of the second one in the heap. -func (h *heapData) Less(i, j int) bool { - if i > len(h.queue) || j > len(h.queue) { - return false - } - itemi, ok := h.items[h.queue[i]] - if !ok { - return false - } - itemj, ok := h.items[h.queue[j]] - if !ok { - return false - } - return h.lessFunc(itemi.obj, itemj.obj) -} - -// Len returns the number of items in the Heap. -func (h *heapData) Len() int { return len(h.queue) } - -// Swap implements swapping of two elements in the heap. This is a part of standard -// heap interface and should never be called directly. -func (h *heapData) Swap(i, j int) { - h.queue[i], h.queue[j] = h.queue[j], h.queue[i] - item := h.items[h.queue[i]] - item.index = i - item = h.items[h.queue[j]] - item.index = j -} - -// Push is supposed to be called by heap.Push only. -func (h *heapData) Push(kv interface{}) { - keyValue := kv.(*itemKeyValue) - n := len(h.queue) - h.items[keyValue.key] = &heapItem{keyValue.obj, n} - h.queue = append(h.queue, keyValue.key) -} - -// Pop is supposed to be called by heap.Pop only. -func (h *heapData) Pop() interface{} { - key := h.queue[len(h.queue)-1] - h.queue = h.queue[0 : len(h.queue)-1] - item, ok := h.items[key] - if !ok { - // This is an error - return nil - } - delete(h.items, key) - return item.obj -} - -// Heap is a producer/consumer queue that implements a heap data structure. -// It can be used to implement priority queues and similar data structures. -type Heap struct { - // data stores objects and has a queue that keeps their ordering according - // to the heap invariant. - data *heapData -} - -// Add inserts an item, and puts it in the queue. The item is updated if it -// already exists. -func (h *Heap) Add(obj interface{}) error { - key, err := h.data.keyFunc(obj) - if err != nil { - return cache.KeyError{Obj: obj, Err: err} - } - if _, exists := h.data.items[key]; exists { - h.data.items[key].obj = obj - heap.Fix(h.data, h.data.items[key].index) - } else { - heap.Push(h.data, &itemKeyValue{key, obj}) - } - return nil -} - -// AddIfNotPresent inserts an item, and puts it in the queue. If an item with -// the key is present in the map, no changes is made to the item. -func (h *Heap) AddIfNotPresent(obj interface{}) error { - key, err := h.data.keyFunc(obj) - if err != nil { - return cache.KeyError{Obj: obj, Err: err} - } - if _, exists := h.data.items[key]; !exists { - heap.Push(h.data, &itemKeyValue{key, obj}) - } - return nil -} - -// Update is the same as Add in this implementation. When the item does not -// exist, it is added. -func (h *Heap) Update(obj interface{}) error { - return h.Add(obj) -} - -// Delete removes an item. 
-func (h *Heap) Delete(obj interface{}) error { - key, err := h.data.keyFunc(obj) - if err != nil { - return cache.KeyError{Obj: obj, Err: err} - } - if item, ok := h.data.items[key]; ok { - heap.Remove(h.data, item.index) - return nil - } - return fmt.Errorf("object not found") -} - -// Pop returns the head of the heap. -func (h *Heap) Pop() (interface{}, error) { - obj := heap.Pop(h.data) - if obj != nil { - return obj, nil - } - return nil, fmt.Errorf("object was removed from heap data") -} - -// Get returns the requested item, or sets exists=false. -func (h *Heap) Get(obj interface{}) (interface{}, bool, error) { - key, err := h.data.keyFunc(obj) - if err != nil { - return nil, false, cache.KeyError{Obj: obj, Err: err} - } - return h.GetByKey(key) -} - -// GetByKey returns the requested item, or sets exists=false. -func (h *Heap) GetByKey(key string) (interface{}, bool, error) { - item, exists := h.data.items[key] - if !exists { - return nil, false, nil - } - return item.obj, true, nil -} - -// List returns a list of all the items. -func (h *Heap) List() []interface{} { - list := make([]interface{}, 0, len(h.data.items)) - for _, item := range h.data.items { - list = append(list, item.obj) - } - return list -} - -// newHeap returns a Heap which can be used to queue up items to process. -func newHeap(keyFn KeyFunc, lessFn LessFunc) *Heap { - return &Heap{ - data: &heapData{ - items: map[string]*heapItem{}, - queue: []string{}, - keyFunc: keyFn, - lessFunc: lessFn, - }, - } -} diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 73ef4a0081..e7566e3299 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -199,7 +199,7 @@ func TestPriorityQueue_Update(t *testing.T) { } // Update highPriorityPod and add a nominatedNodeName to it. q.Update(&highPriorityPod, &highPriNominatedPod) - if q.activeQ.data.Len() != 1 { + if q.activeQ.Len() != 1 { t.Error("Expected only one item in activeQ.") } if len(q.nominatedPods) != 1 { @@ -250,7 +250,7 @@ func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) { q.unschedulableQ.addOrUpdate(&unschedulablePod) q.unschedulableQ.addOrUpdate(&highPriorityPod) q.MoveAllToActiveQueue() - if q.activeQ.data.Len() != 3 { + if q.activeQ.Len() != 3 { t.Error("Expected all items to be in activeQ.") } } diff --git a/pkg/scheduler/util/BUILD b/pkg/scheduler/util/BUILD index 810d2c5cb0..19374a7845 100644 --- a/pkg/scheduler/util/BUILD +++ b/pkg/scheduler/util/BUILD @@ -10,6 +10,8 @@ go_test( name = "go_default_test", srcs = [ "backoff_utils_test.go", + "heap_test.go", + "testutil_test.go", "utils_test.go", ], embed = [":go_default_library"], @@ -25,6 +27,8 @@ go_library( name = "go_default_library", srcs = [ "backoff_utils.go", + "heap.go", + "testutil.go", "utils.go", ], importpath = "k8s.io/kubernetes/pkg/scheduler/util", diff --git a/pkg/scheduler/util/heap.go b/pkg/scheduler/util/heap.go new file mode 100644 index 0000000000..d7c5534868 --- /dev/null +++ b/pkg/scheduler/util/heap.go @@ -0,0 +1,223 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Below is the implementation of the a heap. The logic is pretty much the same +// as cache.heap, however, this heap does not perform synchronization. It leaves +// synchronization to the SchedulingQueue. + +package util + +import ( + "container/heap" + "fmt" + + "k8s.io/client-go/tools/cache" +) + +// KeyFunc is a function type to get the key from an object. +type KeyFunc func(obj interface{}) (string, error) + +type heapItem struct { + obj interface{} // The object which is stored in the heap. + index int // The index of the object's key in the Heap.queue. +} + +type itemKeyValue struct { + key string + obj interface{} +} + +// heapData is an internal struct that implements the standard heap interface +// and keeps the data stored in the heap. +type heapData struct { + // items is a map from key of the objects to the objects and their index. + // We depend on the property that items in the map are in the queue and vice versa. + items map[string]*heapItem + // queue implements a heap data structure and keeps the order of elements + // according to the heap invariant. The queue keeps the keys of objects stored + // in "items". + queue []string + + // keyFunc is used to make the key used for queued item insertion and retrieval, and + // should be deterministic. + keyFunc KeyFunc + // lessFunc is used to compare two objects in the heap. + lessFunc LessFunc +} + +var ( + _ = heap.Interface(&heapData{}) // heapData is a standard heap +) + +// Less compares two objects and returns true if the first one should go +// in front of the second one in the heap. +func (h *heapData) Less(i, j int) bool { + if i > len(h.queue) || j > len(h.queue) { + return false + } + itemi, ok := h.items[h.queue[i]] + if !ok { + return false + } + itemj, ok := h.items[h.queue[j]] + if !ok { + return false + } + return h.lessFunc(itemi.obj, itemj.obj) +} + +// Len returns the number of items in the Heap. +func (h *heapData) Len() int { return len(h.queue) } + +// Swap implements swapping of two elements in the heap. This is a part of standard +// heap interface and should never be called directly. +func (h *heapData) Swap(i, j int) { + h.queue[i], h.queue[j] = h.queue[j], h.queue[i] + item := h.items[h.queue[i]] + item.index = i + item = h.items[h.queue[j]] + item.index = j +} + +// Push is supposed to be called by heap.Push only. +func (h *heapData) Push(kv interface{}) { + keyValue := kv.(*itemKeyValue) + n := len(h.queue) + h.items[keyValue.key] = &heapItem{keyValue.obj, n} + h.queue = append(h.queue, keyValue.key) +} + +// Pop is supposed to be called by heap.Pop only. +func (h *heapData) Pop() interface{} { + key := h.queue[len(h.queue)-1] + h.queue = h.queue[0 : len(h.queue)-1] + item, ok := h.items[key] + if !ok { + // This is an error + return nil + } + delete(h.items, key) + return item.obj +} + +// Heap is a producer/consumer queue that implements a heap data structure. +// It can be used to implement priority queues and similar data structures. +type Heap struct { + // data stores objects and has a queue that keeps their ordering according + // to the heap invariant. 
+ data *heapData +} + +// Add inserts an item, and puts it in the queue. The item is updated if it +// already exists. +func (h *Heap) Add(obj interface{}) error { + key, err := h.data.keyFunc(obj) + if err != nil { + return cache.KeyError{Obj: obj, Err: err} + } + if _, exists := h.data.items[key]; exists { + h.data.items[key].obj = obj + heap.Fix(h.data, h.data.items[key].index) + } else { + heap.Push(h.data, &itemKeyValue{key, obj}) + } + return nil +} + +// AddIfNotPresent inserts an item, and puts it in the queue. If an item with +// the key is present in the map, no changes is made to the item. +func (h *Heap) AddIfNotPresent(obj interface{}) error { + key, err := h.data.keyFunc(obj) + if err != nil { + return cache.KeyError{Obj: obj, Err: err} + } + if _, exists := h.data.items[key]; !exists { + heap.Push(h.data, &itemKeyValue{key, obj}) + } + return nil +} + +// Update is the same as Add in this implementation. When the item does not +// exist, it is added. +func (h *Heap) Update(obj interface{}) error { + return h.Add(obj) +} + +// Delete removes an item. +func (h *Heap) Delete(obj interface{}) error { + key, err := h.data.keyFunc(obj) + if err != nil { + return cache.KeyError{Obj: obj, Err: err} + } + if item, ok := h.data.items[key]; ok { + heap.Remove(h.data, item.index) + return nil + } + return fmt.Errorf("object not found") +} + +// Pop returns the head of the heap. +func (h *Heap) Pop() (interface{}, error) { + obj := heap.Pop(h.data) + if obj != nil { + return obj, nil + } + return nil, fmt.Errorf("object was removed from heap data") +} + +// Get returns the requested item, or sets exists=false. +func (h *Heap) Get(obj interface{}) (interface{}, bool, error) { + key, err := h.data.keyFunc(obj) + if err != nil { + return nil, false, cache.KeyError{Obj: obj, Err: err} + } + return h.GetByKey(key) +} + +// GetByKey returns the requested item, or sets exists=false. +func (h *Heap) GetByKey(key string) (interface{}, bool, error) { + item, exists := h.data.items[key] + if !exists { + return nil, false, nil + } + return item.obj, true, nil +} + +// List returns a list of all the items. +func (h *Heap) List() []interface{} { + list := make([]interface{}, 0, len(h.data.items)) + for _, item := range h.data.items { + list = append(list, item.obj) + } + return list +} + +// Len returns the number of items in the heap. +func (h *Heap) Len() int { + return len(h.data.queue) +} + +// NewHeap returns a Heap which can be used to queue up items to process. +func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap { + return &Heap{ + data: &heapData{ + items: map[string]*heapItem{}, + queue: []string{}, + keyFunc: keyFn, + lessFunc: lessFn, + }, + } +} diff --git a/pkg/scheduler/util/heap_test.go b/pkg/scheduler/util/heap_test.go new file mode 100644 index 0000000000..62812ec4c9 --- /dev/null +++ b/pkg/scheduler/util/heap_test.go @@ -0,0 +1,271 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +// This file was copied from client-go/tools/cache/heap.go and modified +// for our non thread-safe heap + +package util + +import ( + "testing" +) + +func testHeapObjectKeyFunc(obj interface{}) (string, error) { + return obj.(testHeapObject).name, nil +} + +type testHeapObject struct { + name string + val interface{} +} + +func mkHeapObj(name string, val interface{}) testHeapObject { + return testHeapObject{name: name, val: val} +} + +func compareInts(val1 interface{}, val2 interface{}) bool { + first := val1.(testHeapObject).val.(int) + second := val2.(testHeapObject).val.(int) + return first < second +} + +// TestHeapBasic tests Heap invariant +func TestHeapBasic(t *testing.T) { + h := NewHeap(testHeapObjectKeyFunc, compareInts) + const amount = 500 + var i int + + for i = amount; i > 0; i-- { + h.Add(mkHeapObj(string([]rune{'a', rune(i)}), i)) + } + + // Make sure that the numbers are popped in ascending order. + prevNum := 0 + for i := 0; i < amount; i++ { + obj, err := h.Pop() + num := obj.(testHeapObject).val.(int) + // All the items must be sorted. + if err != nil || prevNum > num { + t.Errorf("got %v out of order, last was %v", obj, prevNum) + } + prevNum = num + } +} + +// Tests Heap.Add and ensures that heap invariant is preserved after adding items. +func TestHeap_Add(t *testing.T) { + h := NewHeap(testHeapObjectKeyFunc, compareInts) + h.Add(mkHeapObj("foo", 10)) + h.Add(mkHeapObj("bar", 1)) + h.Add(mkHeapObj("baz", 11)) + h.Add(mkHeapObj("zab", 30)) + h.Add(mkHeapObj("foo", 13)) // This updates "foo". + + item, err := h.Pop() + if e, a := 1, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + item, err = h.Pop() + if e, a := 11, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + h.Delete(mkHeapObj("baz", 11)) // Nothing is deleted. + h.Add(mkHeapObj("foo", 14)) // foo is updated. + item, err = h.Pop() + if e, a := 14, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + item, err = h.Pop() + if e, a := 30, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } +} + +// TestHeap_AddIfNotPresent tests Heap.AddIfNotPresent and ensures that heap +// invariant is preserved after adding items. +func TestHeap_AddIfNotPresent(t *testing.T) { + h := NewHeap(testHeapObjectKeyFunc, compareInts) + h.AddIfNotPresent(mkHeapObj("foo", 10)) + h.AddIfNotPresent(mkHeapObj("bar", 1)) + h.AddIfNotPresent(mkHeapObj("baz", 11)) + h.AddIfNotPresent(mkHeapObj("zab", 30)) + h.AddIfNotPresent(mkHeapObj("foo", 13)) // This is not added. + + if len := len(h.data.items); len != 4 { + t.Errorf("unexpected number of items: %d", len) + } + if val := h.data.items["foo"].obj.(testHeapObject).val; val != 10 { + t.Errorf("unexpected value: %d", val) + } + item, err := h.Pop() + if e, a := 1, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + item, err = h.Pop() + if e, a := 10, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + // bar is already popped. Let's add another one. 
+ h.AddIfNotPresent(mkHeapObj("bar", 14)) + item, err = h.Pop() + if e, a := 11, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + item, err = h.Pop() + if e, a := 14, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } +} + +// TestHeap_Delete tests Heap.Delete and ensures that heap invariant is +// preserved after deleting items. +func TestHeap_Delete(t *testing.T) { + h := NewHeap(testHeapObjectKeyFunc, compareInts) + h.Add(mkHeapObj("foo", 10)) + h.Add(mkHeapObj("bar", 1)) + h.Add(mkHeapObj("bal", 31)) + h.Add(mkHeapObj("baz", 11)) + + // Delete head. Delete should work with "key" and doesn't care about the value. + if err := h.Delete(mkHeapObj("bar", 200)); err != nil { + t.Fatalf("Failed to delete head.") + } + item, err := h.Pop() + if e, a := 10, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + h.Add(mkHeapObj("zab", 30)) + h.Add(mkHeapObj("faz", 30)) + len := h.data.Len() + // Delete non-existing item. + if err = h.Delete(mkHeapObj("non-existent", 10)); err == nil || len != h.data.Len() { + t.Fatalf("Didn't expect any item removal") + } + // Delete tail. + if err = h.Delete(mkHeapObj("bal", 31)); err != nil { + t.Fatalf("Failed to delete tail.") + } + // Delete one of the items with value 30. + if err = h.Delete(mkHeapObj("zab", 30)); err != nil { + t.Fatalf("Failed to delete item.") + } + item, err = h.Pop() + if e, a := 11, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + item, err = h.Pop() + if e, a := 30, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + if h.data.Len() != 0 { + t.Fatalf("expected an empty heap.") + } +} + +// TestHeap_Update tests Heap.Update and ensures that heap invariant is +// preserved after adding items. +func TestHeap_Update(t *testing.T) { + h := NewHeap(testHeapObjectKeyFunc, compareInts) + h.Add(mkHeapObj("foo", 10)) + h.Add(mkHeapObj("bar", 1)) + h.Add(mkHeapObj("bal", 31)) + h.Add(mkHeapObj("baz", 11)) + + // Update an item to a value that should push it to the head. + h.Update(mkHeapObj("baz", 0)) + if h.data.queue[0] != "baz" || h.data.items["baz"].index != 0 { + t.Fatalf("expected baz to be at the head") + } + item, err := h.Pop() + if e, a := 0, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + // Update bar to push it farther back in the queue. + h.Update(mkHeapObj("bar", 100)) + if h.data.queue[0] != "foo" || h.data.items["foo"].index != 0 { + t.Fatalf("expected foo to be at the head") + } +} + +// TestHeap_Get tests Heap.Get. +func TestHeap_Get(t *testing.T) { + h := NewHeap(testHeapObjectKeyFunc, compareInts) + h.Add(mkHeapObj("foo", 10)) + h.Add(mkHeapObj("bar", 1)) + h.Add(mkHeapObj("bal", 31)) + h.Add(mkHeapObj("baz", 11)) + + // Get works with the key. + obj, exists, err := h.Get(mkHeapObj("baz", 0)) + if err != nil || exists == false || obj.(testHeapObject).val != 11 { + t.Fatalf("unexpected error in getting element") + } + // Get non-existing object. + _, exists, err = h.Get(mkHeapObj("non-existing", 0)) + if err != nil || exists == true { + t.Fatalf("didn't expect to get any object") + } +} + +// TestHeap_GetByKey tests Heap.GetByKey and is very similar to TestHeap_Get. 
+func TestHeap_GetByKey(t *testing.T) { + h := NewHeap(testHeapObjectKeyFunc, compareInts) + h.Add(mkHeapObj("foo", 10)) + h.Add(mkHeapObj("bar", 1)) + h.Add(mkHeapObj("bal", 31)) + h.Add(mkHeapObj("baz", 11)) + + obj, exists, err := h.GetByKey("baz") + if err != nil || exists == false || obj.(testHeapObject).val != 11 { + t.Fatalf("unexpected error in getting element") + } + // Get non-existing object. + _, exists, err = h.GetByKey("non-existing") + if err != nil || exists == true { + t.Fatalf("didn't expect to get any object") + } +} + +// TestHeap_List tests Heap.List function. +func TestHeap_List(t *testing.T) { + h := NewHeap(testHeapObjectKeyFunc, compareInts) + list := h.List() + if len(list) != 0 { + t.Errorf("expected an empty list") + } + + items := map[string]int{ + "foo": 10, + "bar": 1, + "bal": 30, + "baz": 11, + "faz": 30, + } + for k, v := range items { + h.Add(mkHeapObj(k, v)) + } + list = h.List() + if len(list) != len(items) { + t.Errorf("expected %d items, got %d", len(items), len(list)) + } + for _, obj := range list { + heapObj := obj.(testHeapObject) + v, ok := items[heapObj.name] + if !ok || v != heapObj.val { + t.Errorf("unexpected item in the list: %v", heapObj) + } + } +} From 082b48240a12138169af0f35a1d7d7bea9ed8f16 Mon Sep 17 00:00:00 2001 From: Gregory Haynes Date: Sun, 10 Dec 2017 18:34:04 +0000 Subject: [PATCH 2/4] Implement scheduler.util.backoff as a queue We are going to use PodBackoff for controlling backoff when adding unschedulable pods back to the active scheduling queue. In order to do this more easily, limit the interface for PodBackoff to only this struct (rather than exposing BackoffEntry) and change the backing expiry implementation to be queue based. --- pkg/scheduler/factory/factory.go | 3 +- pkg/scheduler/util/BUILD | 2 - pkg/scheduler/util/backoff_utils.go | 117 ++++++++++++++++------- pkg/scheduler/util/backoff_utils_test.go | 9 +- pkg/scheduler/util/heap.go | 15 ++- 5 files changed, 105 insertions(+), 41 deletions(-) diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 120806aef7..188a0069b6 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -1493,8 +1493,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue // to run on a node, scheduler takes the pod into account when running // predicates for the node. if !util.PodPriorityEnabled() { - entry := backoff.GetEntry(podID) - if !entry.TryWait(backoff.MaxDuration()) { + if !backoff.TryBackoffAndWait(podID) { klog.Warningf("Request for pod %v already in flight, abandoning", podID) return } diff --git a/pkg/scheduler/util/BUILD b/pkg/scheduler/util/BUILD index 19374a7845..0f8c07e080 100644 --- a/pkg/scheduler/util/BUILD +++ b/pkg/scheduler/util/BUILD @@ -11,7 +11,6 @@ go_test( srcs = [ "backoff_utils_test.go", "heap_test.go", - "testutil_test.go", "utils_test.go", ], embed = [":go_default_library"], @@ -28,7 +27,6 @@ go_library( srcs = [ "backoff_utils.go", "heap.go", - "testutil.go", "utils.go", ], importpath = "k8s.io/kubernetes/pkg/scheduler/util", diff --git a/pkg/scheduler/util/backoff_utils.go b/pkg/scheduler/util/backoff_utils.go index 506cd1270a..e77e808658 100644 --- a/pkg/scheduler/util/backoff_utils.go +++ b/pkg/scheduler/util/backoff_utils.go @@ -37,10 +37,11 @@ func (realClock) Now() time.Time { return time.Now() } -// BackoffEntry is single threaded. in particular, it only allows a single action to be waiting on backoff at a time. 
-// It is expected that all users will only use the public TryWait(...) method +// backoffEntry is single threaded. in particular, it only allows a single action to be waiting on backoff at a time. // It is also not safe to copy this object. -type BackoffEntry struct { +type backoffEntry struct { + initialized bool + podName ktypes.NamespacedName backoff time.Duration lastUpdate time.Time reqInFlight int32 @@ -48,45 +49,46 @@ type BackoffEntry struct { // tryLock attempts to acquire a lock via atomic compare and swap. // returns true if the lock was acquired, false otherwise -func (b *BackoffEntry) tryLock() bool { +func (b *backoffEntry) tryLock() bool { return atomic.CompareAndSwapInt32(&b.reqInFlight, 0, 1) } // unlock returns the lock. panics if the lock isn't held -func (b *BackoffEntry) unlock() { +func (b *backoffEntry) unlock() { if !atomic.CompareAndSwapInt32(&b.reqInFlight, 1, 0) { panic(fmt.Sprintf("unexpected state on unlocking: %+v", b)) } } -// TryWait tries to acquire the backoff lock, maxDuration is the maximum allowed period to wait for. -func (b *BackoffEntry) TryWait(maxDuration time.Duration) bool { - if !b.tryLock() { - return false - } - defer b.unlock() - b.wait(maxDuration) - return true +// backoffTime returns the Time when a backoffEntry completes backoff +func (b *backoffEntry) backoffTime() time.Time { + return b.lastUpdate.Add(b.backoff) } -func (b *BackoffEntry) getBackoff(maxDuration time.Duration) time.Duration { - duration := b.backoff - newDuration := time.Duration(duration) * 2 +// getBackoff returns the duration until this entry completes backoff +func (b *backoffEntry) getBackoff(maxDuration time.Duration) time.Duration { + if !b.initialized { + b.initialized = true + return b.backoff + } + newDuration := b.backoff * 2 if newDuration > maxDuration { newDuration = maxDuration } b.backoff = newDuration - klog.V(4).Infof("Backing off %s", duration.String()) - return duration + klog.V(4).Infof("Backing off %s", newDuration.String()) + return newDuration } -func (b *BackoffEntry) wait(maxDuration time.Duration) { +// backoffAndWait Blocks until this entry has completed backoff +func (b *backoffEntry) backoffAndWait(maxDuration time.Duration) { time.Sleep(b.getBackoff(maxDuration)) } // PodBackoff is used to restart a pod with back-off delay. type PodBackoff struct { - perPodBackoff map[ktypes.NamespacedName]*BackoffEntry + // expiryQ stores backoffEntry orderedy by lastUpdate until they reach maxDuration and are GC'd + expiryQ *Heap lock sync.Mutex clock clock defaultDuration time.Duration @@ -111,24 +113,54 @@ func CreatePodBackoff(defaultDuration, maxDuration time.Duration) *PodBackoff { // CreatePodBackoffWithClock creates a pod back-off object by default duration, max duration and clock. func CreatePodBackoffWithClock(defaultDuration, maxDuration time.Duration, clock clock) *PodBackoff { return &PodBackoff{ - perPodBackoff: map[ktypes.NamespacedName]*BackoffEntry{}, + expiryQ: NewHeap(backoffEntryKeyFunc, backoffEntryCompareUpdate), clock: clock, defaultDuration: defaultDuration, maxDuration: maxDuration, } } -// GetEntry returns a back-off entry by Pod ID. 
-func (p *PodBackoff) GetEntry(podID ktypes.NamespacedName) *BackoffEntry { +// getEntry returns the backoffEntry for a given podID +func (p *PodBackoff) getEntry(podID ktypes.NamespacedName) *backoffEntry { + entry, exists, _ := p.expiryQ.GetByKey(podID.String()) + var be *backoffEntry + if !exists { + be = &backoffEntry{ + initialized: false, + podName: podID, + backoff: p.defaultDuration, + } + p.expiryQ.Update(be) + } else { + be = entry.(*backoffEntry) + } + return be +} + +// BackoffPod updates the backoff for a podId and returns the duration until backoff completion +func (p *PodBackoff) BackoffPod(podID ktypes.NamespacedName) time.Duration { p.lock.Lock() defer p.lock.Unlock() - entry, ok := p.perPodBackoff[podID] - if !ok { - entry = &BackoffEntry{backoff: p.defaultDuration} - p.perPodBackoff[podID] = entry - } + entry := p.getEntry(podID) entry.lastUpdate = p.clock.Now() - return entry + p.expiryQ.Update(entry) + return entry.getBackoff(p.maxDuration) +} + +// TryBackoffAndWait tries to acquire the backoff lock +func (p *PodBackoff) TryBackoffAndWait(podID ktypes.NamespacedName) bool { + p.lock.Lock() + entry := p.getEntry(podID) + + if !entry.tryLock() { + p.lock.Unlock() + return false + } + defer entry.unlock() + duration := entry.getBackoff(p.maxDuration) + p.lock.Unlock() + time.Sleep(duration) + return true } // Gc execute garbage collection on the pod back-off. @@ -136,9 +168,30 @@ func (p *PodBackoff) Gc() { p.lock.Lock() defer p.lock.Unlock() now := p.clock.Now() - for podID, entry := range p.perPodBackoff { - if now.Sub(entry.lastUpdate) > p.maxDuration { - delete(p.perPodBackoff, podID) + var be *backoffEntry + for { + entry := p.expiryQ.Peek() + if entry == nil { + break + } + be = entry.(*backoffEntry) + if now.Sub(be.lastUpdate) > p.maxDuration { + p.expiryQ.Pop() + } else { + break } } } + +// backoffEntryKeyFunc is the keying function used for mapping a backoffEntry to string for heap +func backoffEntryKeyFunc(b interface{}) (string, error) { + be := b.(*backoffEntry) + return be.podName.String(), nil +} + +// backoffEntryCompareUpdate returns true when b1's backoff time is before b2's +func backoffEntryCompareUpdate(b1, b2 interface{}) bool { + be1 := b1.(*backoffEntry) + be2 := b2.(*backoffEntry) + return be1.lastUpdate.Before(be2.lastUpdate) +} diff --git a/pkg/scheduler/util/backoff_utils_test.go b/pkg/scheduler/util/backoff_utils_test.go index 8f61b637e7..8a618ebb32 100644 --- a/pkg/scheduler/util/backoff_utils_test.go +++ b/pkg/scheduler/util/backoff_utils_test.go @@ -64,7 +64,7 @@ func TestBackoff(t *testing.T) { } for _, test := range tests { - duration := backoff.GetEntry(test.podID).getBackoff(backoff.maxDuration) + duration := backoff.BackoffPod(test.podID) if duration != test.expectedDuration { t.Errorf("expected: %s, got %s for %s", test.expectedDuration.String(), duration.String(), test.podID) } @@ -72,14 +72,15 @@ func TestBackoff(t *testing.T) { backoff.Gc() } fooID := ktypes.NamespacedName{Namespace: "default", Name: "foo"} - backoff.perPodBackoff[fooID].backoff = 60 * time.Second - duration := backoff.GetEntry(fooID).getBackoff(backoff.maxDuration) + be := backoff.getEntry(fooID) + be.backoff = 60 * time.Second + duration := backoff.BackoffPod(fooID) if duration != 60*time.Second { t.Errorf("expected: 60, got %s", duration.String()) } // Verify that we split on namespaces correctly, same name, different namespace fooID.Namespace = "other" - duration = backoff.GetEntry(fooID).getBackoff(backoff.maxDuration) + duration = backoff.BackoffPod(fooID) 
if duration != 1*time.Second { t.Errorf("expected: 1, got %s", duration.String()) } diff --git a/pkg/scheduler/util/heap.go b/pkg/scheduler/util/heap.go index d7c5534868..0f15652c65 100644 --- a/pkg/scheduler/util/heap.go +++ b/pkg/scheduler/util/heap.go @@ -113,6 +113,14 @@ func (h *heapData) Pop() interface{} { return item.obj } +// Peek is supposed to be called by heap.Peek only. +func (h *heapData) Peek() interface{} { + if len(h.queue) > 0 { + return h.items[h.queue[0]].obj + } + return nil +} + // Heap is a producer/consumer queue that implements a heap data structure. // It can be used to implement priority queues and similar data structures. type Heap struct { @@ -169,7 +177,12 @@ func (h *Heap) Delete(obj interface{}) error { return fmt.Errorf("object not found") } -// Pop returns the head of the heap. +// Peek returns the head of the heap without removing it. +func (h *Heap) Peek() interface{} { + return h.data.Peek() +} + +// Pop returns the head of the heap and removes it. func (h *Heap) Pop() (interface{}, error) { obj := heap.Pop(h.data) if obj != nil { From 5e4ccede4c9ad5e97f119113c883916f29412a68 Mon Sep 17 00:00:00 2001 From: Gregory Haynes Date: Tue, 24 Jul 2018 20:46:40 +0000 Subject: [PATCH 3/4] Reschedule with backoff With the alpha scheduling queue we move pods from unschedulable to active on certain events without a backoff. As a result we can cause starvation issues if high priority pods are in the unschedulable queue. Implement a backoff mechanism for pods being moved to active. Closes #56721 --- pkg/scheduler/core/extender_test.go | 2 +- pkg/scheduler/core/generic_scheduler_test.go | 10 +- pkg/scheduler/factory/factory.go | 2 +- pkg/scheduler/internal/queue/BUILD | 2 + .../internal/queue/scheduling_queue.go | 187 ++++++++++++++++-- .../internal/queue/scheduling_queue_test.go | 85 ++++++-- pkg/scheduler/util/BUILD | 2 + pkg/scheduler/util/backoff_utils.go | 29 ++- pkg/scheduler/util/backoff_utils_test.go | 30 ++- pkg/scheduler/util/clock.go | 34 ++++ 10 files changed, 340 insertions(+), 43 deletions(-) create mode 100644 pkg/scheduler/util/clock.go diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index f1f17beb43..fa3a95cc9f 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -507,7 +507,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { for _, name := range test.nodes { cache.AddNode(createNode(name)) } - queue := internalqueue.NewSchedulingQueue() + queue := internalqueue.NewSchedulingQueue(nil) scheduler := NewGenericScheduler( cache, nil, diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index 2b66879692..4a35013aff 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -472,7 +472,7 @@ func TestGenericScheduler(t *testing.T) { scheduler := NewGenericScheduler( cache, nil, - internalqueue.NewSchedulingQueue(), + internalqueue.NewSchedulingQueue(nil), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, @@ -509,7 +509,7 @@ func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Nod s := NewGenericScheduler( cache, nil, - internalqueue.NewSchedulingQueue(), + internalqueue.NewSchedulingQueue(nil), predicates, algorithm.EmptyPredicateMetadataProducer, prioritizers, @@ -1436,7 +1436,7 @@ func TestPreempt(t *testing.T) { scheduler := NewGenericScheduler( cache, nil, - internalqueue.NewSchedulingQueue(), + 
internalqueue.NewSchedulingQueue(nil), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, @@ -1564,7 +1564,7 @@ func TestCacheInvalidationRace(t *testing.T) { scheduler := NewGenericScheduler( mockCache, eCache, - internalqueue.NewSchedulingQueue(), + internalqueue.NewSchedulingQueue(nil), ps, algorithm.EmptyPredicateMetadataProducer, prioritizers, @@ -1648,7 +1648,7 @@ func TestCacheInvalidationRace2(t *testing.T) { scheduler := NewGenericScheduler( cache, eCache, - internalqueue.NewSchedulingQueue(), + internalqueue.NewSchedulingQueue(nil), ps, algorithm.EmptyPredicateMetadataProducer, prioritizers, diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 188a0069b6..9a41cbd805 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -283,7 +283,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator { c := &configFactory{ client: args.Client, podLister: schedulerCache, - podQueue: internalqueue.NewSchedulingQueue(), + podQueue: internalqueue.NewSchedulingQueue(stopEverything), nodeLister: args.NodeInformer.Lister(), pVLister: args.PvInformer.Lister(), pVCLister: args.PvcInformer.Lister(), diff --git a/pkg/scheduler/internal/queue/BUILD b/pkg/scheduler/internal/queue/BUILD index 37a8d1f525..ea7b66fe3f 100644 --- a/pkg/scheduler/internal/queue/BUILD +++ b/pkg/scheduler/internal/queue/BUILD @@ -12,6 +12,8 @@ go_library( "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 0b4728851b..22242a9ce3 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -30,11 +30,14 @@ import ( "fmt" "reflect" "sync" + "time" "k8s.io/klog" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ktypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" @@ -72,9 +75,9 @@ type SchedulingQueue interface { // NewSchedulingQueue initializes a new scheduling queue. If pod priority is // enabled a priority queue is returned. If it is disabled, a FIFO is returned. -func NewSchedulingQueue() SchedulingQueue { +func NewSchedulingQueue(stop <-chan struct{}) SchedulingQueue { if util.PodPriorityEnabled() { - return NewPriorityQueue() + return NewPriorityQueue(stop) } return NewFIFO() } @@ -178,12 +181,20 @@ func NominatedNodeName(pod *v1.Pod) string { // pods that are already tried and are determined to be unschedulable. The latter // is called unschedulableQ. type PriorityQueue struct { + stop <-chan struct{} + clock util.Clock + // podBackoff tracks backoff for pods attempting to be rescheduled + podBackoff *util.PodBackoff + lock sync.RWMutex cond sync.Cond // activeQ is heap structure that scheduler actively looks at to find pods to // schedule. Head of heap is the highest priority pod. 
activeQ *util.Heap + // podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff + // are popped from this heap before the scheduler looks at activeQ + podBackoffQ *util.Heap // unschedulableQ holds pods that have been tried and determined unschedulable. unschedulableQ *UnschedulablePodsMap // nominatedPods is a map keyed by a node name and the value is a list of @@ -227,16 +238,33 @@ func activeQComp(pod1, pod2 interface{}) bool { } // NewPriorityQueue creates a PriorityQueue object. -func NewPriorityQueue() *PriorityQueue { +func NewPriorityQueue(stop <-chan struct{}) *PriorityQueue { + return NewPriorityQueueWithClock(stop, util.RealClock{}) +} + +// NewPriorityQueueWithClock creates a PriorityQueue which uses the passed clock for time. +func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *PriorityQueue { pq := &PriorityQueue{ + clock: clock, + stop: stop, + podBackoff: util.CreatePodBackoffWithClock(1*time.Second, 10*time.Second, clock), activeQ: util.NewHeap(cache.MetaNamespaceKeyFunc, activeQComp), unschedulableQ: newUnschedulablePodsMap(), nominatedPods: map[string][]*v1.Pod{}, } pq.cond.L = &pq.lock + pq.podBackoffQ = util.NewHeap(cache.MetaNamespaceKeyFunc, pq.podsCompareBackoffCompleted) + + pq.run() + return pq } +// run starts the goroutine to pump from podBackoffQ to activeQ +func (p *PriorityQueue) run() { + go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop) +} + // addNominatedPodIfNeeded adds a pod to nominatedPods if it has a NominatedNodeName and it does not // already exist in the map. Adding an existing pod is not going to update the pod. func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) { @@ -278,7 +306,7 @@ func (p *PriorityQueue) updateNominatedPod(oldPod, newPod *v1.Pod) { } // Add adds a pod to the active queue. It should be called only when a new pod -// is added so there is no chance the pod is already in either queue. 
+// is added so there is no chance the pod is already in active/unschedulable/backoff queues func (p *PriorityQueue) Add(pod *v1.Pod) error { p.lock.Lock() defer p.lock.Unlock() @@ -291,6 +319,10 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error { p.deleteNominatedPodIfExists(pod) p.unschedulableQ.delete(pod) } + // Delete pod from backoffQ if it is backing off + if err = p.podBackoffQ.Delete(pod); err == nil { + klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name) + } p.addNominatedPodIfNeeded(pod) p.cond.Broadcast() } @@ -308,6 +340,9 @@ func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error { if _, exists, _ := p.activeQ.Get(pod); exists { return nil } + if _, exists, _ := p.podBackoffQ.Get(pod); exists { + return nil + } err := p.activeQ.Add(pod) if err != nil { klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err) @@ -323,6 +358,40 @@ func isPodUnschedulable(pod *v1.Pod) bool { return cond != nil && cond.Status == v1.ConditionFalse && cond.Reason == v1.PodReasonUnschedulable } +// nsNameForPod returns a namespacedname for a pod +func nsNameForPod(pod *v1.Pod) ktypes.NamespacedName { + return ktypes.NamespacedName{ + Namespace: pod.Namespace, + Name: pod.Name, + } +} + +// clearPodBackoff clears all backoff state for a pod (resets expiry) +func (p *PriorityQueue) clearPodBackoff(pod *v1.Pod) { + p.podBackoff.ClearPodBackoff(nsNameForPod(pod)) +} + +// isPodBackingOff returns whether a pod is currently undergoing backoff in the podBackoff structure +func (p *PriorityQueue) isPodBackingOff(pod *v1.Pod) bool { + boTime, exists := p.podBackoff.GetBackoffTime(nsNameForPod(pod)) + if !exists { + return false + } + return boTime.After(p.clock.Now()) +} + +// backoffPod checks if pod is currently undergoing backoff. If it is not it updates the backoff +// timeout otherwise it does nothing. +func (p *PriorityQueue) backoffPod(pod *v1.Pod) { + p.podBackoff.Gc() + + podID := nsNameForPod(pod) + boTime, found := p.podBackoff.GetBackoffTime(podID) + if !found || boTime.Before(p.clock.Now()) { + p.podBackoff.BackoffPod(podID) + } +} + // AddUnschedulableIfNotPresent does nothing if the pod is present in either // queue. Otherwise it adds the pod to the unschedulable queue if // p.receivedMoveRequest is false, and to the activeQ if p.receivedMoveRequest is true. @@ -335,11 +404,27 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error { if _, exists, _ := p.activeQ.Get(pod); exists { return fmt.Errorf("pod is already present in the activeQ") } + if _, exists, _ := p.podBackoffQ.Get(pod); exists { + return fmt.Errorf("pod is already present in the backoffQ") + } if !p.receivedMoveRequest && isPodUnschedulable(pod) { + p.backoffPod(pod) p.unschedulableQ.addOrUpdate(pod) p.addNominatedPodIfNeeded(pod) return nil } + + // If a move request has been received and the pod is subject to backoff, move it to the BackoffQ. 
+ if p.isPodBackingOff(pod) && isPodUnschedulable(pod) { + err := p.podBackoffQ.Add(pod) + if err != nil { + klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err) + } else { + p.addNominatedPodIfNeeded(pod) + } + return err + } + err := p.activeQ.Add(pod) if err == nil { p.addNominatedPodIfNeeded(pod) @@ -348,6 +433,39 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error { return err } +// flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ +func (p *PriorityQueue) flushBackoffQCompleted() { + p.lock.Lock() + defer p.lock.Unlock() + + for { + rawPod := p.podBackoffQ.Peek() + if rawPod == nil { + return + } + pod := rawPod.(*v1.Pod) + boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod)) + if !found { + klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod)) + p.podBackoffQ.Pop() + p.activeQ.Add(pod) + defer p.cond.Broadcast() + continue + } + + if boTime.After(p.clock.Now()) { + return + } + _, err := p.podBackoffQ.Pop() + if err != nil { + klog.Errorf("Unable to pop pod %v from backoffQ despite backoff completion.", nsNameForPod(pod)) + return + } + p.activeQ.Add(pod) + defer p.cond.Broadcast() + } +} + // Pop removes the head of the active queue and returns it. It blocks if the // activeQ is empty and waits until a new item is added to the queue. It also // clears receivedMoveRequest to mark the beginning of a new scheduling cycle. @@ -391,16 +509,33 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool { func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { p.lock.Lock() defer p.lock.Unlock() - // If the pod is already in the active queue, just update it there. - if _, exists, _ := p.activeQ.Get(newPod); exists { - p.updateNominatedPod(oldPod, newPod) - err := p.activeQ.Update(newPod) - return err + + if oldPod != nil { + // If the pod is already in the active queue, just update it there. + if _, exists, _ := p.activeQ.Get(oldPod); exists { + p.updateNominatedPod(oldPod, newPod) + err := p.activeQ.Update(newPod) + return err + } + + // If the pod is in the backoff queue, update it there. + if _, exists, _ := p.podBackoffQ.Get(oldPod); exists { + p.updateNominatedPod(oldPod, newPod) + p.podBackoffQ.Delete(newPod) + err := p.activeQ.Add(newPod) + if err == nil { + p.cond.Broadcast() + } + return err + } } + // If the pod is in the unschedulable queue, updating it may make it schedulable. if usPod := p.unschedulableQ.get(newPod); usPod != nil { p.updateNominatedPod(oldPod, newPod) if isPodUpdated(oldPod, newPod) { + // If the pod is updated reset backoff + p.clearPodBackoff(newPod) p.unschedulableQ.delete(usPod) err := p.activeQ.Add(newPod) if err == nil { @@ -408,6 +543,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { } return err } + // Pod is already in unschedulable queue and hasnt updated, no need to backoff again p.unschedulableQ.addOrUpdate(newPod) return nil } @@ -428,6 +564,8 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error { p.deleteNominatedPodIfExists(pod) err := p.activeQ.Delete(pod) if err != nil { // The item was probably not found in the activeQ. 
+ p.clearPodBackoff(pod) + p.podBackoffQ.Delete(pod) p.unschedulableQ.delete(pod) } return nil @@ -453,16 +591,18 @@ func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) { // function adds all pods and then signals the condition variable to ensure that // if Pop() is waiting for an item, it receives it after all the pods are in the // queue and the head is the highest priority pod. -// TODO(bsalamat): We should add a back-off mechanism here so that a high priority -// pod which is unschedulable does not go to the head of the queue frequently. For -// example in a cluster where a lot of pods being deleted, such a high priority -// pod can deprive other pods from getting scheduled. func (p *PriorityQueue) MoveAllToActiveQueue() { p.lock.Lock() defer p.lock.Unlock() for _, pod := range p.unschedulableQ.pods { - if err := p.activeQ.Add(pod); err != nil { - klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err) + if p.isPodBackingOff(pod) { + if err := p.podBackoffQ.Add(pod); err != nil { + klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err) + } + } else { + if err := p.activeQ.Add(pod); err != nil { + klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err) + } } } p.unschedulableQ.clear() @@ -473,11 +613,16 @@ func (p *PriorityQueue) MoveAllToActiveQueue() { // NOTE: this function assumes lock has been acquired in caller func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) { for _, pod := range pods { - if err := p.activeQ.Add(pod); err == nil { - p.unschedulableQ.delete(pod) + if p.isPodBackingOff(pod) { + if err := p.podBackoffQ.Add(pod); err != nil { + klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err) + } } else { - klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err) + if err := p.activeQ.Add(pod); err != nil { + klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err) + } } + p.unschedulableQ.delete(pod) } p.receivedMoveRequest = true p.cond.Broadcast() @@ -550,6 +695,12 @@ func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) { p.lock.Unlock() } +func (p *PriorityQueue) podsCompareBackoffCompleted(p1, p2 interface{}) bool { + bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(p1.(*v1.Pod))) + bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(p2.(*v1.Pod))) + return bo1.Before(bo2) +} + // UnschedulablePodsMap holds pods that cannot be scheduled. This data structure // is used to implement unschedulableQ. type UnschedulablePodsMap struct { diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index e7566e3299..cc0a057fdf 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -95,7 +95,7 @@ var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1. } func TestPriorityQueue_Add(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Add(&medPriorityPod) q.Add(&unschedulablePod) q.Add(&highPriorityPod) @@ -120,7 +120,7 @@ func TestPriorityQueue_Add(t *testing.T) { } func TestPriorityQueue_AddIfNotPresent(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.unschedulableQ.addOrUpdate(&highPriNominatedPod) q.AddIfNotPresent(&highPriNominatedPod) // Must not add anything. 
q.AddIfNotPresent(&medPriorityPod) @@ -146,7 +146,7 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) { } func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Add(&highPriNominatedPod) q.AddUnschedulableIfNotPresent(&highPriNominatedPod) // Must not add anything. q.AddUnschedulableIfNotPresent(&medPriorityPod) // This should go to activeQ. @@ -172,7 +172,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { } func TestPriorityQueue_Pop(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) wg := sync.WaitGroup{} wg.Add(1) go func() { @@ -189,7 +189,7 @@ func TestPriorityQueue_Pop(t *testing.T) { } func TestPriorityQueue_Update(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Update(nil, &highPriorityPod) if _, exists, _ := q.activeQ.Get(&highPriorityPod); !exists { t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name) @@ -225,7 +225,7 @@ func TestPriorityQueue_Update(t *testing.T) { } func TestPriorityQueue_Delete(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Update(&highPriorityPod, &highPriNominatedPod) q.Add(&unschedulablePod) q.Delete(&highPriNominatedPod) @@ -245,7 +245,7 @@ func TestPriorityQueue_Delete(t *testing.T) { } func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Add(&medPriorityPod) q.unschedulableQ.addOrUpdate(&unschedulablePod) q.unschedulableQ.addOrUpdate(&highPriorityPod) @@ -291,7 +291,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { Spec: v1.PodSpec{NodeName: "machine1"}, } - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Add(&medPriorityPod) // Add a couple of pods to the unschedulableQ. q.unschedulableQ.addOrUpdate(&unschedulablePod) @@ -312,7 +312,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { } func TestPriorityQueue_WaitingPodsForNode(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Add(&medPriorityPod) q.Add(&unschedulablePod) q.Add(&highPriorityPod) @@ -491,7 +491,7 @@ func TestSchedulingQueue_Close(t *testing.T) { }, { name: "PriorityQueue close", - q: NewPriorityQueue(), + q: NewPriorityQueue(nil), expectedErr: fmt.Errorf(queueClosed), }, } @@ -520,7 +520,7 @@ func TestSchedulingQueue_Close(t *testing.T) { // ensures that an unschedulable pod does not block head of the queue when there // are frequent events that move pods to the active queue. func TestRecentlyTriedPodsGoBack(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) // Add a few pods to priority queue. 
for i := 0; i < 5; i++ { p := v1.Pod{ @@ -567,3 +567,66 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { } } } + +// TestHighPriorityBackoff tests that a high priority pod does not block +// other pods if it is unschedulable +func TestHighProirotyBackoff(t *testing.T) { + q := NewPriorityQueue(nil) + + midPod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-midpod", + Namespace: "ns1", + UID: types.UID("tp-mid"), + }, + Spec: v1.PodSpec{ + Priority: &midPriority, + }, + Status: v1.PodStatus{ + NominatedNodeName: "node1", + }, + } + highPod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-highpod", + Namespace: "ns1", + UID: types.UID("tp-high"), + }, + Spec: v1.PodSpec{ + Priority: &highPriority, + }, + Status: v1.PodStatus{ + NominatedNodeName: "node1", + }, + } + q.Add(&midPod) + q.Add(&highPod) + // Simulate a pod being popped by the scheduler, determined unschedulable, and + // then moved back to the active queue. + p, err := q.Pop() + if err != nil { + t.Errorf("Error while popping the head of the queue: %v", err) + } + if p != &highPod { + t.Errorf("Expected to get high prority pod, got: %v", p) + } + // Update pod condition to unschedulable. + podutil.UpdatePodCondition(&p.Status, &v1.PodCondition{ + Type: v1.PodScheduled, + Status: v1.ConditionFalse, + Reason: v1.PodReasonUnschedulable, + Message: "fake scheduling failure", + }) + // Put in the unschedulable queue. + q.AddUnschedulableIfNotPresent(p) + // Move all unschedulable pods to the active queue. + q.MoveAllToActiveQueue() + + p, err = q.Pop() + if err != nil { + t.Errorf("Error while popping the head of the queue: %v", err) + } + if p != &midPod { + t.Errorf("Expected to get mid prority pod, got: %v", p) + } +} diff --git a/pkg/scheduler/util/BUILD b/pkg/scheduler/util/BUILD index 0f8c07e080..53649692f2 100644 --- a/pkg/scheduler/util/BUILD +++ b/pkg/scheduler/util/BUILD @@ -26,6 +26,7 @@ go_library( name = "go_default_library", srcs = [ "backoff_utils.go", + "clock.go", "heap.go", "utils.go", ], @@ -36,6 +37,7 @@ go_library( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/pkg/scheduler/util/backoff_utils.go b/pkg/scheduler/util/backoff_utils.go index e77e808658..f800d9d5af 100644 --- a/pkg/scheduler/util/backoff_utils.go +++ b/pkg/scheduler/util/backoff_utils.go @@ -80,11 +80,6 @@ func (b *backoffEntry) getBackoff(maxDuration time.Duration) time.Duration { return newDuration } -// backoffAndWait Blocks until this entry has completed backoff -func (b *backoffEntry) backoffAndWait(maxDuration time.Duration) { - time.Sleep(b.getBackoff(maxDuration)) -} - // PodBackoff is used to restart a pod with back-off delay. 
 type PodBackoff struct {
 	// expiryQ stores backoffEntry orderedy by lastUpdate until they reach maxDuration and are GC'd
@@ -183,6 +178,30 @@ func (p *PodBackoff) Gc() {
 	}
 }
 
+// GetBackoffTime returns the time that podID completes backoff
+func (p *PodBackoff) GetBackoffTime(podID ktypes.NamespacedName) (time.Time, bool) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	rawBe, exists, _ := p.expiryQ.GetByKey(podID.String())
+	if !exists {
+		return time.Time{}, false
+	}
+	be := rawBe.(*backoffEntry)
+	return be.lastUpdate.Add(be.backoff), true
+}
+
+// ClearPodBackoff removes all tracking information for podID (clears expiry)
+func (p *PodBackoff) ClearPodBackoff(podID ktypes.NamespacedName) bool {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	entry, exists, _ := p.expiryQ.GetByKey(podID.String())
+	if exists {
+		err := p.expiryQ.Delete(entry)
+		return err == nil
+	}
+	return false
+}
+
 // backoffEntryKeyFunc is the keying function used for mapping a backoffEntry to string for heap
 func backoffEntryKeyFunc(b interface{}) (string, error) {
 	be := b.(*backoffEntry)
diff --git a/pkg/scheduler/util/backoff_utils_test.go b/pkg/scheduler/util/backoff_utils_test.go
index 8a618ebb32..7aa045da3b 100644
--- a/pkg/scheduler/util/backoff_utils_test.go
+++ b/pkg/scheduler/util/backoff_utils_test.go
@@ -31,7 +31,7 @@ func (f *fakeClock) Now() time.Time {
 	return f.t
 }
 
-func TestBackoff(t *testing.T) {
+func TestBackoffPod(t *testing.T) {
 	clock := fakeClock{}
 	backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock)
 	tests := []struct {
@@ -66,7 +66,10 @@
 	for _, test := range tests {
 		duration := backoff.BackoffPod(test.podID)
 		if duration != test.expectedDuration {
-			t.Errorf("expected: %s, got %s for %s", test.expectedDuration.String(), duration.String(), test.podID)
+			t.Errorf("expected: %s, got %s for pod %s", test.expectedDuration.String(), duration.String(), test.podID)
+		}
+		if boTime, _ := backoff.GetBackoffTime(test.podID); boTime != clock.Now().Add(test.expectedDuration) {
+			t.Errorf("expected GetBackoffTime %s, got %s for pod %s", test.expectedDuration.String(), boTime.String(), test.podID)
 		}
 		clock.t = clock.t.Add(test.advanceClock)
 		backoff.Gc()
@@ -85,3 +88,26 @@ func TestBackoffPod(t *testing.T) {
 		t.Errorf("expected: 1, got %s", duration.String())
 	}
 }
+
+func TestClearPodBackoff(t *testing.T) {
+	clock := fakeClock{}
+	backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock)
+
+	if backoff.ClearPodBackoff(ktypes.NamespacedName{Namespace: "ns", Name: "nonexist"}) {
+		t.Error("Expected ClearPodBackoff failure for unknown pod, got success.")
+	}
+
+	podID := ktypes.NamespacedName{Namespace: "ns", Name: "foo"}
+	if dur := backoff.BackoffPod(podID); dur != 1*time.Second {
+		t.Errorf("Expected backoff of 1s for pod %s, got %s", podID, dur.String())
+	}
+
+	if !backoff.ClearPodBackoff(podID) {
+		t.Errorf("Failed to clear backoff for pod %v", podID)
+	}
+
+	expectBoTime := clock.Now()
+	if boTime, _ := backoff.GetBackoffTime(podID); boTime != expectBoTime {
+		t.Errorf("Expected backoff time for pod %s of %s, got %s", podID, expectBoTime, boTime)
+	}
+}
diff --git a/pkg/scheduler/util/clock.go b/pkg/scheduler/util/clock.go
new file mode 100644
index 0000000000..e17c759dba
--- /dev/null
+++ b/pkg/scheduler/util/clock.go
@@ -0,0 +1,34 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+	"time"
+)
+
+// Clock provides an interface for getting the current time
+type Clock interface {
+	Now() time.Time
+}
+
+// RealClock implements a clock using time
+type RealClock struct{}
+
+// Now returns the current time with time.Now
+func (RealClock) Now() time.Time {
+	return time.Now()
+}

From 73710f06dbaf91851c14f946230f5bc4a3535cce Mon Sep 17 00:00:00 2001
From: Gregory Haynes
Date: Thu, 6 Dec 2018 16:57:20 +0000
Subject: [PATCH 4/4] Check for shutdown in TryBackoffAndWait

---
 pkg/scheduler/factory/factory.go         |  2 +-
 pkg/scheduler/util/backoff_utils.go      | 10 +++++++---
 pkg/scheduler/util/backoff_utils_test.go | 25 ++++++++++++++++++++++++
 3 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go
index 9a41cbd805..b008286db5 100644
--- a/pkg/scheduler/factory/factory.go
+++ b/pkg/scheduler/factory/factory.go
@@ -1493,7 +1493,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue
 		// to run on a node, scheduler takes the pod into account when running
 		// predicates for the node.
 		if !util.PodPriorityEnabled() {
-			if !backoff.TryBackoffAndWait(podID) {
+			if !backoff.TryBackoffAndWait(podID, c.StopEverything) {
 				klog.Warningf("Request for pod %v already in flight, abandoning", podID)
 				return
 			}
diff --git a/pkg/scheduler/util/backoff_utils.go b/pkg/scheduler/util/backoff_utils.go
index f800d9d5af..618f93772f 100644
--- a/pkg/scheduler/util/backoff_utils.go
+++ b/pkg/scheduler/util/backoff_utils.go
@@ -143,7 +143,7 @@ func (p *PodBackoff) BackoffPod(podID ktypes.NamespacedName) time.Duration {
 }
 
 // TryBackoffAndWait tries to acquire the backoff lock
-func (p *PodBackoff) TryBackoffAndWait(podID ktypes.NamespacedName) bool {
+func (p *PodBackoff) TryBackoffAndWait(podID ktypes.NamespacedName, stop <-chan struct{}) bool {
 	p.lock.Lock()
 	entry := p.getEntry(podID)
 
@@ -154,8 +154,12 @@ func (p *PodBackoff) TryBackoffAndWait(podID ktypes.NamespacedName) bool {
 	defer entry.unlock()
 	duration := entry.getBackoff(p.maxDuration)
 	p.lock.Unlock()
-	time.Sleep(duration)
-	return true
+	select {
+	case <-time.After(duration):
+		return true
+	case <-stop:
+		return false
+	}
 }
 
 // Gc execute garbage collection on the pod back-off.
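The select above replaces an unconditional time.Sleep, so a caller waiting out a pod's backoff is released as soon as the scheduler's stop channel closes. Below is a minimal, self-contained Go sketch of that wait-or-stop pattern; waitOrStop and main are illustrative names only and are not part of the scheduler code.

package main

import (
	"fmt"
	"time"
)

// waitOrStop blocks for the given backoff duration, but returns early (false)
// if stop is closed, mirroring the select used by TryBackoffAndWait above.
func waitOrStop(backoff time.Duration, stop <-chan struct{}) bool {
	select {
	case <-time.After(backoff):
		return true
	case <-stop:
		return false
	}
}

func main() {
	stop := make(chan struct{})

	// Normal case: the backoff elapses and the caller may retry scheduling.
	fmt.Println(waitOrStop(10*time.Millisecond, stop)) // true

	// Shutdown case: closing stop unblocks the wait immediately.
	close(stop)
	fmt.Println(waitOrStop(time.Hour, stop)) // false
}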
diff --git a/pkg/scheduler/util/backoff_utils_test.go b/pkg/scheduler/util/backoff_utils_test.go
index 7aa045da3b..b99c9498f5 100644
--- a/pkg/scheduler/util/backoff_utils_test.go
+++ b/pkg/scheduler/util/backoff_utils_test.go
@@ -111,3 +111,28 @@ func TestClearPodBackoff(t *testing.T) {
 		t.Errorf("Expected backoff time for pod %s of %s, got %s", podID, expectBoTime, boTime)
 	}
 }
+
+func TestTryBackoffAndWait(t *testing.T) {
+	clock := fakeClock{}
+	backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock)
+
+	stopCh := make(chan struct{})
+	podID := ktypes.NamespacedName{Namespace: "ns", Name: "pod"}
+	if !backoff.TryBackoffAndWait(podID, stopCh) {
+		t.Error("Expected TryBackoffAndWait success for new pod, got failure.")
+	}
+
+	be := backoff.getEntry(podID)
+	if !be.tryLock() {
+		t.Error("Failed to acquire lock for backoffEntry")
+	}
+
+	if backoff.TryBackoffAndWait(podID, stopCh) {
+		t.Error("Expected TryBackoffAndWait failure with lock acquired, got success.")
+	}
+
+	close(stopCh)
+	if backoff.TryBackoffAndWait(podID, stopCh) {
+		t.Error("Expected TryBackoffAndWait failure with closed stopCh, got success.")
+	}
+}
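GetBackoffTime, added earlier in this series, reports when a pod's backoff completes, which lets a consumer such as a scheduling queue check backoff state without sleeping. The following is a hypothetical, self-contained sketch of that kind of check; podBackoff, fakeBackoff, and isPodBackingOff are illustrative stand-ins, not scheduler APIs.

package main

import (
	"fmt"
	"time"
)

// podBackoff is a stand-in for the subset of PodBackoff behavior used here.
type podBackoff interface {
	GetBackoffTime(pod string) (time.Time, bool)
}

// fakeBackoff records a fixed backoff completion time per pod key.
type fakeBackoff map[string]time.Time

func (f fakeBackoff) GetBackoffTime(pod string) (time.Time, bool) {
	t, ok := f[pod]
	return t, ok
}

// isPodBackingOff reports whether pod should stay out of the active queue at
// the given time: it is backing off only if a completion time is tracked and
// that time is still in the future.
func isPodBackingOff(b podBackoff, pod string, now time.Time) bool {
	boTime, tracked := b.GetBackoffTime(pod)
	return tracked && boTime.After(now)
}

func main() {
	now := time.Now()
	b := fakeBackoff{"ns1/test-pod": now.Add(2 * time.Second)}

	fmt.Println(isPodBackingOff(b, "ns1/test-pod", now))                    // true: still backing off
	fmt.Println(isPodBackingOff(b, "ns1/test-pod", now.Add(3*time.Second))) // false: backoff complete
	fmt.Println(isPodBackingOff(b, "ns1/other-pod", now))                   // false: never backed off
}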