From 082b48240a12138169af0f35a1d7d7bea9ed8f16 Mon Sep 17 00:00:00 2001 From: Gregory Haynes Date: Sun, 10 Dec 2017 18:34:04 +0000 Subject: [PATCH] Implement scheduler.util.backoff as a queue We are going to use PodBackoff for controlling backoff when adding unschedulable pods back to the active scheduling queue. In order to do this more easily, limit the interface for PodBackoff to only this struct (rather than exposing BackoffEntry) and change the backing expiry implementation to be queue based. --- pkg/scheduler/factory/factory.go | 3 +- pkg/scheduler/util/BUILD | 2 - pkg/scheduler/util/backoff_utils.go | 117 ++++++++++++++++------- pkg/scheduler/util/backoff_utils_test.go | 9 +- pkg/scheduler/util/heap.go | 15 ++- 5 files changed, 105 insertions(+), 41 deletions(-) diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 120806aef7..188a0069b6 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -1493,8 +1493,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue // to run on a node, scheduler takes the pod into account when running // predicates for the node. if !util.PodPriorityEnabled() { - entry := backoff.GetEntry(podID) - if !entry.TryWait(backoff.MaxDuration()) { + if !backoff.TryBackoffAndWait(podID) { klog.Warningf("Request for pod %v already in flight, abandoning", podID) return } diff --git a/pkg/scheduler/util/BUILD b/pkg/scheduler/util/BUILD index 19374a7845..0f8c07e080 100644 --- a/pkg/scheduler/util/BUILD +++ b/pkg/scheduler/util/BUILD @@ -11,7 +11,6 @@ go_test( srcs = [ "backoff_utils_test.go", "heap_test.go", - "testutil_test.go", "utils_test.go", ], embed = [":go_default_library"], @@ -28,7 +27,6 @@ go_library( srcs = [ "backoff_utils.go", "heap.go", - "testutil.go", "utils.go", ], importpath = "k8s.io/kubernetes/pkg/scheduler/util", diff --git a/pkg/scheduler/util/backoff_utils.go b/pkg/scheduler/util/backoff_utils.go index 506cd1270a..e77e808658 100644 --- a/pkg/scheduler/util/backoff_utils.go +++ b/pkg/scheduler/util/backoff_utils.go @@ -37,10 +37,11 @@ func (realClock) Now() time.Time { return time.Now() } -// BackoffEntry is single threaded. in particular, it only allows a single action to be waiting on backoff at a time. -// It is expected that all users will only use the public TryWait(...) method +// backoffEntry is single threaded. in particular, it only allows a single action to be waiting on backoff at a time. // It is also not safe to copy this object. -type BackoffEntry struct { +type backoffEntry struct { + initialized bool + podName ktypes.NamespacedName backoff time.Duration lastUpdate time.Time reqInFlight int32 @@ -48,45 +49,46 @@ type BackoffEntry struct { // tryLock attempts to acquire a lock via atomic compare and swap. // returns true if the lock was acquired, false otherwise -func (b *BackoffEntry) tryLock() bool { +func (b *backoffEntry) tryLock() bool { return atomic.CompareAndSwapInt32(&b.reqInFlight, 0, 1) } // unlock returns the lock. panics if the lock isn't held -func (b *BackoffEntry) unlock() { +func (b *backoffEntry) unlock() { if !atomic.CompareAndSwapInt32(&b.reqInFlight, 1, 0) { panic(fmt.Sprintf("unexpected state on unlocking: %+v", b)) } } -// TryWait tries to acquire the backoff lock, maxDuration is the maximum allowed period to wait for. -func (b *BackoffEntry) TryWait(maxDuration time.Duration) bool { - if !b.tryLock() { - return false - } - defer b.unlock() - b.wait(maxDuration) - return true +// backoffTime returns the Time when a backoffEntry completes backoff +func (b *backoffEntry) backoffTime() time.Time { + return b.lastUpdate.Add(b.backoff) } -func (b *BackoffEntry) getBackoff(maxDuration time.Duration) time.Duration { - duration := b.backoff - newDuration := time.Duration(duration) * 2 +// getBackoff returns the duration until this entry completes backoff +func (b *backoffEntry) getBackoff(maxDuration time.Duration) time.Duration { + if !b.initialized { + b.initialized = true + return b.backoff + } + newDuration := b.backoff * 2 if newDuration > maxDuration { newDuration = maxDuration } b.backoff = newDuration - klog.V(4).Infof("Backing off %s", duration.String()) - return duration + klog.V(4).Infof("Backing off %s", newDuration.String()) + return newDuration } -func (b *BackoffEntry) wait(maxDuration time.Duration) { +// backoffAndWait Blocks until this entry has completed backoff +func (b *backoffEntry) backoffAndWait(maxDuration time.Duration) { time.Sleep(b.getBackoff(maxDuration)) } // PodBackoff is used to restart a pod with back-off delay. type PodBackoff struct { - perPodBackoff map[ktypes.NamespacedName]*BackoffEntry + // expiryQ stores backoffEntry orderedy by lastUpdate until they reach maxDuration and are GC'd + expiryQ *Heap lock sync.Mutex clock clock defaultDuration time.Duration @@ -111,24 +113,54 @@ func CreatePodBackoff(defaultDuration, maxDuration time.Duration) *PodBackoff { // CreatePodBackoffWithClock creates a pod back-off object by default duration, max duration and clock. func CreatePodBackoffWithClock(defaultDuration, maxDuration time.Duration, clock clock) *PodBackoff { return &PodBackoff{ - perPodBackoff: map[ktypes.NamespacedName]*BackoffEntry{}, + expiryQ: NewHeap(backoffEntryKeyFunc, backoffEntryCompareUpdate), clock: clock, defaultDuration: defaultDuration, maxDuration: maxDuration, } } -// GetEntry returns a back-off entry by Pod ID. -func (p *PodBackoff) GetEntry(podID ktypes.NamespacedName) *BackoffEntry { +// getEntry returns the backoffEntry for a given podID +func (p *PodBackoff) getEntry(podID ktypes.NamespacedName) *backoffEntry { + entry, exists, _ := p.expiryQ.GetByKey(podID.String()) + var be *backoffEntry + if !exists { + be = &backoffEntry{ + initialized: false, + podName: podID, + backoff: p.defaultDuration, + } + p.expiryQ.Update(be) + } else { + be = entry.(*backoffEntry) + } + return be +} + +// BackoffPod updates the backoff for a podId and returns the duration until backoff completion +func (p *PodBackoff) BackoffPod(podID ktypes.NamespacedName) time.Duration { p.lock.Lock() defer p.lock.Unlock() - entry, ok := p.perPodBackoff[podID] - if !ok { - entry = &BackoffEntry{backoff: p.defaultDuration} - p.perPodBackoff[podID] = entry - } + entry := p.getEntry(podID) entry.lastUpdate = p.clock.Now() - return entry + p.expiryQ.Update(entry) + return entry.getBackoff(p.maxDuration) +} + +// TryBackoffAndWait tries to acquire the backoff lock +func (p *PodBackoff) TryBackoffAndWait(podID ktypes.NamespacedName) bool { + p.lock.Lock() + entry := p.getEntry(podID) + + if !entry.tryLock() { + p.lock.Unlock() + return false + } + defer entry.unlock() + duration := entry.getBackoff(p.maxDuration) + p.lock.Unlock() + time.Sleep(duration) + return true } // Gc execute garbage collection on the pod back-off. @@ -136,9 +168,30 @@ func (p *PodBackoff) Gc() { p.lock.Lock() defer p.lock.Unlock() now := p.clock.Now() - for podID, entry := range p.perPodBackoff { - if now.Sub(entry.lastUpdate) > p.maxDuration { - delete(p.perPodBackoff, podID) + var be *backoffEntry + for { + entry := p.expiryQ.Peek() + if entry == nil { + break + } + be = entry.(*backoffEntry) + if now.Sub(be.lastUpdate) > p.maxDuration { + p.expiryQ.Pop() + } else { + break } } } + +// backoffEntryKeyFunc is the keying function used for mapping a backoffEntry to string for heap +func backoffEntryKeyFunc(b interface{}) (string, error) { + be := b.(*backoffEntry) + return be.podName.String(), nil +} + +// backoffEntryCompareUpdate returns true when b1's backoff time is before b2's +func backoffEntryCompareUpdate(b1, b2 interface{}) bool { + be1 := b1.(*backoffEntry) + be2 := b2.(*backoffEntry) + return be1.lastUpdate.Before(be2.lastUpdate) +} diff --git a/pkg/scheduler/util/backoff_utils_test.go b/pkg/scheduler/util/backoff_utils_test.go index 8f61b637e7..8a618ebb32 100644 --- a/pkg/scheduler/util/backoff_utils_test.go +++ b/pkg/scheduler/util/backoff_utils_test.go @@ -64,7 +64,7 @@ func TestBackoff(t *testing.T) { } for _, test := range tests { - duration := backoff.GetEntry(test.podID).getBackoff(backoff.maxDuration) + duration := backoff.BackoffPod(test.podID) if duration != test.expectedDuration { t.Errorf("expected: %s, got %s for %s", test.expectedDuration.String(), duration.String(), test.podID) } @@ -72,14 +72,15 @@ func TestBackoff(t *testing.T) { backoff.Gc() } fooID := ktypes.NamespacedName{Namespace: "default", Name: "foo"} - backoff.perPodBackoff[fooID].backoff = 60 * time.Second - duration := backoff.GetEntry(fooID).getBackoff(backoff.maxDuration) + be := backoff.getEntry(fooID) + be.backoff = 60 * time.Second + duration := backoff.BackoffPod(fooID) if duration != 60*time.Second { t.Errorf("expected: 60, got %s", duration.String()) } // Verify that we split on namespaces correctly, same name, different namespace fooID.Namespace = "other" - duration = backoff.GetEntry(fooID).getBackoff(backoff.maxDuration) + duration = backoff.BackoffPod(fooID) if duration != 1*time.Second { t.Errorf("expected: 1, got %s", duration.String()) } diff --git a/pkg/scheduler/util/heap.go b/pkg/scheduler/util/heap.go index d7c5534868..0f15652c65 100644 --- a/pkg/scheduler/util/heap.go +++ b/pkg/scheduler/util/heap.go @@ -113,6 +113,14 @@ func (h *heapData) Pop() interface{} { return item.obj } +// Peek is supposed to be called by heap.Peek only. +func (h *heapData) Peek() interface{} { + if len(h.queue) > 0 { + return h.items[h.queue[0]].obj + } + return nil +} + // Heap is a producer/consumer queue that implements a heap data structure. // It can be used to implement priority queues and similar data structures. type Heap struct { @@ -169,7 +177,12 @@ func (h *Heap) Delete(obj interface{}) error { return fmt.Errorf("object not found") } -// Pop returns the head of the heap. +// Peek returns the head of the heap without removing it. +func (h *Heap) Peek() interface{} { + return h.data.Peek() +} + +// Pop returns the head of the heap and removes it. func (h *Heap) Pop() (interface{}, error) { obj := heap.Pop(h.data) if obj != nil {