mirror of https://github.com/k3s-io/k3s
Merge pull request #43149 from deads2k/server-07-clean-kube-start
Automatic merge from submit-queue break kube-apiserver start into stages This is a code shuffle which breaks the kube-apiserver start into 1. set defaults on the options 1. create the generic config from the options 1. create the master config from the generic config and the options This makes apiserver composition easy/possible later on.pull/6/head
commit
b8fc6a093a
|
@ -69,6 +69,8 @@ go_library(
|
|||
"//vendor:k8s.io/apimachinery/pkg/util/sets",
|
||||
"//vendor:k8s.io/apimachinery/pkg/util/wait",
|
||||
"//vendor:k8s.io/apiserver/pkg/admission",
|
||||
"//vendor:k8s.io/apiserver/pkg/authentication/authenticator",
|
||||
"//vendor:k8s.io/apiserver/pkg/authorization/authorizer",
|
||||
"//vendor:k8s.io/apiserver/pkg/server",
|
||||
"//vendor:k8s.io/apiserver/pkg/server/filters",
|
||||
"//vendor:k8s.io/apiserver/pkg/server/storage",
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
@ -45,6 +46,8 @@ import (
|
|||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilwait "k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
"k8s.io/apiserver/pkg/authentication/authenticator"
|
||||
"k8s.io/apiserver/pkg/authorization/authorizer"
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/apiserver/pkg/server/filters"
|
||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||
|
@ -116,60 +119,25 @@ func RunServer(config *master.Config, sharedInformers informers.SharedInformerFa
|
|||
|
||||
// BuildMasterConfig creates all the resources for running the API server, but runs none of them
|
||||
func BuildMasterConfig(s *options.ServerRunOptions) (*master.Config, informers.SharedInformerFactory, error) {
|
||||
// set defaults
|
||||
if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing, s.InsecureServing); err != nil {
|
||||
// set defaults in the options before trying to create the generic config
|
||||
if err := defaultOptions(s); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error determining service IP ranges: %v", err)
|
||||
}
|
||||
if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), apiServerServiceIP); err != nil {
|
||||
return nil, nil, fmt.Errorf("error creating self-signed certificates: %v", err)
|
||||
}
|
||||
if err := s.CloudProvider.DefaultExternalHost(s.GenericServerRunOptions); err != nil {
|
||||
return nil, nil, fmt.Errorf("error setting the external host value: %v", err)
|
||||
}
|
||||
|
||||
s.Authentication.ApplyAuthorization(s.Authorization)
|
||||
|
||||
// validate options
|
||||
if errs := s.Validate(); len(errs) != 0 {
|
||||
return nil, nil, utilerrors.NewAggregate(errs)
|
||||
}
|
||||
|
||||
// create config from options
|
||||
genericConfig := genericapiserver.NewConfig().
|
||||
WithSerializer(api.Codecs)
|
||||
genericConfig, sharedInformers, err := BuildGenericConfig(s)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if err := s.GenericServerRunOptions.ApplyTo(genericConfig); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if err := s.InsecureServing.ApplyTo(genericConfig); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if err := s.SecureServing.ApplyTo(genericConfig); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if err := s.Authentication.ApplyTo(genericConfig); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if err := s.Audit.ApplyTo(genericConfig); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if err := s.Features.ApplyTo(genericConfig); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.ServerList}.CheckEtcdServers); err != nil {
|
||||
return nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err)
|
||||
}
|
||||
|
||||
// Use protobufs for self-communication.
|
||||
// Since not every generic apiserver has to support protobufs, we
|
||||
// cannot default to it in generic apiserver and need to explicitly
|
||||
// set it in kube-apiserver.
|
||||
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
|
||||
|
||||
capabilities.Initialize(capabilities.Capabilities{
|
||||
AllowPrivileged: s.AllowPrivileged,
|
||||
// TODO(vmarmol): Implement support for HostNetworkSources.
|
||||
|
@ -217,164 +185,20 @@ func BuildMasterConfig(s *options.ServerRunOptions) (*master.Config, informers.S
|
|||
// Use the nodeTunneler's dialer when proxying to pods, services, and nodes
|
||||
proxyDialerFn = nodeTunneler.Dial
|
||||
}
|
||||
|
||||
// Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
|
||||
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
|
||||
|
||||
if s.Etcd.StorageConfig.DeserializationCacheSize == 0 {
|
||||
// When size of cache is not explicitly set, estimate its size based on
|
||||
// target memory usage.
|
||||
glog.V(2).Infof("Initializing deserialization cache size based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)
|
||||
|
||||
// This is the heuristics that from memory capacity is trying to infer
|
||||
// the maximum number of nodes in the cluster and set cache sizes based
|
||||
// on that value.
|
||||
// From our documentation, we officially recomment 120GB machines for
|
||||
// 2000 nodes, and we scale from that point. Thus we assume ~60MB of
|
||||
// capacity per node.
|
||||
// TODO: We may consider deciding that some percentage of memory will
|
||||
// be used for the deserialization cache and divide it by the max object
|
||||
// size to compute its size. We may even go further and measure
|
||||
// collective sizes of the objects in the cache.
|
||||
clusterSize := s.GenericServerRunOptions.TargetRAMMB / 60
|
||||
s.Etcd.StorageConfig.DeserializationCacheSize = 25 * clusterSize
|
||||
if s.Etcd.StorageConfig.DeserializationCacheSize < 1000 {
|
||||
s.Etcd.StorageConfig.DeserializationCacheSize = 1000
|
||||
}
|
||||
}
|
||||
|
||||
storageGroupsToEncodingVersion, err := s.StorageSerialization.StorageGroupsToEncodingVersion()
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error generating storage version map: %s", err)
|
||||
}
|
||||
storageFactory, err := kubeapiserver.NewStorageFactory(
|
||||
s.Etcd.StorageConfig, s.Etcd.DefaultStorageMediaType, api.Codecs,
|
||||
serverstorage.NewDefaultResourceEncodingConfig(api.Registry), storageGroupsToEncodingVersion,
|
||||
// FIXME: this GroupVersionResource override should be configurable
|
||||
[]schema.GroupVersionResource{batch.Resource("cronjobs").WithVersion("v2alpha1")},
|
||||
master.DefaultAPIResourceConfigSource(), s.APIEnablement.RuntimeConfig)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error in initializing storage factory: %s", err)
|
||||
}
|
||||
// keep Deployments in extensions for backwards compatibility, we'll have to migrate at some point, eventually
|
||||
storageFactory.AddCohabitatingResources(extensions.Resource("deployments"), apps.Resource("deployments"))
|
||||
for _, override := range s.Etcd.EtcdServersOverrides {
|
||||
tokens := strings.Split(override, "#")
|
||||
if len(tokens) != 2 {
|
||||
glog.Errorf("invalid value of etcd server overrides: %s", override)
|
||||
continue
|
||||
}
|
||||
|
||||
apiresource := strings.Split(tokens[0], "/")
|
||||
if len(apiresource) != 2 {
|
||||
glog.Errorf("invalid resource definition: %s", tokens[0])
|
||||
continue
|
||||
}
|
||||
group := apiresource[0]
|
||||
resource := apiresource[1]
|
||||
groupResource := schema.GroupResource{Group: group, Resource: resource}
|
||||
|
||||
servers := strings.Split(tokens[1], ";")
|
||||
storageFactory.SetEtcdLocation(groupResource, servers)
|
||||
}
|
||||
|
||||
// Default to the private server key for service account token signing
|
||||
if len(s.Authentication.ServiceAccounts.KeyFiles) == 0 && s.SecureServing.ServerCert.CertKey.KeyFile != "" {
|
||||
if kubeauthenticator.IsValidServiceAccountKeyFile(s.SecureServing.ServerCert.CertKey.KeyFile) {
|
||||
s.Authentication.ServiceAccounts.KeyFiles = []string{s.SecureServing.ServerCert.CertKey.KeyFile}
|
||||
} else {
|
||||
glog.Warning("No TLS key provided, service account token authentication disabled")
|
||||
}
|
||||
}
|
||||
|
||||
authenticatorConfig := s.Authentication.ToAuthenticationConfig()
|
||||
if s.Authentication.ServiceAccounts.Lookup {
|
||||
// If we need to look up service accounts and tokens,
|
||||
// go directly to etcd to avoid recursive auth insanity
|
||||
storageConfig, err := storageFactory.NewConfig(api.Resource("serviceaccounts"))
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("unable to get serviceaccounts storage: %v", err)
|
||||
}
|
||||
authenticatorConfig.ServiceAccountTokenGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storageConfig, storageFactory.ResourcePrefix(api.Resource("serviceaccounts")), storageFactory.ResourcePrefix(api.Resource("secrets")))
|
||||
}
|
||||
|
||||
client, err := internalclientset.NewForConfig(genericConfig.LoopbackClientConfig)
|
||||
if err != nil {
|
||||
kubeAPIVersions := os.Getenv("KUBE_API_VERSIONS")
|
||||
if len(kubeAPIVersions) == 0 {
|
||||
return nil, nil, fmt.Errorf("failed to create clientset: %v", err)
|
||||
}
|
||||
|
||||
// KUBE_API_VERSIONS is used in test-update-storage-objects.sh, disabling a number of API
|
||||
// groups. This leads to a nil client above and undefined behaviour further down.
|
||||
//
|
||||
// TODO: get rid of KUBE_API_VERSIONS or define sane behaviour if set
|
||||
glog.Errorf("Failed to create clientset with KUBE_API_VERSIONS=%q. KUBE_API_VERSIONS is only for testing. Things will break.", kubeAPIVersions)
|
||||
}
|
||||
|
||||
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)
|
||||
|
||||
if client == nil {
|
||||
// TODO: Remove check once client can never be nil.
|
||||
glog.Errorf("Failed to setup bootstrap token authenticator because the loopback clientset was not setup properly.")
|
||||
} else {
|
||||
authenticatorConfig.BootstrapTokenAuthenticator = bootstrap.NewTokenAuthenticator(
|
||||
sharedInformers.Core().InternalVersion().Secrets().Lister().Secrets(v1.NamespaceSystem),
|
||||
)
|
||||
}
|
||||
|
||||
apiAuthenticator, securityDefinitions, err := authenticatorConfig.New()
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("invalid authentication config: %v", err)
|
||||
}
|
||||
|
||||
authorizationConfig := s.Authorization.ToAuthorizationConfig(sharedInformers)
|
||||
apiAuthorizer, err := authorizationConfig.New()
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("invalid Authorization Config: %v", err)
|
||||
}
|
||||
|
||||
admissionControlPluginNames := strings.Split(s.GenericServerRunOptions.AdmissionControl, ",")
|
||||
var cloudConfig []byte
|
||||
|
||||
if s.CloudProvider.CloudConfigFile != "" {
|
||||
cloudConfig, err = ioutil.ReadFile(s.CloudProvider.CloudConfigFile)
|
||||
if err != nil {
|
||||
glog.Fatalf("Error reading from cloud configuration file %s: %#v", s.CloudProvider.CloudConfigFile, err)
|
||||
}
|
||||
}
|
||||
pluginInitializer := kubeadmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig)
|
||||
admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.GenericServerRunOptions.AdmissionControlConfigFile)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to read plugin config: %v", err)
|
||||
}
|
||||
admissionController, err := admission.NewFromPlugins(admissionControlPluginNames, admissionConfigProvider, pluginInitializer)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to initialize plugins: %v", err)
|
||||
}
|
||||
|
||||
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
|
||||
Dial: proxyDialerFn,
|
||||
TLSClientConfig: proxyTLSClientConfig,
|
||||
})
|
||||
kubeVersion := version.Get()
|
||||
|
||||
genericConfig.Version = &kubeVersion
|
||||
genericConfig.Authenticator = apiAuthenticator
|
||||
genericConfig.Authorizer = apiAuthorizer
|
||||
genericConfig.AdmissionControl = admissionController
|
||||
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, api.Scheme)
|
||||
genericConfig.OpenAPIConfig.PostProcessSpec = postProcessOpenAPISpecForBackwardCompatibility
|
||||
genericConfig.OpenAPIConfig.SecurityDefinitions = securityDefinitions
|
||||
genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
|
||||
genericConfig.SwaggerConfig = genericapiserver.DefaultSwaggerConfig()
|
||||
genericConfig.EnableMetrics = true
|
||||
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
|
||||
sets.NewString("watch", "proxy"),
|
||||
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
|
||||
)
|
||||
serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if err := s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil {
|
||||
storageFactory, err := BuildStorageFactory(s)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
|
@ -386,6 +210,7 @@ func BuildMasterConfig(s *options.ServerRunOptions) (*master.Config, informers.S
|
|||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
config := &master.Config{
|
||||
GenericConfig: genericConfig,
|
||||
|
||||
|
@ -419,13 +244,235 @@ func BuildMasterConfig(s *options.ServerRunOptions) (*master.Config, informers.S
|
|||
MasterCount: s.MasterCount,
|
||||
}
|
||||
|
||||
return config, sharedInformers, nil
|
||||
}
|
||||
|
||||
// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
|
||||
func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, informers.SharedInformerFactory, error) {
|
||||
genericConfig := genericapiserver.NewConfig().WithSerializer(api.Codecs)
|
||||
if err := s.GenericServerRunOptions.ApplyTo(genericConfig); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if err := s.InsecureServing.ApplyTo(genericConfig); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if err := s.SecureServing.ApplyTo(genericConfig); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if err := s.Authentication.ApplyTo(genericConfig); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if err := s.Audit.ApplyTo(genericConfig); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if err := s.Features.ApplyTo(genericConfig); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, api.Scheme)
|
||||
genericConfig.OpenAPIConfig.PostProcessSpec = postProcessOpenAPISpecForBackwardCompatibility
|
||||
genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
|
||||
genericConfig.SwaggerConfig = genericapiserver.DefaultSwaggerConfig()
|
||||
genericConfig.EnableMetrics = true
|
||||
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
|
||||
sets.NewString("watch", "proxy"),
|
||||
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
|
||||
)
|
||||
|
||||
kubeVersion := version.Get()
|
||||
genericConfig.Version = &kubeVersion
|
||||
|
||||
storageFactory, err := BuildStorageFactory(s)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if err := s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Use protobufs for self-communication.
|
||||
// Since not every generic apiserver has to support protobufs, we
|
||||
// cannot default to it in generic apiserver and need to explicitly
|
||||
// set it in kube-apiserver.
|
||||
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
|
||||
|
||||
client, err := internalclientset.NewForConfig(genericConfig.LoopbackClientConfig)
|
||||
if err != nil {
|
||||
kubeAPIVersions := os.Getenv("KUBE_API_VERSIONS")
|
||||
if len(kubeAPIVersions) == 0 {
|
||||
return nil, nil, fmt.Errorf("failed to create clientset: %v", err)
|
||||
}
|
||||
|
||||
// KUBE_API_VERSIONS is used in test-update-storage-objects.sh, disabling a number of API
|
||||
// groups. This leads to a nil client above and undefined behaviour further down.
|
||||
//
|
||||
// TODO: get rid of KUBE_API_VERSIONS or define sane behaviour if set
|
||||
glog.Errorf("Failed to create clientset with KUBE_API_VERSIONS=%q. KUBE_API_VERSIONS is only for testing. Things will break.", kubeAPIVersions)
|
||||
}
|
||||
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)
|
||||
|
||||
genericConfig.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, storageFactory, client, sharedInformers)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("invalid authentication config: %v", err)
|
||||
}
|
||||
|
||||
genericConfig.Authorizer, err = BuildAuthorizer(s, sharedInformers)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("invalid authorization config: %v", err)
|
||||
}
|
||||
|
||||
genericConfig.AdmissionControl, err = BuildAdmission(s, client, sharedInformers, genericConfig.Authorizer)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to initialize admission: %v", err)
|
||||
}
|
||||
|
||||
return genericConfig, sharedInformers, nil
|
||||
}
|
||||
|
||||
// BuildAdmission constructs the admission chain
|
||||
func BuildAdmission(s *options.ServerRunOptions, client internalclientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.Interface, error) {
|
||||
admissionControlPluginNames := strings.Split(s.GenericServerRunOptions.AdmissionControl, ",")
|
||||
var cloudConfig []byte
|
||||
var err error
|
||||
|
||||
if s.CloudProvider.CloudConfigFile != "" {
|
||||
cloudConfig, err = ioutil.ReadFile(s.CloudProvider.CloudConfigFile)
|
||||
if err != nil {
|
||||
glog.Fatalf("Error reading from cloud configuration file %s: %#v", s.CloudProvider.CloudConfigFile, err)
|
||||
}
|
||||
}
|
||||
pluginInitializer := kubeadmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig)
|
||||
admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.GenericServerRunOptions.AdmissionControlConfigFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read plugin config: %v", err)
|
||||
}
|
||||
return admission.NewFromPlugins(admissionControlPluginNames, admissionConfigProvider, pluginInitializer)
|
||||
}
|
||||
|
||||
// BuildAuthenticator constructs the authenticator
|
||||
func BuildAuthenticator(s *options.ServerRunOptions, storageFactory serverstorage.StorageFactory, client internalclientset.Interface, sharedInformers informers.SharedInformerFactory) (authenticator.Request, *spec.SecurityDefinitions, error) {
|
||||
authenticatorConfig := s.Authentication.ToAuthenticationConfig()
|
||||
if s.Authentication.ServiceAccounts.Lookup {
|
||||
// we have to go direct to storage because the clientsets fail when they're initialized with some API versions excluded
|
||||
// we should stop trying to control them like that.
|
||||
storageConfig, err := storageFactory.NewConfig(api.Resource("serviceaccounts"))
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("unable to get serviceaccounts storage: %v", err)
|
||||
}
|
||||
authenticatorConfig.ServiceAccountTokenGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storageConfig, storageFactory.ResourcePrefix(api.Resource("serviceaccounts")), storageFactory.ResourcePrefix(api.Resource("secrets")))
|
||||
}
|
||||
if client == nil || reflect.ValueOf(client).IsNil() {
|
||||
// TODO: Remove check once client can never be nil.
|
||||
glog.Errorf("Failed to setup bootstrap token authenticator because the loopback clientset was not setup properly.")
|
||||
} else {
|
||||
authenticatorConfig.BootstrapTokenAuthenticator = bootstrap.NewTokenAuthenticator(
|
||||
sharedInformers.Core().InternalVersion().Secrets().Lister().Secrets(v1.NamespaceSystem),
|
||||
)
|
||||
}
|
||||
return authenticatorConfig.New()
|
||||
}
|
||||
|
||||
// BuildAuthorizer constructs the authorizer
|
||||
func BuildAuthorizer(s *options.ServerRunOptions, sharedInformers informers.SharedInformerFactory) (authorizer.Authorizer, error) {
|
||||
authorizationConfig := s.Authorization.ToAuthorizationConfig(sharedInformers)
|
||||
return authorizationConfig.New()
|
||||
}
|
||||
|
||||
// BuildStorageFactory constructs the storage factory
|
||||
func BuildStorageFactory(s *options.ServerRunOptions) (*serverstorage.DefaultStorageFactory, error) {
|
||||
storageGroupsToEncodingVersion, err := s.StorageSerialization.StorageGroupsToEncodingVersion()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error generating storage version map: %s", err)
|
||||
}
|
||||
storageFactory, err := kubeapiserver.NewStorageFactory(
|
||||
s.Etcd.StorageConfig, s.Etcd.DefaultStorageMediaType, api.Codecs,
|
||||
serverstorage.NewDefaultResourceEncodingConfig(api.Registry), storageGroupsToEncodingVersion,
|
||||
// FIXME: this GroupVersionResource override should be configurable
|
||||
[]schema.GroupVersionResource{batch.Resource("cronjobs").WithVersion("v2alpha1")},
|
||||
master.DefaultAPIResourceConfigSource(), s.APIEnablement.RuntimeConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error in initializing storage factory: %s", err)
|
||||
}
|
||||
|
||||
// keep Deployments in extensions for backwards compatibility, we'll have to migrate at some point, eventually
|
||||
storageFactory.AddCohabitatingResources(extensions.Resource("deployments"), apps.Resource("deployments"))
|
||||
for _, override := range s.Etcd.EtcdServersOverrides {
|
||||
tokens := strings.Split(override, "#")
|
||||
if len(tokens) != 2 {
|
||||
glog.Errorf("invalid value of etcd server overrides: %s", override)
|
||||
continue
|
||||
}
|
||||
|
||||
apiresource := strings.Split(tokens[0], "/")
|
||||
if len(apiresource) != 2 {
|
||||
glog.Errorf("invalid resource definition: %s", tokens[0])
|
||||
continue
|
||||
}
|
||||
group := apiresource[0]
|
||||
resource := apiresource[1]
|
||||
groupResource := schema.GroupResource{Group: group, Resource: resource}
|
||||
|
||||
servers := strings.Split(tokens[1], ";")
|
||||
storageFactory.SetEtcdLocation(groupResource, servers)
|
||||
}
|
||||
|
||||
return storageFactory, nil
|
||||
}
|
||||
|
||||
func defaultOptions(s *options.ServerRunOptions) error {
|
||||
if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing, s.InsecureServing); err != nil {
|
||||
return err
|
||||
}
|
||||
_, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error determining service IP ranges: %v", err)
|
||||
}
|
||||
if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), apiServerServiceIP); err != nil {
|
||||
return fmt.Errorf("error creating self-signed certificates: %v", err)
|
||||
}
|
||||
if err := s.CloudProvider.DefaultExternalHost(s.GenericServerRunOptions); err != nil {
|
||||
return fmt.Errorf("error setting the external host value: %v", err)
|
||||
}
|
||||
|
||||
s.Authentication.ApplyAuthorization(s.Authorization)
|
||||
|
||||
// Default to the private server key for service account token signing
|
||||
if len(s.Authentication.ServiceAccounts.KeyFiles) == 0 && s.SecureServing.ServerCert.CertKey.KeyFile != "" {
|
||||
if kubeauthenticator.IsValidServiceAccountKeyFile(s.SecureServing.ServerCert.CertKey.KeyFile) {
|
||||
s.Authentication.ServiceAccounts.KeyFiles = []string{s.SecureServing.ServerCert.CertKey.KeyFile}
|
||||
} else {
|
||||
glog.Warning("No TLS key provided, service account token authentication disabled")
|
||||
}
|
||||
}
|
||||
|
||||
if s.Etcd.StorageConfig.DeserializationCacheSize == 0 {
|
||||
// When size of cache is not explicitly set, estimate its size based on
|
||||
// target memory usage.
|
||||
glog.V(2).Infof("Initializing deserialization cache size based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)
|
||||
|
||||
// This is the heuristics that from memory capacity is trying to infer
|
||||
// the maximum number of nodes in the cluster and set cache sizes based
|
||||
// on that value.
|
||||
// From our documentation, we officially recomment 120GB machines for
|
||||
// 2000 nodes, and we scale from that point. Thus we assume ~60MB of
|
||||
// capacity per node.
|
||||
// TODO: We may consider deciding that some percentage of memory will
|
||||
// be used for the deserialization cache and divide it by the max object
|
||||
// size to compute its size. We may even go further and measure
|
||||
// collective sizes of the objects in the cache.
|
||||
clusterSize := s.GenericServerRunOptions.TargetRAMMB / 60
|
||||
s.Etcd.StorageConfig.DeserializationCacheSize = 25 * clusterSize
|
||||
if s.Etcd.StorageConfig.DeserializationCacheSize < 1000 {
|
||||
s.Etcd.StorageConfig.DeserializationCacheSize = 1000
|
||||
}
|
||||
}
|
||||
if s.Etcd.EnableWatchCache {
|
||||
glog.V(2).Infof("Initializing cache sizes based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)
|
||||
cachesize.InitializeWatchCacheSizes(s.GenericServerRunOptions.TargetRAMMB)
|
||||
cachesize.SetWatchCacheSizes(s.GenericServerRunOptions.WatchCacheSizes)
|
||||
}
|
||||
|
||||
return config, sharedInformers, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func readCAorNil(file string) ([]byte, error) {
|
||||
|
|
Loading…
Reference in New Issue