/* Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package csi import ( "errors" "fmt" "os" "path" "sort" "strings" "sync" "time" "context" "k8s.io/klog" api "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" utilversion "k8s.io/apimachinery/pkg/util/version" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager" ) const ( csiPluginName = "kubernetes.io/csi" // TODO (vladimirvivien) implement a more dynamic way to discover // the unix domain socket path for each installed csi driver. // TODO (vladimirvivien) would be nice to name socket with a .sock extension // for consistency. csiAddrTemplate = "/var/lib/kubelet/plugins/%v/csi.sock" csiTimeout = 15 * time.Second volNameSep = "^" volDataFileName = "vol_data.json" fsTypeBlockName = "block" ) type csiPlugin struct { host volume.VolumeHost } // ProbeVolumePlugins returns implemented plugins func ProbeVolumePlugins() []volume.VolumePlugin { p := &csiPlugin{ host: nil, } return []volume.VolumePlugin{p} } // volume.VolumePlugin methods var _ volume.VolumePlugin = &csiPlugin{} type csiDriver struct { driverName string driverEndpoint string highestSupportedVersion *utilversion.Version } type csiDriversStore struct { driversMap map[string]csiDriver sync.RWMutex } // RegistrationHandler is the handler which is fed to the pluginwatcher API. type RegistrationHandler struct { } // TODO (verult) consider using a struct instead of global variables // csiDrivers map keep track of all registered CSI drivers on the node and their // corresponding sockets var csiDrivers csiDriversStore var nim nodeinfomanager.Interface // PluginHandler is the plugin registration handler interface passed to the // pluginwatcher module in kubelet var PluginHandler = &RegistrationHandler{} // ValidatePlugin is called by kubelet's plugin watcher upon detection // of a new registration socket opened by CSI Driver registrar side car. func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error { klog.Infof(log("Trying to validate a new CSI Driver with name: %s endpoint: %s versions: %s, foundInDeprecatedDir: %v", pluginName, endpoint, strings.Join(versions, ","), foundInDeprecatedDir)) if foundInDeprecatedDir { // CSI 0.x drivers used /var/lib/kubelet/plugins as the socket dir. // This was deprecated as the socket dir for kubelet drivers, in lieu of a dedicated dir /var/lib/kubelet/plugins_registry // The deprecated dir will only be allowed for a whitelisted set of old versions. // CSI 1.x drivers should use the /var/lib/kubelet/plugins_registry if !isDeprecatedSocketDirAllowed(versions) { err := fmt.Errorf("socket for CSI driver %q versions %v was found in a deprecated dir. Drivers implementing CSI 1.x+ must use the new dir", pluginName, versions) klog.Error(err) return err } } _, err := h.validateVersions("ValidatePlugin", pluginName, endpoint, versions) return err } // RegisterPlugin is called when a plugin can be registered func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string) error { klog.Infof(log("Register new plugin with name: %s at endpoint: %s", pluginName, endpoint)) highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, endpoint, versions) if err != nil { return err } func() { // Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key // all other CSI components will be able to get the actual socket of CSI drivers by its name. // It's not necessary to lock the entire RegistrationCallback() function because only the CSI // client depends on this driver map, and the CSI client does not depend on node information // updated in the rest of the function. csiDrivers.Lock() defer csiDrivers.Unlock() csiDrivers.driversMap[pluginName] = csiDriver{driverName: pluginName, driverEndpoint: endpoint, highestSupportedVersion: highestSupportedVersion} }() // Get node info from the driver. csi, err := newCsiDriverClient(csiDriverName(pluginName)) if err != nil { return err } // TODO (verult) retry with exponential backoff, possibly added in csi client library. ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) defer cancel() driverNodeID, maxVolumePerNode, accessibleTopology, err := csi.NodeGetInfo(ctx) if err != nil { klog.Error(log("registrationHandler.RegisterPlugin failed at CSI.NodeGetInfo: %v", err)) if unregErr := unregisterDriver(pluginName); unregErr != nil { klog.Error(log("registrationHandler.RegisterPlugin failed to unregister plugin due to previous: %v", unregErr)) return unregErr } return err } err = nim.InstallCSIDriver(pluginName, driverNodeID, maxVolumePerNode, accessibleTopology) if err != nil { klog.Error(log("registrationHandler.RegisterPlugin failed at AddNodeInfo: %v", err)) if unregErr := unregisterDriver(pluginName); unregErr != nil { klog.Error(log("registrationHandler.RegisterPlugin failed to unregister plugin due to previous error: %v", unregErr)) return unregErr } return err } return nil } func (h *RegistrationHandler) validateVersions(callerName, pluginName string, endpoint string, versions []string) (*utilversion.Version, error) { if len(versions) == 0 { err := fmt.Errorf("%s for CSI driver %q failed. Plugin returned an empty list for supported versions", callerName, pluginName) klog.Error(err) return nil, err } // Validate version newDriverHighestVersion, err := highestSupportedVersion(versions) if err != nil { err := fmt.Errorf("%s for CSI driver %q failed. None of the versions specified %q are supported. err=%v", callerName, pluginName, versions, err) klog.Error(err) return nil, err } // Check for existing drivers with the same name var existingDriver csiDriver driverExists := false func() { csiDrivers.RLock() defer csiDrivers.RUnlock() existingDriver, driverExists = csiDrivers.driversMap[pluginName] }() if driverExists { if !existingDriver.highestSupportedVersion.LessThan(newDriverHighestVersion) { err := fmt.Errorf("%s for CSI driver %q failed. Another driver with the same name is already registered with a higher supported version: %q", callerName, pluginName, existingDriver.highestSupportedVersion) klog.Error(err) return nil, err } } return newDriverHighestVersion, nil } // DeRegisterPlugin is called when a plugin removed its socket, signaling // it is no longer available func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { klog.V(4).Info(log("registrationHandler.DeRegisterPlugin request for plugin %s", pluginName)) if err := unregisterDriver(pluginName); err != nil { klog.Error(log("registrationHandler.DeRegisterPlugin failed: %v", err)) } } func (p *csiPlugin) Init(host volume.VolumeHost) error { p.host = host // Initializing csiDrivers map and label management channels csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}} nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host) // TODO(#70514) Init CSINodeInfo object if the CRD exists and create Driver // objects for migrated drivers. return nil } func (p *csiPlugin) GetPluginName() string { return csiPluginName } // GetvolumeName returns a concatenated string of CSIVolumeSource.DriverCSIVolumeSource.VolumeHandle // That string value is used in Detach() to extract driver name and volumeName. func (p *csiPlugin) GetVolumeName(spec *volume.Spec) (string, error) { csi, err := getCSISourceFromSpec(spec) if err != nil { klog.Error(log("plugin.GetVolumeName failed to extract volume source from spec: %v", err)) return "", err } // return driverNamevolumeHandle return fmt.Sprintf("%s%s%s", csi.Driver, volNameSep, csi.VolumeHandle), nil } func (p *csiPlugin) CanSupport(spec *volume.Spec) bool { // TODO (vladimirvivien) CanSupport should also take into account // the availability/registration of specified Driver in the volume source return spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil } func (p *csiPlugin) RequiresRemount() bool { return false } func (p *csiPlugin) NewMounter( spec *volume.Spec, pod *api.Pod, _ volume.VolumeOptions) (volume.Mounter, error) { pvSource, err := getCSISourceFromSpec(spec) if err != nil { return nil, err } readOnly, err := getReadOnlyFromSpec(spec) if err != nil { return nil, err } k8s := p.host.GetKubeClient() if k8s == nil { klog.Error(log("failed to get a kubernetes client")) return nil, errors.New("failed to get a Kubernetes client") } csi, err := newCsiDriverClient(csiDriverName(pvSource.Driver)) if err != nil { return nil, err } mounter := &csiMountMgr{ plugin: p, k8s: k8s, spec: spec, pod: pod, podUID: pod.UID, driverName: csiDriverName(pvSource.Driver), volumeID: pvSource.VolumeHandle, specVolumeID: spec.Name(), csiClient: csi, readOnly: readOnly, } // Save volume info in pod dir dir := mounter.GetPath() dataDir := path.Dir(dir) // dropoff /mount at end if err := os.MkdirAll(dataDir, 0750); err != nil { klog.Error(log("failed to create dir %#v: %v", dataDir, err)) return nil, err } klog.V(4).Info(log("created path successfully [%s]", dataDir)) // persist volume info data for teardown node := string(p.host.GetNodeName()) attachID := getAttachmentName(pvSource.VolumeHandle, pvSource.Driver, node) volData := map[string]string{ volDataKey.specVolID: spec.Name(), volDataKey.volHandle: pvSource.VolumeHandle, volDataKey.driverName: pvSource.Driver, volDataKey.nodeName: node, volDataKey.attachmentID: attachID, } if err := saveVolumeData(dataDir, volDataFileName, volData); err != nil { klog.Error(log("failed to save volume info data: %v", err)) if err := os.RemoveAll(dataDir); err != nil { klog.Error(log("failed to remove dir after error [%s]: %v", dataDir, err)) return nil, err } return nil, err } klog.V(4).Info(log("mounter created successfully")) return mounter, nil } func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmounter, error) { klog.V(4).Infof(log("setting up unmounter for [name=%v, podUID=%v]", specName, podUID)) unmounter := &csiMountMgr{ plugin: p, podUID: podUID, specVolumeID: specName, } // load volume info from file dir := unmounter.GetPath() dataDir := path.Dir(dir) // dropoff /mount at end data, err := loadVolumeData(dataDir, volDataFileName) if err != nil { klog.Error(log("unmounter failed to load volume data file [%s]: %v", dir, err)) return nil, err } unmounter.driverName = csiDriverName(data[volDataKey.driverName]) unmounter.volumeID = data[volDataKey.volHandle] unmounter.csiClient, err = newCsiDriverClient(unmounter.driverName) if err != nil { return nil, err } return unmounter, nil } func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { klog.V(4).Info(log("plugin.ConstructVolumeSpec [pv.Name=%v, path=%v]", volumeName, mountPath)) volData, err := loadVolumeData(mountPath, volDataFileName) if err != nil { klog.Error(log("plugin.ConstructVolumeSpec failed loading volume data using [%s]: %v", mountPath, err)) return nil, err } klog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [%#v]", volData)) fsMode := api.PersistentVolumeFilesystem pv := &api.PersistentVolume{ ObjectMeta: meta.ObjectMeta{ Name: volData[volDataKey.specVolID], }, Spec: api.PersistentVolumeSpec{ PersistentVolumeSource: api.PersistentVolumeSource{ CSI: &api.CSIPersistentVolumeSource{ Driver: volData[volDataKey.driverName], VolumeHandle: volData[volDataKey.volHandle], }, }, VolumeMode: &fsMode, }, } return volume.NewSpecFromPersistentVolume(pv, false), nil } func (p *csiPlugin) SupportsMountOption() bool { // TODO (vladimirvivien) use CSI VolumeCapability.MountVolume.mount_flags // to probe for the result for this method // (bswartz) Until the CSI spec supports probing, our only option is to // make plugins register their support for mount options or lack thereof // directly with kubernetes. return true } func (p *csiPlugin) SupportsBulkVolumeVerification() bool { return false } // volume.AttachableVolumePlugin methods var _ volume.AttachableVolumePlugin = &csiPlugin{} var _ volume.DeviceMountableVolumePlugin = &csiPlugin{} func (p *csiPlugin) NewAttacher() (volume.Attacher, error) { k8s := p.host.GetKubeClient() if k8s == nil { klog.Error(log("unable to get kubernetes client from host")) return nil, errors.New("unable to get Kubernetes client") } return &csiAttacher{ plugin: p, k8s: k8s, waitSleepTime: 1 * time.Second, }, nil } func (p *csiPlugin) NewDeviceMounter() (volume.DeviceMounter, error) { return p.NewAttacher() } func (p *csiPlugin) NewDetacher() (volume.Detacher, error) { k8s := p.host.GetKubeClient() if k8s == nil { klog.Error(log("unable to get kubernetes client from host")) return nil, errors.New("unable to get Kubernetes client") } return &csiAttacher{ plugin: p, k8s: k8s, waitSleepTime: 1 * time.Second, }, nil } func (p *csiPlugin) NewDeviceUnmounter() (volume.DeviceUnmounter, error) { return p.NewDetacher() } func (p *csiPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) { m := p.host.GetMounter(p.GetPluginName()) return m.GetMountRefs(deviceMountPath) } // BlockVolumePlugin methods var _ volume.BlockVolumePlugin = &csiPlugin{} func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opts volume.VolumeOptions) (volume.BlockVolumeMapper, error) { return nil, errors.New("CSIBlockVolume feature not enabled") } func (p *csiPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) { return nil, errors.New("CSIBlockVolume feature not enabled") } func (p *csiPlugin) ConstructBlockVolumeSpec(podUID types.UID, specVolName, mapPath string) (*volume.Spec, error) { return nil, errors.New("CSIBlockVolume feature not enabled") } func (p *csiPlugin) skipAttach(driver string) (bool, error) { return false, nil } func (p *csiPlugin) getPublishContext(client clientset.Interface, handle, driver, nodeName string) (map[string]string, error) { skip, err := p.skipAttach(driver) if err != nil { return nil, err } if skip { return nil, nil } attachID := getAttachmentName(handle, driver, nodeName) // search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName attachment, err := client.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{}) if err != nil { return nil, err // This err already has enough context ("VolumeAttachment xyz not found") } if attachment == nil { err = errors.New("no existing VolumeAttachment found") return nil, err } return attachment.Status.AttachmentMetadata, nil } func unregisterDriver(driverName string) error { func() { csiDrivers.Lock() defer csiDrivers.Unlock() delete(csiDrivers.driversMap, driverName) }() if err := nim.UninstallCSIDriver(driverName); err != nil { klog.Errorf("Error uninstalling CSI driver: %v", err) return err } return nil } // Return the highest supported version func highestSupportedVersion(versions []string) (*utilversion.Version, error) { if len(versions) == 0 { return nil, fmt.Errorf("CSI driver reporting empty array for supported versions") } // Sort by lowest to highest version sort.Slice(versions, func(i, j int) bool { parsedVersionI, err := utilversion.ParseGeneric(versions[i]) if err != nil { // Push bad values to the bottom return true } parsedVersionJ, err := utilversion.ParseGeneric(versions[j]) if err != nil { // Push bad values to the bottom return false } return parsedVersionI.LessThan(parsedVersionJ) }) for i := len(versions) - 1; i >= 0; i-- { highestSupportedVersion, err := utilversion.ParseGeneric(versions[i]) if err != nil { return nil, err } if highestSupportedVersion.Major() <= 1 { return highestSupportedVersion, nil } } return nil, fmt.Errorf("None of the CSI versions reported by this driver are supported") } // Only drivers that implement CSI 0.x are allowed to use deprecated socket dir. func isDeprecatedSocketDirAllowed(versions []string) bool { for _, version := range versions { if isV0Version(version) { return true } } return false } func isV0Version(version string) bool { parsedVersion, err := utilversion.ParseGeneric(version) if err != nil { return false } return parsedVersion.Major() == 0 }