Revert "Revert "scheduler: align with ctrl-managers and apiservers, add https+auth in options""

This reverts commit 7b93d81a97.
pull/8/head
Dr. Stefan Schimanski 2018-05-10 10:23:13 +02:00
parent d0f4a8fa17
commit a3a52a8cf7
15 changed files with 1319 additions and 608 deletions

View File

@ -30,6 +30,8 @@ import (
type InsecureServingInfo struct {
// Listener is the secure server network listener.
Listener net.Listener
// optional server name for log messages
Name string
}
// Serve starts an insecure http server with the given handler. It fails only if
@ -41,6 +43,10 @@ func (s *InsecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.D
MaxHeaderBytes: 1 << 20,
}
glog.Infof("Serving insecurely on %s", s.Listener.Addr())
if len(s.Name) > 0 {
glog.Infof("Serving %s insecurely on %s", s.Name, s.Listener.Addr())
} else {
glog.Infof("Serving insecurely on %s", s.Listener.Addr())
}
return server.RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh)
}

View File

@ -23,7 +23,6 @@ import (
"github.com/spf13/pflag"
"k8s.io/apiserver/pkg/server/options"
genericcontrollermanager "k8s.io/kubernetes/cmd/controller-manager/app"
"k8s.io/kubernetes/pkg/apis/componentconfig"
)
// InsecureServingOptions are for creating an unauthenticated, unauthorized, insecure port.
@ -39,16 +38,20 @@ type InsecureServingOptions struct {
// either Listener or BindAddress/BindPort/BindNetwork is set,
// if Listener is set, use it and omit BindAddress/BindPort/BindNetwork.
Listener net.Listener
// ListenFunc can be overridden to create a custom listener, e.g. for mocking in tests.
// It defaults to options.CreateListener.
ListenFunc func(network, addr string) (net.Listener, int, error)
}
// Validate ensures that the insecure port values within the range of the port.
func (s *InsecureServingOptions) Validate() []error {
errors := []error{}
if s == nil {
return nil
}
errors := []error{}
if s.BindPort < 0 || s.BindPort > 32767 {
errors = append(errors, fmt.Errorf("--insecure-port %v must be between 0 and 32767, inclusive. 0 for turning off insecure (HTTP) port", s.BindPort))
}
@ -61,20 +64,10 @@ func (s *InsecureServingOptions) AddFlags(fs *pflag.FlagSet) {
if s == nil {
return
}
}
// AddDeprecatedFlags adds deprecated flags related to insecure serving for controller manager to the specified FlagSet.
// TODO: remove it until kops stop using `--address`
func (s *InsecureServingOptions) AddDeprecatedFlags(fs *pflag.FlagSet) {
if s == nil {
return
}
fs.IPVar(&s.BindAddress, "address", s.BindAddress,
"DEPRECATED: the IP address on which to listen for the --port port. See --bind-address instead.")
fs.IPVar(&s.BindAddress, "address", s.BindAddress, "DEPRECATED: the IP address on which to listen for the --port port (set to 0.0.0.0 for all IPv4 interfaces and :: for all IPv6 interfaces). See --bind-address instead.")
// MarkDeprecated hides the flag from the help. We don't want that:
// fs.MarkDeprecated("address", "see --bind-address instead.")
fs.IntVar(&s.BindPort, "port", s.BindPort, "DEPRECATED: the port on which to serve HTTP insecurely without authentication and authorization. If 0, don't serve HTTPS at all. See --secure-port instead.")
// MarkDeprecated hides the flag from the help. We don't want that:
// fs.MarkDeprecated("port", "see --secure-port instead.")
@ -82,7 +75,7 @@ func (s *InsecureServingOptions) AddDeprecatedFlags(fs *pflag.FlagSet) {
// ApplyTo adds InsecureServingOptions to the insecureserverinfo amd kube-controller manager configuration.
// Note: the double pointer allows to set the *InsecureServingInfo to nil without referencing the struct hosting this pointer.
func (s *InsecureServingOptions) ApplyTo(c **genericcontrollermanager.InsecureServingInfo, cfg *componentconfig.KubeCloudSharedConfiguration) error {
func (s *InsecureServingOptions) ApplyTo(c **genericcontrollermanager.InsecureServingInfo) error {
if s == nil {
return nil
}
@ -92,8 +85,12 @@ func (s *InsecureServingOptions) ApplyTo(c **genericcontrollermanager.InsecureSe
if s.Listener == nil {
var err error
listen := options.CreateListener
if s.ListenFunc != nil {
listen = s.ListenFunc
}
addr := net.JoinHostPort(s.BindAddress.String(), fmt.Sprintf("%d", s.BindPort))
s.Listener, s.BindPort, err = options.CreateListener(s.BindNetwork, addr)
s.Listener, s.BindPort, err = listen(s.BindNetwork, addr)
if err != nil {
return fmt.Errorf("failed to create listener: %v", err)
}
@ -103,10 +100,5 @@ func (s *InsecureServingOptions) ApplyTo(c **genericcontrollermanager.InsecureSe
Listener: s.Listener,
}
// sync back to component config
// TODO: find more elegant way than synching back the values.
cfg.Port = int32(s.BindPort)
cfg.Address = s.BindAddress.String()
return nil
}

View File

@ -225,7 +225,6 @@ func (o *GenericControllerManagerOptions) AddFlags(fs *pflag.FlagSet) {
o.ServiceController.AddFlags(fs)
o.SecureServing.AddFlags(fs)
o.InsecureServing.AddFlags(fs)
o.InsecureServing.AddDeprecatedFlags(fs)
o.Authentication.AddFlags(fs)
o.Authorization.AddFlags(fs)
}
@ -304,7 +303,7 @@ func (o *GenericControllerManagerOptions) ApplyTo(c *genericcontrollermanager.Co
if err := o.SecureServing.ApplyTo(&c.SecureServing); err != nil {
return err
}
if err := o.InsecureServing.ApplyTo(&c.InsecureServing, &c.ComponentConfig.KubeCloudShared); err != nil {
if err := o.InsecureServing.ApplyTo(&c.InsecureServing); err != nil {
return err
}
if err := o.Authentication.ApplyTo(&c.Authentication, c.SecureServing, nil); err != nil {
@ -314,6 +313,11 @@ func (o *GenericControllerManagerOptions) ApplyTo(c *genericcontrollermanager.Co
return err
}
// sync back to component config
// TODO: find more elegant way than synching back the values.
c.ComponentConfig.KubeCloudShared.Port = int32(o.InsecureServing.BindPort)
c.ComponentConfig.KubeCloudShared.Address = o.InsecureServing.BindAddress.String()
var err error
c.Kubeconfig, err = clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig)
if err != nil {

View File

@ -10,14 +10,12 @@ go_library(
srcs = ["server.go"],
importpath = "k8s.io/kubernetes/cmd/kube-scheduler/app",
deps = [
"//cmd/kube-scheduler/app/config:go_default_library",
"//cmd/kube-scheduler/app/options:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/componentconfig:go_default_library",
"//pkg/apis/componentconfig/v1alpha1:go_default_library",
"//pkg/client/leaderelectionconfig:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubectl/cmd/util:go_default_library",
"//pkg/master/ports:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/api:go_default_library",
@ -30,30 +28,23 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/filters:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/mux:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/routes:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/informers/storage/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd/api:go_default_library",
"//vendor/k8s.io/client-go/tools/leaderelection:go_default_library",
"//vendor/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
)
@ -66,6 +57,10 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//cmd/kube-scheduler/app/config:all-srcs",
"//cmd/kube-scheduler/app/options:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -0,0 +1,33 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["config.go"],
importpath = "k8s.io/kubernetes/cmd/kube-scheduler/app/config",
visibility = ["//visibility:public"],
deps = [
"//cmd/controller-manager/app:go_default_library",
"//pkg/apis/componentconfig:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/leaderelection:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,75 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/cmd/controller-manager/app"
"k8s.io/kubernetes/pkg/apis/componentconfig"
)
// Config has all the context to run a Scheduler
type Config struct {
// config is the scheduler server's configuration object.
ComponentConfig componentconfig.KubeSchedulerConfiguration
InsecureServing *app.InsecureServingInfo // nil will disable serving on an insecure port
InsecureMetricsServing *app.InsecureServingInfo // non-nil if metrics should be served independently
Authentication apiserver.AuthenticationInfo
Authorization apiserver.AuthorizationInfo
SecureServing *apiserver.SecureServingInfo
Client clientset.Interface
InformerFactory informers.SharedInformerFactory
PodInformer coreinformers.PodInformer
EventClient v1core.EventsGetter
Recorder record.EventRecorder
Broadcaster record.EventBroadcaster
// LeaderElection is optional.
LeaderElection *leaderelection.LeaderElectionConfig
}
type completedConfig struct {
*Config
}
// CompletedConfig same as Config, just to swap private object.
type CompletedConfig struct {
// Embed a private pointer that cannot be instantiated outside of this package.
*completedConfig
}
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
func (c *Config) Complete() CompletedConfig {
cc := completedConfig{c}
if c.InsecureServing != nil {
c.InsecureServing.Name = "healthz"
}
if c.InsecureMetricsServing != nil {
c.InsecureMetricsServing.Name = "metrics"
}
return CompletedConfig{&cc}
}

View File

@ -0,0 +1,67 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"configfile.go",
"deprecated.go",
"insecure_serving.go",
"options.go",
],
importpath = "k8s.io/kubernetes/cmd/kube-scheduler/app/options",
visibility = ["//visibility:public"],
deps = [
"//cmd/controller-manager/app/options:go_default_library",
"//cmd/kube-scheduler/app/config:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/componentconfig:go_default_library",
"//pkg/apis/componentconfig/v1alpha1:go_default_library",
"//pkg/client/leaderelectionconfig:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/options:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd/api:go_default_library",
"//vendor/k8s.io/client-go/tools/leaderelection:go_default_library",
"//vendor/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["insecure_serving_test.go"],
embed = [":go_default_library"],
deps = [
"//cmd/controller-manager/app/options:go_default_library",
"//cmd/kube-scheduler/app/config:go_default_library",
"//pkg/apis/componentconfig:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library",
],
)

View File

@ -0,0 +1,89 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
"errors"
"fmt"
"io/ioutil"
"os"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/kubernetes/pkg/apis/componentconfig"
componentconfigv1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
)
var (
configScheme = runtime.NewScheme()
configCodecs = serializer.NewCodecFactory(configScheme)
)
func init() {
componentconfig.AddToScheme(configScheme)
componentconfigv1alpha1.AddToScheme(configScheme)
}
func loadConfigFromFile(file string) (*componentconfig.KubeSchedulerConfiguration, error) {
data, err := ioutil.ReadFile(file)
if err != nil {
return nil, err
}
return loadConfig(data)
}
func loadConfig(data []byte) (*componentconfig.KubeSchedulerConfiguration, error) {
configObj, gvk, err := configCodecs.UniversalDecoder().Decode(data, nil, nil)
if err != nil {
return nil, err
}
config, ok := configObj.(*componentconfig.KubeSchedulerConfiguration)
if !ok {
return nil, fmt.Errorf("got unexpected config type: %v", gvk)
}
return config, nil
}
// WriteConfigFile writes the config into the given file name as YAML.
func WriteConfigFile(fileName string, cfg *componentconfig.KubeSchedulerConfiguration) error {
var encoder runtime.Encoder
mediaTypes := configCodecs.SupportedMediaTypes()
for _, info := range mediaTypes {
if info.MediaType == "application/yaml" {
encoder = info.Serializer
break
}
}
if encoder == nil {
return errors.New("unable to locate yaml encoder")
}
encoder = json.NewYAMLSerializer(json.DefaultMetaFactory, configScheme, configScheme)
encoder = configCodecs.EncoderForVersion(encoder, componentconfigv1alpha1.SchemeGroupVersion)
configFile, err := os.Create(fileName)
if err != nil {
return err
}
defer configFile.Close()
if err := encoder.Encode(cfg, configFile); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,116 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
"fmt"
"github.com/spf13/pflag"
"k8s.io/kubernetes/pkg/apis/componentconfig"
"k8s.io/kubernetes/pkg/scheduler/factory"
)
// DeprecatedOptions contains deprecated options and their flags.
// TODO remove these fields once the deprecated flags are removed.
type DeprecatedOptions struct {
// The fields below here are placeholders for flags that can't be directly
// mapped into componentconfig.KubeSchedulerConfiguration.
PolicyConfigFile string
PolicyConfigMapName string
PolicyConfigMapNamespace string
UseLegacyPolicyConfig bool
AlgorithmProvider string
}
// AddFlags adds flags for the deprecated options.
func (o *DeprecatedOptions) AddFlags(fs *pflag.FlagSet, cfg *componentconfig.KubeSchedulerConfiguration) {
if o == nil {
return
}
// TODO: unify deprecation mechanism, string prefix or MarkDeprecated (the latter hides the flag. We also don't want that).
fs.StringVar(&o.AlgorithmProvider, "algorithm-provider", o.AlgorithmProvider, "DEPRECATED: the scheduling algorithm provider to use, one of: "+factory.ListAlgorithmProviders())
fs.StringVar(&o.PolicyConfigFile, "policy-config-file", o.PolicyConfigFile, "DEPRECATED: file with scheduler policy configuration. This file is used if policy ConfigMap is not provided or --use-legacy-policy-config=true")
usage := fmt.Sprintf("DEPRECATED: name of the ConfigMap object that contains scheduler's policy configuration. It must exist in the system namespace before scheduler initialization if --use-legacy-policy-config=false. The config must be provided as the value of an element in 'Data' map with the key='%v'", componentconfig.SchedulerPolicyConfigMapKey)
fs.StringVar(&o.PolicyConfigMapName, "policy-configmap", o.PolicyConfigMapName, usage)
fs.StringVar(&o.PolicyConfigMapNamespace, "policy-configmap-namespace", o.PolicyConfigMapNamespace, "DEPRECATED: the namespace where policy ConfigMap is located. The kube-system namespace will be used if this is not provided or is empty.")
fs.BoolVar(&o.UseLegacyPolicyConfig, "use-legacy-policy-config", o.UseLegacyPolicyConfig, "DEPRECATED: when set to true, scheduler will ignore policy ConfigMap and uses policy config file")
fs.BoolVar(&cfg.EnableProfiling, "profiling", cfg.EnableProfiling, "DEPRECATED: enable profiling via web interface host:port/debug/pprof/")
fs.BoolVar(&cfg.EnableContentionProfiling, "contention-profiling", cfg.EnableContentionProfiling, "DEPRECATED: enable lock contention profiling, if profiling is enabled")
fs.StringVar(&cfg.ClientConnection.KubeConfigFile, "kubeconfig", cfg.ClientConnection.KubeConfigFile, "DEPRECATED: path to kubeconfig file with authorization and master location information.")
fs.StringVar(&cfg.ClientConnection.ContentType, "kube-api-content-type", cfg.ClientConnection.ContentType, "DEPRECATED: content type of requests sent to apiserver.")
fs.Float32Var(&cfg.ClientConnection.QPS, "kube-api-qps", cfg.ClientConnection.QPS, "DEPRECATED: QPS to use while talking with kubernetes apiserver")
fs.Int32Var(&cfg.ClientConnection.Burst, "kube-api-burst", cfg.ClientConnection.Burst, "DEPRECATED: burst to use while talking with kubernetes apiserver")
fs.StringVar(&cfg.SchedulerName, "scheduler-name", cfg.SchedulerName, "DEPRECATED: name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's \"spec.schedulerName\".")
fs.StringVar(&cfg.LeaderElection.LockObjectNamespace, "lock-object-namespace", cfg.LeaderElection.LockObjectNamespace, "DEPRECATED: define the namespace of the lock object.")
fs.StringVar(&cfg.LeaderElection.LockObjectName, "lock-object-name", cfg.LeaderElection.LockObjectName, "DEPRECATED: define the name of the lock object.")
fs.Int32Var(&cfg.HardPodAffinitySymmetricWeight, "hard-pod-affinity-symmetric-weight", cfg.HardPodAffinitySymmetricWeight,
"RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule corresponding "+
"to every RequiredDuringScheduling affinity rule. --hard-pod-affinity-symmetric-weight represents the weight of implicit PreferredDuringScheduling affinity rule.")
fs.MarkDeprecated("hard-pod-affinity-symmetric-weight", "This option was moved to the policy configuration file")
fs.StringVar(&cfg.FailureDomains, "failure-domains", cfg.FailureDomains, "Indicate the \"all topologies\" set for an empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity.")
fs.MarkDeprecated("failure-domains", "Doesn't have any effect. Will be removed in future version.")
}
// Validate validates the deprecated scheduler options.
func (o *DeprecatedOptions) Validate() []error {
if o != nil {
return nil
}
return nil
}
// ApplyTo sets cfg.AlgorithmSource from flags passed on the command line in the following precedence order:
//
// 1. --use-legacy-policy-config to use a policy file.
// 2. --policy-configmap to use a policy config map value.
// 3. --algorithm-provider to use a named algorithm provider.
func (o *DeprecatedOptions) ApplyTo(cfg *componentconfig.KubeSchedulerConfiguration) error {
if o == nil {
return nil
}
switch {
case o.UseLegacyPolicyConfig || (len(o.PolicyConfigFile) > 0 && o.PolicyConfigMapName == ""):
cfg.AlgorithmSource = componentconfig.SchedulerAlgorithmSource{
Policy: &componentconfig.SchedulerPolicySource{
File: &componentconfig.SchedulerPolicyFileSource{
Path: o.PolicyConfigFile,
},
},
}
case len(o.PolicyConfigMapName) > 0:
cfg.AlgorithmSource = componentconfig.SchedulerAlgorithmSource{
Policy: &componentconfig.SchedulerPolicySource{
ConfigMap: &componentconfig.SchedulerPolicyConfigMapSource{
Name: o.PolicyConfigMapName,
Namespace: o.PolicyConfigMapNamespace,
},
},
}
case len(o.AlgorithmProvider) > 0:
cfg.AlgorithmSource = componentconfig.SchedulerAlgorithmSource{
Provider: &o.AlgorithmProvider,
}
}
return nil
}

View File

@ -0,0 +1,164 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
"fmt"
"net"
"strconv"
"github.com/spf13/pflag"
controlleroptions "k8s.io/kubernetes/cmd/controller-manager/app/options"
schedulerappconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
"k8s.io/kubernetes/pkg/apis/componentconfig"
)
// CombinedInsecureServingOptions sets up up to two insecure listeners for healthz and metrics. The flags
// override the ComponentConfig and InsecureServingOptions values for both.
type CombinedInsecureServingOptions struct {
Healthz *controlleroptions.InsecureServingOptions
Metrics *controlleroptions.InsecureServingOptions
BindPort int // overrides the structs above on ApplyTo, ignored on ApplyToFromLoadedConfig
BindAddress string // overrides the structs above on ApplyTo, ignored on ApplyToFromLoadedConfig
}
// AddFlags adds flags for the insecure serving options.
func (o *CombinedInsecureServingOptions) AddFlags(fs *pflag.FlagSet) {
if o == nil {
return
}
fs.StringVar(&o.BindAddress, "address", o.BindAddress, "DEPRECATED: the IP address on which to listen for the --port port (set to 0.0.0.0 for all IPv4 interfaces and :: for all IPv6 interfaces). See --bind-address instead.")
// MarkDeprecated hides the flag from the help. We don't want that:
// fs.MarkDeprecated("address", "see --bind-address instead.")
fs.IntVar(&o.BindPort, "port", o.BindPort, "DEPRECATED: the port on which to serve HTTP insecurely without authentication and authorization. If 0, don't serve HTTPS at all. See --secure-port instead.")
// MarkDeprecated hides the flag from the help. We don't want that:
// fs.MarkDeprecated("port", "see --secure-port instead.")
}
func (o *CombinedInsecureServingOptions) applyTo(c *schedulerappconfig.Config, componentConfig *componentconfig.KubeSchedulerConfiguration) error {
if err := updateAddressFromInsecureServingOptions(&componentConfig.HealthzBindAddress, o.Healthz); err != nil {
return err
}
if err := updateAddressFromInsecureServingOptions(&componentConfig.MetricsBindAddress, o.Metrics); err != nil {
return err
}
if err := o.Healthz.ApplyTo(&c.InsecureServing); err != nil {
return err
}
if o.Metrics != nil && (c.ComponentConfig.MetricsBindAddress != c.ComponentConfig.HealthzBindAddress || o.Healthz == nil) {
if err := o.Metrics.ApplyTo(&c.InsecureMetricsServing); err != nil {
return err
}
}
return nil
}
// ApplyTo applies the insecure serving options to the given scheduler app configuration, and updates the componentConfig.
func (o *CombinedInsecureServingOptions) ApplyTo(c *schedulerappconfig.Config, componentConfig *componentconfig.KubeSchedulerConfiguration) error {
if o == nil {
componentConfig.HealthzBindAddress = ""
componentConfig.MetricsBindAddress = ""
return nil
}
if o.Healthz != nil {
o.Healthz.BindPort = o.BindPort
o.Healthz.BindAddress = net.ParseIP(o.BindAddress)
}
if o.Metrics != nil {
o.Metrics.BindPort = o.BindPort
o.Metrics.BindAddress = net.ParseIP(o.BindAddress)
}
return o.applyTo(c, componentConfig)
}
// ApplyToFromLoadedConfig updates the insecure serving options from the component config and then appies it to the given scheduler app configuration.
func (o *CombinedInsecureServingOptions) ApplyToFromLoadedConfig(c *schedulerappconfig.Config, componentConfig *componentconfig.KubeSchedulerConfiguration) error {
if o == nil {
return nil
}
if err := updateInsecureServingOptionsFromAddress(o.Healthz, componentConfig.HealthzBindAddress); err != nil {
return fmt.Errorf("invalid healthz address: %v", err)
}
if err := updateInsecureServingOptionsFromAddress(o.Metrics, componentConfig.MetricsBindAddress); err != nil {
return fmt.Errorf("invalid metrics address: %v", err)
}
return o.applyTo(c, componentConfig)
}
func updateAddressFromInsecureServingOptions(addr *string, is *controlleroptions.InsecureServingOptions) error {
if is == nil {
*addr = ""
} else {
if is.Listener != nil {
*addr = is.Listener.Addr().String()
} else if is.BindPort == 0 {
*addr = ""
} else {
*addr = net.JoinHostPort(is.BindAddress.String(), strconv.Itoa(is.BindPort))
}
}
return nil
}
func updateInsecureServingOptionsFromAddress(is *controlleroptions.InsecureServingOptions, addr string) error {
if is == nil {
return nil
}
if len(addr) == 0 {
is.BindPort = 0
return nil
}
host, portInt, err := splitHostIntPort(addr)
if err != nil {
return fmt.Errorf("invalid address %q", addr)
}
is.BindAddress = net.ParseIP(host)
is.BindPort = portInt
return nil
}
// Validate validates the insecure serving options.
func (o *CombinedInsecureServingOptions) Validate() []error {
if o == nil {
return nil
}
errors := []error{}
if o.BindPort <= 0 || o.BindPort > 32767 {
errors = append(errors, fmt.Errorf("--insecure-port %v must be between 0 and 32767, inclusive. 0 for turning off insecure (HTTP) port", o.BindPort))
}
if len(o.BindAddress) > 0 && net.ParseIP(o.BindAddress) == nil {
errors = append(errors, fmt.Errorf("--address has no valid IP address"))
}
return errors
}

View File

@ -0,0 +1,276 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
"fmt"
"net"
"strconv"
"testing"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/kubernetes/cmd/controller-manager/app/options"
schedulerappconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
"k8s.io/kubernetes/pkg/apis/componentconfig"
)
func TestOptions_ApplyTo(t *testing.T) {
tests := []struct {
name string
options Options
configLoaded bool
expectHealthzBindAddress, expectMetricsBindAddress string
expectInsecureServingAddress, expectInsecureMetricsServingAddress string
expectInsecureServingPort, expectInsecureMetricsServingPort int
wantErr bool
}{
{
name: "no config, zero port",
options: Options{
ComponentConfig: componentconfig.KubeSchedulerConfiguration{
HealthzBindAddress: "1.2.3.4:1234",
MetricsBindAddress: "1.2.3.4:1234",
},
CombinedInsecureServing: &CombinedInsecureServingOptions{
Healthz: &options.InsecureServingOptions{},
Metrics: &options.InsecureServingOptions{},
BindPort: 0,
},
},
configLoaded: false,
},
{
name: "config loaded, non-nil healthz",
options: Options{
ComponentConfig: componentconfig.KubeSchedulerConfiguration{
HealthzBindAddress: "1.2.3.4:1234",
MetricsBindAddress: "1.2.3.4:1234",
},
CombinedInsecureServing: &CombinedInsecureServingOptions{
Healthz: &options.InsecureServingOptions{},
BindPort: 0,
},
},
configLoaded: true,
expectHealthzBindAddress: "1.2.3.4:1234",
expectInsecureServingPort: 1234,
expectInsecureServingAddress: "1.2.3.4",
},
{
name: "config loaded, non-nil metrics",
options: Options{
ComponentConfig: componentconfig.KubeSchedulerConfiguration{
HealthzBindAddress: "1.2.3.4:1234",
MetricsBindAddress: "1.2.3.4:1234",
},
CombinedInsecureServing: &CombinedInsecureServingOptions{
Metrics: &options.InsecureServingOptions{},
BindPort: 0,
},
},
configLoaded: true,
expectMetricsBindAddress: "1.2.3.4:1234",
expectInsecureMetricsServingPort: 1234,
expectInsecureMetricsServingAddress: "1.2.3.4",
},
{
name: "config loaded, all set, zero BindPort",
options: Options{
ComponentConfig: componentconfig.KubeSchedulerConfiguration{
HealthzBindAddress: "1.2.3.4:1234",
MetricsBindAddress: "1.2.3.4:1234",
},
CombinedInsecureServing: &CombinedInsecureServingOptions{
Healthz: &options.InsecureServingOptions{},
Metrics: &options.InsecureServingOptions{},
BindPort: 0,
},
},
configLoaded: true,
expectHealthzBindAddress: "1.2.3.4:1234",
expectInsecureServingPort: 1234,
expectInsecureServingAddress: "1.2.3.4",
expectMetricsBindAddress: "1.2.3.4:1234",
},
{
name: "config loaded, all set, different addresses",
options: Options{
ComponentConfig: componentconfig.KubeSchedulerConfiguration{
HealthzBindAddress: "1.2.3.4:1234",
MetricsBindAddress: "1.2.3.4:1235",
},
CombinedInsecureServing: &CombinedInsecureServingOptions{
Healthz: &options.InsecureServingOptions{},
Metrics: &options.InsecureServingOptions{},
BindPort: 0,
},
},
configLoaded: true,
expectHealthzBindAddress: "1.2.3.4:1234",
expectInsecureServingPort: 1234,
expectInsecureServingAddress: "1.2.3.4",
expectMetricsBindAddress: "1.2.3.4:1235",
expectInsecureMetricsServingPort: 1235,
expectInsecureMetricsServingAddress: "1.2.3.4",
},
{
name: "no config, all set, port passed",
options: Options{
ComponentConfig: componentconfig.KubeSchedulerConfiguration{
HealthzBindAddress: "1.2.3.4:1234",
MetricsBindAddress: "1.2.3.4:1234",
},
CombinedInsecureServing: &CombinedInsecureServingOptions{
Healthz: &options.InsecureServingOptions{},
Metrics: &options.InsecureServingOptions{},
BindPort: 1236,
BindAddress: "1.2.3.4",
},
},
configLoaded: false,
expectHealthzBindAddress: "1.2.3.4:1236",
expectInsecureServingPort: 1236,
expectInsecureServingAddress: "1.2.3.4",
expectMetricsBindAddress: "1.2.3.4:1236",
},
{
name: "no config, all set, address passed",
options: Options{
ComponentConfig: componentconfig.KubeSchedulerConfiguration{
HealthzBindAddress: "1.2.3.4:1234",
MetricsBindAddress: "1.2.3.4:1234",
},
CombinedInsecureServing: &CombinedInsecureServingOptions{
Healthz: &options.InsecureServingOptions{},
Metrics: &options.InsecureServingOptions{},
BindAddress: "2.3.4.5",
BindPort: 1234,
},
},
configLoaded: false,
expectHealthzBindAddress: "2.3.4.5:1234",
expectInsecureServingPort: 1234,
expectInsecureServingAddress: "2.3.4.5",
expectMetricsBindAddress: "2.3.4.5:1234",
},
{
name: "no config, all set, zero port passed",
options: Options{
ComponentConfig: componentconfig.KubeSchedulerConfiguration{
HealthzBindAddress: "1.2.3.4:1234",
MetricsBindAddress: "1.2.3.4:1234",
},
CombinedInsecureServing: &CombinedInsecureServingOptions{
Healthz: &options.InsecureServingOptions{},
Metrics: &options.InsecureServingOptions{},
BindAddress: "2.3.4.5",
BindPort: 0,
},
},
configLoaded: false,
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%d-%s", i, tt.name), func(t *testing.T) {
c := schedulerappconfig.Config{
ComponentConfig: tt.options.ComponentConfig,
}
if tt.options.CombinedInsecureServing != nil {
if tt.options.CombinedInsecureServing.Healthz != nil {
tt.options.CombinedInsecureServing.Healthz.ListenFunc = createMockListener
}
if tt.options.CombinedInsecureServing.Metrics != nil {
tt.options.CombinedInsecureServing.Metrics.ListenFunc = createMockListener
}
}
if tt.configLoaded {
if err := tt.options.CombinedInsecureServing.ApplyToFromLoadedConfig(&c, &c.ComponentConfig); (err != nil) != tt.wantErr {
t.Fatalf("%d - Options.ApplyTo() error = %v, wantErr %v", i, err, tt.wantErr)
}
} else {
if err := tt.options.CombinedInsecureServing.ApplyTo(&c, &c.ComponentConfig); (err != nil) != tt.wantErr {
t.Fatalf("%d - Options.ApplyTo() error = %v, wantErr %v", i, err, tt.wantErr)
}
}
if got, expect := c.ComponentConfig.HealthzBindAddress, tt.expectHealthzBindAddress; got != expect {
t.Errorf("%d - expected HealthzBindAddress %q, got %q", i, expect, got)
}
if got, expect := c.ComponentConfig.MetricsBindAddress, tt.expectMetricsBindAddress; got != expect {
t.Errorf("%d - expected MetricsBindAddress %q, got %q", i, expect, got)
}
if got, expect := c.InsecureServing != nil, tt.expectInsecureServingPort != 0; got != expect {
t.Errorf("%d - expected InsecureServing != nil to be %v, got %v", i, expect, got)
} else if c.InsecureServing != nil {
if got, expect := c.InsecureServing.Listener.(*mockListener).address, tt.expectInsecureServingAddress; got != expect {
t.Errorf("%d - expected healthz address %q, got %q", i, expect, got)
}
if got, expect := c.InsecureServing.Listener.(*mockListener).port, tt.expectInsecureServingPort; got != expect {
t.Errorf("%d - expected healthz port %v, got %v", i, expect, got)
}
}
if got, expect := c.InsecureMetricsServing != nil, tt.expectInsecureMetricsServingPort != 0; got != expect {
t.Errorf("%d - expected Metrics != nil to be %v, got %v", i, expect, got)
} else if c.InsecureMetricsServing != nil {
if got, expect := c.InsecureMetricsServing.Listener.(*mockListener).address, tt.expectInsecureMetricsServingAddress; got != expect {
t.Errorf("%d - expected metrics address %q, got %q", i, expect, got)
}
if got, expect := c.InsecureMetricsServing.Listener.(*mockListener).port, tt.expectInsecureMetricsServingPort; got != expect {
t.Errorf("%d - expected metrics port %v, got %v", i, expect, got)
}
}
})
}
}
type mockListener struct {
address string
port int
}
func createMockListener(network, addr string) (net.Listener, int, error) {
host, portInt, err := splitHostIntPort(addr)
if err != nil {
return nil, 0, err
}
if portInt == 0 {
portInt = rand.IntnRange(1, 32767)
}
return &mockListener{host, portInt}, portInt, nil
}
func (l *mockListener) Accept() (net.Conn, error) { return nil, nil }
func (l *mockListener) Close() error { return nil }
func (l *mockListener) Addr() net.Addr {
return mockAddr(net.JoinHostPort(l.address, strconv.Itoa(l.port)))
}
type mockAddr string
func (a mockAddr) Network() string { return "tcp" }
func (a mockAddr) String() string { return string(a) }

View File

@ -0,0 +1,303 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
"fmt"
"net"
"os"
"strconv"
"github.com/golang/glog"
"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
apiserveroptions "k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
controlleroptions "k8s.io/kubernetes/cmd/controller-manager/app/options"
schedulerappconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/componentconfig"
componentconfigv1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
"k8s.io/kubernetes/pkg/client/leaderelectionconfig"
"k8s.io/kubernetes/pkg/scheduler/factory"
)
// Options has all the params needed to run a Scheduler
type Options struct {
// The default values. These are overridden if ConfigFile is set or by values in InsecureServing.
ComponentConfig componentconfig.KubeSchedulerConfiguration
SecureServing *apiserveroptions.SecureServingOptions
CombinedInsecureServing *CombinedInsecureServingOptions
Authentication *apiserveroptions.DelegatingAuthenticationOptions
Authorization *apiserveroptions.DelegatingAuthorizationOptions
Deprecated *DeprecatedOptions
// ConfigFile is the location of the scheduler server's configuration file.
ConfigFile string
// WriteConfigTo is the path where the default configuration will be written.
WriteConfigTo string
Master string
}
// NewOptions returns default scheduler app options.
func NewOptions() (*Options, error) {
cfg, err := newDefaultComponentConfig()
if err != nil {
return nil, err
}
hhost, hport, err := splitHostIntPort(cfg.HealthzBindAddress)
if err != nil {
return nil, err
}
o := &Options{
ComponentConfig: *cfg,
SecureServing: nil, // TODO: enable with apiserveroptions.NewSecureServingOptions()
CombinedInsecureServing: &CombinedInsecureServingOptions{
Healthz: &controlleroptions.InsecureServingOptions{
BindNetwork: "tcp",
},
Metrics: &controlleroptions.InsecureServingOptions{
BindNetwork: "tcp",
},
BindPort: hport,
BindAddress: hhost,
},
Authentication: nil, // TODO: enable with apiserveroptions.NewDelegatingAuthenticationOptions()
Authorization: nil, // TODO: enable with apiserveroptions.NewDelegatingAuthorizationOptions()
Deprecated: &DeprecatedOptions{
UseLegacyPolicyConfig: false,
PolicyConfigMapNamespace: metav1.NamespaceSystem,
},
}
return o, nil
}
func splitHostIntPort(s string) (string, int, error) {
host, port, err := net.SplitHostPort(s)
if err != nil {
return "", 0, err
}
portInt, err := strconv.Atoi(port)
if err != nil {
return "", 0, err
}
return host, portInt, err
}
func newDefaultComponentConfig() (*componentconfig.KubeSchedulerConfiguration, error) {
cfgv1alpha1 := componentconfigv1alpha1.KubeSchedulerConfiguration{}
configScheme.Default(&cfgv1alpha1)
cfg := componentconfig.KubeSchedulerConfiguration{}
if err := configScheme.Convert(&cfgv1alpha1, &cfg, nil); err != nil {
return nil, err
}
return &cfg, nil
}
// AddFlags adds flags for the scheduler options.
func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file. Flags override values in this file.")
fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
o.SecureServing.AddFlags(fs)
o.CombinedInsecureServing.AddFlags(fs)
o.Authentication.AddFlags(fs)
o.Authorization.AddFlags(fs)
o.Deprecated.AddFlags(fs, &o.ComponentConfig)
leaderelectionconfig.BindFlags(&o.ComponentConfig.LeaderElection.LeaderElectionConfiguration, fs)
utilfeature.DefaultFeatureGate.AddFlag(fs)
}
// ApplyTo applies the scheduler options to the given scheduler app configuration.
func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
if len(o.ConfigFile) == 0 && len(o.WriteConfigTo) == 0 {
glog.Warning("WARNING: all flags other than --config, --write-config-to, and --cleanup are deprecated. Please begin using a config file ASAP.")
}
if len(o.ConfigFile) == 0 {
c.ComponentConfig = o.ComponentConfig
// only apply deprecated flags if no config file is loaded (this is the old behaviour).
if err := o.Deprecated.ApplyTo(&c.ComponentConfig); err != nil {
return err
}
if err := o.CombinedInsecureServing.ApplyTo(c, &c.ComponentConfig); err != nil {
return err
}
} else {
cfg, err := loadConfigFromFile(o.ConfigFile)
if err != nil {
return err
}
// use the loaded config file only, with the exception of --address and --port. This means that
// none of the deprectated flags in o.Deprecated are taken into consideration. This is the old
// behaviour of the flags we have to keep.
c.ComponentConfig = *cfg
if err := o.CombinedInsecureServing.ApplyToFromLoadedConfig(c, &c.ComponentConfig); err != nil {
return err
}
}
if err := o.SecureServing.ApplyTo(&c.SecureServing); err != nil {
return err
}
if err := o.Authentication.ApplyTo(&c.Authentication, c.SecureServing, nil); err != nil {
return err
}
return o.Authorization.ApplyTo(&c.Authorization)
}
// Validate validates all the required options.
func (o *Options) Validate() []error {
var errs []error
errs = append(errs, o.SecureServing.Validate()...)
errs = append(errs, o.CombinedInsecureServing.Validate()...)
errs = append(errs, o.Authentication.Validate()...)
errs = append(errs, o.Authorization.Validate()...)
errs = append(errs, o.Deprecated.Validate()...)
return errs
}
// Config return a scheduler config object
func (o *Options) Config() (*schedulerappconfig.Config, error) {
// prepare kube clients.
client, leaderElectionClient, eventClient, err := createClients(o.ComponentConfig.ClientConnection, o.Master)
if err != nil {
return nil, err
}
// Prepare event clients.
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, corev1.EventSource{Component: o.ComponentConfig.SchedulerName})
// Set up leader election if enabled.
var leaderElectionConfig *leaderelection.LeaderElectionConfig
if o.ComponentConfig.LeaderElection.LeaderElect {
leaderElectionConfig, err = makeLeaderElectionConfig(o.ComponentConfig.LeaderElection, leaderElectionClient, recorder)
if err != nil {
return nil, err
}
}
c := &schedulerappconfig.Config{
Client: client,
InformerFactory: informers.NewSharedInformerFactory(client, 0),
PodInformer: factory.NewPodInformer(client, 0),
EventClient: eventClient,
Recorder: recorder,
Broadcaster: eventBroadcaster,
LeaderElection: leaderElectionConfig,
}
if err := o.ApplyTo(c); err != nil {
return nil, err
}
return c, nil
}
// makeLeaderElectionConfig builds a leader election configuration. It will
// create a new resource lock associated with the configuration.
func makeLeaderElectionConfig(config componentconfig.KubeSchedulerLeaderElectionConfiguration, client clientset.Interface, recorder record.EventRecorder) (*leaderelection.LeaderElectionConfig, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, fmt.Errorf("unable to get hostname: %v", err)
}
// add a uniquifier so that two processes on the same host don't accidentally both become active
id := hostname + "_" + string(uuid.NewUUID())
rl, err := resourcelock.New(config.ResourceLock,
config.LockObjectNamespace,
config.LockObjectName,
client.CoreV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
})
if err != nil {
return nil, fmt.Errorf("couldn't create resource lock: %v", err)
}
return &leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: config.LeaseDuration.Duration,
RenewDeadline: config.RenewDeadline.Duration,
RetryPeriod: config.RetryPeriod.Duration,
}, nil
}
// createClients creates a kube client and an event client from the given config and masterOverride.
// TODO remove masterOverride when CLI flags are removed.
func createClients(config componentconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, clientset.Interface, v1core.EventsGetter, error) {
if len(config.KubeConfigFile) == 0 && len(masterOverride) == 0 {
glog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. This might not work.")
}
// This creates a client, first loading any specified kubeconfig
// file, and then overriding the Master flag, if non-empty.
kubeConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: config.KubeConfigFile},
&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterOverride}}).ClientConfig()
if err != nil {
return nil, nil, nil, err
}
kubeConfig.AcceptContentTypes = config.AcceptContentTypes
kubeConfig.ContentType = config.ContentType
kubeConfig.QPS = config.QPS
//TODO make config struct use int instead of int32?
kubeConfig.Burst = int(config.Burst)
client, err := clientset.NewForConfig(restclient.AddUserAgent(kubeConfig, "scheduler"))
if err != nil {
return nil, nil, nil, err
}
leaderElectionClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeConfig, "leader-election"))
if err != nil {
return nil, nil, nil, err
}
eventClient, err := clientset.NewForConfig(kubeConfig)
if err != nil {
return nil, nil, nil, err
}
return client, leaderElectionClient, eventClient.CoreV1(), nil
}

