diff --git a/plugin/cmd/kube-scheduler/BUILD b/plugin/cmd/kube-scheduler/BUILD index 8290c79c4f..e1763e483f 100644 --- a/plugin/cmd/kube-scheduler/BUILD +++ b/plugin/cmd/kube-scheduler/BUILD @@ -17,6 +17,7 @@ go_binary( "//pkg/version/verflag:go_default_library", "//plugin/cmd/kube-scheduler/app:go_default_library", "//plugin/cmd/kube-scheduler/app/options:go_default_library", + "//vendor:github.com/golang/glog", "//vendor:github.com/spf13/pflag", ], ) diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go index 265414f386..5fa125e21c 100644 --- a/plugin/cmd/kube-scheduler/app/server.go +++ b/plugin/cmd/kube-scheduler/app/server.go @@ -73,65 +73,18 @@ through the API as necessary.`, // Run runs the specified SchedulerServer. This should never exit. func Run(s *options.SchedulerServer) error { - if c, err := configz.New("componentconfig"); err == nil { - c.Set(s.KubeSchedulerConfiguration) - } else { - glog.Errorf("unable to register configz: %s", err) - } - kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig) + kubecli, err := createClient(s) if err != nil { - glog.Errorf("unable to build config from flags: %v", err) - return err + return fmt.Errorf("unable to create kube client: %v", err) } - - kubeconfig.ContentType = s.ContentType - // Override kubeconfig qps/burst settings from flags - kubeconfig.QPS = s.KubeAPIQPS - kubeconfig.Burst = int(s.KubeAPIBurst) - + config, err := createConfig(s, kubecli) if err != nil { - glog.Fatalf("Invalid API configuration: %v", err) + return fmt.Errorf("failed to create scheduler configuration: %v", err) } - leaderElectionClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "leader-election")) - if err != nil { - glog.Fatalf("Invalid API configuration: %v", err) - } - - go func() { - mux := http.NewServeMux() - healthz.InstallHandler(mux) - if s.EnableProfiling { - mux.HandleFunc("/debug/pprof/", pprof.Index) - mux.HandleFunc("/debug/pprof/profile", pprof.Profile) - mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) - if s.EnableContentionProfiling { - goruntime.SetBlockProfileRate(1) - } - } - configz.InstallHandler(mux) - mux.Handle("/metrics", prometheus.Handler()) - - server := &http.Server{ - Addr: net.JoinHostPort(s.Address, strconv.Itoa(int(s.Port))), - Handler: mux, - } - glog.Fatal(server.ListenAndServe()) - }() - - configFactory := factory.NewConfigFactory(leaderElectionClient, s.SchedulerName, s.HardPodAffinitySymmetricWeight, s.FailureDomains) - config, err := createConfig(s, configFactory) - - if err != nil { - glog.Fatalf("Failed to create scheduler configuration: %v", err) - } - - eventBroadcaster := record.NewBroadcaster() - config.Recorder = eventBroadcaster.NewRecorder(v1.EventSource{Component: s.SchedulerName}) - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: leaderElectionClient.Core().Events("")}) - sched := scheduler.New(config) + go startHTTP(s) + run := func(_ <-chan struct{}) { sched.Run() select {} @@ -139,31 +92,27 @@ func Run(s *options.SchedulerServer) error { if !s.LeaderElection.LeaderElect { run(nil) - glog.Fatal("this statement is unreachable") panic("unreachable") } id, err := os.Hostname() if err != nil { - glog.Errorf("unable to get hostname: %v", err) - return err + return fmt.Errorf("unable to get hostname: %v", err) } - // TODO: enable other lock types - rl := resourcelock.EndpointsLock{ + rl := &resourcelock.EndpointsLock{ EndpointsMeta: v1.ObjectMeta{ Namespace: "kube-system", Name: "kube-scheduler", }, - Client: leaderElectionClient, + Client: kubecli, LockConfig: resourcelock.ResourceLockConfig{ Identity: id, EventRecorder: config.Recorder, }, } - leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{ - Lock: &rl, + Lock: rl, LeaseDuration: s.LeaderElection.LeaseDuration.Duration, RenewDeadline: s.LeaderElection.RenewDeadline.Duration, RetryPeriod: s.LeaderElection.RetryPeriod.Duration, @@ -174,12 +123,55 @@ func Run(s *options.SchedulerServer) error { }, }, }) - - glog.Fatal("this statement is unreachable") panic("unreachable") } -func createConfig(s *options.SchedulerServer, configFactory *factory.ConfigFactory) (*scheduler.Config, error) { +func startHTTP(s *options.SchedulerServer) { + mux := http.NewServeMux() + healthz.InstallHandler(mux) + if s.EnableProfiling { + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + if s.EnableContentionProfiling { + goruntime.SetBlockProfileRate(1) + } + } + if c, err := configz.New("componentconfig"); err == nil { + c.Set(s.KubeSchedulerConfiguration) + } else { + glog.Errorf("unable to register configz: %s", err) + } + configz.InstallHandler(mux) + mux.Handle("/metrics", prometheus.Handler()) + + server := &http.Server{ + Addr: net.JoinHostPort(s.Address, strconv.Itoa(int(s.Port))), + Handler: mux, + } + glog.Fatal(server.ListenAndServe()) +} + +func createClient(s *options.SchedulerServer) (*clientset.Clientset, error) { + kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig) + if err != nil { + return nil, fmt.Errorf("unable to build config from flags: %v", err) + } + + kubeconfig.ContentType = s.ContentType + // Override kubeconfig qps/burst settings from flags + kubeconfig.QPS = s.KubeAPIQPS + kubeconfig.Burst = int(s.KubeAPIBurst) + + cli, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "leader-election")) + if err != nil { + return nil, fmt.Errorf("invalid API configuration: %v", err) + } + return cli, nil +} + +func createConfig(s *options.SchedulerServer, kubecli *clientset.Clientset) (*scheduler.Config, error) { + configFactory := factory.NewConfigFactory(kubecli, s.SchedulerName, s.HardPodAffinitySymmetricWeight, s.FailureDomains) if _, err := os.Stat(s.PolicyConfigFile); err == nil { var ( policy schedulerapi.Policy @@ -196,5 +188,15 @@ func createConfig(s *options.SchedulerServer, configFactory *factory.ConfigFacto } // if the config file isn't provided, use the specified (or default) provider - return configFactory.CreateFromProvider(s.AlgorithmProvider) + config, err := configFactory.CreateFromProvider(s.AlgorithmProvider) + if err != nil { + return nil, err + } + + eventBroadcaster := record.NewBroadcaster() + config.Recorder = eventBroadcaster.NewRecorder(v1.EventSource{Component: s.SchedulerName}) + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubecli.Core().Events("")}) + + return config, nil } diff --git a/plugin/cmd/kube-scheduler/scheduler.go b/plugin/cmd/kube-scheduler/scheduler.go index cea7a7419b..bd3369f046 100644 --- a/plugin/cmd/kube-scheduler/scheduler.go +++ b/plugin/cmd/kube-scheduler/scheduler.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app" "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" + "github.com/golang/glog" "github.com/spf13/pflag" ) @@ -36,5 +37,7 @@ func main() { verflag.PrintAndExitIfRequested() - app.Run(s) + if err := app.Run(s); err != nil { + glog.Fatalf("scheduler app failed to run: %v", err) + } }