Add ADC Fallback if Node doesn't have driver installed

pull/564/head
David Zhu 2019-03-05 10:57:18 -08:00
parent 34d9ee5b9b
commit 7d2f4e97b8
17 changed files with 225 additions and 44 deletions

View File

@ -223,6 +223,7 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err
ctx.InformerFactory.Core().V1().Nodes(), ctx.InformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.InformerFactory.Core().V1().PersistentVolumes(), ctx.InformerFactory.Core().V1().PersistentVolumes(),
ctx.InformerFactory.Storage().V1beta1().CSINodes(),
ctx.Cloud, ctx.Cloud,
ProbeAttachableVolumePlugins(), ProbeAttachableVolumePlugins(),
GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration), GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),

View File

@ -101,4 +101,7 @@ const (
// This annotation will be used to compute the in-cluster network programming latency SLI, see // This annotation will be used to compute the in-cluster network programming latency SLI, see
// https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md // https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md
EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time" EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time"
// TODO(dyzz) Comment
MigratedPluginsAnnotationKey = "storage.alpha.kubernetes.io/migrated-plugins"
) )

View File

@ -32,10 +32,12 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",

View File

@ -31,10 +31,12 @@ import (
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1beta1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1" corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
kcache "k8s.io/client-go/tools/cache" kcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
@ -101,6 +103,7 @@ func NewAttachDetachController(
nodeInformer coreinformers.NodeInformer, nodeInformer coreinformers.NodeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer, pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer, pvInformer coreinformers.PersistentVolumeInformer,
csiNodeInformer storageinformers.CSINodeInformer,
cloud cloudprovider.Interface, cloud cloudprovider.Interface,
plugins []volume.VolumePlugin, plugins []volume.VolumePlugin,
prober volume.DynamicPluginProber, prober volume.DynamicPluginProber,
@ -132,6 +135,8 @@ func NewAttachDetachController(
podIndexer: podInformer.Informer().GetIndexer(), podIndexer: podInformer.Informer().GetIndexer(),
nodeLister: nodeInformer.Lister(), nodeLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced, nodesSynced: nodeInformer.Informer().HasSynced,
csiNodeLister: csiNodeInformer.Lister(),
csiNodeSynced: csiNodeInformer.Informer().HasSynced,
cloud: cloud, cloud: cloud,
pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"), pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"),
} }
@ -257,6 +262,9 @@ type attachDetachController struct {
nodeLister corelisters.NodeLister nodeLister corelisters.NodeLister
nodesSynced kcache.InformerSynced nodesSynced kcache.InformerSynced
csiNodeLister storagelisters.CSINodeLister
csiNodeSynced kcache.InformerSynced
// cloud provider used by volume host // cloud provider used by volume host
cloud cloudprovider.Interface cloud cloudprovider.Interface
@ -643,6 +651,17 @@ func (adc *attachDetachController) processVolumesInUse(
} }
} }
var _ volume.VolumeHost = &attachDetachController{}
var _ volume.AttachDetachVolumeHost = &attachDetachController{}
func (adc *attachDetachController) CSINodeLister() storagelisters.CSINodeLister {
return adc.csiNodeLister
}
func (adc *attachDetachController) CSINodeSynced() bool {
return adc.csiNodeSynced()
}
// VolumeHost implementation // VolumeHost implementation
// This is an unfortunate requirement of the current factoring of volume plugin // This is an unfortunate requirement of the current factoring of volume plugin
// initializing code. It requires kubelet specific methods used by the mounting // initializing code. It requires kubelet specific methods used by the mounting

View File

@ -44,6 +44,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
informerFactory.Core().V1().Nodes(), informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumeClaims(), informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(), informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1beta1().CSINodes(),
nil, /* cloud */ nil, /* cloud */
nil, /* plugins */ nil, /* plugins */
nil, /* prober */ nil, /* prober */
@ -218,6 +219,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
informerFactory.Core().V1().Nodes(), informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumeClaims(), informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(), informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1beta1().CSINodes(),
nil, /* cloud */ nil, /* cloud */
plugins, plugins,
prober, prober,

View File

