out of resource killing (memory)

pull/6/head
derekwaynecarr 2016-05-12 23:35:18 -04:00
parent b8f1682695
commit edc76f6d4f
10 changed files with 1590 additions and 2 deletions

View File

@ -1889,6 +1889,8 @@ const (
// NodeOutOfDisk means the kubelet will not accept new pods due to insufficient free disk
// space on the node.
NodeOutOfDisk NodeConditionType = "OutOfDisk"
// NodeMemoryPressure means the kubelet is under pressure due to insufficient available memory.
NodeMemoryPressure NodeConditionType = "MemoryPressure"
)
type NodeCondition struct {

View File

@ -2273,6 +2273,8 @@ const (
// NodeOutOfDisk means the kubelet will not accept new pods due to insufficient free disk
// space on the node.
NodeOutOfDisk NodeConditionType = "OutOfDisk"
// NodeMemoryPressure means the kubelet is under pressure due to insufficient available memory.
NodeMemoryPressure NodeConditionType = "MemoryPressure"
)
// NodeCondition contains condition information for a node.

View File

@ -18,18 +18,41 @@ package eviction
import (
"fmt"
"sort"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
statsapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
qosutil "k8s.io/kubernetes/pkg/kubelet/qos/util"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/quota/evaluator/core"
"k8s.io/kubernetes/pkg/util/sets"
)
const (
unsupportedEvictionSignal = "unsupported eviction signal %v"
// the reason reported back in status.
reason = "Evicted"
// the message associated with the reason.
message = "The node was low on compute resources."
// disk, in bytes. internal to this module, used to account for local disk usage.
resourceDisk api.ResourceName = "disk"
)
// resourceToRankFunc maps a resource to the ranking function for that resource.
var resourceToRankFunc = map[api.ResourceName]rankFunc{
api.ResourceMemory: rankMemoryPressure,
}
// signalToNodeCondition maps a signal to the node condition to report if threshold is met.
var signalToNodeCondition = map[Signal]api.NodeConditionType{
SignalMemoryAvailable: api.NodeMemoryPressure,
}
// signalToResource maps a Signal to its associated Resource.
var signalToResource = map[Signal]api.ResourceName{
SignalMemoryAvailable: api.ResourceMemory,
@ -162,3 +185,373 @@ func parseGracePeriods(expr string) (map[Signal]time.Duration, error) {
}
return results, nil
}
// diskUsage converts used bytes into a resource quantity.
func diskUsage(fsStats *statsapi.FsStats) *resource.Quantity {
if fsStats == nil || fsStats.UsedBytes == nil {
return &resource.Quantity{Format: resource.BinarySI}
}
usage := int64(*fsStats.UsedBytes)
return resource.NewQuantity(usage, resource.BinarySI)
}
// memoryUsage converts working set into a resource quantity.
func memoryUsage(memStats *statsapi.MemoryStats) *resource.Quantity {
if memStats == nil || memStats.WorkingSetBytes == nil {
return &resource.Quantity{Format: resource.BinarySI}
}
usage := int64(*memStats.WorkingSetBytes)
return resource.NewQuantity(usage, resource.BinarySI)
}
// podUsage aggregates usage of compute resources.
// it supports the following resources: memory and disk.
func podUsage(podStats statsapi.PodStats) (api.ResourceList, error) {
disk := resource.Quantity{Format: resource.BinarySI}
memory := resource.Quantity{Format: resource.BinarySI}
for _, container := range podStats.Containers {
// disk usage (if known)
// TODO: need to handle volumes
for _, fsStats := range []*statsapi.FsStats{container.Rootfs, container.Logs} {
if err := disk.Add(*diskUsage(fsStats)); err != nil {
return nil, err
}
}
// memory usage (if known)
if err := memory.Add(*memoryUsage(container.Memory)); err != nil {
return nil, err
}
}
return api.ResourceList{
api.ResourceMemory: memory,
resourceDisk: disk,
}, nil
}
// formatThreshold formats a threshold for logging.
func formatThreshold(threshold Threshold) string {
return fmt.Sprintf("threshold(signal=%v, operator=%v, value=%v, gracePeriod=%v)", threshold.Signal, threshold.Operator, threshold.Value.String(), threshold.GracePeriod)
}
// cachedStatsFunc returns a statsFunc based on the provided pod stats.
func cachedStatsFunc(podStats []statsapi.PodStats) statsFunc {
uid2PodStats := map[string]statsapi.PodStats{}
for i := range podStats {
uid2PodStats[podStats[i].PodRef.UID] = podStats[i]
}
return func(pod *api.Pod) (statsapi.PodStats, bool) {
stats, found := uid2PodStats[string(pod.UID)]
return stats, found
}
}
// cmpFunc compares p1 and p2 and returns:
//
// -1 if p1 < p2
// 0 if p1 == p2
// +1 if p1 > p2
//
type cmpFunc func(p1, p2 *api.Pod) int
// multiSorter implements the Sort interface, sorting the pods within.
type multiSorter struct {
pods []*api.Pod
cmp []cmpFunc
}
// Sort sorts the argument slice according to the cmp functions passed to orderedBy.
func (ms *multiSorter) Sort(pods []*api.Pod) {
ms.pods = pods
sort.Sort(ms)
}
// orderedBy returns a multiSorter that sorts using the cmp functions, in order.
// Call its Sort method to sort the data.
func orderedBy(cmp ...cmpFunc) *multiSorter {
return &multiSorter{
cmp: cmp,
}
}
// Len is part of sort.Interface.
func (ms *multiSorter) Len() int {
return len(ms.pods)
}
// Swap is part of sort.Interface.
func (ms *multiSorter) Swap(i, j int) {
ms.pods[i], ms.pods[j] = ms.pods[j], ms.pods[i]
}
// Less is part of sort.Interface.
func (ms *multiSorter) Less(i, j int) bool {
p1, p2 := ms.pods[i], ms.pods[j]
var k int
for k = 0; k < len(ms.cmp)-1; k++ {
cmpResult := ms.cmp[k](p1, p2)
// p1 is less than p2
if cmpResult < 0 {
return true
}
// p1 is greater than p2
if cmpResult > 0 {
return false
}
// we don't know yet
}
// the last cmp func is the final decider
return ms.cmp[k](p1, p2) < 0
}
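// Usage note (illustrative, not part of this change): comparators compose left to
// right, e.g. orderedBy(qos, memory(statsFn)).Sort(pods) orders BestEffort pods
// ahead of Burstable and Guaranteed, breaking ties by the greediest memory consumer
// relative to its request; rankMemoryPressure below uses exactly this chain.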
// qos compares pods by QoS (BestEffort < Burstable < Guaranteed)
func qos(p1, p2 *api.Pod) int {
qosP1 := qosutil.GetPodQos(p1)
qosP2 := qosutil.GetPodQos(p2)
// it's a tie
if qosP1 == qosP2 {
return 0
}
// if p1 is best effort, we know p2 is burstable or guaranteed
if qosP1 == qosutil.BestEffort {
return -1
}
// we know p1 and p2 are not besteffort, so if p1 is burstable, p2 must be guaranteed
if qosP1 == qosutil.Burstable {
if qosP2 == qosutil.Guaranteed {
return -1
}
return 1
}
// ok, p1 must be guaranteed.
return 1
}
// memory compares pods by largest consumer of memory relative to request.
func memory(stats statsFunc) cmpFunc {
return func(p1, p2 *api.Pod) int {
p1Stats, found := stats(p1)
// if we have no usage stats for p1, we want p2 first
if !found {
return -1
}
// if we have no usage stats for p2, but p1 has usage, we want p1 first.
p2Stats, found := stats(p2)
if !found {
return 1
}
// if we can't measure usage for p1, we want p2 first
p1Usage, err := podUsage(p1Stats)
if err != nil {
return -1
}
// if we can't measure usage for p2, we want p1 first
p2Usage, err := podUsage(p2Stats)
if err != nil {
return 1
}
// adjust p1, p2 usage relative to the request (if any)
p1Memory := p1Usage[api.ResourceMemory]
p1Spec := core.PodUsageFunc(p1)
p1Request := p1Spec[api.ResourceRequestsMemory]
p1Memory.Sub(p1Request)
p2Memory := p2Usage[api.ResourceMemory]
p2Spec := core.PodUsageFunc(p2)
p2Request := p2Spec[api.ResourceRequestsMemory]
p2Memory.Sub(p2Request)
// if p2 is using more than p1, we want p2 first
return p2Memory.Cmp(p1Memory)
}
}
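// Worked example (illustrative, mirroring the tests in this change): a burstable pod
// using 800Mi against a 100Mi request is 700Mi over its request and therefore ranks
// ahead of a guaranteed pod using 800Mi against a 1Gi request, whose usage relative
// to request is negative.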
// disk compares pods by largest consumer of disk relative to request.
func disk(stats statsFunc) cmpFunc {
return func(p1, p2 *api.Pod) int {
p1Stats, found := stats(p1)
// if we have no usage stats for p1, we want p2 first
if !found {
return -1
}
// if we have no usage stats for p2, but p1 has usage, we want p1 first.
p2Stats, found := stats(p2)
if !found {
return 1
}
// if we can't measure usage for p1, we want p2 first
p1Usage, err := podUsage(p1Stats)
if err != nil {
return -1
}
// if we can't measure usage for p2, we want p1 first
p2Usage, err := podUsage(p2Stats)
if err != nil {
return 1
}
// disk is best effort, so we don't measure relative to a request.
// TODO: add disk as a guaranteed resource
p1Disk := p1Usage[resourceDisk]
p2Disk := p2Usage[resourceDisk]
// if p2 is using more than p1, we want p2 first
return p2Disk.Cmp(p1Disk)
}
}
// rankMemoryPressure orders the input pods for eviction in response to memory pressure.
func rankMemoryPressure(pods []*api.Pod, stats statsFunc) {
orderedBy(qos, memory(stats)).Sort(pods)
}
// rankDiskPressure orders the input pods for eviction in response to disk pressure.
func rankDiskPressure(pods []*api.Pod, stats statsFunc) {
orderedBy(qos, disk(stats)).Sort(pods)
}
// byEvictionPriority implements sort.Interface for []api.ResourceName.
type byEvictionPriority []api.ResourceName
func (a byEvictionPriority) Len() int { return len(a) }
func (a byEvictionPriority) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// Less ranks memory before all other resources.
func (a byEvictionPriority) Less(i, j int) bool {
return a[i] == api.ResourceMemory
}
// makeSignalObservations derives observations using the specified summary provider.
func makeSignalObservations(summaryProvider stats.SummaryProvider) (signalObservations, statsFunc, error) {
summary, err := summaryProvider.Get()
if err != nil {
return nil, nil, err
}
// build the function used to look up usage stats for a given pod
statsFunc := cachedStatsFunc(summary.Pods)
// build an evaluation context for current eviction signals
result := signalObservations{}
result[SignalMemoryAvailable] = *resource.NewQuantity(int64(*summary.Node.Memory.AvailableBytes), resource.BinarySI)
return result, statsFunc, nil
}
// thresholdsMet returns the set of thresholds that were met independent of grace period
func thresholdsMet(thresholds []Threshold, observations signalObservations) []Threshold {
results := []Threshold{}
for i := range thresholds {
threshold := thresholds[i]
observed, found := observations[threshold.Signal]
if !found {
glog.Warningf("eviction manager: no observation found for eviction signal %v", threshold.Signal)
continue
}
// determine if we have met the specified threshold
thresholdMet := false
thresholdResult := threshold.Value.Cmp(observed)
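// note: Value.Cmp(observed) returns a positive result when the observed quantity is strictly below the threshold value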
switch threshold.Operator {
case OpLessThan:
thresholdMet = thresholdResult > 0
}
if thresholdMet {
results = append(results, threshold)
}
}
return results
}
// thresholdsFirstObservedAt merges the input set of thresholds with the previous observation to determine when the active set of thresholds was initially met.
func thresholdsFirstObservedAt(thresholds []Threshold, lastObservedAt thresholdsObservedAt, now time.Time) thresholdsObservedAt {
results := thresholdsObservedAt{}
for i := range thresholds {
observedAt, found := lastObservedAt[thresholds[i]]
if !found {
observedAt = now
}
results[thresholds[i]] = observedAt
}
return results
}
// thresholdsMetGracePeriod returns the set of thresholds that have satisfied their associated grace period
func thresholdsMetGracePeriod(observedAt thresholdsObservedAt, now time.Time) []Threshold {
results := []Threshold{}
for threshold, at := range observedAt {
duration := now.Sub(at)
if duration < threshold.GracePeriod {
glog.V(2).Infof("eviction manager: eviction criteria not yet met for %v, duration: %v", formatThreshold(threshold), duration)
continue
}
results = append(results, threshold)
}
return results
}
// nodeConditions returns the set of node conditions associated with the given thresholds
func nodeConditions(thresholds []Threshold) []api.NodeConditionType {
results := []api.NodeConditionType{}
for _, threshold := range thresholds {
if nodeCondition, found := signalToNodeCondition[threshold.Signal]; found {
results = append(results, nodeCondition)
}
}
return results
}
// nodeConditionsLastObservedAt merges the input with the previous observation to determine when a condition was most recently met.
func nodeConditionsLastObservedAt(nodeConditions []api.NodeConditionType, lastObservedAt nodeConditionsObservedAt, now time.Time) nodeConditionsObservedAt {
results := nodeConditionsObservedAt{}
// the input conditions were observed "now"
for i := range nodeConditions {
results[nodeConditions[i]] = now
}
// the conditions that were not observed now are merged in with their old time
for key, value := range lastObservedAt {
_, found := results[key]
if !found {
results[key] = value
}
}
return results
}
// nodeConditionsObservedSince returns the set of conditions that have been observed within the specified period
func nodeConditionsObservedSince(observedAt nodeConditionsObservedAt, period time.Duration, now time.Time) []api.NodeConditionType {
results := []api.NodeConditionType{}
for nodeCondition, at := range observedAt {
duration := now.Sub(at)
if duration < period {
results = append(results, nodeCondition)
}
}
return results
}
// hasNodeCondition returns true if the node condition is in the input list
func hasNodeCondition(inputs []api.NodeConditionType, item api.NodeConditionType) bool {
for _, input := range inputs {
if input == item {
return true
}
}
return false
}
// hasThreshold returns true if the threshold is in the input list
func hasThreshold(inputs []Threshold, item Threshold) bool {
for _, input := range inputs {
if input.GracePeriod == item.GracePeriod && input.Operator == item.Operator && input.Signal == item.Signal && input.Value.Cmp(item.Value) == 0 {
return true
}
}
return false
}
// reclaimResources returns the set of resources that are starved based on thresholds met.
func reclaimResources(thresholds []Threshold) []api.ResourceName {
results := []api.ResourceName{}
for _, threshold := range thresholds {
if starvedResource, found := signalToResource[threshold.Signal]; found {
results = append(results, starvedResource)
}
}
return results
}

View File

@ -17,10 +17,17 @@ limitations under the License.
package eviction
import (
"fmt"
"reflect"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
statsapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
"k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/types"
)
func TestParseThresholdConfig(t *testing.T) {
@ -140,3 +147,574 @@ func thresholdEqual(a Threshold, b Threshold) bool {
a.Signal == b.Signal &&
a.Value.Cmp(b.Value) == 0
}
// TestOrderedByQoS ensures we order BestEffort < Burstable < Guaranteed
func TestOrderedByQoS(t *testing.T) {
bestEffort := newPod("best-effort", []api.Container{
newContainer("best-effort", newResourceList("", ""), newResourceList("", "")),
})
burstable := newPod("burstable", []api.Container{
newContainer("burstable", newResourceList("100m", "100Mi"), newResourceList("200m", "200Mi")),
})
guaranteed := newPod("guaranteed", []api.Container{
newContainer("guaranteed", newResourceList("200m", "200Mi"), newResourceList("200m", "200Mi")),
})
pods := []*api.Pod{guaranteed, burstable, bestEffort}
orderedBy(qos).Sort(pods)
expected := []*api.Pod{bestEffort, burstable, guaranteed}
for i := range expected {
if pods[i] != expected[i] {
t.Errorf("Expected pod: %s, but got: %s", expected[i].Name, pods[i].Name)
}
}
}
// TestOrderedByMemory ensures we order pods by greediest memory consumer relative to request.
func TestOrderedByMemory(t *testing.T) {
pod1 := newPod("best-effort-high", []api.Container{
newContainer("best-effort-high", newResourceList("", ""), newResourceList("", "")),
})
pod2 := newPod("best-effort-low", []api.Container{
newContainer("best-effort-low", newResourceList("", ""), newResourceList("", "")),
})
pod3 := newPod("burstable-high", []api.Container{
newContainer("burstable-high", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")),
})
pod4 := newPod("burstable-low", []api.Container{
newContainer("burstable-low", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")),
})
pod5 := newPod("guaranteed-high", []api.Container{
newContainer("guaranteed-high", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")),
})
pod6 := newPod("guaranteed-low", []api.Container{
newContainer("guaranteed-low", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")),
})
stats := map[*api.Pod]statsapi.PodStats{
pod1: newPodMemoryStats(pod1, resource.MustParse("500Mi")), // 500 relative to request
pod2: newPodMemoryStats(pod2, resource.MustParse("300Mi")), // 300 relative to request
pod3: newPodMemoryStats(pod3, resource.MustParse("800Mi")), // 700 relative to request
pod4: newPodMemoryStats(pod4, resource.MustParse("300Mi")), // 200 relative to request
pod5: newPodMemoryStats(pod5, resource.MustParse("800Mi")), // -200 relative to request
pod6: newPodMemoryStats(pod6, resource.MustParse("200Mi")), // -800 relative to request
}
statsFn := func(pod *api.Pod) (statsapi.PodStats, bool) {
result, found := stats[pod]
return result, found
}
pods := []*api.Pod{pod1, pod2, pod3, pod4, pod5, pod6}
orderedBy(memory(statsFn)).Sort(pods)
expected := []*api.Pod{pod3, pod1, pod2, pod4, pod5, pod6}
for i := range expected {
if pods[i] != expected[i] {
t.Errorf("Expected pod[%d]: %s, but got: %s", i, expected[i].Name, pods[i].Name)
}
}
}
// TestOrderedByQoSMemory ensures we order by qos and then memory consumption relative to request.
func TestOrderedByQoSMemory(t *testing.T) {
pod1 := newPod("best-effort-high", []api.Container{
newContainer("best-effort-high", newResourceList("", ""), newResourceList("", "")),
})
pod2 := newPod("best-effort-low", []api.Container{
newContainer("best-effort-low", newResourceList("", ""), newResourceList("", "")),
})
pod3 := newPod("burstable-high", []api.Container{
newContainer("burstable-high", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")),
})
pod4 := newPod("burstable-low", []api.Container{
newContainer("burstable-low", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")),
})
pod5 := newPod("guaranteed-high", []api.Container{
newContainer("guaranteed-high", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")),
})
pod6 := newPod("guaranteed-low", []api.Container{
newContainer("guaranteed-low", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")),
})
stats := map[*api.Pod]statsapi.PodStats{
pod1: newPodMemoryStats(pod1, resource.MustParse("500Mi")), // 500 relative to request
pod2: newPodMemoryStats(pod2, resource.MustParse("50Mi")), // 50 relative to request
pod3: newPodMemoryStats(pod3, resource.MustParse("50Mi")), // -50 relative to request
pod4: newPodMemoryStats(pod4, resource.MustParse("300Mi")), // 200 relative to request
pod5: newPodMemoryStats(pod5, resource.MustParse("800Mi")), // -200 relative to request
pod6: newPodMemoryStats(pod6, resource.MustParse("200Mi")), // -800 relative to request
}
statsFn := func(pod *api.Pod) (statsapi.PodStats, bool) {
result, found := stats[pod]
return result, found
}
pods := []*api.Pod{pod1, pod2, pod3, pod4, pod5, pod6}
expected := []*api.Pod{pod1, pod2, pod4, pod3, pod5, pod6}
orderedBy(qos, memory(statsFn)).Sort(pods)
for i := range expected {
if pods[i] != expected[i] {
t.Errorf("Expected pod[%d]: %s, but got: %s", i, expected[i].Name, pods[i].Name)
}
}
}
type fakeSummaryProvider struct {
result *statsapi.Summary
}
func (f *fakeSummaryProvider) Get() (*statsapi.Summary, error) {
return f.result, nil
}
// newPodStats returns pod stats where each container is using the specified working set.
// Each pod must have a Name, UID, and Namespace.
func newPodStats(pod *api.Pod, containerWorkingSetBytes int64) statsapi.PodStats {
result := statsapi.PodStats{
PodRef: statsapi.PodReference{
Name: pod.Name,
Namespace: pod.Namespace,
UID: string(pod.UID),
},
}
val := uint64(containerWorkingSetBytes)
for range pod.Spec.Containers {
result.Containers = append(result.Containers, statsapi.ContainerStats{
Memory: &statsapi.MemoryStats{
WorkingSetBytes: &val,
},
})
}
return result
}
func TestMakeSignalObservations(t *testing.T) {
podMaker := func(name, namespace, uid string, numContainers int) *api.Pod {
pod := &api.Pod{}
pod.Name = name
pod.Namespace = namespace
pod.UID = types.UID(uid)
pod.Spec = api.PodSpec{}
for i := 0; i < numContainers; i++ {
pod.Spec.Containers = append(pod.Spec.Containers, api.Container{
Name: fmt.Sprintf("ctr%v", i),
})
}
return pod
}
nodeAvailableBytes := uint64(1024 * 1024 * 1024)
fakeStats := &statsapi.Summary{
Node: statsapi.NodeStats{
Memory: &statsapi.MemoryStats{
AvailableBytes: &nodeAvailableBytes,
},
},
Pods: []statsapi.PodStats{},
}
provider := &fakeSummaryProvider{
result: fakeStats,
}
pods := []*api.Pod{
podMaker("pod1", "ns1", "uuid1", 1),
podMaker("pod1", "ns2", "uuid2", 1),
podMaker("pod3", "ns3", "uuid3", 1),
}
containerWorkingSetBytes := int64(1024 * 1024)
for _, pod := range pods {
fakeStats.Pods = append(fakeStats.Pods, newPodStats(pod, containerWorkingSetBytes))
}
actualObservations, statsFunc, err := makeSignalObservations(provider)
if err != nil {
t.Errorf("Unexpected err: %v", err)
}
quantity, found := actualObservations[SignalMemoryAvailable]
if !found {
t.Errorf("Expected available memory observation: %v", err)
}
if expectedBytes := int64(nodeAvailableBytes); quantity.Value() != expectedBytes {
t.Errorf("Expected %v, actual: %v", expectedBytes, quantity.Value())
}
for _, pod := range pods {
podStats, found := statsFunc(pod)
if !found {
t.Errorf("Pod stats were not found for pod %v", pod.UID)
}
for _, container := range podStats.Containers {
actual := int64(*container.Memory.WorkingSetBytes)
if containerWorkingSetBytes != actual {
t.Errorf("Container working set expected %v, actual: %v", containerWorkingSetBytes, actual)
}
}
}
}
func TestThresholdsMet(t *testing.T) {
hardThreshold := Threshold{
Signal: SignalMemoryAvailable,
Operator: OpLessThan,
Value: resource.MustParse("1Gi"),
}
testCases := map[string]struct {
thresholds []Threshold
observations signalObservations
result []Threshold
}{
"empty": {
thresholds: []Threshold{},
observations: signalObservations{},
result: []Threshold{},
},
"threshold-met": {
thresholds: []Threshold{hardThreshold},
observations: signalObservations{
SignalMemoryAvailable: resource.MustParse("500Mi"),
},
result: []Threshold{hardThreshold},
},
"threshold-not-met": {
thresholds: []Threshold{hardThreshold},
observations: signalObservations{
SignalMemoryAvailable: resource.MustParse("2Gi"),
},
result: []Threshold{},
},
}
for testName, testCase := range testCases {
actual := thresholdsMet(testCase.thresholds, testCase.observations)
if !thresholdList(actual).Equal(thresholdList(testCase.result)) {
t.Errorf("Test case: %s, expected: %v, actual: %v", testName, testCase.result, actual)
}
}
}
func TestThresholdsFirstObservedAt(t *testing.T) {
hardThreshold := Threshold{
Signal: SignalMemoryAvailable,
Operator: OpLessThan,
Value: resource.MustParse("1Gi"),
}
now := unversioned.Now()
oldTime := unversioned.NewTime(now.Time.Add(-1 * time.Minute))
testCases := map[string]struct {
thresholds []Threshold
lastObservedAt thresholdsObservedAt
now time.Time
result thresholdsObservedAt
}{
"empty": {
thresholds: []Threshold{},
lastObservedAt: thresholdsObservedAt{},
now: now.Time,
result: thresholdsObservedAt{},
},
"no-previous-observation": {
thresholds: []Threshold{hardThreshold},
lastObservedAt: thresholdsObservedAt{},
now: now.Time,
result: thresholdsObservedAt{
hardThreshold: now.Time,
},
},
"previous-observation": {
thresholds: []Threshold{hardThreshold},
lastObservedAt: thresholdsObservedAt{
hardThreshold: oldTime.Time,
},
now: now.Time,
result: thresholdsObservedAt{
hardThreshold: oldTime.Time,
},
},
}
for testName, testCase := range testCases {
actual := thresholdsFirstObservedAt(testCase.thresholds, testCase.lastObservedAt, testCase.now)
if !reflect.DeepEqual(actual, testCase.result) {
t.Errorf("Test case: %s, expected: %v, actual: %v", testName, testCase.result, actual)
}
}
}
func TestThresholdsMetGracePeriod(t *testing.T) {
now := unversioned.Now()
hardThreshold := Threshold{
Signal: SignalMemoryAvailable,
Operator: OpLessThan,
Value: resource.MustParse("1Gi"),
}
softThreshold := Threshold{
Signal: SignalMemoryAvailable,
Operator: OpLessThan,
Value: resource.MustParse("2Gi"),
GracePeriod: 1 * time.Minute,
}
oldTime := unversioned.NewTime(now.Time.Add(-2 * time.Minute))
testCases := map[string]struct {
observedAt thresholdsObservedAt
now time.Time
result []Threshold
}{
"empty": {
observedAt: thresholdsObservedAt{},
now: now.Time,
result: []Threshold{},
},
"hard-threshold-met": {
observedAt: thresholdsObservedAt{
hardThreshold: now.Time,
},
now: now.Time,
result: []Threshold{hardThreshold},
},
"soft-threshold-not-met": {
observedAt: thresholdsObservedAt{
softThreshold: now.Time,
},
now: now.Time,
result: []Threshold{},
},
"soft-threshold-met": {
observedAt: thresholdsObservedAt{
softThreshold: oldTime.Time,
},
now: now.Time,
result: []Threshold{softThreshold},
},
}
for testName, testCase := range testCases {
actual := thresholdsMetGracePeriod(testCase.observedAt, now.Time)
if !thresholdList(actual).Equal(thresholdList(testCase.result)) {
t.Errorf("Test case: %s, expected: %v, actual: %v", testName, testCase.result, actual)
}
}
}
func TestNodeConditions(t *testing.T) {
testCases := map[string]struct {
inputs []Threshold
result []api.NodeConditionType
}{
"empty-list": {
inputs: []Threshold{},
result: []api.NodeConditionType{},
},
"memory.available": {
inputs: []Threshold{
{Signal: SignalMemoryAvailable},
},
result: []api.NodeConditionType{api.NodeMemoryPressure},
},
}
for testName, testCase := range testCases {
actual := nodeConditions(testCase.inputs)
if !nodeConditionList(actual).Equal(nodeConditionList(testCase.result)) {
t.Errorf("Test case: %s, expected: %v, actual: %v", testName, testCase.result, actual)
}
}
}
func TestNodeConditionsLastObservedAt(t *testing.T) {
now := unversioned.Now()
oldTime := unversioned.NewTime(now.Time.Add(-1 * time.Minute))
testCases := map[string]struct {
nodeConditions []api.NodeConditionType
lastObservedAt nodeConditionsObservedAt
now time.Time
result nodeConditionsObservedAt
}{
"no-previous-observation": {
nodeConditions: []api.NodeConditionType{api.NodeMemoryPressure},
lastObservedAt: nodeConditionsObservedAt{},
now: now.Time,
result: nodeConditionsObservedAt{
api.NodeMemoryPressure: now.Time,
},
},
"previous-observation": {
nodeConditions: []api.NodeConditionType{api.NodeMemoryPressure},
lastObservedAt: nodeConditionsObservedAt{
api.NodeMemoryPressure: oldTime.Time,
},
now: now.Time,
result: nodeConditionsObservedAt{
api.NodeMemoryPressure: now.Time,
},
},
"old-observation": {
nodeConditions: []api.NodeConditionType{},
lastObservedAt: nodeConditionsObservedAt{
api.NodeMemoryPressure: oldTime.Time,
},
now: now.Time,
result: nodeConditionsObservedAt{
api.NodeMemoryPressure: oldTime.Time,
},
},
}
for testName, testCase := range testCases {
actual := nodeConditionsLastObservedAt(testCase.nodeConditions, testCase.lastObservedAt, testCase.now)
if !reflect.DeepEqual(actual, testCase.result) {
t.Errorf("Test case: %s, expected: %v, actual: %v", testName, testCase.result, actual)
}
}
}
func TestNodeConditionsObservedSince(t *testing.T) {
now := unversioned.Now()
observedTime := unversioned.NewTime(now.Time.Add(-1 * time.Minute))
testCases := map[string]struct {
observedAt nodeConditionsObservedAt
period time.Duration
now time.Time
result []api.NodeConditionType
}{
"in-period": {
observedAt: nodeConditionsObservedAt{
api.NodeMemoryPressure: observedTime.Time,
},
period: 2 * time.Minute,
now: now.Time,
result: []api.NodeConditionType{api.NodeMemoryPressure},
},
"out-of-period": {
observedAt: nodeConditionsObservedAt{
api.NodeMemoryPressure: observedTime.Time,
},
period: 30 * time.Second,
now: now.Time,
result: []api.NodeConditionType{},
},
}
for testName, testCase := range testCases {
actual := nodeConditionsObservedSince(testCase.observedAt, testCase.period, testCase.now)
if !nodeConditionList(actual).Equal(nodeConditionList(testCase.result)) {
t.Errorf("Test case: %s, expected: %v, actual: %v", testName, testCase.result, actual)
}
}
}
func TestHasNodeConditions(t *testing.T) {
testCases := map[string]struct {
inputs []api.NodeConditionType
item api.NodeConditionType
result bool
}{
"has-condition": {
inputs: []api.NodeConditionType{api.NodeReady, api.NodeOutOfDisk, api.NodeMemoryPressure},
item: api.NodeMemoryPressure,
result: true,
},
"does-not-have-condition": {
inputs: []api.NodeConditionType{api.NodeReady, api.NodeOutOfDisk},
item: api.NodeMemoryPressure,
result: false,
},
}
for testName, testCase := range testCases {
if actual := hasNodeCondition(testCase.inputs, testCase.item); actual != testCase.result {
t.Errorf("Test case: %s, expected: %v, actual: %v", testName, testCase.result, actual)
}
}
}
func TestReclaimResources(t *testing.T) {
testCases := map[string]struct {
inputs []Threshold
result []api.ResourceName
}{
"memory.available": {
inputs: []Threshold{
{Signal: SignalMemoryAvailable},
},
result: []api.ResourceName{api.ResourceMemory},
},
}
for testName, testCase := range testCases {
actual := reclaimResources(testCase.inputs)
actualSet := quota.ToSet(actual)
expectedSet := quota.ToSet(testCase.result)
if !actualSet.Equal(expectedSet) {
t.Errorf("Test case: %s, expected: %v, actual: %v", testName, expectedSet, actualSet)
}
}
}
func newPodMemoryStats(pod *api.Pod, workingSet resource.Quantity) statsapi.PodStats {
result := statsapi.PodStats{
PodRef: statsapi.PodReference{
Name: pod.Name, Namespace: pod.Namespace, UID: string(pod.UID),
},
}
for range pod.Spec.Containers {
workingSetBytes := uint64(workingSet.Value())
result.Containers = append(result.Containers, statsapi.ContainerStats{
Memory: &statsapi.MemoryStats{
WorkingSetBytes: &workingSetBytes,
},
})
}
return result
}
func newResourceList(cpu, memory string) api.ResourceList {
res := api.ResourceList{}
if cpu != "" {
res[api.ResourceCPU] = resource.MustParse(cpu)
}
if memory != "" {
res[api.ResourceMemory] = resource.MustParse(memory)
}
return res
}
func newResourceRequirements(requests, limits api.ResourceList) api.ResourceRequirements {
res := api.ResourceRequirements{}
res.Requests = requests
res.Limits = limits
return res
}
func newContainer(name string, requests api.ResourceList, limits api.ResourceList) api.Container {
return api.Container{
Name: name,
Resources: newResourceRequirements(requests, limits),
}
}
func newPod(name string, containers []api.Container) *api.Pod {
return &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: name,
},
Spec: api.PodSpec{
Containers: containers,
},
}
}
// nodeConditionList is a simple alias to support equality checking independent of order
type nodeConditionList []api.NodeConditionType
// Equal adds the ability to check equality between two lists of node conditions.
func (s1 nodeConditionList) Equal(s2 nodeConditionList) bool {
if len(s1) != len(s2) {
return false
}
for _, item := range s1 {
if !hasNodeCondition(s2, item) {
return false
}
}
return true
}
// thresholdList is a simple alias to support equality checking independent of order
type thresholdList []Threshold
// Equal adds the ability to check equality between two lists of thresholds.
func (s1 thresholdList) Equal(s2 thresholdList) bool {
if len(s1) != len(s2) {
return false
}
for _, item := range s1 {
if !hasThreshold(s2, item) {
return false
}
}
return true
}

View File

@ -0,0 +1,215 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eviction
import (
"sort"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
qosutil "k8s.io/kubernetes/pkg/kubelet/qos/util"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
)
// managerImpl implements Manager
type managerImpl struct {
// used to track time
clock util.Clock
// config is how the manager is configured
config Config
// the function to invoke to kill a pod
killPodFunc KillPodFunc
// protects access to internal state
sync.RWMutex
// node conditions are the set of conditions present
nodeConditions []api.NodeConditionType
// captures when a node condition was last observed based on a threshold being met
nodeConditionsLastObservedAt nodeConditionsObservedAt
// nodeRef is a reference to the node
nodeRef *api.ObjectReference
// used to record events about the node
recorder record.EventRecorder
// used to measure usage stats on system
summaryProvider stats.SummaryProvider
// records when a threshold was first observed
thresholdsFirstObservedAt thresholdsObservedAt
}
// ensure it implements the required interface
var _ Manager = &managerImpl{}
// NewManager returns a configured Manager and an associated admission handler to enforce eviction configuration.
func NewManager(
summaryProvider stats.SummaryProvider,
config Config,
killPodFunc KillPodFunc,
recorder record.EventRecorder,
nodeRef *api.ObjectReference,
clock util.Clock) (Manager, lifecycle.PodAdmitHandler, error) {
manager := &managerImpl{
clock: clock,
killPodFunc: killPodFunc,
config: config,
recorder: recorder,
summaryProvider: summaryProvider,
nodeRef: nodeRef,
nodeConditionsLastObservedAt: nodeConditionsObservedAt{},
thresholdsFirstObservedAt: thresholdsObservedAt{},
}
return manager, manager, nil
}
// Admit rejects a pod if it's not safe to admit for node stability.
func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
m.RLock()
defer m.RUnlock()
if len(m.nodeConditions) == 0 {
return lifecycle.PodAdmitResult{Admit: true}
}
notBestEffort := qosutil.BestEffort != qosutil.GetPodQos(attrs.Pod)
if notBestEffort {
return lifecycle.PodAdmitResult{Admit: true}
}
glog.Warningf("Failed to admit pod %v - %s", format.Pod(attrs.Pod), "node has conditions: %v", m.nodeConditions)
// we reject all best effort pods until we are stable.
return lifecycle.PodAdmitResult{
Admit: false,
Reason: reason,
Message: message,
}
}
// Start starts the control loop to observe and respond to low compute resources.
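// Each synchronize pass evicts at most one pod; the kubelet is expected to invoke
// Start with evictionMonitoringPeriod as the monitoring interval.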
func (m *managerImpl) Start(podFunc ActivePodsFunc, monitoringInterval time.Duration) {
go wait.Until(func() { m.synchronize(podFunc) }, monitoringInterval, wait.NeverStop)
}
// IsUnderMemoryPressure returns true if the node is under memory pressure.
func (m *managerImpl) IsUnderMemoryPressure() bool {
m.RLock()
defer m.RUnlock()
return hasNodeCondition(m.nodeConditions, api.NodeMemoryPressure)
}
// synchronize is the main control loop that enforces eviction thresholds.
func (m *managerImpl) synchronize(podFunc ActivePodsFunc) {
// if we have nothing to do, just return
thresholds := m.config.Thresholds
if len(thresholds) == 0 {
return
}
// make observations and get a function to derive pod usage stats relative to those observations.
observations, statsFunc, err := makeSignalObservations(m.summaryProvider)
if err != nil {
glog.Errorf("eviction manager: unexpected err: %v", err)
return
}
// capture the current time; it is used below when tracking thresholds and node conditions
now := m.clock.Now()
// determine the set of thresholds met independent of grace period
thresholds = thresholdsMet(thresholds, observations)
// track when a threshold was first observed
thresholdsFirstObservedAt := thresholdsFirstObservedAt(thresholds, m.thresholdsFirstObservedAt, now)
// the set of node conditions that are triggered by currently observed thresholds
nodeConditions := nodeConditions(thresholds)
// track when a node condition was last observed
nodeConditionsLastObservedAt := nodeConditionsLastObservedAt(nodeConditions, m.nodeConditionsLastObservedAt, now)
// node conditions report true if they have been observed within the transition period window
nodeConditions = nodeConditionsObservedSince(nodeConditionsLastObservedAt, m.config.PressureTransitionPeriod, now)
// determine the set of thresholds we need to drive eviction behavior (i.e. all grace periods are met)
thresholds = thresholdsMetGracePeriod(thresholdsFirstObservedAt, now)
// update internal state
m.Lock()
m.nodeConditions = nodeConditions
m.thresholdsFirstObservedAt = thresholdsFirstObservedAt
m.nodeConditionsLastObservedAt = nodeConditionsLastObservedAt
m.Unlock()
// determine the set of resources under starvation
starvedResources := reclaimResources(thresholds)
if len(starvedResources) == 0 {
glog.Infof("eviction manager: no resources are starved")
return
}
// rank the resources to reclaim by eviction priority
sort.Sort(byEvictionPriority(starvedResources))
resourceToReclaim := starvedResources[0]
glog.Warningf("eviction manager: attempting to reclaim %v", resourceToReclaim)
// record an event about the resources we are now attempting to reclaim via eviction
m.recorder.Eventf(m.nodeRef, api.EventTypeWarning, "EvictionThresholdMet", "Attempting to reclaim %s", resourceToReclaim)
// rank the pods for eviction
rank, ok := resourceToRankFunc[resourceToReclaim]
if !ok {
glog.Errorf("eviction manager: no ranking function for resource %s", resourceToReclaim)
return
}
// the only candidates viable for eviction are the currently active pods.
activePods := podFunc()
if len(activePods) == 0 {
glog.Errorf("eviction manager: eviction thresholds have been met, but no pods are active to evict")
return
}
// rank the running pods for eviction for the specified resource
rank(activePods, statsFunc)
glog.Infof("eviction manager: pods ranked for eviction: %s", format.Pods(activePods))
// we kill at most a single pod during each eviction interval
for i := range activePods {
pod := activePods[i]
status := api.PodStatus{
Phase: api.PodFailed,
Message: message,
Reason: reason,
}
// record that we are evicting the pod
m.recorder.Eventf(pod, api.EventTypeWarning, reason, message)
// TODO: this needs to be based on whether a soft or hard eviction threshold was met; soft eviction will allow a configured value.
gracePeriodOverride := int64(0)
// this is a blocking call and should only return when the pod and its containers are killed.
err := m.killPodFunc(pod, status, &gracePeriodOverride)
if err != nil {
glog.Infof("eviction manager: pod %s failed to evict %v", format.Pod(pod), err)
continue
}
// success, so we return and wait until the next monitoring interval
glog.Infof("eviction manager: pod %s evicted successfully", format.Pod(pod))
return
}
glog.Infof("eviction manager: unable to evict any pods from the node")
}

View File

@ -0,0 +1,212 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eviction
import (
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/record"
statsapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
)
// mockPodKiller is used to test which pod is killed
type mockPodKiller struct {
pod *api.Pod
status api.PodStatus
gracePeriodOverride *int64
}
// killPodNow records the pod that was killed
func (m *mockPodKiller) killPodNow(pod *api.Pod, status api.PodStatus, gracePeriodOverride *int64) error {
m.pod = pod
m.status = status
m.gracePeriodOverride = gracePeriodOverride
return nil
}
// TestMemoryPressure verifies admission and eviction behavior as memory pressure is induced and then relieved.
func TestMemoryPressure(t *testing.T) {
podMaker := func(name string, requests api.ResourceList, limits api.ResourceList, memoryWorkingSet string) (*api.Pod, statsapi.PodStats) {
pod := newPod(name, []api.Container{
newContainer(name, requests, api.ResourceList{}),
})
podStats := newPodMemoryStats(pod, resource.MustParse(memoryWorkingSet))
return pod, podStats
}
summaryStatsMaker := func(nodeAvailableBytes string, podStats map[*api.Pod]statsapi.PodStats) *statsapi.Summary {
val := resource.MustParse(nodeAvailableBytes)
availableBytes := uint64(val.Value())
result := &statsapi.Summary{
Node: statsapi.NodeStats{
Memory: &statsapi.MemoryStats{
AvailableBytes: &availableBytes,
},
},
Pods: []statsapi.PodStats{},
}
for _, podStat := range podStats {
result.Pods = append(result.Pods, podStat)
}
return result
}
podsToMake := []struct {
name string
requests api.ResourceList
limits api.ResourceList
memoryWorkingSet string
}{
{name: "best-effort-high", requests: newResourceList("", ""), limits: newResourceList("", ""), memoryWorkingSet: "500Mi"},
{name: "best-effort-low", requests: newResourceList("", ""), limits: newResourceList("", ""), memoryWorkingSet: "300Mi"},
{name: "burstable-high", requests: newResourceList("100m", "100Mi"), limits: newResourceList("200m", "1Gi"), memoryWorkingSet: "800Mi"},
{name: "burstable-low", requests: newResourceList("100m", "100Mi"), limits: newResourceList("200m", "1Gi"), memoryWorkingSet: "300Mi"},
{name: "guaranteed-high", requests: newResourceList("100m", "1Gi"), limits: newResourceList("100m", "1Gi"), memoryWorkingSet: "800Mi"},
{name: "guaranteed-low", requests: newResourceList("100m", "1Gi"), limits: newResourceList("100m", "1Gi"), memoryWorkingSet: "200Mi"},
}
pods := []*api.Pod{}
podStats := map[*api.Pod]statsapi.PodStats{}
for _, podToMake := range podsToMake {
pod, podStat := podMaker(podToMake.name, podToMake.requests, podToMake.limits, podToMake.memoryWorkingSet)
pods = append(pods, pod)
podStats[pod] = podStat
}
activePodsFunc := func() []*api.Pod {
return pods
}
fakeClock := util.NewFakeClock(time.Now())
podKiller := &mockPodKiller{}
nodeRef := &api.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
config := Config{
PressureTransitionPeriod: time.Minute * 5,
Thresholds: []Threshold{
{
Signal: SignalMemoryAvailable,
Operator: OpLessThan,
Value: resource.MustParse("1Gi"),
},
},
}
summaryProvider := &fakeSummaryProvider{result: summaryStatsMaker("2Gi", podStats)}
manager := &managerImpl{
clock: fakeClock,
killPodFunc: podKiller.killPodNow,
config: config,
recorder: &record.FakeRecorder{},
summaryProvider: summaryProvider,
nodeRef: nodeRef,
nodeConditionsLastObservedAt: nodeConditionsObservedAt{},
thresholdsFirstObservedAt: thresholdsObservedAt{},
}
// create a best-effort and a burstable pod to test admission
bestEffortPodToAdmit, _ := podMaker("best-admit", newResourceList("", ""), newResourceList("", ""), "0Gi")
burstablePodToAdmit, _ := podMaker("burst-admit", newResourceList("100m", "100Mi"), newResourceList("200m", "200Mi"), "0Gi")
// synchronize
manager.synchronize(activePodsFunc)
// we should not have memory pressure
if manager.IsUnderMemoryPressure() {
t.Errorf("Manager should not report memory pressure")
}
// try to admit our pods (they should succeed)
expected := []bool{true, true}
for i, pod := range []*api.Pod{bestEffortPodToAdmit, burstablePodToAdmit} {
if result := manager.Admit(&lifecycle.PodAdmitAttributes{Pod: pod}); expected[i] != result.Admit {
t.Errorf("Admit pod: %v, expected: %v, actual: %v", pod, expected[i], result.Admit)
}
}
// induce memory pressure!
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("500Mi", podStats)
manager.synchronize(activePodsFunc)
// we should have memory pressure
if !manager.IsUnderMemoryPressure() {
t.Errorf("Manager should report memory pressure")
}
// check the right pod was killed
if podKiller.pod != pods[0] {
t.Errorf("Manager chose to kill pod: %v, but should have chosen %v", podKiller.pod, pods[0])
}
// the best-effort pod should not be admitted, the burstable pod should
expected = []bool{false, true}
for i, pod := range []*api.Pod{bestEffortPodToAdmit, burstablePodToAdmit} {
if result := manager.Admit(&lifecycle.PodAdmitAttributes{Pod: pod}); expected[i] != result.Admit {
t.Errorf("Admit pod: %v, expected: %v, actual: %v", pod, expected[i], result.Admit)
}
}
// reduce memory pressure
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("2Gi", podStats)
podKiller.pod = nil // reset state
manager.synchronize(activePodsFunc)
// we should have memory pressure (because transition period not yet met)
if !manager.IsUnderMemoryPressure() {
t.Errorf("Manager should report memory pressure")
}
// no pod should have been killed
if podKiller.pod != nil {
t.Errorf("Manager chose to kill pod: %v when no pod should have been killed", podKiller.pod)
}
// the best-effort pod should not be admitted, the burstable pod should
expected = []bool{false, true}
for i, pod := range []*api.Pod{bestEffortPodToAdmit, burstablePodToAdmit} {
if result := manager.Admit(&lifecycle.PodAdmitAttributes{Pod: pod}); expected[i] != result.Admit {
t.Errorf("Admit pod: %v, expected: %v, actual: %v", pod, expected[i], result.Admit)
}
}
// move the clock past transition period to ensure that we stop reporting pressure
fakeClock.Step(5 * time.Minute)
summaryProvider.result = summaryStatsMaker("2Gi", podStats)
podKiller.pod = nil // reset state
manager.synchronize(activePodsFunc)
// we should not have memory pressure (because transition period met)
if manager.IsUnderMemoryPressure() {
t.Errorf("Manager should not report memory pressure")
}
// no pod should have been killed
if podKiller.pod != nil {
t.Errorf("Manager chose to kill pod: %v when no pod should have been killed", podKiller.pod)
}
// all pods should admit now
expected = []bool{true, true}
for i, pod := range []*api.Pod{bestEffortPodToAdmit, burstablePodToAdmit} {
if result := manager.Admit(&lifecycle.PodAdmitAttributes{Pod: pod}); expected[i] != result.Admit {
t.Errorf("Admit pod: %v, expected: %v, actual: %v", pod, expected[i], result.Admit)
}
}
}

View File

@ -21,6 +21,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
statsapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
)
// Signal defines a signal that can trigger eviction of pods on a node.
@ -59,6 +60,15 @@ type Threshold struct {
GracePeriod time.Duration
}
// Manager evaluates when an eviction threshold for node stability has been met on the node.
type Manager interface {
// Start starts the control loop to monitor eviction thresholds at specified interval.
Start(podFunc ActivePodsFunc, monitoringInterval time.Duration)
// IsUnderMemoryPressure returns true if the node is under memory pressure.
IsUnderMemoryPressure() bool
}
// KillPodFunc kills a pod.
// The pod status is updated, and then it is killed with the specified grace period.
// This function must block until either the pod is killed or an error is encountered.
@ -67,3 +77,21 @@ type Threshold struct {
// status - the desired status to associate with the pod (i.e. why it's killed)
// gracePeriodOverride - the grace period override to use instead of what is on the pod spec
type KillPodFunc func(pod *api.Pod, status api.PodStatus, gracePeriodOverride *int64) error
// ActivePodsFunc returns pods bound to the kubelet that are active (i.e. non-terminal state)
type ActivePodsFunc func() []*api.Pod
// statsFunc returns the usage stats if known for an input pod.
type statsFunc func(pod *api.Pod) (statsapi.PodStats, bool)
// rankFunc sorts the pods in eviction order
type rankFunc func(pods []*api.Pod, stats statsFunc)
// signalObservations maps a signal to an observed quantity
type signalObservations map[Signal]resource.Quantity
// thresholdsObservedAt maps a threshold to a time that it was observed
type thresholdsObservedAt map[Threshold]time.Time
// nodeConditionsObservedAt maps a node condition to a time that it was observed
type nodeConditionsObservedAt map[api.NodeConditionType]time.Time
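// Example (illustrative): a hard threshold with no grace period, as exercised by the
// tests in this change, would look like:
//   Threshold{Signal: SignalMemoryAvailable, Operator: OpLessThan, Value: resource.MustParse("1Gi")}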

View File

@ -117,6 +117,10 @@ const (
// Period for performing global cleanup tasks.
housekeepingPeriod = time.Second * 2
// Period for performing eviction monitoring.
// TODO ensure this is in sync with internal cadvisor housekeeping.
evictionMonitoringPeriod = time.Second * 10
// The path in containers' filesystems where the hosts file is mounted.
etcHostsPath = "/etc/hosts"
@ -494,6 +498,14 @@ func NewMainKubelet(
klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
// setup eviction manager
evictionManager, evictionAdmitHandler, err := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers), recorder, nodeRef, klet.clock)
if err != nil {
return nil, fmt.Errorf("failed to initialize eviction manager: %v", err)
}
klet.evictionManager = evictionManager
klet.AddPodAdmitHandler(evictionAdmitHandler)
// apply functional Option's
for _, opt := range kubeOptions {
opt(klet)
@ -566,6 +578,9 @@ type Kubelet struct {
// this Kubelet services.
podManager kubepod.Manager
// Needed to observe and respond to situations that could impact node stability
evictionManager eviction.Manager
// Needed to report events for containers belonging to deleted/modified pods.
// Tracks references for reporting events
containerRefManager *kubecontainer.RefManager
@ -959,12 +974,20 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// Start component sync loops.
kl.statusManager.Start()
kl.probeManager.Start()
kl.evictionManager.Start(kl.getActivePods, evictionMonitoringPeriod)
// Start the pod lifecycle event generator.
kl.pleg.Start()
kl.syncLoop(updates, kl)
}
// getActivePods returns non-terminal pods
func (kl *Kubelet) getActivePods() []*api.Pod {
allPods := kl.podManager.GetPods()
activePods := kl.filterOutTerminatedPods(allPods)
return activePods
}
// initialNodeStatus determines the initial node status, incorporating node
// labels and information from the cloud provider.
func (kl *Kubelet) initialNodeStatus() (*api.Node, error) {
@ -3146,6 +3169,64 @@ func (kl *Kubelet) setNodeReadyCondition(node *api.Node) {
}
}
// setNodeMemoryPressureCondition sets the NodeMemoryPressure condition for the node.
// TODO: this needs to move somewhere centralized...
func (kl *Kubelet) setNodeMemoryPressureCondition(node *api.Node) {
currentTime := unversioned.NewTime(kl.clock.Now())
var condition *api.NodeCondition
// Check if NodeMemoryPressure condition already exists and if it does, just pick it up for update.
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == api.NodeMemoryPressure {
condition = &node.Status.Conditions[i]
}
}
newCondition := false
// If the NodeMemoryPressure condition doesn't exist, create one
if condition == nil {
condition = &api.NodeCondition{
Type: api.NodeMemoryPressure,
Status: api.ConditionUnknown,
}
// it cannot be appended to node.Status.Conditions here because it would get
// copied into the slice, and none of the updates we make below would then be
// reflected in the stored condition.
newCondition = true
}
// Update the heartbeat time
condition.LastHeartbeatTime = currentTime
// Note: The conditions below take care of the case when a new NodeMemoryPressure condition is
// created and as well as the case when the condition already exists. When a new condition
// is created its status is set to api.ConditionUnknown which matches either
// condition.Status != api.ConditionTrue or
// condition.Status != api.ConditionFalse in the conditions below depending on whether
// the kubelet is under memory pressure or not.
if kl.evictionManager.IsUnderMemoryPressure() {
if condition.Status != api.ConditionTrue {
condition.Status = api.ConditionTrue
condition.Reason = "KubeletHasInsufficientMemory"
condition.Message = "kubelet has insufficient memory available"
condition.LastTransitionTime = currentTime
kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasInsufficientMemory")
}
} else {
if condition.Status != api.ConditionFalse {
condition.Status = api.ConditionFalse
condition.Reason = "KubeletHasSufficientMemory"
condition.Message = "kubelet has sufficient memory available"
condition.LastTransitionTime = currentTime
kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasSufficientMemory")
}
}
if newCondition {
node.Status.Conditions = append(node.Status.Conditions, *condition)
}
}
// Set the OOD (out of disk) condition for the node.
func (kl *Kubelet) setNodeOODCondition(node *api.Node) {
currentTime := unversioned.NewTime(kl.clock.Now())
@ -3251,6 +3332,7 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*api.Node) error {
kl.setNodeAddress,
withoutError(kl.setNodeStatusInfo),
withoutError(kl.setNodeOODCondition),
withoutError(kl.setNodeMemoryPressureCondition),
withoutError(kl.setNodeReadyCondition),
withoutError(kl.recordNodeSchedulableEvent),
}

View File

@ -47,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/network"
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
@ -55,6 +56,7 @@ import (
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
probetest "k8s.io/kubernetes/pkg/kubelet/prober/testing"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
@ -202,6 +204,24 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil, util.RealClock{})
kubelet.clock = fakeClock
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
volumeStatsAggPeriod := time.Second * 10
kubelet.resourceAnalyzer = stats.NewResourceAnalyzer(kubelet, volumeStatsAggPeriod, kubelet.containerRuntime)
nodeRef := &api.ObjectReference{
Kind: "Node",
Name: kubelet.nodeName,
UID: types.UID(kubelet.nodeName),
Namespace: "",
}
// setup eviction manager
evictionManager, evictionAdmitHandler, err := eviction.NewManager(kubelet.resourceAnalyzer, eviction.Config{}, killPodNow(kubelet.podWorkers), fakeRecorder, nodeRef, kubelet.clock)
if err != nil {
t.Fatalf("failed to initialize eviction manager: %v", err)
}
kubelet.evictionManager = evictionManager
kubelet.AddPodAdmitHandler(evictionAdmitHandler)
return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock, nil}
}
@ -2375,6 +2395,14 @@ func TestUpdateNewNodeStatus(t *testing.T) {
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{
Type: api.NodeMemoryPressure,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientMemory",
Message: fmt.Sprintf("kubelet has sufficient memory available"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{
Type: api.NodeReady,
Status: api.ConditionTrue,
@ -2554,6 +2582,14 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
{
Type: api.NodeMemoryPressure,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientMemory",
Message: fmt.Sprintf("kubelet has sufficient memory available"),
LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
{
Type: api.NodeReady,
Status: api.ConditionTrue,
@ -2610,6 +2646,14 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
LastHeartbeatTime: unversioned.Time{}, // placeholder
LastTransitionTime: unversioned.Time{}, // placeholder
},
{
Type: api.NodeMemoryPressure,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientMemory",
Message: fmt.Sprintf("kubelet has sufficient memory available"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{
Type: api.NodeReady,
Status: api.ConditionTrue,
@ -2892,6 +2936,14 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{
Type: api.NodeMemoryPressure,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientMemory",
Message: fmt.Sprintf("kubelet has sufficient memory available"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{}, //placeholder
},
NodeInfo: api.NodeSystemInfo{
@ -2964,10 +3016,11 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
}
// Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961
if updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type != api.NodeReady {
lastIndex := len(updatedNode.Status.Conditions) - 1
if updatedNode.Status.Conditions[lastIndex].Type != api.NodeReady {
t.Errorf("unexpected node condition order. NodeReady should be last.")
}
expectedNode.Status.Conditions[1] = api.NodeCondition{
expectedNode.Status.Conditions[lastIndex] = api.NodeCondition{
Type: api.NodeReady,
Status: status,
Reason: reason,

View File

@ -31,11 +31,14 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/network"
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
)
@ -76,10 +79,30 @@ func TestRunOnce(t *testing.T) {
reasonCache: NewReasonCache(),
clock: util.RealClock{},
kubeClient: &fake.Clientset{},
hostname: testKubeletHostname,
nodeName: testKubeletHostname,
}
kb.containerManager = cm.NewStubContainerManager()
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone)
// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
volumeStatsAggPeriod := time.Second * 10
kb.resourceAnalyzer = stats.NewResourceAnalyzer(kb, volumeStatsAggPeriod, kb.containerRuntime)
nodeRef := &api.ObjectReference{
Kind: "Node",
Name: kb.nodeName,
UID: types.UID(kb.nodeName),
Namespace: "",
}
fakeKillPodFunc := func(pod *api.Pod, podStatus api.PodStatus, gracePeriodOverride *int64) error {
return nil
}
evictionManager, evictionAdmitHandler, err := eviction.NewManager(kb.resourceAnalyzer, eviction.Config{}, fakeKillPodFunc, kb.recorder, nodeRef, kb.clock)
if err != nil {
t.Fatalf("failed to initialize eviction manager: %v", err)
}
kb.evictionManager = evictionManager
kb.AddPodAdmitHandler(evictionAdmitHandler)
if err := kb.setupDataDirs(); err != nil {
t.Errorf("Failed to init data dirs: %v", err)
}