mirror of https://github.com/k3s-io/k3s
Update quota controller to monitor all types
parent
13294a0abe
commit
7f88e91892
|
@ -24,7 +24,6 @@ go_library(
|
||||||
importpath = "k8s.io/kubernetes/cmd/kube-controller-manager/app",
|
importpath = "k8s.io/kubernetes/cmd/kube-controller-manager/app",
|
||||||
deps = [
|
deps = [
|
||||||
"//cmd/kube-controller-manager/app/options:go_default_library",
|
"//cmd/kube-controller-manager/app/options:go_default_library",
|
||||||
"//pkg/api:go_default_library",
|
|
||||||
"//pkg/api/install:go_default_library",
|
"//pkg/api/install:go_default_library",
|
||||||
"//pkg/api/legacyscheme:go_default_library",
|
"//pkg/api/legacyscheme:go_default_library",
|
||||||
"//pkg/apis/apps/install:go_default_library",
|
"//pkg/apis/apps/install:go_default_library",
|
||||||
|
@ -78,6 +77,7 @@ go_library(
|
||||||
"//pkg/controller/volume/expand:go_default_library",
|
"//pkg/controller/volume/expand:go_default_library",
|
||||||
"//pkg/controller/volume/persistentvolume:go_default_library",
|
"//pkg/controller/volume/persistentvolume:go_default_library",
|
||||||
"//pkg/features:go_default_library",
|
"//pkg/features:go_default_library",
|
||||||
|
"//pkg/quota/generic:go_default_library",
|
||||||
"//pkg/quota/install:go_default_library",
|
"//pkg/quota/install:go_default_library",
|
||||||
"//pkg/serviceaccount:go_default_library",
|
"//pkg/serviceaccount:go_default_library",
|
||||||
"//pkg/util/configz:go_default_library",
|
"//pkg/util/configz:go_default_library",
|
||||||
|
|
|
@ -36,7 +36,6 @@ import (
|
||||||
cacheddiscovery "k8s.io/client-go/discovery/cached"
|
cacheddiscovery "k8s.io/client-go/discovery/cached"
|
||||||
"k8s.io/client-go/dynamic"
|
"k8s.io/client-go/dynamic"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
|
||||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
|
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
|
||||||
|
@ -55,6 +54,7 @@ import (
|
||||||
"k8s.io/kubernetes/pkg/controller/volume/expand"
|
"k8s.io/kubernetes/pkg/controller/volume/expand"
|
||||||
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
|
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
|
||||||
"k8s.io/kubernetes/pkg/features"
|
"k8s.io/kubernetes/pkg/features"
|
||||||
|
"k8s.io/kubernetes/pkg/quota/generic"
|
||||||
quotainstall "k8s.io/kubernetes/pkg/quota/install"
|
quotainstall "k8s.io/kubernetes/pkg/quota/install"
|
||||||
"k8s.io/kubernetes/pkg/util/metrics"
|
"k8s.io/kubernetes/pkg/util/metrics"
|
||||||
)
|
)
|
||||||
|
@ -240,31 +240,34 @@ func startPodGCController(ctx ControllerContext) (bool, error) {
|
||||||
|
|
||||||
func startResourceQuotaController(ctx ControllerContext) (bool, error) {
|
func startResourceQuotaController(ctx ControllerContext) (bool, error) {
|
||||||
resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
|
resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
|
||||||
resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.InformerFactory)
|
discoveryFunc := resourceQuotaControllerClient.Discovery().ServerPreferredNamespacedResources
|
||||||
groupKindsToReplenish := []schema.GroupKind{
|
listerFuncForResource := generic.ListerFuncForResourceFunc(ctx.InformerFactory.ForResource)
|
||||||
api.Kind("Pod"),
|
quotaConfiguration := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource)
|
||||||
api.Kind("Service"),
|
|
||||||
api.Kind("ReplicationController"),
|
|
||||||
api.Kind("PersistentVolumeClaim"),
|
|
||||||
api.Kind("Secret"),
|
|
||||||
api.Kind("ConfigMap"),
|
|
||||||
}
|
|
||||||
resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
|
resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
|
||||||
QuotaClient: resourceQuotaControllerClient.CoreV1(),
|
QuotaClient: resourceQuotaControllerClient.CoreV1(),
|
||||||
ResourceQuotaInformer: ctx.InformerFactory.Core().V1().ResourceQuotas(),
|
ResourceQuotaInformer: ctx.InformerFactory.Core().V1().ResourceQuotas(),
|
||||||
ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.Options.ResourceQuotaSyncPeriod.Duration),
|
ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.Options.ResourceQuotaSyncPeriod.Duration),
|
||||||
Registry: resourceQuotaRegistry,
|
InformerFactory: ctx.InformerFactory,
|
||||||
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(ctx.InformerFactory),
|
|
||||||
ReplenishmentResyncPeriod: ResyncPeriod(&ctx.Options),
|
ReplenishmentResyncPeriod: ResyncPeriod(&ctx.Options),
|
||||||
GroupKindsToReplenish: groupKindsToReplenish,
|
DiscoveryFunc: discoveryFunc,
|
||||||
|
IgnoredResourcesFunc: quotaConfiguration.IgnoredResources,
|
||||||
|
InformersStarted: ctx.InformersStarted,
|
||||||
|
Registry: generic.NewRegistry(quotaConfiguration.Evaluators()),
|
||||||
}
|
}
|
||||||
if resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter() != nil {
|
if resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter() != nil {
|
||||||
metrics.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter())
|
metrics.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter())
|
||||||
}
|
}
|
||||||
|
|
||||||
go resourcequotacontroller.NewResourceQuotaController(
|
resourceQuotaController, err := resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions)
|
||||||
resourceQuotaControllerOptions,
|
if err != nil {
|
||||||
).Run(int(ctx.Options.ConcurrentResourceQuotaSyncs), ctx.Stop)
|
return false, err
|
||||||
|
}
|
||||||
|
go resourceQuotaController.Run(int(ctx.Options.ConcurrentResourceQuotaSyncs), ctx.Stop)
|
||||||
|
|
||||||
|
// Periodically the quota controller to detect new resource types
|
||||||
|
go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Stop)
|
||||||
|
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,8 +10,8 @@ go_library(
|
||||||
name = "go_default_library",
|
name = "go_default_library",
|
||||||
srcs = [
|
srcs = [
|
||||||
"doc.go",
|
"doc.go",
|
||||||
"replenishment_controller.go",
|
|
||||||
"resource_quota_controller.go",
|
"resource_quota_controller.go",
|
||||||
|
"resource_quota_monitor.go",
|
||||||
],
|
],
|
||||||
importpath = "k8s.io/kubernetes/pkg/controller/resourcequota",
|
importpath = "k8s.io/kubernetes/pkg/controller/resourcequota",
|
||||||
deps = [
|
deps = [
|
||||||
|
@ -20,6 +20,7 @@ go_library(
|
||||||
"//pkg/controller:go_default_library",
|
"//pkg/controller:go_default_library",
|
||||||
"//pkg/quota:go_default_library",
|
"//pkg/quota:go_default_library",
|
||||||
"//pkg/quota/evaluator/core:go_default_library",
|
"//pkg/quota/evaluator/core:go_default_library",
|
||||||
|
"//pkg/quota/generic:go_default_library",
|
||||||
"//vendor/github.com/golang/glog:go_default_library",
|
"//vendor/github.com/golang/glog:go_default_library",
|
||||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
||||||
|
@ -27,11 +28,12 @@ go_library(
|
||||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
|
||||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
|
"//vendor/k8s.io/client-go/discovery:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
|
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
|
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
|
||||||
|
@ -43,15 +45,12 @@ go_library(
|
||||||
|
|
||||||
go_test(
|
go_test(
|
||||||
name = "go_default_test",
|
name = "go_default_test",
|
||||||
srcs = [
|
srcs = ["resource_quota_controller_test.go"],
|
||||||
"replenishment_controller_test.go",
|
|
||||||
"resource_quota_controller_test.go",
|
|
||||||
],
|
|
||||||
importpath = "k8s.io/kubernetes/pkg/controller/resourcequota",
|
importpath = "k8s.io/kubernetes/pkg/controller/resourcequota",
|
||||||
library = ":go_default_library",
|
library = ":go_default_library",
|
||||||
deps = [
|
deps = [
|
||||||
"//pkg/api:go_default_library",
|
|
||||||
"//pkg/controller:go_default_library",
|
"//pkg/controller:go_default_library",
|
||||||
|
"//pkg/quota:go_default_library",
|
||||||
"//pkg/quota/generic:go_default_library",
|
"//pkg/quota/generic:go_default_library",
|
||||||
"//pkg/quota/install:go_default_library",
|
"//pkg/quota/install:go_default_library",
|
||||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||||
|
@ -59,12 +58,12 @@ go_test(
|
||||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
|
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||||
|
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/testing:go_default_library",
|
"//vendor/k8s.io/client-go/testing:go_default_library",
|
||||||
|
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -1,233 +0,0 @@
|
||||||
/*
|
|
||||||
Copyright 2015 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package resourcequota
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/golang/glog"
|
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
||||||
"k8s.io/apimachinery/pkg/util/clock"
|
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
||||||
"k8s.io/client-go/informers"
|
|
||||||
"k8s.io/client-go/tools/cache"
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
|
||||||
"k8s.io/kubernetes/pkg/quota/evaluator/core"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ReplenishmentFunc is a function that is invoked when controller sees a change
|
|
||||||
// that may require a quota to be replenished (i.e. object deletion, or object moved to terminal state)
|
|
||||||
type ReplenishmentFunc func(groupKind schema.GroupKind, namespace string, object runtime.Object)
|
|
||||||
|
|
||||||
// ReplenishmentControllerOptions is an options struct that tells a factory
|
|
||||||
// how to configure a controller that can inform the quota system it should
|
|
||||||
// replenish quota
|
|
||||||
type ReplenishmentControllerOptions struct {
|
|
||||||
// The kind monitored for replenishment
|
|
||||||
GroupKind schema.GroupKind
|
|
||||||
// The period that should be used to re-sync the monitored resource
|
|
||||||
ResyncPeriod controller.ResyncPeriodFunc
|
|
||||||
// The function to invoke when a change is observed that should trigger
|
|
||||||
// replenishment
|
|
||||||
ReplenishmentFunc ReplenishmentFunc
|
|
||||||
}
|
|
||||||
|
|
||||||
// PodReplenishmentUpdateFunc will replenish if the old pod was quota tracked but the new is not
|
|
||||||
func PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions, clock clock.Clock) func(oldObj, newObj interface{}) {
|
|
||||||
return func(oldObj, newObj interface{}) {
|
|
||||||
oldPod := oldObj.(*v1.Pod)
|
|
||||||
newPod := newObj.(*v1.Pod)
|
|
||||||
if core.QuotaV1Pod(oldPod, clock) && !core.QuotaV1Pod(newPod, clock) {
|
|
||||||
options.ReplenishmentFunc(options.GroupKind, newPod.Namespace, oldPod)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ObjectReplenishmentDeleteFunc will replenish on every delete
|
|
||||||
func ObjectReplenishmentDeleteFunc(options *ReplenishmentControllerOptions) func(obj interface{}) {
|
|
||||||
return func(obj interface{}) {
|
|
||||||
metaObject, err := meta.Accessor(obj)
|
|
||||||
if err != nil {
|
|
||||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
|
||||||
if !ok {
|
|
||||||
glog.Errorf("replenishment controller could not get object from tombstone %+v, could take up to %v before quota is replenished", obj, options.ResyncPeriod())
|
|
||||||
utilruntime.HandleError(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
metaObject, err = meta.Accessor(tombstone.Obj)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("replenishment controller tombstone contained object that is not a meta %+v, could take up to %v before quota is replenished", tombstone.Obj, options.ResyncPeriod())
|
|
||||||
utilruntime.HandleError(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
options.ReplenishmentFunc(options.GroupKind, metaObject.GetNamespace(), nil)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReplenishmentControllerFactory knows how to build replenishment controllers
|
|
||||||
type ReplenishmentControllerFactory interface {
|
|
||||||
// NewController returns a controller configured with the specified options.
|
|
||||||
// This method is NOT thread-safe.
|
|
||||||
NewController(options *ReplenishmentControllerOptions) (cache.Controller, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// replenishmentControllerFactory implements ReplenishmentControllerFactory
|
|
||||||
type replenishmentControllerFactory struct {
|
|
||||||
sharedInformerFactory informers.SharedInformerFactory
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewReplenishmentControllerFactory returns a factory that knows how to build controllers
|
|
||||||
// to replenish resources when updated or deleted
|
|
||||||
func NewReplenishmentControllerFactory(f informers.SharedInformerFactory) ReplenishmentControllerFactory {
|
|
||||||
return &replenishmentControllerFactory{
|
|
||||||
sharedInformerFactory: f,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (cache.Controller, error) {
|
|
||||||
var (
|
|
||||||
informer informers.GenericInformer
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
|
|
||||||
switch options.GroupKind {
|
|
||||||
case api.Kind("Pod"):
|
|
||||||
informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("pods"))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
clock := clock.RealClock{}
|
|
||||||
informer.Informer().AddEventHandlerWithResyncPeriod(
|
|
||||||
cache.ResourceEventHandlerFuncs{
|
|
||||||
UpdateFunc: PodReplenishmentUpdateFunc(options, clock),
|
|
||||||
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
|
||||||
},
|
|
||||||
options.ResyncPeriod(),
|
|
||||||
)
|
|
||||||
case api.Kind("Service"):
|
|
||||||
informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("services"))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
informer.Informer().AddEventHandlerWithResyncPeriod(
|
|
||||||
cache.ResourceEventHandlerFuncs{
|
|
||||||
UpdateFunc: ServiceReplenishmentUpdateFunc(options),
|
|
||||||
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
|
||||||
},
|
|
||||||
options.ResyncPeriod(),
|
|
||||||
)
|
|
||||||
case api.Kind("ReplicationController"):
|
|
||||||
informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("replicationcontrollers"))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
informer.Informer().AddEventHandlerWithResyncPeriod(
|
|
||||||
cache.ResourceEventHandlerFuncs{
|
|
||||||
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
|
||||||
},
|
|
||||||
options.ResyncPeriod(),
|
|
||||||
)
|
|
||||||
case api.Kind("PersistentVolumeClaim"):
|
|
||||||
informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("persistentvolumeclaims"))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
informer.Informer().AddEventHandlerWithResyncPeriod(
|
|
||||||
cache.ResourceEventHandlerFuncs{
|
|
||||||
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
|
||||||
},
|
|
||||||
options.ResyncPeriod(),
|
|
||||||
)
|
|
||||||
case api.Kind("Secret"):
|
|
||||||
informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("secrets"))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
informer.Informer().AddEventHandlerWithResyncPeriod(
|
|
||||||
cache.ResourceEventHandlerFuncs{
|
|
||||||
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
|
||||||
},
|
|
||||||
options.ResyncPeriod(),
|
|
||||||
)
|
|
||||||
case api.Kind("ConfigMap"):
|
|
||||||
informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("configmaps"))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
informer.Informer().AddEventHandlerWithResyncPeriod(
|
|
||||||
cache.ResourceEventHandlerFuncs{
|
|
||||||
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
|
||||||
},
|
|
||||||
options.ResyncPeriod(),
|
|
||||||
)
|
|
||||||
default:
|
|
||||||
return nil, NewUnhandledGroupKindError(options.GroupKind)
|
|
||||||
}
|
|
||||||
return informer.Informer().GetController(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ServiceReplenishmentUpdateFunc will replenish if the service was quota tracked has changed service type
|
|
||||||
func ServiceReplenishmentUpdateFunc(options *ReplenishmentControllerOptions) func(oldObj, newObj interface{}) {
|
|
||||||
return func(oldObj, newObj interface{}) {
|
|
||||||
oldService := oldObj.(*v1.Service)
|
|
||||||
newService := newObj.(*v1.Service)
|
|
||||||
if core.GetQuotaServiceType(oldService) != core.GetQuotaServiceType(newService) {
|
|
||||||
options.ReplenishmentFunc(options.GroupKind, newService.Namespace, nil)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type unhandledKindErr struct {
|
|
||||||
kind schema.GroupKind
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e unhandledKindErr) Error() string {
|
|
||||||
return fmt.Sprintf("no replenishment controller available for %s", e.kind)
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewUnhandledGroupKindError(kind schema.GroupKind) error {
|
|
||||||
return unhandledKindErr{kind: kind}
|
|
||||||
}
|
|
||||||
|
|
||||||
func IsUnhandledGroupKindError(err error) bool {
|
|
||||||
if err == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
_, ok := err.(unhandledKindErr)
|
|
||||||
return ok
|
|
||||||
}
|
|
||||||
|
|
||||||
// UnionReplenishmentControllerFactory iterates through its constituent factories ignoring, UnhandledGroupKindErrors
|
|
||||||
// returning the first success or failure it hits. If there are no hits either way, it return an UnhandledGroupKind error
|
|
||||||
type UnionReplenishmentControllerFactory []ReplenishmentControllerFactory
|
|
||||||
|
|
||||||
func (f UnionReplenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (cache.Controller, error) {
|
|
||||||
for _, factory := range f {
|
|
||||||
controller, err := factory.NewController(options)
|
|
||||||
if !IsUnhandledGroupKindError(err) {
|
|
||||||
return controller, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, NewUnhandledGroupKindError(options.GroupKind)
|
|
||||||
}
|
|
|
@ -1,160 +0,0 @@
|
||||||
/*
|
|
||||||
Copyright 2016 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package resourcequota
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
||||||
"k8s.io/apimachinery/pkg/util/clock"
|
|
||||||
"k8s.io/apimachinery/pkg/util/intstr"
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
|
||||||
)
|
|
||||||
|
|
||||||
// testReplenishment lets us test replenishment functions are invoked
|
|
||||||
type testReplenishment struct {
|
|
||||||
groupKind schema.GroupKind
|
|
||||||
namespace string
|
|
||||||
}
|
|
||||||
|
|
||||||
// mock function that holds onto the last kind that was replenished
|
|
||||||
func (t *testReplenishment) Replenish(groupKind schema.GroupKind, namespace string, object runtime.Object) {
|
|
||||||
t.groupKind = groupKind
|
|
||||||
t.namespace = namespace
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPodReplenishmentUpdateFunc(t *testing.T) {
|
|
||||||
mockReplenish := &testReplenishment{}
|
|
||||||
options := ReplenishmentControllerOptions{
|
|
||||||
GroupKind: api.Kind("Pod"),
|
|
||||||
ReplenishmentFunc: mockReplenish.Replenish,
|
|
||||||
ResyncPeriod: controller.NoResyncPeriodFunc,
|
|
||||||
}
|
|
||||||
oldPod := &v1.Pod{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "pod"},
|
|
||||||
Status: v1.PodStatus{Phase: v1.PodRunning},
|
|
||||||
}
|
|
||||||
newPod := &v1.Pod{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "pod"},
|
|
||||||
Status: v1.PodStatus{Phase: v1.PodFailed},
|
|
||||||
}
|
|
||||||
fakeClock := clock.NewFakeClock(time.Now())
|
|
||||||
updateFunc := PodReplenishmentUpdateFunc(&options, fakeClock)
|
|
||||||
updateFunc(oldPod, newPod)
|
|
||||||
if mockReplenish.groupKind != api.Kind("Pod") {
|
|
||||||
t.Errorf("Unexpected group kind %v", mockReplenish.groupKind)
|
|
||||||
}
|
|
||||||
if mockReplenish.namespace != oldPod.Namespace {
|
|
||||||
t.Errorf("Unexpected namespace %v", mockReplenish.namespace)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestObjectReplenishmentDeleteFunc(t *testing.T) {
|
|
||||||
mockReplenish := &testReplenishment{}
|
|
||||||
options := ReplenishmentControllerOptions{
|
|
||||||
GroupKind: api.Kind("Pod"),
|
|
||||||
ReplenishmentFunc: mockReplenish.Replenish,
|
|
||||||
ResyncPeriod: controller.NoResyncPeriodFunc,
|
|
||||||
}
|
|
||||||
oldPod := &v1.Pod{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "pod"},
|
|
||||||
Status: v1.PodStatus{Phase: v1.PodRunning},
|
|
||||||
}
|
|
||||||
deleteFunc := ObjectReplenishmentDeleteFunc(&options)
|
|
||||||
deleteFunc(oldPod)
|
|
||||||
if mockReplenish.groupKind != api.Kind("Pod") {
|
|
||||||
t.Errorf("Unexpected group kind %v", mockReplenish.groupKind)
|
|
||||||
}
|
|
||||||
if mockReplenish.namespace != oldPod.Namespace {
|
|
||||||
t.Errorf("Unexpected namespace %v", mockReplenish.namespace)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestServiceReplenishmentUpdateFunc(t *testing.T) {
|
|
||||||
mockReplenish := &testReplenishment{}
|
|
||||||
options := ReplenishmentControllerOptions{
|
|
||||||
GroupKind: api.Kind("Service"),
|
|
||||||
ReplenishmentFunc: mockReplenish.Replenish,
|
|
||||||
ResyncPeriod: controller.NoResyncPeriodFunc,
|
|
||||||
}
|
|
||||||
oldService := &v1.Service{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "mysvc"},
|
|
||||||
Spec: v1.ServiceSpec{
|
|
||||||
Type: v1.ServiceTypeNodePort,
|
|
||||||
Ports: []v1.ServicePort{{
|
|
||||||
Port: 80,
|
|
||||||
TargetPort: intstr.FromInt(80),
|
|
||||||
}},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
newService := &v1.Service{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "mysvc"},
|
|
||||||
Spec: v1.ServiceSpec{
|
|
||||||
Type: v1.ServiceTypeClusterIP,
|
|
||||||
Ports: []v1.ServicePort{{
|
|
||||||
Port: 80,
|
|
||||||
TargetPort: intstr.FromInt(80),
|
|
||||||
}}},
|
|
||||||
}
|
|
||||||
updateFunc := ServiceReplenishmentUpdateFunc(&options)
|
|
||||||
updateFunc(oldService, newService)
|
|
||||||
if mockReplenish.groupKind != api.Kind("Service") {
|
|
||||||
t.Errorf("Unexpected group kind %v", mockReplenish.groupKind)
|
|
||||||
}
|
|
||||||
if mockReplenish.namespace != oldService.Namespace {
|
|
||||||
t.Errorf("Unexpected namespace %v", mockReplenish.namespace)
|
|
||||||
}
|
|
||||||
|
|
||||||
mockReplenish = &testReplenishment{}
|
|
||||||
options = ReplenishmentControllerOptions{
|
|
||||||
GroupKind: api.Kind("Service"),
|
|
||||||
ReplenishmentFunc: mockReplenish.Replenish,
|
|
||||||
ResyncPeriod: controller.NoResyncPeriodFunc,
|
|
||||||
}
|
|
||||||
oldService = &v1.Service{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "mysvc"},
|
|
||||||
Spec: v1.ServiceSpec{
|
|
||||||
Type: v1.ServiceTypeNodePort,
|
|
||||||
Ports: []v1.ServicePort{{
|
|
||||||
Port: 80,
|
|
||||||
TargetPort: intstr.FromInt(80),
|
|
||||||
}},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
newService = &v1.Service{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "mysvc"},
|
|
||||||
Spec: v1.ServiceSpec{
|
|
||||||
Type: v1.ServiceTypeNodePort,
|
|
||||||
Ports: []v1.ServicePort{{
|
|
||||||
Port: 81,
|
|
||||||
TargetPort: intstr.FromInt(81),
|
|
||||||
}}},
|
|
||||||
}
|
|
||||||
updateFunc = ServiceReplenishmentUpdateFunc(&options)
|
|
||||||
updateFunc(oldService, newService)
|
|
||||||
if mockReplenish.groupKind == api.Kind("Service") {
|
|
||||||
t.Errorf("Unexpected group kind %v", mockReplenish.groupKind)
|
|
||||||
}
|
|
||||||
if mockReplenish.namespace == oldService.Namespace {
|
|
||||||
t.Errorf("Unexpected namespace %v", mockReplenish.namespace)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -18,6 +18,8 @@ package resourcequota
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
@ -27,10 +29,11 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/client-go/discovery"
|
||||||
|
"k8s.io/client-go/informers"
|
||||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
corelisters "k8s.io/client-go/listers/core/v1"
|
corelisters "k8s.io/client-go/listers/core/v1"
|
||||||
|
@ -42,6 +45,19 @@ import (
|
||||||
"k8s.io/kubernetes/pkg/quota"
|
"k8s.io/kubernetes/pkg/quota"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// NamespacedResourcesFunc knows how to discover namespaced resources.
|
||||||
|
type NamespacedResourcesFunc func() ([]*metav1.APIResourceList, error)
|
||||||
|
|
||||||
|
// ReplenishmentFunc is a signal that a resource changed in specified namespace
|
||||||
|
// that may require quota to be recalculated.
|
||||||
|
type ReplenishmentFunc func(groupResource schema.GroupResource, namespace string)
|
||||||
|
|
||||||
|
// InformerFactory is all the quota system needs to interface with informers.
|
||||||
|
type InformerFactory interface {
|
||||||
|
ForResource(resource schema.GroupVersionResource) (informers.GenericInformer, error)
|
||||||
|
Start(stopCh <-chan struct{})
|
||||||
|
}
|
||||||
|
|
||||||
// ResourceQuotaControllerOptions holds options for creating a quota controller
|
// ResourceQuotaControllerOptions holds options for creating a quota controller
|
||||||
type ResourceQuotaControllerOptions struct {
|
type ResourceQuotaControllerOptions struct {
|
||||||
// Must have authority to list all quotas, and update quota status
|
// Must have authority to list all quotas, and update quota status
|
||||||
|
@ -50,15 +66,18 @@ type ResourceQuotaControllerOptions struct {
|
||||||
ResourceQuotaInformer coreinformers.ResourceQuotaInformer
|
ResourceQuotaInformer coreinformers.ResourceQuotaInformer
|
||||||
// Controls full recalculation of quota usage
|
// Controls full recalculation of quota usage
|
||||||
ResyncPeriod controller.ResyncPeriodFunc
|
ResyncPeriod controller.ResyncPeriodFunc
|
||||||
// Knows how to calculate usage
|
// Maintains evaluators that know how to calculate usage for group resource
|
||||||
Registry quota.Registry
|
Registry quota.Registry
|
||||||
// Knows how to build controllers that notify replenishment events
|
// Discover list of supported resources on the server.
|
||||||
ControllerFactory ReplenishmentControllerFactory
|
DiscoveryFunc NamespacedResourcesFunc
|
||||||
|
// A function that returns the list of resources to ignore
|
||||||
|
IgnoredResourcesFunc func() map[schema.GroupResource]struct{}
|
||||||
|
// InformersStarted knows if informers were started.
|
||||||
|
InformersStarted <-chan struct{}
|
||||||
|
// InformerFactory interfaces with informers.
|
||||||
|
InformerFactory InformerFactory
|
||||||
// Controls full resync of objects monitored for replenishment.
|
// Controls full resync of objects monitored for replenishment.
|
||||||
ReplenishmentResyncPeriod controller.ResyncPeriodFunc
|
ReplenishmentResyncPeriod controller.ResyncPeriodFunc
|
||||||
// List of GroupKind objects that should be monitored for replenishment at
|
|
||||||
// a faster frequency than the quota controller recalculation interval
|
|
||||||
GroupKindsToReplenish []schema.GroupKind
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResourceQuotaController is responsible for tracking quota usage status in the system
|
// ResourceQuotaController is responsible for tracking quota usage status in the system
|
||||||
|
@ -79,9 +98,16 @@ type ResourceQuotaController struct {
|
||||||
resyncPeriod controller.ResyncPeriodFunc
|
resyncPeriod controller.ResyncPeriodFunc
|
||||||
// knows how to calculate usage
|
// knows how to calculate usage
|
||||||
registry quota.Registry
|
registry quota.Registry
|
||||||
|
// knows how to monitor all the resources tracked by quota and trigger replenishment
|
||||||
|
quotaMonitor *QuotaMonitor
|
||||||
|
// controls the workers that process quotas
|
||||||
|
// this lock is acquired to control write access to the monitors and ensures that all
|
||||||
|
// monitors are synced before the controller can process quotas.
|
||||||
|
workerLock sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController {
|
// NewResourceQuotaController creates a quota controller with specified options
|
||||||
|
func NewResourceQuotaController(options *ResourceQuotaControllerOptions) (*ResourceQuotaController, error) {
|
||||||
// build the resource quota controller
|
// build the resource quota controller
|
||||||
rq := &ResourceQuotaController{
|
rq := &ResourceQuotaController{
|
||||||
rqClient: options.QuotaClient,
|
rqClient: options.QuotaClient,
|
||||||
|
@ -122,21 +148,30 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
|
||||||
rq.resyncPeriod(),
|
rq.resyncPeriod(),
|
||||||
)
|
)
|
||||||
|
|
||||||
for _, groupKindToReplenish := range options.GroupKindsToReplenish {
|
qm := &QuotaMonitor{
|
||||||
controllerOptions := &ReplenishmentControllerOptions{
|
informersStarted: options.InformersStarted,
|
||||||
GroupKind: groupKindToReplenish,
|
informerFactory: options.InformerFactory,
|
||||||
ResyncPeriod: options.ReplenishmentResyncPeriod,
|
ignoredResources: options.IgnoredResourcesFunc(),
|
||||||
ReplenishmentFunc: rq.replenishQuota,
|
resourceChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_quota_controller_resource_changes"),
|
||||||
}
|
resyncPeriod: options.ReplenishmentResyncPeriod,
|
||||||
replenishmentController, err := options.ControllerFactory.NewController(controllerOptions)
|
replenishmentFunc: rq.replenishQuota,
|
||||||
if err != nil {
|
registry: rq.registry,
|
||||||
glog.Warningf("quota controller unable to replenish %s due to %v, changes only accounted during full resync", groupKindToReplenish, err)
|
|
||||||
} else {
|
|
||||||
// make sure we wait for each shared informer's cache to sync
|
|
||||||
rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, replenishmentController.HasSynced)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return rq
|
rq.quotaMonitor = qm
|
||||||
|
|
||||||
|
// do initial quota monitor setup
|
||||||
|
resources, err := GetQuotableResources(options.DiscoveryFunc)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err = qm.syncMonitors(resources); err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
// only start quota once all informers synced
|
||||||
|
rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, qm.IsSynced)
|
||||||
|
|
||||||
|
return rq, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics
|
// enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics
|
||||||
|
@ -186,7 +221,7 @@ func (rq *ResourceQuotaController) addQuota(obj interface{}) {
|
||||||
for constraint := range resourceQuota.Status.Hard {
|
for constraint := range resourceQuota.Status.Hard {
|
||||||
if _, usageFound := resourceQuota.Status.Used[constraint]; !usageFound {
|
if _, usageFound := resourceQuota.Status.Used[constraint]; !usageFound {
|
||||||
matchedResources := []api.ResourceName{api.ResourceName(constraint)}
|
matchedResources := []api.ResourceName{api.ResourceName(constraint)}
|
||||||
for _, evaluator := range rq.registry.Evaluators() {
|
for _, evaluator := range rq.registry.List() {
|
||||||
if intersection := evaluator.MatchingResources(matchedResources); len(intersection) > 0 {
|
if intersection := evaluator.MatchingResources(matchedResources); len(intersection) > 0 {
|
||||||
rq.missingUsageQueue.Add(key)
|
rq.missingUsageQueue.Add(key)
|
||||||
return
|
return
|
||||||
|
@ -202,6 +237,10 @@ func (rq *ResourceQuotaController) addQuota(obj interface{}) {
|
||||||
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||||
func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) func() {
|
func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) func() {
|
||||||
workFunc := func() bool {
|
workFunc := func() bool {
|
||||||
|
|
||||||
|
rq.workerLock.RLock()
|
||||||
|
defer rq.workerLock.RUnlock()
|
||||||
|
|
||||||
key, quit := queue.Get()
|
key, quit := queue.Get()
|
||||||
if quit {
|
if quit {
|
||||||
return true
|
return true
|
||||||
|
@ -235,6 +274,8 @@ func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
|
||||||
glog.Infof("Starting resource quota controller")
|
glog.Infof("Starting resource quota controller")
|
||||||
defer glog.Infof("Shutting down resource quota controller")
|
defer glog.Infof("Shutting down resource quota controller")
|
||||||
|
|
||||||
|
go rq.quotaMonitor.Run(stopCh)
|
||||||
|
|
||||||
if !controller.WaitForCacheSync("resource quota", stopCh, rq.informerSyncedFuncs...) {
|
if !controller.WaitForCacheSync("resource quota", stopCh, rq.informerSyncedFuncs...) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -336,11 +377,10 @@ func (rq *ResourceQuotaController) syncResourceQuota(v1ResourceQuota *v1.Resourc
|
||||||
}
|
}
|
||||||
|
|
||||||
// replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
|
// replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
|
||||||
func (rq *ResourceQuotaController) replenishQuota(groupKind schema.GroupKind, namespace string, object runtime.Object) {
|
func (rq *ResourceQuotaController) replenishQuota(groupResource schema.GroupResource, namespace string) {
|
||||||
// check if the quota controller can evaluate this kind, if not, ignore it altogether...
|
// check if the quota controller can evaluate this groupResource, if not, ignore it altogether...
|
||||||
evaluators := rq.registry.Evaluators()
|
evaluator := rq.registry.Get(groupResource)
|
||||||
evaluator, found := evaluators[groupKind]
|
if evaluator == nil {
|
||||||
if !found {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -373,3 +413,66 @@ func (rq *ResourceQuotaController) replenishQuota(groupKind schema.GroupKind, na
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sync periodically resyncs the controller when new resources are observed from discovery.
|
||||||
|
func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, period time.Duration, stopCh <-chan struct{}) {
|
||||||
|
// Something has changed, so track the new state and perform a sync.
|
||||||
|
oldResources := make(map[schema.GroupVersionResource]struct{})
|
||||||
|
wait.Until(func() {
|
||||||
|
// Get the current resource list from discovery.
|
||||||
|
newResources, err := GetQuotableResources(discoveryFunc)
|
||||||
|
if err != nil {
|
||||||
|
utilruntime.HandleError(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decide whether discovery has reported a change.
|
||||||
|
if reflect.DeepEqual(oldResources, newResources) {
|
||||||
|
glog.V(4).Infof("no resource updates from discovery, skipping resource quota sync")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Something has changed, so track the new state and perform a sync.
|
||||||
|
glog.V(2).Infof("syncing resource quota controller with updated resources from discovery: %v", newResources)
|
||||||
|
oldResources = newResources
|
||||||
|
|
||||||
|
// Ensure workers are paused to avoid processing events before informers
|
||||||
|
// have resynced.
|
||||||
|
rq.workerLock.Lock()
|
||||||
|
defer rq.workerLock.Unlock()
|
||||||
|
|
||||||
|
// Perform the monitor resync and wait for controllers to report cache sync.
|
||||||
|
if err := rq.resyncMonitors(newResources); err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !controller.WaitForCacheSync("resource quota", stopCh, rq.quotaMonitor.IsSynced) {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync"))
|
||||||
|
}
|
||||||
|
}, period, stopCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
// resyncMonitors starts or stops quota monitors as needed to ensure that all
|
||||||
|
// (and only) those resources present in the map are monitored.
|
||||||
|
func (rq *ResourceQuotaController) resyncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
|
||||||
|
if err := rq.quotaMonitor.syncMonitors(resources); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
rq.quotaMonitor.startMonitors()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetQuotableResources returns all resources that the quota system should recognize.
|
||||||
|
// It requires a resource supports the following verbs: 'create','list','delete'
|
||||||
|
func GetQuotableResources(discoveryFunc NamespacedResourcesFunc) (map[schema.GroupVersionResource]struct{}, error) {
|
||||||
|
possibleResources, err := discoveryFunc()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to discover resources: %v", err)
|
||||||
|
}
|
||||||
|
quotableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"create", "list", "delete"}}, possibleResources)
|
||||||
|
quotableGroupVersionResources, err := discovery.GroupVersionResources(quotableResources)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Failed to parse resources: %v", err)
|
||||||
|
}
|
||||||
|
return quotableGroupVersionResources, nil
|
||||||
|
}
|
||||||
|
|
|
@ -17,19 +17,23 @@ limitations under the License.
|
||||||
package resourcequota
|
package resourcequota
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
|
"k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
core "k8s.io/client-go/testing"
|
core "k8s.io/client-go/testing"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
|
"k8s.io/kubernetes/pkg/quota"
|
||||||
"k8s.io/kubernetes/pkg/quota/generic"
|
"k8s.io/kubernetes/pkg/quota/generic"
|
||||||
"k8s.io/kubernetes/pkg/quota/install"
|
"k8s.io/kubernetes/pkg/quota/install"
|
||||||
)
|
)
|
||||||
|
@ -52,383 +56,249 @@ func getResourceRequirements(requests, limits v1.ResourceList) v1.ResourceRequir
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func mockDiscoveryFunc() ([]*metav1.APIResourceList, error) {
|
||||||
|
return []*metav1.APIResourceList{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func mockListerForResourceFunc(listersForResource map[schema.GroupVersionResource]cache.GenericLister) quota.ListerForResourceFunc {
|
||||||
|
return func(gvr schema.GroupVersionResource) (cache.GenericLister, error) {
|
||||||
|
lister, found := listersForResource[gvr]
|
||||||
|
if !found {
|
||||||
|
return nil, fmt.Errorf("no lister found for resource")
|
||||||
|
}
|
||||||
|
return lister, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newGenericLister(groupResource schema.GroupResource, items []runtime.Object) cache.GenericLister {
|
||||||
|
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
|
||||||
|
for _, item := range items {
|
||||||
|
store.Add(item)
|
||||||
|
}
|
||||||
|
return cache.NewGenericLister(store, groupResource)
|
||||||
|
}
|
||||||
|
|
||||||
|
type quotaController struct {
|
||||||
|
*ResourceQuotaController
|
||||||
|
stop chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupQuotaController(t *testing.T, kubeClient kubernetes.Interface, lister quota.ListerForResourceFunc) quotaController {
|
||||||
|
informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
|
||||||
|
quotaConfiguration := install.NewQuotaConfigurationForControllers(lister)
|
||||||
|
alwaysStarted := make(chan struct{})
|
||||||
|
close(alwaysStarted)
|
||||||
|
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
|
||||||
|
QuotaClient: kubeClient.Core(),
|
||||||
|
ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(),
|
||||||
|
ResyncPeriod: controller.NoResyncPeriodFunc,
|
||||||
|
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
||||||
|
IgnoredResourcesFunc: quotaConfiguration.IgnoredResources,
|
||||||
|
DiscoveryFunc: mockDiscoveryFunc,
|
||||||
|
Registry: generic.NewRegistry(quotaConfiguration.Evaluators()),
|
||||||
|
InformersStarted: alwaysStarted,
|
||||||
|
}
|
||||||
|
qc, err := NewResourceQuotaController(resourceQuotaControllerOptions)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
stop := make(chan struct{})
|
||||||
|
go informerFactory.Start(stop)
|
||||||
|
return quotaController{qc, stop}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestPods() []runtime.Object {
|
||||||
|
return []runtime.Object{
|
||||||
|
&v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "pod-running", Namespace: "testing"},
|
||||||
|
Status: v1.PodStatus{Phase: v1.PodRunning},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
Volumes: []v1.Volume{{Name: "vol"}},
|
||||||
|
Containers: []v1.Container{{Name: "ctr", Image: "image", Resources: getResourceRequirements(getResourceList("100m", "1Gi"), getResourceList("", ""))}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "pod-running-2", Namespace: "testing"},
|
||||||
|
Status: v1.PodStatus{Phase: v1.PodRunning},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
Volumes: []v1.Volume{{Name: "vol"}},
|
||||||
|
Containers: []v1.Container{{Name: "ctr", Image: "image", Resources: getResourceRequirements(getResourceList("100m", "1Gi"), getResourceList("", ""))}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "pod-failed", Namespace: "testing"},
|
||||||
|
Status: v1.PodStatus{Phase: v1.PodFailed},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
Volumes: []v1.Volume{{Name: "vol"}},
|
||||||
|
Containers: []v1.Container{{Name: "ctr", Image: "image", Resources: getResourceRequirements(getResourceList("100m", "1Gi"), getResourceList("", ""))}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestSyncResourceQuota(t *testing.T) {
|
func TestSyncResourceQuota(t *testing.T) {
|
||||||
podList := v1.PodList{
|
testCases := map[string]struct {
|
||||||
Items: []v1.Pod{
|
gvr schema.GroupVersionResource
|
||||||
{
|
items []runtime.Object
|
||||||
ObjectMeta: metav1.ObjectMeta{Name: "pod-running", Namespace: "testing"},
|
quota v1.ResourceQuota
|
||||||
Status: v1.PodStatus{Phase: v1.PodRunning},
|
status v1.ResourceQuotaStatus
|
||||||
Spec: v1.PodSpec{
|
expectedActionSet sets.String
|
||||||
Volumes: []v1.Volume{{Name: "vol"}},
|
}{
|
||||||
Containers: []v1.Container{{Name: "ctr", Image: "image", Resources: getResourceRequirements(getResourceList("100m", "1Gi"), getResourceList("", ""))}},
|
"pods": {
|
||||||
|
gvr: v1.SchemeGroupVersion.WithResource("pods"),
|
||||||
|
quota: v1.ResourceQuota{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "quota", Namespace: "testing"},
|
||||||
|
Spec: v1.ResourceQuotaSpec{
|
||||||
|
Hard: v1.ResourceList{
|
||||||
|
v1.ResourceCPU: resource.MustParse("3"),
|
||||||
|
v1.ResourceMemory: resource.MustParse("100Gi"),
|
||||||
|
v1.ResourcePods: resource.MustParse("5"),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
status: v1.ResourceQuotaStatus{
|
||||||
ObjectMeta: metav1.ObjectMeta{Name: "pod-running-2", Namespace: "testing"},
|
Hard: v1.ResourceList{
|
||||||
Status: v1.PodStatus{Phase: v1.PodRunning},
|
v1.ResourceCPU: resource.MustParse("3"),
|
||||||
Spec: v1.PodSpec{
|
v1.ResourceMemory: resource.MustParse("100Gi"),
|
||||||
Volumes: []v1.Volume{{Name: "vol"}},
|
v1.ResourcePods: resource.MustParse("5"),
|
||||||
Containers: []v1.Container{{Name: "ctr", Image: "image", Resources: getResourceRequirements(getResourceList("100m", "1Gi"), getResourceList("", ""))}},
|
},
|
||||||
|
Used: v1.ResourceList{
|
||||||
|
v1.ResourceCPU: resource.MustParse("200m"),
|
||||||
|
v1.ResourceMemory: resource.MustParse("2Gi"),
|
||||||
|
v1.ResourcePods: resource.MustParse("2"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
expectedActionSet: sets.NewString(
|
||||||
ObjectMeta: metav1.ObjectMeta{Name: "pod-failed", Namespace: "testing"},
|
strings.Join([]string{"update", "resourcequotas", "status"}, "-"),
|
||||||
Status: v1.PodStatus{Phase: v1.PodFailed},
|
),
|
||||||
Spec: v1.PodSpec{
|
items: newTestPods(),
|
||||||
Volumes: []v1.Volume{{Name: "vol"}},
|
},
|
||||||
Containers: []v1.Container{{Name: "ctr", Image: "image", Resources: getResourceRequirements(getResourceList("100m", "1Gi"), getResourceList("", ""))}},
|
"quota-spec-hard-updated": {
|
||||||
|
gvr: v1.SchemeGroupVersion.WithResource("pods"),
|
||||||
|
quota: v1.ResourceQuota{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Namespace: "default",
|
||||||
|
Name: "rq",
|
||||||
|
},
|
||||||
|
Spec: v1.ResourceQuotaSpec{
|
||||||
|
Hard: v1.ResourceList{
|
||||||
|
v1.ResourceCPU: resource.MustParse("4"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: v1.ResourceQuotaStatus{
|
||||||
|
Hard: v1.ResourceList{
|
||||||
|
v1.ResourceCPU: resource.MustParse("3"),
|
||||||
|
},
|
||||||
|
Used: v1.ResourceList{
|
||||||
|
v1.ResourceCPU: resource.MustParse("0"),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
status: v1.ResourceQuotaStatus{
|
||||||
|
Hard: v1.ResourceList{
|
||||||
|
v1.ResourceCPU: resource.MustParse("4"),
|
||||||
|
},
|
||||||
|
Used: v1.ResourceList{
|
||||||
|
v1.ResourceCPU: resource.MustParse("0"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedActionSet: sets.NewString(
|
||||||
|
strings.Join([]string{"update", "resourcequotas", "status"}, "-"),
|
||||||
|
),
|
||||||
|
items: []runtime.Object{},
|
||||||
},
|
},
|
||||||
}
|
"quota-unchanged": {
|
||||||
resourceQuota := v1.ResourceQuota{
|
gvr: v1.SchemeGroupVersion.WithResource("pods"),
|
||||||
ObjectMeta: metav1.ObjectMeta{Name: "quota", Namespace: "testing"},
|
quota: v1.ResourceQuota{
|
||||||
Spec: v1.ResourceQuotaSpec{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Hard: v1.ResourceList{
|
Namespace: "default",
|
||||||
v1.ResourceCPU: resource.MustParse("3"),
|
Name: "rq",
|
||||||
v1.ResourceMemory: resource.MustParse("100Gi"),
|
},
|
||||||
v1.ResourcePods: resource.MustParse("5"),
|
Spec: v1.ResourceQuotaSpec{
|
||||||
|
Hard: v1.ResourceList{
|
||||||
|
v1.ResourceCPU: resource.MustParse("4"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: v1.ResourceQuotaStatus{
|
||||||
|
Hard: v1.ResourceList{
|
||||||
|
v1.ResourceCPU: resource.MustParse("0"),
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
status: v1.ResourceQuotaStatus{
|
||||||
}
|
Hard: v1.ResourceList{
|
||||||
expectedUsage := v1.ResourceQuota{
|
v1.ResourceCPU: resource.MustParse("4"),
|
||||||
Status: v1.ResourceQuotaStatus{
|
},
|
||||||
Hard: v1.ResourceList{
|
Used: v1.ResourceList{
|
||||||
v1.ResourceCPU: resource.MustParse("3"),
|
v1.ResourceCPU: resource.MustParse("0"),
|
||||||
v1.ResourceMemory: resource.MustParse("100Gi"),
|
},
|
||||||
v1.ResourcePods: resource.MustParse("5"),
|
|
||||||
},
|
|
||||||
Used: v1.ResourceList{
|
|
||||||
v1.ResourceCPU: resource.MustParse("200m"),
|
|
||||||
v1.ResourceMemory: resource.MustParse("2Gi"),
|
|
||||||
v1.ResourcePods: resource.MustParse("2"),
|
|
||||||
},
|
},
|
||||||
|
expectedActionSet: sets.NewString(),
|
||||||
|
items: []runtime.Object{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
kubeClient := fake.NewSimpleClientset(&podList, &resourceQuota)
|
for testName, testCase := range testCases {
|
||||||
informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
|
kubeClient := fake.NewSimpleClientset(&testCase.quota)
|
||||||
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
|
listersForResourceConfig := map[schema.GroupVersionResource]cache.GenericLister{
|
||||||
QuotaClient: kubeClient.Core(),
|
testCase.gvr: newGenericLister(testCase.gvr.GroupResource(), testCase.items),
|
||||||
ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(),
|
|
||||||
ResyncPeriod: controller.NoResyncPeriodFunc,
|
|
||||||
Registry: install.NewRegistry(kubeClient, nil),
|
|
||||||
GroupKindsToReplenish: []schema.GroupKind{
|
|
||||||
api.Kind("Pod"),
|
|
||||||
api.Kind("Service"),
|
|
||||||
api.Kind("ReplicationController"),
|
|
||||||
api.Kind("PersistentVolumeClaim"),
|
|
||||||
},
|
|
||||||
ControllerFactory: NewReplenishmentControllerFactory(informerFactory),
|
|
||||||
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
|
||||||
}
|
|
||||||
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
|
|
||||||
err := quotaController.syncResourceQuota(&resourceQuota)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected error %v", err)
|
|
||||||
}
|
|
||||||
expectedActionSet := sets.NewString(
|
|
||||||
strings.Join([]string{"list", "pods", ""}, "-"),
|
|
||||||
strings.Join([]string{"update", "resourcequotas", "status"}, "-"),
|
|
||||||
)
|
|
||||||
actionSet := sets.NewString()
|
|
||||||
for _, action := range kubeClient.Actions() {
|
|
||||||
actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource().Resource, action.GetSubresource()}, "-"))
|
|
||||||
}
|
|
||||||
if !actionSet.HasAll(expectedActionSet.List()...) {
|
|
||||||
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
|
|
||||||
}
|
|
||||||
|
|
||||||
lastActionIndex := len(kubeClient.Actions()) - 1
|
|
||||||
usage := kubeClient.Actions()[lastActionIndex].(core.UpdateAction).GetObject().(*v1.ResourceQuota)
|
|
||||||
|
|
||||||
// ensure hard and used limits are what we expected
|
|
||||||
for k, v := range expectedUsage.Status.Hard {
|
|
||||||
actual := usage.Status.Hard[k]
|
|
||||||
actualValue := actual.String()
|
|
||||||
expectedValue := v.String()
|
|
||||||
if expectedValue != actualValue {
|
|
||||||
t.Errorf("Usage Hard: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue)
|
|
||||||
}
|
}
|
||||||
}
|
qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig))
|
||||||
for k, v := range expectedUsage.Status.Used {
|
defer close(qc.stop)
|
||||||
actual := usage.Status.Used[k]
|
|
||||||
actualValue := actual.String()
|
if err := qc.syncResourceQuota(&testCase.quota); err != nil {
|
||||||
expectedValue := v.String()
|
t.Fatalf("test: %s, unexpected error: %v", testName, err)
|
||||||
if expectedValue != actualValue {
|
|
||||||
t.Errorf("Usage Used: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue)
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSyncResourceQuotaSpecChange(t *testing.T) {
|
actionSet := sets.NewString()
|
||||||
resourceQuota := v1.ResourceQuota{
|
for _, action := range kubeClient.Actions() {
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource().Resource, action.GetSubresource()}, "-"))
|
||||||
Namespace: "default",
|
|
||||||
Name: "rq",
|
|
||||||
},
|
|
||||||
Spec: v1.ResourceQuotaSpec{
|
|
||||||
Hard: v1.ResourceList{
|
|
||||||
v1.ResourceCPU: resource.MustParse("4"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Status: v1.ResourceQuotaStatus{
|
|
||||||
Hard: v1.ResourceList{
|
|
||||||
v1.ResourceCPU: resource.MustParse("3"),
|
|
||||||
},
|
|
||||||
Used: v1.ResourceList{
|
|
||||||
v1.ResourceCPU: resource.MustParse("0"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
expectedUsage := v1.ResourceQuota{
|
|
||||||
Status: v1.ResourceQuotaStatus{
|
|
||||||
Hard: v1.ResourceList{
|
|
||||||
v1.ResourceCPU: resource.MustParse("4"),
|
|
||||||
},
|
|
||||||
Used: v1.ResourceList{
|
|
||||||
v1.ResourceCPU: resource.MustParse("0"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
kubeClient := fake.NewSimpleClientset(&resourceQuota)
|
|
||||||
informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
|
|
||||||
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
|
|
||||||
QuotaClient: kubeClient.Core(),
|
|
||||||
ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(),
|
|
||||||
ResyncPeriod: controller.NoResyncPeriodFunc,
|
|
||||||
Registry: install.NewRegistry(kubeClient, nil),
|
|
||||||
GroupKindsToReplenish: []schema.GroupKind{
|
|
||||||
api.Kind("Pod"),
|
|
||||||
api.Kind("Service"),
|
|
||||||
api.Kind("ReplicationController"),
|
|
||||||
api.Kind("PersistentVolumeClaim"),
|
|
||||||
},
|
|
||||||
ControllerFactory: NewReplenishmentControllerFactory(informerFactory),
|
|
||||||
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
|
||||||
}
|
|
||||||
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
|
|
||||||
err := quotaController.syncResourceQuota(&resourceQuota)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected error %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
expectedActionSet := sets.NewString(
|
|
||||||
strings.Join([]string{"list", "pods", ""}, "-"),
|
|
||||||
strings.Join([]string{"update", "resourcequotas", "status"}, "-"),
|
|
||||||
)
|
|
||||||
actionSet := sets.NewString()
|
|
||||||
for _, action := range kubeClient.Actions() {
|
|
||||||
actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource().Resource, action.GetSubresource()}, "-"))
|
|
||||||
}
|
|
||||||
if !actionSet.HasAll(expectedActionSet.List()...) {
|
|
||||||
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
|
|
||||||
}
|
|
||||||
|
|
||||||
lastActionIndex := len(kubeClient.Actions()) - 1
|
|
||||||
usage := kubeClient.Actions()[lastActionIndex].(core.UpdateAction).GetObject().(*v1.ResourceQuota)
|
|
||||||
|
|
||||||
// ensure hard and used limits are what we expected
|
|
||||||
for k, v := range expectedUsage.Status.Hard {
|
|
||||||
actual := usage.Status.Hard[k]
|
|
||||||
actualValue := actual.String()
|
|
||||||
expectedValue := v.String()
|
|
||||||
if expectedValue != actualValue {
|
|
||||||
t.Errorf("Usage Hard: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue)
|
|
||||||
}
|
}
|
||||||
}
|
if !actionSet.HasAll(testCase.expectedActionSet.List()...) {
|
||||||
for k, v := range expectedUsage.Status.Used {
|
t.Errorf("test: %s,\nExpected actions:\n%v\n but got:\n%v\nDifference:\n%v", testName, testCase.expectedActionSet, actionSet, testCase.expectedActionSet.Difference(actionSet))
|
||||||
actual := usage.Status.Used[k]
|
|
||||||
actualValue := actual.String()
|
|
||||||
expectedValue := v.String()
|
|
||||||
if expectedValue != actualValue {
|
|
||||||
t.Errorf("Usage Used: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue)
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
}
|
lastActionIndex := len(kubeClient.Actions()) - 1
|
||||||
func TestSyncResourceQuotaSpecHardChange(t *testing.T) {
|
usage := kubeClient.Actions()[lastActionIndex].(core.UpdateAction).GetObject().(*v1.ResourceQuota)
|
||||||
resourceQuota := v1.ResourceQuota{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Namespace: "default",
|
|
||||||
Name: "rq",
|
|
||||||
},
|
|
||||||
Spec: v1.ResourceQuotaSpec{
|
|
||||||
Hard: v1.ResourceList{
|
|
||||||
v1.ResourceCPU: resource.MustParse("4"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Status: v1.ResourceQuotaStatus{
|
|
||||||
Hard: v1.ResourceList{
|
|
||||||
v1.ResourceCPU: resource.MustParse("3"),
|
|
||||||
v1.ResourceMemory: resource.MustParse("1Gi"),
|
|
||||||
},
|
|
||||||
Used: v1.ResourceList{
|
|
||||||
v1.ResourceCPU: resource.MustParse("0"),
|
|
||||||
v1.ResourceMemory: resource.MustParse("0"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
expectedUsage := v1.ResourceQuota{
|
// ensure usage is as expected
|
||||||
Status: v1.ResourceQuotaStatus{
|
if len(usage.Status.Hard) != len(testCase.status.Hard) {
|
||||||
Hard: v1.ResourceList{
|
t.Errorf("test: %s, status hard lengths do not match", testName)
|
||||||
v1.ResourceCPU: resource.MustParse("4"),
|
|
||||||
},
|
|
||||||
Used: v1.ResourceList{
|
|
||||||
v1.ResourceCPU: resource.MustParse("0"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
kubeClient := fake.NewSimpleClientset(&resourceQuota)
|
|
||||||
informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
|
|
||||||
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
|
|
||||||
QuotaClient: kubeClient.Core(),
|
|
||||||
ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(),
|
|
||||||
ResyncPeriod: controller.NoResyncPeriodFunc,
|
|
||||||
Registry: install.NewRegistry(kubeClient, nil),
|
|
||||||
GroupKindsToReplenish: []schema.GroupKind{
|
|
||||||
api.Kind("Pod"),
|
|
||||||
api.Kind("Service"),
|
|
||||||
api.Kind("ReplicationController"),
|
|
||||||
api.Kind("PersistentVolumeClaim"),
|
|
||||||
},
|
|
||||||
ControllerFactory: NewReplenishmentControllerFactory(informerFactory),
|
|
||||||
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
|
||||||
}
|
|
||||||
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
|
|
||||||
err := quotaController.syncResourceQuota(&resourceQuota)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected error %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
expectedActionSet := sets.NewString(
|
|
||||||
strings.Join([]string{"list", "pods", ""}, "-"),
|
|
||||||
strings.Join([]string{"update", "resourcequotas", "status"}, "-"),
|
|
||||||
)
|
|
||||||
actionSet := sets.NewString()
|
|
||||||
for _, action := range kubeClient.Actions() {
|
|
||||||
actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource().Resource, action.GetSubresource()}, "-"))
|
|
||||||
}
|
|
||||||
if !actionSet.HasAll(expectedActionSet.List()...) {
|
|
||||||
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
|
|
||||||
}
|
|
||||||
|
|
||||||
lastActionIndex := len(kubeClient.Actions()) - 1
|
|
||||||
usage := kubeClient.Actions()[lastActionIndex].(core.UpdateAction).GetObject().(*v1.ResourceQuota)
|
|
||||||
|
|
||||||
// ensure hard and used limits are what we expected
|
|
||||||
for k, v := range expectedUsage.Status.Hard {
|
|
||||||
actual := usage.Status.Hard[k]
|
|
||||||
actualValue := actual.String()
|
|
||||||
expectedValue := v.String()
|
|
||||||
if expectedValue != actualValue {
|
|
||||||
t.Errorf("Usage Hard: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue)
|
|
||||||
}
|
}
|
||||||
}
|
if len(usage.Status.Used) != len(testCase.status.Used) {
|
||||||
for k, v := range expectedUsage.Status.Used {
|
t.Errorf("test: %s, status used lengths do not match", testName)
|
||||||
actual := usage.Status.Used[k]
|
|
||||||
actualValue := actual.String()
|
|
||||||
expectedValue := v.String()
|
|
||||||
if expectedValue != actualValue {
|
|
||||||
t.Errorf("Usage Used: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue)
|
|
||||||
}
|
}
|
||||||
}
|
for k, v := range testCase.status.Hard {
|
||||||
|
actual := usage.Status.Hard[k]
|
||||||
// ensure usage hard and used are are synced with spec hard, not have dirty resource
|
actualValue := actual.String()
|
||||||
for k, v := range usage.Status.Hard {
|
expectedValue := v.String()
|
||||||
if k == v1.ResourceMemory {
|
if expectedValue != actualValue {
|
||||||
t.Errorf("Unexpected Usage Hard: Key: %v, Value: %v", k, v.String())
|
t.Errorf("test: %s, Usage Hard: Key: %v, Expected: %v, Actual: %v", testName, k, expectedValue, actualValue)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
for k, v := range testCase.status.Used {
|
||||||
|
actual := usage.Status.Used[k]
|
||||||
for k, v := range usage.Status.Used {
|
actualValue := actual.String()
|
||||||
if k == v1.ResourceMemory {
|
expectedValue := v.String()
|
||||||
t.Errorf("Unexpected Usage Used: Key: %v, Value: %v", k, v.String())
|
if expectedValue != actualValue {
|
||||||
|
t.Errorf("test: %s, Usage Used: Key: %v, Expected: %v, Actual: %v", testName, k, expectedValue, actualValue)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSyncResourceQuotaNoChange(t *testing.T) {
|
|
||||||
resourceQuota := v1.ResourceQuota{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Namespace: "default",
|
|
||||||
Name: "rq",
|
|
||||||
},
|
|
||||||
Spec: v1.ResourceQuotaSpec{
|
|
||||||
Hard: v1.ResourceList{
|
|
||||||
v1.ResourceCPU: resource.MustParse("4"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Status: v1.ResourceQuotaStatus{
|
|
||||||
Hard: v1.ResourceList{
|
|
||||||
v1.ResourceCPU: resource.MustParse("4"),
|
|
||||||
},
|
|
||||||
Used: v1.ResourceList{
|
|
||||||
v1.ResourceCPU: resource.MustParse("0"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
kubeClient := fake.NewSimpleClientset(&v1.PodList{}, &resourceQuota)
|
|
||||||
informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
|
|
||||||
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
|
|
||||||
QuotaClient: kubeClient.Core(),
|
|
||||||
ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(),
|
|
||||||
ResyncPeriod: controller.NoResyncPeriodFunc,
|
|
||||||
Registry: install.NewRegistry(kubeClient, nil),
|
|
||||||
GroupKindsToReplenish: []schema.GroupKind{
|
|
||||||
api.Kind("Pod"),
|
|
||||||
api.Kind("Service"),
|
|
||||||
api.Kind("ReplicationController"),
|
|
||||||
api.Kind("PersistentVolumeClaim"),
|
|
||||||
},
|
|
||||||
ControllerFactory: NewReplenishmentControllerFactory(informerFactory),
|
|
||||||
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
|
||||||
}
|
|
||||||
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
|
|
||||||
err := quotaController.syncResourceQuota(&resourceQuota)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected error %v", err)
|
|
||||||
}
|
|
||||||
expectedActionSet := sets.NewString(
|
|
||||||
strings.Join([]string{"list", "pods", ""}, "-"),
|
|
||||||
)
|
|
||||||
actionSet := sets.NewString()
|
|
||||||
for _, action := range kubeClient.Actions() {
|
|
||||||
actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource().Resource, action.GetSubresource()}, "-"))
|
|
||||||
}
|
|
||||||
if !actionSet.HasAll(expectedActionSet.List()...) {
|
|
||||||
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAddQuota(t *testing.T) {
|
func TestAddQuota(t *testing.T) {
|
||||||
kubeClient := fake.NewSimpleClientset()
|
kubeClient := fake.NewSimpleClientset()
|
||||||
informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
|
gvr := v1.SchemeGroupVersion.WithResource("pods")
|
||||||
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
|
listersForResourceConfig := map[schema.GroupVersionResource]cache.GenericLister{
|
||||||
QuotaClient: kubeClient.Core(),
|
gvr: newGenericLister(gvr.GroupResource(), newTestPods()),
|
||||||
ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(),
|
|
||||||
ResyncPeriod: controller.NoResyncPeriodFunc,
|
|
||||||
Registry: install.NewRegistry(kubeClient, nil),
|
|
||||||
GroupKindsToReplenish: []schema.GroupKind{
|
|
||||||
api.Kind("Pod"),
|
|
||||||
api.Kind("ReplicationController"),
|
|
||||||
api.Kind("PersistentVolumeClaim"),
|
|
||||||
},
|
|
||||||
ControllerFactory: NewReplenishmentControllerFactory(informerFactory),
|
|
||||||
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
|
||||||
}
|
}
|
||||||
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
|
|
||||||
|
|
||||||
delete(quotaController.registry.(*generic.GenericRegistry).InternalEvaluators, api.Kind("Service"))
|
qc := setupQuotaController(t, kubeClient, mockListerForResourceFunc(listersForResourceConfig))
|
||||||
|
defer close(qc.stop)
|
||||||
|
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
name string
|
name string
|
||||||
|
|
||||||
quota *v1.ResourceQuota
|
quota *v1.ResourceQuota
|
||||||
expectedPriority bool
|
expectedPriority bool
|
||||||
}{
|
}{
|
||||||
|
@ -491,7 +361,7 @@ func TestAddQuota(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "status, missing usage, but don't care",
|
name: "status, missing usage, but don't care (no informer)",
|
||||||
expectedPriority: false,
|
expectedPriority: false,
|
||||||
quota: &v1.ResourceQuota{
|
quota: &v1.ResourceQuota{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
@ -500,12 +370,12 @@ func TestAddQuota(t *testing.T) {
|
||||||
},
|
},
|
||||||
Spec: v1.ResourceQuotaSpec{
|
Spec: v1.ResourceQuotaSpec{
|
||||||
Hard: v1.ResourceList{
|
Hard: v1.ResourceList{
|
||||||
v1.ResourceServices: resource.MustParse("4"),
|
"count/foobars.example.com": resource.MustParse("4"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Status: v1.ResourceQuotaStatus{
|
Status: v1.ResourceQuotaStatus{
|
||||||
Hard: v1.ResourceList{
|
Hard: v1.ResourceList{
|
||||||
v1.ResourceServices: resource.MustParse("4"),
|
"count/foobars.example.com": resource.MustParse("4"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -536,30 +406,29 @@ func TestAddQuota(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
quotaController.addQuota(tc.quota)
|
qc.addQuota(tc.quota)
|
||||||
if tc.expectedPriority {
|
if tc.expectedPriority {
|
||||||
if e, a := 1, quotaController.missingUsageQueue.Len(); e != a {
|
if e, a := 1, qc.missingUsageQueue.Len(); e != a {
|
||||||
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
|
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
|
||||||
}
|
}
|
||||||
if e, a := 0, quotaController.queue.Len(); e != a {
|
if e, a := 0, qc.queue.Len(); e != a {
|
||||||
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
|
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if e, a := 0, quotaController.missingUsageQueue.Len(); e != a {
|
if e, a := 0, qc.missingUsageQueue.Len(); e != a {
|
||||||
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
|
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
|
||||||
}
|
}
|
||||||
if e, a := 1, quotaController.queue.Len(); e != a {
|
if e, a := 1, qc.queue.Len(); e != a {
|
||||||
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
|
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for qc.missingUsageQueue.Len() > 0 {
|
||||||
for quotaController.missingUsageQueue.Len() > 0 {
|
key, _ := qc.missingUsageQueue.Get()
|
||||||
key, _ := quotaController.missingUsageQueue.Get()
|
qc.missingUsageQueue.Done(key)
|
||||||
quotaController.missingUsageQueue.Done(key)
|
|
||||||
}
|
}
|
||||||
for quotaController.queue.Len() > 0 {
|
for qc.queue.Len() > 0 {
|
||||||
key, _ := quotaController.queue.Get()
|
key, _ := qc.queue.Get()
|
||||||
quotaController.queue.Done(key)
|
qc.queue.Done(key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,341 @@
|
||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package resourcequota
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
|
||||||
|
"k8s.io/api/core/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
|
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/client-go/tools/cache"
|
||||||
|
"k8s.io/client-go/util/workqueue"
|
||||||
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
|
"k8s.io/kubernetes/pkg/quota"
|
||||||
|
"k8s.io/kubernetes/pkg/quota/evaluator/core"
|
||||||
|
"k8s.io/kubernetes/pkg/quota/generic"
|
||||||
|
)
|
||||||
|
|
||||||
|
type eventType int
|
||||||
|
|
||||||
|
func (e eventType) String() string {
|
||||||
|
switch e {
|
||||||
|
case addEvent:
|
||||||
|
return "add"
|
||||||
|
case updateEvent:
|
||||||
|
return "update"
|
||||||
|
case deleteEvent:
|
||||||
|
return "delete"
|
||||||
|
default:
|
||||||
|
return fmt.Sprintf("unknown(%d)", int(e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
addEvent eventType = iota
|
||||||
|
updateEvent
|
||||||
|
deleteEvent
|
||||||
|
)
|
||||||
|
|
||||||
|
type event struct {
|
||||||
|
eventType eventType
|
||||||
|
obj interface{}
|
||||||
|
oldObj interface{}
|
||||||
|
gvr schema.GroupVersionResource
|
||||||
|
}
|
||||||
|
|
||||||
|
type QuotaMonitor struct {
|
||||||
|
// each monitor list/watches a resource and determines if we should replenish quota
|
||||||
|
monitors monitors
|
||||||
|
monitorLock sync.Mutex
|
||||||
|
// informersStarted is closed after after all of the controllers have been initialized and are running.
|
||||||
|
// After that it is safe to start them here, before that it is not.
|
||||||
|
informersStarted <-chan struct{}
|
||||||
|
|
||||||
|
// stopCh drives shutdown. If it is nil, it indicates that Run() has not been
|
||||||
|
// called yet. If it is non-nil, then when closed it indicates everything
|
||||||
|
// should shut down.
|
||||||
|
//
|
||||||
|
// This channel is also protected by monitorLock.
|
||||||
|
stopCh <-chan struct{}
|
||||||
|
|
||||||
|
// monitors are the producer of the resourceChanges queue
|
||||||
|
resourceChanges workqueue.RateLimitingInterface
|
||||||
|
|
||||||
|
// interfaces with informers
|
||||||
|
informerFactory InformerFactory
|
||||||
|
|
||||||
|
// list of resources to ignore
|
||||||
|
ignoredResources map[schema.GroupResource]struct{}
|
||||||
|
|
||||||
|
// The period that should be used to re-sync the monitored resource
|
||||||
|
resyncPeriod controller.ResyncPeriodFunc
|
||||||
|
|
||||||
|
// callback to alert that a change may require quota recalculation
|
||||||
|
replenishmentFunc ReplenishmentFunc
|
||||||
|
|
||||||
|
// maintains list of evaluators
|
||||||
|
registry quota.Registry
|
||||||
|
}
|
||||||
|
|
||||||
|
// monitor runs a Controller with a local stop channel.
|
||||||
|
type monitor struct {
|
||||||
|
controller cache.Controller
|
||||||
|
|
||||||
|
// stopCh stops Controller. If stopCh is nil, the monitor is considered to be
|
||||||
|
// not yet started.
|
||||||
|
stopCh chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run is intended to be called in a goroutine. Multiple calls of this is an
|
||||||
|
// error.
|
||||||
|
func (m *monitor) Run() {
|
||||||
|
m.controller.Run(m.stopCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
type monitors map[schema.GroupVersionResource]*monitor
|
||||||
|
|
||||||
|
func (qm *QuotaMonitor) controllerFor(resource schema.GroupVersionResource) (cache.Controller, error) {
|
||||||
|
// TODO: pass this down
|
||||||
|
clock := clock.RealClock{}
|
||||||
|
handlers := cache.ResourceEventHandlerFuncs{
|
||||||
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||||
|
// TODO: leaky abstraction! live w/ it for now, but should pass down an update filter func.
|
||||||
|
// we only want to queue the updates we care about though as too much noise will overwhelm queue.
|
||||||
|
notifyUpdate := false
|
||||||
|
switch resource.GroupResource() {
|
||||||
|
case schema.GroupResource{Resource: "pods"}:
|
||||||
|
oldPod := oldObj.(*v1.Pod)
|
||||||
|
newPod := newObj.(*v1.Pod)
|
||||||
|
notifyUpdate = core.QuotaV1Pod(oldPod, clock) && !core.QuotaV1Pod(newPod, clock)
|
||||||
|
case schema.GroupResource{Resource: "services"}:
|
||||||
|
oldService := oldObj.(*v1.Service)
|
||||||
|
newService := newObj.(*v1.Service)
|
||||||
|
notifyUpdate = core.GetQuotaServiceType(oldService) != core.GetQuotaServiceType(newService)
|
||||||
|
}
|
||||||
|
if notifyUpdate {
|
||||||
|
event := &event{
|
||||||
|
eventType: updateEvent,
|
||||||
|
obj: newObj,
|
||||||
|
oldObj: oldObj,
|
||||||
|
gvr: resource,
|
||||||
|
}
|
||||||
|
qm.resourceChanges.Add(event)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
DeleteFunc: func(obj interface{}) {
|
||||||
|
// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
|
||||||
|
if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
|
||||||
|
obj = deletedFinalStateUnknown.Obj
|
||||||
|
}
|
||||||
|
event := &event{
|
||||||
|
eventType: deleteEvent,
|
||||||
|
obj: obj,
|
||||||
|
gvr: resource,
|
||||||
|
}
|
||||||
|
qm.resourceChanges.Add(event)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
shared, err := qm.informerFactory.ForResource(resource)
|
||||||
|
if err == nil {
|
||||||
|
glog.V(4).Infof("QuotaMonitor using a shared informer for resource %q", resource.String())
|
||||||
|
shared.Informer().AddEventHandlerWithResyncPeriod(handlers, qm.resyncPeriod())
|
||||||
|
return shared.Informer().GetController(), nil
|
||||||
|
}
|
||||||
|
glog.V(4).Infof("QuotaMonitor unable to use a shared informer for resource %q: %v", resource.String(), err)
|
||||||
|
|
||||||
|
// TODO: if we can share storage with garbage collector, it may make sense to support other resources
|
||||||
|
// until that time, aggregated api servers will have to run their own controller to reconcile their own quota.
|
||||||
|
return nil, fmt.Errorf("unable to monitor quota for resource %q", resource.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// syncMonitors rebuilds the monitor set according to the supplied resources,
|
||||||
|
// creating or deleting monitors as necessary. It will return any error
|
||||||
|
// encountered, but will make an attempt to create a monitor for each resource
|
||||||
|
// instead of immediately exiting on an error. It may be called before or after
|
||||||
|
// Run. Monitors are NOT started as part of the sync. To ensure all existing
|
||||||
|
// monitors are started, call startMonitors.
|
||||||
|
func (qm *QuotaMonitor) syncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
|
||||||
|
qm.monitorLock.Lock()
|
||||||
|
defer qm.monitorLock.Unlock()
|
||||||
|
|
||||||
|
toRemove := qm.monitors
|
||||||
|
if toRemove == nil {
|
||||||
|
toRemove = monitors{}
|
||||||
|
}
|
||||||
|
current := monitors{}
|
||||||
|
errs := []error{}
|
||||||
|
kept := 0
|
||||||
|
added := 0
|
||||||
|
for resource := range resources {
|
||||||
|
if _, ok := qm.ignoredResources[resource.GroupResource()]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if m, ok := toRemove[resource]; ok {
|
||||||
|
current[resource] = m
|
||||||
|
delete(toRemove, resource)
|
||||||
|
kept++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
c, err := qm.controllerFor(resource)
|
||||||
|
if err != nil {
|
||||||
|
errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if we need to create an evaluator for this resource (if none previously registered)
|
||||||
|
evaluator := qm.registry.Get(resource.GroupResource())
|
||||||
|
if evaluator == nil {
|
||||||
|
listerFunc := generic.ListerFuncForResourceFunc(qm.informerFactory.ForResource)
|
||||||
|
listResourceFunc := generic.ListResourceUsingListerFunc(listerFunc, resource)
|
||||||
|
evaluator = generic.NewObjectCountEvaluator(false, resource.GroupResource(), listResourceFunc, "")
|
||||||
|
qm.registry.Add(evaluator)
|
||||||
|
glog.Infof("QuotaMonitor created object count evaluator for %s", resource.GroupResource())
|
||||||
|
}
|
||||||
|
|
||||||
|
// track the monitor
|
||||||
|
current[resource] = &monitor{controller: c}
|
||||||
|
added++
|
||||||
|
}
|
||||||
|
qm.monitors = current
|
||||||
|
|
||||||
|
for _, monitor := range toRemove {
|
||||||
|
if monitor.stopCh != nil {
|
||||||
|
close(monitor.stopCh)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(4).Infof("quota synced monitors; added %d, kept %d, removed %d", added, kept, len(toRemove))
|
||||||
|
// NewAggregate returns nil if errs is 0-length
|
||||||
|
return utilerrors.NewAggregate(errs)
|
||||||
|
}
|
||||||
|
|
||||||
|
// startMonitors ensures the current set of monitors are running. Any newly
|
||||||
|
// started monitors will also cause shared informers to be started.
|
||||||
|
//
|
||||||
|
// If called before Run, startMonitors does nothing (as there is no stop channel
|
||||||
|
// to support monitor/informer execution).
|
||||||
|
func (qm *QuotaMonitor) startMonitors() {
|
||||||
|
qm.monitorLock.Lock()
|
||||||
|
defer qm.monitorLock.Unlock()
|
||||||
|
|
||||||
|
if qm.stopCh == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// we're waiting until after the informer start that happens once all the controllers are initialized. This ensures
|
||||||
|
// that they don't get unexpected events on their work queues.
|
||||||
|
<-qm.informersStarted
|
||||||
|
|
||||||
|
monitors := qm.monitors
|
||||||
|
started := 0
|
||||||
|
for _, monitor := range monitors {
|
||||||
|
if monitor.stopCh == nil {
|
||||||
|
monitor.stopCh = make(chan struct{})
|
||||||
|
qm.informerFactory.Start(qm.stopCh)
|
||||||
|
go monitor.Run()
|
||||||
|
started++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
glog.V(4).Infof("QuotaMonitor started %d new monitors, %d currently running", started, len(monitors))
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsSynced returns true if any monitors exist AND all those monitors'
|
||||||
|
// controllers HasSynced functions return true. This means IsSynced could return
|
||||||
|
// true at one time, and then later return false if all monitors were
|
||||||
|
// reconstructed.
|
||||||
|
func (qm *QuotaMonitor) IsSynced() bool {
|
||||||
|
qm.monitorLock.Lock()
|
||||||
|
defer qm.monitorLock.Unlock()
|
||||||
|
|
||||||
|
if len(qm.monitors) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, monitor := range qm.monitors {
|
||||||
|
if !monitor.controller.HasSynced() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run sets the stop channel and starts monitor execution until stopCh is
|
||||||
|
// closed. Any running monitors will be stopped before Run returns.
|
||||||
|
func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) {
|
||||||
|
glog.Infof("QuotaMonitor running")
|
||||||
|
defer glog.Infof("QuotaMonitor stopping")
|
||||||
|
|
||||||
|
// Set up the stop channel.
|
||||||
|
qm.monitorLock.Lock()
|
||||||
|
qm.stopCh = stopCh
|
||||||
|
qm.monitorLock.Unlock()
|
||||||
|
|
||||||
|
// Start monitors and begin change processing until the stop channel is
|
||||||
|
// closed.
|
||||||
|
qm.startMonitors()
|
||||||
|
wait.Until(qm.runProcessResourceChanges, 1*time.Second, stopCh)
|
||||||
|
|
||||||
|
// Stop any running monitors.
|
||||||
|
qm.monitorLock.Lock()
|
||||||
|
defer qm.monitorLock.Unlock()
|
||||||
|
monitors := qm.monitors
|
||||||
|
stopped := 0
|
||||||
|
for _, monitor := range monitors {
|
||||||
|
if monitor.stopCh != nil {
|
||||||
|
stopped++
|
||||||
|
close(monitor.stopCh)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
glog.Infof("QuotaMonitor stopped %d of %d monitors", stopped, len(monitors))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (qm *QuotaMonitor) runProcessResourceChanges() {
|
||||||
|
for qm.processResourceChanges() {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dequeueing an event from resourceChanges to process
|
||||||
|
func (qm *QuotaMonitor) processResourceChanges() bool {
|
||||||
|
item, quit := qm.resourceChanges.Get()
|
||||||
|
if quit {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
defer qm.resourceChanges.Done(item)
|
||||||
|
event, ok := item.(*event)
|
||||||
|
if !ok {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", item))
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
obj := event.obj
|
||||||
|
accessor, err := meta.Accessor(obj)
|
||||||
|
if err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
glog.V(4).Infof("QuotaMonitor process object: %s, namespace %s, name %s, uid %s, event type %v", event.gvr.String(), accessor.GetNamespace(), accessor.GetName(), string(accessor.GetUID()), event.eventType)
|
||||||
|
qm.replenishmentFunc(event.gvr.GroupResource(), accessor.GetNamespace())
|
||||||
|
return true
|
||||||
|
}
|
Loading…
Reference in New Issue