@ -1376,7 +1376,8 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.Persis
var pluginName string var pluginName string
provisionerName := storageClass.Provisioner provisionerName := storageClass.Provisioner
if plugin != nil { if plugin != nil {
if plugin.IsMigratedToCSI() { // TODO(dyzz) Just temporary to test without dynamic provisioning
if plugin.IsMigratedToCSI() && false {
// pluginName is not set here to align with existing behavior // pluginName is not set here to align with existing behavior
// of not setting pluginName for external provisioners (including CSI) // of not setting pluginName for external provisioners (including CSI)
// Set provisionerName to CSI plugin name for setClaimProvisioner // Set provisionerName to CSI plugin name for setClaimProvisioner
@ -1401,7 +1402,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.Persis
} }
claim = newClaim claim = newClaim
if plugin == nil || plugin.IsMigratedToCSI() { if plugin == nil {
// findProvisionablePlugin returned no error nor plugin. // findProvisionablePlugin returned no error nor plugin.
// This means that an unknown provisioner is requested. Report an event // This means that an unknown provisioner is requested. Report an event
// and wait for the external provisioner // and wait for the external provisioner

View File

@ -30,6 +30,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library",
"//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/klog:go_default_library",

View File

@ -26,7 +26,7 @@ import (
"time" "time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1" storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -71,7 +71,7 @@ type nodeUpdateFunc func(*v1.Node) (newNode *v1.Node, updated bool, err error)
// Interface implements an interface for managing labels of a node // Interface implements an interface for managing labels of a node
type Interface interface { type Interface interface {
CreateCSINode() (*storage.CSINode, error) CreateCSINode() (*storagev1beta1.CSINode, error)
// Record in the cluster the given node information from the CSI driver with the given name. // Record in the cluster the given node information from the CSI driver with the given name.
// Concurrent calls to InstallCSIDriver() is allowed, but they should not be intertwined with calls // Concurrent calls to InstallCSIDriver() is allowed, but they should not be intertwined with calls
@ -388,7 +388,7 @@ func (nim *nodeInfoManager) tryUpdateCSINode(
return nim.installDriverToCSINode(nodeInfo, driverName, driverNodeID, topology) return nim.installDriverToCSINode(nodeInfo, driverName, driverNodeID, topology)
} }
func (nim *nodeInfoManager) CreateCSINode() (*storage.CSINode, error) { func (nim *nodeInfoManager) CreateCSINode() (*storagev1beta1.CSINode, error) {
kubeClient := nim.volumeHost.GetKubeClient() kubeClient := nim.volumeHost.GetKubeClient()
if kubeClient == nil { if kubeClient == nil {
@ -405,7 +405,7 @@ func (nim *nodeInfoManager) CreateCSINode() (*storage.CSINode, error) {
return nil, err return nil, err
} }
nodeInfo := &storage.CSINode{ nodeInfo := &storagev1beta1.CSINode{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: string(nim.nodeName), Name: string(nim.nodeName),
OwnerReferences: []metav1.OwnerReference{ OwnerReferences: []metav1.OwnerReference{
@ -417,8 +417,8 @@ func (nim *nodeInfoManager) CreateCSINode() (*storage.CSINode, error) {
}, },
}, },
}, },
Spec: storage.CSINodeSpec{ Spec: storagev1beta1.CSINodeSpec{
Drivers: []storage.CSINodeDriver{}, Drivers: []storagev1beta1.CSINodeDriver{},
}, },
} }
@ -427,11 +427,7 @@ func (nim *nodeInfoManager) CreateCSINode() (*storage.CSINode, error) {
return csiKubeClient.StorageV1beta1().CSINodes().Create(nodeInfo) return csiKubeClient.StorageV1beta1().CSINodes().Create(nodeInfo)
} }
const ( func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo *storagev1beta1.CSINode) (modified bool) {
migratedPluginsAnnotationKey = "storage.alpha.kubernetes.io/migrated-plugins"
)
func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo *storage.CSINode) (modified bool) {
if migratedPlugins == nil { if migratedPlugins == nil {
return false return false
} }
@ -441,7 +437,7 @@ func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo *
nodeInfoAnnotations = map[string]string{} nodeInfoAnnotations = map[string]string{}
} }
mpa := nodeInfoAnnotations[migratedPluginsAnnotationKey] mpa := nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey]
tok := strings.Split(mpa, ",") tok := strings.Split(mpa, ",")
oldAnnotationSet := sets.NewString(tok...) oldAnnotationSet := sets.NewString(tok...)
@ -458,9 +454,9 @@ func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo *
nas := strings.Join(newAnnotationSet.List(), ",") nas := strings.Join(newAnnotationSet.List(), ",")
if len(nas) != 0 { if len(nas) != 0 {
nodeInfoAnnotations[migratedPluginsAnnotationKey] = nas nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey] = nas
} else { } else {
delete(nodeInfoAnnotations, migratedPluginsAnnotationKey) delete(nodeInfoAnnotations, v1.MigratedPluginsAnnotationKey)
} }
nodeInfo.Annotations = nodeInfoAnnotations nodeInfo.Annotations = nodeInfoAnnotations
@ -468,7 +464,7 @@ func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo *
} }
func (nim *nodeInfoManager) installDriverToCSINode( func (nim *nodeInfoManager) installDriverToCSINode(
nodeInfo *storage.CSINode, nodeInfo *storagev1beta1.CSINode,
driverName string, driverName string,
driverNodeID string, driverNodeID string,
topology map[string]string) error { topology map[string]string) error {
@ -486,7 +482,7 @@ func (nim *nodeInfoManager) installDriverToCSINode(
specModified := true specModified := true
statusModified := true statusModified := true
// Clone driver list, omitting the driver that matches the given driverName // Clone driver list, omitting the driver that matches the given driverName
newDriverSpecs := []storage.CSINodeDriver{} newDriverSpecs := []storagev1beta1.CSINodeDriver{}
for _, driverInfoSpec := range nodeInfo.Spec.Drivers { for _, driverInfoSpec := range nodeInfo.Spec.Drivers {
if driverInfoSpec.Name == driverName { if driverInfoSpec.Name == driverName {
if driverInfoSpec.NodeID == driverNodeID && if driverInfoSpec.NodeID == driverNodeID &&
@ -506,7 +502,7 @@ func (nim *nodeInfoManager) installDriverToCSINode(
} }
// Append new driver // Append new driver
driverSpec := storage.CSINodeDriver{ driverSpec := storagev1beta1.CSINodeDriver{
Name: driverName, Name: driverName,
NodeID: driverNodeID, NodeID: driverNodeID,
TopologyKeys: topologyKeys.List(), TopologyKeys: topologyKeys.List(),

View File

@ -580,7 +580,7 @@ func TestSetMigrationAnnotation(t *testing.T) {
expectedNode: &storage.CSINode{ expectedNode: &storage.CSINode{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "node1", Name: "node1",
Annotations: map[string]string{migratedPluginsAnnotationKey: "test"}, Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test"},
}, },
}, },
expectModified: true, expectModified: true,
@ -593,13 +593,13 @@ func TestSetMigrationAnnotation(t *testing.T) {
existingNode: &storage.CSINode{ existingNode: &storage.CSINode{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "node1", Name: "node1",
Annotations: map[string]string{migratedPluginsAnnotationKey: "test"}, Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test"},
}, },
}, },
expectedNode: &storage.CSINode{ expectedNode: &storage.CSINode{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "node1", Name: "node1",
Annotations: map[string]string{migratedPluginsAnnotationKey: "test"}, Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test"},
}, },
}, },
expectModified: false, expectModified: false,
@ -610,7 +610,7 @@ func TestSetMigrationAnnotation(t *testing.T) {
existingNode: &storage.CSINode{ existingNode: &storage.CSINode{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "node1", Name: "node1",
Annotations: map[string]string{migratedPluginsAnnotationKey: "test"}, Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test"},
}, },
}, },
expectedNode: &storage.CSINode{ expectedNode: &storage.CSINode{
@ -635,7 +635,7 @@ func TestSetMigrationAnnotation(t *testing.T) {
expectedNode: &storage.CSINode{ expectedNode: &storage.CSINode{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "node1", Name: "node1",
Annotations: map[string]string{migratedPluginsAnnotationKey: "test", "other": "annotation"}, Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test", "other": "annotation"},
}, },
}, },
expectModified: true, expectModified: true,
@ -649,13 +649,13 @@ func TestSetMigrationAnnotation(t *testing.T) {
existingNode: &storage.CSINode{ existingNode: &storage.CSINode{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "node1", Name: "node1",
Annotations: map[string]string{"other": "annotation", migratedPluginsAnnotationKey: "foo"}, Annotations: map[string]string{"other": "annotation", v1.MigratedPluginsAnnotationKey: "foo"},
}, },
}, },
expectedNode: &storage.CSINode{ expectedNode: &storage.CSINode{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "node1", Name: "node1",
Annotations: map[string]string{migratedPluginsAnnotationKey: "test", "other": "annotation"}, Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test", "other": "annotation"},
}, },
}, },
expectModified: true, expectModified: true,
@ -675,7 +675,7 @@ func TestSetMigrationAnnotation(t *testing.T) {
expectedNode: &storage.CSINode{ expectedNode: &storage.CSINode{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "node1", Name: "node1",
Annotations: map[string]string{migratedPluginsAnnotationKey: "foo,test", "other": "annotation"}, Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "foo,test", "other": "annotation"},
}, },
}, },
expectModified: true, expectModified: true,

