diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index 20fb98488b..b3e7a3069a 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -61,6 +61,8 @@ var ( etcdServerList util.StringList rootDirectory = flag.String("root_dir", defaultRootDir, "Directory path for managing kubelet files (volume mounts,etc).") allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]") + registryPullQPS = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]") + registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0") ) func init() { @@ -157,7 +159,9 @@ func main() { cadvisorClient, etcdClient, *rootDirectory, - *syncFrequency) + *syncFrequency, + float32(*registryPullQPS), + *registryBurst) health.AddHealthChecker("exec", health.NewExecHealthChecker(k)) health.AddHealthChecker("http", health.NewHTTPHealthChecker(&http.Client{})) diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index 35cb2a6b87..2ad0d4ea0e 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -28,6 +28,7 @@ import ( "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/fsouza/go-dockerclient" "github.com/golang/glog" ) @@ -64,8 +65,13 @@ type dockerPuller struct { keyring *dockerKeyring } +type throttledDockerPuller struct { + puller dockerPuller + limiter util.RateLimiter +} + // NewDockerPuller creates a new instance of the default implementation of DockerPuller. -func NewDockerPuller(client DockerInterface) DockerPuller { +func NewDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller { dp := dockerPuller{ client: client, keyring: newDockerKeyring(), @@ -81,8 +87,13 @@ func NewDockerPuller(client DockerInterface) DockerPuller { if dp.keyring.count() == 0 { glog.Infof("Continuing with empty docker keyring") } - - return dp + if qps == 0.0 { + return dp + } + return &throttledDockerPuller{ + puller: dp, + limiter: util.NewTokenBucketRateLimiter(qps, burst), + } } type dockerContainerCommandRunner struct{} @@ -130,6 +141,13 @@ func (p dockerPuller) Pull(image string) error { return p.client.PullImage(opts, creds) } +func (p throttledDockerPuller) Pull(image string) error { + if p.limiter.CanAccept() { + return p.puller.Pull(image) + } + return fmt.Errorf("pull QPS exceeded.") +} + // DockerContainers is a map of containers type DockerContainers map[DockerID]*docker.APIContainers diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 8e097a95b3..18b03040d0 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -69,7 +69,9 @@ func NewMainKubelet( cc CadvisorInterface, ec tools.EtcdClient, rd string, - ri time.Duration) *Kubelet { + ri time.Duration, + pullQPS float32, + pullBurst int) *Kubelet { return &Kubelet{ hostname: hn, dockerClient: dc, @@ -80,6 +82,8 @@ func NewMainKubelet( podWorkers: newPodWorkers(), runner: dockertools.NewDockerContainerCommandRunner(), httpClient: &http.Client{}, + pullQPS: pullQPS, + pullBurst: pullBurst, } } @@ -121,6 +125,10 @@ type Kubelet struct { runner dockertools.ContainerCommandRunner // Optional, client for http requests, defaults to empty client httpClient httpGetInterface + // Optional, maximum pull QPS from the docker registry, 0.0 means unlimited. + pullQPS float32 + // Optional, maximum burst QPS from the docker registry, must be positive if QPS is > 0.0 + pullBurst int } // Run starts the kubelet reacting to config updates @@ -129,7 +137,7 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/"))) } if kl.dockerPuller == nil { - kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient) + kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst) } if kl.healthChecker == nil { kl.healthChecker = health.NewHealthChecker() @@ -404,7 +412,9 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (dockertools.DockerID, error Image: networkContainerImage, Ports: ports, } - kl.dockerPuller.Pull(networkContainerImage) + if err := kl.dockerPuller.Pull(networkContainerImage); err != nil { + return "", err + } return kl.runContainer(pod, container, nil, "") } diff --git a/pkg/util/throttle.go b/pkg/util/throttle.go new file mode 100644 index 0000000000..0b337c5e3c --- /dev/null +++ b/pkg/util/throttle.go @@ -0,0 +1,104 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "sync" + "time" +) + +type RateLimiter interface { + // CanAccept returns true if the rate is below the limit, false otherwise + CanAccept() bool + // Stop stops the rate limiter, subsequent calls to CanAccept will return false + Stop() +} + +type tickRateLimiter struct { + lock sync.Mutex + tokens chan bool + ticker <-chan time.Time + stop chan bool +} + +// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach. +// The rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a +// smoothed qps rate of 'qps'. +// The bucket is initially filled with 'burst' tokens, the rate limiter spawns a go routine +// which refills the bucket with one token at a rate of 'qps'. The maximum number of tokens in +// the bucket is capped at 'burst'. +// When done with the limiter, Stop() must be called to halt the associated goroutine. +func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter { + ticker := time.Tick(time.Duration(float32(time.Second) / qps)) + rate := newTokenBucketRateLimiterFromTicker(ticker, burst) + go rate.run() + return rate +} + +func newTokenBucketRateLimiterFromTicker(ticker <-chan time.Time, burst int) *tickRateLimiter { + if burst < 1 { + panic("burst must be a positive integer") + } + rate := &tickRateLimiter{ + tokens: make(chan bool, burst), + ticker: ticker, + stop: make(chan bool), + } + for i := 0; i < burst; i++ { + rate.tokens <- true + } + return rate +} + +func (t *tickRateLimiter) CanAccept() bool { + select { + case <-t.tokens: + return true + default: + return false + } +} + +func (t *tickRateLimiter) Stop() { + close(t.stop) +} + +func (r *tickRateLimiter) run() { + for { + if !r.step() { + break + } + } +} + +func (r *tickRateLimiter) step() bool { + select { + case <-r.ticker: + r.increment() + return true + case <-r.stop: + return false + } +} + +func (t *tickRateLimiter) increment() { + // non-blocking send + select { + case t.tokens <- true: + default: + } +} diff --git a/pkg/util/throttle_test.go b/pkg/util/throttle_test.go new file mode 100644 index 0000000000..328cf232cb --- /dev/null +++ b/pkg/util/throttle_test.go @@ -0,0 +1,62 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "testing" + "time" +) + +func TestBasicThrottle(t *testing.T) { + ticker := make(chan time.Time, 1) + r := newTokenBucketRateLimiterFromTicker(ticker, 3) + for i := 0; i < 3; i++ { + if !r.CanAccept() { + t.Error("unexpected false accept") + } + } + if r.CanAccept() { + t.Error("unexpected true accept") + } +} + +func TestIncrementThrottle(t *testing.T) { + ticker := make(chan time.Time, 1) + r := newTokenBucketRateLimiterFromTicker(ticker, 1) + if !r.CanAccept() { + t.Error("unexpected false accept") + } + if r.CanAccept() { + t.Error("unexpected true accept") + } + ticker <- time.Now() + r.step() + + if !r.CanAccept() { + t.Error("unexpected false accept") + } +} + +func TestOverBurst(t *testing.T) { + ticker := make(chan time.Time, 1) + r := newTokenBucketRateLimiterFromTicker(ticker, 3) + + for i := 0; i < 4; i++ { + ticker <- time.Now() + r.step() + } +}