mirror of https://github.com/k3s-io/k3s
Fist level ecache for nodeMap
Use new cache map in scheduler Add a integration test Move init before schedudling Add lock for first level cachepull/8/head
parent
17977478e7
commit
e5a7a4caf7
|
@ -33,25 +33,136 @@ import (
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Cache saves and reuses the output of predicate functions. Use RunPredicate to
|
// nodeMap stores a *Cache for each node.
|
||||||
// get or update the cached results. An appropriate Invalidate* function should
|
type nodeMap map[string]*NodeCache
|
||||||
// be called when some predicate results are no longer valid.
|
|
||||||
|
// Cache is a thread safe map saves and reuses the output of predicate functions,
|
||||||
|
// it uses node name as key to access those cached results.
|
||||||
//
|
//
|
||||||
// Internally, results are keyed by node name, predicate name, and "equivalence
|
// Internally, results are keyed by predicate name, and "equivalence
|
||||||
// class". (Equivalence class is defined in the `Class` type.) Saved results
|
// class". (Equivalence class is defined in the `Class` type.) Saved results
|
||||||
// will be reused until an appropriate invalidation function is called.
|
// will be reused until an appropriate invalidation function is called.
|
||||||
type Cache struct {
|
type Cache struct {
|
||||||
mu sync.RWMutex
|
// NOTE(harry): Theoretically sync.Map has better performance in machine with 8+ CPUs, while
|
||||||
cache nodeMap
|
// the reality is lock contention in first level cache is rare.
|
||||||
|
mu sync.RWMutex
|
||||||
|
nodeToCache nodeMap
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCache returns an empty Cache.
|
// NewCache create an empty equiv class cache.
|
||||||
func NewCache() *Cache {
|
func NewCache() *Cache {
|
||||||
return &Cache{
|
return &Cache{
|
||||||
cache: make(nodeMap),
|
nodeToCache: make(nodeMap),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NodeCache saves and reuses the output of predicate functions. Use RunPredicate to
|
||||||
|
// get or update the cached results. An appropriate Invalidate* function should
|
||||||
|
// be called when some predicate results are no longer valid.
|
||||||
|
//
|
||||||
|
// Internally, results are keyed by predicate name, and "equivalence
|
||||||
|
// class". (Equivalence class is defined in the `Class` type.) Saved results
|
||||||
|
// will be reused until an appropriate invalidation function is called.
|
||||||
|
//
|
||||||
|
// NodeCache objects are thread safe within the context of NodeCache,
|
||||||
|
type NodeCache struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
cache predicateMap
|
||||||
|
}
|
||||||
|
|
||||||
|
// newNodeCache returns an empty NodeCache.
|
||||||
|
func newNodeCache() *NodeCache {
|
||||||
|
return &NodeCache{
|
||||||
|
cache: make(predicateMap),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetNodeCache returns the existing NodeCache for given node if present. Otherwise,
|
||||||
|
// it creates the NodeCache and returns it.
|
||||||
|
// The boolean flag is true if the value was loaded, false if created.
|
||||||
|
func (c *Cache) GetNodeCache(name string) (nodeCache *NodeCache, exists bool) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
if nodeCache, exists = c.nodeToCache[name]; !exists {
|
||||||
|
nodeCache = newNodeCache()
|
||||||
|
c.nodeToCache[name] = nodeCache
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// InvalidatePredicates clears all cached results for the given predicates.
|
||||||
|
func (c *Cache) InvalidatePredicates(predicateKeys sets.String) {
|
||||||
|
if len(predicateKeys) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.mu.RLock()
|
||||||
|
defer c.mu.RUnlock()
|
||||||
|
for _, n := range c.nodeToCache {
|
||||||
|
n.invalidatePreds(predicateKeys)
|
||||||
|
}
|
||||||
|
glog.V(5).Infof("Cache invalidation: node=*,predicates=%v", predicateKeys)
|
||||||
|
}
|
||||||
|
|
||||||
|
// InvalidatePredicatesOnNode clears cached results for the given predicates on one node.
|
||||||
|
func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) {
|
||||||
|
if len(predicateKeys) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.mu.RLock()
|
||||||
|
defer c.mu.RUnlock()
|
||||||
|
if n, ok := c.nodeToCache[nodeName]; ok {
|
||||||
|
n.invalidatePreds(predicateKeys)
|
||||||
|
}
|
||||||
|
glog.V(5).Infof("Cache invalidation: node=%s,predicates=%v", nodeName, predicateKeys)
|
||||||
|
}
|
||||||
|
|
||||||
|
// InvalidateAllPredicatesOnNode clears all cached results for one node.
|
||||||
|
func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
delete(c.nodeToCache, nodeName)
|
||||||
|
glog.V(5).Infof("Cache invalidation: node=%s,predicates=*", nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// InvalidateCachedPredicateItemForPodAdd is a wrapper of
|
||||||
|
// InvalidateCachedPredicateItem for pod add case
|
||||||
|
// TODO: This does not belong with the equivalence cache implementation.
|
||||||
|
func (c *Cache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
|
||||||
|
// MatchInterPodAffinity: we assume scheduler can make sure newly bound pod
|
||||||
|
// will not break the existing inter pod affinity. So we does not need to
|
||||||
|
// invalidate MatchInterPodAffinity when pod added.
|
||||||
|
//
|
||||||
|
// But when a pod is deleted, existing inter pod affinity may become invalid.
|
||||||
|
// (e.g. this pod was preferred by some else, or vice versa)
|
||||||
|
//
|
||||||
|
// NOTE: assumptions above will not stand when we implemented features like
|
||||||
|
// RequiredDuringSchedulingRequiredDuringExecutioc.
|
||||||
|
|
||||||
|
// NoDiskConflict: the newly scheduled pod fits to existing pods on this node,
|
||||||
|
// it will also fits to equivalence class of existing pods
|
||||||
|
|
||||||
|
// GeneralPredicates: will always be affected by adding a new pod
|
||||||
|
invalidPredicates := sets.NewString(predicates.GeneralPred)
|
||||||
|
|
||||||
|
// MaxPDVolumeCountPredicate: we check the volumes of pod to make decisioc.
|
||||||
|
for _, vol := range pod.Spec.Volumes {
|
||||||
|
if vol.PersistentVolumeClaim != nil {
|
||||||
|
invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred, predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred)
|
||||||
|
} else {
|
||||||
|
if vol.AWSElasticBlockStore != nil {
|
||||||
|
invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
|
||||||
|
}
|
||||||
|
if vol.GCEPersistentDisk != nil {
|
||||||
|
invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred)
|
||||||
|
}
|
||||||
|
if vol.AzureDisk != nil {
|
||||||
|
invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.InvalidatePredicatesOnNode(nodeName, invalidPredicates)
|
||||||
|
}
|
||||||
|
|
||||||
// Class represents a set of pods which are equivalent from the perspective of
|
// Class represents a set of pods which are equivalent from the perspective of
|
||||||
// the scheduler. i.e. the scheduler would make the same decision for any pod
|
// the scheduler. i.e. the scheduler would make the same decision for any pod
|
||||||
// from the same class.
|
// from the same class.
|
||||||
|
@ -78,9 +189,6 @@ func NewClass(pod *v1.Pod) *Class {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// nodeMap stores PredicateCaches with node name as the key.
|
|
||||||
type nodeMap map[string]predicateMap
|
|
||||||
|
|
||||||
// predicateMap stores resultMaps with predicate name as the key.
|
// predicateMap stores resultMaps with predicate name as the key.
|
||||||
type predicateMap map[string]resultMap
|
type predicateMap map[string]resultMap
|
||||||
|
|
||||||
|
@ -97,7 +205,7 @@ type predicateResult struct {
|
||||||
// run and its results cached for the next call.
|
// run and its results cached for the next call.
|
||||||
//
|
//
|
||||||
// NOTE: RunPredicate will not update the equivalence cache if the given NodeInfo is stale.
|
// NOTE: RunPredicate will not update the equivalence cache if the given NodeInfo is stale.
|
||||||
func (c *Cache) RunPredicate(
|
func (n *NodeCache) RunPredicate(
|
||||||
pred algorithm.FitPredicate,
|
pred algorithm.FitPredicate,
|
||||||
predicateKey string,
|
predicateKey string,
|
||||||
pod *v1.Pod,
|
pod *v1.Pod,
|
||||||
|
@ -111,7 +219,7 @@ func (c *Cache) RunPredicate(
|
||||||
return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid")
|
return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid")
|
||||||
}
|
}
|
||||||
|
|
||||||
result, ok := c.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClass.hash)
|
result, ok := n.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClass.hash)
|
||||||
if ok {
|
if ok {
|
||||||
return result.Fit, result.FailReasons, nil
|
return result.Fit, result.FailReasons, nil
|
||||||
}
|
}
|
||||||
|
@ -120,13 +228,13 @@ func (c *Cache) RunPredicate(
|
||||||
return fit, reasons, err
|
return fit, reasons, err
|
||||||
}
|
}
|
||||||
if cache != nil {
|
if cache != nil {
|
||||||
c.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClass.hash, cache, nodeInfo)
|
n.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClass.hash, cache, nodeInfo)
|
||||||
}
|
}
|
||||||
return fit, reasons, nil
|
return fit, reasons, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateResult updates the cached result of a predicate.
|
// updateResult updates the cached result of a predicate.
|
||||||
func (c *Cache) updateResult(
|
func (n *NodeCache) updateResult(
|
||||||
podName, predicateKey string,
|
podName, predicateKey string,
|
||||||
fit bool,
|
fit bool,
|
||||||
reasons []algorithm.PredicateFailureReason,
|
reasons []algorithm.PredicateFailureReason,
|
||||||
|
@ -134,8 +242,6 @@ func (c *Cache) updateResult(
|
||||||
cache schedulercache.Cache,
|
cache schedulercache.Cache,
|
||||||
nodeInfo *schedulercache.NodeInfo,
|
nodeInfo *schedulercache.NodeInfo,
|
||||||
) {
|
) {
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
if nodeInfo == nil || nodeInfo.Node() == nil {
|
if nodeInfo == nil || nodeInfo.Node() == nil {
|
||||||
// This may happen during tests.
|
// This may happen during tests.
|
||||||
return
|
return
|
||||||
|
@ -144,114 +250,48 @@ func (c *Cache) updateResult(
|
||||||
if !cache.IsUpToDate(nodeInfo) {
|
if !cache.IsUpToDate(nodeInfo) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
nodeName := nodeInfo.Node().GetName()
|
|
||||||
if _, exist := c.cache[nodeName]; !exist {
|
|
||||||
c.cache[nodeName] = make(predicateMap)
|
|
||||||
}
|
|
||||||
predicateItem := predicateResult{
|
predicateItem := predicateResult{
|
||||||
Fit: fit,
|
Fit: fit,
|
||||||
FailReasons: reasons,
|
FailReasons: reasons,
|
||||||
}
|
}
|
||||||
// if cached predicate map already exists, just update the predicate by key
|
|
||||||
if predicates, ok := c.cache[nodeName][predicateKey]; ok {
|
n.mu.Lock()
|
||||||
|
defer n.mu.Unlock()
|
||||||
|
// If cached predicate map already exists, just update the predicate by key
|
||||||
|
if predicates, ok := n.cache[predicateKey]; ok {
|
||||||
// maps in golang are references, no need to add them back
|
// maps in golang are references, no need to add them back
|
||||||
predicates[equivalenceHash] = predicateItem
|
predicates[equivalenceHash] = predicateItem
|
||||||
} else {
|
} else {
|
||||||
c.cache[nodeName][predicateKey] =
|
n.cache[predicateKey] =
|
||||||
resultMap{
|
resultMap{
|
||||||
equivalenceHash: predicateItem,
|
equivalenceHash: predicateItem,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
glog.V(5).Infof("Cache update: node=%s,predicate=%s,pod=%s,value=%v", nodeName, predicateKey, podName, predicateItem)
|
|
||||||
|
glog.V(5).Infof("Cache update: node=%s, predicate=%s,pod=%s,value=%v",
|
||||||
|
nodeInfo.Node().Name, predicateKey, podName, predicateItem)
|
||||||
}
|
}
|
||||||
|
|
||||||
// lookupResult returns cached predicate results and a bool saying whether a
|
// lookupResult returns cached predicate results and a bool saying whether a
|
||||||
// cache entry was found.
|
// cache entry was found.
|
||||||
func (c *Cache) lookupResult(
|
func (n *NodeCache) lookupResult(
|
||||||
podName, nodeName, predicateKey string,
|
podName, nodeName, predicateKey string,
|
||||||
equivalenceHash uint64,
|
equivalenceHash uint64,
|
||||||
) (value predicateResult, ok bool) {
|
) (value predicateResult, ok bool) {
|
||||||
c.mu.RLock()
|
n.mu.RLock()
|
||||||
defer c.mu.RUnlock()
|
defer n.mu.RUnlock()
|
||||||
glog.V(5).Infof("Cache lookup: node=%s,predicate=%s,pod=%s", nodeName, predicateKey, podName)
|
value, ok = n.cache[predicateKey][equivalenceHash]
|
||||||
value, ok = c.cache[nodeName][predicateKey][equivalenceHash]
|
|
||||||
return value, ok
|
return value, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
// InvalidatePredicates clears all cached results for the given predicates.
|
// invalidatePreds deletes cached predicates by given keys.
|
||||||
func (c *Cache) InvalidatePredicates(predicateKeys sets.String) {
|
func (n *NodeCache) invalidatePreds(predicateKeys sets.String) {
|
||||||
if len(predicateKeys) == 0 {
|
n.mu.Lock()
|
||||||
return
|
defer n.mu.Unlock()
|
||||||
}
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
// c.cache uses nodeName as key, so we just iterate it and invalid given predicates
|
|
||||||
for _, predicates := range c.cache {
|
|
||||||
for predicateKey := range predicateKeys {
|
|
||||||
delete(predicates, predicateKey)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
glog.V(5).Infof("Cache invalidation: node=*,predicates=%v", predicateKeys)
|
|
||||||
}
|
|
||||||
|
|
||||||
// InvalidatePredicatesOnNode clears cached results for the given predicates on one node.
|
|
||||||
func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) {
|
|
||||||
if len(predicateKeys) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
for predicateKey := range predicateKeys {
|
for predicateKey := range predicateKeys {
|
||||||
delete(c.cache[nodeName], predicateKey)
|
delete(n.cache, predicateKey)
|
||||||
}
|
}
|
||||||
glog.V(5).Infof("Cache invalidation: node=%s,predicates=%v", nodeName, predicateKeys)
|
|
||||||
}
|
|
||||||
|
|
||||||
// InvalidateAllPredicatesOnNode clears all cached results for one node.
|
|
||||||
func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
delete(c.cache, nodeName)
|
|
||||||
glog.V(5).Infof("Cache invalidation: node=%s,predicates=*", nodeName)
|
|
||||||
}
|
|
||||||
|
|
||||||
// InvalidateCachedPredicateItemForPodAdd is a wrapper of
|
|
||||||
// InvalidateCachedPredicateItem for pod add case
|
|
||||||
// TODO: This does not belong with the equivalence cache implementation.
|
|
||||||
func (c *Cache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
|
|
||||||
// MatchInterPodAffinity: we assume scheduler can make sure newly bound pod
|
|
||||||
// will not break the existing inter pod affinity. So we does not need to
|
|
||||||
// invalidate MatchInterPodAffinity when pod added.
|
|
||||||
//
|
|
||||||
// But when a pod is deleted, existing inter pod affinity may become invalid.
|
|
||||||
// (e.g. this pod was preferred by some else, or vice versa)
|
|
||||||
//
|
|
||||||
// NOTE: assumptions above will not stand when we implemented features like
|
|
||||||
// RequiredDuringSchedulingRequiredDuringExecution.
|
|
||||||
|
|
||||||
// NoDiskConflict: the newly scheduled pod fits to existing pods on this node,
|
|
||||||
// it will also fits to equivalence class of existing pods
|
|
||||||
|
|
||||||
// GeneralPredicates: will always be affected by adding a new pod
|
|
||||||
invalidPredicates := sets.NewString(predicates.GeneralPred)
|
|
||||||
|
|
||||||
// MaxPDVolumeCountPredicate: we check the volumes of pod to make decision.
|
|
||||||
for _, vol := range pod.Spec.Volumes {
|
|
||||||
if vol.PersistentVolumeClaim != nil {
|
|
||||||
invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred, predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred)
|
|
||||||
} else {
|
|
||||||
if vol.AWSElasticBlockStore != nil {
|
|
||||||
invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
|
|
||||||
}
|
|
||||||
if vol.GCEPersistentDisk != nil {
|
|
||||||
invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred)
|
|
||||||
}
|
|
||||||
if vol.AzureDisk != nil {
|
|
||||||
invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
c.InvalidatePredicatesOnNode(nodeName, invalidPredicates)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// equivalencePod is the set of pod attributes which must match for two pods to
|
// equivalencePod is the set of pod attributes which must match for two pods to
|
||||||
|
|
|
@ -243,17 +243,21 @@ func TestRunPredicate(t *testing.T) {
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
node := schedulercache.NewNodeInfo()
|
node := schedulercache.NewNodeInfo()
|
||||||
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}})
|
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}}
|
||||||
|
node.SetNode(testNode)
|
||||||
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1"}}
|
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1"}}
|
||||||
meta := algorithm.EmptyPredicateMetadataProducer(nil, nil)
|
meta := algorithm.EmptyPredicateMetadataProducer(nil, nil)
|
||||||
|
|
||||||
|
// Initialize and populate equivalence class cache.
|
||||||
ecache := NewCache()
|
ecache := NewCache()
|
||||||
|
nodeCache, _ := ecache.GetNodeCache(testNode.Name)
|
||||||
|
|
||||||
equivClass := NewClass(pod)
|
equivClass := NewClass(pod)
|
||||||
if test.expectCacheHit {
|
if test.expectCacheHit {
|
||||||
ecache.updateResult(pod.Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash, test.cache, node)
|
nodeCache.updateResult(pod.Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash, test.cache, node)
|
||||||
}
|
}
|
||||||
|
|
||||||
fit, reasons, err := ecache.RunPredicate(test.pred.predicate, "testPredicate", pod, meta, node, equivClass, test.cache)
|
fit, reasons, err := nodeCache.RunPredicate(test.pred.predicate, "testPredicate", pod, meta, node, equivClass, test.cache)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err.Error() != test.expectedError {
|
if err.Error() != test.expectedError {
|
||||||
|
@ -284,7 +288,7 @@ func TestRunPredicate(t *testing.T) {
|
||||||
if !test.expectCacheHit && test.pred.callCount == 0 {
|
if !test.expectCacheHit && test.pred.callCount == 0 {
|
||||||
t.Errorf("Predicate should be called")
|
t.Errorf("Predicate should be called")
|
||||||
}
|
}
|
||||||
_, ok := ecache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash)
|
_, ok := nodeCache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash)
|
||||||
if !ok && test.expectCacheWrite {
|
if !ok && test.expectCacheWrite {
|
||||||
t.Errorf("Cache write should happen")
|
t.Errorf("Cache write should happen")
|
||||||
}
|
}
|
||||||
|
@ -339,21 +343,25 @@ func TestUpdateResult(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
|
node := schedulercache.NewNodeInfo()
|
||||||
|
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}
|
||||||
|
node.SetNode(testNode)
|
||||||
|
|
||||||
|
// Initialize and populate equivalence class cache.
|
||||||
ecache := NewCache()
|
ecache := NewCache()
|
||||||
|
nodeCache, _ := ecache.GetNodeCache(testNode.Name)
|
||||||
|
|
||||||
if test.expectPredicateMap {
|
if test.expectPredicateMap {
|
||||||
ecache.cache[test.nodeName] = make(predicateMap)
|
|
||||||
predicateItem := predicateResult{
|
predicateItem := predicateResult{
|
||||||
Fit: true,
|
Fit: true,
|
||||||
}
|
}
|
||||||
ecache.cache[test.nodeName][test.predicateKey] =
|
nodeCache.cache[test.predicateKey] =
|
||||||
resultMap{
|
resultMap{
|
||||||
test.equivalenceHash: predicateItem,
|
test.equivalenceHash: predicateItem,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
node := schedulercache.NewNodeInfo()
|
nodeCache.updateResult(
|
||||||
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
|
|
||||||
ecache.updateResult(
|
|
||||||
test.pod,
|
test.pod,
|
||||||
test.predicateKey,
|
test.predicateKey,
|
||||||
test.fit,
|
test.fit,
|
||||||
|
@ -363,7 +371,7 @@ func TestUpdateResult(t *testing.T) {
|
||||||
node,
|
node,
|
||||||
)
|
)
|
||||||
|
|
||||||
cachedMapItem, ok := ecache.cache[test.nodeName][test.predicateKey]
|
cachedMapItem, ok := nodeCache.cache[test.predicateKey]
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Errorf("Failed: %s, can't find expected cache item: %v",
|
t.Errorf("Failed: %s, can't find expected cache item: %v",
|
||||||
test.name, test.expectCacheItem)
|
test.name, test.expectCacheItem)
|
||||||
|
@ -473,11 +481,16 @@ func TestLookupResult(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
|
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}
|
||||||
|
|
||||||
|
// Initialize and populate equivalence class cache.
|
||||||
ecache := NewCache()
|
ecache := NewCache()
|
||||||
|
nodeCache, _ := ecache.GetNodeCache(testNode.Name)
|
||||||
|
|
||||||
node := schedulercache.NewNodeInfo()
|
node := schedulercache.NewNodeInfo()
|
||||||
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
|
node.SetNode(testNode)
|
||||||
// set cached item to equivalence cache
|
// set cached item to equivalence cache
|
||||||
ecache.updateResult(
|
nodeCache.updateResult(
|
||||||
test.podName,
|
test.podName,
|
||||||
test.predicateKey,
|
test.predicateKey,
|
||||||
test.cachedItem.fit,
|
test.cachedItem.fit,
|
||||||
|
@ -493,7 +506,7 @@ func TestLookupResult(t *testing.T) {
|
||||||
ecache.InvalidatePredicatesOnNode(test.nodeName, predicateKeys)
|
ecache.InvalidatePredicatesOnNode(test.nodeName, predicateKeys)
|
||||||
}
|
}
|
||||||
// calculate predicate with equivalence cache
|
// calculate predicate with equivalence cache
|
||||||
result, ok := ecache.lookupResult(test.podName,
|
result, ok := nodeCache.lookupResult(test.podName,
|
||||||
test.nodeName,
|
test.nodeName,
|
||||||
test.predicateKey,
|
test.predicateKey,
|
||||||
test.equivalenceHashForCalPredicate,
|
test.equivalenceHashForCalPredicate,
|
||||||
|
@ -689,9 +702,12 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
node := schedulercache.NewNodeInfo()
|
node := schedulercache.NewNodeInfo()
|
||||||
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
|
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}
|
||||||
|
node.SetNode(testNode)
|
||||||
|
|
||||||
|
nodeCache, _ := ecache.GetNodeCache(testNode.Name)
|
||||||
// set cached item to equivalence cache
|
// set cached item to equivalence cache
|
||||||
ecache.updateResult(
|
nodeCache.updateResult(
|
||||||
test.podName,
|
test.podName,
|
||||||
testPredicate,
|
testPredicate,
|
||||||
test.cachedItem.fit,
|
test.cachedItem.fit,
|
||||||
|
@ -707,8 +723,8 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
|
||||||
|
|
||||||
// there should be no cached predicate any more
|
// there should be no cached predicate any more
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
if algorithmCache, exist := ecache.cache[test.nodeName]; exist {
|
if nodeCache, exist := ecache.nodeToCache[test.nodeName]; exist {
|
||||||
if _, exist := algorithmCache[testPredicate]; exist {
|
if _, exist := nodeCache.cache[testPredicate]; exist {
|
||||||
t.Errorf("Failed: cached item for predicate key: %v on node: %v should be invalidated",
|
t.Errorf("Failed: cached item for predicate key: %v on node: %v should be invalidated",
|
||||||
testPredicate, test.nodeName)
|
testPredicate, test.nodeName)
|
||||||
break
|
break
|
||||||
|
@ -761,9 +777,12 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
node := schedulercache.NewNodeInfo()
|
node := schedulercache.NewNodeInfo()
|
||||||
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
|
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}
|
||||||
|
node.SetNode(testNode)
|
||||||
|
|
||||||
|
nodeCache, _ := ecache.GetNodeCache(testNode.Name)
|
||||||
// set cached item to equivalence cache
|
// set cached item to equivalence cache
|
||||||
ecache.updateResult(
|
nodeCache.updateResult(
|
||||||
test.podName,
|
test.podName,
|
||||||
testPredicate,
|
testPredicate,
|
||||||
test.cachedItem.fit,
|
test.cachedItem.fit,
|
||||||
|
@ -775,10 +794,10 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
// invalidate cached predicate for all nodes
|
// invalidate all cached predicate for node
|
||||||
ecache.InvalidateAllPredicatesOnNode(test.nodeName)
|
ecache.InvalidateAllPredicatesOnNode(test.nodeName)
|
||||||
if _, exist := ecache.cache[test.nodeName]; exist {
|
if _, ok := ecache.GetNodeCache(test.nodeName); ok {
|
||||||
t.Errorf("Failed: cached item for node: %v should be invalidated", test.nodeName)
|
t.Errorf("Failed: node: %v should not be found in internal cache", test.nodeName)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -356,20 +356,25 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
|
||||||
meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
|
meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
|
||||||
|
|
||||||
var equivClass *equivalence.Class
|
var equivClass *equivalence.Class
|
||||||
|
|
||||||
if g.equivalenceCache != nil {
|
if g.equivalenceCache != nil {
|
||||||
// getEquivalenceClassInfo will return immediately if no equivalence pod found
|
// getEquivalenceClassInfo will return immediately if no equivalence pod found
|
||||||
equivClass = equivalence.NewClass(pod)
|
equivClass = equivalence.NewClass(pod)
|
||||||
}
|
}
|
||||||
|
|
||||||
checkNode := func(i int) {
|
checkNode := func(i int) {
|
||||||
|
var nodeCache *equivalence.NodeCache
|
||||||
nodeName := nodes[i].Name
|
nodeName := nodes[i].Name
|
||||||
|
if g.equivalenceCache != nil {
|
||||||
|
nodeCache, _ = g.equivalenceCache.GetNodeCache(nodeName)
|
||||||
|
}
|
||||||
fits, failedPredicates, err := podFitsOnNode(
|
fits, failedPredicates, err := podFitsOnNode(
|
||||||
pod,
|
pod,
|
||||||
meta,
|
meta,
|
||||||
g.cachedNodeInfoMap[nodeName],
|
g.cachedNodeInfoMap[nodeName],
|
||||||
g.predicates,
|
g.predicates,
|
||||||
g.cache,
|
g.cache,
|
||||||
g.equivalenceCache,
|
nodeCache,
|
||||||
g.schedulingQueue,
|
g.schedulingQueue,
|
||||||
g.alwaysCheckAllPredicates,
|
g.alwaysCheckAllPredicates,
|
||||||
equivClass,
|
equivClass,
|
||||||
|
@ -472,7 +477,7 @@ func podFitsOnNode(
|
||||||
info *schedulercache.NodeInfo,
|
info *schedulercache.NodeInfo,
|
||||||
predicateFuncs map[string]algorithm.FitPredicate,
|
predicateFuncs map[string]algorithm.FitPredicate,
|
||||||
cache schedulercache.Cache,
|
cache schedulercache.Cache,
|
||||||
ecache *equivalence.Cache,
|
nodeCache *equivalence.NodeCache,
|
||||||
queue SchedulingQueue,
|
queue SchedulingQueue,
|
||||||
alwaysCheckAllPredicates bool,
|
alwaysCheckAllPredicates bool,
|
||||||
equivClass *equivalence.Class,
|
equivClass *equivalence.Class,
|
||||||
|
@ -512,7 +517,7 @@ func podFitsOnNode(
|
||||||
// Bypass eCache if node has any nominated pods.
|
// Bypass eCache if node has any nominated pods.
|
||||||
// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
|
// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
|
||||||
// when pods are nominated or their nominations change.
|
// when pods are nominated or their nominations change.
|
||||||
eCacheAvailable = equivClass != nil && !podsAdded
|
eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
|
||||||
for _, predicateKey := range predicates.Ordering() {
|
for _, predicateKey := range predicates.Ordering() {
|
||||||
var (
|
var (
|
||||||
fit bool
|
fit bool
|
||||||
|
@ -522,7 +527,7 @@ func podFitsOnNode(
|
||||||
//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
|
//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
|
||||||
if predicate, exist := predicateFuncs[predicateKey]; exist {
|
if predicate, exist := predicateFuncs[predicateKey]; exist {
|
||||||
if eCacheAvailable {
|
if eCacheAvailable {
|
||||||
fit, reasons, err = ecache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
|
fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
|
||||||
} else {
|
} else {
|
||||||
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
|
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1405,7 +1405,8 @@ func TestCacheInvalidationRace(t *testing.T) {
|
||||||
|
|
||||||
// Set up the mock cache.
|
// Set up the mock cache.
|
||||||
cache := schedulercache.New(time.Duration(0), wait.NeverStop)
|
cache := schedulercache.New(time.Duration(0), wait.NeverStop)
|
||||||
cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}})
|
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}
|
||||||
|
cache.AddNode(testNode)
|
||||||
mockCache := &syncingMockCache{
|
mockCache := &syncingMockCache{
|
||||||
Cache: cache,
|
Cache: cache,
|
||||||
cycleStart: make(chan struct{}),
|
cycleStart: make(chan struct{}),
|
||||||
|
|
|
@ -772,6 +772,11 @@ func (c *configFactory) addNodeToCache(obj interface{}) {
|
||||||
glog.Errorf("scheduler cache AddNode failed: %v", err)
|
glog.Errorf("scheduler cache AddNode failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.enableEquivalenceClassCache {
|
||||||
|
// GetNodeCache() will lazily create NodeCache for given node if it does not exist.
|
||||||
|
c.equivalencePodCache.GetNodeCache(node.GetName())
|
||||||
|
}
|
||||||
|
|
||||||
c.podQueue.MoveAllToActiveQueue()
|
c.podQueue.MoveAllToActiveQueue()
|
||||||
// NOTE: add a new node does not affect existing predicates in equivalence cache
|
// NOTE: add a new node does not affect existing predicates in equivalence cache
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue