Update kubelet node status report logic with node lease feature

When the node lease feature is enabled, the kubelet reports node status to the API server
only if there is some change or it has not reported status within the last report interval.
pull/58/head
Zhen Wang 2018-10-01 11:32:56 -07:00
parent 8dcdec0a67
commit 98fc4a107a
19 changed files with 615 additions and 32 deletions
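
In short: the kubelet keeps computing node status every nodeStatusUpdateFrequency, but with the NodeLease feature enabled it only PATCHes that status to the API server when the status (or pod CIDR) actually changed, or when nodeStatusReportFrequency has elapsed since the last report. A minimal sketch of that gate, with illustrative names rather than the kubelet's own code (the real logic is in the tryUpdateNodeStatus hunk below):

package sketch

import "time"

// shouldPatchNodeStatus is an illustrative restatement of the gate added to
// tryUpdateNodeStatus; the name and signature are not part of the kubelet.
func shouldPatchNodeStatus(leaseEnabled, podCIDRChanged, statusChanged bool,
	now, lastReport time.Time, reportFrequency time.Duration) bool {
	if !leaseEnabled {
		// Without the NodeLease feature every sync still posts status.
		return true
	}
	if !now.Before(lastReport.Add(reportFrequency)) {
		// The report interval has elapsed since the last successful PATCH.
		return true
	}
	// Still within the report interval: only PATCH if something changed.
	return podCIDRChanged || statusChanged
}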

View File

@ -141,6 +141,7 @@ ComponentConfigs:
MaxOpenFiles: 1000000
MaxPods: 110
NodeLeaseDurationSeconds: 40
NodeStatusReportFrequency: 1m0s
NodeStatusUpdateFrequency: 10s
OOMScoreAdj: -999
PodCIDR: ""

View File

@ -148,6 +148,7 @@ makeIPTablesUtilChains: true
maxOpenFiles: 1000000
maxPods: 110
nodeLeaseDurationSeconds: 40
nodeStatusReportFrequency: 1m0s
nodeStatusUpdateFrequency: 10s
oomScoreAdj: -999
podPidsLimit: -1

View File

@ -152,6 +152,7 @@ makeIPTablesUtilChains: true
maxOpenFiles: 1000000
maxPods: 110
nodeLeaseDurationSeconds: 40
nodeStatusReportFrequency: 1m0s
nodeStatusUpdateFrequency: 10s
oomScoreAdj: -999
podPidsLimit: -1

View File

@ -139,6 +139,7 @@ makeIPTablesUtilChains: true
maxOpenFiles: 1000000
maxPods: 110
nodeLeaseDurationSeconds: 40
nodeStatusReportFrequency: 1m0s
nodeStatusUpdateFrequency: 10s
oomScoreAdj: -999
podPidsLimit: -1

View File

@ -117,6 +117,7 @@ go_library(
"//pkg/volume/validation:go_default_library",
"//staging/src/k8s.io/api/authentication/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -234,6 +235,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",

View File

@ -62,6 +62,7 @@ func Funcs(codecs runtimeserializer.CodecFactory) []interface{} {
obj.MaxPods = 110
obj.PodPidsLimit = -1
obj.NodeStatusUpdateFrequency = metav1.Duration{Duration: 10 * time.Second}
obj.NodeStatusReportFrequency = metav1.Duration{Duration: time.Minute}
obj.NodeLeaseDurationSeconds = 40
obj.CPUManagerPolicy = "none"
obj.CPUManagerReconcilePeriod = obj.NodeStatusUpdateFrequency

View File

@ -197,6 +197,7 @@ var (
"MaxOpenFiles",
"MaxPods",
"NodeStatusUpdateFrequency.Duration",
"NodeStatusReportFrequency.Duration",
"NodeLeaseDurationSeconds",
"OOMScoreAdj",
"PodCIDR",

View File

@ -151,10 +151,16 @@ type KubeletConfiguration struct {
// streamingConnectionIdleTimeout is the maximum time a streaming connection
// can be idle before the connection is automatically closed.
StreamingConnectionIdleTimeout metav1.Duration
// nodeStatusUpdateFrequency is the frequency that kubelet posts node
// status to master. Note: be cautious when changing the constant, it
// must work with nodeMonitorGracePeriod in nodecontroller.
// nodeStatusUpdateFrequency is the frequency that kubelet computes node
// status. If node lease feature is not enabled, it is also the frequency that
// kubelet posts node status to master. In that case, be cautious when
// changing the constant, it must work with nodeMonitorGracePeriod in nodecontroller.
NodeStatusUpdateFrequency metav1.Duration
// nodeStatusReportFrequency is the frequency that kubelet posts node
// status to master if node status does not change. Kubelet will ignore this
// frequency and post node status immediately if any change is detected. It is
// only used when node lease feature is enabled.
NodeStatusReportFrequency metav1.Duration
// nodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease.
NodeLeaseDurationSeconds int32
// imageMinimumGCAge is the minimum age for an unused image before it is

View File

@ -107,6 +107,16 @@ func SetDefaults_KubeletConfiguration(obj *kubeletconfigv1beta1.KubeletConfigura
if obj.StreamingConnectionIdleTimeout == zeroDuration {
obj.StreamingConnectionIdleTimeout = metav1.Duration{Duration: 4 * time.Hour}
}
if obj.NodeStatusReportFrequency == zeroDuration {
// For backward compatibility, NodeStatusReportFrequency's default value is
// set to NodeStatusUpdateFrequency if NodeStatusUpdateFrequency is set
// explicitly.
if obj.NodeStatusUpdateFrequency == zeroDuration {
obj.NodeStatusReportFrequency = metav1.Duration{Duration: time.Minute}
} else {
obj.NodeStatusReportFrequency = obj.NodeStatusUpdateFrequency
}
}
if obj.NodeStatusUpdateFrequency == zeroDuration {
obj.NodeStatusUpdateFrequency = metav1.Duration{Duration: 10 * time.Second}
}
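
To make the defaulting rule above concrete: NodeStatusReportFrequency must be defaulted before NodeStatusUpdateFrequency, and it ends up as 1m unless only nodeStatusUpdateFrequency was set explicitly, in which case the old cadence is preserved. A tiny self-contained restatement (illustrative, not the actual SetDefaults code):

package sketch

import "time"

// defaultNodeStatusReportFrequency mirrors the ordering-sensitive rule above.
func defaultNodeStatusReportFrequency(report, update time.Duration) time.Duration {
	if report != 0 {
		return report // explicitly configured, keep it
	}
	if update == 0 {
		return time.Minute // neither field set: new 1m default
	}
	// Only nodeStatusUpdateFrequency was set explicitly: fall back to it so
	// existing configurations keep their previous reporting cadence.
	return update
}

// For example:
//   defaultNodeStatusReportFrequency(0, 0)              -> 1m0s
//   defaultNodeStatusReportFrequency(0, 10*time.Second) -> 10s
//   defaultNodeStatusReportFrequency(5*time.Minute, 0)  -> 5m0s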

View File

@ -251,6 +251,7 @@ func autoConvert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(in
out.ClusterDNS = *(*[]string)(unsafe.Pointer(&in.ClusterDNS))
out.StreamingConnectionIdleTimeout = in.StreamingConnectionIdleTimeout
out.NodeStatusUpdateFrequency = in.NodeStatusUpdateFrequency
out.NodeStatusReportFrequency = in.NodeStatusReportFrequency
out.NodeLeaseDurationSeconds = in.NodeLeaseDurationSeconds
out.ImageMinimumGCAge = in.ImageMinimumGCAge
if err := v1.Convert_Pointer_int32_To_int32(&in.ImageGCHighThresholdPercent, &out.ImageGCHighThresholdPercent, s); err != nil {
@ -380,6 +381,7 @@ func autoConvert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(in
out.ClusterDNS = *(*[]string)(unsafe.Pointer(&in.ClusterDNS))
out.StreamingConnectionIdleTimeout = in.StreamingConnectionIdleTimeout
out.NodeStatusUpdateFrequency = in.NodeStatusUpdateFrequency
out.NodeStatusReportFrequency = in.NodeStatusReportFrequency
out.NodeLeaseDurationSeconds = in.NodeLeaseDurationSeconds
out.ImageMinimumGCAge = in.ImageMinimumGCAge
if err := v1.Convert_int32_To_Pointer_int32(&in.ImageGCHighThresholdPercent, &out.ImageGCHighThresholdPercent, s); err != nil {

View File

@ -112,6 +112,7 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
}
out.StreamingConnectionIdleTimeout = in.StreamingConnectionIdleTimeout
out.NodeStatusUpdateFrequency = in.NodeStatusUpdateFrequency
out.NodeStatusReportFrequency = in.NodeStatusReportFrequency
out.ImageMinimumGCAge = in.ImageMinimumGCAge
out.VolumeStatsAggPeriod = in.VolumeStatsAggPeriod
out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod

View File

@ -510,6 +510,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
nodeRef: nodeRef,
nodeLabels: nodeLabels,
nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration,
nodeStatusReportFrequency: kubeCfg.NodeStatusReportFrequency.Duration,
os: kubeDeps.OSInterface,
oomWatcher: oomWatcher,
cgroupsPerQOS: kubeCfg.CgroupsPerQOS,
@ -716,7 +717,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
if err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
glog.Errorf("Pod CIDR update failed %v", err)
}
@ -1035,8 +1036,9 @@ type Kubelet struct {
// used for generating ContainerStatus.
reasonCache *ReasonCache
// nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
// Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod
// nodeStatusUpdateFrequency specifies how often kubelet computes node status. If node lease
// feature is not enabled, it is also the frequency that kubelet posts node status to master.
// In that case, be cautious when changing the constant, it must work with nodeMonitorGracePeriod
// in nodecontroller. There are several constraints:
// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
// N means number of retries allowed for kubelet to post node status. It is pointless
@ -1048,6 +1050,13 @@ type Kubelet struct {
// as it takes time to gather all necessary node information.
nodeStatusUpdateFrequency time.Duration
// nodeStatusReportFrequency is the frequency that kubelet posts node
// status to master. It is only used when node lease feature is enabled.
nodeStatusReportFrequency time.Duration
// lastStatusReportTime is the time when node status was last reported.
lastStatusReportTime time.Time
// syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe.
// This lock is used by Kubelet.syncNodeStatus function and shouldn't be used anywhere else.
syncNodeStatusMux sync.Mutex
@ -2236,7 +2245,7 @@ func (kl *Kubelet) fastStatusUpdateOnce() {
continue
}
if node.Spec.PodCIDR != "" {
if err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
if _, err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
glog.Errorf("Pod CIDR update failed %v", err)
continue
}
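
To relate the two fields above: syncNodeStatus keeps firing on the nodeStatusUpdateFrequency cadence (it is driven by a periodic wait.Until loop in the kubelet's Run path), while nodeStatusReportFrequency and lastStatusReportTime only throttle how often a computed status is actually sent. A simplified, self-contained model of that relationship, not kubelet code:

package sketch

import "time"

// runStatusLoop computes status every updateFrequency, but reports it only
// when something changed or reportFrequency has elapsed since the last report.
func runStatusLoop(updateFrequency, reportFrequency time.Duration,
	computeChanged func() bool, report func(), stop <-chan struct{}) {
	var lastReport time.Time
	ticker := time.NewTicker(updateFrequency)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case now := <-ticker.C:
			changed := computeChanged()
			if changed || !now.Before(lastReport.Add(reportFrequency)) {
				report()
				lastReport = now
			}
		}
	}
}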

View File

@ -55,26 +55,28 @@ func (kl *Kubelet) providerRequiresNetworkingConfiguration() bool {
}
// updatePodCIDR updates the pod CIDR in the runtime state if it is different
// from the current CIDR.
func (kl *Kubelet) updatePodCIDR(cidr string) error {
// from the current CIDR. Returns true if the pod CIDR is actually changed.
func (kl *Kubelet) updatePodCIDR(cidr string) (bool, error) {
kl.updatePodCIDRMux.Lock()
defer kl.updatePodCIDRMux.Unlock()
podCIDR := kl.runtimeState.podCIDR()
if podCIDR == cidr {
return nil
return false, nil
}
// kubelet -> generic runtime -> runtime shim -> network plugin
// docker/non-cri implementations have a passthrough UpdatePodCIDR
if err := kl.getRuntime().UpdatePodCIDR(cidr); err != nil {
return fmt.Errorf("failed to update pod CIDR: %v", err)
// If updatePodCIDR fails, the pod CIDR should in theory not have changed,
// but it is safer to still return true here.
return true, fmt.Errorf("failed to update pod CIDR: %v", err)
}
glog.Infof("Setting Pod CIDR: %v -> %v", podCIDR, cidr)
kl.runtimeState.setPodCIDR(cidr)
return nil
return true, nil
}
// GetPodDNS returns DNS settings for the pod.

View File

@ -21,11 +21,13 @@ import (
"fmt"
"net"
goruntime "runtime"
"sort"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -348,8 +350,8 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
}
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master, registering the kubelet first if
// necessary.
// It synchronizes node status to master if there is any change or enough time
// passed from the last sync, registering the kubelet first if necessary.
func (kl *Kubelet) syncNodeStatus() {
kl.syncNodeStatusMux.Lock()
defer kl.syncNodeStatusMux.Unlock()
@ -366,7 +368,8 @@ func (kl *Kubelet) syncNodeStatus() {
}
}
// updateNodeStatus updates node status to master with retries.
// updateNodeStatus updates node status to master with retries if there is any
// change or enough time passed from the last sync.
func (kl *Kubelet) updateNodeStatus() error {
glog.V(5).Infof("Updating node status")
for i := 0; i < nodeStatusUpdateRetry; i++ {
@ -382,7 +385,8 @@ func (kl *Kubelet) updateNodeStatus() error {
return fmt.Errorf("update node status exceeds retry count")
}
// tryUpdateNodeStatus tries to update node status to master.
// tryUpdateNodeStatus tries to update node status to master if there is any
// change or enough time passed from the last sync.
func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
// In large clusters, GET and PUT operations on Node objects coming
// from here are the majority of load on apiserver and etcd.
@ -404,18 +408,31 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
return fmt.Errorf("nil %q node object", kl.nodeName)
}
podCIDRChanged := false
if node.Spec.PodCIDR != "" {
if err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
// Pod CIDR could have been updated before, so we cannot rely on
// node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
// actually changed.
if podCIDRChanged, err = kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
glog.Errorf(err.Error())
}
}
kl.setNodeStatus(node)
now := kl.clock.Now()
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) && now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) {
return nil
}
}
// Patch the current status on the API server
updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
if err != nil {
return err
}
kl.lastStatusReportTime = now
kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
// those volumes are already updated in the node's status
@ -553,3 +570,53 @@ func validateNodeIP(nodeIP net.IP) error {
}
return fmt.Errorf("Node IP: %q not found in the host's network interfaces", nodeIP.String())
}
// nodeStatusHasChanged compares the original node and current node's status and
// returns true if any change happens. The heartbeat timestamp is ignored.
func nodeStatusHasChanged(originalStatus *v1.NodeStatus, status *v1.NodeStatus) bool {
if originalStatus == nil && status == nil {
return false
}
if originalStatus == nil || status == nil {
return true
}
// Compare node conditions here because we need to ignore the heartbeat timestamp.
if nodeConditionsHaveChanged(originalStatus.Conditions, status.Conditions) {
return true
}
// Compare other fields of NodeStatus.
originalStatusCopy := originalStatus.DeepCopy()
statusCopy := status.DeepCopy()
originalStatusCopy.Conditions = nil
statusCopy.Conditions = nil
return !apiequality.Semantic.DeepEqual(originalStatusCopy, statusCopy)
}
// nodeConditionsHaveChanged compares the original node and current node's
// conditions and returns true if any change happens. The heartbeat timestamp is
// ignored.
func nodeConditionsHaveChanged(originalConditions []v1.NodeCondition, conditions []v1.NodeCondition) bool {
if len(originalConditions) != len(conditions) {
return true
}
originalConditionsCopy := make([]v1.NodeCondition, 0, len(originalConditions))
originalConditionsCopy = append(originalConditionsCopy, originalConditions...)
conditionsCopy := make([]v1.NodeCondition, 0, len(conditions))
conditionsCopy = append(conditionsCopy, conditions...)
sort.SliceStable(originalConditionsCopy, func(i, j int) bool { return originalConditionsCopy[i].Type < originalConditionsCopy[j].Type })
sort.SliceStable(conditionsCopy, func(i, j int) bool { return conditionsCopy[i].Type < conditionsCopy[j].Type })
replacedheartbeatTime := metav1.Time{}
for i := range conditionsCopy {
originalConditionsCopy[i].LastHeartbeatTime = replacedheartbeatTime
conditionsCopy[i].LastHeartbeatTime = replacedheartbeatTime
if !apiequality.Semantic.DeepEqual(&originalConditionsCopy[i], &conditionsCopy[i]) {
return true
}
}
return false
}
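
As a quick illustration of the comparison semantics above (a throwaway sketch that assumes it lives in the kubelet package, since nodeStatusHasChanged is unexported; the exhaustive cases are in TestNodeStatusHasChanged further down): a heartbeat-only difference is not a reportable change, while a flipped condition is.

// Sketch only; assumes package kubelet and the usual fmt/time/metav1/v1 imports.
func exampleNodeStatusHasChanged() {
	at := metav1.Date(2018, 10, 1, 12, 0, 0, 0, time.UTC)
	base := &v1.NodeStatus{Conditions: []v1.NodeCondition{{
		Type:               v1.NodeReady,
		Status:             v1.ConditionTrue,
		LastHeartbeatTime:  at,
		LastTransitionTime: at,
	}}}
	// Only the heartbeat timestamp moves forward: not a change.
	heartbeatOnly := base.DeepCopy()
	heartbeatOnly.Conditions[0].LastHeartbeatTime = metav1.Time{Time: at.Add(time.Minute)}
	// The Ready condition flips: this is a change.
	flipped := base.DeepCopy()
	flipped.Conditions[0].Status = v1.ConditionFalse
	fmt.Println(nodeStatusHasChanged(base, heartbeatOnly)) // false
	fmt.Println(nodeStatusHasChanged(base, flipped))       // true
}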

View File

@ -43,6 +43,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
@ -795,6 +796,239 @@ func TestUpdateNodeStatusError(t *testing.T) {
assert.Len(t, testKubelet.fakeKubeClient.Actions(), nodeStatusUpdateRetry)
}
func TestUpdateNodeStatusWithLease(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeLease, true)()
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
clock := testKubelet.fakeClock
kubelet := testKubelet.kubelet
kubelet.nodeStatusMaxImages = 5 // don't truncate the image list that gets constructed by hand for this test
kubelet.kubeClient = nil // ensure only the heartbeat client is used
kubelet.containerManager = &localCM{
ContainerManager: cm.NewStubContainerManager(),
allocatableReservation: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
},
capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
}
// Since this test retroactively overrides the stub container manager,
// we have to regenerate default status setters.
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
kubelet.nodeStatusReportFrequency = time.Minute
kubeClient := testKubelet.fakeKubeClient
existingNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*existingNode}}).ReactionChain
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 20E9,
}
kubelet.machineInfo = machineInfo
now := metav1.NewTime(clock.Now()).Rfc3339Copy()
expectedNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeMemoryPressure,
Status: v1.ConditionFalse,
Reason: "KubeletHasSufficientMemory",
Message: fmt.Sprintf("kubelet has sufficient memory available"),
LastHeartbeatTime: now,
LastTransitionTime: now,
},
{
Type: v1.NodeDiskPressure,
Status: v1.ConditionFalse,
Reason: "KubeletHasNoDiskPressure",
Message: fmt.Sprintf("kubelet has no disk pressure"),
LastHeartbeatTime: now,
LastTransitionTime: now,
},
{
Type: v1.NodePIDPressure,
Status: v1.ConditionFalse,
Reason: "KubeletHasSufficientPID",
Message: fmt.Sprintf("kubelet has sufficient PID available"),
LastHeartbeatTime: now,
LastTransitionTime: now,
},
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: now,
LastTransitionTime: now,
},
},
NodeInfo: v1.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: cadvisortest.FakeKernelVersion,
OSImage: cadvisortest.FakeContainerOsVersion,
OperatingSystem: goruntime.GOOS,
Architecture: goruntime.GOARCH,
ContainerRuntimeVersion: "test://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI),
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
Addresses: []v1.NodeAddress{
{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
{Type: v1.NodeHostName, Address: testKubeletHostname},
},
// images will be sorted from max to min in node status.
Images: []v1.ContainerImage{
{
Names: []string{"k8s.gcr.io:v1", "k8s.gcr.io:v2"},
SizeBytes: 123,
},
{
Names: []string{"k8s.gcr.io:v3", "k8s.gcr.io:v4"},
SizeBytes: 456,
},
},
},
}
// Update node status when node status is created.
// Report node status.
kubelet.updateRuntimeUp()
assert.NoError(t, kubelet.updateNodeStatus())
actions := kubeClient.Actions()
assert.Len(t, actions, 2)
assert.IsType(t, core.GetActionImpl{}, actions[0])
assert.IsType(t, core.PatchActionImpl{}, actions[1])
patchAction := actions[1].(core.PatchActionImpl)
updatedNode, err := applyNodeStatusPatch(existingNode, patchAction.GetPatch())
require.NoError(t, err)
for _, cond := range updatedNode.Status.Conditions {
cond.LastHeartbeatTime = cond.LastHeartbeatTime.Rfc3339Copy()
cond.LastTransitionTime = cond.LastTransitionTime.Rfc3339Copy()
}
assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode))
// Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961
assert.Equal(t, v1.NodeReady, updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type,
"NodeReady should be the last condition")
// Update node status again when nothing is changed (except heartbeat time).
// Report node status if it has exceeded the duration of nodeStatusReportFrequency.
clock.Step(time.Minute)
assert.NoError(t, kubelet.updateNodeStatus())
// 2 more actions (There were 2 actions before).
actions = kubeClient.Actions()
assert.Len(t, actions, 4)
assert.IsType(t, core.GetActionImpl{}, actions[2])
assert.IsType(t, core.PatchActionImpl{}, actions[3])
patchAction = actions[3].(core.PatchActionImpl)
updatedNode, err = applyNodeStatusPatch(updatedNode, patchAction.GetPatch())
require.NoError(t, err)
for _, cond := range updatedNode.Status.Conditions {
cond.LastHeartbeatTime = cond.LastHeartbeatTime.Rfc3339Copy()
cond.LastTransitionTime = cond.LastTransitionTime.Rfc3339Copy()
}
// Expect LastHeartbeat updated, other things unchanged.
for i, cond := range expectedNode.Status.Conditions {
expectedNode.Status.Conditions[i].LastHeartbeatTime = metav1.NewTime(cond.LastHeartbeatTime.Time.Add(time.Minute)).Rfc3339Copy()
}
assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode))
// Update node status again when nothing is changed (except heartbeat time).
// Do not report node status if it is within the duration of nodeStatusReportFrequency.
clock.Step(10 * time.Second)
assert.NoError(t, kubelet.updateNodeStatus())
// Only 1 more action (There were 4 actions before).
actions = kubeClient.Actions()
assert.Len(t, actions, 5)
assert.IsType(t, core.GetActionImpl{}, actions[4])
// Update node status again when something is changed.
// Report node status even if it is still within the duration of nodeStatusReportFrequency.
clock.Step(10 * time.Second)
var newMemoryCapacity int64 = 40E9
kubelet.machineInfo.MemoryCapacity = uint64(newMemoryCapacity)
assert.NoError(t, kubelet.updateNodeStatus())
// 2 more actions (There were 5 actions before).
actions = kubeClient.Actions()
assert.Len(t, actions, 7)
assert.IsType(t, core.GetActionImpl{}, actions[5])
assert.IsType(t, core.PatchActionImpl{}, actions[6])
patchAction = actions[6].(core.PatchActionImpl)
updatedNode, err = applyNodeStatusPatch(updatedNode, patchAction.GetPatch())
require.NoError(t, err)
memCapacity, _ := updatedNode.Status.Capacity[v1.ResourceMemory]
updatedMemoryCapacity, _ := (&memCapacity).AsInt64()
assert.Equal(t, newMemoryCapacity, updatedMemoryCapacity, "Memory capacity")
now = metav1.NewTime(clock.Now()).Rfc3339Copy()
for _, cond := range updatedNode.Status.Conditions {
// Expect LastHeartbeat updated, while LastTransitionTime unchanged.
assert.Equal(t, now, cond.LastHeartbeatTime.Rfc3339Copy(),
"LastHeartbeatTime for condition %v", cond.Type)
assert.Equal(t, now, metav1.NewTime(cond.LastTransitionTime.Time.Add(time.Minute+20*time.Second)).Rfc3339Copy(),
"LastTransitionTime for condition %v", cond.Type)
}
// Update node status when changing pod CIDR.
// Report node status even if it is still within the duration of nodeStatusReportFrequency.
clock.Step(10 * time.Second)
assert.Equal(t, "", kubelet.runtimeState.podCIDR(), "Pod CIDR should be empty")
podCIDR := "10.0.0.0/24"
updatedNode.Spec.PodCIDR = podCIDR
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*updatedNode}}).ReactionChain
assert.NoError(t, kubelet.updateNodeStatus())
assert.Equal(t, podCIDR, kubelet.runtimeState.podCIDR(), "Pod CIDR should be updated now")
// 2 more actions (There were 7 actions before).
actions = kubeClient.Actions()
assert.Len(t, actions, 9)
assert.IsType(t, core.GetActionImpl{}, actions[7])
assert.IsType(t, core.PatchActionImpl{}, actions[8])
patchAction = actions[8].(core.PatchActionImpl)
// Update node status when keeping the pod CIDR.
// Do not report node status if it is within the duration of nodeStatusReportFrequency.
clock.Step(10 * time.Second)
assert.Equal(t, podCIDR, kubelet.runtimeState.podCIDR(), "Pod CIDR should already be updated")
assert.NoError(t, kubelet.updateNodeStatus())
// Only 1 more action (There were 9 actions before).
actions = kubeClient.Actions()
assert.Len(t, actions, 10)
assert.IsType(t, core.GetActionImpl{}, actions[9])
}
func TestRegisterWithApiServer(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
@ -1529,3 +1763,159 @@ func TestRegisterWithApiServerWithTaint(t *testing.T) {
return
})
}
func TestNodeStatusHasChanged(t *testing.T) {
fakeNow := metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
fakeFuture := metav1.Time{Time: fakeNow.Time.Add(time.Minute)}
readyCondition := v1.NodeCondition{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: fakeNow,
LastTransitionTime: fakeNow,
}
readyConditionAtDiffHearbeatTime := v1.NodeCondition{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: fakeFuture,
LastTransitionTime: fakeNow,
}
readyConditionAtDiffTransitionTime := v1.NodeCondition{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: fakeFuture,
LastTransitionTime: fakeFuture,
}
notReadyCondition := v1.NodeCondition{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
LastHeartbeatTime: fakeNow,
LastTransitionTime: fakeNow,
}
memoryPressureCondition := v1.NodeCondition{
Type: v1.NodeMemoryPressure,
Status: v1.ConditionFalse,
LastHeartbeatTime: fakeNow,
LastTransitionTime: fakeNow,
}
testcases := []struct {
name string
originalStatus *v1.NodeStatus
status *v1.NodeStatus
expectChange bool
}{
{
name: "Node status does not change with nil status.",
originalStatus: nil,
status: nil,
expectChange: false,
},
{
name: "Node status does not change with default status.",
originalStatus: &v1.NodeStatus{},
status: &v1.NodeStatus{},
expectChange: false,
},
{
name: "Node status changes with nil and default status.",
originalStatus: nil,
status: &v1.NodeStatus{},
expectChange: true,
},
{
name: "Node status changes with nil and status.",
originalStatus: nil,
status: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition},
},
expectChange: true,
},
{
name: "Node status does not change with empty conditions.",
originalStatus: &v1.NodeStatus{Conditions: []v1.NodeCondition{}},
status: &v1.NodeStatus{Conditions: []v1.NodeCondition{}},
expectChange: false,
},
{
name: "Node status does not change",
originalStatus: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition},
},
status: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition},
},
expectChange: false,
},
{
name: "Node status does not change even if heartbeat time changes.",
originalStatus: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition},
},
status: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyConditionAtDiffHearbeatTime, memoryPressureCondition},
},
expectChange: false,
},
{
name: "Node status does not change even if the orders of conditions are different.",
originalStatus: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition},
},
status: &v1.NodeStatus{
Conditions: []v1.NodeCondition{memoryPressureCondition, readyConditionAtDiffHearbeatTime},
},
expectChange: false,
},
{
name: "Node status changes if condition status differs.",
originalStatus: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition},
},
status: &v1.NodeStatus{
Conditions: []v1.NodeCondition{notReadyCondition, memoryPressureCondition},
},
expectChange: true,
},
{
name: "Node status changes if transition time changes.",
originalStatus: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition},
},
status: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyConditionAtDiffTransitionTime, memoryPressureCondition},
},
expectChange: true,
},
{
name: "Node status changes with different number of conditions.",
originalStatus: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyCondition},
},
status: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition},
},
expectChange: true,
},
{
name: "Node status changes with different phase.",
originalStatus: &v1.NodeStatus{
Phase: v1.NodePending,
Conditions: []v1.NodeCondition{readyCondition},
},
status: &v1.NodeStatus{
Phase: v1.NodeRunning,
Conditions: []v1.NodeCondition{readyCondition},
},
expectChange: true,
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
originalStatusCopy := tc.originalStatus.DeepCopy()
statusCopy := tc.status.DeepCopy()
changed := nodeStatusHasChanged(tc.originalStatus, tc.status)
assert.Equal(t, tc.expectChange, changed, "Expect node status change to be %t, but got %t.", tc.expectChange, changed)
assert.True(t, apiequality.Semantic.DeepEqual(originalStatusCopy, tc.originalStatus), "%s", diff.ObjectDiff(originalStatusCopy, tc.originalStatus))
assert.True(t, apiequality.Semantic.DeepEqual(statusCopy, tc.status), "%s", diff.ObjectDiff(statusCopy, tc.status))
})
}
}

View File

@ -292,9 +292,11 @@ type KubeletConfiguration struct {
// Default: "4h"
// +optional
StreamingConnectionIdleTimeout metav1.Duration `json:"streamingConnectionIdleTimeout,omitempty"`
// nodeStatusUpdateFrequency is the frequency that kubelet posts node
// status to master. Note: be cautious when changing the constant, it
// must work with nodeMonitorGracePeriod in nodecontroller.
// nodeStatusUpdateFrequency is the frequency that kubelet computes node
// status. If node lease feature is not enabled, it is also the frequency that
// kubelet posts node status to master.
// Note: When node lease feature is not enabled, be cautious when changing the
// constant, it must work with nodeMonitorGracePeriod in nodecontroller.
// Dynamic Kubelet Config (beta): If dynamically updating this field, consider that
// it may impact node scalability, and also that the node controller's
// nodeMonitorGracePeriod must be set to N*NodeStatusUpdateFrequency,
@ -303,6 +305,16 @@ type KubeletConfiguration struct {
// Default: "10s"
// +optional
NodeStatusUpdateFrequency metav1.Duration `json:"nodeStatusUpdateFrequency,omitempty"`
// nodeStatusReportFrequency is the frequency that kubelet posts node
// status to master if node status does not change. Kubelet will ignore this
// frequency and post node status immediately if any change is detected. It is
// only used when node lease feature is enabled. nodeStatusReportFrequency's
// default value is 1m. But if nodeStatusUpdateFrequency is set explicitly,
// nodeStatusReportFrequency's default value will be set to
// nodeStatusUpdateFrequency for backward compatibility.
// Default: "1m"
// +optional
NodeStatusReportFrequency metav1.Duration `json:"nodeStatusReportFrequency,omitempty"`
// nodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease,
// when the NodeLease feature is enabled. This feature provides an indicator of node
// health by having the Kubelet create and periodically renew a lease, named after the node,

View File

@ -143,6 +143,7 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
}
out.StreamingConnectionIdleTimeout = in.StreamingConnectionIdleTimeout
out.NodeStatusUpdateFrequency = in.NodeStatusUpdateFrequency
out.NodeStatusReportFrequency = in.NodeStatusReportFrequency
out.ImageMinimumGCAge = in.ImageMinimumGCAge
if in.ImageGCHighThresholdPercent != nil {
in, out := &in.ImageGCHighThresholdPercent, &out.ImageGCHighThresholdPercent

View File

@ -44,6 +44,7 @@ go_library(
],
importpath = "k8s.io/kubernetes/test/e2e/common",
deps = [
"//pkg/api/v1/node:go_default_library",
"//pkg/api/v1/pod:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",

View File

@ -23,7 +23,9 @@ import (
coordv1beta1 "k8s.io/api/coordination/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
v1node "k8s.io/kubernetes/pkg/api/v1/node"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
@ -31,33 +33,41 @@ import (
)
var _ = framework.KubeDescribe("[Feature:NodeLease][NodeAlphaFeature:NodeLease]", func() {
var nodeName string
f := framework.NewDefaultFramework("node-lease-test")
BeforeEach(func() {
nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
Expect(len(nodes.Items)).NotTo(BeZero())
nodeName = nodes.Items[0].ObjectMeta.Name
})
Context("when the NodeLease feature is enabled", func() {
It("the Kubelet should create and update a lease in the kube-node-lease namespace", func() {
It("the kubelet should create and update a lease in the kube-node-lease namespace", func() {
leaseClient := f.ClientSet.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
var (
err error
lease *coordv1beta1.Lease
)
// check that lease for this Kubelet exists in the kube-node-lease namespace
By("check that lease for this Kubelet exists in the kube-node-lease namespace")
Eventually(func() error {
lease, err = leaseClient.Get(framework.TestContext.NodeName, metav1.GetOptions{})
lease, err = leaseClient.Get(nodeName, metav1.GetOptions{})
if err != nil {
return err
}
return nil
}, 5*time.Minute, 5*time.Second).Should(BeNil())
// check basic expectations for the lease
Expect(expectLease(lease)).To(BeNil())
// ensure that at least one lease renewal happens within the
// lease duration by checking for a change to renew time
Expect(expectLease(lease, nodeName)).To(BeNil())
By("check that node lease is updated at least once within the lease duration")
Eventually(func() error {
newLease, err := leaseClient.Get(framework.TestContext.NodeName, metav1.GetOptions{})
newLease, err := leaseClient.Get(nodeName, metav1.GetOptions{})
if err != nil {
return err
}
// check basic expectations for the latest lease
if err := expectLease(newLease); err != nil {
if err := expectLease(newLease, nodeName); err != nil {
return err
}
// check that RenewTime has been updated on the latest lease
@ -68,12 +78,76 @@ var _ = framework.KubeDescribe("[Feature:NodeLease][NodeAlphaFeature:NodeLease]"
}
return nil
}, time.Duration(*lease.Spec.LeaseDurationSeconds)*time.Second,
time.Duration(*lease.Spec.LeaseDurationSeconds/3)*time.Second)
time.Duration(*lease.Spec.LeaseDurationSeconds/4)*time.Second)
})
It("the kubelet should report node status infrequently", func() {
By("wait until node is ready")
framework.WaitForNodeToBeReady(f.ClientSet, nodeName, 5*time.Minute)
By("wait until there is node lease")
var err error
var lease *coordv1beta1.Lease
Eventually(func() error {
lease, err = f.ClientSet.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease).Get(nodeName, metav1.GetOptions{})
if err != nil {
return err
}
return nil
}, 5*time.Minute, 5*time.Second).Should(BeNil())
// check basic expectations for the lease
Expect(expectLease(lease, nodeName)).To(BeNil())
leaseDuration := time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second
By("verify NodeStatus report period is longer than lease duration")
// NodeStatus is reported from node to master when there is some change or
// enough time has passed. So here, keep checking the time difference
// between two NodeStatus reports until it is longer than the lease
// duration (which is the same as nodeMonitorGracePeriod).
heartbeatTime := getNextReadyConditionHeartbeatTime(f.ClientSet, nodeName, metav1.Time{})
Eventually(func() error {
nextHeartbeatTime := getNextReadyConditionHeartbeatTime(f.ClientSet, nodeName, heartbeatTime)
if nextHeartbeatTime.Time.After(heartbeatTime.Time.Add(leaseDuration)) {
return nil
}
heartbeatTime = nextHeartbeatTime
return fmt.Errorf("node status report period is shorter than lease duration")
// Enter next round immediately.
}, 5*time.Minute, time.Nanosecond).Should(BeNil())
By("verify node is still in ready status even though node status report is infrequent")
// This check on node status is only meaningful when this e2e test is
// running as cluster e2e test, because node e2e test does not create and
// run controller manager, i.e., no node lifecycle controller.
node, err := f.ClientSet.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
Expect(err).To(BeNil())
_, readyCondition := v1node.GetNodeCondition(&node.Status, corev1.NodeReady)
Expect(readyCondition.Status).To(Equal(corev1.ConditionTrue))
})
})
})
func expectLease(lease *coordv1beta1.Lease) error {
func getNextReadyConditionHeartbeatTime(clientSet clientset.Interface, nodeName string, prevHeartbeatTime metav1.Time) metav1.Time {
var newHeartbeatTime metav1.Time
Eventually(func() error {
node, err := clientSet.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
if err != nil {
return err
}
_, readyCondition := v1node.GetNodeCondition(&node.Status, corev1.NodeReady)
Expect(readyCondition.Status).To(Equal(corev1.ConditionTrue))
newHeartbeatTime = readyCondition.LastHeartbeatTime
if prevHeartbeatTime.Before(&newHeartbeatTime) {
return nil
}
return fmt.Errorf("heartbeat has not changed yet")
}, 5*time.Minute, 5*time.Second).Should(BeNil())
return newHeartbeatTime
}
func expectLease(lease *coordv1beta1.Lease, nodeName string) error {
// expect values for HolderIdentity, LeaseDurationSeconds, and RenewTime
if lease.Spec.HolderIdentity == nil {
return fmt.Errorf("Spec.HolderIdentity should not be nil")
@ -85,8 +159,8 @@ func expectLease(lease *coordv1beta1.Lease) error {
return fmt.Errorf("Spec.RenewTime should not be nil")
}
// ensure that the HolderIdentity matches the node name
if *lease.Spec.HolderIdentity != framework.TestContext.NodeName {
return fmt.Errorf("Spec.HolderIdentity (%v) should match the node name (%v)", *lease.Spec.HolderIdentity, framework.TestContext.NodeName)
if *lease.Spec.HolderIdentity != nodeName {
return fmt.Errorf("Spec.HolderIdentity (%v) should match the node name (%v)", *lease.Spec.HolderIdentity, nodeName)
}
return nil
}