Kubelet should garbage collect dead pods

The sync loop should check for pods that are marked for deletion and
no longer running, and clear them. The status loop should never write
status once the pod UID has changed, since that means the pod was
deleted and recreated under the same name. Mirror pods should be
deleted immediately rather than gracefully.
pull/6/head
Clayton Coleman 2015-07-28 16:06:05 -04:00
parent 71d10c6c7a
commit 780accb3ba
5 changed files with 91 additions and 1 deletion

@@ -1261,6 +1261,10 @@ func (dm *DockerManager) killContainer(containerID types.UID, container *api.Con
 		gracePeriod = minimumGracePeriodInSeconds
 	}
 	err := dm.client.StopContainer(ID, uint(gracePeriod))
+	if _, ok := err.(*docker.ContainerNotRunning); ok && err != nil {
+		glog.V(4).Infof("Container %q has already exited", name)
+		return nil
+	}
 	if err == nil {
 		glog.V(2).Infof("Container %q exited after %s", name, util.Now().Sub(start.Time))
 	} else {
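
The added block treats a StopContainer failure as success when the error's
concrete type says the container already exited, which makes killContainer
idempotent. Below is a minimal, self-contained sketch of that
error-type-assertion pattern; ErrNotRunning and stopContainer are
hypothetical stand-ins for go-dockerclient's docker.ContainerNotRunning and
the real client call.

package main

import "fmt"

// ErrNotRunning stands in for the concrete error type the client returns
// when a container has already exited.
type ErrNotRunning struct{ ID string }

func (e *ErrNotRunning) Error() string { return "container not running: " + e.ID }

// stopContainer simulates a stop call that fails because the container is
// already gone.
func stopContainer(id string) error {
	return &ErrNotRunning{ID: id}
}

// killContainer is idempotent: "already exited" is reported as success.
func killContainer(id string) error {
	err := stopContainer(id)
	if _, ok := err.(*ErrNotRunning); ok {
		fmt.Printf("container %q has already exited\n", id)
		return nil
	}
	return err
}

func main() {
	if err := killContainer("abc123"); err != nil {
		fmt.Println("kill failed:", err)
	}
}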

@@ -1367,6 +1367,32 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco
 	return nil
 }
 
+// Delete any pods that are no longer running and are marked for deletion.
+func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
+	var terminating []*api.Pod
+	for _, pod := range pods {
+		if pod.DeletionTimestamp != nil {
+			found := false
+			for _, runningPod := range runningPods {
+				if runningPod.ID == pod.UID {
+					found = true
+					break
+				}
+			}
+			if found {
+				podFullName := kubecontainer.GetPodFullName(pod)
+				glog.V(5).Infof("Keeping terminated pod %q and uid %q, still running", podFullName, pod.UID)
+				continue
+			}
+			terminating = append(terminating, pod)
+		}
+	}
+	if !kl.statusManager.TerminatePods(terminating) {
+		return errors.New("not all pods were successfully terminated")
+	}
+	return nil
+}
+
 // pastActiveDeadline returns true if the pod has been active for more than
 // ActiveDeadlineSeconds.
 func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
@@ -1529,6 +1555,10 @@ func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) erro
 	// Remove any orphaned mirror pods.
 	kl.podManager.DeleteOrphanedMirrorPods()
 
+	if err := kl.cleanupTerminatedPods(allPods, runningPods); err != nil {
+		glog.Errorf("Failed to cleanup terminated pods: %v", err)
+	}
+
 	return err
 }
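
cleanupTerminatedPods keeps a deleted pod as long as any running pod still
carries its UID, and hands the rest to the status manager. Below is a
self-contained sketch of that filter, with simplified stand-ins for api.Pod
and kubecontainer.Pod; the patch uses a nested loop, while the sketch builds
a UID set first, which is an equivalent formulation.

package main

import "fmt"

// pod is a simplified stand-in for api.Pod.
type pod struct {
	UID     string
	Deleted bool // stands in for DeletionTimestamp != nil
}

// runningPod is a simplified stand-in for kubecontainer.Pod.
type runningPod struct{ ID string }

// terminatedCandidates returns the deleted pods that have nothing running.
func terminatedCandidates(pods []pod, running []runningPod) []pod {
	runningUIDs := make(map[string]bool, len(running))
	for _, r := range running {
		runningUIDs[r.ID] = true
	}
	var terminating []pod
	for _, p := range pods {
		if p.Deleted && !runningUIDs[p.UID] {
			terminating = append(terminating, p)
		}
	}
	return terminating
}

func main() {
	pods := []pod{{UID: "a", Deleted: true}, {UID: "b", Deleted: true}, {UID: "c"}}
	running := []runningPod{{ID: "a"}}
	// Only "b" qualifies: "a" is still running, "c" is not marked deleted.
	fmt.Println(terminatedCandidates(pods, running))
}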

@@ -64,7 +64,7 @@ func (mc *basicMirrorClient) DeleteMirrorPod(podFullName string) error {
 		return err
 	}
 	glog.V(4).Infof("Deleting a mirror pod %q", podFullName)
-	if err := mc.apiserverClient.Pods(namespace).Delete(name, nil); err != nil {
+	if err := mc.apiserverClient.Pods(namespace).Delete(name, api.NewDeleteOptions(0)); err != nil {
 		glog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err)
 	}
 	return nil
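
The one-line change above swaps a nil options argument, which lets the
apiserver apply its default graceful-deletion period, for
api.NewDeleteOptions(0), which requests immediate deletion. Here is a sketch
of what that option expresses, using simplified stand-in types for the real
API structs.

package main

import "fmt"

// DeleteOptions is a simplified stand-in for the API type of the same name.
type DeleteOptions struct {
	// nil means "use the server default"; an explicit 0 means "delete now".
	GracePeriodSeconds *int64
}

// NewDeleteOptions mirrors the shape of api.NewDeleteOptions: it pins the
// grace period to an explicit value.
func NewDeleteOptions(grace int64) *DeleteOptions {
	return &DeleteOptions{GracePeriodSeconds: &grace}
}

func main() {
	opts := NewDeleteOptions(0)
	fmt.Println(*opts.GracePeriodSeconds) // 0: the mirror pod is removed immediately
}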

@@ -131,6 +131,29 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 	}
 }
 
+// TerminatePods resets the container status for the provided pods to terminated and triggers
+// a status update. This function may not enqueue all the provided pods, in which case it will
+// return false
+func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
+	sent := true
+	s.podStatusesLock.Lock()
+	defer s.podStatusesLock.Unlock()
+	for _, pod := range pods {
+		for i := range pod.Status.ContainerStatuses {
+			pod.Status.ContainerStatuses[i].State = api.ContainerState{
+				Terminated: &api.ContainerStateTerminated{},
+			}
+		}
+		select {
+		case s.podStatusChannel <- podStatusSyncRequest{pod, pod.Status}:
+		default:
+			sent = false
+			glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletUtil.FormatPodName(pod))
+		}
+	}
+	return sent
+}
+
 func (s *statusManager) DeletePodStatus(podFullName string) {
 	s.podStatusesLock.Lock()
 	defer s.podStatusesLock.Unlock()
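
TerminatePods relies on a select with a default case: when the status
channel's buffer is full the send falls through to default, the notice is
dropped, and the boolean result reports the loss instead of blocking the
caller. A minimal, self-contained sketch of that non-blocking send:

package main

import "fmt"

// trySend attempts a send without blocking; it reports whether the value
// was enqueued.
func trySend(ch chan string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false // buffer full: drop the message, let the caller decide
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(trySend(ch, "first"))  // true: one buffered slot is free
	fmt.Println(trySend(ch, "second")) // false: the buffer is already full
}
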
@@ -167,6 +190,10 @@ func (s *statusManager) syncBatch() error {
 		return nil
 	}
 	if err == nil {
+		if len(pod.UID) > 0 && statusPod.UID != pod.UID {
+			glog.V(3).Infof("Pod %q was deleted and then recreated, skipping status update", kubeletUtil.FormatPodName(pod))
+			return nil
+		}
 		statusPod.Status = status
 		// TODO: handle conflict as a retry, make that easier too.
 		statusPod, err = s.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
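
The guard added to syncBatch compares the UID of the pod fetched from the
apiserver with the UID the status was recorded under; a mismatch means the
pod was deleted and recreated with the same name, so the stale status is
skipped. A simplified sketch of that check, where podRef is a hypothetical
stand-in for the relevant api.Pod fields:

package main

import "fmt"

// podRef is a hypothetical stand-in for the name/UID pair on api.Pod.
type podRef struct {
	Name string
	UID  string
}

// shouldWriteStatus reports whether a cached status may be written to the
// live object; an empty cached UID skips the check, as in the patch.
func shouldWriteStatus(cached, live podRef) bool {
	if cached.UID != "" && live.UID != cached.UID {
		return false // same name, different pod: the update is stale
	}
	return true
}

func main() {
	cached := podRef{Name: "web-0", UID: "first"}
	live := podRef{Name: "web-0", UID: "second"} // deleted and recreated
	fmt.Println(shouldWriteStatus(cached, live)) // false
}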

@@ -153,8 +153,21 @@ func TestUnchangedStatus(t *testing.T) {
 	verifyUpdates(t, syncer, 1)
 }
 
+func TestSyncBatchIgnoresNotFound(t *testing.T) {
+	syncer := newTestStatusManager()
+	syncer.SetPodStatus(testPod, getRandomPodStatus())
+	err := syncer.syncBatch()
+	if err != nil {
+		t.Errorf("unexpected syncing error: %v", err)
+	}
+	verifyActions(t, syncer.kubeClient, []testclient.Action{
+		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
+	})
+}
+
 func TestSyncBatch(t *testing.T) {
 	syncer := newTestStatusManager()
+	syncer.kubeClient = testclient.NewSimpleFake(testPod)
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
 	err := syncer.syncBatch()
 	if err != nil {
@@ -167,6 +180,22 @@ func TestSyncBatch(t *testing.T) {
 	)
 }
 
+func TestSyncBatchChecksMismatchedUID(t *testing.T) {
+	syncer := newTestStatusManager()
+	testPod.UID = "first"
+	differentPod := *testPod
+	differentPod.UID = "second"
+	syncer.kubeClient = testclient.NewSimpleFake(testPod)
+	syncer.SetPodStatus(&differentPod, getRandomPodStatus())
+	err := syncer.syncBatch()
+	if err != nil {
+		t.Errorf("unexpected syncing error: %v", err)
+	}
+	verifyActions(t, syncer.kubeClient, []testclient.Action{
+		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
+	})
+}
+
 // shuffle returns a new shuffled list of container statuses.
 func shuffle(statuses []api.ContainerStatus) []api.ContainerStatus {
 	numStatuses := len(statuses)