From 286c3c543c780485072a8d1b9588e046aea9479c Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Fri, 22 Aug 2014 11:36:01 -0400 Subject: [PATCH] Improve the wait.Poll GoDoc and api Add more tests, and switch to timeouts instead of cycles. Deflake TestPoller --- cmd/integration/integration.go | 4 +-- pkg/kubecfg/kubecfg.go | 2 +- pkg/util/wait/wait.go | 31 ++++++++++++------ pkg/util/wait/wait_test.go | 57 ++++++++++++++++++++++++++++++++-- 4 files changed, 78 insertions(+), 16 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 198ab4b755..b7bfc03a5b 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -170,7 +170,7 @@ func runReplicationControllerTest(c *client.Client) { glog.Infof("Done creating replication controllers") // Give the controllers some time to actually create the pods - if err := wait.Poll(time.Second, 10, c.ControllerHasDesiredReplicas(controllerRequest)); err != nil { + if err := wait.Poll(time.Second, time.Second*10, c.ControllerHasDesiredReplicas(controllerRequest)); err != nil { glog.Fatalf("FAILED: pods never created %v", err) } @@ -179,7 +179,7 @@ func runReplicationControllerTest(c *client.Client) { if err != nil { glog.Fatalf("FAILED: unable to get pods to list: %v", err) } - if err := wait.Poll(time.Second, 10, podsOnMinions(c, pods)); err != nil { + if err := wait.Poll(time.Second, time.Second*10, podsOnMinions(c, pods)); err != nil { glog.Fatalf("FAILED: pods never started running %v", err) } diff --git a/pkg/kubecfg/kubecfg.go b/pkg/kubecfg/kubecfg.go index 35e836e030..878011b5e2 100644 --- a/pkg/kubecfg/kubecfg.go +++ b/pkg/kubecfg/kubecfg.go @@ -107,7 +107,7 @@ func Update(name string, client client.Interface, updatePeriod time.Duration) er } time.Sleep(updatePeriod) } - return wait.Poll(time.Second*5, 60 /* timeout after 300 seconds */, func() (bool, error) { + return wait.Poll(time.Second*5, time.Second*300, func() (bool, error) { podList, err := client.ListPods(s) if err != nil { return false, err diff --git a/pkg/util/wait/wait.go b/pkg/util/wait/wait.go index a89f048ff3..d4c4714237 100644 --- a/pkg/util/wait/wait.go +++ b/pkg/util/wait/wait.go @@ -28,17 +28,24 @@ var ErrWaitTimeout = errors.New("timed out waiting for the condition") // if the loop should be aborted. type ConditionFunc func() (done bool, err error) -// Poll tries a condition func until it returns true, an error, or the -// wait channel is closed. Will always poll at least once. -func Poll(interval time.Duration, cycles int, condition ConditionFunc) error { - return WaitFor(poller(interval, cycles), condition) +// Poll tries a condition func until it returns true, an error, or the timeout +// is reached. condition will always be invoked at least once but some intervals +// may be missed if the condition takes too long or the time window is too short. +// If you pass maxTimes = 0, Poll will loop until condition returns true or an +// error. +func Poll(interval, timeout time.Duration, condition ConditionFunc) error { + return WaitFor(poller(interval, timeout), condition) } // WaitFunc creates a channel that receives an item every time a test // should be executed and is closed when the last test should be invoked. type WaitFunc func() <-chan struct{} -// WaitFor implements the looping for a wait. +// WaitFor gets a channel from wait(), and then invokes c once for every value +// placed on the channel and once more when the channel is closed. If c +// returns an error the loop ends and that error is returned, and if c returns +// true the loop ends and nil is returned. ErrWaitTimeout will be returned if +// the channel is closed without c every returning true. func WaitFor(wait WaitFunc, c ConditionFunc) error { w := wait() for { @@ -58,16 +65,20 @@ func WaitFor(wait WaitFunc, c ConditionFunc) error { } // poller returns a WaitFunc that will send to the channel every -// interval until at most cycles * interval has elapsed and then -// close the channel. Over very short intervals you may receive -// no ticks before being closed. -func poller(interval time.Duration, cycles int) WaitFunc { +// interval until timeout has elapsed and then close the channel. +// Over very short intervals you may receive no ticks before +// the channel is closed closed. If maxTimes is 0, the channel +// will never be closed. +func poller(interval, timeout time.Duration) WaitFunc { return WaitFunc(func() <-chan struct{} { ch := make(chan struct{}) go func() { tick := time.NewTicker(interval) defer tick.Stop() - after := time.After(interval * time.Duration(cycles)) + var after <-chan time.Time + if timeout != 0 { + after = time.After(timeout) + } for { select { case <-tick.C: diff --git a/pkg/util/wait/wait_test.go b/pkg/util/wait/wait_test.go index eb30b4da73..c54d5b267f 100644 --- a/pkg/util/wait/wait_test.go +++ b/pkg/util/wait/wait_test.go @@ -23,7 +23,7 @@ import ( ) func TestPoller(t *testing.T) { - w := poller(time.Millisecond, 2) + w := poller(time.Millisecond, 2*time.Millisecond) ch := w() count := 0 DRAIN: @@ -34,7 +34,7 @@ DRAIN: break DRAIN } count++ - case <-time.After(time.Millisecond * 5): + case <-time.After(time.Second): t.Errorf("unexpected timeout after poll") } } @@ -62,7 +62,7 @@ func TestPoll(t *testing.T) { invocations++ return true, nil }) - if err := Poll(time.Microsecond, 1, f); err != nil { + if err := Poll(time.Microsecond, time.Microsecond, f); err != nil { t.Fatalf("unexpected error %v", err) } if invocations == 0 { @@ -70,6 +70,57 @@ func TestPoll(t *testing.T) { } } +func TestPollForever(t *testing.T) { + ch := make(chan struct{}) + done := make(chan struct{}, 1) + complete := make(chan struct{}) + go func() { + f := ConditionFunc(func() (bool, error) { + ch <- struct{}{} + select { + case <-done: + return true, nil + default: + } + return false, nil + }) + if err := Poll(time.Microsecond, 0, f); err != nil { + t.Fatalf("unexpected error %v", err) + } + close(ch) + complete <- struct{}{} + }() + + // ensure the condition is opened + <-ch + + // ensure channel sends events + for i := 0; i < 10; i++ { + select { + case _, open := <-ch: + if !open { + t.Fatalf("did not expect channel to be closed") + } + case <-time.After(time.Second): + t.Fatalf("channel did not return at least once within the poll interval") + } + } + + // at most two poll notifications should be sent once we return from the condition + done <- struct{}{} + go func() { + for i := 0; i < 2; i++ { + _, open := <-ch + if open { + <-complete + return + } + } + t.Fatalf("expected closed channel after two iterations") + }() + <-complete +} + func TestWaitFor(t *testing.T) { var invocations int testCases := map[string]struct {