mirror of https://github.com/k3s-io/k3s

Switch resourcequota controller to shared informers

parent 436fa5c9d1
commit d820e3928c
@@ -76,7 +76,7 @@ func startPodGCController(ctx ControllerContext) (bool, error) {
 func startResourceQuotaController(ctx ControllerContext) (bool, error) {
 	resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
-	resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.InformerFactory)
+	resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.NewInformerFactory)
 	groupKindsToReplenish := []schema.GroupKind{
 		api.Kind("Pod"),
 		api.Kind("Service"),
@@ -87,9 +87,10 @@ func startResourceQuotaController(ctx ControllerContext) (bool, error) {
 	}
 	resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
 		KubeClient:                resourceQuotaControllerClient,
+		ResourceQuotaInformer:     ctx.NewInformerFactory.Core().V1().ResourceQuotas(),
 		ResyncPeriod:              controller.StaticResyncPeriodFunc(ctx.Options.ResourceQuotaSyncPeriod.Duration),
 		Registry:                  resourceQuotaRegistry,
-		ControllerFactory:         resourcequotacontroller.NewReplenishmentControllerFactory(ctx.InformerFactory, resourceQuotaControllerClient),
+		ControllerFactory:         resourcequotacontroller.NewReplenishmentControllerFactory(ctx.NewInformerFactory),
 		ReplenishmentResyncPeriod: ResyncPeriod(&ctx.Options),
 		GroupKindsToReplenish:     groupKindsToReplenish,
 	}
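Note: for orientation, here is a minimal, hedged sketch of the wiring this hunk moves toward, written against present-day k8s.io/client-go rather than the tree-internal NewInformerFactory and generated informers shown above; the kubeconfig path and resync period are illustrative assumptions. One shared factory is built per process, and the quota controller receives the factory's ResourceQuotas informer instead of constructing its own list/watch.

package main

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumed: a kubeconfig in the default location; error handling kept minimal.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// One shared factory per process; every controller asks it for informers.
	factory := informers.NewSharedInformerFactory(client, 5*time.Minute)

	// The quota controller would receive this typed informer through its options,
	// mirroring ResourceQuotaInformer: ctx.NewInformerFactory.Core().V1().ResourceQuotas().
	rqInformer := factory.Core().V1().ResourceQuotas()
	_ = rqInformer.Lister()             // cache-backed read access
	_ = rqInformer.Informer().HasSynced // used to gate work until the cache is warm

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)            // starts every informer that was requested
	factory.WaitForCacheSync(stop) // blocks until their caches are filled
}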
@@ -14,6 +14,7 @@ go_library(
     deps = [
         "//pkg/api:go_default_library",
         "//pkg/apis/componentconfig:go_default_library",
+        "//pkg/apis/componentconfig/install:go_default_library",
         "//pkg/apis/componentconfig/v1alpha1:go_default_library",
         "//pkg/util/taints:go_default_library",
         "//vendor:github.com/spf13/pflag",

@@ -26,6 +26,8 @@ import (
 	utilflag "k8s.io/apiserver/pkg/util/flag"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/apis/componentconfig"
+	// Need to make sure the componentconfig api is installed so defaulting funcs work
+	_ "k8s.io/kubernetes/pkg/apis/componentconfig/install"
 	"k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
 	utiltaints "k8s.io/kubernetes/pkg/util/taints"
@@ -20,20 +20,23 @@ go_library(
         "//pkg/api:go_default_library",
         "//pkg/api/v1:go_default_library",
         "//pkg/client/clientset_generated/clientset:go_default_library",
+        "//pkg/client/informers/informers_generated/externalversions:go_default_library",
+        "//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
+        "//pkg/client/listers/core/v1:go_default_library",
         "//pkg/controller:go_default_library",
-        "//pkg/controller/informers:go_default_library",
         "//pkg/quota:go_default_library",
         "//pkg/quota/evaluator/core:go_default_library",
         "//pkg/util/metrics:go_default_library",
         "//vendor:github.com/golang/glog",
         "//vendor:k8s.io/apimachinery/pkg/api/equality",
         "//vendor:k8s.io/apimachinery/pkg/api/errors",
         "//vendor:k8s.io/apimachinery/pkg/api/meta",
         "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
         "//vendor:k8s.io/apimachinery/pkg/labels",
         "//vendor:k8s.io/apimachinery/pkg/runtime",
         "//vendor:k8s.io/apimachinery/pkg/runtime/schema",
         "//vendor:k8s.io/apimachinery/pkg/util/runtime",
         "//vendor:k8s.io/apimachinery/pkg/util/wait",
         "//vendor:k8s.io/apimachinery/pkg/watch",
         "//vendor:k8s.io/client-go/tools/cache",
         "//vendor:k8s.io/client-go/util/workqueue",
     ],

@@ -51,6 +54,7 @@ go_test(
         "//pkg/api:go_default_library",
         "//pkg/api/v1:go_default_library",
         "//pkg/client/clientset_generated/clientset/fake:go_default_library",
+        "//pkg/client/informers/informers_generated/externalversions:go_default_library",
         "//pkg/controller:go_default_library",
         "//pkg/quota/generic:go_default_library",
         "//pkg/quota/install:go_default_library",
@@ -22,19 +22,15 @@ import (
 	"github.com/golang/glog"

 	"k8s.io/apimachinery/pkg/api/meta"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
+	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
 	"k8s.io/kubernetes/pkg/controller"
-	"k8s.io/kubernetes/pkg/controller/informers"
 	"k8s.io/kubernetes/pkg/quota/evaluator/core"
 	"k8s.io/kubernetes/pkg/util/metrics"
 )

 // ReplenishmentFunc is a function that is invoked when controller sees a change
@@ -96,151 +92,96 @@ type ReplenishmentControllerFactory interface {

 // replenishmentControllerFactory implements ReplenishmentControllerFactory
 type replenishmentControllerFactory struct {
-	kubeClient            clientset.Interface
 	sharedInformerFactory informers.SharedInformerFactory
 }

 // NewReplenishmentControllerFactory returns a factory that knows how to build controllers
 // to replenish resources when updated or deleted
-func NewReplenishmentControllerFactory(f informers.SharedInformerFactory, kubeClient clientset.Interface) ReplenishmentControllerFactory {
+func NewReplenishmentControllerFactory(f informers.SharedInformerFactory) ReplenishmentControllerFactory {
 	return &replenishmentControllerFactory{
-		kubeClient:            kubeClient,
 		sharedInformerFactory: f,
 	}
 }

-// NewReplenishmentControllerFactoryFromClient returns a factory that knows how to build controllers to replenish resources
-// when updated or deleted using the specified client.
-func NewReplenishmentControllerFactoryFromClient(kubeClient clientset.Interface) ReplenishmentControllerFactory {
-	return NewReplenishmentControllerFactory(nil, kubeClient)
-}
-
-// controllerFor returns a replenishment controller for the specified group resource.
-func controllerFor(
-	groupResource schema.GroupResource,
-	f informers.SharedInformerFactory,
-	handlerFuncs cache.ResourceEventHandlerFuncs,
-) (cache.Controller, error) {
-	genericInformer, err := f.ForResource(groupResource)
-	if err != nil {
-		return nil, err
-	}
-	informer := genericInformer.Informer()
-	informer.AddEventHandler(handlerFuncs)
-	return informer.GetController(), nil
-}
-
-func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (result cache.Controller, err error) {
-	if r.kubeClient != nil && r.kubeClient.Core().RESTClient().GetRateLimiter() != nil {
-		metrics.RegisterMetricAndTrackRateLimiterUsage("replenishment_controller", r.kubeClient.Core().RESTClient().GetRateLimiter())
-	}
+func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (cache.Controller, error) {
+	var (
+		informer informers.GenericInformer
+		err      error
+	)

 	switch options.GroupKind {
 	case api.Kind("Pod"):
-		if r.sharedInformerFactory != nil {
-			result, err = controllerFor(api.Resource("pods"), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{
+		informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("pods"))
+		if err != nil {
+			return nil, err
+		}
+		informer.Informer().AddEventHandlerWithResyncPeriod(
+			cache.ResourceEventHandlerFuncs{
 				UpdateFunc: PodReplenishmentUpdateFunc(options),
 				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
-			})
-			break
-		}
-		result = informers.NewPodInformer(r.kubeClient, options.ResyncPeriod())
-	case api.Kind("Service"):
-		// TODO move to informer when defined
-		_, result = cache.NewInformer(
-			&cache.ListWatch{
-				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-					return r.kubeClient.Core().Services(metav1.NamespaceAll).List(options)
-				},
-				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-					return r.kubeClient.Core().Services(metav1.NamespaceAll).Watch(options)
-				},
			},
-			&v1.Service{},
 			options.ResyncPeriod(),
 		)
+	case api.Kind("Service"):
+		informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("services"))
+		if err != nil {
+			return nil, err
+		}
+		informer.Informer().AddEventHandlerWithResyncPeriod(
+			cache.ResourceEventHandlerFuncs{
+				UpdateFunc: ServiceReplenishmentUpdateFunc(options),
+				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
+			},
+			options.ResyncPeriod(),
+		)
 	case api.Kind("ReplicationController"):
-		// TODO move to informer when defined
-		_, result = cache.NewInformer(
-			&cache.ListWatch{
-				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-					return r.kubeClient.Core().ReplicationControllers(metav1.NamespaceAll).List(options)
-				},
-				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-					return r.kubeClient.Core().ReplicationControllers(metav1.NamespaceAll).Watch(options)
-				},
-			},
-			&v1.ReplicationController{},
-			options.ResyncPeriod(),
+		informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("replicationcontrollers"))
+		if err != nil {
+			return nil, err
+		}
+		informer.Informer().AddEventHandlerWithResyncPeriod(
+			cache.ResourceEventHandlerFuncs{
+				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
+			},
+			options.ResyncPeriod(),
 		)
 	case api.Kind("PersistentVolumeClaim"):
-		if r.sharedInformerFactory != nil {
-			result, err = controllerFor(api.Resource("persistentvolumeclaims"), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{
-				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
-			})
-			break
+		informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("persistentvolumeclaims"))
+		if err != nil {
+			return nil, err
 		}
-		// TODO (derekwaynecarr) remove me when we can require a sharedInformerFactory in all code paths...
-		_, result = cache.NewInformer(
-			&cache.ListWatch{
-				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-					return r.kubeClient.Core().PersistentVolumeClaims(metav1.NamespaceAll).List(options)
-				},
-				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-					return r.kubeClient.Core().PersistentVolumeClaims(metav1.NamespaceAll).Watch(options)
-				},
-			},
-			&v1.PersistentVolumeClaim{},
-			options.ResyncPeriod(),
+		informer.Informer().AddEventHandlerWithResyncPeriod(
+			cache.ResourceEventHandlerFuncs{
+				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
+			},
+			options.ResyncPeriod(),
 		)
 	case api.Kind("Secret"):
-		// TODO move to informer when defined
-		_, result = cache.NewInformer(
-			&cache.ListWatch{
-				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-					return r.kubeClient.Core().Secrets(metav1.NamespaceAll).List(options)
-				},
-				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-					return r.kubeClient.Core().Secrets(metav1.NamespaceAll).Watch(options)
-				},
-			},
-			&v1.Secret{},
-			options.ResyncPeriod(),
+		informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("secrets"))
+		if err != nil {
+			return nil, err
+		}
+		informer.Informer().AddEventHandlerWithResyncPeriod(
+			cache.ResourceEventHandlerFuncs{
+				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
+			},
+			options.ResyncPeriod(),
 		)
 	case api.Kind("ConfigMap"):
-		// TODO move to informer when defined
-		_, result = cache.NewInformer(
-			&cache.ListWatch{
-				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-					return r.kubeClient.Core().ConfigMaps(metav1.NamespaceAll).List(options)
-				},
-				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-					return r.kubeClient.Core().ConfigMaps(metav1.NamespaceAll).Watch(options)
-				},
-			},
-			&v1.ConfigMap{},
-			options.ResyncPeriod(),
+		informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("configmaps"))
+		if err != nil {
+			return nil, err
+		}
+		informer.Informer().AddEventHandlerWithResyncPeriod(
+			cache.ResourceEventHandlerFuncs{
+				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
+			},
+			options.ResyncPeriod(),
 		)
 	default:
 		return nil, NewUnhandledGroupKindError(options.GroupKind)
 	}
-	return result, err
+	return informer.Informer().GetController(), nil
 }

 // ServiceReplenishmentUpdateFunc will replenish if the service was quota tracked has changed service type
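Note: every case in the rewritten switch follows the same shape, so a condensed, hedged sketch may help. It uses current k8s.io/client-go types, and registerReplenishment plus its parameters are illustrative names rather than the tree's API: ask the shared factory for an informer by GroupVersionResource and attach a replenishment handler, instead of opening a private list/watch per resource.

package replenish

import (
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
)

// registerReplenishment wires delete notifications for one resource into replenishFn.
func registerReplenishment(
	f informers.SharedInformerFactory,
	gvr schema.GroupVersionResource,
	resync time.Duration,
	replenishFn func(obj interface{}),
) error {
	generic, err := f.ForResource(gvr) // shared informer, created on first request
	if err != nil {
		return err
	}
	_, err = generic.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			DeleteFunc: replenishFn, // quota can be replenished once the object is gone
		},
		resync,
	)
	return err
}

// Example GVRs: pods and services reuse informers that other controllers already share.
var (
	podsGVR     = corev1.SchemeGroupVersion.WithResource("pods")
	servicesGVR = corev1.SchemeGroupVersion.WithResource("services")
)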
@@ -17,22 +17,26 @@ limitations under the License.
 package resourcequota

 import (
 	"fmt"
 	"time"

 	"github.com/golang/glog"

 	apiequality "k8s.io/apimachinery/pkg/api/equality"
+	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
+	coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
+	corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/quota"
 	"k8s.io/kubernetes/pkg/util/metrics"
@@ -42,6 +46,8 @@ import (
 type ResourceQuotaControllerOptions struct {
 	// Must have authority to list all quotas, and update quota status
 	KubeClient clientset.Interface
+	// Shared informer for resource quotas
+	ResourceQuotaInformer coreinformers.ResourceQuotaInformer
 	// Controls full recalculation of quota usage
 	ResyncPeriod controller.ResyncPeriodFunc
 	// Knows how to calculate usage

@@ -59,10 +65,10 @@ type ResourceQuotaControllerOptions struct {
 type ResourceQuotaController struct {
 	// Must have authority to list all resources in the system, and update quota status
 	kubeClient clientset.Interface
-	// An index of resource quota objects by namespace
-	rqIndexer cache.Indexer
-	// Watches changes to all resource quota
-	rqController cache.Controller
+	// A lister/getter of resource quota objects
+	rqLister corelisters.ResourceQuotaLister
+	// A list of functions that return true when their caches have synced
+	informerSyncedFuncs []cache.InformerSynced
 	// ResourceQuota objects that need to be synchronized
 	queue workqueue.RateLimitingInterface
 	// missingUsageQueue holds objects that are missing the initial usage information
@@ -81,6 +87,8 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
 	// build the resource quota controller
 	rq := &ResourceQuotaController{
 		kubeClient:          options.KubeClient,
+		rqLister:            options.ResourceQuotaInformer.Lister(),
+		informerSyncedFuncs: []cache.InformerSynced{options.ResourceQuotaInformer.Informer().HasSynced},
 		queue:               workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"),
 		missingUsageQueue:   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"),
 		resyncPeriod:        options.ResyncPeriod,

@@ -93,18 +101,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
 	// set the synchronization handler
 	rq.syncHandler = rq.syncResourceQuotaFromKey

-	// build the controller that observes quota
-	rq.rqIndexer, rq.rqController = cache.NewIndexerInformer(
-		&cache.ListWatch{
-			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-				return rq.kubeClient.Core().ResourceQuotas(metav1.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-				return rq.kubeClient.Core().ResourceQuotas(metav1.NamespaceAll).Watch(options)
-			},
-		},
-		&v1.ResourceQuota{},
-		rq.resyncPeriod(),
+	options.ResourceQuotaInformer.Informer().AddEventHandlerWithResyncPeriod(
 		cache.ResourceEventHandlerFuncs{
 			AddFunc: rq.addQuota,
 			UpdateFunc: func(old, cur interface{}) {

@@ -128,7 +125,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
 			// way of achieving this is by performing a `stop` operation on the controller.
 			DeleteFunc: rq.enqueueResourceQuota,
 		},
-		cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
+		rq.resyncPeriod(),
 	)

 	for _, groupKindToReplenish := range options.GroupKindsToReplenish {
@@ -141,7 +138,8 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
 		if err != nil {
 			glog.Warningf("quota controller unable to replenish %s due to %v, changes only accounted during full resync", groupKindToReplenish, err)
 		} else {
 			rq.replenishmentControllers = append(rq.replenishmentControllers, replenishmentController)
+			// make sure we wait for each shared informer's cache to sync
+			rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, replenishmentController.HasSynced)
 		}
 	}
 	return rq

@@ -150,8 +148,18 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
 // enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics
 func (rq *ResourceQuotaController) enqueueAll() {
 	defer glog.V(4).Infof("Resource quota controller queued all resource quota for full calculation of usage")
-	for _, k := range rq.rqIndexer.ListKeys() {
-		rq.queue.Add(k)
+	rqs, err := rq.rqLister.List(labels.Everything())
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("unable to enqueue all - error listing resource quotas: %v", err))
+		return
+	}
+	for i := range rqs {
+		key, err := controller.KeyFunc(rqs[i])
+		if err != nil {
+			utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", rqs[i], err))
+			continue
+		}
+		rq.queue.Add(key)
 	}
 }
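Note: the lister-driven resync above can be read in isolation. The following hedged sketch restates it with current client-go listers and workqueue types; the function and parameter names are illustrative, not the tree's: list every quota from the shared cache and enqueue its namespace/name key.

package quotaresync

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	listers "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// enqueueAll lists every ResourceQuota from the shared cache and queues its key.
func enqueueAll(lister listers.ResourceQuotaLister, queue workqueue.RateLimitingInterface) {
	quotas, err := lister.List(labels.Everything())
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("listing resource quotas: %v", err))
		return
	}
	for i := range quotas {
		key, err := cache.MetaNamespaceKeyFunc(quotas[i]) // "namespace/name"
		if err != nil {
			utilruntime.HandleError(err)
			continue
		}
		queue.Add(key)
	}
}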
@@ -228,18 +236,24 @@ func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface)
 // Run begins quota controller using the specified number of workers
 func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
-	go rq.rqController.Run(stopCh)

 	glog.Infof("Starting resource quota controller")

 	// the controllers that replenish other resources to respond rapidly to state changes
 	for _, replenishmentController := range rq.replenishmentControllers {
 		go replenishmentController.Run(stopCh)
 	}

+	if !cache.WaitForCacheSync(stopCh, rq.informerSyncedFuncs...) {
+		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
+		return
+	}
+
 	// the workers that chug through the quota calculation backlog
 	for i := 0; i < workers; i++ {
 		go wait.Until(rq.worker(rq.queue), time.Second, stopCh)
 		go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh)
 	}
 	// the timer for how often we do a full recalculation across all quotas
 	go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh)
 	<-stopCh
 	glog.Infof("Shutting down ResourceQuotaController")
 	rq.queue.ShutDown()
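Note: the new Run follows the usual shared-informer startup order. A minimal sketch of that ordering, assuming current client-go and an invented work closure in place of the tree's worker method, is:

package quotarun

import (
	"fmt"
	"time"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
)

// run blocks until every cache reports synced, then keeps workers draining the queue.
func run(workers int, synced []cache.InformerSynced, work func(), stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	if !cache.WaitForCacheSync(stopCh, synced...) {
		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}
	for i := 0; i < workers; i++ {
		go wait.Until(work, time.Second, stopCh) // each worker loops until stopped
	}
	<-stopCh
}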
@@ -252,8 +266,12 @@ func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err err
 		glog.V(4).Infof("Finished syncing resource quota %q (%v)", key, time.Now().Sub(startTime))
 	}()

-	obj, exists, err := rq.rqIndexer.GetByKey(key)
-	if !exists {
+	namespace, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return err
+	}
+	quota, err := rq.rqLister.ResourceQuotas(namespace).Get(name)
+	if errors.IsNotFound(err) {
 		glog.Infof("Resource quota has been deleted %v", key)
 		return nil
 	}

@@ -262,17 +280,16 @@ func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err err
 		rq.queue.Add(key)
 		return err
 	}
-	quota := *obj.(*v1.ResourceQuota)
 	return rq.syncResourceQuota(quota)
 }

 // syncResourceQuota runs a complete sync of resource quota status across all known kinds
-func (rq *ResourceQuotaController) syncResourceQuota(v1ResourceQuota v1.ResourceQuota) (err error) {
+func (rq *ResourceQuotaController) syncResourceQuota(v1ResourceQuota *v1.ResourceQuota) (err error) {
 	// quota is dirty if any part of spec hard limits differs from the status hard limits
 	dirty := !apiequality.Semantic.DeepEqual(v1ResourceQuota.Spec.Hard, v1ResourceQuota.Status.Hard)

 	resourceQuota := api.ResourceQuota{}
-	if err := v1.Convert_v1_ResourceQuota_To_api_ResourceQuota(&v1ResourceQuota, &resourceQuota, nil); err != nil {
+	if err := v1.Convert_v1_ResourceQuota_To_api_ResourceQuota(v1ResourceQuota, &resourceQuota, nil); err != nil {
 		return err
 	}
@@ -338,11 +355,14 @@ func (rq *ResourceQuotaController) replenishQuota(groupKind schema.GroupKind, na
 	}

 	// check if this namespace even has a quota...
-	indexKey := &v1.ResourceQuota{}
-	indexKey.Namespace = namespace
-	resourceQuotas, err := rq.rqIndexer.Index("namespace", indexKey)
+	resourceQuotas, err := rq.rqLister.ResourceQuotas(namespace).List(labels.Everything())
+	if errors.IsNotFound(err) {
+		utilruntime.HandleError(fmt.Errorf("quota controller could not find ResourceQuota associated with namespace: %s, could take up to %v before a quota replenishes", namespace, rq.resyncPeriod()))
+		return
+	}
 	if err != nil {
-		glog.Errorf("quota controller could not find ResourceQuota associated with namespace: %s, could take up to %v before a quota replenishes", namespace, rq.resyncPeriod())
+		utilruntime.HandleError(fmt.Errorf("error checking to see if namespace %s has any ResourceQuota associated with it: %v", namespace, err))
 		return
 	}
 	if len(resourceQuotas) == 0 {
 		return

@@ -350,7 +370,7 @@ func (rq *ResourceQuotaController) replenishQuota(groupKind schema.GroupKind, na

 	// only queue those quotas that are tracking a resource associated with this kind.
 	for i := range resourceQuotas {
-		resourceQuota := resourceQuotas[i].(*v1.ResourceQuota)
+		resourceQuota := resourceQuotas[i]
 		internalResourceQuota := &api.ResourceQuota{}
 		if err := v1.Convert_v1_ResourceQuota_To_api_ResourceQuota(resourceQuota, internalResourceQuota, nil); err != nil {
 			glog.Error(err)
@@ -28,6 +28,7 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
+	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/quota/generic"
 	"k8s.io/kubernetes/pkg/quota/install"
@@ -106,8 +107,10 @@ func TestSyncResourceQuota(t *testing.T) {
 	}

 	kubeClient := fake.NewSimpleClientset(&podList, &resourceQuota)
+	informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
 	resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
 		KubeClient:            kubeClient,
+		ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(),
 		ResyncPeriod:          controller.NoResyncPeriodFunc,
 		Registry:              install.NewRegistry(kubeClient, nil),
 		GroupKindsToReplenish: []schema.GroupKind{

@@ -116,11 +119,11 @@ func TestSyncResourceQuota(t *testing.T) {
 			api.Kind("ReplicationController"),
 			api.Kind("PersistentVolumeClaim"),
 		},
-		ControllerFactory:         NewReplenishmentControllerFactoryFromClient(kubeClient),
+		ControllerFactory:         NewReplenishmentControllerFactory(informerFactory),
 		ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
 	}
 	quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
-	err := quotaController.syncResourceQuota(resourceQuota)
+	err := quotaController.syncResourceQuota(&resourceQuota)
 	if err != nil {
 		t.Fatalf("Unexpected error %v", err)
 	}

@@ -191,8 +194,10 @@ func TestSyncResourceQuotaSpecChange(t *testing.T) {
 	}

 	kubeClient := fake.NewSimpleClientset(&resourceQuota)
+	informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
 	resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
 		KubeClient:            kubeClient,
+		ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(),
 		ResyncPeriod:          controller.NoResyncPeriodFunc,
 		Registry:              install.NewRegistry(kubeClient, nil),
 		GroupKindsToReplenish: []schema.GroupKind{

@@ -201,11 +206,11 @@ func TestSyncResourceQuotaSpecChange(t *testing.T) {
 			api.Kind("ReplicationController"),
 			api.Kind("PersistentVolumeClaim"),
 		},
-		ControllerFactory:         NewReplenishmentControllerFactoryFromClient(kubeClient),
+		ControllerFactory:         NewReplenishmentControllerFactory(informerFactory),
 		ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
 	}
 	quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
-	err := quotaController.syncResourceQuota(resourceQuota)
+	err := quotaController.syncResourceQuota(&resourceQuota)
 	if err != nil {
 		t.Fatalf("Unexpected error %v", err)
 	}

@@ -279,8 +284,10 @@ func TestSyncResourceQuotaSpecHardChange(t *testing.T) {
 	}

 	kubeClient := fake.NewSimpleClientset(&resourceQuota)
+	informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
 	resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
 		KubeClient:            kubeClient,
+		ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(),
 		ResyncPeriod:          controller.NoResyncPeriodFunc,
 		Registry:              install.NewRegistry(kubeClient, nil),
 		GroupKindsToReplenish: []schema.GroupKind{

@@ -289,11 +296,11 @@ func TestSyncResourceQuotaSpecHardChange(t *testing.T) {
 			api.Kind("ReplicationController"),
 			api.Kind("PersistentVolumeClaim"),
 		},
-		ControllerFactory:         NewReplenishmentControllerFactoryFromClient(kubeClient),
+		ControllerFactory:         NewReplenishmentControllerFactory(informerFactory),
 		ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
 	}
 	quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
-	err := quotaController.syncResourceQuota(resourceQuota)
+	err := quotaController.syncResourceQuota(&resourceQuota)
 	if err != nil {
 		t.Fatalf("Unexpected error %v", err)
 	}

@@ -367,8 +374,10 @@ func TestSyncResourceQuotaNoChange(t *testing.T) {
 	}

 	kubeClient := fake.NewSimpleClientset(&v1.PodList{}, &resourceQuota)
+	informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
 	resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
 		KubeClient:            kubeClient,
+		ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(),
 		ResyncPeriod:          controller.NoResyncPeriodFunc,
 		Registry:              install.NewRegistry(kubeClient, nil),
 		GroupKindsToReplenish: []schema.GroupKind{

@@ -377,11 +386,11 @@ func TestSyncResourceQuotaNoChange(t *testing.T) {
 			api.Kind("ReplicationController"),
 			api.Kind("PersistentVolumeClaim"),
 		},
-		ControllerFactory:         NewReplenishmentControllerFactoryFromClient(kubeClient),
+		ControllerFactory:         NewReplenishmentControllerFactory(informerFactory),
 		ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
 	}
 	quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
-	err := quotaController.syncResourceQuota(resourceQuota)
+	err := quotaController.syncResourceQuota(&resourceQuota)
 	if err != nil {
 		t.Fatalf("Unexpected error %v", err)
 	}

@@ -399,8 +408,10 @@ func TestSyncResourceQuotaNoChange(t *testing.T) {

 func TestAddQuota(t *testing.T) {
 	kubeClient := fake.NewSimpleClientset()
+	informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
 	resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
 		KubeClient:            kubeClient,
+		ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(),
 		ResyncPeriod:          controller.NoResyncPeriodFunc,
 		Registry:              install.NewRegistry(kubeClient, nil),
 		GroupKindsToReplenish: []schema.GroupKind{

@@ -408,7 +419,7 @@ func TestAddQuota(t *testing.T) {
 			api.Kind("ReplicationController"),
 			api.Kind("PersistentVolumeClaim"),
 		},
-		ControllerFactory:         NewReplenishmentControllerFactoryFromClient(kubeClient),
+		ControllerFactory:         NewReplenishmentControllerFactory(informerFactory),
 		ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
 	}
 	quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
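Note: the updated tests all follow one pattern, generalized in this hedged scaffold built on current client-go's fake clientset and informer factory; the test name and the injected object are made up, and the controller under test is omitted. The factory is constructed from the fake client, hands out typed informers, and the informer's store can be seeded directly so listers see objects without starting watches.

package quotatest

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func TestQuotaInformerWiring(t *testing.T) {
	quota := &corev1.ResourceQuota{}
	quota.Name = "quota"
	quota.Namespace = "testing"

	kubeClient := fake.NewSimpleClientset(quota)
	informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)

	rqInformer := informerFactory.Core().V1().ResourceQuotas()

	// Seed the shared cache directly; the lister reads from the same indexer.
	if err := rqInformer.Informer().GetStore().Add(quota); err != nil {
		t.Fatal(err)
	}
	got, err := rqInformer.Lister().ResourceQuotas("testing").Get("quota")
	if err != nil || got.Name != "quota" {
		t.Fatalf("lister did not see the injected quota: %v", err)
	}
}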
@@ -28,7 +28,7 @@ go_library(
         "//pkg/api/validation:go_default_library",
         "//pkg/apis/storage/util:go_default_library",
         "//pkg/client/clientset_generated/clientset:go_default_library",
-        "//pkg/controller/informers:go_default_library",
+        "//pkg/client/informers/informers_generated/externalversions:go_default_library",
         "//pkg/kubelet/qos:go_default_library",
         "//pkg/quota:go_default_library",
         "//pkg/quota/generic:go_default_library",

@@ -30,7 +30,7 @@ import (
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/apis/storage/util"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
-	"k8s.io/kubernetes/pkg/controller/informers"
+	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
 	"k8s.io/kubernetes/pkg/quota"
 	"k8s.io/kubernetes/pkg/quota/generic"
 )
@@ -83,7 +83,7 @@ func listPersistentVolumeClaimsByNamespaceFuncUsingClient(kubeClient clientset.I
 func NewPersistentVolumeClaimEvaluator(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Evaluator {
 	listFuncByNamespace := listPersistentVolumeClaimsByNamespaceFuncUsingClient(kubeClient)
 	if f != nil {
-		listFuncByNamespace = generic.ListResourceUsingInformerFunc(f, schema.GroupResource{Resource: "persistentvolumeclaims"})
+		listFuncByNamespace = generic.ListResourceUsingInformerFunc(f, v1.SchemeGroupVersion.WithResource("persistentvolumeclaims"))
 	}
 	return &pvcEvaluator{
 		listFuncByNamespace: listFuncByNamespace,
@@ -31,7 +31,7 @@ import (
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/api/validation"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
-	"k8s.io/kubernetes/pkg/controller/informers"
+	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
 	"k8s.io/kubernetes/pkg/kubelet/qos"
 	"k8s.io/kubernetes/pkg/quota"
 	"k8s.io/kubernetes/pkg/quota/generic"

@@ -71,7 +71,7 @@ func listPodsByNamespaceFuncUsingClient(kubeClient clientset.Interface) generic.
 func NewPodEvaluator(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Evaluator {
 	listFuncByNamespace := listPodsByNamespaceFuncUsingClient(kubeClient)
 	if f != nil {
-		listFuncByNamespace = generic.ListResourceUsingInformerFunc(f, schema.GroupResource{Resource: "pods"})
+		listFuncByNamespace = generic.ListResourceUsingInformerFunc(f, v1.SchemeGroupVersion.WithResource("pods"))
 	}
 	return &podEvaluator{
 		listFuncByNamespace: listFuncByNamespace,
@@ -19,7 +19,7 @@ package core
 import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
-	"k8s.io/kubernetes/pkg/controller/informers"
+	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
 	"k8s.io/kubernetes/pkg/quota"
 	"k8s.io/kubernetes/pkg/quota/generic"
 )

@@ -16,7 +16,7 @@ go_library(
     tags = ["automanaged"],
     deps = [
         "//pkg/api:go_default_library",
-        "//pkg/controller/informers:go_default_library",
+        "//pkg/client/informers/informers_generated/externalversions:go_default_library",
         "//pkg/quota:go_default_library",
         "//vendor:k8s.io/apimachinery/pkg/api/resource",
         "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
@@ -26,18 +26,18 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apiserver/pkg/admission"
 	"k8s.io/kubernetes/pkg/api"
-	"k8s.io/kubernetes/pkg/controller/informers"
+	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
 	"k8s.io/kubernetes/pkg/quota"
 )

 // ListResourceUsingInformerFunc returns a listing function based on the shared informer factory for the specified resource.
-func ListResourceUsingInformerFunc(f informers.SharedInformerFactory, groupResource schema.GroupResource) ListFuncByNamespace {
+func ListResourceUsingInformerFunc(f informers.SharedInformerFactory, resource schema.GroupVersionResource) ListFuncByNamespace {
 	return func(namespace string, options metav1.ListOptions) ([]runtime.Object, error) {
 		labelSelector, err := labels.Parse(options.LabelSelector)
 		if err != nil {
 			return nil, err
 		}
-		informer, err := f.ForResource(groupResource)
+		informer, err := f.ForResource(resource)
 		if err != nil {
 			return nil, err
 		}
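Note: the GroupVersionResource-based helper above maps naturally onto the generic lister in current client-go. In this hedged sketch the ListFuncByNamespace shape is assumed (simplified to take a label selector) rather than copied from the tree:

package genericquota

import (
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/informers"
)

// ListFuncByNamespace is an assumed, simplified signature for namespace-scoped listing.
type ListFuncByNamespace func(namespace string, selector labels.Selector) ([]runtime.Object, error)

// listFromInformer returns a list function backed by the shared informer's cache,
// so quota evaluation does not issue an extra LIST against the API server.
func listFromInformer(f informers.SharedInformerFactory, gvr schema.GroupVersionResource) ListFuncByNamespace {
	return func(namespace string, selector labels.Selector) ([]runtime.Object, error) {
		generic, err := f.ForResource(gvr)
		if err != nil {
			return nil, err
		}
		return generic.Lister().ByNamespace(namespace).List(selector)
	}
}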
@@ -13,7 +13,7 @@ go_library(
     tags = ["automanaged"],
     deps = [
         "//pkg/client/clientset_generated/clientset:go_default_library",
-        "//pkg/controller/informers:go_default_library",
+        "//pkg/client/informers/informers_generated/externalversions:go_default_library",
         "//pkg/quota:go_default_library",
         "//pkg/quota/evaluator/core:go_default_library",
     ],

@@ -18,7 +18,7 @@ package install

 import (
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
-	"k8s.io/kubernetes/pkg/controller/informers"
+	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
 	"k8s.io/kubernetes/pkg/quota"
 	"k8s.io/kubernetes/pkg/quota/evaluator/core"
 )
@@ -281,15 +281,17 @@ func ClusterRoles() []rbac.ClusterRole {
 			rbac.NewRule("update").Groups(legacyGroup).Resources("endpoints", "serviceaccounts").RuleOrDie(),

 			rbac.NewRule("list", "watch").Groups(legacyGroup).Resources(
+				"configmaps",
 				"namespaces",
 				"nodes",
+				"persistentvolumeclaims",
 				"persistentvolumes",
 				"pods",
+				"replicationcontrollers",
 				"resourcequotas",
 				"secrets",
 				"services",
 				"serviceaccounts",
-				"replicationcontrollers",
 			).RuleOrDie(),
 			rbac.NewRule("list", "watch").Groups(extensionsGroup).Resources("daemonsets", "deployments", "replicasets").RuleOrDie(),
 			rbac.NewRule("list", "watch").Groups(batchGroup).Resources("jobs", "cronjobs").RuleOrDie(),

@@ -446,12 +446,14 @@ items:
   - apiGroups:
     - ""
     resources:
+    - configmaps
     - namespaces
     - nodes
+    - persistentvolumeclaims
     - persistentvolumes
     - pods
     - replicationcontrollers
     - resourcequotas
     - secrets
     - serviceaccounts
     - services
@@ -94,7 +94,6 @@ func TestQuota(t *testing.T) {
 		false,
 	)
 	rm.SetEventRecorder(&record.FakeRecorder{})
-	informers.Start(controllerCh)
 	go rm.Run(3, controllerCh)

 	resourceQuotaRegistry := quotainstall.NewRegistry(clientset, nil)

@@ -103,13 +102,15 @@ func TestQuota(t *testing.T) {
 	}
 	resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
 		KubeClient:                clientset,
+		ResourceQuotaInformer:     informers.Core().V1().ResourceQuotas(),
 		ResyncPeriod:              controller.NoResyncPeriodFunc,
 		Registry:                  resourceQuotaRegistry,
 		GroupKindsToReplenish:     groupKindsToReplenish,
 		ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
-		ControllerFactory:         resourcequotacontroller.NewReplenishmentControllerFactoryFromClient(clientset),
+		ControllerFactory:         resourcequotacontroller.NewReplenishmentControllerFactory(informers),
 	}
 	go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(2, controllerCh)
+	informers.Start(controllerCh)

 	startTime := time.Now()
 	scale(t, ns2.Name, clientset)