Merge pull request #51294 from dashpole/scheduler_remove_ood

Automatic merge from submit-queue (batch tested with PRs 51915, 51294, 51562, 51911)

Remove OutOfDisk from controllers

This is one of the working items for #48843 for 1.8.

This changes the scheduler and daemonset controllers to no longer respect the OutOfDisk condition.  The kubelet has not published OutOfDisk=True since 1.5.
This still preserves the Toleration for the OutOfDisk condition, as (I think?) this is required for backwards compatibility.  I added TODOs to remove this in 1.10.
pull/6/head
Kubernetes Submit Queue 2017-09-05 08:47:25 -07:00 committed by GitHub
commit 4692555e72
8 changed files with 7 additions and 514 deletions

View File

@ -74,8 +74,6 @@ go_test(
"//pkg/securitycontext:go_default_library",
"//pkg/util/labels:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/algorithm/predicates:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",

View File

@ -1180,6 +1180,7 @@ func (dsc *DaemonSetsController) simulate(newPod *v1.Pod, node *v1.Node, ds *ext
Effect: v1.TaintEffectNoSchedule,
})
// TODO(#48843) OutOfDisk taints will be removed in 1.10
if utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalCriticalPodAnnotation) &&
kubelettypes.IsCriticalPod(newPod) {
v1helper.AddOrUpdateTolerationInPod(newPod, &v1.Toleration{
@ -1221,7 +1222,7 @@ func (dsc *DaemonSetsController) simulate(newPod *v1.Pod, node *v1.Node, ds *ext
// summary. Returned booleans are:
// * wantToRun:
// Returns true when a user would expect a pod to run on this node and ignores conditions
// such as OutOfDisk or insufficient resource that would cause a daemonset pod not to schedule.
// such as DiskPressure or insufficient resource that would cause a daemonset pod not to schedule.
// This is primarily used to populate daemonset status.
// * shouldSchedule:
// Returns true when a daemonset should be scheduled to a node if a daemonset pod is not already
@ -1257,11 +1258,6 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *exten
var emitEvent bool
// we try to partition predicates into two partitions here: intentional on the part of the operator and not.
switch reason {
case predicates.ErrNodeOutOfDisk:
// the kubelet will evict this pod if it needs to. Let kubelet
// decide whether to continue running this pod so leave shouldContinueRunning
// set to true
shouldSchedule = false
// intentional
case
predicates.ErrNodeSelectorNotMatch,
@ -1344,9 +1340,6 @@ func Predicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorit
fit, reasons, err = predicates.EssentialPredicates(pod, nil, nodeInfo)
} else {
fit, reasons, err = predicates.GeneralPredicates(pod, nil, nodeInfo)
ncFit, ncReasons := NodeConditionPredicates(nodeInfo)
fit = ncFit && fit
reasons = append(reasons, ncReasons...)
}
if err != nil {
return false, predicateFails, err
@ -1358,26 +1351,6 @@ func Predicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorit
return len(predicateFails) == 0, predicateFails, nil
}
func NodeConditionPredicates(nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason) {
reasons := []algorithm.PredicateFailureReason{}
// If TaintNodesByCondition feature was enabled, account PodToleratesNodeTaints predicates.
if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) {
return true, nil
}
for _, c := range nodeInfo.Node().Status.Conditions {
// TODO: There are other node status that the DaemonSet should ideally respect too,
// e.g. MemoryPressure, and DiskPressure
if c.Type == v1.NodeOutOfDisk && c.Status == v1.ConditionTrue {
reasons = append(reasons, predicates.ErrNodeOutOfDisk)
break
}
}
return len(reasons) == 0, reasons
}
// byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
type byCreationTimestamp []*extensions.DaemonSet

View File

@ -47,8 +47,6 @@ import (
"k8s.io/kubernetes/pkg/securitycontext"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
var (
@ -503,20 +501,6 @@ func TestNotReadNodeDaemonDoesNotLaunchPod(t *testing.T) {
}
}
// DaemonSets should not place onto OutOfDisk nodes
func TestOutOfDiskNodeDaemonDoesNotLaunchPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _ := newTestController(ds)
node := newNode("not-enough-disk", nil)
node.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}}
manager.nodeStore.Add(node)
manager.dsStore.Add(ds)
syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0, 0)
}
}
func resourcePodSpec(nodeName, memory, cpu string) v1.PodSpec {
return v1.PodSpec{
NodeName: nodeName,
@ -1267,30 +1251,8 @@ func setDaemonSetToleration(ds *extensions.DaemonSet, tolerations []v1.Toleratio
ds.Spec.Template.Spec.Tolerations = tolerations
}
// DaemonSet should launch a critical pod even when the node is OutOfDisk.
func TestOutOfDiskNodeDaemonLaunchesCriticalPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("critical")
ds.Spec.UpdateStrategy = *strategy
setDaemonSetCritical(ds)
manager, podControl, _ := newTestController(ds)
node := newNode("not-enough-disk", nil)
node.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}}
manager.nodeStore.Add(node)
// Without enabling critical pod annotation feature gate, we shouldn't create critical pod
utilfeature.DefaultFeatureGate.Set("ExperimentalCriticalPodAnnotation=False")
manager.dsStore.Add(ds)
syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0, 0)
// Enabling critical pod annotation feature gate should create critical pod
utilfeature.DefaultFeatureGate.Set("ExperimentalCriticalPodAnnotation=True")
syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0, 0)
}
}
// DaemonSet should launch a critical pod even when the node with OutOfDisk taints.
// TODO(#48843) OutOfDisk taints will be removed in 1.10
func TestTaintOutOfDiskNodeDaemonLaunchesCriticalPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("critical")
@ -1454,23 +1416,6 @@ func TestNodeShouldRunDaemonPod(t *testing.T) {
shouldSchedule: true,
shouldContinueRunning: true,
},
{
ds: &extensions.DaemonSet{
Spec: extensions.DaemonSetSpec{
Selector: &metav1.LabelSelector{MatchLabels: simpleDaemonSetLabel},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: simpleDaemonSetLabel,
},
Spec: resourcePodSpec("", "50M", "0.5"),
},
},
},
nodeCondition: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
wantToRun: true,
shouldSchedule: false,
shouldContinueRunning: true,
},
{
ds: &extensions.DaemonSet{
Spec: extensions.DaemonSetSpec{
@ -1612,41 +1557,6 @@ func TestUpdateNode(t *testing.T) {
ds: newDaemonSet("ds"),
shouldEnqueue: true,
},
{
test: "Node conditions changed",
oldNode: func() *v1.Node {
node := newNode("node1", nil)
node.Status.Conditions = []v1.NodeCondition{
{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue},
}
return node
}(),
newNode: newNode("node1", nil),
ds: newDaemonSet("ds"),
shouldEnqueue: true,
},
{
test: "Node conditions not changed",
oldNode: func() *v1.Node {
node := newNode("node1", nil)
node.Status.Conditions = []v1.NodeCondition{
{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue},
{Type: v1.NodeMemoryPressure, Status: v1.ConditionFalse},
{Type: v1.NodeDiskPressure, Status: v1.ConditionFalse},
{Type: v1.NodeNetworkUnavailable, Status: v1.ConditionFalse},
}
return node
}(),
newNode: func() *v1.Node {
node := newNode("node1", nil)
node.Status.Conditions = []v1.NodeCondition{
{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue},
}
return node
}(),
ds: newDaemonSet("ds"),
shouldEnqueue: false,
},
}
for _, c := range cases {
for _, strategy := range updateStrategies() {
@ -2205,57 +2115,3 @@ func getQueuedKeys(queue workqueue.RateLimitingInterface) []string {
sort.Strings(keys)
return keys
}
func TestPredicates(t *testing.T) {
type args struct {
pod *v1.Pod
node *v1.Node
}
tests := []struct {
name string
args args
want bool
wantRes []algorithm.PredicateFailureReason
wantErr bool
}{
{
name: "retrun OutOfDiskErr if outOfDisk",
args: args{
pod: newPod("pod1-", "node-0", nil, nil),
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue},
},
Allocatable: v1.ResourceList{
v1.ResourcePods: resource.MustParse("100"),
},
},
},
},
want: false,
wantRes: []algorithm.PredicateFailureReason{predicates.ErrNodeOutOfDisk},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
nodeInfo := schedulercache.NewNodeInfo(tt.args.pod)
nodeInfo.SetNode(tt.args.node)
got, res, err := Predicates(tt.args.pod, nodeInfo)
if (err != nil) != tt.wantErr {
t.Errorf("%s (error): error = %v, wantErr %v", tt.name, err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("%s (fit): got = %v, want %v", tt.name, got, tt.want)
}
if !reflect.DeepEqual(res, tt.wantRes) {
t.Errorf("%s (reasons): got = %v, want %v", tt.name, res, tt.wantRes)
}
})
}
}

