2015-01-25 06:11:10 +00:00
/ *
2016-06-03 00:25:58 +00:00
Copyright 2014 The Kubernetes Authors .
2015-01-25 06:11:10 +00:00
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
2015-10-10 03:58:57 +00:00
package resourcequota
2015-01-25 06:11:10 +00:00
import (
"time"
2015-08-05 22:05:17 +00:00
"github.com/golang/glog"
2016-02-22 16:15:09 +00:00
2017-01-11 14:09:48 +00:00
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/api"
2016-11-18 20:50:17 +00:00
"k8s.io/kubernetes/pkg/api/v1"
2015-11-11 21:19:39 +00:00
"k8s.io/kubernetes/pkg/client/cache"
2017-01-06 06:34:29 +00:00
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
2015-11-11 21:19:39 +00:00
"k8s.io/kubernetes/pkg/controller"
2016-02-22 16:15:09 +00:00
"k8s.io/kubernetes/pkg/quota"
2016-04-13 18:38:32 +00:00
"k8s.io/kubernetes/pkg/util/metrics"
2015-11-11 21:19:39 +00:00
"k8s.io/kubernetes/pkg/util/workqueue"
2015-01-25 06:11:10 +00:00
)
2016-02-22 16:15:09 +00:00
// ResourceQuotaControllerOptions holds the options for creating a quota controller.
// It is the sole argument of NewResourceQuotaController, which reads every field below.
type ResourceQuotaControllerOptions struct {
// KubeClient must have authority to list all quotas, and update quota status.
KubeClient clientset . Interface
// ResyncPeriod controls full recalculation of quota usage.
ResyncPeriod controller . ResyncPeriodFunc
// Registry knows how to calculate usage.
Registry quota . Registry
// ControllerFactory knows how to build controllers that notify replenishment events.
ControllerFactory ReplenishmentControllerFactory
2016-03-07 07:20:32 +00:00
// ReplenishmentResyncPeriod controls full resync of objects monitored for replenishment.
ReplenishmentResyncPeriod controller . ResyncPeriodFunc
2016-02-22 16:15:09 +00:00
// GroupKindsToReplenish is the list of GroupKind objects that should be monitored for
// replenishment at a faster frequency than the quota controller recalculation interval.
2016-11-21 02:55:31 +00:00
GroupKindsToReplenish [ ] schema . GroupKind
2016-02-22 16:15:09 +00:00
}
2015-07-31 11:38:04 +00:00
// ResourceQuotaController is responsible for tracking quota usage status in the system.
// Instances are built by NewResourceQuotaController and started with Run.
type ResourceQuotaController struct {
2015-11-11 21:19:39 +00:00
// kubeClient must have authority to list all resources in the system, and update quota status.
2016-01-29 06:34:08 +00:00
kubeClient clientset . Interface
2015-11-11 21:19:39 +00:00
// rqIndexer is an index of resource quota objects by namespace.
rqIndexer cache . Indexer
// rqController watches changes to all resource quota.
2016-09-14 18:35:38 +00:00
rqController * cache . Controller
2015-11-11 21:19:39 +00:00
// queue holds the keys of ResourceQuota objects that need to be synchronized.
2016-05-17 15:09:30 +00:00
queue workqueue . RateLimitingInterface
2016-07-18 19:48:37 +00:00
// missingUsageQueue holds the keys of objects that are missing the initial usage information;
// addQuota routes a quota here when its status does not yet reflect its spec, or when a hard
// constraint has no recorded usage that a registered evaluator could calculate.
missingUsageQueue workqueue . RateLimitingInterface
2015-01-25 06:11:10 +00:00
// syncHandler is invoked by the workers for every dequeued key. It is a field (set to
// syncResourceQuotaFromKey by the constructor) to allow injection of a sync function for testing.
2015-11-11 21:19:39 +00:00
syncHandler func ( key string ) error
// resyncPeriod is the function that controls full recalculation of quota usage.
resyncPeriod controller . ResyncPeriodFunc
2016-02-22 16:15:09 +00:00
// registry knows how to calculate usage.
registry quota . Registry
// replenishmentControllers are the controllers monitoring other resources in order to
// notify this controller that a quota should be replenished.
2016-09-14 18:35:38 +00:00
replenishmentControllers [ ] cache . ControllerInterface
2015-01-25 06:11:10 +00:00
}
2016-02-22 16:15:09 +00:00
func NewResourceQuotaController ( options * ResourceQuotaControllerOptions ) * ResourceQuotaController {
// build the resource quota controller
2015-11-11 21:19:39 +00:00
rq := & ResourceQuotaController {
2016-02-22 16:15:09 +00:00
kubeClient : options . KubeClient ,
2016-08-22 18:15:45 +00:00
queue : workqueue . NewNamedRateLimitingQueue ( workqueue . DefaultControllerRateLimiter ( ) , "resourcequota_primary" ) ,
missingUsageQueue : workqueue . NewNamedRateLimitingQueue ( workqueue . DefaultControllerRateLimiter ( ) , "resourcequota_priority" ) ,
2016-02-22 16:15:09 +00:00
resyncPeriod : options . ResyncPeriod ,
registry : options . Registry ,
2016-09-14 18:35:38 +00:00
replenishmentControllers : [ ] cache . ControllerInterface { } ,
2015-01-25 06:11:10 +00:00
}
2016-10-13 12:56:07 +00:00
if options . KubeClient != nil && options . KubeClient . Core ( ) . RESTClient ( ) . GetRateLimiter ( ) != nil {
metrics . RegisterMetricAndTrackRateLimiterUsage ( "resource_quota_controller" , options . KubeClient . Core ( ) . RESTClient ( ) . GetRateLimiter ( ) )
2016-04-13 18:38:32 +00:00
}
2016-02-22 16:15:09 +00:00
// set the synchronization handler
rq . syncHandler = rq . syncResourceQuotaFromKey
// build the controller that observes quota
2016-09-14 18:35:38 +00:00
rq . rqIndexer , rq . rqController = cache . NewIndexerInformer (
2015-11-11 21:19:39 +00:00
& cache . ListWatch {
2016-11-18 20:50:17 +00:00
ListFunc : func ( options v1 . ListOptions ) ( runtime . Object , error ) {
return rq . kubeClient . Core ( ) . ResourceQuotas ( v1 . NamespaceAll ) . List ( options )
2015-11-11 21:19:39 +00:00
} ,
2016-11-18 20:50:17 +00:00
WatchFunc : func ( options v1 . ListOptions ) ( watch . Interface , error ) {
return rq . kubeClient . Core ( ) . ResourceQuotas ( v1 . NamespaceAll ) . Watch ( options )
2015-11-11 21:19:39 +00:00
} ,
} ,
2016-11-18 20:50:17 +00:00
& v1 . ResourceQuota { } ,
2016-02-22 16:15:09 +00:00
rq . resyncPeriod ( ) ,
2016-09-14 18:35:38 +00:00
cache . ResourceEventHandlerFuncs {
2016-07-18 19:48:37 +00:00
AddFunc : rq . addQuota ,
2015-11-11 21:19:39 +00:00
UpdateFunc : func ( old , cur interface { } ) {
// We are only interested in observing updates to quota.spec to drive updates to quota.status.
// We ignore all updates to quota.Status because they are all driven by this controller.
// IMPORTANT:
// We do not use this function to queue up a full quota recalculation. To do so, would require
// us to enqueue all quota.Status updates, and since quota.Status updates involve additional queries
// that cannot be backed by a cache and result in a full query of a namespace's content, we do not
// want to pay the price on spurious status updates. As a result, we have a separate routine that is
// responsible for enqueue of all resource quotas when doing a full resync (enqueueAll)
2016-11-18 20:50:17 +00:00
oldResourceQuota := old . ( * v1 . ResourceQuota )
curResourceQuota := cur . ( * v1 . ResourceQuota )
if quota . V1Equals ( oldResourceQuota . Spec . Hard , curResourceQuota . Spec . Hard ) {
2015-11-11 21:19:39 +00:00
return
}
2016-07-18 19:48:37 +00:00
rq . addQuota ( curResourceQuota )
2015-11-11 21:19:39 +00:00
} ,
// This will enter the sync loop and no-op, because the controller has been deleted from the store.
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the controller.
DeleteFunc : rq . enqueueResourceQuota ,
} ,
cache . Indexers { "namespace" : cache . MetaNamespaceIndexFunc } ,
)
2016-02-22 16:15:09 +00:00
for _ , groupKindToReplenish := range options . GroupKindsToReplenish {
controllerOptions := & ReplenishmentControllerOptions {
GroupKind : groupKindToReplenish ,
2016-03-07 07:20:32 +00:00
ResyncPeriod : options . ReplenishmentResyncPeriod ,
2016-02-22 16:15:09 +00:00
ReplenishmentFunc : rq . replenishQuota ,
}
replenishmentController , err := options . ControllerFactory . NewController ( controllerOptions )
if err != nil {
glog . Warningf ( "quota controller unable to replenish %s due to %v, changes only accounted during full resync" , groupKindToReplenish , err )
} else {
rq . replenishmentControllers = append ( rq . replenishmentControllers , replenishmentController )
}
}
2015-11-11 21:19:39 +00:00
return rq
2015-01-25 06:11:10 +00:00
}
2015-11-11 21:19:39 +00:00
// enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics
func ( rq * ResourceQuotaController ) enqueueAll ( ) {
defer glog . V ( 4 ) . Infof ( "Resource quota controller queued all resource quota for full calculation of usage" )
for _ , k := range rq . rqIndexer . ListKeys ( ) {
rq . queue . Add ( k )
}
2015-01-25 06:11:10 +00:00
}
2016-11-18 20:50:17 +00:00
// obj could be an *v1.ResourceQuota, or a DeletionFinalStateUnknown marker item.
2015-11-11 21:19:39 +00:00
func ( rq * ResourceQuotaController ) enqueueResourceQuota ( obj interface { } ) {
key , err := controller . KeyFunc ( obj )
2015-01-25 06:11:10 +00:00
if err != nil {
2015-11-11 21:19:39 +00:00
glog . Errorf ( "Couldn't get key for object %+v: %v" , obj , err )
return
2015-01-25 06:11:10 +00:00
}
2015-11-11 21:19:39 +00:00
rq . queue . Add ( key )
}
2016-07-18 19:48:37 +00:00
func ( rq * ResourceQuotaController ) addQuota ( obj interface { } ) {
key , err := controller . KeyFunc ( obj )
if err != nil {
glog . Errorf ( "Couldn't get key for object %+v: %v" , obj , err )
return
}
2016-11-18 20:50:17 +00:00
resourceQuota := obj . ( * v1 . ResourceQuota )
2016-07-18 19:48:37 +00:00
// if we declared an intent that is not yet captured in status (prioritize it)
2016-11-19 23:32:10 +00:00
if ! api . Semantic . DeepEqual ( resourceQuota . Spec . Hard , resourceQuota . Status . Hard ) {
2016-07-18 19:48:37 +00:00
rq . missingUsageQueue . Add ( key )
return
}
// if we declared a constraint that has no usage (which this controller can calculate, prioritize it)
for constraint := range resourceQuota . Status . Hard {
if _ , usageFound := resourceQuota . Status . Used [ constraint ] ; ! usageFound {
2016-11-18 20:50:17 +00:00
matchedResources := [ ] api . ResourceName { api . ResourceName ( constraint ) }
2016-07-18 19:48:37 +00:00
for _ , evaluator := range rq . registry . Evaluators ( ) {
2016-12-07 20:44:16 +00:00
if intersection := evaluator . MatchingResources ( matchedResources ) ; len ( intersection ) > 0 {
2016-07-18 19:48:37 +00:00
rq . missingUsageQueue . Add ( key )
return
}
}
}
}
// no special priority, go in normal recalc queue
rq . queue . Add ( key )
}
2015-11-11 21:19:39 +00:00
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
2016-07-18 19:48:37 +00:00
func ( rq * ResourceQuotaController ) worker ( queue workqueue . RateLimitingInterface ) func ( ) {
2016-05-10 19:59:34 +00:00
workFunc := func ( ) bool {
2016-07-18 19:48:37 +00:00
key , quit := queue . Get ( )
2016-05-10 19:59:34 +00:00
if quit {
return true
}
2016-07-18 19:48:37 +00:00
defer queue . Done ( key )
2016-05-10 19:59:34 +00:00
err := rq . syncHandler ( key . ( string ) )
2016-05-17 15:09:30 +00:00
if err == nil {
2016-07-18 19:48:37 +00:00
queue . Forget ( key )
2016-05-17 15:09:30 +00:00
return false
2016-05-10 19:59:34 +00:00
}
2016-05-17 15:09:30 +00:00
utilruntime . HandleError ( err )
2016-07-18 19:48:37 +00:00
queue . AddRateLimited ( key )
2016-05-10 19:59:34 +00:00
return false
}
2016-07-18 19:48:37 +00:00
return func ( ) {
for {
if quit := workFunc ( ) ; quit {
glog . Infof ( "resource quota controller worker shutting down" )
return
}
2016-05-10 19:59:34 +00:00
}
2015-11-11 21:19:39 +00:00
}
}
// Run begins quota controller using the specified number of workers
func ( rq * ResourceQuotaController ) Run ( workers int , stopCh <- chan struct { } ) {
2016-01-15 07:32:10 +00:00
defer utilruntime . HandleCrash ( )
2015-11-11 21:19:39 +00:00
go rq . rqController . Run ( stopCh )
2016-02-22 16:15:09 +00:00
// the controllers that replenish other resources to respond rapidly to state changes
for _ , replenishmentController := range rq . replenishmentControllers {
go replenishmentController . Run ( stopCh )
}
// the workers that chug through the quota calculation backlog
2015-11-11 21:19:39 +00:00
for i := 0 ; i < workers ; i ++ {
2016-07-18 19:48:37 +00:00
go wait . Until ( rq . worker ( rq . queue ) , time . Second , stopCh )
go wait . Until ( rq . worker ( rq . missingUsageQueue ) , time . Second , stopCh )
2015-01-25 06:11:10 +00:00
}
2016-02-22 16:15:09 +00:00
// the timer for how often we do a full recalculation across all quotas
2016-02-02 10:57:06 +00:00
go wait . Until ( func ( ) { rq . enqueueAll ( ) } , rq . resyncPeriod ( ) , stopCh )
2015-11-11 21:19:39 +00:00
<- stopCh
glog . Infof ( "Shutting down ResourceQuotaController" )
rq . queue . ShutDown ( )
2015-01-25 06:11:10 +00:00
}
2015-11-11 21:19:39 +00:00
// syncResourceQuotaFromKey syncs a quota key
func ( rq * ResourceQuotaController ) syncResourceQuotaFromKey ( key string ) ( err error ) {
startTime := time . Now ( )
defer func ( ) {
glog . V ( 4 ) . Infof ( "Finished syncing resource quota %q (%v)" , key , time . Now ( ) . Sub ( startTime ) )
} ( )
obj , exists , err := rq . rqIndexer . GetByKey ( key )
if ! exists {
glog . Infof ( "Resource quota has been deleted %v" , key )
return nil
}
if err != nil {
glog . Infof ( "Unable to retrieve resource quota %v from store: %v" , key , err )
rq . queue . Add ( key )
return err
}
2016-11-18 20:50:17 +00:00
quota := * obj . ( * v1 . ResourceQuota )
2015-11-11 21:19:39 +00:00
return rq . syncResourceQuota ( quota )
}
2016-02-22 16:15:09 +00:00
// syncResourceQuota runs a complete sync of resource quota status across all known kinds
2016-11-18 20:50:17 +00:00
func ( rq * ResourceQuotaController ) syncResourceQuota ( v1ResourceQuota v1 . ResourceQuota ) ( err error ) {
2015-04-16 21:46:27 +00:00
// quota is dirty if any part of spec hard limits differs from the status hard limits
2016-11-19 23:32:10 +00:00
dirty := ! api . Semantic . DeepEqual ( v1ResourceQuota . Spec . Hard , v1ResourceQuota . Status . Hard )
2016-11-18 20:50:17 +00:00
resourceQuota := api . ResourceQuota { }
if err := v1 . Convert_v1_ResourceQuota_To_api_ResourceQuota ( & v1ResourceQuota , & resourceQuota , nil ) ; err != nil {
return err
}
2015-04-16 21:46:27 +00:00
2015-01-25 06:11:10 +00:00
// dirty tracks if the usage status differs from the previous sync,
// if so, we send a new usage with latest status
// if this is our first sync, it will be dirty by default, since we need track usage
2016-02-22 16:15:09 +00:00
dirty = dirty || ( resourceQuota . Status . Hard == nil || resourceQuota . Status . Used == nil )
2015-01-25 06:11:10 +00:00
2016-07-01 13:57:47 +00:00
used := api . ResourceList { }
2016-02-22 16:15:09 +00:00
if resourceQuota . Status . Used != nil {
2016-07-01 13:57:47 +00:00
used = quota . Add ( api . ResourceList { } , resourceQuota . Status . Used )
}
hardLimits := quota . Add ( api . ResourceList { } , resourceQuota . Spec . Hard )
newUsage , err := quota . CalculateUsage ( resourceQuota . Namespace , resourceQuota . Spec . Scopes , hardLimits , rq . registry )
if err != nil {
return err
2016-02-22 16:15:09 +00:00
}
2016-07-01 13:57:47 +00:00
for key , value := range newUsage {
used [ key ] = value
}
2016-08-05 22:10:09 +00:00
// ensure set of used values match those that have hard constraints
hardResources := quota . ResourceNames ( hardLimits )
used = quota . Mask ( used , hardResources )
2016-07-01 13:57:47 +00:00
// Create a usage object that is based on the quota resource version that will handle updates
// by default, we preserve the past usage observation, and set hard to the current spec
2015-03-13 19:15:04 +00:00
usage := api . ResourceQuota {
2015-01-25 06:11:10 +00:00
ObjectMeta : api . ObjectMeta {
2016-02-22 16:15:09 +00:00
Name : resourceQuota . Name ,
Namespace : resourceQuota . Namespace ,
ResourceVersion : resourceQuota . ResourceVersion ,
Labels : resourceQuota . Labels ,
Annotations : resourceQuota . Annotations } ,
2015-01-25 06:11:10 +00:00
Status : api . ResourceQuotaStatus {
2016-07-01 13:57:47 +00:00
Hard : hardLimits ,
Used : used ,
2015-01-25 06:11:10 +00:00
} ,
}
2015-02-24 16:17:41 +00:00
2016-02-22 16:15:09 +00:00
dirty = dirty || ! quota . Equals ( usage . Status . Used , resourceQuota . Status . Used )
// there was a change observed by this controller that requires we update quota
2015-01-25 06:11:10 +00:00
if dirty {
2016-11-18 20:50:17 +00:00
v1Usage := & v1 . ResourceQuota { }
if err := v1 . Convert_api_ResourceQuota_To_v1_ResourceQuota ( & usage , v1Usage , nil ) ; err != nil {
return err
}
_ , err = rq . kubeClient . Core ( ) . ResourceQuotas ( usage . Namespace ) . UpdateStatus ( v1Usage )
2015-03-13 19:15:04 +00:00
return err
2015-01-25 06:11:10 +00:00
}
return nil
}
2015-01-26 04:34:30 +00:00
2016-02-22 16:15:09 +00:00
// replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
2016-11-21 02:55:31 +00:00
func ( rq * ResourceQuotaController ) replenishQuota ( groupKind schema . GroupKind , namespace string , object runtime . Object ) {
2016-03-03 17:50:25 +00:00
// check if the quota controller can evaluate this kind, if not, ignore it altogether...
evaluators := rq . registry . Evaluators ( )
evaluator , found := evaluators [ groupKind ]
if ! found {
return
}
// check if this namespace even has a quota...
2016-11-18 20:50:17 +00:00
indexKey := & v1 . ResourceQuota { }
2016-02-22 16:15:09 +00:00
indexKey . Namespace = namespace
resourceQuotas , err := rq . rqIndexer . Index ( "namespace" , indexKey )
2015-11-11 21:19:39 +00:00
if err != nil {
2016-02-22 16:15:09 +00:00
glog . Errorf ( "quota controller could not find ResourceQuota associated with namespace: %s, could take up to %v before a quota replenishes" , namespace , rq . resyncPeriod ( ) )
2015-11-11 21:19:39 +00:00
}
2016-02-22 16:15:09 +00:00
if len ( resourceQuotas ) == 0 {
2015-11-11 21:19:39 +00:00
return
}
2016-03-03 17:50:25 +00:00
// only queue those quotas that are tracking a resource associated with this kind.
2016-02-22 16:15:09 +00:00
for i := range resourceQuotas {
2016-11-18 20:50:17 +00:00
resourceQuota := resourceQuotas [ i ] . ( * v1 . ResourceQuota )
internalResourceQuota := & api . ResourceQuota { }
if err := v1 . Convert_v1_ResourceQuota_To_api_ResourceQuota ( resourceQuota , internalResourceQuota , nil ) ; err != nil {
glog . Error ( err )
continue
}
resourceQuotaResources := quota . ResourceNames ( internalResourceQuota . Status . Hard )
2016-12-07 20:44:16 +00:00
if intersection := evaluator . MatchingResources ( resourceQuotaResources ) ; len ( intersection ) > 0 {
2016-03-03 17:50:25 +00:00
// TODO: make this support targeted replenishment to a specific kind, right now it does a full recalc on that quota.
rq . enqueueResourceQuota ( resourceQuota )
}
2015-11-11 21:19:39 +00:00
}
}