diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index cefe43fead..8f9283856d 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -235,6 +235,16 @@ func (s *CMServer) ResyncPeriod() time.Duration {
 	return time.Duration(float64(s.MinResyncPeriod.Nanoseconds()) * factor)
 }
 
+func clientForUserAgentOrDie(config client.Config, userAgent string) *client.Client {
+	fullUserAgent := client.DefaultKubernetesUserAgent() + "/" + userAgent
+	config.UserAgent = fullUserAgent
+	kubeClient, err := client.New(&config)
+	if err != nil {
+		glog.Fatalf("Invalid API configuration: %v", err)
+	}
+	return kubeClient
+}
+
 // Run runs the CMServer. This should never exit.
 func (s *CMServer) Run(_ []string) error {
 	kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
@@ -268,14 +278,17 @@ func (s *CMServer) Run(_ []string) error {
 		glog.Fatal(server.ListenAndServe())
 	}()
 
-	go endpointcontroller.NewEndpointController(kubeClient, s.ResyncPeriod).
+	go endpointcontroller.NewEndpointController(clientForUserAgentOrDie(*kubeconfig, "endpoint-controller"), s.ResyncPeriod).
 		Run(s.ConcurrentEndpointSyncs, util.NeverStop)
 
-	go replicationcontroller.NewReplicationManager(kubeClient, s.ResyncPeriod, replicationcontroller.BurstReplicas).
-		Run(s.ConcurrentRCSyncs, util.NeverStop)
+	go replicationcontroller.NewReplicationManager(
+		clientForUserAgentOrDie(*kubeconfig, "replication-controller"),
+		s.ResyncPeriod,
+		replicationcontroller.BurstReplicas,
+	).Run(s.ConcurrentRCSyncs, util.NeverStop)
 
 	if s.TerminatedPodGCThreshold > 0 {
-		go gc.New(kubeClient, s.ResyncPeriod, s.TerminatedPodGCThreshold).
+		go gc.New(clientForUserAgentOrDie(*kubeconfig, "garbage-collector"), s.ResyncPeriod, s.TerminatedPodGCThreshold).
 			Run(util.NeverStop)
 	}
 
@@ -284,13 +297,13 @@ func (s *CMServer) Run(_ []string) error {
 		glog.Fatalf("Cloud provider could not be initialized: %v", err)
 	}
 
-	nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
+	nodeController := nodecontroller.NewNodeController(cloud, clientForUserAgentOrDie(*kubeconfig, "node-controller"),
 		s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
 		util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
 		s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, &s.ClusterCIDR, s.AllocateNodeCIDRs)
 	nodeController.Run(s.NodeSyncPeriod)
 
-	serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName)
+	serviceController := servicecontroller.New(cloud, clientForUserAgentOrDie(*kubeconfig, "service-controller"), s.ClusterName)
 	if err := serviceController.Run(s.ServiceSyncPeriod, s.NodeSyncPeriod); err != nil {
 		glog.Errorf("Failed to start service controller: %v", err)
 	}
@@ -301,7 +314,7 @@ func (s *CMServer) Run(_ []string) error {
 		} else if routes, ok := cloud.Routes(); !ok {
 			glog.Warning("allocate-node-cidrs is set, but cloud provider does not support routes. Will not manage routes.")
 		} else {
-			routeController := routecontroller.New(routes, kubeClient, s.ClusterName, &s.ClusterCIDR)
+			routeController := routecontroller.New(routes, clientForUserAgentOrDie(*kubeconfig, "route-controller"), s.ClusterName, &s.ClusterCIDR)
 			routeController.Run(s.NodeSyncPeriod)
 		}
 	} else {
@@ -309,7 +322,8 @@ func (s *CMServer) Run(_ []string) error {
 	}
 
 	go resourcequotacontroller.NewResourceQuotaController(
-		kubeClient, controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, util.NeverStop)
+		clientForUserAgentOrDie(*kubeconfig, "resourcequota-controller"),
+		controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, util.NeverStop)
 
 	// If apiserver is not running we should wait for some time and fail only then. This is particularly
 	// important when we start apiserver and controller manager at the same time.
@@ -331,7 +345,7 @@ func (s *CMServer) Run(_ []string) error {
 		glog.Fatalf("Failed to get supported resources from server: %v", err)
 	}
 
-	namespacecontroller.NewNamespaceController(kubeClient, versions, s.NamespaceSyncPeriod).Run()
+	namespacecontroller.NewNamespaceController(clientForUserAgentOrDie(*kubeconfig, "namespace-controller"), versions, s.NamespaceSyncPeriod).Run()
 
 	groupVersion := "extensions/v1beta1"
 	resources, found := resourceMap[groupVersion]
@@ -340,34 +354,41 @@ func (s *CMServer) Run(_ []string) error {
 		glog.Infof("Starting %s apis", groupVersion)
 		if containsResource(resources, "horizontalpodautoscalers") {
 			glog.Infof("Starting horizontal pod controller.")
-			metricsClient := metrics.NewHeapsterMetricsClient(kubeClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort)
-			podautoscaler.NewHorizontalController(kubeClient, metricsClient).
+			hpaClient := clientForUserAgentOrDie(*kubeconfig, "horizontal-pod-autoscaler")
+			metricsClient := metrics.NewHeapsterMetricsClient(
+				hpaClient,
+				metrics.DefaultHeapsterNamespace,
+				metrics.DefaultHeapsterScheme,
+				metrics.DefaultHeapsterService,
+				metrics.DefaultHeapsterPort,
+			)
+			podautoscaler.NewHorizontalController(hpaClient, metricsClient).
 				Run(s.HorizontalPodAutoscalerSyncPeriod)
 		}
 
 		if containsResource(resources, "daemonsets") {
 			glog.Infof("Starting daemon set controller")
-			go daemon.NewDaemonSetsController(kubeClient, s.ResyncPeriod).
+			go daemon.NewDaemonSetsController(clientForUserAgentOrDie(*kubeconfig, "daemon-set-controller"), s.ResyncPeriod).
 				Run(s.ConcurrentDSCSyncs, util.NeverStop)
 		}
 
 		if containsResource(resources, "jobs") {
 			glog.Infof("Starting job controller")
-			go job.NewJobController(kubeClient, s.ResyncPeriod).
+			go job.NewJobController(clientForUserAgentOrDie(*kubeconfig, "job-controller"), s.ResyncPeriod).
 				Run(s.ConcurrentJobSyncs, util.NeverStop)
 		}
 
 		if containsResource(resources, "deployments") {
 			glog.Infof("Starting deployment controller")
-			deployment.New(kubeClient).
+			deployment.New(clientForUserAgentOrDie(*kubeconfig, "deployment-controller")).
 				Run(s.DeploymentControllerSyncPeriod)
 		}
 	}
 
-	pvclaimBinder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod)
+	pvclaimBinder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-binder"), s.PVClaimBinderSyncPeriod)
 	pvclaimBinder.Run()
 
-	pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins(s.VolumeConfigFlags))
+	pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-recycler"), s.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins(s.VolumeConfigFlags))
 	if err != nil {
 		glog.Fatalf("Failed to start persistent volume recycler: %+v", err)
 	}
@@ -393,7 +414,7 @@ func (s *CMServer) Run(_ []string) error {
 		glog.Errorf("Error reading key for service account token controller: %v", err)
 	} else {
 		serviceaccount.NewTokensController(
-			kubeClient,
+			clientForUserAgentOrDie(*kubeconfig, "tokens-controller"),
 			serviceaccount.TokensControllerOptions{
 				TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
 				RootCA:         rootCA,
@@ -403,7 +424,7 @@ func (s *CMServer) Run(_ []string) error {
 	}
 
 	serviceaccount.NewServiceAccountsController(
-		kubeClient,
+		clientForUserAgentOrDie(*kubeconfig, "service-account-controller"),
 		serviceaccount.DefaultServiceAccountsControllerOptions(),
 	).Run()
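The key detail in the new helper is that `client.Config` is passed *by value*: setting `UserAgent` on the local copy never mutates the shared base config built from the kubeconfig, so each controller gets its own client and the roughly fifteen call sites above cannot clobber each other's user agents. Below is a minimal, self-contained Go sketch of that pattern; `Config`, `Client`, `New`, and `defaultUserAgent` are illustrative stand-ins for the real `client` package types, not the actual Kubernetes API.

```go
package main

import (
	"fmt"
	"log"
)

// Config mirrors the relevant fields of client.Config from the diff.
// These names are hypothetical stand-ins for illustration.
type Config struct {
	Host      string
	UserAgent string
}

// Client stands in for *client.Client.
type Client struct {
	config Config
}

// New fails on an invalid configuration, like client.New in the diff.
func New(c *Config) (*Client, error) {
	if c.Host == "" {
		return nil, fmt.Errorf("host must be set")
	}
	return &Client{config: *c}, nil
}

// defaultUserAgent stands in for client.DefaultKubernetesUserAgent(),
// which yields a string identifying the binary, version, and platform.
func defaultUserAgent() string {
	return "kube-controller-manager/v0.0.0 (linux/amd64) kubernetes/unknown"
}

// clientForUserAgentOrDie follows the helper in the diff: config arrives
// by value, so overriding UserAgent here only affects this copy, and each
// caller gets a fresh client tagged with its own controller name.
func clientForUserAgentOrDie(config Config, userAgent string) *Client {
	config.UserAgent = defaultUserAgent() + "/" + userAgent
	kubeClient, err := New(&config)
	if err != nil {
		log.Fatalf("Invalid API configuration: %v", err)
	}
	return kubeClient
}

func main() {
	base := Config{Host: "https://127.0.0.1:6443"}

	// One client per controller, each self-identifying in its User-Agent.
	for _, name := range []string{"endpoint-controller", "replication-controller"} {
		c := clientForUserAgentOrDie(base, name)
		fmt.Println(c.config.UserAgent)
	}

	// The shared base config is untouched: pass-by-value protected it.
	fmt.Printf("base unchanged: %q\n", base.UserAgent)
}
```

The payoff is operational: apiserver access logs and per-client request metrics can now attribute traffic to an individual controller via a User-Agent of the form `kube-controller-manager/<version> (<os>/<arch>) kubernetes/<commit>/endpoint-controller`, rather than seeing one undifferentiated client for the whole controller manager.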