Merge pull request #27199 from derekwaynecarr/disk_eviction

Automatic merge from submit-queue

Initial support for pod eviction based on disk

This PR adds the following:

1. node reports disk pressure condition based on configured thresholds
1. scheduler does not place pods on nodes reporting disk pressure
1. kubelet will not admit any pod when it reports disk pressure
1. kubelet ranks pods for eviction when low on disk
1. kubelet evicts greediest pod

Follow-on PRs will need to handle:

1. integrate with new image gc PR (https://github.com/kubernetes/kubernetes/pull/27199)
1. container gc policy should always run (it will not be launched from eviction; TBD who does that)
  1. this means killing the pod is fine for all eviction code paths, since container gc will remove the dead containers
1. min reclaim support will just poll the summary provider (Derek will do a follow-on)
1. need to know from the summary whether the imagefs is on the same device as the rootfs (Derek follow-on)
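
For illustration only, here is a minimal sketch of how disk-based thresholds are expressed to the eviction manager, mirroring the `Config`/`Threshold` values exercised by `TestDiskPressureNodeFs` further down in this diff. The package name, import paths, and the `mustParse` helper are assumptions made to keep the example self-contained; the pointer field types follow the test usage.

```go
package example

import (
	"time"

	"k8s.io/kubernetes/pkg/api/resource"     // assumed import path for this era of the tree
	"k8s.io/kubernetes/pkg/kubelet/eviction" // assumed import path for the eviction package in this diff
)

// mustParse is a stand-in for the quantityMustParse test helper; it is assumed
// here that Threshold.Value and Threshold.MinReclaim take *resource.Quantity,
// as the tests in this diff suggest.
func mustParse(s string) *resource.Quantity {
	q := resource.MustParse(s)
	return &q
}

// exampleConfig builds an eviction config with one hard and one soft nodefs
// threshold, matching the configuration used by TestDiskPressureNodeFs.
func exampleConfig() eviction.Config {
	return eviction.Config{
		MaxPodGracePeriodSeconds: 5,
		PressureTransitionPeriod: 5 * time.Minute,
		Thresholds: []eviction.Threshold{
			// hard threshold: react as soon as nodefs.available drops below 1Gi
			{Signal: eviction.SignalNodeFsAvailable, Operator: eviction.OpLessThan, Value: mustParse("1Gi")},
			// soft threshold: tolerate nodefs.available < 2Gi for up to 2 minutes
			{Signal: eviction.SignalNodeFsAvailable, Operator: eviction.OpLessThan, Value: mustParse("2Gi"), GracePeriod: 2 * time.Minute},
		},
	}
}
```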

/cc @vishh @kubernetes/sig-node
k8s-merge-robot 2016-07-28 20:18:54 -07:00 committed by GitHub
commit 821ff657f9
14 changed files with 867 additions and 59 deletions

View File

@ -49,6 +49,7 @@ The purpose of filtering the nodes is to filter out the nodes that do not meet c
- `MaxEBSVolumeCount`: Ensure that the number of attached ElasticBlockStore volumes does not exceed a maximum value (by default, 39, since Amazon recommends a maximum of 40 with one of those 40 reserved for the root volume -- see [Amazon's documentation](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/volume_limits.html#linux-specific-volume-limits)). The maximum value can be controlled by setting the `KUBE_MAX_PD_VOLS` environment variable.
- `MaxGCEPDVolumeCount`: Ensure that the number of attached GCE PersistentDisk volumes does not exceed a maximum value (by default, 16, which is the maximum GCE allows -- see [GCE's documentation](https://cloud.google.com/compute/docs/disks/persistent-disks#limits_for_predefined_machine_types)). The maximum value can be controlled by setting the `KUBE_MAX_PD_VOLS` environment variable.
- `CheckNodeMemoryPressure`: Check if a pod can be scheduled on a node reporting the memory pressure condition. Currently, no `BestEffort` pod should be placed on a node under memory pressure, as it would be automatically evicted by the kubelet.
- `CheckNodeDiskPressure`: Check if a pod can be scheduled on a node reporting the disk pressure condition. Currently, no pods should be placed on a node under disk pressure, as they would be automatically evicted by the kubelet.
The details of the above predicates can be found in [plugin/pkg/scheduler/algorithm/predicates/predicates.go](http://releases.k8s.io/HEAD/plugin/pkg/scheduler/algorithm/predicates/predicates.go). All predicates mentioned above can be used in combination to perform a sophisticated filtering policy. Kubernetes uses some, but not all, of these predicates by default. You can see which ones are used by default in [plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go](http://releases.k8s.io/HEAD/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go).
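
To make the combination concrete, below is a hedged sketch of how fit predicates of this shape can be composed during filtering. The predicate signature mirrors `CheckNodeDiskPressurePredicate` added later in this diff; the surrounding loop, package name, and import paths are illustrative assumptions, not the scheduler's actual generic scheduler code.

```go
package example

import (
	"k8s.io/kubernetes/pkg/api"                             // assumed import path
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" // assumed import path
)

// fitPredicate mirrors the signature of the predicates in this diff,
// e.g. CheckNodeMemoryPressurePredicate and CheckNodeDiskPressurePredicate.
type fitPredicate func(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error)

// filterNodes keeps only the nodes for which every predicate passes. A node
// reporting DiskPressure would be rejected by the CheckNodeDiskPressure
// predicate with ErrNodeUnderDiskPressure and therefore filtered out.
func filterNodes(pod *api.Pod, nodes []*schedulercache.NodeInfo, predicates []fitPredicate) []*schedulercache.NodeInfo {
	feasible := []*schedulercache.NodeInfo{}
	for _, nodeInfo := range nodes {
		fits := true
		for _, predicate := range predicates {
			ok, err := predicate(pod, nil, nodeInfo)
			if err != nil || !ok {
				fits = false
				break
			}
		}
		if fits {
			feasible = append(feasible, nodeInfo)
		}
	}
	return feasible
}
```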

View File

@ -2069,6 +2069,8 @@ const (
NodeOutOfDisk NodeConditionType = "OutOfDisk"
// NodeMemoryPressure means the kubelet is under pressure due to insufficient available memory.
NodeMemoryPressure NodeConditionType = "MemoryPressure"
// NodeDiskPressure means the kubelet is under pressure due to insufficient available disk.
NodeDiskPressure NodeConditionType = "DiskPressure"
// NodeNetworkUnavailable means that network for the node is not correctly configured.
NodeNetworkUnavailable NodeConditionType = "NetworkUnavailable"
)

View File

@ -2470,6 +2470,8 @@ const (
NodeOutOfDisk NodeConditionType = "OutOfDisk"
// NodeMemoryPressure means the kubelet is under pressure due to insufficient available memory.
NodeMemoryPressure NodeConditionType = "MemoryPressure"
// NodeDiskPressure means the kubelet is under pressure due to insufficient available disk.
NodeDiskPressure NodeConditionType = "DiskPressure"
// NodeNetworkUnavailable means that network for the node is not correctly configured.
NodeNetworkUnavailable NodeConditionType = "NetworkUnavailable"
)

View File

@ -54,6 +54,8 @@ type managerImpl struct {
summaryProvider stats.SummaryProvider
// records when a threshold was first observed
thresholdsFirstObservedAt thresholdsObservedAt
// resourceToRankFunc maps a resource to ranking function for that resource.
resourceToRankFunc map[api.ResourceName]rankFunc
}
// ensure it implements the required interface
@ -87,12 +89,17 @@ func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAd
if len(m.nodeConditions) == 0 {
return lifecycle.PodAdmitResult{Admit: true}
}
notBestEffort := qos.BestEffort != qos.GetPodQOS(attrs.Pod)
if notBestEffort {
return lifecycle.PodAdmitResult{Admit: true}
// the node has memory pressure, admit if not best-effort
if hasNodeCondition(m.nodeConditions, api.NodeMemoryPressure) {
notBestEffort := qos.BestEffort != qos.GetPodQOS(attrs.Pod)
if notBestEffort {
return lifecycle.PodAdmitResult{Admit: true}
}
}
// reject pods when under memory pressure (if pod is best effort), or if under disk pressure.
glog.Warningf("Failed to admit pod %v - node has conditions: %v", format.Pod(attrs.Pod), m.nodeConditions)
// we reject all best effort pods until we are stable.
return lifecycle.PodAdmitResult{
Admit: false,
Reason: reason,
@ -102,7 +109,8 @@ func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAd
// Start starts the control loop to observe and respond to low compute resources.
func (m *managerImpl) Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, monitoringInterval time.Duration) error {
go wait.Until(func() { m.synchronize(podFunc) }, monitoringInterval, wait.NeverStop)
// start the eviction manager monitoring
go wait.Until(func() { m.synchronize(diskInfoProvider, podFunc) }, monitoringInterval, wait.NeverStop)
return nil
}
@ -113,14 +121,32 @@ func (m *managerImpl) IsUnderMemoryPressure() bool {
return hasNodeCondition(m.nodeConditions, api.NodeMemoryPressure)
}
// IsUnderDiskPressure returns true if the node is under disk pressure.
func (m *managerImpl) IsUnderDiskPressure() bool {
m.RLock()
defer m.RUnlock()
return hasNodeCondition(m.nodeConditions, api.NodeDiskPressure)
}
// synchronize is the main control loop that enforces eviction thresholds.
func (m *managerImpl) synchronize(podFunc ActivePodsFunc) {
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) {
// if we have nothing to do, just return
thresholds := m.config.Thresholds
if len(thresholds) == 0 {
return
}
// build the ranking functions (if not yet known)
// TODO: have a function in cadvisor that lets us know if global housekeeping has completed
if len(m.resourceToRankFunc) == 0 {
// this may error if cadvisor has yet to complete housekeeping, so we will just try again in next pass.
hasDedicatedImageFs, err := diskInfoProvider.HasDedicatedImageFs()
if err != nil {
return
}
m.resourceToRankFunc = buildResourceToRankFunc(hasDedicatedImageFs)
}
// make observations and get a function to derive pod usage stats relative to those observations.
observations, statsFunc, err := makeSignalObservations(m.summaryProvider)
if err != nil {
@ -175,7 +201,7 @@ func (m *managerImpl) synchronize(podFunc ActivePodsFunc) {
m.recorder.Eventf(m.nodeRef, api.EventTypeWarning, "EvictionThresholdMet", "Attempting to reclaim %s", resourceToReclaim)
// rank the pods for eviction
rank, ok := resourceToRankFunc[resourceToReclaim]
rank, ok := m.resourceToRankFunc[resourceToReclaim]
if !ok {
glog.Errorf("eviction manager: no ranking function for resource %s", resourceToReclaim)
return

View File

@ -44,12 +44,22 @@ func (m *mockPodKiller) killPodNow(pod *api.Pod, status api.PodStatus, gracePeri
return nil
}
// mockDiskInfoProvider is a fake DiskInfoProvider used for testing.
type mockDiskInfoProvider struct {
dedicatedImageFs bool
}
// HasDedicatedImageFs returns the mocked value
func (m *mockDiskInfoProvider) HasDedicatedImageFs() (bool, error) {
return m.dedicatedImageFs, nil
}
// TestMemoryPressure
func TestMemoryPressure(t *testing.T) {
podMaker := func(name string, requests api.ResourceList, limits api.ResourceList, memoryWorkingSet string) (*api.Pod, statsapi.PodStats) {
pod := newPod(name, []api.Container{
newContainer(name, requests, api.ResourceList{}),
})
newContainer(name, requests, limits),
}, nil)
podStats := newPodMemoryStats(pod, resource.MustParse(memoryWorkingSet))
return pod, podStats
}
@ -95,6 +105,7 @@ func TestMemoryPressure(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
podKiller := &mockPodKiller{}
diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false}
nodeRef := &api.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
config := Config{
@ -131,7 +142,7 @@ func TestMemoryPressure(t *testing.T) {
burstablePodToAdmit, _ := podMaker("burst-admit", newResourceList("100m", "100Mi"), newResourceList("200m", "200Mi"), "0Gi")
// synchronize
manager.synchronize(activePodsFunc)
manager.synchronize(diskInfoProvider, activePodsFunc)
// we should not have memory pressure
if manager.IsUnderMemoryPressure() {
@ -149,7 +160,7 @@ func TestMemoryPressure(t *testing.T) {
// induce soft threshold
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("1500Mi", podStats)
manager.synchronize(activePodsFunc)
manager.synchronize(diskInfoProvider, activePodsFunc)
// we should have memory pressure
if !manager.IsUnderMemoryPressure() {
@ -164,7 +175,7 @@ func TestMemoryPressure(t *testing.T) {
// step forward in time pass the grace period
fakeClock.Step(3 * time.Minute)
summaryProvider.result = summaryStatsMaker("1500Mi", podStats)
manager.synchronize(activePodsFunc)
manager.synchronize(diskInfoProvider, activePodsFunc)
// we should have memory pressure
if !manager.IsUnderMemoryPressure() {
@ -189,7 +200,7 @@ func TestMemoryPressure(t *testing.T) {
// remove memory pressure
fakeClock.Step(20 * time.Minute)
summaryProvider.result = summaryStatsMaker("3Gi", podStats)
manager.synchronize(activePodsFunc)
manager.synchronize(diskInfoProvider, activePodsFunc)
// we should not have memory pressure
if manager.IsUnderMemoryPressure() {
@ -199,7 +210,7 @@ func TestMemoryPressure(t *testing.T) {
// induce memory pressure!
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("500Mi", podStats)
manager.synchronize(activePodsFunc)
manager.synchronize(diskInfoProvider, activePodsFunc)
// we should have memory pressure
if !manager.IsUnderMemoryPressure() {
@ -227,7 +238,7 @@ func TestMemoryPressure(t *testing.T) {
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("2Gi", podStats)
podKiller.pod = nil // reset state
manager.synchronize(activePodsFunc)
manager.synchronize(diskInfoProvider, activePodsFunc)
// we should have memory pressure (because transition period not yet met)
if !manager.IsUnderMemoryPressure() {
@ -251,7 +262,7 @@ func TestMemoryPressure(t *testing.T) {
fakeClock.Step(5 * time.Minute)
summaryProvider.result = summaryStatsMaker("2Gi", podStats)
podKiller.pod = nil // reset state
manager.synchronize(activePodsFunc)
manager.synchronize(diskInfoProvider, activePodsFunc)
// we should not have memory pressure (because transition period met)
if manager.IsUnderMemoryPressure() {
@ -271,3 +282,235 @@ func TestMemoryPressure(t *testing.T) {
}
}
}
// parseQuantity parses the specified value if provided, otherwise it returns a zero-value quantity.
func parseQuantity(value string) resource.Quantity {
if len(value) == 0 {
return resource.MustParse("0")
}
return resource.MustParse(value)
}
func TestDiskPressureNodeFs(t *testing.T) {
podMaker := func(name string, requests api.ResourceList, limits api.ResourceList, rootFsUsed, logsUsed, perLocalVolumeUsed string) (*api.Pod, statsapi.PodStats) {
pod := newPod(name, []api.Container{
newContainer(name, requests, limits),
}, nil)
podStats := newPodDiskStats(pod, parseQuantity(rootFsUsed), parseQuantity(logsUsed), parseQuantity(perLocalVolumeUsed))
return pod, podStats
}
summaryStatsMaker := func(rootFsAvailableBytes, imageFsAvailableBytes string, podStats map[*api.Pod]statsapi.PodStats) *statsapi.Summary {
rootFsVal := resource.MustParse(rootFsAvailableBytes)
rootFsBytes := uint64(rootFsVal.Value())
imageFsVal := resource.MustParse(imageFsAvailableBytes)
imageFsBytes := uint64(imageFsVal.Value())
result := &statsapi.Summary{
Node: statsapi.NodeStats{
Fs: &statsapi.FsStats{
AvailableBytes: &rootFsBytes,
},
Runtime: &statsapi.RuntimeStats{
ImageFs: &statsapi.FsStats{
AvailableBytes: &imageFsBytes,
},
},
},
Pods: []statsapi.PodStats{},
}
for _, podStat := range podStats {
result.Pods = append(result.Pods, podStat)
}
return result
}
podsToMake := []struct {
name string
requests api.ResourceList
limits api.ResourceList
rootFsUsed string
logsFsUsed string
perLocalVolumeUsed string
}{
{name: "best-effort-high", requests: newResourceList("", ""), limits: newResourceList("", ""), rootFsUsed: "500Mi"},
{name: "best-effort-low", requests: newResourceList("", ""), limits: newResourceList("", ""), perLocalVolumeUsed: "300Mi"},
{name: "burstable-high", requests: newResourceList("100m", "100Mi"), limits: newResourceList("200m", "1Gi"), rootFsUsed: "800Mi"},
{name: "burstable-low", requests: newResourceList("100m", "100Mi"), limits: newResourceList("200m", "1Gi"), logsFsUsed: "300Mi"},
{name: "guaranteed-high", requests: newResourceList("100m", "1Gi"), limits: newResourceList("100m", "1Gi"), rootFsUsed: "800Mi"},
{name: "guaranteed-low", requests: newResourceList("100m", "1Gi"), limits: newResourceList("100m", "1Gi"), rootFsUsed: "200Mi"},
}
pods := []*api.Pod{}
podStats := map[*api.Pod]statsapi.PodStats{}
for _, podToMake := range podsToMake {
pod, podStat := podMaker(podToMake.name, podToMake.requests, podToMake.limits, podToMake.rootFsUsed, podToMake.logsFsUsed, podToMake.perLocalVolumeUsed)
pods = append(pods, pod)
podStats[pod] = podStat
}
activePodsFunc := func() []*api.Pod {
return pods
}
fakeClock := clock.NewFakeClock(time.Now())
podKiller := &mockPodKiller{}
diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false}
nodeRef := &api.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
config := Config{
MaxPodGracePeriodSeconds: 5,
PressureTransitionPeriod: time.Minute * 5,
Thresholds: []Threshold{
{
Signal: SignalNodeFsAvailable,
Operator: OpLessThan,
Value: quantityMustParse("1Gi"),
},
{
Signal: SignalNodeFsAvailable,
Operator: OpLessThan,
Value: quantityMustParse("2Gi"),
GracePeriod: time.Minute * 2,
},
},
}
summaryProvider := &fakeSummaryProvider{result: summaryStatsMaker("16Gi", "200Gi", podStats)}
manager := &managerImpl{
clock: fakeClock,
killPodFunc: podKiller.killPodNow,
config: config,
recorder: &record.FakeRecorder{},
summaryProvider: summaryProvider,
nodeRef: nodeRef,
nodeConditionsLastObservedAt: nodeConditionsObservedAt{},
thresholdsFirstObservedAt: thresholdsObservedAt{},
}
// create a best effort pod to test admission
podToAdmit, _ := podMaker("pod-to-admit", newResourceList("", ""), newResourceList("", ""), "0Gi", "0Gi", "0Gi")
// synchronize
manager.synchronize(diskInfoProvider, activePodsFunc)
// we should not have disk pressure
if manager.IsUnderDiskPressure() {
t.Errorf("Manager should not report disk pressure")
}
// try to admit our pod (should succeed)
if result := manager.Admit(&lifecycle.PodAdmitAttributes{Pod: podToAdmit}); !result.Admit {
t.Errorf("Admit pod: %v, expected: %v, actual: %v", podToAdmit, true, result.Admit)
}
// induce soft threshold
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("1.5Gi", "200Gi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc)
// we should have disk pressure
if !manager.IsUnderDiskPressure() {
t.Errorf("Manager should report disk pressure since soft threshold was met")
}
// verify no pod was yet killed because there has not yet been enough time passed.
if podKiller.pod != nil {
t.Errorf("Manager should not have killed a pod yet, but killed: %v", podKiller.pod)
}
// step forward in time pass the grace period
fakeClock.Step(3 * time.Minute)
summaryProvider.result = summaryStatsMaker("1.5Gi", "200Gi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc)
// we should have disk pressure
if !manager.IsUnderDiskPressure() {
t.Errorf("Manager should report disk pressure since soft threshold was met")
}
// verify the right pod was killed with the right grace period.
if podKiller.pod != pods[0] {
t.Errorf("Manager chose to kill pod: %v, but should have chosen %v", podKiller.pod, pods[0])
}
if podKiller.gracePeriodOverride == nil {
t.Errorf("Manager chose to kill pod but should have had a grace period override.")
}
observedGracePeriod := *podKiller.gracePeriodOverride
if observedGracePeriod != manager.config.MaxPodGracePeriodSeconds {
t.Errorf("Manager chose to kill pod with incorrect grace period. Expected: %d, actual: %d", manager.config.MaxPodGracePeriodSeconds, observedGracePeriod)
}
// reset state
podKiller.pod = nil
podKiller.gracePeriodOverride = nil
// remove disk pressure
fakeClock.Step(20 * time.Minute)
summaryProvider.result = summaryStatsMaker("16Gi", "200Gi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc)
// we should not have disk pressure
if manager.IsUnderDiskPressure() {
t.Errorf("Manager should not report disk pressure")
}
// induce disk pressure!
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("500Mi", "200Gi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc)
// we should have disk pressure
if !manager.IsUnderDiskPressure() {
t.Errorf("Manager should report disk pressure")
}
// check the right pod was killed
if podKiller.pod != pods[0] {
t.Errorf("Manager chose to kill pod: %v, but should have chosen %v", podKiller.pod, pods[0])
}
observedGracePeriod = *podKiller.gracePeriodOverride
if observedGracePeriod != int64(0) {
t.Errorf("Manager chose to kill pod with incorrect grace period. Expected: %d, actual: %d", 0, observedGracePeriod)
}
// try to admit our pod (should fail)
if result := manager.Admit(&lifecycle.PodAdmitAttributes{Pod: podToAdmit}); result.Admit {
t.Errorf("Admit pod: %v, expected: %v, actual: %v", podToAdmit, false, result.Admit)
}
// reduce disk pressure
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("16Gi", "200Gi", podStats)
podKiller.pod = nil // reset state
manager.synchronize(diskInfoProvider, activePodsFunc)
// we should have disk pressure (because transition period not yet met)
if !manager.IsUnderDiskPressure() {
t.Errorf("Manager should report disk pressure")
}
// no pod should have been killed
if podKiller.pod != nil {
t.Errorf("Manager chose to kill pod: %v when no pod should have been killed", podKiller.pod)
}
// try to admit our pod (should fail)
if result := manager.Admit(&lifecycle.PodAdmitAttributes{Pod: podToAdmit}); result.Admit {
t.Errorf("Admit pod: %v, expected: %v, actual: %v", podToAdmit, false, result.Admit)
}
// move the clock past transition period to ensure that we stop reporting pressure
fakeClock.Step(5 * time.Minute)
summaryProvider.result = summaryStatsMaker("16Gi", "200Gi", podStats)
podKiller.pod = nil // reset state
manager.synchronize(diskInfoProvider, activePodsFunc)
// we should not have disk pressure (because transition period met)
if manager.IsUnderDiskPressure() {
t.Errorf("Manager should not report disk pressure")
}
// no pod should have been killed
if podKiller.pod != nil {
t.Errorf("Manager chose to kill pod: %v when no pod should have been killed", podKiller.pod)
}
// try to admit our pod (should succeed)
if result := manager.Admit(&lifecycle.PodAdmitAttributes{Pod: podToAdmit}); !result.Admit {
t.Errorf("Admit pod: %v, expected: %v, actual: %v", podToAdmit, true, result.Admit)
}
}

View File

@ -41,21 +41,24 @@ const (
message = "The node was low on compute resources."
// disk, in bytes. internal to this module, used to account for local disk usage.
resourceDisk api.ResourceName = "disk"
// imagefs, in bytes. internal to this module, used to account for local image filesystem usage.
resourceImageFs api.ResourceName = "imagefs"
// nodefs, in bytes. internal to this module, used to account for local node root filesystem usage.
resourceNodeFs api.ResourceName = "nodefs"
)
// resourceToRankFunc maps a resource to ranking function for that resource.
var resourceToRankFunc = map[api.ResourceName]rankFunc{
api.ResourceMemory: rankMemoryPressure,
}
// signalToNodeCondition maps a signal to the node condition to report if threshold is met.
var signalToNodeCondition = map[Signal]api.NodeConditionType{
SignalMemoryAvailable: api.NodeMemoryPressure,
SignalMemoryAvailable: api.NodeMemoryPressure,
SignalImageFsAvailable: api.NodeDiskPressure,
SignalNodeFsAvailable: api.NodeDiskPressure,
}
// signalToResource maps a Signal to its associated Resource.
var signalToResource = map[Signal]api.ResourceName{
SignalMemoryAvailable: api.ResourceMemory,
SignalMemoryAvailable: api.ResourceMemory,
SignalImageFsAvailable: resourceImageFs,
SignalNodeFsAvailable: resourceNodeFs,
}
// validSignal returns true if the signal is supported.
@ -160,7 +163,6 @@ func parseThresholdStatement(statement string) (Threshold, error) {
if quantity.Sign() < 0 {
return Threshold{}, fmt.Errorf("eviction threshold %v cannot be negative: %s", signal, &quantity)
}
return Threshold{
Signal: signal,
Operator: operator,
@ -252,14 +254,54 @@ func memoryUsage(memStats *statsapi.MemoryStats) *resource.Quantity {
return resource.NewQuantity(usage, resource.BinarySI)
}
// podUsage aggregates usage of compute resources.
// it supports the following resources: memory and disk.
func podUsage(podStats statsapi.PodStats) (api.ResourceList, error) {
// localVolumeNames returns the set of volumes for the pod that are local
// TODO: summary API should report which volumes consume local storage rather than hard-coding it here.
func localVolumeNames(pod *api.Pod) []string {
result := []string{}
for _, volume := range pod.Spec.Volumes {
if volume.HostPath != nil ||
(volume.EmptyDir != nil && volume.EmptyDir.Medium != api.StorageMediumMemory) ||
volume.ConfigMap != nil ||
volume.GitRepo != nil {
result = append(result, volume.Name)
}
}
return result
}
// podDiskUsage aggregates pod disk usage for the specified stats to measure.
func podDiskUsage(podStats statsapi.PodStats, pod *api.Pod, statsToMeasure []fsStatsType) (api.ResourceList, error) {
disk := resource.Quantity{Format: resource.BinarySI}
for _, container := range podStats.Containers {
if hasFsStatsType(statsToMeasure, fsStatsRoot) {
disk.Add(*diskUsage(container.Rootfs))
}
if hasFsStatsType(statsToMeasure, fsStatsLogs) {
disk.Add(*diskUsage(container.Logs))
}
}
if hasFsStatsType(statsToMeasure, fsStatsLocalVolumeSource) {
volumeNames := localVolumeNames(pod)
for _, volumeName := range volumeNames {
for _, volumeStats := range podStats.VolumeStats {
if volumeStats.Name == volumeName {
disk.Add(*diskUsage(&volumeStats.FsStats))
break
}
}
}
}
return api.ResourceList{
resourceDisk: disk,
}, nil
}
// podMemoryUsage aggregates pod memory usage.
func podMemoryUsage(podStats statsapi.PodStats) (api.ResourceList, error) {
disk := resource.Quantity{Format: resource.BinarySI}
memory := resource.Quantity{Format: resource.BinarySI}
for _, container := range podStats.Containers {
// disk usage (if known)
// TODO: need to handle volumes
for _, fsStats := range []*statsapi.FsStats{container.Rootfs, container.Logs} {
disk.Add(*diskUsage(fsStats))
}
@ -384,12 +426,12 @@ func memory(stats statsFunc) cmpFunc {
return 1
}
// if we cant get usage for p1 measured, we want p2 first
p1Usage, err := podUsage(p1Stats)
p1Usage, err := podMemoryUsage(p1Stats)
if err != nil {
return -1
}
// if we cant get usage for p2 measured, we want p1 first
p2Usage, err := podUsage(p2Stats)
p2Usage, err := podMemoryUsage(p2Stats)
if err != nil {
return 1
}
@ -411,7 +453,7 @@ func memory(stats statsFunc) cmpFunc {
}
// disk compares pods by largest consumer of disk relative to request.
func disk(stats statsFunc) cmpFunc {
func disk(stats statsFunc, fsStatsToMeasure []fsStatsType) cmpFunc {
return func(p1, p2 *api.Pod) int {
p1Stats, found := stats(p1)
// if we have no usage stats for p1, we want p2 first
@ -424,20 +466,20 @@ func disk(stats statsFunc) cmpFunc {
return 1
}
// if we cant get usage for p1 measured, we want p2 first
p1Usage, err := podUsage(p1Stats)
p1Usage, err := podDiskUsage(p1Stats, p1, fsStatsToMeasure)
if err != nil {
return -1
}
// if we cant get usage for p2 measured, we want p1 first
p2Usage, err := podUsage(p2Stats)
p2Usage, err := podDiskUsage(p2Stats, p2, fsStatsToMeasure)
if err != nil {
return 1
}
// disk is best effort, so we don't measure relative to a request.
// TODO: add disk as a guaranteed resource
p1Disk := p1Usage[api.ResourceStorage]
p2Disk := p2Usage[api.ResourceStorage]
p1Disk := p1Usage[resourceDisk]
p2Disk := p2Usage[resourceDisk]
// if p2 is using more than p1, we want p2 first
return p2Disk.Cmp(p1Disk)
}
@ -448,9 +490,11 @@ func rankMemoryPressure(pods []*api.Pod, stats statsFunc) {
orderedBy(qosComparator, memory(stats)).Sort(pods)
}
// rankDiskPressure orders the input pods for eviction in response to disk pressure.
func rankDiskPressure(pods []*api.Pod, stats statsFunc) {
orderedBy(qosComparator, disk(stats)).Sort(pods)
// rankDiskPressureFunc returns a rankFunc that measures the specified fs stats.
func rankDiskPressureFunc(fsStatsToMeasure []fsStatsType) rankFunc {
return func(pods []*api.Pod, stats statsFunc) {
orderedBy(qosComparator, disk(stats, fsStatsToMeasure)).Sort(pods)
}
}
// byEvictionPriority implements sort.Interface for []api.ResourceName.
@ -474,7 +518,18 @@ func makeSignalObservations(summaryProvider stats.SummaryProvider) (signalObserv
statsFunc := cachedStatsFunc(summary.Pods)
// build an evaluation context for current eviction signals
result := signalObservations{}
result[SignalMemoryAvailable] = resource.NewQuantity(int64(*summary.Node.Memory.AvailableBytes), resource.BinarySI)
if memory := summary.Node.Memory; memory != nil && memory.AvailableBytes != nil {
result[SignalMemoryAvailable] = resource.NewQuantity(int64(*memory.AvailableBytes), resource.BinarySI)
}
if nodeFs := summary.Node.Fs; nodeFs != nil && nodeFs.AvailableBytes != nil {
result[SignalNodeFsAvailable] = resource.NewQuantity(int64(*nodeFs.AvailableBytes), resource.BinarySI)
}
if summary.Node.Runtime != nil {
if imageFs := summary.Node.Runtime.ImageFs; imageFs != nil && imageFs.AvailableBytes != nil {
result[SignalImageFsAvailable] = resource.NewQuantity(int64(*imageFs.AvailableBytes), resource.BinarySI)
}
}
return result, statsFunc, nil
}
@ -569,6 +624,16 @@ func nodeConditionsObservedSince(observedAt nodeConditionsObservedAt, period tim
return results
}
// hasFsStatsType returns true if the fsStat is in the input list
func hasFsStatsType(inputs []fsStatsType, item fsStatsType) bool {
for _, input := range inputs {
if input == item {
return true
}
}
return false
}
// hasNodeCondition returns true if the node condition is in the input list
func hasNodeCondition(inputs []api.NodeConditionType, item api.NodeConditionType) bool {
for _, input := range inputs {
@ -612,3 +677,21 @@ func isSoftEviction(thresholds []Threshold, starvedResource api.ResourceName) bo
}
return true
}
// buildResourceToRankFunc returns ranking functions associated with resources
func buildResourceToRankFunc(withImageFs bool) map[api.ResourceName]rankFunc {
resourceToRankFunc := map[api.ResourceName]rankFunc{
api.ResourceMemory: rankMemoryPressure,
}
// usage of an imagefs is optional
if withImageFs {
// with an imagefs, nodefs pod rank func for eviction only includes logs and local volumes
resourceToRankFunc[resourceNodeFs] = rankDiskPressureFunc([]fsStatsType{fsStatsLogs, fsStatsLocalVolumeSource})
// with an imagefs, imagefs pod rank func for eviction only includes rootfs
resourceToRankFunc[resourceImageFs] = rankDiskPressureFunc([]fsStatsType{fsStatsRoot})
} else {
// without an imagefs, nodefs pod rank func for eviction looks at all fs stats
resourceToRankFunc[resourceNodeFs] = rankDiskPressureFunc([]fsStatsType{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource})
}
return resourceToRankFunc
}

View File

@ -75,6 +75,41 @@ func TestParseThresholdConfig(t *testing.T) {
},
},
},
"disk flag values": {
evictionHard: "imagefs.available<150Mi,nodefs.available<100Mi",
evictionSoft: "imagefs.available<300Mi,nodefs.available<200Mi",
evictionSoftGracePeriod: "imagefs.available=30s,nodefs.available=30s",
evictionMinReclaim: "imagefs.available=2Gi,nodefs.available=1Gi",
expectErr: false,
expectThresholds: []Threshold{
{
Signal: SignalImageFsAvailable,
Operator: OpLessThan,
Value: quantityMustParse("150Mi"),
MinReclaim: quantityMustParse("2Gi"),
},
{
Signal: SignalNodeFsAvailable,
Operator: OpLessThan,
Value: quantityMustParse("100Mi"),
MinReclaim: quantityMustParse("1Gi"),
},
{
Signal: SignalImageFsAvailable,
Operator: OpLessThan,
Value: quantityMustParse("300Mi"),
GracePeriod: gracePeriod,
MinReclaim: quantityMustParse("2Gi"),
},
{
Signal: SignalNodeFsAvailable,
Operator: OpLessThan,
Value: quantityMustParse("200Mi"),
GracePeriod: gracePeriod,
MinReclaim: quantityMustParse("1Gi"),
},
},
},
"invalid-signal": {
evictionHard: "mem.available<150Mi",
evictionSoft: "",
@ -127,6 +162,7 @@ func TestParseThresholdConfig(t *testing.T) {
evictionHard: "",
evictionSoft: "memory.available<150Mi",
evictionSoftGracePeriod: "memory.available=-30s",
evictionMinReclaim: "",
expectErr: true,
expectThresholds: []Threshold{},
},
@ -199,13 +235,13 @@ func thresholdEqual(a Threshold, b Threshold) bool {
func TestOrderedByQoS(t *testing.T) {
bestEffort := newPod("best-effort", []api.Container{
newContainer("best-effort", newResourceList("", ""), newResourceList("", "")),
})
}, nil)
burstable := newPod("burstable", []api.Container{
newContainer("burstable", newResourceList("100m", "100Mi"), newResourceList("200m", "200Mi")),
})
}, nil)
guaranteed := newPod("guaranteed", []api.Container{
newContainer("guaranteed", newResourceList("200m", "200Mi"), newResourceList("200m", "200Mi")),
})
}, nil)
pods := []*api.Pod{guaranteed, burstable, bestEffort}
orderedBy(qosComparator).Sort(pods)
@ -218,26 +254,158 @@ func TestOrderedByQoS(t *testing.T) {
}
}
// TestOrderedByDisk ensures we order pods by greediest disk consumer
func TestOrderedByDisk(t *testing.T) {
pod1 := newPod("best-effort-high", []api.Container{
newContainer("best-effort-high", newResourceList("", ""), newResourceList("", "")),
}, []api.Volume{
newVolume("local-volume", api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
}),
})
pod2 := newPod("best-effort-low", []api.Container{
newContainer("best-effort-low", newResourceList("", ""), newResourceList("", "")),
}, []api.Volume{
newVolume("local-volume", api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
}),
})
pod3 := newPod("burstable-high", []api.Container{
newContainer("burstable-high", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")),
}, []api.Volume{
newVolume("local-volume", api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
}),
})
pod4 := newPod("burstable-low", []api.Container{
newContainer("burstable-low", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")),
}, []api.Volume{
newVolume("local-volume", api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
}),
})
pod5 := newPod("guaranteed-high", []api.Container{
newContainer("guaranteed-high", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")),
}, []api.Volume{
newVolume("local-volume", api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
}),
})
pod6 := newPod("guaranteed-low", []api.Container{
newContainer("guaranteed-low", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")),
}, []api.Volume{
newVolume("local-volume", api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
}),
})
stats := map[*api.Pod]statsapi.PodStats{
pod1: newPodDiskStats(pod1, resource.MustParse("50Mi"), resource.MustParse("100Mi"), resource.MustParse("50Mi")), // 200Mi
pod2: newPodDiskStats(pod2, resource.MustParse("100Mi"), resource.MustParse("150Mi"), resource.MustParse("50Mi")), // 300Mi
pod3: newPodDiskStats(pod3, resource.MustParse("200Mi"), resource.MustParse("150Mi"), resource.MustParse("50Mi")), // 400Mi
pod4: newPodDiskStats(pod4, resource.MustParse("300Mi"), resource.MustParse("100Mi"), resource.MustParse("50Mi")), // 450Mi
pod5: newPodDiskStats(pod5, resource.MustParse("400Mi"), resource.MustParse("100Mi"), resource.MustParse("50Mi")), // 550Mi
pod6: newPodDiskStats(pod6, resource.MustParse("500Mi"), resource.MustParse("100Mi"), resource.MustParse("50Mi")), // 650Mi
}
statsFn := func(pod *api.Pod) (statsapi.PodStats, bool) {
result, found := stats[pod]
return result, found
}
pods := []*api.Pod{pod1, pod2, pod3, pod4, pod5, pod6}
orderedBy(disk(statsFn, []fsStatsType{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource})).Sort(pods)
expected := []*api.Pod{pod6, pod5, pod4, pod3, pod2, pod1}
for i := range expected {
if pods[i] != expected[i] {
t.Errorf("Expected pod[%d]: %s, but got: %s", i, expected[i].Name, pods[i].Name)
}
}
}
// TestOrderedByQoSDisk ensures we order pods by qos and then greediest disk consumer
func TestOrderedByQoSDisk(t *testing.T) {
pod1 := newPod("best-effort-high", []api.Container{
newContainer("best-effort-high", newResourceList("", ""), newResourceList("", "")),
}, []api.Volume{
newVolume("local-volume", api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
}),
})
pod2 := newPod("best-effort-low", []api.Container{
newContainer("best-effort-low", newResourceList("", ""), newResourceList("", "")),
}, []api.Volume{
newVolume("local-volume", api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
}),
})
pod3 := newPod("burstable-high", []api.Container{
newContainer("burstable-high", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")),
}, []api.Volume{
newVolume("local-volume", api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
}),
})
pod4 := newPod("burstable-low", []api.Container{
newContainer("burstable-low", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")),
}, []api.Volume{
newVolume("local-volume", api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
}),
})
pod5 := newPod("guaranteed-high", []api.Container{
newContainer("guaranteed-high", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")),
}, []api.Volume{
newVolume("local-volume", api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
}),
})
pod6 := newPod("guaranteed-low", []api.Container{
newContainer("guaranteed-low", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")),
}, []api.Volume{
newVolume("local-volume", api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
}),
})
stats := map[*api.Pod]statsapi.PodStats{
pod1: newPodDiskStats(pod1, resource.MustParse("50Mi"), resource.MustParse("100Mi"), resource.MustParse("50Mi")), // 200Mi
pod2: newPodDiskStats(pod2, resource.MustParse("100Mi"), resource.MustParse("150Mi"), resource.MustParse("50Mi")), // 300Mi
pod3: newPodDiskStats(pod3, resource.MustParse("200Mi"), resource.MustParse("150Mi"), resource.MustParse("50Mi")), // 400Mi
pod4: newPodDiskStats(pod4, resource.MustParse("300Mi"), resource.MustParse("100Mi"), resource.MustParse("50Mi")), // 450Mi
pod5: newPodDiskStats(pod5, resource.MustParse("400Mi"), resource.MustParse("100Mi"), resource.MustParse("50Mi")), // 550Mi
pod6: newPodDiskStats(pod6, resource.MustParse("500Mi"), resource.MustParse("100Mi"), resource.MustParse("50Mi")), // 650Mi
}
statsFn := func(pod *api.Pod) (statsapi.PodStats, bool) {
result, found := stats[pod]
return result, found
}
pods := []*api.Pod{pod1, pod2, pod3, pod4, pod5, pod6}
orderedBy(qosComparator, disk(statsFn, []fsStatsType{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource})).Sort(pods)
expected := []*api.Pod{pod2, pod1, pod4, pod3, pod6, pod5}
for i := range expected {
if pods[i] != expected[i] {
t.Errorf("Expected pod[%d]: %s, but got: %s", i, expected[i].Name, pods[i].Name)
}
}
}
// TestOrderedByMemory ensures we order pods by greediest memory consumer relative to request.
func TestOrderedByMemory(t *testing.T) {
pod1 := newPod("best-effort-high", []api.Container{
newContainer("best-effort-high", newResourceList("", ""), newResourceList("", "")),
})
}, nil)
pod2 := newPod("best-effort-low", []api.Container{
newContainer("best-effort-low", newResourceList("", ""), newResourceList("", "")),
})
}, nil)
pod3 := newPod("burstable-high", []api.Container{
newContainer("burstable-high", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")),
})
}, nil)
pod4 := newPod("burstable-low", []api.Container{
newContainer("burstable-low", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")),
})
}, nil)
pod5 := newPod("guaranteed-high", []api.Container{
newContainer("guaranteed-high", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")),
})
}, nil)
pod6 := newPod("guaranteed-low", []api.Container{
newContainer("guaranteed-low", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")),
})
}, nil)
stats := map[*api.Pod]statsapi.PodStats{
pod1: newPodMemoryStats(pod1, resource.MustParse("500Mi")), // 500 relative to request
pod2: newPodMemoryStats(pod2, resource.MustParse("300Mi")), // 300 relative to request
@ -264,22 +432,22 @@ func TestOrderedByMemory(t *testing.T) {
func TestOrderedByQoSMemory(t *testing.T) {
pod1 := newPod("best-effort-high", []api.Container{
newContainer("best-effort-high", newResourceList("", ""), newResourceList("", "")),
})
}, nil)
pod2 := newPod("best-effort-low", []api.Container{
newContainer("best-effort-low", newResourceList("", ""), newResourceList("", "")),
})
}, nil)
pod3 := newPod("burstable-high", []api.Container{
newContainer("burstable-high", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")),
})
}, nil)
pod4 := newPod("burstable-low", []api.Container{
newContainer("burstable-low", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")),
})
}, nil)
pod5 := newPod("guaranteed-high", []api.Container{
newContainer("guaranteed-high", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")),
})
}, nil)
pod6 := newPod("guaranteed-low", []api.Container{
newContainer("guaranteed-low", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")),
})
}, nil)
stats := map[*api.Pod]statsapi.PodStats{
pod1: newPodMemoryStats(pod1, resource.MustParse("500Mi")), // 500 relative to request
pod2: newPodMemoryStats(pod2, resource.MustParse("50Mi")), // 50 relative to request
@ -346,11 +514,21 @@ func TestMakeSignalObservations(t *testing.T) {
return pod
}
nodeAvailableBytes := uint64(1024 * 1024 * 1024)
imageFsAvailableBytes := uint64(1024 * 1024)
nodeFsAvailableBytes := uint64(1024)
fakeStats := &statsapi.Summary{
Node: statsapi.NodeStats{
Memory: &statsapi.MemoryStats{
AvailableBytes: &nodeAvailableBytes,
},
Runtime: &statsapi.RuntimeStats{
ImageFs: &statsapi.FsStats{
AvailableBytes: &imageFsAvailableBytes,
},
},
Fs: &statsapi.FsStats{
AvailableBytes: &nodeFsAvailableBytes,
},
},
Pods: []statsapi.PodStats{},
}
@ -370,12 +548,26 @@ func TestMakeSignalObservations(t *testing.T) {
if err != nil {
t.Errorf("Unexpected err: %v", err)
}
quantity, found := actualObservations[SignalMemoryAvailable]
memQuantity, found := actualObservations[SignalMemoryAvailable]
if !found {
t.Errorf("Expected available memory observation: %v", err)
}
if expectedBytes := int64(nodeAvailableBytes); quantity.Value() != expectedBytes {
t.Errorf("Expected %v, actual: %v", expectedBytes, quantity.Value())
if expectedBytes := int64(nodeAvailableBytes); memQuantity.Value() != expectedBytes {
t.Errorf("Expected %v, actual: %v", expectedBytes, memQuantity.Value())
}
nodeFsQuantity, found := actualObservations[SignalNodeFsAvailable]
if !found {
t.Errorf("Expected available nodefs observation: %v", err)
}
if expectedBytes := int64(nodeFsAvailableBytes); nodeFsQuantity.Value() != expectedBytes {
t.Errorf("Expected %v, actual: %v", expectedBytes, nodeFsQuantity.Value())
}
imageFsQuantity, found := actualObservations[SignalImageFsAvailable]
if !found {
t.Errorf("Expected available imagefs observation: %v", err)
}
if expectedBytes := int64(imageFsAvailableBytes); imageFsQuantity.Value() != expectedBytes {
t.Errorf("Expected %v, actual: %v", expectedBytes, imageFsQuantity.Value())
}
for _, pod := range pods {
podStats, found := statsFunc(pod)
@ -670,6 +862,18 @@ func TestReclaimResources(t *testing.T) {
},
result: []api.ResourceName{api.ResourceMemory},
},
"imagefs.available": {
inputs: []Threshold{
{Signal: SignalImageFsAvailable},
},
result: []api.ResourceName{resourceImageFs},
},
"nodefs.available": {
inputs: []Threshold{
{Signal: SignalNodeFsAvailable},
},
result: []api.ResourceName{resourceNodeFs},
},
}
for testName, testCase := range testCases {
actual := reclaimResources(testCase.inputs)
@ -681,6 +885,40 @@ func TestReclaimResources(t *testing.T) {
}
}
// newPodDiskStats returns stats with specified usage amounts.
func newPodDiskStats(pod *api.Pod, rootFsUsed, logsUsed, perLocalVolumeUsed resource.Quantity) statsapi.PodStats {
result := statsapi.PodStats{
PodRef: statsapi.PodReference{
Name: pod.Name, Namespace: pod.Namespace, UID: string(pod.UID),
},
}
rootFsUsedBytes := uint64(rootFsUsed.Value())
logsUsedBytes := uint64(logsUsed.Value())
for range pod.Spec.Containers {
result.Containers = append(result.Containers, statsapi.ContainerStats{
Rootfs: &statsapi.FsStats{
UsedBytes: &rootFsUsedBytes,
},
Logs: &statsapi.FsStats{
UsedBytes: &logsUsedBytes,
},
})
}
perLocalVolumeUsedBytes := uint64(perLocalVolumeUsed.Value())
for _, volumeName := range localVolumeNames(pod) {
result.VolumeStats = append(result.VolumeStats, statsapi.VolumeStats{
Name: volumeName,
FsStats: statsapi.FsStats{
UsedBytes: &perLocalVolumeUsedBytes,
},
})
}
return result
}
func newPodMemoryStats(pod *api.Pod, workingSet resource.Quantity) statsapi.PodStats {
result := statsapi.PodStats{
PodRef: statsapi.PodReference{
@ -723,13 +961,21 @@ func newContainer(name string, requests api.ResourceList, limits api.ResourceLis
}
}
func newPod(name string, containers []api.Container) *api.Pod {
func newVolume(name string, volumeSource api.VolumeSource) api.Volume {
return api.Volume{
Name: name,
VolumeSource: volumeSource,
}
}
func newPod(name string, containers []api.Container, volumes []api.Volume) *api.Pod {
return &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: name,
},
Spec: api.PodSpec{
Containers: containers,
Volumes: volumes,
},
}
}

View File

@ -30,6 +30,22 @@ type Signal string
const (
// SignalMemoryAvailable is memory available (i.e. capacity - workingSet), in bytes.
SignalMemoryAvailable Signal = "memory.available"
// SignalNodeFsAvailable is the amount of storage available on the filesystem that the kubelet uses for volumes, daemon logs, etc.
SignalNodeFsAvailable Signal = "nodefs.available"
// SignalImageFsAvailable is the amount of storage available on the filesystem that the container runtime uses for storing images and container writable layers.
SignalImageFsAvailable Signal = "imagefs.available"
)
// fsStatsType defines the types of filesystem stats to collect.
type fsStatsType string
const (
// fsStatsLocalVolumeSource identifies stats for pod local volume sources.
fsStatsLocalVolumeSource fsStatsType = "localVolumeSource"
// fsStatsLogs identifies stats for pod logs.
fsStatsLogs fsStatsType = "logs"
// fsStatsRoot identifies stats for pod container writable layers.
fsStatsRoot fsStatsType = "root"
)
// ThresholdOperator is the operator used to express a Threshold.
@ -71,6 +87,9 @@ type Manager interface {
// IsUnderMemoryPressure returns true if the node is under memory pressure.
IsUnderMemoryPressure() bool
// IsUnderDiskPressure returns true if the node is under disk pressure.
IsUnderDiskPressure() bool
}
// DiskInfoProvider is responsible for informing the manager how disk is configured.
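
The hunk ends before the interface body is shown; based on how `managerImpl.synchronize` and the `mockDiskInfoProvider` test double use it elsewhere in this PR, a minimal sketch of the interface would look like the following (any additional methods are unknown here and omitted):

```go
// Sketch inferred from usage in this PR; HasDedicatedImageFs is the only
// method the eviction manager relies on here.
type DiskInfoProvider interface {
	// HasDedicatedImageFs returns true if the imagefs is backed by a different
	// device than the node's root filesystem.
	HasDedicatedImageFs() (bool, error)
}
```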

View File

@ -587,6 +587,64 @@ func (kl *Kubelet) setNodeMemoryPressureCondition(node *api.Node) {
}
}
// setNodeDiskPressureCondition sets the DiskPressure node condition for the node.
// TODO: this needs to move somewhere centralized...
func (kl *Kubelet) setNodeDiskPressureCondition(node *api.Node) {
currentTime := unversioned.NewTime(kl.clock.Now())
var condition *api.NodeCondition
// Check if NodeDiskPressure condition already exists and if it does, just pick it up for update.
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == api.NodeDiskPressure {
condition = &node.Status.Conditions[i]
}
}
newCondition := false
// If the NodeDiskPressure condition doesn't exist, create one
if condition == nil {
condition = &api.NodeCondition{
Type: api.NodeDiskPressure,
Status: api.ConditionUnknown,
}
// cannot be appended to node.Status.Conditions here because it gets
// copied to the slice. So if we append to the slice here none of the
// updates we make below are reflected in the slice.
newCondition = true
}
// Update the heartbeat time
condition.LastHeartbeatTime = currentTime
// Note: The conditions below take care of the case when a new NodeDiskPressure condition is
// created as well as the case when the condition already exists. When a new condition
// is created its status is set to api.ConditionUnknown which matches either
// condition.Status != api.ConditionTrue or
// condition.Status != api.ConditionFalse in the conditions below depending on whether
// the kubelet is under disk pressure or not.
if kl.evictionManager.IsUnderDiskPressure() {
if condition.Status != api.ConditionTrue {
condition.Status = api.ConditionTrue
condition.Reason = "KubeletHasDiskPressure"
condition.Message = "kubelet has disk pressure"
condition.LastTransitionTime = currentTime
kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasDiskPressure")
}
} else {
if condition.Status != api.ConditionFalse {
condition.Status = api.ConditionFalse
condition.Reason = "KubeletHasNoDiskPressure"
condition.Message = "kubelet has no disk pressure"
condition.LastTransitionTime = currentTime
kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasNoDiskPressure")
}
}
if newCondition {
node.Status.Conditions = append(node.Status.Conditions, *condition)
}
}
// Set OutOfDisk condition for the node.
func (kl *Kubelet) setNodeOODCondition(node *api.Node) {
currentTime := unversioned.NewTime(kl.clock.Now())
@ -700,6 +758,7 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*api.Node) error {
withoutError(kl.setNodeStatusInfo),
withoutError(kl.setNodeOODCondition),
withoutError(kl.setNodeMemoryPressureCondition),
withoutError(kl.setNodeDiskPressureCondition),
withoutError(kl.setNodeReadyCondition),
withoutError(kl.setNodeVolumesInUseStatus),
withoutError(kl.recordNodeSchedulableEvent),

View File

@ -133,6 +133,14 @@ func TestUpdateNewNodeStatus(t *testing.T) {
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{
Type: api.NodeDiskPressure,
Status: api.ConditionFalse,
Reason: "KubeletHasNoDiskPressure",
Message: fmt.Sprintf("kubelet has no disk pressure"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{
Type: api.NodeReady,
Status: api.ConditionTrue,
@ -316,6 +324,14 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
{
Type: api.NodeDiskPressure,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientDisk",
Message: fmt.Sprintf("kubelet has sufficient disk space available"),
LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
{
Type: api.NodeReady,
Status: api.ConditionTrue,
@ -380,6 +396,14 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{
Type: api.NodeDiskPressure,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientDisk",
Message: fmt.Sprintf("kubelet has sufficient disk space available"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{
Type: api.NodeReady,
Status: api.ConditionTrue,
@ -489,7 +513,6 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T)
LastTransitionTime: unversioned.NewTime(clock.Now()),
},
{
Type: api.NodeOutOfDisk,
Status: api.ConditionTrue,
Reason: "KubeletOutOfDisk",
@ -509,8 +532,13 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T)
NumCores: 2,
MemoryCapacity: 1024,
}
fsInfo := cadvisorapiv2.FsInfo{
Device: "123",
}
mockCadvisor.On("Start").Return(nil)
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
mockCadvisor.On("ImagesFsInfo").Return(fsInfo, nil)
mockCadvisor.On("RootFsInfo").Return(fsInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
@ -671,6 +699,14 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{
Type: api.NodeDiskPressure,
Status: api.ConditionFalse,
Reason: "KubeletHasNoDiskPressure",
Message: fmt.Sprintf("kubelet has no disk pressure"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{}, //placeholder
},
NodeInfo: api.NodeSystemInfo{

View File

@ -39,6 +39,7 @@ var (
ErrServiceAffinityViolated = newPredicateFailureError("CheckServiceAffinity")
ErrMaxVolumeCountExceeded = newPredicateFailureError("MaxVolumeCount")
ErrNodeUnderMemoryPressure = newPredicateFailureError("NodeUnderMemoryPressure")
ErrNodeUnderDiskPressure = newPredicateFailureError("NodeUnderDiskPressure")
// ErrFakePredicate is used for test only. The fake predicates returning false also returns error
// as ErrFakePredicate.
ErrFakePredicate = newPredicateFailureError("FakePredicateError")

View File

@ -1049,3 +1049,21 @@ func CheckNodeMemoryPressurePredicate(pod *api.Pod, meta interface{}, nodeInfo *
return true, nil
}
// CheckNodeDiskPressurePredicate checks if a pod can be scheduled on a node
// reporting disk pressure condition.
func CheckNodeDiskPressurePredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
node := nodeInfo.Node()
if node == nil {
return false, fmt.Errorf("node not found")
}
// is the node under pressure?
for _, cond := range node.Status.Conditions {
if cond.Type == api.NodeDiskPressure && cond.Status == api.ConditionTrue {
return false, ErrNodeUnderDiskPressure
}
}
return true, nil
}

View File

@ -2906,3 +2906,72 @@ func TestPodSchedulesOnNodeWithMemoryPressureCondition(t *testing.T) {
}
}
}
func TestPodSchedulesOnNodeWithDiskPressureCondition(t *testing.T) {
pod := &api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "container",
Image: "image",
ImagePullPolicy: "Always",
},
},
},
}
// specify a node with no disk pressure condition on
noPressureNode := &api.Node{
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: "Ready",
Status: "True",
},
},
},
}
// specify a node with pressure condition on
pressureNode := &api.Node{
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: "DiskPressure",
Status: "True",
},
},
},
}
tests := []struct {
pod *api.Pod
nodeInfo *schedulercache.NodeInfo
fits bool
name string
}{
{
pod: pod,
nodeInfo: makeEmptyNodeInfo(noPressureNode),
fits: true,
name: "pod schedulable on node without pressure condition on",
},
{
pod: pod,
nodeInfo: makeEmptyNodeInfo(pressureNode),
fits: false,
name: "pod not schedulable on node with pressure condition on",
},
}
for _, test := range tests {
fits, err := CheckNodeDiskPressurePredicate(test.pod, nil, test.nodeInfo)
if fits != test.fits {
t.Errorf("%s: expected %v got %v", test.name, test.fits, fits)
}
if err != nil && err != ErrNodeUnderDiskPressure {
t.Errorf("%s: unexpected error: %v", test.name, err)
}
}
}

View File

@ -151,6 +151,9 @@ func defaultPredicates() sets.String {
// Fit is determined by node memory pressure condition.
factory.RegisterFitPredicate("CheckNodeMemoryPressure", predicates.CheckNodeMemoryPressurePredicate),
// Fit is determined by node disk pressure condition.
factory.RegisterFitPredicate("CheckNodeDiskPressure", predicates.CheckNodeDiskPressurePredicate),
)
}