Merge pull request #17827 from jiangyaoguo/move-GetNodeInfo-to-scheduler

Auto commit by PR queue bot
pull/6/head
k8s-merge-robot 2015-11-30 21:16:16 -08:00
commit 0ca2182192
6 changed files with 36 additions and 21 deletions

View File

@ -138,23 +138,6 @@ func (s storeToNodeConditionLister) List() (nodes api.NodeList, err error) {
return
}
// TODO Move this back to scheduler as a helper function that takes a Store,
// rather than a method of StoreToNodeLister.
// GetNodeInfo returns cached data for the node 'id'.
func (s *StoreToNodeLister) GetNodeInfo(id string) (*api.Node, error) {
node, exists, err := s.Get(&api.Node{ObjectMeta: api.ObjectMeta{Name: id}})
if err != nil {
return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", id, err)
}
if !exists {
return nil, fmt.Errorf("node '%v' is not in cache", id)
}
return node.(*api.Node), nil
}
// StoreToReplicationControllerLister gives a store List and Exists methods. The store must contain only ReplicationControllers.
type StoreToReplicationControllerLister struct {
Store

View File

@ -263,6 +263,7 @@ func NewMainKubelet(
cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
}
nodeLister := &cache.StoreToNodeLister{Store: nodeStore}
nodeInfo := &predicates.CachedNodeInfo{nodeLister}
// TODO: get the real node object of ourself,
// and use the real node name and UID.
@ -301,6 +302,7 @@ func NewMainKubelet(
clusterDNS: clusterDNS,
serviceLister: serviceLister,
nodeLister: nodeLister,
nodeInfo: nodeInfo,
masterServiceNamespace: masterServiceNamespace,
streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
recorder: recorder,
@ -473,7 +475,6 @@ type serviceLister interface {
type nodeLister interface {
List() (machines api.NodeList, err error)
GetNodeInfo(id string) (*api.Node, error)
}
// Kubelet is the main kubelet implementation.
@ -527,6 +528,7 @@ type Kubelet struct {
masterServiceNamespace string
serviceLister serviceLister
nodeLister nodeLister
nodeInfo predicates.NodeInfo
// a list of node labels to register
nodeLabels []string
@ -822,7 +824,7 @@ func (kl *Kubelet) GetNode() (*api.Node, error) {
if kl.standaloneMode {
return nil, errors.New("no node entry for kubelet in standalone mode")
}
return kl.nodeLister.GetNodeInfo(kl.nodeName)
return kl.nodeInfo.GetNodeInfo(kl.nodeName)
}
// Starts garbage collection threads.

View File

@ -112,6 +112,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.masterServiceNamespace = api.NamespaceDefault
kubelet.serviceLister = testServiceLister{}
kubelet.nodeLister = testNodeLister{}
kubelet.nodeInfo = testNodeInfo{}
kubelet.recorder = fakeRecorder
if err := kubelet.setupDataDirs(); err != nil {
t.Fatalf("can't initialize kubelet data dirs: %v", err)
@ -1045,7 +1046,11 @@ type testNodeLister struct {
nodes []api.Node
}
func (ls testNodeLister) GetNodeInfo(id string) (*api.Node, error) {
type testNodeInfo struct {
nodes []api.Node
}
func (ls testNodeInfo) GetNodeInfo(id string) (*api.Node, error) {
for _, node := range ls.nodes {
if node.Name == id {
return &node, nil
@ -2319,6 +2324,9 @@ func TestHandleNodeSelector(t *testing.T) {
kl.nodeLister = testNodeLister{nodes: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}},
}}
kl.nodeInfo = testNodeInfo{nodes: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}},
}}
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)

View File

@ -49,6 +49,7 @@ func TestRunOnce(t *testing.T) {
recorder: &record.FakeRecorder{},
cadvisor: cadvisor,
nodeLister: testNodeLister{},
nodeInfo: testNodeInfo{},
statusManager: status.NewManager(nil, podManager),
containerRefManager: kubecontainer.NewRefManager(),
podManager: podManager,

View File

@ -20,6 +20,7 @@ import (
"fmt"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
@ -52,6 +53,25 @@ func (nodes ClientNodeInfo) GetNodeInfo(nodeID string) (*api.Node, error) {
return nodes.Nodes().Get(nodeID)
}
type CachedNodeInfo struct {
*cache.StoreToNodeLister
}
// GetNodeInfo returns cached data for the node 'id'.
func (c *CachedNodeInfo) GetNodeInfo(id string) (*api.Node, error) {
node, exists, err := c.Get(&api.Node{ObjectMeta: api.ObjectMeta{Name: id}})
if err != nil {
return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", id, err)
}
if !exists {
return nil, fmt.Errorf("node '%v' is not in cache", id)
}
return node.(*api.Node), nil
}
func isVolumeConflict(volume api.Volume, pod *api.Pod) bool {
if volume.GCEPersistentDisk != nil {
disk := volume.GCEPersistentDisk

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/plugin/pkg/scheduler"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/api/validation"
@ -176,7 +177,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
ControllerLister: f.ControllerLister,
// All fit predicates only need to consider schedulable nodes.
NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
NodeInfo: f.NodeLister,
NodeInfo: &predicates.CachedNodeInfo{f.NodeLister},
}
predicateFuncs, err := getFitPredicateFunctions(predicateKeys, pluginArgs)
if err != nil {