Implement scheduler.util.backoff as a queue

We are going to use PodBackoff for controlling backoff when adding
unschedulable pods back to the active scheduling queue. In order to do
this more easily, limit the interface for PodBackoff to only this struct
(rather than exposing BackoffEntry) and change the backing expiry
implementation to be queue based.
pull/564/head
Gregory Haynes 2017-12-10 18:34:04 +00:00
parent c821f2ed2f
commit 082b48240a
5 changed files with 105 additions and 41 deletions

View File

@ -1493,8 +1493,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue
// to run on a node, scheduler takes the pod into account when running // to run on a node, scheduler takes the pod into account when running
// predicates for the node. // predicates for the node.
if !util.PodPriorityEnabled() { if !util.PodPriorityEnabled() {
entry := backoff.GetEntry(podID) if !backoff.TryBackoffAndWait(podID) {
if !entry.TryWait(backoff.MaxDuration()) {
klog.Warningf("Request for pod %v already in flight, abandoning", podID) klog.Warningf("Request for pod %v already in flight, abandoning", podID)
return return
} }

View File

@ -11,7 +11,6 @@ go_test(
srcs = [ srcs = [
"backoff_utils_test.go", "backoff_utils_test.go",
"heap_test.go", "heap_test.go",
"testutil_test.go",
"utils_test.go", "utils_test.go",
], ],
embed = [":go_default_library"], embed = [":go_default_library"],
@ -28,7 +27,6 @@ go_library(
srcs = [ srcs = [
"backoff_utils.go", "backoff_utils.go",
"heap.go", "heap.go",
"testutil.go",
"utils.go", "utils.go",
], ],
importpath = "k8s.io/kubernetes/pkg/scheduler/util", importpath = "k8s.io/kubernetes/pkg/scheduler/util",

View File

@ -37,10 +37,11 @@ func (realClock) Now() time.Time {
return time.Now() return time.Now()
} }
// BackoffEntry is single threaded. in particular, it only allows a single action to be waiting on backoff at a time. // backoffEntry is single threaded. in particular, it only allows a single action to be waiting on backoff at a time.
// It is expected that all users will only use the public TryWait(...) method
// It is also not safe to copy this object. // It is also not safe to copy this object.
type BackoffEntry struct { type backoffEntry struct {
initialized bool
podName ktypes.NamespacedName
backoff time.Duration backoff time.Duration
lastUpdate time.Time lastUpdate time.Time
reqInFlight int32 reqInFlight int32
@ -48,45 +49,46 @@ type BackoffEntry struct {
// tryLock attempts to acquire a lock via atomic compare and swap. // tryLock attempts to acquire a lock via atomic compare and swap.
// returns true if the lock was acquired, false otherwise // returns true if the lock was acquired, false otherwise
func (b *BackoffEntry) tryLock() bool { func (b *backoffEntry) tryLock() bool {
return atomic.CompareAndSwapInt32(&b.reqInFlight, 0, 1) return atomic.CompareAndSwapInt32(&b.reqInFlight, 0, 1)
} }
// unlock returns the lock. panics if the lock isn't held // unlock returns the lock. panics if the lock isn't held
func (b *BackoffEntry) unlock() { func (b *backoffEntry) unlock() {
if !atomic.CompareAndSwapInt32(&b.reqInFlight, 1, 0) { if !atomic.CompareAndSwapInt32(&b.reqInFlight, 1, 0) {
panic(fmt.Sprintf("unexpected state on unlocking: %+v", b)) panic(fmt.Sprintf("unexpected state on unlocking: %+v", b))
} }
} }
// TryWait tries to acquire the backoff lock, maxDuration is the maximum allowed period to wait for. // backoffTime returns the Time when a backoffEntry completes backoff
func (b *BackoffEntry) TryWait(maxDuration time.Duration) bool { func (b *backoffEntry) backoffTime() time.Time {
if !b.tryLock() { return b.lastUpdate.Add(b.backoff)
return false
}
defer b.unlock()
b.wait(maxDuration)
return true
} }
func (b *BackoffEntry) getBackoff(maxDuration time.Duration) time.Duration { // getBackoff returns the duration until this entry completes backoff
duration := b.backoff func (b *backoffEntry) getBackoff(maxDuration time.Duration) time.Duration {
newDuration := time.Duration(duration) * 2 if !b.initialized {
b.initialized = true
return b.backoff
}
newDuration := b.backoff * 2
if newDuration > maxDuration { if newDuration > maxDuration {
newDuration = maxDuration newDuration = maxDuration
} }
b.backoff = newDuration b.backoff = newDuration
klog.V(4).Infof("Backing off %s", duration.String()) klog.V(4).Infof("Backing off %s", newDuration.String())
return duration return newDuration
} }
func (b *BackoffEntry) wait(maxDuration time.Duration) { // backoffAndWait Blocks until this entry has completed backoff
func (b *backoffEntry) backoffAndWait(maxDuration time.Duration) {
time.Sleep(b.getBackoff(maxDuration)) time.Sleep(b.getBackoff(maxDuration))
} }
// PodBackoff is used to restart a pod with back-off delay. // PodBackoff is used to restart a pod with back-off delay.
type PodBackoff struct { type PodBackoff struct {
perPodBackoff map[ktypes.NamespacedName]*BackoffEntry // expiryQ stores backoffEntry orderedy by lastUpdate until they reach maxDuration and are GC'd
expiryQ *Heap
lock sync.Mutex lock sync.Mutex
clock clock clock clock
defaultDuration time.Duration defaultDuration time.Duration
@ -111,24 +113,54 @@ func CreatePodBackoff(defaultDuration, maxDuration time.Duration) *PodBackoff {
// CreatePodBackoffWithClock creates a pod back-off object by default duration, max duration and clock. // CreatePodBackoffWithClock creates a pod back-off object by default duration, max duration and clock.
func CreatePodBackoffWithClock(defaultDuration, maxDuration time.Duration, clock clock) *PodBackoff { func CreatePodBackoffWithClock(defaultDuration, maxDuration time.Duration, clock clock) *PodBackoff {
return &PodBackoff{ return &PodBackoff{
perPodBackoff: map[ktypes.NamespacedName]*BackoffEntry{}, expiryQ: NewHeap(backoffEntryKeyFunc, backoffEntryCompareUpdate),
clock: clock, clock: clock,
defaultDuration: defaultDuration, defaultDuration: defaultDuration,
maxDuration: maxDuration, maxDuration: maxDuration,
} }
} }
// GetEntry returns a back-off entry by Pod ID. // getEntry returns the backoffEntry for a given podID
func (p *PodBackoff) GetEntry(podID ktypes.NamespacedName) *BackoffEntry { func (p *PodBackoff) getEntry(podID ktypes.NamespacedName) *backoffEntry {
entry, exists, _ := p.expiryQ.GetByKey(podID.String())
var be *backoffEntry
if !exists {
be = &backoffEntry{
initialized: false,
podName: podID,
backoff: p.defaultDuration,
}
p.expiryQ.Update(be)
} else {
be = entry.(*backoffEntry)
}
return be
}
// BackoffPod updates the backoff for a podId and returns the duration until backoff completion
func (p *PodBackoff) BackoffPod(podID ktypes.NamespacedName) time.Duration {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
entry, ok := p.perPodBackoff[podID] entry := p.getEntry(podID)
if !ok {
entry = &BackoffEntry{backoff: p.defaultDuration}
p.perPodBackoff[podID] = entry
}
entry.lastUpdate = p.clock.Now() entry.lastUpdate = p.clock.Now()
return entry p.expiryQ.Update(entry)
return entry.getBackoff(p.maxDuration)
}
// TryBackoffAndWait tries to acquire the backoff lock
func (p *PodBackoff) TryBackoffAndWait(podID ktypes.NamespacedName) bool {
p.lock.Lock()
entry := p.getEntry(podID)
if !entry.tryLock() {
p.lock.Unlock()
return false
}
defer entry.unlock()
duration := entry.getBackoff(p.maxDuration)
p.lock.Unlock()
time.Sleep(duration)
return true
} }
// Gc execute garbage collection on the pod back-off. // Gc execute garbage collection on the pod back-off.
@ -136,9 +168,30 @@ func (p *PodBackoff) Gc() {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
now := p.clock.Now() now := p.clock.Now()
for podID, entry := range p.perPodBackoff { var be *backoffEntry
if now.Sub(entry.lastUpdate) > p.maxDuration { for {
delete(p.perPodBackoff, podID) entry := p.expiryQ.Peek()
if entry == nil {
break
}
be = entry.(*backoffEntry)
if now.Sub(be.lastUpdate) > p.maxDuration {
p.expiryQ.Pop()
} else {
break
} }
} }
} }
// backoffEntryKeyFunc is the keying function used for mapping a backoffEntry to string for heap
func backoffEntryKeyFunc(b interface{}) (string, error) {
be := b.(*backoffEntry)
return be.podName.String(), nil
}
// backoffEntryCompareUpdate returns true when b1's backoff time is before b2's
func backoffEntryCompareUpdate(b1, b2 interface{}) bool {
be1 := b1.(*backoffEntry)
be2 := b2.(*backoffEntry)
return be1.lastUpdate.Before(be2.lastUpdate)
}

View File

@ -64,7 +64,7 @@ func TestBackoff(t *testing.T) {
} }
for _, test := range tests { for _, test := range tests {
duration := backoff.GetEntry(test.podID).getBackoff(backoff.maxDuration) duration := backoff.BackoffPod(test.podID)
if duration != test.expectedDuration { if duration != test.expectedDuration {
t.Errorf("expected: %s, got %s for %s", test.expectedDuration.String(), duration.String(), test.podID) t.Errorf("expected: %s, got %s for %s", test.expectedDuration.String(), duration.String(), test.podID)
} }
@ -72,14 +72,15 @@ func TestBackoff(t *testing.T) {
backoff.Gc() backoff.Gc()
} }
fooID := ktypes.NamespacedName{Namespace: "default", Name: "foo"} fooID := ktypes.NamespacedName{Namespace: "default", Name: "foo"}
backoff.perPodBackoff[fooID].backoff = 60 * time.Second be := backoff.getEntry(fooID)
duration := backoff.GetEntry(fooID).getBackoff(backoff.maxDuration) be.backoff = 60 * time.Second
duration := backoff.BackoffPod(fooID)
if duration != 60*time.Second { if duration != 60*time.Second {
t.Errorf("expected: 60, got %s", duration.String()) t.Errorf("expected: 60, got %s", duration.String())
} }
// Verify that we split on namespaces correctly, same name, different namespace // Verify that we split on namespaces correctly, same name, different namespace
fooID.Namespace = "other" fooID.Namespace = "other"
duration = backoff.GetEntry(fooID).getBackoff(backoff.maxDuration) duration = backoff.BackoffPod(fooID)
if duration != 1*time.Second { if duration != 1*time.Second {
t.Errorf("expected: 1, got %s", duration.String()) t.Errorf("expected: 1, got %s", duration.String())
} }

View File

@ -113,6 +113,14 @@ func (h *heapData) Pop() interface{} {
return item.obj return item.obj
} }
// Peek is supposed to be called by heap.Peek only.
func (h *heapData) Peek() interface{} {
if len(h.queue) > 0 {
return h.items[h.queue[0]].obj
}
return nil
}
// Heap is a producer/consumer queue that implements a heap data structure. // Heap is a producer/consumer queue that implements a heap data structure.
// It can be used to implement priority queues and similar data structures. // It can be used to implement priority queues and similar data structures.
type Heap struct { type Heap struct {
@ -169,7 +177,12 @@ func (h *Heap) Delete(obj interface{}) error {
return fmt.Errorf("object not found") return fmt.Errorf("object not found")
} }
// Pop returns the head of the heap. // Peek returns the head of the heap without removing it.
func (h *Heap) Peek() interface{} {
return h.data.Peek()
}
// Pop returns the head of the heap and removes it.
func (h *Heap) Pop() (interface{}, error) { func (h *Heap) Pop() (interface{}, error) {
obj := heap.Pop(h.data) obj := heap.Pop(h.data)
if obj != nil { if obj != nil {