diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 6539ef0401..1f743a48a0 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -76,7 +76,7 @@ func startPodGCController(ctx ControllerContext) (bool, error) { func startResourceQuotaController(ctx ControllerContext) (bool, error) { resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller") - resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.InformerFactory) + resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.NewInformerFactory) groupKindsToReplenish := []schema.GroupKind{ api.Kind("Pod"), api.Kind("Service"), @@ -87,9 +87,10 @@ func startResourceQuotaController(ctx ControllerContext) (bool, error) { } resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{ KubeClient: resourceQuotaControllerClient, + ResourceQuotaInformer: ctx.NewInformerFactory.Core().V1().ResourceQuotas(), ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.Options.ResourceQuotaSyncPeriod.Duration), Registry: resourceQuotaRegistry, - ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(ctx.InformerFactory, resourceQuotaControllerClient), + ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(ctx.NewInformerFactory), ReplenishmentResyncPeriod: ResyncPeriod(&ctx.Options), GroupKindsToReplenish: groupKindsToReplenish, } diff --git a/cmd/kubelet/app/options/BUILD b/cmd/kubelet/app/options/BUILD index 10f8d1f6df..6e2e9c5a4f 100644 --- a/cmd/kubelet/app/options/BUILD +++ b/cmd/kubelet/app/options/BUILD @@ -14,6 +14,7 @@ go_library( deps = [ "//pkg/api:go_default_library", "//pkg/apis/componentconfig:go_default_library", + "//pkg/apis/componentconfig/install:go_default_library", "//pkg/apis/componentconfig/v1alpha1:go_default_library", "//pkg/util/taints:go_default_library", "//vendor:github.com/spf13/pflag", diff --git a/cmd/kubelet/app/options/options.go b/cmd/kubelet/app/options/options.go index a5126d6802..c9a2663779 100644 --- a/cmd/kubelet/app/options/options.go +++ b/cmd/kubelet/app/options/options.go @@ -26,6 +26,8 @@ import ( utilflag "k8s.io/apiserver/pkg/util/flag" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/componentconfig" + // Need to make sure the componentconfig api is installed so defaulting funcs work + _ "k8s.io/kubernetes/pkg/apis/componentconfig/install" "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1" utiltaints "k8s.io/kubernetes/pkg/util/taints" diff --git a/pkg/controller/resourcequota/BUILD b/pkg/controller/resourcequota/BUILD index beb64ca7a1..544b6df2ce 100644 --- a/pkg/controller/resourcequota/BUILD +++ b/pkg/controller/resourcequota/BUILD @@ -20,20 +20,23 @@ go_library( "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/informers/informers_generated/externalversions:go_default_library", + "//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library", + "//pkg/client/listers/core/v1:go_default_library", "//pkg/controller:go_default_library", - "//pkg/controller/informers:go_default_library", "//pkg/quota:go_default_library", "//pkg/quota/evaluator/core:go_default_library", "//pkg/util/metrics:go_default_library", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/api/equality", + "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/api/meta", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", - "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/util/workqueue", ], @@ -51,6 +54,7 @@ go_test( "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset/fake:go_default_library", + "//pkg/client/informers/informers_generated/externalversions:go_default_library", "//pkg/controller:go_default_library", "//pkg/quota/generic:go_default_library", "//pkg/quota/install:go_default_library", diff --git a/pkg/controller/resourcequota/replenishment_controller.go b/pkg/controller/resourcequota/replenishment_controller.go index 362e9439e3..8c0a6325c2 100644 --- a/pkg/controller/resourcequota/replenishment_controller.go +++ b/pkg/controller/resourcequota/replenishment_controller.go @@ -22,19 +22,15 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" - "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/quota/evaluator/core" - "k8s.io/kubernetes/pkg/util/metrics" ) // ReplenishmentFunc is a function that is invoked when controller sees a change @@ -96,151 +92,96 @@ type ReplenishmentControllerFactory interface { // replenishmentControllerFactory implements ReplenishmentControllerFactory type replenishmentControllerFactory struct { - kubeClient clientset.Interface sharedInformerFactory informers.SharedInformerFactory } // NewReplenishmentControllerFactory returns a factory that knows how to build controllers // to replenish resources when updated or deleted -func NewReplenishmentControllerFactory(f informers.SharedInformerFactory, kubeClient clientset.Interface) ReplenishmentControllerFactory { +func NewReplenishmentControllerFactory(f informers.SharedInformerFactory) ReplenishmentControllerFactory { return &replenishmentControllerFactory{ - kubeClient: kubeClient, sharedInformerFactory: f, } } -// NewReplenishmentControllerFactoryFromClient returns a factory that knows how to build controllers to replenish resources -// when updated or deleted using the specified client. -func NewReplenishmentControllerFactoryFromClient(kubeClient clientset.Interface) ReplenishmentControllerFactory { - return NewReplenishmentControllerFactory(nil, kubeClient) -} - -// controllerFor returns a replenishment controller for the specified group resource. -func controllerFor( - groupResource schema.GroupResource, - f informers.SharedInformerFactory, - handlerFuncs cache.ResourceEventHandlerFuncs, -) (cache.Controller, error) { - genericInformer, err := f.ForResource(groupResource) - if err != nil { - return nil, err - } - informer := genericInformer.Informer() - informer.AddEventHandler(handlerFuncs) - return informer.GetController(), nil -} - -func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (result cache.Controller, err error) { - if r.kubeClient != nil && r.kubeClient.Core().RESTClient().GetRateLimiter() != nil { - metrics.RegisterMetricAndTrackRateLimiterUsage("replenishment_controller", r.kubeClient.Core().RESTClient().GetRateLimiter()) - } +func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (cache.Controller, error) { + var ( + informer informers.GenericInformer + err error + ) switch options.GroupKind { case api.Kind("Pod"): - if r.sharedInformerFactory != nil { - result, err = controllerFor(api.Resource("pods"), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{ + informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("pods")) + if err != nil { + return nil, err + } + informer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ UpdateFunc: PodReplenishmentUpdateFunc(options), DeleteFunc: ObjectReplenishmentDeleteFunc(options), - }) - break - } - result = informers.NewPodInformer(r.kubeClient, options.ResyncPeriod()) - case api.Kind("Service"): - // TODO move to informer when defined - _, result = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return r.kubeClient.Core().Services(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return r.kubeClient.Core().Services(metav1.NamespaceAll).Watch(options) - }, }, - &v1.Service{}, options.ResyncPeriod(), + ) + case api.Kind("Service"): + informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("services")) + if err != nil { + return nil, err + } + informer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ UpdateFunc: ServiceReplenishmentUpdateFunc(options), DeleteFunc: ObjectReplenishmentDeleteFunc(options), }, + options.ResyncPeriod(), ) case api.Kind("ReplicationController"): - // TODO move to informer when defined - _, result = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return r.kubeClient.Core().ReplicationControllers(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return r.kubeClient.Core().ReplicationControllers(metav1.NamespaceAll).Watch(options) - }, - }, - &v1.ReplicationController{}, - options.ResyncPeriod(), + informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("replicationcontrollers")) + if err != nil { + return nil, err + } + informer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ DeleteFunc: ObjectReplenishmentDeleteFunc(options), }, + options.ResyncPeriod(), ) case api.Kind("PersistentVolumeClaim"): - if r.sharedInformerFactory != nil { - result, err = controllerFor(api.Resource("persistentvolumeclaims"), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{ - DeleteFunc: ObjectReplenishmentDeleteFunc(options), - }) - break + informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("persistentvolumeclaims")) + if err != nil { + return nil, err } - // TODO (derekwaynecarr) remove me when we can require a sharedInformerFactory in all code paths... - _, result = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return r.kubeClient.Core().PersistentVolumeClaims(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return r.kubeClient.Core().PersistentVolumeClaims(metav1.NamespaceAll).Watch(options) - }, - }, - &v1.PersistentVolumeClaim{}, - options.ResyncPeriod(), + informer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ DeleteFunc: ObjectReplenishmentDeleteFunc(options), }, + options.ResyncPeriod(), ) case api.Kind("Secret"): - // TODO move to informer when defined - _, result = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return r.kubeClient.Core().Secrets(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return r.kubeClient.Core().Secrets(metav1.NamespaceAll).Watch(options) - }, - }, - &v1.Secret{}, - options.ResyncPeriod(), + informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("secrets")) + if err != nil { + return nil, err + } + informer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ DeleteFunc: ObjectReplenishmentDeleteFunc(options), }, + options.ResyncPeriod(), ) case api.Kind("ConfigMap"): - // TODO move to informer when defined - _, result = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return r.kubeClient.Core().ConfigMaps(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return r.kubeClient.Core().ConfigMaps(metav1.NamespaceAll).Watch(options) - }, - }, - &v1.ConfigMap{}, - options.ResyncPeriod(), + informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("configmaps")) + if err != nil { + return nil, err + } + informer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ DeleteFunc: ObjectReplenishmentDeleteFunc(options), }, + options.ResyncPeriod(), ) default: return nil, NewUnhandledGroupKindError(options.GroupKind) } - return result, err + return informer.Informer().GetController(), nil } // ServiceReplenishmentUpdateFunc will replenish if the service was quota tracked has changed service type diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index 90f442ec8b..f3fd866809 100644 --- a/pkg/controller/resourcequota/resource_quota_controller.go +++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -17,22 +17,26 @@ limitations under the License. package resourcequota import ( + "fmt" "time" "github.com/golang/glog" apiequality "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/util/metrics" @@ -42,6 +46,8 @@ import ( type ResourceQuotaControllerOptions struct { // Must have authority to list all quotas, and update quota status KubeClient clientset.Interface + // Shared informer for resource quotas + ResourceQuotaInformer coreinformers.ResourceQuotaInformer // Controls full recalculation of quota usage ResyncPeriod controller.ResyncPeriodFunc // Knows how to calculate usage @@ -59,10 +65,10 @@ type ResourceQuotaControllerOptions struct { type ResourceQuotaController struct { // Must have authority to list all resources in the system, and update quota status kubeClient clientset.Interface - // An index of resource quota objects by namespace - rqIndexer cache.Indexer - // Watches changes to all resource quota - rqController cache.Controller + // A lister/getter of resource quota objects + rqLister corelisters.ResourceQuotaLister + // A list of functions that return true when their caches have synced + informerSyncedFuncs []cache.InformerSynced // ResourceQuota objects that need to be synchronized queue workqueue.RateLimitingInterface // missingUsageQueue holds objects that are missing the initial usage information @@ -81,6 +87,8 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour // build the resource quota controller rq := &ResourceQuotaController{ kubeClient: options.KubeClient, + rqLister: options.ResourceQuotaInformer.Lister(), + informerSyncedFuncs: []cache.InformerSynced{options.ResourceQuotaInformer.Informer().HasSynced}, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"), missingUsageQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"), resyncPeriod: options.ResyncPeriod, @@ -93,18 +101,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour // set the synchronization handler rq.syncHandler = rq.syncResourceQuotaFromKey - // build the controller that observes quota - rq.rqIndexer, rq.rqController = cache.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return rq.kubeClient.Core().ResourceQuotas(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return rq.kubeClient.Core().ResourceQuotas(metav1.NamespaceAll).Watch(options) - }, - }, - &v1.ResourceQuota{}, - rq.resyncPeriod(), + options.ResourceQuotaInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: rq.addQuota, UpdateFunc: func(old, cur interface{}) { @@ -128,7 +125,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour // way of achieving this is by performing a `stop` operation on the controller. DeleteFunc: rq.enqueueResourceQuota, }, - cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}, + rq.resyncPeriod(), ) for _, groupKindToReplenish := range options.GroupKindsToReplenish { @@ -141,7 +138,8 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour if err != nil { glog.Warningf("quota controller unable to replenish %s due to %v, changes only accounted during full resync", groupKindToReplenish, err) } else { - rq.replenishmentControllers = append(rq.replenishmentControllers, replenishmentController) + // make sure we wait for each shared informer's cache to sync + rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, replenishmentController.HasSynced) } } return rq @@ -150,8 +148,18 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour // enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics func (rq *ResourceQuotaController) enqueueAll() { defer glog.V(4).Infof("Resource quota controller queued all resource quota for full calculation of usage") - for _, k := range rq.rqIndexer.ListKeys() { - rq.queue.Add(k) + rqs, err := rq.rqLister.List(labels.Everything()) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to enqueue all - error listing resource quotas: %v", err)) + return + } + for i := range rqs { + key, err := controller.KeyFunc(rqs[i]) + if err != nil { + utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", rqs[i], err)) + continue + } + rq.queue.Add(key) } } @@ -228,18 +236,24 @@ func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) // Run begins quota controller using the specified number of workers func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() - go rq.rqController.Run(stopCh) + + glog.Infof("Starting resource quota controller") + // the controllers that replenish other resources to respond rapidly to state changes for _, replenishmentController := range rq.replenishmentControllers { go replenishmentController.Run(stopCh) } + + if !cache.WaitForCacheSync(stopCh, rq.informerSyncedFuncs...) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + return + } + // the workers that chug through the quota calculation backlog for i := 0; i < workers; i++ { go wait.Until(rq.worker(rq.queue), time.Second, stopCh) go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh) } - // the timer for how often we do a full recalculation across all quotas - go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh) <-stopCh glog.Infof("Shutting down ResourceQuotaController") rq.queue.ShutDown() @@ -252,8 +266,12 @@ func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err err glog.V(4).Infof("Finished syncing resource quota %q (%v)", key, time.Now().Sub(startTime)) }() - obj, exists, err := rq.rqIndexer.GetByKey(key) - if !exists { + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + quota, err := rq.rqLister.ResourceQuotas(namespace).Get(name) + if errors.IsNotFound(err) { glog.Infof("Resource quota has been deleted %v", key) return nil } @@ -262,17 +280,16 @@ func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err err rq.queue.Add(key) return err } - quota := *obj.(*v1.ResourceQuota) return rq.syncResourceQuota(quota) } // syncResourceQuota runs a complete sync of resource quota status across all known kinds -func (rq *ResourceQuotaController) syncResourceQuota(v1ResourceQuota v1.ResourceQuota) (err error) { +func (rq *ResourceQuotaController) syncResourceQuota(v1ResourceQuota *v1.ResourceQuota) (err error) { // quota is dirty if any part of spec hard limits differs from the status hard limits dirty := !apiequality.Semantic.DeepEqual(v1ResourceQuota.Spec.Hard, v1ResourceQuota.Status.Hard) resourceQuota := api.ResourceQuota{} - if err := v1.Convert_v1_ResourceQuota_To_api_ResourceQuota(&v1ResourceQuota, &resourceQuota, nil); err != nil { + if err := v1.Convert_v1_ResourceQuota_To_api_ResourceQuota(v1ResourceQuota, &resourceQuota, nil); err != nil { return err } @@ -338,11 +355,14 @@ func (rq *ResourceQuotaController) replenishQuota(groupKind schema.GroupKind, na } // check if this namespace even has a quota... - indexKey := &v1.ResourceQuota{} - indexKey.Namespace = namespace - resourceQuotas, err := rq.rqIndexer.Index("namespace", indexKey) + resourceQuotas, err := rq.rqLister.ResourceQuotas(namespace).List(labels.Everything()) + if errors.IsNotFound(err) { + utilruntime.HandleError(fmt.Errorf("quota controller could not find ResourceQuota associated with namespace: %s, could take up to %v before a quota replenishes", namespace, rq.resyncPeriod())) + return + } if err != nil { - glog.Errorf("quota controller could not find ResourceQuota associated with namespace: %s, could take up to %v before a quota replenishes", namespace, rq.resyncPeriod()) + utilruntime.HandleError(fmt.Errorf("error checking to see if namespace %s has any ResourceQuota associated with it: %v", namespace, err)) + return } if len(resourceQuotas) == 0 { return @@ -350,7 +370,7 @@ func (rq *ResourceQuotaController) replenishQuota(groupKind schema.GroupKind, na // only queue those quotas that are tracking a resource associated with this kind. for i := range resourceQuotas { - resourceQuota := resourceQuotas[i].(*v1.ResourceQuota) + resourceQuota := resourceQuotas[i] internalResourceQuota := &api.ResourceQuota{} if err := v1.Convert_v1_ResourceQuota_To_api_ResourceQuota(resourceQuota, internalResourceQuota, nil); err != nil { glog.Error(err) diff --git a/pkg/controller/resourcequota/resource_quota_controller_test.go b/pkg/controller/resourcequota/resource_quota_controller_test.go index d85d884211..8e5c86acca 100644 --- a/pkg/controller/resourcequota/resource_quota_controller_test.go +++ b/pkg/controller/resourcequota/resource_quota_controller_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/quota/generic" "k8s.io/kubernetes/pkg/quota/install" @@ -106,21 +107,23 @@ func TestSyncResourceQuota(t *testing.T) { } kubeClient := fake.NewSimpleClientset(&podList, &resourceQuota) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{ - KubeClient: kubeClient, - ResyncPeriod: controller.NoResyncPeriodFunc, - Registry: install.NewRegistry(kubeClient, nil), + KubeClient: kubeClient, + ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(), + ResyncPeriod: controller.NoResyncPeriodFunc, + Registry: install.NewRegistry(kubeClient, nil), GroupKindsToReplenish: []schema.GroupKind{ api.Kind("Pod"), api.Kind("Service"), api.Kind("ReplicationController"), api.Kind("PersistentVolumeClaim"), }, - ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient), + ControllerFactory: NewReplenishmentControllerFactory(informerFactory), ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, } quotaController := NewResourceQuotaController(resourceQuotaControllerOptions) - err := quotaController.syncResourceQuota(resourceQuota) + err := quotaController.syncResourceQuota(&resourceQuota) if err != nil { t.Fatalf("Unexpected error %v", err) } @@ -191,21 +194,23 @@ func TestSyncResourceQuotaSpecChange(t *testing.T) { } kubeClient := fake.NewSimpleClientset(&resourceQuota) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{ - KubeClient: kubeClient, - ResyncPeriod: controller.NoResyncPeriodFunc, - Registry: install.NewRegistry(kubeClient, nil), + KubeClient: kubeClient, + ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(), + ResyncPeriod: controller.NoResyncPeriodFunc, + Registry: install.NewRegistry(kubeClient, nil), GroupKindsToReplenish: []schema.GroupKind{ api.Kind("Pod"), api.Kind("Service"), api.Kind("ReplicationController"), api.Kind("PersistentVolumeClaim"), }, - ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient), + ControllerFactory: NewReplenishmentControllerFactory(informerFactory), ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, } quotaController := NewResourceQuotaController(resourceQuotaControllerOptions) - err := quotaController.syncResourceQuota(resourceQuota) + err := quotaController.syncResourceQuota(&resourceQuota) if err != nil { t.Fatalf("Unexpected error %v", err) } @@ -279,21 +284,23 @@ func TestSyncResourceQuotaSpecHardChange(t *testing.T) { } kubeClient := fake.NewSimpleClientset(&resourceQuota) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{ - KubeClient: kubeClient, - ResyncPeriod: controller.NoResyncPeriodFunc, - Registry: install.NewRegistry(kubeClient, nil), + KubeClient: kubeClient, + ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(), + ResyncPeriod: controller.NoResyncPeriodFunc, + Registry: install.NewRegistry(kubeClient, nil), GroupKindsToReplenish: []schema.GroupKind{ api.Kind("Pod"), api.Kind("Service"), api.Kind("ReplicationController"), api.Kind("PersistentVolumeClaim"), }, - ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient), + ControllerFactory: NewReplenishmentControllerFactory(informerFactory), ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, } quotaController := NewResourceQuotaController(resourceQuotaControllerOptions) - err := quotaController.syncResourceQuota(resourceQuota) + err := quotaController.syncResourceQuota(&resourceQuota) if err != nil { t.Fatalf("Unexpected error %v", err) } @@ -367,21 +374,23 @@ func TestSyncResourceQuotaNoChange(t *testing.T) { } kubeClient := fake.NewSimpleClientset(&v1.PodList{}, &resourceQuota) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{ - KubeClient: kubeClient, - ResyncPeriod: controller.NoResyncPeriodFunc, - Registry: install.NewRegistry(kubeClient, nil), + KubeClient: kubeClient, + ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(), + ResyncPeriod: controller.NoResyncPeriodFunc, + Registry: install.NewRegistry(kubeClient, nil), GroupKindsToReplenish: []schema.GroupKind{ api.Kind("Pod"), api.Kind("Service"), api.Kind("ReplicationController"), api.Kind("PersistentVolumeClaim"), }, - ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient), + ControllerFactory: NewReplenishmentControllerFactory(informerFactory), ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, } quotaController := NewResourceQuotaController(resourceQuotaControllerOptions) - err := quotaController.syncResourceQuota(resourceQuota) + err := quotaController.syncResourceQuota(&resourceQuota) if err != nil { t.Fatalf("Unexpected error %v", err) } @@ -399,16 +408,18 @@ func TestSyncResourceQuotaNoChange(t *testing.T) { func TestAddQuota(t *testing.T) { kubeClient := fake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{ - KubeClient: kubeClient, - ResyncPeriod: controller.NoResyncPeriodFunc, - Registry: install.NewRegistry(kubeClient, nil), + KubeClient: kubeClient, + ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(), + ResyncPeriod: controller.NoResyncPeriodFunc, + Registry: install.NewRegistry(kubeClient, nil), GroupKindsToReplenish: []schema.GroupKind{ api.Kind("Pod"), api.Kind("ReplicationController"), api.Kind("PersistentVolumeClaim"), }, - ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient), + ControllerFactory: NewReplenishmentControllerFactory(informerFactory), ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, } quotaController := NewResourceQuotaController(resourceQuotaControllerOptions) diff --git a/pkg/quota/evaluator/core/BUILD b/pkg/quota/evaluator/core/BUILD index 960c941553..7e6a4ccae3 100644 --- a/pkg/quota/evaluator/core/BUILD +++ b/pkg/quota/evaluator/core/BUILD @@ -28,7 +28,7 @@ go_library( "//pkg/api/validation:go_default_library", "//pkg/apis/storage/util:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/controller/informers:go_default_library", + "//pkg/client/informers/informers_generated/externalversions:go_default_library", "//pkg/kubelet/qos:go_default_library", "//pkg/quota:go_default_library", "//pkg/quota/generic:go_default_library", diff --git a/pkg/quota/evaluator/core/persistent_volume_claims.go b/pkg/quota/evaluator/core/persistent_volume_claims.go index e77126b784..a55be74ab1 100644 --- a/pkg/quota/evaluator/core/persistent_volume_claims.go +++ b/pkg/quota/evaluator/core/persistent_volume_claims.go @@ -30,7 +30,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/apis/storage/util" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/quota/generic" ) @@ -83,7 +83,7 @@ func listPersistentVolumeClaimsByNamespaceFuncUsingClient(kubeClient clientset.I func NewPersistentVolumeClaimEvaluator(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Evaluator { listFuncByNamespace := listPersistentVolumeClaimsByNamespaceFuncUsingClient(kubeClient) if f != nil { - listFuncByNamespace = generic.ListResourceUsingInformerFunc(f, schema.GroupResource{Resource: "persistentvolumeclaims"}) + listFuncByNamespace = generic.ListResourceUsingInformerFunc(f, v1.SchemeGroupVersion.WithResource("persistentvolumeclaims")) } return &pvcEvaluator{ listFuncByNamespace: listFuncByNamespace, diff --git a/pkg/quota/evaluator/core/pods.go b/pkg/quota/evaluator/core/pods.go index 2962c5c38f..4f105fea9a 100644 --- a/pkg/quota/evaluator/core/pods.go +++ b/pkg/quota/evaluator/core/pods.go @@ -31,7 +31,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/quota/generic" @@ -71,7 +71,7 @@ func listPodsByNamespaceFuncUsingClient(kubeClient clientset.Interface) generic. func NewPodEvaluator(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Evaluator { listFuncByNamespace := listPodsByNamespaceFuncUsingClient(kubeClient) if f != nil { - listFuncByNamespace = generic.ListResourceUsingInformerFunc(f, schema.GroupResource{Resource: "pods"}) + listFuncByNamespace = generic.ListResourceUsingInformerFunc(f, v1.SchemeGroupVersion.WithResource("pods")) } return &podEvaluator{ listFuncByNamespace: listFuncByNamespace, diff --git a/pkg/quota/evaluator/core/registry.go b/pkg/quota/evaluator/core/registry.go index 410362377b..8db23fe5f2 100644 --- a/pkg/quota/evaluator/core/registry.go +++ b/pkg/quota/evaluator/core/registry.go @@ -19,7 +19,7 @@ package core import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/quota/generic" ) diff --git a/pkg/quota/generic/BUILD b/pkg/quota/generic/BUILD index 5edc35c841..e943c4571d 100644 --- a/pkg/quota/generic/BUILD +++ b/pkg/quota/generic/BUILD @@ -16,7 +16,7 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/api:go_default_library", - "//pkg/controller/informers:go_default_library", + "//pkg/client/informers/informers_generated/externalversions:go_default_library", "//pkg/quota:go_default_library", "//vendor:k8s.io/apimachinery/pkg/api/resource", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", diff --git a/pkg/quota/generic/evaluator.go b/pkg/quota/generic/evaluator.go index 7829617265..f8c61ac6a9 100644 --- a/pkg/quota/generic/evaluator.go +++ b/pkg/quota/generic/evaluator.go @@ -26,18 +26,18 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/admission" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/pkg/quota" ) // ListResourceUsingInformerFunc returns a listing function based on the shared informer factory for the specified resource. -func ListResourceUsingInformerFunc(f informers.SharedInformerFactory, groupResource schema.GroupResource) ListFuncByNamespace { +func ListResourceUsingInformerFunc(f informers.SharedInformerFactory, resource schema.GroupVersionResource) ListFuncByNamespace { return func(namespace string, options metav1.ListOptions) ([]runtime.Object, error) { labelSelector, err := labels.Parse(options.LabelSelector) if err != nil { return nil, err } - informer, err := f.ForResource(groupResource) + informer, err := f.ForResource(resource) if err != nil { return nil, err } diff --git a/pkg/quota/install/BUILD b/pkg/quota/install/BUILD index 5efc605add..d01e8b9e88 100644 --- a/pkg/quota/install/BUILD +++ b/pkg/quota/install/BUILD @@ -13,7 +13,7 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/controller/informers:go_default_library", + "//pkg/client/informers/informers_generated/externalversions:go_default_library", "//pkg/quota:go_default_library", "//pkg/quota/evaluator/core:go_default_library", ], diff --git a/pkg/quota/install/registry.go b/pkg/quota/install/registry.go index c1ac2cea8b..8222376ba4 100644 --- a/pkg/quota/install/registry.go +++ b/pkg/quota/install/registry.go @@ -18,7 +18,7 @@ package install import ( "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/quota/evaluator/core" ) diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 03ecd7ac34..6e0837960c 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -281,15 +281,17 @@ func ClusterRoles() []rbac.ClusterRole { rbac.NewRule("update").Groups(legacyGroup).Resources("endpoints", "serviceaccounts").RuleOrDie(), rbac.NewRule("list", "watch").Groups(legacyGroup).Resources( + "configmaps", "namespaces", "nodes", "persistentvolumeclaims", "persistentvolumes", "pods", + "replicationcontrollers", + "resourcequotas", "secrets", "services", "serviceaccounts", - "replicationcontrollers", ).RuleOrDie(), rbac.NewRule("list", "watch").Groups(extensionsGroup).Resources("daemonsets", "deployments", "replicasets").RuleOrDie(), rbac.NewRule("list", "watch").Groups(batchGroup).Resources("jobs", "cronjobs").RuleOrDie(), diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml index 55dc479498..dbd54ee6bc 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml @@ -446,12 +446,14 @@ items: - apiGroups: - "" resources: + - configmaps - namespaces - nodes - persistentvolumeclaims - persistentvolumes - pods - replicationcontrollers + - resourcequotas - secrets - serviceaccounts - services diff --git a/test/integration/quota/quota_test.go b/test/integration/quota/quota_test.go index aade89595d..3029b32a37 100644 --- a/test/integration/quota/quota_test.go +++ b/test/integration/quota/quota_test.go @@ -94,7 +94,6 @@ func TestQuota(t *testing.T) { false, ) rm.SetEventRecorder(&record.FakeRecorder{}) - informers.Start(controllerCh) go rm.Run(3, controllerCh) resourceQuotaRegistry := quotainstall.NewRegistry(clientset, nil) @@ -103,13 +102,15 @@ func TestQuota(t *testing.T) { } resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{ KubeClient: clientset, + ResourceQuotaInformer: informers.Core().V1().ResourceQuotas(), ResyncPeriod: controller.NoResyncPeriodFunc, Registry: resourceQuotaRegistry, GroupKindsToReplenish: groupKindsToReplenish, ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, - ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactoryFromClient(clientset), + ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(informers), } go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(2, controllerCh) + informers.Start(controllerCh) startTime := time.Now() scale(t, ns2.Name, clientset)