Revert "Revert "Modify nodes to register directly with the master.""

This reverts commit c53786ab31.
CJ Cullen 2015-05-20 14:21:03 -07:00
parent a76bdd9710
commit bf646abf8d
12 changed files with 314 additions and 876 deletions

View File

@ -22,6 +22,19 @@
{% set api_servers_with_port = api_servers + ":6443" -%}
{% endif -%}
# Disable registration for the kubelet running on the master on GCE.
# TODO(roberthbailey): Make this configurable via an env var in config-default.sh
{% if grains.cloud == 'gce' -%}
{% if grains['roles'][0] == 'kubernetes-master' -%}
{% set api_servers_with_port = "" -%}
{% endif -%}
{% endif -%}
{% set cloud_provider = "" -%}
{% if grains.cloud is defined -%}
{% set cloud_provider = "--cloud_provider=" + grains.cloud -%}
{% endif -%}
{% set config = "--config=/etc/kubernetes/manifests" -%}
{% set hostname_override = "" -%}
{% if grains.hostname_override is defined -%}
@ -45,4 +58,4 @@
{% set configure_cbr0 = "--configure-cbr0=" + pillar['allocate_node_cidrs'] -%}
{% endif -%}
DAEMON_ARGS="{{daemon_args}} {{api_servers_with_port}} {{hostname_override}} {{config}} --allow_privileged={{pillar['allow_privileged']}} {{pillar['log_level']}} {{cluster_dns}} {{cluster_domain}} {{docker_root}} {{configure_cbr0}}"
DAEMON_ARGS="{{daemon_args}} {{api_servers_with_port}} {{hostname_override}} {{cloud_provider}} {{config}} --allow_privileged={{pillar['allow_privileged']}} {{pillar['log_level']}} {{cluster_dns}} {{cluster_domain}} {{docker_root}} {{configure_cbr0}}"

View File

@ -101,7 +101,6 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
// Setup
servers := []string{}
glog.Infof("Creating etcd client pointing to %v", servers)
machineList := []string{"localhost", "127.0.0.1"}
handler := delegateHandler{}
apiServer := httptest.NewServer(&handler)
@ -196,7 +195,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
}}
nodeController := nodecontroller.NewNodeController(nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewFakeRateLimiter(),
nodeController := nodecontroller.NewNodeController(nil, "", nodeResources, cl, 10, 5*time.Minute, util.NewFakeRateLimiter(),
40*time.Second, 60*time.Second, 5*time.Second, nil, false)
nodeController.Run(5*time.Second, true)
cadvisorInterface := new(cadvisor.Fake)
@ -206,7 +205,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
configFilePath := makeTempDirOrDie("config", testRootDir)
glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
fakeDocker1.VersionInfo = docker.Env{"ApiVersion=1.15"}
kcfg := kubeletapp.SimpleKubelet(cl, &fakeDocker1, machineList[0], testRootDir, firstManifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath, nil, kubecontainer.FakeOS{})
kcfg := kubeletapp.SimpleKubelet(cl, &fakeDocker1, "localhost", testRootDir, firstManifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath, nil, kubecontainer.FakeOS{})
kubeletapp.RunKubelet(kcfg, nil)
// Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both
@ -214,7 +213,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
testRootDir = makeTempDirOrDie("kubelet_integ_2.", "")
glog.Infof("Using %s as root dir for kubelet #2", testRootDir)
fakeDocker2.VersionInfo = docker.Env{"ApiVersion=1.15"}
kcfg = kubeletapp.SimpleKubelet(cl, &fakeDocker2, machineList[1], testRootDir, secondManifestURL, "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubecontainer.FakeOS{})
kcfg = kubeletapp.SimpleKubelet(cl, &fakeDocker2, "127.0.0.1", testRootDir, secondManifestURL, "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubecontainer.FakeOS{})
kubeletapp.RunKubelet(kcfg, nil)
return apiServer.URL, configFilePath
}

View File

@ -228,7 +228,7 @@ func (s *CMServer) Run(_ []string) error {
glog.Warning("DEPRECATION NOTICE: sync-node-status flag is being deprecated. It has no effect now and it will be removed in a future version.")
}
nodeController := nodecontroller.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources,
nodeController := nodecontroller.NewNodeController(cloud, s.MinionRegexp, nodeResources,
kubeClient, s.RegisterRetryCount, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList)

View File

@ -89,6 +89,7 @@ type KubeletServer struct {
HealthzBindAddress util.IP
OOMScoreAdj int
APIServerList util.StringList
RegisterNode bool
ClusterDomain string
MasterServiceNamespace string
ClusterDNS util.IP
@ -155,6 +156,7 @@ func NewKubeletServer() *KubeletServer {
CadvisorPort: 4194,
HealthzPort: 10248,
HealthzBindAddress: util.IP(net.ParseIP("127.0.0.1")),
RegisterNode: true, // will be ignored if no apiserver is configured
OOMScoreAdj: -900,
MasterServiceNamespace: api.NamespaceDefault,
ImageGCHighThresholdPercent: 90,
@ -211,6 +213,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.Var(&s.HealthzBindAddress, "healthz-bind-address", "The IP address for the healthz server to serve on, defaulting to 127.0.0.1 (set to 0.0.0.0 for all interfaces)")
fs.IntVar(&s.OOMScoreAdj, "oom-score-adj", s.OOMScoreAdj, "The oom_score_adj value for kubelet process. Values must be within the range [-1000, 1000]")
fs.Var(&s.APIServerList, "api-servers", "List of Kubernetes API servers for publishing events, and reading pods and services. (ip:port), comma separated.")
fs.BoolVar(&s.RegisterNode, "register-node", s.RegisterNode, "Register the node with the apiserver (defaults to true if --api-server is set)")
fs.StringVar(&s.ClusterDomain, "cluster-domain", s.ClusterDomain, "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains")
fs.StringVar(&s.MasterServiceNamespace, "master-service-namespace", s.MasterServiceNamespace, "The namespace from which the kubernetes master services should be injected into pods")
fs.Var(&s.ClusterDNS, "cluster-dns", "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers")
@ -318,6 +321,7 @@ func (s *KubeletServer) Run(_ []string) error {
MinimumGCAge: s.MinimumGCAge,
MaxPerPodContainerCount: s.MaxPerPodContainerCount,
MaxContainerCount: s.MaxContainerCount,
RegisterNode: s.RegisterNode,
ClusterDomain: s.ClusterDomain,
ClusterDNS: s.ClusterDNS,
Runonce: s.RunOnce,
@ -493,6 +497,7 @@ func SimpleKubelet(client *client.Client,
MinimumGCAge: 10 * time.Second,
MaxPerPodContainerCount: 5,
MaxContainerCount: 100,
RegisterNode: true,
MasterServiceNamespace: masterServiceNamespace,
VolumePlugins: volumePlugins,
TLSOptions: tlsOptions,
@ -618,6 +623,7 @@ type KubeletConfig struct {
MinimumGCAge time.Duration
MaxPerPodContainerCount int
MaxContainerCount int
RegisterNode bool
ClusterDomain string
ClusterDNS util.IP
EnableServer bool
@ -675,6 +681,7 @@ func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
kc.RegistryBurst,
gcPolicy,
pc.SeenAllSources,
kc.RegisterNode,
kc.ClusterDomain,
net.IP(kc.ClusterDNS),
kc.MasterServiceNamespace,
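
The file above threads the new --register-node flag from the command line through KubeletConfig and into the kubelet constructor. As a minimal standalone sketch of the flag's shape (default true, overridable), using spf13/pflag as the kubelet does; only the flag name comes from the diff, the rest is illustrative:

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	// Mirrors the default in NewKubeletServer: register unless told otherwise.
	registerNode := true
	fs := pflag.NewFlagSet("kubelet", pflag.ContinueOnError)
	fs.BoolVar(&registerNode, "register-node", registerNode, "Register the node with the apiserver")

	// Parse a hypothetical command line that turns self-registration off.
	if err := fs.Parse([]string{"--register-node=false"}); err != nil {
		panic(err)
	}
	fmt.Println(registerNode) // false
}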

View File

@ -123,7 +123,7 @@ func runScheduler(cl *client.Client) {
}
// RunControllerManager starts a controller
func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU, nodeMemory int64) {
func runControllerManager(cl *client.Client, nodeMilliCPU, nodeMemory int64) {
nodeResources := &api.NodeResources{
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(nodeMilliCPU, resource.DecimalSI),
@ -133,7 +133,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
const nodeSyncPeriod = 10 * time.Second
nodeController := nodecontroller.NewNodeController(
nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst),
nil, "", nodeResources, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst),
40*time.Second, 60*time.Second, 5*time.Second, nil, false)
nodeController.Run(nodeSyncPeriod, true)
@ -150,18 +150,16 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
}
func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr net.IP, port int) {
machineList := []string{"localhost"}
runApiServer(etcdClient, addr, port, *masterServiceNamespace)
runScheduler(cl)
runControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory)
runControllerManager(cl, *nodeMilliCPU, *nodeMemory)
dockerClient := dockertools.ConnectToDockerOrDie(*dockerEndpoint)
cadvisorInterface, err := cadvisor.New(0)
if err != nil {
glog.Fatalf("Failed to create cAdvisor: %v", err)
}
kcfg := kubeletapp.SimpleKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubecontainer.RealOS{})
kcfg := kubeletapp.SimpleKubelet(cl, dockerClient, "localhost", "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubecontainer.RealOS{})
kubeletapp.RunKubelet(kcfg, nil)
}

View File

@ -1222,6 +1222,8 @@ func ValidateNodeUpdate(oldNode *api.Node, node *api.Node) errs.ValidationErrorL
oldNode.ObjectMeta = node.ObjectMeta
// Allow users to update capacity
oldNode.Status.Capacity = node.Status.Capacity
// Allow the controller manager to assign a CIDR to a node.
oldNode.Spec.PodCIDR = node.Spec.PodCIDR
// Allow users to unschedule node
oldNode.Spec.Unschedulable = node.Spec.Unschedulable
// Clear status
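
For context, ValidateNodeUpdate works by copying every field a client may legitimately change onto the old object and then requiring everything else to be identical; the added lines put Spec.PodCIDR on that allow-list so the controller manager's CIDR assignment survives validation. A standalone sketch of the pattern, with illustrative types rather than the real validation package:

package main

import (
	"fmt"
	"reflect"
)

type nodeSpec struct {
	PodCIDR       string
	Unschedulable bool
	ExternalID    string
}

// validateUpdate copies the mutable fields from updated onto old, then rejects
// the update if any other field differs.
func validateUpdate(old, updated nodeSpec) error {
	old.PodCIDR = updated.PodCIDR             // allowed: assigned by the controller manager
	old.Unschedulable = updated.Unschedulable // allowed: users may unschedule a node
	if !reflect.DeepEqual(old, updated) {
		return fmt.Errorf("update touches immutable fields")
	}
	return nil
}

func main() {
	old := nodeSpec{ExternalID: "node0"}
	fmt.Println(validateUpdate(old, nodeSpec{ExternalID: "node0", PodCIDR: "10.244.1.0/24"})) // <nil>
	fmt.Println(validateUpdate(old, nodeSpec{ExternalID: "renamed"}))                         // error
}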

View File

@ -17,6 +17,7 @@ limitations under the License.
package cloudprovider
import (
"errors"
"net"
"strings"
@ -86,6 +87,8 @@ type Instances interface {
Release(name string) error
}
var InstanceNotFound = errors.New("instance not found")
// Zone represents the location of a particular machine.
type Zone struct {
FailureDomain string

View File

@ -444,7 +444,10 @@ func (gce *GCECloud) getInstanceByName(name string) (*compute.Instance, error) {
name = canonicalizeInstanceName(name)
res, err := gce.service.Instances.Get(gce.projectID, gce.zone, name).Do()
if err != nil {
glog.Errorf("Failed to retrieve TargetInstance resource for instance:%s", name)
glog.Errorf("Failed to retrieve TargetInstance resource for instance: %s", name)
if apiErr, ok := err.(*googleapi.Error); ok && apiErr.Code == http.StatusNotFound {
return nil, cloudprovider.InstanceNotFound
}
return nil, err
}
return res, nil
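
The GCE change maps an HTTP 404 from the instances API onto the new cloudprovider.InstanceNotFound sentinel, which monitorNodeStatus later compares against (err == cloudprovider.InstanceNotFound) to decide that a node's VM is really gone. A standalone sketch of the same mapping, with an illustrative error type standing in for *googleapi.Error:

package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Stand-in for the sentinel declared in pkg/cloudprovider.
var instanceNotFound = errors.New("instance not found")

// Stand-in for *googleapi.Error.
type apiError struct{ Code int }

func (e *apiError) Error() string { return fmt.Sprintf("googleapi: HTTP %d", e.Code) }

// getInstance translates a provider-specific 404 into the sentinel so callers can branch on it.
func getInstance(name string, raw error) error {
	if apiErr, ok := raw.(*apiError); ok && apiErr.Code == http.StatusNotFound {
		return instanceNotFound
	}
	return raw
}

func main() {
	if err := getInstance("node0", &apiError{Code: 404}); err == instanceNotFound {
		fmt.Println("node0's VM is gone; safe to delete the Node and its pods")
	}
}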

View File

@ -20,14 +20,9 @@ import (
"errors"
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
@ -57,7 +52,6 @@ type NodeController struct {
cloud cloudprovider.Interface
matchRE string
staticResources *api.NodeResources
nodes []string
kubeClient client.Interface
recorder record.EventRecorder
registerRetryCount int
@ -100,7 +94,6 @@ type NodeController struct {
func NewNodeController(
cloud cloudprovider.Interface,
matchRE string,
nodes []string,
staticResources *api.NodeResources,
kubeClient client.Interface,
registerRetryCount int,
@ -125,7 +118,6 @@ func NewNodeController(
return &NodeController{
cloud: cloud,
matchRE: matchRE,
nodes: nodes,
staticResources: staticResources,
kubeClient: kubeClient,
recorder: recorder,
@ -144,9 +136,9 @@ func NewNodeController(
}
// Generates num pod CIDRs that could be assigned to nodes.
func (nc *NodeController) generateCIDRs(num int) util.StringSet {
func generateCIDRs(clusterCIDR *net.IPNet, num int) util.StringSet {
res := util.NewStringSet()
cidrIP := nc.clusterCIDR.IP.To4()
cidrIP := clusterCIDR.IP.To4()
for i := 0; i < num; i++ {
// TODO: Make the CIDRs configurable.
b1 := byte(i >> 8)
@ -156,37 +148,46 @@ func (nc *NodeController) generateCIDRs(num int) util.StringSet {
return res
}
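
Only the start of the rewritten generator is visible here (b1 := byte(i >> 8)); assuming the low byte fills the third octet so consecutive /24s are carved out of the cluster CIDR (and ignoring that the real function returns a util.StringSet), a standalone sketch of the scheme:

package main

import (
	"fmt"
	"net"
)

// exampleCIDRs hands out the first num /24 subnets inside clusterCIDR.
func exampleCIDRs(clusterCIDR *net.IPNet, num int) []string {
	base := clusterCIDR.IP.To4()
	out := make([]string, 0, num)
	for i := 0; i < num; i++ {
		b1 := byte(i >> 8)   // overflows into the second octet after 256 nodes
		b2 := byte(i & 0xff) // picks the node's /24 within that octet
		out = append(out, fmt.Sprintf("%d.%d.%d.0/24", base[0], base[1]+b1, b2))
	}
	return out
}

func main() {
	_, cidr, _ := net.ParseCIDR("10.244.0.0/14")
	fmt.Println(exampleCIDRs(cidr, 3)) // [10.244.0.0/24 10.244.1.0/24 10.244.2.0/24]
}
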
// For each node from newNodes, finds its current spec in registeredNodes.
// If it is not there, it gets a new valid CIDR assigned.
func (nc *NodeController) reconcilePodCIDRs(newNodes, registeredNodes *api.NodeList) *api.NodeList {
registeredCIDRs := make(map[string]string)
availableCIDRs := nc.generateCIDRs(len(newNodes.Items) + len(registeredNodes.Items))
for _, node := range registeredNodes.Items {
registeredCIDRs[node.Name] = node.Spec.PodCIDR
availableCIDRs.Delete(node.Spec.PodCIDR)
}
for i, node := range newNodes.Items {
podCIDR, registered := registeredCIDRs[node.Name]
if !registered {
podCIDR, _ = availableCIDRs.PopAny()
// reconcilePodCIDRs looks at each node and assigns it a valid CIDR
// if it doesn't currently have one.
func (nc *NodeController) reconcilePodCIDRs(nodes *api.NodeList) {
glog.V(4).Infof("Reconciling pods cidrs for %d nodes", len(nodes.Items))
// TODO(roberthbailey): This seems inefficient. Why re-calculate CIDRs
// on each sync period?
availableCIDRs := generateCIDRs(nc.clusterCIDR, len(nodes.Items))
for _, node := range nodes.Items {
if node.Spec.PodCIDR != "" {
glog.V(4).Infof("CIDR %s is already being used by node %s", node.Spec.PodCIDR, node.Name)
availableCIDRs.Delete(node.Spec.PodCIDR)
}
}
for _, node := range nodes.Items {
if node.Spec.PodCIDR == "" {
podCIDR, found := availableCIDRs.PopAny()
if !found {
glog.Errorf("No available CIDR for node %s", node.Name)
continue
}
glog.V(4).Infof("Assigning node %s CIDR %s", node.Name, podCIDR)
node.Spec.PodCIDR = podCIDR
if err := nc.configureNodeCIDR(&node); err != nil {
glog.Errorf("Error configuring node %s: %s", node.Name, err)
// The newly assigned CIDR was not properly configured, so don't save it in the API server.
continue
}
if _, err := nc.kubeClient.Nodes().Update(&node); err != nil {
glog.Errorf("Unable to assign node %s CIDR %s: %v", node.Name, podCIDR, err)
}
}
newNodes.Items[i].Spec.PodCIDR = podCIDR
}
return newNodes
}
func (nc *NodeController) configureNodeCIDR(node *api.Node) {
func (nc *NodeController) configureNodeCIDR(node *api.Node) error {
instances, ok := nc.cloud.Instances()
if !ok {
glog.Errorf("Error configuring node %s: CloudProvider does not support Instances()", node.Name)
return
}
err := instances.Configure(node.Name, &node.Spec)
if err != nil {
glog.Errorf("Error configuring node %s: %s", node.Name, err)
// The newly assigned CIDR was not properly configured, so don't save it in the API server.
node.Spec.PodCIDR = ""
return fmt.Errorf("error configuring node %s: CloudProvider does not support Instances()", node.Name)
}
return instances.Configure(node.Name, &node.Spec)
}
func (nc *NodeController) unassignNodeCIDR(nodeName string) {
@ -195,59 +196,14 @@ func (nc *NodeController) unassignNodeCIDR(nodeName string) {
glog.Errorf("Error deconfiguring node %s: CloudProvider does not support Instances()", nodeName)
return
}
err := instances.Release(nodeName)
if err != nil {
if err := instances.Release(nodeName); err != nil {
glog.Errorf("Error deconfiguring node %s: %s", nodeName, err)
}
}
// Run creates initial node list and start syncing instances from cloudprovider, if any.
// It also starts syncing or monitoring cluster node status.
// 1. registerNodes() is called only once to register all initial nodes (from cloudprovider
// or from command line flag). To make cluster bootstrap faster, node controller populates
// node addresses.
// 2. syncCloudNodes() is called periodically (if enabled) to sync instances from cloudprovider.
// Node created here will only have specs.
// 3. monitorNodeStatus() is called periodically to incorporate the results of node status
// pushed from kubelet to master.
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run(period time.Duration, syncNodeList bool) {
// Register initial set of nodes with their status set.
var nodes *api.NodeList
var err error
if nc.isRunningCloudProvider() {
if syncNodeList {
if nodes, err = nc.getCloudNodesWithSpec(); err != nil {
glog.Errorf("Error loading initial node from cloudprovider: %v", err)
}
} else {
nodes = &api.NodeList{}
}
} else {
if nodes, err = nc.getStaticNodesWithSpec(); err != nil {
glog.Errorf("Error loading initial static nodes: %v", err)
}
}
if nodes, err = nc.populateAddresses(nodes); err != nil {
glog.Errorf("Error getting nodes ips: %v", err)
}
if nc.isRunningCloudProvider() && nc.allocateNodeCIDRs {
nc.reconcilePodCIDRs(nodes, &api.NodeList{})
}
if err := nc.registerNodes(nodes, nc.registerRetryCount, period); err != nil {
glog.Errorf("Error registering node list %+v: %v", nodes, err)
}
// Start syncing node list from cloudprovider.
if syncNodeList && nc.isRunningCloudProvider() {
go util.Forever(func() {
if err := nc.syncCloudNodes(); err != nil {
glog.Errorf("Error syncing cloud: %v", err)
}
}, period)
}
// Start monitoring node status.
// Incorporate the results of node status pushed from kubelet to master.
go util.Forever(func() {
if err := nc.monitorNodeStatus(); err != nil {
glog.Errorf("Error monitoring node status: %v", err)
@ -255,165 +211,6 @@ func (nc *NodeController) Run(period time.Duration, syncNodeList bool) {
}, nc.nodeMonitorPeriod)
}
// registerNodes registers the given list of nodes, it keeps retrying for `retryCount` times.
func (nc *NodeController) registerNodes(nodes *api.NodeList, retryCount int, retryInterval time.Duration) error {
if len(nodes.Items) == 0 {
return nil
}
nodes = nc.canonicalizeName(nodes)
toRegister := util.NewStringSet()
var wg sync.WaitGroup
var successfullyRegistered int32 = 0
for i := range nodes.Items {
node := &nodes.Items[i]
if !toRegister.Has(node.Name) {
wg.Add(1)
toRegister.Insert(node.Name)
go func(n *api.Node) {
defer wg.Done()
for i := 0; i < retryCount; i++ {
if nc.isRunningCloudProvider() && nc.allocateNodeCIDRs {
nc.configureNodeCIDR(n)
}
_, err := nc.kubeClient.Nodes().Create(n)
if err == nil || apierrors.IsAlreadyExists(err) {
glog.Infof("Registered node in registry: %v", n.Name)
atomic.AddInt32(&successfullyRegistered, 1)
return
} else {
glog.Errorf("Error registering node %v (retries left: %v): %v", n.Name, retryCount-i-1, err)
}
time.Sleep(retryInterval)
}
glog.Errorf("Unable to register node %v", n.Name)
}(node)
}
}
wg.Wait()
if int32(toRegister.Len()) != atomic.LoadInt32(&successfullyRegistered) {
return ErrRegistration
} else {
return nil
}
}
// syncCloudNodes synchronizes the list of instances from cloudprovider to master server.
func (nc *NodeController) syncCloudNodes() error {
matches, err := nc.getCloudNodesWithSpec()
if err != nil {
return err
}
nodes, err := nc.kubeClient.Nodes().List(labels.Everything(), fields.Everything())
if err != nil {
return err
}
nodeMap := make(map[string]*api.Node)
nodeMapLock := sync.Mutex{}
for i := range nodes.Items {
node := nodes.Items[i]
nodeMapLock.Lock()
nodeMap[node.Name] = &node
nodeMapLock.Unlock()
}
if nc.allocateNodeCIDRs {
nc.reconcilePodCIDRs(matches, nodes)
}
var wg sync.WaitGroup
wg.Add(len(matches.Items))
// Create nodes which have been created in cloud, but not in kubernetes cluster
// Skip nodes if we hit an error while trying to get their addresses.
for i := range matches.Items {
go func(node *api.Node) {
defer wg.Done()
nodeMapLock.Lock()
_, ok := nodeMap[node.Name]
nodeMapLock.Unlock()
if !ok {
glog.V(3).Infof("Querying addresses for new node: %s", node.Name)
nodeList := &api.NodeList{}
nodeList.Items = []api.Node{*node}
_, err = nc.populateAddresses(nodeList)
if err != nil {
glog.Errorf("Error fetching addresses for new node %s: %v", node.Name, err)
return
}
node.Status.Addresses = nodeList.Items[0].Status.Addresses
if nc.allocateNodeCIDRs {
nc.configureNodeCIDR(node)
}
glog.Infof("Create node in registry: %s", node.Name)
_, err = nc.kubeClient.Nodes().Create(node)
if err != nil {
glog.Errorf("Create node %s error: %v", node.Name, err)
}
}
nodeMapLock.Lock()
delete(nodeMap, node.Name)
nodeMapLock.Unlock()
}(&matches.Items[i])
}
wg.Wait()
wg.Add(len(nodeMap))
// Delete nodes which have been deleted from cloud, but not from kubernetes cluster.
for nodeID := range nodeMap {
go func(nodeID string) {
defer wg.Done()
if nc.allocateNodeCIDRs {
nc.unassignNodeCIDR(nodeID)
}
glog.Infof("Delete node from registry: %s", nodeID)
err = nc.kubeClient.Nodes().Delete(nodeID)
if err != nil {
glog.Errorf("Delete node %s error: %v", nodeID, err)
}
nc.deletePods(nodeID)
}(nodeID)
}
wg.Wait()
return nil
}
// populateAddresses queries Address for given list of nodes.
func (nc *NodeController) populateAddresses(nodes *api.NodeList) (*api.NodeList, error) {
if nc.isRunningCloudProvider() {
instances, ok := nc.cloud.Instances()
if !ok {
return nodes, ErrCloudInstance
}
for i := range nodes.Items {
node := &nodes.Items[i]
nodeAddresses, err := instances.NodeAddresses(node.Name)
if err != nil {
glog.Errorf("error getting instance addresses for %s: %v", node.Name, err)
} else {
node.Status.Addresses = nodeAddresses
}
}
} else {
for i := range nodes.Items {
node := &nodes.Items[i]
addr := net.ParseIP(node.Name)
if addr != nil {
address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addr.String()}
node.Status.Addresses = []api.NodeAddress{address}
} else {
addrs, err := nc.lookupIP(node.Name)
if err != nil {
glog.Errorf("Can't get ip address of node %s: %v", node.Name, err)
} else if len(addrs) == 0 {
glog.Errorf("No ip address for node %v", node.Name)
} else {
address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addrs[0].String()}
node.Status.Addresses = []api.NodeAddress{address}
}
}
}
}
return nodes, nil
}
func (nc *NodeController) recordNodeEvent(node *api.Node, event string) {
ref := &api.ObjectReference{
Kind: "Node",
@ -567,6 +364,9 @@ func (nc *NodeController) monitorNodeStatus() error {
if err != nil {
return err
}
if nc.allocateNodeCIDRs {
nc.reconcilePodCIDRs(nodes)
}
for i := range nodes.Items {
var gracePeriod time.Duration
var lastReadyCondition api.NodeCondition
@ -595,10 +395,12 @@ func (nc *NodeController) monitorNodeStatus() error {
if lastReadyCondition.Status == api.ConditionFalse &&
nc.now().After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
// Node stays in not ready for at least 'podEvictionTimeout' - evict all pods on the unhealthy node.
// Makes sure we are not removing pods from to many nodes in the same time.
// Makes sure we are not removing pods from too many nodes in the same time.
glog.Infof("Evicting pods: %v is later than %v + %v", nc.now(), nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
if nc.deletingPodsRateLimiter.CanAccept() {
nc.deletePods(node.Name)
if err := nc.deletePods(node.Name); err != nil {
glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err)
}
}
}
if lastReadyCondition.Status == api.ConditionUnknown &&
@ -607,7 +409,9 @@ func (nc *NodeController) monitorNodeStatus() error {
// need to subtract monitoring grace period in order to get the real 'podEvictionTimeout'.
glog.Infof("Evicting pods2: %v is later than %v + %v", nc.now(), nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
if nc.deletingPodsRateLimiter.CanAccept() {
nc.deletePods(node.Name)
if err := nc.deletePods(node.Name); err != nil {
glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err)
}
}
}
@ -621,71 +425,33 @@ func (nc *NodeController) monitorNodeStatus() error {
if readyCondition.Status == api.ConditionUnknown && lastReadyCondition.Status != api.ConditionUnknown {
nc.recordNodeEvent(node, "unknown")
}
// Check with the cloud provider to see if the node still exists. If it
// doesn't, delete the node and all pods scheduled on the node.
if readyCondition.Status != api.ConditionTrue && nc.cloud != nil {
instances, ok := nc.cloud.Instances()
if !ok {
glog.Errorf("%v", ErrCloudInstance)
continue
}
if _, err := instances.ExternalID(node.Name); err != nil && err == cloudprovider.InstanceNotFound {
if nc.allocateNodeCIDRs {
nc.unassignNodeCIDR(node.Name)
}
if err := nc.kubeClient.Nodes().Delete(node.Name); err != nil {
glog.Errorf("Unable to delete node %s: %v", node.Name, err)
continue
}
if err := nc.deletePods(node.Name); err != nil {
glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err)
}
}
}
}
}
return nil
}
// getStaticNodesWithSpec constructs and returns api.NodeList for static nodes. If error
// occurs, an empty NodeList will be returned with a non-nil error info. The method only
// constructs spec fields for nodes.
func (nc *NodeController) getStaticNodesWithSpec() (*api.NodeList, error) {
result := &api.NodeList{}
for _, nodeID := range nc.nodes {
node := api.Node{
ObjectMeta: api.ObjectMeta{Name: nodeID},
Spec: api.NodeSpec{
ExternalID: nodeID,
},
Status: api.NodeStatus{
Capacity: nc.staticResources.Capacity,
},
}
result.Items = append(result.Items, node)
}
return result, nil
}
// getCloudNodesWithSpec constructs and returns api.NodeList from cloudprovider. If error
// occurs, an empty NodeList will be returned with a non-nil error info. The method only
// constructs spec fields for nodes.
func (nc *NodeController) getCloudNodesWithSpec() (*api.NodeList, error) {
result := &api.NodeList{}
instances, ok := nc.cloud.Instances()
if !ok {
return result, ErrCloudInstance
}
matches, err := instances.List(nc.matchRE)
if err != nil {
return result, err
}
for i := range matches {
node := api.Node{}
node.Name = matches[i]
resources, err := instances.GetNodeResources(matches[i])
if err != nil {
return nil, err
}
if resources == nil {
resources = nc.staticResources
}
if resources != nil {
node.Status.Capacity = resources.Capacity
if node.Status.Capacity != nil {
node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity(0, resource.DecimalSI)
}
}
instanceID, err := instances.ExternalID(node.Name)
if err != nil {
glog.Errorf("Error getting instance id for %s: %v", node.Name, err)
} else {
node.Spec.ExternalID = instanceID
}
result.Items = append(result.Items, node)
}
return result, nil
}
// deletePods will delete all pods from master running on given node.
func (nc *NodeController) deletePods(nodeID string) error {
glog.V(2).Infof("Delete all pods from %v", nodeID)
@ -708,19 +474,6 @@ func (nc *NodeController) deletePods(nodeID string) error {
return nil
}
// isRunningCloudProvider checks if cluster is running with cloud provider.
func (nc *NodeController) isRunningCloudProvider() bool {
return nc.cloud != nil && len(nc.matchRE) > 0
}
// canonicalizeName takes a node list and lowercases all nodes' name.
func (nc *NodeController) canonicalizeName(nodes *api.NodeList) *api.NodeList {
for i := range nodes.Items {
nodes.Items[i].Name = strings.ToLower(nodes.Items[i].Name)
}
return nodes
}
// getCondition returns a condition object for the specific condition
// type, nil if the condition is not set.
func (nc *NodeController) getCondition(status *api.NodeStatus, conditionType api.NodeConditionType) *api.NodeCondition {

View File

@ -19,7 +19,6 @@ package nodecontroller
import (
"errors"
"fmt"
"reflect"
"sort"
"sync"
"testing"
@ -30,7 +29,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient"
fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -142,506 +140,6 @@ func (m *FakeNodeHandler) Watch(label labels.Selector, field fields.Selector, re
return nil, nil
}
func TestRegisterNodes(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
machines []string
retryCount int
expectedRequestCount int
expectedCreateCount int
expectedFail bool
}{
{
// Register two nodes normally.
machines: []string{"node0", "node1"},
fakeNodeHandler: &FakeNodeHandler{
CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool { return true },
},
retryCount: 1,
expectedRequestCount: 2,
expectedCreateCount: 2,
expectedFail: false,
},
{
// Canonicalize node names.
machines: []string{"NODE0", "node1"},
fakeNodeHandler: &FakeNodeHandler{
CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool {
if node.Name == "NODE0" {
return false
}
return true
},
},
retryCount: 1,
expectedRequestCount: 2,
expectedCreateCount: 2,
expectedFail: false,
},
{
// No machine to register.
machines: []string{},
fakeNodeHandler: &FakeNodeHandler{
CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool { return true },
},
retryCount: 1,
expectedRequestCount: 0,
expectedCreateCount: 0,
expectedFail: false,
},
{
// Fail the first two requests.
machines: []string{"node0", "node1"},
fakeNodeHandler: &FakeNodeHandler{
CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool {
if fake.RequestCount == 0 || fake.RequestCount == 1 {
return false
}
return true
},
},
retryCount: 10,
expectedRequestCount: 4,
expectedCreateCount: 2,
expectedFail: false,
},
{
// One node already exists
machines: []string{"node0", "node1"},
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node1",
},
},
},
},
retryCount: 10,
expectedRequestCount: 2,
expectedCreateCount: 1,
expectedFail: false,
},
{
// The first node always fails.
machines: []string{"node0", "node1"},
fakeNodeHandler: &FakeNodeHandler{
CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool {
if node.Name == "node0" {
return false
}
return true
},
},
retryCount: 2,
expectedRequestCount: 3, // 2 for node0, 1 for node1
expectedCreateCount: 1,
expectedFail: true,
},
}
for _, item := range table {
nodes := api.NodeList{}
for _, machine := range item.machines {
nodes.Items = append(nodes.Items, *newNode(machine))
}
nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute,
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
err := nodeController.registerNodes(&nodes, item.retryCount, time.Millisecond)
if !item.expectedFail && err != nil {
t.Errorf("unexpected error: %v", err)
}
if item.expectedFail && err == nil {
t.Errorf("unexpected non-error")
}
if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
t.Errorf("expected %v calls, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
}
if len(item.fakeNodeHandler.CreatedNodes) != item.expectedCreateCount {
t.Errorf("expected %v nodes, but got %v.", item.expectedCreateCount, item.fakeNodeHandler.CreatedNodes)
}
}
}
func TestCreateGetStaticNodesWithSpec(t *testing.T) {
table := []struct {
machines []string
expectedNodes *api.NodeList
}{
{
machines: []string{},
expectedNodes: &api.NodeList{},
},
{
machines: []string{"node0"},
expectedNodes: &api.NodeList{
Items: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "node0"},
Spec: api.NodeSpec{
ExternalID: "node0",
},
Status: api.NodeStatus{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceCPU): resource.MustParse("10"),
api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
},
},
},
},
},
},
{
machines: []string{"node0", "node1"},
expectedNodes: &api.NodeList{
Items: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "node0"},
Spec: api.NodeSpec{
ExternalID: "node0",
},
Status: api.NodeStatus{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceCPU): resource.MustParse("10"),
api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
},
},
},
{
ObjectMeta: api.ObjectMeta{Name: "node1"},
Spec: api.NodeSpec{
ExternalID: "node1",
},
Status: api.NodeStatus{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceCPU): resource.MustParse("10"),
api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
},
},
},
},
},
},
}
resources := api.NodeResources{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceCPU): resource.MustParse("10"),
api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
},
}
for _, item := range table {
nodeController := NewNodeController(nil, "", item.machines, &resources, nil, 10, time.Minute,
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodes, err := nodeController.getStaticNodesWithSpec()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(item.expectedNodes, nodes) {
t.Errorf("expected node list %+v, got %+v", item.expectedNodes, nodes)
}
}
}
func TestCreateGetCloudNodesWithSpec(t *testing.T) {
resourceList := api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(1000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(3000, resource.DecimalSI),
}
table := []struct {
fakeCloud *fake_cloud.FakeCloud
machines []string
expectedNodes *api.NodeList
}{
{
fakeCloud: &fake_cloud.FakeCloud{},
expectedNodes: &api.NodeList{},
},
{
fakeCloud: &fake_cloud.FakeCloud{
Machines: []string{"node0"},
NodeResources: &api.NodeResources{Capacity: resourceList},
},
expectedNodes: &api.NodeList{
Items: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "node0"},
Status: api.NodeStatus{Capacity: resourceList},
},
},
},
},
{
fakeCloud: &fake_cloud.FakeCloud{
Machines: []string{"node0", "node1"},
NodeResources: &api.NodeResources{Capacity: resourceList},
},
expectedNodes: &api.NodeList{
Items: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "node0"},
Status: api.NodeStatus{Capacity: resourceList},
},
{
ObjectMeta: api.ObjectMeta{Name: "node1"},
Status: api.NodeStatus{Capacity: resourceList},
},
},
},
},
}
for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, 10, time.Minute,
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodes, err := nodeController.getCloudNodesWithSpec()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(item.expectedNodes, nodes) {
t.Errorf("expected node list %+v, got %+v", item.expectedNodes, nodes)
}
}
}
func TestSyncCloudNodes(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
fakeCloud *fake_cloud.FakeCloud
matchRE string
expectedRequestCount int
expectedNameCreated []string
expectedExtIDCreated []string
expectedAddrsCreated []string
expectedDeleted []string
}{
{
// 1 existing node, 1 cloud nodes: do nothing.
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{newNode("node0")},
},
fakeCloud: &fake_cloud.FakeCloud{
Machines: []string{"node0"},
ExtID: map[string]string{
"node0": "ext-node0",
"node1": "ext-node1",
},
Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}},
},
matchRE: ".*",
expectedRequestCount: 1, // List
expectedNameCreated: []string{},
expectedExtIDCreated: []string{},
expectedAddrsCreated: []string{},
expectedDeleted: []string{},
},
{
// 1 existing node, 2 cloud nodes: create 1.
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{newNode("node0")},
},
fakeCloud: &fake_cloud.FakeCloud{
Machines: []string{"node0", "node1"},
ExtID: map[string]string{
"node0": "ext-node0",
"node1": "ext-node1",
},
Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}},
},
matchRE: ".*",
expectedRequestCount: 2, // List + Create
expectedNameCreated: []string{"node1"},
expectedExtIDCreated: []string{"ext-node1"},
expectedAddrsCreated: []string{"1.2.3.4"},
expectedDeleted: []string{},
},
{
// 2 existing nodes, 1 cloud node: delete 1.
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{newNode("node0"), newNode("node1")},
},
fakeCloud: &fake_cloud.FakeCloud{
Machines: []string{"node0"},
ExtID: map[string]string{
"node0": "ext-node0",
"node1": "ext-node1",
},
Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}},
},
matchRE: ".*",
expectedRequestCount: 2, // List + Delete
expectedNameCreated: []string{},
expectedExtIDCreated: []string{},
expectedAddrsCreated: []string{},
expectedDeleted: []string{"node1"},
},
{
// 1 existing node, 3 cloud nodes but only 2 match regex: delete 1.
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{newNode("node0")},
},
fakeCloud: &fake_cloud.FakeCloud{
Machines: []string{"node0", "node1", "fake"},
ExtID: map[string]string{
"node0": "ext-node0",
"node1": "ext-node1",
"fake": "ext-fake",
},
Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}},
},
matchRE: "node[0-9]+",
expectedRequestCount: 2, // List + Create
expectedNameCreated: []string{"node1"},
expectedExtIDCreated: []string{"ext-node1"},
expectedAddrsCreated: []string{"1.2.3.4"},
expectedDeleted: []string{},
},
}
for _, item := range table {
if item.fakeNodeHandler.Fake == nil {
item.fakeNodeHandler.Fake = testclient.NewSimpleFake()
}
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute,
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
if err := nodeController.syncCloudNodes(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
}
nodes := sortedNodeNames(item.fakeNodeHandler.CreatedNodes)
if !reflect.DeepEqual(item.expectedNameCreated, nodes) {
t.Errorf("expected node list %+v, got %+v", item.expectedNameCreated, nodes)
}
nodeExtIDs := sortedNodeExternalIDs(item.fakeNodeHandler.CreatedNodes)
if !reflect.DeepEqual(item.expectedExtIDCreated, nodeExtIDs) {
t.Errorf("expected node external id list %+v, got %+v", item.expectedExtIDCreated, nodeExtIDs)
}
nodeAddrs := sortedNodeAddresses(item.fakeNodeHandler.CreatedNodes)
if !reflect.DeepEqual(item.expectedAddrsCreated, nodeAddrs) {
t.Errorf("expected node address list %+v, got %+v", item.expectedAddrsCreated, nodeAddrs)
}
nodes = sortedNodeNames(item.fakeNodeHandler.DeletedNodes)
if !reflect.DeepEqual(item.expectedDeleted, nodes) {
t.Errorf("expected node list %+v, got %+v", item.expectedDeleted, nodes)
}
}
}
func TestSyncCloudNodesEvictPods(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
fakeCloud *fake_cloud.FakeCloud
matchRE string
expectedRequestCount int
expectedDeleted []string
expectedActions []testclient.FakeAction
}{
{
// No node to delete: do nothing.
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{newNode("node0"), newNode("node1")},
Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
},
fakeCloud: &fake_cloud.FakeCloud{
Machines: []string{"node0", "node1"},
},
matchRE: ".*",
expectedRequestCount: 1, // List
expectedDeleted: []string{},
expectedActions: nil,
},
{
// Delete node1, and pod0 is running on it.
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{newNode("node0"), newNode("node1")},
Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node1")}}),
},
fakeCloud: &fake_cloud.FakeCloud{
Machines: []string{"node0"},
},
matchRE: ".*",
expectedRequestCount: 2, // List + Delete
expectedDeleted: []string{"node1"},
expectedActions: []testclient.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}},
},
{
// Delete node1, but pod0 is running on node0.
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{newNode("node0"), newNode("node1")},
Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}),
},
fakeCloud: &fake_cloud.FakeCloud{
Machines: []string{"node0"},
},
matchRE: ".*",
expectedRequestCount: 2, // List + Delete
expectedDeleted: []string{"node1"},
expectedActions: []testclient.FakeAction{{Action: "list-pods"}},
},
}
for _, item := range table {
if item.fakeNodeHandler.Fake == nil {
item.fakeNodeHandler.Fake = testclient.NewSimpleFake()
}
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute,
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
if err := nodeController.syncCloudNodes(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
}
nodes := sortedNodeNames(item.fakeNodeHandler.DeletedNodes)
if !reflect.DeepEqual(item.expectedDeleted, nodes) {
t.Errorf("expected node list %+v, got %+v", item.expectedDeleted, nodes)
}
if !reflect.DeepEqual(item.expectedActions, item.fakeNodeHandler.Actions) {
t.Errorf("time out waiting for deleting pods, expected %+v, got %+v", item.expectedActions, item.fakeNodeHandler.Actions)
}
}
}
func TestPopulateNodeAddresses(t *testing.T) {
table := []struct {
nodes *api.NodeList
fakeCloud *fake_cloud.FakeCloud
expectedFail bool
expectedAddresses []api.NodeAddress
}{
{
nodes: &api.NodeList{Items: []api.Node{*newNode("node0"), *newNode("node1")}},
fakeCloud: &fake_cloud.FakeCloud{Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}}},
expectedAddresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"},
},
},
{
nodes: &api.NodeList{Items: []api.Node{*newNode("node0"), *newNode("node1")}},
fakeCloud: &fake_cloud.FakeCloud{Err: ErrQueryIPAddress},
expectedAddresses: nil,
},
}
for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, 10, time.Minute,
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
result, err := nodeController.populateAddresses(item.nodes)
// In case of IP querying error, we should continue.
if err != nil {
t.Errorf("unexpected error: %v", err)
}
for _, node := range result.Items {
if !reflect.DeepEqual(item.expectedAddresses, node.Status.Addresses) {
t.Errorf("expect HostIP %s, got %s", item.expectedAddresses, node.Status.Addresses)
}
}
}
}
func TestMonitorNodeStatusEvictPods(t *testing.T) {
fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
evictionTimeout := 10 * time.Minute
@ -826,7 +324,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
}
for _, item := range table {
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, 10,
nodeController := NewNodeController(nil, "", nil, item.fakeNodeHandler, 10,
evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() util.Time { return fakeNow }
@ -1029,7 +527,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
}
for _, item := range table {
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, 10, 5*time.Minute, util.NewFakeRateLimiter(),
nodeController := NewNodeController(nil, "", nil, item.fakeNodeHandler, 10, 5*time.Minute, util.NewFakeRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {

View File

@ -31,6 +31,7 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
@ -66,11 +67,6 @@ const (
// Max amount of time to wait for the container runtime to come up.
maxWaitForContainerRuntime = 5 * time.Minute
// Initial node status update frequency and incremental frequency, for faster cluster startup.
// The update frequency will be incremented linearly, until it reaches status_update_frequency.
initialNodeStatusUpdateFrequency = 100 * time.Millisecond
nodeStatusUpdateFrequencyInc = 500 * time.Millisecond
// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
nodeStatusUpdateRetry = 5
@ -122,6 +118,7 @@ func NewMainKubelet(
pullBurst int,
containerGCPolicy ContainerGCPolicy,
sourcesReady SourcesReadyFn,
registerNode bool,
clusterDomain string,
clusterDNS net.IP,
masterServiceNamespace string,
@ -224,6 +221,7 @@ func NewMainKubelet(
readinessManager: readinessManager,
httpClient: &http.Client{},
sourcesReady: sourcesReady,
registerNode: registerNode,
clusterDomain: clusterDomain,
clusterDNS: clusterDNS,
serviceLister: serviceLister,
@ -373,6 +371,9 @@ type Kubelet struct {
// cAdvisor used for container information.
cadvisor cadvisor.Interface
// Set to true to have the node register itself with the apiserver.
registerNode bool
// If non-empty, use this for container DNS search.
clusterDomain string
@ -657,26 +658,22 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
glog.Infof("Running in container %q", kl.resourceContainer)
}
err := kl.imageManager.Start()
if err != nil {
if err := kl.imageManager.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start ImageManager %v", err)
glog.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
}
err = kl.cadvisor.Start()
if err != nil {
if err := kl.cadvisor.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start CAdvisor %v", err)
glog.Errorf("Failed to start CAdvisor, system may not be properly monitored: %v", err)
}
err = kl.containerManager.Start()
if err != nil {
if err := kl.containerManager.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start ContainerManager %v", err)
glog.Errorf("Failed to start ContainerManager, system may not be properly isolated: %v", err)
}
err = kl.oomWatcher.Start(kl.nodeRef)
if err != nil {
if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start OOM watcher %v", err)
glog.Errorf("Failed to start OOM watching: %v", err)
}
@ -688,20 +685,83 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
kl.syncLoop(updates, kl)
}
func (kl *Kubelet) initialNodeStatus() (*api.Node, error) {
node := &api.Node{
ObjectMeta: api.ObjectMeta{
Name: kl.hostname,
Labels: map[string]string{"kubernetes.io/hostname": kl.hostname},
},
}
if kl.cloud != nil {
instances, ok := kl.cloud.Instances()
if !ok {
return nil, fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
instanceID, err := instances.ExternalID(kl.hostname)
if err != nil {
return nil, fmt.Errorf("failed to get instance ID from cloud provider: %v", err)
}
node.Spec.ExternalID = instanceID
} else {
node.Spec.ExternalID = kl.hostname
}
if err := kl.setNodeStatus(node); err != nil {
return nil, err
}
return node, nil
}
// registerWithApiserver registers the node with the cluster master.
func (kl *Kubelet) registerWithApiserver() {
step := 100 * time.Millisecond
for {
time.Sleep(step)
step = step * 2
if step >= 7*time.Second {
step = 7 * time.Second
}
node, err := kl.initialNodeStatus()
if err != nil {
glog.Errorf("Unable to construct api.Node object for kubelet: %v", err)
continue
}
glog.V(2).Infof("Attempting to register node %s", node.Name)
if _, err := kl.kubeClient.Nodes().Create(node); err != nil {
if apierrors.IsAlreadyExists(err) {
currentNode, err := kl.kubeClient.Nodes().Get(kl.hostname)
if err != nil {
glog.Errorf("error getting node %q: %v", kl.hostname, err)
continue
}
if currentNode == nil {
glog.Errorf("no node instance returned for %q", kl.hostname)
continue
}
if currentNode.Spec.ExternalID == node.Spec.ExternalID {
glog.Infof("Node %s was previously registered", node.Name)
return
}
}
glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err)
continue
}
glog.Infof("Successfully registered node %s", node.Name)
return
}
}
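
registerWithApiserver retries forever, doubling its delay from 100ms up to a 7 second cap between attempts. A tiny standalone sketch of that schedule (start value and cap are taken from the loop above; the loop bound here is only for printing):

package main

import (
	"fmt"
	"time"
)

func main() {
	step := 100 * time.Millisecond
	for i := 0; i < 10; i++ {
		fmt.Println(step) // 100ms, 200ms, 400ms, ... then pinned at 7s
		step *= 2
		if step >= 7*time.Second {
			step = 7 * time.Second
		}
	}
}
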
// syncNodeStatus periodically synchronizes node status to master.
func (kl *Kubelet) syncNodeStatus() {
if kl.kubeClient == nil {
return
}
glog.Infof("Starting node status updates")
for feq := initialNodeStatusUpdateFrequency; feq < kl.nodeStatusUpdateFrequency; feq += nodeStatusUpdateFrequencyInc {
select {
case <-time.After(feq):
if err := kl.updateNodeStatus(); err != nil {
glog.Errorf("Unable to update node status: %v", err)
}
}
if kl.registerNode {
kl.registerWithApiserver()
}
glog.Infof("Starting node status updates")
for {
select {
case <-time.After(kl.nodeStatusUpdateFrequency):
@ -1707,14 +1767,13 @@ func (kl *Kubelet) reconcileCBR0(podCIDR string) error {
// updateNodeStatus updates node status to master with retries.
func (kl *Kubelet) updateNodeStatus() error {
for i := 0; i < nodeStatusUpdateRetry; i++ {
err := kl.tryUpdateNodeStatus()
if err != nil {
glog.Errorf("error updating node status, will retry: %v", err)
if err := kl.tryUpdateNodeStatus(); err != nil {
glog.Errorf("Error updating node status, will retry: %v", err)
} else {
return nil
}
}
return fmt.Errorf("Update node status exceeds retry count")
return fmt.Errorf("update node status exceeds retry count")
}
func (kl *Kubelet) recordNodeOnlineEvent() {
@ -1738,15 +1797,36 @@ func (kl *Kubelet) recordNodeUnschedulableEvent() {
// Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
var oldNodeUnschedulable bool
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
// is set, this function will also confirm that cbr0 is configured correctly.
func (kl *Kubelet) tryUpdateNodeStatus() error {
node, err := kl.kubeClient.Nodes().Get(kl.hostname)
if err != nil {
return fmt.Errorf("error getting node %q: %v", kl.hostname, err)
}
if node == nil {
return fmt.Errorf("no node instance returned for %q", kl.hostname)
// setNodeStatus fills in the Status fields of the given Node, overwriting
// any fields that are currently set.
func (kl *Kubelet) setNodeStatus(node *api.Node) error {
// Set addresses for the node.
if kl.cloud != nil {
instances, ok := kl.cloud.Instances()
if !ok {
return fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
nodeAddresses, err := instances.NodeAddresses(kl.hostname)
if err != nil {
return fmt.Errorf("failed to get node address from cloud provider: %v", err)
}
node.Status.Addresses = nodeAddresses
} else {
addr := net.ParseIP(kl.hostname)
if addr != nil {
node.Status.Addresses = []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: addr.String()}}
} else {
addrs, err := net.LookupIP(node.Name)
if err != nil {
return fmt.Errorf("can't get ip address of node %s: %v", node.Name, err)
} else if len(addrs) == 0 {
return fmt.Errorf("no ip address for node %v", node.Name)
} else {
node.Status.Addresses = []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: addrs[0].String()}}
}
}
}
networkConfigured := true
@ -1761,7 +1841,13 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := kl.GetCachedMachineInfo()
if err != nil {
glog.Errorf("error getting machine info: %v", err)
// TODO(roberthbailey): This is required for test-cmd.sh to pass.
// See if the test should be updated instead.
node.Status.Capacity = api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI),
api.ResourceMemory: resource.MustParse("0Gi"),
}
glog.Errorf("Error getting machine info: %v", err)
} else {
node.Status.NodeInfo.MachineID = info.MachineID
node.Status.NodeInfo.SystemUUID = info.SystemUUID
@ -1780,7 +1866,7 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
verinfo, err := kl.cadvisor.VersionInfo()
if err != nil {
glog.Errorf("error getting version info: %v", err)
glog.Errorf("Error getting version info: %v", err)
} else {
node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion
node.Status.NodeInfo.OsImage = verinfo.ContainerOsVersion
@ -1848,7 +1934,22 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
}
oldNodeUnschedulable = node.Spec.Unschedulable
}
return nil
}
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
// is set, this function will also confirm that cbr0 is configured correctly.
func (kl *Kubelet) tryUpdateNodeStatus() error {
node, err := kl.kubeClient.Nodes().Get(kl.hostname)
if err != nil {
return fmt.Errorf("error getting node %q: %v", kl.hostname, err)
}
if node == nil {
return fmt.Errorf("no node instance returned for %q", kl.hostname)
}
if err := kl.setNodeStatus(node); err != nil {
return err
}
// Update the current status on the API server
_, err = kl.kubeClient.Nodes().UpdateStatus(node)
return err

View File

@ -34,6 +34,7 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
@ -44,6 +45,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version"
@ -3239,7 +3241,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactFn = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: "testnode"}},
{ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"}},
}}).ReactFn
machineInfo := &cadvisorApi.MachineInfo{
MachineID: "123",
@ -3257,7 +3259,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: "testnode"},
ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
@ -3284,6 +3286,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}},
},
}
@ -3317,7 +3320,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactFn = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "testnode"},
ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
@ -3353,7 +3356,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: "testnode"},
ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
@ -3380,6 +3383,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}},
},
}
@ -3419,7 +3423,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
fakeDocker.VersionInfo = []string{}
kubeClient.ReactFn = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: "testnode"}},
{ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"}},
}}).ReactFn
mockCadvisor := testKubelet.fakeCadvisor
machineInfo := &cadvisorApi.MachineInfo{
@ -3438,7 +3442,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: "testnode"},
ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
@ -3465,6 +3469,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}},
},
}
@ -4402,6 +4407,62 @@ func TestFilterOutTerminatedPods(t *testing.T) {
}
}
func TestRegisterExistingNodeWithApiserver(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
kubelet.hostname = "127.0.0.1"
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactFn = func(action testclient.FakeAction) (runtime.Object, error) {
segments := strings.Split(action.Action, "-")
if len(segments) < 2 {
return nil, fmt.Errorf("unrecognized action, need two or three segments <verb>-<resource> or <verb>-<subresource>-<resource>: %s", action.Action)
}
verb := segments[0]
switch verb {
case "create":
// Return an error on create.
return &api.Node{}, &apierrors.StatusError{
ErrStatus: api.Status{Reason: api.StatusReasonAlreadyExists},
}
case "get":
// Return an existing (matching) node on get.
return &api.Node{
ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"},
Spec: api.NodeSpec{ExternalID: "127.0.0.1"},
}, nil
default:
return nil, fmt.Errorf("no reaction implemented for %s", action.Action)
}
}
machineInfo := &cadvisorApi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorApi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
done := make(chan struct{})
go func() {
kubelet.registerWithApiserver()
done <- struct{}{}
}()
select {
case <-time.After(5 * time.Second):
t.Errorf("timed out waiting for registration")
case <-done:
return
}
}
func TestMakePortMappings(t *testing.T) {
tests := []struct {
container *api.Container