mirror of https://github.com/k3s-io/k3s
Add a rate limiter, use it to rate limit docker pulls.
parent
da92a016f5
commit
3ac706a32e
|
@ -61,6 +61,8 @@ var (
|
|||
etcdServerList util.StringList
|
||||
rootDirectory = flag.String("root_dir", defaultRootDir, "Directory path for managing kubelet files (volume mounts,etc).")
|
||||
allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]")
|
||||
registryPullQPS = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]")
|
||||
registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0")
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -157,7 +159,9 @@ func main() {
|
|||
cadvisorClient,
|
||||
etcdClient,
|
||||
*rootDirectory,
|
||||
*syncFrequency)
|
||||
*syncFrequency,
|
||||
float32(*registryPullQPS),
|
||||
*registryBurst)
|
||||
|
||||
health.AddHealthChecker("exec", health.NewExecHealthChecker(k))
|
||||
health.AddHealthChecker("http", health.NewHTTPHealthChecker(&http.Client{}))
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/fsouza/go-dockerclient"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
@ -64,8 +65,13 @@ type dockerPuller struct {
|
|||
keyring *dockerKeyring
|
||||
}
|
||||
|
||||
type throttledDockerPuller struct {
|
||||
puller dockerPuller
|
||||
limiter util.RateLimiter
|
||||
}
|
||||
|
||||
// NewDockerPuller creates a new instance of the default implementation of DockerPuller.
|
||||
func NewDockerPuller(client DockerInterface) DockerPuller {
|
||||
func NewDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller {
|
||||
dp := dockerPuller{
|
||||
client: client,
|
||||
keyring: newDockerKeyring(),
|
||||
|
@ -81,8 +87,13 @@ func NewDockerPuller(client DockerInterface) DockerPuller {
|
|||
if dp.keyring.count() == 0 {
|
||||
glog.Infof("Continuing with empty docker keyring")
|
||||
}
|
||||
|
||||
return dp
|
||||
if qps == 0.0 {
|
||||
return dp
|
||||
}
|
||||
return &throttledDockerPuller{
|
||||
puller: dp,
|
||||
limiter: util.NewTokenBucketRateLimiter(qps, burst),
|
||||
}
|
||||
}
|
||||
|
||||
type dockerContainerCommandRunner struct{}
|
||||
|
@ -130,6 +141,13 @@ func (p dockerPuller) Pull(image string) error {
|
|||
return p.client.PullImage(opts, creds)
|
||||
}
|
||||
|
||||
func (p throttledDockerPuller) Pull(image string) error {
|
||||
if p.limiter.CanAccept() {
|
||||
return p.puller.Pull(image)
|
||||
}
|
||||
return fmt.Errorf("pull QPS exceeded.")
|
||||
}
|
||||
|
||||
// DockerContainers is a map of containers
|
||||
type DockerContainers map[DockerID]*docker.APIContainers
|
||||
|
||||
|
|
|
@ -69,7 +69,9 @@ func NewMainKubelet(
|
|||
cc CadvisorInterface,
|
||||
ec tools.EtcdClient,
|
||||
rd string,
|
||||
ri time.Duration) *Kubelet {
|
||||
ri time.Duration,
|
||||
pullQPS float32,
|
||||
pullBurst int) *Kubelet {
|
||||
return &Kubelet{
|
||||
hostname: hn,
|
||||
dockerClient: dc,
|
||||
|
@ -80,6 +82,8 @@ func NewMainKubelet(
|
|||
podWorkers: newPodWorkers(),
|
||||
runner: dockertools.NewDockerContainerCommandRunner(),
|
||||
httpClient: &http.Client{},
|
||||
pullQPS: pullQPS,
|
||||
pullBurst: pullBurst,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -121,6 +125,10 @@ type Kubelet struct {
|
|||
runner dockertools.ContainerCommandRunner
|
||||
// Optional, client for http requests, defaults to empty client
|
||||
httpClient httpGetInterface
|
||||
// Optional, maximum pull QPS from the docker registry, 0.0 means unlimited.
|
||||
pullQPS float32
|
||||
// Optional, maximum burst QPS from the docker registry, must be positive if QPS is > 0.0
|
||||
pullBurst int
|
||||
}
|
||||
|
||||
// Run starts the kubelet reacting to config updates
|
||||
|
@ -129,7 +137,7 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
|
|||
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
|
||||
}
|
||||
if kl.dockerPuller == nil {
|
||||
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient)
|
||||
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
|
||||
}
|
||||
if kl.healthChecker == nil {
|
||||
kl.healthChecker = health.NewHealthChecker()
|
||||
|
@ -404,7 +412,9 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (dockertools.DockerID, error
|
|||
Image: networkContainerImage,
|
||||
Ports: ports,
|
||||
}
|
||||
kl.dockerPuller.Pull(networkContainerImage)
|
||||
if err := kl.dockerPuller.Pull(networkContainerImage); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return kl.runContainer(pod, container, nil, "")
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type RateLimiter interface {
|
||||
// CanAccept returns true if the rate is below the limit, false otherwise
|
||||
CanAccept() bool
|
||||
// Stop stops the rate limiter, subsequent calls to CanAccept will return false
|
||||
Stop()
|
||||
}
|
||||
|
||||
type tickRateLimiter struct {
|
||||
lock sync.Mutex
|
||||
tokens chan bool
|
||||
ticker <-chan time.Time
|
||||
stop chan bool
|
||||
}
|
||||
|
||||
// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
|
||||
// The rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a
|
||||
// smoothed qps rate of 'qps'.
|
||||
// The bucket is initially filled with 'burst' tokens, the rate limiter spawns a go routine
|
||||
// which refills the bucket with one token at a rate of 'qps'. The maximum number of tokens in
|
||||
// the bucket is capped at 'burst'.
|
||||
// When done with the limiter, Stop() must be called to halt the associated goroutine.
|
||||
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
|
||||
ticker := time.Tick(time.Duration(float32(time.Second) / qps))
|
||||
rate := newTokenBucketRateLimiterFromTicker(ticker, burst)
|
||||
go rate.run()
|
||||
return rate
|
||||
}
|
||||
|
||||
func newTokenBucketRateLimiterFromTicker(ticker <-chan time.Time, burst int) *tickRateLimiter {
|
||||
if burst < 1 {
|
||||
panic("burst must be a positive integer")
|
||||
}
|
||||
rate := &tickRateLimiter{
|
||||
tokens: make(chan bool, burst),
|
||||
ticker: ticker,
|
||||
stop: make(chan bool),
|
||||
}
|
||||
for i := 0; i < burst; i++ {
|
||||
rate.tokens <- true
|
||||
}
|
||||
return rate
|
||||
}
|
||||
|
||||
func (t *tickRateLimiter) CanAccept() bool {
|
||||
select {
|
||||
case <-t.tokens:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tickRateLimiter) Stop() {
|
||||
close(t.stop)
|
||||
}
|
||||
|
||||
func (r *tickRateLimiter) run() {
|
||||
for {
|
||||
if !r.step() {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *tickRateLimiter) step() bool {
|
||||
select {
|
||||
case <-r.ticker:
|
||||
r.increment()
|
||||
return true
|
||||
case <-r.stop:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tickRateLimiter) increment() {
|
||||
// non-blocking send
|
||||
select {
|
||||
case t.tokens <- true:
|
||||
default:
|
||||
}
|
||||
}
|
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestBasicThrottle(t *testing.T) {
|
||||
ticker := make(chan time.Time, 1)
|
||||
r := newTokenBucketRateLimiterFromTicker(ticker, 3)
|
||||
for i := 0; i < 3; i++ {
|
||||
if !r.CanAccept() {
|
||||
t.Error("unexpected false accept")
|
||||
}
|
||||
}
|
||||
if r.CanAccept() {
|
||||
t.Error("unexpected true accept")
|
||||
}
|
||||
}
|
||||
|
||||
func TestIncrementThrottle(t *testing.T) {
|
||||
ticker := make(chan time.Time, 1)
|
||||
r := newTokenBucketRateLimiterFromTicker(ticker, 1)
|
||||
if !r.CanAccept() {
|
||||
t.Error("unexpected false accept")
|
||||
}
|
||||
if r.CanAccept() {
|
||||
t.Error("unexpected true accept")
|
||||
}
|
||||
ticker <- time.Now()
|
||||
r.step()
|
||||
|
||||
if !r.CanAccept() {
|
||||
t.Error("unexpected false accept")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOverBurst(t *testing.T) {
|
||||
ticker := make(chan time.Time, 1)
|
||||
r := newTokenBucketRateLimiterFromTicker(ticker, 3)
|
||||
|
||||
for i := 0; i < 4; i++ {
|
||||
ticker <- time.Now()
|
||||
r.step()
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue