mirror of https://github.com/k3s-io/k3s
Merge pull request #7301 from yujuhong/no_resurrection
Kubelet: filter out terminated pods in SyncPodspull/6/head
commit
d0288f7143
|
@ -1430,6 +1430,28 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, running []*docker.Con
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Filter out pods in the terminated state ("Failed" or "Succeeded").
|
||||||
|
func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod {
|
||||||
|
var pods []*api.Pod
|
||||||
|
for _, pod := range allPods {
|
||||||
|
var status api.PodStatus
|
||||||
|
// Check the cached pod status which was set after the last sync.
|
||||||
|
status, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
|
||||||
|
if !ok {
|
||||||
|
// If there is no cached status, use the status from the
|
||||||
|
// apiserver. This is useful if kubelet has recently been
|
||||||
|
// restarted.
|
||||||
|
status = pod.Status
|
||||||
|
}
|
||||||
|
if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
|
||||||
|
// Pod has reached the final state; ignore it.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
pods = append(pods, pod)
|
||||||
|
}
|
||||||
|
return pods
|
||||||
|
}
|
||||||
|
|
||||||
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
|
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
|
||||||
func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType,
|
func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType,
|
||||||
mirrorPods map[string]*api.Pod, start time.Time) error {
|
mirrorPods map[string]*api.Pod, start time.Time) error {
|
||||||
|
@ -1444,16 +1466,20 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri
|
||||||
}
|
}
|
||||||
kl.statusManager.RemoveOrphanedStatuses(podFullNames)
|
kl.statusManager.RemoveOrphanedStatuses(podFullNames)
|
||||||
|
|
||||||
// Filter out the rejected pod. They don't have running containers.
|
// Reject pods that we cannot run.
|
||||||
kl.handleNotFittingPods(allPods)
|
kl.handleNotFittingPods(allPods)
|
||||||
var pods []*api.Pod
|
|
||||||
for _, pod := range allPods {
|
// Pod phase progresses monotonically. Once a pod has reached a final state,
|
||||||
status, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
|
// it should never leave irregardless of the restart policy. The statuses
|
||||||
if ok && status.Phase == api.PodFailed {
|
// of such pods should not be changed, and there is no need to sync them.
|
||||||
continue
|
// TODO: the logic here does not handle two cases:
|
||||||
}
|
// 1. If the containers were removed immediately after they died, kubelet
|
||||||
pods = append(pods, pod)
|
// may fail to generate correct statuses, let alone filtering correctly.
|
||||||
}
|
// 2. If kubelet restarted before writing the terminated status for a pod
|
||||||
|
// to the apiserver, it could still restart the terminated pod (even
|
||||||
|
// though the pod was not considered terminated by the apiserver).
|
||||||
|
// These two conditions could be alleviated by checkpointing kubelet.
|
||||||
|
pods := kl.filterOutTerminatedPods(allPods)
|
||||||
|
|
||||||
glog.V(4).Infof("Desired: %#v", pods)
|
glog.V(4).Infof("Desired: %#v", pods)
|
||||||
var err error
|
var err error
|
||||||
|
|
|
@ -168,6 +168,18 @@ func verifyBoolean(t *testing.T, expected, value bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newTestPods(count int) []*api.Pod {
|
||||||
|
pods := make([]*api.Pod, count)
|
||||||
|
for i := 0; i < count; i++ {
|
||||||
|
pods[i] = &api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: fmt.Sprintf("pod%d", i),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return pods
|
||||||
|
}
|
||||||
|
|
||||||
func TestKubeletDirs(t *testing.T) {
|
func TestKubeletDirs(t *testing.T) {
|
||||||
testKubelet := newTestKubelet(t)
|
testKubelet := newTestKubelet(t)
|
||||||
kubelet := testKubelet.kubelet
|
kubelet := testKubelet.kubelet
|
||||||
|
@ -4246,3 +4258,20 @@ func TestGetRestartCount(t *testing.T) {
|
||||||
fakeDocker.ExitedContainerList = []docker.APIContainers{}
|
fakeDocker.ExitedContainerList = []docker.APIContainers{}
|
||||||
verifyRestartCount(&pod, 2)
|
verifyRestartCount(&pod, 2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFilterOutTerminatedPods(t *testing.T) {
|
||||||
|
testKubelet := newTestKubelet(t)
|
||||||
|
kubelet := testKubelet.kubelet
|
||||||
|
pods := newTestPods(5)
|
||||||
|
pods[0].Status.Phase = api.PodFailed
|
||||||
|
pods[1].Status.Phase = api.PodSucceeded
|
||||||
|
pods[2].Status.Phase = api.PodRunning
|
||||||
|
pods[3].Status.Phase = api.PodPending
|
||||||
|
|
||||||
|
expected := []*api.Pod{pods[2], pods[3], pods[4]}
|
||||||
|
kubelet.podManager.SetPods(pods)
|
||||||
|
actual := kubelet.filterOutTerminatedPods(pods)
|
||||||
|
if !reflect.DeepEqual(expected, actual) {
|
||||||
|
t.Errorf("expected %#v, got %#v", expected, actual)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue