From 066fcf785e2d70af581cf7a63042ec772a0df6f4 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Wed, 13 Sep 2017 18:14:27 -0400 Subject: [PATCH] Implement support for updating resources --- pkg/quota/resources.go | 26 +++ plugin/pkg/admission/resourcequota/BUILD | 1 + .../admission/resourcequota/admission_test.go | 176 +++++++++++++++++- .../pkg/admission/resourcequota/controller.go | 4 +- 4 files changed, 204 insertions(+), 3 deletions(-) diff --git a/pkg/quota/resources.go b/pkg/quota/resources.go index aaf56b72a6..b9bd39bc83 100644 --- a/pkg/quota/resources.go +++ b/pkg/quota/resources.go @@ -124,6 +124,32 @@ func Add(a api.ResourceList, b api.ResourceList) api.ResourceList { return result } +// SubtractWithNonNegativeResult - substracts and returns result of a - b but +// makes sure we don't return negative values to prevent negative resource usage. +func SubtractWithNonNegativeResult(a api.ResourceList, b api.ResourceList) api.ResourceList { + zero := resource.MustParse("0") + + result := api.ResourceList{} + for key, value := range a { + quantity := *value.Copy() + if other, found := b[key]; found { + quantity.Sub(other) + } + if quantity.Cmp(zero) > 0 { + result[key] = quantity + } else { + result[key] = zero + } + } + + for key := range b { + if _, found := result[key]; !found { + result[key] = zero + } + } + return result +} + // Subtract returns the result of a - b for each named resource func Subtract(a api.ResourceList, b api.ResourceList) api.ResourceList { result := api.ResourceList{} diff --git a/plugin/pkg/admission/resourcequota/BUILD b/plugin/pkg/admission/resourcequota/BUILD index 44fbd1a74c..256886eadd 100644 --- a/plugin/pkg/admission/resourcequota/BUILD +++ b/plugin/pkg/admission/resourcequota/BUILD @@ -66,6 +66,7 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apiserver/pkg/admission:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/testing:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", ], diff --git a/plugin/pkg/admission/resourcequota/admission_test.go b/plugin/pkg/admission/resourcequota/admission_test.go index 4fa876c01a..0d197c1238 100644 --- a/plugin/pkg/admission/resourcequota/admission_test.go +++ b/plugin/pkg/admission/resourcequota/admission_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/admission" + utilfeature "k8s.io/apiserver/pkg/util/feature" testcore "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" @@ -340,6 +341,9 @@ func TestAdmitHandlesOldObjects(t *testing.T) { decimatedActions := removeListWatch(kubeClient.Actions()) lastActionIndex := len(decimatedActions) - 1 usage := decimatedActions[lastActionIndex].(testcore.UpdateAction).GetObject().(*api.ResourceQuota) + + // Verify service usage. Since we don't add negative values, the api.ResourceServicesLoadBalancers + // will remain on last reported value expectedUsage := api.ResourceQuota{ Status: api.ResourceQuotaStatus{ Hard: api.ResourceList{ @@ -349,7 +353,7 @@ func TestAdmitHandlesOldObjects(t *testing.T) { }, Used: api.ResourceList{ api.ResourceServices: resource.MustParse("1"), - api.ResourceServicesLoadBalancers: resource.MustParse("0"), + api.ResourceServicesLoadBalancers: resource.MustParse("1"), api.ResourceServicesNodePorts: resource.MustParse("1"), }, }, @@ -364,6 +368,176 @@ func TestAdmitHandlesOldObjects(t *testing.T) { } } +func TestAdmitHandlesNegativePVCUpdates(t *testing.T) { + resourceQuota := &api.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{Name: "quota", Namespace: "test", ResourceVersion: "124"}, + Status: api.ResourceQuotaStatus{ + Hard: api.ResourceList{ + api.ResourcePersistentVolumeClaims: resource.MustParse("3"), + api.ResourceRequestsStorage: resource.MustParse("100Gi"), + }, + Used: api.ResourceList{ + api.ResourcePersistentVolumeClaims: resource.MustParse("1"), + api.ResourceRequestsStorage: resource.MustParse("10Gi"), + }, + }, + } + + // start up quota system + stopCh := make(chan struct{}) + defer close(stopCh) + + err := utilfeature.DefaultFeatureGate.Set("ExpandPersistentVolumes=true") + if err != nil { + t.Errorf("Failed to enable feature gate for LocalPersistentVolumes: %v", err) + return + } + + defer func() { + utilfeature.DefaultFeatureGate.Set("ExpandPersistentVolumes=false") + }() + + kubeClient := fake.NewSimpleClientset(resourceQuota) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() + config := &resourcequotaapi.Configuration{} + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) + + handler := "aAdmission{ + Handler: admission.NewHandler(admission.Create, admission.Update), + evaluator: evaluator, + } + informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuota) + + oldPVC := &api.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{Name: "pvc-to-update", Namespace: "test", ResourceVersion: "1"}, + Spec: api.PersistentVolumeClaimSpec{ + Resources: getResourceRequirements(api.ResourceList{api.ResourceStorage: resource.MustParse("10Gi")}, api.ResourceList{}), + }, + } + + newPVC := &api.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{Name: "pvc-to-update", Namespace: "test"}, + Spec: api.PersistentVolumeClaimSpec{ + Resources: getResourceRequirements(api.ResourceList{api.ResourceStorage: resource.MustParse("5Gi")}, api.ResourceList{}), + }, + } + + err = handler.Admit(admission.NewAttributesRecord(newPVC, oldPVC, api.Kind("PersistentVolumeClaim").WithVersion("version"), newPVC.Namespace, newPVC.Name, api.Resource("persistentvolumeclaims").WithVersion("version"), "", admission.Update, nil)) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if len(kubeClient.Actions()) != 0 { + t.Errorf("No client action should be taken in case of negative updates") + } +} + +func TestAdmitHandlesPVCUpdates(t *testing.T) { + resourceQuota := &api.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{Name: "quota", Namespace: "test", ResourceVersion: "124"}, + Status: api.ResourceQuotaStatus{ + Hard: api.ResourceList{ + api.ResourcePersistentVolumeClaims: resource.MustParse("3"), + api.ResourceRequestsStorage: resource.MustParse("100Gi"), + }, + Used: api.ResourceList{ + api.ResourcePersistentVolumeClaims: resource.MustParse("1"), + api.ResourceRequestsStorage: resource.MustParse("10Gi"), + }, + }, + } + + err := utilfeature.DefaultFeatureGate.Set("ExpandPersistentVolumes=true") + if err != nil { + t.Errorf("Failed to enable feature gate for LocalPersistentVolumes: %v", err) + return + } + + defer func() { + utilfeature.DefaultFeatureGate.Set("ExpandPersistentVolumes=false") + }() + + // start up quota system + stopCh := make(chan struct{}) + defer close(stopCh) + + kubeClient := fake.NewSimpleClientset(resourceQuota) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() + config := &resourcequotaapi.Configuration{} + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) + + handler := "aAdmission{ + Handler: admission.NewHandler(admission.Create, admission.Update), + evaluator: evaluator, + } + informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuota) + + oldPVC := &api.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{Name: "pvc-to-update", Namespace: "test", ResourceVersion: "1"}, + Spec: api.PersistentVolumeClaimSpec{ + Resources: getResourceRequirements(api.ResourceList{api.ResourceStorage: resource.MustParse("10Gi")}, api.ResourceList{}), + }, + } + + newPVC := &api.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{Name: "pvc-to-update", Namespace: "test"}, + Spec: api.PersistentVolumeClaimSpec{ + Resources: getResourceRequirements(api.ResourceList{api.ResourceStorage: resource.MustParse("15Gi")}, api.ResourceList{}), + }, + } + + err = handler.Admit(admission.NewAttributesRecord(newPVC, oldPVC, api.Kind("PersistentVolumeClaim").WithVersion("version"), newPVC.Namespace, newPVC.Name, api.Resource("persistentvolumeclaims").WithVersion("version"), "", admission.Update, nil)) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + if len(kubeClient.Actions()) == 0 { + t.Errorf("Expected a client action") + } + + // the only action should have been to update the quota (since we should not have fetched the previous item) + expectedActionSet := sets.NewString( + strings.Join([]string{"update", "resourcequotas", "status"}, "-"), + ) + actionSet := sets.NewString() + for _, action := range kubeClient.Actions() { + actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource().Resource, action.GetSubresource()}, "-")) + } + if !actionSet.HasAll(expectedActionSet.List()...) { + t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet)) + } + + decimatedActions := removeListWatch(kubeClient.Actions()) + lastActionIndex := len(decimatedActions) - 1 + usage := decimatedActions[lastActionIndex].(testcore.UpdateAction).GetObject().(*api.ResourceQuota) + expectedUsage := api.ResourceQuota{ + Status: api.ResourceQuotaStatus{ + Hard: api.ResourceList{ + api.ResourcePersistentVolumeClaims: resource.MustParse("3"), + api.ResourceRequestsStorage: resource.MustParse("100Gi"), + }, + Used: api.ResourceList{ + api.ResourcePersistentVolumeClaims: resource.MustParse("1"), + api.ResourceRequestsStorage: resource.MustParse("15Gi"), + }, + }, + } + for k, v := range expectedUsage.Status.Used { + actual := usage.Status.Used[k] + actualValue := actual.String() + expectedValue := v.String() + if expectedValue != actualValue { + t.Errorf("Usage Used: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue) + } + } + +} + // TestAdmitHandlesCreatingUpdates verifies that admit handles updates which behave as creates func TestAdmitHandlesCreatingUpdates(t *testing.T) { // in this scenario, there is an existing service diff --git a/plugin/pkg/admission/resourcequota/controller.go b/plugin/pkg/admission/resourcequota/controller.go index 689ed4d177..3650f2d39e 100644 --- a/plugin/pkg/admission/resourcequota/controller.go +++ b/plugin/pkg/admission/resourcequota/controller.go @@ -443,7 +443,6 @@ func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.At accessor.SetNamespace(namespace) } } - // there is at least one quota that definitely matches our object // as a result, we need to measure the usage of this object for quota // on updates, we need to subtract the previous measured usage @@ -472,9 +471,10 @@ func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.At if innerErr != nil { return quotas, innerErr } - deltaUsage = quota.Subtract(deltaUsage, prevUsage) + deltaUsage = quota.SubtractWithNonNegativeResult(deltaUsage, prevUsage) } } + if quota.IsZero(deltaUsage) { return quotas, nil }