Volume manager verify containers terminated before deleting

Branch: pull/6/head
Author: Saad Ali, 2016-06-28 06:01:07 -07:00 (committed by saadali)
Parent: a59ec45e2a
Commit: c723d9e5c4
5 changed files with 104 additions and 28 deletions

View File

@@ -504,7 +504,8 @@ func NewMainKubelet(
 		hostname,
 		klet.podManager,
 		klet.kubeClient,
-		klet.volumePluginMgr)
+		klet.volumePluginMgr,
+		klet.containerRuntime)
 
 	runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
 	if err != nil {

View File

@@ -299,7 +299,8 @@ func newTestKubeletWithImageList(
 		kubelet.hostname,
 		kubelet.podManager,
 		fakeKubeClient,
-		kubelet.volumePluginMgr)
+		kubelet.volumePluginMgr,
+		fakeRuntime)
 	if err != nil {
 		t.Fatalf("failed to initialize volume manager: %v", err)
 	}

View File

@@ -97,7 +97,8 @@ func TestRunOnce(t *testing.T) {
 		kb.hostname,
 		kb.podManager,
 		kb.kubeClient,
-		kb.volumePluginMgr)
+		kb.volumePluginMgr,
+		fakeRuntime)
 
 	kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, kb.nonMasqueradeCIDR)
 	// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency

View File

@@ -29,7 +29,9 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/pod"
+	"k8s.io/kubernetes/pkg/kubelet/util/format"
 	"k8s.io/kubernetes/pkg/kubelet/volume/cache"
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util/wait"
@@ -64,24 +66,31 @@ type DesiredStateOfWorldPopulator interface {
 func NewDesiredStateOfWorldPopulator(
 	kubeClient internalclientset.Interface,
 	loopSleepDuration time.Duration,
+	getPodStatusRetryDuration time.Duration,
 	podManager pod.Manager,
-	desiredStateOfWorld cache.DesiredStateOfWorld) DesiredStateOfWorldPopulator {
+	desiredStateOfWorld cache.DesiredStateOfWorld,
+	kubeContainerRuntime kubecontainer.Runtime) DesiredStateOfWorldPopulator {
 	return &desiredStateOfWorldPopulator{
-		kubeClient:          kubeClient,
-		loopSleepDuration:   loopSleepDuration,
-		podManager:          podManager,
-		desiredStateOfWorld: desiredStateOfWorld,
+		kubeClient:                kubeClient,
+		loopSleepDuration:         loopSleepDuration,
+		getPodStatusRetryDuration: getPodStatusRetryDuration,
+		podManager:                podManager,
+		desiredStateOfWorld:       desiredStateOfWorld,
 		pods: processedPods{
 			processedPods: make(map[volumetypes.UniquePodName]bool)},
+		kubeContainerRuntime: kubeContainerRuntime,
 	}
 }
 
 type desiredStateOfWorldPopulator struct {
-	kubeClient           internalclientset.Interface
-	loopSleepDuration    time.Duration
-	podManager           pod.Manager
-	desiredStateOfWorld  cache.DesiredStateOfWorld
-	pods                 processedPods
+	kubeClient                internalclientset.Interface
+	loopSleepDuration         time.Duration
+	getPodStatusRetryDuration time.Duration
+	podManager                pod.Manager
+	desiredStateOfWorld       cache.DesiredStateOfWorld
+	pods                      processedPods
+	kubeContainerRuntime      kubecontainer.Runtime
+	timeOfLastGetPodStatus    time.Time
 }
 
 type processedPods struct {
@@ -102,6 +111,20 @@ func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
 	return func() {
 		dswp.findAndAddNewPods()
 
+		// findAndRemoveDeletedPods() calls out to the container runtime to
+		// determine if the containers for a given pod are terminated. This is
+		// an expensive operation, therefore we limit the rate that
+		// findAndRemoveDeletedPods() is called independently of the main
+		// populator loop.
+		if time.Since(dswp.timeOfLastGetPodStatus) < dswp.getPodStatusRetryDuration {
+			glog.V(5).Infof(
+				"Skipping findAndRemoveDeletedPods(). Not permitted until %v (getPodStatusRetryDuration %v).",
+				dswp.timeOfLastGetPodStatus.Add(dswp.getPodStatusRetryDuration),
+				dswp.getPodStatusRetryDuration)
+
+			return
+		}
+
 		dswp.findAndRemoveDeletedPods()
 	}
 }
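
The skip above is a simple time-based gate: the populator loop still ticks every loopSleepDuration, but the expensive cleanup pass only runs once the retry duration has elapsed since the last permitted container-runtime query. A minimal, self-contained sketch of that pattern (the gate type and names below are illustrative stand-ins, not the kubelet's actual code):

package main

import (
	"fmt"
	"time"
)

// gate mimics the populator's throttle: the expensive cleanup step runs only
// if at least retryDuration has elapsed since the last permitted run.
type gate struct {
	lastRun       time.Time
	retryDuration time.Duration
}

// allowed reports whether the expensive call may run now, recording the run
// time when it is permitted (the real code records it only after GetPods succeeds).
func (g *gate) allowed(now time.Time) bool {
	if now.Sub(g.lastRun) < g.retryDuration {
		return false // too soon; skip this loop iteration
	}
	g.lastRun = now
	return true
}

func main() {
	g := &gate{retryDuration: 2 * time.Second}
	start := time.Now()

	// The loop ticks every 100ms, but the gate lets the cleanup pass through
	// at most once per retryDuration.
	for i := 0; i < 5; i++ {
		now := start.Add(time.Duration(100*i) * time.Millisecond)
		fmt.Printf("tick %d: run cleanup = %v\n", i, g.allowed(now))
	}
}
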
@@ -117,19 +140,60 @@ func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
 // Iterate through all pods in desired state of world, and remove if they no
 // longer exist
 func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
+	var runningPods []*kubecontainer.Pod
+
+	runningPodsFetched := false
 	for _, volumeToMount := range dswp.desiredStateOfWorld.GetVolumesToMount() {
 		if _, podExists :=
-			dswp.podManager.GetPodByUID(volumeToMount.Pod.UID); !podExists {
-			glog.V(10).Infof(
-				"Removing volume %q (volSpec=%q) for pod %q from desired state.",
-				volumeToMount.VolumeName,
-				volumeToMount.VolumeSpec.Name(),
-				volumeToMount.PodName)
-			dswp.desiredStateOfWorld.DeletePodFromVolume(
-				volumeToMount.PodName, volumeToMount.VolumeName)
-			dswp.deleteProcessedPod(volumeToMount.PodName)
+			dswp.podManager.GetPodByUID(volumeToMount.Pod.UID); podExists {
+			continue
 		}
+
+		// Once a pod has been deleted from kubelet pod manager, do not delete
+		// it immediately from volume manager. Instead, check the kubelet
+		// containerRuntime to verify that all containers in the pod have been
+		// terminated.
+		if !runningPodsFetched {
+			var getPodsErr error
+			runningPods, getPodsErr = dswp.kubeContainerRuntime.GetPods(false)
+			if getPodsErr != nil {
+				glog.Errorf(
+					"kubeContainerRuntime.findAndRemoveDeletedPods returned error %v.",
+					getPodsErr)
+				continue
+			}
+
+			runningPodsFetched = true
+			dswp.timeOfLastGetPodStatus = time.Now()
+		}
+
+		runningContainers := false
+		for _, runningPod := range runningPods {
+			if runningPod.ID == volumeToMount.Pod.UID {
+				if len(runningPod.Containers) > 0 {
+					runningContainers = true
+				}
+
+				break
+			}
+		}
+
+		if runningContainers {
+			glog.V(5).Infof(
+				"Pod %q has been removed from pod manager. However, it still has one or more containers in the non-exited state. Therefore it will not be removed from volume manager.",
+				format.Pod(volumeToMount.Pod))
+			continue
+		}
+
+		glog.V(5).Infof(
+			"Removing volume %q (volSpec=%q) for pod %q from desired state.",
+			volumeToMount.VolumeName,
+			volumeToMount.VolumeSpec.Name(),
+			format.Pod(volumeToMount.Pod))
+
+		dswp.desiredStateOfWorld.DeletePodFromVolume(
+			volumeToMount.PodName, volumeToMount.VolumeName)
+		dswp.deleteProcessedPod(volumeToMount.PodName)
 	}
 }
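
The decisive check in the loop above is whether a pod that the pod manager has already forgotten still shows up in the container runtime's pod list with containers that have not exited; only when it does not are the pod's volumes dropped from the desired state. A minimal, self-contained sketch of that check (Pod and Container here are simplified stand-ins for the kubecontainer types, not the real kubelet structs):

package main

import "fmt"

// Container and Pod are simplified stand-ins for the runtime's view of a pod.
type Container struct {
	Name string
}

type Pod struct {
	UID        string
	Containers []*Container
}

// hasRunningContainers reports whether the pod with the given UID still shows
// up in the runtime's list of running pods with at least one container; only
// when it does not is it safe to remove the pod from the volume manager.
func hasRunningContainers(runningPods []*Pod, uid string) bool {
	for _, p := range runningPods {
		if p.UID == uid {
			return len(p.Containers) > 0
		}
	}
	// The runtime does not know the pod at all: nothing is still running.
	return false
}

func main() {
	running := []*Pod{
		{UID: "pod-a", Containers: []*Container{{Name: "app"}}},
		{UID: "pod-b"}, // known to the runtime, but every container has exited
	}

	fmt.Println(hasRunningContainers(running, "pod-a")) // true  -> keep the pod's volumes
	fmt.Println(hasRunningContainers(running, "pod-b")) // false -> safe to remove
	fmt.Println(hasRunningContainers(running, "pod-c")) // false -> safe to remove
}
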
@@ -151,10 +215,9 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *api.Pod) {
 			dswp.createVolumeSpec(podVolume, pod.Namespace)
 		if err != nil {
 			glog.Errorf(
-				"Error processing volume %q for pod %q/%q: %v",
+				"Error processing volume %q for pod %q: %v",
 				podVolume.Name,
-				pod.Namespace,
-				pod.Name,
+				format.Pod(pod),
 				err)
 			continue
 		}

View File

@@ -25,6 +25,7 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	"k8s.io/kubernetes/pkg/kubelet/container"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/pod"
 	"k8s.io/kubernetes/pkg/kubelet/util/format"
 	"k8s.io/kubernetes/pkg/kubelet/volume/cache"
@@ -48,6 +49,12 @@ const (
 	// DesiredStateOfWorldPopulator loop waits between successive executions
 	desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 100 * time.Millisecond
 
+	// desiredStateOfWorldPopulatorGetPodStatusRetryDuration is the amount of
+	// time the DesiredStateOfWorldPopulator loop waits between successive pod
+	// cleanup calls (to prevent calling containerruntime.GetPodStatus too
+	// frequently).
+	desiredStateOfWorldPopulatorGetPodStatusRetryDuration time.Duration = 2 * time.Second
+
 	// podAttachAndMountTimeout is the maximum amount of time the
 	// WaitForAttachAndMount call will wait for all volumes in the specified pod
 	// to be attached and mounted. Even though cloud operations can take several
@@ -120,7 +127,8 @@ func NewVolumeManager(
 	hostName string,
 	podManager pod.Manager,
 	kubeClient internalclientset.Interface,
-	volumePluginMgr *volume.VolumePluginMgr) (VolumeManager, error) {
+	volumePluginMgr *volume.VolumePluginMgr,
+	kubeContainerRuntime kubecontainer.Runtime) (VolumeManager, error) {
 	vm := &volumeManager{
 		kubeClient:      kubeClient,
 		volumePluginMgr: volumePluginMgr,
@@ -143,8 +151,10 @@
 	vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
 		kubeClient,
 		desiredStateOfWorldPopulatorLoopSleepPeriod,
+		desiredStateOfWorldPopulatorGetPodStatusRetryDuration,
 		podManager,
-		vm.desiredStateOfWorld)
+		vm.desiredStateOfWorld,
+		kubeContainerRuntime)
 
 	return vm, nil
 }
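
Putting the pieces together: the call-site diffs above show the kubelet handing its container runtime to NewVolumeManager, which forwards it, together with the new retry duration constant, to the populator. A rough sketch of that wiring with simplified stand-in types (not the real kubelet signatures):

package main

import (
	"fmt"
	"time"
)

// runtime is a stand-in for the one capability the populator needs from
// kubecontainer.Runtime in this change: listing the pods the runtime knows about.
type runtime interface {
	GetPods(all bool) ([]string, error)
}

// populator stands in for the DesiredStateOfWorldPopulator, which now carries
// the container runtime and the GetPodStatus retry duration.
type populator struct {
	loopSleepPeriod           time.Duration
	getPodStatusRetryDuration time.Duration
	containerRuntime          runtime
}

// newPopulator mirrors the shape of NewDesiredStateOfWorldPopulator after this
// change: it receives the retry duration and the container runtime.
func newPopulator(loop, retry time.Duration, rt runtime) *populator {
	return &populator{loopSleepPeriod: loop, getPodStatusRetryDuration: retry, containerRuntime: rt}
}

// volumeManager stands in for the kubelet volume manager; like NewVolumeManager,
// it does not use the runtime directly but threads it into the populator.
type volumeManager struct {
	populator *populator
}

func newVolumeManager(rt runtime) *volumeManager {
	return &volumeManager{
		populator: newPopulator(100*time.Millisecond, 2*time.Second, rt),
	}
}

// fakeRuntime plays the role of the fake runtimes used in the test call sites.
type fakeRuntime struct{}

func (fakeRuntime) GetPods(all bool) ([]string, error) { return []string{"pod-a"}, nil }

func main() {
	vm := newVolumeManager(fakeRuntime{})
	pods, _ := vm.populator.containerRuntime.GetPods(false)
	fmt.Println(pods, vm.populator.getPodStatusRetryDuration)
}
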