k8s csi code change

pull/6/head
Serguei Bezverkhi 2018-02-23 16:50:43 -05:00
parent 8b09d4e8b6
commit a6ca466859
10 changed files with 195 additions and 83 deletions

2
Godeps/Godeps.json generated
View File

@ -454,7 +454,7 @@
{
"ImportPath": "github.com/container-storage-interface/spec/lib/go/csi",
"Comment": "v0.1.0-5-g7ab01a9",
"Rev": "7ab01a90da87f9fef3ee1de0494962fdefaf7db7"
"Rev": "91c189774c16b0661255943c09ea9d97d5a423e7"
},
{
"ImportPath": "github.com/containerd/console",

View File

@ -30,7 +30,7 @@ func getClaimRefNamespace(pv *api.PersistentVolume) string {
}
// Visitor is called with each object's namespace and name, and returns true if visiting should continue
type Visitor func(namespace, name string) (shouldContinue bool)
type Visitor func(namespace, name string, kubeletVisible bool) (shouldContinue bool)
// VisitPVSecretNames invokes the visitor function with the name of every secret
// referenced by the PV spec. If visitor returns false, visiting is short-circuited.
@ -40,11 +40,11 @@ func VisitPVSecretNames(pv *api.PersistentVolume, visitor Visitor) bool {
switch {
case source.AzureFile != nil:
if source.AzureFile.SecretNamespace != nil && len(*source.AzureFile.SecretNamespace) > 0 {
if len(source.AzureFile.SecretName) > 0 && !visitor(*source.AzureFile.SecretNamespace, source.AzureFile.SecretName) {
if len(source.AzureFile.SecretName) > 0 && !visitor(*source.AzureFile.SecretNamespace, source.AzureFile.SecretName, true /* kubeletVisible */) {
return false
}
} else {
if len(source.AzureFile.SecretName) > 0 && !visitor(getClaimRefNamespace(pv), source.AzureFile.SecretName) {
if len(source.AzureFile.SecretName) > 0 && !visitor(getClaimRefNamespace(pv), source.AzureFile.SecretName, true /* kubeletVisible */) {
return false
}
}
@ -57,7 +57,7 @@ func VisitPVSecretNames(pv *api.PersistentVolume, visitor Visitor) bool {
// use the secret namespace if namespace is set
ns = source.CephFS.SecretRef.Namespace
}
if !visitor(ns, source.CephFS.SecretRef.Name) {
if !visitor(ns, source.CephFS.SecretRef.Name, true /* kubeletVisible */) {
return false
}
}
@ -69,7 +69,7 @@ func VisitPVSecretNames(pv *api.PersistentVolume, visitor Visitor) bool {
// use the secret namespace if namespace is set
ns = source.FlexVolume.SecretRef.Namespace
}
if !visitor(ns, source.FlexVolume.SecretRef.Name) {
if !visitor(ns, source.FlexVolume.SecretRef.Name, true /* kubeletVisible */) {
return false
}
}
@ -81,7 +81,7 @@ func VisitPVSecretNames(pv *api.PersistentVolume, visitor Visitor) bool {
// use the secret namespace if namespace is set
ns = source.RBD.SecretRef.Namespace
}
if !visitor(ns, source.RBD.SecretRef.Name) {
if !visitor(ns, source.RBD.SecretRef.Name, true /* kubeletVisible */) {
return false
}
}
@ -91,7 +91,7 @@ func VisitPVSecretNames(pv *api.PersistentVolume, visitor Visitor) bool {
if source.ScaleIO.SecretRef != nil && len(source.ScaleIO.SecretRef.Namespace) > 0 {
ns = source.ScaleIO.SecretRef.Namespace
}
if !visitor(ns, source.ScaleIO.SecretRef.Name) {
if !visitor(ns, source.ScaleIO.SecretRef.Name, true /* kubeletVisible */) {
return false
}
}
@ -103,14 +103,30 @@ func VisitPVSecretNames(pv *api.PersistentVolume, visitor Visitor) bool {
// use the secret namespace if namespace is set
ns = source.ISCSI.SecretRef.Namespace
}
if !visitor(ns, source.ISCSI.SecretRef.Name) {
if !visitor(ns, source.ISCSI.SecretRef.Name, true /* kubeletVisible */) {
return false
}
}
case source.StorageOS != nil:
if source.StorageOS.SecretRef != nil && !visitor(source.StorageOS.SecretRef.Namespace, source.StorageOS.SecretRef.Name) {
if source.StorageOS.SecretRef != nil && !visitor(source.StorageOS.SecretRef.Namespace, source.StorageOS.SecretRef.Name, true /* kubeletVisible */) {
return false
}
case source.CSI != nil:
if source.CSI.ControllerPublishSecretRef != nil {
if !visitor(source.CSI.ControllerPublishSecretRef.Namespace, source.CSI.ControllerPublishSecretRef.Name, false /* kubeletVisible */) {
return false
}
}
if source.CSI.NodePublishSecretRef != nil {
if !visitor(source.CSI.NodePublishSecretRef.Namespace, source.CSI.NodePublishSecretRef.Name, true /* kubeletVisible */) {
return false
}
}
if source.CSI.NodeStageSecretRef != nil {
if !visitor(source.CSI.NodeStageSecretRef.Namespace, source.CSI.NodeStageSecretRef.Name, true /* kubeletVisible */) {
return false
}
}
}
return true
}

View File

@ -117,11 +117,32 @@ func TestPVSecrets(t *testing.T) {
SecretRef: &api.ObjectReference{
Name: "Spec.PersistentVolumeSource.StorageOS.SecretRef",
Namespace: "storageosns"}}}}},
{Spec: api.PersistentVolumeSpec{
ClaimRef: &api.ObjectReference{Namespace: "claimrefns", Name: "claimrefname"},
PersistentVolumeSource: api.PersistentVolumeSource{
CSI: &api.CSIPersistentVolumeSource{
ControllerPublishSecretRef: &api.SecretReference{
Name: "Spec.PersistentVolumeSource.CSI.ControllerPublishSecretRef",
Namespace: "csi"}}}}},
{Spec: api.PersistentVolumeSpec{
ClaimRef: &api.ObjectReference{Namespace: "claimrefns", Name: "claimrefname"},
PersistentVolumeSource: api.PersistentVolumeSource{
CSI: &api.CSIPersistentVolumeSource{
NodePublishSecretRef: &api.SecretReference{
Name: "Spec.PersistentVolumeSource.CSI.NodePublishSecretRef",
Namespace: "csi"}}}}},
{Spec: api.PersistentVolumeSpec{
ClaimRef: &api.ObjectReference{Namespace: "claimrefns", Name: "claimrefname"},
PersistentVolumeSource: api.PersistentVolumeSource{
CSI: &api.CSIPersistentVolumeSource{
NodeStageSecretRef: &api.SecretReference{
Name: "Spec.PersistentVolumeSource.CSI.NodeStageSecretRef",
Namespace: "csi"}}}}},
}
extractedNames := sets.NewString()
extractedNamesWithNamespace := sets.NewString()
for _, pv := range pvs {
VisitPVSecretNames(pv, func(namespace, name string) bool {
VisitPVSecretNames(pv, func(namespace, name string, kubeletVisible bool) bool {
extractedNames.Insert(name)
extractedNamesWithNamespace.Insert(namespace + "/" + name)
return true
@ -143,6 +164,9 @@ func TestPVSecrets(t *testing.T) {
"Spec.PersistentVolumeSource.ScaleIO.SecretRef",
"Spec.PersistentVolumeSource.ISCSI.SecretRef",
"Spec.PersistentVolumeSource.StorageOS.SecretRef",
"Spec.PersistentVolumeSource.CSI.ControllerPublishSecretRef",
"Spec.PersistentVolumeSource.CSI.NodePublishSecretRef",
"Spec.PersistentVolumeSource.CSI.NodeStageSecretRef",
)
secretPaths := collectSecretPaths(t, nil, "", reflect.TypeOf(&api.PersistentVolume{}))
secretPaths = secretPaths.Difference(excludedSecretPaths)
@ -184,6 +208,10 @@ func TestPVSecrets(t *testing.T) {
"iscsi/Spec.PersistentVolumeSource.ISCSI.SecretRef",
"storageosns/Spec.PersistentVolumeSource.StorageOS.SecretRef",
"csi/Spec.PersistentVolumeSource.CSI.ControllerPublishSecretRef",
"csi/Spec.PersistentVolumeSource.CSI.NodePublishSecretRef",
"csi/Spec.PersistentVolumeSource.CSI.NodeStageSecretRef",
)
if missingNames := expectedNamespacedNames.Difference(extractedNamesWithNamespace); len(missingNames) > 0 {
t.Logf("Missing expected namespaced names:\n%s", strings.Join(missingNames.List(), "\n"))

View File

@ -1452,6 +1452,45 @@ func validateCSIPersistentVolumeSource(csi *core.CSIPersistentVolumeSource, fldP
allErrs = append(allErrs, field.Required(fldPath.Child("volumeHandle"), ""))
}
if csi.ControllerPublishSecretRef != nil {
if len(csi.ControllerPublishSecretRef.Name) == 0 {
allErrs = append(allErrs, field.Required(fldPath.Child("controllerPublishSecretRef", "name"), ""))
} else {
allErrs = append(allErrs, ValidateDNS1123Label(csi.ControllerPublishSecretRef.Name, fldPath.Child("name"))...)
}
if len(csi.ControllerPublishSecretRef.Namespace) == 0 {
allErrs = append(allErrs, field.Required(fldPath.Child("controllerPublishSecretRef", "namespace"), ""))
} else {
allErrs = append(allErrs, ValidateDNS1123Label(csi.ControllerPublishSecretRef.Namespace, fldPath.Child("namespace"))...)
}
}
if csi.NodePublishSecretRef != nil {
if len(csi.NodePublishSecretRef.Name) == 0 {
allErrs = append(allErrs, field.Required(fldPath.Child("nodePublishSecretRef ", "name"), ""))
} else {
allErrs = append(allErrs, ValidateDNS1123Label(csi.NodePublishSecretRef.Name, fldPath.Child("name"))...)
}
if len(csi.NodePublishSecretRef.Namespace) == 0 {
allErrs = append(allErrs, field.Required(fldPath.Child("nodePublishSecretRef ", "namespace"), ""))
} else {
allErrs = append(allErrs, ValidateDNS1123Label(csi.NodePublishSecretRef.Namespace, fldPath.Child("namespace"))...)
}
}
if csi.NodeStageSecretRef != nil {
if len(csi.NodeStageSecretRef.Name) == 0 {
allErrs = append(allErrs, field.Required(fldPath.Child("nodeStageSecretRef", "name"), ""))
} else {
allErrs = append(allErrs, ValidateDNS1123Label(csi.NodeStageSecretRef.Name, fldPath.Child("name"))...)
}
if len(csi.NodeStageSecretRef.Namespace) == 0 {
allErrs = append(allErrs, field.Required(fldPath.Child("nodeStageSecretRef", "namespace"), ""))
} else {
allErrs = append(allErrs, ValidateDNS1123Label(csi.NodeStageSecretRef.Namespace, fldPath.Child("namespace"))...)
}
}
return allErrs
}

View File

@ -32,7 +32,6 @@ import (
type csiClient interface {
AssertSupportedVersion(ctx grpctx.Context, ver *csipb.Version) error
NodeProbe(ctx grpctx.Context, ver *csipb.Version) error
NodePublishVolume(
ctx grpctx.Context,
volumeid string,
@ -41,9 +40,15 @@ type csiClient interface {
accessMode api.PersistentVolumeAccessMode,
volumeInfo map[string]string,
volumeAttribs map[string]string,
nodePublishCredentials map[string]string,
fsType string,
) error
NodeUnpublishVolume(ctx grpctx.Context, volID string, targetPath string) error
NodeUnpublishVolume(
ctx grpctx.Context,
volID string,
targetPath string,
nodeUnpublishCredentials map[string]string,
) error
}
// csiClient encapsulates all csi-plugin methods
@ -146,13 +151,6 @@ func (c *csiDriverClient) AssertSupportedVersion(ctx grpctx.Context, ver *csipb.
return nil
}
func (c *csiDriverClient) NodeProbe(ctx grpctx.Context, ver *csipb.Version) error {
glog.V(4).Info(log("sending NodeProbe rpc call to csi driver: [version %v]", ver))
req := &csipb.NodeProbeRequest{Version: ver}
_, err := c.nodeClient.NodeProbe(ctx, req)
return err
}
func (c *csiDriverClient) NodePublishVolume(
ctx grpctx.Context,
volID string,
@ -161,6 +159,7 @@ func (c *csiDriverClient) NodePublishVolume(
accessMode api.PersistentVolumeAccessMode,
volumeInfo map[string]string,
volumeAttribs map[string]string,
nodePublishCredentials map[string]string,
fsType string,
) error {
glog.V(4).Info(log("calling NodePublishVolume rpc [volid=%s,target_path=%s]", volID, targetPath))
@ -176,13 +175,13 @@ func (c *csiDriverClient) NodePublishVolume(
}
req := &csipb.NodePublishVolumeRequest{
Version: csiVersion,
VolumeId: volID,
TargetPath: targetPath,
Readonly: readOnly,
PublishInfo: volumeInfo,
VolumeAttributes: volumeAttribs,
Version: csiVersion,
VolumeId: volID,
TargetPath: targetPath,
Readonly: readOnly,
PublishInfo: volumeInfo,
VolumeAttributes: volumeAttribs,
NodePublishCredentials: nodePublishCredentials,
VolumeCapability: &csipb.VolumeCapability{
AccessMode: &csipb.VolumeCapability_AccessMode{
Mode: asCSIAccessMode(accessMode),
@ -199,7 +198,7 @@ func (c *csiDriverClient) NodePublishVolume(
return err
}
func (c *csiDriverClient) NodeUnpublishVolume(ctx grpctx.Context, volID string, targetPath string) error {
func (c *csiDriverClient) NodeUnpublishVolume(ctx grpctx.Context, volID string, targetPath string, nodeUnpublishCredentials map[string]string) error {
glog.V(4).Info(log("calling NodeUnpublishVolume rpc: [volid=%s, target_path=%s", volID, targetPath))
if volID == "" {
return errors.New("missing volume id")
@ -213,9 +212,10 @@ func (c *csiDriverClient) NodeUnpublishVolume(ctx grpctx.Context, volID string,
}
req := &csipb.NodeUnpublishVolumeRequest{
Version: csiVersion,
VolumeId: volID,
TargetPath: targetPath,
Version: csiVersion,
VolumeId: volID,
TargetPath: targetPath,
NodeUnpublishCredentials: nodeUnpublishCredentials,
}
_, err := c.nodeClient.NodeUnpublishVolume(ctx, req)

View File

@ -68,28 +68,6 @@ func TestClientAssertSupportedVersion(t *testing.T) {
}
}
func TestClientNodeProbe(t *testing.T) {
testCases := []struct {
testName string
ver *csipb.Version
mustFail bool
err error
}{
{testName: "supported version", ver: &csipb.Version{Major: 0, Minor: 1, Patch: 0}},
{testName: "grpc error", ver: &csipb.Version{Major: 0, Minor: 1, Patch: 0}, mustFail: true, err: errors.New("grpc error")},
}
for _, tc := range testCases {
t.Logf("test case: %s", tc.testName)
client := setupClient(t)
client.nodeClient.(*fake.NodeClient).SetNextError(tc.err)
err := client.NodeProbe(grpctx.Background(), tc.ver)
if tc.mustFail && err == nil {
t.Error("test must fail, but err = nil")
}
}
}
func TestClientNodePublishVolume(t *testing.T) {
testCases := []struct {
name string
@ -119,6 +97,7 @@ func TestClientNodePublishVolume(t *testing.T) {
api.ReadWriteOnce,
map[string]string{"device": "/dev/null"},
map[string]string{"attr0": "val0"},
map[string]string{},
tc.fsType,
)
@ -147,7 +126,8 @@ func TestClientNodeUnpublishVolume(t *testing.T) {
for _, tc := range testCases {
t.Logf("test case: %s", tc.name)
client.nodeClient.(*fake.NodeClient).SetNextError(tc.err)
err := client.NodeUnpublishVolume(grpctx.Background(), tc.volID, tc.targetPath)
nodeUnpublishCredentials := map[string]string{}
err := client.NodeUnpublishVolume(grpctx.Background(), tc.volID, tc.targetPath, nodeUnpublishCredentials)
if tc.mustFail && err == nil {
t.Error("test must fail, but err is nil")
}

View File

@ -87,8 +87,6 @@ func getTargetPath(uid types.UID, specVolumeID string, host volume.VolumeHost) s
var _ volume.Mounter = &csiMountMgr{}
func (c *csiMountMgr) CanMount() error {
//TODO (vladimirvivien) use this method to probe controller using CSI.NodeProbe() call
// to ensure Node service is ready in the CSI plugin
return nil
}
@ -129,13 +127,6 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
return err
}
// probe driver
// TODO (vladimirvivien) move probe call where it is done only when it is needed.
if err := csi.NodeProbe(ctx, csiVersion); err != nil {
glog.Error(log("mounter.SetUpAt failed to probe driver: %v", err))
return err
}
// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
if c.volumeInfo == nil {
attachment, err := c.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{})
@ -188,7 +179,10 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
if len(fsType) == 0 {
fsType = defaultFSType
}
nodePublishCredentials := map[string]string{}
if csiSource.NodePublishSecretRef != nil {
nodePublishCredentials = getCredentialsFromSecret(c.k8s, csiSource.NodePublishSecretRef)
}
err = csi.NodePublishVolume(
ctx,
c.volumeID,
@ -197,6 +191,7 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
accessMode,
c.volumeInfo,
attribs,
nodePublishCredentials,
fsType,
)
@ -244,6 +239,12 @@ func (c *csiMountMgr) TearDownAt(dir string) error {
return nil
}
csiSource, err := getCSISourceFromSpec(c.spec)
if err != nil {
glog.Error(log("mounter.TearDownAt failed to get CSI persistent source: %v", err))
return err
}
// load volume info from file
dataDir := path.Dir(dir) // dropoff /mount at end
data, err := loadVolumeData(dataDir, volDataFileName)
@ -273,7 +274,11 @@ func (c *csiMountMgr) TearDownAt(dir string) error {
return err
}
if err := csi.NodeUnpublishVolume(ctx, volID, dir); err != nil {
nodeUnpublishCredentials := map[string]string{}
if csiSource.NodePublishSecretRef != nil {
nodeUnpublishCredentials = getCredentialsFromSecret(c.k8s, csiSource.NodePublishSecretRef)
}
if err := csi.NodeUnpublishVolume(ctx, volID, dir, nodeUnpublishCredentials); err != nil {
glog.Errorf(log("mounter.TearDownAt failed: %v", err))
return err
}

View File

@ -0,0 +1,38 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"github.com/golang/glog"
api "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
func getCredentialsFromSecret(k8s kubernetes.Interface, secretRef *api.SecretReference) map[string]string {
credentials := map[string]string{}
secret, err := k8s.CoreV1().Secrets(secretRef.Namespace).Get(secretRef.Name, meta.GetOptions{})
if err != nil {
glog.Warningf("failed to find the secret %s in the namespace %s with error: %v\n", secretRef.Name, secretRef.Namespace, err)
return credentials
}
for key, value := range secret.Data {
credentials[key] = string(value)
}
return credentials
}

View File

@ -67,6 +67,16 @@ func (f *IdentityClient) GetPluginInfo(ctx context.Context, in *csipb.GetPluginI
return nil, nil
}
// GetPluginCapabilities implements csi method
func (f *IdentityClient) GetPluginCapabilities(ctx context.Context, in *csipb.GetPluginCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.GetPluginCapabilitiesResponse, error) {
return nil, nil
}
// Probe implements csi method
func (f *IdentityClient) Probe(ctx context.Context, in *csipb.ProbeRequest, opts ...grpc.CallOption) (*csipb.ProbeResponse, error) {
return nil, nil
}
// NodeClient returns CSI node client
type NodeClient struct {
nodePublishedVolumes map[string]string
@ -110,17 +120,6 @@ func (f *NodeClient) NodePublishVolume(ctx grpctx.Context, req *csipb.NodePublis
return &csipb.NodePublishVolumeResponse{}, nil
}
// NodeProbe implements csi NodeProbe
func (f *NodeClient) NodeProbe(ctx context.Context, req *csipb.NodeProbeRequest, opts ...grpc.CallOption) (*csipb.NodeProbeResponse, error) {
if f.nextErr != nil {
return nil, f.nextErr
}
if req.Version == nil {
return nil, errors.New("missing version")
}
return &csipb.NodeProbeResponse{}, nil
}
// NodeUnpublishVolume implements csi method
func (f *NodeClient) NodeUnpublishVolume(ctx context.Context, req *csipb.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeUnpublishVolumeResponse, error) {
if f.nextErr != nil {
@ -147,6 +146,16 @@ func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetC
return nil, nil
}
// NodeStageVolume implements csi method
func (f *NodeClient) NodeStageVolume(ctx context.Context, in *csipb.NodeStageVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeStageVolumeResponse, error) {
return nil, nil
}
// NodeUnstageVolume implements csi method
func (f *NodeClient) NodeUnstageVolume(ctx context.Context, in *csipb.NodeUnstageVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeUnstageVolumeResponse, error) {
return nil, nil
}
// ControllerClient represents a CSI Controller client
type ControllerClient struct {
nextCapabilities []*csipb.ControllerServiceCapability
@ -224,8 +233,3 @@ func (f *ControllerClient) ListVolumes(ctx context.Context, in *csipb.ListVolume
func (f *ControllerClient) GetCapacity(ctx context.Context, in *csipb.GetCapacityRequest, opts ...grpc.CallOption) (*csipb.GetCapacityResponse, error) {
return nil, nil
}
// ControllerProbe implements csi method
func (f *ControllerClient) ControllerProbe(ctx context.Context, in *csipb.ControllerProbeRequest, opts ...grpc.CallOption) (*csipb.ControllerProbeResponse, error) {
return nil, nil
}

View File

@ -253,9 +253,11 @@ func (g *Graph) AddPV(pv *api.PersistentVolume) {
// since we don't know the other end of the pvc -> pod -> node chain (or it may not even exist yet), we can't decorate these edges with kubernetes node info
g.graph.SetEdge(simple.Edge{F: pvVertex, T: g.getOrCreateVertex_locked(pvcVertexType, pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)})
pvutil.VisitPVSecretNames(pv, func(namespace, secret string) bool {
pvutil.VisitPVSecretNames(pv, func(namespace, secret string, kubeletVisible bool) bool {
// This grants access to the named secret in the same namespace as the bound PVC
g.graph.SetEdge(simple.Edge{F: g.getOrCreateVertex_locked(secretVertexType, namespace, secret), T: pvVertex})
if kubeletVisible {
g.graph.SetEdge(simple.Edge{F: g.getOrCreateVertex_locked(secretVertexType, namespace, secret), T: pvVertex})
}
return true
})
}