Merge pull request #65660 from mtaufen/incremental-refactor-kubelet-node-status

Automatic merge from submit-queue (batch tested with PRs 66152, 66406, 66218, 66278, 65660). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Refactor kubelet node status setters, add test coverage

This internal refactor moves the node status setters to a new package, explicitly injects dependencies to facilitate unit testing, and adds individual unit tests for the setters.

I gave each setter a distinct commit to facilitate review.

Non-goals:
- I intentionally excluded the class of setters that return a "modified" boolean, as I want to think more carefully about how to cleanly handle the behavior, and this PR is already rather large.
- I would like to clean up the status update control loops as well, but that belongs in a separate PR.

```release-note
NONE
```
pull/8/head
Kubernetes Submit Queue 2018-07-20 12:12:24 -07:00 committed by GitHub
commit 53ee0c8652
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 2535 additions and 899 deletions

View File

@ -10,7 +10,6 @@ go_library(
name = "go_default_library",
srcs = [
"active_deadline.go",
"cloud_request_manager.go",
"doc.go",
"kubelet.go",
"kubelet_getters.go",
@ -48,6 +47,7 @@ go_library(
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/certificate:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/cloudresource:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/configmap:go_default_library",
@ -66,6 +66,7 @@ go_library(
"//pkg/kubelet/metrics/collectors:go_default_library",
"//pkg/kubelet/mountpod:go_default_library",
"//pkg/kubelet/network/dns:go_default_library",
"//pkg/kubelet/nodestatus:go_default_library",
"//pkg/kubelet/pleg:go_default_library",
"//pkg/kubelet/pod:go_default_library",
"//pkg/kubelet/preemption:go_default_library",
@ -102,7 +103,6 @@ go_library(
"//pkg/util/node:go_default_library",
"//pkg/util/oom:go_default_library",
"//pkg/util/removeall:go_default_library",
"//pkg/version:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/csi:go_default_library",
"//pkg/volume/util:go_default_library",
@ -119,7 +119,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
@ -147,7 +146,6 @@ go_test(
name = "go_default_test",
srcs = [
"active_deadline_test.go",
"cloud_request_manager_test.go",
"kubelet_getters_test.go",
"kubelet_network_test.go",
"kubelet_node_status_test.go",
@ -166,7 +164,6 @@ go_test(
deps = [
"//pkg/apis/core/install:go_default_library",
"//pkg/capabilities:go_default_library",
"//pkg/cloudprovider/providers/fake:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/cadvisor/testing:go_default_library",
@ -180,6 +177,7 @@ go_test(
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/logs:go_default_library",
"//pkg/kubelet/network/dns:go_default_library",
"//pkg/kubelet/nodestatus:go_default_library",
"//pkg/kubelet/pleg:go_default_library",
"//pkg/kubelet/pod:go_default_library",
"//pkg/kubelet/pod/testing:go_default_library",
@ -255,6 +253,7 @@ filegroup(
"//pkg/kubelet/checkpoint:all-srcs",
"//pkg/kubelet/checkpointmanager:all-srcs",
"//pkg/kubelet/client:all-srcs",
"//pkg/kubelet/cloudresource:all-srcs",
"//pkg/kubelet/cm:all-srcs",
"//pkg/kubelet/config:all-srcs",
"//pkg/kubelet/configmap:all-srcs",
@ -273,6 +272,7 @@ filegroup(
"//pkg/kubelet/metrics:all-srcs",
"//pkg/kubelet/mountpod:all-srcs",
"//pkg/kubelet/network:all-srcs",
"//pkg/kubelet/nodestatus:all-srcs",
"//pkg/kubelet/pleg:all-srcs",
"//pkg/kubelet/pod:all-srcs",
"//pkg/kubelet/preemption:all-srcs",

View File

@ -0,0 +1,39 @@
# Bazel build definitions for the k8s.io/kubernetes/pkg/kubelet/cloudresource package.
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
# Library target for the package; built from cloud_request_manager.go and
# visible to every other package in the workspace.
go_library(
name = "go_default_library",
srcs = ["cloud_request_manager.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/cloudresource",
visibility = ["//visibility:public"],
deps = [
"//pkg/cloudprovider:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
# Unit tests for the package; the library is embedded so the tests are compiled
# together with the library sources.
go_test(
name = "go_default_test",
srcs = ["cloud_request_manager_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/cloudprovider/providers/fake:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
],
)
# Every file under this directory; private to this package (tagged automanaged).
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
# Public wrapper around package-srcs; referenced as
# "//pkg/kubelet/cloudresource:all-srcs" by the parent kubelet filegroup.
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package cloudresource
import (
"context"
@ -32,6 +32,14 @@ import (
var nodeAddressesRetryPeriod = 5 * time.Second
// SyncManager is an interface for making requests to a cloud provider.
// cloudResourceSyncManager is the implementation in this package.
type SyncManager interface {
// Run starts the manager. NOTE(review): stopCh is presumably closed to stop
// it; the implementation is not visible here - confirm.
Run(stopCh <-chan struct{})
// NodeAddresses returns the node addresses, or an error if they could not
// be obtained.
NodeAddresses() ([]v1.NodeAddress, error)
}
var _ SyncManager = &cloudResourceSyncManager{}
type cloudResourceSyncManager struct {
// Cloud provider interface.
cloud cloudprovider.Interface
@ -45,9 +53,9 @@ type cloudResourceSyncManager struct {
nodeName types.NodeName
}
// NewCloudResourceSyncManager creates a manager responsible for collecting resources
// NewSyncManager creates a manager responsible for collecting resources
// from a cloud provider through requests that are sensitive to timeouts and hanging
func NewCloudResourceSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, syncPeriod time.Duration) *cloudResourceSyncManager {
func NewSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, syncPeriod time.Duration) SyncManager {
return &cloudResourceSyncManager{
cloud: cloud,
syncPeriod: syncPeriod,

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package cloudresource
import (
"fmt"
@ -64,7 +64,7 @@ func TestNodeAddressesRequest(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
manager := NewCloudResourceSyncManager(cloud, "defaultNode", syncPeriod)
manager := NewSyncManager(cloud, "defaultNode", syncPeriod).(*cloudResourceSyncManager)
go manager.Run(stopCh)
nodeAddresses, err := collectNodeAddresses(manager)

View File

@ -61,6 +61,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/cloudresource"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/configmap"
@ -521,7 +522,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
}
if klet.cloud != nil {
klet.cloudResourceSyncManager = NewCloudResourceSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency)
klet.cloudResourceSyncManager = cloudresource.NewSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency)
}
var secretManager secret.Manager
@ -789,7 +790,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
// setup eviction manager
evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)
@ -836,6 +836,11 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
// Finally, put the most recent version of the config on the Kubelet, so
// people can see how it was configured.
klet.kubeletConfiguration = *kubeCfg
// Generating the status funcs should be the last thing we do,
// since this relies on the rest of the Kubelet having been constructed.
klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
return klet, nil
}
@ -962,7 +967,7 @@ type Kubelet struct {
// Cloud provider interface.
cloud cloudprovider.Interface
// Handles requests to cloud provider with timeout
cloudResourceSyncManager *cloudResourceSyncManager
cloudResourceSyncManager cloudresource.SyncManager
// Indicates that the node initialization happens in an external cloud controller
externalCloudProvider bool

View File

@ -19,10 +19,8 @@ package kubelet
import (
"context"
"fmt"
"math"
"net"
goruntime "runtime"
"strings"
"time"
"github.com/golang/glog"
@ -31,28 +29,20 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilfeature "k8s.io/apiserver/pkg/util/feature"
k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/nodestatus"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/version"
volutil "k8s.io/kubernetes/pkg/volume/util"
)
const (
// maxNamesPerImageInNodeStatus is max number of names per image stored in
// the node status.
maxNamesPerImageInNodeStatus = 5
)
// registerWithAPIServer registers the node with the cluster master. It is safe
// to call multiple times, but not concurrently (kl.registrationCompleted is
// not locked).
@ -342,30 +332,6 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
return node, nil
}
// setVolumeLimits publishes the volume limits reported by volume plugins.
//
// Each limit returned by a plugin is written, under its limit key, to both
// node.Status.Capacity and node.Status.Allocatable (the maps are created when
// nil). A plugin whose limits cannot be read is logged at verbosity 4 and
// skipped.
func (kl *Kubelet) setVolumeLimits(node *v1.Node) {
	status := &node.Status
	if status.Capacity == nil {
		status.Capacity = v1.ResourceList{}
	}
	if status.Allocatable == nil {
		status.Allocatable = v1.ResourceList{}
	}

	for _, plugin := range kl.volumePluginMgr.ListVolumePluginWithLimits() {
		limits, err := plugin.GetVolumeLimits()
		if err != nil {
			glog.V(4).Infof("Error getting volume limit for plugin %s", plugin.GetPluginName())
			continue
		}
		for key, limit := range limits {
			name := v1.ResourceName(key)
			status.Capacity[name] = *resource.NewQuantity(limit, resource.DecimalSI)
			status.Allocatable[name] = *resource.NewQuantity(limit, resource.DecimalSI)
		}
	}
}
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master, registering the kubelet first if
// necessary.
@ -398,8 +364,7 @@ func (kl *Kubelet) updateNodeStatus() error {
return fmt.Errorf("update node status exceeds retry count")
}
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
// is set, this function will also confirm that cbr0 is configured correctly.
// tryUpdateNodeStatus tries to update node status to master.
func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
// In large clusters, GET and PUT operations on Node objects coming
// from here are the majority of load on apiserver and etcd.
@ -447,591 +412,13 @@ func (kl *Kubelet) recordNodeStatusEvent(eventType, event string) {
kl.recorder.Eventf(kl.nodeRef, eventType, event, "Node %s status is now: %s", kl.nodeName, event)
}
// setNodeAddress sets the IP and hostname addresses for the node.
//
// kl.nodeIP, when set, is first checked with kl.nodeIPValidator. Then:
//   - with an external cloud provider, the node IP (if any) is only recorded
//     in the AnnotationProvidedIPAddr annotation and node.Status.Addresses is
//     not modified;
//   - with a cloud provider, the addresses come from
//     kl.cloudResourceSyncManager, narrowed by kl.nodeIP when it is set;
//   - otherwise an InternalIP is derived locally and reported together with a
//     Hostname address.
//
// An error is returned when nodeIP validation fails, the cloud address lookup
// fails, no cloud address matches nodeIP, or no IP address can be found.
func (kl *Kubelet) setNodeAddress(node *v1.Node) error {
if kl.nodeIP != nil {
if err := kl.nodeIPValidator(kl.nodeIP); err != nil {
return fmt.Errorf("failed to validate nodeIP: %v", err)
}
glog.V(2).Infof("Using node IP: %q", kl.nodeIP.String())
}
if kl.externalCloudProvider {
if kl.nodeIP != nil {
if node.ObjectMeta.Annotations == nil {
node.ObjectMeta.Annotations = make(map[string]string)
}
node.ObjectMeta.Annotations[kubeletapis.AnnotationProvidedIPAddr] = kl.nodeIP.String()
}
// We rely on the external cloud provider to supply the addresses.
return nil
}
if kl.cloud != nil {
nodeAddresses, err := kl.cloudResourceSyncManager.NodeAddresses()
if err != nil {
return err
}
if kl.nodeIP != nil {
enforcedNodeAddresses := []v1.NodeAddress{}
var nodeIPType v1.NodeAddressType
// Find the first cloud-reported address equal to nodeIP; it goes first in
// the result and its type is remembered.
for _, nodeAddress := range nodeAddresses {
if nodeAddress.Address == kl.nodeIP.String() {
enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: nodeAddress.Type, Address: nodeAddress.Address})
nodeIPType = nodeAddress.Type
break
}
}
if len(enforcedNodeAddresses) > 0 {
// Keep the remaining addresses of other types, dropping any cloud-reported
// hostname; the kubelet's own hostname is appended afterwards.
for _, nodeAddress := range nodeAddresses {
if nodeAddress.Type != nodeIPType && nodeAddress.Type != v1.NodeHostName {
enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: nodeAddress.Type, Address: nodeAddress.Address})
}
}
enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: v1.NodeHostName, Address: kl.GetHostname()})
node.Status.Addresses = enforcedNodeAddresses
return nil
}
return fmt.Errorf("failed to get node address from cloud provider that matches ip: %v", kl.nodeIP)
}
// Only add a NodeHostName address if the cloudprovider did not specify any addresses.
// (we assume the cloudprovider is authoritative if it specifies any addresses)
if len(nodeAddresses) == 0 {
nodeAddresses = []v1.NodeAddress{{Type: v1.NodeHostName, Address: kl.GetHostname()}}
}
node.Status.Addresses = nodeAddresses
} else {
var ipAddr net.IP
var err error
// 1) Use nodeIP if set
// 2) If the user has specified an IP to HostnameOverride, use it
// 3) Lookup the IP from node name by DNS and use the first valid IPv4 address.
// If the node does not have a valid IPv4 address, use the first valid IPv6 address.
// 4) Try to get the IP from the network interface used as default gateway
if kl.nodeIP != nil {
ipAddr = kl.nodeIP
} else if addr := net.ParseIP(kl.hostname); addr != nil {
ipAddr = addr
} else {
var addrs []net.IP
// The lookup error is deliberately ignored: an empty result simply falls
// through to the host-interface fallback below.
addrs, _ = net.LookupIP(node.Name)
for _, addr := range addrs {
if err = kl.nodeIPValidator(addr); err == nil {
if addr.To4() != nil {
ipAddr = addr
break
}
if addr.To16() != nil && ipAddr == nil {
ipAddr = addr
}
}
}
if ipAddr == nil {
ipAddr, err = utilnet.ChooseHostInterface()
}
}
if ipAddr == nil {
// We tried everything we could, but the IP address wasn't fetchable; error out
return fmt.Errorf("can't get ip address of node %s. error: %v", node.Name, err)
}
node.Status.Addresses = []v1.NodeAddress{
{Type: v1.NodeInternalIP, Address: ipAddr.String()},
{Type: v1.NodeHostName, Address: kl.GetHostname()},
}
}
return nil
}
// setNodeStatusMachineInfo fills in node.Status.Capacity and
// node.Status.Allocatable, plus the MachineID, SystemUUID and BootID fields of
// node.Status.NodeInfo, from the cached machine info and the container manager.
//
// Allocatable is derived from Capacity by subtracting the container manager's
// node allocatable reservation (clamped at zero), overlaying device plugin
// allocatable values, and finally subtracting every huge page capacity from
// allocatable memory.
func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
// Note: avoid blindly overwriting the capacity in case opaque
// resources are being advertised.
if node.Status.Capacity == nil {
node.Status.Capacity = v1.ResourceList{}
}
var devicePluginAllocatable v1.ResourceList
var devicePluginCapacity v1.ResourceList
var removedDevicePlugins []string
// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := kl.GetCachedMachineInfo()
if err != nil {
// Without machine info, report zero CPU and memory and fall back to maxPods
// for the pod capacity.
// TODO(roberthbailey): This is required for test-cmd.sh to pass.
// See if the test should be updated instead.
node.Status.Capacity[v1.ResourceCPU] = *resource.NewMilliQuantity(0, resource.DecimalSI)
node.Status.Capacity[v1.ResourceMemory] = resource.MustParse("0Gi")
node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity(int64(kl.maxPods), resource.DecimalSI)
glog.Errorf("Error getting machine info: %v", err)
} else {
node.Status.NodeInfo.MachineID = info.MachineID
node.Status.NodeInfo.SystemUUID = info.SystemUUID
for rName, rCap := range cadvisor.CapacityFromMachineInfo(info) {
node.Status.Capacity[rName] = rCap
}
// Pod capacity is min(cores * podsPerCore, maxPods) when podsPerCore is set,
// otherwise maxPods.
if kl.podsPerCore > 0 {
node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity(
int64(math.Min(float64(info.NumCores*kl.podsPerCore), float64(kl.maxPods))), resource.DecimalSI)
} else {
node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity(
int64(kl.maxPods), resource.DecimalSI)
}
// A previously recorded boot ID that differs from the current one means the
// node rebooted; emit a warning event before storing the new ID.
if node.Status.NodeInfo.BootID != "" &&
node.Status.NodeInfo.BootID != info.BootID {
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.NodeRebooted,
"Node %s has been rebooted, boot id: %s", kl.nodeName, info.BootID)
}
node.Status.NodeInfo.BootID = info.BootID
if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
// TODO: all the node resources should use GetCapacity instead of deriving the
// capacity for every node status request
initialCapacity := kl.containerManager.GetCapacity()
if initialCapacity != nil {
node.Status.Capacity[v1.ResourceEphemeralStorage] = initialCapacity[v1.ResourceEphemeralStorage]
}
}
devicePluginCapacity, devicePluginAllocatable, removedDevicePlugins = kl.containerManager.GetDevicePluginResourceCapacity()
if devicePluginCapacity != nil {
for k, v := range devicePluginCapacity {
// Log only when the value is new or has changed; the assignment itself is
// unconditional.
if old, ok := node.Status.Capacity[k]; !ok || old.Value() != v.Value() {
glog.V(2).Infof("Update capacity for %s to %d", k, v.Value())
}
node.Status.Capacity[k] = v
}
}
for _, removedResource := range removedDevicePlugins {
glog.V(2).Infof("Set capacity for %s to 0 on device removal", removedResource)
// Set the capacity of the removed resource to 0 instead of
// removing the resource from the node status. This is to indicate
// that the resource is managed by device plugin and had been
// registered before.
//
// This is required to differentiate the device plugin managed
// resources and the cluster-level resources, which are absent in
// node status.
node.Status.Capacity[v1.ResourceName(removedResource)] = *resource.NewQuantity(int64(0), resource.DecimalSI)
}
}
// Set Allocatable.
if node.Status.Allocatable == nil {
node.Status.Allocatable = make(v1.ResourceList)
}
// Remove extended resources from allocatable that are no longer
// present in capacity.
for k := range node.Status.Allocatable {
_, found := node.Status.Capacity[k]
if !found && v1helper.IsExtendedResourceName(k) {
delete(node.Status.Allocatable, k)
}
}
allocatableReservation := kl.containerManager.GetNodeAllocatableReservation()
for k, v := range node.Status.Capacity {
// Work on a copy so the capacity entry itself is not modified.
value := *(v.Copy())
if res, exists := allocatableReservation[k]; exists {
value.Sub(res)
}
if value.Sign() < 0 {
// Negative Allocatable resources don't make sense.
value.Set(0)
}
node.Status.Allocatable[k] = value
}
// Device plugin allocatable values override the ones computed from capacity.
if devicePluginAllocatable != nil {
for k, v := range devicePluginAllocatable {
if old, ok := node.Status.Allocatable[k]; !ok || old.Value() != v.Value() {
glog.V(2).Infof("Update allocatable for %s to %d", k, v.Value())
}
node.Status.Allocatable[k] = v
}
}
// for every huge page reservation, we need to remove it from allocatable memory
for k, v := range node.Status.Capacity {
if v1helper.IsHugePageResourceName(k) {
allocatableMemory := node.Status.Allocatable[v1.ResourceMemory]
value := *(v.Copy())
allocatableMemory.Sub(value)
if allocatableMemory.Sign() < 0 {
// Negative Allocatable resources don't make sense.
allocatableMemory.Set(0)
}
node.Status.Allocatable[v1.ResourceMemory] = allocatableMemory
}
}
}
// setNodeStatusVersionInfo fills in the version fields of node.Status.NodeInfo:
// kernel version, OS image, container runtime version and the kubelet and
// kube-proxy versions.
//
// If cAdvisor's version info cannot be read, the error is logged and none of
// the fields are touched. A failure to read the container runtime version is
// tolerated and reported as "Unknown".
func (kl *Kubelet) setNodeStatusVersionInfo(node *v1.Node) {
	cadvisorVersions, err := kl.cadvisor.VersionInfo()
	if err != nil {
		glog.Errorf("Error getting version info: %v", err)
		return
	}

	nodeInfo := &node.Status.NodeInfo
	nodeInfo.KernelVersion = cadvisorVersions.KernelVersion
	nodeInfo.OSImage = cadvisorVersions.ContainerOsVersion

	runtimeVersion := "Unknown"
	if ver, verErr := kl.containerRuntime.Version(); verErr == nil {
		runtimeVersion = ver.String()
	}
	nodeInfo.ContainerRuntimeVersion = fmt.Sprintf("%s://%s", kl.containerRuntime.Type(), runtimeVersion)

	nodeInfo.KubeletVersion = version.Get().String()
	// TODO: kube-proxy might be different version from kubelet in the future
	nodeInfo.KubeProxyVersion = version.Get().String()
}
// setNodeStatusDaemonEndpoints copies the kubelet's daemon endpoints into
// node.Status.DaemonEndpoints.
func (kl *Kubelet) setNodeStatusDaemonEndpoints(node *v1.Node) {
	endpoints := *kl.daemonEndpoints
	node.Status.DaemonEndpoints = endpoints
}
// setNodeStatusImages sets node.Status.Images from the image manager's image
// list.
//
// At most kl.nodeStatusMaxImages images are reported (a negative value
// disables the cap), and each image carries at most
// maxNamesPerImageInNodeStatus names: repo digests first, then repo tags. If
// the image list cannot be retrieved, the error is logged and
// node.Status.Images is set to nil.
func (kl *Kubelet) setNodeStatusImages(node *v1.Node) {
	var imagesOnNode []v1.ContainerImage
	containerImages, err := kl.imageManager.GetImageList()
	if err != nil {
		glog.Errorf("Error getting image list: %v", err)
		node.Status.Images = imagesOnNode
		return
	}
	// Only the first nodeStatusMaxImages entries are reported. No sorting is
	// done here; the list is truncated in the order GetImageList returned it.
	if int(kl.nodeStatusMaxImages) > -1 &&
		int(kl.nodeStatusMaxImages) < len(containerImages) {
		containerImages = containerImages[0:kl.nodeStatusMaxImages]
	}

	for _, image := range containerImages {
		// Cap the digests slice at its length before appending. A plain
		// append(image.RepoDigests, ...) may write the tags into spare capacity
		// of the digests' backing array, silently modifying data that belongs
		// to the image manager. The full slice expression forces append to copy.
		digests := image.RepoDigests
		names := append(digests[:len(digests):len(digests)], image.RepoTags...)
		// Report up to maxNamesPerImageInNodeStatus names per image.
		if len(names) > maxNamesPerImageInNodeStatus {
			names = names[0:maxNamesPerImageInNodeStatus]
		}
		imagesOnNode = append(imagesOnNode, v1.ContainerImage{
			Names:     names,
			SizeBytes: image.Size,
		})
	}

	node.Status.Images = imagesOnNode
}
// setNodeStatusGoRuntime records the operating system (GOOS) and architecture
// (GOARCH) the kubelet binary was built for in node.Status.NodeInfo.
func (kl *Kubelet) setNodeStatusGoRuntime(node *v1.Node) {
	nodeInfo := &node.Status.NodeInfo
	nodeInfo.Architecture = goruntime.GOARCH
	nodeInfo.OperatingSystem = goruntime.GOOS
}
// setNodeStatusInfo runs the informational node status setters in a fixed
// order: machine info, version info, daemon endpoints, images and Go runtime.
// Volume limits are set last, and only when the AttachVolumeLimit feature gate
// is enabled.
func (kl *Kubelet) setNodeStatusInfo(node *v1.Node) {
	setters := []func(*v1.Node){
		kl.setNodeStatusMachineInfo,
		kl.setNodeStatusVersionInfo,
		kl.setNodeStatusDaemonEndpoints,
		kl.setNodeStatusImages,
		kl.setNodeStatusGoRuntime,
	}
	for _, set := range setters {
		set(node)
	}

	if !utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
		return
	}
	kl.setVolumeLimits(node)
}
// setNodeReadyCondition computes the NodeReady condition and stores it in
// node.Status.Conditions, replacing an existing NodeReady entry or appending a
// new one.
//
// The condition is True unless the runtime state reports runtime or network
// errors, or a required capacity (CPU, memory, pods, plus ephemeral storage
// when LocalStorageCapacityIsolation is enabled) is missing from
// node.Status.Capacity. When the status differs from the previously stored
// NodeReady status, a NodeReady or NodeNotReady event is recorded.
func (kl *Kubelet) setNodeReadyCondition(node *v1.Node) {
// NOTE(aaronlevy): NodeReady condition needs to be the last in the list of node conditions.
// This is due to an issue with version skewed kubelet and master components.
// ref: https://github.com/kubernetes/kubernetes/issues/16961
currentTime := metav1.NewTime(kl.clock.Now())
// Start from the optimistic "ready" condition; it is replaced below if any
// reason for not being ready is found.
newNodeReadyCondition := v1.NodeCondition{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
Reason: "KubeletReady",
Message: "kubelet is posting ready status",
LastHeartbeatTime: currentTime,
}
// rs accumulates every reason the node is not ready.
rs := append(kl.runtimeState.runtimeErrors(), kl.runtimeState.networkErrors()...)
requiredCapacities := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods}
if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
requiredCapacities = append(requiredCapacities, v1.ResourceEphemeralStorage)
}
missingCapacities := []string{}
for _, resource := range requiredCapacities {
if _, found := node.Status.Capacity[resource]; !found {
missingCapacities = append(missingCapacities, string(resource))
}
}
if len(missingCapacities) > 0 {
rs = append(rs, fmt.Sprintf("Missing node capacity for resources: %s", strings.Join(missingCapacities, ", ")))
}
if len(rs) > 0 {
newNodeReadyCondition = v1.NodeCondition{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
Reason: "KubeletNotReady",
Message: strings.Join(rs, ","),
LastHeartbeatTime: currentTime,
}
}
// Append AppArmor status if it's enabled.
// TODO(tallclair): This is a temporary message until node feature reporting is added.
if newNodeReadyCondition.Status == v1.ConditionTrue &&
kl.appArmorValidator != nil && kl.appArmorValidator.ValidateHost() == nil {
newNodeReadyCondition.Message = fmt.Sprintf("%s. AppArmor enabled", newNodeReadyCondition.Message)
}
// Record any soft requirements that were not met in the container manager.
status := kl.containerManager.Status()
if status.SoftRequirements != nil {
newNodeReadyCondition.Message = fmt.Sprintf("%s. WARNING: %s", newNodeReadyCondition.Message, status.SoftRequirements.Error())
}
readyConditionUpdated := false
needToRecordEvent := false
// Replace the first existing NodeReady condition in place. The transition
// time is carried over when the status is unchanged; otherwise it is reset
// and an event is scheduled.
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == v1.NodeReady {
if node.Status.Conditions[i].Status == newNodeReadyCondition.Status {
newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime
} else {
newNodeReadyCondition.LastTransitionTime = currentTime
needToRecordEvent = true
}
node.Status.Conditions[i] = newNodeReadyCondition
readyConditionUpdated = true
break
}
}
// No NodeReady condition existed yet: append one. No event is recorded in
// this case.
if !readyConditionUpdated {
newNodeReadyCondition.LastTransitionTime = currentTime
node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition)
}
if needToRecordEvent {
if newNodeReadyCondition.Status == v1.ConditionTrue {
kl.recordNodeStatusEvent(v1.EventTypeNormal, events.NodeReady)
} else {
kl.recordNodeStatusEvent(v1.EventTypeNormal, events.NodeNotReady)
glog.Infof("Node became not ready: %+v", newNodeReadyCondition)
}
}
}
// setNodeMemoryPressureCondition refreshes the NodeMemoryPressure condition on
// the node from the eviction manager's view of memory pressure.
//
// The heartbeat time is always updated. Status, reason, message and transition
// time change, and a node status event is recorded, only when the reported
// pressure differs from the stored status. A condition that does not exist yet
// starts as Unknown and is appended once populated.
// TODO: this needs to move somewhere centralized...
func (kl *Kubelet) setNodeMemoryPressureCondition(node *v1.Node) {
	now := metav1.NewTime(kl.clock.Now())

	// Find the existing condition; if several match, the last one is used.
	var cond *v1.NodeCondition
	for idx := len(node.Status.Conditions) - 1; idx >= 0; idx-- {
		if node.Status.Conditions[idx].Type == v1.NodeMemoryPressure {
			cond = &node.Status.Conditions[idx]
			break
		}
	}

	// A missing condition is built locally and appended at the end: appending
	// now would copy it into the slice and the updates below would be lost.
	isNew := cond == nil
	if isNew {
		cond = &v1.NodeCondition{
			Type:   v1.NodeMemoryPressure,
			Status: v1.ConditionUnknown,
		}
	}

	cond.LastHeartbeatTime = now

	// A new condition is Unknown, so it always matches one of the two
	// transition cases below.
	underPressure := kl.evictionManager.IsUnderMemoryPressure()
	switch {
	case underPressure && cond.Status != v1.ConditionTrue:
		cond.Status = v1.ConditionTrue
		cond.Reason = "KubeletHasInsufficientMemory"
		cond.Message = "kubelet has insufficient memory available"
		cond.LastTransitionTime = now
		kl.recordNodeStatusEvent(v1.EventTypeNormal, "NodeHasInsufficientMemory")
	case !underPressure && cond.Status != v1.ConditionFalse:
		cond.Status = v1.ConditionFalse
		cond.Reason = "KubeletHasSufficientMemory"
		cond.Message = "kubelet has sufficient memory available"
		cond.LastTransitionTime = now
		kl.recordNodeStatusEvent(v1.EventTypeNormal, "NodeHasSufficientMemory")
	}

	if isNew {
		node.Status.Conditions = append(node.Status.Conditions, *cond)
	}
}
// setNodePIDPressureCondition refreshes the NodePIDPressure condition on the
// node from the eviction manager's view of PID pressure.
//
// The heartbeat time is always updated. Status, reason, message and transition
// time change, and a node status event is recorded, only when the reported
// pressure differs from the stored status. A condition that does not exist yet
// starts as Unknown and is appended once populated.
// TODO: this needs to move somewhere centralized...
func (kl *Kubelet) setNodePIDPressureCondition(node *v1.Node) {
	now := metav1.NewTime(kl.clock.Now())

	// Find the existing condition; if several match, the last one is used.
	var cond *v1.NodeCondition
	for idx := len(node.Status.Conditions) - 1; idx >= 0; idx-- {
		if node.Status.Conditions[idx].Type == v1.NodePIDPressure {
			cond = &node.Status.Conditions[idx]
			break
		}
	}

	// A missing condition is built locally and appended at the end: appending
	// now would copy it into the slice and the updates below would be lost.
	isNew := cond == nil
	if isNew {
		cond = &v1.NodeCondition{
			Type:   v1.NodePIDPressure,
			Status: v1.ConditionUnknown,
		}
	}

	cond.LastHeartbeatTime = now

	// A new condition is Unknown, so it always matches one of the two
	// transition cases below.
	underPressure := kl.evictionManager.IsUnderPIDPressure()
	switch {
	case underPressure && cond.Status != v1.ConditionTrue:
		cond.Status = v1.ConditionTrue
		cond.Reason = "KubeletHasInsufficientPID"
		cond.Message = "kubelet has insufficient PID available"
		cond.LastTransitionTime = now
		kl.recordNodeStatusEvent(v1.EventTypeNormal, "NodeHasInsufficientPID")
	case !underPressure && cond.Status != v1.ConditionFalse:
		cond.Status = v1.ConditionFalse
		cond.Reason = "KubeletHasSufficientPID"
		cond.Message = "kubelet has sufficient PID available"
		cond.LastTransitionTime = now
		kl.recordNodeStatusEvent(v1.EventTypeNormal, "NodeHasSufficientPID")
	}

	if isNew {
		node.Status.Conditions = append(node.Status.Conditions, *cond)
	}
}
// setNodeDiskPressureCondition refreshes the NodeDiskPressure condition on the
// node from the eviction manager's view of disk pressure.
//
// The heartbeat time is always updated. Status, reason, message and transition
// time change, and a node status event is recorded, only when the reported
// pressure differs from the stored status. A condition that does not exist yet
// starts as Unknown and is appended once populated.
// TODO: this needs to move somewhere centralized...
func (kl *Kubelet) setNodeDiskPressureCondition(node *v1.Node) {
	now := metav1.NewTime(kl.clock.Now())

	// Find the existing condition; if several match, the last one is used.
	var cond *v1.NodeCondition
	for idx := len(node.Status.Conditions) - 1; idx >= 0; idx-- {
		if node.Status.Conditions[idx].Type == v1.NodeDiskPressure {
			cond = &node.Status.Conditions[idx]
			break
		}
	}

	// A missing condition is built locally and appended at the end: appending
	// now would copy it into the slice and the updates below would be lost.
	isNew := cond == nil
	if isNew {
		cond = &v1.NodeCondition{
			Type:   v1.NodeDiskPressure,
			Status: v1.ConditionUnknown,
		}
	}

	cond.LastHeartbeatTime = now

	// A new condition is Unknown, so it always matches one of the two
	// transition cases below.
	underPressure := kl.evictionManager.IsUnderDiskPressure()
	switch {
	case underPressure && cond.Status != v1.ConditionTrue:
		cond.Status = v1.ConditionTrue
		cond.Reason = "KubeletHasDiskPressure"
		cond.Message = "kubelet has disk pressure"
		cond.LastTransitionTime = now
		kl.recordNodeStatusEvent(v1.EventTypeNormal, "NodeHasDiskPressure")
	case !underPressure && cond.Status != v1.ConditionFalse:
		cond.Status = v1.ConditionFalse
		cond.Reason = "KubeletHasNoDiskPressure"
		cond.Message = "kubelet has no disk pressure"
		cond.LastTransitionTime = now
		kl.recordNodeStatusEvent(v1.EventTypeNormal, "NodeHasNoDiskPressure")
	}

	if isNew {
		node.Status.Conditions = append(node.Status.Conditions, *cond)
	}
}
// setNodeOODCondition maintains the NodeOutOfDisk condition on the node.
// The only status this function ever assigns is ConditionFalse: if the
// condition is missing or has any other status, it is set to ConditionFalse
// with the "KubeletHasSufficientDisk" reason and a "NodeHasSufficientDisk"
// event is recorded. The heartbeat time is refreshed on every call.
func (kl *Kubelet) setNodeOODCondition(node *v1.Node) {
currentTime := metav1.NewTime(kl.clock.Now())
var nodeOODCondition *v1.NodeCondition
// Check if NodeOutOfDisk condition already exists and if it does, just pick it up for update.
// The loop does not break, so the last matching entry is the one selected.
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == v1.NodeOutOfDisk {
nodeOODCondition = &node.Status.Conditions[i]
}
}
// No existing entry: start from a zero-valued condition (empty Status) that
// is appended to the slice once it has been filled in below.
newOODCondition := nodeOODCondition == nil
if newOODCondition {
nodeOODCondition = &v1.NodeCondition{}
}
// A fresh condition (empty Status) and any non-False existing condition both
// take this branch, which counts as a transition.
if nodeOODCondition.Status != v1.ConditionFalse {
nodeOODCondition.Type = v1.NodeOutOfDisk
nodeOODCondition.Status = v1.ConditionFalse
nodeOODCondition.Reason = "KubeletHasSufficientDisk"
nodeOODCondition.Message = "kubelet has sufficient disk space available"
nodeOODCondition.LastTransitionTime = currentTime
kl.recordNodeStatusEvent(v1.EventTypeNormal, "NodeHasSufficientDisk")
}
// Update the heartbeat time irrespective of all the conditions.
nodeOODCondition.LastHeartbeatTime = currentTime
// Append only after all fields are set, because append copies the value.
if newOODCondition {
node.Status.Conditions = append(node.Status.Conditions, *nodeOODCondition)
}
// recordEvent records an event for this node; the Kubelet's nodeRef is passed
// to the recorder as the object the event is about.
//
// eventType, event and message are forwarded unchanged. message is treated as
// plain text, not as a format string.
func (kl *Kubelet) recordEvent(eventType, event, message string) {
	// Pass message as an argument to a constant format so that any '%' it
	// contains is emitted literally instead of being parsed as a verb.
	kl.recorder.Eventf(kl.nodeRef, eventType, event, "%s", message)
}
// recordNodeSchedulableEvent records an event when the node's schedulable state changes.
func (kl *Kubelet) recordNodeSchedulableEvent(node *v1.Node) {
func (kl *Kubelet) recordNodeSchedulableEvent(node *v1.Node) error {
kl.lastNodeUnschedulableLock.Lock()
defer kl.lastNodeUnschedulableLock.Unlock()
if kl.lastNodeUnschedulable != node.Spec.Unschedulable {
@ -1042,15 +429,7 @@ func (kl *Kubelet) recordNodeSchedulableEvent(node *v1.Node) {
}
kl.lastNodeUnschedulable = node.Spec.Unschedulable
}
}
// Update VolumesInUse field in Node Status only after states are synced up at least once
// in volume reconciler.
func (kl *Kubelet) setNodeVolumesInUseStatus(node *v1.Node) {
// Make sure to only update node status after reconciler starts syncing up states
if kl.volumeManager.ReconcilerStatesHasBeenSynced() {
node.Status.VolumesInUse = kl.volumeManager.GetVolumesInUse()
}
return nil
}
// setNodeStatus fills in the Status fields of the given Node, overwriting
@ -1080,24 +459,42 @@ func (kl *Kubelet) getLastObservedNodeAddresses() []v1.NodeAddress {
// defaultNodeStatusFuncs is a factory that generates the default set of
// setNodeStatus funcs
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
// initial set of node status update handlers, can be modified by Option's
withoutError := func(f func(*v1.Node)) func(*v1.Node) error {
return func(n *v1.Node) error {
f(n)
return nil
}
// if cloud is not nil, we expect the cloud resource sync manager to exist
var nodeAddressesFunc func() ([]v1.NodeAddress, error)
if kl.cloud != nil {
nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
}
return []func(*v1.Node) error{
kl.setNodeAddress,
withoutError(kl.setNodeStatusInfo),
withoutError(kl.setNodeOODCondition),
withoutError(kl.setNodeMemoryPressureCondition),
withoutError(kl.setNodeDiskPressureCondition),
withoutError(kl.setNodePIDPressureCondition),
withoutError(kl.setNodeReadyCondition),
withoutError(kl.setNodeVolumesInUseStatus),
withoutError(kl.recordNodeSchedulableEvent),
var validateHostFunc func() error
if kl.appArmorValidator != nil {
validateHostFunc = kl.appArmorValidator.ValidateHost
}
var setters []func(n *v1.Node) error
setters = append(setters,
nodestatus.NodeAddress(kl.nodeIP, kl.nodeIPValidator, kl.hostname, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent),
nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
nodestatus.DaemonEndpoints(kl.daemonEndpoints),
nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
nodestatus.GoRuntime(),
)
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits))
}
setters = append(setters,
nodestatus.OutOfDiskCondition(kl.clock.Now, kl.recordNodeStatusEvent),
nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent),
nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
// TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
// and record state back to the Kubelet runtime object. In the future, I'd like to isolate
// these side-effects by decoupling the decisions to send events and partial status recording
// from the Node setters.
kl.recordNodeSchedulableEvent,
)
return setters
}
// Validate given node IP belongs to the current host

View File

@ -46,11 +46,11 @@ import (
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/nodestatus"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume/util"
@ -85,7 +85,7 @@ func makeExpectedImageList(imageList []kubecontainer.Image, maxImages int) []v1.
var expectedImageList []v1.ContainerImage
for _, kubeImage := range imageList {
apiImage := v1.ContainerImage{
Names: kubeImage.RepoTags[0:maxNamesPerImageInNodeStatus],
Names: kubeImage.RepoTags[0:nodestatus.MaxNamesPerImageInNodeStatus],
SizeBytes: kubeImage.Size,
}
@ -100,9 +100,9 @@ func makeExpectedImageList(imageList []kubecontainer.Image, maxImages int) []v1.
func generateImageTags() []string {
var tagList []string
// Generate > maxNamesPerImageInNodeStatus tags so that the test can verify
// that kubelet report up to maxNamesPerImageInNodeStatus tags.
count := rand.IntnRange(maxNamesPerImageInNodeStatus+1, maxImageTagsForTest+1)
// Generate > MaxNamesPerImageInNodeStatus tags so that the test can verify
// that kubelet report up to MaxNamesPerImageInNodeStatus tags.
count := rand.IntnRange(nodestatus.MaxNamesPerImageInNodeStatus+1, maxImageTagsForTest+1)
for ; count > 0; count-- {
tagList = append(tagList, "k8s.gcr.io:v"+strconv.Itoa(count))
}
@ -140,160 +140,6 @@ func (lcm *localCM) GetCapacity() v1.ResourceList {
return lcm.capacity
}
// TestNodeStatusWithCloudProviderNodeIP checks the addresses that
// setNodeAddress writes to the node status when a cloud provider supplies the
// address list and the kubelet has an explicit node IP configured.
func TestNodeStatusWithCloudProviderNodeIP(t *testing.T) {
	tk := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
	defer tk.Cleanup()
	kubelet := tk.kubelet
	kubelet.kubeClient = nil // ensure only the heartbeat client is used
	kubelet.hostname = testKubeletHostname

	testCases := []struct {
		name              string
		nodeIP            net.IP
		nodeAddresses     []v1.NodeAddress
		expectedAddresses []v1.NodeAddress
		shouldError       bool
	}{
		{
			name:   "A single InternalIP",
			nodeIP: net.ParseIP("10.1.1.1"),
			nodeAddresses: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			expectedAddresses: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			shouldError: false,
		},
		{
			name:   "NodeIP is external",
			nodeIP: net.ParseIP("55.55.55.55"),
			nodeAddresses: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
				{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			expectedAddresses: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
				{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			shouldError: false,
		},
		{
			// Accommodating #45201 and #49202
			name:   "InternalIP and ExternalIP are the same",
			nodeIP: net.ParseIP("55.55.55.55"),
			nodeAddresses: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "55.55.55.55"},
				{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			expectedAddresses: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "55.55.55.55"},
				{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			shouldError: false,
		},
		{
			name:   "An Internal/ExternalIP, an Internal/ExternalDNS",
			nodeIP: net.ParseIP("10.1.1.1"),
			nodeAddresses: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
				{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
				{Type: v1.NodeInternalDNS, Address: "ip-10-1-1-1.us-west-2.compute.internal"},
				{Type: v1.NodeExternalDNS, Address: "ec2-55-55-55-55.us-west-2.compute.amazonaws.com"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			expectedAddresses: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
				{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
				{Type: v1.NodeInternalDNS, Address: "ip-10-1-1-1.us-west-2.compute.internal"},
				{Type: v1.NodeExternalDNS, Address: "ec2-55-55-55-55.us-west-2.compute.amazonaws.com"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			shouldError: false,
		},
		{
			name:   "An Internal with multiple internal IPs",
			nodeIP: net.ParseIP("10.1.1.1"),
			nodeAddresses: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
				{Type: v1.NodeInternalIP, Address: "10.2.2.2"},
				{Type: v1.NodeInternalIP, Address: "10.3.3.3"},
				{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			expectedAddresses: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
				{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			shouldError: false,
		},
		{
			name:   "An InternalIP that isn't valid: should error",
			nodeIP: net.ParseIP("10.2.2.2"),
			nodeAddresses: []v1.NodeAddress{
				{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
				{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
				{Type: v1.NodeHostName, Address: testKubeletHostname},
			},
			expectedAddresses: nil,
			shouldError:       true,
		},
	}

	for _, tc := range testCases {
		// Per-case setup: a fresh node object and a fake cloud that serves
		// this case's address list through a newly started sync manager.
		node := v1.Node{
			ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Annotations: make(map[string]string)},
			Spec:       v1.NodeSpec{},
		}
		kubelet.nodeIP = tc.nodeIP
		kubelet.cloud = &fakecloud.FakeCloud{
			Addresses: tc.nodeAddresses,
			Err:       nil,
		}
		kubelet.cloudResourceSyncManager = NewCloudResourceSyncManager(kubelet.cloud, kubelet.nodeName, kubelet.nodeStatusUpdateFrequency)
		stopCh := make(chan struct{})
		go kubelet.cloudResourceSyncManager.Run(stopCh)
		// The validator accepts every IP; these cases exercise address
		// selection, not node IP validation.
		kubelet.nodeIPValidator = func(nodeIP net.IP) error {
			return nil
		}

		// Run the setter, then stop the sync manager for this case.
		err := kubelet.setNodeAddress(&node)
		close(stopCh)

		if err != nil {
			// An error is only a failure when the case did not expect one;
			// either way there are no addresses to compare.
			if !tc.shouldError {
				t.Errorf("Unexpected error for test %s: %q", tc.name, err)
			}
			continue
		}

		// Sort both sets for consistent equality
		sortNodeAddresses(tc.expectedAddresses)
		sortNodeAddresses(node.Status.Addresses)

		addressesMatch := apiequality.Semantic.DeepEqual(tc.expectedAddresses, node.Status.Addresses)
		assert.True(
			t,
			addressesMatch,
			fmt.Sprintf("Test %s failed %%s", tc.name),
			diff.ObjectDiff(tc.expectedAddresses, node.Status.Addresses),
		)
	}
}
// sortableNodeAddress is a type for sorting []v1.NodeAddress
type sortableNodeAddress []v1.NodeAddress
@ -350,6 +196,10 @@ func TestUpdateNewNodeStatus(t *testing.T) {
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
}
// Since this test retroactively overrides the stub container manager,
// we have to regenerate default status setters.
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
kubeClient := testKubelet.fakeKubeClient
existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
@ -483,6 +333,9 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
}
// Since this test retroactively overrides the stub container manager,
// we have to regenerate default status setters.
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
kubeClient := testKubelet.fakeKubeClient
existingNode := v1.Node{
@ -749,6 +602,9 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
v1.ResourceEphemeralStorage: *resource.NewQuantity(20E9, resource.BinarySI),
},
}
// Since this test retroactively overrides the stub container manager,
// we have to regenerate default status setters.
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
clock := testKubelet.fakeClock
kubeClient := testKubelet.fakeKubeClient
@ -1190,6 +1046,10 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
v1.ResourceEphemeralStorage: *resource.NewQuantity(3000, resource.BinarySI),
},
}
// Since this test retroactively overrides the stub container manager,
// we have to regenerate default status setters.
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
kubeClient := testKubelet.fakeKubeClient
existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
@ -1641,85 +1501,3 @@ func TestValidateNodeIPParam(t *testing.T) {
}
}
}
// TestSetVolumeLimits checks which volume limit key (if any) setVolumeLimits
// publishes in both Allocatable and Capacity for each cloud provider name.
func TestSetVolumeLimits(t *testing.T) {
	tk := newTestKubeletWithoutFakeVolumePlugin(t, false /* controllerAttachDetachEnabled */)
	defer tk.Cleanup()
	kubelet := tk.kubelet
	kubelet.kubeClient = nil // ensure only the heartbeat client is used
	kubelet.hostname = testKubeletHostname

	// An expectedLimit of -1 means the key must be absent.
	testcases := []struct {
		name              string
		cloudProviderName string
		expectedVolumeKey string
		expectedLimit     int64
	}{
		{
			name:              "For default GCE cloudprovider",
			cloudProviderName: "gce",
			expectedVolumeKey: util.GCEVolumeLimitKey,
			expectedLimit:     16,
		},
		{
			name:              "For default AWS Cloudprovider",
			cloudProviderName: "aws",
			expectedVolumeKey: util.EBSVolumeLimitKey,
			expectedLimit:     39,
		},
		{
			name:              "for default Azure cloudprovider",
			cloudProviderName: "azure",
			expectedVolumeKey: util.AzureVolumeLimitKey,
			expectedLimit:     16,
		},
		{
			name:              "when no cloudprovider is present",
			cloudProviderName: "",
			expectedVolumeKey: util.AzureVolumeLimitKey,
			expectedLimit:     -1,
		},
	}

	for _, tc := range testcases {
		node := &v1.Node{
			ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Annotations: make(map[string]string)},
			Spec:       v1.NodeSpec{},
		}

		// An empty provider name means the kubelet runs without a cloud.
		kubelet.cloud = nil
		if tc.cloudProviderName != "" {
			kubelet.cloud = &fakecloud.FakeCloud{
				Provider: tc.cloudProviderName,
				Err:      nil,
			}
		}

		kubelet.setVolumeLimits(node)

		// The limit is expected in Allocatable and Capacity alike.
		for _, volumeLimits := range []v1.ResourceList{node.Status.Allocatable, node.Status.Capacity} {
			fl, ok := volumeLimits[v1.ResourceName(tc.expectedVolumeKey)]

			if tc.expectedLimit == -1 {
				if ok {
					t.Errorf("Expected no volume limit found for %s", tc.expectedVolumeKey)
				}
				continue
			}

			if !ok {
				t.Errorf("Expected to found volume limit for %s found none", tc.expectedVolumeKey)
			}
			foundLimit, _ := fl.AsInt64()
			expectedValue := resource.NewQuantity(tc.expectedLimit, resource.DecimalSI)
			if expectedValue.Cmp(fl) != 0 {
				t.Errorf("Expected volume limit for %s to be %v found %v", tc.expectedVolumeKey, tc.expectedLimit, foundLimit)
			}
		}
	}
}

View File

@ -292,7 +292,6 @@ func newTestKubeletWithImageList(
// Relist period does not affect the tests.
kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil, clock.RealClock{})
kubelet.clock = fakeClock
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
nodeRef := &v1.ObjectReference{
Kind: "Node",
@ -338,6 +337,8 @@ func newTestKubeletWithImageList(
false, /* experimentalCheckNodeCapabilitiesBeforeMount*/
false /* keepTerminatedPodVolumes */)
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
// enable active deadline handler
activeDeadlineHandler, err := newActiveDeadlineHandler(kubelet.statusManager, kubelet.recorder, kubelet.clock)
require.NoError(t, err, "Can't initialize active deadline handler")

View File

@ -0,0 +1,68 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["setters.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/nodestatus",
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/events:go_default_library",
"//pkg/version:go_default_library",
"//pkg/volume:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["setters_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/cloudprovider/providers/fake:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/container/testing:go_default_library",
"//pkg/kubelet/events:go_default_library",
"//pkg/kubelet/util/sliceutils:go_default_library",
"//pkg/version:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/testing:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
)

View File

@ -0,0 +1,739 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodestatus
import (
"fmt"
"math"
"net"
goruntime "runtime"
"strings"
"time"
cadvisorapiv1 "github.com/google/cadvisor/info/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilfeature "k8s.io/apiserver/pkg/util/feature"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume"
"github.com/golang/glog"
)
const (
// MaxNamesPerImageInNodeStatus is max number of names
// per image stored in the node status.
MaxNamesPerImageInNodeStatus = 5
)
// Setter modifies the node in-place, and returns an error if the modification failed.
// Setters may partially mutate the node before returning an error.
type Setter func(node *v1.Node) error
// NodeAddress returns a Setter that updates address-related information on the node.
//
// The returned Setter behaves as follows:
//   - If nodeIP is non-nil it is first checked with validateNodeIPFunc; a
//     validation failure is returned as an error.
//   - If externalCloudProvider is true, the Setter does not touch
//     node.Status.Addresses. When nodeIP is non-nil it records nodeIP in the
//     kubeletapis.AnnotationProvidedIPAddr annotation, then returns nil.
//   - If cloud is non-nil, addresses come from nodeAddressesFunc. With a nodeIP
//     set, the result is restricted to the matching address, the addresses of
//     other (non-hostname) types, and a hostname entry; no match is an error.
//   - Otherwise an IP is chosen from nodeIP, hostname (if it parses as an IP),
//     a DNS lookup of node.Name, or utilnet.ChooseHostInterface, in that order.
func NodeAddress(nodeIP net.IP, // typically Kubelet.nodeIP
validateNodeIPFunc func(net.IP) error, // typically Kubelet.nodeIPValidator
hostname string, // typically Kubelet.hostname
externalCloudProvider bool, // typically Kubelet.externalCloudProvider
cloud cloudprovider.Interface, // typically Kubelet.cloud
nodeAddressesFunc func() ([]v1.NodeAddress, error), // typically Kubelet.cloudResourceSyncManager.NodeAddresses
) Setter {
return func(node *v1.Node) error {
// Validate the configured node IP up front, regardless of which branch runs below.
if nodeIP != nil {
if err := validateNodeIPFunc(nodeIP); err != nil {
return fmt.Errorf("failed to validate nodeIP: %v", err)
}
glog.V(2).Infof("Using node IP: %q", nodeIP.String())
}
if externalCloudProvider {
if nodeIP != nil {
// The annotations map may be nil on a fresh node object; create it before writing.
if node.ObjectMeta.Annotations == nil {
node.ObjectMeta.Annotations = make(map[string]string)
}
node.ObjectMeta.Annotations[kubeletapis.AnnotationProvidedIPAddr] = nodeIP.String()
}
// We rely on the external cloud provider to supply the addresses.
return nil
}
if cloud != nil {
nodeAddresses, err := nodeAddressesFunc()
if err != nil {
return err
}
if nodeIP != nil {
enforcedNodeAddresses := []v1.NodeAddress{}
var nodeIPType v1.NodeAddressType
// Find the first cloud-reported address equal to nodeIP and remember its type.
for _, nodeAddress := range nodeAddresses {
if nodeAddress.Address == nodeIP.String() {
enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: nodeAddress.Type, Address: nodeAddress.Address})
nodeIPType = nodeAddress.Type
break
}
}
if len(enforcedNodeAddresses) > 0 {
// Keep addresses of every other type, dropping additional addresses of
// nodeIP's type and any cloud-reported hostname (replaced just below).
for _, nodeAddress := range nodeAddresses {
if nodeAddress.Type != nodeIPType && nodeAddress.Type != v1.NodeHostName {
enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: nodeAddress.Type, Address: nodeAddress.Address})
}
}
enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: v1.NodeHostName, Address: hostname})
node.Status.Addresses = enforcedNodeAddresses
return nil
}
return fmt.Errorf("failed to get node address from cloud provider that matches ip: %v", nodeIP)
}
// Only add a NodeHostName address if the cloudprovider did not specify any addresses.
// (we assume the cloudprovider is authoritative if it specifies any addresses)
if len(nodeAddresses) == 0 {
nodeAddresses = []v1.NodeAddress{{Type: v1.NodeHostName, Address: hostname}}
}
node.Status.Addresses = nodeAddresses
} else {
var ipAddr net.IP
var err error
// 1) Use nodeIP if set
// 2) If the user has specified an IP to HostnameOverride, use it
// 3) Lookup the IP from node name by DNS and use the first valid IPv4 address.
// If the node does not have a valid IPv4 address, use the first valid IPv6 address.
// 4) Try to get the IP from the network interface used as default gateway
if nodeIP != nil {
ipAddr = nodeIP
} else if addr := net.ParseIP(hostname); addr != nil {
ipAddr = addr
} else {
var addrs []net.IP
// The lookup error is discarded; an empty result falls through to step 4.
addrs, _ = net.LookupIP(node.Name)
for _, addr := range addrs {
if err = validateNodeIPFunc(addr); err == nil {
// An IPv4 address wins immediately and ends the search.
if addr.To4() != nil {
ipAddr = addr
break
}
// Otherwise remember only the first non-IPv4 candidate and keep looking for IPv4.
if addr.To16() != nil && ipAddr == nil {
ipAddr = addr
}
}
}
if ipAddr == nil {
ipAddr, err = utilnet.ChooseHostInterface()
}
}
if ipAddr == nil {
// We tried everything we could, but the IP address wasn't fetchable; error out
// err holds whatever the last validation or interface-selection attempt returned (may be nil).
return fmt.Errorf("can't get ip address of node %s. error: %v", node.Name, err)
}
node.Status.Addresses = []v1.NodeAddress{
{Type: v1.NodeInternalIP, Address: ipAddr.String()},
{Type: v1.NodeHostName, Address: hostname},
}
}
return nil
}
}
// MachineInfo returns a Setter that updates machine-related information on the node.
//
// The returned Setter fills node.Status.Capacity (creating the map if needed),
// the MachineID, SystemUUID and BootID fields of node.Status.NodeInfo, and
// node.Status.Allocatable. Allocatable is derived from Capacity minus the
// reservation returned by nodeAllocatableReservationFunc, clamped at zero, then
// overridden per resource by the device plugin allocatable values, and finally
// the memory entry is reduced by every huge page capacity. If machineInfoFunc
// fails, the error is only logged and placeholder CPU/memory capacities of zero
// are written; the Setter itself always returns nil.
func MachineInfo(nodeName string,
maxPods int,
podsPerCore int,
machineInfoFunc func() (*cadvisorapiv1.MachineInfo, error), // typically Kubelet.GetCachedMachineInfo
capacityFunc func() v1.ResourceList, // typically Kubelet.containerManager.GetCapacity
devicePluginResourceCapacityFunc func() (v1.ResourceList, v1.ResourceList, []string), // typically Kubelet.containerManager.GetDevicePluginResourceCapacity
nodeAllocatableReservationFunc func() v1.ResourceList, // typically Kubelet.containerManager.GetNodeAllocatableReservation
recordEventFunc func(eventType, event, message string), // typically Kubelet.recordEvent
) Setter {
return func(node *v1.Node) error {
// Note: avoid blindly overwriting the capacity in case opaque
// resources are being advertised.
if node.Status.Capacity == nil {
node.Status.Capacity = v1.ResourceList{}
}
// Declared here because they are populated only when machine info is
// available, but devicePluginAllocatable is consumed after that branch.
var devicePluginAllocatable v1.ResourceList
var devicePluginCapacity v1.ResourceList
var removedDevicePlugins []string
// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := machineInfoFunc()
if err != nil {
// TODO(roberthbailey): This is required for test-cmd.sh to pass.
// See if the test should be updated instead.
node.Status.Capacity[v1.ResourceCPU] = *resource.NewMilliQuantity(0, resource.DecimalSI)
node.Status.Capacity[v1.ResourceMemory] = resource.MustParse("0Gi")
node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity(int64(maxPods), resource.DecimalSI)
glog.Errorf("Error getting machine info: %v", err)
} else {
node.Status.NodeInfo.MachineID = info.MachineID
node.Status.NodeInfo.SystemUUID = info.SystemUUID
for rName, rCap := range cadvisor.CapacityFromMachineInfo(info) {
node.Status.Capacity[rName] = rCap
}
// Pod capacity is maxPods, further limited by cores*podsPerCore when podsPerCore is positive.
if podsPerCore > 0 {
node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity(
int64(math.Min(float64(info.NumCores*podsPerCore), float64(maxPods))), resource.DecimalSI)
} else {
node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity(
int64(maxPods), resource.DecimalSI)
}
// A previously recorded, different boot ID means the machine restarted since the last update.
if node.Status.NodeInfo.BootID != "" &&
node.Status.NodeInfo.BootID != info.BootID {
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
recordEventFunc(v1.EventTypeWarning, events.NodeRebooted,
fmt.Sprintf("Node %s has been rebooted, boot id: %s", nodeName, info.BootID))
}
node.Status.NodeInfo.BootID = info.BootID
if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
// TODO: all the node resources should use ContainerManager.GetCapacity instead of deriving the
// capacity for every node status request
initialCapacity := capacityFunc()
if initialCapacity != nil {
node.Status.Capacity[v1.ResourceEphemeralStorage] = initialCapacity[v1.ResourceEphemeralStorage]
}
}
devicePluginCapacity, devicePluginAllocatable, removedDevicePlugins = devicePluginResourceCapacityFunc()
if devicePluginCapacity != nil {
for k, v := range devicePluginCapacity {
// Log only when the value is new or changed; the assignment below is unconditional.
if old, ok := node.Status.Capacity[k]; !ok || old.Value() != v.Value() {
glog.V(2).Infof("Update capacity for %s to %d", k, v.Value())
}
node.Status.Capacity[k] = v
}
}
for _, removedResource := range removedDevicePlugins {
glog.V(2).Infof("Set capacity for %s to 0 on device removal", removedResource)
// Set the capacity of the removed resource to 0 instead of
// removing the resource from the node status. This is to indicate
// that the resource is managed by device plugin and had been
// registered before.
//
// This is required to differentiate the device plugin managed
// resources and the cluster-level resources, which are absent in
// node status.
node.Status.Capacity[v1.ResourceName(removedResource)] = *resource.NewQuantity(int64(0), resource.DecimalSI)
}
}
// Set Allocatable.
if node.Status.Allocatable == nil {
node.Status.Allocatable = make(v1.ResourceList)
}
// Remove extended resources from allocatable that are no longer
// present in capacity.
for k := range node.Status.Allocatable {
_, found := node.Status.Capacity[k]
if !found && v1helper.IsExtendedResourceName(k) {
delete(node.Status.Allocatable, k)
}
}
allocatableReservation := nodeAllocatableReservationFunc()
for k, v := range node.Status.Capacity {
// Work on a copy so that subtracting the reservation does not alter the capacity entry.
value := *(v.Copy())
if res, exists := allocatableReservation[k]; exists {
value.Sub(res)
}
if value.Sign() < 0 {
// Negative Allocatable resources don't make sense.
value.Set(0)
}
node.Status.Allocatable[k] = value
}
// Device plugin allocatable values take precedence over the capacity-derived ones written above.
if devicePluginAllocatable != nil {
for k, v := range devicePluginAllocatable {
if old, ok := node.Status.Allocatable[k]; !ok || old.Value() != v.Value() {
glog.V(2).Infof("Update allocatable for %s to %d", k, v.Value())
}
node.Status.Allocatable[k] = v
}
}
// for every huge page reservation, we need to remove it from allocatable memory
for k, v := range node.Status.Capacity {
if v1helper.IsHugePageResourceName(k) {
allocatableMemory := node.Status.Allocatable[v1.ResourceMemory]
value := *(v.Copy())
allocatableMemory.Sub(value)
if allocatableMemory.Sign() < 0 {
// Negative Allocatable resources don't make sense.
allocatableMemory.Set(0)
}
// Map values are copies, so the adjusted quantity must be stored back.
node.Status.Allocatable[v1.ResourceMemory] = allocatableMemory
}
}
return nil
}
}
// VersionInfo returns a Setter that fills in the version fields of the node's
// NodeInfo: kernel version, OS image, container runtime version, and the
// kubelet and kube-proxy versions.
//
// The Setter fails (leaving the node untouched) when versionInfoFunc returns an
// error. A failure of runtimeVersionFunc is tolerated: the runtime version is
// then reported as "Unknown".
func VersionInfo(versionInfoFunc func() (*cadvisorapiv1.VersionInfo, error), // typically Kubelet.cadvisor.VersionInfo
	runtimeTypeFunc func() string, // typically Kubelet.containerRuntime.Type
	runtimeVersionFunc func() (kubecontainer.Version, error), // typically Kubelet.containerRuntime.Version
) Setter {
	return func(node *v1.Node) error {
		hostVersions, err := versionInfoFunc()
		if err != nil {
			// TODO(mtaufen): consider removing this log line, since returned error will be logged
			glog.Errorf("Error getting version info: %v", err)
			return fmt.Errorf("error getting version info: %v", err)
		}

		nodeInfo := &node.Status.NodeInfo
		nodeInfo.KernelVersion = hostVersions.KernelVersion
		nodeInfo.OSImage = hostVersions.ContainerOsVersion

		// Fall back to a placeholder when the runtime cannot report its version.
		runtimeVersion := "Unknown"
		runtimeVer, verErr := runtimeVersionFunc()
		if verErr == nil {
			runtimeVersion = runtimeVer.String()
		}
		nodeInfo.ContainerRuntimeVersion = fmt.Sprintf("%s://%s", runtimeTypeFunc(), runtimeVersion)

		nodeInfo.KubeletVersion = version.Get().String()
		// TODO: kube-proxy might be different version from kubelet in the future
		nodeInfo.KubeProxyVersion = version.Get().String()
		return nil
	}
}
// DaemonEndpoints returns a Setter that copies the value pointed to by
// daemonEndpoints into the node's status. The pointer is dereferenced each
// time the Setter runs, not when DaemonEndpoints is called.
func DaemonEndpoints(daemonEndpoints *v1.NodeDaemonEndpoints) Setter {
	setEndpoints := func(node *v1.Node) error {
		status := &node.Status
		status.DaemonEndpoints = *daemonEndpoints
		return nil
	}
	return setEndpoints
}
// Images returns a Setter that updates the images on the node.
// imageListFunc is expected to return a list of images sorted in descending order by image size.
// nodeStatusMaxImages is ignored if set to -1.
//
// For each reported image the names are the repo digests followed by the repo
// tags, truncated to MaxNamesPerImageInNodeStatus entries. If imageListFunc
// fails, node.Status.Images is cleared and the error is returned.
func Images(nodeStatusMaxImages int32,
	imageListFunc func() ([]kubecontainer.Image, error), // typically Kubelet.imageManager.GetImageList
) Setter {
	return func(node *v1.Node) error {
		// Update image list of this node
		var imagesOnNode []v1.ContainerImage
		containerImages, err := imageListFunc()
		if err != nil {
			// TODO(mtaufen): consider removing this log line, since returned error will be logged
			glog.Errorf("Error getting image list: %v", err)
			// imagesOnNode is still nil here, so this clears any previously reported images.
			node.Status.Images = imagesOnNode
			return fmt.Errorf("error getting image list: %v", err)
		}
		// we expect imageListFunc to return a sorted list, so we just need to truncate
		if int(nodeStatusMaxImages) > -1 &&
			int(nodeStatusMaxImages) < len(containerImages) {
			containerImages = containerImages[0:nodeStatusMaxImages]
		}

		for _, image := range containerImages {
			// The image structs are owned by whoever backs imageListFunc. Cap the
			// capacity of RepoDigests so that append has to allocate a new array
			// instead of writing the tags into spare capacity of the shared one.
			digests := image.RepoDigests
			names := append(digests[:len(digests):len(digests)], image.RepoTags...)
			// Report up to MaxNamesPerImageInNodeStatus names per image.
			if len(names) > MaxNamesPerImageInNodeStatus {
				names = names[0:MaxNamesPerImageInNodeStatus]
			}
			imagesOnNode = append(imagesOnNode, v1.ContainerImage{
				Names:     names,
				SizeBytes: image.Size,
			})
		}

		node.Status.Images = imagesOnNode
		return nil
	}
}
// GoRuntime returns a Setter that records the Go runtime's GOOS and GOARCH as
// the node's operating system and architecture.
func GoRuntime() Setter {
	return func(node *v1.Node) error {
		info := &node.Status.NodeInfo
		info.OperatingSystem, info.Architecture = goruntime.GOOS, goruntime.GOARCH
		return nil
	}
}
// ReadyCondition returns a Setter that updates the v1.NodeReady condition on the node.
//
// The node is reported not ready when any runtime or network error is present,
// or when a required capacity (CPU, memory, pods, plus ephemeral storage if the
// LocalStorageCapacityIsolation feature gate is on) is missing from
// node.Status.Capacity. An event is recorded only when an existing NodeReady
// condition changes status; the first time the condition is added no event is
// recorded.
func ReadyCondition(
	nowFunc func() time.Time, // typically Kubelet.clock.Now
	runtimeErrorsFunc func() []string, // typically Kubelet.runtimeState.runtimeErrors
	networkErrorsFunc func() []string, // typically Kubelet.runtimeState.networkErrors
	appArmorValidateHostFunc func() error, // typically Kubelet.appArmorValidator.ValidateHost, might be nil depending on whether there was an appArmorValidator
	cmStatusFunc func() cm.Status, // typically Kubelet.containerManager.Status
	recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent
) Setter {
	return func(node *v1.Node) error {
		// NOTE(aaronlevy): NodeReady condition needs to be the last in the list of node conditions.
		// This is due to an issue with version skewed kubelet and master components.
		// ref: https://github.com/kubernetes/kubernetes/issues/16961
		now := metav1.NewTime(nowFunc())

		// Collect every reason that prevents the node from being ready.
		problems := append(runtimeErrorsFunc(), networkErrorsFunc()...)

		required := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods}
		if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
			required = append(required, v1.ResourceEphemeralStorage)
		}
		missing := []string{}
		for _, name := range required {
			if _, ok := node.Status.Capacity[name]; ok {
				continue
			}
			missing = append(missing, string(name))
		}
		if len(missing) != 0 {
			problems = append(problems, fmt.Sprintf("Missing node capacity for resources: %s", strings.Join(missing, ", ")))
		}

		// Build the new condition from the collected problems.
		ready := v1.NodeCondition{
			Type:              v1.NodeReady,
			LastHeartbeatTime: now,
		}
		if len(problems) == 0 {
			ready.Status = v1.ConditionTrue
			ready.Reason = "KubeletReady"
			ready.Message = "kubelet is posting ready status"
		} else {
			ready.Status = v1.ConditionFalse
			ready.Reason = "KubeletNotReady"
			ready.Message = strings.Join(problems, ",")
		}

		// Append AppArmor status if it's enabled.
		// TODO(tallclair): This is a temporary message until node feature reporting is added.
		if ready.Status == v1.ConditionTrue && appArmorValidateHostFunc != nil {
			if appArmorValidateHostFunc() == nil {
				ready.Message = fmt.Sprintf("%s. AppArmor enabled", ready.Message)
			}
		}

		// Record any soft requirements that were not met in the container manager.
		if soft := cmStatusFunc().SoftRequirements; soft != nil {
			ready.Message = fmt.Sprintf("%s. WARNING: %s", ready.Message, soft.Error())
		}

		// Find the first existing NodeReady condition, if any.
		existingIdx := -1
		for i := range node.Status.Conditions {
			if node.Status.Conditions[i].Type == v1.NodeReady {
				existingIdx = i
				break
			}
		}

		statusChanged := false
		if existingIdx < 0 {
			// First report of this condition: it is appended without an event.
			ready.LastTransitionTime = now
			node.Status.Conditions = append(node.Status.Conditions, ready)
		} else {
			previous := &node.Status.Conditions[existingIdx]
			if previous.Status == ready.Status {
				// No transition: keep the original transition timestamp.
				ready.LastTransitionTime = previous.LastTransitionTime
			} else {
				ready.LastTransitionTime = now
				statusChanged = true
			}
			*previous = ready
		}

		if !statusChanged {
			return nil
		}
		switch ready.Status {
		case v1.ConditionTrue:
			recordEventFunc(v1.EventTypeNormal, events.NodeReady)
		default:
			recordEventFunc(v1.EventTypeNormal, events.NodeNotReady)
			glog.Infof("Node became not ready: %+v", ready)
		}
		return nil
	}
}
// MemoryPressureCondition returns a Setter that updates the v1.NodeMemoryPressure condition on the node.
//
// The heartbeat time is refreshed on every call. Status, reason, message and
// transition time are only rewritten (and an event recorded) when the value
// reported by pressureFunc disagrees with the condition's current status.
func MemoryPressureCondition(nowFunc func() time.Time, // typically Kubelet.clock.Now
	pressureFunc func() bool, // typically Kubelet.evictionManager.IsUnderMemoryPressure
	recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent
) Setter {
	return func(node *v1.Node) error {
		now := metav1.NewTime(nowFunc())

		// Reuse the existing NodeMemoryPressure entry when there is one. The
		// scan does not stop early, so the last matching entry is the one used.
		var cond *v1.NodeCondition
		for i := range node.Status.Conditions {
			if c := &node.Status.Conditions[i]; c.Type == v1.NodeMemoryPressure {
				cond = c
			}
		}

		// Otherwise work on a detached condition and append it at the very end:
		// appending now would copy it into the slice and the updates below
		// would not be reflected there.
		isNew := cond == nil
		if isNew {
			cond = &v1.NodeCondition{
				Type:   v1.NodeMemoryPressure,
				Status: v1.ConditionUnknown,
			}
		}

		// Update the heartbeat time
		cond.LastHeartbeatTime = now

		// A freshly created condition starts as v1.ConditionUnknown, which
		// differs from both True and False, so exactly one of the cases below
		// fires for it.
		underPressure := pressureFunc()
		switch {
		case underPressure && cond.Status != v1.ConditionTrue:
			cond.Status = v1.ConditionTrue
			cond.Reason = "KubeletHasInsufficientMemory"
			cond.Message = "kubelet has insufficient memory available"
			cond.LastTransitionTime = now
			recordEventFunc(v1.EventTypeNormal, "NodeHasInsufficientMemory")
		case !underPressure && cond.Status != v1.ConditionFalse:
			cond.Status = v1.ConditionFalse
			cond.Reason = "KubeletHasSufficientMemory"
			cond.Message = "kubelet has sufficient memory available"
			cond.LastTransitionTime = now
			recordEventFunc(v1.EventTypeNormal, "NodeHasSufficientMemory")
		}

		if isNew {
			node.Status.Conditions = append(node.Status.Conditions, *cond)
		}
		return nil
	}
}
// PIDPressureCondition returns a Setter that updates the v1.NodePIDPressure condition on the node.
//
// The heartbeat time is refreshed on every call. Status, reason, message and
// transition time are only rewritten (and an event recorded) when the value
// reported by pressureFunc disagrees with the condition's current status.
func PIDPressureCondition(nowFunc func() time.Time, // typically Kubelet.clock.Now
	pressureFunc func() bool, // typically Kubelet.evictionManager.IsUnderPIDPressure
	recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent
) Setter {
	return func(node *v1.Node) error {
		now := metav1.NewTime(nowFunc())

		// Reuse the existing NodePIDPressure entry when there is one. The scan
		// does not stop early, so the last matching entry is the one used.
		var cond *v1.NodeCondition
		for i := range node.Status.Conditions {
			if c := &node.Status.Conditions[i]; c.Type == v1.NodePIDPressure {
				cond = c
			}
		}

		// Otherwise work on a detached condition and append it at the very end:
		// appending now would copy it into the slice and the updates below
		// would not be reflected there.
		isNew := cond == nil
		if isNew {
			cond = &v1.NodeCondition{
				Type:   v1.NodePIDPressure,
				Status: v1.ConditionUnknown,
			}
		}

		// Update the heartbeat time
		cond.LastHeartbeatTime = now

		// A freshly created condition starts as v1.ConditionUnknown, which
		// differs from both True and False, so exactly one of the cases below
		// fires for it.
		underPressure := pressureFunc()
		switch {
		case underPressure && cond.Status != v1.ConditionTrue:
			cond.Status = v1.ConditionTrue
			cond.Reason = "KubeletHasInsufficientPID"
			cond.Message = "kubelet has insufficient PID available"
			cond.LastTransitionTime = now
			recordEventFunc(v1.EventTypeNormal, "NodeHasInsufficientPID")
		case !underPressure && cond.Status != v1.ConditionFalse:
			cond.Status = v1.ConditionFalse
			cond.Reason = "KubeletHasSufficientPID"
			cond.Message = "kubelet has sufficient PID available"
			cond.LastTransitionTime = now
			recordEventFunc(v1.EventTypeNormal, "NodeHasSufficientPID")
		}

		if isNew {
			node.Status.Conditions = append(node.Status.Conditions, *cond)
		}
		return nil
	}
}
// DiskPressureCondition returns a Setter that updates the v1.NodeDiskPressure condition on the node.
//
// The heartbeat time is refreshed on every call. Status, reason, message and
// transition time are only rewritten (and an event recorded) when the value
// reported by pressureFunc disagrees with the condition's current status.
func DiskPressureCondition(nowFunc func() time.Time, // typically Kubelet.clock.Now
	pressureFunc func() bool, // typically Kubelet.evictionManager.IsUnderDiskPressure
	recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent
) Setter {
	return func(node *v1.Node) error {
		now := metav1.NewTime(nowFunc())

		// Reuse the existing NodeDiskPressure entry when there is one. The scan
		// does not stop early, so the last matching entry is the one used.
		var cond *v1.NodeCondition
		for i := range node.Status.Conditions {
			if c := &node.Status.Conditions[i]; c.Type == v1.NodeDiskPressure {
				cond = c
			}
		}

		// Otherwise work on a detached condition and append it at the very end:
		// appending now would copy it into the slice and the updates below
		// would not be reflected there.
		isNew := cond == nil
		if isNew {
			cond = &v1.NodeCondition{
				Type:   v1.NodeDiskPressure,
				Status: v1.ConditionUnknown,
			}
		}

		// Update the heartbeat time
		cond.LastHeartbeatTime = now

		// A freshly created condition starts as v1.ConditionUnknown, which
		// differs from both True and False, so exactly one of the cases below
		// fires for it.
		underPressure := pressureFunc()
		switch {
		case underPressure && cond.Status != v1.ConditionTrue:
			cond.Status = v1.ConditionTrue
			cond.Reason = "KubeletHasDiskPressure"
			cond.Message = "kubelet has disk pressure"
			cond.LastTransitionTime = now
			recordEventFunc(v1.EventTypeNormal, "NodeHasDiskPressure")
		case !underPressure && cond.Status != v1.ConditionFalse:
			cond.Status = v1.ConditionFalse
			cond.Reason = "KubeletHasNoDiskPressure"
			cond.Message = "kubelet has no disk pressure"
			cond.LastTransitionTime = now
			recordEventFunc(v1.EventTypeNormal, "NodeHasNoDiskPressure")
		}

		if isNew {
			node.Status.Conditions = append(node.Status.Conditions, *cond)
		}
		return nil
	}
}
// OutOfDiskCondition returns a Setter that updates the v1.NodeOutOfDisk condition on the node.
// The condition is always driven to v1.ConditionFalse; an event is recorded
// only when that changes the stored status (including the first time the
// condition is created).
// TODO(#65658): remove this condition
func OutOfDiskCondition(nowFunc func() time.Time, // typically Kubelet.clock.Now
	recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent
) Setter {
	return func(node *v1.Node) error {
		now := metav1.NewTime(nowFunc())

		// Reuse the existing NodeOutOfDisk entry when there is one. The scan
		// does not stop early, so the last matching entry is the one used.
		var cond *v1.NodeCondition
		for i := range node.Status.Conditions {
			if c := &node.Status.Conditions[i]; c.Type == v1.NodeOutOfDisk {
				cond = c
			}
		}

		// A missing condition is built detached and appended once fully populated.
		isNew := false
		if cond == nil {
			cond = new(v1.NodeCondition)
			isNew = true
		}

		if cond.Status != v1.ConditionFalse {
			cond.Type = v1.NodeOutOfDisk
			cond.Status = v1.ConditionFalse
			cond.Reason = "KubeletHasSufficientDisk"
			cond.Message = "kubelet has sufficient disk space available"
			cond.LastTransitionTime = now
			recordEventFunc(v1.EventTypeNormal, "NodeHasSufficientDisk")
		}

		// Update the heartbeat time irrespective of all the conditions.
		cond.LastHeartbeatTime = now

		if isNew {
			node.Status.Conditions = append(node.Status.Conditions, *cond)
		}
		return nil
	}
}
// VolumesInUse returns a Setter that updates the volumes in use on the node.
// While syncedFunc reports false the Setter leaves node.Status.VolumesInUse
// untouched and does not call volumesInUseFunc.
func VolumesInUse(syncedFunc func() bool, // typically Kubelet.volumeManager.ReconcilerStatesHasBeenSynced
	volumesInUseFunc func() []v1.UniqueVolumeName, // typically Kubelet.volumeManager.GetVolumesInUse
) Setter {
	return func(node *v1.Node) error {
		// Make sure to only update node status after reconciler starts syncing up states
		if !syncedFunc() {
			return nil
		}
		node.Status.VolumesInUse = volumesInUseFunc()
		return nil
	}
}
// VolumeLimits returns a Setter that updates the volume limits on the node.
//
// For every plugin returned by volumePluginListFunc, each limit the plugin
// reports is written to both node.Status.Capacity and node.Status.Allocatable
// under the limit key. Nil Capacity/Allocatable maps are initialized first.
// A plugin whose GetVolumeLimits call fails is logged and skipped; the Setter
// itself always returns nil.
func VolumeLimits(volumePluginListFunc func() []volume.VolumePluginWithAttachLimits, // typically Kubelet.volumePluginMgr.ListVolumePluginWithLimits
) Setter {
	return func(node *v1.Node) error {
		// Writing to a nil map panics, so make sure both maps exist.
		if node.Status.Capacity == nil {
			node.Status.Capacity = v1.ResourceList{}
		}
		if node.Status.Allocatable == nil {
			node.Status.Allocatable = v1.ResourceList{}
		}

		pluginWithLimits := volumePluginListFunc()
		for _, volumePlugin := range pluginWithLimits {
			attachLimits, err := volumePlugin.GetVolumeLimits()
			if err != nil {
				// Include the error itself so the reason for the failure is not lost.
				glog.V(4).Infof("Error getting volume limit for plugin %s: %v", volumePlugin.GetPluginName(), err)
				continue
			}
			for limitKey, value := range attachLimits {
				node.Status.Capacity[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI)
				node.Status.Allocatable[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI)
			}
		}
		return nil
	}
}

