2016-10-08 00:39:21 +00:00
/ *
Copyright 2016 The Kubernetes Authors .
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
package app
import (
"math/rand"
"net"
"net/http"
"net/http/pprof"
"os"
2017-02-28 18:43:08 +00:00
goruntime "runtime"
2016-10-08 00:39:21 +00:00
"strconv"
2017-07-13 13:08:43 +00:00
"strings"
2016-10-08 00:39:21 +00:00
"time"
2017-07-15 05:25:54 +00:00
"k8s.io/api/core/v1"
2017-01-11 14:09:48 +00:00
"k8s.io/apimachinery/pkg/util/wait"
2017-01-17 10:38:25 +00:00
"k8s.io/apiserver/pkg/server/healthz"
2017-06-23 20:56:37 +00:00
"k8s.io/client-go/informers"
2017-07-08 00:19:03 +00:00
"k8s.io/client-go/kubernetes"
2017-06-23 20:56:37 +00:00
clientset "k8s.io/client-go/kubernetes"
2017-01-30 18:39:54 +00:00
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
2017-01-19 18:27:59 +00:00
restclient "k8s.io/client-go/rest"
2017-01-20 18:06:17 +00:00
"k8s.io/client-go/tools/clientcmd"
2017-07-07 20:59:32 +00:00
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
2017-01-30 18:39:54 +00:00
"k8s.io/client-go/tools/record"
2016-10-08 00:39:21 +00:00
"k8s.io/kubernetes/cmd/cloud-controller-manager/app/options"
2017-10-16 11:41:50 +00:00
"k8s.io/kubernetes/pkg/api/legacyscheme"
2016-10-08 00:39:21 +00:00
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
2017-04-17 20:13:55 +00:00
cloudcontrollers "k8s.io/kubernetes/pkg/controller/cloud"
2016-10-08 00:39:21 +00:00
routecontroller "k8s.io/kubernetes/pkg/controller/route"
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
"k8s.io/kubernetes/pkg/util/configz"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)
const (
2017-10-03 05:45:14 +00:00
// ControllerStartJitter is the jitter value used when starting controller managers.
2016-10-08 00:39:21 +00:00
ControllerStartJitter = 1.0
)
2016-12-17 17:27:48 +00:00
// NewCloudControllerManagerCommand creates a *cobra.Command object with default parameters
func NewCloudControllerManagerCommand ( ) * cobra . Command {
s := options . NewCloudControllerManagerServer ( )
2016-10-08 00:39:21 +00:00
s . AddFlags ( pflag . CommandLine )
cmd := & cobra . Command {
Use : "cloud-controller-manager" ,
2016-12-17 17:27:48 +00:00
Long : ` The Cloud controller manager is a daemon that embeds
the cloud specific control loops shipped with Kubernetes . ` ,
2016-10-08 00:39:21 +00:00
Run : func ( cmd * cobra . Command , args [ ] string ) {
} ,
}
return cmd
}
2017-01-03 23:33:01 +00:00
// resyncPeriod computes the time interval a shared informer waits before resyncing with the api server
func resyncPeriod ( s * options . CloudControllerManagerServer ) func ( ) time . Duration {
2016-10-08 00:39:21 +00:00
return func ( ) time . Duration {
factor := rand . Float64 ( ) + 1
return time . Duration ( float64 ( s . MinResyncPeriod . Nanoseconds ( ) ) * factor )
}
}
// Run runs the ExternalCMServer. This should never exit.
2017-10-27 17:55:31 +00:00
func Run ( s * options . CloudControllerManagerServer ) error {
if s . CloudProvider == "" {
2017-10-27 17:55:39 +00:00
glog . Fatalf ( "--cloud-provider cannot be empty" )
2017-10-27 17:55:31 +00:00
}
cloud , err := cloudprovider . InitCloudProvider ( s . CloudProvider , s . CloudConfigFile )
if err != nil {
glog . Fatalf ( "Cloud provider could not be initialized: %v" , err )
}
if cloud . HasClusterID ( ) == false {
if s . AllowUntaggedCloud == true {
glog . Warning ( "detected a cluster without a ClusterID. A ClusterID will be required in the future. Please tag your cluster to avoid any future issues" )
} else {
glog . Fatalf ( "no ClusterID found. A ClusterID is required for the cloud provider to function properly. This check can be bypassed by setting the allow-untagged-cloud option" )
}
}
2016-10-08 00:39:21 +00:00
if c , err := configz . New ( "componentconfig" ) ; err == nil {
c . Set ( s . KubeControllerManagerConfiguration )
} else {
glog . Errorf ( "unable to register configz: %s" , err )
}
kubeconfig , err := clientcmd . BuildConfigFromFlags ( s . Master , s . Kubeconfig )
if err != nil {
return err
}
2016-12-17 17:27:48 +00:00
// Set the ContentType of the requests from kube client
2016-10-08 00:39:21 +00:00
kubeconfig . ContentConfig . ContentType = s . ContentType
// Override kubeconfig qps/burst settings from flags
kubeconfig . QPS = s . KubeAPIQPS
kubeconfig . Burst = int ( s . KubeAPIBurst )
kubeClient , err := clientset . NewForConfig ( restclient . AddUserAgent ( kubeconfig , "cloud-controller-manager" ) )
if err != nil {
glog . Fatalf ( "Invalid API configuration: %v" , err )
}
2017-07-08 00:19:03 +00:00
leaderElectionClient := kubernetes . NewForConfigOrDie ( restclient . AddUserAgent ( kubeconfig , "leader-election" ) )
2016-10-08 00:39:21 +00:00
2016-12-17 17:27:48 +00:00
// Start the external controller manager server
2017-08-26 06:55:53 +00:00
go startHTTP ( s )
2016-10-08 00:39:21 +00:00
2017-08-26 06:55:53 +00:00
recorder := createRecorder ( kubeClient )
2016-10-08 00:39:21 +00:00
run := func ( stop <- chan struct { } ) {
rootClientBuilder := controller . SimpleControllerClientBuilder {
ClientConfig : kubeconfig ,
}
var clientBuilder controller . ControllerClientBuilder
2017-08-17 05:41:37 +00:00
if s . UseServiceAccountCredentials {
2016-10-08 00:39:21 +00:00
clientBuilder = controller . SAControllerClientBuilder {
2017-02-17 20:48:22 +00:00
ClientConfig : restclient . AnonymousClientConfig ( kubeconfig ) ,
2017-07-25 04:35:12 +00:00
CoreClient : kubeClient . CoreV1 ( ) ,
2017-02-17 20:48:22 +00:00
AuthenticationClient : kubeClient . Authentication ( ) ,
Namespace : "kube-system" ,
2016-10-08 00:39:21 +00:00
}
} else {
clientBuilder = rootClientBuilder
}
2017-08-17 05:41:37 +00:00
err := StartControllers ( s , kubeconfig , clientBuilder , stop , recorder , cloud )
2016-10-08 00:39:21 +00:00
glog . Fatalf ( "error running controllers: %v" , err )
panic ( "unreachable" )
}
if ! s . LeaderElection . LeaderElect {
run ( nil )
panic ( "unreachable" )
}
2016-12-17 17:27:48 +00:00
// Identity used to distinguish between multiple cloud controller manager instances
2016-10-08 00:39:21 +00:00
id , err := os . Hostname ( )
if err != nil {
return err
}
2016-12-17 17:27:48 +00:00
// Lock required for leader election
2017-11-06 00:06:06 +00:00
rl , err := resourcelock . New ( s . LeaderElection . ResourceLock ,
"kube-system" ,
"cloud-controller-manager" ,
leaderElectionClient . CoreV1 ( ) ,
resourcelock . ResourceLockConfig {
2016-10-08 00:39:21 +00:00
Identity : id + "-external-cloud-controller" ,
EventRecorder : recorder ,
2017-11-06 00:06:06 +00:00
} )
if err != nil {
glog . Fatalf ( "error creating lock: %v" , err )
2016-10-08 00:39:21 +00:00
}
2016-12-17 17:27:48 +00:00
// Try and become the leader and start cloud controller manager loops
2016-10-08 00:39:21 +00:00
leaderelection . RunOrDie ( leaderelection . LeaderElectionConfig {
2017-11-06 00:06:06 +00:00
Lock : rl ,
2016-10-08 00:39:21 +00:00
LeaseDuration : s . LeaderElection . LeaseDuration . Duration ,
RenewDeadline : s . LeaderElection . RenewDeadline . Duration ,
RetryPeriod : s . LeaderElection . RetryPeriod . Duration ,
Callbacks : leaderelection . LeaderCallbacks {
OnStartedLeading : run ,
OnStoppedLeading : func ( ) {
glog . Fatalf ( "leaderelection lost" )
} ,
} ,
} )
panic ( "unreachable" )
}
2016-12-17 17:27:48 +00:00
// StartControllers starts the cloud specific controller loops.
2017-08-17 05:41:37 +00:00
func StartControllers ( s * options . CloudControllerManagerServer , kubeconfig * restclient . Config , clientBuilder controller . ControllerClientBuilder , stop <- chan struct { } , recorder record . EventRecorder , cloud cloudprovider . Interface ) error {
2016-12-17 17:27:48 +00:00
// Function to build the kube client object
2016-10-08 00:39:21 +00:00
client := func ( serviceAccountName string ) clientset . Interface {
2017-08-17 05:41:37 +00:00
return clientBuilder . ClientOrDie ( serviceAccountName )
2016-10-08 00:39:21 +00:00
}
2017-05-17 21:38:25 +00:00
if cloud != nil {
// Initialize the cloud provider with a reference to the clientBuilder
cloud . Initialize ( clientBuilder )
}
2017-02-06 18:35:50 +00:00
versionedClient := client ( "shared-informers" )
2017-02-24 14:52:43 +00:00
sharedInformers := informers . NewSharedInformerFactory ( versionedClient , resyncPeriod ( s ) ( ) )
2016-10-08 00:39:21 +00:00
2016-12-17 17:27:48 +00:00
// Start the CloudNodeController
2017-04-17 20:13:55 +00:00
nodeController := cloudcontrollers . NewCloudNodeController (
2017-02-24 14:52:43 +00:00
sharedInformers . Core ( ) . V1 ( ) . Nodes ( ) ,
2016-10-08 00:39:21 +00:00
client ( "cloud-node-controller" ) , cloud ,
2017-03-29 23:21:42 +00:00
s . NodeMonitorPeriod . Duration ,
s . NodeStatusUpdateFrequency . Duration )
2017-02-06 07:33:27 +00:00
2016-10-08 00:39:21 +00:00
nodeController . Run ( )
time . Sleep ( wait . Jitter ( s . ControllerStartInterval . Duration , ControllerStartJitter ) )
2017-04-17 20:13:55 +00:00
// Start the PersistentVolumeLabelController
pvlController := cloudcontrollers . NewPersistentVolumeLabelController ( client ( "pvl-controller" ) , cloud )
threads := 5
go pvlController . Run ( threads , stop )
time . Sleep ( wait . Jitter ( s . ControllerStartInterval . Duration , ControllerStartJitter ) )
2016-12-17 17:27:48 +00:00
// Start the service controller
2017-02-13 19:28:12 +00:00
serviceController , err := servicecontroller . New (
cloud ,
client ( "service-controller" ) ,
2017-02-24 14:52:43 +00:00
sharedInformers . Core ( ) . V1 ( ) . Services ( ) ,
sharedInformers . Core ( ) . V1 ( ) . Nodes ( ) ,
2017-02-13 19:28:12 +00:00
s . ClusterName ,
)
2016-10-08 00:39:21 +00:00
if err != nil {
glog . Errorf ( "Failed to start service controller: %v" , err )
} else {
2017-02-13 19:28:12 +00:00
go serviceController . Run ( stop , int ( s . ConcurrentServiceSyncs ) )
2017-09-06 02:41:31 +00:00
time . Sleep ( wait . Jitter ( s . ControllerStartInterval . Duration , ControllerStartJitter ) )
2016-10-08 00:39:21 +00:00
}
2016-12-17 17:27:48 +00:00
// If CIDRs should be allocated for pods and set on the CloudProvider, then start the route controller
2016-10-08 00:39:21 +00:00
if s . AllocateNodeCIDRs && s . ConfigureCloudRoutes {
if routes , ok := cloud . Routes ( ) ; ! ok {
glog . Warning ( "configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes." )
} else {
2017-07-13 13:08:43 +00:00
var clusterCIDR * net . IPNet
if len ( strings . TrimSpace ( s . ClusterCIDR ) ) != 0 {
_ , clusterCIDR , err = net . ParseCIDR ( s . ClusterCIDR )
if err != nil {
glog . Warningf ( "Unsuccessful parsing of cluster CIDR %v: %v" , s . ClusterCIDR , err )
}
}
2017-02-24 14:52:43 +00:00
routeController := routecontroller . New ( routes , client ( "route-controller" ) , sharedInformers . Core ( ) . V1 ( ) . Nodes ( ) , s . ClusterName , clusterCIDR )
2017-04-12 05:03:41 +00:00
go routeController . Run ( stop , s . RouteReconciliationPeriod . Duration )
2016-10-08 00:39:21 +00:00
time . Sleep ( wait . Jitter ( s . ControllerStartInterval . Duration , ControllerStartJitter ) )
}
} else {
glog . Infof ( "Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v." , s . AllocateNodeCIDRs , s . ConfigureCloudRoutes )
}
// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
err = wait . PollImmediate ( time . Second , 10 * time . Second , func ( ) ( bool , error ) {
2017-06-14 03:54:23 +00:00
if _ , err = restclient . ServerAPIVersions ( kubeconfig ) ; err == nil {
2016-10-08 00:39:21 +00:00
return true , nil
}
glog . Errorf ( "Failed to get api versions from server: %v" , err )
return false , nil
} )
if err != nil {
glog . Fatalf ( "Failed to get api versions from server: %v" , err )
}
sharedInformers . Start ( stop )
select { }
}
2017-08-26 06:55:53 +00:00
func startHTTP ( s * options . CloudControllerManagerServer ) {
mux := http . NewServeMux ( )
healthz . InstallHandler ( mux )
if s . EnableProfiling {
mux . HandleFunc ( "/debug/pprof/" , pprof . Index )
mux . HandleFunc ( "/debug/pprof/profile" , pprof . Profile )
mux . HandleFunc ( "/debug/pprof/symbol" , pprof . Symbol )
mux . HandleFunc ( "/debug/pprof/trace" , pprof . Trace )
if s . EnableContentionProfiling {
goruntime . SetBlockProfileRate ( 1 )
}
}
configz . InstallHandler ( mux )
mux . Handle ( "/metrics" , prometheus . Handler ( ) )
server := & http . Server {
Addr : net . JoinHostPort ( s . Address , strconv . Itoa ( int ( s . Port ) ) ) ,
Handler : mux ,
}
glog . Fatal ( server . ListenAndServe ( ) )
}
func createRecorder ( kubeClient * clientset . Clientset ) record . EventRecorder {
eventBroadcaster := record . NewBroadcaster ( )
eventBroadcaster . StartLogging ( glog . Infof )
eventBroadcaster . StartRecordingToSink ( & v1core . EventSinkImpl { Interface : v1core . New ( kubeClient . CoreV1 ( ) . RESTClient ( ) ) . Events ( "" ) } )
2017-10-16 11:41:50 +00:00
return eventBroadcaster . NewRecorder ( legacyscheme . Scheme , v1 . EventSource { Component : "cloud-controller-manager" } )
2017-08-26 06:55:53 +00:00
}