Add support for attacher/detacher interface in Flex volume

pull/6/head
Chakravarthy Nelluri 2017-02-21 07:49:36 -05:00
parent 46b20acba2
commit 0d2af70e95
33 changed files with 1928 additions and 888 deletions

View File

@ -19,10 +19,13 @@
usage() {
err "Invalid usage. Usage: "
err "\t$0 init"
err "\t$0 attach <json params>"
err "\t$0 detach <mount device>"
err "\t$0 mount <mount dir> <mount device> <json params>"
err "\t$0 unmount <mount dir>"
err "\t$0 attach <json params> <nodename>"
err "\t$0 detach <mount device> <nodename>"
err "\t$0 waitforattach <mount device> <json params>"
err "\t$0 mountdevice <mount dir> <mount device> <json params>"
err "\t$0 unmountdevice <mount dir>"
err "\t$0 getvolumename <json params>"
err "\t$0 isattached <json params> <nodename>"
exit 1
}
@ -43,16 +46,23 @@ ismounted() {
fi
}
attach() {
VOLUMEID=$(echo $1 | jq -r '.volumeID')
SIZE=$(echo $1 | jq -r '.size')
VG=$(echo $1|jq -r '.volumegroup')
getdevice() {
VOLUMEID=$(echo ${JSON_PARAMS} | jq -r '.volumeID')
VG=$(echo ${JSON_PARAMS}|jq -r '.volumegroup')
# LVM substitutes - with --
VOLUMEID=`echo $VOLUMEID|sed s/-/--/g`
VG=`echo $VG|sed s/-/--/g`
DMDEV="/dev/mapper/${VG}-${VOLUMEID}"
echo ${DMDEV}
}
attach() {
JSON_PARAMS=$1
SIZE=$(echo $1 | jq -r '.size')
DMDEV=$(getdevice)
if [ ! -b "${DMDEV}" ]; then
err "{\"status\": \"Failure\", \"message\": \"Volume ${VOLUMEID} does not exist\"}"
exit 1
@ -66,7 +76,12 @@ detach() {
exit 0
}
domount() {
waitforattach() {
shift
attach $*
}
domountdevice() {
MNTPATH=$1
DMDEV=$2
FSTYPE=$(echo $3|jq -r '.["kubernetes.io/fsType"]')
@ -101,8 +116,13 @@ domount() {
exit 0
}
unmount() {
unmountdevice() {
MNTPATH=$1
if [ ! -d ${MNTPATH} ]; then
log "{\"status\": \"Success\"}"
exit 0
fi
if [ $(ismounted) -eq 0 ] ; then
log "{\"status\": \"Success\"}"
exit 0
@ -113,12 +133,27 @@ unmount() {
err "{ \"status\": \"Failed\", \"message\": \"Failed to unmount volume at ${MNTPATH}\"}"
exit 1
fi
rmdir ${MNTPATH} &> /dev/null
log "{\"status\": \"Success\"}"
exit 0
}
getvolumename() {
JSON_PARAMS=$1
DMDEV=$(getdevice)
# get lvm device UUID
UUID=`lvs -o lv_uuid --noheadings ${DMDEV} 2>/dev/null|tr -d " "`
log "{\"status\": \"Success\", \"volumeName\":\"${UUID}\"}"
exit 0
}
isattached() {
log "{\"status\": \"Success\", \"attached\":true}"
exit 0
}
op=$1
if [ "$op" = "init" ]; then
@ -139,14 +174,24 @@ case "$op" in
detach)
detach $*
;;
mount)
domount $*
waitforattach)
waitforattach $*
;;
unmount)
unmount $*
mountdevice)
domountdevice $*
;;
unmountdevice)
unmountdevice $*
;;
getvolumename)
getvolumename $*
;;
isattached)
isattached $*
;;
*)
usage
err "{ \"status\": \"Not supported\" }"
exit 1
esac
exit 1

120
examples/volumes/flexvolume/nfs Executable file
View File

@ -0,0 +1,120 @@
#!/bin/bash
# Copyright 2015 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Notes:
# - Please install "jq" package before using this driver.
usage() {
err "Invalid usage. Usage: "
err "\t$0 init"
err "\t$0 mount <mount dir> <json params>"
err "\t$0 unmount <mount dir>"
err "\t$0 getvolumename <json params>"
exit 1
}
err() {
echo -ne $* 1>&2
}
log() {
echo -ne $* >&1
}
ismounted() {
MOUNT=`findmnt -n ${MNTPATH} 2>/dev/null | cut -d' ' -f1`
if [ "${MOUNT}" == "${MNTPATH}" ]; then
echo "1"
else
echo "0"
fi
}
domount() {
MNTPATH=$1
NFS_SERVER=$(echo $2 | jq -r '.server')
SHARE=$(echo $2 | jq -r '.share')
if [ $(ismounted) -eq 1 ] ; then
log "{\"status\": \"Success\"}"
exit 0
fi
mkdir -p ${MNTPATH} &> /dev/null
mount -t nfs ${NFS_SERVER}:/${SHARE} ${MNTPATH} &> /dev/null
if [ $? -ne 0 ]; then
err "{ \"status\": \"Failure\", \"message\": \"Failed to mount ${NFS_SERVER}:${SHARE} at ${MNTPATH}\"}"
exit 1
fi
log "{\"status\": \"Success\"}"
exit 0
}
unmount() {
MNTPATH=$1
if [ $(ismounted) -eq 0 ] ; then
log "{\"status\": \"Success\"}"
exit 0
fi
umount ${MNTPATH} &> /dev/null
if [ $? -ne 0 ]; then
err "{ \"status\": \"Failed\", \"message\": \"Failed to unmount volume at ${MNTPATH}\"}"
exit 1
fi
log "{\"status\": \"Success\"}"
exit 0
}
getvolumename() {
NFS_SERVER=$(echo $1 | jq -r '.server')
SHARE=$(echo $1 | jq -r '.share')
log "{\"status\": \"Success\", \"volumeName\": \"${NFS_SERVER}/${SHARE}\"}"
exit 0
}
op=$1
if [ "$op" = "init" ]; then
log "{\"status\": \"Success\"}"
exit 0
fi
if [ $# -lt 2 ]; then
usage
fi
shift
case "$op" in
mount)
domount $*
;;
unmount)
unmount $*
;;
getvolumename)
getvolumename $*
;;
*)
err "{ \"status\": \"Not supported\" }"
exit 1
esac
exit 1

View File

@ -0,0 +1,22 @@
apiVersion: v1
kind: Pod
metadata:
name: nginx-nfs
namespace: default
spec:
containers:
- name: nginx-nfs
image: nginx
volumeMounts:
- name: test
mountPath: /data
ports:
- containerPort: 80
volumes:
- name: test
flexVolume:
driver: "k8s/nfs"
fsType: "nfs"
options:
server: "172.16.0.25"
share: "dws_nas_scratch"

View File

@ -2,6 +2,7 @@ apiVersion: v1
kind: Pod
metadata:
name: nginx
namespace: default
spec:
containers:
- name: nginx
@ -20,4 +21,3 @@ spec:
volumeID: "vol1"
size: "1000m"
volumegroup: "kube_vg"

View File

