mirror of https://github.com/k3s-io/k3s
Configurator refactoring for scheduler server
parent
d6f7ae2ffb
commit
1590f9e70f
|
@ -5,11 +5,15 @@ licenses(["notice"])
|
|||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["server.go"],
|
||||
srcs = [
|
||||
"configurator.go",
|
||||
"server.go",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/api:go_default_library",
|
||||
|
@ -53,3 +57,10 @@ filegroup(
|
|||
],
|
||||
tags = ["automanaged"],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["configurator_test.go"],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
)
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package app
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
||||
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
|
||||
clientv1 "k8s.io/client-go/pkg/api/v1"
|
||||
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler"
|
||||
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
|
||||
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
|
||||
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
func createRecorder(kubecli *clientset.Clientset, s *options.SchedulerServer) record.EventRecorder {
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartLogging(glog.Infof)
|
||||
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubecli.Core().RESTClient()).Events("")})
|
||||
return eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: s.SchedulerName})
|
||||
}
|
||||
|
||||
func createClient(s *options.SchedulerServer) (*clientset.Clientset, error) {
|
||||
kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to build config from flags: %v", err)
|
||||
}
|
||||
|
||||
kubeconfig.ContentType = s.ContentType
|
||||
// Override kubeconfig qps/burst settings from flags
|
||||
kubeconfig.QPS = s.KubeAPIQPS
|
||||
kubeconfig.Burst = int(s.KubeAPIBurst)
|
||||
|
||||
cli, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "leader-election"))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid API configuration: %v", err)
|
||||
}
|
||||
return cli, nil
|
||||
}
|
||||
|
||||
// createScheduler encapsulates the entire creation of a runnable scheduler.
|
||||
func createScheduler(s *options.SchedulerServer, kubecli *clientset.Clientset, recorder record.EventRecorder) (*scheduler.Scheduler, error) {
|
||||
configurator := factory.NewConfigFactory(kubecli, s.SchedulerName, s.HardPodAffinitySymmetricWeight, s.FailureDomains)
|
||||
|
||||
// Rebuild the configurator with a default Create(...) method.
|
||||
configurator = &schedulerConfigurator{
|
||||
configurator,
|
||||
s.PolicyConfigFile,
|
||||
s.AlgorithmProvider}
|
||||
|
||||
return scheduler.NewFromConfigurator(configurator, func(cfg *scheduler.Config) {
|
||||
cfg.Recorder = recorder
|
||||
})
|
||||
}
|
||||
|
||||
// schedulerConfigurator is an interface wrapper that provides default Configuration creation based on user
|
||||
// provided config file.
|
||||
type schedulerConfigurator struct {
|
||||
scheduler.Configurator
|
||||
policyFile string
|
||||
algorithmProvider string
|
||||
}
|
||||
|
||||
// Create implements the interface for the Configurator, hence it is exported even through the struct is not.
|
||||
func (sc schedulerConfigurator) Create() (*scheduler.Config, error) {
|
||||
if _, err := os.Stat(sc.policyFile); err != nil {
|
||||
if sc.Configurator != nil {
|
||||
return sc.Configurator.CreateFromProvider(sc.algorithmProvider)
|
||||
}
|
||||
return nil, fmt.Errorf("Configurator was nil")
|
||||
}
|
||||
|
||||
// policy file is valid, try to create a configuration from it.
|
||||
var policy schedulerapi.Policy
|
||||
configData, err := ioutil.ReadFile(sc.policyFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to read policy config: %v", err)
|
||||
}
|
||||
if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil {
|
||||
return nil, fmt.Errorf("invalid configuration: %v", err)
|
||||
}
|
||||
return sc.CreateFromConfig(policy)
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package app
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSchedulerConfiguratorFailure(t *testing.T) {
|
||||
sc := &schedulerConfigurator{
|
||||
// policyfile and algorithm are intentionally undefined.
|
||||
}
|
||||
_, error := sc.Create()
|
||||
if error == nil {
|
||||
t.Fatalf("Expected error message when creating with incomplete configurator.")
|
||||
}
|
||||
}
|
|
@ -19,7 +19,6 @@ package app
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/pprof"
|
||||
|
@ -28,24 +27,13 @@ import (
|
|||
"strconv"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
clientv1 "k8s.io/client-go/pkg/api/v1"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
|
||||
"k8s.io/kubernetes/pkg/client/leaderelection"
|
||||
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
|
||||
"k8s.io/kubernetes/pkg/util/configz"
|
||||
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler"
|
||||
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
|
||||
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
|
||||
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
@ -124,13 +112,6 @@ func Run(s *options.SchedulerServer) error {
|
|||
panic("unreachable")
|
||||
}
|
||||
|
||||
func createRecorder(kubecli *clientset.Clientset, s *options.SchedulerServer) record.EventRecorder {
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartLogging(glog.Infof)
|
||||
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubecli.Core().RESTClient()).Events("")})
|
||||
return eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: s.SchedulerName})
|
||||
}
|
||||
|
||||
func startHTTP(s *options.SchedulerServer) {
|
||||
mux := http.NewServeMux()
|
||||
healthz.InstallHandler(mux)
|
||||
|
@ -156,61 +137,3 @@ func startHTTP(s *options.SchedulerServer) {
|
|||
}
|
||||
glog.Fatal(server.ListenAndServe())
|
||||
}
|
||||
|
||||
func createClient(s *options.SchedulerServer) (*clientset.Clientset, error) {
|
||||
kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to build config from flags: %v", err)
|
||||
}
|
||||
|
||||
kubeconfig.ContentType = s.ContentType
|
||||
// Override kubeconfig qps/burst settings from flags
|
||||
kubeconfig.QPS = s.KubeAPIQPS
|
||||
kubeconfig.Burst = int(s.KubeAPIBurst)
|
||||
|
||||
cli, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "leader-election"))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid API configuration: %v", err)
|
||||
}
|
||||
return cli, nil
|
||||
}
|
||||
|
||||
// schedulerConfigurator is an interface wrapper that provides default Configuration creation based on user
|
||||
// provided config file.
|
||||
type schedulerConfigurator struct {
|
||||
scheduler.Configurator
|
||||
policyFile string
|
||||
algorithmProvider string
|
||||
}
|
||||
|
||||
func (sc schedulerConfigurator) Create() (*scheduler.Config, error) {
|
||||
if _, err := os.Stat(sc.policyFile); err != nil {
|
||||
return sc.Configurator.CreateFromProvider(sc.algorithmProvider)
|
||||
}
|
||||
|
||||
// policy file is valid, try to create a configuration from it.
|
||||
var policy schedulerapi.Policy
|
||||
configData, err := ioutil.ReadFile(sc.policyFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to read policy config: %v", err)
|
||||
}
|
||||
if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil {
|
||||
return nil, fmt.Errorf("invalid configuration: %v", err)
|
||||
}
|
||||
return sc.CreateFromConfig(policy)
|
||||
}
|
||||
|
||||
// createScheduler encapsulates the entire creation of a runnable scheduler.
|
||||
func createScheduler(s *options.SchedulerServer, kubecli *clientset.Clientset, recorder record.EventRecorder) (*scheduler.Scheduler, error) {
|
||||
configurator := factory.NewConfigFactory(kubecli, s.SchedulerName, s.HardPodAffinitySymmetricWeight, s.FailureDomains)
|
||||
|
||||
// Rebuild the configurator with a default Create(...) method.
|
||||
configurator = &schedulerConfigurator{
|
||||
configurator,
|
||||
s.PolicyConfigFile,
|
||||
s.AlgorithmProvider}
|
||||
|
||||
return scheduler.NewFromConfigurator(configurator, func(cfg *scheduler.Config) {
|
||||
cfg.Recorder = recorder
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue