mirror of https://github.com/k3s-io/k3s
Merge pull request #63692 from msau42/debug-scheduler
Automatic merge from submit-queue (batch tested with PRs 60012, 63692, 63977, 63960, 64008). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Only override objects from informer when version has increased.

**What this PR does / why we need it**: We don't want an informer resync to override assumed volumes if the version has not increased.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes #63467

**Special notes for your reviewer**:

**Release note**:
```release-note
NONE
```
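For background: a shared informer's periodic resync re-delivers every object from the informer's local store, so event handlers fire again even though nothing changed on the API server. Without a version check, such a re-delivery can overwrite a PV that the scheduler has only assumed in memory. A minimal client-go sketch of that behavior (illustrative only, not code from this PR; the handler body and the 30-second resync period are assumptions):

```go
package main

import (
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Load kubeconfig from the default ~/.kube/config location (assumption for this sketch).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// A 30s resync period means UpdateFunc fires for every PV roughly every
	// 30s, with oldObj and newObj carrying the same ResourceVersion.
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	pvInformer := factory.Core().V1().PersistentVolumes().Informer()

	pvInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldPV := oldObj.(*v1.PersistentVolume)
			newPV := newObj.(*v1.PersistentVolume)
			if oldPV.ResourceVersion == newPV.ResourceVersion {
				// Resync, not a real change: skip it so an assumed
				// (in-memory) object is not overwritten with stale data.
				return
			}
			// Handle a real update here.
		},
	})

	stopCh := make(chan struct{})
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)
	<-stopCh // block forever; this sketch never closes stopCh
}
```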
commit 680e00a656
```diff
@@ -158,8 +158,30 @@ func (c *assumeCache) add(obj interface{}) {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
+
+	if objInfo, _ := c.getObjInfo(name); objInfo != nil {
+		newVersion, err := c.getObjVersion(name, obj)
+		if err != nil {
+			glog.Errorf("add: couldn't get object version: %v", err)
+			return
+		}
+
+		storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
+		if err != nil {
+			glog.Errorf("add: couldn't get stored object version: %v", err)
+			return
+		}
+
+		// Only update object if version is newer.
+		// This is so we don't override assumed objects due to informer resync.
+		if newVersion <= storedVersion {
+			glog.V(10).Infof("Skip adding %v %v to assume cache because version %v is not newer than %v", c.description, name, newVersion, storedVersion)
+			return
+		}
+	}
+
 	objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj}
 	c.store.Update(objInfo)
 	glog.V(10).Infof("Adding %v %v to assume cache: %+v ", c.description, name, obj)
 }
 
 func (c *assumeCache) update(oldObj interface{}, newObj interface{}) {
```
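The new check in add() calls c.getObjVersion, whose body is outside this hunk. A plausible standalone sketch of such a helper, assuming the version is simply the object's ResourceVersion parsed as an integer (an assumption; the real method may differ):

```go
package main

import (
	"fmt"
	"strconv"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// getObjVersion is a hypothetical stand-in for the cache helper: read the
// object's ResourceVersion via meta.Accessor and parse it as an int64 so that
// "newer" can be decided with a plain integer comparison, as in the diff above.
func getObjVersion(name string, obj interface{}) (int64, error) {
	accessor, err := meta.Accessor(obj)
	if err != nil {
		return -1, err
	}
	version, err := strconv.ParseInt(accessor.GetResourceVersion(), 10, 64)
	if err != nil {
		return -1, fmt.Errorf("couldn't parse ResourceVersion %q of object %q: %v",
			accessor.GetResourceVersion(), name, err)
	}
	return version, nil
}

func main() {
	pv := &corev1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{Name: "pv-1", ResourceVersion: "7"},
	}
	version, err := getObjVersion(pv.Name, pv)
	fmt.Println(version, err) // 7 <nil>
}
```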
```diff
@@ -88,7 +88,7 @@ func TestAssumePV(t *testing.T) {
 
 		// Add oldPV to cache
 		internal_cache.add(scenario.oldPV)
-		if err := getPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil {
+		if err := verifyPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil {
 			t.Errorf("Failed to GetPV() after initial update: %v", err)
 			continue
 		}
@@ -107,7 +107,7 @@ func TestAssumePV(t *testing.T) {
 		if !scenario.shouldSucceed {
 			expectedPV = scenario.oldPV
 		}
-		if err := getPV(cache, scenario.oldPV.Name, expectedPV); err != nil {
+		if err := verifyPV(cache, scenario.oldPV.Name, expectedPV); err != nil {
 			t.Errorf("Failed to GetPV() after initial update: %v", err)
 		}
 	}
@@ -128,13 +128,13 @@ func TestRestorePV(t *testing.T) {
 
 	// Add oldPV to cache
 	internal_cache.add(oldPV)
-	if err := getPV(cache, oldPV.Name, oldPV); err != nil {
+	if err := verifyPV(cache, oldPV.Name, oldPV); err != nil {
 		t.Fatalf("Failed to GetPV() after initial update: %v", err)
 	}
 
 	// Restore PV
 	cache.Restore(oldPV.Name)
-	if err := getPV(cache, oldPV.Name, oldPV); err != nil {
+	if err := verifyPV(cache, oldPV.Name, oldPV); err != nil {
 		t.Fatalf("Failed to GetPV() after iniital restore: %v", err)
 	}
 
@@ -142,13 +142,13 @@ func TestRestorePV(t *testing.T) {
 	if err := cache.Assume(newPV); err != nil {
 		t.Fatalf("Assume() returned error %v", err)
 	}
-	if err := getPV(cache, oldPV.Name, newPV); err != nil {
+	if err := verifyPV(cache, oldPV.Name, newPV); err != nil {
 		t.Fatalf("Failed to GetPV() after Assume: %v", err)
 	}
 
 	// Restore PV
 	cache.Restore(oldPV.Name)
-	if err := getPV(cache, oldPV.Name, oldPV); err != nil {
+	if err := verifyPV(cache, oldPV.Name, oldPV); err != nil {
 		t.Fatalf("Failed to GetPV() after restore: %v", err)
 	}
 }
@@ -243,6 +243,39 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
 	verifyListPVs(t, cache, pvs2, "class2")
 }
 
+func TestAssumeUpdatePVCache(t *testing.T) {
+	cache := NewPVAssumeCache(nil)
+	internal_cache, ok := cache.(*pvAssumeCache)
+	if !ok {
+		t.Fatalf("Failed to get internal cache")
+	}
+
+	pvName := "test-pv0"
+
+	// Add a PV
+	pv := makePV(pvName, "1", "")
+	internal_cache.add(pv)
+	if err := verifyPV(cache, pvName, pv); err != nil {
+		t.Fatalf("failed to get PV: %v", err)
+	}
+
+	// Assume PV
+	newPV := pv.DeepCopy()
+	newPV.Spec.ClaimRef = &v1.ObjectReference{Name: "test-claim"}
+	if err := cache.Assume(newPV); err != nil {
+		t.Fatalf("failed to assume PV: %v", err)
+	}
+	if err := verifyPV(cache, pvName, newPV); err != nil {
+		t.Fatalf("failed to get PV after assume: %v", err)
+	}
+
+	// Add old PV
+	internal_cache.add(pv)
+	if err := verifyPV(cache, pvName, newPV); err != nil {
+		t.Fatalf("failed to get PV after old PV added: %v", err)
+	}
+}
+
 func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) {
 	pvList := cache.ListPVs(storageClassName)
 	if len(pvList) != len(expectedPVs) {
@@ -259,7 +292,7 @@ func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1
 	}
 }
 
-func getPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error {
+func verifyPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error {
 	pv, err := cache.GetPV(name)
 	if err != nil {
 		return err
```
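TestAssumeUpdatePVCache above builds its fixture with makePV(pvName, "1", ""), a helper that is not part of this diff. A hypothetical sketch of such a helper, assuming its arguments are name, ResourceVersion, and storage class; the ResourceVersion ("1") is what the new add() check compares against the assumed copy:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// makePV is a hypothetical test helper; the real one lives next to the test
// and may set additional fields. The important part for TestAssumeUpdatePVCache
// is that the PV carries a ResourceVersion for the assume cache to compare.
func makePV(name, version, storageClass string) *corev1.PersistentVolume {
	return &corev1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{
			Name:            name,
			ResourceVersion: version,
		},
		Spec: corev1.PersistentVolumeSpec{
			StorageClassName: storageClass,
		},
	}
}

func main() {
	pv := makePV("test-pv0", "1", "")
	fmt.Println(pv.Name, pv.ResourceVersion)
}
```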
```diff
@@ -174,7 +174,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
 	glog.V(4).Infof("AssumePodVolumes for pod %q, node %q", podName, nodeName)
 
 	if allBound := b.arePodVolumesBound(assumedPod); allBound {
-		glog.V(4).Infof("AssumePodVolumes: all PVCs bound and nothing to do")
+		glog.V(4).Infof("AssumePodVolumes for pod %q, node %q: all PVCs bound and nothing to do", podName, nodeName)
 		return true, false, nil
 	}
 
@@ -184,7 +184,8 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
 
 	for _, binding := range claimsToBind {
 		newPV, dirty, err := b.ctrl.getBindVolumeToClaim(binding.pv, binding.pvc)
-		glog.V(5).Infof("AssumePodVolumes: getBindVolumeToClaim for PV %q, PVC %q. newPV %p, dirty %v, err: %v",
+		glog.V(5).Infof("AssumePodVolumes: getBindVolumeToClaim for pod %q, PV %q, PVC %q. newPV %p, dirty %v, err: %v",
+			podName,
 			binding.pv.Name,
 			binding.pvc.Name,
 			newPV,
@@ -208,7 +209,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
 	if len(newBindings) == 0 {
 		// Don't update cached bindings if no API updates are needed. This can happen if we
 		// previously updated the PV object and are waiting for the PV controller to finish binding.
-		glog.V(4).Infof("AssumePodVolumes: PVs already assumed")
+		glog.V(4).Infof("AssumePodVolumes for pod %q, node %q: PVs already assumed", podName, nodeName)
 		return false, false, nil
 	}
 	b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings)
@@ -218,13 +219,15 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
 
 // BindPodVolumes gets the cached bindings in podBindingCache and makes the API update for those PVs.
 func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
-	glog.V(4).Infof("BindPodVolumes for pod %q", getPodName(assumedPod))
+	podName := getPodName(assumedPod)
+	glog.V(4).Infof("BindPodVolumes for pod %q", podName)
 
 	bindings := b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName)
 
 	// Do the actual prebinding. Let the PV controller take care of the rest
 	// There is no API rollback if the actual binding fails
 	for i, bindingInfo := range bindings {
+		glog.V(5).Infof("BindPodVolumes: Pod %q, binding PV %q to PVC %q", podName, bindingInfo.pv.Name, bindingInfo.pvc.Name)
 		_, err := b.ctrl.updateBindVolumeToClaim(bindingInfo.pv, bindingInfo.pvc, false)
 		if err != nil {
 			// only revert assumed cached updates for volumes we haven't successfully bound
@@ -335,11 +338,13 @@ func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node
 		glog.V(5).Infof("PersistentVolume %q, Node %q matches for Pod %q", pvName, node.Name, podName)
 	}
 
-	glog.V(4).Infof("All volumes for Pod %q match with Node %q", podName, node.Name)
+	glog.V(4).Infof("All bound volumes for Pod %q match with Node %q", podName, node.Name)
 	return true, nil
 }
 
 func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingInfo, node *v1.Node) (foundMatches bool, err error) {
+	podName := getPodName(pod)
+
 	// Sort all the claims by increasing size request to get the smallest fits
 	sort.Sort(byPVCSize(claimsToBind))
 
@@ -360,17 +365,18 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingI
 			return false, err
 		}
 		if bindingInfo.pv == nil {
-			glog.V(4).Infof("No matching volumes for PVC %q on node %q", getPVCName(bindingInfo.pvc), node.Name)
+			glog.V(4).Infof("No matching volumes for Pod %q, PVC %q on node %q", podName, getPVCName(bindingInfo.pvc), node.Name)
 			return false, nil
 		}
 
 		// matching PV needs to be excluded so we don't select it again
 		chosenPVs[bindingInfo.pv.Name] = bindingInfo.pv
+		glog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", bindingInfo.pv.Name, getPVCName(bindingInfo.pvc), node.Name, podName)
 	}
 
 	// Mark cache with all the matches for each PVC for this node
 	b.podBindingCache.UpdateBindings(pod, node.Name, claimsToBind)
-	glog.V(4).Infof("Found matching volumes on node %q", node.Name)
+	glog.V(4).Infof("Found matching volumes for pod %q on node %q", podName, node.Name)
 
 	return true, nil
 }
```
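The binder changes above only touch log messages, adding the pod name so a single pod can be traced through AssumePodVolumes, BindPodVolumes, and volume matching at higher verbosity. As a reminder of how those glog levels behave, a standalone sketch (not scheduler code; the pod key is made up): V(4) and V(5) lines are emitted only when the process runs with a high enough -v value.

```go
package main

import (
	"flag"

	"github.com/golang/glog"
)

func main() {
	// Equivalent to starting the binary with -logtostderr=true -v=5.
	flag.Set("logtostderr", "true")
	flag.Set("v", "5")
	flag.Parse()

	podName := "default/mypod" // hypothetical pod key, for illustration only
	glog.V(4).Infof("AssumePodVolumes for pod %q", podName)                // printed at -v >= 4
	glog.V(5).Infof("BindPodVolumes: binding volumes for pod %q", podName) // printed at -v >= 5
	glog.Flush()
}
```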
```diff
@@ -51,6 +51,7 @@ var (
 	pvNoNode = makeTestPV("pv-no-node", "", "1G", "1", nil, waitClass)
 	pvNode1a = makeTestPV("pv-node1a", "node1", "5G", "1", nil, waitClass)
 	pvNode1b = makeTestPV("pv-node1b", "node1", "10G", "1", nil, waitClass)
+	pvNode1c = makeTestPV("pv-node1b", "node1", "5G", "1", nil, waitClass)
 	pvNode2 = makeTestPV("pv-node2", "node2", "1G", "1", nil, waitClass)
 	pvPrebound = makeTestPV("pv-prebound", "node1", "1G", "1", unboundPVC, waitClass)
 	pvBound = makeTestPV("pv-bound", "node1", "1G", "1", boundPVC, waitClass)
@@ -190,6 +191,11 @@ func (env *testEnv) validatePodCache(t *testing.T, name, node string, pod *v1.Po
 	}
 }
 
+func (env *testEnv) getPodBindings(t *testing.T, name, node string, pod *v1.Pod) []*bindingInfo {
+	cache := env.internalBinder.podBindingCache
+	return cache.GetBindings(pod, node)
+}
+
 func (env *testEnv) validateAssume(t *testing.T, name string, pod *v1.Pod, bindings []*bindingInfo) {
 	// TODO: Check binding cache
 
@@ -734,3 +740,69 @@ func TestBindPodVolumes(t *testing.T) {
 		testEnv.validateBind(t, name, pod, scenario.expectedPVs, scenario.expectedAPIPVs)
 	}
 }
+
+func TestFindAssumeVolumes(t *testing.T) {
+	// Set feature gate
+	utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true")
+	defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false")
+
+	// Test case
+	podPVCs := []*v1.PersistentVolumeClaim{unboundPVC}
+	pvs := []*v1.PersistentVolume{pvNode2, pvNode1a, pvNode1c}
+
+	// Setup
+	testEnv := newTestBinder(t)
+	testEnv.initVolumes(pvs, pvs)
+	testEnv.initClaims(t, podPVCs)
+	pod := makePod(podPVCs)
+
+	testNode := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "node1",
+			Labels: map[string]string{
+				nodeLabelKey: "node1",
+			},
+		},
+	}
+
+	// Execute
+	// 1. Find matching PVs
+	unboundSatisfied, _, err := testEnv.binder.FindPodVolumes(pod, testNode)
+	if err != nil {
+		t.Errorf("Test failed: FindPodVolumes returned error: %v", err)
+	}
+	if !unboundSatisfied {
+		t.Errorf("Test failed: couldn't find PVs for all PVCs")
+	}
+	expectedBindings := testEnv.getPodBindings(t, "before-assume", testNode.Name, pod)
+
+	// 2. Assume matches
+	allBound, bindingRequired, err := testEnv.binder.AssumePodVolumes(pod, testNode.Name)
+	if err != nil {
+		t.Errorf("Test failed: AssumePodVolumes returned error: %v", err)
+	}
+	if allBound {
+		t.Errorf("Test failed: detected unbound volumes as bound")
+	}
+	if !bindingRequired {
+		t.Errorf("Test failed: binding not required")
+	}
+	testEnv.validateAssume(t, "assume", pod, expectedBindings)
+	// After assume, claimref should be set on pv
+	expectedBindings = testEnv.getPodBindings(t, "after-assume", testNode.Name, pod)
+
+	// 3. Find matching PVs again
+	// This should always return the original chosen pv
+	// Run this many times in case sorting returns different orders for the two PVs.
+	t.Logf("Testing FindPodVolumes after Assume")
+	for i := 0; i < 50; i++ {
+		unboundSatisfied, _, err := testEnv.binder.FindPodVolumes(pod, testNode)
+		if err != nil {
+			t.Errorf("Test failed: FindPodVolumes returned error: %v", err)
+		}
+		if !unboundSatisfied {
+			t.Errorf("Test failed: couldn't find PVs for all PVCs")
+		}
+		testEnv.validatePodCache(t, "after-assume", testNode.Name, pod, expectedBindings)
+	}
+}
```
```diff
@@ -241,8 +241,6 @@ func TestVolumeBinding(t *testing.T) {
 		validatePVPhase(t, config.client, pv.name, v1.VolumeAvailable)
 	}
 
-	// TODO: validate events on Pods and PVCs
-
 	// Force delete objects, but they still may not be immediately removed
 	deleteTestObjects(config.client, config.ns, deleteOption)
 }
@@ -288,7 +286,9 @@ func TestVolumeBindingStress(t *testing.T) {
 
 	// Validate Pods scheduled
 	for _, pod := range pods {
-		if err := waitForPodToSchedule(config.client, pod); err != nil {
+		// Use increased timeout for stress test because there is a higher chance of
+		// PV sync error
+		if err := waitForPodToScheduleWithTimeout(config.client, pod, 60*time.Second); err != nil {
 			t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err)
 		}
 	}
@@ -300,8 +300,6 @@ func TestVolumeBindingStress(t *testing.T) {
 	for _, pv := range pvs {
 		validatePVPhase(t, config.client, pv.Name, v1.VolumeBound)
 	}
-
-	// TODO: validate events on Pods and PVCs
 }
 
 func TestPVAffinityConflict(t *testing.T) {
```
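waitForPodToScheduleWithTimeout is an existing integration-test helper whose body is not in this diff; the change only gives the stress test a longer 60-second timeout. A hypothetical sketch of such a helper, assuming the context-free client-go Get signature of that era and polling for the PodScheduled condition:

```go
package main

import (
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

// waitForPodToScheduleWithTimeout is a hypothetical reimplementation: poll the
// pod once per second until its PodScheduled condition is True, or give up
// when the timeout expires.
func waitForPodToScheduleWithTimeout(client kubernetes.Interface, pod *corev1.Pod, timeout time.Duration) error {
	return wait.Poll(time.Second, timeout, func() (bool, error) {
		p, err := client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		for _, cond := range p.Status.Conditions {
			if cond.Type == corev1.PodScheduled && cond.Status == corev1.ConditionTrue {
				return true, nil
			}
		}
		return false, nil
	})
}

func main() {
	// Requires a running cluster and a real clientset to exercise; shown here
	// only so the sketch compiles as a standalone file.
}
```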