Optimize the lock which in the RunPredicate

pull/8/head
godliness 2018-05-17 23:10:28 +08:00
parent 9c2e51f37c
commit 506271b363
2 changed files with 53 additions and 27 deletions

View File

@ -35,7 +35,7 @@ import (
// 1. a map of AlgorithmCache with node name as key // 1. a map of AlgorithmCache with node name as key
// 2. function to get equivalence pod // 2. function to get equivalence pod
type EquivalenceCache struct { type EquivalenceCache struct {
mu sync.Mutex mu sync.RWMutex
algorithmCache map[string]AlgorithmCache algorithmCache map[string]AlgorithmCache
} }
@ -72,9 +72,6 @@ func (ec *EquivalenceCache) RunPredicate(
equivClassInfo *equivalenceClassInfo, equivClassInfo *equivalenceClassInfo,
cache schedulercache.Cache, cache schedulercache.Cache,
) (bool, []algorithm.PredicateFailureReason, error) { ) (bool, []algorithm.PredicateFailureReason, error) {
ec.mu.Lock()
defer ec.mu.Unlock()
if nodeInfo == nil || nodeInfo.Node() == nil { if nodeInfo == nil || nodeInfo.Node() == nil {
// This may happen during tests. // This may happen during tests.
return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid") return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid")
@ -88,20 +85,32 @@ func (ec *EquivalenceCache) RunPredicate(
if err != nil { if err != nil {
return fit, reasons, err return fit, reasons, err
} }
// Skip update if NodeInfo is stale. if cache != nil {
if cache != nil && cache.IsUpToDate(nodeInfo) { ec.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClassInfo.hash, cache, nodeInfo)
ec.updateResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, fit, reasons, equivClassInfo.hash)
} }
return fit, reasons, nil return fit, reasons, nil
} }
// updateResult updates the cached result of a predicate. // updateResult updates the cached result of a predicate.
func (ec *EquivalenceCache) updateResult( func (ec *EquivalenceCache) updateResult(
podName, nodeName, predicateKey string, podName, predicateKey string,
fit bool, fit bool,
reasons []algorithm.PredicateFailureReason, reasons []algorithm.PredicateFailureReason,
equivalenceHash uint64, equivalenceHash uint64,
cache schedulercache.Cache,
nodeInfo *schedulercache.NodeInfo,
) { ) {
ec.mu.Lock()
defer ec.mu.Unlock()
if nodeInfo == nil || nodeInfo.Node() == nil {
// This may happen during tests.
return
}
// Skip update if NodeInfo is stale.
if !cache.IsUpToDate(nodeInfo) {
return
}
nodeName := nodeInfo.Node().GetName()
if _, exist := ec.algorithmCache[nodeName]; !exist { if _, exist := ec.algorithmCache[nodeName]; !exist {
ec.algorithmCache[nodeName] = AlgorithmCache{} ec.algorithmCache[nodeName] = AlgorithmCache{}
} }
@ -130,6 +139,8 @@ func (ec *EquivalenceCache) lookupResult(
podName, nodeName, predicateKey string, podName, nodeName, predicateKey string,
equivalenceHash uint64, equivalenceHash uint64,
) (bool, []algorithm.PredicateFailureReason, bool) { ) (bool, []algorithm.PredicateFailureReason, bool) {
ec.mu.RLock()
defer ec.mu.RUnlock()
glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache", glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache",
predicateKey, podName, nodeName) predicateKey, podName, nodeName)
if hostPredicate, exist := ec.algorithmCache[nodeName][predicateKey][equivalenceHash]; exist { if hostPredicate, exist := ec.algorithmCache[nodeName][predicateKey][equivalenceHash]; exist {

View File

@ -253,9 +253,7 @@ func TestRunPredicate(t *testing.T) {
ecache := NewEquivalenceCache() ecache := NewEquivalenceCache()
equivClass := ecache.getEquivalenceClassInfo(pod) equivClass := ecache.getEquivalenceClassInfo(pod)
if test.expectCacheHit { if test.expectCacheHit {
ecache.mu.Lock() ecache.updateResult(pod.Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash, test.cache, node)
ecache.updateResult(pod.Name, node.Node().Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash)
ecache.mu.Unlock()
} }
fit, reasons, err := ecache.RunPredicate(test.pred.predicate, "testPredicate", pod, meta, node, equivClass, test.cache) fit, reasons, err := ecache.RunPredicate(test.pred.predicate, "testPredicate", pod, meta, node, equivClass, test.cache)
@ -289,9 +287,7 @@ func TestRunPredicate(t *testing.T) {
if !test.expectCacheHit && test.pred.callCount == 0 { if !test.expectCacheHit && test.pred.callCount == 0 {
t.Errorf("Predicate should be called") t.Errorf("Predicate should be called")
} }
ecache.mu.Lock()
_, _, invalid := ecache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash) _, _, invalid := ecache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash)
ecache.mu.Unlock()
if invalid && test.expectCacheWrite { if invalid && test.expectCacheWrite {
t.Errorf("Cache write should happen") t.Errorf("Cache write should happen")
} }
@ -316,6 +312,7 @@ func TestUpdateResult(t *testing.T) {
equivalenceHash uint64 equivalenceHash uint64
expectPredicateMap bool expectPredicateMap bool
expectCacheItem HostPredicate expectCacheItem HostPredicate
cache schedulercache.Cache
}{ }{
{ {
name: "test 1", name: "test 1",
@ -328,6 +325,7 @@ func TestUpdateResult(t *testing.T) {
expectCacheItem: HostPredicate{ expectCacheItem: HostPredicate{
Fit: true, Fit: true,
}, },
cache: &upToDateCache{},
}, },
{ {
name: "test 2", name: "test 2",
@ -340,6 +338,7 @@ func TestUpdateResult(t *testing.T) {
expectCacheItem: HostPredicate{ expectCacheItem: HostPredicate{
Fit: false, Fit: false,
}, },
cache: &upToDateCache{},
}, },
} }
for _, test := range tests { for _, test := range tests {
@ -354,16 +353,18 @@ func TestUpdateResult(t *testing.T) {
test.equivalenceHash: predicateItem, test.equivalenceHash: predicateItem,
} }
} }
ecache.mu.Lock()
node := schedulercache.NewNodeInfo()
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
ecache.updateResult( ecache.updateResult(
test.pod, test.pod,
test.nodeName,
test.predicateKey, test.predicateKey,
test.fit, test.fit,
test.reasons, test.reasons,
test.equivalenceHash, test.equivalenceHash,
test.cache,
node,
) )
ecache.mu.Unlock()
cachedMapItem, ok := ecache.algorithmCache[test.nodeName][test.predicateKey] cachedMapItem, ok := ecache.algorithmCache[test.nodeName][test.predicateKey]
if !ok { if !ok {
@ -390,6 +391,7 @@ func TestLookupResult(t *testing.T) {
expectedInvalidPredicateKey bool expectedInvalidPredicateKey bool
expectedInvalidEquivalenceHash bool expectedInvalidEquivalenceHash bool
expectedPredicateItem predicateItemType expectedPredicateItem predicateItemType
cache schedulercache.Cache
}{ }{
{ {
name: "test 1", name: "test 1",
@ -407,6 +409,7 @@ func TestLookupResult(t *testing.T) {
fit: false, fit: false,
reasons: []algorithm.PredicateFailureReason{}, reasons: []algorithm.PredicateFailureReason{},
}, },
cache: &upToDateCache{},
}, },
{ {
name: "test 2", name: "test 2",
@ -423,6 +426,7 @@ func TestLookupResult(t *testing.T) {
fit: true, fit: true,
reasons: []algorithm.PredicateFailureReason{}, reasons: []algorithm.PredicateFailureReason{},
}, },
cache: &upToDateCache{},
}, },
{ {
name: "test 3", name: "test 3",
@ -440,6 +444,7 @@ func TestLookupResult(t *testing.T) {
fit: false, fit: false,
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
}, },
cache: &upToDateCache{},
}, },
{ {
name: "test 4", name: "test 4",
@ -458,22 +463,24 @@ func TestLookupResult(t *testing.T) {
fit: false, fit: false,
reasons: []algorithm.PredicateFailureReason{}, reasons: []algorithm.PredicateFailureReason{},
}, },
cache: &upToDateCache{},
}, },
} }
for _, test := range tests { for _, test := range tests {
ecache := NewEquivalenceCache() ecache := NewEquivalenceCache()
node := schedulercache.NewNodeInfo()
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
// set cached item to equivalence cache // set cached item to equivalence cache
ecache.mu.Lock()
ecache.updateResult( ecache.updateResult(
test.podName, test.podName,
test.nodeName,
test.predicateKey, test.predicateKey,
test.cachedItem.fit, test.cachedItem.fit,
test.cachedItem.reasons, test.cachedItem.reasons,
test.equivalenceHashForUpdatePredicate, test.equivalenceHashForUpdatePredicate,
test.cache,
node,
) )
ecache.mu.Unlock()
// if we want to do invalid, invalid the cached item // if we want to do invalid, invalid the cached item
if test.expectedInvalidPredicateKey { if test.expectedInvalidPredicateKey {
predicateKeys := sets.NewString() predicateKeys := sets.NewString()
@ -481,13 +488,11 @@ func TestLookupResult(t *testing.T) {
ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys) ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys)
} }
// calculate predicate with equivalence cache // calculate predicate with equivalence cache
ecache.mu.Lock()
fit, reasons, invalid := ecache.lookupResult(test.podName, fit, reasons, invalid := ecache.lookupResult(test.podName,
test.nodeName, test.nodeName,
test.predicateKey, test.predicateKey,
test.equivalenceHashForCalPredicate, test.equivalenceHashForCalPredicate,
) )
ecache.mu.Unlock()
// returned invalid should match expectedInvalidPredicateKey or expectedInvalidEquivalenceHash // returned invalid should match expectedInvalidPredicateKey or expectedInvalidEquivalenceHash
if test.equivalenceHashForUpdatePredicate != test.equivalenceHashForCalPredicate { if test.equivalenceHashForUpdatePredicate != test.equivalenceHashForCalPredicate {
if invalid != test.expectedInvalidEquivalenceHash { if invalid != test.expectedInvalidEquivalenceHash {
@ -637,6 +642,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
nodeName string nodeName string
equivalenceHashForUpdatePredicate uint64 equivalenceHashForUpdatePredicate uint64
cachedItem predicateItemType cachedItem predicateItemType
cache schedulercache.Cache
}{ }{
{ {
podName: "testPod", podName: "testPod",
@ -648,6 +654,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
predicates.ErrPodNotFitsHostPorts, predicates.ErrPodNotFitsHostPorts,
}, },
}, },
cache: &upToDateCache{},
}, },
{ {
podName: "testPod", podName: "testPod",
@ -659,6 +666,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
predicates.ErrPodNotFitsHostPorts, predicates.ErrPodNotFitsHostPorts,
}, },
}, },
cache: &upToDateCache{},
}, },
{ {
podName: "testPod", podName: "testPod",
@ -667,22 +675,24 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
cachedItem: predicateItemType{ cachedItem: predicateItemType{
fit: true, fit: true,
}, },
cache: &upToDateCache{},
}, },
} }
ecache := NewEquivalenceCache() ecache := NewEquivalenceCache()
for _, test := range tests { for _, test := range tests {
node := schedulercache.NewNodeInfo()
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
// set cached item to equivalence cache // set cached item to equivalence cache
ecache.mu.Lock()
ecache.updateResult( ecache.updateResult(
test.podName, test.podName,
test.nodeName,
testPredicate, testPredicate,
test.cachedItem.fit, test.cachedItem.fit,
test.cachedItem.reasons, test.cachedItem.reasons,
test.equivalenceHashForUpdatePredicate, test.equivalenceHashForUpdatePredicate,
test.cache,
node,
) )
ecache.mu.Unlock()
} }
// invalidate cached predicate for all nodes // invalidate cached predicate for all nodes
@ -708,6 +718,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
nodeName string nodeName string
equivalenceHashForUpdatePredicate uint64 equivalenceHashForUpdatePredicate uint64
cachedItem predicateItemType cachedItem predicateItemType
cache schedulercache.Cache
}{ }{
{ {
podName: "testPod", podName: "testPod",
@ -717,6 +728,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
fit: false, fit: false,
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
}, },
cache: &upToDateCache{},
}, },
{ {
podName: "testPod", podName: "testPod",
@ -726,6 +738,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
fit: false, fit: false,
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
}, },
cache: &upToDateCache{},
}, },
{ {
podName: "testPod", podName: "testPod",
@ -734,22 +747,24 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
cachedItem: predicateItemType{ cachedItem: predicateItemType{
fit: true, fit: true,
}, },
cache: &upToDateCache{},
}, },
} }
ecache := NewEquivalenceCache() ecache := NewEquivalenceCache()
for _, test := range tests { for _, test := range tests {
node := schedulercache.NewNodeInfo()
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
// set cached item to equivalence cache // set cached item to equivalence cache
ecache.mu.Lock()
ecache.updateResult( ecache.updateResult(
test.podName, test.podName,
test.nodeName,
testPredicate, testPredicate,
test.cachedItem.fit, test.cachedItem.fit,
test.cachedItem.reasons, test.cachedItem.reasons,
test.equivalenceHashForUpdatePredicate, test.equivalenceHashForUpdatePredicate,
test.cache,
node,
) )
ecache.mu.Unlock()
} }
for _, test := range tests { for _, test := range tests {