mirror of https://github.com/k3s-io/k3s
Collapse duplicate informer creation paths
parent
3afd822537
commit
816f6d32ca
|
@ -197,10 +197,10 @@ func Run(s *options.CMServer) error {
|
|||
}
|
||||
|
||||
func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error {
|
||||
podInformer := informers.CreateSharedPodIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)())
|
||||
nodeInformer := informers.CreateSharedNodeIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-informer")), ResyncPeriod(s)())
|
||||
pvcInformer := informers.CreateSharedPVCIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pvc-informer")), ResyncPeriod(s)())
|
||||
pvInformer := informers.CreateSharedPVIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pv-informer")), ResyncPeriod(s)())
|
||||
podInformer := informers.NewPodInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)())
|
||||
nodeInformer := informers.NewNodeInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-informer")), ResyncPeriod(s)())
|
||||
pvcInformer := informers.NewPVCInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pvc-informer")), ResyncPeriod(s)())
|
||||
pvInformer := informers.NewPVInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pv-informer")), ResyncPeriod(s)())
|
||||
informers := map[reflect.Type]framework.SharedIndexInformer{}
|
||||
informers[reflect.TypeOf(&api.Pod{})] = podInformer
|
||||
informers[reflect.TypeOf(&api.Node{})] = nodeInformer
|
||||
|
|
|
@ -205,7 +205,7 @@ func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClie
|
|||
}
|
||||
|
||||
func NewDaemonSetsControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
|
||||
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod())
|
||||
podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
|
||||
dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize)
|
||||
dsc.internalPodInformer = podInformer
|
||||
|
||||
|
|
|
@ -113,7 +113,7 @@ func NewEndpointController(podInformer framework.SharedIndexInformer, client *cl
|
|||
|
||||
// NewEndpointControllerFromClient returns a new *EndpointController that runs its own informer.
|
||||
func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController {
|
||||
podInformer := informers.CreateSharedPodIndexInformer(client, resyncPeriod())
|
||||
podInformer := informers.NewPodInformer(client, resyncPeriod())
|
||||
e := NewEndpointController(podInformer, client)
|
||||
e.internalPodInformer = podInformer
|
||||
|
||||
|
|
|
@ -22,8 +22,6 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/controller/framework"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// PodInformer is type of SharedIndexInformer which watches and lists all pods.
|
||||
|
@ -43,26 +41,12 @@ func (f *podInformer) Informer() framework.SharedIndexInformer {
|
|||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
informerObj := &api.Pod{}
|
||||
informerType := reflect.TypeOf(informerObj)
|
||||
informerType := reflect.TypeOf(&api.Pod{})
|
||||
informer, exists := f.informers[informerType]
|
||||
if exists {
|
||||
return informer
|
||||
}
|
||||
|
||||
informer = framework.NewSharedIndexInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return f.client.Core().Pods(api.NamespaceAll).List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return f.client.Core().Pods(api.NamespaceAll).Watch(options)
|
||||
},
|
||||
},
|
||||
informerObj,
|
||||
f.defaultResync,
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
informer = NewPodInformer(f.client, f.defaultResync)
|
||||
f.informers[informerType] = informer
|
||||
|
||||
return informer
|
||||
|
@ -92,25 +76,13 @@ type namespaceInformer struct {
|
|||
func (f *namespaceInformer) Informer() framework.SharedIndexInformer {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
informerObj := &api.Namespace{}
|
||||
informerType := reflect.TypeOf(informerObj)
|
||||
|
||||
informerType := reflect.TypeOf(&api.Namespace{})
|
||||
informer, exists := f.informers[informerType]
|
||||
if exists {
|
||||
return informer
|
||||
}
|
||||
informer = framework.NewSharedIndexInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return f.client.Core().Namespaces().List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return f.client.Core().Namespaces().Watch(options)
|
||||
},
|
||||
},
|
||||
informerObj,
|
||||
f.defaultResync,
|
||||
cache.Indexers{},
|
||||
)
|
||||
informer = NewNamespaceInformer(f.client, f.defaultResync)
|
||||
f.informers[informerType] = informer
|
||||
|
||||
return informer
|
||||
|
@ -141,26 +113,12 @@ func (f *nodeInformer) Informer() framework.SharedIndexInformer {
|
|||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
informerObj := &api.Node{}
|
||||
informerType := reflect.TypeOf(informerObj)
|
||||
informerType := reflect.TypeOf(&api.Node{})
|
||||
informer, exists := f.informers[informerType]
|
||||
if exists {
|
||||
return informer
|
||||
}
|
||||
|
||||
informer = framework.NewSharedIndexInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return f.client.Core().Nodes().List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return f.client.Core().Nodes().Watch(options)
|
||||
},
|
||||
},
|
||||
informerObj,
|
||||
f.defaultResync,
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
informer = NewNodeInformer(f.client, f.defaultResync)
|
||||
f.informers[informerType] = informer
|
||||
|
||||
return informer
|
||||
|
@ -191,26 +149,12 @@ func (f *pvcInformer) Informer() framework.SharedIndexInformer {
|
|||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
informerObj := &api.PersistentVolumeClaim{}
|
||||
informerType := reflect.TypeOf(informerObj)
|
||||
informerType := reflect.TypeOf(&api.PersistentVolumeClaim{})
|
||||
informer, exists := f.informers[informerType]
|
||||
if exists {
|
||||
return informer
|
||||
}
|
||||
|
||||
informer = framework.NewSharedIndexInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return f.client.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return f.client.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
|
||||
},
|
||||
},
|
||||
informerObj,
|
||||
f.defaultResync,
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
informer = NewPVCInformer(f.client, f.defaultResync)
|
||||
f.informers[informerType] = informer
|
||||
|
||||
return informer
|
||||
|
@ -241,26 +185,12 @@ func (f *pvInformer) Informer() framework.SharedIndexInformer {
|
|||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
informerObj := &api.PersistentVolume{}
|
||||
informerType := reflect.TypeOf(informerObj)
|
||||
informerType := reflect.TypeOf(&api.PersistentVolume{})
|
||||
informer, exists := f.informers[informerType]
|
||||
if exists {
|
||||
return informer
|
||||
}
|
||||
|
||||
informer = framework.NewSharedIndexInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return f.client.Core().PersistentVolumes().List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return f.client.Core().PersistentVolumes().Watch(options)
|
||||
},
|
||||
},
|
||||
informerObj,
|
||||
f.defaultResync,
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
informer = NewPVInformer(f.client, f.defaultResync)
|
||||
f.informers[informerType] = informer
|
||||
|
||||
return informer
|
||||
|
|
|
@ -101,27 +101,8 @@ func (f *sharedInformerFactory) PersistentVolumes() PVInformer {
|
|||
return &pvInformer{sharedInformerFactory: f}
|
||||
}
|
||||
|
||||
// CreateSharedPodInformer returns a SharedIndexInformer that lists and watches all pods
|
||||
func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
|
||||
sharedInformer := framework.NewSharedIndexInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return client.Core().Pods(api.NamespaceAll).List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return client.Core().Pods(api.NamespaceAll).Watch(options)
|
||||
},
|
||||
},
|
||||
&api.Pod{},
|
||||
resyncPeriod,
|
||||
cache.Indexers{},
|
||||
)
|
||||
|
||||
return sharedInformer
|
||||
}
|
||||
|
||||
// CreateSharedPodIndexInformer returns a SharedIndexInformer that lists and watches all pods
|
||||
func CreateSharedPodIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
|
||||
// NewPodInformer returns a SharedIndexInformer that lists and watches all pods
|
||||
func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
|
||||
sharedIndexInformer := framework.NewSharedIndexInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
|
@ -139,8 +120,8 @@ func CreateSharedPodIndexInformer(client clientset.Interface, resyncPeriod time.
|
|||
return sharedIndexInformer
|
||||
}
|
||||
|
||||
// CreateSharedNodeIndexInformer returns a SharedIndexInformer that lists and watches all nodes
|
||||
func CreateSharedNodeIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
|
||||
// NewNodeInformer returns a SharedIndexInformer that lists and watches all nodes
|
||||
func NewNodeInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
|
||||
sharedIndexInformer := framework.NewSharedIndexInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
|
@ -157,8 +138,8 @@ func CreateSharedNodeIndexInformer(client clientset.Interface, resyncPeriod time
|
|||
return sharedIndexInformer
|
||||
}
|
||||
|
||||
// CreateSharedPVCIndexInformer returns a SharedIndexInformer that lists and watches all PVCs
|
||||
func CreateSharedPVCIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
|
||||
// NewPVCInformer returns a SharedIndexInformer that lists and watches all PVCs
|
||||
func NewPVCInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
|
||||
sharedIndexInformer := framework.NewSharedIndexInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
|
@ -175,8 +156,8 @@ func CreateSharedPVCIndexInformer(client clientset.Interface, resyncPeriod time.
|
|||
return sharedIndexInformer
|
||||
}
|
||||
|
||||
// CreateSharedPVIndexInformer returns a SharedIndexInformer that lists and watches all PVs
|
||||
func CreateSharedPVIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
|
||||
// NewPVInformer returns a SharedIndexInformer that lists and watches all PVs
|
||||
func NewPVInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
|
||||
sharedIndexInformer := framework.NewSharedIndexInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
|
@ -193,8 +174,8 @@ func CreateSharedPVIndexInformer(client clientset.Interface, resyncPeriod time.D
|
|||
return sharedIndexInformer
|
||||
}
|
||||
|
||||
// CreateSharedNamespaceIndexInformer returns a SharedIndexInformer that lists and watches namespaces
|
||||
func CreateSharedNamespaceIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
|
||||
// NewNamespaceInformer returns a SharedIndexInformer that lists and watches namespaces
|
||||
func NewNamespaceInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
|
||||
sharedIndexInformer := framework.NewSharedIndexInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
|
|
|
@ -135,7 +135,7 @@ func NewJobController(podInformer framework.SharedIndexInformer, kubeClient clie
|
|||
}
|
||||
|
||||
func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
|
||||
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod())
|
||||
podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
|
||||
jm := NewJobController(podInformer, kubeClient)
|
||||
jm.internalPodInformer = podInformer
|
||||
|
||||
|
|
|
@ -343,7 +343,7 @@ func NewNodeControllerFromClient(
|
|||
serviceCIDR *net.IPNet,
|
||||
nodeCIDRMaskSize int,
|
||||
allocateNodeCIDRs bool) (*NodeController, error) {
|
||||
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, controller.NoResyncPeriodFunc())
|
||||
podInformer := informers.NewPodInformer(kubeClient, controller.NoResyncPeriodFunc())
|
||||
nc, err := NewNodeController(podInformer, cloud, kubeClient, podEvictionTimeout, evictionLimiterQPS, nodeMonitorGracePeriod,
|
||||
nodeStartupGracePeriod, nodeMonitorPeriod, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, allocateNodeCIDRs)
|
||||
if err != nil {
|
||||
|
|
|
@ -184,7 +184,7 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer fra
|
|||
|
||||
// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
|
||||
func NewReplicaSetControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController {
|
||||
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod())
|
||||
podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
|
||||
garbageCollectorEnabled := false
|
||||
rsc := NewReplicaSetController(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
|
||||
rsc.internalPodInformer = podInformer
|
||||
|
|
|
@ -189,7 +189,7 @@ func newReplicationManager(eventRecorder record.EventRecorder, podInformer frame
|
|||
|
||||
// NewReplicationManagerFromClientForIntegration creates a new ReplicationManager that runs its own informer. It disables event recording for use in integration tests.
|
||||
func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
|
||||
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod())
|
||||
podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
|
||||
garbageCollectorEnabled := false
|
||||
rm := newReplicationManager(&record.FakeRecorder{}, podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
|
||||
rm.internalPodInformer = podInformer
|
||||
|
@ -198,7 +198,7 @@ func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interfac
|
|||
|
||||
// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
|
||||
func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
|
||||
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod())
|
||||
podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
|
||||
garbageCollectorEnabled := false
|
||||
rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
|
||||
rm.internalPodInformer = podInformer
|
||||
|
|
|
@ -129,7 +129,7 @@ func (r *replenishmentControllerFactory) NewController(options *ReplenishmentCon
|
|||
break
|
||||
}
|
||||
|
||||
r.podInformer = informers.CreateSharedPodInformer(r.kubeClient, options.ResyncPeriod())
|
||||
r.podInformer = informers.NewPodInformer(r.kubeClient, options.ResyncPeriod())
|
||||
result = r.podInformer
|
||||
|
||||
case api.Kind("Service"):
|
||||
|
|
|
@ -28,10 +28,10 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
|
|||
// Arrange
|
||||
fakeKubeClient := controllervolumetesting.CreateTestClient()
|
||||
resyncPeriod := 5 * time.Minute
|
||||
podInformer := informers.CreateSharedPodIndexInformer(fakeKubeClient, resyncPeriod)
|
||||
nodeInformer := informers.CreateSharedNodeIndexInformer(fakeKubeClient, resyncPeriod)
|
||||
pvcInformer := informers.CreateSharedPVCIndexInformer(fakeKubeClient, resyncPeriod)
|
||||
pvInformer := informers.CreateSharedPVIndexInformer(fakeKubeClient, resyncPeriod)
|
||||
podInformer := informers.NewPodInformer(fakeKubeClient, resyncPeriod)
|
||||
nodeInformer := informers.NewNodeInformer(fakeKubeClient, resyncPeriod)
|
||||
pvcInformer := informers.NewPVCInformer(fakeKubeClient, resyncPeriod)
|
||||
pvInformer := informers.NewPVInformer(fakeKubeClient, resyncPeriod)
|
||||
|
||||
// Act
|
||||
_, err := NewAttachDetachController(
|
||||
|
|
|
@ -47,7 +47,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
|
|||
fakeKubeClient := controllervolumetesting.CreateTestClient()
|
||||
ad := operationexecutor.NewOperationExecutor(
|
||||
fakeKubeClient, volumePluginMgr)
|
||||
nodeInformer := informers.CreateSharedNodeIndexInformer(
|
||||
nodeInformer := informers.NewNodeInformer(
|
||||
fakeKubeClient, resyncPeriod)
|
||||
nsu := statusupdater.NewNodeStatusUpdater(
|
||||
fakeKubeClient, nodeInformer, asw)
|
||||
|
|
|
@ -141,7 +141,7 @@ func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *repl
|
|||
resyncPeriodFunc := func() time.Duration {
|
||||
return resyncPeriod
|
||||
}
|
||||
podInformer := informers.CreateSharedPodIndexInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod)
|
||||
podInformer := informers.NewPodInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod)
|
||||
rm := replicaset.NewReplicaSetController(
|
||||
podInformer,
|
||||
internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replicaset-controller")),
|
||||
|
|
|
@ -138,7 +138,7 @@ func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *repl
|
|||
resyncPeriodFunc := func() time.Duration {
|
||||
return resyncPeriod
|
||||
}
|
||||
podInformer := informers.CreateSharedPodIndexInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod)
|
||||
podInformer := informers.NewPodInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod)
|
||||
rm := replication.NewReplicationManager(
|
||||
podInformer,
|
||||
internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replication-controller")),
|
||||
|
|
Loading…
Reference in New Issue