@ -251,3 +251,7 @@ func (eic execInContainer) SetStdin(in io.Reader) {
func (eic execInContainer) SetStdout(out io.Writer) {
//unimplemented
}
func (eic execInContainer) Stop() {
//unimplemented
}

View File

@ -44,6 +44,8 @@ func (f *FakeCmd) SetStdin(in io.Reader) {}
func (f *FakeCmd) SetStdout(out io.Writer) {}
func (f *FakeCmd) Stop() {}
type fakeExitError struct {
exited bool
statusCode int

View File

@ -20,6 +20,7 @@ import (
"io"
osexec "os/exec"
"syscall"
"time"
)
// ErrExecutableNotFound is returned if the executable is not found.
@ -48,6 +49,11 @@ type Cmd interface {
SetDir(dir string)
SetStdin(in io.Reader)
SetStdout(out io.Writer)
// Stops the command by sending SIGTERM. It is not guaranteed the
// process will stop before this function returns. If the process is not
// responding, an internal timer function will send a SIGKILL to force
// terminate after 10 seconds.
Stop()
}
// ExitError is an interface that presents an API similar to os.ProcessState, which is
@ -110,6 +116,21 @@ func (cmd *cmdWrapper) Output() ([]byte, error) {
return out, nil
}
// Stop is part of the Cmd interface.
func (cmd *cmdWrapper) Stop() {
c := (*osexec.Cmd)(cmd)
if c.ProcessState.Exited() {
return
}
c.Process.Signal(syscall.SIGTERM)
time.AfterFunc(10*time.Second, func() {
if c.ProcessState.Exited() {
return
}
c.Process.Signal(syscall.SIGKILL)
})
}
func handleError(err error) error {
if ee, ok := err.(*osexec.ExitError); ok {
// Force a compile fail if exitErrorWrapper can't convert to ExitError.

View File

@ -90,6 +90,10 @@ func (fake *FakeCmd) Output() ([]byte, error) {
return nil, fmt.Errorf("unimplemented")
}
func (fake *FakeCmd) Stop() {
// no-op
}
// A simple fake ExitError type.
type FakeExitError struct {
Status int

View File

@ -11,8 +11,20 @@ load(
go_library(
name = "go_default_library",
srcs = [
"flexvolume.go",
"flexvolume_util.go",
"attacher.go",
"attacher-defaults.go",
"detacher.go",
"detacher-defaults.go",
"driver-call.go",
"mounter.go",
"mounter-defaults.go",
"plugin.go",
"plugin-defaults.go",
"probe.go",
"unmounter.go",
"unmounter-defaults.go",
"util.go",
"volume.go",
],
tags = ["automanaged"],
deps = [
@ -23,14 +35,21 @@ go_library(
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/types",
],
)
go_test(
name = "go_default_test",
srcs = ["flexvolume_test.go"],
srcs = [
"attacher_test.go",
"common_test.go",
"detacher_test.go",
"flexvolume_test.go",
"mounter_test.go",
"plugin_test.go",
"unmounter_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = [

View File

@ -1,6 +1,7 @@
approvers:
- chakri-nelluri
- saad-ali
- MikaelCluseau
reviewers:
- ivan4th
- rata
@ -20,3 +21,4 @@ reviewers:
- rkouj
- msau42
- chakri-nelluri
- MikaelCluseau

View File

@ -0,0 +1,73 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"fmt"
"path"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
)
type attacherDefaults flexVolumeAttacher
// Attach is part of the volume.Attacher interface
func (a *attacherDefaults) Attach(spec *volume.Spec, hostName types.NodeName) (string, error) {
glog.Warning(logPrefix(a.plugin), "using default Attach for volume ", spec.Name, ", host ", hostName)
return "", nil
}
// WaitForAttach is part of the volume.Attacher interface
func (a *attacherDefaults) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) {
glog.Warning(logPrefix(a.plugin), "using default WaitForAttach for volume ", spec.Name, ", device ", devicePath)
return devicePath, nil
}
// GetDeviceMountPath is part of the volume.Attacher interface
func (a *attacherDefaults) GetDeviceMountPath(spec *volume.Spec, mountsDir string) (string, error) {
glog.Warning(logPrefix(a.plugin), "using default GetDeviceMountPath for volume ", spec.Name, ", mountsDir ", mountsDir)
volumeName, err := a.plugin.GetVolumeName(spec)
if err != nil {
return "", fmt.Errorf("GetVolumeName failed from GetDeviceMountPath: %s", err)
}
return path.Join(mountsDir, volumeName), nil
}
// MountDevice is part of the volume.Attacher interface
func (a *attacherDefaults) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string, mounter mount.Interface) error {
glog.Warning(logPrefix(a.plugin), "using default MountDevice for volume ", spec.Name, ", device ", devicePath, ", deviceMountPath ", deviceMountPath)
volSource, readOnly := getVolumeSource(spec)
options := make([]string, 0)
if readOnly {
options = append(options, "ro")
} else {
options = append(options, "rw")
}
diskMounter := &mount.SafeFormatAndMount{Interface: mounter, Runner: exec.New()}
return diskMounter.FormatAndMount(devicePath, deviceMountPath, volSource.FSType, options)
}

View File

@ -0,0 +1,120 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"path"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
)
type flexVolumeAttacher struct {
plugin *flexVolumePlugin
}
var _ volume.Attacher = &flexVolumeAttacher{}
// Attach is part of the volume.Attacher interface
func (a *flexVolumeAttacher) Attach(spec *volume.Spec, hostName types.NodeName) (string, error) {
call := a.plugin.NewDriverCall(attachCmd)
call.AppendSpec(spec, a.plugin.host, nil)
call.Append(string(hostName))
status, err := call.Run()
if isCmdNotSupportedErr(err) {
return (*attacherDefaults)(a).Attach(spec, hostName)
} else if err != nil {
return "", err
}
return status.DevicePath, err
}
// WaitForAttach is part of the volume.Attacher interface
func (a *flexVolumeAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) {
call := a.plugin.NewDriverCallWithTimeout(waitForAttachCmd, timeout)
call.Append(devicePath)
call.AppendSpec(spec, a.plugin.host, nil)
status, err := call.Run()
if isCmdNotSupportedErr(err) {
return (*attacherDefaults)(a).WaitForAttach(spec, devicePath, timeout)
} else if err != nil {
return "", err
}
return status.DevicePath, nil
}
// GetDeviceMountPath is part of the volume.Attacher interface
func (a *flexVolumeAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
mountsDir := path.Join(a.plugin.host.GetPluginDir(flexVolumePluginName), a.plugin.driverName, "mounts")
return (*attacherDefaults)(a).GetDeviceMountPath(spec, mountsDir)
}
// MountDevice is part of the volume.Attacher interface
func (a *flexVolumeAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
// Mount only once.
alreadyMounted, err := prepareForMount(a.plugin.host.GetMounter(), deviceMountPath)
if err != nil {
return err
}
if alreadyMounted {
return nil
}
call := a.plugin.NewDriverCall(mountDeviceCmd)
call.Append(deviceMountPath)
call.Append(devicePath)
call.AppendSpec(spec, a.plugin.host, nil)
_, err = call.Run()
if isCmdNotSupportedErr(err) {
// Devicepath is empty if the plugin does not support attach calls. Ignore mountDevice calls if the
// plugin does not implement attach interface.
if devicePath != "" {
return (*attacherDefaults)(a).MountDevice(spec, devicePath, deviceMountPath, a.plugin.host.GetMounter())
} else {
return nil
}
}
return err
}
func (a *flexVolumeAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[*volume.Spec]bool)
for _, spec := range specs {
volumesAttachedCheck[spec] = true
call := a.plugin.NewDriverCall(isAttached)
call.AppendSpec(spec, a.plugin.host, nil)
call.Append(string(nodeName))
status, err := call.Run()
if isCmdNotSupportedErr(err) {
return nil, nil
} else if err == nil {
if !status.Attached {
volumesAttachedCheck[spec] = false
glog.V(2).Infof("VolumesAreAttached: check volume (%q) is no longer attached", spec.Name())
}
}
}
return volumesAttachedCheck, nil
}

View File

@ -0,0 +1,74 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"k8s.io/kubernetes/pkg/volume"
"testing"
"time"
)
func TestAttach(t *testing.T) {
spec := fakeVolumeSpec()
plugin, _ := testPlugin()
plugin.runner = fakeRunner(
assertDriverCall(t, notSupportedOutput(), attachCmd,
specJson(plugin, spec, nil), "localhost"),
)
a, _ := plugin.NewAttacher()
a.Attach(spec, "localhost")
}
func TestWaitForAttach(t *testing.T) {
spec := fakeVolumeSpec()
plugin, _ := testPlugin()
plugin.runner = fakeRunner(
assertDriverCall(t, notSupportedOutput(), waitForAttachCmd, "/dev/sdx",
specJson(plugin, spec, nil)),
)
a, _ := plugin.NewAttacher()
a.WaitForAttach(spec, "/dev/sdx", 1*time.Second)
}
func TestMountDevice(t *testing.T) {
spec := fakeVolumeSpec()
plugin, rootDir := testPlugin()
plugin.runner = fakeRunner(
assertDriverCall(t, notSupportedOutput(), mountDeviceCmd, rootDir+"/mount-dir", "/dev/sdx",
specJson(plugin, spec, nil)),
)
a, _ := plugin.NewAttacher()
a.MountDevice(spec, "/dev/sdx", rootDir+"/mount-dir")
}
func TestIsVolumeAttached(t *testing.T) {
spec := fakeVolumeSpec()
plugin, _ := testPlugin()
plugin.runner = fakeRunner(
assertDriverCall(t, notSupportedOutput(), isAttached, specJson(plugin, spec, nil), "localhost"),
)
a, _ := plugin.NewAttacher()
specs := []*volume.Spec{spec}
a.VolumesAreAttached(specs, "localhost")
}

View File

@ -0,0 +1,139 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"encoding/json"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/volume"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
)
func testPlugin() (*flexVolumePlugin, string) {
rootDir, err := utiltesting.MkTmpdir("flexvolume_test")
if err != nil {
panic("error creating temp dir: " + err.Error())
}
return &flexVolumePlugin{
driverName: "test",
execPath: "/plugin",
host: volumetesting.NewFakeVolumeHost(rootDir, nil, nil),
unsupportedCommands: []string{},
}, rootDir
}
func assertDriverCall(t *testing.T, output exec.FakeCombinedOutputAction, expectedCommand string, expectedArgs ...string) exec.FakeCommandAction {
return func(cmd string, args ...string) exec.Cmd {
if cmd != "/plugin/test" {
t.Errorf("Wrong executable called: got %v, expected %v", cmd, "/plugin/test")
}
if args[0] != expectedCommand {
t.Errorf("Wrong command called: got %v, expected %v", args[0], expectedCommand)
}
cmdArgs := args[1:]
if !sameArgs(cmdArgs, expectedArgs) {
t.Errorf("Wrong args for %s: got %v, expected %v", args[0], cmdArgs, expectedArgs)
}
return &exec.FakeCmd{
Argv: args,
CombinedOutputScript: []exec.FakeCombinedOutputAction{output},
}
}
}
func fakeRunner(fakeCommands ...exec.FakeCommandAction) exec.Interface {
return &exec.FakeExec{
CommandScript: fakeCommands,
}
}
func fakeResultOutput(result interface{}) exec.FakeCombinedOutputAction {
return func() ([]byte, error) {
bytes, err := json.Marshal(result)
if err != nil {
panic("Unable to marshal result: " + err.Error())
}
return bytes, nil
}
}
func successOutput() exec.FakeCombinedOutputAction {
return fakeResultOutput(&DriverStatus{StatusSuccess, "", "", "", true})
}
func notSupportedOutput() exec.FakeCombinedOutputAction {
return fakeResultOutput(&DriverStatus{StatusNotSupported, "", "", "", false})
}
func sameArgs(args, expectedArgs []string) bool {
if len(args) != len(expectedArgs) {
return false
}
for i, v := range args {
if v != expectedArgs[i] {
return false
}
}
return true
}
func fakeVolumeSpec() *volume.Spec {
vol := &v1.Volume{
Name: "vol1",
VolumeSource: v1.VolumeSource{
FlexVolume: &v1.FlexVolumeSource{
Driver: "kubernetes.io/fakeAttacher",
ReadOnly: false,
},
},
}
return volume.NewSpecFromVolume(vol)
}
func fakePersistentVolumeSpec() *volume.Spec {
vol := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "vol1",
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
FlexVolume: &v1.FlexVolumeSource{
Driver: "kubernetes.io/fakeAttacher",
ReadOnly: false,
},
},
},
}
return volume.NewSpecFromPersistentVolume(vol, false)
}
func specJson(plugin *flexVolumePlugin, spec *volume.Spec, extraOptions map[string]string) string {
o, err := NewOptionsForDriver(spec, plugin.host, extraOptions)
if err != nil {
panic("Failed to convert spec: " + err.Error())
}
bytes, err := json.Marshal(o)
if err != nil {
panic("Unable to marshal result: " + err.Error())
}
return string(bytes)
}