View File

@ -30,6 +30,7 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/util/validation"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider" cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog" "k8s.io/klog"
@ -278,6 +279,14 @@ type BlockVolumePlugin interface {
ConstructBlockVolumeSpec(podUID types.UID, volumeName, volumePath string) (*Spec, error) ConstructBlockVolumeSpec(podUID types.UID, volumeName, volumePath string) (*Spec, error)
} }
// TODO(#14217)
// As part of the Volume Host refactor we are starting to create Volume Hosts
// for specific hosts. New methods for each specific host can be added here.
// Currently consumers will do type assertions to get the specific type of Volume
// Host; however, the end result should be that specific Volume Hosts are passed
// to the specific functions they are needed in (instead of using a catch-all
// VolumeHost interface)
// KubeletVolumeHost is a Kubelet specific interface that plugins can use to access the kubelet. // KubeletVolumeHost is a Kubelet specific interface that plugins can use to access the kubelet.
type KubeletVolumeHost interface { type KubeletVolumeHost interface {
// SetKubeletError lets plugins set an error on the Kubelet runtime status // SetKubeletError lets plugins set an error on the Kubelet runtime status
@ -285,6 +294,17 @@ type KubeletVolumeHost interface {
SetKubeletError(err error) SetKubeletError(err error)
} }
// AttachDetachVolumeHost is a AttachDetach Controller specific interface that plugins can use
// to access methods on the Attach Detach Controller.
type AttachDetachVolumeHost interface {
// CSINodeLister returns the informer lister for the CSINode API Object
CSINodeLister() storagelisters.CSINodeLister
// CSINodeSynced returns a boolean representing whether the CSINode API Object
// informer has been synced
CSINodeSynced() bool
}
// VolumeHost is an interface that plugins can use to access the kubelet. // VolumeHost is an interface that plugins can use to access the kubelet.
type VolumeHost interface { type VolumeHost interface {
// GetPluginDir returns the absolute path to a directory under which // GetPluginDir returns the absolute path to a directory under which

View File

@ -25,9 +25,11 @@ go_library(
"//pkg/volume/util/types:go_default_library", "//pkg/volume/util/types:go_default_library",
"//pkg/volume/util/volumepathhandler:go_default_library", "//pkg/volume/util/volumepathhandler:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library",

View File

@ -23,9 +23,11 @@ import (
"time" "time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
@ -84,6 +86,7 @@ func NewOperationGenerator(kubeClient clientset.Interface,
checkNodeCapabilitiesBeforeMount: checkNodeCapabilitiesBeforeMount, checkNodeCapabilitiesBeforeMount: checkNodeCapabilitiesBeforeMount,
blkUtil: blkUtil, blkUtil: blkUtil,
} }
// TODO(dyzz) look at default resync time
} }
// OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable // OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable
@ -303,8 +306,14 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
} }
originalSpec := volumeToAttach.VolumeSpec originalSpec := volumeToAttach.VolumeSpec
nu, err := nodeUsingCSIPlugin(og, volumeToAttach.VolumeSpec, volumeToAttach.NodeName)
if err != nil {
eventRecorderFunc(&err)
return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.NodeUsingCSIPlugin failed", err)
}
// useCSIPlugin will check both CSIMigration and the plugin specific feature gate // useCSIPlugin will check both CSIMigration and the plugin specific feature gate
if useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec) { if useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec) && nu {
// The volume represented by this spec is CSI and thus should be migrated // The volume represented by this spec is CSI and thus should be migrated
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName) attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
if err != nil || attachableVolumePlugin == nil { if err != nil || attachableVolumePlugin == nil {
@ -401,17 +410,22 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(
if volumeToDetach.VolumeSpec != nil { if volumeToDetach.VolumeSpec != nil {
// Get attacher plugin // Get attacher plugin
nu, err := nodeUsingCSIPlugin(og, volumeToDetach.VolumeSpec, volumeToDetach.NodeName)
if err != nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.NodeUsingCSIPlugin failed", err)
}
// useCSIPlugin will check both CSIMigration and the plugin specific feature gate // useCSIPlugin will check both CSIMigration and the plugin specific feature gate
if useCSIPlugin(og.volumePluginMgr, volumeToDetach.VolumeSpec) { if useCSIPlugin(og.volumePluginMgr, volumeToDetach.VolumeSpec) && nu {
// The volume represented by this spec is CSI and thus should be migrated // The volume represented by this spec is CSI and thus should be migrated
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName) attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
if err != nil || attachableVolumePlugin == nil { if err != nil || attachableVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err) return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err)
} }
csiSpec, err := translateSpec(volumeToDetach.VolumeSpec) csiSpec, err := translateSpec(volumeToDetach.VolumeSpec)
if err != nil { if err != nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.TranslateSpec failed", err) return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.TranslateSpec failed", err)
} }
volumeToDetach.VolumeSpec = csiSpec volumeToDetach.VolumeSpec = csiSpec
@ -1547,6 +1561,93 @@ func useCSIPlugin(vpm *volume.VolumePluginMgr, spec *volume.Spec) bool {
return false return false
} }
func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName types.NodeName) (bool, error) {
var err error
migratable, err := og.volumePluginMgr.IsPluginMigratableBySpec(spec)
if err != nil {
return false, err
}
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) ||
!utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) ||
!migratable {
return false, nil
}
if len(nodeName) == 0 {
return false, fmt.Errorf("nodeName is empty")
}
vpm := og.volumePluginMgr
kubeClient := vpm.Host.GetKubeClient()
if kubeClient == nil {
// TODO(dyzz) check this error case, what should we do in standalone kubelet mode?
return false, fmt.Errorf("failed to get kube client from volume host")
}
adcHost, ok := og.volumePluginMgr.Host.(volume.AttachDetachVolumeHost)
if !ok {
// This function is running not on the AttachDetachController
// We assume that Kubelet is servicing this function and therefore is
// trivially "using CSI Plugin"
return true, nil
}
var csiNode *storagev1beta1.CSINode
if adcHost.CSINodeSynced() {
csiNode, err = adcHost.CSINodeLister().Get(string(nodeName))
if err != nil {
return false, err
}
} else {
// Fallback to GET
klog.Warningf("CSINode informer not synced, falling back to GET directly from API Server")
csiNode, err = kubeClient.StorageV1beta1().CSINodes().Get(string(nodeName), metav1.GetOptions{})
if err != nil {
return false, err
}
}
ann := csiNode.GetAnnotations()
if ann == nil {
return false, nil
}
mpaSet := sets.NewString(strings.Split(ann[v1.MigratedPluginsAnnotationKey], ",")...)
pluginName, err := csilib.GetInTreePluginNameFromSpec(spec.PersistentVolume, spec.Volume)
if err != nil {
return false, err
}
if len(pluginName) == 0 {
// Could not find a plugin name from translation directory
return false, nil
}
isMigratedOnNode := mpaSet.Has(pluginName)
if isMigratedOnNode {
installed := false
driverName, err := csilib.GetCSINameFromIntreeName(pluginName)
if err != nil {
return isMigratedOnNode, err
}
for _, driver := range csiNode.Spec.Drivers {
if driver.Name == driverName {
installed = true
break
}
}
if !installed {
return true, fmt.Errorf("in-tree plugin %s is migrated on node %s but driver %s is not installed.", pluginName, string(nodeName), driverName)
}
}
return isMigratedOnNode, nil
}
func translateSpec(spec *volume.Spec) (*volume.Spec, error) { func translateSpec(spec *volume.Spec) (*volume.Spec, error) {
if spec.PersistentVolume != nil { if spec.PersistentVolume != nil {
// TranslateInTreePVToCSI will create a new PV // TranslateInTreePVToCSI will create a new PV

View File

@ -76,6 +76,9 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("storage.k8s.io").Resources("csidrivers").RuleOrDie()) role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("storage.k8s.io").Resources("csidrivers").RuleOrDie())
} }
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("storage.k8s.io").Resources("csinodes").RuleOrDie())
}
} }
return role return role

