Merge pull request #8964 from bprashanth/apiserver_cmd

Extend read/write connection timeout of apiserver
pull/6/head
Brian Grant 2015-06-03 09:31:37 -07:00
commit 4e2d295046
11 changed files with 69 additions and 31 deletions

View File

@ -25,6 +25,7 @@ MASTER_DISK_TYPE=pd-ssd
MASTER_DISK_SIZE=${MASTER_DISK_SIZE:-20GB}
MINION_DISK_TYPE=pd-standard
MINION_DISK_SIZE=${MINION_DISK_SIZE:-100GB}
KUBE_APISERVER_REQUEST_TIMEOUT=300
OS_DISTRIBUTION=${KUBE_OS_DISTRIBUTION:-debian}
MASTER_IMAGE=${KUBE_GCE_MASTER_IMAGE:-container-vm-v20150505}
@ -53,9 +54,9 @@ ENABLE_DOCKER_REGISTRY_CACHE=true
ENABLE_NODE_MONITORING="${KUBE_ENABLE_NODE_MONITORING:-true}"
# Optional: Cluster monitoring to setup as part of the cluster bring up:
# none - No cluster monitoring setup
# influxdb - Heapster, InfluxDB, and Grafana
# google - Heapster, Google Cloud Monitoring, and Google Cloud Logging
# none - No cluster monitoring setup
# influxdb - Heapster, InfluxDB, and Grafana
# google - Heapster, Google Cloud Monitoring, and Google Cloud Logging
ENABLE_CLUSTER_MONITORING="${KUBE_ENABLE_CLUSTER_MONITORING:-influxdb}"
# Optional: Enable node logging.

View File

@ -50,6 +50,12 @@ function ensure-install-dir() {
cd ${INSTALL_DIR}
}
function salt-apiserver-timeout-grain() {
cat <<EOF >>/etc/salt/minion.d/grains.conf
minRequestTimeout: '$1'
EOF
}
function set-broken-motd() {
echo -e '\nBroken (or in progress) GCE Kubernetes node setup! Suggested first step:\n tail /var/log/startupscript.log\n' > /etc/motd
}
@ -538,6 +544,9 @@ function configure-salt() {
salt-run-local
if [[ "${KUBERNETES_MASTER}" == "true" ]]; then
salt-master-role
if [ -n "${KUBE_APISERVER_REQUEST_TIMEOUT:-}" ]; then
salt-apiserver-timeout-grain $KUBE_APISERVER_REQUEST_TIMEOUT
fi
else
salt-node-role
salt-docker-opts

View File

@ -47,7 +47,11 @@ ADMISSION_CONTROL: $(yaml-quote ${ADMISSION_CONTROL:-})
MASTER_IP_RANGE: $(yaml-quote ${MASTER_IP_RANGE})
CA_CERT: $(yaml-quote ${CA_CERT_BASE64:-})
EOF
if [ -n "${KUBE_APISERVER_REQUEST_TIMEOUT:-}" ]; then
cat >>$file <<EOF
KUBE_APISERVER_REQUEST_TIMEOUT: $(yaml-quote ${KUBE_APISERVER_REQUEST_TIMEOUT})
EOF
fi
if [[ "${master}" == "true" ]]; then
# Master-only env vars.
cat >>$file <<EOF

View File

@ -59,6 +59,11 @@
{% set token_auth_file = "--token_auth_file=/dev/null" -%}
{% set basic_auth_file = "" -%}
{% set min_request_timeout = "" -%}
{% if grains.minRequestTimeout is defined -%}
{% set min_request_timeout = "--min-request-timeout=" + grains.minRequestTimeout -%}
{% endif -%}
{% if grains.cloud is defined -%}
{% if grains.cloud in [ 'aws', 'gce', 'vagrant' ] -%}
{% set token_auth_file = "--token_auth_file=/srv/kubernetes/known_tokens.csv" -%}
@ -79,7 +84,7 @@
{% set runtime_config = "--runtime_config=" + grains.runtime_config -%}
{% endif -%}
{% set params = address + " " + etcd_servers + " " + cloud_provider + " " + cloud_config + " " + runtime_config + " " + admission_control + " " + service_cluster_ip_range + " " + client_ca_file + " " + basic_auth_file -%}
{% set params = address + " " + etcd_servers + " " + cloud_provider + " " + cloud_config + " " + runtime_config + " " + admission_control + " " + service_cluster_ip_range + " " + client_ca_file + " " + basic_auth_file + " " + min_request_timeout -%}
{% set params = params + " " + cluster_name + " " + cert_file + " " + key_file + " --secure_port=" + secure_port + " " + token_auth_file + " " + bind_address + " " + pillar['log_level'] + " " + advertise_address -%}
{

View File

@ -96,6 +96,7 @@ type APIServer struct {
ClusterName string
EnableProfiling bool
MaxRequestsInFlight int
MinRequestTimeout int
LongRunningRequestRE string
}
@ -204,6 +205,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
fs.StringVar(&s.ExternalHost, "external-hostname", "", "The hostname to use when generating externalized URLs for this master (e.g. Swagger API Docs.)")
fs.IntVar(&s.MaxRequestsInFlight, "max-requests-inflight", 400, "The maximum number of requests in flight at a given time. When the server exceeds this, it rejects requests. Zero for no limit.")
fs.IntVar(&s.MinRequestTimeout, "min-request-timeout", 1800, "An optional field indicating the minimum number of seconds a handler must keep a request open before timing it out. Currently only honored by the watch request handler, which picks a randomized value above this number as the connection timeout, to spread out load.")
fs.StringVar(&s.LongRunningRequestRE, "long-running-request-regexp", "[.*\\/watch$][^\\/proxy.*]", "A regular expression matching long running requests which should be excluded from maximum inflight request handling.")
}
@ -380,6 +382,7 @@ func (s *APIServer) Run(_ []string) error {
MasterServiceNamespace: s.MasterServiceNamespace,
ClusterName: s.ClusterName,
ExternalHost: s.ExternalHost,
MinRequestTimeout: s.MinRequestTimeout,
}
m := master.New(config)

View File

@ -37,9 +37,10 @@ import (
)
type APIInstaller struct {
group *APIGroupVersion
info *APIRequestInfoResolver
prefix string // Path prefix where API resources are to be registered.
group *APIGroupVersion
info *APIRequestInfoResolver
prefix string // Path prefix where API resources are to be registered.
minRequestTimeout int
}
// Struct capturing information about an action ("GET", "POST", "WATCH", PROXY", etc).
@ -419,7 +420,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
addParams(route, action.Params)
ws.Route(route)
case "LIST": // List all resources of a kind.
route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, false)).
route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, false, a.minRequestTimeout)).
Filter(m).
Doc("list objects of kind "+kind).
Operation("list"+kind).
@ -492,7 +493,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
ws.Route(route)
// TODO: deprecated
case "WATCH": // Watch a resource.
route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true)).
route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true, a.minRequestTimeout)).
Filter(m).
Doc("watch changes to an object of kind "+kind).
Operation("watch"+kind).
@ -506,7 +507,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
ws.Route(route)
// TODO: deprecated
case "WATCHLIST": // Watch all resources of a kind.
route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true)).
route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true, a.minRequestTimeout)).
Filter(m).
Doc("watch individual changes to a list of "+kind).
Operation("watch"+kind+"list").

