Revert "Move node information retreival to nodeManager"

This reverts commit 2369cd6401.
pull/6/head
Yu-Ju Hong 2015-09-21 11:00:04 -07:00
parent 7bdf9bfdfa
commit d7b9f8905f
2 changed files with 44 additions and 58 deletions

View File

@ -63,6 +63,7 @@ import (
utilErrors "k8s.io/kubernetes/pkg/util/errors"
kubeio "k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/procfs"
"k8s.io/kubernetes/pkg/util/sets"
@ -200,6 +201,23 @@ func NewMainKubelet(
}
serviceLister := &cache.StoreToServiceLister{Store: serviceStore}
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
if kubeClient != nil {
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
// than an interface. There is no way to construct a list+watcher using resource name.
fieldSelector := fields.Set{client.ObjectNameField: nodeName}.AsSelector()
listWatch := &cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return kubeClient.Nodes().List(labels.Everything(), fieldSelector)
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return kubeClient.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion)
},
}
cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
}
nodeLister := &cache.StoreToNodeLister{Store: nodeStore}
// TODO: get the real node object of ourself,
// and use the real node name and UID.
// TODO: what is namespace for node?
@ -245,6 +263,7 @@ func NewMainKubelet(
clusterDomain: clusterDomain,
clusterDNS: clusterDNS,
serviceLister: serviceLister,
nodeLister: nodeLister,
runtimeMutex: sync.Mutex{},
runtimeUpThreshold: maxWaitForContainerRuntime,
lastTimestampRuntimeUp: time.Time{},
@ -387,6 +406,11 @@ type serviceLister interface {
List() (api.ServiceList, error)
}
type nodeLister interface {
List() (machines api.NodeList, err error)
GetNodeInfo(id string) (*api.Node, error)
}
// Kubelet is the main kubelet implementation.
type Kubelet struct {
hostname string
@ -429,6 +453,7 @@ type Kubelet struct {
masterServiceNamespace string
serviceLister serviceLister
nodeLister nodeLister
// Last timestamp when runtime responded on ping.
// Mutex is used to protect this value.
@ -676,6 +701,13 @@ func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
return pods, nil
}
func (kl *Kubelet) GetNode() (*api.Node, error) {
if kl.standaloneMode {
return nil, errors.New("no node entry for kubelet in standalone mode")
}
return kl.nodeLister.GetNodeInfo(kl.nodeName)
}
// Starts garbage collection threads.
func (kl *Kubelet) StartGarbageCollection() {
go util.Until(func() {
@ -1657,7 +1689,7 @@ func (kl *Kubelet) matchesNodeSelector(pod *api.Pod) bool {
if kl.standaloneMode {
return true
}
node, err := kl.nodeManager.GetNode()
node, err := kl.GetNode()
if err != nil {
glog.Errorf("error getting node: %v", err)
return true
@ -1956,7 +1988,11 @@ func (kl *Kubelet) GetHostname() string {
// Returns host IP or nil in case of error.
func (kl *Kubelet) GetHostIP() (net.IP, error) {
return kl.nodeManager.GetHostIP()
node, err := kl.GetNode()
if err != nil {
return nil, fmt.Errorf("cannot get node: %v", err)
}
return nodeutil.GetNodeHostIP(node)
}
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
@ -2258,7 +2294,7 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
}
if !kl.standaloneMode {
hostIP, err := kl.nodeManager.GetHostIP()
hostIP, err := kl.GetHostIP()
if err != nil {
glog.V(4).Infof("Cannot get host IP: %v", err)
} else {

View File

@ -16,8 +16,10 @@ limitations under the License.
package kubelet
// Note: if you change code in this file, you might need to change code in
// contrib/mesos/pkg/executor/.
import (
"errors"
"fmt"
"net"
"strings"
@ -34,13 +36,8 @@ import (
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/watch"
)
const (
@ -57,17 +54,13 @@ type infoGetter interface {
type nodeManager interface {
Start()
GetNode() (*api.Node, error)
GetPodCIDR() string
GetHostIP() (net.IP, error)
}
type realNodeManager struct {
// apiserver client.
client client.Interface
nodeLister nodeLister
// Set to true to have the node register itself with the apiserver.
registerNode bool
@ -133,17 +126,10 @@ func newRealNodeManager(client client.Interface, cloud cloudprovider.Interface,
}
}
type nodeLister interface {
List() (machines api.NodeList, err error)
GetNodeInfo(id string) (*api.Node, error)
}
func (nm *realNodeManager) Start() {
if nm.client == nil {
return
}
nm.setNodeLister()
if nm.client != nil {
go util.Until(nm.syncNodeStatus, nm.nodeStatusUpdateFrequency, util.NeverStop)
}
}
func (nm *realNodeManager) GetPodCIDR() string {
@ -152,42 +138,6 @@ func (nm *realNodeManager) GetPodCIDR() string {
return nm.podCIDR
}
func (nm *realNodeManager) GetNode() (*api.Node, error) {
if nm.client == nil {
return nil, errors.New("unable to get node entry because apiserver client is nil")
}
return nm.nodeLister.GetNodeInfo(nm.nodeName)
}
// Returns host IP or nil in case of error.
func (nm *realNodeManager) GetHostIP() (net.IP, error) {
if nm.client == nil {
return nil, errors.New("unable to get node entry because apiserver client is nil")
}
node, err := nm.GetNode()
if err != nil {
return nil, fmt.Errorf("cannot get node: %v", err)
}
return nodeutil.GetNodeHostIP(node)
}
func (nm *realNodeManager) setNodeLister() {
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
// than an interface. There is no way to construct a list+watcher using resource name.
fieldSelector := fields.Set{client.ObjectNameField: nm.nodeName}.AsSelector()
listWatch := &cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return nm.client.Nodes().List(labels.Everything(), fieldSelector)
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return nm.client.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion)
},
}
cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
nm.nodeLister = &cache.StoreToNodeLister{Store: nodeStore}
}
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master, registering the kubelet first if
// necessary.