Merge pull request #54529 from vladimirvivien/k8s-csi-intree-plugin

Automatic merge from submit-queue (batch tested with PRs 54529, 53765). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Kubernetes CSI - in-tree Plugin Implementation

**What this PR does / why we need it**:
This PR is part of the internal Kubernetes CSI Volume plugin.  It implements the Attach/Detach/Mount/Unmount API.

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: https://github.com/kubernetes/features/issues/178

**Special notes for your reviewer**:
- Implements feature https://github.com/kubernetes/features/issues/178
- Designed https://github.com/kubernetes/community/pull/1258

Other CSI Volume plugin PRs
- CSI Persistent Volume Source - https://github.com/kubernetes/kubernetes/pull/55204

**Release note**:
```release-note
NONE
```
pull/6/head
Kubernetes Submit Queue 2017-11-21 21:38:56 -08:00 committed by GitHub
commit 1bdb61f828
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 5090 additions and 2 deletions

6
Godeps/Godeps.json generated
View File

@ -436,6 +436,10 @@
"Comment": "v0.1.0-62-g8d75e11",
"Rev": "8d75e11374a1928608c906fe745b538483e7aeb2"
},
{
"ImportPath": "github.com/container-storage-interface/spec/lib/go/csi",
"Rev": "ec298903f94e1d6d954de121b28044a2e1fdbf48"
},
{
"ImportPath": "github.com/containernetworking/cni/libcni",
"Comment": "v0.6.0",
@ -2266,6 +2270,7 @@
},
{
"ImportPath": "github.com/pelletier/go-buffruneio",
"Comment": "v0.1.0",
"Rev": "df1e16fde7fc330a0ca68167c23bf7ed6ac31d6d"
},
{
@ -2584,6 +2589,7 @@
},
{
"ImportPath": "github.com/xiang90/probing",
"Comment": "0.0.1",
"Rev": "07dd2e8dfe18522e9c447ba95f2fe95262f63bb2"
},
{

209
Godeps/LICENSES generated
View File

@ -12333,6 +12333,215 @@ SOFTWARE.
================================================================================
================================================================================
= vendor/github.com/container-storage-interface/spec/lib/go/csi licensed under: =
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright {yyyy} {name of copyright owner}
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
= vendor/github.com/container-storage-interface/spec/LICENSE e3fc50a88d0a364313df4b21ef20c29e
================================================================================
================================================================================
= vendor/github.com/containernetworking/cni/libcni licensed under: =

View File

@ -90,6 +90,7 @@ go_library(
"//pkg/volume/azure_dd:go_default_library",
"//pkg/volume/azure_file:go_default_library",
"//pkg/volume/cinder:go_default_library",
"//pkg/volume/csi:go_default_library",
"//pkg/volume/fc:go_default_library",
"//pkg/volume/flexvolume:go_default_library",
"//pkg/volume/flocker:go_default_library",

View File

@ -41,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/volume/azure_dd"
"k8s.io/kubernetes/pkg/volume/azure_file"
"k8s.io/kubernetes/pkg/volume/cinder"
"k8s.io/kubernetes/pkg/volume/csi"
"k8s.io/kubernetes/pkg/volume/fc"
"k8s.io/kubernetes/pkg/volume/flexvolume"
"k8s.io/kubernetes/pkg/volume/flocker"
@ -58,6 +59,9 @@ import (
"k8s.io/kubernetes/pkg/volume/storageos"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/vsphere_volume"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
)
// ProbeAttachableVolumePlugins collects all volume plugins for the attach/
@ -79,6 +83,9 @@ func ProbeAttachableVolumePlugins() []volume.VolumePlugin {
allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...)
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...)
}
return allPlugins
}
@ -105,6 +112,9 @@ func ProbeExpandableVolumePlugins(config componentconfig.VolumeConfiguration) []
allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, storageos.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...)
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...)
}
return allPlugins
}

View File

@ -77,6 +77,7 @@ go_library(
"//pkg/volume/cephfs:go_default_library",
"//pkg/volume/cinder:go_default_library",
"//pkg/volume/configmap:go_default_library",
"//pkg/volume/csi:go_default_library",
"//pkg/volume/downwardapi:go_default_library",
"//pkg/volume/empty_dir:go_default_library",
"//pkg/volume/fc:go_default_library",

View File

@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/volume/cephfs"
"k8s.io/kubernetes/pkg/volume/cinder"
"k8s.io/kubernetes/pkg/volume/configmap"
"k8s.io/kubernetes/pkg/volume/csi"
"k8s.io/kubernetes/pkg/volume/downwardapi"
"k8s.io/kubernetes/pkg/volume/empty_dir"
"k8s.io/kubernetes/pkg/volume/fc"
@ -58,6 +59,9 @@ import (
"k8s.io/kubernetes/pkg/volume/vsphere_volume"
// Cloud providers
_ "k8s.io/kubernetes/pkg/cloudprovider/providers"
// features check
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
)
// ProbeVolumePlugins collects all volume plugins into an easy to use list.
@ -96,6 +100,9 @@ func ProbeVolumePlugins() []volume.VolumePlugin {
allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, local.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, storageos.ProbeVolumePlugins()...)
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...)
}
return allPlugins
}

View File

@ -1,9 +1,10 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"csi_attacher.go",
"csi_client.go",
"csi_mounter.go",
"csi_plugin.go",
],
@ -13,8 +14,45 @@ go_library(
"//pkg/util/mount:go_default_library",
"//pkg/util/strings:go_default_library",
"//pkg/volume:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/storage/v1alpha1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"csi_attacher_test.go",
"csi_client_test.go",
"csi_mounter_test.go",
"csi_plugin_test.go",
],
importpath = "k8s.io/kubernetes/pkg/volume/csi",
library = ":go_default_library",
deps = [
"//pkg/util/strings:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/csi/fake:go_default_library",
"//pkg/volume/testing:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/storage/v1alpha1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/util/testing:go_default_library",
],
)
@ -27,7 +65,10 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//pkg/volume/csi/fake:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

4
pkg/volume/csi/OWNERS Normal file
View File

@ -0,0 +1,4 @@
approvers:
- jsafrane
- saad-ali
- vladimirvivien

View File

