mirror of https://github.com/k3s-io/k3s
Merge pull request #63942 from misterikkit/ecache-cleanup
Automatic merge from submit-queue (batch tested with PRs 64142, 64426, 62910, 63942, 64548). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. scheduler: further cleanup of equivalence cache **What this PR does / why we need it**: This improves comments and simplifies some names/logic in equivalence_cache.go, as well as changing the order of some items in the file. **Special notes for your reviewer**: **Release note**: ```release-note NONE ``` /kind cleanuppull/8/head
commit
dd040d6010
|
@ -31,22 +31,30 @@ import (
|
|||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// EquivalenceCache holds:
|
||||
// 1. a map of AlgorithmCache with node name as key
|
||||
// 2. function to get equivalence pod
|
||||
// EquivalenceCache saves and reuses the output of predicate functions. Use
|
||||
// RunPredicate to get or update the cached results. An appropriate Invalidate*
|
||||
// function should be called when some predicate results are no longer valid.
|
||||
//
|
||||
// Internally, results are keyed by node name, predicate name, and "equivalence
|
||||
// class". (Equivalence class is defined in the `EquivalenceClassInfo` type.)
|
||||
// Saved results will be reused until an appropriate invalidation function is
|
||||
// called.
|
||||
type EquivalenceCache struct {
|
||||
mu sync.RWMutex
|
||||
algorithmCache map[string]AlgorithmCache
|
||||
cache nodeMap
|
||||
}
|
||||
|
||||
// The AlgorithmCache stores PredicateMap with predicate name as key, PredicateMap as value.
|
||||
type AlgorithmCache map[string]PredicateMap
|
||||
// nodeMap stores PredicateCaches with node name as the key.
|
||||
type nodeMap map[string]predicateMap
|
||||
|
||||
// PredicateMap stores HostPrediacte with equivalence hash as key
|
||||
type PredicateMap map[uint64]HostPredicate
|
||||
// predicateMap stores resultMaps with predicate name as the key.
|
||||
type predicateMap map[string]resultMap
|
||||
|
||||
// HostPredicate is the cached predicate result
|
||||
type HostPredicate struct {
|
||||
// resultMap stores PredicateResult with pod equivalence hash as the key.
|
||||
type resultMap map[uint64]predicateResult
|
||||
|
||||
// predicateResult stores the output of a FitPredicate.
|
||||
type predicateResult struct {
|
||||
Fit bool
|
||||
FailReasons []algorithm.PredicateFailureReason
|
||||
}
|
||||
|
@ -55,12 +63,12 @@ type HostPredicate struct {
|
|||
// result from previous scheduling.
|
||||
func NewEquivalenceCache() *EquivalenceCache {
|
||||
return &EquivalenceCache{
|
||||
algorithmCache: make(map[string]AlgorithmCache),
|
||||
cache: make(nodeMap),
|
||||
}
|
||||
}
|
||||
|
||||
// RunPredicate will return a cached predicate result. In case of a cache miss, the predicate will
|
||||
// be run and its results cached for the next call.
|
||||
// RunPredicate returns a cached predicate result. In case of a cache miss, the predicate will be
|
||||
// run and its results cached for the next call.
|
||||
//
|
||||
// NOTE: RunPredicate will not update the equivalence cache if the given NodeInfo is stale.
|
||||
func (ec *EquivalenceCache) RunPredicate(
|
||||
|
@ -69,7 +77,7 @@ func (ec *EquivalenceCache) RunPredicate(
|
|||
pod *v1.Pod,
|
||||
meta algorithm.PredicateMetadata,
|
||||
nodeInfo *schedulercache.NodeInfo,
|
||||
equivClassInfo *equivalenceClassInfo,
|
||||
equivClassInfo *EquivalenceClassInfo,
|
||||
cache schedulercache.Cache,
|
||||
) (bool, []algorithm.PredicateFailureReason, error) {
|
||||
if nodeInfo == nil || nodeInfo.Node() == nil {
|
||||
|
@ -77,9 +85,9 @@ func (ec *EquivalenceCache) RunPredicate(
|
|||
return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid")
|
||||
}
|
||||
|
||||
fit, reasons, invalid := ec.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClassInfo.hash)
|
||||
if !invalid {
|
||||
return fit, reasons, nil
|
||||
result, ok := ec.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClassInfo.hash)
|
||||
if ok {
|
||||
return result.Fit, result.FailReasons, nil
|
||||
}
|
||||
fit, reasons, err := pred(pod, meta, nodeInfo)
|
||||
if err != nil {
|
||||
|
@ -111,90 +119,84 @@ func (ec *EquivalenceCache) updateResult(
|
|||
return
|
||||
}
|
||||
nodeName := nodeInfo.Node().GetName()
|
||||
if _, exist := ec.algorithmCache[nodeName]; !exist {
|
||||
ec.algorithmCache[nodeName] = AlgorithmCache{}
|
||||
if _, exist := ec.cache[nodeName]; !exist {
|
||||
ec.cache[nodeName] = make(predicateMap)
|
||||
}
|
||||
predicateItem := HostPredicate{
|
||||
predicateItem := predicateResult{
|
||||
Fit: fit,
|
||||
FailReasons: reasons,
|
||||
}
|
||||
// if cached predicate map already exists, just update the predicate by key
|
||||
if predicateMap, ok := ec.algorithmCache[nodeName][predicateKey]; ok {
|
||||
if predicates, ok := ec.cache[nodeName][predicateKey]; ok {
|
||||
// maps in golang are references, no need to add them back
|
||||
predicateMap[equivalenceHash] = predicateItem
|
||||
predicates[equivalenceHash] = predicateItem
|
||||
} else {
|
||||
ec.algorithmCache[nodeName][predicateKey] =
|
||||
PredicateMap{
|
||||
ec.cache[nodeName][predicateKey] =
|
||||
resultMap{
|
||||
equivalenceHash: predicateItem,
|
||||
}
|
||||
}
|
||||
glog.V(5).Infof("Updated cached predicate: %v for pod: %v on node: %s, with item %v", predicateKey, podName, nodeName, predicateItem)
|
||||
}
|
||||
|
||||
// lookupResult returns cached predicate results:
|
||||
// 1. if pod fit
|
||||
// 2. reasons if pod did not fit
|
||||
// 3. if cache item is not found
|
||||
// lookupResult returns cached predicate results and a bool saying whether a
|
||||
// cache entry was found.
|
||||
func (ec *EquivalenceCache) lookupResult(
|
||||
podName, nodeName, predicateKey string,
|
||||
equivalenceHash uint64,
|
||||
) (bool, []algorithm.PredicateFailureReason, bool) {
|
||||
) (value predicateResult, ok bool) {
|
||||
ec.mu.RLock()
|
||||
defer ec.mu.RUnlock()
|
||||
glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache",
|
||||
predicateKey, podName, nodeName)
|
||||
if hostPredicate, exist := ec.algorithmCache[nodeName][predicateKey][equivalenceHash]; exist {
|
||||
if hostPredicate.Fit {
|
||||
return true, []algorithm.PredicateFailureReason{}, false
|
||||
}
|
||||
return false, hostPredicate.FailReasons, false
|
||||
}
|
||||
// is invalid
|
||||
return false, []algorithm.PredicateFailureReason{}, true
|
||||
value, ok = ec.cache[nodeName][predicateKey][equivalenceHash]
|
||||
return value, ok
|
||||
}
|
||||
|
||||
// InvalidateCachedPredicateItem marks all items of given predicateKeys, of all pods, on the given node as invalid
|
||||
func (ec *EquivalenceCache) InvalidateCachedPredicateItem(nodeName string, predicateKeys sets.String) {
|
||||
// InvalidatePredicates clears all cached results for the given predicates.
|
||||
func (ec *EquivalenceCache) InvalidatePredicates(predicateKeys sets.String) {
|
||||
if len(predicateKeys) == 0 {
|
||||
return
|
||||
}
|
||||
ec.mu.Lock()
|
||||
defer ec.mu.Unlock()
|
||||
// ec.cache uses nodeName as key, so we just iterate it and invalid given predicates
|
||||
for _, predicates := range ec.cache {
|
||||
for predicateKey := range predicateKeys {
|
||||
delete(ec.algorithmCache[nodeName], predicateKey)
|
||||
}
|
||||
glog.V(5).Infof("Done invalidating cached predicates: %v on node: %s", predicateKeys, nodeName)
|
||||
}
|
||||
|
||||
// InvalidateCachedPredicateItemOfAllNodes marks all items of given predicateKeys, of all pods, on all node as invalid
|
||||
func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKeys sets.String) {
|
||||
if len(predicateKeys) == 0 {
|
||||
return
|
||||
}
|
||||
ec.mu.Lock()
|
||||
defer ec.mu.Unlock()
|
||||
// algorithmCache uses nodeName as key, so we just iterate it and invalid given predicates
|
||||
for _, algorithmCache := range ec.algorithmCache {
|
||||
for predicateKey := range predicateKeys {
|
||||
delete(algorithmCache, predicateKey)
|
||||
delete(predicates, predicateKey)
|
||||
}
|
||||
}
|
||||
glog.V(5).Infof("Done invalidating cached predicates: %v on all node", predicateKeys)
|
||||
}
|
||||
|
||||
// InvalidateAllCachedPredicateItemOfNode marks all cached items on given node as invalid
|
||||
func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName string) {
|
||||
// InvalidatePredicatesOnNode clears cached results for the given predicates on one node.
|
||||
func (ec *EquivalenceCache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) {
|
||||
if len(predicateKeys) == 0 {
|
||||
return
|
||||
}
|
||||
ec.mu.Lock()
|
||||
defer ec.mu.Unlock()
|
||||
delete(ec.algorithmCache, nodeName)
|
||||
for predicateKey := range predicateKeys {
|
||||
delete(ec.cache[nodeName], predicateKey)
|
||||
}
|
||||
glog.V(5).Infof("Done invalidating cached predicates: %v on node: %s", predicateKeys, nodeName)
|
||||
}
|
||||
|
||||
// InvalidateAllPredicatesOnNode clears all cached results for one node.
|
||||
func (ec *EquivalenceCache) InvalidateAllPredicatesOnNode(nodeName string) {
|
||||
ec.mu.Lock()
|
||||
defer ec.mu.Unlock()
|
||||
delete(ec.cache, nodeName)
|
||||
glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName)
|
||||
}
|
||||
|
||||
// InvalidateCachedPredicateItemForPodAdd is a wrapper of InvalidateCachedPredicateItem for pod add case
|
||||
// InvalidateCachedPredicateItemForPodAdd is a wrapper of
|
||||
// InvalidateCachedPredicateItem for pod add case
|
||||
// TODO: This logic does not belong with the equivalence cache implementation.
|
||||
func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
|
||||
// MatchInterPodAffinity: we assume scheduler can make sure newly bound pod
|
||||
// will not break the existing inter pod affinity. So we does not need to invalidate
|
||||
// MatchInterPodAffinity when pod added.
|
||||
// will not break the existing inter pod affinity. So we does not need to
|
||||
// invalidate MatchInterPodAffinity when pod added.
|
||||
//
|
||||
// But when a pod is deleted, existing inter pod affinity may become invalid.
|
||||
// (e.g. this pod was preferred by some else, or vice versa)
|
||||
|
@ -224,25 +226,26 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod,
|
|||
}
|
||||
}
|
||||
}
|
||||
ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates)
|
||||
ec.InvalidatePredicatesOnNode(nodeName, invalidPredicates)
|
||||
}
|
||||
|
||||
// equivalenceClassInfo holds equivalence hash which is used for checking equivalence cache.
|
||||
// We will pass this to podFitsOnNode to ensure equivalence hash is only calculated per schedule.
|
||||
type equivalenceClassInfo struct {
|
||||
// EquivalenceClassInfo holds equivalence hash which is used for checking
|
||||
// equivalence cache. We will pass this to podFitsOnNode to ensure equivalence
|
||||
// hash is only calculated per schedule.
|
||||
type EquivalenceClassInfo struct {
|
||||
// Equivalence hash.
|
||||
hash uint64
|
||||
}
|
||||
|
||||
// getEquivalenceClassInfo returns a hash of the given pod.
|
||||
// The hashing function returns the same value for any two pods that are
|
||||
// equivalent from the perspective of scheduling.
|
||||
func (ec *EquivalenceCache) getEquivalenceClassInfo(pod *v1.Pod) *equivalenceClassInfo {
|
||||
equivalencePod := getEquivalenceHash(pod)
|
||||
// GetEquivalenceClassInfo returns a hash of the given pod. The hashing function
|
||||
// returns the same value for any two pods that are equivalent from the
|
||||
// perspective of scheduling.
|
||||
func (ec *EquivalenceCache) GetEquivalenceClassInfo(pod *v1.Pod) *EquivalenceClassInfo {
|
||||
equivalencePod := getEquivalencePod(pod)
|
||||
if equivalencePod != nil {
|
||||
hash := fnv.New32a()
|
||||
hashutil.DeepHashObject(hash, equivalencePod)
|
||||
return &equivalenceClassInfo{
|
||||
return &EquivalenceClassInfo{
|
||||
hash: uint64(hash.Sum32()),
|
||||
}
|
||||
}
|
||||
|
@ -254,9 +257,9 @@ func (ec *EquivalenceCache) getEquivalenceClassInfo(pod *v1.Pod) *equivalenceCla
|
|||
// include any Pod field which is used by a FitPredicate.
|
||||
//
|
||||
// NOTE: For equivalence hash to be formally correct, lists and maps in the
|
||||
// equivalencePod should be normalized. (e.g. by sorting them) However, the
|
||||
// vast majority of equivalent pod classes are expected to be created from a
|
||||
// single pod template, so they will all have the same ordering.
|
||||
// equivalencePod should be normalized. (e.g. by sorting them) However, the vast
|
||||
// majority of equivalent pod classes are expected to be created from a single
|
||||
// pod template, so they will all have the same ordering.
|
||||
type equivalencePod struct {
|
||||
Namespace *string
|
||||
Labels map[string]string
|
||||
|
@ -269,8 +272,9 @@ type equivalencePod struct {
|
|||
Volumes []v1.Volume // See note about ordering
|
||||
}
|
||||
|
||||
// getEquivalenceHash returns the equivalencePod for a Pod.
|
||||
func getEquivalenceHash(pod *v1.Pod) *equivalencePod {
|
||||
// getEquivalencePod returns a normalized representation of a pod so that two
|
||||
// "equivalent" pods will hash to the same value.
|
||||
func getEquivalencePod(pod *v1.Pod) *equivalencePod {
|
||||
ep := &equivalencePod{
|
||||
Namespace: &pod.Namespace,
|
||||
Labels: pod.Labels,
|
||||
|
|
|
@ -251,7 +251,7 @@ func TestRunPredicate(t *testing.T) {
|
|||
meta := algorithm.EmptyPredicateMetadataProducer(nil, nil)
|
||||
|
||||
ecache := NewEquivalenceCache()
|
||||
equivClass := ecache.getEquivalenceClassInfo(pod)
|
||||
equivClass := ecache.GetEquivalenceClassInfo(pod)
|
||||
if test.expectCacheHit {
|
||||
ecache.updateResult(pod.Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash, test.cache, node)
|
||||
}
|
||||
|
@ -287,14 +287,14 @@ func TestRunPredicate(t *testing.T) {
|
|||
if !test.expectCacheHit && test.pred.callCount == 0 {
|
||||
t.Errorf("Predicate should be called")
|
||||
}
|
||||
_, _, invalid := ecache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash)
|
||||
if invalid && test.expectCacheWrite {
|
||||
_, ok := ecache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash)
|
||||
if !ok && test.expectCacheWrite {
|
||||
t.Errorf("Cache write should happen")
|
||||
}
|
||||
if !test.expectCacheHit && test.expectCacheWrite && invalid {
|
||||
if !test.expectCacheHit && test.expectCacheWrite && !ok {
|
||||
t.Errorf("Cache write should happen")
|
||||
}
|
||||
if !test.expectCacheHit && !test.expectCacheWrite && !invalid {
|
||||
if !test.expectCacheHit && !test.expectCacheWrite && ok {
|
||||
t.Errorf("Cache write should not happen")
|
||||
}
|
||||
})
|
||||
|
@ -311,7 +311,7 @@ func TestUpdateResult(t *testing.T) {
|
|||
reasons []algorithm.PredicateFailureReason
|
||||
equivalenceHash uint64
|
||||
expectPredicateMap bool
|
||||
expectCacheItem HostPredicate
|
||||
expectCacheItem predicateResult
|
||||
cache schedulercache.Cache
|
||||
}{
|
||||
{
|
||||
|
@ -322,7 +322,7 @@ func TestUpdateResult(t *testing.T) {
|
|||
fit: true,
|
||||
equivalenceHash: 123,
|
||||
expectPredicateMap: false,
|
||||
expectCacheItem: HostPredicate{
|
||||
expectCacheItem: predicateResult{
|
||||
Fit: true,
|
||||
},
|
||||
cache: &upToDateCache{},
|
||||
|
@ -335,7 +335,7 @@ func TestUpdateResult(t *testing.T) {
|
|||
fit: false,
|
||||
equivalenceHash: 123,
|
||||
expectPredicateMap: true,
|
||||
expectCacheItem: HostPredicate{
|
||||
expectCacheItem: predicateResult{
|
||||
Fit: false,
|
||||
},
|
||||
cache: &upToDateCache{},
|
||||
|
@ -344,12 +344,12 @@ func TestUpdateResult(t *testing.T) {
|
|||
for _, test := range tests {
|
||||
ecache := NewEquivalenceCache()
|
||||
if test.expectPredicateMap {
|
||||
ecache.algorithmCache[test.nodeName] = AlgorithmCache{}
|
||||
predicateItem := HostPredicate{
|
||||
ecache.cache[test.nodeName] = make(predicateMap)
|
||||
predicateItem := predicateResult{
|
||||
Fit: true,
|
||||
}
|
||||
ecache.algorithmCache[test.nodeName][test.predicateKey] =
|
||||
PredicateMap{
|
||||
ecache.cache[test.nodeName][test.predicateKey] =
|
||||
resultMap{
|
||||
test.equivalenceHash: predicateItem,
|
||||
}
|
||||
}
|
||||
|
@ -366,7 +366,7 @@ func TestUpdateResult(t *testing.T) {
|
|||
node,
|
||||
)
|
||||
|
||||
cachedMapItem, ok := ecache.algorithmCache[test.nodeName][test.predicateKey]
|
||||
cachedMapItem, ok := ecache.cache[test.nodeName][test.predicateKey]
|
||||
if !ok {
|
||||
t.Errorf("Failed: %s, can't find expected cache item: %v",
|
||||
test.name, test.expectCacheItem)
|
||||
|
@ -396,8 +396,8 @@ func TestLookupResult(t *testing.T) {
|
|||
equivalenceHashForUpdatePredicate uint64
|
||||
equivalenceHashForCalPredicate uint64
|
||||
cachedItem predicateItemType
|
||||
expectedInvalidPredicateKey bool
|
||||
expectedInvalidEquivalenceHash bool
|
||||
expectedPredicateKeyMiss bool
|
||||
expectedEquivalenceHashMiss bool
|
||||
expectedPredicateItem predicateItemType
|
||||
cache schedulercache.Cache
|
||||
}{
|
||||
|
@ -412,7 +412,7 @@ func TestLookupResult(t *testing.T) {
|
|||
fit: false,
|
||||
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
|
||||
},
|
||||
expectedInvalidPredicateKey: true,
|
||||
expectedPredicateKeyMiss: true,
|
||||
expectedPredicateItem: predicateItemType{
|
||||
fit: false,
|
||||
reasons: []algorithm.PredicateFailureReason{},
|
||||
|
@ -429,7 +429,7 @@ func TestLookupResult(t *testing.T) {
|
|||
cachedItem: predicateItemType{
|
||||
fit: true,
|
||||
},
|
||||
expectedInvalidPredicateKey: false,
|
||||
expectedPredicateKeyMiss: false,
|
||||
expectedPredicateItem: predicateItemType{
|
||||
fit: true,
|
||||
reasons: []algorithm.PredicateFailureReason{},
|
||||
|
@ -447,7 +447,7 @@ func TestLookupResult(t *testing.T) {
|
|||
fit: false,
|
||||
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
|
||||
},
|
||||
expectedInvalidPredicateKey: false,
|
||||
expectedPredicateKeyMiss: false,
|
||||
expectedPredicateItem: predicateItemType{
|
||||
fit: false,
|
||||
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
|
||||
|
@ -465,8 +465,8 @@ func TestLookupResult(t *testing.T) {
|
|||
fit: false,
|
||||
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
|
||||
},
|
||||
expectedInvalidPredicateKey: false,
|
||||
expectedInvalidEquivalenceHash: true,
|
||||
expectedPredicateKeyMiss: false,
|
||||
expectedEquivalenceHashMiss: true,
|
||||
expectedPredicateItem: predicateItemType{
|
||||
fit: false,
|
||||
reasons: []algorithm.PredicateFailureReason{},
|
||||
|
@ -490,27 +490,32 @@ func TestLookupResult(t *testing.T) {
|
|||
node,
|
||||
)
|
||||
// if we want to do invalid, invalid the cached item
|
||||
if test.expectedInvalidPredicateKey {
|
||||
if test.expectedPredicateKeyMiss {
|
||||
predicateKeys := sets.NewString()
|
||||
predicateKeys.Insert(test.predicateKey)
|
||||
ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys)
|
||||
ecache.InvalidatePredicatesOnNode(test.nodeName, predicateKeys)
|
||||
}
|
||||
// calculate predicate with equivalence cache
|
||||
fit, reasons, invalid := ecache.lookupResult(test.podName,
|
||||
result, ok := ecache.lookupResult(test.podName,
|
||||
test.nodeName,
|
||||
test.predicateKey,
|
||||
test.equivalenceHashForCalPredicate,
|
||||
)
|
||||
// returned invalid should match expectedInvalidPredicateKey or expectedInvalidEquivalenceHash
|
||||
fit, reasons := result.Fit, result.FailReasons
|
||||
// returned invalid should match expectedPredicateKeyMiss or expectedEquivalenceHashMiss
|
||||
if test.equivalenceHashForUpdatePredicate != test.equivalenceHashForCalPredicate {
|
||||
if invalid != test.expectedInvalidEquivalenceHash {
|
||||
t.Errorf("Failed: %s, expected invalid: %v, but got: %v",
|
||||
test.name, test.expectedInvalidEquivalenceHash, invalid)
|
||||
if ok && test.expectedEquivalenceHashMiss {
|
||||
t.Errorf("Failed: %s, expected (equivalence hash) cache miss", test.name)
|
||||
}
|
||||
if !ok && !test.expectedEquivalenceHashMiss {
|
||||
t.Errorf("Failed: %s, expected (equivalence hash) cache hit", test.name)
|
||||
}
|
||||
} else {
|
||||
if invalid != test.expectedInvalidPredicateKey {
|
||||
t.Errorf("Failed: %s, expected invalid: %v, but got: %v",
|
||||
test.name, test.expectedInvalidPredicateKey, invalid)
|
||||
if ok && test.expectedPredicateKeyMiss {
|
||||
t.Errorf("Failed: %s, expected (predicate key) cache miss", test.name)
|
||||
}
|
||||
if !ok && !test.expectedPredicateKeyMiss {
|
||||
t.Errorf("Failed: %s, expected (predicate key) cache hit", test.name)
|
||||
}
|
||||
}
|
||||
// returned predicate result should match expected predicate item
|
||||
|
@ -618,7 +623,7 @@ func TestGetEquivalenceHash(t *testing.T) {
|
|||
t.Run(test.name, func(t *testing.T) {
|
||||
for i, podInfo := range test.podInfoList {
|
||||
testPod := podInfo.pod
|
||||
eclassInfo := ecache.getEquivalenceClassInfo(testPod)
|
||||
eclassInfo := ecache.GetEquivalenceClassInfo(testPod)
|
||||
if eclassInfo == nil && podInfo.hashIsValid {
|
||||
t.Errorf("Failed: pod %v is expected to have valid hash", testPod)
|
||||
}
|
||||
|
@ -704,11 +709,11 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
|
|||
}
|
||||
|
||||
// invalidate cached predicate for all nodes
|
||||
ecache.InvalidateCachedPredicateItemOfAllNodes(sets.NewString(testPredicate))
|
||||
ecache.InvalidatePredicates(sets.NewString(testPredicate))
|
||||
|
||||
// there should be no cached predicate any more
|
||||
for _, test := range tests {
|
||||
if algorithmCache, exist := ecache.algorithmCache[test.nodeName]; exist {
|
||||
if algorithmCache, exist := ecache.cache[test.nodeName]; exist {
|
||||
if _, exist := algorithmCache[testPredicate]; exist {
|
||||
t.Errorf("Failed: cached item for predicate key: %v on node: %v should be invalidated",
|
||||
testPredicate, test.nodeName)
|
||||
|
@ -777,8 +782,8 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
|
|||
|
||||
for _, test := range tests {
|
||||
// invalidate cached predicate for all nodes
|
||||
ecache.InvalidateAllCachedPredicateItemOfNode(test.nodeName)
|
||||
if _, exist := ecache.algorithmCache[test.nodeName]; exist {
|
||||
ecache.InvalidateAllPredicatesOnNode(test.nodeName)
|
||||
if _, exist := ecache.cache[test.nodeName]; exist {
|
||||
t.Errorf("Failed: cached item for node: %v should be invalidated", test.nodeName)
|
||||
break
|
||||
}
|
||||
|
@ -788,7 +793,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
|
|||
func BenchmarkEquivalenceHash(b *testing.B) {
|
||||
pod := makeBasicPod("test")
|
||||
for i := 0; i < b.N; i++ {
|
||||
getEquivalenceHash(pod)
|
||||
getEquivalencePod(pod)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -852,7 +857,7 @@ func TestEquivalenceCacheInvalidationRace(t *testing.T) {
|
|||
if err := cache.AddPod(pod); err != nil {
|
||||
t.Errorf("Could not add pod to cache: %v", err)
|
||||
}
|
||||
eCache.InvalidateAllCachedPredicateItemOfNode("machine1")
|
||||
eCache.InvalidateAllPredicatesOnNode("machine1")
|
||||
mockCache.cacheInvalidated <- struct{}{}
|
||||
}()
|
||||
|
||||
|
|
|
@ -342,10 +342,10 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
|
|||
// We can use the same metadata producer for all nodes.
|
||||
meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
|
||||
|
||||
var equivCacheInfo *equivalenceClassInfo
|
||||
var equivCacheInfo *EquivalenceClassInfo
|
||||
if g.equivalenceCache != nil {
|
||||
// getEquivalenceClassInfo will return immediately if no equivalence pod found
|
||||
equivCacheInfo = g.equivalenceCache.getEquivalenceClassInfo(pod)
|
||||
equivCacheInfo = g.equivalenceCache.GetEquivalenceClassInfo(pod)
|
||||
}
|
||||
|
||||
checkNode := func(i int) {
|
||||
|
@ -462,7 +462,7 @@ func podFitsOnNode(
|
|||
ecache *EquivalenceCache,
|
||||
queue SchedulingQueue,
|
||||
alwaysCheckAllPredicates bool,
|
||||
equivCacheInfo *equivalenceClassInfo,
|
||||
equivCacheInfo *EquivalenceClassInfo,
|
||||
) (bool, []algorithm.PredicateFailureReason, error) {
|
||||
var (
|
||||
eCacheAvailable bool
|
||||
|
|
|
@ -410,7 +410,7 @@ func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.Persist
|
|||
break
|
||||
}
|
||||
}
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
|
||||
c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
|
||||
}
|
||||
|
||||
// isZoneRegionLabel check if given key of label is zone or region label.
|
||||
|
@ -468,7 +468,7 @@ func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
|
|||
invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
|
||||
}
|
||||
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
|
||||
c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
|
||||
}
|
||||
|
||||
func (c *configFactory) onPvcAdd(obj interface{}) {
|
||||
|
@ -538,7 +538,7 @@ func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim
|
|||
// Add/delete impacts the available PVs to choose from
|
||||
invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
|
||||
}
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
|
||||
c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
|
||||
}
|
||||
|
||||
func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) {
|
||||
|
@ -553,12 +553,12 @@ func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.Persistent
|
|||
invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...)
|
||||
}
|
||||
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
|
||||
c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
|
||||
}
|
||||
|
||||
func (c *configFactory) onServiceAdd(obj interface{}) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
|
||||
c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet)
|
||||
}
|
||||
c.podQueue.MoveAllToActiveQueue()
|
||||
}
|
||||
|
@ -569,7 +569,7 @@ func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{})
|
|||
oldService := oldObj.(*v1.Service)
|
||||
newService := newObj.(*v1.Service)
|
||||
if !reflect.DeepEqual(oldService.Spec.Selector, newService.Spec.Selector) {
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
|
||||
c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet)
|
||||
}
|
||||
}
|
||||
c.podQueue.MoveAllToActiveQueue()
|
||||
|
@ -577,7 +577,7 @@ func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{})
|
|||
|
||||
func (c *configFactory) onServiceDelete(obj interface{}) {
|
||||
if c.enableEquivalenceClassCache {
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
|
||||
c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet)
|
||||
}
|
||||
c.podQueue.MoveAllToActiveQueue()
|
||||
}
|
||||
|
@ -694,13 +694,13 @@ func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, ol
|
|||
if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) {
|
||||
// MatchInterPodAffinity need to be reconsidered for this node,
|
||||
// as well as all nodes in its same failure domain.
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(
|
||||
c.equivalencePodCache.InvalidatePredicates(
|
||||
matchInterPodAffinitySet)
|
||||
}
|
||||
// if requested container resource changed, invalidate GeneralPredicates of this node
|
||||
if !reflect.DeepEqual(predicates.GetResourceRequest(newPod),
|
||||
predicates.GetResourceRequest(oldPod)) {
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItem(
|
||||
c.equivalencePodCache.InvalidatePredicatesOnNode(
|
||||
newPod.Spec.NodeName, generalPredicatesSets)
|
||||
}
|
||||
}
|
||||
|
@ -741,14 +741,14 @@ func (c *configFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) {
|
|||
// MatchInterPodAffinity need to be reconsidered for this node,
|
||||
// as well as all nodes in its same failure domain.
|
||||
// TODO(resouer) can we just do this for nodes in the same failure domain
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(
|
||||
c.equivalencePodCache.InvalidatePredicates(
|
||||
matchInterPodAffinitySet)
|
||||
|
||||
// if this pod have these PV, cached result of disk conflict will become invalid.
|
||||
for _, volume := range pod.Spec.Volumes {
|
||||
if volume.GCEPersistentDisk != nil || volume.AWSElasticBlockStore != nil ||
|
||||
volume.RBD != nil || volume.ISCSI != nil {
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItem(
|
||||
c.equivalencePodCache.InvalidatePredicatesOnNode(
|
||||
pod.Spec.NodeName, noDiskConflictSet)
|
||||
}
|
||||
}
|
||||
|
@ -858,7 +858,7 @@ func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node,
|
|||
if newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable {
|
||||
invalidPredicates.Insert(predicates.CheckNodeConditionPred)
|
||||
}
|
||||
c.equivalencePodCache.InvalidateCachedPredicateItem(newNode.GetName(), invalidPredicates)
|
||||
c.equivalencePodCache.InvalidatePredicatesOnNode(newNode.GetName(), invalidPredicates)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -885,7 +885,7 @@ func (c *configFactory) deleteNodeFromCache(obj interface{}) {
|
|||
glog.Errorf("scheduler cache RemoveNode failed: %v", err)
|
||||
}
|
||||
if c.enableEquivalenceClassCache {
|
||||
c.equivalencePodCache.InvalidateAllCachedPredicateItemOfNode(node.GetName())
|
||||
c.equivalencePodCache.InvalidateAllPredicatesOnNode(node.GetName())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1315,7 +1315,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue
|
|||
c.schedulerCache.RemoveNode(&node)
|
||||
// invalidate cached predicate for the node
|
||||
if c.enableEquivalenceClassCache {
|
||||
c.equivalencePodCache.InvalidateAllCachedPredicateItemOfNode(nodeName)
|
||||
c.equivalencePodCache.InvalidateAllPredicatesOnNode(nodeName)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -284,7 +284,7 @@ func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error
|
|||
if bindingRequired {
|
||||
if sched.config.Ecache != nil {
|
||||
invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred)
|
||||
sched.config.Ecache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
|
||||
sched.config.Ecache.InvalidatePredicates(invalidPredicates)
|
||||
}
|
||||
|
||||
// bindVolumesWorker() will update the Pod object to put it back in the scheduler queue
|
||||
|
|
Loading…
Reference in New Issue