Merge pull request #47599 from yujuhong/restart-init

Automatic merge from submit-queue (batch tested with PRs 46317, 48922, 50651, 50230, 47599)

Rerun init containers when the pod needs to be restarted

Whenever the pod sandbox needs to be recreated, all containers associated
with it will be killed by the kubelet. This change ensures that the init
containers will be rerun in such cases.

The change also refactors the compute logic so that the control flow of
the init containers is more aligned with that of the regular containers. Unit
tests are added to verify the logic.
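
In short, when computePodActions decides that the pod sandbox must be (re-)created, it now marks the pod to be killed, has SyncPod purge the old init container instances, and restarts initialization from the first init container. Below is a minimal, standalone Go sketch of that branch; Pod, Container, and PodActions here are simplified stand-ins for illustration, not the real kubelet types.

package main

import "fmt"

// Simplified stand-ins for the kubelet types (illustrative only).
type Container struct{ Name string }

type Pod struct {
	RestartPolicyNever bool // true when the pod's RestartPolicy is Never
	InitContainers     []Container
	Containers         []Container
}

type PodActions struct {
	KillPod, CreateSandbox   bool
	NextInitContainerToStart *Container
	ContainersToStart        []int
}

// actionsForNewSandbox mirrors the new "createPodSandbox" branch of
// computePodActions: kill the pod, recreate the sandbox, and rerun
// initialization starting from the first init container. (The real code
// also skips already-succeeded containers when RestartPolicy is OnFailure.)
func actionsForNewSandbox(pod *Pod, attempt uint32) PodActions {
	actions := PodActions{KillPod: true, CreateSandbox: true, ContainersToStart: []int{}}
	if pod.RestartPolicyNever && attempt != 0 {
		// The pod must not be restarted; kill it and start nothing.
		return actions
	}
	if len(pod.InitContainers) != 0 {
		// Rerun initialization from the first init container; regular
		// containers are considered again after initialization completes.
		actions.NextInitContainerToStart = &pod.InitContainers[0]
		return actions
	}
	// No init containers: start all regular containers in the new sandbox.
	for idx := range pod.Containers {
		actions.ContainersToStart = append(actions.ContainersToStart, idx)
	}
	return actions
}

func main() {
	pod := &Pod{
		InitContainers: []Container{{Name: "init1"}},
		Containers:     []Container{{Name: "app1"}},
	}
	fmt.Printf("%+v\n", actionsForNewSandbox(pod, 0))
}

SyncPod then acts on these fields in order: it kills the running containers and the sandbox, purges the old init containers, creates the new sandbox, and starts NextInitContainerToStart before any regular container.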

This fixes #36485
Kubernetes Submit Queue 2017-08-16 19:50:22 -07:00 committed by GitHub
commit 4bfe9b1a56
4 changed files with 598 additions and 208 deletions


@@ -55,6 +55,18 @@ type FakeRuntimeService struct {
Sandboxes map[string]*FakePodSandbox
}
func (r *FakeRuntimeService) GetContainerID(sandboxID, name string, attempt uint32) (string, error) {
r.Lock()
defer r.Unlock()
for id, c := range r.Containers {
if c.SandboxID == sandboxID && c.Metadata.Name == name && c.Metadata.Attempt == attempt {
return id, nil
}
}
return "", fmt.Errorf("container (name, attempt, sandboxID)=(%q, %d, %q) not found", name, attempt, sandboxID)
}
func (r *FakeRuntimeService) SetFakeSandboxes(sandboxes []*FakePodSandbox) {
r.Lock()
defer r.Unlock()


@@ -631,10 +631,11 @@ func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(pod *v1.Pod, ru
return
}
// pruneInitContainers ensures that before we begin creating init containers, we have reduced the number
// of outstanding init containers still present. This reduces load on the container garbage collector
// by only preserving the most recent terminated init container.
func (m *kubeGenericRuntimeManager) pruneInitContainersBeforeStart(pod *v1.Pod, podStatus *kubecontainer.PodStatus, initContainersToKeep map[kubecontainer.ContainerID]int) {
// pruneInitContainersBeforeStart ensures that before we begin creating init
// containers, we have reduced the number of outstanding init containers still
// present. This reduces load on the container garbage collector by only
// preserving the most recent terminated init container.
func (m *kubeGenericRuntimeManager) pruneInitContainersBeforeStart(pod *v1.Pod, podStatus *kubecontainer.PodStatus) {
// only the last execution of each init container should be preserved, and only preserve it if it is in the
// list of init containers to keep.
initContainerNames := sets.NewString()
@@ -652,14 +653,9 @@ func (m *kubeGenericRuntimeManager) pruneInitContainersBeforeStart(pod *v1.Pod,
if count == 1 {
continue
}
// if there is a reason to preserve the older container, do so
if _, ok := initContainersToKeep[status.ID]; ok {
continue
}
// prune all other init containers that match this container name
glog.V(4).Infof("Removing init container %q instance %q %d", status.Name, status.ID.ID, count)
if err := m.runtimeService.RemoveContainer(status.ID.ID); err != nil {
if err := m.removeContainer(status.ID.ID); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to remove pod init container %q: %v; Skipping pod %q", status.Name, err, format.Pod(pod)))
continue
}
@@ -674,6 +670,37 @@ func (m *kubeGenericRuntimeManager) pruneInitContainersBeforeStart(pod *v1.Pod,
}
}
// Remove all init containers. Note that this function does not check the state
// of the container because it assumes all init containers have been stopped
// before the call happens.
func (m *kubeGenericRuntimeManager) purgeInitContainers(pod *v1.Pod, podStatus *kubecontainer.PodStatus) {
initContainerNames := sets.NewString()
for _, container := range pod.Spec.InitContainers {
initContainerNames.Insert(container.Name)
}
for name := range initContainerNames {
count := 0
for _, status := range podStatus.ContainerStatuses {
if status.Name != name || !initContainerNames.Has(status.Name) {
continue
}
count++
// Purge all init containers that match this container name
glog.V(4).Infof("Removing init container %q instance %q %d", status.Name, status.ID.ID, count)
if err := m.removeContainer(status.ID.ID); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to remove pod init container %q: %v; Skipping pod %q", status.Name, err, format.Pod(pod)))
continue
}
// Remove any references to this container
if _, ok := m.containerRefManager.GetRef(status.ID); ok {
m.containerRefManager.ClearRef(status.ID)
} else {
glog.Warningf("No ref for container %q", status.ID)
}
}
}
}
// findNextInitContainerToRun returns the status of the last failed container, the
// next init container to start, or done if there are no further init containers.
// Status is only returned if an init container is failed, in which case next will


@@ -333,34 +333,28 @@ type containerToKillInfo struct {
message string
}
// podContainerSpecChanges keeps information on changes that need to happen for a pod.
type podContainerSpecChanges struct {
// Whether need to create a new sandbox.
// podActions keeps information on what to do for a pod.
type podActions struct {
// Stop all running (regular and init) containers and the sandbox for the pod.
KillPod bool
// Whether to create a new sandbox. If the pod needs to be killed and a new
// sandbox created, all init containers need to be purged (i.e., removed).
CreateSandbox bool
// The id of existing sandbox. It is used for starting containers in ContainersToStart.
SandboxID string
// The attempt number of creating sandboxes for the pod.
Attempt uint32
// ContainersToStart keeps a map of containers that need to be started, note that
// the key is index of the container inside pod.Spec.Containers, while
// the value is a message indicates why the container needs to start.
ContainersToStart map[int]string
// ContainersToKeep keeps a map of containers that need to be kept as is, note that
// the key is the container ID of the container, while
// the value is index of the container inside pod.Spec.Containers.
ContainersToKeep map[kubecontainer.ContainerID]int
// The next init container to start.
NextInitContainerToStart *v1.Container
// ContainersToStart keeps a list of indexes for the containers to start,
// where the index is the index of the specific container in the pod spec
// (pod.Spec.Containers).
ContainersToStart []int
// ContainersToKill keeps a map of containers that need to be killed, note that
// the key is the container ID of the container, while
// the value contains necessary information to kill a container.
ContainersToKill map[kubecontainer.ContainerID]containerToKillInfo
// InitFailed indicates whether init containers are failed.
InitFailed bool
// InitContainersToKeep keeps a map of init containers that need to be kept as
// is, note that the key is the container ID of the container, while
// the value is index of the container inside pod.Spec.InitContainers.
InitContainersToKeep map[kubecontainer.ContainerID]int
}
// podSandboxChanged checks whether the spec of the pod is changed and returns
@@ -399,141 +393,127 @@ func (m *kubeGenericRuntimeManager) podSandboxChanged(pod *v1.Pod, podStatus *ku
return false, sandboxStatus.Metadata.Attempt, sandboxStatus.Id
}
// checkAndKeepInitContainers keeps all successfully completed init containers. If there
// are failing containers, only keep the first failing one.
func checkAndKeepInitContainers(pod *v1.Pod, podStatus *kubecontainer.PodStatus, initContainersToKeep map[kubecontainer.ContainerID]int) bool {
initFailed := false
for i, container := range pod.Spec.InitContainers {
containerStatus := podStatus.FindContainerStatusByName(container.Name)
if containerStatus == nil {
continue
}
if containerStatus.State == kubecontainer.ContainerStateRunning {
initContainersToKeep[containerStatus.ID] = i
continue
}
if containerStatus.State == kubecontainer.ContainerStateExited {
initContainersToKeep[containerStatus.ID] = i
}
if isContainerFailed(containerStatus) {
initFailed = true
break
}
}
return initFailed
func containerChanged(container *v1.Container, containerStatus *kubecontainer.ContainerStatus) (uint64, uint64, bool) {
expectedHash := kubecontainer.HashContainer(container)
return expectedHash, containerStatus.Hash, containerStatus.Hash != expectedHash
}
// computePodContainerChanges checks whether the pod spec has changed and returns the changes if true.
func (m *kubeGenericRuntimeManager) computePodContainerChanges(pod *v1.Pod, podStatus *kubecontainer.PodStatus) podContainerSpecChanges {
func shouldRestartOnFailure(pod *v1.Pod) bool {
return pod.Spec.RestartPolicy != v1.RestartPolicyNever
}
func containerSucceeded(c *v1.Container, podStatus *kubecontainer.PodStatus) bool {
cStatus := podStatus.FindContainerStatusByName(c.Name)
if cStatus == nil || cStatus.State == kubecontainer.ContainerStateRunning {
return false
}
return cStatus.ExitCode == 0
}
// computePodActions checks whether the pod spec has changed and returns the changes if true.
func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions {
glog.V(5).Infof("Syncing Pod %q: %+v", format.Pod(pod), pod)
sandboxChanged, attempt, sandboxID := m.podSandboxChanged(pod, podStatus)
changes := podContainerSpecChanges{
CreateSandbox: sandboxChanged,
SandboxID: sandboxID,
Attempt: attempt,
ContainersToStart: make(map[int]string),
ContainersToKeep: make(map[kubecontainer.ContainerID]int),
InitContainersToKeep: make(map[kubecontainer.ContainerID]int),
ContainersToKill: make(map[kubecontainer.ContainerID]containerToKillInfo),
createPodSandbox, attempt, sandboxID := m.podSandboxChanged(pod, podStatus)
changes := podActions{
KillPod: createPodSandbox,
CreateSandbox: createPodSandbox,
SandboxID: sandboxID,
Attempt: attempt,
ContainersToStart: []int{},
ContainersToKill: make(map[kubecontainer.ContainerID]containerToKillInfo),
}
// check the status of init containers.
initFailed := false
// always reset the init containers if the sandbox is changed.
if !sandboxChanged {
// Keep all successfully completed containers. If there are failing containers,
// only keep the first failing one.
initFailed = checkAndKeepInitContainers(pod, podStatus, changes.InitContainersToKeep)
// If we need to (re-)create the pod sandbox, everything will need to be
// killed and recreated, and init containers should be purged.
if createPodSandbox {
if !shouldRestartOnFailure(pod) && attempt != 0 {
// Should not restart the pod, just return.
return changes
}
if len(pod.Spec.InitContainers) != 0 {
// Pod has init containers, return the first one.
changes.NextInitContainerToStart = &pod.Spec.InitContainers[0]
return changes
}
// Start all containers by default but exclude the ones that succeeded if
// RestartPolicy is OnFailure.
for idx, c := range pod.Spec.Containers {
if containerSucceeded(&c, podStatus) && pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure {
continue
}
changes.ContainersToStart = append(changes.ContainersToStart, idx)
}
return changes
}
changes.InitFailed = initFailed
// Check initialization progress.
initLastStatus, next, done := findNextInitContainerToRun(pod, podStatus)
if !done {
if next != nil {
initFailed := initLastStatus != nil && isContainerFailed(initLastStatus)
if initFailed && !shouldRestartOnFailure(pod) {
changes.KillPod = true
} else {
changes.NextInitContainerToStart = next
}
}
// Initialization failed or still in progress. Skip inspecting non-init
// containers.
return changes
}
// Number of running containers to keep.
keepCount := 0
// check the status of containers.
for index, container := range pod.Spec.Containers {
for idx, container := range pod.Spec.Containers {
containerStatus := podStatus.FindContainerStatusByName(container.Name)
// If container does not exist, or is not running, check whether we
// need to restart it.
if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {
message := fmt.Sprintf("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
glog.Info(message)
changes.ContainersToStart[index] = message
changes.ContainersToStart = append(changes.ContainersToStart, idx)
}
continue
}
if sandboxChanged {
if pod.Spec.RestartPolicy != v1.RestartPolicyNever {
message := fmt.Sprintf("Container %+v's pod sandbox is dead, the container will be recreated.", container)
glog.Info(message)
changes.ContainersToStart[index] = message
}
// The container is running, but kill the container if any of the following conditions is met.
reason := ""
restart := shouldRestartOnFailure(pod)
if expectedHash, actualHash, changed := containerChanged(&container, containerStatus); changed {
reason = fmt.Sprintf("Container spec hash changed (%d vs %d).", actualHash, expectedHash)
// Restart regardless of the restart policy because the container
// spec changed.
restart = true
} else if liveness, found := m.livenessManager.Get(containerStatus.ID); found && liveness == proberesults.Failure {
// If the container failed the liveness probe, we should kill it.
reason = "Container failed liveness probe."
} else {
// Keep the container.
keepCount += 1
continue
}
if initFailed {
// Initialization failed and Container exists.
// If we have an initialization failure everything will be killed anyway.
// If RestartPolicy is Always or OnFailure we restart containers that were running before.
if pod.Spec.RestartPolicy != v1.RestartPolicyNever {
message := fmt.Sprintf("Failed to initialize pod. %q will be restarted.", container.Name)
glog.V(1).Info(message)
changes.ContainersToStart[index] = message
}
continue
// We need to kill the container, but if we also want to restart the
// container afterwards, make the intent clear in the message. Also do
// not kill the entire pod since we expect the container to be running eventually.
message := reason
if restart {
message = fmt.Sprintf("%s. Container will be killed and recreated.", message)
changes.ContainersToStart = append(changes.ContainersToStart, idx)
}
expectedHash := kubecontainer.HashContainer(&container)
containerChanged := containerStatus.Hash != expectedHash
if containerChanged {
message := fmt.Sprintf("Pod %q container %q hash changed (%d vs %d), it will be killed and re-created.",
pod.Name, container.Name, containerStatus.Hash, expectedHash)
glog.Info(message)
changes.ContainersToStart[index] = message
continue
}
liveness, found := m.livenessManager.Get(containerStatus.ID)
if !found || liveness == proberesults.Success {
changes.ContainersToKeep[containerStatus.ID] = index
continue
}
if pod.Spec.RestartPolicy != v1.RestartPolicyNever {
message := fmt.Sprintf("pod %q container %q is unhealthy, it will be killed and re-created.", format.Pod(pod), container.Name)
glog.Info(message)
changes.ContainersToStart[index] = message
changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{
name: containerStatus.Name,
container: &pod.Spec.Containers[idx],
message: message,
}
glog.V(2).Infof("Container %q (%q) of pod %s: %s", container.Name, containerStatus.ID, format.Pod(pod), message)
}
// Don't keep init containers if they are the only containers to keep.
if !sandboxChanged && len(changes.ContainersToStart) == 0 && len(changes.ContainersToKeep) == 0 {
changes.InitContainersToKeep = make(map[kubecontainer.ContainerID]int)
}
// compute containers to be killed
runningContainerStatuses := podStatus.GetRunningContainerStatuses()
for _, containerStatus := range runningContainerStatuses {
_, keep := changes.ContainersToKeep[containerStatus.ID]
_, keepInit := changes.InitContainersToKeep[containerStatus.ID]
if !keep && !keepInit {
var podContainer *v1.Container
var killMessage string
for i, c := range pod.Spec.Containers {
if c.Name == containerStatus.Name {
podContainer = &pod.Spec.Containers[i]
killMessage = changes.ContainersToStart[i]
break
}
}
changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{
name: containerStatus.Name,
container: podContainer,
message: killMessage,
}
}
if keepCount == 0 && len(changes.ContainersToStart) == 0 {
changes.KillPod = true
}
return changes
@@ -549,8 +529,8 @@ func (m *kubeGenericRuntimeManager) computePodContainerChanges(pod *v1.Pod, podS
// 6. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
// Step 1: Compute sandbox and container changes.
podContainerChanges := m.computePodContainerChanges(pod, podStatus)
glog.V(3).Infof("computePodContainerChanges got %+v for pod %q", podContainerChanges, format.Pod(pod))
podContainerChanges := m.computePodActions(pod, podStatus)
glog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
if podContainerChanges.CreateSandbox {
ref, err := ref.GetReference(api.Scheme, pod)
if err != nil {
@@ -559,13 +539,13 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStat
if podContainerChanges.SandboxID != "" {
m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
} else {
glog.V(4).Infof("SyncPod received new pod %q, will create a new sandbox for it", format.Pod(pod))
glog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))
}
}
// Step 2: Kill the pod if the sandbox has changed.
if podContainerChanges.CreateSandbox || (len(podContainerChanges.ContainersToKeep) == 0 && len(podContainerChanges.ContainersToStart) == 0) {
if len(podContainerChanges.ContainersToKeep) == 0 && len(podContainerChanges.ContainersToStart) == 0 {
if podContainerChanges.KillPod {
if !podContainerChanges.CreateSandbox {
glog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
} else {
glog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
@@ -577,6 +557,10 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStat
glog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
return
}
if podContainerChanges.CreateSandbox {
m.purgeInitContainers(pod, podStatus)
}
} else {
// Step 3: kill any running containers in this pod which are not to keep.
for containerID, containerInfo := range podContainerChanges.ContainersToKill {
@@ -592,7 +576,9 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStat
}
// Keep terminated init containers fairly aggressively controlled
m.pruneInitContainersBeforeStart(pod, podStatus, podContainerChanges.InitContainersToKeep)
// This is an optimization because container removals are typically handled
// by the container garbage collector.
m.pruneInitContainersBeforeStart(pod, podStatus)
// We pass the value of the podIP down to generatePodSandboxConfig and
// generateContainerConfig, which in turn passes it to various other
@@ -610,7 +596,7 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStat
// Step 4: Create a sandbox for the pod if necessary.
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox && len(podContainerChanges.ContainersToStart) > 0 {
if podContainerChanges.CreateSandbox {
var msg string
var err error
@@ -652,30 +638,11 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStat
return
}
// Step 5: start init containers.
status, next, done := findNextInitContainerToRun(pod, podStatus)
if status != nil && status.ExitCode != 0 {
// container initialization has failed, flag the pod as failed
initContainerResult := kubecontainer.NewSyncResult(kubecontainer.InitContainer, status.Name)
initContainerResult.Fail(kubecontainer.ErrRunInitContainer, fmt.Sprintf("init container %q exited with %d", status.Name, status.ExitCode))
result.AddSyncResult(initContainerResult)
if pod.Spec.RestartPolicy == v1.RestartPolicyNever {
utilruntime.HandleError(fmt.Errorf("error running pod %q init container %q, restart=Never: %#v", format.Pod(pod), status.Name, status))
return
}
utilruntime.HandleError(fmt.Errorf("Error running pod %q init container %q, restarting: %#v", format.Pod(pod), status.Name, status))
}
if next != nil {
if len(podContainerChanges.ContainersToStart) == 0 {
glog.V(4).Infof("No containers to start, stopping at init container %+v in pod %v", next.Name, format.Pod(pod))
return
}
// If we need to start the next container, do so now then exit
container := next
// Step 5: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
// Start the next init container.
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
result.AddSyncResult(startContainerResult)
isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
if isInBackOff {
startContainerResult.Fail(err, msg)
@@ -692,20 +659,10 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStat
// Successfully started the container; clear the entry in the failure
glog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
return
}
if !done {
// init container still running
glog.V(4).Infof("An init container is still running in pod %v", format.Pod(pod))
return
}
if podContainerChanges.InitFailed {
glog.V(4).Infof("Not all init containers have succeeded for pod %v", format.Pod(pod))
return
}
// Step 6: start containers in podContainerChanges.ContainersToStart.
for idx := range podContainerChanges.ContainersToStart {
for _, idx := range podContainerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
result.AddSyncResult(startContainerResult)


@@ -24,6 +24,7 @@ import (
cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -226,6 +227,40 @@ func verifyFakeContainerList(fakeRuntime *apitest.FakeRuntimeService, expected [
return actual, reflect.DeepEqual(expected, actual)
}
type containerRecord struct {
container *v1.Container
attempt uint32
state runtimeapi.ContainerState
}
// Only extract the fields of interest.
type cRecord struct {
name string
attempt uint32
state runtimeapi.ContainerState
}
type cRecordList []*cRecord
func (b cRecordList) Len() int { return len(b) }
func (b cRecordList) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b cRecordList) Less(i, j int) bool {
if b[i].name != b[j].name {
return b[i].name < b[j].name
}
return b[i].attempt < b[j].attempt
}
func verifyContainerStatuses(t *testing.T, runtime *apitest.FakeRuntimeService, expected []*cRecord, desc string) {
actual := []*cRecord{}
for _, cStatus := range runtime.Containers {
actual = append(actual, &cRecord{name: cStatus.Metadata.Name, attempt: cStatus.Metadata.Attempt, state: cStatus.State})
}
sort.Sort(cRecordList(expected))
sort.Sort(cRecordList(actual))
assert.Equal(t, expected, actual, desc)
}
func TestNewKubeRuntimeManager(t *testing.T) {
_, _, _, err := createTestRuntimeManager()
assert.NoError(t, err)
@@ -582,8 +617,7 @@ func TestPruneInitContainers(t *testing.T) {
podStatus, err := m.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
assert.NoError(t, err)
keep := map[kubecontainer.ContainerID]int{}
m.pruneInitContainersBeforeStart(pod, podStatus, keep)
m.pruneInitContainersBeforeStart(pod, podStatus)
expectedContainers := []string{fakes[0].Id, fakes[2].Id}
if actual, ok := verifyFakeContainerList(fakeRuntime, expectedContainers); !ok {
t.Errorf("expected %q, got %q", expectedContainers, actual)
@@ -625,18 +659,6 @@ func TestSyncPodWithInitContainers(t *testing.T) {
},
}
// buildContainerID is an internal helper function to build container id from api pod
// and container with default attempt number 0.
buildContainerID := func(pod *v1.Pod, container v1.Container) string {
uid := string(pod.UID)
sandboxID := apitest.BuildSandboxName(&runtimeapi.PodSandboxMetadata{
Name: pod.Name,
Uid: uid,
Namespace: pod.Namespace,
})
return apitest.BuildContainerName(&runtimeapi.ContainerMetadata{Name: container.Name}, sandboxID)
}
backOff := flowcontrol.NewBackOff(time.Second, time.Minute)
// 1. should only create the init container.
@@ -644,34 +666,406 @@ func TestSyncPodWithInitContainers(t *testing.T) {
assert.NoError(t, err)
result := m.SyncPod(pod, v1.PodStatus{}, podStatus, []v1.Secret{}, backOff)
assert.NoError(t, result.Error())
assert.Equal(t, 1, len(fakeRuntime.Containers))
initContainerID := buildContainerID(pod, initContainers[0])
expectedContainers := []string{initContainerID}
if actual, ok := verifyFakeContainerList(fakeRuntime, expectedContainers); !ok {
t.Errorf("expected %q, got %q", expectedContainers, actual)
expected := []*cRecord{
{name: initContainers[0].Name, attempt: 0, state: runtimeapi.ContainerState_CONTAINER_RUNNING},
}
verifyContainerStatuses(t, fakeRuntime, expected, "start only the init container")
// 2. should not create app container because init container is still running.
podStatus, err = m.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
assert.NoError(t, err)
result = m.SyncPod(pod, v1.PodStatus{}, podStatus, []v1.Secret{}, backOff)
assert.NoError(t, result.Error())
assert.Equal(t, 1, len(fakeRuntime.Containers))
expectedContainers = []string{initContainerID}
if actual, ok := verifyFakeContainerList(fakeRuntime, expectedContainers); !ok {
t.Errorf("expected %q, got %q", expectedContainers, actual)
}
verifyContainerStatuses(t, fakeRuntime, expected, "init container still running; do nothing")
// 3. should create all app containers because init container finished.
fakeRuntime.StopContainer(initContainerID, 0)
// Stop init container instance 0.
sandboxIDs, err := m.getSandboxIDByPodUID(pod.UID, nil)
require.NoError(t, err)
sandboxID := sandboxIDs[0]
initID0, err := fakeRuntime.GetContainerID(sandboxID, initContainers[0].Name, 0)
require.NoError(t, err)
fakeRuntime.StopContainer(initID0, 0)
// Sync again.
podStatus, err = m.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
assert.NoError(t, err)
result = m.SyncPod(pod, v1.PodStatus{}, podStatus, []v1.Secret{}, backOff)
assert.NoError(t, result.Error())
assert.Equal(t, 3, len(fakeRuntime.Containers))
expectedContainers = []string{initContainerID, buildContainerID(pod, containers[0]),
buildContainerID(pod, containers[1])}
if actual, ok := verifyFakeContainerList(fakeRuntime, expectedContainers); !ok {
t.Errorf("expected %q, got %q", expectedContainers, actual)
expected = []*cRecord{
{name: initContainers[0].Name, attempt: 0, state: runtimeapi.ContainerState_CONTAINER_EXITED},
{name: containers[0].Name, attempt: 0, state: runtimeapi.ContainerState_CONTAINER_RUNNING},
{name: containers[1].Name, attempt: 0, state: runtimeapi.ContainerState_CONTAINER_RUNNING},
}
verifyContainerStatuses(t, fakeRuntime, expected, "init container completed; all app containers should be running")
// 4. should restart the init container if needed to create a new pod sandbox.
// Stop the pod sandbox.
fakeRuntime.StopPodSandbox(sandboxID)
// Sync again.
podStatus, err = m.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
assert.NoError(t, err)
result = m.SyncPod(pod, v1.PodStatus{}, podStatus, []v1.Secret{}, backOff)
assert.NoError(t, result.Error())
expected = []*cRecord{
// The first init container instance is purged and no longer visible.
// The second (attempt == 1) instance has been started and is running.
{name: initContainers[0].Name, attempt: 1, state: runtimeapi.ContainerState_CONTAINER_RUNNING},
// All containers are killed.
{name: containers[0].Name, attempt: 0, state: runtimeapi.ContainerState_CONTAINER_EXITED},
{name: containers[1].Name, attempt: 0, state: runtimeapi.ContainerState_CONTAINER_EXITED},
}
verifyContainerStatuses(t, fakeRuntime, expected, "kill all app containers, purge the existing init container, and restart a new one")
}
// A helper function to get a basic pod and its status assuming the sandbox and
// all containers are running and ready.
func makeBasePodAndStatus() (*v1.Pod, *kubecontainer.PodStatus) {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "12345678",
Name: "foo",
Namespace: "foo-ns",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "foo1",
Image: "busybox",
},
{
Name: "foo2",
Image: "busybox",
},
{
Name: "foo3",
Image: "busybox",
},
},
},
}
status := &kubecontainer.PodStatus{
ID: pod.UID,
Name: pod.Name,
Namespace: pod.Namespace,
SandboxStatuses: []*runtimeapi.PodSandboxStatus{
{
Id: "sandboxID",
State: runtimeapi.PodSandboxState_SANDBOX_READY,
Metadata: &runtimeapi.PodSandboxMetadata{Name: pod.Name, Namespace: pod.Namespace, Uid: "sandboxuid", Attempt: uint32(0)},
},
},
ContainerStatuses: []*kubecontainer.ContainerStatus{
{
ID: kubecontainer.ContainerID{ID: "id1"},
Name: "foo1", State: kubecontainer.ContainerStateRunning,
Hash: kubecontainer.HashContainer(&pod.Spec.Containers[0]),
},
{
ID: kubecontainer.ContainerID{ID: "id2"},
Name: "foo2", State: kubecontainer.ContainerStateRunning,
Hash: kubecontainer.HashContainer(&pod.Spec.Containers[1]),
},
{
ID: kubecontainer.ContainerID{ID: "id3"},
Name: "foo3", State: kubecontainer.ContainerStateRunning,
Hash: kubecontainer.HashContainer(&pod.Spec.Containers[2]),
},
},
}
return pod, status
}
func TestComputePodActions(t *testing.T) {
_, _, m, err := createTestRuntimeManager()
require.NoError(t, err)
// Creating a pair of reference pod and status for the test cases to refer to
// the specific fields.
basePod, baseStatus := makeBasePodAndStatus()
noAction := podActions{
SandboxID: baseStatus.SandboxStatuses[0].Id,
ContainersToStart: []int{},
ContainersToKill: map[kubecontainer.ContainerID]containerToKillInfo{},
}
for desc, test := range map[string]struct {
mutatePodFn func(*v1.Pod)
mutateStatusFn func(*kubecontainer.PodStatus)
actions podActions
}{
"everying is good; do nothing": {
actions: noAction,
},
"start pod sandbox and all containers for a new pod": {
mutateStatusFn: func(status *kubecontainer.PodStatus) {
// No container or sandbox exists.
status.SandboxStatuses = []*runtimeapi.PodSandboxStatus{}
status.ContainerStatuses = []*kubecontainer.ContainerStatus{}
},
actions: podActions{
KillPod: true,
CreateSandbox: true,
Attempt: uint32(0),
ContainersToStart: []int{0, 1, 2},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
"restart exited containers if RestartPolicy == Always": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyAlways },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
// The first container completed, restart it,
status.ContainerStatuses[0].State = kubecontainer.ContainerStateExited
status.ContainerStatuses[0].ExitCode = 0
// The second container exited with failure, restart it,
status.ContainerStatuses[1].State = kubecontainer.ContainerStateExited
status.ContainerStatuses[1].ExitCode = 111
},
actions: podActions{
SandboxID: baseStatus.SandboxStatuses[0].Id,
ContainersToStart: []int{0, 1},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
"restart failed containers if RestartPolicy == OnFailure": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyOnFailure },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
// The first container completed, don't restart it,
status.ContainerStatuses[0].State = kubecontainer.ContainerStateExited
status.ContainerStatuses[0].ExitCode = 0
// The second container exited with failure, restart it,
status.ContainerStatuses[1].State = kubecontainer.ContainerStateExited
status.ContainerStatuses[1].ExitCode = 111
},
actions: podActions{
SandboxID: baseStatus.SandboxStatuses[0].Id,
ContainersToStart: []int{1},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
"don't restart containers if RestartPolicy == Never": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyNever },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
// Don't restart any containers.
status.ContainerStatuses[0].State = kubecontainer.ContainerStateExited
status.ContainerStatuses[0].ExitCode = 0
status.ContainerStatuses[1].State = kubecontainer.ContainerStateExited
status.ContainerStatuses[1].ExitCode = 111
},
actions: noAction,
},
"Kill pod and recreate everything if the pod sandbox is dead, and RestartPolicy == Always": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyAlways },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
status.SandboxStatuses[0].State = runtimeapi.PodSandboxState_SANDBOX_NOTREADY
},
actions: podActions{
KillPod: true,
CreateSandbox: true,
SandboxID: baseStatus.SandboxStatuses[0].Id,
Attempt: uint32(1),
ContainersToStart: []int{0, 1, 2},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
"Kill pod and recreate all containers (except for the succeeded one) if the pod sandbox is dead, and RestartPolicy == OnFailure": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyOnFailure },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
status.SandboxStatuses[0].State = runtimeapi.PodSandboxState_SANDBOX_NOTREADY
status.ContainerStatuses[1].State = kubecontainer.ContainerStateExited
status.ContainerStatuses[1].ExitCode = 0
},
actions: podActions{
KillPod: true,
CreateSandbox: true,
SandboxID: baseStatus.SandboxStatuses[0].Id,
Attempt: uint32(1),
ContainersToStart: []int{0, 2},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
"Kill and recreate the container if the container's spec changed": {
mutatePodFn: func(pod *v1.Pod) {
pod.Spec.RestartPolicy = v1.RestartPolicyAlways
},
mutateStatusFn: func(status *kubecontainer.PodStatus) {
status.ContainerStatuses[1].Hash = uint64(432423432)
},
actions: podActions{
SandboxID: baseStatus.SandboxStatuses[0].Id,
ContainersToKill: getKillMap(basePod, baseStatus, []int{1}),
ContainersToStart: []int{1},
},
// TODO: Add a test case for containers which failed the liveness
// check. Will need to fake the liveness check result.
},
} {
pod, status := makeBasePodAndStatus()
if test.mutatePodFn != nil {
test.mutatePodFn(pod)
}
if test.mutateStatusFn != nil {
test.mutateStatusFn(status)
}
actions := m.computePodActions(pod, status)
verifyActions(t, &test.actions, &actions, desc)
}
}
func getKillMap(pod *v1.Pod, status *kubecontainer.PodStatus, cIndexes []int) map[kubecontainer.ContainerID]containerToKillInfo {
m := map[kubecontainer.ContainerID]containerToKillInfo{}
for _, i := range cIndexes {
m[status.ContainerStatuses[i].ID] = containerToKillInfo{
container: &pod.Spec.Containers[i],
name: pod.Spec.Containers[i].Name,
}
}
return m
}
func verifyActions(t *testing.T, expected, actual *podActions, desc string) {
if actual.ContainersToKill != nil {
// Clear the message field since we don't need to verify the message.
for k, info := range actual.ContainersToKill {
info.message = ""
actual.ContainersToKill[k] = info
}
}
assert.Equal(t, expected, actual, desc)
}
func TestComputePodActionsWithInitContainers(t *testing.T) {
_, _, m, err := createTestRuntimeManager()
require.NoError(t, err)
// Creating a pair of reference pod and status for the test cases to refer to
// the specific fields.
basePod, baseStatus := makeBasePodAndStatusWithInitContainers()
noAction := podActions{
SandboxID: baseStatus.SandboxStatuses[0].Id,
ContainersToStart: []int{},
ContainersToKill: map[kubecontainer.ContainerID]containerToKillInfo{},
}
for desc, test := range map[string]struct {
mutatePodFn func(*v1.Pod)
mutateStatusFn func(*kubecontainer.PodStatus)
actions podActions
}{
"initialization completed; start all containers": {
actions: podActions{
SandboxID: baseStatus.SandboxStatuses[0].Id,
ContainersToStart: []int{0, 1, 2},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
"initialization in progress; do nothing": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyAlways },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
status.ContainerStatuses[2].State = kubecontainer.ContainerStateRunning
},
actions: noAction,
},
"Kill pod and restart the first init container if the pod sandbox is dead": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyAlways },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
status.SandboxStatuses[0].State = runtimeapi.PodSandboxState_SANDBOX_NOTREADY
},
actions: podActions{
KillPod: true,
CreateSandbox: true,
SandboxID: baseStatus.SandboxStatuses[0].Id,
Attempt: uint32(1),
NextInitContainerToStart: &basePod.Spec.InitContainers[0],
ContainersToStart: []int{},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
"initialization failed; restart the last init container if RestartPolicy == Always": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyAlways },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
status.ContainerStatuses[2].ExitCode = 137
},
actions: podActions{
SandboxID: baseStatus.SandboxStatuses[0].Id,
NextInitContainerToStart: &basePod.Spec.InitContainers[2],
ContainersToStart: []int{},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
"initialization failed; restart the last init container if RestartPolicy == OnFailure": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyOnFailure },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
status.ContainerStatuses[2].ExitCode = 137
},
actions: podActions{
SandboxID: baseStatus.SandboxStatuses[0].Id,
NextInitContainerToStart: &basePod.Spec.InitContainers[2],
ContainersToStart: []int{},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
"initialization failed; kill pod if RestartPolicy == Never": {
mutatePodFn: func(pod *v1.Pod) { pod.Spec.RestartPolicy = v1.RestartPolicyNever },
mutateStatusFn: func(status *kubecontainer.PodStatus) {
status.ContainerStatuses[2].ExitCode = 137
},
actions: podActions{
KillPod: true,
SandboxID: baseStatus.SandboxStatuses[0].Id,
ContainersToStart: []int{},
ContainersToKill: getKillMap(basePod, baseStatus, []int{}),
},
},
} {
pod, status := makeBasePodAndStatusWithInitContainers()
if test.mutatePodFn != nil {
test.mutatePodFn(pod)
}
if test.mutateStatusFn != nil {
test.mutateStatusFn(status)
}
actions := m.computePodActions(pod, status)
verifyActions(t, &test.actions, &actions, desc)
}
}
func makeBasePodAndStatusWithInitContainers() (*v1.Pod, *kubecontainer.PodStatus) {
pod, status := makeBasePodAndStatus()
pod.Spec.InitContainers = []v1.Container{
{
Name: "init1",
Image: "bar-image",
},
{
Name: "init2",
Image: "bar-image",
},
{
Name: "init3",
Image: "bar-image",
},
}
// Replace the original statuses of the containers with those for the init
// containers.
status.ContainerStatuses = []*kubecontainer.ContainerStatus{
{
ID: kubecontainer.ContainerID{ID: "initid1"},
Name: "init1", State: kubecontainer.ContainerStateExited,
Hash: kubecontainer.HashContainer(&pod.Spec.InitContainers[0]),
},
{
ID: kubecontainer.ContainerID{ID: "initid2"},
Name: "init2", State: kubecontainer.ContainerStateExited,
Hash: kubecontainer.HashContainer(&pod.Spec.InitContainers[1]),
},
{
ID: kubecontainer.ContainerID{ID: "initid3"},
Name: "init3", State: kubecontainer.ContainerStateExited,
Hash: kubecontainer.HashContainer(&pod.Spec.InitContainers[2]),
},
}
return pod, status
}