View File

@ -0,0 +1,45 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume/util"
)
type detacherDefaults flexVolumeDetacher
// Detach is part of the volume.Detacher interface.
func (d *detacherDefaults) Detach(deviceName string, hostName types.NodeName) error {
glog.Warning(logPrefix(d.plugin), "using default Detach for device ", deviceName, ", host ", hostName)
return nil
}
// WaitForDetach is part of the volume.Detacher interface.
func (d *detacherDefaults) WaitForDetach(devicePath string, timeout time.Duration) error {
glog.Warning(logPrefix(d.plugin), "using default WaitForDetach for device ", devicePath)
return nil
}
// UnmountDevice is part of the volume.Detacher interface.
func (d *detacherDefaults) UnmountDevice(deviceMountPath string) error {
glog.Warning(logPrefix(d.plugin), "using default UnmountDevice for device mount path ", deviceMountPath)
return util.UnmountPath(deviceMountPath, d.plugin.host.GetMounter())
}

View File

@ -0,0 +1,97 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"fmt"
"os"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
)
type flexVolumeDetacher struct {
plugin *flexVolumePlugin
}
var _ volume.Detacher = &flexVolumeDetacher{}
// Detach is part of the volume.Detacher interface.
func (d *flexVolumeDetacher) Detach(deviceName string, hostName types.NodeName) error {
call := d.plugin.NewDriverCall(detachCmd)
call.Append(deviceName)
call.Append(string(hostName))
_, err := call.Run()
if isCmdNotSupportedErr(err) {
return (*detacherDefaults)(d).Detach(deviceName, hostName)
}
return err
}
// WaitForDetach is part of the volume.Detacher interface.
func (d *flexVolumeDetacher) WaitForDetach(devicePath string, timeout time.Duration) error {
call := d.plugin.NewDriverCallWithTimeout(waitForDetachCmd, timeout)
call.Append(devicePath)
_, err := call.Run()
if isCmdNotSupportedErr(err) {
return (*detacherDefaults)(d).WaitForDetach(devicePath, timeout)
}
return err
}
// UnmountDevice is part of the volume.Detacher interface.
func (d *flexVolumeDetacher) UnmountDevice(deviceMountPath string) error {
if pathExists, pathErr := util.PathExists(deviceMountPath); pathErr != nil {
return fmt.Errorf("Error checking if path exists: %v", pathErr)
} else if !pathExists {
glog.Warningf("Warning: Unmount skipped because path does not exist: %v", deviceMountPath)
return nil
}
notmnt, err := isNotMounted(d.plugin.host.GetMounter(), deviceMountPath)
if err != nil {
return err
}
if notmnt {
glog.Warningf("Warning: Path: %v already unmounted", deviceMountPath)
} else {
call := d.plugin.NewDriverCall(unmountDeviceCmd)
call.Append(deviceMountPath)
_, err := call.Run()
if isCmdNotSupportedErr(err) {
err = (*detacherDefaults)(d).UnmountDevice(deviceMountPath)
}
if err != nil {
return err
}
}
// Flexvolume driver may remove the directory. Ignore if it does.
if pathExists, pathErr := util.PathExists(deviceMountPath); pathErr != nil {
return fmt.Errorf("Error checking if path exists: %v", pathErr)
} else if !pathExists {
return nil
}
return os.Remove(deviceMountPath)
}

View File

@ -0,0 +1,43 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"testing"
)
func TestDetach(t *testing.T) {
plugin, _ := testPlugin()
plugin.runner = fakeRunner(
assertDriverCall(t, notSupportedOutput(), detachCmd,
"sdx", "localhost"),
)
d, _ := plugin.NewDetacher()
d.Detach("sdx", "localhost")
}
func TestUnmountDevice(t *testing.T) {
plugin, rootDir := testPlugin()
plugin.runner = fakeRunner(
assertDriverCall(t, notSupportedOutput(), unmountDeviceCmd,
rootDir+"/mount-dir"),
)
d, _ := plugin.NewDetacher()
d.UnmountDevice(rootDir + "/mount-dir")
}

View File

@ -0,0 +1,224 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"encoding/json"
"errors"
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/volume"
)
const (
// Driver calls
initCmd = "init"
getVolumeNameCmd = "getvolumename"
isAttached = "isattached"
attachCmd = "attach"
waitForAttachCmd = "waitforattach"
mountDeviceCmd = "mountdevice"
detachCmd = "detach"
waitForDetachCmd = "waitfordetach"
unmountDeviceCmd = "unmountdevice"
mountCmd = "mount"
unmountCmd = "unmount"
// Option keys
optionFSType = "kubernetes.io/fsType"
optionReadWrite = "kubernetes.io/readwrite"
optionKeySecret = "kubernetes.io/secret"
optionFSGroup = "kubernetes.io/fsGroup"
optionMountsDir = "kubernetes.io/mountsDir"
)
const (
// StatusSuccess represents the successful completion of command.
StatusSuccess = "Success"
// StatusFailed represents that the command failed.
StatusFailure = "Failed"
// StatusNotSupported represents that the command is not supported.
StatusNotSupported = "Not supported"
)
var (
TimeoutError = fmt.Errorf("Timeout")
)
// DriverCall implements the basic contract between FlexVolume and its driver.
// The caller is responsible for providing the required args.
type DriverCall struct {
Command string
Timeout time.Duration
plugin *flexVolumePlugin
args []string
}
func (plugin *flexVolumePlugin) NewDriverCall(command string) *DriverCall {
return plugin.NewDriverCallWithTimeout(command, 0)
}
func (plugin *flexVolumePlugin) NewDriverCallWithTimeout(command string, timeout time.Duration) *DriverCall {
return &DriverCall{
Command: command,
Timeout: timeout,
plugin: plugin,
args: []string{command},
}
}
func (dc *DriverCall) Append(arg string) {
dc.args = append(dc.args, arg)
}
func (dc *DriverCall) AppendSpec(spec *volume.Spec, host volume.VolumeHost, extraOptions map[string]string) error {
optionsForDriver, err := NewOptionsForDriver(spec, host, extraOptions)
if err != nil {
return err
}
jsonBytes, err := json.Marshal(optionsForDriver)
if err != nil {
return fmt.Errorf("Failed to marshal spec, error: %s", err.Error())
}
dc.Append(string(jsonBytes))
return nil
}
func (dc *DriverCall) Run() (*DriverStatus, error) {
if dc.plugin.isUnsupported(dc.Command) {
return nil, errors.New(StatusNotSupported)
}
execPath := dc.plugin.getExecutable()
cmd := dc.plugin.runner.Command(execPath, dc.args...)
timeout := false
if dc.Timeout > 0 {
timer := time.AfterFunc(dc.Timeout, func() {
timeout = true
cmd.Stop()
})
defer timer.Stop()
}
output, execErr := cmd.CombinedOutput()
if execErr != nil {
if timeout {
return nil, TimeoutError
}
_, err := handleCmdResponse(dc.Command, output)
if err == nil {
glog.Errorf("FlexVolume: driver bug: %s: exec error (%s) but no error in response.", execPath, execErr)
return nil, execErr
}
if isCmdNotSupportedErr(err) {
dc.plugin.unsupported(dc.Command)
} else {
glog.Warningf("FlexVolume: driver call failed: executable: %s, args: %s, error: %s, output: %s", execPath, dc.args, execErr.Error(), output)
}
return nil, err
}
status, err := handleCmdResponse(dc.Command, output)
if err != nil {
if isCmdNotSupportedErr(err) {
dc.plugin.unsupported(dc.Command)
}
return nil, err
}
return status, nil
}
// OptionsForDriver represents the spec given to the driver.
type OptionsForDriver map[string]string
func NewOptionsForDriver(spec *volume.Spec, host volume.VolumeHost, extraOptions map[string]string) (OptionsForDriver, error) {
volSource, readOnly := getVolumeSource(spec)
options := map[string]string{}
options[optionFSType] = volSource.FSType
if readOnly {
options[optionReadWrite] = "ro"
} else {
options[optionReadWrite] = "rw"
}
for key, value := range extraOptions {
options[key] = value
}
for key, value := range volSource.Options {
options[key] = value
}
return OptionsForDriver(options), nil
}
// DriverStatus represents the return value of the driver callout.
type DriverStatus struct {
// Status of the callout. One of "Success", "Failure" or "Not supported".
Status string `json:"status"`
// Reason for success/failure.
Message string `json:"message,omitempty"`
// Path to the device attached. This field is valid only for attach calls.
// ie: /dev/sdx
DevicePath string `json:"device,omitempty"`
// Cluster wide unique name of the volume.
VolumeName string `json:"volumeName,omitempty"`
// Represents volume is attached on the node
Attached bool `json:"attached,omitempty"`
}
// isCmdNotSupportedErr checks if the error corresponds to command not supported by
// driver.
func isCmdNotSupportedErr(err error) bool {
if err != nil && err.Error() == StatusNotSupported {
return true
}
return false
}
// handleCmdResponse processes the command output and returns the appropriate
// error code or message.
func handleCmdResponse(cmd string, output []byte) (*DriverStatus, error) {
var status DriverStatus
if err := json.Unmarshal(output, &status); err != nil {
glog.Errorf("Failed to unmarshal output for command: %s, output: %s, error: %s", cmd, string(output), err.Error())
return nil, err
} else if status.Status == StatusNotSupported {
glog.V(5).Infof("%s command is not supported by the driver", cmd)
return nil, errors.New(status.Status)
} else if status.Status != StatusSuccess {
errMsg := fmt.Sprintf("%s command failed, status: %s, reason: %s", cmd, status.Status, status.Message)
glog.Errorf(errMsg)
return nil, fmt.Errorf("%s", errMsg)
}
return &status, nil
}

