diff --git a/pkg/controller/certificates/BUILD b/pkg/controller/certificates/BUILD index a6940009ac..d5cfdf9bdc 100644 --- a/pkg/controller/certificates/BUILD +++ b/pkg/controller/certificates/BUILD @@ -15,7 +15,7 @@ go_library( deps = [ "//pkg/controller:go_default_library", "//vendor/github.com/golang/glog:go_default_library", - "//vendor/github.com/juju/ratelimit:go_default_library", + "//vendor/golang.org/x/time/rate:go_default_library", "//vendor/k8s.io/api/certificates/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", diff --git a/pkg/controller/certificates/certificate_controller.go b/pkg/controller/certificates/certificate_controller.go index e86810a4be..23e9f342e1 100644 --- a/pkg/controller/certificates/certificate_controller.go +++ b/pkg/controller/certificates/certificate_controller.go @@ -22,6 +22,9 @@ import ( "fmt" "time" + "github.com/golang/glog" + "golang.org/x/time/rate" + certificates "k8s.io/api/certificates/v1beta1" "k8s.io/apimachinery/pkg/api/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -34,9 +37,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/controller" - - "github.com/golang/glog" - "github.com/juju/ratelimit" ) type CertificateController struct { @@ -65,7 +65,7 @@ func NewCertificateController( queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter( workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 1000*time.Second), // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) - &workqueue.BucketRateLimiter{Bucket: ratelimit.NewBucketWithRate(float64(10), int64(100))}, + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, ), "certificate"), handler: handler, } diff --git a/pkg/util/async/bounded_frequency_runner_test.go b/pkg/util/async/bounded_frequency_runner_test.go index 234ffb2a3f..f21bf58b1a 100644 --- a/pkg/util/async/bounded_frequency_runner_test.go +++ b/pkg/util/async/bounded_frequency_runner_test.go @@ -61,6 +61,7 @@ type fakeTimer struct { func newFakeTimer() *fakeTimer { ft := &fakeTimer{ + now: time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC), c: make(chan time.Time), updated: make(chan timerUpdate), } diff --git a/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json b/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json index 579aed8d0a..911f2c9989 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json @@ -406,6 +406,10 @@ "ImportPath": "golang.org/x/text/width", "Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01" }, + { + "ImportPath": "golang.org/x/time/rate", + "Rev": "f51c12702a4d776e4c1fa9b0fabab841babae631" + }, { "ImportPath": "google.golang.org/genproto/googleapis/api/annotations", "Rev": "09f6ed296fc66555a25fe4ce95173148778dfa85" diff --git a/staging/src/k8s.io/client-go/Godeps/Godeps.json b/staging/src/k8s.io/client-go/Godeps/Godeps.json index 4d2293c4df..c3ca3219ab 100644 --- a/staging/src/k8s.io/client-go/Godeps/Godeps.json +++ b/staging/src/k8s.io/client-go/Godeps/Godeps.json @@ -274,6 +274,10 @@ "ImportPath": "golang.org/x/text/unicode/norm", "Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01" }, + { + "ImportPath": "golang.org/x/time/rate", + "Rev": "f51c12702a4d776e4c1fa9b0fabab841babae631" + }, { "ImportPath": "gopkg.in/inf.v0", "Rev": "3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4" diff --git a/staging/src/k8s.io/client-go/util/flowcontrol/BUILD b/staging/src/k8s.io/client-go/util/flowcontrol/BUILD index 410c369c6e..6590d5cb4c 100644 --- a/staging/src/k8s.io/client-go/util/flowcontrol/BUILD +++ b/staging/src/k8s.io/client-go/util/flowcontrol/BUILD @@ -25,7 +25,7 @@ go_library( ], importpath = "k8s.io/client-go/util/flowcontrol", deps = [ - "//vendor/github.com/juju/ratelimit:go_default_library", + "//vendor/golang.org/x/time/rate:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//vendor/k8s.io/client-go/util/integer:go_default_library", ], diff --git a/staging/src/k8s.io/client-go/util/flowcontrol/throttle.go b/staging/src/k8s.io/client-go/util/flowcontrol/throttle.go index 51e6c22f4c..e671c044d0 100644 --- a/staging/src/k8s.io/client-go/util/flowcontrol/throttle.go +++ b/staging/src/k8s.io/client-go/util/flowcontrol/throttle.go @@ -18,8 +18,9 @@ package flowcontrol import ( "sync" + "time" - "github.com/juju/ratelimit" + "golang.org/x/time/rate" ) type RateLimiter interface { @@ -35,7 +36,8 @@ type RateLimiter interface { } type tokenBucketRateLimiter struct { - limiter *ratelimit.Bucket + limiter *rate.Limiter + clock Clock qps float32 } @@ -45,36 +47,48 @@ type tokenBucketRateLimiter struct { // The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'. // The maximum number of tokens in the bucket is capped at 'burst'. func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter { - limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst)) - return newTokenBucketRateLimiter(limiter, qps) + limiter := rate.NewLimiter(rate.Limit(qps), burst) + return newTokenBucketRateLimiter(limiter, realClock{}, qps) } // An injectable, mockable clock interface. type Clock interface { - ratelimit.Clock + Now() time.Time + Sleep(time.Duration) +} + +type realClock struct{} + +func (realClock) Now() time.Time { + return time.Now() +} +func (realClock) Sleep(d time.Duration) { + time.Sleep(d) } // NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter // but allows an injectable clock, for testing. -func NewTokenBucketRateLimiterWithClock(qps float32, burst int, clock Clock) RateLimiter { - limiter := ratelimit.NewBucketWithRateAndClock(float64(qps), int64(burst), clock) - return newTokenBucketRateLimiter(limiter, qps) +func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter { + limiter := rate.NewLimiter(rate.Limit(qps), burst) + return newTokenBucketRateLimiter(limiter, c, qps) } -func newTokenBucketRateLimiter(limiter *ratelimit.Bucket, qps float32) RateLimiter { +func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter { return &tokenBucketRateLimiter{ limiter: limiter, + clock: c, qps: qps, } } func (t *tokenBucketRateLimiter) TryAccept() bool { - return t.limiter.TakeAvailable(1) == 1 + return t.limiter.AllowN(t.clock.Now(), 1) } // Accept will block until a token becomes available func (t *tokenBucketRateLimiter) Accept() { - t.limiter.Wait(1) + now := t.clock.Now() + t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now)) } func (t *tokenBucketRateLimiter) Stop() { diff --git a/staging/src/k8s.io/client-go/util/workqueue/BUILD b/staging/src/k8s.io/client-go/util/workqueue/BUILD index 5cc87e6b36..54a299eb61 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/BUILD +++ b/staging/src/k8s.io/client-go/util/workqueue/BUILD @@ -34,7 +34,7 @@ go_library( ], importpath = "k8s.io/client-go/util/workqueue", deps = [ - "//vendor/github.com/juju/ratelimit:go_default_library", + "//vendor/golang.org/x/time/rate:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", ], diff --git a/staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go b/staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go index 35caed4fa4..a5bed29e00 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go +++ b/staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go @@ -21,7 +21,7 @@ import ( "sync" "time" - "github.com/juju/ratelimit" + "golang.org/x/time/rate" ) type RateLimiter interface { @@ -40,19 +40,19 @@ func DefaultControllerRateLimiter() RateLimiter { return NewMaxOfRateLimiter( NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second), // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) - &BucketRateLimiter{Bucket: ratelimit.NewBucketWithRate(float64(10), int64(100))}, + &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, ) } // BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API type BucketRateLimiter struct { - *ratelimit.Bucket + *rate.Limiter } var _ RateLimiter = &BucketRateLimiter{} func (r *BucketRateLimiter) When(item interface{}) time.Duration { - return r.Bucket.Take(1) + return r.Limiter.Reserve().Delay() } func (r *BucketRateLimiter) NumRequeues(item interface{}) int { diff --git a/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json b/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json index b82b465010..752484cf30 100644 --- a/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json +++ b/staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json @@ -382,6 +382,10 @@ "ImportPath": "golang.org/x/text/width", "Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01" }, + { + "ImportPath": "golang.org/x/time/rate", + "Rev": "f51c12702a4d776e4c1fa9b0fabab841babae631" + }, { "ImportPath": "google.golang.org/genproto/googleapis/api/annotations", "Rev": "09f6ed296fc66555a25fe4ce95173148778dfa85" diff --git a/staging/src/k8s.io/metrics/Godeps/Godeps.json b/staging/src/k8s.io/metrics/Godeps/Godeps.json index 08b642a0b0..7857e52340 100644 --- a/staging/src/k8s.io/metrics/Godeps/Godeps.json +++ b/staging/src/k8s.io/metrics/Godeps/Godeps.json @@ -70,6 +70,10 @@ "ImportPath": "github.com/spf13/pflag", "Rev": "4c012f6dcd9546820e378d0bdda4d8fc772cdfea" }, + { + "ImportPath": "golang.org/x/net/context", + "Rev": "1c05540f6879653db88113bc4a2b70aec4bd491f" + }, { "ImportPath": "golang.org/x/net/http2", "Rev": "1c05540f6879653db88113bc4a2b70aec4bd491f" @@ -102,6 +106,10 @@ "ImportPath": "golang.org/x/text/unicode/norm", "Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01" }, + { + "ImportPath": "golang.org/x/time/rate", + "Rev": "f51c12702a4d776e4c1fa9b0fabab841babae631" + }, { "ImportPath": "gopkg.in/inf.v0", "Rev": "3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4" diff --git a/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json b/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json index 0dd5d9c39e..75f4e145f5 100644 --- a/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json @@ -366,6 +366,10 @@ "ImportPath": "golang.org/x/text/width", "Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01" }, + { + "ImportPath": "golang.org/x/time/rate", + "Rev": "f51c12702a4d776e4c1fa9b0fabab841babae631" + }, { "ImportPath": "google.golang.org/genproto/googleapis/api/annotations", "Rev": "09f6ed296fc66555a25fe4ce95173148778dfa85" diff --git a/staging/src/k8s.io/sample-controller/Godeps/Godeps.json b/staging/src/k8s.io/sample-controller/Godeps/Godeps.json index e19928f564..81bf88781b 100644 --- a/staging/src/k8s.io/sample-controller/Godeps/Godeps.json +++ b/staging/src/k8s.io/sample-controller/Godeps/Godeps.json @@ -142,6 +142,10 @@ "ImportPath": "golang.org/x/text/unicode/norm", "Rev": "b19bf474d317b857955b12035d2c5acb57ce8b01" }, + { + "ImportPath": "golang.org/x/time/rate", + "Rev": "f51c12702a4d776e4c1fa9b0fabab841babae631" + }, { "ImportPath": "gopkg.in/inf.v0", "Rev": "3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4"