From a766ed2d2b0a85670ae494d30881eb04c1ca373f Mon Sep 17 00:00:00 2001 From: Wei Guo Date: Thu, 27 Dec 2018 18:12:55 +0800 Subject: [PATCH 1/2] WaitFor returns immediately when done is closed --- .../k8s.io/apimachinery/pkg/util/wait/wait.go | 22 ++++++------- .../apimachinery/pkg/util/wait/wait_test.go | 33 ++++++++++++++++++- 2 files changed, 42 insertions(+), 13 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go index 760e17066c..e60f45bffd 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go @@ -351,22 +351,21 @@ type WaitFunc func(done <-chan struct{}) <-chan struct{} // WaitFor continually checks 'fn' as driven by 'wait'. // // WaitFor gets a channel from 'wait()'', and then invokes 'fn' once for every value -// placed on the channel and once more when the channel is closed. +// placed on the channel and once more when the channel is closed. If the channel is closed +// and 'fn' returns false without error, WaitFor returns ErrWaitTimeout. // -// If 'fn' returns an error the loop ends and that error is returned, and if +// If 'fn' returns an error the loop ends and that error is returned. If // 'fn' returns true the loop ends and nil is returned. // -// ErrWaitTimeout will be returned if the channel is closed without fn ever +// ErrWaitTimeout will be returned if the 'done' channel is closed without fn ever // returning true. +// +// When the done channel is closed, because the golang `select` statement is +// "uniform pseudo-random", the `fn` might still run one or multiple time, +// though eventually `WaitFor` will return. func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error { stopCh := make(chan struct{}) - once := sync.Once{} - closeCh := func() { - once.Do(func() { - close(stopCh) - }) - } - defer closeCh() + defer close(stopCh) c := wait(stopCh) for { select { @@ -382,10 +381,9 @@ func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error { return ErrWaitTimeout } case <-done: - closeCh() + return ErrWaitTimeout } } - return ErrWaitTimeout } // poller returns a WaitFunc that will send to the channel every interval until diff --git a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go index 611fa5a0a0..89987983d7 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go @@ -456,11 +456,42 @@ func TestWaitFor(t *testing.T) { } } +// TestWaitForWithEarlyClosingWaitFunc tests WaitFor when the WaitFunc closes its channel. The WaitFor should +// always return ErrWaitTimeout. +func TestWaitForWithEarlyClosingWaitFunc(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + + start := time.Now() + err := WaitFor(func(done <-chan struct{}) <-chan struct{} { + c := make(chan struct{}) + close(c) + return c + }, func() (bool, error) { + return false, nil + }, stopCh) + duration := time.Now().Sub(start) + + // The WaitFor should return immediately, so the duration is close to 0s. + if duration >= ForeverTestTimeout/2 { + t.Errorf("expected short timeout duration") + } + if err != ErrWaitTimeout { + t.Errorf("expected ErrWaitTimeout from WaitFunc") + } +} + +// TestWaitForWithClosedChannel tests WaitFor when it receives a closed channel. The WaitFor should +// always return ErrWaitTimeout. func TestWaitForWithClosedChannel(t *testing.T) { stopCh := make(chan struct{}) close(stopCh) + c := make(chan struct{}) + defer close(c) start := time.Now() - err := WaitFor(poller(ForeverTestTimeout, ForeverTestTimeout), func() (bool, error) { + err := WaitFor(func(done <-chan struct{}) <-chan struct{} { + return c + }, func() (bool, error) { return false, nil }, stopCh) duration := time.Now().Sub(start) From 44b2e8464bd3ece4effdb888ceabf65ae2f39ff7 Mon Sep 17 00:00:00 2001 From: Wei Guo Date: Mon, 31 Dec 2018 20:53:19 +0800 Subject: [PATCH 2/2] change the period of GarbageCollector.Sync to 200ms for TestGarbageCollectorSync --- .../garbagecollector/garbagecollector_test.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index 60aab9ff19..b382a3a185 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -856,7 +856,16 @@ func TestGarbageCollectorSync(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) go gc.Run(1, stopCh) - go gc.Sync(fakeDiscoveryClient, 10*time.Millisecond, stopCh) + // The pseudo-code of GarbageCollector.Sync(): + // GarbageCollector.Sync(client, period, stopCh): + // wait.Until() loops with `period` until the `stopCh` is closed : + // wait.PollImmediateUntil() loops with 100ms (hardcode) util the `stopCh` is closed: + // GetDeletableResources() + // gc.resyncMonitors() + // controller.WaitForCacheSync() loops with `syncedPollPeriod` (hardcoded to 100ms), until either its stop channel is closed after `period`, or all caches synced. + // + // Setting the period to 200ms allows the WaitForCacheSync() to check for cache sync ~2 times in every wait.PollImmediateUntil() loop. + go gc.Sync(fakeDiscoveryClient, 200*time.Millisecond, stopCh) // Wait until the sync discovers the initial resources fmt.Printf("Test output")