mirror of https://github.com/k3s-io/k3s
Merge pull request #9841 from smarterclayton/fix_apiserver_abstractions
Cleaning up apiserver method signaturespull/6/head
commit
28197e07d6
|
@ -18,12 +18,12 @@ package apiserver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
gpath "path"
|
gpath "path"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||||
|
@ -40,7 +40,8 @@ type APIInstaller struct {
|
||||||
group *APIGroupVersion
|
group *APIGroupVersion
|
||||||
info *APIRequestInfoResolver
|
info *APIRequestInfoResolver
|
||||||
prefix string // Path prefix where API resources are to be registered.
|
prefix string // Path prefix where API resources are to be registered.
|
||||||
minRequestTimeout int
|
minRequestTimeout time.Duration
|
||||||
|
proxyDialerFn ProxyDialerFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// Struct capturing information about an action ("GET", "POST", "WATCH", PROXY", etc).
|
// Struct capturing information about an action ("GET", "POST", "WATCH", PROXY", etc).
|
||||||
|
@ -55,13 +56,13 @@ type action struct {
|
||||||
var errEmptyName = errors.NewBadRequest("name must be provided")
|
var errEmptyName = errors.NewBadRequest("name must be provided")
|
||||||
|
|
||||||
// Installs handlers for API resources.
|
// Installs handlers for API resources.
|
||||||
func (a *APIInstaller) Install(proxyDialer func(network, addr string) (net.Conn, error)) (ws *restful.WebService, errors []error) {
|
func (a *APIInstaller) Install() (ws *restful.WebService, errors []error) {
|
||||||
errors = make([]error, 0)
|
errors = make([]error, 0)
|
||||||
|
|
||||||
// Create the WebService.
|
// Create the WebService.
|
||||||
ws = a.newWebService()
|
ws = a.newWebService()
|
||||||
|
|
||||||
proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.group.Storage, a.group.Codec, a.group.Context, a.info, proxyDialer})
|
proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.group.Storage, a.group.Codec, a.group.Context, a.info, a.proxyDialerFn})
|
||||||
|
|
||||||
// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
|
// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
|
||||||
paths := make([]string, len(a.group.Storage))
|
paths := make([]string, len(a.group.Storage))
|
||||||
|
|
|
@ -109,6 +109,7 @@ type Mux interface {
|
||||||
// It handles URLs of the form:
|
// It handles URLs of the form:
|
||||||
// /${storage_key}[/${object_name}]
|
// /${storage_key}[/${object_name}]
|
||||||
// Where 'storage_key' points to a rest.Storage object stored in storage.
|
// Where 'storage_key' points to a rest.Storage object stored in storage.
|
||||||
|
// This object should contain all parameterization necessary for running a particular API version
|
||||||
type APIGroupVersion struct {
|
type APIGroupVersion struct {
|
||||||
Storage map[string]rest.Storage
|
Storage map[string]rest.Storage
|
||||||
|
|
||||||
|
@ -131,8 +132,13 @@ type APIGroupVersion struct {
|
||||||
|
|
||||||
Admit admission.Interface
|
Admit admission.Interface
|
||||||
Context api.RequestContextMapper
|
Context api.RequestContextMapper
|
||||||
|
|
||||||
|
ProxyDialerFn ProxyDialerFunc
|
||||||
|
MinRequestTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ProxyDialerFunc func(network, addr string) (net.Conn, error)
|
||||||
|
|
||||||
// TODO: Pipe these in through the apiserver cmd line
|
// TODO: Pipe these in through the apiserver cmd line
|
||||||
const (
|
const (
|
||||||
// Minimum duration before timing out read/write requests
|
// Minimum duration before timing out read/write requests
|
||||||
|
@ -141,16 +147,10 @@ const (
|
||||||
MaxTimeoutSecs = 600
|
MaxTimeoutSecs = 600
|
||||||
)
|
)
|
||||||
|
|
||||||
// restContainer is a wrapper around a generic restful Container that also contains a MinRequestTimeout
|
|
||||||
type RestContainer struct {
|
|
||||||
*restful.Container
|
|
||||||
MinRequestTimeout int
|
|
||||||
}
|
|
||||||
|
|
||||||
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
|
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
|
||||||
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
|
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
|
||||||
// in a slash. A restful WebService is created for the group and version.
|
// in a slash. A restful WebService is created for the group and version.
|
||||||
func (g *APIGroupVersion) InstallREST(container *RestContainer, proxyDialer func(network, addr string) (net.Conn, error)) error {
|
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
|
||||||
info := &APIRequestInfoResolver{util.NewStringSet(strings.TrimPrefix(g.Root, "/")), g.Mapper}
|
info := &APIRequestInfoResolver{util.NewStringSet(strings.TrimPrefix(g.Root, "/")), g.Mapper}
|
||||||
|
|
||||||
prefix := path.Join(g.Root, g.Version)
|
prefix := path.Join(g.Root, g.Version)
|
||||||
|
@ -158,9 +158,10 @@ func (g *APIGroupVersion) InstallREST(container *RestContainer, proxyDialer func
|
||||||
group: g,
|
group: g,
|
||||||
info: info,
|
info: info,
|
||||||
prefix: prefix,
|
prefix: prefix,
|
||||||
minRequestTimeout: container.MinRequestTimeout,
|
minRequestTimeout: g.MinRequestTimeout,
|
||||||
|
proxyDialerFn: g.ProxyDialerFn,
|
||||||
}
|
}
|
||||||
ws, registrationErrors := installer.Install(proxyDialer)
|
ws, registrationErrors := installer.Install()
|
||||||
container.Add(ws)
|
container.Add(ws)
|
||||||
return errors.NewAggregate(registrationErrors)
|
return errors.NewAggregate(registrationErrors)
|
||||||
}
|
}
|
||||||
|
|
|
@ -226,7 +226,7 @@ func handleInternal(legacy bool, storage map[string]rest.Storage, admissionContr
|
||||||
container := restful.NewContainer()
|
container := restful.NewContainer()
|
||||||
container.Router(restful.CurlyRouter{})
|
container.Router(restful.CurlyRouter{})
|
||||||
mux := container.ServeMux
|
mux := container.ServeMux
|
||||||
if err := group.InstallREST(&RestContainer{container, 0}, nil); err != nil {
|
if err := group.InstallREST(container); err != nil {
|
||||||
panic(fmt.Sprintf("unable to install container %s: %v", group.Version, err))
|
panic(fmt.Sprintf("unable to install container %s: %v", group.Version, err))
|
||||||
}
|
}
|
||||||
ws := new(restful.WebService)
|
ws := new(restful.WebService)
|
||||||
|
@ -1938,7 +1938,7 @@ func TestParentResourceIsRequired(t *testing.T) {
|
||||||
Codec: newCodec,
|
Codec: newCodec,
|
||||||
}
|
}
|
||||||
container := restful.NewContainer()
|
container := restful.NewContainer()
|
||||||
if err := group.InstallREST(&RestContainer{container, 0}, nil); err == nil {
|
if err := group.InstallREST(container); err == nil {
|
||||||
t.Fatal("expected error")
|
t.Fatal("expected error")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1966,7 +1966,7 @@ func TestParentResourceIsRequired(t *testing.T) {
|
||||||
Codec: newCodec,
|
Codec: newCodec,
|
||||||
}
|
}
|
||||||
container = restful.NewContainer()
|
container = restful.NewContainer()
|
||||||
if err := group.InstallREST(&RestContainer{container, 0}, nil); err != nil {
|
if err := group.InstallREST(container); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -185,7 +185,7 @@ func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admissi
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListResource returns a function that handles retrieving a list of resources from a rest.Storage object.
|
// ListResource returns a function that handles retrieving a list of resources from a rest.Storage object.
|
||||||
func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool, minRequestTimeout int) restful.RouteFunction {
|
func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool, minRequestTimeout time.Duration) restful.RouteFunction {
|
||||||
return func(req *restful.Request, res *restful.Response) {
|
return func(req *restful.Request, res *restful.Response) {
|
||||||
w := res.ResponseWriter
|
w := res.ResponseWriter
|
||||||
|
|
||||||
|
|
|
@ -66,11 +66,11 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// serveWatch handles serving requests to the server
|
// serveWatch handles serving requests to the server
|
||||||
func serveWatch(watcher watch.Interface, scope RequestScope, w http.ResponseWriter, req *restful.Request, minRequestTimeout int) {
|
func serveWatch(watcher watch.Interface, scope RequestScope, w http.ResponseWriter, req *restful.Request, minRequestTimeout time.Duration) {
|
||||||
var timeout time.Duration
|
var timeout time.Duration
|
||||||
if minRequestTimeout > 0 {
|
if minRequestTimeout > 0 {
|
||||||
// Each watch gets a random timeout between minRequestTimeout and 2*minRequestTimeout to avoid thundering herds.
|
// Each watch gets a random timeout between minRequestTimeout and 2*minRequestTimeout to avoid thundering herds.
|
||||||
timeout = time.Duration(minRequestTimeout+rand.Intn(minRequestTimeout)) * time.Second
|
timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0))
|
||||||
}
|
}
|
||||||
watchServer := &WatchServer{watcher, scope.Codec, func(obj runtime.Object) {
|
watchServer := &WatchServer{watcher, scope.Codec, func(obj runtime.Object) {
|
||||||
if err := setSelfLink(obj, req, scope.Namer); err != nil {
|
if err := setSelfLink(obj, req, scope.Namer); err != nil {
|
||||||
|
|
|
@ -117,7 +117,7 @@ type Config struct {
|
||||||
RestfulContainer *restful.Container
|
RestfulContainer *restful.Container
|
||||||
|
|
||||||
// If specified, requests will be allocated a random timeout between this value, and twice this value.
|
// If specified, requests will be allocated a random timeout between this value, and twice this value.
|
||||||
// Note that it is up to the request handlers to ignore or honor this timeout.
|
// Note that it is up to the request handlers to ignore or honor this timeout. In seconds.
|
||||||
MinRequestTimeout int
|
MinRequestTimeout int
|
||||||
|
|
||||||
// Number of masters running; all masters must be started with the
|
// Number of masters running; all masters must be started with the
|
||||||
|
@ -163,10 +163,11 @@ type Master struct {
|
||||||
serviceClusterIPRange *net.IPNet
|
serviceClusterIPRange *net.IPNet
|
||||||
serviceNodePortRange util.PortRange
|
serviceNodePortRange util.PortRange
|
||||||
cacheTimeout time.Duration
|
cacheTimeout time.Duration
|
||||||
|
minRequestTimeout time.Duration
|
||||||
|
|
||||||
mux apiserver.Mux
|
mux apiserver.Mux
|
||||||
muxHelper *apiserver.MuxHelper
|
muxHelper *apiserver.MuxHelper
|
||||||
handlerContainer *apiserver.RestContainer
|
handlerContainer *restful.Container
|
||||||
rootWebService *restful.WebService
|
rootWebService *restful.WebService
|
||||||
enableCoreControllers bool
|
enableCoreControllers bool
|
||||||
enableLogsSupport bool
|
enableLogsSupport bool
|
||||||
|
@ -210,6 +211,7 @@ type Master struct {
|
||||||
InsecureHandler http.Handler
|
InsecureHandler http.Handler
|
||||||
|
|
||||||
// Used for secure proxy
|
// Used for secure proxy
|
||||||
|
dialer apiserver.ProxyDialerFunc
|
||||||
tunnels *util.SSHTunnelList
|
tunnels *util.SSHTunnelList
|
||||||
tunnelsLock sync.Mutex
|
tunnelsLock sync.Mutex
|
||||||
installSSHKey InstallSSHKey
|
installSSHKey InstallSSHKey
|
||||||
|
@ -333,7 +335,8 @@ func New(c *Config) *Master {
|
||||||
v1: !c.DisableV1,
|
v1: !c.DisableV1,
|
||||||
requestContextMapper: c.RequestContextMapper,
|
requestContextMapper: c.RequestContextMapper,
|
||||||
|
|
||||||
cacheTimeout: c.CacheTimeout,
|
cacheTimeout: c.CacheTimeout,
|
||||||
|
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
|
||||||
|
|
||||||
masterCount: c.MasterCount,
|
masterCount: c.MasterCount,
|
||||||
externalHost: c.ExternalHost,
|
externalHost: c.ExternalHost,
|
||||||
|
@ -355,7 +358,7 @@ func New(c *Config) *Master {
|
||||||
m.mux = mux
|
m.mux = mux
|
||||||
handlerContainer = NewHandlerContainer(mux)
|
handlerContainer = NewHandlerContainer(mux)
|
||||||
}
|
}
|
||||||
m.handlerContainer = &apiserver.RestContainer{handlerContainer, c.MinRequestTimeout}
|
m.handlerContainer = handlerContainer
|
||||||
// Use CurlyRouter to be able to use regular expressions in paths. Regular expressions are required in paths for example for proxy (where the path is proxy/{kind}/{name}/{*})
|
// Use CurlyRouter to be able to use regular expressions in paths. Regular expressions are required in paths for example for proxy (where the path is proxy/{kind}/{name}/{*})
|
||||||
m.handlerContainer.Router(restful.CurlyRouter{})
|
m.handlerContainer.Router(restful.CurlyRouter{})
|
||||||
m.muxHelper = &apiserver.MuxHelper{m.mux, []string{}}
|
m.muxHelper = &apiserver.MuxHelper{m.mux, []string{}}
|
||||||
|
@ -493,7 +496,7 @@ func (m *Master) init(c *Config) {
|
||||||
"componentStatuses": componentstatus.NewStorage(func() map[string]apiserver.Server { return m.getServersToValidate(c) }),
|
"componentStatuses": componentstatus.NewStorage(func() map[string]apiserver.Server { return m.getServersToValidate(c) }),
|
||||||
}
|
}
|
||||||
|
|
||||||
var proxyDialer func(net, addr string) (net.Conn, error)
|
// establish the node proxy dialer
|
||||||
if len(c.SSHUser) > 0 {
|
if len(c.SSHUser) > 0 {
|
||||||
// Usernames are capped @ 32
|
// Usernames are capped @ 32
|
||||||
if len(c.SSHUser) > 32 {
|
if len(c.SSHUser) > 32 {
|
||||||
|
@ -501,6 +504,7 @@ func (m *Master) init(c *Config) {
|
||||||
c.SSHUser = c.SSHUser[0:32]
|
c.SSHUser = c.SSHUser[0:32]
|
||||||
}
|
}
|
||||||
glog.Infof("Setting up proxy: %s %s", c.SSHUser, c.SSHKeyfile)
|
glog.Infof("Setting up proxy: %s %s", c.SSHUser, c.SSHKeyfile)
|
||||||
|
|
||||||
// public keyfile is written last, so check for that.
|
// public keyfile is written last, so check for that.
|
||||||
publicKeyFile := c.SSHKeyfile + ".pub"
|
publicKeyFile := c.SSHKeyfile + ".pub"
|
||||||
exists, err := util.FileExists(publicKeyFile)
|
exists, err := util.FileExists(publicKeyFile)
|
||||||
|
@ -514,14 +518,14 @@ func (m *Master) init(c *Config) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
m.tunnels = &util.SSHTunnelList{}
|
m.tunnels = &util.SSHTunnelList{}
|
||||||
|
m.dialer = m.Dial
|
||||||
m.setupSecureProxy(c.SSHUser, c.SSHKeyfile, publicKeyFile)
|
m.setupSecureProxy(c.SSHUser, c.SSHKeyfile, publicKeyFile)
|
||||||
proxyDialer = m.Dial
|
|
||||||
|
|
||||||
// This is pretty ugly. A better solution would be to pull this all the way up into the
|
// This is pretty ugly. A better solution would be to pull this all the way up into the
|
||||||
// server.go file.
|
// server.go file.
|
||||||
httpKubeletClient, ok := c.KubeletClient.(*client.HTTPKubeletClient)
|
httpKubeletClient, ok := c.KubeletClient.(*client.HTTPKubeletClient)
|
||||||
if ok {
|
if ok {
|
||||||
httpKubeletClient.Config.Dial = m.Dial
|
httpKubeletClient.Config.Dial = m.dialer
|
||||||
transport, err := client.MakeTransport(httpKubeletClient.Config)
|
transport, err := client.MakeTransport(httpKubeletClient.Config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Error setting up transport over SSH: %v", err)
|
glog.Errorf("Error setting up transport over SSH: %v", err)
|
||||||
|
@ -535,29 +539,29 @@ func (m *Master) init(c *Config) {
|
||||||
|
|
||||||
apiVersions := []string{}
|
apiVersions := []string{}
|
||||||
if m.v1beta3 {
|
if m.v1beta3 {
|
||||||
if err := m.api_v1beta3().InstallREST(m.handlerContainer, proxyDialer); err != nil {
|
if err := m.api_v1beta3().InstallREST(m.handlerContainer); err != nil {
|
||||||
glog.Fatalf("Unable to setup API v1beta3: %v", err)
|
glog.Fatalf("Unable to setup API v1beta3: %v", err)
|
||||||
}
|
}
|
||||||
apiVersions = append(apiVersions, "v1beta3")
|
apiVersions = append(apiVersions, "v1beta3")
|
||||||
}
|
}
|
||||||
if m.v1 {
|
if m.v1 {
|
||||||
if err := m.api_v1().InstallREST(m.handlerContainer, proxyDialer); err != nil {
|
if err := m.api_v1().InstallREST(m.handlerContainer); err != nil {
|
||||||
glog.Fatalf("Unable to setup API v1: %v", err)
|
glog.Fatalf("Unable to setup API v1: %v", err)
|
||||||
}
|
}
|
||||||
apiVersions = append(apiVersions, "v1")
|
apiVersions = append(apiVersions, "v1")
|
||||||
}
|
}
|
||||||
|
|
||||||
apiserver.InstallSupport(m.muxHelper, m.rootWebService)
|
apiserver.InstallSupport(m.muxHelper, m.rootWebService)
|
||||||
apiserver.AddApiWebService(m.handlerContainer.Container, c.APIPrefix, apiVersions)
|
apiserver.AddApiWebService(m.handlerContainer, c.APIPrefix, apiVersions)
|
||||||
defaultVersion := m.defaultAPIGroupVersion()
|
defaultVersion := m.defaultAPIGroupVersion()
|
||||||
requestInfoResolver := &apiserver.APIRequestInfoResolver{util.NewStringSet(strings.TrimPrefix(defaultVersion.Root, "/")), defaultVersion.Mapper}
|
requestInfoResolver := &apiserver.APIRequestInfoResolver{util.NewStringSet(strings.TrimPrefix(defaultVersion.Root, "/")), defaultVersion.Mapper}
|
||||||
apiserver.InstallServiceErrorHandler(m.handlerContainer.Container, requestInfoResolver, apiVersions)
|
apiserver.InstallServiceErrorHandler(m.handlerContainer, requestInfoResolver, apiVersions)
|
||||||
|
|
||||||
// Register root handler.
|
// Register root handler.
|
||||||
// We do not register this using restful Webservice since we do not want to surface this in api docs.
|
// We do not register this using restful Webservice since we do not want to surface this in api docs.
|
||||||
// Allow master to be embedded in contexts which already have something registered at the root
|
// Allow master to be embedded in contexts which already have something registered at the root
|
||||||
if c.EnableIndex {
|
if c.EnableIndex {
|
||||||
m.mux.HandleFunc("/", apiserver.IndexHandler(m.handlerContainer.Container, m.muxHelper))
|
m.mux.HandleFunc("/", apiserver.IndexHandler(m.handlerContainer, m.muxHelper))
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.EnableLogsSupport {
|
if c.EnableLogsSupport {
|
||||||
|
@ -682,7 +686,7 @@ func (m *Master) InstallSwaggerAPI() {
|
||||||
SwaggerPath: "/swaggerui/",
|
SwaggerPath: "/swaggerui/",
|
||||||
SwaggerFilePath: "/swagger-ui/",
|
SwaggerFilePath: "/swagger-ui/",
|
||||||
}
|
}
|
||||||
swagger.RegisterSwaggerService(swaggerConfig, m.handlerContainer.Container)
|
swagger.RegisterSwaggerService(swaggerConfig, m.handlerContainer)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {
|
func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {
|
||||||
|
@ -728,6 +732,9 @@ func (m *Master) defaultAPIGroupVersion() *apiserver.APIGroupVersion {
|
||||||
|
|
||||||
Admit: m.admissionControl,
|
Admit: m.admissionControl,
|
||||||
Context: m.requestContextMapper,
|
Context: m.requestContextMapper,
|
||||||
|
|
||||||
|
ProxyDialerFn: m.dialer,
|
||||||
|
MinRequestTimeout: m.minRequestTimeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue