mirror of https://github.com/k3s-io/k3s
port setNodeMemoryPressureCondition to Setter abstraction, add test
parent
c33f321acd
commit
f057c9a4ae
|
@ -724,62 +724,6 @@ func (kl *Kubelet) setNodeReadyCondition(node *v1.Node) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// setNodeMemoryPressureCondition for the node.
|
|
||||||
// TODO: this needs to move somewhere centralized...
|
|
||||||
func (kl *Kubelet) setNodeMemoryPressureCondition(node *v1.Node) {
|
|
||||||
currentTime := metav1.NewTime(kl.clock.Now())
|
|
||||||
var condition *v1.NodeCondition
|
|
||||||
|
|
||||||
// Check if NodeMemoryPressure condition already exists and if it does, just pick it up for update.
|
|
||||||
for i := range node.Status.Conditions {
|
|
||||||
if node.Status.Conditions[i].Type == v1.NodeMemoryPressure {
|
|
||||||
condition = &node.Status.Conditions[i]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
newCondition := false
|
|
||||||
// If the NodeMemoryPressure condition doesn't exist, create one
|
|
||||||
if condition == nil {
|
|
||||||
condition = &v1.NodeCondition{
|
|
||||||
Type: v1.NodeMemoryPressure,
|
|
||||||
Status: v1.ConditionUnknown,
|
|
||||||
}
|
|
||||||
// cannot be appended to node.Status.Conditions here because it gets
|
|
||||||
// copied to the slice. So if we append to the slice here none of the
|
|
||||||
// updates we make below are reflected in the slice.
|
|
||||||
newCondition = true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update the heartbeat time
|
|
||||||
condition.LastHeartbeatTime = currentTime
|
|
||||||
|
|
||||||
// Note: The conditions below take care of the case when a new NodeMemoryPressure condition is
|
|
||||||
// created and as well as the case when the condition already exists. When a new condition
|
|
||||||
// is created its status is set to v1.ConditionUnknown which matches either
|
|
||||||
// condition.Status != v1.ConditionTrue or
|
|
||||||
// condition.Status != v1.ConditionFalse in the conditions below depending on whether
|
|
||||||
// the kubelet is under memory pressure or not.
|
|
||||||
if kl.evictionManager.IsUnderMemoryPressure() {
|
|
||||||
if condition.Status != v1.ConditionTrue {
|
|
||||||
condition.Status = v1.ConditionTrue
|
|
||||||
condition.Reason = "KubeletHasInsufficientMemory"
|
|
||||||
condition.Message = "kubelet has insufficient memory available"
|
|
||||||
condition.LastTransitionTime = currentTime
|
|
||||||
kl.recordNodeStatusEvent(v1.EventTypeNormal, "NodeHasInsufficientMemory")
|
|
||||||
}
|
|
||||||
} else if condition.Status != v1.ConditionFalse {
|
|
||||||
condition.Status = v1.ConditionFalse
|
|
||||||
condition.Reason = "KubeletHasSufficientMemory"
|
|
||||||
condition.Message = "kubelet has sufficient memory available"
|
|
||||||
condition.LastTransitionTime = currentTime
|
|
||||||
kl.recordNodeStatusEvent(v1.EventTypeNormal, "NodeHasSufficientMemory")
|
|
||||||
}
|
|
||||||
|
|
||||||
if newCondition {
|
|
||||||
node.Status.Conditions = append(node.Status.Conditions, *condition)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// setNodePIDPressureCondition for the node.
|
// setNodePIDPressureCondition for the node.
|
||||||
// TODO: this needs to move somewhere centralized...
|
// TODO: this needs to move somewhere centralized...
|
||||||
func (kl *Kubelet) setNodePIDPressureCondition(node *v1.Node) {
|
func (kl *Kubelet) setNodePIDPressureCondition(node *v1.Node) {
|
||||||
|
@ -958,7 +902,7 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
|
||||||
nodestatus.NodeAddress(kl.nodeIP, kl.nodeIPValidator, kl.hostname, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
|
nodestatus.NodeAddress(kl.nodeIP, kl.nodeIPValidator, kl.hostname, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
|
||||||
withoutError(kl.setNodeStatusInfo),
|
withoutError(kl.setNodeStatusInfo),
|
||||||
nodestatus.OutOfDiskCondition(kl.clock.Now, kl.recordNodeStatusEvent),
|
nodestatus.OutOfDiskCondition(kl.clock.Now, kl.recordNodeStatusEvent),
|
||||||
withoutError(kl.setNodeMemoryPressureCondition),
|
nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
|
||||||
withoutError(kl.setNodeDiskPressureCondition),
|
withoutError(kl.setNodeDiskPressureCondition),
|
||||||
withoutError(kl.setNodePIDPressureCondition),
|
withoutError(kl.setNodePIDPressureCondition),
|
||||||
withoutError(kl.setNodeReadyCondition),
|
withoutError(kl.setNodeReadyCondition),
|
||||||
|
|
|
@ -292,7 +292,6 @@ func newTestKubeletWithImageList(
|
||||||
// Relist period does not affect the tests.
|
// Relist period does not affect the tests.
|
||||||
kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil, clock.RealClock{})
|
kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil, clock.RealClock{})
|
||||||
kubelet.clock = fakeClock
|
kubelet.clock = fakeClock
|
||||||
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
|
|
||||||
|
|
||||||
nodeRef := &v1.ObjectReference{
|
nodeRef := &v1.ObjectReference{
|
||||||
Kind: "Node",
|
Kind: "Node",
|
||||||
|
@ -338,6 +337,8 @@ func newTestKubeletWithImageList(
|
||||||
false, /* experimentalCheckNodeCapabilitiesBeforeMount*/
|
false, /* experimentalCheckNodeCapabilitiesBeforeMount*/
|
||||||
false /* keepTerminatedPodVolumes */)
|
false /* keepTerminatedPodVolumes */)
|
||||||
|
|
||||||
|
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
|
||||||
|
|
||||||
// enable active deadline handler
|
// enable active deadline handler
|
||||||
activeDeadlineHandler, err := newActiveDeadlineHandler(kubelet.statusManager, kubelet.recorder, kubelet.clock)
|
activeDeadlineHandler, err := newActiveDeadlineHandler(kubelet.statusManager, kubelet.recorder, kubelet.clock)
|
||||||
require.NoError(t, err, "Can't initialize active deadline handler")
|
require.NoError(t, err, "Can't initialize active deadline handler")
|
||||||
|
|
|
@ -40,5 +40,6 @@ go_test(
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
|
||||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||||
|
"//vendor/github.com/stretchr/testify/require:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
|
@ -142,6 +142,67 @@ func NodeAddress(nodeIP net.IP, // typically Kubelet.nodeIP
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MemoryPressureCondition returns a Setter that updates the v1.NodeMemoryPressure condition on the node.
|
||||||
|
func MemoryPressureCondition(nowFunc func() time.Time, // typically Kubelet.clock.Now
|
||||||
|
pressureFunc func() bool, // typically Kubelet.evictionManager.IsUnderMemoryPressure
|
||||||
|
recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent
|
||||||
|
) Setter {
|
||||||
|
return func(node *v1.Node) error {
|
||||||
|
currentTime := metav1.NewTime(nowFunc())
|
||||||
|
var condition *v1.NodeCondition
|
||||||
|
|
||||||
|
// Check if NodeMemoryPressure condition already exists and if it does, just pick it up for update.
|
||||||
|
for i := range node.Status.Conditions {
|
||||||
|
if node.Status.Conditions[i].Type == v1.NodeMemoryPressure {
|
||||||
|
condition = &node.Status.Conditions[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
newCondition := false
|
||||||
|
// If the NodeMemoryPressure condition doesn't exist, create one
|
||||||
|
if condition == nil {
|
||||||
|
condition = &v1.NodeCondition{
|
||||||
|
Type: v1.NodeMemoryPressure,
|
||||||
|
Status: v1.ConditionUnknown,
|
||||||
|
}
|
||||||
|
// cannot be appended to node.Status.Conditions here because it gets
|
||||||
|
// copied to the slice. So if we append to the slice here none of the
|
||||||
|
// updates we make below are reflected in the slice.
|
||||||
|
newCondition = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the heartbeat time
|
||||||
|
condition.LastHeartbeatTime = currentTime
|
||||||
|
|
||||||
|
// Note: The conditions below take care of the case when a new NodeMemoryPressure condition is
|
||||||
|
// created and as well as the case when the condition already exists. When a new condition
|
||||||
|
// is created its status is set to v1.ConditionUnknown which matches either
|
||||||
|
// condition.Status != v1.ConditionTrue or
|
||||||
|
// condition.Status != v1.ConditionFalse in the conditions below depending on whether
|
||||||
|
// the kubelet is under memory pressure or not.
|
||||||
|
if pressureFunc() {
|
||||||
|
if condition.Status != v1.ConditionTrue {
|
||||||
|
condition.Status = v1.ConditionTrue
|
||||||
|
condition.Reason = "KubeletHasInsufficientMemory"
|
||||||
|
condition.Message = "kubelet has insufficient memory available"
|
||||||
|
condition.LastTransitionTime = currentTime
|
||||||
|
recordEventFunc(v1.EventTypeNormal, "NodeHasInsufficientMemory")
|
||||||
|
}
|
||||||
|
} else if condition.Status != v1.ConditionFalse {
|
||||||
|
condition.Status = v1.ConditionFalse
|
||||||
|
condition.Reason = "KubeletHasSufficientMemory"
|
||||||
|
condition.Message = "kubelet has sufficient memory available"
|
||||||
|
condition.LastTransitionTime = currentTime
|
||||||
|
recordEventFunc(v1.EventTypeNormal, "NodeHasSufficientMemory")
|
||||||
|
}
|
||||||
|
|
||||||
|
if newCondition {
|
||||||
|
node.Status.Conditions = append(node.Status.Conditions, *condition)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// OutOfDiskCondition returns a Setter that updates the v1.NodeOutOfDisk condition on the node.
|
// OutOfDiskCondition returns a Setter that updates the v1.NodeOutOfDisk condition on the node.
|
||||||
// TODO(#65658): remove this condition
|
// TODO(#65658): remove this condition
|
||||||
func OutOfDiskCondition(nowFunc func() time.Time, // typically Kubelet.clock.Now
|
func OutOfDiskCondition(nowFunc func() time.Time, // typically Kubelet.clock.Now
|
||||||
|
|
|
@ -28,6 +28,7 @@ import (
|
||||||
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
|
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -184,6 +185,127 @@ func TestNodeAddress(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMemoryPressureCondition(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
before := now.Add(-time.Second)
|
||||||
|
nowFunc := func() time.Time { return now }
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
desc string
|
||||||
|
node *v1.Node
|
||||||
|
pressure bool
|
||||||
|
expectConditions []v1.NodeCondition
|
||||||
|
expectEvents []testEvent
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "new, no pressure",
|
||||||
|
node: &v1.Node{},
|
||||||
|
pressure: false,
|
||||||
|
expectConditions: []v1.NodeCondition{*makeMemoryPressureCondition(false, now, now)},
|
||||||
|
expectEvents: []testEvent{
|
||||||
|
{
|
||||||
|
eventType: v1.EventTypeNormal,
|
||||||
|
event: "NodeHasSufficientMemory",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "new, pressure",
|
||||||
|
node: &v1.Node{},
|
||||||
|
pressure: true,
|
||||||
|
expectConditions: []v1.NodeCondition{*makeMemoryPressureCondition(true, now, now)},
|
||||||
|
expectEvents: []testEvent{
|
||||||
|
{
|
||||||
|
eventType: v1.EventTypeNormal,
|
||||||
|
event: "NodeHasInsufficientMemory",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "transition to pressure",
|
||||||
|
node: &v1.Node{
|
||||||
|
Status: v1.NodeStatus{
|
||||||
|
Conditions: []v1.NodeCondition{*makeMemoryPressureCondition(false, before, before)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
pressure: true,
|
||||||
|
expectConditions: []v1.NodeCondition{*makeMemoryPressureCondition(true, now, now)},
|
||||||
|
expectEvents: []testEvent{
|
||||||
|
{
|
||||||
|
eventType: v1.EventTypeNormal,
|
||||||
|
event: "NodeHasInsufficientMemory",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "transition to no pressure",
|
||||||
|
node: &v1.Node{
|
||||||
|
Status: v1.NodeStatus{
|
||||||
|
Conditions: []v1.NodeCondition{*makeMemoryPressureCondition(true, before, before)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
pressure: false,
|
||||||
|
expectConditions: []v1.NodeCondition{*makeMemoryPressureCondition(false, now, now)},
|
||||||
|
expectEvents: []testEvent{
|
||||||
|
{
|
||||||
|
eventType: v1.EventTypeNormal,
|
||||||
|
event: "NodeHasSufficientMemory",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "pressure, no transition",
|
||||||
|
node: &v1.Node{
|
||||||
|
Status: v1.NodeStatus{
|
||||||
|
Conditions: []v1.NodeCondition{*makeMemoryPressureCondition(true, before, before)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
pressure: true,
|
||||||
|
expectConditions: []v1.NodeCondition{*makeMemoryPressureCondition(true, before, now)},
|
||||||
|
expectEvents: []testEvent{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "no pressure, no transition",
|
||||||
|
node: &v1.Node{
|
||||||
|
Status: v1.NodeStatus{
|
||||||
|
Conditions: []v1.NodeCondition{*makeMemoryPressureCondition(false, before, before)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
pressure: false,
|
||||||
|
expectConditions: []v1.NodeCondition{*makeMemoryPressureCondition(false, before, now)},
|
||||||
|
expectEvents: []testEvent{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
|
events := []testEvent{}
|
||||||
|
recordEventFunc := func(eventType, event string) {
|
||||||
|
events = append(events, testEvent{
|
||||||
|
eventType: eventType,
|
||||||
|
event: event,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
pressureFunc := func() bool {
|
||||||
|
return tc.pressure
|
||||||
|
}
|
||||||
|
// construct setter
|
||||||
|
setter := MemoryPressureCondition(nowFunc, pressureFunc, recordEventFunc)
|
||||||
|
// call setter on node
|
||||||
|
if err := setter(tc.node); err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
// check expected condition
|
||||||
|
assert.True(t, apiequality.Semantic.DeepEqual(tc.expectConditions, tc.node.Status.Conditions),
|
||||||
|
"Diff: %s", diff.ObjectDiff(tc.expectConditions, tc.node.Status.Conditions))
|
||||||
|
// check expected events
|
||||||
|
require.Equal(t, len(tc.expectEvents), len(events))
|
||||||
|
for i := range tc.expectEvents {
|
||||||
|
assert.Equal(t, tc.expectEvents[i], events[i])
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Test Helpers:
|
// Test Helpers:
|
||||||
|
|
||||||
// sortableNodeAddress is a type for sorting []v1.NodeAddress
|
// sortableNodeAddress is a type for sorting []v1.NodeAddress
|
||||||
|
@ -198,3 +320,30 @@ func (s sortableNodeAddress) Swap(i, j int) { s[j], s[i] = s[i], s[j] }
|
||||||
func sortNodeAddresses(addrs sortableNodeAddress) {
|
func sortNodeAddresses(addrs sortableNodeAddress) {
|
||||||
sort.Sort(addrs)
|
sort.Sort(addrs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// testEvent is used to record events for tests
|
||||||
|
type testEvent struct {
|
||||||
|
eventType string
|
||||||
|
event string
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeMemoryPressureCondition(pressure bool, transition, heartbeat time.Time) *v1.NodeCondition {
|
||||||
|
if pressure {
|
||||||
|
return &v1.NodeCondition{
|
||||||
|
Type: v1.NodeMemoryPressure,
|
||||||
|
Status: v1.ConditionTrue,
|
||||||
|
Reason: "KubeletHasInsufficientMemory",
|
||||||
|
Message: "kubelet has insufficient memory available",
|
||||||
|
LastTransitionTime: metav1.NewTime(transition),
|
||||||
|
LastHeartbeatTime: metav1.NewTime(heartbeat),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &v1.NodeCondition{
|
||||||
|
Type: v1.NodeMemoryPressure,
|
||||||
|
Status: v1.ConditionFalse,
|
||||||
|
Reason: "KubeletHasSufficientMemory",
|
||||||
|
Message: "kubelet has sufficient memory available",
|
||||||
|
LastTransitionTime: metav1.NewTime(transition),
|
||||||
|
LastHeartbeatTime: metav1.NewTime(heartbeat),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue