Ignore pods for quota that exceed deletion grace period

pull/6/head
Derek Carr 2017-08-25 17:09:10 -04:00
parent db809c0eb7
commit da01c6d3a2
9 changed files with 166 additions and 34 deletions

View File

@ -28,6 +28,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
@ -56,6 +57,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",

View File

@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
@ -51,11 +52,11 @@ type ReplenishmentControllerOptions struct {
}
// PodReplenishmentUpdateFunc will replenish if the old pod was quota tracked but the new is not
func PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions) func(oldObj, newObj interface{}) {
func PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions, clock clock.Clock) func(oldObj, newObj interface{}) {
return func(oldObj, newObj interface{}) {
oldPod := oldObj.(*v1.Pod)
newPod := newObj.(*v1.Pod)
if core.QuotaV1Pod(oldPod) && !core.QuotaV1Pod(newPod) {
if core.QuotaV1Pod(oldPod, clock) && !core.QuotaV1Pod(newPod, clock) {
options.ReplenishmentFunc(options.GroupKind, newPod.Namespace, oldPod)
}
}
@ -115,9 +116,10 @@ func (r *replenishmentControllerFactory) NewController(options *ReplenishmentCon
if err != nil {
return nil, err
}
clock := clock.RealClock{}
informer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
UpdateFunc: PodReplenishmentUpdateFunc(options),
UpdateFunc: PodReplenishmentUpdateFunc(options, clock),
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
},
options.ResyncPeriod(),

View File

@ -18,11 +18,13 @@ package resourcequota
import (
"testing"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/controller"
@ -55,7 +57,8 @@ func TestPodReplenishmentUpdateFunc(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "pod"},
Status: v1.PodStatus{Phase: v1.PodFailed},
}
updateFunc := PodReplenishmentUpdateFunc(&options)
fakeClock := clock.NewFakeClock(time.Now())
updateFunc := PodReplenishmentUpdateFunc(&options, fakeClock)
updateFunc(oldPod, newPod)
if mockReplenish.groupKind != api.Kind("Pod") {
t.Errorf("Unexpected group kind %v", mockReplenish.groupKind)

View File

@ -46,7 +46,6 @@ go_library(
"//conditions:default": [],
}),
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1/helper/qos:go_default_library",
"//pkg/api/v1/resource:go_default_library",
"//pkg/features:go_default_library",
@ -59,7 +58,6 @@ go_library(
"//pkg/kubelet/server/stats:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/quota/evaluator/core:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",

View File

@ -27,13 +27,11 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api"
v1qos "k8s.io/kubernetes/pkg/api/v1/helper/qos"
statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/quota/evaluator/core"
)
const (
@ -638,19 +636,11 @@ func memory(stats statsFunc) cmpFunc {
// adjust p1, p2 usage relative to the request (if any)
p1Memory := p1Usage[v1.ResourceMemory]
p1Spec, err := core.PodUsageFunc(p1)
if err != nil {
return -1
}
p1Request := p1Spec[api.ResourceRequestsMemory]
p1Request := podMemoryRequest(p1)
p1Memory.Sub(p1Request)
p2Memory := p2Usage[v1.ResourceMemory]
p2Spec, err := core.PodUsageFunc(p2)
if err != nil {
return 1
}
p2Request := p2Spec[api.ResourceRequestsMemory]
p2Request := podMemoryRequest(p2)
p2Memory.Sub(p2Request)
// if p2 is using more than p1, we want p2 first
@ -658,6 +648,23 @@ func memory(stats statsFunc) cmpFunc {
}
}
// podMemoryRequest returns the effective memory request of a pod, defined as
// max(sum of init container requests, sum of container requests). Init
// containers run sequentially, so their peak demand is compared against the
// aggregate demand of the long-running containers.
func podMemoryRequest(pod *v1.Pod) resource.Quantity {
	// sumMemoryRequests totals the memory requests across a container list.
	sumMemoryRequests := func(containers []v1.Container) resource.Quantity {
		total := resource.Quantity{Format: resource.BinarySI}
		for i := range containers {
			total.Add(*containers[i].Resources.Requests.Memory())
		}
		return total
	}
	containerTotal := sumMemoryRequests(pod.Spec.Containers)
	initTotal := sumMemoryRequests(pod.Spec.InitContainers)
	if initTotal.Cmp(containerTotal) > 0 {
		return initTotal
	}
	return containerTotal
}
// disk compares pods by largest consumer of disk relative to request for the specified disk resource.
func disk(stats statsFunc, fsStatsToMeasure []fsStatsType, diskResource v1.ResourceName) cmpFunc {
return func(p1, p2 *v1.Pod) int {

View File

@ -34,6 +34,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/initialization:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
@ -57,8 +58,10 @@ go_test(
deps = [
"//pkg/api:go_default_library",
"//pkg/quota:go_default_library",
"//pkg/util/node:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
],
)

View File

@ -19,12 +19,15 @@ package core
import (
"fmt"
"strings"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/initialization"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
@ -77,13 +80,14 @@ func listPodsByNamespaceFuncUsingClient(kubeClient clientset.Interface) generic.
// NewPodEvaluator returns an evaluator that can evaluate pods
// if the specified shared informer factory is not nil, evaluator may use it to support listing functions.
func NewPodEvaluator(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Evaluator {
func NewPodEvaluator(kubeClient clientset.Interface, f informers.SharedInformerFactory, clock clock.Clock) quota.Evaluator {
// default to listing pods directly via the client
listFuncByNamespace := listPodsByNamespaceFuncUsingClient(kubeClient)
if f != nil {
// prefer the shared informer cache when available to avoid per-call API LISTs
listFuncByNamespace = generic.ListResourceUsingInformerFunc(f, v1.SchemeGroupVersion.WithResource("pods"))
}
return &podEvaluator{
listFuncByNamespace: listFuncByNamespace,
// clock is injected so tests can control deletion-grace-period expiry
clock: clock,
}
}
@ -91,6 +95,8 @@ func NewPodEvaluator(kubeClient clientset.Interface, f informers.SharedInformerF
type podEvaluator struct {
// knows how to list pods
listFuncByNamespace generic.ListFuncByNamespace
// used to track time
clock clock.Clock
}
// Constraints verifies that all required resources are present on the pod
@ -167,7 +173,8 @@ func (p *podEvaluator) MatchingResources(input []api.ResourceName) []api.Resourc
// Usage knows how to measure usage associated with pods
func (p *podEvaluator) Usage(item runtime.Object) (api.ResourceList, error) {
return PodUsageFunc(item)
// delegate to normal usage
return PodUsageFunc(item, p.clock)
}
// UsageStats calculates aggregate usage for the object.
@ -253,15 +260,17 @@ func podMatchesScopeFunc(scope api.ResourceQuotaScope, object runtime.Object) (b
return false, nil
}
// PodUsageFunc knows how to measure usage associated with pods
func PodUsageFunc(obj runtime.Object) (api.ResourceList, error) {
// PodUsageFunc returns the quota usage for a pod.
// A pod is charged for quota if the following are not true.
// - pod has a terminal phase (failed or succeeded)
// - pod has been marked for deletion and grace period has expired
func PodUsageFunc(obj runtime.Object, clock clock.Clock) (api.ResourceList, error) {
pod, err := toInternalPodOrError(obj)
if err != nil {
return api.ResourceList{}, err
}
// by convention, we do not quota pods that have reached an end-of-life state
if !QuotaPod(pod) {
// by convention, we do not quota pods that have reached end-of life
if !QuotaPod(pod, clock) {
return api.ResourceList{}, nil
}
// Only charge pod count for uninitialized pod.
@ -274,8 +283,7 @@ func PodUsageFunc(obj runtime.Object) (api.ResourceList, error) {
}
requests := api.ResourceList{}
limits := api.ResourceList{}
// TODO: fix this when we have pod level cgroups
// when we have pod level cgroups, we can just read pod level requests/limits
// TODO: ideally, we have pod level requests and limits in the future.
for i := range pod.Spec.Containers {
requests = quota.Add(requests, pod.Spec.Containers[i].Resources.Requests)
limits = quota.Add(limits, pod.Spec.Containers[i].Resources.Limits)
@ -303,12 +311,47 @@ func isTerminating(pod *api.Pod) bool {
}
// QuotaPod returns true if the pod is eligible to track against a quota
func QuotaPod(pod *api.Pod) bool {
return !(api.PodFailed == pod.Status.Phase || api.PodSucceeded == pod.Status.Phase)
// A pod is eligible for quota, unless any of the following are true:
// - pod has a terminal phase (failed or succeeded)
// - pod has been marked for deletion and grace period has expired.
func QuotaPod(pod *api.Pod, clock clock.Clock) bool {
// if pod is terminal, ignore it for quota
if api.PodFailed == pod.Status.Phase || api.PodSucceeded == pod.Status.Phase {
return false
}
// deleted pods that should be gone should not be charged to user quota.
// this can happen if a node is lost, and the kubelet is never able to confirm deletion.
// even though the cluster may have drifting clocks, quota makes a reasonable effort
// to balance cluster needs against user needs. users do not control clocks,
// but at worst a small drift in clocks will only slightly impact quota.
if pod.DeletionTimestamp != nil && pod.DeletionGracePeriodSeconds != nil {
now := clock.Now()
deletionTime := pod.DeletionTimestamp.Time
gracePeriod := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
// past the deletion deadline: stop charging the pod against quota
if now.After(deletionTime.Add(gracePeriod)) {
return false
}
}
return true
}
// QuotaV1Pod returns true if the pod is eligible to track against a quota
// if it's not in a terminal state according to its phase.
func QuotaV1Pod(pod *v1.Pod) bool {
return !(v1.PodFailed == pod.Status.Phase || v1.PodSucceeded == pod.Status.Phase)
// QuotaV1Pod returns true if the pod is eligible to track against a quota.
// Terminal pods, and pods whose deletion grace period has already expired,
// are not charged.
func QuotaV1Pod(pod *v1.Pod, clock clock.Clock) bool {
	// terminal pods no longer consume cluster resources, so skip them
	switch pod.Status.Phase {
	case v1.PodFailed, v1.PodSucceeded:
		return false
	}
	// if pods are stuck terminating (for example, a node is lost), we do not want
	// to charge the user for that pod in quota because it could prevent them from
	// scaling up new pods to service their application.
	if pod.DeletionTimestamp != nil && pod.DeletionGracePeriodSeconds != nil {
		deadline := pod.DeletionTimestamp.Time.Add(time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second)
		if clock.Now().After(deadline) {
			return false
		}
	}
	return true
}

View File

@ -18,11 +18,15 @@ package core
import (
"testing"
"time"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/util/node"
)
func TestPodConstraintsFunc(t *testing.T) {
@ -87,7 +91,7 @@ func TestPodConstraintsFunc(t *testing.T) {
},
}
kubeClient := fake.NewSimpleClientset()
evaluator := NewPodEvaluator(kubeClient, nil)
evaluator := NewPodEvaluator(kubeClient, nil, clock.RealClock{})
for testName, test := range testCases {
err := evaluator.Constraints(test.required, test.pod)
switch {
@ -101,7 +105,16 @@ func TestPodConstraintsFunc(t *testing.T) {
func TestPodEvaluatorUsage(t *testing.T) {
kubeClient := fake.NewSimpleClientset()
evaluator := NewPodEvaluator(kubeClient, nil)
fakeClock := clock.NewFakeClock(time.Now())
evaluator := NewPodEvaluator(kubeClient, nil, fakeClock)
// fields use to simulate a pod undergoing termination
// note: we set the deletion time in the past
now := fakeClock.Now()
terminationGracePeriodSeconds := int64(30)
deletionTimestampPastGracePeriod := metav1.NewTime(now.Add(time.Duration(terminationGracePeriodSeconds) * time.Second * time.Duration(-2)))
deletionTimestampNotPastGracePeriod := metav1.NewTime(fakeClock.Now())
testCases := map[string]struct {
pod *api.Pod
usage api.ResourceList
@ -281,6 +294,66 @@ func TestPodEvaluatorUsage(t *testing.T) {
api.ResourceMemory: resource.MustParse("100M"),
},
},
"pod deletion timestamp exceeded": {
pod: &api.Pod{
ObjectMeta: metav1.ObjectMeta{
DeletionTimestamp: &deletionTimestampPastGracePeriod,
DeletionGracePeriodSeconds: &terminationGracePeriodSeconds,
},
Status: api.PodStatus{
Reason: node.NodeUnreachablePodReason,
},
Spec: api.PodSpec{
TerminationGracePeriodSeconds: &terminationGracePeriodSeconds,
Containers: []api.Container{
{
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceCPU: resource.MustParse("1"),
api.ResourceMemory: resource.MustParse("50M"),
},
Limits: api.ResourceList{
api.ResourceCPU: resource.MustParse("2"),
api.ResourceMemory: resource.MustParse("100M"),
},
},
},
},
},
},
usage: api.ResourceList{},
},
"pod deletion timestamp not exceeded": {
pod: &api.Pod{
ObjectMeta: metav1.ObjectMeta{
DeletionTimestamp: &deletionTimestampNotPastGracePeriod,
DeletionGracePeriodSeconds: &terminationGracePeriodSeconds,
},
Status: api.PodStatus{
Reason: node.NodeUnreachablePodReason,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceCPU: resource.MustParse("1"),
},
Limits: api.ResourceList{
api.ResourceCPU: resource.MustParse("2"),
},
},
},
},
},
},
usage: api.ResourceList{
api.ResourceRequestsCPU: resource.MustParse("1"),
api.ResourceLimitsCPU: resource.MustParse("2"),
api.ResourcePods: resource.MustParse("1"),
api.ResourceCPU: resource.MustParse("1"),
},
},
}
for testName, testCase := range testCases {
actual, err := evaluator.Usage(testCase.pod)

View File

@ -18,6 +18,7 @@ package core
import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/quota"
@ -27,7 +28,7 @@ import (
// NewRegistry returns a registry that knows how to deal with core kubernetes resources
// If an informer factory is provided, evaluators will use them.
func NewRegistry(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Registry {
pod := NewPodEvaluator(kubeClient, f)
pod := NewPodEvaluator(kubeClient, f, clock.RealClock{})
service := NewServiceEvaluator(kubeClient, f)
replicationController := NewReplicationControllerEvaluator(kubeClient, f)
resourceQuota := NewResourceQuotaEvaluator(kubeClient, f)