k3s/test/integration/framework/master_utils.go

515 lines
18 KiB
Go
Raw Normal View History

/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
goruntime "runtime"
"sync"
"testing"
"time"
2015-08-05 22:05:17 +00:00
"github.com/golang/glog"
2017-01-11 14:09:48 +00:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
2017-01-04 15:39:05 +00:00
authauthenticator "k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/authenticatorfactory"
2017-01-05 19:47:14 +00:00
authenticatorunion "k8s.io/apiserver/pkg/authentication/request/union"
2017-01-04 14:31:53 +00:00
"k8s.io/apiserver/pkg/authentication/user"
2017-01-04 15:39:05 +00:00
authauthorizer "k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/authorization/authorizerfactory"
2017-01-05 19:47:14 +00:00
authorizerunion "k8s.io/apiserver/pkg/authorization/union"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/storage/storagebackend"
2017-01-19 18:27:59 +00:00
restclient "k8s.io/client-go/rest"
2017-01-30 18:39:54 +00:00
"k8s.io/client-go/tools/record"
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/v1"
2016-11-18 20:55:32 +00:00
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
autoscaling "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
2016-02-17 22:06:35 +00:00
"k8s.io/kubernetes/pkg/apis/batch"
2017-01-12 20:23:11 +00:00
certificates "k8s.io/kubernetes/pkg/apis/certificates/v1beta1"
2016-11-18 20:55:32 +00:00
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
policy "k8s.io/kubernetes/pkg/apis/policy/v1alpha1"
rbac "k8s.io/kubernetes/pkg/apis/rbac/v1alpha1"
storage "k8s.io/kubernetes/pkg/apis/storage/v1beta1"
2017-01-10 08:49:34 +00:00
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
2016-12-14 01:18:17 +00:00
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
2016-11-18 21:35:28 +00:00
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
2017-02-08 21:18:21 +00:00
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/pkg/controller"
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
2016-09-29 19:10:04 +00:00
"k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/pkg/kubeapiserver"
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/kubectl"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/util/env"
"k8s.io/kubernetes/pkg/version"
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/plugin/pkg/admission/admit"
"github.com/go-openapi/spec"
"github.com/pborman/uuid"
)
const (
	// DefaultTimeout is the timeout used in benchmarks, e.g. to scale an rc.
	DefaultTimeout = 30 * time.Minute
	// TestRCManifest is the rc manifest used to create pods for benchmarks.
	// TODO: Convert this to a full path?
	TestRCManifest = "benchmark-controller.json"
)
// MasterComponents is a control struct for all master components started via NewMasterComponents.
// TODO: Include all master components (scheduler, nodecontroller).
// TODO: Reconcile with integration.go, currently the master used there doesn't understand
// how to restart cleanly, which is required for each iteration of a benchmark. The integration
// tests also don't make it easy to isolate and turn off components at will.
type MasterComponents struct {
// Raw http server in front of the master
ApiServer *httptest.Server
2015-07-24 11:09:49 +00:00
// Kubernetes master, contains an embedded etcd storage
KubeMaster *master.Master
// Restclient used to talk to the kubernetes master
2016-10-18 13:00:38 +00:00
ClientSet clientset.Interface
// Replication controller manager
2015-07-31 11:38:04 +00:00
ControllerManager *replicationcontroller.ReplicationManager
// Channel for stop signals to rc manager
rcStopCh chan struct{}
// Used to stop master components individually, and via MasterComponents.Stop
once sync.Once
}
// Config is a struct of configuration directives for NewMasterComponents.
type Config struct {
// MasterConfig is handed to the master start-up code. If nil, a default is
// used; partially filled configs will not get populated.
MasterConfig *master.Config
// StartReplicationManager controls whether NewMasterComponents starts the
// shared informers and runs the replication manager it creates.
StartReplicationManager bool
// Client throttling qps
QPS float32
// Client burst qps, also burst replicas allowed in rc manager
Burst int
// TODO: Add configs for endpoints controller, scheduler etc
}
// NewMasterComponents creates, initializes and starts master components based on the given config.
func NewMasterComponents(c *Config) *MasterComponents {
m, s := startMasterOrDie(c.MasterConfig, nil, nil)
// TODO: Allow callers to pipe through a different master url and create a client/start components using it.
glog.Infof("Master %+v", s.URL)
// TODO: caesarxuchao: remove this client when the refactoring of client libraray is done.
2017-01-12 18:17:43 +00:00
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}, QPS: c.QPS, Burst: c.Burst})
rcStopCh := make(chan struct{})
2017-02-08 21:18:21 +00:00
informerFactory := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
controllerManager := replicationcontroller.NewReplicationManager(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().ReplicationControllers(), clientset, c.Burst, 4096, false)
// TODO: Support events once we can cleanly shutdown an event recorder.
controllerManager.SetEventRecorder(&record.FakeRecorder{})
if c.StartReplicationManager {
informerFactory.Start(rcStopCh)
go controllerManager.Run(goruntime.NumCPU(), rcStopCh)
}
return &MasterComponents{
ApiServer: s,
KubeMaster: m,
2016-10-18 13:00:38 +00:00
ClientSet: clientset,
ControllerManager: controllerManager,
rcStopCh: rcStopCh,
}
}
// alwaysAllow is an authorizer that permits every action.
type alwaysAllow struct{}

