diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index b4dd591fe4..b88ff60e70 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1887,7 +1887,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco runningSet.Insert(string(pod.ID)) } - for name, vol := range currentVolumes { + for name, cleanerTuple := range currentVolumes { if _, ok := desiredVolumes[name]; !ok { parts := strings.Split(name, "/") if runningSet.Has(parts[0]) { @@ -1900,10 +1900,19 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco // TODO(yifan): Refactor this hacky string manipulation. kl.volumeManager.DeleteVolumes(types.UID(parts[0])) //TODO (jonesdl) This should not block other kubelet synchronization procedures - err := vol.TearDown() + err := cleanerTuple.Cleaner.TearDown() if err != nil { glog.Errorf("Could not tear down volume %q: %v", name, err) } + + // volume is unmounted. some volumes also require detachment from the node. + if cleanerTuple.Detacher != nil { + detacher := *cleanerTuple.Detacher + err = detacher.Detach() + if err != nil { + glog.Errorf("Could not detach volume %q: %v", name, err) + } + } } } return nil diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index bbb74e505a..9ae5cf2fde 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -480,7 +480,8 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { func TestMountExternalVolumes(t *testing.T) { testKubelet := newTestKubelet(t) kubelet := testKubelet.kubelet - kubelet.volumePluginMgr.InitPlugins([]volume.VolumePlugin{&volume.FakeVolumePlugin{PluginName: "fake", Host: nil}}, &volumeHost{kubelet}) + plug := &volume.FakeVolumePlugin{PluginName: "fake", Host: nil} + kubelet.volumePluginMgr.InitPlugins([]volume.VolumePlugin{plug}, &volumeHost{kubelet}) pod := api.Pod{ ObjectMeta: api.ObjectMeta{ @@ -510,6 +511,9 @@ func TestMountExternalVolumes(t *testing.T) { t.Errorf("api.Pod volumes map is missing key: %s. %#v", name, podVolumes) } } + if plug.NewAttacherCallCount != 1 { + t.Errorf("Expected plugin NewAttacher to be called %d times but got %d", 1, plug.NewAttacherCallCount) + } } func TestGetPodVolumesFromDisk(t *testing.T) { @@ -541,7 +545,7 @@ func TestGetPodVolumesFromDisk(t *testing.T) { for _, ep := range expectedPaths { found := false for _, cl := range volumesFound { - if ep == cl.GetPath() { + if ep == cl.Cleaner.GetPath() { found = true break } @@ -550,6 +554,9 @@ func TestGetPodVolumesFromDisk(t *testing.T) { t.Errorf("Could not find a volume with path %s", ep) } } + if plug.NewDetacherCallCount != len(volsOnDisk) { + t.Errorf("Expected plugin NewDetacher to be called %d times but got %d", len(volsOnDisk), plug.NewDetacherCallCount) + } } // Test for https://github.com/kubernetes/kubernetes/pull/19600 @@ -628,7 +635,7 @@ func TestCleanupOrphanedVolumes(t *testing.T) { for _, ep := range pathsOnDisk { found := false for _, cl := range volumesFound { - if ep == cl.GetPath() { + if ep == cl.Cleaner.GetPath() { found = true break } @@ -648,7 +655,7 @@ func TestCleanupOrphanedVolumes(t *testing.T) { t.Errorf("Expected to find 0 cleaners, got %d", len(volumesFound)) } for _, cl := range volumesFound { - t.Errorf("Found unexpected volume %s", cl.GetPath()) + t.Errorf("Found unexpected volume %s", cl.Cleaner.GetPath()) } } diff --git a/pkg/kubelet/volumes.go b/pkg/kubelet/volumes.go index a20b57628a..da88df0747 100644 --- a/pkg/kubelet/volumes.go +++ b/pkg/kubelet/volumes.go @@ -111,23 +111,6 @@ func (vh *volumeHost) GetHostName() string { return vh.kubelet.hostname } -func (kl *Kubelet) newVolumeBuilderFromPlugins(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions) (volume.Builder, error) { - plugin, err := kl.volumePluginMgr.FindPluginBySpec(spec) - if err != nil { - return nil, fmt.Errorf("can't use volume plugins for %s: %v", spec.Name(), err) - } - if plugin == nil { - // Not found but not an error - return nil, nil - } - builder, err := plugin.NewBuilder(spec, pod, opts) - if err != nil { - return nil, fmt.Errorf("failed to instantiate volume plugin for %s: %v", spec.Name(), err) - } - glog.V(3).Infof("Used volume plugin %q for %s", plugin.Name(), spec.Name()) - return builder, nil -} - func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (kubecontainer.VolumeMap, error) { podVolumes := make(kubecontainer.VolumeMap) for i := range pod.Spec.Volumes { @@ -152,6 +135,22 @@ func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (kubecontainer.VolumeMap, if builder == nil { return nil, errUnsupportedVolumeType } + + // some volumes require attachment before builder's setup. + // The plugin can be nil, but non-nil errors are legitimate errors. + // For non-nil plugins, Attachment to a node is required before Builder's setup. + attacher, err := kl.newVolumeAttacherFromPlugins(internal, pod, volume.VolumeOptions{RootContext: rootContext}) + if err != nil { + glog.Errorf("Could not create volume attacher for pod %s: %v", pod.UID, err) + return nil, err + } + if attacher != nil { + err = attacher.Attach() + if err != nil { + return nil, err + } + } + err = builder.SetUp(fsGroup) if err != nil { return nil, err @@ -206,14 +205,22 @@ func (kl *Kubelet) getPodVolumes(podUID types.UID) ([]*volumeTuple, error) { return volumes, nil } +// cleanerTuple is a union struct to allow separating detaching from the cleaner. +// some volumes require detachment but not all. Cleaner cannot be nil but Detacher is optional. +type cleanerTuple struct { + Cleaner volume.Cleaner + Detacher *volume.Detacher +} + // getPodVolumesFromDisk examines directory structure to determine volumes that -// are presently active and mounted. Returns a map of volume.Cleaner types. -func (kl *Kubelet) getPodVolumesFromDisk() map[string]volume.Cleaner { - currentVolumes := make(map[string]volume.Cleaner) +// are presently active and mounted. Returns a union struct containing a volume.Cleaner +// and potentially a volume.Detacher. +func (kl *Kubelet) getPodVolumesFromDisk() map[string]cleanerTuple { + currentVolumes := make(map[string]cleanerTuple) podUIDs, err := kl.listPodsFromDisk() if err != nil { glog.Errorf("Could not get pods from disk: %v", err) - return map[string]volume.Cleaner{} + return map[string]cleanerTuple{} } // Find the volumes for each on-disk pod. for _, podUID := range podUIDs { @@ -239,12 +246,58 @@ func (kl *Kubelet) getPodVolumesFromDisk() map[string]volume.Cleaner { glog.Errorf("Could not create volume cleaner for %s: %v", volume.Name, errUnsupportedVolumeType) continue } - currentVolumes[identifier] = cleaner + + tuple := cleanerTuple{Cleaner: cleaner} + detacher, err := kl.newVolumeDetacherFromPlugins(volume.Kind, volume.Name, podUID) + // plugin can be nil but a non-nil error is a legitimate error + if err != nil { + glog.Errorf("Could not create volume detacher for %s: %v", volume.Name, err) + continue + } + if detacher != nil { + tuple.Detacher = &detacher + } + currentVolumes[identifier] = tuple } } return currentVolumes } +func (kl *Kubelet) newVolumeBuilderFromPlugins(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions) (volume.Builder, error) { + plugin, err := kl.volumePluginMgr.FindPluginBySpec(spec) + if err != nil { + return nil, fmt.Errorf("can't use volume plugins for %s: %v", spec.Name(), err) + } + if plugin == nil { + // Not found but not an error + return nil, nil + } + builder, err := plugin.NewBuilder(spec, pod, opts) + if err != nil { + return nil, fmt.Errorf("failed to instantiate volume builder for %s: %v", spec.Name(), err) + } + glog.V(3).Infof("Used volume plugin %q to mount %s", plugin.Name(), spec.Name()) + return builder, nil +} + +func (kl *Kubelet) newVolumeAttacherFromPlugins(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions) (volume.Attacher, error) { + plugin, err := kl.volumePluginMgr.FindAttachablePluginBySpec(spec) + if err != nil { + return nil, fmt.Errorf("can't use volume plugins for %s: %v", spec.Name(), err) + } + if plugin == nil { + // Not found but not an error. + return nil, nil + } + + attacher, err := plugin.NewAttacher(spec) + if err != nil { + return nil, fmt.Errorf("failed to instantiate volume attacher for %s: %v", spec.Name(), err) + } + glog.V(3).Infof("Used volume plugin %q to attach %s/%s", plugin.Name(), spec.Name()) + return attacher, nil +} + func (kl *Kubelet) newVolumeCleanerFromPlugins(kind string, name string, podUID types.UID) (volume.Cleaner, error) { plugName := strings.UnescapeQualifiedNameForDisk(kind) plugin, err := kl.volumePluginMgr.FindPluginByName(plugName) @@ -260,6 +313,25 @@ func (kl *Kubelet) newVolumeCleanerFromPlugins(kind string, name string, podUID if err != nil { return nil, fmt.Errorf("failed to instantiate volume plugin for %s/%s: %v", podUID, kind, err) } - glog.V(3).Infof("Used volume plugin %q for %s/%s", plugin.Name(), podUID, kind) + glog.V(3).Infof("Used volume plugin %q to unmount %s/%s", plugin.Name(), podUID, kind) return cleaner, nil } + +func (kl *Kubelet) newVolumeDetacherFromPlugins(kind string, name string, podUID types.UID) (volume.Detacher, error) { + plugName := strings.UnescapeQualifiedNameForDisk(kind) + plugin, err := kl.volumePluginMgr.FindAttachablePluginByName(plugName) + if err != nil { + return nil, fmt.Errorf("can't use volume plugins for %s/%s: %v", podUID, kind, err) + } + if plugin == nil { + // Not found but not an error. + return nil, nil + } + + detacher, err := plugin.NewDetacher(name, podUID) + if err != nil { + return nil, fmt.Errorf("failed to instantiate volume plugin for %s/%s: %v", podUID, kind, err) + } + glog.V(3).Infof("Used volume plugin %q to detach %s/%s", plugin.Name(), podUID, kind) + return detacher, nil +} diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index dd0fa9aacb..21ef5a9cbb 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -117,6 +117,14 @@ type ProvisionableVolumePlugin interface { NewProvisioner(options VolumeOptions) (Provisioner, error) } +// AttachableVolumePlugin is an extended interface of VolumePlugin and is used for volumes that require attachment +// to a node before mounting. +type AttachableVolumePlugin interface { + VolumePlugin + NewAttacher(spec *Spec) (Attacher, error) + NewDetacher(name string, podUID types.UID) (Detacher, error) +} + // VolumeHost is an interface that plugins can use to access the kubelet. type VolumeHost interface { // GetPluginDir returns the absolute path to a directory under which @@ -384,6 +392,34 @@ func (pm *VolumePluginMgr) FindCreatablePluginBySpec(spec *Spec) (ProvisionableV return nil, fmt.Errorf("no creatable volume plugin matched") } +// FindAttachablePluginBySpec fetches a persistent volume plugin by name. Unlike the other "FindPlugin" methods, this +// does not return error if no plugin is found. All volumes require a builder and cleaner, but not every volume will +// have an attacher/detacher. +func (pm *VolumePluginMgr) FindAttachablePluginBySpec(spec *Spec) (AttachableVolumePlugin, error) { + volumePlugin, err := pm.FindPluginBySpec(spec) + if err != nil { + return nil, err + } + if attachableVolumePlugin, ok := volumePlugin.(AttachableVolumePlugin); ok { + return attachableVolumePlugin, nil + } + return nil, nil +} + +// FindAttachablePluginByName fetches an attachable volume plugin by name. Unlike the other "FindPlugin" methods, this +// does not return error if no plugin is found. All volumes require a builder and cleaner, but not every volume will +// have an attacher/detacher. +func (pm *VolumePluginMgr) FindAttachablePluginByName(name string) (AttachableVolumePlugin, error) { + volumePlugin, err := pm.FindPluginByName(name) + if err != nil { + return nil, err + } + if attachablePlugin, ok := volumePlugin.(AttachableVolumePlugin); ok { + return attachablePlugin, nil + } + return nil, nil +} + // NewPersistentVolumeRecyclerPodTemplate creates a template for a recycler pod. By default, a recycler pod simply runs // "rm -rf" on a volume and tests for emptiness. Most attributes of the template will be correct for most // plugin implementations. The following attributes can be overridden per plugin via configuration: diff --git a/pkg/volume/testing.go b/pkg/volume/testing.go index 5ff37a4af1..3990fb884f 100644 --- a/pkg/volume/testing.go +++ b/pkg/volume/testing.go @@ -134,12 +134,15 @@ type FakeVolumePlugin struct { Host VolumeHost Config VolumeConfig LastProvisionerOptions VolumeOptions + NewAttacherCallCount int + NewDetacherCallCount int } var _ VolumePlugin = &FakeVolumePlugin{} var _ RecyclableVolumePlugin = &FakeVolumePlugin{} var _ DeletableVolumePlugin = &FakeVolumePlugin{} var _ ProvisionableVolumePlugin = &FakeVolumePlugin{} +var _ AttachableVolumePlugin = &FakeVolumePlugin{} func (plugin *FakeVolumePlugin) Init(host VolumeHost) error { plugin.Host = host @@ -163,6 +166,16 @@ func (plugin *FakeVolumePlugin) NewCleaner(volName string, podUID types.UID) (Cl return &FakeVolume{podUID, volName, plugin, MetricsNil{}}, nil } +func (plugin *FakeVolumePlugin) NewAttacher(spec *Spec) (Attacher, error) { + plugin.NewAttacherCallCount = plugin.NewAttacherCallCount + 1 + return &FakeVolume{}, nil +} + +func (plugin *FakeVolumePlugin) NewDetacher(name string, podUID types.UID) (Detacher, error) { + plugin.NewDetacherCallCount = plugin.NewDetacherCallCount + 1 + return &FakeVolume{}, nil +} + func (plugin *FakeVolumePlugin) NewRecycler(spec *Spec) (Recycler, error) { return &fakeRecycler{"/attributesTransferredFromSpec", MetricsNil{}}, nil } @@ -215,6 +228,14 @@ func (fv *FakeVolume) TearDownAt(dir string) error { return os.RemoveAll(dir) } +func (fv *FakeVolume) Attach() error { + return nil +} + +func (fv *FakeVolume) Detach() error { + return nil +} + type fakeRecycler struct { path string MetricsNil diff --git a/pkg/volume/volume.go b/pkg/volume/volume.go index 6a7c039b4b..3bc3798053 100644 --- a/pkg/volume/volume.go +++ b/pkg/volume/volume.go @@ -117,7 +117,7 @@ type Provisioner interface { NewPersistentVolumeTemplate() (*api.PersistentVolume, error) } -// Delete removes the resource from the underlying storage provider. Calls to this method should block until +// Deleter removes the resource from the underlying storage provider. Calls to this method should block until // the deletion is complete. Any error returned indicates the volume has failed to be reclaimed. // A nil return indicates success. type Deleter interface { @@ -126,6 +126,17 @@ type Deleter interface { Delete() error } +// Attacher can attach a volume to a node. +type Attacher interface { + Volume + Attach() error +} + +// Detacher can detach a volume from a node. +type Detacher interface { + Detach() error +} + func RenameDirectory(oldPath, newName string) (string, error) { newPath, err := ioutil.TempDir(path.Dir(oldPath), newName) if err != nil { diff --git a/test/integration/persistent_volumes_test.go b/test/integration/persistent_volumes_test.go index 27ff590423..1500abb3cb 100644 --- a/test/integration/persistent_volumes_test.go +++ b/test/integration/persistent_volumes_test.go @@ -51,7 +51,7 @@ func TestPersistentVolumeRecycler(t *testing.T) { testClient := clientset.NewForConfigOrDie(&client.Config{Host: s.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) host := volume.NewFakeVolumeHost("/tmp/fake", nil, nil) - plugins := []volume.VolumePlugin{&volume.FakeVolumePlugin{"plugin-name", host, volume.VolumeConfig{}, volume.VolumeOptions{}}} + plugins := []volume.VolumePlugin{&volume.FakeVolumePlugin{"plugin-name", host, volume.VolumeConfig{}, volume.VolumeOptions{}, 0, 0}} cloud := &fake_cloud.FakeCloud{} binder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(binderClient, 10*time.Second)