diff --git a/pkg/kubelet/container/fake_runtime.go b/pkg/kubelet/container/fake_runtime.go index 201ce21b60..78fb63668a 100644 --- a/pkg/kubelet/container/fake_runtime.go +++ b/pkg/kubelet/container/fake_runtime.go @@ -41,6 +41,7 @@ type FakeRuntime struct { KilledContainers []string VersionInfo string Err error + InspectErr error } // FakeRuntime should implement Runtime. @@ -94,6 +95,7 @@ func (f *FakeRuntime) ClearCalls() { f.KilledContainers = []string{} f.VersionInfo = "" f.Err = nil + f.InspectErr = nil } func (f *FakeRuntime) assertList(expect []string, test []string) error { @@ -264,10 +266,10 @@ func (f *FakeRuntime) IsImagePresent(image ImageSpec) (bool, error) { f.CalledFunctions = append(f.CalledFunctions, "IsImagePresent") for _, i := range f.ImageList { if i.ID == image.Image { - return true, f.Err + return true, nil } } - return false, f.Err + return false, f.InspectErr } func (f *FakeRuntime) ListImages() ([]Image, error) { diff --git a/pkg/kubelet/container/image_puller.go b/pkg/kubelet/container/image_puller.go index b9268a0adc..3a5e5de919 100644 --- a/pkg/kubelet/container/image_puller.go +++ b/pkg/kubelet/container/image_puller.go @@ -22,6 +22,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/util" ) // imagePuller pulls the image using Runtime.PullImage(). @@ -30,14 +31,16 @@ import ( type imagePuller struct { recorder record.EventRecorder runtime Runtime + backOff *util.Backoff } // NewImagePuller takes an event recorder and container runtime to create a // image puller that wraps the container runtime's PullImage interface. -func NewImagePuller(recorder record.EventRecorder, runtime Runtime) ImagePuller { +func NewImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller { return &imagePuller{ recorder: recorder, runtime: runtime, + backOff: imageBackOff, } } @@ -56,24 +59,18 @@ func shouldPullImage(container *api.Container, imagePresent bool) bool { return false } -// reportImagePull reports 'image pulling', 'image pulled' or 'image pulling failed' events. -func (puller *imagePuller) reportImagePull(ref *api.ObjectReference, event string, image string, pullError error) { - if ref == nil { - return - } - - switch event { - case "pulling": - puller.recorder.Eventf(ref, "Pulling", "Pulling image %q", image) - case "pulled": - puller.recorder.Eventf(ref, "Pulled", "Successfully pulled image %q", image) - case "failed": - puller.recorder.Eventf(ref, "Failed", "Failed to pull image %q: %v", image, pullError) +// logIt records an event via the recorder when ref is non-nil; otherwise it logs msg to glog through logFn, prefixed with prefix. +func (puller *imagePuller) logIt(ref *api.ObjectReference, event, prefix, msg string, logFn func(args ...interface{})) { + if ref != nil { + puller.recorder.Eventf(ref, event, msg) + } else { + logFn(fmt.Sprint(prefix, " ", msg)) } } // PullImage pulls the image for the specified pod and container.
-func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) error { +func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { + logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image) ref, err := GenerateContainerRef(pod, container) if err != nil { glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) @@ -81,24 +78,36 @@ func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pul spec := ImageSpec{container.Image} present, err := puller.runtime.IsImagePresent(spec) if err != nil { - if ref != nil { - puller.recorder.Eventf(ref, "Failed", "Failed to inspect image %q: %v", container.Image, err) - } - return fmt.Errorf("failed to inspect image %q: %v", container.Image, err) + msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err) + puller.logIt(ref, "Failed", logPrefix, msg, glog.Warning) + return ErrImageInspect, msg } if !shouldPullImage(container, present) { - if present && ref != nil { - puller.recorder.Eventf(ref, "Pulled", "Container image %q already present on machine", container.Image) + if present { + msg := fmt.Sprintf("Container image %q already present on machine", container.Image) + puller.logIt(ref, "Pulled", logPrefix, msg, glog.Info) + return nil, "" + } else { + msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image) + puller.logIt(ref, "ErrImageNeverPull", logPrefix, msg, glog.Warning) + return ErrImageNeverPull, msg } - return nil } - puller.reportImagePull(ref, "pulling", container.Image, nil) - if err = puller.runtime.PullImage(spec, pullSecrets); err != nil { - puller.reportImagePull(ref, "failed", container.Image, err) - return err + backOffKey := fmt.Sprintf("%s_%s", pod.Name, container.Image) + if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) { + msg := fmt.Sprintf("Back-off pulling image %q", container.Image) + puller.logIt(ref, "Back-off", logPrefix, msg, glog.Info) + return ErrImagePullBackOff, msg } - puller.reportImagePull(ref, "pulled", container.Image, nil) - return nil + puller.logIt(ref, "Pulling", logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info) + if err = puller.runtime.PullImage(spec, pullSecrets); err != nil { + puller.logIt(ref, "Failed", logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning) + puller.backOff.Next(backOffKey, puller.backOff.Clock.Now()) + return ErrImagePull, err.Error() + } + puller.logIt(ref, "Pulled", logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info) + puller.backOff.GC() + return nil, "" } diff --git a/pkg/kubelet/container/image_puller_test.go b/pkg/kubelet/container/image_puller_test.go new file mode 100644 index 0000000000..ad0d46e3b4 --- /dev/null +++ b/pkg/kubelet/container/image_puller_test.go @@ -0,0 +1,119 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/util" +) + +func TestPuller(t *testing.T) { + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "test_pod", + Namespace: "test-ns", + UID: "bar", + ResourceVersion: "42", + SelfLink: "/api/v1/pods/foo", + }} + + cases := []struct { + containerImage string + policy api.PullPolicy + calledFunctions []string + inspectErr error + pullerErr error + expectedErr []error + }{ + { // pull missing image + containerImage: "missing_image", + policy: api.PullIfNotPresent, + calledFunctions: []string{"IsImagePresent", "PullImage"}, + inspectErr: nil, + pullerErr: nil, + expectedErr: []error{nil}}, + + { // image present, dont pull + containerImage: "present_image", + policy: api.PullIfNotPresent, + calledFunctions: []string{"IsImagePresent"}, + inspectErr: nil, + pullerErr: nil, + expectedErr: []error{nil, nil, nil}}, + // image present, pull it + {containerImage: "present_image", + policy: api.PullAlways, + calledFunctions: []string{"IsImagePresent", "PullImage"}, + inspectErr: nil, + pullerErr: nil, + expectedErr: []error{nil, nil, nil}}, + // missing image, error PullNever + {containerImage: "missing_image", + policy: api.PullNever, + calledFunctions: []string{"IsImagePresent"}, + inspectErr: nil, + pullerErr: nil, + expectedErr: []error{ErrImageNeverPull, ErrImageNeverPull, ErrImageNeverPull}}, + // missing image, unable to inspect + {containerImage: "missing_image", + policy: api.PullIfNotPresent, + calledFunctions: []string{"IsImagePresent"}, + inspectErr: errors.New("unknown inspectError"), + pullerErr: nil, + expectedErr: []error{ErrImageInspect, ErrImageInspect, ErrImageInspect}}, + // missing image, unable to fetch + {containerImage: "typo_image", + policy: api.PullIfNotPresent, + calledFunctions: []string{"IsImagePresent", "PullImage"}, + inspectErr: nil, + pullerErr: errors.New("404"), + expectedErr: []error{ErrImagePull, ErrImagePull, ErrImagePullBackOff, ErrImagePull, ErrImagePullBackOff, ErrImagePullBackOff}}, + } + + for i, c := range cases { + container := &api.Container{ + Name: "container_name", + Image: c.containerImage, + ImagePullPolicy: c.policy, + } + + backOff := util.NewBackOff(time.Second, time.Minute) + fakeClock := &util.FakeClock{Time: time.Now()} + backOff.Clock = fakeClock + + fakeRuntime := &FakeRuntime{} + fakeRecorder := &record.FakeRecorder{} + puller := NewImagePuller(fakeRecorder, fakeRuntime, backOff) + + fakeRuntime.ImageList = []Image{{"present_image", nil, 0}} + fakeRuntime.Err = c.pullerErr + fakeRuntime.InspectErr = c.inspectErr + + for tick, expected := range c.expectedErr { + fakeClock.Step(time.Second) + err, _ := puller.PullImage(pod, container, nil) + fakeRuntime.AssertCalls(c.calledFunctions) + assert.Equal(t, expected, err, "in test %d tick=%d", i, tick) + } + } +} diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index e26a54d856..a3b66cc89c 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -32,6 +32,22 @@ import ( // Container Terminated and Kubelet is backing off the restart var ErrCrashLoopBackOff = errors.New("CrashLoopBackOff") +var ( + // Container image pull failed, kubelet is backing off image pull + ErrImagePullBackOff = errors.New("ImagePullBackOff") + + // Unable to 
inspect image + ErrImageInspect = errors.New("ImageInspectError") + + // General image pull error + ErrImagePull = errors.New("ErrImagePull") + + // Required Image is absent on host and PullPolicy is NeverPullImage + ErrImageNeverPull = errors.New("ErrImageNeverPull") +) + +var ErrRunContainer = errors.New("RunContainerError") + type Version interface { // Compare compares two versions of the runtime. On success it returns -1 // if the version is less than the other, 1 if it is greater than the other, @@ -109,7 +125,7 @@ type ContainerCommandRunner interface { // It will check the presence of the image, and report the 'image pulling', // 'image pulled' events correspondingly. type ImagePuller interface { - PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) error + PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) } // Pod is a group of containers, with the status of the pod. diff --git a/pkg/kubelet/dockertools/docker_test.go b/pkg/kubelet/dockertools/docker_test.go index 6674c11b64..b9ea704414 100644 --- a/pkg/kubelet/dockertools/docker_test.go +++ b/pkg/kubelet/dockertools/docker_test.go @@ -669,7 +669,8 @@ func TestFindContainersByPod(t *testing.T) { } fakeClient := &FakeDockerClient{} np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) - containerManager := NewFakeDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, &cadvisorApi.MachineInfo{}, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil) + // image back-off is set to nil, this test shouldnt pull images + containerManager := NewFakeDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, &cadvisorApi.MachineInfo{}, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, nil) for i, test := range tests { fakeClient.ContainerList = test.containerList fakeClient.ExitedContainerList = test.exitedContainerList diff --git a/pkg/kubelet/dockertools/fake_manager.go b/pkg/kubelet/dockertools/fake_manager.go index 60fac0a69b..6786e55b8d 100644 --- a/pkg/kubelet/dockertools/fake_manager.go +++ b/pkg/kubelet/dockertools/fake_manager.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/prober" kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/procfs" ) @@ -40,13 +41,13 @@ func NewFakeDockerManager( osInterface kubecontainer.OSInterface, networkPlugin network.NetworkPlugin, generator kubecontainer.RunContainerOptionsGenerator, - httpClient kubeletTypes.HttpGetter) *DockerManager { + httpClient kubeletTypes.HttpGetter, imageBackOff *util.Backoff) *DockerManager { fakeOOMAdjuster := oom.NewFakeOOMAdjuster() fakeProcFs := procfs.NewFakeProcFs() dm := NewDockerManager(client, recorder, prober, containerRefManager, machineInfo, podInfraContainerImage, qps, burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{}, - fakeOOMAdjuster, fakeProcFs, false) + fakeOOMAdjuster, fakeProcFs, false, imageBackOff) dm.dockerPuller = &FakeDockerPuller{} return dm } diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index d71d101fbc..22c9a379b1 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -158,7 +158,9 @@ func NewDockerManager( execHandler ExecHandler, oomAdjuster *oom.OOMAdjuster, procFs procfs.ProcFsInterface, - cpuCFSQuota bool) *DockerManager { + 
cpuCFSQuota bool, + imageBackOff *util.Backoff) *DockerManager { + // Work out the location of the Docker runtime, defaulting to /var/lib/docker // if there are any problems. dockerRoot := "/var/lib/docker" @@ -211,7 +213,7 @@ func NewDockerManager( cpuCFSQuota: cpuCFSQuota, } dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm) - dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm) + dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm, imageBackOff) return dm } @@ -509,9 +511,12 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) { } } containerStatus.LastTerminationState = containerStatus.State - containerStatus.State.Waiting = &api.ContainerStateWaiting{Reason: reasonInfo.reason, - Message: reasonInfo.message} - containerStatus.State.Running = nil + containerStatus.State = api.ContainerState{ + Waiting: &api.ContainerStateWaiting{ + Reason: reasonInfo.reason, + Message: reasonInfo.message, + }, + } } continue } @@ -524,20 +529,27 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) { containerStatus.RestartCount = oldStatus.RestartCount containerStatus.LastTerminationState = oldStatus.LastTerminationState } - //Check image is ready on the node or not. - image := container.Image // TODO(dchen1107): docker/docker/issues/8365 to figure out if the image exists - _, err := dm.client.InspectImage(image) - if err == nil { - containerStatus.State.Waiting = &api.ContainerStateWaiting{ - Message: fmt.Sprintf("Image: %s is ready, container is creating", image), - Reason: "ContainerCreating", - } - } else if err == docker.ErrNoSuchImage { - containerStatus.State.Waiting = &api.ContainerStateWaiting{ - Message: fmt.Sprintf("Image: %s is not ready on the node", image), - Reason: "ImageNotReady", + reasonInfo, ok := dm.reasonCache.Get(uid, container.Name) + if !ok { + // default position for a container + // At this point there are no active or dead containers and the reasonCache is empty (no entry, or the entry has expired), + // so it's reasonable to say the container is being created until a more accurate reason is logged + containerStatus.State = api.ContainerState{ + Waiting: &api.ContainerStateWaiting{ + Reason: fmt.Sprintf("ContainerCreating"), + Message: fmt.Sprintf("Image: %s is ready, container is creating", container.Image), + }, + } + } else if reasonInfo.reason == kubecontainer.ErrImagePullBackOff.Error() || + reasonInfo.reason == kubecontainer.ErrImageInspect.Error() || + reasonInfo.reason == kubecontainer.ErrImagePull.Error() || + reasonInfo.reason == kubecontainer.ErrImageNeverPull.Error() { + // mark it as waiting, reason will be filled below + containerStatus.State = api.ContainerState{Waiting: &api.ContainerStateWaiting{}} + } else if reasonInfo.reason == kubecontainer.ErrRunContainer.Error() { + // mark it as waiting, reason will be filled below + containerStatus.State = api.ContainerState{Waiting: &api.ContainerStateWaiting{}} } statuses[container.Name] = &containerStatus } @@ -545,6 +557,7 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) { podStatus.ContainerStatuses = make([]api.ContainerStatus, 0) for containerName, status := range statuses { if status.State.Waiting != nil { + status.State.Running = nil // For containers in the waiting state, fill in a specific reason if it is recorded.
if reasonInfo, ok := dm.reasonCache.Get(uid, containerName); ok { status.State.Waiting.Reason = reasonInfo.reason @@ -1603,7 +1616,8 @@ func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubeletTypes.Doc } // No pod secrets for the infra container. - if err := dm.imagePuller.PullImage(pod, container, nil); err != nil { + // The message isnt needed for the Infra container + if err, _ := dm.imagePuller.PullImage(pod, container, nil); err != nil { return "", err } @@ -1845,10 +1859,9 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod continue } glog.V(4).Infof("Creating container %+v in pod %v", container, podFullName) - err := dm.imagePuller.PullImage(pod, container, pullSecrets) - dm.updateReasonCache(pod, container, "PullImageError", err) + err, msg := dm.imagePuller.PullImage(pod, container, pullSecrets) if err != nil { - glog.Warningf("Failed to pull image %q from pod %q and container %q: %v", container.Image, kubecontainer.GetPodFullName(pod), container.Name, err) + dm.updateReasonCache(pod, container, err.Error(), errors.New(msg)) continue } @@ -1868,7 +1881,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod // See createPodInfraContainer for infra container setup. namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID) _, err = dm.runContainerInPod(pod, container, namespaceMode, namespaceMode, getPidMode(pod)) - dm.updateReasonCache(pod, container, "RunContainerError", err) + dm.updateReasonCache(pod, container, kubecontainer.ErrRunContainer.Error(), err) if err != nil { // TODO(bburns) : Perhaps blacklist a container after N failures? glog.Errorf("Error running pod %q container %q: %v", kubecontainer.GetPodFullName(pod), container.Name, err) diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go index 11a3b97419..f3bb4c3cad 100644 --- a/pkg/kubelet/dockertools/manager_test.go +++ b/pkg/kubelet/dockertools/manager_test.go @@ -31,8 +31,10 @@ import ( docker "github.com/fsouza/go-dockerclient" cadvisorApi "github.com/google/cadvisor/info/v1" + "github.com/stretchr/testify/assert" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/record" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/network" @@ -89,7 +91,8 @@ func newTestDockerManagerWithHTTPClient(fakeHTTPClient *fakeHTTP) (*DockerManage kubecontainer.FakeOS{}, networkPlugin, optionGenerator, - fakeHTTPClient) + fakeHTTPClient, + util.NewBackOff(time.Second, 300*time.Second)) return dockerManager, fakeDocker } @@ -976,60 +979,48 @@ func TestSyncPodWithPullPolicy(t *testing.T) { Spec: api.PodSpec{ Containers: []api.Container{ {Name: "bar", Image: "pull_always_image", ImagePullPolicy: api.PullAlways}, - {Name: "bar1", Image: "pull_never_image", ImagePullPolicy: api.PullNever}, {Name: "bar2", Image: "pull_if_not_present_image", ImagePullPolicy: api.PullIfNotPresent}, {Name: "bar3", Image: "existing_one", ImagePullPolicy: api.PullIfNotPresent}, {Name: "bar4", Image: "want:latest", ImagePullPolicy: api.PullIfNotPresent}, + {Name: "bar5", Image: "pull_never_image", ImagePullPolicy: api.PullNever}, }, }, } - runSyncPod(t, dm, fakeDocker, pod, nil) - - fakeDocker.Lock() - - eventSet := []string{ - `Pulling Pulling image "pod_infra_image"`, - `Pulled Successfully pulled image "pod_infra_image"`, - `Pulling Pulling image "pull_always_image"`, - `Pulled Successfully pulled image 
"pull_always_image"`, - `Pulling Pulling image "pull_if_not_present_image"`, - `Pulled Successfully pulled image "pull_if_not_present_image"`, - `Pulled Container image "existing_one" already present on machine`, - `Pulled Container image "want:latest" already present on machine`, + expectedStatusMap := map[string]api.ContainerState{ + "bar": {Running: &api.ContainerStateRunning{unversioned.Now()}}, + "bar2": {Running: &api.ContainerStateRunning{unversioned.Now()}}, + "bar3": {Running: &api.ContainerStateRunning{unversioned.Now()}}, + "bar4": {Running: &api.ContainerStateRunning{unversioned.Now()}}, + "bar5": {Waiting: &api.ContainerStateWaiting{Reason: kubecontainer.ErrImageNeverPull.Error(), + Message: "Container image \"pull_never_image\" is not present with pull policy of Never"}}, } - recorder := dm.recorder.(*record.FakeRecorder) - - var actualEvents []string - for _, ev := range recorder.Events { - if strings.HasPrefix(ev, "Pull") { - actualEvents = append(actualEvents, ev) + runSyncPod(t, dm, fakeDocker, pod, nil) + statuses, err := dm.GetPodStatus(pod) + if err != nil { + t.Errorf("unable to get pod status") + } + for _, c := range pod.Spec.Containers { + if containerStatus, ok := api.GetContainerStatus(statuses.ContainerStatuses, c.Name); ok { + // copy the StartedAt time, to make the structs match + if containerStatus.State.Running != nil && expectedStatusMap[c.Name].Running != nil { + expectedStatusMap[c.Name].Running.StartedAt = containerStatus.State.Running.StartedAt + } + assert.Equal(t, containerStatus.State, expectedStatusMap[c.Name], "for container %s", c.Name) } } - sort.StringSlice(actualEvents).Sort() - sort.StringSlice(eventSet).Sort() - if !reflect.DeepEqual(actualEvents, eventSet) { - t.Errorf("Expected: %#v, Actual: %#v", eventSet, actualEvents) - } - pulledImageSet := make(map[string]empty) - for v := range puller.ImagesPulled { - pulledImageSet[puller.ImagesPulled[v]] = empty{} - } + fakeDocker.Lock() + defer fakeDocker.Unlock() - if !reflect.DeepEqual(pulledImageSet, map[string]empty{ - "pod_infra_image": {}, - "pull_always_image": {}, - "pull_if_not_present_image": {}, - }) { - t.Errorf("Unexpected pulled containers: %v", puller.ImagesPulled) - } + pulledImageSorted := puller.ImagesPulled[:] + sort.Strings(pulledImageSorted) + assert.Equal(t, []string{"pod_infra_image", "pull_always_image", "pull_if_not_present_image"}, pulledImageSorted) - if len(fakeDocker.Created) != 6 { + if len(fakeDocker.Created) != 5 { t.Errorf("Unexpected containers created %v", fakeDocker.Created) } - fakeDocker.Unlock() } func TestSyncPodWithRestartPolicy(t *testing.T) { @@ -1474,7 +1465,7 @@ func TestGetPodPullImageFailureReason(t *testing.T) { puller := dm.dockerPuller.(*FakeDockerPuller) puller.HasImages = []string{} // Inject the pull image failure error. - failureReason := "PullImageError" + failureReason := kubecontainer.ErrImagePull.Error() puller.ErrorsToInject = []error{fmt.Errorf("%s", failureReason)} pod := &api.Pod{ diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index da8a450732..8fd7052cce 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -309,7 +309,7 @@ func NewMainKubelet( } procFs := procfs.NewProcFs() - + imageBackOff := util.NewBackOff(resyncInterval, maxContainerBackOff) // Initialize the runtime. 
switch containerRuntime { case "docker": @@ -331,7 +331,9 @@ func NewMainKubelet( dockerExecHandler, oomAdjuster, procFs, - klet.cpuCFSQuota) + klet.cpuCFSQuota, + imageBackOff) + case "rkt": conf := &rkt.Config{ Path: rktPath, @@ -344,7 +346,8 @@ func NewMainKubelet( recorder, containerRefManager, klet, // prober - klet.volumeManager) + klet.volumeManager, + imageBackOff) if err != nil { return nil, err } diff --git a/pkg/kubelet/network/cni/cni_test.go b/pkg/kubelet/network/cni/cni_test.go index e973309e4b..bfc743f1fb 100644 --- a/pkg/kubelet/network/cni/cni_test.go +++ b/pkg/kubelet/network/cni/cni_test.go @@ -157,6 +157,7 @@ func newTestDockerManager() (*dockertools.DockerManager, *dockertools.FakeDocker kubecontainer.FakeOS{}, networkPlugin, nil, + nil, nil) return dockerManager, fakeDocker diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 8b2a3e5e09..8ae565ec41 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -45,7 +45,7 @@ func newPod(uid, name string) *api.Pod { func createFakeRuntimeCache(fakeRecorder *record.FakeRecorder) kubecontainer.RuntimeCache { fakeDocker := &dockertools.FakeDockerClient{} np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) - dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil) + dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, nil) return kubecontainer.NewFakeRuntimeCache(dockerManager) } @@ -193,7 +193,7 @@ func TestFakePodWorkers(t *testing.T) { fakeDocker := &dockertools.FakeDockerClient{} fakeRecorder := &record.FakeRecorder{} np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) - dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil) + dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, nil) fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager) kubeletForRealWorkers := &simpleFakeKubelet{} diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 80614684e7..92ffe1571d 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -113,7 +113,7 @@ func New(config *Config, recorder record.EventRecorder, containerRefManager *kubecontainer.RefManager, prober prober.Prober, - volumeGetter volumeGetter) (*Runtime, error) { + volumeGetter volumeGetter, imageBackOff *util.Backoff) (*Runtime, error) { systemdVersion, err := getSystemdVersion() if err != nil { @@ -153,7 +153,7 @@ func New(config *Config, prober: prober, volumeGetter: volumeGetter, } - rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt) + rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt, imageBackOff) // Test the rkt version. 
version, err := rkt.Version() @@ -418,7 +418,7 @@ func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc manifest := appcschema.BlankPodManifest() for _, c := range pod.Spec.Containers { - if err := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil { + if err, _ := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil { return nil, err } imgManifest, err := r.getImageManifest(c.Image) diff --git a/pkg/util/backoff.go b/pkg/util/backoff.go index d7936071af..173e131872 100644 --- a/pkg/util/backoff.go +++ b/pkg/util/backoff.go @@ -84,6 +84,20 @@ func (p *Backoff) IsInBackOffSince(id string, eventTime time.Time) bool { return p.Clock.Now().Sub(eventTime) < entry.backoff } +// Returns True if time since lastupdate is less than the current backoff window. +func (p *Backoff) IsInBackOffSinceUpdate(id string, eventTime time.Time) bool { + p.Lock() + defer p.Unlock() + entry, ok := p.perItemBackoff[id] + if !ok { + return false + } + if hasExpired(eventTime, entry.lastUpdate, p.maxDuration) { + return false + } + return eventTime.Sub(entry.lastUpdate) < entry.backoff +} + // Garbage collect records that have aged past maxDuration. Backoff users are expected // to invoke this periodically. func (p *Backoff) GC() { diff --git a/pkg/util/backoff_test.go b/pkg/util/backoff_test.go index 0a49f9eee7..f2bc977157 100644 --- a/pkg/util/backoff_test.go +++ b/pkg/util/backoff_test.go @@ -123,3 +123,72 @@ func TestBackoffGC(t *testing.T) { t.Errorf("expected GC of entry after %s got entry %v", tc.Now().Sub(lastUpdate), r) } } + +func TestIsInBackOffSinceUpdate(t *testing.T) { + id := "_idIsInBackOffSinceUpdate" + tc := &FakeClock{Time: time.Now()} + step := time.Second + maxDuration := 10 * step + b := NewFakeBackOff(step, maxDuration, tc) + startTime := tc.Now() + + cases := []struct { + tick time.Duration + inBackOff bool + value int + }{ + {tick: 0, inBackOff: false, value: 0}, + {tick: 1, inBackOff: false, value: 1}, + {tick: 2, inBackOff: true, value: 2}, + {tick: 3, inBackOff: false, value: 2}, + {tick: 4, inBackOff: true, value: 4}, + {tick: 5, inBackOff: true, value: 4}, + {tick: 6, inBackOff: true, value: 4}, + {tick: 7, inBackOff: false, value: 4}, + {tick: 8, inBackOff: true, value: 8}, + {tick: 9, inBackOff: true, value: 8}, + {tick: 10, inBackOff: true, value: 8}, + {tick: 11, inBackOff: true, value: 8}, + {tick: 12, inBackOff: true, value: 8}, + {tick: 13, inBackOff: true, value: 8}, + {tick: 14, inBackOff: true, value: 8}, + {tick: 15, inBackOff: false, value: 8}, + {tick: 16, inBackOff: true, value: 10}, + {tick: 17, inBackOff: true, value: 10}, + {tick: 18, inBackOff: true, value: 10}, + {tick: 19, inBackOff: true, value: 10}, + {tick: 20, inBackOff: true, value: 10}, + {tick: 21, inBackOff: true, value: 10}, + {tick: 22, inBackOff: true, value: 10}, + {tick: 23, inBackOff: true, value: 10}, + {tick: 24, inBackOff: true, value: 10}, + {tick: 25, inBackOff: false, value: 10}, + {tick: 26, inBackOff: true, value: 10}, + {tick: 27, inBackOff: true, value: 10}, + {tick: 28, inBackOff: true, value: 10}, + {tick: 29, inBackOff: true, value: 10}, + {tick: 30, inBackOff: true, value: 10}, + {tick: 31, inBackOff: true, value: 10}, + {tick: 32, inBackOff: true, value: 10}, + {tick: 33, inBackOff: true, value: 10}, + {tick: 34, inBackOff: true, value: 10}, + {tick: 35, inBackOff: false, value: 10}, + {tick: 56, inBackOff: false, value: 0}, + {tick: 57, inBackOff: false, value: 1}, + } + + for _, c := range cases { + tc.Time = startTime.Add(c.tick * step) + if 
c.inBackOff != b.IsInBackOffSinceUpdate(id, tc.Now()) { + t.Errorf("expected IsInBackOffSinceUpdate %v got %v at tick %s", c.inBackOff, b.IsInBackOffSinceUpdate(id, tc.Now()), c.tick*step) + } + + if c.inBackOff && (time.Duration(c.value)*step != b.Get(id)) { + t.Errorf("expected backoff value=%s got %s at tick %s", time.Duration(c.value)*step, b.Get(id), c.tick*step) + } + + if !c.inBackOff { + b.Next(id, tc.Now()) + } + } +}
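The core of this patch is the per-(pod, image) back-off gate that PullImage applies before calling Runtime.PullImage. The following standalone sketch is not part of the patch; it only illustrates how the util.Backoff calls exercised above (NewBackOff, IsInBackOffSinceUpdate, Next, GC and the exported Clock) fit together. The pod name, image name and the 10s/5m durations are made-up example values; the kubelet itself passes resyncInterval and maxContainerBackOff.

package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util"
)

func main() {
	// One shared Backoff, as constructed in NewMainKubelet: initial step and cap.
	// The concrete durations here are illustrative only.
	backOff := util.NewBackOff(10*time.Second, 5*time.Minute)

	// PullImage keys the back-off on "<pod name>_<image>"; hypothetical values.
	backOffKey := fmt.Sprintf("%s_%s", "test_pod", "missing_image")

	for attempt := 0; attempt < 3; attempt++ {
		if backOff.IsInBackOffSinceUpdate(backOffKey, backOff.Clock.Now()) {
			// Inside the window: the puller returns ErrImagePullBackOff
			// without calling Runtime.PullImage at all.
			fmt.Println("still in back-off, skipping pull")
			continue
		}
		// Simulate a failed pull: Next grows this key's window (doubling up to
		// the cap, as the tick table in backoff_test.go shows), so repeated
		// failures are retried progressively less often.
		backOff.Next(backOffKey, backOff.Clock.Now())
		fmt.Println("pull attempted and failed, back-off window grown")
	}

	// After a successful pull the puller calls GC, which only drops entries
	// that have aged past maxDuration; other images' back-off state survives.
	backOff.GC()
}

Note that IsInBackOffSinceUpdate measures the elapsed time against the entry's own lastUpdate rather than a caller-supplied event time (eventTime.Sub(entry.lastUpdate) < entry.backoff), which is what lets the puller gate on the time of the last failed pull instead of on pod sync times.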