From a8e282e9053d8ecadb4574e425755de7e4321ebc Mon Sep 17 00:00:00 2001 From: Cheng Xing Date: Thu, 13 Sep 2018 11:56:34 -0700 Subject: [PATCH] Delaying kubeclient and csi client injection into CSI plugin --- pkg/volume/csi/BUILD | 1 - pkg/volume/csi/csi_plugin.go | 32 ++++------ pkg/volume/csi/nodeinfomanager/BUILD | 5 +- .../csi/nodeinfomanager/nodeinfomanager.go | 58 +++++++++++++------ .../nodeinfomanager/nodeinfomanager_test.go | 34 +++++++++-- 5 files changed, 84 insertions(+), 46 deletions(-) diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index 0d80f8238a..c9adb17cc5 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -27,7 +27,6 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", - "//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library", "//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions:go_default_library", "//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1:go_default_library", "//staging/src/k8s.io/csi-api/pkg/client/listers/csi/v1alpha1:go_default_library", diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 8d021d671c..a9083e5359 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -36,7 +36,6 @@ import ( "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" - csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned" csiapiinformer "k8s.io/csi-api/pkg/client/informers/externalversions" csiinformer "k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1" csilister "k8s.io/csi-api/pkg/client/listers/csi/v1alpha1" @@ -161,31 +160,22 @@ func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { func (p *csiPlugin) Init(host volume.VolumeHost) error { p.host = host - kubeClient := host.GetKubeClient() - if kubeClient == nil { - return fmt.Errorf("error getting kube client") - } - - var csiClient csiclientset.Interface - if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) || - utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) { - csiClient = host.GetCSIClient() - if csiClient == nil { - return fmt.Errorf("error getting CSI client") - } - } - if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { - // Start informer for CSIDrivers. - factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod) - p.csiDriverInformer = factory.Csi().V1alpha1().CSIDrivers() - p.csiDriverLister = p.csiDriverInformer.Lister() - go factory.Start(wait.NeverStop) + csiClient := host.GetCSIClient() + if csiClient == nil { + glog.Warning("The client for CSI Custom Resources is not available, skipping informer initialization") + } else { + // Start informer for CSIDrivers. + factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod) + p.csiDriverInformer = factory.Csi().V1alpha1().CSIDrivers() + p.csiDriverLister = p.csiDriverInformer.Lister() + go factory.Start(wait.NeverStop) + } } // Initializing csiDrivers map and label management channels csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}} - nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), kubeClient, csiClient) + nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host) return nil } diff --git a/pkg/volume/csi/nodeinfomanager/BUILD b/pkg/volume/csi/nodeinfomanager/BUILD index e83adfed6f..86ac0efcd2 100644 --- a/pkg/volume/csi/nodeinfomanager/BUILD +++ b/pkg/volume/csi/nodeinfomanager/BUILD @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/features:go_default_library", + "//pkg/volume:go_default_library", "//pkg/volume/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", @@ -15,10 +16,8 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/util/retry:go_default_library", "//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library", - "//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library", "//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], @@ -45,6 +44,7 @@ go_test( deps = [ "//pkg/apis/core/helper:go_default_library", "//pkg/features:go_default_library", + "//pkg/volume/testing:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", @@ -53,6 +53,7 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", + "//staging/src/k8s.io/client-go/util/testing:go_default_library", "//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library", "//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned/fake:go_default_library", "//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library", diff --git a/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go b/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go index 7754dcf28f..f13a4419be 100644 --- a/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go +++ b/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go @@ -31,11 +31,10 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/retry" csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1" - csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned" "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util" ) @@ -49,9 +48,8 @@ var nodeKind = v1.SchemeGroupVersion.WithKind("Node") // nodeInfoManager contains necessary common dependencies to update node info on both // the Node and CSINodeInfo objects. type nodeInfoManager struct { - nodeName types.NodeName - k8s kubernetes.Interface - csiKubeClient csiclientset.Interface + nodeName types.NodeName + volumeHost volume.VolumeHost } // If no updates is needed, the function must return the same Node object as the input. @@ -73,12 +71,10 @@ type Interface interface { // NewNodeInfoManager initializes nodeInfoManager func NewNodeInfoManager( nodeName types.NodeName, - kubeClient kubernetes.Interface, - csiKubeClient csiclientset.Interface) Interface { + volumeHost volume.VolumeHost) Interface { return &nodeInfoManager{ - nodeName: nodeName, - k8s: kubeClient, - csiKubeClient: csiKubeClient, + nodeName: nodeName, + volumeHost: volumeHost, } } @@ -143,7 +139,12 @@ func (nim *nodeInfoManager) updateNode(updateFuncs ...nodeUpdateFunc) error { // existing changes are not overwritten. RetryOnConflict uses // exponential backoff to avoid exhausting the apiserver. - nodeClient := nim.k8s.CoreV1().Nodes() + kubeClient := nim.volumeHost.GetKubeClient() + if kubeClient == nil { + return fmt.Errorf("error getting kube client") + } + + nodeClient := kubeClient.CoreV1().Nodes() node, err := nodeClient.Get(string(nim.nodeName), metav1.GetOptions{}) if err != nil { return err // do not wrap error @@ -315,12 +316,13 @@ func (nim *nodeInfoManager) updateCSINodeInfo( driverNodeID string, topology *csipb.Topology) error { - if nim.csiKubeClient == nil { - return fmt.Errorf("CSI client cannot be nil") + csiKubeClient := nim.volumeHost.GetCSIClient() + if csiKubeClient == nil { + return fmt.Errorf("error getting CSI client") } retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - nodeInfo, err := nim.csiKubeClient.CsiV1alpha1().CSINodeInfos().Get(string(nim.nodeName), metav1.GetOptions{}) + nodeInfo, err := csiKubeClient.CsiV1alpha1().CSINodeInfos().Get(string(nim.nodeName), metav1.GetOptions{}) if nodeInfo == nil || errors.IsNotFound(err) { return nim.createNodeInfoObject(driverName, driverNodeID, topology) } @@ -341,6 +343,16 @@ func (nim *nodeInfoManager) createNodeInfoObject( driverNodeID string, topology *csipb.Topology) error { + kubeClient := nim.volumeHost.GetKubeClient() + if kubeClient == nil { + return fmt.Errorf("error getting kube client") + } + + csiKubeClient := nim.volumeHost.GetCSIClient() + if csiKubeClient == nil { + return fmt.Errorf("error getting CSI client") + } + var topologyKeys []string if topology != nil { for k := range topology.Segments { @@ -348,7 +360,7 @@ func (nim *nodeInfoManager) createNodeInfoObject( } } - node, err := nim.k8s.CoreV1().Nodes().Get(string(nim.nodeName), metav1.GetOptions{}) + node, err := kubeClient.CoreV1().Nodes().Get(string(nim.nodeName), metav1.GetOptions{}) if err != nil { return err // do not wrap error } @@ -374,7 +386,7 @@ func (nim *nodeInfoManager) createNodeInfoObject( }, } - _, err = nim.csiKubeClient.CsiV1alpha1().CSINodeInfos().Create(nodeInfo) + _, err = csiKubeClient.CsiV1alpha1().CSINodeInfos().Create(nodeInfo) return err // do not wrap error } @@ -384,6 +396,11 @@ func (nim *nodeInfoManager) updateNodeInfoObject( driverNodeID string, topology *csipb.Topology) error { + csiKubeClient := nim.volumeHost.GetCSIClient() + if csiKubeClient == nil { + return fmt.Errorf("error getting CSI client") + } + topologyKeys := make(sets.String) if topology != nil { for k := range topology.Segments { @@ -416,14 +433,19 @@ func (nim *nodeInfoManager) updateNodeInfoObject( newDriverInfos = append(newDriverInfos, driverInfo) nodeInfo.CSIDrivers = newDriverInfos - _, err := nim.csiKubeClient.CsiV1alpha1().CSINodeInfos().Update(nodeInfo) + _, err := csiKubeClient.CsiV1alpha1().CSINodeInfos().Update(nodeInfo) return err // do not wrap error } func (nim *nodeInfoManager) removeCSINodeInfo(csiDriverName string) error { retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - nodeInfoClient := nim.csiKubeClient.CsiV1alpha1().CSINodeInfos() + csiKubeClient := nim.volumeHost.GetCSIClient() + if csiKubeClient == nil { + return fmt.Errorf("error getting CSI client") + } + + nodeInfoClient := csiKubeClient.CsiV1alpha1().CSINodeInfos() nodeInfo, err := nodeInfoClient.Get(string(nim.nodeName), metav1.GetOptions{}) if nodeInfo == nil || errors.IsNotFound(err) { // do nothing diff --git a/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go b/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go index a4c70ac56f..122463bdde 100644 --- a/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go +++ b/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go @@ -27,10 +27,12 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing" "k8s.io/client-go/kubernetes/fake" + utiltesting "k8s.io/client-go/util/testing" csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1" csifake "k8s.io/csi-api/pkg/client/clientset/versioned/fake" "k8s.io/kubernetes/pkg/apis/core/helper" "k8s.io/kubernetes/pkg/features" + volumetest "k8s.io/kubernetes/pkg/volume/testing" "testing" ) @@ -530,10 +532,23 @@ func TestAddNodeInfoExistingAnnotation(t *testing.T) { nodeName := tc.existingNode.Name client := fake.NewSimpleClientset(tc.existingNode) csiClient := csifake.NewSimpleClientset() - nim := NewNodeInfoManager(types.NodeName(nodeName), client, csiClient) + + tmpDir, err := utiltesting.MkTmpdir("nodeinfomanager-test") + if err != nil { + t.Fatalf("can't create temp dir: %v", err) + } + host := volumetest.NewFakeVolumeHostWithCSINodeName( + tmpDir, + client, + csiClient, + nil, + nodeName, + ) + + nim := NewNodeInfoManager(types.NodeName(nodeName), host) // Act - err := nim.AddNodeInfo(driverName, nodeID, 0 /* maxVolumeLimit */, nil) // TODO test maxVolumeLimit + err = nim.AddNodeInfo(driverName, nodeID, 0 /* maxVolumeLimit */, nil) // TODO test maxVolumeLimit if err != nil { t.Errorf("expected no error from AddNodeInfo call but got: %v", err) continue @@ -573,10 +588,21 @@ func test(t *testing.T, addNodeInfo bool, csiNodeInfoEnabled bool, testcases []t } else { csiClient = csifake.NewSimpleClientset(tc.existingNodeInfo) } - nim := NewNodeInfoManager(types.NodeName(nodeName), client, csiClient) + + tmpDir, err := utiltesting.MkTmpdir("nodeinfomanager-test") + if err != nil { + t.Fatalf("can't create temp dir: %v", err) + } + host := volumetest.NewFakeVolumeHostWithCSINodeName( + tmpDir, + client, + csiClient, + nil, + nodeName, + ) + nim := NewNodeInfoManager(types.NodeName(nodeName), host) //// Act - var err error if addNodeInfo { err = nim.AddNodeInfo(tc.driverName, tc.inputNodeID, 0 /* maxVolumeLimit */, tc.inputTopology) // TODO test maxVolumeLimit } else {