diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index 9341b640d5..60a713d82a 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -36,10 +36,15 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider/providers/mesos" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/daemon" - kendpoint "k8s.io/kubernetes/pkg/controller/endpoint" + "k8s.io/kubernetes/pkg/controller/deployment" + endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint" + "k8s.io/kubernetes/pkg/controller/gc" + "k8s.io/kubernetes/pkg/controller/job" namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace" nodecontroller "k8s.io/kubernetes/pkg/controller/node" persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/persistentvolume" + "k8s.io/kubernetes/pkg/controller/podautoscaler" + "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota" routecontroller "k8s.io/kubernetes/pkg/controller/route" @@ -48,6 +53,7 @@ import ( "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/serviceaccount" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/contrib/mesos/pkg/profile" kmendpoint "k8s.io/kubernetes/contrib/mesos/pkg/service" @@ -120,14 +126,16 @@ func (s *CMServer) Run(_ []string) error { glog.Fatal(server.ListenAndServe()) }() - endpoints := s.createEndpointController(kubeClient) + endpoints := s.createEndpointController(clientForUserAgentOrDie(*kubeconfig, "endpoint-controller")) go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop) - go replicationcontroller.NewReplicationManager(kubeClient, s.resyncPeriod, replicationcontroller.BurstReplicas). + go replicationcontroller.NewReplicationManager(clientForUserAgentOrDie(*kubeconfig, "replication-controller"), s.resyncPeriod, replicationcontroller.BurstReplicas). Run(s.ConcurrentRCSyncs, util.NeverStop) - go daemon.NewDaemonSetsController(kubeClient, s.resyncPeriod). - Run(s.ConcurrentDSCSyncs, util.NeverStop) + if s.TerminatedPodGCThreshold > 0 { + go gc.New(clientForUserAgentOrDie(*kubeconfig, "garbage-collector"), s.resyncPeriod, s.TerminatedPodGCThreshold). + Run(util.NeverStop) + } //TODO(jdef) should eventually support more cloud providers here if s.CloudProvider != mesos.ProviderName { @@ -138,18 +146,18 @@ func (s *CMServer) Run(_ []string) error { glog.Fatalf("Cloud provider could not be initialized: %v", err) } - nodeController := nodecontroller.NewNodeController(cloud, kubeClient, + nodeController := nodecontroller.NewNodeController(cloud, clientForUserAgentOrDie(*kubeconfig, "node-controller"), s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs) nodeController.Run(s.NodeSyncPeriod) - nodeStatusUpdaterController := node.NewStatusUpdater(kubeClient, s.NodeMonitorPeriod, time.Now) + nodeStatusUpdaterController := node.NewStatusUpdater(clientForUserAgentOrDie(*kubeconfig, "node-status-controller"), s.NodeMonitorPeriod, time.Now) if err := nodeStatusUpdaterController.Run(util.NeverStop); err != nil { glog.Fatalf("Failed to start node status update controller: %v", err) } - serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName) + serviceController := servicecontroller.New(cloud, clientForUserAgentOrDie(*kubeconfig, "service-controller"), s.ClusterName) if err := serviceController.Run(s.ServiceSyncPeriod, s.NodeSyncPeriod); err != nil { glog.Errorf("Failed to start service controller: %v", err) } @@ -159,33 +167,91 @@ func (s *CMServer) Run(_ []string) error { if !ok { glog.Fatal("Cloud provider must support routes if allocate-node-cidrs is set") } - routeController := routecontroller.New(routes, kubeClient, s.ClusterName, (*net.IPNet)(&s.ClusterCIDR)) + routeController := routecontroller.New(routes, clientForUserAgentOrDie(*kubeconfig, "route-controller"), s.ClusterName, (*net.IPNet)(&s.ClusterCIDR)) routeController.Run(s.NodeSyncPeriod) } go resourcequotacontroller.NewResourceQuotaController( - kubeClient, controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, util.NeverStop) + clientForUserAgentOrDie(*kubeconfig, "resource-quota-controller"), controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, util.NeverStop) - namespaceController := namespacecontroller.NewNamespaceController(kubeClient, &unversioned.APIVersions{}, s.NamespaceSyncPeriod) + // If apiserver is not running we should wait for some time and fail only then. This is particularly + // important when we start apiserver and controller manager at the same time. + var versionStrings []string + err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) { + if versionStrings, err = client.ServerAPIVersions(kubeconfig); err == nil { + return true, nil + } + glog.Errorf("Failed to get api versions from server: %v", err) + return false, nil + }) + if err != nil { + glog.Fatalf("Failed to get api versions from server: %v", err) + } + versions := &unversioned.APIVersions{Versions: versionStrings} + + resourceMap, err := kubeClient.Discovery().ServerResources() + if err != nil { + glog.Fatalf("Failed to get supported resources from server: %v", err) + } + + namespaceController := namespacecontroller.NewNamespaceController(clientForUserAgentOrDie(*kubeconfig, "namespace-controller"), &unversioned.APIVersions{}, s.NamespaceSyncPeriod) namespaceController.Run() + groupVersion := "extensions/v1beta1" + resources, found := resourceMap[groupVersion] + // TODO(k8s): this needs to be dynamic so users don't have to restart their controller manager if they change the apiserver + if containsVersion(versions, groupVersion) && found { + glog.Infof("Starting %s apis", groupVersion) + if containsResource(resources, "horizontalpodautoscalers") { + glog.Infof("Starting horizontal pod controller.") + hpaClient := clientForUserAgentOrDie(*kubeconfig, "horizontal-pod-autoscaler") + metricsClient := metrics.NewHeapsterMetricsClient( + hpaClient, + metrics.DefaultHeapsterNamespace, + metrics.DefaultHeapsterScheme, + metrics.DefaultHeapsterService, + metrics.DefaultHeapsterPort, + ) + podautoscaler.NewHorizontalController(hpaClient, hpaClient, hpaClient, metricsClient). + Run(s.HorizontalPodAutoscalerSyncPeriod) + } + + if containsResource(resources, "daemonsets") { + glog.Infof("Starting daemon set controller") + go daemon.NewDaemonSetsController(clientForUserAgentOrDie(*kubeconfig, "daemon-set-controller"), s.resyncPeriod). + Run(s.ConcurrentDSCSyncs, util.NeverStop) + } + + if containsResource(resources, "jobs") { + glog.Infof("Starting job controller") + go job.NewJobController(clientForUserAgentOrDie(*kubeconfig, "job-controller"), s.resyncPeriod). + Run(s.ConcurrentJobSyncs, util.NeverStop) + } + + if containsResource(resources, "deployments") { + glog.Infof("Starting deployment controller") + go deployment.NewDeploymentController(clientForUserAgentOrDie(*kubeconfig, "deployment-controller"), s.resyncPeriod). + Run(s.ConcurrentDeploymentSyncs, util.NeverStop) + } + } + volumePlugins := kubecontrollermanager.ProbeRecyclableVolumePlugins(s.VolumeConfigFlags) provisioner, err := kubecontrollermanager.NewVolumeProvisioner(cloud, s.VolumeConfigFlags) if err != nil { glog.Fatal("A Provisioner could not be created, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.") } - pvclaimBinder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod) + pvclaimBinder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-binder"), s.PVClaimBinderSyncPeriod) pvclaimBinder.Run() - pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, kubecontrollermanager.ProbeRecyclableVolumePlugins(s.VolumeConfigFlags), cloud) + pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-recycler"), s.PVClaimBinderSyncPeriod, kubecontrollermanager.ProbeRecyclableVolumePlugins(s.VolumeConfigFlags), cloud) if err != nil { glog.Fatalf("Failed to start persistent volume recycler: %+v", err) } pvRecycler.Run() if provisioner != nil { - pvController, err := persistentvolumecontroller.NewPersistentVolumeProvisionerController(persistentvolumecontroller.NewControllerClient(kubeClient), s.PVClaimBinderSyncPeriod, volumePlugins, provisioner, cloud) + pvController, err := persistentvolumecontroller.NewPersistentVolumeProvisionerController(persistentvolumecontroller.NewControllerClient(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-controller")), s.PVClaimBinderSyncPeriod, volumePlugins, provisioner, cloud) if err != nil { glog.Fatalf("Failed to start persistent volume provisioner controller: %+v", err) } @@ -212,7 +278,7 @@ func (s *CMServer) Run(_ []string) error { glog.Errorf("Error reading key for service account token controller: %v", err) } else { serviceaccountcontroller.NewTokensController( - kubeClient, + clientForUserAgentOrDie(*kubeconfig, "tokens-controller"), serviceaccountcontroller.TokensControllerOptions{ TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey), RootCA: rootCA, @@ -222,19 +288,48 @@ func (s *CMServer) Run(_ []string) error { } serviceaccountcontroller.NewServiceAccountsController( - kubeClient, + clientForUserAgentOrDie(*kubeconfig, "service-account-controller"), serviceaccountcontroller.DefaultServiceAccountsControllerOptions(), ).Run() select {} } +func clientForUserAgentOrDie(config client.Config, userAgent string) *client.Client { + fullUserAgent := client.DefaultKubernetesUserAgent() + "/" + userAgent + config.UserAgent = fullUserAgent + kubeClient, err := client.New(&config) + if err != nil { + glog.Fatalf("Invalid API configuration: %v", err) + } + return kubeClient +} + func (s *CMServer) createEndpointController(client *client.Client) kmendpoint.EndpointController { if s.UseHostPortEndpoints { glog.V(2).Infof("Creating hostIP:hostPort endpoint controller") return kmendpoint.NewEndpointController(client) } glog.V(2).Infof("Creating podIP:containerPort endpoint controller") - stockEndpointController := kendpoint.NewEndpointController(client, s.resyncPeriod) + stockEndpointController := endpointcontroller.NewEndpointController(client, s.resyncPeriod) return stockEndpointController } + +func containsVersion(versions *unversioned.APIVersions, version string) bool { + for ix := range versions.Versions { + if versions.Versions[ix] == version { + return true + } + } + return false +} + +func containsResource(resources *unversioned.APIResourceList, resourceName string) bool { + for ix := range resources.APIResources { + resource := resources.APIResources[ix] + if resource.Name == resourceName { + return true + } + } + return false +}