Merge pull request #44412 from StackPointCloud/recheck-pvc-phase

Automatic merge from submit-queue (batch tested with PRs 44412, 44810, 47130, 46017, 47829)

recheck pod volumes before marking pod as processed

This PR allows a pod's volumes to be re-checked until all of them have been added correctly. There is a brief window during which a persistent volume claim is still in the Pending phase; if a pod is created during that window, its volume will not be added. The issue is not uncommon with Helm charts that create all objects in close succession, particularly when using aws-ebs volumes.

fixes #28962
pull/6/head
Kubernetes Submit Queue 2017-07-11 20:00:14 -07:00 committed by GitHub
commit 3f1776e07d
2 changed files with 63 additions and 1 deletions

View File

@ -260,6 +260,8 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
return return
} }
allVolumesAdded := true
// Process volume spec for each volume defined in pod // Process volume spec for each volume defined in pod
for _, podVolume := range pod.Spec.Volumes { for _, podVolume := range pod.Spec.Volumes {
volumeSpec, volumeGidValue, err := volumeSpec, volumeGidValue, err :=
@ -270,6 +272,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
podVolume.Name, podVolume.Name,
format.Pod(pod), format.Pod(pod),
err) err)
allVolumesAdded = false
continue continue
} }
@ -283,6 +286,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
volumeSpec.Name(), volumeSpec.Name(),
uniquePodName, uniquePodName,
err) err)
allVolumesAdded = false
} }
glog.V(10).Infof( glog.V(10).Infof(
@ -292,7 +296,11 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
uniquePodName) uniquePodName)
} }
dswp.markPodProcessed(uniquePodName) // some of the volume additions may have failed, should not mark this pod as fully processed
if allVolumesAdded {
dswp.markPodProcessed(uniquePodName)
}
} }
// podPreviouslyProcessed returns true if the volumes for this pod have already // podPreviouslyProcessed returns true if the volumes for this pod have already
@ -327,6 +335,7 @@ func (dswp *desiredStateOfWorldPopulator) deleteProcessedPod(
// createVolumeSpec creates and returns a mutatable volume.Spec object for the // createVolumeSpec creates and returns a mutatable volume.Spec object for the
// specified volume. It dereference any PVC to get PV objects, if needed. // specified volume. It dereference any PVC to get PV objects, if needed.
// Returns an error if unable to obtain the volume at this time.
func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
podVolume v1.Volume, podNamespace string) (*volume.Spec, string, error) { podVolume v1.Volume, podNamespace string) (*volume.Spec, string, error) {
if pvcSource := if pvcSource :=
@ -409,6 +418,7 @@ func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV(
} }
if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" { if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
return "", "", fmt.Errorf( return "", "", fmt.Errorf(
"PVC %s/%s has non-bound phase (%q) or empty pvc.Spec.VolumeName (%q)", "PVC %s/%s has non-bound phase (%q) or empty pvc.Spec.VolumeName (%q)",
namespace, namespace,

View File

@ -92,6 +92,44 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
} }
} }
// TestInitialPendingVolumesForPodAndGetVolumesInUse runs a volume manager for
// a pod whose claim starts out in the Pending phase. A background goroutine
// later moves the claim to Bound, and the test expects WaitForAttachAndMount
// to return without error.
func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
	scratchDir, err := utiltesting.MkTmpdir("volumeManagerTest")
	if err != nil {
		t.Fatalf("can't make a temp dir: %v", err)
	}
	defer os.RemoveAll(scratchDir)

	pods := kubepod.NewBasicPodManager(
		podtest.NewFakeMirrorClient(),
		secret.NewFakeManager(),
		configmap.NewFakeManager())

	node, pod, pv, claim := createObjects()
	// Replace the claim status so the claim is Pending when the manager starts.
	claim.Status = v1.PersistentVolumeClaimStatus{Phase: v1.ClaimPending}

	client := fake.NewSimpleClientset(node, pod, pv, claim)
	volMgr := newTestVolumeManager(scratchDir, pods, client)

	done := runVolumeManager(volMgr)
	defer close(done)

	pods.SetPods([]*v1.Pod{pod})

	// Fake node status update for the first attached volume on the node.
	attachedName := v1.UniqueVolumeName(node.Status.VolumesAttached[0].Name)
	go simulateVolumeInUseUpdate(attachedName, done, volMgr)

	// The claim only becomes Bound after a delay, while the wait below is
	// already in progress.
	claimNamespace, claimName := claim.GetNamespace(), claim.ObjectMeta.Name
	go delayClaimBecomesBound(client, claimNamespace, claimName)

	if err := volMgr.WaitForAttachAndMount(pod); err != nil {
		t.Errorf("Expected success: %v", err)
	}
}
func TestGetExtraSupplementalGroupsForPod(t *testing.T) { func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest") tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
if err != nil { if err != nil {
@ -279,6 +317,20 @@ func simulateVolumeInUseUpdate(volumeName v1.UniqueVolumeName, stopCh <-chan str
} }
} }
// delayClaimBecomesBound sleeps for 500ms and then sets the status of the
// named PersistentVolumeClaim to the Bound phase through kubeClient. It is
// meant to be run in its own goroutine so that the claim changes phase while
// the caller is waiting on the volume.
//
// kubeClient is the client used to read and update the claim; namespace and
// claimName identify the claim.
//
// The function has no way to report failure. If the claim cannot be fetched
// it returns without updating anything, rather than dereferencing a nil
// claim and panicking in a background goroutine.
func delayClaimBecomesBound(
	kubeClient clientset.Interface,
	namespace, claimName string,
) {
	time.Sleep(500 * time.Millisecond)

	claims := kubeClient.Core().PersistentVolumeClaims(namespace)
	volumeClaim, err := claims.Get(claimName, metav1.GetOptions{})
	if err != nil || volumeClaim == nil {
		// Nothing to update; a caller waiting for the claim to become bound
		// will observe that it never does.
		return
	}

	// The whole status is replaced, so only Phase is set afterwards.
	volumeClaim.Status = v1.PersistentVolumeClaimStatus{
		Phase: v1.ClaimBound,
	}

	// Best effort: the update error is deliberately discarded because this
	// helper has no channel for reporting it.
	_, _ = claims.Update(volumeClaim)
}
func runVolumeManager(manager VolumeManager) chan struct{} { func runVolumeManager(manager VolumeManager) chan struct{} {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
//readyCh := make(chan bool, 1) //readyCh := make(chan bool, 1)