Delaying kubeclient and csi client injection into CSI plugin

pull/8/head
Cheng Xing 2018-09-13 11:56:34 -07:00
parent 6eeff3e6c9
commit a8e282e905
5 changed files with 84 additions and 46 deletions

View File

@ -27,7 +27,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/listers/csi/v1alpha1:go_default_library",

View File

@ -36,7 +36,6 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
csiapiinformer "k8s.io/csi-api/pkg/client/informers/externalversions"
csiinformer "k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1"
csilister "k8s.io/csi-api/pkg/client/listers/csi/v1alpha1"
@ -161,31 +160,22 @@ func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
func (p *csiPlugin) Init(host volume.VolumeHost) error {
p.host = host
kubeClient := host.GetKubeClient()
if kubeClient == nil {
return fmt.Errorf("error getting kube client")
}
var csiClient csiclientset.Interface
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) ||
utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
csiClient = host.GetCSIClient()
if csiClient == nil {
return fmt.Errorf("error getting CSI client")
}
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
// Start informer for CSIDrivers.
factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod)
p.csiDriverInformer = factory.Csi().V1alpha1().CSIDrivers()
p.csiDriverLister = p.csiDriverInformer.Lister()
go factory.Start(wait.NeverStop)
csiClient := host.GetCSIClient()
if csiClient == nil {
glog.Warning("The client for CSI Custom Resources is not available, skipping informer initialization")
} else {
// Start informer for CSIDrivers.
factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod)
p.csiDriverInformer = factory.Csi().V1alpha1().CSIDrivers()
p.csiDriverLister = p.csiDriverInformer.Lister()
go factory.Start(wait.NeverStop)
}
}
// Initializing csiDrivers map and label management channels
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), kubeClient, csiClient)
nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host)
return nil
}

View File

