From 3b9eba2c1b3201e116c881792e7c65ad9977cca6 Mon Sep 17 00:00:00 2001
From: Victor Marmol
Date: Thu, 30 Apr 2015 18:37:15 -0700
Subject: [PATCH] Add SyncPod() to DockerManager and use in Kubelet.

This allows us to abstract away the logic of syncing a pod by the
runtime. It will allow other runtimes to perform their own sync as well.
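In outline (a sketch assembled from the hunks below, not additional code in
this commit), the runtime now owns the whole sync step and the kubelet simply
hands the pod off to it:

    // New entry point on the Docker runtime (pkg/kubelet/dockertools/manager.go):
    func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) error

    // Kubelet.syncPod (pkg/kubelet/kubelet.go) now reduces to roughly:
    podStatus, err := kl.generatePodStatus(pod)
    if err != nil {
        return err
    }
    err = kl.containerManager.SyncPod(pod, runningPod, podStatus)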
dockerRoot := "/var/lib/docker" @@ -138,7 +151,7 @@ func NewDockerManager( } reasonCache := stringCache{cache: lru.New(maxReasonCacheEntries)} - return &DockerManager{ + dm := &DockerManager{ client: client, recorder: recorder, readinessManager: readinessManager, @@ -151,7 +164,11 @@ func NewDockerManager( containerLogsDir: containerLogsDir, networkPlugin: networkPlugin, Prober: prober, + generator: generator, + runtimeHooks: runtimeHooks, } + dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm) + return dm } // A cache which stores strings keyed by _. @@ -737,6 +754,7 @@ func (dm *DockerManager) ListImages() ([]kubecontainer.Image, error) { return images, nil } +// TODO(vmarmol): Consider unexporting. // PullImage pulls an image from network to local storage. func (dm *DockerManager) PullImage(image string) error { return dm.Puller.Pull(image) @@ -1061,6 +1079,7 @@ func (dm *DockerManager) KillContainerInPod(container api.Container, pod *api.Po return dm.killContainer(targetContainer.ID) } +// TODO(vmarmol): Unexport this as it is no longer used externally. // KillContainer kills a container identified by containerID. // Internally, it invokes docker's StopContainer API with a timeout of 10s. // TODO: Deprecate this function in favor of KillContainerInPod. @@ -1084,14 +1103,15 @@ func (dm *DockerManager) killContainer(containerID types.UID) error { return err } +// TODO(vmarmol): Unexport this as it is no longer used externally. // Run a single container from a pod. Returns the docker container ID -func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, generator kubecontainer.RunContainerOptionsGenerator, runner kubecontainer.HandlerRunner, netMode, ipcMode string) (kubeletTypes.DockerID, error) { +func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, netMode, ipcMode string) (kubeletTypes.DockerID, error) { ref, err := kubecontainer.GenerateContainerRef(pod, container) if err != nil { glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) } - opts, err := generator.GenerateRunContainerOptions(pod, container, netMode, ipcMode) + opts, err := dm.generator.GenerateRunContainerOptions(pod, container, netMode, ipcMode) if err != nil { return "", err } @@ -1107,7 +1127,7 @@ func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, ge } if container.Lifecycle != nil && container.Lifecycle.PostStart != nil { - handlerErr := runner.Run(id, pod, container, container.Lifecycle.PostStart) + handlerErr := dm.runner.Run(id, pod, container, container.Lifecycle.PostStart) if handlerErr != nil { dm.killContainer(types.UID(id)) return kubeletTypes.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr) @@ -1127,8 +1147,8 @@ func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, ge return kubeletTypes.DockerID(id), err } -// CreatePodInfraContainer starts the pod infra container for a pod. Returns the docker container ID of the newly created container. -func (dm *DockerManager) CreatePodInfraContainer(pod *api.Pod, generator kubecontainer.RunContainerOptionsGenerator, runner kubecontainer.HandlerRunner) (kubeletTypes.DockerID, error) { +// createPodInfraContainer starts the pod infra container for a pod. Returns the docker container ID of the newly created container. +func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubeletTypes.DockerID, error) { // Use host networking if specified. 
netNamespace := "" var ports []api.ContainerPort @@ -1172,7 +1192,7 @@ func (dm *DockerManager) CreatePodInfraContainer(pod *api.Pod, generator kubecon dm.recorder.Eventf(ref, "pulled", "Successfully pulled image %q", container.Image) } - id, err := dm.RunContainer(pod, container, generator, runner, netNamespace, "") + id, err := dm.RunContainer(pod, container, netNamespace, "") if err != nil { return "", err } @@ -1211,8 +1231,7 @@ type PodContainerChangesSpec struct { ContainersToKeep map[kubeletTypes.DockerID]int } -// TODO(vmarmol): This will soon be made non-public when its only use is internal. -func (dm *DockerManager) ComputePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) (PodContainerChangesSpec, error) { +func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) (PodContainerChangesSpec, error) { podFullName := kubecontainer.GetPodFullName(pod) uid := pod.UID glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid) @@ -1318,3 +1337,97 @@ func (dm *DockerManager) ComputePodContainerChanges(pod *api.Pod, runningPod kub ContainersToKeep: containersToKeep, }, nil } + +// Pull the image for the specified pod and container. +func (dm *DockerManager) pullImage(pod *api.Pod, container *api.Container) error { + present, err := dm.IsImagePresent(container.Image) + if err != nil { + ref, err := kubecontainer.GenerateContainerRef(pod, container) + if err != nil { + glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) + } + if ref != nil { + dm.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err) + } + return fmt.Errorf("failed to inspect image %q: %v", container.Image, err) + } + + if !dm.runtimeHooks.ShouldPullImage(pod, container, present) { + return nil + } + + err = dm.PullImage(container.Image) + dm.runtimeHooks.ReportImagePull(pod, container, err) + return err +} + +// Sync the running pod to match the specified desired pod. +func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) error { + podFullName := kubecontainer.GetPodFullName(pod) + containerChanges, err := dm.computePodContainerChanges(pod, runningPod, podStatus) + glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges) + if err != nil { + return err + } + + if containerChanges.StartInfraContainer || (len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0) { + if len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0 { + glog.V(4).Infof("Killing Infra Container for %q because all other containers are dead.", podFullName) + } else { + glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName) + } + + // Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container) + err = dm.KillPod(runningPod) + if err != nil { + return err + } + } else { + // Otherwise kill any containers in this pod which are not specified as ones to keep. 
+		for _, container := range runningPod.Containers {
+			_, keep := containerChanges.ContainersToKeep[kubeletTypes.DockerID(container.ID)]
+			if !keep {
+				glog.V(3).Infof("Killing unwanted container %+v", container)
+				err = dm.KillContainer(container.ID)
+				if err != nil {
+					glog.Errorf("Error killing container: %v", err)
+				}
+			}
+		}
+	}
+
+	// If we should create infra container then we do it first.
+	podInfraContainerID := containerChanges.InfraContainerId
+	if containerChanges.StartInfraContainer && (len(containerChanges.ContainersToStart) > 0) {
+		glog.V(4).Infof("Creating pod infra container for %q", podFullName)
+		podInfraContainerID, err = dm.createPodInfraContainer(pod)
+
+		// Call the networking plugin
+		if err == nil {
+			err = dm.networkPlugin.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID)
+		}
+		if err != nil {
+			glog.Errorf("Failed to create pod infra container: %v; Skipping pod %q", err, podFullName)
+			return err
+		}
+	}
+
+	// Start everything
+	for container := range containerChanges.ContainersToStart {
+		glog.V(4).Infof("Creating container %+v", pod.Spec.Containers[container])
+		containerSpec := &pod.Spec.Containers[container]
+		if err := dm.pullImage(pod, containerSpec); err != nil {
+			glog.Warningf("Failed to pull image %q from pod %q and container %q: %v", containerSpec.Image, kubecontainer.GetPodFullName(pod), containerSpec.Name, err)
+			continue
+		}
+		// TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container
+		namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID)
+		_, err := dm.RunContainer(pod, containerSpec, namespaceMode, namespaceMode)
+		if err != nil {
+			// TODO(bburns) : Perhaps blacklist a container after N failures?
+			glog.Errorf("Error running pod %q container %q: %v", kubecontainer.GetPodFullName(pod), containerSpec.Name, err)
+		}
+	}
+
+	return nil
+}
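The point of moving this logic: any other container runtime can now be dropped
in by implementing the same pod-level operations the kubelet consumes. A
hypothetical interface capturing what the kubelet uses after this patch
(illustrative only; no such interface is declared in this commit):

    type podSyncer interface {
        // SyncPod brings the running pod in line with the desired pod spec.
        SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) error
        // KillPod stops all running containers of the pod, including the infra container.
        KillPod(pod kubecontainer.Pod) error
    }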
diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go
index c048d06b5b..5979425fe8 100644
--- a/pkg/kubelet/dockertools/manager_test.go
+++ b/pkg/kubelet/dockertools/manager_test.go
@@ -47,6 +47,9 @@ func NewFakeDockerManager() (*DockerManager, *FakeDockerClient) {
 		0, 0, "", kubecontainer.FakeOS{},
 		networkPlugin,
+		nil,
+		nil,
+		nil,
 		nil)

 	return dockerManager, fakeDocker
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 6341db8248..6fedec68d0 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -41,7 +41,6 @@ import (
 	kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars"
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/lifecycle"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
@@ -233,7 +232,6 @@ func NewMainKubelet(
 		resourceContainer: resourceContainer,
 		os:                osInterface,
 		oomWatcher:        oomWatcher,
-		runtimeHooks:      newKubeletRuntimeHooks(recorder),
 		cgroupRoot:        cgroupRoot,
 	}

@@ -253,13 +251,15 @@ func NewMainKubelet(
 		containerLogsDir,
 		osInterface,
 		klet.networkPlugin,
-		nil)
+		nil,
+		klet,
+		klet.httpClient,
+		newKubeletRuntimeHooks(recorder))
 	klet.runner = containerManager
 	klet.containerManager = containerManager

 	klet.podManager = newBasicPodManager(klet.kubeClient)
 	klet.prober = prober.New(klet.runner, klet.readinessManager, klet.containerRefManager, klet.recorder)
-	klet.handlerRunner = lifecycle.NewHandlerRunner(klet.httpClient, klet.runner, klet.containerManager)

 	// TODO(vmarmol): Remove when the circular dependency is removed :(
 	containerManager.Prober = klet.prober
@@ -345,9 +345,6 @@ type Kubelet struct {
 	// Healthy check prober.
 	prober prober.Prober

-	// Container lifecycle handler runner.
-	handlerRunner kubecontainer.HandlerRunner
-
 	// Container readiness state manager.
 	readinessManager *kubecontainer.ReadinessManager
@@ -403,10 +400,6 @@ type Kubelet struct {
 	// Watcher of out of memory events.
 	oomWatcher OOMWatcher

-	// TODO(vmarmol): Remove this when we only have to inject the hooks into the runtimes.
-	// Hooks injected into the container runtime.
-	runtimeHooks kubecontainer.RuntimeHooks
-
 	// If non-empty, pass this to the container runtime as the root cgroup.
 	cgroupRoot string
 }
@@ -871,29 +864,6 @@ func parseResolvConf(reader io.Reader) (nameservers []string, searches []string,
 	return nameservers, searches, nil
 }

-// Pull the image for the specified pod and container.
-func (kl *Kubelet) pullImage(pod *api.Pod, container *api.Container) error {
-	present, err := kl.containerManager.IsImagePresent(container.Image)
-	if err != nil {
-		ref, err := kubecontainer.GenerateContainerRef(pod, container)
-		if err != nil {
-			glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
-		}
-		if ref != nil {
-			kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err)
-		}
-		return fmt.Errorf("failed to inspect image %q: %v", container.Image, err)
-	}
-
-	if !kl.runtimeHooks.ShouldPullImage(pod, container, present) {
-		return nil
-	}
-
-	err = kl.containerManager.PullImage(container.Image)
-	kl.runtimeHooks.ReportImagePull(pod, container, err)
-	return err
-}
-
 // Kill all running containers in a pod (includes the pod infra container).
 func (kl *Kubelet) killPod(pod kubecontainer.Pod) error {
 	return kl.containerManager.KillPod(pod)
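The pullImage helper removed above reappears verbatim on DockerManager earlier
in this patch; in both homes it consults the injected kubecontainer.RuntimeHooks
before and after pulling. The signatures implied by the two call sites are
roughly the following (a sketch inferred from usage, not quoted from the
interface definition):

    type RuntimeHooks interface {
        // ShouldPullImage reports whether the runtime should pull the image,
        // given whether the image is already present locally.
        ShouldPullImage(pod *api.Pod, container *api.Container, imagePresent bool) bool
        // ReportImagePull records the outcome of a pull (err is nil on success).
        ReportImagePull(pod *api.Pod, container *api.Container, err error)
    }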
@@ -951,44 +921,6 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
 		return err
 	}

-	podStatus, err := kl.generatePodStatus(pod)
-	if err != nil {
-		glog.Errorf("Unable to get status for pod %q (uid %q): %v", podFullName, uid, err)
-		return err
-	}
-
-	containerChanges, err := kl.containerManager.ComputePodContainerChanges(pod, runningPod, podStatus)
-	glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
-	if err != nil {
-		return err
-	}
-
-	if containerChanges.StartInfraContainer || (len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0) {
-		if len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0 {
-			glog.V(4).Infof("Killing Infra Container for %q because all other containers are dead.", podFullName)
-		} else {
-			glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName)
-		}
-
-		// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
-		err = kl.killPod(runningPod)
-		if err != nil {
-			return err
-		}
-	} else {
-		// Otherwise kill any containers in this pod which are not specified as ones to keep.
-		for _, container := range runningPod.Containers {
-			_, keep := containerChanges.ContainersToKeep[kubeletTypes.DockerID(container.ID)]
-			if !keep {
-				glog.V(3).Infof("Killing unwanted container %+v", container)
-				err = kl.containerManager.KillContainer(container.ID)
-				if err != nil {
-					glog.Errorf("Error killing container: %v", err)
-				}
-			}
-		}
-	}
-
 	// Starting phase:
 	ref, err := api.GetReference(pod)
 	if err != nil {
@@ -1006,37 +938,15 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
 	}
 	kl.volumeManager.SetVolumes(pod.UID, podVolumes)

-	// If we should create infra container then we do it first.
-	podInfraContainerID := containerChanges.InfraContainerId
-	if containerChanges.StartInfraContainer && (len(containerChanges.ContainersToStart) > 0) {
-		glog.V(4).Infof("Creating pod infra container for %q", podFullName)
-		podInfraContainerID, err = kl.containerManager.CreatePodInfraContainer(pod, kl, kl.handlerRunner)
-
-		// Call the networking plugin
-		if err == nil {
-			err = kl.networkPlugin.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID)
-		}
-		if err != nil {
-			glog.Errorf("Failed to create pod infra container: %v; Skipping pod %q", err, podFullName)
-			return err
-		}
+	podStatus, err := kl.generatePodStatus(pod)
+	if err != nil {
+		glog.Errorf("Unable to get status for pod %q (uid %q): %v", podFullName, uid, err)
+		return err
 	}

-	// Start everything
-	for container := range containerChanges.ContainersToStart {
-		glog.V(4).Infof("Creating container %+v", pod.Spec.Containers[container])
-		containerSpec := &pod.Spec.Containers[container]
-		if err := kl.pullImage(pod, containerSpec); err != nil {
-			glog.Warningf("Failed to pull image %q from pod %q and container %q: %v", containerSpec.Image, kubecontainer.GetPodFullName(pod), containerSpec.Name, err)
-			continue
-		}
-		// TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container
-		namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID)
-		_, err := kl.containerManager.RunContainer(pod, containerSpec, kl, kl.handlerRunner, namespaceMode, namespaceMode)
-		if err != nil {
-			// TODO(bburns) : Perhaps blacklist a container after N failures?
-			glog.Errorf("Error running pod %q container %q: %v", kubecontainer.GetPodFullName(pod), containerSpec.Name, err)
-		}
+	err = kl.containerManager.SyncPod(pod, runningPod, podStatus)
+	if err != nil {
+		return err
 	}

 	if isStaticPod(pod) {
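Net effect on Kubelet.syncPod: the kill, infra-container, and start phases
removed above now live in the runtime, and pod status is generated after volume
setup, immediately before the hand-off, so SyncPod sees a snapshot taken at
sync time. In outline (a sketch of the resulting body, details elided):

    // 1. set up volumes           kl.volumeManager.SetVolumes(pod.UID, podVolumes)
    // 2. compute pod status       podStatus, err := kl.generatePodStatus(pod)
    // 3. delegate to the runtime  err = kl.containerManager.SyncPod(pod, runningPod, podStatus)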
- glog.Errorf("Error running pod %q container %q: %v", kubecontainer.GetPodFullName(pod), containerSpec.Name, err) - } + err = kl.containerManager.SyncPod(pod, runningPod, podStatus) + if err != nil { + return err } if isStaticPod(pod) { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 94fe518ccf..d4a55efe72 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -43,7 +43,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/lifecycle" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober" @@ -109,7 +108,8 @@ func newTestKubelet(t *testing.T) *TestKubelet { podManager, fakeMirrorClient := newFakePodManager() kubelet.podManager = podManager kubelet.containerRefManager = kubecontainer.NewRefManager() - kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin, nil) + runtimeHooks := newKubeletRuntimeHooks(kubelet.recorder) + kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin, nil, kubelet, &fakeHTTP{}, runtimeHooks) kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerManager) kubelet.podWorkers = newPodWorkers( kubelet.runtimeCache, @@ -122,9 +122,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{} kubelet.prober = prober.New(nil, kubelet.readinessManager, kubelet.containerRefManager, kubelet.recorder) kubelet.containerManager.Prober = kubelet.prober - kubelet.handlerRunner = lifecycle.NewHandlerRunner(&fakeHTTP{}, &fakeContainerCommandRunner{}, kubelet.containerManager) kubelet.volumeManager = newVolumeManager() - kubelet.runtimeHooks = newKubeletRuntimeHooks(kubelet.recorder) return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient} } @@ -679,6 +677,16 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { fakeDocker.Unlock() } +type fakeHTTP struct { + url string + err error +} + +func (f *fakeHTTP) Get(url string) (*http.Response, error) { + f.url = url + return nil, f.err +} + func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { testKubelet := newTestKubelet(t) testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) @@ -686,8 +694,12 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { fakeDocker := testKubelet.fakeDocker waitGroup := testKubelet.waitGroup fakeHttp := fakeHTTP{} + + // Simulate HTTP failure. Re-create the containerManager to inject the failure. 
kubelet.httpClient = &fakeHttp - kubelet.handlerRunner = lifecycle.NewHandlerRunner(kubelet.httpClient, &fakeContainerCommandRunner{}, kubelet.containerManager) + runtimeHooks := newKubeletRuntimeHooks(kubelet.recorder) + kubelet.containerManager = dockertools.NewDockerManager(kubelet.dockerClient, kubelet.recorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin, nil, kubelet, kubelet.httpClient, runtimeHooks) + pods := []*api.Pod{ { ObjectMeta: api.ObjectMeta{ @@ -740,7 +752,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { // Get pod status. "list", "inspect_container", "inspect_image", // Check the pod infra container. - "inspect_container", + "inspect_container", "inspect_image", // Create container. "create", "start", // Get pod status. @@ -753,7 +765,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { } fakeDocker.Unlock() if fakeHttp.url != "http://foo:8080/bar" { - t.Errorf("Unexpected handler: %s", fakeHttp.url) + t.Errorf("Unexpected handler: %q", fakeHttp.url) } } @@ -1560,7 +1572,6 @@ func (f *fakeContainerCommandRunner) PortForward(pod *kubecontainer.Pod, port ui f.Stream = stream return nil } - func TestRunInContainerNoSuchPod(t *testing.T) { fakeCommandRunner := fakeContainerCommandRunner{} testKubelet := newTestKubelet(t) @@ -1627,125 +1638,6 @@ func TestRunInContainer(t *testing.T) { } } -func TestRunHandlerExec(t *testing.T) { - fakeCommandRunner := fakeContainerCommandRunner{} - testKubelet := newTestKubelet(t) - kubelet := testKubelet.kubelet - fakeDocker := testKubelet.fakeDocker - kubelet.runner = &fakeCommandRunner - kubelet.handlerRunner = lifecycle.NewHandlerRunner(&fakeHTTP{}, kubelet.runner, kubelet.containerManager) - - containerID := "abc1234" - podName := "podFoo" - podNamespace := "nsFoo" - containerName := "containerFoo" - - fakeDocker.ContainerList = []docker.APIContainers{ - { - ID: containerID, - Names: []string{"/k8s_" + containerName + "_" + podName + "_" + podNamespace + "_12345678_42"}, - }, - } - - container := api.Container{ - Name: containerName, - Lifecycle: &api.Lifecycle{ - PostStart: &api.Handler{ - Exec: &api.ExecAction{ - Command: []string{"ls", "-a"}, - }, - }, - }, - } - - pod := api.Pod{} - pod.ObjectMeta.Name = podName - pod.ObjectMeta.Namespace = podNamespace - pod.Spec.Containers = []api.Container{container} - err := kubelet.handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if fakeCommandRunner.ID != containerID || - !reflect.DeepEqual(container.Lifecycle.PostStart.Exec.Command, fakeCommandRunner.Cmd) { - t.Errorf("unexpected commands: %v", fakeCommandRunner) - } -} - -type fakeHTTP struct { - url string - err error -} - -func (f *fakeHTTP) Get(url string) (*http.Response, error) { - f.url = url - return nil, f.err -} - -func TestRunHandlerHttp(t *testing.T) { - fakeHttp := fakeHTTP{} - - testKubelet := newTestKubelet(t) - kubelet := testKubelet.kubelet - kubelet.httpClient = &fakeHttp - kubelet.handlerRunner = lifecycle.NewHandlerRunner(kubelet.httpClient, &fakeContainerCommandRunner{}, kubelet.containerManager) - - containerID := "abc1234" - podName := "podFoo" - podNamespace := "nsFoo" - containerName := "containerFoo" - - container := api.Container{ - Name: containerName, - Lifecycle: &api.Lifecycle{ - PostStart: &api.Handler{ - HTTPGet: &api.HTTPGetAction{ - Host: "foo", - Port: 
-					Path: "bar",
-				},
-			},
-		},
-	}
-	pod := api.Pod{}
-	pod.ObjectMeta.Name = podName
-	pod.ObjectMeta.Namespace = podNamespace
-	pod.Spec.Containers = []api.Container{container}
-	err := kubelet.handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
-
-	if err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
-	if fakeHttp.url != "http://foo:8080/bar" {
-		t.Errorf("unexpected url: %s", fakeHttp.url)
-	}
-}
-
-func TestRunHandlerNil(t *testing.T) {
-	testKubelet := newTestKubelet(t)
-	kubelet := testKubelet.kubelet
-
-	containerID := "abc1234"
-	podName := "podFoo"
-	podNamespace := "nsFoo"
-	containerName := "containerFoo"
-
-	container := api.Container{
-		Name: containerName,
-		Lifecycle: &api.Lifecycle{
-			PostStart: &api.Handler{},
-		},
-	}
-	pod := api.Pod{}
-	pod.ObjectMeta.Name = podName
-	pod.ObjectMeta.Namespace = podNamespace
-	pod.Spec.Containers = []api.Container{container}
-	err := kubelet.handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
-	if err == nil {
-		t.Errorf("expect error, but got nil")
-	}
-}
-
 func TestSyncPodEventHandlerFails(t *testing.T) {
 	testKubelet := newTestKubelet(t)
 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
@@ -1753,10 +1645,12 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
 	waitGroup := testKubelet.waitGroup

+	// Simulate HTTP failure. Re-create the containerManager to inject the failure.
 	kubelet.httpClient = &fakeHTTP{
 		err: fmt.Errorf("test error"),
 	}
-	kubelet.handlerRunner = lifecycle.NewHandlerRunner(kubelet.httpClient, &fakeContainerCommandRunner{}, kubelet.containerManager)
+	runtimeHooks := newKubeletRuntimeHooks(kubelet.recorder)
+	kubelet.containerManager = dockertools.NewDockerManager(kubelet.dockerClient, kubelet.recorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin, nil, kubelet, kubelet.httpClient, runtimeHooks)

 	pods := []*api.Pod{
 		{
@@ -1810,7 +1704,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
 		// Get pod status.
 		"list", "inspect_container", "inspect_image",
 		// Check the pod infra container.
-		"inspect_container",
+		"inspect_container", "inspect_image",
 		// Create the container.
 		"create", "start",
 		// Kill the container since event handler fails.
@@ -1820,7 +1714,7 @@

 	// TODO(yifan): Check the stopped container's name.
 	if len(fakeDocker.Stopped) != 1 {
-		t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
+		t.Fatalf("Wrong containers were stopped: %v", fakeDocker.Stopped)
 	}
 	dockerName, _, err := dockertools.ParseDockerName(fakeDocker.Stopped[0])
 	if err != nil {
@@ -4065,6 +3959,7 @@ func TestGetPodStatusWithLastTermination(t *testing.T) {
 	}
 }

+// TODO(vmarmol): Move this test away from using RunContainer().
 func TestGetPodCreationFailureReason(t *testing.T) {
 	testKubelet := newTestKubelet(t)
 	kubelet := testKubelet.kubelet
@@ -4089,7 +3984,7 @@ func TestGetPodCreationFailureReason(t *testing.T) {
 	pods := []*api.Pod{pod}
 	kubelet.podManager.SetPods(pods)
 	kubelet.volumeManager.SetVolumes(pod.UID, volumeMap{})
-	_, err := kubelet.containerManager.RunContainer(pod, &pod.Spec.Containers[0], kubelet, kubelet.handlerRunner, "", "")
+	_, err := kubelet.containerManager.RunContainer(pod, &pod.Spec.Containers[0], "", "")
 	if err == nil {
 		t.Errorf("expected error, found nil")
 	}
diff --git a/pkg/kubelet/lifecycle/handlers_test.go b/pkg/kubelet/lifecycle/handlers_test.go
index 403079bd61..f5baf0dd5e 100644
--- a/pkg/kubelet/lifecycle/handlers_test.go
+++ b/pkg/kubelet/lifecycle/handlers_test.go
@@ -96,8 +96,6 @@ func TestRunHandlerExec(t *testing.T) {
 	handlerRunner := NewHandlerRunner(&fakeHTTP{}, &fakeCommandRunner, nil)

 	containerID := "abc1234"
-	podName := "podFoo"
-	podNamespace := "nsFoo"
 	containerName := "containerFoo"

 	container := api.Container{
@@ -112,8 +110,8 @@ func TestRunHandlerExec(t *testing.T) {
 	}

 	pod := api.Pod{}
-	pod.ObjectMeta.Name = podName
-	pod.ObjectMeta.Namespace = podNamespace
+	pod.ObjectMeta.Name = "podFoo"
+	pod.ObjectMeta.Namespace = "nsFoo"
 	pod.Spec.Containers = []api.Container{container}
 	err := handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
 	if err != nil {
@@ -140,8 +138,6 @@ func TestRunHandlerHttp(t *testing.T) {
 	handlerRunner := NewHandlerRunner(&fakeHttp, &fakeContainerCommandRunner{}, nil)

 	containerID := "abc1234"
-	podName := "podFoo"
-	podNamespace := "nsFoo"
 	containerName := "containerFoo"

 	container := api.Container{
@@ -157,8 +153,8 @@ func TestRunHandlerHttp(t *testing.T) {
 		},
 	}
 	pod := api.Pod{}
-	pod.ObjectMeta.Name = podName
-	pod.ObjectMeta.Namespace = podNamespace
+	pod.ObjectMeta.Name = "podFoo"
+	pod.ObjectMeta.Namespace = "nsFoo"
 	pod.Spec.Containers = []api.Container{container}

 	err := handlerRunner.Run(containerID, &pod, &container, container.Lifecycle.PostStart)
diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go
index a2e2831a5c..1f2907f818 100644
--- a/pkg/kubelet/pod_workers_test.go
+++ b/pkg/kubelet/pod_workers_test.go
@@ -43,7 +43,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
 	fakeDocker := &dockertools.FakeDockerClient{}
 	fakeRecorder := &record.FakeRecorder{}
 	np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
-	dockerManager := dockertools.NewDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{})
+	dockerManager := dockertools.NewDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{}, nil, nil, newKubeletRuntimeHooks(fakeRecorder))
 	fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager)

 	lock := sync.Mutex{}
diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go
index c0d1084a38..ca2ecb6d3e 100644
--- a/pkg/kubelet/runonce_test.go
+++ b/pkg/kubelet/runonce_test.go
@@ -90,7 +90,6 @@ func TestRunOnce(t *testing.T) {
 		os:            kubecontainer.FakeOS{},
 		volumeManager: newVolumeManager(),
 	}
-	kb.runtimeHooks = newKubeletRuntimeHooks(kb.recorder)
 	kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))

 	if err := kb.setupDataDirs(); err != nil {
@@ -161,7 +160,10 @@ func TestRunOnce(t *testing.T) {
 		"",
 		kubecontainer.FakeOS{},
 		kb.networkPlugin,
-		&kubeletProber.FakeProber{})
+		&kubeletProber.FakeProber{},
+		kb,
+		nil,
+		newKubeletRuntimeHooks(kb.recorder))
 	kb.containerManager.Puller = &dockertools.FakeDockerPuller{}

 	pods := []*api.Pod{