Merge pull request #74747 from liggitt/quota-deadlock

quota controller fixes
k3s-v1.15.3
Kubernetes Prow Robot 2019-03-27 09:04:48 -07:00 committed by GitHub
commit a8cbb22506
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1304 additions and 42 deletions

View File

@ -873,7 +873,6 @@ func TestGarbageCollectorSync(t *testing.T) {
go gc.Sync(fakeDiscoveryClient, 200*time.Millisecond, stopCh)
// Wait until the sync discovers the initial resources
fmt.Printf("Test output")
time.Sleep(1 * time.Second)
err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)

View File

@ -29,6 +29,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/discovery:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
@ -53,12 +54,14 @@ go_test(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
],

View File

@ -30,7 +30,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
@ -320,12 +322,12 @@ func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err err
// syncResourceQuota runs a complete sync of resource quota status across all known kinds
func (rq *ResourceQuotaController) syncResourceQuota(resourceQuota *v1.ResourceQuota) (err error) {
// quota is dirty if any part of spec hard limits differs from the status hard limits
dirty := !apiequality.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard)
statusLimitsDirty := !apiequality.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard)
// dirty tracks if the usage status differs from the previous sync,
// if so, we send a new usage with latest status
// if this is our first sync, it will be dirty by default, since we need track usage
dirty = dirty || resourceQuota.Status.Hard == nil || resourceQuota.Status.Used == nil
dirty := statusLimitsDirty || resourceQuota.Status.Hard == nil || resourceQuota.Status.Used == nil
used := v1.ResourceList{}
if resourceQuota.Status.Used != nil {
@ -333,9 +335,12 @@ func (rq *ResourceQuotaController) syncResourceQuota(resourceQuota *v1.ResourceQ
}
hardLimits := quota.Add(v1.ResourceList{}, resourceQuota.Spec.Hard)
errors := []error{}
newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry, resourceQuota.Spec.ScopeSelector)
if err != nil {
return err
// if err is non-nil, remember it to return, but continue updating status with any resources in newUsage
errors = append(errors, err)
}
for key, value := range newUsage {
used[key] = value
@ -358,9 +363,11 @@ func (rq *ResourceQuotaController) syncResourceQuota(resourceQuota *v1.ResourceQ
// there was a change observed by this controller that requires we update quota
if dirty {
_, err = rq.rqClient.ResourceQuotas(usage.Namespace).UpdateStatus(usage)
return err
if err != nil {
errors = append(errors, err)
}
}
return nil
return utilerrors.NewAggregate(errors)
}
// replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
@ -423,26 +430,66 @@ func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, p
return
}
// Something has changed, so track the new state and perform a sync.
klog.V(2).Infof("syncing resource quota controller with updated resources from discovery: %v", newResources)
oldResources = newResources
// Ensure workers are paused to avoid processing events before informers
// have resynced.
rq.workerLock.Lock()
defer rq.workerLock.Unlock()
// Something has changed, so track the new state and perform a sync.
if klog.V(2) {
klog.Infof("syncing resource quota controller with updated resources from discovery: %s", printDiff(oldResources, newResources))
}
// Perform the monitor resync and wait for controllers to report cache sync.
if err := rq.resyncMonitors(newResources); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
return
}
if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", stopCh, rq.quotaMonitor.IsSynced) {
// wait for caches to fill for a while (our sync period).
// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", waitForStopOrTimeout(stopCh, period), rq.quotaMonitor.IsSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync"))
return
}
// success, remember newly synced resources
oldResources = newResources
klog.V(2).Infof("synced quota controller")
}, period, stopCh)
}
// printDiff returns a human-readable summary of what resources were added and removed
func printDiff(oldResources, newResources map[schema.GroupVersionResource]struct{}) string {
removed := sets.NewString()
for oldResource := range oldResources {
if _, ok := newResources[oldResource]; !ok {
removed.Insert(fmt.Sprintf("%+v", oldResource))
}
}
added := sets.NewString()
for newResource := range newResources {
if _, ok := oldResources[newResource]; !ok {
added.Insert(fmt.Sprintf("%+v", newResource))
}
}
return fmt.Sprintf("added: %v, removed: %v", added.List(), removed.List())
}
// waitForStopOrTimeout returns a stop channel that closes when the provided stop channel closes or when the specified timeout is reached
func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
stopChWithTimeout := make(chan struct{})
go func() {
defer close(stopChWithTimeout)
select {
case <-stopCh:
case <-time.After(timeout):
}
}()
return stopChWithTimeout
}
// resyncMonitors starts or stops quota monitors as needed to ensure that all
// (and only) those resources present in the map are monitored.
func (rq *ResourceQuotaController) resyncMonitors(resources map[schema.GroupVersionResource]struct{}) error {

View File

@ -18,18 +18,24 @@ package resourcequota
import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/controller"
@ -78,12 +84,29 @@ func newGenericLister(groupResource schema.GroupResource, items []runtime.Object
return cache.NewGenericLister(store, groupResource)
}
func newErrorLister() cache.GenericLister {
return errorLister{}
}
type errorLister struct {
}
func (errorLister) List(selector labels.Selector) (ret []runtime.Object, err error) {
return nil, fmt.Errorf("error listing")
}
func (errorLister) Get(name string) (runtime.Object, error) {
return nil, fmt.Errorf("error getting")
}
func (errorLister) ByNamespace(namespace string) cache.GenericNamespaceLister {
return errorLister{}
}
type quotaController struct {
*ResourceQuotaController
stop chan struct{}
}
func setupQuotaController(t *testing.T, kubeClient kubernetes.Interface, lister quota.ListerForResourceFunc) quotaController {
func setupQuotaController(t *testing.T, kubeClient kubernetes.Interface, lister quota.ListerForResourceFunc, discoveryFunc NamespacedResourcesFunc) quotaController {
informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
quotaConfiguration := install.NewQuotaConfigurationForControllers(lister)
alwaysStarted := make(chan struct{})
@ -94,9 +117,10 @@ func setupQuotaController(t *testing.T, kubeClient kubernetes.Interface, lister
ResyncPeriod: controller.NoResyncPeriodFunc,
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
IgnoredResourcesFunc: quotaConfiguration.IgnoredResources,
DiscoveryFunc: mockDiscoveryFunc,
DiscoveryFunc: discoveryFunc,
Registry: generic.NewRegistry(quotaConfiguration.Evaluators()),
InformersStarted: alwaysStarted,
InformerFactory: informerFactory,
}
qc, err := NewResourceQuotaController(resourceQuotaControllerOptions)
if err != nil {
@ -199,9 +223,11 @@ func newTestPodsWithPriorityClasses() []runtime.Object {
func TestSyncResourceQuota(t *testing.T) {
testCases := map[string]struct {
gvr schema.GroupVersionResource
errorGVR schema.GroupVersionResource
items []runtime.Object
quota v1.ResourceQuota
status v1.ResourceQuotaStatus
expectedError string
expectedActionSet sets.String
}{
"non-matching-best-effort-scoped-quota": {
@ -693,18 +719,75 @@ func TestSyncResourceQuota(t *testing.T) {
expectedActionSet: sets.NewString(),
items: []runtime.Object{},
},
"quota-missing-status-with-calculation-error": {
errorGVR: v1.SchemeGroupVersion.WithResource("pods"),
quota: v1.ResourceQuota{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "rq",
},
Spec: v1.ResourceQuotaSpec{
Hard: v1.ResourceList{
v1.ResourcePods: resource.MustParse("1"),
},
},
Status: v1.ResourceQuotaStatus{},
},
status: v1.ResourceQuotaStatus{
Hard: v1.ResourceList{
v1.ResourcePods: resource.MustParse("1"),
},
},
expectedError: "error listing",
expectedActionSet: sets.NewString("update-resourcequotas-status"),
items: []runtime.Object{},
},
"quota-missing-status-with-partial-calculation-error": {
gvr: v1.SchemeGroupVersion.WithResource("configmaps"),
errorGVR: v1.SchemeGroupVersion.WithResource("pods"),
quota: v1.ResourceQuota{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "rq",
},
Spec: v1.ResourceQuotaSpec{
Hard: v1.ResourceList{
v1.ResourcePods: resource.MustParse("1"),
v1.ResourceConfigMaps: resource.MustParse("1"),
},
},
Status: v1.ResourceQuotaStatus{},
},
status: v1.ResourceQuotaStatus{
Hard: v1.ResourceList{
v1.ResourcePods: resource.MustParse("1"),
v1.ResourceConfigMaps: resource.MustParse("1"),
},
Used: v1.ResourceList{
v1.ResourceConfigMaps: resource.MustParse("0"),
},
},
expectedError: "error listing",
expectedActionSet: sets.NewString("update-resourcequotas-status"),
items: []runtime.Object{},
},
}
for testName, testCase := range testCases {
kubeClient := fake.NewSimpleClientset(&testCase.quota)
listersForResourceConfig := map[schema.GroupVersionResource]cache.GenericLister{
testCase.gvr: newGenericLister(testCase.gvr.GroupResource(), testCase.items),
testCase.gvr: newGenericLister(testCase.gvr.GroupResource(), testCase.items),
testCase.errorGVR: newErrorLister(),
}
qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig))
qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig), mockDiscoveryFunc)
defer close(qc.stop)
if err := qc.syncResourceQuota(&testCase.quota); err != nil {
t.Fatalf("test: %s, unexpected error: %v", testName, err)
if len(testCase.expectedError) == 0 || !strings.Contains(err.Error(), testCase.expectedError) {
t.Fatalf("test: %s, unexpected error: %v", testName, err)
}
} else if len(testCase.expectedError) > 0 {
t.Fatalf("test: %s, expected error %q, got none", testName, testCase.expectedError)
}
actionSet := sets.NewString()
@ -715,8 +798,17 @@ func TestSyncResourceQuota(t *testing.T) {
t.Errorf("test: %s,\nExpected actions:\n%v\n but got:\n%v\nDifference:\n%v", testName, testCase.expectedActionSet, actionSet, testCase.expectedActionSet.Difference(actionSet))
}
lastActionIndex := len(kubeClient.Actions()) - 1
usage := kubeClient.Actions()[lastActionIndex].(core.UpdateAction).GetObject().(*v1.ResourceQuota)
var usage *v1.ResourceQuota
actions := kubeClient.Actions()
for i := len(actions) - 1; i >= 0; i-- {
if updateAction, ok := actions[i].(core.UpdateAction); ok {
usage = updateAction.GetObject().(*v1.ResourceQuota)
break
}
}
if usage == nil {
t.Errorf("test: %s,\nExpected update action usage, got none: actions:\n%v", testName, actions)
}
// ensure usage is as expected
if len(usage.Status.Hard) != len(testCase.status.Hard) {
@ -751,7 +843,7 @@ func TestAddQuota(t *testing.T) {
gvr: newGenericLister(gvr.GroupResource(), newTestPods()),
}
qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig))
qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig), mockDiscoveryFunc)
defer close(qc.stop)
testCases := []struct {
@ -909,3 +1001,252 @@ func TestAddQuota(t *testing.T) {
}
}
}
// TestDiscoverySync ensures that a discovery client error
// will not cause the quota controller to block infinitely.
func TestDiscoverySync(t *testing.T) {
serverResources := []*metav1.APIResourceList{
{
GroupVersion: "v1",
APIResources: []metav1.APIResource{
{Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
},
},
}
unsyncableServerResources := []*metav1.APIResourceList{
{
GroupVersion: "v1",
APIResources: []metav1.APIResource{
{Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
{Name: "secrets", Namespaced: true, Kind: "Secret", Verbs: metav1.Verbs{"create", "delete", "list", "watch"}},
},
},
}
fakeDiscoveryClient := &fakeServerResources{
PreferredResources: serverResources,
Error: nil,
Lock: sync.Mutex{},
InterfaceUsedCount: 0,
}
testHandler := &fakeActionHandler{
response: map[string]FakeResponse{
"GET" + "/api/v1/pods": {
200,
[]byte("{}"),
},
"GET" + "/api/v1/secrets": {
404,
[]byte("{}"),
},
},
}
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
defer srv.Close()
clientConfig.ContentConfig.NegotiatedSerializer = nil
kubeClient, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
t.Fatal(err)
}
pods := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
secrets := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}
listersForResourceConfig := map[schema.GroupVersionResource]cache.GenericLister{
pods: newGenericLister(pods.GroupResource(), []runtime.Object{}),
secrets: newGenericLister(secrets.GroupResource(), []runtime.Object{}),
}
qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig), fakeDiscoveryClient.ServerPreferredNamespacedResources)
defer close(qc.stop)
stopSync := make(chan struct{})
defer close(stopSync)
// The pseudo-code of Sync():
// Sync(client, period, stopCh):
// wait.Until() loops with `period` until the `stopCh` is closed :
// GetQuotableResources()
// resyncMonitors()
// controller.WaitForCacheSync() loops with `syncedPollPeriod` (hardcoded to 100ms), until either its stop channel is closed after `period`, or all caches synced.
//
// Setting the period to 200ms allows the WaitForCacheSync() to check
// for cache sync ~2 times in every wait.Until() loop.
//
// The 1s sleep in the test allows GetQuotableResources and
// resyncMonitors to run ~5 times to ensure the changes to the
// fakeDiscoveryClient are picked up.
go qc.Sync(fakeDiscoveryClient.ServerPreferredNamespacedResources, 200*time.Millisecond, stopSync)
// Wait until the sync discovers the initial resources
time.Sleep(1 * time.Second)
err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
if err != nil {
t.Fatalf("Expected quotacontroller.Sync to be running but it is blocked: %v", err)
}
// Simulate the discovery client returning an error
fakeDiscoveryClient.setPreferredResources(nil)
fakeDiscoveryClient.setError(fmt.Errorf("Error calling discoveryClient.ServerPreferredResources()"))
// Wait until sync discovers the change
time.Sleep(1 * time.Second)
// Remove the error from being returned and see if the quota sync is still working
fakeDiscoveryClient.setPreferredResources(serverResources)
fakeDiscoveryClient.setError(nil)
err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
if err != nil {
t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
}
// Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches
fakeDiscoveryClient.setPreferredResources(unsyncableServerResources)
fakeDiscoveryClient.setError(nil)
// Wait until sync discovers the change
time.Sleep(1 * time.Second)
// Put the resources back to normal and ensure quota sync recovers
fakeDiscoveryClient.setPreferredResources(serverResources)
fakeDiscoveryClient.setError(nil)
err = expectSyncNotBlocked(fakeDiscoveryClient, &qc.workerLock)
if err != nil {
t.Fatalf("Expected quotacontroller.Sync to still be running but it is blocked: %v", err)
}
}
// testServerAndClientConfig returns a server that listens and a config that can reference it
func testServerAndClientConfig(handler func(http.ResponseWriter, *http.Request)) (*httptest.Server, *rest.Config) {
srv := httptest.NewServer(http.HandlerFunc(handler))
config := &rest.Config{
Host: srv.URL,
}
return srv, config
}
func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources, workerLock *sync.RWMutex) error {
before := fakeDiscoveryClient.getInterfaceUsedCount()
t := 1 * time.Second
time.Sleep(t)
after := fakeDiscoveryClient.getInterfaceUsedCount()
if before == after {
return fmt.Errorf("discoveryClient.ServerPreferredResources() called %d times over %v", after-before, t)
}
workerLockAcquired := make(chan struct{})
go func() {
workerLock.Lock()
workerLock.Unlock()
close(workerLockAcquired)
}()
select {
case <-workerLockAcquired:
return nil
case <-time.After(t):
return fmt.Errorf("workerLock blocked for at least %v", t)
}
}
type fakeServerResources struct {
PreferredResources []*metav1.APIResourceList
Error error
Lock sync.Mutex
InterfaceUsedCount int
}
func (_ *fakeServerResources) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
return nil, nil
}
func (_ *fakeServerResources) ServerResources() ([]*metav1.APIResourceList, error) {
return nil, nil
}
func (_ *fakeServerResources) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
return nil, nil
}
func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList) {
f.Lock.Lock()
defer f.Lock.Unlock()
f.PreferredResources = resources
}
func (f *fakeServerResources) setError(err error) {
f.Lock.Lock()
defer f.Lock.Unlock()
f.Error = err
}
func (f *fakeServerResources) getInterfaceUsedCount() int {
f.Lock.Lock()
defer f.Lock.Unlock()
return f.InterfaceUsedCount
}
func (f *fakeServerResources) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
f.Lock.Lock()
defer f.Lock.Unlock()
f.InterfaceUsedCount++
return f.PreferredResources, f.Error
}
// fakeAction records information about requests to aid in testing.
type fakeAction struct {
method string
path string
query string
}
// String returns method=path to aid in testing
func (f *fakeAction) String() string {
return strings.Join([]string{f.method, f.path}, "=")
}
type FakeResponse struct {
statusCode int
content []byte
}
// fakeActionHandler holds a list of fakeActions received
type fakeActionHandler struct {
// statusCode and content returned by this handler for different method + path.
response map[string]FakeResponse
lock sync.Mutex
actions []fakeAction
}
// ServeHTTP logs the action that occurred and always returns the associated status code
func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
func() {
f.lock.Lock()
defer f.lock.Unlock()
f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path, query: request.URL.RawQuery})
fakeResponse, ok := f.response[request.Method+request.URL.Path]
if !ok {
fakeResponse.statusCode = 200
fakeResponse.content = []byte("{\"kind\": \"List\"}")
}
response.Header().Set("Content-Type", "application/json")
response.WriteHeader(fakeResponse.statusCode)
response.Write(fakeResponse.content)
}()
// This is to allow the fakeActionHandler to simulate a watch being opened
if strings.Contains(request.URL.RawQuery, "watch=true") {
hijacker, ok := response.(http.Hijacker)
if !ok {
return
}
connection, _, err := hijacker.Hijack()
if err != nil {
return
}
defer connection.Close()
time.Sleep(30 * time.Second)
}
}

