From b0e6926f67048f0cf549bcfe4c55be67b490c2d5 Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Fri, 24 Apr 2015 11:20:23 -0700 Subject: [PATCH] kubelet: filter out terminated pods in SyncPods Once a pod reaches a terminated state (whether failed or succeeded), it should not transit out ever again. Currently, kubelet relies on examining the dead containers to verify that the container has already been run. This is fine in most cases, but if the dead containers were garbage collected, kubelet may falsely concluded that the pod has never been run. It would then try to restart all the containers. This change eliminates most of such possibilities by pre-filtering out the pods in the final states before sending updates to per-pod workers. --- pkg/kubelet/kubelet.go | 44 +++++++++++++++++++++++++++++-------- pkg/kubelet/kubelet_test.go | 29 ++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 9 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index a82e6c6d39..ad67a9efbb 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1430,6 +1430,28 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, running []*docker.Con return nil } +// Filter out pods in the terminated state ("Failed" or "Succeeded"). +func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod { + var pods []*api.Pod + for _, pod := range allPods { + var status api.PodStatus + // Check the cached pod status which was set after the last sync. + status, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod)) + if !ok { + // If there is no cached status, use the status from the + // apiserver. This is useful if kubelet has recently been + // restarted. + status = pod.Status + } + if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded { + // Pod has reached the final state; ignore it. + continue + } + pods = append(pods, pod) + } + return pods +} + // SyncPods synchronizes the configured list of pods (desired state) with the host current state. func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods map[string]*api.Pod, start time.Time) error { @@ -1444,16 +1466,20 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri } kl.statusManager.RemoveOrphanedStatuses(podFullNames) - // Filter out the rejected pod. They don't have running containers. + // Reject pods that we cannot run. kl.handleNotFittingPods(allPods) - var pods []*api.Pod - for _, pod := range allPods { - status, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod)) - if ok && status.Phase == api.PodFailed { - continue - } - pods = append(pods, pod) - } + + // Pod phase progresses monotonically. Once a pod has reached a final state, + // it should never leave irregardless of the restart policy. The statuses + // of such pods should not be changed, and there is no need to sync them. + // TODO: the logic here does not handle two cases: + // 1. If the containers were removed immediately after they died, kubelet + // may fail to generate correct statuses, let alone filtering correctly. + // 2. If kubelet restarted before writing the terminated status for a pod + // to the apiserver, it could still restart the terminated pod (even + // though the pod was not considered terminated by the apiserver). + // These two conditions could be alleviated by checkpointing kubelet. + pods := kl.filterOutTerminatedPods(allPods) glog.V(4).Infof("Desired: %#v", pods) var err error diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index bb8c934e7f..45e4358487 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -168,6 +168,18 @@ func verifyBoolean(t *testing.T, expected, value bool) { } } +func newTestPods(count int) []*api.Pod { + pods := make([]*api.Pod, count) + for i := 0; i < count; i++ { + pods[i] = &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: fmt.Sprintf("pod%d", i), + }, + } + } + return pods +} + func TestKubeletDirs(t *testing.T) { testKubelet := newTestKubelet(t) kubelet := testKubelet.kubelet @@ -4246,3 +4258,20 @@ func TestGetRestartCount(t *testing.T) { fakeDocker.ExitedContainerList = []docker.APIContainers{} verifyRestartCount(&pod, 2) } + +func TestFilterOutTerminatedPods(t *testing.T) { + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + pods := newTestPods(5) + pods[0].Status.Phase = api.PodFailed + pods[1].Status.Phase = api.PodSucceeded + pods[2].Status.Phase = api.PodRunning + pods[3].Status.Phase = api.PodPending + + expected := []*api.Pod{pods[2], pods[3], pods[4]} + kubelet.podManager.SetPods(pods) + actual := kubelet.filterOutTerminatedPods(pods) + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %#v, got %#v", expected, actual) + } +}