2015-01-25 06:11:10 +00:00
/ *
2016-06-03 00:25:58 +00:00
Copyright 2014 The Kubernetes Authors .
2015-01-25 06:11:10 +00:00
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
2015-10-10 03:58:57 +00:00
package resourcequota
2015-01-25 06:11:10 +00:00
import (
2017-02-10 16:25:54 +00:00
"fmt"
2017-10-27 15:07:53 +00:00
"reflect"
"sync"
2015-01-25 06:11:10 +00:00
"time"
2015-08-05 22:05:17 +00:00
"github.com/golang/glog"
2016-02-22 16:15:09 +00:00
2017-06-22 17:25:57 +00:00
"k8s.io/api/core/v1"
2017-01-25 13:39:54 +00:00
apiequality "k8s.io/apimachinery/pkg/api/equality"
2017-02-10 16:25:54 +00:00
"k8s.io/apimachinery/pkg/api/errors"
2017-01-17 03:38:19 +00:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2017-02-10 16:25:54 +00:00
"k8s.io/apimachinery/pkg/labels"
2017-01-11 14:09:48 +00:00
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
2017-10-27 15:07:53 +00:00
"k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
2017-06-23 20:56:37 +00:00
coreinformers "k8s.io/client-go/informers/core/v1"
2017-07-19 19:36:45 +00:00
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
2017-06-23 20:56:37 +00:00
corelisters "k8s.io/client-go/listers/core/v1"
2017-01-24 14:11:51 +00:00
"k8s.io/client-go/tools/cache"
2017-01-27 15:20:40 +00:00
"k8s.io/client-go/util/workqueue"
2017-11-08 22:34:54 +00:00
api "k8s.io/kubernetes/pkg/apis/core"
k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
2015-11-11 21:19:39 +00:00
"k8s.io/kubernetes/pkg/controller"
2016-02-22 16:15:09 +00:00
"k8s.io/kubernetes/pkg/quota"
2015-01-25 06:11:10 +00:00
)
2017-10-27 15:07:53 +00:00
// NamespacedResourcesFunc knows how to discover namespaced resources.
// It returns the discovered API resource lists together with an error.
// GetQuotableResources in this file treats a non-nil error that is
// accompanied by a non-empty list as a partially successful discovery.
type NamespacedResourcesFunc func ( ) ( [ ] * metav1 . APIResourceList , error )
// ReplenishmentFunc is a signal that a resource changed in the specified
// namespace in a way that may require quota to be recalculated.
// The controller wires its replenishQuota method in as the QuotaMonitor's
// replenishment function.
type ReplenishmentFunc func ( groupResource schema . GroupResource , namespace string )
// InformerFactory is all the quota system needs to interface with informers.
type InformerFactory interface {
// ForResource returns a generic informer for the given group/version/resource,
// or an error.
ForResource ( resource schema . GroupVersionResource ) ( informers . GenericInformer , error )
// Start takes a stop channel; presumably it starts the informers obtained
// from this factory and runs them until stopCh is closed -- TODO confirm
// against the concrete implementation.
Start ( stopCh <- chan struct { } )
}
2016-02-22 16:15:09 +00:00
// ResourceQuotaControllerOptions holds options for creating a quota controller
type ResourceQuotaControllerOptions struct {
// Must have authority to list all quotas, and update quota status
2017-07-19 19:36:45 +00:00
QuotaClient corev1client . ResourceQuotasGetter
2017-02-10 16:25:54 +00:00
// Shared informer for resource quotas
ResourceQuotaInformer coreinformers . ResourceQuotaInformer
2016-02-22 16:15:09 +00:00
// Controls full recalculation of quota usage
ResyncPeriod controller . ResyncPeriodFunc
2017-10-27 15:07:53 +00:00
// Maintains evaluators that know how to calculate usage for group resource
2016-02-22 16:15:09 +00:00
Registry quota . Registry
2017-10-27 15:07:53 +00:00
// Discover list of supported resources on the server.
DiscoveryFunc NamespacedResourcesFunc
// A function that returns the list of resources to ignore
IgnoredResourcesFunc func ( ) map [ schema . GroupResource ] struct { }
// InformersStarted knows if informers were started.
InformersStarted <- chan struct { }
// InformerFactory interfaces with informers.
InformerFactory InformerFactory
2017-01-10 07:27:23 +00:00
// Controls full resync of objects monitored for replenishment.
2016-03-07 07:20:32 +00:00
ReplenishmentResyncPeriod controller . ResyncPeriodFunc
2016-02-22 16:15:09 +00:00
}
2015-07-31 11:38:04 +00:00
// ResourceQuotaController is responsible for tracking quota usage status in the system
type ResourceQuotaController struct {
2015-11-11 21:19:39 +00:00
// Must have authority to list all resources in the system, and update quota status
2017-07-19 19:36:45 +00:00
rqClient corev1client . ResourceQuotasGetter
2017-02-10 16:25:54 +00:00
// A lister/getter of resource quota objects
rqLister corelisters . ResourceQuotaLister
// A list of functions that return true when their caches have synced
informerSyncedFuncs [ ] cache . InformerSynced
2015-11-11 21:19:39 +00:00
// ResourceQuota objects that need to be synchronized
2016-05-17 15:09:30 +00:00
queue workqueue . RateLimitingInterface
2017-01-10 07:27:23 +00:00
// missingUsageQueue holds objects that are missing the initial usage information
2016-07-18 19:48:37 +00:00
missingUsageQueue workqueue . RateLimitingInterface
2015-01-25 06:11:10 +00:00
// To allow injection of syncUsage for testing.
2015-11-11 21:19:39 +00:00
syncHandler func ( key string ) error
// function that controls full recalculation of quota usage
resyncPeriod controller . ResyncPeriodFunc
2016-02-22 16:15:09 +00:00
// knows how to calculate usage
registry quota . Registry
2017-10-27 15:07:53 +00:00
// knows how to monitor all the resources tracked by quota and trigger replenishment
quotaMonitor * QuotaMonitor
// controls the workers that process quotas
// this lock is acquired to control write access to the monitors and ensures that all
// monitors are synced before the controller can process quotas.
workerLock sync . RWMutex
2015-01-25 06:11:10 +00:00
}
2017-10-27 15:07:53 +00:00
// NewResourceQuotaController creates a quota controller with specified options
func NewResourceQuotaController ( options * ResourceQuotaControllerOptions ) ( * ResourceQuotaController , error ) {
2016-02-22 16:15:09 +00:00
// build the resource quota controller
2015-11-11 21:19:39 +00:00
rq := & ResourceQuotaController {
2017-10-17 06:05:40 +00:00
rqClient : options . QuotaClient ,
rqLister : options . ResourceQuotaInformer . Lister ( ) ,
informerSyncedFuncs : [ ] cache . InformerSynced { options . ResourceQuotaInformer . Informer ( ) . HasSynced } ,
queue : workqueue . NewNamedRateLimitingQueue ( workqueue . DefaultControllerRateLimiter ( ) , "resourcequota_primary" ) ,
missingUsageQueue : workqueue . NewNamedRateLimitingQueue ( workqueue . DefaultControllerRateLimiter ( ) , "resourcequota_priority" ) ,
resyncPeriod : options . ResyncPeriod ,
registry : options . Registry ,
2015-01-25 06:11:10 +00:00
}
2016-02-22 16:15:09 +00:00
// set the synchronization handler
rq . syncHandler = rq . syncResourceQuotaFromKey
2017-02-10 16:25:54 +00:00
options . ResourceQuotaInformer . Informer ( ) . AddEventHandlerWithResyncPeriod (
2016-09-14 18:35:38 +00:00
cache . ResourceEventHandlerFuncs {
2016-07-18 19:48:37 +00:00
AddFunc : rq . addQuota ,
2015-11-11 21:19:39 +00:00
UpdateFunc : func ( old , cur interface { } ) {
// We are only interested in observing updates to quota.spec to drive updates to quota.status.
// We ignore all updates to quota.Status because they are all driven by this controller.
// IMPORTANT:
// We do not use this function to queue up a full quota recalculation. To do so, would require
// us to enqueue all quota.Status updates, and since quota.Status updates involve additional queries
// that cannot be backed by a cache and result in a full query of a namespace's content, we do not
// want to pay the price on spurious status updates. As a result, we have a separate routine that is
// responsible for enqueue of all resource quotas when doing a full resync (enqueueAll)
2016-11-18 20:50:17 +00:00
oldResourceQuota := old . ( * v1 . ResourceQuota )
curResourceQuota := cur . ( * v1 . ResourceQuota )
if quota . V1Equals ( oldResourceQuota . Spec . Hard , curResourceQuota . Spec . Hard ) {
2015-11-11 21:19:39 +00:00
return
}
2016-07-18 19:48:37 +00:00
rq . addQuota ( curResourceQuota )
2015-11-11 21:19:39 +00:00
} ,
// This will enter the sync loop and no-op, because the controller has been deleted from the store.
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the controller.
DeleteFunc : rq . enqueueResourceQuota ,
} ,
2017-02-10 16:25:54 +00:00
rq . resyncPeriod ( ) ,
2015-11-11 21:19:39 +00:00
)
2017-12-13 13:39:55 +00:00
if options . DiscoveryFunc != nil {
qm := & QuotaMonitor {
informersStarted : options . InformersStarted ,
informerFactory : options . InformerFactory ,
ignoredResources : options . IgnoredResourcesFunc ( ) ,
resourceChanges : workqueue . NewNamedRateLimitingQueue ( workqueue . DefaultControllerRateLimiter ( ) , "resource_quota_controller_resource_changes" ) ,
resyncPeriod : options . ReplenishmentResyncPeriod ,
replenishmentFunc : rq . replenishQuota ,
registry : rq . registry ,
}
2017-10-27 15:07:53 +00:00
2017-12-13 13:39:55 +00:00
rq . quotaMonitor = qm
2018-08-15 12:19:24 +00:00
// do initial quota monitor setup. If we have a discovery failure here, it's ok. We'll discover more resources when a later sync happens.
2017-12-13 13:39:55 +00:00
resources , err := GetQuotableResources ( options . DiscoveryFunc )
2018-08-15 12:19:24 +00:00
if discovery . IsGroupDiscoveryFailedError ( err ) {
utilruntime . HandleError ( fmt . Errorf ( "initial discovery check failure, continuing and counting on future sync update: %v" , err ) )
} else if err != nil {
2017-12-13 13:39:55 +00:00
return nil , err
}
2018-08-15 12:19:24 +00:00
2017-12-13 13:39:55 +00:00
if err = qm . SyncMonitors ( resources ) ; err != nil {
utilruntime . HandleError ( fmt . Errorf ( "initial monitor sync has error: %v" , err ) )
}
2017-10-27 15:07:53 +00:00
2017-12-13 13:39:55 +00:00
// only start quota once all informers synced
rq . informerSyncedFuncs = append ( rq . informerSyncedFuncs , qm . IsSynced )
}
2017-10-27 15:07:53 +00:00
return rq , nil
2015-01-25 06:11:10 +00:00
}
2015-11-11 21:19:39 +00:00
// enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics
func ( rq * ResourceQuotaController ) enqueueAll ( ) {
defer glog . V ( 4 ) . Infof ( "Resource quota controller queued all resource quota for full calculation of usage" )
2017-02-10 16:25:54 +00:00
rqs , err := rq . rqLister . List ( labels . Everything ( ) )
if err != nil {
utilruntime . HandleError ( fmt . Errorf ( "unable to enqueue all - error listing resource quotas: %v" , err ) )
return
}
for i := range rqs {
key , err := controller . KeyFunc ( rqs [ i ] )
if err != nil {
utilruntime . HandleError ( fmt . Errorf ( "Couldn't get key for object %+v: %v" , rqs [ i ] , err ) )
continue
}
rq . queue . Add ( key )
2015-11-11 21:19:39 +00:00
}
2015-01-25 06:11:10 +00:00
}
2016-11-18 20:50:17 +00:00
// obj could be an *v1.ResourceQuota, or a DeletionFinalStateUnknown marker item.
2015-11-11 21:19:39 +00:00
func ( rq * ResourceQuotaController ) enqueueResourceQuota ( obj interface { } ) {
key , err := controller . KeyFunc ( obj )
2015-01-25 06:11:10 +00:00
if err != nil {
2015-11-11 21:19:39 +00:00
glog . Errorf ( "Couldn't get key for object %+v: %v" , obj , err )
return
2015-01-25 06:11:10 +00:00
}
2015-11-11 21:19:39 +00:00
rq . queue . Add ( key )
}
2016-07-18 19:48:37 +00:00
func ( rq * ResourceQuotaController ) addQuota ( obj interface { } ) {
key , err := controller . KeyFunc ( obj )
if err != nil {
glog . Errorf ( "Couldn't get key for object %+v: %v" , obj , err )
return
}
2016-11-18 20:50:17 +00:00
resourceQuota := obj . ( * v1 . ResourceQuota )
2016-07-18 19:48:37 +00:00
// if we declared an intent that is not yet captured in status (prioritize it)
2017-01-25 13:39:54 +00:00
if ! apiequality . Semantic . DeepEqual ( resourceQuota . Spec . Hard , resourceQuota . Status . Hard ) {
2016-07-18 19:48:37 +00:00
rq . missingUsageQueue . Add ( key )
return
}
// if we declared a constraint that has no usage (which this controller can calculate, prioritize it)
for constraint := range resourceQuota . Status . Hard {
if _ , usageFound := resourceQuota . Status . Used [ constraint ] ; ! usageFound {
2016-11-18 20:50:17 +00:00
matchedResources := [ ] api . ResourceName { api . ResourceName ( constraint ) }
2017-10-27 15:07:53 +00:00
for _ , evaluator := range rq . registry . List ( ) {
2016-12-07 20:44:16 +00:00
if intersection := evaluator . MatchingResources ( matchedResources ) ; len ( intersection ) > 0 {
2016-07-18 19:48:37 +00:00
rq . missingUsageQueue . Add ( key )
return
}
}
}
}
// no special priority, go in normal recalc queue
rq . queue . Add ( key )
}
2015-11-11 21:19:39 +00:00
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
2016-07-18 19:48:37 +00:00
func ( rq * ResourceQuotaController ) worker ( queue workqueue . RateLimitingInterface ) func ( ) {
2016-05-10 19:59:34 +00:00
workFunc := func ( ) bool {
2016-07-18 19:48:37 +00:00
key , quit := queue . Get ( )
2016-05-10 19:59:34 +00:00
if quit {
return true
}
2016-07-18 19:48:37 +00:00
defer queue . Done ( key )
2018-01-10 21:36:01 +00:00
rq . workerLock . RLock ( )
defer rq . workerLock . RUnlock ( )
2016-05-10 19:59:34 +00:00
err := rq . syncHandler ( key . ( string ) )
2016-05-17 15:09:30 +00:00
if err == nil {
2016-07-18 19:48:37 +00:00
queue . Forget ( key )
2016-05-17 15:09:30 +00:00
return false
2016-05-10 19:59:34 +00:00
}
2016-05-17 15:09:30 +00:00
utilruntime . HandleError ( err )
2016-07-18 19:48:37 +00:00
queue . AddRateLimited ( key )
2016-05-10 19:59:34 +00:00
return false
}
2016-07-18 19:48:37 +00:00
return func ( ) {
for {
if quit := workFunc ( ) ; quit {
glog . Infof ( "resource quota controller worker shutting down" )
return
}
2016-05-10 19:59:34 +00:00
}
2015-11-11 21:19:39 +00:00
}
}
// Run begins quota controller using the specified number of workers
func ( rq * ResourceQuotaController ) Run ( workers int , stopCh <- chan struct { } ) {
2016-01-15 07:32:10 +00:00
defer utilruntime . HandleCrash ( )
2017-04-12 19:49:17 +00:00
defer rq . queue . ShutDown ( )
2017-02-10 16:25:54 +00:00
glog . Infof ( "Starting resource quota controller" )
2017-04-12 19:49:17 +00:00
defer glog . Infof ( "Shutting down resource quota controller" )
2017-02-10 16:25:54 +00:00
2017-12-13 13:39:55 +00:00
if rq . quotaMonitor != nil {
go rq . quotaMonitor . Run ( stopCh )
}
2017-10-27 15:07:53 +00:00
2017-04-12 19:49:17 +00:00
if ! controller . WaitForCacheSync ( "resource quota" , stopCh , rq . informerSyncedFuncs ... ) {
2017-02-10 16:25:54 +00:00
return
}
2016-02-22 16:15:09 +00:00
// the workers that chug through the quota calculation backlog
2015-11-11 21:19:39 +00:00
for i := 0 ; i < workers ; i ++ {
2016-07-18 19:48:37 +00:00
go wait . Until ( rq . worker ( rq . queue ) , time . Second , stopCh )
go wait . Until ( rq . worker ( rq . missingUsageQueue ) , time . Second , stopCh )
2015-01-25 06:11:10 +00:00
}
2017-05-11 19:58:55 +00:00
// the timer for how often we do a full recalculation across all quotas
go wait . Until ( func ( ) { rq . enqueueAll ( ) } , rq . resyncPeriod ( ) , stopCh )
2015-11-11 21:19:39 +00:00
<- stopCh
2015-01-25 06:11:10 +00:00
}
2015-11-11 21:19:39 +00:00
// syncResourceQuotaFromKey syncs a quota key
func ( rq * ResourceQuotaController ) syncResourceQuotaFromKey ( key string ) ( err error ) {
startTime := time . Now ( )
defer func ( ) {
2018-02-11 13:17:00 +00:00
glog . V ( 4 ) . Infof ( "Finished syncing resource quota %q (%v)" , key , time . Since ( startTime ) )
2015-11-11 21:19:39 +00:00
} ( )
2017-02-10 16:25:54 +00:00
namespace , name , err := cache . SplitMetaNamespaceKey ( key )
if err != nil {
return err
}
quota , err := rq . rqLister . ResourceQuotas ( namespace ) . Get ( name )
if errors . IsNotFound ( err ) {
2015-11-11 21:19:39 +00:00
glog . Infof ( "Resource quota has been deleted %v" , key )
return nil
}
if err != nil {
glog . Infof ( "Unable to retrieve resource quota %v from store: %v" , key , err )
return err
}
return rq . syncResourceQuota ( quota )
}
2016-02-22 16:15:09 +00:00
// syncResourceQuota runs a complete sync of resource quota status across all known kinds
2017-02-10 16:25:54 +00:00
func ( rq * ResourceQuotaController ) syncResourceQuota ( v1ResourceQuota * v1 . ResourceQuota ) ( err error ) {
2015-04-16 21:46:27 +00:00
// quota is dirty if any part of spec hard limits differs from the status hard limits
2017-01-25 13:39:54 +00:00
dirty := ! apiequality . Semantic . DeepEqual ( v1ResourceQuota . Spec . Hard , v1ResourceQuota . Status . Hard )
2016-11-18 20:50:17 +00:00
resourceQuota := api . ResourceQuota { }
2017-10-09 17:13:46 +00:00
if err := k8s_api_v1 . Convert_v1_ResourceQuota_To_core_ResourceQuota ( v1ResourceQuota , & resourceQuota , nil ) ; err != nil {
2016-11-18 20:50:17 +00:00
return err
}
2015-04-16 21:46:27 +00:00
2015-01-25 06:11:10 +00:00
// dirty tracks if the usage status differs from the previous sync,
// if so, we send a new usage with latest status
// if this is our first sync, it will be dirty by default, since we need track usage
2016-02-22 16:15:09 +00:00
dirty = dirty || ( resourceQuota . Status . Hard == nil || resourceQuota . Status . Used == nil )
2015-01-25 06:11:10 +00:00
2016-07-01 13:57:47 +00:00
used := api . ResourceList { }
2016-02-22 16:15:09 +00:00
if resourceQuota . Status . Used != nil {
2016-07-01 13:57:47 +00:00
used = quota . Add ( api . ResourceList { } , resourceQuota . Status . Used )
}
hardLimits := quota . Add ( api . ResourceList { } , resourceQuota . Spec . Hard )
2018-06-05 08:50:37 +00:00
newUsage , err := quota . CalculateUsage ( resourceQuota . Namespace , resourceQuota . Spec . Scopes , hardLimits , rq . registry , resourceQuota . Spec . ScopeSelector )
2016-07-01 13:57:47 +00:00
if err != nil {
return err
2016-02-22 16:15:09 +00:00
}
2016-07-01 13:57:47 +00:00
for key , value := range newUsage {
used [ key ] = value
}
2016-08-05 22:10:09 +00:00
// ensure set of used values match those that have hard constraints
hardResources := quota . ResourceNames ( hardLimits )
used = quota . Mask ( used , hardResources )
2016-07-01 13:57:47 +00:00
// Create a usage object that is based on the quota resource version that will handle updates
// by default, we preserve the past usage observation, and set hard to the current spec
2015-03-13 19:15:04 +00:00
usage := api . ResourceQuota {
2017-01-17 03:38:19 +00:00
ObjectMeta : metav1 . ObjectMeta {
2016-02-22 16:15:09 +00:00
Name : resourceQuota . Name ,
Namespace : resourceQuota . Namespace ,
ResourceVersion : resourceQuota . ResourceVersion ,
Labels : resourceQuota . Labels ,
Annotations : resourceQuota . Annotations } ,
2015-01-25 06:11:10 +00:00
Status : api . ResourceQuotaStatus {
2016-07-01 13:57:47 +00:00
Hard : hardLimits ,
Used : used ,
2015-01-25 06:11:10 +00:00
} ,
}
2015-02-24 16:17:41 +00:00
2016-02-22 16:15:09 +00:00
dirty = dirty || ! quota . Equals ( usage . Status . Used , resourceQuota . Status . Used )
// there was a change observed by this controller that requires we update quota
2015-01-25 06:11:10 +00:00
if dirty {
2016-11-18 20:50:17 +00:00
v1Usage := & v1 . ResourceQuota { }
2017-10-09 17:13:46 +00:00
if err := k8s_api_v1 . Convert_core_ResourceQuota_To_v1_ResourceQuota ( & usage , v1Usage , nil ) ; err != nil {
2016-11-18 20:50:17 +00:00
return err
}
2017-07-19 19:36:45 +00:00
_ , err = rq . rqClient . ResourceQuotas ( usage . Namespace ) . UpdateStatus ( v1Usage )
2015-03-13 19:15:04 +00:00
return err
2015-01-25 06:11:10 +00:00
}
return nil
}
2015-01-26 04:34:30 +00:00
2016-02-22 16:15:09 +00:00
// replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
2017-10-27 15:07:53 +00:00
func ( rq * ResourceQuotaController ) replenishQuota ( groupResource schema . GroupResource , namespace string ) {
// check if the quota controller can evaluate this groupResource, if not, ignore it altogether...
evaluator := rq . registry . Get ( groupResource )
if evaluator == nil {
2016-03-03 17:50:25 +00:00
return
}
// check if this namespace even has a quota...
2017-02-10 16:25:54 +00:00
resourceQuotas , err := rq . rqLister . ResourceQuotas ( namespace ) . List ( labels . Everything ( ) )
if errors . IsNotFound ( err ) {
utilruntime . HandleError ( fmt . Errorf ( "quota controller could not find ResourceQuota associated with namespace: %s, could take up to %v before a quota replenishes" , namespace , rq . resyncPeriod ( ) ) )
return
}
2015-11-11 21:19:39 +00:00
if err != nil {
2017-02-10 16:25:54 +00:00
utilruntime . HandleError ( fmt . Errorf ( "error checking to see if namespace %s has any ResourceQuota associated with it: %v" , namespace , err ) )
return
2015-11-11 21:19:39 +00:00
}
2016-02-22 16:15:09 +00:00
if len ( resourceQuotas ) == 0 {
2015-11-11 21:19:39 +00:00
return
}
2016-03-03 17:50:25 +00:00
// only queue those quotas that are tracking a resource associated with this kind.
2016-02-22 16:15:09 +00:00
for i := range resourceQuotas {
2017-02-10 16:25:54 +00:00
resourceQuota := resourceQuotas [ i ]
2016-11-18 20:50:17 +00:00
internalResourceQuota := & api . ResourceQuota { }
2017-10-09 17:13:46 +00:00
if err := k8s_api_v1 . Convert_v1_ResourceQuota_To_core_ResourceQuota ( resourceQuota , internalResourceQuota , nil ) ; err != nil {
2016-11-18 20:50:17 +00:00
glog . Error ( err )
continue
}
resourceQuotaResources := quota . ResourceNames ( internalResourceQuota . Status . Hard )
2016-12-07 20:44:16 +00:00
if intersection := evaluator . MatchingResources ( resourceQuotaResources ) ; len ( intersection ) > 0 {
2016-03-03 17:50:25 +00:00
// TODO: make this support targeted replenishment to a specific kind, right now it does a full recalc on that quota.
rq . enqueueResourceQuota ( resourceQuota )
}
2015-11-11 21:19:39 +00:00
}
}
2017-10-27 15:07:53 +00:00
// Sync periodically resyncs the controller when new resources are observed from discovery.
func ( rq * ResourceQuotaController ) Sync ( discoveryFunc NamespacedResourcesFunc , period time . Duration , stopCh <- chan struct { } ) {
// Something has changed, so track the new state and perform a sync.
oldResources := make ( map [ schema . GroupVersionResource ] struct { } )
wait . Until ( func ( ) {
// Get the current resource list from discovery.
newResources , err := GetQuotableResources ( discoveryFunc )
if err != nil {
utilruntime . HandleError ( err )
2018-08-15 12:19:24 +00:00
if discovery . IsGroupDiscoveryFailedError ( err ) && len ( newResources ) > 0 {
// In partial discovery cases, don't remove any existing informers, just add new ones
for k , v := range oldResources {
newResources [ k ] = v
}
} else {
// short circuit in non-discovery error cases or if discovery returned zero resources
return
}
2017-10-27 15:07:53 +00:00
}
// Decide whether discovery has reported a change.
if reflect . DeepEqual ( oldResources , newResources ) {
glog . V ( 4 ) . Infof ( "no resource updates from discovery, skipping resource quota sync" )
return
}
// Something has changed, so track the new state and perform a sync.
glog . V ( 2 ) . Infof ( "syncing resource quota controller with updated resources from discovery: %v" , newResources )
oldResources = newResources
// Ensure workers are paused to avoid processing events before informers
// have resynced.
rq . workerLock . Lock ( )
defer rq . workerLock . Unlock ( )
// Perform the monitor resync and wait for controllers to report cache sync.
if err := rq . resyncMonitors ( newResources ) ; err != nil {
utilruntime . HandleError ( fmt . Errorf ( "failed to sync resource monitors: %v" , err ) )
return
}
2017-12-13 13:39:55 +00:00
if rq . quotaMonitor != nil && ! controller . WaitForCacheSync ( "resource quota" , stopCh , rq . quotaMonitor . IsSynced ) {
2017-10-27 15:07:53 +00:00
utilruntime . HandleError ( fmt . Errorf ( "timed out waiting for quota monitor sync" ) )
}
} , period , stopCh )
}
// resyncMonitors starts or stops quota monitors as needed to ensure that all
// (and only) those resources present in the map are monitored.
func ( rq * ResourceQuotaController ) resyncMonitors ( resources map [ schema . GroupVersionResource ] struct { } ) error {
2017-12-13 13:39:55 +00:00
if rq . quotaMonitor == nil {
return nil
}
if err := rq . quotaMonitor . SyncMonitors ( resources ) ; err != nil {
2017-10-27 15:07:53 +00:00
return err
}
2017-12-13 13:39:55 +00:00
rq . quotaMonitor . StartMonitors ( )
2017-10-27 15:07:53 +00:00
return nil
}
// GetQuotableResources returns all resources that the quota system should recognize.
// It requires a resource supports the following verbs: 'create','list','delete'
2018-08-15 12:19:24 +00:00
// This function may return both results and an error. If that happens, it means that the discovery calls were only
// partially successful. A decision about whether to proceed or not is left to the caller.
2017-10-27 15:07:53 +00:00
func GetQuotableResources ( discoveryFunc NamespacedResourcesFunc ) ( map [ schema . GroupVersionResource ] struct { } , error ) {
2018-08-15 12:19:24 +00:00
possibleResources , discoveryErr := discoveryFunc ( )
if discoveryErr != nil && len ( possibleResources ) == 0 {
return nil , fmt . Errorf ( "failed to discover resources: %v" , discoveryErr )
2017-10-27 15:07:53 +00:00
}
2017-12-18 19:50:09 +00:00
quotableResources := discovery . FilteredBy ( discovery . SupportsAllVerbs { Verbs : [ ] string { "create" , "list" , "watch" , "delete" } } , possibleResources )
2017-10-27 15:07:53 +00:00
quotableGroupVersionResources , err := discovery . GroupVersionResources ( quotableResources )
if err != nil {
return nil , fmt . Errorf ( "Failed to parse resources: %v" , err )
}
2018-08-15 12:19:24 +00:00
// return the original discovery error (if any) in addition to the list
return quotableGroupVersionResources , discoveryErr
2017-10-27 15:07:53 +00:00
}