2015-01-25 06:11:10 +00:00
/ *
2015-05-01 16:19:44 +00:00
Copyright 2014 The Kubernetes Authors All rights reserved .
2015-01-25 06:11:10 +00:00
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
2015-10-10 03:58:57 +00:00
package resourcequota
2015-01-25 06:11:10 +00:00
import (
"time"
2015-08-05 22:05:17 +00:00
"github.com/golang/glog"
2016-02-22 16:15:09 +00:00
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/api"
2016-02-22 16:15:09 +00:00
"k8s.io/kubernetes/pkg/api/unversioned"
2015-11-11 21:19:39 +00:00
"k8s.io/kubernetes/pkg/client/cache"
2016-02-05 21:58:03 +00:00
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
2015-11-11 21:19:39 +00:00
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
2016-02-22 16:15:09 +00:00
"k8s.io/kubernetes/pkg/quota"
2015-11-11 21:19:39 +00:00
"k8s.io/kubernetes/pkg/runtime"
2016-04-13 18:38:32 +00:00
"k8s.io/kubernetes/pkg/util/metrics"
2016-01-15 07:32:10 +00:00
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
2016-02-02 10:57:06 +00:00
"k8s.io/kubernetes/pkg/util/wait"
2015-11-11 21:19:39 +00:00
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
2015-01-25 06:11:10 +00:00
)
2016-02-22 16:15:09 +00:00
// ResourceQuotaControllerOptions holds options for creating a quota controller
type ResourceQuotaControllerOptions struct {
// Must have authority to list all quotas, and update quota status
KubeClient clientset . Interface
// Controls full recalculation of quota usage
ResyncPeriod controller . ResyncPeriodFunc
// Knows how to calculate usage
Registry quota . Registry
// Knows how to build controllers that notify replenishment events
ControllerFactory ReplenishmentControllerFactory
2016-03-07 07:20:32 +00:00
// Controls full resync of objects monitored for replenihsment.
ReplenishmentResyncPeriod controller . ResyncPeriodFunc
2016-02-22 16:15:09 +00:00
// List of GroupKind objects that should be monitored for replenishment at
// a faster frequency than the quota controller recalculation interval
GroupKindsToReplenish [ ] unversioned . GroupKind
}
2015-07-31 11:38:04 +00:00
// ResourceQuotaController is responsible for tracking quota usage status in the system
type ResourceQuotaController struct {
2015-11-11 21:19:39 +00:00
// Must have authority to list all resources in the system, and update quota status
2016-01-29 06:34:08 +00:00
kubeClient clientset . Interface
2015-11-11 21:19:39 +00:00
// An index of resource quota objects by namespace
rqIndexer cache . Indexer
// Watches changes to all resource quota
rqController * framework . Controller
// ResourceQuota objects that need to be synchronized
queue * workqueue . Type
2015-01-25 06:11:10 +00:00
// To allow injection of syncUsage for testing.
2015-11-11 21:19:39 +00:00
syncHandler func ( key string ) error
// function that controls full recalculation of quota usage
resyncPeriod controller . ResyncPeriodFunc
2016-02-22 16:15:09 +00:00
// knows how to calculate usage
registry quota . Registry
// controllers monitoring to notify for replenishment
2016-04-19 14:46:31 +00:00
replenishmentControllers [ ] framework . ControllerInterface
2015-01-25 06:11:10 +00:00
}
2016-02-22 16:15:09 +00:00
func NewResourceQuotaController ( options * ResourceQuotaControllerOptions ) * ResourceQuotaController {
// build the resource quota controller
2015-11-11 21:19:39 +00:00
rq := & ResourceQuotaController {
2016-02-22 16:15:09 +00:00
kubeClient : options . KubeClient ,
queue : workqueue . New ( ) ,
resyncPeriod : options . ResyncPeriod ,
registry : options . Registry ,
2016-04-19 14:46:31 +00:00
replenishmentControllers : [ ] framework . ControllerInterface { } ,
2015-01-25 06:11:10 +00:00
}
2016-04-13 18:38:32 +00:00
if options . KubeClient != nil && options . KubeClient . Core ( ) . GetRESTClient ( ) . GetRateLimiter ( ) != nil {
metrics . RegisterMetricAndTrackRateLimiterUsage ( "resource_quota_controller" , options . KubeClient . Core ( ) . GetRESTClient ( ) . GetRateLimiter ( ) )
}
2016-02-22 16:15:09 +00:00
// set the synchronization handler
rq . syncHandler = rq . syncResourceQuotaFromKey
// build the controller that observes quota
2015-11-11 21:19:39 +00:00
rq . rqIndexer , rq . rqController = framework . NewIndexerInformer (
& cache . ListWatch {
2015-12-10 09:39:03 +00:00
ListFunc : func ( options api . ListOptions ) ( runtime . Object , error ) {
2016-02-03 21:21:05 +00:00
return rq . kubeClient . Core ( ) . ResourceQuotas ( api . NamespaceAll ) . List ( options )
2015-11-11 21:19:39 +00:00
} ,
2015-12-10 09:39:03 +00:00
WatchFunc : func ( options api . ListOptions ) ( watch . Interface , error ) {
2016-02-03 21:21:05 +00:00
return rq . kubeClient . Core ( ) . ResourceQuotas ( api . NamespaceAll ) . Watch ( options )
2015-11-11 21:19:39 +00:00
} ,
} ,
& api . ResourceQuota { } ,
2016-02-22 16:15:09 +00:00
rq . resyncPeriod ( ) ,
2015-11-11 21:19:39 +00:00
framework . ResourceEventHandlerFuncs {
AddFunc : rq . enqueueResourceQuota ,
UpdateFunc : func ( old , cur interface { } ) {
// We are only interested in observing updates to quota.spec to drive updates to quota.status.
// We ignore all updates to quota.Status because they are all driven by this controller.
// IMPORTANT:
// We do not use this function to queue up a full quota recalculation. To do so, would require
// us to enqueue all quota.Status updates, and since quota.Status updates involve additional queries
// that cannot be backed by a cache and result in a full query of a namespace's content, we do not
// want to pay the price on spurious status updates. As a result, we have a separate routine that is
// responsible for enqueue of all resource quotas when doing a full resync (enqueueAll)
oldResourceQuota := old . ( * api . ResourceQuota )
curResourceQuota := cur . ( * api . ResourceQuota )
2016-02-22 16:15:09 +00:00
if quota . Equals ( curResourceQuota . Spec . Hard , oldResourceQuota . Spec . Hard ) {
2015-11-11 21:19:39 +00:00
return
}
rq . enqueueResourceQuota ( curResourceQuota )
} ,
// This will enter the sync loop and no-op, because the controller has been deleted from the store.
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the controller.
DeleteFunc : rq . enqueueResourceQuota ,
} ,
cache . Indexers { "namespace" : cache . MetaNamespaceIndexFunc } ,
)
2016-02-22 16:15:09 +00:00
for _ , groupKindToReplenish := range options . GroupKindsToReplenish {
controllerOptions := & ReplenishmentControllerOptions {
GroupKind : groupKindToReplenish ,
2016-03-07 07:20:32 +00:00
ResyncPeriod : options . ReplenishmentResyncPeriod ,
2016-02-22 16:15:09 +00:00
ReplenishmentFunc : rq . replenishQuota ,
}
replenishmentController , err := options . ControllerFactory . NewController ( controllerOptions )
if err != nil {
glog . Warningf ( "quota controller unable to replenish %s due to %v, changes only accounted during full resync" , groupKindToReplenish , err )
} else {
rq . replenishmentControllers = append ( rq . replenishmentControllers , replenishmentController )
}
}
2015-11-11 21:19:39 +00:00
return rq
2015-01-25 06:11:10 +00:00
}
2015-11-11 21:19:39 +00:00
// enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics
func ( rq * ResourceQuotaController ) enqueueAll ( ) {
defer glog . V ( 4 ) . Infof ( "Resource quota controller queued all resource quota for full calculation of usage" )
for _ , k := range rq . rqIndexer . ListKeys ( ) {
rq . queue . Add ( k )
}
2015-01-25 06:11:10 +00:00
}
2015-11-11 21:19:39 +00:00
// obj could be an *api.ResourceQuota, or a DeletionFinalStateUnknown marker item.
func ( rq * ResourceQuotaController ) enqueueResourceQuota ( obj interface { } ) {
key , err := controller . KeyFunc ( obj )
2015-01-25 06:11:10 +00:00
if err != nil {
2015-11-11 21:19:39 +00:00
glog . Errorf ( "Couldn't get key for object %+v: %v" , obj , err )
return
2015-01-25 06:11:10 +00:00
}
2015-11-11 21:19:39 +00:00
rq . queue . Add ( key )
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func ( rq * ResourceQuotaController ) worker ( ) {
2016-05-10 19:59:34 +00:00
workFunc := func ( ) bool {
key , quit := rq . queue . Get ( )
if quit {
return true
}
defer rq . queue . Done ( key )
err := rq . syncHandler ( key . ( string ) )
if err != nil {
utilruntime . HandleError ( err )
rq . queue . Add ( key )
}
return false
}
2015-11-11 21:19:39 +00:00
for {
2016-05-10 19:59:34 +00:00
if quit := workFunc ( ) ; quit {
glog . Infof ( "resource quota controller worker shutting down" )
return
}
2015-11-11 21:19:39 +00:00
}
}
// Run begins quota controller using the specified number of workers
func ( rq * ResourceQuotaController ) Run ( workers int , stopCh <- chan struct { } ) {
2016-01-15 07:32:10 +00:00
defer utilruntime . HandleCrash ( )
2015-11-11 21:19:39 +00:00
go rq . rqController . Run ( stopCh )
2016-02-22 16:15:09 +00:00
// the controllers that replenish other resources to respond rapidly to state changes
for _ , replenishmentController := range rq . replenishmentControllers {
go replenishmentController . Run ( stopCh )
}
// the workers that chug through the quota calculation backlog
2015-11-11 21:19:39 +00:00
for i := 0 ; i < workers ; i ++ {
2016-02-02 10:57:06 +00:00
go wait . Until ( rq . worker , time . Second , stopCh )
2015-01-25 06:11:10 +00:00
}
2016-02-22 16:15:09 +00:00
// the timer for how often we do a full recalculation across all quotas
2016-02-02 10:57:06 +00:00
go wait . Until ( func ( ) { rq . enqueueAll ( ) } , rq . resyncPeriod ( ) , stopCh )
2015-11-11 21:19:39 +00:00
<- stopCh
glog . Infof ( "Shutting down ResourceQuotaController" )
rq . queue . ShutDown ( )
2015-01-25 06:11:10 +00:00
}
2015-11-11 21:19:39 +00:00
// syncResourceQuotaFromKey syncs a quota key
func ( rq * ResourceQuotaController ) syncResourceQuotaFromKey ( key string ) ( err error ) {
startTime := time . Now ( )
defer func ( ) {
glog . V ( 4 ) . Infof ( "Finished syncing resource quota %q (%v)" , key , time . Now ( ) . Sub ( startTime ) )
} ( )
obj , exists , err := rq . rqIndexer . GetByKey ( key )
if ! exists {
glog . Infof ( "Resource quota has been deleted %v" , key )
return nil
}
if err != nil {
glog . Infof ( "Unable to retrieve resource quota %v from store: %v" , key , err )
rq . queue . Add ( key )
return err
}
quota := * obj . ( * api . ResourceQuota )
return rq . syncResourceQuota ( quota )
}
2016-02-22 16:15:09 +00:00
// syncResourceQuota runs a complete sync of resource quota status across all known kinds
func ( rq * ResourceQuotaController ) syncResourceQuota ( resourceQuota api . ResourceQuota ) ( err error ) {
2015-04-16 21:46:27 +00:00
// quota is dirty if any part of spec hard limits differs from the status hard limits
2016-02-22 16:15:09 +00:00
dirty := ! api . Semantic . DeepEqual ( resourceQuota . Spec . Hard , resourceQuota . Status . Hard )
2015-04-16 21:46:27 +00:00
2015-01-25 06:11:10 +00:00
// dirty tracks if the usage status differs from the previous sync,
// if so, we send a new usage with latest status
// if this is our first sync, it will be dirty by default, since we need track usage
2016-02-22 16:15:09 +00:00
dirty = dirty || ( resourceQuota . Status . Hard == nil || resourceQuota . Status . Used == nil )
2015-01-25 06:11:10 +00:00
2016-02-22 16:15:09 +00:00
// Create a usage object that is based on the quota resource version that will handle updates
// by default, we preserve the past usage observation, and set hard to the current spec
previousUsed := api . ResourceList { }
if resourceQuota . Status . Used != nil {
previousUsed = quota . Add ( api . ResourceList { } , resourceQuota . Status . Used )
}
2015-03-13 19:15:04 +00:00
usage := api . ResourceQuota {
2015-01-25 06:11:10 +00:00
ObjectMeta : api . ObjectMeta {
2016-02-22 16:15:09 +00:00
Name : resourceQuota . Name ,
Namespace : resourceQuota . Namespace ,
ResourceVersion : resourceQuota . ResourceVersion ,
Labels : resourceQuota . Labels ,
Annotations : resourceQuota . Annotations } ,
2015-01-25 06:11:10 +00:00
Status : api . ResourceQuotaStatus {
2016-02-22 16:15:09 +00:00
Hard : quota . Add ( api . ResourceList { } , resourceQuota . Spec . Hard ) ,
Used : previousUsed ,
2015-01-25 06:11:10 +00:00
} ,
}
2015-02-24 16:17:41 +00:00
2016-02-22 16:15:09 +00:00
// find the intersection between the hard resources on the quota
// and the resources this controller can track to know what we can
// look to measure updated usage stats for
hardResources := quota . ResourceNames ( usage . Status . Hard )
potentialResources := [ ] api . ResourceName { }
evaluators := rq . registry . Evaluators ( )
for _ , evaluator := range evaluators {
potentialResources = append ( potentialResources , evaluator . MatchesResources ( ) ... )
2015-01-25 06:11:10 +00:00
}
2016-02-22 16:15:09 +00:00
matchedResources := quota . Intersection ( hardResources , potentialResources )
2015-01-25 06:11:10 +00:00
2016-02-22 16:15:09 +00:00
// sum the observed usage from each evaluator
newUsage := api . ResourceList { }
usageStatsOptions := quota . UsageStatsOptions { Namespace : resourceQuota . Namespace , Scopes : resourceQuota . Spec . Scopes }
for _ , evaluator := range evaluators {
stats , err := evaluator . UsageStats ( usageStatsOptions )
2015-01-25 06:11:10 +00:00
if err != nil {
return err
}
2016-02-22 16:15:09 +00:00
newUsage = quota . Add ( newUsage , stats . Used )
2015-01-25 06:11:10 +00:00
}
2016-02-22 16:15:09 +00:00
// mask the observed usage to only the set of resources tracked by this quota
// merge our observed usage with the quota usage status
// if the new usage is different than the last usage, we will need to do an update
newUsage = quota . Mask ( newUsage , matchedResources )
for key , value := range newUsage {
usage . Status . Used [ key ] = value
2015-01-25 06:11:10 +00:00
}
2016-02-22 16:15:09 +00:00
dirty = dirty || ! quota . Equals ( usage . Status . Used , resourceQuota . Status . Used )
// there was a change observed by this controller that requires we update quota
2015-01-25 06:11:10 +00:00
if dirty {
2016-02-03 21:21:05 +00:00
_ , err = rq . kubeClient . Core ( ) . ResourceQuotas ( usage . Namespace ) . UpdateStatus ( & usage )
2015-03-13 19:15:04 +00:00
return err
2015-01-25 06:11:10 +00:00
}
return nil
}
2015-01-26 04:34:30 +00:00
2016-02-22 16:15:09 +00:00
// replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
func ( rq * ResourceQuotaController ) replenishQuota ( groupKind unversioned . GroupKind , namespace string , object runtime . Object ) {
2016-03-03 17:50:25 +00:00
// check if the quota controller can evaluate this kind, if not, ignore it altogether...
evaluators := rq . registry . Evaluators ( )
evaluator , found := evaluators [ groupKind ]
if ! found {
return
}
// check if this namespace even has a quota...
2016-02-22 16:15:09 +00:00
indexKey := & api . ResourceQuota { }
indexKey . Namespace = namespace
resourceQuotas , err := rq . rqIndexer . Index ( "namespace" , indexKey )
2015-11-11 21:19:39 +00:00
if err != nil {
2016-02-22 16:15:09 +00:00
glog . Errorf ( "quota controller could not find ResourceQuota associated with namespace: %s, could take up to %v before a quota replenishes" , namespace , rq . resyncPeriod ( ) )
2015-11-11 21:19:39 +00:00
}
2016-02-22 16:15:09 +00:00
if len ( resourceQuotas ) == 0 {
2015-11-11 21:19:39 +00:00
return
}
2016-03-03 17:50:25 +00:00
// only queue those quotas that are tracking a resource associated with this kind.
matchedResources := evaluator . MatchesResources ( )
2016-02-22 16:15:09 +00:00
for i := range resourceQuotas {
resourceQuota := resourceQuotas [ i ] . ( * api . ResourceQuota )
2016-03-03 17:50:25 +00:00
resourceQuotaResources := quota . ResourceNames ( resourceQuota . Status . Hard )
if len ( quota . Intersection ( matchedResources , resourceQuotaResources ) ) > 0 {
// TODO: make this support targeted replenishment to a specific kind, right now it does a full recalc on that quota.
rq . enqueueResourceQuota ( resourceQuota )
}
2015-11-11 21:19:39 +00:00
}
}