View File

@ -18,48 +18,36 @@ limitations under the License.
package app
import (
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"reflect"
goruntime "runtime"
"strconv"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authorization/authorizer"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/mux"
"k8s.io/apiserver/pkg/server/routes"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
"k8s.io/kubernetes/cmd/kube-scheduler/app/options"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/componentconfig"
componentconfigv1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
"k8s.io/kubernetes/pkg/client/leaderelectionconfig"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/features"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
@ -73,300 +61,11 @@ import (
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)
// SchedulerServer has all the context and params needed to run a Scheduler
type Options struct {
// ConfigFile is the location of the scheduler server's configuration file.
ConfigFile string
// WriteConfigTo is the path where the default configuration will be written.
WriteConfigTo string
// config is the scheduler server's configuration object.
config *componentconfig.KubeSchedulerConfiguration
scheme *runtime.Scheme
codecs serializer.CodecFactory
// The fields below here are placeholders for flags that can't be directly
// mapped into componentconfig.KubeSchedulerConfiguration.
//
// TODO remove these fields once the deprecated flags are removed.
// master is the address of the Kubernetes API server (overrides any
// value in kubeconfig).
master string
healthzAddress string
healthzPort int32
policyConfigFile string
policyConfigMapName string
policyConfigMapNamespace string
useLegacyPolicyConfig bool
algorithmProvider string
}
// AddFlags adds flags for a specific SchedulerServer to the specified FlagSet
func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file.")
fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
// All flags below here are deprecated and will eventually be removed.
fs.Int32Var(&o.healthzPort, "port", ports.SchedulerPort, "The port that the scheduler's http service runs on")
fs.StringVar(&o.healthzAddress, "address", o.healthzAddress, "The IP address to serve on (set to 0.0.0.0 for all IPv4 interfaces and :: for all IPv6 interfaces).")
fs.StringVar(&o.algorithmProvider, "algorithm-provider", o.algorithmProvider, "The scheduling algorithm provider to use, one of: "+factory.ListAlgorithmProviders())
fs.StringVar(&o.policyConfigFile, "policy-config-file", o.policyConfigFile, "File with scheduler policy configuration. This file is used if policy ConfigMap is not provided or --use-legacy-policy-config=true")
usage := fmt.Sprintf("Name of the ConfigMap object that contains scheduler's policy configuration. It must exist in the system namespace before scheduler initialization if --use-legacy-policy-config=false. The config must be provided as the value of an element in 'Data' map with the key='%v'", componentconfig.SchedulerPolicyConfigMapKey)
fs.StringVar(&o.policyConfigMapName, "policy-configmap", o.policyConfigMapName, usage)
fs.StringVar(&o.policyConfigMapNamespace, "policy-configmap-namespace", o.policyConfigMapNamespace, "The namespace where policy ConfigMap is located. The kube-system namespace will be used if this is not provided or is empty.")
fs.BoolVar(&o.useLegacyPolicyConfig, "use-legacy-policy-config", o.useLegacyPolicyConfig, "When set to true, scheduler will ignore policy ConfigMap and uses policy config file")
fs.BoolVar(&o.config.EnableProfiling, "profiling", o.config.EnableProfiling, "Enable profiling via web interface host:port/debug/pprof/")
fs.BoolVar(&o.config.EnableContentionProfiling, "contention-profiling", o.config.EnableContentionProfiling, "Enable lock contention profiling, if profiling is enabled")
fs.StringVar(&o.master, "master", o.master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
fs.StringVar(&o.config.ClientConnection.KubeConfigFile, "kubeconfig", o.config.ClientConnection.KubeConfigFile, "Path to kubeconfig file with authorization and master location information.")
fs.StringVar(&o.config.ClientConnection.ContentType, "kube-api-content-type", o.config.ClientConnection.ContentType, "Content type of requests sent to apiserver.")
fs.Float32Var(&o.config.ClientConnection.QPS, "kube-api-qps", o.config.ClientConnection.QPS, "QPS to use while talking with kubernetes apiserver")
fs.Int32Var(&o.config.ClientConnection.Burst, "kube-api-burst", o.config.ClientConnection.Burst, "Burst to use while talking with kubernetes apiserver")
fs.StringVar(&o.config.SchedulerName, "scheduler-name", o.config.SchedulerName, "Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's \"spec.schedulerName\".")
fs.StringVar(&o.config.LeaderElection.LockObjectNamespace, "lock-object-namespace", o.config.LeaderElection.LockObjectNamespace, "Define the namespace of the lock object.")
fs.StringVar(&o.config.LeaderElection.LockObjectName, "lock-object-name", o.config.LeaderElection.LockObjectName, "Define the name of the lock object.")
fs.Int32Var(&o.config.HardPodAffinitySymmetricWeight, "hard-pod-affinity-symmetric-weight", o.config.HardPodAffinitySymmetricWeight,
"RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule corresponding "+
"to every RequiredDuringScheduling affinity rule. --hard-pod-affinity-symmetric-weight represents the weight of implicit PreferredDuringScheduling affinity rule.")
fs.MarkDeprecated("hard-pod-affinity-symmetric-weight", "This option was moved to the policy configuration file")
fs.StringVar(&o.config.FailureDomains, "failure-domains", o.config.FailureDomains, "Indicate the \"all topologies\" set for an empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity.")
fs.MarkDeprecated("failure-domains", "Doesn't have any effect. Will be removed in future version.")
leaderelectionconfig.BindFlags(&o.config.LeaderElection.LeaderElectionConfiguration, fs)
utilfeature.DefaultFeatureGate.AddFlag(fs)
}
func NewOptions() (*Options, error) {
o := &Options{
config: new(componentconfig.KubeSchedulerConfiguration),
useLegacyPolicyConfig: false,
policyConfigMapNamespace: metav1.NamespaceSystem,
}
o.scheme = runtime.NewScheme()
o.codecs = serializer.NewCodecFactory(o.scheme)
if err := componentconfig.AddToScheme(o.scheme); err != nil {
return nil, err
}
if err := componentconfigv1alpha1.AddToScheme(o.scheme); err != nil {
return nil, err
}
externalConfig := &componentconfigv1alpha1.KubeSchedulerConfiguration{}
// Assume we are starting with an empty external configuration, we apply
// defaults and then convert it into an internal data structure. This helps
// ensure that all the defaults are applied correctly (example LeaderElect)
o.scheme.Default(externalConfig)
if err := o.scheme.Convert(externalConfig, o.config, nil); err != nil {
return nil, err
}
return o, nil
}
func (o *Options) Complete() error {
if len(o.ConfigFile) == 0 && len(o.WriteConfigTo) == 0 {
glog.Warning("WARNING: all flags other than --config, --write-config-to, and --cleanup are deprecated. Please begin using a config file ASAP.")
o.applyDeprecatedHealthzAddressToConfig()
o.applyDeprecatedHealthzPortToConfig()
o.applyDeprecatedAlgorithmSourceOptionsToConfig()
}
if len(o.ConfigFile) > 0 {
if c, err := o.loadConfigFromFile(o.ConfigFile); err != nil {
return err
} else {
o.config = c
}
}
// Apply algorithms based on feature gates.
// TODO: make configurable?
algorithmprovider.ApplyFeatureGates()
return nil
}
// applyDeprecatedHealthzAddressToConfig sets o.config.HealthzBindAddress and
// o.config.MetricsBindAddress from flags passed on the command line based on
// the following rules:
//
// 1. If --address is empty, leave the config as-is.
// 2. Otherwise, use the value of --address for the address portion of
// o.config.HealthzBindAddress
func (o *Options) applyDeprecatedHealthzAddressToConfig() {
if len(o.healthzAddress) == 0 {
return
}
_, port, err := net.SplitHostPort(o.config.HealthzBindAddress)
if err != nil {
glog.Fatalf("invalid healthz bind address %q: %v", o.config.HealthzBindAddress, err)
}
o.config.HealthzBindAddress = net.JoinHostPort(o.healthzAddress, port)
o.config.MetricsBindAddress = net.JoinHostPort(o.healthzAddress, port)
}
// applyDeprecatedHealthzPortToConfig sets o.config.HealthzBindAddress and
// o.config.MetricsBindAddress from flags passed on the command line based on
// the following rules:
//
// 1. If --port is -1, disable the healthz server.
// 2. Otherwise, use the value of --port for the port portion of
// o.config.HealthzBindAddress
func (o *Options) applyDeprecatedHealthzPortToConfig() {
if o.healthzPort == -1 {
o.config.HealthzBindAddress = ""
o.config.MetricsBindAddress = ""
return
}
host, _, err := net.SplitHostPort(o.config.HealthzBindAddress)
if err != nil {
glog.Fatalf("invalid healthz bind address %q: %v", o.config.HealthzBindAddress, err)
}
o.config.HealthzBindAddress = net.JoinHostPort(host, strconv.Itoa(int(o.healthzPort)))
o.config.MetricsBindAddress = net.JoinHostPort(host, strconv.Itoa(int(o.healthzPort)))
}
// applyDeprecatedAlgorithmSourceOptionsToConfig sets o.config.AlgorithmSource from
// flags passed on the command line in the following precedence order:
//
// 1. --use-legacy-policy-config to use a policy file.
// 2. --policy-configmap to use a policy config map value.
// 3. --algorithm-provider to use a named algorithm provider.
func (o *Options) applyDeprecatedAlgorithmSourceOptionsToConfig() {
switch {
case o.useLegacyPolicyConfig || (len(o.policyConfigFile) > 0 && o.policyConfigMapName == ""):
o.config.AlgorithmSource = componentconfig.SchedulerAlgorithmSource{
Policy: &componentconfig.SchedulerPolicySource{
File: &componentconfig.SchedulerPolicyFileSource{
Path: o.policyConfigFile,
},
},
}
case len(o.policyConfigMapName) > 0:
o.config.AlgorithmSource = componentconfig.SchedulerAlgorithmSource{
Policy: &componentconfig.SchedulerPolicySource{
ConfigMap: &componentconfig.SchedulerPolicyConfigMapSource{
Name: o.policyConfigMapName,
Namespace: o.policyConfigMapNamespace,
},
},
}
case len(o.algorithmProvider) > 0:
o.config.AlgorithmSource = componentconfig.SchedulerAlgorithmSource{
Provider: &o.algorithmProvider,
}
}
}
// Validate validates all the required options.
func (o *Options) Validate(args []string) error {
if len(args) != 0 {
return errors.New("no arguments are supported")
}
return nil
}
// loadConfigFromFile loads the contents of file and decodes it as a
// KubeSchedulerConfiguration object.
func (o *Options) loadConfigFromFile(file string) (*componentconfig.KubeSchedulerConfiguration, error) {
data, err := ioutil.ReadFile(file)
if err != nil {
return nil, err
}
return o.loadConfig(data)
}
// loadConfig decodes data as a KubeSchedulerConfiguration object.
func (o *Options) loadConfig(data []byte) (*componentconfig.KubeSchedulerConfiguration, error) {
configObj, gvk, err := o.codecs.UniversalDecoder().Decode(data, nil, nil)
if err != nil {
return nil, err
}
config, ok := configObj.(*componentconfig.KubeSchedulerConfiguration)
if !ok {
return nil, fmt.Errorf("got unexpected config type: %v", gvk)
}
return config, nil
}
func (o *Options) ApplyDefaults(in *componentconfig.KubeSchedulerConfiguration) (*componentconfig.KubeSchedulerConfiguration, error) {
external, err := o.scheme.ConvertToVersion(in, componentconfigv1alpha1.SchemeGroupVersion)
if err != nil {
return nil, err
}
o.scheme.Default(external)
internal, err := o.scheme.ConvertToVersion(external, componentconfig.SchemeGroupVersion)
if err != nil {
return nil, err
}
out := internal.(*componentconfig.KubeSchedulerConfiguration)
return out, nil
}
func (o *Options) Run() error {
// If user only want to generate a configure file.
if len(o.WriteConfigTo) > 0 {
return o.writeConfigFile()
}
server, err := NewSchedulerServer(o.config, o.master)
if err != nil {
return err
}
stop := make(chan struct{})
return server.Run(stop)
}
func (o *Options) writeConfigFile() error {
var encoder runtime.Encoder
mediaTypes := o.codecs.SupportedMediaTypes()
for _, info := range mediaTypes {
if info.MediaType == "application/yaml" {
encoder = info.Serializer
break
}
}
if encoder == nil {
return errors.New("unable to locate yaml encoder")
}
encoder = json.NewYAMLSerializer(json.DefaultMetaFactory, o.scheme, o.scheme)
encoder = o.codecs.EncoderForVersion(encoder, componentconfigv1alpha1.SchemeGroupVersion)
configFile, err := os.Create(o.WriteConfigTo)
if err != nil {
return err
}
defer configFile.Close()
if err := encoder.Encode(o.config, configFile); err != nil {
return err
}
glog.Infof("Wrote configuration to: %s\n", o.WriteConfigTo)
return nil
}
// NewSchedulerCommand creates a *cobra.Command object with default parameters
func NewSchedulerCommand() *cobra.Command {
opts, err := NewOptions()
opts, err := options.NewOptions()
if err != nil {
glog.Fatalf("unable to initialize command options: %v", err)
}
@ -384,15 +83,36 @@ through the API as necessary.`,
verflag.PrintAndExitIfRequested()
utilflag.PrintFlags(cmd.Flags())
cmdutil.CheckErr(opts.Complete())
cmdutil.CheckErr(opts.Validate(args))
cmdutil.CheckErr(opts.Run())
},
}
if len(args) != 0 {
fmt.Fprint(os.Stderr, "arguments are not supported\n")
}
opts.config, err = opts.ApplyDefaults(opts.config)
if err != nil {
glog.Fatalf("unable to apply config defaults: %v", err)
if errs := opts.Validate(); len(errs) > 0 {
fmt.Fprintf(os.Stderr, "%v\n", utilerrors.NewAggregate(errs))
os.Exit(1)
}
if len(opts.WriteConfigTo) > 0 {
if err := options.WriteConfigFile(opts.WriteConfigTo, &opts.ComponentConfig); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
glog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
return
}
c, err := opts.Config()
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
stopCh := make(chan struct{})
if err := Run(c.Complete(), stopCh); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
}
opts.AddFlags(cmd.Flags())
@ -401,210 +121,23 @@ through the API as necessary.`,
return cmd
}
// SchedulerServer represents all the parameters required to start the
// Kubernetes scheduler server.
type SchedulerServer struct {
SchedulerName string
Client clientset.Interface
InformerFactory informers.SharedInformerFactory
PodInformer coreinformers.PodInformer
AlgorithmSource componentconfig.SchedulerAlgorithmSource
HardPodAffinitySymmetricWeight int32
EventClient v1core.EventsGetter
Recorder record.EventRecorder
Broadcaster record.EventBroadcaster
// LeaderElection is optional.
LeaderElection *leaderelection.LeaderElectionConfig
// HealthzServer is optional.
HealthzServer *http.Server
// MetricsServer is optional.
MetricsServer *http.Server
// Disable pod preemption or not.
DisablePreemption bool
}
// NewSchedulerServer creates a runnable SchedulerServer from configuration.
func NewSchedulerServer(config *componentconfig.KubeSchedulerConfiguration, master string) (*SchedulerServer, error) {
if config == nil {
return nil, errors.New("config is required")
}
// Configz registration.
// only register if we're actually exposing it somewhere
if len(config.MetricsBindAddress) > 0 || len(config.HealthzBindAddress) > 0 {
if c, err := configz.New("componentconfig"); err == nil {
c.Set(config)
} else {
return nil, fmt.Errorf("unable to register configz: %s", err)
}
}
// Prepare some Kube clients.
client, leaderElectionClient, eventClient, err := createClients(config.ClientConnection, master)
if err != nil {
return nil, err
}
// Prepare event clients.
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: config.SchedulerName})
// Set up leader election if enabled.
var leaderElectionConfig *leaderelection.LeaderElectionConfig
if config.LeaderElection.LeaderElect {
leaderElectionConfig, err = makeLeaderElectionConfig(config.LeaderElection, leaderElectionClient, recorder)
if err != nil {
return nil, err
}
}
// Prepare a healthz server. If the metrics bind address is the same as the
// healthz bind address, consolidate the servers into one.
var healthzServer *http.Server
if len(config.HealthzBindAddress) > 0 {
healthzServer = makeHealthzServer(config)
}
// Prepare a separate metrics server only if the bind address differs from the
// healthz bind address.
var metricsServer *http.Server
if len(config.MetricsBindAddress) > 0 && config.HealthzBindAddress != config.MetricsBindAddress {
metricsServer = makeMetricsServer(config)
}
return &SchedulerServer{
SchedulerName: config.SchedulerName,
Client: client,
InformerFactory: informers.NewSharedInformerFactory(client, 0),
PodInformer: factory.NewPodInformer(client, 0),
AlgorithmSource: config.AlgorithmSource,
HardPodAffinitySymmetricWeight: config.HardPodAffinitySymmetricWeight,
EventClient: eventClient,
Recorder: recorder,
Broadcaster: eventBroadcaster,
LeaderElection: leaderElectionConfig,
HealthzServer: healthzServer,
MetricsServer: metricsServer,
DisablePreemption: config.DisablePreemption,
}, nil
}
// makeLeaderElectionConfig builds a leader election configuration. It will
// create a new resource lock associated with the configuration.
func makeLeaderElectionConfig(config componentconfig.KubeSchedulerLeaderElectionConfiguration, client clientset.Interface, recorder record.EventRecorder) (*leaderelection.LeaderElectionConfig, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, fmt.Errorf("unable to get hostname: %v", err)
}
// add a uniquifier so that two processes on the same host don't accidentally both become active
id := hostname + "_" + string(uuid.NewUUID())
rl, err := resourcelock.New(config.ResourceLock,
config.LockObjectNamespace,
config.LockObjectName,
client.CoreV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
})
if err != nil {
return nil, fmt.Errorf("couldn't create resource lock: %v", err)
}
return &leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: config.LeaseDuration.Duration,
RenewDeadline: config.RenewDeadline.Duration,
RetryPeriod: config.RetryPeriod.Duration,
}, nil
}
// makeHealthzServer creates a healthz server from the config, and will also
// embed the metrics handler if the healthz and metrics address configurations
// are the same.
func makeHealthzServer(config *componentconfig.KubeSchedulerConfiguration) *http.Server {
pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
healthz.InstallHandler(pathRecorderMux)
if config.HealthzBindAddress == config.MetricsBindAddress {
configz.InstallHandler(pathRecorderMux)
pathRecorderMux.Handle("/metrics", prometheus.Handler())
}
if config.EnableProfiling {
routes.Profiling{}.Install(pathRecorderMux)
if config.EnableContentionProfiling {
goruntime.SetBlockProfileRate(1)
}
}
return &http.Server{
Addr: config.HealthzBindAddress,
Handler: pathRecorderMux,
}
}
// makeMetricsServer builds a metrics server from the config.
func makeMetricsServer(config *componentconfig.KubeSchedulerConfiguration) *http.Server {
pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
configz.InstallHandler(pathRecorderMux)
pathRecorderMux.Handle("/metrics", prometheus.Handler())
if config.EnableProfiling {
routes.Profiling{}.Install(pathRecorderMux)
if config.EnableContentionProfiling {
goruntime.SetBlockProfileRate(1)
}
}
return &http.Server{
Addr: config.MetricsBindAddress,
Handler: pathRecorderMux,
}
}
// createClients creates a kube client and an event client from the given config and masterOverride.
// TODO remove masterOverride when CLI flags are removed.
func createClients(config componentconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, clientset.Interface, v1core.EventsGetter, error) {
if len(config.KubeConfigFile) == 0 && len(masterOverride) == 0 {
glog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. This might not work.")
}
// This creates a client, first loading any specified kubeconfig
// file, and then overriding the Master flag, if non-empty.
kubeConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: config.KubeConfigFile},
&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterOverride}}).ClientConfig()
if err != nil {
return nil, nil, nil, err
}
kubeConfig.AcceptContentTypes = config.AcceptContentTypes
kubeConfig.ContentType = config.ContentType
kubeConfig.QPS = config.QPS
//TODO make config struct use int instead of int32?
kubeConfig.Burst = int(config.Burst)
client, err := clientset.NewForConfig(restclient.AddUserAgent(kubeConfig, "scheduler"))
if err != nil {
return nil, nil, nil, err
}
leaderElectionClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeConfig, "leader-election"))
if err != nil {
return nil, nil, nil, err
}
eventClient, err := clientset.NewForConfig(kubeConfig)
if err != nil {
return nil, nil, nil, err
}
return client, leaderElectionClient, eventClient.CoreV1(), nil
}
// Run runs the SchedulerServer. This should never exit.
func (s *SchedulerServer) Run(stop chan struct{}) error {
func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
glog.Infof("Version: %+v", version.Get())
// Apply algorithms based on feature gates.
// TODO: make configurable?
algorithmprovider.ApplyFeatureGates()
// Configz registration.
if cz, err := configz.New("componentconfig"); err == nil {
cz.Set(c.ComponentConfig)
} else {
return fmt.Errorf("unable to register configz: %s", err)
}
// Build a scheduler config from the provided algorithm source.
schedulerConfig, err := s.SchedulerConfig()
schedulerConfig, err := NewSchedulerConfig(c)
if err != nil {
return err
}
@ -613,39 +146,45 @@ func (s *SchedulerServer) Run(stop chan struct{}) error {
sched := scheduler.NewFromConfig(schedulerConfig)
// Prepare the event broadcaster.
if !reflect.ValueOf(s.Broadcaster).IsNil() && !reflect.ValueOf(s.EventClient).IsNil() {
s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})
if c.Broadcaster != nil && c.EventClient != nil {
c.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.EventClient.Events("")})
}
// Start up the healthz server.
if s.HealthzServer != nil {
if c.InsecureServing != nil {
separateMetrics := c.InsecureMetricsServing != nil
handler := buildHandlerChain(newHealthzHandler(&c.ComponentConfig, separateMetrics), nil, nil)
// TODO: fail early as all other Kubernetes binaries
go wait.Until(func() {
glog.Infof("starting healthz server on %v", s.HealthzServer.Addr)
err := s.HealthzServer.ListenAndServe()
if err != nil {
if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to start healthz server: %v", err))
}
}, 5*time.Second, stop)
}, 5*time.Second, stopCh)
}
// Start up the metrics server.
if s.MetricsServer != nil {
if c.InsecureServing != nil {
handler := buildHandlerChain(newMetricsHandler(&c.ComponentConfig), nil, nil)
// TODO: fail early as all other Kubernetes binaries
go wait.Until(func() {
glog.Infof("starting metrics server on %v", s.MetricsServer.Addr)
err := s.MetricsServer.ListenAndServe()
if err != nil {
if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to start metrics server: %v", err))
}
}, 5*time.Second, stop)
}, 5*time.Second, stopCh)
}
if c.SecureServing != nil {
handler := buildHandlerChain(newHealthzHandler(&c.ComponentConfig, false), c.Authentication.Authenticator, c.Authorization.Authorizer)
if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
// fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start healthz server: %v", err)
}
}
// Start all informers.
go s.PodInformer.Informer().Run(stop)
s.InformerFactory.Start(stop)
go c.PodInformer.Informer().Run(stopCh)
c.InformerFactory.Start(stopCh)
// Wait for all caches to sync before scheduling.
s.InformerFactory.WaitForCacheSync(stop)
controller.WaitForCacheSync("scheduler", stop, s.PodInformer.Informer().HasSynced)
c.InformerFactory.WaitForCacheSync(stopCh)
controller.WaitForCacheSync("scheduler", stopCh, c.PodInformer.Informer().HasSynced)
// Prepare a reusable run function.
run := func(stopCh <-chan struct{}) {
@ -654,14 +193,14 @@ func (s *SchedulerServer) Run(stop chan struct{}) error {
}
// If leader election is enabled, run via LeaderElector until done and exit.
if s.LeaderElection != nil {
s.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
if c.LeaderElection != nil {
c.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
utilruntime.HandleError(fmt.Errorf("lost master"))
},
}
leaderElector, err := leaderelection.NewLeaderElector(*s.LeaderElection)
leaderElector, err := leaderelection.NewLeaderElector(*c.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
@ -672,13 +211,59 @@ func (s *SchedulerServer) Run(stop chan struct{}) error {
}
// Leader election is disabled, so run inline until done.
run(stop)
run(stopCh)
return fmt.Errorf("finished without leader elect")
}
// SchedulerConfig creates the scheduler configuration. This is exposed for use
// by tests.
func (s *SchedulerServer) SchedulerConfig() (*scheduler.Config, error) {
// buildHandlerChain wraps the given handler with the standard filters.
func buildHandlerChain(handler http.Handler, authn authenticator.Request, authz authorizer.Authorizer) http.Handler {
requestInfoResolver := &apirequest.RequestInfoFactory{}
failedHandler := genericapifilters.Unauthorized(legacyscheme.Codecs, false)
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
handler = genericapifilters.WithAuthorization(handler, authz, legacyscheme.Codecs)
handler = genericapifilters.WithAuthentication(handler, authn, failedHandler)
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
handler = genericfilters.WithPanicRecovery(handler)
return handler
}
// newMetricsHandler builds a metrics server from the config.
func newMetricsHandler(config *componentconfig.KubeSchedulerConfiguration) http.Handler {
pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
configz.InstallHandler(pathRecorderMux)
pathRecorderMux.Handle("/metrics", prometheus.Handler())
if config.EnableProfiling {
routes.Profiling{}.Install(pathRecorderMux)
if config.EnableContentionProfiling {
goruntime.SetBlockProfileRate(1)
}
}
return pathRecorderMux
}
// newHealthzServer creates a healthz server from the config, and will also
// embed the metrics handler if the healthz and metrics address configurations
// are the same.
func newHealthzHandler(config *componentconfig.KubeSchedulerConfiguration, separateMetrics bool) http.Handler {
pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
healthz.InstallHandler(pathRecorderMux)
if !separateMetrics {
configz.InstallHandler(pathRecorderMux)
pathRecorderMux.Handle("/metrics", prometheus.Handler())
}
if config.EnableProfiling {
routes.Profiling{}.Install(pathRecorderMux)
if config.EnableContentionProfiling {
goruntime.SetBlockProfileRate(1)
}
}
return pathRecorderMux
}
// NewSchedulerConfig creates the scheduler configuration. This is exposed for use by tests.
func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Config, error) {
var storageClassInformer storageinformers.StorageClassInformer
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
storageClassInformer = s.InformerFactory.Storage().V1().StorageClasses()
@ -686,7 +271,7 @@ func (s *SchedulerServer) SchedulerConfig() (*scheduler.Config, error) {
// Set up the configurator which can create schedulers from configs.
configurator := factory.NewConfigFactory(
s.SchedulerName,
s.ComponentConfig.SchedulerName,
s.Client,
s.InformerFactory.Core().V1().Nodes(),
s.PodInformer,
@ -698,12 +283,12 @@ func (s *SchedulerServer) SchedulerConfig() (*scheduler.Config, error) {
s.InformerFactory.Core().V1().Services(),
s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
storageClassInformer,
s.HardPodAffinitySymmetricWeight,
s.ComponentConfig.HardPodAffinitySymmetricWeight,
utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
s.DisablePreemption,
s.ComponentConfig.DisablePreemption,
)
source := s.AlgorithmSource
source := s.ComponentConfig.AlgorithmSource
var config *scheduler.Config
switch {
case source.Provider != nil:
@ -759,6 +344,6 @@ func (s *SchedulerServer) SchedulerConfig() (*scheduler.Config, error) {
// Additional tweaks to the config produced by the configurator.
config.Recorder = s.Recorder
config.DisablePreemption = s.DisablePreemption
config.DisablePreemption = s.ComponentConfig.DisablePreemption
return config, nil
}

View File

@ -23,6 +23,7 @@ go_test(
tags = ["integration"],
deps = [
"//cmd/kube-scheduler/app:go_default_library",
"//cmd/kube-scheduler/app/config:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/componentconfig:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",

View File

@ -43,6 +43,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
schedulerapp "k8s.io/kubernetes/cmd/kube-scheduler/app"
schedulerappconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/componentconfig"
"k8s.io/kubernetes/pkg/scheduler"
@ -181,17 +182,19 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
ss := &schedulerapp.SchedulerServer{
SchedulerName: v1.DefaultSchedulerName,
AlgorithmSource: componentconfig.SchedulerAlgorithmSource{
Policy: &componentconfig.SchedulerPolicySource{
ConfigMap: &componentconfig.SchedulerPolicyConfigMapSource{
Namespace: policyConfigMap.Namespace,
Name: policyConfigMap.Name,
ss := &schedulerappconfig.Config{
ComponentConfig: componentconfig.KubeSchedulerConfiguration{
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
SchedulerName: v1.DefaultSchedulerName,
AlgorithmSource: componentconfig.SchedulerAlgorithmSource{
Policy: &componentconfig.SchedulerPolicySource{
ConfigMap: &componentconfig.SchedulerPolicyConfigMapSource{
Namespace: policyConfigMap.Namespace,
Name: policyConfigMap.Name,
},
},
},
},
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
Client: clientSet,
InformerFactory: informerFactory,
PodInformer: factory.NewPodInformer(clientSet, 0),
@ -200,7 +203,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
Broadcaster: eventBroadcaster,
}
config, err := ss.SchedulerConfig()
config, err := schedulerapp.NewSchedulerConfig(ss.Complete())
if err != nil {
t.Fatalf("couldn't make scheduler config: %v", err)
}
@ -240,17 +243,19 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
ss := &schedulerapp.SchedulerServer{
SchedulerName: v1.DefaultSchedulerName,
AlgorithmSource: componentconfig.SchedulerAlgorithmSource{
Policy: &componentconfig.SchedulerPolicySource{
ConfigMap: &componentconfig.SchedulerPolicyConfigMapSource{
Namespace: "non-existent-config",
Name: "non-existent-config",
ss := &schedulerappconfig.Config{
ComponentConfig: componentconfig.KubeSchedulerConfiguration{
SchedulerName: v1.DefaultSchedulerName,
AlgorithmSource: componentconfig.SchedulerAlgorithmSource{
Policy: &componentconfig.SchedulerPolicySource{
ConfigMap: &componentconfig.SchedulerPolicyConfigMapSource{
Namespace: "non-existent-config",
Name: "non-existent-config",
},
},
},
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
},
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
Client: clientSet,
InformerFactory: informerFactory,
PodInformer: factory.NewPodInformer(clientSet, 0),
@ -259,7 +264,7 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) {
Broadcaster: eventBroadcaster,
}
_, err := ss.SchedulerConfig()
_, err := schedulerapp.NewSchedulerConfig(ss.Complete())
if err == nil {
t.Fatalf("Creation of scheduler didn't fail while the policy ConfigMap didn't exist.")
}