View File

@ -284,11 +284,13 @@ func (qm *QuotaMonitor) IsSynced() bool {
defer qm.monitorLock.Unlock()
if len(qm.monitors) == 0 {
klog.V(4).Info("quota monitor not synced: no monitors")
return false
}
for _, monitor := range qm.monitors {
for resource, monitor := range qm.monitors {
if !monitor.controller.HasSynced() {
klog.V(4).Infof("quota monitor not synced: %v", resource)
return false
}
}

View File

@ -18,6 +18,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",

View File

@ -1,9 +1,6 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
@ -38,3 +35,14 @@ filegroup(
srcs = [":package-srcs"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["evaluator_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
],
)

View File

@ -18,6 +18,7 @@ package generic
import (
"fmt"
"sync/atomic"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@ -33,17 +34,83 @@ import (
// InformerForResourceFunc knows how to provision an informer
type InformerForResourceFunc func(schema.GroupVersionResource) (informers.GenericInformer, error)
// ListerFuncForResourceFunc knows how to provision a lister from an informer func
// ListerFuncForResourceFunc knows how to provision a lister from an informer func.
// The lister returns errors until the informer has synced.
func ListerFuncForResourceFunc(f InformerForResourceFunc) quota.ListerForResourceFunc {
return func(gvr schema.GroupVersionResource) (cache.GenericLister, error) {
informer, err := f(gvr)
if err != nil {
return nil, err
}
return informer.Lister(), nil
return &protectedLister{
hasSynced: cachedHasSynced(informer.Informer().HasSynced),
notReadyErr: fmt.Errorf("%v not yet synced", gvr),
delegate: informer.Lister(),
}, nil
}
}
// cachedHasSynced returns a function that calls hasSynced() until it returns true once, then returns true
func cachedHasSynced(hasSynced func() bool) func() bool {
cache := &atomic.Value{}
cache.Store(false)
return func() bool {
if cache.Load().(bool) {
// short-circuit if already synced
return true
}
if hasSynced() {
// remember we synced
cache.Store(true)
return true
}
return false
}
}
// protectedLister returns notReadyError if hasSynced returns false, otherwise delegates to delegate
type protectedLister struct {
hasSynced func() bool
notReadyErr error
delegate cache.GenericLister
}
func (p *protectedLister) List(selector labels.Selector) (ret []runtime.Object, err error) {
if !p.hasSynced() {
return nil, p.notReadyErr
}
return p.delegate.List(selector)
}
func (p *protectedLister) Get(name string) (runtime.Object, error) {
if !p.hasSynced() {
return nil, p.notReadyErr
}
return p.delegate.Get(name)
}
func (p *protectedLister) ByNamespace(namespace string) cache.GenericNamespaceLister {
return &protectedNamespaceLister{p.hasSynced, p.notReadyErr, p.delegate.ByNamespace(namespace)}
}
// protectedNamespaceLister returns notReadyError if hasSynced returns false, otherwise delegates to delegate
type protectedNamespaceLister struct {
hasSynced func() bool
notReadyErr error
delegate cache.GenericNamespaceLister
}
func (p *protectedNamespaceLister) List(selector labels.Selector) (ret []runtime.Object, err error) {
if !p.hasSynced() {
return nil, p.notReadyErr
}
return p.delegate.List(selector)
}
func (p *protectedNamespaceLister) Get(name string) (runtime.Object, error) {
if !p.hasSynced() {
return nil, p.notReadyErr
}
return p.delegate.Get(name)
}
// ListResourceUsingListerFunc returns a listing function based on the shared informer factory for the specified resource.
func ListResourceUsingListerFunc(l quota.ListerForResourceFunc, resource schema.GroupVersionResource) ListFuncByNamespace {
return func(namespace string) ([]runtime.Object, error) {

View File

@ -0,0 +1,131 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generic
import (
"errors"
"testing"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)
func TestCachedHasSynced(t *testing.T) {
called := 0
result := false
cachedFunc := cachedHasSynced(func() bool {
called++
return result
})
if cachedFunc() {
t.Fatal("expected false")
}
if called != 1 {
t.Fatalf("expected called=1, got %d", called)
}
if cachedFunc() {
t.Fatal("expected false")
}
if called != 2 {
t.Fatalf("expected called=2, got %d", called)
}
result = true
if !cachedFunc() {
t.Fatal("expected true")
}
if called != 3 {
t.Fatalf("expected called=3, got %d", called)
}
if !cachedFunc() {
t.Fatal("expected true")
}
if called != 3 {
// no more calls once we return true
t.Fatalf("expected called=3, got %d", called)
}
}
func TestProtectedLister(t *testing.T) {
hasSynced := false
notReadyErr := errors.New("not ready")
fake := &fakeLister{}
l := &protectedLister{
hasSynced: func() bool { return hasSynced },
notReadyErr: notReadyErr,
delegate: fake,
}
if _, err := l.List(nil); err != notReadyErr {
t.Fatalf("expected %v, got %v", notReadyErr, err)
}
if _, err := l.Get(""); err != notReadyErr {
t.Fatalf("expected %v, got %v", notReadyErr, err)
}
if fake.called != 0 {
t.Fatalf("expected called=0, got %d", fake.called)
}
fake.called = 0
hasSynced = true
if _, err := l.List(nil); err != errFakeLister {
t.Fatalf("expected %v, got %v", errFakeLister, err)
}
if _, err := l.Get(""); err != errFakeLister {
t.Fatalf("expected %v, got %v", errFakeLister, err)
}
if fake.called != 2 {
t.Fatalf("expected called=2, got %d", fake.called)
}
fake.called = 0
hasSynced = false
if _, err := l.List(nil); err != notReadyErr {
t.Fatalf("expected %v, got %v", notReadyErr, err)
}
if _, err := l.Get(""); err != notReadyErr {
t.Fatalf("expected %v, got %v", notReadyErr, err)
}
if fake.called != 0 {
t.Fatalf("expected called=2, got %d", fake.called)
}
}
var errFakeLister = errors.New("errFakeLister")
type fakeLister struct {
called int
}
func (f *fakeLister) List(selector labels.Selector) (ret []runtime.Object, err error) {
f.called++
return nil, errFakeLister
}
func (f *fakeLister) Get(name string) (runtime.Object, error) {
f.called++
return nil, errFakeLister
}
func (f *fakeLister) ByNamespace(namespace string) cache.GenericNamespaceLister {
panic("not implemented")
}

View File

@ -17,10 +17,12 @@ limitations under the License.
package quota
import (
"sort"
"strings"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
)
@ -186,7 +188,12 @@ func ResourceNames(resources corev1.ResourceList) []corev1.ResourceName {
// Contains returns true if the specified item is in the list of items
func Contains(items []corev1.ResourceName, item corev1.ResourceName) bool {
return ToSet(items).Has(string(item))
for _, i := range items {
if i == item {
return true
}
}
return false
}
// ContainsPrefix returns true if the specified item has a prefix that contained in given prefix Set
@ -199,15 +206,32 @@ func ContainsPrefix(prefixSet []string, item corev1.ResourceName) bool {
return false
}
// Intersection returns the intersection of both list of resources
// Intersection returns the intersection of both list of resources, deduped and sorted
func Intersection(a []corev1.ResourceName, b []corev1.ResourceName) []corev1.ResourceName {
setA := ToSet(a)
setB := ToSet(b)
setC := setA.Intersection(setB)
result := []corev1.ResourceName{}
for _, resourceName := range setC.List() {
result = append(result, corev1.ResourceName(resourceName))
result := make([]corev1.ResourceName, 0, len(a))
for _, item := range a {
if Contains(result, item) {
continue
}
if !Contains(b, item) {
continue
}
result = append(result, item)
}
sort.Slice(result, func(i, j int) bool { return result[i] < result[j] })
return result
}
// Difference returns the list of resources resulting from a-b, deduped and sorted
func Difference(a []corev1.ResourceName, b []corev1.ResourceName) []corev1.ResourceName {
result := make([]corev1.ResourceName, 0, len(a))
for _, item := range a {
if Contains(b, item) || Contains(result, item) {
continue
}
result = append(result, item)
}
sort.Slice(result, func(i, j int) bool { return result[i] < result[j] })
return result
}
@ -243,7 +267,8 @@ func ToSet(resourceNames []corev1.ResourceName) sets.String {
return result
}
// CalculateUsage calculates and returns the requested ResourceList usage
// CalculateUsage calculates and returns the requested ResourceList usage.
// If an error is returned, usage only contains the resources which encountered no calculation errors.
func CalculateUsage(namespaceName string, scopes []corev1.ResourceQuotaScope, hardLimits corev1.ResourceList, registry Registry, scopeSelector *corev1.ScopeSelector) (corev1.ResourceList, error) {
// find the intersection between the hard resources on the quota
// and the resources this controller can track to know what we can
@ -257,6 +282,8 @@ func CalculateUsage(namespaceName string, scopes []corev1.ResourceQuotaScope, ha
// NOTE: the intersection just removes duplicates since the evaluator match intersects with hard
matchedResources := Intersection(hardResources, potentialResources)
errors := []error{}
// sum the observed usage from each evaluator
newUsage := corev1.ResourceList{}
for _, evaluator := range evaluators {
@ -269,7 +296,11 @@ func CalculateUsage(namespaceName string, scopes []corev1.ResourceQuotaScope, ha
usageStatsOptions := UsageStatsOptions{Namespace: namespaceName, Scopes: scopes, Resources: intersection, ScopeSelector: scopeSelector}
stats, err := evaluator.UsageStats(usageStatsOptions)
if err != nil {
return nil, err
// remember the error
errors = append(errors, err)
// exclude resources which encountered calculation errors
matchedResources = Difference(matchedResources, intersection)
continue
}
newUsage = Add(newUsage, stats.Used)
}
@ -278,5 +309,5 @@ func CalculateUsage(namespaceName string, scopes []corev1.ResourceQuotaScope, ha
// merge our observed usage with the quota usage status
// if the new usage is different than the last usage, we will need to do an update
newUsage = Mask(newUsage, matchedResources)
return newUsage, nil
return newUsage, utilerrors.NewAggregate(errors)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package quota
import (
"reflect"
"testing"
corev1 "k8s.io/api/core/v1"
@ -319,3 +320,93 @@ func TestIsNegative(t *testing.T) {
}
}
}
func TestIntersection(t *testing.T) {
testCases := map[string]struct {
a []corev1.ResourceName
b []corev1.ResourceName
expected []corev1.ResourceName
}{
"empty": {
a: []corev1.ResourceName{},
b: []corev1.ResourceName{},
expected: []corev1.ResourceName{},
},
"equal": {
a: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory},
b: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory},
expected: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory},
},
"a has extra": {
a: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory},
b: []corev1.ResourceName{corev1.ResourceCPU},
expected: []corev1.ResourceName{corev1.ResourceCPU},
},
"b has extra": {
a: []corev1.ResourceName{corev1.ResourceCPU},
b: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory},
expected: []corev1.ResourceName{corev1.ResourceCPU},
},
"dedupes": {
a: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceCPU, corev1.ResourceMemory, corev1.ResourceMemory},
b: []corev1.ResourceName{corev1.ResourceCPU},
expected: []corev1.ResourceName{corev1.ResourceCPU},
},
"sorts": {
a: []corev1.ResourceName{corev1.ResourceMemory, corev1.ResourceMemory, corev1.ResourceCPU, corev1.ResourceCPU},
b: []corev1.ResourceName{corev1.ResourceMemory, corev1.ResourceMemory, corev1.ResourceCPU, corev1.ResourceCPU},
expected: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory},
},
}
for testName, testCase := range testCases {
actual := Intersection(testCase.a, testCase.b)
if !reflect.DeepEqual(actual, testCase.expected) {
t.Errorf("%s expected: %#v, actual: %#v", testName, testCase.expected, actual)
}
}
}
func TestDifference(t *testing.T) {
testCases := map[string]struct {
a []corev1.ResourceName
b []corev1.ResourceName
expected []corev1.ResourceName
}{
"empty": {
a: []corev1.ResourceName{},
b: []corev1.ResourceName{},
expected: []corev1.ResourceName{},
},
"equal": {
a: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory},
b: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory},
expected: []corev1.ResourceName{},
},
"a has extra": {
a: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory},
b: []corev1.ResourceName{corev1.ResourceCPU},
expected: []corev1.ResourceName{corev1.ResourceMemory},
},
"b has extra": {
a: []corev1.ResourceName{corev1.ResourceCPU},
b: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory},
expected: []corev1.ResourceName{},
},
"dedupes": {
a: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceCPU, corev1.ResourceMemory, corev1.ResourceMemory},
b: []corev1.ResourceName{corev1.ResourceCPU},
expected: []corev1.ResourceName{corev1.ResourceMemory},
},
"sorts": {
a: []corev1.ResourceName{corev1.ResourceMemory, corev1.ResourceMemory, corev1.ResourceCPU, corev1.ResourceCPU},
b: []corev1.ResourceName{},
expected: []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory},
},
}
for testName, testCase := range testCases {
actual := Difference(testCase.a, testCase.b)
if !reflect.DeepEqual(actual, testCase.expected) {
t.Errorf("%s expected: %#v, actual: %#v", testName, testCase.expected, actual)
}
}
}

