CSINodeInfo/CSIDriver controller changes

This is the 2nd PR to move CSINodeInfo/CSIDriver APIs to
v1beta1 core storage APIs. It includes controller side changes.
It depends on the PR with API changes:
https://github.com/kubernetes/kubernetes/pull/73883
pull/564/head
Xing Yang 2019-02-19 21:44:17 -08:00
parent 3260d3a7b5
commit 6265f4f78c
35 changed files with 292 additions and 754 deletions

View File

@ -134,7 +134,6 @@ go_library(
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/component-base/cli/flag:go_default_library",
"//staging/src/k8s.io/component-base/cli/globalflag:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//staging/src/k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1:go_default_library",
"//staging/src/k8s.io/metrics/pkg/client/custom_metrics:go_default_library",
"//staging/src/k8s.io/metrics/pkg/client/external_metrics:go_default_library",

View File

@ -37,7 +37,6 @@ import (
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
"k8s.io/kubernetes/pkg/controller"
cloudcontroller "k8s.io/kubernetes/pkg/controller/cloud"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
@ -216,14 +215,10 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err
if ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration < time.Second {
return nil, true, fmt.Errorf("Duration time must be greater than one second as set via command line option reconcile-sync-loop-period")
}
csiClientConfig := ctx.ClientBuilder.ConfigOrDie("attachdetach-controller")
// csiClient works with CRDs that support json only
csiClientConfig.ContentType = "application/json"
attachDetachController, attachDetachControllerErr :=
attachdetach.NewAttachDetachController(
ctx.ClientBuilder.ClientOrDie("attachdetach-controller"),
csiclientset.NewForConfigOrDie(csiClientConfig),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),

View File

@ -136,7 +136,6 @@ go_library(
"//staging/src/k8s.io/client-go/util/keyutil:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/component-base/cli/flag:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//staging/src/k8s.io/kubelet/config/v1beta1:go_default_library",
"//staging/src/k8s.io/node-api/pkg/client/clientset/versioned:go_default_library",
"//vendor/github.com/coreos/go-systemd/daemon:go_default_library",

View File

@ -57,7 +57,6 @@ import (
"k8s.io/client-go/util/keyutil"
cloudprovider "k8s.io/cloud-provider"
cliflag "k8s.io/component-base/cli/flag"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
"k8s.io/kubernetes/cmd/kubelet/app/options"
"k8s.io/kubernetes/pkg/api/legacyscheme"
@ -400,7 +399,6 @@ func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, err
DockerClientConfig: dockerClientConfig,
KubeClient: nil,
HeartbeatClient: nil,
CSIClient: nil,
EventClient: nil,
Mounter: mounter,
Subpather: subpather,
@ -598,10 +596,6 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
// CRDs are JSON only, and client renegotiation for streaming is not correct as per #67803
crdClientConfig := restclient.CopyConfig(clientConfig)
crdClientConfig.ContentType = "application/json"
kubeDeps.CSIClient, err = csiclientset.NewForConfig(crdClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet storage client: %v", err)
}
kubeDeps.NodeAPIClient, err = nodeapiclientset.NewForConfig(crdClientConfig)
if err != nil {

View File

@ -40,7 +40,6 @@ go_library(
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -39,7 +39,6 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
csiclient "k8s.io/csi-api/pkg/client/clientset/versioned"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
@ -98,7 +97,6 @@ type AttachDetachController interface {
// NewAttachDetachController returns a new instance of AttachDetachController.
func NewAttachDetachController(
kubeClient clientset.Interface,
csiClient csiclient.Interface,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
@ -125,7 +123,6 @@ func NewAttachDetachController(
// deleted (probably can't do this with sharedInformer), etc.
adc := &attachDetachController{
kubeClient: kubeClient,
csiClient: csiClient,
pvcLister: pvcInformer.Lister(),
pvcsSynced: pvcInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(),
@ -241,10 +238,6 @@ type attachDetachController struct {
// the API server.
kubeClient clientset.Interface
// csiClient is the csi.storage.k8s.io API client used by volumehost to communicate with
// the API server.
csiClient csiclient.Interface
// pvcLister is the shared PVC lister used to fetch and store PVC
// objects from the API server. It is shared with other controllers and
// therefore the PVC objects in its store should be treated as immutable.
@ -766,10 +759,6 @@ func (adc *attachDetachController) GetEventRecorder() record.EventRecorder {
return adc.recorder
}
func (adc *attachDetachController) GetCSIClient() csiclient.Interface {
return adc.csiClient
}
func (adc *attachDetachController) GetSubpather() subpath.Interface {
// Subpaths not needed in attachdetach controller
return nil

View File

@ -40,7 +40,6 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
// Act
_, err := NewAttachDetachController(
fakeKubeClient,
nil, /* csiClient */
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
@ -215,7 +214,6 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
// Create the controller
adcObj, err := NewAttachDetachController(
fakeKubeClient,
nil, /* csiClient */
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumeClaims(),

View File

@ -36,7 +36,6 @@ go_library(
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -38,7 +38,6 @@ import (
kcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/events"
"k8s.io/kubernetes/pkg/controller/volume/expand/cache"
@ -336,11 +335,6 @@ func (expc *expandController) GetEventRecorder() record.EventRecorder {
return expc.recorder
}
func (expc *expandController) GetCSIClient() csiclientset.Interface {
// No volume plugin in expand controller needs csi.storage.k8s.io
return nil
}
func (expc *expandController) GetSubpather() subpath.Interface {
// not needed for expand controller
return nil

View File

@ -61,7 +61,6 @@ go_library(
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/errors:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//staging/src/k8s.io/csi-translation-lib:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/klog:go_default_library",

View File

@ -26,7 +26,6 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/util/mount"
vol "k8s.io/kubernetes/pkg/volume"
@ -133,11 +132,6 @@ func (ctrl *PersistentVolumeController) GetEventRecorder() record.EventRecorder
return ctrl.eventRecorder
}
func (ctrl *PersistentVolumeController) GetCSIClient() csiclientset.Interface {
// No volume plugin needs csi.storage.k8s.io client in PV controller.
return nil
}
func (ctrl *PersistentVolumeController) GetSubpather() subpath.Interface {
// No volume plugin needs Subpaths in PV controller.
return nil

View File

@ -195,12 +195,14 @@ const (
// owner: @saad-ali
// alpha: v1.12
// Enable all logic related to the CSIDriver API object in csi.storage.k8s.io
// beta: v1.14
// Enable all logic related to the CSIDriver API object in storage.k8s.io
CSIDriverRegistry utilfeature.Feature = "CSIDriverRegistry"
// owner: @verult
// alpha: v1.12
// Enable all logic related to the CSINodeInfo API object in csi.storage.k8s.io
// beta: v1.14
// Enable all logic related to the CSINode API object in storage.k8s.io
CSINodeInfo utilfeature.Feature = "CSINodeInfo"
// owner @MrHohn
@ -448,8 +450,8 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
MountContainers: {Default: false, PreRelease: utilfeature.Alpha},
VolumeScheduling: {Default: true, PreRelease: utilfeature.GA, LockToDefault: true}, // remove in 1.16
CSIPersistentVolume: {Default: true, PreRelease: utilfeature.GA},
CSIDriverRegistry: {Default: false, PreRelease: utilfeature.Alpha},
CSINodeInfo: {Default: false, PreRelease: utilfeature.Alpha},
CSIDriverRegistry: {Default: true, PreRelease: utilfeature.Beta},
CSINodeInfo: {Default: true, PreRelease: utilfeature.Beta},
CustomPodDNS: {Default: true, PreRelease: utilfeature.GA, LockToDefault: true}, // remove in 1.16
BlockVolume: {Default: true, PreRelease: utilfeature.Beta},
StorageObjectInUseProtection: {Default: true, PreRelease: utilfeature.GA},

View File

@ -140,7 +140,6 @@ go_library(
"//staging/src/k8s.io/client-go/util/certificate:go_default_library",
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//staging/src/k8s.io/node-api/pkg/client/clientset/versioned:go_default_library",
"//third_party/forked/golang/expansion:go_default_library",
"//vendor/github.com/golang/groupcache/lru:go_default_library",

View File

@ -51,7 +51,6 @@ import (
"k8s.io/client-go/util/certificate"
"k8s.io/client-go/util/flowcontrol"
cloudprovider "k8s.io/cloud-provider"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
"k8s.io/klog"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/features"
@ -249,7 +248,6 @@ type Dependencies struct {
HeartbeatClient clientset.Interface
OnHeartbeatFailure func()
KubeClient clientset.Interface
CSIClient csiclientset.Interface
NodeAPIClient nodeapiclientset.Interface
Mounter mount.Interface
OOMAdjuster *oom.OOMAdjuster
@ -493,7 +491,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
hostnameOverridden: len(hostnameOverride) > 0,
nodeName: nodeName,
kubeClient: kubeDeps.KubeClient,
csiClient: kubeDeps.CSIClient,
heartbeatClient: kubeDeps.HeartbeatClient,
onRepeatedHeartbeatFailure: kubeDeps.OnHeartbeatFailure,
rootDirectory: rootDirectory,
@ -898,7 +895,6 @@ type Kubelet struct {
nodeName types.NodeName
runtimeCache kubecontainer.RuntimeCache
kubeClient clientset.Interface
csiClient csiclientset.Interface
heartbeatClient clientset.Interface
iptClient utilipt.Interface
rootDirectory string

View File

@ -30,7 +30,6 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/configmap"
"k8s.io/kubernetes/pkg/kubelet/container"
@ -123,10 +122,6 @@ func (kvh *kubeletVolumeHost) GetKubeClient() clientset.Interface {
return kvh.kubelet.kubeClient
}
func (kvh *kubeletVolumeHost) GetCSIClient() csiclientset.Interface {
return kvh.kubelet.csiClient
}
func (kvh *kubeletVolumeHost) GetSubpather() subpath.Interface {
return kvh.kubelet.subpather
}

View File

@ -32,7 +32,6 @@ go_library(
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -28,10 +28,10 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/listers/csi/v1alpha1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
@ -58,6 +58,7 @@ go_test(
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -71,8 +72,6 @@ go_test(
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned/fake:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],

View File

@ -37,7 +37,6 @@ import (
fakeclient "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
utiltesting "k8s.io/client-go/util/testing"
fakecsi "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
@ -236,12 +235,12 @@ func TestAttacherWithCSIDriver(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeCSIClient := fakecsi.NewSimpleClientset(
fakeClient := fakeclient.NewSimpleClientset(
getCSIDriver("not-attachable", nil, &bFalse),
getCSIDriver("attachable", nil, &bTrue),
getCSIDriver("nil", nil, nil),
)
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, fakeCSIClient)
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, fakeClient)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
@ -274,10 +273,10 @@ func TestAttacherWithCSIDriver(t *testing.T) {
t.Errorf("Attach() failed: %s", err)
}
if test.expectVolumeAttachment && attachID == "" {
t.Errorf("Epected attachID, got nothing")
t.Errorf("Expected attachID, got nothing")
}
if !test.expectVolumeAttachment && attachID != "" {
t.Errorf("Epected empty attachID, got %q", attachID)
t.Errorf("Expected empty attachID, got %q", attachID)
}
})
}
@ -318,12 +317,12 @@ func TestAttacherWaitForVolumeAttachmentWithCSIDriver(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeCSIClient := fakecsi.NewSimpleClientset(
fakeClient := fakeclient.NewSimpleClientset(
getCSIDriver("not-attachable", nil, &bFalse),
getCSIDriver("attachable", nil, &bTrue),
getCSIDriver("nil", nil, nil),
)
plug, tmpDir := newTestPlugin(t, nil, fakeCSIClient)
plug, tmpDir := newTestPlugin(t, fakeClient)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
@ -519,7 +518,7 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) {
}
func TestAttacherVolumesAreAttached(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
@ -981,23 +980,20 @@ func TestAttacherUnmountDevice(t *testing.T) {
}
// create a plugin mgr to load plugins and setup a fake client
func newTestWatchPlugin(t *testing.T, csiClient *fakecsi.Clientset) (*csiPlugin, *watch.RaceFreeFakeWatcher, string, *fakeclient.Clientset) {
func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlugin, *watch.RaceFreeFakeWatcher, string, *fakeclient.Clientset) {
tmpDir, err := utiltesting.MkTmpdir("csi-test")
if err != nil {
t.Fatalf("can't create temp dir: %v", err)
}
fakeClient := fakeclient.NewSimpleClientset()
fakeWatcher := watch.NewRaceFreeFake()
fakeClient.Fake.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatcher, nil))
fakeClient.Fake.WatchReactionChain = fakeClient.Fake.WatchReactionChain[:1]
if csiClient == nil {
csiClient = fakecsi.NewSimpleClientset()
if fakeClient == nil {
fakeClient = fakeclient.NewSimpleClientset()
}
fakeWatcher := watch.NewRaceFreeFake()
fakeClient.Fake.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatcher, nil))
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
fakeClient,
csiClient,
nil,
"node",
)

View File

@ -52,7 +52,7 @@ func prepareBlockMapperTest(plug *csiPlugin, specVolumeName string, t *testing.T
func TestBlockMapperGetGlobalMapPath(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
// TODO (vladimirvivien) specName with slashes will not work
@ -93,7 +93,7 @@ func TestBlockMapperGetGlobalMapPath(t *testing.T) {
func TestBlockMapperGetStagingPath(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
testCases := []struct {
@ -130,7 +130,7 @@ func TestBlockMapperGetStagingPath(t *testing.T) {
func TestBlockMapperGetPublishPath(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
testCases := []struct {
@ -167,7 +167,7 @@ func TestBlockMapperGetPublishPath(t *testing.T) {
func TestBlockMapperGetDeviceMapPath(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
testCases := []struct {
@ -208,14 +208,13 @@ func TestBlockMapperGetDeviceMapPath(t *testing.T) {
func TestBlockMapperSetupDevice(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
fakeClient := fakeclient.NewSimpleClientset()
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
fakeClient,
nil,
nil,
"fakeNode",
)
plug.host = host
@ -275,14 +274,13 @@ func TestBlockMapperSetupDevice(t *testing.T) {
func TestBlockMapperMapDevice(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
fakeClient := fakeclient.NewSimpleClientset()
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
fakeClient,
nil,
nil,
"fakeNode",
)
plug.host = host
@ -358,14 +356,13 @@ func TestBlockMapperMapDevice(t *testing.T) {
func TestBlockMapperTearDownDevice(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
fakeClient := fakeclient.NewSimpleClientset()
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
fakeClient,
nil,
nil,
"fakeNode",
)
plug.host = host

View File

@ -50,7 +50,6 @@ var (
"nodeName",
"attachmentID",
}
currentPodInfoMountVersion = "v1"
)
type csiMountMgr struct {
@ -247,8 +246,8 @@ func (c *csiMountMgr) podAttributes() (map[string]string, error) {
return nil, err
}
// if PodInfoOnMountVersion is not set or not v1 we do not set pod attributes
if csiDriver.Spec.PodInfoOnMountVersion == nil || *csiDriver.Spec.PodInfoOnMountVersion != currentPodInfoMountVersion {
// if PodInfoOnMount is not set or false we do not set pod attributes
if csiDriver.Spec.PodInfoOnMount == nil || *csiDriver.Spec.PodInfoOnMount == false {
klog.V(4).Infof(log("CSIDriver %q does not require pod information", c.driverName))
return nil, nil
}

View File

@ -29,14 +29,13 @@ import (
api "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
fakeclient "k8s.io/client-go/kubernetes/fake"
csiapi "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
fakecsi "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
@ -53,7 +52,7 @@ var (
)
func TestMounterGetPath(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
// TODO (vladimirvivien) specName with slashes will not work
@ -142,17 +141,17 @@ func MounterSetUpTests(t *testing.T, podInfoEnabled bool) {
},
}
emptyPodMountInfoVersion := ""
noPodMountInfo := false
currentPodInfoMount := true
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
klog.Infof("Starting test %s", test.name)
fakeClient := fakeclient.NewSimpleClientset()
fakeCSIClient := fakecsi.NewSimpleClientset(
getCSIDriver("no-info", &emptyPodMountInfoVersion, nil),
getCSIDriver("info", &currentPodInfoMountVersion, nil),
fakeClient := fakeclient.NewSimpleClientset(
getCSIDriver("no-info", &noPodMountInfo, nil),
getCSIDriver("info", &currentPodInfoMount, nil),
getCSIDriver("nil", nil, nil),
)
plug, tmpDir := newTestPlugin(t, fakeClient, fakeCSIClient)
plug, tmpDir := newTestPlugin(t, fakeClient)
defer os.RemoveAll(tmpDir)
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
@ -269,7 +268,7 @@ func TestMounterSetUp(t *testing.T) {
}
func TestMounterSetUpWithFSGroup(t *testing.T) {
fakeClient := fakeclient.NewSimpleClientset()
plug, tmpDir := newTestPlugin(t, fakeClient, nil)
plug, tmpDir := newTestPlugin(t, fakeClient)
defer os.RemoveAll(tmpDir)
testCases := []struct {
@ -393,7 +392,7 @@ func TestMounterSetUpWithFSGroup(t *testing.T) {
}
func TestUnmounterTeardown(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
pv := makeTestPV("test-pv", 10, testDriver, testVol)
@ -443,7 +442,7 @@ func TestUnmounterTeardown(t *testing.T) {
}
func TestSaveVolumeData(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
testCases := []struct {
name string
@ -490,13 +489,13 @@ func TestSaveVolumeData(t *testing.T) {
}
}
func getCSIDriver(name string, podInfoMountVersion *string, attachable *bool) *csiapi.CSIDriver {
return &csiapi.CSIDriver{
func getCSIDriver(name string, podInfoMount *bool, attachable *bool) *storagev1beta1.CSIDriver {
return &storagev1beta1.CSIDriver{
ObjectMeta: meta.ObjectMeta{
Name: name,
},
Spec: csiapi.CSIDriverSpec{
PodInfoOnMountVersion: podInfoMountVersion,
Spec: storagev1beta1.CSIDriverSpec{
PodInfoOnMount: podInfoMount,
AttachRequired: attachable,
},
}

View File

@ -36,10 +36,10 @@ import (
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
csiapiinformer "k8s.io/client-go/informers"
csiinformer "k8s.io/client-go/informers/storage/v1beta1"
clientset "k8s.io/client-go/kubernetes"
csiapiinformer "k8s.io/csi-api/pkg/client/informers/externalversions"
csiinformer "k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1"
csilister "k8s.io/csi-api/pkg/client/listers/csi/v1alpha1"
csilister "k8s.io/client-go/listers/storage/v1beta1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager"
@ -205,17 +205,16 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
p.host = host
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
csiClient := host.GetCSIClient()
csiClient := host.GetKubeClient()
if csiClient == nil {
klog.Warning("The client for CSI Custom Resources is not available, skipping informer initialization")
} else {
return errors.New("unable to get Kubernetes client")
}
// Start informer for CSIDrivers.
factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod)
p.csiDriverInformer = factory.Csi().V1alpha1().CSIDrivers()
p.csiDriverInformer = factory.Storage().V1beta1().CSIDrivers()
p.csiDriverLister = p.csiDriverInformer.Lister()
go factory.Start(wait.NeverStop)
}
}
// Initializing the label management channels
nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host)

View File

@ -32,14 +32,13 @@ import (
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
fakeclient "k8s.io/client-go/kubernetes/fake"
utiltesting "k8s.io/client-go/util/testing"
fakecsi "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
)
// create a plugin mgr to load plugins and setup a fake client
func newTestPlugin(t *testing.T, client *fakeclient.Clientset, csiClient *fakecsi.Clientset) (*csiPlugin, string) {
func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, string) {
tmpDir, err := utiltesting.MkTmpdir("csi-test")
if err != nil {
t.Fatalf("can't create temp dir: %v", err)
@ -48,13 +47,9 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset, csiClient *fakecs
if client == nil {
client = fakeclient.NewSimpleClientset()
}
if csiClient == nil {
csiClient = fakecsi.NewSimpleClientset()
}
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
client,
csiClient,
nil,
"fakeNode",
)
@ -120,7 +115,7 @@ func registerFakePlugin(pluginName, endpoint string, versions []string, t *testi
func TestPluginGetPluginName(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
if plug.GetPluginName() != "kubernetes.io/csi" {
t.Errorf("unexpected plugin name %v", plug.GetPluginName())
@ -130,7 +125,7 @@ func TestPluginGetPluginName(t *testing.T) {
func TestPluginGetVolumeName(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
testCases := []struct {
name string
@ -162,7 +157,7 @@ func TestPluginGetVolumeName(t *testing.T) {
func TestPluginCanSupport(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
@ -177,7 +172,7 @@ func TestPluginCanSupport(t *testing.T) {
func TestPluginConstructVolumeSpec(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
testCases := []struct {
@ -239,7 +234,7 @@ func TestPluginConstructVolumeSpec(t *testing.T) {
func TestPluginNewMounter(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
registerFakePlugin(testDriver, "endpoint", []string{"1.2.0"}, t)
@ -290,7 +285,7 @@ func TestPluginNewMounter(t *testing.T) {
func TestPluginNewUnmounter(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
@ -338,7 +333,7 @@ func TestPluginNewUnmounter(t *testing.T) {
func TestPluginNewAttacher(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
@ -358,7 +353,7 @@ func TestPluginNewAttacher(t *testing.T) {
func TestPluginNewDetacher(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
detacher, err := plug.NewDetacher()
@ -389,10 +384,8 @@ func TestPluginCanAttach(t *testing.T) {
}
for _, test := range tests {
csiDriver := getCSIDriver(test.driverName, nil, &test.canAttach)
t.Run(test.name, func(t *testing.T) {
fakeCSIClient := fakecsi.NewSimpleClientset(csiDriver)
plug, tmpDir := newTestPlugin(t, nil, fakeCSIClient)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, test.driverName, "test-vol"), false)
@ -408,7 +401,7 @@ func TestPluginCanAttach(t *testing.T) {
func TestPluginNewBlockMapper(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
@ -456,7 +449,7 @@ func TestPluginNewBlockMapper(t *testing.T) {
func TestPluginNewUnmapper(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
@ -516,7 +509,7 @@ func TestPluginNewUnmapper(t *testing.T) {
func TestPluginConstructBlockVolumeSpec(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
plug, tmpDir := newTestPlugin(t, nil, nil)
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)
testCases := []struct {

View File

@ -11,6 +11,7 @@ go_library(
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -19,8 +20,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
@ -50,6 +50,8 @@ go_test(
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
@ -60,8 +62,6 @@ go_test(
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned/fake:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
)

View File

@ -21,11 +21,11 @@ package nodeinfomanager // import "k8s.io/kubernetes/pkg/volume/csi/nodeinfomana
import (
"encoding/json"
"fmt"
"strings"
"time"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -34,8 +34,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
nodeutil "k8s.io/kubernetes/pkg/util/node"
@ -59,7 +58,7 @@ var (
)
// nodeInfoManager contains necessary common dependencies to update node info on both
// the Node and CSINodeInfo objects.
// the Node and CSINode objects.
type nodeInfoManager struct {
nodeName types.NodeName
volumeHost volume.VolumeHost
@ -70,7 +69,7 @@ type nodeUpdateFunc func(*v1.Node) (newNode *v1.Node, updated bool, err error)
// Interface implements an interface for managing labels of a node
type Interface interface {
CreateCSINodeInfo() (*csiv1alpha1.CSINodeInfo, error)
CreateCSINode() (*storage.CSINode, error)
// Record in the cluster the given node information from the CSI driver with the given name.
// Concurrent calls to InstallCSIDriver() is allowed, but they should not be intertwined with calls
@ -94,9 +93,9 @@ func NewNodeInfoManager(
}
// InstallCSIDriver updates the node ID annotation in the Node object and CSIDrivers field in the
// CSINodeInfo object. If the CSINodeInfo object doesn't yet exist, it will be created.
// CSINode object. If the CSINode object doesn't yet exist, it will be created.
// If multiple calls to InstallCSIDriver() are made in parallel, some calls might receive Node or
// CSINodeInfo update conflicts, which causes the function to retry the corresponding update.
// CSINode update conflicts, which causes the function to retry the corresponding update.
func (nim *nodeInfoManager) InstallCSIDriver(driverName string, driverNodeID string, maxAttachLimit int64, topology map[string]string) error {
if driverNodeID == "" {
return fmt.Errorf("error adding CSI driver node info: driverNodeID must not be empty")
@ -120,23 +119,23 @@ func (nim *nodeInfoManager) InstallCSIDriver(driverName string, driverNodeID str
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
err = nim.updateCSINodeInfo(driverName, driverNodeID, topology)
err = nim.updateCSINode(driverName, driverNodeID, topology)
if err != nil {
return fmt.Errorf("error updating CSINodeInfo object with CSI driver node info: %v", err)
return fmt.Errorf("error updating CSINode object with CSI driver node info: %v", err)
}
}
return nil
}
// UninstallCSIDriver removes the node ID annotation from the Node object and CSIDrivers field from the
// CSINodeInfo object. If the CSINOdeInfo object contains no CSIDrivers, it will be deleted.
// CSINode object. If the CSINOdeInfo object contains no CSIDrivers, it will be deleted.
// If multiple calls to UninstallCSIDriver() are made in parallel, some calls might receive Node or
// CSINodeInfo update conflicts, which causes the function to retry the corresponding update.
// CSINode update conflicts, which causes the function to retry the corresponding update.
func (nim *nodeInfoManager) UninstallCSIDriver(driverName string) error {
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
err := nim.uninstallDriverFromCSINodeInfo(driverName)
err := nim.uninstallDriverFromCSINode(driverName)
if err != nil {
return fmt.Errorf("error uninstalling CSI driver from CSINodeInfo object %v", err)
return fmt.Errorf("error uninstalling CSI driver from CSINode object %v", err)
}
}
@ -344,55 +343,55 @@ func updateTopologyLabels(topology map[string]string) nodeUpdateFunc {
}
}
func (nim *nodeInfoManager) updateCSINodeInfo(
func (nim *nodeInfoManager) updateCSINode(
driverName string,
driverNodeID string,
topology map[string]string) error {
csiKubeClient := nim.volumeHost.GetCSIClient()
csiKubeClient := nim.volumeHost.GetKubeClient()
if csiKubeClient == nil {
return fmt.Errorf("error getting CSI client")
}
var updateErrs []error
err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
if err := nim.tryUpdateCSINodeInfo(csiKubeClient, driverName, driverNodeID, topology); err != nil {
if err := nim.tryUpdateCSINode(csiKubeClient, driverName, driverNodeID, topology); err != nil {
updateErrs = append(updateErrs, err)
return false, nil
}
return true, nil
})
if err != nil {
return fmt.Errorf("error updating CSINodeInfo: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
return fmt.Errorf("error updating CSINode: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
}
return nil
}
func (nim *nodeInfoManager) tryUpdateCSINodeInfo(
csiKubeClient csiclientset.Interface,
func (nim *nodeInfoManager) tryUpdateCSINode(
csiKubeClient clientset.Interface,
driverName string,
driverNodeID string,
topology map[string]string) error {
nodeInfo, err := csiKubeClient.CsiV1alpha1().CSINodeInfos().Get(string(nim.nodeName), metav1.GetOptions{})
nodeInfo, err := csiKubeClient.StorageV1beta1().CSINodes().Get(string(nim.nodeName), metav1.GetOptions{})
if nodeInfo == nil || errors.IsNotFound(err) {
nodeInfo, err = nim.CreateCSINodeInfo()
nodeInfo, err = nim.CreateCSINode()
}
if err != nil {
return err
}
return nim.installDriverToCSINodeInfo(nodeInfo, driverName, driverNodeID, topology)
return nim.installDriverToCSINode(nodeInfo, driverName, driverNodeID, topology)
}
func (nim *nodeInfoManager) CreateCSINodeInfo() (*csiv1alpha1.CSINodeInfo, error) {
func (nim *nodeInfoManager) CreateCSINode() (*storage.CSINode, error) {
kubeClient := nim.volumeHost.GetKubeClient()
if kubeClient == nil {
return nil, fmt.Errorf("error getting kube client")
}
csiKubeClient := nim.volumeHost.GetCSIClient()
csiKubeClient := nim.volumeHost.GetKubeClient()
if csiKubeClient == nil {
return nil, fmt.Errorf("error getting CSI client")
}
@ -402,7 +401,7 @@ func (nim *nodeInfoManager) CreateCSINodeInfo() (*csiv1alpha1.CSINodeInfo, error
return nil, err
}
nodeInfo := &csiv1alpha1.CSINodeInfo{
nodeInfo := &storage.CSINode{
ObjectMeta: metav1.ObjectMeta{
Name: string(nim.nodeName),
OwnerReferences: []metav1.OwnerReference{
@ -414,24 +413,21 @@ func (nim *nodeInfoManager) CreateCSINodeInfo() (*csiv1alpha1.CSINodeInfo, error
},
},
},
Spec: csiv1alpha1.CSINodeInfoSpec{
Drivers: []csiv1alpha1.CSIDriverInfoSpec{},
},
Status: csiv1alpha1.CSINodeInfoStatus{
Drivers: []csiv1alpha1.CSIDriverInfoStatus{},
Spec: storage.CSINodeSpec{
Drivers: []storage.CSINodeDriver{},
},
}
return csiKubeClient.CsiV1alpha1().CSINodeInfos().Create(nodeInfo)
return csiKubeClient.StorageV1beta1().CSINodes().Create(nodeInfo)
}
func (nim *nodeInfoManager) installDriverToCSINodeInfo(
nodeInfo *csiv1alpha1.CSINodeInfo,
func (nim *nodeInfoManager) installDriverToCSINode(
nodeInfo *storage.CSINode,
driverName string,
driverNodeID string,
topology map[string]string) error {
csiKubeClient := nim.volumeHost.GetCSIClient()
csiKubeClient := nim.volumeHost.GetKubeClient()
if csiKubeClient == nil {
return fmt.Errorf("error getting CSI client")
}
@ -444,7 +440,7 @@ func (nim *nodeInfoManager) installDriverToCSINodeInfo(
specModified := true
statusModified := true
// Clone driver list, omitting the driver that matches the given driverName
newDriverSpecs := []csiv1alpha1.CSIDriverInfoSpec{}
newDriverSpecs := []storage.CSINodeDriver{}
for _, driverInfoSpec := range nodeInfo.Spec.Drivers {
if driverInfoSpec.Name == driverName {
if driverInfoSpec.NodeID == driverNodeID &&
@ -456,106 +452,80 @@ func (nim *nodeInfoManager) installDriverToCSINodeInfo(
newDriverSpecs = append(newDriverSpecs, driverInfoSpec)
}
}
newDriverStatuses := []csiv1alpha1.CSIDriverInfoStatus{}
for _, driverInfoStatus := range nodeInfo.Status.Drivers {
if driverInfoStatus.Name == driverName {
if driverInfoStatus.Available &&
/* TODO(https://github.com/kubernetes/enhancements/issues/625): Add actual migration status */
driverInfoStatus.VolumePluginMechanism == csiv1alpha1.VolumePluginMechanismInTree {
statusModified = false
}
} else {
// Omit driverInfoSpec matching given driverName
newDriverStatuses = append(newDriverStatuses, driverInfoStatus)
}
}
if !specModified && !statusModified {
return nil
}
// Append new driver
driverSpec := csiv1alpha1.CSIDriverInfoSpec{
driverSpec := storage.CSINodeDriver{
Name: driverName,
NodeID: driverNodeID,
TopologyKeys: topologyKeys.List(),
}
driverStatus := csiv1alpha1.CSIDriverInfoStatus{
Name: driverName,
Available: true,
// TODO(https://github.com/kubernetes/enhancements/issues/625): Add actual migration status
VolumePluginMechanism: csiv1alpha1.VolumePluginMechanismInTree,
}
newDriverSpecs = append(newDriverSpecs, driverSpec)
newDriverStatuses = append(newDriverStatuses, driverStatus)
nodeInfo.Spec.Drivers = newDriverSpecs
nodeInfo.Status.Drivers = newDriverStatuses
err := validateCSINodeInfo(nodeInfo)
if err != nil {
return err
}
_, err = csiKubeClient.CsiV1alpha1().CSINodeInfos().Update(nodeInfo)
_, err := csiKubeClient.StorageV1beta1().CSINodes().Update(nodeInfo)
return err
}
func (nim *nodeInfoManager) uninstallDriverFromCSINodeInfo(
func (nim *nodeInfoManager) uninstallDriverFromCSINode(
csiDriverName string) error {
csiKubeClient := nim.volumeHost.GetCSIClient()
csiKubeClient := nim.volumeHost.GetKubeClient()
if csiKubeClient == nil {
return fmt.Errorf("error getting CSI client")
}
var updateErrs []error
err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
if err := nim.tryUninstallDriverFromCSINodeInfo(csiKubeClient, csiDriverName); err != nil {
if err := nim.tryUninstallDriverFromCSINode(csiKubeClient, csiDriverName); err != nil {
updateErrs = append(updateErrs, err)
return false, nil
}
return true, nil
})
if err != nil {
return fmt.Errorf("error updating CSINodeInfo: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
return fmt.Errorf("error updating CSINode: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
}
return nil
}
func (nim *nodeInfoManager) tryUninstallDriverFromCSINodeInfo(
csiKubeClient csiclientset.Interface,
func (nim *nodeInfoManager) tryUninstallDriverFromCSINode(
csiKubeClient clientset.Interface,
csiDriverName string) error {
nodeInfoClient := csiKubeClient.CsiV1alpha1().CSINodeInfos()
nodeInfoClient := csiKubeClient.StorageV1beta1().CSINodes()
nodeInfo, err := nodeInfoClient.Get(string(nim.nodeName), metav1.GetOptions{})
if err != nil {
return err // do not wrap error
if err != nil && errors.IsNotFound(err) {
return nil
} else if err != nil {
return err
}
hasModified := false
newDriverStatuses := []csiv1alpha1.CSIDriverInfoStatus{}
for _, driverStatus := range nodeInfo.Status.Drivers {
if driverStatus.Name == csiDriverName {
// Uninstall the driver if we find it
hasModified = driverStatus.Available
driverStatus.Available = false
// Uninstall CSINodeDriver with name csiDriverName
drivers := nodeInfo.Spec.Drivers[:0]
for _, driver := range nodeInfo.Spec.Drivers {
if driver.Name != csiDriverName {
drivers = append(drivers, driver)
} else {
// Found a driver with name csiDriverName
// Set hasModified to true because it will be removed
hasModified = true
}
newDriverStatuses = append(newDriverStatuses, driverStatus)
}
nodeInfo.Status.Drivers = newDriverStatuses
if !hasModified {
// No changes, don't update
return nil
}
nodeInfo.Spec.Drivers = drivers
err = validateCSINodeInfo(nodeInfo)
if err != nil {
return err
}
_, updateErr := nodeInfoClient.Update(nodeInfo)
return updateErr // do not wrap error
_, err = nodeInfoClient.Update(nodeInfo)
return err // do not wrap error
}
@ -611,50 +581,3 @@ func removeMaxAttachLimit(driverName string) nodeUpdateFunc {
return node, true, nil
}
}
// validateCSINodeInfo ensures members of CSINodeInfo object satisfies map and set semantics.
// Before calling CSINodeInfoInterface.Update(), validateCSINodeInfo() should be invoked to
// make sure the CSINodeInfo is compliant
func validateCSINodeInfo(nodeInfo *csiv1alpha1.CSINodeInfo) error {
if len(nodeInfo.Status.Drivers) < 1 {
return fmt.Errorf("at least one Driver entry is required in driver statuses")
}
if len(nodeInfo.Spec.Drivers) < 1 {
return fmt.Errorf("at least one Driver entry is required in driver specs")
}
if len(nodeInfo.Status.Drivers) != len(nodeInfo.Spec.Drivers) {
return fmt.Errorf("")
}
// check for duplicate entries for the same driver in statuses
var errors []string
driverNamesInStatuses := make(sets.String)
for _, driverInfo := range nodeInfo.Status.Drivers {
if driverNamesInStatuses.Has(driverInfo.Name) {
errors = append(errors, fmt.Sprintf("duplicate entries found for driver: %s in driver statuses", driverInfo.Name))
}
driverNamesInStatuses.Insert(driverInfo.Name)
}
// check for duplicate entries for the same driver in specs
driverNamesInSpecs := make(sets.String)
for _, driverInfo := range nodeInfo.Spec.Drivers {
if driverNamesInSpecs.Has(driverInfo.Name) {
errors = append(errors, fmt.Sprintf("duplicate entries found for driver: %s in driver specs", driverInfo.Name))
}
driverNamesInSpecs.Insert(driverInfo.Name)
topoKeys := make(sets.String)
for _, key := range driverInfo.TopologyKeys {
if topoKeys.Has(key) {
errors = append(errors, fmt.Sprintf("duplicate topology keys %s found for driver %s in driver specs", key, driverInfo.Name))
}
topoKeys.Insert(key)
}
}
// check all entries in specs and status match
if !driverNamesInSpecs.Equal(driverNamesInStatuses) {
errors = append(errors, fmt.Sprintf("list of drivers in specs: %v does not match list of drivers in statuses: %v", driverNamesInSpecs.List(), driverNamesInStatuses.List()))
}
if len(errors) == 0 {
return nil
}
return fmt.Errorf(strings.Join(errors, ", "))
}

File diff suppressed because it is too large Load Diff

View File

@ -32,7 +32,6 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume/util/recyclerclient"
@ -318,9 +317,6 @@ type VolumeHost interface {
// GetKubeClient returns a client interface
GetKubeClient() clientset.Interface
// GetCSIClient returns a client interface to csi.storage.k8s.io
GetCSIClient() csiclientset.Interface
// NewWrapperMounter finds an appropriate plugin with which to handle
// the provided spec. This is used to implement volume plugins which
// "wrap" other plugins. For example, the "secret" volume is

View File

@ -29,7 +29,6 @@ go_library(
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//vendor/github.com/stretchr/testify/mock:go_default_library",
"//vendor/k8s.io/utils/strings:go_default_library",
],

View File

@ -37,7 +37,6 @@ import (
"k8s.io/client-go/tools/record"
utiltesting "k8s.io/client-go/util/testing"
cloudprovider "k8s.io/cloud-provider"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
"k8s.io/kubernetes/pkg/util/mount"
. "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
@ -65,7 +64,6 @@ const (
type fakeVolumeHost struct {
rootDir string
kubeClient clientset.Interface
csiClient csiclientset.Interface
pluginMgr VolumePluginMgr
cloud cloudprovider.Interface
mounter mount.Interface
@ -89,10 +87,9 @@ func NewFakeVolumeHostWithNodeLabels(rootDir string, kubeClient clientset.Interf
return volHost
}
func NewFakeVolumeHostWithCSINodeName(rootDir string, kubeClient clientset.Interface, csiClient csiclientset.Interface, plugins []VolumePlugin, nodeName string) *fakeVolumeHost {
func NewFakeVolumeHostWithCSINodeName(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string) *fakeVolumeHost {
volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil)
volHost.nodeName = nodeName
volHost.csiClient = csiClient
return volHost
}
@ -140,10 +137,6 @@ func (f *fakeVolumeHost) GetKubeClient() clientset.Interface {
return f.kubeClient
}
func (f *fakeVolumeHost) GetCSIClient() csiclientset.Interface {
return f.csiClient
}
func (f *fakeVolumeHost) GetCloudProvider() cloudprovider.Interface {
return f.cloud
}

View File

@ -126,7 +126,6 @@ go_library(
"//staging/src/k8s.io/client-go/tools/watch:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
"//staging/src/k8s.io/component-base/cli/flag:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset:go_default_library",
"//staging/src/k8s.io/node-api/pkg/client/clientset/versioned:go_default_library",
"//test/e2e/framework/ginkgowrapper:go_default_library",

View File

@ -46,7 +46,6 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
scaleclient "k8s.io/client-go/scale"
csi "k8s.io/csi-api/pkg/client/clientset/versioned"
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
@ -78,7 +77,6 @@ type Framework struct {
ClientSet clientset.Interface
KubemarkExternalClusterClientSet clientset.Interface
APIExtensionsClientSet apiextensionsclient.Interface
CSIClientSet csi.Interface
NodeAPIClientSet nodeapiclient.Interface
InternalClientset *internalclientset.Clientset
@ -194,12 +192,9 @@ func (f *Framework) BeforeEach() {
ExpectNoError(err)
f.DynamicClient, err = dynamic.NewForConfig(config)
ExpectNoError(err)
// csi.storage.k8s.io is based on CRD, which is served only as JSON
// node.k8s.io is based on CRD, which is served only as JSON
jsonConfig := config
jsonConfig.ContentType = "application/json"
f.CSIClientSet, err = csi.NewForConfig(jsonConfig)
ExpectNoError(err)
// node.k8s.io is also based on CRD
f.NodeAPIClientSet, err = nodeapiclient.NewForConfig(jsonConfig)
ExpectNoError(err)

View File

@ -65,7 +65,6 @@ go_library(
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/e2e/framework/metrics:go_default_library",
"//test/e2e/framework/providers/gce:go_default_library",

View File

@ -33,7 +33,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
csiclient "k8s.io/csi-api/pkg/client/clientset/versioned"
//csiclient "k8s.io/csi-api/pkg/client/clientset/versioned"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/storage/drivers"
@ -85,7 +85,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
sc: make(map[string]*storage.StorageClass),
tp: tp,
}
csics := f.CSIClientSet
cs := f.ClientSet
var err error
m.driver = drivers.InitMockCSIDriver(tp.registerDriver, tp.attachable, tp.podInfoVersion, tp.attachLimit)
@ -102,10 +102,10 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
}
if tp.registerDriver {
err = waitForCSIDriver(csics, m.config.GetUniqueDriverName())
err = waitForCSIDriver(cs, m.config.GetUniqueDriverName())
framework.ExpectNoError(err, "Failed to get CSIDriver : %v", err)
m.testCleanups = append(m.testCleanups, func() {
destroyCSIDriver(csics, m.config.GetUniqueDriverName())
destroyCSIDriver(cs, m.config.GetUniqueDriverName())
})
}
}
@ -515,12 +515,12 @@ func checkPodInfo(cs clientset.Interface, namespace, driverPodName, driverContai
}
}
func waitForCSIDriver(csics csiclient.Interface, driverName string) error {
func waitForCSIDriver(cs clientset.Interface, driverName string) error {
timeout := 2 * time.Minute
framework.Logf("waiting up to %v for CSIDriver %q", timeout, driverName)
for start := time.Now(); time.Since(start) < timeout; time.Sleep(framework.Poll) {
_, err := csics.CsiV1alpha1().CSIDrivers().Get(driverName, metav1.GetOptions{})
_, err := cs.StorageV1beta1().CSIDrivers().Get(driverName, metav1.GetOptions{})
if !errors.IsNotFound(err) {
return err
}
@ -528,13 +528,13 @@ func waitForCSIDriver(csics csiclient.Interface, driverName string) error {
return fmt.Errorf("gave up after waiting %v for CSIDriver %q.", timeout, driverName)
}
func destroyCSIDriver(csics csiclient.Interface, driverName string) {
driverGet, err := csics.CsiV1alpha1().CSIDrivers().Get(driverName, metav1.GetOptions{})
func destroyCSIDriver(cs clientset.Interface, driverName string) {
driverGet, err := cs.StorageV1beta1().CSIDrivers().Get(driverName, metav1.GetOptions{})
if err == nil {
framework.Logf("deleting %s.%s: %s", driverGet.TypeMeta.APIVersion, driverGet.TypeMeta.Kind, driverGet.ObjectMeta.Name)
// Uncomment the following line to get full dump of CSIDriver object
// framework.Logf("%s", framework.PrettyPrint(driverGet))
csics.CsiV1alpha1().CSIDrivers().Delete(driverName, nil)
cs.StorageV1beta1().CSIDrivers().Delete(driverName, nil)
}
}

View File

@ -55,7 +55,11 @@ var _ = utils.SIGDescribe("CSI Volumes", func() {
curDriver := initDriver()
Context(testsuites.GetDriverNameWithFeatureTags(curDriver), func() {
testsuites.DefineTestSuite(curDriver, csiTestSuites)
// TODO(xyang): Disable the CSI tests until the sidecar container images
// are updated to support CSINodeInfo and CSIDriver Core APIs in the
// following PR:
// https://github.com/kubernetes/kubernetes/pull/73883
//testsuites.DefineTestSuite(curDriver, csiTestSuites)
})
}

View File

@ -411,7 +411,6 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy
informers := informers.NewSharedInformerFactory(testClient, resyncPeriod)
ctrl, err := attachdetach.NewAttachDetachController(
testClient,
nil, /* csiClient */
informers.Core().V1().Pods(),
informers.Core().V1().Nodes(),
informers.Core().V1().PersistentVolumeClaims(),