make kubelet image pulls serialized by default.

pull/6/head
Vishnu kannan 2015-10-19 17:35:33 -07:00
parent b02b5b9f87
commit 94b45830c3
5 changed files with 49 additions and 16 deletions

View File

@ -18,6 +18,7 @@ package container
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
@ -25,23 +26,36 @@ import (
"k8s.io/kubernetes/pkg/util"
)
type imagePullRequest struct {
spec ImageSpec
container *api.Container
pullSecrets []api.Secret
logPrefix string
ref *api.ObjectReference
returnChan chan<- error
}
// imagePuller pulls the image using Runtime.PullImage().
// It will check the presence of the image, and report the 'image pulling',
// 'image pulled' events correspondingly.
type imagePuller struct {
recorder record.EventRecorder
runtime Runtime
backOff *util.Backoff
type serializedImagePuller struct {
recorder record.EventRecorder
runtime Runtime
backOff *util.Backoff
pullRequests chan *imagePullRequest
}
// NewImagePuller takes an event recorder and container runtime to create a
// image puller that wraps the container runtime's PullImage interface.
func NewImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller {
return &imagePuller{
recorder: recorder,
runtime: runtime,
backOff: imageBackOff,
func NewSerializedImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller {
imagePuller := &serializedImagePuller{
recorder: recorder,
runtime: runtime,
backOff: imageBackOff,
pullRequests: make(chan *imagePullRequest, 10),
}
go util.Until(imagePuller.pullImages, time.Second, util.NeverStop)
return imagePuller
}
// shouldPullImage returns whether we should pull an image according to
@ -60,7 +74,7 @@ func shouldPullImage(container *api.Container, imagePresent bool) bool {
}
// records an event using ref, event msg. log to glog using prefix, msg, logFn
func (puller *imagePuller) logIt(ref *api.ObjectReference, event, prefix, msg string, logFn func(args ...interface{})) {
func (puller *serializedImagePuller) logIt(ref *api.ObjectReference, event, prefix, msg string, logFn func(args ...interface{})) {
if ref != nil {
puller.recorder.Eventf(ref, event, msg)
} else {
@ -69,12 +83,13 @@ func (puller *imagePuller) logIt(ref *api.ObjectReference, event, prefix, msg st
}
// PullImage pulls the image for the specified pod and container.
func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image)
ref, err := GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
}
spec := ImageSpec{container.Image}
present, err := puller.runtime.IsImagePresent(spec)
if err != nil {
@ -101,8 +116,18 @@ func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pul
puller.logIt(ref, "Back-off", logPrefix, msg, glog.Info)
return ErrImagePullBackOff, msg
}
puller.logIt(ref, "Pulling", logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info)
if err = puller.runtime.PullImage(spec, pullSecrets); err != nil {
// enqueue image pull request and wait for response.
returnChan := make(chan error)
puller.pullRequests <- &imagePullRequest{
spec: spec,
container: container,
pullSecrets: pullSecrets,
logPrefix: logPrefix,
ref: ref,
returnChan: returnChan,
}
if err = <-returnChan; err != nil {
puller.logIt(ref, "Failed", logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning)
puller.backOff.Next(backOffKey, puller.backOff.Clock.Now())
if err == RegistryUnavailable {
@ -116,3 +141,10 @@ func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pul
puller.backOff.GC()
return nil, ""
}
func (puller *serializedImagePuller) pullImages() {
for pullRequest := range puller.pullRequests {
puller.logIt(pullRequest.ref, "Pulling", pullRequest.logPrefix, fmt.Sprintf("pulling image %q", pullRequest.container.Image), glog.Info)
pullRequest.returnChan <- puller.runtime.PullImage(pullRequest.spec, pullRequest.pullSecrets)
}
}

View File

@ -103,7 +103,7 @@ func TestPuller(t *testing.T) {
fakeRuntime := &FakeRuntime{}
fakeRecorder := &record.FakeRecorder{}
puller := NewImagePuller(fakeRecorder, fakeRuntime, backOff)
puller := NewSerializedImagePuller(fakeRecorder, fakeRuntime, backOff)
fakeRuntime.ImageList = []Image{{"present_image", nil, 0}}
fakeRuntime.Err = c.pullerErr

View File

@ -70,6 +70,7 @@ type ImageSpec struct {
// Runtime interface defines the interfaces that should be implemented
// by a container runtime.
// Thread safety is required from implementations of this interface.
type Runtime interface {
// Version returns the version information of the container runtime.
Version() (Version, error)

View File

@ -215,7 +215,7 @@ func NewDockerManager(
cpuCFSQuota: cpuCFSQuota,
}
dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm, imageBackOff)
dm.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, dm, imageBackOff)
dm.containerGC = NewContainerGC(client, containerLogsDir)
return dm

View File

@ -149,7 +149,7 @@ func New(config *Config,
livenessManager: livenessManager,
volumeGetter: volumeGetter,
}
rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt, imageBackOff)
rkt.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, rkt, imageBackOff)
// Test the rkt version.
version, err := rkt.Version()