@ -0,0 +1,268 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"crypto/sha256"
"errors"
"fmt"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1alpha1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/volume"
)
type csiAttacher struct {
plugin *csiPlugin
k8s kubernetes.Interface
waitSleepTime time.Duration
}
// volume.Attacher methods
var _ volume.Attacher = &csiAttacher{}
func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
if spec == nil {
glog.Error(log("attacher.Attach missing volume.Spec"))
return "", errors.New("missing spec")
}
csiSource, err := getCSISourceFromSpec(spec)
if err != nil {
glog.Error(log("attacher.Attach failed to get CSI persistent source: %v", err))
return "", errors.New("missing CSI persistent volume")
}
pvName := spec.PersistentVolume.GetName()
attachID := getAttachmentName(csiSource.VolumeHandle, string(nodeName))
attachment := &storage.VolumeAttachment{
ObjectMeta: meta.ObjectMeta{
Name: attachID,
},
Spec: storage.VolumeAttachmentSpec{
NodeName: string(nodeName),
Attacher: csiPluginName,
Source: storage.VolumeAttachmentSource{
PersistentVolumeName: &pvName,
},
},
Status: storage.VolumeAttachmentStatus{Attached: false},
}
attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment)
alreadyExist := false
if err != nil {
if !apierrs.IsAlreadyExists(err) {
glog.Error(log("attacher.Attach failed: %v", err))
return "", err
}
alreadyExist = true
}
if alreadyExist {
glog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attach.GetName(), csiSource.VolumeHandle))
} else {
glog.V(4).Info(log("attachment [%v] for volume [%v] created successfully, will start probing for updates", attach.GetName(), csiSource.VolumeHandle))
}
// probe for attachment update here
// NOTE: any error from waiting for attachment is logged only. This is because
// the primariy intent of the enclosing method is to create VolumeAttachment.
// DONOT return that error here as it is mitigated in attacher.WaitForAttach.
if _, err := c.waitForVolumeAttachment(csiSource.VolumeHandle, attachID, csiTimeout); err != nil {
glog.Error(log("attacher.Attach encountered error during attachment probing: %v", err))
}
return attachID, nil
}
func (c *csiAttacher) WaitForAttach(spec *volume.Spec, attachID string, pod *v1.Pod, timeout time.Duration) (string, error) {
source, err := getCSISourceFromSpec(spec)
if err != nil {
glog.Error(log("attacher.WaitForAttach failed to extract CSI volume source: %v", err))
return "", err
}
return c.waitForVolumeAttachment(source.VolumeHandle, attachID, timeout)
}
func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, timeout time.Duration) (string, error) {
glog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID))
ticker := time.NewTicker(c.waitSleepTime)
defer ticker.Stop()
timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable
defer timer.Stop()
//TODO (vladimirvivien) instead of polling api-server, change to a api-server watch
for {
select {
case <-ticker.C:
glog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
glog.Error(log("attacher.WaitForAttach failed (will continue to try): %v", err))
continue
}
// if being deleted, fail fast
if attach.GetDeletionTimestamp() != nil {
glog.Error(log("VolumeAttachment [%s] has deletion timestamp, will not continue to wait for attachment", attachID))
return "", errors.New("volume attachment is being deleted")
}
// attachment OK
if attach.Status.Attached {
return attachID, nil
}
// driver reports attach error
attachErr := attach.Status.AttachError
if attachErr != nil {
glog.Error(log("attachment for %v failed: %v", volumeHandle, attachErr.Message))
return "", errors.New(attachErr.Message)
}
case <-timer.C:
glog.Error(log("attacher.WaitForAttach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
return "", fmt.Errorf("attachment timeout for volume %v", volumeHandle)
}
}
}
func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
glog.V(4).Info(log("probing attachment status for %d volumes ", len(specs)))
attached := make(map[*volume.Spec]bool)
for _, spec := range specs {
if spec == nil {
glog.Error(log("attacher.VolumesAreAttached missing volume.Spec"))
return nil, errors.New("missing spec")
}
source, err := getCSISourceFromSpec(spec)
if err != nil {
glog.Error(log("attacher.VolumesAreAttached failed: %v", err))
continue
}
attachID := getAttachmentName(source.VolumeHandle, string(nodeName))
glog.V(4).Info(log("probing attachment status for VolumeAttachment %v", attachID))
attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
glog.Error(log("attacher.VolumesAreAttached failed for attach.ID=%v: %v", attachID, err))
continue
}
attached[spec] = attach.Status.Attached
}
return attached, nil
}
func (c *csiAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
glog.V(4).Info(log("attacher.GetDeviceMountPath is not implemented"))
return "", nil
}
func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
glog.V(4).Info(log("attacher.MountDevice is not implemented"))
return nil
}
var _ volume.Detacher = &csiAttacher{}
func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {
// volumeName in format driverName<SEP>volumeHandle generated by plugin.GetVolumeName()
if volumeName == "" {
glog.Error(log("detacher.Detach missing value for parameter volumeName"))
return errors.New("missing exepected parameter volumeName")
}
parts := strings.Split(volumeName, volNameSep)
if len(parts) != 2 {
glog.Error(log("detacher.Detach insufficient info encoded in volumeName"))
return errors.New("volumeName missing expected data")
}
volID := parts[1]
attachID := getAttachmentName(volID, string(nodeName))
err := c.k8s.StorageV1alpha1().VolumeAttachments().Delete(attachID, nil)
if err != nil {
glog.Error(log("detacher.Detach failed to delete VolumeAttachment [%s]: %v", attachID, err))
return err
}
glog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID))
return c.waitForVolumeDetachment(volID, attachID)
}
func (c *csiAttacher) waitForVolumeDetachment(volumeHandle, attachID string) error {
glog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID))
ticker := time.NewTicker(c.waitSleepTime)
defer ticker.Stop()
timeout := c.waitSleepTime * 10
timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable
defer timer.Stop()
//TODO (vladimirvivien) instead of polling api-server, change to a api-server watch
for {
select {
case <-ticker.C:
glog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
attach, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
if apierrs.IsNotFound(err) {
//object deleted or never existed, done
glog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volumeHandle))
return nil
}
glog.Error(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
continue
}
// driver reports attach error
detachErr := attach.Status.DetachError
if detachErr != nil {
glog.Error(log("detachment for VolumeAttachment [%v] for volume [%s] failed: %v", attachID, volumeHandle, detachErr.Message))
return errors.New(detachErr.Message)
}
case <-timer.C:
glog.Error(log("detacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
return fmt.Errorf("detachment timed out for volume %v", volumeHandle)
}
}
}
func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
glog.V(4).Info(log("detacher.UnmountDevice is not implemented"))
return nil
}
func hashAttachmentName(volName, nodeName string) string {
result := sha256.Sum256([]byte(fmt.Sprintf("%s%s", volName, nodeName)))
return fmt.Sprintf("%x", result)
}
func getAttachmentName(volName, nodeName string) string {
// TODO consider using a different prefix for attachment
return fmt.Sprintf("pv-%s", hashAttachmentName(volName, nodeName))
}

View File