View File

@ -1098,10 +1098,12 @@ func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) {
func TestHasUsageStats(t *testing.T) {
testCases := map[string]struct {
a corev1.ResourceQuota
relevant []corev1.ResourceName
expected bool
}{
"empty": {
a: corev1.ResourceQuota{Status: corev1.ResourceQuotaStatus{Hard: corev1.ResourceList{}}},
relevant: []corev1.ResourceName{corev1.ResourceMemory},
expected: true,
},
"hard-only": {
@ -1113,6 +1115,7 @@ func TestHasUsageStats(t *testing.T) {
Used: corev1.ResourceList{},
},
},
relevant: []corev1.ResourceName{corev1.ResourceMemory},
expected: false,
},
"hard-used": {
@ -1126,11 +1129,27 @@ func TestHasUsageStats(t *testing.T) {
},
},
},
relevant: []corev1.ResourceName{corev1.ResourceMemory},
expected: true,
},
"hard-used-relevant": {
a: corev1.ResourceQuota{
Status: corev1.ResourceQuotaStatus{
Hard: corev1.ResourceList{
corev1.ResourceMemory: resource.MustParse("1Gi"),
corev1.ResourcePods: resource.MustParse("1"),
},
Used: corev1.ResourceList{
corev1.ResourceMemory: resource.MustParse("500Mi"),
},
},
},
relevant: []corev1.ResourceName{corev1.ResourceMemory},
expected: true,
},
}
for testName, testCase := range testCases {
if result := hasUsageStats(&testCase.a); result != testCase.expected {
if result := hasUsageStats(&testCase.a, testCase.relevant); result != testCase.expected {
t.Errorf("%s expected: %v, actual: %v", testName, testCase.expected, result)
}
}

View File

@ -460,8 +460,8 @@ func CheckRequest(quotas []corev1.ResourceQuota, a admission.Attributes, evaluat
if err := evaluator.Constraints(restrictedResources, inputObject); err != nil {
return nil, admission.NewForbidden(a, fmt.Errorf("failed quota: %s: %v", resourceQuota.Name, err))
}
if !hasUsageStats(&resourceQuota) {
return nil, admission.NewForbidden(a, fmt.Errorf("status unknown for quota: %s", resourceQuota.Name))
if !hasUsageStats(&resourceQuota, restrictedResources) {
return nil, admission.NewForbidden(a, fmt.Errorf("status unknown for quota: %s, resources: %s", resourceQuota.Name, prettyPrintResourceNames(restrictedResources)))
}
interestingQuotaIndexes = append(interestingQuotaIndexes, i)
localRestrictedResourcesSet := quota.ToSet(restrictedResources)
@ -702,9 +702,13 @@ func prettyPrintResourceNames(a []corev1.ResourceName) string {
return strings.Join(values, ",")
}
// hasUsageStats returns true if for each hard constraint there is a value for its current usage
func hasUsageStats(resourceQuota *corev1.ResourceQuota) bool {
// hasUsageStats returns true if for each hard constraint in interestingResources there is a value for its current usage
func hasUsageStats(resourceQuota *corev1.ResourceQuota, interestingResources []corev1.ResourceName) bool {
interestingSet := quota.ToSet(interestingResources)
for resourceName := range resourceQuota.Status.Hard {
if !interestingSet.Has(string(resourceName)) {
continue
}
if _, found := resourceQuota.Status.Used[resourceName]; !found {
return false
}

View File

@ -1186,6 +1186,7 @@ type PodSecurityPolicyList struct {
Items []PodSecurityPolicy `json:"items" protobuf:"bytes,2,rep,name=items"`
}
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// DEPRECATED 1.9 - This group version of NetworkPolicy is deprecated by networking/v1/NetworkPolicy.

View File

@ -12,6 +12,7 @@ go_library(
"deployment.go",
"ingress.go",
"interface.go",
"networkpolicy.go",
"podsecuritypolicy.go",
"replicaset.go",
],

View File

@ -30,6 +30,8 @@ type Interface interface {
Deployments() DeploymentInformer
// Ingresses returns a IngressInformer.
Ingresses() IngressInformer
// NetworkPolicies returns a NetworkPolicyInformer.
NetworkPolicies() NetworkPolicyInformer
// PodSecurityPolicies returns a PodSecurityPolicyInformer.
PodSecurityPolicies() PodSecurityPolicyInformer
// ReplicaSets returns a ReplicaSetInformer.
@ -62,6 +64,11 @@ func (v *version) Ingresses() IngressInformer {
return &ingressInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
// NetworkPolicies returns a NetworkPolicyInformer.
func (v *version) NetworkPolicies() NetworkPolicyInformer {
return &networkPolicyInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
// PodSecurityPolicies returns a PodSecurityPolicyInformer.
func (v *version) PodSecurityPolicies() PodSecurityPolicyInformer {
return &podSecurityPolicyInformer{factory: v.factory, tweakListOptions: v.tweakListOptions}

View File

@ -0,0 +1,89 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by informer-gen. DO NOT EDIT.
package v1beta1
import (
time "time"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
watch "k8s.io/apimachinery/pkg/watch"
internalinterfaces "k8s.io/client-go/informers/internalinterfaces"
kubernetes "k8s.io/client-go/kubernetes"
v1beta1 "k8s.io/client-go/listers/extensions/v1beta1"
cache "k8s.io/client-go/tools/cache"
)
// NetworkPolicyInformer provides access to a shared informer and lister for
// NetworkPolicies.
type NetworkPolicyInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1beta1.NetworkPolicyLister
}
type networkPolicyInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace string
}
// NewNetworkPolicyInformer constructs a new informer for NetworkPolicy type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewNetworkPolicyInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
return NewFilteredNetworkPolicyInformer(client, namespace, resyncPeriod, indexers, nil)
}
// NewFilteredNetworkPolicyInformer constructs a new informer for NetworkPolicy type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredNetworkPolicyInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.ExtensionsV1beta1().NetworkPolicies(namespace).List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.ExtensionsV1beta1().NetworkPolicies(namespace).Watch(options)
},
},
&extensionsv1beta1.NetworkPolicy{},
resyncPeriod,
indexers,
)
}
func (f *networkPolicyInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredNetworkPolicyInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func (f *networkPolicyInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&extensionsv1beta1.NetworkPolicy{}, f.defaultInformer)
}
func (f *networkPolicyInformer) Lister() v1beta1.NetworkPolicyLister {
return v1beta1.NewNetworkPolicyLister(f.Informer().GetIndexer())
}

View File

@ -206,6 +206,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource
return &genericInformer{resource: resource.GroupResource(), informer: f.Extensions().V1beta1().Deployments().Informer()}, nil
case extensionsv1beta1.SchemeGroupVersion.WithResource("ingresses"):
return &genericInformer{resource: resource.GroupResource(), informer: f.Extensions().V1beta1().Ingresses().Informer()}, nil
case extensionsv1beta1.SchemeGroupVersion.WithResource("networkpolicies"):
return &genericInformer{resource: resource.GroupResource(), informer: f.Extensions().V1beta1().NetworkPolicies().Informer()}, nil
case extensionsv1beta1.SchemeGroupVersion.WithResource("podsecuritypolicies"):
return &genericInformer{resource: resource.GroupResource(), informer: f.Extensions().V1beta1().PodSecurityPolicies().Informer()}, nil
case extensionsv1beta1.SchemeGroupVersion.WithResource("replicasets"):

View File

@ -15,6 +15,7 @@ go_library(
"extensions_client.go",
"generated_expansion.go",
"ingress.go",
"networkpolicy.go",
"podsecuritypolicy.go",
"replicaset.go",
],

View File

@ -30,6 +30,7 @@ type ExtensionsV1beta1Interface interface {
DaemonSetsGetter
DeploymentsGetter
IngressesGetter
NetworkPoliciesGetter
PodSecurityPoliciesGetter
ReplicaSetsGetter
}
@ -51,6 +52,10 @@ func (c *ExtensionsV1beta1Client) Ingresses(namespace string) IngressInterface {
return newIngresses(c, namespace)
}
func (c *ExtensionsV1beta1Client) NetworkPolicies(namespace string) NetworkPolicyInterface {
return newNetworkPolicies(c, namespace)
}
func (c *ExtensionsV1beta1Client) PodSecurityPolicies() PodSecurityPolicyInterface {
return newPodSecurityPolicies(c)
}

View File

@ -14,6 +14,7 @@ go_library(
"fake_deployment_expansion.go",
"fake_extensions_client.go",
"fake_ingress.go",
"fake_networkpolicy.go",
"fake_podsecuritypolicy.go",
"fake_replicaset.go",
],

View File

@ -40,6 +40,10 @@ func (c *FakeExtensionsV1beta1) Ingresses(namespace string) v1beta1.IngressInter
return &FakeIngresses{c, namespace}
}
func (c *FakeExtensionsV1beta1) NetworkPolicies(namespace string) v1beta1.NetworkPolicyInterface {
return &FakeNetworkPolicies{c, namespace}
}
func (c *FakeExtensionsV1beta1) PodSecurityPolicies() v1beta1.PodSecurityPolicyInterface {
return &FakePodSecurityPolicies{c}
}

View File

@ -0,0 +1,128 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
package fake
import (
v1beta1 "k8s.io/api/extensions/v1beta1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
labels "k8s.io/apimachinery/pkg/labels"
schema "k8s.io/apimachinery/pkg/runtime/schema"
types "k8s.io/apimachinery/pkg/types"
watch "k8s.io/apimachinery/pkg/watch"
testing "k8s.io/client-go/testing"
)
// FakeNetworkPolicies implements NetworkPolicyInterface
type FakeNetworkPolicies struct {
Fake *FakeExtensionsV1beta1
ns string
}
var networkpoliciesResource = schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "networkpolicies"}
var networkpoliciesKind = schema.GroupVersionKind{Group: "extensions", Version: "v1beta1", Kind: "NetworkPolicy"}
// Get takes name of the networkPolicy, and returns the corresponding networkPolicy object, and an error if there is any.
func (c *FakeNetworkPolicies) Get(name string, options v1.GetOptions) (result *v1beta1.NetworkPolicy, err error) {
obj, err := c.Fake.
Invokes(testing.NewGetAction(networkpoliciesResource, c.ns, name), &v1beta1.NetworkPolicy{})
if obj == nil {
return nil, err
}
return obj.(*v1beta1.NetworkPolicy), err
}
// List takes label and field selectors, and returns the list of NetworkPolicies that match those selectors.
func (c *FakeNetworkPolicies) List(opts v1.ListOptions) (result *v1beta1.NetworkPolicyList, err error) {
obj, err := c.Fake.
Invokes(testing.NewListAction(networkpoliciesResource, networkpoliciesKind, c.ns, opts), &v1beta1.NetworkPolicyList{})
if obj == nil {
return nil, err
}
label, _, _ := testing.ExtractFromListOptions(opts)
if label == nil {
label = labels.Everything()
}
list := &v1beta1.NetworkPolicyList{ListMeta: obj.(*v1beta1.NetworkPolicyList).ListMeta}
for _, item := range obj.(*v1beta1.NetworkPolicyList).Items {
if label.Matches(labels.Set(item.Labels)) {
list.Items = append(list.Items, item)
}
}
return list, err
}
// Watch returns a watch.Interface that watches the requested networkPolicies.
func (c *FakeNetworkPolicies) Watch(opts v1.ListOptions) (watch.Interface, error) {
return c.Fake.
InvokesWatch(testing.NewWatchAction(networkpoliciesResource, c.ns, opts))
}
// Create takes the representation of a networkPolicy and creates it. Returns the server's representation of the networkPolicy, and an error, if there is any.
func (c *FakeNetworkPolicies) Create(networkPolicy *v1beta1.NetworkPolicy) (result *v1beta1.NetworkPolicy, err error) {
obj, err := c.Fake.
Invokes(testing.NewCreateAction(networkpoliciesResource, c.ns, networkPolicy), &v1beta1.NetworkPolicy{})
if obj == nil {
return nil, err
}
return obj.(*v1beta1.NetworkPolicy), err
}
// Update takes the representation of a networkPolicy and updates it. Returns the server's representation of the networkPolicy, and an error, if there is any.
func (c *FakeNetworkPolicies) Update(networkPolicy *v1beta1.NetworkPolicy) (result *v1beta1.NetworkPolicy, err error) {
obj, err := c.Fake.
Invokes(testing.NewUpdateAction(networkpoliciesResource, c.ns, networkPolicy), &v1beta1.NetworkPolicy{})
if obj == nil {
return nil, err
}
return obj.(*v1beta1.NetworkPolicy), err
}
// Delete takes name of the networkPolicy and deletes it. Returns an error if one occurs.
func (c *FakeNetworkPolicies) Delete(name string, options *v1.DeleteOptions) error {
_, err := c.Fake.
Invokes(testing.NewDeleteAction(networkpoliciesResource, c.ns, name), &v1beta1.NetworkPolicy{})
return err
}
// DeleteCollection deletes a collection of objects.
func (c *FakeNetworkPolicies) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error {
action := testing.NewDeleteCollectionAction(networkpoliciesResource, c.ns, listOptions)
_, err := c.Fake.Invokes(action, &v1beta1.NetworkPolicyList{})
return err
}
// Patch applies the patch and returns the patched networkPolicy.
func (c *FakeNetworkPolicies) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1beta1.NetworkPolicy, err error) {
obj, err := c.Fake.
Invokes(testing.NewPatchSubresourceAction(networkpoliciesResource, c.ns, name, pt, data, subresources...), &v1beta1.NetworkPolicy{})
if obj == nil {
return nil, err
}
return obj.(*v1beta1.NetworkPolicy), err
}

