add GenericAPIServer posthooks for initialization

pull/6/head
deads2k 2016-08-26 11:06:27 -04:00
parent a089791f3e
commit 7d1f13d3e0
9 changed files with 275 additions and 6 deletions

View File

@ -199,6 +199,7 @@ plugin/pkg/auth/authenticator/password/allow
plugin/pkg/auth/authenticator/request/basicauth
plugin/pkg/auth/authenticator/request/union
plugin/pkg/auth/authorizer
plugin/pkg/auth/authorizer/rbac/bootstrappolicy
plugin/pkg/client/auth
plugin/pkg/client/auth/gcp
test/e2e/cleanup

View File

@ -26,6 +26,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"
systemd "github.com/coreos/go-systemd/daemon"
@ -163,6 +164,13 @@ type GenericAPIServer struct {
enableOpenAPISupport bool
openAPIInfo spec.Info
openAPIDefaultResponse spec.Response
// PostStartHooks are each called after the server has started listening, in a separate go func for each
// with no guaranteee of ordering between them. The map key is a name used for error reporting.
// It may kill the process with a panic if it wishes to by returning an error
PostStartHooks map[string]PostStartHookFunc
postStartHookLock sync.Mutex
postStartHooksCalled bool
}
func (s *GenericAPIServer) StorageDecorator() generic.StorageDecorator {
@ -276,6 +284,7 @@ func (s *GenericAPIServer) Run(options *options.ServerRunOptions) {
return time.After(globalTimeout), ""
}
secureStartedCh := make(chan struct{})
if secureLocation != "" {
handler := apiserver.TimeoutHandler(apiserver.RecoverPanics(s.Handler), longRunningTimeout)
secureServer := &http.Server{
@ -322,6 +331,8 @@ func (s *GenericAPIServer) Run(options *options.ServerRunOptions) {
go func() {
defer utilruntime.HandleCrash()
notifyStarted := sync.Once{}
for {
// err == systemd.SdNotifyNoSocket when not running on a systemd system
if err := systemd.SdNotify("READY=1\n"); err != nil && err != systemd.SdNotifyNoSocket {
@ -329,6 +340,10 @@ func (s *GenericAPIServer) Run(options *options.ServerRunOptions) {
}
if err := secureServer.ListenAndServeTLS(options.TLSCertFile, options.TLSPrivateKeyFile); err != nil {
glog.Errorf("Unable to listen for secure (%v); will try again.", err)
} else {
notifyStarted.Do(func() {
close(secureStartedCh)
})
}
time.Sleep(15 * time.Second)
}
@ -338,6 +353,7 @@ func (s *GenericAPIServer) Run(options *options.ServerRunOptions) {
if err := systemd.SdNotify("READY=1\n"); err != nil && err != systemd.SdNotifyNoSocket {
glog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
}
close(secureStartedCh)
}
handler := apiserver.TimeoutHandler(apiserver.RecoverPanics(s.InsecureHandler), longRunningTimeout)
@ -347,16 +363,28 @@ func (s *GenericAPIServer) Run(options *options.ServerRunOptions) {
MaxHeaderBytes: 1 << 20,
}
insecureStartedCh := make(chan struct{})
glog.Infof("Serving insecurely on %s", insecureLocation)
go func() {
defer utilruntime.HandleCrash()
notifyStarted := sync.Once{}
for {
if err := http.ListenAndServe(); err != nil {
glog.Errorf("Unable to listen for insecure (%v); will try again.", err)
} else {
notifyStarted.Do(func() {
close(insecureStartedCh)
})
}
time.Sleep(15 * time.Second)
}
}()
<-secureStartedCh
<-insecureStartedCh
s.RunPostStartHooks(PostStartHookContext{})
select {}
}

View File

@ -46,10 +46,10 @@ import (
)
// setUp is a convience function for setting up for (most) tests.
func setUp(t *testing.T) (GenericAPIServer, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
func setUp(t *testing.T) (*GenericAPIServer, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
etcdServer, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)
genericapiserver := GenericAPIServer{}
genericapiserver := &GenericAPIServer{}
config := Config{}
config.PublicAddress = net.ParseIP("192.168.10.4")
config.RequestContextMapper = api.NewRequestContextMapper()

View File

@ -0,0 +1,93 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package genericapiserver
import (
"fmt"
"github.com/golang/glog"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
)
// PostStartHookFunc is a function that is called after the server has started.
// It must properly handle cases like:
// 1. asynchronous start in multiple API server processes
// 2. conflicts between the different processes all trying to perform the same action
// 3. partially complete work (API server crashes while running your hook)
// 4. API server access **BEFORE** your hook has completed
// Think of it like a mini-controller that is super privileged and gets to run in-process
// If you use this feature, tag @deads2k on github who has promised to review code for anyone's PostStartHook
// until it becomes easier to use.
type PostStartHookFunc func(context PostStartHookContext) error
// PostStartHookContext provides information about this API server to a PostStartHookFunc
type PostStartHookContext struct {
// TODO this should probably contain a cluster-admin powered client config which can be used to loopback
// to this API server. That client config doesn't exist yet.
}
// AddPostStartHook allows you to add a PostStartHook.
func (s *GenericAPIServer) AddPostStartHook(name string, hook PostStartHookFunc) error {
if len(name) == 0 {
return fmt.Errorf("missing name")
}
if hook == nil {
return nil
}
s.postStartHookLock.Lock()
defer s.postStartHookLock.Unlock()
if s.postStartHooksCalled {
return fmt.Errorf("unable to add %q because PostStartHooks have already been called", name)
}
if s.PostStartHooks == nil {
s.PostStartHooks = map[string]PostStartHookFunc{}
}
if _, exists := s.PostStartHooks[name]; exists {
return fmt.Errorf("unable to add %q because it is already registered", name)
}
s.PostStartHooks[name] = hook
return nil
}
// RunPostStartHooks runs the PostStartHooks for the server
func (s *GenericAPIServer) RunPostStartHooks(context PostStartHookContext) {
s.postStartHookLock.Lock()
defer s.postStartHookLock.Unlock()
s.postStartHooksCalled = true
for hookName, hook := range s.PostStartHooks {
go runPostStartHook(hookName, hook, context)
}
}
func runPostStartHook(name string, hook PostStartHookFunc, context PostStartHookContext) {
var err error
func() {
// don't let the hook *accidentally* panic and kill the server
defer utilruntime.HandleCrash()
err = hook(context)
}()
// if the hook intentionally wants to kill server, let it.
if err != nil {
glog.Fatalf("PostStartHook %q failed: %v", name, err)
}
}

View File

@ -167,7 +167,12 @@ type thirdPartyEntry struct {
type RESTOptionsGetter func(resource unversioned.GroupResource) generic.RESTOptions
type RESTStorageProvider interface {
NewRESTStorage(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool)
NewRESTStorage(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter RESTOptionsGetter) (groupInfo genericapiserver.APIGroupInfo, enabled bool)
}
// PostStartHookProvider is an interface in addition to provide a post start hook for the api server
type PostStartHookProvider interface {
PostStartHook() (string, genericapiserver.PostStartHookFunc, error)
}
// New returns a new instance of Master from the given config.
@ -215,7 +220,7 @@ func New(c *Config) (*Master, error) {
DisableThirdPartyControllerForTesting: m.disableThirdPartyControllerForTesting,
}
c.RESTStorageProviders[policy.GroupName] = PolicyRESTStorageProvider{}
c.RESTStorageProviders[rbac.GroupName] = RBACRESTStorageProvider{AuthorizerRBACSuperUser: c.AuthorizerRBACSuperUser}
c.RESTStorageProviders[rbac.GroupName] = &RBACRESTStorageProvider{AuthorizerRBACSuperUser: c.AuthorizerRBACSuperUser}
c.RESTStorageProviders[storage.GroupName] = StorageRESTStorageProvider{}
c.RESTStorageProviders[authenticationv1beta1.GroupName] = AuthenticationRESTStorageProvider{Authenticator: c.Authenticator}
c.RESTStorageProviders[authorization.GroupName] = AuthorizationRESTStorageProvider{Authorizer: c.Authorizer}
@ -304,6 +309,16 @@ func (m *Master) InstallAPIs(c *Config) {
}
glog.V(1).Infof("Enabling API group %q.", group)
if postHookProvider, ok := restStorageBuilder.(PostStartHookProvider); ok {
name, hook, err := postHookProvider.PostStartHook()
if err != nil {
glog.Fatalf("Error building PostStartHook: %v", err)
}
if err := m.GenericAPIServer.AddPostStartHook(name, hook); err != nil {
glog.Fatalf("Error registering PostStartHook %q: %v", name, err)
}
}
// This is here so that, if the policy group is present, the eviction
// subresource handler wil be able to find poddisruptionbudgets
// TODO(lavalamp) find a better way for groups to discover and interact

View File

@ -17,8 +17,12 @@ limitations under the License.
package master
import (
"fmt"
"sync"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/apis/rbac"
rbacapiv1alpha1 "k8s.io/kubernetes/pkg/apis/rbac/v1alpha1"
@ -36,15 +40,20 @@ import (
"k8s.io/kubernetes/pkg/registry/rolebinding"
rolebindingetcd "k8s.io/kubernetes/pkg/registry/rolebinding/etcd"
rolebindingpolicybased "k8s.io/kubernetes/pkg/registry/rolebinding/policybased"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/plugin/pkg/auth/authorizer/rbac/bootstrappolicy"
)
type RBACRESTStorageProvider struct {
AuthorizerRBACSuperUser string
postStartHook genericapiserver.PostStartHookFunc
}
var _ RESTStorageProvider = &RBACRESTStorageProvider{}
var _ PostStartHookProvider = &RBACRESTStorageProvider{}
func (p RBACRESTStorageProvider) NewRESTStorage(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool) {
func (p *RBACRESTStorageProvider) NewRESTStorage(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool) {
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(rbac.GroupName)
if apiResourceConfigSource.AnyResourcesForVersionEnabled(rbacapiv1alpha1.SchemeGroupVersion) {
@ -55,7 +64,7 @@ func (p RBACRESTStorageProvider) NewRESTStorage(apiResourceConfigSource generica
return apiGroupInfo, true
}
func (p RBACRESTStorageProvider) v1alpha1Storage(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter RESTOptionsGetter) map[string]rest.Storage {
func (p *RBACRESTStorageProvider) v1alpha1Storage(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter RESTOptionsGetter) map[string]rest.Storage {
version := rbacapiv1alpha1.SchemeGroupVersion
once := new(sync.Once)
@ -84,6 +93,8 @@ func (p RBACRESTStorageProvider) v1alpha1Storage(apiResourceConfigSource generic
if apiResourceConfigSource.ResourceEnabled(version.WithResource("clusterroles")) {
clusterRolesStorage := clusterroleetcd.NewREST(restOptionsGetter(rbac.Resource("clusterroles")))
storage["clusterroles"] = clusterrolepolicybased.NewStorage(clusterRolesStorage, newRuleValidator(), p.AuthorizerRBACSuperUser)
p.postStartHook = newPostStartHook(clusterRolesStorage)
}
if apiResourceConfigSource.ResourceEnabled(version.WithResource("clusterrolebindings")) {
clusterRoleBindingsStorage := clusterrolebindingetcd.NewREST(restOptionsGetter(rbac.Resource("clusterrolebindings")))
@ -91,3 +102,35 @@ func (p RBACRESTStorageProvider) v1alpha1Storage(apiResourceConfigSource generic
}
return storage
}
func (p *RBACRESTStorageProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
return "rbac/bootstrap-roles", p.postStartHook, nil
}
func newPostStartHook(directClusterRoleAccess *clusterroleetcd.REST) genericapiserver.PostStartHookFunc {
return func(genericapiserver.PostStartHookContext) error {
ctx := api.NewContext()
existingClusterRoles, err := directClusterRoleAccess.List(ctx, &api.ListOptions{})
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to initialize clusterroles: %v", err))
return nil
}
// if clusterroles already exist, then assume we don't have work to do because we've already
// initialized or another API server has started this task
if len(existingClusterRoles.(*rbac.ClusterRoleList).Items) > 0 {
return nil
}
for _, clusterRole := range bootstrappolicy.ClusterRoles() {
if _, err := directClusterRoleAccess.Create(ctx, &clusterRole); err != nil {
// don't fail on failures, try to create as many as you can
utilruntime.HandleError(fmt.Errorf("unable to initialize clusterroles: %v", err))
continue
}
glog.Infof("Created clusterrole.%s/%s", rbac.GroupName, clusterRole.Name)
}
return nil
}
}

View File

@ -0,0 +1,36 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bootstrappolicy
import (
"k8s.io/kubernetes/pkg/api"
rbacapi "k8s.io/kubernetes/pkg/apis/rbac"
)
// ClusterRoles returns the cluster roles to bootstrap an API server with
func ClusterRoles() []rbacapi.ClusterRole {
return []rbacapi.ClusterRole{
// TODO update the expression of these rules to match openshift for ease of inspection
{
ObjectMeta: api.ObjectMeta{Name: "cluster-admin"},
Rules: []rbacapi.PolicyRule{
{Verbs: []string{"*"}, APIGroups: []string{"*"}, Resources: []string{"*"}},
{Verbs: []string{"*"}, NonResourceURLs: []string{"*"}},
},
},
}
}

View File

@ -29,9 +29,11 @@ import (
"net/http/httputil"
"strings"
"testing"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
@ -41,6 +43,8 @@ import (
"k8s.io/kubernetes/pkg/auth/authenticator/bearertoken"
"k8s.io/kubernetes/pkg/auth/authorizer"
"k8s.io/kubernetes/pkg/auth/user"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/transport"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/registry/clusterrole"
@ -52,6 +56,7 @@ import (
roleetcd "k8s.io/kubernetes/pkg/registry/role/etcd"
"k8s.io/kubernetes/pkg/registry/rolebinding"
rolebindingetcd "k8s.io/kubernetes/pkg/registry/rolebinding/etcd"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/plugin/pkg/auth/authorizer/rbac"
"k8s.io/kubernetes/test/integration/framework"
)
@ -469,3 +474,46 @@ func TestRBAC(t *testing.T) {
}
}
}
func TestBootstrapping(t *testing.T) {
superUser := "admin"
masterConfig := framework.NewIntegrationTestMasterConfig()
masterConfig.Authorizer = newRBACAuthorizer(t, superUser, masterConfig)
masterConfig.Authenticator = newFakeAuthenticator()
masterConfig.AuthorizerRBACSuperUser = superUser
_, s := framework.RunAMaster(masterConfig)
defer s.Close()
clientset := clientset.NewForConfigOrDie(&restclient.Config{BearerToken: superUser, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
watcher, err := clientset.Rbac().ClusterRoles().Watch(api.ListOptions{ResourceVersion: "0"})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
_, err = watch.Until(30*time.Second, watcher, func(event watch.Event) (bool, error) {
if event.Type != watch.Added {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
clusterRoles, err := clientset.Rbac().ClusterRoles().List(api.ListOptions{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(clusterRoles.Items) == 0 {
t.Fatalf("missing cluster roles")
}
for _, clusterRole := range clusterRoles.Items {
if clusterRole.Name == "cluster-admin" {
return
}
}
t.Errorf("missing cluster-admin: %v", clusterRoles)
}

View File

@ -155,6 +155,11 @@ func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Se
glog.Fatalf("error in bringing up the master: %v", err)
}
// TODO have this start method actually use the normal start sequence for the API server
// this method never actually calls the `Run` method for the API server
// fire the post hooks ourselves
m.GenericAPIServer.RunPostStartHooks(genericapiserver.PostStartHookContext{})
return m, s
}