mirror of https://github.com/k3s-io/k3s
Abstract node side functionality of attachable plugins
- Expand Attacher/Detacher interfaces to break up work more explicitly. - Add arguments to all functions to avoid having implementers store the data needed for operations. - Expand unit tests to check that Attach, Detach, WaitForAttach, WaitForDetach, MountDevice, and UnmountDevice get call where appropriet.pull/6/head
parent
ad86986c87
commit
71e7dba845
|
@ -141,6 +141,9 @@ const (
|
|||
ContainerGCPeriod = time.Minute
|
||||
// Period for performing image garbage collection.
|
||||
ImageGCPeriod = 5 * time.Minute
|
||||
|
||||
// Maximum period to wait for pod volume setup operations
|
||||
maxWaitForVolumeOps = 20 * time.Minute
|
||||
)
|
||||
|
||||
// SyncHandler is an interface implemented by Kubelet, for testability
|
||||
|
@ -2108,11 +2111,27 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco
|
|||
|
||||
// volume is unmounted. some volumes also require detachment from the node.
|
||||
if cleaner.Detacher != nil && len(refs) == 1 {
|
||||
|
||||
detacher := *cleaner.Detacher
|
||||
err = detacher.Detach()
|
||||
devicePath, _, err := mount.GetDeviceNameFromMount(kl.mounter, refs[0])
|
||||
if err != nil {
|
||||
glog.Errorf("Could not find device path %v", err)
|
||||
}
|
||||
|
||||
if err = detacher.UnmountDevice(refs[0], kl.mounter); err != nil {
|
||||
glog.Errorf("Could not unmount the global mount for %q: %v", name, err)
|
||||
}
|
||||
|
||||
err = detacher.Detach(refs[0], kl.hostname)
|
||||
if err != nil {
|
||||
glog.Errorf("Could not detach volume %q: %v", name, err)
|
||||
}
|
||||
|
||||
// TODO(swagiaal): This will block until the sync loop until device is attached
|
||||
// so all of this should be moved to a mount/unmount manager which does it asynchronously
|
||||
if err = detacher.WaitForDetach(devicePath, maxWaitForVolumeOps); err != nil {
|
||||
glog.Errorf("Error while waiting for detach: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -522,6 +522,17 @@ func TestMountExternalVolumes(t *testing.T) {
|
|||
if plug.NewAttacherCallCount != 1 {
|
||||
t.Errorf("Expected plugin NewAttacher to be called %d times but got %d", 1, plug.NewAttacherCallCount)
|
||||
}
|
||||
|
||||
attacher := plug.Attachers[0]
|
||||
if attacher.AttachCallCount != 1 {
|
||||
t.Errorf("Expected Attach to be called")
|
||||
}
|
||||
if attacher.WaitForAttachCallCount != 1 {
|
||||
t.Errorf("Expected WaitForAttach to be called")
|
||||
}
|
||||
if attacher.MountDeviceCallCount != 1 {
|
||||
t.Errorf("Expected MountDevice to be called")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetPodVolumesFromDisk(t *testing.T) {
|
||||
|
@ -589,6 +600,11 @@ func TestCleanupOrphanedVolumes(t *testing.T) {
|
|||
fv := volumetest.FakeVolume{PodUID: volsOnDisk[i].podUID, VolName: volsOnDisk[i].volName, Plugin: plug}
|
||||
fv.SetUp(nil)
|
||||
pathsOnDisk = append(pathsOnDisk, fv.GetPath())
|
||||
|
||||
// Simulate the global mount so that the fakeMounter returns the
|
||||
// expected number of refs for the attached disk.
|
||||
kubelet.mounter.Mount(fv.GetPath(), fv.GetPath(), "fakefs", nil)
|
||||
kubelet.mounter.Mount(fv.GetPath(), "/path/fake/device", "fake", nil)
|
||||
}
|
||||
|
||||
// store the claim in fake kubelet database
|
||||
|
@ -637,6 +653,14 @@ func TestCleanupOrphanedVolumes(t *testing.T) {
|
|||
t.Errorf("cleanupOrphanedVolumes failed: %v", err)
|
||||
}
|
||||
|
||||
if len(plug.Unmounters) != len(volsOnDisk) {
|
||||
t.Errorf("Unexpected number of unmounters created. Expected %d got %d", len(volsOnDisk), len(plug.Unmounters))
|
||||
}
|
||||
for _, unmounter := range plug.Unmounters {
|
||||
if unmounter.TearDownCallCount != 0 {
|
||||
t.Errorf("Unexpected number of calls to TearDown() %d for volume %v", unmounter.TearDownCallCount, unmounter)
|
||||
}
|
||||
}
|
||||
volumesFound := kubelet.getPodVolumesFromDisk()
|
||||
if len(volumesFound) != len(pathsOnDisk) {
|
||||
t.Errorf("Expected to find %d unmounters, got %d", len(pathsOnDisk), len(volumesFound))
|
||||
|
@ -666,6 +690,34 @@ func TestCleanupOrphanedVolumes(t *testing.T) {
|
|||
for _, cl := range volumesFound {
|
||||
t.Errorf("Found unexpected volume %s", cl.Unmounter.GetPath())
|
||||
}
|
||||
|
||||
// Two unmounters created by the previous calls to cleanupOrphanedVolumes and getPodVolumesFromDisk
|
||||
expectedUnmounters := len(volsOnDisk) + 2
|
||||
if len(plug.Unmounters) != expectedUnmounters {
|
||||
t.Errorf("Unexpected number of unmounters created. Expected %d got %d", expectedUnmounters, len(plug.Unmounters))
|
||||
}
|
||||
|
||||
// This is the unmounter which was actually used to perform a tear down.
|
||||
unmounter := plug.Unmounters[2]
|
||||
|
||||
if unmounter.TearDownCallCount != 1 {
|
||||
t.Errorf("Unexpected number of calls to TearDown() %d for volume %v", unmounter.TearDownCallCount, unmounter)
|
||||
}
|
||||
|
||||
if plug.NewDetacherCallCount != expectedUnmounters {
|
||||
t.Errorf("Expected plugin NewDetacher to be called %d times but got %d", expectedUnmounters, plug.NewDetacherCallCount)
|
||||
}
|
||||
|
||||
detacher := plug.Detachers[2]
|
||||
if detacher.DetachCallCount != 1 {
|
||||
t.Errorf("Expected Detach to be called")
|
||||
}
|
||||
if detacher.WaitForDetachCallCount != 1 {
|
||||
t.Errorf("Expected WaitForDetach to be called")
|
||||
}
|
||||
if detacher.UnmountDeviceCallCount != 1 {
|
||||
t.Errorf("Expected UnmountDevice to be called")
|
||||
}
|
||||
}
|
||||
|
||||
type stubVolume struct {
|
||||
|
|
|
@ -116,7 +116,6 @@ func (vh *volumeHost) GetHostName() string {
|
|||
func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (kubecontainer.VolumeMap, error) {
|
||||
podVolumes := make(kubecontainer.VolumeMap)
|
||||
for i := range pod.Spec.Volumes {
|
||||
volSpec := &pod.Spec.Volumes[i]
|
||||
var fsGroup *int64
|
||||
if pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.FSGroup != nil {
|
||||
fsGroup = pod.Spec.SecurityContext.FSGroup
|
||||
|
@ -128,8 +127,8 @@ func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (kubecontainer.VolumeMap,
|
|||
}
|
||||
|
||||
// Try to use a plugin for this volume.
|
||||
internal := volume.NewSpecFromVolume(volSpec)
|
||||
mounter, err := kl.newVolumeMounterFromPlugins(internal, pod, volume.VolumeOptions{RootContext: rootContext})
|
||||
volSpec := volume.NewSpecFromVolume(&pod.Spec.Volumes[i])
|
||||
mounter, err := kl.newVolumeMounterFromPlugins(volSpec, pod, volume.VolumeOptions{RootContext: rootContext})
|
||||
if err != nil {
|
||||
glog.Errorf("Could not create volume mounter for pod %s: %v", pod.UID, err)
|
||||
return nil, err
|
||||
|
@ -138,23 +137,33 @@ func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (kubecontainer.VolumeMap,
|
|||
// some volumes require attachment before mounter's setup.
|
||||
// The plugin can be nil, but non-nil errors are legitimate errors.
|
||||
// For non-nil plugins, Attachment to a node is required before Mounter's setup.
|
||||
attacher, err := kl.newVolumeAttacherFromPlugins(internal, pod, volume.VolumeOptions{RootContext: rootContext})
|
||||
attacher, err := kl.newVolumeAttacherFromPlugins(volSpec, pod, volume.VolumeOptions{RootContext: rootContext})
|
||||
if err != nil {
|
||||
glog.Errorf("Could not create volume attacher for pod %s: %v", pod.UID, err)
|
||||
return nil, err
|
||||
}
|
||||
if attacher != nil {
|
||||
err = attacher.Attach()
|
||||
err = attacher.Attach(volSpec, kl.hostname)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
devicePath, err := attacher.WaitForAttach(volSpec, maxWaitForVolumeOps)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
deviceMountPath := attacher.GetDeviceMountPath(volSpec)
|
||||
if err = attacher.MountDevice(devicePath, deviceMountPath, kl.mounter); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
err = mounter.SetUp(fsGroup)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
podVolumes[volSpec.Name] = kubecontainer.VolumeInfo{Mounter: mounter}
|
||||
podVolumes[volSpec.Volume.Name] = kubecontainer.VolumeInfo{Mounter: mounter}
|
||||
}
|
||||
return podVolumes, nil
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"os/exec"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/resource"
|
||||
|
@ -137,6 +138,11 @@ type FakeVolumePlugin struct {
|
|||
LastProvisionerOptions VolumeOptions
|
||||
NewAttacherCallCount int
|
||||
NewDetacherCallCount int
|
||||
|
||||
Mounters []*FakeVolume
|
||||
Unmounters []*FakeVolume
|
||||
Attachers []*FakeVolume
|
||||
Detachers []*FakeVolume
|
||||
}
|
||||
|
||||
var _ VolumePlugin = &FakeVolumePlugin{}
|
||||
|
@ -145,6 +151,12 @@ var _ DeletableVolumePlugin = &FakeVolumePlugin{}
|
|||
var _ ProvisionableVolumePlugin = &FakeVolumePlugin{}
|
||||
var _ AttachableVolumePlugin = &FakeVolumePlugin{}
|
||||
|
||||
func (plugin *FakeVolumePlugin) getFakeVolume(list *[]*FakeVolume) *FakeVolume {
|
||||
volume := &FakeVolume{}
|
||||
*list = append(*list, volume)
|
||||
return volume
|
||||
}
|
||||
|
||||
func (plugin *FakeVolumePlugin) Init(host VolumeHost) error {
|
||||
plugin.Host = host
|
||||
return nil
|
||||
|
@ -160,21 +172,31 @@ func (plugin *FakeVolumePlugin) CanSupport(spec *Spec) bool {
|
|||
}
|
||||
|
||||
func (plugin *FakeVolumePlugin) NewMounter(spec *Spec, pod *api.Pod, opts VolumeOptions) (Mounter, error) {
|
||||
return &FakeVolume{pod.UID, spec.Name(), plugin, MetricsNil{}}, nil
|
||||
volume := plugin.getFakeVolume(&plugin.Mounters)
|
||||
volume.PodUID = pod.UID
|
||||
volume.VolName = spec.Name()
|
||||
volume.Plugin = plugin
|
||||
volume.MetricsNil = MetricsNil{}
|
||||
return volume, nil
|
||||
}
|
||||
|
||||
func (plugin *FakeVolumePlugin) NewUnmounter(volName string, podUID types.UID) (Unmounter, error) {
|
||||
return &FakeVolume{podUID, volName, plugin, MetricsNil{}}, nil
|
||||
volume := plugin.getFakeVolume(&plugin.Unmounters)
|
||||
volume.PodUID = podUID
|
||||
volume.VolName = volName
|
||||
volume.Plugin = plugin
|
||||
volume.MetricsNil = MetricsNil{}
|
||||
return volume, nil
|
||||
}
|
||||
|
||||
func (plugin *FakeVolumePlugin) NewAttacher(spec *Spec) (Attacher, error) {
|
||||
plugin.NewAttacherCallCount = plugin.NewAttacherCallCount + 1
|
||||
return &FakeVolume{}, nil
|
||||
return plugin.getFakeVolume(&plugin.Attachers), nil
|
||||
}
|
||||
|
||||
func (plugin *FakeVolumePlugin) NewDetacher(name string, podUID types.UID) (Detacher, error) {
|
||||
plugin.NewDetacherCallCount = plugin.NewDetacherCallCount + 1
|
||||
return &FakeVolume{}, nil
|
||||
return plugin.getFakeVolume(&plugin.Detachers), nil
|
||||
}
|
||||
|
||||
func (plugin *FakeVolumePlugin) NewRecycler(spec *Spec) (Recycler, error) {
|
||||
|
@ -199,6 +221,16 @@ type FakeVolume struct {
|
|||
VolName string
|
||||
Plugin *FakeVolumePlugin
|
||||
MetricsNil
|
||||
|
||||
SetUpCallCount int
|
||||
TearDownCallCount int
|
||||
AttachCallCount int
|
||||
DetachCallCount int
|
||||
WaitForAttachCallCount int
|
||||
WaitForDetachCallCount int
|
||||
MountDeviceCallCount int
|
||||
UnmountDeviceCallCount int
|
||||
GetDeviceMountPathCallCount int
|
||||
}
|
||||
|
||||
func (_ *FakeVolume) GetAttributes() Attributes {
|
||||
|
@ -210,6 +242,7 @@ func (_ *FakeVolume) GetAttributes() Attributes {
|
|||
}
|
||||
|
||||
func (fv *FakeVolume) SetUp(fsGroup *int64) error {
|
||||
fv.SetUpCallCount++
|
||||
return fv.SetUpAt(fv.GetPath(), fsGroup)
|
||||
}
|
||||
|
||||
|
@ -222,6 +255,7 @@ func (fv *FakeVolume) GetPath() string {
|
|||
}
|
||||
|
||||
func (fv *FakeVolume) TearDown() error {
|
||||
fv.TearDownCallCount++
|
||||
return fv.TearDownAt(fv.GetPath())
|
||||
}
|
||||
|
||||
|
@ -229,11 +263,38 @@ func (fv *FakeVolume) TearDownAt(dir string) error {
|
|||
return os.RemoveAll(dir)
|
||||
}
|
||||
|
||||
func (fv *FakeVolume) Attach() error {
|
||||
func (fv *FakeVolume) Attach(spec *Spec, hostName string) error {
|
||||
fv.AttachCallCount++
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fv *FakeVolume) Detach() error {
|
||||
func (fv *FakeVolume) WaitForAttach(spec *Spec, spectimeout time.Duration) (string, error) {
|
||||
fv.WaitForAttachCallCount++
|
||||
return "", nil
|
||||
}
|
||||
|
||||
func (fv *FakeVolume) GetDeviceMountPath(spec *Spec) string {
|
||||
fv.GetDeviceMountPathCallCount++
|
||||
return ""
|
||||
}
|
||||
|
||||
func (fv *FakeVolume) MountDevice(devicePath string, deviceMountPath string, mounter mount.Interface) error {
|
||||
fv.MountDeviceCallCount++
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fv *FakeVolume) Detach(deviceMountPath string, hostName string) error {
|
||||
fv.DetachCallCount++
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fv *FakeVolume) WaitForDetach(devicePath string, timeout time.Duration) error {
|
||||
fv.WaitForDetachCallCount++
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fv *FakeVolume) UnmountDevice(globalMountPath string, mounter mount.Interface) error {
|
||||
fv.UnmountDeviceCallCount++
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -20,15 +20,18 @@ import (
|
|||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/resource"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
)
|
||||
|
||||
// Volume represents a directory used by pods or hosts on a node.
|
||||
// All method implementations of methods in the volume interface must be idempotent.
|
||||
type Volume interface {
|
||||
// GetPath returns the directory path the volume is mounted to.
|
||||
// GetPath returns the path to which the volume should be
|
||||
// mounted for the pod.
|
||||
GetPath() string
|
||||
|
||||
// MetricsProvider embeds methods for exposing metrics (e.g. used,available space).
|
||||
|
@ -129,12 +132,41 @@ type Deleter interface {
|
|||
// Attacher can attach a volume to a node.
|
||||
type Attacher interface {
|
||||
Volume
|
||||
Attach() error
|
||||
|
||||
// Attach the volume specified by the given spec to the given host
|
||||
Attach(spec *Spec, hostName string) error
|
||||
|
||||
// WaitForAttach blocks until the device is attached to this
|
||||
// node. If it successfully attaches, the path to the device
|
||||
// is returned. Otherwise, if the device does not attach after
|
||||
// the given timeout period, an error will be returned.
|
||||
WaitForAttach(spec *Spec, timeout time.Duration) (string, error)
|
||||
|
||||
// GetDeviceMountPath returns a path where the device should
|
||||
// be mounted after it is attached. This is a global mount
|
||||
// point which should be bind mounted for individual volumes.
|
||||
GetDeviceMountPath(spec *Spec) string
|
||||
|
||||
// MountDevice mounts the disk to a global path which
|
||||
// individual pods can then bind mount
|
||||
MountDevice(devicePath string, deviceMountPath string, mounter mount.Interface) error
|
||||
}
|
||||
|
||||
// Detacher can detach a volume from a node.
|
||||
type Detacher interface {
|
||||
Detach() error
|
||||
|
||||
// Detach the given volume from the given host.
|
||||
Detach(deviceMountPath string, hostName string) error
|
||||
|
||||
// WaitForDetach blocks until the device is detached from this
|
||||
// node. If the device does not detach within the given timout
|
||||
// period an error is returned.
|
||||
WaitForDetach(devicePath string, timout time.Duration) error
|
||||
|
||||
// UnmountDevice unmounts the global mount of the disk. This
|
||||
// should only be called once all bind mounts have been
|
||||
// unmounted.
|
||||
UnmountDevice(globalMountPath string, mounter mount.Interface) error
|
||||
}
|
||||
|
||||
func RenameDirectory(oldPath, newName string) (string, error) {
|
||||
|
|
|
@ -54,7 +54,7 @@ func TestPersistentVolumeRecycler(t *testing.T) {
|
|||
testClient := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, QPS: 1000, Burst: 100000})
|
||||
host := volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil)
|
||||
|
||||
plugins := []volume.VolumePlugin{&volumetest.FakeVolumePlugin{"plugin-name", host, volume.VolumeConfig{}, volume.VolumeOptions{}, 0, 0}}
|
||||
plugins := []volume.VolumePlugin{&volumetest.FakeVolumePlugin{"plugin-name", host, volume.VolumeConfig{}, volume.VolumeOptions{}, 0, 0, nil, nil, nil, nil}}
|
||||
cloud := &fake_cloud.FakeCloud{}
|
||||
|
||||
binder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(binderClient, 10*time.Second)
|
||||
|
|
Loading…
Reference in New Issue