diff --git a/pkg/kubelet/container/image_puller.go b/pkg/kubelet/container/image_puller.go index 7138eac1cc..76b0fb812f 100644 --- a/pkg/kubelet/container/image_puller.go +++ b/pkg/kubelet/container/image_puller.go @@ -18,6 +18,7 @@ package container import ( "fmt" + "time" "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" @@ -25,23 +26,36 @@ import ( "k8s.io/kubernetes/pkg/util" ) +type imagePullRequest struct { + spec ImageSpec + container *api.Container + pullSecrets []api.Secret + logPrefix string + ref *api.ObjectReference + returnChan chan<- error +} + // imagePuller pulls the image using Runtime.PullImage(). // It will check the presence of the image, and report the 'image pulling', // 'image pulled' events correspondingly. -type imagePuller struct { - recorder record.EventRecorder - runtime Runtime - backOff *util.Backoff +type serializedImagePuller struct { + recorder record.EventRecorder + runtime Runtime + backOff *util.Backoff + pullRequests chan *imagePullRequest } // NewImagePuller takes an event recorder and container runtime to create a // image puller that wraps the container runtime's PullImage interface. -func NewImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller { - return &imagePuller{ - recorder: recorder, - runtime: runtime, - backOff: imageBackOff, +func NewSerializedImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller { + imagePuller := &serializedImagePuller{ + recorder: recorder, + runtime: runtime, + backOff: imageBackOff, + pullRequests: make(chan *imagePullRequest, 10), } + go util.Until(imagePuller.pullImages, time.Second, util.NeverStop) + return imagePuller } // shouldPullImage returns whether we should pull an image according to @@ -60,7 +74,7 @@ func shouldPullImage(container *api.Container, imagePresent bool) bool { } // records an event using ref, event msg. log to glog using prefix, msg, logFn -func (puller *imagePuller) logIt(ref *api.ObjectReference, event, prefix, msg string, logFn func(args ...interface{})) { +func (puller *serializedImagePuller) logIt(ref *api.ObjectReference, event, prefix, msg string, logFn func(args ...interface{})) { if ref != nil { puller.recorder.Eventf(ref, event, msg) } else { @@ -69,12 +83,13 @@ func (puller *imagePuller) logIt(ref *api.ObjectReference, event, prefix, msg st } // PullImage pulls the image for the specified pod and container. -func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { +func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image) ref, err := GenerateContainerRef(pod, container) if err != nil { glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) } + spec := ImageSpec{container.Image} present, err := puller.runtime.IsImagePresent(spec) if err != nil { @@ -101,8 +116,18 @@ func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pul puller.logIt(ref, "Back-off", logPrefix, msg, glog.Info) return ErrImagePullBackOff, msg } - puller.logIt(ref, "Pulling", logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info) - if err = puller.runtime.PullImage(spec, pullSecrets); err != nil { + + // enqueue image pull request and wait for response. + returnChan := make(chan error) + puller.pullRequests <- &imagePullRequest{ + spec: spec, + container: container, + pullSecrets: pullSecrets, + logPrefix: logPrefix, + ref: ref, + returnChan: returnChan, + } + if err = <-returnChan; err != nil { puller.logIt(ref, "Failed", logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning) puller.backOff.Next(backOffKey, puller.backOff.Clock.Now()) if err == RegistryUnavailable { @@ -116,3 +141,10 @@ func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pul puller.backOff.GC() return nil, "" } + +func (puller *serializedImagePuller) pullImages() { + for pullRequest := range puller.pullRequests { + puller.logIt(pullRequest.ref, "Pulling", pullRequest.logPrefix, fmt.Sprintf("pulling image %q", pullRequest.container.Image), glog.Info) + pullRequest.returnChan <- puller.runtime.PullImage(pullRequest.spec, pullRequest.pullSecrets) + } +} diff --git a/pkg/kubelet/container/image_puller_test.go b/pkg/kubelet/container/image_puller_test.go index ad0d46e3b4..432ba1ef05 100644 --- a/pkg/kubelet/container/image_puller_test.go +++ b/pkg/kubelet/container/image_puller_test.go @@ -103,7 +103,7 @@ func TestPuller(t *testing.T) { fakeRuntime := &FakeRuntime{} fakeRecorder := &record.FakeRecorder{} - puller := NewImagePuller(fakeRecorder, fakeRuntime, backOff) + puller := NewSerializedImagePuller(fakeRecorder, fakeRuntime, backOff) fakeRuntime.ImageList = []Image{{"present_image", nil, 0}} fakeRuntime.Err = c.pullerErr diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 83a4e17919..04b999e092 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -70,6 +70,7 @@ type ImageSpec struct { // Runtime interface defines the interfaces that should be implemented // by a container runtime. +// Thread safety is required from implementations of this interface. type Runtime interface { // Version returns the version information of the container runtime. Version() (Version, error) diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index e8f1bfc835..78d8b7a4e2 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -215,7 +215,7 @@ func NewDockerManager( cpuCFSQuota: cpuCFSQuota, } dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm) - dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm, imageBackOff) + dm.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, dm, imageBackOff) dm.containerGC = NewContainerGC(client, containerLogsDir) return dm diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index e78473e4e3..bcc0d9655a 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -149,7 +149,7 @@ func New(config *Config, livenessManager: livenessManager, volumeGetter: volumeGetter, } - rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt, imageBackOff) + rkt.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, rkt, imageBackOff) // Test the rkt version. version, err := rkt.Version()