Delete containers for a pod if we have to create the network container.

pull/6/head
Brendan Burns 2014-08-07 21:49:17 -07:00
parent b43e3865b4
commit 38900a9c58
2 changed files with 175 additions and 6 deletions

View File

@ -309,7 +309,7 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v
}
// Kill a docker container
func (kl *Kubelet) killContainer(dockerContainer docker.APIContainers) error {
func (kl *Kubelet) killContainer(dockerContainer *docker.APIContainers) error {
glog.Infof("Killing: %s", dockerContainer.ID)
err := kl.dockerClient.StopContainer(dockerContainer.ID, 10)
podFullName, containerName := parseDockerName(dockerContainer.Names[0])
@ -349,6 +349,38 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (DockerID, error) {
return kl.runContainer(pod, container, nil, "")
}
// Delete all containers in a pod (except the network container) returns the number of containers deleted
// and an error if one occurs.
func (kl *Kubelet) deleteAllContainers(pod *Pod, podFullName string, dockerContainers DockerContainers) (int, error) {
count := 0
errs := make(chan error, len(pod.Manifest.Containers))
wg := sync.WaitGroup{}
for _, container := range pod.Manifest.Containers {
if dockerContainer, found := dockerContainers.FindPodContainer(podFullName, container.Name); found {
count++
wg.Add(1)
go func() {
err := kl.killContainer(dockerContainer)
if err != nil {
glog.Errorf("Failed to delete container. (%v) Skipping pod %s", err, podFullName)
errs <- err
}
wg.Done()
}()
}
}
wg.Wait()
close(errs)
if len(errs) > 0 {
errList := []error{}
for err := range errs {
errList = append(errList, err)
}
return -1, fmt.Errorf("failed to delete containers (%v)", errList)
}
return count, nil
}
type empty struct{}
func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error {
@ -362,12 +394,24 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error {
netID = DockerID(networkDockerContainer.ID)
} else {
glog.Infof("Network container doesn't exist, creating")
count, err := kl.deleteAllContainers(pod, podFullName, dockerContainers)
if err != nil {
return err
}
dockerNetworkID, err := kl.createNetworkContainer(pod)
if err != nil {
glog.Errorf("Failed to introspect network container. (%v) Skipping pod %s", err, podFullName)
return err
}
netID = dockerNetworkID
if count > 0 {
// relist everything, otherwise we'll think we're ok
dockerContainers, err = getKubeletDockerContainers(kl.dockerClient)
if err != nil {
glog.Errorf("Error listing containers %#v", dockerContainers)
return err
}
}
}
containersToKeep[netID] = empty{}
@ -405,7 +449,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error {
}
glog.V(1).Infof("pod %s container %s is unhealthy.", podFullName, container.Name, healthy)
if err := kl.killContainer(*dockerContainer); err != nil {
if err := kl.killContainer(dockerContainer); err != nil {
glog.V(1).Infof("Failed to kill container %s: %v", dockerContainer.ID, err)
continue
}
@ -434,7 +478,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error {
_, keep := containersToKeep[id]
_, killed := killedContainers[id]
if !keep && !killed {
err = kl.killContainer(*container)
err = kl.killContainer(container)
if err != nil {
glog.Errorf("Error killing container: %v", err)
}
@ -525,7 +569,7 @@ func (kl *Kubelet) SyncPods(pods []Pod) error {
// Don't kill containers that are in the desired pods.
podFullName, containerName := parseDockerName(container.Names[0])
if _, ok := desiredContainers[podContainer{podFullName, containerName}]; !ok {
err = kl.killContainer(*container)
err = kl.killContainer(container)
if err != nil {
glog.Errorf("Error killing container: %v", err)
}

View File

@ -20,8 +20,10 @@ import (
"encoding/json"
"fmt"
"reflect"
"strings"
"sync"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
@ -146,7 +148,7 @@ func TestKillContainerWithError(t *testing.T) {
}
kubelet, _, _ := makeTestKubelet(t)
kubelet.dockerClient = fakeDocker
err := kubelet.killContainer(fakeDocker.containerList[0])
err := kubelet.killContainer(&fakeDocker.containerList[0])
if err == nil {
t.Errorf("expected error, found nil")
}
@ -169,7 +171,7 @@ func TestKillContainer(t *testing.T) {
ID: "foobar",
}
err := kubelet.killContainer(fakeDocker.containerList[0])
err := kubelet.killContainer(&fakeDocker.containerList[0])
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -238,6 +240,129 @@ func TestSyncPodsDoesNothing(t *testing.T) {
verifyCalls(t, fakeDocker, []string{"list", "list"})
}
// drainWorkers waits until all workers are done. Should only used for testing.
func (kl *Kubelet) drainWorkers() {
for {
kl.podWorkers.lock.Lock()
length := len(kl.podWorkers.workers)
kl.podWorkers.lock.Unlock()
if length == 0 {
return
}
time.Sleep(time.Millisecond * 100)
}
}
func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
kubelet, _, fakeDocker := makeTestKubelet(t)
fakeDocker.containerList = []docker.APIContainers{}
err := kubelet.SyncPods([]Pod{
{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
Containers: []api.Container{
{Name: "bar"},
},
},
},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kubelet.drainWorkers()
verifyCalls(t, fakeDocker, []string{
"list", "list", "create", "start", "list", "inspect", "create", "start"})
fakeDocker.lock.Lock()
if len(fakeDocker.Created) != 2 ||
!strings.HasPrefix(fakeDocker.Created[0], "k8s--net--foo.test--") ||
!strings.HasPrefix(fakeDocker.Created[1], "k8s--bar--foo.test--") {
t.Errorf("Unexpected containers created %v", fakeDocker.Created)
}
fakeDocker.lock.Unlock()
}
func TestSyncPodsWithNetCreatesContainer(t *testing.T) {
kubelet, _, fakeDocker := makeTestKubelet(t)
fakeDocker.containerList = []docker.APIContainers{
{
// network container
Names: []string{"/k8s--net--foo.test--"},
ID: "9876",
},
}
err := kubelet.SyncPods([]Pod{
{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
Containers: []api.Container{
{Name: "bar"},
},
},
},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kubelet.drainWorkers()
verifyCalls(t, fakeDocker, []string{
"list", "list", "list", "inspect", "create", "start"})
fakeDocker.lock.Lock()
if len(fakeDocker.Created) != 1 ||
!strings.HasPrefix(fakeDocker.Created[0], "k8s--bar--foo.test--") {
t.Errorf("Unexpected containers created %v", fakeDocker.Created)
}
fakeDocker.lock.Unlock()
}
func TestSyncPodsDeletesWithNoNetContainer(t *testing.T) {
kubelet, _, fakeDocker := makeTestKubelet(t)
fakeDocker.containerList = []docker.APIContainers{
{
// format is k8s--<container-id>--<pod-fullname>
Names: []string{"/k8s--bar--foo.test"},
ID: "1234",
},
}
err := kubelet.SyncPods([]Pod{
{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
Containers: []api.Container{
{Name: "bar"},
},
},
},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kubelet.drainWorkers()
verifyCalls(t, fakeDocker, []string{
"list", "list", "stop", "create", "start", "list", "list", "inspect", "create", "start"})
// A map iteration is used to delete containers, so must not depend on
// order here.
expectedToStop := map[string]bool{
"1234": true,
}
fakeDocker.lock.Lock()
if len(fakeDocker.stopped) != 1 || !expectedToStop[fakeDocker.stopped[0]] {
t.Errorf("Wrong containers were stopped: %v", fakeDocker.stopped)
}
fakeDocker.lock.Unlock()
}
func TestSyncPodsDeletes(t *testing.T) {
kubelet, _, fakeDocker := makeTestKubelet(t)
fakeDocker.containerList = []docker.APIContainers{