diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 9a41cbd805..b008286db5 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -1493,7 +1493,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue // to run on a node, scheduler takes the pod into account when running // predicates for the node. if !util.PodPriorityEnabled() { - if !backoff.TryBackoffAndWait(podID) { + if !backoff.TryBackoffAndWait(podID, c.StopEverything) { klog.Warningf("Request for pod %v already in flight, abandoning", podID) return } diff --git a/pkg/scheduler/util/backoff_utils.go b/pkg/scheduler/util/backoff_utils.go index f800d9d5af..618f93772f 100644 --- a/pkg/scheduler/util/backoff_utils.go +++ b/pkg/scheduler/util/backoff_utils.go @@ -143,7 +143,7 @@ func (p *PodBackoff) BackoffPod(podID ktypes.NamespacedName) time.Duration { } // TryBackoffAndWait tries to acquire the backoff lock -func (p *PodBackoff) TryBackoffAndWait(podID ktypes.NamespacedName) bool { +func (p *PodBackoff) TryBackoffAndWait(podID ktypes.NamespacedName, stop <-chan struct{}) bool { p.lock.Lock() entry := p.getEntry(podID) @@ -154,8 +154,12 @@ func (p *PodBackoff) TryBackoffAndWait(podID ktypes.NamespacedName) bool { defer entry.unlock() duration := entry.getBackoff(p.maxDuration) p.lock.Unlock() - time.Sleep(duration) - return true + select { + case <-time.After(duration): + return true + case <-stop: + return false + } } // Gc execute garbage collection on the pod back-off. diff --git a/pkg/scheduler/util/backoff_utils_test.go b/pkg/scheduler/util/backoff_utils_test.go index 7aa045da3b..b99c9498f5 100644 --- a/pkg/scheduler/util/backoff_utils_test.go +++ b/pkg/scheduler/util/backoff_utils_test.go @@ -111,3 +111,28 @@ func TestClearPodBackoff(t *testing.T) { t.Errorf("Expected backoff time for pod %s of %s, got %s", podID, expectBoTime, boTime) } } + +func TestTryBackoffAndWait(t *testing.T) { + clock := fakeClock{} + backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock) + + stopCh := make(chan struct{}) + podID := ktypes.NamespacedName{Namespace: "ns", Name: "pod"} + if !backoff.TryBackoffAndWait(podID, stopCh) { + t.Error("Expected TryBackoffAndWait success for new pod, got failure.") + } + + be := backoff.getEntry(podID) + if !be.tryLock() { + t.Error("Failed to acquire lock for backoffentry") + } + + if backoff.TryBackoffAndWait(podID, stopCh) { + t.Error("Expected TryBackoffAndWait failure with lock acquired, got success.") + } + + close(stopCh) + if backoff.TryBackoffAndWait(podID, stopCh) { + t.Error("Expected TryBackoffAndWait failure with closed stopCh, got success.") + } +}