Add bounded frequency runner

This lib manages runs of a function to have min and max frequencies.
pull/6/head
Tim Hockin 2017-05-12 08:30:45 -07:00
parent 3153ca2815
commit bbb80c252b
3 changed files with 573 additions and 2 deletions

View File

@ -10,13 +10,23 @@ load(
go_library(
name = "go_default_library",
srcs = ["runner.go"],
srcs = [
"bounded_frequency_runner.go",
"runner.go",
],
tags = ["automanaged"],
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["runner_test.go"],
srcs = [
"bounded_frequency_runner_test.go",
"runner_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
)

View File

@ -0,0 +1,229 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package async
import (
"fmt"
"sync"
"time"
"k8s.io/client-go/util/flowcontrol"
"github.com/golang/glog"
)
// BoundedFrequencyRunner manages runs of a user-provided function.
// See NewBoundedFrequencyRunner for examples.
type BoundedFrequencyRunner struct {
name string // the name of this instance
minInterval time.Duration // the min time between runs, modulo bursts
maxInterval time.Duration // the max time between runs
run chan struct{} // try an async run
mu sync.Mutex // guards runs of fn and all mutations
fn func() // function to run
lastRun time.Time // time of last run
timer timer // timer for deferred runs
limiter rateLimiter // rate limiter for on-demand runs
}
// designed so that flowcontrol.RateLimiter satisfies
type rateLimiter interface {
TryAccept() bool
Stop()
}
type nullLimiter struct{}
func (nullLimiter) TryAccept() bool {
return true
}
func (nullLimiter) Stop() {}
var _ rateLimiter = nullLimiter{}
// for testing
type timer interface {
// C returns the timer's selectable channel.
C() <-chan time.Time
// See time.Timer.Reset.
Reset(d time.Duration) bool
// See time.Timer.Stop.
Stop() bool
// See time.Now.
Now() time.Time
// See time.Since.
Since(t time.Time) time.Duration
// See time.Sleep.
Sleep(d time.Duration)
}
// implement our timer in terms of std time.Timer.
type realTimer struct {
*time.Timer
}
func (rt realTimer) C() <-chan time.Time {
return rt.Timer.C
}
func (rt realTimer) Now() time.Time {
return time.Now()
}
func (rt realTimer) Since(t time.Time) time.Duration {
return time.Since(t)
}
func (rt realTimer) Sleep(d time.Duration) {
time.Sleep(d)
}
var _ timer = realTimer{}
// NewBoundedFrequencyRunner creates a new BoundedFrequencyRunner instance,
// which will manage runs of the specified function.
//
// All runs will be async to the caller of BoundedFrequencyRunner.Run, but
// multiple runs are serialized. If the function needs to hold locks, it must
// take them internally.
//
// Runs of the funtion will have at least minInterval between them (from
// completion to next start), except that up to bursts may be allowed. Burst
// runs are "accumulated" over time, one per minInterval up to burstRuns total.
// This can be used, for example, to mitigate the impact of expensive operations
// being called in response to user-initiated operations. Run requests that
// would violate the minInterval are coallesced and run at the next opportunity.
//
// The function will be run at least once per maxInterval. For example, this can
// force periodic refreshes of state in the absence of anyone calling Run.
//
// Examples:
//
// NewBoundedFrequencyRunner("name", fn, time.Second, 5*time.Second, 1)
// - fn will have at least 1 second between runs
// - fn will have no more than 5 seconds between runs
//
// NewBoundedFrequencyRunner("name", fn, 3*time.Second, 10*time.Second, 3)
// - fn will have at least 3 seconds between runs, with up to 3 burst runs
// - fn will have no more than 10 seconds between runs
//
// The maxInterval must be greater than or equal to the minInterval, If the
// caller passes a maxInterval less than minInterval, this function will panic.
func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
timer := realTimer{Timer: time.NewTimer(0)} // will tick immediately
<-timer.C() // consume the first tick
return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
}
// Make an instance with dependencies injected.
func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner {
if maxInterval < minInterval {
panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, minInterval, maxInterval))
}
if timer == nil {
panic(fmt.Sprintf("%s: timer must be non-nil", name))
}
bfr := &BoundedFrequencyRunner{
name: name,
fn: fn,
minInterval: minInterval,
maxInterval: maxInterval,
run: make(chan struct{}, 16),
timer: timer,
}
if minInterval == 0 {
bfr.limiter = nullLimiter{}
} else {
// allow burst updates in short succession
qps := float32(time.Second) / float32(minInterval)
bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
}
return bfr
}
// Loop handles the periodic timer and run requests. This is expected to be
// called as a goroutine.
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
glog.V(3).Infof("%s Loop running", bfr.name)
bfr.timer.Reset(bfr.maxInterval)
for {
select {
case <-stop:
bfr.stop()
glog.V(3).Infof("%s Loop stopping", bfr.name)
return
case <-bfr.timer.C():
bfr.tryRun()
case <-bfr.run:
bfr.tryRun()
}
}
}
// Run the function as soon as possible. If this is called while Loop is not
// running, the call may be deferred indefinitely.
func (bfr *BoundedFrequencyRunner) Run() {
bfr.run <- struct{}{}
}
// assumes the lock is not held
func (bfr *BoundedFrequencyRunner) stop() {
bfr.mu.Lock()
defer bfr.mu.Unlock()
bfr.limiter.Stop()
bfr.timer.Stop()
}
// assumes the lock is not held
func (bfr *BoundedFrequencyRunner) tryRun() {
bfr.mu.Lock()
defer bfr.mu.Unlock()
if bfr.limiter.TryAccept() {
// We're allowed to run the function right now.
bfr.fn()
bfr.lastRun = bfr.timer.Now()
bfr.timer.Stop()
bfr.timer.Reset(bfr.maxInterval)
glog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
return
}
// It can't run right now, figure out when it can run next.
elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run
nextPossible := bfr.minInterval - elapsed // time to next possible run
nextScheduled := bfr.maxInterval - elapsed // time to next periodic run
glog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)
if nextPossible < nextScheduled {
// Set the timer for ASAP, but don't drain here. Assuming Loop is running,
// it might get a delivery in the mean time, but that is OK.
bfr.timer.Stop()
bfr.timer.Reset(nextPossible)
glog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible)
}
}

