From 8439f81f2d9b86fd8d74a8a64b3696b8000b1967 Mon Sep 17 00:00:00 2001 From: jayunit100 Date: Fri, 20 Jan 2017 14:32:09 -0500 Subject: [PATCH] NewSchedulerFromInterface implementation --- plugin/cmd/kube-scheduler/app/server.go | 86 ++++++++++++++----------- plugin/pkg/scheduler/scheduler.go | 25 +++++++ test/integration/scheduler_perf/util.go | 21 +++--- 3 files changed, 86 insertions(+), 46 deletions(-) diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go index 5569ae5f56..c05cbfbb5b 100644 --- a/plugin/cmd/kube-scheduler/app/server.go +++ b/plugin/cmd/kube-scheduler/app/server.go @@ -78,24 +78,20 @@ func Run(s *options.SchedulerServer) error { if err != nil { return fmt.Errorf("unable to create kube client: %v", err) } - config, err := createConfig(s, kubecli) + recorder := createRecorder(kubecli, s) + sched, err := createScheduler(s, kubecli, recorder) if err != nil { - return fmt.Errorf("failed to create scheduler configuration: %v", err) + return fmt.Errorf("error creating scheduler: %v", err) } - sched := scheduler.New(config) - go startHTTP(s) - run := func(_ <-chan struct{}) { sched.Run() select {} } - if !s.LeaderElection.LeaderElect { run(nil) panic("unreachable") } - id, err := os.Hostname() if err != nil { return fmt.Errorf("unable to get hostname: %v", err) @@ -109,7 +105,7 @@ func Run(s *options.SchedulerServer) error { Client: kubecli, LockConfig: resourcelock.ResourceLockConfig{ Identity: id, - EventRecorder: config.Recorder, + EventRecorder: recorder, }, } leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{ @@ -127,6 +123,13 @@ func Run(s *options.SchedulerServer) error { panic("unreachable") } +func createRecorder(kubecli *clientset.Clientset, s *options.SchedulerServer) record.EventRecorder { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubecli.Core().Events("")}) + return eventBroadcaster.NewRecorder(v1.EventSource{Component: s.SchedulerName}) +} + func startHTTP(s *options.SchedulerServer) { mux := http.NewServeMux() healthz.InstallHandler(mux) @@ -171,33 +174,42 @@ func createClient(s *options.SchedulerServer) (*clientset.Clientset, error) { return cli, nil } -func createConfig(s *options.SchedulerServer, kubecli *clientset.Clientset) (*scheduler.Config, error) { - configFactory := factory.NewConfigFactory(kubecli, s.SchedulerName, s.HardPodAffinitySymmetricWeight, s.FailureDomains) - if _, err := os.Stat(s.PolicyConfigFile); err == nil { - var ( - policy schedulerapi.Policy - configData []byte - ) - configData, err := ioutil.ReadFile(s.PolicyConfigFile) - if err != nil { - return nil, fmt.Errorf("unable to read policy config: %v", err) - } - if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil { - return nil, fmt.Errorf("invalid configuration: %v", err) - } - return configFactory.CreateFromConfig(policy) - } - - // if the config file isn't provided, use the specified (or default) provider - config, err := configFactory.CreateFromProvider(s.AlgorithmProvider) - if err != nil { - return nil, err - } - - eventBroadcaster := record.NewBroadcaster() - config.Recorder = eventBroadcaster.NewRecorder(v1.EventSource{Component: s.SchedulerName}) - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubecli.Core().Events("")}) - - return config, nil +// schedulerConfigurator is an interface wrapper that provides default Configuration creation based on user +// provided config file. +type schedulerConfigurator struct { + scheduler.Configurator + policyFile string + algorithmProvider string +} + +func (sc schedulerConfigurator) Create() (*scheduler.Config, error) { + if _, err := os.Stat(sc.policyFile); err != nil { + return sc.Configurator.CreateFromProvider(sc.algorithmProvider) + } + + // policy file is valid, try to create a configuration from it. + var policy schedulerapi.Policy + configData, err := ioutil.ReadFile(sc.policyFile) + if err != nil { + return nil, fmt.Errorf("unable to read policy config: %v", err) + } + if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil { + return nil, fmt.Errorf("invalid configuration: %v", err) + } + return sc.CreateFromConfig(policy) +} + +// createScheduler encapsulates the entire creation of a runnable scheduler. +func createScheduler(s *options.SchedulerServer, kubecli *clientset.Clientset, recorder record.EventRecorder) (*scheduler.Scheduler, error) { + configurator := factory.NewConfigFactory(kubecli, s.SchedulerName, s.HardPodAffinitySymmetricWeight, s.FailureDomains) + + // Rebuild the configurator with a default Create(...) method. + configurator = &schedulerConfigurator{ + configurator, + s.PolicyConfigFile, + s.AlgorithmProvider} + + return scheduler.NewFromConfigurator(configurator, func(cfg *scheduler.Config) { + cfg.Recorder = recorder + }) } diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index 51858b7b79..a3e9f00d7e 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -51,6 +51,10 @@ type Scheduler struct { config *Config } +func (sched *Scheduler) StopEverything() { + close(sched.config.StopEverything) +} + // These are the functions which need to be provided in order to build a Scheduler configuration. // An implementation of this can be seen in factory.go. type Configurator interface { @@ -78,6 +82,7 @@ type Configurator interface { CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) } +// TODO over time we should make this struct a hidden implementation detail of the scheduler. type Config struct { // It is expected that changes made via SchedulerCache will be observed // by NodeLister and Algorithm. @@ -108,6 +113,7 @@ type Config struct { } // New returns a new scheduler. +// TODO replace this with NewFromConfigurator. func New(c *Config) *Scheduler { s := &Scheduler{ config: c, @@ -116,6 +122,25 @@ func New(c *Config) *Scheduler { return s } +// NewFromConfigurator returns a new scheduler that is created entirely by the Configurator. Assumes Create() is implemented. +// Supports intermediate Config mutation for now if you provide modifier functions which will run after Config is created. +func NewFromConfigurator(c Configurator, modifiers ...func(c *Config)) (*Scheduler, error) { + cfg, err := c.Create() + if err != nil { + return nil, err + } + // Mutate it if any functions were provided, changes might be required for certain types of tests (i.e. change the recorder). + for _, modifier := range modifiers { + modifier(cfg) + } + // From this point on the config is immutable to the outside. + s := &Scheduler{ + config: cfg, + } + metrics.Register() + return s, nil +} + // Run begins watching and scheduling. It starts a goroutine and returns immediately. func (s *Scheduler) Run() { go wait.Until(s.scheduleOne, 0, s.config.StopEverything) diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go index efd789c668..2d98b4d37d 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -40,7 +40,7 @@ import ( // remove resources after finished. // Notes on rate limiter: // - client rate limit is set to 5000. -func mustSetupScheduler() (schedulerConfigFactory scheduler.Configurator, destroyFunc func()) { +func mustSetupScheduler() (schedulerConfigurator scheduler.Configurator, destroyFunc func()) { h := &framework.MasterHolder{Initialized: make(chan struct{})} s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -57,20 +57,23 @@ func mustSetupScheduler() (schedulerConfigFactory scheduler.Configurator, destro Burst: 5000, }) - schedulerConfigFactory = factory.NewConfigFactory(clientSet, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight, v1.DefaultFailureDomains) + schedulerConfigurator = factory.NewConfigFactory(clientSet, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight, v1.DefaultFailureDomains) - schedulerConfig, err := schedulerConfigFactory.Create() - if err != nil { - panic("Couldn't create scheduler config") - } eventBroadcaster := record.NewBroadcaster() - schedulerConfig.Recorder = eventBroadcaster.NewRecorder(v1.EventSource{Component: "scheduler"}) eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: clientSet.Core().Events("")}) - scheduler.New(schedulerConfig).Run() + + sched, err := scheduler.NewFromConfigurator(schedulerConfigurator, func(conf *scheduler.Config) { + conf.Recorder = eventBroadcaster.NewRecorder(v1.EventSource{Component: "scheduler"}) + }) + if err != nil { + glog.Fatalf("Error creating scheduler: %v", err) + } + + sched.Run() destroyFunc = func() { glog.Infof("destroying") - close(schedulerConfig.StopEverything) + sched.StopEverything() s.Close() glog.Infof("destroyed") }