View File

@ -1,440 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"encoding/base64"
"fmt"
"io/ioutil"
"os"
"path"
"strings"
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/mount"
utilstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
)
// This is the primary entrypoint for volume plugins.
func ProbeVolumePlugins(pluginDir string) []volume.VolumePlugin {
plugins := []volume.VolumePlugin{}
files, _ := ioutil.ReadDir(pluginDir)
for _, f := range files {
// only directories are counted as plugins
// and pluginDir/dirname/dirname should be an executable
// unless dirname contains '~' for escaping namespace
// e.g. dirname = vendor~cifs
// then, executable will be pluginDir/dirname/cifs
if f.IsDir() {
execPath := path.Join(pluginDir, f.Name())
plugins = append(plugins, &flexVolumePlugin{driverName: utilstrings.UnescapePluginName(f.Name()), execPath: execPath})
}
}
return plugins
}
// FlexVolumePlugin object.
type flexVolumePlugin struct {
driverName string
execPath string
host volume.VolumeHost
}
// Init initializes the plugin.
func (plugin *flexVolumePlugin) Init(host volume.VolumeHost) error {
plugin.host = host
// call the init script
u := &flexVolumeUtil{}
return u.init(plugin)
}
func (plugin *flexVolumePlugin) getExecutable() string {
parts := strings.Split(plugin.driverName, "/")
execName := parts[len(parts)-1]
return path.Join(plugin.execPath, execName)
}
func (plugin *flexVolumePlugin) GetPluginName() string {
return plugin.driverName
}
func (plugin *flexVolumePlugin) GetVolumeName(spec *volume.Spec) (string, error) {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
return "", err
}
return volumeSource.Driver, nil
}
// CanSupport checks whether the plugin can support the input volume spec.
func (plugin *flexVolumePlugin) CanSupport(spec *volume.Spec) bool {
source, _, _ := getVolumeSource(spec)
return (source != nil) && (source.Driver == plugin.driverName)
}
func (plugin *flexVolumePlugin) RequiresRemount() bool {
return false
}
// GetAccessModes gets the allowed access modes for this plugin.
func (plugin *flexVolumePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
return []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,
v1.ReadOnlyMany,
}
}
// NewMounter is the mounter routine to build the volume.
func (plugin *flexVolumePlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
fv, _, err := getVolumeSource(spec)
if err != nil {
return nil, err
}
secrets := make(map[string]string)
if fv.SecretRef != nil {
kubeClient := plugin.host.GetKubeClient()
if kubeClient == nil {
return nil, fmt.Errorf("Cannot get kube client")
}
secretName, err := kubeClient.Core().Secrets(pod.Namespace).Get(fv.SecretRef.Name, metav1.GetOptions{})
if err != nil {
err = fmt.Errorf("Couldn't get secret %v/%v err: %v", pod.Namespace, fv.SecretRef, err)
return nil, err
}
for name, data := range secretName.Data {
secrets[name] = base64.StdEncoding.EncodeToString(data)
glog.V(1).Infof("found flex volume secret info: %s", name)
}
}
return plugin.newMounterInternal(spec, pod, &flexVolumeUtil{}, plugin.host.GetMounter(), exec.New(), secrets)
}
// newMounterInternal is the internal mounter routine to build the volume.
func (plugin *flexVolumePlugin) newMounterInternal(spec *volume.Spec, pod *v1.Pod, manager flexVolumeManager, mounter mount.Interface, runner exec.Interface, secrets map[string]string) (volume.Mounter, error) {
source, _, err := getVolumeSource(spec)
if err != nil {
return nil, err
}
return &flexVolumeMounter{
flexVolumeDisk: &flexVolumeDisk{
podUID: pod.UID,
podNamespace: pod.Namespace,
podName: pod.Name,
volName: spec.Name(),
driverName: source.Driver,
execPath: plugin.getExecutable(),
mounter: mounter,
plugin: plugin,
secrets: secrets,
},
fsType: source.FSType,
readOnly: source.ReadOnly,
options: source.Options,
runner: runner,
manager: manager,
blockDeviceMounter: &mount.SafeFormatAndMount{Interface: mounter, Runner: runner},
}, nil
}
// NewUnmounter is the unmounter routine to clean the volume.
func (plugin *flexVolumePlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
return plugin.newUnmounterInternal(volName, podUID, &flexVolumeUtil{}, plugin.host.GetMounter(), exec.New())
}
// newUnmounterInternal is the internal unmounter routine to clean the volume.
func (plugin *flexVolumePlugin) newUnmounterInternal(volName string, podUID types.UID, manager flexVolumeManager, mounter mount.Interface, runner exec.Interface) (volume.Unmounter, error) {
return &flexVolumeUnmounter{
flexVolumeDisk: &flexVolumeDisk{
podUID: podUID,
volName: volName,
driverName: plugin.driverName,
execPath: plugin.getExecutable(),
mounter: mounter,
plugin: plugin,
},
runner: runner,
manager: manager,
}, nil
}
func (plugin *flexVolumePlugin) ConstructVolumeSpec(volumeName, sourceName string) (*volume.Spec, error) {
flexVolume := &v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
FlexVolume: &v1.FlexVolumeSource{
Driver: sourceName,
},
},
}
return volume.NewSpecFromVolume(flexVolume), nil
}
// flexVolume is the disk resource provided by this plugin.
type flexVolumeDisk struct {
// podUID is the UID of the pod.
podUID types.UID
// podNamespace is the namespace of the pod.
podNamespace string
// podName is the name of the pod.
podName string
// volName is the name of the pod volume.
volName string
// driverName is the name of the plugin driverName.
driverName string
// Driver executable used to setup the volume.
execPath string
// mounter provides the interface that is used to mount the actual
// block device.
mounter mount.Interface
// secret for the volume.
secrets map[string]string
plugin *flexVolumePlugin
}
// FlexVolumeUnmounter is the disk that will be cleaned by this plugin.
type flexVolumeUnmounter struct {
*flexVolumeDisk
// Runner used to teardown the volume.
runner exec.Interface
// manager is the utility interface that provides API calls to the
// driverName to setup & teardown disks
manager flexVolumeManager
volume.MetricsNil
}
// FlexVolumeMounter is the disk that will be exposed by this plugin.
type flexVolumeMounter struct {
*flexVolumeDisk
// fsType is the type of the filesystem to create on the volume.
fsType string
// readOnly specifies whether the disk will be setup as read-only.
readOnly bool
// options are the extra params that will be passed to the plugin
// driverName.
options map[string]string
// Runner used to setup the volume.
runner exec.Interface
// manager is the utility interface that provides API calls to the
// driverName to setup & teardown disks
manager flexVolumeManager
// blockDeviceMounter provides the interface to create filesystem if the
// filesystem doesn't exist.
blockDeviceMounter mount.Interface
volume.MetricsNil
}
// SetUp creates new directory.
func (f *flexVolumeMounter) SetUp(fsGroup *int64) error {
return f.SetUpAt(f.GetPath(), fsGroup)
}
// GetAttributes get the flex volume attributes. The attributes will be queried
// using plugin callout after we finalize the callout syntax.
func (f flexVolumeMounter) GetAttributes() volume.Attributes {
return volume.Attributes{
ReadOnly: f.readOnly,
Managed: false,
SupportsSELinux: false,
}
}
// Checks prior to mount operations to verify that the required components (binaries, etc.)
// to mount the volume are available on the underlying node.
// If not, it returns an error
func (f *flexVolumeMounter) CanMount() error {
return nil
}
// flexVolumeManager is the abstract interface to flex volume ops.
type flexVolumeManager interface {
// Attaches the disk to the kubelet's host machine.
attach(mounter *flexVolumeMounter) (string, error)
// Detaches the disk from the kubelet's host machine.
detach(unmounter *flexVolumeUnmounter, dir string) error
// Mounts the disk on the Kubelet's host machine.
mount(mounter *flexVolumeMounter, mnt, dir string) error
// Unmounts the disk from the Kubelet's host machine.
unmount(unounter *flexVolumeUnmounter, dir string) error
}
// SetUpAt creates new directory.
func (f *flexVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
notmnt, err := f.blockDeviceMounter.IsLikelyNotMountPoint(dir)
if err != nil && !os.IsNotExist(err) {
glog.Errorf("Cannot validate mount point: %s %v", dir, err)
return err
}
if !notmnt {
return nil
}
if f.options == nil {
f.options = make(map[string]string)
}
f.options[optionFSType] = f.fsType
// Read write mount options.
if f.readOnly {
f.options[optionReadWrite] = "ro"
} else {
f.options[optionReadWrite] = "rw"
}
// Extract secret and pass it as options.
for name, secret := range f.secrets {
f.options[optionKeySecret+"/"+name] = secret
}
glog.V(4).Infof("attempting to attach volume: %s with options %v", f.volName, f.options)
device, err := f.manager.attach(f)
if err != nil {
if !isCmdNotSupportedErr(err) {
glog.Errorf("failed to attach volume: %s", f.volName)
return err
}
// Attach not supported or required. Continue to mount.
}
glog.V(4).Infof("attempting to mount volume: %s", f.volName)
if err := f.manager.mount(f, device, dir); err != nil {
if !isCmdNotSupportedErr(err) {
glog.Errorf("failed to mount volume: %s", f.volName)
return err
}
options := make([]string, 0)
if f.readOnly {
options = append(options, "ro")
} else {
options = append(options, "rw")
}
// Extract secret and pass it as options.
for name, secret := range f.secrets {
f.options[optionKeySecret+"/"+name] = secret
}
os.MkdirAll(dir, 0750)
// Mount not supported by driver. Use core mounting logic.
glog.V(4).Infof("attempting to mount the volume: %s to device: %s", f.volName, device)
err = f.blockDeviceMounter.Mount(string(device), dir, f.fsType, options)
if err != nil {
glog.Errorf("failed to mount the volume: %s to device: %s, error: %v", f.volName, device, err)
return err
}
}
glog.V(4).Infof("Successfully mounted volume: %s on device: %s", f.volName, device)
return nil
}
// IsReadOnly returns true if the volume is read only.
func (f *flexVolumeMounter) IsReadOnly() bool {
return f.readOnly
}
// GetPathFromPlugin gets the actual volume mount directory based on plugin.
func (f *flexVolumeDisk) GetPath() string {
name := f.driverName
return f.plugin.host.GetPodVolumeDir(f.podUID, utilstrings.EscapeQualifiedNameForDisk(name), f.volName)
}
// TearDown simply deletes everything in the directory.
func (f *flexVolumeUnmounter) TearDown() error {
path := f.GetPath()
return f.TearDownAt(path)
}
// TearDownAt simply deletes everything in the directory.
func (f *flexVolumeUnmounter) TearDownAt(dir string) error {
if pathExists, pathErr := util.PathExists(dir); pathErr != nil {
return fmt.Errorf("Error checking if path exists: %v", pathErr)
} else if !pathExists {
glog.Warningf("Warning: Unmount skipped because path does not exist: %v", dir)
return nil
}
notmnt, err := f.mounter.IsLikelyNotMountPoint(dir)
if err != nil {
glog.Errorf("Error checking mount point %s, error: %v", dir, err)
return err
}
if notmnt {
return os.Remove(dir)
}
device, refCount, err := mount.GetDeviceNameFromMount(f.mounter, dir)
if err != nil {
glog.Errorf("Failed to get reference count for volume: %s", dir)
return err
}
if err := f.manager.unmount(f, dir); err != nil {
if !isCmdNotSupportedErr(err) {
glog.Errorf("Failed to unmount volume %s", f.volName)
return err
}
// Unmount not supported by the driver. Use core unmount logic.
if err := f.mounter.Unmount(dir); err != nil {
glog.Errorf("Failed to unmount volume: %s, error: %v", dir, err)
return err
}
}
if refCount == 1 {
if err := f.manager.detach(f, device); err != nil {
if !isCmdNotSupportedErr(err) {
glog.Errorf("Failed to teardown volume: %s, error: %v", dir, err)
return err
}
// Teardown not supported by driver. Unmount is good enough.
}
}
notmnt, err = f.mounter.IsLikelyNotMountPoint(dir)
if err != nil {
glog.Errorf("Error checking mount point %s, error: %v", dir, err)
return err
}
if notmnt {
return os.Remove(dir)
}
return nil
}
func getVolumeSource(spec *volume.Spec) (*v1.FlexVolumeSource, bool, error) {
if spec.Volume != nil && spec.Volume.FlexVolume != nil {
return spec.Volume.FlexVolume, spec.Volume.FlexVolume.ReadOnly, nil
} else if spec.PersistentVolume != nil &&
spec.PersistentVolume.Spec.FlexVolume != nil {
return spec.PersistentVolume.Spec.FlexVolume, spec.ReadOnly, nil
}
return nil, false, fmt.Errorf("Spec does not reference a Flex volume type")
}

