kubelet: filter out terminated pods in SyncPods

Once a pod reaches a terminated state (whether failed or succeeded), it should
never transition out of that state again. Currently, kubelet relies on examining
dead containers to verify that the pod has already been run. This is fine in
most cases, but if the dead containers were garbage collected, kubelet may
falsely conclude that the pod has never been run and would then try to restart
all of its containers.

This change eliminates most such cases by filtering out pods that are already in
a final state before sending updates to the per-pod workers.
Yu-Ju Hong 2015-04-24 11:20:23 -07:00
parent ee5cad84e0
commit b0e6926f67
2 changed files with 64 additions and 9 deletions
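
Editor's note: a minimal, self-contained sketch of the filtering idea, for illustration only. The Pod and PodPhase types and the cached map below are simplified stand-ins for the real api package and the kubelet's status manager; the actual implementation is in the diff that follows.

// Sketch only: stand-in types, not the kubelet's real api.Pod or status manager.
package main

import "fmt"

type PodPhase string

const (
	PodPending   PodPhase = "Pending"
	PodRunning   PodPhase = "Running"
	PodSucceeded PodPhase = "Succeeded"
	PodFailed    PodPhase = "Failed"
)

type Pod struct {
	Name  string
	Phase PodPhase // phase last reported by the apiserver
}

// filterOutTerminatedPods drops pods that have reached a final phase,
// preferring the locally cached status (set after the last sync) and
// falling back to the apiserver's copy when no cached entry exists.
func filterOutTerminatedPods(allPods []*Pod, cached map[string]PodPhase) []*Pod {
	var pods []*Pod
	for _, pod := range allPods {
		phase, ok := cached[pod.Name]
		if !ok {
			// No cached status, e.g. the kubelet just restarted.
			phase = pod.Phase
		}
		if phase == PodFailed || phase == PodSucceeded {
			continue // terminal; never hand this pod to a per-pod worker again
		}
		pods = append(pods, pod)
	}
	return pods
}

func main() {
	pods := []*Pod{
		{Name: "a", Phase: PodFailed},
		{Name: "b", Phase: PodRunning},
		{Name: "c", Phase: PodPending},
	}
	// "b" already terminated according to the local cache, even though the
	// apiserver still reports it as Running, so it is filtered out too.
	cached := map[string]PodPhase{"b": PodSucceeded}
	for _, p := range filterOutTerminatedPods(pods, cached) {
		fmt.Println(p.Name) // prints only "c"
	}
}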

pkg/kubelet/kubelet.go

@@ -1430,6 +1430,28 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, running []*docker.Con
 	return nil
 }
 
+// Filter out pods in the terminated state ("Failed" or "Succeeded").
+func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod {
+	var pods []*api.Pod
+	for _, pod := range allPods {
+		var status api.PodStatus
+		// Check the cached pod status which was set after the last sync.
+		status, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
+		if !ok {
+			// If there is no cached status, use the status from the
+			// apiserver. This is useful if kubelet has recently been
+			// restarted.
+			status = pod.Status
+		}
+		if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
+			// Pod has reached the final state; ignore it.
+			continue
+		}
+		pods = append(pods, pod)
+	}
+	return pods
+}
+
 // SyncPods synchronizes the configured list of pods (desired state) with the host current state.
 func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType,
 	mirrorPods map[string]*api.Pod, start time.Time) error {
@@ -1444,16 +1466,20 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri
 	}
 	kl.statusManager.RemoveOrphanedStatuses(podFullNames)
 
-	// Filter out the rejected pod. They don't have running containers.
+	// Reject pods that we cannot run.
 	kl.handleNotFittingPods(allPods)
-	var pods []*api.Pod
-	for _, pod := range allPods {
-		status, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
-		if ok && status.Phase == api.PodFailed {
-			continue
-		}
-		pods = append(pods, pod)
-	}
+
+	// Pod phase progresses monotonically. Once a pod has reached a final state,
+	// it should never leave regardless of the restart policy. The statuses
+	// of such pods should not be changed, and there is no need to sync them.
+	// TODO: the logic here does not handle two cases:
+	//   1. If the containers were removed immediately after they died, kubelet
+	//      may fail to generate correct statuses, let alone filtering correctly.
+	//   2. If kubelet restarted before writing the terminated status for a pod
+	//      to the apiserver, it could still restart the terminated pod (even
+	//      though the pod was not considered terminated by the apiserver).
+	// These two conditions could be alleviated by checkpointing kubelet.
+	pods := kl.filterOutTerminatedPods(allPods)
 
 	glog.V(4).Infof("Desired: %#v", pods)
 	var err error

pkg/kubelet/kubelet_test.go

@@ -168,6 +168,18 @@ func verifyBoolean(t *testing.T, expected, value bool) {
 	}
 }
 
+func newTestPods(count int) []*api.Pod {
+	pods := make([]*api.Pod, count)
+	for i := 0; i < count; i++ {
+		pods[i] = &api.Pod{
+			ObjectMeta: api.ObjectMeta{
+				Name: fmt.Sprintf("pod%d", i),
+			},
+		}
+	}
+	return pods
+}
+
 func TestKubeletDirs(t *testing.T) {
 	testKubelet := newTestKubelet(t)
 	kubelet := testKubelet.kubelet
@@ -4246,3 +4258,20 @@ func TestGetRestartCount(t *testing.T) {
 	fakeDocker.ExitedContainerList = []docker.APIContainers{}
 	verifyRestartCount(&pod, 2)
 }
+
+func TestFilterOutTerminatedPods(t *testing.T) {
+	testKubelet := newTestKubelet(t)
+	kubelet := testKubelet.kubelet
+	pods := newTestPods(5)
+	pods[0].Status.Phase = api.PodFailed
+	pods[1].Status.Phase = api.PodSucceeded
+	pods[2].Status.Phase = api.PodRunning
+	pods[3].Status.Phase = api.PodPending
+
+	expected := []*api.Pod{pods[2], pods[3], pods[4]}
+	kubelet.podManager.SetPods(pods)
+	actual := kubelet.filterOutTerminatedPods(pods)
+	if !reflect.DeepEqual(expected, actual) {
+		t.Errorf("expected %#v, got %#v", expected, actual)
+	}
+}
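
Editor's note: for illustration only, the same behavior could also be exercised phase by phase with a table-driven test. The sketch below reuses only helpers visible in this diff (newTestKubelet, newTestPods, podManager.SetPods, filterOutTerminatedPods); the test name and structure are hypothetical and not part of the change.

// Sketch only: a hypothetical per-phase variant of the test above.
func TestFilterOutTerminatedPodsPerPhase(t *testing.T) {
	testKubelet := newTestKubelet(t)
	kubelet := testKubelet.kubelet
	cases := []struct {
		phase    api.PodPhase
		filtered bool
	}{
		{api.PodFailed, true},
		{api.PodSucceeded, true},
		{api.PodRunning, false},
		{api.PodPending, false},
	}
	for i, tc := range cases {
		pods := newTestPods(1)
		pods[0].Status.Phase = tc.phase
		kubelet.podManager.SetPods(pods)
		filtered := len(kubelet.filterOutTerminatedPods(pods)) == 0
		if filtered != tc.filtered {
			t.Errorf("case %d (phase %q): filtered=%v, want %v", i, tc.phase, filtered, tc.filtered)
		}
	}
}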