View File

@ -0,0 +1,332 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package async
import (
"sync"
"testing"
"time"
)
// Track calls to the managed function.
type receiver struct {
lock sync.Mutex
run bool
}
func (r *receiver) F() {
r.lock.Lock()
defer r.lock.Unlock()
r.run = true
}
func (r *receiver) reset() bool {
r.lock.Lock()
defer r.lock.Unlock()
was := r.run
r.run = false
return was
}
// A single change event in the fake timer.
type timerUpdate struct {
active bool
next time.Duration // iff active == true
}
// Fake time.
type fakeTimer struct {
c chan time.Time
lock sync.Mutex
now time.Time
active bool
updated chan timerUpdate
}
func newFakeTimer() *fakeTimer {
ft := &fakeTimer{
c: make(chan time.Time),
updated: make(chan timerUpdate),
}
return ft
}
func (ft *fakeTimer) C() <-chan time.Time {
return ft.c
}
func (ft *fakeTimer) Reset(in time.Duration) bool {
ft.lock.Lock()
defer ft.lock.Unlock()
was := ft.active
ft.active = true
ft.updated <- timerUpdate{
active: true,
next: in,
}
return was
}
func (ft *fakeTimer) Stop() bool {
ft.lock.Lock()
defer ft.lock.Unlock()
was := ft.active
ft.active = false
ft.updated <- timerUpdate{
active: false,
}
return was
}
func (ft *fakeTimer) Now() time.Time {
ft.lock.Lock()
defer ft.lock.Unlock()
return ft.now
}
func (ft *fakeTimer) Since(t time.Time) time.Duration {
ft.lock.Lock()
defer ft.lock.Unlock()
return ft.now.Sub(t)
}
func (ft *fakeTimer) Sleep(d time.Duration) {
ft.lock.Lock()
defer ft.lock.Unlock()
ft.advance(d)
}
// advance the current time.
func (ft *fakeTimer) advance(d time.Duration) {
ft.lock.Lock()
defer ft.lock.Unlock()
ft.now = ft.now.Add(d)
}
// send a timer tick.
func (ft *fakeTimer) tick() {
ft.lock.Lock()
defer ft.lock.Unlock()
ft.active = false
ft.c <- ft.now
}
// return the calling line number (for printing)
// test the timer's state
func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) {
if upd.active != active {
t.Fatalf("%s: expected timer active=%v", name, active)
}
if active && upd.next != next {
t.Fatalf("%s: expected timer to be %v, got %v", name, next, upd.next)
}
}
// test and reset the receiver's state
func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) {
triggered := receiver.reset()
if expected && !triggered {
t.Fatalf("%s: function should have been called", name)
} else if !expected && triggered {
t.Fatalf("%s: function should not have been called", name)
}
}
// Durations embedded in test cases depend on these.
var minInterval = 1 * time.Second
var maxInterval = 10 * time.Second
func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) {
upd := <-timer.updated // wait for stop
checkReceiver(name, t, obj, expectCall)
checkReceiver(name, t, obj, false) // prove post-condition
checkTimer(name, t, upd, false, 0)
upd = <-timer.updated // wait for reset
checkTimer(name, t, upd, true, expectNext)
}
func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
waitForReset(name, t, timer, obj, true, maxInterval)
}
func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
waitForReset(name, t, timer, obj, false, expectNext)
}
func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) {
obj := &receiver{}
timer := newFakeTimer()
runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
stop := make(chan struct{})
var upd timerUpdate
// Start.
go runner.Loop(stop)
upd = <-timer.updated // wait for initial time to be set to max
checkTimer("init", t, upd, true, maxInterval)
checkReceiver("init", t, obj, false)
// Run once, immediately.
// rel=0ms
runner.Run()
waitForRun("first run", t, timer, obj)
// Run again, before minInterval expires.
timer.advance(500 * time.Millisecond) // rel=500ms
runner.Run()
waitForDefer("too soon after first", t, timer, obj, 500*time.Millisecond)
// Run again, before minInterval expires.
timer.advance(499 * time.Millisecond) // rel=999ms
runner.Run()
waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond)
// Run again, once minInterval has passed (race with timer).
timer.advance(1 * time.Millisecond) // rel=1000ms
runner.Run()
waitForRun("second run", t, timer, obj)
// Run again, before minInterval expires.
// rel=0ms
runner.Run()
waitForDefer("too soon after second", t, timer, obj, 1*time.Second)
// Run again, before minInterval expires.
timer.advance(1 * time.Millisecond) // rel=1ms
runner.Run()
waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond)
// Let the timer tick prematurely.
timer.advance(998 * time.Millisecond) // rel=999ms
timer.tick()
waitForDefer("premature tick", t, timer, obj, 1*time.Millisecond)
// Let the timer tick.
timer.advance(1 * time.Millisecond) // rel=1000ms
timer.tick()
waitForRun("first tick", t, timer, obj)
// Let the timer tick.
timer.advance(10 * time.Second) // rel=10000ms
timer.tick()
waitForRun("second tick", t, timer, obj)
// Run again, before minInterval expires.
timer.advance(1 * time.Millisecond) // rel=1ms
runner.Run()
waitForDefer("too soon after tick", t, timer, obj, 999*time.Millisecond)
// Let the timer tick.
timer.advance(999 * time.Millisecond) // rel=1000ms
timer.tick()
waitForRun("third tick", t, timer, obj)
// Clean up.
stop <- struct{}{}
}
func Test_BoundedFrequencyRunnerBurst(t *testing.T) {
obj := &receiver{}
timer := newFakeTimer()
runner := construct("test-runner", obj.F, minInterval, maxInterval, 2, timer)
stop := make(chan struct{})
var upd timerUpdate
// Start.
go runner.Loop(stop)
upd = <-timer.updated // wait for initial time to be set to max
checkTimer("init", t, upd, true, maxInterval)
checkReceiver("init", t, obj, false)
// Run once, immediately.
// abs=0ms, rel=0ms
runner.Run()
waitForRun("first run", t, timer, obj)
// Run again, before minInterval expires, with burst.
timer.advance(1 * time.Millisecond) // abs=1ms, rel=1ms
runner.Run()
waitForRun("second run", t, timer, obj)
// Run again, before minInterval expires.
timer.advance(498 * time.Millisecond) // abs=499ms, rel=498ms
runner.Run()
waitForDefer("too soon after second", t, timer, obj, 502*time.Millisecond)
// Run again, before minInterval expires.
timer.advance(1 * time.Millisecond) // abs=500ms, rel=499ms
runner.Run()
waitForDefer("too soon after second 2", t, timer, obj, 501*time.Millisecond)
// Run again, before minInterval expires.
timer.advance(1 * time.Millisecond) // abs=501ms, rel=500ms
runner.Run()
waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond)
// Run again, once burst has replenished.
timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms
runner.Run()
waitForRun("third run", t, timer, obj)
// Run again, before minInterval expires.
timer.advance(1 * time.Millisecond) // abs=1001ms, rel=1ms
runner.Run()
waitForDefer("too soon after third", t, timer, obj, 999*time.Millisecond)
// Run again, before minInterval expires.
timer.advance(998 * time.Millisecond) // abs=1999ms, rel=999ms
runner.Run()
waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond)
// Run again, once burst has replenished.
timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms
runner.Run()
waitForRun("fourth run", t, timer, obj)
// Run again, once burst has fully replenished.
timer.advance(2 * time.Second) // abs=4000ms, rel=2000ms
runner.Run()
waitForRun("fifth run", t, timer, obj)
runner.Run()
waitForRun("sixth run", t, timer, obj)
runner.Run()
waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second)
// Let the timer tick.
timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms
timer.tick()
waitForRun("first tick", t, timer, obj)
// Let the timer tick.
timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms
timer.tick()
waitForRun("second tick", t, timer, obj)
// Clean up.
stop <- struct{}{}
}