Merge pull request #28160 from saad-ali/waitForGracefulTerm

Automatic merge from submit-queue

Volume manager must verify containers are terminated before deleting volumes for ungracefully terminated pods

A pod is removed from the volume manager (triggering unmount) when it is deleted from the kubelet pod manager, and kubelet deletes a pod from the pod manager as soon as it observes the pod's deletion. As long as the graceful termination period is non-zero, that deletion happens only after kubelet has terminated all containers for the pod. However, when the graceful termination period for a pod is set to zero, the pod is deleted from the pod manager *before* its containers are terminated.

As a result, when graceful termination is set to zero, a pod's volumes can be unmounted before all of its containers have exited.
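
For reference, the failure mode is triggered by an ungraceful (zero grace period) delete. Below is a minimal sketch of issuing one with the same internal clientset the kubelet code imports; the helper name, namespace, and pod name are illustrative, not part of this PR:

```go
package sketch

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
)

// deleteUngracefully deletes a pod with a zero-second grace period. The API
// object disappears immediately, so kubelet drops the pod from its pod
// manager before the pod's containers have actually exited.
func deleteUngracefully(clientset internalclientset.Interface, namespace, name string) error {
	// api.NewDeleteOptions(0) builds a *api.DeleteOptions with
	// GracePeriodSeconds set to zero.
	return clientset.Core().Pods(namespace).Delete(name, api.NewDeleteOptions(0))
}
```

This is what `kubectl delete pod <name> --grace-period=0` does under the hood.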

This PR prevents that from happening by removing a pod's volumes from the volume manager only once the pod has been deleted from the pod manager AND the kubelet containerRuntime status indicates that all containers for the pod have exited. Because we do not want to call the containerRuntime too frequently, we rate-limit the `findAndRemoveDeletedPods()` method so that it executes no more than once every two seconds.
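
In outline, the gating this PR adds looks like the sketch below. The types and names here are illustrative only, not the PR's real code; the actual change to `findAndRemoveDeletedPods()` is in the diff further down.

```go
package sketch

import "time"

// podView abstracts the two sources the populator consults: the kubelet pod
// manager (which pods are still desired) and the container runtime (which
// containers are actually running).
type podView interface {
	InPodManager(uid string) bool
	RunningContainerCount(uid string) int
}

// removalGate mirrors the PR's rule: a pod's volumes may be unmounted only
// once the pod is gone from the pod manager AND all of its containers have
// exited; the expensive container runtime query is rate-limited.
type removalGate struct {
	view             podView
	retryDuration    time.Duration // two seconds in this PR
	lastRuntimeCheck time.Time
}

func (g *removalGate) mayRemoveVolumes(uid string) bool {
	if g.view.InPodManager(uid) {
		// Pod is still known to the pod manager; keep its volumes mounted.
		return false
	}
	if time.Since(g.lastRuntimeCheck) < g.retryDuration {
		// Too soon to query the container runtime again; retry next pass.
		return false
	}
	g.lastRuntimeCheck = time.Now()
	// Unmount only when the runtime reports no remaining containers.
	return g.view.RunningContainerCount(uid) == 0
}
```

The real implementation fetches the runtime's pod list once per populator pass and records the time in `timeOfLastGetPodStatus`, which is what bounds `GetPods()` calls to one every two seconds.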

Fixes https://github.com/kubernetes/kubernetes/issues/27691

Running the test in a tight loop to verify the fix.
k8s-merge-robot 2016-06-29 01:33:31 -07:00 committed by GitHub
commit c6fa861688
5 changed files with 104 additions and 28 deletions

@@ -504,7 +504,8 @@ func NewMainKubelet(
 		hostname,
 		klet.podManager,
 		klet.kubeClient,
-		klet.volumePluginMgr)
+		klet.volumePluginMgr,
+		klet.containerRuntime)
 	runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
 	if err != nil {

@@ -299,7 +299,8 @@ func newTestKubeletWithImageList(
 		kubelet.hostname,
 		kubelet.podManager,
 		fakeKubeClient,
-		kubelet.volumePluginMgr)
+		kubelet.volumePluginMgr,
+		fakeRuntime)
 	if err != nil {
 		t.Fatalf("failed to initialize volume manager: %v", err)
 	}

@@ -97,7 +97,8 @@ func TestRunOnce(t *testing.T) {
 		kb.hostname,
 		kb.podManager,
 		kb.kubeClient,
-		kb.volumePluginMgr)
+		kb.volumePluginMgr,
+		fakeRuntime)
 	kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, kb.nonMasqueradeCIDR)
 	// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency

@@ -29,7 +29,9 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/pod"
+	"k8s.io/kubernetes/pkg/kubelet/util/format"
 	"k8s.io/kubernetes/pkg/kubelet/volume/cache"
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util/wait"
@@ -64,24 +66,31 @@ type DesiredStateOfWorldPopulator interface {
 func NewDesiredStateOfWorldPopulator(
 	kubeClient internalclientset.Interface,
 	loopSleepDuration time.Duration,
+	getPodStatusRetryDuration time.Duration,
 	podManager pod.Manager,
-	desiredStateOfWorld cache.DesiredStateOfWorld) DesiredStateOfWorldPopulator {
+	desiredStateOfWorld cache.DesiredStateOfWorld,
+	kubeContainerRuntime kubecontainer.Runtime) DesiredStateOfWorldPopulator {
 	return &desiredStateOfWorldPopulator{
-		kubeClient:          kubeClient,
-		loopSleepDuration:   loopSleepDuration,
-		podManager:          podManager,
-		desiredStateOfWorld: desiredStateOfWorld,
+		kubeClient:                kubeClient,
+		loopSleepDuration:         loopSleepDuration,
+		getPodStatusRetryDuration: getPodStatusRetryDuration,
+		podManager:                podManager,
+		desiredStateOfWorld:       desiredStateOfWorld,
 		pods: processedPods{
 			processedPods: make(map[volumetypes.UniquePodName]bool)},
+		kubeContainerRuntime: kubeContainerRuntime,
 	}
 }
 
 type desiredStateOfWorldPopulator struct {
-	kubeClient           internalclientset.Interface
-	loopSleepDuration    time.Duration
-	podManager           pod.Manager
-	desiredStateOfWorld  cache.DesiredStateOfWorld
-	pods                 processedPods
+	kubeClient                internalclientset.Interface
+	loopSleepDuration         time.Duration
+	getPodStatusRetryDuration time.Duration
+	podManager                pod.Manager
+	desiredStateOfWorld       cache.DesiredStateOfWorld
+	pods                      processedPods
+	kubeContainerRuntime      kubecontainer.Runtime
+	timeOfLastGetPodStatus    time.Time
 }
 
 type processedPods struct {
@@ -102,6 +111,20 @@ func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
 	return func() {
 		dswp.findAndAddNewPods()
+
+		// findAndRemoveDeletedPods() calls out to the container runtime to
+		// determine if the containers for a given pod are terminated. This is
+		// an expensive operation, therefore we limit the rate that
+		// findAndRemoveDeletedPods() is called independently of the main
+		// populator loop.
+		if time.Since(dswp.timeOfLastGetPodStatus) < dswp.getPodStatusRetryDuration {
+			glog.V(5).Infof(
+				"Skipping findAndRemoveDeletedPods(). Not permitted until %v (getPodStatusRetryDuration %v).",
+				dswp.timeOfLastGetPodStatus.Add(dswp.getPodStatusRetryDuration),
+				dswp.getPodStatusRetryDuration)
+			return
+		}
+
 		dswp.findAndRemoveDeletedPods()
 	}
 }
@@ -117,19 +140,60 @@ func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
 // Iterate through all pods in desired state of world, and remove if they no
 // longer exist
 func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
+	var runningPods []*kubecontainer.Pod
+
+	runningPodsFetched := false
 	for _, volumeToMount := range dswp.desiredStateOfWorld.GetVolumesToMount() {
 		if _, podExists :=
-			dswp.podManager.GetPodByUID(volumeToMount.Pod.UID); !podExists {
-			glog.V(10).Infof(
-				"Removing volume %q (volSpec=%q) for pod %q from desired state.",
-				volumeToMount.VolumeName,
-				volumeToMount.VolumeSpec.Name(),
-				volumeToMount.PodName)
-
-			dswp.desiredStateOfWorld.DeletePodFromVolume(
-				volumeToMount.PodName, volumeToMount.VolumeName)
-			dswp.deleteProcessedPod(volumeToMount.PodName)
+			dswp.podManager.GetPodByUID(volumeToMount.Pod.UID); podExists {
+			continue
 		}
+
+		// Once a pod has been deleted from kubelet pod manager, do not delete
+		// it immediately from volume manager. Instead, check the kubelet
+		// containerRuntime to verify that all containers in the pod have been
+		// terminated.
+		if !runningPodsFetched {
+			var getPodsErr error
+			runningPods, getPodsErr = dswp.kubeContainerRuntime.GetPods(false)
+			if getPodsErr != nil {
+				glog.Errorf(
+					"kubeContainerRuntime.findAndRemoveDeletedPods returned error %v.",
+					getPodsErr)
+				continue
+			}
+
+			runningPodsFetched = true
+			dswp.timeOfLastGetPodStatus = time.Now()
+		}
+
+		runningContainers := false
+		for _, runningPod := range runningPods {
+			if runningPod.ID == volumeToMount.Pod.UID {
+				if len(runningPod.Containers) > 0 {
+					runningContainers = true
+				}
+
+				break
+			}
+		}
+
+		if runningContainers {
+			glog.V(5).Infof(
+				"Pod %q has been removed from pod manager. However, it still has one or more containers in the non-exited state. Therefore it will not be removed from volume manager.",
+				format.Pod(volumeToMount.Pod))
+			continue
+		}
+
+		glog.V(5).Infof(
+			"Removing volume %q (volSpec=%q) for pod %q from desired state.",
+			volumeToMount.VolumeName,
+			volumeToMount.VolumeSpec.Name(),
+			format.Pod(volumeToMount.Pod))
+
+		dswp.desiredStateOfWorld.DeletePodFromVolume(
+			volumeToMount.PodName, volumeToMount.VolumeName)
+		dswp.deleteProcessedPod(volumeToMount.PodName)
 	}
 }
@@ -151,10 +215,9 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *api.Pod) {
 			dswp.createVolumeSpec(podVolume, pod.Namespace)
 		if err != nil {
 			glog.Errorf(
-				"Error processing volume %q for pod %q/%q: %v",
+				"Error processing volume %q for pod %q: %v",
 				podVolume.Name,
-				pod.Namespace,
-				pod.Name,
+				format.Pod(pod),
 				err)
 			continue
 		}

@@ -25,6 +25,7 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	"k8s.io/kubernetes/pkg/kubelet/container"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/pod"
 	"k8s.io/kubernetes/pkg/kubelet/util/format"
 	"k8s.io/kubernetes/pkg/kubelet/volume/cache"
@@ -48,6 +49,12 @@ const (
 	// DesiredStateOfWorldPopulator loop waits between successive executions
 	desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 100 * time.Millisecond
+	// desiredStateOfWorldPopulatorGetPodStatusRetryDuration is the amount of
+	// time the DesiredStateOfWorldPopulator loop waits between successive pod
+	// cleanup calls (to prevent calling containerruntime.GetPodStatus too
+	// frequently).
+	desiredStateOfWorldPopulatorGetPodStatusRetryDuration time.Duration = 2 * time.Second
+
 	// podAttachAndMountTimeout is the maximum amount of time the
 	// WaitForAttachAndMount call will wait for all volumes in the specified pod
 	// to be attached and mounted. Even though cloud operations can take several
@@ -134,7 +141,8 @@ func NewVolumeManager(
 	hostName string,
 	podManager pod.Manager,
 	kubeClient internalclientset.Interface,
-	volumePluginMgr *volume.VolumePluginMgr) (VolumeManager, error) {
+	volumePluginMgr *volume.VolumePluginMgr,
+	kubeContainerRuntime kubecontainer.Runtime) (VolumeManager, error) {
 	vm := &volumeManager{
 		kubeClient:      kubeClient,
 		volumePluginMgr: volumePluginMgr,
@@ -157,8 +165,10 @@ func NewVolumeManager(
 	vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
 		kubeClient,
 		desiredStateOfWorldPopulatorLoopSleepPeriod,
+		desiredStateOfWorldPopulatorGetPodStatusRetryDuration,
 		podManager,
-		vm.desiredStateOfWorld)
+		vm.desiredStateOfWorld,
+		kubeContainerRuntime)
 
 	return vm, nil
 }