From f265d5c5eebbd76048228573ba63d2ac34b554fa Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Tue, 30 Jun 2015 09:59:37 -0400 Subject: [PATCH] Switch token bucket rate limiter to github.com/juju/ratelimit --- pkg/util/throttle.go | 73 +++++---------------------------------- pkg/util/throttle_test.go | 21 +++-------- 2 files changed, 13 insertions(+), 81 deletions(-) diff --git a/pkg/util/throttle.go b/pkg/util/throttle.go index e10f372746..43fdc0a946 100644 --- a/pkg/util/throttle.go +++ b/pkg/util/throttle.go @@ -16,10 +16,7 @@ limitations under the License. package util -import ( - "sync" - "time" -) +import "github.com/juju/ratelimit" type RateLimiter interface { // CanAccept returns true if the rate is below the limit, false otherwise @@ -31,24 +28,17 @@ type RateLimiter interface { } type tickRateLimiter struct { - lock sync.Mutex - tokens chan bool - ticker <-chan time.Time - stop chan bool + limiter *ratelimit.Bucket } // NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach. // The rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a // smoothed qps rate of 'qps'. -// The bucket is initially filled with 'burst' tokens, the rate limiter spawns a go routine -// which refills the bucket with one token at a rate of 'qps'. The maximum number of tokens in -// the bucket is capped at 'burst'. -// When done with the limiter, Stop() must be called to halt the associated goroutine. +// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'. +// The maximum number of tokens in the bucket is capped at 'burst'. func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter { - ticker := time.Tick(time.Duration(float32(time.Second) / qps)) - rate := newTokenBucketRateLimiterFromTicker(ticker, burst) - go rate.run() - return rate + limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst)) + return &tickRateLimiter{limiter} } type fakeRateLimiter struct{} @@ -57,63 +47,16 @@ func NewFakeRateLimiter() RateLimiter { return &fakeRateLimiter{} } -func newTokenBucketRateLimiterFromTicker(ticker <-chan time.Time, burst int) *tickRateLimiter { - if burst < 1 { - panic("burst must be a positive integer") - } - rate := &tickRateLimiter{ - tokens: make(chan bool, burst), - ticker: ticker, - stop: make(chan bool), - } - for i := 0; i < burst; i++ { - rate.tokens <- true - } - return rate -} - func (t *tickRateLimiter) CanAccept() bool { - select { - case <-t.tokens: - return true - default: - return false - } + return t.limiter.TakeAvailable(1) == 1 } // Accept will block until a token becomes available func (t *tickRateLimiter) Accept() { - <-t.tokens + t.limiter.Wait(1) } func (t *tickRateLimiter) Stop() { - close(t.stop) -} - -func (r *tickRateLimiter) run() { - for { - if !r.step() { - break - } - } -} - -func (r *tickRateLimiter) step() bool { - select { - case <-r.ticker: - r.increment() - return true - case <-r.stop: - return false - } -} - -func (t *tickRateLimiter) increment() { - // non-blocking send - select { - case t.tokens <- true: - default: - } } func (t *fakeRateLimiter) CanAccept() bool { diff --git a/pkg/util/throttle_test.go b/pkg/util/throttle_test.go index b8edd21653..b4b145b79d 100644 --- a/pkg/util/throttle_test.go +++ b/pkg/util/throttle_test.go @@ -22,8 +22,7 @@ import ( ) func TestBasicThrottle(t *testing.T) { - ticker := make(chan time.Time, 1) - r := newTokenBucketRateLimiterFromTicker(ticker, 3) + r := NewTokenBucketRateLimiter(1, 3) for i := 0; i < 3; i++ { if !r.CanAccept() { t.Error("unexpected false accept") @@ -35,32 +34,22 @@ func TestBasicThrottle(t *testing.T) { } func TestIncrementThrottle(t *testing.T) { - ticker := make(chan time.Time, 1) - r := newTokenBucketRateLimiterFromTicker(ticker, 1) + r := NewTokenBucketRateLimiter(1, 1) if !r.CanAccept() { t.Error("unexpected false accept") } if r.CanAccept() { t.Error("unexpected true accept") } - ticker <- time.Now() - r.step() + + // Allow to refill + time.Sleep(2 * time.Second) if !r.CanAccept() { t.Error("unexpected false accept") } } -func TestOverBurst(t *testing.T) { - ticker := make(chan time.Time, 1) - r := newTokenBucketRateLimiterFromTicker(ticker, 3) - - for i := 0; i < 4; i++ { - ticker <- time.Now() - r.step() - } -} - func TestThrottle(t *testing.T) { r := NewTokenBucketRateLimiter(10, 5)