diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 9f4c4922b4..721ebd4026 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -391,16 +391,6 @@ const ( // // Allow TTL controller to clean up Pods and Jobs after they finish. TTLAfterFinished utilfeature.Feature = "TTLAfterFinished" - - // owner: @jsafrane - // Kubernetes skips attaching CSI volumes that don't require attachment. - // - CSISkipAttach utilfeature.Feature = "CSISkipAttach" - - // owner: @jsafrane - // - // Kubelet sends pod information in NodePublish CSI call when a CSI driver wants so. - CSIPodInfo utilfeature.Feature = "CSIPodInfo" ) func init() { @@ -467,8 +457,6 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS VolumeSnapshotDataSource: {Default: false, PreRelease: utilfeature.Alpha}, ProcMountType: {Default: false, PreRelease: utilfeature.Alpha}, TTLAfterFinished: {Default: false, PreRelease: utilfeature.Alpha}, - CSISkipAttach: {Default: false, PreRelease: utilfeature.Alpha}, - CSIPodInfo: {Default: false, PreRelease: utilfeature.Alpha}, // inherited features from generic apiserver, relisted here to get a conflict if it is changed // unintentionally on either side: diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index bb34dca4ed..0d80f8238a 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -27,6 +27,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library", "//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions:go_default_library", "//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1:go_default_library", "//staging/src/k8s.io/csi-api/pkg/client/listers/csi/v1alpha1:go_default_library", @@ -59,6 +60,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library", diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index c39d536fe7..52ef4f38de 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -30,13 +30,16 @@ import ( meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" utilfeature "k8s.io/apiserver/pkg/util/feature" + utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing" clientset "k8s.io/client-go/kubernetes" fakeclient "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" utiltesting "k8s.io/client-go/util/testing" fakecsi "k8s.io/csi-api/pkg/client/clientset/versioned/fake" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" volumetest "k8s.io/kubernetes/pkg/volume/testing" ) @@ -202,14 +205,7 @@ func TestAttacherAttach(t *testing.T) { } func TestAttacherWithCSIDriver(t *testing.T) { - originalFeatures := utilfeature.DefaultFeatureGate.DeepCopy() - defer func() { - utilfeature.DefaultFeatureGate = originalFeatures - }() - err := utilfeature.DefaultFeatureGate.Set("CSISkipAttach=true") - if err != nil { - t.Fatalf("Failed to set CSISkipAttach=true: %s", err) - } + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIDriverRegistry, true)() tests := []struct { name string @@ -277,14 +273,7 @@ func TestAttacherWithCSIDriver(t *testing.T) { } func TestAttacherWaitForVolumeAttachmentWithCSIDriver(t *testing.T) { - originalFeatures := utilfeature.DefaultFeatureGate.DeepCopy() - defer func() { - utilfeature.DefaultFeatureGate = originalFeatures - }() - err := utilfeature.DefaultFeatureGate.Set("CSISkipAttach=true") - if err != nil { - t.Fatalf("Failed to set CSISkipAttach=true: %s", err) - } + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIDriverRegistry, true)() // In order to detect if the volume plugin would skip WaitForAttach for non-attachable drivers, // we do not instantiate any VolumeAttachment. So if the plugin does not skip attach, WaitForVolumeAttachment @@ -940,11 +929,11 @@ func newTestWatchPlugin(t *testing.T, csiClient *fakecsi.Clientset) (*csiPlugin, t.Fatalf("cannot assert plugin to be type csiPlugin") } - for { + if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { // Wait until the informer in CSI volume plugin has all CSIDrivers. - if csiPlug.csiDriverInformer.Informer().HasSynced() { - break - } + wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) { + return csiPlug.csiDriverInformer.Informer().HasSynced(), nil + }) } return csiPlug, fakeWatcher, tmpDir, fakeClient diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go index b1e8ea958a..c1e59fb7b9 100644 --- a/pkg/volume/csi/csi_mounter.go +++ b/pkg/volume/csi/csi_mounter.go @@ -238,7 +238,7 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error { } func (c *csiMountMgr) podAttributes() (map[string]string, error) { - if !utilfeature.DefaultFeatureGate.Enabled(features.CSIPodInfo) { + if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { return nil, nil } if c.plugin.csiDriverLister == nil { diff --git a/pkg/volume/csi/csi_mounter_test.go b/pkg/volume/csi/csi_mounter_test.go index 6d3eb49362..cd40444666 100644 --- a/pkg/volume/csi/csi_mounter_test.go +++ b/pkg/volume/csi/csi_mounter_test.go @@ -32,6 +32,7 @@ import ( storage "k8s.io/api/storage/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing" fakeclient "k8s.io/client-go/kubernetes/fake" @@ -95,7 +96,7 @@ func TestMounterGetPath(t *testing.T) { } func MounterSetUpTests(t *testing.T, podInfoEnabled bool) { - defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIPodInfo, podInfoEnabled)() + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIDriverRegistry, podInfoEnabled)() tests := []struct { name string driver string @@ -154,12 +155,13 @@ func MounterSetUpTests(t *testing.T, podInfoEnabled bool) { plug, tmpDir := newTestPlugin(t, fakeClient, fakeCSIClient) defer os.RemoveAll(tmpDir) - for { + if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { // Wait until the informer in CSI volume plugin has all CSIDrivers. - if plug.csiDriverInformer.Informer().HasSynced() { - break - } + wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) { + return plug.csiDriverInformer.Informer().HasSynced(), nil + }) } + pv := makeTestPV("test-pv", 10, test.driver, testVol) pv.Spec.CSI.VolumeAttributes = test.attributes pvName := pv.GetName() diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 2dabb89df2..8d021d671c 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -36,6 +36,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" + csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned" csiapiinformer "k8s.io/csi-api/pkg/client/informers/externalversions" csiinformer "k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1" csilister "k8s.io/csi-api/pkg/client/listers/csi/v1alpha1" @@ -160,12 +161,21 @@ func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { func (p *csiPlugin) Init(host volume.VolumeHost) error { p.host = host - // Initializing csiDrivers map and label management channels - csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}} - nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host.GetKubeClient(), host.GetCSIClient()) + kubeClient := host.GetKubeClient() + if kubeClient == nil { + return fmt.Errorf("error getting kube client") + } - csiClient := host.GetCSIClient() - if csiClient != nil { + var csiClient csiclientset.Interface + if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) || + utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) { + csiClient = host.GetCSIClient() + if csiClient == nil { + return fmt.Errorf("error getting CSI client") + } + } + + if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { // Start informer for CSIDrivers. factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod) p.csiDriverInformer = factory.Csi().V1alpha1().CSIDrivers() @@ -173,6 +183,10 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error { go factory.Start(wait.NeverStop) } + // Initializing csiDrivers map and label management channels + csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}} + nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), kubeClient, csiClient) + return nil } @@ -514,7 +528,7 @@ func (p *csiPlugin) ConstructBlockVolumeSpec(podUID types.UID, specVolName, mapP } func (p *csiPlugin) skipAttach(driver string) (bool, error) { - if !utilfeature.DefaultFeatureGate.Enabled(features.CSISkipAttach) { + if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { return false, nil } if p.csiDriverLister == nil { diff --git a/pkg/volume/csi/csi_plugin_test.go b/pkg/volume/csi/csi_plugin_test.go index 1a5cc07047..a4d9e9b404 100644 --- a/pkg/volume/csi/csi_plugin_test.go +++ b/pkg/volume/csi/csi_plugin_test.go @@ -27,10 +27,12 @@ import ( "k8s.io/apimachinery/pkg/api/resource" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" fakeclient "k8s.io/client-go/kubernetes/fake" utiltesting "k8s.io/client-go/util/testing" fakecsi "k8s.io/csi-api/pkg/client/clientset/versioned/fake" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" volumetest "k8s.io/kubernetes/pkg/volume/testing" ) @@ -73,11 +75,11 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset, csiClient *fakecs t.Fatalf("cannot assert plugin to be type csiPlugin") } - for { + if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { // Wait until the informer in CSI volume plugin has all CSIDrivers. - if csiPlug.csiDriverInformer.Informer().HasSynced() { - break - } + wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) { + return csiPlug.csiDriverInformer.Informer().HasSynced(), nil + }) } return csiPlug, tmpDir diff --git a/pkg/volume/csi/csi_util.go b/pkg/volume/csi/csi_util.go index 6a0b67212e..338333e095 100644 --- a/pkg/volume/csi/csi_util.go +++ b/pkg/volume/csi/csi_util.go @@ -28,6 +28,12 @@ import ( "k8s.io/client-go/kubernetes" kstrings "k8s.io/kubernetes/pkg/util/strings" "k8s.io/kubernetes/pkg/volume" + "time" +) + +const ( + testInformerSyncPeriod = 100 * time.Millisecond + testInformerSyncTimeout = 30 * time.Second ) func getCredentialsFromSecret(k8s kubernetes.Interface, secretRef *api.SecretReference) (map[string]string, error) { diff --git a/pkg/volume/csi/nodeinfomanager/BUILD b/pkg/volume/csi/nodeinfomanager/BUILD index c34c45eccb..e83adfed6f 100644 --- a/pkg/volume/csi/nodeinfomanager/BUILD +++ b/pkg/volume/csi/nodeinfomanager/BUILD @@ -51,6 +51,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library", "//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned/fake:go_default_library", diff --git a/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go b/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go index c17e14dda5..7754dcf28f 100644 --- a/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go +++ b/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go @@ -315,6 +315,10 @@ func (nim *nodeInfoManager) updateCSINodeInfo( driverNodeID string, topology *csipb.Topology) error { + if nim.csiKubeClient == nil { + return fmt.Errorf("CSI client cannot be nil") + } + retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { nodeInfo, err := nim.csiKubeClient.CsiV1alpha1().CSINodeInfos().Get(string(nim.nodeName), metav1.GetOptions{}) if nodeInfo == nil || errors.IsNotFound(err) { diff --git a/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go b/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go index de7cfa073c..a4c70ac56f 100644 --- a/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go +++ b/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go @@ -18,7 +18,6 @@ package nodeinfomanager import ( "encoding/json" - "fmt" "github.com/container-storage-interface/spec/lib/go/csi/v0" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -26,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" + utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing" "k8s.io/client-go/kubernetes/fake" csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1" csifake "k8s.io/csi-api/pkg/client/clientset/versioned/fake" @@ -496,9 +496,7 @@ func TestRemoveNodeInfo_CSINodeInfoDisabled(t *testing.T) { } func TestAddNodeInfoExistingAnnotation(t *testing.T) { - csiNodeInfoEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) - utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.CSINodeInfo)) - defer utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%t", features.CSINodeInfo, csiNodeInfoEnabled)) + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSINodeInfo, true)() driverName := "com.example.csi/driver1" nodeID := "com.example.csi/some-node" @@ -561,9 +559,7 @@ func TestAddNodeInfoExistingAnnotation(t *testing.T) { } func test(t *testing.T, addNodeInfo bool, csiNodeInfoEnabled bool, testcases []testcase) { - wasEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) - utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%t", features.CSINodeInfo, csiNodeInfoEnabled)) - defer utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%t", features.CSINodeInfo, wasEnabled)) + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSINodeInfo, csiNodeInfoEnabled)() for _, tc := range testcases { t.Logf("test case: %q", tc.name) diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index 939ed861f7..ef96322cad 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -73,7 +73,7 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding) if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) { role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "create", "delete", "list", "watch").Groups(storageGroup).Resources("volumeattachments").RuleOrDie()) - if utilfeature.DefaultFeatureGate.Enabled(features.CSISkipAttach) { + if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("csi.storage.k8s.io").Resources("csidrivers").RuleOrDie()) } } diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index ef90b9b177..bf9c2dfda5 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -159,7 +159,7 @@ func NodeRules() []rbacv1.PolicyRule { if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) { volAttachRule := rbacv1helpers.NewRule("get").Groups(storageGroup).Resources("volumeattachments").RuleOrDie() nodePolicyRules = append(nodePolicyRules, volAttachRule) - if utilfeature.DefaultFeatureGate.Enabled(features.CSISkipAttach) || utilfeature.DefaultFeatureGate.Enabled(features.CSIPodInfo) { + if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { csiDriverRule := rbacv1helpers.NewRule("get", "watch", "list").Groups("csi.storage.k8s.io").Resources("csidrivers").RuleOrDie() nodePolicyRules = append(nodePolicyRules, csiDriverRule) }