Load initializers from dynamic config

Handle failure cases on startup gracefully to avoid causing cascading
errors and poor initialization in other components. Initial errors from
config load cause the initializer to pause and hold requests. Return
typed errors to better communicate failures to clients.

Add code to handle two specific cases - admin wants to bypass
initialization defaulting, and mirror pods (which want to bypass
initialization because the kubelet owns their lifecycle).
pull/6/head
Clayton Coleman 2017-05-30 22:58:57 -04:00
parent 034f06d7e4
commit 772ab8e1b4
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
23 changed files with 617 additions and 62 deletions

View File

@ -25,6 +25,7 @@ go_library(
"//pkg/apis/extensions:go_default_library",
"//pkg/apis/networking:go_default_library",
"//pkg/capabilities:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
"//pkg/cloudprovider:go_default_library",

View File

@ -60,6 +60,7 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/apis/networking"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
"k8s.io/kubernetes/pkg/cloudprovider"
@ -379,6 +380,10 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
// TODO: get rid of KUBE_API_VERSIONS or define sane behaviour if set
glog.Errorf("Failed to create clientset with KUBE_API_VERSIONS=%q. KUBE_API_VERSIONS is only for testing. Things will break.", kubeAPIVersions)
}
externalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create external clientset: %v", err)
}
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)
genericConfig.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, storageFactory, client, sharedInformers)
@ -397,6 +402,7 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
pluginInitializer, err := BuildAdmissionPluginInitializer(
s,
client,
externalClient,
sharedInformers,
genericConfig.Authorizer,
)
@ -414,7 +420,7 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
}
// BuildAdmissionPluginInitializer constructs the admission plugin initializer
func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.PluginInitializer, error) {
func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, externalClient clientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.PluginInitializer, error) {
var cloudConfig []byte
if s.CloudProvider.CloudConfigFile != "" {
@ -432,7 +438,7 @@ func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client interna
// do not require us to open watches for all items tracked by quota.
quotaRegistry := quotainstall.NewRegistry(nil, nil)
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig, restMapper, quotaRegistry)
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, externalClient, sharedInformers, apiAuthorizer, cloudConfig, restMapper, quotaRegistry)
// Read client cert/key for plugins that need to make calls out
if len(s.ProxyClientCertFile) > 0 && len(s.ProxyClientKeyFile) > 0 {

View File

@ -41,6 +41,7 @@ go_library(
"//pkg/apis/extensions:go_default_library",
"//pkg/apis/extensions/install:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
"//pkg/cloudprovider/providers:go_default_library",

View File

@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/api"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
"k8s.io/kubernetes/pkg/generated/openapi"
@ -180,6 +181,10 @@ func NonBlockingRun(s *options.ServerRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return fmt.Errorf("failed to create clientset: %v", err)
}
externalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig)
if err != nil {
return fmt.Errorf("failed to create external clientset: %v", err)
}
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)
authorizationConfig := s.Authorization.ToAuthorizationConfig(sharedInformers)
@ -199,7 +204,7 @@ func NonBlockingRun(s *options.ServerRunOptions, stopCh <-chan struct{}) error {
// NOTE: we do not provide informers to the quota registry because admission level decisions
// do not require us to open watches for all items tracked by quota.
quotaRegistry := quotainstall.NewRegistry(nil, nil)
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig, nil, quotaRegistry)
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, externalClient, sharedInformers, apiAuthorizer, cloudConfig, nil, quotaRegistry)
err = s.Admission.ApplyTo(
genericConfig,

View File

@ -26,6 +26,7 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/apis/admissionregistration:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
"//pkg/quota:go_default_library",

View File

@ -29,6 +29,8 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/apis/admissionregistration/v1alpha1:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",

View File

@ -30,6 +30,11 @@ const (
defaultFailureThreshold = 5
)
var (
ErrNotReady = fmt.Errorf("configuration is not ready")
ErrDisabled = fmt.Errorf("disabled")
)
type getFunc func() (runtime.Object, error)
// When running, poller calls `get` every `interval`. If `get` is
@ -52,7 +57,8 @@ type poller struct {
ready bool
mergedConfiguration runtime.Object
// lock much be hold when reading ready or mergedConfiguration
lock sync.RWMutex
lock sync.RWMutex
lastErr error
}
func newPoller(get getFunc) *poller {
@ -63,6 +69,12 @@ func newPoller(get getFunc) *poller {
}
}
func (a *poller) lastError(err error) {
a.lock.Lock()
defer a.lock.Unlock()
a.lastErr = err
}
func (a *poller) notReady() {
a.lock.Lock()
defer a.lock.Unlock()
@ -73,7 +85,10 @@ func (a *poller) configuration() (runtime.Object, error) {
a.lock.RLock()
defer a.lock.RUnlock()
if !a.ready {
return nil, fmt.Errorf("configuration is not ready")
if a.lastErr != nil {
return nil, a.lastErr
}
return nil, ErrNotReady
}
return a.mergedConfiguration, nil
}
@ -83,6 +98,7 @@ func (a *poller) setConfigurationAndReady(value runtime.Object) {
defer a.lock.Unlock()
a.mergedConfiguration = value
a.ready = true
a.lastErr = nil
}
func (a *poller) Run(stopCh <-chan struct{}) {
@ -93,6 +109,7 @@ func (a *poller) sync() {
configuration, err := a.get()
if err != nil {
a.failures++
a.lastError(err)
if a.failures >= a.failureThreshold {
a.notReady()
}

View File

@ -33,6 +33,20 @@ type ExternalAdmissionHookConfigurationManager struct {
*poller
}
func NewExternalAdmissionHookConfigurationManager(c ExternalAdmissionHookConfigurationLister) *ExternalAdmissionHookConfigurationManager {
getFn := func() (runtime.Object, error) {
list, err := c.List(metav1.ListOptions{})
if err != nil {
return nil, err
}
return mergeExternalAdmissionHookConfigurations(list), nil
}
return &ExternalAdmissionHookConfigurationManager{
newPoller(getFn),
}
}
// ExternalAdmissionHooks returns the merged ExternalAdmissionHookConfiguration.
func (im *ExternalAdmissionHookConfigurationManager) ExternalAdmissionHooks() (*v1alpha1.ExternalAdmissionHookConfiguration, error) {
configuration, err := im.poller.configuration()
@ -46,17 +60,8 @@ func (im *ExternalAdmissionHookConfigurationManager) ExternalAdmissionHooks() (*
return externalAdmissionHookConfiguration, nil
}
func NewExternalAdmissionHookConfigurationManager(c ExternalAdmissionHookConfigurationLister) *ExternalAdmissionHookConfigurationManager {
getFn := func() (runtime.Object, error) {
list, err := c.List(metav1.ListOptions{})
if err != nil {
return nil, err
}
return mergeExternalAdmissionHookConfigurations(list), nil
}
return &ExternalAdmissionHookConfigurationManager{
newPoller(getFn)}
func (im *ExternalAdmissionHookConfigurationManager) Run(stopCh <-chan struct{}) {
im.poller.Run(stopCh)
}
func mergeExternalAdmissionHookConfigurations(

View File

@ -21,6 +21,9 @@ import (
"reflect"
"sort"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1"
@ -34,6 +37,23 @@ type InitializerConfigurationManager struct {
*poller
}
func NewInitializerConfigurationManager(c InitializerConfigurationLister) *InitializerConfigurationManager {
getFn := func() (runtime.Object, error) {
list, err := c.List(metav1.ListOptions{})
if err != nil {
if errors.IsNotFound(err) || errors.IsForbidden(err) {
glog.V(5).Infof("Initializers are disabled due to an error: %v", err)
return nil, ErrDisabled
}
return nil, err
}
return mergeInitializerConfigurations(list), nil
}
return &InitializerConfigurationManager{
newPoller(getFn),
}
}
// Initializers returns the merged InitializerConfiguration.
func (im *InitializerConfigurationManager) Initializers() (*v1alpha1.InitializerConfiguration, error) {
configuration, err := im.poller.configuration()
@ -47,16 +67,8 @@ func (im *InitializerConfigurationManager) Initializers() (*v1alpha1.Initializer
return initializerConfiguration, nil
}
func NewInitializerConfigurationManager(c InitializerConfigurationLister) *InitializerConfigurationManager {
getFn := func() (runtime.Object, error) {
list, err := c.List(metav1.ListOptions{})
if err != nil {
return nil, err
}
return mergeInitializerConfigurations(list), nil
}
return &InitializerConfigurationManager{
newPoller(getFn)}
func (im *InitializerConfigurationManager) Run(stopCh <-chan struct{}) {
im.poller.Run(stopCh)
}
func mergeInitializerConfigurations(initializerConfigurationList *v1alpha1.InitializerConfigurationList) *v1alpha1.InitializerConfiguration {

View File

@ -57,7 +57,7 @@ var _ WantsAuthorizer = &WantAuthorizerAdmission{}
// TestWantsAuthorizer ensures that the authorizer is injected when the WantsAuthorizer
// interface is implemented.
func TestWantsAuthorizer(t *testing.T) {
initializer := NewPluginInitializer(nil, nil, &TestAuthorizer{}, nil, nil, nil)
initializer := NewPluginInitializer(nil, nil, nil, &TestAuthorizer{}, nil, nil, nil)
wantAuthorizerAdmission := &WantAuthorizerAdmission{}
initializer.Initialize(wantAuthorizerAdmission)
if wantAuthorizerAdmission.auth == nil {
@ -76,7 +76,7 @@ func (self *WantsCloudConfigAdmissionPlugin) SetCloudConfig(cloudConfig []byte)
func TestCloudConfigAdmissionPlugin(t *testing.T) {
cloudConfig := []byte("cloud-configuration")
initializer := NewPluginInitializer(nil, nil, &TestAuthorizer{}, cloudConfig, nil, nil)
initializer := NewPluginInitializer(nil, nil, nil, &TestAuthorizer{}, cloudConfig, nil, nil)
wantsCloudConfigAdmission := &WantsCloudConfigAdmissionPlugin{}
initializer.Initialize(wantsCloudConfigAdmission)

View File

@ -23,6 +23,7 @@ import (
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/kubernetes/pkg/apis/admissionregistration"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
"k8s.io/kubernetes/pkg/quota"
@ -36,6 +37,12 @@ type WantsInternalKubeClientSet interface {
admission.Validator
}
// WantsExternalKubeClientSet defines a function which sets ClientSet for admission plugins that need it
type WantsExternalKubeClientSet interface {
SetExternalKubeClientSet(clientset.Interface)
admission.Validator
}
// WantsInternalKubeInformerFactory defines a function which sets InformerFactory for admission plugins that need it
type WantsInternalKubeInformerFactory interface {
SetInternalKubeInformerFactory(informers.SharedInformerFactory)
@ -95,6 +102,7 @@ type WebhookSource interface {
type PluginInitializer struct {
internalClient internalclientset.Interface
externalClient clientset.Interface
informers informers.SharedInformerFactory
authorizer authorizer.Authorizer
cloudConfig []byte
@ -113,14 +121,18 @@ var _ admission.PluginInitializer = &PluginInitializer{}
// NewPluginInitializer constructs new instance of PluginInitializer
// TODO: switch these parameters to use the builder pattern or just make them
// all public, this construction method is pointless boilerplate.
func NewPluginInitializer(internalClient internalclientset.Interface,
func NewPluginInitializer(
internalClient internalclientset.Interface,
externalClient clientset.Interface,
sharedInformers informers.SharedInformerFactory,
authz authorizer.Authorizer,
cloudConfig []byte,
restMapper meta.RESTMapper,
quotaRegistry quota.Registry) *PluginInitializer {
quotaRegistry quota.Registry,
) *PluginInitializer {
return &PluginInitializer{
internalClient: internalClient,
externalClient: externalClient,
informers: sharedInformers,
authorizer: authz,
cloudConfig: cloudConfig,
@ -157,6 +169,10 @@ func (i *PluginInitializer) Initialize(plugin admission.Interface) {
wants.SetInternalKubeClientSet(i.internalClient)
}
if wants, ok := plugin.(WantsExternalKubeClientSet); ok {
wants.SetExternalKubeClientSet(i.externalClient)
}
if wants, ok := plugin.(WantsInternalKubeInformerFactory); ok {
wants.SetInternalKubeInformerFactory(i.informers)
}

View File

@ -74,7 +74,7 @@ func newGCPermissionsEnforcement() *gcPermissionsEnforcement {
Handler: admission.NewHandler(admission.Create, admission.Update),
whiteList: whiteList,
}
pluginInitializer := kubeadmission.NewPluginInitializer(nil, nil, fakeAuthorizer{}, nil, api.Registry.RESTMapper(), nil)
pluginInitializer := kubeadmission.NewPluginInitializer(nil, nil, nil, fakeAuthorizer{}, nil, api.Registry.RESTMapper(), nil)
pluginInitializer.Initialize(gcAdmit)
return gcAdmit
}

View File

@ -5,6 +5,7 @@ licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
@ -12,6 +13,10 @@ go_library(
srcs = ["initialization.go"],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/apis/admissionregistration/v1alpha1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/kubeapiserver/admission/configuration:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
@ -19,6 +24,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
],
@ -36,3 +42,14 @@ filegroup(
srcs = [":package-srcs"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["initialization_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/apis/admissionregistration/v1alpha1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
],
)

View File

@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors.
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -19,6 +19,12 @@ package initialization
import (
"fmt"
"io"
"net"
"net/url"
"os"
"strings"
"syscall"
"time"
"github.com/golang/glog"
@ -28,8 +34,13 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/kubeapiserver/admission/configuration"
)
// Register registers a plugin
@ -43,38 +54,131 @@ type initializerOptions struct {
Initializers []string
}
type InitializationConfig interface {
Run(stopCh <-chan struct{})
Initializers() (*v1alpha1.InitializerConfiguration, error)
}
type initializer struct {
resources map[schema.GroupResource]initializerOptions
config InitializationConfig
authorizer authorizer.Authorizer
}
// NewAlwaysAdmit creates a new always admit admission handler
// Retry config loading failures for up to four and a half seconds if the config hasn't been loaded
// yet or if the server is down. Creation requests are delayed during this interval, which prevents
// racy failures during startup until the initializer configuration becomes available.
// TODO: move into InitializationConfigurationManager, since these values depend on the config
// refresh loop.
const (
retryTemporaryConfigFailures = 8
retryTemporaryConfigInterval = 550 * time.Millisecond
)
// NewInitializer creates a new initializer plugin which assigns newly created resources initializers
// based on configuration loaded from the admission API group.
// FUTURE: this may be moved to the storage layer of the apiserver, but for now this is an alpha feature
// that can be disabled.
func NewInitializer() admission.Interface {
return &initializer{
resources: map[schema.GroupResource]initializerOptions{
//schema.GroupResource{Resource: "pods"}: {Initializers: []string{"Test"}},
},
}
return &initializer{}
}
func (i *initializer) Validate() error {
if i.config == nil {
return fmt.Errorf("the Initializer admission plugin requires a Kubernetes client to be provided")
}
i.config.Run(wait.NeverStop)
return nil
}
func (i *initializer) SetExternalKubeClientSet(client clientset.Interface) {
i.config = configuration.NewInitializerConfigurationManager(client.Admissionregistration().InitializerConfigurations())
}
func (i *initializer) SetAuthorizer(a authorizer.Authorizer) {
i.authorizer = a
}
var initializerFieldPath = field.NewPath("metadata", "initializers")
// temporaryConnectionError returns true if the error is considered temporary
func temporaryConnectionError(err error) bool {
if urlError, ok := err.(*url.Error); ok {
if urlError.Temporary() {
return true
}
if opError, ok := urlError.Err.(*net.OpError); ok {
if syscallError, ok := opError.Err.(*os.SyscallError); ok {
if errno, ok := syscallError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
return true
}
}
}
}
return false
}
// readConfigWithRetry holds requests instead of failing them if the server is not yet initialized
// or is unresponsive. It formats the returned error for client use if necessary.
func (i *initializer) readConfigWithRetry(a admission.Attributes) (*v1alpha1.InitializerConfiguration, error) {
var lastErr error
for count := 0; count < retryTemporaryConfigFailures; count++ {
if count > 0 {
time.Sleep(retryTemporaryConfigInterval)
}
// read initializers from config
config, err := i.config.Initializers()
if err == nil {
return config, nil
}
// if initializer configuration is disabled, fail open
if err == configuration.ErrDisabled {
return &v1alpha1.InitializerConfiguration{}, nil
}
// retry certain errors
lastErr = err
if err != configuration.ErrNotReady && !temporaryConnectionError(err) {
break
}
}
e := errors.NewServerTimeout(a.GetResource().GroupResource(), "create", 1)
if lastErr == configuration.ErrNotReady {
e.ErrStatus.Message = fmt.Sprintf("Waiting for initialization configuration to load: %v", lastErr)
e.ErrStatus.Reason = "LoadingConfiguration"
e.ErrStatus.Details.Causes = append(e.ErrStatus.Details.Causes, metav1.StatusCause{
Type: "InitializerConfigurationPending",
Message: "The server is waiting for the initializer configuration to be loaded.",
})
} else {
e.ErrStatus.Message = fmt.Sprintf("Unable to refresh the initializer configuration: %v", lastErr)
e.ErrStatus.Reason = "LoadingConfiguration"
e.ErrStatus.Details.Causes = append(e.ErrStatus.Details.Causes, metav1.StatusCause{
Type: "InitializerConfigurationFailure",
Message: "An error has occurred while refreshing the initializer configuration, no resources can be created until a refresh succeeds.",
})
}
return nil, e
}
// Admit checks for create requests to add initializers, or update request to enforce invariants.
// The admission controller fails open if the object doesn't have ObjectMeta (can't be initialized).
// A client with sufficient permission ("initialize" verb on resource) can specify its own initializers
// or an empty initializers struct (which bypasses initialization). Only clients with the initialize verb
// can update objects that have not completed initialization. Sub resources can still be modified on
// resources that are undergoing initialization.
// TODO: once this logic is ready for beta, move it into the REST storage layer.
func (i *initializer) Admit(a admission.Attributes) (err error) {
// TODO: sub-resource action should be denied until the object is initialized
if len(a.GetSubresource()) > 0 {
switch a.GetOperation() {
case admission.Create, admission.Update:
default:
return nil
}
resource, ok := i.resources[a.GetResource().GroupResource()]
if !ok {
// TODO: should sub-resource action should be denied until the object is initialized?
if len(a.GetSubresource()) > 0 {
return nil
}
@ -87,15 +191,44 @@ func (i *initializer) Admit(a admission.Attributes) (err error) {
return nil
}
existing := accessor.GetInitializers()
// it must be possible for some users to bypass initialization - for now, check the initialize operation
if existing != nil {
if err := i.canInitialize(a); err != nil {
glog.V(5).Infof("Admin bypassing initialization for %s", a.GetResource())
// it must be possible for some users to bypass initialization - for now, check the initialize operation
if err := i.canInitialize(a, "create with initializers denied"); err != nil {
return err
}
}
// allow administrators to bypass initialization by setting an empty initializers struct
if len(existing.Pending) == 0 && existing.Result == nil {
accessor.SetInitializers(nil)
return nil
}
} else {
glog.V(5).Infof("Checking initialization for %s", a.GetResource())
// TODO: pull this from config
accessor.SetInitializers(copiedInitializers(resource.Initializers))
config, err := i.readConfigWithRetry(a)
if err != nil {
return err
}
// Mirror pods are exempt from initialization because they are created and initialized
// on the Kubelet before they appear in the API.
// TODO: once this moves to REST storage layer, this becomes a pod specific concern
if pod, ok := a.GetObject().(*api.Pod); ok && pod != nil {
if _, isMirror := pod.Annotations[api.MirrorPodAnnotationKey]; isMirror {
return nil
}
}
names := findInitializers(config, a.GetResource())
if len(names) == 0 {
glog.V(5).Infof("No initializers needed")
return nil
}
glog.V(5).Infof("Found initializers for %s: %v", a.GetResource(), names)
accessor.SetInitializers(newInitializers(names))
}
case admission.Update:
accessor, err := meta.Accessor(a.GetObject())
@ -113,13 +246,20 @@ func (i *initializer) Admit(a admission.Attributes) (err error) {
}
existing := existingAccessor.GetInitializers()
// updates on initialized resources are allowed
if updated == nil && existing == nil {
return nil
}
glog.V(5).Infof("Modifying uninitialized resource %s", a.GetResource())
// because we are called before validation, we need to ensure the update transition is valid.
if errs := validation.ValidateInitializersUpdate(updated, existing, initializerFieldPath); len(errs) > 0 {
return errors.NewInvalid(a.GetKind().GroupKind(), a.GetName(), errs)
}
// caller must have the ability to mutate un-initialized resources
if err := i.canInitialize(a); err != nil {
if err := i.canInitialize(a, "update to uninitialized resource denied"); err != nil {
return err
}
@ -129,7 +269,7 @@ func (i *initializer) Admit(a admission.Attributes) (err error) {
return nil
}
func (i *initializer) canInitialize(a admission.Attributes) error {
func (i *initializer) canInitialize(a admission.Attributes, message string) error {
// if no authorizer is present, the initializer plugin allows modification of uninitialized resources
if i.authorizer == nil {
glog.V(4).Infof("No authorizer provided to initialization admission control, unable to check permissions")
@ -150,16 +290,17 @@ func (i *initializer) canInitialize(a admission.Attributes) error {
return err
}
if !authorized {
return fmt.Errorf("user must have permission to initialize resources: %s", reason)
return errors.NewForbidden(a.GetResource().GroupResource(), a.GetName(), fmt.Errorf("%s: %s", message, reason))
}
return nil
}
func (i *initializer) Handles(op admission.Operation) bool {
return true
return op == admission.Create || op == admission.Update
}
func copiedInitializers(names []string) *metav1.Initializers {
// newInitializers populates an Initializers struct.
func newInitializers(names []string) *metav1.Initializers {
if len(names) == 0 {
return nil
}
@ -171,3 +312,71 @@ func copiedInitializers(names []string) *metav1.Initializers {
Pending: init,
}
}
// findInitializers returns the list of initializer names that apply to a config. It returns an empty list
// if no initializers apply.
func findInitializers(initializers *v1alpha1.InitializerConfiguration, gvr schema.GroupVersionResource) []string {
var names []string
for _, init := range initializers.Initializers {
if !matchRule(init.Rules, gvr) {
continue
}
names = append(names, init.Name)
}
return names
}
// matchRule returns true if any rule matches the provided group version resource.
func matchRule(rules []v1alpha1.Rule, gvr schema.GroupVersionResource) bool {
for _, rule := range rules {
if !hasGroup(rule.APIGroups, gvr.Group) {
return false
}
if !hasVersion(rule.APIVersions, gvr.Version) {
return false
}
if !hasResource(rule.Resources, gvr.Resource) {
return false
}
}
return len(rules) > 0
}
func hasGroup(groups []string, group string) bool {
if groups[0] == "*" {
return true
}
for _, g := range groups {
if g == group {
return true
}
}
return false
}
func hasVersion(versions []string, version string) bool {
if versions[0] == "*" {
return true
}
for _, v := range versions {
if v == version {
return true
}
}
return false
}
func hasResource(resources []string, resource string) bool {
if resources[0] == "*" || resources[0] == "*/*" {
return true
}
for _, r := range resources {
if strings.Contains(r, "/") {
continue
}
if r == resource {
return true
}
}
return false
}

View File

@ -0,0 +1,99 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package initialization
import (
"reflect"
"testing"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1"
)
func newInitializer(name string, rules ...v1alpha1.Rule) *v1alpha1.InitializerConfiguration {
return addInitializer(&v1alpha1.InitializerConfiguration{}, name, rules...)
}
func addInitializer(base *v1alpha1.InitializerConfiguration, name string, rules ...v1alpha1.Rule) *v1alpha1.InitializerConfiguration {
base.Initializers = append(base.Initializers, v1alpha1.Initializer{
Name: name,
Rules: rules,
})
return base
}
func TestFindInitializers(t *testing.T) {
type args struct {
initializers *v1alpha1.InitializerConfiguration
gvr schema.GroupVersionResource
}
tests := []struct {
name string
args args
want []string
}{
{
name: "empty",
args: args{
gvr: schema.GroupVersionResource{},
initializers: newInitializer("1"),
},
},
{
name: "everything",
args: args{
gvr: schema.GroupVersionResource{},
initializers: newInitializer("1", v1alpha1.Rule{APIGroups: []string{"*"}, APIVersions: []string{"*"}, Resources: []string{"*"}}),
},
want: []string{"1"},
},
{
name: "empty group",
args: args{
gvr: schema.GroupVersionResource{},
initializers: newInitializer("1", v1alpha1.Rule{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"*"}}),
},
want: []string{"1"},
},
{
name: "pod",
args: args{
gvr: schema.GroupVersionResource{Resource: "pods"},
initializers: addInitializer(
newInitializer("1", v1alpha1.Rule{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}}),
"2", v1alpha1.Rule{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}},
),
},
want: []string{"1", "2"},
},
{
name: "multiple matches",
args: args{
gvr: schema.GroupVersionResource{Resource: "pods"},
initializers: newInitializer("1", v1alpha1.Rule{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}}),
},
want: []string{"1"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := findInitializers(tt.args.initializers, tt.args.gvr); !reflect.DeepEqual(got, tt.want) {
t.Errorf("findInitializers() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -595,7 +595,7 @@ func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.Sh
if err != nil {
return nil, f, err
}
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil)
pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil)
pluginInitializer.Initialize(handler)
err = admission.Validate(handler)
return handler, f, err

View File

@ -38,7 +38,7 @@ import (
func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) {
f := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler := NewProvision()
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil)
pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil)
pluginInitializer.Initialize(handler)
err := admission.Validate(handler)
return handler, f, err

View File

@ -37,7 +37,7 @@ import (
func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) {
f := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler := NewExists()
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil)
pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil)
pluginInitializer.Initialize(handler)
err := admission.Validate(handler)
return handler, f, err

View File

@ -191,7 +191,7 @@ func TestHandles(t *testing.T) {
func newHandlerForTest(c clientset.Interface) (*podNodeSelector, informers.SharedInformerFactory, error) {
f := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler := NewPodNodeSelector(nil)
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil)
pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil)
pluginInitializer.Initialize(handler)
err := admission.Validate(handler)
return handler, f, err

View File

@ -193,7 +193,7 @@ func newHandlerForTest(c clientset.Interface) (*podTolerationsPlugin, informers.
return nil, nil, err
}
handler := NewPodTolerationsPlugin(pluginConfig)
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil)
pluginInitializer := kubeadmission.NewPluginInitializer(c, nil, f, nil, nil, nil, nil)
pluginInitializer.Initialize(handler)
err = admission.Validate(handler)
return handler, f, err

View File

@ -41,7 +41,7 @@ type AdmissionOptions struct {
func NewAdmissionOptions() *AdmissionOptions {
options := &AdmissionOptions{
Plugins: &admission.Plugins{},
PluginNames: []string{"Initializers"},
PluginNames: []string{},
}
server.RegisterAllAdmissionPlugins(options.Plugins)
return options

View File

@ -13,6 +13,9 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/apis/admissionregistration/v1alpha1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/retry:go_default_library",
"//test/e2e/framework:go_default_library",
"//vendor/github.com/onsi/ginkgo:go_default_library",
"//vendor/github.com/onsi/gomega:go_default_library",

View File

@ -23,10 +23,14 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/admissionregistration/v1alpha1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
clientretry "k8s.io/kubernetes/pkg/client/retry"
"k8s.io/kubernetes/test/e2e/framework"
)
@ -65,10 +69,19 @@ var _ = framework.KubeDescribe("Initializers", func() {
// verify that we can update an initializing pod
pod, err := c.Core().Pods(ns).Get(podName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
pod.Annotations = map[string]string{"update-1": "test"}
pod, err = c.Core().Pods(ns).Update(pod)
Expect(err).NotTo(HaveOccurred())
// verify the list call filters out uninitialized pods
pods, err := c.Core().Pods(ns).List(metav1.ListOptions{IncludeUninitialized: true})
Expect(err).NotTo(HaveOccurred())
Expect(pods.Items).To(HaveLen(1))
pods, err = c.Core().Pods(ns).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
Expect(pods.Items).To(HaveLen(0))
// clear initializers
pod.Initializers = nil
pod, err = c.Core().Pods(ns).Update(pod)
@ -93,17 +106,120 @@ var _ = framework.KubeDescribe("Initializers", func() {
}
})
It("should dynamically register and apply initializers to pods [Serial]", func() {
ns := f.Namespace.Name
c := f.ClientSet
podName := "uninitialized-pod"
framework.Logf("Creating pod %s", podName)
// create and register an initializer
initializerName := "pod.test.e2e.kubernetes.io"
initializerConfigName := "e2e-test-initializer"
_, err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Create(&v1alpha1.InitializerConfiguration{
ObjectMeta: metav1.ObjectMeta{Name: initializerConfigName},
Initializers: []v1alpha1.Initializer{
{
Name: initializerName,
Rules: []v1alpha1.Rule{
{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}},
},
},
},
})
Expect(err).NotTo(HaveOccurred())
// we must remove the initializer when the test is complete and ensure no pods are pending for that initializer
defer func() {
if err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Delete(initializerConfigName, nil); err != nil && !errors.IsNotFound(err) {
framework.Logf("got error on deleting %s", initializerConfigName)
}
// poller configuration is 1 second, wait at least that long
time.Sleep(3 * time.Second)
// clear our initializer from anyone who got it
removeInitializersFromAllPods(c, initializerName)
}()
// poller configuration is 1 second, wait at least that long
time.Sleep(3 * time.Second)
// run create that blocks
ch := make(chan struct{})
go func() {
defer close(ch)
_, err := c.Core().Pods(ns).Create(newPod(podName))
Expect(err).NotTo(HaveOccurred())
}()
// wait until the pod shows up uninitialized
By("Waiting until the pod is visible to a client")
var pod *v1.Pod
err = wait.PollImmediate(2*time.Second, 15*time.Second, func() (bool, error) {
pod, err = c.Core().Pods(ns).Get(podName, metav1.GetOptions{IncludeUninitialized: true})
if errors.IsNotFound(err) {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
})
Expect(err).NotTo(HaveOccurred())
Expect(pod.Initializers).NotTo(BeNil())
Expect(pod.Initializers.Pending).To(HaveLen(1))
Expect(pod.Initializers.Pending[0].Name).To(Equal(initializerName))
// pretend we are an initializer
By("Completing initialization")
pod.Initializers = nil
pod, err = c.Core().Pods(ns).Update(pod)
Expect(err).NotTo(HaveOccurred())
// ensure create call returns
<-ch
// pod should now start running
err = framework.WaitForPodRunningInNamespace(c, pod)
Expect(err).NotTo(HaveOccurred())
// bypass initialization by explicitly passing an empty pending list
By("Setting an empty initializer as an admin to bypass initialization")
podName = "preinitialized-pod"
pod = newUninitializedPod(podName)
pod.Initializers.Pending = nil
pod, err = c.Core().Pods(ns).Create(pod)
Expect(err).NotTo(HaveOccurred())
Expect(pod.Initializers).To(BeNil())
// bypass initialization for mirror pods
By("Creating a mirror pod that bypasses initialization")
podName = "mirror-pod"
pod = newPod(podName)
pod.Annotations = map[string]string{
v1.MirrorPodAnnotationKey: "true",
}
pod.Spec.NodeName = "node-does-not-yet-exist"
pod, err = c.Core().Pods(ns).Create(pod)
Expect(err).NotTo(HaveOccurred())
Expect(pod.Initializers).To(BeNil())
Expect(pod.Annotations[v1.MirrorPodAnnotationKey]).To(Equal("true"))
})
})
func newUninitializedPod(podName string) *v1.Pod {
pod := newPod(podName)
pod.Initializers = &metav1.Initializers{
Pending: []metav1.Initializer{{Name: "Test"}},
}
return pod
}
func newPod(podName string) *v1.Pod {
containerName := fmt.Sprintf("%s-container", podName)
port := 8080
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Initializers: &metav1.Initializers{
Pending: []metav1.Initializer{{Name: "Test"}},
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
@ -119,3 +235,48 @@ func newUninitializedPod(podName string) *v1.Pod {
}
return pod
}
// removeInitializersFromAllPods walks all pods and ensures they don't have the provided initializer,
// to guarantee completing the test doesn't block the entire cluster.
func removeInitializersFromAllPods(c clientset.Interface, initializerName string) {
pods, err := c.Core().Pods("").List(metav1.ListOptions{IncludeUninitialized: true})
if err != nil {
return
}
for _, p := range pods.Items {
if p.Initializers == nil {
continue
}
err := clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error {
pod, err := c.Core().Pods(p.Namespace).Get(p.Name, metav1.GetOptions{IncludeUninitialized: true})
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
if pod.Initializers == nil {
return nil
}
var updated []metav1.Initializer
for _, pending := range pod.Initializers.Pending {
if pending.Name != initializerName {
updated = append(updated, pending)
}
}
if len(updated) == len(pod.Initializers.Pending) {
return nil
}
pod.Initializers.Pending = updated
if len(updated) == 0 {
pod.Initializers = nil
}
framework.Logf("Found initializer on pod %s in ns %s", pod.Name, pod.Namespace)
_, err = c.Core().Pods(p.Namespace).Update(pod)
return err
})
if err != nil {
framework.Logf("Unable to remove initializer from pod %s in ns %s: %v", p.Name, p.Namespace, err)
}
}
}