View File

@ -140,17 +140,24 @@ const (
MaxTimeoutSecs = 600
)
// restContainer is a wrapper around a generic restful Container that also contains a MinRequestTimeout
type RestContainer struct {
*restful.Container
MinRequestTimeout int
}
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash. A restful WebService is created for the group and version.
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
func (g *APIGroupVersion) InstallREST(container *RestContainer) error {
info := &APIRequestInfoResolver{util.NewStringSet(strings.TrimPrefix(g.Root, "/")), g.Mapper}
prefix := path.Join(g.Root, g.Version)
installer := &APIInstaller{
group: g,
info: info,
prefix: prefix,
group: g,
info: info,
prefix: prefix,
minRequestTimeout: container.MinRequestTimeout,
}
ws, registrationErrors := installer.Install()
container.Add(ws)

View File

@ -231,7 +231,7 @@ func handleInternal(legacy bool, storage map[string]rest.Storage, admissionContr
container := restful.NewContainer()
container.Router(restful.CurlyRouter{})
mux := container.ServeMux
if err := group.InstallREST(container); err != nil {
if err := group.InstallREST(&RestContainer{container, 0}); err != nil {
panic(fmt.Sprintf("unable to install container %s: %v", group.Version, err))
}
ws := new(restful.WebService)
@ -1901,7 +1901,7 @@ func TestParentResourceIsRequired(t *testing.T) {
Codec: newCodec,
}
container := restful.NewContainer()
if err := group.InstallREST(container); err == nil {
if err := group.InstallREST(&RestContainer{container, 0}); err == nil {
t.Fatal("expected error")
}
@ -1929,7 +1929,7 @@ func TestParentResourceIsRequired(t *testing.T) {
Codec: newCodec,
}
container = restful.NewContainer()
if err := group.InstallREST(container); err != nil {
if err := group.InstallREST(&RestContainer{container, 0}); err != nil {
t.Fatal(err)
}

View File

@ -185,7 +185,7 @@ func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admissi
}
// ListResource returns a function that handles retrieving a list of resources from a rest.Storage object.
func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool) restful.RouteFunction {
func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool, minRequestTimeout int) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
w := res.ResponseWriter
@ -252,7 +252,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch
errorJSON(err, scope.Codec, w)
return
}
serveWatch(watcher, scope, w, req)
serveWatch(watcher, scope, w, req, minRequestTimeout)
return
}

View File

@ -66,10 +66,12 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
}
// serveWatch handles serving requests to the server
func serveWatch(watcher watch.Interface, scope RequestScope, w http.ResponseWriter, req *restful.Request) {
// Each watch gets a random timeout to avoid thundering herds. Rand is seeded once in the api installer.
timeout := time.Duration(MinTimeoutSecs+rand.Intn(MaxTimeoutSecs-MinTimeoutSecs)) * time.Second
func serveWatch(watcher watch.Interface, scope RequestScope, w http.ResponseWriter, req *restful.Request, minRequestTimeout int) {
var timeout time.Duration
if minRequestTimeout > 0 {
// Each watch gets a random timeout between minRequestTimeout and 2*minRequestTimeout to avoid thundering herds.
timeout = time.Duration(minRequestTimeout+rand.Intn(minRequestTimeout)) * time.Second
}
watchServer := &WatchServer{watcher, scope.Codec, func(obj runtime.Object) {
if err := setSelfLink(obj, req, scope.Namer); err != nil {
glog.V(5).Infof("Failed to set self link for object %v: %v", reflect.TypeOf(obj), err)

View File

@ -111,6 +111,10 @@ type Config struct {
// If specified, all web services will be registered into this container
RestfulContainer *restful.Container
// If specified, requests will be allocated a random timeout between this value, and twice this value.
// Note that it is up to the request handlers to ignore or honor this timeout.
MinRequestTimeout int
// Number of masters running; all masters must be started with the
// same value for this field. (Numbers > 1 currently untested.)
MasterCount int
@ -153,7 +157,7 @@ type Master struct {
mux apiserver.Mux
muxHelper *apiserver.MuxHelper
handlerContainer *restful.Container
handlerContainer *apiserver.RestContainer
rootWebService *restful.WebService
enableCoreControllers bool
enableLogsSupport bool
@ -341,14 +345,16 @@ func New(c *Config) *Master {
serviceReadWritePort: 443,
}
var handlerContainer *restful.Container
if c.RestfulContainer != nil {
m.mux = c.RestfulContainer.ServeMux
m.handlerContainer = c.RestfulContainer
handlerContainer = c.RestfulContainer
} else {
mux := http.NewServeMux()
m.mux = mux
m.handlerContainer = NewHandlerContainer(mux)
handlerContainer = NewHandlerContainer(mux)
}
m.handlerContainer = &apiserver.RestContainer{handlerContainer, c.MinRequestTimeout}
// Use CurlyRouter to be able to use regular expressions in paths. Regular expressions are required in paths for example for proxy (where the path is proxy/{kind}/{name}/{*})
m.handlerContainer.Router(restful.CurlyRouter{})
m.muxHelper = &apiserver.MuxHelper{m.mux, []string{}}
@ -507,16 +513,16 @@ func (m *Master) init(c *Config) {
}
apiserver.InstallSupport(m.muxHelper, m.rootWebService)
apiserver.AddApiWebService(m.handlerContainer, c.APIPrefix, apiVersions)
apiserver.AddApiWebService(m.handlerContainer.Container, c.APIPrefix, apiVersions)
defaultVersion := m.defaultAPIGroupVersion()
requestInfoResolver := &apiserver.APIRequestInfoResolver{util.NewStringSet(strings.TrimPrefix(defaultVersion.Root, "/")), defaultVersion.Mapper}
apiserver.InstallServiceErrorHandler(m.handlerContainer, requestInfoResolver, apiVersions)
apiserver.InstallServiceErrorHandler(m.handlerContainer.Container, requestInfoResolver, apiVersions)
// Register root handler.
// We do not register this using restful Webservice since we do not want to surface this in api docs.
// Allow master to be embedded in contexts which already have something registered at the root
if c.EnableIndex {
m.mux.HandleFunc("/", apiserver.IndexHandler(m.handlerContainer, m.muxHelper))
m.mux.HandleFunc("/", apiserver.IndexHandler(m.handlerContainer.Container, m.muxHelper))
}
if c.EnableLogsSupport {
@ -649,7 +655,7 @@ func (m *Master) InstallSwaggerAPI() {
SwaggerPath: "/swaggerui/",
SwaggerFilePath: "/swagger-ui/",
}
swagger.RegisterSwaggerService(swaggerConfig, m.handlerContainer)
swagger.RegisterSwaggerService(swaggerConfig, m.handlerContainer.Container)
}
func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {