From 71e7dba84525761db1436cf2c7daaffb2a730d27 Mon Sep 17 00:00:00 2001 From: Sami Wagiaalla Date: Fri, 29 Apr 2016 15:29:59 -0400 Subject: [PATCH] Abstract node side functionality of attachable plugins - Expand Attacher/Detacher interfaces to break up work more explicitly. - Add arguments to all functions to avoid having implementers store the data needed for operations. - Expand unit tests to check that Attach, Detach, WaitForAttach, WaitForDetach, MountDevice, and UnmountDevice get call where appropriet. --- pkg/kubelet/kubelet.go | 21 +++++- pkg/kubelet/kubelet_test.go | 52 +++++++++++++++ pkg/kubelet/volumes.go | 21 ++++-- pkg/volume/testing/testing.go | 73 +++++++++++++++++++-- pkg/volume/volume.go | 38 ++++++++++- test/integration/persistent_volumes_test.go | 2 +- 6 files changed, 190 insertions(+), 17 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 91f8665ce9..e97c1bc717 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -141,6 +141,9 @@ const ( ContainerGCPeriod = time.Minute // Period for performing image garbage collection. ImageGCPeriod = 5 * time.Minute + + // Maximum period to wait for pod volume setup operations + maxWaitForVolumeOps = 20 * time.Minute ) // SyncHandler is an interface implemented by Kubelet, for testability @@ -2108,11 +2111,27 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco // volume is unmounted. some volumes also require detachment from the node. if cleaner.Detacher != nil && len(refs) == 1 { + detacher := *cleaner.Detacher - err = detacher.Detach() + devicePath, _, err := mount.GetDeviceNameFromMount(kl.mounter, refs[0]) + if err != nil { + glog.Errorf("Could not find device path %v", err) + } + + if err = detacher.UnmountDevice(refs[0], kl.mounter); err != nil { + glog.Errorf("Could not unmount the global mount for %q: %v", name, err) + } + + err = detacher.Detach(refs[0], kl.hostname) if err != nil { glog.Errorf("Could not detach volume %q: %v", name, err) } + + // TODO(swagiaal): This will block until the sync loop until device is attached + // so all of this should be moved to a mount/unmount manager which does it asynchronously + if err = detacher.WaitForDetach(devicePath, maxWaitForVolumeOps); err != nil { + glog.Errorf("Error while waiting for detach: %v", err) + } } } } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index de92dd462c..b3306194b3 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -522,6 +522,17 @@ func TestMountExternalVolumes(t *testing.T) { if plug.NewAttacherCallCount != 1 { t.Errorf("Expected plugin NewAttacher to be called %d times but got %d", 1, plug.NewAttacherCallCount) } + + attacher := plug.Attachers[0] + if attacher.AttachCallCount != 1 { + t.Errorf("Expected Attach to be called") + } + if attacher.WaitForAttachCallCount != 1 { + t.Errorf("Expected WaitForAttach to be called") + } + if attacher.MountDeviceCallCount != 1 { + t.Errorf("Expected MountDevice to be called") + } } func TestGetPodVolumesFromDisk(t *testing.T) { @@ -589,6 +600,11 @@ func TestCleanupOrphanedVolumes(t *testing.T) { fv := volumetest.FakeVolume{PodUID: volsOnDisk[i].podUID, VolName: volsOnDisk[i].volName, Plugin: plug} fv.SetUp(nil) pathsOnDisk = append(pathsOnDisk, fv.GetPath()) + + // Simulate the global mount so that the fakeMounter returns the + // expected number of refs for the attached disk. + kubelet.mounter.Mount(fv.GetPath(), fv.GetPath(), "fakefs", nil) + kubelet.mounter.Mount(fv.GetPath(), "/path/fake/device", "fake", nil) } // store the claim in fake kubelet database @@ -637,6 +653,14 @@ func TestCleanupOrphanedVolumes(t *testing.T) { t.Errorf("cleanupOrphanedVolumes failed: %v", err) } + if len(plug.Unmounters) != len(volsOnDisk) { + t.Errorf("Unexpected number of unmounters created. Expected %d got %d", len(volsOnDisk), len(plug.Unmounters)) + } + for _, unmounter := range plug.Unmounters { + if unmounter.TearDownCallCount != 0 { + t.Errorf("Unexpected number of calls to TearDown() %d for volume %v", unmounter.TearDownCallCount, unmounter) + } + } volumesFound := kubelet.getPodVolumesFromDisk() if len(volumesFound) != len(pathsOnDisk) { t.Errorf("Expected to find %d unmounters, got %d", len(pathsOnDisk), len(volumesFound)) @@ -666,6 +690,34 @@ func TestCleanupOrphanedVolumes(t *testing.T) { for _, cl := range volumesFound { t.Errorf("Found unexpected volume %s", cl.Unmounter.GetPath()) } + + // Two unmounters created by the previous calls to cleanupOrphanedVolumes and getPodVolumesFromDisk + expectedUnmounters := len(volsOnDisk) + 2 + if len(plug.Unmounters) != expectedUnmounters { + t.Errorf("Unexpected number of unmounters created. Expected %d got %d", expectedUnmounters, len(plug.Unmounters)) + } + + // This is the unmounter which was actually used to perform a tear down. + unmounter := plug.Unmounters[2] + + if unmounter.TearDownCallCount != 1 { + t.Errorf("Unexpected number of calls to TearDown() %d for volume %v", unmounter.TearDownCallCount, unmounter) + } + + if plug.NewDetacherCallCount != expectedUnmounters { + t.Errorf("Expected plugin NewDetacher to be called %d times but got %d", expectedUnmounters, plug.NewDetacherCallCount) + } + + detacher := plug.Detachers[2] + if detacher.DetachCallCount != 1 { + t.Errorf("Expected Detach to be called") + } + if detacher.WaitForDetachCallCount != 1 { + t.Errorf("Expected WaitForDetach to be called") + } + if detacher.UnmountDeviceCallCount != 1 { + t.Errorf("Expected UnmountDevice to be called") + } } type stubVolume struct { diff --git a/pkg/kubelet/volumes.go b/pkg/kubelet/volumes.go index e8c2f38d08..fe73d5f7c0 100644 --- a/pkg/kubelet/volumes.go +++ b/pkg/kubelet/volumes.go @@ -116,7 +116,6 @@ func (vh *volumeHost) GetHostName() string { func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (kubecontainer.VolumeMap, error) { podVolumes := make(kubecontainer.VolumeMap) for i := range pod.Spec.Volumes { - volSpec := &pod.Spec.Volumes[i] var fsGroup *int64 if pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.FSGroup != nil { fsGroup = pod.Spec.SecurityContext.FSGroup @@ -128,8 +127,8 @@ func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (kubecontainer.VolumeMap, } // Try to use a plugin for this volume. - internal := volume.NewSpecFromVolume(volSpec) - mounter, err := kl.newVolumeMounterFromPlugins(internal, pod, volume.VolumeOptions{RootContext: rootContext}) + volSpec := volume.NewSpecFromVolume(&pod.Spec.Volumes[i]) + mounter, err := kl.newVolumeMounterFromPlugins(volSpec, pod, volume.VolumeOptions{RootContext: rootContext}) if err != nil { glog.Errorf("Could not create volume mounter for pod %s: %v", pod.UID, err) return nil, err @@ -138,23 +137,33 @@ func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (kubecontainer.VolumeMap, // some volumes require attachment before mounter's setup. // The plugin can be nil, but non-nil errors are legitimate errors. // For non-nil plugins, Attachment to a node is required before Mounter's setup. - attacher, err := kl.newVolumeAttacherFromPlugins(internal, pod, volume.VolumeOptions{RootContext: rootContext}) + attacher, err := kl.newVolumeAttacherFromPlugins(volSpec, pod, volume.VolumeOptions{RootContext: rootContext}) if err != nil { glog.Errorf("Could not create volume attacher for pod %s: %v", pod.UID, err) return nil, err } if attacher != nil { - err = attacher.Attach() + err = attacher.Attach(volSpec, kl.hostname) if err != nil { return nil, err } + + devicePath, err := attacher.WaitForAttach(volSpec, maxWaitForVolumeOps) + if err != nil { + return nil, err + } + + deviceMountPath := attacher.GetDeviceMountPath(volSpec) + if err = attacher.MountDevice(devicePath, deviceMountPath, kl.mounter); err != nil { + return nil, err + } } err = mounter.SetUp(fsGroup) if err != nil { return nil, err } - podVolumes[volSpec.Name] = kubecontainer.VolumeInfo{Mounter: mounter} + podVolumes[volSpec.Volume.Name] = kubecontainer.VolumeInfo{Mounter: mounter} } return podVolumes, nil } diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 2361aa59e3..e7ecb96f35 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -23,6 +23,7 @@ import ( "os/exec" "path" "strings" + "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" @@ -137,6 +138,11 @@ type FakeVolumePlugin struct { LastProvisionerOptions VolumeOptions NewAttacherCallCount int NewDetacherCallCount int + + Mounters []*FakeVolume + Unmounters []*FakeVolume + Attachers []*FakeVolume + Detachers []*FakeVolume } var _ VolumePlugin = &FakeVolumePlugin{} @@ -145,6 +151,12 @@ var _ DeletableVolumePlugin = &FakeVolumePlugin{} var _ ProvisionableVolumePlugin = &FakeVolumePlugin{} var _ AttachableVolumePlugin = &FakeVolumePlugin{} +func (plugin *FakeVolumePlugin) getFakeVolume(list *[]*FakeVolume) *FakeVolume { + volume := &FakeVolume{} + *list = append(*list, volume) + return volume +} + func (plugin *FakeVolumePlugin) Init(host VolumeHost) error { plugin.Host = host return nil @@ -160,21 +172,31 @@ func (plugin *FakeVolumePlugin) CanSupport(spec *Spec) bool { } func (plugin *FakeVolumePlugin) NewMounter(spec *Spec, pod *api.Pod, opts VolumeOptions) (Mounter, error) { - return &FakeVolume{pod.UID, spec.Name(), plugin, MetricsNil{}}, nil + volume := plugin.getFakeVolume(&plugin.Mounters) + volume.PodUID = pod.UID + volume.VolName = spec.Name() + volume.Plugin = plugin + volume.MetricsNil = MetricsNil{} + return volume, nil } func (plugin *FakeVolumePlugin) NewUnmounter(volName string, podUID types.UID) (Unmounter, error) { - return &FakeVolume{podUID, volName, plugin, MetricsNil{}}, nil + volume := plugin.getFakeVolume(&plugin.Unmounters) + volume.PodUID = podUID + volume.VolName = volName + volume.Plugin = plugin + volume.MetricsNil = MetricsNil{} + return volume, nil } func (plugin *FakeVolumePlugin) NewAttacher(spec *Spec) (Attacher, error) { plugin.NewAttacherCallCount = plugin.NewAttacherCallCount + 1 - return &FakeVolume{}, nil + return plugin.getFakeVolume(&plugin.Attachers), nil } func (plugin *FakeVolumePlugin) NewDetacher(name string, podUID types.UID) (Detacher, error) { plugin.NewDetacherCallCount = plugin.NewDetacherCallCount + 1 - return &FakeVolume{}, nil + return plugin.getFakeVolume(&plugin.Detachers), nil } func (plugin *FakeVolumePlugin) NewRecycler(spec *Spec) (Recycler, error) { @@ -199,6 +221,16 @@ type FakeVolume struct { VolName string Plugin *FakeVolumePlugin MetricsNil + + SetUpCallCount int + TearDownCallCount int + AttachCallCount int + DetachCallCount int + WaitForAttachCallCount int + WaitForDetachCallCount int + MountDeviceCallCount int + UnmountDeviceCallCount int + GetDeviceMountPathCallCount int } func (_ *FakeVolume) GetAttributes() Attributes { @@ -210,6 +242,7 @@ func (_ *FakeVolume) GetAttributes() Attributes { } func (fv *FakeVolume) SetUp(fsGroup *int64) error { + fv.SetUpCallCount++ return fv.SetUpAt(fv.GetPath(), fsGroup) } @@ -222,6 +255,7 @@ func (fv *FakeVolume) GetPath() string { } func (fv *FakeVolume) TearDown() error { + fv.TearDownCallCount++ return fv.TearDownAt(fv.GetPath()) } @@ -229,11 +263,38 @@ func (fv *FakeVolume) TearDownAt(dir string) error { return os.RemoveAll(dir) } -func (fv *FakeVolume) Attach() error { +func (fv *FakeVolume) Attach(spec *Spec, hostName string) error { + fv.AttachCallCount++ return nil } -func (fv *FakeVolume) Detach() error { +func (fv *FakeVolume) WaitForAttach(spec *Spec, spectimeout time.Duration) (string, error) { + fv.WaitForAttachCallCount++ + return "", nil +} + +func (fv *FakeVolume) GetDeviceMountPath(spec *Spec) string { + fv.GetDeviceMountPathCallCount++ + return "" +} + +func (fv *FakeVolume) MountDevice(devicePath string, deviceMountPath string, mounter mount.Interface) error { + fv.MountDeviceCallCount++ + return nil +} + +func (fv *FakeVolume) Detach(deviceMountPath string, hostName string) error { + fv.DetachCallCount++ + return nil +} + +func (fv *FakeVolume) WaitForDetach(devicePath string, timeout time.Duration) error { + fv.WaitForDetachCallCount++ + return nil +} + +func (fv *FakeVolume) UnmountDevice(globalMountPath string, mounter mount.Interface) error { + fv.UnmountDeviceCallCount++ return nil } diff --git a/pkg/volume/volume.go b/pkg/volume/volume.go index 3444127bb3..88a4e6d871 100644 --- a/pkg/volume/volume.go +++ b/pkg/volume/volume.go @@ -20,15 +20,18 @@ import ( "io/ioutil" "os" "path" + "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/util/mount" ) // Volume represents a directory used by pods or hosts on a node. // All method implementations of methods in the volume interface must be idempotent. type Volume interface { - // GetPath returns the directory path the volume is mounted to. + // GetPath returns the path to which the volume should be + // mounted for the pod. GetPath() string // MetricsProvider embeds methods for exposing metrics (e.g. used,available space). @@ -129,12 +132,41 @@ type Deleter interface { // Attacher can attach a volume to a node. type Attacher interface { Volume - Attach() error + + // Attach the volume specified by the given spec to the given host + Attach(spec *Spec, hostName string) error + + // WaitForAttach blocks until the device is attached to this + // node. If it successfully attaches, the path to the device + // is returned. Otherwise, if the device does not attach after + // the given timeout period, an error will be returned. + WaitForAttach(spec *Spec, timeout time.Duration) (string, error) + + // GetDeviceMountPath returns a path where the device should + // be mounted after it is attached. This is a global mount + // point which should be bind mounted for individual volumes. + GetDeviceMountPath(spec *Spec) string + + // MountDevice mounts the disk to a global path which + // individual pods can then bind mount + MountDevice(devicePath string, deviceMountPath string, mounter mount.Interface) error } // Detacher can detach a volume from a node. type Detacher interface { - Detach() error + + // Detach the given volume from the given host. + Detach(deviceMountPath string, hostName string) error + + // WaitForDetach blocks until the device is detached from this + // node. If the device does not detach within the given timout + // period an error is returned. + WaitForDetach(devicePath string, timout time.Duration) error + + // UnmountDevice unmounts the global mount of the disk. This + // should only be called once all bind mounts have been + // unmounted. + UnmountDevice(globalMountPath string, mounter mount.Interface) error } func RenameDirectory(oldPath, newName string) (string, error) { diff --git a/test/integration/persistent_volumes_test.go b/test/integration/persistent_volumes_test.go index aec4c8046d..82206a4259 100644 --- a/test/integration/persistent_volumes_test.go +++ b/test/integration/persistent_volumes_test.go @@ -54,7 +54,7 @@ func TestPersistentVolumeRecycler(t *testing.T) { testClient := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, QPS: 1000, Burst: 100000}) host := volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil) - plugins := []volume.VolumePlugin{&volumetest.FakeVolumePlugin{"plugin-name", host, volume.VolumeConfig{}, volume.VolumeOptions{}, 0, 0}} + plugins := []volume.VolumePlugin{&volumetest.FakeVolumePlugin{"plugin-name", host, volume.VolumeConfig{}, volume.VolumeOptions{}, 0, 0, nil, nil, nil, nil}} cloud := &fake_cloud.FakeCloud{} binder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(binderClient, 10*time.Second)