diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 120806aef7..188a0069b6 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -1493,8 +1493,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue // to run on a node, scheduler takes the pod into account when running // predicates for the node. if !util.PodPriorityEnabled() { - entry := backoff.GetEntry(podID) - if !entry.TryWait(backoff.MaxDuration()) { + if !backoff.TryBackoffAndWait(podID) { klog.Warningf("Request for pod %v already in flight, abandoning", podID) return } diff --git a/pkg/scheduler/util/BUILD b/pkg/scheduler/util/BUILD index 19374a7845..0f8c07e080 100644 --- a/pkg/scheduler/util/BUILD +++ b/pkg/scheduler/util/BUILD @@ -11,7 +11,6 @@ go_test( srcs = [ "backoff_utils_test.go", "heap_test.go", - "testutil_test.go", "utils_test.go", ], embed = [":go_default_library"], @@ -28,7 +27,6 @@ go_library( srcs = [ "backoff_utils.go", "heap.go", - "testutil.go", "utils.go", ], importpath = "k8s.io/kubernetes/pkg/scheduler/util", diff --git a/pkg/scheduler/util/backoff_utils.go b/pkg/scheduler/util/backoff_utils.go index 506cd1270a..e77e808658 100644 --- a/pkg/scheduler/util/backoff_utils.go +++ b/pkg/scheduler/util/backoff_utils.go @@ -37,10 +37,11 @@ func (realClock) Now() time.Time { return time.Now() } -// BackoffEntry is single threaded. in particular, it only allows a single action to be waiting on backoff at a time. -// It is expected that all users will only use the public TryWait(...) method +// backoffEntry is single threaded. in particular, it only allows a single action to be waiting on backoff at a time. // It is also not safe to copy this object. -type BackoffEntry struct { +type backoffEntry struct { + initialized bool + podName ktypes.NamespacedName backoff time.Duration lastUpdate time.Time reqInFlight int32 @@ -48,45 +49,46 @@ type BackoffEntry struct { // tryLock attempts to acquire a lock via atomic compare and swap. // returns true if the lock was acquired, false otherwise -func (b *BackoffEntry) tryLock() bool { +func (b *backoffEntry) tryLock() bool { return atomic.CompareAndSwapInt32(&b.reqInFlight, 0, 1) } // unlock returns the lock. panics if the lock isn't held -func (b *BackoffEntry) unlock() { +func (b *backoffEntry) unlock() { if !atomic.CompareAndSwapInt32(&b.reqInFlight, 1, 0) { panic(fmt.Sprintf("unexpected state on unlocking: %+v", b)) } } -// TryWait tries to acquire the backoff lock, maxDuration is the maximum allowed period to wait for. -func (b *BackoffEntry) TryWait(maxDuration time.Duration) bool { - if !b.tryLock() { - return false - } - defer b.unlock() - b.wait(maxDuration) - return true +// backoffTime returns the Time when a backoffEntry completes backoff +func (b *backoffEntry) backoffTime() time.Time { + return b.lastUpdate.Add(b.backoff) } -func (b *BackoffEntry) getBackoff(maxDuration time.Duration) time.Duration { - duration := b.backoff - newDuration := time.Duration(duration) * 2 +// getBackoff returns the duration until this entry completes backoff +func (b *backoffEntry) getBackoff(maxDuration time.Duration) time.Duration { + if !b.initialized { + b.initialized = true + return b.backoff + } + newDuration := b.backoff * 2 if newDuration > maxDuration { newDuration = maxDuration } b.backoff = newDuration - klog.V(4).Infof("Backing off %s", duration.String()) - return duration + klog.V(4).Infof("Backing off %s", newDuration.String()) + return newDuration } -func (b *BackoffEntry) wait(maxDuration time.Duration) { +// backoffAndWait Blocks until this entry has completed backoff +func (b *backoffEntry) backoffAndWait(maxDuration time.Duration) { time.Sleep(b.getBackoff(maxDuration)) } // PodBackoff is used to restart a pod with back-off delay. type PodBackoff struct { - perPodBackoff map[ktypes.NamespacedName]*BackoffEntry + // expiryQ stores backoffEntry orderedy by lastUpdate until they reach maxDuration and are GC'd + expiryQ *Heap lock sync.Mutex clock clock defaultDuration time.Duration @@ -111,24 +113,54 @@ func CreatePodBackoff(defaultDuration, maxDuration time.Duration) *PodBackoff { // CreatePodBackoffWithClock creates a pod back-off object by default duration, max duration and clock. func CreatePodBackoffWithClock(defaultDuration, maxDuration time.Duration, clock clock) *PodBackoff { return &PodBackoff{ - perPodBackoff: map[ktypes.NamespacedName]*BackoffEntry{}, + expiryQ: NewHeap(backoffEntryKeyFunc, backoffEntryCompareUpdate), clock: clock, defaultDuration: defaultDuration, maxDuration: maxDuration, } } -// GetEntry returns a back-off entry by Pod ID. -func (p *PodBackoff) GetEntry(podID ktypes.NamespacedName) *BackoffEntry { +// getEntry returns the backoffEntry for a given podID +func (p *PodBackoff) getEntry(podID ktypes.NamespacedName) *backoffEntry { + entry, exists, _ := p.expiryQ.GetByKey(podID.String()) + var be *backoffEntry + if !exists { + be = &backoffEntry{ + initialized: false, + podName: podID, + backoff: p.defaultDuration, + } + p.expiryQ.Update(be) + } else { + be = entry.(*backoffEntry) + } + return be +} + +// BackoffPod updates the backoff for a podId and returns the duration until backoff completion +func (p *PodBackoff) BackoffPod(podID ktypes.NamespacedName) time.Duration { p.lock.Lock() defer p.lock.Unlock() - entry, ok := p.perPodBackoff[podID] - if !ok { - entry = &BackoffEntry{backoff: p.defaultDuration} - p.perPodBackoff[podID] = entry - } + entry := p.getEntry(podID) entry.lastUpdate = p.clock.Now() - return entry + p.expiryQ.Update(entry) + return entry.getBackoff(p.maxDuration) +} + +// TryBackoffAndWait tries to acquire the backoff lock +func (p *PodBackoff) TryBackoffAndWait(podID ktypes.NamespacedName) bool { + p.lock.Lock() + entry := p.getEntry(podID) + + if !entry.tryLock() { + p.lock.Unlock() + return false + } + defer entry.unlock() + duration := entry.getBackoff(p.maxDuration) + p.lock.Unlock() + time.Sleep(duration) + return true } // Gc execute garbage collection on the pod back-off. @@ -136,9 +168,30 @@ func (p *PodBackoff) Gc() { p.lock.Lock() defer p.lock.Unlock() now := p.clock.Now() - for podID, entry := range p.perPodBackoff { - if now.Sub(entry.lastUpdate) > p.maxDuration { - delete(p.perPodBackoff, podID) + var be *backoffEntry + for { + entry := p.expiryQ.Peek() + if entry == nil { + break + } + be = entry.(*backoffEntry) + if now.Sub(be.lastUpdate) > p.maxDuration { + p.expiryQ.Pop() + } else { + break } } } + +// backoffEntryKeyFunc is the keying function used for mapping a backoffEntry to string for heap +func backoffEntryKeyFunc(b interface{}) (string, error) { + be := b.(*backoffEntry) + return be.podName.String(), nil +} + +// backoffEntryCompareUpdate returns true when b1's backoff time is before b2's +func backoffEntryCompareUpdate(b1, b2 interface{}) bool { + be1 := b1.(*backoffEntry) + be2 := b2.(*backoffEntry) + return be1.lastUpdate.Before(be2.lastUpdate) +} diff --git a/pkg/scheduler/util/backoff_utils_test.go b/pkg/scheduler/util/backoff_utils_test.go index 8f61b637e7..8a618ebb32 100644 --- a/pkg/scheduler/util/backoff_utils_test.go +++ b/pkg/scheduler/util/backoff_utils_test.go @@ -64,7 +64,7 @@ func TestBackoff(t *testing.T) { } for _, test := range tests { - duration := backoff.GetEntry(test.podID).getBackoff(backoff.maxDuration) + duration := backoff.BackoffPod(test.podID) if duration != test.expectedDuration { t.Errorf("expected: %s, got %s for %s", test.expectedDuration.String(), duration.String(), test.podID) } @@ -72,14 +72,15 @@ func TestBackoff(t *testing.T) { backoff.Gc() } fooID := ktypes.NamespacedName{Namespace: "default", Name: "foo"} - backoff.perPodBackoff[fooID].backoff = 60 * time.Second - duration := backoff.GetEntry(fooID).getBackoff(backoff.maxDuration) + be := backoff.getEntry(fooID) + be.backoff = 60 * time.Second + duration := backoff.BackoffPod(fooID) if duration != 60*time.Second { t.Errorf("expected: 60, got %s", duration.String()) } // Verify that we split on namespaces correctly, same name, different namespace fooID.Namespace = "other" - duration = backoff.GetEntry(fooID).getBackoff(backoff.maxDuration) + duration = backoff.BackoffPod(fooID) if duration != 1*time.Second { t.Errorf("expected: 1, got %s", duration.String()) } diff --git a/pkg/scheduler/util/heap.go b/pkg/scheduler/util/heap.go index d7c5534868..0f15652c65 100644 --- a/pkg/scheduler/util/heap.go +++ b/pkg/scheduler/util/heap.go @@ -113,6 +113,14 @@ func (h *heapData) Pop() interface{} { return item.obj } +// Peek is supposed to be called by heap.Peek only. +func (h *heapData) Peek() interface{} { + if len(h.queue) > 0 { + return h.items[h.queue[0]].obj + } + return nil +} + // Heap is a producer/consumer queue that implements a heap data structure. // It can be used to implement priority queues and similar data structures. type Heap struct { @@ -169,7 +177,12 @@ func (h *Heap) Delete(obj interface{}) error { return fmt.Errorf("object not found") } -// Pop returns the head of the heap. +// Peek returns the head of the heap without removing it. +func (h *Heap) Peek() interface{} { + return h.data.Peek() +} + +// Pop returns the head of the heap and removes it. func (h *Heap) Pop() (interface{}, error) { obj := heap.Pop(h.data) if obj != nil {