wire in aggregation

pull/6/head
deads2k 2017-03-10 13:51:02 -05:00
parent b28966b48a
commit 8e26fa25da
28 changed files with 18558 additions and 84 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -17,6 +17,7 @@ limitations under the License.
package main
import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
)
@ -32,7 +33,7 @@ func NewKubeAPIServer() *Server {
SimpleUsage: "apiserver",
Long: "The main API entrypoint and interface to the storage system. The API server is also the focal point for all authorization decisions.",
Run: func(_ *Server, args []string) error {
return app.Run(s)
return app.Run(s, wait.NeverStop)
},
}
s.AddFlags(hks.Flags())

View File

@ -25,6 +25,7 @@ go_library(
"//pkg/version/prometheus:go_default_library",
"//pkg/version/verflag:go_default_library",
"//vendor:github.com/spf13/pflag",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apiserver/pkg/util/flag",
"//vendor:k8s.io/apiserver/pkg/util/logs",
],

View File

@ -24,6 +24,7 @@ import (
"os"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/flag"
"k8s.io/apiserver/pkg/util/logs"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
@ -47,7 +48,7 @@ func main() {
verflag.PrintAndExitIfRequested()
if err := app.Run(s); err != nil {
if err := app.Run(s, wait.NeverStop); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}

View File

@ -10,6 +10,7 @@ load(
go_library(
name = "go_default_library",
srcs = [
"aggregator.go",
"plugins.go",
"server.go",
],
@ -32,6 +33,7 @@ go_library(
"//pkg/kubeapiserver/admission:go_default_library",
"//pkg/kubeapiserver/authenticator:go_default_library",
"//pkg/master:go_default_library",
"//pkg/master/thirdparty:go_default_library",
"//pkg/master/tunneler:go_default_library",
"//pkg/registry/cachesize:go_default_library",
"//pkg/version:go_default_library",
@ -62,6 +64,7 @@ go_library(
"//vendor:github.com/spf13/cobra",
"//vendor:github.com/spf13/pflag",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/openapi",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/util/errors",
@ -73,7 +76,15 @@ go_library(
"//vendor:k8s.io/apiserver/pkg/authorization/authorizer",
"//vendor:k8s.io/apiserver/pkg/server",
"//vendor:k8s.io/apiserver/pkg/server/filters",
"//vendor:k8s.io/apiserver/pkg/server/healthz",
"//vendor:k8s.io/apiserver/pkg/server/mux",
"//vendor:k8s.io/apiserver/pkg/server/options",
"//vendor:k8s.io/apiserver/pkg/server/storage",
"//vendor:k8s.io/client-go/kubernetes",
"//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration",
"//vendor:k8s.io/kube-aggregator/pkg/apiserver",
"//vendor:k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion",
"//vendor:k8s.io/kube-aggregator/pkg/controllers/autoregister",
],
)

View File

@ -0,0 +1,189 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app does all of the work necessary to create a Kubernetes
// APIServer by binding together the API, master and APIServer infrastructure.
// It can be configured and called directly or via the hyperkube framework.
package app
import (
"fmt"
"io/ioutil"
"net/http"
"strings"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/mux"
genericoptions "k8s.io/apiserver/pkg/server/options"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion"
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
"k8s.io/kubernetes/pkg/master/thirdparty"
)
func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config, commandOptions *options.ServerRunOptions) (*aggregatorapiserver.Config, error) {
// make a shallow copy to let us twiddle a few things
// most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the aggregator
genericConfig := kubeAPIServerConfig
genericConfig.FallThroughHandler = mux.NewPathRecorderMux()
// the aggregator doesn't wire these up. It just delegates them to the kubeapiserver
genericConfig.EnableSwaggerUI = false
genericConfig.OpenAPIConfig = nil
genericConfig.SwaggerConfig = nil
// copy the loopbackclientconfig. We're going to change the contenttype back to json until we get protobuf serializations for it
t := *kubeAPIServerConfig.LoopbackClientConfig
genericConfig.LoopbackClientConfig = &t
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = ""
// copy the etcd options so we don't mutate originals.
etcdOptions := *commandOptions.Etcd
etcdOptions.StorageConfig.Codec = aggregatorapiserver.Codecs.LegacyCodec(schema.GroupVersion{Group: "apiregistration.k8s.io", Version: "v1alpha1"})
etcdOptions.StorageConfig.Copier = aggregatorapiserver.Scheme
genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions}
client, err := kubeclientset.NewForConfig(genericConfig.LoopbackClientConfig)
if err != nil {
return nil, err
}
var certBytes, keyBytes []byte
if len(commandOptions.ProxyClientCertFile) > 0 && len(commandOptions.ProxyClientKeyFile) > 0 {
certBytes, err = ioutil.ReadFile(commandOptions.ProxyClientCertFile)
if err != nil {
return nil, err
}
keyBytes, err = ioutil.ReadFile(commandOptions.ProxyClientKeyFile)
if err != nil {
return nil, err
}
}
aggregatorConfig := &aggregatorapiserver.Config{
GenericConfig: &genericConfig,
CoreAPIServerClient: client,
ProxyClientCert: certBytes,
ProxyClientKey: keyBytes,
}
return aggregatorConfig, nil
}
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer, stopCh)
if err != nil {
return nil, err
}
// create controllers for auto-registration
apiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig)
if err != nil {
return nil, err
}
autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().InternalVersion().APIServices(), apiRegistrationClient)
apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
tprRegistrationController := thirdparty.NewAutoRegistrationController(sharedInformers.Extensions().InternalVersion().ThirdPartyResources(), autoRegistrationController)
aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
go autoRegistrationController.Run(5, stopCh)
go tprRegistrationController.Run(5, stopCh)
return nil
})
aggregatorServer.GenericAPIServer.AddHealthzChecks(healthz.NamedCheck("autoregister-completion", func(r *http.Request) error {
items, err := aggregatorServer.APIRegistrationInformers.Apiregistration().InternalVersion().APIServices().Lister().List(labels.Everything())
if err != nil {
return err
}
missing := []apiregistration.APIService{}
for _, apiService := range apiServices {
found := false
for _, item := range items {
if item.Name == apiService.Name {
found = true
break
}
}
if !found {
missing = append(missing, *apiService)
}
}
if len(missing) > 0 {
return fmt.Errorf("missing APIService: %v", missing)
}
return nil
}))
return aggregatorServer, nil
}
func makeAPIService(gv schema.GroupVersion) *apiregistration.APIService {
return &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: gv.Version + "." + gv.Group},
Spec: apiregistration.APIServiceSpec{
Group: gv.Group,
Version: gv.Version,
Priority: 100,
},
}
}
func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget, registration autoregister.AutoAPIServiceRegistration) []*apiregistration.APIService {
apiServices := []*apiregistration.APIService{}
for _, curr := range delegateAPIServer.ListedPaths() {
if curr == "/api/v1" {
apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"})
registration.AddAPIServiceToSync(apiService)
apiServices = append(apiServices, apiService)
continue
}
if !strings.HasPrefix(curr, "/apis/") {
continue
}
// this comes back in a list that looks like /apis/rbac.authorization.k8s.io/v1alpha1
tokens := strings.Split(curr, "/")
if len(tokens) != 4 {
continue
}
apiService := makeAPIService(schema.GroupVersion{Group: tokens[2], Version: tokens[3]})
// TODO this is probably an indication that we need explicit and precise control over the discovery chain
// but for now its a special case
// apps has to come last for compatibility with 1.5 kubectl clients
if apiService.Spec.Group == "apps" {
apiService.Spec.Priority = 110
}
registration.AddAPIServiceToSync(apiService)
apiServices = append(apiServices, apiService)
}
return apiServices
}