View File

@ -22,6 +22,8 @@ type DaemonSetExpansion interface{}
type IngressExpansion interface{}
type NetworkPolicyExpansion interface{}
type PodSecurityPolicyExpansion interface{}
type ReplicaSetExpansion interface{}

View File

@ -0,0 +1,174 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
package v1beta1
import (
"time"
v1beta1 "k8s.io/api/extensions/v1beta1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
types "k8s.io/apimachinery/pkg/types"
watch "k8s.io/apimachinery/pkg/watch"
scheme "k8s.io/client-go/kubernetes/scheme"
rest "k8s.io/client-go/rest"
)
// NetworkPoliciesGetter has a method to return a NetworkPolicyInterface.
// A group's client should implement this interface.
type NetworkPoliciesGetter interface {
NetworkPolicies(namespace string) NetworkPolicyInterface
}
// NetworkPolicyInterface has methods to work with NetworkPolicy resources.
type NetworkPolicyInterface interface {
Create(*v1beta1.NetworkPolicy) (*v1beta1.NetworkPolicy, error)
Update(*v1beta1.NetworkPolicy) (*v1beta1.NetworkPolicy, error)
Delete(name string, options *v1.DeleteOptions) error
DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error
Get(name string, options v1.GetOptions) (*v1beta1.NetworkPolicy, error)
List(opts v1.ListOptions) (*v1beta1.NetworkPolicyList, error)
Watch(opts v1.ListOptions) (watch.Interface, error)
Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1beta1.NetworkPolicy, err error)
NetworkPolicyExpansion
}
// networkPolicies implements NetworkPolicyInterface
type networkPolicies struct {
client rest.Interface
ns string
}
// newNetworkPolicies returns a NetworkPolicies
func newNetworkPolicies(c *ExtensionsV1beta1Client, namespace string) *networkPolicies {
return &networkPolicies{
client: c.RESTClient(),
ns: namespace,
}
}
// Get takes name of the networkPolicy, and returns the corresponding networkPolicy object, and an error if there is any.
func (c *networkPolicies) Get(name string, options v1.GetOptions) (result *v1beta1.NetworkPolicy, err error) {
result = &v1beta1.NetworkPolicy{}
err = c.client.Get().
Namespace(c.ns).
Resource("networkpolicies").
Name(name).
VersionedParams(&options, scheme.ParameterCodec).
Do().
Into(result)
return
}
// List takes label and field selectors, and returns the list of NetworkPolicies that match those selectors.
func (c *networkPolicies) List(opts v1.ListOptions) (result *v1beta1.NetworkPolicyList, err error) {
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
result = &v1beta1.NetworkPolicyList{}
err = c.client.Get().
Namespace(c.ns).
Resource("networkpolicies").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Do().
Into(result)
return
}
// Watch returns a watch.Interface that watches the requested networkPolicies.
func (c *networkPolicies) Watch(opts v1.ListOptions) (watch.Interface, error) {
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
opts.Watch = true
return c.client.Get().
Namespace(c.ns).
Resource("networkpolicies").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Watch()
}
// Create takes the representation of a networkPolicy and creates it. Returns the server's representation of the networkPolicy, and an error, if there is any.
func (c *networkPolicies) Create(networkPolicy *v1beta1.NetworkPolicy) (result *v1beta1.NetworkPolicy, err error) {
result = &v1beta1.NetworkPolicy{}
err = c.client.Post().
Namespace(c.ns).
Resource("networkpolicies").
Body(networkPolicy).
Do().
Into(result)
return
}
// Update takes the representation of a networkPolicy and updates it. Returns the server's representation of the networkPolicy, and an error, if there is any.
func (c *networkPolicies) Update(networkPolicy *v1beta1.NetworkPolicy) (result *v1beta1.NetworkPolicy, err error) {
result = &v1beta1.NetworkPolicy{}
err = c.client.Put().
Namespace(c.ns).
Resource("networkpolicies").
Name(networkPolicy.Name).
Body(networkPolicy).
Do().
Into(result)
return
}
// Delete takes name of the networkPolicy and deletes it. Returns an error if one occurs.
func (c *networkPolicies) Delete(name string, options *v1.DeleteOptions) error {
return c.client.Delete().
Namespace(c.ns).
Resource("networkpolicies").
Name(name).
Body(options).
Do().
Error()
}
// DeleteCollection deletes a collection of objects.
func (c *networkPolicies) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error {
var timeout time.Duration
if listOptions.TimeoutSeconds != nil {
timeout = time.Duration(*listOptions.TimeoutSeconds) * time.Second
}
return c.client.Delete().
Namespace(c.ns).
Resource("networkpolicies").
VersionedParams(&listOptions, scheme.ParameterCodec).
Timeout(timeout).
Body(options).
Do().
Error()
}
// Patch applies the patch and returns the patched networkPolicy.
func (c *networkPolicies) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1beta1.NetworkPolicy, err error) {
result = &v1beta1.NetworkPolicy{}
err = c.client.Patch(pt).
Namespace(c.ns).
Resource("networkpolicies").
SubResource(subresources...).
Name(name).
Body(data).
Do().
Into(result)
return
}

