From 1835c8528d784816b9555931392831840082d333 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Thu, 21 Apr 2016 10:24:12 +0200 Subject: [PATCH] Store node information in NodeInfo --- pkg/kubelet/kubelet.go | 5 +- .../algorithm/predicates/predicates.go | 133 ++++---------- .../algorithm/predicates/predicates_test.go | 23 ++- .../algorithmprovider/defaults/defaults.go | 25 +-- plugin/pkg/scheduler/factory/factory.go | 166 ++++++++++++------ plugin/pkg/scheduler/factory/plugins.go | 1 - plugin/pkg/scheduler/schedulercache/cache.go | 52 +++++- .../pkg/scheduler/schedulercache/interface.go | 9 + .../pkg/scheduler/schedulercache/node_info.go | 28 +++ plugin/pkg/scheduler/testing/fake_cache.go | 6 + plugin/pkg/scheduler/testing/pods_to_cache.go | 6 + 11 files changed, 269 insertions(+), 185 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 3c548c2b04..d390e80048 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2351,8 +2351,9 @@ func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, str otherPods = append(otherPods, p) } } - nodeInfo := schedulercache.CreateNodeNameToInfoMap(otherPods)[kl.nodeName] - fit, err := predicates.RunGeneralPredicates(pod, kl.nodeName, nodeInfo, node) + nodeInfo := schedulercache.NewNodeInfo(otherPods...) + nodeInfo.SetNode(node) + fit, err := predicates.GeneralPredicates(pod, kl.nodeName, nodeInfo) if !fit { if re, ok := err.(*predicates.PredicateFailureError); ok { reason := re.PredicateName diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index 29c5d33766..a67b161525 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -21,7 +21,6 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" - client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" @@ -42,27 +41,6 @@ type PersistentVolumeClaimInfo interface { GetPersistentVolumeClaimInfo(namespace string, pvcID string) (*api.PersistentVolumeClaim, error) } -type StaticNodeInfo struct { - *api.NodeList -} - -func (nodes StaticNodeInfo) GetNodeInfo(nodeID string) (*api.Node, error) { - for ix := range nodes.Items { - if nodes.Items[ix].Name == nodeID { - return &nodes.Items[ix], nil - } - } - return nil, fmt.Errorf("failed to find node: %s, %#v", nodeID, nodes) -} - -type ClientNodeInfo struct { - *client.Client -} - -func (nodes ClientNodeInfo) GetNodeInfo(nodeID string) (*api.Node, error) { - return nodes.Nodes().Get(nodeID) -} - type CachedNodeInfo struct { *cache.StoreToNodeLister } @@ -271,9 +249,8 @@ var GCEPDVolumeFilter VolumeFilter = VolumeFilter{ } type VolumeZoneChecker struct { - nodeInfo NodeInfo - pvInfo PersistentVolumeInfo - pvcInfo PersistentVolumeClaimInfo + pvInfo PersistentVolumeInfo + pvcInfo PersistentVolumeClaimInfo } // VolumeZonePredicate evaluates if a pod can fit due to the volumes it requests, given @@ -290,20 +267,16 @@ type VolumeZoneChecker struct { // determining the zone of a volume during scheduling, and that is likely to // require calling out to the cloud provider. It seems that we are moving away // from inline volume declarations anyway. 
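With the node object now stored on the cached NodeInfo, the predicates in this file stop taking a node-lookup interface and instead read the node from the NodeInfo they are handed, the same way the kubelet hunk above does. A minimal sketch of that calling pattern follows; the function name admitSketch is hypothetical, pod, otherPods and node are assumed to be whatever the caller already has, and the api, predicates and schedulercache packages are assumed to be imported under their usual names:

	// Sketch only: mirrors kubelet.canAdmitPod after this change.
	func admitSketch(pod *api.Pod, otherPods []*api.Pod, node *api.Node) (bool, error) {
		// Aggregate the pods already on the node, then attach the node itself.
		nodeInfo := schedulercache.NewNodeInfo(otherPods...)
		if err := nodeInfo.SetNode(node); err != nil {
			return false, err
		}
		// GeneralPredicates runs the checks that every component (kubelet and
		// schedulers alike) is expected to enforce; on failure err describes
		// which check failed (e.g. *predicates.PredicateFailureError).
		return predicates.GeneralPredicates(pod, node.Name, nodeInfo)
	}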
-func NewVolumeZonePredicate(nodeInfo NodeInfo, pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) algorithm.FitPredicate { +func NewVolumeZonePredicate(pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) algorithm.FitPredicate { c := &VolumeZoneChecker{ - nodeInfo: nodeInfo, - pvInfo: pvInfo, - pvcInfo: pvcInfo, + pvInfo: pvInfo, + pvcInfo: pvcInfo, } return c.predicate } func (c *VolumeZoneChecker) predicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { - node, err := c.nodeInfo.GetNodeInfo(nodeName) - if err != nil { - return false, err - } + node := nodeInfo.Node() if node == nil { return false, fmt.Errorf("node not found: %q", nodeName) } @@ -372,10 +345,6 @@ func (c *VolumeZoneChecker) predicate(pod *api.Pod, nodeName string, nodeInfo *s return true, nil } -type ResourceFit struct { - info NodeInfo -} - type resourceRequest struct { milliCPU int64 memory int64 @@ -422,8 +391,12 @@ func podName(pod *api.Pod) string { return pod.Namespace + "/" + pod.Name } -func podFitsResourcesInternal(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo, info *api.Node) (bool, error) { - allocatable := info.Status.Allocatable +func podFitsResourcesInternal(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { + node := nodeInfo.Node() + if node == nil { + return false, fmt.Errorf("node not found: %q", nodeName) + } + allocatable := node.Status.Allocatable allowedPodNumber := allocatable.Pods().Value() if int64(len(nodeInfo.Pods()))+1 > allowedPodNumber { return false, @@ -450,26 +423,8 @@ func podFitsResourcesInternal(pod *api.Pod, nodeName string, nodeInfo *scheduler return true, nil } -func (r *NodeStatus) PodFitsResources(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { - info, err := r.info.GetNodeInfo(nodeName) - if err != nil { - return false, err - } - return podFitsResourcesInternal(pod, nodeName, nodeInfo, info) -} - -func NewResourceFitPredicate(info NodeInfo) algorithm.FitPredicate { - fit := &NodeStatus{ - info: info, - } - return fit.PodFitsResources -} - -func NewSelectorMatchPredicate(info NodeInfo) algorithm.FitPredicate { - selector := &NodeStatus{ - info: info, - } - return selector.PodSelectorMatches +func PodFitsResources(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { + return podFitsResourcesInternal(pod, nodeName, nodeInfo) } // nodeMatchesNodeSelectorTerms checks if a node's labels satisfy a list of node selector terms, @@ -541,14 +496,10 @@ func PodMatchesNodeLabels(pod *api.Pod, node *api.Node) bool { return nodeAffinityMatches } -type NodeSelector struct { - info NodeInfo -} - -func (n *NodeStatus) PodSelectorMatches(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { - node, err := n.info.GetNodeInfo(nodeName) - if err != nil { - return false, err +func PodSelectorMatches(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { + node := nodeInfo.Node() + if node == nil { + return false, fmt.Errorf("node not found: %q", nodeName) } if PodMatchesNodeLabels(pod, node) { return true, nil @@ -567,14 +518,12 @@ func PodFitsHost(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInf } type NodeLabelChecker struct { - info NodeInfo labels []string presence bool } -func NewNodeLabelPredicate(info NodeInfo, labels []string, presence bool) algorithm.FitPredicate { +func NewNodeLabelPredicate(labels []string, presence bool) algorithm.FitPredicate { 
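	// The label checker no longer embeds a node-lookup interface; the node is read
	// from the *schedulercache.NodeInfo passed to the predicate at evaluation time.
	// Rough usage sketch (the "retiring" label follows the example in the comment
	// below; all other names are hypothetical):
	//
	//	avoidRetiring := NewNodeLabelPredicate([]string{"retiring"}, false)
	//	fits, err := avoidRetiring(pod, nodeName, nodeInfo)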
labelChecker := &NodeLabelChecker{ - info: info, labels: labels, presence: presence, } @@ -594,11 +543,12 @@ func NewNodeLabelPredicate(info NodeInfo, labels []string, presence bool) algori // A node may have a label with "retiring" as key and the date as the value // and it may be desirable to avoid scheduling new pods on this node func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { - var exists bool - node, err := n.info.GetNodeInfo(nodeName) - if err != nil { - return false, err + node := nodeInfo.Node() + if node == nil { + return false, fmt.Errorf("node not found: %q", nodeName) } + + var exists bool nodeLabels := labels.Set(node.Labels) for _, label := range n.labels { exists = nodeLabels.Has(label) @@ -725,11 +675,16 @@ func PodFitsHostPorts(pod *api.Pod, nodeName string, nodeInfo *schedulercache.No } func getUsedPorts(pods ...*api.Pod) map[int]bool { + // TODO: Aggregate it at the NodeInfo level. ports := make(map[int]bool) for _, pod := range pods { for _, container := range pod.Spec.Containers { for _, podPort := range container.Ports { - ports[podPort.HostPort] = true + // "0" is explicitly ignored in PodFitsHostPorts, + // which is the only function that uses this value. + if podPort.HostPort != 0 { + ports[podPort.HostPort] = true + } } } } @@ -748,27 +703,8 @@ func haveSame(a1, a2 []string) bool { return false } -type NodeStatus struct { - info NodeInfo -} - -func GeneralPredicates(info NodeInfo) algorithm.FitPredicate { - node := &NodeStatus{ - info: info, - } - return node.SchedulerGeneralPredicates -} - -func (n *NodeStatus) SchedulerGeneralPredicates(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { - node, err := n.info.GetNodeInfo(nodeName) - if err != nil { - return false, err - } - return RunGeneralPredicates(pod, nodeName, nodeInfo, node) -} - -func RunGeneralPredicates(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo, node *api.Node) (bool, error) { - fit, err := podFitsResourcesInternal(pod, nodeName, nodeInfo, node) +func GeneralPredicates(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { + fit, err := podFitsResourcesInternal(pod, nodeName, nodeInfo) if !fit { return fit, err } @@ -781,8 +717,9 @@ func RunGeneralPredicates(pod *api.Pod, nodeName string, nodeInfo *schedulercach if !fit { return fit, err } - if !PodMatchesNodeLabels(pod, node) { - return false, ErrNodeSelectorNotMatch + fit, err = PodSelectorMatches(pod, nodeName, nodeInfo) + if !fit { + return fit, err } return true, nil } diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go index b0fcf6472c..c0dc7913fa 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go @@ -158,9 +158,9 @@ func TestPodFitsResources(t *testing.T) { for _, test := range enoughPodsTests { node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 32)}} + test.nodeInfo.SetNode(&node) - fit := NodeStatus{FakeNodeInfo(node)} - fits, err := fit.PodFitsResources(test.pod, "machine", test.nodeInfo) + fits, err := PodFitsResources(test.pod, "machine", test.nodeInfo) if !reflect.DeepEqual(err, test.wErr) { t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr) } @@ -203,9 +203,9 @@ func TestPodFitsResources(t *testing.T) { 
} for _, test := range notEnoughPodsTests { node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 1)}} + test.nodeInfo.SetNode(&node) - fit := NodeStatus{FakeNodeInfo(node)} - fits, err := fit.PodFitsResources(test.pod, "machine", test.nodeInfo) + fits, err := PodFitsResources(test.pod, "machine", test.nodeInfo) if !reflect.DeepEqual(err, test.wErr) { t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr) } @@ -994,9 +994,10 @@ func TestPodFitsSelector(t *testing.T) { for _, test := range tests { node := api.Node{ObjectMeta: api.ObjectMeta{Labels: test.labels}} + nodeInfo := schedulercache.NewNodeInfo() + nodeInfo.SetNode(&node) - fit := NodeStatus{FakeNodeInfo(node)} - fits, err := fit.PodSelectorMatches(test.pod, "machine", schedulercache.NewNodeInfo()) + fits, err := PodSelectorMatches(test.pod, "machine", nodeInfo) if !reflect.DeepEqual(err, ErrNodeSelectorNotMatch) && err != nil { t.Errorf("unexpected error: %v", err) } @@ -1057,8 +1058,11 @@ func TestNodeLabelPresence(t *testing.T) { } for _, test := range tests { node := api.Node{ObjectMeta: api.ObjectMeta{Labels: label}} - labelChecker := NodeLabelChecker{FakeNodeInfo(node), test.labels, test.presence} - fits, err := labelChecker.CheckNodeLabelPresence(test.pod, "machine", schedulercache.NewNodeInfo()) + nodeInfo := schedulercache.NewNodeInfo() + nodeInfo.SetNode(&node) + + labelChecker := NodeLabelChecker{test.labels, test.presence} + fits, err := labelChecker.CheckNodeLabelPresence(test.pod, "machine", nodeInfo) if !reflect.DeepEqual(err, ErrNodeLabelPresenceViolated) && err != nil { t.Errorf("unexpected error: %v", err) } @@ -1550,7 +1554,8 @@ func TestRunGeneralPredicates(t *testing.T) { }, } for _, test := range resourceTests { - fits, err := RunGeneralPredicates(test.pod, test.nodeName, test.nodeInfo, test.node) + test.nodeInfo.SetNode(test.node) + fits, err := GeneralPredicates(test.pod, test.nodeName, test.nodeInfo) if !reflect.DeepEqual(err, test.wErr) { t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr) } diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index 7f6d05780f..08a81a371d 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -84,16 +84,13 @@ func init() { // Fit is determined by resource availability. // This predicate is actually a default predicate, because it is invoked from // predicates.GeneralPredicates() - factory.RegisterFitPredicateFactory( - "PodFitsResources", - func(args factory.PluginFactoryArgs) algorithm.FitPredicate { - return predicates.NewResourceFitPredicate(args.NodeInfo) - }, - ) + factory.RegisterFitPredicate("PodFitsResources", predicates.PodFitsResources) // Fit is determined by the presence of the Host parameter and a string match // This predicate is actually a default predicate, because it is invoked from // predicates.GeneralPredicates() factory.RegisterFitPredicate("HostName", predicates.PodFitsHost) + // Fit is determined by node selector query. 
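	// Like PodFitsResources and HostName above, any predicate with the plain
	// algorithm.FitPredicate signature,
	//   func(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error),
	// can be registered directly; predicates that still need listers, such as
	// NewVolumeZonePredicate with its PV/PVC info, keep going through
	// RegisterFitPredicateFactory in defaultPredicates() below.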
+ factory.RegisterFitPredicate("MatchNodeSelector", predicates.PodSelectorMatches) } func defaultPredicates() sets.String { @@ -104,14 +101,7 @@ func defaultPredicates() sets.String { factory.RegisterFitPredicateFactory( "NoVolumeZoneConflict", func(args factory.PluginFactoryArgs) algorithm.FitPredicate { - return predicates.NewVolumeZonePredicate(args.NodeInfo, args.PVInfo, args.PVCInfo) - }, - ), - // Fit is determined by node selector query. - factory.RegisterFitPredicateFactory( - "MatchNodeSelector", - func(args factory.PluginFactoryArgs) algorithm.FitPredicate { - return predicates.NewSelectorMatchPredicate(args.NodeInfo) + return predicates.NewVolumeZonePredicate(args.PVInfo, args.PVCInfo) }, ), // Fit is determined by whether or not there would be too many AWS EBS volumes attached to the node @@ -134,12 +124,7 @@ func defaultPredicates() sets.String { ), // GeneralPredicates are the predicates that are enforced by all Kubernetes components // (e.g. kubelet and all schedulers) - factory.RegisterFitPredicateFactory( - "GeneralPredicates", - func(args factory.PluginFactoryArgs) algorithm.FitPredicate { - return predicates.GeneralPredicates(args.NodeInfo) - }, - ), + factory.RegisterFitPredicate("GeneralPredicates", predicates.GeneralPredicates), ) } diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 55e2b51610..801135a72d 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -75,7 +75,9 @@ type ConfigFactory struct { StopEverything chan struct{} scheduledPodPopulator *framework.Controller - schedulerCache schedulercache.Cache + nodePopulator *framework.Controller + + schedulerCache schedulercache.Cache // SchedulerName of a scheduler is used to select which pods will be // processed by this scheduler, based on pods's annotation key: @@ -93,7 +95,7 @@ func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactor PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), ScheduledPodLister: &cache.StoreToPodLister{}, // Only nodes in the "Ready" condition with status == "True" are schedulable - NodeLister: &cache.StoreToNodeLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, + NodeLister: &cache.StoreToNodeLister{}, PVLister: &cache.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, PVCLister: &cache.StoreToPVCFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, ServiceLister: &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, @@ -115,57 +117,122 @@ func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactor &api.Pod{}, 0, framework.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - pod, ok := obj.(*api.Pod) - if !ok { - glog.Errorf("cannot convert to *api.Pod") - return - } - if err := schedulerCache.AddPod(pod); err != nil { - glog.Errorf("scheduler cache AddPod failed: %v", err) - } - }, - UpdateFunc: func(oldObj, newObj interface{}) { - oldPod, ok := oldObj.(*api.Pod) - if !ok { - glog.Errorf("cannot convert to *api.Pod") - return - } - newPod, ok := newObj.(*api.Pod) - if !ok { - glog.Errorf("cannot convert to *api.Pod") - return - } - if err := schedulerCache.UpdatePod(oldPod, newPod); err != nil { - glog.Errorf("scheduler cache UpdatePod failed: %v", err) - } - }, - DeleteFunc: func(obj interface{}) { - var pod *api.Pod - switch t := obj.(type) { - case *api.Pod: - pod = t - case cache.DeletedFinalStateUnknown: - var ok bool - pod, ok = t.Obj.(*api.Pod) - if !ok { - 
glog.Errorf("cannot convert to *api.Pod") - return - } - default: - glog.Errorf("cannot convert to *api.Pod") - return - } - if err := schedulerCache.RemovePod(pod); err != nil { - glog.Errorf("scheduler cache RemovePod failed: %v", err) - } - }, + AddFunc: c.addPodToCache, + UpdateFunc: c.updatePodInCache, + DeleteFunc: c.deletePodFromCache, + }, + ) + + c.NodeLister.Store, c.nodePopulator = framework.NewInformer( + c.createNodeLW(), + &api.Node{}, + 0, + framework.ResourceEventHandlerFuncs{ + AddFunc: c.addNodeToCache, + UpdateFunc: c.updateNodeInCache, + DeleteFunc: c.deleteNodeFromCache, }, ) return c } +func (c *ConfigFactory) addPodToCache(obj interface{}) { + pod, ok := obj.(*api.Pod) + if !ok { + glog.Errorf("cannot convert to *api.Pod: %v", obj) + return + } + if err := c.schedulerCache.AddPod(pod); err != nil { + glog.Errorf("scheduler cache AddPod failed: %v", err) + } +} + +func (c *ConfigFactory) updatePodInCache(oldObj, newObj interface{}) { + oldPod, ok := oldObj.(*api.Pod) + if !ok { + glog.Errorf("cannot convert oldObj to *api.Pod: %v", oldObj) + return + } + newPod, ok := newObj.(*api.Pod) + if !ok { + glog.Errorf("cannot convert newObj to *api.Pod: %v", newObj) + return + } + if err := c.schedulerCache.UpdatePod(oldPod, newPod); err != nil { + glog.Errorf("scheduler cache UpdatePod failed: %v", err) + } +} + +func (c *ConfigFactory) deletePodFromCache(obj interface{}) { + var pod *api.Pod + switch t := obj.(type) { + case *api.Pod: + pod = t + case cache.DeletedFinalStateUnknown: + var ok bool + pod, ok = t.Obj.(*api.Pod) + if !ok { + glog.Errorf("cannot convert to *api.Pod: %v", t.Obj) + return + } + default: + glog.Errorf("cannot convert to *api.Pod: %v", t) + return + } + if err := c.schedulerCache.RemovePod(pod); err != nil { + glog.Errorf("scheduler cache RemovePod failed: %v", err) + } +} + +func (c *ConfigFactory) addNodeToCache(obj interface{}) { + node, ok := obj.(*api.Node) + if !ok { + glog.Errorf("cannot convert to *api.Node: %v", obj) + return + } + if err := c.schedulerCache.AddNode(node); err != nil { + glog.Errorf("scheduler cache AddNode failed: %v", err) + } +} + +func (c *ConfigFactory) updateNodeInCache(oldObj, newObj interface{}) { + oldNode, ok := oldObj.(*api.Node) + if !ok { + glog.Errorf("cannot convert oldObj to *api.Node: %v", oldObj) + return + } + newNode, ok := newObj.(*api.Node) + if !ok { + glog.Errorf("cannot convert newObj to *api.Node: %v", newObj) + return + } + if err := c.schedulerCache.UpdateNode(oldNode, newNode); err != nil { + glog.Errorf("scheduler cache UpdateNode failed: %v", err) + } +} + +func (c *ConfigFactory) deleteNodeFromCache(obj interface{}) { + var node *api.Node + switch t := obj.(type) { + case *api.Node: + node = t + case cache.DeletedFinalStateUnknown: + var ok bool + node, ok = t.Obj.(*api.Node) + if !ok { + glog.Errorf("cannot convert to *api.Node: %v", t.Obj) + return + } + default: + glog.Errorf("cannot convert to *api.Node: %v", t) + return + } + if err := c.schedulerCache.RemoveNode(node); err != nil { + glog.Errorf("scheduler cache RemoveNode failed: %v", err) + } +} + // Create creates a scheduler with the default algorithm provider. func (f *ConfigFactory) Create() (*scheduler.Config, error) { return f.CreateFromProvider(DefaultProvider) @@ -247,9 +314,8 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, // Begin populating scheduled pods. go f.scheduledPodPopulator.Run(f.StopEverything) - // Watch nodes. 
- // Nodes may be listed frequently, so provide a local up-to-date cache. - cache.NewReflector(f.createNodeLW(), &api.Node{}, f.NodeLister.Store, 0).RunUntil(f.StopEverything) + // Begin populating nodes. + go f.nodePopulator.Run(f.StopEverything) // Watch PVs & PVCs // They may be listed frequently for scheduling constraints, so provide a local up-to-date cache. diff --git a/plugin/pkg/scheduler/factory/plugins.go b/plugin/pkg/scheduler/factory/plugins.go index 93266100bf..a4a5857c27 100644 --- a/plugin/pkg/scheduler/factory/plugins.go +++ b/plugin/pkg/scheduler/factory/plugins.go @@ -111,7 +111,6 @@ func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy) string { } else if policy.Argument.LabelsPresence != nil { predicateFactory = func(args PluginFactoryArgs) algorithm.FitPredicate { return predicates.NewNodeLabelPredicate( - args.NodeInfo, policy.Argument.LabelsPresence.Labels, policy.Argument.LabelsPresence.Presence, ) diff --git a/plugin/pkg/scheduler/schedulercache/cache.go b/plugin/pkg/scheduler/schedulercache/cache.go index c6e749c6a7..f0cd01abe7 100644 --- a/plugin/pkg/scheduler/schedulercache/cache.go +++ b/plugin/pkg/scheduler/schedulercache/cache.go @@ -177,7 +177,7 @@ func (cache *schedulerCache) UpdatePod(oldPod, newPod *api.Pod) error { } func (cache *schedulerCache) updatePod(oldPod, newPod *api.Pod) error { - if err := cache.deletePod(oldPod); err != nil { + if err := cache.removePod(oldPod); err != nil { return err } cache.addPod(newPod) @@ -193,12 +193,12 @@ func (cache *schedulerCache) addPod(pod *api.Pod) { n.addPod(pod) } -func (cache *schedulerCache) deletePod(pod *api.Pod) error { +func (cache *schedulerCache) removePod(pod *api.Pod) error { n := cache.nodes[pod.Spec.NodeName] if err := n.removePod(pod); err != nil { return err } - if len(n.pods) == 0 { + if len(n.pods) == 0 && n.node == nil { delete(cache.nodes, pod.Spec.NodeName) } return nil @@ -218,7 +218,7 @@ func (cache *schedulerCache) RemovePod(pod *api.Pod) error { // An assumed pod won't have Delete/Remove event. It needs to have Add event // before Remove event, in which case the state would change from Assumed to Added. case ok && !cache.assumedPods[key]: - err := cache.deletePod(pod) + err := cache.removePod(pod) if err != nil { return err } @@ -229,6 +229,48 @@ func (cache *schedulerCache) RemovePod(pod *api.Pod) error { return nil } +func (cache *schedulerCache) AddNode(node *api.Node) error { + cache.mu.Lock() + defer cache.mu.Unlock() + + n, ok := cache.nodes[node.Name] + if !ok { + n = NewNodeInfo() + cache.nodes[node.Name] = n + } + return n.SetNode(node) +} + +func (cache *schedulerCache) UpdateNode(oldNode, newNode *api.Node) error { + cache.mu.Lock() + defer cache.mu.Unlock() + + n, ok := cache.nodes[newNode.Name] + if !ok { + n = NewNodeInfo() + cache.nodes[newNode.Name] = n + } + return n.SetNode(newNode) +} + +func (cache *schedulerCache) RemoveNode(node *api.Node) error { + cache.mu.Lock() + defer cache.mu.Unlock() + + n := cache.nodes[node.Name] + if err := n.RemoveNode(node); err != nil { + return err + } + // We remove NodeInfo for this node only if there aren't any pods on this node. + // We can't do it unconditionally, because notifications about pods are delivered + // in a different watch, and thus can potentially be observed later, even though + // they happened before node removal. 
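	// (removePod above applies the same "len(n.pods) == 0 && n.node == nil" test,
	// so the NodeInfo entry is dropped by whichever deletion is observed last,
	// the node's or the last pod's.)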
+ if len(n.pods) == 0 && n.node == nil { + delete(cache.nodes, node.Name) + } + return nil +} + func (cache *schedulerCache) run() { go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop) } @@ -257,7 +299,7 @@ func (cache *schedulerCache) cleanupAssumedPods(now time.Time) { } func (cache *schedulerCache) expirePod(key string, ps *podState) error { - if err := cache.deletePod(ps.pod); err != nil { + if err := cache.removePod(ps.pod); err != nil { return err } delete(cache.assumedPods, key) diff --git a/plugin/pkg/scheduler/schedulercache/interface.go b/plugin/pkg/scheduler/schedulercache/interface.go index 59557bdd29..07330a2a89 100644 --- a/plugin/pkg/scheduler/schedulercache/interface.go +++ b/plugin/pkg/scheduler/schedulercache/interface.go @@ -71,6 +71,15 @@ type Cache interface { // RemovePod removes a pod. The pod's information would be subtracted from assigned node. RemovePod(pod *api.Pod) error + // AddNode adds overall information about node. + AddNode(node *api.Node) error + + // UpdateNode updates overall information about node. + UpdateNode(oldNode, newNode *api.Node) error + + // RemoveNode removes overall information about node. + RemoveNode(node *api.Node) error + // GetNodeNameToInfoMap returns a map of node names to node info. The node info contains // aggregated information of pods scheduled (including assumed to be) on this node. GetNodeNameToInfoMap() (map[string]*NodeInfo, error) diff --git a/plugin/pkg/scheduler/schedulercache/node_info.go b/plugin/pkg/scheduler/schedulercache/node_info.go index 7f40e556f1..320b9135b1 100644 --- a/plugin/pkg/scheduler/schedulercache/node_info.go +++ b/plugin/pkg/scheduler/schedulercache/node_info.go @@ -30,6 +30,9 @@ var emptyResource = Resource{} // NodeInfo is node level aggregated information. type NodeInfo struct { + // Overall node information. + node *api.Node + // Total requested resource of all pods on this node. // It includes assumed pods which scheduler sends binding to apiserver but // didn't get it as scheduled yet. @@ -58,6 +61,14 @@ func NewNodeInfo(pods ...*api.Pod) *NodeInfo { return ni } +// Returns overall information about this node. +func (n *NodeInfo) Node() *api.Node { + if n == nil { + return nil + } + return n.node +} + // Pods return all pods scheduled (including assumed to be) on this node. func (n *NodeInfo) Pods() []*api.Pod { if n == nil { @@ -85,6 +96,7 @@ func (n *NodeInfo) NonZeroRequest() Resource { func (n *NodeInfo) Clone() *NodeInfo { pods := append([]*api.Pod(nil), n.pods...) clone := &NodeInfo{ + node: n.node, requestedResource: &(*n.requestedResource), nonzeroRequest: &(*n.nonzeroRequest), pods: pods, @@ -153,6 +165,22 @@ func calculateResource(pod *api.Pod) (cpu int64, mem int64, non0_cpu int64, non0 return } +// Sets the overall node information. +func (n *NodeInfo) SetNode(node *api.Node) error { + n.node = node + return nil +} + +// Removes the overall information about the node. +func (n *NodeInfo) RemoveNode(node *api.Node) error { + // We don't remove NodeInfo for because there can still be some pods on this node - + // this is because notifications about pods are delivered in a different watch, + // and thus can potentially be observed later, even though they happened before + // node removal. This is handled correctly in cache.go file. + n.node = nil + return nil +} + // getPodKey returns the string key of a pod. 
func getPodKey(pod *api.Pod) (string, error) { return clientcache.MetaNamespaceKeyFunc(pod) diff --git a/plugin/pkg/scheduler/testing/fake_cache.go b/plugin/pkg/scheduler/testing/fake_cache.go index 09e8660a15..02c76d3d65 100644 --- a/plugin/pkg/scheduler/testing/fake_cache.go +++ b/plugin/pkg/scheduler/testing/fake_cache.go @@ -38,6 +38,12 @@ func (f *FakeCache) UpdatePod(oldPod, newPod *api.Pod) error { return nil } func (f *FakeCache) RemovePod(pod *api.Pod) error { return nil } +func (f *FakeCache) AddNode(node *api.Node) error { return nil } + +func (f *FakeCache) UpdateNode(oldNode, newNode *api.Node) error { return nil } + +func (f *FakeCache) RemoveNode(node *api.Node) error { return nil } + func (f *FakeCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) { return nil, nil } diff --git a/plugin/pkg/scheduler/testing/pods_to_cache.go b/plugin/pkg/scheduler/testing/pods_to_cache.go index b58d19d541..99fe15ee70 100644 --- a/plugin/pkg/scheduler/testing/pods_to_cache.go +++ b/plugin/pkg/scheduler/testing/pods_to_cache.go @@ -35,6 +35,12 @@ func (p PodsToCache) UpdatePod(oldPod, newPod *api.Pod) error { return nil } func (p PodsToCache) RemovePod(pod *api.Pod) error { return nil } +func (p PodsToCache) AddNode(node *api.Node) error { return nil } + +func (p PodsToCache) UpdateNode(oldNode, newNode *api.Node) error { return nil } + +func (p PodsToCache) RemoveNode(node *api.Node) error { return nil } + func (p PodsToCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) { return schedulercache.CreateNodeNameToInfoMap(p), nil }
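Taken together, the scheduler cache is now the single place that aggregates both the pods and the node object for each node, and NodeInfo is what the predicates consume. A short end-to-end sketch under the following assumptions: the function name and variables are hypothetical, the Cache value comes from the scheduler's existing schedulercache setup, the pod is already bound to the node, and in the real code paths the Add* calls are made by the informer handlers wired up in factory.go rather than by hand:

	// Sketch only: exercises the Cache methods added in this patch.
	func cacheRoundTripSketch(c schedulercache.Cache, node *api.Node, runningPod, podToSchedule *api.Pod) (bool, error) {
		// Normally done by the node/pod informer handlers in factory.go.
		if err := c.AddNode(node); err != nil {
			return false, err
		}
		if err := c.AddPod(runningPod); err != nil {
			return false, err
		}
		// A scheduling pass snapshots the cache; each NodeInfo now carries the
		// node object in addition to the aggregated pod requests.
		nodeNameToInfo, err := c.GetNodeNameToInfoMap()
		if err != nil {
			return false, err
		}
		return predicates.GeneralPredicates(podToSchedule, node.Name, nodeNameToInfo[node.Name])
	}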