Merge pull request #63232 from lichuqiang/provision_plumbing

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Volume topology aware dynamic provisioning: basic plumbing

**What this PR does / why we need it**:

Split out of PR https://github.com/kubernetes/kubernetes/pull/63193 for easier review.
Part 1: basic scheduler and controller plumbing.

Next: https://github.com/kubernetes/kubernetes/pull/63233

**Which issue(s) this PR fixes** 
Feature: https://github.com/kubernetes/features/issues/561
Design: https://github.com/kubernetes/community/issues/2168

**Special notes for your reviewer**:
/sig storage
/sig scheduling
/assign @msau42 @jsafrane @saad-ali @bsalamat


**Release note**:

```release-note
Basic plumbing for volume topology aware dynamic provisioning
```
Kubernetes Submit Queue 2018-05-25 07:58:53 -07:00 committed by GitHub
commit a8cf18c0ae
10 changed files with 919 additions and 139 deletions

View File

@ -135,6 +135,14 @@ const annDynamicallyProvisioned = "pv.kubernetes.io/provisioned-by"
// a volume for this PVC.
const annStorageProvisioner = "volume.beta.kubernetes.io/storage-provisioner"
// This annotation is added to a PVC that has been triggered by the scheduler to
// be dynamically provisioned. Its value is the name of the selected node.
const annSelectedNode = "volume.alpha.kubernetes.io/selected-node"
// If the provisioner name in a storage class is set to "kubernetes.io/no-provisioner",
// then the storage class does not support dynamic provisioning.
const notSupportedProvisioner = "kubernetes.io/no-provisioner"
// CloudVolumeCreatedForClaimNamespaceTag is a name of a tag attached to a real volume in cloud (e.g. AWS EBS or GCE PD)
// with namespace of a persistent volume claim used to create this volume.
const CloudVolumeCreatedForClaimNamespaceTag = "kubernetes.io/created-for/pvc/namespace"
@ -277,6 +285,16 @@ func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentV
return false, nil
}
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicProvisioningScheduling) {
// When the DynamicProvisioningScheduling feature is enabled, the
// scheduler signals the PV controller to start dynamic provisioning
// by setting the annSelectedNode annotation on the PVC
if _, ok := claim.Annotations[annSelectedNode]; ok {
return false, nil
}
}
className := v1helper.GetPersistentVolumeClaimClass(claim)
if className == "" {
return false, nil
@ -291,8 +309,6 @@ func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentV
return false, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", className)
}
return *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer, nil
}
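For reviewers new to delayed binding: a PVC is delayed only when its StorageClass uses the `WaitForFirstConsumer` mode, and (with the new gate) only until the scheduler stamps a node on it. A minimal sketch of such a class, built with the storage/v1 API types (the class name and provisioner below are made up for illustration):

```go
package main

import (
	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// waitClass builds a StorageClass whose PVCs shouldDelayBinding treats as
// delayed: binding and provisioning wait until a pod using the PVC is
// scheduled, at which point the scheduler sets annSelectedNode.
func waitClass() *storagev1.StorageClass {
	mode := storagev1.VolumeBindingWaitForFirstConsumer
	return &storagev1.StorageClass{
		ObjectMeta:        metav1.ObjectMeta{Name: "topology-aware"}, // hypothetical name
		Provisioner:       "example.com/provisioner",                 // hypothetical provisioner
		VolumeBindingMode: &mode,
	}
}
```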
@ -320,7 +336,6 @@ func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVol
// OBSERVATION: pvc is "Pending", will retry
switch {
case delayBinding:
ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.WaitForFirstConsumer, "waiting for first consumer to be created before binding")
case v1helper.GetPersistentVolumeClaimClass(claim) != "":
if err = ctrl.provisionClaim(claim); err != nil {
@ -1420,9 +1435,16 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.Persis
}
opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")
// TODO: modify the Provision() interface to pass in the allowed topology information
// of the provisioned volume.
volume, err = provisioner.Provision()
opComplete(&err)
if err != nil {
// Other failure paths have nothing to do with DynamicProvisioningScheduling,
// so just let the controller retry in the next sync. rescheduleProvisioning
// is only called here, where the underlying provisioning actually failed.
ctrl.rescheduleProvisioning(claim)
strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err)
glog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
@ -1513,6 +1535,29 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.Persis
}
}
// rescheduleProvisioning signals back to the scheduler to retry dynamic provisioning
// by removing the annSelectedNode annotation
func (ctrl *PersistentVolumeController) rescheduleProvisioning(claim *v1.PersistentVolumeClaim) {
if _, ok := claim.Annotations[annSelectedNode]; !ok {
// Provisioning not triggered by the scheduler, skip
return
}
// The claim from the method args may point to the watcher cache. We must not
// modify it, therefore create a copy.
newClaim := claim.DeepCopy()
delete(newClaim.Annotations, annSelectedNode)
// Try to update the PVC object
if _, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(newClaim.Namespace).Update(newClaim); err != nil {
glog.V(4).Infof("Failed to delete annotation 'annSelectedNode' for PersistentVolumeClaim %q: %v", claimToClaimKey(newClaim), err)
return
}
if _, err := ctrl.storeClaimUpdate(newClaim); err != nil {
// We will get a "claim updated" event soon, this is not a big error
glog.V(4).Infof("Updating PersistentVolumeClaim %q: cannot update internal cache: %v", claimToClaimKey(newClaim), err)
}
}
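The annotation round trip is the core of the plumbing: the scheduler writes annSelectedNode on a copy of the claim, and the PV controller erases it when provisioning fails so the scheduler can pick another node. A self-contained sketch of the two halves (the helper names are invented for illustration):

```go
package main

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const annSelectedNode = "volume.alpha.kubernetes.io/selected-node"

// markSelectedNode is the scheduler-side half (compare AssumePodVolumes in
// this PR): annotate a copy of the claim with the chosen node.
func markSelectedNode(claim *v1.PersistentVolumeClaim, nodeName string) *v1.PersistentVolumeClaim {
	clone := claim.DeepCopy()
	metav1.SetMetaDataAnnotation(&clone.ObjectMeta, annSelectedNode, nodeName)
	return clone
}

// clearSelectedNode is the controller-side half (compare rescheduleProvisioning
// above): removing the annotation asks the scheduler to retry elsewhere.
func clearSelectedNode(claim *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim {
	clone := claim.DeepCopy()
	delete(clone.Annotations, annSelectedNode)
	return clone
}
```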
// getProvisionedVolumeNameForClaim returns PV.Name for the provisioned volume.
// The name must be unique.
func (ctrl *PersistentVolumeController) getProvisionedVolumeNameForClaim(claim *v1.PersistentVolumeClaim) string {

View File

@ -312,8 +312,8 @@ func TestDelayBinding(t *testing.T) {
}
}
// When volumeScheduling feature gate is disabled, should always be delayed
name := "volumeScheduling-feature-disabled"
shouldDelay, err := ctrl.shouldDelayBinding(makePVCClass(&classWaitMode))
if err != nil {
t.Errorf("Test %q returned error: %v", name, err)
@ -322,7 +322,7 @@ func TestDelayBinding(t *testing.T) {
t.Errorf("Test %q returned true, expected false", name)
}
// Enable volumeScheduling feature gate
utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true")
defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false")
@ -338,4 +338,43 @@ func TestDelayBinding(t *testing.T) {
t.Errorf("Test %q returned unexpected %v", name, test.shouldDelay)
}
}
// When dynamicProvisioningScheduling feature gate is disabled, binding should be delayed,
// even if the PVC has the selectedNode annotation.
provisionedClaim := makePVCClass(&classWaitMode)
provisionedClaim.Annotations = map[string]string{annSelectedNode: "node-name"}
name = "dynamicProvisioningScheduling-feature-disabled"
shouldDelay, err = ctrl.shouldDelayBinding(provisionedClaim)
if err != nil {
t.Errorf("Test %q returned error: %v", name, err)
}
if !shouldDelay {
t.Errorf("Test %q returned false, expected true", name)
}
// Enable DynamicProvisioningScheduling feature gate
utilfeature.DefaultFeatureGate.Set("DynamicProvisioningScheduling=true")
defer utilfeature.DefaultFeatureGate.Set("DynamicProvisioningScheduling=false")
// When the PVC does not have the selectedNode annotation, binding should be delayed,
// even if dynamicProvisioningScheduling feature gate is enabled.
name = "dynamicProvisioningScheduling-feature-enabled, selectedNode-annotation-not-set"
shouldDelay, err = ctrl.shouldDelayBinding(makePVCClass(&classWaitMode))
if err != nil {
t.Errorf("Test %q returned error: %v", name, err)
}
if !shouldDelay {
t.Errorf("Test %q returned false, expected true", name)
}
// Should not be delayed when dynamicProvisioningScheduling feature gate is enabled
// and the PVC has the selectedNode annotation.
name = "dynamicProvisioningScheduling-feature-enabled, selectedNode-annotation-set"
shouldDelay, err = ctrl.shouldDelayBinding(provisionedClaim)
if err != nil {
t.Errorf("Test %q returned error: %v", name, err)
}
if shouldDelay {
t.Errorf("Test %q returned true, expected false", name)
}
}

View File

@ -371,3 +371,34 @@ func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume
}
return pvs
}
// PVCAssumeCache is an AssumeCache for PersistentVolumeClaim objects
type PVCAssumeCache interface {
AssumeCache
// GetPVC returns the PVC from the cache with the given namespace and name.
// pvcKey is the result of MetaNamespaceKeyFunc on the PVC object
GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
}
type pvcAssumeCache struct {
*assumeCache
}
func NewPVCAssumeCache(informer cache.SharedIndexInformer) PVCAssumeCache {
return &pvcAssumeCache{assumeCache: NewAssumeCache(informer, "v1.PersistentVolumeClaim", "namespace", cache.MetaNamespaceIndexFunc)}
}
func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
obj, err := c.Get(pvcKey)
if err != nil {
return nil, err
}
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
}
return pvc, nil
}
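As with the existing PV assume cache, the intended pattern is optimistic: Assume publishes an updated copy ahead of the API write, and Restore falls back to the informer's version if that write never happens. A sketch of the lifecycle, written as if inside this package (it relies on the package's getPVCName, annSelectedNode, and metav1 import; the helper and node name are made up):

```go
// assumeSelectedNode stages a provisioning decision in the cache and rolls
// it back if the subsequent API update does not succeed.
func assumeSelectedNode(cache PVCAssumeCache, pvc *v1.PersistentVolumeClaim, apiUpdateSucceeded bool) error {
	clone := pvc.DeepCopy() // never mutate informer-owned objects
	metav1.SetMetaDataAnnotation(&clone.ObjectMeta, annSelectedNode, "node-1") // hypothetical choice

	// Publish the optimistic copy so later GetPVC calls see the decision.
	if err := cache.Assume(clone); err != nil {
		return err
	}
	if !apiUpdateSucceeded {
		// Drop the optimistic copy; reads revert to the informer state.
		cache.Restore(getPVCName(pvc))
	}
	return nil
}
```

Note that Assume compares resource versions, so a copy older than the informer's is rejected; that is exactly what the "tmpupdate-failed" scenario in the binder tests exercises.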

View File

@ -36,6 +36,33 @@ func makePV(name, version, storageClass string) *v1.PersistentVolume {
}
}
func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) {
pvList := cache.ListPVs(storageClassName)
if len(pvList) != len(expectedPVs) {
t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs))
}
for _, pv := range pvList {
expectedPV, ok := expectedPVs[pv.Name]
if !ok {
t.Errorf("ListPVs() returned unexpected PV %q", pv.Name)
}
if expectedPV != pv {
t.Errorf("ListPVs() returned PV %p, expected %p", pv, expectedPV)
}
}
}
func verifyPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) error {
pv, err := cache.GetPV(name)
if err != nil {
return err
}
if pv != expectedPV {
return fmt.Errorf("GetPV() returned %p, expected %p", pv, expectedPV)
}
return nil
}
func TestAssumePV(t *testing.T) {
scenarios := map[string]struct {
oldPV *v1.PersistentVolume
@ -276,29 +303,170 @@ func TestAssumeUpdatePVCache(t *testing.T) {
}
}
func makeClaim(name, version, namespace string) *v1.PersistentVolumeClaim {
return &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
ResourceVersion: version,
Annotations: map[string]string{},
},
}
}
func verifyPVC(cache PVCAssumeCache, pvcKey string, expectedPVC *v1.PersistentVolumeClaim) error {
pvc, err := cache.GetPVC(pvcKey)
if err != nil {
return err
}
if pvc != expectedPVC {
return fmt.Errorf("GetPVC() returned %p, expected %p", pvc, expectedPVC)
}
return nil
}
func TestAssumePVC(t *testing.T) {
scenarios := map[string]struct {
oldPVC *v1.PersistentVolumeClaim
newPVC *v1.PersistentVolumeClaim
shouldSucceed bool
}{
"success-same-version": {
oldPVC: makeClaim("pvc1", "5", "ns1"),
newPVC: makeClaim("pvc1", "5", "ns1"),
shouldSucceed: true,
},
"success-new-higher-version": {
oldPVC: makeClaim("pvc1", "5", "ns1"),
newPVC: makeClaim("pvc1", "6", "ns1"),
shouldSucceed: true,
},
"fail-old-not-found": {
oldPVC: makeClaim("pvc2", "5", "ns1"),
newPVC: makeClaim("pvc1", "5", "ns1"),
shouldSucceed: false,
},
"fail-new-lower-version": {
oldPVC: makeClaim("pvc1", "5", "ns1"),
newPVC: makeClaim("pvc1", "4", "ns1"),
shouldSucceed: false,
},
"fail-new-bad-version": {
oldPVC: makeClaim("pvc1", "5", "ns1"),
newPVC: makeClaim("pvc1", "a", "ns1"),
shouldSucceed: false,
},
"fail-old-bad-version": {
oldPVC: makeClaim("pvc1", "a", "ns1"),
newPVC: makeClaim("pvc1", "5", "ns1"),
shouldSucceed: false,
},
}
for name, scenario := range scenarios {
cache := NewPVCAssumeCache(nil)
internalCache, ok := cache.(*pvcAssumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
// Add oldPVC to cache
internalCache.add(scenario.oldPVC)
if err := verifyPVC(cache, getPVCName(scenario.oldPVC), scenario.oldPVC); err != nil {
t.Errorf("Failed to GetPVC() after initial update: %v", err)
continue
}
// Assume newPVC
err := cache.Assume(scenario.newPVC)
if scenario.shouldSucceed && err != nil {
t.Errorf("Test %q failed: Assume() returned error %v", name, err)
}
if !scenario.shouldSucceed && err == nil {
t.Errorf("Test %q failed: Assume() returned success but expected error", name)
}
// Check that GetPVC returns the correct PVC
expectedPVC := scenario.newPVC
if !scenario.shouldSucceed {
expectedPVC = scenario.oldPVC
}
if err := verifyPVC(cache, getPVCName(scenario.oldPVC), expectedPVC); err != nil {
t.Errorf("Failed to GetPVC() after Assume: %v", err)
}
}
}
func TestRestorePVC(t *testing.T) {
cache := NewPVCAssumeCache(nil)
internalCache, ok := cache.(*pvcAssumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
oldPVC := makeClaim("pvc1", "5", "ns1")
newPVC := makeClaim("pvc1", "5", "ns1")
// Restore PVC that doesn't exist
cache.Restore("nothing")
// Add oldPVC to cache
internalCache.add(oldPVC)
if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil {
t.Fatalf("Failed to GetPVC() after initial update: %v", err)
}
// Restore PVC
cache.Restore(getPVCName(oldPVC))
if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil {
t.Fatalf("Failed to GetPVC() after iniital restore: %v", err)
}
// Assume newPVC
if err := cache.Assume(newPVC); err != nil {
t.Fatalf("Assume() returned error %v", err)
}
if err := verifyPVC(cache, getPVCName(oldPVC), newPVC); err != nil {
t.Fatalf("Failed to GetPVC() after Assume: %v", err)
}
// Restore PVC
cache.Restore(getPVCName(oldPVC))
if err := verifyPVC(cache, getPVCName(oldPVC), oldPVC); err != nil {
t.Fatalf("Failed to GetPVC() after restore: %v", err)
}
}
func TestAssumeUpdatePVCCache(t *testing.T) {
cache := NewPVCAssumeCache(nil)
internalCache, ok := cache.(*pvcAssumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}
pvcName := "test-pvc0"
pvcNamespace := "test-ns"
// Add a PVC
pvc := makeClaim(pvcName, "1", pvcNamespace)
internalCache.add(pvc)
if err := verifyPVC(cache, getPVCName(pvc), pvc); err != nil {
t.Fatalf("failed to get PVC: %v", err)
}
// Assume PVC
newPVC := pvc.DeepCopy()
newPVC.Annotations["volume.alpha.kubernetes.io/selected-node"] = "test-node"
if err := cache.Assume(newPVC); err != nil {
t.Fatalf("failed to assume PVC: %v", err)
}
if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil {
t.Fatalf("failed to get PVC after assume: %v", err)
}
// Add old PVC
internalCache.add(pvc)
if err := verifyPVC(cache, getPVCName(pvc), newPVC); err != nil {
t.Fatalf("failed to get PVC after old PVC added: %v", err)
}
}

View File

@ -24,10 +24,12 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
@ -58,24 +60,30 @@ type SchedulerVolumeBinder interface {
// If a PVC is bound, it checks if the PV's NodeAffinity matches the Node.
// Otherwise, it tries to find an available PV to bind to the PVC.
//
// It returns true if all of the Pod's PVCs have matching PVs or can be dynamically provisioned,
// and returns true if bound volumes satisfy the PV NodeAffinity.
//
// This function is called by the volume binding scheduler predicate and can be called in parallel
FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisified, boundVolumesSatisfied bool, err error)
// AssumePodVolumes will:
// 1. Take the PV matches for unbound PVCs and update the PV cache assuming
// that the PV is prebound to the PVC.
// 2. Take the PVCs that need provisioning and update the PVC cache with related
// annotations set.
//
// It returns true if all volumes are fully bound, and returns true if any volume binding/provisioning
// API operation needs to be done afterwards.
//
// This function will modify assumedPod with the node name.
// This function is called serially.
AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound bool, bindingRequired bool, err error)
// BindPodVolumes will:
// 1. Initiate the volume binding by making the API call to prebind the PV
// to its matching PVC.
// 2. Trigger the volume provisioning by making the API call to set related
// annotations on the PVC
//
// This function can be called in parallel.
BindPodVolumes(assumedPod *v1.Pod) error
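Putting the three methods together, the intended call sequence from the scheduler side looks roughly like this (a sketch in terms of this interface; the wrapper function and its error policy are invented here):

```go
// bindPodVolumesOnNode sketches the predicate -> assume -> bind sequence a
// scheduler integration would drive through SchedulerVolumeBinder.
func bindPodVolumesOnNode(binder SchedulerVolumeBinder, pod *v1.Pod, node *v1.Node) error {
	// Predicate phase: can this node satisfy the pod's volumes?
	// May run in parallel across candidate nodes.
	unboundOK, boundOK, err := binder.FindPodVolumes(pod, node)
	if err != nil {
		return err
	}
	if !unboundOK || !boundOK {
		return fmt.Errorf("node %q cannot satisfy pod volumes", node.Name)
	}

	// Assume phase (serial): stage PV prebinds and PVC provisioning
	// annotations in the assume caches.
	allBound, apiNeeded, err := binder.AssumePodVolumes(pod, node.Name)
	if err != nil {
		return err
	}
	if allBound || !apiNeeded {
		return nil // nothing left to write to the API server
	}

	// Bind phase: issue the PV/PVC API updates; the PV controller
	// finishes binding and provisioning asynchronously.
	return binder.BindPodVolumes(pod)
}
```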
@ -87,8 +95,7 @@ type SchedulerVolumeBinder interface {
type volumeBinder struct {
ctrl *PersistentVolumeController
pvcCache PVCAssumeCache
pvCache PVAssumeCache
// Stores binding decisions that were made in FindPodVolumes for use in AssumePodVolumes.
@ -111,7 +118,7 @@ func NewVolumeBinder(
b := &volumeBinder{
ctrl: ctrl,
pvcCache: NewPVCAssumeCache(pvcInformer.Informer()),
pvCache: NewPVAssumeCache(pvInformer.Informer()),
podBindingCache: NewPodBindingCache(),
}
@ -123,7 +130,7 @@ func (b *volumeBinder) GetBindingsCache() PodBindingCache {
return b.podBindingCache
}
// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
podName := getPodName(pod)
@ -135,8 +142,8 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume
boundVolumesSatisfied = true
// The pod's volumes need to be processed in one call to avoid the race condition where
// volumes can get bound/provisioned in between calls.
boundClaims, claimsToBind, unboundClaimsImmediate, err := b.getPodVolumes(pod)
if err != nil {
return false, false, err
}
@ -154,20 +161,32 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume
}
}
// Find PVs for unbound volumes
if len(unboundClaims) > 0 {
unboundVolumesSatisfied, err = b.findMatchingVolumes(pod, unboundClaims, node)
if len(claimsToBind) > 0 {
var claimsToProvision []*v1.PersistentVolumeClaim
unboundVolumesSatisfied, claimsToProvision, err = b.findMatchingVolumes(pod, claimsToBind, node)
if err != nil {
return false, false, err
}
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicProvisioningScheduling) {
// Try to provision for unbound volumes
if !unboundVolumesSatisfied {
unboundVolumesSatisfied, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
if err != nil {
return false, false, err
}
}
}
}
return unboundVolumesSatisfied, boundVolumesSatisfied, nil
}
// AssumePodVolumes will take the cached matching PVs and PVCs to provision
// in podBindingCache for the chosen node, and:
// 1. Update the pvCache with the new prebound PV.
// 2. Update the pvcCache with the new PVCs with annotations set
// It will update podBindingCache again with the PVs and PVCs that need an API update.
func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound, bindingRequired bool, err error) {
podName := getPodName(assumedPod)
@ -179,6 +198,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
}
assumedPod.Spec.NodeName = nodeName
// Assume PV
claimsToBind := b.podBindingCache.GetBindings(assumedPod, nodeName)
newBindings := []*bindingInfo{}
@ -206,23 +226,48 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
}
}
// Don't update cached bindings if no API updates are needed. This can happen if we
// previously updated the PV object and are waiting for the PV controller to finish binding.
if len(newBindings) != 0 {
bindingRequired = true
b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings)
}
// Assume PVCs
claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, nodeName)
newProvisionedPVCs := []*v1.PersistentVolumeClaim{}
for _, claim := range claimsToProvision {
// The claims from the method args may point to the watcher cache. We must not
// modify them, therefore create a copy.
claimClone := claim.DeepCopy()
metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, annSelectedNode, nodeName)
err = b.pvcCache.Assume(claimClone)
if err != nil {
b.revertAssumedPVs(newBindings)
b.revertAssumedPVCs(newProvisionedPVCs)
return
}
newProvisionedPVCs = append(newProvisionedPVCs, claimClone)
}
if len(newProvisionedPVCs) != 0 {
bindingRequired = true
b.podBindingCache.UpdateProvisionedPVCs(assumedPod, nodeName, newProvisionedPVCs)
}
return
}
// BindPodVolumes gets the cached bindings and PVCs to provision in podBindingCache
// and makes the API update for those PVs/PVCs.
func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
podName := getPodName(assumedPod)
glog.V(4).Infof("BindPodVolumes for pod %q", podName)
bindings := b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName)
claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, assumedPod.Spec.NodeName)
// Do the actual prebinding. Let the PV controller take care of the rest
// There is no API rollback if the actual binding fails
@ -232,6 +277,20 @@ func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
if err != nil {
// only revert assumed cached updates for volumes we haven't successfully bound
b.revertAssumedPVs(bindings[i:])
// Revert all of the assumed cached updates for claims,
// since no actual API update will be done
b.revertAssumedPVCs(claimsToProvision)
return err
}
}
// Update claims objects to trigger volume provisioning. Let the PV controller take care of the rest.
// The PV controller is expected to signal back by removing related annotations if actual provisioning fails
for i, claim := range claimsToProvision {
if _, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
glog.V(4).Infof("updating PersistentVolumeClaim[%s] failed: %v", getPVCName(claim), err)
// only revert assumed cached updates for claims we haven't successfully updated
b.revertAssumedPVCs(claimsToProvision[i:])
return err
}
}
@ -253,7 +312,13 @@ func (b *volumeBinder) isVolumeBound(namespace string, vol *v1.Volume, checkFull
}
pvcName := vol.PersistentVolumeClaim.ClaimName
claim := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvcName,
Namespace: namespace,
},
}
pvc, err := b.pvcCache.GetPVC(getPVCName(claim))
if err != nil || pvc == nil {
return false, nil, fmt.Errorf("error getting PVC %q: %v", pvcName, err)
}
@ -342,14 +407,18 @@ func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node
return true, nil
}
// findMatchingVolumes tries to find matching volumes for the given claims,
// and returns unbound claims for further provisioning.
func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingInfo, node *v1.Node) (foundMatches bool, unboundClaims []*v1.PersistentVolumeClaim, err error) {
podName := getPodName(pod)
// Sort all the claims by increasing size request to get the smallest fits
sort.Sort(byPVCSize(claimsToBind))
chosenPVs := map[string]*v1.PersistentVolume{}
foundMatches = true
matchedClaims := []*bindingInfo{}
for _, bindingInfo := range claimsToBind {
// Get storage class name from each PVC
storageClassName := ""
@ -362,21 +431,68 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingI
// Find a matching PV
bindingInfo.pv, err = findMatchingVolume(bindingInfo.pvc, allPVs, node, chosenPVs, true)
if err != nil {
return false, nil, err
}
if bindingInfo.pv == nil {
glog.V(4).Infof("No matching volumes for Pod %q, PVC %q on node %q", podName, getPVCName(bindingInfo.pvc), node.Name)
unboundClaims = append(unboundClaims, bindingInfo.pvc)
foundMatches = false
continue
}
// matching PV needs to be excluded so we don't select it again
chosenPVs[bindingInfo.pv.Name] = bindingInfo.pv
matchedClaims = append(matchedClaims, bindingInfo)
glog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", bindingInfo.pv.Name, getPVCName(bindingInfo.pvc), node.Name, podName)
}
// Mark cache with all the matches for each PVC for this node
if len(matchedClaims) > 0 {
b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims)
}
if foundMatches {
glog.V(4).Infof("Found matching volumes for pod %q on node %q", podName, node.Name)
}
return
}
// checkVolumeProvisions checks the given unbound claims (which have gone through
// findMatchingVolumes and have no matching volumes to bind to), and returns true
// if all of the claims are eligible for dynamic provisioning.
func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, err error) {
podName := getPodName(pod)
provisionedClaims := []*v1.PersistentVolumeClaim{}
for _, claim := range claimsToProvision {
className := v1helper.GetPersistentVolumeClaimClass(claim)
if className == "" {
return false, fmt.Errorf("no class for claim %q", getPVCName(claim))
}
class, err := b.ctrl.classLister.Get(className)
if err != nil {
return false, fmt.Errorf("failed to find storage class %q", className)
}
provisioner := class.Provisioner
if provisioner == "" || provisioner == notSupportedProvisioner {
glog.V(4).Infof("storage class %q of claim %q does not support dynamic provisioning", className, getPVCName(claim))
return false, nil
}
// TODO: Check if the node can satisfy the topology requirement in the class
// TODO: Check if capacity of the node domain in the storage class
// can satisfy resource requirement of given claim
provisionedClaims = append(provisionedClaims, claim)
}
glog.V(4).Infof("Provisioning for claims of pod %q that has no matching volumes on node %q ...", podName, node.Name)
// Mark cache with all the PVCs that need provisioning for this node
b.podBindingCache.UpdateProvisionedPVCs(pod, node.Name, provisionedClaims)
return true, nil
}
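The eligibility rule reduces to a small predicate on the StorageClass; a standalone restatement (the helper name is ours, not the PR's):

```go
package main

import storagev1 "k8s.io/api/storage/v1"

const notSupportedProvisioner = "kubernetes.io/no-provisioner"

// eligibleForProvisioning restates the check in checkVolumeProvisions: a
// claim can be dynamically provisioned only if its class names a real
// provisioner, i.e. one that is non-empty and not the no-provisioner
// sentinel used by classes that serve pre-created (e.g. local) PVs.
func eligibleForProvisioning(class *storagev1.StorageClass) bool {
	return class.Provisioner != "" && class.Provisioner != notSupportedProvisioner
}
```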
@ -387,6 +503,12 @@ func (b *volumeBinder) revertAssumedPVs(bindings []*bindingInfo) {
}
}
func (b *volumeBinder) revertAssumedPVCs(claims []*v1.PersistentVolumeClaim) {
for _, claim := range claims {
b.pvcCache.Restore(getPVCName(claim))
}
}
type bindingInfo struct {
// Claim that needs to be bound
pvc *v1.PersistentVolumeClaim

View File

@ -30,27 +30,41 @@ type PodBindingCache interface {
// pod and node.
UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo)
// GetBindings will return the cached bindings for the given pod and node.
GetBindings(pod *v1.Pod, node string) []*bindingInfo
// UpdateProvisionedPVCs will update the cache with the given provisioning decisions
// for the pod and node.
UpdateProvisionedPVCs(pod *v1.Pod, node string, provisionings []*v1.PersistentVolumeClaim)
// GetProvisionedPVCs will return the cached provisioning decisions for the given pod and node.
GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim
// DeleteBindings will remove all cached bindings and provisionings for the given pod.
// TODO: separate the func if it is needed to delete bindings/provisionings individually
DeleteBindings(pod *v1.Pod)
}
type podBindingCache struct {
mutex sync.Mutex
// Key = pod name
// Value = nodeDecisions
bindingDecisions map[string]nodeDecisions
}
// Key = nodeName
// Value = bindings & provisioned PVCs of the node
type nodeDecisions map[string]nodeDecision
// A decision includes bindingInfo and provisioned PVCs of the node
type nodeDecision struct {
bindings []*bindingInfo
provisionings []*v1.PersistentVolumeClaim
}
func NewPodBindingCache() PodBindingCache {
return &podBindingCache{bindings: map[string]nodeBindings{}}
return &podBindingCache{bindingDecisions: map[string]nodeDecisions{}}
}
func (c *podBindingCache) DeleteBindings(pod *v1.Pod) {
@ -58,7 +72,7 @@ func (c *podBindingCache) DeleteBindings(pod *v1.Pod) {
defer c.mutex.Unlock()
podName := getPodName(pod)
delete(c.bindingDecisions, podName)
}
func (c *podBindingCache) UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo) {
@ -66,12 +80,20 @@ func (c *podBindingCache) UpdateBindings(pod *v1.Pod, node string, bindings []*b
defer c.mutex.Unlock()
podName := getPodName(pod)
decisions, ok := c.bindingDecisions[podName]
if !ok {
decisions = nodeDecisions{}
c.bindingDecisions[podName] = decisions
}
decision, ok := decisions[node]
if !ok {
decision = nodeDecision{
bindings: bindings,
}
} else {
decision.bindings = bindings
}
decisions[node] = decision
}
func (c *podBindingCache) GetBindings(pod *v1.Pod, node string) []*bindingInfo {
@ -79,9 +101,50 @@ func (c *podBindingCache) GetBindings(pod *v1.Pod, node string) []*bindingInfo {
defer c.mutex.Unlock()
podName := getPodName(pod)
decisions, ok := c.bindingDecisions[podName]
if !ok {
return nil
}
decision, ok := decisions[node]
if !ok {
return nil
}
return decision.bindings
}
func (c *podBindingCache) UpdateProvisionedPVCs(pod *v1.Pod, node string, pvcs []*v1.PersistentVolumeClaim) {
c.mutex.Lock()
defer c.mutex.Unlock()
podName := getPodName(pod)
decisions, ok := c.bindingDecisions[podName]
if !ok {
decisions = nodeDecisions{}
c.bindingDecisions[podName] = decisions
}
decision, ok := decisions[node]
if !ok {
decision = nodeDecision{
provisionings: pvcs,
}
} else {
decision.provisionings = pvcs
}
decisions[node] = decision
}
func (c *podBindingCache) GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim {
c.mutex.Lock()
defer c.mutex.Unlock()
podName := getPodName(pod)
decisions, ok := c.bindingDecisions[podName]
if !ok {
return nil
}
decision, ok := decisions[node]
if !ok {
return nil
}
return decision.provisionings
}
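Decisions are keyed per pod and then per node, so the predicate can stash results for every candidate node and the assume/bind phases later read back only the chosen one. A short illustrative exercise, written as if inside this package (node names are arbitrary):

```go
// exerciseBindingCache shows the per-pod, per-node decision bookkeeping.
func exerciseBindingCache(pod *v1.Pod, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) {
	c := NewPodBindingCache()

	// Record decisions for two candidate nodes during the predicate phase.
	c.UpdateBindings(pod, "node-a", bindings)
	c.UpdateProvisionedPVCs(pod, "node-a", provisionings)
	c.UpdateBindings(pod, "node-b", nil) // node-b would need no prebinds

	_ = c.GetBindings(pod, "node-a")        // -> bindings
	_ = c.GetProvisionedPVCs(pod, "node-b") // -> nil, never recorded

	// Once the pod is bound (or rejected), drop everything for it.
	c.DeleteBindings(pod)
}
```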

View File

@ -26,32 +26,37 @@ import (
func TestUpdateGetBindings(t *testing.T) {
scenarios := map[string]struct {
updateBindings []*bindingInfo
updateProvisionings []*v1.PersistentVolumeClaim
updatePod string
updateNode string
getBindings []*bindingInfo
getProvisionings []*v1.PersistentVolumeClaim
getPod string
getNode string
}{
"no-pod": {
getPod: "pod1",
getNode: "node1",
},
"no-node": {
updatePod: "pod1",
updateNode: "node1",
updateBindings: []*bindingInfo{},
getPod: "pod1",
getNode: "node2",
updatePod: "pod1",
updateNode: "node1",
updateBindings: []*bindingInfo{},
updateProvisionings: []*v1.PersistentVolumeClaim{},
getPod: "pod1",
getNode: "node2",
},
"binding-exists": {
updatePod: "pod1",
updateNode: "node1",
updateBindings: []*bindingInfo{{pvc: &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "pvc1"}}}},
getPod: "pod1",
getNode: "node1",
getBindings: []*bindingInfo{{pvc: &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "pvc1"}}}},
updatePod: "pod1",
updateNode: "node1",
updateBindings: []*bindingInfo{{pvc: &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "pvc1"}}}},
updateProvisionings: []*v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "pvc2"}}},
getPod: "pod1",
getNode: "node1",
getBindings: []*bindingInfo{{pvc: &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "pvc1"}}}},
getProvisionings: []*v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "pvc2"}}},
},
}
@ -61,6 +66,7 @@ func TestUpdateGetBindings(t *testing.T) {
// Perform updates
updatePod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: scenario.updatePod, Namespace: "ns"}}
cache.UpdateBindings(updatePod, scenario.updateNode, scenario.updateBindings)
cache.UpdateProvisionedPVCs(updatePod, scenario.updateNode, scenario.updateProvisionings)
// Verify updated bindings
bindings := cache.GetBindings(updatePod, scenario.updateNode)
@ -68,45 +74,71 @@ func TestUpdateGetBindings(t *testing.T) {
t.Errorf("Test %v failed: returned bindings after update different. Got %+v, expected %+v", name, bindings, scenario.updateBindings)
}
// Verify updated provisionings
provisionings := cache.GetProvisionedPVCs(updatePod, scenario.updateNode)
if !reflect.DeepEqual(provisionings, scenario.updateProvisionings) {
t.Errorf("Test %v failed: returned provisionings after update different. Got %+v, expected %+v", name, provisionings, scenario.updateProvisionings)
}
// Get bindings
getPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: scenario.getPod, Namespace: "ns"}}
bindings = cache.GetBindings(getPod, scenario.getNode)
if !reflect.DeepEqual(bindings, scenario.getBindings) {
t.Errorf("Test %v failed: unexpected bindings returned. Got %+v, expected %+v", name, bindings, scenario.updateBindings)
}
// Get provisionings
provisionings = cache.GetProvisionedPVCs(getPod, scenario.getNode)
if !reflect.DeepEqual(provisionings, scenario.getProvisionings) {
t.Errorf("Test %v failed: unexpected bindings returned. Got %+v, expected %+v", name, provisionings, scenario.getProvisionings)
}
}
}
func TestDeleteBindings(t *testing.T) {
initialBindings := []*bindingInfo{{pvc: &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "pvc1"}}}}
initialProvisionings := []*v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "pvc2"}}}
cache := NewPodBindingCache()
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "ns"}}
// Get nil bindings and provisionings
bindings := cache.GetBindings(pod, "node1")
if bindings != nil {
t.Errorf("Test failed: expected initial nil bindings, got %+v", bindings)
}
provisionings := cache.GetProvisionedPVCs(pod, "node1")
if provisionings != nil {
t.Errorf("Test failed: expected initial nil provisionings, got %+v", provisionings)
}
// Delete nothing
cache.DeleteBindings(pod)
// Perform updates
cache.UpdateBindings(pod, "node1", initialBindings)
cache.UpdateProvisionedPVCs(pod, "node1", initialProvisionings)
// Get bindings and provisionings
bindings = cache.GetBindings(pod, "node1")
if !reflect.DeepEqual(bindings, initialBindings) {
t.Errorf("Test failed: expected bindings %+v, got %+v", initialBindings, bindings)
}
provisionings = cache.GetProvisionedPVCs(pod, "node1")
if !reflect.DeepEqual(provisionings, initialProvisionings) {
t.Errorf("Test failed: expected provisionings %+v, got %+v", initialProvisionings, provisionings)
}
// Delete
cache.DeleteBindings(pod)
// Get bindings and provisionings
bindings = cache.GetBindings(pod, "node1")
if bindings != nil {
t.Errorf("Test failed: expected nil bindings, got %+v", bindings)
}
provisionings = cache.GetProvisionedPVCs(pod, "node1")
if provisionings != nil {
t.Errorf("Test failed: expected nil provisionings, got %+v", provisionings)
}
}

View File

@ -33,20 +33,23 @@ import (
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/controller"
)
var (
unboundPVC = makeTestPVC("unbound-pvc", "1G", pvcUnbound, "", &waitClass)
unboundPVC2 = makeTestPVC("unbound-pvc2", "5G", pvcUnbound, "", &waitClass)
preboundPVC = makeTestPVC("prebound-pvc", "1G", pvcPrebound, "pv-node1a", &waitClass)
boundPVC = makeTestPVC("bound-pvc", "1G", pvcBound, "pv-bound", &waitClass)
boundPVC2 = makeTestPVC("bound-pvc2", "1G", pvcBound, "pv-bound2", &waitClass)
badPVC = makeBadPVC()
immediateUnboundPVC = makeTestPVC("immediate-unbound-pvc", "1G", pvcUnbound, "", &immediateClass)
immediateBoundPVC = makeTestPVC("immediate-bound-pvc", "1G", pvcBound, "pv-bound-immediate", &immediateClass)
unboundPVC = makeTestPVC("unbound-pvc", "1G", pvcUnbound, "", "1", &waitClass)
unboundPVC2 = makeTestPVC("unbound-pvc2", "5G", pvcUnbound, "", "1", &waitClass)
preboundPVC = makeTestPVC("prebound-pvc", "1G", pvcPrebound, "pv-node1a", "1", &waitClass)
boundPVC = makeTestPVC("bound-pvc", "1G", pvcBound, "pv-bound", "1", &waitClass)
boundPVC2 = makeTestPVC("bound-pvc2", "1G", pvcBound, "pv-bound2", "1", &waitClass)
badPVC = makeBadPVC()
immediateUnboundPVC = makeTestPVC("immediate-unbound-pvc", "1G", pvcUnbound, "", "1", &immediateClass)
immediateBoundPVC = makeTestPVC("immediate-bound-pvc", "1G", pvcBound, "pv-bound-immediate", "1", &immediateClass)
provisionedPVC = makeTestPVC("provisioned-pvc", "1Gi", pvcUnbound, "", "1", &waitClass)
provisionedPVC2 = makeTestPVC("provisioned-pvc2", "1Gi", pvcUnbound, "", "1", &waitClass)
provisionedPVCHigherVersion = makeTestPVC("provisioned-pvc2", "1Gi", pvcUnbound, "", "2", &waitClass)
noProvisionerPVC = makeTestPVC("no-provisioner-pvc", "1Gi", pvcUnbound, "", "1", &provisionNotSupportClass)
pvNoNode = makeTestPV("pv-no-node", "", "1G", "1", nil, waitClass)
pvNode1a = makeTestPV("pv-node1a", "node1", "5G", "1", nil, waitClass)
@ -68,10 +71,12 @@ var (
binding1aBound = makeBinding(unboundPVC, pvNode1aBound)
binding1bBound = makeBinding(unboundPVC2, pvNode1bBound)
waitClass = "waitClass"
immediateClass = "immediateClass"
waitClass = "waitClass"
immediateClass = "immediateClass"
provisionNotSupportClass = "provisionNotSupportedClass"
nodeLabelKey = "nodeKey"
nodeLabelKey = "nodeKey"
nodeLabelValue = "node1"
)
type testEnv struct {
@ -80,7 +85,7 @@ type testEnv struct {
binder SchedulerVolumeBinder
internalBinder *volumeBinder
internalPVCache *pvAssumeCache
internalPVCCache *pvcAssumeCache
}
func newTestBinder(t *testing.T) *testEnv {
@ -106,6 +111,7 @@ func newTestBinder(t *testing.T) *testEnv {
Name: waitClass,
},
VolumeBindingMode: &waitMode,
Provisioner: "test-provisioner",
},
{
ObjectMeta: metav1.ObjectMeta{
@ -113,6 +119,13 @@ func newTestBinder(t *testing.T) *testEnv {
},
VolumeBindingMode: &immediateMode,
},
{
ObjectMeta: metav1.ObjectMeta{
Name: provisionNotSupportClass,
},
VolumeBindingMode: &waitMode,
Provisioner: "kubernetes.io/no-provisioner",
},
}
for _, class := range classes {
if err := classInformer.Informer().GetIndexer().Add(class); err != nil {
@ -132,22 +145,31 @@ func newTestBinder(t *testing.T) *testEnv {
t.Fatalf("Failed to convert to internal PV cache")
}
pvcCache := internalBinder.pvcCache
internalPVCCache, ok := pvcCache.(*pvcAssumeCache)
if !ok {
t.Fatalf("Failed to convert to internal PVC cache")
}
return &testEnv{
client: client,
reactor: reactor,
binder: binder,
internalBinder: internalBinder,
internalPVCache: internalPVCache,
internalPVCCache: internalPVCCache,
}
}
func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs []*v1.PersistentVolumeClaim) {
internalPVCCache := env.internalPVCCache
for _, pvc := range cachedPVCs {
internalPVCCache.add(pvc)
if apiPVCs == nil {
env.reactor.claims[pvc.Name] = pvc
}
}
for _, pvc := range apiPVCs {
env.reactor.claims[pvc.Name] = pvc
}
}
@ -166,7 +188,7 @@ func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.P
}
func (env *testEnv) assumeVolumes(t *testing.T, name, node string, pod *v1.Pod, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) {
pvCache := env.internalBinder.pvCache
for _, binding := range bindings {
if err := pvCache.Assume(binding.pv); err != nil {
@ -175,20 +197,38 @@ func (env *testEnv) assumeVolumes(t *testing.T, name, node string, pod *v1.Pod,
}
env.internalBinder.podBindingCache.UpdateBindings(pod, node, bindings)
pvcCache := env.internalBinder.pvcCache
for _, pvc := range provisionings {
if err := pvcCache.Assume(pvc); err != nil {
t.Fatalf("Failed to setup test %q: error: %v", name, err)
}
}
env.internalBinder.podBindingCache.UpdateProvisionedPVCs(pod, node, provisionings)
}
func (env *testEnv) initPodCache(pod *v1.Pod, node string, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) {
cache := env.internalBinder.podBindingCache
cache.UpdateBindings(pod, node, bindings)
cache.UpdateProvisionedPVCs(pod, node, provisionings)
}
func (env *testEnv) validatePodCache(t *testing.T, name, node string, pod *v1.Pod, expectedBindings []*bindingInfo, expectedProvisionings []*v1.PersistentVolumeClaim) {
cache := env.internalBinder.podBindingCache
bindings := cache.GetBindings(pod, node)
if !reflect.DeepEqual(expectedBindings, bindings) {
t.Errorf("Test %q failed: Expected bindings %+v, got %+v", name, expectedBindings, bindings)
}
provisionedClaims := cache.GetProvisionedPVCs(pod, node)
if !reflect.DeepEqual(expectedProvisionings, provisionedClaims) {
t.Errorf("Test %q failed: Expected provisionings %+v, got %+v", name, expectedProvisionings, provisionedClaims)
}
}
func (env *testEnv) getPodBindings(t *testing.T, name, node string, pod *v1.Pod) []*bindingInfo {
@ -196,7 +236,7 @@ func (env *testEnv) getPodBindings(t *testing.T, name, node string, pod *v1.Pod)
return cache.GetBindings(pod, node)
}
func (env *testEnv) validateAssume(t *testing.T, name string, pod *v1.Pod, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) {
// TODO: Check binding cache
// Check pv cache
@ -218,9 +258,23 @@ func (env *testEnv) validateAssume(t *testing.T, name string, pod *v1.Pod, bindi
t.Errorf("Test %q failed: expected PV.ClaimRef.Namespace %q, got %q", name, b.pvc.Namespace, pv.Spec.ClaimRef.Namespace)
}
}
// Check pvc cache
pvcCache := env.internalBinder.pvcCache
for _, p := range provisionings {
pvcKey := getPVCName(p)
pvc, err := pvcCache.GetPVC(pvcKey)
if err != nil {
t.Errorf("Test %q failed: GetPVC %q returned error: %v", name, pvcKey, err)
continue
}
if pvc.Annotations[annSelectedNode] != nodeLabelValue {
t.Errorf("Test %q failed: expected annSelectedNode of pvc %q to be %q, but got %q", name, pvcKey, nodeLabelValue, pvc.Annotations[annSelectedNode])
}
}
}
func (env *testEnv) validateFailedAssume(t *testing.T, name string, pod *v1.Pod, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) {
// All PVs have been unmodified in cache
pvCache := env.internalBinder.pvCache
for _, b := range bindings {
@ -230,6 +284,20 @@ func (env *testEnv) validateFailedAssume(t *testing.T, name string, pod *v1.Pod,
t.Errorf("Test %q failed: PV %q was modified in cache", name, b.pv.Name)
}
}
// Check pvc cache
pvcCache := env.internalBinder.pvcCache
for _, p := range provisionings {
pvcKey := getPVCName(p)
pvc, err := pvcCache.GetPVC(pvcKey)
if err != nil {
t.Errorf("Test %q failed: GetPVC %q returned error: %v", name, pvcKey, err)
continue
}
if pvc.Annotations[annSelectedNode] != "" {
t.Errorf("Test %q failed: expected annSelectedNode of pvc %q empty, but got %q", name, pvcKey, pvc.Annotations[annSelectedNode])
}
}
}
func (env *testEnv) validateBind(
@ -257,20 +325,46 @@ func (env *testEnv) validateBind(
}
}
func (env *testEnv) validateProvision(
t *testing.T,
name string,
pod *v1.Pod,
expectedPVCs []*v1.PersistentVolumeClaim,
expectedAPIPVCs []*v1.PersistentVolumeClaim) {
// Check pvc cache
pvcCache := env.internalBinder.pvcCache
for _, pvc := range expectedPVCs {
cachedPVC, err := pvcCache.GetPVC(getPVCName(pvc))
if err != nil {
t.Errorf("Test %q failed: GetPVC %q returned error: %v", name, getPVCName(pvc), err)
}
if !reflect.DeepEqual(cachedPVC, pvc) {
t.Errorf("Test %q failed: cached PVC check failed [A-expected, B-got]:\n%s", name, diff.ObjectDiff(pvc, cachedPVC))
}
}
// Check reactor for API updates
if err := env.reactor.checkClaims(expectedAPIPVCs); err != nil {
t.Errorf("Test %q failed: API reactor validation failed: %v", name, err)
}
}
const (
pvcUnbound = iota
pvcPrebound
pvcBound
)
func makeTestPVC(name, size string, pvcBoundState int, pvName, resourceVersion string, className *string) *v1.PersistentVolumeClaim {
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "testns",
UID: types.UID("pvc-uid"),
ResourceVersion: "1",
ResourceVersion: resourceVersion,
SelfLink: testapi.Default.SelfLink("pvc", name),
Annotations: map[string]string{},
},
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
@ -389,7 +483,15 @@ func makeBinding(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) *bindin
return &bindingInfo{pvc: pvc, pv: pv}
}
func addProvisionAnn(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim {
res := pvc.DeepCopy()
// Add provision related annotations
res.Annotations[annSelectedNode] = nodeLabelValue
return res
}
func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
scenarios := map[string]struct {
// Inputs
pvs []*v1.PersistentVolume
@ -470,10 +572,11 @@ func TestFindPodVolumes(t *testing.T) {
expectedBound: true,
},
"two-unbound-pvcs,partial-match": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2},
pvs: []*v1.PersistentVolume{pvNode1a},
expectedBindings: []*bindingInfo{binding1a},
expectedUnbound: false,
expectedBound: true,
},
"one-bound,one-unbound": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, boundPVC},
@ -552,7 +655,7 @@ func TestFindPodVolumes(t *testing.T) {
if scenario.cachePVCs == nil {
scenario.cachePVCs = scenario.podPVCs
}
testEnv.initClaims(scenario.cachePVCs, scenario.cachePVCs)
// b. Generate pod with given claims
if scenario.pod == nil {
@ -575,16 +678,126 @@ func TestFindPodVolumes(t *testing.T) {
if unboundSatisfied != scenario.expectedUnbound {
t.Errorf("Test %q failed: expected unboundSatsified %v, got %v", name, scenario.expectedUnbound, unboundSatisfied)
}
testEnv.validatePodCache(t, name, testNode.Name, scenario.pod, scenario.expectedBindings, nil)
}
}
func TestFindPodVolumesWithProvisioning(t *testing.T) {
scenarios := map[string]struct {
// Inputs
pvs []*v1.PersistentVolume
podPVCs []*v1.PersistentVolumeClaim
// If nil, use pod PVCs
cachePVCs []*v1.PersistentVolumeClaim
// If nil, makePod with podPVCs
pod *v1.Pod
// Expected podBindingCache fields
expectedBindings []*bindingInfo
expectedProvisions []*v1.PersistentVolumeClaim
// Expected return values
expectedUnbound bool
expectedBound bool
shouldFail bool
}{
"one-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedUnbound: true,
expectedBound: true,
},
"two-unbound-pvcs,one-matched,one-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC},
pvs: []*v1.PersistentVolume{pvNode1a},
expectedBindings: []*bindingInfo{binding1a},
expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedUnbound: true,
expectedBound: true,
},
"one-bound,one-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{boundPVC, provisionedPVC},
pvs: []*v1.PersistentVolume{pvBound},
expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedUnbound: true,
expectedBound: true,
},
"immediate-unbound-pvc": {
podPVCs: []*v1.PersistentVolumeClaim{immediateUnboundPVC},
expectedUnbound: false,
expectedBound: false,
shouldFail: true,
},
"one-immediate-bound,one-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{immediateBoundPVC, provisionedPVC},
pvs: []*v1.PersistentVolume{pvBoundImmediate},
expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedUnbound: true,
expectedBound: true,
},
"invalid-provisioner": {
podPVCs: []*v1.PersistentVolumeClaim{noProvisionerPVC},
expectedUnbound: false,
expectedBound: true,
},
}
// Set VolumeScheduling and DynamicProvisioningScheduling feature gates
utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true,DynamicProvisioningScheduling=true")
defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false,DynamicProvisioningScheduling=false")
testNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
Labels: map[string]string{
nodeLabelKey: "node1",
},
},
}
for name, scenario := range scenarios {
// Setup
testEnv := newTestBinder(t)
testEnv.initVolumes(scenario.pvs, scenario.pvs)
// a. Init pvc cache
if scenario.cachePVCs == nil {
scenario.cachePVCs = scenario.podPVCs
}
testEnv.initClaims(scenario.cachePVCs, scenario.cachePVCs)
// b. Generate pod with given claims
if scenario.pod == nil {
scenario.pod = makePod(scenario.podPVCs)
}
// Execute
unboundSatisfied, boundSatisfied, err := testEnv.binder.FindPodVolumes(scenario.pod, testNode)
// Validate
if !scenario.shouldFail && err != nil {
t.Errorf("Test %q failed: returned error: %v", name, err)
}
if scenario.shouldFail && err == nil {
t.Errorf("Test %q failed: returned success but expected error", name)
}
if boundSatisfied != scenario.expectedBound {
t.Errorf("Test %q failed: expected boundSatsified %v, got %v", name, scenario.expectedBound, boundSatisfied)
}
if unboundSatisfied != scenario.expectedUnbound {
t.Errorf("Test %q failed: expected unboundSatsified %v, got %v", name, scenario.expectedUnbound, unboundSatisfied)
}
testEnv.validatePodCache(t, name, testNode.Name, scenario.pod, scenario.expectedBindings, scenario.expectedProvisions)
}
}
func TestAssumePodVolumes(t *testing.T) {
scenarios := map[string]struct {
// Inputs
podPVCs []*v1.PersistentVolumeClaim
pvs []*v1.PersistentVolume
bindings []*bindingInfo
provisionedPVCs []*v1.PersistentVolumeClaim
// Expected return values
shouldFail bool
@ -636,6 +849,21 @@ func TestAssumePodVolumes(t *testing.T) {
shouldFail: true,
expectedBindingRequired: true,
},
"one-binding, one-pvc-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC},
bindings: []*bindingInfo{binding1a},
pvs: []*v1.PersistentVolume{pvNode1a},
provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedBindingRequired: true,
},
"one-binding, one-provision-tmpupdate-failed": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVCHigherVersion},
bindings: []*bindingInfo{binding1a},
pvs: []*v1.PersistentVolume{pvNode1a},
provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC2},
shouldFail: true,
expectedBindingRequired: true,
},
}
for name, scenario := range scenarios {
@ -643,9 +871,9 @@ func TestAssumePodVolumes(t *testing.T) {
// Setup
testEnv := newTestBinder(t)
testEnv.initClaims(scenario.podPVCs, scenario.podPVCs)
pod := makePod(scenario.podPVCs)
testEnv.initPodCache(pod, "node1", scenario.bindings)
testEnv.initPodCache(pod, "node1", scenario.bindings, scenario.provisionedPVCs)
testEnv.initVolumes(scenario.pvs, scenario.pvs)
// Execute
@ -668,9 +896,9 @@ func TestAssumePodVolumes(t *testing.T) {
scenario.expectedBindings = scenario.bindings
}
if scenario.shouldFail {
testEnv.validateFailedAssume(t, name, pod, scenario.expectedBindings, scenario.provisionedPVCs)
} else {
testEnv.validateAssume(t, name, pod, scenario.expectedBindings, scenario.provisionedPVCs)
}
}
}
@ -683,11 +911,20 @@ func TestBindPodVolumes(t *testing.T) {
// if nil, use cachedPVs
apiPVs []*v1.PersistentVolume
provisionedPVCs []*v1.PersistentVolumeClaim
cachedPVCs []*v1.PersistentVolumeClaim
// if nil, use cachedPVCs
apiPVCs []*v1.PersistentVolumeClaim
// Expected return values
shouldFail bool
expectedPVs []*v1.PersistentVolume
// if nil, use expectedPVs
expectedAPIPVs []*v1.PersistentVolume
expectedPVCs []*v1.PersistentVolumeClaim
// if nil, use expectedPVCs
expectedAPIPVCs []*v1.PersistentVolumeClaim
}{
"all-bound": {},
"not-fully-bound": {
@ -711,6 +948,30 @@ func TestBindPodVolumes(t *testing.T) {
expectedAPIPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBoundHigherVersion},
shouldFail: true,
},
"one-provisioned-pvc": {
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
cachedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
},
"provision-api-update-failed": {
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), addProvisionAnn(provisionedPVC2)},
cachedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC, provisionedPVC2},
apiPVCs: []*v1.PersistentVolumeClaim{provisionedPVC, provisionedPVCHigherVersion},
expectedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), provisionedPVC2},
expectedAPIPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), provisionedPVCHigherVersion},
shouldFail: true,
},
"bingding-succeed, provision-api-update-failed": {
bindings: []*bindingInfo{binding1aBound},
cachedPVs: []*v1.PersistentVolume{pvNode1a},
expectedPVs: []*v1.PersistentVolume{pvNode1aBound},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), addProvisionAnn(provisionedPVC2)},
cachedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC, provisionedPVC2},
apiPVCs: []*v1.PersistentVolumeClaim{provisionedPVC, provisionedPVCHigherVersion},
expectedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), provisionedPVC2},
expectedAPIPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), provisionedPVCHigherVersion},
shouldFail: true,
},
}
for name, scenario := range scenarios {
glog.V(5).Infof("Running test case %q", name)
@ -721,8 +982,12 @@ func TestBindPodVolumes(t *testing.T) {
if scenario.apiPVs == nil {
scenario.apiPVs = scenario.cachedPVs
}
if scenario.apiPVCs == nil {
scenario.apiPVCs = scenario.cachedPVCs
}
testEnv.initVolumes(scenario.cachedPVs, scenario.apiPVs)
testEnv.assumeVolumes(t, name, "node1", pod, scenario.bindings)
testEnv.initClaims(scenario.cachedPVCs, scenario.apiPVCs)
testEnv.assumeVolumes(t, name, "node1", pod, scenario.bindings, scenario.provisionedPVCs)
// Execute
err := testEnv.binder.BindPodVolumes(pod)
@ -737,7 +1002,11 @@ func TestBindPodVolumes(t *testing.T) {
if scenario.expectedAPIPVs == nil {
scenario.expectedAPIPVs = scenario.expectedPVs
}
if scenario.expectedAPIPVCs == nil {
scenario.expectedAPIPVCs = scenario.expectedPVCs
}
testEnv.validateBind(t, name, pod, scenario.expectedPVs, scenario.expectedAPIPVs)
testEnv.validateProvision(t, name, pod, scenario.expectedPVCs, scenario.expectedAPIPVCs)
}
}
@ -753,7 +1022,7 @@ func TestFindAssumeVolumes(t *testing.T) {
// Setup
testEnv := newTestBinder(t)
testEnv.initVolumes(pvs, pvs)
testEnv.initClaims(podPVCs, podPVCs)
pod := makePod(podPVCs)
testNode := &v1.Node{
@ -787,7 +1056,7 @@ func TestFindAssumeVolumes(t *testing.T) {
if !bindingRequired {
t.Errorf("Test failed: binding not required")
}
testEnv.validateAssume(t, "assume", pod, expectedBindings)
testEnv.validateAssume(t, "assume", pod, expectedBindings, nil)
// After assume, claimref should be set on pv
expectedBindings = testEnv.getPodBindings(t, "after-assume", testNode.Name, pod)
@ -803,6 +1072,6 @@ func TestFindAssumeVolumes(t *testing.T) {
if !unboundSatisfied {
t.Errorf("Test failed: couldn't find PVs for all PVCs")
}
testEnv.validatePodCache(t, "after-assume", testNode.Name, pod, expectedBindings)
testEnv.validatePodCache(t, "after-assume", testNode.Name, pod, expectedBindings, nil)
}
}

View File

@ -280,6 +280,12 @@ const (
// A node which has closer cpu,memory utilization and volume count is favoured by scheduler
// while making decisions.
BalanceAttachedNodeVolumes utilfeature.Feature = "BalanceAttachedNodeVolumes"
// owner: @lichuqiang
// alpha: v1.11
//
// Extend the default scheduler to be aware of volume topology and handle PV provisioning
DynamicProvisioningScheduling utilfeature.Feature = "DynamicProvisioningScheduling"
)
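Both gates default to off. In this PR's tests they are toggled through the feature gate API; a small hedged helper wrapping that same pattern (the wrapper function is ours):

```go
// enableProvisioningGatesForTest turns both alpha gates on and returns a
// cleanup func restoring the defaults, mirroring the tests in this PR.
func enableProvisioningGatesForTest(t *testing.T) func() {
	if err := utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true,DynamicProvisioningScheduling=true"); err != nil {
		t.Fatal(err)
	}
	return func() {
		utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false,DynamicProvisioningScheduling=false")
	}
}
```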
func init() {
@ -328,6 +334,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
RunAsGroup: {Default: false, PreRelease: utilfeature.Alpha},
VolumeSubpath: {Default: true, PreRelease: utilfeature.GA},
BalanceAttachedNodeVolumes: {Default: false, PreRelease: utilfeature.Alpha},
DynamicProvisioningScheduling: {Default: false, PreRelease: utilfeature.Alpha},
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:

View File

@ -506,12 +506,16 @@ func ClusterRoles() []rbacv1.ClusterRole {
}
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
rules := []rbacv1.PolicyRule{
rbacv1helpers.NewRule(ReadUpdate...).Groups(legacyGroup).Resources("persistentvolumes").RuleOrDie(),
rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("storageclasses").RuleOrDie(),
}
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicProvisioningScheduling) {
rules = append(rules, rbacv1helpers.NewRule(ReadUpdate...).Groups(legacyGroup).Resources("persistentvolumeclaims").RuleOrDie())
}
roles = append(roles, rbacv1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{Name: "system:volume-scheduler"},
Rules: rules,
})
}