From fa926ed6e0a3970cc2682e203c7a3f2e9568d132 Mon Sep 17 00:00:00 2001 From: Yecheng Fu Date: Sat, 9 Mar 2019 13:44:08 +0800 Subject: [PATCH] Delay CSI client initialization --- pkg/volume/csi/csi_block.go | 22 +++++++++++++++----- pkg/volume/csi/csi_client.go | 34 +++++++++++++++++++++++++++++++ pkg/volume/csi/csi_mounter.go | 14 ++++++++++--- pkg/volume/csi/csi_plugin.go | 20 ++++-------------- pkg/volume/csi/csi_plugin_test.go | 17 ++++++++++------ 5 files changed, 77 insertions(+), 30 deletions(-) diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go index d8296c9eda..a1daf21cfe 100644 --- a/pkg/volume/csi/csi_block.go +++ b/pkg/volume/csi/csi_block.go @@ -37,8 +37,8 @@ import ( ) type csiBlockMapper struct { + csiClientGetter k8s kubernetes.Interface - csiClient csiClient plugin *csiPlugin driverName csiDriverName specName string @@ -247,14 +247,20 @@ func (m *csiBlockMapper) SetUpDevice() (string, error) { ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) defer cancel() + csiClient, err := m.csiClientGetter.Get() + if err != nil { + klog.Error(log("blockMapper.SetUpDevice failed to get CSI client: %v", err)) + return "", err + } + // Call NodeStageVolume - stagingPath, err := m.stageVolumeForBlock(ctx, m.csiClient, accessMode, csiSource, attachment) + stagingPath, err := m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment) if err != nil { return "", err } // Call NodePublishVolume - publishPath, err := m.publishVolumeForBlock(ctx, m.csiClient, accessMode, csiSource, attachment, stagingPath) + publishPath, err := m.publishVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment, stagingPath) if err != nil { return "", err } @@ -326,6 +332,12 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) defer cancel() + csiClient, err := m.csiClientGetter.Get() + if err != nil { + klog.Error(log("blockMapper.TearDownDevice failed to get CSI client: %v", err)) + return err + } + // Call NodeUnpublishVolume publishPath := m.getPublishPath() if _, err := os.Stat(publishPath); err != nil { @@ -335,7 +347,7 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error return err } } else { - err := m.unpublishVolumeForBlock(ctx, m.csiClient, publishPath) + err := m.unpublishVolumeForBlock(ctx, csiClient, publishPath) if err != nil { return err } @@ -350,7 +362,7 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error return err } } else { - err := m.unstageVolumeForBlock(ctx, m.csiClient, stagingPath) + err := m.unstageVolumeForBlock(ctx, csiClient, stagingPath) if err != nil { return err } diff --git a/pkg/volume/csi/csi_client.go b/pkg/volume/csi/csi_client.go index 075123080b..272c5eee78 100644 --- a/pkg/volume/csi/csi_client.go +++ b/pkg/volume/csi/csi_client.go @@ -23,6 +23,7 @@ import ( "io" "net" "strings" + "sync" "time" csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" @@ -807,3 +808,36 @@ func versionRequiresV0Client(version *utilversion.Version) bool { return false } + +// CSI client getter with cache. +// This provides a method to initialize CSI client with driver name and caches +// it for later use. When CSI clients have not been discovered yet (e.g. +// on kubelet restart), client initialization will fail. Users of CSI client (e.g. +// mounter manager and block mapper) can use this to delay CSI client +// initialization until needed. +type csiClientGetter struct { + sync.RWMutex + csiClient csiClient + driverName csiDriverName +} + +func (c *csiClientGetter) Get() (csiClient, error) { + c.RLock() + if c.csiClient != nil { + c.RUnlock() + return c.csiClient, nil + } + c.RUnlock() + c.Lock() + defer c.Unlock() + // Double-checking locking criterion. + if c.csiClient != nil { + return c.csiClient, nil + } + csi, err := newCsiDriverClient(c.driverName) + if err != nil { + return nil, err + } + c.csiClient = csi + return c.csiClient, nil +} diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go index f61ccc33d2..ccdce2ace2 100644 --- a/pkg/volume/csi/csi_mounter.go +++ b/pkg/volume/csi/csi_mounter.go @@ -56,7 +56,7 @@ var ( ) type csiMountMgr struct { - csiClient csiClient + csiClientGetter k8s kubernetes.Interface plugin *csiPlugin driverName csiDriverName @@ -111,7 +111,11 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error { return nil } - csi := c.csiClient + csi, err := c.csiClientGetter.Get() + if err != nil { + klog.Error(log("mounter.SetUpAt failed to get CSI client: %v", err)) + return err + } ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) defer cancel() @@ -343,7 +347,11 @@ func (c *csiMountMgr) TearDownAt(dir string) error { klog.V(4).Infof(log("Unmounter.TearDown(%s)", dir)) volID := c.volumeID - csi := c.csiClient + csi, err := c.csiClientGetter.Get() + if err != nil { + klog.Error(log("mounter.SetUpAt failed to get CSI client: %v", err)) + return err + } ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) defer cancel() diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index f1ac67aced..ade21aa109 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -383,11 +383,6 @@ func (p *csiPlugin) NewMounter( return nil, errors.New("failed to get a Kubernetes client") } - csi, err := newCsiDriverClient(csiDriverName(driverName)) - if err != nil { - return nil, err - } - mounter := &csiMountMgr{ plugin: p, k8s: k8s, @@ -398,9 +393,9 @@ func (p *csiPlugin) NewMounter( driverMode: driverMode, volumeID: volumeHandle, specVolumeID: spec.Name(), - csiClient: csi, readOnly: readOnly, } + mounter.csiClientGetter.driverName = csiDriverName(driverName) // Save volume info in pod dir dir := mounter.GetPath() @@ -458,10 +453,7 @@ func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmo } unmounter.driverName = csiDriverName(data[volDataKey.driverName]) unmounter.volumeID = data[volDataKey.volHandle] - unmounter.csiClient, err = newCsiDriverClient(unmounter.driverName) - if err != nil { - return nil, err - } + unmounter.csiClientGetter.driverName = unmounter.driverName return unmounter, nil } @@ -638,10 +630,6 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt } klog.V(4).Info(log("setting up block mapper for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver)) - client, err := newCsiDriverClient(csiDriverName(pvSource.Driver)) - if err != nil { - return nil, err - } k8s := p.host.GetKubeClient() if k8s == nil { @@ -650,7 +638,6 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt } mapper := &csiBlockMapper{ - csiClient: client, k8s: k8s, plugin: p, volumeID: pvSource.VolumeHandle, @@ -660,6 +647,7 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt specName: spec.Name(), podUID: podRef.UID, } + mapper.csiClientGetter.driverName = csiDriverName(pvSource.Driver) // Save volume info in pod dir dataDir := getVolumeDeviceDataDir(spec.Name(), p.host) @@ -714,7 +702,7 @@ func (p *csiPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (vo } unmapper.driverName = csiDriverName(data[volDataKey.driverName]) unmapper.volumeID = data[volDataKey.volHandle] - unmapper.csiClient, err = newCsiDriverClient(unmapper.driverName) + unmapper.csiClientGetter.driverName = unmapper.driverName if err != nil { return nil, err } diff --git a/pkg/volume/csi/csi_plugin_test.go b/pkg/volume/csi/csi_plugin_test.go index a45e0dcb99..4747ba4db3 100644 --- a/pkg/volume/csi/csi_plugin_test.go +++ b/pkg/volume/csi/csi_plugin_test.go @@ -614,7 +614,8 @@ func TestPluginNewMounter(t *testing.T) { if string(csiMounter.podUID) != string(test.podUID) { t.Error("mounter podUID not set") } - if csiMounter.csiClient == nil { + csiClient, err := csiMounter.csiClientGetter.Get() + if csiClient == nil { t.Error("mounter csiClient is nil") } if csiMounter.driverMode != test.driverMode { @@ -732,7 +733,8 @@ func TestPluginNewMounterWithInline(t *testing.T) { if string(csiMounter.podUID) != string(test.podUID) { t.Error("mounter podUID not set") } - if csiMounter.csiClient == nil { + csiClient, err := csiMounter.csiClientGetter.Get() + if csiClient == nil { t.Error("mounter csiClient is nil") } if csiMounter.driverMode != test.driverMode { @@ -815,8 +817,9 @@ func TestPluginNewUnmounter(t *testing.T) { t.Error("podUID not set") } - if csiUnmounter.csiClient == nil { - t.Error("unmounter csiClient is nil") + csiClient, err := csiUnmounter.csiClientGetter.Get() + if csiClient == nil { + t.Error("mounter csiClient is nil") } } @@ -932,7 +935,8 @@ func TestPluginNewBlockMapper(t *testing.T) { if csiMapper.podUID == types.UID("") { t.Error("CSI block mapper missing pod.UID") } - if csiMapper.csiClient == nil { + csiClient, err := csiMapper.csiClientGetter.Get() + if csiClient == nil { t.Error("mapper csiClient is nil") } @@ -994,7 +998,8 @@ func TestPluginNewUnmapper(t *testing.T) { t.Error("specName not set") } - if csiUnmapper.csiClient == nil { + csiClient, err := csiUnmapper.csiClientGetter.Get() + if csiClient == nil { t.Error("unmapper csiClient is nil") }