Merge pull request #74835 from davidz627/feature/adcFallback

Add logic for initializing CSINode on Node startup for CSI Migration [Replaces #70909]
pull/564/head
Kubernetes Prow Robot 2019-03-07 18:39:25 -08:00 committed by GitHub
commit ad27abde62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 592 additions and 37 deletions

View File

@ -223,6 +223,7 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err
ctx.InformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.InformerFactory.Core().V1().PersistentVolumes(),
ctx.InformerFactory.Storage().V1beta1().CSINodes(),
ctx.Cloud,
ProbeAttachableVolumePlugins(),
GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),

View File

@ -101,4 +101,10 @@ const (
// This annotation will be used to compute the in-cluster network programming latency SLI, see
// https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md
EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time"
// MigratedPluginsAnnotationKey is the annotation key, set for CSINode objects, that is a comma-separated
// list of in-tree plugins that will be serviced by the CSI backend on the Node represented by CSINode.
// This annotation is used by the Attach Detach Controller to determine whether to use the in-tree or
// CSI Backend for a volume plugin on a specific node.
MigratedPluginsAnnotationKey = "storage.alpha.kubernetes.io/migrated-plugins"
)

View File

@ -18,6 +18,7 @@ go_library(
"//pkg/controller/volume/attachdetach/reconciler:go_default_library",
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
"//pkg/controller/volume/attachdetach/util:go_default_library",
"//pkg/features:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
@ -31,11 +32,14 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",

View File

@ -30,11 +30,14 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1beta1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
kcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
@ -47,6 +50,7 @@ import (
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/util"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
@ -101,6 +105,7 @@ func NewAttachDetachController(
nodeInformer coreinformers.NodeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
csiNodeInformer storageinformers.CSINodeInformer,
cloud cloudprovider.Interface,
plugins []volume.VolumePlugin,
prober volume.DynamicPluginProber,
@ -136,6 +141,12 @@ func NewAttachDetachController(
pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"),
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
adc.csiNodeLister = csiNodeInformer.Lister()
adc.csiNodeSynced = csiNodeInformer.Informer().HasSynced
}
if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
}
@ -257,6 +268,9 @@ type attachDetachController struct {
nodeLister corelisters.NodeLister
nodesSynced kcache.InformerSynced
csiNodeLister storagelisters.CSINodeLister
csiNodeSynced kcache.InformerSynced
// cloud provider used by volume host
cloud cloudprovider.Interface
@ -309,7 +323,12 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
klog.Infof("Starting attach detach controller")
defer klog.Infof("Shutting down attach detach controller")
if !controller.WaitForCacheSync("attach detach", stopCh, adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced) {
synced := []kcache.InformerSynced{adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced}
if adc.csiNodeSynced != nil {
synced = append(synced, adc.csiNodeSynced)
}
if !controller.WaitForCacheSync("attach detach", stopCh, synced...) {
return
}
@ -643,6 +662,17 @@ func (adc *attachDetachController) processVolumesInUse(
}
}
var _ volume.VolumeHost = &attachDetachController{}
var _ volume.AttachDetachVolumeHost = &attachDetachController{}
// CSINodeLister returns the CSINode lister that was wired up in
// NewAttachDetachController.
// NOTE(review): the lister is only set when both the CSIMigration and
// CSINodeInfo feature gates are enabled (see NewAttachDetachController), so
// callers may receive nil — confirm callers handle that.
func (adc *attachDetachController) CSINodeLister() storagelisters.CSINodeLister {
	return adc.csiNodeLister
}

// IsAttachDetachController is an interface marker that strictly ties the
// AttachDetachVolumeHost interface to this controller; it always returns true.
func (adc *attachDetachController) IsAttachDetachController() bool {
	return true
}
// VolumeHost implementation
// This is an unfortunate requirement of the current factoring of volume plugin
// initializing code. It requires kubelet specific methods used by the mounting

View File

@ -44,6 +44,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1beta1().CSINodes(),
nil, /* cloud */
nil, /* plugins */
nil, /* prober */
@ -218,6 +219,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1beta1().CSINodes(),
nil, /* cloud */
plugins,
prober,

View File

@ -776,6 +776,9 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
tokenManager := token.NewManager(kubeDeps.KubeClient)
// NewInitializedVolumePluginMgr initializes some storageErrors on the Kubelet runtimeState (in csi_plugin.go init)
// which affects node ready status. This function must be called before Kubelet is initialized so that the Node
// ReadyState is accurate with the storage state.
klet.volumePluginMgr, err =
NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
if err != nil {

View File

@ -544,7 +544,7 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent),
nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent),
nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
nodestatus.RemoveOutOfDiskCondition(),
// TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event

View File

@ -440,6 +440,7 @@ func ReadyCondition(
nowFunc func() time.Time, // typically Kubelet.clock.Now
runtimeErrorsFunc func() error, // typically Kubelet.runtimeState.runtimeErrors
networkErrorsFunc func() error, // typically Kubelet.runtimeState.networkErrors
storageErrorsFunc func() error, // typically Kubelet.runtimeState.storageErrors
appArmorValidateHostFunc func() error, // typically Kubelet.appArmorValidator.ValidateHost, might be nil depending on whether there was an appArmorValidator
cmStatusFunc func() cm.Status, // typically Kubelet.containerManager.Status
recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent
@ -456,7 +457,7 @@ func ReadyCondition(
Message: "kubelet is posting ready status",
LastHeartbeatTime: currentTime,
}
errs := []error{runtimeErrorsFunc(), networkErrorsFunc()}
errs := []error{runtimeErrorsFunc(), networkErrorsFunc(), storageErrorsFunc()}
requiredCapacities := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods}
if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
requiredCapacities = append(requiredCapacities, v1.ResourceEphemeralStorage)

View File

@ -895,6 +895,7 @@ func TestReadyCondition(t *testing.T) {
node *v1.Node
runtimeErrors error
networkErrors error
storageErrors error
appArmorValidateHostFunc func() error
cmStatus cm.Status
expectConditions []v1.NodeCondition
@ -929,6 +930,12 @@ func TestReadyCondition(t *testing.T) {
},
expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status. WARNING: foo", now, now)},
},
{
desc: "new, not ready: storage errors",
node: withCapacity.DeepCopy(),
storageErrors: errors.New("some storage error"),
expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "some storage error", now, now)},
},
{
desc: "new, not ready: runtime and network errors",
node: withCapacity.DeepCopy(),
@ -1003,6 +1010,9 @@ func TestReadyCondition(t *testing.T) {
networkErrorsFunc := func() error {
return tc.networkErrors
}
storageErrorsFunc := func() error {
return tc.storageErrors
}
cmStatusFunc := func() cm.Status {
return tc.cmStatus
}
@ -1014,7 +1024,7 @@ func TestReadyCondition(t *testing.T) {
})
}
// construct setter
setter := ReadyCondition(nowFunc, runtimeErrorsFunc, networkErrorsFunc, tc.appArmorValidateHostFunc, cmStatusFunc, recordEventFunc)
setter := ReadyCondition(nowFunc, runtimeErrorsFunc, networkErrorsFunc, storageErrorsFunc, tc.appArmorValidateHostFunc, cmStatusFunc, recordEventFunc)
// call setter on node
if err := setter(tc.node); err != nil {
t.Fatalf("unexpected error: %v", err)

View File

@ -30,6 +30,7 @@ type runtimeState struct {
lastBaseRuntimeSync time.Time
baseRuntimeSyncThreshold time.Duration
networkError error
storageError error
cidr string
healthChecks []*healthCheck
}
@ -61,6 +62,12 @@ func (s *runtimeState) setNetworkState(err error) {
s.networkError = err
}
// setStorageState records err as the current storage error on the runtime
// state under the write lock; passing nil clears a previously recorded error.
// The stored value is surfaced through storageErrors and therefore feeds into
// the node Ready condition.
func (s *runtimeState) setStorageState(err error) {
	s.Lock()
	defer s.Unlock()
	s.storageError = err
}
func (s *runtimeState) setPodCIDR(cidr string) {
s.Lock()
defer s.Unlock()
@ -101,6 +108,16 @@ func (s *runtimeState) networkErrors() error {
return utilerrors.NewAggregate(errs)
}
// storageErrors returns the storage error currently recorded on the runtime
// state, wrapped in an aggregate, or nil when no error is set.
func (s *runtimeState) storageErrors() error {
	s.RLock()
	defer s.RUnlock()
	if s.storageError == nil {
		// NewAggregate of an empty list yields nil.
		return utilerrors.NewAggregate(nil)
	}
	return utilerrors.NewAggregate([]error{s.storageError})
}
func newRuntimeState(
runtimeSyncThreshold time.Duration,
) *runtimeState {

View File

@ -80,6 +80,7 @@ func NewInitializedVolumePluginMgr(
// Compile-time check to ensure kubeletVolumeHost implements the VolumeHost interface
var _ volume.VolumeHost = &kubeletVolumeHost{}
var _ volume.KubeletVolumeHost = &kubeletVolumeHost{}
func (kvh *kubeletVolumeHost) GetPluginDir(pluginName string) string {
return kvh.kubelet.getPluginDir(pluginName)
@ -94,6 +95,10 @@ type kubeletVolumeHost struct {
mountPodManager mountpod.Manager
}
// SetKubeletError forwards err to the kubelet runtime state's storage error.
// A non-nil err causes the node Ready condition to report NotReady with that
// message (the storage errors feed nodestatus.ReadyCondition); nil clears it.
func (kvh *kubeletVolumeHost) SetKubeletError(err error) {
	kvh.kubelet.runtimeState.setStorageState(err)
}
func (kvh *kubeletVolumeHost) GetVolumeDevicePluginDir(pluginName string) string {
return kvh.kubelet.getVolumeDevicePluginDir(pluginName)
}

View File

@ -30,6 +30,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//vendor/k8s.io/klog:go_default_library",

View File

@ -24,6 +24,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
@ -32,6 +33,7 @@ go_library(
"//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/klog:go_default_library",

View File

@ -33,6 +33,7 @@ import (
apierrs "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -40,6 +41,7 @@ import (
csiinformer "k8s.io/client-go/informers/storage/v1beta1"
clientset "k8s.io/client-go/kubernetes"
csilister "k8s.io/client-go/listers/storage/v1beta1"
csitranslationplugins "k8s.io/csi-translation-lib/plugins"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager"
@ -216,15 +218,84 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
go factory.Start(wait.NeverStop)
}
var migratedPlugins = map[string](func() bool){
csitranslationplugins.GCEPDInTreePluginName: func() bool {
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE)
},
csitranslationplugins.AWSEBSInTreePluginName: func() bool {
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAWS)
},
csitranslationplugins.CinderInTreePluginName: func() bool {
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationOpenStack)
},
}
// Initializing the label management channels
nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host)
nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host, migratedPlugins)
// TODO(#70514) Init CSINodeInfo object if the CRD exists and create Driver
// objects for migrated drivers.
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) &&
utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
// This function prevents Kubelet from posting Ready status until CSINodeInfo
// is both installed and initialized
if err := initializeCSINode(host); err != nil {
return fmt.Errorf("failed to initialize CSINodeInfo: %v", err)
}
}
return nil
}
// initializeCSINode ensures the CSINode object for this node is created or
// updated with the CSI-migration annotation before the kubelet reports Ready.
//
// It is a no-op (returning nil) when the host is not a KubeletVolumeHost
// (i.e. not running inside a kubelet) or when no API client is available
// (kubelet in standalone mode). Otherwise it marks the kubelet NotReady via
// SetKubeletError and retries the initialization asynchronously with
// exponential backoff; on success the error is cleared so the kubelet can
// post Ready, and when the retry budget is exhausted the process is killed
// so a restart can retry from scratch.
func initializeCSINode(host volume.VolumeHost) error {
	kvh, ok := host.(volume.KubeletVolumeHost)
	if !ok {
		klog.V(4).Info("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINodeInfo initialization, not running on kubelet")
		return nil
	}
	kubeClient := host.GetKubeClient()
	if kubeClient == nil {
		// Kubelet running in standalone mode. Skip CSINodeInfo initialization
		klog.Warning("Skipping CSINodeInfo initialization, kubelet running in standalone mode")
		return nil
	}

	// Block Ready until the migrated drivers are recorded on the CSINode.
	kvh.SetKubeletError(errors.New("CSINodeInfo is not yet initialized"))

	go func() {
		defer utilruntime.HandleCrash()

		// Backoff parameters tuned to retry over 140 seconds. Will fail and restart the Kubelet
		// after max retry steps.
		initBackoff := wait.Backoff{
			Steps:    6,
			Duration: 15 * time.Millisecond,
			Factor:   6.0,
			Jitter:   0.1,
		}
		err := wait.ExponentialBackoff(initBackoff, func() (bool, error) {
			klog.V(4).Infof("Initializing migrated drivers on CSINodeInfo")
			err := nim.InitializeCSINodeWithAnnotation()
			if err != nil {
				kvh.SetKubeletError(fmt.Errorf("Failed to initialize CSINodeInfo: %v", err))
				klog.Errorf("Failed to initialize CSINodeInfo: %v", err)
				return false, nil
			}

			// Successfully initialized drivers, allow Kubelet to post Ready
			kvh.SetKubeletError(nil)
			return true, nil
		})
		if err != nil {
			// 2 releases after CSIMigration and all CSIMigrationX (where X is a volume plugin)
			// are permanently enabled the apiserver/controllers can assume that the kubelet is
			// using CSI for all Migrated volume plugins. Then all the CSINode initialization
			// code can be dropped from Kubelet.
			// Kill the Kubelet process and allow it to restart to retry initialization.
			// Include the backoff error so the root cause is not lost at this
			// terminal log site (the original dropped err here).
			klog.Fatalf("Failed to initialize CSINodeInfo after retrying: %v", err)
		}
	}()
	return nil
}
func (p *csiPlugin) GetPluginName() string {
return CSIPluginName
}

