From 1f393cdef96fe6e4ddcbf93825d65a9980463406 Mon Sep 17 00:00:00 2001 From: Mikhail Mazurskiy Date: Sat, 31 Mar 2018 19:41:43 +1100 Subject: [PATCH] Stop() for Ticker to enable leak-free code --- pkg/kubelet/eviction/eviction_manager.go | 6 +- .../apimachinery/pkg/util/clock/clock.go | 63 ++++++++++++------- .../apimachinery/pkg/util/clock/clock_test.go | 18 +++++- .../util/workqueue/delaying_queue.go | 10 ++- .../workqueue/rate_limitting_queue_test.go | 2 +- test/images/logs-generator/logs_generator.go | 5 +- 6 files changed, 69 insertions(+), 35 deletions(-) diff --git a/pkg/kubelet/eviction/eviction_manager.go b/pkg/kubelet/eviction/eviction_manager.go index f580e2951b..9870238caf 100644 --- a/pkg/kubelet/eviction/eviction_manager.go +++ b/pkg/kubelet/eviction/eviction_manager.go @@ -419,13 +419,15 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act func (m *managerImpl) waitForPodsCleanup(podCleanedUpFunc PodCleanedUpFunc, pods []*v1.Pod) { timeout := m.clock.NewTimer(podCleanupTimeout) - tick := m.clock.Tick(podCleanupPollFreq) + defer timeout.Stop() + ticker := m.clock.NewTicker(podCleanupPollFreq) + defer ticker.Stop() for { select { case <-timeout.C(): glog.Warningf("eviction manager: timed out waiting for pods %s to be cleaned up", format.Pods(pods)) return - case <-tick: + case <-ticker.C(): for i, pod := range pods { if !podCleanedUpFunc(pod) { break diff --git a/staging/src/k8s.io/apimachinery/pkg/util/clock/clock.go b/staging/src/k8s.io/apimachinery/pkg/util/clock/clock.go index c303a212a0..9567f90060 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/clock/clock.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/clock/clock.go @@ -26,18 +26,12 @@ import ( type Clock interface { Now() time.Time Since(time.Time) time.Duration - After(d time.Duration) <-chan time.Time - NewTimer(d time.Duration) Timer - Sleep(d time.Duration) - Tick(d time.Duration) <-chan time.Time + After(time.Duration) <-chan time.Time + NewTimer(time.Duration) Timer + Sleep(time.Duration) + NewTicker(time.Duration) Ticker } -var ( - _ = Clock(RealClock{}) - _ = Clock(&FakeClock{}) - _ = Clock(&IntervalClock{}) -) - // RealClock really calls time.Now() type RealClock struct{} @@ -62,8 +56,10 @@ func (RealClock) NewTimer(d time.Duration) Timer { } } -func (RealClock) Tick(d time.Duration) <-chan time.Time { - return time.Tick(d) +func (RealClock) NewTicker(d time.Duration) Ticker { + return &realTicker{ + ticker: time.NewTicker(d), + } } func (RealClock) Sleep(d time.Duration) { @@ -137,7 +133,7 @@ func (f *FakeClock) NewTimer(d time.Duration) Timer { return timer } -func (f *FakeClock) Tick(d time.Duration) <-chan time.Time { +func (f *FakeClock) NewTicker(d time.Duration) Ticker { f.lock.Lock() defer f.lock.Unlock() tickTime := f.time.Add(d) @@ -149,7 +145,9 @@ func (f *FakeClock) Tick(d time.Duration) <-chan time.Time { destChan: ch, }) - return ch + return &fakeTicker{ + c: ch, + } } // Move clock by Duration, notify anyone that's called After, Tick, or NewTimer @@ -242,8 +240,8 @@ func (*IntervalClock) NewTimer(d time.Duration) Timer { // Unimplemented, will panic. // TODO: make interval clock use FakeClock so this can be implemented. -func (*IntervalClock) Tick(d time.Duration) <-chan time.Time { - panic("IntervalClock doesn't implement Tick") +func (*IntervalClock) NewTicker(d time.Duration) Ticker { + panic("IntervalClock doesn't implement NewTicker") } func (*IntervalClock) Sleep(d time.Duration) { @@ -258,11 +256,6 @@ type Timer interface { Reset(d time.Duration) bool } -var ( - _ = Timer(&realTimer{}) - _ = Timer(&fakeTimer{}) -) - // realTimer is backed by an actual time.Timer. type realTimer struct { timer *time.Timer @@ -325,3 +318,31 @@ func (f *fakeTimer) Reset(d time.Duration) bool { return active } + +type Ticker interface { + C() <-chan time.Time + Stop() +} + +type realTicker struct { + ticker *time.Ticker +} + +func (t *realTicker) C() <-chan time.Time { + return t.ticker.C +} + +func (t *realTicker) Stop() { + t.ticker.Stop() +} + +type fakeTicker struct { + c <-chan time.Time +} + +func (t *fakeTicker) C() <-chan time.Time { + return t.c +} + +func (t *fakeTicker) Stop() { +} diff --git a/staging/src/k8s.io/apimachinery/pkg/util/clock/clock_test.go b/staging/src/k8s.io/apimachinery/pkg/util/clock/clock_test.go index 27d34605f5..c7b371fc6d 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/clock/clock_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/clock/clock_test.go @@ -21,6 +21,18 @@ import ( "time" ) +var ( + _ = Clock(RealClock{}) + _ = Clock(&FakeClock{}) + _ = Clock(&IntervalClock{}) + + _ = Timer(&realTimer{}) + _ = Timer(&fakeTimer{}) + + _ = Ticker(&realTicker{}) + _ = Ticker(&fakeTicker{}) +) + func TestFakeClock(t *testing.T) { startTime := time.Now() tc := NewFakeClock(startTime) @@ -110,13 +122,13 @@ func TestFakeTick(t *testing.T) { if tc.HasWaiters() { t.Errorf("unexpected waiter?") } - oneSec := tc.Tick(time.Second) + oneSec := tc.NewTicker(time.Second).C() if !tc.HasWaiters() { t.Errorf("unexpected lack of waiter?") } - oneOhOneSec := tc.Tick(time.Second + time.Millisecond) - twoSec := tc.Tick(2 * time.Second) + oneOhOneSec := tc.NewTicker(time.Second + time.Millisecond).C() + twoSec := tc.NewTicker(2 * time.Second).C() select { case <-oneSec: t.Errorf("unexpected channel read") diff --git a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go index c62ed32efa..a37177425d 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go +++ b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go @@ -45,7 +45,7 @@ func newDelayingQueue(clock clock.Clock, name string) DelayingInterface { ret := &delayingType{ Interface: NewNamed(name), clock: clock, - heartbeat: clock.Tick(maxWait), + heartbeat: clock.NewTicker(maxWait), stopCh: make(chan struct{}), waitingForAddCh: make(chan *waitFor, 1000), metrics: newRetryMetrics(name), @@ -67,10 +67,7 @@ type delayingType struct { stopCh chan struct{} // heartbeat ensures we wait no more than maxWait before firing - // - // TODO: replace with Ticker (and add to clock) so this can be cleaned up. - // clock.Tick will leak. - heartbeat <-chan time.Time + heartbeat clock.Ticker // waitingForAddCh is a buffered channel that feeds waitingForAdd waitingForAddCh chan *waitFor @@ -138,6 +135,7 @@ func (pq waitForPriorityQueue) Peek() interface{} { func (q *delayingType) ShutDown() { q.Interface.ShutDown() close(q.stopCh) + q.heartbeat.Stop() } // AddAfter adds the given item to the work queue after the given delay @@ -209,7 +207,7 @@ func (q *delayingType) waitingLoop() { case <-q.stopCh: return - case <-q.heartbeat: + case <-q.heartbeat.C(): // continue the loop, which will add ready items case <-nextReadyAt: diff --git a/staging/src/k8s.io/client-go/util/workqueue/rate_limitting_queue_test.go b/staging/src/k8s.io/client-go/util/workqueue/rate_limitting_queue_test.go index 32d7fc9068..3fbe07d0d8 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/rate_limitting_queue_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/rate_limitting_queue_test.go @@ -30,7 +30,7 @@ func TestRateLimitingQueue(t *testing.T) { delayingQueue := &delayingType{ Interface: New(), clock: fakeClock, - heartbeat: fakeClock.Tick(maxWait), + heartbeat: fakeClock.NewTicker(maxWait), stopCh: make(chan struct{}), waitingForAddCh: make(chan *waitFor, 1000), metrics: newRetryMetrics(""), diff --git a/test/images/logs-generator/logs_generator.go b/test/images/logs-generator/logs_generator.go index 87189172f1..d11a0025e5 100644 --- a/test/images/logs-generator/logs_generator.go +++ b/test/images/logs-generator/logs_generator.go @@ -62,10 +62,11 @@ func generateLogs(linesTotal int, duration time.Duration) { delay := duration / time.Duration(linesTotal) rand.Seed(time.Now().UnixNano()) - tick := time.Tick(delay) + ticker := time.NewTicker(delay) + defer ticker.Stop() for id := 0; id < linesTotal; id++ { glog.Info(generateLogLine(id)) - <-tick + <-ticker.C } }