mirror of https://github.com/k3s-io/k3s
Redesign and implement volume reconstruction work
This PR is the first part of redesign of volume reconstruction work. The changes include 1. Remove dependency on volume spec stored in actual state for volume cleanup process (UnmountVolume and UnmountDevice) Modify AttachedVolume struct to add DeviceMountPath so that volume unmount operation can use this information instead of constructing from volume spec 2. Modify reconciler's volume reconstruction process (syncState). Currently workflow is when kubelet restarts, syncState() is only called once before reconciler starts its loop. a. If volume plugin supports reconstruction, it will use the reconstructed volume spec information to update actual state as before. b. If volume plugin cannot support reconstruction, it will use the scanned mount path information to clean up the mounts. In this PR, all the plugins still support reconstruction (except glusterfs), so reconstruction of some plugins will still have issues. The next PR will modify those plugins that cannot support reconstruction well. This PR addresses issue #52683, #54108 (This PR includes the changes to update devicePath after local attach finishes)pull/6/head
parent
49bf442175
commit
9588d2098a
|
@ -96,7 +96,7 @@ func (c *PodConfig) SeenAllSources(seenSources sets.String) bool {
|
|||
if c.pods == nil {
|
||||
return false
|
||||
}
|
||||
glog.V(6).Infof("Looking for %v, have seen %v", c.sources.List(), seenSources)
|
||||
glog.V(5).Infof("Looking for %v, have seen %v", c.sources.List(), seenSources)
|
||||
return seenSources.HasAll(c.sources.List()...) && c.pods.seenSources(c.sources.List()...)
|
||||
}
|
||||
|
||||
|
|
|
@ -73,7 +73,7 @@ type ActualStateOfWorld interface {
|
|||
// must unmounted prior to detach.
|
||||
// If a volume with the name volumeName does not exist in the list of
|
||||
// attached volumes, an error is returned.
|
||||
SetVolumeGloballyMounted(volumeName v1.UniqueVolumeName, globallyMounted bool) error
|
||||
SetVolumeGloballyMounted(volumeName v1.UniqueVolumeName, globallyMounted bool, devicePath, deviceMountPath string) error
|
||||
|
||||
// DeletePodFromVolume removes the given pod from the given volume in the
|
||||
// cache indicating the volume has been successfully unmounted from the pod.
|
||||
|
@ -109,6 +109,13 @@ type ActualStateOfWorld interface {
|
|||
// volumes that do not need to update contents should not fail.
|
||||
PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) (bool, string, error)
|
||||
|
||||
// VolumeExistsWithSpecName returns true if the given volume specified with the
|
||||
// volume spec name (a.k.a., InnerVolumeSpecName) exists in the list of
|
||||
// volumes that should be attached to this node.
|
||||
// If a pod with the same name does not exist under the specified
|
||||
// volume, false is returned.
|
||||
VolumeExistsWithSpecName(podName volumetypes.UniquePodName, volumeSpecName string) bool
|
||||
|
||||
// VolumeExists returns true if the given volume exists in the list of
|
||||
// attached volumes in the cache, indicating the volume is attached to this
|
||||
// node.
|
||||
|
@ -240,6 +247,10 @@ type attachedVolume struct {
|
|||
// devicePath contains the path on the node where the volume is attached for
|
||||
// attachable volumes
|
||||
devicePath string
|
||||
|
||||
// deviceMountPath contains the path on the node where the device should
|
||||
// be mounted after it is attached.
|
||||
deviceMountPath string
|
||||
}
|
||||
|
||||
// The mountedPod object represents a pod for which the kubelet volume manager
|
||||
|
@ -318,13 +329,13 @@ func (asw *actualStateOfWorld) MarkVolumeAsUnmounted(
|
|||
}
|
||||
|
||||
func (asw *actualStateOfWorld) MarkDeviceAsMounted(
|
||||
volumeName v1.UniqueVolumeName) error {
|
||||
return asw.SetVolumeGloballyMounted(volumeName, true /* globallyMounted */)
|
||||
volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error {
|
||||
return asw.SetVolumeGloballyMounted(volumeName, true /* globallyMounted */, devicePath, deviceMountPath)
|
||||
}
|
||||
|
||||
func (asw *actualStateOfWorld) MarkDeviceAsUnmounted(
|
||||
volumeName v1.UniqueVolumeName) error {
|
||||
return asw.SetVolumeGloballyMounted(volumeName, false /* globallyMounted */)
|
||||
return asw.SetVolumeGloballyMounted(volumeName, false /* globallyMounted */, "", "")
|
||||
}
|
||||
|
||||
// addVolume adds the given volume to the cache indicating the specified
|
||||
|
@ -454,7 +465,7 @@ func (asw *actualStateOfWorld) MarkRemountRequired(
|
|||
}
|
||||
|
||||
func (asw *actualStateOfWorld) SetVolumeGloballyMounted(
|
||||
volumeName v1.UniqueVolumeName, globallyMounted bool) error {
|
||||
volumeName v1.UniqueVolumeName, globallyMounted bool, devicePath, deviceMountPath string) error {
|
||||
asw.Lock()
|
||||
defer asw.Unlock()
|
||||
|
||||
|
@ -466,6 +477,8 @@ func (asw *actualStateOfWorld) SetVolumeGloballyMounted(
|
|||
}
|
||||
|
||||
volumeObj.globallyMounted = globallyMounted
|
||||
volumeObj.deviceMountPath = deviceMountPath
|
||||
volumeObj.devicePath = devicePath
|
||||
asw.attachedVolumes[volumeName] = volumeObj
|
||||
return nil
|
||||
}
|
||||
|
@ -529,6 +542,19 @@ func (asw *actualStateOfWorld) PodExistsInVolume(
|
|||
return podExists, volumeObj.devicePath, nil
|
||||
}
|
||||
|
||||
func (asw *actualStateOfWorld) VolumeExistsWithSpecName(podName volumetypes.UniquePodName, volumeSpecName string) bool {
|
||||
asw.RLock()
|
||||
defer asw.RUnlock()
|
||||
for _, volumeObj := range asw.attachedVolumes {
|
||||
for name := range volumeObj.mountedPods {
|
||||
if podName == name && volumeObj.spec.Name() == volumeSpecName {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (asw *actualStateOfWorld) VolumeExists(
|
||||
volumeName v1.UniqueVolumeName) bool {
|
||||
asw.RLock()
|
||||
|
@ -625,8 +651,11 @@ func (asw *actualStateOfWorld) newAttachedVolume(
|
|||
VolumeSpec: attachedVolume.spec,
|
||||
NodeName: asw.nodeName,
|
||||
PluginIsAttachable: attachedVolume.pluginIsAttachable,
|
||||
DevicePath: attachedVolume.devicePath},
|
||||
GloballyMounted: attachedVolume.globallyMounted}
|
||||
DevicePath: attachedVolume.devicePath,
|
||||
DeviceMountPath: attachedVolume.deviceMountPath,
|
||||
PluginName: attachedVolume.pluginName},
|
||||
GloballyMounted: attachedVolume.globallyMounted,
|
||||
}
|
||||
}
|
||||
|
||||
// Compile-time check to ensure volumeNotAttachedError implements the error interface
|
||||
|
@ -691,5 +720,6 @@ func getMountedVolume(
|
|||
Mounter: mountedPod.mounter,
|
||||
BlockVolumeMapper: mountedPod.blockVolumeMapper,
|
||||
VolumeGidValue: mountedPod.volumeGidValue,
|
||||
VolumeSpec: attachedVolume.spec}}
|
||||
VolumeSpec: attachedVolume.spec,
|
||||
DeviceMountPath: attachedVolume.deviceMountPath}}
|
||||
}
|
||||
|
|
|
@ -222,6 +222,7 @@ func Test_AddPodToVolume_Positive_ExistingVolumeNewNode(t *testing.T) {
|
|||
verifyVolumeDoesntExistInUnmountedVolumes(t, generatedVolumeName, asw)
|
||||
verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName, asw)
|
||||
verifyPodExistsInVolumeAsw(t, podName, generatedVolumeName, "fake/device/path" /* expectedDevicePath */, asw)
|
||||
verifyVolumeExistsWithSpecNameInVolumeAsw(t, podName, volumeSpec.Name(), asw)
|
||||
}
|
||||
|
||||
// Populates data struct with a volume
|
||||
|
@ -292,6 +293,7 @@ func Test_AddPodToVolume_Positive_ExistingVolumeExistingNode(t *testing.T) {
|
|||
verifyVolumeDoesntExistInUnmountedVolumes(t, generatedVolumeName, asw)
|
||||
verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName, asw)
|
||||
verifyPodExistsInVolumeAsw(t, podName, generatedVolumeName, "fake/device/path" /* expectedDevicePath */, asw)
|
||||
verifyVolumeExistsWithSpecNameInVolumeAsw(t, podName, volumeSpec.Name(), asw)
|
||||
}
|
||||
|
||||
// Calls AddPodToVolume() to add pod to empty data stuct
|
||||
|
@ -370,6 +372,7 @@ func Test_AddPodToVolume_Negative_VolumeDoesntExist(t *testing.T) {
|
|||
volumeName,
|
||||
false, /* expectVolumeToExist */
|
||||
asw)
|
||||
verifyVolumeDoesntExistWithSpecNameInVolumeAsw(t, podName, volumeSpec.Name(), asw)
|
||||
}
|
||||
|
||||
// Calls MarkVolumeAsAttached() once to add volume
|
||||
|
@ -400,6 +403,7 @@ func Test_MarkDeviceAsMounted_Positive_NewVolume(t *testing.T) {
|
|||
}
|
||||
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
|
||||
devicePath := "fake/device/path"
|
||||
deviceMountPath := "fake/device/mount/path"
|
||||
generatedVolumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
|
||||
|
||||
err = asw.MarkVolumeAsAttached(emptyVolumeName, volumeSpec, "" /* nodeName */, devicePath)
|
||||
|
@ -408,7 +412,7 @@ func Test_MarkDeviceAsMounted_Positive_NewVolume(t *testing.T) {
|
|||
}
|
||||
|
||||
// Act
|
||||
err = asw.MarkDeviceAsMounted(generatedVolumeName)
|
||||
err = asw.MarkDeviceAsMounted(generatedVolumeName, devicePath, deviceMountPath)
|
||||
|
||||
// Assert
|
||||
if err != nil {
|
||||
|
@ -546,3 +550,33 @@ func verifyPodDoesntExistInVolumeAsw(
|
|||
devicePath)
|
||||
}
|
||||
}
|
||||
|
||||
func verifyVolumeExistsWithSpecNameInVolumeAsw(
|
||||
t *testing.T,
|
||||
expectedPodName volumetypes.UniquePodName,
|
||||
expectedVolumeName string,
|
||||
asw ActualStateOfWorld) {
|
||||
podExistsInVolume :=
|
||||
asw.VolumeExistsWithSpecName(expectedPodName, expectedVolumeName)
|
||||
|
||||
if !podExistsInVolume {
|
||||
t.Fatalf(
|
||||
"ASW VolumeExistsWithSpecName result invalid. Expected: <true> Actual: <%v>",
|
||||
podExistsInVolume)
|
||||
}
|
||||
}
|
||||
|
||||
func verifyVolumeDoesntExistWithSpecNameInVolumeAsw(
|
||||
t *testing.T,
|
||||
podToCheck volumetypes.UniquePodName,
|
||||
volumeToCheck string,
|
||||
asw ActualStateOfWorld) {
|
||||
podExistsInVolume :=
|
||||
asw.VolumeExistsWithSpecName(podToCheck, volumeToCheck)
|
||||
|
||||
if podExistsInVolume {
|
||||
t.Fatalf(
|
||||
"ASW VolumeExistsWithSpecName result invalid. Expected: <false> Actual: <%v>",
|
||||
podExistsInVolume)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -98,6 +98,13 @@ type DesiredStateOfWorld interface {
|
|||
// with pod's unique name. This map can be used to determine which pod is currently
|
||||
// in desired state of world.
|
||||
GetPods() map[types.UniquePodName]bool
|
||||
|
||||
// VolumeExistsWithSpecName returns true if the given volume specified with the
|
||||
// volume spec name (a.k.a., InnerVolumeSpecName) exists in the list of
|
||||
// volumes that should be attached to this node.
|
||||
// If a pod with the same name does not exist under the specified
|
||||
// volume, false is returned.
|
||||
VolumeExistsWithSpecName(podName types.UniquePodName, volumeSpecName string) bool
|
||||
}
|
||||
|
||||
// VolumeToMount represents a volume that is attached to this node and needs to
|
||||
|
@ -234,7 +241,6 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
|
|||
spec: volumeSpec,
|
||||
outerVolumeSpecName: outerVolumeSpecName,
|
||||
}
|
||||
|
||||
return volumeName, nil
|
||||
}
|
||||
|
||||
|
@ -303,6 +309,19 @@ func (dsw *desiredStateOfWorld) PodExistsInVolume(
|
|||
return podExists
|
||||
}
|
||||
|
||||
func (dsw *desiredStateOfWorld) VolumeExistsWithSpecName(podName types.UniquePodName, volumeSpecName string) bool {
|
||||
dsw.RLock()
|
||||
defer dsw.RUnlock()
|
||||
for _, volumeObj := range dsw.volumesToMount {
|
||||
for name, podObj := range volumeObj.podsToMount {
|
||||
if podName == name && podObj.spec.Name() == volumeSpecName {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (dsw *desiredStateOfWorld) GetPods() map[types.UniquePodName]bool {
|
||||
dsw.RLock()
|
||||
defer dsw.RUnlock()
|
||||
|
|
|
@ -69,6 +69,7 @@ func Test_AddPodToVolume_Positive_NewPodNewVolume(t *testing.T) {
|
|||
verifyVolumeExistsInVolumesToMount(
|
||||
t, generatedVolumeName, false /* expectReportedInUse */, dsw)
|
||||
verifyPodExistsInVolumeDsw(t, podName, generatedVolumeName, dsw)
|
||||
verifyVolumeExistsWithSpecNameInVolumeDsw(t, podName, volumeSpec.Name(), dsw)
|
||||
}
|
||||
|
||||
// Calls AddPodToVolume() twice to add the same pod to the same volume
|
||||
|
@ -113,6 +114,7 @@ func Test_AddPodToVolume_Positive_ExistingPodExistingVolume(t *testing.T) {
|
|||
verifyVolumeExistsInVolumesToMount(
|
||||
t, generatedVolumeName, false /* expectReportedInUse */, dsw)
|
||||
verifyPodExistsInVolumeDsw(t, podName, generatedVolumeName, dsw)
|
||||
verifyVolumeExistsWithSpecNameInVolumeDsw(t, podName, volumeSpec.Name(), dsw)
|
||||
}
|
||||
|
||||
// Populates data struct with a new volume/pod
|
||||
|
@ -160,6 +162,7 @@ func Test_DeletePodFromVolume_Positive_PodExistsVolumeExists(t *testing.T) {
|
|||
verifyVolumeDoesntExist(t, generatedVolumeName, dsw)
|
||||
verifyVolumeDoesntExistInVolumesToMount(t, generatedVolumeName, dsw)
|
||||
verifyPodDoesntExistInVolumeDsw(t, podName, generatedVolumeName, dsw)
|
||||
verifyVolumeDoesntExistWithSpecNameInVolumeDsw(t, podName, volumeSpec.Name(), dsw)
|
||||
}
|
||||
|
||||
// Calls AddPodToVolume() to add three new volumes to data struct
|
||||
|
@ -380,3 +383,29 @@ func verifyPodDoesntExistInVolumeDsw(
|
|||
podExistsInVolume)
|
||||
}
|
||||
}
|
||||
|
||||
func verifyVolumeExistsWithSpecNameInVolumeDsw(
|
||||
t *testing.T,
|
||||
expectedPodName volumetypes.UniquePodName,
|
||||
expectedVolumeSpecName string,
|
||||
dsw DesiredStateOfWorld) {
|
||||
if podExistsInVolume := dsw.VolumeExistsWithSpecName(
|
||||
expectedPodName, expectedVolumeSpecName); !podExistsInVolume {
|
||||
t.Fatalf(
|
||||
"DSW VolumeExistsWithSpecNam returned incorrect value. Expected: <true> Actual: <%v>",
|
||||
podExistsInVolume)
|
||||
}
|
||||
}
|
||||
|
||||
func verifyVolumeDoesntExistWithSpecNameInVolumeDsw(
|
||||
t *testing.T,
|
||||
expectedPodName volumetypes.UniquePodName,
|
||||
expectedVolumeSpecName string,
|
||||
dsw DesiredStateOfWorld) {
|
||||
if podExistsInVolume := dsw.VolumeExistsWithSpecName(
|
||||
expectedPodName, expectedVolumeSpecName); podExistsInVolume {
|
||||
t.Fatalf(
|
||||
"DSW VolumeExistsWithSpecNam returned incorrect value. Expected: <true> Actual: <%v>",
|
||||
podExistsInVolume)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -123,6 +123,7 @@ type processedPods struct {
|
|||
|
||||
func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
|
||||
// Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
|
||||
glog.Infof("Desired state populator starts to run")
|
||||
wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
|
||||
done := sourcesReady.AllReady()
|
||||
dswp.populatorLoopFunc()()
|
||||
|
|
|
@ -140,26 +140,18 @@ type reconciler struct {
|
|||
}
|
||||
|
||||
func (rc *reconciler) Run(stopCh <-chan struct{}) {
|
||||
// Wait for the populator to indicate that it has actually populated the desired state of world, meaning it has
|
||||
// completed a populate loop that started after sources are all ready. After, there's no need to keep checking.
|
||||
wait.PollUntil(rc.loopSleepDuration, func() (bool, error) {
|
||||
rc.reconciliationLoopFunc(rc.populatorHasAddedPods())()
|
||||
return rc.populatorHasAddedPods(), nil
|
||||
}, stopCh)
|
||||
wait.Until(rc.reconciliationLoopFunc(true), rc.loopSleepDuration, stopCh)
|
||||
wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
|
||||
}
|
||||
|
||||
func (rc *reconciler) reconciliationLoopFunc(populatorHasAddedPods bool) func() {
|
||||
func (rc *reconciler) reconciliationLoopFunc() func() {
|
||||
return func() {
|
||||
rc.reconcile()
|
||||
|
||||
// Add a check that the populator has added pods so that reconciler's reconstruct process will start
|
||||
// after desired state of world is populated with pod volume information. Otherwise, reconciler's
|
||||
// reconstruct process may add incomplete volume information and cause confusion. In addition, if the
|
||||
// desired state of world has not been populated yet, the reconstruct process may clean up pods' volumes
|
||||
// that are still in use because desired state of world does not contain a complete list of pods.
|
||||
if populatorHasAddedPods && time.Since(rc.timeOfLastSync) > rc.syncDuration {
|
||||
glog.V(5).Infof("Desired state of world has been populated with pods, starting reconstruct state function")
|
||||
// Sync the state with the reality once after all existing pods are added to the desired state from all sources.
|
||||
// Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
|
||||
// desired state of world does not contain a complete list of pods.
|
||||
if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
|
||||
glog.Infof("Reconciler: start to sync state")
|
||||
rc.sync()
|
||||
}
|
||||
}
|
||||
|
@ -319,11 +311,11 @@ func (rc *reconciler) reconcile() {
|
|||
// sync process tries to observe the real world by scanning all pods' volume directories from the disk.
|
||||
// If the actual and desired state of worlds are not consistent with the observed world, it means that some
|
||||
// mounted volumes are left out probably during kubelet restart. This process will reconstruct
|
||||
// the volumes and udpate the actual and desired states. In the following reconciler loop, those volumes will
|
||||
// be cleaned up.
|
||||
// the volumes and udpate the actual and desired states. For the volumes that cannot support reconstruction,
|
||||
// it will try to clean up the mount paths with operation executor.
|
||||
func (rc *reconciler) sync() {
|
||||
defer rc.updateLastSyncTime()
|
||||
rc.syncStates(rc.kubeletPodsDir)
|
||||
rc.syncStates()
|
||||
}
|
||||
|
||||
func (rc *reconciler) updateLastSyncTime() {
|
||||
|
@ -348,7 +340,7 @@ type reconstructedVolume struct {
|
|||
volumeSpec *volumepkg.Spec
|
||||
outerVolumeSpecName string
|
||||
pod *v1.Pod
|
||||
pluginIsAttachable bool
|
||||
attachablePlugin volumepkg.AttachableVolumePlugin
|
||||
volumeGidValue string
|
||||
devicePath string
|
||||
reportedInUse bool
|
||||
|
@ -356,66 +348,41 @@ type reconstructedVolume struct {
|
|||
blockVolumeMapper volumepkg.BlockVolumeMapper
|
||||
}
|
||||
|
||||
// reconstructFromDisk scans the volume directories under the given pod directory. If the volume is not
|
||||
// in either actual or desired state of world, or pending operation, this function will reconstruct
|
||||
// the volume spec and put it in both the actual and desired state of worlds. If no running
|
||||
// container is mounting the volume, the volume will be removed by desired state of world's populator and
|
||||
// cleaned up by the reconciler.
|
||||
func (rc *reconciler) syncStates(podsDir string) {
|
||||
// syncStates scans the volume directories under the given pod directory.
|
||||
// If the volume is not in desired state of world, this function will reconstruct
|
||||
// the volume related information and put it in both the actual and desired state of worlds.
|
||||
// For some volume plugins that cannot support reconstruction, it will clean up the existing
|
||||
// mount points since the volume is no long needed (removed from desired state)
|
||||
func (rc *reconciler) syncStates() {
|
||||
// Get volumes information by reading the pod's directory
|
||||
podVolumes, err := getVolumesFromPodDir(podsDir)
|
||||
podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
|
||||
if err != nil {
|
||||
glog.Errorf("Cannot get volumes from disk %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
volumesNeedUpdate := make(map[v1.UniqueVolumeName]*reconstructedVolume)
|
||||
for _, volume := range podVolumes {
|
||||
reconstructedVolume, err := rc.reconstructVolume(volume)
|
||||
if err != nil {
|
||||
glog.Errorf("Could not construct volume information: %v", err)
|
||||
if rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
|
||||
glog.V(4).Info("Volume exists in desired state (volume.SpecName %s, pod.UID %s), skip cleaning up mounts", volume.volumeSpecName, volume.podName)
|
||||
continue
|
||||
}
|
||||
// Check if there is an pending operation for the given pod and volume.
|
||||
// Need to check pending operation before checking the actual and desired
|
||||
// states to avoid race condition during checking. For example, the following
|
||||
// might happen if pending operation is checked after checking actual and desired states.
|
||||
// 1. Checking the pod and it does not exist in either actual or desired state.
|
||||
// 2. An operation for the given pod finishes and the actual state is updated.
|
||||
// 3. Checking and there is no pending operation for the given pod.
|
||||
// During state reconstruction period, no new volume operations could be issued. If the
|
||||
// mounted path is not in either pending operation, or actual or desired states, this
|
||||
// volume needs to be reconstructed back to the states.
|
||||
pending := rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, reconstructedVolume.podName)
|
||||
dswExist := rc.desiredStateOfWorld.PodExistsInVolume(reconstructedVolume.podName, reconstructedVolume.volumeName)
|
||||
aswExist, _, _ := rc.actualStateOfWorld.PodExistsInVolume(reconstructedVolume.podName, reconstructedVolume.volumeName)
|
||||
|
||||
if !rc.StatesHasBeenSynced() {
|
||||
// In case this is the first time to reconstruct state after kubelet starts, for a persistant volume, it must have
|
||||
// been mounted before kubelet restarts because no mount operations could be started at this time (node
|
||||
// status has not yet been updated before this very first syncStates finishes, so that VerifyControllerAttachedVolume will fail),
|
||||
// In this case, the volume state should be put back to actual state now no matter desired state has it or not.
|
||||
// This is to prevent node status from being updated to empty for attachable volumes. This might happen because
|
||||
// in the case that a volume is discovered on disk, and it is part of desired state, but is then quickly deleted
|
||||
// from the desired state. If in such situation, the volume is not added to the actual state, the node status updater will
|
||||
// not get this volume from either actual or desired state. In turn, this might cause master controller
|
||||
// detaching while the volume is still mounted.
|
||||
if aswExist || !reconstructedVolume.pluginIsAttachable {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
// Check pending first since no new operations could be started at this point.
|
||||
// Otherwise there might a race condition in checking actual states and pending operations
|
||||
if pending || dswExist || aswExist {
|
||||
continue
|
||||
}
|
||||
if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
|
||||
glog.V(4).Info("Volume exists in actual state (volume.SpecName %s, pod.UID %s), skip cleaning up mounts", volume.volumeSpecName, volume.podName)
|
||||
continue
|
||||
}
|
||||
reconstructedVolume, err := rc.reconstructVolume(volume)
|
||||
if err != nil {
|
||||
glog.Warning("Could not construct volume information, cleanup the mounts. (pod.UID %s, volume.SpecName %s): %v", volume.podName, volume.volumeSpecName, err)
|
||||
rc.cleanupMounts(volume)
|
||||
continue
|
||||
}
|
||||
if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName) {
|
||||
glog.Warning("Volume is in pending operation, skip cleaning up mounts")
|
||||
}
|
||||
|
||||
glog.V(2).Infof(
|
||||
"Reconciler sync states: could not find pod information in desired or actual states or pending operation, update it in both states: %+v",
|
||||
"Reconciler sync states: could not find pod information in desired state, update it in actual state: %+v",
|
||||
reconstructedVolume)
|
||||
volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume
|
||||
|
||||
}
|
||||
|
||||
if len(volumesNeedUpdate) > 0 {
|
||||
|
@ -426,7 +393,31 @@ func (rc *reconciler) syncStates(podsDir string) {
|
|||
|
||||
}
|
||||
|
||||
// Reconstruct Volume object and reconstructedVolume data structure by reading the pod's volume directories
|
||||
func (rc *reconciler) cleanupMounts(volume podVolume) {
|
||||
glog.V(2).Infof("Reconciler sync states: could not find information (PID: %s) (Volume SpecName: %s) in desired state, clean up the mount points",
|
||||
volume.podName, volume.volumeSpecName)
|
||||
mountedVolume := operationexecutor.MountedVolume{
|
||||
PodName: volume.podName,
|
||||
VolumeName: v1.UniqueVolumeName(volume.volumeSpecName),
|
||||
InnerVolumeSpecName: volume.volumeSpecName,
|
||||
PluginName: volume.pluginName,
|
||||
PodUID: types.UID(volume.podName),
|
||||
}
|
||||
volumeHandler, err := operationexecutor.NewVolumeHandlerWithMode(volume.volumeMode, rc.operationExecutor)
|
||||
if err != nil {
|
||||
glog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.NewVolumeHandler for UnmountVolume failed"), err).Error())
|
||||
return
|
||||
}
|
||||
// TODO: Currently cleanupMounts only includes UnmountVolume operation. In the next PR, we will add
|
||||
// to unmount both volume and device in the same routine.
|
||||
err = volumeHandler.UnmountVolumeHandler(mountedVolume, rc.actualStateOfWorld)
|
||||
if err != nil {
|
||||
glog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("volumeHandler.UnmountVolumeHandler for UnmountVolume failed"), err).Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Reconstruct volume data structure by reading the pod's volume directories
|
||||
func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume, error) {
|
||||
// plugin initializations
|
||||
plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName)
|
||||
|
@ -438,23 +429,17 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Create volumeSpec
|
||||
// Create pod object
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
UID: types.UID(volume.podName),
|
||||
},
|
||||
}
|
||||
// TODO: remove feature gate check after no longer needed
|
||||
var mapperPlugin volumepkg.BlockVolumePlugin
|
||||
tmpSpec := &volumepkg.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{}}}
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
|
||||
mapperPlugin, err = rc.volumePluginMgr.FindMapperPluginByName(volume.pluginName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tmpSpec = &volumepkg.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volume.volumeMode}}}
|
||||
volumeHandler, err := operationexecutor.NewVolumeHandlerWithMode(volume.volumeMode, rc.operationExecutor)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
volumeHandler, err := operationexecutor.NewVolumeHandler(tmpSpec, rc.operationExecutor)
|
||||
mapperPlugin, err := rc.volumePluginMgr.FindMapperPluginByName(volume.pluginName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -480,7 +465,6 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
|
|||
} else {
|
||||
uniqueVolumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(volume.podName, plugin, volumeSpec)
|
||||
}
|
||||
|
||||
// Check existence of mount point for filesystem volume or symbolic link for block volume
|
||||
isExist, checkErr := volumeHandler.CheckVolumeExistence(volume.mountPath, volumeSpec.Name(), rc.mounter, uniqueVolumeName, volume.podName, pod.UID, attachablePlugin)
|
||||
if checkErr != nil {
|
||||
|
@ -507,7 +491,7 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
|
|||
|
||||
// TODO: remove feature gate check after no longer needed
|
||||
var volumeMapper volumepkg.BlockVolumeMapper
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) && volume.volumeMode == v1.PersistentVolumeBlock {
|
||||
var newMapperErr error
|
||||
if mapperPlugin != nil {
|
||||
volumeMapper, newMapperErr = mapperPlugin.NewBlockVolumeMapper(
|
||||
|
@ -530,23 +514,24 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
|
|||
volumeName: uniqueVolumeName,
|
||||
podName: volume.podName,
|
||||
volumeSpec: volumeSpec,
|
||||
// volume.volumeSpecName is actually InnerVolumeSpecName. But this information will likely to be updated in updateStates()
|
||||
// by checking the desired state volumeToMount list and getting the real OuterVolumeSpecName.
|
||||
// In case the pod is deleted during this period and desired state does not have this information, it will not be used
|
||||
// volume.volumeSpecName is actually InnerVolumeSpecName. It will not be used
|
||||
// for volume cleanup.
|
||||
// TODO: in case pod is added back before reconciler starts to unmount, we can update this field from desired state information
|
||||
outerVolumeSpecName: volume.volumeSpecName,
|
||||
pod: pod,
|
||||
pluginIsAttachable: attachablePlugin != nil,
|
||||
attachablePlugin: attachablePlugin,
|
||||
volumeGidValue: "",
|
||||
devicePath: "",
|
||||
mounter: volumeMounter,
|
||||
blockVolumeMapper: volumeMapper,
|
||||
// devicePath is updated during updateStates() by checking node status's VolumesAttached data.
|
||||
// TODO: get device path directly from the volume mount path.
|
||||
devicePath: "",
|
||||
mounter: volumeMounter,
|
||||
blockVolumeMapper: volumeMapper,
|
||||
}
|
||||
return reconstructedVolume, nil
|
||||
}
|
||||
|
||||
func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) error {
|
||||
// Get the node status to retrieve volume device path information.
|
||||
// updateDevicePath gets the node status to retrieve volume device path information.
|
||||
func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) {
|
||||
node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(string(rc.nodeName), metav1.GetOptions{})
|
||||
if fetchErr != nil {
|
||||
glog.Errorf("updateStates in reconciler: could not get node status with error %v", fetchErr)
|
||||
|
@ -559,26 +544,42 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*re
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get the list of volumes from desired state and update OuterVolumeSpecName if the information is available
|
||||
volumesToMount := rc.desiredStateOfWorld.GetVolumesToMount()
|
||||
for _, volumeToMount := range volumesToMount {
|
||||
if volume, exists := volumesNeedUpdate[volumeToMount.VolumeName]; exists {
|
||||
volume.outerVolumeSpecName = volumeToMount.OuterVolumeSpecName
|
||||
volumesNeedUpdate[volumeToMount.VolumeName] = volume
|
||||
glog.V(4).Infof("Update OuterVolumeSpecName from desired state for volume (%q): %q",
|
||||
volumeToMount.VolumeName, volume.outerVolumeSpecName)
|
||||
func getDeviceMountPath(volume *reconstructedVolume) (string, error) {
|
||||
volumeAttacher, err := volume.attachablePlugin.NewAttacher()
|
||||
if volumeAttacher == nil || err != nil {
|
||||
return "", err
|
||||
}
|
||||
deviceMountPath, err :=
|
||||
volumeAttacher.GetDeviceMountPath(volume.volumeSpec)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if volume.blockVolumeMapper != nil {
|
||||
deviceMountPath, err =
|
||||
volume.blockVolumeMapper.GetGlobalMapPath(volume.volumeSpec)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
return deviceMountPath, nil
|
||||
}
|
||||
|
||||
func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) error {
|
||||
// Get the node status to retrieve volume device path information.
|
||||
rc.updateDevicePath(volumesNeedUpdate)
|
||||
|
||||
for _, volume := range volumesNeedUpdate {
|
||||
err := rc.actualStateOfWorld.MarkVolumeAsAttached(
|
||||
//TODO: the devicePath might not be correct for some volume plugins: see issue #54108
|
||||
volume.volumeName, volume.volumeSpec, "" /* nodeName */, volume.devicePath)
|
||||
if err != nil {
|
||||
glog.Errorf("Could not add volume information to actual state of world: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
err = rc.actualStateOfWorld.AddPodToVolume(
|
||||
err = rc.actualStateOfWorld.MarkVolumeAsMounted(
|
||||
volume.podName,
|
||||
types.UID(volume.podName),
|
||||
volume.volumeName,
|
||||
|
@ -590,22 +591,19 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*re
|
|||
glog.Errorf("Could not add pod to volume information to actual state of world: %v", err)
|
||||
continue
|
||||
}
|
||||
if volume.pluginIsAttachable {
|
||||
err = rc.actualStateOfWorld.MarkDeviceAsMounted(volume.volumeName)
|
||||
glog.V(4).Infof("Volume: %s (pod UID %s) is marked as mounted and added into the actual state", volume.volumeName, volume.podName)
|
||||
if volume.attachablePlugin != nil {
|
||||
deviceMountPath, err := getDeviceMountPath(volume)
|
||||
if err != nil {
|
||||
glog.Errorf("Could not find device mount path for volume %s", volume.volumeName)
|
||||
continue
|
||||
}
|
||||
err = rc.actualStateOfWorld.MarkDeviceAsMounted(volume.volumeName, volume.devicePath, deviceMountPath)
|
||||
if err != nil {
|
||||
glog.Errorf("Could not mark device is mounted to actual state of world: %v", err)
|
||||
continue
|
||||
}
|
||||
glog.Infof("Volume: %v is mounted", volume.volumeName)
|
||||
}
|
||||
|
||||
_, err = rc.desiredStateOfWorld.AddPodToVolume(volume.podName,
|
||||
volume.pod,
|
||||
volume.volumeSpec,
|
||||
volume.outerVolumeSpecName,
|
||||
volume.volumeGidValue)
|
||||
if err != nil {
|
||||
glog.Errorf("Could not add pod to volume information to desired state of world: %v", err)
|
||||
glog.V(4).Infof("Volume: %s (pod UID %s) is marked device as mounted and added into the actual state", volume.volumeName, volume.podName)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -667,6 +665,6 @@ func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
|
|||
}
|
||||
}
|
||||
}
|
||||
glog.V(10).Infof("Get volumes from pod directory %q %+v", podDir, volumes)
|
||||
glog.V(4).Infof("Get volumes from pod directory %q %+v", podDir, volumes)
|
||||
return volumes, nil
|
||||
}
|
||||
|
|
|
@ -933,7 +933,8 @@ func Test_GenerateUnmapDeviceFunc_Plugin_Not_Found(t *testing.T) {
|
|||
false, /* checkNodeCapabilitiesBeforeMount */
|
||||
nil))
|
||||
var mounter mount.Interface
|
||||
deviceToDetach := operationexecutor.AttachedVolume{VolumeSpec: &volume.Spec{}}
|
||||
plugins := volumetesting.NewFakeFileVolumePlugin()
|
||||
deviceToDetach := operationexecutor.AttachedVolume{VolumeSpec: &volume.Spec{}, PluginName: plugins[0].GetPluginName()}
|
||||
err := oex.UnmapDevice(deviceToDetach, asw, mounter)
|
||||
// Assert
|
||||
if assert.Error(t, err) {
|
||||
|
@ -949,7 +950,7 @@ func waitForMount(
|
|||
volumeName v1.UniqueVolumeName,
|
||||
asw cache.ActualStateOfWorld) {
|
||||
err := retryWithExponentialBackOff(
|
||||
time.Duration(5*time.Millisecond),
|
||||
time.Duration(500*time.Millisecond),
|
||||
func() (bool, error) {
|
||||
mountedVolumes := asw.GetMountedVolumes()
|
||||
for _, mountedVolume := range mountedVolumes {
|
||||
|
@ -973,7 +974,7 @@ func waitForDetach(
|
|||
volumeName v1.UniqueVolumeName,
|
||||
asw cache.ActualStateOfWorld) {
|
||||
err := retryWithExponentialBackOff(
|
||||
time.Duration(5*time.Millisecond),
|
||||
time.Duration(500*time.Millisecond),
|
||||
func() (bool, error) {
|
||||
if asw.VolumeExists(volumeName) {
|
||||
return false, nil
|
||||
|
|
|
@ -21,6 +21,7 @@ package mount
|
|||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type FileType string
|
||||
|
@ -261,3 +262,21 @@ func isBind(options []string) (bool, []string) {
|
|||
|
||||
return bind, bindRemountOpts
|
||||
}
|
||||
|
||||
// TODO: this is a workaround for the unmount device issue caused by gci mounter.
|
||||
// In GCI cluster, if gci mounter is used for mounting, the container started by mounter
|
||||
// script will cause additional mounts created in the container. Since these mounts are
|
||||
// irrelavant to the original mounts, they should be not considered when checking the
|
||||
// mount references. Current solution is to filter out those mount paths that contain
|
||||
// the string of original mount path.
|
||||
// Plan to work on better approach to solve this issue.
|
||||
|
||||
func HasMountRefs(mountPath string, mountRefs []string) bool {
|
||||
count := 0
|
||||
for _, ref := range mountRefs {
|
||||
if !strings.Contains(ref, mountPath) {
|
||||
count = count + 1
|
||||
}
|
||||
}
|
||||
return count > 0
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ package operationexecutor
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
@ -169,7 +168,7 @@ type ActualStateOfWorldMounterUpdater interface {
|
|||
MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error
|
||||
|
||||
// Marks the specified volume as having been globally mounted.
|
||||
MarkDeviceAsMounted(volumeName v1.UniqueVolumeName) error
|
||||
MarkDeviceAsMounted(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error
|
||||
|
||||
// Marks the specified volume as having its global mount unmounted.
|
||||
MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error
|
||||
|
@ -386,6 +385,14 @@ type AttachedVolume struct {
|
|||
// DevicePath contains the path on the node where the volume is attached.
|
||||
// For non-attachable volumes this is empty.
|
||||
DevicePath string
|
||||
|
||||
// DeviceMountPath contains the path on the node where the device should
|
||||
// be mounted after it is attached.
|
||||
DeviceMountPath string
|
||||
|
||||
// PluginName is the Unescaped Qualified name of the volume plugin used to
|
||||
// attach and mount this volume.
|
||||
PluginName string
|
||||
}
|
||||
|
||||
// GenerateMsgDetailed returns detailed msgs for attached volumes
|
||||
|
@ -529,6 +536,10 @@ type MountedVolume struct {
|
|||
// VolumeSpec is a volume spec containing the specification for the volume
|
||||
// that should be mounted.
|
||||
VolumeSpec *volume.Spec
|
||||
|
||||
// DeviceMountPath contains the path on the node where the device should
|
||||
// be mounted after it is attached.
|
||||
DeviceMountPath string
|
||||
}
|
||||
|
||||
// GenerateMsgDetailed returns detailed msgs for mounted volumes
|
||||
|
@ -866,6 +877,21 @@ func NewVolumeHandler(volumeSpec *volume.Spec, oe OperationExecutor) (VolumeStat
|
|||
return volumeHandler, nil
|
||||
}
|
||||
|
||||
// NewVolumeHandlerWithMode return a new instance of volumeHandler depens on a volumeMode
|
||||
func NewVolumeHandlerWithMode(volumeMode v1.PersistentVolumeMode, oe OperationExecutor) (VolumeStateHandler, error) {
|
||||
var volumeHandler VolumeStateHandler
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
|
||||
if volumeMode == v1.PersistentVolumeFilesystem {
|
||||
volumeHandler = NewFilesystemVolumeHandler(oe)
|
||||
} else {
|
||||
volumeHandler = NewBlockVolumeHandler(oe)
|
||||
}
|
||||
} else {
|
||||
volumeHandler = NewFilesystemVolumeHandler(oe)
|
||||
}
|
||||
return volumeHandler, nil
|
||||
}
|
||||
|
||||
// NewFilesystemVolumeHandler returns a new instance of FilesystemVolumeHandler.
|
||||
func NewFilesystemVolumeHandler(operationExecutor OperationExecutor) FilesystemVolumeHandler {
|
||||
return FilesystemVolumeHandler{
|
||||
|
@ -924,7 +950,7 @@ func (f FilesystemVolumeHandler) UnmountDeviceHandler(attachedVolume AttachedVol
|
|||
// ReconstructVolumeHandler create volumeSpec from mount path
|
||||
// This method is handler for filesystem volume
|
||||
func (f FilesystemVolumeHandler) ReconstructVolumeHandler(plugin volume.VolumePlugin, _ volume.BlockVolumePlugin, _ types.UID, _ volumetypes.UniquePodName, volumeSpecName string, mountPath string, _ string) (*volume.Spec, error) {
|
||||
glog.V(12).Infof("Starting operationExecutor.ReconstructVolumepodName")
|
||||
glog.V(4).Infof("Starting operationExecutor.ReconstructVolumepodName volume spec name %s, mount path %s", volumeSpecName, mountPath)
|
||||
volumeSpec, err := plugin.ConstructVolumeSpec(volumeSpecName, mountPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -1024,21 +1050,3 @@ func (b BlockVolumeHandler) CheckVolumeExistence(mountPath, volumeName string, m
|
|||
}
|
||||
return islinkExist, nil
|
||||
}
|
||||
|
||||
// TODO: this is a workaround for the unmount device issue caused by gci mounter.
|
||||
// In GCI cluster, if gci mounter is used for mounting, the container started by mounter
|
||||
// script will cause additional mounts created in the container. Since these mounts are
|
||||
// irrelavant to the original mounts, they should be not considered when checking the
|
||||
// mount references. Current solution is to filter out those mount paths that contain
|
||||
// the string of original mount path.
|
||||
// Plan to work on better approach to solve this issue.
|
||||
|
||||
func hasMountRefs(mountPath string, mountRefs []string) bool {
|
||||
count := 0
|
||||
for _, ref := range mountRefs {
|
||||
if !strings.Contains(ref, mountPath) {
|
||||
count = count + 1
|
||||
}
|
||||
}
|
||||
return count > 0
|
||||
}
|
||||
|
|
|
@ -524,7 +524,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
|
|||
|
||||
// Update actual state of world to reflect volume is globally mounted
|
||||
markDeviceMountedErr := actualStateOfWorld.MarkDeviceAsMounted(
|
||||
volumeToMount.VolumeName)
|
||||
volumeToMount.VolumeName, devicePath, deviceMountPath)
|
||||
if markDeviceMountedErr != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return volumeToMount.GenerateError("MountVolume.MarkDeviceAsMounted failed", markDeviceMountedErr)
|
||||
|
@ -708,7 +708,7 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc(
|
|||
mounter mount.Interface) (volumetypes.GeneratedOperations, error) {
|
||||
// Get attacher plugin
|
||||
attachableVolumePlugin, err :=
|
||||
og.volumePluginMgr.FindAttachablePluginBySpec(deviceToDetach.VolumeSpec)
|
||||
og.volumePluginMgr.FindAttachablePluginByName(deviceToDetach.PluginName)
|
||||
if err != nil || attachableVolumePlugin == nil {
|
||||
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.FindAttachablePluginBySpec failed", err)
|
||||
}
|
||||
|
@ -717,22 +717,11 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc(
|
|||
if err != nil {
|
||||
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDetacher failed", err)
|
||||
}
|
||||
|
||||
volumeAttacher, err := attachableVolumePlugin.NewAttacher()
|
||||
if err != nil {
|
||||
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewAttacher failed", err)
|
||||
}
|
||||
|
||||
unmountDeviceFunc := func() (error, error) {
|
||||
deviceMountPath, err :=
|
||||
volumeAttacher.GetDeviceMountPath(deviceToDetach.VolumeSpec)
|
||||
if err != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return deviceToDetach.GenerateError("GetDeviceMountPath failed", err)
|
||||
}
|
||||
deviceMountPath := deviceToDetach.DeviceMountPath
|
||||
refs, err := attachableVolumePlugin.GetDeviceMountRefs(deviceMountPath)
|
||||
|
||||
if err != nil || hasMountRefs(deviceMountPath, refs) {
|
||||
if err != nil || mount.HasMountRefs(deviceMountPath, refs) {
|
||||
if err == nil {
|
||||
err = fmt.Errorf("The device mount path %q is still mounted by other references %v", deviceMountPath, refs)
|
||||
}
|
||||
|
@ -829,6 +818,13 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
|
|||
|
||||
mapVolumeFunc := func() (error, error) {
|
||||
var devicePath string
|
||||
// Set up global map path under the given plugin directory using symbolic link
|
||||
globalMapPath, err :=
|
||||
blockVolumeMapper.GetGlobalMapPath(volumeToMount.VolumeSpec)
|
||||
if err != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return volumeToMount.GenerateError("MapVolume.GetDeviceMountPath failed", err)
|
||||
}
|
||||
if volumeAttacher != nil {
|
||||
// Wait for attachable volumes to finish attaching
|
||||
glog.Infof(volumeToMount.GenerateMsgDetailed("MapVolume.WaitForAttach entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)))
|
||||
|
@ -844,7 +840,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
|
|||
|
||||
// Update actual state of world to reflect volume is globally mounted
|
||||
markDeviceMappedErr := actualStateOfWorld.MarkDeviceAsMounted(
|
||||
volumeToMount.VolumeName)
|
||||
volumeToMount.VolumeName, devicePath, globalMapPath)
|
||||
if markDeviceMappedErr != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr)
|
||||
|
@ -864,18 +860,13 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
|
|||
return volumeToMount.GenerateError("MapVolume failed", fmt.Errorf("Device path of the volume is empty"))
|
||||
}
|
||||
}
|
||||
// Set up global map path under the given plugin directory using symbolic link
|
||||
globalMapPath, err :=
|
||||
blockVolumeMapper.GetGlobalMapPath(volumeToMount.VolumeSpec)
|
||||
if err != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return volumeToMount.GenerateError("MapVolume.GetDeviceMountPath failed", err)
|
||||
}
|
||||
|
||||
mapErr = og.blkUtil.MapDevice(devicePath, globalMapPath, string(volumeToMount.Pod.UID))
|
||||
if mapErr != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return volumeToMount.GenerateError("MapVolume.MapDevice failed", mapErr)
|
||||
}
|
||||
|
||||
// Device mapping for global map path succeeded
|
||||
simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MapVolume.MapDevice succeeded", fmt.Sprintf("globalMapPath %q", globalMapPath))
|
||||
verbosity := glog.Level(4)
|
||||
|
@ -970,8 +961,7 @@ func (og *operationGenerator) GenerateUnmapVolumeFunc(
|
|||
}
|
||||
// Try to unmap podUID symlink under global map path dir
|
||||
// plugins/kubernetes.io/{PluginName}/volumeDevices/{volumePluginDependentPath}/{podUID}
|
||||
globalUnmapPath, err :=
|
||||
blockVolumeUnmapper.GetGlobalMapPath(volumeToUnmount.VolumeSpec)
|
||||
globalUnmapPath := volumeToUnmount.DeviceMountPath
|
||||
if err != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return volumeToUnmount.GenerateError("UnmapVolume.GetGlobalUnmapPath failed", err)
|
||||
|
@ -1025,23 +1015,14 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc(
|
|||
actualStateOfWorld ActualStateOfWorldMounterUpdater,
|
||||
mounter mount.Interface) (volumetypes.GeneratedOperations, error) {
|
||||
|
||||
// Get block volume mapper plugin
|
||||
var blockVolumeMapper volume.BlockVolumeMapper
|
||||
blockVolumePlugin, err :=
|
||||
og.volumePluginMgr.FindMapperPluginBySpec(deviceToDetach.VolumeSpec)
|
||||
og.volumePluginMgr.FindMapperPluginByName(deviceToDetach.PluginName)
|
||||
if err != nil {
|
||||
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginBySpec failed", err)
|
||||
}
|
||||
if blockVolumePlugin == nil {
|
||||
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil)
|
||||
}
|
||||
blockVolumeMapper, newMapperErr := blockVolumePlugin.NewBlockVolumeMapper(
|
||||
deviceToDetach.VolumeSpec,
|
||||
nil, /* Pod */
|
||||
volume.VolumeOptions{})
|
||||
if newMapperErr != nil {
|
||||
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewBlockVolumeMapper initialization failed", newMapperErr)
|
||||
}
|
||||
|
||||
blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper(
|
||||
string(deviceToDetach.VolumeName),
|
||||
|
@ -1053,8 +1034,7 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc(
|
|||
unmapDeviceFunc := func() (error, error) {
|
||||
// Search under globalMapPath dir if all symbolic links from pods have been removed already.
|
||||
// If symbolick links are there, pods may still refer the volume.
|
||||
globalMapPath, err :=
|
||||
blockVolumeMapper.GetGlobalMapPath(deviceToDetach.VolumeSpec)
|
||||
globalMapPath := deviceToDetach.DeviceMountPath
|
||||
if err != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return deviceToDetach.GenerateError("UnmapDevice.GetGlobalMapPath failed", err)
|
||||
|
|
|
@ -223,6 +223,10 @@ var _ = utils.SIGDescribe("PersistentVolumes[Disruptive][Flaky]", func() {
|
|||
testItStmt: "Should test that a volume mounted to a pod that is deleted while the kubelet is down unmounts when the kubelet returns.",
|
||||
runTest: utils.TestVolumeUnmountsFromDeletedPod,
|
||||
},
|
||||
{
|
||||
testItStmt: "Should test that a volume mounted to a pod that is force deleted while the kubelet is down unmounts when the kubelet returns.",
|
||||
runTest: utils.TestVolumeUnmountsFromForceDeletedPod,
|
||||
},
|
||||
}
|
||||
|
||||
// Test loop executes each disruptiveTest iteratively.
|
||||
|
|
|
@ -156,7 +156,8 @@ func TestKubeletRestartsAndRestoresMount(c clientset.Interface, f *framework.Fra
|
|||
}
|
||||
|
||||
// TestVolumeUnmountsFromDeletedPod tests that a volume unmounts if the client pod was deleted while the kubelet was down.
|
||||
func TestVolumeUnmountsFromDeletedPod(c clientset.Interface, f *framework.Framework, clientPod *v1.Pod, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) {
|
||||
// forceDelete is true indicating whether the pod is forcelly deleted.
|
||||
func TestVolumeUnmountsFromDeletedPodWithForceOption(c clientset.Interface, f *framework.Framework, clientPod *v1.Pod, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, forceDelete bool) {
|
||||
nodeIP, err := framework.GetHostExternalAddress(c, clientPod)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
nodeIP = nodeIP + ":22"
|
||||
|
@ -175,7 +176,11 @@ func TestVolumeUnmountsFromDeletedPod(c clientset.Interface, f *framework.Framew
|
|||
}
|
||||
}()
|
||||
By(fmt.Sprintf("Deleting Pod %q", clientPod.Name))
|
||||
err = c.CoreV1().Pods(clientPod.Namespace).Delete(clientPod.Name, &metav1.DeleteOptions{})
|
||||
if forceDelete {
|
||||
err = c.CoreV1().Pods(clientPod.Namespace).Delete(clientPod.Name, metav1.NewDeleteOptions(0))
|
||||
} else {
|
||||
err = c.CoreV1().Pods(clientPod.Namespace).Delete(clientPod.Name, &metav1.DeleteOptions{})
|
||||
}
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
By("Starting the kubelet and waiting for pod to delete.")
|
||||
KubeletCommand(KStart, c, clientPod)
|
||||
|
@ -184,6 +189,11 @@ func TestVolumeUnmountsFromDeletedPod(c clientset.Interface, f *framework.Framew
|
|||
Expect(err).NotTo(HaveOccurred(), "Expected pod to terminate.")
|
||||
}
|
||||
|
||||
if forceDelete {
|
||||
// With forceDelete, since pods are immediately deleted from API server, there is no way to be sure when volumes are torn down
|
||||
// so wait some time to finish
|
||||
time.Sleep(30 * time.Second)
|
||||
}
|
||||
By("Expecting the volume mount not to be found.")
|
||||
result, err = framework.SSH(fmt.Sprintf("mount | grep %s", clientPod.UID), nodeIP, framework.TestContext.Provider)
|
||||
framework.LogSSHResult(result)
|
||||
|
@ -192,6 +202,16 @@ func TestVolumeUnmountsFromDeletedPod(c clientset.Interface, f *framework.Framew
|
|||
framework.Logf("Volume unmounted on node %s", clientPod.Spec.NodeName)
|
||||
}
|
||||
|
||||
// TestVolumeUnmountsFromDeletedPod tests that a volume unmounts if the client pod was deleted while the kubelet was down.
|
||||
func TestVolumeUnmountsFromDeletedPod(c clientset.Interface, f *framework.Framework, clientPod *v1.Pod, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) {
|
||||
TestVolumeUnmountsFromDeletedPodWithForceOption(c, f, clientPod, pvc, pv, false)
|
||||
}
|
||||
|
||||
// TestVolumeUnmountsFromFoceDeletedPod tests that a volume unmounts if the client pod was forcelly deleted while the kubelet was down.
|
||||
func TestVolumeUnmountsFromForceDeletedPod(c clientset.Interface, f *framework.Framework, clientPod *v1.Pod, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) {
|
||||
TestVolumeUnmountsFromDeletedPodWithForceOption(c, f, clientPod, pvc, pv, true)
|
||||
}
|
||||
|
||||
// RunInPodWithVolume runs a command in a pod with given claim mounted to /mnt directory.
|
||||
func RunInPodWithVolume(c clientset.Interface, ns, claimName, command string) {
|
||||
pod := &v1.Pod{
|
||||
|
|
Loading…
Reference in New Issue