From 6eb082d6b0d13097281af0467b1c16c13b1016c3 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Wed, 13 Mar 2019 20:55:31 -0700 Subject: [PATCH 1/9] Generate networkpolicies extensions/v1beta1 client --- staging/src/k8s.io/api/extensions/v1beta1/types.go | 1 + 1 file changed, 1 insertion(+) diff --git a/staging/src/k8s.io/api/extensions/v1beta1/types.go b/staging/src/k8s.io/api/extensions/v1beta1/types.go index c34ea599d6..562d7742a8 100644 --- a/staging/src/k8s.io/api/extensions/v1beta1/types.go +++ b/staging/src/k8s.io/api/extensions/v1beta1/types.go @@ -1186,6 +1186,7 @@ type PodSecurityPolicyList struct { Items []PodSecurityPolicy `json:"items" protobuf:"bytes,2,rep,name=items"` } +// +genclient // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // DEPRECATED 1.9 - This group version of NetworkPolicy is deprecated by networking/v1/NetworkPolicy. From 0f9ebe5e16ab892cbcd27c884cea9b57b49a5c5a Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Wed, 13 Mar 2019 20:55:43 -0700 Subject: [PATCH 2/9] Generated files --- .../informers/extensions/v1beta1/BUILD | 1 + .../informers/extensions/v1beta1/interface.go | 7 + .../extensions/v1beta1/networkpolicy.go | 89 +++++++++ .../src/k8s.io/client-go/informers/generic.go | 2 + .../kubernetes/typed/extensions/v1beta1/BUILD | 1 + .../extensions/v1beta1/extensions_client.go | 5 + .../typed/extensions/v1beta1/fake/BUILD | 1 + .../v1beta1/fake/fake_extensions_client.go | 4 + .../v1beta1/fake/fake_networkpolicy.go | 128 +++++++++++++ .../extensions/v1beta1/generated_expansion.go | 2 + .../typed/extensions/v1beta1/networkpolicy.go | 174 ++++++++++++++++++ .../listers/extensions/v1beta1/BUILD | 1 + .../extensions/v1beta1/expansion_generated.go | 8 + .../extensions/v1beta1/networkpolicy.go | 94 ++++++++++ 14 files changed, 517 insertions(+) create mode 100644 staging/src/k8s.io/client-go/informers/extensions/v1beta1/networkpolicy.go create mode 100644 
staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/fake/fake_networkpolicy.go create mode 100644 staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/networkpolicy.go create mode 100644 staging/src/k8s.io/client-go/listers/extensions/v1beta1/networkpolicy.go diff --git a/staging/src/k8s.io/client-go/informers/extensions/v1beta1/BUILD b/staging/src/k8s.io/client-go/informers/extensions/v1beta1/BUILD index ac71d05585..ca0d7db4c8 100644 --- a/staging/src/k8s.io/client-go/informers/extensions/v1beta1/BUILD +++ b/staging/src/k8s.io/client-go/informers/extensions/v1beta1/BUILD @@ -12,6 +12,7 @@ go_library( "deployment.go", "ingress.go", "interface.go", + "networkpolicy.go", "podsecuritypolicy.go", "replicaset.go", ], diff --git a/staging/src/k8s.io/client-go/informers/extensions/v1beta1/interface.go b/staging/src/k8s.io/client-go/informers/extensions/v1beta1/interface.go index a259d27ae3..6f0bea7e87 100644 --- a/staging/src/k8s.io/client-go/informers/extensions/v1beta1/interface.go +++ b/staging/src/k8s.io/client-go/informers/extensions/v1beta1/interface.go @@ -30,6 +30,8 @@ type Interface interface { Deployments() DeploymentInformer // Ingresses returns a IngressInformer. Ingresses() IngressInformer + // NetworkPolicies returns a NetworkPolicyInformer. + NetworkPolicies() NetworkPolicyInformer // PodSecurityPolicies returns a PodSecurityPolicyInformer. PodSecurityPolicies() PodSecurityPolicyInformer // ReplicaSets returns a ReplicaSetInformer. @@ -62,6 +64,11 @@ func (v *version) Ingresses() IngressInformer { return &ingressInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } +// NetworkPolicies returns a NetworkPolicyInformer. +func (v *version) NetworkPolicies() NetworkPolicyInformer { + return &networkPolicyInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} + // PodSecurityPolicies returns a PodSecurityPolicyInformer. 
func (v *version) PodSecurityPolicies() PodSecurityPolicyInformer { return &podSecurityPolicyInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} diff --git a/staging/src/k8s.io/client-go/informers/extensions/v1beta1/networkpolicy.go b/staging/src/k8s.io/client-go/informers/extensions/v1beta1/networkpolicy.go new file mode 100644 index 0000000000..92f4f04007 --- /dev/null +++ b/staging/src/k8s.io/client-go/informers/extensions/v1beta1/networkpolicy.go @@ -0,0 +1,89 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1beta1 + +import ( + time "time" + + extensionsv1beta1 "k8s.io/api/extensions/v1beta1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + internalinterfaces "k8s.io/client-go/informers/internalinterfaces" + kubernetes "k8s.io/client-go/kubernetes" + v1beta1 "k8s.io/client-go/listers/extensions/v1beta1" + cache "k8s.io/client-go/tools/cache" +) + +// NetworkPolicyInformer provides access to a shared informer and lister for +// NetworkPolicies. 
+type NetworkPolicyInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1beta1.NetworkPolicyLister +} + +type networkPolicyInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewNetworkPolicyInformer constructs a new informer for NetworkPolicy type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewNetworkPolicyInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredNetworkPolicyInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredNetworkPolicyInformer constructs a new informer for NetworkPolicy type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. 
+func NewFilteredNetworkPolicyInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.ExtensionsV1beta1().NetworkPolicies(namespace).List(options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.ExtensionsV1beta1().NetworkPolicies(namespace).Watch(options) + }, + }, + &extensionsv1beta1.NetworkPolicy{}, + resyncPeriod, + indexers, + ) +} + +func (f *networkPolicyInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredNetworkPolicyInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *networkPolicyInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&extensionsv1beta1.NetworkPolicy{}, f.defaultInformer) +} + +func (f *networkPolicyInformer) Lister() v1beta1.NetworkPolicyLister { + return v1beta1.NewNetworkPolicyLister(f.Informer().GetIndexer()) +} diff --git a/staging/src/k8s.io/client-go/informers/generic.go b/staging/src/k8s.io/client-go/informers/generic.go index fd5811cd61..8b986a963d 100644 --- a/staging/src/k8s.io/client-go/informers/generic.go +++ b/staging/src/k8s.io/client-go/informers/generic.go @@ -206,6 +206,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Extensions().V1beta1().Deployments().Informer()}, nil case extensionsv1beta1.SchemeGroupVersion.WithResource("ingresses"): return &genericInformer{resource: 
resource.GroupResource(), informer: f.Extensions().V1beta1().Ingresses().Informer()}, nil + case extensionsv1beta1.SchemeGroupVersion.WithResource("networkpolicies"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Extensions().V1beta1().NetworkPolicies().Informer()}, nil case extensionsv1beta1.SchemeGroupVersion.WithResource("podsecuritypolicies"): return &genericInformer{resource: resource.GroupResource(), informer: f.Extensions().V1beta1().PodSecurityPolicies().Informer()}, nil case extensionsv1beta1.SchemeGroupVersion.WithResource("replicasets"): diff --git a/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/BUILD b/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/BUILD index 6690d96b0b..fcd824526d 100644 --- a/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/BUILD +++ b/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/BUILD @@ -15,6 +15,7 @@ go_library( "extensions_client.go", "generated_expansion.go", "ingress.go", + "networkpolicy.go", "podsecuritypolicy.go", "replicaset.go", ], diff --git a/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/extensions_client.go b/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/extensions_client.go index 0e9edf5cce..4fca2e5252 100644 --- a/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/extensions_client.go +++ b/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/extensions_client.go @@ -30,6 +30,7 @@ type ExtensionsV1beta1Interface interface { DaemonSetsGetter DeploymentsGetter IngressesGetter + NetworkPoliciesGetter PodSecurityPoliciesGetter ReplicaSetsGetter } @@ -51,6 +52,10 @@ func (c *ExtensionsV1beta1Client) Ingresses(namespace string) IngressInterface { return newIngresses(c, namespace) } +func (c *ExtensionsV1beta1Client) NetworkPolicies(namespace string) NetworkPolicyInterface { + return newNetworkPolicies(c, namespace) +} + func (c *ExtensionsV1beta1Client) 
PodSecurityPolicies() PodSecurityPolicyInterface { return newPodSecurityPolicies(c) } diff --git a/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/fake/BUILD b/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/fake/BUILD index 5de2b7dfeb..d9f3f15065 100644 --- a/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/fake/BUILD +++ b/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/fake/BUILD @@ -14,6 +14,7 @@ go_library( "fake_deployment_expansion.go", "fake_extensions_client.go", "fake_ingress.go", + "fake_networkpolicy.go", "fake_podsecuritypolicy.go", "fake_replicaset.go", ], diff --git a/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/fake/fake_extensions_client.go b/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/fake/fake_extensions_client.go index 0282c0b499..36c0d51bc3 100644 --- a/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/fake/fake_extensions_client.go +++ b/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/fake/fake_extensions_client.go @@ -40,6 +40,10 @@ func (c *FakeExtensionsV1beta1) Ingresses(namespace string) v1beta1.IngressInter return &FakeIngresses{c, namespace} } +func (c *FakeExtensionsV1beta1) NetworkPolicies(namespace string) v1beta1.NetworkPolicyInterface { + return &FakeNetworkPolicies{c, namespace} +} + func (c *FakeExtensionsV1beta1) PodSecurityPolicies() v1beta1.PodSecurityPolicyInterface { return &FakePodSecurityPolicies{c} } diff --git a/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/fake/fake_networkpolicy.go b/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/fake/fake_networkpolicy.go new file mode 100644 index 0000000000..7f4d4a5554 --- /dev/null +++ b/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/fake/fake_networkpolicy.go @@ -0,0 +1,128 @@ +/* +Copyright The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v1beta1 "k8s.io/api/extensions/v1beta1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeNetworkPolicies implements NetworkPolicyInterface +type FakeNetworkPolicies struct { + Fake *FakeExtensionsV1beta1 + ns string +} + +var networkpoliciesResource = schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "networkpolicies"} + +var networkpoliciesKind = schema.GroupVersionKind{Group: "extensions", Version: "v1beta1", Kind: "NetworkPolicy"} + +// Get takes name of the networkPolicy, and returns the corresponding networkPolicy object, and an error if there is any. +func (c *FakeNetworkPolicies) Get(name string, options v1.GetOptions) (result *v1beta1.NetworkPolicy, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(networkpoliciesResource, c.ns, name), &v1beta1.NetworkPolicy{}) + + if obj == nil { + return nil, err + } + return obj.(*v1beta1.NetworkPolicy), err +} + +// List takes label and field selectors, and returns the list of NetworkPolicies that match those selectors. +func (c *FakeNetworkPolicies) List(opts v1.ListOptions) (result *v1beta1.NetworkPolicyList, err error) { + obj, err := c.Fake. 
+ Invokes(testing.NewListAction(networkpoliciesResource, networkpoliciesKind, c.ns, opts), &v1beta1.NetworkPolicyList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1beta1.NetworkPolicyList{ListMeta: obj.(*v1beta1.NetworkPolicyList).ListMeta} + for _, item := range obj.(*v1beta1.NetworkPolicyList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested networkPolicies. +func (c *FakeNetworkPolicies) Watch(opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(networkpoliciesResource, c.ns, opts)) + +} + +// Create takes the representation of a networkPolicy and creates it. Returns the server's representation of the networkPolicy, and an error, if there is any. +func (c *FakeNetworkPolicies) Create(networkPolicy *v1beta1.NetworkPolicy) (result *v1beta1.NetworkPolicy, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(networkpoliciesResource, c.ns, networkPolicy), &v1beta1.NetworkPolicy{}) + + if obj == nil { + return nil, err + } + return obj.(*v1beta1.NetworkPolicy), err +} + +// Update takes the representation of a networkPolicy and updates it. Returns the server's representation of the networkPolicy, and an error, if there is any. +func (c *FakeNetworkPolicies) Update(networkPolicy *v1beta1.NetworkPolicy) (result *v1beta1.NetworkPolicy, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(networkpoliciesResource, c.ns, networkPolicy), &v1beta1.NetworkPolicy{}) + + if obj == nil { + return nil, err + } + return obj.(*v1beta1.NetworkPolicy), err +} + +// Delete takes name of the networkPolicy and deletes it. Returns an error if one occurs. 
+func (c *FakeNetworkPolicies) Delete(name string, options *v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteAction(networkpoliciesResource, c.ns, name), &v1beta1.NetworkPolicy{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeNetworkPolicies) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(networkpoliciesResource, c.ns, listOptions) + + _, err := c.Fake.Invokes(action, &v1beta1.NetworkPolicyList{}) + return err +} + +// Patch applies the patch and returns the patched networkPolicy. +func (c *FakeNetworkPolicies) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1beta1.NetworkPolicy, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(networkpoliciesResource, c.ns, name, pt, data, subresources...), &v1beta1.NetworkPolicy{}) + + if obj == nil { + return nil, err + } + return obj.(*v1beta1.NetworkPolicy), err +} diff --git a/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/generated_expansion.go b/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/generated_expansion.go index cfaeebd054..41d28f0417 100644 --- a/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/generated_expansion.go +++ b/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/generated_expansion.go @@ -22,6 +22,8 @@ type DaemonSetExpansion interface{} type IngressExpansion interface{} +type NetworkPolicyExpansion interface{} + type PodSecurityPolicyExpansion interface{} type ReplicaSetExpansion interface{} diff --git a/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/networkpolicy.go b/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/networkpolicy.go new file mode 100644 index 0000000000..0607e2dd48 --- /dev/null +++ b/staging/src/k8s.io/client-go/kubernetes/typed/extensions/v1beta1/networkpolicy.go @@ -0,0 
+1,174 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1beta1 + +import ( + "time" + + v1beta1 "k8s.io/api/extensions/v1beta1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + scheme "k8s.io/client-go/kubernetes/scheme" + rest "k8s.io/client-go/rest" +) + +// NetworkPoliciesGetter has a method to return a NetworkPolicyInterface. +// A group's client should implement this interface. +type NetworkPoliciesGetter interface { + NetworkPolicies(namespace string) NetworkPolicyInterface +} + +// NetworkPolicyInterface has methods to work with NetworkPolicy resources. 
+type NetworkPolicyInterface interface { + Create(*v1beta1.NetworkPolicy) (*v1beta1.NetworkPolicy, error) + Update(*v1beta1.NetworkPolicy) (*v1beta1.NetworkPolicy, error) + Delete(name string, options *v1.DeleteOptions) error + DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error + Get(name string, options v1.GetOptions) (*v1beta1.NetworkPolicy, error) + List(opts v1.ListOptions) (*v1beta1.NetworkPolicyList, error) + Watch(opts v1.ListOptions) (watch.Interface, error) + Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1beta1.NetworkPolicy, err error) + NetworkPolicyExpansion +} + +// networkPolicies implements NetworkPolicyInterface +type networkPolicies struct { + client rest.Interface + ns string +} + +// newNetworkPolicies returns a NetworkPolicies +func newNetworkPolicies(c *ExtensionsV1beta1Client, namespace string) *networkPolicies { + return &networkPolicies{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the networkPolicy, and returns the corresponding networkPolicy object, and an error if there is any. +func (c *networkPolicies) Get(name string, options v1.GetOptions) (result *v1beta1.NetworkPolicy, err error) { + result = &v1beta1.NetworkPolicy{} + err = c.client.Get(). + Namespace(c.ns). + Resource("networkpolicies"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of NetworkPolicies that match those selectors. +func (c *networkPolicies) List(opts v1.ListOptions) (result *v1beta1.NetworkPolicyList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1beta1.NetworkPolicyList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("networkpolicies"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(). 
+ Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested networkPolicies. +func (c *networkPolicies) Watch(opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("networkpolicies"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch() +} + +// Create takes the representation of a networkPolicy and creates it. Returns the server's representation of the networkPolicy, and an error, if there is any. +func (c *networkPolicies) Create(networkPolicy *v1beta1.NetworkPolicy) (result *v1beta1.NetworkPolicy, err error) { + result = &v1beta1.NetworkPolicy{} + err = c.client.Post(). + Namespace(c.ns). + Resource("networkpolicies"). + Body(networkPolicy). + Do(). + Into(result) + return +} + +// Update takes the representation of a networkPolicy and updates it. Returns the server's representation of the networkPolicy, and an error, if there is any. +func (c *networkPolicies) Update(networkPolicy *v1beta1.NetworkPolicy) (result *v1beta1.NetworkPolicy, err error) { + result = &v1beta1.NetworkPolicy{} + err = c.client.Put(). + Namespace(c.ns). + Resource("networkpolicies"). + Name(networkPolicy.Name). + Body(networkPolicy). + Do(). + Into(result) + return +} + +// Delete takes name of the networkPolicy and deletes it. Returns an error if one occurs. +func (c *networkPolicies) Delete(name string, options *v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("networkpolicies"). + Name(name). + Body(options). + Do(). + Error() +} + +// DeleteCollection deletes a collection of objects. 
+func (c *networkPolicies) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + var timeout time.Duration + if listOptions.TimeoutSeconds != nil { + timeout = time.Duration(*listOptions.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Namespace(c.ns). + Resource("networkpolicies"). + VersionedParams(&listOptions, scheme.ParameterCodec). + Timeout(timeout). + Body(options). + Do(). + Error() +} + +// Patch applies the patch and returns the patched networkPolicy. +func (c *networkPolicies) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1beta1.NetworkPolicy, err error) { + result = &v1beta1.NetworkPolicy{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("networkpolicies"). + SubResource(subresources...). + Name(name). + Body(data). + Do(). + Into(result) + return +} diff --git a/staging/src/k8s.io/client-go/listers/extensions/v1beta1/BUILD b/staging/src/k8s.io/client-go/listers/extensions/v1beta1/BUILD index 62d18fefa1..98f2e25474 100644 --- a/staging/src/k8s.io/client-go/listers/extensions/v1beta1/BUILD +++ b/staging/src/k8s.io/client-go/listers/extensions/v1beta1/BUILD @@ -15,6 +15,7 @@ go_library( "deployment_expansion.go", "expansion_generated.go", "ingress.go", + "networkpolicy.go", "podsecuritypolicy.go", "replicaset.go", "replicaset_expansion.go", diff --git a/staging/src/k8s.io/client-go/listers/extensions/v1beta1/expansion_generated.go b/staging/src/k8s.io/client-go/listers/extensions/v1beta1/expansion_generated.go index d5c2a7a7d2..6d55ae9b8b 100644 --- a/staging/src/k8s.io/client-go/listers/extensions/v1beta1/expansion_generated.go +++ b/staging/src/k8s.io/client-go/listers/extensions/v1beta1/expansion_generated.go @@ -26,6 +26,14 @@ type IngressListerExpansion interface{} // IngressNamespaceLister. type IngressNamespaceListerExpansion interface{} +// NetworkPolicyListerExpansion allows custom methods to be added to +// NetworkPolicyLister. 
+type NetworkPolicyListerExpansion interface{} + +// NetworkPolicyNamespaceListerExpansion allows custom methods to be added to +// NetworkPolicyNamespaceLister. +type NetworkPolicyNamespaceListerExpansion interface{} + // PodSecurityPolicyListerExpansion allows custom methods to be added to // PodSecurityPolicyLister. type PodSecurityPolicyListerExpansion interface{} diff --git a/staging/src/k8s.io/client-go/listers/extensions/v1beta1/networkpolicy.go b/staging/src/k8s.io/client-go/listers/extensions/v1beta1/networkpolicy.go new file mode 100644 index 0000000000..782f521add --- /dev/null +++ b/staging/src/k8s.io/client-go/listers/extensions/v1beta1/networkpolicy.go @@ -0,0 +1,94 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1beta1 + +import ( + v1beta1 "k8s.io/api/extensions/v1beta1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// NetworkPolicyLister helps list NetworkPolicies. +type NetworkPolicyLister interface { + // List lists all NetworkPolicies in the indexer. + List(selector labels.Selector) (ret []*v1beta1.NetworkPolicy, err error) + // NetworkPolicies returns an object that can list and get NetworkPolicies. + NetworkPolicies(namespace string) NetworkPolicyNamespaceLister + NetworkPolicyListerExpansion +} + +// networkPolicyLister implements the NetworkPolicyLister interface. 
+type networkPolicyLister struct { + indexer cache.Indexer +} + +// NewNetworkPolicyLister returns a new NetworkPolicyLister. +func NewNetworkPolicyLister(indexer cache.Indexer) NetworkPolicyLister { + return &networkPolicyLister{indexer: indexer} +} + +// List lists all NetworkPolicies in the indexer. +func (s *networkPolicyLister) List(selector labels.Selector) (ret []*v1beta1.NetworkPolicy, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1beta1.NetworkPolicy)) + }) + return ret, err +} + +// NetworkPolicies returns an object that can list and get NetworkPolicies. +func (s *networkPolicyLister) NetworkPolicies(namespace string) NetworkPolicyNamespaceLister { + return networkPolicyNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// NetworkPolicyNamespaceLister helps list and get NetworkPolicies. +type NetworkPolicyNamespaceLister interface { + // List lists all NetworkPolicies in the indexer for a given namespace. + List(selector labels.Selector) (ret []*v1beta1.NetworkPolicy, err error) + // Get retrieves the NetworkPolicy from the indexer for a given namespace and name. + Get(name string) (*v1beta1.NetworkPolicy, error) + NetworkPolicyNamespaceListerExpansion +} + +// networkPolicyNamespaceLister implements the NetworkPolicyNamespaceLister +// interface. +type networkPolicyNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all NetworkPolicies in the indexer for a given namespace. +func (s networkPolicyNamespaceLister) List(selector labels.Selector) (ret []*v1beta1.NetworkPolicy, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1beta1.NetworkPolicy)) + }) + return ret, err +} + +// Get retrieves the NetworkPolicy from the indexer for a given namespace and name. 
+func (s networkPolicyNamespaceLister) Get(name string) (*v1beta1.NetworkPolicy, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1beta1.Resource("networkpolicy"), name) + } + return obj.(*v1beta1.NetworkPolicy), nil +} From 713a10d27678321ba215c3be56812f14a0c75ac3 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Wed, 28 Nov 2018 12:38:41 -0500 Subject: [PATCH 3/9] Fix race in quota sync test --- .../resourcequota/resource_quota_controller_test.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pkg/controller/resourcequota/resource_quota_controller_test.go b/pkg/controller/resourcequota/resource_quota_controller_test.go index 769b23022a..78ee509891 100644 --- a/pkg/controller/resourcequota/resource_quota_controller_test.go +++ b/pkg/controller/resourcequota/resource_quota_controller_test.go @@ -715,8 +715,17 @@ func TestSyncResourceQuota(t *testing.T) { t.Errorf("test: %s,\nExpected actions:\n%v\n but got:\n%v\nDifference:\n%v", testName, testCase.expectedActionSet, actionSet, testCase.expectedActionSet.Difference(actionSet)) } - lastActionIndex := len(kubeClient.Actions()) - 1 - usage := kubeClient.Actions()[lastActionIndex].(core.UpdateAction).GetObject().(*v1.ResourceQuota) + var usage *v1.ResourceQuota + actions := kubeClient.Actions() + for i := len(actions) - 1; i >= 0; i-- { + if updateAction, ok := actions[i].(core.UpdateAction); ok { + usage = updateAction.GetObject().(*v1.ResourceQuota) + break + } + } + if usage == nil { + t.Errorf("test: %s,\nExpected update action usage, got none: actions:\n%v", testName, actions) + } // ensure usage is as expected if len(usage.Status.Hard) != len(testCase.status.Hard) { From 8146fe47d81ce14e8d4730ecc1f527a525030299 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Wed, 13 Mar 2019 17:09:41 -0700 Subject: [PATCH 4/9] Clean up test output --- 
pkg/controller/garbagecollector/garbagecollector_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index 2f4c98487c..4076ef0143 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -873,7 +873,6 @@ func TestGarbageCollectorSync(t *testing.T) { go gc.Sync(fakeDiscoveryClient, 200*time.Millisecond, stopCh) // Wait until the sync discovers the initial resources - fmt.Printf("Test output") time.Sleep(1 * time.Second) err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock) From ec7a04bd20ab41bee6d85136b3be5b13048bac6e Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Wed, 13 Mar 2019 18:49:56 -0700 Subject: [PATCH 5/9] Prevent quota controller using unsynced listers --- pkg/quota/v1/generic/BUILD | 16 ++- pkg/quota/v1/generic/evaluator.go | 71 +++++++++++++- pkg/quota/v1/generic/evaluator_test.go | 131 +++++++++++++++++++++++++ 3 files changed, 212 insertions(+), 6 deletions(-) create mode 100644 pkg/quota/v1/generic/evaluator_test.go diff --git a/pkg/quota/v1/generic/BUILD b/pkg/quota/v1/generic/BUILD index c99ade4e3f..7538c4051a 100644 --- a/pkg/quota/v1/generic/BUILD +++ b/pkg/quota/v1/generic/BUILD @@ -1,9 +1,6 @@ package(default_visibility = ["//visibility:public"]) -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", -) +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", @@ -38,3 +35,14 @@ filegroup( srcs = [":package-srcs"], tags = ["automanaged"], ) + +go_test( + name = "go_default_test", + srcs = ["evaluator_test.go"], + embed = [":go_default_library"], + deps = [ + "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + ], +) diff --git 
a/pkg/quota/v1/generic/evaluator.go b/pkg/quota/v1/generic/evaluator.go index 4c3dfdf4fa..f49e2decd4 100644 --- a/pkg/quota/v1/generic/evaluator.go +++ b/pkg/quota/v1/generic/evaluator.go @@ -18,6 +18,7 @@ package generic import ( "fmt" + "sync/atomic" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -33,17 +34,83 @@ import ( // InformerForResourceFunc knows how to provision an informer type InformerForResourceFunc func(schema.GroupVersionResource) (informers.GenericInformer, error) -// ListerFuncForResourceFunc knows how to provision a lister from an informer func +// ListerFuncForResourceFunc knows how to provision a lister from an informer func. +// The lister returns errors until the informer has synced. func ListerFuncForResourceFunc(f InformerForResourceFunc) quota.ListerForResourceFunc { return func(gvr schema.GroupVersionResource) (cache.GenericLister, error) { informer, err := f(gvr) if err != nil { return nil, err } - return informer.Lister(), nil + return &protectedLister{ + hasSynced: cachedHasSynced(informer.Informer().HasSynced), + notReadyErr: fmt.Errorf("%v not yet synced", gvr), + delegate: informer.Lister(), + }, nil } } +// cachedHasSynced returns a function that calls hasSynced() until it returns true once, then returns true +func cachedHasSynced(hasSynced func() bool) func() bool { + cache := &atomic.Value{} + cache.Store(false) + return func() bool { + if cache.Load().(bool) { + // short-circuit if already synced + return true + } + if hasSynced() { + // remember we synced + cache.Store(true) + return true + } + return false + } +} + +// protectedLister returns notReadyError if hasSynced returns false, otherwise delegates to delegate +type protectedLister struct { + hasSynced func() bool + notReadyErr error + delegate cache.GenericLister +} + +func (p *protectedLister) List(selector labels.Selector) (ret []runtime.Object, err error) { + if !p.hasSynced() { + return nil, p.notReadyErr + } + return p.delegate.List(selector) 
+} +func (p *protectedLister) Get(name string) (runtime.Object, error) { + if !p.hasSynced() { + return nil, p.notReadyErr + } + return p.delegate.Get(name) +} +func (p *protectedLister) ByNamespace(namespace string) cache.GenericNamespaceLister { + return &protectedNamespaceLister{p.hasSynced, p.notReadyErr, p.delegate.ByNamespace(namespace)} +} + +// protectedNamespaceLister returns notReadyError if hasSynced returns false, otherwise delegates to delegate +type protectedNamespaceLister struct { + hasSynced func() bool + notReadyErr error + delegate cache.GenericNamespaceLister +} + +func (p *protectedNamespaceLister) List(selector labels.Selector) (ret []runtime.Object, err error) { + if !p.hasSynced() { + return nil, p.notReadyErr + } + return p.delegate.List(selector) +} +func (p *protectedNamespaceLister) Get(name string) (runtime.Object, error) { + if !p.hasSynced() { + return nil, p.notReadyErr + } + return p.delegate.Get(name) +} + // ListResourceUsingListerFunc returns a listing function based on the shared informer factory for the specified resource. func ListResourceUsingListerFunc(l quota.ListerForResourceFunc, resource schema.GroupVersionResource) ListFuncByNamespace { return func(namespace string) ([]runtime.Object, error) { diff --git a/pkg/quota/v1/generic/evaluator_test.go b/pkg/quota/v1/generic/evaluator_test.go new file mode 100644 index 0000000000..ce97b1bdf2 --- /dev/null +++ b/pkg/quota/v1/generic/evaluator_test.go @@ -0,0 +1,131 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic + +import ( + "errors" + "testing" + + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" +) + +func TestCachedHasSynced(t *testing.T) { + + called := 0 + result := false + cachedFunc := cachedHasSynced(func() bool { + called++ + return result + }) + + if cachedFunc() { + t.Fatal("expected false") + } + if called != 1 { + t.Fatalf("expected called=1, got %d", called) + } + + if cachedFunc() { + t.Fatal("expected false") + } + if called != 2 { + t.Fatalf("expected called=2, got %d", called) + } + + result = true + if !cachedFunc() { + t.Fatal("expected true") + } + if called != 3 { + t.Fatalf("expected called=3, got %d", called) + } + + if !cachedFunc() { + t.Fatal("expected true") + } + if called != 3 { + // no more calls once we return true + t.Fatalf("expected called=3, got %d", called) + } +} + +func TestProtectedLister(t *testing.T) { + + hasSynced := false + notReadyErr := errors.New("not ready") + fake := &fakeLister{} + l := &protectedLister{ + hasSynced: func() bool { return hasSynced }, + notReadyErr: notReadyErr, + delegate: fake, + } + if _, err := l.List(nil); err != notReadyErr { + t.Fatalf("expected %v, got %v", notReadyErr, err) + } + if _, err := l.Get(""); err != notReadyErr { + t.Fatalf("expected %v, got %v", notReadyErr, err) + } + if fake.called != 0 { + t.Fatalf("expected called=0, got %d", fake.called) + } + fake.called = 0 + + hasSynced = true + + if _, err := l.List(nil); err != errFakeLister { + t.Fatalf("expected %v, got %v", errFakeLister, err) + } + if _, err := l.Get(""); err != errFakeLister { + t.Fatalf("expected %v, got %v", errFakeLister, err) + } + if fake.called != 2 { + t.Fatalf("expected called=2, got %d", fake.called) + } + fake.called = 0 + + hasSynced = false + + if _, err := l.List(nil); err != notReadyErr { + t.Fatalf("expected %v, got %v", 
notReadyErr, err) + } + if _, err := l.Get(""); err != notReadyErr { + t.Fatalf("expected %v, got %v", notReadyErr, err) + } + if fake.called != 0 { + t.Fatalf("expected called=0, got %d", fake.called) + } +} + +var errFakeLister = errors.New("errFakeLister") + +type fakeLister struct { + called int +} + +func (f *fakeLister) List(selector labels.Selector) (ret []runtime.Object, err error) { + f.called++ + return nil, errFakeLister +} +func (f *fakeLister) Get(name string) (runtime.Object, error) { + f.called++ + return nil, errFakeLister +} +func (f *fakeLister) ByNamespace(namespace string) cache.GenericNamespaceLister { + panic("not implemented") +} From e5f7af70583b758978c819cd5c0c6a3ddb6fee28 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Wed, 13 Mar 2019 18:52:39 -0700 Subject: [PATCH 6/9] Improve quota sync log messages --- pkg/controller/resourcequota/BUILD | 1 + .../resource_quota_controller.go | 31 ++++++++++++++++--- .../resourcequota/resource_quota_monitor.go | 4 ++- 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/pkg/controller/resourcequota/BUILD b/pkg/controller/resourcequota/BUILD index 9ee06965e8..08f7bacf89 100644 --- a/pkg/controller/resourcequota/BUILD +++ b/pkg/controller/resourcequota/BUILD @@ -29,6 +29,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/discovery:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index 948b31814f..4447cf5de4 100644 --- a/pkg/controller/resourcequota/resource_quota_controller.go
+++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" "k8s.io/client-go/informers" @@ -423,15 +424,16 @@ func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, p return } - // Something has changed, so track the new state and perform a sync. - klog.V(2).Infof("syncing resource quota controller with updated resources from discovery: %v", newResources) - oldResources = newResources - // Ensure workers are paused to avoid processing events before informers // have resynced. rq.workerLock.Lock() defer rq.workerLock.Unlock() + // Something has changed, so track the new state and perform a sync. + if klog.V(2) { + klog.Infof("syncing resource quota controller with updated resources from discovery: %s", printDiff(oldResources, newResources)) + } + // Perform the monitor resync and wait for controllers to report cache sync. 
if err := rq.resyncMonitors(newResources); err != nil { utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err)) @@ -440,9 +442,30 @@ func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, p if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", stopCh, rq.quotaMonitor.IsSynced) { utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync")) } + + // success, remember newly synced resources + oldResources = newResources + klog.V(2).Infof("synced quota controller") }, period, stopCh) } +// printDiff returns a human-readable summary of what resources were added and removed +func printDiff(oldResources, newResources map[schema.GroupVersionResource]struct{}) string { + removed := sets.NewString() + for oldResource := range oldResources { + if _, ok := newResources[oldResource]; !ok { + removed.Insert(fmt.Sprintf("%+v", oldResource)) + } + } + added := sets.NewString() + for newResource := range newResources { + if _, ok := oldResources[newResource]; !ok { + added.Insert(fmt.Sprintf("%+v", newResource)) + } + } + return fmt.Sprintf("added: %v, removed: %v", added.List(), removed.List()) +} + // resyncMonitors starts or stops quota monitors as needed to ensure that all // (and only) those resources present in the map are monitored. 
func (rq *ResourceQuotaController) resyncMonitors(resources map[schema.GroupVersionResource]struct{}) error { diff --git a/pkg/controller/resourcequota/resource_quota_monitor.go b/pkg/controller/resourcequota/resource_quota_monitor.go index c3d9e1f3e6..08dcc57e69 100644 --- a/pkg/controller/resourcequota/resource_quota_monitor.go +++ b/pkg/controller/resourcequota/resource_quota_monitor.go @@ -284,11 +284,13 @@ func (qm *QuotaMonitor) IsSynced() bool { defer qm.monitorLock.Unlock() if len(qm.monitors) == 0 { + klog.V(4).Info("quota monitor not synced: no monitors") return false } - for _, monitor := range qm.monitors { + for resource, monitor := range qm.monitors { if !monitor.controller.HasSynced() { + klog.V(4).Infof("quota monitor not synced: %v", resource) return false } } From 739df5452a407636a927ac75395784d5bcca9460 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Wed, 13 Mar 2019 18:56:39 -0700 Subject: [PATCH 7/9] Avoid deadlock in resource quota resync --- pkg/controller/resourcequota/BUILD | 1 + .../resource_quota_controller.go | 20 +- .../resource_quota_controller_test.go | 263 +++++++++++++++++- 3 files changed, 279 insertions(+), 5 deletions(-) diff --git a/pkg/controller/resourcequota/BUILD b/pkg/controller/resourcequota/BUILD index 08f7bacf89..2728d1cb96 100644 --- a/pkg/controller/resourcequota/BUILD +++ b/pkg/controller/resourcequota/BUILD @@ -60,6 +60,7 @@ go_test( "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", ], diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index 4447cf5de4..e87e4cf49a 100644 --- 
a/pkg/controller/resourcequota/resource_quota_controller.go +++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -439,8 +439,13 @@ func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, p utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err)) return } - if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", stopCh, rq.quotaMonitor.IsSynced) { + // wait for caches to fill for a while (our sync period). + // this protects us from deadlocks where available resources changed and one of our informer caches will never fill. + // informers keep attempting to sync in the background, so retrying doesn't interrupt them. + // the call to resyncMonitors on the reattempt will no-op for resources that still exist. + if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", waitForStopOrTimeout(stopCh, period), rq.quotaMonitor.IsSynced) { utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync")) + return } // success, remember newly synced resources @@ -466,6 +471,19 @@ func printDiff(oldResources, newResources map[schema.GroupVersionResource]struct return fmt.Sprintf("added: %v, removed: %v", added.List(), removed.List()) } +// waitForStopOrTimeout returns a stop channel that closes when the provided stop channel closes or when the specified timeout is reached +func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} { + stopChWithTimeout := make(chan struct{}) + go func() { + defer close(stopChWithTimeout) + select { + case <-stopCh: + case <-time.After(timeout): + } + }() + return stopChWithTimeout +} + // resyncMonitors starts or stops quota monitors as needed to ensure that all // (and only) those resources present in the map are monitored. 
func (rq *ResourceQuotaController) resyncMonitors(resources map[schema.GroupVersionResource]struct{}) error { diff --git a/pkg/controller/resourcequota/resource_quota_controller_test.go b/pkg/controller/resourcequota/resource_quota_controller_test.go index 78ee509891..25fae49784 100644 --- a/pkg/controller/resourcequota/resource_quota_controller_test.go +++ b/pkg/controller/resourcequota/resource_quota_controller_test.go @@ -18,8 +18,12 @@ package resourcequota import ( "fmt" + "net/http" + "net/http/httptest" "strings" + "sync" "testing" + "time" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -30,6 +34,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/rest" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/controller" @@ -83,7 +88,7 @@ type quotaController struct { stop chan struct{} } -func setupQuotaController(t *testing.T, kubeClient kubernetes.Interface, lister quota.ListerForResourceFunc) quotaController { +func setupQuotaController(t *testing.T, kubeClient kubernetes.Interface, lister quota.ListerForResourceFunc, discoveryFunc NamespacedResourcesFunc) quotaController { informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) quotaConfiguration := install.NewQuotaConfigurationForControllers(lister) alwaysStarted := make(chan struct{}) @@ -94,9 +99,10 @@ func setupQuotaController(t *testing.T, kubeClient kubernetes.Interface, lister ResyncPeriod: controller.NoResyncPeriodFunc, ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, IgnoredResourcesFunc: quotaConfiguration.IgnoredResources, - DiscoveryFunc: mockDiscoveryFunc, + DiscoveryFunc: discoveryFunc, Registry: generic.NewRegistry(quotaConfiguration.Evaluators()), InformersStarted: alwaysStarted, + InformerFactory: informerFactory, } qc, err := NewResourceQuotaController(resourceQuotaControllerOptions) if err != nil { @@ -700,7 +706,7 
@@ func TestSyncResourceQuota(t *testing.T) { listersForResourceConfig := map[schema.GroupVersionResource]cache.GenericLister{ testCase.gvr: newGenericLister(testCase.gvr.GroupResource(), testCase.items), } - qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig)) + qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig), mockDiscoveryFunc) defer close(qc.stop) if err := qc.syncResourceQuota(&testCase.quota); err != nil { @@ -760,7 +766,7 @@ func TestAddQuota(t *testing.T) { gvr: newGenericLister(gvr.GroupResource(), newTestPods()), } - qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig)) + qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig), mockDiscoveryFunc) defer close(qc.stop) testCases := []struct { @@ -918,3 +924,252 @@ func TestAddQuota(t *testing.T) { } } } + +// TestDiscoverySync ensures that a discovery client error +// will not cause the quota controller to block infinitely. 
+func TestDiscoverySync(t *testing.T) { + serverResources := []*metav1.APIResourceList{ + { + GroupVersion: "v1", + APIResources: []metav1.APIResource{ + {Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}}, + }, + }, + } + unsyncableServerResources := []*metav1.APIResourceList{ + { + GroupVersion: "v1", + APIResources: []metav1.APIResource{ + {Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}}, + {Name: "secrets", Namespaced: true, Kind: "Secret", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}}, + }, + }, + } + fakeDiscoveryClient := &fakeServerResources{ + PreferredResources: serverResources, + Error: nil, + Lock: sync.Mutex{}, + InterfaceUsedCount: 0, + } + + testHandler := &fakeActionHandler{ + response: map[string]FakeResponse{ + "GET" + "/api/v1/pods": { + 200, + []byte("{}"), + }, + "GET" + "/api/v1/secrets": { + 404, + []byte("{}"), + }, + }, + } + + srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP) + defer srv.Close() + clientConfig.ContentConfig.NegotiatedSerializer = nil + kubeClient, err := kubernetes.NewForConfig(clientConfig) + if err != nil { + t.Fatal(err) + } + + pods := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} + secrets := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"} + listersForResourceConfig := map[schema.GroupVersionResource]cache.GenericLister{ + pods: newGenericLister(pods.GroupResource(), []runtime.Object{}), + secrets: newGenericLister(secrets.GroupResource(), []runtime.Object{}), + } + qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig), fakeDiscoveryClient.ServerPreferredNamespacedResources) + defer close(qc.stop) + + stopSync := make(chan struct{}) + defer close(stopSync) + // The pseudo-code of Sync(): + // Sync(client, period, stopCh): + // wait.Until() loops with `period` until the `stopCh` is closed : 
+ // GetQuotableResources() + // resyncMonitors() + // controller.WaitForCacheSync() loops with `syncedPollPeriod` (hardcoded to 100ms), until either its stop channel is closed after `period`, or all caches synced. + // + // Setting the period to 200ms allows the WaitForCacheSync() to check + // for cache sync ~2 times in every wait.Until() loop. + // + // The 1s sleep in the test allows GetQuotableResources and + // resyncMonitors to run ~5 times to ensure the changes to the + // fakeDiscoveryClient are picked up. + go qc.Sync(fakeDiscoveryClient.ServerPreferredNamespacedResources, 200*time.Millisecond, stopSync) + + // Wait until the sync discovers the initial resources + time.Sleep(1 * time.Second) + + err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock) + if err != nil { + t.Fatalf("Expected quotacontroller.Sync to be running but it is blocked: %v", err) + } + + // Simulate the discovery client returning an error + fakeDiscoveryClient.setPreferredResources(nil) + fakeDiscoveryClient.setError(fmt.Errorf("Error calling discoveryClient.ServerPreferredResources()")) + + // Wait until sync discovers the change + time.Sleep(1 * time.Second) + + // Remove the error from being returned and see if the quota sync is still working + fakeDiscoveryClient.setPreferredResources(serverResources) + fakeDiscoveryClient.setError(nil) + + err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock) + if err != nil { + t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err) + } + + // Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches + fakeDiscoveryClient.setPreferredResources(unsyncableServerResources) + fakeDiscoveryClient.setError(nil) + + // Wait until sync discovers the change + time.Sleep(1 * time.Second) + + // Put the resources back to normal and ensure quota sync recovers + fakeDiscoveryClient.setPreferredResources(serverResources) + fakeDiscoveryClient.setError(nil) + + 
err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock) + if err != nil { + t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err) + } +} + +// testServerAndClientConfig returns a server that listens and a config that can reference it +func testServerAndClientConfig(handler func(http.ResponseWriter, *http.Request)) (*httptest.Server, *rest.Config) { + srv := httptest.NewServer(http.HandlerFunc(handler)) + config := &rest.Config{ + Host: srv.URL, + } + return srv, config +} + +func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources, workerLock *sync.RWMutex) error { + before := fakeDiscoveryClient.getInterfaceUsedCount() + t := 1 * time.Second + time.Sleep(t) + after := fakeDiscoveryClient.getInterfaceUsedCount() + if before == after { + return fmt.Errorf("discoveryClient.ServerPreferredResources() called %d times over %v", after-before, t) + } + + workerLockAcquired := make(chan struct{}) + go func() { + workerLock.Lock() + workerLock.Unlock() + close(workerLockAcquired) + }() + select { + case <-workerLockAcquired: + return nil + case <-time.After(t): + return fmt.Errorf("workerLock blocked for at least %v", t) + } +} + +type fakeServerResources struct { + PreferredResources []*metav1.APIResourceList + Error error + Lock sync.Mutex + InterfaceUsedCount int +} + +func (_ *fakeServerResources) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) { + return nil, nil +} + +func (_ *fakeServerResources) ServerResources() ([]*metav1.APIResourceList, error) { + return nil, nil +} + +func (_ *fakeServerResources) ServerPreferredResources() ([]*metav1.APIResourceList, error) { + return nil, nil +} + +func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList) { + f.Lock.Lock() + defer f.Lock.Unlock() + f.PreferredResources = resources +} + +func (f *fakeServerResources) setError(err error) { + f.Lock.Lock() + defer f.Lock.Unlock() + f.Error = err +} + +func (f 
*fakeServerResources) getInterfaceUsedCount() int { + f.Lock.Lock() + defer f.Lock.Unlock() + return f.InterfaceUsedCount +} + +func (f *fakeServerResources) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) { + f.Lock.Lock() + defer f.Lock.Unlock() + f.InterfaceUsedCount++ + return f.PreferredResources, f.Error +} + +// fakeAction records information about requests to aid in testing. +type fakeAction struct { + method string + path string + query string +} + +// String returns method=path to aid in testing +func (f *fakeAction) String() string { + return strings.Join([]string{f.method, f.path}, "=") +} + +type FakeResponse struct { + statusCode int + content []byte +} + +// fakeActionHandler holds a list of fakeActions received +type fakeActionHandler struct { + // statusCode and content returned by this handler for different method + path. + response map[string]FakeResponse + + lock sync.Mutex + actions []fakeAction +} + +// ServeHTTP logs the action that occurred and always returns the associated status code +func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) { + func() { + f.lock.Lock() + defer f.lock.Unlock() + + f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path, query: request.URL.RawQuery}) + fakeResponse, ok := f.response[request.Method+request.URL.Path] + if !ok { + fakeResponse.statusCode = 200 + fakeResponse.content = []byte("{\"kind\": \"List\"}") + } + response.Header().Set("Content-Type", "application/json") + response.WriteHeader(fakeResponse.statusCode) + response.Write(fakeResponse.content) + }() + + // This is to allow the fakeActionHandler to simulate a watch being opened + if strings.Contains(request.URL.RawQuery, "watch=true") { + hijacker, ok := response.(http.Hijacker) + if !ok { + return + } + connection, _, err := hijacker.Hijack() + if err != nil { + return + } + defer connection.Close() + time.Sleep(30 * time.Second) + } +} From 
27cd2be49fdd6f5ae1866c47437e2c56a9616fea Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Wed, 13 Mar 2019 20:13:51 -0700 Subject: [PATCH 8/9] Update quota status with limits even when calculating errors --- pkg/controller/resourcequota/BUILD | 1 + .../resource_quota_controller.go | 16 +++- .../resource_quota_controller_test.go | 81 ++++++++++++++++- pkg/quota/v1/BUILD | 1 + pkg/quota/v1/resources.go | 53 ++++++++--- pkg/quota/v1/resources_test.go | 91 +++++++++++++++++++ 6 files changed, 225 insertions(+), 18 deletions(-) diff --git a/pkg/controller/resourcequota/BUILD b/pkg/controller/resourcequota/BUILD index 2728d1cb96..cb2d1beac0 100644 --- a/pkg/controller/resourcequota/BUILD +++ b/pkg/controller/resourcequota/BUILD @@ -54,6 +54,7 @@ go_test( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index e87e4cf49a..bac6b9ebda 100644 --- a/pkg/controller/resourcequota/resource_quota_controller.go +++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -30,6 +30,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" + utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -321,12 +322,12 @@ func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err err // syncResourceQuota runs a 
complete sync of resource quota status across all known kinds func (rq *ResourceQuotaController) syncResourceQuota(resourceQuota *v1.ResourceQuota) (err error) { // quota is dirty if any part of spec hard limits differs from the status hard limits - dirty := !apiequality.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard) + statusLimitsDirty := !apiequality.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard) // dirty tracks if the usage status differs from the previous sync, // if so, we send a new usage with latest status // if this is our first sync, it will be dirty by default, since we need track usage - dirty = dirty || resourceQuota.Status.Hard == nil || resourceQuota.Status.Used == nil + dirty := statusLimitsDirty || resourceQuota.Status.Hard == nil || resourceQuota.Status.Used == nil used := v1.ResourceList{} if resourceQuota.Status.Used != nil { @@ -334,9 +335,12 @@ func (rq *ResourceQuotaController) syncResourceQuota(resourceQuota *v1.ResourceQ } hardLimits := quota.Add(v1.ResourceList{}, resourceQuota.Spec.Hard) + errors := []error{} + newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry, resourceQuota.Spec.ScopeSelector) if err != nil { - return err + // if err is non-nil, remember it to return, but continue updating status with any resources in newUsage + errors = append(errors, err) } for key, value := range newUsage { used[key] = value @@ -359,9 +363,11 @@ func (rq *ResourceQuotaController) syncResourceQuota(resourceQuota *v1.ResourceQ // there was a change observed by this controller that requires we update quota if dirty { _, err = rq.rqClient.ResourceQuotas(usage.Namespace).UpdateStatus(usage) - return err + if err != nil { + errors = append(errors, err) + } } - return nil + return utilerrors.NewAggregate(errors) } // replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated diff --git 
a/pkg/controller/resourcequota/resource_quota_controller_test.go b/pkg/controller/resourcequota/resource_quota_controller_test.go index 25fae49784..1d92197b02 100644 --- a/pkg/controller/resourcequota/resource_quota_controller_test.go +++ b/pkg/controller/resourcequota/resource_quota_controller_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" @@ -83,6 +84,23 @@ func newGenericLister(groupResource schema.GroupResource, items []runtime.Object return cache.NewGenericLister(store, groupResource) } +func newErrorLister() cache.GenericLister { + return errorLister{} +} + +type errorLister struct { +} + +func (errorLister) List(selector labels.Selector) (ret []runtime.Object, err error) { + return nil, fmt.Errorf("error listing") +} +func (errorLister) Get(name string) (runtime.Object, error) { + return nil, fmt.Errorf("error getting") +} +func (errorLister) ByNamespace(namespace string) cache.GenericNamespaceLister { + return errorLister{} +} + type quotaController struct { *ResourceQuotaController stop chan struct{} @@ -205,9 +223,11 @@ func newTestPodsWithPriorityClasses() []runtime.Object { func TestSyncResourceQuota(t *testing.T) { testCases := map[string]struct { gvr schema.GroupVersionResource + errorGVR schema.GroupVersionResource items []runtime.Object quota v1.ResourceQuota status v1.ResourceQuotaStatus + expectedError string expectedActionSet sets.String }{ "non-matching-best-effort-scoped-quota": { @@ -699,18 +719,75 @@ func TestSyncResourceQuota(t *testing.T) { expectedActionSet: sets.NewString(), items: []runtime.Object{}, }, + "quota-missing-status-with-calculation-error": { + errorGVR: v1.SchemeGroupVersion.WithResource("pods"), + quota: v1.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: 
"rq", + }, + Spec: v1.ResourceQuotaSpec{ + Hard: v1.ResourceList{ + v1.ResourcePods: resource.MustParse("1"), + }, + }, + Status: v1.ResourceQuotaStatus{}, + }, + status: v1.ResourceQuotaStatus{ + Hard: v1.ResourceList{ + v1.ResourcePods: resource.MustParse("1"), + }, + }, + expectedError: "error listing", + expectedActionSet: sets.NewString("update-resourcequotas-status"), + items: []runtime.Object{}, + }, + "quota-missing-status-with-partial-calculation-error": { + gvr: v1.SchemeGroupVersion.WithResource("configmaps"), + errorGVR: v1.SchemeGroupVersion.WithResource("pods"), + quota: v1.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "rq", + }, + Spec: v1.ResourceQuotaSpec{ + Hard: v1.ResourceList{ + v1.ResourcePods: resource.MustParse("1"), + v1.ResourceConfigMaps: resource.MustParse("1"), + }, + }, + Status: v1.ResourceQuotaStatus{}, + }, + status: v1.ResourceQuotaStatus{ + Hard: v1.ResourceList{ + v1.ResourcePods: resource.MustParse("1"), + v1.ResourceConfigMaps: resource.MustParse("1"), + }, + Used: v1.ResourceList{ + v1.ResourceConfigMaps: resource.MustParse("0"), + }, + }, + expectedError: "error listing", + expectedActionSet: sets.NewString("update-resourcequotas-status"), + items: []runtime.Object{}, + }, } for testName, testCase := range testCases { kubeClient := fake.NewSimpleClientset(&testCase.quota) listersForResourceConfig := map[schema.GroupVersionResource]cache.GenericLister{ - testCase.gvr: newGenericLister(testCase.gvr.GroupResource(), testCase.items), + testCase.gvr: newGenericLister(testCase.gvr.GroupResource(), testCase.items), + testCase.errorGVR: newErrorLister(), } qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig), mockDiscoveryFunc) defer close(qc.stop) if err := qc.syncResourceQuota(&testCase.quota); err != nil { - t.Fatalf("test: %s, unexpected error: %v", testName, err) + if len(testCase.expectedError) == 0 || !strings.Contains(err.Error(), 
testCase.expectedError) { + t.Fatalf("test: %s, unexpected error: %v", testName, err) + } + } else if len(testCase.expectedError) > 0 { + t.Fatalf("test: %s, expected error %q, got none", testName, testCase.expectedError) } actionSet := sets.NewString() diff --git a/pkg/quota/v1/BUILD b/pkg/quota/v1/BUILD index 7b3cb195b7..fe9f7b579d 100644 --- a/pkg/quota/v1/BUILD +++ b/pkg/quota/v1/BUILD @@ -18,6 +18,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", diff --git a/pkg/quota/v1/resources.go b/pkg/quota/v1/resources.go index b6aa3210d4..86984fc1c6 100644 --- a/pkg/quota/v1/resources.go +++ b/pkg/quota/v1/resources.go @@ -17,10 +17,12 @@ limitations under the License. 
package quota import ( + "sort" "strings" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" ) @@ -186,7 +188,12 @@ func ResourceNames(resources corev1.ResourceList) []corev1.ResourceName { // Contains returns true if the specified item is in the list of items func Contains(items []corev1.ResourceName, item corev1.ResourceName) bool { - return ToSet(items).Has(string(item)) + for _, i := range items { + if i == item { + return true + } + } + return false } // ContainsPrefix returns true if the specified item has a prefix that contained in given prefix Set @@ -199,15 +206,32 @@ func ContainsPrefix(prefixSet []string, item corev1.ResourceName) bool { return false } -// Intersection returns the intersection of both list of resources +// Intersection returns the intersection of both list of resources, deduped and sorted func Intersection(a []corev1.ResourceName, b []corev1.ResourceName) []corev1.ResourceName { - setA := ToSet(a) - setB := ToSet(b) - setC := setA.Intersection(setB) - result := []corev1.ResourceName{} - for _, resourceName := range setC.List() { - result = append(result, corev1.ResourceName(resourceName)) + result := make([]corev1.ResourceName, 0, len(a)) + for _, item := range a { + if Contains(result, item) { + continue + } + if !Contains(b, item) { + continue + } + result = append(result, item) } + sort.Slice(result, func(i, j int) bool { return result[i] < result[j] }) + return result +} + +// Difference returns the list of resources resulting from a-b, deduped and sorted +func Difference(a []corev1.ResourceName, b []corev1.ResourceName) []corev1.ResourceName { + result := make([]corev1.ResourceName, 0, len(a)) + for _, item := range a { + if Contains(b, item) || Contains(result, item) { + continue + } + result = append(result, item) + } + sort.Slice(result, func(i, j int) bool { return result[i] < result[j] }) return result } @@ -243,7 +267,8 @@ 
func ToSet(resourceNames []corev1.ResourceName) sets.String { return result } -// CalculateUsage calculates and returns the requested ResourceList usage +// CalculateUsage calculates and returns the requested ResourceList usage. +// If an error is returned, usage only contains the resources which encountered no calculation errors. func CalculateUsage(namespaceName string, scopes []corev1.ResourceQuotaScope, hardLimits corev1.ResourceList, registry Registry, scopeSelector *corev1.ScopeSelector) (corev1.ResourceList, error) { // find the intersection between the hard resources on the quota // and the resources this controller can track to know what we can @@ -257,6 +282,8 @@ func CalculateUsage(namespaceName string, scopes []corev1.ResourceQuotaScope, ha // NOTE: the intersection just removes duplicates since the evaluator match intersects with hard matchedResources := Intersection(hardResources, potentialResources) + errors := []error{} + // sum the observed usage from each evaluator newUsage := corev1.ResourceList{} for _, evaluator := range evaluators { @@ -269,7 +296,11 @@ func CalculateUsage(namespaceName string, scopes []corev1.ResourceQuotaScope, ha usageStatsOptions := UsageStatsOptions{Namespace: namespaceName, Scopes: scopes, Resources: intersection, ScopeSelector: scopeSelector} stats, err := evaluator.UsageStats(usageStatsOptions) if err != nil { - return nil, err + // remember the error + errors = append(errors, err) + // exclude resources which encountered calculation errors + matchedResources = Difference(matchedResources, intersection) + continue } newUsage = Add(newUsage, stats.Used) } @@ -278,5 +309,5 @@ func CalculateUsage(namespaceName string, scopes []corev1.ResourceQuotaScope, ha // merge our observed usage with the quota usage status // if the new usage is different than the last usage, we will need to do an update newUsage = Mask(newUsage, matchedResources) - return newUsage, nil + return newUsage, utilerrors.NewAggregate(errors) } diff --git 
a/pkg/quota/v1/resources_test.go b/pkg/quota/v1/resources_test.go index 61175c706e..910c2f5112 100644 --- a/pkg/quota/v1/resources_test.go +++ b/pkg/quota/v1/resources_test.go @@ -17,6 +17,7 @@ limitations under the License. package quota import ( + "reflect" "testing" corev1 "k8s.io/api/core/v1" @@ -319,3 +320,93 @@ func TestIsNegative(t *testing.T) { } } } + +func TestIntersection(t *testing.T) { + testCases := map[string]struct { + a []corev1.ResourceName + b []corev1.ResourceName + expected []corev1.ResourceName + }{ + "empty": { + a: []corev1.ResourceName{}, + b: []corev1.ResourceName{}, + expected: []corev1.ResourceName{}, + }, + "equal": { + a: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory}, + b: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory}, + expected: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory}, + }, + "a has extra": { + a: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory}, + b: []corev1.ResourceName{corev1.ResourceCPU}, + expected: []corev1.ResourceName{corev1.ResourceCPU}, + }, + "b has extra": { + a: []corev1.ResourceName{corev1.ResourceCPU}, + b: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory}, + expected: []corev1.ResourceName{corev1.ResourceCPU}, + }, + "dedupes": { + a: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceCPU, corev1.ResourceMemory, corev1.ResourceMemory}, + b: []corev1.ResourceName{corev1.ResourceCPU}, + expected: []corev1.ResourceName{corev1.ResourceCPU}, + }, + "sorts": { + a: []corev1.ResourceName{corev1.ResourceMemory, corev1.ResourceMemory, corev1.ResourceCPU, corev1.ResourceCPU}, + b: []corev1.ResourceName{corev1.ResourceMemory, corev1.ResourceMemory, corev1.ResourceCPU, corev1.ResourceCPU}, + expected: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory}, + }, + } + for testName, testCase := range testCases { + actual := Intersection(testCase.a, testCase.b) + if !reflect.DeepEqual(actual, testCase.expected) { + 
t.Errorf("%s expected: %#v, actual: %#v", testName, testCase.expected, actual) + } + } +} + +func TestDifference(t *testing.T) { + testCases := map[string]struct { + a []corev1.ResourceName + b []corev1.ResourceName + expected []corev1.ResourceName + }{ + "empty": { + a: []corev1.ResourceName{}, + b: []corev1.ResourceName{}, + expected: []corev1.ResourceName{}, + }, + "equal": { + a: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory}, + b: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory}, + expected: []corev1.ResourceName{}, + }, + "a has extra": { + a: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory}, + b: []corev1.ResourceName{corev1.ResourceCPU}, + expected: []corev1.ResourceName{corev1.ResourceMemory}, + }, + "b has extra": { + a: []corev1.ResourceName{corev1.ResourceCPU}, + b: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory}, + expected: []corev1.ResourceName{}, + }, + "dedupes": { + a: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceCPU, corev1.ResourceMemory, corev1.ResourceMemory}, + b: []corev1.ResourceName{corev1.ResourceCPU}, + expected: []corev1.ResourceName{corev1.ResourceMemory}, + }, + "sorts": { + a: []corev1.ResourceName{corev1.ResourceMemory, corev1.ResourceMemory, corev1.ResourceCPU, corev1.ResourceCPU}, + b: []corev1.ResourceName{}, + expected: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory}, + }, + } + for testName, testCase := range testCases { + actual := Difference(testCase.a, testCase.b) + if !reflect.DeepEqual(actual, testCase.expected) { + t.Errorf("%s expected: %#v, actual: %#v", testName, testCase.expected, actual) + } + } +} From bef996d0a4e7a8ca887f1a6aa8165daf200fa016 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Wed, 13 Mar 2019 20:53:16 -0700 Subject: [PATCH 9/9] Only reject quota admission if status is missing relevant usage --- .../admission/resourcequota/admission_test.go | 21 ++++++++++++++++++- 
.../pkg/admission/resourcequota/controller.go | 12 +++++++---- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/plugin/pkg/admission/resourcequota/admission_test.go b/plugin/pkg/admission/resourcequota/admission_test.go index 5de5a0eab4..8e0600c75a 100644 --- a/plugin/pkg/admission/resourcequota/admission_test.go +++ b/plugin/pkg/admission/resourcequota/admission_test.go @@ -1098,10 +1098,12 @@ func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) { func TestHasUsageStats(t *testing.T) { testCases := map[string]struct { a corev1.ResourceQuota + relevant []corev1.ResourceName expected bool }{ "empty": { a: corev1.ResourceQuota{Status: corev1.ResourceQuotaStatus{Hard: corev1.ResourceList{}}}, + relevant: []corev1.ResourceName{corev1.ResourceMemory}, expected: true, }, "hard-only": { @@ -1113,6 +1115,7 @@ func TestHasUsageStats(t *testing.T) { Used: corev1.ResourceList{}, }, }, + relevant: []corev1.ResourceName{corev1.ResourceMemory}, expected: false, }, "hard-used": { @@ -1126,11 +1129,27 @@ func TestHasUsageStats(t *testing.T) { }, }, }, + relevant: []corev1.ResourceName{corev1.ResourceMemory}, + expected: true, + }, + "hard-used-relevant": { + a: corev1.ResourceQuota{ + Status: corev1.ResourceQuotaStatus{ + Hard: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("1Gi"), + corev1.ResourcePods: resource.MustParse("1"), + }, + Used: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("500Mi"), + }, + }, + }, + relevant: []corev1.ResourceName{corev1.ResourceMemory}, expected: true, }, } for testName, testCase := range testCases { - if result := hasUsageStats(&testCase.a); result != testCase.expected { + if result := hasUsageStats(&testCase.a, testCase.relevant); result != testCase.expected { t.Errorf("%s expected: %v, actual: %v", testName, testCase.expected, result) } } diff --git a/plugin/pkg/admission/resourcequota/controller.go b/plugin/pkg/admission/resourcequota/controller.go index 1c109d2713..688e2bace1 100644 
--- a/plugin/pkg/admission/resourcequota/controller.go +++ b/plugin/pkg/admission/resourcequota/controller.go @@ -460,8 +460,8 @@ func CheckRequest(quotas []corev1.ResourceQuota, a admission.Attributes, evaluat if err := evaluator.Constraints(restrictedResources, inputObject); err != nil { return nil, admission.NewForbidden(a, fmt.Errorf("failed quota: %s: %v", resourceQuota.Name, err)) } - if !hasUsageStats(&resourceQuota) { - return nil, admission.NewForbidden(a, fmt.Errorf("status unknown for quota: %s", resourceQuota.Name)) + if !hasUsageStats(&resourceQuota, restrictedResources) { + return nil, admission.NewForbidden(a, fmt.Errorf("status unknown for quota: %s, resources: %s", resourceQuota.Name, prettyPrintResourceNames(restrictedResources))) } interestingQuotaIndexes = append(interestingQuotaIndexes, i) localRestrictedResourcesSet := quota.ToSet(restrictedResources) @@ -702,9 +702,13 @@ func prettyPrintResourceNames(a []corev1.ResourceName) string { return strings.Join(values, ",") } -// hasUsageStats returns true if for each hard constraint there is a value for its current usage -func hasUsageStats(resourceQuota *corev1.ResourceQuota) bool { +// hasUsageStats returns true if for each hard constraint in interestingResources there is a value for its current usage +func hasUsageStats(resourceQuota *corev1.ResourceQuota, interestingResources []corev1.ResourceName) bool { + interestingSet := quota.ToSet(interestingResources) for resourceName := range resourceQuota.Status.Hard { + if !interestingSet.Has(string(resourceName)) { + continue + } if _, found := resourceQuota.Status.Used[resourceName]; !found { return false }