View File

@ -20,12 +20,14 @@ package nodeinfomanager // import "k8s.io/kubernetes/pkg/volume/csi/nodeinfomana
import (
"encoding/json"
goerrors "errors"
"fmt"
"strings"
"time"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -62,6 +64,7 @@ var (
type nodeInfoManager struct {
nodeName types.NodeName
volumeHost volume.VolumeHost
migratedPlugins map[string](func() bool)
}
// If no updates is needed, the function must return the same Node object as the input.
@ -69,7 +72,10 @@ type nodeUpdateFunc func(*v1.Node) (newNode *v1.Node, updated bool, err error)
// Interface implements an interface for managing labels of a node
type Interface interface {
CreateCSINode() (*storage.CSINode, error)
CreateCSINode() (*storagev1beta1.CSINode, error)
// Updates or Creates the CSINode object with annotations for CSI Migration
InitializeCSINodeWithAnnotation() error
// Record in the cluster the given node information from the CSI driver with the given name.
// Concurrent calls to InstallCSIDriver() is allowed, but they should not be intertwined with calls
@ -85,10 +91,12 @@ type Interface interface {
// NewNodeInfoManager initializes nodeInfoManager
func NewNodeInfoManager(
nodeName types.NodeName,
volumeHost volume.VolumeHost) Interface {
volumeHost volume.VolumeHost,
migratedPlugins map[string](func() bool)) Interface {
return &nodeInfoManager{
nodeName: nodeName,
volumeHost: volumeHost,
migratedPlugins: migratedPlugins,
}
}
@ -384,7 +392,46 @@ func (nim *nodeInfoManager) tryUpdateCSINode(
return nim.installDriverToCSINode(nodeInfo, driverName, driverNodeID, topology)
}
func (nim *nodeInfoManager) CreateCSINode() (*storage.CSINode, error) {
// InitializeCSINodeWithAnnotation creates or updates the CSINode object for
// this node so that it carries the CSI-migration annotation, retrying with
// the package-level updateBackoff. The per-attempt errors are collected and
// reported alongside the backoff error when all attempts fail.
func (nim *nodeInfoManager) InitializeCSINodeWithAnnotation() error {
	csiKubeClient := nim.volumeHost.GetKubeClient()
	if csiKubeClient == nil {
		return goerrors.New("error getting CSI client")
	}

	updateErrs := []error{}
	backoffErr := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
		attemptErr := nim.tryInitializeCSINodeWithAnnotation(csiKubeClient)
		if attemptErr == nil {
			return true, nil
		}
		updateErrs = append(updateErrs, attemptErr)
		return false, nil
	})
	if backoffErr != nil {
		return fmt.Errorf("error updating CSINode annotation: %v; caused by: %v", backoffErr, utilerrors.NewAggregate(updateErrs))
	}

	return nil
}
// tryInitializeCSINodeWithAnnotation performs a single attempt at ensuring
// the CSINode object for this node exists and carries an up-to-date
// CSI-migration annotation. If the object is missing it is created (the
// create path sets the annotation itself); otherwise the annotation is
// recomputed and the object is updated only when it actually changed.
// NOTE(review): any Get failure leaves nodeInfo nil, so transient errors —
// not just NotFound — also fall into the create path; Create would then
// surface e.g. AlreadyExists and the caller's backoff retries. Confirm this
// is intended rather than returning non-NotFound Get errors directly.
func (nim *nodeInfoManager) tryInitializeCSINodeWithAnnotation(csiKubeClient clientset.Interface) error {
	nodeInfo, err := csiKubeClient.StorageV1beta1().CSINodes().Get(string(nim.nodeName), metav1.GetOptions{})
	if nodeInfo == nil || errors.IsNotFound(err) {
		// CreateCSINode will set the annotation
		_, err = nim.CreateCSINode()
		return err
	}

	annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo)

	if annotationModified {
		// Only write back when the annotation actually changed.
		_, err := csiKubeClient.StorageV1beta1().CSINodes().Update(nodeInfo)
		return err
	}
	return nil
}
func (nim *nodeInfoManager) CreateCSINode() (*storagev1beta1.CSINode, error) {
kubeClient := nim.volumeHost.GetKubeClient()
if kubeClient == nil {
@ -401,7 +448,7 @@ func (nim *nodeInfoManager) CreateCSINode() (*storage.CSINode, error) {
return nil, err
}
nodeInfo := &storage.CSINode{
nodeInfo := &storagev1beta1.CSINode{
ObjectMeta: metav1.ObjectMeta{
Name: string(nim.nodeName),
OwnerReferences: []metav1.OwnerReference{
@ -413,16 +460,59 @@ func (nim *nodeInfoManager) CreateCSINode() (*storage.CSINode, error) {
},
},
},
Spec: storage.CSINodeSpec{
Drivers: []storage.CSINodeDriver{},
Spec: storagev1beta1.CSINodeSpec{
Drivers: []storagev1beta1.CSINodeDriver{},
},
}
setMigrationAnnotation(nim.migratedPlugins, nodeInfo)
return csiKubeClient.StorageV1beta1().CSINodes().Create(nodeInfo)
}
// setMigrationAnnotation reconciles the CSI-migration annotation on nodeInfo
// against migratedPlugins: the annotation value becomes the sorted,
// comma-separated list of plugin names whose predicate returns true. The
// annotation is removed when that list is empty, and other annotations are
// left untouched. It reports whether nodeInfo was modified; a nil
// migratedPlugins map is a no-op.
func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo *storagev1beta1.CSINode) (modified bool) {
	if migratedPlugins == nil {
		return false
	}

	annotations := nodeInfo.GetAnnotations()
	if annotations == nil {
		annotations = map[string]string{}
	}

	// Current annotation value, split into a set (empty value -> empty set).
	oldSet := sets.NewString()
	if existing := annotations[v1.MigratedPluginsAnnotationKey]; len(existing) != 0 {
		oldSet = sets.NewString(strings.Split(existing, ",")...)
	}

	// Desired set: every plugin whose migration predicate is currently true.
	newSet := sets.NewString()
	for pluginName, migratedFunc := range migratedPlugins {
		if migratedFunc() {
			newSet.Insert(pluginName)
		}
	}

	if oldSet.Equal(newSet) {
		return false
	}

	if joined := strings.Join(newSet.List(), ","); len(joined) != 0 {
		annotations[v1.MigratedPluginsAnnotationKey] = joined
	} else {
		delete(annotations, v1.MigratedPluginsAnnotationKey)
	}
	nodeInfo.Annotations = annotations
	return true
}
func (nim *nodeInfoManager) installDriverToCSINode(
nodeInfo *storage.CSINode,
nodeInfo *storagev1beta1.CSINode,
driverName string,
driverNodeID string,
topology map[string]string) error {
@ -438,9 +528,8 @@ func (nim *nodeInfoManager) installDriverToCSINode(
}
specModified := true
statusModified := true
// Clone driver list, omitting the driver that matches the given driverName
newDriverSpecs := []storage.CSINodeDriver{}
newDriverSpecs := []storagev1beta1.CSINodeDriver{}
for _, driverInfoSpec := range nodeInfo.Spec.Drivers {
if driverInfoSpec.Name == driverName {
if driverInfoSpec.NodeID == driverNodeID &&
@ -453,12 +542,14 @@ func (nim *nodeInfoManager) installDriverToCSINode(
}
}
if !specModified && !statusModified {
annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo)
if !specModified && !annotationModified {
return nil
}
// Append new driver
driverSpec := storage.CSINodeDriver{
driverSpec := storagev1beta1.CSINodeDriver{
Name: driverName,
NodeID: driverNodeID,
TopologyKeys: topologyKeys.List(),
@ -517,6 +608,7 @@ func (nim *nodeInfoManager) tryUninstallDriverFromCSINode(
hasModified = true
}
}
if !hasModified {
// No changes, don't update
return nil

View File

@ -19,6 +19,7 @@ package nodeinfomanager
import (
"encoding/json"
"fmt"
"reflect"
"testing"
"github.com/stretchr/testify/assert"
@ -545,6 +546,156 @@ func TestUninstallCSIDriverCSINodeInfoDisabled(t *testing.T) {
test(t, false /* addNodeInfo */, false /* csiNodeInfoEnabled */, testcases)
}
// TestSetMigrationAnnotation exercises setMigrationAnnotation across the
// nil-map, add, keep, and remove cases for the CSI-migration annotation,
// verifying both the returned modified flag and the resulting CSINode.
// Cases run as named subtests so failures are isolated and addressable
// with -run (the original used a single t.Logf loop).
func TestSetMigrationAnnotation(t *testing.T) {
	testcases := []struct {
		name            string
		migratedPlugins map[string](func() bool)
		existingNode    *storage.CSINode
		expectedNode    *storage.CSINode
		expectModified  bool
	}{
		{
			name: "nil migrated plugins",
			existingNode: &storage.CSINode{
				ObjectMeta: metav1.ObjectMeta{
					Name: "node1",
				},
			},
			expectedNode: &storage.CSINode{
				ObjectMeta: metav1.ObjectMeta{
					Name: "node1",
				},
			},
		},
		{
			name: "one modified plugin",
			migratedPlugins: map[string](func() bool){
				"test": func() bool { return true },
			},
			existingNode: &storage.CSINode{
				ObjectMeta: metav1.ObjectMeta{
					Name: "node1",
				},
			},
			expectedNode: &storage.CSINode{
				ObjectMeta: metav1.ObjectMeta{
					Name:        "node1",
					Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test"},
				},
			},
			expectModified: true,
		},
		{
			name: "existing plugin",
			migratedPlugins: map[string](func() bool){
				"test": func() bool { return true },
			},
			existingNode: &storage.CSINode{
				ObjectMeta: metav1.ObjectMeta{
					Name:        "node1",
					Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test"},
				},
			},
			expectedNode: &storage.CSINode{
				ObjectMeta: metav1.ObjectMeta{
					Name:        "node1",
					Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test"},
				},
			},
			expectModified: false,
		},
		{
			name:            "remove plugin",
			migratedPlugins: map[string](func() bool){},
			existingNode: &storage.CSINode{
				ObjectMeta: metav1.ObjectMeta{
					Name:        "node1",
					Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test"},
				},
			},
			expectedNode: &storage.CSINode{
				ObjectMeta: metav1.ObjectMeta{
					Name:        "node1",
					Annotations: map[string]string{},
				},
			},
			expectModified: true,
		},
		{
			name: "one modified plugin, other annotations stable",
			migratedPlugins: map[string](func() bool){
				"test": func() bool { return true },
			},
			existingNode: &storage.CSINode{
				ObjectMeta: metav1.ObjectMeta{
					Name:        "node1",
					Annotations: map[string]string{"other": "annotation"},
				},
			},
			expectedNode: &storage.CSINode{
				ObjectMeta: metav1.ObjectMeta{
					Name:        "node1",
					Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test", "other": "annotation"},
				},
			},
			expectModified: true,
		},
		{
			name: "multiple plugins modified, other annotations stable",
			migratedPlugins: map[string](func() bool){
				"test": func() bool { return true },
				"foo":  func() bool { return false },
			},
			existingNode: &storage.CSINode{
				ObjectMeta: metav1.ObjectMeta{
					Name:        "node1",
					Annotations: map[string]string{"other": "annotation", v1.MigratedPluginsAnnotationKey: "foo"},
				},
			},
			expectedNode: &storage.CSINode{
				ObjectMeta: metav1.ObjectMeta{
					Name:        "node1",
					Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "test", "other": "annotation"},
				},
			},
			expectModified: true,
		},
		{
			name: "multiple plugins added, other annotations stable",
			migratedPlugins: map[string](func() bool){
				"test": func() bool { return true },
				"foo":  func() bool { return true },
			},
			existingNode: &storage.CSINode{
				ObjectMeta: metav1.ObjectMeta{
					Name:        "node1",
					Annotations: map[string]string{"other": "annotation"},
				},
			},
			expectedNode: &storage.CSINode{
				ObjectMeta: metav1.ObjectMeta{
					Name:        "node1",
					Annotations: map[string]string{v1.MigratedPluginsAnnotationKey: "foo,test", "other": "annotation"},
				},
			},
			expectModified: true,
		},
	}

	for _, tc := range testcases {
		tc := tc // capture range variable for the subtest closure
		t.Run(tc.name, func(t *testing.T) {
			modified := setMigrationAnnotation(tc.migratedPlugins, tc.existingNode)
			if modified != tc.expectModified {
				t.Errorf("Expected modified to be %v but got %v instead", tc.expectModified, modified)
			}
			if !reflect.DeepEqual(tc.expectedNode, tc.existingNode) {
				t.Errorf("Expected CSINode: %v, but got: %v", tc.expectedNode, tc.existingNode)
			}
		})
	}
}
func TestInstallCSIDriverExistingAnnotation(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSINodeInfo, true)()
@ -591,7 +742,7 @@ func TestInstallCSIDriverExistingAnnotation(t *testing.T) {
nodeName,
)
nim := NewNodeInfoManager(types.NodeName(nodeName), host)
nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil)
// Act
_, err = nim.CreateCSINode()
@ -649,7 +800,7 @@ func test(t *testing.T, addNodeInfo bool, csiNodeInfoEnabled bool, testcases []t
nil,
nodeName,
)
nim := NewNodeInfoManager(types.NodeName(nodeName), host)
nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil)
//// Act
nim.CreateCSINode()

View File

@ -30,6 +30,7 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/validation"
clientset "k8s.io/client-go/kubernetes"
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog"
@ -278,6 +279,32 @@ type BlockVolumePlugin interface {
ConstructBlockVolumeSpec(podUID types.UID, volumeName, volumePath string) (*Spec, error)
}
// TODO(#14217)
// As part of the Volume Host refactor we are starting to create Volume Hosts
// for specific hosts. New methods for each specific host can be added here.
// Currently consumers will do type assertions to get the specific type of Volume
// Host; however, the end result should be that specific Volume Hosts are passed
// to the specific functions they are needed in (instead of using a catch-all
// VolumeHost interface)
// KubeletVolumeHost is a Kubelet specific interface that plugins can use to access the kubelet.
// Plugins obtain it by type-asserting a volume.VolumeHost (see the TODO(#14217)
// note on Volume Host refactoring above the host interfaces in this file).
type KubeletVolumeHost interface {
	// SetKubeletError lets plugins set an error on the Kubelet runtime status
	// that will cause the Kubelet to post NotReady status with the error message provided.
	// Passing nil clears a previously set error.
	SetKubeletError(err error)
}
// AttachDetachVolumeHost is a AttachDetach Controller specific interface that plugins can use
// to access methods on the Attach Detach Controller.
// Plugins obtain it by type-asserting a volume.VolumeHost.
type AttachDetachVolumeHost interface {
	// CSINodeLister returns the informer lister for the CSINode API Object.
	// NOTE(review): the controller's lister may be nil when the relevant
	// feature gates are disabled — callers should handle a nil lister.
	CSINodeLister() storagelisters.CSINodeLister

	// IsAttachDetachController is an interface marker to strictly tie AttachDetachVolumeHost
	// to the attachDetachController
	IsAttachDetachController() bool
}
// VolumeHost is an interface that plugins can use to access the kubelet.
type VolumeHost interface {
// GetPluginDir returns the absolute path to a directory under which

View File

@ -28,6 +28,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",

View File

@ -17,6 +17,7 @@ limitations under the License.
package operationexecutor
import (
goerrors "errors"
"fmt"
"path"
"strings"
@ -26,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
@ -303,8 +305,14 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
}
originalSpec := volumeToAttach.VolumeSpec
nu, err := nodeUsingCSIPlugin(og, volumeToAttach.VolumeSpec, volumeToAttach.NodeName)
if err != nil {
eventRecorderFunc(&err)
return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.NodeUsingCSIPlugin failed", err)
}
// useCSIPlugin will check both CSIMigration and the plugin specific feature gate
if useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec) {
if useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec) && nu {
// The volume represented by this spec is CSI and thus should be migrated
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
if err != nil || attachableVolumePlugin == nil {
@ -401,17 +409,22 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(
if volumeToDetach.VolumeSpec != nil {
// Get attacher plugin
nu, err := nodeUsingCSIPlugin(og, volumeToDetach.VolumeSpec, volumeToDetach.NodeName)
if err != nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.NodeUsingCSIPlugin failed", err)
}
// useCSIPlugin will check both CSIMigration and the plugin specific feature gate
if useCSIPlugin(og.volumePluginMgr, volumeToDetach.VolumeSpec) {
if useCSIPlugin(og.volumePluginMgr, volumeToDetach.VolumeSpec) && nu {
// The volume represented by this spec is CSI and thus should be migrated
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
if err != nil || attachableVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err)
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err)
}
csiSpec, err := translateSpec(volumeToDetach.VolumeSpec)
if err != nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.TranslateSpec failed", err)
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.TranslateSpec failed", err)
}
volumeToDetach.VolumeSpec = csiSpec
@ -861,7 +874,7 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc(
if deviceOpened {
return deviceToDetach.GenerateError(
"UnmountDevice failed",
fmt.Errorf("the device is in use when it was no longer expected to be in use"))
goerrors.New("the device is in use when it was no longer expected to be in use"))
}
klog.Infof(deviceToDetach.GenerateMsg("UnmountDevice succeeded", ""))
@ -968,7 +981,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
devicePath = pluginDevicePath
}
if len(devicePath) == 0 {
return volumeToMount.GenerateError("MapVolume failed", fmt.Errorf("Device path of the volume is empty"))
return volumeToMount.GenerateError("MapVolume failed", goerrors.New("Device path of the volume is empty"))
}
// When kubelet is containerized, devicePath may be a symlink at a place unavailable to
@ -1533,8 +1546,11 @@ func isDeviceOpened(deviceToDetach AttachedVolume, mounter mount.Interface) (boo
return deviceOpened, nil
}
// TODO(dyzz): need to also add logic to check CSINodeInfo for Kubelet migration status
func useCSIPlugin(vpm *volume.VolumePluginMgr, spec *volume.Spec) bool {
// TODO(#75146) Check whether the driver is installed as well so that
// we can throw a better error when the driver is not installed.
// The error should be of the approximate form:
// fmt.Errorf("in-tree plugin %s is migrated on node %s but driver %s is not installed", pluginName, string(nodeName), driverName)
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
return false
}
@ -1547,6 +1563,93 @@ func useCSIPlugin(vpm *volume.VolumePluginMgr, spec *volume.Spec) bool {
return false
}
// nodeUsingCSIPlugin reports whether the volume described by spec should be
// serviced by the CSI backend on the given node. It requires the CSIMigration
// and CSINodeInfo feature gates to be enabled and the plugin to be migratable,
// then consults the migrated-plugins annotation on the node's CSINode object
// and verifies the corresponding CSI driver is installed on that node.
func nodeUsingCSIPlugin(og *operationGenerator, spec *volume.Spec, nodeName types.NodeName) (bool, error) {
	migratable, err := og.volumePluginMgr.IsPluginMigratableBySpec(spec)
	if err != nil {
		return false, err
	}

	// Migration is off entirely unless both gates are enabled and this plugin
	// is migratable at all.
	if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) ||
		!utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) ||
		!migratable {
		return false, nil
	}

	if len(nodeName) == 0 {
		return false, goerrors.New("nodeName is empty")
	}

	// Without an API client we cannot do the controller/kubelet version skew
	// check; fall back to the feature-gate answer. This can happen in a
	// standalone (headless) Kubelet.
	if og.volumePluginMgr.Host.GetKubeClient() == nil {
		return true, nil
	}

	// Without the AttachDetachVolumeHost we likewise cannot do the skew check.
	// This can happen if "enableControllerAttachDetach" is set to true on kubelet.
	adcHost, ok := og.volumePluginMgr.Host.(volume.AttachDetachVolumeHost)
	if !ok {
		return true, nil
	}

	lister := adcHost.CSINodeLister()
	if lister == nil {
		return false, goerrors.New("could not find CSINodeLister in attachDetachController")
	}

	csiNode, err := lister.Get(string(nodeName))
	if err != nil {
		return false, err
	}

	annotations := csiNode.GetAnnotations()
	if annotations == nil {
		return false, nil
	}

	// Build the set of in-tree plugins that are migrated on this node from the
	// comma-separated annotation value.
	mpaSet := sets.NewString()
	if mpa := annotations[v1.MigratedPluginsAnnotationKey]; len(mpa) > 0 {
		mpaSet = sets.NewString(strings.Split(mpa, ",")...)
	}

	pluginName, err := csilib.GetInTreePluginNameFromSpec(spec.PersistentVolume, spec.Volume)
	if err != nil {
		return false, err
	}
	if len(pluginName) == 0 {
		// Could not find a plugin name from translation directory, assume not translated
		return false, nil
	}

	if !mpaSet.Has(pluginName) {
		return false, nil
	}

	// The plugin is migrated on this node; confirm the superseding CSI driver
	// is actually installed there.
	driverName, err := csilib.GetCSINameFromInTreeName(pluginName)
	if err != nil {
		return true, err
	}
	for _, driver := range csiNode.Spec.Drivers {
		if driver.Name == driverName {
			return true, nil
		}
	}
	return true, fmt.Errorf("in-tree plugin %s is migrated on node %s but driver %s is not installed", pluginName, string(nodeName), driverName)
}
func translateSpec(spec *volume.Spec) (*volume.Spec, error) {
if spec.PersistentVolume != nil {
// TranslateInTreePVToCSI will create a new PV
@ -1559,8 +1662,8 @@ func translateSpec(spec *volume.Spec) (*volume.Spec, error) {
ReadOnly: spec.ReadOnly,
}, nil
} else if spec.Volume != nil {
return &volume.Spec{}, fmt.Errorf("translation is not supported for in-line volumes yet")
return &volume.Spec{}, goerrors.New("translation is not supported for in-line volumes yet")
} else {
return &volume.Spec{}, fmt.Errorf("not a valid volume spec")
return &volume.Spec{}, goerrors.New("not a valid volume spec")
}
}

View File

@ -76,6 +76,9 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("storage.k8s.io").Resources("csidrivers").RuleOrDie())
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("storage.k8s.io").Resources("csinodes").RuleOrDie())
}
}
return role

