diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 26153d9761..31d539d3a5 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -341,7 +341,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig if containsResource(resources, "daemonsets") { glog.Infof("Starting daemon set controller") - go daemon.NewDaemonSetsController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), int(s.LookupCacheSizeForDaemonSet)). + go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), int(s.LookupCacheSizeForDaemonSet)). Run(int(s.ConcurrentDaemonSetSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index e4b780e63e..2ff3f7955f 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -42,6 +42,7 @@ import ( "k8s.io/kubernetes/pkg/controller/daemon" "k8s.io/kubernetes/pkg/controller/deployment" endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint" + "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/controller/job" namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace" nodecontroller "k8s.io/kubernetes/pkg/controller/node" @@ -262,8 +263,11 @@ func (s *CMServer) Run(_ []string) error { if containsResource(resources, "daemonsets") { glog.Infof("Starting daemon set controller") - go daemon.NewDaemonSetsControllerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod, int(s.LookupCacheSizeForDaemonSet)). + informerFactory := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod()) + + go daemon.NewDaemonSetsController(informerFactory.DaemonSets(), informerFactory.Pods(), informerFactory.Nodes(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), int(s.LookupCacheSizeForDaemonSet)). Run(int(s.ConcurrentDaemonSetSyncs), wait.NeverStop) + informerFactory.Start(wait.NeverStop) } if containsResource(resources, "jobs") { diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go index 7bd312afc2..2654636df2 100644 --- a/pkg/controller/daemon/daemoncontroller.go +++ b/pkg/controller/daemon/daemoncontroller.go @@ -34,13 +34,11 @@ import ( "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/runtime" utilerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" - "k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" @@ -66,13 +64,6 @@ type DaemonSetsController struct { eventRecorder record.EventRecorder podControl controller.PodControlInterface - // internalPodInformer is used to hold a personal informer. If we're using - // a normal shared informer, then the informer will be started for us. If - // we have a personal informer, we must start it ourselves. If you start - // the controller using NewDaemonSetsController(passing SharedInformer), this - // will be null - internalPodInformer cache.SharedInformer - // An dsc is temporarily suspended after creating/deleting these many replicas. // It resumes normal action after observing the watch events for them. burstReplicas int @@ -82,17 +73,11 @@ type DaemonSetsController struct { // A TTLCache of pod creates/deletes each ds expects to see expectations controller.ControllerExpectationsInterface // A store of daemon sets - dsStore cache.StoreToDaemonSetLister + dsStore *cache.StoreToDaemonSetLister // A store of pods - podStore cache.StoreToPodLister + podStore *cache.StoreToPodLister // A store of nodes - nodeStore cache.StoreToNodeLister - // Watches changes to all daemon sets. - dsController *cache.Controller - // Watches changes to all pods - podController cache.ControllerInterface - // Watches changes to all nodes. - nodeController *cache.Controller + nodeStore *cache.StoreToNodeLister // podStoreSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. podStoreSynced cache.InformerSynced @@ -106,7 +91,7 @@ type DaemonSetsController struct { queue workqueue.RateLimitingInterface } -func NewDaemonSetsController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController { +func NewDaemonSetsController(daemonSetInformer informers.DaemonSetInformer, podInformer informers.PodInformer, nodeInformer informers.NodeInformer, kubeClient clientset.Interface, lookupCacheSize int) *DaemonSetsController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) // TODO: remove the wrapper when every clients have moved to use the clientset. @@ -126,93 +111,61 @@ func NewDaemonSetsController(podInformer cache.SharedIndexInformer, kubeClient c expectations: controller.NewControllerExpectations(), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"), } - // Manage addition/update of daemon sets. - dsc.dsStore.Store, dsc.dsController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return dsc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return dsc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).Watch(options) - }, - }, - &extensions.DaemonSet{}, - // TODO: Can we have much longer period here? - FullDaemonSetResyncPeriod, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - ds := obj.(*extensions.DaemonSet) - glog.V(4).Infof("Adding daemon set %s", ds.Name) - dsc.enqueueDaemonSet(ds) - }, - UpdateFunc: func(old, cur interface{}) { - oldDS := old.(*extensions.DaemonSet) - curDS := cur.(*extensions.DaemonSet) - // We should invalidate the whole lookup cache if a DS's selector has been updated. - // - // Imagine that you have two RSs: - // * old DS1 - // * new DS2 - // You also have a pod that is attached to DS2 (because it doesn't match DS1 selector). - // Now imagine that you are changing DS1 selector so that it is now matching that pod, - // in such case we must invalidate the whole cache so that pod could be adopted by DS1 - // - // This makes the lookup cache less helpful, but selector update does not happen often, - // so it's not a big problem - if !reflect.DeepEqual(oldDS.Spec.Selector, curDS.Spec.Selector) { - dsc.lookupCache.InvalidateAll() - } - glog.V(4).Infof("Updating daemon set %s", oldDS.Name) - dsc.enqueueDaemonSet(curDS) - }, - DeleteFunc: dsc.deleteDaemonset, + daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + ds := obj.(*extensions.DaemonSet) + glog.V(4).Infof("Adding daemon set %s", ds.Name) + dsc.enqueueDaemonSet(ds) }, - ) + UpdateFunc: func(old, cur interface{}) { + oldDS := old.(*extensions.DaemonSet) + curDS := cur.(*extensions.DaemonSet) + // We should invalidate the whole lookup cache if a DS's selector has been updated. + // + // Imagine that you have two RSs: + // * old DS1 + // * new DS2 + // You also have a pod that is attached to DS2 (because it doesn't match DS1 selector). + // Now imagine that you are changing DS1 selector so that it is now matching that pod, + // in such case we must invalidate the whole cache so that pod could be adopted by DS1 + // + // This makes the lookup cache less helpful, but selector update does not happen often, + // so it's not a big problem + if !reflect.DeepEqual(oldDS.Spec.Selector, curDS.Spec.Selector) { + dsc.lookupCache.InvalidateAll() + } + + glog.V(4).Infof("Updating daemon set %s", oldDS.Name) + dsc.enqueueDaemonSet(curDS) + }, + DeleteFunc: dsc.deleteDaemonset, + }) + dsc.dsStore = daemonSetInformer.Lister() // Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete // more pods until all the effects (expectations) of a daemon set's create/delete have been observed. - podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dsc.addPod, UpdateFunc: dsc.updatePod, DeleteFunc: dsc.deletePod, }) - dsc.podStore.Indexer = podInformer.GetIndexer() - dsc.podController = podInformer.GetController() - dsc.podStoreSynced = podInformer.HasSynced + dsc.podStore = podInformer.Lister() + dsc.podStoreSynced = podInformer.Informer().HasSynced - // Watch for new nodes or updates to nodes - daemon pods are launched on new nodes, and possibly when labels on nodes change, - dsc.nodeStore.Store, dsc.nodeController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return dsc.kubeClient.Core().Nodes().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return dsc.kubeClient.Core().Nodes().Watch(options) - }, - }, - &api.Node{}, - resyncPeriod(), - cache.ResourceEventHandlerFuncs{ - AddFunc: dsc.addNode, - UpdateFunc: dsc.updateNode, - }, + nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: dsc.addNode, + UpdateFunc: dsc.updateNode, + }, ) - dsc.nodeStoreSynced = dsc.nodeController.HasSynced + dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced + dsc.nodeStore = nodeInformer.Lister() dsc.syncHandler = dsc.syncDaemonSet dsc.lookupCache = controller.NewMatchingCache(lookupCacheSize) return dsc } -func NewDaemonSetsControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController { - podInformer := informers.NewPodInformer(kubeClient, resyncPeriod()) - dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize) - dsc.internalPodInformer = podInformer - - return dsc -} - func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) { ds, ok := obj.(*extensions.DaemonSet) if !ok { @@ -237,9 +190,6 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) { defer dsc.queue.ShutDown() glog.Infof("Starting Daemon Sets controller manager") - go dsc.dsController.Run(stopCh) - go dsc.podController.Run(stopCh) - go dsc.nodeController.Run(stopCh) if !cache.WaitForCacheSync(stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced) { return @@ -249,10 +199,6 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) { go wait.Until(dsc.runWorker, time.Second, stopCh) } - if dsc.internalPodInformer != nil { - go dsc.internalPodInformer.Run(stopCh) - } - <-stopCh glog.Infof("Shutting down Daemon Set Controller") } diff --git a/pkg/controller/daemon/daemoncontroller_test.go b/pkg/controller/daemon/daemoncontroller_test.go index d7e5a0fe8e..82b9848495 100644 --- a/pkg/controller/daemon/daemoncontroller_test.go +++ b/pkg/controller/daemon/daemoncontroller_test.go @@ -29,7 +29,9 @@ import ( clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/securitycontext" + "k8s.io/kubernetes/pkg/util/wait" ) var ( @@ -136,7 +138,11 @@ func addPods(podStore cache.Store, nodeName string, label map[string]string, num func newTestController() (*DaemonSetsController, *controller.FakePodControl) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewDaemonSetsControllerFromClient(clientset, controller.NoResyncPeriodFunc, 0) + informerFactory := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) + + manager := NewDaemonSetsController(informerFactory.DaemonSets(), informerFactory.Pods(), informerFactory.Nodes(), clientset, 0) + informerFactory.Start(wait.NeverStop) + manager.podStoreSynced = alwaysReady manager.nodeStoreSynced = alwaysReady podControl := &controller.FakePodControl{} diff --git a/pkg/controller/informers/core.go b/pkg/controller/informers/core.go index b471427f01..3931b3d8a3 100644 --- a/pkg/controller/informers/core.go +++ b/pkg/controller/informers/core.go @@ -18,9 +18,13 @@ package informers import ( "reflect" + "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" ) // PodInformer is type of SharedIndexInformer which watches and lists all pods. @@ -200,3 +204,94 @@ func (f *pvInformer) Lister() *cache.StoreToPVFetcher { informer := f.Informer() return &cache.StoreToPVFetcher{Store: informer.GetStore()} } + +// NewPodInformer returns a SharedIndexInformer that lists and watches all pods +func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + sharedIndexInformer := cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return client.Core().Pods(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return client.Core().Pods(api.NamespaceAll).Watch(options) + }, + }, + &api.Pod{}, + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + + return sharedIndexInformer +} + +// NewNodeInformer returns a SharedIndexInformer that lists and watches all nodes +func NewNodeInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + sharedIndexInformer := cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return client.Core().Nodes().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return client.Core().Nodes().Watch(options) + }, + }, + &api.Node{}, + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + + return sharedIndexInformer +} + +// NewPVCInformer returns a SharedIndexInformer that lists and watches all PVCs +func NewPVCInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + sharedIndexInformer := cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return client.Core().PersistentVolumeClaims(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return client.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options) + }, + }, + &api.PersistentVolumeClaim{}, + resyncPeriod, + cache.Indexers{}) + + return sharedIndexInformer +} + +// NewPVInformer returns a SharedIndexInformer that lists and watches all PVs +func NewPVInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + sharedIndexInformer := cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return client.Core().PersistentVolumes().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return client.Core().PersistentVolumes().Watch(options) + }, + }, + &api.PersistentVolume{}, + resyncPeriod, + cache.Indexers{}) + + return sharedIndexInformer +} + +// NewNamespaceInformer returns a SharedIndexInformer that lists and watches namespaces +func NewNamespaceInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + sharedIndexInformer := cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return client.Core().Namespaces().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return client.Core().Namespaces().Watch(options) + }, + }, + &api.Namespace{}, + resyncPeriod, + cache.Indexers{}) + + return sharedIndexInformer +} diff --git a/pkg/controller/informers/extensions.go b/pkg/controller/informers/extensions.go new file mode 100644 index 0000000000..a0539d28fd --- /dev/null +++ b/pkg/controller/informers/extensions.go @@ -0,0 +1,70 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informers + +import ( + "reflect" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" +) + +// DaemonSetInformer is type of SharedIndexInformer which watches and lists all pods. +// Interface provides constructor for informer and lister for pods +type DaemonSetInformer interface { + Informer() cache.SharedIndexInformer + Lister() *cache.StoreToDaemonSetLister +} + +type daemonSetInformer struct { + *sharedInformerFactory +} + +func (f *daemonSetInformer) Informer() cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informerType := reflect.TypeOf(&extensions.DaemonSet{}) + informer, exists := f.informers[informerType] + if exists { + return informer + } + informer = cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return f.client.Extensions().DaemonSets(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return f.client.Extensions().DaemonSets(api.NamespaceAll).Watch(options) + }, + }, + &extensions.DaemonSet{}, + f.defaultResync, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + f.informers[informerType] = informer + + return informer +} + +func (f *daemonSetInformer) Lister() *cache.StoreToDaemonSetLister { + informer := f.Informer() + return &cache.StoreToDaemonSetLister{Store: informer.GetIndexer()} +} diff --git a/pkg/controller/informers/factory.go b/pkg/controller/informers/factory.go index 31329fec68..6350c16fb3 100644 --- a/pkg/controller/informers/factory.go +++ b/pkg/controller/informers/factory.go @@ -21,11 +21,8 @@ import ( "sync" "time" - "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/watch" ) // SharedInformerFactory provides interface which holds unique informers for pods, nodes, namespaces, persistent volume @@ -39,6 +36,8 @@ type SharedInformerFactory interface { Namespaces() NamespaceInformer PersistentVolumeClaims() PVCInformer PersistentVolumes() PVInformer + + DaemonSets() DaemonSetInformer } type sharedInformerFactory struct { @@ -100,93 +99,6 @@ func (f *sharedInformerFactory) PersistentVolumes() PVInformer { return &pvInformer{sharedInformerFactory: f} } -// NewPodInformer returns a SharedIndexInformer that lists and watches all pods -func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { - sharedIndexInformer := cache.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return client.Core().Pods(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return client.Core().Pods(api.NamespaceAll).Watch(options) - }, - }, - &api.Pod{}, - resyncPeriod, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - - return sharedIndexInformer -} - -// NewNodeInformer returns a SharedIndexInformer that lists and watches all nodes -func NewNodeInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { - sharedIndexInformer := cache.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return client.Core().Nodes().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return client.Core().Nodes().Watch(options) - }, - }, - &api.Node{}, - resyncPeriod, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - - return sharedIndexInformer -} - -// NewPVCInformer returns a SharedIndexInformer that lists and watches all PVCs -func NewPVCInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { - sharedIndexInformer := cache.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return client.Core().PersistentVolumeClaims(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return client.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options) - }, - }, - &api.PersistentVolumeClaim{}, - resyncPeriod, - cache.Indexers{}) - - return sharedIndexInformer -} - -// NewPVInformer returns a SharedIndexInformer that lists and watches all PVs -func NewPVInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { - sharedIndexInformer := cache.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return client.Core().PersistentVolumes().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return client.Core().PersistentVolumes().Watch(options) - }, - }, - &api.PersistentVolume{}, - resyncPeriod, - cache.Indexers{}) - - return sharedIndexInformer -} - -// NewNamespaceInformer returns a SharedIndexInformer that lists and watches namespaces -func NewNamespaceInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { - sharedIndexInformer := cache.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return client.Core().Namespaces().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return client.Core().Namespaces().Watch(options) - }, - }, - &api.Namespace{}, - resyncPeriod, - cache.Indexers{}) - - return sharedIndexInformer +func (f *sharedInformerFactory) DaemonSets() DaemonSetInformer { + return &daemonSetInformer{sharedInformerFactory: f} }