@ -0,0 +1,277 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"crypto/sha256"
"fmt"
"os"
"testing"
"time"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1alpha1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
)
func makeTestAttachment(attachID, nodeName, pvName string) *storage.VolumeAttachment {
return &storage.VolumeAttachment{
ObjectMeta: meta.ObjectMeta{
Name: attachID,
},
Spec: storage.VolumeAttachmentSpec{
NodeName: nodeName,
Attacher: csiPluginName,
Source: storage.VolumeAttachmentSource{
PersistentVolumeName: &pvName,
},
},
Status: storage.VolumeAttachmentStatus{
Attached: false,
AttachError: nil,
DetachError: nil,
},
}
}
func TestAttacherAttach(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
testCases := []struct {
name string
pv *v1.PersistentVolume
nodeName string
attachHash [32]byte
shouldFail bool
}{
{
name: "test ok 1",
pv: makeTestPV("test-pv-001", 10, testDriver, "test-vol-1"),
nodeName: "test-node",
attachHash: sha256.Sum256([]byte(fmt.Sprintf("%s%s", "test-vol-1", "test-node"))),
},
{
name: "test ok 2",
pv: makeTestPV("test-pv-002", 10, testDriver, "test-vol-002"),
nodeName: "test-node",
attachHash: sha256.Sum256([]byte(fmt.Sprintf("%s%s", "test-vol-002", "test-node"))),
},
{
name: "missing spec",
pv: nil,
nodeName: "test-node",
attachHash: sha256.Sum256([]byte(fmt.Sprintf("%s%s", "test-vol-3", "test-node"))),
shouldFail: true,
},
}
for _, tc := range testCases {
var spec *volume.Spec
if tc.pv != nil {
spec = volume.NewSpecFromPersistentVolume(tc.pv, tc.pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
}
attachID, err := csiAttacher.Attach(spec, types.NodeName(tc.nodeName))
if tc.shouldFail && err == nil {
t.Error("expected failure, but got nil err")
}
if attachID != "" {
expectedID := fmt.Sprintf("pv-%x", tc.attachHash)
if attachID != expectedID {
t.Errorf("expecting attachID %v, got %v", expectedID, attachID)
}
}
}
}
func TestAttacherWaitForVolumeAttachment(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
nodeName := "test-node"
testCases := []struct {
name string
attached bool
attachErr *storage.VolumeError
sleepTime time.Duration
timeout time.Duration
shouldFail bool
}{
{name: "attach ok", attached: true, sleepTime: 10 * time.Millisecond, timeout: 50 * time.Millisecond},
{name: "attachment error", attachErr: &storage.VolumeError{Message: "missing volume"}, sleepTime: 10 * time.Millisecond, timeout: 30 * time.Millisecond},
{name: "time ran out", attached: false, sleepTime: 5 * time.Millisecond},
}
for i, tc := range testCases {
t.Logf("running test: %v", tc.name)
pvName := fmt.Sprintf("test-pv-%d", i)
attachID := fmt.Sprintf("pv-%s", hashAttachmentName(pvName, nodeName))
attachment := makeTestAttachment(attachID, nodeName, pvName)
attachment.Status.Attached = tc.attached
attachment.Status.AttachError = tc.attachErr
csiAttacher.waitSleepTime = tc.sleepTime
go func() {
_, err := csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to attach: %v", err)
}
}()
retID, err := csiAttacher.waitForVolumeAttachment("test-vol", attachID, tc.timeout)
if tc.shouldFail && err == nil {
t.Error("expecting failure, but err is nil")
}
if tc.attachErr != nil {
if tc.attachErr.Message != err.Error() {
t.Errorf("expecting error [%v], got [%v]", tc.attachErr.Message, err.Error())
}
}
if err == nil && retID != attachID {
t.Errorf("attacher.WaitForAttach not returning attachment ID")
}
}
}
func TestAttacherVolumesAreAttached(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
nodeName := "test-node"
testCases := []struct {
name string
attachedStats map[string]bool
}{
{"attach + detach", map[string]bool{"vol-01": true, "vol-02": true, "vol-03": false, "vol-04": false, "vol-05": true}},
{"all detached", map[string]bool{"vol-11": false, "vol-12": false, "vol-13": false, "vol-14": false, "vol-15": false}},
{"all attached", map[string]bool{"vol-21": true, "vol-22": true, "vol-23": true, "vol-24": true, "vol-25": true}},
}
for _, tc := range testCases {
var specs []*volume.Spec
// create and save volume attchments
for volName, stat := range tc.attachedStats {
pv := makeTestPV("test-pv", 10, testDriver, volName)
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
specs = append(specs, spec)
attachID := getAttachmentName(volName, nodeName)
attachment := makeTestAttachment(attachID, nodeName, pv.GetName())
attachment.Status.Attached = stat
_, err := csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to attach: %v", err)
}
}
// retrieve attached status
stats, err := csiAttacher.VolumesAreAttached(specs, types.NodeName(nodeName))
if err != nil {
t.Fatal(err)
}
if len(tc.attachedStats) != len(stats) {
t.Errorf("expecting %d attachment status, got %d", len(tc.attachedStats), len(stats))
}
// compare attachment status for each spec
for spec, stat := range stats {
source, err := getCSISourceFromSpec(spec)
if err != nil {
t.Error(err)
}
if stat != tc.attachedStats[source.VolumeHandle] {
t.Errorf("expecting volume attachment %t, got %t", tc.attachedStats[source.VolumeHandle], stat)
}
}
}
}
func TestAttacherDetach(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
nodeName := "test-node"
testCases := []struct {
name string
volID string
attachID string
shouldFail bool
}{
{name: "normal test", volID: "vol-001", attachID: fmt.Sprintf("pv-%s", hashAttachmentName("vol-001", nodeName))},
{name: "normal test 2", volID: "vol-002", attachID: fmt.Sprintf("pv-%s", hashAttachmentName("vol-002", nodeName))},
{name: "object not found", volID: "vol-001", attachID: fmt.Sprintf("pv-%s", hashAttachmentName("vol-002", nodeName)), shouldFail: true},
}
for _, tc := range testCases {
pv := makeTestPV("test-pv", 10, testDriver, tc.volID)
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
attachment := makeTestAttachment(tc.attachID, nodeName, "test-pv")
_, err := csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to attach: %v", err)
}
volumeName, err := plug.GetVolumeName(spec)
if err != nil {
t.Errorf("test case %s failed: %v", tc.name, err)
}
err = csiAttacher.Detach(volumeName, types.NodeName(nodeName))
if tc.shouldFail && err == nil {
t.Fatal("expecting failure, but err = nil")
}
if !tc.shouldFail && err != nil {
t.Fatalf("unexpected err: %v", err)
}
attach, err := csiAttacher.k8s.StorageV1alpha1().VolumeAttachments().Get(tc.attachID, meta.GetOptions{})
if err != nil {
if !apierrs.IsNotFound(err) {
t.Fatalf("unexpected err: %v", err)
}
} else {
if attach == nil {
t.Errorf("expecting attachment not to be nil, but it is")
}
}
}
}

View File