View File

@ -63,6 +63,9 @@ type ServerRunOptions struct {
ServiceNodePortRange utilnet.PortRange
SSHKeyfile string
SSHUser string
ProxyClientCertFile string
ProxyClientKeyFile string
}
// NewServerRunOptions creates a new ServerRunOptions object with default parameters
@ -200,4 +203,10 @@ func (s *ServerRunOptions) AddFlags(fs *pflag.FlagSet) {
"If true, server will do its best to fix the update request to pass the validation, "+
"e.g., setting empty UID in update request to its existing value. This flag can be turned off "+
"after we fix all the clients that send malformed updates.")
fs.StringVar(&s.ProxyClientCertFile, "proxy-client-cert-file", s.ProxyClientCertFile,
"client certificate used to prove the identity of the aggragator or kube-apiserver when it proxies requests to a user api-server")
fs.StringVar(&s.ProxyClientKeyFile, "proxy-client-key-file", s.ProxyClientKeyFile,
"client certificate key used to prove the identity of the aggragator or kube-apiserver when it proxies requests to a user api-server")
}

View File

@ -43,7 +43,6 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authentication/authenticator"
@ -51,6 +50,7 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/filters"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/cmd/kube-apiserver/app/preflight"
"k8s.io/kubernetes/pkg/api"
@ -94,31 +94,52 @@ cluster's shared state through which all other components interact.`,
}
// Run runs the specified APIServer. This should never exit.
func Run(s *options.ServerRunOptions) error {
config, sharedInformers, err := BuildMasterConfig(s)
func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
kubeAPIServerConfig, sharedInformers, err := CreateKubeAPIServerConfig(runOptions)
if err != nil {
return err
}
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, sharedInformers, stopCh)
if err != nil {
return err
}
return RunServer(config, sharedInformers, wait.NeverStop)
// if we're starting up a hacked up version of this API server for a weird test case,
// just start the API server as is because clients don't get built correctly when you do this
if len(os.Getenv("KUBE_API_VERSIONS")) > 0 {
return kubeAPIServer.GenericAPIServer.PrepareRun().Run(stopCh)
}
// RunServer uses the provided config and shared informers to run the apiserver. It does not return.
func RunServer(config *master.Config, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) error {
m, err := config.Complete().New()
// otherwise go down the normal path of standing the aggregator up in front of the API server
// this wires up openapi
kubeAPIServer.GenericAPIServer.PrepareRun()
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions)
if err != nil {
return err
}
m.GenericAPIServer.AddPostStartHook("start-kube-apiserver-informers", func(context genericapiserver.PostStartHookContext) error {
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, stopCh)
if err != nil {
return err
}
return aggregatorServer.GenericAPIServer.PrepareRun().Run(stopCh)
}
// CreateKubeAPIServer creates and wires a workable kube-apiserver
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) (*master.Master, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New()
if err != nil {
return nil, err
}
kubeAPIServer.GenericAPIServer.AddPostStartHook("start-kube-apiserver-informers", func(context genericapiserver.PostStartHookContext) error {
sharedInformers.Start(stopCh)
return nil
})
return m.GenericAPIServer.PrepareRun().Run(stopCh)
return kubeAPIServer, nil
}
// BuildMasterConfig creates all the resources for running the API server, but runs none of them
func BuildMasterConfig(s *options.ServerRunOptions) (*master.Config, informers.SharedInformerFactory, error) {
// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
func CreateKubeAPIServerConfig(s *options.ServerRunOptions) (*master.Config, informers.SharedInformerFactory, error) {
// set defaults in the options before trying to create the generic config
if err := defaultOptions(s); err != nil {
return nil, nil, err

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -149,7 +149,7 @@ do
done
if [ "x$GO_OUT" == "x" ]; then
make -C "${KUBE_ROOT}" WHAT="cmd/kubectl cmd/hyperkube vendor/k8s.io/kube-aggregator"
make -C "${KUBE_ROOT}" WHAT="cmd/kubectl cmd/hyperkube"
else
echo "skipped the build."
fi
@ -460,6 +460,8 @@ function start_apiserver {
--requestheader-extra-headers-prefix=X-Remote-Extra- \
--requestheader-client-ca-file="${CERT_DIR}/request-header-ca.crt" \
--requestheader-allowed-names=system:auth-proxy \
--proxy-client-cert-file="${CERT_DIR}/client-auth-proxy.crt" \
--proxy-client-key-file="${CERT_DIR}/client-auth-proxy.key" \
--cors-allowed-origins="${API_CORS_ALLOWED_ORIGINS}" >"${APISERVER_LOG}" 2>&1 &
APISERVER_PID=$!

View File

@ -58,7 +58,7 @@ API_HOST=${API_HOST:-127.0.0.1}
kube::etcd::start
# Start kube-apiserver
# Start kube-apiserver, with alpha api versions on so we can harvest their swagger docs
kube::log::status "Starting kube-apiserver"
"${KUBE_OUTPUT_HOSTBIN}/kube-apiserver" \
--insecure-bind-address="${API_HOST}" \
@ -67,6 +67,8 @@ kube::log::status "Starting kube-apiserver"
--etcd-servers="http://${ETCD_HOST}:${ETCD_PORT}" \
--advertise-address="10.10.10.10" \
--cert-dir="${TMP_DIR}/certs" \
--runtime-config="batch/v2alpha1" \
--runtime-config="autoscaling/v2alpha1" \
--service-cluster-ip-range="10.0.0.0/24" >/tmp/swagger-api-server.log 2>&1 &
APISERVER_PID=$!

View File

@ -203,6 +203,7 @@ var ignoredResources = map[schema.GroupVersionResource]struct{}{
schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "subjectaccessreviews"}: {},
schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "selfsubjectaccessreviews"}: {},
schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "localsubjectaccessreviews"}: {},
schema.GroupVersionResource{Group: "apiregistration.k8s.io", Version: "v1alpha1", Resource: "apiservices"}: {},
}
func (gb *GraphBuilder) enqueueChanges(e *event) {

View File

@ -99,8 +99,7 @@ func NewAutoRegistrationController(tprInformer informers.ThirdPartyResourceInfor
return c
}
func (c *tprRegistrationController) Run(threadiness int, stopCh chan struct{}) {
// don't let panics crash the process
func (c *tprRegistrationController) Run(threadiness int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// make sure the work queue is shutdown which will trigger workers to end
defer c.queue.ShutDown()

View File

@ -347,10 +347,6 @@ func (c *Config) Complete() completedConfig {
c.FallThroughHandler = mux.NewPathRecorderMux()
}
if c.FallThroughHandler == nil {
c.FallThroughHandler = mux.NewPathRecorderMux()
}
return completedConfig{c}
}

View File

@ -310,6 +310,11 @@ func (s *GenericAPIServer) EffectiveSecurePort() int {
// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
for _, groupVersion := range apiGroupInfo.GroupMeta.GroupVersions {
if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
glog.Warningf("Skipping API %v because it has no resources.", groupVersion)
continue
}
apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
if apiGroupInfo.OptionsExternalVersion != nil {
apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion

View File

@ -51,13 +51,17 @@ func (m *PathRecorderMux) ListedPaths() []string {
return handledPaths
}
// Handle registers the handler for the given pattern.
// If a handler already exists for pattern, Handle panics.
func (m *PathRecorderMux) Handle(path string, handler http.Handler) {
func (m *PathRecorderMux) trackCallers(path string) {
if existingStack, ok := m.pathStacks[path]; ok {
utilruntime.HandleError(fmt.Errorf("registered %q from %v", path, existingStack))
}
m.pathStacks[path] = string(debug.Stack())
}
// Handle registers the handler for the given pattern.
// If a handler already exists for pattern, Handle panics.
func (m *PathRecorderMux) Handle(path string, handler http.Handler) {
m.trackCallers(path)
m.exposedPaths = append(m.exposedPaths, path)
m.mux.Handle(path, handler)
@ -66,10 +70,7 @@ func (m *PathRecorderMux) Handle(path string, handler http.Handler) {
// HandleFunc registers the handler function for the given pattern.
// If a handler already exists for pattern, Handle panics.
func (m *PathRecorderMux) HandleFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
if existingStack, ok := m.pathStacks[path]; ok {
utilruntime.HandleError(fmt.Errorf("registered %q from %v", path, existingStack))
}
m.pathStacks[path] = string(debug.Stack())
m.trackCallers(path)
m.exposedPaths = append(m.exposedPaths, path)
m.mux.HandleFunc(path, handler)
@ -78,21 +79,15 @@ func (m *PathRecorderMux) HandleFunc(path string, handler func(http.ResponseWrit
// UnlistedHandle registers the handler for the given pattern, but doesn't list it.
// If a handler already exists for pattern, Handle panics.
func (m *PathRecorderMux) UnlistedHandle(path string, handler http.Handler) {
if existingStack, ok := m.pathStacks[path]; ok {
utilruntime.HandleError(fmt.Errorf("registered %q from %v", path, existingStack))
}
m.pathStacks[path] = string(debug.Stack())
m.mux.Handle(path, handler)
m.trackCallers(path)
m.mux.Handle(path, handler)
}
// UnlistedHandleFunc registers the handler function for the given pattern, but doesn't list it.
// If a handler already exists for pattern, Handle panics.
func (m *PathRecorderMux) UnlistedHandleFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
if existingStack, ok := m.pathStacks[path]; ok {
utilruntime.HandleError(fmt.Errorf("registered %q from %v", path, existingStack))
}
m.pathStacks[path] = string(debug.Stack())
m.trackCallers(path)
m.mux.HandleFunc(path, handler)
}

View File

@ -107,7 +107,7 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
}
func (s *EtcdOptions) ApplyTo(c *server.Config) error {
c.RESTOptionsGetter = &simpleRestOptionsFactory{Options: *s}
c.RESTOptionsGetter = &SimpleRestOptionsFactory{Options: *s}
return nil
}
@ -116,11 +116,11 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac
return nil
}
type simpleRestOptionsFactory struct {
type SimpleRestOptionsFactory struct {
Options EtcdOptions
}
func (f *simpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
ret := generic.RESTOptions{
StorageConfig: &f.Options.StorageConfig,
Decorator: generic.UndecoratedStorage,

View File

@ -20,7 +20,6 @@ import (
"fmt"
"sync"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -127,10 +126,9 @@ func GetAPIGroupResources(cl DiscoveryInterface) ([]*APIGroupResources, error) {
for _, version := range group.Versions {
resources, err := cl.ServerResourcesForGroupVersion(version.GroupVersion)
if err != nil {
if errors.IsNotFound(err) {
continue // ignore as this can race with deletion of 3rd party APIs
}
return nil, err
// continue as best we can
// TODO track the errors and update callers to handle partial errors.
continue
}
groupResources.VersionedResources[version.Version] = resources.APIResources
}

View File

@ -106,6 +106,9 @@ type APIAggregator struct {
serviceLister v1listers.ServiceLister
// endpointsLister is used by the aggregator handler to determine whether or not to try to expose the group
endpointsLister v1listers.EndpointsLister
// provided for easier embedding
APIRegistrationInformers informers.SharedInformerFactory
}
type completedConfig struct {
@ -114,7 +117,8 @@ type completedConfig struct {
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
func (c *Config) Complete() completedConfig {
// generic discovery must be disabled so that we can register our own discovery handler
// the kube aggregator wires its own discovery mechanism
// TODO eventually collapse this by extracting all of the discovery out
c.GenericConfig.EnableDiscovery = false
c.GenericConfig.Complete()
@ -131,17 +135,17 @@ func (c *Config) SkipComplete() completedConfig {
// New returns a new instance of APIAggregator from the given config.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget, stopCh <-chan struct{}) (*APIAggregator, error) {
genericServer, err := c.Config.GenericConfig.SkipComplete().NewWithDelegate(delegationTarget) // completion is done in Complete, no need for a second time
if err != nil {
return nil, err
}
informerFactory := informers.NewSharedInformerFactory(
internalclientset.NewForConfigOrDie(c.Config.GenericConfig.LoopbackClientConfig),
5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on.
)
kubeInformers := kubeinformers.NewSharedInformerFactory(c.CoreAPIServerClient, 5*time.Minute)
genericServer, err := c.Config.GenericConfig.SkipComplete().NewWithDelegate(delegationTarget) // completion is done in Complete, no need for a second time
if err != nil {
return nil, err
}
s := &APIAggregator{
GenericAPIServer: genericServer,
delegateHandler: delegationTarget.UnprotectedHandler(),
@ -153,6 +157,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(),
serviceLister: kubeInformers.Core().V1().Services().Lister(),
endpointsLister: kubeInformers.Core().V1().Endpoints().Lister(),
APIRegistrationInformers: informerFactory,
}
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, registry, Scheme, metav1.ParameterCodec, Codecs)
@ -182,7 +187,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
return nil
})
s.GenericAPIServer.AddPostStartHook("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
apiserviceRegistrationController.Run(stopCh)
go apiserviceRegistrationController.Run(stopCh)
return nil
})
@ -214,8 +219,8 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
}
proxyHandler.updateAPIService(apiService)
s.proxyHandlers[apiService.Name] = proxyHandler
s.GenericAPIServer.FallThroughHandler.Handle(proxyPath, proxyHandler)
s.GenericAPIServer.FallThroughHandler.Handle(proxyPath+"/", proxyHandler)
s.GenericAPIServer.HandlerContainer.ServeMux.Handle(proxyPath, proxyHandler)
s.GenericAPIServer.HandlerContainer.ServeMux.Handle(proxyPath+"/", proxyHandler)
// if we're dealing with the legacy group, we're done here
if apiService.Name == legacyAPIServiceName {
@ -235,10 +240,11 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
lister: s.lister,
serviceLister: s.serviceLister,
endpointsLister: s.endpointsLister,
delegate: s.GenericAPIServer.FallThroughHandler,
}
// aggregation is protected
s.GenericAPIServer.FallThroughHandler.UnlistedHandle(groupPath, groupDiscoveryHandler)
s.GenericAPIServer.FallThroughHandler.UnlistedHandle(groupPath+"/", groupDiscoveryHandler)
s.GenericAPIServer.HandlerContainer.ServeMux.Handle(groupPath, groupDiscoveryHandler)
s.GenericAPIServer.HandlerContainer.ServeMux.Handle(groupPath+"/", groupDiscoveryHandler)
s.handledGroups.Insert(apiService.Spec.Group)
}

View File

@ -181,12 +181,14 @@ type apiGroupHandler struct {
serviceLister v1listers.ServiceLister
endpointsLister v1listers.EndpointsLister
delegate http.Handler
}
func (r *apiGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// don't handle URLs that aren't /apis/<groupName>
if req.URL.Path != "/apis/"+r.groupName && req.URL.Path != "/apis/"+r.groupName+"/" {
http.Error(w, "", http.StatusNotFound)
r.delegate.ServeHTTP(w, req)
return
}

View File

@ -320,27 +320,40 @@ func TestAPIGroupMissing(t *testing.T) {
handler := &apiGroupHandler{
codecs: Codecs,
lister: listers.NewAPIServiceLister(indexer),
groupName: "foo",
groupName: "groupName",
delegate: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusForbidden)
}),
}
server := httptest.NewServer(handler)
defer server.Close()
// this call should delegate
resp, err := http.Get(server.URL + "/apis/groupName/foo")
if err != nil {
t.Fatal(err)
}
if resp.StatusCode != http.StatusNotFound {
t.Fatalf("expected %v, got %v", resp.StatusCode, http.StatusNotFound)
if resp.StatusCode != http.StatusForbidden {
t.Fatalf("expected %v, got %v", http.StatusForbidden, resp.StatusCode)
}
// foo still has no api services for it (like it was deleted), it should 404
// groupName still has no api services for it (like it was deleted), it should 404
resp, err = http.Get(server.URL + "/apis/groupName/")
if err != nil {
t.Fatal(err)
}
if resp.StatusCode != http.StatusNotFound {
t.Fatalf("expected %v, got %v", resp.StatusCode, http.StatusNotFound)
t.Fatalf("expected %v, got %v", http.StatusNotFound, resp.StatusCode)
}
// missing group should delegate still has no api services for it (like it was deleted)
resp, err = http.Get(server.URL + "/apis/missing")
if err != nil {
t.Fatal(err)
}
if resp.StatusCode != http.StatusForbidden {
t.Fatalf("expected %v, got %v", http.StatusForbidden, resp.StatusCode)
}
}

View File

@ -110,7 +110,7 @@ func NewAutoRegisterController(apiServiceInformer informers.APIServiceInformer,
return c
}
func (c *autoRegisterController) Run(threadiness int, stopCh chan struct{}) {
func (c *autoRegisterController) Run(threadiness int, stopCh <-chan struct{}) {
// don't let panics crash the process
defer utilruntime.HandleCrash()
// make sure the work queue is shutdown which will trigger workers to end

View File

@ -55,7 +55,9 @@ func (a *APIServer) Start() error {
errCh := make(chan error)
go func() {
defer close(errCh)
err := apiserver.Run(config)
stopCh := make(chan struct{})
defer close(stopCh)
err := apiserver.Run(config, stopCh)
if err != nil {
errCh <- fmt.Errorf("run apiserver error: %v", err)
}

View File

@ -112,13 +112,18 @@ func TestAggregatedAPIServer(t *testing.T) {
kubeAPIServerOptions.Authentication.ClientCert.ClientCA = clientCACertFile.Name()
kubeAPIServerOptions.Authorization.Mode = "RBAC"
config, sharedInformers, err := app.BuildMasterConfig(kubeAPIServerOptions)
kubeAPIServerConfig, sharedInformers, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions)
if err != nil {
t.Fatal(err)
}
kubeClientConfigValue.Store(config.GenericConfig.LoopbackClientConfig)
kubeClientConfigValue.Store(kubeAPIServerConfig.GenericConfig.LoopbackClientConfig)
if err := app.RunServer(config, sharedInformers, stopCh); err != nil {
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, sharedInformers, wait.NeverStop)
if err != nil {
t.Fatal(err)
}
if err := kubeAPIServer.GenericAPIServer.PrepareRun().Run(wait.NeverStop); err != nil {
t.Log(err)
}
time.Sleep(100 * time.Millisecond)