View File

@ -18,19 +18,14 @@ package flexvolume
import (
"bytes"
"encoding/base64"
"fmt"
"os"
"path"
"testing"
"text/template"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
)
@ -55,21 +50,22 @@ elif [ "$1" == "detach" -a $# -eq 2 ]; then
"status": "Success"
}'
exit 0
elif [ "$1" == "mount" -a $# -eq 4 ]; then
elif [ "$1" == "getvolumename" -a $# -eq 4 ]; then
echo -n '{
"status": "Not supported"
"status": "Success",
"volume": "fakevolume"
}'
exit 0
elif [ "$1" == "unmount" -a $# -eq 2 ]; then
elif [ "$1" == "isattached" -a $# -eq 2 ]; then
echo -n '{
"status": "Not supported"
"status": "Success",
"attached": true
}'
exit 0
fi
echo -n '{
"status": "Failure",
"reason": "Invalid usage"
"status": "Not supported"
}'
exit 1
@ -85,14 +81,10 @@ if [ "$1" == "init" -a $# -eq 1 ]; then
exit 0
fi
if [ "$1" == "attach" -a $# -eq 2 ]; then
if [ "$1" == "getvolumename" -a $# -eq 2 ]; then
echo -n '{
"status": "Not supported"
}'
exit 0
elif [ "$1" == "detach" -a $# -eq 2 ]; then
echo -n '{
"status": "Not supported"
"status": "Success",
"volumeName": "fakevolume"
}'
exit 0
elif [ "$1" == "mount" -a $# -eq 4 ]; then
@ -126,8 +118,7 @@ elif [ "$1" == "unmount" -a $# -eq 2 ]; then
fi
echo -n '{
"status": "Failure",
"reason": "Invalid usage"
"status": "Not Supported"
}'
exit 1
@ -230,189 +221,3 @@ func contains(modes []v1.PersistentVolumeAccessMode, mode v1.PersistentVolumeAcc
}
return false
}
func doTestPluginAttachDetach(t *testing.T, spec *volume.Spec, tmpDir string) {
plugMgr := volume.VolumePluginMgr{}
installPluginUnderTest(t, "kubernetes.io", "fakeAttacher", tmpDir, execScriptTempl1, nil)
plugMgr.InitPlugins(ProbeVolumePlugins(tmpDir), volumetest.NewFakeVolumeHost(tmpDir, nil, nil))
plugin, err := plugMgr.FindPluginByName("kubernetes.io/fakeAttacher")
if err != nil {
t.Errorf("Can't find the plugin by name")
}
fake := &mount.FakeMounter{}
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: types.UID("poduid")}}
secretMap := make(map[string]string)
secretMap["flexsecret"] = base64.StdEncoding.EncodeToString([]byte("foo"))
mounter, err := plugin.(*flexVolumePlugin).newMounterInternal(spec, pod, &flexVolumeUtil{}, fake, exec.New(), secretMap)
volumePath := mounter.GetPath()
if err != nil {
t.Errorf("Failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Errorf("Got a nil Mounter")
}
path := mounter.GetPath()
expectedPath := fmt.Sprintf("%s/pods/poduid/volumes/kubernetes.io~fakeAttacher/vol1", tmpDir)
if path != expectedPath {
t.Errorf("Unexpected path, expected %q, got: %q", expectedPath, path)
}
if err := mounter.SetUp(nil); err != nil {
t.Errorf("Expected success, got: %v", err)
}
if _, err := os.Stat(volumePath); err != nil {
if os.IsNotExist(err) {
t.Errorf("SetUp() failed, volume path not created: %s", volumePath)
} else {
t.Errorf("SetUp() failed: %v", err)
}
}
t.Logf("Setup successful")
if mounter.(*flexVolumeMounter).readOnly {
t.Errorf("The volume source should not be read-only and it is.")
}
if len(fake.Log) != 1 {
t.Errorf("Mount was not called exactly one time. It was called %d times.", len(fake.Log))
} else {
if fake.Log[0].Action != mount.FakeActionMount {
t.Errorf("Unexpected mounter action: %#v", fake.Log[0])
}
}
fake.ResetLog()
unmounter, err := plugin.(*flexVolumePlugin).newUnmounterInternal("vol1", types.UID("poduid"), &flexVolumeUtil{}, fake, exec.New())
if err != nil {
t.Errorf("Failed to make a new Unmounter: %v", err)
}
if unmounter == nil {
t.Errorf("Got a nil Unmounter")
}
if err := unmounter.TearDown(); err != nil {
t.Errorf("Expected success, got: %v", err)
}
if _, err := os.Stat(volumePath); err == nil {
t.Errorf("TearDown() failed, volume path still exists: %s", volumePath)
} else if !os.IsNotExist(err) {
t.Errorf("SetUp() failed: %v", err)
}
if len(fake.Log) != 1 {
t.Errorf("Unmount was not called exactly one time. It was called %d times.", len(fake.Log))
} else {
if fake.Log[0].Action != mount.FakeActionUnmount {
t.Errorf("Unexpected mounter action: %#v", fake.Log[0])
}
}
fake.ResetLog()
}
func doTestPluginMountUnmount(t *testing.T, spec *volume.Spec, tmpDir string) {
tmpDir, err := utiltesting.MkTmpdir("flexvolume_test")
if err != nil {
t.Fatalf("error creating temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
plugMgr := volume.VolumePluginMgr{}
installPluginUnderTest(t, "kubernetes.io", "fakeMounter", tmpDir, execScriptTempl2, nil)
plugMgr.InitPlugins(ProbeVolumePlugins(tmpDir), volumetest.NewFakeVolumeHost(tmpDir, nil, nil))
plugin, err := plugMgr.FindPluginByName("kubernetes.io/fakeMounter")
if err != nil {
t.Errorf("Can't find the plugin by name")
}
fake := &mount.FakeMounter{}
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: types.UID("poduid")}}
// Use nil secret to test for nil secret case.
mounter, err := plugin.(*flexVolumePlugin).newMounterInternal(spec, pod, &flexVolumeUtil{}, fake, exec.New(), nil)
volumePath := mounter.GetPath()
if err != nil {
t.Errorf("Failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Errorf("Got a nil Mounter")
}
path := mounter.GetPath()
expectedPath := fmt.Sprintf("%s/pods/poduid/volumes/kubernetes.io~fakeMounter/vol1", tmpDir)
if path != expectedPath {
t.Errorf("Unexpected path, expected %q, got: %q", expectedPath, path)
}
if err := mounter.SetUp(nil); err != nil {
t.Errorf("Expected success, got: %v", err)
}
if _, err := os.Stat(volumePath); err != nil {
if os.IsNotExist(err) {
t.Errorf("SetUp() failed, volume path not created: %s", volumePath)
} else {
t.Errorf("SetUp() failed: %v", err)
}
}
t.Logf("Setup successful")
if mounter.(*flexVolumeMounter).readOnly {
t.Errorf("The volume source should not be read-only and it is.")
}
unmounter, err := plugin.(*flexVolumePlugin).newUnmounterInternal("vol1", types.UID("poduid"), &flexVolumeUtil{}, fake, exec.New())
if err != nil {
t.Errorf("Failed to make a new Unmounter: %v", err)
}
if unmounter == nil {
t.Errorf("Got a nil Unmounter")
}
if err := unmounter.TearDown(); err != nil {
t.Errorf("Expected success, got: %v", err)
}
if _, err := os.Stat(volumePath); err == nil {
t.Errorf("TearDown() failed, volume path still exists: %s", volumePath)
} else if !os.IsNotExist(err) {
t.Errorf("SetUp() failed: %v", err)
}
}
func TestPluginVolumeAttacher(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("flexvolume_test")
if err != nil {
t.Fatalf("error creating temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
vol := &v1.Volume{
Name: "vol1",
VolumeSource: v1.VolumeSource{FlexVolume: &v1.FlexVolumeSource{Driver: "kubernetes.io/fakeAttacher", ReadOnly: false}},
}
doTestPluginAttachDetach(t, volume.NewSpecFromVolume(vol), tmpDir)
}
func TestPluginVolumeMounter(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("flexvolume_test")
if err != nil {
t.Fatalf("error creating temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
vol := &v1.Volume{
Name: "vol1",
VolumeSource: v1.VolumeSource{FlexVolume: &v1.FlexVolumeSource{Driver: "kubernetes.io/fakeMounter", ReadOnly: false}},
}
doTestPluginMountUnmount(t, volume.NewSpecFromVolume(vol), tmpDir)
}
func TestPluginPersistentVolume(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("flexvolume_test")
if err != nil {
t.Fatalf("error creating temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
vol := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "vol1",
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
FlexVolume: &v1.FlexVolumeSource{Driver: "kubernetes.io/fakeAttacher", ReadOnly: false},
},
},
}
doTestPluginAttachDetach(t, volume.NewSpecFromPersistentVolume(vol, false), tmpDir)
}

View File

@ -1,221 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"encoding/json"
"errors"
"fmt"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util/exec"
)
const (
initCmd = "init"
attachCmd = "attach"
detachCmd = "detach"
mountCmd = "mount"
unmountCmd = "unmount"
optionFSType = "kubernetes.io/fsType"
optionReadWrite = "kubernetes.io/readwrite"
optionKeySecret = "kubernetes.io/secret"
)
const (
// StatusSuccess represents the successful completion of command.
StatusSuccess = "Success"
// StatusFailed represents that the command failed.
StatusFailure = "Failed"
// StatusNotSupported represents that the command is not supported.
StatusNotSupported = "Not supported"
)
// FlexVolumeDriverStatus represents the return value of the driver callout.
type FlexVolumeDriverStatus struct {
// Status of the callout. One of "Success" or "Failure".
Status string
// Message is the reason for failure.
Message string
// Device assigned by the driver.
Device string `json:"device"`
}
// flexVolumeUtil is the utility structure to setup and teardown devices from
// the host.
type flexVolumeUtil struct{}
// isCmdNotSupportedErr checks if the error corresponds to command not supported by
// driver.
func isCmdNotSupportedErr(err error) bool {
if err.Error() == StatusNotSupported {
return true
}
return false
}
// handleCmdResponse processes the command output and returns the appropriate
// error code or message.
func handleCmdResponse(cmd string, output []byte) (*FlexVolumeDriverStatus, error) {
var status FlexVolumeDriverStatus
if err := json.Unmarshal(output, &status); err != nil {
glog.Errorf("Failed to unmarshal output for command: %s, output: %s, error: %s", cmd, output, err.Error())
return nil, err
} else if status.Status == StatusNotSupported {
glog.V(5).Infof("%s command is not supported by the driver", cmd)
return nil, errors.New(status.Status)
} else if status.Status != StatusSuccess {
errMsg := fmt.Sprintf("%s command failed, status: %s, reason: %s", cmd, status.Status, status.Message)
glog.Errorf(errMsg)
return nil, fmt.Errorf("%s", errMsg)
}
return &status, nil
}
// init initializes the plugin.
func (u *flexVolumeUtil) init(plugin *flexVolumePlugin) error {
// call the init script
output, err := exec.New().Command(plugin.getExecutable(), initCmd).CombinedOutput()
if err != nil {
glog.Errorf("Failed to init driver: %s, error: %s", plugin.driverName, err.Error())
_, err := handleCmdResponse(initCmd, output)
return err
}
glog.V(5).Infof("Successfully initialized driver %s", plugin.driverName)
return nil
}
// Attach exposes a volume on the host.
func (u *flexVolumeUtil) attach(f *flexVolumeMounter) (string, error) {
execPath := f.execPath
var options string
if f.options != nil {
out, err := json.Marshal(f.options)
if err != nil {
glog.Errorf("Failed to marshal plugin options, error: %s", err.Error())
return "", err
}
if len(out) != 0 {
options = string(out)
} else {
options = ""
}
}
cmd := f.runner.Command(execPath, attachCmd, options)
output, err := cmd.CombinedOutput()
if err != nil {
glog.Errorf("Failed to attach volume %s, output: %s, error: %s", f.volName, output, err.Error())
_, err := handleCmdResponse(attachCmd, output)
return "", err
}
status, err := handleCmdResponse(attachCmd, output)
if err != nil {
return "", err
}
glog.Infof("Successfully attached volume %s on device: %s", f.volName, status.Device)
return status.Device, nil
}
// Detach detaches a volume from the host.
func (u *flexVolumeUtil) detach(f *flexVolumeUnmounter, mntDevice string) error {
execPath := f.execPath
// Executable provider command.
cmd := f.runner.Command(execPath, detachCmd, mntDevice)
output, err := cmd.CombinedOutput()
if err != nil {
glog.Errorf("Failed to detach volume %s, output: %s, error: %s", f.volName, output, err.Error())
_, err := handleCmdResponse(detachCmd, output)
return err
}
_, err = handleCmdResponse(detachCmd, output)
if err != nil {
return err
}
glog.Infof("Successfully detached volume %s on device: %s", f.volName, mntDevice)
return nil
}
// Mount mounts the volume on the host.
func (u *flexVolumeUtil) mount(f *flexVolumeMounter, mntDevice, dir string) error {
execPath := f.execPath
var options string
if f.options != nil {
out, err := json.Marshal(f.options)
if err != nil {
glog.Errorf("Failed to marshal plugin options, error: %s", err.Error())
return err
}
if len(out) != 0 {
options = string(out)
} else {
options = ""
}
}
// Executable provider command.
cmd := f.runner.Command(execPath, mountCmd, dir, mntDevice, options)
output, err := cmd.CombinedOutput()
if err != nil {
glog.Errorf("Failed to mount volume %s, output: %s, error: %s", f.volName, output, err.Error())
_, err := handleCmdResponse(mountCmd, output)
return err
}
_, err = handleCmdResponse(mountCmd, output)
if err != nil {
return err
}
glog.Infof("Successfully mounted volume %s on dir: %s", f.volName, dir)
return nil
}
// Unmount unmounts the volume on the host.
func (u *flexVolumeUtil) unmount(f *flexVolumeUnmounter, dir string) error {
execPath := f.execPath
// Executable provider command.
cmd := f.runner.Command(execPath, unmountCmd, dir)
output, err := cmd.CombinedOutput()
if err != nil {
glog.Errorf("Failed to unmount volume %s, output: %s, error: %s", f.volName, output, err.Error())
_, err := handleCmdResponse(unmountCmd, output)
return err
}
_, err = handleCmdResponse(unmountCmd, output)
if err != nil {
return err
}
glog.Infof("Successfully unmounted volume %s on dir: %s", f.volName, dir)
return nil
}

View File

@ -0,0 +1,61 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"fmt"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/volume"
)
type mounterDefaults flexVolumeMounter
// SetUpAt is part of the volume.Mounter interface.
// This implementation relies on the attacher's device mount path and does a bind mount to dir.
func (f *mounterDefaults) SetUpAt(dir string, fsGroup *int64) error {
glog.Warning(logPrefix(f.plugin), "using default SetUpAt to ", dir)
a, err := f.plugin.NewAttacher()
if err != nil {
return fmt.Errorf("NewAttacher failed: %v", err)
}
src, err := a.GetDeviceMountPath(f.spec)
if err != nil {
return fmt.Errorf("GetDeviceMountPath failed: %v", err)
}
if err := doMount(f.mounter, src, dir, "auto", []string{"bind"}); err != nil {
return err
}
return nil
}
// Returns the default volume attributes.
func (f *mounterDefaults) GetAttributes() volume.Attributes {
glog.V(5).Infof(logPrefix(f.plugin), "using default GetAttributes")
return volume.Attributes{
ReadOnly: f.readOnly,
Managed: !f.readOnly,
SupportsSELinux: true,
}
}
func (f *mounterDefaults) CanMount() error {
return nil
}

View File

@ -0,0 +1,104 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"strconv"
"k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
)
// FlexVolumeMounter is the disk that will be exposed by this plugin.
type flexVolumeMounter struct {
*flexVolume
// Runner used to setup the volume.
runner exec.Interface
// blockDeviceMounter provides the interface to create filesystem if the
// filesystem doesn't exist.
blockDeviceMounter mount.Interface
// the considered volume spec
spec *volume.Spec
readOnly bool
volume.MetricsNil
}
var _ volume.Mounter = &flexVolumeMounter{}
// Mounter interface
// SetUp creates new directory.
func (f *flexVolumeMounter) SetUp(fsGroup *int64) error {
return f.SetUpAt(f.GetPath(), fsGroup)
}
// SetUpAt creates new directory.
func (f *flexVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
// Mount only once.
alreadyMounted, err := prepareForMount(f.mounter, dir)
if err != nil {
return err
}
if alreadyMounted {
return nil
}
call := f.plugin.NewDriverCall(mountCmd)
// Interface parameters
call.Append(dir)
extraOptions := make(map[string]string)
// Extract secret and pass it as options.
if err := addSecretsToOptions(extraOptions, f.spec, f.podNamespace, f.driverName, f.plugin.host); err != nil {
return err
}
// Implicit parameters
if fsGroup != nil {
extraOptions[optionFSGroup] = strconv.FormatInt(*fsGroup, 10)
}
call.AppendSpec(f.spec, f.plugin.host, extraOptions)
_, err = call.Run()
if isCmdNotSupportedErr(err) {
err = (*mounterDefaults)(f).SetUpAt(dir, fsGroup)
}
if err != nil {
return err
}
if !f.readOnly {
volume.SetVolumeOwnership(f, fsGroup)
}
return nil
}
// GetAttributes get the flex volume attributes. The attributes will be queried
// using plugin callout after we finalize the callout syntax.
func (f *flexVolumeMounter) GetAttributes() volume.Attributes {
return (*mounterDefaults)(f).GetAttributes()
}
func (f *flexVolumeMounter) CanMount() error {
return nil
}

View File

@ -0,0 +1,52 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"testing"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/util/mount"
)
func TestSetUpAt(t *testing.T) {
spec := fakeVolumeSpec()
pod := &v1.Pod{}
mounter := &mount.FakeMounter{}
plugin, rootDir := testPlugin()
plugin.unsupportedCommands = []string{"unsupportedCmd"}
plugin.runner = fakeRunner(
// first call without fsGroup
assertDriverCall(t, successOutput(), mountCmd, rootDir+"/mount-dir",
specJson(plugin, spec, nil)),
// second test has fsGroup
assertDriverCall(t, notSupportedOutput(), mountCmd, rootDir+"/mount-dir",
specJson(plugin, spec, map[string]string{
optionFSGroup: "42",
})),
assertDriverCall(t, fakeVolumeNameOutput("sdx"), getVolumeNameCmd,
specJson(plugin, spec, nil)),
)
m, _ := plugin.newMounterInternal(spec, pod, mounter, plugin.runner)
m.SetUpAt(rootDir+"/mount-dir", nil)
fsGroup := int64(42)
m.SetUpAt(rootDir+"/mount-dir", &fsGroup)
}

View File

@ -0,0 +1,34 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/volume"
)
type pluginDefaults flexVolumePlugin
func logPrefix(plugin *flexVolumePlugin) string {
return "flexVolume driver " + plugin.driverName + ": "
}
func (plugin *pluginDefaults) GetVolumeName(spec *volume.Spec) (string, error) {
glog.Warning(logPrefix((*flexVolumePlugin)(plugin)), "using default GetVolumeName for volume ", spec.Name)
return spec.Name(), nil
}

View File

@ -0,0 +1,191 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"path"
"strings"
"sync"
"k8s.io/apimachinery/pkg/types"
api "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/mount"
utilstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
)
const flexVolumePluginName = "kubernetes.io/flexvolume"
// FlexVolumePlugin object.
type flexVolumePlugin struct {
driverName string
execPath string
host volume.VolumeHost
runner exec.Interface
sync.Mutex
unsupportedCommands []string
}
var _ volume.AttachableVolumePlugin = &flexVolumePlugin{}
var _ volume.PersistentVolumePlugin = &flexVolumePlugin{}
// Init is part of the volume.VolumePlugin interface.
func (plugin *flexVolumePlugin) Init(host volume.VolumeHost) error {
plugin.host = host
// call the init script
call := plugin.NewDriverCall(initCmd)
_, err := call.Run()
return err
}
func (plugin *flexVolumePlugin) getExecutable() string {
parts := strings.Split(plugin.driverName, "/")
execName := parts[len(parts)-1]
return path.Join(plugin.execPath, execName)
}
// Name is part of the volume.VolumePlugin interface.
func (plugin *flexVolumePlugin) GetPluginName() string {
return plugin.driverName
}
// GetVolumeName is part of the volume.VolumePlugin interface.
func (plugin *flexVolumePlugin) GetVolumeName(spec *volume.Spec) (string, error) {
call := plugin.NewDriverCall(getVolumeNameCmd)
call.AppendSpec(spec, plugin.host, nil)
status, err := call.Run()
if isCmdNotSupportedErr(err) {
return (*pluginDefaults)(plugin).GetVolumeName(spec)
} else if err != nil {
return "", err
}
return utilstrings.EscapeQualifiedNameForDisk(status.VolumeName), nil
}
// CanSupport is part of the volume.VolumePlugin interface.
func (plugin *flexVolumePlugin) CanSupport(spec *volume.Spec) bool {
source, _ := getVolumeSource(spec)
return (source != nil) && (source.Driver == plugin.driverName)
}
// RequiresRemount is part of the volume.VolumePlugin interface.
func (plugin *flexVolumePlugin) RequiresRemount() bool {
return false
}
// GetAccessModes gets the allowed access modes for this plugin.
func (plugin *flexVolumePlugin) GetAccessModes() []api.PersistentVolumeAccessMode {
return []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
api.ReadOnlyMany,
}
}
// NewMounter is part of the volume.VolumePlugin interface.
func (plugin *flexVolumePlugin) NewMounter(spec *volume.Spec, pod *api.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
return plugin.newMounterInternal(spec, pod, plugin.host.GetMounter(), plugin.runner)
}
// newMounterInternal is the internal mounter routine to build the volume.
func (plugin *flexVolumePlugin) newMounterInternal(spec *volume.Spec, pod *api.Pod, mounter mount.Interface, runner exec.Interface) (volume.Mounter, error) {
source, readOnly := getVolumeSource(spec)
return &flexVolumeMounter{
flexVolume: &flexVolume{
driverName: source.Driver,
execPath: plugin.getExecutable(),
mounter: mounter,
plugin: plugin,
podUID: pod.UID,
podNamespace: pod.Namespace,
volName: spec.Name(),
},
runner: runner,
spec: spec,
readOnly: readOnly,
blockDeviceMounter: &mount.SafeFormatAndMount{Interface: mounter, Runner: runner},
}, nil
}
// NewUnmounter is part of the volume.VolumePlugin interface.
func (plugin *flexVolumePlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
return plugin.newUnmounterInternal(volName, podUID, plugin.host.GetMounter(), plugin.runner)
}
// newUnmounterInternal is the internal unmounter routine to clean the volume.
func (plugin *flexVolumePlugin) newUnmounterInternal(volName string, podUID types.UID, mounter mount.Interface, runner exec.Interface) (volume.Unmounter, error) {
return &flexVolumeUnmounter{
flexVolume: &flexVolume{
driverName: plugin.driverName,
execPath: plugin.getExecutable(),
mounter: mounter,
plugin: plugin,
podUID: podUID,
volName: volName,
},
runner: runner,
}, nil
}
// NewAttacher is part of the volume.AttachableVolumePlugin interface.
func (plugin *flexVolumePlugin) NewAttacher() (volume.Attacher, error) {
return &flexVolumeAttacher{plugin}, nil
}
// NewDetacher is part of the volume.AttachableVolumePlugin interface.
func (plugin *flexVolumePlugin) NewDetacher() (volume.Detacher, error) {
return &flexVolumeDetacher{plugin}, nil
}
// ConstructVolumeSpec is part of the volume.AttachableVolumePlugin interface.
func (plugin *flexVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
flexVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
FlexVolume: &api.FlexVolumeSource{
Driver: plugin.driverName,
},
},
}
return volume.NewSpecFromVolume(flexVolume), nil
}
// Mark the given commands as unsupported.
func (plugin *flexVolumePlugin) unsupported(commands ...string) {
plugin.Lock()
defer plugin.Unlock()
plugin.unsupportedCommands = append(plugin.unsupportedCommands, commands...)
}
// Returns true iff the given command is known to be unsupported.
func (plugin *flexVolumePlugin) isUnsupported(command string) bool {
plugin.Lock()
defer plugin.Unlock()
for _, unsupportedCommand := range plugin.unsupportedCommands {
if command == unsupportedCommand {
return true
}
}
return false
}
func (plugin *flexVolumePlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
mounter := plugin.host.GetMounter()
return mount.GetMountRefs(mounter, deviceMountPath)
}

