mirror of https://github.com/k3s-io/k3s
Pass new CSI API Client and informer to Volume Plugins
parent
c8ff210d91
commit
7d673cb8f0
|
@ -125,6 +125,7 @@ go_library(
|
|||
"//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/util/cert:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
|
||||
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
|
||||
"//staging/src/k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1:go_default_library",
|
||||
"//staging/src/k8s.io/metrics/pkg/client/custom_metrics:go_default_library",
|
||||
"//staging/src/k8s.io/metrics/pkg/client/external_metrics:go_default_library",
|
||||
|
|
|
@ -36,6 +36,7 @@ import (
|
|||
cacheddiscovery "k8s.io/client-go/discovery/cached"
|
||||
"k8s.io/client-go/dynamic"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
|
||||
"k8s.io/kubernetes/pkg/controller/garbagecollector"
|
||||
|
@ -192,9 +193,14 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err
|
|||
if ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration < time.Second {
|
||||
return nil, true, fmt.Errorf("Duration time must be greater than one second as set via command line option reconcile-sync-loop-period.")
|
||||
}
|
||||
csiClientConfig := ctx.ClientBuilder.ConfigOrDie("attachdetach-controller")
|
||||
// csiClient works with CRDs that support json only
|
||||
csiClientConfig.ContentType = "application/json"
|
||||
|
||||
attachDetachController, attachDetachControllerErr :=
|
||||
attachdetach.NewAttachDetachController(
|
||||
ctx.ClientBuilder.ClientOrDie("attachdetach-controller"),
|
||||
csiclientset.NewForConfigOrDie(csiClientConfig),
|
||||
ctx.InformerFactory.Core().V1().Pods(),
|
||||
ctx.InformerFactory.Core().V1().Nodes(),
|
||||
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
|
|
|
@ -121,6 +121,7 @@ go_library(
|
|||
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/util/cert:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/util/certificate:go_default_library",
|
||||
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
|
||||
"//vendor/github.com/coreos/go-systemd/daemon:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/github.com/spf13/cobra:go_default_library",
|
||||
|
|
|
@ -55,6 +55,7 @@ import (
|
|||
"k8s.io/client-go/tools/record"
|
||||
certutil "k8s.io/client-go/util/cert"
|
||||
"k8s.io/client-go/util/certificate"
|
||||
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
|
||||
"k8s.io/kubernetes/cmd/kubelet/app/options"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
|
@ -387,6 +388,7 @@ func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, err
|
|||
DockerClientConfig: dockerClientConfig,
|
||||
KubeClient: nil,
|
||||
HeartbeatClient: nil,
|
||||
CSIClient: nil,
|
||||
EventClient: nil,
|
||||
Mounter: mounter,
|
||||
OOMAdjuster: oom.NewOOMAdjuster(),
|
||||
|
@ -607,6 +609,13 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
|
|||
glog.Warningf("Failed to create API Server client for heartbeat: %v", err)
|
||||
}
|
||||
|
||||
// csiClient works with CRDs that support json only
|
||||
clientConfig.ContentType = "application/json"
|
||||
csiClient, err := csiclientset.NewForConfig(clientConfig)
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to create CSI API client: %v", err)
|
||||
}
|
||||
|
||||
kubeDeps.KubeClient = kubeClient
|
||||
if heartbeatClient != nil {
|
||||
kubeDeps.HeartbeatClient = heartbeatClient
|
||||
|
@ -615,6 +624,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
|
|||
if eventClient != nil {
|
||||
kubeDeps.EventClient = eventClient
|
||||
}
|
||||
kubeDeps.CSIClient = csiClient
|
||||
}
|
||||
|
||||
// If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
|
||||
|
|
|
@ -39,6 +39,7 @@ go_library(
|
|||
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
|
||||
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
],
|
||||
)
|
||||
|
|
|
@ -39,6 +39,7 @@ import (
|
|||
kcache "k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
|
||||
|
@ -96,6 +97,7 @@ type AttachDetachController interface {
|
|||
// NewAttachDetachController returns a new instance of AttachDetachController.
|
||||
func NewAttachDetachController(
|
||||
kubeClient clientset.Interface,
|
||||
csiClient csiclientset.Interface,
|
||||
podInformer coreinformers.PodInformer,
|
||||
nodeInformer coreinformers.NodeInformer,
|
||||
pvcInformer coreinformers.PersistentVolumeClaimInformer,
|
||||
|
@ -122,6 +124,7 @@ func NewAttachDetachController(
|
|||
// deleted (probably can't do this with sharedInformer), etc.
|
||||
adc := &attachDetachController{
|
||||
kubeClient: kubeClient,
|
||||
csiClient: csiClient,
|
||||
pvcLister: pvcInformer.Lister(),
|
||||
pvcsSynced: pvcInformer.Informer().HasSynced,
|
||||
pvLister: pvInformer.Lister(),
|
||||
|
@ -237,6 +240,10 @@ type attachDetachController struct {
|
|||
// the API server.
|
||||
kubeClient clientset.Interface
|
||||
|
||||
// csiClient is the csi.storage.k8s.io API client used by volumehost to communicate with
|
||||
// the API server.
|
||||
csiClient csiclientset.Interface
|
||||
|
||||
// pvcLister is the shared PVC lister used to fetch and store PVC
|
||||
// objects from the API server. It is shared with other controllers and
|
||||
// therefore the PVC objects in its store should be treated as immutable.
|
||||
|
@ -751,3 +758,7 @@ func (adc *attachDetachController) GetNodeName() types.NodeName {
|
|||
func (adc *attachDetachController) GetEventRecorder() record.EventRecorder {
|
||||
return adc.recorder
|
||||
}
|
||||
|
||||
func (adc *attachDetachController) GetCSIClient() csiclientset.Interface {
|
||||
return adc.csiClient
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
|
|||
// Act
|
||||
_, err := NewAttachDetachController(
|
||||
fakeKubeClient,
|
||||
nil, /* csiClient */
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
|
@ -214,6 +215,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
|
|||
// Create the controller
|
||||
adcObj, err := NewAttachDetachController(
|
||||
fakeKubeClient,
|
||||
nil,
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
|
|
|
@ -38,6 +38,7 @@ go_library(
|
|||
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
|
||||
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
],
|
||||
)
|
||||
|
|
|
@ -37,6 +37,7 @@ import (
|
|||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
kcache "k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/events"
|
||||
|
@ -324,3 +325,8 @@ func (expc *expandController) GetNodeName() types.NodeName {
|
|||
func (expc *expandController) GetEventRecorder() record.EventRecorder {
|
||||
return expc.recorder
|
||||
}
|
||||
|
||||
func (expc *expandController) GetCSIClient() csiclientset.Interface {
|
||||
// No volume plugin in expand controller needs csi.storage.k8s.io
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -56,6 +56,7 @@ go_library(
|
|||
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/reference:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
|
||||
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
],
|
||||
)
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/types"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/record"
|
||||
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
vol "k8s.io/kubernetes/pkg/volume"
|
||||
|
@ -123,3 +124,8 @@ func (ctrl *PersistentVolumeController) GetNodeName() types.NodeName {
|
|||
func (ctrl *PersistentVolumeController) GetEventRecorder() record.EventRecorder {
|
||||
return ctrl.eventRecorder
|
||||
}
|
||||
|
||||
func (ctrl *PersistentVolumeController) GetCSIClient() csiclientset.Interface {
|
||||
// No volume plugin needs csi.storage.k8s.io client in PV controller.
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -135,6 +135,7 @@ go_library(
|
|||
"//staging/src/k8s.io/client-go/util/certificate:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/util/integer:go_default_library",
|
||||
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
|
||||
"//third_party/forked/golang/expansion:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/github.com/golang/groupcache/lru:go_default_library",
|
||||
|
|
|
@ -53,6 +53,7 @@ import (
|
|||
"k8s.io/client-go/util/certificate"
|
||||
"k8s.io/client-go/util/flowcontrol"
|
||||
"k8s.io/client-go/util/integer"
|
||||
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
|
@ -243,6 +244,7 @@ type Dependencies struct {
|
|||
HeartbeatClient clientset.Interface
|
||||
OnHeartbeatFailure func()
|
||||
KubeClient clientset.Interface
|
||||
CSIClient csiclientset.Interface
|
||||
Mounter mount.Interface
|
||||
OOMAdjuster *oom.OOMAdjuster
|
||||
OSInterface kubecontainer.OSInterface
|
||||
|
@ -484,6 +486,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
|||
hostnameOverridden: len(hostnameOverride) > 0,
|
||||
nodeName: nodeName,
|
||||
kubeClient: kubeDeps.KubeClient,
|
||||
csiClient: kubeDeps.CSIClient,
|
||||
heartbeatClient: kubeDeps.HeartbeatClient,
|
||||
onRepeatedHeartbeatFailure: kubeDeps.OnHeartbeatFailure,
|
||||
rootDirectory: rootDirectory,
|
||||
|
@ -876,6 +879,7 @@ type Kubelet struct {
|
|||
nodeName types.NodeName
|
||||
runtimeCache kubecontainer.RuntimeCache
|
||||
kubeClient clientset.Interface
|
||||
csiClient csiclientset.Interface
|
||||
heartbeatClient clientset.Interface
|
||||
iptClient utilipt.Interface
|
||||
rootDirectory string
|
||||
|
|
|
@ -29,6 +29,7 @@ import (
|
|||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/record"
|
||||
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/kubelet/configmap"
|
||||
|
@ -121,6 +122,10 @@ func (kvh *kubeletVolumeHost) GetKubeClient() clientset.Interface {
|
|||
return kvh.kubelet.kubeClient
|
||||
}
|
||||
|
||||
func (kvh *kubeletVolumeHost) GetCSIClient() csiclientset.Interface {
|
||||
return kvh.kubelet.csiClient
|
||||
}
|
||||
|
||||
func (kvh *kubeletVolumeHost) NewWrapperMounter(
|
||||
volName string,
|
||||
spec volume.Spec,
|
||||
|
|
|
@ -30,6 +30,7 @@ go_library(
|
|||
"//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
|
||||
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
],
|
||||
)
|
||||
|
|
|
@ -23,9 +23,12 @@ go_library(
|
|||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions:go_default_library",
|
||||
"//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1:go_default_library",
|
||||
"//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/google.golang.org/grpc:go_default_library",
|
||||
|
|
|
@ -26,11 +26,15 @@ import (
|
|||
"time"
|
||||
|
||||
"context"
|
||||
|
||||
"github.com/golang/glog"
|
||||
api "k8s.io/api/core/v1"
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
csiapiinformer "k8s.io/csi-api/pkg/client/informers/externalversions"
|
||||
csiinformer "k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/csi/labelmanager"
|
||||
|
@ -48,11 +52,15 @@ const (
|
|||
volNameSep = "^"
|
||||
volDataFileName = "vol_data.json"
|
||||
fsTypeBlockName = "block"
|
||||
|
||||
// TODO: increase to something useful
|
||||
csiResyncPeriod = time.Minute
|
||||
)
|
||||
|
||||
type csiPlugin struct {
|
||||
host volume.VolumeHost
|
||||
blockEnabled bool
|
||||
host volume.VolumeHost
|
||||
blockEnabled bool
|
||||
csiDriverInformer csiinformer.CSIDriverInformer
|
||||
}
|
||||
|
||||
// ProbeVolumePlugins returns implemented plugins
|
||||
|
@ -132,6 +140,14 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
|
|||
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
|
||||
lm = labelmanager.NewLabelManager(host.GetNodeName(), host.GetKubeClient())
|
||||
|
||||
csiClient := host.GetCSIClient()
|
||||
if csiClient != nil {
|
||||
// Start informer for CSIDrivers.
|
||||
factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod)
|
||||
p.csiDriverInformer = factory.Csi().V1alpha1().CSIDrivers()
|
||||
go factory.Start(wait.NeverStop)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/util/validation"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/record"
|
||||
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
"k8s.io/kubernetes/pkg/volume/util/recyclerclient"
|
||||
|
@ -309,6 +310,9 @@ type VolumeHost interface {
|
|||
// GetKubeClient returns a client interface
|
||||
GetKubeClient() clientset.Interface
|
||||
|
||||
// GetCSIClient returns a client interface to csi.storage.k8s.io
|
||||
GetCSIClient() csiclientset.Interface
|
||||
|
||||
// NewWrapperMounter finds an appropriate plugin with which to handle
|
||||
// the provided spec. This is used to implement volume plugins which
|
||||
// "wrap" other plugins. For example, the "secret" volume is
|
||||
|
|
|
@ -29,6 +29,7 @@ go_library(
|
|||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
|
||||
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/mock:go_default_library",
|
||||
],
|
||||
)
|
||||
|
|
|
@ -36,6 +36,7 @@ import (
|
|||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/record"
|
||||
utiltesting "k8s.io/client-go/util/testing"
|
||||
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
utilstrings "k8s.io/kubernetes/pkg/util/strings"
|
||||
|
@ -49,6 +50,7 @@ import (
|
|||
type fakeVolumeHost struct {
|
||||
rootDir string
|
||||
kubeClient clientset.Interface
|
||||
csiClient csiclientset.Interface
|
||||
pluginMgr VolumePluginMgr
|
||||
cloud cloudprovider.Interface
|
||||
mounter mount.Interface
|
||||
|
@ -113,6 +115,10 @@ func (f *fakeVolumeHost) GetKubeClient() clientset.Interface {
|
|||
return f.kubeClient
|
||||
}
|
||||
|
||||
func (f *fakeVolumeHost) GetCSIClient() csiclientset.Interface {
|
||||
return f.csiClient
|
||||
}
|
||||
|
||||
func (f *fakeVolumeHost) GetCloudProvider() cloudprovider.Interface {
|
||||
return f.cloud
|
||||
}
|
||||
|
|
|
@ -412,6 +412,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy
|
|||
informers := informers.NewSharedInformerFactory(testClient, resyncPeriod)
|
||||
ctrl, err := attachdetach.NewAttachDetachController(
|
||||
testClient,
|
||||
nil, /* csiClient */
|
||||
informers.Core().V1().Pods(),
|
||||
informers.Core().V1().Nodes(),
|
||||
informers.Core().V1().PersistentVolumeClaims(),
|
||||
|
|
Loading…
Reference in New Issue