convert daemonset controller to shared informers

pull/6/head
deads2k 2016-09-15 16:27:47 -04:00
parent 00a203df1e
commit 234d68be83
7 changed files with 224 additions and 191 deletions

@@ -341,7 +341,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
if containsResource(resources, "daemonsets") {
glog.Infof("Starting daemon set controller")
go daemon.NewDaemonSetsController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), int(s.LookupCacheSizeForDaemonSet)).
go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), int(s.LookupCacheSizeForDaemonSet)).
Run(int(s.ConcurrentDaemonSetSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}

@@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/controller/deployment"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/controller/job"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
nodecontroller "k8s.io/kubernetes/pkg/controller/node"
@@ -262,8 +263,11 @@ func (s *CMServer) Run(_ []string) error {
if containsResource(resources, "daemonsets") {
glog.Infof("Starting daemon set controller")
go daemon.NewDaemonSetsControllerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod, int(s.LookupCacheSizeForDaemonSet)).
informerFactory := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod())
go daemon.NewDaemonSetsController(informerFactory.DaemonSets(), informerFactory.Pods(), informerFactory.Nodes(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), int(s.LookupCacheSizeForDaemonSet)).
Run(int(s.ConcurrentDaemonSetSyncs), wait.NeverStop)
informerFactory.Start(wait.NeverStop)
}
if containsResource(resources, "jobs") {
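
Both controller-manager call sites above make the same change: build one shared informer factory, hand typed informers to the constructor, start the controller, then start the factory. A minimal sketch of that wiring, using only constructors visible in this diff (the example package, the startDaemonSetController wrapper, and its parameters are illustrative, not part of the patch):

package example

import (
	"time"

	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/controller/daemon"
	"k8s.io/kubernetes/pkg/controller/informers"
	"k8s.io/kubernetes/pkg/util/wait"
)

func startDaemonSetController(client clientset.Interface, resync time.Duration, workers, lookupCacheSize int) {
	// One factory per process: every controller that asks for Pods() or
	// Nodes() shares a single watch instead of opening its own.
	informerFactory := informers.NewSharedInformerFactory(client, resync)

	dsc := daemon.NewDaemonSetsController(
		informerFactory.DaemonSets(),
		informerFactory.Pods(),
		informerFactory.Nodes(),
		client,
		lookupCacheSize,
	)
	go dsc.Run(workers, wait.NeverStop)

	// Start launches every informer requested so far, so it runs after the
	// constructor has asked the factory for its informers.
	informerFactory.Start(wait.NeverStop)
}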

@@ -34,13 +34,11 @@ import (
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -66,13 +64,6 @@ type DaemonSetsController struct {
eventRecorder record.EventRecorder
podControl controller.PodControlInterface
// internalPodInformer is used to hold a personal informer. If we're using
// a normal shared informer, then the informer will be started for us. If
// we have a personal informer, we must start it ourselves. If you start
// the controller using NewDaemonSetsController(passing SharedInformer), this
// will be null
internalPodInformer cache.SharedInformer
// A dsc is temporarily suspended after creating/deleting that many replicas.
// It resumes normal action after observing the watch events for them.
burstReplicas int
@@ -82,17 +73,11 @@ type DaemonSetsController struct {
// A TTLCache of pod creates/deletes each ds expects to see
expectations controller.ControllerExpectationsInterface
// A store of daemon sets
dsStore cache.StoreToDaemonSetLister
dsStore *cache.StoreToDaemonSetLister
// A store of pods
podStore cache.StoreToPodLister
podStore *cache.StoreToPodLister
// A store of nodes
nodeStore cache.StoreToNodeLister
// Watches changes to all daemon sets.
dsController *cache.Controller
// Watches changes to all pods
podController cache.ControllerInterface
// Watches changes to all nodes.
nodeController *cache.Controller
nodeStore *cache.StoreToNodeLister
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced cache.InformerSynced
@@ -106,7 +91,7 @@ type DaemonSetsController struct {
queue workqueue.RateLimitingInterface
}
func NewDaemonSetsController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
func NewDaemonSetsController(daemonSetInformer informers.DaemonSetInformer, podInformer informers.PodInformer, nodeInformer informers.NodeInformer, kubeClient clientset.Interface, lookupCacheSize int) *DaemonSetsController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when all clients have moved to use the clientset.
@@ -126,93 +111,61 @@ func NewDaemonSetsController(podInformer cache.SharedIndexInformer, kubeClient c
expectations: controller.NewControllerExpectations(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
}
// Manage addition/update of daemon sets.
dsc.dsStore.Store, dsc.dsController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dsc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return dsc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).Watch(options)
},
},
&extensions.DaemonSet{},
// TODO: Can we have much longer period here?
FullDaemonSetResyncPeriod,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ds := obj.(*extensions.DaemonSet)
glog.V(4).Infof("Adding daemon set %s", ds.Name)
dsc.enqueueDaemonSet(ds)
},
UpdateFunc: func(old, cur interface{}) {
oldDS := old.(*extensions.DaemonSet)
curDS := cur.(*extensions.DaemonSet)
// We should invalidate the whole lookup cache if a DS's selector has been updated.
//
// Imagine that you have two DSs:
// * old DS1
// * new DS2
// You also have a pod that is attached to DS2 (because it doesn't match DS1's selector).
// Now imagine that you are changing DS1's selector so that it now matches that pod;
// in that case we must invalidate the whole cache so that the pod can be adopted by DS1.
//
// This makes the lookup cache less helpful, but selector updates are infrequent,
// so it's not a big problem.
if !reflect.DeepEqual(oldDS.Spec.Selector, curDS.Spec.Selector) {
dsc.lookupCache.InvalidateAll()
}
glog.V(4).Infof("Updating daemon set %s", oldDS.Name)
dsc.enqueueDaemonSet(curDS)
},
DeleteFunc: dsc.deleteDaemonset,
daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ds := obj.(*extensions.DaemonSet)
glog.V(4).Infof("Adding daemon set %s", ds.Name)
dsc.enqueueDaemonSet(ds)
},
)
UpdateFunc: func(old, cur interface{}) {
oldDS := old.(*extensions.DaemonSet)
curDS := cur.(*extensions.DaemonSet)
// We should invalidate the whole lookup cache if a DS's selector has been updated.
//
// Imagine that you have two DSs:
// * old DS1
// * new DS2
// You also have a pod that is attached to DS2 (because it doesn't match DS1's selector).
// Now imagine that you are changing DS1's selector so that it now matches that pod;
// in that case we must invalidate the whole cache so that the pod can be adopted by DS1.
//
// This makes the lookup cache less helpful, but selector updates are infrequent,
// so it's not a big problem.
if !reflect.DeepEqual(oldDS.Spec.Selector, curDS.Spec.Selector) {
dsc.lookupCache.InvalidateAll()
}
glog.V(4).Infof("Updating daemon set %s", oldDS.Name)
dsc.enqueueDaemonSet(curDS)
},
DeleteFunc: dsc.deleteDaemonset,
})
dsc.dsStore = daemonSetInformer.Lister()
// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addPod,
UpdateFunc: dsc.updatePod,
DeleteFunc: dsc.deletePod,
})
dsc.podStore.Indexer = podInformer.GetIndexer()
dsc.podController = podInformer.GetController()
dsc.podStoreSynced = podInformer.HasSynced
dsc.podStore = podInformer.Lister()
dsc.podStoreSynced = podInformer.Informer().HasSynced
// Watch for new nodes or updates to nodes - daemon pods are launched on new nodes, and possibly when labels on nodes change.
dsc.nodeStore.Store, dsc.nodeController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dsc.kubeClient.Core().Nodes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return dsc.kubeClient.Core().Nodes().Watch(options)
},
},
&api.Node{},
resyncPeriod(),
cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addNode,
UpdateFunc: dsc.updateNode,
},
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addNode,
UpdateFunc: dsc.updateNode,
},
)
dsc.nodeStoreSynced = dsc.nodeController.HasSynced
dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
dsc.nodeStore = nodeInformer.Lister()
dsc.syncHandler = dsc.syncDaemonSet
dsc.lookupCache = controller.NewMatchingCache(lookupCacheSize)
return dsc
}
func NewDaemonSetsControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize)
dsc.internalPodInformer = podInformer
return dsc
}
func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
ds, ok := obj.(*extensions.DaemonSet)
if !ok {
@@ -237,9 +190,6 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
defer dsc.queue.ShutDown()
glog.Infof("Starting Daemon Sets controller manager")
go dsc.dsController.Run(stopCh)
go dsc.podController.Run(stopCh)
go dsc.nodeController.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced) {
return
@@ -249,10 +199,6 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
go wait.Until(dsc.runWorker, time.Second, stopCh)
}
if dsc.internalPodInformer != nil {
go dsc.internalPodInformer.Run(stopCh)
}
<-stopCh
glog.Infof("Shutting down Daemon Set Controller")
}
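
The pod handlers registered in the constructor above exist to service the expectations mechanism ("we don't want a daemon set to create/delete more pods until all the effects (expectations) of a daemon set's create/delete have been observed"). A hedged sketch of how the sync path uses those fields, as it would sit inside this package — the createDaemonPods method name is illustrative and error handling is trimmed:

// Sketch: record expectations before creating pods, so the DS stays
// suspended until the corresponding watch events come back.
func (dsc *DaemonSetsController) createDaemonPods(ds *extensions.DaemonSet, nodesNeedingDaemonPods []string) {
	dsKey, _ := controller.KeyFunc(ds)
	dsc.expectations.ExpectCreations(dsKey, len(nodesNeedingDaemonPods))
	for _, node := range nodesNeedingDaemonPods {
		if err := dsc.podControl.CreatePodsOnNode(node, ds.Namespace, &ds.Spec.Template, ds); err != nil {
			// A failed create never produces a watch event, so decrement the
			// expectation now or this DS would stay suspended.
			dsc.expectations.CreationObserved(dsKey)
		}
	}
	// The addPod/deletePod handlers call CreationObserved/DeletionObserved;
	// SatisfiedExpectations(dsKey) then gates the next sync of this DS.
}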

@@ -29,7 +29,9 @@ import (
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/util/wait"
)
var (
@@ -136,7 +138,11 @@ func addPods(podStore cache.Store, nodeName string, label map[string]string, num
func newTestController() (*DaemonSetsController, *controller.FakePodControl) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewDaemonSetsControllerFromClient(clientset, controller.NoResyncPeriodFunc, 0)
informerFactory := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
manager := NewDaemonSetsController(informerFactory.DaemonSets(), informerFactory.Pods(), informerFactory.Nodes(), clientset, 0)
informerFactory.Start(wait.NeverStop)
manager.podStoreSynced = alwaysReady
manager.nodeStoreSynced = alwaysReady
podControl := &controller.FakePodControl{}
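
Note how the test stubs podStoreSynced/nodeStoreSynced with alwaysReady (the fake host at "" serves no real watches) and then seeds the shared caches directly through the listers' stores. A short sketch of that usage, assuming the newNode, newPod, and simpleDaemonSetLabel helpers defined elsewhere in this test file:

manager, podControl := newTestController()
// Writes to the listers' stores land in the informers' local caches,
// because Lister() wraps each informer's own store/indexer.
manager.nodeStore.Store.Add(newNode("node-0", nil))
manager.podStore.Indexer.Add(newPod("node-0-", "node-0", simpleDaemonSetLabel))
_ = podControl // asserted against after a sync in the real tests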

@@ -18,9 +18,13 @@ package informers
import (
"reflect"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
)
// PodInformer is a type of SharedIndexInformer which watches and lists all pods.
@@ -200,3 +204,94 @@ func (f *pvInformer) Lister() *cache.StoreToPVFetcher {
informer := f.Informer()
return &cache.StoreToPVFetcher{Store: informer.GetStore()}
}
// NewPodInformer returns a SharedIndexInformer that lists and watches all pods
func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
sharedIndexInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
return sharedIndexInformer
}
// NewNodeInformer returns a SharedIndexInformer that lists and watches all nodes
func NewNodeInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
sharedIndexInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().Nodes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().Nodes().Watch(options)
},
},
&api.Node{},
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
return sharedIndexInformer
}
// NewPVCInformer returns a SharedIndexInformer that lists and watches all PVCs
func NewPVCInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
sharedIndexInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
},
},
&api.PersistentVolumeClaim{},
resyncPeriod,
cache.Indexers{})
return sharedIndexInformer
}
// NewPVInformer returns a SharedIndexInformer that lists and watches all PVs
func NewPVInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
sharedIndexInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().PersistentVolumes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().PersistentVolumes().Watch(options)
},
},
&api.PersistentVolume{},
resyncPeriod,
cache.Indexers{})
return sharedIndexInformer
}
// NewNamespaceInformer returns a SharedIndexInformer that lists and watches namespaces
func NewNamespaceInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
sharedIndexInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().Namespaces().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().Namespaces().Watch(options)
},
},
&api.Namespace{},
resyncPeriod,
cache.Indexers{})
return sharedIndexInformer
}
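
These constructors, relocated here, also work without the factory; the caller then owns the informer goroutine. A minimal standalone sketch (the example package and watchPods wrapper are illustrative):

package example

import (
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/controller/informers"
	"k8s.io/kubernetes/pkg/util/wait"
)

func watchPods(client clientset.Interface) {
	podInformer := informers.NewPodInformer(client, 30*time.Second)
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*api.Pod)
			_ = pod // react to the new pod here
		},
	})
	// Unlike the factory path, the caller starts this informer itself.
	go podInformer.Run(wait.NeverStop)
	// Block until the initial list has been delivered to the local cache.
	cache.WaitForCacheSync(wait.NeverStop, podInformer.HasSynced)
}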

@@ -0,0 +1,70 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package informers
import (
"reflect"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
)
// DaemonSetInformer is a type of SharedIndexInformer which watches and lists all daemon sets.
// Interface provides constructor for informer and lister for daemon sets
type DaemonSetInformer interface {
Informer() cache.SharedIndexInformer
Lister() *cache.StoreToDaemonSetLister
}
type daemonSetInformer struct {
*sharedInformerFactory
}
func (f *daemonSetInformer) Informer() cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(&extensions.DaemonSet{})
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Extensions().DaemonSets(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Extensions().DaemonSets(api.NamespaceAll).Watch(options)
},
},
&extensions.DaemonSet{},
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer
return informer
}
func (f *daemonSetInformer) Lister() *cache.StoreToDaemonSetLister {
informer := f.Informer()
return &cache.StoreToDaemonSetLister{Store: informer.GetIndexer()}
}
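
Informer() takes the factory lock and memoizes on reflect.Type, so every consumer shares one DaemonSet informer (and one underlying watch) per factory, while Lister() reads from that informer's indexer. A small sketch (the example package and demoSharing wrapper are illustrative; the no-argument List signature is assumed from this era's listers):

package example

import (
	"fmt"
	"time"

	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/controller/informers"
)

func demoSharing(client clientset.Interface) {
	factory := informers.NewSharedInformerFactory(client, 10*time.Minute)

	a := factory.DaemonSets().Informer()
	b := factory.DaemonSets().Informer()
	fmt.Println(a == b) // true: the second call returns the memoized informer

	// Reads go to the local cache, not the API server.
	dsLister := factory.DaemonSets().Lister()
	dsList, err := dsLister.List() // assumed no-arg List of this era's lister
	_, _ = dsList, err
}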

@@ -21,11 +21,8 @@ import (
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
)
// SharedInformerFactory provides an interface which holds unique informers for pods, nodes, namespaces, persistent volume
@@ -39,6 +36,8 @@ type SharedInformerFactory interface {
Namespaces() NamespaceInformer
PersistentVolumeClaims() PVCInformer
PersistentVolumes() PVInformer
DaemonSets() DaemonSetInformer
}
type sharedInformerFactory struct {
@@ -100,93 +99,6 @@ func (f *sharedInformerFactory) PersistentVolumes() PVInformer {
return &pvInformer{sharedInformerFactory: f}
}
// NewPodInformer returns a SharedIndexInformer that lists and watches all pods
func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
sharedIndexInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
return sharedIndexInformer
}
// NewNodeInformer returns a SharedIndexInformer that lists and watches all nodes
func NewNodeInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
sharedIndexInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().Nodes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().Nodes().Watch(options)
},
},
&api.Node{},
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
return sharedIndexInformer
}
// NewPVCInformer returns a SharedIndexInformer that lists and watches all PVCs
func NewPVCInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
sharedIndexInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
},
},
&api.PersistentVolumeClaim{},
resyncPeriod,
cache.Indexers{})
return sharedIndexInformer
}
// NewPVInformer returns a SharedIndexInformer that lists and watches all PVs
func NewPVInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
sharedIndexInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().PersistentVolumes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().PersistentVolumes().Watch(options)
},
},
&api.PersistentVolume{},
resyncPeriod,
cache.Indexers{})
return sharedIndexInformer
}
// NewNamespaceInformer returns a SharedIndexInformer that lists and watches namespaces
func NewNamespaceInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
sharedIndexInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().Namespaces().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().Namespaces().Watch(options)
},
},
&api.Namespace{},
resyncPeriod,
cache.Indexers{})
return sharedIndexInformer
func (f *sharedInformerFactory) DaemonSets() DaemonSetInformer {
return &daemonSetInformer{sharedInformerFactory: f}
}
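
The controller-manager hunks above pair this accessor with informerFactory.Start. Start itself is outside this diff; assuming the factory's usual bookkeeping (its lock, the informers map, and a startedInformers map, the latter not shown in these hunks), it would have roughly this shape — a sketch, not the file's verbatim body:

// Sketch (assumed): start each memoized informer exactly once, so Start can
// be called again safely after more informer types have been requested.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}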