mirror of https://github.com/k3s-io/k3s
Merge pull request #52452 from gnufied/fix-quota-on-update
Automatic merge from submit-queue (batch tested with PRs 52452, 52115, 52260, 52290)

Fix support for updating quota on update

This PR implements support for properly handling quota when resources are updated: usage deltas that would go negative are never added to the recorded totals. Fixes https://github.com/kubernetes/kubernetes/issues/51736

cc @derekwaynecarr
/sig storage

```release-note
Make sure that resources being updated are handled correctly by Quota system
```
commit 935726f109
```diff
@@ -124,6 +124,32 @@ func Add(a api.ResourceList, b api.ResourceList) api.ResourceList {
 	return result
 }
 
+// SubtractWithNonNegativeResult - subtracts and returns result of a - b but
+// makes sure we don't return negative values to prevent negative resource usage.
+func SubtractWithNonNegativeResult(a api.ResourceList, b api.ResourceList) api.ResourceList {
+	zero := resource.MustParse("0")
+
+	result := api.ResourceList{}
+	for key, value := range a {
+		quantity := *value.Copy()
+		if other, found := b[key]; found {
+			quantity.Sub(other)
+		}
+		if quantity.Cmp(zero) > 0 {
+			result[key] = quantity
+		} else {
+			result[key] = zero
+		}
+	}
+
+	for key := range b {
+		if _, found := result[key]; !found {
+			result[key] = zero
+		}
+	}
+	return result
+}
+
 // Subtract returns the result of a - b for each named resource
 func Subtract(a api.ResourceList, b api.ResourceList) api.ResourceList {
 	result := api.ResourceList{}
```
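To make the clamping semantics of `SubtractWithNonNegativeResult` above concrete, here is a minimal, self-contained sketch. It uses plain `int64` quantities in place of `resource.Quantity`, and a lower-case helper name to mark it as illustrative rather than the actual `k8s.io/kubernetes/pkg/quota` API:

```go
package main

import "fmt"

// ResourceList is a simplified stand-in for api.ResourceList,
// mapping resource names to plain int64 quantities.
type ResourceList map[string]int64

// subtractNonNegative mirrors the logic of SubtractWithNonNegativeResult:
// compute a - b per key, but floor every entry at zero.
func subtractNonNegative(a, b ResourceList) ResourceList {
	result := ResourceList{}
	for key, value := range a {
		if other, found := b[key]; found {
			value -= other
		}
		if value > 0 {
			result[key] = value
		} else {
			result[key] = 0
		}
	}
	// Keys present only in b still show up in the result, as zero.
	for key := range b {
		if _, found := result[key]; !found {
			result[key] = 0
		}
	}
	return result
}

func main() {
	newUsage := ResourceList{"requests.storage": 5}  // PVC resized down to 5Gi
	oldUsage := ResourceList{"requests.storage": 10} // previously 10Gi
	// Plain subtraction would yield -5; the clamped variant yields 0,
	// so a shrink never drives recorded quota usage negative.
	fmt.Println(subtractNonNegative(newUsage, oldUsage)) // map[requests.storage:0]
}
```

Keys that appear only in `b` are reported as zero rather than negative, which is what lets the quota evaluator treat "usage went down" as "no quota change".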
```diff
@@ -66,6 +66,7 @@ go_test(
         "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
+        "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//vendor/k8s.io/client-go/testing:go_default_library",
         "//vendor/k8s.io/client-go/tools/cache:go_default_library",
     ],
```
```diff
@@ -29,6 +29,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apiserver/pkg/admission"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	testcore "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/kubernetes/pkg/api"
```
```diff
@@ -340,6 +341,9 @@ func TestAdmitHandlesOldObjects(t *testing.T) {
 	decimatedActions := removeListWatch(kubeClient.Actions())
 	lastActionIndex := len(decimatedActions) - 1
 	usage := decimatedActions[lastActionIndex].(testcore.UpdateAction).GetObject().(*api.ResourceQuota)
+
+	// Verify service usage. Since we don't add negative values, the api.ResourceServicesLoadBalancers
+	// will remain on the last reported value
 	expectedUsage := api.ResourceQuota{
 		Status: api.ResourceQuotaStatus{
 			Hard: api.ResourceList{
```
```diff
@@ -349,7 +353,7 @@ func TestAdmitHandlesOldObjects(t *testing.T) {
 			},
 			Used: api.ResourceList{
 				api.ResourceServices:              resource.MustParse("1"),
-				api.ResourceServicesLoadBalancers: resource.MustParse("0"),
+				api.ResourceServicesLoadBalancers: resource.MustParse("1"),
 				api.ResourceServicesNodePorts:     resource.MustParse("1"),
 			},
 		},
```
```diff
@@ -364,6 +368,176 @@ func TestAdmitHandlesOldObjects(t *testing.T) {
 	}
 }
 
+func TestAdmitHandlesNegativePVCUpdates(t *testing.T) {
+	resourceQuota := &api.ResourceQuota{
+		ObjectMeta: metav1.ObjectMeta{Name: "quota", Namespace: "test", ResourceVersion: "124"},
+		Status: api.ResourceQuotaStatus{
+			Hard: api.ResourceList{
+				api.ResourcePersistentVolumeClaims: resource.MustParse("3"),
+				api.ResourceRequestsStorage:        resource.MustParse("100Gi"),
+			},
+			Used: api.ResourceList{
+				api.ResourcePersistentVolumeClaims: resource.MustParse("1"),
+				api.ResourceRequestsStorage:        resource.MustParse("10Gi"),
+			},
+		},
+	}
+
+	// start up quota system
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+
+	err := utilfeature.DefaultFeatureGate.Set("ExpandPersistentVolumes=true")
+	if err != nil {
+		t.Errorf("Failed to enable feature gate for ExpandPersistentVolumes: %v", err)
+		return
+	}
+
+	defer func() {
+		utilfeature.DefaultFeatureGate.Set("ExpandPersistentVolumes=false")
+	}()
+
+	kubeClient := fake.NewSimpleClientset(resourceQuota)
+	informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
+	quotaAccessor, _ := newQuotaAccessor()
+	quotaAccessor.client = kubeClient
+	quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister()
+	config := &resourcequotaapi.Configuration{}
+	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
+
+	handler := &quotaAdmission{
+		Handler:   admission.NewHandler(admission.Create, admission.Update),
+		evaluator: evaluator,
+	}
+	informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuota)
+
+	oldPVC := &api.PersistentVolumeClaim{
+		ObjectMeta: metav1.ObjectMeta{Name: "pvc-to-update", Namespace: "test", ResourceVersion: "1"},
+		Spec: api.PersistentVolumeClaimSpec{
+			Resources: getResourceRequirements(api.ResourceList{api.ResourceStorage: resource.MustParse("10Gi")}, api.ResourceList{}),
+		},
+	}
+
+	newPVC := &api.PersistentVolumeClaim{
+		ObjectMeta: metav1.ObjectMeta{Name: "pvc-to-update", Namespace: "test"},
+		Spec: api.PersistentVolumeClaimSpec{
+			Resources: getResourceRequirements(api.ResourceList{api.ResourceStorage: resource.MustParse("5Gi")}, api.ResourceList{}),
+		},
+	}
+
+	err = handler.Admit(admission.NewAttributesRecord(newPVC, oldPVC, api.Kind("PersistentVolumeClaim").WithVersion("version"), newPVC.Namespace, newPVC.Name, api.Resource("persistentvolumeclaims").WithVersion("version"), "", admission.Update, nil))
+	if err != nil {
+		t.Errorf("Unexpected error: %v", err)
+	}
+	if len(kubeClient.Actions()) != 0 {
+		t.Errorf("No client action should be taken in case of negative updates")
+	}
+}
+
+func TestAdmitHandlesPVCUpdates(t *testing.T) {
+	resourceQuota := &api.ResourceQuota{
+		ObjectMeta: metav1.ObjectMeta{Name: "quota", Namespace: "test", ResourceVersion: "124"},
+		Status: api.ResourceQuotaStatus{
+			Hard: api.ResourceList{
+				api.ResourcePersistentVolumeClaims: resource.MustParse("3"),
+				api.ResourceRequestsStorage:        resource.MustParse("100Gi"),
+			},
+			Used: api.ResourceList{
+				api.ResourcePersistentVolumeClaims: resource.MustParse("1"),
+				api.ResourceRequestsStorage:        resource.MustParse("10Gi"),
+			},
+		},
+	}
+
+	err := utilfeature.DefaultFeatureGate.Set("ExpandPersistentVolumes=true")
+	if err != nil {
+		t.Errorf("Failed to enable feature gate for ExpandPersistentVolumes: %v", err)
+		return
+	}
+
+	defer func() {
+		utilfeature.DefaultFeatureGate.Set("ExpandPersistentVolumes=false")
+	}()
+
+	// start up quota system
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+
+	kubeClient := fake.NewSimpleClientset(resourceQuota)
+	informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
+	quotaAccessor, _ := newQuotaAccessor()
+	quotaAccessor.client = kubeClient
+	quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister()
+	config := &resourcequotaapi.Configuration{}
+	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh)
+
+	handler := &quotaAdmission{
+		Handler:   admission.NewHandler(admission.Create, admission.Update),
+		evaluator: evaluator,
+	}
+	informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuota)
+
+	oldPVC := &api.PersistentVolumeClaim{
+		ObjectMeta: metav1.ObjectMeta{Name: "pvc-to-update", Namespace: "test", ResourceVersion: "1"},
+		Spec: api.PersistentVolumeClaimSpec{
+			Resources: getResourceRequirements(api.ResourceList{api.ResourceStorage: resource.MustParse("10Gi")}, api.ResourceList{}),
+		},
+	}
+
+	newPVC := &api.PersistentVolumeClaim{
+		ObjectMeta: metav1.ObjectMeta{Name: "pvc-to-update", Namespace: "test"},
+		Spec: api.PersistentVolumeClaimSpec{
+			Resources: getResourceRequirements(api.ResourceList{api.ResourceStorage: resource.MustParse("15Gi")}, api.ResourceList{}),
+		},
+	}
+
+	err = handler.Admit(admission.NewAttributesRecord(newPVC, oldPVC, api.Kind("PersistentVolumeClaim").WithVersion("version"), newPVC.Namespace, newPVC.Name, api.Resource("persistentvolumeclaims").WithVersion("version"), "", admission.Update, nil))
+	if err != nil {
+		t.Errorf("Unexpected error: %v", err)
+	}
+
+	if len(kubeClient.Actions()) == 0 {
+		t.Errorf("Expected a client action")
+	}
+
+	// the only action should have been to update the quota (since we should not have fetched the previous item)
+	expectedActionSet := sets.NewString(
+		strings.Join([]string{"update", "resourcequotas", "status"}, "-"),
+	)
+	actionSet := sets.NewString()
+	for _, action := range kubeClient.Actions() {
+		actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource().Resource, action.GetSubresource()}, "-"))
+	}
+	if !actionSet.HasAll(expectedActionSet.List()...) {
+		t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
+	}
+
+	decimatedActions := removeListWatch(kubeClient.Actions())
+	lastActionIndex := len(decimatedActions) - 1
+	usage := decimatedActions[lastActionIndex].(testcore.UpdateAction).GetObject().(*api.ResourceQuota)
+	expectedUsage := api.ResourceQuota{
+		Status: api.ResourceQuotaStatus{
+			Hard: api.ResourceList{
+				api.ResourcePersistentVolumeClaims: resource.MustParse("3"),
+				api.ResourceRequestsStorage:        resource.MustParse("100Gi"),
+			},
+			Used: api.ResourceList{
+				api.ResourcePersistentVolumeClaims: resource.MustParse("1"),
+				api.ResourceRequestsStorage:        resource.MustParse("15Gi"),
+			},
+		},
+	}
+	for k, v := range expectedUsage.Status.Used {
+		actual := usage.Status.Used[k]
+		actualValue := actual.String()
+		expectedValue := v.String()
+		if expectedValue != actualValue {
+			t.Errorf("Usage Used: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue)
+		}
+	}
+}
+
 // TestAdmitHandlesCreatingUpdates verifies that admit handles updates which behave as creates
 func TestAdmitHandlesCreatingUpdates(t *testing.T) {
 	// in this scenario, there is an existing service
```
```diff
@@ -443,7 +443,6 @@ func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.Attributes)
 			accessor.SetNamespace(namespace)
 		}
 	}
-
 	// there is at least one quota that definitely matches our object
 	// as a result, we need to measure the usage of this object for quota
 	// on updates, we need to subtract the previous measured usage
```
```diff
@@ -472,9 +471,10 @@ func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.Attributes)
 			if innerErr != nil {
 				return quotas, innerErr
 			}
-			deltaUsage = quota.Subtract(deltaUsage, prevUsage)
+			deltaUsage = quota.SubtractWithNonNegativeResult(deltaUsage, prevUsage)
 		}
 	}
+
 	if quota.IsZero(deltaUsage) {
 		return quotas, nil
 	}
```
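Tracing the two PVC tests through this new `checkRequest` path with the same simplified arithmetic makes the control flow easy to follow. The values come from the tests above; the `delta` helper is an illustrative sketch, not the evaluator's actual code:

```go
package main

import "fmt"

// delta floors newUsage-oldUsage at zero, the way checkRequest now
// computes deltaUsage via quota.SubtractWithNonNegativeResult.
func delta(newUsage, oldUsage int64) int64 {
	if d := newUsage - oldUsage; d > 0 {
		return d
	}
	return 0
}

func main() {
	used := int64(10) // quota Used requests.storage, in Gi

	// TestAdmitHandlesNegativePVCUpdates: PVC shrinks 10Gi -> 5Gi.
	// deltaUsage is zero, so checkRequest returns early and no client
	// action is taken against the quota.
	if delta(5, 10) == 0 {
		fmt.Println("shrink: zero delta, admit without a quota update")
	}

	// TestAdmitHandlesPVCUpdates: PVC grows 10Gi -> 15Gi.
	// deltaUsage is 5Gi, charged in a single update to resourcequotas/status.
	used += delta(15, 10)
	fmt.Println("grow: used is now", used, "Gi") // 15
}
```

A consequence, called out in the TestAdmitHandlesOldObjects comment above: after a shrink, usage stays at the last reported value rather than dropping, the conservative choice when a delta would otherwise go negative.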