View File

@ -0,0 +1,56 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"testing"
"k8s.io/kubernetes/pkg/util/exec"
)
func TestInit(t *testing.T) {
plugin, _ := testPlugin()
plugin.runner = fakeRunner(
assertDriverCall(t, successOutput(), "init"),
)
plugin.Init(plugin.host)
}
func fakeVolumeNameOutput(name string) exec.FakeCombinedOutputAction {
return fakeResultOutput(&DriverStatus{
Status: StatusSuccess,
VolumeName: name,
})
}
func TestGetVolumeName(t *testing.T) {
spec := fakeVolumeSpec()
plugin, _ := testPlugin()
plugin.runner = fakeRunner(
assertDriverCall(t, fakeVolumeNameOutput("/dev/sdx"), getVolumeNameCmd,
specJson(plugin, spec, nil)),
)
name, err := plugin.GetVolumeName(spec)
if err != nil {
t.Errorf("GetVolumeName() failed: %v", err)
}
expectedName := "~dev~sdx"
if name != expectedName {
t.Errorf("GetVolumeName() returned %v instead of %v", name, expectedName)
}
}

View File

@ -0,0 +1,50 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"io/ioutil"
"path"
"k8s.io/kubernetes/pkg/util/exec"
utilstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
)
// This is the primary entrypoint for volume plugins.
func ProbeVolumePlugins(pluginDir string) []volume.VolumePlugin {
plugins := []volume.VolumePlugin{}
files, _ := ioutil.ReadDir(pluginDir)
for _, f := range files {
// only directories are counted as plugins
// and pluginDir/dirname/dirname should be an executable
// unless dirname contains '~' for escaping namespace
// e.g. dirname = vendor~cifs
// then, executable will be pluginDir/dirname/cifs
if f.IsDir() {
execPath := path.Join(pluginDir, f.Name())
plugins = append(plugins, &flexVolumePlugin{
driverName: utilstrings.UnescapePluginName(f.Name()),
execPath: execPath,
runner: exec.New(),
unsupportedCommands: []string{},
})
}
}
return plugins
}