@ -0,0 +1,233 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"bytes"
"errors"
"fmt"
"net"
"time"
csipb "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
grpctx "golang.org/x/net/context"
"google.golang.org/grpc"
api "k8s.io/api/core/v1"
)
type csiClient interface {
AssertSupportedVersion(ctx grpctx.Context, ver *csipb.Version) error
NodePublishVolume(
ctx grpctx.Context,
volumeid string,
readOnly bool,
targetPath string,
accessMode api.PersistentVolumeAccessMode,
volumeInfo map[string]string,
fsType string,
) error
NodeUnpublishVolume(ctx grpctx.Context, volID string, targetPath string) error
}
// csiClient encapsulates all csi-plugin methods
type csiDriverClient struct {
network string
addr string
conn *grpc.ClientConn
idClient csipb.IdentityClient
nodeClient csipb.NodeClient
ctrlClient csipb.ControllerClient
versionAsserted bool
versionSupported bool
publishAsserted bool
publishCapable bool
}
func newCsiDriverClient(network, addr string) *csiDriverClient {
return &csiDriverClient{network: network, addr: addr}
}
// assertConnection ensures a valid connection has been established
// if not, it creates a new connection and associated clients
func (c *csiDriverClient) assertConnection() error {
if c.conn == nil {
conn, err := grpc.Dial(
c.addr,
grpc.WithInsecure(),
grpc.WithDialer(func(target string, timeout time.Duration) (net.Conn, error) {
return net.Dial(c.network, target)
}),
)
if err != nil {
return err
}
c.conn = conn
c.idClient = csipb.NewIdentityClient(conn)
c.nodeClient = csipb.NewNodeClient(conn)
c.ctrlClient = csipb.NewControllerClient(conn)
// set supported version
}
return nil
}
// AssertSupportedVersion ensures driver supports specified spec version.
// If version is not supported, the assertion fails with an error.
// This test should be done early during the storage operation flow to avoid
// unnecessary calls later.
func (c *csiDriverClient) AssertSupportedVersion(ctx grpctx.Context, ver *csipb.Version) error {
if c.versionAsserted {
if !c.versionSupported {
return fmt.Errorf("version %s not supported", verToStr(ver))
}
return nil
}
if err := c.assertConnection(); err != nil {
c.versionAsserted = false
return err
}
glog.V(4).Info(log("asserting version supported by driver"))
rsp, err := c.idClient.GetSupportedVersions(ctx, &csipb.GetSupportedVersionsRequest{})
if err != nil {
c.versionAsserted = false
return err
}
supported := false
vers := rsp.GetSupportedVersions()
glog.V(4).Info(log("driver reports %d versions supported: %s", len(vers), versToStr(vers)))
for _, v := range vers {
//TODO (vladimirvivien) use more lenient/heuristic for exact or match of ranges etc
if verToStr(v) == verToStr(ver) {
supported = true
break
}
}
c.versionAsserted = true
c.versionSupported = supported
if !supported {
return fmt.Errorf("version %s not supported", verToStr(ver))
}
glog.V(4).Info(log("version %s supported", verToStr(ver)))
return nil
}
func (c *csiDriverClient) NodePublishVolume(
ctx grpctx.Context,
volID string,
readOnly bool,
targetPath string,
accessMode api.PersistentVolumeAccessMode,
volumeInfo map[string]string,
fsType string,
) error {
if volID == "" {
return errors.New("missing volume id")
}
if targetPath == "" {
return errors.New("missing target path")
}
if err := c.assertConnection(); err != nil {
glog.Errorf("%v: failed to assert a connection: %v", csiPluginName, err)
return err
}
req := &csipb.NodePublishVolumeRequest{
Version: csiVersion,
VolumeId: volID,
TargetPath: targetPath,
Readonly: readOnly,
PublishVolumeInfo: volumeInfo,
VolumeCapability: &csipb.VolumeCapability{
AccessMode: &csipb.VolumeCapability_AccessMode{
Mode: asCSIAccessMode(accessMode),
},
AccessType: &csipb.VolumeCapability_Mount{
Mount: &csipb.VolumeCapability_MountVolume{
FsType: fsType,
},
},
},
}
_, err := c.nodeClient.NodePublishVolume(ctx, req)
return err
}
func (c *csiDriverClient) NodeUnpublishVolume(ctx grpctx.Context, volID string, targetPath string) error {
if volID == "" {
return errors.New("missing volume id")
}
if targetPath == "" {
return errors.New("missing target path")
}
if err := c.assertConnection(); err != nil {
glog.Error(log("failed to assert a connection: %v", err))
return err
}
req := &csipb.NodeUnpublishVolumeRequest{
Version: csiVersion,
VolumeId: volID,
TargetPath: targetPath,
}
_, err := c.nodeClient.NodeUnpublishVolume(ctx, req)
return err
}
func asCSIAccessMode(am api.PersistentVolumeAccessMode) csipb.VolumeCapability_AccessMode_Mode {
switch am {
case api.ReadWriteOnce:
return csipb.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
case api.ReadOnlyMany:
return csipb.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER
case api.ReadWriteMany:
return csipb.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
}
return csipb.VolumeCapability_AccessMode_UNKNOWN
}
func verToStr(ver *csipb.Version) string {
if ver == nil {
return ""
}
return fmt.Sprintf("%d.%d.%d", ver.GetMajor(), ver.GetMinor(), ver.GetPatch())
}
func versToStr(vers []*csipb.Version) string {
if vers == nil {
return ""
}
str := bytes.NewBufferString("[")
for _, v := range vers {
str.WriteString(fmt.Sprintf("{%s};", verToStr(v)))
}
str.WriteString("]")
return str.String()
}

View File

@ -0,0 +1,126 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"errors"
"testing"
csipb "github.com/container-storage-interface/spec/lib/go/csi"
grpctx "golang.org/x/net/context"
"google.golang.org/grpc"
api "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/volume/csi/fake"
)
func setupClient(t *testing.T) *csiDriverClient {
client := newCsiDriverClient("unix", "/tmp/test.sock")
client.conn = new(grpc.ClientConn) //avoids creating conn object
// setup mock grpc clients
client.idClient = fake.NewIdentityClient()
client.nodeClient = fake.NewNodeClient()
client.ctrlClient = fake.NewControllerClient()
return client
}
func TestClientAssertSupportedVersion(t *testing.T) {
testCases := []struct {
testName string
ver *csipb.Version
mustFail bool
err error
}{
{testName: "supported version", ver: &csipb.Version{Major: 0, Minor: 1, Patch: 0}},
{testName: "unsupported version", ver: &csipb.Version{Major: 0, Minor: 0, Patch: 0}, mustFail: true},
{testName: "grpc error", ver: &csipb.Version{Major: 0, Minor: 1, Patch: 0}, mustFail: true, err: errors.New("grpc error")},
}
for _, tc := range testCases {
t.Log("case: ", tc.testName)
client := setupClient(t)
client.idClient.(*fake.IdentityClient).SetNextError(tc.err)
err := client.AssertSupportedVersion(grpctx.Background(), tc.ver)
if tc.mustFail && err == nil {
t.Error("must fail, but err = nil")
}
}
}
func TestClientNodePublishVolume(t *testing.T) {
testCases := []struct {
name string
volID string
targetPath string
fsType string
mustFail bool
err error
}{
{name: "test ok", volID: "vol-test", targetPath: "/test/path"},
{name: "missing volID", targetPath: "/test/path", mustFail: true},
{name: "missing target path", volID: "vol-test", mustFail: true},
{name: "bad fs", volID: "vol-test", targetPath: "/test/path", fsType: "badfs", mustFail: true},
{name: "grpc error", volID: "vol-test", targetPath: "/test/path", mustFail: true, err: errors.New("grpc error")},
}
client := setupClient(t)
for _, tc := range testCases {
t.Log("case: ", tc.name)
client.nodeClient.(*fake.NodeClient).SetNextError(tc.err)
err := client.NodePublishVolume(
grpctx.Background(),
tc.volID,
false,
tc.targetPath,
api.ReadWriteOnce,
map[string]string{"device": "/dev/null"},
tc.fsType,
)
if tc.mustFail && err == nil {
t.Error("must fail, but err is nil: ", err)
}
}
}
func TestClientNodeUnpublishVolume(t *testing.T) {
testCases := []struct {
name string
volID string
targetPath string
mustFail bool
err error
}{
{name: "test ok", volID: "vol-test", targetPath: "/test/path"},
{name: "missing volID", targetPath: "/test/path", mustFail: true},
{name: "missing target path", volID: "vol-test", mustFail: true},
{name: "grpc error", volID: "vol-test", targetPath: "/test/path", mustFail: true, err: errors.New("grpc error")},
}
client := setupClient(t)
for _, tc := range testCases {
t.Log("case: ", tc.name)
client.nodeClient.(*fake.NodeClient).SetNextError(tc.err)
err := client.NodeUnpublishVolume(grpctx.Background(), tc.volID, tc.targetPath)
if tc.mustFail && err == nil {
t.Error("must fail, but err is nil: ", err)
}
}
}

View File

