mirror of https://github.com/k3s-io/k3s
Refactor common parts of scheduler_perf into reusable utils
parent
744b5d3357
commit
68e18a6a36
|
@ -64,6 +64,7 @@ filegroup(
|
|||
"//test/integration/storageclasses:all-srcs",
|
||||
"//test/integration/tls:all-srcs",
|
||||
"//test/integration/ttlcontroller:all-srcs",
|
||||
"//test/integration/util:all-srcs",
|
||||
"//test/integration/volume:all-srcs",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
|
|
|
@ -14,19 +14,13 @@ go_library(
|
|||
],
|
||||
importpath = "k8s.io/kubernetes/test/integration/scheduler_perf",
|
||||
deps = [
|
||||
"//pkg/api/legacyscheme:go_default_library",
|
||||
"//pkg/api/testapi:go_default_library",
|
||||
"//pkg/scheduler:go_default_library",
|
||||
"//pkg/scheduler/algorithmprovider:go_default_library",
|
||||
"//pkg/scheduler/factory:go_default_library",
|
||||
"//test/integration/framework:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//test/integration/util:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/record:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
|
|
@ -17,26 +17,15 @@ limitations under the License.
|
|||
package benchmark
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/informers"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
clientv1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/scheduler"
|
||||
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
|
||||
"k8s.io/kubernetes/pkg/scheduler/factory"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
"k8s.io/kubernetes/test/integration/util"
|
||||
)
|
||||
|
||||
const enableEquivalenceCache = true
|
||||
|
||||
// mustSetupScheduler starts the following components:
|
||||
// - k8s api server (a.k.a. master)
|
||||
// - scheduler
|
||||
|
@ -44,63 +33,19 @@ const enableEquivalenceCache = true
|
|||
// remove resources after finished.
|
||||
// Notes on rate limiter:
|
||||
// - client rate limit is set to 5000.
|
||||
func mustSetupScheduler() (schedulerConfigurator scheduler.Configurator, destroyFunc func()) {
|
||||
|
||||
h := &framework.MasterHolder{Initialized: make(chan struct{})}
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
<-h.Initialized
|
||||
h.M.GenericAPIServer.Handler.ServeHTTP(w, req)
|
||||
}))
|
||||
|
||||
framework.RunAMasterUsingServer(framework.NewIntegrationTestMasterConfig(), s, h)
|
||||
|
||||
func mustSetupScheduler() (scheduler.Configurator, util.ShutdownFunc) {
|
||||
apiURL, apiShutdown := util.StartApiserver()
|
||||
clientSet := clientset.NewForConfigOrDie(&restclient.Config{
|
||||
Host: s.URL,
|
||||
Host: apiURL,
|
||||
ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()},
|
||||
QPS: 5000.0,
|
||||
Burst: 5000,
|
||||
})
|
||||
schedulerConfig, schedulerShutdown := util.StartScheduler(clientSet, true)
|
||||
|
||||
informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
|
||||
|
||||
schedulerConfigurator = factory.NewConfigFactory(
|
||||
v1.DefaultSchedulerName,
|
||||
clientSet,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
informerFactory.Extensions().V1beta1().ReplicaSets(),
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
|
||||
informerFactory.Storage().V1().StorageClasses(),
|
||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
||||
enableEquivalenceCache,
|
||||
)
|
||||
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet.CoreV1().RESTClient()).Events("")})
|
||||
|
||||
sched, err := scheduler.NewFromConfigurator(schedulerConfigurator, func(conf *scheduler.Config) {
|
||||
conf.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"})
|
||||
})
|
||||
if err != nil {
|
||||
glog.Fatalf("Error creating scheduler: %v", err)
|
||||
shutdownFunc := func() {
|
||||
schedulerShutdown()
|
||||
apiShutdown()
|
||||
}
|
||||
|
||||
stop := make(chan struct{})
|
||||
informerFactory.Start(stop)
|
||||
|
||||
sched.Run()
|
||||
|
||||
destroyFunc = func() {
|
||||
glog.Infof("destroying")
|
||||
sched.StopEverything()
|
||||
close(stop)
|
||||
s.Close()
|
||||
glog.Infof("destroyed")
|
||||
}
|
||||
return
|
||||
return schedulerConfig, shutdownFunc
|
||||
}
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"util.go",
|
||||
],
|
||||
importpath = "k8s.io/kubernetes/test/integration/util",
|
||||
deps = [
|
||||
"//pkg/api/legacyscheme:go_default_library",
|
||||
"//pkg/scheduler:go_default_library",
|
||||
"//pkg/scheduler/factory:go_default_library",
|
||||
"//test/integration/framework:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/record:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
)
|
|
@ -0,0 +1,103 @@
|
|||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/informers"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
clientv1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
"k8s.io/kubernetes/pkg/scheduler"
|
||||
"k8s.io/kubernetes/pkg/scheduler/factory"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
)
|
||||
|
||||
// ShutdownFunc represents the function handle to be called, typically in a defer handler, to shutdown a running module
|
||||
type ShutdownFunc func()
|
||||
|
||||
// StartApiserver starts a local API server for testing and returns the handle to the URL and the shutdown function to stop it.
|
||||
func StartApiserver() (string, ShutdownFunc) {
|
||||
h := &framework.MasterHolder{Initialized: make(chan struct{})}
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
<-h.Initialized
|
||||
h.M.GenericAPIServer.Handler.ServeHTTP(w, req)
|
||||
}))
|
||||
|
||||
framework.RunAMasterUsingServer(framework.NewIntegrationTestMasterConfig(), s, h)
|
||||
|
||||
shutdownFunc := func() {
|
||||
glog.Infof("destroying API server")
|
||||
s.Close()
|
||||
glog.Infof("destroyed API server")
|
||||
}
|
||||
return s.URL, shutdownFunc
|
||||
}
|
||||
|
||||
// StartScheduler configures and starts a scheduler given a handle to the clientSet interface
|
||||
// and event broadcaster. It returns a handle to the configurator for the running scheduler
|
||||
// and the shutdown function to stop it.
|
||||
func StartScheduler(clientSet clientset.Interface, enableEquivalenceCache bool) (scheduler.Configurator, ShutdownFunc) {
|
||||
informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
|
||||
|
||||
evtBroadcaster := record.NewBroadcaster()
|
||||
evtWatch := evtBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{
|
||||
Interface: clientv1core.New(clientSet.CoreV1().RESTClient()).Events("")})
|
||||
|
||||
schedulerConfigurator := factory.NewConfigFactory(
|
||||
v1.DefaultSchedulerName,
|
||||
clientSet,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
informerFactory.Extensions().V1beta1().ReplicaSets(),
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
|
||||
informerFactory.Storage().V1().StorageClasses(),
|
||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
||||
enableEquivalenceCache,
|
||||
)
|
||||
|
||||
sched, err := scheduler.NewFromConfigurator(schedulerConfigurator, func(conf *scheduler.Config) {
|
||||
conf.Recorder = evtBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"})
|
||||
})
|
||||
if err != nil {
|
||||
glog.Fatalf("Error creating scheduler: %v", err)
|
||||
}
|
||||
|
||||
stop := make(chan struct{})
|
||||
informerFactory.Start(stop)
|
||||
|
||||
sched.Run()
|
||||
|
||||
shutdownFunc := func() {
|
||||
glog.Infof("destroying scheduler")
|
||||
evtWatch.Stop()
|
||||
sched.StopEverything()
|
||||
close(stop)
|
||||
glog.Infof("destroyed scheduler")
|
||||
}
|
||||
return schedulerConfigurator, shutdownFunc
|
||||
}
|
Loading…
Reference in New Issue