Reschedule with backoff

With the alpha scheduling queue we move pods from unschedulable to
active on certain events without a backoff. As a result we can cause
starvation issues if high priority pods are in the unschedulable queue.
Implement a backoff mechanism for pods being moved to active.

Closes #56721
pull/564/head
Gregory Haynes 2018-07-24 20:46:40 +00:00
parent 082b48240a
commit 5e4ccede4c
10 changed files with 340 additions and 43 deletions

View File

@ -507,7 +507,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
for _, name := range test.nodes {
cache.AddNode(createNode(name))
}
queue := internalqueue.NewSchedulingQueue()
queue := internalqueue.NewSchedulingQueue(nil)
scheduler := NewGenericScheduler(
cache,
nil,

View File

@ -472,7 +472,7 @@ func TestGenericScheduler(t *testing.T) {
scheduler := NewGenericScheduler(
cache,
nil,
internalqueue.NewSchedulingQueue(),
internalqueue.NewSchedulingQueue(nil),
test.predicates,
algorithm.EmptyPredicateMetadataProducer,
test.prioritizers,
@ -509,7 +509,7 @@ func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Nod
s := NewGenericScheduler(
cache,
nil,
internalqueue.NewSchedulingQueue(),
internalqueue.NewSchedulingQueue(nil),
predicates,
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
@ -1436,7 +1436,7 @@ func TestPreempt(t *testing.T) {
scheduler := NewGenericScheduler(
cache,
nil,
internalqueue.NewSchedulingQueue(),
internalqueue.NewSchedulingQueue(nil),
map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
algorithm.EmptyPredicateMetadataProducer,
[]algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}},
@ -1564,7 +1564,7 @@ func TestCacheInvalidationRace(t *testing.T) {
scheduler := NewGenericScheduler(
mockCache,
eCache,
internalqueue.NewSchedulingQueue(),
internalqueue.NewSchedulingQueue(nil),
ps,
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
@ -1648,7 +1648,7 @@ func TestCacheInvalidationRace2(t *testing.T) {
scheduler := NewGenericScheduler(
cache,
eCache,
internalqueue.NewSchedulingQueue(),
internalqueue.NewSchedulingQueue(nil),
ps,
algorithm.EmptyPredicateMetadataProducer,
prioritizers,

View File

@ -283,7 +283,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
c := &configFactory{
client: args.Client,
podLister: schedulerCache,
podQueue: internalqueue.NewSchedulingQueue(),
podQueue: internalqueue.NewSchedulingQueue(stopEverything),
nodeLister: args.NodeInformer.Lister(),
pVLister: args.PvInformer.Lister(),
pVCLister: args.PvcInformer.Lister(),

View File

@ -12,6 +12,8 @@ go_library(
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],

View File

@ -30,11 +30,14 @@ import (
"fmt"
"reflect"
"sync"
"time"
"k8s.io/klog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
@ -72,9 +75,9 @@ type SchedulingQueue interface {
// NewSchedulingQueue initializes a new scheduling queue. If pod priority is
// enabled a priority queue is returned. If it is disabled, a FIFO is returned.
func NewSchedulingQueue() SchedulingQueue {
func NewSchedulingQueue(stop <-chan struct{}) SchedulingQueue {
if util.PodPriorityEnabled() {
return NewPriorityQueue()
return NewPriorityQueue(stop)
}
return NewFIFO()
}
@ -178,12 +181,20 @@ func NominatedNodeName(pod *v1.Pod) string {
// pods that are already tried and are determined to be unschedulable. The latter
// is called unschedulableQ.
type PriorityQueue struct {
stop <-chan struct{}
clock util.Clock
// podBackoff tracks backoff for pods attempting to be rescheduled
podBackoff *util.PodBackoff
lock sync.RWMutex
cond sync.Cond
// activeQ is heap structure that scheduler actively looks at to find pods to
// schedule. Head of heap is the highest priority pod.
activeQ *util.Heap
// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
// are popped from this heap before the scheduler looks at activeQ
podBackoffQ *util.Heap
// unschedulableQ holds pods that have been tried and determined unschedulable.
unschedulableQ *UnschedulablePodsMap
// nominatedPods is a map keyed by a node name and the value is a list of
@ -227,16 +238,33 @@ func activeQComp(pod1, pod2 interface{}) bool {
}
// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue() *PriorityQueue {
func NewPriorityQueue(stop <-chan struct{}) *PriorityQueue {
return NewPriorityQueueWithClock(stop, util.RealClock{})
}
// NewPriorityQueueWithClock creates a PriorityQueue which uses the passed clock for time.
func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *PriorityQueue {
pq := &PriorityQueue{
clock: clock,
stop: stop,
podBackoff: util.CreatePodBackoffWithClock(1*time.Second, 10*time.Second, clock),
activeQ: util.NewHeap(cache.MetaNamespaceKeyFunc, activeQComp),
unschedulableQ: newUnschedulablePodsMap(),
nominatedPods: map[string][]*v1.Pod{},
}
pq.cond.L = &pq.lock
pq.podBackoffQ = util.NewHeap(cache.MetaNamespaceKeyFunc, pq.podsCompareBackoffCompleted)
pq.run()
return pq
}
// run starts the goroutine to pump from podBackoffQ to activeQ
func (p *PriorityQueue) run() {
go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
}
// addNominatedPodIfNeeded adds a pod to nominatedPods if it has a NominatedNodeName and it does not
// already exist in the map. Adding an existing pod is not going to update the pod.
func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
@ -278,7 +306,7 @@ func (p *PriorityQueue) updateNominatedPod(oldPod, newPod *v1.Pod) {
}
// Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in either queue.
// is added so there is no chance the pod is already in active/unschedulable/backoff queues
func (p *PriorityQueue) Add(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
@ -291,6 +319,10 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
p.deleteNominatedPodIfExists(pod)
p.unschedulableQ.delete(pod)
}
// Delete pod from backoffQ if it is backing off
if err = p.podBackoffQ.Delete(pod); err == nil {
klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name)
}
p.addNominatedPodIfNeeded(pod)
p.cond.Broadcast()
}
@ -308,6 +340,9 @@ func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
if _, exists, _ := p.activeQ.Get(pod); exists {
return nil
}
if _, exists, _ := p.podBackoffQ.Get(pod); exists {
return nil
}
err := p.activeQ.Add(pod)
if err != nil {
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
@ -323,6 +358,40 @@ func isPodUnschedulable(pod *v1.Pod) bool {
return cond != nil && cond.Status == v1.ConditionFalse && cond.Reason == v1.PodReasonUnschedulable
}
// nsNameForPod returns a namespacedname for a pod
func nsNameForPod(pod *v1.Pod) ktypes.NamespacedName {
return ktypes.NamespacedName{
Namespace: pod.Namespace,
Name: pod.Name,
}
}
// clearPodBackoff clears all backoff state for a pod (resets expiry)
func (p *PriorityQueue) clearPodBackoff(pod *v1.Pod) {
p.podBackoff.ClearPodBackoff(nsNameForPod(pod))
}
// isPodBackingOff returns whether a pod is currently undergoing backoff in the podBackoff structure
func (p *PriorityQueue) isPodBackingOff(pod *v1.Pod) bool {
boTime, exists := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
if !exists {
return false
}
return boTime.After(p.clock.Now())
}
// backoffPod checks if pod is currently undergoing backoff. If it is not it updates the backoff
// timeout otherwise it does nothing.
func (p *PriorityQueue) backoffPod(pod *v1.Pod) {
p.podBackoff.Gc()
podID := nsNameForPod(pod)
boTime, found := p.podBackoff.GetBackoffTime(podID)
if !found || boTime.Before(p.clock.Now()) {
p.podBackoff.BackoffPod(podID)
}
}
// AddUnschedulableIfNotPresent does nothing if the pod is present in either
// queue. Otherwise it adds the pod to the unschedulable queue if
// p.receivedMoveRequest is false, and to the activeQ if p.receivedMoveRequest is true.
@ -335,11 +404,27 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
if _, exists, _ := p.activeQ.Get(pod); exists {
return fmt.Errorf("pod is already present in the activeQ")
}
if _, exists, _ := p.podBackoffQ.Get(pod); exists {
return fmt.Errorf("pod is already present in the backoffQ")
}
if !p.receivedMoveRequest && isPodUnschedulable(pod) {
p.backoffPod(pod)
p.unschedulableQ.addOrUpdate(pod)
p.addNominatedPodIfNeeded(pod)
return nil
}
// If a move request has been received and the pod is subject to backoff, move it to the BackoffQ.
if p.isPodBackingOff(pod) && isPodUnschedulable(pod) {
err := p.podBackoffQ.Add(pod)
if err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
} else {
p.addNominatedPodIfNeeded(pod)
}
return err
}
err := p.activeQ.Add(pod)
if err == nil {
p.addNominatedPodIfNeeded(pod)
@ -348,6 +433,39 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
return err
}
// flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ
func (p *PriorityQueue) flushBackoffQCompleted() {
p.lock.Lock()
defer p.lock.Unlock()
for {
rawPod := p.podBackoffQ.Peek()
if rawPod == nil {
return
}
pod := rawPod.(*v1.Pod)
boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
if !found {
klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod))
p.podBackoffQ.Pop()
p.activeQ.Add(pod)
defer p.cond.Broadcast()
continue
}
if boTime.After(p.clock.Now()) {
return
}
_, err := p.podBackoffQ.Pop()
if err != nil {
klog.Errorf("Unable to pop pod %v from backoffQ despite backoff completion.", nsNameForPod(pod))
return
}
p.activeQ.Add(pod)
defer p.cond.Broadcast()
}
}
// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It also
// clears receivedMoveRequest to mark the beginning of a new scheduling cycle.
@ -391,16 +509,33 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool {
func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
// If the pod is already in the active queue, just update it there.
if _, exists, _ := p.activeQ.Get(newPod); exists {
p.updateNominatedPod(oldPod, newPod)
err := p.activeQ.Update(newPod)
return err
if oldPod != nil {
// If the pod is already in the active queue, just update it there.
if _, exists, _ := p.activeQ.Get(oldPod); exists {
p.updateNominatedPod(oldPod, newPod)
err := p.activeQ.Update(newPod)
return err
}
// If the pod is in the backoff queue, update it there.
if _, exists, _ := p.podBackoffQ.Get(oldPod); exists {
p.updateNominatedPod(oldPod, newPod)
p.podBackoffQ.Delete(newPod)
err := p.activeQ.Add(newPod)
if err == nil {
p.cond.Broadcast()
}
return err
}
}
// If the pod is in the unschedulable queue, updating it may make it schedulable.
if usPod := p.unschedulableQ.get(newPod); usPod != nil {
p.updateNominatedPod(oldPod, newPod)
if isPodUpdated(oldPod, newPod) {
// If the pod is updated reset backoff
p.clearPodBackoff(newPod)
p.unschedulableQ.delete(usPod)
err := p.activeQ.Add(newPod)
if err == nil {
@ -408,6 +543,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
}
return err
}
// Pod is already in unschedulable queue and hasnt updated, no need to backoff again
p.unschedulableQ.addOrUpdate(newPod)
return nil
}
@ -428,6 +564,8 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.deleteNominatedPodIfExists(pod)
err := p.activeQ.Delete(pod)
if err != nil { // The item was probably not found in the activeQ.
p.clearPodBackoff(pod)
p.podBackoffQ.Delete(pod)
p.unschedulableQ.delete(pod)
}
return nil
@ -453,16 +591,18 @@ func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
// function adds all pods and then signals the condition variable to ensure that
// if Pop() is waiting for an item, it receives it after all the pods are in the
// queue and the head is the highest priority pod.
// TODO(bsalamat): We should add a back-off mechanism here so that a high priority
// pod which is unschedulable does not go to the head of the queue frequently. For
// example in a cluster where a lot of pods being deleted, such a high priority
// pod can deprive other pods from getting scheduled.
func (p *PriorityQueue) MoveAllToActiveQueue() {
p.lock.Lock()
defer p.lock.Unlock()
for _, pod := range p.unschedulableQ.pods {
if err := p.activeQ.Add(pod); err != nil {
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
if p.isPodBackingOff(pod) {
if err := p.podBackoffQ.Add(pod); err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
}
} else {
if err := p.activeQ.Add(pod); err != nil {
klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
}
}
}
p.unschedulableQ.clear()
@ -473,11 +613,16 @@ func (p *PriorityQueue) MoveAllToActiveQueue() {
// NOTE: this function assumes lock has been acquired in caller
func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
for _, pod := range pods {
if err := p.activeQ.Add(pod); err == nil {
p.unschedulableQ.delete(pod)
if p.isPodBackingOff(pod) {
if err := p.podBackoffQ.Add(pod); err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
}
} else {
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
if err := p.activeQ.Add(pod); err != nil {
klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
}
}
p.unschedulableQ.delete(pod)
}
p.receivedMoveRequest = true
p.cond.Broadcast()
@ -550,6 +695,12 @@ func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
p.lock.Unlock()
}
func (p *PriorityQueue) podsCompareBackoffCompleted(p1, p2 interface{}) bool {
bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(p1.(*v1.Pod)))
bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(p2.(*v1.Pod)))
return bo1.Before(bo2)
}
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
// is used to implement unschedulableQ.
type UnschedulablePodsMap struct {

View File

@ -95,7 +95,7 @@ var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1.
}
func TestPriorityQueue_Add(t *testing.T) {
q := NewPriorityQueue()
q := NewPriorityQueue(nil)
q.Add(&medPriorityPod)
q.Add(&unschedulablePod)
q.Add(&highPriorityPod)
@ -120,7 +120,7 @@ func TestPriorityQueue_Add(t *testing.T) {
}
func TestPriorityQueue_AddIfNotPresent(t *testing.T) {
q := NewPriorityQueue()
q := NewPriorityQueue(nil)
q.unschedulableQ.addOrUpdate(&highPriNominatedPod)
q.AddIfNotPresent(&highPriNominatedPod) // Must not add anything.
q.AddIfNotPresent(&medPriorityPod)
@ -146,7 +146,7 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) {
}
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
q := NewPriorityQueue()
q := NewPriorityQueue(nil)
q.Add(&highPriNominatedPod)
q.AddUnschedulableIfNotPresent(&highPriNominatedPod) // Must not add anything.
q.AddUnschedulableIfNotPresent(&medPriorityPod) // This should go to activeQ.
@ -172,7 +172,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
}
func TestPriorityQueue_Pop(t *testing.T) {
q := NewPriorityQueue()
q := NewPriorityQueue(nil)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
@ -189,7 +189,7 @@ func TestPriorityQueue_Pop(t *testing.T) {
}
func TestPriorityQueue_Update(t *testing.T) {
q := NewPriorityQueue()
q := NewPriorityQueue(nil)
q.Update(nil, &highPriorityPod)
if _, exists, _ := q.activeQ.Get(&highPriorityPod); !exists {
t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name)
@ -225,7 +225,7 @@ func TestPriorityQueue_Update(t *testing.T) {
}
func TestPriorityQueue_Delete(t *testing.T) {
q := NewPriorityQueue()
q := NewPriorityQueue(nil)
q.Update(&highPriorityPod, &highPriNominatedPod)
q.Add(&unschedulablePod)
q.Delete(&highPriNominatedPod)
@ -245,7 +245,7 @@ func TestPriorityQueue_Delete(t *testing.T) {
}
func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) {
q := NewPriorityQueue()
q := NewPriorityQueue(nil)
q.Add(&medPriorityPod)
q.unschedulableQ.addOrUpdate(&unschedulablePod)
q.unschedulableQ.addOrUpdate(&highPriorityPod)
@ -291,7 +291,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
Spec: v1.PodSpec{NodeName: "machine1"},
}
q := NewPriorityQueue()
q := NewPriorityQueue(nil)
q.Add(&medPriorityPod)
// Add a couple of pods to the unschedulableQ.
q.unschedulableQ.addOrUpdate(&unschedulablePod)
@ -312,7 +312,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
}
func TestPriorityQueue_WaitingPodsForNode(t *testing.T) {
q := NewPriorityQueue()
q := NewPriorityQueue(nil)
q.Add(&medPriorityPod)
q.Add(&unschedulablePod)
q.Add(&highPriorityPod)
@ -491,7 +491,7 @@ func TestSchedulingQueue_Close(t *testing.T) {
},
{
name: "PriorityQueue close",
q: NewPriorityQueue(),
q: NewPriorityQueue(nil),
expectedErr: fmt.Errorf(queueClosed),
},
}
@ -520,7 +520,7 @@ func TestSchedulingQueue_Close(t *testing.T) {
// ensures that an unschedulable pod does not block head of the queue when there
// are frequent events that move pods to the active queue.
func TestRecentlyTriedPodsGoBack(t *testing.T) {
q := NewPriorityQueue()
q := NewPriorityQueue(nil)
// Add a few pods to priority queue.
for i := 0; i < 5; i++ {
p := v1.Pod{
@ -567,3 +567,66 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
}
}
}
// TestHighPriorityBackoff tests that a high priority pod does not block
// other pods if it is unschedulable
func TestHighProirotyBackoff(t *testing.T) {
q := NewPriorityQueue(nil)
midPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-midpod",
Namespace: "ns1",
UID: types.UID("tp-mid"),
},
Spec: v1.PodSpec{
Priority: &midPriority,
},
Status: v1.PodStatus{
NominatedNodeName: "node1",
},
}
highPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-highpod",
Namespace: "ns1",
UID: types.UID("tp-high"),
},
Spec: v1.PodSpec{
Priority: &highPriority,
},
Status: v1.PodStatus{
NominatedNodeName: "node1",
},
}
q.Add(&midPod)
q.Add(&highPod)
// Simulate a pod being popped by the scheduler, determined unschedulable, and
// then moved back to the active queue.
p, err := q.Pop()
if err != nil {
t.Errorf("Error while popping the head of the queue: %v", err)
}
if p != &highPod {
t.Errorf("Expected to get high prority pod, got: %v", p)
}
// Update pod condition to unschedulable.
podutil.UpdatePodCondition(&p.Status, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
Message: "fake scheduling failure",
})
// Put in the unschedulable queue.
q.AddUnschedulableIfNotPresent(p)
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveQueue()
p, err = q.Pop()
if err != nil {
t.Errorf("Error while popping the head of the queue: %v", err)
}
if p != &midPod {
t.Errorf("Expected to get mid prority pod, got: %v", p)
}
}

View File

@ -26,6 +26,7 @@ go_library(
name = "go_default_library",
srcs = [
"backoff_utils.go",
"clock.go",
"heap.go",
"utils.go",
],
@ -36,6 +37,7 @@ go_library(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -80,11 +80,6 @@ func (b *backoffEntry) getBackoff(maxDuration time.Duration) time.Duration {
return newDuration
}
// backoffAndWait Blocks until this entry has completed backoff
func (b *backoffEntry) backoffAndWait(maxDuration time.Duration) {
time.Sleep(b.getBackoff(maxDuration))
}
// PodBackoff is used to restart a pod with back-off delay.
type PodBackoff struct {
// expiryQ stores backoffEntry orderedy by lastUpdate until they reach maxDuration and are GC'd
@ -183,6 +178,30 @@ func (p *PodBackoff) Gc() {
}
}
// GetBackoffTime returns the time that podID completes backoff
func (p *PodBackoff) GetBackoffTime(podID ktypes.NamespacedName) (time.Time, bool) {
p.lock.Lock()
defer p.lock.Unlock()
rawBe, exists, _ := p.expiryQ.GetByKey(podID.String())
if !exists {
return time.Time{}, false
}
be := rawBe.(*backoffEntry)
return be.lastUpdate.Add(be.backoff), true
}
// ClearPodBackoff removes all tracking information for podID (clears expiry)
func (p *PodBackoff) ClearPodBackoff(podID ktypes.NamespacedName) bool {
p.lock.Lock()
defer p.lock.Unlock()
entry, exists, _ := p.expiryQ.GetByKey(podID.String())
if exists {
err := p.expiryQ.Delete(entry)
return err == nil
}
return false
}
// backoffEntryKeyFunc is the keying function used for mapping a backoffEntry to string for heap
func backoffEntryKeyFunc(b interface{}) (string, error) {
be := b.(*backoffEntry)

View File

@ -31,7 +31,7 @@ func (f *fakeClock) Now() time.Time {
return f.t
}
func TestBackoff(t *testing.T) {
func TestBackoffPod(t *testing.T) {
clock := fakeClock{}
backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock)
tests := []struct {
@ -66,7 +66,10 @@ func TestBackoff(t *testing.T) {
for _, test := range tests {
duration := backoff.BackoffPod(test.podID)
if duration != test.expectedDuration {
t.Errorf("expected: %s, got %s for %s", test.expectedDuration.String(), duration.String(), test.podID)
t.Errorf("expected: %s, got %s for pod %s", test.expectedDuration.String(), duration.String(), test.podID)
}
if boTime, _ := backoff.GetBackoffTime(test.podID); boTime != clock.Now().Add(test.expectedDuration) {
t.Errorf("expected GetBackoffTime %s, got %s for pod %s", test.expectedDuration.String(), boTime.String(), test.podID)
}
clock.t = clock.t.Add(test.advanceClock)
backoff.Gc()
@ -85,3 +88,26 @@ func TestBackoff(t *testing.T) {
t.Errorf("expected: 1, got %s", duration.String())
}
}
func TestClearPodBackoff(t *testing.T) {
clock := fakeClock{}
backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock)
if backoff.ClearPodBackoff(ktypes.NamespacedName{Namespace: "ns", Name: "nonexist"}) {
t.Error("Expected ClearPodBackoff failure for unknown pod, got success.")
}
podID := ktypes.NamespacedName{Namespace: "ns", Name: "foo"}
if dur := backoff.BackoffPod(podID); dur != 1*time.Second {
t.Errorf("Expected backoff of 1s for pod %s, got %s", podID, dur.String())
}
if !backoff.ClearPodBackoff(podID) {
t.Errorf("Failed to clear backoff for pod %v", podID)
}
expectBoTime := clock.Now()
if boTime, _ := backoff.GetBackoffTime(podID); boTime != expectBoTime {
t.Errorf("Expected backoff time for pod %s of %s, got %s", podID, expectBoTime, boTime)
}
}

View File

@ -0,0 +1,34 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"time"
)
// Clock provides an interface for getting the current time
type Clock interface {
Now() time.Time
}
// RealClock implements a clock using time
type RealClock struct{}
// Now returns the current time with time.Now
func (RealClock) Now() time.Time {
return time.Now()
}