2019-01-12 04:58:27 +00:00
/ *
Copyright 2017 The Kubernetes Authors .
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
package csi
import (
"errors"
"fmt"
"os"
"path"
"sort"
"strings"
2019-04-09 17:34:40 +00:00
"sync"
2019-01-12 04:58:27 +00:00
"time"
"context"
"k8s.io/klog"
api "k8s.io/api/core/v1"
2019-04-07 17:07:55 +00:00
apierrs "k8s.io/apimachinery/pkg/api/errors"
2019-01-12 04:58:27 +00:00
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
2019-08-30 18:33:25 +00:00
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2019-01-12 04:58:27 +00:00
utilversion "k8s.io/apimachinery/pkg/util/version"
2019-08-30 18:33:25 +00:00
"k8s.io/apimachinery/pkg/util/wait"
2019-04-07 17:07:55 +00:00
utilfeature "k8s.io/apiserver/pkg/util/feature"
2019-01-12 04:58:27 +00:00
clientset "k8s.io/client-go/kubernetes"
2019-07-14 07:58:54 +00:00
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
2019-08-30 18:33:25 +00:00
csitranslationplugins "k8s.io/csi-translation-lib/plugins"
2019-04-07 17:07:55 +00:00
"k8s.io/kubernetes/pkg/features"
2019-01-12 04:58:27 +00:00
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager"
)
// Package-level constants shared by the CSI volume plugin.
const (
2019-04-07 17:07:55 +00:00
// CSIPluginName is the name of the in-tree CSI Plugin
CSIPluginName = "kubernetes.io/csi"
2019-01-12 04:58:27 +00:00
// TODO (vladimirvivien) implement a more dynamic way to discover
// the unix domain socket path for each installed csi driver.
// TODO (vladimirvivien) would be nice to name socket with a .sock extension
// for consistency.
// csiAddrTemplate is a format string; %v is presumably the driver name
// (not referenced in this part of the file, confirm against callers).
csiAddrTemplate = "/var/lib/kubelet/plugins/%v/csi.sock"
2019-08-19 16:30:36 +00:00
// csiTimeout bounds the NodeGetInfo call issued from RegisterPlugin.
csiTimeout = 2 * time . Minute
2019-01-12 04:58:27 +00:00
// volNameSep separates the driver name from the volume handle in the
// string produced by GetVolumeName.
volNameSep = "^"
// volDataFileName is the file in which per-volume data is saved at
// mounter/mapper creation and loaded again for teardown.
volDataFileName = "vol_data.json"
fsTypeBlockName = "block"
2019-04-07 17:07:55 +00:00
// TODO: increase to something useful
csiResyncPeriod = time . Minute
2019-01-12 04:58:27 +00:00
)
2019-04-09 17:34:40 +00:00
var (
// WaitForValidHostName, when true, makes csiPlugin.Init wait instead of
// initializing if it is handed a host whose hostname is empty.
WaitForValidHostName bool
// csiPluginInstance is the single plugin instance handed out by
// ProbeVolumePlugins.
csiPluginInstance * csiPlugin
// csiPluginLock guards csiPluginInstance and is held during Init.
csiPluginLock sync . Mutex
)
2019-04-07 17:07:55 +00:00
// deprecatedSocketDirVersions lists CSI 0.x version strings.
// NOTE(review): not referenced in the visible part of this file;
// isDeprecatedSocketDirAllowed checks the major version instead.
var deprecatedSocketDirVersions = [ ] string { "0.1.0" , "0.2.0" , "0.3.0" , "0.4.0" }
2019-01-12 04:58:27 +00:00
// csiPlugin is the in-tree volume plugin that fronts CSI drivers.
type csiPlugin struct {
2019-07-14 07:58:54 +00:00
// host is set by Init; it is nil until Init has completed.
host volume . VolumeHost
// blockEnabled caches the CSIBlockVolume feature gate at creation time.
blockEnabled bool
// csiDriverLister is set in Init from the host when the CSIDriverRegistry
// feature gate is on and a kube client is available; it may stay nil.
csiDriverLister storagelisters . CSIDriverLister
2019-01-12 04:58:27 +00:00
}
2019-04-07 17:07:55 +00:00
//TODO (vladimirvivien) add this type to storage api
// driverMode names how a volume is handled by its driver; getDriverMode
// returns one of the two constants below.
type driverMode string
// persistentDriverMode is the mode getDriverMode returns by default.
const persistentDriverMode driverMode = "persistent"
// ephemeralDriverMode is the mode used for inline (pod-spec) CSI volumes
// when the CSIInlineVolume feature gate is enabled.
const ephemeralDriverMode driverMode = "ephemeral"
2019-01-12 04:58:27 +00:00
// ProbeVolumePlugins returns implemented plugins
func ProbeVolumePlugins ( ) [ ] volume . VolumePlugin {
2019-04-09 17:34:40 +00:00
csiPluginLock . Lock ( )
defer csiPluginLock . Unlock ( )
if csiPluginInstance != nil {
return [ ] volume . VolumePlugin { csiPluginInstance }
}
csiPluginInstance = & csiPlugin {
2019-04-07 17:07:55 +00:00
host : nil ,
blockEnabled : utilfeature . DefaultFeatureGate . Enabled ( features . CSIBlockVolume ) ,
2019-01-12 04:58:27 +00:00
}
2019-04-09 17:34:40 +00:00
return [ ] volume . VolumePlugin { csiPluginInstance }
2019-01-12 04:58:27 +00:00
}
// volume.VolumePlugin methods
// Compile-time check that *csiPlugin satisfies volume.VolumePlugin.
var _ volume . VolumePlugin = & csiPlugin { }
// RegistrationHandler is the handler which is fed to the pluginwatcher API.
// It carries no state of its own; its methods operate on the package-level
// csiDrivers store and nim node info manager.
type RegistrationHandler struct {
}
// TODO (verult) consider using a struct instead of global variables
// csiDrivers keeps track of all CSI drivers registered on the node, keyed by
// driver name, together with their endpoints and highest supported versions.
2019-04-07 17:07:55 +00:00
var csiDrivers = & DriversStore { }
2019-01-12 04:58:27 +00:00
// nim is the node info manager; it is assigned in csiPlugin.Init and used by
// RegisterPlugin and unregisterDriver.
var nim nodeinfomanager . Interface
// PluginHandler is the plugin registration handler interface passed to the
// pluginwatcher module in kubelet
var PluginHandler = & RegistrationHandler { }
// ValidatePlugin is called by kubelet's plugin watcher upon detection
// of a new registration socket opened by CSI Driver registrar side car.
//
// It rejects drivers whose socket was found in the deprecated directory
// unless at least one advertised version is a CSI 0.x version, and then
// runs the common version validation. A nil return means the plugin may be
// registered.
func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
	// log(...) yields the finished message, so it is passed to Info rather
	// than Infof: a '%' in the plugin name or endpoint must not be parsed as
	// a format verb. This matches the other log call sites in this file.
	klog.Info(log("Trying to validate a new CSI Driver with name: %s endpoint: %s versions: %s, foundInDeprecatedDir: %v",
		pluginName, endpoint, strings.Join(versions, ","), foundInDeprecatedDir))

	if foundInDeprecatedDir {
		// CSI 0.x drivers used /var/lib/kubelet/plugins as the socket dir.
		// This was deprecated as the socket dir for kubelet drivers, in lieu of a dedicated dir /var/lib/kubelet/plugins_registry
		// The deprecated dir will only be allowed for a whitelisted set of old versions.
		// CSI 1.x drivers should use the /var/lib/kubelet/plugins_registry
		if !isDeprecatedSocketDirAllowed(versions) {
			err := fmt.Errorf("socket for CSI driver %q versions %v was found in a deprecated dir. Drivers implementing CSI 1.x+ must use the new dir", pluginName, versions)
			klog.Error(err)
			return err
		}
	}

	_, err := h.validateVersions("ValidatePlugin", pluginName, endpoint, versions)
	return err
}
// RegisterPlugin is called when a plugin can be registered
func ( h * RegistrationHandler ) RegisterPlugin ( pluginName string , endpoint string , versions [ ] string ) error {
klog . Infof ( log ( "Register new plugin with name: %s at endpoint: %s" , pluginName , endpoint ) )
highestSupportedVersion , err := h . validateVersions ( "RegisterPlugin" , pluginName , endpoint , versions )
if err != nil {
return err
}
2019-04-07 17:07:55 +00:00
// Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key
// all other CSI components will be able to get the actual socket of CSI drivers by its name.
csiDrivers . Set ( pluginName , Driver {
endpoint : endpoint ,
highestSupportedVersion : highestSupportedVersion ,
} )
2019-01-12 04:58:27 +00:00
// Get node info from the driver.
csi , err := newCsiDriverClient ( csiDriverName ( pluginName ) )
if err != nil {
return err
}
ctx , cancel := context . WithTimeout ( context . Background ( ) , csiTimeout )
defer cancel ( )
driverNodeID , maxVolumePerNode , accessibleTopology , err := csi . NodeGetInfo ( ctx )
if err != nil {
if unregErr := unregisterDriver ( pluginName ) ; unregErr != nil {
2019-04-07 17:07:55 +00:00
klog . Error ( log ( "registrationHandler.RegisterPlugin failed to unregister plugin due to previous error: %v" , unregErr ) )
2019-01-12 04:58:27 +00:00
}
return err
}
err = nim . InstallCSIDriver ( pluginName , driverNodeID , maxVolumePerNode , accessibleTopology )
if err != nil {
if unregErr := unregisterDriver ( pluginName ) ; unregErr != nil {
klog . Error ( log ( "registrationHandler.RegisterPlugin failed to unregister plugin due to previous error: %v" , unregErr ) )
}
return err
}
return nil
}
func ( h * RegistrationHandler ) validateVersions ( callerName , pluginName string , endpoint string , versions [ ] string ) ( * utilversion . Version , error ) {
if len ( versions ) == 0 {
err := fmt . Errorf ( "%s for CSI driver %q failed. Plugin returned an empty list for supported versions" , callerName , pluginName )
klog . Error ( err )
return nil , err
}
// Validate version
newDriverHighestVersion , err := highestSupportedVersion ( versions )
if err != nil {
err := fmt . Errorf ( "%s for CSI driver %q failed. None of the versions specified %q are supported. err=%v" , callerName , pluginName , versions , err )
klog . Error ( err )
return nil , err
}
2019-04-07 17:07:55 +00:00
existingDriver , driverExists := csiDrivers . Get ( pluginName )
2019-01-12 04:58:27 +00:00
if driverExists {
if ! existingDriver . highestSupportedVersion . LessThan ( newDriverHighestVersion ) {
err := fmt . Errorf ( "%s for CSI driver %q failed. Another driver with the same name is already registered with a higher supported version: %q" , callerName , pluginName , existingDriver . highestSupportedVersion )
klog . Error ( err )
return nil , err
}
}
return newDriverHighestVersion , nil
}
// DeRegisterPlugin is called when a plugin removed its socket, signaling
// it is no longer available. The driver is unregistered; a failure to do so
// is logged and not reported to the caller.
func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
	klog.V(4).Info(log("registrationHandler.DeRegisterPlugin request for plugin %s", pluginName))

	unregErr := unregisterDriver(pluginName)
	if unregErr == nil {
		return
	}
	klog.Error(log("registrationHandler.DeRegisterPlugin failed: %v", unregErr))
}
// Init attaches the plugin to its volume host and sets up package state.
//
// With csiPluginLock held it stores host on the plugin, picks up a
// CSIDriverLister from the host when the CSIDriverRegistry feature gate is on
// and a kube client exists, creates the package-level node info manager, and,
// when both CSINodeInfo and CSIMigration are enabled, starts CSINode
// initialization. It returns an error only if that last step fails.
//
// Special case: if WaitForValidHostName is set and host reports an empty
// hostname, Init does not initialize anything itself; it polls once per
// second until p.host has been set (by another Init call) and returns nil.
func ( p * csiPlugin ) Init ( host volume . VolumeHost ) error {
2019-04-09 17:34:40 +00:00
csiPluginLock . Lock ( )
defer csiPluginLock . Unlock ( )
if WaitForValidHostName && host . GetHostName ( ) == "" {
for {
if p . host != nil {
return nil
}
// Release the lock while sleeping so another Init call can set p.host;
// it is re-acquired before looping so the deferred Unlock stays balanced.
csiPluginLock . Unlock ( )
time . Sleep ( time . Second )
klog . Infof ( "Waiting for CSI volume hostname" )
csiPluginLock . Lock ( )
}
}
2019-01-12 04:58:27 +00:00
p . host = host
2019-04-07 17:07:55 +00:00
if utilfeature . DefaultFeatureGate . Enabled ( features . CSIDriverRegistry ) {
csiClient := host . GetKubeClient ( )
if csiClient == nil {
klog . Warning ( log ( "kubeclient not set, assuming standalone kubelet" ) )
} else {
2019-07-14 07:58:54 +00:00
// set CSIDriverLister
// Both host kinds are probed; if host implements both interfaces the
// KubeletVolumeHost lister assigned second is the one that is kept.
adcHost , ok := host . ( volume . AttachDetachVolumeHost )
if ok {
p . csiDriverLister = adcHost . CSIDriverLister ( )
if p . csiDriverLister == nil {
klog . Error ( log ( "CSIDriverLister not found on AttachDetachVolumeHost" ) )
}
}
kletHost , ok := host . ( volume . KubeletVolumeHost )
if ok {
p . csiDriverLister = kletHost . CSIDriverLister ( )
if p . csiDriverLister == nil {
klog . Error ( log ( "CSIDriverLister not found on KubeletVolumeHost" ) )
}
}
2019-04-07 17:07:55 +00:00
}
}
2019-01-12 04:58:27 +00:00
2019-08-30 18:33:25 +00:00
// migratedPlugins maps an in-tree plugin name to a predicate reporting
// whether CSI migration is enabled for it (the global CSIMigration gate
// plus the plugin-specific gate).
var migratedPlugins = map [ string ] ( func ( ) bool ) {
csitranslationplugins . GCEPDInTreePluginName : func ( ) bool {
return utilfeature . DefaultFeatureGate . Enabled ( features . CSIMigration ) && utilfeature . DefaultFeatureGate . Enabled ( features . CSIMigrationGCE )
} ,
csitranslationplugins . AWSEBSInTreePluginName : func ( ) bool {
return utilfeature . DefaultFeatureGate . Enabled ( features . CSIMigration ) && utilfeature . DefaultFeatureGate . Enabled ( features . CSIMigrationAWS )
} ,
csitranslationplugins . CinderInTreePluginName : func ( ) bool {
return utilfeature . DefaultFeatureGate . Enabled ( features . CSIMigration ) && utilfeature . DefaultFeatureGate . Enabled ( features . CSIMigrationOpenStack )
} ,
}
2019-04-07 17:07:55 +00:00
// Initializing the label management channels
2019-08-30 18:33:25 +00:00
nim = nodeinfomanager . NewNodeInfoManager ( host . GetNodeName ( ) , host , migratedPlugins )
if utilfeature . DefaultFeatureGate . Enabled ( features . CSINodeInfo ) &&
utilfeature . DefaultFeatureGate . Enabled ( features . CSIMigration ) {
// This function prevents Kubelet from posting Ready status until CSINodeInfo
// is both installed and initialized
if err := initializeCSINode ( host ) ; err != nil {
return fmt . Errorf ( "failed to initialize CSINodeInfo: %v" , err )
}
}
2019-01-12 04:58:27 +00:00
return nil
}
2019-08-30 18:33:25 +00:00
// initializeCSINode starts background initialization of the CSINode object
// for migrated drivers. It is a no-op when host is not a KubeletVolumeHost or
// when no kube client is available (standalone kubelet). Otherwise it marks
// the kubelet as not ready via SetKubeletError and launches a goroutine that
// retries nim.InitializeCSINodeWithAnnotation with exponential backoff; the
// kubelet error is cleared on success and the process is terminated with
// klog.Fatalf once the retries are exhausted. The function itself always
// returns nil.
func initializeCSINode(host volume.VolumeHost) error {
	kubeletHost, isKubelet := host.(volume.KubeletVolumeHost)
	if !isKubelet {
		klog.V(4).Info("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINodeInfo initialization, not running on kubelet")
		return nil
	}

	if host.GetKubeClient() == nil {
		// Kubelet running in standalone mode. Skip CSINodeInfo initialization
		klog.Warning("Skipping CSINodeInfo initialization, kubelet running in standalone mode")
		return nil
	}

	kubeletHost.SetKubeletError(errors.New("CSINodeInfo is not yet initialized"))

	// tryInit is a single initialization attempt; returning (false, nil)
	// asks the backoff loop to try again.
	tryInit := func() (bool, error) {
		klog.V(4).Infof("Initializing migrated drivers on CSINodeInfo")
		if initErr := nim.InitializeCSINodeWithAnnotation(); initErr != nil {
			kubeletHost.SetKubeletError(fmt.Errorf("Failed to initialize CSINodeInfo: %v", initErr))
			klog.Errorf("Failed to initialize CSINodeInfo: %v", initErr)
			return false, nil
		}

		// Successfully initialized drivers, allow Kubelet to post Ready
		kubeletHost.SetKubeletError(nil)
		return true, nil
	}

	go func() {
		defer utilruntime.HandleCrash()

		// Backoff parameters tuned to retry over 140 seconds. Will fail and restart the Kubelet
		// after max retry steps.
		retryPolicy := wait.Backoff{
			Steps:    6,
			Duration: 15 * time.Millisecond,
			Factor:   6.0,
			Jitter:   0.1,
		}

		if backoffErr := wait.ExponentialBackoff(retryPolicy, tryInit); backoffErr != nil {
			// 2 releases after CSIMigration and all CSIMigrationX (where X is a volume plugin)
			// are permanently enabled the apiserver/controllers can assume that the kubelet is
			// using CSI for all Migrated volume plugins. Then all the CSINode initialization
			// code can be dropped from Kubelet.
			// Kill the Kubelet process and allow it to restart to retry initialization
			klog.Fatalf("Failed to initialize CSINodeInfo after retrying")
		}
	}()

	return nil
}
2019-01-12 04:58:27 +00:00
// GetPluginName returns the plugin's name, CSIPluginName.
func ( p * csiPlugin ) GetPluginName ( ) string {
2019-04-07 17:07:55 +00:00
return CSIPluginName
2019-01-12 04:58:27 +00:00
}
// GetvolumeName returns a concatenated string of CSIVolumeSource.Driver<volNameSe>CSIVolumeSource.VolumeHandle
// That string value is used in Detach() to extract driver name and volumeName.
func ( p * csiPlugin ) GetVolumeName ( spec * volume . Spec ) ( string , error ) {
2019-04-07 17:07:55 +00:00
csi , err := getPVSourceFromSpec ( spec )
2019-01-12 04:58:27 +00:00
if err != nil {
klog . Error ( log ( "plugin.GetVolumeName failed to extract volume source from spec: %v" , err ) )
return "" , err
}
// return driverName<separator>volumeHandle
return fmt . Sprintf ( "%s%s%s" , csi . Driver , volNameSep , csi . VolumeHandle ) , nil
}
func ( p * csiPlugin ) CanSupport ( spec * volume . Spec ) bool {
// TODO (vladimirvivien) CanSupport should also take into account
// the availability/registration of specified Driver in the volume source
2019-04-07 17:07:55 +00:00
if spec == nil {
return false
}
2019-08-30 18:33:25 +00:00
if utilfeature . DefaultFeatureGate . Enabled ( features . CSIInlineVolume ) {
return ( spec . PersistentVolume != nil && spec . PersistentVolume . Spec . CSI != nil ) ||
( spec . Volume != nil && spec . Volume . CSI != nil )
}
2019-04-07 17:07:55 +00:00
2019-01-12 04:58:27 +00:00
return spec . PersistentVolume != nil && spec . PersistentVolume . Spec . CSI != nil
}
2019-04-07 17:07:55 +00:00
// IsMigratedToCSI always returns false for the CSI plugin.
func ( p * csiPlugin ) IsMigratedToCSI ( ) bool {
return false
}
2019-01-12 04:58:27 +00:00
// RequiresRemount always returns false for the CSI plugin.
func ( p * csiPlugin ) RequiresRemount ( ) bool {
return false
}
func ( p * csiPlugin ) NewMounter (
spec * volume . Spec ,
pod * api . Pod ,
_ volume . VolumeOptions ) ( volume . Mounter , error ) {
2019-04-07 17:07:55 +00:00
2019-08-30 18:33:25 +00:00
volSrc , pvSrc , err := getSourceFromSpec ( spec )
2019-01-12 04:58:27 +00:00
if err != nil {
return nil , err
}
2019-04-07 17:07:55 +00:00
var (
driverName string
volumeHandle string
readOnly bool
)
switch {
2019-08-30 18:33:25 +00:00
case volSrc != nil && utilfeature . DefaultFeatureGate . Enabled ( features . CSIInlineVolume ) :
volumeHandle = makeVolumeHandle ( string ( pod . UID ) , spec . Name ( ) )
driverName = volSrc . Driver
if volSrc . ReadOnly != nil {
readOnly = * volSrc . ReadOnly
}
2019-04-07 17:07:55 +00:00
case pvSrc != nil :
driverName = pvSrc . Driver
volumeHandle = pvSrc . VolumeHandle
readOnly = spec . ReadOnly
default :
return nil , fmt . Errorf ( "volume source not found in volume.Spec" )
}
driverMode , err := p . getDriverMode ( spec )
2019-01-12 04:58:27 +00:00
if err != nil {
return nil , err
}
k8s := p . host . GetKubeClient ( )
if k8s == nil {
klog . Error ( log ( "failed to get a kubernetes client" ) )
return nil , errors . New ( "failed to get a Kubernetes client" )
}
mounter := & csiMountMgr {
plugin : p ,
k8s : k8s ,
spec : spec ,
pod : pod ,
podUID : pod . UID ,
2019-04-07 17:07:55 +00:00
driverName : csiDriverName ( driverName ) ,
driverMode : driverMode ,
volumeID : volumeHandle ,
2019-01-12 04:58:27 +00:00
specVolumeID : spec . Name ( ) ,
readOnly : readOnly ,
}
2019-04-07 17:07:55 +00:00
mounter . csiClientGetter . driverName = csiDriverName ( driverName )
2019-01-12 04:58:27 +00:00
// Save volume info in pod dir
dir := mounter . GetPath ( )
dataDir := path . Dir ( dir ) // dropoff /mount at end
if err := os . MkdirAll ( dataDir , 0750 ) ; err != nil {
klog . Error ( log ( "failed to create dir %#v: %v" , dataDir , err ) )
return nil , err
}
klog . V ( 4 ) . Info ( log ( "created path successfully [%s]" , dataDir ) )
2019-08-30 18:33:25 +00:00
mounter . MetricsProvider = NewMetricsCsi ( volumeHandle , dir , csiDriverName ( driverName ) )
2019-01-12 04:58:27 +00:00
// persist volume info data for teardown
node := string ( p . host . GetNodeName ( ) )
volData := map [ string ] string {
2019-04-07 17:07:55 +00:00
volDataKey . specVolID : spec . Name ( ) ,
volDataKey . volHandle : volumeHandle ,
volDataKey . driverName : driverName ,
volDataKey . nodeName : node ,
volDataKey . driverMode : string ( driverMode ) ,
2019-01-12 04:58:27 +00:00
}
2019-04-07 17:07:55 +00:00
attachID := getAttachmentName ( volumeHandle , driverName , node )
volData [ volDataKey . attachmentID ] = attachID
2019-01-12 04:58:27 +00:00
if err := saveVolumeData ( dataDir , volDataFileName , volData ) ; err != nil {
klog . Error ( log ( "failed to save volume info data: %v" , err ) )
if err := os . RemoveAll ( dataDir ) ; err != nil {
klog . Error ( log ( "failed to remove dir after error [%s]: %v" , dataDir , err ) )
return nil , err
}
return nil , err
}
klog . V ( 4 ) . Info ( log ( "mounter created successfully" ) )
return mounter , nil
}
func ( p * csiPlugin ) NewUnmounter ( specName string , podUID types . UID ) ( volume . Unmounter , error ) {
klog . V ( 4 ) . Infof ( log ( "setting up unmounter for [name=%v, podUID=%v]" , specName , podUID ) )
unmounter := & csiMountMgr {
plugin : p ,
podUID : podUID ,
specVolumeID : specName ,
}
// load volume info from file
dir := unmounter . GetPath ( )
dataDir := path . Dir ( dir ) // dropoff /mount at end
data , err := loadVolumeData ( dataDir , volDataFileName )
if err != nil {
klog . Error ( log ( "unmounter failed to load volume data file [%s]: %v" , dir , err ) )
return nil , err
}
unmounter . driverName = csiDriverName ( data [ volDataKey . driverName ] )
unmounter . volumeID = data [ volDataKey . volHandle ]
2019-03-29 00:03:05 +00:00
unmounter . csiClientGetter . driverName = unmounter . driverName
2019-01-12 04:58:27 +00:00
return unmounter , nil
}
func ( p * csiPlugin ) ConstructVolumeSpec ( volumeName , mountPath string ) ( * volume . Spec , error ) {
klog . V ( 4 ) . Info ( log ( "plugin.ConstructVolumeSpec [pv.Name=%v, path=%v]" , volumeName , mountPath ) )
volData , err := loadVolumeData ( mountPath , volDataFileName )
if err != nil {
klog . Error ( log ( "plugin.ConstructVolumeSpec failed loading volume data using [%s]: %v" , mountPath , err ) )
return nil , err
}
klog . V ( 4 ) . Info ( log ( "plugin.ConstructVolumeSpec extracted [%#v]" , volData ) )
2019-04-07 17:07:55 +00:00
var spec * volume . Spec
2019-08-30 18:33:25 +00:00
inlineEnabled := utilfeature . DefaultFeatureGate . Enabled ( features . CSIInlineVolume )
if inlineEnabled {
mode := driverMode ( volData [ volDataKey . driverMode ] )
switch {
case mode == ephemeralDriverMode :
spec = p . constructVolSourceSpec ( volData [ volDataKey . specVolID ] , volData [ volDataKey . driverName ] )
case mode == persistentDriverMode :
fallthrough
default :
spec = p . constructPVSourceSpec ( volData [ volDataKey . specVolID ] , volData [ volDataKey . driverName ] , volData [ volDataKey . volHandle ] )
}
} else {
spec = p . constructPVSourceSpec ( volData [ volDataKey . specVolID ] , volData [ volDataKey . driverName ] , volData [ volDataKey . volHandle ] )
}
2019-04-07 17:07:55 +00:00
return spec , nil
}
// constructVolSourceSpec constructs volume.Spec with CSIVolumeSource: an
// inline volume named volSpecName whose CSI source names driverName.
func (p *csiPlugin) constructVolSourceSpec(volSpecName, driverName string) *volume.Spec {
	inlineSource := &api.CSIVolumeSource{Driver: driverName}
	return volume.NewSpecFromVolume(&api.Volume{
		Name:         volSpecName,
		VolumeSource: api.VolumeSource{CSI: inlineSource},
	})
}
//constructPVSourceSpec constructs volume.Spec with CSIPersistentVolumeSource
func ( p * csiPlugin ) constructPVSourceSpec ( volSpecName , driverName , volumeHandle string ) * volume . Spec {
2019-01-12 04:58:27 +00:00
fsMode := api . PersistentVolumeFilesystem
pv := & api . PersistentVolume {
ObjectMeta : meta . ObjectMeta {
2019-04-07 17:07:55 +00:00
Name : volSpecName ,
2019-01-12 04:58:27 +00:00
} ,
Spec : api . PersistentVolumeSpec {
PersistentVolumeSource : api . PersistentVolumeSource {
CSI : & api . CSIPersistentVolumeSource {
2019-04-07 17:07:55 +00:00
Driver : driverName ,
VolumeHandle : volumeHandle ,
2019-01-12 04:58:27 +00:00
} ,
} ,
VolumeMode : & fsMode ,
} ,
}
2019-04-07 17:07:55 +00:00
return volume . NewSpecFromPersistentVolume ( pv , false )
2019-01-12 04:58:27 +00:00
}
// SupportsMountOption always returns true; see the notes in the body for why
// this is not yet probed per driver.
func ( p * csiPlugin ) SupportsMountOption ( ) bool {
// TODO (vladimirvivien) use CSI VolumeCapability.MountVolume.mount_flags
// to probe for the result for this method
// (bswartz) Until the CSI spec supports probing, our only option is to
// make plugins register their support for mount options or lack thereof
// directly with kubernetes.
return true
}
// SupportsBulkVolumeVerification always returns false for the CSI plugin.
func ( p * csiPlugin ) SupportsBulkVolumeVerification ( ) bool {
return false
}
// volume.AttachableVolumePlugin methods
// Compile-time checks that *csiPlugin satisfies the attachable and
// device-mountable plugin interfaces.
var _ volume . AttachableVolumePlugin = & csiPlugin { }
var _ volume . DeviceMountableVolumePlugin = & csiPlugin { }
func ( p * csiPlugin ) NewAttacher ( ) ( volume . Attacher , error ) {
2019-08-30 18:33:25 +00:00
return p . newAttacherDetacher ( )
2019-01-12 04:58:27 +00:00
}
// NewDeviceMounter returns the attacher from NewAttacher as a device mounter.
func (p *csiPlugin) NewDeviceMounter() (volume.DeviceMounter, error) {
	attacher, err := p.NewAttacher()
	if err != nil {
		// Do not forward whatever value accompanied the error; a typed nil
		// inside the interface would compare unequal to nil.
		return nil, err
	}
	return attacher, nil
}
func ( p * csiPlugin ) NewDetacher ( ) ( volume . Detacher , error ) {
2019-08-30 18:33:25 +00:00
return p . newAttacherDetacher ( )
2019-01-12 04:58:27 +00:00
}
2019-08-30 18:33:25 +00:00
func ( p * csiPlugin ) CanAttach ( spec * volume . Spec ) ( bool , error ) {
2019-04-07 17:07:55 +00:00
driverMode , err := p . getDriverMode ( spec )
if err != nil {
2019-08-30 18:33:25 +00:00
return false , err
2019-04-07 17:07:55 +00:00
}
if driverMode == ephemeralDriverMode {
2019-08-30 18:33:25 +00:00
klog . V ( 5 ) . Info ( log ( "plugin.CanAttach = false, ephemeral mode detected for spec %v" , spec . Name ( ) ) )
return false , nil
2019-04-07 17:07:55 +00:00
}
pvSrc , err := getCSISourceFromSpec ( spec )
if err != nil {
2019-08-30 18:33:25 +00:00
return false , err
2019-04-07 17:07:55 +00:00
}
driverName := pvSrc . Driver
skipAttach , err := p . skipAttach ( driverName )
if err != nil {
2019-08-30 18:33:25 +00:00
return false , err
2019-04-07 17:07:55 +00:00
}
2019-08-30 18:33:25 +00:00
return ! skipAttach , nil
}
// CanDeviceMount returns true if the spec supports device mount
func ( p * csiPlugin ) CanDeviceMount ( spec * volume . Spec ) ( bool , error ) {
driverMode , err := p . getDriverMode ( spec )
if err != nil {
return false , err
}
if driverMode == ephemeralDriverMode {
klog . V ( 5 ) . Info ( log ( "plugin.CanDeviceMount skipped ephemeral mode detected for spec %v" , spec . Name ( ) ) )
return false , nil
}
return true , nil
2019-04-07 17:07:55 +00:00
}
2019-01-12 04:58:27 +00:00
// NewDeviceUnmounter returns the detacher from NewDetacher as a device
// unmounter.
func (p *csiPlugin) NewDeviceUnmounter() (volume.DeviceUnmounter, error) {
	detacher, err := p.NewDetacher()
	if err != nil {
		// Do not forward whatever value accompanied the error; a typed nil
		// inside the interface would compare unequal to nil.
		return nil, err
	}
	return detacher, nil
}
// GetDeviceMountRefs asks the host's mounter for this plugin for the mount
// references of deviceMountPath and returns its answer unchanged.
func (p *csiPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
	return p.host.GetMounter(p.GetPluginName()).GetMountRefs(deviceMountPath)
}
// BlockVolumePlugin methods
// Compile-time check that *csiPlugin satisfies volume.BlockVolumePlugin.
var _ volume . BlockVolumePlugin = & csiPlugin { }
func ( p * csiPlugin ) NewBlockVolumeMapper ( spec * volume . Spec , podRef * api . Pod , opts volume . VolumeOptions ) ( volume . BlockVolumeMapper , error ) {
2019-04-07 17:07:55 +00:00
if ! p . blockEnabled {
return nil , errors . New ( "CSIBlockVolume feature not enabled" )
}
pvSource , err := getCSISourceFromSpec ( spec )
if err != nil {
return nil , err
}
readOnly , err := getReadOnlyFromSpec ( spec )
if err != nil {
return nil , err
}
klog . V ( 4 ) . Info ( log ( "setting up block mapper for [volume=%v,driver=%v]" , pvSource . VolumeHandle , pvSource . Driver ) )
k8s := p . host . GetKubeClient ( )
if k8s == nil {
klog . Error ( log ( "failed to get a kubernetes client" ) )
return nil , errors . New ( "failed to get a Kubernetes client" )
}
mapper := & csiBlockMapper {
k8s : k8s ,
plugin : p ,
volumeID : pvSource . VolumeHandle ,
driverName : csiDriverName ( pvSource . Driver ) ,
readOnly : readOnly ,
spec : spec ,
specName : spec . Name ( ) ,
podUID : podRef . UID ,
}
mapper . csiClientGetter . driverName = csiDriverName ( pvSource . Driver )
// Save volume info in pod dir
dataDir := getVolumeDeviceDataDir ( spec . Name ( ) , p . host )
if err := os . MkdirAll ( dataDir , 0750 ) ; err != nil {
klog . Error ( log ( "failed to create data dir %s: %v" , dataDir , err ) )
return nil , err
}
klog . V ( 4 ) . Info ( log ( "created path successfully [%s]" , dataDir ) )
// persist volume info data for teardown
node := string ( p . host . GetNodeName ( ) )
attachID := getAttachmentName ( pvSource . VolumeHandle , pvSource . Driver , node )
volData := map [ string ] string {
volDataKey . specVolID : spec . Name ( ) ,
volDataKey . volHandle : pvSource . VolumeHandle ,
volDataKey . driverName : pvSource . Driver ,
volDataKey . nodeName : node ,
volDataKey . attachmentID : attachID ,
}
if err := saveVolumeData ( dataDir , volDataFileName , volData ) ; err != nil {
klog . Error ( log ( "failed to save volume info data: %v" , err ) )
if err := os . RemoveAll ( dataDir ) ; err != nil {
klog . Error ( log ( "failed to remove dir after error [%s]: %v" , dataDir , err ) )
return nil , err
}
return nil , err
}
return mapper , nil
2019-01-12 04:58:27 +00:00
}
func ( p * csiPlugin ) NewBlockVolumeUnmapper ( volName string , podUID types . UID ) ( volume . BlockVolumeUnmapper , error ) {
2019-04-07 17:07:55 +00:00
if ! p . blockEnabled {
return nil , errors . New ( "CSIBlockVolume feature not enabled" )
}
klog . V ( 4 ) . Infof ( log ( "setting up block unmapper for [Spec=%v, podUID=%v]" , volName , podUID ) )
unmapper := & csiBlockMapper {
plugin : p ,
podUID : podUID ,
specName : volName ,
}
// load volume info from file
dataDir := getVolumeDeviceDataDir ( unmapper . specName , p . host )
data , err := loadVolumeData ( dataDir , volDataFileName )
if err != nil {
klog . Error ( log ( "unmapper failed to load volume data file [%s]: %v" , dataDir , err ) )
return nil , err
}
unmapper . driverName = csiDriverName ( data [ volDataKey . driverName ] )
unmapper . volumeID = data [ volDataKey . volHandle ]
unmapper . csiClientGetter . driverName = unmapper . driverName
if err != nil {
return nil , err
}
return unmapper , nil
2019-01-12 04:58:27 +00:00
}
func ( p * csiPlugin ) ConstructBlockVolumeSpec ( podUID types . UID , specVolName , mapPath string ) ( * volume . Spec , error ) {
2019-04-07 17:07:55 +00:00
if ! p . blockEnabled {
return nil , errors . New ( "CSIBlockVolume feature not enabled" )
}
klog . V ( 4 ) . Infof ( "plugin.ConstructBlockVolumeSpec [podUID=%s, specVolName=%s, path=%s]" , string ( podUID ) , specVolName , mapPath )
dataDir := getVolumeDeviceDataDir ( specVolName , p . host )
volData , err := loadVolumeData ( dataDir , volDataFileName )
if err != nil {
klog . Error ( log ( "plugin.ConstructBlockVolumeSpec failed loading volume data using [%s]: %v" , mapPath , err ) )
return nil , err
}
klog . V ( 4 ) . Info ( log ( "plugin.ConstructBlockVolumeSpec extracted [%#v]" , volData ) )
blockMode := api . PersistentVolumeBlock
pv := & api . PersistentVolume {
ObjectMeta : meta . ObjectMeta {
Name : volData [ volDataKey . specVolID ] ,
} ,
Spec : api . PersistentVolumeSpec {
PersistentVolumeSource : api . PersistentVolumeSource {
CSI : & api . CSIPersistentVolumeSource {
Driver : volData [ volDataKey . driverName ] ,
VolumeHandle : volData [ volDataKey . volHandle ] ,
} ,
} ,
VolumeMode : & blockMode ,
} ,
}
return volume . NewSpecFromPersistentVolume ( pv , false ) , nil
2019-01-12 04:58:27 +00:00
}
2019-04-07 17:07:55 +00:00
// skipAttach looks up CSIDriver object associated with driver name
2019-08-30 18:33:25 +00:00
// to determine if driver requires attachment volume operation
2019-01-12 04:58:27 +00:00
func ( p * csiPlugin ) skipAttach ( driver string ) ( bool , error ) {
2019-04-07 17:07:55 +00:00
if ! utilfeature . DefaultFeatureGate . Enabled ( features . CSIDriverRegistry ) {
return false , nil
}
2019-07-14 07:58:54 +00:00
kletHost , ok := p . host . ( volume . KubeletVolumeHost )
if ok {
kletHost . WaitForCacheSync ( )
}
2019-04-07 17:07:55 +00:00
if p . csiDriverLister == nil {
return false , errors . New ( "CSIDriver lister does not exist" )
}
csiDriver , err := p . csiDriverLister . Get ( driver )
if err != nil {
if apierrs . IsNotFound ( err ) {
// Don't skip attach if CSIDriver does not exist
return false , nil
}
return false , err
}
if csiDriver . Spec . AttachRequired != nil && * csiDriver . Spec . AttachRequired == false {
return true , nil
}
2019-01-12 04:58:27 +00:00
return false , nil
}
2019-04-07 17:07:55 +00:00
// getDriverMode returns the driver mode for the specified spec: {persistent|ephemeral}.
// 1) If mode cannot be determined, it will default to "persistent".
// 2) If Mode cannot be resolved to either {persistent | ephemeral}, an error is returned
// See https://github.com/kubernetes/enhancements/blob/master/keps/sig-storage/20190122-csi-inline-volumes.md
func ( p * csiPlugin ) getDriverMode ( spec * volume . Spec ) ( driverMode , error ) {
2019-08-30 18:33:25 +00:00
// TODO (vladimirvivien) ultimately, mode will be retrieved from CSIDriver.Spec.Mode.
// However, in alpha version, mode is determined by the volume source:
// 1) if volume.Spec.Volume.CSI != nil -> mode is ephemeral
// 2) if volume.Spec.PersistentVolume.Spec.CSI != nil -> persistent
volSrc , _ , err := getSourceFromSpec ( spec )
if err != nil {
return "" , err
}
if volSrc != nil && utilfeature . DefaultFeatureGate . Enabled ( features . CSIInlineVolume ) {
return ephemeralDriverMode , nil
}
2019-04-07 17:07:55 +00:00
return persistentDriverMode , nil
}
2019-01-12 04:58:27 +00:00
func ( p * csiPlugin ) getPublishContext ( client clientset . Interface , handle , driver , nodeName string ) ( map [ string ] string , error ) {
skip , err := p . skipAttach ( driver )
if err != nil {
return nil , err
}
if skip {
return nil , nil
}
attachID := getAttachmentName ( handle , driver , nodeName )
// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
2019-04-07 17:07:55 +00:00
attachment , err := client . StorageV1 ( ) . VolumeAttachments ( ) . Get ( attachID , meta . GetOptions { } )
2019-01-12 04:58:27 +00:00
if err != nil {
return nil , err // This err already has enough context ("VolumeAttachment xyz not found")
}
if attachment == nil {
err = errors . New ( "no existing VolumeAttachment found" )
return nil , err
}
return attachment . Status . AttachmentMetadata , nil
}
2019-08-30 18:33:25 +00:00
// newAttacherDetacher creates the csiAttacher shared by NewAttacher and
// NewDetacher, with a one-second wait sleep time. It fails when the host has
// no kube client.
func (p *csiPlugin) newAttacherDetacher() (*csiAttacher, error) {
	kubeClient := p.host.GetKubeClient()
	if kubeClient == nil {
		klog.Error(log("unable to get kubernetes client from host"))
		return nil, errors.New("unable to get Kubernetes client")
	}

	attacher := &csiAttacher{
		plugin:        p,
		k8s:           kubeClient,
		waitSleepTime: 1 * time.Second,
	}
	return attacher, nil
}
2019-01-12 04:58:27 +00:00
func unregisterDriver ( driverName string ) error {
2019-04-07 17:07:55 +00:00
csiDrivers . Delete ( driverName )
2019-01-12 04:58:27 +00:00
if err := nim . UninstallCSIDriver ( driverName ) ; err != nil {
klog . Errorf ( "Error uninstalling CSI driver: %v" , err )
return err
}
return nil
}
// highestSupportedVersion returns the highest version in versions that this
// kubelet supports, i.e. the greatest parsable version whose major number is
// at most 1.
//
// The input slice is only read, never reordered. Entries that do not parse
// are skipped as long as some supported version exists. If none exists, the
// first parse error is returned when there was one, otherwise a generic
// "not supported" error. An empty input is an error.
func highestSupportedVersion(versions []string) (*utilversion.Version, error) {
	if len(versions) == 0 {
		return nil, fmt.Errorf("CSI driver reporting empty array for supported versions")
	}

	var (
		best          *utilversion.Version
		firstParseErr error
	)
	// A single pass replaces the former in-place sort, whose comparator was
	// not a valid ordering for unparsable entries and which reordered the
	// caller's slice.
	for _, raw := range versions {
		parsed, err := utilversion.ParseGeneric(raw)
		if err != nil {
			if firstParseErr == nil {
				firstParseErr = err
			}
			continue
		}
		if parsed.Major() > 1 {
			// Newer than anything supported here.
			continue
		}
		if best == nil || best.LessThan(parsed) {
			best = parsed
		}
	}

	if best != nil {
		return best, nil
	}
	if firstParseErr != nil {
		return nil, firstParseErr
	}
	return nil, fmt.Errorf("None of the CSI versions reported by this driver are supported")
}
// isDeprecatedSocketDirAllowed reports whether a driver advertising versions
// may use the deprecated socket dir: only drivers that implement CSI 0.x are
// allowed, so at least one entry must be a 0.x version.
func isDeprecatedSocketDirAllowed(versions []string) bool {
	for i := range versions {
		if !isV0Version(versions[i]) {
			continue
		}
		return true
	}
	return false
}
// isV0Version reports whether version parses as a version whose major number
// is 0. Unparsable input yields false.
func isV0Version(version string) bool {
	if parsed, err := utilversion.ParseGeneric(version); err == nil {
		return parsed.Major() == 0
	}
	return false
}