@ -0,0 +1,194 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"errors"
"fmt"
"path"
"github.com/golang/glog"
grpctx "golang.org/x/net/context"
api "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1alpha1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
kstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
)
type csiMountMgr struct {
k8s kubernetes.Interface
csiClient csiClient
plugin *csiPlugin
driverName string
volumeID string
readOnly bool
spec *volume.Spec
pod *api.Pod
podUID types.UID
options volume.VolumeOptions
volumeInfo map[string]string
volume.MetricsNil
}
// volume.Volume methods
var _ volume.Volume = &csiMountMgr{}
func (c *csiMountMgr) GetPath() string {
return getTargetPath(c.podUID, c.driverName, c.volumeID, c.plugin.host)
}
func getTargetPath(uid types.UID, driverName string, volID string, host volume.VolumeHost) string {
// driverName validated at Mounter creation
// sanitize (replace / with ~) in volumeID before it's appended to path:w
driverPath := fmt.Sprintf("%s/%s", driverName, kstrings.EscapeQualifiedNameForDisk(volID))
return host.GetPodVolumeDir(uid, kstrings.EscapeQualifiedNameForDisk(csiPluginName), driverPath)
}
// volume.Mounter methods
var _ volume.Mounter = &csiMountMgr{}
func (c *csiMountMgr) CanMount() error {
//TODO (vladimirvivien) use this method to probe controller using CSI.NodeProbe() call
// to ensure Node service is ready in the CSI plugin
return nil
}
func (c *csiMountMgr) SetUp(fsGroup *int64) error {
return c.SetUpAt(c.GetPath(), fsGroup)
}
func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
glog.V(4).Infof(log("Mounter.SetUpAt(%s)", dir))
ctx, cancel := grpctx.WithTimeout(grpctx.Background(), csiTimeout)
defer cancel()
csi := c.csiClient
pvName := c.spec.PersistentVolume.GetName()
// ensure version is supported
if err := csi.AssertSupportedVersion(ctx, csiVersion); err != nil {
glog.Errorf(log("failed to assert version: %v", err))
return err
}
// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
if c.volumeInfo == nil {
//TODO (vladimirvivien) consider using VolumesAttachments().Get() to retrieve
//the object directly. This requires the ability to reconstruct the ID using volumeName+nodeName (nodename may not be avilable)
attachList, err := c.k8s.StorageV1alpha1().VolumeAttachments().List(meta.ListOptions{})
if err != nil {
glog.Error(log("failed to get volume attachments: %v", err))
return err
}
var attachment *storage.VolumeAttachment
for _, attach := range attachList.Items {
if attach.Spec.Source.PersistentVolumeName != nil &&
*attach.Spec.Source.PersistentVolumeName == pvName {
attachment = &attach
break
}
}
if attachment == nil {
glog.Error(log("unable to find VolumeAttachment with PV.name = %s", pvName))
return errors.New("no existing VolumeAttachment found")
}
c.volumeInfo = attachment.Status.AttachmentMetadata
}
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
accessMode := api.ReadWriteOnce
if c.spec.PersistentVolume.Spec.AccessModes != nil {
accessMode = c.spec.PersistentVolume.Spec.AccessModes[0]
}
err := csi.NodePublishVolume(
ctx,
c.volumeID,
c.readOnly,
dir,
accessMode,
c.volumeInfo,
"ext4", //TODO needs to be sourced from PV or somewhere else
)
if err != nil {
glog.Errorf(log("Mounter.Setup failed: %v", err))
return err
}
glog.V(4).Infof(log("successfully mounted %s", dir))
return nil
}
func (c *csiMountMgr) GetAttributes() volume.Attributes {
return volume.Attributes{
ReadOnly: c.readOnly,
Managed: !c.readOnly,
SupportsSELinux: false,
}
}
// volume.Unmounter methods
var _ volume.Unmounter = &csiMountMgr{}
func (c *csiMountMgr) TearDown() error {
return c.TearDownAt(c.GetPath())
}
func (c *csiMountMgr) TearDownAt(dir string) error {
glog.V(4).Infof(log("Unmounter.TearDown(%s)", dir))
// extract driverName and volID from path
base, volID := path.Split(dir)
volID = kstrings.UnescapeQualifiedNameForDisk(volID)
driverName := path.Base(base)
if c.csiClient == nil {
addr := fmt.Sprintf(csiAddrTemplate, driverName)
client := newCsiDriverClient("unix", addr)
glog.V(4).Infof(log("unmounter csiClient setup [volume=%v,driver=%v]", volID, driverName))
c.csiClient = client
}
ctx, cancel := grpctx.WithTimeout(grpctx.Background(), csiTimeout)
defer cancel()
csi := c.csiClient
// TODO make all assertion calls private within the client itself
if err := csi.AssertSupportedVersion(ctx, csiVersion); err != nil {
glog.Errorf(log("failed to assert version: %v", err))
return err
}
err := csi.NodeUnpublishVolume(ctx, volID, dir)
if err != nil {
glog.Errorf(log("Mounter.Setup failed: %v", err))
return err
}
glog.V(4).Infof(log("successfully unmounted %s", dir))
return nil
}

View File

@ -0,0 +1,152 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"fmt"
"os"
"path"
"testing"
api "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1alpha1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi/fake"
)
var (
testDriver = "test-driver"
testVol = "vol-123"
testns = "test-ns"
testPodUID = types.UID("test-pod")
)
func TestMounterGetPath(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
mounter, err := plug.NewMounter(
volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly),
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
volume.VolumeOptions{},
)
if err != nil {
t.Fatalf("Failed to make a new Mounter: %v", err)
}
csiMounter := mounter.(*csiMountMgr)
expectedPath := path.Join(tmpDir, fmt.Sprintf(
"pods/%s/volumes/kubernetes.io~csi/%s/%s",
testPodUID,
csiMounter.driverName,
csiMounter.volumeID,
))
mountPath := csiMounter.GetPath()
if mountPath != expectedPath {
t.Errorf("Got unexpected path: %s", mountPath)
}
}
func TestMounterSetUp(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
pvName := pv.GetName()
mounter, err := plug.NewMounter(
volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly),
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
volume.VolumeOptions{},
)
if err != nil {
t.Fatalf("Failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Fatal("failed to create CSI mounter")
}
csiMounter := mounter.(*csiMountMgr)
csiMounter.csiClient = setupClient(t)
attachment := &storage.VolumeAttachment{
ObjectMeta: meta.ObjectMeta{
Name: "pv-1234556775313",
},
Spec: storage.VolumeAttachmentSpec{
NodeName: "test-node",
Attacher: csiPluginName,
Source: storage.VolumeAttachmentSource{
PersistentVolumeName: &pvName,
},
},
Status: storage.VolumeAttachmentStatus{
Attached: false,
AttachError: nil,
DetachError: nil,
},
}
_, err = csiMounter.k8s.StorageV1alpha1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to setup VolumeAttachment: %v", err)
}
// Mounter.SetUp()
if err := csiMounter.SetUp(nil); err != nil {
t.Fatalf("mounter.Setup failed: %v", err)
}
// ensure call went all the way
pubs := csiMounter.csiClient.(*csiDriverClient).nodeClient.(*fake.NodeClient).GetNodePublishedVolumes()
if pubs[csiMounter.volumeID] != csiMounter.GetPath() {
t.Error("csi server may not have received NodePublishVolume call")
}
}
func TestUnmounterTeardown(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
unmounter, err := plug.NewUnmounter(pv.ObjectMeta.Name, testPodUID)
if err != nil {
t.Fatalf("failed to make a new Unmounter: %v", err)
}
csiUnmounter := unmounter.(*csiMountMgr)
csiUnmounter.csiClient = setupClient(t)
dir := csiUnmounter.GetPath()
err = csiUnmounter.TearDownAt(dir)
if err != nil {
t.Fatal(err)
}
// ensure csi client call
pubs := csiUnmounter.csiClient.(*csiDriverClient).nodeClient.(*fake.NodeClient).GetNodePublishedVolumes()
if _, ok := pubs[csiUnmounter.volumeID]; ok {
t.Error("csi server may not have received NodeUnpublishVolume call")
}
}

View File