View File

@ -0,0 +1,29 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/volume/util"
)
type unmounterDefaults flexVolumeUnmounter
func (f *unmounterDefaults) TearDownAt(dir string) error {
glog.Warning(logPrefix(f.plugin), "using default TearDownAt for ", dir)
return util.UnmountPath(dir, f.mounter)
}

View File

@ -0,0 +1,79 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"fmt"
"os"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
)
// FlexVolumeUnmounter is the disk that will be cleaned by this plugin.
type flexVolumeUnmounter struct {
*flexVolume
// Runner used to teardown the volume.
runner exec.Interface
volume.MetricsNil
}
var _ volume.Unmounter = &flexVolumeUnmounter{}
// Unmounter interface
func (f *flexVolumeUnmounter) TearDown() error {
path := f.GetPath()
return f.TearDownAt(path)
}
func (f *flexVolumeUnmounter) TearDownAt(dir string) error {
if pathExists, pathErr := util.PathExists(dir); pathErr != nil {
return fmt.Errorf("Error checking if path exists: %v", pathErr)
} else if !pathExists {
glog.Warningf("Warning: Unmount skipped because path does not exist: %v", dir)
return nil
}
notmnt, err := isNotMounted(f.mounter, dir)
if err != nil {
return err
}
if notmnt {
glog.Warningf("Warning: Path: %v already unmounted", dir)
} else {
call := f.plugin.NewDriverCall(unmountCmd)
call.Append(dir)
_, err := call.Run()
if isCmdNotSupportedErr(err) {
err = (*unmounterDefaults)(f).TearDownAt(dir)
}
if err != nil {
return err
}
}
// Flexvolume driver may remove the directory. Ignore if it does.
if pathExists, pathErr := util.PathExists(dir); pathErr != nil {
return fmt.Errorf("Error checking if path exists: %v", pathErr)
} else if !pathExists {
return nil
}
return os.Remove(dir)
}