View File

@ -66,6 +66,14 @@ items:
- get - get
- list - list
- watch - watch
- apiGroups:
- storage.k8s.io
resources:
- csinodes
verbs:
- get
- list
- watch
- apiVersion: rbac.authorization.k8s.io/v1 - apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole kind: ClusterRole
metadata: metadata:

View File

@ -97,4 +97,7 @@ const (
// This annotation will be used to compute the in-cluster network programming latency SLI, see // This annotation will be used to compute the in-cluster network programming latency SLI, see
// https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md // https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md
EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time" EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time"
// TODO(dyzz) Comment
MigratedPluginsAnnotationKey = "storage.alpha.kubernetes.io/migrated-plugins"
) )

View File

@ -95,15 +95,33 @@ func IsMigratedCSIDriverByName(csiPluginName string) bool {
return false return false
} }
// GetInTreePluginNameFromSpec returns the plugin name
func GetInTreePluginNameFromSpec(pv *v1.PersistentVolume, vol *v1.Volume) (string, error) {
if pv != nil {
for _, curPlugin := range inTreePlugins {
if curPlugin.CanSupport(pv) {
return curPlugin.GetInTreePluginName(), nil
}
}
return "", fmt.Errorf("could not find in-tree plugin name from persistent volume %v", pv)
} else if vol != nil {
// TODO(dyzz): Implement inline volume migration support
return "", fmt.Errorf("inline volume migration not yet supported")
} else {
return "", fmt.Errorf("both persistent volume and volume are nil")
}
}
// GetCSINameFromInTreeName returns the name of a CSI driver that supersedes the // GetCSINameFromInTreeName returns the name of a CSI driver that supersedes the
// in-tree plugin with the given name // in-tree plugin with the given name
func GetCSINameFromInTreeName(pluginName string) (string, error) { func GetCSINameFromInTreeName(pluginName string) (string, error) {
for csiDriverName, curPlugin := range inTreePlugins { for csiDriverName, curPlugin := range inTreePlugins {
if curPlugin.GetInTreePluginName() == pluginName { if curPlugin.GetInTreePluginName() == pluginName {
return csiDriverName, nil return csiDriverName, nil
} }
} }
return "", fmt.Errorf("Could not find CSI Driver name for plugin %v", pluginName) return "", fmt.Errorf("could not find CSI Driver name for plugin %v", pluginName)
} }
// GetInTreeNameFromCSIName returns the name of the in-tree plugin superseded by // GetInTreeNameFromCSIName returns the name of the in-tree plugin superseded by

View File

@ -415,6 +415,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy
informers.Core().V1().Nodes(), informers.Core().V1().Nodes(),
informers.Core().V1().PersistentVolumeClaims(), informers.Core().V1().PersistentVolumeClaims(),
informers.Core().V1().PersistentVolumes(), informers.Core().V1().PersistentVolumes(),
informers.Storage().V1beta1().CSINodes(),
cloud, cloud,
plugins, plugins,
nil, /* prober */ nil, /* prober */