Move node information retreival to nodeManager

nodeManager should handle most node object interaction with apiserver. This
moves exsiting node-watching and GetNodes() methods to nodeManager.
pull/6/head
Yu-Ju Hong 2015-09-16 15:44:51 -07:00
parent e7d1e47f31
commit 2369cd6401
2 changed files with 59 additions and 44 deletions

View File

@ -61,7 +61,6 @@ import (
"k8s.io/kubernetes/pkg/util/bandwidth"
utilErrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/mount"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/procfs"
"k8s.io/kubernetes/pkg/util/sets"
@ -198,23 +197,6 @@ func NewMainKubelet(
}
serviceLister := &cache.StoreToServiceLister{Store: serviceStore}
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
if kubeClient != nil {
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
// than an interface. There is no way to construct a list+watcher using resource name.
fieldSelector := fields.Set{client.ObjectNameField: nodeName}.AsSelector()
listWatch := &cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return kubeClient.Nodes().List(labels.Everything(), fieldSelector)
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return kubeClient.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion)
},
}
cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
}
nodeLister := &cache.StoreToNodeLister{Store: nodeStore}
// TODO: get the real node object of ourself,
// and use the real node name and UID.
// TODO: what is namespace for node?
@ -260,7 +242,6 @@ func NewMainKubelet(
clusterDomain: clusterDomain,
clusterDNS: clusterDNS,
serviceLister: serviceLister,
nodeLister: nodeLister,
runtimeMutex: sync.Mutex{},
runtimeUpThreshold: maxWaitForContainerRuntime,
lastTimestampRuntimeUp: time.Time{},
@ -402,11 +383,6 @@ type serviceLister interface {
List() (api.ServiceList, error)
}
type nodeLister interface {
List() (machines api.NodeList, err error)
GetNodeInfo(id string) (*api.Node, error)
}
// Kubelet is the main kubelet implementation.
type Kubelet struct {
hostname string
@ -449,7 +425,6 @@ type Kubelet struct {
masterServiceNamespace string
serviceLister serviceLister
nodeLister nodeLister
// Last timestamp when runtime responded on ping.
// Mutex is used to protect this value.
@ -694,13 +669,6 @@ func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
return pods, nil
}
func (kl *Kubelet) GetNode() (*api.Node, error) {
if kl.standaloneMode {
return nil, errors.New("no node entry for kubelet in standalone mode")
}
return kl.nodeLister.GetNodeInfo(kl.nodeName)
}
// Starts garbage collection threads.
func (kl *Kubelet) StartGarbageCollection() {
go util.Until(func() {
@ -1682,7 +1650,7 @@ func (kl *Kubelet) matchesNodeSelector(pod *api.Pod) bool {
if kl.standaloneMode {
return true
}
node, err := kl.GetNode()
node, err := kl.nodeManager.GetNode()
if err != nil {
glog.Errorf("error getting node: %v", err)
return true
@ -1981,11 +1949,7 @@ func (kl *Kubelet) GetHostname() string {
// Returns host IP or nil in case of error.
func (kl *Kubelet) GetHostIP() (net.IP, error) {
node, err := kl.GetNode()
if err != nil {
return nil, fmt.Errorf("cannot get node: %v", err)
}
return nodeutil.GetNodeHostIP(node)
return kl.nodeManager.GetHostIP()
}
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
@ -2248,7 +2212,7 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
podStatus.Conditions = append(podStatus.Conditions, getPodReadyCondition(spec, podStatus.ContainerStatuses)...)
if !kl.standaloneMode {
hostIP, err := kl.GetHostIP()
hostIP, err := kl.nodeManager.GetHostIP()
if err != nil {
glog.V(4).Infof("Cannot get host IP: %v", err)
} else {

View File

@ -16,10 +16,8 @@ limitations under the License.
package kubelet
// Note: if you change code in this file, you might need to change code in
// contrib/mesos/pkg/executor/.
import (
"errors"
"fmt"
"net"
"strings"
@ -31,11 +29,17 @@ import (
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/watch"
)
const (
@ -52,13 +56,17 @@ type infoGetter interface {
type nodeManager interface {
Start()
GetNode() (*api.Node, error)
GetPodCIDR() string
GetHostIP() (net.IP, error)
}
type realNodeManager struct {
// apiserver client.
client client.Interface
nodeLister nodeLister
// Set to true to have the node register itself with the apiserver.
registerNode bool
@ -124,10 +132,17 @@ func newRealNodeManager(client client.Interface, cloud cloudprovider.Interface,
}
}
type nodeLister interface {
List() (machines api.NodeList, err error)
GetNodeInfo(id string) (*api.Node, error)
}
func (nm *realNodeManager) Start() {
if nm.client != nil {
go util.Until(nm.syncNodeStatus, nm.nodeStatusUpdateFrequency, util.NeverStop)
if nm.client == nil {
return
}
nm.setNodeLister()
go util.Until(nm.syncNodeStatus, nm.nodeStatusUpdateFrequency, util.NeverStop)
}
func (nm *realNodeManager) GetPodCIDR() string {
@ -136,6 +151,42 @@ func (nm *realNodeManager) GetPodCIDR() string {
return nm.podCIDR
}
func (nm *realNodeManager) GetNode() (*api.Node, error) {
if nm.client == nil {
return nil, errors.New("unable to get node entry because apiserver client is nil")
}
return nm.nodeLister.GetNodeInfo(nm.nodeName)
}
// Returns host IP or nil in case of error.
func (nm *realNodeManager) GetHostIP() (net.IP, error) {
if nm.client == nil {
return nil, errors.New("unable to get node entry because apiserver client is nil")
}
node, err := nm.GetNode()
if err != nil {
return nil, fmt.Errorf("cannot get node: %v", err)
}
return nodeutil.GetNodeHostIP(node)
}
func (nm *realNodeManager) setNodeLister() {
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
// than an interface. There is no way to construct a list+watcher using resource name.
fieldSelector := fields.Set{client.ObjectNameField: nm.nodeName}.AsSelector()
listWatch := &cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return nm.client.Nodes().List(labels.Everything(), fieldSelector)
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return nm.client.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion)
},
}
cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
nm.nodeLister = &cache.StoreToNodeLister{Store: nodeStore}
}
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master, registering the kubelet first if
// necessary.