From 7f88e91892c03f06e167adea051781717dd96dd9 Mon Sep 17 00:00:00 2001 From: Derek Carr Date: Fri, 27 Oct 2017 11:07:53 -0400 Subject: [PATCH] Update quota controller to monitor all types --- cmd/kube-controller-manager/app/BUILD | 2 +- cmd/kube-controller-manager/app/core.go | 35 +- pkg/controller/resourcequota/BUILD | 17 +- .../resourcequota/replenishment_controller.go | 233 ------- .../replenishment_controller_test.go | 160 ----- .../resource_quota_controller.go | 159 ++++- .../resource_quota_controller_test.go | 589 +++++++----------- .../resourcequota/resource_quota_monitor.go | 341 ++++++++++ 8 files changed, 729 insertions(+), 807 deletions(-) delete mode 100644 pkg/controller/resourcequota/replenishment_controller.go delete mode 100644 pkg/controller/resourcequota/replenishment_controller_test.go create mode 100644 pkg/controller/resourcequota/resource_quota_monitor.go diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD index d5dcfcc003..7bb6183911 100644 --- a/cmd/kube-controller-manager/app/BUILD +++ b/cmd/kube-controller-manager/app/BUILD @@ -24,7 +24,6 @@ go_library( importpath = "k8s.io/kubernetes/cmd/kube-controller-manager/app", deps = [ "//cmd/kube-controller-manager/app/options:go_default_library", - "//pkg/api:go_default_library", "//pkg/api/install:go_default_library", "//pkg/api/legacyscheme:go_default_library", "//pkg/apis/apps/install:go_default_library", @@ -78,6 +77,7 @@ go_library( "//pkg/controller/volume/expand:go_default_library", "//pkg/controller/volume/persistentvolume:go_default_library", "//pkg/features:go_default_library", + "//pkg/quota/generic:go_default_library", "//pkg/quota/install:go_default_library", "//pkg/serviceaccount:go_default_library", "//pkg/util/configz:go_default_library", diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 33e4d2c40a..2ceb9fbc2b 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -36,7 +36,6 @@ import ( cacheddiscovery "k8s.io/client-go/discovery/cached" "k8s.io/client-go/dynamic" clientset "k8s.io/client-go/kubernetes" - "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/controller" endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint" @@ -55,6 +54,7 @@ import ( "k8s.io/kubernetes/pkg/controller/volume/expand" persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume" "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/quota/generic" quotainstall "k8s.io/kubernetes/pkg/quota/install" "k8s.io/kubernetes/pkg/util/metrics" ) @@ -240,31 +240,34 @@ func startPodGCController(ctx ControllerContext) (bool, error) { func startResourceQuotaController(ctx ControllerContext) (bool, error) { resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller") - resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.InformerFactory) - groupKindsToReplenish := []schema.GroupKind{ - api.Kind("Pod"), - api.Kind("Service"), - api.Kind("ReplicationController"), - api.Kind("PersistentVolumeClaim"), - api.Kind("Secret"), - api.Kind("ConfigMap"), - } + discoveryFunc := resourceQuotaControllerClient.Discovery().ServerPreferredNamespacedResources + listerFuncForResource := generic.ListerFuncForResourceFunc(ctx.InformerFactory.ForResource) + quotaConfiguration := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource) + resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{ QuotaClient: resourceQuotaControllerClient.CoreV1(), ResourceQuotaInformer: ctx.InformerFactory.Core().V1().ResourceQuotas(), ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.Options.ResourceQuotaSyncPeriod.Duration), - Registry: resourceQuotaRegistry, - ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(ctx.InformerFactory), + InformerFactory: ctx.InformerFactory, ReplenishmentResyncPeriod: ResyncPeriod(&ctx.Options), - GroupKindsToReplenish: groupKindsToReplenish, + DiscoveryFunc: discoveryFunc, + IgnoredResourcesFunc: quotaConfiguration.IgnoredResources, + InformersStarted: ctx.InformersStarted, + Registry: generic.NewRegistry(quotaConfiguration.Evaluators()), } if resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter()) } - go resourcequotacontroller.NewResourceQuotaController( - resourceQuotaControllerOptions, - ).Run(int(ctx.Options.ConcurrentResourceQuotaSyncs), ctx.Stop) + resourceQuotaController, err := resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions) + if err != nil { + return false, err + } + go resourceQuotaController.Run(int(ctx.Options.ConcurrentResourceQuotaSyncs), ctx.Stop) + + // Periodically the quota controller to detect new resource types + go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Stop) + return true, nil } diff --git a/pkg/controller/resourcequota/BUILD b/pkg/controller/resourcequota/BUILD index a50bca09b6..dd47659ea5 100644 --- a/pkg/controller/resourcequota/BUILD +++ b/pkg/controller/resourcequota/BUILD @@ -10,8 +10,8 @@ go_library( name = "go_default_library", srcs = [ "doc.go", - "replenishment_controller.go", "resource_quota_controller.go", + "resource_quota_monitor.go", ], importpath = "k8s.io/kubernetes/pkg/controller/resourcequota", deps = [ @@ -20,6 +20,7 @@ go_library( "//pkg/controller:go_default_library", "//pkg/quota:go_default_library", "//pkg/quota/evaluator/core:go_default_library", + "//pkg/quota/generic:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", @@ -27,11 +28,12 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/discovery:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", @@ -43,15 +45,12 @@ go_library( go_test( name = "go_default_test", - srcs = [ - "replenishment_controller_test.go", - "resource_quota_controller_test.go", - ], + srcs = ["resource_quota_controller_test.go"], importpath = "k8s.io/kubernetes/pkg/controller/resourcequota", library = ":go_default_library", deps = [ - "//pkg/api:go_default_library", "//pkg/controller:go_default_library", + "//pkg/quota:go_default_library", "//pkg/quota/generic:go_default_library", "//pkg/quota/install:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", @@ -59,12 +58,12 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", "//vendor/k8s.io/client-go/testing:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", ], ) diff --git a/pkg/controller/resourcequota/replenishment_controller.go b/pkg/controller/resourcequota/replenishment_controller.go deleted file mode 100644 index 5f063e6568..0000000000 --- a/pkg/controller/resourcequota/replenishment_controller.go +++ /dev/null @@ -1,233 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resourcequota - -import ( - "fmt" - - "github.com/golang/glog" - - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/clock" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/informers" - "k8s.io/client-go/tools/cache" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/quota/evaluator/core" -) - -// ReplenishmentFunc is a function that is invoked when controller sees a change -// that may require a quota to be replenished (i.e. object deletion, or object moved to terminal state) -type ReplenishmentFunc func(groupKind schema.GroupKind, namespace string, object runtime.Object) - -// ReplenishmentControllerOptions is an options struct that tells a factory -// how to configure a controller that can inform the quota system it should -// replenish quota -type ReplenishmentControllerOptions struct { - // The kind monitored for replenishment - GroupKind schema.GroupKind - // The period that should be used to re-sync the monitored resource - ResyncPeriod controller.ResyncPeriodFunc - // The function to invoke when a change is observed that should trigger - // replenishment - ReplenishmentFunc ReplenishmentFunc -} - -// PodReplenishmentUpdateFunc will replenish if the old pod was quota tracked but the new is not -func PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions, clock clock.Clock) func(oldObj, newObj interface{}) { - return func(oldObj, newObj interface{}) { - oldPod := oldObj.(*v1.Pod) - newPod := newObj.(*v1.Pod) - if core.QuotaV1Pod(oldPod, clock) && !core.QuotaV1Pod(newPod, clock) { - options.ReplenishmentFunc(options.GroupKind, newPod.Namespace, oldPod) - } - } -} - -// ObjectReplenishmentDeleteFunc will replenish on every delete -func ObjectReplenishmentDeleteFunc(options *ReplenishmentControllerOptions) func(obj interface{}) { - return func(obj interface{}) { - metaObject, err := meta.Accessor(obj) - if err != nil { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - glog.Errorf("replenishment controller could not get object from tombstone %+v, could take up to %v before quota is replenished", obj, options.ResyncPeriod()) - utilruntime.HandleError(err) - return - } - metaObject, err = meta.Accessor(tombstone.Obj) - if err != nil { - glog.Errorf("replenishment controller tombstone contained object that is not a meta %+v, could take up to %v before quota is replenished", tombstone.Obj, options.ResyncPeriod()) - utilruntime.HandleError(err) - return - } - } - options.ReplenishmentFunc(options.GroupKind, metaObject.GetNamespace(), nil) - } -} - -// ReplenishmentControllerFactory knows how to build replenishment controllers -type ReplenishmentControllerFactory interface { - // NewController returns a controller configured with the specified options. - // This method is NOT thread-safe. - NewController(options *ReplenishmentControllerOptions) (cache.Controller, error) -} - -// replenishmentControllerFactory implements ReplenishmentControllerFactory -type replenishmentControllerFactory struct { - sharedInformerFactory informers.SharedInformerFactory -} - -// NewReplenishmentControllerFactory returns a factory that knows how to build controllers -// to replenish resources when updated or deleted -func NewReplenishmentControllerFactory(f informers.SharedInformerFactory) ReplenishmentControllerFactory { - return &replenishmentControllerFactory{ - sharedInformerFactory: f, - } -} - -func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (cache.Controller, error) { - var ( - informer informers.GenericInformer - err error - ) - - switch options.GroupKind { - case api.Kind("Pod"): - informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("pods")) - if err != nil { - return nil, err - } - clock := clock.RealClock{} - informer.Informer().AddEventHandlerWithResyncPeriod( - cache.ResourceEventHandlerFuncs{ - UpdateFunc: PodReplenishmentUpdateFunc(options, clock), - DeleteFunc: ObjectReplenishmentDeleteFunc(options), - }, - options.ResyncPeriod(), - ) - case api.Kind("Service"): - informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("services")) - if err != nil { - return nil, err - } - informer.Informer().AddEventHandlerWithResyncPeriod( - cache.ResourceEventHandlerFuncs{ - UpdateFunc: ServiceReplenishmentUpdateFunc(options), - DeleteFunc: ObjectReplenishmentDeleteFunc(options), - }, - options.ResyncPeriod(), - ) - case api.Kind("ReplicationController"): - informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("replicationcontrollers")) - if err != nil { - return nil, err - } - informer.Informer().AddEventHandlerWithResyncPeriod( - cache.ResourceEventHandlerFuncs{ - DeleteFunc: ObjectReplenishmentDeleteFunc(options), - }, - options.ResyncPeriod(), - ) - case api.Kind("PersistentVolumeClaim"): - informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("persistentvolumeclaims")) - if err != nil { - return nil, err - } - informer.Informer().AddEventHandlerWithResyncPeriod( - cache.ResourceEventHandlerFuncs{ - DeleteFunc: ObjectReplenishmentDeleteFunc(options), - }, - options.ResyncPeriod(), - ) - case api.Kind("Secret"): - informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("secrets")) - if err != nil { - return nil, err - } - informer.Informer().AddEventHandlerWithResyncPeriod( - cache.ResourceEventHandlerFuncs{ - DeleteFunc: ObjectReplenishmentDeleteFunc(options), - }, - options.ResyncPeriod(), - ) - case api.Kind("ConfigMap"): - informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("configmaps")) - if err != nil { - return nil, err - } - informer.Informer().AddEventHandlerWithResyncPeriod( - cache.ResourceEventHandlerFuncs{ - DeleteFunc: ObjectReplenishmentDeleteFunc(options), - }, - options.ResyncPeriod(), - ) - default: - return nil, NewUnhandledGroupKindError(options.GroupKind) - } - return informer.Informer().GetController(), nil -} - -// ServiceReplenishmentUpdateFunc will replenish if the service was quota tracked has changed service type -func ServiceReplenishmentUpdateFunc(options *ReplenishmentControllerOptions) func(oldObj, newObj interface{}) { - return func(oldObj, newObj interface{}) { - oldService := oldObj.(*v1.Service) - newService := newObj.(*v1.Service) - if core.GetQuotaServiceType(oldService) != core.GetQuotaServiceType(newService) { - options.ReplenishmentFunc(options.GroupKind, newService.Namespace, nil) - } - } -} - -type unhandledKindErr struct { - kind schema.GroupKind -} - -func (e unhandledKindErr) Error() string { - return fmt.Sprintf("no replenishment controller available for %s", e.kind) -} - -func NewUnhandledGroupKindError(kind schema.GroupKind) error { - return unhandledKindErr{kind: kind} -} - -func IsUnhandledGroupKindError(err error) bool { - if err == nil { - return false - } - _, ok := err.(unhandledKindErr) - return ok -} - -// UnionReplenishmentControllerFactory iterates through its constituent factories ignoring, UnhandledGroupKindErrors -// returning the first success or failure it hits. If there are no hits either way, it return an UnhandledGroupKind error -type UnionReplenishmentControllerFactory []ReplenishmentControllerFactory - -func (f UnionReplenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (cache.Controller, error) { - for _, factory := range f { - controller, err := factory.NewController(options) - if !IsUnhandledGroupKindError(err) { - return controller, err - } - } - - return nil, NewUnhandledGroupKindError(options.GroupKind) -} diff --git a/pkg/controller/resourcequota/replenishment_controller_test.go b/pkg/controller/resourcequota/replenishment_controller_test.go deleted file mode 100644 index 45df8164ec..0000000000 --- a/pkg/controller/resourcequota/replenishment_controller_test.go +++ /dev/null @@ -1,160 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resourcequota - -import ( - "testing" - "time" - - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/clock" - "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/controller" -) - -// testReplenishment lets us test replenishment functions are invoked -type testReplenishment struct { - groupKind schema.GroupKind - namespace string -} - -// mock function that holds onto the last kind that was replenished -func (t *testReplenishment) Replenish(groupKind schema.GroupKind, namespace string, object runtime.Object) { - t.groupKind = groupKind - t.namespace = namespace -} - -func TestPodReplenishmentUpdateFunc(t *testing.T) { - mockReplenish := &testReplenishment{} - options := ReplenishmentControllerOptions{ - GroupKind: api.Kind("Pod"), - ReplenishmentFunc: mockReplenish.Replenish, - ResyncPeriod: controller.NoResyncPeriodFunc, - } - oldPod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "pod"}, - Status: v1.PodStatus{Phase: v1.PodRunning}, - } - newPod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "pod"}, - Status: v1.PodStatus{Phase: v1.PodFailed}, - } - fakeClock := clock.NewFakeClock(time.Now()) - updateFunc := PodReplenishmentUpdateFunc(&options, fakeClock) - updateFunc(oldPod, newPod) - if mockReplenish.groupKind != api.Kind("Pod") { - t.Errorf("Unexpected group kind %v", mockReplenish.groupKind) - } - if mockReplenish.namespace != oldPod.Namespace { - t.Errorf("Unexpected namespace %v", mockReplenish.namespace) - } -} - -func TestObjectReplenishmentDeleteFunc(t *testing.T) { - mockReplenish := &testReplenishment{} - options := ReplenishmentControllerOptions{ - GroupKind: api.Kind("Pod"), - ReplenishmentFunc: mockReplenish.Replenish, - ResyncPeriod: controller.NoResyncPeriodFunc, - } - oldPod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "pod"}, - Status: v1.PodStatus{Phase: v1.PodRunning}, - } - deleteFunc := ObjectReplenishmentDeleteFunc(&options) - deleteFunc(oldPod) - if mockReplenish.groupKind != api.Kind("Pod") { - t.Errorf("Unexpected group kind %v", mockReplenish.groupKind) - } - if mockReplenish.namespace != oldPod.Namespace { - t.Errorf("Unexpected namespace %v", mockReplenish.namespace) - } -} - -func TestServiceReplenishmentUpdateFunc(t *testing.T) { - mockReplenish := &testReplenishment{} - options := ReplenishmentControllerOptions{ - GroupKind: api.Kind("Service"), - ReplenishmentFunc: mockReplenish.Replenish, - ResyncPeriod: controller.NoResyncPeriodFunc, - } - oldService := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "mysvc"}, - Spec: v1.ServiceSpec{ - Type: v1.ServiceTypeNodePort, - Ports: []v1.ServicePort{{ - Port: 80, - TargetPort: intstr.FromInt(80), - }}, - }, - } - newService := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "mysvc"}, - Spec: v1.ServiceSpec{ - Type: v1.ServiceTypeClusterIP, - Ports: []v1.ServicePort{{ - Port: 80, - TargetPort: intstr.FromInt(80), - }}}, - } - updateFunc := ServiceReplenishmentUpdateFunc(&options) - updateFunc(oldService, newService) - if mockReplenish.groupKind != api.Kind("Service") { - t.Errorf("Unexpected group kind %v", mockReplenish.groupKind) - } - if mockReplenish.namespace != oldService.Namespace { - t.Errorf("Unexpected namespace %v", mockReplenish.namespace) - } - - mockReplenish = &testReplenishment{} - options = ReplenishmentControllerOptions{ - GroupKind: api.Kind("Service"), - ReplenishmentFunc: mockReplenish.Replenish, - ResyncPeriod: controller.NoResyncPeriodFunc, - } - oldService = &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "mysvc"}, - Spec: v1.ServiceSpec{ - Type: v1.ServiceTypeNodePort, - Ports: []v1.ServicePort{{ - Port: 80, - TargetPort: intstr.FromInt(80), - }}, - }, - } - newService = &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "mysvc"}, - Spec: v1.ServiceSpec{ - Type: v1.ServiceTypeNodePort, - Ports: []v1.ServicePort{{ - Port: 81, - TargetPort: intstr.FromInt(81), - }}}, - } - updateFunc = ServiceReplenishmentUpdateFunc(&options) - updateFunc(oldService, newService) - if mockReplenish.groupKind == api.Kind("Service") { - t.Errorf("Unexpected group kind %v", mockReplenish.groupKind) - } - if mockReplenish.namespace == oldService.Namespace { - t.Errorf("Unexpected namespace %v", mockReplenish.namespace) - } -} diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index 48fad9fdc1..8964090022 100644 --- a/pkg/controller/resourcequota/resource_quota_controller.go +++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -18,6 +18,8 @@ package resourcequota import ( "fmt" + "reflect" + "sync" "time" "github.com/golang/glog" @@ -27,10 +29,11 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" + "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" @@ -42,6 +45,19 @@ import ( "k8s.io/kubernetes/pkg/quota" ) +// NamespacedResourcesFunc knows how to discover namespaced resources. +type NamespacedResourcesFunc func() ([]*metav1.APIResourceList, error) + +// ReplenishmentFunc is a signal that a resource changed in specified namespace +// that may require quota to be recalculated. +type ReplenishmentFunc func(groupResource schema.GroupResource, namespace string) + +// InformerFactory is all the quota system needs to interface with informers. +type InformerFactory interface { + ForResource(resource schema.GroupVersionResource) (informers.GenericInformer, error) + Start(stopCh <-chan struct{}) +} + // ResourceQuotaControllerOptions holds options for creating a quota controller type ResourceQuotaControllerOptions struct { // Must have authority to list all quotas, and update quota status @@ -50,15 +66,18 @@ type ResourceQuotaControllerOptions struct { ResourceQuotaInformer coreinformers.ResourceQuotaInformer // Controls full recalculation of quota usage ResyncPeriod controller.ResyncPeriodFunc - // Knows how to calculate usage + // Maintains evaluators that know how to calculate usage for group resource Registry quota.Registry - // Knows how to build controllers that notify replenishment events - ControllerFactory ReplenishmentControllerFactory + // Discover list of supported resources on the server. + DiscoveryFunc NamespacedResourcesFunc + // A function that returns the list of resources to ignore + IgnoredResourcesFunc func() map[schema.GroupResource]struct{} + // InformersStarted knows if informers were started. + InformersStarted <-chan struct{} + // InformerFactory interfaces with informers. + InformerFactory InformerFactory // Controls full resync of objects monitored for replenishment. ReplenishmentResyncPeriod controller.ResyncPeriodFunc - // List of GroupKind objects that should be monitored for replenishment at - // a faster frequency than the quota controller recalculation interval - GroupKindsToReplenish []schema.GroupKind } // ResourceQuotaController is responsible for tracking quota usage status in the system @@ -79,9 +98,16 @@ type ResourceQuotaController struct { resyncPeriod controller.ResyncPeriodFunc // knows how to calculate usage registry quota.Registry + // knows how to monitor all the resources tracked by quota and trigger replenishment + quotaMonitor *QuotaMonitor + // controls the workers that process quotas + // this lock is acquired to control write access to the monitors and ensures that all + // monitors are synced before the controller can process quotas. + workerLock sync.RWMutex } -func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController { +// NewResourceQuotaController creates a quota controller with specified options +func NewResourceQuotaController(options *ResourceQuotaControllerOptions) (*ResourceQuotaController, error) { // build the resource quota controller rq := &ResourceQuotaController{ rqClient: options.QuotaClient, @@ -122,21 +148,30 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour rq.resyncPeriod(), ) - for _, groupKindToReplenish := range options.GroupKindsToReplenish { - controllerOptions := &ReplenishmentControllerOptions{ - GroupKind: groupKindToReplenish, - ResyncPeriod: options.ReplenishmentResyncPeriod, - ReplenishmentFunc: rq.replenishQuota, - } - replenishmentController, err := options.ControllerFactory.NewController(controllerOptions) - if err != nil { - glog.Warningf("quota controller unable to replenish %s due to %v, changes only accounted during full resync", groupKindToReplenish, err) - } else { - // make sure we wait for each shared informer's cache to sync - rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, replenishmentController.HasSynced) - } + qm := &QuotaMonitor{ + informersStarted: options.InformersStarted, + informerFactory: options.InformerFactory, + ignoredResources: options.IgnoredResourcesFunc(), + resourceChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_quota_controller_resource_changes"), + resyncPeriod: options.ReplenishmentResyncPeriod, + replenishmentFunc: rq.replenishQuota, + registry: rq.registry, } - return rq + rq.quotaMonitor = qm + + // do initial quota monitor setup + resources, err := GetQuotableResources(options.DiscoveryFunc) + if err != nil { + return nil, err + } + if err = qm.syncMonitors(resources); err != nil { + utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err)) + } + + // only start quota once all informers synced + rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, qm.IsSynced) + + return rq, nil } // enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics @@ -186,7 +221,7 @@ func (rq *ResourceQuotaController) addQuota(obj interface{}) { for constraint := range resourceQuota.Status.Hard { if _, usageFound := resourceQuota.Status.Used[constraint]; !usageFound { matchedResources := []api.ResourceName{api.ResourceName(constraint)} - for _, evaluator := range rq.registry.Evaluators() { + for _, evaluator := range rq.registry.List() { if intersection := evaluator.MatchingResources(matchedResources); len(intersection) > 0 { rq.missingUsageQueue.Add(key) return @@ -202,6 +237,10 @@ func (rq *ResourceQuotaController) addQuota(obj interface{}) { // worker runs a worker thread that just dequeues items, processes them, and marks them done. func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) func() { workFunc := func() bool { + + rq.workerLock.RLock() + defer rq.workerLock.RUnlock() + key, quit := queue.Get() if quit { return true @@ -235,6 +274,8 @@ func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) { glog.Infof("Starting resource quota controller") defer glog.Infof("Shutting down resource quota controller") + go rq.quotaMonitor.Run(stopCh) + if !controller.WaitForCacheSync("resource quota", stopCh, rq.informerSyncedFuncs...) { return } @@ -336,11 +377,10 @@ func (rq *ResourceQuotaController) syncResourceQuota(v1ResourceQuota *v1.Resourc } // replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated -func (rq *ResourceQuotaController) replenishQuota(groupKind schema.GroupKind, namespace string, object runtime.Object) { - // check if the quota controller can evaluate this kind, if not, ignore it altogether... - evaluators := rq.registry.Evaluators() - evaluator, found := evaluators[groupKind] - if !found { +func (rq *ResourceQuotaController) replenishQuota(groupResource schema.GroupResource, namespace string) { + // check if the quota controller can evaluate this groupResource, if not, ignore it altogether... + evaluator := rq.registry.Get(groupResource) + if evaluator == nil { return } @@ -373,3 +413,66 @@ func (rq *ResourceQuotaController) replenishQuota(groupKind schema.GroupKind, na } } } + +// Sync periodically resyncs the controller when new resources are observed from discovery. +func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, period time.Duration, stopCh <-chan struct{}) { + // Something has changed, so track the new state and perform a sync. + oldResources := make(map[schema.GroupVersionResource]struct{}) + wait.Until(func() { + // Get the current resource list from discovery. + newResources, err := GetQuotableResources(discoveryFunc) + if err != nil { + utilruntime.HandleError(err) + return + } + + // Decide whether discovery has reported a change. + if reflect.DeepEqual(oldResources, newResources) { + glog.V(4).Infof("no resource updates from discovery, skipping resource quota sync") + return + } + + // Something has changed, so track the new state and perform a sync. + glog.V(2).Infof("syncing resource quota controller with updated resources from discovery: %v", newResources) + oldResources = newResources + + // Ensure workers are paused to avoid processing events before informers + // have resynced. + rq.workerLock.Lock() + defer rq.workerLock.Unlock() + + // Perform the monitor resync and wait for controllers to report cache sync. + if err := rq.resyncMonitors(newResources); err != nil { + utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err)) + return + } + if !controller.WaitForCacheSync("resource quota", stopCh, rq.quotaMonitor.IsSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync")) + } + }, period, stopCh) +} + +// resyncMonitors starts or stops quota monitors as needed to ensure that all +// (and only) those resources present in the map are monitored. +func (rq *ResourceQuotaController) resyncMonitors(resources map[schema.GroupVersionResource]struct{}) error { + if err := rq.quotaMonitor.syncMonitors(resources); err != nil { + return err + } + rq.quotaMonitor.startMonitors() + return nil +} + +// GetQuotableResources returns all resources that the quota system should recognize. +// It requires a resource supports the following verbs: 'create','list','delete' +func GetQuotableResources(discoveryFunc NamespacedResourcesFunc) (map[schema.GroupVersionResource]struct{}, error) { + possibleResources, err := discoveryFunc() + if err != nil { + return nil, fmt.Errorf("failed to discover resources: %v", err) + } + quotableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"create", "list", "delete"}}, possibleResources) + quotableGroupVersionResources, err := discovery.GroupVersionResources(quotableResources) + if err != nil { + return nil, fmt.Errorf("Failed to parse resources: %v", err) + } + return quotableGroupVersionResources, nil +} diff --git a/pkg/controller/resourcequota/resource_quota_controller_test.go b/pkg/controller/resourcequota/resource_quota_controller_test.go index 3ab7638599..dcbbad8b7b 100644 --- a/pkg/controller/resourcequota/resource_quota_controller_test.go +++ b/pkg/controller/resourcequota/resource_quota_controller_test.go @@ -17,19 +17,23 @@ limitations under the License. package resourcequota import ( + "fmt" "strings" "testing" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" - "k8s.io/kubernetes/pkg/api" + "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/quota/generic" "k8s.io/kubernetes/pkg/quota/install" ) @@ -52,383 +56,249 @@ func getResourceRequirements(requests, limits v1.ResourceList) v1.ResourceRequir return res } +func mockDiscoveryFunc() ([]*metav1.APIResourceList, error) { + return []*metav1.APIResourceList{}, nil +} + +func mockListerForResourceFunc(listersForResource map[schema.GroupVersionResource]cache.GenericLister) quota.ListerForResourceFunc { + return func(gvr schema.GroupVersionResource) (cache.GenericLister, error) { + lister, found := listersForResource[gvr] + if !found { + return nil, fmt.Errorf("no lister found for resource") + } + return lister, nil + } +} + +func newGenericLister(groupResource schema.GroupResource, items []runtime.Object) cache.GenericLister { + store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) + for _, item := range items { + store.Add(item) + } + return cache.NewGenericLister(store, groupResource) +} + +type quotaController struct { + *ResourceQuotaController + stop chan struct{} +} + +func setupQuotaController(t *testing.T, kubeClient kubernetes.Interface, lister quota.ListerForResourceFunc) quotaController { + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaConfiguration := install.NewQuotaConfigurationForControllers(lister) + alwaysStarted := make(chan struct{}) + close(alwaysStarted) + resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{ + QuotaClient: kubeClient.Core(), + ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(), + ResyncPeriod: controller.NoResyncPeriodFunc, + ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, + IgnoredResourcesFunc: quotaConfiguration.IgnoredResources, + DiscoveryFunc: mockDiscoveryFunc, + Registry: generic.NewRegistry(quotaConfiguration.Evaluators()), + InformersStarted: alwaysStarted, + } + qc, err := NewResourceQuotaController(resourceQuotaControllerOptions) + if err != nil { + t.Fatal(err) + } + stop := make(chan struct{}) + go informerFactory.Start(stop) + return quotaController{qc, stop} +} + +func newTestPods() []runtime.Object { + return []runtime.Object{ + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "pod-running", Namespace: "testing"}, + Status: v1.PodStatus{Phase: v1.PodRunning}, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{{Name: "vol"}}, + Containers: []v1.Container{{Name: "ctr", Image: "image", Resources: getResourceRequirements(getResourceList("100m", "1Gi"), getResourceList("", ""))}}, + }, + }, + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "pod-running-2", Namespace: "testing"}, + Status: v1.PodStatus{Phase: v1.PodRunning}, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{{Name: "vol"}}, + Containers: []v1.Container{{Name: "ctr", Image: "image", Resources: getResourceRequirements(getResourceList("100m", "1Gi"), getResourceList("", ""))}}, + }, + }, + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "pod-failed", Namespace: "testing"}, + Status: v1.PodStatus{Phase: v1.PodFailed}, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{{Name: "vol"}}, + Containers: []v1.Container{{Name: "ctr", Image: "image", Resources: getResourceRequirements(getResourceList("100m", "1Gi"), getResourceList("", ""))}}, + }, + }, + } +} + func TestSyncResourceQuota(t *testing.T) { - podList := v1.PodList{ - Items: []v1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{Name: "pod-running", Namespace: "testing"}, - Status: v1.PodStatus{Phase: v1.PodRunning}, - Spec: v1.PodSpec{ - Volumes: []v1.Volume{{Name: "vol"}}, - Containers: []v1.Container{{Name: "ctr", Image: "image", Resources: getResourceRequirements(getResourceList("100m", "1Gi"), getResourceList("", ""))}}, + testCases := map[string]struct { + gvr schema.GroupVersionResource + items []runtime.Object + quota v1.ResourceQuota + status v1.ResourceQuotaStatus + expectedActionSet sets.String + }{ + "pods": { + gvr: v1.SchemeGroupVersion.WithResource("pods"), + quota: v1.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{Name: "quota", Namespace: "testing"}, + Spec: v1.ResourceQuotaSpec{ + Hard: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("3"), + v1.ResourceMemory: resource.MustParse("100Gi"), + v1.ResourcePods: resource.MustParse("5"), + }, }, }, - { - ObjectMeta: metav1.ObjectMeta{Name: "pod-running-2", Namespace: "testing"}, - Status: v1.PodStatus{Phase: v1.PodRunning}, - Spec: v1.PodSpec{ - Volumes: []v1.Volume{{Name: "vol"}}, - Containers: []v1.Container{{Name: "ctr", Image: "image", Resources: getResourceRequirements(getResourceList("100m", "1Gi"), getResourceList("", ""))}}, + status: v1.ResourceQuotaStatus{ + Hard: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("3"), + v1.ResourceMemory: resource.MustParse("100Gi"), + v1.ResourcePods: resource.MustParse("5"), + }, + Used: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("200m"), + v1.ResourceMemory: resource.MustParse("2Gi"), + v1.ResourcePods: resource.MustParse("2"), }, }, - { - ObjectMeta: metav1.ObjectMeta{Name: "pod-failed", Namespace: "testing"}, - Status: v1.PodStatus{Phase: v1.PodFailed}, - Spec: v1.PodSpec{ - Volumes: []v1.Volume{{Name: "vol"}}, - Containers: []v1.Container{{Name: "ctr", Image: "image", Resources: getResourceRequirements(getResourceList("100m", "1Gi"), getResourceList("", ""))}}, + expectedActionSet: sets.NewString( + strings.Join([]string{"update", "resourcequotas", "status"}, "-"), + ), + items: newTestPods(), + }, + "quota-spec-hard-updated": { + gvr: v1.SchemeGroupVersion.WithResource("pods"), + quota: v1.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "rq", + }, + Spec: v1.ResourceQuotaSpec{ + Hard: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("4"), + }, + }, + Status: v1.ResourceQuotaStatus{ + Hard: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("3"), + }, + Used: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("0"), + }, }, }, + status: v1.ResourceQuotaStatus{ + Hard: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("4"), + }, + Used: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("0"), + }, + }, + expectedActionSet: sets.NewString( + strings.Join([]string{"update", "resourcequotas", "status"}, "-"), + ), + items: []runtime.Object{}, }, - } - resourceQuota := v1.ResourceQuota{ - ObjectMeta: metav1.ObjectMeta{Name: "quota", Namespace: "testing"}, - Spec: v1.ResourceQuotaSpec{ - Hard: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("3"), - v1.ResourceMemory: resource.MustParse("100Gi"), - v1.ResourcePods: resource.MustParse("5"), + "quota-unchanged": { + gvr: v1.SchemeGroupVersion.WithResource("pods"), + quota: v1.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "rq", + }, + Spec: v1.ResourceQuotaSpec{ + Hard: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("4"), + }, + }, + Status: v1.ResourceQuotaStatus{ + Hard: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("0"), + }, + }, }, - }, - } - expectedUsage := v1.ResourceQuota{ - Status: v1.ResourceQuotaStatus{ - Hard: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("3"), - v1.ResourceMemory: resource.MustParse("100Gi"), - v1.ResourcePods: resource.MustParse("5"), - }, - Used: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("200m"), - v1.ResourceMemory: resource.MustParse("2Gi"), - v1.ResourcePods: resource.MustParse("2"), + status: v1.ResourceQuotaStatus{ + Hard: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("4"), + }, + Used: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("0"), + }, }, + expectedActionSet: sets.NewString(), + items: []runtime.Object{}, }, } - kubeClient := fake.NewSimpleClientset(&podList, &resourceQuota) - informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) - resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{ - QuotaClient: kubeClient.Core(), - ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(), - ResyncPeriod: controller.NoResyncPeriodFunc, - Registry: install.NewRegistry(kubeClient, nil), - GroupKindsToReplenish: []schema.GroupKind{ - api.Kind("Pod"), - api.Kind("Service"), - api.Kind("ReplicationController"), - api.Kind("PersistentVolumeClaim"), - }, - ControllerFactory: NewReplenishmentControllerFactory(informerFactory), - ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, - } - quotaController := NewResourceQuotaController(resourceQuotaControllerOptions) - err := quotaController.syncResourceQuota(&resourceQuota) - if err != nil { - t.Fatalf("Unexpected error %v", err) - } - expectedActionSet := sets.NewString( - strings.Join([]string{"list", "pods", ""}, "-"), - strings.Join([]string{"update", "resourcequotas", "status"}, "-"), - ) - actionSet := sets.NewString() - for _, action := range kubeClient.Actions() { - actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource().Resource, action.GetSubresource()}, "-")) - } - if !actionSet.HasAll(expectedActionSet.List()...) { - t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet)) - } - - lastActionIndex := len(kubeClient.Actions()) - 1 - usage := kubeClient.Actions()[lastActionIndex].(core.UpdateAction).GetObject().(*v1.ResourceQuota) - - // ensure hard and used limits are what we expected - for k, v := range expectedUsage.Status.Hard { - actual := usage.Status.Hard[k] - actualValue := actual.String() - expectedValue := v.String() - if expectedValue != actualValue { - t.Errorf("Usage Hard: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue) + for testName, testCase := range testCases { + kubeClient := fake.NewSimpleClientset(&testCase.quota) + listersForResourceConfig := map[schema.GroupVersionResource]cache.GenericLister{ + testCase.gvr: newGenericLister(testCase.gvr.GroupResource(), testCase.items), } - } - for k, v := range expectedUsage.Status.Used { - actual := usage.Status.Used[k] - actualValue := actual.String() - expectedValue := v.String() - if expectedValue != actualValue { - t.Errorf("Usage Used: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue) + qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig)) + defer close(qc.stop) + + if err := qc.syncResourceQuota(&testCase.quota); err != nil { + t.Fatalf("test: %s, unexpected error: %v", testName, err) } - } -} -func TestSyncResourceQuotaSpecChange(t *testing.T) { - resourceQuota := v1.ResourceQuota{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "rq", - }, - Spec: v1.ResourceQuotaSpec{ - Hard: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("4"), - }, - }, - Status: v1.ResourceQuotaStatus{ - Hard: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("3"), - }, - Used: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("0"), - }, - }, - } - - expectedUsage := v1.ResourceQuota{ - Status: v1.ResourceQuotaStatus{ - Hard: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("4"), - }, - Used: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("0"), - }, - }, - } - - kubeClient := fake.NewSimpleClientset(&resourceQuota) - informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) - resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{ - QuotaClient: kubeClient.Core(), - ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(), - ResyncPeriod: controller.NoResyncPeriodFunc, - Registry: install.NewRegistry(kubeClient, nil), - GroupKindsToReplenish: []schema.GroupKind{ - api.Kind("Pod"), - api.Kind("Service"), - api.Kind("ReplicationController"), - api.Kind("PersistentVolumeClaim"), - }, - ControllerFactory: NewReplenishmentControllerFactory(informerFactory), - ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, - } - quotaController := NewResourceQuotaController(resourceQuotaControllerOptions) - err := quotaController.syncResourceQuota(&resourceQuota) - if err != nil { - t.Fatalf("Unexpected error %v", err) - } - - expectedActionSet := sets.NewString( - strings.Join([]string{"list", "pods", ""}, "-"), - strings.Join([]string{"update", "resourcequotas", "status"}, "-"), - ) - actionSet := sets.NewString() - for _, action := range kubeClient.Actions() { - actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource().Resource, action.GetSubresource()}, "-")) - } - if !actionSet.HasAll(expectedActionSet.List()...) { - t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet)) - } - - lastActionIndex := len(kubeClient.Actions()) - 1 - usage := kubeClient.Actions()[lastActionIndex].(core.UpdateAction).GetObject().(*v1.ResourceQuota) - - // ensure hard and used limits are what we expected - for k, v := range expectedUsage.Status.Hard { - actual := usage.Status.Hard[k] - actualValue := actual.String() - expectedValue := v.String() - if expectedValue != actualValue { - t.Errorf("Usage Hard: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue) + actionSet := sets.NewString() + for _, action := range kubeClient.Actions() { + actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource().Resource, action.GetSubresource()}, "-")) } - } - for k, v := range expectedUsage.Status.Used { - actual := usage.Status.Used[k] - actualValue := actual.String() - expectedValue := v.String() - if expectedValue != actualValue { - t.Errorf("Usage Used: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue) + if !actionSet.HasAll(testCase.expectedActionSet.List()...) { + t.Errorf("test: %s,\nExpected actions:\n%v\n but got:\n%v\nDifference:\n%v", testName, testCase.expectedActionSet, actionSet, testCase.expectedActionSet.Difference(actionSet)) } - } -} -func TestSyncResourceQuotaSpecHardChange(t *testing.T) { - resourceQuota := v1.ResourceQuota{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "rq", - }, - Spec: v1.ResourceQuotaSpec{ - Hard: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("4"), - }, - }, - Status: v1.ResourceQuotaStatus{ - Hard: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("3"), - v1.ResourceMemory: resource.MustParse("1Gi"), - }, - Used: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("0"), - v1.ResourceMemory: resource.MustParse("0"), - }, - }, - } + lastActionIndex := len(kubeClient.Actions()) - 1 + usage := kubeClient.Actions()[lastActionIndex].(core.UpdateAction).GetObject().(*v1.ResourceQuota) - expectedUsage := v1.ResourceQuota{ - Status: v1.ResourceQuotaStatus{ - Hard: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("4"), - }, - Used: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("0"), - }, - }, - } - - kubeClient := fake.NewSimpleClientset(&resourceQuota) - informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) - resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{ - QuotaClient: kubeClient.Core(), - ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(), - ResyncPeriod: controller.NoResyncPeriodFunc, - Registry: install.NewRegistry(kubeClient, nil), - GroupKindsToReplenish: []schema.GroupKind{ - api.Kind("Pod"), - api.Kind("Service"), - api.Kind("ReplicationController"), - api.Kind("PersistentVolumeClaim"), - }, - ControllerFactory: NewReplenishmentControllerFactory(informerFactory), - ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, - } - quotaController := NewResourceQuotaController(resourceQuotaControllerOptions) - err := quotaController.syncResourceQuota(&resourceQuota) - if err != nil { - t.Fatalf("Unexpected error %v", err) - } - - expectedActionSet := sets.NewString( - strings.Join([]string{"list", "pods", ""}, "-"), - strings.Join([]string{"update", "resourcequotas", "status"}, "-"), - ) - actionSet := sets.NewString() - for _, action := range kubeClient.Actions() { - actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource().Resource, action.GetSubresource()}, "-")) - } - if !actionSet.HasAll(expectedActionSet.List()...) { - t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet)) - } - - lastActionIndex := len(kubeClient.Actions()) - 1 - usage := kubeClient.Actions()[lastActionIndex].(core.UpdateAction).GetObject().(*v1.ResourceQuota) - - // ensure hard and used limits are what we expected - for k, v := range expectedUsage.Status.Hard { - actual := usage.Status.Hard[k] - actualValue := actual.String() - expectedValue := v.String() - if expectedValue != actualValue { - t.Errorf("Usage Hard: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue) + // ensure usage is as expected + if len(usage.Status.Hard) != len(testCase.status.Hard) { + t.Errorf("test: %s, status hard lengths do not match", testName) } - } - for k, v := range expectedUsage.Status.Used { - actual := usage.Status.Used[k] - actualValue := actual.String() - expectedValue := v.String() - if expectedValue != actualValue { - t.Errorf("Usage Used: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue) + if len(usage.Status.Used) != len(testCase.status.Used) { + t.Errorf("test: %s, status used lengths do not match", testName) } - } - - // ensure usage hard and used are are synced with spec hard, not have dirty resource - for k, v := range usage.Status.Hard { - if k == v1.ResourceMemory { - t.Errorf("Unexpected Usage Hard: Key: %v, Value: %v", k, v.String()) + for k, v := range testCase.status.Hard { + actual := usage.Status.Hard[k] + actualValue := actual.String() + expectedValue := v.String() + if expectedValue != actualValue { + t.Errorf("test: %s, Usage Hard: Key: %v, Expected: %v, Actual: %v", testName, k, expectedValue, actualValue) + } } - } - - for k, v := range usage.Status.Used { - if k == v1.ResourceMemory { - t.Errorf("Unexpected Usage Used: Key: %v, Value: %v", k, v.String()) + for k, v := range testCase.status.Used { + actual := usage.Status.Used[k] + actualValue := actual.String() + expectedValue := v.String() + if expectedValue != actualValue { + t.Errorf("test: %s, Usage Used: Key: %v, Expected: %v, Actual: %v", testName, k, expectedValue, actualValue) + } } } } -func TestSyncResourceQuotaNoChange(t *testing.T) { - resourceQuota := v1.ResourceQuota{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "rq", - }, - Spec: v1.ResourceQuotaSpec{ - Hard: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("4"), - }, - }, - Status: v1.ResourceQuotaStatus{ - Hard: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("4"), - }, - Used: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("0"), - }, - }, - } - - kubeClient := fake.NewSimpleClientset(&v1.PodList{}, &resourceQuota) - informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) - resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{ - QuotaClient: kubeClient.Core(), - ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(), - ResyncPeriod: controller.NoResyncPeriodFunc, - Registry: install.NewRegistry(kubeClient, nil), - GroupKindsToReplenish: []schema.GroupKind{ - api.Kind("Pod"), - api.Kind("Service"), - api.Kind("ReplicationController"), - api.Kind("PersistentVolumeClaim"), - }, - ControllerFactory: NewReplenishmentControllerFactory(informerFactory), - ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, - } - quotaController := NewResourceQuotaController(resourceQuotaControllerOptions) - err := quotaController.syncResourceQuota(&resourceQuota) - if err != nil { - t.Fatalf("Unexpected error %v", err) - } - expectedActionSet := sets.NewString( - strings.Join([]string{"list", "pods", ""}, "-"), - ) - actionSet := sets.NewString() - for _, action := range kubeClient.Actions() { - actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource().Resource, action.GetSubresource()}, "-")) - } - if !actionSet.HasAll(expectedActionSet.List()...) { - t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet)) - } -} - func TestAddQuota(t *testing.T) { kubeClient := fake.NewSimpleClientset() - informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) - resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{ - QuotaClient: kubeClient.Core(), - ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(), - ResyncPeriod: controller.NoResyncPeriodFunc, - Registry: install.NewRegistry(kubeClient, nil), - GroupKindsToReplenish: []schema.GroupKind{ - api.Kind("Pod"), - api.Kind("ReplicationController"), - api.Kind("PersistentVolumeClaim"), - }, - ControllerFactory: NewReplenishmentControllerFactory(informerFactory), - ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, + gvr := v1.SchemeGroupVersion.WithResource("pods") + listersForResourceConfig := map[schema.GroupVersionResource]cache.GenericLister{ + gvr: newGenericLister(gvr.GroupResource(), newTestPods()), } - quotaController := NewResourceQuotaController(resourceQuotaControllerOptions) - delete(quotaController.registry.(*generic.GenericRegistry).InternalEvaluators, api.Kind("Service")) + qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig)) + defer close(qc.stop) testCases := []struct { - name string - + name string quota *v1.ResourceQuota expectedPriority bool }{ @@ -491,7 +361,7 @@ func TestAddQuota(t *testing.T) { }, }, { - name: "status, missing usage, but don't care", + name: "status, missing usage, but don't care (no informer)", expectedPriority: false, quota: &v1.ResourceQuota{ ObjectMeta: metav1.ObjectMeta{ @@ -500,12 +370,12 @@ func TestAddQuota(t *testing.T) { }, Spec: v1.ResourceQuotaSpec{ Hard: v1.ResourceList{ - v1.ResourceServices: resource.MustParse("4"), + "count/foobars.example.com": resource.MustParse("4"), }, }, Status: v1.ResourceQuotaStatus{ Hard: v1.ResourceList{ - v1.ResourceServices: resource.MustParse("4"), + "count/foobars.example.com": resource.MustParse("4"), }, }, }, @@ -536,30 +406,29 @@ func TestAddQuota(t *testing.T) { } for _, tc := range testCases { - quotaController.addQuota(tc.quota) + qc.addQuota(tc.quota) if tc.expectedPriority { - if e, a := 1, quotaController.missingUsageQueue.Len(); e != a { + if e, a := 1, qc.missingUsageQueue.Len(); e != a { t.Errorf("%s: expected %v, got %v", tc.name, e, a) } - if e, a := 0, quotaController.queue.Len(); e != a { + if e, a := 0, qc.queue.Len(); e != a { t.Errorf("%s: expected %v, got %v", tc.name, e, a) } } else { - if e, a := 0, quotaController.missingUsageQueue.Len(); e != a { + if e, a := 0, qc.missingUsageQueue.Len(); e != a { t.Errorf("%s: expected %v, got %v", tc.name, e, a) } - if e, a := 1, quotaController.queue.Len(); e != a { + if e, a := 1, qc.queue.Len(); e != a { t.Errorf("%s: expected %v, got %v", tc.name, e, a) } } - - for quotaController.missingUsageQueue.Len() > 0 { - key, _ := quotaController.missingUsageQueue.Get() - quotaController.missingUsageQueue.Done(key) + for qc.missingUsageQueue.Len() > 0 { + key, _ := qc.missingUsageQueue.Get() + qc.missingUsageQueue.Done(key) } - for quotaController.queue.Len() > 0 { - key, _ := quotaController.queue.Get() - quotaController.queue.Done(key) + for qc.queue.Len() > 0 { + key, _ := qc.queue.Get() + qc.queue.Done(key) } } } diff --git a/pkg/controller/resourcequota/resource_quota_monitor.go b/pkg/controller/resourcequota/resource_quota_monitor.go new file mode 100644 index 0000000000..e5d6dc593f --- /dev/null +++ b/pkg/controller/resourcequota/resource_quota_monitor.go @@ -0,0 +1,341 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcequota + +import ( + "fmt" + "sync" + "time" + + "github.com/golang/glog" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/clock" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/quota" + "k8s.io/kubernetes/pkg/quota/evaluator/core" + "k8s.io/kubernetes/pkg/quota/generic" +) + +type eventType int + +func (e eventType) String() string { + switch e { + case addEvent: + return "add" + case updateEvent: + return "update" + case deleteEvent: + return "delete" + default: + return fmt.Sprintf("unknown(%d)", int(e)) + } +} + +const ( + addEvent eventType = iota + updateEvent + deleteEvent +) + +type event struct { + eventType eventType + obj interface{} + oldObj interface{} + gvr schema.GroupVersionResource +} + +type QuotaMonitor struct { + // each monitor list/watches a resource and determines if we should replenish quota + monitors monitors + monitorLock sync.Mutex + // informersStarted is closed after after all of the controllers have been initialized and are running. + // After that it is safe to start them here, before that it is not. + informersStarted <-chan struct{} + + // stopCh drives shutdown. If it is nil, it indicates that Run() has not been + // called yet. If it is non-nil, then when closed it indicates everything + // should shut down. + // + // This channel is also protected by monitorLock. + stopCh <-chan struct{} + + // monitors are the producer of the resourceChanges queue + resourceChanges workqueue.RateLimitingInterface + + // interfaces with informers + informerFactory InformerFactory + + // list of resources to ignore + ignoredResources map[schema.GroupResource]struct{} + + // The period that should be used to re-sync the monitored resource + resyncPeriod controller.ResyncPeriodFunc + + // callback to alert that a change may require quota recalculation + replenishmentFunc ReplenishmentFunc + + // maintains list of evaluators + registry quota.Registry +} + +// monitor runs a Controller with a local stop channel. +type monitor struct { + controller cache.Controller + + // stopCh stops Controller. If stopCh is nil, the monitor is considered to be + // not yet started. + stopCh chan struct{} +} + +// Run is intended to be called in a goroutine. Multiple calls of this is an +// error. +func (m *monitor) Run() { + m.controller.Run(m.stopCh) +} + +type monitors map[schema.GroupVersionResource]*monitor + +func (qm *QuotaMonitor) controllerFor(resource schema.GroupVersionResource) (cache.Controller, error) { + // TODO: pass this down + clock := clock.RealClock{} + handlers := cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj, newObj interface{}) { + // TODO: leaky abstraction! live w/ it for now, but should pass down an update filter func. + // we only want to queue the updates we care about though as too much noise will overwhelm queue. + notifyUpdate := false + switch resource.GroupResource() { + case schema.GroupResource{Resource: "pods"}: + oldPod := oldObj.(*v1.Pod) + newPod := newObj.(*v1.Pod) + notifyUpdate = core.QuotaV1Pod(oldPod, clock) && !core.QuotaV1Pod(newPod, clock) + case schema.GroupResource{Resource: "services"}: + oldService := oldObj.(*v1.Service) + newService := newObj.(*v1.Service) + notifyUpdate = core.GetQuotaServiceType(oldService) != core.GetQuotaServiceType(newService) + } + if notifyUpdate { + event := &event{ + eventType: updateEvent, + obj: newObj, + oldObj: oldObj, + gvr: resource, + } + qm.resourceChanges.Add(event) + } + }, + DeleteFunc: func(obj interface{}) { + // delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it + if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = deletedFinalStateUnknown.Obj + } + event := &event{ + eventType: deleteEvent, + obj: obj, + gvr: resource, + } + qm.resourceChanges.Add(event) + }, + } + shared, err := qm.informerFactory.ForResource(resource) + if err == nil { + glog.V(4).Infof("QuotaMonitor using a shared informer for resource %q", resource.String()) + shared.Informer().AddEventHandlerWithResyncPeriod(handlers, qm.resyncPeriod()) + return shared.Informer().GetController(), nil + } + glog.V(4).Infof("QuotaMonitor unable to use a shared informer for resource %q: %v", resource.String(), err) + + // TODO: if we can share storage with garbage collector, it may make sense to support other resources + // until that time, aggregated api servers will have to run their own controller to reconcile their own quota. + return nil, fmt.Errorf("unable to monitor quota for resource %q", resource.String()) +} + +// syncMonitors rebuilds the monitor set according to the supplied resources, +// creating or deleting monitors as necessary. It will return any error +// encountered, but will make an attempt to create a monitor for each resource +// instead of immediately exiting on an error. It may be called before or after +// Run. Monitors are NOT started as part of the sync. To ensure all existing +// monitors are started, call startMonitors. +func (qm *QuotaMonitor) syncMonitors(resources map[schema.GroupVersionResource]struct{}) error { + qm.monitorLock.Lock() + defer qm.monitorLock.Unlock() + + toRemove := qm.monitors + if toRemove == nil { + toRemove = monitors{} + } + current := monitors{} + errs := []error{} + kept := 0 + added := 0 + for resource := range resources { + if _, ok := qm.ignoredResources[resource.GroupResource()]; ok { + continue + } + if m, ok := toRemove[resource]; ok { + current[resource] = m + delete(toRemove, resource) + kept++ + continue + } + c, err := qm.controllerFor(resource) + if err != nil { + errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err)) + continue + } + + // check if we need to create an evaluator for this resource (if none previously registered) + evaluator := qm.registry.Get(resource.GroupResource()) + if evaluator == nil { + listerFunc := generic.ListerFuncForResourceFunc(qm.informerFactory.ForResource) + listResourceFunc := generic.ListResourceUsingListerFunc(listerFunc, resource) + evaluator = generic.NewObjectCountEvaluator(false, resource.GroupResource(), listResourceFunc, "") + qm.registry.Add(evaluator) + glog.Infof("QuotaMonitor created object count evaluator for %s", resource.GroupResource()) + } + + // track the monitor + current[resource] = &monitor{controller: c} + added++ + } + qm.monitors = current + + for _, monitor := range toRemove { + if monitor.stopCh != nil { + close(monitor.stopCh) + } + } + + glog.V(4).Infof("quota synced monitors; added %d, kept %d, removed %d", added, kept, len(toRemove)) + // NewAggregate returns nil if errs is 0-length + return utilerrors.NewAggregate(errs) +} + +// startMonitors ensures the current set of monitors are running. Any newly +// started monitors will also cause shared informers to be started. +// +// If called before Run, startMonitors does nothing (as there is no stop channel +// to support monitor/informer execution). +func (qm *QuotaMonitor) startMonitors() { + qm.monitorLock.Lock() + defer qm.monitorLock.Unlock() + + if qm.stopCh == nil { + return + } + + // we're waiting until after the informer start that happens once all the controllers are initialized. This ensures + // that they don't get unexpected events on their work queues. + <-qm.informersStarted + + monitors := qm.monitors + started := 0 + for _, monitor := range monitors { + if monitor.stopCh == nil { + monitor.stopCh = make(chan struct{}) + qm.informerFactory.Start(qm.stopCh) + go monitor.Run() + started++ + } + } + glog.V(4).Infof("QuotaMonitor started %d new monitors, %d currently running", started, len(monitors)) +} + +// IsSynced returns true if any monitors exist AND all those monitors' +// controllers HasSynced functions return true. This means IsSynced could return +// true at one time, and then later return false if all monitors were +// reconstructed. +func (qm *QuotaMonitor) IsSynced() bool { + qm.monitorLock.Lock() + defer qm.monitorLock.Unlock() + + if len(qm.monitors) == 0 { + return false + } + + for _, monitor := range qm.monitors { + if !monitor.controller.HasSynced() { + return false + } + } + return true +} + +// Run sets the stop channel and starts monitor execution until stopCh is +// closed. Any running monitors will be stopped before Run returns. +func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) { + glog.Infof("QuotaMonitor running") + defer glog.Infof("QuotaMonitor stopping") + + // Set up the stop channel. + qm.monitorLock.Lock() + qm.stopCh = stopCh + qm.monitorLock.Unlock() + + // Start monitors and begin change processing until the stop channel is + // closed. + qm.startMonitors() + wait.Until(qm.runProcessResourceChanges, 1*time.Second, stopCh) + + // Stop any running monitors. + qm.monitorLock.Lock() + defer qm.monitorLock.Unlock() + monitors := qm.monitors + stopped := 0 + for _, monitor := range monitors { + if monitor.stopCh != nil { + stopped++ + close(monitor.stopCh) + } + } + glog.Infof("QuotaMonitor stopped %d of %d monitors", stopped, len(monitors)) +} + +func (qm *QuotaMonitor) runProcessResourceChanges() { + for qm.processResourceChanges() { + } +} + +// Dequeueing an event from resourceChanges to process +func (qm *QuotaMonitor) processResourceChanges() bool { + item, quit := qm.resourceChanges.Get() + if quit { + return false + } + defer qm.resourceChanges.Done(item) + event, ok := item.(*event) + if !ok { + utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", item)) + return true + } + obj := event.obj + accessor, err := meta.Accessor(obj) + if err != nil { + utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err)) + return true + } + glog.V(4).Infof("QuotaMonitor process object: %s, namespace %s, name %s, uid %s, event type %v", event.gvr.String(), accessor.GetNamespace(), accessor.GetName(), string(accessor.GetUID()), event.eventType) + qm.replenishmentFunc(event.gvr.GroupResource(), accessor.GetNamespace()) + return true +}