back-off image pull on failure

Signed-off-by: Sam Abed <samabed@gmail.com>
pull/6/head
Sam Abed 2015-10-02 23:45:46 +10:00
parent 887aeaa101
commit fdf712cd84
14 changed files with 343 additions and 104 deletions

View File

@ -41,6 +41,7 @@ type FakeRuntime struct {
KilledContainers []string
VersionInfo string
Err error
InspectErr error
}
// FakeRuntime should implement Runtime.
@ -94,6 +95,7 @@ func (f *FakeRuntime) ClearCalls() {
f.KilledContainers = []string{}
f.VersionInfo = ""
f.Err = nil
f.InspectErr = nil
}
func (f *FakeRuntime) assertList(expect []string, test []string) error {
@ -264,10 +266,10 @@ func (f *FakeRuntime) IsImagePresent(image ImageSpec) (bool, error) {
f.CalledFunctions = append(f.CalledFunctions, "IsImagePresent")
for _, i := range f.ImageList {
if i.ID == image.Image {
return true, f.Err
return true, nil
}
}
return false, f.Err
return false, f.InspectErr
}
func (f *FakeRuntime) ListImages() ([]Image, error) {

View File

@ -22,6 +22,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/util"
)
// imagePuller pulls the image using Runtime.PullImage().
@ -30,14 +31,16 @@ import (
type imagePuller struct {
recorder record.EventRecorder
runtime Runtime
backOff *util.Backoff
}
// NewImagePuller takes an event recorder and container runtime to create an
// image puller that wraps the container runtime's PullImage interface.
func NewImagePuller(recorder record.EventRecorder, runtime Runtime) ImagePuller {
func NewImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller {
return &imagePuller{
recorder: recorder,
runtime: runtime,
backOff: imageBackOff,
}
}
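
For orientation, a minimal sketch of wiring the puller up with a shared back-off object (the recorder and runtime variables are assumed; the durations mirror the docker manager test further below, not anything mandated here):

// Illustrative wiring, not part of this diff: one-second initial delay,
// five-minute cap, matching the values the docker manager test uses below.
imageBackOff := util.NewBackOff(time.Second, 300*time.Second)
puller := NewImagePuller(recorder, runtime, imageBackOff)
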
@ -56,24 +59,18 @@ func shouldPullImage(container *api.Container, imagePresent bool) bool {
return false
}
// reportImagePull reports 'image pulling', 'image pulled' or 'image pulling failed' events.
func (puller *imagePuller) reportImagePull(ref *api.ObjectReference, event string, image string, pullError error) {
if ref == nil {
return
}
switch event {
case "pulling":
puller.recorder.Eventf(ref, "Pulling", "Pulling image %q", image)
case "pulled":
puller.recorder.Eventf(ref, "Pulled", "Successfully pulled image %q", image)
case "failed":
puller.recorder.Eventf(ref, "Failed", "Failed to pull image %q: %v", image, pullError)
// logIt records an event via ref using msg when ref is non-nil; otherwise it logs prefix and msg to glog via logFn.
func (puller *imagePuller) logIt(ref *api.ObjectReference, event, prefix, msg string, logFn func(args ...interface{})) {
if ref != nil {
puller.recorder.Eventf(ref, event, msg)
} else {
logFn(fmt.Sprint(prefix, " ", msg))
}
}
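
logIt degrades gracefully when no ObjectReference could be generated: with a ref it records an event, without one it falls back to glog. A hypothetical call, purely for illustration (the pod and image names are made up):

// No ref available, so this logs `test_pod/busybox Pulling image "busybox"`
// at info level instead of recording an event.
puller.logIt(nil, "Pulling", "test_pod/busybox",
	fmt.Sprintf("Pulling image %q", "busybox"), glog.Info)
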
// PullImage pulls the image for the specified pod and container.
func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) error {
func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image)
ref, err := GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
@ -81,24 +78,36 @@ func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pul
spec := ImageSpec{container.Image}
present, err := puller.runtime.IsImagePresent(spec)
if err != nil {
if ref != nil {
puller.recorder.Eventf(ref, "Failed", "Failed to inspect image %q: %v", container.Image, err)
}
return fmt.Errorf("failed to inspect image %q: %v", container.Image, err)
msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err)
puller.logIt(ref, "Failed", logPrefix, msg, glog.Warning)
return ErrImageInspect, msg
}
if !shouldPullImage(container, present) {
if present && ref != nil {
puller.recorder.Eventf(ref, "Pulled", "Container image %q already present on machine", container.Image)
if present {
msg := fmt.Sprintf("Container image %q already present on machine", container.Image)
puller.logIt(ref, "Pulled", logPrefix, msg, glog.Info)
return nil, ""
} else {
msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image)
puller.logIt(ref, "ErrImageNeverPull", logPrefix, msg, glog.Warning)
return ErrImageNeverPull, msg
}
return nil
}
puller.reportImagePull(ref, "pulling", container.Image, nil)
if err = puller.runtime.PullImage(spec, pullSecrets); err != nil {
puller.reportImagePull(ref, "failed", container.Image, err)
return err
backOffKey := fmt.Sprintf("%s_%s", pod.Name, container.Image)
if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) {
msg := fmt.Sprintf("Back-off pulling image %q", container.Image)
puller.logIt(ref, "Back-off", logPrefix, msg, glog.Info)
return ErrImagePullBackOff, msg
}
puller.reportImagePull(ref, "pulled", container.Image, nil)
return nil
puller.logIt(ref, "Pulling", logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info)
if err = puller.runtime.PullImage(spec, pullSecrets); err != nil {
puller.logIt(ref, "Failed", logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning)
puller.backOff.Next(backOffKey, puller.backOff.Clock.Now())
return ErrImagePull, err.Error()
}
puller.logIt(ref, "Pulled", logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info)
puller.backOff.GC()
return nil, ""
}
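
Taken together, the back-off lifecycle for a given pod/image key runs roughly as follows (a sketch assuming the 1s-initial/1m-max back-off used in the tests; the timestamps are illustrative):

// t=0s    PullImage fails     -> ErrImagePull; Next() arms a 1s window
// t=0.5s  PullImage called    -> ErrImagePullBackOff; runtime.PullImage is skipped
// t=2s    PullImage fails     -> ErrImagePull; Next() doubles the window to 2s
// ...
// later   PullImage succeeds  -> (nil, ""); GC() prunes entries older than maxDuration
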

View File

@ -0,0 +1,119 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/util"
)
func TestPuller(t *testing.T) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "test_pod",
Namespace: "test-ns",
UID: "bar",
ResourceVersion: "42",
SelfLink: "/api/v1/pods/foo",
}}
cases := []struct {
containerImage string
policy api.PullPolicy
calledFunctions []string
inspectErr error
pullerErr error
expectedErr []error // one expected error per successive PullImage attempt, one clock tick apart
}{
{ // pull missing image
containerImage: "missing_image",
policy: api.PullIfNotPresent,
calledFunctions: []string{"IsImagePresent", "PullImage"},
inspectErr: nil,
pullerErr: nil,
expectedErr: []error{nil}},
{ // image present, don't pull
containerImage: "present_image",
policy: api.PullIfNotPresent,
calledFunctions: []string{"IsImagePresent"},
inspectErr: nil,
pullerErr: nil,
expectedErr: []error{nil, nil, nil}},
// image present, pull it
{containerImage: "present_image",
policy: api.PullAlways,
calledFunctions: []string{"IsImagePresent", "PullImage"},
inspectErr: nil,
pullerErr: nil,
expectedErr: []error{nil, nil, nil}},
// missing image, error PullNever
{containerImage: "missing_image",
policy: api.PullNever,
calledFunctions: []string{"IsImagePresent"},
inspectErr: nil,
pullerErr: nil,
expectedErr: []error{ErrImageNeverPull, ErrImageNeverPull, ErrImageNeverPull}},
// missing image, unable to inspect
{containerImage: "missing_image",
policy: api.PullIfNotPresent,
calledFunctions: []string{"IsImagePresent"},
inspectErr: errors.New("unknown inspectError"),
pullerErr: nil,
expectedErr: []error{ErrImageInspect, ErrImageInspect, ErrImageInspect}},
// missing image, unable to fetch
{containerImage: "typo_image",
policy: api.PullIfNotPresent,
calledFunctions: []string{"IsImagePresent", "PullImage"},
inspectErr: nil,
pullerErr: errors.New("404"),
expectedErr: []error{ErrImagePull, ErrImagePull, ErrImagePullBackOff, ErrImagePull, ErrImagePullBackOff, ErrImagePullBackOff}},
}
for i, c := range cases {
container := &api.Container{
Name: "container_name",
Image: c.containerImage,
ImagePullPolicy: c.policy,
}
backOff := util.NewBackOff(time.Second, time.Minute)
fakeClock := &util.FakeClock{Time: time.Now()}
backOff.Clock = fakeClock
fakeRuntime := &FakeRuntime{}
fakeRecorder := &record.FakeRecorder{}
puller := NewImagePuller(fakeRecorder, fakeRuntime, backOff)
fakeRuntime.ImageList = []Image{{"present_image", nil, 0}}
fakeRuntime.Err = c.pullerErr
fakeRuntime.InspectErr = c.inspectErr
for tick, expected := range c.expectedErr {
fakeClock.Step(time.Second)
err, _ := puller.PullImage(pod, container, nil)
fakeRuntime.AssertCalls(c.calledFunctions)
assert.Equal(t, expected, err, "in test %d tick=%d", i, tick)
}
}
}

View File

@ -32,6 +32,22 @@ import (
// Container Terminated and Kubelet is backing off the restart
var ErrCrashLoopBackOff = errors.New("CrashLoopBackOff")
var (
// Container image pull failed; the kubelet is backing off the image pull
ErrImagePullBackOff = errors.New("ImagePullBackOff")
// Unable to inspect image
ErrImageInspect = errors.New("ImageInspectError")
// General image pull error
ErrImagePull = errors.New("ErrImagePull")
// Required image is absent on the host and PullPolicy is NeverPullImage
ErrImageNeverPull = errors.New("ErrImageNeverPull")
)
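
These sentinels do double duty: their Error() strings are what updateReasonCache stores and what GetPodStatus later surfaces as ContainerStateWaiting.Reason. A small in-package illustration:

// The Reason string surfaced in the container status while pulls back off
// is simply the sentinel's message:
reason := ErrImagePullBackOff.Error() // "ImagePullBackOff"
_ = reason
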
var ErrRunContainer = errors.New("RunContainerError")
type Version interface {
// Compare compares two versions of the runtime. On success it returns -1
// if the version is less than the other, 1 if it is greater than the other,
@ -109,7 +125,7 @@ type ContainerCommandRunner interface {
// It will check the presence of the image, and report the 'image pulling'
// and 'image pulled' events accordingly.
type ImagePuller interface {
PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) error
PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string)
}
// Pod is a group of containers, with the status of the pod.

View File

@ -669,7 +669,8 @@ func TestFindContainersByPod(t *testing.T) {
}
fakeClient := &FakeDockerClient{}
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
containerManager := NewFakeDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, &cadvisorApi.MachineInfo{}, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil)
// image back-off is set to nil; this test shouldn't pull images
containerManager := NewFakeDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, &cadvisorApi.MachineInfo{}, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, nil)
for i, test := range tests {
fakeClient.ContainerList = test.containerList
fakeClient.ExitedContainerList = test.exitedContainerList

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/procfs"
)
@ -40,13 +41,13 @@ func NewFakeDockerManager(
osInterface kubecontainer.OSInterface,
networkPlugin network.NetworkPlugin,
generator kubecontainer.RunContainerOptionsGenerator,
httpClient kubeletTypes.HttpGetter) *DockerManager {
httpClient kubeletTypes.HttpGetter, imageBackOff *util.Backoff) *DockerManager {
fakeOOMAdjuster := oom.NewFakeOOMAdjuster()
fakeProcFs := procfs.NewFakeProcFs()
dm := NewDockerManager(client, recorder, prober, containerRefManager, machineInfo, podInfraContainerImage, qps,
burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{},
fakeOOMAdjuster, fakeProcFs, false)
fakeOOMAdjuster, fakeProcFs, false, imageBackOff)
dm.dockerPuller = &FakeDockerPuller{}
return dm
}

View File

@ -158,7 +158,9 @@ func NewDockerManager(
execHandler ExecHandler,
oomAdjuster *oom.OOMAdjuster,
procFs procfs.ProcFsInterface,
cpuCFSQuota bool) *DockerManager {
cpuCFSQuota bool,
imageBackOff *util.Backoff) *DockerManager {
// Work out the location of the Docker runtime, defaulting to /var/lib/docker
// if there are any problems.
dockerRoot := "/var/lib/docker"
@ -211,7 +213,7 @@ func NewDockerManager(
cpuCFSQuota: cpuCFSQuota,
}
dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm)
dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm, imageBackOff)
return dm
}
@ -509,9 +511,12 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
}
}
containerStatus.LastTerminationState = containerStatus.State
containerStatus.State.Waiting = &api.ContainerStateWaiting{Reason: reasonInfo.reason,
Message: reasonInfo.message}
containerStatus.State.Running = nil
containerStatus.State = api.ContainerState{
Waiting: &api.ContainerStateWaiting{
Reason: reasonInfo.reason,
Message: reasonInfo.message,
},
}
}
continue
}
@ -524,20 +529,27 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
containerStatus.RestartCount = oldStatus.RestartCount
containerStatus.LastTerminationState = oldStatus.LastTerminationState
}
// Check whether the image is ready on the node.
image := container.Image
// TODO(dchen1107): docker/docker/issues/8365 to figure out if the image exists
_, err := dm.client.InspectImage(image)
if err == nil {
containerStatus.State.Waiting = &api.ContainerStateWaiting{
Message: fmt.Sprintf("Image: %s is ready, container is creating", image),
Reason: "ContainerCreating",
}
} else if err == docker.ErrNoSuchImage {
containerStatus.State.Waiting = &api.ContainerStateWaiting{
Message: fmt.Sprintf("Image: %s is not ready on the node", image),
Reason: "ImageNotReady",
reasonInfo, ok := dm.reasonCache.Get(uid, container.Name)
if !ok {
// Default position for a container: at this point there are no active or
// dead containers and the reasonCache is empty (no entry, or the entry has
// expired), so it's reasonable to report the container as being created
// until a more accurate reason is logged.
containerStatus.State = api.ContainerState{
Waiting: &api.ContainerStateWaiting{
Reason: "ContainerCreating",
Message: fmt.Sprintf("Image: %s is ready, container is creating", container.Image),
},
}
} else if reasonInfo.reason == kubecontainer.ErrImagePullBackOff.Error() ||
reasonInfo.reason == kubecontainer.ErrImageInspect.Error() ||
reasonInfo.reason == kubecontainer.ErrImagePull.Error() ||
reasonInfo.reason == kubecontainer.ErrImageNeverPull.Error() {
// mark it as waiting; the reason is filled in below
containerStatus.State = api.ContainerState{Waiting: &api.ContainerStateWaiting{}}
} else if reasonInfo.reason == kubecontainer.ErrRunContainer.Error() {
// mark it as waiting; the reason is filled in below
containerStatus.State = api.ContainerState{Waiting: &api.ContainerStateWaiting{}}
}
statuses[container.Name] = &containerStatus
}
@ -545,6 +557,7 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
podStatus.ContainerStatuses = make([]api.ContainerStatus, 0)
for containerName, status := range statuses {
if status.State.Waiting != nil {
status.State.Running = nil
// For containers in the waiting state, fill in a specific reason if it is recorded.
if reasonInfo, ok := dm.reasonCache.Get(uid, containerName); ok {
status.State.Waiting.Reason = reasonInfo.reason
@ -1603,7 +1616,8 @@ func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubeletTypes.Doc
}
// No pod secrets for the infra container.
if err := dm.imagePuller.PullImage(pod, container, nil); err != nil {
// The message isn't needed for the infra container
if err, _ := dm.imagePuller.PullImage(pod, container, nil); err != nil {
return "", err
}
@ -1845,10 +1859,9 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
continue
}
glog.V(4).Infof("Creating container %+v in pod %v", container, podFullName)
err := dm.imagePuller.PullImage(pod, container, pullSecrets)
dm.updateReasonCache(pod, container, "PullImageError", err)
err, msg := dm.imagePuller.PullImage(pod, container, pullSecrets)
if err != nil {
glog.Warningf("Failed to pull image %q from pod %q and container %q: %v", container.Image, kubecontainer.GetPodFullName(pod), container.Name, err)
dm.updateReasonCache(pod, container, err.Error(), errors.New(msg))
continue
}
@ -1868,7 +1881,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
// See createPodInfraContainer for infra container setup.
namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID)
_, err = dm.runContainerInPod(pod, container, namespaceMode, namespaceMode, getPidMode(pod))
dm.updateReasonCache(pod, container, "RunContainerError", err)
dm.updateReasonCache(pod, container, kubecontainer.ErrRunContainer.Error(), err)
if err != nil {
// TODO(bburns) : Perhaps blacklist a container after N failures?
glog.Errorf("Error running pod %q container %q: %v", kubecontainer.GetPodFullName(pod), container.Name, err)

View File

@ -31,8 +31,10 @@ import (
docker "github.com/fsouza/go-dockerclient"
cadvisorApi "github.com/google/cadvisor/info/v1"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
@ -89,7 +91,8 @@ func newTestDockerManagerWithHTTPClient(fakeHTTPClient *fakeHTTP) (*DockerManage
kubecontainer.FakeOS{},
networkPlugin,
optionGenerator,
fakeHTTPClient)
fakeHTTPClient,
util.NewBackOff(time.Second, 300*time.Second))
return dockerManager, fakeDocker
}
@ -976,60 +979,48 @@ func TestSyncPodWithPullPolicy(t *testing.T) {
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "bar", Image: "pull_always_image", ImagePullPolicy: api.PullAlways},
{Name: "bar1", Image: "pull_never_image", ImagePullPolicy: api.PullNever},
{Name: "bar2", Image: "pull_if_not_present_image", ImagePullPolicy: api.PullIfNotPresent},
{Name: "bar3", Image: "existing_one", ImagePullPolicy: api.PullIfNotPresent},
{Name: "bar4", Image: "want:latest", ImagePullPolicy: api.PullIfNotPresent},
{Name: "bar5", Image: "pull_never_image", ImagePullPolicy: api.PullNever},
},
},
}
runSyncPod(t, dm, fakeDocker, pod, nil)
fakeDocker.Lock()
eventSet := []string{
`Pulling Pulling image "pod_infra_image"`,
`Pulled Successfully pulled image "pod_infra_image"`,
`Pulling Pulling image "pull_always_image"`,
`Pulled Successfully pulled image "pull_always_image"`,
`Pulling Pulling image "pull_if_not_present_image"`,
`Pulled Successfully pulled image "pull_if_not_present_image"`,
`Pulled Container image "existing_one" already present on machine`,
`Pulled Container image "want:latest" already present on machine`,
expectedStatusMap := map[string]api.ContainerState{
"bar": {Running: &api.ContainerStateRunning{unversioned.Now()}},
"bar2": {Running: &api.ContainerStateRunning{unversioned.Now()}},
"bar3": {Running: &api.ContainerStateRunning{unversioned.Now()}},
"bar4": {Running: &api.ContainerStateRunning{unversioned.Now()}},
"bar5": {Waiting: &api.ContainerStateWaiting{Reason: kubecontainer.ErrImageNeverPull.Error(),
Message: "Container image \"pull_never_image\" is not present with pull policy of Never"}},
}
recorder := dm.recorder.(*record.FakeRecorder)
var actualEvents []string
for _, ev := range recorder.Events {
if strings.HasPrefix(ev, "Pull") {
actualEvents = append(actualEvents, ev)
runSyncPod(t, dm, fakeDocker, pod, nil)
statuses, err := dm.GetPodStatus(pod)
if err != nil {
t.Errorf("unable to get pod status")
}
for _, c := range pod.Spec.Containers {
if containerStatus, ok := api.GetContainerStatus(statuses.ContainerStatuses, c.Name); ok {
// copy the StartedAt time, to make the structs match
if containerStatus.State.Running != nil && expectedStatusMap[c.Name].Running != nil {
expectedStatusMap[c.Name].Running.StartedAt = containerStatus.State.Running.StartedAt
}
assert.Equal(t, containerStatus.State, expectedStatusMap[c.Name], "for container %s", c.Name)
}
}
sort.StringSlice(actualEvents).Sort()
sort.StringSlice(eventSet).Sort()
if !reflect.DeepEqual(actualEvents, eventSet) {
t.Errorf("Expected: %#v, Actual: %#v", eventSet, actualEvents)
}
pulledImageSet := make(map[string]empty)
for v := range puller.ImagesPulled {
pulledImageSet[puller.ImagesPulled[v]] = empty{}
}
fakeDocker.Lock()
defer fakeDocker.Unlock()
if !reflect.DeepEqual(pulledImageSet, map[string]empty{
"pod_infra_image": {},
"pull_always_image": {},
"pull_if_not_present_image": {},
}) {
t.Errorf("Unexpected pulled containers: %v", puller.ImagesPulled)
}
pulledImageSorted := puller.ImagesPulled[:]
sort.Strings(pulledImageSorted)
assert.Equal(t, []string{"pod_infra_image", "pull_always_image", "pull_if_not_present_image"}, pulledImageSorted)
if len(fakeDocker.Created) != 6 {
if len(fakeDocker.Created) != 5 {
t.Errorf("Unexpected containers created %v", fakeDocker.Created)
}
fakeDocker.Unlock()
}
func TestSyncPodWithRestartPolicy(t *testing.T) {
@ -1474,7 +1465,7 @@ func TestGetPodPullImageFailureReason(t *testing.T) {
puller := dm.dockerPuller.(*FakeDockerPuller)
puller.HasImages = []string{}
// Inject the pull image failure error.
failureReason := "PullImageError"
failureReason := kubecontainer.ErrImagePull.Error()
puller.ErrorsToInject = []error{fmt.Errorf("%s", failureReason)}
pod := &api.Pod{

View File

@ -309,7 +309,7 @@ func NewMainKubelet(
}
procFs := procfs.NewProcFs()
imageBackOff := util.NewBackOff(resyncInterval, maxContainerBackOff)
// Initialize the runtime.
switch containerRuntime {
case "docker":
@ -331,7 +331,9 @@ func NewMainKubelet(
dockerExecHandler,
oomAdjuster,
procFs,
klet.cpuCFSQuota)
klet.cpuCFSQuota,
imageBackOff)
case "rkt":
conf := &rkt.Config{
Path: rktPath,
@ -344,7 +346,8 @@ func NewMainKubelet(
recorder,
containerRefManager,
klet, // prober
klet.volumeManager)
klet.volumeManager,
imageBackOff)
if err != nil {
return nil, err
}

View File

@ -157,6 +157,7 @@ func newTestDockerManager() (*dockertools.DockerManager, *dockertools.FakeDocker
kubecontainer.FakeOS{},
networkPlugin,
nil,
nil,
nil)
return dockerManager, fakeDocker

View File

@ -45,7 +45,7 @@ func newPod(uid, name string) *api.Pod {
func createFakeRuntimeCache(fakeRecorder *record.FakeRecorder) kubecontainer.RuntimeCache {
fakeDocker := &dockertools.FakeDockerClient{}
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil)
dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, nil)
return kubecontainer.NewFakeRuntimeCache(dockerManager)
}
@ -193,7 +193,7 @@ func TestFakePodWorkers(t *testing.T) {
fakeDocker := &dockertools.FakeDockerClient{}
fakeRecorder := &record.FakeRecorder{}
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil)
dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, nil)
fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager)
kubeletForRealWorkers := &simpleFakeKubelet{}

View File

@ -113,7 +113,7 @@ func New(config *Config,
recorder record.EventRecorder,
containerRefManager *kubecontainer.RefManager,
prober prober.Prober,
volumeGetter volumeGetter) (*Runtime, error) {
volumeGetter volumeGetter, imageBackOff *util.Backoff) (*Runtime, error) {
systemdVersion, err := getSystemdVersion()
if err != nil {
@ -153,7 +153,7 @@ func New(config *Config,
prober: prober,
volumeGetter: volumeGetter,
}
rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt)
rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt, imageBackOff)
// Test the rkt version.
version, err := rkt.Version()
@ -418,7 +418,7 @@ func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc
manifest := appcschema.BlankPodManifest()
for _, c := range pod.Spec.Containers {
if err := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil {
if err, _ := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil {
return nil, err
}
imgManifest, err := r.getImageManifest(c.Image)

View File

@ -84,6 +84,20 @@ func (p *Backoff) IsInBackOffSince(id string, eventTime time.Time) bool {
return p.Clock.Now().Sub(eventTime) < entry.backoff
}
// Returns true if the time since the last update is less than the current backoff window.
func (p *Backoff) IsInBackOffSinceUpdate(id string, eventTime time.Time) bool {
p.Lock()
defer p.Unlock()
entry, ok := p.perItemBackoff[id]
if !ok {
return false
}
if hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
return false
}
return eventTime.Sub(entry.lastUpdate) < entry.backoff
}
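
Unlike IsInBackOffSince above, which measures from the caller-supplied eventTime to now, this variant measures from the entry's own lastUpdate, which is what the image puller needs since it passes Clock.Now() as the event time. A minimal in-package usage sketch (1s initial back-off assumed):

// Arm a 1s window, then query it immediately: still inside the window.
b := NewBackOff(time.Second, time.Minute)
b.Next("some-key", b.Clock.Now())                           // first failure: back-off becomes 1s
inWindow := b.IsInBackOffSinceUpdate("some-key", b.Clock.Now()) // true for the next second
_ = inWindow
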
// Garbage collect records that have aged past maxDuration. Backoff users are expected
// to invoke this periodically.
func (p *Backoff) GC() {

View File

@ -123,3 +123,72 @@ func TestBackoffGC(t *testing.T) {
t.Errorf("expected GC of entry after %s got entry %v", tc.Now().Sub(lastUpdate), r)
}
}
func TestIsInBackOffSinceUpdate(t *testing.T) {
id := "_idIsInBackOffSinceUpdate"
tc := &FakeClock{Time: time.Now()}
step := time.Second
maxDuration := 10 * step
b := NewFakeBackOff(step, maxDuration, tc)
startTime := tc.Now()
cases := []struct {
tick time.Duration // ticks of `step` since startTime
inBackOff bool // expected result of IsInBackOffSinceUpdate at this tick
value int // expected backoff window at this tick, in units of step
}{
{tick: 0, inBackOff: false, value: 0},
{tick: 1, inBackOff: false, value: 1},
{tick: 2, inBackOff: true, value: 2},
{tick: 3, inBackOff: false, value: 2},
{tick: 4, inBackOff: true, value: 4},
{tick: 5, inBackOff: true, value: 4},
{tick: 6, inBackOff: true, value: 4},
{tick: 7, inBackOff: false, value: 4},
{tick: 8, inBackOff: true, value: 8},
{tick: 9, inBackOff: true, value: 8},
{tick: 10, inBackOff: true, value: 8},
{tick: 11, inBackOff: true, value: 8},
{tick: 12, inBackOff: true, value: 8},
{tick: 13, inBackOff: true, value: 8},
{tick: 14, inBackOff: true, value: 8},
{tick: 15, inBackOff: false, value: 8},
{tick: 16, inBackOff: true, value: 10},
{tick: 17, inBackOff: true, value: 10},
{tick: 18, inBackOff: true, value: 10},
{tick: 19, inBackOff: true, value: 10},
{tick: 20, inBackOff: true, value: 10},
{tick: 21, inBackOff: true, value: 10},
{tick: 22, inBackOff: true, value: 10},
{tick: 23, inBackOff: true, value: 10},
{tick: 24, inBackOff: true, value: 10},
{tick: 25, inBackOff: false, value: 10},
{tick: 26, inBackOff: true, value: 10},
{tick: 27, inBackOff: true, value: 10},
{tick: 28, inBackOff: true, value: 10},
{tick: 29, inBackOff: true, value: 10},
{tick: 30, inBackOff: true, value: 10},
{tick: 31, inBackOff: true, value: 10},
{tick: 32, inBackOff: true, value: 10},
{tick: 33, inBackOff: true, value: 10},
{tick: 34, inBackOff: true, value: 10},
{tick: 35, inBackOff: false, value: 10},
{tick: 56, inBackOff: false, value: 0},
{tick: 57, inBackOff: false, value: 1},
}
for _, c := range cases {
tc.Time = startTime.Add(c.tick * step)
if c.inBackOff != b.IsInBackOffSinceUpdate(id, tc.Now()) {
t.Errorf("expected IsInBackOffSinceUpdate %v got %v at tick %s", c.inBackOff, b.IsInBackOffSinceUpdate(id, tc.Now()), c.tick*step)
}
if c.inBackOff && (time.Duration(c.value)*step != b.Get(id)) {
t.Errorf("expected backoff value=%s got %s at tick %s", time.Duration(c.value)*step, b.Get(id), c.tick*step)
}
if !c.inBackOff {
b.Next(id, tc.Now())
}
}
}