From d7b9f8905f58a3440d6c9a1725a81b2ca5178d31 Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Mon, 21 Sep 2015 11:00:04 -0700 Subject: [PATCH] Revert "Move node information retreival to nodeManager" This reverts commit 2369cd64011ff4f16b115d34d5e815c8e676020d. --- pkg/kubelet/kubelet.go | 42 ++++++++++++++++++++++++-- pkg/kubelet/node_manager.go | 60 ++++--------------------------------- 2 files changed, 44 insertions(+), 58 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 8ee8e283e7..b19322932a 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -63,6 +63,7 @@ import ( utilErrors "k8s.io/kubernetes/pkg/util/errors" kubeio "k8s.io/kubernetes/pkg/util/io" "k8s.io/kubernetes/pkg/util/mount" + nodeutil "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/procfs" "k8s.io/kubernetes/pkg/util/sets" @@ -200,6 +201,23 @@ func NewMainKubelet( } serviceLister := &cache.StoreToServiceLister{Store: serviceStore} + nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc) + if kubeClient != nil { + // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather + // than an interface. There is no way to construct a list+watcher using resource name. + fieldSelector := fields.Set{client.ObjectNameField: nodeName}.AsSelector() + listWatch := &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return kubeClient.Nodes().List(labels.Everything(), fieldSelector) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return kubeClient.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion) + }, + } + cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run() + } + nodeLister := &cache.StoreToNodeLister{Store: nodeStore} + // TODO: get the real node object of ourself, // and use the real node name and UID. // TODO: what is namespace for node? @@ -245,6 +263,7 @@ func NewMainKubelet( clusterDomain: clusterDomain, clusterDNS: clusterDNS, serviceLister: serviceLister, + nodeLister: nodeLister, runtimeMutex: sync.Mutex{}, runtimeUpThreshold: maxWaitForContainerRuntime, lastTimestampRuntimeUp: time.Time{}, @@ -387,6 +406,11 @@ type serviceLister interface { List() (api.ServiceList, error) } +type nodeLister interface { + List() (machines api.NodeList, err error) + GetNodeInfo(id string) (*api.Node, error) +} + // Kubelet is the main kubelet implementation. type Kubelet struct { hostname string @@ -429,6 +453,7 @@ type Kubelet struct { masterServiceNamespace string serviceLister serviceLister + nodeLister nodeLister // Last timestamp when runtime responded on ping. // Mutex is used to protect this value. @@ -676,6 +701,13 @@ func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) { return pods, nil } +func (kl *Kubelet) GetNode() (*api.Node, error) { + if kl.standaloneMode { + return nil, errors.New("no node entry for kubelet in standalone mode") + } + return kl.nodeLister.GetNodeInfo(kl.nodeName) +} + // Starts garbage collection threads. func (kl *Kubelet) StartGarbageCollection() { go util.Until(func() { @@ -1657,7 +1689,7 @@ func (kl *Kubelet) matchesNodeSelector(pod *api.Pod) bool { if kl.standaloneMode { return true } - node, err := kl.nodeManager.GetNode() + node, err := kl.GetNode() if err != nil { glog.Errorf("error getting node: %v", err) return true @@ -1956,7 +1988,11 @@ func (kl *Kubelet) GetHostname() string { // Returns host IP or nil in case of error. func (kl *Kubelet) GetHostIP() (net.IP, error) { - return kl.nodeManager.GetHostIP() + node, err := kl.GetNode() + if err != nil { + return nil, fmt.Errorf("cannot get node: %v", err) + } + return nodeutil.GetNodeHostIP(node) } // GetPods returns all pods bound to the kubelet and their spec, and the mirror @@ -2258,7 +2294,7 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) { } if !kl.standaloneMode { - hostIP, err := kl.nodeManager.GetHostIP() + hostIP, err := kl.GetHostIP() if err != nil { glog.V(4).Infof("Cannot get host IP: %v", err) } else { diff --git a/pkg/kubelet/node_manager.go b/pkg/kubelet/node_manager.go index 773672b179..91e1fe4ceb 100644 --- a/pkg/kubelet/node_manager.go +++ b/pkg/kubelet/node_manager.go @@ -16,8 +16,10 @@ limitations under the License. package kubelet +// Note: if you change code in this file, you might need to change code in +// contrib/mesos/pkg/executor/. + import ( - "errors" "fmt" "net" "strings" @@ -34,13 +36,8 @@ import ( "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/cloudprovider" - "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" - nodeutil "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/version" - "k8s.io/kubernetes/pkg/watch" ) const ( @@ -57,17 +54,13 @@ type infoGetter interface { type nodeManager interface { Start() - GetNode() (*api.Node, error) GetPodCIDR() string - GetHostIP() (net.IP, error) } type realNodeManager struct { // apiserver client. client client.Interface - nodeLister nodeLister - // Set to true to have the node register itself with the apiserver. registerNode bool @@ -133,17 +126,10 @@ func newRealNodeManager(client client.Interface, cloud cloudprovider.Interface, } } -type nodeLister interface { - List() (machines api.NodeList, err error) - GetNodeInfo(id string) (*api.Node, error) -} - func (nm *realNodeManager) Start() { - if nm.client == nil { - return + if nm.client != nil { + go util.Until(nm.syncNodeStatus, nm.nodeStatusUpdateFrequency, util.NeverStop) } - nm.setNodeLister() - go util.Until(nm.syncNodeStatus, nm.nodeStatusUpdateFrequency, util.NeverStop) } func (nm *realNodeManager) GetPodCIDR() string { @@ -152,42 +138,6 @@ func (nm *realNodeManager) GetPodCIDR() string { return nm.podCIDR } -func (nm *realNodeManager) GetNode() (*api.Node, error) { - if nm.client == nil { - return nil, errors.New("unable to get node entry because apiserver client is nil") - } - return nm.nodeLister.GetNodeInfo(nm.nodeName) -} - -// Returns host IP or nil in case of error. -func (nm *realNodeManager) GetHostIP() (net.IP, error) { - if nm.client == nil { - return nil, errors.New("unable to get node entry because apiserver client is nil") - } - node, err := nm.GetNode() - if err != nil { - return nil, fmt.Errorf("cannot get node: %v", err) - } - return nodeutil.GetNodeHostIP(node) -} - -func (nm *realNodeManager) setNodeLister() { - nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc) - // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather - // than an interface. There is no way to construct a list+watcher using resource name. - fieldSelector := fields.Set{client.ObjectNameField: nm.nodeName}.AsSelector() - listWatch := &cache.ListWatch{ - ListFunc: func() (runtime.Object, error) { - return nm.client.Nodes().List(labels.Everything(), fieldSelector) - }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return nm.client.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion) - }, - } - cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run() - nm.nodeLister = &cache.StoreToNodeLister{Store: nodeStore} -} - // syncNodeStatus should be called periodically from a goroutine. // It synchronizes node status to master, registering the kubelet first if // necessary.