streamline etcd options for aggregated api server

pull/6/head
deads2k 2017-02-07 14:35:02 -05:00
parent fd56078298
commit 470cb9d2c9
17 changed files with 84 additions and 72 deletions

View File

@ -68,7 +68,6 @@ go_library(
"//vendor:k8s.io/apiserver/pkg/endpoints/filters",
"//vendor:k8s.io/apiserver/pkg/endpoints/handlers/responsewriters",
"//vendor:k8s.io/apiserver/pkg/endpoints/request",
"//vendor:k8s.io/apiserver/pkg/registry/generic",
"//vendor:k8s.io/apiserver/pkg/registry/generic/rest",
"//vendor:k8s.io/apiserver/pkg/registry/rest",
"//vendor:k8s.io/apiserver/pkg/server",

View File

@ -24,7 +24,6 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
genericfilters "k8s.io/apiserver/pkg/server/filters"
@ -54,9 +53,6 @@ type Config struct {
// this to confirm the proxy's identity
ProxyClientCert []byte
ProxyClientKey []byte
// RESTOptionsGetter is used to construct storage for a particular resource
RESTOptionsGetter generic.RESTOptionsGetter
}
// APIAggregator contains state for a Kubernetes cluster master/api server.
@ -144,7 +140,7 @@ func (c completedConfig) New() (*APIAggregator, error) {
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, api.Registry, api.Scheme, api.ParameterCodec, api.Codecs)
apiGroupInfo.GroupMeta.GroupVersion = v1alpha1.SchemeGroupVersion
v1alpha1storage := map[string]rest.Storage{}
v1alpha1storage["apiservices"] = apiservicestorage.NewREST(c.RESTOptionsGetter)
v1alpha1storage["apiservices"] = apiservicestorage.NewREST(c.GenericConfig.RESTOptionsGetter)
apiGroupInfo.VersionedResourcesStorageMap["v1alpha1"] = v1alpha1storage
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {

View File

@ -19,15 +19,11 @@ go_library(
"//pkg/kubectl/cmd/util:go_default_library",
"//vendor:github.com/pborman/uuid",
"//vendor:github.com/spf13/cobra",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apiserver/pkg/registry/generic",
"//vendor:k8s.io/apiserver/pkg/registry/generic/registry",
"//vendor:k8s.io/apiserver/pkg/server",
"//vendor:k8s.io/apiserver/pkg/server/filters",
"//vendor:k8s.io/apiserver/pkg/server/options",
"//vendor:k8s.io/apiserver/pkg/storage/storagebackend",
"//vendor:k8s.io/client-go/rest",
],
)

View File

@ -24,15 +24,11 @@ import (
"github.com/pborman/uuid"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/generic/registry"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/filters"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/storagebackend"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/cmd/kube-aggregator/pkg/apiserver"
"k8s.io/kubernetes/pkg/api"
@ -59,14 +55,11 @@ type AggregatorOptions struct {
// NewCommandStartMaster provides a CLI handler for 'start master' command
func NewCommandStartAggregator(out, err io.Writer) *cobra.Command {
o := &AggregatorOptions{
RecommendedOptions: genericoptions.NewRecommendedOptions(api.Scheme),
RecommendedOptions: genericoptions.NewRecommendedOptions(defaultEtcdPathPrefix, api.Scheme, api.Codecs.LegacyCodec(v1alpha1.SchemeGroupVersion)),
StdOut: out,
StdErr: err,
}
o.RecommendedOptions.Etcd.StorageConfig.Type = storagebackend.StorageTypeETCD3
o.RecommendedOptions.Etcd.StorageConfig.Prefix = defaultEtcdPathPrefix
o.RecommendedOptions.Etcd.StorageConfig.Codec = api.Codecs.LegacyCodec(v1alpha1.SchemeGroupVersion)
o.RecommendedOptions.SecureServing.ServingOptions.BindPort = 443
cmd := &cobra.Command{
@ -129,7 +122,6 @@ func (o AggregatorOptions) RunAggregator() error {
config := apiserver.Config{
GenericConfig: serverConfig,
RESTOptionsGetter: &restOptionsFactory{storageConfig: &o.RecommendedOptions.Etcd.StorageConfig},
CoreAPIServerClient: coreAPIServerClient,
}
@ -150,17 +142,3 @@ func (o AggregatorOptions) RunAggregator() error {
return nil
}
type restOptionsFactory struct {
storageConfig *storagebackend.Config
}
func (f *restOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
return generic.RESTOptions{
StorageConfig: f.storageConfig,
Decorator: registry.StorageWithCacher,
DeleteCollectionWorkers: 1,
EnableGarbageCollection: false,
ResourcePrefix: f.storageConfig.Prefix + "/" + resource.Group + "/" + resource.Resource,
}, nil
}

View File

@ -68,7 +68,7 @@ type ServerRunOptions struct {
func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{
GenericServerRunOptions: genericoptions.NewServerRunOptions(),
Etcd: genericoptions.NewEtcdOptions(api.Scheme),
Etcd: genericoptions.NewEtcdOptions(kubeoptions.DefaultEtcdPathPrefix, api.Scheme, nil),
SecureServing: genericoptions.NewSecureServingOptions(),
InsecureServing: genericoptions.NewInsecureServingOptions(),
Audit: genericoptions.NewAuditLogOptions(),

View File

@ -321,9 +321,10 @@ func Run(s *options.ServerRunOptions) error {
APIResourceConfigSource: storageFactory.APIResourceConfigSource,
StorageFactory: storageFactory,
EnableWatchCache: s.GenericServerRunOptions.EnableWatchCache,
EnableGarbageCollection: s.Etcd.EnableGarbageCollection,
EnableWatchCache: s.Etcd.EnableWatchCache,
EnableCoreControllers: true,
DeleteCollectionWorkers: s.GenericServerRunOptions.DeleteCollectionWorkers,
DeleteCollectionWorkers: s.Etcd.DeleteCollectionWorkers,
EventTTL: s.EventTTL,
KubeletClientConfig: s.KubeletConfig,
EnableUISupport: true,
@ -342,7 +343,7 @@ func Run(s *options.ServerRunOptions) error {
MasterCount: s.MasterCount,
}
if s.GenericServerRunOptions.EnableWatchCache {
if s.Etcd.EnableWatchCache {
glog.V(2).Infof("Initializing cache sizes based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)
cachesize.InitializeWatchCacheSizes(s.GenericServerRunOptions.TargetRAMMB)
cachesize.SetWatchCacheSizes(s.GenericServerRunOptions.WatchCacheSizes)

View File

@ -68,7 +68,7 @@ type ServerRunOptions struct {
func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{
GenericServerRunOptions: genericoptions.NewServerRunOptions(),
Etcd: genericoptions.NewEtcdOptions(api.Scheme),
Etcd: genericoptions.NewEtcdOptions(kubeoptions.DefaultEtcdPathPrefix, api.Scheme, nil),
SecureServing: genericoptions.NewSecureServingOptions(),
InsecureServing: genericoptions.NewInsecureServingOptions(),
Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(),

View File

@ -51,7 +51,7 @@ type ServerRunOptions struct {
func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{
GenericServerRunOptions: genericoptions.NewServerRunOptions(),
Etcd: genericoptions.NewEtcdOptions(api.Scheme),
Etcd: genericoptions.NewEtcdOptions(kubeoptions.DefaultEtcdPathPrefix, api.Scheme, nil),
SecureServing: genericoptions.NewSecureServingOptions(),
InsecureServing: genericoptions.NewInsecureServingOptions(),
Audit: genericoptions.NewAuditLogOptions(),

View File

@ -198,7 +198,7 @@ func Run(s *options.ServerRunOptions) error {
)
// TODO: Move this to generic api server (Need to move the command line flag).
if s.GenericServerRunOptions.EnableWatchCache {
if s.Etcd.EnableWatchCache {
cachesize.InitializeWatchCacheSizes(s.GenericServerRunOptions.TargetRAMMB)
cachesize.SetWatchCacheSizes(s.GenericServerRunOptions.WatchCacheSizes)
}
@ -214,14 +214,15 @@ func Run(s *options.ServerRunOptions) error {
// TODO: Refactor this code to share it with kube-apiserver rather than duplicating it here.
restOptionsFactory := &restOptionsFactory{
storageFactory: storageFactory,
enableGarbageCollection: s.Features.EnableGarbageCollection,
deleteCollectionWorkers: s.GenericServerRunOptions.DeleteCollectionWorkers,
enableGarbageCollection: s.Etcd.EnableGarbageCollection,
deleteCollectionWorkers: s.Etcd.DeleteCollectionWorkers,
}
if s.GenericServerRunOptions.EnableWatchCache {
if s.Etcd.EnableWatchCache {
restOptionsFactory.storageDecorator = genericregistry.StorageWithCacher
} else {
restOptionsFactory.storageDecorator = generic.UndecoratedStorage
}
genericConfig.RESTOptionsGetter = restOptionsFactory
installFederationAPIs(m, restOptionsFactory)
installCoreAPIs(s, m, restOptionsFactory)

View File

@ -25,6 +25,10 @@ import (
"github.com/spf13/pflag"
)
const (
DefaultEtcdPathPrefix = "/registry"
)
// StorageSerializationOptions contains the options for encoding resources.
type StorageSerializationOptions struct {
StorageVersions string

View File

@ -82,6 +82,7 @@ type Config struct {
APIResourceConfigSource genericapiserver.APIResourceConfigSource
StorageFactory genericapiserver.StorageFactory
EnableGarbageCollection bool
EnableWatchCache bool
EnableCoreControllers bool
EndpointReconcilerConfig EndpointReconcilerConfig
@ -223,7 +224,7 @@ func (c completedConfig) New() (*Master, error) {
restOptionsFactory := &restOptionsFactory{
deleteCollectionWorkers: c.DeleteCollectionWorkers,
enableGarbageCollection: c.GenericConfig.EnableGarbageCollection,
enableGarbageCollection: c.EnableGarbageCollection,
storageFactory: c.StorageFactory,
}

View File

@ -50,6 +50,7 @@ import (
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
apiopenapi "k8s.io/apiserver/pkg/endpoints/openapi"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
genericregistry "k8s.io/apiserver/pkg/registry/generic"
genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/mux"
@ -92,7 +93,6 @@ type Config struct {
EnableProfiling bool
// Requires generic profiling enabled
EnableContentionProfiling bool
EnableGarbageCollection bool
EnableMetrics bool
// Version will enable the /version endpoint if non-nil
@ -131,6 +131,8 @@ type Config struct {
OpenAPIConfig *openapicommon.Config
// SwaggerConfig will be used in generating Swagger spec. This is nil by default. Use DefaultSwaggerConfig for "working" defaults.
SwaggerConfig *swagger.Config
// RESTOptionsGetter is used to construct "normal" RESTStorage types
RESTOptionsGetter genericregistry.RESTOptionsGetter
// If specified, requests will be allocated a random timeout between this value, and twice this value.
// Note that it is up to the request handlers to ignore or honor this timeout. In seconds.
@ -196,7 +198,6 @@ func NewConfig() *Config {
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz},
EnableIndex: true,
EnableGarbageCollection: true,
EnableProfiling: true,
MaxRequestsInFlight: 400,
MaxMutatingRequestsInFlight: 200,

View File

@ -22,13 +22,13 @@ import (
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/storage/storagebackend"
)
const (
DefaultEtcdPathPrefix = "/registry"
)
type EtcdOptions struct {
StorageConfig storagebackend.Config
@ -37,18 +37,25 @@ type EtcdOptions struct {
// To enable protobuf as storage format, it is enough
// to set it to "application/vnd.kubernetes.protobuf".
DefaultStorageMediaType string
DeleteCollectionWorkers int
EnableGarbageCollection bool
EnableWatchCache bool
}
func NewEtcdOptions(scheme *runtime.Scheme) *EtcdOptions {
func NewEtcdOptions(prefix string, copier runtime.ObjectCopier, codec runtime.Codec) *EtcdOptions {
return &EtcdOptions{
StorageConfig: storagebackend.Config{
Prefix: DefaultEtcdPathPrefix,
Prefix: prefix,
// Default cache size to 0 - if unset, its size will be set based on target
// memory usage.
DeserializationCacheSize: 0,
Copier: scheme,
Copier: copier,
Codec: codec,
},
DefaultStorageMediaType: "application/json",
DeleteCollectionWorkers: 1,
EnableGarbageCollection: true,
EnableWatchCache: true,
}
}
@ -70,6 +77,16 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.DefaultStorageMediaType, "storage-media-type", s.DefaultStorageMediaType, ""+
"The media type to use to store objects in storage. Defaults to application/json. "+
"Some resources may only support a specific media type and will ignore this setting.")
fs.IntVar(&s.DeleteCollectionWorkers, "delete-collection-workers", s.DeleteCollectionWorkers,
"Number of workers spawned for DeleteCollection call. These are used to speed up namespace cleanup.")
fs.BoolVar(&s.EnableGarbageCollection, "enable-garbage-collector", s.EnableGarbageCollection, ""+
"Enables the generic garbage collector. MUST be synced with the corresponding flag "+
"of the kube-controller-manager.")
// TODO: enable cache in integration tests.
fs.BoolVar(&s.EnableWatchCache, "watch-cache", s.EnableWatchCache,
"Enable watch caching in the apiserver")
fs.StringVar(&s.StorageConfig.Type, "storage-backend", s.StorageConfig.Type,
"The storage backend for persistence. Options: 'etcd3' (default), 'etcd2'.")
@ -95,3 +112,31 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&s.StorageConfig.Quorum, "etcd-quorum-read", s.StorageConfig.Quorum,
"If true, enable quorum read.")
}
func (s *EtcdOptions) ApplyTo(c *server.Config) error {
c.RESTOptionsGetter = &restOptionsFactory{options: s}
return nil
}
// restOptionsFactory is a default implementation of a RESTOptionsGetter
// This will work well for most aggregated API servers. The legacy kube server needs more customization
type restOptionsFactory struct {
options *EtcdOptions
}
func (f *restOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
ret := generic.RESTOptions{
StorageConfig: &f.options.StorageConfig,
Decorator: registry.StorageWithCacher,
DeleteCollectionWorkers: f.options.DeleteCollectionWorkers,
EnableGarbageCollection: f.options.EnableGarbageCollection,
ResourcePrefix: f.options.StorageConfig.Prefix + "/" + resource.Group + "/" + resource.Resource,
}
if !f.options.EnableWatchCache {
ret.Decorator = generic.UndecoratedStorage
}
return ret, nil
}

View File

@ -23,7 +23,6 @@ import (
)
type FeatureOptions struct {
EnableGarbageCollection bool
EnableProfiling bool
EnableContentionProfiling bool
EnableSwaggerUI bool
@ -33,7 +32,6 @@ func NewFeatureOptions() *FeatureOptions {
defaults := server.NewConfig()
return &FeatureOptions{
EnableGarbageCollection: defaults.EnableGarbageCollection,
EnableProfiling: defaults.EnableProfiling,
EnableContentionProfiling: defaults.EnableContentionProfiling,
EnableSwaggerUI: defaults.EnableSwaggerUI,
@ -41,9 +39,6 @@ func NewFeatureOptions() *FeatureOptions {
}
func (o *FeatureOptions) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&o.EnableGarbageCollection, "enable-garbage-collector", o.EnableGarbageCollection, ""+
"Enables the generic garbage collector. MUST be synced with the corresponding flag "+
"of the kube-controller-manager.")
fs.BoolVar(&o.EnableProfiling, "profiling", o.EnableProfiling,
"Enable profiling via web interface host:port/debug/pprof/")
fs.BoolVar(&o.EnableContentionProfiling, "contention-profiling", o.EnableContentionProfiling,
@ -53,7 +48,6 @@ func (o *FeatureOptions) AddFlags(fs *pflag.FlagSet) {
}
func (o *FeatureOptions) ApplyTo(c *server.Config) error {
c.EnableGarbageCollection = o.EnableGarbageCollection
c.EnableProfiling = o.EnableProfiling
c.EnableContentionProfiling = o.EnableContentionProfiling
c.EnableSwaggerUI = o.EnableSwaggerUI

View File

@ -34,9 +34,9 @@ type RecommendedOptions struct {
Features *FeatureOptions
}
func NewRecommendedOptions(scheme *runtime.Scheme) *RecommendedOptions {
func NewRecommendedOptions(prefix string, copier runtime.ObjectCopier, codec runtime.Codec) *RecommendedOptions {
return &RecommendedOptions{
Etcd: NewEtcdOptions(scheme),
Etcd: NewEtcdOptions(prefix, copier, codec),
SecureServing: NewSecureServingOptions(),
Authentication: NewDelegatingAuthenticationOptions(),
Authorization: NewDelegatingAuthorizationOptions(),
@ -55,6 +55,9 @@ func (o *RecommendedOptions) AddFlags(fs *pflag.FlagSet) {
}
func (o *RecommendedOptions) ApplyTo(config *server.Config) error {
if err := o.Etcd.ApplyTo(config); err != nil {
return err
}
if err := o.SecureServing.ApplyTo(config); err != nil {
return err
}

View File

@ -39,8 +39,6 @@ type ServerRunOptions struct {
AdvertiseAddress net.IP
CorsAllowedOriginList []string
DeleteCollectionWorkers int
EnableWatchCache bool
ExternalHost string
MaxRequestsInFlight int
MaxMutatingRequestsInFlight int
@ -54,8 +52,6 @@ func NewServerRunOptions() *ServerRunOptions {
return &ServerRunOptions{
AdmissionControl: "AlwaysAdmit",
DeleteCollectionWorkers: 1,
EnableWatchCache: true,
MaxRequestsInFlight: defaults.MaxRequestsInFlight,
MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight,
MinRequestTimeout: defaults.MinRequestTimeout,
@ -124,13 +120,6 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
"List of allowed origins for CORS, comma separated. An allowed origin can be a regular "+
"expression to support subdomain matching. If this list is empty CORS will not be enabled.")
fs.IntVar(&s.DeleteCollectionWorkers, "delete-collection-workers", s.DeleteCollectionWorkers,
"Number of workers spawned for DeleteCollection call. These are used to speed up namespace cleanup.")
// TODO: enable cache in integration tests.
fs.BoolVar(&s.EnableWatchCache, "watch-cache", s.EnableWatchCache,
"Enable watch caching in the apiserver")
fs.IntVar(&s.TargetRAMMB, "target-ram-mb", s.TargetRAMMB,
"Memory limit for apiserver in MB (used to configure sizes of caches, etc.)")

4
vendor/BUILD vendored
View File

@ -9098,6 +9098,7 @@ go_library(
"//vendor:k8s.io/apiserver/pkg/endpoints/filters",
"//vendor:k8s.io/apiserver/pkg/endpoints/openapi",
"//vendor:k8s.io/apiserver/pkg/endpoints/request",
"//vendor:k8s.io/apiserver/pkg/registry/generic",
"//vendor:k8s.io/apiserver/pkg/registry/rest",
"//vendor:k8s.io/apiserver/pkg/server/filters",
"//vendor:k8s.io/apiserver/pkg/server/healthz",
@ -14106,11 +14107,14 @@ go_library(
"//vendor:gopkg.in/natefinch/lumberjack.v2",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/util/net",
"//vendor:k8s.io/apiserver/pkg/admission",
"//vendor:k8s.io/apiserver/pkg/authentication/authenticatorfactory",
"//vendor:k8s.io/apiserver/pkg/authorization/authorizerfactory",
"//vendor:k8s.io/apiserver/pkg/features",
"//vendor:k8s.io/apiserver/pkg/registry/generic",
"//vendor:k8s.io/apiserver/pkg/registry/generic/registry",
"//vendor:k8s.io/apiserver/pkg/server",
"//vendor:k8s.io/apiserver/pkg/storage/storagebackend",
"//vendor:k8s.io/apiserver/pkg/util/feature",