diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 269a58d282..907b39e9ef 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -724,62 +724,6 @@ func (kl *Kubelet) setNodeReadyCondition(node *v1.Node) { } } -// setNodeMemoryPressureCondition for the node. -// TODO: this needs to move somewhere centralized... -func (kl *Kubelet) setNodeMemoryPressureCondition(node *v1.Node) { - currentTime := metav1.NewTime(kl.clock.Now()) - var condition *v1.NodeCondition - - // Check if NodeMemoryPressure condition already exists and if it does, just pick it up for update. - for i := range node.Status.Conditions { - if node.Status.Conditions[i].Type == v1.NodeMemoryPressure { - condition = &node.Status.Conditions[i] - } - } - - newCondition := false - // If the NodeMemoryPressure condition doesn't exist, create one - if condition == nil { - condition = &v1.NodeCondition{ - Type: v1.NodeMemoryPressure, - Status: v1.ConditionUnknown, - } - // cannot be appended to node.Status.Conditions here because it gets - // copied to the slice. So if we append to the slice here none of the - // updates we make below are reflected in the slice. - newCondition = true - } - - // Update the heartbeat time - condition.LastHeartbeatTime = currentTime - - // Note: The conditions below take care of the case when a new NodeMemoryPressure condition is - // created and as well as the case when the condition already exists. When a new condition - // is created its status is set to v1.ConditionUnknown which matches either - // condition.Status != v1.ConditionTrue or - // condition.Status != v1.ConditionFalse in the conditions below depending on whether - // the kubelet is under memory pressure or not. - if kl.evictionManager.IsUnderMemoryPressure() { - if condition.Status != v1.ConditionTrue { - condition.Status = v1.ConditionTrue - condition.Reason = "KubeletHasInsufficientMemory" - condition.Message = "kubelet has insufficient memory available" - condition.LastTransitionTime = currentTime - kl.recordNodeStatusEvent(v1.EventTypeNormal, "NodeHasInsufficientMemory") - } - } else if condition.Status != v1.ConditionFalse { - condition.Status = v1.ConditionFalse - condition.Reason = "KubeletHasSufficientMemory" - condition.Message = "kubelet has sufficient memory available" - condition.LastTransitionTime = currentTime - kl.recordNodeStatusEvent(v1.EventTypeNormal, "NodeHasSufficientMemory") - } - - if newCondition { - node.Status.Conditions = append(node.Status.Conditions, *condition) - } -} - // setNodePIDPressureCondition for the node. // TODO: this needs to move somewhere centralized... func (kl *Kubelet) setNodePIDPressureCondition(node *v1.Node) { @@ -958,7 +902,7 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error { nodestatus.NodeAddress(kl.nodeIP, kl.nodeIPValidator, kl.hostname, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc), withoutError(kl.setNodeStatusInfo), nodestatus.OutOfDiskCondition(kl.clock.Now, kl.recordNodeStatusEvent), - withoutError(kl.setNodeMemoryPressureCondition), + nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent), withoutError(kl.setNodeDiskPressureCondition), withoutError(kl.setNodePIDPressureCondition), withoutError(kl.setNodeReadyCondition), diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index e5416c4a71..bdf54d6d87 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -292,7 +292,6 @@ func newTestKubeletWithImageList( // Relist period does not affect the tests. kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil, clock.RealClock{}) kubelet.clock = fakeClock - kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() nodeRef := &v1.ObjectReference{ Kind: "Node", @@ -338,6 +337,8 @@ func newTestKubeletWithImageList( false, /* experimentalCheckNodeCapabilitiesBeforeMount*/ false /* keepTerminatedPodVolumes */) + kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() + // enable active deadline handler activeDeadlineHandler, err := newActiveDeadlineHandler(kubelet.statusManager, kubelet.recorder, kubelet.clock) require.NoError(t, err, "Can't initialize active deadline handler") diff --git a/pkg/kubelet/nodestatus/BUILD b/pkg/kubelet/nodestatus/BUILD index 1322361f63..0fe196b85b 100644 --- a/pkg/kubelet/nodestatus/BUILD +++ b/pkg/kubelet/nodestatus/BUILD @@ -40,5 +40,6 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/github.com/stretchr/testify/require:go_default_library", ], ) diff --git a/pkg/kubelet/nodestatus/setters.go b/pkg/kubelet/nodestatus/setters.go index 0639f2be71..f3c32746bc 100644 --- a/pkg/kubelet/nodestatus/setters.go +++ b/pkg/kubelet/nodestatus/setters.go @@ -142,6 +142,67 @@ func NodeAddress(nodeIP net.IP, // typically Kubelet.nodeIP } } +// MemoryPressureCondition returns a Setter that updates the v1.NodeMemoryPressure condition on the node. +func MemoryPressureCondition(nowFunc func() time.Time, // typically Kubelet.clock.Now + pressureFunc func() bool, // typically Kubelet.evictionManager.IsUnderMemoryPressure + recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent +) Setter { + return func(node *v1.Node) error { + currentTime := metav1.NewTime(nowFunc()) + var condition *v1.NodeCondition + + // Check if NodeMemoryPressure condition already exists and if it does, just pick it up for update. + for i := range node.Status.Conditions { + if node.Status.Conditions[i].Type == v1.NodeMemoryPressure { + condition = &node.Status.Conditions[i] + } + } + + newCondition := false + // If the NodeMemoryPressure condition doesn't exist, create one + if condition == nil { + condition = &v1.NodeCondition{ + Type: v1.NodeMemoryPressure, + Status: v1.ConditionUnknown, + } + // cannot be appended to node.Status.Conditions here because it gets + // copied to the slice. So if we append to the slice here none of the + // updates we make below are reflected in the slice. + newCondition = true + } + + // Update the heartbeat time + condition.LastHeartbeatTime = currentTime + + // Note: The conditions below take care of the case when a new NodeMemoryPressure condition is + // created and as well as the case when the condition already exists. When a new condition + // is created its status is set to v1.ConditionUnknown which matches either + // condition.Status != v1.ConditionTrue or + // condition.Status != v1.ConditionFalse in the conditions below depending on whether + // the kubelet is under memory pressure or not. + if pressureFunc() { + if condition.Status != v1.ConditionTrue { + condition.Status = v1.ConditionTrue + condition.Reason = "KubeletHasInsufficientMemory" + condition.Message = "kubelet has insufficient memory available" + condition.LastTransitionTime = currentTime + recordEventFunc(v1.EventTypeNormal, "NodeHasInsufficientMemory") + } + } else if condition.Status != v1.ConditionFalse { + condition.Status = v1.ConditionFalse + condition.Reason = "KubeletHasSufficientMemory" + condition.Message = "kubelet has sufficient memory available" + condition.LastTransitionTime = currentTime + recordEventFunc(v1.EventTypeNormal, "NodeHasSufficientMemory") + } + + if newCondition { + node.Status.Conditions = append(node.Status.Conditions, *condition) + } + return nil + } +} + // OutOfDiskCondition returns a Setter that updates the v1.NodeOutOfDisk condition on the node. // TODO(#65658): remove this condition func OutOfDiskCondition(nowFunc func() time.Time, // typically Kubelet.clock.Now diff --git a/pkg/kubelet/nodestatus/setters_test.go b/pkg/kubelet/nodestatus/setters_test.go index f3f86ea63c..e5bfb89075 100644 --- a/pkg/kubelet/nodestatus/setters_test.go +++ b/pkg/kubelet/nodestatus/setters_test.go @@ -28,6 +28,7 @@ import ( fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const ( @@ -184,6 +185,127 @@ func TestNodeAddress(t *testing.T) { } } +func TestMemoryPressureCondition(t *testing.T) { + now := time.Now() + before := now.Add(-time.Second) + nowFunc := func() time.Time { return now } + + cases := []struct { + desc string + node *v1.Node + pressure bool + expectConditions []v1.NodeCondition + expectEvents []testEvent + }{ + { + desc: "new, no pressure", + node: &v1.Node{}, + pressure: false, + expectConditions: []v1.NodeCondition{*makeMemoryPressureCondition(false, now, now)}, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeNormal, + event: "NodeHasSufficientMemory", + }, + }, + }, + { + desc: "new, pressure", + node: &v1.Node{}, + pressure: true, + expectConditions: []v1.NodeCondition{*makeMemoryPressureCondition(true, now, now)}, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeNormal, + event: "NodeHasInsufficientMemory", + }, + }, + }, + { + desc: "transition to pressure", + node: &v1.Node{ + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{*makeMemoryPressureCondition(false, before, before)}, + }, + }, + pressure: true, + expectConditions: []v1.NodeCondition{*makeMemoryPressureCondition(true, now, now)}, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeNormal, + event: "NodeHasInsufficientMemory", + }, + }, + }, + { + desc: "transition to no pressure", + node: &v1.Node{ + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{*makeMemoryPressureCondition(true, before, before)}, + }, + }, + pressure: false, + expectConditions: []v1.NodeCondition{*makeMemoryPressureCondition(false, now, now)}, + expectEvents: []testEvent{ + { + eventType: v1.EventTypeNormal, + event: "NodeHasSufficientMemory", + }, + }, + }, + { + desc: "pressure, no transition", + node: &v1.Node{ + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{*makeMemoryPressureCondition(true, before, before)}, + }, + }, + pressure: true, + expectConditions: []v1.NodeCondition{*makeMemoryPressureCondition(true, before, now)}, + expectEvents: []testEvent{}, + }, + { + desc: "no pressure, no transition", + node: &v1.Node{ + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{*makeMemoryPressureCondition(false, before, before)}, + }, + }, + pressure: false, + expectConditions: []v1.NodeCondition{*makeMemoryPressureCondition(false, before, now)}, + expectEvents: []testEvent{}, + }, + } + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + events := []testEvent{} + recordEventFunc := func(eventType, event string) { + events = append(events, testEvent{ + eventType: eventType, + event: event, + }) + } + pressureFunc := func() bool { + return tc.pressure + } + // construct setter + setter := MemoryPressureCondition(nowFunc, pressureFunc, recordEventFunc) + // call setter on node + if err := setter(tc.node); err != nil { + t.Fatalf("unexpected error: %v", err) + } + // check expected condition + assert.True(t, apiequality.Semantic.DeepEqual(tc.expectConditions, tc.node.Status.Conditions), + "Diff: %s", diff.ObjectDiff(tc.expectConditions, tc.node.Status.Conditions)) + // check expected events + require.Equal(t, len(tc.expectEvents), len(events)) + for i := range tc.expectEvents { + assert.Equal(t, tc.expectEvents[i], events[i]) + } + }) + } +} + // Test Helpers: // sortableNodeAddress is a type for sorting []v1.NodeAddress @@ -198,3 +320,30 @@ func (s sortableNodeAddress) Swap(i, j int) { s[j], s[i] = s[i], s[j] } func sortNodeAddresses(addrs sortableNodeAddress) { sort.Sort(addrs) } + +// testEvent is used to record events for tests +type testEvent struct { + eventType string + event string +} + +func makeMemoryPressureCondition(pressure bool, transition, heartbeat time.Time) *v1.NodeCondition { + if pressure { + return &v1.NodeCondition{ + Type: v1.NodeMemoryPressure, + Status: v1.ConditionTrue, + Reason: "KubeletHasInsufficientMemory", + Message: "kubelet has insufficient memory available", + LastTransitionTime: metav1.NewTime(transition), + LastHeartbeatTime: metav1.NewTime(heartbeat), + } + } + return &v1.NodeCondition{ + Type: v1.NodeMemoryPressure, + Status: v1.ConditionFalse, + Reason: "KubeletHasSufficientMemory", + Message: "kubelet has sufficient memory available", + LastTransitionTime: metav1.NewTime(transition), + LastHeartbeatTime: metav1.NewTime(heartbeat), + } +}