mirror of https://github.com/k3s-io/k3s
Merge pull request #66862 from resouer/sync-map
Automatic merge from submit-queue (batch tested with PRs 66862, 67618). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Use sync.Map to scale equiv class cache better **What this PR does / why we need it**: Change the current lock in the first-level ecache into `sync.Map`, which is known for scaling better than `sync.Mutex` on machines with >8 CPUs; ref: https://golang.org/pkg/sync/#Map And the code is much cleaner this way. 5k Nodes, 10k Pods benchmark with ecache enabled in a 64-core VM: ```bash // before BenchmarkScheduling/5000Nodes/0Pods-64 10000 17550089 ns/op // after BenchmarkScheduling/5000Nodes/0Pods-64 10000 16975098 ns/op ``` Compared to the current implementation, the improvement after this change is noticeable, and the test is stable on 8-, 16-, and 64-core VMs. **Special notes for your reviewer**: **Release note**: ```release-note Use sync.Map to scale ecache better ``` (branch: pull/8/head)
commit
4cca6a89a0
|
@ -35,9 +35,6 @@ import (
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
// nodeMap stores a *Cache for each node.
|
|
||||||
type nodeMap map[string]*NodeCache
|
|
||||||
|
|
||||||
// Cache is a thread safe map saves and reuses the output of predicate functions,
|
// Cache is a thread safe map saves and reuses the output of predicate functions,
|
||||||
// it uses node name as key to access those cached results.
|
// it uses node name as key to access those cached results.
|
||||||
//
|
//
|
||||||
|
@ -45,17 +42,13 @@ type nodeMap map[string]*NodeCache
|
||||||
// class". (Equivalence class is defined in the `Class` type.) Saved results
|
// class". (Equivalence class is defined in the `Class` type.) Saved results
|
||||||
// will be reused until an appropriate invalidation function is called.
|
// will be reused until an appropriate invalidation function is called.
|
||||||
type Cache struct {
|
type Cache struct {
|
||||||
// NOTE(harry): Theoretically sync.Map has better performance in machine with 8+ CPUs, while
|
// i.e. map[string]*NodeCache
|
||||||
// the reality is lock contention in first level cache is rare.
|
sync.Map
|
||||||
mu sync.RWMutex
|
|
||||||
nodeToCache nodeMap
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCache create an empty equiv class cache.
|
// NewCache create an empty equiv class cache.
|
||||||
func NewCache() *Cache {
|
func NewCache() *Cache {
|
||||||
return &Cache{
|
return new(Cache)
|
||||||
nodeToCache: make(nodeMap),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodeCache saves and reuses the output of predicate functions. Use RunPredicate to
|
// NodeCache saves and reuses the output of predicate functions. Use RunPredicate to
|
||||||
|
@ -83,12 +76,8 @@ func newNodeCache() *NodeCache {
|
||||||
// it creates the NodeCache and returns it.
|
// it creates the NodeCache and returns it.
|
||||||
// The boolean flag is true if the value was loaded, false if created.
|
// The boolean flag is true if the value was loaded, false if created.
|
||||||
func (c *Cache) GetNodeCache(name string) (nodeCache *NodeCache, exists bool) {
|
func (c *Cache) GetNodeCache(name string) (nodeCache *NodeCache, exists bool) {
|
||||||
c.mu.Lock()
|
v, exists := c.LoadOrStore(name, newNodeCache())
|
||||||
defer c.mu.Unlock()
|
nodeCache = v.(*NodeCache)
|
||||||
if nodeCache, exists = c.nodeToCache[name]; !exists {
|
|
||||||
nodeCache = newNodeCache()
|
|
||||||
c.nodeToCache[name] = nodeCache
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,12 +86,13 @@ func (c *Cache) InvalidatePredicates(predicateKeys sets.String) {
|
||||||
if len(predicateKeys) == 0 {
|
if len(predicateKeys) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.mu.RLock()
|
c.Range(func(k, v interface{}) bool {
|
||||||
defer c.mu.RUnlock()
|
n := v.(*NodeCache)
|
||||||
for _, n := range c.nodeToCache {
|
|
||||||
n.invalidatePreds(predicateKeys)
|
n.invalidatePreds(predicateKeys)
|
||||||
}
|
return true
|
||||||
|
})
|
||||||
glog.V(5).Infof("Cache invalidation: node=*,predicates=%v", predicateKeys)
|
glog.V(5).Infof("Cache invalidation: node=*,predicates=%v", predicateKeys)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// InvalidatePredicatesOnNode clears cached results for the given predicates on one node.
|
// InvalidatePredicatesOnNode clears cached results for the given predicates on one node.
|
||||||
|
@ -110,9 +100,8 @@ func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.S
|
||||||
if len(predicateKeys) == 0 {
|
if len(predicateKeys) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.mu.RLock()
|
if v, ok := c.Load(nodeName); ok {
|
||||||
defer c.mu.RUnlock()
|
n := v.(*NodeCache)
|
||||||
if n, ok := c.nodeToCache[nodeName]; ok {
|
|
||||||
n.invalidatePreds(predicateKeys)
|
n.invalidatePreds(predicateKeys)
|
||||||
}
|
}
|
||||||
glog.V(5).Infof("Cache invalidation: node=%s,predicates=%v", nodeName, predicateKeys)
|
glog.V(5).Infof("Cache invalidation: node=%s,predicates=%v", nodeName, predicateKeys)
|
||||||
|
@ -120,9 +109,7 @@ func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.S
|
||||||
|
|
||||||
// InvalidateAllPredicatesOnNode clears all cached results for one node.
|
// InvalidateAllPredicatesOnNode clears all cached results for one node.
|
||||||
func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) {
|
func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) {
|
||||||
c.mu.Lock()
|
c.Delete(nodeName)
|
||||||
defer c.mu.Unlock()
|
|
||||||
delete(c.nodeToCache, nodeName)
|
|
||||||
glog.V(5).Infof("Cache invalidation: node=%s,predicates=*", nodeName)
|
glog.V(5).Infof("Cache invalidation: node=%s,predicates=*", nodeName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -731,7 +731,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
|
||||||
// there should be no cached predicate any more
|
// there should be no cached predicate any more
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
if nodeCache, exist := ecache.nodeToCache[test.nodeName]; exist {
|
if nodeCache, exist := ecache.GetNodeCache(test.nodeName); exist {
|
||||||
if _, exist := nodeCache.cache[testPredicate]; exist {
|
if _, exist := nodeCache.cache[testPredicate]; exist {
|
||||||
t.Errorf("Failed: cached item for predicate key: %v on node: %v should be invalidated",
|
t.Errorf("Failed: cached item for predicate key: %v on node: %v should be invalidated",
|
||||||
testPredicate, test.nodeName)
|
testPredicate, test.nodeName)
|
||||||
|
|
Loading…
Reference in New Issue