View File

@ -71,6 +71,7 @@ func CreatePodTemplate(template v1.PodTemplateSpec, generation int64, hash strin
Effect: v1.TaintEffectNoSchedule,
})
// TODO(#48843) OutOfDisk taints will be removed in 1.10
if utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalCriticalPodAnnotation) &&
kubelettypes.IsCritical(newTemplate.Namespace, newTemplate.Annotations) {
v1helper.AddOrUpdateTolerationInPodSpec(&newTemplate.Spec, &v1.Toleration{

View File

@ -985,7 +985,6 @@ func (nc *Controller) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.Node
// remaining node conditions should also be set to Unknown
remainingNodeConditionTypes := []v1.NodeConditionType{
v1.NodeOutOfDisk,
v1.NodeMemoryPressure,
v1.NodeDiskPressure,
// We don't change 'NodeNetworkUnavailable' condition, as it's managed on a control plane level.

View File

@ -1455,14 +1455,6 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: fakeNow,
},
{
Type: v1.NodeOutOfDisk,
Status: v1.ConditionUnknown,
Reason: "NodeStatusNeverUpdated",
Message: "Kubelet never posted node status.",
LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: fakeNow,
},
{
Type: v1.NodeMemoryPressure,
Status: v1.ConditionUnknown,
@ -1520,13 +1512,6 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
{
Type: v1.NodeOutOfDisk,
Status: v1.ConditionFalse,
// Node status hasn't been updated for 1hr.
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"),
@ -1551,13 +1536,6 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
{
Type: v1.NodeOutOfDisk,
Status: v1.ConditionFalse,
// Node status hasn't been updated for 1hr.
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"),
@ -1580,14 +1558,6 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Time{Time: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC).Add(time.Hour)},
},
{
Type: v1.NodeOutOfDisk,
Status: v1.ConditionUnknown,
Reason: "NodeStatusUnknown",
Message: "Kubelet stopped posting node status.",
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Time{Time: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC).Add(time.Hour)},
},
{
Type: v1.NodeMemoryPressure,
Status: v1.ConditionUnknown,
@ -1767,13 +1737,6 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
{
Type: v1.NodeOutOfDisk,
Status: v1.ConditionFalse,
// Node status hasn't been updated for 1hr.
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"),
@ -1800,13 +1763,6 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
{
Type: v1.NodeOutOfDisk,
Status: v1.ConditionFalse,
// Node status hasn't been updated for 1hr.
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"),
@ -1837,13 +1793,6 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
{
Type: v1.NodeOutOfDisk,
Status: v1.ConditionFalse,
// Node status hasn't been updated for 1hr.
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"),
@ -1870,13 +1819,6 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
{
Type: v1.NodeOutOfDisk,
Status: v1.ConditionFalse,
// Node status hasn't been updated for 1hr.
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"),

View File

@ -1,9 +1,4 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
@ -12,9 +7,9 @@ go_library(
"framework.go",
"kubelet.go",
"kubelet_perf.go",
"nodeoutofdisk.go",
"security_context.go",
],
visibility = ["//visibility:public"],
deps = [
"//pkg/api/testapi:go_default_library",
"//pkg/kubelet/apis/stats/v1alpha1:go_default_library",
@ -22,13 +17,10 @@ go_library(
"//test/e2e/framework:go_default_library",
"//test/utils:go_default_library",
"//test/utils/image:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
"//vendor/github.com/onsi/ginkgo:go_default_library",
"//vendor/github.com/onsi/gomega:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
@ -47,4 +39,5 @@ filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -1,269 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"encoding/json"
"fmt"
"time"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
const (
mb = 1024 * 1024
gb = 1024 * mb
// TODO(madhusudancs): find a way to query kubelet's disk space manager to obtain this value. 256MB
// is the default that is set today. This test might break if the default value changes. This value
// can be configured by setting the "low-diskspace-threshold-mb" flag while starting a kubelet.
// However, kubelets are started as part of the cluster start up, once, before any e2e test is run,
// and remain unchanged until all the tests are run and the cluster is brought down. Changing the
// flag value affects all the e2e tests. So we are hard-coding this value for now.
lowDiskSpaceThreshold uint64 = 256 * mb
nodeOODTimeOut = 5 * time.Minute
numNodeOODPods = 3
)
// Plan:
// 1. Fill disk space on all nodes except one. One node is left out so that we can schedule pods
// on that node. Arbitrarily choose that node to be node with index 0. This makes this a disruptive test.
// 2. Get the CPU capacity on unfilled node.
// 3. Divide the available CPU into one less than the number of pods we want to schedule. We want
// to schedule 3 pods, so divide CPU capacity by 2.
// 4. Request the divided CPU for each pod.
// 5. Observe that 2 of the pods schedule onto the node whose disk is not full, and the remaining
// pod stays pending and does not schedule onto the nodes whose disks are full nor the node
// with the other two pods, since there is not enough free CPU capacity there.
// 6. Recover disk space from one of the nodes whose disk space was previously filled. Arbritrarily
// choose that node to be node with index 1.
// 7. Observe that the pod in pending status schedules on that node.
//
// Flaky issue #20015. We have no clear path for how to test this functionality in a non-flaky way.
var _ = SIGDescribe("NodeOutOfDisk [Serial] [Flaky] [Disruptive]", func() {
var c clientset.Interface
var unfilledNodeName, recoveredNodeName string
f := framework.NewDefaultFramework("node-outofdisk")
BeforeEach(func() {
c = f.ClientSet
framework.Skipf("test is broken. #40249")
nodelist := framework.GetReadySchedulableNodesOrDie(c)
// Skip this test on small clusters. No need to fail since it is not a use
// case that any cluster of small size needs to support.
framework.SkipUnlessNodeCountIsAtLeast(2)
unfilledNodeName = nodelist.Items[0].Name
for _, node := range nodelist.Items[1:] {
fillDiskSpace(c, &node)
}
})
AfterEach(func() {
nodelist := framework.GetReadySchedulableNodesOrDie(c)
Expect(len(nodelist.Items)).ToNot(BeZero())
for _, node := range nodelist.Items {
if unfilledNodeName == node.Name || recoveredNodeName == node.Name {
continue
}
recoverDiskSpace(c, &node)
}
})
It("runs out of disk space", func() {
unfilledNode, err := c.Core().Nodes().Get(unfilledNodeName, metav1.GetOptions{})
framework.ExpectNoError(err)
By(fmt.Sprintf("Calculating CPU availability on node %s", unfilledNode.Name))
milliCpu, err := availCpu(c, unfilledNode)
framework.ExpectNoError(err)
// Per pod CPU should be just enough to fit only (numNodeOODPods - 1) pods on the given
// node. We compute this value by dividing the available CPU capacity on the node by
// (numNodeOODPods - 1) and subtracting ϵ from it. We arbitrarily choose ϵ to be 1%
// of the available CPU per pod, i.e. 0.01 * milliCpu/(numNodeOODPods-1). Instead of
// subtracting 1% from the value, we directly use 0.99 as the multiplier.
podCPU := int64(float64(milliCpu/(numNodeOODPods-1)) * 0.99)
ns := f.Namespace.Name
podClient := c.Core().Pods(ns)
By("Creating pods and waiting for all but one pods to be scheduled")
for i := 0; i < numNodeOODPods-1; i++ {
name := fmt.Sprintf("pod-node-outofdisk-%d", i)
createOutOfDiskPod(c, ns, name, podCPU)
framework.ExpectNoError(f.WaitForPodRunning(name))
pod, err := podClient.Get(name, metav1.GetOptions{})
framework.ExpectNoError(err)
Expect(pod.Spec.NodeName).To(Equal(unfilledNodeName))
}
pendingPodName := fmt.Sprintf("pod-node-outofdisk-%d", numNodeOODPods-1)
createOutOfDiskPod(c, ns, pendingPodName, podCPU)
By(fmt.Sprintf("Finding a failed scheduler event for pod %s", pendingPodName))
wait.Poll(2*time.Second, 5*time.Minute, func() (bool, error) {
selector := fields.Set{
"involvedObject.kind": "Pod",
"involvedObject.name": pendingPodName,
"involvedObject.namespace": ns,
"source": v1.DefaultSchedulerName,
"reason": "FailedScheduling",
}.AsSelector().String()
options := metav1.ListOptions{FieldSelector: selector}
schedEvents, err := c.Core().Events(ns).List(options)
framework.ExpectNoError(err)
if len(schedEvents.Items) > 0 {
return true, nil
} else {
return false, nil
}
})
nodelist := framework.GetReadySchedulableNodesOrDie(c)
Expect(len(nodelist.Items)).To(BeNumerically(">", 1))
nodeToRecover := nodelist.Items[1]
Expect(nodeToRecover.Name).ToNot(Equal(unfilledNodeName))
recoverDiskSpace(c, &nodeToRecover)
recoveredNodeName = nodeToRecover.Name
By(fmt.Sprintf("Verifying that pod %s schedules on node %s", pendingPodName, recoveredNodeName))
framework.ExpectNoError(f.WaitForPodRunning(pendingPodName))
pendingPod, err := podClient.Get(pendingPodName, metav1.GetOptions{})
framework.ExpectNoError(err)
Expect(pendingPod.Spec.NodeName).To(Equal(recoveredNodeName))
})
})
// createOutOfDiskPod creates a pod in the given namespace with the requested amount of CPU.
func createOutOfDiskPod(c clientset.Interface, ns, name string, milliCPU int64) {
podClient := c.Core().Pods(ns)
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "pause",
Image: framework.GetPauseImageName(c),
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
// Request enough CPU to fit only two pods on a given node.
v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
},
},
},
},
},
}
_, err := podClient.Create(pod)
framework.ExpectNoError(err)
}
// availCpu calculates the available CPU on a given node by subtracting the CPU requested by
// all the pods from the total available CPU capacity on the node.
func availCpu(c clientset.Interface, node *v1.Node) (int64, error) {
podClient := c.Core().Pods(metav1.NamespaceAll)
selector := fields.Set{"spec.nodeName": node.Name}.AsSelector().String()
options := metav1.ListOptions{FieldSelector: selector}
pods, err := podClient.List(options)
if err != nil {
return 0, fmt.Errorf("failed to retrieve all the pods on node %s: %v", node.Name, err)
}
avail := node.Status.Capacity.Cpu().MilliValue()
for _, pod := range pods.Items {
for _, cont := range pod.Spec.Containers {
avail -= cont.Resources.Requests.Cpu().MilliValue()
}
}
return avail, nil
}
// availSize returns the available disk space on a given node by querying node stats which
// is in turn obtained internally from cadvisor.
func availSize(c clientset.Interface, node *v1.Node) (uint64, error) {
statsResource := fmt.Sprintf("api/v1/proxy/nodes/%s/stats/", node.Name)
framework.Logf("Querying stats for node %s using url %s", node.Name, statsResource)
res, err := c.Core().RESTClient().Get().AbsPath(statsResource).Timeout(time.Minute).Do().Raw()
if err != nil {
return 0, fmt.Errorf("error querying cAdvisor API: %v", err)
}
ci := cadvisorapi.ContainerInfo{}
err = json.Unmarshal(res, &ci)
if err != nil {
return 0, fmt.Errorf("couldn't unmarshal container info: %v", err)
}
return ci.Stats[len(ci.Stats)-1].Filesystem[0].Available, nil
}
// fillDiskSpace fills the available disk space on a given node by creating a large file. The disk
// space on the node is filled in such a way that the available space after filling the disk is just
// below the lowDiskSpaceThreshold mark.
func fillDiskSpace(c clientset.Interface, node *v1.Node) {
avail, err := availSize(c, node)
framework.ExpectNoError(err, "Node %s: couldn't obtain available disk size %v", node.Name, err)
fillSize := (avail - lowDiskSpaceThreshold + (100 * mb))
framework.Logf("Node %s: disk space available %d bytes", node.Name, avail)
By(fmt.Sprintf("Node %s: creating a file of size %d bytes to fill the available disk space", node.Name, fillSize))
cmd := fmt.Sprintf("fallocate -l %d test.img", fillSize)
framework.ExpectNoError(framework.IssueSSHCommand(cmd, framework.TestContext.Provider, node))
ood := framework.WaitForNodeToBe(c, node.Name, v1.NodeOutOfDisk, true, nodeOODTimeOut)
Expect(ood).To(BeTrue(), "Node %s did not run out of disk within %v", node.Name, nodeOODTimeOut)
avail, err = availSize(c, node)
framework.Logf("Node %s: disk space available %d bytes", node.Name, avail)
Expect(avail < lowDiskSpaceThreshold).To(BeTrue())
}
// recoverDiskSpace recovers disk space, filled by creating a large file, on a given node.
func recoverDiskSpace(c clientset.Interface, node *v1.Node) {
By(fmt.Sprintf("Recovering disk space on node %s", node.Name))
cmd := "rm -f test.img"
framework.ExpectNoError(framework.IssueSSHCommand(cmd, framework.TestContext.Provider, node))
ood := framework.WaitForNodeToBe(c, node.Name, v1.NodeOutOfDisk, false, nodeOODTimeOut)
Expect(ood).To(BeTrue(), "Node %s's out of disk condition status did not change to false within %v", node.Name, nodeOODTimeOut)
}