Apiextensions-apiserver aggregates CRD schemas

efficiently without checking conflicts, and wire up CRD discovery
controller to serve OpenAPI spec.
pull/58/head
Haowei Cai 2018-11-15 11:02:11 -08:00
parent 5bb4b32503
commit 3222a7033c
5 changed files with 318 additions and 8 deletions

View File

@ -41,6 +41,7 @@ import (
"k8s.io/apiextensions-apiserver/pkg/controller/establish"
"k8s.io/apiextensions-apiserver/pkg/controller/finalizer"
"k8s.io/apiextensions-apiserver/pkg/controller/status"
openapiaggregator "k8s.io/apiextensions-apiserver/pkg/openapi"
"k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition"
_ "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
@ -204,7 +205,13 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return nil
})
s.GenericAPIServer.AddPostStartHook("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
go crdController.Run(context.StopCh)
// create OpenAPI aggregation manager in the last step because only now genericapiserver's the OpenAPI services and spec is available (after PrepareRun).
crdOpenAPIAggregationManager, err := openapiaggregator.NewAggregationManager(s.GenericAPIServer.OpenAPIService, s.GenericAPIServer.OpenAPIVersionedService, s.GenericAPIServer.StaticOpenAPISpec)
if err != nil {
return err
}
go crdController.Run(context.StopCh, crdOpenAPIAggregationManager)
go namingController.Run(context.StopCh)
go establishingController.Run(context.StopCh)
go finalizingController.Run(5, context.StopCh)

View File

@ -31,12 +31,16 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/endpoints/discovery"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
informers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion/apiextensions/internalversion"
listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion"
apiextensionsfeatures "k8s.io/apiextensions-apiserver/pkg/features"
apiextensionsopenapi "k8s.io/apiextensions-apiserver/pkg/openapi"
)
type DiscoveryController struct {
@ -50,6 +54,8 @@ type DiscoveryController struct {
syncFn func(version schema.GroupVersion) error
queue workqueue.RateLimitingInterface
openAPIAggregationManager apiextensionsopenapi.AggregationManager
}
func NewDiscoveryController(crdInformer informers.CustomResourceDefinitionInformer, versionHandler *versionDiscoveryHandler, groupHandler *groupDiscoveryHandler) *DiscoveryController {
@ -83,6 +89,7 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error {
if err != nil {
return err
}
apiServiceName := version.Group + "." + version.Version
foundVersion := false
foundGroup := false
for _, crd := range crds {
@ -119,6 +126,33 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error {
continue
}
foundVersion = true
if c.openAPIAggregationManager != nil && utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourceValidation) {
validationSchema, err := getSchemaForVersion(crd, version.Version)
if err != nil {
return err
}
// Convert internal CustomResourceValidation to versioned CustomResourceValidation
versionedSchema := new(v1beta1.CustomResourceValidation)
if validationSchema == nil {
versionedSchema = nil
} else {
if err := v1beta1.Convert_apiextensions_CustomResourceValidation_To_v1beta1_CustomResourceValidation(validationSchema, versionedSchema, nil); err != nil {
return err
}
}
// We aggregate the schema even if it's nil as it maybe a removal of the schema for this CRD,
// and the aggreated OpenAPI spec should reflect this change.
crdspec, etag, err := apiextensionsopenapi.CustomResourceDefinitionOpenAPISpec(&crd.Spec, version.Version, versionedSchema)
if err != nil {
return err
}
// Add/update the local API service's spec for the CRD in apiExtensionsServer's
// openAPIAggregationManager
if err := c.openAPIAggregationManager.AddUpdateLocalAPIServiceSpec(apiServiceName, crdspec, etag); err != nil {
return err
}
}
verbs := metav1.Verbs([]string{"delete", "deletecollection", "get", "list", "patch", "create", "update", "watch"})
// if we're terminating we don't allow some verbs
@ -164,6 +198,14 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error {
if !foundGroup {
c.groupHandler.unsetDiscovery(version.Group)
c.versionHandler.unsetDiscovery(version)
if c.openAPIAggregationManager != nil && utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourceValidation) {
// Remove the local API service for the CRD in apiExtensionsServer's
// openAPIAggregationManager.
// Note that we don't check if apiServiceName exists in openAPIAggregationManager
// because RemoveAPIServiceSpec properly handles non-existing API service by
// returning no error.
return c.openAPIAggregationManager.RemoveAPIServiceSpec(apiServiceName)
}
return nil
}
@ -180,6 +222,14 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error {
if !foundVersion {
c.versionHandler.unsetDiscovery(version)
if c.openAPIAggregationManager != nil && utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourceValidation) {
// Remove the local API service for the CRD in apiExtensionsServer's
// openAPIAggregationManager.
// Note that we don't check if apiServiceName exists in openAPIAggregationManager
// because RemoveAPIServiceSpec properly handles non-existing API service by
// returning no error.
return c.openAPIAggregationManager.RemoveAPIServiceSpec(apiServiceName)
}
return nil
}
c.versionHandler.setDiscovery(version, discovery.NewAPIVersionHandler(Codecs, version, discovery.APIResourceListerFunc(func() []metav1.APIResource {
@ -195,13 +245,15 @@ func sortGroupDiscoveryByKubeAwareVersion(gd []metav1.GroupVersionForDiscovery)
})
}
func (c *DiscoveryController) Run(stopCh <-chan struct{}) {
func (c *DiscoveryController) Run(stopCh <-chan struct{}, crdOpenAPIAggregationManager apiextensionsopenapi.AggregationManager) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
defer klog.Infof("Shutting down DiscoveryController")
klog.Infof("Starting DiscoveryController")
c.openAPIAggregationManager = crdOpenAPIAggregationManager
if !cache.WaitForCacheSync(stopCh, c.crdsSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return

View File

@ -0,0 +1,232 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openapi
import (
"fmt"
"sync"
"github.com/go-openapi/spec"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/kube-openapi/pkg/aggregator"
"k8s.io/kube-openapi/pkg/handler"
)
// AggregationManager is the interface between OpenAPI Aggregator service and a controller
// that manages CRD openapi spec aggregation
type AggregationManager interface {
// AddUpdateLocalAPIService allows adding/updating local API service with nil handler and
// nil Spec.Service. This function can be used for local dynamic OpenAPI spec aggregation
// management (e.g. CRD)
AddUpdateLocalAPIServiceSpec(name string, spec *spec.Swagger, etag string) error
RemoveAPIServiceSpec(apiServiceName string) error
}
type specAggregator struct {
// mutex protects all members of this struct.
rwMutex sync.RWMutex
// Map of API Services' OpenAPI specs by their name
openAPISpecs map[string]*openAPISpecInfo
// provided for dynamic OpenAPI spec
openAPIService *handler.OpenAPIService
openAPIVersionedService *handler.OpenAPIService
}
var _ AggregationManager = &specAggregator{}
// NewAggregationManager constructs a specAggregator from input openAPIService, openAPIVersionedService and
// recorded static OpenAPI spec. The function returns an AggregationManager interface.
func NewAggregationManager(openAPIService, openAPIVersionedService *handler.OpenAPIService, staticSpec *spec.Swagger) (AggregationManager, error) {
// openAPIVersionedService and deprecated openAPIService should be initialized together
if (openAPIService == nil) != (openAPIVersionedService == nil) {
return nil, fmt.Errorf("unexpected openapi service initialization error")
}
return &specAggregator{
openAPISpecs: map[string]*openAPISpecInfo{
"initial_static_spec": {
spec: staticSpec,
},
},
openAPIService: openAPIService,
openAPIVersionedService: openAPIVersionedService,
}, nil
}
// openAPISpecInfo is used to store OpenAPI spec with its priority.
// It can be used to sort specs with their priorities.
type openAPISpecInfo struct {
// Name of a registered ApiService
name string
// Specification of this API Service. If null then the spec is not loaded yet.
spec *spec.Swagger
etag string
}
// buildOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. The caller is responsible to hold proper locks.
func (s *specAggregator) buildOpenAPISpec() (specToReturn *spec.Swagger, err error) {
specs := []openAPISpecInfo{}
for _, specInfo := range s.openAPISpecs {
if specInfo.spec == nil {
continue
}
specs = append(specs, *specInfo)
}
if len(specs) == 0 {
return &spec.Swagger{}, nil
}
for _, specInfo := range specs {
if specToReturn == nil {
specToReturn, err = aggregator.CloneSpec(specInfo.spec)
if err != nil {
return nil, err
}
continue
}
mergeSpecs(specToReturn, specInfo.spec)
}
// Add minimum required keys if missing, to properly serve the OpenAPI spec
// through apiextensions-apiserver HTTP handler. These keys will not be
// aggregated to top-level OpenAPI spec (only paths and definitions will).
// However these keys make the OpenAPI->proto serialization happy.
if specToReturn.Info == nil {
specToReturn.Info = &spec.Info{
InfoProps: spec.InfoProps{
Title: "Kubernetes",
},
}
}
if len(specToReturn.Swagger) == 0 {
specToReturn.Swagger = "2.0"
}
return specToReturn, nil
}
// updateOpenAPISpec aggregates all OpenAPI specs. It is not thread-safe. The caller is responsible to hold proper locks.
func (s *specAggregator) updateOpenAPISpec() error {
if s.openAPIService == nil || s.openAPIVersionedService == nil {
// openAPIVersionedService and deprecated openAPIService should be initialized together
if !(s.openAPIService == nil && s.openAPIVersionedService == nil) {
return fmt.Errorf("unexpected openapi service initialization error")
}
return nil
}
specToServe, err := s.buildOpenAPISpec()
if err != nil {
return err
}
// openAPIService.UpdateSpec and openAPIVersionedService.UpdateSpec read the same swagger spec
// serially and update their local caches separately. Both endpoints will have same spec in
// their caches if the caller is holding proper locks.
err = s.openAPIService.UpdateSpec(specToServe)
if err != nil {
return err
}
return s.openAPIVersionedService.UpdateSpec(specToServe)
}
// tryUpdatingServiceSpecs tries updating openAPISpecs map with specified specInfo, and keeps the map intact
// if the update fails.
func (s *specAggregator) tryUpdatingServiceSpecs(specInfo *openAPISpecInfo) error {
orgSpecInfo, exists := s.openAPISpecs[specInfo.name]
s.openAPISpecs[specInfo.name] = specInfo
if err := s.updateOpenAPISpec(); err != nil {
if exists {
s.openAPISpecs[specInfo.name] = orgSpecInfo
} else {
delete(s.openAPISpecs, specInfo.name)
}
return err
}
return nil
}
// tryDeleteServiceSpecs tries delete specified specInfo from openAPISpecs map, and keeps the map intact
// if the update fails.
func (s *specAggregator) tryDeleteServiceSpecs(apiServiceName string) error {
orgSpecInfo, exists := s.openAPISpecs[apiServiceName]
if !exists {
return nil
}
delete(s.openAPISpecs, apiServiceName)
if err := s.updateOpenAPISpec(); err != nil {
s.openAPISpecs[apiServiceName] = orgSpecInfo
return err
}
return nil
}
// AddUpdateLocalAPIService allows adding/updating local API service with nil handler and
// nil Spec.Service. This function can be used for local dynamic OpenAPI spec aggregation
// management (e.g. CRD)
func (s *specAggregator) AddUpdateLocalAPIServiceSpec(name string, spec *spec.Swagger, etag string) error {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
return s.tryUpdatingServiceSpecs(&openAPISpecInfo{
name: name,
spec: spec,
etag: etag,
})
}
// RemoveAPIServiceSpec removes an api service from OpenAPI aggregation. If it does not exist, no error is returned.
// It is thread safe.
func (s *specAggregator) RemoveAPIServiceSpec(apiServiceName string) error {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
if _, existingService := s.openAPISpecs[apiServiceName]; !existingService {
return nil
}
return s.tryDeleteServiceSpecs(apiServiceName)
}
// mergeSpecs simply adds source openapi spec to dest and ignores any path/definition
// conflicts because CRD openapi spec should not have conflict
func mergeSpecs(dest, source *spec.Swagger) {
// Paths may be empty, due to [ACL constraints](http://goo.gl/8us55a#securityFiltering).
if source.Paths == nil {
// If Path is nil, none of the model defined in Definitions is used and we
// should not do anything.
// NOTE: this should not happen for CRD specs, because we automatically construct
// the Paths for CRD specs. We use utilruntime.HandleError to log this impossible
// case
utilruntime.HandleError(fmt.Errorf("unexpected CRD spec with empty Path: %v", *source))
return
}
if dest.Paths == nil {
dest.Paths = &spec.Paths{}
}
for k, v := range source.Definitions {
if dest.Definitions == nil {
dest.Definitions = spec.Definitions{}
}
dest.Definitions[k] = v
}
for k, v := range source.Paths.Paths {
// PathItem may be empty, due to [ACL constraints](http://goo.gl/8us55a#securityFiltering).
if dest.Paths.Paths == nil {
dest.Paths.Paths = map[string]spec.PathItem{}
}
dest.Paths.Paths[k] = v
}
}

View File

@ -25,7 +25,8 @@ import (
"time"
systemd "github.com/coreos/go-systemd/daemon"
"github.com/emicklei/go-restful-swagger12"
swagger "github.com/emicklei/go-restful-swagger12"
"github.com/go-openapi/spec"
"k8s.io/klog"
"k8s.io/apimachinery/pkg/api/meta"
@ -47,6 +48,7 @@ import (
restclient "k8s.io/client-go/rest"
openapibuilder "k8s.io/kube-openapi/pkg/builder"
openapicommon "k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/handler"
openapiutil "k8s.io/kube-openapi/pkg/util"
openapiproto "k8s.io/kube-openapi/pkg/util/proto"
)
@ -124,6 +126,11 @@ type GenericAPIServer struct {
swaggerConfig *swagger.Config
openAPIConfig *openapicommon.Config
// Expose the registered OpenAPI Services and built static OpenAPI spec if openAPIConfig is non-nil
OpenAPIService *handler.OpenAPIService // for endpoint /swagger.json
OpenAPIVersionedService *handler.OpenAPIService // for endpoint /openapi/v2
StaticOpenAPISpec *spec.Swagger
// PostStartHooks are each called after the server has started listening, in a separate go func for each
// with no guarantee of ordering between them. The map key is a name used for error reporting.
// It may kill the process with a panic if it wishes to by returning an error.
@ -240,7 +247,7 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
routes.Swagger{Config: s.swaggerConfig}.Install(s.Handler.GoRestfulContainer)
}
if s.openAPIConfig != nil {
routes.OpenAPI{
s.OpenAPIService, s.OpenAPIVersionedService, s.StaticOpenAPISpec = routes.OpenAPI{
Config: s.openAPIConfig,
}.Install(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux)
}

View File

@ -18,9 +18,11 @@ package routes
import (
restful "github.com/emicklei/go-restful"
"github.com/go-openapi/spec"
"k8s.io/klog"
"k8s.io/apiserver/pkg/server/mux"
"k8s.io/kube-openapi/pkg/builder"
"k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/handler"
)
@ -30,17 +32,27 @@ type OpenAPI struct {
Config *common.Config
}
// Install adds the SwaggerUI webservice to the given mux.
func (oa OpenAPI) Install(c *restful.Container, mux *mux.PathRecorderMux) {
// Install adds the SwaggerUI webservice to the given mux. This function returns
// the built static OpenAPI spec and the registered OpenAPI services to allow
// further OpenAPI spec aggregation.
func (oa OpenAPI) Install(c *restful.Container, mux *mux.PathRecorderMux) (openAPIService, openAPIVersionedService *handler.OpenAPIService, spec *spec.Swagger) {
var err error
// Record the static OpenAPI spec to allow further OpenAPI spec aggregation
// with this static spec on the registered OpenAPI services
spec, err = builder.BuildOpenAPISpec(c.RegisteredWebServices(), oa.Config)
if err != nil {
klog.Fatalf("Failed to build open api spec for root: %v", err)
}
// NOTE: [DEPRECATION] We will announce deprecation for format-separated endpoints for OpenAPI spec,
// and switch to a single /openapi/v2 endpoint in Kubernetes 1.10. The design doc and deprecation process
// are tracked at: https://docs.google.com/document/d/19lEqE9lc4yHJ3WJAJxS_G7TcORIJXGHyq3wpwcH28nU.
_, err := handler.BuildAndRegisterOpenAPIService("/swagger.json", c.RegisteredWebServices(), oa.Config, mux)
openAPIService, err = handler.RegisterOpenAPIService(spec, "/swagger.json", mux)
if err != nil {
klog.Fatalf("Failed to register open api spec for root: %v", err)
}
_, err = handler.BuildAndRegisterOpenAPIVersionedService("/openapi/v2", c.RegisteredWebServices(), oa.Config, mux)
openAPIVersionedService, err = handler.RegisterOpenAPIVersionedService(spec, "/openapi/v2", mux)
if err != nil {
klog.Fatalf("Failed to register versioned open api spec for root: %v", err)
}
return openAPIService, openAPIVersionedService, spec
}