Move active deadline check into main syncLoop of kubelet.

pull/6/head
Lantao Liu 2015-11-04 21:59:15 -08:00
parent 0932755036
commit d6b93cdfe1
2 changed files with 82 additions and 7 deletions

View File

@ -1513,6 +1513,9 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
} else {
var err error
podStatus, err = kl.generatePodStatus(pod)
// TODO (random-liu) It's strange that generatePodStatus generates some podStatus in
// the phase Failed, Pending etc, even with empty ContainerStatuses but still keep going
// on. Maybe need refactor here.
if err != nil {
glog.Errorf("Unable to get status for pod %q (uid %q): %v", podFullName, uid, err)
return err
@ -1764,6 +1767,31 @@ func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
return false
}
// Get pods which should be resynchronized. Currently, the following pod should be resynchronized:
// * pod whose work is ready.
// * pod past the active deadline.
func (kl *Kubelet) getPodsToSync() []*api.Pod {
allPods := kl.podManager.GetPods()
podUIDs := kl.workQueue.GetWork()
podUIDSet := sets.NewString()
for _, podUID := range podUIDs {
podUIDSet.Insert(string(podUID))
}
var podsToSync []*api.Pod
for _, pod := range allPods {
if kl.pastActiveDeadline(pod) {
// The pod has passed the active deadline
podsToSync = append(podsToSync, pod)
continue
}
if podUIDSet.Has(string(pod.UID)) {
// The work of the pod is ready
podsToSync = append(podsToSync, pod)
}
}
return podsToSync
}
// Returns true if pod is in the terminated state ("Failed" or "Succeeded").
func (kl *Kubelet) podIsTerminated(pod *api.Pod) bool {
var status api.PodStatus
@ -2118,13 +2146,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
glog.Errorf("Kubelet does not support snapshot update")
}
case <-syncCh:
podUIDs := kl.workQueue.GetWork()
var podsToSync []*api.Pod
for _, uid := range podUIDs {
if pod, ok := kl.podManager.GetPodByUID(uid); ok {
podsToSync = append(podsToSync, pod)
}
}
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
break
}

View File

@ -4014,3 +4014,56 @@ func TestExtractBandwidthResources(t *testing.T) {
}
}
}
func TestGetPodsToSync(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
pods := newTestPods(5)
podUIDs := []types.UID{}
for _, pod := range pods {
podUIDs = append(podUIDs, pod.UID)
}
exceededActiveDeadlineSeconds := int64(30)
notYetActiveDeadlineSeconds := int64(120)
now := unversioned.Now()
startTime := unversioned.NewTime(now.Time.Add(-1 * time.Minute))
pods[0].Status.StartTime = &startTime
pods[0].Spec.ActiveDeadlineSeconds = &exceededActiveDeadlineSeconds
pods[1].Status.StartTime = &startTime
pods[1].Spec.ActiveDeadlineSeconds = &notYetActiveDeadlineSeconds
pods[2].Status.StartTime = &startTime
pods[2].Spec.ActiveDeadlineSeconds = &exceededActiveDeadlineSeconds
kubelet.podManager.SetPods(pods)
kubelet.workQueue.Enqueue(pods[2].UID, 0)
kubelet.workQueue.Enqueue(pods[3].UID, 0)
kubelet.workQueue.Enqueue(pods[4].UID, time.Hour)
expectedPodsUID := []types.UID{pods[0].UID, pods[2].UID, pods[3].UID}
podsToSync := kubelet.getPodsToSync()
if len(podsToSync) == len(expectedPodsUID) {
var rightNum int
for _, podUID := range expectedPodsUID {
for _, podToSync := range podsToSync {
if podToSync.UID == podUID {
rightNum++
break
}
}
}
if rightNum != len(expectedPodsUID) {
// Just for report error
podsToSyncUID := []types.UID{}
for _, podToSync := range podsToSync {
podsToSyncUID = append(podsToSyncUID, podToSync.UID)
}
t.Errorf("expected pods %v to sync, got %v", expectedPodsUID, podsToSyncUID)
}
} else {
t.Errorf("expected %d pods to sync, got %d", 3, len(podsToSync))
}
}