Fixed cleanup of persistent volumes.

Kubelet.cleanupOrphanedVolumes() compares the list of volumes mounted on a node
with the list of volumes that are required by pods scheduled on the node
("scheduled volumes").

Both lists should contain real volumes, i.e. when a pod uses a
PersistentVolumeClaim, the list must contain the name of the bound
PersistentVolume instead of the name of the claim.
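
To make the intent concrete, here is a minimal sketch of the resolution step,
using simplified stand-in types rather than the real api.* structs (all names
below are illustrative): a claim-backed pod volume is resolved to the name of
the bound PersistentVolume before being keyed into the desired-volumes map, so
the key matches the directory the volume plugin actually mounted on disk.

package main

import (
	"fmt"
	"path"
)

// Simplified stand-ins for api.PersistentVolumeClaim and api.Volume.
type claim struct {
	boundPV string // corresponds to claim.Spec.VolumeName
	isBound bool   // corresponds to claim.Status.Phase == api.ClaimBound
}

type podVolume struct {
	name      string // volume name in the pod spec
	claimName string // non-empty when the volume source is a PersistentVolumeClaim
}

// resolveVolumeName mirrors the idea of Kubelet.resolveVolumeName: for a
// claim-backed volume, return the name of the bound PersistentVolume instead
// of the claim name; otherwise keep the pod-level volume name.
func resolveVolumeName(v podVolume, claims map[string]claim) (string, error) {
	if v.claimName == "" {
		return v.name, nil
	}
	c, ok := claims[v.claimName]
	if !ok {
		return "", fmt.Errorf("cannot find claim %s for volume %s", v.claimName, v.name)
	}
	if !c.isBound {
		return "", fmt.Errorf("claim %s is not bound yet", v.claimName)
	}
	return c.boundPV, nil
}

func main() {
	claims := map[string]claim{
		"myclaim": {boundPV: "myrealvol", isBound: true},
	}
	v := podVolume{name: "myvolumeclaim", claimName: "myclaim"}

	resolved, err := resolveVolumeName(v, claims)
	if err != nil {
		// On error the volume is skipped for this round, as in getDesiredVolumes.
		fmt.Println("skipping volume:", err)
		return
	}
	// The desired-volumes map key now matches the on-disk mount directory:
	// (POD_UID)/(real volume name), e.g. "podUID/myrealvol" rather than
	// "podUID/myvolumeclaim".
	fmt.Println(path.Join("podUID", resolved))
}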
Jan Safranek 2016-02-03 10:00:09 +01:00
parent 5914deeac8
commit e90de3f985
2 changed files with 134 additions and 3 deletions


@@ -1725,13 +1725,41 @@ func (kl *Kubelet) getPullSecretsForPod(pod *api.Pod) ([]api.Secret, error) {
return pullSecrets, nil
}
// Return the name of a volume. When the volume is a PersistentVolumeClaim,
// it returns the name of the real PersistentVolume bound to the claim.
// It returns an error when the claim is not bound yet.
func (kl *Kubelet) resolveVolumeName(pod *api.Pod, volume *api.Volume) (string, error) {
claimSource := volume.VolumeSource.PersistentVolumeClaim
if claimSource != nil {
// resolve real volume behind the claim
claim, err := kl.kubeClient.Legacy().PersistentVolumeClaims(pod.Namespace).Get(claimSource.ClaimName)
if err != nil {
return "", fmt.Errorf("Cannot find claim %s/%s for volume %s", pod.Namespace, claimSource.ClaimName, volume.Name)
}
if claim.Status.Phase != api.ClaimBound {
return "", fmt.Errorf("Claim for volume %s/%s is not bound yet", pod.Namespace, claimSource.ClaimName)
}
// Use the name of the bound PersistentVolume instead of the pod-level volume name
return claim.Spec.VolumeName, nil
}
return volume.Name, nil
}
// Stores all volumes defined by the set of pods into a map.
// It stores real volumes there, i.e. persistent volume claims are resolved
// to volumes that are bound to them.
// Keys for each entry are in the format (POD_ID)/(VOLUME_NAME)
func getDesiredVolumes(pods []*api.Pod) map[string]api.Volume {
func (kl *Kubelet) getDesiredVolumes(pods []*api.Pod) map[string]api.Volume {
desiredVolumes := make(map[string]api.Volume)
for _, pod := range pods {
for _, volume := range pod.Spec.Volumes {
identifier := path.Join(string(pod.UID), volume.Name)
volumeName, err := kl.resolveVolumeName(pod, &volume)
if err != nil {
glog.V(3).Infof("%v", err)
// Ignore the error and hope it's resolved next time
continue
}
identifier := path.Join(string(pod.UID), volumeName)
desiredVolumes[identifier] = volume
}
}
@@ -1815,8 +1843,11 @@ func (kl *Kubelet) cleanupBandwidthLimits(allPods []*api.Pod) error {
// Compares the map of current volumes to the map of desired volumes.
// If an active volume does not have a respective desired volume, clean it up.
// This method is blocking:
// 1) it talks to API server to find volumes bound to persistent volume claims
// 2) it talks to cloud to detach volumes
func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
desiredVolumes := getDesiredVolumes(pods)
desiredVolumes := kl.getDesiredVolumes(pods)
currentVolumes := kl.getPodVolumesFromDisk()
runningSet := sets.String{}


@@ -551,6 +551,106 @@ func TestGetPodVolumesFromDisk(t *testing.T) {
}
}
// Test for https://github.com/kubernetes/kubernetes/pull/19600
func TestCleanupOrphanedVolumes(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
plug := &volume.FakeVolumePlugin{PluginName: "fake", Host: nil}
kubelet.volumePluginMgr.InitPlugins([]volume.VolumePlugin{plug}, &volumeHost{kubelet})
// create a volume "on disk"
volsOnDisk := []struct {
podUID types.UID
volName string
}{
{"podUID", "myrealvol"},
}
pathsOnDisk := []string{}
for i := range volsOnDisk {
fv := volume.FakeVolume{PodUID: volsOnDisk[i].podUID, VolName: volsOnDisk[i].volName, Plugin: plug}
fv.SetUp(nil)
pathsOnDisk = append(pathsOnDisk, fv.GetPath())
}
// store the claim in fake kubelet database
claim := api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "myclaim",
Namespace: "test",
},
Spec: api.PersistentVolumeClaimSpec{
VolumeName: "myrealvol",
},
Status: api.PersistentVolumeClaimStatus{
Phase: api.ClaimBound,
},
}
kubeClient.ReactionChain = fake.NewSimpleClientset(&api.PersistentVolumeClaimList{Items: []api.PersistentVolumeClaim{
claim,
}}).ReactionChain
// Create a pod referencing the volume via a PersistentVolumeClaim
pod := api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "podUID",
Name: "pod",
Namespace: "test",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "myvolumeclaim",
VolumeSource: api.VolumeSource{
PersistentVolumeClaim: &api.PersistentVolumeClaimVolumeSource{
ClaimName: "myclaim",
},
},
},
},
},
}
// The pod is pending and not running yet. Test that cleanupOrphanedVolumes
// won't remove the volume from disk if the volume is referenced only
// indirectly by a claim.
err := kubelet.cleanupOrphanedVolumes([]*api.Pod{&pod}, []*kubecontainer.Pod{})
if err != nil {
t.Errorf("cleanupOrphanedVolumes failed: %v", err)
}
volumesFound := kubelet.getPodVolumesFromDisk()
if len(volumesFound) != len(pathsOnDisk) {
t.Errorf("Expected to find %d cleaners, got %d", len(pathsOnDisk), len(volumesFound))
}
for _, ep := range pathsOnDisk {
found := false
for _, cl := range volumesFound {
if ep == cl.GetPath() {
found = true
break
}
}
if !found {
t.Errorf("Could not find a volume with path %s", ep)
}
}
// The pod is deleted -> kubelet should delete the volume
err = kubelet.cleanupOrphanedVolumes([]*api.Pod{}, []*kubecontainer.Pod{})
if err != nil {
t.Errorf("cleanupOrphanedVolumes failed: %v", err)
}
volumesFound = kubelet.getPodVolumesFromDisk()
if len(volumesFound) != 0 {
t.Errorf("Expected to find 0 cleaners, got %d", len(volumesFound))
}
for _, cl := range volumesFound {
t.Errorf("Found unexpected volume %s", cl.GetPath())
}
}
type stubVolume struct {
path string
volume.MetricsNil