port setNodeReadyCondition to Setter abstraction, add test

pull/8/head
Michael Taufen 2018-06-25 18:52:52 -07:00
parent e0b6ae219f
commit 3e03e0611e
4 changed files with 301 additions and 80 deletions
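This commit moves the kubelet's NodeReady logic behind the nodestatus Setter type so it can be unit-tested with injected dependencies. As a rough sketch of the abstraction, assuming the Setter definition in the nodestatus package; the applyAll helper below is illustrative only, not part of the source tree:

package nodestatus

import "k8s.io/api/core/v1"

// Setter modifies the node in place and returns an error if the update failed.
type Setter func(node *v1.Node) error

// applyAll is an illustrative helper, not in the source: it mirrors how the
// kubelet runs each status func from defaultNodeStatusFuncs against the node
// object before posting status to the API server.
func applyAll(setters []Setter, node *v1.Node) error {
	for _, s := range setters {
		if err := s(node); err != nil {
			return err
		}
	}
	return nil
}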

View File

@@ -22,7 +22,6 @@ import (
	"math"
	"net"
	goruntime "runtime"
-	"strings"
	"time"

	"github.com/golang/glog"
@@ -646,84 +645,6 @@ func (kl *Kubelet) setNodeStatusInfo(node *v1.Node) {
	}
}

-// Set Ready condition for the node.
-func (kl *Kubelet) setNodeReadyCondition(node *v1.Node) {
-	// NOTE(aaronlevy): NodeReady condition needs to be the last in the list of node conditions.
-	// This is due to an issue with version skewed kubelet and master components.
-	// ref: https://github.com/kubernetes/kubernetes/issues/16961
-	currentTime := metav1.NewTime(kl.clock.Now())
-	newNodeReadyCondition := v1.NodeCondition{
-		Type:              v1.NodeReady,
-		Status:            v1.ConditionTrue,
-		Reason:            "KubeletReady",
-		Message:           "kubelet is posting ready status",
-		LastHeartbeatTime: currentTime,
-	}
-	rs := append(kl.runtimeState.runtimeErrors(), kl.runtimeState.networkErrors()...)
-	requiredCapacities := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods}
-	if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
-		requiredCapacities = append(requiredCapacities, v1.ResourceEphemeralStorage)
-	}
-	missingCapacities := []string{}
-	for _, resource := range requiredCapacities {
-		if _, found := node.Status.Capacity[resource]; !found {
-			missingCapacities = append(missingCapacities, string(resource))
-		}
-	}
-	if len(missingCapacities) > 0 {
-		rs = append(rs, fmt.Sprintf("Missing node capacity for resources: %s", strings.Join(missingCapacities, ", ")))
-	}
-	if len(rs) > 0 {
-		newNodeReadyCondition = v1.NodeCondition{
-			Type:              v1.NodeReady,
-			Status:            v1.ConditionFalse,
-			Reason:            "KubeletNotReady",
-			Message:           strings.Join(rs, ","),
-			LastHeartbeatTime: currentTime,
-		}
-	}
-	// Append AppArmor status if it's enabled.
-	// TODO(tallclair): This is a temporary message until node feature reporting is added.
-	if newNodeReadyCondition.Status == v1.ConditionTrue &&
-		kl.appArmorValidator != nil && kl.appArmorValidator.ValidateHost() == nil {
-		newNodeReadyCondition.Message = fmt.Sprintf("%s. AppArmor enabled", newNodeReadyCondition.Message)
-	}
-	// Record any soft requirements that were not met in the container manager.
-	status := kl.containerManager.Status()
-	if status.SoftRequirements != nil {
-		newNodeReadyCondition.Message = fmt.Sprintf("%s. WARNING: %s", newNodeReadyCondition.Message, status.SoftRequirements.Error())
-	}
-	readyConditionUpdated := false
-	needToRecordEvent := false
-	for i := range node.Status.Conditions {
-		if node.Status.Conditions[i].Type == v1.NodeReady {
-			if node.Status.Conditions[i].Status == newNodeReadyCondition.Status {
-				newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime
-			} else {
-				newNodeReadyCondition.LastTransitionTime = currentTime
-				needToRecordEvent = true
-			}
-			node.Status.Conditions[i] = newNodeReadyCondition
-			readyConditionUpdated = true
-			break
-		}
-	}
-	if !readyConditionUpdated {
-		newNodeReadyCondition.LastTransitionTime = currentTime
-		node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition)
-	}
-	if needToRecordEvent {
-		if newNodeReadyCondition.Status == v1.ConditionTrue {
-			kl.recordNodeStatusEvent(v1.EventTypeNormal, events.NodeReady)
-		} else {
-			kl.recordNodeStatusEvent(v1.EventTypeNormal, events.NodeNotReady)
-			glog.Infof("Node became not ready: %+v", newNodeReadyCondition)
-		}
-	}
-}
-
// record if node schedulable change.
func (kl *Kubelet) recordNodeSchedulableEvent(node *v1.Node) {
	kl.lastNodeUnschedulableLock.Lock()
@@ -786,6 +707,10 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
	if kl.cloud != nil {
		nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
	}
+	var validateHostFunc func() error
+	if kl.appArmorValidator != nil {
+		validateHostFunc = kl.appArmorValidator.ValidateHost
+	}
	return []func(*v1.Node) error{
		nodestatus.NodeAddress(kl.nodeIP, kl.nodeIPValidator, kl.hostname, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
		withoutError(kl.setNodeStatusInfo),
@@ -793,7 +718,7 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
		nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
		nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
		nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
-		withoutError(kl.setNodeReadyCondition),
+		nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent),
		withoutError(kl.setNodeVolumesInUseStatus),
		withoutError(kl.recordNodeSchedulableEvent),
	}

View File

@@ -7,10 +7,14 @@ go_library(
    visibility = ["//visibility:public"],
    deps = [
        "//pkg/cloudprovider:go_default_library",
+       "//pkg/features:go_default_library",
        "//pkg/kubelet/apis:go_default_library",
+       "//pkg/kubelet/cm:go_default_library",
+       "//pkg/kubelet/events:go_default_library",
        "//staging/src/k8s.io/api/core/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
+       "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
        "//vendor/github.com/golang/glog:go_default_library",
    ],
)
@@ -35,8 +39,11 @@ go_test(
    embed = [":go_default_library"],
    deps = [
        "//pkg/cloudprovider/providers/fake:go_default_library",
+       "//pkg/kubelet/cm:go_default_library",
+       "//pkg/kubelet/events:go_default_library",
        "//staging/src/k8s.io/api/core/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
+       "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
        "//vendor/github.com/stretchr/testify/assert:go_default_library",

View File

@@ -19,13 +19,18 @@ package nodestatus
import (
	"fmt"
	"net"
+	"strings"
	"time"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilnet "k8s.io/apimachinery/pkg/util/net"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/kubernetes/pkg/cloudprovider"
+	"k8s.io/kubernetes/pkg/features"
	kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
+	"k8s.io/kubernetes/pkg/kubelet/cm"
+	"k8s.io/kubernetes/pkg/kubelet/events"

	"github.com/golang/glog"
)
@@ -142,6 +147,95 @@ func NodeAddress(nodeIP net.IP, // typically Kubelet.nodeIP
	}
}

+// ReadyCondition returns a Setter that updates the v1.NodeReady condition on the node.
+func ReadyCondition(
+	nowFunc func() time.Time, // typically Kubelet.clock.Now
+	runtimeErrorsFunc func() []string, // typically Kubelet.runtimeState.runtimeErrors
+	networkErrorsFunc func() []string, // typically Kubelet.runtimeState.networkErrors
+	appArmorValidateHostFunc func() error, // typically Kubelet.appArmorValidator.ValidateHost, might be nil depending on whether there was an appArmorValidator
+	cmStatusFunc func() cm.Status, // typically Kubelet.containerManager.Status
+	recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent
+) Setter {
+	return func(node *v1.Node) error {
+		// NOTE(aaronlevy): NodeReady condition needs to be the last in the list of node conditions.
+		// This is due to an issue with version skewed kubelet and master components.
+		// ref: https://github.com/kubernetes/kubernetes/issues/16961
+		currentTime := metav1.NewTime(nowFunc())
+		newNodeReadyCondition := v1.NodeCondition{
+			Type:              v1.NodeReady,
+			Status:            v1.ConditionTrue,
+			Reason:            "KubeletReady",
+			Message:           "kubelet is posting ready status",
+			LastHeartbeatTime: currentTime,
+		}
+		rs := append(runtimeErrorsFunc(), networkErrorsFunc()...)
+		requiredCapacities := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods}
+		if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
+			requiredCapacities = append(requiredCapacities, v1.ResourceEphemeralStorage)
+		}
+		missingCapacities := []string{}
+		for _, resource := range requiredCapacities {
+			if _, found := node.Status.Capacity[resource]; !found {
+				missingCapacities = append(missingCapacities, string(resource))
+			}
+		}
+		if len(missingCapacities) > 0 {
+			rs = append(rs, fmt.Sprintf("Missing node capacity for resources: %s", strings.Join(missingCapacities, ", ")))
+		}
+		if len(rs) > 0 {
+			newNodeReadyCondition = v1.NodeCondition{
+				Type:              v1.NodeReady,
+				Status:            v1.ConditionFalse,
+				Reason:            "KubeletNotReady",
+				Message:           strings.Join(rs, ","),
+				LastHeartbeatTime: currentTime,
+			}
+		}
+		// Append AppArmor status if it's enabled.
+		// TODO(tallclair): This is a temporary message until node feature reporting is added.
+		if appArmorValidateHostFunc != nil && newNodeReadyCondition.Status == v1.ConditionTrue {
+			if err := appArmorValidateHostFunc(); err == nil {
+				newNodeReadyCondition.Message = fmt.Sprintf("%s. AppArmor enabled", newNodeReadyCondition.Message)
+			}
+		}
+		// Record any soft requirements that were not met in the container manager.
+		status := cmStatusFunc()
+		if status.SoftRequirements != nil {
+			newNodeReadyCondition.Message = fmt.Sprintf("%s. WARNING: %s", newNodeReadyCondition.Message, status.SoftRequirements.Error())
+		}
+		readyConditionUpdated := false
+		needToRecordEvent := false
+		for i := range node.Status.Conditions {
+			if node.Status.Conditions[i].Type == v1.NodeReady {
+				if node.Status.Conditions[i].Status == newNodeReadyCondition.Status {
+					newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime
+				} else {
+					newNodeReadyCondition.LastTransitionTime = currentTime
+					needToRecordEvent = true
+				}
+				node.Status.Conditions[i] = newNodeReadyCondition
+				readyConditionUpdated = true
+				break
+			}
+		}
+		if !readyConditionUpdated {
+			newNodeReadyCondition.LastTransitionTime = currentTime
+			node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition)
+		}
+		if needToRecordEvent {
+			if newNodeReadyCondition.Status == v1.ConditionTrue {
+				recordEventFunc(v1.EventTypeNormal, events.NodeReady)
+			} else {
+				recordEventFunc(v1.EventTypeNormal, events.NodeNotReady)
+				glog.Infof("Node became not ready: %+v", newNodeReadyCondition)
+			}
+		}
+		return nil
+	}
+}
+
// MemoryPressureCondition returns a Setter that updates the v1.NodeMemoryPressure condition on the node.
func MemoryPressureCondition(nowFunc func() time.Time, // typically Kubelet.clock.Now
	pressureFunc func() bool, // typically Kubelet.evictionManager.IsUnderMemoryPressure
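Because every dependency of ReadyCondition is an injected function, the setter can be constructed and exercised without a full Kubelet. A hedged usage sketch; the stub closures and the main wrapper below are stand-ins for the kubelet wiring shown in defaultNodeStatusFuncs, not kubelet code:

package main

import (
	"fmt"
	"time"

	"k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/kubelet/cm"
	"k8s.io/kubernetes/pkg/kubelet/nodestatus"
)

func main() {
	// Stub dependencies; in the kubelet these are methods on runtimeState,
	// containerManager, and so on.
	setter := nodestatus.ReadyCondition(
		time.Now,
		func() []string { return nil }, // no runtime errors
		func() []string { return nil }, // no network errors
		nil,                            // no AppArmor validator
		func() cm.Status { return cm.Status{} },
		func(eventType, event string) { fmt.Println(eventType, event) },
	)

	node := &v1.Node{} // a node with no capacities reported yet
	if err := setter(node); err != nil {
		panic(err)
	}
	// With no capacity set, the setter records a NotReady condition whose
	// message lists the missing resources.
	fmt.Println(node.Status.Conditions[0].Status, node.Status.Conditions[0].Message)
}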

View File

@@ -17,15 +17,20 @@ limitations under the License.
package nodestatus

import (
+	"fmt"
	"net"
	"sort"
	"testing"
+	"time"

	"k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
+	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/diff"
	fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
+	"k8s.io/kubernetes/pkg/kubelet/cm"
+	"k8s.io/kubernetes/pkg/kubelet/events"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
@@ -185,6 +190,175 @@ func TestNodeAddress(t *testing.T) {
	}
}

+func TestReadyCondition(t *testing.T) {
+	now := time.Now()
+	before := now.Add(-time.Second)
+	nowFunc := func() time.Time { return now }
+
+	withCapacity := &v1.Node{
+		Status: v1.NodeStatus{
+			Capacity: v1.ResourceList{
+				v1.ResourceCPU:              *resource.NewMilliQuantity(2000, resource.DecimalSI),
+				v1.ResourceMemory:           *resource.NewQuantity(10E9, resource.BinarySI),
+				v1.ResourcePods:             *resource.NewQuantity(100, resource.DecimalSI),
+				v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
+			},
+		},
+	}
+
+	cases := []struct {
+		desc                     string
+		node                     *v1.Node
+		runtimeErrors            []string
+		networkErrors            []string
+		appArmorValidateHostFunc func() error
+		cmStatus                 cm.Status
+		expectConditions         []v1.NodeCondition
+		expectEvents             []testEvent
+	}{
+		{
+			desc:             "new, ready",
+			node:             withCapacity.DeepCopy(),
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status", now, now)},
+			// TODO(mtaufen): The current behavior is that we don't send an event for the initial NodeReady condition,
+			// the reason for this is unclear, so we may want to actually send an event, and change these test cases
+			// to ensure an event is sent.
+		},
+		{
+			desc:                     "new, ready: apparmor validator passed",
+			node:                     withCapacity.DeepCopy(),
+			appArmorValidateHostFunc: func() error { return nil },
+			expectConditions:         []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status. AppArmor enabled", now, now)},
+		},
+		{
+			desc:                     "new, ready: apparmor validator failed",
+			node:                     withCapacity.DeepCopy(),
+			appArmorValidateHostFunc: func() error { return fmt.Errorf("foo") },
+			// absence of an additional message is understood to mean that AppArmor is disabled
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status", now, now)},
+		},
+		{
+			desc: "new, ready: soft requirement warning",
+			node: withCapacity.DeepCopy(),
+			cmStatus: cm.Status{
+				SoftRequirements: fmt.Errorf("foo"),
+			},
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status. WARNING: foo", now, now)},
+		},
+		{
+			desc:             "new, not ready: runtime errors",
+			node:             withCapacity.DeepCopy(),
+			runtimeErrors:    []string{"foo", "bar"},
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "foo,bar", now, now)},
+		},
+		{
+			desc:             "new, not ready: network errors",
+			node:             withCapacity.DeepCopy(),
+			networkErrors:    []string{"foo", "bar"},
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "foo,bar", now, now)},
+		},
+		{
+			desc:             "new, not ready: runtime and network errors",
+			node:             withCapacity.DeepCopy(),
+			runtimeErrors:    []string{"runtime"},
+			networkErrors:    []string{"network"},
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "runtime,network", now, now)},
+		},
+		{
+			desc:             "new, not ready: missing capacities",
+			node:             &v1.Node{},
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "Missing node capacity for resources: cpu, memory, pods, ephemeral-storage", now, now)},
+		},
+		// the transition tests ensure timestamps are set correctly, no need to test the entire condition matrix in this section
+		{
+			desc: "transition to ready",
+			node: func() *v1.Node {
+				node := withCapacity.DeepCopy()
+				node.Status.Conditions = []v1.NodeCondition{*makeReadyCondition(false, "", before, before)}
+				return node
+			}(),
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status", now, now)},
+			expectEvents: []testEvent{
+				{
+					eventType: v1.EventTypeNormal,
+					event:     events.NodeReady,
+				},
+			},
+		},
+		{
+			desc: "transition to not ready",
+			node: func() *v1.Node {
+				node := withCapacity.DeepCopy()
+				node.Status.Conditions = []v1.NodeCondition{*makeReadyCondition(true, "", before, before)}
+				return node
+			}(),
+			runtimeErrors:    []string{"foo"},
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "foo", now, now)},
+			expectEvents: []testEvent{
+				{
+					eventType: v1.EventTypeNormal,
+					event:     events.NodeNotReady,
+				},
+			},
+		},
+		{
+			desc: "ready, no transition",
+			node: func() *v1.Node {
+				node := withCapacity.DeepCopy()
+				node.Status.Conditions = []v1.NodeCondition{*makeReadyCondition(true, "", before, before)}
+				return node
+			}(),
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status", before, now)},
+			expectEvents:     []testEvent{},
+		},
+		{
+			desc: "not ready, no transition",
+			node: func() *v1.Node {
+				node := withCapacity.DeepCopy()
+				node.Status.Conditions = []v1.NodeCondition{*makeReadyCondition(false, "", before, before)}
+				return node
+			}(),
+			runtimeErrors:    []string{"foo"},
+			expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "foo", before, now)},
+			expectEvents:     []testEvent{},
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.desc, func(t *testing.T) {
+			runtimeErrorsFunc := func() []string {
+				return tc.runtimeErrors
+			}
+			networkErrorsFunc := func() []string {
+				return tc.networkErrors
+			}
+			cmStatusFunc := func() cm.Status {
+				return tc.cmStatus
+			}
+			events := []testEvent{}
+			recordEventFunc := func(eventType, event string) {
+				events = append(events, testEvent{
+					eventType: eventType,
+					event:     event,
+				})
+			}
+			// construct setter
+			setter := ReadyCondition(nowFunc, runtimeErrorsFunc, networkErrorsFunc, tc.appArmorValidateHostFunc, cmStatusFunc, recordEventFunc)
+			// call setter on node
+			if err := setter(tc.node); err != nil {
+				t.Fatalf("unexpected error: %v", err)
+			}
+			// check expected condition
+			assert.True(t, apiequality.Semantic.DeepEqual(tc.expectConditions, tc.node.Status.Conditions),
+				"Diff: %s", diff.ObjectDiff(tc.expectConditions, tc.node.Status.Conditions))
+			// check expected events
+			require.Equal(t, len(tc.expectEvents), len(events))
+			for i := range tc.expectEvents {
+				assert.Equal(t, tc.expectEvents[i], events[i])
+			}
+		})
+	}
+}
+
func TestMemoryPressureCondition(t *testing.T) {
	now := time.Now()
	before := now.Add(-time.Second)
@@ -569,6 +743,27 @@ type testEvent struct {
	event string
}

+func makeReadyCondition(ready bool, message string, transition, heartbeat time.Time) *v1.NodeCondition {
+	if ready {
+		return &v1.NodeCondition{
+			Type:               v1.NodeReady,
+			Status:             v1.ConditionTrue,
+			Reason:             "KubeletReady",
+			Message:            message,
+			LastTransitionTime: metav1.NewTime(transition),
+			LastHeartbeatTime:  metav1.NewTime(heartbeat),
+		}
+	}
+	return &v1.NodeCondition{
+		Type:               v1.NodeReady,
+		Status:             v1.ConditionFalse,
+		Reason:             "KubeletNotReady",
+		Message:            message,
+		LastTransitionTime: metav1.NewTime(transition),
+		LastHeartbeatTime:  metav1.NewTime(heartbeat),
+	}
+}
+
func makeMemoryPressureCondition(pressure bool, transition, heartbeat time.Time) *v1.NodeCondition {
	if pressure {
		return &v1.NodeCondition{