From da01c6d3a25e3dc9ab0626da375bfbe5958c058b Mon Sep 17 00:00:00 2001 From: Derek Carr Date: Fri, 25 Aug 2017 17:09:10 -0400 Subject: [PATCH] Ignore pods for quota that exceed deletion grace period --- pkg/controller/resourcequota/BUILD | 2 + .../resourcequota/replenishment_controller.go | 8 +- .../replenishment_controller_test.go | 5 +- pkg/kubelet/eviction/BUILD | 2 - pkg/kubelet/eviction/helpers.go | 31 +++++--- pkg/quota/evaluator/core/BUILD | 3 + pkg/quota/evaluator/core/pods.go | 69 +++++++++++++---- pkg/quota/evaluator/core/pods_test.go | 77 ++++++++++++++++++- pkg/quota/evaluator/core/registry.go | 3 +- 9 files changed, 166 insertions(+), 34 deletions(-) diff --git a/pkg/controller/resourcequota/BUILD b/pkg/controller/resourcequota/BUILD index 1e5f0646f1..b5661e4952 100644 --- a/pkg/controller/resourcequota/BUILD +++ b/pkg/controller/resourcequota/BUILD @@ -28,6 +28,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", @@ -56,6 +57,7 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", diff --git a/pkg/controller/resourcequota/replenishment_controller.go b/pkg/controller/resourcequota/replenishment_controller.go index ec7372a333..5f063e6568 100644 --- a/pkg/controller/resourcequota/replenishment_controller.go +++ b/pkg/controller/resourcequota/replenishment_controller.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/clock" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" @@ -51,11 +52,11 @@ type ReplenishmentControllerOptions struct { } // PodReplenishmentUpdateFunc will replenish if the old pod was quota tracked but the new is not -func PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions) func(oldObj, newObj interface{}) { +func PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions, clock clock.Clock) func(oldObj, newObj interface{}) { return func(oldObj, newObj interface{}) { oldPod := oldObj.(*v1.Pod) newPod := newObj.(*v1.Pod) - if core.QuotaV1Pod(oldPod) && !core.QuotaV1Pod(newPod) { + if core.QuotaV1Pod(oldPod, clock) && !core.QuotaV1Pod(newPod, clock) { options.ReplenishmentFunc(options.GroupKind, newPod.Namespace, oldPod) } } @@ -115,9 +116,10 @@ func (r *replenishmentControllerFactory) NewController(options *ReplenishmentCon if err != nil { return nil, err } + clock := clock.RealClock{} informer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ - UpdateFunc: PodReplenishmentUpdateFunc(options), + UpdateFunc: PodReplenishmentUpdateFunc(options, clock), DeleteFunc: ObjectReplenishmentDeleteFunc(options), }, 
options.ResyncPeriod(), diff --git a/pkg/controller/resourcequota/replenishment_controller_test.go b/pkg/controller/resourcequota/replenishment_controller_test.go index e5797ef668..45df8164ec 100644 --- a/pkg/controller/resourcequota/replenishment_controller_test.go +++ b/pkg/controller/resourcequota/replenishment_controller_test.go @@ -18,11 +18,13 @@ package resourcequota import ( "testing" + "time" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/controller" @@ -55,7 +57,8 @@ func TestPodReplenishmentUpdateFunc(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "pod"}, Status: v1.PodStatus{Phase: v1.PodFailed}, } - updateFunc := PodReplenishmentUpdateFunc(&options) + fakeClock := clock.NewFakeClock(time.Now()) + updateFunc := PodReplenishmentUpdateFunc(&options, fakeClock) updateFunc(oldPod, newPod) if mockReplenish.groupKind != api.Kind("Pod") { t.Errorf("Unexpected group kind %v", mockReplenish.groupKind) diff --git a/pkg/kubelet/eviction/BUILD b/pkg/kubelet/eviction/BUILD index 69b373533f..f3d0ae93d7 100644 --- a/pkg/kubelet/eviction/BUILD +++ b/pkg/kubelet/eviction/BUILD @@ -46,7 +46,6 @@ go_library( "//conditions:default": [], }), deps = [ - "//pkg/api:go_default_library", "//pkg/api/v1/helper/qos:go_default_library", "//pkg/api/v1/resource:go_default_library", "//pkg/features:go_default_library", @@ -59,7 +58,6 @@ go_library( "//pkg/kubelet/server/stats:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/kubelet/util/format:go_default_library", - "//pkg/quota/evaluator/core:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", diff --git a/pkg/kubelet/eviction/helpers.go b/pkg/kubelet/eviction/helpers.go index 34481f96ae..dd04e9ff62 100644 --- a/pkg/kubelet/eviction/helpers.go +++ b/pkg/kubelet/eviction/helpers.go @@ -27,13 +27,11 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/kubernetes/pkg/api" v1qos "k8s.io/kubernetes/pkg/api/v1/helper/qos" statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/cm" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" "k8s.io/kubernetes/pkg/kubelet/server/stats" - "k8s.io/kubernetes/pkg/quota/evaluator/core" ) const ( @@ -638,19 +636,11 @@ func memory(stats statsFunc) cmpFunc { // adjust p1, p2 usage relative to the request (if any) p1Memory := p1Usage[v1.ResourceMemory] - p1Spec, err := core.PodUsageFunc(p1) - if err != nil { - return -1 - } - p1Request := p1Spec[api.ResourceRequestsMemory] + p1Request := podMemoryRequest(p1) p1Memory.Sub(p1Request) p2Memory := p2Usage[v1.ResourceMemory] - p2Spec, err := core.PodUsageFunc(p2) - if err != nil { - return 1 - } - p2Request := p2Spec[api.ResourceRequestsMemory] + p2Request := podMemoryRequest(p2) p2Memory.Sub(p2Request) // if p2 is using more than p1, we want p2 first @@ -658,6 +648,23 @@ func memory(stats statsFunc) cmpFunc { } } +// podMemoryRequest returns the total memory request of a pod which is the +// max(sum of init container requests, sum of container requests) +func podMemoryRequest(pod *v1.Pod) resource.Quantity { + containerValue := resource.Quantity{Format: resource.BinarySI} + for i 
:= range pod.Spec.Containers { + containerValue.Add(*pod.Spec.Containers[i].Resources.Requests.Memory()) + } + initValue := resource.Quantity{Format: resource.BinarySI} + for i := range pod.Spec.InitContainers { + initValue.Add(*pod.Spec.InitContainers[i].Resources.Requests.Memory()) + } + if containerValue.Cmp(initValue) > 0 { + return containerValue + } + return initValue +} + // disk compares pods by largest consumer of disk relative to request for the specified disk resource. func disk(stats statsFunc, fsStatsToMeasure []fsStatsType, diskResource v1.ResourceName) cmpFunc { return func(p1, p2 *v1.Pod) int { diff --git a/pkg/quota/evaluator/core/BUILD b/pkg/quota/evaluator/core/BUILD index 3fa20c8931..14e63cf3d9 100644 --- a/pkg/quota/evaluator/core/BUILD +++ b/pkg/quota/evaluator/core/BUILD @@ -34,6 +34,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/initialization:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", @@ -57,8 +58,10 @@ go_test( deps = [ "//pkg/api:go_default_library", "//pkg/quota:go_default_library", + "//pkg/util/node:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", ], ) diff --git a/pkg/quota/evaluator/core/pods.go b/pkg/quota/evaluator/core/pods.go index 7272ed5e26..7354c3d8bf 100644 --- a/pkg/quota/evaluator/core/pods.go +++ b/pkg/quota/evaluator/core/pods.go @@ -19,12 +19,15 @@ package core import ( "fmt" "strings" + "time" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/initialization" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" @@ -77,13 +80,14 @@ func listPodsByNamespaceFuncUsingClient(kubeClient clientset.Interface) generic. // NewPodEvaluator returns an evaluator that can evaluate pods // if the specified shared informer factory is not nil, evaluator may use it to support listing functions. 
-func NewPodEvaluator(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Evaluator {
+func NewPodEvaluator(kubeClient clientset.Interface, f informers.SharedInformerFactory, clock clock.Clock) quota.Evaluator {
listFuncByNamespace := listPodsByNamespaceFuncUsingClient(kubeClient)
if f != nil {
listFuncByNamespace = generic.ListResourceUsingInformerFunc(f, v1.SchemeGroupVersion.WithResource("pods"))
}
return &podEvaluator{
listFuncByNamespace: listFuncByNamespace,
+ clock: clock,
}
}

@@ -91,6 +95,8 @@ func NewPodEvaluator(kubeClient clientset.Interface, f informers.SharedInformerF
type podEvaluator struct {
// knows how to list pods
listFuncByNamespace generic.ListFuncByNamespace
+ // used to track time
+ clock clock.Clock
}

// Constraints verifies that all required resources are present on the pod
@@ -167,7 +173,8 @@ func (p *podEvaluator) MatchingResources(input []api.ResourceName) []api.Resourc
// Usage knows how to measure usage associated with pods
func (p *podEvaluator) Usage(item runtime.Object) (api.ResourceList, error) {
- return PodUsageFunc(item)
+ // delegate to normal usage
+ return PodUsageFunc(item, p.clock)
}

// UsageStats calculates aggregate usage for the object.
@@ -253,15 +260,17 @@ func podMatchesScopeFunc(scope api.ResourceQuotaScope, object runtime.Object) (b
return false, nil
}

-// PodUsageFunc knows how to measure usage associated with pods
+// PodUsageFunc returns the quota usage for a pod.
+// A pod is charged for quota unless any of the following are true:
+// - pod has a terminal phase (failed or succeeded)
+// - pod has been marked for deletion and its grace period has expired
+func PodUsageFunc(obj runtime.Object, clock clock.Clock) (api.ResourceList, error) {
pod, err := toInternalPodOrError(obj)
if err != nil {
return api.ResourceList{}, err
}
-
- // by convention, we do not quota pods that have reached an end-of-life state
- if !QuotaPod(pod) {
+ // by convention, we do not quota pods that have reached an end-of-life state
+ if !QuotaPod(pod, clock) {
return api.ResourceList{}, nil
}
// Only charge pod count for uninitialized pod.
@@ -274,8 +283,7 @@ func PodUsageFunc(obj runtime.Object) (api.ResourceList, error) {
}
requests := api.ResourceList{}
limits := api.ResourceList{}
- // TODO: fix this when we have pod level cgroups
- // when we have pod level cgroups, we can just read pod level requests/limits
+ // TODO: when pod-level requests and limits exist, read them directly instead of summing containers
for i := range pod.Spec.Containers {
requests = quota.Add(requests, pod.Spec.Containers[i].Resources.Requests)
limits = quota.Add(limits, pod.Spec.Containers[i].Resources.Limits)
@@ -303,12 +311,47 @@ func isTerminating(pod *api.Pod) bool {
}

// QuotaPod returns true if the pod is eligible to track against a quota
-func QuotaPod(pod *api.Pod) bool {
- return !(api.PodFailed == pod.Status.Phase || api.PodSucceeded == pod.Status.Phase)
+// A pod is eligible for quota unless any of the following are true:
+// - pod has a terminal phase (failed or succeeded)
+// - pod has been marked for deletion and its grace period has expired.
+func QuotaPod(pod *api.Pod, clock clock.Clock) bool {
+ // if pod is terminal, ignore it for quota
+ if api.PodFailed == pod.Status.Phase || api.PodSucceeded == pod.Status.Phase {
+ return false
+ }
+ // deleted pods that should be gone should not be charged to user quota.
+ // this can happen if a node is lost, and the kubelet is never able to confirm deletion.
+ // even though the cluster may have drifting clocks, quota makes a reasonable effort
+ // to balance cluster needs against user needs. users do not control clocks,
+ // but at worst a small drift in clocks will only slightly impact quota.
+ if pod.DeletionTimestamp != nil && pod.DeletionGracePeriodSeconds != nil {
+ now := clock.Now()
+ deletionTime := pod.DeletionTimestamp.Time
+ gracePeriod := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
+ if now.After(deletionTime.Add(gracePeriod)) {
+ return false
+ }
+ }
+ return true
}

// QuotaV1Pod returns true if the pod is eligible to track against a quota
// if it's not in a terminal state according to its phase.
-func QuotaV1Pod(pod *v1.Pod) bool {
- return !(v1.PodFailed == pod.Status.Phase || v1.PodSucceeded == pod.Status.Phase)
+func QuotaV1Pod(pod *v1.Pod, clock clock.Clock) bool {
+ // if pod is terminal, ignore it for quota
+ if v1.PodFailed == pod.Status.Phase || v1.PodSucceeded == pod.Status.Phase {
+ return false
+ }
+ // if pods are stuck terminating (for example, a node is lost), we do not want
+ // to charge the user for that pod in quota because it could prevent them from
+ // scaling up new pods to service their application.
+ if pod.DeletionTimestamp != nil && pod.DeletionGracePeriodSeconds != nil {
+ now := clock.Now()
+ deletionTime := pod.DeletionTimestamp.Time
+ gracePeriod := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
+ if now.After(deletionTime.Add(gracePeriod)) {
+ return false
+ }
+ }
+ return true
}
diff --git a/pkg/quota/evaluator/core/pods_test.go b/pkg/quota/evaluator/core/pods_test.go
index 3a93165b81..bb3714f8ab 100644
--- a/pkg/quota/evaluator/core/pods_test.go
+++ b/pkg/quota/evaluator/core/pods_test.go
@@ -18,11 +18,15 @@ package core
import (
"testing"
+ "time"

"k8s.io/apimachinery/pkg/api/resource"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/quota"
+ "k8s.io/kubernetes/pkg/util/node"
)

func TestPodConstraintsFunc(t *testing.T) {
@@ -87,7 +91,7 @@ func TestPodConstraintsFunc(t *testing.T) {
},
}
kubeClient := fake.NewSimpleClientset()
- evaluator := NewPodEvaluator(kubeClient, nil)
+ evaluator := NewPodEvaluator(kubeClient, nil, clock.RealClock{})
for testName, test := range testCases {
err := evaluator.Constraints(test.required, test.pod)
switch {
@@ -101,7 +105,16 @@ func TestPodConstraintsFunc(t *testing.T) {

func TestPodEvaluatorUsage(t *testing.T) {
kubeClient := fake.NewSimpleClientset()
- evaluator := NewPodEvaluator(kubeClient, nil)
+ fakeClock := clock.NewFakeClock(time.Now())
+ evaluator := NewPodEvaluator(kubeClient, nil, fakeClock)
+
+ // fields used to simulate a pod undergoing termination
+ // note: we set the deletion time in the past
+ now := fakeClock.Now()
+ terminationGracePeriodSeconds := int64(30)
+ deletionTimestampPastGracePeriod := metav1.NewTime(now.Add(time.Duration(terminationGracePeriodSeconds) * time.Second * time.Duration(-2)))
+ deletionTimestampNotPastGracePeriod := metav1.NewTime(fakeClock.Now())
+
testCases := map[string]struct {
pod *api.Pod
usage api.ResourceList
@@ -281,6 +294,66 @@ func TestPodEvaluatorUsage(t *testing.T) {
api.ResourceMemory: resource.MustParse("100M"),
},
},
+ "pod deletion timestamp exceeded": {
+ pod: &api.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ DeletionTimestamp: &deletionTimestampPastGracePeriod,
+ DeletionGracePeriodSeconds: &terminationGracePeriodSeconds,
+ },
+ Status: api.PodStatus{
+ 
Reason: node.NodeUnreachablePodReason, + }, + Spec: api.PodSpec{ + TerminationGracePeriodSeconds: &terminationGracePeriodSeconds, + Containers: []api.Container{ + { + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceCPU: resource.MustParse("1"), + api.ResourceMemory: resource.MustParse("50M"), + }, + Limits: api.ResourceList{ + api.ResourceCPU: resource.MustParse("2"), + api.ResourceMemory: resource.MustParse("100M"), + }, + }, + }, + }, + }, + }, + usage: api.ResourceList{}, + }, + "pod deletion timestamp not exceeded": { + pod: &api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + DeletionTimestamp: &deletionTimestampNotPastGracePeriod, + DeletionGracePeriodSeconds: &terminationGracePeriodSeconds, + }, + Status: api.PodStatus{ + Reason: node.NodeUnreachablePodReason, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceCPU: resource.MustParse("1"), + }, + Limits: api.ResourceList{ + api.ResourceCPU: resource.MustParse("2"), + }, + }, + }, + }, + }, + }, + usage: api.ResourceList{ + api.ResourceRequestsCPU: resource.MustParse("1"), + api.ResourceLimitsCPU: resource.MustParse("2"), + api.ResourcePods: resource.MustParse("1"), + api.ResourceCPU: resource.MustParse("1"), + }, + }, } for testName, testCase := range testCases { actual, err := evaluator.Usage(testCase.pod) diff --git a/pkg/quota/evaluator/core/registry.go b/pkg/quota/evaluator/core/registry.go index 0ada06e405..e6278c731c 100644 --- a/pkg/quota/evaluator/core/registry.go +++ b/pkg/quota/evaluator/core/registry.go @@ -18,6 +18,7 @@ package core import ( "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/quota" @@ -27,7 +28,7 @@ import ( // NewRegistry returns a registry that knows how to deal with core kubernetes resources // If an informer factory is provided, evaluators will use them. func NewRegistry(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Registry { - pod := NewPodEvaluator(kubeClient, f) + pod := NewPodEvaluator(kubeClient, f, clock.RealClock{}) service := NewServiceEvaluator(kubeClient, f) replicationController := NewReplicationControllerEvaluator(kubeClient, f) resourceQuota := NewResourceQuotaEvaluator(kubeClient, f)