diff --git a/hack/benchmark-go.sh b/hack/benchmark-go.sh index 2090e47448..5512a8b60c 100755 --- a/hack/benchmark-go.sh +++ b/hack/benchmark-go.sh @@ -20,4 +20,4 @@ set -o pipefail KUBE_ROOT=$(dirname "${BASH_SOURCE}")/.. -KUBE_COVER="" KUBE_RACE=" " "${KUBE_ROOT}/hack/test-go.sh" -- -test.run="^X" -benchtime=1s -bench=. -benchmem +KUBE_COVER="" KUBE_RACE=" " "${KUBE_ROOT}/hack/test-go.sh" -- -test.run="^X" -benchtime=1s -bench=. -benchmem $@ diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 32cca298da..340d73a135 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -106,17 +106,24 @@ var ( errorStopRequested = errors.New("Stop requested") ) -// resyncChan returns a channel which will receive something when a resync is required. -func (r *Reflector) resyncChan() <-chan time.Time { +// resyncChan returns a channel which will receive something when a resync is +// required, and a cleanup function. +func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { if r.resyncPeriod == 0 { - return neverExitWatch + return neverExitWatch, func() bool { return false } } - return time.After(r.resyncPeriod) + // The cleanup function is required: imagine the scenario where watches + // always fail so we end up listing frequently. Then, if we don't + // manually stop the timer, we could end up with many timers active + // concurrently. + t := time.NewTimer(r.resyncPeriod) + return t.C, t.Stop } func (r *Reflector) listAndWatch(stopCh <-chan struct{}) { var resourceVersion string - resyncCh := r.resyncChan() + resyncCh, cleanup := r.resyncChan() + defer cleanup() list, err := r.listerWatcher.List() if err != nil { diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index 54bfc136d3..a63dfd0356 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -18,6 +18,7 @@ package cache import ( "fmt" + "math/rand" "strconv" "testing" "time" @@ -95,7 +96,8 @@ func TestRunUntil(t *testing.T) { func TestReflector_resyncChan(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &api.Pod{}, s, time.Millisecond) - a, b := g.resyncChan(), time.After(100*time.Millisecond) + a, _ := g.resyncChan() + b := time.After(100 * time.Millisecond) select { case <-a: t.Logf("got timeout as expected") @@ -104,6 +106,18 @@ func TestReflector_resyncChan(t *testing.T) { } } +func BenchmarkReflector_resyncChanMany(b *testing.B) { + s := NewStore(MetaNamespaceKeyFunc) + g := NewReflector(&testLW{}, &api.Pod{}, s, 25*time.Millisecond) + // The improvement to this (calling the timer's Stop() method) makes + // this benchmark about 40% faster. + for i := 0; i < b.N; i++ { + g.resyncPeriod = time.Duration(rand.Float64() * float64(time.Millisecond) * 25) + _, stop := g.resyncChan() + stop() + } +} + func TestReflector_watchHandlerError(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &api.Pod{}, s, 0) diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index 9e390e2999..1efb02ca41 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -57,7 +57,7 @@ const ( // specifically targeted at the case where some problem prevents an update // of expectations, without it the RC could stay asleep forever. This should // be set based on the expected latency of watch events. - + // // TODO: Set this per expectation, based on its size. 
// Currently an rc can service (create *and* observe the watch events for said // creation) about 10-20 pods a second, so it takes about 3.5 min to service diff --git a/pkg/master/publish.go b/pkg/master/publish.go index 3eebffdea4..dd3b48d04f 100644 --- a/pkg/master/publish.go +++ b/pkg/master/publish.go @@ -29,6 +29,8 @@ import ( ) func (m *Master) serviceWriterLoop(stop chan struct{}) { + t := time.NewTicker(10 * time.Second) + defer t.Stop() for { // Update service & endpoint records. // TODO: when it becomes possible to change this stuff, @@ -49,12 +51,14 @@ func (m *Master) serviceWriterLoop(stop chan struct{}) { select { case <-stop: return - case <-time.After(10 * time.Second): + case <-t.C: } } } func (m *Master) roServiceWriterLoop(stop chan struct{}) { + t := time.NewTicker(10 * time.Second) + defer t.Stop() for { // Update service & endpoint records. // TODO: when it becomes possible to change this stuff, @@ -74,7 +78,7 @@ func (m *Master) roServiceWriterLoop(stop chan struct{}) { select { case <-stop: return - case <-time.After(10 * time.Second): + case <-t.C: } } } diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 5289e8a309..5587769fd0 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -366,16 +366,16 @@ const syncInterval = 5 * time.Second // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. func (proxier *Proxier) SyncLoop() { + t := time.NewTicker(syncInterval) + defer t.Stop() for { - select { - case <-time.After(syncInterval): - glog.V(3).Infof("Periodic sync") - if err := iptablesInit(proxier.iptables); err != nil { - glog.Errorf("Failed to ensure iptables: %v", err) - } - proxier.ensurePortals() - proxier.cleanupStaleStickySessions() + <-t.C + glog.V(3).Infof("Periodic sync") + if err := iptablesInit(proxier.iptables); err != nil { + glog.Errorf("Failed to ensure iptables: %v", err) } + proxier.ensurePortals() + proxier.cleanupStaleStickySessions() } } diff --git a/pkg/util/wait/wait.go b/pkg/util/wait/wait.go index 1b05efe7d8..d6585cbe64 100644 --- a/pkg/util/wait/wait.go +++ b/pkg/util/wait/wait.go @@ -89,7 +89,12 @@ func poller(interval, timeout time.Duration) WaitFunc { defer tick.Stop() var after <-chan time.Time if timeout != 0 { - after = time.After(timeout) + // time.After is more convenient, but it + // potentially leaves timers around much longer + // than necessary if we exit early. + timer := time.NewTimer(timeout) + after = timer.C + defer timer.Stop() } for { select {
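The reflector and wait-poller hunks above share one pattern: instead of `time.After`, create an explicit `*time.Timer` and hand the caller both the channel and a way to stop the timer, so an early exit releases the timer immediately rather than leaving it pending until it fires. Below is a minimal standalone sketch of that pattern, not the reflector code itself; `timeoutChan` and `neverExit` are illustrative names, not identifiers from the tree.

```go
package main

import (
	"fmt"
	"time"
)

// neverExit is a channel that never fires, used when no timeout is configured
// (the same role neverExitWatch plays in the reflector).
var neverExit <-chan time.Time = make(chan time.Time)

// timeoutChan returns a channel that fires after d, plus a cleanup function.
// Calling the cleanup stops the underlying timer right away, which matters
// when the caller returns early and re-creates timers frequently.
func timeoutChan(d time.Duration) (<-chan time.Time, func() bool) {
	if d == 0 {
		return neverExit, func() bool { return false }
	}
	t := time.NewTimer(d)
	return t.C, t.Stop
}

func main() {
	ch, cleanup := timeoutChan(50 * time.Millisecond)
	defer cleanup()

	select {
	case <-ch:
		fmt.Println("timed out")
	case <-time.After(10 * time.Millisecond):
		// We exit well before the 50ms timer fires. With a bare
		// time.After(50ms) that timer would stay allocated until it
		// expired; the deferred cleanup() stops it now.
		fmt.Println("finished early; pending timer gets stopped")
	}
}
```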
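The publish.go and proxier.go hunks use the second variant of the same idea: a loop that previously called `time.After` on every iteration now creates one `time.Ticker` up front and stops it when the loop exits. A minimal standalone version of that loop shape follows; `syncLoop` is an illustrative name (it mirrors the shape of `serviceWriterLoop`, which takes a stop channel, rather than the proxier's non-returning `SyncLoop`).

```go
package main

import (
	"fmt"
	"time"
)

// syncLoop runs work once per period until stop is closed. A single Ticker is
// created once and stopped on return, instead of allocating a fresh timer via
// time.After on every pass through the loop.
func syncLoop(period time.Duration, stop <-chan struct{}, work func()) {
	t := time.NewTicker(period)
	defer t.Stop()
	for {
		work()
		select {
		case <-stop:
			return
		case <-t.C:
		}
	}
}

func main() {
	stop := make(chan struct{})
	go syncLoop(100*time.Millisecond, stop, func() {
		fmt.Println("periodic sync")
	})
	time.Sleep(350 * time.Millisecond)
	close(stop)
	time.Sleep(50 * time.Millisecond)
}
```

The trade-off is small: a ticker drops a tick if the work runs long, which is fine for these periodic resync loops, and in exchange the loop allocates one timer for its lifetime instead of one per iteration.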