Check for shutdown in TryBackoffAndWait

pull/564/head
Gregory Haynes 2018-12-06 16:57:20 +00:00
parent 5e4ccede4c
commit 73710f06db
3 changed files with 33 additions and 4 deletions

View File

@ -1493,7 +1493,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue
// to run on a node, scheduler takes the pod into account when running // to run on a node, scheduler takes the pod into account when running
// predicates for the node. // predicates for the node.
if !util.PodPriorityEnabled() { if !util.PodPriorityEnabled() {
if !backoff.TryBackoffAndWait(podID) { if !backoff.TryBackoffAndWait(podID, c.StopEverything) {
klog.Warningf("Request for pod %v already in flight, abandoning", podID) klog.Warningf("Request for pod %v already in flight, abandoning", podID)
return return
} }

View File

@ -143,7 +143,7 @@ func (p *PodBackoff) BackoffPod(podID ktypes.NamespacedName) time.Duration {
} }
// TryBackoffAndWait tries to acquire the backoff lock // TryBackoffAndWait tries to acquire the backoff lock
func (p *PodBackoff) TryBackoffAndWait(podID ktypes.NamespacedName) bool { func (p *PodBackoff) TryBackoffAndWait(podID ktypes.NamespacedName, stop <-chan struct{}) bool {
p.lock.Lock() p.lock.Lock()
entry := p.getEntry(podID) entry := p.getEntry(podID)
@ -154,8 +154,12 @@ func (p *PodBackoff) TryBackoffAndWait(podID ktypes.NamespacedName) bool {
defer entry.unlock() defer entry.unlock()
duration := entry.getBackoff(p.maxDuration) duration := entry.getBackoff(p.maxDuration)
p.lock.Unlock() p.lock.Unlock()
time.Sleep(duration) select {
return true case <-time.After(duration):
return true
case <-stop:
return false
}
} }
// Gc execute garbage collection on the pod back-off. // Gc execute garbage collection on the pod back-off.

View File

@ -111,3 +111,28 @@ func TestClearPodBackoff(t *testing.T) {
t.Errorf("Expected backoff time for pod %s of %s, got %s", podID, expectBoTime, boTime) t.Errorf("Expected backoff time for pod %s of %s, got %s", podID, expectBoTime, boTime)
} }
} }
func TestTryBackoffAndWait(t *testing.T) {
clock := fakeClock{}
backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock)
stopCh := make(chan struct{})
podID := ktypes.NamespacedName{Namespace: "ns", Name: "pod"}
if !backoff.TryBackoffAndWait(podID, stopCh) {
t.Error("Expected TryBackoffAndWait success for new pod, got failure.")
}
be := backoff.getEntry(podID)
if !be.tryLock() {
t.Error("Failed to acquire lock for backoffentry")
}
if backoff.TryBackoffAndWait(podID, stopCh) {
t.Error("Expected TryBackoffAndWait failure with lock acquired, got success.")
}
close(stopCh)
if backoff.TryBackoffAndWait(podID, stopCh) {
t.Error("Expected TryBackoffAndWait failure with closed stopCh, got success.")
}
}