This change supports robust kubelet volume cleanup

Currently, kubelet volume management works on the concept of a desired world
and an actual world of state. The volume manager periodically compares the
two worlds and performs volume mount/unmount and attach/detach operations as
needed. When the kubelet restarts, the caches of those two worlds are lost.
Although the desired world can be recovered through the apiserver, the actual
world cannot, which means some volumes cannot be cleaned up if their
information has already been deleted from the apiserver. This change adds
reconstruction of the actual world by reading the pod directories from disk.
The reconstructed volume information is added to both the desired world and
the actual world if it cannot be found in either. The rest of the logic is
the same as before: the desired world populator may clean up a volume entry
if it is no longer in the apiserver, and the volume manager then invokes
unmount to clean it up.
Jing Xu 2016-06-23 12:46:21 -07:00
parent 936c5171a5
commit f19a1148db
46 changed files with 999 additions and 247 deletions
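
A rough sketch of the recovery flow described above, using made-up types and
names (podVolumeEntry, stateOfWorld, HasPod, and AddVolume are illustrative
only, not the PR's actual identifiers):

// podVolumeEntry is a simplified stand-in for the volume information the
// kubelet can recover from a pod directory on disk.
type podVolumeEntry struct {
	podName    string
	pluginName string
	volumeName string
	mountPath  string
}

// stateOfWorld is a simplified stand-in for the desired/actual state caches.
type stateOfWorld interface {
	HasPod(podName string) bool
	AddVolume(v podVolumeEntry)
}

// reconstructActualWorld adds volumes found on disk to both worlds if neither
// of them already knows about the owning pod. If the pod is gone from the
// apiserver, the desired-state populator later removes the entry again and
// the reconciler unmounts the volume.
func reconstructActualWorld(volumesOnDisk []podVolumeEntry, desired, actual stateOfWorld) {
	for _, v := range volumesOnDisk {
		if desired.HasPod(v.podName) || actual.HasPod(v.podName) {
			continue
		}
		desired.AddVolume(v)
		actual.AddVolume(v)
	}
}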

View File

@ -29,6 +29,13 @@ import (
"github.com/spf13/pflag"
)
const (
DefaultKubeletPodsDirName = "pods"
DefaultKubeletVolumesDirName = "volumes"
DefaultKubeletPluginsDirName = "plugins"
DefaultKubeletContainersDirName = "containers"
)
// KubeletServer encapsulates all of the parameters necessary for starting up
// a kubelet. These can either be set via command line or directly.
type KubeletServer struct {

View File

@ -1056,6 +1056,10 @@ func (plugin *mockVolumePlugin) RequiresRemount() bool {
return false
}
func (plugin *mockVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*vol.Spec, error) {
return nil, nil
}
func (plugin *mockVolumePlugin) NewMounter(spec *vol.Spec, podRef *api.Pod, opts vol.VolumeOptions) (vol.Mounter, error) {
return nil, fmt.Errorf("Mounter is not supported by this plugin")
}

View File

@ -50,6 +50,9 @@ func (mi *fakeMountInterface) List() ([]mount.MountPoint, error) {
func (mi *fakeMountInterface) IsLikelyNotMountPoint(file string) (bool, error) {
return false, fmt.Errorf("unsupported")
}
func (mi *fakeMountInterface) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) {
return "", nil
}
func (mi *fakeMountInterface) DeviceOpened(pathname string) (bool, error) {
for _, mp := range mi.mountPoints {

View File

@ -514,6 +514,7 @@ func NewMainKubelet(
return nil, err
}
// setup volumeManager
klet.volumeManager, err = volumemanager.NewVolumeManager(
enableControllerAttachDetach,
nodeName,
@ -521,7 +522,8 @@ func NewMainKubelet(
klet.kubeClient,
klet.volumePluginMgr,
klet.containerRuntime,
mounter)
mounter,
klet.getPodsDir())
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil {
@ -957,7 +959,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
}
// Start volume manager
go kl.volumeManager.Run(wait.NeverStop)
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
if kl.kubeClient != nil {
// Start syncing node status immediately, this may set up things the runtime needs to run.
@ -1731,16 +1733,21 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(
if allPods.Has(string(uid)) {
continue
}
// If volumes have not been unmounted/detached, do not delete directory.
// Doing so may result in corruption of data.
if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist {
// If volumes have not been unmounted/detached, do not delete directory.
// Doing so may result in corruption of data.
glog.V(3).Infof("Orphaned pod %q found, but volumes are not cleaned up; err: %v", uid, err)
continue
}
// Check whether volume is still mounted on disk. If so, do not delete directory
if volumeNames, err := kl.getPodVolumeNameListFromDisk(uid); err != nil || len(volumeNames) != 0 {
glog.V(3).Infof("Orphaned pod %q found, but volumes are still mounted; err: %v, volumes: %v ", uid, err, volumeNames)
continue
}
glog.V(3).Infof("Orphaned pod %q found, removing", uid)
if err := os.RemoveAll(kl.getPodDir(uid)); err != nil {
glog.Infof("Failed to remove orphaned pod %q dir; err: %v", uid, err)
glog.Errorf("Failed to remove orphaned pod %q dir; err: %v", uid, err)
errlist = append(errlist, err)
}
}

View File

@ -18,14 +18,17 @@ package kubelet
import (
"fmt"
"io/ioutil"
"net"
"path"
"github.com/golang/glog"
"k8s.io/kubernetes/cmd/kubelet/app/options"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
nodeutil "k8s.io/kubernetes/pkg/util/node"
)
@ -40,7 +43,7 @@ func (kl *Kubelet) getRootDir() string {
// getPodsDir returns the full path to the directory under which pod
// directories are created.
func (kl *Kubelet) getPodsDir() string {
return path.Join(kl.getRootDir(), "pods")
return path.Join(kl.getRootDir(), options.DefaultKubeletPodsDirName)
}
// getPluginsDir returns the full path to the directory under which plugin
@ -48,7 +51,7 @@ func (kl *Kubelet) getPodsDir() string {
// they need to persist. Plugins should create subdirectories under this named
// after their own names.
func (kl *Kubelet) getPluginsDir() string {
return path.Join(kl.getRootDir(), "plugins")
return path.Join(kl.getRootDir(), options.DefaultKubeletPluginsDirName)
}
// getPluginDir returns a data directory name for a given plugin name.
@ -90,7 +93,7 @@ func (kl *Kubelet) getPodDir(podUID types.UID) string {
// which volumes are created for the specified pod. This directory may not
// exist if the pod does not exist.
func (kl *Kubelet) getPodVolumesDir(podUID types.UID) string {
return path.Join(kl.getPodDir(podUID), "volumes")
return path.Join(kl.getPodDir(podUID), options.DefaultKubeletVolumesDirName)
}
// getPodVolumeDir returns the full path to the directory which represents the
@ -104,7 +107,7 @@ func (kl *Kubelet) getPodVolumeDir(podUID types.UID, pluginName string, volumeNa
// which plugins may store data for the specified pod. This directory may not
// exist if the pod does not exist.
func (kl *Kubelet) getPodPluginsDir(podUID types.UID) string {
return path.Join(kl.getPodDir(podUID), "plugins")
return path.Join(kl.getPodDir(podUID), options.DefaultKubeletPluginsDirName)
}
// getPodPluginDir returns a data directory name for a given plugin name for a
@ -126,7 +129,7 @@ func (kl *Kubelet) getPodContainerDir(podUID types.UID, ctrName string) string {
// old && new = use new (but warn)
oldPath := path.Join(kl.getPodDir(podUID), ctrName)
oldExists := dirExists(oldPath)
newPath := path.Join(kl.getPodDir(podUID), "containers", ctrName)
newPath := path.Join(kl.getPodDir(podUID), options.DefaultKubeletContainersDirName, ctrName)
newExists := dirExists(newPath)
if oldExists && !newExists {
return oldPath
@ -234,3 +237,32 @@ func (kl *Kubelet) getHostIPAnyWay() (net.IP, error) {
func (kl *Kubelet) GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64 {
return kl.volumeManager.GetExtraSupplementalGroupsForPod(pod)
}
// getPodVolumeNameListFromDisk returns a list of the volume names by reading the
// volume directories for the given pod from the disk.
func (kl *Kubelet) getPodVolumeNameListFromDisk(podUID types.UID) ([]string, error) {
volumes := []string{}
podVolDir := kl.getPodVolumesDir(podUID)
volumePluginDirs, err := ioutil.ReadDir(podVolDir)
if err != nil {
glog.Errorf("Could not read directory %s: %v", podVolDir, err)
return volumes, err
}
for _, volumePluginDir := range volumePluginDirs {
volumePluginName := volumePluginDir.Name()
volumePluginPath := path.Join(podVolDir, volumePluginName)
volumeDirs, volumeDirsStatErrs, err := util.ReadDirNoExit(volumePluginPath)
if err != nil {
return volumes, fmt.Errorf("Could not read directory %s: %v", volumePluginPath, err)
}
for i, volumeDir := range volumeDirs {
if volumeDir != nil {
volumes = append(volumes, volumeDir.Name())
continue
}
glog.Errorf("Could not read directory %s: %v", podVolDir, volumeDirsStatErrs[i])
}
}
return volumes, nil
}

View File

@ -248,7 +248,8 @@ func newTestKubeletWithImageList(
fakeKubeClient,
kubelet.volumePluginMgr,
fakeRuntime,
kubelet.mounter)
kubelet.mounter,
kubelet.getPodsDir())
if err != nil {
t.Fatalf("failed to initialize volume manager: %v", err)
}
@ -404,8 +405,7 @@ func TestVolumeAttachAndMountControllerDisabled(t *testing.T) {
},
})
stopCh := make(chan struct{})
go kubelet.volumeManager.Run(stopCh)
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
@ -474,8 +474,7 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
},
})
stopCh := make(chan struct{})
go kubelet.volumeManager.Run(stopCh)
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
@ -603,8 +602,7 @@ func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {
},
})
stopCh := make(chan struct{})
go kubelet.volumeManager.Run(stopCh)
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
@ -697,8 +695,7 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
},
})
stopCh := make(chan struct{})
go kubelet.volumeManager.Run(stopCh)
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
@ -856,8 +853,7 @@ func TestPodVolumesExist(t *testing.T) {
},
}
stopCh := make(chan struct{})
go kubelet.volumeManager.Run(stopCh)
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
@ -3939,3 +3935,9 @@ func simulateVolumeInUseUpdate(
}
}
}
func runVolumeManager(kubelet *Kubelet) chan struct{} {
stopCh := make(chan struct{})
go kubelet.volumeManager.Run(kubelet.sourcesReady, stopCh)
return stopCh
}

View File

@ -99,7 +99,8 @@ func TestRunOnce(t *testing.T) {
kb.kubeClient,
kb.volumePluginMgr,
fakeRuntime,
kb.mounter)
kb.mounter,
kb.getPodsDir())
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, kb.nonMasqueradeCIDR)
// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency

View File

@ -136,6 +136,11 @@ type ActualStateOfWorld interface {
// have no mountedPods. This list can be used to determine which volumes are
// no longer referenced and may be globally unmounted and detached.
GetUnmountedVolumes() []AttachedVolume
// GetPods generates and returns a map of pods indexed by the pod's unique name.
// This map can be used to determine which pods are currently
// in the actual state of the world.
GetPods() map[volumetypes.UniquePodName]bool
}
// MountedVolume represents a volume that has successfully been mounted to a pod.
@ -573,6 +578,21 @@ func (asw *actualStateOfWorld) GetUnmountedVolumes() []AttachedVolume {
return unmountedVolumes
}
func (asw *actualStateOfWorld) GetPods() map[volumetypes.UniquePodName]bool {
asw.RLock()
defer asw.RUnlock()
podList := make(map[volumetypes.UniquePodName]bool)
for _, volumeObj := range asw.attachedVolumes {
for podName := range volumeObj.mountedPods {
if !podList[podName] {
podList[podName] = true
}
}
}
return podList
}
func (asw *actualStateOfWorld) newAttachedVolume(
attachedVolume *attachedVolume) AttachedVolume {
return AttachedVolume{

View File

@ -92,6 +92,11 @@ type DesiredStateOfWorld interface {
// attached to this node and the pods they should be mounted to based on the
// current desired state of the world.
GetVolumesToMount() []VolumeToMount
// GetPods generates and returns a map of pods indexed by the pod's unique name.
// This map can be used to determine which pods are currently
// in the desired state of the world.
GetPods() map[types.UniquePodName]bool
}
// VolumeToMount represents a volume that is attached to this node and needs to
@ -117,6 +122,7 @@ type desiredStateOfWorld struct {
// volumePluginMgr is the volume plugin manager used to create volume
// plugin objects.
volumePluginMgr *volume.VolumePluginMgr
sync.RWMutex
}
@ -203,7 +209,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
} else {
// For non-attachable volumes, generate a unique name based on the pod
// namespace and name and the name of the volume within the pod.
volumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(podName, volumePlugin, outerVolumeSpecName)
volumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(podName, volumePlugin, volumeSpec)
}
volumeObj, volumeExists := dsw.volumesToMount[volumeName]
@ -296,6 +302,21 @@ func (dsw *desiredStateOfWorld) PodExistsInVolume(
return podExists
}
func (dsw *desiredStateOfWorld) GetPods() map[types.UniquePodName]bool {
dsw.RLock()
defer dsw.RUnlock()
podList := make(map[types.UniquePodName]bool)
for _, volumeObj := range dsw.volumesToMount {
for podName := range volumeObj.podsToMount {
if !podList[podName] {
podList[podName] = true
}
}
}
return podList
}
func (dsw *desiredStateOfWorld) GetVolumesToMount() []VolumeToMount {
dsw.RLock()
defer dsw.RUnlock()

View File

@ -20,16 +20,27 @@ limitations under the License.
package reconciler
import (
"fmt"
"io/ioutil"
"path"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/cmd/kubelet/app/options"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
// Reconciler runs a periodic loop to reconcile the desired state of the world
@ -46,7 +57,7 @@ type Reconciler interface {
// If attach/detach management is enabled, the manager will also check if
// volumes that should be attached are attached and volumes that should
// be detached are detached and trigger attach/detach operations as needed.
Run(stopCh <-chan struct{})
Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
}
// NewReconciler returns a new instance of Reconciler.
@ -56,6 +67,8 @@ type Reconciler interface {
// this node, and therefore the volume manager should not
// loopSleepDuration - the amount of time the reconciler loop sleeps between
// successive executions
// reconstructDuration - the amount of time the reconciler waits between
// successive reconstruct executions
// waitForAttachTimeout - the amount of time the Mount function will wait for
// the volume to be attached
// hostName - the hostname for this node, used by Attach and Detach methods
@ -65,26 +78,34 @@ type Reconciler interface {
// safely (prevents more than one operation from being triggered on the same
// volume)
// mounter - mounter passed in from kubelet, passed down unmount path
// volumePluginMgr - volume plugin manager passed from kubelet
// kubeletPodsDir - the pods directory of the kubelet, scanned during reconstruction
func NewReconciler(
kubeClient internalclientset.Interface,
controllerAttachDetachEnabled bool,
loopSleepDuration time.Duration,
reconstructDuration time.Duration,
waitForAttachTimeout time.Duration,
hostName string,
desiredStateOfWorld cache.DesiredStateOfWorld,
actualStateOfWorld cache.ActualStateOfWorld,
operationExecutor operationexecutor.OperationExecutor,
mounter mount.Interface) Reconciler {
mounter mount.Interface,
volumePluginMgr *volume.VolumePluginMgr,
kubeletPodsDir string) Reconciler {
return &reconciler{
kubeClient: kubeClient,
controllerAttachDetachEnabled: controllerAttachDetachEnabled,
loopSleepDuration: loopSleepDuration,
reconstructDuration: reconstructDuration,
waitForAttachTimeout: waitForAttachTimeout,
hostName: hostName,
desiredStateOfWorld: desiredStateOfWorld,
actualStateOfWorld: actualStateOfWorld,
operationExecutor: operationExecutor,
mounter: mounter,
volumePluginMgr: volumePluginMgr,
kubeletPodsDir: kubeletPodsDir,
timeOfLastReconstruct: time.Now(),
}
}
@ -92,149 +113,95 @@ type reconciler struct {
kubeClient internalclientset.Interface
controllerAttachDetachEnabled bool
loopSleepDuration time.Duration
reconstructDuration time.Duration
waitForAttachTimeout time.Duration
hostName string
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
operationExecutor operationexecutor.OperationExecutor
mounter mount.Interface
volumePluginMgr *volume.VolumePluginMgr
kubeletPodsDir string
timeOfLastReconstruct time.Time
}
func (rc *reconciler) Run(stopCh <-chan struct{}) {
wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
func (rc *reconciler) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
wait.Until(rc.reconciliationLoopFunc(sourcesReady), rc.loopSleepDuration, stopCh)
}
func (rc *reconciler) reconciliationLoopFunc() func() {
func (rc *reconciler) reconciliationLoopFunc(sourcesReady config.SourcesReady) func() {
return func() {
// Unmounts are triggered before mounts so that a volume that was
// referenced by a pod that was deleted and is now referenced by another
// pod is unmounted from the first pod before being mounted to the new
// pod.
rc.reconcile()
// Ensure volumes that should be unmounted are unmounted.
for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
// Volume is mounted, unmount it
glog.V(12).Infof("Attempting to start UnmountVolume for volume %q (spec.Name: %q) from pod %q (UID: %q).",
// Start the reconciler's reconstruct process only after the desired state of the world has been
// populated with pod volume information from all sources. Otherwise, the
// reconstruct process may add incomplete volume information and cause confusion.
// In addition, if some sources are not ready, the reconstruct process may clean up pods' volumes
// that are still in use, because the desired state could not get a complete list of pods.
if sourcesReady.AllReady() && time.Since(rc.timeOfLastReconstruct) > rc.reconstructDuration {
glog.V(5).Infof("Sources are all ready, starting reconstruct state function")
rc.reconstruct()
}
}
}
func (rc *reconciler) reconcile() {
// Unmounts are triggered before mounts so that a volume that was
// referenced by a pod that was deleted and is now referenced by another
// pod is unmounted from the first pod before being mounted to the new
// pod.
// Ensure volumes that should be unmounted are unmounted.
for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
// Volume is mounted, unmount it
glog.V(12).Infof("Attempting to start UnmountVolume for volume %q (spec.Name: %q) from pod %q (UID: %q).",
mountedVolume.VolumeName,
mountedVolume.OuterVolumeSpecName,
mountedVolume.PodName,
mountedVolume.PodUID)
err := rc.operationExecutor.UnmountVolume(
mountedVolume.MountedVolume, rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.UnmountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
mountedVolume.VolumeName,
mountedVolume.OuterVolumeSpecName,
mountedVolume.PodName,
mountedVolume.PodUID,
rc.controllerAttachDetachEnabled,
err)
}
if err == nil {
glog.Infof("UnmountVolume operation started for volume %q (spec.Name: %q) from pod %q (UID: %q).",
mountedVolume.VolumeName,
mountedVolume.OuterVolumeSpecName,
mountedVolume.PodName,
mountedVolume.PodUID)
err := rc.operationExecutor.UnmountVolume(
mountedVolume.MountedVolume, rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.UnmountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
mountedVolume.VolumeName,
mountedVolume.OuterVolumeSpecName,
mountedVolume.PodName,
mountedVolume.PodUID,
rc.controllerAttachDetachEnabled,
err)
}
if err == nil {
glog.Infof("UnmountVolume operation started for volume %q (spec.Name: %q) from pod %q (UID: %q).",
mountedVolume.VolumeName,
mountedVolume.OuterVolumeSpecName,
mountedVolume.PodName,
mountedVolume.PodUID)
}
}
}
}
// Ensure volumes that should be attached/mounted are attached/mounted.
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
volumeToMount.DevicePath = devicePath
if cache.IsVolumeNotAttachedError(err) {
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
// for controller to finish attaching volume.
glog.V(12).Infof("Attempting to start VerifyControllerAttachedVolume for volume %q (spec.Name: %q) pod %q (UID: %q)",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID)
err := rc.operationExecutor.VerifyControllerAttachedVolume(
volumeToMount.VolumeToMount,
rc.hostName,
rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.VerifyControllerAttachedVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
rc.controllerAttachDetachEnabled,
err)
}
if err == nil {
glog.Infof("VerifyControllerAttachedVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID)
}
} else {
// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
// so attach it
volumeToAttach := operationexecutor.VolumeToAttach{
VolumeName: volumeToMount.VolumeName,
VolumeSpec: volumeToMount.VolumeSpec,
NodeName: rc.hostName,
}
glog.V(12).Infof("Attempting to start AttachVolume for volume %q (spec.Name: %q) pod %q (UID: %q)",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID)
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.AttachVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
rc.controllerAttachDetachEnabled,
err)
}
if err == nil {
glog.Infof("AttachVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID)
}
}
} else if !volMounted || cache.IsRemountRequiredError(err) {
// Volume is not mounted, or is already mounted, but requires remounting
remountingLogStr := ""
if cache.IsRemountRequiredError(err) {
remountingLogStr = "Volume is already mounted to pod, but remount was requested."
}
glog.V(12).Infof("Attempting to start MountVolume for volume %q (spec.Name: %q) to pod %q (UID: %q). %s",
// Ensure volumes that should be attached/mounted are attached/mounted.
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
volumeToMount.DevicePath = devicePath
if cache.IsVolumeNotAttachedError(err) {
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
// for controller to finish attaching volume.
glog.V(12).Infof("Attempting to start VerifyControllerAttachedVolume for volume %q (spec.Name: %q) pod %q (UID: %q)",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
remountingLogStr)
err := rc.operationExecutor.MountVolume(
rc.waitForAttachTimeout,
volumeToMount.Pod.UID)
err := rc.operationExecutor.VerifyControllerAttachedVolume(
volumeToMount.VolumeToMount,
rc.hostName,
rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
@ -242,7 +209,7 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.MountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
"operationExecutor.VerifyControllerAttachedVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
@ -251,77 +218,341 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
err)
}
if err == nil {
glog.Infof("MountVolume operation started for volume %q (spec.Name: %q) to pod %q (UID: %q). %s",
glog.Infof("VerifyControllerAttachedVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID)
}
} else {
// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
// so attach it
volumeToAttach := operationexecutor.VolumeToAttach{
VolumeName: volumeToMount.VolumeName,
VolumeSpec: volumeToMount.VolumeSpec,
NodeName: rc.hostName,
}
glog.V(12).Infof("Attempting to start AttachVolume for volume %q (spec.Name: %q) pod %q (UID: %q)",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID)
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.AttachVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
remountingLogStr)
rc.controllerAttachDetachEnabled,
err)
}
if err == nil {
glog.Infof("AttachVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID)
}
}
} else if !volMounted || cache.IsRemountRequiredError(err) {
// Volume is not mounted, or is already mounted, but requires remounting
remountingLogStr := ""
if cache.IsRemountRequiredError(err) {
remountingLogStr = "Volume is already mounted to pod, but remount was requested."
}
glog.V(12).Infof("Attempting to start MountVolume for volume %q (spec.Name: %q) to pod %q (UID: %q). %s",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
remountingLogStr)
err := rc.operationExecutor.MountVolume(
rc.waitForAttachTimeout,
volumeToMount.VolumeToMount,
rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.MountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
rc.controllerAttachDetachEnabled,
err)
}
if err == nil {
glog.Infof("MountVolume operation started for volume %q (spec.Name: %q) to pod %q (UID: %q). %s",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
remountingLogStr)
}
}
}
// Ensure devices that should be detached/unmounted are detached/unmounted.
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) {
if attachedVolume.GloballyMounted {
// Volume is globally mounted to device, unmount it
glog.V(12).Infof("Attempting to start UnmountDevice for volume %q (spec.Name: %q)",
// Ensure devices that should be detached/unmounted are detached/unmounted.
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) {
if attachedVolume.GloballyMounted {
// Volume is globally mounted to device, unmount it
glog.V(12).Infof("Attempting to start UnmountDevice for volume %q (spec.Name: %q)",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name())
err := rc.operationExecutor.UnmountDevice(
attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.UnmountDevice failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name(),
rc.controllerAttachDetachEnabled,
err)
}
if err == nil {
glog.Infof("UnmountDevice operation started for volume %q (spec.Name: %q)",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name())
err := rc.operationExecutor.UnmountDevice(
attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
}
} else {
// Volume is attached to node, detach it
if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
// Kubelet is not responsible for detaching, or this volume has a non-attachable volume plugin,
// so just mark it detached in actualStateOfWorld without triggering a detach.
rc.actualStateOfWorld.MarkVolumeAsDetached(
attachedVolume.VolumeName, rc.hostName)
} else {
// Only detach if kubelet detach is enabled
glog.V(12).Infof("Attempting to start DetachVolume for volume %q (spec.Name: %q)",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name())
err := rc.operationExecutor.DetachVolume(
attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.UnmountDevice failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",
"operationExecutor.DetachVolume failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name(),
rc.controllerAttachDetachEnabled,
err)
}
if err == nil {
glog.Infof("UnmountDevice operation started for volume %q (spec.Name: %q)",
glog.Infof("DetachVolume operation started for volume %q (spec.Name: %q)",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name())
}
} else {
// Volume is attached to node, detach it
if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
// Kubelet is not responsible for detaching, or this volume has a non-attachable volume plugin,
// so just mark it detached in actualStateOfWorld without triggering a detach.
rc.actualStateOfWorld.MarkVolumeAsDetached(
attachedVolume.VolumeName, rc.hostName)
} else {
// Only detach if kubelet detach is enabled
glog.V(12).Infof("Attempting to start DetachVolume for volume %q (spec.Name: %q)",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name())
err := rc.operationExecutor.DetachVolume(
attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name(),
rc.controllerAttachDetachEnabled,
err)
}
if err == nil {
glog.Infof("DetachVolume operation started for volume %q (spec.Name: %q)",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name())
}
}
}
}
}
}
}
// The reconstruct process tries to observe the real world by scanning all pods' volume directories on disk.
// If the actual and desired states of the world are not consistent with the observed world, some
// mounted volumes were probably left behind during a kubelet restart. This process reconstructs
// those volumes and updates the actual and desired states. In the following reconciler loop, the volumes will
// be cleaned up.
func (rc *reconciler) reconstruct() {
defer rc.updateReconstructTime()
rc.reconstructStates(rc.kubeletPodsDir)
}
func (rc *reconciler) updateReconstructTime() {
rc.timeOfLastReconstruct = time.Now()
}
type podVolume struct {
podName volumetypes.UniquePodName
volumeSpecName string
mountPath string
pluginName string
}
// reconstructStates scans the volume directories under the given pods directory. If a volume is not
// in either the actual or desired state of the world, or in a pending operation, this function reconstructs
// the volume spec and puts it in both the actual and desired states of the world. If no running
// container is mounting the volume, the volume will be removed by the desired state of the world's populator and
// cleaned up by the reconciler.
func (rc *reconciler) reconstructStates(podsDir string) {
// Get volumes information by reading the pod's directory
podVolumes, err := getVolumesFromPodDir(podsDir)
if err != nil {
glog.Errorf("Cannot get volumes from disk %v", err)
return
}
for _, volume := range podVolumes {
volumeToMount, err := rc.reconstructVolume(volume)
if err != nil {
glog.Errorf("Could not construct volume information: %v", err)
continue
}
// Check whether there is a pending operation for the given pod and volume.
// Pending operations must be checked before checking the actual and desired
// states to avoid a race condition. For example, the following
// might happen if the pending operation is checked after checking the actual and desired states:
// 1. The pod is checked and does not exist in either the actual or desired state.
// 2. An operation for the given pod finishes and the actual state is updated.
// 3. Pending operations are checked and there is none for the given pod.
if rc.operationExecutor.IsOperationPending(volumeToMount.VolumeName, volumeToMount.PodName) {
continue
}
desiredPods := rc.desiredStateOfWorld.GetPods()
actualPods := rc.actualStateOfWorld.GetPods()
if desiredPods[volume.podName] || actualPods[volume.podName] {
continue
}
glog.V(3).Infof(
"Could not find pod information in desired or actual states or pending operation, update it in both states: %+v",
volumeToMount)
if err = rc.updateStates(volumeToMount); err != nil {
glog.Errorf("Error occured during reconstruct volume from disk: %v", err)
}
}
}
// reconstructVolume rebuilds the volume spec and the VolumeToMount data structure by reading the pod's volume directories
func (rc *reconciler) reconstructVolume(volume podVolume) (*operationexecutor.VolumeToMount, error) {
plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName)
if err != nil {
return nil, err
}
volumeSpec, err := plugin.ConstructVolumeSpec(volume.volumeSpecName, volume.mountPath)
if err != nil {
return nil, err
}
volumeName, err := plugin.GetVolumeName(volumeSpec)
if err != nil {
return nil, err
}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
UID: types.UID(volume.podName),
},
}
attachablePlugin, err := rc.volumePluginMgr.FindAttachablePluginByName(volume.pluginName)
if err != nil {
return nil, err
}
var uniqueVolumeName api.UniqueVolumeName
if attachablePlugin != nil {
uniqueVolumeName = volumehelper.GetUniqueVolumeName(volume.pluginName, volumeName)
} else {
uniqueVolumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(volume.podName, plugin, volumeSpec)
}
volumeToMount := &operationexecutor.VolumeToMount{
VolumeName: uniqueVolumeName,
PodName: volume.podName,
VolumeSpec: volumeSpec,
OuterVolumeSpecName: volumeName, /*volumeName is InnerVolumeSpecName. But this information will not be used for cleanup*/
Pod: pod,
PluginIsAttachable: attachablePlugin != nil,
VolumeGidValue: "",
DevicePath: "",
}
return volumeToMount, nil
}
func (rc *reconciler) updateStates(volumeToMount *operationexecutor.VolumeToMount) error {
err := rc.actualStateOfWorld.MarkVolumeAsAttached(
volumeToMount.VolumeName, volumeToMount.VolumeSpec, "", volumeToMount.DevicePath)
if err != nil {
return fmt.Errorf("Could not add volume information to actual state of world: %v", err)
}
err = rc.actualStateOfWorld.AddPodToVolume(
volumeToMount.PodName,
types.UID(volumeToMount.PodName),
volumeToMount.VolumeName,
nil,
volumeToMount.OuterVolumeSpecName,
volumeToMount.DevicePath)
if err != nil {
return fmt.Errorf("Could not add pod to volume information to actual state of world: %v", err)
}
if volumeToMount.PluginIsAttachable {
err = rc.actualStateOfWorld.MarkDeviceAsMounted(volumeToMount.VolumeName)
if err != nil {
return fmt.Errorf("Could not mark device is mounted to actual state of world: %v", err)
}
}
_, err = rc.desiredStateOfWorld.AddPodToVolume(volumeToMount.PodName,
volumeToMount.Pod,
volumeToMount.VolumeSpec,
volumeToMount.OuterVolumeSpecName,
volumeToMount.VolumeGidValue)
if err != nil {
return fmt.Errorf("Could not add pod to volume information to desired state of world: %v", err)
}
return nil
}
// getVolumesFromPodDir scans the volumes directories under the given pods directory.
// It returns a list of pod volume information including the pod's uid, the volume's plugin name, mount path,
// and volume spec name.
func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
podsDirInfo, err := ioutil.ReadDir(podDir)
if err != nil {
return nil, err
}
volumes := []podVolume{}
for i := range podsDirInfo {
if !podsDirInfo[i].IsDir() {
continue
}
podName := podsDirInfo[i].Name()
podDir := path.Join(podDir, podName)
volumesDir := path.Join(podDir, options.DefaultKubeletVolumesDirName)
volumesDirInfo, err := ioutil.ReadDir(volumesDir)
if err != nil {
glog.Errorf("Could not read volume directory %q: %v", volumesDir, err)
continue
}
for _, volumeDir := range volumesDirInfo {
pluginName := volumeDir.Name()
volumePluginPath := path.Join(volumesDir, pluginName)
volumePluginDirs, err := ioutil.ReadDir(volumePluginPath)
if err != nil {
glog.Errorf("Could not read volume plugin directory %q: %v", volumePluginPath, err)
continue
}
unescapePluginName := strings.UnescapeQualifiedNameForDisk(pluginName)
for _, volumeNameDir := range volumePluginDirs {
if volumeNameDir != nil {
volumeName := volumeNameDir.Name()
mountPath := path.Join(volumePluginPath, volumeName)
volumes = append(volumes, podVolume{
podName: volumetypes.UniquePodName(podName),
volumeSpecName: volumeName,
mountPath: mountPath,
pluginName: unescapePluginName,
})
}
}
}
}
glog.V(10).Infof("Get volumes from pod directory %q %+v", podDir, volumes)
return volumes, nil
}
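For reference, the on-disk layout that getVolumesFromPodDir walks looks like the
following (plugin directory names are stored in their escaped on-disk form; the
root directory, UID, and volume name below are only illustrative examples):

<kubelet-root>/pods/<pod-uid>/volumes/<escaped-plugin-name>/<volume-spec-name>
e.g. /var/lib/kubelet/pods/7d0eff3b-0000/volumes/kubernetes.io~aws-ebs/pv0001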

View File

@ -25,9 +25,11 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
@ -38,12 +40,13 @@ import (
const (
// reconcilerLoopSleepDuration is the amount of time the reconciler loop
// waits between successive executions
reconcilerLoopSleepDuration time.Duration = 0 * time.Millisecond
reconcilerLoopSleepDuration time.Duration = 0 * time.Millisecond
reconcilerReconstructSleepPeriod time.Duration = 10 * time.Minute
// waitForAttachTimeout is the maximum amount of time a
// operationexecutor.Mount call will wait for a volume to be attached.
waitForAttachTimeout time.Duration = 1 * time.Second
nodeName string = "myhostname"
kubeletPodsDir string = "fake-dir"
)
// Calls Run()
@ -59,15 +62,18 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
kubeClient,
false, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
reconcilerReconstructSleepPeriod,
waitForAttachTimeout,
nodeName,
dsw,
asw,
oex,
&mount.FakeMounter{})
&mount.FakeMounter{},
volumePluginMgr,
kubeletPodsDir)
// Act
go reconciler.Run(wait.NeverStop)
runReconciler(reconciler)
// Assert
assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
@ -92,12 +98,15 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
kubeClient,
false, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
reconcilerReconstructSleepPeriod,
waitForAttachTimeout,
nodeName,
dsw,
asw,
oex,
&mount.FakeMounter{})
&mount.FakeMounter{},
volumePluginMgr,
kubeletPodsDir)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
@ -128,9 +137,8 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
}
// Act
go reconciler.Run(wait.NeverStop)
runReconciler(reconciler)
waitForMount(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyAttachCallCount(
1 /* expectedAttachCallCount */, fakePlugin))
@ -160,12 +168,15 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
kubeClient,
true, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
reconcilerReconstructSleepPeriod,
waitForAttachTimeout,
nodeName,
dsw,
asw,
oex,
&mount.FakeMounter{})
&mount.FakeMounter{},
volumePluginMgr,
kubeletPodsDir)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
@ -197,7 +208,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
}
// Act
go reconciler.Run(wait.NeverStop)
runReconciler(reconciler)
waitForMount(t, fakePlugin, generatedVolumeName, asw)
// Assert
@ -228,12 +239,15 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
kubeClient,
false, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
reconcilerReconstructSleepPeriod,
waitForAttachTimeout,
nodeName,
dsw,
asw,
oex,
&mount.FakeMounter{})
&mount.FakeMounter{},
volumePluginMgr,
kubeletPodsDir)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
@ -264,9 +278,8 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
}
// Act
go reconciler.Run(wait.NeverStop)
runReconciler(reconciler)
waitForMount(t, fakePlugin, generatedVolumeName, asw)
// Assert
assert.NoError(t, volumetesting.VerifyAttachCallCount(
1 /* expectedAttachCallCount */, fakePlugin))
@ -308,12 +321,15 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
kubeClient,
true, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
reconcilerReconstructSleepPeriod,
waitForAttachTimeout,
nodeName,
dsw,
asw,
oex,
&mount.FakeMounter{})
&mount.FakeMounter{},
volumePluginMgr,
kubeletPodsDir)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
@ -344,7 +360,8 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
}
// Act
go reconciler.Run(wait.NeverStop)
runReconciler(reconciler)
dsw.MarkVolumesReportedInUse([]api.UniqueVolumeName{generatedVolumeName})
waitForMount(t, fakePlugin, generatedVolumeName, asw)
@ -445,3 +462,8 @@ func createTestClient() *fake.Clientset {
})
return fakeClient
}
func runReconciler(reconciler Reconciler) {
sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return false })
go reconciler.Run(sourcesReady, wait.NeverStop)
}

View File

@ -24,6 +24,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/container"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/pod"
@ -46,6 +47,10 @@ const (
// between successive executions
reconcilerLoopSleepPeriod time.Duration = 100 * time.Millisecond
// reconcilerReconstructSleepPeriod is the amount of time the reconciler reconstruct process
// waits between successive executions
reconcilerReconstructSleepPeriod time.Duration = 3 * time.Minute
// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
// DesiredStateOfWorldPopulator loop waits between successive executions
desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 100 * time.Millisecond
@ -76,6 +81,10 @@ const (
// operation is waiting it only blocks other operations on the same device,
// other devices are not affected.
waitForAttachTimeout time.Duration = 10 * time.Minute
// reconcilerStartGracePeriod is the maximum amount of time volume manager
// can wait to start reconciler
reconcilerStartGracePeriod time.Duration = 60 * time.Second
)
// VolumeManager runs a set of asynchronous loops that figure out which volumes
@ -83,7 +92,7 @@ const (
// this node and makes it so.
type VolumeManager interface {
// Starts the volume manager and all the asynchronous loops that it controls
Run(stopCh <-chan struct{})
Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
// WaitForAttachAndMount processes the volumes referenced in the specified
// pod and blocks until they are all attached and mounted (reflected in
@ -138,7 +147,8 @@ func NewVolumeManager(
kubeClient internalclientset.Interface,
volumePluginMgr *volume.VolumePluginMgr,
kubeContainerRuntime kubecontainer.Runtime,
mounter mount.Interface) (VolumeManager, error) {
mounter mount.Interface,
kubeletPodsDir string) (VolumeManager, error) {
vm := &volumeManager{
kubeClient: kubeClient,
volumePluginMgr: volumePluginMgr,
@ -153,12 +163,15 @@ func NewVolumeManager(
kubeClient,
controllerAttachDetachEnabled,
reconcilerLoopSleepPeriod,
reconcilerReconstructSleepPeriod,
waitForAttachTimeout,
hostName,
vm.desiredStateOfWorld,
vm.actualStateOfWorld,
vm.operationExecutor,
mounter)
mounter,
volumePluginMgr,
kubeletPodsDir)
vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
kubeClient,
desiredStateOfWorldPopulatorLoopSleepPeriod,
@ -208,12 +221,14 @@ type volumeManager struct {
desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
}
func (vm *volumeManager) Run(stopCh <-chan struct{}) {
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
defer runtime.HandleCrash()
glog.Infof("Starting Kubelet Volume Manager")
go vm.reconciler.Run(stopCh)
go vm.desiredStateOfWorldPopulator.Run(stopCh)
glog.V(2).Infof("The desired_state_of_world populator starts")
glog.Infof("Starting Kubelet Volume Manager")
go vm.reconciler.Run(sourcesReady, stopCh)
<-stopCh
glog.Infof("Shutting down Kubelet Volume Manager")

View File

@ -26,11 +26,13 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/kubelet/config"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/pod"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/sets"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
@ -58,8 +60,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
t.Fatalf("Failed to initialize volume manager: %v", err)
}
stopCh := make(chan struct{})
go manager.Run(stopCh)
stopCh := runVolumeManager(manager)
defer close(stopCh)
podManager.SetPods([]*api.Pod{pod})
@ -149,8 +150,10 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
continue
}
stopCh := make(chan struct{})
go manager.Run(stopCh)
stopCh := runVolumeManager(manager)
defer func() {
close(stopCh)
}()
podManager.SetPods([]*api.Pod{pod})
@ -170,8 +173,6 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
if !reflect.DeepEqual(tc.expected, actual) {
t.Errorf("Expected supplemental groups %v, got %v", tc.expected, actual)
}
close(stopCh)
}
}
@ -190,7 +191,8 @@ func newTestVolumeManager(
kubeClient,
plugMgr,
&containertest.FakeRuntime{},
&mount.FakeMounter{})
&mount.FakeMounter{},
"")
return vm, err
}
@ -276,3 +278,12 @@ func simulateVolumeInUseUpdate(
}
}
}
func runVolumeManager(manager VolumeManager) chan struct{} {
stopCh := make(chan struct{})
sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true })
go manager.Run(sourcesReady, stopCh)
return stopCh
}

View File

@ -58,6 +58,10 @@ type GoRoutineMap interface {
// necessary during tests - the test should wait until all operations finish
// and evaluate results after that.
Wait()
// IsOperationPending returns true if the operation is pending, otherwise
// returns false
IsOperationPending(operationName string) bool
}
// NewGoRoutineMap returns a new instance of GoRoutineMap.
@ -75,7 +79,7 @@ type goRoutineMap struct {
operations map[string]operation
exponentialBackOffOnError bool
cond *sync.Cond
lock sync.Mutex
lock sync.RWMutex
}
type operation struct {
@ -150,6 +154,16 @@ func (grm *goRoutineMap) operationComplete(
}
}
func (grm *goRoutineMap) IsOperationPending(operationName string) bool {
grm.lock.RLock()
defer grm.lock.RUnlock()
existingOp, exists := grm.operations[operationName]
if exists && existingOp.operationPending {
return true
}
return false
}
func (grm *goRoutineMap) Wait() {
grm.lock.Lock()
defer grm.lock.Unlock()

View File

@ -140,3 +140,7 @@ func (f *FakeMounter) DeviceOpened(pathname string) (bool, error) {
func (f *FakeMounter) PathIsDevice(pathname string) (bool, error) {
return true, nil
}
func (f *FakeMounter) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) {
return getDeviceNameFromMount(f, mountPath, pluginDir)
}

View File

@ -19,7 +19,10 @@ limitations under the License.
package mount
import (
"fmt"
"path"
"path/filepath"
"strings"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util/exec"
@ -43,6 +46,9 @@ type Interface interface {
DeviceOpened(pathname string) (bool, error)
// PathIsDevice determines if a path is a device.
PathIsDevice(pathname string) (bool, error)
// GetDeviceNameFromMount finds the device name by checking the mount path
// to get the global mount path which matches its plugin directory
GetDeviceNameFromMount(mountPath, pluginDir string) (string, error)
}
// This represents a single line in /proc/mounts or /etc/fstab.
@ -151,3 +157,25 @@ func GetDeviceNameFromMount(mounter Interface, mountPath string) (string, int, e
}
return device, refCount, nil
}
// getDeviceNameFromMount finds the device name from /proc/mounts, where the
// mount path reference should match the given plugin directory. If no mount path reference
// matches, it returns the volume name taken from the given mountPath.
func getDeviceNameFromMount(mounter Interface, mountPath, pluginDir string) (string, error) {
refs, err := GetMountRefs(mounter, mountPath)
if err != nil {
glog.V(4).Infof("GetMountRefs failed for mount path %q: %v", mountPath, err)
return "", err
}
if len(refs) == 0 {
glog.V(4).Infof("Directory %s is not mounted", mountPath)
return "", fmt.Errorf("directory %s is not mounted", mountPath)
}
for _, ref := range refs {
if strings.HasPrefix(ref, pluginDir) {
return path.Base(ref), nil
}
}
return path.Base(mountPath), nil
}

View File

@ -222,6 +222,11 @@ func pathIsDevice(pathname string) (bool, error) {
return false, nil
}
// GetDeviceNameFromMount: given a mount point, find the device name from its global mount point
func (mounter *Mounter) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) {
return getDeviceNameFromMount(mounter, mountPath, pluginDir)
}
func listProcMounts(mountFilePath string) ([]MountPoint, error) {
hash1, err := readProcMounts(mountFilePath, nil)
if err != nil {

View File

@ -217,6 +217,11 @@ func (n *NsenterMounter) PathIsDevice(pathname string) (bool, error) {
return pathIsDevice(pathname)
}
// GetDeviceNameFromMount: given a mount point, find the volume id by checking /proc/mounts
func (n *NsenterMounter) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) {
return getDeviceNameFromMount(n, mountPath, pluginDir)
}
func (n *NsenterMounter) absHostPath(command string) string {
path, ok := n.paths[command]
if !ok {

View File

@ -49,3 +49,7 @@ func (*NsenterMounter) DeviceOpened(pathname string) (bool, error) {
func (*NsenterMounter) PathIsDevice(pathname string) (bool, error) {
return true, nil
}
func (*NsenterMounter) GetDeviceNameFromMount(mountPath, pluginDir string) (string, error) {
return "", nil
}

View File

@ -51,6 +51,11 @@ func (plugin *awsElasticBlockStorePlugin) NewAttacher() (volume.Attacher, error)
}, nil
}
func (plugin *awsElasticBlockStorePlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
mounter := plugin.host.GetMounter()
return mount.GetMountRefs(mounter, deviceMountPath)
}
func (attacher *awsElasticBlockStoreAttacher) Attach(spec *volume.Spec, hostName string) (string, error) {
volumeSource, readOnly, err := getVolumeSource(spec)
if err != nil {

View File

@ -188,6 +188,24 @@ func getVolumeSource(
return nil, false, fmt.Errorf("Spec does not reference an AWS EBS volume type")
}
func (plugin *awsElasticBlockStorePlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
mounter := plugin.host.GetMounter()
pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName())
sourceName, err := mounter.GetDeviceNameFromMount(mountPath, pluginDir)
if err != nil {
return nil, err
}
awsVolume := &api.Volume{
Name: volName,
VolumeSource: api.VolumeSource{
AWSElasticBlockStore: &api.AWSElasticBlockStoreVolumeSource{
VolumeID: sourceName,
},
},
}
return volume.NewSpecFromVolume(awsVolume), nil
}
// Abstract interface to PD operations.
type ebsManager interface {
CreateVolume(provisioner *awsElasticBlockStoreProvisioner) (volumeID string, volumeSizeGB int, labels map[string]string, err error)

View File

@ -124,6 +124,19 @@ func (plugin *azureFilePlugin) newUnmounterInternal(volName string, podUID types
}}, nil
}
func (plugin *azureFilePlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
azureVolume := &api.Volume{
Name: volName,
VolumeSource: api.VolumeSource{
AzureFile: &api.AzureFileVolumeSource{
SecretName: volName,
ShareName: volName,
},
},
}
return volume.NewSpecFromVolume(azureVolume), nil
}
// azureFile volumes represent mount of an AzureFile share.
type azureFile struct {
volName string

View File

@ -154,6 +154,19 @@ func (plugin *cephfsPlugin) newUnmounterInternal(volName string, podUID types.UI
}, nil
}
func (plugin *cephfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
cephfsVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
CephFS: &api.CephFSVolumeSource{
Monitors: []string{},
Path: volumeName,
},
},
}
return volume.NewSpecFromVolume(cephfsVolume), nil
}
// CephFS volumes represent a bare host file or directory mount of an CephFS export.
type cephfs struct {
volName string

View File

@ -53,6 +53,11 @@ func (plugin *cinderPlugin) NewAttacher() (volume.Attacher, error) {
}, nil
}
func (plugin *cinderPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
mounter := plugin.host.GetMounter()
return mount.GetMountRefs(mounter, deviceMountPath)
}
func (attacher *cinderDiskAttacher) Attach(spec *volume.Spec, hostName string) (string, error) {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {

View File

@ -204,6 +204,25 @@ func (plugin *cinderPlugin) getCloudProvider() (CinderProvider, error) {
}
}
func (plugin *cinderPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
mounter := plugin.host.GetMounter()
pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName())
sourceName, err := mounter.GetDeviceNameFromMount(mountPath, pluginDir)
if err != nil {
return nil, err
}
glog.V(4).Infof("Found volume %s mounted to %s", sourceName, mountPath)
cinderVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
Cinder: &api.CinderVolumeSource{
VolumeID: sourceName,
},
},
}
return volume.NewSpecFromVolume(cinderVolume), nil
}
// Abstract interface to PD operations.
type cdManager interface {
// Attaches the disk to the kubelet's host machine.

View File

@ -86,6 +86,16 @@ func (plugin *configMapPlugin) NewUnmounter(volName string, podUID types.UID) (v
return &configMapVolumeUnmounter{&configMapVolume{volName, podUID, plugin, plugin.host.GetMounter(), plugin.host.GetWriter(), volume.MetricsNil{}}}, nil
}
func (plugin *configMapPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
configMapVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
ConfigMap: &api.ConfigMapVolumeSource{},
},
}
return volume.NewSpecFromVolume(configMapVolume), nil
}
type configMapVolume struct {
volName string
podUID types.UID

View File

@ -106,6 +106,16 @@ func (plugin *downwardAPIPlugin) NewUnmounter(volName string, podUID types.UID)
}, nil
}
func (plugin *downwardAPIPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
downwardAPIVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
DownwardAPI: &api.DownwardAPIVolumeSource{},
},
}
return volume.NewSpecFromVolume(downwardAPIVolume), nil
}
// downwardAPIVolume retrieves downward API data and places it into the volume on the host.
type downwardAPIVolume struct {
volName string

View File

@ -128,6 +128,16 @@ func (plugin *emptyDirPlugin) newUnmounterInternal(volName string, podUID types.
return ed, nil
}
func (plugin *emptyDirPlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
emptyDirVolume := &api.Volume{
Name: volName,
VolumeSource: api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
},
}
return volume.NewSpecFromVolume(emptyDirVolume), nil
}
// mountDetector abstracts how to find what kind of mount a path is backed by.
type mountDetector interface {
// GetMountMedium determines what type of medium a given path is backed

View File

@ -141,6 +141,16 @@ func (plugin *fcPlugin) execCommand(command string, args []string) ([]byte, erro
return cmd.CombinedOutput()
}
func (plugin *fcPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
fcVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
FC: &api.FCVolumeSource{},
},
}
return volume.NewSpecFromVolume(fcVolume), nil
}
type fcDisk struct {
volName string
podUID types.UID

View File

@ -179,6 +179,18 @@ func (plugin *flexVolumePlugin) newUnmounterInternal(volName string, podUID type
}, nil
}
func (plugin *flexVolumePlugin) ConstructVolumeSpec(volumeName, sourceName string) (*volume.Spec, error) {
flexVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
FlexVolume: &api.FlexVolumeSource{
Driver: sourceName,
},
},
}
return volume.NewSpecFromVolume(flexVolume), nil
}
// flexVolumeDisk is the disk resource provided by this plugin.
type flexVolumeDisk struct {
// podUID is the UID of the pod.

View File

@ -117,6 +117,18 @@ func (p *flockerPlugin) NewUnmounter(datasetName string, podUID types.UID) (volu
return nil, nil
}
func (p *flockerPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
flockerVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
Flocker: &api.FlockerVolumeSource{
DatasetName: volumeName,
},
},
}
return volume.NewSpecFromVolume(flockerVolume), nil
}
type flockerMounter struct {
*flocker
client flockerclient.Clientable

View File

@ -53,6 +53,11 @@ func (plugin *gcePersistentDiskPlugin) NewAttacher() (volume.Attacher, error) {
}, nil
}
func (plugin *gcePersistentDiskPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
mounter := plugin.host.GetMounter()
return mount.GetMountRefs(mounter, deviceMountPath)
}
// Attach checks with the GCE cloud provider if the specified volume is already
// attached to the specified node. If the volume is attached, it succeeds
// (returns nil). If it is not, Attach issues a call to the GCE cloud provider

View File

@ -182,6 +182,24 @@ func (plugin *gcePersistentDiskPlugin) newProvisionerInternal(options volume.Vol
}, nil
}
func (plugin *gcePersistentDiskPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
mounter := plugin.host.GetMounter()
pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName())
sourceName, err := mounter.GetDeviceNameFromMount(mountPath, pluginDir)
if err != nil {
return nil, err
}
gceVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: sourceName,
},
},
}
return volume.NewSpecFromVolume(gceVolume), nil
}
// Abstract interface to PD operations.
type pdManager interface {
// Creates a volume

View File

@ -107,6 +107,16 @@ func (plugin *gitRepoPlugin) NewUnmounter(volName string, podUID types.UID) (vol
}, nil
}
func (plugin *gitRepoPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
gitVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
GitRepo: &api.GitRepoVolumeSource{},
},
}
return volume.NewSpecFromVolume(gitVolume), nil
}
// gitRepo volumes are directories which are pre-filled from a git repository.
// These do not persist beyond the lifetime of a pod.
type gitRepoVolume struct {

View File

@ -145,6 +145,19 @@ func (plugin *glusterfsPlugin) execCommand(command string, args []string) ([]byt
return cmd.CombinedOutput()
}
func (plugin *glusterfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
glusterfsVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
Glusterfs: &api.GlusterfsVolumeSource{
EndpointsName: volumeName,
Path: volumeName,
},
},
}
return volume.NewSpecFromVolume(glusterfsVolume), nil
}
// Glusterfs volumes represent a bare host file or directory mount of a Glusterfs export.
type glusterfs struct {
volName string

View File

@ -138,6 +138,18 @@ func (plugin *hostPathPlugin) NewProvisioner(options volume.VolumeOptions) (volu
return plugin.newProvisionerFunc(options, plugin.host)
}
func (plugin *hostPathPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
hostPathVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: volumeName,
},
},
}
return volume.NewSpecFromVolume(hostPathVolume), nil
}
func newRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.HostPath == nil {
return nil, fmt.Errorf("spec.PersistentVolumeSource.HostPath is nil")

View File

@ -146,6 +146,19 @@ func (plugin *iscsiPlugin) execCommand(command string, args []string) ([]byte, e
return cmd.CombinedOutput()
}
func (plugin *iscsiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
iscsiVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
ISCSI: &api.ISCSIVolumeSource{
TargetPortal: volumeName,
IQN: volumeName,
},
},
}
return volume.NewSpecFromVolume(iscsiVolume), nil
}
type iscsiDisk struct {
volName string
podUID types.UID

View File

@ -136,6 +136,18 @@ func (plugin *nfsPlugin) NewRecycler(pvName string, spec *volume.Spec) (volume.R
return plugin.newRecyclerFunc(pvName, spec, plugin.host, plugin.config)
}
func (plugin *nfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
nfsVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
NFS: &api.NFSVolumeSource{
Path: volumeName,
},
},
}
return volume.NewSpecFromVolume(nfsVolume), nil
}
// NFS volumes represent a bare host file or directory mount of an NFS export.
type nfs struct {
volName string

View File

@ -98,6 +98,12 @@ type VolumePlugin interface {
// - name: The volume name, as per the api.Volume spec.
// - podUID: The UID of the enclosing pod
NewUnmounter(name string, podUID types.UID) (Unmounter, error)
// ConstructVolumeSpec constructs a volume spec based on the given volume name
// and mountPath. The spec may be incomplete because only the volume name and
// mount path are available as input. The volume manager uses this method to
// reconstruct volume specs from the volume directories it reads from disk.
ConstructVolumeSpec(volumeName, mountPath string) (*Spec, error)
}
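As context for the new interface method, here is a minimal, hedged sketch of how a reconstruction pass in the volume manager could consume ConstructVolumeSpec, assuming it lives in the same volume package; the directory layout and the reconstructVolume helper are assumptions for illustration, not the kubelet's actual reconstruction code.
// reconstructVolume (hypothetical helper): given a pod volume directory laid
// out as .../volumes/<pluginName>/<volumeName>, ask the matching plugin to
// rebuild a (possibly partial) Spec for that mount path.
func reconstructVolume(pluginMgr *VolumePluginMgr, pluginName, volumeName, mountPath string) (*Spec, error) {
	plugin, err := pluginMgr.FindPluginByName(pluginName)
	if err != nil {
		return nil, err
	}
	spec, err := plugin.ConstructVolumeSpec(volumeName, mountPath)
	if err != nil {
		return nil, err
	}
	// The spec may be incomplete; the caller only needs enough information to
	// register the volume so it can later be unmounted and cleaned up.
	return spec, nil
}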
// PersistentVolumePlugin is an extended interface of VolumePlugin and is used
@ -151,6 +157,7 @@ type AttachableVolumePlugin interface {
VolumePlugin
NewAttacher() (Attacher, error)
NewDetacher() (Detacher, error)
GetDeviceMountRefs(deviceMountPath string) ([]string, error)
}
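The new GetDeviceMountRefs method lets the unmount-device path further down in this change verify, before tearing down a device's global mount point, that no other mount still references the device; if references remain, the operation fails rather than unmounting a device that is still in use.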
// VolumeHost is an interface that plugins can use to access the kubelet.

View File

@ -165,6 +165,18 @@ func (plugin *rbdPlugin) newUnmounterInternal(volName string, podUID types.UID,
}, nil
}
func (plugin *rbdPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
rbdVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
RBD: &api.RBDVolumeSource{
CephMonitors: []string{},
},
},
}
return volume.NewSpecFromVolume(rbdVolume), nil
}
type rbd struct {
volName string
podUID types.UID

View File

@ -110,6 +110,18 @@ func (plugin *secretPlugin) NewUnmounter(volName string, podUID types.UID) (volu
}, nil
}
func (plugin *secretPlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
secretVolume := &api.Volume{
Name: volName,
VolumeSource: api.VolumeSource{
Secret: &api.SecretVolumeSource{
SecretName: volName,
},
},
}
return volume.NewSpecFromVolume(secretVolume), nil
}
type secretVolume struct {
volName string
podUID types.UID

View File

@ -288,6 +288,14 @@ func (plugin *FakeVolumePlugin) GetAccessModes() []api.PersistentVolumeAccessMod
return []api.PersistentVolumeAccessMode{}
}
func (plugin *FakeVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*Spec, error) {
return nil, nil
}
func (plugin *FakeVolumePlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
return []string{}, nil
}
type FakeVolume struct {
sync.RWMutex
PodUID types.UID

View File

@ -58,6 +58,10 @@ type NestedPendingOperations interface {
// necessary during tests - the test should wait until all operations finish
// and evaluate results after that.
Wait()
// IsOperationPending returns true if an operation for the given volumeName and podName is pending,
// otherwise it returns false
IsOperationPending(volumeName api.UniqueVolumeName, podName types.UniquePodName) bool
}
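A hedged usage sketch of the new query: a caller such as a reconstruction or reconcile loop can check for in-flight work on a volume before acting on it. The skipIfPending helper and its call pattern are assumptions for illustration.
// skipIfPending (hypothetical helper): report whether work on this volume
// should be deferred because an operation for it is still pending.
func skipIfPending(ops NestedPendingOperations, volumeName api.UniqueVolumeName, podName types.UniquePodName) bool {
	if ops.IsOperationPending(volumeName, podName) {
		// An operation is already in flight; let it finish before acting on
		// reconstructed or newly observed state.
		return true
	}
	return false
}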
// NewNestedPendingOperations returns a new instance of NestedPendingOperations.
@ -74,7 +78,7 @@ type nestedPendingOperations struct {
operations []operation
exponentialBackOffOnError bool
cond *sync.Cond
lock sync.Mutex
lock sync.RWMutex
}
type operation struct {
@ -90,29 +94,9 @@ func (grm *nestedPendingOperations) Run(
operationFunc func() error) error {
grm.lock.Lock()
defer grm.lock.Unlock()
var previousOp operation
opExists := false
previousOpIndex := -1
for previousOpIndex, previousOp = range grm.operations {
if previousOp.volumeName != volumeName {
// No match, keep searching
continue
}
if previousOp.podName != emptyUniquePodName &&
podName != emptyUniquePodName &&
previousOp.podName != podName {
// No match, keep searching
continue
}
// Match
opExists = true
break
}
opExists, previousOpIndex := grm.isOperationExists(volumeName, podName)
if opExists {
previousOp := grm.operations[previousOpIndex]
// Operation already exists
if previousOp.operationPending {
// Operation is pending
@ -153,6 +137,43 @@ func (grm *nestedPendingOperations) Run(
return nil
}
func (grm *nestedPendingOperations) IsOperationPending(
volumeName api.UniqueVolumeName,
podName types.UniquePodName) bool {
grm.lock.RLock()
defer grm.lock.RUnlock()
exist, previousOpIndex := grm.isOperationExists(volumeName, podName)
if exist && grm.operations[previousOpIndex].operationPending {
return true
}
return false
}
func (grm *nestedPendingOperations) isOperationExists(
volumeName api.UniqueVolumeName,
podName types.UniquePodName) (bool, int) {
for previousOpIndex, previousOp := range grm.operations {
if previousOp.volumeName != volumeName {
// No match, keep searching
continue
}
if previousOp.podName != emptyUniquePodName &&
podName != emptyUniquePodName &&
previousOp.podName != podName {
// No match, keep searching
continue
}
// Match
return true, previousOpIndex
}
return false, -1
}
func (grm *nestedPendingOperations) getOperation(
volumeName api.UniqueVolumeName,
podName types.UniquePodName) (uint, error) {

View File

@ -99,6 +99,10 @@ type OperationExecutor interface {
// object, for example) then an error is returned which triggers exponential
// back off on retries.
VerifyControllerAttachedVolume(volumeToMount VolumeToMount, nodeName string, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
// IsOperationPending returns true if an operation for the given volumeName and podName is pending,
// otherwise it returns false
IsOperationPending(volumeName api.UniqueVolumeName, podName volumetypes.UniquePodName) bool
}
// NewOperationExecutor returns a new instance of OperationExecutor.
@ -339,6 +343,10 @@ type operationExecutor struct {
pendingOperations nestedpendingoperations.NestedPendingOperations
}
func (oe *operationExecutor) IsOperationPending(volumeName api.UniqueVolumeName, podName volumetypes.UniquePodName) bool {
return oe.pendingOperations.IsOperationPending(volumeName, podName)
}
func (oe *operationExecutor) AttachVolume(
volumeToAttach VolumeToAttach,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
@ -391,6 +399,7 @@ func (oe *operationExecutor) MountVolume(
func (oe *operationExecutor) UnmountVolume(
volumeToUnmount MountedVolume,
actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
unmountFunc, err :=
oe.generateUnmountVolumeFunc(volumeToUnmount, actualStateOfWorld)
if err != nil {
@ -811,11 +820,14 @@ func (oe *operationExecutor) generateUnmountVolumeFunc(
}
glog.Infof(
"UnmountVolume.TearDown succeeded for volume %q (volume.spec.Name: %q) pod %q (UID: %q).",
"UnmountVolume.TearDown succeeded for volume %q (OuterVolumeSpecName: %q) pod %q (UID: %q). InnerVolumeSpecName %q. PluginName %q, VolumeGidValue %q",
volumeToUnmount.VolumeName,
volumeToUnmount.OuterVolumeSpecName,
volumeToUnmount.PodName,
volumeToUnmount.PodUID)
volumeToUnmount.PodUID,
volumeToUnmount.InnerVolumeSpecName,
volumeToUnmount.PluginName,
volumeToUnmount.VolumeGidValue)
// Update actual state of world
markVolMountedErr := actualStateOfWorld.MarkVolumeAsUnmounted(
@ -879,7 +891,17 @@ func (oe *operationExecutor) generateUnmountDeviceFunc(
deviceToDetach.VolumeSpec.Name(),
err)
}
refs, err := attachableVolumePlugin.GetDeviceMountRefs(deviceMountPath)
if err != nil || len(refs) > 0 {
if err == nil {
err = fmt.Errorf("The device mount path %q is still mounted by other references %v", deviceMountPath, refs)
}
return fmt.Errorf(
"GetDeviceMountRefs check failed for volume %q (spec.Name: %q) with: %v",
deviceToDetach.VolumeName,
deviceToDetach.VolumeSpec.Name(),
err)
}
// Execute unmount
unmountDeviceErr := volumeDetacher.UnmountDevice(deviceMountPath)
if unmountDeviceErr != nil {

View File

@ -54,8 +54,10 @@ func GetUniqueVolumeName(pluginName, volumeName string) api.UniqueVolumeName {
// GetUniqueVolumeNameForNonAttachableVolume returns the unique volume name
// for a non-attachable volume.
func GetUniqueVolumeNameForNonAttachableVolume(podName types.UniquePodName, volumePlugin volume.VolumePlugin, podSpecName string) api.UniqueVolumeName {
return api.UniqueVolumeName(fmt.Sprintf("%s/%v-%s", volumePlugin.GetPluginName(), podName, podSpecName))
func GetUniqueVolumeNameForNonAttachableVolume(
podName types.UniquePodName, volumePlugin volume.VolumePlugin, volumeSpec *volume.Spec) api.UniqueVolumeName {
return api.UniqueVolumeName(
fmt.Sprintf("%s/%v-%s", volumePlugin.GetPluginName(), podName, volumeSpec.Name()))
}
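For a concrete sense of the resulting name format, a hedged example; the plugin instance, pod name, and volume name below are made up.
// Assuming secretPlugin.GetPluginName() returns "kubernetes.io/secret":
name := GetUniqueVolumeNameForNonAttachableVolume(
	types.UniquePodName("1234-abcd"), // pod name (pod UID) — hypothetical
	secretPlugin,                     // hypothetical plugin instance
	volume.NewSpecFromVolume(&api.Volume{Name: "default-token"}),
)
// name == "kubernetes.io/secret/1234-abcd-default-token"
// i.e. <pluginName>/<podName>-<specName>, so the same pod/volume pair maps to
// the same key across kubelet restarts.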
// GetUniqueVolumeNameFromSpec uses the given VolumePlugin to generate a unique

View File

@ -135,6 +135,18 @@ func (plugin *vsphereVolumePlugin) getCloudProvider() (*vsphere.VSphere, error)
return vs, nil
}
func (plugin *vsphereVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
vsphereVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
VsphereVolume: &api.VsphereVirtualDiskVolumeSource{
VolumePath: volumeName,
},
},
}
return volume.NewSpecFromVolume(vsphereVolume), nil
}
// Abstract interface to disk operations.
type vdManager interface {
// Attaches the disk to the kubelet's host machine.