Merge pull request #14292 from yujuhong/revert_node

Revert node status manager
Dawn Chen 2015-09-21 23:13:18 -07:00
commit a010e91913
8 changed files with 792 additions and 1042 deletions


@ -1,46 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"net"
"k8s.io/kubernetes/pkg/api"
)
type fakeNodeManager struct {
podCIDR string
node *api.Node
IP net.IP
}
var _ nodeManager = &fakeNodeManager{}
func (f *fakeNodeManager) Start() {
}
func (f *fakeNodeManager) GetNode() (*api.Node, error) {
return f.node, nil
}
func (f *fakeNodeManager) GetHostIP() (net.IP, error) {
return f.IP, nil
}
func (f *fakeNodeManager) GetPodCIDR() string {
return f.podCIDR
}


@ -36,6 +36,7 @@ import (
"github.com/golang/glog"
cadvisorApi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/validation"
@ -63,9 +64,11 @@ import (
utilErrors "k8s.io/kubernetes/pkg/util/errors"
kubeio "k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/procfs"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
@ -76,6 +79,9 @@ const (
// Max amount of time to wait for the container runtime to come up.
maxWaitForContainerRuntime = 5 * time.Minute
// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
nodeStatusUpdateRetry = 5
// Location of container logs.
containerLogsDir = "/var/log/containers"
@ -200,6 +206,23 @@ func NewMainKubelet(
}
serviceLister := &cache.StoreToServiceLister{Store: serviceStore}
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
if kubeClient != nil {
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
// than an interface. There is no way to construct a list+watcher using resource name.
fieldSelector := fields.Set{client.ObjectNameField: nodeName}.AsSelector()
listWatch := &cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return kubeClient.Nodes().List(labels.Everything(), fieldSelector)
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return kubeClient.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion)
},
}
cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
}
nodeLister := &cache.StoreToNodeLister{Store: nodeStore}
// TODO: get the real node object of ourself,
// and use the real node name and UID.
// TODO: what is namespace for node?
@ -241,10 +264,12 @@ func NewMainKubelet(
readinessManager: readinessManager,
httpClient: &http.Client{},
sourcesReady: sourcesReady,
registerNode: registerNode,
standaloneMode: standaloneMode,
clusterDomain: clusterDomain,
clusterDNS: clusterDNS,
serviceLister: serviceLister,
nodeLister: nodeLister,
runtimeMutex: sync.Mutex{},
runtimeUpThreshold: maxWaitForContainerRuntime,
lastTimestampRuntimeUp: time.Time{},
@ -259,6 +284,7 @@ func NewMainKubelet(
volumeManager: volumeManager,
cloud: cloud,
nodeRef: nodeRef,
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
resourceContainer: resourceContainer,
os: osInterface,
oomWatcher: oomWatcher,
@ -266,10 +292,12 @@ func NewMainKubelet(
mounter: mounter,
writer: writer,
configureCBR0: configureCBR0,
podCIDR: podCIDR,
pods: pods,
syncLoopMonitor: util.AtomicValue{},
resolverConfig: resolverConfig,
cpuCFSQuota: cpuCFSQuota,
daemonEndpoints: daemonEndpoints,
}
if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
@ -278,7 +306,7 @@ func NewMainKubelet(
klet.networkPlugin = plug
}
machineInfo, err := klet.GetMachineInfo()
machineInfo, err := klet.GetCachedMachineInfo()
if err != nil {
return nil, err
}
@ -340,10 +368,11 @@ func NewMainKubelet(
}
klet.containerManager = containerManager
klet.nodeManager = newRealNodeManager(kubeClient, cloud, registerNode, nodeStatusUpdateFrequency, recorder, nodeName, hostname, podCIDR, pods, klet, daemonEndpoints, nodeRef)
klet.nodeManager.Start()
go util.Until(klet.syncNetworkStatus, 30*time.Second, util.NeverStop)
if klet.kubeClient != nil {
// Start syncing node status immediately, this may set up things the runtime needs to run.
go util.Until(klet.syncNodeStatus, klet.nodeStatusUpdateFrequency, util.NeverStop)
}
// Wait for the runtime to be up with a timeout.
if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil {
@ -387,6 +416,11 @@ type serviceLister interface {
List() (api.ServiceList, error)
}
type nodeLister interface {
List() (machines api.NodeList, err error)
GetNodeInfo(id string) (*api.Node, error)
}
// Kubelet is the main kubelet implementation.
type Kubelet struct {
hostname string
@ -415,6 +449,8 @@ type Kubelet struct {
// cAdvisor used for container information.
cadvisor cadvisor.Interface
// Set to true to have the node register itself with the apiserver.
registerNode bool
// for internal book keeping; access only from within registerWithApiserver
registrationCompleted bool
@ -429,6 +465,7 @@ type Kubelet struct {
masterServiceNamespace string
serviceLister serviceLister
nodeLister nodeLister
// Last timestamp when runtime responded on ping.
// Mutex is used to protect this value.
@ -468,18 +505,13 @@ type Kubelet struct {
// Cached MachineInfo returned by cadvisor.
machineInfo *cadvisorApi.MachineInfo
// nodeManager handles interaction of api.Node object with apiserver. This
// includes watching the nodes, registering itself to the apiserver, and
// sync node status.
nodeManager nodeManager
// Syncs pods statuses with apiserver; also used as a cache of statuses.
statusManager status.Manager
// Manager for the volume maps for the pods.
volumeManager *volumeManager
// Cloud provider interface
//Cloud provider interface
cloud cloudprovider.Interface
// Reference to this node.
@ -488,6 +520,19 @@ type Kubelet struct {
// Container runtime.
containerRuntime kubecontainer.Runtime
// nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
// Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod
// in nodecontroller. There are several constraints:
// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
// N means number of retries allowed for kubelet to post node status. It is pointless
// to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
// will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
// The constant must be less than podEvictionTimeout.
// 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
// status. Kubelet may fail to update node status reliably if the value is too small,
// as it takes time to gather all necessary node information.
nodeStatusUpdateFrequency time.Duration
// The name of the resource-only container to run the Kubelet in (empty for no container).
// Name must be absolute.
resourceContainer string
@ -512,6 +557,7 @@ type Kubelet struct {
// Whether or not kubelet should take responsibility for keeping cbr0 in
// the correct state.
configureCBR0 bool
podCIDR string
// Number of Pods which can be run by this Kubelet
pods int
@ -535,6 +581,9 @@ type Kubelet struct {
// True if container cpu limits should be enforced via cgroup CFS quota
cpuCFSQuota bool
// Information about the ports which are opened by daemons on Node running this Kubelet server.
daemonEndpoints *api.NodeDaemonEndpoints
}
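// Illustrative sketch (hypothetical numbers): constraint (1) in the
// nodeStatusUpdateFrequency comment above, worked through with an assumed
// frequency of 10s, which is an assumption here rather than a shipped
// default. nodeMonitorGracePeriod then needs to be at least
// nodeStatusUpdateRetry * 10s, while staying below podEvictionTimeout:
var minNodeMonitorGracePeriod = time.Duration(nodeStatusUpdateRetry) * (10 * time.Second) // 5 * 10s = 50s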
// getRootDir returns the full path to the directory under which kubelet can
@ -676,6 +725,13 @@ func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
return pods, nil
}
func (kl *Kubelet) GetNode() (*api.Node, error) {
if kl.standaloneMode {
return nil, errors.New("no node entry for kubelet in standalone mode")
}
return kl.nodeLister.GetNodeInfo(kl.nodeName)
}
// Starts garbage collection threads.
func (kl *Kubelet) StartGarbageCollection() {
go util.Until(func() {
@ -740,6 +796,117 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
kl.syncLoop(updates, kl)
}
func (kl *Kubelet) initialNodeStatus() (*api.Node, error) {
node := &api.Node{
ObjectMeta: api.ObjectMeta{
Name: kl.nodeName,
Labels: map[string]string{"kubernetes.io/hostname": kl.hostname},
},
}
if kl.cloud != nil {
instances, ok := kl.cloud.Instances()
if !ok {
return nil, fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
// TODO: ExternalID is deprecated, we'll have to drop this code
externalID, err := instances.ExternalID(kl.nodeName)
if err != nil {
return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err)
}
node.Spec.ExternalID = externalID
// TODO: We can't assume that the node has credentials to talk to the
// cloudprovider from arbitrary nodes. At most, we should talk to a
// local metadata server here.
node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.nodeName)
if err != nil {
return nil, err
}
} else {
node.Spec.ExternalID = kl.hostname
}
if err := kl.setNodeStatus(node); err != nil {
return nil, err
}
return node, nil
}
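// Sketch of what initialNodeStatus builds when kl.cloud is nil, assuming a
// hypothetical hostname "worker-1"; setNodeStatus then fills in Status
// before the object is posted to the apiserver:
//
//	node := &api.Node{
//		ObjectMeta: api.ObjectMeta{
//			Name:   "worker-1",
//			Labels: map[string]string{"kubernetes.io/hostname": "worker-1"},
//		},
//		Spec: api.NodeSpec{ExternalID: "worker-1"}, // ExternalID falls back to the hostname
//	}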
// registerWithApiserver registers the node with the cluster master. It is safe
// to call multiple times, but not concurrently (kl.registrationCompleted is
// not locked).
func (kl *Kubelet) registerWithApiserver() {
if kl.registrationCompleted {
return
}
step := 100 * time.Millisecond
for {
time.Sleep(step)
step = step * 2
if step >= 7*time.Second {
step = 7 * time.Second
}
node, err := kl.initialNodeStatus()
if err != nil {
glog.Errorf("Unable to construct api.Node object for kubelet: %v", err)
continue
}
glog.V(2).Infof("Attempting to register node %s", node.Name)
if _, err := kl.kubeClient.Nodes().Create(node); err != nil {
if !apierrors.IsAlreadyExists(err) {
glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err)
continue
}
currentNode, err := kl.kubeClient.Nodes().Get(kl.nodeName)
if err != nil {
glog.Errorf("error getting node %q: %v", kl.nodeName, err)
continue
}
if currentNode == nil {
glog.Errorf("no node instance returned for %q", kl.nodeName)
continue
}
if currentNode.Spec.ExternalID == node.Spec.ExternalID {
glog.Infof("Node %s was previously registered", node.Name)
kl.registrationCompleted = true
return
}
glog.Errorf(
"Previously %q had externalID %q; now it is %q; will delete and recreate.",
kl.nodeName, node.Spec.ExternalID, currentNode.Spec.ExternalID,
)
if err := kl.kubeClient.Nodes().Delete(node.Name); err != nil {
glog.Errorf("Unable to delete old node: %v", err)
} else {
glog.Errorf("Deleted old node object %q", kl.nodeName)
}
continue
}
glog.Infof("Successfully registered node %s", node.Name)
kl.registrationCompleted = true
return
}
}
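// Illustrative sketch: the registration loop above sleeps before each
// attempt, doubling from 100ms and clamping at 7s. A hypothetical helper
// (not defined by the kubelet) that reproduces that cadence:
func registrationBackoffSteps(n int) []time.Duration {
	steps := make([]time.Duration, 0, n)
	step := 100 * time.Millisecond
	for i := 0; i < n; i++ {
		steps = append(steps, step)
		step = step * 2
		if step >= 7*time.Second {
			step = 7 * time.Second
		}
	}
	return steps // n=9 yields 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, 6.4s, 7s, 7s
}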
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master, registering the kubelet first if
// necessary.
func (kl *Kubelet) syncNodeStatus() {
if kl.kubeClient == nil {
return
}
if kl.registerNode {
// This will exit immediately if it doesn't need to do anything.
kl.registerWithApiserver()
}
if err := kl.updateNodeStatus(); err != nil {
glog.Errorf("Unable to update node status: %v", err)
}
}
func makeMounts(container *api.Container, podVolumes kubecontainer.VolumeMap) (mounts []kubecontainer.Mount) {
for _, mount := range container.VolumeMounts {
vol, ok := podVolumes[mount.Name]
@ -1617,7 +1784,7 @@ func hasHostPortConflicts(pods []*api.Pod) bool {
// TODO: Consider integrate disk space into this function, and returns a
// suitable reason and message per resource type.
func (kl *Kubelet) hasInsufficientfFreeResources(pods []*api.Pod) (bool, bool) {
info, err := kl.GetMachineInfo()
info, err := kl.GetCachedMachineInfo()
if err != nil {
glog.Errorf("error getting machine info: %v", err)
// TODO: Should we admit the pod when machine info is unavailable?
@ -1657,7 +1824,7 @@ func (kl *Kubelet) matchesNodeSelector(pod *api.Pod) bool {
if kl.standaloneMode {
return true
}
node, err := kl.nodeManager.GetNode()
node, err := kl.GetNode()
if err != nil {
glog.Errorf("error getting node: %v", err)
return true
@ -1707,12 +1874,12 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
glog.Info("Starting kubelet main sync loop.")
var housekeepingTimestamp time.Time
for {
if !kl.ContainerRuntimeUp() {
if !kl.containerRuntimeUp() {
time.Sleep(5 * time.Second)
glog.Infof("Skipping pod synchronization, container runtime is not up.")
continue
}
if !kl.NetworkConfigured() {
if !kl.doneNetworkConfigure() {
time.Sleep(5 * time.Second)
glog.Infof("Skipping pod synchronization, network is not configured")
continue
@ -1872,10 +2039,6 @@ func (kl *Kubelet) LatestLoopEntryTime() time.Time {
return val.(time.Time)
}
func (kl *Kubelet) GetVersionInfo() (*cadvisorApi.VersionInfo, error) {
return kl.cadvisor.VersionInfo()
}
// Returns the container runtime version for this Kubelet.
func (kl *Kubelet) GetContainerRuntimeVersion() (kubecontainer.Version, error) {
if kl.containerRuntime == nil {
@ -1956,7 +2119,11 @@ func (kl *Kubelet) GetHostname() string {
// Returns host IP or nil in case of error.
func (kl *Kubelet) GetHostIP() (net.IP, error) {
return kl.nodeManager.GetHostIP()
node, err := kl.GetNode()
if err != nil {
return nil, fmt.Errorf("cannot get node: %v", err)
}
return nodeutil.GetNodeHostIP(node)
}
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
@ -2026,6 +2193,25 @@ func (kl *Kubelet) reconcileCBR0(podCIDR string) error {
return kl.shaper.ReconcileInterface()
}
// updateNodeStatus updates node status to master with retries.
func (kl *Kubelet) updateNodeStatus() error {
for i := 0; i < nodeStatusUpdateRetry; i++ {
if err := kl.tryUpdateNodeStatus(); err != nil {
glog.Errorf("Error updating node status, will retry: %v", err)
} else {
return nil
}
}
return fmt.Errorf("update node status exceeds retry count")
}
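// The loop above is a bounded retry: at most nodeStatusUpdateRetry calls to
// tryUpdateNodeStatus per periodic sync, with a generic error if all fail.
// A hypothetical generic equivalent (not defined by the kubelet):
func withRetry(attempts int, fn func() error) error {
	for i := 0; i < attempts; i++ {
		err := fn()
		if err == nil {
			return nil
		}
		glog.Errorf("attempt %d/%d failed, will retry: %v", i+1, attempts, err)
	}
	return fmt.Errorf("all %d attempts failed", attempts)
}

// updateNodeStatus behaves like withRetry(nodeStatusUpdateRetry, kl.tryUpdateNodeStatus),
// except that it discards the underlying error.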
func (kl *Kubelet) recordNodeStatusEvent(event string) {
glog.V(2).Infof("Recording %s event message for node %s", event, kl.nodeName)
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
kl.recorder.Eventf(kl.nodeRef, event, "Node %s status is now: %s", kl.nodeName, event)
}
// Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
var oldNodeUnschedulable bool
@ -2039,10 +2225,10 @@ func (kl *Kubelet) syncNetworkStatus() {
networkConfigured = false
glog.Errorf("Error on adding ip table rules: %v", err)
}
if len(kl.nodeManager.GetPodCIDR()) == 0 {
if len(kl.podCIDR) == 0 {
glog.Warningf("ConfigureCBR0 requested, but PodCIDR not set. Will not configure CBR0 right now")
networkConfigured = false
} else if err := kl.reconcileCBR0(kl.nodeManager.GetPodCIDR()); err != nil {
} else if err := kl.reconcileCBR0(kl.podCIDR); err != nil {
networkConfigured = false
glog.Errorf("Error configuring cbr0: %v", err)
}
@ -2050,18 +2236,214 @@ func (kl *Kubelet) syncNetworkStatus() {
kl.networkConfigured = networkConfigured
}
func (kl *Kubelet) ContainerRuntimeUp() bool {
// setNodeStatus fills in the Status fields of the given Node, overwriting
// any fields that are currently set.
func (kl *Kubelet) setNodeStatus(node *api.Node) error {
// Set addresses for the node.
if kl.cloud != nil {
instances, ok := kl.cloud.Instances()
if !ok {
return fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface
nodeAddresses, err := instances.NodeAddresses(kl.nodeName)
if err != nil {
return fmt.Errorf("failed to get node address from cloud provider: %v", err)
}
node.Status.Addresses = nodeAddresses
} else {
addr := net.ParseIP(kl.hostname)
if addr != nil {
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: addr.String()},
{Type: api.NodeInternalIP, Address: addr.String()},
}
} else {
addrs, err := net.LookupIP(node.Name)
if err != nil {
return fmt.Errorf("can't get ip address of node %s: %v", node.Name, err)
} else if len(addrs) == 0 {
return fmt.Errorf("no ip address for node %v", node.Name)
} else {
// check all ip addresses for this node.Name and try to find the first non-loopback IPv4 address.
// If no match is found, it uses the IP of the interface with gateway on it.
for _, ip := range addrs {
if ip.IsLoopback() {
continue
}
if ip.To4() != nil {
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: ip.String()},
{Type: api.NodeInternalIP, Address: ip.String()},
}
break
}
}
if len(node.Status.Addresses) == 0 {
ip, err := util.ChooseHostInterface()
if err != nil {
return err
}
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: ip.String()},
{Type: api.NodeInternalIP, Address: ip.String()},
}
}
}
}
}
// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := kl.GetCachedMachineInfo()
if err != nil {
// TODO(roberthbailey): This is required for test-cmd.sh to pass.
// See if the test should be updated instead.
node.Status.Capacity = api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI),
api.ResourceMemory: resource.MustParse("0Gi"),
api.ResourcePods: *resource.NewQuantity(int64(kl.pods), resource.DecimalSI),
}
glog.Errorf("Error getting machine info: %v", err)
} else {
node.Status.NodeInfo.MachineID = info.MachineID
node.Status.NodeInfo.SystemUUID = info.SystemUUID
node.Status.Capacity = CapacityFromMachineInfo(info)
node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity(
int64(kl.pods), resource.DecimalSI)
if node.Status.NodeInfo.BootID != "" &&
node.Status.NodeInfo.BootID != info.BootID {
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
kl.recorder.Eventf(kl.nodeRef, "Rebooted",
"Node %s has been rebooted, boot id: %s", kl.nodeName, info.BootID)
}
node.Status.NodeInfo.BootID = info.BootID
}
verinfo, err := kl.cadvisor.VersionInfo()
if err != nil {
glog.Errorf("Error getting version info: %v", err)
} else {
node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion
node.Status.NodeInfo.OsImage = verinfo.ContainerOsVersion
// TODO: Determine the runtime is docker or rocket
node.Status.NodeInfo.ContainerRuntimeVersion = "docker://" + verinfo.DockerVersion
node.Status.NodeInfo.KubeletVersion = version.Get().String()
// TODO: kube-proxy might be different version from kubelet in the future
node.Status.NodeInfo.KubeProxyVersion = version.Get().String()
}
node.Status.DaemonEndpoints = *kl.daemonEndpoints
// Check whether container runtime can be reported as up.
containerRuntimeUp := kl.containerRuntimeUp()
// Check whether network is configured properly
networkConfigured := kl.doneNetworkConfigure()
currentTime := unversioned.Now()
var newNodeReadyCondition api.NodeCondition
var oldNodeReadyConditionStatus api.ConditionStatus
if containerRuntimeUp && networkConfigured {
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: "kubelet is posting ready status",
LastHeartbeatTime: currentTime,
}
} else {
var messages []string
if !containerRuntimeUp {
messages = append(messages, "container runtime is down")
}
if !networkConfigured {
messages = append(messages, "network not configured correctly")
}
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionFalse,
Reason: "KubeletNotReady",
Message: strings.Join(messages, ","),
LastHeartbeatTime: currentTime,
}
}
updated := false
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == api.NodeReady {
oldNodeReadyConditionStatus = node.Status.Conditions[i].Status
if oldNodeReadyConditionStatus == newNodeReadyCondition.Status {
newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime
} else {
newNodeReadyCondition.LastTransitionTime = currentTime
}
node.Status.Conditions[i] = newNodeReadyCondition
updated = true
}
}
if !updated {
newNodeReadyCondition.LastTransitionTime = currentTime
node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition)
}
if !updated || oldNodeReadyConditionStatus != newNodeReadyCondition.Status {
if newNodeReadyCondition.Status == api.ConditionTrue {
kl.recordNodeStatusEvent("NodeReady")
} else {
kl.recordNodeStatusEvent("NodeNotReady")
}
}
if oldNodeUnschedulable != node.Spec.Unschedulable {
if node.Spec.Unschedulable {
kl.recordNodeStatusEvent("NodeNotSchedulable")
} else {
kl.recordNodeStatusEvent("NodeSchedulable")
}
oldNodeUnschedulable = node.Spec.Unschedulable
}
return nil
}
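// The address branch of setNodeStatus encodes a fallback chain. A simplified
// sketch (hypothetical helper, not the kubelet's actual code path): a
// hostname that parses as a literal IP wins, then the first non-loopback
// IPv4 address from DNS, then the interface holding the default route.
func chooseNodeIP(hostname string) (net.IP, error) {
	if ip := net.ParseIP(hostname); ip != nil {
		return ip, nil
	}
	addrs, err := net.LookupIP(hostname)
	if err != nil {
		return nil, err
	}
	for _, ip := range addrs {
		if !ip.IsLoopback() && ip.To4() != nil {
			return ip, nil
		}
	}
	return util.ChooseHostInterface()
}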
func (kl *Kubelet) containerRuntimeUp() bool {
kl.runtimeMutex.Lock()
defer kl.runtimeMutex.Unlock()
return kl.lastTimestampRuntimeUp.Add(kl.runtimeUpThreshold).After(time.Now())
}
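// Worked example of the freshness check above, with hypothetical values: a
// runtime ping 3 minutes ago against a 5-minute threshold still reports up.
//
//	lastUp := time.Now().Add(-3 * time.Minute)
//	lastUp.Add(5 * time.Minute).After(time.Now()) // true, since 3m < 5m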
func (kl *Kubelet) NetworkConfigured() bool {
func (kl *Kubelet) doneNetworkConfigure() bool {
kl.networkConfigMutex.Lock()
defer kl.networkConfigMutex.Unlock()
return kl.networkConfigured
}
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
// is set, this function will also confirm that cbr0 is configured correctly.
func (kl *Kubelet) tryUpdateNodeStatus() error {
node, err := kl.kubeClient.Nodes().Get(kl.nodeName)
if err != nil {
return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
}
if node == nil {
return fmt.Errorf("no node instance returned for %q", kl.nodeName)
}
kl.networkConfigMutex.Lock()
kl.podCIDR = node.Spec.PodCIDR
kl.networkConfigMutex.Unlock()
if err := kl.setNodeStatus(node); err != nil {
return err
}
// Update the current status on the API server
_, err = kl.kubeClient.Nodes().UpdateStatus(node)
return err
}
// GetPhase returns the phase of a pod given its container info.
// This func is exported to simplify integration with 3rd party kubelet
// integrations like kubernetes-mesos.
@ -2258,7 +2640,7 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
}
if !kl.standaloneMode {
hostIP, err := kl.nodeManager.GetHostIP()
hostIP, err := kl.GetHostIP()
if err != nil {
glog.V(4).Infof("Cannot get host IP: %v", err)
} else {
@ -2398,8 +2780,8 @@ func (kl *Kubelet) GetRawContainerInfo(containerName string, req *cadvisorApi.Co
}
}
// GetMachineInfo assumes that the machine info can't change without a reboot
func (kl *Kubelet) GetMachineInfo() (*cadvisorApi.MachineInfo, error) {
// GetCachedMachineInfo assumes that the machine info can't change without a reboot
func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) {
if kl.machineInfo == nil {
info, err := kl.cadvisor.MachineInfo()
if err != nil {


@ -34,6 +34,7 @@ import (
cadvisorApi "github.com/google/cadvisor/info/v1"
cadvisorApiv2 "github.com/google/cadvisor/info/v2"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
@ -45,9 +46,11 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/bandwidth"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume"
_ "k8s.io/kubernetes/pkg/volume/host_path"
)
@ -75,7 +78,6 @@ type TestKubelet struct {
fakeCadvisor *cadvisor.Mock
fakeKubeClient *testclient.Fake
fakeMirrorClient *fakeMirrorClient
fakeNodeManager *fakeNodeManager
}
func newTestKubelet(t *testing.T) *TestKubelet {
@ -102,15 +104,14 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.sourcesReady = func() bool { return true }
kubelet.masterServiceNamespace = api.NamespaceDefault
kubelet.serviceLister = testServiceLister{}
kubelet.nodeLister = testNodeLister{}
kubelet.readinessManager = kubecontainer.NewReadinessManager()
kubelet.recorder = fakeRecorder
kubelet.statusManager = status.NewManager(fakeKubeClient)
if err := kubelet.setupDataDirs(); err != nil {
t.Fatalf("can't initialize kubelet data dirs: %v", err)
}
fakeNodeManager := &fakeNodeManager{}
kubelet.nodeManager = fakeNodeManager
kubelet.daemonEndpoints = &api.NodeDaemonEndpoints{}
mockCadvisor := &cadvisor.Mock{}
kubelet.cadvisor = mockCadvisor
podManager, fakeMirrorClient := newFakePodManager()
@ -136,7 +137,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.backOff = util.NewBackOff(time.Second, time.Minute)
kubelet.backOff.Clock = fakeClock
kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20)
return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeNodeManager}
return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient}
}
func newTestPods(count int) []*api.Pod {
@ -962,6 +963,25 @@ func (ls testServiceLister) List() (api.ServiceList, error) {
}, nil
}
type testNodeLister struct {
nodes []api.Node
}
func (ls testNodeLister) GetNodeInfo(id string) (*api.Node, error) {
for _, node := range ls.nodes {
if node.Name == id {
return &node, nil
}
}
return nil, fmt.Errorf("Node with name: %s does not exist", id)
}
func (ls testNodeLister) List() (api.NodeList, error) {
return api.NodeList{
Items: ls.nodes,
}, nil
}
type envs []kubecontainer.EnvVar
func (e envs) Len() int {
@ -2154,10 +2174,10 @@ func TestHandlePortConflicts(t *testing.T) {
// Tests that we handle not matching labels selector correctly by setting the failed status in status map.
func TestHandleNodeSelector(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeNodeManager.node = &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}}
kl := testKubelet.kubelet
kl.nodeLister = testNodeLister{nodes: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}},
}}
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorApiv2.FsInfo{}, nil)
testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorApiv2.FsInfo{}, nil)
@ -2368,6 +2388,309 @@ func TestValidateContainerStatus(t *testing.T) {
}
}
func TestUpdateNewNodeStatus(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
machineInfo := &cadvisorApi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorApi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
},
NodeInfo: api.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: "3.16.0-0.bpo.4-amd64",
OsImage: "Debian GNU/Linux 7 (wheezy)",
ContainerRuntimeVersion: "docker://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
{Type: api.NodeInternalIP, Address: "127.0.0.1"},
},
},
}
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected object type")
}
if updatedNode.Status.Conditions[0].LastHeartbeatTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp")
}
if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp")
}
updatedNode.Status.Conditions[0].LastHeartbeatTime = unversioned.Time{}
updatedNode.Status.Conditions[0].LastTransitionTime = unversioned.Time{}
if !reflect.DeepEqual(expectedNode, updatedNode) {
t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode))
}
}
func TestUpdateExistingNodeStatus(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(2048, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
},
},
}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
machineInfo := &cadvisorApi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorApi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: unversioned.Time{}, // placeholder
LastTransitionTime: unversioned.Time{}, // placeholder
},
},
NodeInfo: api.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: "3.16.0-0.bpo.4-amd64",
OsImage: "Debian GNU/Linux 7 (wheezy)",
ContainerRuntimeVersion: "docker://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
{Type: api.NodeInternalIP, Address: "127.0.0.1"},
},
},
}
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Errorf("unexpected actions: %v", actions)
}
updateAction, ok := actions[1].(testclient.UpdateAction)
if !ok {
t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1])
}
updatedNode, ok := updateAction.GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected object type")
}
// Expect LastProbeTime to be updated to Now, while LastTransitionTime to be the same.
if reflect.DeepEqual(updatedNode.Status.Conditions[0].LastHeartbeatTime.Rfc3339Copy().UTC(), unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time) {
t.Errorf("expected \n%v\n, got \n%v", unversioned.Now(), unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC))
}
if !reflect.DeepEqual(updatedNode.Status.Conditions[0].LastTransitionTime.Rfc3339Copy().UTC(), unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time) {
t.Errorf("expected \n%#v\n, got \n%#v", updatedNode.Status.Conditions[0].LastTransitionTime.Rfc3339Copy(),
unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC))
}
updatedNode.Status.Conditions[0].LastHeartbeatTime = unversioned.Time{}
updatedNode.Status.Conditions[0].LastTransitionTime = unversioned.Time{}
if !reflect.DeepEqual(expectedNode, updatedNode) {
t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode)
}
}
func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
fakeRuntime := testKubelet.fakeRuntime
// This causes GetContainerRuntimeVersion() to return an error, simulating
// that the container runtime is down.
fakeRuntime.VersionInfo = ""
kubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
machineInfo := &cadvisorApi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorApi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionFalse,
Reason: "KubeletNotReady",
Message: fmt.Sprintf("container runtime is down"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
},
NodeInfo: api.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: "3.16.0-0.bpo.4-amd64",
OsImage: "Debian GNU/Linux 7 (wheezy)",
ContainerRuntimeVersion: "docker://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
{Type: api.NodeInternalIP, Address: "127.0.0.1"},
},
},
}
kubelet.runtimeUpThreshold = time.Duration(0)
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1])
}
if updatedNode.Status.Conditions[0].LastHeartbeatTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp")
}
if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp")
}
updatedNode.Status.Conditions[0].LastHeartbeatTime = unversioned.Time{}
updatedNode.Status.Conditions[0].LastTransitionTime = unversioned.Time{}
if !reflect.DeepEqual(expectedNode, updatedNode) {
t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode))
}
}
func TestUpdateNodeStatusError(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
// No matching node for the kubelet
testKubelet.fakeKubeClient.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{}}).ReactionChain
if err := kubelet.updateNodeStatus(); err == nil {
t.Errorf("unexpected non error: %v", err)
}
if len(testKubelet.fakeKubeClient.Actions()) != nodeStatusUpdateRetry {
t.Errorf("unexpected actions: %v", testKubelet.fakeKubeClient.Actions())
}
}
func TestCreateMirrorPod(t *testing.T) {
for _, updateType := range []SyncPodType{SyncPodCreate, SyncPodUpdate} {
testKubelet := newTestKubelet(t)
@ -2743,6 +3066,55 @@ func TestFilterOutTerminatedPods(t *testing.T) {
}
}
func TestRegisterExistingNodeWithApiserver(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.AddReactor("create", "nodes", func(action testclient.Action) (bool, runtime.Object, error) {
// Return an error on create.
return true, &api.Node{}, &apierrors.StatusError{
ErrStatus: unversioned.Status{Reason: unversioned.StatusReasonAlreadyExists},
}
})
kubeClient.AddReactor("get", "nodes", func(action testclient.Action) (bool, runtime.Object, error) {
// Return an existing (matching) node on get.
return true, &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{ExternalID: testKubeletHostname},
}, nil
})
kubeClient.AddReactor("*", "*", func(action testclient.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
})
machineInfo := &cadvisorApi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorApi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
done := make(chan struct{})
go func() {
kubelet.registerWithApiserver()
done <- struct{}{}
}()
select {
case <-time.After(5 * time.Second):
t.Errorf("timed out waiting for registration")
case <-done:
return
}
}
func TestMakePortMappings(t *testing.T) {
tests := []struct {
container *api.Container


@ -1,513 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"errors"
"fmt"
"net"
"strings"
"sync"
"time"
"github.com/golang/glog"
cadvisorApi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/watch"
)
const (
// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
nodeStatusUpdateRetry = 5
)
type infoGetter interface {
GetMachineInfo() (*cadvisorApi.MachineInfo, error)
ContainerRuntimeUp() bool
NetworkConfigured() bool
GetVersionInfo() (*cadvisorApi.VersionInfo, error)
}
type nodeManager interface {
Start()
GetNode() (*api.Node, error)
GetPodCIDR() string
GetHostIP() (net.IP, error)
}
type realNodeManager struct {
// apiserver client.
client client.Interface
nodeLister nodeLister
// Set to true to have the node register itself with the apiserver.
registerNode bool
// nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
// Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod
// in nodecontroller. There are several constraints:
// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
// N means number of retries allowed for kubelet to post node status. It is pointless
// to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
// will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
// The constant must be less than podEvictionTimeout.
// 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
// status. Kubelet may fail to update node status reliably if the value is too small,
// as it takes time to gather all necessary node information.
nodeStatusUpdateFrequency time.Duration
// Cloud provider interface
cloud cloudprovider.Interface
nodeName string
hostname string
// Number of Pods which can be run by this Kubelet.
pods int
// The EventRecorder to use
recorder record.EventRecorder
// Information about the ports which are opened by daemons on Node running this Kubelet server.
daemonEndpoints *api.NodeDaemonEndpoints
// Interface to get machine and version info.
infoGetter infoGetter
// Reference to this node.
nodeRef *api.ObjectReference
// podCIDR may be updated by node.Spec.
podCIDR string
// for internal book keeping; access only from within registerWithApiserver
registrationCompleted bool
lock sync.RWMutex
}
func newRealNodeManager(client client.Interface, cloud cloudprovider.Interface, registerNode bool,
nodeStatusUpdateFrequency time.Duration, recorder record.EventRecorder, nodeName, hostname, podCIDR string,
pods int, infoGetter infoGetter, daemonEndpoints *api.NodeDaemonEndpoints, nodeRef *api.ObjectReference) *realNodeManager {
return &realNodeManager{
client: client,
cloud: cloud,
registerNode: registerNode,
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
recorder: recorder,
nodeName: nodeName,
hostname: hostname,
podCIDR: podCIDR,
pods: pods,
infoGetter: infoGetter,
daemonEndpoints: daemonEndpoints,
nodeRef: nodeRef,
}
}
type nodeLister interface {
List() (machines api.NodeList, err error)
GetNodeInfo(id string) (*api.Node, error)
}
func (nm *realNodeManager) Start() {
if nm.client == nil {
return
}
nm.setNodeLister()
go util.Until(nm.syncNodeStatus, nm.nodeStatusUpdateFrequency, util.NeverStop)
}
func (nm *realNodeManager) GetPodCIDR() string {
nm.lock.RLock()
defer nm.lock.RUnlock()
return nm.podCIDR
}
func (nm *realNodeManager) GetNode() (*api.Node, error) {
if nm.client == nil {
return nil, errors.New("unable to get node entry because apiserver client is nil")
}
return nm.nodeLister.GetNodeInfo(nm.nodeName)
}
// Returns host IP or nil in case of error.
func (nm *realNodeManager) GetHostIP() (net.IP, error) {
if nm.client == nil {
return nil, errors.New("unable to get node entry because apiserver client is nil")
}
node, err := nm.GetNode()
if err != nil {
return nil, fmt.Errorf("cannot get node: %v", err)
}
return nodeutil.GetNodeHostIP(node)
}
func (nm *realNodeManager) setNodeLister() {
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
// than an interface. There is no way to construct a list+watcher using resource name.
fieldSelector := fields.Set{client.ObjectNameField: nm.nodeName}.AsSelector()
listWatch := &cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return nm.client.Nodes().List(labels.Everything(), fieldSelector)
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return nm.client.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion)
},
}
cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
nm.nodeLister = &cache.StoreToNodeLister{Store: nodeStore}
}
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master, registering the kubelet first if
// necessary.
func (nm *realNodeManager) syncNodeStatus() {
if nm.registerNode {
// This will exit immediately if it doesn't need to do anything.
nm.registerWithApiserver()
}
if err := nm.updateNodeStatus(); err != nil {
glog.Errorf("Unable to update node status: %v", err)
}
}
func (nm *realNodeManager) initialNodeStatus() (*api.Node, error) {
node := &api.Node{
ObjectMeta: api.ObjectMeta{
Name: nm.nodeName,
Labels: map[string]string{"kubernetes.io/hostname": nm.hostname},
},
}
if nm.cloud != nil {
instances, ok := nm.cloud.Instances()
if !ok {
return nil, fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
// TODO: ExternalID is deprecated, we'll have to drop this code
externalID, err := instances.ExternalID(nm.nodeName)
if err != nil {
return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err)
}
node.Spec.ExternalID = externalID
// TODO: We can't assume that the node has credentials to talk to the
// cloudprovider from arbitrary nodes. At most, we should talk to a
// local metadata server here.
node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(nm.cloud, nm.nodeName)
if err != nil {
return nil, err
}
} else {
node.Spec.ExternalID = nm.hostname
}
if err := nm.setNodeStatus(node); err != nil {
return nil, err
}
return node, nil
}
// registerWithApiserver registers the node with the cluster master. It is safe
// to call multiple times, but not concurrently (nm.registrationCompleted is
// not locked).
func (nm *realNodeManager) registerWithApiserver() {
if nm.registrationCompleted {
return
}
step := 100 * time.Millisecond
for {
time.Sleep(step)
step = step * 2
if step >= 7*time.Second {
step = 7 * time.Second
}
node, err := nm.initialNodeStatus()
if err != nil {
glog.Errorf("Unable to construct api.Node object for kubelet: %v", err)
continue
}
glog.V(2).Infof("Attempting to register node %s", node.Name)
if _, err := nm.client.Nodes().Create(node); err != nil {
if !apierrors.IsAlreadyExists(err) {
glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err)
continue
}
currentNode, err := nm.client.Nodes().Get(nm.nodeName)
if err != nil {
glog.Errorf("error getting node %q: %v", nm.nodeName, err)
continue
}
if currentNode == nil {
glog.Errorf("no node instance returned for %q", nm.nodeName)
continue
}
if currentNode.Spec.ExternalID == node.Spec.ExternalID {
glog.Infof("Node %s was previously registered", node.Name)
nm.registrationCompleted = true
return
}
glog.Errorf(
"Previously %q had externalID %q; now it is %q; will delete and recreate.",
nm.nodeName, node.Spec.ExternalID, currentNode.Spec.ExternalID,
)
if err := nm.client.Nodes().Delete(node.Name); err != nil {
glog.Errorf("Unable to delete old node: %v", err)
} else {
glog.Errorf("Deleted old node object %q", nm.nodeName)
}
continue
}
glog.Infof("Successfully registered node %s", node.Name)
nm.registrationCompleted = true
return
}
}
// setNodeStatus fills in the Status fields of the given Node, overwriting
// any fields that are currently set.
func (nm *realNodeManager) setNodeStatus(node *api.Node) error {
// Set addresses for the node.
if nm.cloud != nil {
instances, ok := nm.cloud.Instances()
if !ok {
return fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface
nodeAddresses, err := instances.NodeAddresses(nm.nodeName)
if err != nil {
return fmt.Errorf("failed to get node address from cloud provider: %v", err)
}
node.Status.Addresses = nodeAddresses
} else {
addr := net.ParseIP(nm.hostname)
if addr != nil {
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: addr.String()},
{Type: api.NodeInternalIP, Address: addr.String()},
}
} else {
addrs, err := net.LookupIP(node.Name)
if err != nil {
return fmt.Errorf("can't get ip address of node %s: %v", node.Name, err)
} else if len(addrs) == 0 {
return fmt.Errorf("no ip address for node %v", node.Name)
} else {
// check all ip addresses for this node.Name and try to find the first non-loopback IPv4 address.
// If no match is found, it uses the IP of the interface with gateway on it.
for _, ip := range addrs {
if ip.IsLoopback() {
continue
}
if ip.To4() != nil {
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: ip.String()},
{Type: api.NodeInternalIP, Address: ip.String()},
}
break
}
}
if len(node.Status.Addresses) == 0 {
ip, err := util.ChooseHostInterface()
if err != nil {
return err
}
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: ip.String()},
{Type: api.NodeInternalIP, Address: ip.String()},
}
}
}
}
}
// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := nm.infoGetter.GetMachineInfo()
if err != nil {
// TODO(roberthbailey): This is required for test-cmd.sh to pass.
// See if the test should be updated instead.
node.Status.Capacity = api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI),
api.ResourceMemory: resource.MustParse("0Gi"),
api.ResourcePods: *resource.NewQuantity(int64(nm.pods), resource.DecimalSI),
}
glog.Errorf("Error getting machine info: %v", err)
} else {
node.Status.NodeInfo.MachineID = info.MachineID
node.Status.NodeInfo.SystemUUID = info.SystemUUID
node.Status.Capacity = CapacityFromMachineInfo(info)
node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity(
int64(nm.pods), resource.DecimalSI)
if node.Status.NodeInfo.BootID != "" &&
node.Status.NodeInfo.BootID != info.BootID {
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
nm.recorder.Eventf(nm.nodeRef, "Rebooted",
"Node %s has been rebooted, boot id: %s", nm.nodeName, info.BootID)
}
node.Status.NodeInfo.BootID = info.BootID
}
verinfo, err := nm.infoGetter.GetVersionInfo()
if err != nil {
glog.Errorf("Error getting version info: %v", err)
} else {
node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion
node.Status.NodeInfo.OsImage = verinfo.ContainerOsVersion
// TODO: Determine the runtime is docker or rocket
node.Status.NodeInfo.ContainerRuntimeVersion = "docker://" + verinfo.DockerVersion
node.Status.NodeInfo.KubeletVersion = version.Get().String()
// TODO: kube-proxy might be different version from kubelet in the future
node.Status.NodeInfo.KubeProxyVersion = version.Get().String()
}
node.Status.DaemonEndpoints = *nm.daemonEndpoints
// Check whether container runtime can be reported as up.
containerRuntimeUp := nm.infoGetter.ContainerRuntimeUp()
// Check whether network is configured properly
networkConfigured := nm.infoGetter.NetworkConfigured()
currentTime := unversioned.Now()
var newNodeReadyCondition api.NodeCondition
var oldNodeReadyConditionStatus api.ConditionStatus
if containerRuntimeUp && networkConfigured {
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: "kubelet is posting ready status",
LastHeartbeatTime: currentTime,
}
} else {
var messages []string
if !containerRuntimeUp {
messages = append(messages, "container runtime is down")
}
if !networkConfigured {
messages = append(messages, "network not configured correctly")
}
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionFalse,
Reason: "KubeletNotReady",
Message: strings.Join(messages, ","),
LastHeartbeatTime: currentTime,
}
}
updated := false
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == api.NodeReady {
oldNodeReadyConditionStatus = node.Status.Conditions[i].Status
if oldNodeReadyConditionStatus == newNodeReadyCondition.Status {
newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime
} else {
newNodeReadyCondition.LastTransitionTime = currentTime
}
node.Status.Conditions[i] = newNodeReadyCondition
updated = true
}
}
if !updated {
newNodeReadyCondition.LastTransitionTime = currentTime
node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition)
}
if !updated || oldNodeReadyConditionStatus != newNodeReadyCondition.Status {
if newNodeReadyCondition.Status == api.ConditionTrue {
nm.recordNodeStatusEvent("NodeReady")
} else {
nm.recordNodeStatusEvent("NodeNotReady")
}
}
if oldNodeUnschedulable != node.Spec.Unschedulable {
if node.Spec.Unschedulable {
nm.recordNodeStatusEvent("NodeNotSchedulable")
} else {
nm.recordNodeStatusEvent("NodeSchedulable")
}
oldNodeUnschedulable = node.Spec.Unschedulable
}
return nil
}
// updateNodeStatus updates node status to master with retries.
func (nm *realNodeManager) updateNodeStatus() error {
for i := 0; i < nodeStatusUpdateRetry; i++ {
if err := nm.tryUpdateNodeStatus(); err != nil {
glog.Errorf("Error updating node status, will retry: %v", err)
} else {
return nil
}
}
return fmt.Errorf("update node status exceeds retry count")
}
func (nm *realNodeManager) recordNodeStatusEvent(event string) {
glog.V(2).Infof("Recording %s event message for node %s", event, nm.nodeName)
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
nm.recorder.Eventf(nm.nodeRef, event, "Node %s status is now: %s", nm.nodeName, event)
}
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
// is set, this function will also confirm that cbr0 is configured correctly.
func (nm *realNodeManager) tryUpdateNodeStatus() error {
node, err := nm.client.Nodes().Get(nm.nodeName)
if err != nil {
return fmt.Errorf("error getting node %q: %v", nm.nodeName, err)
}
if node == nil {
return fmt.Errorf("no node instance returned for %q", nm.nodeName)
}
nm.lock.Lock()
defer nm.lock.Unlock()
nm.podCIDR = node.Spec.PodCIDR
if err := nm.setNodeStatus(node); err != nil {
return err
}
// Update the current status on the API server
_, err = nm.client.Nodes().UpdateStatus(node)
return err
}


@ -1,445 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"fmt"
"reflect"
"testing"
"time"
cadvisorApi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/version"
)
type fakeInfoGetter struct {
machineInfo *cadvisorApi.MachineInfo
versionInfo *cadvisorApi.VersionInfo
runtimeUp bool
networkConfigured bool
}
func (f *fakeInfoGetter) GetMachineInfo() (*cadvisorApi.MachineInfo, error) {
return f.machineInfo, nil
}
func (f *fakeInfoGetter) GetVersionInfo() (*cadvisorApi.VersionInfo, error) {
return f.versionInfo, nil
}
func (f *fakeInfoGetter) ContainerRuntimeUp() bool {
return f.runtimeUp
}
func (f *fakeInfoGetter) NetworkConfigured() bool {
return f.networkConfigured
}
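// Compile-time check that fakeInfoGetter implements the infoGetter interface.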
var _ infoGetter = &fakeInfoGetter{}
type testNodeLister struct {
nodes []api.Node
}
func (ls testNodeLister) GetNodeInfo(id string) (*api.Node, error) {
for _, node := range ls.nodes {
if node.Name == id {
return &node, nil
}
}
return nil, fmt.Errorf("node with name %q does not exist", id)
}
func (ls testNodeLister) List() (api.NodeList, error) {
return api.NodeList{
Items: ls.nodes,
}, nil
}
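// testNodeManager bundles the node manager under test with its fake client and fake info getter.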
type testNodeManager struct {
fakeClient *testclient.Fake
fakeInfoGetter *fakeInfoGetter
nodeManager *realNodeManager
}
func newTestNodeManager() *testNodeManager {
fakeRecorder := &record.FakeRecorder{}
fakeClient := &testclient.Fake{}
fakeInfoGetter := &fakeInfoGetter{}
nodeManager := newRealNodeManager(fakeClient, nil, true, time.Second, fakeRecorder, testKubeletHostname,
testKubeletHostname, "", 0, fakeInfoGetter, &api.NodeDaemonEndpoints{}, nil)
nodeManager.nodeLister = &testNodeLister{}
return &testNodeManager{fakeClient: fakeClient, fakeInfoGetter: fakeInfoGetter, nodeManager: nodeManager}
}
func TestUpdateNewNodeStatus(t *testing.T) {
testNodeManager := newTestNodeManager()
nodeManager := testNodeManager.nodeManager
client := testNodeManager.fakeClient
fakeInfoGetter := testNodeManager.fakeInfoGetter
client.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
fakeInfoGetter.machineInfo = &cadvisorApi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
fakeInfoGetter.versionInfo = &cadvisorApi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: "kubelet is posting ready status",
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
},
NodeInfo: api.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: "3.16.0-0.bpo.4-amd64",
OsImage: "Debian GNU/Linux 7 (wheezy)",
ContainerRuntimeVersion: "docker://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
{Type: api.NodeInternalIP, Address: "127.0.0.1"},
},
},
}
fakeInfoGetter.runtimeUp = true
fakeInfoGetter.networkConfigured = true
if err := nodeManager.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := client.Actions()
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected object type")
}
if updatedNode.Status.Conditions[0].LastHeartbeatTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp")
}
if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp")
}
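// Zero out the timestamps so the DeepEqual below compares only deterministic fields.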
updatedNode.Status.Conditions[0].LastHeartbeatTime = unversioned.Time{}
updatedNode.Status.Conditions[0].LastTransitionTime = unversioned.Time{}
if !reflect.DeepEqual(expectedNode, updatedNode) {
t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode))
}
}
func TestUpdateExistingNodeStatus(t *testing.T) {
testNodeManager := newTestNodeManager()
nodeManager := testNodeManager.nodeManager
client := testNodeManager.fakeClient
fakeInfoGetter := testNodeManager.fakeInfoGetter
client.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: "kubelet is posting ready status",
LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(2048, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
},
},
}}).ReactionChain
fakeInfoGetter.machineInfo = &cadvisorApi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
fakeInfoGetter.versionInfo = &cadvisorApi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: "kubelet is posting ready status",
LastHeartbeatTime: unversioned.Time{}, // placeholder
LastTransitionTime: unversioned.Time{}, // placeholder
},
},
NodeInfo: api.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: "3.16.0-0.bpo.4-amd64",
OsImage: "Debian GNU/Linux 7 (wheezy)",
ContainerRuntimeVersion: "docker://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
{Type: api.NodeInternalIP, Address: "127.0.0.1"},
},
},
}
fakeInfoGetter.runtimeUp = true
fakeInfoGetter.networkConfigured = true
if err := nodeManager.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := client.Actions()
if len(actions) != 2 {
t.Errorf("unexpected actions: %v", actions)
}
updateAction, ok := actions[1].(testclient.UpdateAction)
if !ok {
t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1])
}
updatedNode, ok := updateAction.GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected object type")
}
// Expect LastProbeTime to be updated to Now, while LastTransitionTime to be the same.
if reflect.DeepEqual(updatedNode.Status.Conditions[0].LastHeartbeatTime.Rfc3339Copy().UTC(), unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time) {
t.Errorf("expected \n%v\n, got \n%v", unversioned.Now(), unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC))
}
if !reflect.DeepEqual(updatedNode.Status.Conditions[0].LastTransitionTime.Rfc3339Copy().UTC(), unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time) {
t.Errorf("expected \n%#v\n, got \n%#v", updatedNode.Status.Conditions[0].LastTransitionTime.Rfc3339Copy(),
unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC))
}
updatedNode.Status.Conditions[0].LastHeartbeatTime = unversioned.Time{}
updatedNode.Status.Conditions[0].LastTransitionTime = unversioned.Time{}
if !reflect.DeepEqual(expectedNode, updatedNode) {
t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode)
}
}
func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
testNodeManager := newTestNodeManager()
nodeManager := testNodeManager.nodeManager
client := testNodeManager.fakeClient
fakeInfoGetter := testNodeManager.fakeInfoGetter
client.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
fakeInfoGetter.machineInfo = &cadvisorApi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
fakeInfoGetter.versionInfo = &cadvisorApi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionFalse,
Reason: "KubeletNotReady",
Message: "container runtime is down",
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
},
NodeInfo: api.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: "3.16.0-0.bpo.4-amd64",
OsImage: "Debian GNU/Linux 7 (wheezy)",
ContainerRuntimeVersion: "docker://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
{Type: api.NodeInternalIP, Address: "127.0.0.1"},
},
},
}
// Pretend that container runtime is down.
fakeInfoGetter.runtimeUp = false
fakeInfoGetter.networkConfigured = true
if err := nodeManager.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := client.Actions()
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1])
}
if updatedNode.Status.Conditions[0].LastHeartbeatTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp")
}
if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp")
}
updatedNode.Status.Conditions[0].LastHeartbeatTime = unversioned.Time{}
updatedNode.Status.Conditions[0].LastTransitionTime = unversioned.Time{}
if !reflect.DeepEqual(expectedNode, updatedNode) {
t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode))
}
}
func TestUpdateNodeStatusError(t *testing.T) {
testNodeManager := newTestNodeManager()
nodeManager := testNodeManager.nodeManager
client := testNodeManager.fakeClient
// No matching node for the kubelet
client.ReactionChain = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{}}).ReactionChain
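// With no node object to get, every attempt inside updateNodeStatus fails and the retry budget is exhausted.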
if err := nodeManager.updateNodeStatus(); err == nil {
t.Errorf("unexpected non error: %v", err)
}
if len(client.Actions()) != nodeStatusUpdateRetry {
t.Errorf("unexpected actions: %v", client.Actions())
}
}
func TestRegisterExistingNodeWithApiserver(t *testing.T) {
testNodeManager := newTestNodeManager()
nodeManager := testNodeManager.nodeManager
client := testNodeManager.fakeClient
fakeInfoGetter := testNodeManager.fakeInfoGetter
client.AddReactor("create", "nodes", func(action testclient.Action) (bool, runtime.Object, error) {
// Return an error on create.
return true, &api.Node{}, &apierrors.StatusError{
ErrStatus: unversioned.Status{Reason: unversioned.StatusReasonAlreadyExists},
}
})
client.AddReactor("get", "nodes", func(action testclient.Action) (bool, runtime.Object, error) {
// Return an existing (matching) node on get.
return true, &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{ExternalID: testKubeletHostname},
}, nil
})
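// Catch-all reactor: any action other than the ones stubbed above is a test bug.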
client.AddReactor("*", "*", func(action testclient.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
})
fakeInfoGetter.machineInfo = &cadvisorApi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
fakeInfoGetter.versionInfo = &cadvisorApi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
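// registerWithApiserver is expected to retry until the existing node is adopted;
// run it in a goroutine so the test can enforce a timeout instead of hanging.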
done := make(chan struct{})
go func() {
nodeManager.registerWithApiserver()
done <- struct{}{}
}()
select {
case <-time.After(5 * time.Second):
t.Errorf("timed out waiting for registration")
case <-done:
return
}
}


@ -83,11 +83,11 @@ func TestRunOnce(t *testing.T) {
rootDirectory: "/tmp/kubelet",
recorder: &record.FakeRecorder{},
cadvisor: cadvisor,
nodeLister: testNodeLister{},
statusManager: status.NewManager(nil),
containerRefManager: kubecontainer.NewRefManager(),
readinessManager: kubecontainer.NewReadinessManager(),
podManager: podManager,
nodeManager: &fakeNodeManager{},
os: kubecontainer.FakeOS{},
volumeManager: newVolumeManager(),
diskSpaceManager: diskSpaceManager,


@ -100,7 +100,7 @@ type HostInterface interface {
GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
GetContainerRuntimeVersion() (kubecontainer.Version, error)
GetRawContainerInfo(containerName string, req *cadvisorApi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorApi.ContainerInfo, error)
GetMachineInfo() (*cadvisorApi.MachineInfo, error)
GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error)
GetPods() []*api.Pod
GetRunningPods() ([]*api.Pod, error)
GetPodByName(namespace, name string) (*api.Pod, bool)
@ -440,7 +440,7 @@ func (s *Server) getLogs(request *restful.Request, response *restful.Response) {
// getSpec handles spec requests against the Kubelet.
func (s *Server) getSpec(request *restful.Request, response *restful.Response) {
info, err := s.host.GetMachineInfo()
info, err := s.host.GetCachedMachineInfo()
if err != nil {
response.WriteError(http.StatusInternalServerError, err)
return


@ -86,7 +86,7 @@ func (fk *fakeKubelet) GetContainerRuntimeVersion() (kubecontainer.Version, erro
return fk.containerVersionFunc()
}
func (fk *fakeKubelet) GetMachineInfo() (*cadvisorApi.MachineInfo, error) {
func (fk *fakeKubelet) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) {
return fk.machineInfoFunc()
}