2019-01-12 04:58:27 +00:00
/ *
Copyright 2016 The Kubernetes Authors .
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
// Package reconciler implements interfaces that attempt to reconcile the
// desired state of the world with the actual state of the world by triggering
// relevant actions (attach, detach, mount, unmount).
package reconciler
import (
2020-03-26 21:07:15 +00:00
"context"
2019-01-12 04:58:27 +00:00
"fmt"
"io/ioutil"
"os"
"path"
2019-12-12 01:27:03 +00:00
"path/filepath"
2019-01-12 04:58:27 +00:00
"time"
2020-08-10 17:43:49 +00:00
"k8s.io/klog/v2"
2020-12-01 01:06:26 +00:00
"k8s.io/mount-utils"
2019-12-12 01:27:03 +00:00
utilpath "k8s.io/utils/path"
utilstrings "k8s.io/utils/strings"
2019-09-27 21:51:53 +00:00
v1 "k8s.io/api/core/v1"
2019-01-12 04:58:27 +00:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
volumepkg "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
2019-09-27 21:51:53 +00:00
"k8s.io/kubernetes/pkg/volume/util/hostutil"
2019-01-12 04:58:27 +00:00
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
// Reconciler runs a periodic loop to reconcile the desired state of the world
// with the actual state of the world by triggering attach, detach, mount, and
// unmount operations.
// Note: This is distinct from the Reconciler implemented by the attach/detach
// controller. This reconciles state for the kubelet volume manager. That
// reconciles state for the attach/detach controller.
type Reconciler interface {
// Starts running the reconciliation loop which executes periodically, checks
// if volumes that should be mounted are mounted and volumes that should
// be unmounted are unmounted. If not, it will trigger mount/unmount
// operations to rectify.
// If attach/detach management is enabled, the manager will also check if
// volumes that should be attached are attached and volumes that should
// be detached are detached and trigger attach/detach operations as needed.
Run ( stopCh <- chan struct { } )
// StatesHasBeenSynced returns true only after syncStates process starts to sync
// states at least once after kubelet starts
StatesHasBeenSynced ( ) bool
}
// NewReconciler returns a new instance of Reconciler.
//
// controllerAttachDetachEnabled - if true, indicates that the attach/detach
// controller is responsible for managing the attach/detach operations for
// this node, and therefore the volume manager should not
// loopSleepDuration - the amount of time the reconciler loop sleeps between
// successive executions
// waitForAttachTimeout - the amount of time the Mount function will wait for
// the volume to be attached
// nodeName - the Name for this node, used by Attach and Detach methods
// desiredStateOfWorld - cache containing the desired state of the world
// actualStateOfWorld - cache containing the actual state of the world
// populatorHasAddedPods - checker for whether the populator has finished
// adding pods to the desiredStateOfWorld cache at least once after sources
// are all ready (before sources are ready, pods are probably missing)
// operationExecutor - used to trigger attach/detach/mount/unmount operations
// safely (prevents more than one operation from being triggered on the same
// volume)
// mounter - mounter passed in from kubelet, passed down unmount path
2019-09-27 21:51:53 +00:00
// hostutil - hostutil passed in from kubelet
2019-04-07 17:07:55 +00:00
// volumePluginMgr - volume plugin manager passed from kubelet
2019-01-12 04:58:27 +00:00
func NewReconciler (
kubeClient clientset . Interface ,
controllerAttachDetachEnabled bool ,
loopSleepDuration time . Duration ,
waitForAttachTimeout time . Duration ,
nodeName types . NodeName ,
desiredStateOfWorld cache . DesiredStateOfWorld ,
actualStateOfWorld cache . ActualStateOfWorld ,
populatorHasAddedPods func ( ) bool ,
operationExecutor operationexecutor . OperationExecutor ,
mounter mount . Interface ,
2019-09-27 21:51:53 +00:00
hostutil hostutil . HostUtils ,
2019-01-12 04:58:27 +00:00
volumePluginMgr * volumepkg . VolumePluginMgr ,
kubeletPodsDir string ) Reconciler {
return & reconciler {
kubeClient : kubeClient ,
controllerAttachDetachEnabled : controllerAttachDetachEnabled ,
loopSleepDuration : loopSleepDuration ,
waitForAttachTimeout : waitForAttachTimeout ,
nodeName : nodeName ,
desiredStateOfWorld : desiredStateOfWorld ,
actualStateOfWorld : actualStateOfWorld ,
populatorHasAddedPods : populatorHasAddedPods ,
operationExecutor : operationExecutor ,
mounter : mounter ,
2019-09-27 21:51:53 +00:00
hostutil : hostutil ,
2019-01-12 04:58:27 +00:00
volumePluginMgr : volumePluginMgr ,
kubeletPodsDir : kubeletPodsDir ,
timeOfLastSync : time . Time { } ,
}
}
type reconciler struct {
kubeClient clientset . Interface
controllerAttachDetachEnabled bool
loopSleepDuration time . Duration
waitForAttachTimeout time . Duration
nodeName types . NodeName
desiredStateOfWorld cache . DesiredStateOfWorld
actualStateOfWorld cache . ActualStateOfWorld
populatorHasAddedPods func ( ) bool
operationExecutor operationexecutor . OperationExecutor
mounter mount . Interface
2019-09-27 21:51:53 +00:00
hostutil hostutil . HostUtils
2019-01-12 04:58:27 +00:00
volumePluginMgr * volumepkg . VolumePluginMgr
kubeletPodsDir string
timeOfLastSync time . Time
}
func ( rc * reconciler ) Run ( stopCh <- chan struct { } ) {
wait . Until ( rc . reconciliationLoopFunc ( ) , rc . loopSleepDuration , stopCh )
}
func ( rc * reconciler ) reconciliationLoopFunc ( ) func ( ) {
return func ( ) {
rc . reconcile ( )
// Sync the state with the reality once after all existing pods are added to the desired state from all sources.
// Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
// desired state of world does not contain a complete list of pods.
if rc . populatorHasAddedPods ( ) && ! rc . StatesHasBeenSynced ( ) {
2021-03-18 22:40:29 +00:00
klog . InfoS ( "Reconciler: start to sync state" )
2019-01-12 04:58:27 +00:00
rc . sync ( )
}
}
}
func ( rc * reconciler ) reconcile ( ) {
// Unmounts are triggered before mounts so that a volume that was
// referenced by a pod that was deleted and is now referenced by another
// pod is unmounted from the first pod before being mounted to the new
// pod.
2020-03-26 21:07:15 +00:00
rc . unmountVolumes ( )
2019-01-12 04:58:27 +00:00
2020-03-26 21:07:15 +00:00
// Next we mount required volumes. This function could also trigger
// attach if kubelet is responsible for attaching volumes.
// If underlying PVC was resized while in-use then this function also handles volume
// resizing.
rc . mountAttachVolumes ( )
// Ensure devices that should be detached/unmounted are detached/unmounted.
rc . unmountDetachDevices ( )
}
func ( rc * reconciler ) unmountVolumes ( ) {
2019-01-12 04:58:27 +00:00
// Ensure volumes that should be unmounted are unmounted.
2020-03-26 21:07:15 +00:00
for _ , mountedVolume := range rc . actualStateOfWorld . GetAllMountedVolumes ( ) {
2019-01-12 04:58:27 +00:00
if ! rc . desiredStateOfWorld . PodExistsInVolume ( mountedVolume . PodName , mountedVolume . VolumeName ) {
// Volume is mounted, unmount it
2021-03-18 22:40:29 +00:00
klog . V ( 5 ) . InfoS ( mountedVolume . GenerateMsgDetailed ( "Starting operationExecutor.UnmountVolume" , "" ) )
2019-01-12 04:58:27 +00:00
err := rc . operationExecutor . UnmountVolume (
mountedVolume . MountedVolume , rc . actualStateOfWorld , rc . kubeletPodsDir )
if err != nil &&
! nestedpendingoperations . IsAlreadyExists ( err ) &&
! exponentialbackoff . IsExponentialBackoff ( err ) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
2021-03-18 22:40:29 +00:00
klog . ErrorS ( err , mountedVolume . GenerateErrorDetailed ( fmt . Sprintf ( "operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)" , rc . controllerAttachDetachEnabled ) , err ) . Error ( ) )
2019-01-12 04:58:27 +00:00
}
if err == nil {
2021-03-18 22:40:29 +00:00
klog . InfoS ( mountedVolume . GenerateMsgDetailed ( "operationExecutor.UnmountVolume started" , "" ) )
2019-01-12 04:58:27 +00:00
}
}
}
2020-03-26 21:07:15 +00:00
}
2019-01-12 04:58:27 +00:00
2020-03-26 21:07:15 +00:00
func ( rc * reconciler ) mountAttachVolumes ( ) {
2019-01-12 04:58:27 +00:00
// Ensure volumes that should be attached/mounted are attached/mounted.
for _ , volumeToMount := range rc . desiredStateOfWorld . GetVolumesToMount ( ) {
volMounted , devicePath , err := rc . actualStateOfWorld . PodExistsInVolume ( volumeToMount . PodName , volumeToMount . VolumeName )
volumeToMount . DevicePath = devicePath
if cache . IsVolumeNotAttachedError ( err ) {
if rc . controllerAttachDetachEnabled || ! volumeToMount . PluginIsAttachable {
// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
// for controller to finish attaching volume.
2021-03-18 22:40:29 +00:00
klog . V ( 5 ) . InfoS ( volumeToMount . GenerateMsgDetailed ( "Starting operationExecutor.VerifyControllerAttachedVolume" , "" ) )
2019-01-12 04:58:27 +00:00
err := rc . operationExecutor . VerifyControllerAttachedVolume (
volumeToMount . VolumeToMount ,
rc . nodeName ,
rc . actualStateOfWorld )
if err != nil &&
! nestedpendingoperations . IsAlreadyExists ( err ) &&
! exponentialbackoff . IsExponentialBackoff ( err ) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
2021-03-18 22:40:29 +00:00
klog . ErrorS ( err , volumeToMount . GenerateErrorDetailed ( fmt . Sprintf ( "operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)" , rc . controllerAttachDetachEnabled ) , err ) . Error ( ) )
2019-01-12 04:58:27 +00:00
}
if err == nil {
2021-03-18 22:40:29 +00:00
klog . InfoS ( volumeToMount . GenerateMsgDetailed ( "operationExecutor.VerifyControllerAttachedVolume started" , "" ) )
2019-01-12 04:58:27 +00:00
}
} else {
// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
// so attach it
volumeToAttach := operationexecutor . VolumeToAttach {
VolumeName : volumeToMount . VolumeName ,
VolumeSpec : volumeToMount . VolumeSpec ,
NodeName : rc . nodeName ,
}
2021-03-18 22:40:29 +00:00
klog . V ( 5 ) . InfoS ( volumeToAttach . GenerateMsgDetailed ( "Starting operationExecutor.AttachVolume" , "" ) )
2019-01-12 04:58:27 +00:00
err := rc . operationExecutor . AttachVolume ( volumeToAttach , rc . actualStateOfWorld )
if err != nil &&
! nestedpendingoperations . IsAlreadyExists ( err ) &&
! exponentialbackoff . IsExponentialBackoff ( err ) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
2021-03-18 22:40:29 +00:00
klog . ErrorS ( err , volumeToMount . GenerateErrorDetailed ( fmt . Sprintf ( "operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)" , rc . controllerAttachDetachEnabled ) , err ) . Error ( ) )
2019-01-12 04:58:27 +00:00
}
if err == nil {
2021-03-18 22:40:29 +00:00
klog . InfoS ( volumeToMount . GenerateMsgDetailed ( "operationExecutor.AttachVolume started" , "" ) )
2019-01-12 04:58:27 +00:00
}
}
} else if ! volMounted || cache . IsRemountRequiredError ( err ) {
// Volume is not mounted, or is already mounted, but requires remounting
remountingLogStr := ""
isRemount := cache . IsRemountRequiredError ( err )
if isRemount {
remountingLogStr = "Volume is already mounted to pod, but remount was requested."
}
2021-03-18 22:40:29 +00:00
klog . V ( 4 ) . InfoS ( volumeToMount . GenerateMsgDetailed ( "Starting operationExecutor.MountVolume" , remountingLogStr ) )
2019-01-12 04:58:27 +00:00
err := rc . operationExecutor . MountVolume (
rc . waitForAttachTimeout ,
volumeToMount . VolumeToMount ,
rc . actualStateOfWorld ,
isRemount )
if err != nil &&
! nestedpendingoperations . IsAlreadyExists ( err ) &&
! exponentialbackoff . IsExponentialBackoff ( err ) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
2021-03-18 22:40:29 +00:00
klog . ErrorS ( err , volumeToMount . GenerateErrorDetailed ( fmt . Sprintf ( "operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)" , rc . controllerAttachDetachEnabled ) , err ) . Error ( ) )
2019-01-12 04:58:27 +00:00
}
if err == nil {
if remountingLogStr == "" {
2021-03-18 22:40:29 +00:00
klog . V ( 1 ) . InfoS ( volumeToMount . GenerateMsgDetailed ( "operationExecutor.MountVolume started" , remountingLogStr ) )
2019-01-12 04:58:27 +00:00
} else {
2021-03-18 22:40:29 +00:00
klog . V ( 5 ) . InfoS ( volumeToMount . GenerateMsgDetailed ( "operationExecutor.MountVolume started" , remountingLogStr ) )
2019-01-12 04:58:27 +00:00
}
}
2019-08-30 18:33:25 +00:00
} else if cache . IsFSResizeRequiredError ( err ) &&
utilfeature . DefaultFeatureGate . Enabled ( features . ExpandInUsePersistentVolumes ) {
2021-03-18 22:40:29 +00:00
klog . V ( 4 ) . InfoS ( volumeToMount . GenerateMsgDetailed ( "Starting operationExecutor.ExpandInUseVolume" , "" ) )
2019-08-30 18:33:25 +00:00
err := rc . operationExecutor . ExpandInUseVolume (
volumeToMount . VolumeToMount ,
rc . actualStateOfWorld )
if err != nil &&
! nestedpendingoperations . IsAlreadyExists ( err ) &&
! exponentialbackoff . IsExponentialBackoff ( err ) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
2021-03-18 22:40:29 +00:00
klog . ErrorS ( err , volumeToMount . GenerateErrorDetailed ( "operationExecutor.ExpandInUseVolume failed" , err ) . Error ( ) )
2019-08-30 18:33:25 +00:00
}
if err == nil {
2021-03-18 22:40:29 +00:00
klog . V ( 4 ) . InfoS ( volumeToMount . GenerateMsgDetailed ( "operationExecutor.ExpandInUseVolume started" , "" ) )
2019-08-30 18:33:25 +00:00
}
2019-01-12 04:58:27 +00:00
}
}
2020-03-26 21:07:15 +00:00
}
2019-01-12 04:58:27 +00:00
2020-03-26 21:07:15 +00:00
func ( rc * reconciler ) unmountDetachDevices ( ) {
2019-01-12 04:58:27 +00:00
for _ , attachedVolume := range rc . actualStateOfWorld . GetUnmountedVolumes ( ) {
// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
if ! rc . desiredStateOfWorld . VolumeExists ( attachedVolume . VolumeName ) &&
2020-03-26 21:07:15 +00:00
! rc . operationExecutor . IsOperationPending ( attachedVolume . VolumeName , nestedpendingoperations . EmptyUniquePodName , nestedpendingoperations . EmptyNodeName ) {
if attachedVolume . DeviceMayBeMounted ( ) {
2019-01-12 04:58:27 +00:00
// Volume is globally mounted to device, unmount it
2021-03-18 22:40:29 +00:00
klog . V ( 5 ) . InfoS ( attachedVolume . GenerateMsgDetailed ( "Starting operationExecutor.UnmountDevice" , "" ) )
2019-01-12 04:58:27 +00:00
err := rc . operationExecutor . UnmountDevice (
2019-09-27 21:51:53 +00:00
attachedVolume . AttachedVolume , rc . actualStateOfWorld , rc . hostutil )
2019-01-12 04:58:27 +00:00
if err != nil &&
! nestedpendingoperations . IsAlreadyExists ( err ) &&
! exponentialbackoff . IsExponentialBackoff ( err ) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
2021-03-18 22:40:29 +00:00
klog . ErrorS ( err , attachedVolume . GenerateErrorDetailed ( fmt . Sprintf ( "operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)" , rc . controllerAttachDetachEnabled ) , err ) . Error ( ) )
2019-01-12 04:58:27 +00:00
}
if err == nil {
2021-03-18 22:40:29 +00:00
klog . InfoS ( attachedVolume . GenerateMsgDetailed ( "operationExecutor.UnmountDevice started" , "" ) )
2019-01-12 04:58:27 +00:00
}
} else {
// Volume is attached to node, detach it
// Kubelet not responsible for detaching or this volume has a non-attachable volume plugin.
if rc . controllerAttachDetachEnabled || ! attachedVolume . PluginIsAttachable {
rc . actualStateOfWorld . MarkVolumeAsDetached ( attachedVolume . VolumeName , attachedVolume . NodeName )
2021-03-18 22:40:29 +00:00
klog . InfoS ( attachedVolume . GenerateMsgDetailed ( "Volume detached" , fmt . Sprintf ( "DevicePath %q" , attachedVolume . DevicePath ) ) )
2019-01-12 04:58:27 +00:00
} else {
// Only detach if kubelet detach is enabled
2021-03-18 22:40:29 +00:00
klog . V ( 5 ) . InfoS ( attachedVolume . GenerateMsgDetailed ( "Starting operationExecutor.DetachVolume" , "" ) )
2019-01-12 04:58:27 +00:00
err := rc . operationExecutor . DetachVolume (
attachedVolume . AttachedVolume , false /* verifySafeToDetach */ , rc . actualStateOfWorld )
if err != nil &&
! nestedpendingoperations . IsAlreadyExists ( err ) &&
! exponentialbackoff . IsExponentialBackoff ( err ) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
2021-03-18 22:40:29 +00:00
klog . ErrorS ( err , attachedVolume . GenerateErrorDetailed ( fmt . Sprintf ( "operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)" , rc . controllerAttachDetachEnabled ) , err ) . Error ( ) )
2019-01-12 04:58:27 +00:00
}
if err == nil {
2021-03-18 22:40:29 +00:00
klog . InfoS ( attachedVolume . GenerateMsgDetailed ( "operationExecutor.DetachVolume started" , "" ) )
2019-01-12 04:58:27 +00:00
}
}
}
}
}
}
// sync process tries to observe the real world by scanning all pods' volume directories from the disk.
// If the actual and desired state of worlds are not consistent with the observed world, it means that some
// mounted volumes are left out probably during kubelet restart. This process will reconstruct
// the volumes and update the actual and desired states. For the volumes that cannot support reconstruction,
// it will try to clean up the mount paths with operation executor.
func ( rc * reconciler ) sync ( ) {
defer rc . updateLastSyncTime ( )
rc . syncStates ( )
}
func ( rc * reconciler ) updateLastSyncTime ( ) {
rc . timeOfLastSync = time . Now ( )
}
func ( rc * reconciler ) StatesHasBeenSynced ( ) bool {
return ! rc . timeOfLastSync . IsZero ( )
}
type podVolume struct {
podName volumetypes . UniquePodName
volumeSpecName string
2019-03-29 00:03:05 +00:00
volumePath string
2019-01-12 04:58:27 +00:00
pluginName string
volumeMode v1 . PersistentVolumeMode
}
type reconstructedVolume struct {
volumeName v1 . UniqueVolumeName
podName volumetypes . UniquePodName
volumeSpec * volumepkg . Spec
outerVolumeSpecName string
pod * v1 . Pod
volumeGidValue string
devicePath string
mounter volumepkg . Mounter
2019-12-12 01:27:03 +00:00
deviceMounter volumepkg . DeviceMounter
2019-01-12 04:58:27 +00:00
blockVolumeMapper volumepkg . BlockVolumeMapper
}
// syncStates scans the volume directories under the given pod directory.
// If the volume is not in desired state of world, this function will reconstruct
// the volume related information and put it in both the actual and desired state of worlds.
// For some volume plugins that cannot support reconstruction, it will clean up the existing
// mount points since the volume is no long needed (removed from desired state)
func ( rc * reconciler ) syncStates ( ) {
// Get volumes information by reading the pod's directory
podVolumes , err := getVolumesFromPodDir ( rc . kubeletPodsDir )
if err != nil {
2021-03-18 22:40:29 +00:00
klog . ErrorS ( err , "Cannot get volumes from disk" )
2019-01-12 04:58:27 +00:00
return
}
volumesNeedUpdate := make ( map [ v1 . UniqueVolumeName ] * reconstructedVolume )
volumeNeedReport := [ ] v1 . UniqueVolumeName { }
for _ , volume := range podVolumes {
if rc . actualStateOfWorld . VolumeExistsWithSpecName ( volume . podName , volume . volumeSpecName ) {
2021-03-18 22:40:29 +00:00
klog . V ( 4 ) . InfoS ( "Volume exists in actual state, skip cleaning up mounts" , "podName" , volume . podName , "volumeSpecName" , volume . volumeSpecName )
2019-01-12 04:58:27 +00:00
// There is nothing to reconstruct
continue
}
volumeInDSW := rc . desiredStateOfWorld . VolumeExistsWithSpecName ( volume . podName , volume . volumeSpecName )
reconstructedVolume , err := rc . reconstructVolume ( volume )
if err != nil {
if volumeInDSW {
// Some pod needs the volume, don't clean it up and hope that
// reconcile() calls SetUp and reconstructs the volume in ASW.
2021-03-18 22:40:29 +00:00
klog . V ( 4 ) . InfoS ( "Volume exists in desired state, skip cleaning up mounts" , "podName" , volume . podName , "volumeSpecName" , volume . volumeSpecName )
2019-01-12 04:58:27 +00:00
continue
}
// No pod needs the volume.
2021-03-18 22:40:29 +00:00
klog . InfoS ( "Could not construct volume information, cleaning up mounts" , "podName" , volume . podName , "volumeSpecName" , volume . volumeSpecName , "error" , err )
2019-01-12 04:58:27 +00:00
rc . cleanupMounts ( volume )
continue
}
if volumeInDSW {
// Some pod needs the volume. And it exists on disk. Some previous
// kubelet must have created the directory, therefore it must have
// reported the volume as in use. Mark the volume as in use also in
// this new kubelet so reconcile() calls SetUp and re-mounts the
// volume if it's necessary.
volumeNeedReport = append ( volumeNeedReport , reconstructedVolume . volumeName )
2021-03-18 22:40:29 +00:00
klog . V ( 4 ) . InfoS ( "Volume exists in desired state, marking as InUse" , "podName" , volume . podName , "volumeSpecName" , volume . volumeSpecName )
2019-01-12 04:58:27 +00:00
continue
}
// There is no pod that uses the volume.
2020-03-26 21:07:15 +00:00
if rc . operationExecutor . IsOperationPending ( reconstructedVolume . volumeName , nestedpendingoperations . EmptyUniquePodName , nestedpendingoperations . EmptyNodeName ) {
2021-03-18 22:40:29 +00:00
klog . InfoS ( "Volume is in pending operation, skip cleaning up mounts" )
2019-01-12 04:58:27 +00:00
}
2021-03-18 22:40:29 +00:00
klog . V ( 2 ) . InfoS ( "Reconciler sync states: could not find pod information in desired state, update it in actual state" , "reconstructedVolume" , reconstructedVolume )
2019-01-12 04:58:27 +00:00
volumesNeedUpdate [ reconstructedVolume . volumeName ] = reconstructedVolume
}
if len ( volumesNeedUpdate ) > 0 {
if err = rc . updateStates ( volumesNeedUpdate ) ; err != nil {
2021-03-18 22:40:29 +00:00
klog . ErrorS ( err , "Error occurred during reconstruct volume from disk" )
2019-01-12 04:58:27 +00:00
}
}
if len ( volumeNeedReport ) > 0 {
rc . desiredStateOfWorld . MarkVolumesReportedInUse ( volumeNeedReport )
}
}
func ( rc * reconciler ) cleanupMounts ( volume podVolume ) {
2021-03-18 22:40:29 +00:00
klog . V ( 2 ) . InfoS ( "Reconciler sync states: could not find volume information in desired state, clean up the mount points" , "podName" , volume . podName , "volumeSpecName" , volume . volumeSpecName )
2019-01-12 04:58:27 +00:00
mountedVolume := operationexecutor . MountedVolume {
PodName : volume . podName ,
VolumeName : v1 . UniqueVolumeName ( volume . volumeSpecName ) ,
InnerVolumeSpecName : volume . volumeSpecName ,
PluginName : volume . pluginName ,
PodUID : types . UID ( volume . podName ) ,
}
// TODO: Currently cleanupMounts only includes UnmountVolume operation. In the next PR, we will add
// to unmount both volume and device in the same routine.
err := rc . operationExecutor . UnmountVolume ( mountedVolume , rc . actualStateOfWorld , rc . kubeletPodsDir )
if err != nil {
2021-03-18 22:40:29 +00:00
klog . ErrorS ( err , mountedVolume . GenerateErrorDetailed ( "volumeHandler.UnmountVolumeHandler for UnmountVolume failed" , err ) . Error ( ) )
2019-01-12 04:58:27 +00:00
return
}
}
// Reconstruct volume data structure by reading the pod's volume directories
func ( rc * reconciler ) reconstructVolume ( volume podVolume ) ( * reconstructedVolume , error ) {
// plugin initializations
plugin , err := rc . volumePluginMgr . FindPluginByName ( volume . pluginName )
if err != nil {
return nil , err
}
attachablePlugin , err := rc . volumePluginMgr . FindAttachablePluginByName ( volume . pluginName )
if err != nil {
return nil , err
}
2019-01-22 20:53:35 +00:00
deviceMountablePlugin , err := rc . volumePluginMgr . FindDeviceMountablePluginByName ( volume . pluginName )
if err != nil {
return nil , err
}
2019-01-12 04:58:27 +00:00
// Create pod object
pod := & v1 . Pod {
ObjectMeta : metav1 . ObjectMeta {
UID : types . UID ( volume . podName ) ,
} ,
}
mapperPlugin , err := rc . volumePluginMgr . FindMapperPluginByName ( volume . pluginName )
if err != nil {
return nil , err
}
2020-03-26 21:07:15 +00:00
if volume . volumeMode == v1 . PersistentVolumeBlock && mapperPlugin == nil {
2019-09-27 21:51:53 +00:00
return nil , fmt . Errorf ( "could not find block volume plugin %q (spec.Name: %q) pod %q (UID: %q)" , volume . pluginName , volume . volumeSpecName , volume . podName , pod . UID )
}
2019-01-12 04:58:27 +00:00
volumeSpec , err := rc . operationExecutor . ReconstructVolumeOperation (
volume . volumeMode ,
plugin ,
mapperPlugin ,
pod . UID ,
volume . podName ,
volume . volumeSpecName ,
2019-03-29 00:03:05 +00:00
volume . volumePath ,
2019-01-12 04:58:27 +00:00
volume . pluginName )
if err != nil {
return nil , err
}
var uniqueVolumeName v1 . UniqueVolumeName
2019-01-22 20:53:35 +00:00
if attachablePlugin != nil || deviceMountablePlugin != nil {
2019-01-12 04:58:27 +00:00
uniqueVolumeName , err = util . GetUniqueVolumeNameFromSpec ( plugin , volumeSpec )
if err != nil {
return nil , err
}
} else {
2019-01-22 20:53:35 +00:00
uniqueVolumeName = util . GetUniqueVolumeNameFromSpecWithPod ( volume . podName , plugin , volumeSpec )
2019-01-12 04:58:27 +00:00
}
2019-09-27 21:51:53 +00:00
var volumeMapper volumepkg . BlockVolumeMapper
var volumeMounter volumepkg . Mounter
2019-12-12 01:27:03 +00:00
var deviceMounter volumepkg . DeviceMounter
2019-09-27 21:51:53 +00:00
// Path to the mount or block device to check
var checkPath string
2020-03-26 21:07:15 +00:00
if volume . volumeMode == v1 . PersistentVolumeBlock {
2019-09-27 21:51:53 +00:00
var newMapperErr error
volumeMapper , newMapperErr = mapperPlugin . NewBlockVolumeMapper (
volumeSpec ,
pod ,
volumepkg . VolumeOptions { } )
if newMapperErr != nil {
return nil , fmt . Errorf (
"reconstructVolume.NewBlockVolumeMapper failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v" ,
uniqueVolumeName ,
volumeSpec . Name ( ) ,
volume . podName ,
pod . UID ,
newMapperErr )
}
2019-12-12 01:27:03 +00:00
mapDir , linkName := volumeMapper . GetPodDeviceMapPath ( )
checkPath = filepath . Join ( mapDir , linkName )
2019-09-27 21:51:53 +00:00
} else {
var err error
volumeMounter , err = plugin . NewMounter (
volumeSpec ,
pod ,
volumepkg . VolumeOptions { } )
if err != nil {
return nil , fmt . Errorf (
"reconstructVolume.NewMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v" ,
uniqueVolumeName ,
volumeSpec . Name ( ) ,
volume . podName ,
pod . UID ,
err )
}
checkPath = volumeMounter . GetPath ( )
2019-12-12 01:27:03 +00:00
if deviceMountablePlugin != nil {
deviceMounter , err = deviceMountablePlugin . NewDeviceMounter ( )
if err != nil {
return nil , fmt . Errorf ( "reconstructVolume.NewDeviceMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v" ,
uniqueVolumeName ,
volumeSpec . Name ( ) ,
volume . podName ,
pod . UID ,
err )
}
}
2019-01-12 04:58:27 +00:00
}
2019-03-29 00:03:05 +00:00
// Check existence of mount point for filesystem volume or symbolic link for block volume
2019-09-27 21:51:53 +00:00
isExist , checkErr := rc . operationExecutor . CheckVolumeExistenceOperation ( volumeSpec , checkPath , volumeSpec . Name ( ) , rc . mounter , uniqueVolumeName , volume . podName , pod . UID , attachablePlugin )
2019-03-29 00:03:05 +00:00
if checkErr != nil {
return nil , checkErr
}
// If mount or symlink doesn't exist, volume reconstruction should be failed
if ! isExist {
2019-09-27 21:51:53 +00:00
return nil , fmt . Errorf ( "volume: %q is not mounted" , uniqueVolumeName )
2019-01-12 04:58:27 +00:00
}
reconstructedVolume := & reconstructedVolume {
volumeName : uniqueVolumeName ,
podName : volume . podName ,
volumeSpec : volumeSpec ,
// volume.volumeSpecName is actually InnerVolumeSpecName. It will not be used
// for volume cleanup.
// TODO: in case pod is added back before reconciler starts to unmount, we can update this field from desired state information
outerVolumeSpecName : volume . volumeSpecName ,
pod : pod ,
2019-12-12 01:27:03 +00:00
deviceMounter : deviceMounter ,
2019-01-12 04:58:27 +00:00
volumeGidValue : "" ,
// devicePath is updated during updateStates() by checking node status's VolumesAttached data.
// TODO: get device path directly from the volume mount path.
devicePath : "" ,
mounter : volumeMounter ,
blockVolumeMapper : volumeMapper ,
}
return reconstructedVolume , nil
}
// updateDevicePath gets the node status to retrieve volume device path information.
func ( rc * reconciler ) updateDevicePath ( volumesNeedUpdate map [ v1 . UniqueVolumeName ] * reconstructedVolume ) {
2020-03-26 21:07:15 +00:00
node , fetchErr := rc . kubeClient . CoreV1 ( ) . Nodes ( ) . Get ( context . TODO ( ) , string ( rc . nodeName ) , metav1 . GetOptions { } )
2019-01-12 04:58:27 +00:00
if fetchErr != nil {
2021-03-18 22:40:29 +00:00
klog . ErrorS ( fetchErr , "UpdateStates in reconciler: could not get node status with error" )
2019-01-12 04:58:27 +00:00
} else {
for _ , attachedVolume := range node . Status . VolumesAttached {
if volume , exists := volumesNeedUpdate [ attachedVolume . Name ] ; exists {
volume . devicePath = attachedVolume . DevicePath
volumesNeedUpdate [ attachedVolume . Name ] = volume
2021-03-18 22:40:29 +00:00
klog . V ( 4 ) . InfoS ( "Update devicePath from node status for volume" , "volumeName" , attachedVolume . Name , "path" , volume . devicePath )
2019-01-12 04:58:27 +00:00
}
}
}
}
2019-12-12 01:27:03 +00:00
// getDeviceMountPath returns device mount path for block volume which
// implements BlockVolumeMapper or filesystem volume which implements
// DeviceMounter
2019-01-12 04:58:27 +00:00
func getDeviceMountPath ( volume * reconstructedVolume ) ( string , error ) {
if volume . blockVolumeMapper != nil {
2019-12-12 01:27:03 +00:00
// for block volume, we return its global map path
return volume . blockVolumeMapper . GetGlobalMapPath ( volume . volumeSpec )
} else if volume . deviceMounter != nil {
// for filesystem volume, we return its device mount path if the plugin implements DeviceMounter
return volume . deviceMounter . GetDeviceMountPath ( volume . volumeSpec )
} else {
return "" , fmt . Errorf ( "blockVolumeMapper or deviceMounter required" )
2019-01-12 04:58:27 +00:00
}
}
func ( rc * reconciler ) updateStates ( volumesNeedUpdate map [ v1 . UniqueVolumeName ] * reconstructedVolume ) error {
// Get the node status to retrieve volume device path information.
rc . updateDevicePath ( volumesNeedUpdate )
for _ , volume := range volumesNeedUpdate {
err := rc . actualStateOfWorld . MarkVolumeAsAttached (
//TODO: the devicePath might not be correct for some volume plugins: see issue #54108
volume . volumeName , volume . volumeSpec , "" /* nodeName */ , volume . devicePath )
if err != nil {
2021-03-18 22:40:29 +00:00
klog . ErrorS ( err , "Could not add volume information to actual state of world" )
2019-01-12 04:58:27 +00:00
continue
}
2020-03-26 21:07:15 +00:00
markVolumeOpts := operationexecutor . MarkVolumeOpts {
PodName : volume . podName ,
PodUID : types . UID ( volume . podName ) ,
VolumeName : volume . volumeName ,
Mounter : volume . mounter ,
BlockVolumeMapper : volume . blockVolumeMapper ,
OuterVolumeSpecName : volume . outerVolumeSpecName ,
VolumeGidVolume : volume . volumeGidValue ,
VolumeSpec : volume . volumeSpec ,
VolumeMountState : operationexecutor . VolumeMounted ,
}
err = rc . actualStateOfWorld . MarkVolumeAsMounted ( markVolumeOpts )
2019-01-12 04:58:27 +00:00
if err != nil {
2021-03-18 22:40:29 +00:00
klog . ErrorS ( err , "Could not add pod to volume information to actual state of world" )
2019-01-12 04:58:27 +00:00
continue
}
2021-03-18 22:40:29 +00:00
klog . V ( 4 ) . InfoS ( "Volume is marked as mounted and added into the actual state" , "podName" , volume . podName , "volumeName" , volume . volumeName )
2019-12-12 01:27:03 +00:00
// If the volume has device to mount, we mark its device as mounted.
if volume . deviceMounter != nil || volume . blockVolumeMapper != nil {
2019-01-12 04:58:27 +00:00
deviceMountPath , err := getDeviceMountPath ( volume )
if err != nil {
2021-03-18 22:40:29 +00:00
klog . ErrorS ( err , "Could not find device mount path for volume" , "volumeName" , volume . volumeName )
2019-01-12 04:58:27 +00:00
continue
}
err = rc . actualStateOfWorld . MarkDeviceAsMounted ( volume . volumeName , volume . devicePath , deviceMountPath )
if err != nil {
2021-03-18 22:40:29 +00:00
klog . ErrorS ( err , "Could not mark device is mounted to actual state of world" )
2019-01-12 04:58:27 +00:00
continue
}
2021-03-18 22:40:29 +00:00
klog . V ( 4 ) . InfoS ( "Volume is marked device as mounted and added into the actual state" , "podName" , volume . podName , "volumeName" , volume . volumeName )
2019-01-12 04:58:27 +00:00
}
}
return nil
}
// getVolumesFromPodDir scans through the volumes directories under the given pod directory.
// It returns a list of pod volume information including pod's uid, volume's plugin name, mount path,
// and volume spec name.
func getVolumesFromPodDir ( podDir string ) ( [ ] podVolume , error ) {
podsDirInfo , err := ioutil . ReadDir ( podDir )
if err != nil {
return nil , err
}
volumes := [ ] podVolume { }
for i := range podsDirInfo {
if ! podsDirInfo [ i ] . IsDir ( ) {
continue
}
podName := podsDirInfo [ i ] . Name ( )
podDir := path . Join ( podDir , podName )
// Find filesystem volume information
// ex. filesystem volume: /pods/{podUid}/volume/{escapeQualifiedPluginName}/{volumeName}
volumesDirs := map [ v1 . PersistentVolumeMode ] string {
v1 . PersistentVolumeFilesystem : path . Join ( podDir , config . DefaultKubeletVolumesDirName ) ,
}
2020-03-26 21:07:15 +00:00
// Find block volume information
// ex. block volume: /pods/{podUid}/volumeDevices/{escapeQualifiedPluginName}/{volumeName}
volumesDirs [ v1 . PersistentVolumeBlock ] = path . Join ( podDir , config . DefaultKubeletVolumeDevicesDirName )
2019-01-12 04:58:27 +00:00
for volumeMode , volumesDir := range volumesDirs {
var volumesDirInfo [ ] os . FileInfo
if volumesDirInfo , err = ioutil . ReadDir ( volumesDir ) ; err != nil {
// Just skip the loop because given volumesDir doesn't exist depending on volumeMode
continue
}
for _ , volumeDir := range volumesDirInfo {
pluginName := volumeDir . Name ( )
volumePluginPath := path . Join ( volumesDir , pluginName )
2019-04-07 17:07:55 +00:00
volumePluginDirs , err := utilpath . ReadDirNoStat ( volumePluginPath )
2019-01-12 04:58:27 +00:00
if err != nil {
2021-03-18 22:40:29 +00:00
klog . ErrorS ( err , "Could not read volume plugin directory" , "volumePluginPath" , volumePluginPath )
2019-01-12 04:58:27 +00:00
continue
}
2019-04-07 17:07:55 +00:00
unescapePluginName := utilstrings . UnescapeQualifiedName ( pluginName )
2019-01-12 04:58:27 +00:00
for _ , volumeName := range volumePluginDirs {
2019-03-29 00:03:05 +00:00
volumePath := path . Join ( volumePluginPath , volumeName )
2021-03-18 22:40:29 +00:00
klog . V ( 5 ) . InfoS ( "Volume path from volume plugin directory" , "podName" , podName , "volumePath" , volumePath )
2019-01-12 04:58:27 +00:00
volumes = append ( volumes , podVolume {
podName : volumetypes . UniquePodName ( podName ) ,
volumeSpecName : volumeName ,
2019-03-29 00:03:05 +00:00
volumePath : volumePath ,
2019-01-12 04:58:27 +00:00
pluginName : unescapePluginName ,
volumeMode : volumeMode ,
} )
}
}
}
}
2021-03-18 22:40:29 +00:00
klog . V ( 4 ) . InfoS ( "Get volumes from pod directory" , "path" , podDir , "volumes" , volumes )
2019-01-12 04:58:27 +00:00
return volumes , nil
}