@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
@ -15,10 +16,8 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
@ -45,6 +44,7 @@ go_test(
deps = [
"//pkg/apis/core/helper:go_default_library",
"//pkg/features:go_default_library",
"//pkg/volume/testing:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -53,6 +53,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned/fake:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library",

View File

@ -31,11 +31,10 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
)
@ -49,9 +48,8 @@ var nodeKind = v1.SchemeGroupVersion.WithKind("Node")
// nodeInfoManager contains necessary common dependencies to update node info on both
// the Node and CSINodeInfo objects.
type nodeInfoManager struct {
nodeName types.NodeName
k8s kubernetes.Interface
csiKubeClient csiclientset.Interface
nodeName types.NodeName
volumeHost volume.VolumeHost
}
// If no updates is needed, the function must return the same Node object as the input.
@ -73,12 +71,10 @@ type Interface interface {
// NewNodeInfoManager initializes nodeInfoManager
func NewNodeInfoManager(
nodeName types.NodeName,
kubeClient kubernetes.Interface,
csiKubeClient csiclientset.Interface) Interface {
volumeHost volume.VolumeHost) Interface {
return &nodeInfoManager{
nodeName: nodeName,
k8s: kubeClient,
csiKubeClient: csiKubeClient,
nodeName: nodeName,
volumeHost: volumeHost,
}
}
@ -143,7 +139,12 @@ func (nim *nodeInfoManager) updateNode(updateFuncs ...nodeUpdateFunc) error {
// existing changes are not overwritten. RetryOnConflict uses
// exponential backoff to avoid exhausting the apiserver.
nodeClient := nim.k8s.CoreV1().Nodes()
kubeClient := nim.volumeHost.GetKubeClient()
if kubeClient == nil {
return fmt.Errorf("error getting kube client")
}
nodeClient := kubeClient.CoreV1().Nodes()
node, err := nodeClient.Get(string(nim.nodeName), metav1.GetOptions{})
if err != nil {
return err // do not wrap error
@ -315,12 +316,13 @@ func (nim *nodeInfoManager) updateCSINodeInfo(
driverNodeID string,
topology *csipb.Topology) error {
if nim.csiKubeClient == nil {
return fmt.Errorf("CSI client cannot be nil")
csiKubeClient := nim.volumeHost.GetCSIClient()
if csiKubeClient == nil {
return fmt.Errorf("error getting CSI client")
}
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
nodeInfo, err := nim.csiKubeClient.CsiV1alpha1().CSINodeInfos().Get(string(nim.nodeName), metav1.GetOptions{})
nodeInfo, err := csiKubeClient.CsiV1alpha1().CSINodeInfos().Get(string(nim.nodeName), metav1.GetOptions{})
if nodeInfo == nil || errors.IsNotFound(err) {
return nim.createNodeInfoObject(driverName, driverNodeID, topology)
}
@ -341,6 +343,16 @@ func (nim *nodeInfoManager) createNodeInfoObject(
driverNodeID string,
topology *csipb.Topology) error {
kubeClient := nim.volumeHost.GetKubeClient()
if kubeClient == nil {
return fmt.Errorf("error getting kube client")
}
csiKubeClient := nim.volumeHost.GetCSIClient()
if csiKubeClient == nil {
return fmt.Errorf("error getting CSI client")
}
var topologyKeys []string
if topology != nil {
for k := range topology.Segments {
@ -348,7 +360,7 @@ func (nim *nodeInfoManager) createNodeInfoObject(
}
}
node, err := nim.k8s.CoreV1().Nodes().Get(string(nim.nodeName), metav1.GetOptions{})
node, err := kubeClient.CoreV1().Nodes().Get(string(nim.nodeName), metav1.GetOptions{})
if err != nil {
return err // do not wrap error
}
@ -374,7 +386,7 @@ func (nim *nodeInfoManager) createNodeInfoObject(
},
}
_, err = nim.csiKubeClient.CsiV1alpha1().CSINodeInfos().Create(nodeInfo)
_, err = csiKubeClient.CsiV1alpha1().CSINodeInfos().Create(nodeInfo)
return err // do not wrap error
}
@ -384,6 +396,11 @@ func (nim *nodeInfoManager) updateNodeInfoObject(
driverNodeID string,
topology *csipb.Topology) error {
csiKubeClient := nim.volumeHost.GetCSIClient()
if csiKubeClient == nil {
return fmt.Errorf("error getting CSI client")
}
topologyKeys := make(sets.String)
if topology != nil {
for k := range topology.Segments {
@ -416,14 +433,19 @@ func (nim *nodeInfoManager) updateNodeInfoObject(
newDriverInfos = append(newDriverInfos, driverInfo)
nodeInfo.CSIDrivers = newDriverInfos
_, err := nim.csiKubeClient.CsiV1alpha1().CSINodeInfos().Update(nodeInfo)
_, err := csiKubeClient.CsiV1alpha1().CSINodeInfos().Update(nodeInfo)
return err // do not wrap error
}
func (nim *nodeInfoManager) removeCSINodeInfo(csiDriverName string) error {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
nodeInfoClient := nim.csiKubeClient.CsiV1alpha1().CSINodeInfos()
csiKubeClient := nim.volumeHost.GetCSIClient()
if csiKubeClient == nil {
return fmt.Errorf("error getting CSI client")
}
nodeInfoClient := csiKubeClient.CsiV1alpha1().CSINodeInfos()
nodeInfo, err := nodeInfoClient.Get(string(nim.nodeName), metav1.GetOptions{})
if nodeInfo == nil || errors.IsNotFound(err) {
// do nothing

View File

@ -27,10 +27,12 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
"k8s.io/client-go/kubernetes/fake"
utiltesting "k8s.io/client-go/util/testing"
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
csifake "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
"k8s.io/kubernetes/pkg/apis/core/helper"
"k8s.io/kubernetes/pkg/features"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"testing"
)
@ -530,10 +532,23 @@ func TestAddNodeInfoExistingAnnotation(t *testing.T) {
nodeName := tc.existingNode.Name
client := fake.NewSimpleClientset(tc.existingNode)
csiClient := csifake.NewSimpleClientset()
nim := NewNodeInfoManager(types.NodeName(nodeName), client, csiClient)
tmpDir, err := utiltesting.MkTmpdir("nodeinfomanager-test")
if err != nil {
t.Fatalf("can't create temp dir: %v", err)
}
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
client,
csiClient,
nil,
nodeName,
)
nim := NewNodeInfoManager(types.NodeName(nodeName), host)
// Act
err := nim.AddNodeInfo(driverName, nodeID, 0 /* maxVolumeLimit */, nil) // TODO test maxVolumeLimit
err = nim.AddNodeInfo(driverName, nodeID, 0 /* maxVolumeLimit */, nil) // TODO test maxVolumeLimit
if err != nil {
t.Errorf("expected no error from AddNodeInfo call but got: %v", err)
continue
@ -573,10 +588,21 @@ func test(t *testing.T, addNodeInfo bool, csiNodeInfoEnabled bool, testcases []t
} else {
csiClient = csifake.NewSimpleClientset(tc.existingNodeInfo)
}
nim := NewNodeInfoManager(types.NodeName(nodeName), client, csiClient)
tmpDir, err := utiltesting.MkTmpdir("nodeinfomanager-test")
if err != nil {
t.Fatalf("can't create temp dir: %v", err)
}
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
client,
csiClient,
nil,
nodeName,
)
nim := NewNodeInfoManager(types.NodeName(nodeName), host)
//// Act
var err error
if addNodeInfo {
err = nim.AddNodeInfo(tc.driverName, tc.inputNodeID, 0 /* maxVolumeLimit */, tc.inputTopology) // TODO test maxVolumeLimit
} else {