diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index eb8c722dd8..d8e1fe3e9f 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -181,7 +181,7 @@ func Run(s *options.CMServer) error { clientBuilder = rootClientBuilder } - err := StartControllers(s, rootClientBuilder, clientBuilder, stop) + err := StartControllers(newControllerInitializers(), s, rootClientBuilder, clientBuilder, stop) glog.Fatalf("error running controllers: %v", err) panic("unreachable") } @@ -224,6 +224,127 @@ func Run(s *options.CMServer) error { panic("unreachable") } +type ControllerContext struct { + // ClientBuilder will provide a client for this controller to use + ClientBuilder controller.ControllerClientBuilder + + // InformerFactory gives access to informers for the controller + InformerFactory informers.SharedInformerFactory + + // Options provides access to init options for a given controller + Options options.CMServer + + // AvailableResources is a map listing currently available resources + AvailableResources map[schema.GroupVersionResource]bool + + // Stop is the stop channel + Stop <-chan struct{} +} + +// InitFunc is used to launch a particular controller. It may run additional "should I activate checks". +// Any error returned will cause the controller process to `Fatal` +type InitFunc func(ctx ControllerContext) (bool, error) + +func newControllerInitializers() map[string]InitFunc { + controllers := map[string]InitFunc{} + controllers["endpoint"] = startEndpointController + controllers["replicationcontroller"] = startReplicationController + controllers["podgc"] = startEndpointController + controllers["resourcequota"] = startResourceQuotaController + controllers["namespace"] = startNamespaceController + + return controllers +} + +func startEndpointController(ctx ControllerContext) (bool, error) { + go endpointcontroller.NewEndpointController( + ctx.InformerFactory.Pods().Informer(), + ctx.ClientBuilder.ClientOrDie("endpoint-controller"), + ).Run(int(ctx.Options.ConcurrentEndpointSyncs), ctx.Stop) + return true, nil +} + +func startReplicationController(ctx ControllerContext) (bool, error) { + go replicationcontroller.NewReplicationManager( + ctx.InformerFactory.Pods().Informer(), + ctx.ClientBuilder.ClientOrDie("replication-controller"), + ResyncPeriod(&ctx.Options), + replicationcontroller.BurstReplicas, + int(ctx.Options.LookupCacheSizeForRC), + ctx.Options.EnableGarbageCollector, + ).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop) + return true, nil +} + +func startPodGCController(ctx ControllerContext) (bool, error) { + go podgc.NewPodGC( + ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"), + ctx.InformerFactory.Pods().Informer(), + int(ctx.Options.TerminatedPodGCThreshold), + ).Run(ctx.Stop) + return true, nil +} + +func startResourceQuotaController(ctx ControllerContext) (bool, error) { + resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller") + resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.InformerFactory) + groupKindsToReplenish := []schema.GroupKind{ + api.Kind("Pod"), + api.Kind("Service"), + api.Kind("ReplicationController"), + api.Kind("PersistentVolumeClaim"), + api.Kind("Secret"), + api.Kind("ConfigMap"), + } + resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{ + KubeClient: resourceQuotaControllerClient, + ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.Options.ResourceQuotaSyncPeriod.Duration), + Registry: resourceQuotaRegistry, + ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(ctx.InformerFactory, resourceQuotaControllerClient), + ReplenishmentResyncPeriod: ResyncPeriod(&ctx.Options), + GroupKindsToReplenish: groupKindsToReplenish, + } + go resourcequotacontroller.NewResourceQuotaController( + resourceQuotaControllerOptions, + ).Run(int(ctx.Options.ConcurrentResourceQuotaSyncs), ctx.Stop) + + return true, nil +} + +func startNamespaceController(ctx ControllerContext) (bool, error) { + // TODO: should use a dynamic RESTMapper built from the discovery results. + restMapper := registered.RESTMapper() + + // Find the list of namespaced resources via discovery that the namespace controller must manage + namespaceKubeClient := ctx.ClientBuilder.ClientOrDie("namespace-controller") + namespaceClientPool := dynamic.NewClientPool(ctx.ClientBuilder.ConfigOrDie("namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc) + // TODO: consider using a list-watch + cache here rather than polling + resources, err := namespaceKubeClient.Discovery().ServerResources() + if err != nil { + return true, fmt.Errorf("failed to get preferred server resources: %v", err) + } + gvrs, err := discovery.GroupVersionResources(resources) + if err != nil { + return true, fmt.Errorf("failed to parse preferred server resources: %v", err) + } + discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources + if _, found := gvrs[extensions.SchemeGroupVersion.WithResource("thirdpartyresource")]; found { + // make discovery static + snapshot, err := discoverResourcesFn() + if err != nil { + return true, fmt.Errorf("failed to get server resources: %v", err) + } + discoverResourcesFn = func() ([]*metav1.APIResourceList, error) { + return snapshot, nil + } + } + namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, discoverResourcesFn, ctx.Options.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes) + go namespaceController.Run(int(ctx.Options.ConcurrentNamespaceSyncs), ctx.Stop) + + return true, nil + +} + // TODO: In general, any controller checking this needs to be dynamic so // users don't have to restart their controller manager if they change the apiserver. func getAvailableResources(clientBuilder controller.ControllerClientBuilder) (map[schema.GroupVersionResource]bool, error) { @@ -264,7 +385,7 @@ func getAvailableResources(clientBuilder controller.ControllerClientBuilder) (ma return allResources, nil } -func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error { +func StartControllers(controllers map[string]InitFunc, s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error { sharedInformers := informers.NewSharedInformerFactory(rootClientBuilder.ClientOrDie("shared-informers"), nil, ResyncPeriod(s)()) // always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest @@ -302,23 +423,28 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont return err } - go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), clientBuilder.ClientOrDie("endpoint-controller")). - Run(int(s.ConcurrentEndpointSyncs), stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + ctx := ControllerContext{ + ClientBuilder: clientBuilder, + InformerFactory: sharedInformers, + Options: *s, + AvailableResources: availableResources, + Stop: stop, + } - go replicationcontroller.NewReplicationManager( - sharedInformers.Pods().Informer(), - clientBuilder.ClientOrDie("replication-controller"), - ResyncPeriod(s), - replicationcontroller.BurstReplicas, - int(s.LookupCacheSizeForRC), - s.EnableGarbageCollector, - ).Run(int(s.ConcurrentRCSyncs), stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + for controllerName, initFn := range controllers { + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - go podgc.NewPodGC(clientBuilder.ClientOrDie("pod-garbage-collector"), sharedInformers.Pods().Informer(), - int(s.TerminatedPodGCThreshold)).Run(stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + glog.V(1).Infof("Starting %q", controllerName) + started, err := initFn(ctx) + if err != nil { + glog.Errorf("Error starting %q", controllerName) + return err + } + if !started { + glog.Warningf("Skipping %q", controllerName) + } + glog.Infof("Started %q", controllerName) + } cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) if err != nil { @@ -367,57 +493,6 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes) } - resourceQuotaControllerClient := clientBuilder.ClientOrDie("resourcequota-controller") - resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, sharedInformers) - groupKindsToReplenish := []schema.GroupKind{ - api.Kind("Pod"), - api.Kind("Service"), - api.Kind("ReplicationController"), - api.Kind("PersistentVolumeClaim"), - api.Kind("Secret"), - api.Kind("ConfigMap"), - } - resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{ - KubeClient: resourceQuotaControllerClient, - ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration), - Registry: resourceQuotaRegistry, - ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(sharedInformers, resourceQuotaControllerClient), - ReplenishmentResyncPeriod: ResyncPeriod(s), - GroupKindsToReplenish: groupKindsToReplenish, - } - go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(int(s.ConcurrentResourceQuotaSyncs), stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - - // TODO: should use a dynamic RESTMapper built from the discovery results. - restMapper := registered.RESTMapper() - - // Find the list of namespaced resources via discovery that the namespace controller must manage - namespaceKubeClient := clientBuilder.ClientOrDie("namespace-controller") - namespaceClientPool := dynamic.NewClientPool(rootClientBuilder.ConfigOrDie("namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc) - // TODO: consider using a list-watch + cache here rather than polling - resources, err := namespaceKubeClient.Discovery().ServerResources() - if err != nil { - return fmt.Errorf("failed to get preferred server resources: %v", err) - } - gvrs, err := discovery.GroupVersionResources(resources) - if err != nil { - return fmt.Errorf("failed to parse preferred server resources: %v", err) - } - discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources - if _, found := gvrs[extensions.SchemeGroupVersion.WithResource("thirdpartyresource")]; found { - // make discovery static - snapshot, err := discoverResourcesFn() - if err != nil { - return fmt.Errorf("failed to get server resources: %v", err) - } - discoverResourcesFn = func() ([]*metav1.APIResourceList, error) { - return snapshot, nil - } - } - namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, discoverResourcesFn, s.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes) - go namespaceController.Run(int(s.ConcurrentNamespaceSyncs), stop) - time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) - if availableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "daemonsets"}] { go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), clientBuilder.ClientOrDie("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)). Run(int(s.ConcurrentDaemonSetSyncs), stop) @@ -546,6 +621,9 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) if s.EnableGarbageCollector { + // TODO: should use a dynamic RESTMapper built from the discovery results. + restMapper := registered.RESTMapper() + gcClientset := clientBuilder.ClientOrDie("generic-garbage-collector") preferredResources, err := gcClientset.Discovery().ServerPreferredResources() if err != nil { diff --git a/pkg/controller/informers/BUILD b/pkg/controller/informers/BUILD index 784d85d9d3..1a5eb62dd2 100644 --- a/pkg/controller/informers/BUILD +++ b/pkg/controller/informers/BUILD @@ -39,5 +39,6 @@ go_library( "//pkg/runtime:go_default_library", "//pkg/runtime/schema:go_default_library", "//pkg/watch:go_default_library", + "//vendor:github.com/golang/glog", ], ) diff --git a/pkg/controller/informers/factory.go b/pkg/controller/informers/factory.go index 04a0f9b154..6e47086e55 100644 --- a/pkg/controller/informers/factory.go +++ b/pkg/controller/informers/factory.go @@ -21,6 +21,8 @@ import ( "sync" "time" + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" @@ -90,8 +92,11 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { f.lock.Lock() defer f.lock.Unlock() + glog.V(1).Infoln("Starting informer factory") + for informerType, informer := range f.informers { if !f.startedInformers[informerType] { + glog.V(2).Infof("Starting informer for %v", informerType) go informer.Run(stopCh) f.startedInformers[informerType] = true }