View File

@ -0,0 +1,37 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"testing"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/util/mount"
)
func TestTearDownAt(t *testing.T) {
mounter := &mount.FakeMounter{}
plugin, rootDir := testPlugin()
plugin.runner = fakeRunner(
assertDriverCall(t, notSupportedOutput(), unmountCmd,
rootDir+"/mount-dir"),
)
u, _ := plugin.newUnmounterInternal("volName", types.UID("poduid"), mounter, plugin.runner)
u.TearDownAt(rootDir + "/mount-dir")
}

View File

@ -0,0 +1,101 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"encoding/base64"
"fmt"
"os"
"github.com/golang/glog"
api "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
)
func addSecretsToOptions(options map[string]string, spec *volume.Spec, namespace string, driverName string, host volume.VolumeHost) error {
fv, _ := getVolumeSource(spec)
if fv.SecretRef == nil {
return nil
}
kubeClient := host.GetKubeClient()
if kubeClient == nil {
return fmt.Errorf("Cannot get kube client")
}
secrets, err := util.GetSecretForPV(namespace, fv.SecretRef.Name, driverName, host.GetKubeClient())
if err != nil {
err = fmt.Errorf("Couldn't get secret %v/%v err: %v", namespace, fv.SecretRef.Name, err)
return err
}
for name, data := range secrets {
options[optionKeySecret+"/"+name] = base64.StdEncoding.EncodeToString([]byte(data))
glog.V(1).Infof("found flex volume secret info: %s", name)
}
return nil
}
func getVolumeSource(spec *volume.Spec) (volumeSource *api.FlexVolumeSource, readOnly bool) {
if spec.Volume != nil && spec.Volume.FlexVolume != nil {
volumeSource = spec.Volume.FlexVolume
readOnly = volumeSource.ReadOnly
} else if spec.PersistentVolume != nil {
volumeSource = spec.PersistentVolume.Spec.FlexVolume
readOnly = spec.ReadOnly
}
return
}
func prepareForMount(mounter mount.Interface, deviceMountPath string) (bool, error) {
notMnt, err := mounter.IsLikelyNotMountPoint(deviceMountPath)
if err != nil {
if os.IsNotExist(err) {
if err := os.MkdirAll(deviceMountPath, 0750); err != nil {
return false, err
}
notMnt = true
} else {
return false, err
}
}
return !notMnt, nil
}
// Mounts the device at the given path.
// It is expected that prepareForMount has been called before.
func doMount(mounter mount.Interface, devicePath, deviceMountPath, fsType string, options []string) error {
err := mounter.Mount(devicePath, deviceMountPath, fsType, options)
if err != nil {
glog.Errorf("Failed to mount the volume at %s, device: %s, error: %s", deviceMountPath, devicePath, err.Error())
return err
}
return nil
}
func isNotMounted(mounter mount.Interface, deviceMountPath string) (bool, error) {
notmnt, err := mounter.IsLikelyNotMountPoint(deviceMountPath)
if err != nil {
glog.Errorf("Error checking mount point %s, error: %v", deviceMountPath, err)
return false, err
}
return notmnt, nil
}

View File

@ -0,0 +1,48 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/util/mount"
utilstrings "k8s.io/kubernetes/pkg/util/strings"
)
type flexVolume struct {
// driverName is the name of the plugin driverName.
driverName string
// Driver executable used to setup the volume.
execPath string
// mounter provides the interface that is used to mount the actual
// block device.
mounter mount.Interface
// podUID is the UID of the pod.
podUID types.UID
// podNamespace is the namespace of the pod.
podNamespace string
// volName is the name of the pod's volume.
volName string
// the underlying plugin
plugin *flexVolumePlugin
}
// volume.Volume interface
func (f *flexVolume) GetPath() string {
name := f.driverName
return f.plugin.host.GetPodVolumeDir(f.podUID, utilstrings.EscapeQualifiedNameForDisk(name), f.volName)
}