View File

@ -97,4 +97,10 @@ const (
// This annotation will be used to compute the in-cluster network programming latency SLI, see
// https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md
EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time"
// MigratedPluginsAnnotationKey is the annotation key, set for CSINode objects, that is a comma-separated
// list of in-tree plugins that will be serviced by the CSI backend on the Node represented by CSINode.
// This annotation is used by the Attach Detach Controller to determine whether to use the in-tree or
// CSI Backend for a volume plugin on a specific node.
MigratedPluginsAnnotationKey = "storage.alpha.kubernetes.io/migrated-plugins"
)

View File

@ -17,6 +17,7 @@ limitations under the License.
package csitranslation
import (
"errors"
"fmt"
"k8s.io/api/core/v1"
@ -48,7 +49,7 @@ func TranslateInTreeStorageClassParametersToCSI(inTreePluginName string, scParam
// be modified
func TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
if pv == nil {
return nil, fmt.Errorf("persistent volume was nil")
return nil, errors.New("persistent volume was nil")
}
copiedPV := pv.DeepCopy()
for _, curPlugin := range inTreePlugins {
@ -64,7 +65,7 @@ func TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, erro
// by the `Driver` field in the CSI Source. The input PV object will not be modified.
func TranslateCSIPVToInTree(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
if pv == nil || pv.Spec.CSI == nil {
return nil, fmt.Errorf("CSI persistent volume was nil")
return nil, errors.New("CSI persistent volume was nil")
}
copiedPV := pv.DeepCopy()
for driverName, curPlugin := range inTreePlugins {
@ -95,6 +96,23 @@ func IsMigratedCSIDriverByName(csiPluginName string) bool {
return false
}
// GetInTreePluginNameFromSpec returns the name of the in-tree plugin that can
// support the given persistent volume or volume. It returns an error when both
// arguments are nil, when inline volume translation is requested (not yet
// supported), or when no registered in-tree plugin matches the PV.
func GetInTreePluginNameFromSpec(pv *v1.PersistentVolume, vol *v1.Volume) (string, error) {
	switch {
	case pv != nil:
		for _, plugin := range inTreePlugins {
			if plugin.CanSupport(pv) {
				return plugin.GetInTreePluginName(), nil
			}
		}
		return "", fmt.Errorf("could not find in-tree plugin name from persistent volume %v", pv)
	case vol != nil:
		// TODO(dyzz): Implement inline volume migration support
		return "", errors.New("inline volume migration not yet supported")
	default:
		return "", errors.New("both persistent volume and volume are nil")
	}
}
// GetCSINameFromInTreeName returns the name of a CSI driver that supersedes the
// in-tree plugin with the given name
func GetCSINameFromInTreeName(pluginName string) (string, error) {
@ -103,7 +121,7 @@ func GetCSINameFromInTreeName(pluginName string) (string, error) {
return csiDriverName, nil
}
}
return "", fmt.Errorf("Could not find CSI Driver name for plugin %v", pluginName)
return "", fmt.Errorf("could not find CSI Driver name for plugin %v", pluginName)
}
// GetInTreeNameFromCSIName returns the name of the in-tree plugin superseded by

View File

@ -415,6 +415,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy
informers.Core().V1().Nodes(),
informers.Core().V1().PersistentVolumeClaims(),
informers.Core().V1().PersistentVolumes(),
informers.Storage().V1beta1().CSINodes(),
cloud,
plugins,
nil, /* prober */