From 0339ef7961051b7d7775dd2bf27ecb301fb6ba76 Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Tue, 26 Jul 2016 17:32:42 -0400 Subject: [PATCH] Fix usage of shared informer in namespace admission controllers --- pkg/admission/handler.go | 41 ++++ pkg/admission/types.go | 2 +- .../namespace/autoprovision/admission.go | 20 +- .../namespace/autoprovision/admission_test.go | 197 ++++++++++-------- .../admission/namespace/exists/admission.go | 19 +- .../namespace/exists/admission_test.go | 101 +++++++++ 6 files changed, 277 insertions(+), 103 deletions(-) diff --git a/pkg/admission/handler.go b/pkg/admission/handler.go index b4f2c57755..f19f5721ee 100644 --- a/pkg/admission/handler.go +++ b/pkg/admission/handler.go @@ -17,13 +17,25 @@ limitations under the License. package admission import ( + "time" + "k8s.io/kubernetes/pkg/util/sets" ) +const ( + // timeToWaitForReady is the amount of time to wait to let an admission controller to be ready to satisfy a request. + // this is useful when admission controllers need to warm their caches before letting requests through. + timeToWaitForReady = 10 * time.Second +) + +// ReadyFunc is a function that returns true if the admission controller is ready to handle requests. +type ReadyFunc func() bool + // Handler is a base for admission control handlers that // support a predefined set of operations type Handler struct { operations sets.String + readyFunc ReadyFunc } // Handles returns true for methods that this handler supports @@ -42,3 +54,32 @@ func NewHandler(ops ...Operation) *Handler { operations: operations, } } + +// SetReadyFunc allows late registration of a ReadyFunc to know if the handler is ready to process requests. +func (h *Handler) SetReadyFunc(readyFunc ReadyFunc) { + h.readyFunc = readyFunc +} + +// WaitForReady will wait for the readyFunc (if registered) to return ready, and in case of timeout, will return false. +func (h *Handler) WaitForReady() bool { + // there is no ready func configured, so we return immediately + if h.readyFunc == nil { + return true + } + return h.waitForReadyInternal(time.After(timeToWaitForReady)) +} + +func (h *Handler) waitForReadyInternal(timeout <-chan time.Time) bool { + // there is no configured ready func, so return immediately + if h.readyFunc == nil { + return true + } + for !h.readyFunc() { + select { + case <-time.After(100 * time.Millisecond): + case <-timeout: + return h.readyFunc() + } + } + return true +} diff --git a/pkg/admission/types.go b/pkg/admission/types.go index 4de01016eb..76b802124c 100644 --- a/pkg/admission/types.go +++ b/pkg/admission/types.go @@ -26,7 +26,7 @@ type Validator interface { Validate() error } -// WantsNamespaceInformer defines a function which sets NamespaceInformer for admission plugins that need it +// WantsInformerFactory defines a function which sets InformerFactory for admission plugins that need it type WantsInformerFactory interface { SetInformerFactory(informers.SharedInformerFactory) Validator diff --git a/plugin/pkg/admission/namespace/autoprovision/admission.go b/plugin/pkg/admission/namespace/autoprovision/admission.go index 3f248ef598..4b06f63df1 100644 --- a/plugin/pkg/admission/namespace/autoprovision/admission.go +++ b/plugin/pkg/admission/namespace/autoprovision/admission.go @@ -22,9 +22,11 @@ import ( clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "fmt" + "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework/informers" ) @@ -39,8 +41,8 @@ func init() { // It is useful in deployments that do not want to restrict creation of a namespace prior to its usage. type provision struct { *admission.Handler - client clientset.Interface - informerFactory informers.SharedInformerFactory + client clientset.Interface + namespaceInformer framework.SharedIndexInformer } var _ = admission.WantsInformerFactory(&provision{}) @@ -52,7 +54,10 @@ func (p *provision) Admit(a admission.Attributes) (err error) { if len(a.GetNamespace()) == 0 || a.GetKind().GroupKind() == api.Kind("Namespace") { return nil } - + // we need to wait for our caches to warm + if !p.WaitForReady() { + return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request")) + } namespace := &api.Namespace{ ObjectMeta: api.ObjectMeta{ Name: a.GetNamespace(), @@ -60,7 +65,7 @@ func (p *provision) Admit(a admission.Attributes) (err error) { }, Status: api.NamespaceStatus{}, } - _, exists, err := p.informerFactory.Namespaces().Informer().GetStore().Get(namespace) + _, exists, err := p.namespaceInformer.GetStore().Get(namespace) if err != nil { return admission.NewForbidden(a, err) } @@ -83,12 +88,13 @@ func NewProvision(c clientset.Interface) admission.Interface { } func (p *provision) SetInformerFactory(f informers.SharedInformerFactory) { - p.informerFactory = f + p.namespaceInformer = f.Namespaces().Informer() + p.SetReadyFunc(p.namespaceInformer.HasSynced) } func (p *provision) Validate() error { - if p.informerFactory == nil { - return fmt.Errorf("namespace autoprovision plugin needs SharedInformerFactory") + if p.namespaceInformer == nil { + return fmt.Errorf("missing namespaceInformer") } return nil } diff --git a/plugin/pkg/admission/namespace/autoprovision/admission_test.go b/plugin/pkg/admission/namespace/autoprovision/admission_test.go index 93440f6f94..3b96b77b30 100644 --- a/plugin/pkg/admission/namespace/autoprovision/admission_test.go +++ b/plugin/pkg/admission/namespace/autoprovision/admission_test.go @@ -17,12 +17,15 @@ limitations under the License. package autoprovision import ( + "fmt" "testing" "time" "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/controller/framework/informers" @@ -30,124 +33,140 @@ import ( "k8s.io/kubernetes/pkg/util/wait" ) -// TestAdmission verifies a namespace is created on create requests for namespace managed resources -func TestAdmission(t *testing.T) { - namespace := "test" +// newHandlerForTest returns the admission controller configured for testing. +func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) { + f := informers.NewSharedInformerFactory(c, 5*time.Minute) + handler := NewProvision(c) + plugins := []admission.Interface{handler} + pluginInitializer := admission.NewPluginInitializer(f) + pluginInitializer.Initialize(plugins) + err := admission.Validate(plugins) + return handler, f, err +} + +// newMockClientForTest creates a mock client that returns a client configured for the specified list of namespaces. +func newMockClientForTest(namespaces []string) *fake.Clientset { mockClient := &fake.Clientset{} - informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute) - informerFactory.Namespaces() - informerFactory.Start(wait.NeverStop) - handler := &provision{ - client: mockClient, - informerFactory: informerFactory, - } - pod := api.Pod{ + mockClient.AddReactor("list", "namespaces", func(action core.Action) (bool, runtime.Object, error) { + namespaceList := &api.NamespaceList{ + ListMeta: unversioned.ListMeta{ + ResourceVersion: fmt.Sprintf("%d", len(namespaces)), + }, + } + for i, ns := range namespaces { + namespaceList.Items = append(namespaceList.Items, api.Namespace{ + ObjectMeta: api.ObjectMeta{ + Name: ns, + ResourceVersion: fmt.Sprintf("%d", i), + }, + }) + } + return true, namespaceList, nil + }) + return mockClient +} + +// newPod returns a new pod for the specified namespace +func newPod(namespace string) api.Pod { + return api.Pod{ ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace}, Spec: api.PodSpec{ Volumes: []api.Volume{{Name: "vol"}}, Containers: []api.Container{{Name: "ctr", Image: "image"}}, }, } - err := handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) +} + +// hasCreateNamespaceAction returns true if it has the create namespace action +func hasCreateNamespaceAction(mockClient *fake.Clientset) bool { + for _, action := range mockClient.Actions() { + if action.GetVerb() == "create" && action.GetResource().Resource == "namespaces" { + return true + } + } + return false +} + +// TestAdmission verifies a namespace is created on create requests for namespace managed resources +func TestAdmission(t *testing.T) { + namespace := "test" + mockClient := newMockClientForTest([]string{}) + handler, informerFactory, err := newHandlerForTest(mockClient) if err != nil { - t.Errorf("Unexpected error returned from admission handler") + t.Errorf("unexpected error initializing handler: %v", err) } - actions := mockClient.Actions() - if len(actions) != 1 { - t.Errorf("Expected a create-namespace request") + informerFactory.Start(wait.NeverStop) + + pod := newPod(namespace) + err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) + if err != nil { + t.Errorf("unexpected error returned from admission handler") } - if !actions[0].Matches("create", "namespaces") { - t.Errorf("Expected a create-namespace request to be made via the client") + if !hasCreateNamespaceAction(mockClient) { + t.Errorf("expected create namespace action") } } // TestAdmissionNamespaceExists verifies that no client call is made when a namespace already exists -// func TestAdmissionNamespaceExists(t *testing.T) { -// namespace := "test" -// mockClient := &fake.Clientset{} -// informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute) -// informerFactory.Namespaces().Informer().GetStore().Add(&api.Namespace{ -// ObjectMeta: api.ObjectMeta{Name: namespace}, -// }) -// informerFactory.Start(wait.NeverStop) -// handler := &provision{ -// client: mockClient, -// informerFactory: informerFactory, -// } -// pod := api.Pod{ -// ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace}, -// Spec: api.PodSpec{ -// Volumes: []api.Volume{{Name: "vol"}}, -// Containers: []api.Container{{Name: "ctr", Image: "image"}}, -// }, -// } -// err := handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) -// if err != nil { -// t.Errorf("Unexpected error returned from admission handler") -// } -// if len(mockClient.Actions()) != 0 { -// t.Errorf("No client request should have been made") -// } -// } +func TestAdmissionNamespaceExists(t *testing.T) { + namespace := "test" + mockClient := newMockClientForTest([]string{namespace}) + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Errorf("unexpected error initializing handler: %v", err) + } + informerFactory.Start(wait.NeverStop) + + pod := newPod(namespace) + err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) + if err != nil { + t.Errorf("unexpected error returned from admission handler") + } + if hasCreateNamespaceAction(mockClient) { + t.Errorf("unexpected create namespace action") + } +} // TestIgnoreAdmission validates that a request is ignored if its not a create func TestIgnoreAdmission(t *testing.T) { namespace := "test" - mockClient := &fake.Clientset{} - handler := admission.NewChainHandler(NewProvision(mockClient)) - pod := api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace}, - Spec: api.PodSpec{ - Volumes: []api.Volume{{Name: "vol"}}, - Containers: []api.Container{{Name: "ctr", Image: "image"}}, - }, - } - err := handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Update, nil)) + mockClient := newMockClientForTest([]string{}) + handler, informerFactory, err := newHandlerForTest(mockClient) if err != nil { - t.Errorf("Unexpected error returned from admission handler") + t.Errorf("unexpected error initializing handler: %v", err) } - if len(mockClient.Actions()) != 0 { - t.Errorf("No client request should have been made") + informerFactory.Start(wait.NeverStop) + chainHandler := admission.NewChainHandler(handler) + + pod := newPod(namespace) + err = chainHandler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Update, nil)) + if err != nil { + t.Errorf("unexpected error returned from admission handler") + } + if hasCreateNamespaceAction(mockClient) { + t.Errorf("unexpected create namespace action") } } -// TestAdmissionNamespaceExistsUnknownToHandler -func TestAdmissionNamespaceExistsUnknownToHandler(t *testing.T) { +func TestAdmissionWithLatentCache(t *testing.T) { namespace := "test" - mockClient := &fake.Clientset{} + mockClient := newMockClientForTest([]string{}) mockClient.AddReactor("create", "namespaces", func(action core.Action) (bool, runtime.Object, error) { return true, nil, errors.NewAlreadyExists(api.Resource("namespaces"), namespace) }) - informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute) - informerFactory.Namespaces() + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Errorf("unexpected error initializing handler: %v", err) + } informerFactory.Start(wait.NeverStop) - handler := &provision{ - client: mockClient, - informerFactory: informerFactory, - } - pod := api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace}, - Spec: api.PodSpec{ - Volumes: []api.Volume{{Name: "vol"}}, - Containers: []api.Container{{Name: "ctr", Image: "image"}}, - }, - } - err := handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) - if err != nil { - t.Errorf("Unexpected error returned from admission handler") - } -} -// TestAdmissionNamespaceValidate -func TestAdmissionNamespaceValidate(t *testing.T) { - mockClient := &fake.Clientset{} - informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute) - handler := &provision{ - client: mockClient, - } - handler.SetInformerFactory(informerFactory) - err := handler.Validate() + pod := newPod(namespace) + err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) if err != nil { - t.Errorf("Failed to initialize informer") + t.Errorf("unexpected error returned from admission handler") + } + + if !hasCreateNamespaceAction(mockClient) { + t.Errorf("expected create namespace action") } } diff --git a/plugin/pkg/admission/namespace/exists/admission.go b/plugin/pkg/admission/namespace/exists/admission.go index ab031ee607..e20e3b4ea7 100644 --- a/plugin/pkg/admission/namespace/exists/admission.go +++ b/plugin/pkg/admission/namespace/exists/admission.go @@ -22,9 +22,11 @@ import ( clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "fmt" + "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework/informers" ) @@ -39,8 +41,8 @@ func init() { // It is useful in deployments that want to enforce pre-declaration of a Namespace resource. type exists struct { *admission.Handler - client clientset.Interface - informerFactory informers.SharedInformerFactory + client clientset.Interface + namespaceInformer framework.SharedIndexInformer } var _ = admission.WantsInformerFactory(&exists{}) @@ -53,6 +55,10 @@ func (e *exists) Admit(a admission.Attributes) (err error) { return nil } + // we need to wait for our caches to warm + if !e.WaitForReady() { + return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request")) + } namespace := &api.Namespace{ ObjectMeta: api.ObjectMeta{ Name: a.GetNamespace(), @@ -60,7 +66,7 @@ func (e *exists) Admit(a admission.Attributes) (err error) { }, Status: api.NamespaceStatus{}, } - _, exists, err := e.informerFactory.Namespaces().Informer().GetStore().Get(namespace) + _, exists, err := e.namespaceInformer.GetStore().Get(namespace) if err != nil { return errors.NewInternalError(err) } @@ -89,12 +95,13 @@ func NewExists(c clientset.Interface) admission.Interface { } func (e *exists) SetInformerFactory(f informers.SharedInformerFactory) { - e.informerFactory = f + e.namespaceInformer = f.Namespaces().Informer() + e.SetReadyFunc(e.namespaceInformer.HasSynced) } func (e *exists) Validate() error { - if e.informerFactory == nil { - return fmt.Errorf("namespace exists plugin needs a namespace informer") + if e.namespaceInformer == nil { + return fmt.Errorf("missing namespaceInformer") } return nil } diff --git a/plugin/pkg/admission/namespace/exists/admission_test.go b/plugin/pkg/admission/namespace/exists/admission_test.go index 5153a5d2df..f2b832b65f 100644 --- a/plugin/pkg/admission/namespace/exists/admission_test.go +++ b/plugin/pkg/admission/namespace/exists/admission_test.go @@ -15,3 +15,104 @@ limitations under the License. */ package exists + +import ( + "fmt" + "testing" + "time" + + "k8s.io/kubernetes/pkg/admission" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/controller/framework/informers" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/wait" +) + +// newHandlerForTest returns the admission controller configured for testing. +func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) { + f := informers.NewSharedInformerFactory(c, 5*time.Minute) + handler := NewExists(c) + plugins := []admission.Interface{handler} + pluginInitializer := admission.NewPluginInitializer(f) + pluginInitializer.Initialize(plugins) + err := admission.Validate(plugins) + return handler, f, err +} + +// newMockClientForTest creates a mock client that returns a client configured for the specified list of namespaces. +func newMockClientForTest(namespaces []string) *fake.Clientset { + mockClient := &fake.Clientset{} + mockClient.AddReactor("list", "namespaces", func(action core.Action) (bool, runtime.Object, error) { + namespaceList := &api.NamespaceList{ + ListMeta: unversioned.ListMeta{ + ResourceVersion: fmt.Sprintf("%d", len(namespaces)), + }, + } + for i, ns := range namespaces { + namespaceList.Items = append(namespaceList.Items, api.Namespace{ + ObjectMeta: api.ObjectMeta{ + Name: ns, + ResourceVersion: fmt.Sprintf("%d", i), + }, + }) + } + return true, namespaceList, nil + }) + return mockClient +} + +// newPod returns a new pod for the specified namespace +func newPod(namespace string) api.Pod { + return api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace}, + Spec: api.PodSpec{ + Volumes: []api.Volume{{Name: "vol"}}, + Containers: []api.Container{{Name: "ctr", Image: "image"}}, + }, + } +} + +// TestAdmissionNamespaceExists verifies pod is admitted only if namespace exists. +func TestAdmissionNamespaceExists(t *testing.T) { + namespace := "test" + mockClient := newMockClientForTest([]string{namespace}) + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Errorf("unexpected error initializing handler: %v", err) + } + informerFactory.Start(wait.NeverStop) + + pod := newPod(namespace) + err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) + if err != nil { + t.Errorf("unexpected error returned from admission handler") + } +} + +// TestAdmissionNamespaceDoesNotExist verifies pod is not admitted if namespace does not exist. +func TestAdmissionNamespaceDoesNotExist(t *testing.T) { + namespace := "test" + mockClient := newMockClientForTest([]string{}) + mockClient.AddReactor("get", "namespaces", func(action core.Action) (bool, runtime.Object, error) { + return true, nil, fmt.Errorf("nope, out of luck") + }) + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Errorf("unexpected error initializing handler: %v", err) + } + informerFactory.Start(wait.NeverStop) + + pod := newPod(namespace) + err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) + if err == nil { + actions := "" + for _, action := range mockClient.Actions() { + actions = actions + action.GetVerb() + ":" + action.GetResource().Resource + ":" + action.GetSubresource() + ", " + } + t.Errorf("expected error returned from admission handler: %v", actions) + } +}