mirror of https://github.com/k3s-io/k3s
Delay CSI client initialization
parent
b150560107
commit
fa926ed6e0
|
@ -37,8 +37,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type csiBlockMapper struct {
|
type csiBlockMapper struct {
|
||||||
|
csiClientGetter
|
||||||
k8s kubernetes.Interface
|
k8s kubernetes.Interface
|
||||||
csiClient csiClient
|
|
||||||
plugin *csiPlugin
|
plugin *csiPlugin
|
||||||
driverName csiDriverName
|
driverName csiDriverName
|
||||||
specName string
|
specName string
|
||||||
|
@ -247,14 +247,20 @@ func (m *csiBlockMapper) SetUpDevice() (string, error) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
csiClient, err := m.csiClientGetter.Get()
|
||||||
|
if err != nil {
|
||||||
|
klog.Error(log("blockMapper.SetUpDevice failed to get CSI client: %v", err))
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
// Call NodeStageVolume
|
// Call NodeStageVolume
|
||||||
stagingPath, err := m.stageVolumeForBlock(ctx, m.csiClient, accessMode, csiSource, attachment)
|
stagingPath, err := m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call NodePublishVolume
|
// Call NodePublishVolume
|
||||||
publishPath, err := m.publishVolumeForBlock(ctx, m.csiClient, accessMode, csiSource, attachment, stagingPath)
|
publishPath, err := m.publishVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment, stagingPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
@ -326,6 +332,12 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
csiClient, err := m.csiClientGetter.Get()
|
||||||
|
if err != nil {
|
||||||
|
klog.Error(log("blockMapper.TearDownDevice failed to get CSI client: %v", err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Call NodeUnpublishVolume
|
// Call NodeUnpublishVolume
|
||||||
publishPath := m.getPublishPath()
|
publishPath := m.getPublishPath()
|
||||||
if _, err := os.Stat(publishPath); err != nil {
|
if _, err := os.Stat(publishPath); err != nil {
|
||||||
|
@ -335,7 +347,7 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
err := m.unpublishVolumeForBlock(ctx, m.csiClient, publishPath)
|
err := m.unpublishVolumeForBlock(ctx, csiClient, publishPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -350,7 +362,7 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
err := m.unstageVolumeForBlock(ctx, m.csiClient, stagingPath)
|
err := m.unstageVolumeForBlock(ctx, csiClient, stagingPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
|
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
|
||||||
|
@ -807,3 +808,36 @@ func versionRequiresV0Client(version *utilversion.Version) bool {
|
||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CSI client getter with cache.
|
||||||
|
// This provides a method to initialize CSI client with driver name and caches
|
||||||
|
// it for later use. When CSI clients have not been discovered yet (e.g.
|
||||||
|
// on kubelet restart), client initialization will fail. Users of CSI client (e.g.
|
||||||
|
// mounter manager and block mapper) can use this to delay CSI client
|
||||||
|
// initialization until needed.
|
||||||
|
type csiClientGetter struct {
|
||||||
|
sync.RWMutex
|
||||||
|
csiClient csiClient
|
||||||
|
driverName csiDriverName
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *csiClientGetter) Get() (csiClient, error) {
|
||||||
|
c.RLock()
|
||||||
|
if c.csiClient != nil {
|
||||||
|
c.RUnlock()
|
||||||
|
return c.csiClient, nil
|
||||||
|
}
|
||||||
|
c.RUnlock()
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
// Double-checking locking criterion.
|
||||||
|
if c.csiClient != nil {
|
||||||
|
return c.csiClient, nil
|
||||||
|
}
|
||||||
|
csi, err := newCsiDriverClient(c.driverName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
c.csiClient = csi
|
||||||
|
return c.csiClient, nil
|
||||||
|
}
|
||||||
|
|
|
@ -56,7 +56,7 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
type csiMountMgr struct {
|
type csiMountMgr struct {
|
||||||
csiClient csiClient
|
csiClientGetter
|
||||||
k8s kubernetes.Interface
|
k8s kubernetes.Interface
|
||||||
plugin *csiPlugin
|
plugin *csiPlugin
|
||||||
driverName csiDriverName
|
driverName csiDriverName
|
||||||
|
@ -111,7 +111,11 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
csi := c.csiClient
|
csi, err := c.csiClientGetter.Get()
|
||||||
|
if err != nil {
|
||||||
|
klog.Error(log("mounter.SetUpAt failed to get CSI client: %v", err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
@ -343,7 +347,11 @@ func (c *csiMountMgr) TearDownAt(dir string) error {
|
||||||
klog.V(4).Infof(log("Unmounter.TearDown(%s)", dir))
|
klog.V(4).Infof(log("Unmounter.TearDown(%s)", dir))
|
||||||
|
|
||||||
volID := c.volumeID
|
volID := c.volumeID
|
||||||
csi := c.csiClient
|
csi, err := c.csiClientGetter.Get()
|
||||||
|
if err != nil {
|
||||||
|
klog.Error(log("mounter.SetUpAt failed to get CSI client: %v", err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
|
@ -383,11 +383,6 @@ func (p *csiPlugin) NewMounter(
|
||||||
return nil, errors.New("failed to get a Kubernetes client")
|
return nil, errors.New("failed to get a Kubernetes client")
|
||||||
}
|
}
|
||||||
|
|
||||||
csi, err := newCsiDriverClient(csiDriverName(driverName))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
mounter := &csiMountMgr{
|
mounter := &csiMountMgr{
|
||||||
plugin: p,
|
plugin: p,
|
||||||
k8s: k8s,
|
k8s: k8s,
|
||||||
|
@ -398,9 +393,9 @@ func (p *csiPlugin) NewMounter(
|
||||||
driverMode: driverMode,
|
driverMode: driverMode,
|
||||||
volumeID: volumeHandle,
|
volumeID: volumeHandle,
|
||||||
specVolumeID: spec.Name(),
|
specVolumeID: spec.Name(),
|
||||||
csiClient: csi,
|
|
||||||
readOnly: readOnly,
|
readOnly: readOnly,
|
||||||
}
|
}
|
||||||
|
mounter.csiClientGetter.driverName = csiDriverName(driverName)
|
||||||
|
|
||||||
// Save volume info in pod dir
|
// Save volume info in pod dir
|
||||||
dir := mounter.GetPath()
|
dir := mounter.GetPath()
|
||||||
|
@ -458,10 +453,7 @@ func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmo
|
||||||
}
|
}
|
||||||
unmounter.driverName = csiDriverName(data[volDataKey.driverName])
|
unmounter.driverName = csiDriverName(data[volDataKey.driverName])
|
||||||
unmounter.volumeID = data[volDataKey.volHandle]
|
unmounter.volumeID = data[volDataKey.volHandle]
|
||||||
unmounter.csiClient, err = newCsiDriverClient(unmounter.driverName)
|
unmounter.csiClientGetter.driverName = unmounter.driverName
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return unmounter, nil
|
return unmounter, nil
|
||||||
}
|
}
|
||||||
|
@ -638,10 +630,6 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(4).Info(log("setting up block mapper for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver))
|
klog.V(4).Info(log("setting up block mapper for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver))
|
||||||
client, err := newCsiDriverClient(csiDriverName(pvSource.Driver))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
k8s := p.host.GetKubeClient()
|
k8s := p.host.GetKubeClient()
|
||||||
if k8s == nil {
|
if k8s == nil {
|
||||||
|
@ -650,7 +638,6 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt
|
||||||
}
|
}
|
||||||
|
|
||||||
mapper := &csiBlockMapper{
|
mapper := &csiBlockMapper{
|
||||||
csiClient: client,
|
|
||||||
k8s: k8s,
|
k8s: k8s,
|
||||||
plugin: p,
|
plugin: p,
|
||||||
volumeID: pvSource.VolumeHandle,
|
volumeID: pvSource.VolumeHandle,
|
||||||
|
@ -660,6 +647,7 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt
|
||||||
specName: spec.Name(),
|
specName: spec.Name(),
|
||||||
podUID: podRef.UID,
|
podUID: podRef.UID,
|
||||||
}
|
}
|
||||||
|
mapper.csiClientGetter.driverName = csiDriverName(pvSource.Driver)
|
||||||
|
|
||||||
// Save volume info in pod dir
|
// Save volume info in pod dir
|
||||||
dataDir := getVolumeDeviceDataDir(spec.Name(), p.host)
|
dataDir := getVolumeDeviceDataDir(spec.Name(), p.host)
|
||||||
|
@ -714,7 +702,7 @@ func (p *csiPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (vo
|
||||||
}
|
}
|
||||||
unmapper.driverName = csiDriverName(data[volDataKey.driverName])
|
unmapper.driverName = csiDriverName(data[volDataKey.driverName])
|
||||||
unmapper.volumeID = data[volDataKey.volHandle]
|
unmapper.volumeID = data[volDataKey.volHandle]
|
||||||
unmapper.csiClient, err = newCsiDriverClient(unmapper.driverName)
|
unmapper.csiClientGetter.driverName = unmapper.driverName
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -614,7 +614,8 @@ func TestPluginNewMounter(t *testing.T) {
|
||||||
if string(csiMounter.podUID) != string(test.podUID) {
|
if string(csiMounter.podUID) != string(test.podUID) {
|
||||||
t.Error("mounter podUID not set")
|
t.Error("mounter podUID not set")
|
||||||
}
|
}
|
||||||
if csiMounter.csiClient == nil {
|
csiClient, err := csiMounter.csiClientGetter.Get()
|
||||||
|
if csiClient == nil {
|
||||||
t.Error("mounter csiClient is nil")
|
t.Error("mounter csiClient is nil")
|
||||||
}
|
}
|
||||||
if csiMounter.driverMode != test.driverMode {
|
if csiMounter.driverMode != test.driverMode {
|
||||||
|
@ -732,7 +733,8 @@ func TestPluginNewMounterWithInline(t *testing.T) {
|
||||||
if string(csiMounter.podUID) != string(test.podUID) {
|
if string(csiMounter.podUID) != string(test.podUID) {
|
||||||
t.Error("mounter podUID not set")
|
t.Error("mounter podUID not set")
|
||||||
}
|
}
|
||||||
if csiMounter.csiClient == nil {
|
csiClient, err := csiMounter.csiClientGetter.Get()
|
||||||
|
if csiClient == nil {
|
||||||
t.Error("mounter csiClient is nil")
|
t.Error("mounter csiClient is nil")
|
||||||
}
|
}
|
||||||
if csiMounter.driverMode != test.driverMode {
|
if csiMounter.driverMode != test.driverMode {
|
||||||
|
@ -815,8 +817,9 @@ func TestPluginNewUnmounter(t *testing.T) {
|
||||||
t.Error("podUID not set")
|
t.Error("podUID not set")
|
||||||
}
|
}
|
||||||
|
|
||||||
if csiUnmounter.csiClient == nil {
|
csiClient, err := csiUnmounter.csiClientGetter.Get()
|
||||||
t.Error("unmounter csiClient is nil")
|
if csiClient == nil {
|
||||||
|
t.Error("mounter csiClient is nil")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -932,7 +935,8 @@ func TestPluginNewBlockMapper(t *testing.T) {
|
||||||
if csiMapper.podUID == types.UID("") {
|
if csiMapper.podUID == types.UID("") {
|
||||||
t.Error("CSI block mapper missing pod.UID")
|
t.Error("CSI block mapper missing pod.UID")
|
||||||
}
|
}
|
||||||
if csiMapper.csiClient == nil {
|
csiClient, err := csiMapper.csiClientGetter.Get()
|
||||||
|
if csiClient == nil {
|
||||||
t.Error("mapper csiClient is nil")
|
t.Error("mapper csiClient is nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -994,7 +998,8 @@ func TestPluginNewUnmapper(t *testing.T) {
|
||||||
t.Error("specName not set")
|
t.Error("specName not set")
|
||||||
}
|
}
|
||||||
|
|
||||||
if csiUnmapper.csiClient == nil {
|
csiClient, err := csiUnmapper.csiClientGetter.Get()
|
||||||
|
if csiClient == nil {
|
||||||
t.Error("unmapper csiClient is nil")
|
t.Error("unmapper csiClient is nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue