diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go index c0df853a74..88d0b4063a 100644 --- a/pkg/kubelet/dockertools/fake_docker_client.go +++ b/pkg/kubelet/dockertools/fake_docker_client.go @@ -222,9 +222,11 @@ func (f *FakeDockerClient) StopContainer(id string, timeout uint) error { f.Stopped = append(f.Stopped, id) var newList []docker.APIContainers for _, container := range f.ContainerList { - if container.ID != id { - newList = append(newList, container) + if container.ID == id { + f.ExitedContainerList = append(f.ExitedContainerList, container) + continue } + newList = append(newList, container) } f.ContainerList = newList } diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 77ca5a4e01..64058a8e4d 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -584,3 +584,37 @@ func (self *DockerManager) Pull(image string) error { func (self *DockerManager) IsImagePresent(image string) (bool, error) { return self.Puller.IsImagePresent(image) } + +// PodInfraContainer returns true if the pod infra container has changed. +func (self *DockerManager) PodInfraContainerChanged(pod *api.Pod, podInfraContainer *kubecontainer.Container) (bool, error) { + networkMode := "" + var ports []api.ContainerPort + + dockerPodInfraContainer, err := self.client.InspectContainer(string(podInfraContainer.ID)) + if err != nil { + return false, err + } + + // Check network mode. + if dockerPodInfraContainer.HostConfig != nil { + networkMode = dockerPodInfraContainer.HostConfig.NetworkMode + } + if pod.Spec.HostNetwork { + if networkMode != "host" { + glog.V(4).Infof("host: %v, %v", pod.Spec.HostNetwork, networkMode) + return true, nil + } + } else { + // Docker only exports ports from the pod infra container. Let's + // collect all of the relevant ports and export them. + for _, container := range pod.Spec.Containers { + ports = append(ports, container.Ports...) + } + } + expectedPodInfraContainer := &api.Container{ + Name: PodInfraContainerName, + Image: self.PodInfraContainerImage, + Ports: ports, + } + return podInfraContainer.Hash != HashContainer(expectedPodInfraContainer), nil +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e22b3d1d1a..34a479250e 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1080,15 +1080,26 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubeconta createPodInfraContainer := false var podInfraContainerID dockertools.DockerID + var changed bool podInfraContainer := runningPod.FindContainerByName(dockertools.PodInfraContainerName) if podInfraContainer != nil { - glog.V(4).Infof("Found infra pod for %q", podFullName) + glog.V(4).Infof("Found pod infra container for %q", podFullName) + changed, err = kl.containerManager.PodInfraContainerChanged(pod, podInfraContainer) + if err != nil { + return podContainerChangesSpec{}, err + } + } + + createPodInfraContainer = true + if podInfraContainer == nil { + glog.V(2).Infof("Need to restart pod infra container for %q because it is not found", podFullName) + } else if changed { + glog.V(2).Infof("Need to restart pod infra container for %q because it is changed", podFullName) + } else { + glog.V(4).Infof("Pod infra container looks good, keep it %q", podFullName) + createPodInfraContainer = false podInfraContainerID = dockertools.DockerID(podInfraContainer.ID) containersToKeep[podInfraContainerID] = -1 - - } else { - glog.V(2).Infof("No Infra Container for %q found. All containers will be restarted.", podFullName) - createPodInfraContainer = true } // Do not use the cache here since we need the newest status to check @@ -1129,22 +1140,11 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubeconta containersToKeep[containerID] = index continue } - glog.Infof("pod %q container %q is unhealthy (probe result: %v). Container will be killed and re-created.", podFullName, container.Name, result) - containersToStart[index] = empty{} + glog.Infof("pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result) } else { - glog.Infof("pod %q container %q hash changed (%d vs %d). Pod will be killed and re-created.", podFullName, container.Name, hash, expectedHash) - createPodInfraContainer = true - delete(containersToKeep, podInfraContainerID) - // If we are to restart Infra Container then we move containersToKeep into containersToStart - // if RestartPolicy allows restarting failed containers. - if pod.Spec.RestartPolicy != api.RestartPolicyNever { - for _, v := range containersToKeep { - containersToStart[v] = empty{} - } - } - containersToStart[index] = empty{} - containersToKeep = make(map[dockertools.DockerID]int) + glog.Infof("pod %q container %q hash changed (%d vs %d), it will be killed and re-created.", podFullName, container.Name, hash, expectedHash) } + containersToStart[index] = empty{} } else { // createPodInfraContainer == true and Container exists // If we're creating infra containere everything will be killed anyway // If RestartPolicy is Always or OnFailure we restart containers that were running before we @@ -1167,7 +1167,8 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubeconta } // After the loop one of the following should be true: - // - createPodInfraContainer is true and containersToKeep is empty + // - createPodInfraContainer is true and containersToKeep is empty. + // (In fact, when createPodInfraContainer is false, containersToKeep will not be touched). // - createPodInfraContainer is false and containersToKeep contains at least ID of Infra Container // If Infra container is the last running one, we don't want to keep it. @@ -1221,7 +1222,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont if containerChanges.startInfraContainer || (len(containerChanges.containersToKeep) == 0 && len(containerChanges.containersToStart) == 0) { if len(containerChanges.containersToKeep) == 0 && len(containerChanges.containersToStart) == 0 { - glog.V(4).Infof("Killing Infra Container for %q becase all other containers are dead.", podFullName) + glog.V(4).Infof("Killing Infra Container for %q because all other containers are dead.", podFullName) } else { glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName) } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index e279594bca..155afa9a9b 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -70,7 +70,6 @@ type TestKubelet struct { func newTestKubelet(t *testing.T) *TestKubelet { fakeDocker := &dockertools.FakeDockerClient{Errors: make(map[string]error), RemovedImages: util.StringSet{}} - fakeRecorder := &record.FakeRecorder{} fakeKubeClient := &testclient.Fake{} kubelet := &Kubelet{} @@ -436,6 +435,22 @@ func TestKillContainer(t *testing.T) { var emptyPodUIDs map[types.UID]metrics.SyncPodType +func generatePodInfraContainerHash(pod *api.Pod) uint64 { + var ports []api.ContainerPort + if !pod.Spec.HostNetwork { + for _, container := range pod.Spec.Containers { + ports = append(ports, container.Ports...) + } + } + + container := &api.Container{ + Name: dockertools.PodInfraContainerName, + Image: dockertools.PodInfraContainerImage, + Ports: ports, + } + return dockertools.HashContainer(container) +} + func TestSyncPodsDoesNothing(t *testing.T) { testKubelet := newTestKubelet(t) testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) @@ -444,18 +459,6 @@ func TestSyncPodsDoesNothing(t *testing.T) { waitGroup := testKubelet.waitGroup container := api.Container{Name: "bar"} - fakeDocker.ContainerList = []docker.APIContainers{ - { - // format is // k8s____ - Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&container), 16) + "_foo_new_12345678_0"}, - ID: "1234", - }, - { - // pod infra container - Names: []string{"/k8s_POD_foo_new_12345678_0"}, - ID: "9876", - }, - } pods := []api.Pod{ { ObjectMeta: api.ObjectMeta{ @@ -470,6 +473,32 @@ func TestSyncPodsDoesNothing(t *testing.T) { }, }, } + + fakeDocker.ContainerList = []docker.APIContainers{ + { + // format is // k8s____ + Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&container), 16) + "_foo_new_12345678_0"}, + ID: "1234", + }, + { + // pod infra container + Names: []string{"/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(&pods[0]), 16) + "_foo_new_12345678_0"}, + ID: "9876", + }, + } + fakeDocker.ContainerMap = map[string]*docker.Container{ + "1234": { + ID: "1234", + HostConfig: &docker.HostConfig{}, + Config: &docker.Config{}, + }, + "9876": { + ID: "9876", + HostConfig: &docker.HostConfig{}, + Config: &docker.Config{}, + }, + } + kubelet.podManager.SetPods(pods) waitGroup.Add(1) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]api.Pod{}, time.Now()) @@ -477,7 +506,14 @@ func TestSyncPodsDoesNothing(t *testing.T) { t.Errorf("unexpected error: %v", err) } waitGroup.Wait() - verifyCalls(t, fakeDocker, []string{"list", "list", "list", "inspect_container", "inspect_container", "list", "inspect_container", "inspect_container"}) + verifyCalls(t, fakeDocker, []string{ + "list", "list", + // Check the pod infra contianer. + "inspect_container", + // Get pod status. + "list", "inspect_container", "inspect_container", + // Get pod status. + "list", "inspect_container", "inspect_container"}) } func TestSyncPodsWithTerminationLog(t *testing.T) { @@ -513,7 +549,15 @@ func TestSyncPodsWithTerminationLog(t *testing.T) { } waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "inspect_image", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"}) + "list", "list", + // Get pod status. + "list", "inspect_image", + // Create pod infra container. + "create", "start", "inspect_container", + // Create container. + "create", "start", + // Get pod status. + "list", "inspect_container", "inspect_container"}) fakeDocker.Lock() parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":") @@ -565,7 +609,15 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "inspect_image", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"}) + "list", "list", + // Get pod status. + "list", "inspect_image", + // Create pod infra container. + "create", "start", "inspect_container", + // Create container. + "create", "start", + // Get pod status. + "list", "inspect_container", "inspect_container"}) fakeDocker.Lock() @@ -620,7 +672,15 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "inspect_image", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"}) + "list", "list", + // Get pod status. + "list", "inspect_image", + // Create pod infra container. + "create", "start", "inspect_container", + // Create container. + "create", "start", + // Get pod status. + "list", "inspect_container", "inspect_container"}) fakeDocker.Lock() @@ -642,13 +702,6 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker waitGroup := testKubelet.waitGroup - fakeDocker.ContainerList = []docker.APIContainers{ - { - // pod infra container - Names: []string{"/k8s_POD_foo_new_12345678_0"}, - ID: "9876", - }, - } pods := []api.Pod{ { ObjectMeta: api.ObjectMeta{ @@ -663,6 +716,20 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { }, }, } + fakeDocker.ContainerList = []docker.APIContainers{ + { + // pod infra container + Names: []string{"/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(&pods[0]), 16) + "_foo_new_12345678_0"}, + ID: "9876", + }, + } + fakeDocker.ContainerMap = map[string]*docker.Container{ + "9876": { + ID: "9876", + Config: &docker.Config{}, + HostConfig: &docker.HostConfig{}, + }, + } waitGroup.Add(1) kubelet.podManager.SetPods(pods) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]api.Pod{}, time.Now()) @@ -672,7 +739,15 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "inspect_container", "inspect_image", "create", "start", "list", "inspect_container", "inspect_container"}) + "list", "list", + // Check the pod infra container. + "inspect_container", + // Get pod status. + "list", "inspect_container", "inspect_image", + // Create container. + "create", "start", + // Get pod status. + "list", "inspect_container", "inspect_container"}) fakeDocker.Lock() if len(fakeDocker.Created) != 1 || @@ -690,13 +765,6 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { waitGroup := testKubelet.waitGroup fakeHttp := fakeHTTP{} kubelet.httpClient = &fakeHttp - fakeDocker.ContainerList = []docker.APIContainers{ - { - // pod infra container - Names: []string{"/k8s_POD_foo_new_12345678_0"}, - ID: "9876", - }, - } pods := []api.Pod{ { ObjectMeta: api.ObjectMeta{ @@ -722,6 +790,20 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { }, }, } + fakeDocker.ContainerList = []docker.APIContainers{ + { + // pod infra container + Names: []string{"/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(&pods[0]), 16) + "_foo_new_12345678_0"}, + ID: "9876", + }, + } + fakeDocker.ContainerMap = map[string]*docker.Container{ + "9876": { + ID: "9876", + Config: &docker.Config{}, + HostConfig: &docker.HostConfig{}, + }, + } waitGroup.Add(1) kubelet.podManager.SetPods(pods) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]api.Pod{}, time.Now()) @@ -731,7 +813,15 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "inspect_container", "inspect_image", "create", "start", "list", "inspect_container", "inspect_container"}) + "list", "list", + // Check the pod infra container. + "inspect_container", + // Get pod status. + "list", "inspect_container", "inspect_image", + // Create container. + "create", "start", + // Get pod status. + "list", "inspect_container", "inspect_container"}) fakeDocker.Lock() if len(fakeDocker.Created) != 1 || @@ -750,23 +840,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker waitGroup := testKubelet.waitGroup - fakeDocker.ContainerList = []docker.APIContainers{ - { - // format is // k8s___ - Names: []string{"/k8s_bar1_foo1_new_12345678_0"}, - ID: "1234", - }, - { - // format is // k8s___ - Names: []string{"/k8s_bar2_foo2_new_87654321_0"}, - ID: "5678", - }, - { - // format is // k8s___ - Names: []string{"/k8s_POD_foo2_new_87654321_0"}, - ID: "8765", - }, - } + pods := []api.Pod{ { ObjectMeta: api.ObjectMeta{ @@ -793,6 +867,41 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { }, }, } + fakeDocker.ContainerList = []docker.APIContainers{ + { + // format is // k8s___ + Names: []string{"/k8s_bar1_foo1_new_12345678_0"}, + ID: "1234", + }, + { + // format is // k8s___ + Names: []string{"/k8s_bar2_foo2_new_87654321_0"}, + ID: "5678", + }, + { + // format is // k8s___ + Names: []string{"/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(&pods[0]), 16) + "_foo2_new_87654321_0"}, + ID: "8765", + }, + } + fakeDocker.ContainerMap = map[string]*docker.Container{ + "1234": { + ID: "1234", + Config: &docker.Config{}, + HostConfig: &docker.HostConfig{}, + }, + "5678": { + ID: "5678", + Config: &docker.Config{}, + HostConfig: &docker.HostConfig{}, + }, + "8765": { + ID: "8765", + Config: &docker.Config{}, + HostConfig: &docker.HostConfig{}, + }, + } + waitGroup.Add(2) kubelet.podManager.SetPods(pods) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]api.Pod{}, time.Now()) @@ -804,11 +913,26 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { verifyUnorderedCalls(t, fakeDocker, []string{ "list", // foo1 - "list", "list", "inspect_container", "stop", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container", + "list", + // Get pod status. + "list", "inspect_container", + // Kill the container since pod infra container is not running. + "stop", + // Create pod infra container. + "create", "start", "inspect_container", + // Create container. + "create", "start", + // Get pod status. + "list", "inspect_container", "inspect_container", "inspect_container", // foo2 - "list", "list", "inspect_container", "inspect_container", "list", "inspect_container", "inspect_container", - }) + "list", + // Check the pod infra container. + "inspect_container", + // Get pod status. + "list", "inspect_container", "inspect_container", + // Get pod status. + "list", "inspect_container", "inspect_container"}) // A map iteration is used to delete containers, so must not depend on // order here. @@ -909,153 +1033,231 @@ func TestSyncPodsDeletes(t *testing.T) { } } -func TestSyncPodDeletesDuplicate(t *testing.T) { +func TestSyncPodsDeletesDuplicate(t *testing.T) { testKubelet := newTestKubelet(t) + testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - dockerContainers := dockertools.DockerContainers{ - "1234": &docker.APIContainers{ + waitGroup := testKubelet.waitGroup + + pods := []api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + UID: "12345678", + Name: "bar", + Namespace: "new", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + {Name: "foo"}, + }, + }, + }, + } + + fakeDocker.ContainerList = []docker.APIContainers{ + { // the k8s prefix is required for the kubelet to manage the container Names: []string{"/k8s_foo_bar_new_12345678_1111"}, ID: "1234", }, - "9876": &docker.APIContainers{ + { // pod infra container - Names: []string{"/k8s_POD_bar_new_12345678_2222"}, + Names: []string{"/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(&pods[0]), 16) + "_bar_new_12345678_2222"}, ID: "9876", }, - "4567": &docker.APIContainers{ + { // Duplicate for the same container. Names: []string{"/k8s_foo_bar_new_12345678_3333"}, ID: "4567", }, } - bound := api.Pod{ - ObjectMeta: api.ObjectMeta{ - UID: "12345678", - Name: "bar", - Namespace: "new", + fakeDocker.ContainerMap = map[string]*docker.Container{ + "1234": { + ID: "1234", + Config: &docker.Config{}, + HostConfig: &docker.HostConfig{}, }, - Spec: api.PodSpec{ - Containers: []api.Container{ - {Name: "foo"}, - }, + "9876": { + ID: "9876", + Config: &docker.Config{}, + HostConfig: &docker.HostConfig{}, + }, + "4567": { + ID: "4567", + Config: &docker.Config{}, + HostConfig: &docker.HostConfig{}, }, } - pods := []api.Pod{bound} + kubelet.podManager.SetPods(pods) - err := kubelet.syncPod(&bound, nil, dockerContainersToPod(dockerContainers)) + waitGroup.Add(1) + err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } + waitGroup.Wait() - verifyCalls(t, fakeDocker, []string{"list", "inspect_image", "stop", "list", "inspect_image"}) + verifyCalls(t, fakeDocker, []string{ + "list", "list", + // Check the pod infra container. + "inspect_container", + // Get pod status. + "list", "inspect_container", "inspect_container", "inspect_container", + // Kill the duplicated container. + "stop", + // Get pod status. + "list", "inspect_container", "inspect_container", "inspect_container"}) // Expect one of the duplicates to be killed. if len(fakeDocker.Stopped) != 1 || (fakeDocker.Stopped[0] != "1234" && fakeDocker.Stopped[0] != "4567") { t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped) } } -func TestSyncPodBadHash(t *testing.T) { +func TestSyncPodsBadHash(t *testing.T) { testKubelet := newTestKubelet(t) + testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - dockerContainers := dockertools.DockerContainers{ - "1234": &docker.APIContainers{ + waitGroup := testKubelet.waitGroup + + pods := []api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + UID: "12345678", + Name: "foo", + Namespace: "new", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + {Name: "bar"}, + }, + }, + }, + } + + fakeDocker.ContainerList = []docker.APIContainers{ + { // the k8s prefix is required for the kubelet to manage the container Names: []string{"/k8s_bar.1234_foo_new_12345678_42"}, ID: "1234", }, - "9876": &docker.APIContainers{ + { // pod infra container - Names: []string{"/k8s_POD_foo_new_12345678_42"}, + Names: []string{"/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(&pods[0]), 16) + "_foo_new_12345678_42"}, ID: "9876", }, } - bound := api.Pod{ - ObjectMeta: api.ObjectMeta{ - UID: "12345678", - Name: "foo", - Namespace: "new", + fakeDocker.ContainerMap = map[string]*docker.Container{ + "1234": { + ID: "1234", + Config: &docker.Config{}, + HostConfig: &docker.HostConfig{}, }, - Spec: api.PodSpec{ - Containers: []api.Container{ - {Name: "bar"}, - }, + "9876": { + ID: "9876", + Config: &docker.Config{}, + HostConfig: &docker.HostConfig{}, }, } - pods := []api.Pod{bound} + kubelet.podManager.SetPods(pods) - err := kubelet.syncPod(&bound, nil, dockerContainersToPod(dockerContainers)) + waitGroup.Add(1) + err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } + waitGroup.Wait() - verifyCalls(t, fakeDocker, []string{"list", "inspect_image", "stop", "stop", "create", "start", - "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"}) + verifyCalls(t, fakeDocker, []string{ + "list", "list", + // Check the pod infra container. + "inspect_container", + // Get pod status. + "list", "inspect_container", "inspect_container", + // Kill and restart the bad hash container. + "stop", "create", "start", + // Get pod status. + "list", "inspect_container", "inspect_container", "inspect_container"}) - // A map interation is used to delete containers, so must not depend on - // order here. - expectedToStop := map[string]bool{ - "1234": true, - "9876": true, - } - if len(fakeDocker.Stopped) != 2 || - (!expectedToStop[fakeDocker.Stopped[0]] && - !expectedToStop[fakeDocker.Stopped[1]]) { - t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped) + if err := fakeDocker.AssertStopped([]string{"1234"}); err != nil { + t.Errorf("%v", err) } } -func TestSyncPodUnhealthy(t *testing.T) { +func TestSyncPodsUnhealthy(t *testing.T) { testKubelet := newTestKubelet(t) + testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - dockerContainers := dockertools.DockerContainers{ - "1234": &docker.APIContainers{ - // the k8s prefix is required for the kubelet to manage the container - Names: []string{"/k8s_bar_foo_new_12345678_42"}, - ID: "1234", - }, - "9876": &docker.APIContainers{ - // pod infra container - Names: []string{"/k8s_POD_foo_new_12345678_42"}, - ID: "9876", - }, - } - bound := api.Pod{ - ObjectMeta: api.ObjectMeta{ - UID: "12345678", - Name: "foo", - Namespace: "new", - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - {Name: "bar", - LivenessProbe: &api.Probe{ - // Always returns healthy == false + waitGroup := testKubelet.waitGroup + + pods := []api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + UID: "12345678", + Name: "foo", + Namespace: "new", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + {Name: "bar", + LivenessProbe: &api.Probe{ + // Always returns healthy == false + }, }, }, }, }, } - pods := []api.Pod{bound} + + fakeDocker.ContainerList = []docker.APIContainers{ + { + // the k8s prefix is required for the kubelet to manage the container + Names: []string{"/k8s_bar_foo_new_12345678_42"}, + ID: "1234", + }, + { + // pod infra container + Names: []string{"/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(&pods[0]), 16) + "_foo_new_12345678_42"}, + ID: "9876", + }, + } + fakeDocker.ContainerMap = map[string]*docker.Container{ + "1234": { + ID: "1234", + Config: &docker.Config{}, + HostConfig: &docker.HostConfig{}, + }, + "9876": { + ID: "9876", + Config: &docker.Config{}, + HostConfig: &docker.HostConfig{}, + }, + } kubelet.podManager.SetPods(pods) - err := kubelet.syncPod(&bound, nil, dockerContainersToPod(dockerContainers)) + waitGroup.Add(1) + err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } + waitGroup.Wait() - verifyCalls(t, fakeDocker, []string{"list", "inspect_image", "stop", "create", "start", "list", "inspect_container"}) + verifyCalls(t, fakeDocker, []string{ + "list", "list", + // Check the pod infra container. + "inspect_container", + // Get pod status. + "list", "inspect_container", "inspect_container", + // Kill the unhealthy container. + "stop", + // Restart the unhealthy container. + "create", "start", + // Get pod status. + "list", "inspect_container", "inspect_container", "inspect_container"}) - // A map interation is used to delete containers, so must not depend on - // order here. - expectedToStop := map[string]bool{ - "1234": true, - } - if len(fakeDocker.Stopped) != len(expectedToStop) || - !expectedToStop[fakeDocker.Stopped[0]] { - t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped) + if err := fakeDocker.AssertStopped([]string{"1234"}); err != nil { + t.Errorf("%v", err) } } @@ -1597,33 +1799,32 @@ func TestNewHandler(t *testing.T) { func TestSyncPodEventHandlerFails(t *testing.T) { testKubelet := newTestKubelet(t) + testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker + waitGroup := testKubelet.waitGroup + kubelet.httpClient = &fakeHTTP{ err: fmt.Errorf("test error"), } - dockerContainers := dockertools.DockerContainers{ - "9876": &docker.APIContainers{ - // pod infra container - Names: []string{"/k8s_POD_foo_new_12345678_42"}, - ID: "9876", - }, - } - bound := api.Pod{ - ObjectMeta: api.ObjectMeta{ - UID: "12345678", - Name: "foo", - Namespace: "new", - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - {Name: "bar", - Lifecycle: &api.Lifecycle{ - PostStart: &api.Handler{ - HTTPGet: &api.HTTPGetAction{ - Host: "does.no.exist", - Port: util.IntOrString{IntVal: 8080, Kind: util.IntstrInt}, - Path: "bar", + + pods := []api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + UID: "12345678", + Name: "foo", + Namespace: "new", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + {Name: "bar", + Lifecycle: &api.Lifecycle{ + PostStart: &api.Handler{ + HTTPGet: &api.HTTPGetAction{ + Host: "does.no.exist", + Port: util.IntOrString{IntVal: 8080, Kind: util.IntstrInt}, + Path: "bar", + }, }, }, }, @@ -1631,18 +1832,53 @@ func TestSyncPodEventHandlerFails(t *testing.T) { }, }, } - pods := []api.Pod{bound} + + fakeDocker.ContainerList = []docker.APIContainers{ + { + // pod infra container + Names: []string{"/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(&pods[0]), 16) + "_foo_new_12345678_42"}, + ID: "9876", + }, + } + fakeDocker.ContainerMap = map[string]*docker.Container{ + "9876": { + ID: "9876", + Config: &docker.Config{}, + HostConfig: &docker.HostConfig{}, + }, + } kubelet.podManager.SetPods(pods) - err := kubelet.syncPod(&bound, nil, dockerContainersToPod(dockerContainers)) + waitGroup.Add(1) + err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } + waitGroup.Wait() - verifyCalls(t, fakeDocker, []string{"list", "inspect_image", "create", "start", "stop", "list", "inspect_image"}) + verifyCalls(t, fakeDocker, []string{ + "list", "list", + // Check the pod infra container. + "inspect_container", + // Get pod status. + "list", "inspect_container", "inspect_image", + // Create the container. + "create", "start", + // Kill the container since event handler fails. + "stop", + // Get pod status. + "list", "inspect_container", "inspect_container"}) + // TODO(yifan): Check the stopped container's name. if len(fakeDocker.Stopped) != 1 { t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped) } + dockerName, _, err := dockertools.ParseDockerName(fakeDocker.Stopped[0]) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if dockerName.ContainerName != "bar" { + t.Errorf("Wrong stopped container, expected: bar, get: %q", dockerName.ContainerName) + } } func TestSyncPodsWithPullPolicy(t *testing.T) { @@ -3550,11 +3786,23 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) { {Name: "succeeded"}, {Name: "failed"}, } + pods := []api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + UID: "12345678", + Name: "foo", + Namespace: "new", + }, + Spec: api.PodSpec{ + Containers: containers, + }, + }, + } runningAPIContainers := []docker.APIContainers{ { // pod infra container - Names: []string{"/k8s_POD_foo_new_12345678_0"}, + Names: []string{"/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(&pods[0]), 16) + "_foo_new_12345678_0"}, ID: "9876", }, } @@ -3572,6 +3820,15 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) { } containerMap := map[string]*docker.Container{ + "9876": { + ID: "9876", + Name: "POD", + Config: &docker.Config{}, + State: docker.State{ + StartedAt: time.Now(), + Running: true, + }, + }, "1234": { ID: "1234", Name: "succeeded", @@ -3602,19 +3859,43 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) { }{ { api.RestartPolicyAlways, - []string{"list", "list", "list", "inspect_container", "inspect_container", "inspect_container", "create", "start", "create", "start", "list", "inspect_container", "inspect_container", "inspect_container", "inspect_container", "inspect_container"}, + []string{"list", "list", + // Check the pod infra container. + "inspect_container", + // Get pod status. + "list", "inspect_container", "inspect_container", "inspect_container", + // Restart both containers. + "create", "start", "create", "start", + // Get pod status. + "list", "inspect_container", "inspect_container", "inspect_container", "inspect_container", "inspect_container"}, []string{"succeeded", "failed"}, []string{}, }, { api.RestartPolicyOnFailure, - []string{"list", "list", "list", "inspect_container", "inspect_container", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container", "inspect_container", "inspect_container"}, + []string{"list", "list", + // Check the pod infra container. + "inspect_container", + // Get pod status. + "list", "inspect_container", "inspect_container", "inspect_container", + // Restart the failed container. + "create", "start", + // Get pod status. + "list", "inspect_container", "inspect_container", "inspect_container", "inspect_container"}, []string{"failed"}, []string{}, }, { api.RestartPolicyNever, - []string{"list", "list", "list", "inspect_container", "inspect_container", "inspect_container", "stop", "list", "inspect_container", "inspect_container"}, + []string{"list", "list", + // Check the pod infra container. + "inspect_container", + // Get pod status. + "list", "inspect_container", "inspect_container", "inspect_container", + // Stop the last pod infra container. + "stop", + // Get pod status. + "list", "inspect_container", "inspect_container", "inspect_container"}, []string{}, []string{"9876"}, }, @@ -3625,19 +3906,8 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) { fakeDocker.ExitedContainerList = exitedAPIContainers fakeDocker.ContainerMap = containerMap fakeDocker.ClearCalls() - pods := []api.Pod{ - { - ObjectMeta: api.ObjectMeta{ - UID: "12345678", - Name: "foo", - Namespace: "new", - }, - Spec: api.PodSpec{ - Containers: containers, - RestartPolicy: tt.policy, - }, - }, - } + pods[0].Spec.RestartPolicy = tt.policy + kubelet.podManager.SetPods(pods) waitGroup.Add(1) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]api.Pod{}, time.Now()) @@ -3670,13 +3940,6 @@ func TestGetPodStatusWithLastTermination(t *testing.T) { {Name: "failed"}, } - runningAPIContainers := []docker.APIContainers{ - { - // pod infra container - Names: []string{"/k8s_POD_foo_new_12345678_0"}, - ID: "9876", - }, - } exitedAPIContainers := []docker.APIContainers{ { // format is // k8s___ @@ -3691,10 +3954,22 @@ func TestGetPodStatusWithLastTermination(t *testing.T) { } containerMap := map[string]*docker.Container{ + "9876": { + ID: "9876", + Name: "POD", + Config: &docker.Config{}, + HostConfig: &docker.HostConfig{}, + State: docker.State{ + StartedAt: time.Now(), + FinishedAt: time.Now(), + Running: true, + }, + }, "1234": { - ID: "1234", - Name: "succeeded", - Config: &docker.Config{}, + ID: "1234", + Name: "succeeded", + Config: &docker.Config{}, + HostConfig: &docker.HostConfig{}, State: docker.State{ ExitCode: 0, StartedAt: time.Now(), @@ -3702,9 +3977,10 @@ func TestGetPodStatusWithLastTermination(t *testing.T) { }, }, "5678": { - ID: "5678", - Name: "failed", - Config: &docker.Config{}, + ID: "5678", + Name: "failed", + Config: &docker.Config{}, + HostConfig: &docker.HostConfig{}, State: docker.State{ ExitCode: 42, StartedAt: time.Now(), @@ -3740,7 +4016,6 @@ func TestGetPodStatusWithLastTermination(t *testing.T) { } for i, tt := range tests { - fakeDocker.ContainerList = runningAPIContainers fakeDocker.ExitedContainerList = exitedAPIContainers fakeDocker.ContainerMap = containerMap fakeDocker.ClearCalls() @@ -3757,6 +4032,13 @@ func TestGetPodStatusWithLastTermination(t *testing.T) { }, }, } + fakeDocker.ContainerList = []docker.APIContainers{ + { + // pod infra container + Names: []string{"/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(&pods[0]), 16) + "_foo_new_12345678_0"}, + ID: "9876", + }, + } kubelet.podManager.SetPods(pods) waitGroup.Add(1) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]api.Pod{}, time.Now()) @@ -3798,7 +4080,9 @@ func TestGetPodCreationFailureReason(t *testing.T) { kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker failureReason := "creation failure" - fakeDocker.Errors["create"] = fmt.Errorf("%s", failureReason) + fakeDocker.Errors = map[string]error{ + "create": fmt.Errorf("%s", failureReason), + } fakeDocker.ContainerList = []docker.APIContainers{} pod := api.Pod{ ObjectMeta: api.ObjectMeta{