mirror of https://github.com/k3s-io/k3s
Kubelet: move active deadline check to per pod worker
Per-pod workers have sufficient knowledge to determine whether a pod has exceeded the active deadline, and they set the status at the end of each sync. Move the active deadline check to generatePodStatus so that per pod workers can update the pod status directly. This eliminates the possibility of a race condition where both SyncPods and the pod worker are updating the status, which could lead to temporary erratic pod status behavior (pod phase: failed -> running -> failed).pull/6/head
parent
25668ccc11
commit
050b8ba60b
|
@ -1151,37 +1151,33 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco
|
|||
return nil
|
||||
}
|
||||
|
||||
// filterOutPodsPastActiveDeadline filters pods with an ActiveDeadlineSeconds value that has been exceeded.
|
||||
// It records an event that the pod has been active longer than the allocated time, and updates the pod status as failed.
|
||||
// By filtering the pod from the result set, the Kubelet will kill the pod's containers as part of normal SyncPods workflow.
|
||||
func (kl *Kubelet) filterOutPodsPastActiveDeadline(allPods []*api.Pod) (pods []*api.Pod) {
|
||||
// pastActiveDeadline returns true if the pod has been active for more than
|
||||
// ActiveDeadlineSeconds.
|
||||
func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
|
||||
now := util.Now()
|
||||
for _, pod := range allPods {
|
||||
keepPod := true
|
||||
if pod.Spec.ActiveDeadlineSeconds != nil {
|
||||
podStatus, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
|
||||
if !ok {
|
||||
podStatus = pod.Status
|
||||
}
|
||||
if !podStatus.StartTime.IsZero() {
|
||||
startTime := podStatus.StartTime.Time
|
||||
duration := now.Time.Sub(startTime)
|
||||
allowedDuration := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second
|
||||
if duration >= allowedDuration {
|
||||
keepPod = false
|
||||
}
|
||||
}
|
||||
if pod.Spec.ActiveDeadlineSeconds != nil {
|
||||
podStatus, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
|
||||
if !ok {
|
||||
podStatus = pod.Status
|
||||
}
|
||||
if keepPod {
|
||||
pods = append(pods, pod)
|
||||
} else {
|
||||
kl.recorder.Eventf(pod, "deadline", "Pod was active on the node longer than specified deadline")
|
||||
kl.statusManager.SetPodStatus(pod, api.PodStatus{
|
||||
Phase: api.PodFailed,
|
||||
Message: "Pod was active on the node longer than specified deadline"})
|
||||
if !podStatus.StartTime.IsZero() {
|
||||
startTime := podStatus.StartTime.Time
|
||||
duration := now.Time.Sub(startTime)
|
||||
allowedDuration := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second
|
||||
if duration >= allowedDuration {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return pods
|
||||
return false
|
||||
}
|
||||
|
||||
//podIsTerminated returns true if status is in one of the terminated state.
|
||||
func podIsTerminated(status *api.PodStatus) bool {
|
||||
if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Filter out pods in the terminated state ("Failed" or "Succeeded").
|
||||
|
@ -1197,8 +1193,7 @@ func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod {
|
|||
// restarted.
|
||||
status = pod.Status
|
||||
}
|
||||
if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
|
||||
// Pod has reached the final state; ignore it.
|
||||
if podIsTerminated(&status) {
|
||||
continue
|
||||
}
|
||||
pods = append(pods, pod)
|
||||
|
@ -1487,8 +1482,6 @@ func (kl *Kubelet) admitPods(allPods []*api.Pod, podSyncTypes map[types.UID]metr
|
|||
// These two conditions could be alleviated by checkpointing kubelet.
|
||||
pods := kl.filterOutTerminatedPods(allPods)
|
||||
|
||||
pods = kl.filterOutPodsPastActiveDeadline(pods)
|
||||
|
||||
// Respect the pod creation order when resolving conflicts.
|
||||
sort.Sort(podsByCreationTime(pods))
|
||||
|
||||
|
@ -1922,8 +1915,15 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
|
|||
podFullName := kubecontainer.GetPodFullName(pod)
|
||||
glog.V(3).Infof("Generating status for %q", podFullName)
|
||||
|
||||
spec := &pod.Spec
|
||||
// TODO: Consider include the container information.
|
||||
if kl.pastActiveDeadline(pod) {
|
||||
kl.recorder.Eventf(pod, "deadline", "Pod was active on the node longer than specified deadline")
|
||||
return api.PodStatus{
|
||||
Phase: api.PodFailed,
|
||||
Message: "Pod was active on the node longer than specified deadline"}, nil
|
||||
}
|
||||
|
||||
spec := &pod.Spec
|
||||
podStatus, err := kl.containerRuntime.GetPodStatus(pod)
|
||||
|
||||
if err != nil {
|
||||
|
|
|
@ -4472,7 +4472,7 @@ func TestMakePortMappings(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestFilterOutPodsPastActiveDeadline(t *testing.T) {
|
||||
func TestIsPodPastActiveDeadline(t *testing.T) {
|
||||
testKubelet := newTestKubelet(t)
|
||||
kubelet := testKubelet.kubelet
|
||||
pods := newTestPods(5)
|
||||
|
@ -4485,23 +4485,21 @@ func TestFilterOutPodsPastActiveDeadline(t *testing.T) {
|
|||
pods[0].Spec.ActiveDeadlineSeconds = &exceededActiveDeadlineSeconds
|
||||
pods[1].Status.StartTime = &startTime
|
||||
pods[1].Spec.ActiveDeadlineSeconds = ¬YetActiveDeadlineSeconds
|
||||
expected := []*api.Pod{pods[1], pods[2], pods[3], pods[4]}
|
||||
tests := []struct {
|
||||
pod *api.Pod
|
||||
expected bool
|
||||
}{{pods[0], true}, {pods[1], false}, {pods[2], false}, {pods[3], false}, {pods[4], false}}
|
||||
|
||||
kubelet.podManager.SetPods(pods)
|
||||
actual := kubelet.filterOutPodsPastActiveDeadline(pods)
|
||||
if !reflect.DeepEqual(expected, actual) {
|
||||
expectedNames := ""
|
||||
for _, pod := range expected {
|
||||
expectedNames = expectedNames + pod.Name + " "
|
||||
for i, tt := range tests {
|
||||
actual := kubelet.pastActiveDeadline(tt.pod)
|
||||
if actual != tt.expected {
|
||||
t.Errorf("[%d] expected %#v, got %#v", i, tt.expected, actual)
|
||||
}
|
||||
actualNames := ""
|
||||
for _, pod := range actual {
|
||||
actualNames = actualNames + pod.Name + " "
|
||||
}
|
||||
t.Errorf("expected %#v, got %#v", expectedNames, actualNames)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncPodsDeletesPodsThatRunTooLong(t *testing.T) {
|
||||
func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
|
||||
testKubelet := newTestKubelet(t)
|
||||
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
|
||||
kubelet := testKubelet.kubelet
|
||||
|
@ -4559,27 +4557,22 @@ func TestSyncPodsDeletesPodsThatRunTooLong(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
// Let the pod worker sets the status to fail after this sync.
|
||||
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
verifyCalls(t, fakeDocker, []string{"list", "inspect_container", "stop", "inspect_container", "stop", "list"})
|
||||
|
||||
// A map iteration is used to delete containers, so must not depend on
|
||||
// order here.
|
||||
expectedToStop := map[string]bool{
|
||||
"1234": true,
|
||||
"9876": true,
|
||||
podFullName := kubecontainer.GetPodFullName(pods[0])
|
||||
status, found := kubelet.statusManager.GetPodStatus(podFullName)
|
||||
if !found {
|
||||
t.Errorf("expected to found status for pod %q", status)
|
||||
}
|
||||
if len(fakeDocker.Stopped) != 2 ||
|
||||
!expectedToStop[fakeDocker.Stopped[0]] ||
|
||||
!expectedToStop[fakeDocker.Stopped[1]] {
|
||||
t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
|
||||
if status.Phase != api.PodFailed {
|
||||
t.Fatalf("expected pod status %q, ot %q.", api.PodFailed, status.Phase)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncPodsDoesNotDeletePodsThatRunTooLong(t *testing.T) {
|
||||
func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) {
|
||||
testKubelet := newTestKubelet(t)
|
||||
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
|
||||
kubelet := testKubelet.kubelet
|
||||
|
@ -4642,12 +4635,12 @@ func TestSyncPodsDoesNotDeletePodsThatRunTooLong(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
verifyCalls(t, fakeDocker, []string{
|
||||
"list", "list", "list",
|
||||
// Get pod status.
|
||||
"inspect_container", "inspect_container",
|
||||
// Check the pod infra container.
|
||||
"inspect_container",
|
||||
// Get pod status.
|
||||
"list", "inspect_container", "inspect_container", "list"})
|
||||
podFullName := kubecontainer.GetPodFullName(pods[0])
|
||||
status, found := kubelet.statusManager.GetPodStatus(podFullName)
|
||||
if !found {
|
||||
t.Errorf("expected to found status for pod %q", status)
|
||||
}
|
||||
if status.Phase == api.PodFailed {
|
||||
t.Fatalf("expected pod status to not be %q", status.Phase)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -81,6 +81,9 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
|
|||
}
|
||||
|
||||
// if the status has no start time, we need to set an initial time
|
||||
// TODO(yujuhong): Consider setting StartTime when generating the pod
|
||||
// status instead, which would allow statusManager to become a simple cache
|
||||
// again.
|
||||
if status.StartTime.IsZero() {
|
||||
if pod.Status.StartTime.IsZero() {
|
||||
// the pod did not have a previously recorded value so set to now
|
||||
|
|
Loading…
Reference in New Issue