demonstrate separation of controller intializers

pull/6/head
deads2k 2016-12-02 14:18:16 -05:00
parent 45a436ac24
commit 5788317953
3 changed files with 152 additions and 68 deletions

View File

@ -181,7 +181,7 @@ func Run(s *options.CMServer) error {
clientBuilder = rootClientBuilder
}
err := StartControllers(s, rootClientBuilder, clientBuilder, stop)
err := StartControllers(newControllerInitializers(), s, rootClientBuilder, clientBuilder, stop)
glog.Fatalf("error running controllers: %v", err)
panic("unreachable")
}
@ -224,6 +224,127 @@ func Run(s *options.CMServer) error {
panic("unreachable")
}
type ControllerContext struct {
// ClientBuilder will provide a client for this controller to use
ClientBuilder controller.ControllerClientBuilder
// InformerFactory gives access to informers for the controller
InformerFactory informers.SharedInformerFactory
// Options provides access to init options for a given controller
Options options.CMServer
// AvailableResources is a map listing currently available resources
AvailableResources map[schema.GroupVersionResource]bool
// Stop is the stop channel
Stop <-chan struct{}
}
// InitFunc is used to launch a particular controller. It may run additional "should I activate checks".
// Any error returned will cause the controller process to `Fatal`
type InitFunc func(ctx ControllerContext) (bool, error)
func newControllerInitializers() map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["endpoint"] = startEndpointController
controllers["replicationcontroller"] = startReplicationController
controllers["podgc"] = startEndpointController
controllers["resourcequota"] = startResourceQuotaController
controllers["namespace"] = startNamespaceController
return controllers
}
func startEndpointController(ctx ControllerContext) (bool, error) {
go endpointcontroller.NewEndpointController(
ctx.InformerFactory.Pods().Informer(),
ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
).Run(int(ctx.Options.ConcurrentEndpointSyncs), ctx.Stop)
return true, nil
}
func startReplicationController(ctx ControllerContext) (bool, error) {
go replicationcontroller.NewReplicationManager(
ctx.InformerFactory.Pods().Informer(),
ctx.ClientBuilder.ClientOrDie("replication-controller"),
ResyncPeriod(&ctx.Options),
replicationcontroller.BurstReplicas,
int(ctx.Options.LookupCacheSizeForRC),
ctx.Options.EnableGarbageCollector,
).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop)
return true, nil
}
func startPodGCController(ctx ControllerContext) (bool, error) {
go podgc.NewPodGC(
ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"),
ctx.InformerFactory.Pods().Informer(),
int(ctx.Options.TerminatedPodGCThreshold),
).Run(ctx.Stop)
return true, nil
}
func startResourceQuotaController(ctx ControllerContext) (bool, error) {
resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.InformerFactory)
groupKindsToReplenish := []schema.GroupKind{
api.Kind("Pod"),
api.Kind("Service"),
api.Kind("ReplicationController"),
api.Kind("PersistentVolumeClaim"),
api.Kind("Secret"),
api.Kind("ConfigMap"),
}
resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
KubeClient: resourceQuotaControllerClient,
ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.Options.ResourceQuotaSyncPeriod.Duration),
Registry: resourceQuotaRegistry,
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(ctx.InformerFactory, resourceQuotaControllerClient),
ReplenishmentResyncPeriod: ResyncPeriod(&ctx.Options),
GroupKindsToReplenish: groupKindsToReplenish,
}
go resourcequotacontroller.NewResourceQuotaController(
resourceQuotaControllerOptions,
).Run(int(ctx.Options.ConcurrentResourceQuotaSyncs), ctx.Stop)
return true, nil
}
func startNamespaceController(ctx ControllerContext) (bool, error) {
// TODO: should use a dynamic RESTMapper built from the discovery results.
restMapper := registered.RESTMapper()
// Find the list of namespaced resources via discovery that the namespace controller must manage
namespaceKubeClient := ctx.ClientBuilder.ClientOrDie("namespace-controller")
namespaceClientPool := dynamic.NewClientPool(ctx.ClientBuilder.ConfigOrDie("namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc)
// TODO: consider using a list-watch + cache here rather than polling
resources, err := namespaceKubeClient.Discovery().ServerResources()
if err != nil {
return true, fmt.Errorf("failed to get preferred server resources: %v", err)
}
gvrs, err := discovery.GroupVersionResources(resources)
if err != nil {
return true, fmt.Errorf("failed to parse preferred server resources: %v", err)
}
discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
if _, found := gvrs[extensions.SchemeGroupVersion.WithResource("thirdpartyresource")]; found {
// make discovery static
snapshot, err := discoverResourcesFn()
if err != nil {
return true, fmt.Errorf("failed to get server resources: %v", err)
}
discoverResourcesFn = func() ([]*metav1.APIResourceList, error) {
return snapshot, nil
}
}
namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, discoverResourcesFn, ctx.Options.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes)
go namespaceController.Run(int(ctx.Options.ConcurrentNamespaceSyncs), ctx.Stop)
return true, nil
}
// TODO: In general, any controller checking this needs to be dynamic so
// users don't have to restart their controller manager if they change the apiserver.
func getAvailableResources(clientBuilder controller.ControllerClientBuilder) (map[schema.GroupVersionResource]bool, error) {
@ -264,7 +385,7 @@ func getAvailableResources(clientBuilder controller.ControllerClientBuilder) (ma
return allResources, nil
}
func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error {
func StartControllers(controllers map[string]InitFunc, s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error {
sharedInformers := informers.NewSharedInformerFactory(rootClientBuilder.ClientOrDie("shared-informers"), nil, ResyncPeriod(s)())
// always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
@ -302,23 +423,28 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
return err
}
go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), clientBuilder.ClientOrDie("endpoint-controller")).
Run(int(s.ConcurrentEndpointSyncs), stop)
ctx := ControllerContext{
ClientBuilder: clientBuilder,
InformerFactory: sharedInformers,
Options: *s,
AvailableResources: availableResources,
Stop: stop,
}
for controllerName, initFn := range controllers {
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
go replicationcontroller.NewReplicationManager(
sharedInformers.Pods().Informer(),
clientBuilder.ClientOrDie("replication-controller"),
ResyncPeriod(s),
replicationcontroller.BurstReplicas,
int(s.LookupCacheSizeForRC),
s.EnableGarbageCollector,
).Run(int(s.ConcurrentRCSyncs), stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
go podgc.NewPodGC(clientBuilder.ClientOrDie("pod-garbage-collector"), sharedInformers.Pods().Informer(),
int(s.TerminatedPodGCThreshold)).Run(stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
glog.V(1).Infof("Starting %q", controllerName)
started, err := initFn(ctx)
if err != nil {
glog.Errorf("Error starting %q", controllerName)
return err
}
if !started {
glog.Warningf("Skipping %q", controllerName)
}
glog.Infof("Started %q", controllerName)
}
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
@ -367,57 +493,6 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes)
}
resourceQuotaControllerClient := clientBuilder.ClientOrDie("resourcequota-controller")
resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, sharedInformers)
groupKindsToReplenish := []schema.GroupKind{
api.Kind("Pod"),
api.Kind("Service"),
api.Kind("ReplicationController"),
api.Kind("PersistentVolumeClaim"),
api.Kind("Secret"),
api.Kind("ConfigMap"),
}
resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
KubeClient: resourceQuotaControllerClient,
ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration),
Registry: resourceQuotaRegistry,
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(sharedInformers, resourceQuotaControllerClient),
ReplenishmentResyncPeriod: ResyncPeriod(s),
GroupKindsToReplenish: groupKindsToReplenish,
}
go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(int(s.ConcurrentResourceQuotaSyncs), stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
// TODO: should use a dynamic RESTMapper built from the discovery results.
restMapper := registered.RESTMapper()
// Find the list of namespaced resources via discovery that the namespace controller must manage
namespaceKubeClient := clientBuilder.ClientOrDie("namespace-controller")
namespaceClientPool := dynamic.NewClientPool(rootClientBuilder.ConfigOrDie("namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc)
// TODO: consider using a list-watch + cache here rather than polling
resources, err := namespaceKubeClient.Discovery().ServerResources()
if err != nil {
return fmt.Errorf("failed to get preferred server resources: %v", err)
}
gvrs, err := discovery.GroupVersionResources(resources)
if err != nil {
return fmt.Errorf("failed to parse preferred server resources: %v", err)
}
discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
if _, found := gvrs[extensions.SchemeGroupVersion.WithResource("thirdpartyresource")]; found {
// make discovery static
snapshot, err := discoverResourcesFn()
if err != nil {
return fmt.Errorf("failed to get server resources: %v", err)
}
discoverResourcesFn = func() ([]*metav1.APIResourceList, error) {
return snapshot, nil
}
}
namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, discoverResourcesFn, s.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes)
go namespaceController.Run(int(s.ConcurrentNamespaceSyncs), stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "daemonsets"}] {
go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), clientBuilder.ClientOrDie("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)).
Run(int(s.ConcurrentDaemonSetSyncs), stop)
@ -546,6 +621,9 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
if s.EnableGarbageCollector {
// TODO: should use a dynamic RESTMapper built from the discovery results.
restMapper := registered.RESTMapper()
gcClientset := clientBuilder.ClientOrDie("generic-garbage-collector")
preferredResources, err := gcClientset.Discovery().ServerPreferredResources()
if err != nil {

View File

@ -39,5 +39,6 @@ go_library(
"//pkg/runtime:go_default_library",
"//pkg/runtime/schema:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/golang/glog",
],
)

View File

@ -21,6 +21,8 @@ import (
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
@ -90,8 +92,11 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
glog.V(1).Infoln("Starting informer factory")
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
glog.V(2).Infof("Starting informer for %v", informerType)
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}