Merge pull request #69753 from wangzhen127/diff-node-status

Update kubelet node status report logic with node lease feature
k8s-ci-robot 2018-11-07 20:26:45 -08:00 committed by GitHub
commit 16d0992534
19 changed files with 615 additions and 32 deletions

View File

@ -141,6 +141,7 @@ ComponentConfigs:
MaxOpenFiles: 1000000
MaxPods: 110
NodeLeaseDurationSeconds: 40
NodeStatusReportFrequency: 1m0s
NodeStatusUpdateFrequency: 10s
OOMScoreAdj: -999
PodCIDR: ""

View File

@ -148,6 +148,7 @@ makeIPTablesUtilChains: true
maxOpenFiles: 1000000
maxPods: 110
nodeLeaseDurationSeconds: 40
nodeStatusReportFrequency: 1m0s
nodeStatusUpdateFrequency: 10s
oomScoreAdj: -999
podPidsLimit: -1

View File

@ -152,6 +152,7 @@ makeIPTablesUtilChains: true
maxOpenFiles: 1000000
maxPods: 110
nodeLeaseDurationSeconds: 40
nodeStatusReportFrequency: 1m0s
nodeStatusUpdateFrequency: 10s
oomScoreAdj: -999
podPidsLimit: -1

View File

@ -139,6 +139,7 @@ makeIPTablesUtilChains: true
maxOpenFiles: 1000000
maxPods: 110
nodeLeaseDurationSeconds: 40
nodeStatusReportFrequency: 1m0s
nodeStatusUpdateFrequency: 10s
oomScoreAdj: -999
podPidsLimit: -1

View File

@ -117,6 +117,7 @@ go_library(
"//pkg/volume/validation:go_default_library",
"//staging/src/k8s.io/api/authentication/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -234,6 +235,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",

View File

@ -62,6 +62,7 @@ func Funcs(codecs runtimeserializer.CodecFactory) []interface{} {
obj.MaxPods = 110
obj.PodPidsLimit = -1
obj.NodeStatusUpdateFrequency = metav1.Duration{Duration: 10 * time.Second}
obj.NodeStatusReportFrequency = metav1.Duration{Duration: time.Minute}
obj.NodeLeaseDurationSeconds = 40
obj.CPUManagerPolicy = "none"
obj.CPUManagerReconcilePeriod = obj.NodeStatusUpdateFrequency

View File

@ -197,6 +197,7 @@ var (
"MaxOpenFiles",
"MaxPods",
"NodeStatusUpdateFrequency.Duration",
"NodeStatusReportFrequency.Duration",
"NodeLeaseDurationSeconds",
"OOMScoreAdj",
"PodCIDR",

View File

@ -151,10 +151,16 @@ type KubeletConfiguration struct {
// streamingConnectionIdleTimeout is the maximum time a streaming connection
// can be idle before the connection is automatically closed.
StreamingConnectionIdleTimeout metav1.Duration
// nodeStatusUpdateFrequency is the frequency that kubelet posts node
// status to master. Note: be cautious when changing the constant, it
// must work with nodeMonitorGracePeriod in nodecontroller.
// nodeStatusUpdateFrequency is the frequency that kubelet computes node
// status. If node lease feature is not enabled, it is also the frequency that
// kubelet posts node status to master. In that case, be cautious when
// changing the constant, it must work with nodeMonitorGracePeriod in nodecontroller.
NodeStatusUpdateFrequency metav1.Duration
// nodeStatusReportFrequency is the frequency that kubelet posts node
// status to master if node status does not change. Kubelet will ignore this
// frequency and post node status immediately if any change is detected. It is
// only used when node lease feature is enabled.
NodeStatusReportFrequency metav1.Duration
// nodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease.
NodeLeaseDurationSeconds int32
// imageMinimumGCAge is the minimum age for an unused image before it is
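For concreteness, here is a hedged sketch of how the two frequencies fit together, written as standalone Go rather than kubelet code (the import path and package name are assumptions): status is recomputed every NodeStatusUpdateFrequency, while an unchanged status is only posted once per NodeStatusReportFrequency when the node lease feature is enabled.

package example

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/metav1"
	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" // path assumed for illustration
)

// exampleConfig uses illustrative values only: compute status every 10s,
// report an unchanged status at most once per minute, and let the 40s node
// lease act as the frequent heartbeat.
func exampleConfig() kubeletconfig.KubeletConfiguration {
	return kubeletconfig.KubeletConfiguration{
		NodeStatusUpdateFrequency: metav1.Duration{Duration: 10 * time.Second},
		NodeStatusReportFrequency: metav1.Duration{Duration: time.Minute},
		NodeLeaseDurationSeconds:  40,
	}
}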

View File

@ -107,6 +107,16 @@ func SetDefaults_KubeletConfiguration(obj *kubeletconfigv1beta1.KubeletConfigura
if obj.StreamingConnectionIdleTimeout == zeroDuration {
obj.StreamingConnectionIdleTimeout = metav1.Duration{Duration: 4 * time.Hour}
}
if obj.NodeStatusReportFrequency == zeroDuration {
// For backward compatibility, NodeStatusReportFrequency's default value is
// set to NodeStatusUpdateFrequency if NodeStatusUpdateFrequency is set
// explicitly.
if obj.NodeStatusUpdateFrequency == zeroDuration {
obj.NodeStatusReportFrequency = metav1.Duration{Duration: time.Minute}
} else {
obj.NodeStatusReportFrequency = obj.NodeStatusUpdateFrequency
}
}
if obj.NodeStatusUpdateFrequency == zeroDuration {
obj.NodeStatusUpdateFrequency = metav1.Duration{Duration: 10 * time.Second}
}
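A hedged sketch of what this defaulting yields, written as if it sat next to SetDefaults_KubeletConfiguration (package name, import aliases, and paths are assumptions):

package v1beta1 // assumed: same package as SetDefaults_KubeletConfiguration

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/metav1"
	kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" // path assumed
)

// ExampleNodeStatusReportFrequencyDefaulting is illustrative only.
func ExampleNodeStatusReportFrequencyDefaulting() {
	// Only nodeStatusUpdateFrequency is set explicitly: nodeStatusReportFrequency
	// inherits it, so pre-lease clusters keep their old reporting cadence.
	explicit := &kubeletconfigv1beta1.KubeletConfiguration{
		NodeStatusUpdateFrequency: metav1.Duration{Duration: 20 * time.Second},
	}
	SetDefaults_KubeletConfiguration(explicit)
	fmt.Println(explicit.NodeStatusReportFrequency.Duration) // 20s

	// Neither field is set: each gets its own default.
	untouched := &kubeletconfigv1beta1.KubeletConfiguration{}
	SetDefaults_KubeletConfiguration(untouched)
	fmt.Println(untouched.NodeStatusUpdateFrequency.Duration) // 10s
	fmt.Println(untouched.NodeStatusReportFrequency.Duration) // 1m0s
}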

View File

@ -251,6 +251,7 @@ func autoConvert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(in
out.ClusterDNS = *(*[]string)(unsafe.Pointer(&in.ClusterDNS))
out.StreamingConnectionIdleTimeout = in.StreamingConnectionIdleTimeout
out.NodeStatusUpdateFrequency = in.NodeStatusUpdateFrequency
out.NodeStatusReportFrequency = in.NodeStatusReportFrequency
out.NodeLeaseDurationSeconds = in.NodeLeaseDurationSeconds
out.ImageMinimumGCAge = in.ImageMinimumGCAge
if err := v1.Convert_Pointer_int32_To_int32(&in.ImageGCHighThresholdPercent, &out.ImageGCHighThresholdPercent, s); err != nil {
@ -380,6 +381,7 @@ func autoConvert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(in
out.ClusterDNS = *(*[]string)(unsafe.Pointer(&in.ClusterDNS))
out.StreamingConnectionIdleTimeout = in.StreamingConnectionIdleTimeout
out.NodeStatusUpdateFrequency = in.NodeStatusUpdateFrequency
out.NodeStatusReportFrequency = in.NodeStatusReportFrequency
out.NodeLeaseDurationSeconds = in.NodeLeaseDurationSeconds
out.ImageMinimumGCAge = in.ImageMinimumGCAge
if err := v1.Convert_int32_To_Pointer_int32(&in.ImageGCHighThresholdPercent, &out.ImageGCHighThresholdPercent, s); err != nil {

View File

@ -112,6 +112,7 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
}
out.StreamingConnectionIdleTimeout = in.StreamingConnectionIdleTimeout
out.NodeStatusUpdateFrequency = in.NodeStatusUpdateFrequency
out.NodeStatusReportFrequency = in.NodeStatusReportFrequency
out.ImageMinimumGCAge = in.ImageMinimumGCAge
out.VolumeStatsAggPeriod = in.VolumeStatsAggPeriod
out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod

View File

@ -510,6 +510,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
nodeRef: nodeRef,
nodeLabels: nodeLabels,
nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration,
nodeStatusReportFrequency: kubeCfg.NodeStatusReportFrequency.Duration,
os: kubeDeps.OSInterface,
oomWatcher: oomWatcher,
cgroupsPerQOS: kubeCfg.CgroupsPerQOS,
@ -716,7 +717,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
if err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
glog.Errorf("Pod CIDR update failed %v", err)
}
@ -1035,8 +1036,9 @@ type Kubelet struct {
// used for generating ContainerStatus.
reasonCache *ReasonCache
// nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
// Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod
// nodeStatusUpdateFrequency specifies how often kubelet computes node status. If node lease
// feature is not enabled, it is also the frequency that kubelet posts node status to master.
// In that case, be cautious when changing the constant, it must work with nodeMonitorGracePeriod
// in nodecontroller. There are several constraints:
// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
// N means number of retries allowed for kubelet to post node status. It is pointless
@ -1048,6 +1050,13 @@ type Kubelet struct {
// as it takes time to gather all necessary node information.
nodeStatusUpdateFrequency time.Duration
// nodeStatusReportFrequency is the frequency that kubelet posts node
// status to master. It is only used when the node lease feature is enabled.
nodeStatusReportFrequency time.Duration
// lastStatusReportTime is the time when node status was last reported.
lastStatusReportTime time.Time
// syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe.
// This lock is used by the Kubelet.syncNodeStatus function and shouldn't be used anywhere else.
syncNodeStatusMux sync.Mutex
@ -2236,7 +2245,7 @@ func (kl *Kubelet) fastStatusUpdateOnce() {
continue
}
if node.Spec.PodCIDR != "" {
if err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
if _, err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
glog.Errorf("Pod CIDR update failed %v", err)
continue
}
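The two new fields feed a reporting gate in tryUpdateNodeStatus (shown in the node-status diff below). As a self-contained toy model of that gate, not kubelet code:

package main

import (
	"fmt"
	"time"
)

// shouldReport mirrors the gate's intent: any detected change is reported
// immediately; an unchanged status is reported only once reportFrequency has
// elapsed since the last report.
func shouldReport(now, lastReport time.Time, reportFrequency time.Duration, changed bool) bool {
	if changed {
		return true
	}
	return !now.Before(lastReport.Add(reportFrequency))
}

func main() {
	last := time.Now()
	fmt.Println(shouldReport(last.Add(10*time.Second), last, time.Minute, false)) // false: within the window, nothing changed
	fmt.Println(shouldReport(last.Add(10*time.Second), last, time.Minute, true))  // true: a change forces an immediate report
	fmt.Println(shouldReport(last.Add(time.Minute), last, time.Minute, false))    // true: the window has elapsed
}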

View File

@ -55,26 +55,28 @@ func (kl *Kubelet) providerRequiresNetworkingConfiguration() bool {
}
// updatePodCIDR updates the pod CIDR in the runtime state if it is different
// from the current CIDR.
func (kl *Kubelet) updatePodCIDR(cidr string) error {
// from the current CIDR. Returns true if the pod CIDR actually changed.
func (kl *Kubelet) updatePodCIDR(cidr string) (bool, error) {
kl.updatePodCIDRMux.Lock()
defer kl.updatePodCIDRMux.Unlock()
podCIDR := kl.runtimeState.podCIDR()
if podCIDR == cidr {
return nil
return false, nil
}
// kubelet -> generic runtime -> runtime shim -> network plugin
// docker/non-cri implementations have a passthrough UpdatePodCIDR
if err := kl.getRuntime().UpdatePodCIDR(cidr); err != nil {
return fmt.Errorf("failed to update pod CIDR: %v", err)
// If the runtime update fails, the pod CIDR has in theory not changed, but it
// is safer to still return true here.
return true, fmt.Errorf("failed to update pod CIDR: %v", err)
}
glog.Infof("Setting Pod CIDR: %v -> %v", podCIDR, cidr)
kl.runtimeState.setPodCIDR(cidr)
return nil
return true, nil
}
// GetPodDNS returns DNS settings for the pod.

View File

@ -21,11 +21,13 @@ import (
"fmt"
"net"
goruntime "runtime"
"sort"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -348,8 +350,8 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
}
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master, registering the kubelet first if
// necessary.
// It synchronizes node status to master if there is any change or enough time
// has passed since the last sync, registering the kubelet first if necessary.
func (kl *Kubelet) syncNodeStatus() {
kl.syncNodeStatusMux.Lock()
defer kl.syncNodeStatusMux.Unlock()
@ -366,7 +368,8 @@ func (kl *Kubelet) syncNodeStatus() {
}
}
// updateNodeStatus updates node status to master with retries.
// updateNodeStatus updates node status to master with retries if there is any
// change or enough time has passed since the last sync.
func (kl *Kubelet) updateNodeStatus() error {
glog.V(5).Infof("Updating node status")
for i := 0; i < nodeStatusUpdateRetry; i++ {
@ -382,7 +385,8 @@ func (kl *Kubelet) updateNodeStatus() error {
return fmt.Errorf("update node status exceeds retry count")
}
// tryUpdateNodeStatus tries to update node status to master.
// tryUpdateNodeStatus tries to update node status to master if there is any
// change or enough time has passed since the last sync.
func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
// In large clusters, GET and PUT operations on Node objects coming
// from here are the majority of load on apiserver and etcd.
@ -404,18 +408,31 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
return fmt.Errorf("nil %q node object", kl.nodeName)
}
podCIDRChanged := false
if node.Spec.PodCIDR != "" {
if err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
// Pod CIDR could have been updated before, so we cannot rely on
// node.Spec.PodCIDR being non-empty. We also need to know whether the pod
// CIDR has actually changed.
if podCIDRChanged, err = kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
glog.Errorf(err.Error())
}
}
kl.setNodeStatus(node)
now := kl.clock.Now()
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) && now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) {
return nil
}
}
// Patch the current status on the API server
updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
if err != nil {
return err
}
kl.lastStatusReportTime = now
kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
// those volumes are already updated in the node's status
@ -553,3 +570,53 @@ func validateNodeIP(nodeIP net.IP) error {
}
return fmt.Errorf("Node IP: %q not found in the host's network interfaces", nodeIP.String())
}
// nodeStatusHasChanged compares the original node and current node's status and
// returns true if any change happens. The heartbeat timestamp is ignored.
func nodeStatusHasChanged(originalStatus *v1.NodeStatus, status *v1.NodeStatus) bool {
if originalStatus == nil && status == nil {
return false
}
if originalStatus == nil || status == nil {
return true
}
// Compare node conditions here because we need to ignore the heartbeat timestamp.
if nodeConditionsHaveChanged(originalStatus.Conditions, status.Conditions) {
return true
}
// Compare other fields of NodeStatus.
originalStatusCopy := originalStatus.DeepCopy()
statusCopy := status.DeepCopy()
originalStatusCopy.Conditions = nil
statusCopy.Conditions = nil
return !apiequality.Semantic.DeepEqual(originalStatusCopy, statusCopy)
}
// nodeConditionsHaveChanged compares the original node and current node's
// conditions and returns true if any change happens. The heartbeat timestamp is
// ignored.
func nodeConditionsHaveChanged(originalConditions []v1.NodeCondition, conditions []v1.NodeCondition) bool {
if len(originalConditions) != len(conditions) {
return true
}
originalConditionsCopy := make([]v1.NodeCondition, 0, len(originalConditions))
originalConditionsCopy = append(originalConditionsCopy, originalConditions...)
conditionsCopy := make([]v1.NodeCondition, 0, len(conditions))
conditionsCopy = append(conditionsCopy, conditions...)
sort.SliceStable(originalConditionsCopy, func(i, j int) bool { return originalConditionsCopy[i].Type < originalConditionsCopy[j].Type })
sort.SliceStable(conditionsCopy, func(i, j int) bool { return conditionsCopy[i].Type < conditionsCopy[j].Type })
replacedheartbeatTime := metav1.Time{}
for i := range conditionsCopy {
originalConditionsCopy[i].LastHeartbeatTime = replacedheartbeatTime
conditionsCopy[i].LastHeartbeatTime = replacedheartbeatTime
if !apiequality.Semantic.DeepEqual(&originalConditionsCopy[i], &conditionsCopy[i]) {
return true
}
}
return false
}
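The reason conditions get their own comparison in nodeConditionsHaveChanged above: a plain semantic DeepEqual would treat a heartbeat-only update as a change. A hedged standalone illustration (not kubelet code):

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	metav1 "k8s.io/apimachinery/pkg/apis/metav1"
)

func main() {
	now := metav1.Now()
	later := metav1.NewTime(now.Add(time.Minute))

	a := v1.NodeCondition{Type: v1.NodeReady, Status: v1.ConditionTrue, LastHeartbeatTime: now, LastTransitionTime: now}
	b := a
	b.LastHeartbeatTime = later

	// DeepEqual alone sees a difference, which would defeat infrequent reporting.
	fmt.Println(apiequality.Semantic.DeepEqual(a, b)) // false

	// Zeroing the heartbeat, as nodeConditionsHaveChanged does, shows nothing
	// else changed, so no status PATCH is needed.
	a.LastHeartbeatTime, b.LastHeartbeatTime = metav1.Time{}, metav1.Time{}
	fmt.Println(apiequality.Semantic.DeepEqual(a, b)) // true
}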

View File

@ -43,6 +43,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
@ -795,6 +796,239 @@ func TestUpdateNodeStatusError(t *testing.T) {
assert.Len(t, testKubelet.fakeKubeClient.Actions(), nodeStatusUpdateRetry)
}
func TestUpdateNodeStatusWithLease(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeLease, true)()
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
clock := testKubelet.fakeClock
kubelet := testKubelet.kubelet
kubelet.nodeStatusMaxImages = 5 // don't truncate the image list that gets constructed by hand for this test
kubelet.kubeClient = nil // ensure only the heartbeat client is used
kubelet.containerManager = &localCM{
ContainerManager: cm.NewStubContainerManager(),
allocatableReservation: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
},
capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
}
// Since this test retroactively overrides the stub container manager,
// we have to regenerate default status setters.
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
kubelet.nodeStatusReportFrequency = time.Minute
kubeClient := testKubelet.fakeKubeClient
existingNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*existingNode}}).ReactionChain
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 20E9,
}
kubelet.machineInfo = machineInfo
now := metav1.NewTime(clock.Now()).Rfc3339Copy()
expectedNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeMemoryPressure,
Status: v1.ConditionFalse,
Reason: "KubeletHasSufficientMemory",
Message: fmt.Sprintf("kubelet has sufficient memory available"),
LastHeartbeatTime: now,
LastTransitionTime: now,
},
{
Type: v1.NodeDiskPressure,
Status: v1.ConditionFalse,
Reason: "KubeletHasNoDiskPressure",
Message: fmt.Sprintf("kubelet has no disk pressure"),
LastHeartbeatTime: now,
LastTransitionTime: now,
},
{
Type: v1.NodePIDPressure,
Status: v1.ConditionFalse,
Reason: "KubeletHasSufficientPID",
Message: fmt.Sprintf("kubelet has sufficient PID available"),
LastHeartbeatTime: now,
LastTransitionTime: now,
},
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: now,
LastTransitionTime: now,
},
},
NodeInfo: v1.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: cadvisortest.FakeKernelVersion,
OSImage: cadvisortest.FakeContainerOsVersion,
OperatingSystem: goruntime.GOOS,
Architecture: goruntime.GOARCH,
ContainerRuntimeVersion: "test://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI),
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
Addresses: []v1.NodeAddress{
{Type: v1.NodeInternalIP, Address: "127.0.0.1"},
{Type: v1.NodeHostName, Address: testKubeletHostname},
},
// images will be sorted from max to min in node status.
Images: []v1.ContainerImage{
{
Names: []string{"k8s.gcr.io:v1", "k8s.gcr.io:v2"},
SizeBytes: 123,
},
{
Names: []string{"k8s.gcr.io:v3", "k8s.gcr.io:v4"},
SizeBytes: 456,
},
},
},
}
// Update node status when node status is created.
// Report node status.
kubelet.updateRuntimeUp()
assert.NoError(t, kubelet.updateNodeStatus())
actions := kubeClient.Actions()
assert.Len(t, actions, 2)
assert.IsType(t, core.GetActionImpl{}, actions[0])
assert.IsType(t, core.PatchActionImpl{}, actions[1])
patchAction := actions[1].(core.PatchActionImpl)
updatedNode, err := applyNodeStatusPatch(existingNode, patchAction.GetPatch())
require.NoError(t, err)
for _, cond := range updatedNode.Status.Conditions {
cond.LastHeartbeatTime = cond.LastHeartbeatTime.Rfc3339Copy()
cond.LastTransitionTime = cond.LastTransitionTime.Rfc3339Copy()
}
assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode))
// Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961
assert.Equal(t, v1.NodeReady, updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type,
"NodeReady should be the last condition")
// Update node status again when nothing is changed (except heartbeat time).
// Report node status if it has exceeded the duration of nodeStatusReportFrequency.
clock.Step(time.Minute)
assert.NoError(t, kubelet.updateNodeStatus())
// 2 more actions (there were 2 actions before).
actions = kubeClient.Actions()
assert.Len(t, actions, 4)
assert.IsType(t, core.GetActionImpl{}, actions[2])
assert.IsType(t, core.PatchActionImpl{}, actions[3])
patchAction = actions[3].(core.PatchActionImpl)
updatedNode, err = applyNodeStatusPatch(updatedNode, patchAction.GetPatch())
require.NoError(t, err)
for _, cond := range updatedNode.Status.Conditions {
cond.LastHeartbeatTime = cond.LastHeartbeatTime.Rfc3339Copy()
cond.LastTransitionTime = cond.LastTransitionTime.Rfc3339Copy()
}
// Expect LastHeartbeatTime to be updated, everything else unchanged.
for i, cond := range expectedNode.Status.Conditions {
expectedNode.Status.Conditions[i].LastHeartbeatTime = metav1.NewTime(cond.LastHeartbeatTime.Time.Add(time.Minute)).Rfc3339Copy()
}
assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode))
// Update node status again when nothing is changed (except heartbeat time).
// Do not report node status if it is within the duration of nodeStatusReportFrequency.
clock.Step(10 * time.Second)
assert.NoError(t, kubelet.updateNodeStatus())
// Only 1 more action (There were 4 actions before).
actions = kubeClient.Actions()
assert.Len(t, actions, 5)
assert.IsType(t, core.GetActionImpl{}, actions[4])
// Update node status again when something is changed.
// Report node status even if it is still within the duration of nodeStatusReportFrequency.
clock.Step(10 * time.Second)
var newMemoryCapacity int64 = 40E9
kubelet.machineInfo.MemoryCapacity = uint64(newMemoryCapacity)
assert.NoError(t, kubelet.updateNodeStatus())
// 2 more actions (there were 5 actions before).
actions = kubeClient.Actions()
assert.Len(t, actions, 7)
assert.IsType(t, core.GetActionImpl{}, actions[5])
assert.IsType(t, core.PatchActionImpl{}, actions[6])
patchAction = actions[6].(core.PatchActionImpl)
updatedNode, err = applyNodeStatusPatch(updatedNode, patchAction.GetPatch())
require.NoError(t, err)
memCapacity, _ := updatedNode.Status.Capacity[v1.ResourceMemory]
updatedMemoryCapacity, _ := (&memCapacity).AsInt64()
assert.Equal(t, newMemoryCapacity, updatedMemoryCapacity, "Memory capacity")
now = metav1.NewTime(clock.Now()).Rfc3339Copy()
for _, cond := range updatedNode.Status.Conditions {
// Expect LastHeartbeatTime to be updated, while LastTransitionTime stays unchanged.
assert.Equal(t, now, cond.LastHeartbeatTime.Rfc3339Copy(),
"LastHeartbeatTime for condition %v", cond.Type)
assert.Equal(t, now, metav1.NewTime(cond.LastTransitionTime.Time.Add(time.Minute+20*time.Second)).Rfc3339Copy(),
"LastTransitionTime for condition %v", cond.Type)
}
// Update node status when changing pod CIDR.
// Report node status if it is still within the duration of nodeStatusReportFrequency.
clock.Step(10 * time.Second)
assert.Equal(t, "", kubelet.runtimeState.podCIDR(), "Pod CIDR should be empty")
podCIDR := "10.0.0.0/24"
updatedNode.Spec.PodCIDR = podCIDR
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*updatedNode}}).ReactionChain
assert.NoError(t, kubelet.updateNodeStatus())
assert.Equal(t, podCIDR, kubelet.runtimeState.podCIDR(), "Pod CIDR should be updated now")
// 2 more actions (there were 7 actions before).
actions = kubeClient.Actions()
assert.Len(t, actions, 9)
assert.IsType(t, core.GetActionImpl{}, actions[7])
assert.IsType(t, core.PatchActionImpl{}, actions[8])
patchAction = actions[8].(core.PatchActionImpl)
// Update node status when keeping the pod CIDR.
// Do not report node status if it is within the duration of nodeStatusReportFrequency.
clock.Step(10 * time.Second)
assert.Equal(t, podCIDR, kubelet.runtimeState.podCIDR(), "Pod CIDR should already be updated")
assert.NoError(t, kubelet.updateNodeStatus())
// Only 1 more action (There were 9 actions before).
actions = kubeClient.Actions()
assert.Len(t, actions, 10)
assert.IsType(t, core.GetActionImpl{}, actions[9])
}
func TestRegisterWithApiServer(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
@ -1529,3 +1763,159 @@ func TestRegisterWithApiServerWithTaint(t *testing.T) {
return
})
}
func TestNodeStatusHasChanged(t *testing.T) {
fakeNow := metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
fakeFuture := metav1.Time{Time: fakeNow.Time.Add(time.Minute)}
readyCondition := v1.NodeCondition{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: fakeNow,
LastTransitionTime: fakeNow,
}
readyConditionAtDiffHeartbeatTime := v1.NodeCondition{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: fakeFuture,
LastTransitionTime: fakeNow,
}
readyConditionAtDiffTransitionTime := v1.NodeCondition{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: fakeFuture,
LastTransitionTime: fakeFuture,
}
notReadyCondition := v1.NodeCondition{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
LastHeartbeatTime: fakeNow,
LastTransitionTime: fakeNow,
}
memoryPressureCondition := v1.NodeCondition{
Type: v1.NodeMemoryPressure,
Status: v1.ConditionFalse,
LastHeartbeatTime: fakeNow,
LastTransitionTime: fakeNow,
}
testcases := []struct {
name string
originalStatus *v1.NodeStatus
status *v1.NodeStatus
expectChange bool
}{
{
name: "Node status does not change with nil status.",
originalStatus: nil,
status: nil,
expectChange: false,
},
{
name: "Node status does not change with default status.",
originalStatus: &v1.NodeStatus{},
status: &v1.NodeStatus{},
expectChange: false,
},
{
name: "Node status changes with nil and default status.",
originalStatus: nil,
status: &v1.NodeStatus{},
expectChange: true,
},
{
name: "Node status changes with nil and status.",
originalStatus: nil,
status: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition},
},
expectChange: true,
},
{
name: "Node status does not change with empty conditions.",
originalStatus: &v1.NodeStatus{Conditions: []v1.NodeCondition{}},
status: &v1.NodeStatus{Conditions: []v1.NodeCondition{}},
expectChange: false,
},
{
name: "Node status does not change",
originalStatus: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition},
},
status: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition},
},
expectChange: false,
},
{
name: "Node status does not change even if heartbeat time changes.",
originalStatus: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition},
},
status: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyConditionAtDiffHeartbeatTime, memoryPressureCondition},
},
expectChange: false,
},
{
name: "Node status does not change even if the orders of conditions are different.",
originalStatus: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition},
},
status: &v1.NodeStatus{
Conditions: []v1.NodeCondition{memoryPressureCondition, readyConditionAtDiffHeartbeatTime},
},
expectChange: false,
},
{
name: "Node status changes if condition status differs.",
originalStatus: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition},
},
status: &v1.NodeStatus{
Conditions: []v1.NodeCondition{notReadyCondition, memoryPressureCondition},
},
expectChange: true,
},
{
name: "Node status changes if transition time changes.",
originalStatus: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition},
},
status: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyConditionAtDiffTransitionTime, memoryPressureCondition},
},
expectChange: true,
},
{
name: "Node status changes with different number of conditions.",
originalStatus: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyCondition},
},
status: &v1.NodeStatus{
Conditions: []v1.NodeCondition{readyCondition, memoryPressureCondition},
},
expectChange: true,
},
{
name: "Node status changes with different phase.",
originalStatus: &v1.NodeStatus{
Phase: v1.NodePending,
Conditions: []v1.NodeCondition{readyCondition},
},
status: &v1.NodeStatus{
Phase: v1.NodeRunning,
Conditions: []v1.NodeCondition{readyCondition},
},
expectChange: true,
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
originalStatusCopy := tc.originalStatus.DeepCopy()
statusCopy := tc.status.DeepCopy()
changed := nodeStatusHasChanged(tc.originalStatus, tc.status)
assert.Equal(t, tc.expectChange, changed, "Expect node status change to be %t, but got %t.", tc.expectChange, changed)
assert.True(t, apiequality.Semantic.DeepEqual(originalStatusCopy, tc.originalStatus), "%s", diff.ObjectDiff(originalStatusCopy, tc.originalStatus))
assert.True(t, apiequality.Semantic.DeepEqual(statusCopy, tc.status), "%s", diff.ObjectDiff(statusCopy, tc.status))
})
}
}

View File

@ -292,9 +292,11 @@ type KubeletConfiguration struct {
// Default: "4h"
// +optional
StreamingConnectionIdleTimeout metav1.Duration `json:"streamingConnectionIdleTimeout,omitempty"`
// nodeStatusUpdateFrequency is the frequency that kubelet posts node
// status to master. Note: be cautious when changing the constant, it
// must work with nodeMonitorGracePeriod in nodecontroller.
// nodeStatusUpdateFrequency is the frequency that kubelet computes node
// status. If node lease feature is not enabled, it is also the frequency that
// kubelet posts node status to master.
// Note: When node lease feature is not enabled, be cautious when changing the
// constant, it must work with nodeMonitorGracePeriod in nodecontroller.
// Dynamic Kubelet Config (beta): If dynamically updating this field, consider that
// it may impact node scalability, and also that the node controller's
// nodeMonitorGracePeriod must be set to N*NodeStatusUpdateFrequency,
@ -303,6 +305,16 @@ type KubeletConfiguration struct {
// Default: "10s"
// +optional
NodeStatusUpdateFrequency metav1.Duration `json:"nodeStatusUpdateFrequency,omitempty"`
// nodeStatusReportFrequency is the frequency that kubelet posts node
// status to master if node status does not change. Kubelet will ignore this
// frequency and post node status immediately if any change is detected. It is
// only used when node lease feature is enabled. nodeStatusReportFrequency's
// default value is 1m. But if nodeStatusUpdateFrequency is set explicitly,
// nodeStatusReportFrequency's default value will be set to
// nodeStatusUpdateFrequency for backward compatibility.
// Default: "1m"
// +optional
NodeStatusReportFrequency metav1.Duration `json:"nodeStatusReportFrequency,omitempty"`
// nodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease,
// when the NodeLease feature is enabled. This feature provides an indicator of node
health by having the Kubelet create and periodically renew a lease, named after the node,

View File

@ -143,6 +143,7 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
}
out.StreamingConnectionIdleTimeout = in.StreamingConnectionIdleTimeout
out.NodeStatusUpdateFrequency = in.NodeStatusUpdateFrequency
out.NodeStatusReportFrequency = in.NodeStatusReportFrequency
out.ImageMinimumGCAge = in.ImageMinimumGCAge
if in.ImageGCHighThresholdPercent != nil {
in, out := &in.ImageGCHighThresholdPercent, &out.ImageGCHighThresholdPercent

View File

@ -44,6 +44,7 @@ go_library(
],
importpath = "k8s.io/kubernetes/test/e2e/common",
deps = [
"//pkg/api/v1/node:go_default_library",
"//pkg/api/v1/pod:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",

View File

@ -23,7 +23,9 @@ import (
coordv1beta1 "k8s.io/api/coordination/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
v1node "k8s.io/kubernetes/pkg/api/v1/node"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
@ -31,33 +33,41 @@ import (
)
var _ = framework.KubeDescribe("[Feature:NodeLease][NodeAlphaFeature:NodeLease]", func() {
var nodeName string
f := framework.NewDefaultFramework("node-lease-test")
BeforeEach(func() {
nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
Expect(len(nodes.Items)).NotTo(BeZero())
nodeName = nodes.Items[0].ObjectMeta.Name
})
Context("when the NodeLease feature is enabled", func() {
It("the Kubelet should create and update a lease in the kube-node-lease namespace", func() {
It("the kubelet should create and update a lease in the kube-node-lease namespace", func() {
leaseClient := f.ClientSet.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
var (
err error
lease *coordv1beta1.Lease
)
// check that lease for this Kubelet exists in the kube-node-lease namespace
By("check that lease for this Kubelet exists in the kube-node-lease namespace")
Eventually(func() error {
lease, err = leaseClient.Get(framework.TestContext.NodeName, metav1.GetOptions{})
lease, err = leaseClient.Get(nodeName, metav1.GetOptions{})
if err != nil {
return err
}
return nil
}, 5*time.Minute, 5*time.Second).Should(BeNil())
// check basic expectations for the lease
Expect(expectLease(lease)).To(BeNil())
// ensure that at least one lease renewal happens within the
// lease duration by checking for a change to renew time
Expect(expectLease(lease, nodeName)).To(BeNil())
By("check that node lease is updated at least once within the lease duration")
Eventually(func() error {
newLease, err := leaseClient.Get(framework.TestContext.NodeName, metav1.GetOptions{})
newLease, err := leaseClient.Get(nodeName, metav1.GetOptions{})
if err != nil {
return err
}
// check basic expectations for the latest lease
if err := expectLease(newLease); err != nil {
if err := expectLease(newLease, nodeName); err != nil {
return err
}
// check that RenewTime has been updated on the latest lease
@ -68,12 +78,76 @@ var _ = framework.KubeDescribe("[Feature:NodeLease][NodeAlphaFeature:NodeLease]"
}
return nil
}, time.Duration(*lease.Spec.LeaseDurationSeconds)*time.Second,
time.Duration(*lease.Spec.LeaseDurationSeconds/3)*time.Second)
time.Duration(*lease.Spec.LeaseDurationSeconds/4)*time.Second)
})
It("the kubelet should report node status infrequently", func() {
By("wait until node is ready")
framework.WaitForNodeToBeReady(f.ClientSet, nodeName, 5*time.Minute)
By("wait until there is node lease")
var err error
var lease *coordv1beta1.Lease
Eventually(func() error {
lease, err = f.ClientSet.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease).Get(nodeName, metav1.GetOptions{})
if err != nil {
return err
}
return nil
}, 5*time.Minute, 5*time.Second).Should(BeNil())
// check basic expectations for the lease
Expect(expectLease(lease, nodeName)).To(BeNil())
leaseDuration := time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second
By("verify NodeStatus report period is longer than lease duration")
// NodeStatus is reported from node to master when there is some change or
// enough time has passed. So here, keep checking the time difference between
// two consecutive NodeStatus reports until it is longer than the lease
// duration (which is the same as nodeMonitorGracePeriod).
heartbeatTime := getNextReadyConditionHeartbeatTime(f.ClientSet, nodeName, metav1.Time{})
Eventually(func() error {
nextHeartbeatTime := getNextReadyConditionHeartbeatTime(f.ClientSet, nodeName, heartbeatTime)
if nextHeartbeatTime.Time.After(heartbeatTime.Time.Add(leaseDuration)) {
return nil
}
heartbeatTime = nextHeartbeatTime
return fmt.Errorf("node status report period is shorter than lease duration")
// Enter next round immediately.
}, 5*time.Minute, time.Nanosecond).Should(BeNil())
By("verify node is still in ready status even though node status report is infrequent")
// This check on node status is only meaningful when this e2e test runs as a
// cluster e2e test, because the node e2e test does not create and run the
// controller manager, i.e., there is no node lifecycle controller.
node, err := f.ClientSet.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
Expect(err).To(BeNil())
_, readyCondition := v1node.GetNodeCondition(&node.Status, corev1.NodeReady)
Expect(readyCondition.Status).To(Equal(corev1.ConditionTrue))
})
})
})
func expectLease(lease *coordv1beta1.Lease) error {
func getNextReadyConditionHeartbeatTime(clientSet clientset.Interface, nodeName string, prevHeartbeatTime metav1.Time) metav1.Time {
var newHeartbeatTime metav1.Time
Eventually(func() error {
node, err := clientSet.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
if err != nil {
return err
}
_, readyCondition := v1node.GetNodeCondition(&node.Status, corev1.NodeReady)
Expect(readyCondition.Status).To(Equal(corev1.ConditionTrue))
newHeartbeatTime = readyCondition.LastHeartbeatTime
if prevHeartbeatTime.Before(&newHeartbeatTime) {
return nil
}
return fmt.Errorf("heartbeat has not changed yet")
}, 5*time.Minute, 5*time.Second).Should(BeNil())
return newHeartbeatTime
}
func expectLease(lease *coordv1beta1.Lease, nodeName string) error {
// expect values for HolderIdentity, LeaseDurationSeconds, and RenewTime
if lease.Spec.HolderIdentity == nil {
return fmt.Errorf("Spec.HolderIdentity should not be nil")
@ -85,8 +159,8 @@ func expectLease(lease *coordv1beta1.Lease) error {
return fmt.Errorf("Spec.RenewTime should not be nil")
}
// ensure that the HolderIdentity matches the node name
if *lease.Spec.HolderIdentity != framework.TestContext.NodeName {
return fmt.Errorf("Spec.HolderIdentity (%v) should match the node name (%v)", *lease.Spec.HolderIdentity, framework.TestContext.NodeName)
if *lease.Spec.HolderIdentity != nodeName {
return fmt.Errorf("Spec.HolderIdentity (%v) should match the node name (%v)", *lease.Spec.HolderIdentity, nodeName)
}
return nil
}