Switch from juju/ratelimit to golang.org/x/time/rate

pull/6/head
Jordan Liggitt 2018-01-19 02:08:51 -05:00
parent a9ed90f227
commit 4b9f00988b
No known key found for this signature in database
GPG Key ID: 39928704103C7229
13 changed files with 65 additions and 22 deletions

View File

@ -15,7 +15,7 @@ go_library(
deps = [ deps = [
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/juju/ratelimit:go_default_library", "//vendor/golang.org/x/time/rate:go_default_library",
"//vendor/k8s.io/api/certificates/v1beta1:go_default_library", "//vendor/k8s.io/api/certificates/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",

View File

@ -22,6 +22,9 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/golang/glog"
"golang.org/x/time/rate"
certificates "k8s.io/api/certificates/v1beta1" certificates "k8s.io/api/certificates/v1beta1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -34,9 +37,6 @@ import (
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"github.com/golang/glog"
"github.com/juju/ratelimit"
) )
type CertificateController struct { type CertificateController struct {
@ -65,7 +65,7 @@ func NewCertificateController(
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter( queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 1000*time.Second), workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.BucketRateLimiter{Bucket: ratelimit.NewBucketWithRate(float64(10), int64(100))}, &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
), "certificate"), ), "certificate"),
handler: handler, handler: handler,
} }

View File

