Move equivalence cache into new package.

This moves the equivalence cache implementation out of the 'core'
package and into k8s.io/kubernetes/pkg/scheduler/core/equivalence.

Separating the equiv. cache from the genericScheduler implementation
make their interaction points easier to follow, and prevents us from
accidentally accessing unexported fields.
pull/8/head
Jonathan Basseri 2018-05-23 16:06:45 -07:00
parent bbb138532a
commit 31c746d960
10 changed files with 167 additions and 114 deletions

View File

@ -46,6 +46,7 @@ go_library(
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/core/equivalence:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",

View File

@ -9,7 +9,6 @@ load(
go_test(
name = "go_default_test",
srcs = [
"equivalence_cache_test.go",
"extender_test.go",
"generic_scheduler_test.go",
"scheduling_queue_test.go",
@ -22,6 +21,7 @@ go_test(
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/core/equivalence:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//vendor/k8s.io/api/apps/v1beta1:go_default_library",
@ -38,7 +38,6 @@ go_test(
go_library(
name = "go_default_library",
srcs = [
"equivalence_cache.go",
"extender.go",
"generic_scheduler.go",
"scheduling_queue.go",
@ -51,10 +50,10 @@ go_library(
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/core/equivalence:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//pkg/util/hash:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/policy/v1beta1:go_default_library",
@ -80,6 +79,9 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//pkg/scheduler/core/equivalence:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -0,0 +1,47 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["eqivalence.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/core/equivalence",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/util/hash:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["eqivalence_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package core
package equivalence
import (
"fmt"

View File

@ -14,20 +14,17 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package core
package equivalence
import (
"errors"
"reflect"
"sync"
"testing"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
@ -796,100 +793,3 @@ func BenchmarkEquivalenceHash(b *testing.B) {
getEquivalencePod(pod)
}
}
// syncingMockCache delegates method calls to an actual Cache,
// but calls to UpdateNodeNameToInfoMap synchronize with the test.
type syncingMockCache struct {
schedulercache.Cache
cycleStart, cacheInvalidated chan struct{}
once sync.Once
}
// UpdateNodeNameToInfoMap delegates to the real implementation, but on the first call, it
// synchronizes with the test.
//
// Since UpdateNodeNameToInfoMap is one of the first steps of (*genericScheduler).Schedule, we use
// this point to signal to the test that a scheduling cycle has started.
func (c *syncingMockCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error {
err := c.Cache.UpdateNodeNameToInfoMap(infoMap)
c.once.Do(func() {
c.cycleStart <- struct{}{}
<-c.cacheInvalidated
})
return err
}
// TestEquivalenceCacheInvalidationRace tests that equivalence cache invalidation is correctly
// handled when an invalidation event happens early in a scheduling cycle. Specifically, the event
// occurs after schedulercache is snapshotted and before equivalence cache lock is acquired.
func TestEquivalenceCacheInvalidationRace(t *testing.T) {
// Create a predicate that returns false the first time and true on subsequent calls.
podWillFit := false
var callCount int
testPredicate := func(pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
callCount++
if !podWillFit {
podWillFit = true
return false, []algorithm.PredicateFailureReason{predicates.ErrFakePredicate}, nil
}
return true, nil, nil
}
// Set up the mock cache.
cache := schedulercache.New(time.Duration(0), wait.NeverStop)
cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}})
mockCache := &syncingMockCache{
Cache: cache,
cycleStart: make(chan struct{}),
cacheInvalidated: make(chan struct{}),
}
eCache := NewEquivalenceCache()
// Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before
// the equivalence cache would be updated.
go func() {
<-mockCache.cycleStart
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "new-pod", UID: "new-pod"},
Spec: v1.PodSpec{NodeName: "machine1"}}
if err := cache.AddPod(pod); err != nil {
t.Errorf("Could not add pod to cache: %v", err)
}
eCache.InvalidateAllPredicatesOnNode("machine1")
mockCache.cacheInvalidated <- struct{}{}
}()
// Set up the scheduler.
ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate}
predicates.SetPredicatesOrdering([]string{"testPredicate"})
prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}
pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{})
scheduler := NewGenericScheduler(
mockCache,
eCache,
NewSchedulingQueue(),
ps,
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
algorithm.EmptyPriorityMetadataProducer,
nil, nil, pvcLister, true, false)
// First scheduling attempt should fail.
nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"}))
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}}
machine, err := scheduler.Schedule(pod, nodeLister)
if machine != "" || err == nil {
t.Error("First scheduling attempt did not fail")
}
// Second scheduling attempt should succeed because cache was invalidated.
_, err = scheduler.Schedule(pod, nodeLister)
if err != nil {
t.Errorf("Second scheduling attempt failed: %v", err)
}
if callCount != 2 {
t.Errorf("Predicate should have been called twice. Was called %d times.", callCount)
}
}

View File

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
@ -85,7 +86,7 @@ func (f *FitError) Error() string {
type genericScheduler struct {
cache schedulercache.Cache
equivalenceCache *EquivalenceCache
equivalenceCache *equivalence.EquivalenceCache
schedulingQueue SchedulingQueue
predicates map[string]algorithm.FitPredicate
priorityMetaProducer algorithm.PriorityMetadataProducer
@ -342,7 +343,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
// We can use the same metadata producer for all nodes.
meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
var equivCacheInfo *EquivalenceClassInfo
var equivCacheInfo *equivalence.EquivalenceClassInfo
if g.equivalenceCache != nil {
// getEquivalenceClassInfo will return immediately if no equivalence pod found
equivCacheInfo = g.equivalenceCache.GetEquivalenceClassInfo(pod)
@ -459,10 +460,10 @@ func podFitsOnNode(
info *schedulercache.NodeInfo,
predicateFuncs map[string]algorithm.FitPredicate,
cache schedulercache.Cache,
ecache *EquivalenceCache,
ecache *equivalence.EquivalenceCache,
queue SchedulingQueue,
alwaysCheckAllPredicates bool,
equivCacheInfo *EquivalenceClassInfo,
equivCacheInfo *equivalence.EquivalenceClassInfo,
) (bool, []algorithm.PredicateFailureReason, error) {
var (
eCacheAvailable bool
@ -1056,7 +1057,7 @@ func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeCla
// NewGenericScheduler creates a genericScheduler object.
func NewGenericScheduler(
cache schedulercache.Cache,
eCache *EquivalenceCache,
eCache *equivalence.EquivalenceCache,
podQueue SchedulingQueue,
predicates map[string]algorithm.FitPredicate,
predicateMetaProducer algorithm.PredicateMetadataProducer,

View File

@ -22,6 +22,7 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"testing"
"time"
@ -39,6 +40,7 @@ import (
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
)
@ -1342,3 +1344,100 @@ func TestPreempt(t *testing.T) {
close(stop)
}
}
// syncingMockCache delegates method calls to an actual Cache,
// but calls to UpdateNodeNameToInfoMap synchronize with the test.
type syncingMockCache struct {
schedulercache.Cache
cycleStart, cacheInvalidated chan struct{}
once sync.Once
}
// UpdateNodeNameToInfoMap delegates to the real implementation, but on the first call, it
// synchronizes with the test.
//
// Since UpdateNodeNameToInfoMap is one of the first steps of (*genericScheduler).Schedule, we use
// this point to signal to the test that a scheduling cycle has started.
func (c *syncingMockCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error {
err := c.Cache.UpdateNodeNameToInfoMap(infoMap)
c.once.Do(func() {
c.cycleStart <- struct{}{}
<-c.cacheInvalidated
})
return err
}
// TestEquivalenceCacheInvalidationRace tests that equivalence cache invalidation is correctly
// handled when an invalidation event happens early in a scheduling cycle. Specifically, the event
// occurs after schedulercache is snapshotted and before equivalence cache lock is acquired.
func TestEquivalenceCacheInvalidationRace(t *testing.T) {
// Create a predicate that returns false the first time and true on subsequent calls.
podWillFit := false
var callCount int
testPredicate := func(pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
callCount++
if !podWillFit {
podWillFit = true
return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
}
return true, nil, nil
}
// Set up the mock cache.
cache := schedulercache.New(time.Duration(0), wait.NeverStop)
cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}})
mockCache := &syncingMockCache{
Cache: cache,
cycleStart: make(chan struct{}),
cacheInvalidated: make(chan struct{}),
}
eCache := equivalence.NewEquivalenceCache()
// Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before
// the equivalence cache would be updated.
go func() {
<-mockCache.cycleStart
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "new-pod", UID: "new-pod"},
Spec: v1.PodSpec{NodeName: "machine1"}}
if err := cache.AddPod(pod); err != nil {
t.Errorf("Could not add pod to cache: %v", err)
}
eCache.InvalidateAllPredicatesOnNode("machine1")
mockCache.cacheInvalidated <- struct{}{}
}()
// Set up the scheduler.
ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate}
algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"})
prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}
pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{})
scheduler := NewGenericScheduler(
mockCache,
eCache,
NewSchedulingQueue(),
ps,
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
algorithm.EmptyPriorityMetadataProducer,
nil, nil, pvcLister, true, false)
// First scheduling attempt should fail.
nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"}))
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}}
machine, err := scheduler.Schedule(pod, nodeLister)
if machine != "" || err == nil {
t.Error("First scheduling attempt did not fail")
}
// Second scheduling attempt should succeed because cache was invalidated.
_, err = scheduler.Schedule(pod, nodeLister)
if err != nil {
t.Errorf("Second scheduling attempt failed: %v", err)
}
if callCount != 2 {
t.Errorf("Predicate should have been called twice. Was called %d times.", callCount)
}
}

View File

@ -62,6 +62,7 @@ go_library(
"//pkg/scheduler/api/validation:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/core/equivalence:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",

View File

@ -61,6 +61,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/api/validation"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
@ -123,7 +124,7 @@ type configFactory struct {
hardPodAffinitySymmetricWeight int32
// Equivalence class cache
equivalencePodCache *core.EquivalenceCache
equivalencePodCache *equivalence.EquivalenceCache
// Enable equivalence class cache
enableEquivalenceClassCache bool
@ -1074,7 +1075,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
// Init equivalence class cache
if c.enableEquivalenceClassCache {
c.equivalencePodCache = core.NewEquivalenceCache()
c.equivalencePodCache = equivalence.NewEquivalenceCache()
glog.Info("Created equivalence class cache")
}

View File

@ -34,6 +34,7 @@ import (
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
@ -104,7 +105,7 @@ type Config struct {
SchedulerCache schedulercache.Cache
// Ecache is used for optimistically invalid affected cache items after
// successfully binding a pod
Ecache *core.EquivalenceCache
Ecache *equivalence.EquivalenceCache
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) Binder