mirror of https://github.com/k3s-io/k3s
Use seqeuence number to represent generation of equivalence cache.
- snapshot equivalence cache generation numbers before snapshotting the scheduler cache - skip update when generation does not match live generation - keep the node and increment its generation to invalidate it instead of deletion - use predicates order ID as key to improve performancepull/58/head
parent
a2cc1b1a20
commit
2f46bc8a18
|
@ -50,7 +50,6 @@ go_test(
|
|||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
],
|
||||
)
|
||||
|
|
|
@ -555,13 +555,6 @@ func (cache *schedulerCache) ListPDBs(selector labels.Selector) ([]*policy.PodDi
|
|||
return pdbs, nil
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) IsUpToDate(n *NodeInfo) bool {
|
||||
cache.mu.RLock()
|
||||
defer cache.mu.RUnlock()
|
||||
node, ok := cache.nodes[n.Node().Name]
|
||||
return ok && n.generation == node.generation
|
||||
}
|
||||
|
||||
func (cache *schedulerCache) run() {
|
||||
go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
|
||||
}
|
||||
|
|
|
@ -30,7 +30,6 @@ import (
|
|||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
|
||||
|
@ -1336,26 +1335,3 @@ func TestPDBOperations(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsUpToDate(t *testing.T) {
|
||||
cache := New(time.Duration(0), wait.NeverStop)
|
||||
if err := cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}}); err != nil {
|
||||
t.Errorf("Could not add node: %v", err)
|
||||
}
|
||||
s := cache.Snapshot()
|
||||
node := s.Nodes["n1"]
|
||||
if !cache.IsUpToDate(node) {
|
||||
t.Errorf("Node incorrectly marked as stale")
|
||||
}
|
||||
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", UID: "p1"}, Spec: v1.PodSpec{NodeName: "n1"}}
|
||||
if err := cache.AddPod(pod); err != nil {
|
||||
t.Errorf("Could not add pod: %v", err)
|
||||
}
|
||||
if cache.IsUpToDate(node) {
|
||||
t.Errorf("Node incorrectly marked as up to date")
|
||||
}
|
||||
badNode := &NodeInfo{node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n2"}}}
|
||||
if cache.IsUpToDate(badNode) {
|
||||
t.Errorf("Nonexistant node incorrectly marked as up to date")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -123,9 +123,6 @@ type Cache interface {
|
|||
// Snapshot takes a snapshot on current cache
|
||||
Snapshot() *Snapshot
|
||||
|
||||
// IsUpToDate returns true if the given NodeInfo matches the current data in the cache.
|
||||
IsUpToDate(n *NodeInfo) bool
|
||||
|
||||
// NodeTree returns a node tree structure
|
||||
NodeTree() *NodeTree
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ go_test(
|
|||
"//pkg/scheduler/algorithm:go_default_library",
|
||||
"//pkg/scheduler/algorithm/predicates:go_default_library",
|
||||
"//pkg/scheduler/cache:go_default_library",
|
||||
"//pkg/scheduler/testing:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
|
|
|
@ -35,7 +35,7 @@ import (
|
|||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// nodeMap stores a *Cache for each node.
|
||||
// nodeMap stores a *NodeCache for each node.
|
||||
type nodeMap map[string]*NodeCache
|
||||
|
||||
// Cache is a thread safe map saves and reuses the output of predicate functions,
|
||||
|
@ -47,14 +47,20 @@ type nodeMap map[string]*NodeCache
|
|||
type Cache struct {
|
||||
// NOTE(harry): Theoretically sync.Map has better performance in machine with 8+ CPUs, while
|
||||
// the reality is lock contention in first level cache is rare.
|
||||
mu sync.RWMutex
|
||||
nodeToCache nodeMap
|
||||
mu sync.RWMutex
|
||||
nodeToCache nodeMap
|
||||
predicateIDMap map[string]int
|
||||
}
|
||||
|
||||
// NewCache create an empty equiv class cache.
|
||||
func NewCache() *Cache {
|
||||
func NewCache(predicates []string) *Cache {
|
||||
predicateIDMap := make(map[string]int, len(predicates))
|
||||
for id, predicate := range predicates {
|
||||
predicateIDMap[predicate] = id
|
||||
}
|
||||
return &Cache{
|
||||
nodeToCache: make(nodeMap),
|
||||
nodeToCache: make(nodeMap),
|
||||
predicateIDMap: predicateIDMap,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -70,15 +76,46 @@ func NewCache() *Cache {
|
|||
type NodeCache struct {
|
||||
mu sync.RWMutex
|
||||
cache predicateMap
|
||||
// generation is current generation of node cache, incremented on node
|
||||
// invalidation.
|
||||
generation uint64
|
||||
// snapshotGeneration saves snapshot of generation of node cache.
|
||||
snapshotGeneration uint64
|
||||
// predicateGenerations stores generation numbers for predicates, incremented on
|
||||
// predicate invalidation. Created on first update. Use 0 if does not
|
||||
// exist.
|
||||
predicateGenerations []uint64
|
||||
// snapshotPredicateGenerations saves snapshot of generation numbers for predicates.
|
||||
snapshotPredicateGenerations []uint64
|
||||
}
|
||||
|
||||
// newNodeCache returns an empty NodeCache.
|
||||
func newNodeCache() *NodeCache {
|
||||
func newNodeCache(n int) *NodeCache {
|
||||
return &NodeCache{
|
||||
cache: make(predicateMap),
|
||||
cache: make(predicateMap, n),
|
||||
predicateGenerations: make([]uint64, n),
|
||||
snapshotPredicateGenerations: make([]uint64, n),
|
||||
}
|
||||
}
|
||||
|
||||
// Snapshot snapshots current generations of cache.
|
||||
// NOTE: We snapshot generations of all node caches before using it and these
|
||||
// operations are serialized, we can save snapshot as member of node cache
|
||||
// itself.
|
||||
func (c *Cache) Snapshot() {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
for _, n := range c.nodeToCache {
|
||||
n.mu.Lock()
|
||||
// snapshot predicate generations
|
||||
copy(n.snapshotPredicateGenerations, n.predicateGenerations)
|
||||
// snapshot node generation
|
||||
n.snapshotGeneration = n.generation
|
||||
n.mu.Unlock()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// GetNodeCache returns the existing NodeCache for given node if present. Otherwise,
|
||||
// it creates the NodeCache and returns it.
|
||||
// The boolean flag is true if the value was loaded, false if created.
|
||||
|
@ -86,12 +123,32 @@ func (c *Cache) GetNodeCache(name string) (nodeCache *NodeCache, exists bool) {
|
|||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if nodeCache, exists = c.nodeToCache[name]; !exists {
|
||||
nodeCache = newNodeCache()
|
||||
nodeCache = newNodeCache(len(c.predicateIDMap))
|
||||
c.nodeToCache[name] = nodeCache
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// LoadNodeCache returns the existing NodeCache for given node, nil if not
|
||||
// present.
|
||||
func (c *Cache) LoadNodeCache(node string) *NodeCache {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
return c.nodeToCache[node]
|
||||
}
|
||||
|
||||
func (c *Cache) predicateKeysToIDs(predicateKeys sets.String) []int {
|
||||
predicateIDs := make([]int, 0, len(predicateKeys))
|
||||
for predicateKey := range predicateKeys {
|
||||
if id, ok := c.predicateIDMap[predicateKey]; ok {
|
||||
predicateIDs = append(predicateIDs, id)
|
||||
} else {
|
||||
glog.Errorf("predicate key %q not found", predicateKey)
|
||||
}
|
||||
}
|
||||
return predicateIDs
|
||||
}
|
||||
|
||||
// InvalidatePredicates clears all cached results for the given predicates.
|
||||
func (c *Cache) InvalidatePredicates(predicateKeys sets.String) {
|
||||
if len(predicateKeys) == 0 {
|
||||
|
@ -99,10 +156,12 @@ func (c *Cache) InvalidatePredicates(predicateKeys sets.String) {
|
|||
}
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
predicateIDs := c.predicateKeysToIDs(predicateKeys)
|
||||
for _, n := range c.nodeToCache {
|
||||
n.invalidatePreds(predicateKeys)
|
||||
n.invalidatePreds(predicateIDs)
|
||||
}
|
||||
glog.V(5).Infof("Cache invalidation: node=*,predicates=%v", predicateKeys)
|
||||
|
||||
}
|
||||
|
||||
// InvalidatePredicatesOnNode clears cached results for the given predicates on one node.
|
||||
|
@ -112,17 +171,20 @@ func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.S
|
|||
}
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
predicateIDs := c.predicateKeysToIDs(predicateKeys)
|
||||
if n, ok := c.nodeToCache[nodeName]; ok {
|
||||
n.invalidatePreds(predicateKeys)
|
||||
n.invalidatePreds(predicateIDs)
|
||||
}
|
||||
glog.V(5).Infof("Cache invalidation: node=%s,predicates=%v", nodeName, predicateKeys)
|
||||
}
|
||||
|
||||
// InvalidateAllPredicatesOnNode clears all cached results for one node.
|
||||
func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
delete(c.nodeToCache, nodeName)
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
if node, ok := c.nodeToCache[nodeName]; ok {
|
||||
node.invalidate()
|
||||
}
|
||||
glog.V(5).Infof("Cache invalidation: node=%s,predicates=*", nodeName)
|
||||
}
|
||||
|
||||
|
@ -191,8 +253,8 @@ func NewClass(pod *v1.Pod) *Class {
|
|||
return nil
|
||||
}
|
||||
|
||||
// predicateMap stores resultMaps with predicate name as the key.
|
||||
type predicateMap map[string]resultMap
|
||||
// predicateMap stores resultMaps with predicate ID as the key.
|
||||
type predicateMap []resultMap
|
||||
|
||||
// resultMap stores PredicateResult with pod equivalence hash as the key.
|
||||
type resultMap map[uint64]predicateResult
|
||||
|
@ -206,22 +268,22 @@ type predicateResult struct {
|
|||
// RunPredicate returns a cached predicate result. In case of a cache miss, the predicate will be
|
||||
// run and its results cached for the next call.
|
||||
//
|
||||
// NOTE: RunPredicate will not update the equivalence cache if the given NodeInfo is stale.
|
||||
// NOTE: RunPredicate will not update the equivalence cache if generation does not match live version.
|
||||
func (n *NodeCache) RunPredicate(
|
||||
pred algorithm.FitPredicate,
|
||||
predicateKey string,
|
||||
predicateID int,
|
||||
pod *v1.Pod,
|
||||
meta algorithm.PredicateMetadata,
|
||||
nodeInfo *schedulercache.NodeInfo,
|
||||
equivClass *Class,
|
||||
cache schedulercache.Cache,
|
||||
) (bool, []algorithm.PredicateFailureReason, error) {
|
||||
if nodeInfo == nil || nodeInfo.Node() == nil {
|
||||
// This may happen during tests.
|
||||
return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid")
|
||||
}
|
||||
|
||||
result, ok := n.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClass.hash)
|
||||
result, ok := n.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, predicateID, equivClass.hash)
|
||||
if ok {
|
||||
return result.Fit, result.FailReasons, nil
|
||||
}
|
||||
|
@ -229,19 +291,17 @@ func (n *NodeCache) RunPredicate(
|
|||
if err != nil {
|
||||
return fit, reasons, err
|
||||
}
|
||||
if cache != nil {
|
||||
n.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClass.hash, cache, nodeInfo)
|
||||
}
|
||||
n.updateResult(pod.GetName(), predicateKey, predicateID, fit, reasons, equivClass.hash, nodeInfo)
|
||||
return fit, reasons, nil
|
||||
}
|
||||
|
||||
// updateResult updates the cached result of a predicate.
|
||||
func (n *NodeCache) updateResult(
|
||||
podName, predicateKey string,
|
||||
predicateID int,
|
||||
fit bool,
|
||||
reasons []algorithm.PredicateFailureReason,
|
||||
equivalenceHash uint64,
|
||||
cache schedulercache.Cache,
|
||||
nodeInfo *schedulercache.NodeInfo,
|
||||
) {
|
||||
if nodeInfo == nil || nodeInfo.Node() == nil {
|
||||
|
@ -249,11 +309,6 @@ func (n *NodeCache) updateResult(
|
|||
metrics.EquivalenceCacheWrites.WithLabelValues("discarded_bad_node").Inc()
|
||||
return
|
||||
}
|
||||
// Skip update if NodeInfo is stale.
|
||||
if !cache.IsUpToDate(nodeInfo) {
|
||||
metrics.EquivalenceCacheWrites.WithLabelValues("discarded_stale").Inc()
|
||||
return
|
||||
}
|
||||
|
||||
predicateItem := predicateResult{
|
||||
Fit: fit,
|
||||
|
@ -262,16 +317,24 @@ func (n *NodeCache) updateResult(
|
|||
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
if (n.snapshotGeneration != n.generation) || (n.snapshotPredicateGenerations[predicateID] != n.predicateGenerations[predicateID]) {
|
||||
// Generation of node or predicate has been updated since we last took
|
||||
// a snapshot, this indicates that we received a invalidation request
|
||||
// during this time. Cache may be stale, skip update.
|
||||
metrics.EquivalenceCacheWrites.WithLabelValues("discarded_stale").Inc()
|
||||
return
|
||||
}
|
||||
// If cached predicate map already exists, just update the predicate by key
|
||||
if predicates, ok := n.cache[predicateKey]; ok {
|
||||
if predicates := n.cache[predicateID]; predicates != nil {
|
||||
// maps in golang are references, no need to add them back
|
||||
predicates[equivalenceHash] = predicateItem
|
||||
} else {
|
||||
n.cache[predicateKey] =
|
||||
n.cache[predicateID] =
|
||||
resultMap{
|
||||
equivalenceHash: predicateItem,
|
||||
}
|
||||
}
|
||||
n.predicateGenerations[predicateID]++
|
||||
|
||||
glog.V(5).Infof("Cache update: node=%s, predicate=%s,pod=%s,value=%v",
|
||||
nodeInfo.Node().Name, predicateKey, podName, predicateItem)
|
||||
|
@ -281,11 +344,12 @@ func (n *NodeCache) updateResult(
|
|||
// cache entry was found.
|
||||
func (n *NodeCache) lookupResult(
|
||||
podName, nodeName, predicateKey string,
|
||||
predicateID int,
|
||||
equivalenceHash uint64,
|
||||
) (value predicateResult, ok bool) {
|
||||
n.mu.RLock()
|
||||
defer n.mu.RUnlock()
|
||||
value, ok = n.cache[predicateKey][equivalenceHash]
|
||||
value, ok = n.cache[predicateID][equivalenceHash]
|
||||
if ok {
|
||||
metrics.EquivalenceCacheHits.Inc()
|
||||
} else {
|
||||
|
@ -294,15 +358,24 @@ func (n *NodeCache) lookupResult(
|
|||
return value, ok
|
||||
}
|
||||
|
||||
// invalidatePreds deletes cached predicates by given keys.
|
||||
func (n *NodeCache) invalidatePreds(predicateKeys sets.String) {
|
||||
// invalidatePreds deletes cached predicates by given IDs.
|
||||
func (n *NodeCache) invalidatePreds(predicateIDs []int) {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
for predicateKey := range predicateKeys {
|
||||
delete(n.cache, predicateKey)
|
||||
for _, predicateID := range predicateIDs {
|
||||
n.cache[predicateID] = nil
|
||||
n.predicateGenerations[predicateID]++
|
||||
}
|
||||
}
|
||||
|
||||
// invalidate invalidates node cache.
|
||||
func (n *NodeCache) invalidate() {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
n.cache = make(predicateMap, len(n.cache))
|
||||
n.generation++
|
||||
}
|
||||
|
||||
// equivalencePod is the set of pod attributes which must match for two pods to
|
||||
// be considered equivalent for scheduling purposes. For correctness, this must
|
||||
// include any Pod field which is used by a FitPredicate.
|
||||
|
|
|
@ -28,7 +28,6 @@ import (
|
|||
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
|
||||
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
|
||||
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
|
||||
)
|
||||
|
||||
// makeBasicPod returns a Pod object with many of the fields populated.
|
||||
|
@ -155,16 +154,6 @@ type predicateItemType struct {
|
|||
reasons []algorithm.PredicateFailureReason
|
||||
}
|
||||
|
||||
// upToDateCache is a fake Cache where IsUpToDate always returns true.
|
||||
type upToDateCache = schedulertesting.FakeCache
|
||||
|
||||
// staleNodeCache is a fake Cache where IsUpToDate always returns false.
|
||||
type staleNodeCache struct {
|
||||
schedulertesting.FakeCache
|
||||
}
|
||||
|
||||
func (c *staleNodeCache) IsUpToDate(*schedulercache.NodeInfo) bool { return false }
|
||||
|
||||
// mockPredicate provides an algorithm.FitPredicate with pre-set return values.
|
||||
type mockPredicate struct {
|
||||
fit bool
|
||||
|
@ -182,7 +171,6 @@ func TestRunPredicate(t *testing.T) {
|
|||
tests := []struct {
|
||||
name string
|
||||
pred mockPredicate
|
||||
cache schedulercache.Cache
|
||||
expectFit, expectCacheHit, expectCacheWrite bool
|
||||
expectedReasons []algorithm.PredicateFailureReason
|
||||
expectedError string
|
||||
|
@ -190,7 +178,6 @@ func TestRunPredicate(t *testing.T) {
|
|||
{
|
||||
name: "pod fits/cache hit",
|
||||
pred: mockPredicate{},
|
||||
cache: &upToDateCache{},
|
||||
expectFit: true,
|
||||
expectCacheHit: true,
|
||||
expectCacheWrite: false,
|
||||
|
@ -198,23 +185,13 @@ func TestRunPredicate(t *testing.T) {
|
|||
{
|
||||
name: "pod fits/cache miss",
|
||||
pred: mockPredicate{fit: true},
|
||||
cache: &upToDateCache{},
|
||||
expectFit: true,
|
||||
expectCacheHit: false,
|
||||
expectCacheWrite: true,
|
||||
},
|
||||
{
|
||||
name: "pod fits/cache miss/no write",
|
||||
pred: mockPredicate{fit: true},
|
||||
cache: &staleNodeCache{},
|
||||
expectFit: true,
|
||||
expectCacheHit: false,
|
||||
expectCacheWrite: false,
|
||||
},
|
||||
{
|
||||
name: "pod doesn't fit/cache miss",
|
||||
pred: mockPredicate{reasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate}},
|
||||
cache: &upToDateCache{},
|
||||
expectFit: false,
|
||||
expectCacheHit: false,
|
||||
expectCacheWrite: true,
|
||||
|
@ -223,7 +200,6 @@ func TestRunPredicate(t *testing.T) {
|
|||
{
|
||||
name: "pod doesn't fit/cache hit",
|
||||
pred: mockPredicate{},
|
||||
cache: &upToDateCache{},
|
||||
expectFit: false,
|
||||
expectCacheHit: true,
|
||||
expectCacheWrite: false,
|
||||
|
@ -232,7 +208,6 @@ func TestRunPredicate(t *testing.T) {
|
|||
{
|
||||
name: "predicate error",
|
||||
pred: mockPredicate{err: errors.New("This is expected")},
|
||||
cache: &upToDateCache{},
|
||||
expectFit: false,
|
||||
expectCacheHit: false,
|
||||
expectCacheWrite: false,
|
||||
|
@ -240,6 +215,8 @@ func TestRunPredicate(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
predicatesOrdering := []string{"testPredicate"}
|
||||
predicateID := 0
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
node := schedulercache.NewNodeInfo()
|
||||
|
@ -249,15 +226,16 @@ func TestRunPredicate(t *testing.T) {
|
|||
meta := algorithm.EmptyPredicateMetadataProducer(nil, nil)
|
||||
|
||||
// Initialize and populate equivalence class cache.
|
||||
ecache := NewCache()
|
||||
ecache := NewCache(predicatesOrdering)
|
||||
ecache.Snapshot()
|
||||
nodeCache, _ := ecache.GetNodeCache(testNode.Name)
|
||||
|
||||
equivClass := NewClass(pod)
|
||||
if test.expectCacheHit {
|
||||
nodeCache.updateResult(pod.Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash, test.cache, node)
|
||||
nodeCache.updateResult(pod.Name, "testPredicate", predicateID, test.expectFit, test.expectedReasons, equivClass.hash, node)
|
||||
}
|
||||
|
||||
fit, reasons, err := nodeCache.RunPredicate(test.pred.predicate, "testPredicate", pod, meta, node, equivClass, test.cache)
|
||||
fit, reasons, err := nodeCache.RunPredicate(test.pred.predicate, "testPredicate", predicateID, pod, meta, node, equivClass)
|
||||
|
||||
if err != nil {
|
||||
if err.Error() != test.expectedError {
|
||||
|
@ -288,7 +266,7 @@ func TestRunPredicate(t *testing.T) {
|
|||
if !test.expectCacheHit && test.pred.callCount == 0 {
|
||||
t.Errorf("Predicate should be called")
|
||||
}
|
||||
_, ok := nodeCache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash)
|
||||
_, ok := nodeCache.lookupResult(pod.Name, node.Node().Name, "testPredicate", predicateID, equivClass.hash)
|
||||
if !ok && test.expectCacheWrite {
|
||||
t.Errorf("Cache write should happen")
|
||||
}
|
||||
|
@ -303,22 +281,24 @@ func TestRunPredicate(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestUpdateResult(t *testing.T) {
|
||||
predicatesOrdering := []string{"GeneralPredicates"}
|
||||
tests := []struct {
|
||||
name string
|
||||
pod string
|
||||
predicateKey string
|
||||
predicateID int
|
||||
nodeName string
|
||||
fit bool
|
||||
reasons []algorithm.PredicateFailureReason
|
||||
equivalenceHash uint64
|
||||
expectPredicateMap bool
|
||||
expectCacheItem predicateResult
|
||||
cache schedulercache.Cache
|
||||
}{
|
||||
{
|
||||
name: "test 1",
|
||||
pod: "testPod",
|
||||
predicateKey: "GeneralPredicates",
|
||||
predicateID: 0,
|
||||
nodeName: "node1",
|
||||
fit: true,
|
||||
equivalenceHash: 123,
|
||||
|
@ -326,12 +306,12 @@ func TestUpdateResult(t *testing.T) {
|
|||
expectCacheItem: predicateResult{
|
||||
Fit: true,
|
||||
},
|
||||
cache: &upToDateCache{},
|
||||
},
|
||||
{
|
||||
name: "test 2",
|
||||
pod: "testPod",
|
||||
predicateKey: "GeneralPredicates",
|
||||
predicateID: 0,
|
||||
nodeName: "node2",
|
||||
fit: false,
|
||||
equivalenceHash: 123,
|
||||
|
@ -339,7 +319,6 @@ func TestUpdateResult(t *testing.T) {
|
|||
expectCacheItem: predicateResult{
|
||||
Fit: false,
|
||||
},
|
||||
cache: &upToDateCache{},
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
|
@ -349,14 +328,14 @@ func TestUpdateResult(t *testing.T) {
|
|||
node.SetNode(testNode)
|
||||
|
||||
// Initialize and populate equivalence class cache.
|
||||
ecache := NewCache()
|
||||
ecache := NewCache(predicatesOrdering)
|
||||
nodeCache, _ := ecache.GetNodeCache(testNode.Name)
|
||||
|
||||
if test.expectPredicateMap {
|
||||
predicateItem := predicateResult{
|
||||
Fit: true,
|
||||
}
|
||||
nodeCache.cache[test.predicateKey] =
|
||||
nodeCache.cache[test.predicateID] =
|
||||
resultMap{
|
||||
test.equivalenceHash: predicateItem,
|
||||
}
|
||||
|
@ -365,15 +344,15 @@ func TestUpdateResult(t *testing.T) {
|
|||
nodeCache.updateResult(
|
||||
test.pod,
|
||||
test.predicateKey,
|
||||
test.predicateID,
|
||||
test.fit,
|
||||
test.reasons,
|
||||
test.equivalenceHash,
|
||||
test.cache,
|
||||
node,
|
||||
)
|
||||
|
||||
cachedMapItem, ok := nodeCache.cache[test.predicateKey]
|
||||
if !ok {
|
||||
cachedMapItem := nodeCache.cache[test.predicateID]
|
||||
if cachedMapItem == nil {
|
||||
t.Errorf("can't find expected cache item: %v", test.expectCacheItem)
|
||||
} else {
|
||||
if !reflect.DeepEqual(cachedMapItem[test.equivalenceHash], test.expectCacheItem) {
|
||||
|
@ -394,18 +373,19 @@ func slicesEqual(a, b []algorithm.PredicateFailureReason) bool {
|
|||
}
|
||||
|
||||
func TestLookupResult(t *testing.T) {
|
||||
predicatesOrdering := []string{"GeneralPredicates"}
|
||||
tests := []struct {
|
||||
name string
|
||||
podName string
|
||||
nodeName string
|
||||
predicateKey string
|
||||
predicateID int
|
||||
equivalenceHashForUpdatePredicate uint64
|
||||
equivalenceHashForCalPredicate uint64
|
||||
cachedItem predicateItemType
|
||||
expectedPredicateKeyMiss bool
|
||||
expectedEquivalenceHashMiss bool
|
||||
expectedPredicateItem predicateItemType
|
||||
cache schedulercache.Cache
|
||||
}{
|
||||
{
|
||||
name: "test 1",
|
||||
|
@ -414,6 +394,7 @@ func TestLookupResult(t *testing.T) {
|
|||
equivalenceHashForUpdatePredicate: 123,
|
||||
equivalenceHashForCalPredicate: 123,
|
||||
predicateKey: "GeneralPredicates",
|
||||
predicateID: 0,
|
||||
cachedItem: predicateItemType{
|
||||
fit: false,
|
||||
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
|
||||
|
@ -423,7 +404,6 @@ func TestLookupResult(t *testing.T) {
|
|||
fit: false,
|
||||
reasons: []algorithm.PredicateFailureReason{},
|
||||
},
|
||||
cache: &upToDateCache{},
|
||||
},
|
||||
{
|
||||
name: "test 2",
|
||||
|
@ -432,6 +412,7 @@ func TestLookupResult(t *testing.T) {
|
|||
equivalenceHashForUpdatePredicate: 123,
|
||||
equivalenceHashForCalPredicate: 123,
|
||||
predicateKey: "GeneralPredicates",
|
||||
predicateID: 0,
|
||||
cachedItem: predicateItemType{
|
||||
fit: true,
|
||||
},
|
||||
|
@ -440,7 +421,6 @@ func TestLookupResult(t *testing.T) {
|
|||
fit: true,
|
||||
reasons: []algorithm.PredicateFailureReason{},
|
||||
},
|
||||
cache: &upToDateCache{},
|
||||
},
|
||||
{
|
||||
name: "test 3",
|
||||
|
@ -449,6 +429,7 @@ func TestLookupResult(t *testing.T) {
|
|||
equivalenceHashForUpdatePredicate: 123,
|
||||
equivalenceHashForCalPredicate: 123,
|
||||
predicateKey: "GeneralPredicates",
|
||||
predicateID: 0,
|
||||
cachedItem: predicateItemType{
|
||||
fit: false,
|
||||
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
|
||||
|
@ -458,7 +439,6 @@ func TestLookupResult(t *testing.T) {
|
|||
fit: false,
|
||||
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
|
||||
},
|
||||
cache: &upToDateCache{},
|
||||
},
|
||||
{
|
||||
name: "test 4",
|
||||
|
@ -467,6 +447,7 @@ func TestLookupResult(t *testing.T) {
|
|||
equivalenceHashForUpdatePredicate: 123,
|
||||
equivalenceHashForCalPredicate: 456,
|
||||
predicateKey: "GeneralPredicates",
|
||||
predicateID: 0,
|
||||
cachedItem: predicateItemType{
|
||||
fit: false,
|
||||
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
|
||||
|
@ -477,7 +458,6 @@ func TestLookupResult(t *testing.T) {
|
|||
fit: false,
|
||||
reasons: []algorithm.PredicateFailureReason{},
|
||||
},
|
||||
cache: &upToDateCache{},
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -486,7 +466,7 @@ func TestLookupResult(t *testing.T) {
|
|||
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}
|
||||
|
||||
// Initialize and populate equivalence class cache.
|
||||
ecache := NewCache()
|
||||
ecache := NewCache(predicatesOrdering)
|
||||
nodeCache, _ := ecache.GetNodeCache(testNode.Name)
|
||||
|
||||
node := schedulercache.NewNodeInfo()
|
||||
|
@ -495,10 +475,10 @@ func TestLookupResult(t *testing.T) {
|
|||
nodeCache.updateResult(
|
||||
test.podName,
|
||||
test.predicateKey,
|
||||
test.predicateID,
|
||||
test.cachedItem.fit,
|
||||
test.cachedItem.reasons,
|
||||
test.equivalenceHashForUpdatePredicate,
|
||||
test.cache,
|
||||
node,
|
||||
)
|
||||
// if we want to do invalid, invalid the cached item
|
||||
|
@ -511,6 +491,7 @@ func TestLookupResult(t *testing.T) {
|
|||
result, ok := nodeCache.lookupResult(test.podName,
|
||||
test.nodeName,
|
||||
test.predicateKey,
|
||||
test.predicateID,
|
||||
test.equivalenceHashForCalPredicate,
|
||||
)
|
||||
fit, reasons := result.Fit, result.FailReasons
|
||||
|
@ -659,6 +640,8 @@ func TestGetEquivalenceHash(t *testing.T) {
|
|||
|
||||
func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
|
||||
testPredicate := "GeneralPredicates"
|
||||
testPredicateID := 0
|
||||
predicatesOrdering := []string{testPredicate}
|
||||
// tests is used to initialize all nodes
|
||||
tests := []struct {
|
||||
name string
|
||||
|
@ -666,7 +649,6 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
|
|||
nodeName string
|
||||
equivalenceHashForUpdatePredicate uint64
|
||||
cachedItem predicateItemType
|
||||
cache schedulercache.Cache
|
||||
}{
|
||||
{
|
||||
name: "hash predicate 123 not fits host ports",
|
||||
|
@ -679,7 +661,6 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
|
|||
predicates.ErrPodNotFitsHostPorts,
|
||||
},
|
||||
},
|
||||
cache: &upToDateCache{},
|
||||
},
|
||||
{
|
||||
name: "hash predicate 456 not fits host ports",
|
||||
|
@ -692,7 +673,6 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
|
|||
predicates.ErrPodNotFitsHostPorts,
|
||||
},
|
||||
},
|
||||
cache: &upToDateCache{},
|
||||
},
|
||||
{
|
||||
name: "hash predicate 123 fits",
|
||||
|
@ -702,10 +682,9 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
|
|||
cachedItem: predicateItemType{
|
||||
fit: true,
|
||||
},
|
||||
cache: &upToDateCache{},
|
||||
},
|
||||
}
|
||||
ecache := NewCache()
|
||||
ecache := NewCache(predicatesOrdering)
|
||||
|
||||
for _, test := range tests {
|
||||
node := schedulercache.NewNodeInfo()
|
||||
|
@ -717,10 +696,10 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
|
|||
nodeCache.updateResult(
|
||||
test.podName,
|
||||
testPredicate,
|
||||
testPredicateID,
|
||||
test.cachedItem.fit,
|
||||
test.cachedItem.reasons,
|
||||
test.equivalenceHashForUpdatePredicate,
|
||||
test.cache,
|
||||
node,
|
||||
)
|
||||
}
|
||||
|
@ -732,7 +711,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
|
|||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
if nodeCache, exist := ecache.nodeToCache[test.nodeName]; exist {
|
||||
if _, exist := nodeCache.cache[testPredicate]; exist {
|
||||
if cache := nodeCache.cache[testPredicateID]; cache != nil {
|
||||
t.Errorf("Failed: cached item for predicate key: %v on node: %v should be invalidated",
|
||||
testPredicate, test.nodeName)
|
||||
}
|
||||
|
@ -743,6 +722,8 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
|
|||
|
||||
func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
|
||||
testPredicate := "GeneralPredicates"
|
||||
testPredicateID := 0
|
||||
predicatesOrdering := []string{testPredicate}
|
||||
// tests is used to initialize all nodes
|
||||
tests := []struct {
|
||||
name string
|
||||
|
@ -750,7 +731,6 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
|
|||
nodeName string
|
||||
equivalenceHashForUpdatePredicate uint64
|
||||
cachedItem predicateItemType
|
||||
cache schedulercache.Cache
|
||||
}{
|
||||
{
|
||||
name: "hash predicate 123 not fits host ports",
|
||||
|
@ -761,7 +741,6 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
|
|||
fit: false,
|
||||
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
|
||||
},
|
||||
cache: &upToDateCache{},
|
||||
},
|
||||
{
|
||||
name: "hash predicate 456 not fits host ports",
|
||||
|
@ -772,7 +751,6 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
|
|||
fit: false,
|
||||
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
|
||||
},
|
||||
cache: &upToDateCache{},
|
||||
},
|
||||
{
|
||||
name: "hash predicate 123 fits host ports",
|
||||
|
@ -782,10 +760,9 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
|
|||
cachedItem: predicateItemType{
|
||||
fit: true,
|
||||
},
|
||||
cache: &upToDateCache{},
|
||||
},
|
||||
}
|
||||
ecache := NewCache()
|
||||
ecache := NewCache(predicatesOrdering)
|
||||
|
||||
for _, test := range tests {
|
||||
node := schedulercache.NewNodeInfo()
|
||||
|
@ -797,19 +774,21 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
|
|||
nodeCache.updateResult(
|
||||
test.podName,
|
||||
testPredicate,
|
||||
testPredicateID,
|
||||
test.cachedItem.fit,
|
||||
test.cachedItem.reasons,
|
||||
test.equivalenceHashForUpdatePredicate,
|
||||
test.cache,
|
||||
node,
|
||||
)
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
oldNodeCache, _ := ecache.GetNodeCache(test.nodeName)
|
||||
oldGeneration := oldNodeCache.generation
|
||||
// invalidate cached predicate for all nodes
|
||||
ecache.InvalidateAllPredicatesOnNode(test.nodeName)
|
||||
if _, ok := ecache.GetNodeCache(test.nodeName); ok {
|
||||
if n, _ := ecache.GetNodeCache(test.nodeName); oldGeneration == n.generation {
|
||||
t.Errorf("Failed: cached item for node: %v should be invalidated", test.nodeName)
|
||||
}
|
||||
})
|
||||
|
|
|
@ -111,6 +111,25 @@ type genericScheduler struct {
|
|||
percentageOfNodesToScore int32
|
||||
}
|
||||
|
||||
// snapshot snapshots equivalane cache and node infos for all fit and priority
|
||||
// functions.
|
||||
func (g *genericScheduler) snapshot() error {
|
||||
// IMPORTANT NOTE: We must snapshot equivalence cache before snapshotting
|
||||
// scheduler cache, otherwise stale data may be written into equivalence
|
||||
// cache, e.g.
|
||||
// 1. snapshot cache
|
||||
// 2. event arrives, updating cache and invalidating predicates or whole node cache
|
||||
// 3. snapshot ecache
|
||||
// 4. evaludate predicates
|
||||
// 5. stale result will be written to ecache
|
||||
if g.equivalenceCache != nil {
|
||||
g.equivalenceCache.Snapshot()
|
||||
}
|
||||
|
||||
// Used for all fit and priority funcs.
|
||||
return g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
|
||||
}
|
||||
|
||||
// Schedule tries to schedule the given pod to one of the nodes in the node list.
|
||||
// If it succeeds, it will return the name of the node.
|
||||
// If it fails, it will return a FitError error with reasons.
|
||||
|
@ -130,8 +149,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
|
|||
return "", ErrNoNodesAvailable
|
||||
}
|
||||
|
||||
// Used for all fit and priority funcs.
|
||||
err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
|
||||
err = g.snapshot()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -227,7 +245,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
|
|||
if !ok || fitError == nil {
|
||||
return nil, nil, nil, nil
|
||||
}
|
||||
err := g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
|
||||
err := g.snapshot()
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
@ -396,14 +414,13 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
|
|||
var nodeCache *equivalence.NodeCache
|
||||
nodeName := g.cache.NodeTree().Next()
|
||||
if g.equivalenceCache != nil {
|
||||
nodeCache, _ = g.equivalenceCache.GetNodeCache(nodeName)
|
||||
nodeCache = g.equivalenceCache.LoadNodeCache(nodeName)
|
||||
}
|
||||
fits, failedPredicates, err := podFitsOnNode(
|
||||
pod,
|
||||
meta,
|
||||
g.cachedNodeInfoMap[nodeName],
|
||||
g.predicates,
|
||||
g.cache,
|
||||
nodeCache,
|
||||
g.schedulingQueue,
|
||||
g.alwaysCheckAllPredicates,
|
||||
|
@ -516,7 +533,6 @@ func podFitsOnNode(
|
|||
meta algorithm.PredicateMetadata,
|
||||
info *schedulercache.NodeInfo,
|
||||
predicateFuncs map[string]algorithm.FitPredicate,
|
||||
cache schedulercache.Cache,
|
||||
nodeCache *equivalence.NodeCache,
|
||||
queue SchedulingQueue,
|
||||
alwaysCheckAllPredicates bool,
|
||||
|
@ -558,7 +574,7 @@ func podFitsOnNode(
|
|||
// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
|
||||
// when pods are nominated or their nominations change.
|
||||
eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
|
||||
for _, predicateKey := range predicates.Ordering() {
|
||||
for predicateID, predicateKey := range predicates.Ordering() {
|
||||
var (
|
||||
fit bool
|
||||
reasons []algorithm.PredicateFailureReason
|
||||
|
@ -567,7 +583,7 @@ func podFitsOnNode(
|
|||
//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
|
||||
if predicate, exist := predicateFuncs[predicateKey]; exist {
|
||||
if eCacheAvailable {
|
||||
fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
|
||||
fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, predicateID, pod, metaToUse, nodeInfoToUse, equivClass)
|
||||
} else {
|
||||
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
|
||||
}
|
||||
|
@ -991,7 +1007,7 @@ func selectVictimsOnNode(
|
|||
// that we should check is if the "pod" is failing to schedule due to pod affinity
|
||||
// failure.
|
||||
// TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
|
||||
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, nil, queue, false, nil); !fits {
|
||||
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits {
|
||||
if err != nil {
|
||||
glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
|
||||
}
|
||||
|
@ -1005,7 +1021,7 @@ func selectVictimsOnNode(
|
|||
violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
|
||||
reprievePod := func(p *v1.Pod) bool {
|
||||
addPod(p)
|
||||
fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, nil, queue, false, nil)
|
||||
fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil)
|
||||
if !fits {
|
||||
removePod(p)
|
||||
victims = append(victims, p)
|
||||
|
|
|
@ -1474,7 +1474,10 @@ func TestCacheInvalidationRace(t *testing.T) {
|
|||
cacheInvalidated: make(chan struct{}),
|
||||
}
|
||||
|
||||
eCache := equivalence.NewCache()
|
||||
ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate}
|
||||
algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"})
|
||||
eCache := equivalence.NewCache(algorithmpredicates.Ordering())
|
||||
eCache.GetNodeCache(testNode.Name)
|
||||
// Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before
|
||||
// the equivalence cache would be updated.
|
||||
go func() {
|
||||
|
@ -1490,8 +1493,6 @@ func TestCacheInvalidationRace(t *testing.T) {
|
|||
}()
|
||||
|
||||
// Set up the scheduler.
|
||||
ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate}
|
||||
algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"})
|
||||
prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}
|
||||
pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{})
|
||||
scheduler := NewGenericScheduler(
|
||||
|
@ -1522,3 +1523,84 @@ func TestCacheInvalidationRace(t *testing.T) {
|
|||
t.Errorf("Predicate should have been called twice. Was called %d times.", callCount)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCacheInvalidationRace2 tests that cache invalidation is correctly handled
|
||||
// when an invalidation event happens while a predicate is running.
|
||||
func TestCacheInvalidationRace2(t *testing.T) {
|
||||
// Create a predicate that returns false the first time and true on subsequent calls.
|
||||
var (
|
||||
podWillFit = false
|
||||
callCount int
|
||||
cycleStart = make(chan struct{})
|
||||
cacheInvalidated = make(chan struct{})
|
||||
once sync.Once
|
||||
)
|
||||
testPredicate := func(pod *v1.Pod,
|
||||
meta algorithm.PredicateMetadata,
|
||||
nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
|
||||
callCount++
|
||||
once.Do(func() {
|
||||
cycleStart <- struct{}{}
|
||||
<-cacheInvalidated
|
||||
})
|
||||
if !podWillFit {
|
||||
podWillFit = true
|
||||
return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
|
||||
}
|
||||
return true, nil, nil
|
||||
}
|
||||
|
||||
// Set up the mock cache.
|
||||
cache := schedulercache.New(time.Duration(0), wait.NeverStop)
|
||||
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}
|
||||
cache.AddNode(testNode)
|
||||
|
||||
ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate}
|
||||
algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"})
|
||||
eCache := equivalence.NewCache(algorithmpredicates.Ordering())
|
||||
eCache.GetNodeCache(testNode.Name)
|
||||
// Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before
|
||||
// the equivalence cache would be updated.
|
||||
go func() {
|
||||
<-cycleStart
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "new-pod", UID: "new-pod"},
|
||||
Spec: v1.PodSpec{NodeName: "machine1"}}
|
||||
if err := cache.AddPod(pod); err != nil {
|
||||
t.Errorf("Could not add pod to cache: %v", err)
|
||||
}
|
||||
eCache.InvalidateAllPredicatesOnNode("machine1")
|
||||
cacheInvalidated <- struct{}{}
|
||||
}()
|
||||
|
||||
// Set up the scheduler.
|
||||
prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}
|
||||
pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{})
|
||||
scheduler := NewGenericScheduler(
|
||||
cache,
|
||||
eCache,
|
||||
NewSchedulingQueue(),
|
||||
ps,
|
||||
algorithm.EmptyPredicateMetadataProducer,
|
||||
prioritizers,
|
||||
algorithm.EmptyPriorityMetadataProducer,
|
||||
nil, nil, pvcLister, true, false,
|
||||
schedulerapi.DefaultPercentageOfNodesToScore)
|
||||
|
||||
// First scheduling attempt should fail.
|
||||
nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"}))
|
||||
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}}
|
||||
machine, err := scheduler.Schedule(pod, nodeLister)
|
||||
if machine != "" || err == nil {
|
||||
t.Error("First scheduling attempt did not fail")
|
||||
}
|
||||
|
||||
// Second scheduling attempt should succeed because cache was invalidated.
|
||||
_, err = scheduler.Schedule(pod, nodeLister)
|
||||
if err != nil {
|
||||
t.Errorf("Second scheduling attempt failed: %v", err)
|
||||
}
|
||||
if callCount != 2 {
|
||||
t.Errorf("Predicate should have been called twice. Was called %d times.", callCount)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -849,15 +849,17 @@ func (c *configFactory) addNodeToCache(obj interface{}) {
|
|||
return
|
||||
}
|
||||
|
||||
if err := c.schedulerCache.AddNode(node); err != nil {
|
||||
glog.Errorf("scheduler cache AddNode failed: %v", err)
|
||||
}
|
||||
|
||||
// NOTE: Because the scheduler uses equivalence cache for nodes, we need
|
||||
// to create it before adding node into scheduler cache.
|
||||
if c.enableEquivalenceClassCache {
|
||||
// GetNodeCache() will lazily create NodeCache for given node if it does not exist.
|
||||
c.equivalencePodCache.GetNodeCache(node.GetName())
|
||||
}
|
||||
|
||||
if err := c.schedulerCache.AddNode(node); err != nil {
|
||||
glog.Errorf("scheduler cache AddNode failed: %v", err)
|
||||
}
|
||||
|
||||
c.podQueue.MoveAllToActiveQueue()
|
||||
// NOTE: add a new node does not affect existing predicates in equivalence cache
|
||||
}
|
||||
|
@ -1166,7 +1168,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
|
|||
|
||||
// Init equivalence class cache
|
||||
if c.enableEquivalenceClassCache {
|
||||
c.equivalencePodCache = equivalence.NewCache()
|
||||
c.equivalencePodCache = equivalence.NewCache(predicates.Ordering())
|
||||
glog.Info("Created equivalence class cache")
|
||||
}
|
||||
|
||||
|
|
|
@ -106,8 +106,5 @@ func (f *FakeCache) Snapshot() *schedulercache.Snapshot {
|
|||
return &schedulercache.Snapshot{}
|
||||
}
|
||||
|
||||
// IsUpToDate is a fake method for testing
|
||||
func (f *FakeCache) IsUpToDate(*schedulercache.NodeInfo) bool { return true }
|
||||
|
||||
// NodeTree is a fake method for testing.
|
||||
func (f *FakeCache) NodeTree() *schedulercache.NodeTree { return nil }
|
||||
|
|
Loading…
Reference in New Issue