@ -0,0 +1,257 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"errors"
"fmt"
"path"
"regexp"
"time"
csipb "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
api "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/util/mount"
kstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
)
const (
csiName = "csi"
csiPluginName = "kubernetes.io/csi"
// TODO (vladimirvivien) implement a more dynamic way to discover
// the unix domain socket path for each installed csi driver.
// TODO (vladimirvivien) would be nice to name socket with a .sock extension
// for consistency.
csiAddrTemplate = "/var/lib/kubelet/plugins/%v"
csiTimeout = 15 * time.Second
volNameSep = "^"
)
var (
// csiVersion supported csi version
csiVersion = &csipb.Version{Major: 0, Minor: 1, Patch: 0}
driverNameRexp = regexp.MustCompile(`^[A-Za-z]+(\.?-?_?[A-Za-z0-9-])+$`)
)
type csiPlugin struct {
host volume.VolumeHost
}
// ProbeVolumePlugins returns implemented plugins
func ProbeVolumePlugins() []volume.VolumePlugin {
p := &csiPlugin{
host: nil,
}
return []volume.VolumePlugin{p}
}
// volume.VolumePlugin methods
var _ volume.VolumePlugin = &csiPlugin{}
func (p *csiPlugin) Init(host volume.VolumeHost) error {
glog.Info(log("plugin initializing..."))
p.host = host
return nil
}
func (p *csiPlugin) GetPluginName() string {
return csiPluginName
}
// GetvolumeName returns a concatenated string of CSIVolumeSource.Driver<volNameSe>CSIVolumeSource.VolumeHandle
// That string value is used in Detach() to extract driver name and volumeName.
func (p *csiPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
csi, err := getCSISourceFromSpec(spec)
if err != nil {
glog.Error(log("plugin.GetVolumeName failed to extract volume source from spec: %v", err))
return "", err
}
//TODO (vladimirvivien) this validation should be done at the API validation check
if !isDriverNameValid(csi.Driver) {
glog.Error(log("plugin.GetVolumeName failed to create volume name: invalid csi driver name %s", csi.Driver))
return "", errors.New("invalid csi driver name")
}
// return driverName<separator>volumeHandle
return fmt.Sprintf("%s%s%s", csi.Driver, volNameSep, csi.VolumeHandle), nil
}
func (p *csiPlugin) CanSupport(spec *volume.Spec) bool {
// TODO (vladimirvivien) CanSupport should also take into account
// the availability/registration of specified Driver in the volume source
return spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil
}
func (p *csiPlugin) RequiresRemount() bool {
return false
}
func (p *csiPlugin) NewMounter(
spec *volume.Spec,
pod *api.Pod,
_ volume.VolumeOptions) (volume.Mounter, error) {
pvSource, err := getCSISourceFromSpec(spec)
if err != nil {
return nil, err
}
// TODO (vladimirvivien) consider moving this check in API validation
// check Driver name to conform to CSI spec
if !isDriverNameValid(pvSource.Driver) {
glog.Error(log("driver name does not conform to CSI spec: %s", pvSource.Driver))
return nil, errors.New("driver name is invalid")
}
// before it is used in any paths such as socket etc
addr := fmt.Sprintf(csiAddrTemplate, pvSource.Driver)
glog.V(4).Infof(log("setting up mounter for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver))
client := newCsiDriverClient("unix", addr)
k8s := p.host.GetKubeClient()
if k8s == nil {
glog.Error(log("failed to get a kubernetes client"))
return nil, errors.New("failed to get a Kubernetes client")
}
mounter := &csiMountMgr{
plugin: p,
k8s: k8s,
spec: spec,
pod: pod,
podUID: pod.UID,
driverName: pvSource.Driver,
volumeID: pvSource.VolumeHandle,
csiClient: client,
}
return mounter, nil
}
func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmounter, error) {
glog.V(4).Infof(log("setting up unmounter for [name=%v, podUID=%v]", specName, podUID))
unmounter := &csiMountMgr{
plugin: p,
podUID: podUID,
}
return unmounter, nil
}
func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
glog.V(4).Infof(log("constructing volume spec [pv.Name=%v, path=%v]", volumeName, mountPath))
// extract driverName/volumeId from end of mountPath
dir, volID := path.Split(mountPath)
volID = kstrings.UnescapeQualifiedNameForDisk(volID)
driverName := path.Base(dir)
// TODO (vladimirvivien) consider moving this check in API validation
if !isDriverNameValid(driverName) {
glog.Error(log("failed while reconstructing volume spec csi: driver name extracted from path is invalid: [path=%s; driverName=%s]", mountPath, driverName))
return nil, errors.New("invalid csi driver name from path")
}
glog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [volumeID=%s; driverName=%s]", volID, driverName))
pv := &api.PersistentVolume{
ObjectMeta: meta.ObjectMeta{
Name: volumeName,
},
Spec: api.PersistentVolumeSpec{
PersistentVolumeSource: api.PersistentVolumeSource{
CSI: &api.CSIPersistentVolumeSource{
Driver: driverName,
VolumeHandle: volID,
},
},
},
}
return volume.NewSpecFromPersistentVolume(pv, false), nil
}
func (p *csiPlugin) SupportsMountOption() bool {
// TODO (vladimirvivien) use CSI VolumeCapability.MountVolume.mount_flags
// to probe for the result for this method:w
return false
}
func (p *csiPlugin) SupportsBulkVolumeVerification() bool {
return false
}
// volume.AttachableVolumePlugin methods
var _ volume.AttachableVolumePlugin = &csiPlugin{}
func (p *csiPlugin) NewAttacher() (volume.Attacher, error) {
k8s := p.host.GetKubeClient()
if k8s == nil {
glog.Error(log("unable to get kubernetes client from host"))
return nil, errors.New("unable to get Kubernetes client")
}
return &csiAttacher{
plugin: p,
k8s: k8s,
waitSleepTime: 1 * time.Second,
}, nil
}
func (p *csiPlugin) NewDetacher() (volume.Detacher, error) {
k8s := p.host.GetKubeClient()
if k8s == nil {
glog.Error(log("unable to get kubernetes client from host"))
return nil, errors.New("unable to get Kubernetes client")
}
return &csiAttacher{
plugin: p,
k8s: k8s,
waitSleepTime: 1 * time.Second,
}, nil
}
func (p *csiPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
m := p.host.GetMounter(p.GetPluginName())
return mount.GetMountRefs(m, deviceMountPath)
}
func getCSISourceFromSpec(spec *volume.Spec) (*api.CSIPersistentVolumeSource, error) {
if spec.PersistentVolume != nil &&
spec.PersistentVolume.Spec.CSI != nil {
return spec.PersistentVolume.Spec.CSI, nil
}
return nil, fmt.Errorf("CSIPersistentVolumeSource not defined in spec")
}
// log prepends log string with `kubernetes.io/csi`
func log(msg string, parts ...interface{}) string {
return fmt.Sprintf(fmt.Sprintf("%s: %s", csiPluginName, msg), parts...)
}
// isDriverNameValid validates the driverName using CSI spec
func isDriverNameValid(name string) bool {
if len(name) == 0 || len(name) > 63 {
return false
}
return driverNameRexp.MatchString(name)
}

View File

