Merge pull request #19503 from markturansky/attacher_interface

Auto commit by PR queue bot
pull/6/head
k8s-merge-robot 2016-02-09 20:50:41 -08:00
commit c6ed624bfb
7 changed files with 187 additions and 31 deletions

View File

@ -1887,7 +1887,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco
runningSet.Insert(string(pod.ID))
}
for name, vol := range currentVolumes {
for name, cleanerTuple := range currentVolumes {
if _, ok := desiredVolumes[name]; !ok {
parts := strings.Split(name, "/")
if runningSet.Has(parts[0]) {
@ -1900,10 +1900,19 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco
// TODO(yifan): Refactor this hacky string manipulation.
kl.volumeManager.DeleteVolumes(types.UID(parts[0]))
//TODO (jonesdl) This should not block other kubelet synchronization procedures
err := vol.TearDown()
err := cleanerTuple.Cleaner.TearDown()
if err != nil {
glog.Errorf("Could not tear down volume %q: %v", name, err)
}
// volume is unmounted. some volumes also require detachment from the node.
if cleanerTuple.Detacher != nil {
detacher := *cleanerTuple.Detacher
err = detacher.Detach()
if err != nil {
glog.Errorf("Could not detach volume %q: %v", name, err)
}
}
}
}
return nil

View File

@ -480,7 +480,8 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
func TestMountExternalVolumes(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
kubelet.volumePluginMgr.InitPlugins([]volume.VolumePlugin{&volume.FakeVolumePlugin{PluginName: "fake", Host: nil}}, &volumeHost{kubelet})
plug := &volume.FakeVolumePlugin{PluginName: "fake", Host: nil}
kubelet.volumePluginMgr.InitPlugins([]volume.VolumePlugin{plug}, &volumeHost{kubelet})
pod := api.Pod{
ObjectMeta: api.ObjectMeta{
@ -510,6 +511,9 @@ func TestMountExternalVolumes(t *testing.T) {
t.Errorf("api.Pod volumes map is missing key: %s. %#v", name, podVolumes)
}
}
if plug.NewAttacherCallCount != 1 {
t.Errorf("Expected plugin NewAttacher to be called %d times but got %d", 1, plug.NewAttacherCallCount)
}
}
func TestGetPodVolumesFromDisk(t *testing.T) {
@ -541,7 +545,7 @@ func TestGetPodVolumesFromDisk(t *testing.T) {
for _, ep := range expectedPaths {
found := false
for _, cl := range volumesFound {
if ep == cl.GetPath() {
if ep == cl.Cleaner.GetPath() {
found = true
break
}
@ -550,6 +554,9 @@ func TestGetPodVolumesFromDisk(t *testing.T) {
t.Errorf("Could not find a volume with path %s", ep)
}
}
if plug.NewDetacherCallCount != len(volsOnDisk) {
t.Errorf("Expected plugin NewDetacher to be called %d times but got %d", len(volsOnDisk), plug.NewDetacherCallCount)
}
}
// Test for https://github.com/kubernetes/kubernetes/pull/19600
@ -628,7 +635,7 @@ func TestCleanupOrphanedVolumes(t *testing.T) {
for _, ep := range pathsOnDisk {
found := false
for _, cl := range volumesFound {
if ep == cl.GetPath() {
if ep == cl.Cleaner.GetPath() {
found = true
break
}
@ -648,7 +655,7 @@ func TestCleanupOrphanedVolumes(t *testing.T) {
t.Errorf("Expected to find 0 cleaners, got %d", len(volumesFound))
}
for _, cl := range volumesFound {
t.Errorf("Found unexpected volume %s", cl.GetPath())
t.Errorf("Found unexpected volume %s", cl.Cleaner.GetPath())
}
}

View File

@ -111,23 +111,6 @@ func (vh *volumeHost) GetHostName() string {
return vh.kubelet.hostname
}
func (kl *Kubelet) newVolumeBuilderFromPlugins(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions) (volume.Builder, error) {
plugin, err := kl.volumePluginMgr.FindPluginBySpec(spec)
if err != nil {
return nil, fmt.Errorf("can't use volume plugins for %s: %v", spec.Name(), err)
}
if plugin == nil {
// Not found but not an error
return nil, nil
}
builder, err := plugin.NewBuilder(spec, pod, opts)
if err != nil {
return nil, fmt.Errorf("failed to instantiate volume plugin for %s: %v", spec.Name(), err)
}
glog.V(3).Infof("Used volume plugin %q for %s", plugin.Name(), spec.Name())
return builder, nil
}
func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (kubecontainer.VolumeMap, error) {
podVolumes := make(kubecontainer.VolumeMap)
for i := range pod.Spec.Volumes {
@ -152,6 +135,22 @@ func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (kubecontainer.VolumeMap,
if builder == nil {
return nil, errUnsupportedVolumeType
}
// some volumes require attachment before builder's setup.
// The plugin can be nil, but non-nil errors are legitimate errors.
// For non-nil plugins, Attachment to a node is required before Builder's setup.
attacher, err := kl.newVolumeAttacherFromPlugins(internal, pod, volume.VolumeOptions{RootContext: rootContext})
if err != nil {
glog.Errorf("Could not create volume attacher for pod %s: %v", pod.UID, err)
return nil, err
}
if attacher != nil {
err = attacher.Attach()
if err != nil {
return nil, err
}
}
err = builder.SetUp(fsGroup)
if err != nil {
return nil, err
@ -206,14 +205,22 @@ func (kl *Kubelet) getPodVolumes(podUID types.UID) ([]*volumeTuple, error) {
return volumes, nil
}
// cleanerTuple is a union struct to allow separating detaching from the cleaner.
// some volumes require detachment but not all. Cleaner cannot be nil but Detacher is optional.
type cleanerTuple struct {
Cleaner volume.Cleaner
Detacher *volume.Detacher
}
// getPodVolumesFromDisk examines directory structure to determine volumes that
// are presently active and mounted. Returns a map of volume.Cleaner types.
func (kl *Kubelet) getPodVolumesFromDisk() map[string]volume.Cleaner {
currentVolumes := make(map[string]volume.Cleaner)
// are presently active and mounted. Returns a union struct containing a volume.Cleaner
// and potentially a volume.Detacher.
func (kl *Kubelet) getPodVolumesFromDisk() map[string]cleanerTuple {
currentVolumes := make(map[string]cleanerTuple)
podUIDs, err := kl.listPodsFromDisk()
if err != nil {
glog.Errorf("Could not get pods from disk: %v", err)
return map[string]volume.Cleaner{}
return map[string]cleanerTuple{}
}
// Find the volumes for each on-disk pod.
for _, podUID := range podUIDs {
@ -239,12 +246,58 @@ func (kl *Kubelet) getPodVolumesFromDisk() map[string]volume.Cleaner {
glog.Errorf("Could not create volume cleaner for %s: %v", volume.Name, errUnsupportedVolumeType)
continue
}
currentVolumes[identifier] = cleaner
tuple := cleanerTuple{Cleaner: cleaner}
detacher, err := kl.newVolumeDetacherFromPlugins(volume.Kind, volume.Name, podUID)
// plugin can be nil but a non-nil error is a legitimate error
if err != nil {
glog.Errorf("Could not create volume detacher for %s: %v", volume.Name, err)
continue
}
if detacher != nil {
tuple.Detacher = &detacher
}
currentVolumes[identifier] = tuple
}
}
return currentVolumes
}
func (kl *Kubelet) newVolumeBuilderFromPlugins(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions) (volume.Builder, error) {
plugin, err := kl.volumePluginMgr.FindPluginBySpec(spec)
if err != nil {
return nil, fmt.Errorf("can't use volume plugins for %s: %v", spec.Name(), err)
}
if plugin == nil {
// Not found but not an error
return nil, nil
}
builder, err := plugin.NewBuilder(spec, pod, opts)
if err != nil {
return nil, fmt.Errorf("failed to instantiate volume builder for %s: %v", spec.Name(), err)
}
glog.V(3).Infof("Used volume plugin %q to mount %s", plugin.Name(), spec.Name())
return builder, nil
}
func (kl *Kubelet) newVolumeAttacherFromPlugins(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions) (volume.Attacher, error) {
plugin, err := kl.volumePluginMgr.FindAttachablePluginBySpec(spec)
if err != nil {
return nil, fmt.Errorf("can't use volume plugins for %s: %v", spec.Name(), err)
}
if plugin == nil {
// Not found but not an error.
return nil, nil
}
attacher, err := plugin.NewAttacher(spec)
if err != nil {
return nil, fmt.Errorf("failed to instantiate volume attacher for %s: %v", spec.Name(), err)
}
glog.V(3).Infof("Used volume plugin %q to attach %s/%s", plugin.Name(), spec.Name())
return attacher, nil
}
func (kl *Kubelet) newVolumeCleanerFromPlugins(kind string, name string, podUID types.UID) (volume.Cleaner, error) {
plugName := strings.UnescapeQualifiedNameForDisk(kind)
plugin, err := kl.volumePluginMgr.FindPluginByName(plugName)
@ -260,6 +313,25 @@ func (kl *Kubelet) newVolumeCleanerFromPlugins(kind string, name string, podUID
if err != nil {
return nil, fmt.Errorf("failed to instantiate volume plugin for %s/%s: %v", podUID, kind, err)
}
glog.V(3).Infof("Used volume plugin %q for %s/%s", plugin.Name(), podUID, kind)
glog.V(3).Infof("Used volume plugin %q to unmount %s/%s", plugin.Name(), podUID, kind)
return cleaner, nil
}
func (kl *Kubelet) newVolumeDetacherFromPlugins(kind string, name string, podUID types.UID) (volume.Detacher, error) {
plugName := strings.UnescapeQualifiedNameForDisk(kind)
plugin, err := kl.volumePluginMgr.FindAttachablePluginByName(plugName)
if err != nil {
return nil, fmt.Errorf("can't use volume plugins for %s/%s: %v", podUID, kind, err)
}
if plugin == nil {
// Not found but not an error.
return nil, nil
}
detacher, err := plugin.NewDetacher(name, podUID)
if err != nil {
return nil, fmt.Errorf("failed to instantiate volume plugin for %s/%s: %v", podUID, kind, err)
}
glog.V(3).Infof("Used volume plugin %q to detach %s/%s", plugin.Name(), podUID, kind)
return detacher, nil
}

View File

@ -117,6 +117,14 @@ type ProvisionableVolumePlugin interface {
NewProvisioner(options VolumeOptions) (Provisioner, error)
}
// AttachableVolumePlugin is an extended interface of VolumePlugin and is used for volumes that require attachment
// to a node before mounting.
type AttachableVolumePlugin interface {
VolumePlugin
NewAttacher(spec *Spec) (Attacher, error)
NewDetacher(name string, podUID types.UID) (Detacher, error)
}
// VolumeHost is an interface that plugins can use to access the kubelet.
type VolumeHost interface {
// GetPluginDir returns the absolute path to a directory under which
@ -384,6 +392,34 @@ func (pm *VolumePluginMgr) FindCreatablePluginBySpec(spec *Spec) (ProvisionableV
return nil, fmt.Errorf("no creatable volume plugin matched")
}
// FindAttachablePluginBySpec fetches a persistent volume plugin by name. Unlike the other "FindPlugin" methods, this
// does not return error if no plugin is found. All volumes require a builder and cleaner, but not every volume will
// have an attacher/detacher.
func (pm *VolumePluginMgr) FindAttachablePluginBySpec(spec *Spec) (AttachableVolumePlugin, error) {
volumePlugin, err := pm.FindPluginBySpec(spec)
if err != nil {
return nil, err
}
if attachableVolumePlugin, ok := volumePlugin.(AttachableVolumePlugin); ok {
return attachableVolumePlugin, nil
}
return nil, nil
}
// FindAttachablePluginByName fetches an attachable volume plugin by name. Unlike the other "FindPlugin" methods, this
// does not return error if no plugin is found. All volumes require a builder and cleaner, but not every volume will
// have an attacher/detacher.
func (pm *VolumePluginMgr) FindAttachablePluginByName(name string) (AttachableVolumePlugin, error) {
volumePlugin, err := pm.FindPluginByName(name)
if err != nil {
return nil, err
}
if attachablePlugin, ok := volumePlugin.(AttachableVolumePlugin); ok {
return attachablePlugin, nil
}
return nil, nil
}
// NewPersistentVolumeRecyclerPodTemplate creates a template for a recycler pod. By default, a recycler pod simply runs
// "rm -rf" on a volume and tests for emptiness. Most attributes of the template will be correct for most
// plugin implementations. The following attributes can be overridden per plugin via configuration:

View File

@ -134,12 +134,15 @@ type FakeVolumePlugin struct {
Host VolumeHost
Config VolumeConfig
LastProvisionerOptions VolumeOptions
NewAttacherCallCount int
NewDetacherCallCount int
}
var _ VolumePlugin = &FakeVolumePlugin{}
var _ RecyclableVolumePlugin = &FakeVolumePlugin{}
var _ DeletableVolumePlugin = &FakeVolumePlugin{}
var _ ProvisionableVolumePlugin = &FakeVolumePlugin{}
var _ AttachableVolumePlugin = &FakeVolumePlugin{}
func (plugin *FakeVolumePlugin) Init(host VolumeHost) error {
plugin.Host = host
@ -163,6 +166,16 @@ func (plugin *FakeVolumePlugin) NewCleaner(volName string, podUID types.UID) (Cl
return &FakeVolume{podUID, volName, plugin, MetricsNil{}}, nil
}
func (plugin *FakeVolumePlugin) NewAttacher(spec *Spec) (Attacher, error) {
plugin.NewAttacherCallCount = plugin.NewAttacherCallCount + 1
return &FakeVolume{}, nil
}
func (plugin *FakeVolumePlugin) NewDetacher(name string, podUID types.UID) (Detacher, error) {
plugin.NewDetacherCallCount = plugin.NewDetacherCallCount + 1
return &FakeVolume{}, nil
}
func (plugin *FakeVolumePlugin) NewRecycler(spec *Spec) (Recycler, error) {
return &fakeRecycler{"/attributesTransferredFromSpec", MetricsNil{}}, nil
}
@ -215,6 +228,14 @@ func (fv *FakeVolume) TearDownAt(dir string) error {
return os.RemoveAll(dir)
}
func (fv *FakeVolume) Attach() error {
return nil
}
func (fv *FakeVolume) Detach() error {
return nil
}
type fakeRecycler struct {
path string
MetricsNil

View File

@ -117,7 +117,7 @@ type Provisioner interface {
NewPersistentVolumeTemplate() (*api.PersistentVolume, error)
}
// Delete removes the resource from the underlying storage provider. Calls to this method should block until
// Deleter removes the resource from the underlying storage provider. Calls to this method should block until
// the deletion is complete. Any error returned indicates the volume has failed to be reclaimed.
// A nil return indicates success.
type Deleter interface {
@ -126,6 +126,17 @@ type Deleter interface {
Delete() error
}
// Attacher can attach a volume to a node.
type Attacher interface {
Volume
Attach() error
}
// Detacher can detach a volume from a node.
type Detacher interface {
Detach() error
}
func RenameDirectory(oldPath, newName string) (string, error) {
newPath, err := ioutil.TempDir(path.Dir(oldPath), newName)
if err != nil {

View File

@ -51,7 +51,7 @@ func TestPersistentVolumeRecycler(t *testing.T) {
testClient := clientset.NewForConfigOrDie(&client.Config{Host: s.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
host := volume.NewFakeVolumeHost("/tmp/fake", nil, nil)
plugins := []volume.VolumePlugin{&volume.FakeVolumePlugin{"plugin-name", host, volume.VolumeConfig{}, volume.VolumeOptions{}}}
plugins := []volume.VolumePlugin{&volume.FakeVolumePlugin{"plugin-name", host, volume.VolumeConfig{}, volume.VolumeOptions{}, 0, 0}}
cloud := &fake_cloud.FakeCloud{}
binder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(binderClient, 10*time.Second)