// Authorize ignores requestAttributes and reports the request as allowed,
// giving "always allow" as the reason and never returning an error.
func (alwaysAllow) Authorize(requestAttributes authauthorizer.Attributes) (bool, string, error) {
	const reason = "always allow"
	return true, reason, nil
}
// alwaysEmpty simulates "no authentication" for old tests: every request is
// reported as authenticated, as a user whose name is the empty string.
func alwaysEmpty(req *http.Request) (user.Info, bool, error) {
	anonymous := &user.DefaultInfo{Name: ""}
	return anonymous, true, nil
}
// MasterReceiver can be used to provide the master to a custom incoming server function
type MasterReceiver interface {
// SetMaster is called by startMasterOrDie with the master it constructed,
// before the post-start hooks are run.
SetMaster(m *master.Master)
}
// MasterHolder implements MasterReceiver: it stores the master it is given
// and announces its availability by closing Initialized.
type MasterHolder struct {
	// Initialized is closed by SetMaster once M has been stored.
	Initialized chan struct{}
	// M is the master most recently passed to SetMaster.
	M *master.Master
}

// SetMaster records m in the holder and then closes Initialized.
// Initialized must be a non-nil channel that has not been closed yet;
// closing a nil or already-closed channel panics.
func (h *MasterHolder) SetMaster(m *master.Master) {
	h.M = m
	close(h.Initialized)
}
// startMasterOrDie starts a kubernetes master and an httpserver to handle api requests
func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Server, masterReceiver MasterReceiver) (*master.Master, *httptest.Server) {
var m *master.Master
var s *httptest.Server
if incomingServer != nil {
s = incomingServer
} else {
s = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
2016-09-30 16:16:32 +00:00
m.GenericAPIServer.Handler.ServeHTTP(w, req)
}))
}
if masterConfig == nil {
masterConfig = NewMasterConfig()
masterConfig.GenericConfig.EnableProfiling = true
2016-10-31 13:53:57 +00:00
masterConfig.GenericConfig.EnableMetrics = true
masterConfig.GenericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(openapi.GetOpenAPIDefinitions, api.Scheme)
masterConfig.GenericConfig.OpenAPIConfig.Info = &spec.Info{
InfoProps: spec.InfoProps{
Title: "Kubernetes",
Version: "unversioned",
},
}
masterConfig.GenericConfig.OpenAPIConfig.DefaultResponse = &spec.Response{
ResponseProps: spec.ResponseProps{
Description: "Default Response.",
},
}
masterConfig.GenericConfig.OpenAPIConfig.GetDefinitions = openapi.GetOpenAPIDefinitions
masterConfig.GenericConfig.SwaggerConfig = genericapiserver.DefaultSwaggerConfig()
}
2016-09-29 19:10:04 +00:00
// set the loopback client config
if masterConfig.GenericConfig.LoopbackClientConfig == nil {
masterConfig.GenericConfig.LoopbackClientConfig = &restclient.Config{QPS: 50, Burst: 100, ContentConfig: restclient.ContentConfig{NegotiatedSerializer: api.Codecs}}
2016-09-29 19:10:04 +00:00
}
masterConfig.GenericConfig.LoopbackClientConfig.Host = s.URL
privilegedLoopbackToken := uuid.NewRandom().String()
// wrap any available authorizer
tokens := make(map[string]*user.DefaultInfo)
tokens[privilegedLoopbackToken] = &user.DefaultInfo{
Name: user.APIServerUser,
UID: uuid.NewRandom().String(),
Groups: []string{user.SystemPrivilegedGroup},
}
2016-09-29 19:10:04 +00:00
tokenAuthenticator := authenticatorfactory.NewFromTokens(tokens)
if masterConfig.GenericConfig.Authenticator == nil {
masterConfig.GenericConfig.Authenticator = authenticatorunion.New(tokenAuthenticator, authauthenticator.RequestFunc(alwaysEmpty))
} else {
2016-09-29 19:10:04 +00:00
masterConfig.GenericConfig.Authenticator = authenticatorunion.New(tokenAuthenticator, masterConfig.GenericConfig.Authenticator)
}
2016-09-29 19:10:04 +00:00
if masterConfig.GenericConfig.Authorizer != nil {
tokenAuthorizer := authorizerfactory.NewPrivilegedGroups(user.SystemPrivilegedGroup)
2016-09-29 19:10:04 +00:00
masterConfig.GenericConfig.Authorizer = authorizerunion.New(tokenAuthorizer, masterConfig.GenericConfig.Authorizer)
} else {
masterConfig.GenericConfig.Authorizer = alwaysAllow{}
2016-09-29 19:10:04 +00:00
}
masterConfig.GenericConfig.LoopbackClientConfig.BearerToken = privilegedLoopbackToken
m, err := masterConfig.Complete().New()
if err != nil {
glog.Fatalf("error in bringing up the master: %v", err)
}
if masterReceiver != nil {
masterReceiver.SetMaster(m)
}
2016-10-31 12:48:59 +00:00
// TODO have this start method actually use the normal start sequence for the API server
// this method never actually calls the `Run` method for the API server
// fire the post hooks ourselves
m.GenericAPIServer.PrepareRun()
m.GenericAPIServer.RunPostStartHooks()
cfg := *masterConfig.GenericConfig.LoopbackClientConfig
2016-11-21 02:55:31 +00:00
cfg.ContentConfig.GroupVersion = &schema.GroupVersion{}
privilegedClient, err := restclient.RESTClientFor(&cfg)
if err != nil {
glog.Fatal(err)
}
err = wait.PollImmediate(100*time.Millisecond, 30*time.Second, func() (bool, error) {
result := privilegedClient.Get().AbsPath("/healthz").Do()
status := 0
result.StatusCode(&status)
if status == 200 {
return true, nil
}
return false, nil
})
if err != nil {
glog.Fatal(err)
}
// wait for services to be ready
if masterConfig.EnableCoreControllers {
// TODO Once /healthz is updated for posthooks, we'll wait for good health
coreClient := coreclient.NewForConfigOrDie(&cfg)
svcWatch, err := coreClient.Services(metav1.NamespaceDefault).Watch(metav1.ListOptions{})
if err != nil {
glog.Fatal(err)
}
_, err = watch.Until(30*time.Second, svcWatch, func(event watch.Event) (bool, error) {
if event.Type != watch.Added {
return false, nil
}
if event.Object.(*v1.Service).Name == "kubernetes" {
return true, nil
}
return false, nil
})
if err != nil {
glog.Fatal(err)
}
}
2015-09-30 07:56:51 +00:00
return m, s
}
// parseCIDROrDie parses cidr with net.ParseCIDR and returns the resulting
// network. If cidr cannot be parsed, the failure is logged through glog.Fatalf.
func parseCIDROrDie(cidr string) *net.IPNet {
	_, network, parseErr := net.ParseCIDR(cidr)
	if parseErr == nil {
		return network
	}
	glog.Fatalf("error while parsing CIDR: %s", cidr)
	return network
}
// GetEtcdURLFromEnv returns the etcd URL looked up under the
// KUBE_INTEGRATION_ETCD_URL environment variable, with
// "http://127.0.0.1:2379" passed as the fallback value. The chosen URL is
// logged at verbosity 4.
func GetEtcdURLFromEnv() string {
	etcdURL := env.GetEnvAsStringOrFallback("KUBE_INTEGRATION_ETCD_URL", "http://127.0.0.1:2379")
	glog.V(4).Infof("Using KUBE_INTEGRATION_ETCD_URL=%q", etcdURL)
	return etcdURL
}
// Returns a basic master config.
func NewMasterConfig() *master.Config {
2016-04-20 04:11:39 +00:00
config := storagebackend.Config{
ServerList: []string{GetEtcdURLFromEnv()},
2016-06-16 21:27:12 +00:00
// This causes the integration tests to exercise the etcd
// prefix code, so please don't change without ensuring
// sufficient coverage in other ways.
Prefix: uuid.New(),
Copier: api.Scheme,
}
info, _ := runtime.SerializerInfoForMediaType(api.Codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
ns := NewSingleContentTypeSerializer(api.Scheme, info)
storageFactory := genericapiserver.NewDefaultStorageFactory(config, runtime.ContentTypeJSON, ns, genericapiserver.NewDefaultResourceEncodingConfig(api.Registry), master.DefaultAPIResourceConfigSource())
storageFactory.SetSerializer(
2016-11-21 02:55:31 +00:00
schema.GroupResource{Group: v1.GroupName, Resource: genericapiserver.AllResources},
"",
ns)
storageFactory.SetSerializer(
2016-11-21 02:55:31 +00:00
schema.GroupResource{Group: autoscaling.GroupName, Resource: genericapiserver.AllResources},
"",
ns)
storageFactory.SetSerializer(
2016-11-21 02:55:31 +00:00
schema.GroupResource{Group: batch.GroupName, Resource: genericapiserver.AllResources},
"",
ns)
storageFactory.SetSerializer(
2016-11-21 02:55:31 +00:00
schema.GroupResource{Group: apps.GroupName, Resource: genericapiserver.AllResources},
"",
ns)
storageFactory.SetSerializer(
2016-11-21 02:55:31 +00:00
schema.GroupResource{Group: extensions.GroupName, Resource: genericapiserver.AllResources},
"",
ns)
storageFactory.SetSerializer(
2016-11-21 02:55:31 +00:00
schema.GroupResource{Group: policy.GroupName, Resource: genericapiserver.AllResources},
"",
ns)
2016-05-25 21:25:56 +00:00
storageFactory.SetSerializer(
2016-11-21 02:55:31 +00:00
schema.GroupResource{Group: rbac.GroupName, Resource: genericapiserver.AllResources},
2016-05-25 21:25:56 +00:00
"",
ns)
storageFactory.SetSerializer(
2016-11-21 02:55:31 +00:00
schema.GroupResource{Group: certificates.GroupName, Resource: genericapiserver.AllResources},
"",
ns)
2016-09-01 15:29:26 +00:00
storageFactory.SetSerializer(
2016-11-21 02:55:31 +00:00
schema.GroupResource{Group: storage.GroupName, Resource: genericapiserver.AllResources},
2016-09-01 15:29:26 +00:00
"",
ns)
genericConfig := genericapiserver.NewConfig().WithSerializer(api.Codecs)
kubeVersion := version.Get()
genericConfig.Version = &kubeVersion
genericConfig.Authorizer = authorizerfactory.NewAlwaysAllowAuthorizer()
genericConfig.AdmissionControl = admit.NewAlwaysAdmit()
2016-10-31 12:48:59 +00:00
genericConfig.EnableMetrics = true
genericConfig.RESTOptionsGetter = &kubeapiserver.RESTOptionsFactory{
StorageFactory: storageFactory,
EnableWatchCache: true,
EnableGarbageCollection: true,
DeleteCollectionWorkers: 1,
}
return &master.Config{
2016-12-05 15:57:54 +00:00
GenericConfig: genericConfig,
APIResourceConfigSource: master.DefaultAPIResourceConfigSource(),
StorageFactory: storageFactory,
EnableCoreControllers: true,
KubeletClientConfig: kubeletclient.KubeletClientConfig{Port: 10250},
APIServerServicePort: 443,
MasterCount: 1,
}
}
// Returns the master config appropriate for most integration tests.
func NewIntegrationTestMasterConfig() *master.Config {
masterConfig := NewMasterConfig()
masterConfig.EnableCoreControllers = true
masterConfig.GenericConfig.PublicAddress = net.ParseIP("192.168.10.4")
2016-12-05 15:57:54 +00:00
masterConfig.APIResourceConfigSource = master.DefaultAPIResourceConfigSource()
return masterConfig
}
// stopRCManager signals the replication manager to stop by closing rcStopCh.
// It must run at most once; Stop ensures that by invoking it through m.once.
func (m *MasterComponents) stopRCManager() {
	close(m.rcStopCh)
}

// Stop shuts down the selected master components: the replication manager
// when rcManager is true, and the api server when apiServer is true.
func (m *MasterComponents) Stop(apiServer, rcManager bool) {
	glog.Infof("Stopping master components")

	// Ordering matters because the apiServer will only shutdown when pending
	// requests are done, so the rc manager is handled first.
	if rcManager {
		m.once.Do(m.stopRCManager)
	}

	if !apiServer {
		return
	}
	m.ApiServer.Close()
}
2016-11-18 20:55:32 +00:00
func CreateTestingNamespace(baseName string, apiserver *httptest.Server, t *testing.T) *v1.Namespace {
// TODO: Create a namespace with a given basename.
// Currently we neither create the namespace nor delete all its contents at the end.
// But as long as tests are not using the same namespaces, this should work fine.
2016-11-18 20:55:32 +00:00
return &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
// TODO: Once we start creating namespaces, switch to GenerateName.
Name: baseName,
},
}
}
2016-11-18 20:55:32 +00:00
// DeleteTestingNamespace is currently a no-op: ns, apiserver and t are unused.
func DeleteTestingNamespace(ns *v1.Namespace, apiserver *httptest.Server, t *testing.T) {
// TODO: Remove all resources from a given namespace once we implement CreateTestingNamespace.
}
// RCFromManifest reads a .json file and returns the rc in it.
2016-11-18 20:55:32 +00:00
func RCFromManifest(fileName string) *v1.ReplicationController {
data, err := ioutil.ReadFile(fileName)
if err != nil {
glog.Fatalf("Unexpected error reading rc manifest %v", err)
}
2016-11-18 20:55:32 +00:00
var controller v1.ReplicationController
if err := runtime.DecodeInto(testapi.Default.Codec(), data, &controller); err != nil {
glog.Fatalf("Unexpected error reading rc manifest %v", err)
}
return &controller
}
// StopRC stops the rc via kubectl's stop library
2016-11-18 20:55:32 +00:00
func StopRC(rc *v1.ReplicationController, clientset internalclientset.Interface) error {
2016-09-21 14:20:25 +00:00
reaper, err := kubectl.ReaperFor(api.Kind("ReplicationController"), clientset)
if err != nil || reaper == nil {
return err
}
err = reaper.Stop(rc.Namespace, rc.Name, 0, nil)
if err != nil {
return err
}
return nil
}
2015-05-21 21:10:25 +00:00
// ScaleRC scales the given rc to the given replicas.
2016-11-18 20:55:32 +00:00
func ScaleRC(name, ns string, replicas int32, clientset internalclientset.Interface) (*api.ReplicationController, error) {
2016-09-21 14:20:25 +00:00
scaler, err := kubectl.ScalerFor(api.Kind("ReplicationController"), clientset)
if err != nil {
return nil, err
}
2015-08-08 01:52:23 +00:00
retry := &kubectl.RetryParams{Interval: 50 * time.Millisecond, Timeout: DefaultTimeout}
waitForReplicas := &kubectl.RetryParams{Interval: 50 * time.Millisecond, Timeout: DefaultTimeout}
2015-05-21 21:10:25 +00:00
err = scaler.Scale(ns, name, uint(replicas), nil, retry, waitForReplicas)
if err != nil {
return nil, err
}
2016-12-07 14:40:26 +00:00
scaled, err := clientset.Core().ReplicationControllers(ns).Get(name, metav1.GetOptions{})
if err != nil {
return nil, err
}
2015-05-21 21:10:25 +00:00
return scaled, nil
}
2016-07-04 13:16:13 +00:00
func RunAMaster(masterConfig *master.Config) (*master.Master, *httptest.Server) {
if masterConfig == nil {
masterConfig = NewMasterConfig()
masterConfig.GenericConfig.EnableProfiling = true
2016-10-31 13:53:57 +00:00
masterConfig.GenericConfig.EnableMetrics = true
}
return startMasterOrDie(masterConfig, nil, nil)
}
// RunAMasterUsingServer starts a master through startMasterOrDie, passing s
// as the incoming server and masterReceiver as the receiver of the created
// master. masterConfig is forwarded unchanged, so it may be nil.
func RunAMasterUsingServer(masterConfig *master.Config, s *httptest.Server, masterReceiver MasterReceiver) (*master.Master, *httptest.Server) {
return startMasterOrDie(masterConfig, s, masterReceiver)
}
// Task is a function passed to worker goroutines by RunParallel; id is the
// index of the task being run, and a non-nil error reports its failure.
// The function needs to implement its own thread safety.
type Task func(id int) error
// RunParallel runs task once for every id in [0, numTasks) and blocks until
// all of them have returned.
//
// One goroutine is spawned per task, but at most numWorkers tasks execute
// concurrently; a non-positive numWorkers lifts that limit by setting it to
// numTasks. A non-positive numTasks runs nothing. If any task returns an
// error, it is reported through glog.Fatalf. The elapsed time is logged on
// return.
func RunParallel(task Task, numTasks, numWorkers int) {
	start := time.Now()
	if numWorkers <= 0 {
		numWorkers = numTasks
	}
	defer func() {
		glog.Infof("RunParallel took %v for %d tasks and %d workers", time.Since(start), numTasks, numWorkers)
	}()

	// Nothing to run. Returning here also keeps a negative numTasks from
	// panicking in make (negative channel capacity) or in wg.Add (negative
	// WaitGroup counter).
	if numTasks <= 0 {
		return
	}

	var wg sync.WaitGroup
	// semCh is a counting semaphore: a goroutine holds one slot while its
	// task is running.
	semCh := make(chan struct{}, numWorkers)
	wg.Add(numTasks)
	for id := 0; id < numTasks; id++ {
		go func(id int) {
			// Deferred calls run last-in first-out: the slot is released
			// first, then the WaitGroup is notified.
			defer wg.Done()
			semCh <- struct{}{}
			defer func() { <-semCh }()

			if err := task(id); err != nil {
				glog.Fatalf("Worker failed with %v", err)
			}
		}(id)
	}
	wg.Wait()
	close(semCh)
}