diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index bc27e3567a..9812f8f1b1 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -30,7 +30,6 @@ import ( "net/http" "net/http/pprof" "os" - "reflect" "strconv" "time" @@ -50,7 +49,6 @@ import ( "k8s.io/kubernetes/pkg/controller/daemon" "k8s.io/kubernetes/pkg/controller/deployment" endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint" - "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/controller/garbagecollector" "k8s.io/kubernetes/pkg/controller/job" @@ -197,22 +195,14 @@ func Run(s *options.CMServer) error { } func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error { - podInformer := informers.CreateSharedPodIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)()) - nodeInformer := informers.CreateSharedNodeIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-informer")), ResyncPeriod(s)()) - pvcInformer := informers.CreateSharedPVCIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pvc-informer")), ResyncPeriod(s)()) - pvInformer := informers.CreateSharedPVIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pv-informer")), ResyncPeriod(s)()) - informers := map[reflect.Type]framework.SharedIndexInformer{} - informers[reflect.TypeOf(&api.Pod{})] = podInformer - informers[reflect.TypeOf(&api.Node{})] = nodeInformer - informers[reflect.TypeOf(&api.PersistentVolumeClaim{})] = pvcInformer - informers[reflect.TypeOf(&api.PersistentVolume{})] = pvInformer + sharedInformers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "shared-informers")), ResyncPeriod(s)()) - go endpointcontroller.NewEndpointController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))). + go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))). Run(int(s.ConcurrentEndpointSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) go replicationcontroller.NewReplicationManager( - podInformer, + sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")), ResyncPeriod(s), replicationcontroller.BurstReplicas, @@ -240,7 +230,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig if err != nil { glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err) } - nodeController, err := nodecontroller.NewNodeController(podInformer, cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")), + nodeController, err := nodecontroller.NewNodeController(sharedInformers.Pods().Informer(), cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")), s.PodEvictionTimeout.Duration, s.DeletingPodsQps, s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs) @@ -284,7 +274,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig KubeClient: resourceQuotaControllerClient, ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration), Registry: resourceQuotaRegistry, - ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(podInformer, resourceQuotaControllerClient), + ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(sharedInformers.Pods().Informer(), resourceQuotaControllerClient), ReplenishmentResyncPeriod: ResyncPeriod(s), GroupKindsToReplenish: groupKindsToReplenish, } @@ -344,14 +334,14 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig if containsResource(resources, "daemonsets") { glog.Infof("Starting daemon set controller") - go daemon.NewDaemonSetsController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), int(s.LookupCacheSizeForDaemonSet)). + go daemon.NewDaemonSetsController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), int(s.LookupCacheSizeForDaemonSet)). Run(int(s.ConcurrentDaemonSetSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } if containsResource(resources, "jobs") { glog.Infof("Starting job controller") - go job.NewJobController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller"))). + go job.NewJobController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller"))). Run(int(s.ConcurrentJobSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } @@ -365,7 +355,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig if containsResource(resources, "replicasets") { glog.Infof("Starting ReplicaSet controller") - go replicaset.NewReplicaSetController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). + go replicaset.NewReplicaSetController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). Run(int(s.ConcurrentRSSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } @@ -380,7 +370,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig glog.Infof("Starting PetSet controller") resyncPeriod := ResyncPeriod(s)() go petset.NewPetSetController( - podInformer, + sharedInformers.Pods().Informer(), // TODO: Switch to using clientset kubeClient, resyncPeriod, @@ -410,10 +400,10 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig attachDetachController, attachDetachControllerErr := attachdetach.NewAttachDetachController( clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "attachdetach-controller")), - podInformer, - nodeInformer, - pvcInformer, - pvInformer, + sharedInformers.Pods().Informer(), + sharedInformers.Nodes().Informer(), + sharedInformers.PersistentVolumeClaims().Informer(), + sharedInformers.PersistentVolumes().Informer(), cloud, ProbeAttachableVolumePlugins(s.VolumeConfiguration)) if attachDetachControllerErr != nil { @@ -497,10 +487,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig } } - // run the shared informers - for _, informer := range informers { - go informer.Run(wait.NeverStop) - } + sharedInformers.Start(stop) select {} } diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go index c713f4a72a..c65d9c13a7 100644 --- a/pkg/controller/daemon/daemoncontroller.go +++ b/pkg/controller/daemon/daemoncontroller.go @@ -205,7 +205,7 @@ func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClie } func NewDaemonSetsControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController { - podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) + podInformer := informers.NewPodInformer(kubeClient, resyncPeriod()) dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize) dsc.internalPodInformer = podInformer diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index ce557bda81..77a2738351 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -113,7 +113,7 @@ func NewEndpointController(podInformer framework.SharedIndexInformer, client *cl // NewEndpointControllerFromClient returns a new *EndpointController that runs its own informer. func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController { - podInformer := informers.CreateSharedPodIndexInformer(client, resyncPeriod()) + podInformer := informers.NewPodInformer(client, resyncPeriod()) e := NewEndpointController(podInformer, client) e.internalPodInformer = podInformer diff --git a/pkg/controller/framework/informers/core.go b/pkg/controller/framework/informers/core.go index 744aaabad4..a4f40b5870 100644 --- a/pkg/controller/framework/informers/core.go +++ b/pkg/controller/framework/informers/core.go @@ -22,8 +22,6 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/controller/framework" - "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/watch" ) // PodInformer is type of SharedIndexInformer which watches and lists all pods. @@ -43,26 +41,12 @@ func (f *podInformer) Informer() framework.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() - informerObj := &api.Pod{} - informerType := reflect.TypeOf(informerObj) + informerType := reflect.TypeOf(&api.Pod{}) informer, exists := f.informers[informerType] if exists { return informer } - - informer = framework.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return f.client.Core().Pods(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return f.client.Core().Pods(api.NamespaceAll).Watch(options) - }, - }, - informerObj, - f.defaultResync, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) + informer = NewPodInformer(f.client, f.defaultResync) f.informers[informerType] = informer return informer @@ -92,25 +76,13 @@ type namespaceInformer struct { func (f *namespaceInformer) Informer() framework.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() - informerObj := &api.Namespace{} - informerType := reflect.TypeOf(informerObj) + + informerType := reflect.TypeOf(&api.Namespace{}) informer, exists := f.informers[informerType] if exists { return informer } - informer = framework.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return f.client.Core().Namespaces().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return f.client.Core().Namespaces().Watch(options) - }, - }, - informerObj, - f.defaultResync, - cache.Indexers{}, - ) + informer = NewNamespaceInformer(f.client, f.defaultResync) f.informers[informerType] = informer return informer @@ -141,26 +113,12 @@ func (f *nodeInformer) Informer() framework.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() - informerObj := &api.Node{} - informerType := reflect.TypeOf(informerObj) + informerType := reflect.TypeOf(&api.Node{}) informer, exists := f.informers[informerType] if exists { return informer } - - informer = framework.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return f.client.Core().Nodes().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return f.client.Core().Nodes().Watch(options) - }, - }, - informerObj, - f.defaultResync, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) + informer = NewNodeInformer(f.client, f.defaultResync) f.informers[informerType] = informer return informer @@ -191,26 +149,12 @@ func (f *pvcInformer) Informer() framework.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() - informerObj := &api.PersistentVolumeClaim{} - informerType := reflect.TypeOf(informerObj) + informerType := reflect.TypeOf(&api.PersistentVolumeClaim{}) informer, exists := f.informers[informerType] if exists { return informer } - - informer = framework.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return f.client.Core().PersistentVolumeClaims(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return f.client.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options) - }, - }, - informerObj, - f.defaultResync, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) + informer = NewPVCInformer(f.client, f.defaultResync) f.informers[informerType] = informer return informer @@ -241,26 +185,12 @@ func (f *pvInformer) Informer() framework.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() - informerObj := &api.PersistentVolume{} - informerType := reflect.TypeOf(informerObj) + informerType := reflect.TypeOf(&api.PersistentVolume{}) informer, exists := f.informers[informerType] if exists { return informer } - - informer = framework.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return f.client.Core().PersistentVolumes().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return f.client.Core().PersistentVolumes().Watch(options) - }, - }, - informerObj, - f.defaultResync, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) + informer = NewPVInformer(f.client, f.defaultResync) f.informers[informerType] = informer return informer diff --git a/pkg/controller/framework/informers/factory.go b/pkg/controller/framework/informers/factory.go index 850b42af00..de1a6918db 100644 --- a/pkg/controller/framework/informers/factory.go +++ b/pkg/controller/framework/informers/factory.go @@ -46,15 +46,20 @@ type sharedInformerFactory struct { client clientset.Interface lock sync.Mutex defaultResync time.Duration - informers map[reflect.Type]framework.SharedIndexInformer + + informers map[reflect.Type]framework.SharedIndexInformer + // startedInformers is used for tracking which informers have been started + // this allows calling of Start method multiple times + startedInformers map[reflect.Type]bool } // NewSharedInformerFactory constructs a new instance of sharedInformerFactory func NewSharedInformerFactory(client clientset.Interface, defaultResync time.Duration) SharedInformerFactory { return &sharedInformerFactory{ - client: client, - defaultResync: defaultResync, - informers: make(map[reflect.Type]framework.SharedIndexInformer), + client: client, + defaultResync: defaultResync, + informers: make(map[reflect.Type]framework.SharedIndexInformer), + startedInformers: make(map[reflect.Type]bool), } } @@ -63,8 +68,11 @@ func (s *sharedInformerFactory) Start(stopCh <-chan struct{}) { s.lock.Lock() defer s.lock.Unlock() - for _, informer := range s.informers { - go informer.Run(stopCh) + for informerType, informer := range s.informers { + if !s.startedInformers[informerType] { + go informer.Run(stopCh) + s.startedInformers[informerType] = true + } } } @@ -93,27 +101,8 @@ func (f *sharedInformerFactory) PersistentVolumes() PVInformer { return &pvInformer{sharedInformerFactory: f} } -// CreateSharedPodInformer returns a SharedIndexInformer that lists and watches all pods -func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { - sharedInformer := framework.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return client.Core().Pods(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return client.Core().Pods(api.NamespaceAll).Watch(options) - }, - }, - &api.Pod{}, - resyncPeriod, - cache.Indexers{}, - ) - - return sharedInformer -} - -// CreateSharedPodIndexInformer returns a SharedIndexInformer that lists and watches all pods -func CreateSharedPodIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { +// NewPodInformer returns a SharedIndexInformer that lists and watches all pods +func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { sharedIndexInformer := framework.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { @@ -131,8 +120,8 @@ func CreateSharedPodIndexInformer(client clientset.Interface, resyncPeriod time. return sharedIndexInformer } -// CreateSharedNodeIndexInformer returns a SharedIndexInformer that lists and watches all nodes -func CreateSharedNodeIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { +// NewNodeInformer returns a SharedIndexInformer that lists and watches all nodes +func NewNodeInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { sharedIndexInformer := framework.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { @@ -149,8 +138,8 @@ func CreateSharedNodeIndexInformer(client clientset.Interface, resyncPeriod time return sharedIndexInformer } -// CreateSharedPVCIndexInformer returns a SharedIndexInformer that lists and watches all PVCs -func CreateSharedPVCIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { +// NewPVCInformer returns a SharedIndexInformer that lists and watches all PVCs +func NewPVCInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { sharedIndexInformer := framework.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { @@ -167,8 +156,8 @@ func CreateSharedPVCIndexInformer(client clientset.Interface, resyncPeriod time. return sharedIndexInformer } -// CreateSharedPVIndexInformer returns a SharedIndexInformer that lists and watches all PVs -func CreateSharedPVIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { +// NewPVInformer returns a SharedIndexInformer that lists and watches all PVs +func NewPVInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { sharedIndexInformer := framework.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { @@ -185,8 +174,8 @@ func CreateSharedPVIndexInformer(client clientset.Interface, resyncPeriod time.D return sharedIndexInformer } -// CreateSharedNamespaceIndexInformer returns a SharedIndexInformer that lists and watches namespaces -func CreateSharedNamespaceIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { +// NewNamespaceInformer returns a SharedIndexInformer that lists and watches namespaces +func NewNamespaceInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { sharedIndexInformer := framework.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { diff --git a/pkg/controller/job/jobcontroller.go b/pkg/controller/job/jobcontroller.go index 46156d6879..b4639dec58 100644 --- a/pkg/controller/job/jobcontroller.go +++ b/pkg/controller/job/jobcontroller.go @@ -135,7 +135,7 @@ func NewJobController(podInformer framework.SharedIndexInformer, kubeClient clie } func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController { - podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) + podInformer := informers.NewPodInformer(kubeClient, resyncPeriod()) jm := NewJobController(podInformer, kubeClient) jm.internalPodInformer = podInformer diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index 5c20a1c6dc..77aa730ef7 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -343,7 +343,7 @@ func NewNodeControllerFromClient( serviceCIDR *net.IPNet, nodeCIDRMaskSize int, allocateNodeCIDRs bool) (*NodeController, error) { - podInformer := informers.CreateSharedPodIndexInformer(kubeClient, controller.NoResyncPeriodFunc()) + podInformer := informers.NewPodInformer(kubeClient, controller.NoResyncPeriodFunc()) nc, err := NewNodeController(podInformer, cloud, kubeClient, podEvictionTimeout, evictionLimiterQPS, nodeMonitorGracePeriod, nodeStartupGracePeriod, nodeMonitorPeriod, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, allocateNodeCIDRs) if err != nil { diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 4ffa0b4837..976cb545bf 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -184,7 +184,7 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer fra // NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer. func NewReplicaSetControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController { - podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) + podInformer := informers.NewPodInformer(kubeClient, resyncPeriod()) garbageCollectorEnabled := false rsc := NewReplicaSetController(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled) rsc.internalPodInformer = podInformer diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index e5ae8d1c1f..91660a5307 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -189,7 +189,7 @@ func newReplicationManager(eventRecorder record.EventRecorder, podInformer frame // NewReplicationManagerFromClientForIntegration creates a new ReplicationManager that runs its own informer. It disables event recording for use in integration tests. func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager { - podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) + podInformer := informers.NewPodInformer(kubeClient, resyncPeriod()) garbageCollectorEnabled := false rm := newReplicationManager(&record.FakeRecorder{}, podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled) rm.internalPodInformer = podInformer @@ -198,7 +198,7 @@ func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interfac // NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer. func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager { - podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) + podInformer := informers.NewPodInformer(kubeClient, resyncPeriod()) garbageCollectorEnabled := false rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled) rm.internalPodInformer = podInformer diff --git a/pkg/controller/resourcequota/replenishment_controller.go b/pkg/controller/resourcequota/replenishment_controller.go index f5c92b2b7f..46448f1d73 100644 --- a/pkg/controller/resourcequota/replenishment_controller.go +++ b/pkg/controller/resourcequota/replenishment_controller.go @@ -129,7 +129,7 @@ func (r *replenishmentControllerFactory) NewController(options *ReplenishmentCon break } - r.podInformer = informers.CreateSharedPodInformer(r.kubeClient, options.ResyncPeriod()) + r.podInformer = informers.NewPodInformer(r.kubeClient, options.ResyncPeriod()) result = r.podInformer case api.Kind("Service"): diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go index dcf68470e4..5fad9405db 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go @@ -28,10 +28,10 @@ func Test_NewAttachDetachController_Positive(t *testing.T) { // Arrange fakeKubeClient := controllervolumetesting.CreateTestClient() resyncPeriod := 5 * time.Minute - podInformer := informers.CreateSharedPodIndexInformer(fakeKubeClient, resyncPeriod) - nodeInformer := informers.CreateSharedNodeIndexInformer(fakeKubeClient, resyncPeriod) - pvcInformer := informers.CreateSharedPVCIndexInformer(fakeKubeClient, resyncPeriod) - pvInformer := informers.CreateSharedPVIndexInformer(fakeKubeClient, resyncPeriod) + podInformer := informers.NewPodInformer(fakeKubeClient, resyncPeriod) + nodeInformer := informers.NewNodeInformer(fakeKubeClient, resyncPeriod) + pvcInformer := informers.NewPVCInformer(fakeKubeClient, resyncPeriod) + pvInformer := informers.NewPVInformer(fakeKubeClient, resyncPeriod) // Act _, err := NewAttachDetachController( diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go index 5ba08fcdef..4b2e564518 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go @@ -47,7 +47,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) { fakeKubeClient := controllervolumetesting.CreateTestClient() ad := operationexecutor.NewOperationExecutor( fakeKubeClient, volumePluginMgr) - nodeInformer := informers.CreateSharedNodeIndexInformer( + nodeInformer := informers.NewNodeInformer( fakeKubeClient, resyncPeriod) nsu := statusupdater.NewNodeStatusUpdater( fakeKubeClient, nodeInformer, asw) diff --git a/test/integration/replicaset/replicaset_test.go b/test/integration/replicaset/replicaset_test.go index db79f578ba..5c34942f90 100644 --- a/test/integration/replicaset/replicaset_test.go +++ b/test/integration/replicaset/replicaset_test.go @@ -141,7 +141,7 @@ func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *repl resyncPeriodFunc := func() time.Duration { return resyncPeriod } - podInformer := informers.CreateSharedPodIndexInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod) + podInformer := informers.NewPodInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod) rm := replicaset.NewReplicaSetController( podInformer, internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replicaset-controller")), diff --git a/test/integration/replicationcontroller/replicationcontroller_test.go b/test/integration/replicationcontroller/replicationcontroller_test.go index 1b23a8940d..1aaed555e0 100644 --- a/test/integration/replicationcontroller/replicationcontroller_test.go +++ b/test/integration/replicationcontroller/replicationcontroller_test.go @@ -138,7 +138,7 @@ func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *repl resyncPeriodFunc := func() time.Duration { return resyncPeriod } - podInformer := informers.CreateSharedPodIndexInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod) + podInformer := informers.NewPodInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod) rm := replication.NewReplicationManager( podInformer, internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replication-controller")),