@ -0,0 +1,297 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"fmt"
"os"
"testing"
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
fakeclient "k8s.io/client-go/kubernetes/fake"
utiltesting "k8s.io/client-go/util/testing"
kstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
)
// create a plugin mgr to load plugins and setup a fake client
func newTestPlugin(t *testing.T) (*csiPlugin, string) {
tmpDir, err := utiltesting.MkTmpdir("csi-test")
if err != nil {
t.Fatalf("can't create temp dir: %v", err)
}
fakeClient := fakeclient.NewSimpleClientset()
host := volumetest.NewFakeVolumeHost(
tmpDir,
fakeClient,
nil,
)
plugMgr := &volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)
plug, err := plugMgr.FindPluginByName(csiPluginName)
if err != nil {
t.Fatalf("can't find plugin %v", csiPluginName)
}
csiPlug, ok := plug.(*csiPlugin)
if !ok {
t.Fatalf("cannot assert plugin to be type csiPlugin")
}
return csiPlug, tmpDir
}
func makeTestPV(name string, sizeGig int, driverName, volID string) *api.PersistentVolume {
return &api.PersistentVolume{
ObjectMeta: meta.ObjectMeta{
Name: name,
Namespace: testns,
},
Spec: api.PersistentVolumeSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse(
fmt.Sprintf("%dGi", sizeGig),
),
},
PersistentVolumeSource: api.PersistentVolumeSource{
CSI: &api.CSIPersistentVolumeSource{
Driver: driverName,
VolumeHandle: volID,
ReadOnly: false,
},
},
},
}
}
func TestPluginGetPluginName(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
if plug.GetPluginName() != "kubernetes.io/csi" {
t.Errorf("unexpected plugin name %v", plug.GetPluginName())
}
}
func TestPluginGetVolumeName(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
testCases := []struct {
name string
driverName string
volName string
shouldFail bool
}{
{"alphanum names", "testdr", "testvol", false},
{"mixchar driver", "test.dr.cc", "testvol", false},
{"mixchar volume", "testdr", "test-vol-name", false},
{"mixchars all", "test-driver", "test.vol.name", false},
}
for _, tc := range testCases {
t.Logf("testing: %s", tc.name)
pv := makeTestPV("test-pv", 10, tc.driverName, tc.volName)
spec := volume.NewSpecFromPersistentVolume(pv, false)
name, err := plug.GetVolumeName(spec)
if tc.shouldFail && err == nil {
t.Fatal("GetVolumeName should fail, but got err=nil")
}
if name != fmt.Sprintf("%s%s%s", tc.driverName, volNameSep, tc.volName) {
t.Errorf("unexpected volume name %s", name)
}
}
}
func TestPluginCanSupport(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
spec := volume.NewSpecFromPersistentVolume(pv, false)
if !plug.CanSupport(spec) {
t.Errorf("should support CSI spec")
}
}
func TestPluginConstructVolumeSpec(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
testCases := []struct {
name string
driverName string
volID string
shouldFail bool
}{
{"valid driver and vol", "test.csi-driver", "abc-cde", false},
{"valid driver + vol with slash", "test.csi-driver", "a/b/c/d", false},
{"invalid driver name", "_test.csi.driver>", "a/b/c/d", true},
}
for _, tc := range testCases {
dir := getTargetPath(testPodUID, tc.driverName, tc.volID, plug.host)
// rebuild spec
spec, err := plug.ConstructVolumeSpec("test-pv", dir)
if tc.shouldFail {
if err == nil {
t.Fatal("expecting ConstructVolumeSpec to fail, but got nil error")
}
continue
}
volID := spec.PersistentVolume.Spec.CSI.VolumeHandle
unsanitizedVolID := kstrings.UnescapeQualifiedNameForDisk(tc.volID)
if volID != unsanitizedVolID {
t.Errorf("expected unsanitized volID %s, got volID %s", unsanitizedVolID, volID)
}
if spec.Name() != "test-pv" {
t.Errorf("Unexpected spec name %s", spec.Name())
}
}
}
func TestPluginNewMounter(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
mounter, err := plug.NewMounter(
volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly),
&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}},
volume.VolumeOptions{},
)
if err != nil {
t.Fatalf("Failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Fatal("failed to create CSI mounter")
}
csiMounter := mounter.(*csiMountMgr)
// validate mounter fields
if csiMounter.driverName != testDriver {
t.Error("mounter driver name not set")
}
if csiMounter.volumeID != testVol {
t.Error("mounter volume id not set")
}
if csiMounter.pod == nil {
t.Error("mounter pod not set")
}
if csiMounter.podUID == types.UID("") {
t.Error("mounter podUID mot set")
}
}
func TestPluginNewUnmounter(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
unmounter, err := plug.NewUnmounter(pv.ObjectMeta.Name, testPodUID)
csiUnmounter := unmounter.(*csiMountMgr)
if err != nil {
t.Fatalf("Failed to make a new Mounter: %v", err)
}
if csiUnmounter == nil {
t.Fatal("failed to create CSI mounter")
}
if csiUnmounter.podUID != testPodUID {
t.Error("podUID not set")
}
}
func TestValidateDriverName(t *testing.T) {
testCases := []struct {
name string
driverName string
valid bool
}{
{"ok no punctuations", "comgooglestoragecsigcepd", true},
{"ok dot only", "io.kubernetes.storage.csi.flex", true},
{"ok dash only", "io-kubernetes-storage-csi-flex", true},
{"ok underscore only", "io_kubernetes_storage_csi_flex", true},
{"ok dot underscores", "io.kubernetes.storage_csi.flex", true},
{"ok dot dash underscores", "io.kubernetes-storage.csi_flex", true},
{"invalid length 0", "", false},
{"invalid length > 63", "comgooglestoragecsigcepdcomgooglestoragecsigcepdcomgooglestoragecsigcepdcomgooglestoragecsigcepd", false},
{"invalid start char", "_comgooglestoragecsigcepd", false},
{"invalid end char", "comgooglestoragecsigcepd/", false},
{"invalid separators", "com/google/storage/csi~gcepd", false},
}
for _, tc := range testCases {
t.Logf("test case: %v", tc.name)
drValid := isDriverNameValid(tc.driverName)
if tc.valid != drValid {
t.Errorf("expecting driverName %s as valid=%t, but got valid=%t", tc.driverName, tc.valid, drValid)
}
}
}
func TestPluginNewAttacher(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
if csiAttacher.plugin == nil {
t.Error("plugin not set for attacher")
}
if csiAttacher.k8s == nil {
t.Error("Kubernetes client not set for attacher")
}
}
func TestPluginNewDetacher(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
detacher, err := plug.NewDetacher()
if err != nil {
t.Fatalf("failed to create new detacher: %v", err)
}
csiDetacher := detacher.(*csiAttacher)
if csiDetacher.plugin == nil {
t.Error("plugin not set for detacher")
}
if csiDetacher.k8s == nil {
t.Error("Kubernetes client not set for attacher")
}
}

27
pkg/volume/csi/fake/BUILD Normal file
View File

