Implement support for limited resources in quota

pull/6/head
Derek Carr 2017-02-18 12:10:22 -05:00
parent 8575978d7a
commit 3fad0cb52a
8 changed files with 548 additions and 31 deletions

View File

@ -260,6 +260,8 @@ plugin/pkg/admission/gc
plugin/pkg/admission/imagepolicy
plugin/pkg/admission/namespace/autoprovision
plugin/pkg/admission/namespace/exists
plugin/pkg/admission/resourcequota/apis/resourcequota/install
plugin/pkg/admission/resourcequota/apis/resourcequota/validation
plugin/pkg/admission/securitycontext/scdeny
plugin/pkg/auth
plugin/pkg/auth/authorizer

View File

@ -73,6 +73,7 @@ AUTH_ARGS=${AUTH_ARGS:-""}
KUBE_CACHE_MUTATION_DETECTOR="${KUBE_CACHE_MUTATION_DETECTOR:-true}"
export KUBE_CACHE_MUTATION_DETECTOR
ADMISSION_CONTROL_CONFIG_FILE=${ADMISSION_CONTROL_CONFIG_FILE:-""}
# START_MODE can be 'all', 'kubeletonly', or 'nokubelet'
START_MODE=${START_MODE:-"all"}
@ -431,6 +432,7 @@ function start_apiserver {
--service-account-key-file="${SERVICE_ACCOUNT_KEY}" \
--service-account-lookup="${SERVICE_ACCOUNT_LOOKUP}" \
--admission-control="${ADMISSION_CONTROL}" \
--admission-control-config-file="${ADMISSION_CONTROL_CONFIG_FILE}" \
--bind-address="${API_BIND_ADDR}" \
--secure-port="${API_SECURE_PORT}" \
--tls-cert-file="${CERT_DIR}/serving-kube-apiserver.crt" \

View File

@ -12,6 +12,7 @@ go_library(
name = "go_default_library",
srcs = [
"admission.go",
"config.go",
"controller.go",
"doc.go",
"resource_access.go",
@ -24,11 +25,19 @@ go_library(
"//pkg/quota:go_default_library",
"//pkg/quota/install:go_default_library",
"//pkg/util/workqueue/prometheus:go_default_library",
"//plugin/pkg/admission/resourcequota/apis/resourcequota:go_default_library",
"//plugin/pkg/admission/resourcequota/apis/resourcequota/install:go_default_library",
"//plugin/pkg/admission/resourcequota/apis/resourcequota/v1alpha1:go_default_library",
"//plugin/pkg/admission/resourcequota/apis/resourcequota/validation:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:github.com/hashicorp/golang-lru",
"//vendor:k8s.io/apimachinery/pkg/api/meta",
"//vendor:k8s.io/apimachinery/pkg/apimachinery/announced",
"//vendor:k8s.io/apimachinery/pkg/apimachinery/registered",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/runtime/serializer",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
@ -51,6 +60,7 @@ go_test(
"//pkg/quota:go_default_library",
"//pkg/quota/generic:go_default_library",
"//pkg/quota/install:go_default_library",
"//plugin/pkg/admission/resourcequota/apis/resourcequota:go_default_library",
"//vendor:github.com/hashicorp/golang-lru",
"//vendor:k8s.io/apimachinery/pkg/api/resource",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
@ -71,6 +81,9 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//plugin/pkg/admission/resourcequota/apis/resourcequota:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -27,22 +27,35 @@ import (
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
"k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/quota/install"
resourcequotaapi "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota"
"k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota/validation"
)
func init() {
admission.RegisterPlugin("ResourceQuota",
func(config io.Reader) (admission.Interface, error) {
// load the configuration provided (if any)
configuration, err := LoadConfiguration(config)
if err != nil {
return nil, err
}
// validate the configuration (if any)
if configuration != nil {
if errs := validation.ValidateConfiguration(configuration); len(errs) != 0 {
return nil, errs.ToAggregate()
}
}
// NOTE: we do not provide informers to the registry because admission level decisions
// does not require us to open watches for all items tracked by quota.
registry := install.NewRegistry(nil, nil)
return NewResourceQuota(registry, 5, make(chan struct{}))
return NewResourceQuota(registry, configuration, 5, make(chan struct{}))
})
}
// quotaAdmission implements an admission controller that can enforce quota constraints
type quotaAdmission struct {
*admission.Handler
config *resourcequotaapi.Configuration
stopCh <-chan struct{}
registry quota.Registry
numEvaluators int
@ -59,12 +72,13 @@ type liveLookupEntry struct {
// NewResourceQuota configures an admission controller that can enforce quota constraints
// using the provided registry. The registry must have the capability to handle group/kinds that
// are persisted by the server this admission controller is intercepting
func NewResourceQuota(registry quota.Registry, numEvaluators int, stopCh <-chan struct{}) (admission.Interface, error) {
func NewResourceQuota(registry quota.Registry, config *resourcequotaapi.Configuration, numEvaluators int, stopCh <-chan struct{}) (admission.Interface, error) {
return &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
stopCh: stopCh,
registry: registry,
numEvaluators: numEvaluators,
config: config,
}, nil
}
@ -77,7 +91,7 @@ func (a *quotaAdmission) SetInternalClientSet(client internalclientset.Interface
}
go quotaAccessor.Run(a.stopCh)
a.evaluator = NewQuotaEvaluator(quotaAccessor, a.registry, nil, a.numEvaluators, a.stopCh)
a.evaluator = NewQuotaEvaluator(quotaAccessor, a.registry, nil, a.config, a.numEvaluators, a.stopCh)
}
// Validate ensures an authorizer is set.
@ -89,11 +103,10 @@ func (a *quotaAdmission) Validate() error {
}
// Admit makes admission decisions while enforcing quota
func (q *quotaAdmission) Admit(a admission.Attributes) (err error) {
func (a *quotaAdmission) Admit(attr admission.Attributes) (err error) {
// ignore all operations that correspond to sub-resource actions
if a.GetSubresource() != "" {
if attr.GetSubresource() != "" {
return nil
}
return q.evaluator.Evaluate(a)
return a.evaluator.Evaluate(attr)
}

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/quota/generic"
"k8s.io/kubernetes/pkg/quota/install"
resourcequotaapi "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota"
)
func getResourceList(cpu, memory string) api.ResourceList {
@ -130,7 +131,8 @@ func TestAdmissionIgnoresDelete(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh)
config := &resourcequotaapi.Configuration{}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
@ -164,7 +166,8 @@ func TestAdmissionIgnoresSubresources(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh)
config := &resourcequotaapi.Configuration{}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
@ -207,7 +210,8 @@ func TestAdmitBelowQuotaLimit(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh)
config := &resourcequotaapi.Configuration{}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
@ -289,7 +293,8 @@ func TestAdmitHandlesOldObjects(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh)
config := &resourcequotaapi.Configuration{}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
@ -385,7 +390,8 @@ func TestAdmitHandlesCreatingUpdates(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh)
config := &resourcequotaapi.Configuration{}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
@ -478,7 +484,8 @@ func TestAdmitExceedQuotaLimit(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh)
config := &resourcequotaapi.Configuration{}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
@ -521,7 +528,9 @@ func TestAdmitEnforceQuotaConstraints(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh)
config := &resourcequotaapi.Configuration{}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
@ -574,7 +583,8 @@ func TestAdmitPodInNamespaceWithoutQuota(t *testing.T) {
quotaAccessor.indexer = indexer
quotaAccessor.liveLookupCache = liveLookupCache
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh)
config := &resourcequotaapi.Configuration{}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
@ -639,7 +649,8 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh)
config := &resourcequotaapi.Configuration{}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
@ -743,7 +754,8 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh)
config := &resourcequotaapi.Configuration{}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
@ -834,7 +846,8 @@ func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh)
config := &resourcequotaapi.Configuration{}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
@ -929,7 +942,8 @@ func TestAdmissionSetsMissingNamespace(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh)
config := &resourcequotaapi.Configuration{}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
evaluator.(*quotaEvaluator).registry = registry
handler := &quotaAdmission{
@ -974,7 +988,8 @@ func TestAdmitRejectsNegativeUsage(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh)
config := &resourcequotaapi.Configuration{}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
@ -1019,7 +1034,8 @@ func TestAdmitWhenUnrelatedResourceExceedsQuota(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh)
config := &resourcequotaapi.Configuration{}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
@ -1034,3 +1050,219 @@ func TestAdmitWhenUnrelatedResourceExceedsQuota(t *testing.T) {
t.Errorf("Unexpected error: %v", err)
}
}
// TestAdmitLimitedResourceNoQuota verifies if a limited resource is configured with no quota, it cannot be consumed.
func TestAdmitLimitedResourceNoQuota(t *testing.T) {
kubeClient := fake.NewSimpleClientset()
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
stopCh := make(chan struct{})
defer close(stopCh)
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
// disable consumption of cpu unless there is a covering quota.
config := &resourcequotaapi.Configuration{
LimitedResources: []resourcequotaapi.LimitedResource{
{
Resource: "pods",
MatchContains: []string{"cpu"},
},
},
}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
}
newPod := validPod("not-allowed-pod", 1, getResourceRequirements(getResourceList("3", "2Gi"), getResourceList("", "")))
err := handler.Admit(admission.NewAttributesRecord(newPod, nil, api.Kind("Pod").WithVersion("version"), newPod.Namespace, newPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err == nil {
t.Errorf("Expected an error for consuming a limited resource without quota.")
}
}
// TestAdmitLimitedResourceNoQuotaIgnoresNonMatchingResources shows it ignores non matching resources in config.
func TestAdmitLimitedResourceNoQuotaIgnoresNonMatchingResources(t *testing.T) {
kubeClient := fake.NewSimpleClientset()
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
stopCh := make(chan struct{})
defer close(stopCh)
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
// disable consumption of cpu unless there is a covering quota.
config := &resourcequotaapi.Configuration{
LimitedResources: []resourcequotaapi.LimitedResource{
{
Resource: "services",
MatchContains: []string{"services"},
},
},
}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
}
newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("3", "2Gi"), getResourceList("", "")))
err := handler.Admit(admission.NewAttributesRecord(newPod, nil, api.Kind("Pod").WithVersion("version"), newPod.Namespace, newPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
// TestAdmitLimitedResourceWithQuota verifies if a limited resource is configured with quota, it can be consumed.
func TestAdmitLimitedResourceWithQuota(t *testing.T) {
resourceQuota := &api.ResourceQuota{
ObjectMeta: metav1.ObjectMeta{Name: "quota", Namespace: "test", ResourceVersion: "124"},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceRequestsCPU: resource.MustParse("10"),
},
Used: api.ResourceList{
api.ResourceRequestsCPU: resource.MustParse("1"),
},
},
}
kubeClient := fake.NewSimpleClientset(resourceQuota)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
stopCh := make(chan struct{})
defer close(stopCh)
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
// disable consumption of cpu unless there is a covering quota.
// disable consumption of cpu unless there is a covering quota.
config := &resourcequotaapi.Configuration{
LimitedResources: []resourcequotaapi.LimitedResource{
{
Resource: "pods",
MatchContains: []string{"requests.cpu"}, // match on "requests.cpu" only
},
},
}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
}
indexer.Add(resourceQuota)
newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("3", "2Gi"), getResourceList("", "")))
err := handler.Admit(admission.NewAttributesRecord(newPod, nil, api.Kind("Pod").WithVersion("version"), newPod.Namespace, newPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}
// TestAdmitLimitedResourceWithMultipleQuota verifies if a limited resource is configured with quota, it can be consumed if one matches.
func TestAdmitLimitedResourceWithMultipleQuota(t *testing.T) {
resourceQuota1 := &api.ResourceQuota{
ObjectMeta: metav1.ObjectMeta{Name: "quota1", Namespace: "test", ResourceVersion: "124"},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceRequestsCPU: resource.MustParse("10"),
},
Used: api.ResourceList{
api.ResourceRequestsCPU: resource.MustParse("1"),
},
},
}
resourceQuota2 := &api.ResourceQuota{
ObjectMeta: metav1.ObjectMeta{Name: "quota2", Namespace: "test", ResourceVersion: "124"},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceMemory: resource.MustParse("10Gi"),
},
Used: api.ResourceList{
api.ResourceMemory: resource.MustParse("1Gi"),
},
},
}
kubeClient := fake.NewSimpleClientset(resourceQuota1, resourceQuota2)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
stopCh := make(chan struct{})
defer close(stopCh)
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
// disable consumption of cpu unless there is a covering quota.
// disable consumption of cpu unless there is a covering quota.
config := &resourcequotaapi.Configuration{
LimitedResources: []resourcequotaapi.LimitedResource{
{
Resource: "pods",
MatchContains: []string{"requests.cpu"}, // match on "requests.cpu" only
},
},
}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
}
indexer.Add(resourceQuota1)
indexer.Add(resourceQuota2)
newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("3", "2Gi"), getResourceList("", "")))
err := handler.Admit(admission.NewAttributesRecord(newPod, nil, api.Kind("Pod").WithVersion("version"), newPod.Namespace, newPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}
// TestAdmitLimitedResourceWithQuotaThatDoesNotCover verifies if a limited resource is configured the quota must cover the resource.
func TestAdmitLimitedResourceWithQuotaThatDoesNotCover(t *testing.T) {
resourceQuota := &api.ResourceQuota{
ObjectMeta: metav1.ObjectMeta{Name: "quota", Namespace: "test", ResourceVersion: "124"},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceMemory: resource.MustParse("10Gi"),
},
Used: api.ResourceList{
api.ResourceMemory: resource.MustParse("1Gi"),
},
},
}
kubeClient := fake.NewSimpleClientset(resourceQuota)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
stopCh := make(chan struct{})
defer close(stopCh)
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
// disable consumption of cpu unless there is a covering quota.
// disable consumption of cpu unless there is a covering quota.
config := &resourcequotaapi.Configuration{
LimitedResources: []resourcequotaapi.LimitedResource{
{
Resource: "pods",
MatchContains: []string{"cpu"}, // match on "cpu" only
},
},
}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
}
indexer.Add(resourceQuota)
newPod := validPod("not-allowed-pod", 1, getResourceRequirements(getResourceList("3", "2Gi"), getResourceList("", "")))
err := handler.Admit(admission.NewAttributesRecord(newPod, nil, api.Kind("Pod").WithVersion("version"), newPod.Namespace, newPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err == nil {
t.Fatalf("Expected an error since the quota did not cover cpu")
}
}

View File

@ -0,0 +1,72 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcequota
import (
"fmt"
"io"
"io/ioutil"
"os"
"k8s.io/apimachinery/pkg/apimachinery/announced"
"k8s.io/apimachinery/pkg/apimachinery/registered"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
resourcequotaapi "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota"
"k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota/install"
resourcequotav1alpha1 "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota/v1alpha1"
)
var (
groupFactoryRegistry = make(announced.APIGroupFactoryRegistry)
registry = registered.NewOrDie(os.Getenv("KUBE_API_VERSIONS"))
scheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(scheme)
)
func init() {
install.Install(groupFactoryRegistry, registry, scheme)
}
// LoadConfiguration loads the provided configuration.
func LoadConfiguration(config io.Reader) (*resourcequotaapi.Configuration, error) {
// if no config is provided, return a default configuration
if config == nil {
externalConfig := &resourcequotav1alpha1.Configuration{}
scheme.Default(externalConfig)
internalConfig := &resourcequotaapi.Configuration{}
if err := scheme.Convert(externalConfig, internalConfig, nil); err != nil {
return nil, err
}
return internalConfig, nil
}
// we have a config so parse it.
data, err := ioutil.ReadAll(config)
if err != nil {
return nil, err
}
decoder := codecs.UniversalDecoder()
decodedObj, err := runtime.Decode(decoder, data)
if err != nil {
return nil, err
}
resourceQuotaConfiguration, ok := decodedObj.(*resourcequotaapi.Configuration)
if !ok {
return nil, fmt.Errorf("unexpected type: %T", decodedObj)
}
return resourceQuotaConfiguration, nil
}

View File

@ -26,6 +26,7 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
@ -34,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/quota"
_ "k8s.io/kubernetes/pkg/util/workqueue/prometheus" // for workqueue metric registration
resourcequotaapi "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota"
)
// Evaluator is used to see if quota constraints are satisfied.
@ -65,6 +67,9 @@ type quotaEvaluator struct {
workers int
stopCh <-chan struct{}
init sync.Once
// lets us know what resources are limited by default
config *resourcequotaapi.Configuration
}
type admissionWaiter struct {
@ -79,6 +84,7 @@ func (defaultDeny) Error() string {
return "DEFAULT DENY"
}
// IsDefaultDeny returns true if the error is defaultDeny
func IsDefaultDeny(err error) bool {
if err == nil {
return false
@ -99,7 +105,11 @@ func newAdmissionWaiter(a admission.Attributes) *admissionWaiter {
// NewQuotaEvaluator configures an admission controller that can enforce quota constraints
// using the provided registry. The registry must have the capability to handle group/kinds that
// are persisted by the server this admission controller is intercepting
func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, lockAcquisitionFunc func([]api.ResourceQuota) func(), workers int, stopCh <-chan struct{}) Evaluator {
func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, lockAcquisitionFunc func([]api.ResourceQuota) func(), config *resourcequotaapi.Configuration, workers int, stopCh <-chan struct{}) Evaluator {
// if we get a nil config, just create an empty default.
if config == nil {
config = &resourcequotaapi.Configuration{}
}
return &quotaEvaluator{
quotaAccessor: quotaAccessor,
lockAcquisitionFunc: lockAcquisitionFunc,
@ -113,6 +123,7 @@ func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, loc
workers: workers,
stopCh: stopCh,
config: config,
}
}
@ -166,7 +177,9 @@ func (e *quotaEvaluator) checkAttributes(ns string, admissionAttributes []*admis
}
return
}
if len(quotas) == 0 {
// if limited resources are disabled, we can just return safely when there are no quotas.
limitedResourcesDisabled := len(e.config.LimitedResources) == 0
if len(quotas) == 0 && limitedResourcesDisabled {
for _, admissionAttribute := range admissionAttributes {
admissionAttribute.result = nil
}
@ -316,6 +329,41 @@ func copyQuotas(in []api.ResourceQuota) ([]api.ResourceQuota, error) {
return out, nil
}
// filterLimitedResourcesByGroupResource filters the input that match the specified groupResource
func filterLimitedResourcesByGroupResource(input []resourcequotaapi.LimitedResource, groupResource schema.GroupResource) []resourcequotaapi.LimitedResource {
result := []resourcequotaapi.LimitedResource{}
for i := range input {
limitedResource := input[i]
limitedGroupResource := schema.GroupResource{Group: limitedResource.APIGroup, Resource: limitedResource.Resource}
if limitedGroupResource == groupResource {
result = append(result, limitedResource)
}
}
return result
}
// limitedByDefault determines from the specfified usage and limitedResources the set of resources names
// that must be present in a covering quota. It returns an error if it was unable to determine if
// a resource was not limited by default.
func limitedByDefault(usage api.ResourceList, limitedResources []resourcequotaapi.LimitedResource) []api.ResourceName {
result := []api.ResourceName{}
for _, limitedResource := range limitedResources {
for k, v := range usage {
// if a resource is consumed, we need to check if it matches on the limited resource list.
if v.Sign() == 1 {
// if we get a match, we add it to limited set
for _, matchContain := range limitedResource.MatchContains {
if strings.Contains(string(k), matchContain) {
result = append(result, k)
break
}
}
}
}
}
return result
}
// checkRequest verifies that the request does not exceed any quota constraint. it returns a copy of quotas not yet persisted
// that capture what the usage would be if the request succeeded. It return an error if the is insufficient quota to satisfy the request
func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.Attributes) ([]api.ResourceQuota, error) {
@ -331,12 +379,30 @@ func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.At
return quotas, nil
}
// if we have limited resources enabled for this resource, always calculate usage
inputObject := a.GetObject()
// determine the set of resource names that must exist in a covering quota
limitedResourceNames := []api.ResourceName{}
limitedResources := filterLimitedResourcesByGroupResource(e.config.LimitedResources, a.GetResource().GroupResource())
if len(limitedResources) > 0 {
deltaUsage, err := evaluator.Usage(inputObject)
if err != nil {
return quotas, err
}
limitedResourceNames = limitedByDefault(deltaUsage, limitedResources)
}
limitedResourceNamesSet := quota.ToSet(limitedResourceNames)
// find the set of quotas that are pertinent to this request
// reject if we match the quota, but usage is not calculated yet
// reject if the input object does not satisfy quota constraints
// if there are no pertinent quotas, we can just return
inputObject := a.GetObject()
interestingQuotaIndexes := []int{}
// track the cumulative set of resources that were required across all quotas
// this is needed to know if we have satisfied any constraints where consumption
// was limited by default.
restrictedResourcesSet := sets.String{}
for i := range quotas {
resourceQuota := quotas[i]
match, err := evaluator.Matches(&resourceQuota, inputObject)
@ -348,16 +414,26 @@ func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.At
}
hardResources := quota.ResourceNames(resourceQuota.Status.Hard)
requiredResources := evaluator.MatchingResources(hardResources)
if err := evaluator.Constraints(requiredResources, inputObject); err != nil {
restrictedResources := evaluator.MatchingResources(hardResources)
if err := evaluator.Constraints(restrictedResources, inputObject); err != nil {
return nil, admission.NewForbidden(a, fmt.Errorf("failed quota: %s: %v", resourceQuota.Name, err))
}
if !hasUsageStats(&resourceQuota) {
return nil, admission.NewForbidden(a, fmt.Errorf("status unknown for quota: %s", resourceQuota.Name))
}
interestingQuotaIndexes = append(interestingQuotaIndexes, i)
localRestrictedResourcesSet := quota.ToSet(restrictedResources)
restrictedResourcesSet.Insert(localRestrictedResourcesSet.List()...)
}
// verify that for every resource that had limited by default consumption
// enabled that there was a corresponding quota that covered its use.
// if not, we reject the request.
hasNoCoveringQuota := limitedResourceNamesSet.Difference(restrictedResourcesSet)
if len(hasNoCoveringQuota) > 0 {
return quotas, fmt.Errorf("insufficient quota to consume: %v", strings.Join(hasNoCoveringQuota.List(), ","))
}
if len(interestingQuotaIndexes) == 0 {
return quotas, nil
}

View File

@ -35,7 +35,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/pkg/controller"
@ -44,6 +44,7 @@ import (
kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
quotainstall "k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/plugin/pkg/admission/resourcequota"
resourcequotaapi "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota"
"k8s.io/kubernetes/test/integration/framework"
)
@ -65,7 +66,8 @@ func TestQuota(t *testing.T) {
admissionCh := make(chan struct{})
clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
internalClientset := internalclientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
admission, err := resourcequota.NewResourceQuota(quotainstall.NewRegistry(nil, nil), 5, admissionCh)
config := &resourcequotaapi.Configuration{}
admission, err := resourcequota.NewResourceQuota(quotainstall.NewRegistry(nil, nil), config, 5, admissionCh)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -226,3 +228,108 @@ func scale(t *testing.T, namespace string, clientset *clientset.Clientset) {
t.Fatalf("unexpected error: %v, ended with %v pods", err, len(pods.Items))
}
}
func TestQuotaLimitedResourceDenial(t *testing.T) {
// Set up a master
h := &framework.MasterHolder{Initialized: make(chan struct{})}
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
<-h.Initialized
h.M.GenericAPIServer.Handler.ServeHTTP(w, req)
}))
defer s.Close()
admissionCh := make(chan struct{})
clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
internalClientset := internalclientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
// stop creation of a pod resource unless there is a quota
config := &resourcequotaapi.Configuration{
LimitedResources: []resourcequotaapi.LimitedResource{
{
Resource: "pods",
MatchContains: []string{"pods"},
},
},
}
admission, err := resourcequota.NewResourceQuota(quotainstall.NewRegistry(nil, nil), config, 5, admissionCh)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
admission.(kubeadmission.WantsInternalClientSet).SetInternalClientSet(internalClientset)
defer close(admissionCh)
masterConfig := framework.NewIntegrationTestMasterConfig()
masterConfig.GenericConfig.AdmissionControl = admission
framework.RunAMasterUsingServer(masterConfig, s, h)
ns := framework.CreateTestingNamespace("quota", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
controllerCh := make(chan struct{})
defer close(controllerCh)
informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
rm := replicationcontroller.NewReplicationManager(
informers.Core().V1().Pods(),
informers.Core().V1().ReplicationControllers(),
clientset,
replicationcontroller.BurstReplicas,
4096,
false,
)
rm.SetEventRecorder(&record.FakeRecorder{})
go rm.Run(3, controllerCh)
resourceQuotaRegistry := quotainstall.NewRegistry(clientset, nil)
groupKindsToReplenish := []schema.GroupKind{
api.Kind("Pod"),
}
resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
KubeClient: clientset,
ResourceQuotaInformer: informers.Core().V1().ResourceQuotas(),
ResyncPeriod: controller.NoResyncPeriodFunc,
Registry: resourceQuotaRegistry,
GroupKindsToReplenish: groupKindsToReplenish,
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(informers),
}
go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(2, controllerCh)
informers.Start(controllerCh)
// try to create a pod
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns.Name,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "container",
Image: "busybox",
},
},
},
}
if _, err := clientset.Core().Pods(ns.Name).Create(pod); err == nil {
t.Fatalf("expected error for insufficient quota")
}
// now create a covering quota
quota := &v1.ResourceQuota{
ObjectMeta: metav1.ObjectMeta{
Name: "quota",
Namespace: ns.Name,
},
Spec: v1.ResourceQuotaSpec{
Hard: v1.ResourceList{
v1.ResourcePods: resource.MustParse("1000"),
},
},
}
waitForQuota(t, quota, clientset)
if _, err := clientset.Core().Pods(ns.Name).Create(pod); err != nil {
t.Fatalf("unexpected error: %v", err)
}
}