From 31c746d9609d7104995dfbec66a954c6b0262d2a Mon Sep 17 00:00:00 2001 From: Jonathan Basseri Date: Wed, 23 May 2018 16:06:45 -0700 Subject: [PATCH 1/4] Move equivalence cache into new package. This moves the equivalence cache implementation out of the 'core' package and into k8s.io/kubernetes/pkg/scheduler/core/equivalence. Separating the equiv. cache from the genericScheduler implementation make their interaction points easier to follow, and prevents us from accidentally accessing unexported fields. --- pkg/scheduler/BUILD | 1 + pkg/scheduler/core/BUILD | 10 +- pkg/scheduler/core/equivalence/BUILD | 47 ++++++++ .../eqivalence.go} | 2 +- .../eqivalence_test.go} | 102 +----------------- pkg/scheduler/core/generic_scheduler.go | 11 +- pkg/scheduler/core/generic_scheduler_test.go | 99 +++++++++++++++++ pkg/scheduler/factory/BUILD | 1 + pkg/scheduler/factory/factory.go | 5 +- pkg/scheduler/scheduler.go | 3 +- 10 files changed, 167 insertions(+), 114 deletions(-) create mode 100644 pkg/scheduler/core/equivalence/BUILD rename pkg/scheduler/core/{equivalence_cache.go => equivalence/eqivalence.go} (99%) rename pkg/scheduler/core/{equivalence_cache_test.go => equivalence/eqivalence_test.go} (85%) diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index 1e14062c5a..33fd8ea30b 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -46,6 +46,7 @@ go_library( "//pkg/scheduler/api:go_default_library", "//pkg/scheduler/cache:go_default_library", "//pkg/scheduler/core:go_default_library", + "//pkg/scheduler/core/equivalence:go_default_library", "//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/util:go_default_library", "//pkg/scheduler/volumebinder:go_default_library", diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD index 0e8cd4177c..6cd5b1bb10 100644 --- a/pkg/scheduler/core/BUILD +++ b/pkg/scheduler/core/BUILD @@ -9,7 +9,6 @@ load( go_test( name = "go_default_test", srcs = [ - "equivalence_cache_test.go", "extender_test.go", "generic_scheduler_test.go", "scheduling_queue_test.go", @@ -22,6 +21,7 @@ go_test( "//pkg/scheduler/algorithm/priorities/util:go_default_library", "//pkg/scheduler/api:go_default_library", "//pkg/scheduler/cache:go_default_library", + "//pkg/scheduler/core/equivalence:go_default_library", "//pkg/scheduler/testing:go_default_library", "//pkg/scheduler/util:go_default_library", "//vendor/k8s.io/api/apps/v1beta1:go_default_library", @@ -38,7 +38,6 @@ go_test( go_library( name = "go_default_library", srcs = [ - "equivalence_cache.go", "extender.go", "generic_scheduler.go", "scheduling_queue.go", @@ -51,10 +50,10 @@ go_library( "//pkg/scheduler/algorithm/priorities/util:go_default_library", "//pkg/scheduler/api:go_default_library", "//pkg/scheduler/cache:go_default_library", + "//pkg/scheduler/core/equivalence:go_default_library", "//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/util:go_default_library", "//pkg/scheduler/volumebinder:go_default_library", - "//pkg/util/hash:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/policy/v1beta1:go_default_library", @@ -80,6 +79,9 @@ filegroup( filegroup( name = "all-srcs", - srcs = [":package-srcs"], + srcs = [ + ":package-srcs", + "//pkg/scheduler/core/equivalence:all-srcs", + ], tags = ["automanaged"], ) diff --git a/pkg/scheduler/core/equivalence/BUILD b/pkg/scheduler/core/equivalence/BUILD new file mode 100644 index 0000000000..c46e344c25 --- /dev/null +++ b/pkg/scheduler/core/equivalence/BUILD @@ -0,0 +1,47 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["eqivalence.go"], + importpath = "k8s.io/kubernetes/pkg/scheduler/core/equivalence", + visibility = ["//visibility:public"], + deps = [ + "//pkg/scheduler/algorithm:go_default_library", + "//pkg/scheduler/algorithm/predicates:go_default_library", + "//pkg/scheduler/cache:go_default_library", + "//pkg/util/hash:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["eqivalence_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/scheduler/algorithm:go_default_library", + "//pkg/scheduler/algorithm/predicates:go_default_library", + "//pkg/scheduler/cache:go_default_library", + "//pkg/scheduler/testing:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/scheduler/core/equivalence_cache.go b/pkg/scheduler/core/equivalence/eqivalence.go similarity index 99% rename from pkg/scheduler/core/equivalence_cache.go rename to pkg/scheduler/core/equivalence/eqivalence.go index c845ddbdde..eee9324ceb 100644 --- a/pkg/scheduler/core/equivalence_cache.go +++ b/pkg/scheduler/core/equivalence/eqivalence.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package core +package equivalence import ( "fmt" diff --git a/pkg/scheduler/core/equivalence_cache_test.go b/pkg/scheduler/core/equivalence/eqivalence_test.go similarity index 85% rename from pkg/scheduler/core/equivalence_cache_test.go rename to pkg/scheduler/core/equivalence/eqivalence_test.go index 7df8097a17..c9353d3ac9 100644 --- a/pkg/scheduler/core/equivalence_cache_test.go +++ b/pkg/scheduler/core/equivalence/eqivalence_test.go @@ -14,20 +14,17 @@ See the License for the specific language governing permissions and limitations under the License. */ -package core +package equivalence import ( "errors" "reflect" - "sync" "testing" - "time" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" @@ -796,100 +793,3 @@ func BenchmarkEquivalenceHash(b *testing.B) { getEquivalencePod(pod) } } - -// syncingMockCache delegates method calls to an actual Cache, -// but calls to UpdateNodeNameToInfoMap synchronize with the test. -type syncingMockCache struct { - schedulercache.Cache - cycleStart, cacheInvalidated chan struct{} - once sync.Once -} - -// UpdateNodeNameToInfoMap delegates to the real implementation, but on the first call, it -// synchronizes with the test. -// -// Since UpdateNodeNameToInfoMap is one of the first steps of (*genericScheduler).Schedule, we use -// this point to signal to the test that a scheduling cycle has started. -func (c *syncingMockCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error { - err := c.Cache.UpdateNodeNameToInfoMap(infoMap) - c.once.Do(func() { - c.cycleStart <- struct{}{} - <-c.cacheInvalidated - }) - return err -} - -// TestEquivalenceCacheInvalidationRace tests that equivalence cache invalidation is correctly -// handled when an invalidation event happens early in a scheduling cycle. Specifically, the event -// occurs after schedulercache is snapshotted and before equivalence cache lock is acquired. -func TestEquivalenceCacheInvalidationRace(t *testing.T) { - // Create a predicate that returns false the first time and true on subsequent calls. - podWillFit := false - var callCount int - testPredicate := func(pod *v1.Pod, - meta algorithm.PredicateMetadata, - nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { - callCount++ - if !podWillFit { - podWillFit = true - return false, []algorithm.PredicateFailureReason{predicates.ErrFakePredicate}, nil - } - return true, nil, nil - } - - // Set up the mock cache. - cache := schedulercache.New(time.Duration(0), wait.NeverStop) - cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}) - mockCache := &syncingMockCache{ - Cache: cache, - cycleStart: make(chan struct{}), - cacheInvalidated: make(chan struct{}), - } - - eCache := NewEquivalenceCache() - // Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before - // the equivalence cache would be updated. - go func() { - <-mockCache.cycleStart - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "new-pod", UID: "new-pod"}, - Spec: v1.PodSpec{NodeName: "machine1"}} - if err := cache.AddPod(pod); err != nil { - t.Errorf("Could not add pod to cache: %v", err) - } - eCache.InvalidateAllPredicatesOnNode("machine1") - mockCache.cacheInvalidated <- struct{}{} - }() - - // Set up the scheduler. - ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate} - predicates.SetPredicatesOrdering([]string{"testPredicate"}) - prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}} - pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{}) - scheduler := NewGenericScheduler( - mockCache, - eCache, - NewSchedulingQueue(), - ps, - algorithm.EmptyPredicateMetadataProducer, - prioritizers, - algorithm.EmptyPriorityMetadataProducer, - nil, nil, pvcLister, true, false) - - // First scheduling attempt should fail. - nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"})) - pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}} - machine, err := scheduler.Schedule(pod, nodeLister) - if machine != "" || err == nil { - t.Error("First scheduling attempt did not fail") - } - - // Second scheduling attempt should succeed because cache was invalidated. - _, err = scheduler.Schedule(pod, nodeLister) - if err != nil { - t.Errorf("Second scheduling attempt failed: %v", err) - } - if callCount != 2 { - t.Errorf("Predicate should have been called twice. Was called %d times.", callCount) - } -} diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 54d340a8f5..35bf8e212a 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" + "k8s.io/kubernetes/pkg/scheduler/core/equivalence" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/volumebinder" @@ -85,7 +86,7 @@ func (f *FitError) Error() string { type genericScheduler struct { cache schedulercache.Cache - equivalenceCache *EquivalenceCache + equivalenceCache *equivalence.EquivalenceCache schedulingQueue SchedulingQueue predicates map[string]algorithm.FitPredicate priorityMetaProducer algorithm.PriorityMetadataProducer @@ -342,7 +343,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v // We can use the same metadata producer for all nodes. meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap) - var equivCacheInfo *EquivalenceClassInfo + var equivCacheInfo *equivalence.EquivalenceClassInfo if g.equivalenceCache != nil { // getEquivalenceClassInfo will return immediately if no equivalence pod found equivCacheInfo = g.equivalenceCache.GetEquivalenceClassInfo(pod) @@ -459,10 +460,10 @@ func podFitsOnNode( info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate, cache schedulercache.Cache, - ecache *EquivalenceCache, + ecache *equivalence.EquivalenceCache, queue SchedulingQueue, alwaysCheckAllPredicates bool, - equivCacheInfo *EquivalenceClassInfo, + equivCacheInfo *equivalence.EquivalenceClassInfo, ) (bool, []algorithm.PredicateFailureReason, error) { var ( eCacheAvailable bool @@ -1056,7 +1057,7 @@ func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeCla // NewGenericScheduler creates a genericScheduler object. func NewGenericScheduler( cache schedulercache.Cache, - eCache *EquivalenceCache, + eCache *equivalence.EquivalenceCache, podQueue SchedulingQueue, predicates map[string]algorithm.FitPredicate, predicateMetaProducer algorithm.PredicateMetadataProducer, diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index ce10d9fd3f..9b164cf6d6 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -22,6 +22,7 @@ import ( "reflect" "strconv" "strings" + "sync" "testing" "time" @@ -39,6 +40,7 @@ import ( priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" + "k8s.io/kubernetes/pkg/scheduler/core/equivalence" schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" ) @@ -1342,3 +1344,100 @@ func TestPreempt(t *testing.T) { close(stop) } } + +// syncingMockCache delegates method calls to an actual Cache, +// but calls to UpdateNodeNameToInfoMap synchronize with the test. +type syncingMockCache struct { + schedulercache.Cache + cycleStart, cacheInvalidated chan struct{} + once sync.Once +} + +// UpdateNodeNameToInfoMap delegates to the real implementation, but on the first call, it +// synchronizes with the test. +// +// Since UpdateNodeNameToInfoMap is one of the first steps of (*genericScheduler).Schedule, we use +// this point to signal to the test that a scheduling cycle has started. +func (c *syncingMockCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error { + err := c.Cache.UpdateNodeNameToInfoMap(infoMap) + c.once.Do(func() { + c.cycleStart <- struct{}{} + <-c.cacheInvalidated + }) + return err +} + +// TestEquivalenceCacheInvalidationRace tests that equivalence cache invalidation is correctly +// handled when an invalidation event happens early in a scheduling cycle. Specifically, the event +// occurs after schedulercache is snapshotted and before equivalence cache lock is acquired. +func TestEquivalenceCacheInvalidationRace(t *testing.T) { + // Create a predicate that returns false the first time and true on subsequent calls. + podWillFit := false + var callCount int + testPredicate := func(pod *v1.Pod, + meta algorithm.PredicateMetadata, + nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { + callCount++ + if !podWillFit { + podWillFit = true + return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil + } + return true, nil, nil + } + + // Set up the mock cache. + cache := schedulercache.New(time.Duration(0), wait.NeverStop) + cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}) + mockCache := &syncingMockCache{ + Cache: cache, + cycleStart: make(chan struct{}), + cacheInvalidated: make(chan struct{}), + } + + eCache := equivalence.NewEquivalenceCache() + // Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before + // the equivalence cache would be updated. + go func() { + <-mockCache.cycleStart + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "new-pod", UID: "new-pod"}, + Spec: v1.PodSpec{NodeName: "machine1"}} + if err := cache.AddPod(pod); err != nil { + t.Errorf("Could not add pod to cache: %v", err) + } + eCache.InvalidateAllPredicatesOnNode("machine1") + mockCache.cacheInvalidated <- struct{}{} + }() + + // Set up the scheduler. + ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate} + algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"}) + prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}} + pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{}) + scheduler := NewGenericScheduler( + mockCache, + eCache, + NewSchedulingQueue(), + ps, + algorithm.EmptyPredicateMetadataProducer, + prioritizers, + algorithm.EmptyPriorityMetadataProducer, + nil, nil, pvcLister, true, false) + + // First scheduling attempt should fail. + nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"})) + pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}} + machine, err := scheduler.Schedule(pod, nodeLister) + if machine != "" || err == nil { + t.Error("First scheduling attempt did not fail") + } + + // Second scheduling attempt should succeed because cache was invalidated. + _, err = scheduler.Schedule(pod, nodeLister) + if err != nil { + t.Errorf("Second scheduling attempt failed: %v", err) + } + if callCount != 2 { + t.Errorf("Predicate should have been called twice. Was called %d times.", callCount) + } +} diff --git a/pkg/scheduler/factory/BUILD b/pkg/scheduler/factory/BUILD index faf552687c..cc0ebf88ff 100644 --- a/pkg/scheduler/factory/BUILD +++ b/pkg/scheduler/factory/BUILD @@ -62,6 +62,7 @@ go_library( "//pkg/scheduler/api/validation:go_default_library", "//pkg/scheduler/cache:go_default_library", "//pkg/scheduler/core:go_default_library", + "//pkg/scheduler/core/equivalence:go_default_library", "//pkg/scheduler/util:go_default_library", "//pkg/scheduler/volumebinder:go_default_library", "//vendor/github.com/golang/glog:go_default_library", diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index c401f834b3..29323526fe 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -61,6 +61,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/api/validation" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" "k8s.io/kubernetes/pkg/scheduler/core" + "k8s.io/kubernetes/pkg/scheduler/core/equivalence" "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/volumebinder" ) @@ -123,7 +124,7 @@ type configFactory struct { hardPodAffinitySymmetricWeight int32 // Equivalence class cache - equivalencePodCache *core.EquivalenceCache + equivalencePodCache *equivalence.EquivalenceCache // Enable equivalence class cache enableEquivalenceClassCache bool @@ -1074,7 +1075,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, // Init equivalence class cache if c.enableEquivalenceClassCache { - c.equivalencePodCache = core.NewEquivalenceCache() + c.equivalencePodCache = equivalence.NewEquivalenceCache() glog.Info("Created equivalence class cache") } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 98b36678ab..1288758008 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -34,6 +34,7 @@ import ( schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" "k8s.io/kubernetes/pkg/scheduler/core" + "k8s.io/kubernetes/pkg/scheduler/core/equivalence" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/volumebinder" @@ -104,7 +105,7 @@ type Config struct { SchedulerCache schedulercache.Cache // Ecache is used for optimistically invalid affected cache items after // successfully binding a pod - Ecache *core.EquivalenceCache + Ecache *equivalence.EquivalenceCache NodeLister algorithm.NodeLister Algorithm algorithm.ScheduleAlgorithm GetBinder func(pod *v1.Pod) Binder From b571065bc473f181b2be5e5b4be3b66e187758fb Mon Sep 17 00:00:00 2001 From: Jonathan Basseri Date: Wed, 23 May 2018 16:42:30 -0700 Subject: [PATCH 2/4] Clean up names in equivalence package. Remove stutter from names and provide more idiomatic patterns. This makes call sites that use equivalence cache easier to read. --- pkg/scheduler/core/equivalence/eqivalence.go | 139 +++++++++--------- .../core/equivalence/eqivalence_test.go | 17 +-- pkg/scheduler/core/generic_scheduler.go | 18 +-- pkg/scheduler/core/generic_scheduler_test.go | 6 +- pkg/scheduler/factory/factory.go | 4 +- pkg/scheduler/scheduler.go | 2 +- 6 files changed, 92 insertions(+), 94 deletions(-) diff --git a/pkg/scheduler/core/equivalence/eqivalence.go b/pkg/scheduler/core/equivalence/eqivalence.go index eee9324ceb..c8a5106021 100644 --- a/pkg/scheduler/core/equivalence/eqivalence.go +++ b/pkg/scheduler/core/equivalence/eqivalence.go @@ -31,19 +31,51 @@ import ( "github.com/golang/glog" ) -// EquivalenceCache saves and reuses the output of predicate functions. Use -// RunPredicate to get or update the cached results. An appropriate Invalidate* -// function should be called when some predicate results are no longer valid. +// Cache saves and reuses the output of predicate functions. Use RunPredicate to +// get or update the cached results. An appropriate Invalidate* function should +// be called when some predicate results are no longer valid. // // Internally, results are keyed by node name, predicate name, and "equivalence -// class". (Equivalence class is defined in the `EquivalenceClassInfo` type.) -// Saved results will be reused until an appropriate invalidation function is -// called. -type EquivalenceCache struct { +// class". (Equivalence class is defined in the `Class` type.) Saved results +// will be reused until an appropriate invalidation function is called. +type Cache struct { mu sync.RWMutex cache nodeMap } +// NewCache returns an empty Cache. +func NewCache() *Cache { + return &Cache{ + cache: make(nodeMap), + } +} + +// Class represents a set of pods which are equivalent from the perspective of +// the scheduler. i.e. the scheduler would make the same decision for any pod +// from the same class. +type Class struct { + // Equivalence hash + hash uint64 +} + +// NewClass returns the equivalence class for a given Pod. The returned Class +// objects will be equal for two Pods in the same class. nil values should not +// be considered equal to each other. +// +// NOTE: Make sure to compare types of Class and not *Class. +// TODO(misterikkit): Return error instead of nil *Class. +func NewClass(pod *v1.Pod) *Class { + equivalencePod := getEquivalencePod(pod) + if equivalencePod != nil { + hash := fnv.New32a() + hashutil.DeepHashObject(hash, equivalencePod) + return &Class{ + hash: uint64(hash.Sum32()), + } + } + return nil +} + // nodeMap stores PredicateCaches with node name as the key. type nodeMap map[string]predicateMap @@ -59,25 +91,17 @@ type predicateResult struct { FailReasons []algorithm.PredicateFailureReason } -// NewEquivalenceCache returns EquivalenceCache to speed up predicates by caching -// result from previous scheduling. -func NewEquivalenceCache() *EquivalenceCache { - return &EquivalenceCache{ - cache: make(nodeMap), - } -} - // RunPredicate returns a cached predicate result. In case of a cache miss, the predicate will be // run and its results cached for the next call. // // NOTE: RunPredicate will not update the equivalence cache if the given NodeInfo is stale. -func (ec *EquivalenceCache) RunPredicate( +func (c *Cache) RunPredicate( pred algorithm.FitPredicate, predicateKey string, pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo, - equivClassInfo *EquivalenceClassInfo, + equivClass *Class, cache schedulercache.Cache, ) (bool, []algorithm.PredicateFailureReason, error) { if nodeInfo == nil || nodeInfo.Node() == nil { @@ -85,7 +109,7 @@ func (ec *EquivalenceCache) RunPredicate( return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid") } - result, ok := ec.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClassInfo.hash) + result, ok := c.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClass.hash) if ok { return result.Fit, result.FailReasons, nil } @@ -94,13 +118,13 @@ func (ec *EquivalenceCache) RunPredicate( return fit, reasons, err } if cache != nil { - ec.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClassInfo.hash, cache, nodeInfo) + c.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClass.hash, cache, nodeInfo) } return fit, reasons, nil } // updateResult updates the cached result of a predicate. -func (ec *EquivalenceCache) updateResult( +func (c *Cache) updateResult( podName, predicateKey string, fit bool, reasons []algorithm.PredicateFailureReason, @@ -108,8 +132,8 @@ func (ec *EquivalenceCache) updateResult( cache schedulercache.Cache, nodeInfo *schedulercache.NodeInfo, ) { - ec.mu.Lock() - defer ec.mu.Unlock() + c.mu.Lock() + defer c.mu.Unlock() if nodeInfo == nil || nodeInfo.Node() == nil { // This may happen during tests. return @@ -119,19 +143,19 @@ func (ec *EquivalenceCache) updateResult( return } nodeName := nodeInfo.Node().GetName() - if _, exist := ec.cache[nodeName]; !exist { - ec.cache[nodeName] = make(predicateMap) + if _, exist := c.cache[nodeName]; !exist { + c.cache[nodeName] = make(predicateMap) } predicateItem := predicateResult{ Fit: fit, FailReasons: reasons, } // if cached predicate map already exists, just update the predicate by key - if predicates, ok := ec.cache[nodeName][predicateKey]; ok { + if predicates, ok := c.cache[nodeName][predicateKey]; ok { // maps in golang are references, no need to add them back predicates[equivalenceHash] = predicateItem } else { - ec.cache[nodeName][predicateKey] = + c.cache[nodeName][predicateKey] = resultMap{ equivalenceHash: predicateItem, } @@ -141,27 +165,27 @@ func (ec *EquivalenceCache) updateResult( // lookupResult returns cached predicate results and a bool saying whether a // cache entry was found. -func (ec *EquivalenceCache) lookupResult( +func (c *Cache) lookupResult( podName, nodeName, predicateKey string, equivalenceHash uint64, ) (value predicateResult, ok bool) { - ec.mu.RLock() - defer ec.mu.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache", predicateKey, podName, nodeName) - value, ok = ec.cache[nodeName][predicateKey][equivalenceHash] + value, ok = c.cache[nodeName][predicateKey][equivalenceHash] return value, ok } // InvalidatePredicates clears all cached results for the given predicates. -func (ec *EquivalenceCache) InvalidatePredicates(predicateKeys sets.String) { +func (c *Cache) InvalidatePredicates(predicateKeys sets.String) { if len(predicateKeys) == 0 { return } - ec.mu.Lock() - defer ec.mu.Unlock() - // ec.cache uses nodeName as key, so we just iterate it and invalid given predicates - for _, predicates := range ec.cache { + c.mu.Lock() + defer c.mu.Unlock() + // c.cache uses nodeName as key, so we just iterate it and invalid given predicates + for _, predicates := range c.cache { for predicateKey := range predicateKeys { delete(predicates, predicateKey) } @@ -170,30 +194,30 @@ func (ec *EquivalenceCache) InvalidatePredicates(predicateKeys sets.String) { } // InvalidatePredicatesOnNode clears cached results for the given predicates on one node. -func (ec *EquivalenceCache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) { +func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) { if len(predicateKeys) == 0 { return } - ec.mu.Lock() - defer ec.mu.Unlock() + c.mu.Lock() + defer c.mu.Unlock() for predicateKey := range predicateKeys { - delete(ec.cache[nodeName], predicateKey) + delete(c.cache[nodeName], predicateKey) } glog.V(5).Infof("Done invalidating cached predicates: %v on node: %s", predicateKeys, nodeName) } // InvalidateAllPredicatesOnNode clears all cached results for one node. -func (ec *EquivalenceCache) InvalidateAllPredicatesOnNode(nodeName string) { - ec.mu.Lock() - defer ec.mu.Unlock() - delete(ec.cache, nodeName) +func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.cache, nodeName) glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName) } // InvalidateCachedPredicateItemForPodAdd is a wrapper of // InvalidateCachedPredicateItem for pod add case -// TODO: This logic does not belong with the equivalence cache implementation. -func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) { +// TODO: This does not belong with the equivalence cache implementation. +func (c *Cache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) { // MatchInterPodAffinity: we assume scheduler can make sure newly bound pod // will not break the existing inter pod affinity. So we does not need to // invalidate MatchInterPodAffinity when pod added. @@ -226,30 +250,7 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, } } } - ec.InvalidatePredicatesOnNode(nodeName, invalidPredicates) -} - -// EquivalenceClassInfo holds equivalence hash which is used for checking -// equivalence cache. We will pass this to podFitsOnNode to ensure equivalence -// hash is only calculated per schedule. -type EquivalenceClassInfo struct { - // Equivalence hash. - hash uint64 -} - -// GetEquivalenceClassInfo returns a hash of the given pod. The hashing function -// returns the same value for any two pods that are equivalent from the -// perspective of scheduling. -func (ec *EquivalenceCache) GetEquivalenceClassInfo(pod *v1.Pod) *EquivalenceClassInfo { - equivalencePod := getEquivalencePod(pod) - if equivalencePod != nil { - hash := fnv.New32a() - hashutil.DeepHashObject(hash, equivalencePod) - return &EquivalenceClassInfo{ - hash: uint64(hash.Sum32()), - } - } - return nil + c.InvalidatePredicatesOnNode(nodeName, invalidPredicates) } // equivalencePod is the set of pod attributes which must match for two pods to diff --git a/pkg/scheduler/core/equivalence/eqivalence_test.go b/pkg/scheduler/core/equivalence/eqivalence_test.go index c9353d3ac9..a7c8fc3360 100644 --- a/pkg/scheduler/core/equivalence/eqivalence_test.go +++ b/pkg/scheduler/core/equivalence/eqivalence_test.go @@ -247,8 +247,8 @@ func TestRunPredicate(t *testing.T) { pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1"}} meta := algorithm.EmptyPredicateMetadataProducer(nil, nil) - ecache := NewEquivalenceCache() - equivClass := ecache.GetEquivalenceClassInfo(pod) + ecache := NewCache() + equivClass := NewClass(pod) if test.expectCacheHit { ecache.updateResult(pod.Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash, test.cache, node) } @@ -339,7 +339,7 @@ func TestUpdateResult(t *testing.T) { }, } for _, test := range tests { - ecache := NewEquivalenceCache() + ecache := NewCache() if test.expectPredicateMap { ecache.cache[test.nodeName] = make(predicateMap) predicateItem := predicateResult{ @@ -473,7 +473,7 @@ func TestLookupResult(t *testing.T) { } for _, test := range tests { - ecache := NewEquivalenceCache() + ecache := NewCache() node := schedulercache.NewNodeInfo() node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}) // set cached item to equivalence cache @@ -527,9 +527,6 @@ func TestLookupResult(t *testing.T) { } func TestGetEquivalenceHash(t *testing.T) { - - ecache := NewEquivalenceCache() - pod1 := makeBasicPod("pod1") pod2 := makeBasicPod("pod2") @@ -620,7 +617,7 @@ func TestGetEquivalenceHash(t *testing.T) { t.Run(test.name, func(t *testing.T) { for i, podInfo := range test.podInfoList { testPod := podInfo.pod - eclassInfo := ecache.GetEquivalenceClassInfo(testPod) + eclassInfo := NewClass(testPod) if eclassInfo == nil && podInfo.hashIsValid { t.Errorf("Failed: pod %v is expected to have valid hash", testPod) } @@ -688,7 +685,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { cache: &upToDateCache{}, }, } - ecache := NewEquivalenceCache() + ecache := NewCache() for _, test := range tests { node := schedulercache.NewNodeInfo() @@ -760,7 +757,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { cache: &upToDateCache{}, }, } - ecache := NewEquivalenceCache() + ecache := NewCache() for _, test := range tests { node := schedulercache.NewNodeInfo() diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 35bf8e212a..bdde7aaf50 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -86,7 +86,7 @@ func (f *FitError) Error() string { type genericScheduler struct { cache schedulercache.Cache - equivalenceCache *equivalence.EquivalenceCache + equivalenceCache *equivalence.Cache schedulingQueue SchedulingQueue predicates map[string]algorithm.FitPredicate priorityMetaProducer algorithm.PriorityMetadataProducer @@ -343,10 +343,10 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v // We can use the same metadata producer for all nodes. meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap) - var equivCacheInfo *equivalence.EquivalenceClassInfo + var equivClass *equivalence.Class if g.equivalenceCache != nil { // getEquivalenceClassInfo will return immediately if no equivalence pod found - equivCacheInfo = g.equivalenceCache.GetEquivalenceClassInfo(pod) + equivClass = equivalence.NewClass(pod) } checkNode := func(i int) { @@ -360,7 +360,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v g.equivalenceCache, g.schedulingQueue, g.alwaysCheckAllPredicates, - equivCacheInfo, + equivClass, ) if err != nil { predicateResultLock.Lock() @@ -460,10 +460,10 @@ func podFitsOnNode( info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate, cache schedulercache.Cache, - ecache *equivalence.EquivalenceCache, + ecache *equivalence.Cache, queue SchedulingQueue, alwaysCheckAllPredicates bool, - equivCacheInfo *equivalence.EquivalenceClassInfo, + equivClass *equivalence.Class, ) (bool, []algorithm.PredicateFailureReason, error) { var ( eCacheAvailable bool @@ -500,7 +500,7 @@ func podFitsOnNode( // Bypass eCache if node has any nominated pods. // TODO(bsalamat): consider using eCache and adding proper eCache invalidations // when pods are nominated or their nominations change. - eCacheAvailable = equivCacheInfo != nil && !podsAdded + eCacheAvailable = equivClass != nil && !podsAdded for _, predicateKey := range predicates.Ordering() { var ( fit bool @@ -510,7 +510,7 @@ func podFitsOnNode( //TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric if predicate, exist := predicateFuncs[predicateKey]; exist { if eCacheAvailable { - fit, reasons, err = ecache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivCacheInfo, cache) + fit, reasons, err = ecache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache) } else { fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse) } @@ -1057,7 +1057,7 @@ func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeCla // NewGenericScheduler creates a genericScheduler object. func NewGenericScheduler( cache schedulercache.Cache, - eCache *equivalence.EquivalenceCache, + eCache *equivalence.Cache, podQueue SchedulingQueue, predicates map[string]algorithm.FitPredicate, predicateMetaProducer algorithm.PredicateMetadataProducer, diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index 9b164cf6d6..d0611c55a5 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -1367,10 +1367,10 @@ func (c *syncingMockCache) UpdateNodeNameToInfoMap(infoMap map[string]*scheduler return err } -// TestEquivalenceCacheInvalidationRace tests that equivalence cache invalidation is correctly +// TestCacheInvalidationRace tests that equivalence cache invalidation is correctly // handled when an invalidation event happens early in a scheduling cycle. Specifically, the event // occurs after schedulercache is snapshotted and before equivalence cache lock is acquired. -func TestEquivalenceCacheInvalidationRace(t *testing.T) { +func TestCacheInvalidationRace(t *testing.T) { // Create a predicate that returns false the first time and true on subsequent calls. podWillFit := false var callCount int @@ -1394,7 +1394,7 @@ func TestEquivalenceCacheInvalidationRace(t *testing.T) { cacheInvalidated: make(chan struct{}), } - eCache := equivalence.NewEquivalenceCache() + eCache := equivalence.NewCache() // Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before // the equivalence cache would be updated. go func() { diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 29323526fe..2082e22fee 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -124,7 +124,7 @@ type configFactory struct { hardPodAffinitySymmetricWeight int32 // Equivalence class cache - equivalencePodCache *equivalence.EquivalenceCache + equivalencePodCache *equivalence.Cache // Enable equivalence class cache enableEquivalenceClassCache bool @@ -1075,7 +1075,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, // Init equivalence class cache if c.enableEquivalenceClassCache { - c.equivalencePodCache = equivalence.NewEquivalenceCache() + c.equivalencePodCache = equivalence.NewCache() glog.Info("Created equivalence class cache") } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 1288758008..d06a6cb9e3 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -105,7 +105,7 @@ type Config struct { SchedulerCache schedulercache.Cache // Ecache is used for optimistically invalid affected cache items after // successfully binding a pod - Ecache *equivalence.EquivalenceCache + Ecache *equivalence.Cache NodeLister algorithm.NodeLister Algorithm algorithm.ScheduleAlgorithm GetBinder func(pod *v1.Pod) Binder From c24842d806d3291c103ad253149851b711afc38f Mon Sep 17 00:00:00 2001 From: Jonathan Basseri Date: Tue, 29 May 2018 12:11:23 -0700 Subject: [PATCH 3/4] Add a package docstring to core/equivalence. --- pkg/scheduler/core/equivalence/eqivalence.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/scheduler/core/equivalence/eqivalence.go b/pkg/scheduler/core/equivalence/eqivalence.go index c8a5106021..09a2c89459 100644 --- a/pkg/scheduler/core/equivalence/eqivalence.go +++ b/pkg/scheduler/core/equivalence/eqivalence.go @@ -14,6 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ +// Package equivalence defines Pod equivalence classes and the equivalence class +// cache. package equivalence import ( From 56b941f3df5633770b56b042d0aadc09b57e6ac8 Mon Sep 17 00:00:00 2001 From: Jonathan Basseri Date: Wed, 30 May 2018 13:48:50 -0700 Subject: [PATCH 4/4] scheduler: fix equiv. cache logging. Change the log messages to accurately reflect the cache behavior. --- pkg/scheduler/core/equivalence/eqivalence.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/scheduler/core/equivalence/eqivalence.go b/pkg/scheduler/core/equivalence/eqivalence.go index 09a2c89459..5ec9d9fdcc 100644 --- a/pkg/scheduler/core/equivalence/eqivalence.go +++ b/pkg/scheduler/core/equivalence/eqivalence.go @@ -162,7 +162,7 @@ func (c *Cache) updateResult( equivalenceHash: predicateItem, } } - glog.V(5).Infof("Updated cached predicate: %v for pod: %v on node: %s, with item %v", predicateKey, podName, nodeName, predicateItem) + glog.V(5).Infof("Cache update: node=%s,predicate=%s,pod=%s,value=%v", nodeName, predicateKey, podName, predicateItem) } // lookupResult returns cached predicate results and a bool saying whether a @@ -173,8 +173,7 @@ func (c *Cache) lookupResult( ) (value predicateResult, ok bool) { c.mu.RLock() defer c.mu.RUnlock() - glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache", - predicateKey, podName, nodeName) + glog.V(5).Infof("Cache lookup: node=%s,predicate=%s,pod=%s", nodeName, predicateKey, podName) value, ok = c.cache[nodeName][predicateKey][equivalenceHash] return value, ok } @@ -192,7 +191,7 @@ func (c *Cache) InvalidatePredicates(predicateKeys sets.String) { delete(predicates, predicateKey) } } - glog.V(5).Infof("Done invalidating cached predicates: %v on all node", predicateKeys) + glog.V(5).Infof("Cache invalidation: node=*,predicates=%v", predicateKeys) } // InvalidatePredicatesOnNode clears cached results for the given predicates on one node. @@ -205,7 +204,7 @@ func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.S for predicateKey := range predicateKeys { delete(c.cache[nodeName], predicateKey) } - glog.V(5).Infof("Done invalidating cached predicates: %v on node: %s", predicateKeys, nodeName) + glog.V(5).Infof("Cache invalidation: node=%s,predicates=%v", nodeName, predicateKeys) } // InvalidateAllPredicatesOnNode clears all cached results for one node. @@ -213,7 +212,7 @@ func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) { c.mu.Lock() defer c.mu.Unlock() delete(c.cache, nodeName) - glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName) + glog.V(5).Infof("Cache invalidation: node=%s,predicates=*", nodeName) } // InvalidateCachedPredicateItemForPodAdd is a wrapper of