Merge pull request #28095 from saad-ali/fixRacyVolumesInUse

Automatic merge from submit-queue

Kubelet should mark VolumeInUse before checking if it is Attached

Kubelet should mark volumes as in use (in the node's Status.VolumesInUse field) before checking if they are attached.
Controller should fetch a fresh copy of the node object before detach, instead of relying on the node informer cache.

Fixes #27836
k8s-merge-robot 2016-06-28 15:59:17 -07:00 committed by GitHub
commit 532491aab6
12 changed files with 525 additions and 83 deletions
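
Taken together, these changes implement a reporting handshake: the kubelet advertises a volume in the node's Status.VolumesInUse before mounting it, and the attach/detach controller refuses to detach while that report is still present (unless a maximum wait expires). A minimal, self-contained sketch of the controller-side check, using simplified stand-in types rather than the real Kubernetes API objects:

```go
package main

import (
	"errors"
	"fmt"
)

// UniqueVolumeName and nodeStatus are simplified stand-ins for
// api.UniqueVolumeName and api.NodeStatus.
type UniqueVolumeName string

type nodeStatus struct {
	VolumesInUse []UniqueVolumeName
}

// verifySafeToDetach mirrors the check added in this PR: before detaching,
// consult a freshly fetched node status (not the informer cache) and refuse
// to detach while the volume is still listed in VolumesInUse.
func verifySafeToDetach(status nodeStatus, volume UniqueVolumeName) error {
	for _, inUse := range status.VolumesInUse {
		if inUse == volume {
			return errors.New("volume is still in use by node, according to Node status")
		}
	}
	return nil
}

func main() {
	status := nodeStatus{VolumesInUse: []UniqueVolumeName{"fake/vol1"}}

	// While the kubelet reports the volume in use, detach is blocked.
	if err := verifySafeToDetach(status, "fake/vol1"); err != nil {
		fmt.Println("detach blocked:", err)
	}

	// After the kubelet unmounts the volume and drops it from VolumesInUse,
	// detach may proceed.
	status.VolumesInUse = nil
	if err := verifySafeToDetach(status, "fake/vol1"); err == nil {
		fmt.Println("detach allowed")
	}
}
```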


@ -50,7 +50,7 @@ const (
// attach detach controller will wait for a volume to be safely unmounted
// from its node. Once this time has expired, the controller will assume the
// node or kubelet are unresponsive and will detach the volume anyway.
reconcilerMaxWaitForUnmountDuration time.Duration = 3 * time.Minute
reconcilerMaxWaitForUnmountDuration time.Duration = 6 * time.Minute
// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
// DesiredStateOfWorldPopulator loop waits between successive executions


@ -109,7 +109,7 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
if !attachedVolume.MountedByNode {
glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, rc.actualStateOfWorld)
err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, true /* verifySafeToDetach */, rc.actualStateOfWorld)
if err == nil {
glog.Infof("Started DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
}
@ -129,7 +129,7 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
// If volume is not safe to detach (is mounted), wait a max amount of time before detaching anyway.
if timeElapsed > rc.maxWaitForUnmountDuration {
glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q. Volume is not safe to detach, but maxWaitForUnmountDuration expired.", attachedVolume.VolumeName, attachedVolume.NodeName)
err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, rc.actualStateOfWorld)
err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
if err == nil {
glog.Infof("Started DetachVolume for volume %q from node %q due to maxWaitForUnmountDuration expiry.", attachedVolume.VolumeName, attachedVolume.NodeName)
}


@ -3404,7 +3404,11 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
return err
}
// Update the current status on the API server
_, err = kl.kubeClient.Core().Nodes().UpdateStatus(node)
updatedNode, err := kl.kubeClient.Core().Nodes().UpdateStatus(node)
if err == nil {
kl.volumeManager.MarkVolumesAsReportedInUse(
updatedNode.Status.VolumesInUse)
}
return err
}
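
The hunk above closes the loop on the kubelet side: only the VolumesInUse list that the API server actually persisted is fed back to the volume manager, so mounts proceed only once the report has landed. A rough sketch of that feedback, with hypothetical stand-in types (the update and mark signatures here are not the real kubelet APIs):

```go
package main

import "fmt"

type UniqueVolumeName string

// nodeStatus is a simplified stand-in for api.NodeStatus.
type nodeStatus struct {
	VolumesInUse []UniqueVolumeName
}

// tryUpdateNodeStatus mirrors the pattern above: push the node status and,
// only on success, report back the list the API server persisted.
func tryUpdateNodeStatus(
	update func(nodeStatus) (nodeStatus, error),
	desired nodeStatus,
	mark func([]UniqueVolumeName),
) error {
	updatedNode, err := update(desired)
	if err == nil {
		mark(updatedNode.VolumesInUse)
	}
	return err
}

func main() {
	// Fake API call that simply echoes the status it was given.
	update := func(s nodeStatus) (nodeStatus, error) { return s, nil }
	desired := nodeStatus{VolumesInUse: []UniqueVolumeName{"fake/vol1"}}

	_ = tryUpdateNodeStatus(update, desired, func(v []UniqueVolumeName) {
		fmt.Println("marked as reported in use:", v)
	})
}
```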


@ -597,7 +597,7 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
}
// Verify volumes detached and no longer reported as in use
err = waitForVolumeDetach(kubelet.volumeManager)
err = waitForVolumeDetach(api.UniqueVolumeName("fake/vol1"), kubelet.volumeManager)
if err != nil {
t.Error(err)
}
@ -611,7 +611,6 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
if err != nil {
t.Error(err)
}
}
func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {
@ -657,6 +656,13 @@ func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {
}()
kubelet.podManager.SetPods([]*api.Pod{pod})
// Fake node status update
go simulateVolumeInUseUpdate(
api.UniqueVolumeName("fake/vol1"),
stopCh,
kubelet.volumeManager)
err := kubelet.volumeManager.WaitForAttachAndMount(pod)
if err != nil {
t.Errorf("Expected success: %v", err)
@ -747,6 +753,12 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
// Add pod
kubelet.podManager.SetPods([]*api.Pod{pod})
// Fake node status update
go simulateVolumeInUseUpdate(
api.UniqueVolumeName("fake/vol1"),
stopCh,
kubelet.volumeManager)
// Verify volumes attached
err := kubelet.volumeManager.WaitForAttachAndMount(pod)
if err != nil {
@ -815,7 +827,7 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
}
// Verify volumes detached and no longer reported as in use
err = waitForVolumeDetach(kubelet.volumeManager)
err = waitForVolumeDetach(api.UniqueVolumeName("fake/vol1"), kubelet.volumeManager)
if err != nil {
t.Error(err)
}
@ -828,7 +840,6 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
if err != nil {
t.Error(err)
}
}
func TestPodVolumesExist(t *testing.T) {
@ -4987,19 +4998,15 @@ func waitForVolumeUnmount(
}
func waitForVolumeDetach(
volumeName api.UniqueVolumeName,
volumeManager kubeletvolume.VolumeManager) error {
attachedVolumes := []api.UniqueVolumeName{}
err := retryWithExponentialBackOff(
time.Duration(50*time.Millisecond),
func() (bool, error) {
// Verify volumes detached
attachedVolumes = volumeManager.GetVolumesInUse()
if len(attachedVolumes) != 0 {
return false, nil
}
return true, nil
volumeAttached := volumeManager.VolumeIsAttached(volumeName)
return !volumeAttached, nil
},
)
@ -5020,3 +5027,20 @@ func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.Conditio
}
return wait.ExponentialBackoff(backoff, fn)
}
func simulateVolumeInUseUpdate(
volumeName api.UniqueVolumeName,
stopCh <-chan struct{},
volumeManager kubeletvolume.VolumeManager) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
volumeManager.MarkVolumesAsReportedInUse(
[]api.UniqueVolumeName{volumeName})
case <-stopCh:
return
}
}
}


@ -117,6 +117,11 @@ type ActualStateOfWorld interface {
// volumes that do not need to update contents should not fail.
PodExistsInVolume(podName volumetypes.UniquePodName, volumeName api.UniqueVolumeName) (bool, string, error)
// VolumeExists returns true if the given volume exists in the list of
// attached volumes in the cache, indicating the volume is attached to this
// node.
VolumeExists(volumeName api.UniqueVolumeName) bool
// GetMountedVolumes generates and returns a list of volumes and the pods
// they are successfully attached and mounted for based on the current
// actual state of the world.
@ -127,12 +132,17 @@ type ActualStateOfWorld interface {
// current actual state of the world.
GetMountedVolumesForPod(podName volumetypes.UniquePodName) []MountedVolume
// GetAttachedVolumes generates and returns a list of all attached volumes.
GetAttachedVolumes() []AttachedVolume
// GetGloballyMountedVolumes generates and returns a list of all attached
// volumes that are globally mounted. This list can be used to determine
// which volumes should be reported as "in use" in the node's VolumesInUse
// status field. Globally mounted here refers to the shared plugin mount
// point for the attachable volume from which the pod specific mount points
// are created (via bind mount).
GetGloballyMountedVolumes() []AttachedVolume
// GetUnmountedVolumes generates and returns a list of attached volumes that
// have no mountedPods. This list can be used to determine which volumes are
// no longer referenced and may be detached.
// no longer referenced and may be globally unmounted and detached.
GetUnmountedVolumes() []AttachedVolume
}
@ -492,6 +502,15 @@ func (asw *actualStateOfWorld) PodExistsInVolume(
return podExists, volumeObj.devicePath, nil
}
func (asw *actualStateOfWorld) VolumeExists(
volumeName api.UniqueVolumeName) bool {
asw.RLock()
defer asw.RUnlock()
_, volumeExists := asw.attachedVolumes[volumeName]
return volumeExists
}
func (asw *actualStateOfWorld) GetMountedVolumes() []MountedVolume {
asw.RLock()
defer asw.RUnlock()
@ -525,17 +544,20 @@ func (asw *actualStateOfWorld) GetMountedVolumesForPod(
return mountedVolume
}
func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume {
func (asw *actualStateOfWorld) GetGloballyMountedVolumes() []AttachedVolume {
asw.RLock()
defer asw.RUnlock()
unmountedVolumes := make([]AttachedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
globallyMountedVolumes := make(
[]AttachedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
for _, volumeObj := range asw.attachedVolumes {
unmountedVolumes = append(
unmountedVolumes,
asw.getAttachedVolume(&volumeObj))
if volumeObj.globallyMounted {
globallyMountedVolumes = append(
globallyMountedVolumes,
asw.newAttachedVolume(&volumeObj))
}
}
return unmountedVolumes
return globallyMountedVolumes
}
func (asw *actualStateOfWorld) GetUnmountedVolumes() []AttachedVolume {
@ -546,14 +568,14 @@ func (asw *actualStateOfWorld) GetUnmountedVolumes() []AttachedVolume {
if len(volumeObj.mountedPods) == 0 {
unmountedVolumes = append(
unmountedVolumes,
asw.getAttachedVolume(&volumeObj))
asw.newAttachedVolume(&volumeObj))
}
}
return unmountedVolumes
}
func (asw *actualStateOfWorld) getAttachedVolume(
func (asw *actualStateOfWorld) newAttachedVolume(
attachedVolume *attachedVolume) AttachedVolume {
return AttachedVolume{
AttachedVolume: operationexecutor.AttachedVolume{


@ -27,7 +27,8 @@ import (
)
// Calls AddVolume() once to add volume
// Verifies newly added volume exists in GetAttachedVolumes()
// Verifies newly added volume exists in GetUnmountedVolumes()
// Verifies newly added volume doesn't exist in GetGloballyMountedVolumes()
func Test_AddVolume_Positive_NewVolume(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
@ -61,12 +62,15 @@ func Test_AddVolume_Positive_NewVolume(t *testing.T) {
t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExistsInAttachedVolumes(t, generatedVolumeName, asw)
verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
verifyVolumeExistsInUnmountedVolumes(t, generatedVolumeName, asw)
verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName, asw)
}
// Calls AddVolume() twice to add the same volume
// Verifies newly added volume exists in GetAttachedVolumes() and second call
// doesn't fail
// Verifies second call doesn't fail
// Verifies newly added volume exists in GetUnmountedVolumes()
// Verifies newly added volume doesn't exist in GetGloballyMountedVolumes()
func Test_AddVolume_Positive_ExistingVolume(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
@ -105,7 +109,9 @@ func Test_AddVolume_Positive_ExistingVolume(t *testing.T) {
t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExistsInAttachedVolumes(t, generatedVolumeName, asw)
verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
verifyVolumeExistsInUnmountedVolumes(t, generatedVolumeName, asw)
verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName, asw)
}
// Populates data struct with a volume
@ -160,7 +166,9 @@ func Test_AddPodToVolume_Positive_ExistingVolumeNewNode(t *testing.T) {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExistsInAttachedVolumes(t, generatedVolumeName, asw)
verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
verifyVolumeDoesntExistInUnmountedVolumes(t, generatedVolumeName, asw)
verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName, asw)
verifyPodExistsInVolumeAsw(t, podName, generatedVolumeName, "fake/device/path" /* expectedDevicePath */, asw)
}
@ -223,7 +231,9 @@ func Test_AddPodToVolume_Positive_ExistingVolumeExistingNode(t *testing.T) {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExistsInAttachedVolumes(t, generatedVolumeName, asw)
verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
verifyVolumeDoesntExistInUnmountedVolumes(t, generatedVolumeName, asw)
verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName, asw)
verifyPodExistsInVolumeAsw(t, podName, generatedVolumeName, "fake/device/path" /* expectedDevicePath */, asw)
}
@ -280,7 +290,9 @@ func Test_AddPodToVolume_Negative_VolumeDoesntExist(t *testing.T) {
t.Fatalf("AddPodToVolume did not fail. Expected: <\"no volume with the name ... exists in the list of attached volumes\"> Actual: <no error>")
}
verifyVolumeDoesntExistInAttachedVolumes(t, volumeName, asw)
verifyVolumeExistsAsw(t, volumeName, false /* shouldExist */, asw)
verifyVolumeDoesntExistInUnmountedVolumes(t, volumeName, asw)
verifyVolumeDoesntExistInGloballyMountedVolumes(t, volumeName, asw)
verifyPodDoesntExistInVolumeAsw(
t,
podName,
@ -289,28 +301,116 @@ func Test_AddPodToVolume_Negative_VolumeDoesntExist(t *testing.T) {
asw)
}
func verifyVolumeExistsInAttachedVolumes(
// Calls AddVolume() once to add volume
// Calls MarkDeviceAsMounted() to mark volume as globally mounted.
// Verifies newly added volume exists in GetUnmountedVolumes()
// Verifies newly added volume exists in GetGloballyMountedVolumes()
func Test_MarkDeviceAsMounted_Positive_NewVolume(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "volume-name",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device1",
},
},
},
},
},
}
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
devicePath := "fake/device/path"
generatedVolumeName, err := asw.AddVolume(volumeSpec, devicePath)
if err != nil {
t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
}
// Act
err = asw.MarkDeviceAsMounted(generatedVolumeName)
// Assert
if err != nil {
t.Fatalf("MarkDeviceAsMounted failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
verifyVolumeExistsInUnmountedVolumes(t, generatedVolumeName, asw)
verifyVolumeExistsInGloballyMountedVolumes(t, generatedVolumeName, asw)
}
func verifyVolumeExistsInGloballyMountedVolumes(
t *testing.T, expectedVolumeName api.UniqueVolumeName, asw ActualStateOfWorld) {
attachedVolumes := asw.GetAttachedVolumes()
for _, volume := range attachedVolumes {
globallyMountedVolumes := asw.GetGloballyMountedVolumes()
for _, volume := range globallyMountedVolumes {
if volume.VolumeName == expectedVolumeName {
return
}
}
t.Fatalf(
"Could not find volume %v in the list of attached volumes for actual state of world %+v",
"Could not find volume %v in the list of GloballyMountedVolumes for actual state of world %+v",
expectedVolumeName,
attachedVolumes)
globallyMountedVolumes)
}
func verifyVolumeDoesntExistInAttachedVolumes(
func verifyVolumeDoesntExistInGloballyMountedVolumes(
t *testing.T, volumeToCheck api.UniqueVolumeName, asw ActualStateOfWorld) {
attachedVolumes := asw.GetAttachedVolumes()
for _, volume := range attachedVolumes {
globallyMountedVolumes := asw.GetGloballyMountedVolumes()
for _, volume := range globallyMountedVolumes {
if volume.VolumeName == volumeToCheck {
t.Fatalf(
"Found volume %v in the list of attached volumes. Expected it not to exist.",
"Found volume %v in the list of GloballyMountedVolumes. Expected it not to exist.",
volumeToCheck)
}
}
}
func verifyVolumeExistsAsw(
t *testing.T,
expectedVolumeName api.UniqueVolumeName,
shouldExist bool,
asw ActualStateOfWorld) {
volumeExists := asw.VolumeExists(expectedVolumeName)
if shouldExist != volumeExists {
t.Fatalf(
"VolumeExists(%q) response incorrect. Expected: <%v> Actual: <%v>",
expectedVolumeName,
shouldExist,
volumeExists)
}
}
func verifyVolumeExistsInUnmountedVolumes(
t *testing.T, expectedVolumeName api.UniqueVolumeName, asw ActualStateOfWorld) {
unmountedVolumes := asw.GetUnmountedVolumes()
for _, volume := range unmountedVolumes {
if volume.VolumeName == expectedVolumeName {
return
}
}
t.Fatalf(
"Could not find volume %v in the list of UnmountedVolumes for actual state of world %+v",
expectedVolumeName,
unmountedVolumes)
}
func verifyVolumeDoesntExistInUnmountedVolumes(
t *testing.T, volumeToCheck api.UniqueVolumeName, asw ActualStateOfWorld) {
unmountedVolumes := asw.GetUnmountedVolumes()
for _, volume := range unmountedVolumes {
if volume.VolumeName == volumeToCheck {
t.Fatalf(
"Found volume %v in the list of UnmountedVolumes. Expected it not to exist.",
volumeToCheck)
}
}


@ -53,6 +53,16 @@ type DesiredStateOfWorld interface {
// volume, this is a no-op.
AddPodToVolume(podName types.UniquePodName, pod *api.Pod, volumeSpec *volume.Spec, outerVolumeSpecName string, volumeGidValue string) (api.UniqueVolumeName, error)
// MarkVolumesReportedInUse sets the ReportedInUse value to true for the
// reportedVolumes. For volumes not in the reportedVolumes list, the
// ReportedInUse value is reset to false. The default ReportedInUse value
// for a newly created volume is false.
// When set to true this value indicates that the volume was successfully
// added to the VolumesInUse field in the node's status.
// If a volume in the reportedVolumes list does not exist in the list of
// volumes that should be attached to this node, it is skipped without error.
MarkVolumesReportedInUse(reportedVolumes []api.UniqueVolumeName)
// DeletePodFromVolume removes the given pod from the given volume in the
// cache indicating the specified pod no longer requires the specified
// volume.
@ -128,6 +138,10 @@ type volumeToMount struct {
// volumeGidValue contains the value of the GID annotation, if present.
volumeGidValue string
// reportedInUse indicates that the volume was successfully added to the
// VolumesInUse field in the node's status.
reportedInUse bool
}
// The pod object represents a pod that references the underlying volume and
@ -186,6 +200,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
podsToMount: make(map[types.UniquePodName]podToMount),
pluginIsAttachable: dsw.isAttachableVolume(volumeSpec),
volumeGidValue: volumeGidValue,
reportedInUse: false,
}
dsw.volumesToMount[volumeName] = volumeObj
}
@ -203,6 +218,25 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
return volumeName, nil
}
func (dsw *desiredStateOfWorld) MarkVolumesReportedInUse(
reportedVolumes []api.UniqueVolumeName) {
dsw.Lock()
defer dsw.Unlock()
reportedVolumesMap := make(
map[api.UniqueVolumeName]bool, len(reportedVolumes) /* capacity */)
for _, reportedVolume := range reportedVolumes {
reportedVolumesMap[reportedVolume] = true
}
for volumeName, volumeObj := range dsw.volumesToMount {
_, volumeReported := reportedVolumesMap[volumeName]
volumeObj.reportedInUse = volumeReported
dsw.volumesToMount[volumeName] = volumeObj
}
}
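
Note the reset semantics of the implementation above: each call replaces the previous report wholesale, so a volume stays marked only while it keeps appearing in the reported list. A small self-contained illustration (the map of flags here is a stand-in for the real volumesToMount cache):

```go
package main

import "fmt"

type UniqueVolumeName string

// reportedInUse models the per-volume flag kept in the desired state of
// the world; all volumes start out unreported.
var reportedInUse = map[UniqueVolumeName]bool{
	"vol1": false,
	"vol2": false,
	"vol3": false,
}

// markVolumesReportedInUse mirrors the method above: volumes present in the
// reported list are set to true, every other tracked volume is reset to false.
func markVolumesReportedInUse(reported []UniqueVolumeName) {
	set := make(map[UniqueVolumeName]bool, len(reported))
	for _, v := range reported {
		set[v] = true
	}
	for name := range reportedInUse {
		reportedInUse[name] = set[name]
	}
}

func main() {
	markVolumesReportedInUse([]UniqueVolumeName{"vol2"})
	fmt.Println(reportedInUse["vol1"], reportedInUse["vol2"], reportedInUse["vol3"])
	// false true false

	markVolumesReportedInUse([]UniqueVolumeName{"vol3"})
	fmt.Println(reportedInUse["vol1"], reportedInUse["vol2"], reportedInUse["vol3"])
	// false false true -- vol2 was reset because it was no longer reported
}
```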
func (dsw *desiredStateOfWorld) DeletePodFromVolume(
podName types.UniquePodName, volumeName api.UniqueVolumeName) {
dsw.Lock()
@ -266,7 +300,8 @@ func (dsw *desiredStateOfWorld) GetVolumesToMount() []VolumeToMount {
VolumeSpec: podObj.spec,
PluginIsAttachable: volumeObj.pluginIsAttachable,
OuterVolumeSpecName: podObj.outerVolumeSpecName,
VolumeGidValue: volumeObj.volumeGidValue}})
VolumeGidValue: volumeObj.volumeGidValue,
ReportedInUse: volumeObj.reportedInUse}})
}
}
return volumesToMount


@ -64,8 +64,9 @@ func Test_AddPodToVolume_Positive_NewPodNewVolume(t *testing.T) {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExists(t, generatedVolumeName, dsw)
verifyVolumeExistsInVolumesToMount(t, generatedVolumeName, dsw)
verifyVolumeExistsDsw(t, generatedVolumeName, dsw)
verifyVolumeExistsInVolumesToMount(
t, generatedVolumeName, false /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, podName, generatedVolumeName, dsw)
}
@ -107,8 +108,9 @@ func Test_AddPodToVolume_Positive_ExistingPodExistingVolume(t *testing.T) {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExists(t, generatedVolumeName, dsw)
verifyVolumeExistsInVolumesToMount(t, generatedVolumeName, dsw)
verifyVolumeExistsDsw(t, generatedVolumeName, dsw)
verifyVolumeExistsInVolumesToMount(
t, generatedVolumeName, false /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, podName, generatedVolumeName, dsw)
}
@ -145,8 +147,9 @@ func Test_DeletePodFromVolume_Positive_PodExistsVolumeExists(t *testing.T) {
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExists(t, generatedVolumeName, dsw)
verifyVolumeExistsInVolumesToMount(t, generatedVolumeName, dsw)
verifyVolumeExistsDsw(t, generatedVolumeName, dsw)
verifyVolumeExistsInVolumesToMount(
t, generatedVolumeName, false /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, podName, generatedVolumeName, dsw)
// Act
@ -158,7 +161,140 @@ func Test_DeletePodFromVolume_Positive_PodExistsVolumeExists(t *testing.T) {
verifyPodDoesntExistInVolumeDsw(t, podName, generatedVolumeName, dsw)
}
func verifyVolumeExists(
// Calls AddPodToVolume() to add three new volumes to data struct
// Verifies newly added pod/volume exists via PodExistsInVolume()
// VolumeExists() and GetVolumesToMount()
// Marks only second volume as reported in use.
// Verifies only that volume is marked reported in use
// Marks only first volume as reported in use.
// Verifies only that volume is marked reported in use
func Test_MarkVolumesReportedInUse_Positive_NewPodNewVolume(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := NewDesiredStateOfWorld(volumePluginMgr)
pod1 := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "volume1-name",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device1",
},
},
},
},
},
}
volume1Spec := &volume.Spec{Volume: &pod1.Spec.Volumes[0]}
pod1Name := volumehelper.GetUniquePodName(pod1)
pod2 := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod2",
UID: "pod2uid",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "volume2-name",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device2",
},
},
},
},
},
}
volume2Spec := &volume.Spec{Volume: &pod2.Spec.Volumes[0]}
pod2Name := volumehelper.GetUniquePodName(pod2)
pod3 := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod3",
UID: "pod3uid",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "volume3-name",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device3",
},
},
},
},
},
}
volume3Spec := &volume.Spec{Volume: &pod3.Spec.Volumes[0]}
pod3Name := volumehelper.GetUniquePodName(pod3)
generatedVolume1Name, err := dsw.AddPodToVolume(
pod1Name, pod1, volume1Spec, volume1Spec.Name(), "" /* volumeGidValue */)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
generatedVolume2Name, err := dsw.AddPodToVolume(
pod2Name, pod2, volume2Spec, volume2Spec.Name(), "" /* volumeGidValue */)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
generatedVolume3Name, err := dsw.AddPodToVolume(
pod3Name, pod3, volume3Spec, volume3Spec.Name(), "" /* volumeGidValue */)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
// Act
volumesReportedInUse := []api.UniqueVolumeName{generatedVolume2Name}
dsw.MarkVolumesReportedInUse(volumesReportedInUse)
// Assert
verifyVolumeExistsDsw(t, generatedVolume1Name, dsw)
verifyVolumeExistsInVolumesToMount(
t, generatedVolume1Name, false /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, pod1Name, generatedVolume1Name, dsw)
verifyVolumeExistsDsw(t, generatedVolume2Name, dsw)
verifyVolumeExistsInVolumesToMount(
t, generatedVolume2Name, true /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, pod2Name, generatedVolume2Name, dsw)
verifyVolumeExistsDsw(t, generatedVolume3Name, dsw)
verifyVolumeExistsInVolumesToMount(
t, generatedVolume3Name, false /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, pod3Name, generatedVolume3Name, dsw)
// Act
volumesReportedInUse = []api.UniqueVolumeName{generatedVolume3Name}
dsw.MarkVolumesReportedInUse(volumesReportedInUse)
// Assert
verifyVolumeExistsDsw(t, generatedVolume1Name, dsw)
verifyVolumeExistsInVolumesToMount(
t, generatedVolume1Name, false /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, pod1Name, generatedVolume1Name, dsw)
verifyVolumeExistsDsw(t, generatedVolume2Name, dsw)
verifyVolumeExistsInVolumesToMount(
t, generatedVolume2Name, false /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, pod2Name, generatedVolume2Name, dsw)
verifyVolumeExistsDsw(t, generatedVolume3Name, dsw)
verifyVolumeExistsInVolumesToMount(
t, generatedVolume3Name, true /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, pod3Name, generatedVolume3Name, dsw)
}
func verifyVolumeExistsDsw(
t *testing.T, expectedVolumeName api.UniqueVolumeName, dsw DesiredStateOfWorld) {
volumeExists := dsw.VolumeExists(expectedVolumeName)
if !volumeExists {
@ -181,10 +317,21 @@ func verifyVolumeDoesntExist(
}
func verifyVolumeExistsInVolumesToMount(
t *testing.T, expectedVolumeName api.UniqueVolumeName, dsw DesiredStateOfWorld) {
t *testing.T,
expectedVolumeName api.UniqueVolumeName,
expectReportedInUse bool,
dsw DesiredStateOfWorld) {
volumesToMount := dsw.GetVolumesToMount()
for _, volume := range volumesToMount {
if volume.VolumeName == expectedVolumeName {
if volume.ReportedInUse != expectReportedInUse {
t.Fatalf(
"Found volume %v in the list of VolumesToMount, but ReportedInUse incorrect. Expected: <%v> Actual: <%v>",
expectedVolumeName,
expectReportedInUse,
volume.ReportedInUse)
}
return
}
}


@ -295,7 +295,7 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name())
err := rc.operationExecutor.DetachVolume(
attachedVolume.AttachedVolume, rc.actualStateOfWorld)
attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!goroutinemap.IsExponentialBackoff(err) {


@ -116,7 +116,7 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
podName := volumehelper.GetUniquePodName(pod)
_, err := dsw.AddPodToVolume(
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
// Assert
@ -126,7 +126,7 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
// Act
go reconciler.Run(wait.NeverStop)
waitForAttach(t, fakePlugin, asw)
waitForMount(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyAttachCallCount(
@ -183,8 +183,9 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
podName := volumehelper.GetUniquePodName(pod)
_, err := dsw.AddPodToVolume(
generatedVolumeName, err := dsw.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
dsw.MarkVolumesReportedInUse([]api.UniqueVolumeName{generatedVolumeName})
// Assert
if err != nil {
@ -193,7 +194,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
// Act
go reconciler.Run(wait.NeverStop)
waitForAttach(t, fakePlugin, asw)
waitForMount(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
@ -259,7 +260,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
// Act
go reconciler.Run(wait.NeverStop)
waitForAttach(t, fakePlugin, asw)
waitForMount(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyAttachCallCount(
@ -275,7 +276,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
// Act
dsw.DeletePodFromVolume(podName, generatedVolumeName)
waitForDetach(t, fakePlugin, asw)
waitForDetach(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyTearDownCallCount(
@ -338,7 +339,8 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
// Act
go reconciler.Run(wait.NeverStop)
waitForAttach(t, fakePlugin, asw)
dsw.MarkVolumesReportedInUse([]api.UniqueVolumeName{generatedVolumeName})
waitForMount(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
@ -353,7 +355,7 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
// Act
dsw.DeletePodFromVolume(podName, generatedVolumeName)
waitForDetach(t, fakePlugin, asw)
waitForDetach(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyTearDownCallCount(
@ -361,16 +363,19 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
}
func waitForAttach(
func waitForMount(
t *testing.T,
fakePlugin *volumetesting.FakeVolumePlugin,
volumeName api.UniqueVolumeName,
asw cache.ActualStateOfWorld) {
err := retryWithExponentialBackOff(
time.Duration(5*time.Millisecond),
func() (bool, error) {
mountedVolumes := asw.GetMountedVolumes()
if len(mountedVolumes) > 0 {
return true, nil
for _, mountedVolume := range mountedVolumes {
if mountedVolume.VolumeName == volumeName {
return true, nil
}
}
return false, nil
@ -378,28 +383,28 @@ func waitForAttach(
)
if err != nil {
t.Fatalf("Timed out waiting for len of asw.GetMountedVolumes() to become non-zero.")
t.Fatalf("Timed out waiting for volume %q to be attached.", volumeName)
}
}
func waitForDetach(
t *testing.T,
fakePlugin *volumetesting.FakeVolumePlugin,
volumeName api.UniqueVolumeName,
asw cache.ActualStateOfWorld) {
err := retryWithExponentialBackOff(
time.Duration(5*time.Millisecond),
func() (bool, error) {
attachedVolumes := asw.GetAttachedVolumes()
if len(attachedVolumes) == 0 {
return true, nil
if asw.VolumeExists(volumeName) {
return false, nil
}
return false, nil
return true, nil
},
)
if err != nil {
t.Fatalf("Timed out waiting for len of asw.attachedVolumes() to become zero.")
t.Fatalf("Timed out waiting for volume %q to be detached.", volumeName)
}
}


@ -102,10 +102,24 @@ type VolumeManager interface {
// pod object is bad, and should be avoided.
GetVolumesForPodAndAppendSupplementalGroups(pod *api.Pod) container.VolumeMap
// Returns a list of all volumes that are currently attached according to
// the actual state of the world cache and implement the volume.Attacher
// interface.
// Returns a list of all volumes that implement the volume.Attacher
// interface and are currently in use according to the actual and desired
// state of the world caches. A volume is considered "in use" as soon as it
// is added to the desired state of world, indicating it *should* be
// attached to this node and remains "in use" until it is removed from both
// the desired state of the world and the actual state of the world, or it
// has been unmounted (as indicated in actual state of world).
// TODO(#27653): VolumesInUse should be handled gracefully on kubelet
// restarts.
GetVolumesInUse() []api.UniqueVolumeName
// VolumeIsAttached returns true if the given volume is attached to this
// node.
VolumeIsAttached(volumeName api.UniqueVolumeName) bool
// Marks the specified volumes as having successfully been reported as "in
// use" in the node's volume status.
MarkVolumesAsReportedInUse(volumesReportedAsInUse []api.UniqueVolumeName)
}
// NewVolumeManager returns a new concrete instance implementing the
@ -209,16 +223,47 @@ func (vm *volumeManager) GetVolumesForPodAndAppendSupplementalGroups(
}
func (vm *volumeManager) GetVolumesInUse() []api.UniqueVolumeName {
attachedVolumes := vm.actualStateOfWorld.GetAttachedVolumes()
volumesInUse :=
make([]api.UniqueVolumeName, 0 /* len */, len(attachedVolumes) /* cap */)
for _, attachedVolume := range attachedVolumes {
if attachedVolume.PluginIsAttachable {
volumesInUse = append(volumesInUse, attachedVolume.VolumeName)
// Report volumes in desired state of world and actual state of world so
// that volumes are marked in use as soon as the decision is made that the
// volume *should* be attached to this node until it is safely unmounted.
desiredVolumes := vm.desiredStateOfWorld.GetVolumesToMount()
mountedVolumes := vm.actualStateOfWorld.GetGloballyMountedVolumes()
volumesToReportInUse :=
make(
[]api.UniqueVolumeName,
0, /* len */
len(desiredVolumes)+len(mountedVolumes) /* cap */)
desiredVolumesMap :=
make(
map[api.UniqueVolumeName]bool,
len(desiredVolumes)+len(mountedVolumes) /* cap */)
for _, volume := range desiredVolumes {
if volume.PluginIsAttachable {
desiredVolumesMap[volume.VolumeName] = true
volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
}
}
return volumesInUse
for _, volume := range mountedVolumes {
if volume.PluginIsAttachable {
if _, exists := desiredVolumesMap[volume.VolumeName]; !exists {
volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
}
}
}
return volumesToReportInUse
}
func (vm *volumeManager) VolumeIsAttached(
volumeName api.UniqueVolumeName) bool {
return vm.actualStateOfWorld.VolumeExists(volumeName)
}
func (vm *volumeManager) MarkVolumesAsReportedInUse(
volumesReportedAsInUse []api.UniqueVolumeName) {
vm.desiredStateOfWorld.MarkVolumesReportedInUse(volumesReportedAsInUse)
}
// getVolumesForPodHelper is a helper method that implements the common logic for
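
The GetVolumesInUse rewrite above reports the union of the desired and actual worlds, so a volume counts as "in use" from the moment it should be attached until it has been unmounted and dropped from both caches. A simplified sketch of that union (the PluginIsAttachable filter is elided, and plain slices stand in for the real caches):

```go
package main

import "fmt"

type UniqueVolumeName string

// volumesToReportInUse mirrors the logic above: every volume in the desired
// state is reported, plus any globally mounted volume not already covered,
// with no duplicates.
func volumesToReportInUse(desired, mounted []UniqueVolumeName) []UniqueVolumeName {
	out := make([]UniqueVolumeName, 0, len(desired)+len(mounted))
	seen := make(map[UniqueVolumeName]bool, len(desired)+len(mounted))
	for _, v := range desired {
		seen[v] = true
		out = append(out, v)
	}
	for _, v := range mounted {
		if !seen[v] {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	desired := []UniqueVolumeName{"vol1", "vol2"} // should be attached
	mounted := []UniqueVolumeName{"vol2", "vol3"} // vol3 is still unmounting
	fmt.Println(volumesToReportInUse(desired, mounted))
	// [vol1 vol2 vol3]
}
```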


@ -60,8 +60,10 @@ type OperationExecutor interface {
// DetachVolume detaches the volume from the node specified in
// volumeToDetach, and updates the actual state of the world to reflect
// that.
DetachVolume(volumeToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
// that. If verifySafeToDetach is set, a call is made to fetch the node
// object, which is used to verify that the volume does not exist in the
// node's Status.VolumesInUse list (the operation fails with an error if it does).
DetachVolume(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
// MountVolume mounts the volume to the pod specified in volumeToMount.
// Specifically it will:
@ -183,6 +185,10 @@ type VolumeToMount struct {
// DevicePath contains the path on the node where the volume is attached.
// For non-attachable volumes this is empty.
DevicePath string
// ReportedInUse indicates that the volume was successfully added to the
// VolumesInUse field in the node's status.
ReportedInUse bool
}
// AttachedVolume represents a volume that is attached to a node.
@ -335,9 +341,10 @@ func (oe *operationExecutor) AttachVolume(
func (oe *operationExecutor) DetachVolume(
volumeToDetach AttachedVolume,
verifySafeToDetach bool,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
detachFunc, err :=
oe.generateDetachVolumeFunc(volumeToDetach, actualStateOfWorld)
oe.generateDetachVolumeFunc(volumeToDetach, verifySafeToDetach, actualStateOfWorld)
if err != nil {
return err
}
@ -465,6 +472,7 @@ func (oe *operationExecutor) generateAttachVolumeFunc(
func (oe *operationExecutor) generateDetachVolumeFunc(
volumeToDetach AttachedVolume,
verifySafeToDetach bool,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
// Get attacher plugin
attachableVolumePlugin, err :=
@ -500,6 +508,44 @@ func (oe *operationExecutor) generateDetachVolumeFunc(
}
return func() error {
if verifySafeToDetach {
// Fetch current node object
node, fetchErr := oe.kubeClient.Core().Nodes().Get(volumeToDetach.NodeName)
if fetchErr != nil {
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"DetachVolume failed fetching node from API server for volume %q (spec.Name: %q) from node %q with: %v",
volumeToDetach.VolumeName,
volumeToDetach.VolumeSpec.Name(),
volumeToDetach.NodeName,
fetchErr)
}
if node == nil {
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"DetachVolume failed fetching node from API server for volume %q (spec.Name: %q) from node %q. Error: node object retrieved from API server is nil.",
volumeToDetach.VolumeName,
volumeToDetach.VolumeSpec.Name(),
volumeToDetach.NodeName)
}
for _, inUseVolume := range node.Status.VolumesInUse {
if inUseVolume == volumeToDetach.VolumeName {
return fmt.Errorf("DetachVolume failed for volume %q (spec.Name: %q) from node %q. Error: volume is still in use by node, according to Node status.",
volumeToDetach.VolumeName,
volumeToDetach.VolumeSpec.Name(),
volumeToDetach.NodeName)
}
}
// Volume is not marked as in use by the node; it is safe to detach.
glog.Infof("Verified volume is safe to detach for volume %q (spec.Name: %q) from node %q.",
volumeToDetach.VolumeName,
volumeToDetach.VolumeSpec.Name(),
volumeToDetach.NodeName)
}
// Execute detach
detachErr := volumeDetacher.Detach(volumeName, volumeToDetach.NodeName)
if detachErr != nil {
@ -864,6 +910,20 @@ func (oe *operationExecutor) generateVerifyControllerAttachedVolumeFunc(
return nil
}
if !volumeToMount.ReportedInUse {
// If the given volume has not yet been added to the list of
// VolumesInUse in the node's volume status, do not proceed, return
// error. Caller will log and retry. The node status is updated
// periodically by kubelet, so it may take as much as 10 seconds
// before this clears.
// Issue #28141 to enable on demand status updates.
return fmt.Errorf("Volume %q (spec.Name: %q) pod %q (UID: %q) has not yet been added to the list of VolumesInUse in the node's volume status.",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID)
}
// Fetch current node object
node, fetchErr := oe.kubeClient.Core().Nodes().Get(nodeName)
if fetchErr != nil {