externalize quota admission controller

pull/58/head
yue9944882 2018-08-27 21:46:11 +08:00
parent 444373b404
commit b86e8f7631
5 changed files with 594 additions and 589 deletions

View File

@@ -17,19 +17,16 @@ go_library(
],
importpath = "k8s.io/kubernetes/plugin/pkg/admission/resourcequota",
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
"//pkg/client/listers/core/internalversion:go_default_library",
"//pkg/kubeapiserver/admission:go_default_library",
"//pkg/quota:go_default_library",
"//pkg/quota/generic:go_default_library",
"//pkg/quota/v1:go_default_library",
"//pkg/quota/v1/generic:go_default_library",
"//pkg/util/reflector/prometheus:go_default_library",
"//pkg/util/workqueue/prometheus:go_default_library",
"//plugin/pkg/admission/resourcequota/apis/resourcequota:go_default_library",
"//plugin/pkg/admission/resourcequota/apis/resourcequota/install:go_default_library",
"//plugin/pkg/admission/resourcequota/apis/resourcequota/v1beta1:go_default_library",
"//plugin/pkg/admission/resourcequota/apis/resourcequota/validation:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@@ -41,7 +38,11 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission/initializer:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/hashicorp/golang-lru:go_default_library",
@@ -54,17 +55,18 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/quota/generic:go_default_library",
"//pkg/quota/install:go_default_library",
"//pkg/quota/v1/generic:go_default_library",
"//pkg/quota/v1/install:go_default_library",
"//plugin/pkg/admission/resourcequota/apis/resourcequota:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/github.com/hashicorp/golang-lru:go_default_library",

View File

@@ -21,13 +21,14 @@ import (
"io"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apiserver/pkg/admission"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
"k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/quota/generic"
quota "k8s.io/kubernetes/pkg/quota/v1"
"k8s.io/kubernetes/pkg/quota/v1/generic"
resourcequotaapi "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota"
"k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota/validation"
)
@@ -65,12 +66,13 @@ type QuotaAdmission struct {
}
var _ admission.ValidationInterface = &QuotaAdmission{}
var _ = kubeapiserveradmission.WantsInternalKubeClientSet(&QuotaAdmission{})
var _ = genericadmissioninitializer.WantsExternalKubeInformerFactory(&QuotaAdmission{})
var _ = genericadmissioninitializer.WantsExternalKubeClientSet(&QuotaAdmission{})
var _ = kubeapiserveradmission.WantsQuotaConfiguration(&QuotaAdmission{})
type liveLookupEntry struct {
expiry time.Time
items []*api.ResourceQuota
items []*corev1.ResourceQuota
}
// NewResourceQuota configures an admission controller that can enforce quota constraints
@@ -91,12 +93,12 @@ func NewResourceQuota(config *resourcequotaapi.Configuration, numEvaluators int,
}, nil
}
func (a *QuotaAdmission) SetInternalKubeClientSet(client internalclientset.Interface) {
func (a *QuotaAdmission) SetExternalKubeClientSet(client kubernetes.Interface) {
a.quotaAccessor.client = client
}
func (a *QuotaAdmission) SetInternalKubeInformerFactory(f informers.SharedInformerFactory) {
a.quotaAccessor.lister = f.Core().InternalVersion().ResourceQuotas().Lister()
func (a *QuotaAdmission) SetExternalKubeInformerFactory(f informers.SharedInformerFactory) {
a.quotaAccessor.lister = f.Core().V1().ResourceQuotas().Lister()
}
func (a *QuotaAdmission) SetQuotaConfiguration(c quota.Configuration) {
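These setters replace the internal variants removed above; the generic admission initializer discovers them by type assertion during server start-up. A minimal wiring sketch (the initializePlugin helper is hypothetical; the Wants* interfaces and Set* methods are the real ones asserted earlier in the file):

package resourcequota

import (
	"k8s.io/apiserver/pkg/admission"
	genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
)

// initializePlugin sketches what the generic initializer does: type-assert
// the plugin against each Wants* interface and inject the external
// (client-go) dependency it asks for.
func initializePlugin(plugin admission.Interface, client kubernetes.Interface, f informers.SharedInformerFactory) {
	if wants, ok := plugin.(genericadmissioninitializer.WantsExternalKubeClientSet); ok {
		wants.SetExternalKubeClientSet(client)
	}
	if wants, ok := plugin.(genericadmissioninitializer.WantsExternalKubeInformerFactory); ok {
		wants.SetExternalKubeInformerFactory(f)
	}
}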

File diff suppressed because it is too large

View File

@@ -25,6 +25,7 @@ import (
"github.com/golang/glog"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
@@ -34,9 +35,8 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
"k8s.io/client-go/util/workqueue"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/quota/generic"
quota "k8s.io/kubernetes/pkg/quota/v1"
"k8s.io/kubernetes/pkg/quota/v1/generic"
_ "k8s.io/kubernetes/pkg/util/reflector/prometheus" // for reflector metric registration
_ "k8s.io/kubernetes/pkg/util/workqueue/prometheus" // for workqueue metric registration
resourcequotaapi "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota"
@@ -52,7 +52,7 @@ type Evaluator interface {
type quotaEvaluator struct {
quotaAccessor QuotaAccessor
// lockAcquisitionFunc acquires any required locks and returns a cleanup method to defer
lockAcquisitionFunc func([]api.ResourceQuota) func()
lockAcquisitionFunc func([]corev1.ResourceQuota) func()
ignoredResources map[schema.GroupResource]struct{}
@@ -111,7 +111,7 @@ func newAdmissionWaiter(a admission.Attributes) *admissionWaiter {
// NewQuotaEvaluator configures an admission controller that can enforce quota constraints
// using the provided registry. The registry must have the capability to handle group/kinds that
// are persisted by the server this admission controller is intercepting.
func NewQuotaEvaluator(quotaAccessor QuotaAccessor, ignoredResources map[schema.GroupResource]struct{}, quotaRegistry quota.Registry, lockAcquisitionFunc func([]api.ResourceQuota) func(), config *resourcequotaapi.Configuration, workers int, stopCh <-chan struct{}) Evaluator {
func NewQuotaEvaluator(quotaAccessor QuotaAccessor, ignoredResources map[schema.GroupResource]struct{}, quotaRegistry quota.Registry, lockAcquisitionFunc func([]corev1.ResourceQuota) func(), config *resourcequotaapi.Configuration, workers int, stopCh <-chan struct{}) Evaluator {
// if we get a nil config, just create an empty default.
if config == nil {
config = &resourcequotaapi.Configuration{}
@@ -214,7 +214,7 @@ func (e *quotaEvaluator) checkAttributes(ns string, admissionAttributes []*admis
// updates failed on conflict errors and we have retries left, re-get the failed quota from our cache for the latest version
// and recurse into this method with the subset. It's safe for us to evaluate ONLY the subset, because the other quota
// documents for these waiters have already been evaluated. Step 1 will mark all the ones that should already have succeeded.
func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttributes []*admissionWaiter, remainingRetries int) {
func (e *quotaEvaluator) checkQuotas(quotas []corev1.ResourceQuota, admissionAttributes []*admissionWaiter, remainingRetries int) {
// yet another copy to compare against originals to see if we actually have deltas
originalQuotas, err := copyQuotas(quotas)
if err != nil {
@@ -264,7 +264,7 @@ func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttrib
// 1. check to see if the quota changed. If not, skip.
// 2. if the quota changed and the update passes, be happy
// 3. if the quota changed and the update fails, add the original to a retry list
var updatedFailedQuotas []api.ResourceQuota
var updatedFailedQuotas []corev1.ResourceQuota
var lastErr error
for i := range quotas {
newQuota := quotas[i]
@@ -318,7 +318,7 @@ func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttrib
// this logic goes through our cache to find the new version of all quotas that failed update. If something has been removed
// it is skipped on this retry. After all, you removed it.
quotasToCheck := []api.ResourceQuota{}
quotasToCheck := []corev1.ResourceQuota{}
for _, newQuota := range newQuotas {
for _, oldQuota := range updatedFailedQuotas {
if newQuota.Name == oldQuota.Name {
@@ -330,8 +330,8 @@ func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttrib
e.checkQuotas(quotasToCheck, admissionAttributes, remainingRetries-1)
}
func copyQuotas(in []api.ResourceQuota) ([]api.ResourceQuota, error) {
out := make([]api.ResourceQuota, 0, len(in))
func copyQuotas(in []corev1.ResourceQuota) ([]corev1.ResourceQuota, error) {
out := make([]corev1.ResourceQuota, 0, len(in))
for _, quota := range in {
out = append(out, *quota.DeepCopy())
}
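The retry comment above describes plain optimistic concurrency. A minimal sketch of that loop with a hypothetical helper name (apierrors is the k8s.io/apimachinery/pkg/api/errors import this file already uses):

import apierrors "k8s.io/apimachinery/pkg/api/errors"

// updateWithRetries condenses the retry flow: persist the quota status, and
// on a resourceVersion conflict refresh to the latest cached copy and try
// again until the retry budget is spent.
func updateWithRetries(update func() error, refresh func(), retries int) error {
	var err error
	for i := 0; i <= retries; i++ {
		if err = update(); err == nil || !apierrors.IsConflict(err) {
			return err
		}
		refresh()
	}
	return err
}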
@@ -355,8 +355,8 @@ func filterLimitedResourcesByGroupResource(input []resourcequotaapi.LimitedResou
// limitedByDefault determines from the specified usage and limitedResources the set of resource names
// that must be present in a covering quota. It returns an empty set if it was unable to determine
// whether a resource was limited by default.
func limitedByDefault(usage api.ResourceList, limitedResources []resourcequotaapi.LimitedResource) []api.ResourceName {
result := []api.ResourceName{}
func limitedByDefault(usage corev1.ResourceList, limitedResources []resourcequotaapi.LimitedResource) []corev1.ResourceName {
result := []corev1.ResourceName{}
for _, limitedResource := range limitedResources {
for k, v := range usage {
// if a resource is consumed, we need to check if it matches on the limited resource list.
@@ -374,13 +374,13 @@ func limitedByDefault(usage api.ResourceList, limitedResources []resourcequotaap
return result
}
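To illustrate the rule above, an in-package sketch with assumed values (LimitedResource fields follow the plugin's configuration API, where MatchContains matches by substring against consumed resource names):

limited := []resourcequotaapi.LimitedResource{
	{Resource: "pods", MatchContains: []string{"requests.cpu"}},
}
usage := corev1.ResourceList{"requests.cpu": resource.MustParse("100m")}
// The non-zero "requests.cpu" consumption matches MatchContains, so the
// result is ["requests.cpu"]: a covering quota must name that resource.
names := limitedByDefault(usage, limited)
_ = names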
func getMatchedLimitedScopes(evaluator quota.Evaluator, inputObject runtime.Object, limitedResources []resourcequotaapi.LimitedResource) ([]api.ScopedResourceSelectorRequirement, error) {
scopes := []api.ScopedResourceSelectorRequirement{}
func getMatchedLimitedScopes(evaluator quota.Evaluator, inputObject runtime.Object, limitedResources []resourcequotaapi.LimitedResource) ([]corev1.ScopedResourceSelectorRequirement, error) {
scopes := []corev1.ScopedResourceSelectorRequirement{}
for _, limitedResource := range limitedResources {
matched, err := evaluator.MatchingScopes(inputObject, limitedResource.MatchScopes)
if err != nil {
glog.Errorf("Error while matching limited Scopes: %v", err)
return []api.ScopedResourceSelectorRequirement{}, err
return []corev1.ScopedResourceSelectorRequirement{}, err
}
for _, scope := range matched {
scopes = append(scopes, scope)
@@ -391,7 +391,7 @@ func getMatchedLimitedScopes(evaluator quota.Evaluator, inputObject runtime.Obje
// checkRequest verifies that the request does not exceed any quota constraint. It returns a copy of quotas not yet persisted
// that capture what the usage would be if the request succeeded. It returns an error if there is insufficient quota to satisfy the request.
func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.Attributes) ([]api.ResourceQuota, error) {
func (e *quotaEvaluator) checkRequest(quotas []corev1.ResourceQuota, a admission.Attributes) ([]corev1.ResourceQuota, error) {
evaluator := e.registry.Get(a.GetResource().GroupResource())
if evaluator == nil {
return quotas, nil
@@ -400,8 +400,8 @@ func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.At
}
// CheckRequest is a static version of quotaEvaluator.checkRequest that can be called from outside.
func CheckRequest(quotas []api.ResourceQuota, a admission.Attributes, evaluator quota.Evaluator,
limited []resourcequotaapi.LimitedResource) ([]api.ResourceQuota, error) {
func CheckRequest(quotas []corev1.ResourceQuota, a admission.Attributes, evaluator quota.Evaluator,
limited []resourcequotaapi.LimitedResource) ([]corev1.ResourceQuota, error) {
if !evaluator.Handles(a) {
return quotas, nil
}
@@ -416,7 +416,7 @@ func CheckRequest(quotas []api.ResourceQuota, a admission.Attributes, evaluator
}
// determine the set of resource names that must exist in a covering quota
limitedResourceNames := []api.ResourceName{}
limitedResourceNames := []corev1.ResourceName{}
limitedResources := filterLimitedResourcesByGroupResource(limited, a.GetResource().GroupResource())
if len(limitedResources) > 0 {
deltaUsage, err := evaluator.Usage(inputObject)
@@ -436,7 +436,7 @@ func CheckRequest(quotas []api.ResourceQuota, a admission.Attributes, evaluator
// this is needed to know if we have satisfied any constraints where consumption
// was limited by default.
restrictedResourcesSet := sets.String{}
restrictedScopes := []api.ScopedResourceSelectorRequirement{}
restrictedScopes := []corev1.ScopedResourceSelectorRequirement{}
for i := range quotas {
resourceQuota := quotas[i]
scopeSelectors := getScopeSelectorsFromQuota(resourceQuota)
@@ -571,12 +571,12 @@ func CheckRequest(quotas []api.ResourceQuota, a admission.Attributes, evaluator
return outQuotas, nil
}
func getScopeSelectorsFromQuota(quota api.ResourceQuota) []api.ScopedResourceSelectorRequirement {
selectors := []api.ScopedResourceSelectorRequirement{}
func getScopeSelectorsFromQuota(quota corev1.ResourceQuota) []corev1.ScopedResourceSelectorRequirement {
selectors := []corev1.ScopedResourceSelectorRequirement{}
for _, scope := range quota.Spec.Scopes {
selectors = append(selectors, api.ScopedResourceSelectorRequirement{
selectors = append(selectors, corev1.ScopedResourceSelectorRequirement{
ScopeName: scope,
Operator: api.ScopeSelectorOpExists})
Operator: corev1.ScopeSelectorOpExists})
}
if quota.Spec.ScopeSelector != nil {
for _, scopeSelector := range quota.Spec.ScopeSelector.MatchExpressions {
@@ -680,7 +680,7 @@ func (e *quotaEvaluator) getWork() (string, []*admissionWaiter, bool) {
// prettyPrint formats a resource list for use in error messages.
// It outputs resources sorted in increasing order.
func prettyPrint(item api.ResourceList) string {
func prettyPrint(item corev1.ResourceList) string {
parts := []string{}
keys := []string{}
for key := range item {
@@ -688,14 +688,14 @@ func prettyPrint(item api.ResourceList) string {
}
sort.Strings(keys)
for _, key := range keys {
value := item[api.ResourceName(key)]
value := item[corev1.ResourceName(key)]
constraint := key + "=" + value.String()
parts = append(parts, constraint)
}
return strings.Join(parts, ",")
}
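For example, in-package and with assumed values, the sorting keeps quota error messages deterministic:

usage := corev1.ResourceList{
	corev1.ResourcePods: resource.MustParse("10"),
	corev1.ResourceCPU:  resource.MustParse("4"),
}
// Keys are sorted, so the output is always "cpu=4,pods=10".
s := prettyPrint(usage)
_ = s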
func prettyPrintResourceNames(a []api.ResourceName) string {
func prettyPrintResourceNames(a []corev1.ResourceName) string {
values := []string{}
for _, value := range a {
values = append(values, string(value))
@@ -705,7 +705,7 @@ func prettyPrintResourceNames(a []api.ResourceName) string {
}
// hasUsageStats returns true if, for each hard constraint, there is a value for its current usage.
func hasUsageStats(resourceQuota *api.ResourceQuota) bool {
func hasUsageStats(resourceQuota *corev1.ResourceQuota) bool {
for resourceName := range resourceQuota.Status.Hard {
if _, found := resourceQuota.Status.Used[resourceName]; !found {
return false
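hasUsageStats gates the admission math on a reconciled status: a quota whose usage has not been observed yet is not charged against. For example, in-package and with assumed values:

rq := &corev1.ResourceQuota{
	Status: corev1.ResourceQuotaStatus{
		Hard: corev1.ResourceList{corev1.ResourcePods: resource.MustParse("10")},
		// Used has no "pods" entry yet, e.g. right after the quota is created.
	},
}
// false until the quota controller records observed usage for every hard limit
ready := hasUsageStats(rq)
_ = ready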

View File

@@ -20,14 +20,14 @@ import (
"fmt"
"time"
lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/golang-lru"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apiserver/pkg/storage/etcd"
api "k8s.io/kubernetes/pkg/apis/core"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion"
"k8s.io/client-go/kubernetes"
corev1listers "k8s.io/client-go/listers/core/v1"
)
// QuotaAccessor abstracts the get/set logic from the rest of the Evaluator. This could be a test stub, a straight passthrough,
@@ -35,17 +35,17 @@ import (
type QuotaAccessor interface {
// UpdateQuotaStatus is called to persist final status. This method should write to persistent storage.
// An error indicates that the write didn't complete successfully.
UpdateQuotaStatus(newQuota *api.ResourceQuota) error
UpdateQuotaStatus(newQuota *corev1.ResourceQuota) error
// GetQuotas gets all possible quotas for a given namespace
GetQuotas(namespace string) ([]api.ResourceQuota, error)
GetQuotas(namespace string) ([]corev1.ResourceQuota, error)
}
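The interface comment above mentions a test stub; a minimal in-package sketch of one (the fakeQuotaAccessor type is hypothetical, not part of this commit):

type fakeQuotaAccessor struct {
	quotas  map[string][]corev1.ResourceQuota // canned quotas per namespace
	updated []*corev1.ResourceQuota           // records every persisted status
}

func (f *fakeQuotaAccessor) GetQuotas(namespace string) ([]corev1.ResourceQuota, error) {
	return f.quotas[namespace], nil
}

func (f *fakeQuotaAccessor) UpdateQuotaStatus(newQuota *corev1.ResourceQuota) error {
	f.updated = append(f.updated, newQuota)
	return nil
}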
type quotaAccessor struct {
client clientset.Interface
client kubernetes.Interface
// lister can list/get quota objects from a shared informer's cache
lister corelisters.ResourceQuotaLister
lister corev1listers.ResourceQuotaLister
// liveLookups holds the last few live lookups we've done to help amortize cost on repeated lookup failures.
// This lets us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results.
@@ -77,8 +77,8 @@ func newQuotaAccessor() (*quotaAccessor, error) {
}, nil
}
func (e *quotaAccessor) UpdateQuotaStatus(newQuota *api.ResourceQuota) error {
updatedQuota, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(newQuota)
func (e *quotaAccessor) UpdateQuotaStatus(newQuota *corev1.ResourceQuota) error {
updatedQuota, err := e.client.CoreV1().ResourceQuotas(newQuota.Namespace).UpdateStatus(newQuota)
if err != nil {
return err
}
@@ -93,13 +93,13 @@ var etcdVersioner = etcd.APIObjectVersioner{}
// checkCache compares the passed quota against the value in the look-aside cache and returns the newer one.
// If the cache is out of date, it deletes the stale entry. This only works because etcd resourceVersions
// are monotonically increasing integers.
func (e *quotaAccessor) checkCache(quota *api.ResourceQuota) *api.ResourceQuota {
func (e *quotaAccessor) checkCache(quota *corev1.ResourceQuota) *corev1.ResourceQuota {
key := quota.Namespace + "/" + quota.Name
uncastCachedQuota, ok := e.updatedQuotas.Get(key)
if !ok {
return quota
}
cachedQuota := uncastCachedQuota.(*api.ResourceQuota)
cachedQuota := uncastCachedQuota.(*corev1.ResourceQuota)
if etcdVersioner.CompareResourceVersion(quota, cachedQuota) >= 0 {
e.updatedQuotas.Remove(key)
@@ -108,7 +108,7 @@ func (e *quotaAccessor) checkCache(quota *api.ResourceQuota) *api.ResourceQuota
return cachedQuota
}
func (e *quotaAccessor) GetQuotas(namespace string) ([]api.ResourceQuota, error) {
func (e *quotaAccessor) GetQuotas(namespace string) ([]corev1.ResourceQuota, error) {
// determine if there are any quotas in this namespace
// if there are no quotas, we don't need to do anything
items, err := e.lister.ResourceQuotas(namespace).List(labels.Everything())
@@ -142,7 +142,7 @@ func (e *quotaAccessor) GetQuotas(namespace string) ([]api.ResourceQuota, error)
}
}
resourceQuotas := []api.ResourceQuota{}
resourceQuotas := []corev1.ResourceQuota{}
for i := range items {
quota := items[i]
quota = e.checkCache(quota)
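The excerpt ends mid-function; GetQuotas continues with the live-lookup fallback that the liveLookups comment above describes. A sketch of that pattern, assuming accessor fields liveLookupCache *lru.Cache and liveTTL time.Duration (which the shown hunks do not include) and the List signature of client-go as of this commit:

// On an informer cache miss, fall back to a single live LIST against the
// API server and memoize the result briefly, so repeated lookups in a
// namespace with no cached quotas stay cheap.
if len(items) == 0 {
	uncast, ok := e.liveLookupCache.Get(namespace)
	if !ok || time.Now().After(uncast.(liveLookupEntry).expiry) {
		liveList, err := e.client.CoreV1().ResourceQuotas(namespace).List(metav1.ListOptions{})
		if err != nil {
			return nil, err
		}
		entry := liveLookupEntry{expiry: time.Now().Add(e.liveTTL)}
		for i := range liveList.Items {
			entry.items = append(entry.items, &liveList.Items[i])
		}
		e.liveLookupCache.Add(namespace, entry)
		uncast = entry
	}
	items = uncast.(liveLookupEntry).items
}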