mirror of https://github.com/k3s-io/k3s
Test race condition in equivalence cache.
Add a unit test that invalidates equivalence cache during a scheduling cycle. This exercises the bug described in https://github.com/kubernetes/kubernetes/issues/62921pull/8/head
parent
817aac84b6
commit
02d657827c
|
@ -18,13 +18,19 @@ package core
|
|||
|
||||
import (
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
|
||||
algorithmpredicates "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
|
||||
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
|
||||
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
|
||||
)
|
||||
|
||||
type predicateItemType struct {
|
||||
|
@ -649,3 +655,101 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// syncingMockCache delegates method calls to an actual Cache,
|
||||
// but calls to UpdateNodeNameToInfoMap synchronize with the test.
|
||||
type syncingMockCache struct {
|
||||
schedulercache.Cache
|
||||
cycleStart, cacheInvalidated chan struct{}
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
// UpdateNodeNameToInfoMap delegates to the real implementation, but on the first call, it
|
||||
// synchronizes with the test.
|
||||
//
|
||||
// Since UpdateNodeNameToInfoMap is one of the first steps of (*genericScheduler).Schedule, we use
|
||||
// this point to signal to the test that a scheduling cycle has started.
|
||||
func (c *syncingMockCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error {
|
||||
err := c.Cache.UpdateNodeNameToInfoMap(infoMap)
|
||||
c.once.Do(func() {
|
||||
c.cycleStart <- struct{}{}
|
||||
<-c.cacheInvalidated
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// TestEquivalenceCacheInvalidationRace tests that equivalence cache invalidation is correctly
|
||||
// handled when an invalidation event happens early in a scheduling cycle. Specifically, the event
|
||||
// occurs after schedulercache is snapshotted and before equivalence cache lock is acquired.
|
||||
func TestEquivalenceCacheInvalidationRace(t *testing.T) {
|
||||
// Create a predicate that returns false the first time and true on subsequent calls.
|
||||
podWillFit := false
|
||||
var callCount int
|
||||
testPredicate := func(pod *v1.Pod,
|
||||
meta algorithm.PredicateMetadata,
|
||||
nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
|
||||
callCount++
|
||||
if !podWillFit {
|
||||
podWillFit = true
|
||||
return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
|
||||
}
|
||||
return true, nil, nil
|
||||
}
|
||||
|
||||
// Set up the mock cache.
|
||||
cache := schedulercache.New(time.Duration(0), wait.NeverStop)
|
||||
cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}})
|
||||
mockCache := &syncingMockCache{
|
||||
Cache: cache,
|
||||
cycleStart: make(chan struct{}),
|
||||
cacheInvalidated: make(chan struct{}),
|
||||
}
|
||||
|
||||
fakeGetEquivalencePod := func(pod *v1.Pod) interface{} { return pod }
|
||||
eCache := NewEquivalenceCache(fakeGetEquivalencePod)
|
||||
// Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before
|
||||
// the equivalence cache would be updated.
|
||||
go func() {
|
||||
<-mockCache.cycleStart
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "new-pod", UID: "new-pod"},
|
||||
Spec: v1.PodSpec{NodeName: "machine1"}}
|
||||
if err := cache.AddPod(pod); err != nil {
|
||||
t.Errorf("Could not add pod to cache: %v", err)
|
||||
}
|
||||
eCache.InvalidateAllCachedPredicateItemOfNode("machine1")
|
||||
mockCache.cacheInvalidated <- struct{}{}
|
||||
}()
|
||||
|
||||
// Set up the scheduler.
|
||||
predicates := map[string]algorithm.FitPredicate{"testPredicate": testPredicate}
|
||||
algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"})
|
||||
prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}
|
||||
pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{})
|
||||
scheduler := NewGenericScheduler(
|
||||
mockCache,
|
||||
eCache,
|
||||
NewSchedulingQueue(),
|
||||
predicates,
|
||||
algorithm.EmptyPredicateMetadataProducer,
|
||||
prioritizers,
|
||||
algorithm.EmptyPriorityMetadataProducer,
|
||||
nil, nil, pvcLister, true, false)
|
||||
|
||||
// First scheduling attempt should fail.
|
||||
nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"}))
|
||||
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}}
|
||||
machine, err := scheduler.Schedule(pod, nodeLister)
|
||||
if machine != "" || err == nil {
|
||||
t.Error("First scheduling attempt did not fail")
|
||||
}
|
||||
|
||||
// Second scheduling attempt should succeed because cache was invalidated.
|
||||
_, err = scheduler.Schedule(pod, nodeLister)
|
||||
if err != nil {
|
||||
t.Errorf("Second scheduling attempt failed: %v", err)
|
||||
}
|
||||
if callCount != 2 {
|
||||
t.Errorf("Predicate should have been called twice. Was called %d times.", callCount)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,8 +54,6 @@ import (
|
|||
"k8s.io/kubernetes/test/integration/framework"
|
||||
)
|
||||
|
||||
const enableEquivalenceCache = true
|
||||
|
||||
type nodeMutationFunc func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface)
|
||||
|
||||
type nodeStateManager struct {
|
||||
|
|
Loading…
Reference in New Issue