Merge pull request #5032 from yifan-gu/clean_syncPod

Refactor pkg/kubelet/kubelet.go: syncPod().
pull/6/head
Victor Marmol 2015-03-05 17:24:20 -08:00
commit b314dc6025
5 changed files with 201 additions and 63 deletions

View File

@ -413,17 +413,23 @@ func (c DockerContainers) FindPodContainer(podFullName string, uid types.UID, co
return nil, false, 0
}
// Note, this might return containers belong to a different Pod instance with the same name
func (c DockerContainers) FindContainersByPodFullName(podFullName string) map[string]*docker.APIContainers {
containers := make(map[string]*docker.APIContainers)
// RemoveContainerWithID removes the container with the given containerID.
func (c DockerContainers) RemoveContainerWithID(containerID DockerID) {
delete(c, containerID)
}
// FindContainersByPod returns the containers that belong to the pod.
func (c DockerContainers) FindContainersByPod(podUID types.UID, podFullName string) DockerContainers {
containers := make(DockerContainers)
for _, dockerContainer := range c {
if len(dockerContainer.Names) == 0 {
continue
}
dockerManifestID, _, dockerContainerName, _ := ParseDockerName(dockerContainer.Names[0])
if dockerManifestID == podFullName {
containers[dockerContainerName] = dockerContainer
dockerPodName, uuid, _, _ := ParseDockerName(dockerContainer.Names[0])
if podUID == uuid ||
(podUID == "" && podFullName == dockerPodName) {
containers[DockerID(dockerContainer.ID)] = dockerContainer
}
}
return containers

View File

@ -451,3 +451,146 @@ func TestGetRunningContainers(t *testing.T) {
}
}
}
func TestFindContainersByPod(t *testing.T) {
tests := []struct {
testContainers DockerContainers
inputPodID types.UID
inputPodFullName string
expectedContainers DockerContainers
}{
{
DockerContainers{
"foobar": &docker.APIContainers{
ID: "foobar",
Names: []string{"/k8s_foo_qux_1234_42"},
},
"barbar": &docker.APIContainers{
ID: "barbar",
Names: []string{"/k8s_foo_qux_1234_42"},
},
"baz": &docker.APIContainers{
ID: "baz",
Names: []string{"/k8s_foo_qux_1234_42"},
},
},
types.UID("1234"),
"",
DockerContainers{
"foobar": &docker.APIContainers{
ID: "foobar",
Names: []string{"/k8s_foo_qux_1234_42"},
},
"barbar": &docker.APIContainers{
ID: "barbar",
Names: []string{"/k8s_foo_qux_1234_42"},
},
"baz": &docker.APIContainers{
ID: "baz",
Names: []string{"/k8s_foo_qux_1234_42"},
},
},
},
{
DockerContainers{
"foobar": &docker.APIContainers{
ID: "foobar",
Names: []string{"/k8s_foo_qux_1234_42"},
},
"barbar": &docker.APIContainers{
ID: "barbar",
Names: []string{"/k8s_foo_qux_2343_42"},
},
"baz": &docker.APIContainers{
ID: "baz",
Names: []string{"/k8s_foo_qux_1234_42"},
},
},
types.UID("1234"),
"",
DockerContainers{
"foobar": &docker.APIContainers{
ID: "foobar",
Names: []string{"/k8s_foo_qux_1234_42"},
},
"baz": &docker.APIContainers{
ID: "baz",
Names: []string{"/k8s_foo_qux_1234_42"},
},
},
},
{
DockerContainers{
"foobar": &docker.APIContainers{
ID: "foobar",
Names: []string{"/k8s_foo_qux_1234_42"},
},
"barbar": &docker.APIContainers{
ID: "barbar",
Names: []string{"/k8s_foo_qux_2343_42"},
},
"baz": &docker.APIContainers{
ID: "baz",
Names: []string{"/k8s_foo_qux_1234_42"},
},
},
types.UID("5678"),
"",
DockerContainers{},
},
{
DockerContainers{
"foobar": &docker.APIContainers{
ID: "foobar",
Names: []string{"/k8s_foo_qux_1234_42"},
},
"barbar": &docker.APIContainers{
ID: "barbar",
Names: nil,
},
"baz": &docker.APIContainers{
ID: "baz",
Names: []string{"/k8s_foo_qux_5678_42"},
},
},
types.UID("5678"),
"",
DockerContainers{
"baz": &docker.APIContainers{
ID: "baz",
Names: []string{"/k8s_foo_qux_5678_42"},
},
},
},
{
DockerContainers{
"foobar": &docker.APIContainers{
ID: "foobar",
Names: []string{"/k8s_foo_qux_1234_42"},
},
"barbar": &docker.APIContainers{
ID: "barbar",
Names: []string{"/k8s_foo_abc_5678_42"},
},
"baz": &docker.APIContainers{
ID: "baz",
Names: []string{"/k8s_foo_qux_5678_42"},
},
},
"",
"abc",
DockerContainers{
"barbar": &docker.APIContainers{
ID: "barbar",
Names: []string{"/k8s_foo_abc_5678_42"},
},
},
},
}
for _, test := range tests {
result := test.testContainers.FindContainersByPod(test.inputPodID, test.inputPodFullName)
if !reflect.DeepEqual(result, test.expectedContainers) {
t.Errorf("expected: %v, saw: %v", test.expectedContainers, result)
}
}
}

View File

@ -723,7 +723,7 @@ func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, pod
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
handlerErr := kl.runHandler(GetPodFullName(pod), pod.UID, container, container.Lifecycle.PostStart)
if handlerErr != nil {
kl.killContainerByID(dockerContainer.ID, "")
kl.killContainerByID(dockerContainer.ID)
return dockertools.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
}
}
@ -873,25 +873,21 @@ func parseResolvConf(reader io.Reader) (nameservers []string, searches []string,
// Kill a docker container
func (kl *Kubelet) killContainer(dockerContainer *docker.APIContainers) error {
return kl.killContainerByID(dockerContainer.ID, dockerContainer.Names[0])
return kl.killContainerByID(dockerContainer.ID)
}
func (kl *Kubelet) killContainerByID(ID, name string) error {
glog.V(2).Infof("Killing container with id %q and name %q", ID, name)
func (kl *Kubelet) killContainerByID(ID string) error {
glog.V(2).Infof("Killing container with id %q", ID)
kl.readiness.remove(ID)
err := kl.dockerClient.StopContainer(ID, 10)
if len(name) == 0 {
return err
}
ref, ok := kl.getRef(dockertools.DockerID(ID))
if !ok {
glog.Warningf("No ref for pod '%v' - '%v'", ID, name)
glog.Warningf("No ref for pod '%v'", ID)
} else {
// TODO: pass reason down here, and state, or move this call up the stack.
kl.recorder.Eventf(ref, "killing", "Killing %v - %v", ID, name)
kl.recorder.Eventf(ref, "killing", "Killing %v", ID)
}
return err
}
@ -1007,19 +1003,9 @@ func (kl *Kubelet) killContainersInPod(pod *api.BoundPod, dockerContainers docke
type empty struct{}
func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.DockerContainers) error {
podFullName := GetPodFullName(pod)
// makePodDataDirs creates the dirs for the pod datas.
func (kl *Kubelet) makePodDataDirs(pod *api.BoundPod) error {
uid := pod.UID
containersToKeep := make(map[dockertools.DockerID]empty)
killedContainers := make(map[dockertools.DockerID]empty)
glog.V(4).Infof("Syncing Pod, podFullName: %q, uid: %q", podFullName, uid)
ref, err := api.GetReference(pod)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)
}
// Make data dirs.
if err := os.Mkdir(kl.getPodDir(uid), 0750); err != nil && !os.IsExist(err) {
return err
}
@ -1029,14 +1015,30 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
if err := os.Mkdir(kl.getPodPluginsDir(uid), 0750); err != nil && !os.IsExist(err) {
return err
}
return nil
}
func (kl *Kubelet) syncPod(pod *api.BoundPod, containersInPod dockertools.DockerContainers) error {
podFullName := GetPodFullName(pod)
uid := pod.UID
glog.V(4).Infof("Syncing Pod, podFullName: %q, uid: %q", podFullName, uid)
ref, err := api.GetReference(pod)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)
}
if err = kl.makePodDataDirs(pod); err != nil {
return err
}
// Make sure we have a pod infra container
var podInfraContainerID dockertools.DockerID
if podInfraDockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName); found {
if podInfraDockerContainer, found, _ := containersInPod.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName); found {
podInfraContainerID = dockertools.DockerID(podInfraDockerContainer.ID)
} else {
glog.V(2).Infof("Pod infra container doesn't exist for pod %q, killing and re-creating the pod", podFullName)
count, err := kl.killContainersInPod(pod, dockerContainers)
count, err := kl.killContainersInPod(pod, containersInPod)
if err != nil {
return err
}
@ -1047,14 +1049,14 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
}
if count > 0 {
// Re-list everything, otherwise we'll think we're ok.
dockerContainers, err = dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
containersInPod, err = dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {
glog.Errorf("Error listing containers %#v", dockerContainers)
glog.Errorf("Error listing containers %#v", containersInPod)
return err
}
}
}
containersToKeep[podInfraContainerID] = empty{}
containersInPod.RemoveContainerWithID(podInfraContainerID)
podVolumes, err := kl.mountExternalVolumes(pod)
if err != nil {
@ -1074,7 +1076,7 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
for _, container := range pod.Spec.Containers {
expectedHash := dockertools.HashContainer(&container)
dockerContainerName := dockertools.BuildDockerName(uid, podFullName, &container)
if dockerContainer, found, hash := dockerContainers.FindPodContainer(podFullName, uid, container.Name); found {
if dockerContainer, found, hash := containersInPod.FindPodContainer(podFullName, uid, container.Name); found {
containerID := dockertools.DockerID(dockerContainer.ID)
glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID)
@ -1092,7 +1094,7 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
}
if err != nil {
glog.V(1).Infof("liveness/readiness probe errored: %v", err)
containersToKeep[containerID] = empty{}
containersInPod.RemoveContainerWithID(containerID)
continue
}
if ready == probe.Success {
@ -1101,7 +1103,7 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
kl.readiness.set(dockerContainer.ID, false)
}
if live == probe.Success {
containersToKeep[containerID] = empty{}
containersInPod.RemoveContainerWithID(containerID)
continue
}
ref, ok := kl.getRef(containerID)
@ -1118,16 +1120,15 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
glog.V(1).Infof("Failed to kill container %q: %v", dockerContainer.ID, err)
continue
}
killedContainers[containerID] = empty{}
containersInPod.RemoveContainerWithID(containerID)
if podChanged {
// Also kill associated pod infra container if the pod changed.
if podInfraContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName); found {
if err := kl.killContainer(podInfraContainer); err != nil {
glog.V(1).Infof("Failed to kill pod infra container %q: %v", podInfraContainer.ID, err)
continue
}
if err := kl.killContainerByID(string(podInfraContainerID)); err != nil {
glog.V(1).Infof("Failed to kill pod infra container %q: %v", podInfraContainerID, err)
continue
}
containersInPod.RemoveContainerWithID(containerID)
}
}
@ -1187,23 +1188,15 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
glog.Errorf("Error running pod %q container %q: %v", podFullName, container.Name, err)
continue
}
containersToKeep[containerID] = empty{}
containersInPod.RemoveContainerWithID(containerID)
}
// Kill any containers in this pod which were not identified above (guards against duplicates).
for id, container := range dockerContainers {
curPodFullName, curUUID, _, _ := dockertools.ParseDockerName(container.Names[0])
if curPodFullName == podFullName && curUUID == uid {
// Don't kill containers we want to keep or those we already killed.
_, keep := containersToKeep[id]
_, killed := killedContainers[id]
if !keep && !killed {
glog.V(1).Infof("Killing unwanted container in pod %q: %+v", curUUID, container)
err = kl.killContainer(container)
if err != nil {
glog.Errorf("Error killing container: %v", err)
}
}
// Kill any remaining containers in this pod which were not identified above (guards against duplicates).
for _, container := range containersInPod {
glog.V(1).Infof("Killing unwanted container in pod %q: %+v", pod.UID, container)
err = kl.killContainer(container)
if err != nil {
glog.Errorf("Error killing container: %v", err)
}
}

View File

@ -878,11 +878,6 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
Names: []string{"/k8s_foo_bar.new.test_12345678_3333"},
ID: "4567",
},
"2304": &docker.APIContainers{
// Container for another pod, untouched.
Names: []string{"/k8s_baz_fiz.new.test_6_42"},
ID: "2304",
},
}
bound := api.BoundPod{
ObjectMeta: api.ObjectMeta{

View File

@ -84,7 +84,8 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
glog.Errorf("Error listing containers while syncing pod: %v", err)
return
}
err = p.syncPodFn(newWork.pod, containers)
err = p.syncPodFn(newWork.pod, containers.FindContainersByPod(newWork.pod.UID, GetPodFullName(newWork.pod)))
if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)