@ -61,6 +61,7 @@ type fakeTimer struct {
func newFakeTimer() *fakeTimer { func newFakeTimer() *fakeTimer {
ft := &fakeTimer{ ft := &fakeTimer{
now: time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC),
c: make(chan time.Time), c: make(chan time.Time),
updated: make(chan timerUpdate), updated: make(chan timerUpdate),
} }

View File

@ -406,6 +406,10 @@
"ImportPath": "golang.org/x/text/width", "ImportPath": "golang.org/x/text/width",
"Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01" "Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01"
}, },
{
"ImportPath": "golang.org/x/time/rate",
"Rev": "f51c12702a4d776e4c1fa9b0fabab841babae631"
},
{ {
"ImportPath": "google.golang.org/genproto/googleapis/api/annotations", "ImportPath": "google.golang.org/genproto/googleapis/api/annotations",
"Rev": "09f6ed296fc66555a25fe4ce95173148778dfa85" "Rev": "09f6ed296fc66555a25fe4ce95173148778dfa85"

View File

@ -274,6 +274,10 @@
"ImportPath": "golang.org/x/text/unicode/norm", "ImportPath": "golang.org/x/text/unicode/norm",
"Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01" "Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01"
}, },
{
"ImportPath": "golang.org/x/time/rate",
"Rev": "f51c12702a4d776e4c1fa9b0fabab841babae631"
},
{ {
"ImportPath": "gopkg.in/inf.v0", "ImportPath": "gopkg.in/inf.v0",
"Rev": "3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4" "Rev": "3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4"

View File

@ -25,7 +25,7 @@ go_library(
], ],
importpath = "k8s.io/client-go/util/flowcontrol", importpath = "k8s.io/client-go/util/flowcontrol",
deps = [ deps = [
"//vendor/github.com/juju/ratelimit:go_default_library", "//vendor/golang.org/x/time/rate:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/client-go/util/integer:go_default_library", "//vendor/k8s.io/client-go/util/integer:go_default_library",
], ],

View File

@ -18,8 +18,9 @@ package flowcontrol
import ( import (
"sync" "sync"
"time"
"github.com/juju/ratelimit" "golang.org/x/time/rate"
) )
type RateLimiter interface { type RateLimiter interface {
@ -35,7 +36,8 @@ type RateLimiter interface {
} }
type tokenBucketRateLimiter struct { type tokenBucketRateLimiter struct {
limiter *ratelimit.Bucket limiter *rate.Limiter
clock Clock
qps float32 qps float32
} }
@ -45,36 +47,48 @@ type tokenBucketRateLimiter struct {
// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'. // The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'.
// The maximum number of tokens in the bucket is capped at 'burst'. // The maximum number of tokens in the bucket is capped at 'burst'.
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter { func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst)) limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiter(limiter, qps) return newTokenBucketRateLimiter(limiter, realClock{}, qps)
} }
// An injectable, mockable clock interface. // An injectable, mockable clock interface.
type Clock interface { type Clock interface {
ratelimit.Clock Now() time.Time
Sleep(time.Duration)
}
type realClock struct{}
func (realClock) Now() time.Time {
return time.Now()
}
func (realClock) Sleep(d time.Duration) {
time.Sleep(d)
} }
// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter // NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
// but allows an injectable clock, for testing. // but allows an injectable clock, for testing.
func NewTokenBucketRateLimiterWithClock(qps float32, burst int, clock Clock) RateLimiter { func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
limiter := ratelimit.NewBucketWithRateAndClock(float64(qps), int64(burst), clock) limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiter(limiter, qps) return newTokenBucketRateLimiter(limiter, c, qps)
} }
func newTokenBucketRateLimiter(limiter *ratelimit.Bucket, qps float32) RateLimiter { func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter {
return &tokenBucketRateLimiter{ return &tokenBucketRateLimiter{
limiter: limiter, limiter: limiter,
clock: c,
qps: qps, qps: qps,
} }
} }
func (t *tokenBucketRateLimiter) TryAccept() bool { func (t *tokenBucketRateLimiter) TryAccept() bool {
return t.limiter.TakeAvailable(1) == 1 return t.limiter.AllowN(t.clock.Now(), 1)
} }
// Accept will block until a token becomes available // Accept will block until a token becomes available
func (t *tokenBucketRateLimiter) Accept() { func (t *tokenBucketRateLimiter) Accept() {
t.limiter.Wait(1) now := t.clock.Now()
t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now))
} }
func (t *tokenBucketRateLimiter) Stop() { func (t *tokenBucketRateLimiter) Stop() {

View File

@ -34,7 +34,7 @@ go_library(
], ],
importpath = "k8s.io/client-go/util/workqueue", importpath = "k8s.io/client-go/util/workqueue",
deps = [ deps = [
"//vendor/github.com/juju/ratelimit:go_default_library", "//vendor/golang.org/x/time/rate:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
], ],

View File

@ -21,7 +21,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/juju/ratelimit" "golang.org/x/time/rate"
) )
type RateLimiter interface { type RateLimiter interface {
@ -40,19 +40,19 @@ func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter( return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second), NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Bucket: ratelimit.NewBucketWithRate(float64(10), int64(100))}, &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
) )
} }
// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API // BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
type BucketRateLimiter struct { type BucketRateLimiter struct {
*ratelimit.Bucket *rate.Limiter
} }
var _ RateLimiter = &BucketRateLimiter{} var _ RateLimiter = &BucketRateLimiter{}
func (r *BucketRateLimiter) When(item interface{}) time.Duration { func (r *BucketRateLimiter) When(item interface{}) time.Duration {
return r.Bucket.Take(1) return r.Limiter.Reserve().Delay()
} }
func (r *BucketRateLimiter) NumRequeues(item interface{}) int { func (r *BucketRateLimiter) NumRequeues(item interface{}) int {

View File

@ -382,6 +382,10 @@
"ImportPath": "golang.org/x/text/width", "ImportPath": "golang.org/x/text/width",
"Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01" "Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01"
}, },
{
"ImportPath": "golang.org/x/time/rate",
"Rev": "f51c12702a4d776e4c1fa9b0fabab841babae631"
},
{ {
"ImportPath": "google.golang.org/genproto/googleapis/api/annotations", "ImportPath": "google.golang.org/genproto/googleapis/api/annotations",
"Rev": "09f6ed296fc66555a25fe4ce95173148778dfa85" "Rev": "09f6ed296fc66555a25fe4ce95173148778dfa85"

View File

@ -70,6 +70,10 @@
"ImportPath": "github.com/spf13/pflag", "ImportPath": "github.com/spf13/pflag",
"Rev": "4c012f6dcd9546820e378d0bdda4d8fc772cdfea" "Rev": "4c012f6dcd9546820e378d0bdda4d8fc772cdfea"
}, },
{
"ImportPath": "golang.org/x/net/context",
"Rev": "1c05540f6879653db88113bc4a2b70aec4bd491f"
},
{ {
"ImportPath": "golang.org/x/net/http2", "ImportPath": "golang.org/x/net/http2",
"Rev": "1c05540f6879653db88113bc4a2b70aec4bd491f" "Rev": "1c05540f6879653db88113bc4a2b70aec4bd491f"
@ -102,6 +106,10 @@
"ImportPath": "golang.org/x/text/unicode/norm", "ImportPath": "golang.org/x/text/unicode/norm",
"Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01" "Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01"
}, },
{
"ImportPath": "golang.org/x/time/rate",
"Rev": "f51c12702a4d776e4c1fa9b0fabab841babae631"
},
{ {
"ImportPath": "gopkg.in/inf.v0", "ImportPath": "gopkg.in/inf.v0",
"Rev": "3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4" "Rev": "3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4"

View File

@ -366,6 +366,10 @@
"ImportPath": "golang.org/x/text/width", "ImportPath": "golang.org/x/text/width",
"Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01" "Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01"
}, },
{
"ImportPath": "golang.org/x/time/rate",
"Rev": "f51c12702a4d776e4c1fa9b0fabab841babae631"
},
{ {
"ImportPath": "google.golang.org/genproto/googleapis/api/annotations", "ImportPath": "google.golang.org/genproto/googleapis/api/annotations",
"Rev": "09f6ed296fc66555a25fe4ce95173148778dfa85" "Rev": "09f6ed296fc66555a25fe4ce95173148778dfa85"

View File

@ -142,6 +142,10 @@
"ImportPath": "golang.org/x/text/unicode/norm", "ImportPath": "golang.org/x/text/unicode/norm",
"Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01" "Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01"
}, },
{
"ImportPath": "golang.org/x/time/rate",
"Rev": "f51c12702a4d776e4c1fa9b0fabab841babae631"
},
{ {
"ImportPath": "gopkg.in/inf.v0", "ImportPath": "gopkg.in/inf.v0",
"Rev": "3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4" "Rev": "3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4"