@ -0,0 +1,27 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["fake_client.go"],
importpath = "k8s.io/kubernetes/pkg/volume/csi/fake",
visibility = ["//visibility:public"],
deps = [
"//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,224 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fake
import (
"context"
"errors"
"strings"
"google.golang.org/grpc"
csipb "github.com/container-storage-interface/spec/lib/go/csi"
grpctx "golang.org/x/net/context"
)
// IdentityClient is a CSI identity client used for testing
type IdentityClient struct {
nextErr error
}
// NewIdentityClient returns a new IdentityClient
func NewIdentityClient() *IdentityClient {
return &IdentityClient{}
}
// SetNextError injects expected error
func (f *IdentityClient) SetNextError(err error) {
f.nextErr = err
}
// GetSupportedVersions returns supported version
func (f *IdentityClient) GetSupportedVersions(ctx grpctx.Context, req *csipb.GetSupportedVersionsRequest, opts ...grpc.CallOption) (*csipb.GetSupportedVersionsResponse, error) {
// short circuit with an error
if f.nextErr != nil {
return nil, f.nextErr
}
rsp := &csipb.GetSupportedVersionsResponse{
SupportedVersions: []*csipb.Version{
{Major: 0, Minor: 0, Patch: 1},
{Major: 0, Minor: 1, Patch: 0},
{Major: 1, Minor: 0, Patch: 0},
{Major: 1, Minor: 0, Patch: 1},
{Major: 1, Minor: 1, Patch: 1},
},
}
return rsp, nil
}
// GetPluginInfo returns plugin info
func (f *IdentityClient) GetPluginInfo(ctx context.Context, in *csipb.GetPluginInfoRequest, opts ...grpc.CallOption) (*csipb.GetPluginInfoResponse, error) {
return nil, nil
}
// NodeClient returns CSI node client
type NodeClient struct {
nodePublishedVolumes map[string]string
nextErr error
}
// NewNodeClient returns fake node client
func NewNodeClient() *NodeClient {
return &NodeClient{nodePublishedVolumes: make(map[string]string)}
}
// SetNextError injects next expected error
func (f *NodeClient) SetNextError(err error) {
f.nextErr = err
}
// GetNodePublishedVolumes returns node published volumes
func (f *NodeClient) GetNodePublishedVolumes() map[string]string {
return f.nodePublishedVolumes
}
// NodePublishVolume implements CSI NodePublishVolume
func (f *NodeClient) NodePublishVolume(ctx grpctx.Context, req *csipb.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipb.NodePublishVolumeResponse, error) {
if f.nextErr != nil {
return nil, f.nextErr
}
if req.GetVolumeId() == "" {
return nil, errors.New("missing volume id")
}
if req.GetTargetPath() == "" {
return nil, errors.New("missing target path")
}
fsTypes := "ext4|xfs|zfs"
fsType := req.GetVolumeCapability().GetMount().GetFsType()
if !strings.Contains(fsTypes, fsType) {
return nil, errors.New("invlid fstype")
}
f.nodePublishedVolumes[req.GetVolumeId()] = req.GetTargetPath()
return &csipb.NodePublishVolumeResponse{}, nil
}
// NodeUnpublishVolume implements csi method
func (f *NodeClient) NodeUnpublishVolume(ctx context.Context, req *csipb.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeUnpublishVolumeResponse, error) {
if f.nextErr != nil {
return nil, f.nextErr
}
if req.GetVolumeId() == "" {
return nil, errors.New("missing volume id")
}
if req.GetTargetPath() == "" {
return nil, errors.New("missing target path")
}
delete(f.nodePublishedVolumes, req.GetVolumeId())
return &csipb.NodeUnpublishVolumeResponse{}, nil
}
// GetNodeID implements method
func (f *NodeClient) GetNodeID(ctx context.Context, in *csipb.GetNodeIDRequest, opts ...grpc.CallOption) (*csipb.GetNodeIDResponse, error) {
return nil, nil
}
// NodeProbe implements csi method
func (f *NodeClient) NodeProbe(ctx context.Context, in *csipb.NodeProbeRequest, opts ...grpc.CallOption) (*csipb.NodeProbeResponse, error) {
return nil, nil
}
// NodeGetCapabilities implements csi method
func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.NodeGetCapabilitiesResponse, error) {
return nil, nil
}
// ControllerClient represents a CSI Controller client
type ControllerClient struct {
nextCapabilities []*csipb.ControllerServiceCapability
nextErr error
}
// NewControllerClient returns a ControllerClient
func NewControllerClient() *ControllerClient {
return &ControllerClient{}
}
// SetNextError injects next expected error
func (f *ControllerClient) SetNextError(err error) {
f.nextErr = err
}
// SetNextCapabilities injects next expected capabilities
func (f *ControllerClient) SetNextCapabilities(caps []*csipb.ControllerServiceCapability) {
f.nextCapabilities = caps
}
// ControllerGetCapabilities implements csi method
func (f *ControllerClient) ControllerGetCapabilities(ctx context.Context, in *csipb.ControllerGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.ControllerGetCapabilitiesResponse, error) {
if f.nextErr != nil {
return nil, f.nextErr
}
if f.nextCapabilities == nil {
f.nextCapabilities = []*csipb.ControllerServiceCapability{
{
Type: &csipb.ControllerServiceCapability_Rpc{
Rpc: &csipb.ControllerServiceCapability_RPC{
Type: csipb.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
},
},
},
}
}
return &csipb.ControllerGetCapabilitiesResponse{
Capabilities: f.nextCapabilities,
}, nil
}
// CreateVolume implements csi method
func (f *ControllerClient) CreateVolume(ctx context.Context, in *csipb.CreateVolumeRequest, opts ...grpc.CallOption) (*csipb.CreateVolumeResponse, error) {
return nil, nil
}
// DeleteVolume implements csi method
func (f *ControllerClient) DeleteVolume(ctx context.Context, in *csipb.DeleteVolumeRequest, opts ...grpc.CallOption) (*csipb.DeleteVolumeResponse, error) {
return nil, nil
}
// ControllerPublishVolume implements csi method
func (f *ControllerClient) ControllerPublishVolume(ctx context.Context, in *csipb.ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*csipb.ControllerPublishVolumeResponse, error) {
return nil, nil
}
// ControllerUnpublishVolume implements csi method
func (f *ControllerClient) ControllerUnpublishVolume(ctx context.Context, in *csipb.ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipb.ControllerUnpublishVolumeResponse, error) {
return nil, nil
}
// ValidateVolumeCapabilities implements csi method
func (f *ControllerClient) ValidateVolumeCapabilities(ctx context.Context, in *csipb.ValidateVolumeCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.ValidateVolumeCapabilitiesResponse, error) {
return nil, nil
}
// ListVolumes implements csi method
func (f *ControllerClient) ListVolumes(ctx context.Context, in *csipb.ListVolumesRequest, opts ...grpc.CallOption) (*csipb.ListVolumesResponse, error) {
return nil, nil
}
// GetCapacity implements csi method
func (f *ControllerClient) GetCapacity(ctx context.Context, in *csipb.GetCapacityRequest, opts ...grpc.CallOption) (*csipb.GetCapacityResponse, error) {
return nil, nil
}
// ControllerProbe implements csi method
func (f *ControllerClient) ControllerProbe(ctx context.Context, in *csipb.ControllerProbeRequest, opts ...grpc.CallOption) (*csipb.ControllerProbeResponse, error) {
return nil, nil
}

1
vendor/BUILD vendored
View File

@ -61,6 +61,7 @@ filegroup(
"//vendor/github.com/clusterhq/flocker-go:all-srcs",
"//vendor/github.com/codedellemc/goscaleio:all-srcs",
"//vendor/github.com/codegangsta/negroni:all-srcs",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi:all-srcs",
"//vendor/github.com/containernetworking/cni/libcni:all-srcs",
"//vendor/github.com/containernetworking/cni/pkg/invoke:all-srcs",
"//vendor/github.com/containernetworking/cni/pkg/types:all-srcs",

View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright {yyyy} {name of copyright owner}
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,27 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["csi.pb.go"],
importpath = "github.com/container-storage-interface/spec/lib/go/csi",
visibility = ["//visibility:public"],
deps = [
"//vendor/github.com/golang/protobuf/proto:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

File diff suppressed because it is too large Load Diff