mirror of https://github.com/k3s-io/k3s
Fix duplicated rate limit in scheduler
Remove BindingRateLimiterSaturation metrics Update generated docpull/6/head
parent
e1a71fefaa
commit
080cb60dab
|
@ -171,7 +171,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
|
|||
handler.delegate = m.Handler
|
||||
|
||||
// Scheduler
|
||||
schedulerConfigFactory := factory.NewConfigFactory(cl, nil, api.DefaultSchedulerName)
|
||||
schedulerConfigFactory := factory.NewConfigFactory(cl, api.DefaultSchedulerName)
|
||||
schedulerConfig, err := schedulerConfigFactory.Create()
|
||||
if err != nil {
|
||||
glog.Fatalf("Couldn't create scheduler config: %v", err)
|
||||
|
|
|
@ -56,14 +56,12 @@ kube-scheduler
|
|||
```
|
||||
--address=0.0.0.0: The IP address to serve on (set to 0.0.0.0 for all interfaces)
|
||||
--algorithm-provider="DefaultProvider": The scheduling algorithm provider to use, one of: DefaultProvider
|
||||
--bind-pods-burst=100: Number of bindings per second scheduler is allowed to make during bursts
|
||||
--bind-pods-qps=50: Number of bindings per second scheduler is allowed to continuously make
|
||||
--google-json-key="": The Google Cloud Platform Service Account JSON Key to use for authentication.
|
||||
--kube-api-burst=100: Burst to use while talking with kubernetes apiserver
|
||||
--kube-api-qps=50: QPS to use while talking with kubernetes apiserver
|
||||
--kubeconfig="": Path to kubeconfig file with authorization and master location information.
|
||||
--leader-elect[=false]: Start a leader election client and gain leadership before executing scheduler loop. Enable this when running replicated schedulers.
|
||||
--leader-elect-lease-duration=15s: The duration that non-leader candidates will wait after observing a leadership renewal until attempting to acquire leadership of a led but unrenewed leader slot. This is effectively the maximum duration that a leader can be stopped before it is replaced by another candidate. This is only applicable if leader election is enabled.
|
||||
--leader-elect-lease-duration=15s: The duration that non-leader candidates will wait after observing a leadershiprenewal until attempting to acquire leadership of a led but unrenewed leader slot. This is effectively the maximum duration that a leader can be stopped before it is replaced by another candidate. This is only applicable if leader election is enabled.
|
||||
--leader-elect-renew-deadline=10s: The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to the lease duration. This is only applicable if leader election is enabled.
|
||||
--leader-elect-retry-period=2s: The duration the clients should wait between attempting acquisition and renewal of a leadership. This is only applicable if leader election is enabled.
|
||||
--log-flush-frequency=5s: Maximum number of seconds between log flushes
|
||||
|
@ -74,7 +72,7 @@ kube-scheduler
|
|||
--scheduler-name="default-scheduler": Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name'
|
||||
```
|
||||
|
||||
###### Auto generated by spf13/cobra on 12-Jan-2016
|
||||
###### Auto generated by spf13/cobra on 13-Jan-2016
|
||||
|
||||
|
||||
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
|
||||
|
|
|
@ -37,8 +37,6 @@ type SchedulerServer struct {
|
|||
EnableProfiling bool
|
||||
Master string
|
||||
Kubeconfig string
|
||||
BindPodsQPS float32
|
||||
BindPodsBurst int
|
||||
KubeAPIQPS float32
|
||||
KubeAPIBurst int
|
||||
SchedulerName string
|
||||
|
@ -51,8 +49,6 @@ func NewSchedulerServer() *SchedulerServer {
|
|||
Port: ports.SchedulerPort,
|
||||
Address: net.ParseIP("0.0.0.0"),
|
||||
AlgorithmProvider: factory.DefaultProvider,
|
||||
BindPodsQPS: 50.0,
|
||||
BindPodsBurst: 100,
|
||||
KubeAPIQPS: 50.0,
|
||||
KubeAPIBurst: 100,
|
||||
SchedulerName: api.DefaultSchedulerName,
|
||||
|
@ -70,8 +66,6 @@ func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) {
|
|||
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
|
||||
fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
|
||||
fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
|
||||
fs.Float32Var(&s.BindPodsQPS, "bind-pods-qps", s.BindPodsQPS, "Number of bindings per second scheduler is allowed to continuously make")
|
||||
fs.IntVar(&s.BindPodsBurst, "bind-pods-burst", s.BindPodsBurst, "Number of bindings per second scheduler is allowed to make during bursts")
|
||||
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
|
||||
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
|
||||
fs.StringVar(&s.SchedulerName, "scheduler-name", s.SchedulerName, "Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name'")
|
||||
|
|
|
@ -32,7 +32,6 @@ import (
|
|||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
|
||||
"k8s.io/kubernetes/pkg/healthz"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler"
|
||||
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
|
||||
|
@ -99,8 +98,9 @@ func Run(s *options.SchedulerServer) error {
|
|||
glog.Fatal(server.ListenAndServe())
|
||||
}()
|
||||
|
||||
configFactory := factory.NewConfigFactory(kubeClient, util.NewTokenBucketRateLimiter(s.BindPodsQPS, s.BindPodsBurst), s.SchedulerName)
|
||||
configFactory := factory.NewConfigFactory(kubeClient, s.SchedulerName)
|
||||
config, err := createConfig(s, configFactory)
|
||||
|
||||
if err != nil {
|
||||
glog.Fatalf("Failed to create scheduler configuration: %v", err)
|
||||
}
|
||||
|
|
|
@ -100,7 +100,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
|
|||
if !reflect.DeepEqual(policy, tc.ExpectedPolicy) {
|
||||
t.Errorf("%s: Expected:\n\t%#v\nGot:\n\t%#v", v, tc.ExpectedPolicy, policy)
|
||||
}
|
||||
_, err = factory.NewConfigFactory(nil, nil, "some-scheduler-name").CreateFromConfig(policy)
|
||||
_, err = factory.NewConfigFactory(nil, "some-scheduler-name").CreateFromConfig(policy)
|
||||
if err != nil {
|
||||
t.Errorf("%s: Error constructing: %v", v, err)
|
||||
continue
|
||||
|
|
|
@ -69,8 +69,6 @@ type ConfigFactory struct {
|
|||
|
||||
// Close this to stop all reflectors
|
||||
StopEverything chan struct{}
|
||||
// Rate limiter for binding pods
|
||||
BindPodsRateLimiter util.RateLimiter
|
||||
|
||||
scheduledPodPopulator *framework.Controller
|
||||
modeler scheduler.SystemModeler
|
||||
|
@ -82,7 +80,7 @@ type ConfigFactory struct {
|
|||
}
|
||||
|
||||
// Initializes the factory.
|
||||
func NewConfigFactory(client *client.Client, rateLimiter util.RateLimiter, schedulerName string) *ConfigFactory {
|
||||
func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactory {
|
||||
c := &ConfigFactory{
|
||||
Client: client,
|
||||
PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
|
||||
|
@ -99,7 +97,6 @@ func NewConfigFactory(client *client.Client, rateLimiter util.RateLimiter, sched
|
|||
modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{Store: c.PodQueue}, c.ScheduledPodLister)
|
||||
c.modeler = modeler
|
||||
c.PodLister = modeler.PodLister()
|
||||
c.BindPodsRateLimiter = rateLimiter
|
||||
|
||||
// On add/delete to the scheduled pods, remove from the assumed pods.
|
||||
// We construct this here instead of in CreateFromKeys because
|
||||
|
@ -253,9 +250,8 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
|
|||
NextPod: func() *api.Pod {
|
||||
return f.getNextPod()
|
||||
},
|
||||
Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
|
||||
BindPodsRateLimiter: f.BindPodsRateLimiter,
|
||||
StopEverything: f.StopEverything,
|
||||
Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
|
||||
StopEverything: f.StopEverything,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -46,7 +46,7 @@ func TestCreate(t *testing.T) {
|
|||
// TODO: Uncomment when fix #19254
|
||||
// defer server.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()})
|
||||
factory := NewConfigFactory(client, nil, api.DefaultSchedulerName)
|
||||
factory := NewConfigFactory(client, api.DefaultSchedulerName)
|
||||
factory.Create()
|
||||
}
|
||||
|
||||
|
@ -65,7 +65,7 @@ func TestCreateFromConfig(t *testing.T) {
|
|||
// TODO: Uncomment when fix #19254
|
||||
// defer server.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()})
|
||||
factory := NewConfigFactory(client, nil, api.DefaultSchedulerName)
|
||||
factory := NewConfigFactory(client, api.DefaultSchedulerName)
|
||||
|
||||
// Pre-register some predicate and priority functions
|
||||
RegisterFitPredicate("PredicateOne", PredicateOne)
|
||||
|
@ -108,7 +108,7 @@ func TestCreateFromEmptyConfig(t *testing.T) {
|
|||
// TODO: Uncomment when fix #19254
|
||||
// defer server.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()})
|
||||
factory := NewConfigFactory(client, nil, api.DefaultSchedulerName)
|
||||
factory := NewConfigFactory(client, api.DefaultSchedulerName)
|
||||
|
||||
configData = []byte(`{}`)
|
||||
err := latestschedulerapi.Codec.DecodeInto(configData, &policy)
|
||||
|
@ -152,7 +152,7 @@ func TestDefaultErrorFunc(t *testing.T) {
|
|||
server := httptest.NewServer(mux)
|
||||
// TODO: Uncomment when fix #19254
|
||||
// defer server.Close()
|
||||
factory := NewConfigFactory(client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()}), nil, api.DefaultSchedulerName)
|
||||
factory := NewConfigFactory(client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()}), api.DefaultSchedulerName)
|
||||
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
|
||||
podBackoff := podBackoff{
|
||||
perPodBackoff: map[types.NamespacedName]*backoffEntry{},
|
||||
|
@ -324,9 +324,9 @@ func TestResponsibleForPod(t *testing.T) {
|
|||
// defer server.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()})
|
||||
// factory of "default-scheduler"
|
||||
factoryDefaultScheduler := NewConfigFactory(client, nil, api.DefaultSchedulerName)
|
||||
factoryDefaultScheduler := NewConfigFactory(client, api.DefaultSchedulerName)
|
||||
// factory of "foo-scheduler"
|
||||
factoryFooScheduler := NewConfigFactory(client, nil, "foo-scheduler")
|
||||
factoryFooScheduler := NewConfigFactory(client, "foo-scheduler")
|
||||
// scheduler annotaions to be tested
|
||||
schedulerAnnotationFitsDefault := map[string]string{"scheduler.alpha.kubernetes.io/name": "default-scheduler"}
|
||||
schedulerAnnotationFitsFoo := map[string]string{"scheduler.alpha.kubernetes.io/name": "foo-scheduler"}
|
||||
|
|
|
@ -52,13 +52,6 @@ var (
|
|||
Buckets: prometheus.ExponentialBuckets(1000, 2, 15),
|
||||
},
|
||||
)
|
||||
BindingRateLimiterSaturation = prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Subsystem: schedulerSubsystem,
|
||||
Name: "binding_ratelimiter_saturation",
|
||||
Help: "Binding rateLimiter's saturation rate in percentage",
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
var registerMetrics sync.Once
|
||||
|
@ -70,7 +63,6 @@ func Register() {
|
|||
prometheus.MustRegister(E2eSchedulingLatency)
|
||||
prometheus.MustRegister(SchedulingAlgorithmLatency)
|
||||
prometheus.MustRegister(BindingLatency)
|
||||
prometheus.MustRegister(BindingRateLimiterSaturation)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -76,10 +76,6 @@ type Config struct {
|
|||
Algorithm algorithm.ScheduleAlgorithm
|
||||
Binder Binder
|
||||
|
||||
// Rate at which we can create pods
|
||||
// If this field is nil, we don't have any rate limit.
|
||||
BindPodsRateLimiter util.RateLimiter
|
||||
|
||||
// NextPod should be a function that blocks until the next pod
|
||||
// is available. We don't use a channel for this, because scheduling
|
||||
// a pod may take some amount of time and we don't want pods to get
|
||||
|
@ -108,20 +104,11 @@ func New(c *Config) *Scheduler {
|
|||
|
||||
// Run begins watching and scheduling. It starts a goroutine and returns immediately.
|
||||
func (s *Scheduler) Run() {
|
||||
if s.config.BindPodsRateLimiter != nil {
|
||||
go util.Forever(func() {
|
||||
sat := s.config.BindPodsRateLimiter.Saturation()
|
||||
metrics.BindingRateLimiterSaturation.Set(sat)
|
||||
}, metrics.BindingSaturationReportInterval)
|
||||
}
|
||||
go util.Until(s.scheduleOne, 0, s.config.StopEverything)
|
||||
}
|
||||
|
||||
func (s *Scheduler) scheduleOne() {
|
||||
pod := s.config.NextPod()
|
||||
if s.config.BindPodsRateLimiter != nil {
|
||||
s.config.BindPodsRateLimiter.Accept()
|
||||
}
|
||||
|
||||
glog.V(3).Infof("Attempting to schedule: %+v", pod)
|
||||
start := time.Now()
|
||||
|
|
|
@ -296,72 +296,3 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
|
|||
<-called
|
||||
events.Stop()
|
||||
}
|
||||
|
||||
// Fake rate limiter that records the 'accept' tokens from the real rate limiter
|
||||
type FakeRateLimiter struct {
|
||||
r util.RateLimiter
|
||||
acceptValues []bool
|
||||
}
|
||||
|
||||
func (fr *FakeRateLimiter) TryAccept() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (fr *FakeRateLimiter) Saturation() float64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (fr *FakeRateLimiter) Stop() {}
|
||||
|
||||
func (fr *FakeRateLimiter) Accept() {
|
||||
fr.acceptValues = append(fr.acceptValues, fr.r.TryAccept())
|
||||
}
|
||||
|
||||
func TestSchedulerRateLimitsBinding(t *testing.T) {
|
||||
scheduledPodStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
|
||||
scheduledPodLister := &cache.StoreToPodLister{Store: scheduledPodStore}
|
||||
queuedPodStore := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
|
||||
queuedPodLister := &cache.StoreToPodLister{Store: queuedPodStore}
|
||||
modeler := NewSimpleModeler(queuedPodLister, scheduledPodLister)
|
||||
|
||||
algo := NewGenericScheduler(
|
||||
map[string]algorithm.FitPredicate{},
|
||||
[]algorithm.PriorityConfig{},
|
||||
[]algorithm.SchedulerExtender{},
|
||||
modeler.PodLister(),
|
||||
rand.New(rand.NewSource(time.Now().UnixNano())))
|
||||
|
||||
// Rate limit to 1 pod
|
||||
fr := FakeRateLimiter{util.NewTokenBucketRateLimiter(0.02, 1), []bool{}}
|
||||
c := &Config{
|
||||
Modeler: modeler,
|
||||
NodeLister: algorithm.FakeNodeLister(
|
||||
api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}},
|
||||
),
|
||||
Algorithm: algo,
|
||||
Binder: fakeBinder{func(b *api.Binding) error {
|
||||
return nil
|
||||
}},
|
||||
NextPod: func() *api.Pod {
|
||||
return queuedPodStore.Pop().(*api.Pod)
|
||||
},
|
||||
Error: func(p *api.Pod, err error) {
|
||||
t.Errorf("Unexpected error when scheduling pod %+v: %v", p, err)
|
||||
},
|
||||
Recorder: &record.FakeRecorder{},
|
||||
BindPodsRateLimiter: &fr,
|
||||
}
|
||||
|
||||
s := New(c)
|
||||
firstPod := podWithID("foo", "")
|
||||
secondPod := podWithID("boo", "")
|
||||
queuedPodStore.Add(firstPod)
|
||||
queuedPodStore.Add(secondPod)
|
||||
|
||||
for i, hitRateLimit := range []bool{true, false} {
|
||||
s.scheduleOne()
|
||||
if fr.acceptValues[i] != hitRateLimit {
|
||||
t.Errorf("Unexpected rate limiting, expect rate limit to be: %v but found it was %v", hitRateLimit, fr.acceptValues[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,7 +39,6 @@ import (
|
|||
// It returns scheduler config factory and destroyFunc which should be used to
|
||||
// remove resources after finished.
|
||||
// Notes on rate limiter:
|
||||
// - The BindPodsRateLimiter is nil, meaning no rate limits.
|
||||
// - client rate limit is set to 5000.
|
||||
func mustSetupScheduler() (schedulerConfigFactory *factory.ConfigFactory, destroyFunc func()) {
|
||||
framework.DeleteAllEtcdKeys()
|
||||
|
@ -58,7 +57,7 @@ func mustSetupScheduler() (schedulerConfigFactory *factory.ConfigFactory, destro
|
|||
Burst: 5000,
|
||||
})
|
||||
|
||||
schedulerConfigFactory = factory.NewConfigFactory(c, nil, api.DefaultSchedulerName)
|
||||
schedulerConfigFactory = factory.NewConfigFactory(c, api.DefaultSchedulerName)
|
||||
schedulerConfig, err := schedulerConfigFactory.Create()
|
||||
if err != nil {
|
||||
panic("Couldn't create scheduler config")
|
||||
|
|
|
@ -241,7 +241,7 @@ func TestSchedulerExtender(t *testing.T) {
|
|||
}
|
||||
policy.APIVersion = testapi.Default.GroupVersion().String()
|
||||
|
||||
schedulerConfigFactory := factory.NewConfigFactory(restClient, nil, api.DefaultSchedulerName)
|
||||
schedulerConfigFactory := factory.NewConfigFactory(restClient, api.DefaultSchedulerName)
|
||||
schedulerConfig, err := schedulerConfigFactory.CreateFromConfig(policy)
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't create scheduler config: %v", err)
|
||||
|
|
|
@ -66,7 +66,7 @@ func TestUnschedulableNodes(t *testing.T) {
|
|||
|
||||
restClient := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()})
|
||||
|
||||
schedulerConfigFactory := factory.NewConfigFactory(restClient, nil, api.DefaultSchedulerName)
|
||||
schedulerConfigFactory := factory.NewConfigFactory(restClient, api.DefaultSchedulerName)
|
||||
schedulerConfig, err := schedulerConfigFactory.Create()
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't create scheduler config: %v", err)
|
||||
|
@ -309,7 +309,7 @@ func TestMultiScheduler(t *testing.T) {
|
|||
// 1. create and start default-scheduler
|
||||
restClient := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()})
|
||||
|
||||
schedulerConfigFactory := factory.NewConfigFactory(restClient, nil, api.DefaultSchedulerName)
|
||||
schedulerConfigFactory := factory.NewConfigFactory(restClient, api.DefaultSchedulerName)
|
||||
schedulerConfig, err := schedulerConfigFactory.Create()
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't create scheduler config: %v", err)
|
||||
|
@ -380,7 +380,7 @@ func TestMultiScheduler(t *testing.T) {
|
|||
// 5. create and start a scheduler with name "foo-scheduler"
|
||||
restClient2 := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()})
|
||||
|
||||
schedulerConfigFactory2 := factory.NewConfigFactory(restClient2, nil, "foo-scheduler")
|
||||
schedulerConfigFactory2 := factory.NewConfigFactory(restClient2, "foo-scheduler")
|
||||
schedulerConfig2, err := schedulerConfigFactory2.Create()
|
||||
if err != nil {
|
||||
t.Errorf("Couldn't create scheduler config: %v", err)
|
||||
|
|
Loading…
Reference in New Issue