diff --git a/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go b/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go
index c5217be0e7..3b2352beac 100644
--- a/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go
+++ b/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go
@@ -158,8 +158,30 @@ func (c *assumeCache) add(obj interface{}) {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
 
+	if objInfo, _ := c.getObjInfo(name); objInfo != nil {
+		newVersion, err := c.getObjVersion(name, obj)
+		if err != nil {
+			glog.Errorf("add: couldn't get object version: %v", err)
+			return
+		}
+
+		storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
+		if err != nil {
+			glog.Errorf("add: couldn't get stored object version: %v", err)
+			return
+		}
+
+		// Only update object if version is newer.
+		// This is so we don't override assumed objects due to informer resync.
+		if newVersion <= storedVersion {
+			glog.V(10).Infof("Skip adding %v %v to assume cache because version %v is not newer than %v", c.description, name, newVersion, storedVersion)
+			return
+		}
+	}
+
 	objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj}
 	c.store.Update(objInfo)
+	glog.V(10).Infof("Adding %v %v to assume cache: %+v ", c.description, name, obj)
 }
 
 func (c *assumeCache) update(oldObj interface{}, newObj interface{}) {
diff --git a/pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go b/pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go
index 2664e4c41e..467daffe74 100644
--- a/pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go
+++ b/pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go
@@ -88,7 +88,7 @@ func TestAssumePV(t *testing.T) {
 
 		// Add oldPV to cache
 		internal_cache.add(scenario.oldPV)
-		if err := getPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil {
+		if err := verifyPV(cache, scenario.oldPV.Name, scenario.oldPV); err != nil {
 			t.Errorf("Failed to GetPV() after initial update: %v", err)
 			continue
 		}
@@ -107,7 +107,7 @@
 		if !scenario.shouldSucceed {
 			expectedPV = scenario.oldPV
 		}
-		if err := getPV(cache, scenario.oldPV.Name, expectedPV); err != nil {
+		if err := verifyPV(cache, scenario.oldPV.Name, expectedPV); err != nil {
 			t.Errorf("Failed to GetPV() after initial update: %v", err)
 		}
 	}
@@ -128,13 +128,13 @@
 
 	// Add oldPV to cache
 	internal_cache.add(oldPV)
-	if err := getPV(cache, oldPV.Name, oldPV); err != nil {
+	if err := verifyPV(cache, oldPV.Name, oldPV); err != nil {
 		t.Fatalf("Failed to GetPV() after initial update: %v", err)
 	}
 
 	// Restore PV
 	cache.Restore(oldPV.Name)
-	if err := getPV(cache, oldPV.Name, oldPV); err != nil {
+	if err := verifyPV(cache, oldPV.Name, oldPV); err != nil {
 		t.Fatalf("Failed to GetPV() after iniital restore: %v", err)
 	}
 
@@ -142,13 +142,13 @@
 	if err := cache.Assume(newPV); err != nil {
 		t.Fatalf("Assume() returned error %v", err)
 	}
-	if err := getPV(cache, oldPV.Name, newPV); err != nil {
+	if err := verifyPV(cache, oldPV.Name, newPV); err != nil {
 		t.Fatalf("Failed to GetPV() after Assume: %v", err)
 	}
 
 	// Restore PV
 	cache.Restore(oldPV.Name)
-	if err := getPV(cache, oldPV.Name, oldPV); err != nil {
+	if err := verifyPV(cache, oldPV.Name, oldPV); err != nil {
 		t.Fatalf("Failed to GetPV() after restore: %v", err)
 	}
 }
@@ -243,6 +243,39 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
 	verifyListPVs(t, cache, pvs2, "class2")
 }
 
+func TestAssumeUpdatePVCache(t *testing.T) {
+	cache := NewPVAssumeCache(nil)
+	internal_cache, ok := cache.(*pvAssumeCache)
+	if !ok {
+		t.Fatalf("Failed to get internal cache")
+	}
+
+	pvName := "test-pv0"
+
+	// Add a PV
+	pv := makePV(pvName, "1", "")
+	internal_cache.add(pv)
+	if err := verifyPV(cache, pvName, pv); err != nil {
+		t.Fatalf("failed to get PV: %v", err)
+	}
+
+	// Assume PV
+	newPV := pv.DeepCopy()
+	newPV.Spec.ClaimRef = &v1.ObjectReference{Name: "test-claim"}
+	if err := cache.Assume(newPV); err != nil {
+		t.Fatalf("failed to assume PV: %v", err)
+	}
+	if err := verifyPV(cache, pvName, newPV); err != nil {
+		t.Fatalf("failed to get PV after assume: %v", err)
+	}
+
+	// Add old PV
+	internal_cache.add(pv)
+	if err := verifyPV(cache, pvName, newPV); err != nil {
+		t.Fatalf("failed to get PV after old PV added: %v", err)
+	}
+}
+
 func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) {
 	pvList := cache.ListPVs(storageClassName)
 	if len(pvList) != len(expectedPVs) {
@@ -259,7 +292,7 @@ func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1
 	}
 }
 
-func getPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error {
+func verifyPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error {
 	pv, err := cache.GetPV(name)
 	if err != nil {
 		return err
diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder.go b/pkg/controller/volume/persistentvolume/scheduler_binder.go
index b342e7f462..ad4cf139c8 100644
--- a/pkg/controller/volume/persistentvolume/scheduler_binder.go
+++ b/pkg/controller/volume/persistentvolume/scheduler_binder.go
@@ -174,7 +174,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
 	glog.V(4).Infof("AssumePodVolumes for pod %q, node %q", podName, nodeName)
 
 	if allBound := b.arePodVolumesBound(assumedPod); allBound {
-		glog.V(4).Infof("AssumePodVolumes: all PVCs bound and nothing to do")
+		glog.V(4).Infof("AssumePodVolumes for pod %q, node %q: all PVCs bound and nothing to do", podName, nodeName)
 		return true, false, nil
 	}
 
@@ -184,7 +184,8 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
 
 	for _, binding := range claimsToBind {
 		newPV, dirty, err := b.ctrl.getBindVolumeToClaim(binding.pv, binding.pvc)
-		glog.V(5).Infof("AssumePodVolumes: getBindVolumeToClaim for PV %q, PVC %q. newPV %p, dirty %v, err: %v",
+		glog.V(5).Infof("AssumePodVolumes: getBindVolumeToClaim for pod %q, PV %q, PVC %q. newPV %p, dirty %v, err: %v",
+			podName,
 			binding.pv.Name,
 			binding.pvc.Name,
 			newPV,
@@ -208,7 +209,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
 	if len(newBindings) == 0 {
 		// Don't update cached bindings if no API updates are needed. This can happen if we
 		// previously updated the PV object and are waiting for the PV controller to finish binding.
-		glog.V(4).Infof("AssumePodVolumes: PVs already assumed")
+		glog.V(4).Infof("AssumePodVolumes for pod %q, node %q: PVs already assumed", podName, nodeName)
 		return false, false, nil
 	}
 	b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings)
@@ -218,13 +219,15 @@
 
 // BindPodVolumes gets the cached bindings in podBindingCache and makes the API update for those PVs.
 func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
-	glog.V(4).Infof("BindPodVolumes for pod %q", getPodName(assumedPod))
+	podName := getPodName(assumedPod)
+	glog.V(4).Infof("BindPodVolumes for pod %q", podName)
 
 	bindings := b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName)
 
 	// Do the actual prebinding. Let the PV controller take care of the rest
 	// There is no API rollback if the actual binding fails
 	for i, bindingInfo := range bindings {
+		glog.V(5).Infof("BindPodVolumes: Pod %q, binding PV %q to PVC %q", podName, bindingInfo.pv.Name, bindingInfo.pvc.Name)
 		_, err := b.ctrl.updateBindVolumeToClaim(bindingInfo.pv, bindingInfo.pvc, false)
 		if err != nil {
 			// only revert assumed cached updates for volumes we haven't successfully bound
@@ -335,11 +338,13 @@ func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node
 		glog.V(5).Infof("PersistentVolume %q, Node %q matches for Pod %q", pvName, node.Name, podName)
 	}
 
-	glog.V(4).Infof("All volumes for Pod %q match with Node %q", podName, node.Name)
+	glog.V(4).Infof("All bound volumes for Pod %q match with Node %q", podName, node.Name)
 	return true, nil
 }
 
 func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingInfo, node *v1.Node) (foundMatches bool, err error) {
+	podName := getPodName(pod)
+
 	// Sort all the claims by increasing size request to get the smallest fits
 	sort.Sort(byPVCSize(claimsToBind))
 
@@ -360,17 +365,18 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingI
 			return false, err
 		}
 		if bindingInfo.pv == nil {
-			glog.V(4).Infof("No matching volumes for PVC %q on node %q", getPVCName(bindingInfo.pvc), node.Name)
+			glog.V(4).Infof("No matching volumes for Pod %q, PVC %q on node %q", podName, getPVCName(bindingInfo.pvc), node.Name)
 			return false, nil
 		}
 
 		// matching PV needs to be excluded so we don't select it again
 		chosenPVs[bindingInfo.pv.Name] = bindingInfo.pv
+		glog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", bindingInfo.pv.Name, getPVCName(bindingInfo.pvc), node.Name, podName)
 	}
 
 	// Mark cache with all the matches for each PVC for this node
 	b.podBindingCache.UpdateBindings(pod, node.Name, claimsToBind)
-	glog.V(4).Infof("Found matching volumes on node %q", node.Name)
+	glog.V(4).Infof("Found matching volumes for pod %q on node %q", podName, node.Name)
 
 	return true, nil
 }
diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_test.go b/pkg/controller/volume/persistentvolume/scheduler_binder_test.go
index df27dda276..ad8d2efa96 100644
--- a/pkg/controller/volume/persistentvolume/scheduler_binder_test.go
+++ b/pkg/controller/volume/persistentvolume/scheduler_binder_test.go
@@ -51,6 +51,7 @@ var (
 	pvNoNode    = makeTestPV("pv-no-node", "", "1G", "1", nil, waitClass)
 	pvNode1a    = makeTestPV("pv-node1a", "node1", "5G", "1", nil, waitClass)
 	pvNode1b    = makeTestPV("pv-node1b", "node1", "10G", "1", nil, waitClass)
+	pvNode1c    = makeTestPV("pv-node1b", "node1", "5G", "1", nil, waitClass)
 	pvNode2     = makeTestPV("pv-node2", "node2", "1G", "1", nil, waitClass)
 	pvPrebound  = makeTestPV("pv-prebound", "node1", "1G", "1", unboundPVC, waitClass)
 	pvBound     = makeTestPV("pv-bound", "node1", "1G", "1", boundPVC, waitClass)
@@ -190,6 +191,11 @@ func (env *testEnv) validatePodCache(t *testing.T, name, node string, pod *v1.Po
 	}
 }
 
+func (env *testEnv) getPodBindings(t *testing.T, name, node string, pod *v1.Pod) []*bindingInfo {
+	cache := env.internalBinder.podBindingCache
+	return cache.GetBindings(pod, node)
+}
+
 func (env *testEnv) validateAssume(t *testing.T, name string, pod *v1.Pod, bindings []*bindingInfo) {
 	// TODO: Check binding cache
 
@@ -734,3 +740,69 @@ func TestBindPodVolumes(t *testing.T) {
 		testEnv.validateBind(t, name, pod, scenario.expectedPVs, scenario.expectedAPIPVs)
 	}
 }
+
+func TestFindAssumeVolumes(t *testing.T) {
+	// Set feature gate
+	utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true")
+	defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false")
+
+	// Test case
+	podPVCs := []*v1.PersistentVolumeClaim{unboundPVC}
+	pvs := []*v1.PersistentVolume{pvNode2, pvNode1a, pvNode1c}
+
+	// Setup
+	testEnv := newTestBinder(t)
+	testEnv.initVolumes(pvs, pvs)
+	testEnv.initClaims(t, podPVCs)
+	pod := makePod(podPVCs)
+
+	testNode := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "node1",
+			Labels: map[string]string{
+				nodeLabelKey: "node1",
+			},
+		},
+	}
+
+	// Execute
+	// 1. Find matching PVs
+	unboundSatisfied, _, err := testEnv.binder.FindPodVolumes(pod, testNode)
+	if err != nil {
+		t.Errorf("Test failed: FindPodVolumes returned error: %v", err)
+	}
+	if !unboundSatisfied {
+		t.Errorf("Test failed: couldn't find PVs for all PVCs")
+	}
+	expectedBindings := testEnv.getPodBindings(t, "before-assume", testNode.Name, pod)
+
+	// 2. Assume matches
+	allBound, bindingRequired, err := testEnv.binder.AssumePodVolumes(pod, testNode.Name)
+	if err != nil {
+		t.Errorf("Test failed: AssumePodVolumes returned error: %v", err)
+	}
+	if allBound {
+		t.Errorf("Test failed: detected unbound volumes as bound")
+	}
+	if !bindingRequired {
+		t.Errorf("Test failed: binding not required")
+	}
+	testEnv.validateAssume(t, "assume", pod, expectedBindings)
+	// After assume, claimref should be set on pv
+	expectedBindings = testEnv.getPodBindings(t, "after-assume", testNode.Name, pod)
+
+	// 3. Find matching PVs again
+	// This should always return the original chosen pv
+	// Run this many times in case sorting returns different orders for the two PVs.
+ t.Logf("Testing FindPodVolumes after Assume") + for i := 0; i < 50; i++ { + unboundSatisfied, _, err := testEnv.binder.FindPodVolumes(pod, testNode) + if err != nil { + t.Errorf("Test failed: FindPodVolumes returned error: %v", err) + } + if !unboundSatisfied { + t.Errorf("Test failed: couldn't find PVs for all PVCs") + } + testEnv.validatePodCache(t, "after-assume", testNode.Name, pod, expectedBindings) + } +} diff --git a/test/integration/scheduler/volume_binding_test.go b/test/integration/scheduler/volume_binding_test.go index 99f8bd7106..9f3e484855 100644 --- a/test/integration/scheduler/volume_binding_test.go +++ b/test/integration/scheduler/volume_binding_test.go @@ -241,8 +241,6 @@ func TestVolumeBinding(t *testing.T) { validatePVPhase(t, config.client, pv.name, v1.VolumeAvailable) } - // TODO: validate events on Pods and PVCs - // Force delete objects, but they still may not be immediately removed deleteTestObjects(config.client, config.ns, deleteOption) } @@ -288,7 +286,9 @@ func TestVolumeBindingStress(t *testing.T) { // Validate Pods scheduled for _, pod := range pods { - if err := waitForPodToSchedule(config.client, pod); err != nil { + // Use increased timeout for stress test because there is a higher chance of + // PV sync error + if err := waitForPodToScheduleWithTimeout(config.client, pod, 60*time.Second); err != nil { t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err) } } @@ -300,8 +300,6 @@ func TestVolumeBindingStress(t *testing.T) { for _, pv := range pvs { validatePVPhase(t, config.client, pv.Name, v1.VolumeBound) } - - // TODO: validate events on Pods and PVCs } func TestPVAffinityConflict(t *testing.T) {