Implement support for updating resources

pull/6/head
Hemant Kumar 2017-09-13 18:14:27 -04:00
parent 6f242f6878
commit 066fcf785e
4 changed files with 204 additions and 3 deletions

View File

@ -124,6 +124,32 @@ func Add(a api.ResourceList, b api.ResourceList) api.ResourceList {
return result
}
// SubtractWithNonNegativeResult - substracts and returns result of a - b but
// makes sure we don't return negative values to prevent negative resource usage.
func SubtractWithNonNegativeResult(a api.ResourceList, b api.ResourceList) api.ResourceList {
zero := resource.MustParse("0")
result := api.ResourceList{}
for key, value := range a {
quantity := *value.Copy()
if other, found := b[key]; found {
quantity.Sub(other)
}
if quantity.Cmp(zero) > 0 {
result[key] = quantity
} else {
result[key] = zero
}
}
for key := range b {
if _, found := result[key]; !found {
result[key] = zero
}
}
return result
}
// Subtract returns the result of a - b for each named resource
func Subtract(a api.ResourceList, b api.ResourceList) api.ResourceList {
result := api.ResourceList{}

View File

@ -66,6 +66,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],

View File

@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
utilfeature "k8s.io/apiserver/pkg/util/feature"
testcore "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api"
@ -340,6 +341,9 @@ func TestAdmitHandlesOldObjects(t *testing.T) {
decimatedActions := removeListWatch(kubeClient.Actions())
lastActionIndex := len(decimatedActions) - 1
usage := decimatedActions[lastActionIndex].(testcore.UpdateAction).GetObject().(*api.ResourceQuota)
// Verify service usage. Since we don't add negative values, the api.ResourceServicesLoadBalancers
// will remain on last reported value
expectedUsage := api.ResourceQuota{
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
@ -349,7 +353,7 @@ func TestAdmitHandlesOldObjects(t *testing.T) {
},
Used: api.ResourceList{
api.ResourceServices: resource.MustParse("1"),
api.ResourceServicesLoadBalancers: resource.MustParse("0"),
api.ResourceServicesLoadBalancers: resource.MustParse("1"),
api.ResourceServicesNodePorts: resource.MustParse("1"),
},
},
@ -364,6 +368,176 @@ func TestAdmitHandlesOldObjects(t *testing.T) {
}
}
func TestAdmitHandlesNegativePVCUpdates(t *testing.T) {
resourceQuota := &api.ResourceQuota{
ObjectMeta: metav1.ObjectMeta{Name: "quota", Namespace: "test", ResourceVersion: "124"},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourcePersistentVolumeClaims: resource.MustParse("3"),
api.ResourceRequestsStorage: resource.MustParse("100Gi"),
},
Used: api.ResourceList{
api.ResourcePersistentVolumeClaims: resource.MustParse("1"),
api.ResourceRequestsStorage: resource.MustParse("10Gi"),
},
},
}
// start up quota system
stopCh := make(chan struct{})
defer close(stopCh)
err := utilfeature.DefaultFeatureGate.Set("ExpandPersistentVolumes=true")
if err != nil {
t.Errorf("Failed to enable feature gate for LocalPersistentVolumes: %v", err)
return
}
defer func() {
utilfeature.DefaultFeatureGate.Set("ExpandPersistentVolumes=false")
}()
kubeClient := fake.NewSimpleClientset(resourceQuota)
informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
quotaAccessor, _ := newQuotaAccessor()
quotaAccessor.client = kubeClient
quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister()
config := &resourcequotaapi.Configuration{}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
}
informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuota)
oldPVC := &api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{Name: "pvc-to-update", Namespace: "test", ResourceVersion: "1"},
Spec: api.PersistentVolumeClaimSpec{
Resources: getResourceRequirements(api.ResourceList{api.ResourceStorage: resource.MustParse("10Gi")}, api.ResourceList{}),
},
}
newPVC := &api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{Name: "pvc-to-update", Namespace: "test"},
Spec: api.PersistentVolumeClaimSpec{
Resources: getResourceRequirements(api.ResourceList{api.ResourceStorage: resource.MustParse("5Gi")}, api.ResourceList{}),
},
}
err = handler.Admit(admission.NewAttributesRecord(newPVC, oldPVC, api.Kind("PersistentVolumeClaim").WithVersion("version"), newPVC.Namespace, newPVC.Name, api.Resource("persistentvolumeclaims").WithVersion("version"), "", admission.Update, nil))
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if len(kubeClient.Actions()) != 0 {
t.Errorf("No client action should be taken in case of negative updates")
}
}
func TestAdmitHandlesPVCUpdates(t *testing.T) {
resourceQuota := &api.ResourceQuota{
ObjectMeta: metav1.ObjectMeta{Name: "quota", Namespace: "test", ResourceVersion: "124"},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourcePersistentVolumeClaims: resource.MustParse("3"),
api.ResourceRequestsStorage: resource.MustParse("100Gi"),
},
Used: api.ResourceList{
api.ResourcePersistentVolumeClaims: resource.MustParse("1"),
api.ResourceRequestsStorage: resource.MustParse("10Gi"),
},
},
}
err := utilfeature.DefaultFeatureGate.Set("ExpandPersistentVolumes=true")
if err != nil {
t.Errorf("Failed to enable feature gate for LocalPersistentVolumes: %v", err)
return
}
defer func() {
utilfeature.DefaultFeatureGate.Set("ExpandPersistentVolumes=false")
}()
// start up quota system
stopCh := make(chan struct{})
defer close(stopCh)
kubeClient := fake.NewSimpleClientset(resourceQuota)
informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
quotaAccessor, _ := newQuotaAccessor()
quotaAccessor.client = kubeClient
quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister()
config := &resourcequotaapi.Configuration{}
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
}
informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuota)
oldPVC := &api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{Name: "pvc-to-update", Namespace: "test", ResourceVersion: "1"},
Spec: api.PersistentVolumeClaimSpec{
Resources: getResourceRequirements(api.ResourceList{api.ResourceStorage: resource.MustParse("10Gi")}, api.ResourceList{}),
},
}
newPVC := &api.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{Name: "pvc-to-update", Namespace: "test"},
Spec: api.PersistentVolumeClaimSpec{
Resources: getResourceRequirements(api.ResourceList{api.ResourceStorage: resource.MustParse("15Gi")}, api.ResourceList{}),
},
}
err = handler.Admit(admission.NewAttributesRecord(newPVC, oldPVC, api.Kind("PersistentVolumeClaim").WithVersion("version"), newPVC.Namespace, newPVC.Name, api.Resource("persistentvolumeclaims").WithVersion("version"), "", admission.Update, nil))
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if len(kubeClient.Actions()) == 0 {
t.Errorf("Expected a client action")
}
// the only action should have been to update the quota (since we should not have fetched the previous item)
expectedActionSet := sets.NewString(
strings.Join([]string{"update", "resourcequotas", "status"}, "-"),
)
actionSet := sets.NewString()
for _, action := range kubeClient.Actions() {
actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource().Resource, action.GetSubresource()}, "-"))
}
if !actionSet.HasAll(expectedActionSet.List()...) {
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
}
decimatedActions := removeListWatch(kubeClient.Actions())
lastActionIndex := len(decimatedActions) - 1
usage := decimatedActions[lastActionIndex].(testcore.UpdateAction).GetObject().(*api.ResourceQuota)
expectedUsage := api.ResourceQuota{
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourcePersistentVolumeClaims: resource.MustParse("3"),
api.ResourceRequestsStorage: resource.MustParse("100Gi"),
},
Used: api.ResourceList{
api.ResourcePersistentVolumeClaims: resource.MustParse("1"),
api.ResourceRequestsStorage: resource.MustParse("15Gi"),
},
},
}
for k, v := range expectedUsage.Status.Used {
actual := usage.Status.Used[k]
actualValue := actual.String()
expectedValue := v.String()
if expectedValue != actualValue {
t.Errorf("Usage Used: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue)
}
}
}
// TestAdmitHandlesCreatingUpdates verifies that admit handles updates which behave as creates
func TestAdmitHandlesCreatingUpdates(t *testing.T) {
// in this scenario, there is an existing service

View File

@ -443,7 +443,6 @@ func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.At
accessor.SetNamespace(namespace)
}
}
// there is at least one quota that definitely matches our object
// as a result, we need to measure the usage of this object for quota
// on updates, we need to subtract the previous measured usage
@ -472,9 +471,10 @@ func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.At
if innerErr != nil {
return quotas, innerErr
}
deltaUsage = quota.Subtract(deltaUsage, prevUsage)
deltaUsage = quota.SubtractWithNonNegativeResult(deltaUsage, prevUsage)
}
}
if quota.IsZero(deltaUsage) {
return quotas, nil
}