File diff suppressed because it is too large Load Diff

View File

@ -223,6 +223,9 @@ type FakeVolumePlugin struct {
LastProvisionerOptions VolumeOptions
NewAttacherCallCount int
NewDetacherCallCount int
VolumeLimits map[string]int64
VolumeLimitsError error
LimitKey string
Mounters []*FakeVolume
Unmounters []*FakeVolume
@ -238,6 +241,7 @@ var _ RecyclableVolumePlugin = &FakeVolumePlugin{}
var _ DeletableVolumePlugin = &FakeVolumePlugin{}
var _ ProvisionableVolumePlugin = &FakeVolumePlugin{}
var _ AttachableVolumePlugin = &FakeVolumePlugin{}
var _ VolumePluginWithAttachLimits = &FakeVolumePlugin{}
func (plugin *FakeVolumePlugin) getFakeVolume(list *[]*FakeVolume) *FakeVolume {
volume := &FakeVolume{}
@ -448,6 +452,14 @@ func (plugin *FakeVolumePlugin) RequiresFSResize() bool {
return true
}
// GetVolumeLimits returns the plugin's VolumeLimits and VolumeLimitsError
// fields unchanged, so callers can preset both the limits and the error.
func (plugin *FakeVolumePlugin) GetVolumeLimits() (map[string]int64, error) {
	limits, err := plugin.VolumeLimits, plugin.VolumeLimitsError
	return limits, err
}
// VolumeLimitKey returns the plugin's LimitKey field; spec is ignored.
func (plugin *FakeVolumePlugin) VolumeLimitKey(spec *Spec) string {
	key := plugin.LimitKey
	return key
}
type FakeFileVolumePlugin struct {
}