mirror of https://github.com/k3s-io/k3s
Consolidated CSIDriver logic under CSIDriverRegistry flag
parent
d472a54777
commit
4ca39ef0ed
|
@ -391,16 +391,6 @@ const (
|
|||
//
|
||||
// Allow TTL controller to clean up Pods and Jobs after they finish.
|
||||
TTLAfterFinished utilfeature.Feature = "TTLAfterFinished"
|
||||
|
||||
// owner: @jsafrane
|
||||
// Kubernetes skips attaching CSI volumes that don't require attachment.
|
||||
//
|
||||
CSISkipAttach utilfeature.Feature = "CSISkipAttach"
|
||||
|
||||
// owner: @jsafrane
|
||||
//
|
||||
// Kubelet sends pod information in NodePublish CSI call when a CSI driver wants so.
|
||||
CSIPodInfo utilfeature.Feature = "CSIPodInfo"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -467,8 +457,6 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
|
|||
VolumeSnapshotDataSource: {Default: false, PreRelease: utilfeature.Alpha},
|
||||
ProcMountType: {Default: false, PreRelease: utilfeature.Alpha},
|
||||
TTLAfterFinished: {Default: false, PreRelease: utilfeature.Alpha},
|
||||
CSISkipAttach: {Default: false, PreRelease: utilfeature.Alpha},
|
||||
CSIPodInfo: {Default: false, PreRelease: utilfeature.Alpha},
|
||||
|
||||
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
|
||||
// unintentionally on either side:
|
||||
|
|
|
@ -27,6 +27,7 @@ go_library(
|
|||
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
|
||||
"//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions:go_default_library",
|
||||
"//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1:go_default_library",
|
||||
"//staging/src/k8s.io/csi-api/pkg/client/listers/csi/v1alpha1:go_default_library",
|
||||
|
@ -59,6 +60,7 @@ go_test(
|
|||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
|
||||
|
|
|
@ -30,13 +30,16 @@ import (
|
|||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
fakeclient "k8s.io/client-go/kubernetes/fake"
|
||||
core "k8s.io/client-go/testing"
|
||||
utiltesting "k8s.io/client-go/util/testing"
|
||||
fakecsi "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
volumetest "k8s.io/kubernetes/pkg/volume/testing"
|
||||
)
|
||||
|
@ -202,14 +205,7 @@ func TestAttacherAttach(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestAttacherWithCSIDriver(t *testing.T) {
|
||||
originalFeatures := utilfeature.DefaultFeatureGate.DeepCopy()
|
||||
defer func() {
|
||||
utilfeature.DefaultFeatureGate = originalFeatures
|
||||
}()
|
||||
err := utilfeature.DefaultFeatureGate.Set("CSISkipAttach=true")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to set CSISkipAttach=true: %s", err)
|
||||
}
|
||||
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIDriverRegistry, true)()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
|
@ -277,14 +273,7 @@ func TestAttacherWithCSIDriver(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestAttacherWaitForVolumeAttachmentWithCSIDriver(t *testing.T) {
|
||||
originalFeatures := utilfeature.DefaultFeatureGate.DeepCopy()
|
||||
defer func() {
|
||||
utilfeature.DefaultFeatureGate = originalFeatures
|
||||
}()
|
||||
err := utilfeature.DefaultFeatureGate.Set("CSISkipAttach=true")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to set CSISkipAttach=true: %s", err)
|
||||
}
|
||||
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIDriverRegistry, true)()
|
||||
|
||||
// In order to detect if the volume plugin would skip WaitForAttach for non-attachable drivers,
|
||||
// we do not instantiate any VolumeAttachment. So if the plugin does not skip attach, WaitForVolumeAttachment
|
||||
|
@ -940,11 +929,11 @@ func newTestWatchPlugin(t *testing.T, csiClient *fakecsi.Clientset) (*csiPlugin,
|
|||
t.Fatalf("cannot assert plugin to be type csiPlugin")
|
||||
}
|
||||
|
||||
for {
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
|
||||
// Wait until the informer in CSI volume plugin has all CSIDrivers.
|
||||
if csiPlug.csiDriverInformer.Informer().HasSynced() {
|
||||
break
|
||||
}
|
||||
wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) {
|
||||
return csiPlug.csiDriverInformer.Informer().HasSynced(), nil
|
||||
})
|
||||
}
|
||||
|
||||
return csiPlug, fakeWatcher, tmpDir, fakeClient
|
||||
|
|
|
@ -238,7 +238,7 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
|
|||
}
|
||||
|
||||
func (c *csiMountMgr) podAttributes() (map[string]string, error) {
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIPodInfo) {
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
|
||||
return nil, nil
|
||||
}
|
||||
if c.plugin.csiDriverLister == nil {
|
||||
|
|
|
@ -32,6 +32,7 @@ import (
|
|||
storage "k8s.io/api/storage/v1beta1"
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
|
||||
fakeclient "k8s.io/client-go/kubernetes/fake"
|
||||
|
@ -95,7 +96,7 @@ func TestMounterGetPath(t *testing.T) {
|
|||
}
|
||||
|
||||
func MounterSetUpTests(t *testing.T, podInfoEnabled bool) {
|
||||
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIPodInfo, podInfoEnabled)()
|
||||
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIDriverRegistry, podInfoEnabled)()
|
||||
tests := []struct {
|
||||
name string
|
||||
driver string
|
||||
|
@ -154,12 +155,13 @@ func MounterSetUpTests(t *testing.T, podInfoEnabled bool) {
|
|||
plug, tmpDir := newTestPlugin(t, fakeClient, fakeCSIClient)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
for {
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
|
||||
// Wait until the informer in CSI volume plugin has all CSIDrivers.
|
||||
if plug.csiDriverInformer.Informer().HasSynced() {
|
||||
break
|
||||
}
|
||||
wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) {
|
||||
return plug.csiDriverInformer.Informer().HasSynced(), nil
|
||||
})
|
||||
}
|
||||
|
||||
pv := makeTestPV("test-pv", 10, test.driver, testVol)
|
||||
pv.Spec.CSI.VolumeAttributes = test.attributes
|
||||
pvName := pv.GetName()
|
||||
|
|
|
@ -36,6 +36,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
|
||||
csiapiinformer "k8s.io/csi-api/pkg/client/informers/externalversions"
|
||||
csiinformer "k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1"
|
||||
csilister "k8s.io/csi-api/pkg/client/listers/csi/v1alpha1"
|
||||
|
@ -160,12 +161,21 @@ func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
|
|||
func (p *csiPlugin) Init(host volume.VolumeHost) error {
|
||||
p.host = host
|
||||
|
||||
// Initializing csiDrivers map and label management channels
|
||||
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
|
||||
nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host.GetKubeClient(), host.GetCSIClient())
|
||||
kubeClient := host.GetKubeClient()
|
||||
if kubeClient == nil {
|
||||
return fmt.Errorf("error getting kube client")
|
||||
}
|
||||
|
||||
csiClient := host.GetCSIClient()
|
||||
if csiClient != nil {
|
||||
var csiClient csiclientset.Interface
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) ||
|
||||
utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
|
||||
csiClient = host.GetCSIClient()
|
||||
if csiClient == nil {
|
||||
return fmt.Errorf("error getting CSI client")
|
||||
}
|
||||
}
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
|
||||
// Start informer for CSIDrivers.
|
||||
factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod)
|
||||
p.csiDriverInformer = factory.Csi().V1alpha1().CSIDrivers()
|
||||
|
@ -173,6 +183,10 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
|
|||
go factory.Start(wait.NeverStop)
|
||||
}
|
||||
|
||||
// Initializing csiDrivers map and label management channels
|
||||
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
|
||||
nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), kubeClient, csiClient)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -514,7 +528,7 @@ func (p *csiPlugin) ConstructBlockVolumeSpec(podUID types.UID, specVolName, mapP
|
|||
}
|
||||
|
||||
func (p *csiPlugin) skipAttach(driver string) (bool, error) {
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.CSISkipAttach) {
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
|
||||
return false, nil
|
||||
}
|
||||
if p.csiDriverLister == nil {
|
||||
|
|
|
@ -27,10 +27,12 @@ import (
|
|||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
fakeclient "k8s.io/client-go/kubernetes/fake"
|
||||
utiltesting "k8s.io/client-go/util/testing"
|
||||
fakecsi "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
volumetest "k8s.io/kubernetes/pkg/volume/testing"
|
||||
)
|
||||
|
@ -73,11 +75,11 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset, csiClient *fakecs
|
|||
t.Fatalf("cannot assert plugin to be type csiPlugin")
|
||||
}
|
||||
|
||||
for {
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
|
||||
// Wait until the informer in CSI volume plugin has all CSIDrivers.
|
||||
if csiPlug.csiDriverInformer.Informer().HasSynced() {
|
||||
break
|
||||
}
|
||||
wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) {
|
||||
return csiPlug.csiDriverInformer.Informer().HasSynced(), nil
|
||||
})
|
||||
}
|
||||
|
||||
return csiPlug, tmpDir
|
||||
|
|
|
@ -28,6 +28,12 @@ import (
|
|||
"k8s.io/client-go/kubernetes"
|
||||
kstrings "k8s.io/kubernetes/pkg/util/strings"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
testInformerSyncPeriod = 100 * time.Millisecond
|
||||
testInformerSyncTimeout = 30 * time.Second
|
||||
)
|
||||
|
||||
func getCredentialsFromSecret(k8s kubernetes.Interface, secretRef *api.SecretReference) (map[string]string, error) {
|
||||
|
|
|
@ -51,6 +51,7 @@ go_test(
|
|||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
|
||||
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned/fake:go_default_library",
|
||||
|
|
|
@ -315,6 +315,10 @@ func (nim *nodeInfoManager) updateCSINodeInfo(
|
|||
driverNodeID string,
|
||||
topology *csipb.Topology) error {
|
||||
|
||||
if nim.csiKubeClient == nil {
|
||||
return fmt.Errorf("CSI client cannot be nil")
|
||||
}
|
||||
|
||||
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
nodeInfo, err := nim.csiKubeClient.CsiV1alpha1().CSINodeInfos().Get(string(nim.nodeName), metav1.GetOptions{})
|
||||
if nodeInfo == nil || errors.IsNotFound(err) {
|
||||
|
|
|
@ -18,7 +18,6 @@ package nodeinfomanager
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/container-storage-interface/spec/lib/go/csi/v0"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
|
@ -26,6 +25,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
|
||||
csifake "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
|
||||
|
@ -496,9 +496,7 @@ func TestRemoveNodeInfo_CSINodeInfoDisabled(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestAddNodeInfoExistingAnnotation(t *testing.T) {
|
||||
csiNodeInfoEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo)
|
||||
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.CSINodeInfo))
|
||||
defer utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%t", features.CSINodeInfo, csiNodeInfoEnabled))
|
||||
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSINodeInfo, true)()
|
||||
|
||||
driverName := "com.example.csi/driver1"
|
||||
nodeID := "com.example.csi/some-node"
|
||||
|
@ -561,9 +559,7 @@ func TestAddNodeInfoExistingAnnotation(t *testing.T) {
|
|||
}
|
||||
|
||||
func test(t *testing.T, addNodeInfo bool, csiNodeInfoEnabled bool, testcases []testcase) {
|
||||
wasEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo)
|
||||
utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%t", features.CSINodeInfo, csiNodeInfoEnabled))
|
||||
defer utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%t", features.CSINodeInfo, wasEnabled))
|
||||
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSINodeInfo, csiNodeInfoEnabled)()
|
||||
|
||||
for _, tc := range testcases {
|
||||
t.Logf("test case: %q", tc.name)
|
||||
|
|
|
@ -73,7 +73,7 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
|
|||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
|
||||
role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "create", "delete", "list", "watch").Groups(storageGroup).Resources("volumeattachments").RuleOrDie())
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSISkipAttach) {
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
|
||||
role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("csi.storage.k8s.io").Resources("csidrivers").RuleOrDie())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -159,7 +159,7 @@ func NodeRules() []rbacv1.PolicyRule {
|
|||
if utilfeature.DefaultFeatureGate.Enabled(features.CSIPersistentVolume) {
|
||||
volAttachRule := rbacv1helpers.NewRule("get").Groups(storageGroup).Resources("volumeattachments").RuleOrDie()
|
||||
nodePolicyRules = append(nodePolicyRules, volAttachRule)
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSISkipAttach) || utilfeature.DefaultFeatureGate.Enabled(features.CSIPodInfo) {
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
|
||||
csiDriverRule := rbacv1helpers.NewRule("get", "watch", "list").Groups("csi.storage.k8s.io").Resources("csidrivers").RuleOrDie()
|
||||
nodePolicyRules = append(nodePolicyRules, csiDriverRule)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue