Merge pull request #61976 from atlassian/ticker-with-stop

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Stop() for Ticker to enable leak-free code

**What this PR does / why we need it**:
I wanted to use the clock package but the `Ticker` without a `Stop()` method is a deal breaker for me.

**Release note**:
```release-note
NONE
```
/kind enhancement
/sig api-machinery
pull/8/head
Kubernetes Submit Queue 2018-05-09 19:06:56 -07:00 committed by GitHub
commit d42df4561a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 69 additions and 35 deletions

View File

@ -414,13 +414,15 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
func (m *managerImpl) waitForPodsCleanup(podCleanedUpFunc PodCleanedUpFunc, pods []*v1.Pod) { func (m *managerImpl) waitForPodsCleanup(podCleanedUpFunc PodCleanedUpFunc, pods []*v1.Pod) {
timeout := m.clock.NewTimer(podCleanupTimeout) timeout := m.clock.NewTimer(podCleanupTimeout)
tick := m.clock.Tick(podCleanupPollFreq) defer timeout.Stop()
ticker := m.clock.NewTicker(podCleanupPollFreq)
defer ticker.Stop()
for { for {
select { select {
case <-timeout.C(): case <-timeout.C():
glog.Warningf("eviction manager: timed out waiting for pods %s to be cleaned up", format.Pods(pods)) glog.Warningf("eviction manager: timed out waiting for pods %s to be cleaned up", format.Pods(pods))
return return
case <-tick: case <-ticker.C():
for i, pod := range pods { for i, pod := range pods {
if !podCleanedUpFunc(pod) { if !podCleanedUpFunc(pod) {
break break

View File

@ -26,18 +26,12 @@ import (
type Clock interface { type Clock interface {
Now() time.Time Now() time.Time
Since(time.Time) time.Duration Since(time.Time) time.Duration
After(d time.Duration) <-chan time.Time After(time.Duration) <-chan time.Time
NewTimer(d time.Duration) Timer NewTimer(time.Duration) Timer
Sleep(d time.Duration) Sleep(time.Duration)
Tick(d time.Duration) <-chan time.Time NewTicker(time.Duration) Ticker
} }
var (
_ = Clock(RealClock{})
_ = Clock(&FakeClock{})
_ = Clock(&IntervalClock{})
)
// RealClock really calls time.Now() // RealClock really calls time.Now()
type RealClock struct{} type RealClock struct{}
@ -62,8 +56,10 @@ func (RealClock) NewTimer(d time.Duration) Timer {
} }
} }
func (RealClock) Tick(d time.Duration) <-chan time.Time { func (RealClock) NewTicker(d time.Duration) Ticker {
return time.Tick(d) return &realTicker{
ticker: time.NewTicker(d),
}
} }
func (RealClock) Sleep(d time.Duration) { func (RealClock) Sleep(d time.Duration) {
@ -137,7 +133,7 @@ func (f *FakeClock) NewTimer(d time.Duration) Timer {
return timer return timer
} }
func (f *FakeClock) Tick(d time.Duration) <-chan time.Time { func (f *FakeClock) NewTicker(d time.Duration) Ticker {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
tickTime := f.time.Add(d) tickTime := f.time.Add(d)
@ -149,7 +145,9 @@ func (f *FakeClock) Tick(d time.Duration) <-chan time.Time {
destChan: ch, destChan: ch,
}) })
return ch return &fakeTicker{
c: ch,
}
} }
// Move clock by Duration, notify anyone that's called After, Tick, or NewTimer // Move clock by Duration, notify anyone that's called After, Tick, or NewTimer
@ -242,8 +240,8 @@ func (*IntervalClock) NewTimer(d time.Duration) Timer {
// Unimplemented, will panic. // Unimplemented, will panic.
// TODO: make interval clock use FakeClock so this can be implemented. // TODO: make interval clock use FakeClock so this can be implemented.
func (*IntervalClock) Tick(d time.Duration) <-chan time.Time { func (*IntervalClock) NewTicker(d time.Duration) Ticker {
panic("IntervalClock doesn't implement Tick") panic("IntervalClock doesn't implement NewTicker")
} }
func (*IntervalClock) Sleep(d time.Duration) { func (*IntervalClock) Sleep(d time.Duration) {
@ -258,11 +256,6 @@ type Timer interface {
Reset(d time.Duration) bool Reset(d time.Duration) bool
} }
var (
_ = Timer(&realTimer{})
_ = Timer(&fakeTimer{})
)
// realTimer is backed by an actual time.Timer. // realTimer is backed by an actual time.Timer.
type realTimer struct { type realTimer struct {
timer *time.Timer timer *time.Timer
@ -325,3 +318,31 @@ func (f *fakeTimer) Reset(d time.Duration) bool {
return active return active
} }
type Ticker interface {
C() <-chan time.Time
Stop()
}
type realTicker struct {
ticker *time.Ticker
}
func (t *realTicker) C() <-chan time.Time {
return t.ticker.C
}
func (t *realTicker) Stop() {
t.ticker.Stop()
}
type fakeTicker struct {
c <-chan time.Time
}
func (t *fakeTicker) C() <-chan time.Time {
return t.c
}
func (t *fakeTicker) Stop() {
}

View File

@ -21,6 +21,18 @@ import (
"time" "time"
) )
var (
_ = Clock(RealClock{})
_ = Clock(&FakeClock{})
_ = Clock(&IntervalClock{})
_ = Timer(&realTimer{})
_ = Timer(&fakeTimer{})
_ = Ticker(&realTicker{})
_ = Ticker(&fakeTicker{})
)
func TestFakeClock(t *testing.T) { func TestFakeClock(t *testing.T) {
startTime := time.Now() startTime := time.Now()
tc := NewFakeClock(startTime) tc := NewFakeClock(startTime)
@ -110,13 +122,13 @@ func TestFakeTick(t *testing.T) {
if tc.HasWaiters() { if tc.HasWaiters() {
t.Errorf("unexpected waiter?") t.Errorf("unexpected waiter?")
} }
oneSec := tc.Tick(time.Second) oneSec := tc.NewTicker(time.Second).C()
if !tc.HasWaiters() { if !tc.HasWaiters() {
t.Errorf("unexpected lack of waiter?") t.Errorf("unexpected lack of waiter?")
} }
oneOhOneSec := tc.Tick(time.Second + time.Millisecond) oneOhOneSec := tc.NewTicker(time.Second + time.Millisecond).C()
twoSec := tc.Tick(2 * time.Second) twoSec := tc.NewTicker(2 * time.Second).C()
select { select {
case <-oneSec: case <-oneSec:
t.Errorf("unexpected channel read") t.Errorf("unexpected channel read")

View File

@ -45,7 +45,7 @@ func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
ret := &delayingType{ ret := &delayingType{
Interface: NewNamed(name), Interface: NewNamed(name),
clock: clock, clock: clock,
heartbeat: clock.Tick(maxWait), heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000), waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name), metrics: newRetryMetrics(name),
@ -67,10 +67,7 @@ type delayingType struct {
stopCh chan struct{} stopCh chan struct{}
// heartbeat ensures we wait no more than maxWait before firing // heartbeat ensures we wait no more than maxWait before firing
// heartbeat clock.Ticker
// TODO: replace with Ticker (and add to clock) so this can be cleaned up.
// clock.Tick will leak.
heartbeat <-chan time.Time
// waitingForAddCh is a buffered channel that feeds waitingForAdd // waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan *waitFor waitingForAddCh chan *waitFor
@ -138,6 +135,7 @@ func (pq waitForPriorityQueue) Peek() interface{} {
func (q *delayingType) ShutDown() { func (q *delayingType) ShutDown() {
q.Interface.ShutDown() q.Interface.ShutDown()
close(q.stopCh) close(q.stopCh)
q.heartbeat.Stop()
} }
// AddAfter adds the given item to the work queue after the given delay // AddAfter adds the given item to the work queue after the given delay
@ -209,7 +207,7 @@ func (q *delayingType) waitingLoop() {
case <-q.stopCh: case <-q.stopCh:
return return
case <-q.heartbeat: case <-q.heartbeat.C():
// continue the loop, which will add ready items // continue the loop, which will add ready items
case <-nextReadyAt: case <-nextReadyAt:

View File

@ -30,7 +30,7 @@ func TestRateLimitingQueue(t *testing.T) {
delayingQueue := &delayingType{ delayingQueue := &delayingType{
Interface: New(), Interface: New(),
clock: fakeClock, clock: fakeClock,
heartbeat: fakeClock.Tick(maxWait), heartbeat: fakeClock.NewTicker(maxWait),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000), waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(""), metrics: newRetryMetrics(""),

View File

@ -62,10 +62,11 @@ func generateLogs(linesTotal int, duration time.Duration) {
delay := duration / time.Duration(linesTotal) delay := duration / time.Duration(linesTotal)
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
tick := time.Tick(delay) ticker := time.NewTicker(delay)
defer ticker.Stop()
for id := 0; id < linesTotal; id++ { for id := 0; id < linesTotal; id++ {
glog.Info(generateLogLine(id)) glog.Info(generateLogLine(id))
<-tick <-ticker.C
} }
} }