critical pods can preempt other pods to be admitted

pull/6/head
David Ashpole 2017-02-23 10:31:20 -08:00
parent 2e12711160
commit c58970e47c
15 changed files with 1071 additions and 9 deletions

View File

@@ -227,3 +227,31 @@ func PodRequestsAndLimits(pod *Pod) (reqs map[ResourceName]resource.Quantity, li
}
return
}
// GetResourceRequest finds and returns the total request of a pod for a specific resource.
func GetResourceRequest(pod *Pod, resource ResourceName) int64 {
if resource == ResourcePods {
return 1
}
totalResources := int64(0)
for _, container := range pod.Spec.Containers {
if rQuantity, ok := container.Resources.Requests[resource]; ok {
if resource == ResourceCPU {
totalResources += rQuantity.MilliValue()
} else {
totalResources += rQuantity.Value()
}
}
}
// take max_resource(sum_pod, any_init_container)
for _, container := range pod.Spec.InitContainers {
if rQuantity, ok := container.Resources.Requests[resource]; ok {
if resource == ResourceCPU {
if rQuantity.MilliValue() > totalResources {
totalResources = rQuantity.MilliValue()
}
} else if rQuantity.Value() > totalResources {
totalResources = rQuantity.Value()
}
}
}
return totalResources
}
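For intuition: init containers run sequentially before the app containers start, so a pod's effective request for a resource is the larger of the sum over its app containers and its largest single init container. A minimal standalone sketch of that rule, using raw int64 values instead of the real resource.Quantity API (the helper name is illustrative only):

package main

import "fmt"

// effectiveRequest mirrors GetResourceRequest above: init containers run
// one at a time before the app containers, so the pod needs
// max(sum(app container requests), max(init container requests)).
func effectiveRequest(appRequests, initRequests []int64) int64 {
	total := int64(0)
	for _, r := range appRequests {
		total += r
	}
	for _, r := range initRequests {
		if r > total {
			total = r
		}
	}
	return total
}

func main() {
	// Two app containers at 100m CPU each and one 300m init container:
	// the effective CPU request is 300m, not 200m.
	fmt.Println(effectiveRequest([]int64{100, 100}, []int64{300}))
}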

View File

@@ -65,6 +65,7 @@ go_library(
"//pkg/kubelet/network:go_default_library",
"//pkg/kubelet/pleg:go_default_library",
"//pkg/kubelet/pod:go_default_library",
"//pkg/kubelet/preemption:go_default_library",
"//pkg/kubelet/prober:go_default_library",
"//pkg/kubelet/prober/results:go_default_library",
"//pkg/kubelet/qos:go_default_library",
@@ -253,6 +254,7 @@ filegroup(
"//pkg/kubelet/network:all-srcs",
"//pkg/kubelet/pleg:all-srcs",
"//pkg/kubelet/pod:all-srcs",
"//pkg/kubelet/preemption:all-srcs",
"//pkg/kubelet/prober:all-srcs",
"//pkg/kubelet/qos:all-srcs",
"//pkg/kubelet/remote:all-srcs",

View File

@@ -28,6 +28,7 @@ const (
FailedToCreateContainer = "Failed"
FailedToStartContainer = "Failed"
KillingContainer = "Killing"
PreemptContainer = "Preempting"
BackOffStartContainer = "BackOff"
ExceededGracePeriod = "ExceededGracePeriod"

View File

@@ -75,6 +75,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/pleg"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/preemption"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/remote"
@@ -783,7 +784,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
klet.AddPodSyncLoopHandler(activeDeadlineHandler)
klet.AddPodSyncHandler(activeDeadlineHandler)
klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay))
criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.getActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler))
// apply functional Option's
for _, opt := range kubeDeps.Options {
opt(klet)

View File

@@ -248,7 +248,7 @@ func newTestKubeletWithImageList(
kubelet.evictionManager = evictionManager
kubelet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
// Add this as cleanup predicate pod admitter
kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay))
kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub()))
plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil}
kubelet.volumePluginMgr, err =

View File

@@ -11,6 +11,7 @@ load(
go_library(
name = "go_default_library",
srcs = [
"admission_failure_handler_stub.go",
"doc.go",
"fake_handler_runner.go",
"handlers.go",
@@ -24,6 +25,7 @@ go_library(
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/security/apparmor:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/algorithm/predicates:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library",
"//vendor:github.com/golang/glog",

View File

@@ -0,0 +1,36 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package lifecycle
import (
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
)
// AdmissionFailureHandlerStub is an AdmissionFailureHandler that does not perform any handling of admission failure.
// It simply passes the failure on.
type AdmissionFailureHandlerStub struct{}
var _ AdmissionFailureHandler = &AdmissionFailureHandlerStub{}
func NewAdmissionFailureHandlerStub() *AdmissionFailureHandlerStub {
return &AdmissionFailureHandlerStub{}
}
func (n *AdmissionFailureHandlerStub) HandleAdmissionFailure(pod *v1.Pod, failureReasons []algorithm.PredicateFailureReason) (bool, []algorithm.PredicateFailureReason, error) {
return false, failureReasons, nil
}

View File

@@ -22,20 +22,30 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
type getNodeAnyWayFuncType func() (*v1.Node, error)
// AdmissionFailureHandler is an interface which defines how to deal with a failure to admit a pod.
// This allows for the graceful handling of pod admission failure.
type AdmissionFailureHandler interface {
HandleAdmissionFailure(pod *v1.Pod, failureReasons []algorithm.PredicateFailureReason) (bool, []algorithm.PredicateFailureReason, error)
}
type predicateAdmitHandler struct {
getNodeAnyWayFunc getNodeAnyWayFuncType
admissionFailureHandler AdmissionFailureHandler
}
var _ PodAdmitHandler = &predicateAdmitHandler{}
func NewPredicateAdmitHandler(getNodeAnyWayFunc getNodeAnyWayFuncType) *predicateAdmitHandler {
func NewPredicateAdmitHandler(getNodeAnyWayFunc getNodeAnyWayFuncType, admissionFailureHandler AdmissionFailureHandler) *predicateAdmitHandler {
return &predicateAdmitHandler{
getNodeAnyWayFunc,
admissionFailureHandler,
}
}
@@ -59,10 +69,22 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
return PodAdmitResult{
Admit: fit,
Reason: "UnexpectedError",
Reason: "UnexpectedAdmissionError",
Message: message,
}
}
if !fit {
fit, reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(pod, reasons)
if err != nil {
message := fmt.Sprintf("Unexpected error while attempting to recover from admission failure: %v", err)
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
return PodAdmitResult{
Admit: fit,
Reason: "UnexpectedAdmissionError",
Message: message,
}
}
}
if !fit {
var reason string
var message string

View File

@@ -0,0 +1,58 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["preemption.go"],
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/events:go_default_library",
"//pkg/kubelet/eviction:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/qos:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/algorithm/predicates:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apiserver/pkg/util/feature",
"//vendor:k8s.io/client-go/tools/record",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["preemption_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/api/resource",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/client-go/tools/record",
],
)

View File

@@ -0,0 +1,264 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package preemption
import (
"fmt"
"math"
"github.com/golang/glog"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/qos"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
)
const message = "Preempted in order to admit critical pod"
// CriticalPodAdmissionHandler is an AdmissionFailureHandler that handles admission failure for critical pods.
// If the ONLY admission failures are due to insufficient resources, then CriticalPodAdmissionHandler evicts pods
// so that the critical pod can be admitted. For evictions, the CriticalPodAdmissionHandler evicts a set of pods that
// frees up the required resource requests. The set of pods is chosen to minimize impact, prioritized in the order:
// minimal impact for guaranteed pods > minimal impact for burstable pods > minimal impact for besteffort pods.
// Minimal impact is defined as: fewest pods evicted, then fewest total requests of the evicted pods.
// Finding the fewest total requests is done on a best-effort basis.
type CriticalPodAdmissionHandler struct {
getPodsFunc eviction.ActivePodsFunc
killPodFunc eviction.KillPodFunc
recorder record.EventRecorder
}
var _ lifecycle.AdmissionFailureHandler = &CriticalPodAdmissionHandler{}
func NewCriticalPodAdmissionHandler(getPodsFunc eviction.ActivePodsFunc, killPodFunc eviction.KillPodFunc, recorder record.EventRecorder) *CriticalPodAdmissionHandler {
return &CriticalPodAdmissionHandler{
getPodsFunc: getPodsFunc,
killPodFunc: killPodFunc,
recorder: recorder,
}
}
// HandleAdmissionFailure gracefully handles admission rejection and, in some cases,
// frees up resources to allow admission of the pod despite its previous failure.
func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(pod *v1.Pod, failureReasons []algorithm.PredicateFailureReason) (bool, []algorithm.PredicateFailureReason, error) {
if !kubetypes.IsCriticalPod(pod) || !utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalCriticalPodAnnotation) {
return false, failureReasons, nil
}
// InsufficientResourceError is not a reason to reject a critical pod.
// Instead of rejecting, we free up resources to admit it, if no other reasons for rejection exist.
nonResourceReasons := []algorithm.PredicateFailureReason{}
resourceReasons := []*admissionRequirement{}
for _, reason := range failureReasons {
if r, ok := reason.(*predicates.InsufficientResourceError); ok {
resourceReasons = append(resourceReasons, &admissionRequirement{
resourceName: r.ResourceName,
quantity: r.GetInsufficientAmount(),
})
} else {
nonResourceReasons = append(nonResourceReasons, reason)
}
}
if len(nonResourceReasons) > 0 {
// Return only reasons that are not resource related, since critical pods cannot fail admission for resource reasons.
return false, nonResourceReasons, nil
}
err := c.evictPodsToFreeRequests(admissionRequirementList(resourceReasons))
// if no error is returned, preemption succeeded and the pod is safe to admit.
return err == nil, nil, err
}
// evictPodsToFreeRequests takes a list of insufficient resources and attempts to free them by evicting pods
// based on their requests. For example, if the only insufficient resource is 200MB of memory, this function could
// evict a pod with a 250MB memory request.
func (c *CriticalPodAdmissionHandler) evictPodsToFreeRequests(insufficientResources admissionRequirementList) error {
podsToPreempt, err := getPodsToPreempt(c.getPodsFunc(), insufficientResources)
if err != nil {
return fmt.Errorf("preemption: error finding a set of pods to preempt: %v", err)
}
glog.Infof("preemption: attempting to evict pods %v, in order to free up resources: %s", podsToPreempt, insufficientResources.toString())
for _, pod := range podsToPreempt {
status := v1.PodStatus{
Phase: v1.PodFailed,
Message: message,
Reason: events.PreemptContainer,
}
// record that we are evicting the pod
c.recorder.Eventf(pod, v1.EventTypeWarning, events.PreemptContainer, message)
// this is a blocking call and should only return when the pod and its containers are killed.
err := c.killPodFunc(pod, status, nil)
if err != nil {
return fmt.Errorf("preemption: pod %s failed to evict %v", format.Pod(pod), err)
}
glog.Infof("preemption: pod %s evicted successfully", format.Pod(pod))
}
return nil
}
// getPodsToPreempt returns a list of pods that could be preempted to free requests >= requirements
func getPodsToPreempt(pods []*v1.Pod, requirements admissionRequirementList) ([]*v1.Pod, error) {
bestEffortPods, burstablePods, guaranteedPods := sortPodsByQOS(pods)
// make sure that pods exist to reclaim the requirements
unableToMeetRequirements := requirements.subtract(append(append(bestEffortPods, burstablePods...), guaranteedPods...)...)
if len(unableToMeetRequirements) > 0 {
return nil, fmt.Errorf("no set of running pods found to reclaim resources: %v", unableToMeetRequirements.toString())
}
// Find the guaranteed pods we would need to evict if we already evicted ALL burstable and besteffort pods.
guaranteedToEvict, err := getPodsToPreemptByDistance(guaranteedPods, requirements.subtract(append(bestEffortPods, burstablePods...)...))
if err != nil {
return nil, err
}
// Find the burstable pods we would need to evict if we already evicted ALL besteffort pods, and the required guaranteed pods.
burstableToEvict, err := getPodsToPreemptByDistance(burstablePods, requirements.subtract(append(bestEffortPods, guaranteedToEvict...)...))
if err != nil {
return nil, err
}
// Find the besteffort pods we would need to evict if we already evicted the required guaranteed and burstable pods.
bestEffortToEvict, err := getPodsToPreemptByDistance(bestEffortPods, requirements.subtract(append(burstableToEvict, guaranteedToEvict...)...))
if err != nil {
return nil, err
}
return append(append(bestEffortToEvict, burstableToEvict...), guaranteedToEvict...), nil
}
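As a concrete trace of the three passes above: suppose the requirement is 550Mi of memory and the candidates are a besteffort pod (no requests), a 300Mi burstable pod, and a 300Mi guaranteed pod. The guaranteed pass asks what would still be missing after evicting every besteffort and burstable pod (550Mi - 300Mi = 250Mi) and selects the guaranteed pod; the burstable pass asks what would be missing after evicting the besteffort pods plus the selected guaranteed pod (again 250Mi) and selects the burstable pod; the besteffort pass finds nothing left to reclaim. The result is the burstable and guaranteed pods, and the zero-request besteffort pod is spared, mirroring the "evict guaranteed when we have to" case in the tests below.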
// getPodsToPreemptByDistance finds a set of pods whose combined requests satisfy the admission requirements.
// It repeatedly chooses the pod that minimizes the "distance" to the remaining requirements.
// If more than one pod fulfills the remaining requirements equally well,
// it chooses the pod with the smaller resource request.
// By always taking the pod that covers as much of the remaining requirements as possible,
// this method attempts to minimize the number of pods returned.
func getPodsToPreemptByDistance(pods []*v1.Pod, requirements admissionRequirementList) ([]*v1.Pod, error) {
podsToEvict := []*v1.Pod{}
// evict pods by shortest distance from remaining requirements, updating requirements every round.
for len(requirements) > 0 {
if len(pods) == 0 {
return nil, fmt.Errorf("no set of running pods found to reclaim resources: %v", requirements.toString())
}
// all distances are at most len(requirements), since each requirement contributes at most 1
bestDistance := float64(len(requirements) + 1)
bestPodIndex := 0
// Find the pod with the smallest distance from requirements
// Or, in the case of two equidistant pods, find the pod with "smaller" resource requests.
for i, pod := range pods {
dist := requirements.distance(pod)
if dist < bestDistance || (bestDistance == dist && smallerResourceRequest(pod, pods[bestPodIndex])) {
bestDistance = dist
bestPodIndex = i
}
}
// subtract the pod from requirements, and transfer the pod from input-pods to pods-to-evict
requirements = requirements.subtract(pods[bestPodIndex])
podsToEvict = append(podsToEvict, pods[bestPodIndex])
pods[bestPodIndex] = pods[len(pods)-1]
pods = pods[:len(pods)-1]
}
return podsToEvict, nil
}
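For example, with a single requirement of 500Mi of memory and candidate pods requesting 300Mi, 250Mi, and 100Mi: the first round computes distances (200/500)^2 = 0.16, (250/500)^2 = 0.25, and (400/500)^2 = 0.64, and picks the 300Mi pod; the second round, with 200Mi remaining, finds the 250Mi pod at distance 0 and picks it, leaving no unmet requirements. The loop returns two pods rather than all three.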
type admissionRequirement struct {
resourceName v1.ResourceName
quantity int64
}
type admissionRequirementList []*admissionRequirement
// distance of the pod's requests from the admissionRequirements.
// distance sums, over all requirements, the squared fraction of each requirement left unmet by the pod,
// so that each requirement is weighted equally, regardless of absolute magnitude.
func (a admissionRequirementList) distance(pod *v1.Pod) float64 {
dist := float64(0)
for _, req := range a {
remainingRequest := float64(req.quantity - v1.GetResourceRequest(pod, req.resourceName))
if remainingRequest < 0 {
remainingRequest = 0
}
dist += math.Pow(remainingRequest/float64(req.quantity), 2)
}
return dist
}
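As a self-contained illustration of the metric, here is a minimal sketch using plain maps in place of the v1 types (names are illustrative, not the kubelet API):

package main

import (
	"fmt"
	"math"
)

// distance mirrors admissionRequirementList.distance above: for each
// requirement, square the fraction left unmet by the pod, so every
// resource is weighted equally regardless of its absolute magnitude.
func distance(requirements, podRequests map[string]int64) float64 {
	dist := float64(0)
	for name, quantity := range requirements {
		remaining := float64(quantity - podRequests[name])
		if remaining < 0 {
			remaining = 0
		}
		dist += math.Pow(remaining/float64(quantity), 2)
	}
	return dist
}

func main() {
	requirements := map[string]int64{"cpu": 200, "memory": 400}
	// This pod covers the whole CPU shortfall but only half of the
	// memory shortfall: distance = 0^2 + (200/400)^2 = 0.25.
	fmt.Println(distance(requirements, map[string]int64{"cpu": 300, "memory": 200}))
}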
// subtract returns a new admissionRequirementList containing the requirements that would remain
// if the provided pods were to be preempted
func (a admissionRequirementList) subtract(pods ...*v1.Pod) admissionRequirementList {
newList := []*admissionRequirement{}
for _, req := range a {
newQuantity := req.quantity
for _, pod := range pods {
newQuantity -= v1.GetResourceRequest(pod, req.resourceName)
}
if newQuantity > 0 {
newList = append(newList, &admissionRequirement{
resourceName: req.resourceName,
quantity: newQuantity,
})
}
}
return newList
}
func (a admissionRequirementList) toString() string {
s := "["
for _, req := range a {
s += fmt.Sprintf("(res: %v, q: %d), ", req.resourceName, req.quantity)
}
return s + "]"
}
// sortPodsByQOS returns lists of the non-critical besteffort, burstable, and guaranteed pods
func sortPodsByQOS(pods []*v1.Pod) (bestEffort, burstable, guaranteed []*v1.Pod) {
for _, pod := range pods {
if !kubetypes.IsCriticalPod(pod) {
switch qos.GetPodQOS(pod) {
case v1.PodQOSBestEffort:
bestEffort = append(bestEffort, pod)
case v1.PodQOSBurstable:
burstable = append(burstable, pod)
case v1.PodQOSGuaranteed:
guaranteed = append(guaranteed, pod)
default:
}
}
}
return
}
// smallerResourceRequest returns true if pod1's resource request is smaller than or equal to pod2's, comparing GPU, then memory, then CPU
func smallerResourceRequest(pod1 *v1.Pod, pod2 *v1.Pod) bool {
priorityList := []v1.ResourceName{
v1.ResourceNvidiaGPU,
v1.ResourceMemory,
v1.ResourceCPU,
}
for _, res := range priorityList {
req1 := v1.GetResourceRequest(pod1, res)
req2 := v1.GetResourceRequest(pod2, res)
if req1 < req2 {
return true
} else if req1 > req2 {
return false
}
}
return true
}

View File

@@ -0,0 +1,480 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package preemption
import (
"fmt"
"testing"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
kubeapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
const (
critical = "critical"
bestEffort = "bestEffort"
burstable = "burstable"
highRequestBurstable = "high-request-burstable"
guaranteed = "guaranteed"
highRequestGuaranteed = "high-request-guaranteed"
tinyBurstable = "tiny"
maxPods = 110
)
type fakePodKiller struct {
killedPods []*v1.Pod
}
func newFakePodKiller() *fakePodKiller {
return &fakePodKiller{killedPods: []*v1.Pod{}}
}
func (f *fakePodKiller) clear() {
f.killedPods = []*v1.Pod{}
}
func (f *fakePodKiller) getKilledPods() []*v1.Pod {
return f.killedPods
}
func (f *fakePodKiller) killPodNow(pod *v1.Pod, status v1.PodStatus, gracePeriodOverride *int64) error {
f.killedPods = append(f.killedPods, pod)
return nil
}
type fakePodProvider struct {
pods []*v1.Pod
}
func newFakePodProvider() *fakePodProvider {
return &fakePodProvider{pods: []*v1.Pod{}}
}
func (f *fakePodProvider) setPods(pods []*v1.Pod) {
f.pods = pods
}
func (f *fakePodProvider) getPods() []*v1.Pod {
return f.pods
}
func getTestCriticalPodAdmissionHandler(podProvider *fakePodProvider, podKiller *fakePodKiller) *CriticalPodAdmissionHandler {
return &CriticalPodAdmissionHandler{
getPodsFunc: podProvider.getPods,
killPodFunc: podKiller.killPodNow,
recorder: &record.FakeRecorder{},
}
}
func TestEvictPodsToFreeRequests(t *testing.T) {
type testRun struct {
testName string
inputPods []*v1.Pod
insufficientResources admissionRequirementList
expectErr bool
expectedOutput []*v1.Pod
}
podProvider := newFakePodProvider()
podKiller := newFakePodKiller()
criticalPodAdmissionHandler := getTestCriticalPodAdmissionHandler(podProvider, podKiller)
allPods := getTestPods()
runs := []testRun{
{
testName: "critical pods cannot be preempted",
inputPods: []*v1.Pod{allPods[critical]},
insufficientResources: getAdmissionRequirementList(0, 0, 1),
expectErr: true,
expectedOutput: nil,
},
{
testName: "best effort pods are not preempted when attempting to free resources",
inputPods: []*v1.Pod{allPods[bestEffort]},
insufficientResources: getAdmissionRequirementList(0, 1, 0),
expectErr: true,
expectedOutput: nil,
},
{
testName: "multiple pods evicted",
inputPods: []*v1.Pod{
allPods[critical], allPods[bestEffort], allPods[burstable], allPods[highRequestBurstable],
allPods[guaranteed], allPods[highRequestGuaranteed]},
insufficientResources: getAdmissionRequirementList(0, 550, 0),
expectErr: false,
expectedOutput: []*v1.Pod{allPods[highRequestBurstable], allPods[highRequestGuaranteed]},
},
}
for _, r := range runs {
podProvider.setPods(r.inputPods)
outErr := criticalPodAdmissionHandler.evictPodsToFreeRequests(r.insufficientResources)
outputPods := podKiller.getKilledPods()
if !r.expectErr && outErr != nil {
t.Errorf("evictPodsToFreeRequests returned an unexpected error during the %s test. Err: %v", r.testName, outErr)
} else if r.expectErr && outErr == nil {
t.Errorf("evictPodsToFreeRequests expected an error but returned a successful output=%v during the %s test.", outputPods, r.testName)
} else if !podListEqual(r.expectedOutput, outputPods) {
t.Errorf("evictPodsToFreeRequests expected %v but got %v during the %s test.", r.expectedOutput, outputPods, r.testName)
}
podKiller.clear()
}
}
func BenchmarkGetPodsToPreempt(t *testing.B) {
allPods := getTestPods()
inputPods := []*v1.Pod{}
for i := 0; i < maxPods; i++ {
inputPods = append(inputPods, allPods[tinyBurstable])
}
for n := 0; n < t.N; n++ {
getPodsToPreempt(inputPods, admissionRequirementList([]*admissionRequirement{
{
resourceName: v1.ResourceCPU,
quantity: parseCPUToInt64("110m"),
}}))
}
}
func TestGetPodsToPreempt(t *testing.T) {
type testRun struct {
testName string
inputPods []*v1.Pod
insufficientResources admissionRequirementList
expectErr bool
expectedOutput []*v1.Pod
}
allPods := getTestPods()
runs := []testRun{
{
testName: "no requirements",
inputPods: []*v1.Pod{},
insufficientResources: getAdmissionRequirementList(0, 0, 0),
expectErr: false,
expectedOutput: []*v1.Pod{},
},
{
testName: "no pods",
inputPods: []*v1.Pod{},
insufficientResources: getAdmissionRequirementList(0, 0, 1),
expectErr: true,
expectedOutput: nil,
},
{
testName: "equal pods and resources requirements",
inputPods: []*v1.Pod{allPods[burstable]},
insufficientResources: getAdmissionRequirementList(100, 100, 1),
expectErr: false,
expectedOutput: []*v1.Pod{allPods[burstable]},
},
{
testName: "higer requirements than pod requests",
inputPods: []*v1.Pod{allPods[burstable]},
insufficientResources: getAdmissionRequirementList(200, 200, 2),
expectErr: true,
expectedOutput: nil,
},
{
testName: "choose between bestEffort and burstable",
inputPods: []*v1.Pod{allPods[burstable], allPods[bestEffort]},
insufficientResources: getAdmissionRequirementList(0, 0, 1),
expectErr: false,
expectedOutput: []*v1.Pod{allPods[bestEffort]},
},
{
testName: "choose between burstable and guaranteed",
inputPods: []*v1.Pod{allPods[burstable], allPods[guaranteed]},
insufficientResources: getAdmissionRequirementList(0, 0, 1),
expectErr: false,
expectedOutput: []*v1.Pod{allPods[burstable]},
},
{
testName: "choose lower request burstable if it meets requirements",
inputPods: []*v1.Pod{allPods[bestEffort], allPods[highRequestBurstable], allPods[burstable]},
insufficientResources: getAdmissionRequirementList(100, 100, 0),
expectErr: false,
expectedOutput: []*v1.Pod{allPods[burstable]},
},
{
testName: "choose higher request burstable if lower does not meet requirements",
inputPods: []*v1.Pod{allPods[bestEffort], allPods[burstable], allPods[highRequestBurstable]},
insufficientResources: getAdmissionRequirementList(150, 150, 0),
expectErr: false,
expectedOutput: []*v1.Pod{allPods[highRequestBurstable]},
},
{
testName: "multiple pods required",
inputPods: []*v1.Pod{allPods[bestEffort], allPods[burstable], allPods[highRequestBurstable], allPods[guaranteed], allPods[highRequestGuaranteed]},
insufficientResources: getAdmissionRequirementList(350, 350, 0),
expectErr: false,
expectedOutput: []*v1.Pod{allPods[burstable], allPods[highRequestBurstable]},
},
{
testName: "evict guaranteed when we have to, and dont evict the extra burstable",
inputPods: []*v1.Pod{allPods[bestEffort], allPods[burstable], allPods[highRequestBurstable], allPods[guaranteed], allPods[highRequestGuaranteed]},
insufficientResources: getAdmissionRequirementList(0, 550, 0),
expectErr: false,
expectedOutput: []*v1.Pod{allPods[highRequestBurstable], allPods[highRequestGuaranteed]},
},
}
for _, r := range runs {
outputPods, outErr := getPodsToPreempt(r.inputPods, r.insufficientResources)
if !r.expectErr && outErr != nil {
t.Errorf("getPodsToPreempt returned an unexpected error during the %s test. Err: %v", r.testName, outErr)
} else if r.expectErr && outErr == nil {
t.Errorf("getPodsToPreempt expected an error but returned a successful output=%v during the %s test.", outputPods, r.testName)
} else if !podListEqual(r.expectedOutput, outputPods) {
t.Errorf("getPodsToPreempt expected %v but got %v during the %s test.", r.expectedOutput, outputPods, r.testName)
}
}
}
func TestAdmissionRequirementsDistance(t *testing.T) {
type testRun struct {
testName string
requirements admissionRequirementList
inputPod *v1.Pod
expectedOutput float64
}
allPods := getTestPods()
runs := []testRun{
{
testName: "no requirements",
requirements: getAdmissionRequirementList(0, 0, 0),
inputPod: allPods[burstable],
expectedOutput: 0,
},
{
testName: "no requests, some requirements",
requirements: getAdmissionRequirementList(100, 100, 1),
inputPod: allPods[bestEffort],
expectedOutput: 2,
},
{
testName: "equal requests and requirements",
requirements: getAdmissionRequirementList(100, 100, 1),
inputPod: allPods[burstable],
expectedOutput: 0,
},
{
testName: "higher requests than requirements",
requirements: getAdmissionRequirementList(50, 50, 0),
inputPod: allPods[burstable],
expectedOutput: 0,
},
}
for _, run := range runs {
output := run.requirements.distance(run.inputPod)
if output != run.expectedOutput {
t.Errorf("expected: %f, got: %f for %s test", run.expectedOutput, output, run.testName)
}
}
}
func TestAdmissionRequirementsSubtract(t *testing.T) {
type testRun struct {
testName string
initial admissionRequirementList
inputPod *v1.Pod
expectedOutput admissionRequirementList
}
allPods := getTestPods()
runs := []testRun{
{
testName: "subtract a pod from no requirements",
initial: getAdmissionRequirementList(0, 0, 0),
inputPod: allPods[burstable],
expectedOutput: getAdmissionRequirementList(0, 0, 0),
},
{
testName: "subtract no requests from some requirements",
initial: getAdmissionRequirementList(100, 100, 1),
inputPod: allPods[bestEffort],
expectedOutput: getAdmissionRequirementList(100, 100, 0),
},
{
testName: "equal requests and requirements",
initial: getAdmissionRequirementList(100, 100, 1),
inputPod: allPods[burstable],
expectedOutput: getAdmissionRequirementList(0, 0, 0),
},
{
testName: "subtract higher requests than requirements",
initial: getAdmissionRequirementList(50, 50, 0),
inputPod: allPods[burstable],
expectedOutput: getAdmissionRequirementList(0, 0, 0),
},
{
testName: "subtract lower requests than requirements",
initial: getAdmissionRequirementList(200, 200, 1),
inputPod: allPods[burstable],
expectedOutput: getAdmissionRequirementList(100, 100, 0),
},
}
for _, run := range runs {
output := run.initial.subtract(run.inputPod)
if !admissionRequirementListEqual(output, run.expectedOutput) {
t.Errorf("expected: %s, got: %s for %s test", run.expectedOutput.toString(), output.toString(), run.testName)
}
}
}
func getTestPods() map[string]*v1.Pod {
allPods := map[string]*v1.Pod{
tinyBurstable: getPodWithResources(tinyBurstable, v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("1m"),
"memory": resource.MustParse("1Mi"),
},
}),
bestEffort: getPodWithResources(bestEffort, v1.ResourceRequirements{}),
critical: getPodWithResources(critical, v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("100m"),
"memory": resource.MustParse("100Mi"),
},
}),
burstable: getPodWithResources(burstable, v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("100m"),
"memory": resource.MustParse("100Mi"),
},
}),
guaranteed: getPodWithResources(guaranteed, v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("100m"),
"memory": resource.MustParse("100Mi"),
},
Limits: v1.ResourceList{
"cpu": resource.MustParse("100m"),
"memory": resource.MustParse("100Mi"),
},
}),
highRequestBurstable: getPodWithResources(highRequestBurstable, v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("300m"),
"memory": resource.MustParse("300Mi"),
},
}),
highRequestGuaranteed: getPodWithResources(highRequestGuaranteed, v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("300m"),
"memory": resource.MustParse("300Mi"),
},
Limits: v1.ResourceList{
"cpu": resource.MustParse("300m"),
"memory": resource.MustParse("300Mi"),
},
}),
}
allPods[critical].Namespace = kubeapi.NamespaceSystem
allPods[critical].Annotations[kubetypes.CriticalPodAnnotationKey] = ""
return allPods
}
func getPodWithResources(name string, requests v1.ResourceRequirements) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: name,
Annotations: map[string]string{},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: fmt.Sprintf("%s-container", name),
Resources: requests,
},
},
},
}
}
func parseCPUToInt64(res string) int64 {
r := resource.MustParse(res)
return (&r).MilliValue()
}
func parseNonCpuResourceToInt64(res string) int64 {
r := resource.MustParse(res)
return (&r).Value()
}
func getAdmissionRequirementList(cpu, memory, pods int) admissionRequirementList {
reqs := []*admissionRequirement{}
if cpu > 0 {
reqs = append(reqs, &admissionRequirement{
resourceName: v1.ResourceCPU,
quantity: parseCPUToInt64(fmt.Sprintf("%dm", cpu)),
})
}
if memory > 0 {
reqs = append(reqs, &admissionRequirement{
resourceName: v1.ResourceMemory,
quantity: parseNonCpuResourceToInt64(fmt.Sprintf("%dMi", memory)),
})
}
if pods > 0 {
reqs = append(reqs, &admissionRequirement{
resourceName: v1.ResourcePods,
quantity: int64(pods),
})
}
return admissionRequirementList(reqs)
}
// this checks that the two lists contain all of the same elements.
// it is not correct if a list contains duplicate requirements.
func admissionRequirementListEqual(list1 admissionRequirementList, list2 admissionRequirementList) bool {
if len(list1) != len(list2) {
return false
}
for _, a := range list1 {
contains := false
for _, b := range list2 {
if a.resourceName == b.resourceName && a.quantity == b.quantity {
contains = true
}
}
if !contains {
return false
}
}
return true
}
// this checks that the two lists contain all of the same elements.
// it is not correct if there are duplicate pods in a list.
// for example: podListEqual([a, a, b], [a, b, b]) will return true
func podListEqual(list1 []*v1.Pod, list2 []*v1.Pod) bool {
if len(list1) != len(list2) {
return false
}
for _, a := range list1 {
contains := false
for _, b := range list2 {
if a == b {
contains = true
}
}
if !contains {
return false
}
}
return true
}

View File

@@ -70,6 +70,10 @@ func (e *InsufficientResourceError) GetReason() string {
return fmt.Sprintf("Insufficient %v", e.ResourceName)
}
func (e *InsufficientResourceError) GetInsufficientAmount() int64 {
return e.requested - (e.capacity - e.used)
}
type PredicateFailureError struct {
PredicateName string
}
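In other words, GetInsufficientAmount reports how far the pod's request exceeds the node's free capacity: requested - (capacity - used). For example, a pod requesting 500Mi of memory on a node with 1000Mi capacity and 800Mi already in use is short 500 - (1000 - 800) = 300Mi; that shortfall becomes the quantity of the corresponding admissionRequirement in the preemption handler above.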

View File

@@ -72,16 +72,21 @@ func (c *PodClient) Create(pod *v1.Pod) *v1.Pod {
return p
}
// CreateSync creates a new pod according to the framework specifications, and wait for it to start.
func (c *PodClient) CreateSync(pod *v1.Pod) *v1.Pod {
// CreateSyncInNamespace creates a new pod according to the framework specifications in the given namespace, and waits for it to start.
func (c *PodClient) CreateSyncInNamespace(pod *v1.Pod, namespace string) *v1.Pod {
p := c.Create(pod)
ExpectNoError(c.f.WaitForPodRunning(p.Name))
ExpectNoError(WaitForPodNameRunningInNamespace(c.f.ClientSet, p.Name, namespace))
// Get the newest pod after it becomes running, some status may change after pod created, such as pod ip.
p, err := c.Get(p.Name, metav1.GetOptions{})
ExpectNoError(err)
return p
}
// CreateSync creates a new pod according to the framework specifications, and waits for it to start.
func (c *PodClient) CreateSync(pod *v1.Pod) *v1.Pod {
return c.CreateSyncInNamespace(pod, c.f.Namespace.Name)
}
// CreateBatch create a batch of pods. All pods are created before waiting.
func (c *PodClient) CreateBatch(pods []*v1.Pod) []*v1.Pod {
ps := make([]*v1.Pod, len(pods))
@@ -124,11 +129,17 @@ func (c *PodClient) Update(name string, updateFn func(pod *v1.Pod)) {
// DeleteSync deletes the pod and waits for the pod to disappear for `timeout`. If the pod doesn't
// disappear before the timeout, it will fail the test.
func (c *PodClient) DeleteSync(name string, options *metav1.DeleteOptions, timeout time.Duration) {
c.DeleteSyncInNamespace(name, c.f.Namespace.Name, options, timeout)
}
// DeleteSyncInNamespace deletes the pod from the given namespace and waits for the pod to disappear for `timeout`. If the pod doesn't
// disappear before the timeout, it will fail the test.
func (c *PodClient) DeleteSyncInNamespace(name string, namespace string, options *metav1.DeleteOptions, timeout time.Duration) {
err := c.Delete(name, options)
if err != nil && !errors.IsNotFound(err) {
Failf("Failed to delete pod %q: %v", name, err)
}
Expect(WaitForPodToDisappear(c.f.ClientSet, c.f.Namespace.Name, name, labels.Everything(),
Expect(WaitForPodToDisappear(c.f.ClientSet, namespace, name, labels.Everything(),
2*time.Second, timeout)).To(Succeed(), "wait for pod %q to disappear", name)
}
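These namespace-aware variants are needed because the critical-pod e2e test below creates and deletes its critical pod in the kube-system namespace rather than in the framework's per-test namespace.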

View File

@@ -52,6 +52,7 @@ go_test(
"apparmor_test.go",
"cgroup_manager_test.go",
"container_manager_test.go",
"critical_pod_test.go",
"density_test.go",
"disk_eviction_test.go",
"dynamic_kubelet_configuration_test.go",
@@ -76,7 +77,9 @@ go_test(
"integration",
],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/componentconfig:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/kubelet:go_default_library",
"//pkg/kubelet/api/v1alpha1/stats:go_default_library",
@@ -85,6 +88,7 @@ go_test(
"//pkg/kubelet/dockertools:go_default_library",
"//pkg/kubelet/images:go_default_library",
"//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/metrics:go_default_library",
"//pkg/security/apparmor:go_default_library",
"//test/e2e/common:go_default_library",

View File

@@ -0,0 +1,148 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e_node
import (
"fmt"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/componentconfig"
kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
const (
criticalPodName = "critical-pod"
guaranteedPodName = "guaranteed"
burstablePodName = "burstable"
bestEffortPodName = "best-effort"
)
var _ = framework.KubeDescribe("CriticalPod [Serial] [Disruptive]", func() {
f := framework.NewDefaultFramework("critical-pod-test")
Context("when we need to admit a critical pod", func() {
tempSetCurrentKubeletConfig(f, func(initialConfig *componentconfig.KubeletConfiguration) {
initialConfig.FeatureGates += ", ExperimentalCriticalPodAnnotation=true"
})
It("should be able to create and delete a critical pod", func() {
configEnabled, err := isKubeletConfigEnabled(f)
framework.ExpectNoError(err)
if !configEnabled {
framework.Skipf("unable to run test without dynamic kubelet config enabled.")
}
// Define test pods
nonCriticalGuaranteed := getTestPod(false, guaranteedPodName, v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("100m"),
"memory": resource.MustParse("100Mi"),
},
Limits: v1.ResourceList{
"cpu": resource.MustParse("100m"),
"memory": resource.MustParse("100Mi"),
},
})
nonCriticalBurstable := getTestPod(false, burstablePodName, v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse("100m"),
"memory": resource.MustParse("100Mi"),
},
})
nonCriticalBestEffort := getTestPod(false, bestEffortPodName, v1.ResourceRequirements{})
criticalPod := getTestPod(true, criticalPodName, v1.ResourceRequirements{
// request the entire resource capacity of the node, so that
// admitting this pod requires the other pods to be preempted
Requests: getNodeCPUAndMemoryCapacity(f),
})
// Create pods, starting with non-critical so that the critical preempts the other pods.
f.PodClient().CreateBatch([]*v1.Pod{nonCriticalBestEffort, nonCriticalBurstable, nonCriticalGuaranteed})
f.PodClientNS(kubeapi.NamespaceSystem).CreateSyncInNamespace(criticalPod, kubeapi.NamespaceSystem)
// Check that non-critical pods other than the besteffort have been evicted
updatedPodList, err := f.ClientSet.Core().Pods(f.Namespace.Name).List(metav1.ListOptions{})
framework.ExpectNoError(err)
for _, p := range updatedPodList.Items {
if p.Name == nonCriticalBestEffort.Name {
Expect(p.Status.Phase).NotTo(Equal(v1.PodFailed), fmt.Sprintf("pod: %v should not be preempted", p.Name))
} else {
Expect(p.Status.Phase).To(Equal(v1.PodFailed), fmt.Sprintf("pod: %v should be preempted", p.Name))
}
}
})
AfterEach(func() {
// Delete Pods
f.PodClient().DeleteSync(guaranteedPodName, &metav1.DeleteOptions{}, podDisappearTimeout)
f.PodClient().DeleteSync(burstablePodName, &metav1.DeleteOptions{}, podDisappearTimeout)
f.PodClient().DeleteSync(bestEffortPodName, &metav1.DeleteOptions{}, podDisappearTimeout)
f.PodClientNS(kubeapi.NamespaceSystem).DeleteSyncInNamespace(criticalPodName, kubeapi.NamespaceSystem, &metav1.DeleteOptions{}, podDisappearTimeout)
// Log Events
logPodEvents(f)
logNodeEvents(f)
})
})
})
func getNodeCPUAndMemoryCapacity(f *framework.Framework) v1.ResourceList {
nodeList, err := f.ClientSet.Core().Nodes().List(metav1.ListOptions{})
framework.ExpectNoError(err)
// Assuming that there is only one node, because this is a node e2e test.
Expect(len(nodeList.Items)).To(Equal(1))
capacity := nodeList.Items[0].Status.Capacity
return v1.ResourceList{
v1.ResourceCPU: capacity[v1.ResourceCPU],
v1.ResourceMemory: capacity[v1.ResourceMemory],
}
}
func getTestPod(critical bool, name string, resources v1.ResourceRequirements) *v1.Pod {
pod := &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{Name: name},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "container",
Image: framework.GetPauseImageNameForHostArch(),
Resources: resources,
},
},
},
}
if critical {
pod.ObjectMeta.Namespace = kubeapi.NamespaceSystem
pod.ObjectMeta.Annotations = map[string]string{
kubelettypes.CriticalPodAnnotationKey: "",
}
Expect(kubelettypes.IsCriticalPod(pod)).To(BeTrue(), "pod should be a critical pod")
} else {
Expect(kubelettypes.IsCriticalPod(pod)).To(BeFalse(), "pod should not be a critical pod")
}
return pod
}