View File

@ -15,6 +15,7 @@ go_library(
"deployment_expansion.go",
"expansion_generated.go",
"ingress.go",
"networkpolicy.go",
"podsecuritypolicy.go",
"replicaset.go",
"replicaset_expansion.go",

View File

@ -26,6 +26,14 @@ type IngressListerExpansion interface{}
// IngressNamespaceLister.
type IngressNamespaceListerExpansion interface{}
// NetworkPolicyListerExpansion allows custom methods to be added to
// NetworkPolicyLister.
type NetworkPolicyListerExpansion interface{}
// NetworkPolicyNamespaceListerExpansion allows custom methods to be added to
// NetworkPolicyNamespaceLister.
type NetworkPolicyNamespaceListerExpansion interface{}
// PodSecurityPolicyListerExpansion allows custom methods to be added to
// PodSecurityPolicyLister.
type PodSecurityPolicyListerExpansion interface{}

View File

@ -0,0 +1,94 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by lister-gen. DO NOT EDIT.
package v1beta1
import (
v1beta1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
)
// NetworkPolicyLister helps list NetworkPolicies.
type NetworkPolicyLister interface {
// List lists all NetworkPolicies in the indexer.
List(selector labels.Selector) (ret []*v1beta1.NetworkPolicy, err error)
// NetworkPolicies returns an object that can list and get NetworkPolicies.
NetworkPolicies(namespace string) NetworkPolicyNamespaceLister
NetworkPolicyListerExpansion
}
// networkPolicyLister implements the NetworkPolicyLister interface.
type networkPolicyLister struct {
indexer cache.Indexer
}
// NewNetworkPolicyLister returns a new NetworkPolicyLister.
func NewNetworkPolicyLister(indexer cache.Indexer) NetworkPolicyLister {
return &networkPolicyLister{indexer: indexer}
}
// List lists all NetworkPolicies in the indexer.
func (s *networkPolicyLister) List(selector labels.Selector) (ret []*v1beta1.NetworkPolicy, err error) {
err = cache.ListAll(s.indexer, selector, func(m interface{}) {
ret = append(ret, m.(*v1beta1.NetworkPolicy))
})
return ret, err
}
// NetworkPolicies returns an object that can list and get NetworkPolicies.
func (s *networkPolicyLister) NetworkPolicies(namespace string) NetworkPolicyNamespaceLister {
return networkPolicyNamespaceLister{indexer: s.indexer, namespace: namespace}
}
// NetworkPolicyNamespaceLister helps list and get NetworkPolicies.
type NetworkPolicyNamespaceLister interface {
// List lists all NetworkPolicies in the indexer for a given namespace.
List(selector labels.Selector) (ret []*v1beta1.NetworkPolicy, err error)
// Get retrieves the NetworkPolicy from the indexer for a given namespace and name.
Get(name string) (*v1beta1.NetworkPolicy, error)
NetworkPolicyNamespaceListerExpansion
}
// networkPolicyNamespaceLister implements the NetworkPolicyNamespaceLister
// interface.
type networkPolicyNamespaceLister struct {
indexer cache.Indexer
namespace string
}
// List lists all NetworkPolicies in the indexer for a given namespace.
func (s networkPolicyNamespaceLister) List(selector labels.Selector) (ret []*v1beta1.NetworkPolicy, err error) {
err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
ret = append(ret, m.(*v1beta1.NetworkPolicy))
})
return ret, err
}
// Get retrieves the NetworkPolicy from the indexer for a given namespace and name.
func (s networkPolicyNamespaceLister) Get(name string) (*v1beta1.NetworkPolicy, error) {
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(v1beta1.Resource("networkpolicy"), name)
}
return obj.(*v1beta1.NetworkPolicy), nil
}