Merge pull request #67556 from msau42/fix-assume

Automatic merge from submit-queue (batch tested with PRs 67709, 67556). If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md.

Fix volume scheduling issue with pod affinity and anti-affinity

**What this PR does / why we need it**:
The previous design of the volume scheduler performed volume assume + bind before pod assume + bind. This caused issues when evaluating future pods with pod affinity/anti-affinity, because the volumes had already been decided while the pod itself had not yet been assumed.

This PR changes the design so that the volumes and the pod are assumed first, followed by volume and pod binding. Volume binding waits (asynchronously) for the operations to complete or error. This eliminates the extra passes through the scheduler that previously waited for volume binding to complete (although pod events or resyncs may still cause the pod to run through scheduling while binding is still in progress). This design also aligns better with the scheduler framework design, so it will be easier to migrate to it in the future.
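
For illustration, here is a minimal sketch of the new ordering from the scheduler's point of view. The string-based `volumeBinder` interface, `fakeBinder`, and the print statements are simplified stand-ins rather than the real scheduler types; what the sketch preserves from this PR is that `AssumePodVolumes` now returns only `allFullyBound`, and that volume binding plus the final Pod->Node bind happen asynchronously after the assume step:

```go
package main

import (
	"fmt"
	"time"
)

// volumeBinder mirrors the two SchedulerVolumeBinder methods changed in this
// PR: AssumePodVolumes returns only allFullyBound, and BindPodVolumes blocks
// until the PV controller finishes binding (or the bind times out).
type volumeBinder interface {
	AssumePodVolumes(podName, nodeName string) (allFullyBound bool, err error)
	BindPodVolumes(podName string) error
}

// scheduleOne sketches the new ordering: assume volumes, then assume the pod
// (so later pods can evaluate affinity/anti-affinity against it), then bind
// volumes and pod asynchronously.
func scheduleOne(b volumeBinder, podName, nodeName string) error {
	allBound, err := b.AssumePodVolumes(podName, nodeName)
	if err != nil {
		return err
	}
	fmt.Printf("assumed pod %s on node %s\n", podName, nodeName) // pod assume happens here

	go func() {
		if !allBound {
			if err := b.BindPodVolumes(podName); err != nil {
				// On failure the pod is sent back through scheduling.
				fmt.Printf("volume binding for %s failed, retrying scheduling: %v\n", podName, err)
				return
			}
		}
		fmt.Printf("binding pod %s to node %s\n", podName, nodeName) // final Pod->Node bind
	}()
	return nil
}

type fakeBinder struct{}

func (fakeBinder) AssumePodVolumes(pod, node string) (bool, error) { return false, nil }
func (fakeBinder) BindPodVolumes(pod string) error                 { return nil }

func main() {
	_ = scheduleOne(fakeBinder{}, "testns/test-pod", "node1")
	time.Sleep(100 * time.Millisecond) // give the toy goroutine time to finish
}
```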

Many changes had to be made in the volume scheduler to handle this new design, mostly around:
* How we cache pending binding operations. Now, any delayed-binding PVC that is not fully bound must have a cached binding operation. This also means bind API updates may be repeated.
* Waiting for the bind operation to fully complete, and detecting failure conditions that require aborting the bind and retrying scheduling (see the sketch below).
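
A simplified sketch of that wait, assuming the `wait.Poll`-based loop this PR adds to `BindPodVolumes`. Here `checkBound` is a hypothetical stand-in for `checkBindings`; the real code also re-reads the binding cache on every iteration in case the pod was deleted:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// waitForBindings stands in for the tail of the new BindPodVolumes: after the
// API updates are issued, poll until all PVCs are fully bound, an error
// condition is detected, or bindTimeout expires.
func waitForBindings(bindTimeout time.Duration, checkBound func() (bool, error)) error {
	return wait.Poll(time.Second, bindTimeout, func() (bool, error) {
		// (true, nil)  -> all bound, stop waiting
		// (false, nil) -> keep polling until bindTimeout
		// (_, err)     -> abort so the scheduler can retry the pod
		return checkBound()
	})
}

func main() {
	start := time.Now()
	err := waitForBindings(10*time.Second, func() (bool, error) {
		// Pretend the PV controller finishes binding after ~3 seconds.
		return time.Since(start) > 3*time.Second, nil
	})
	fmt.Println("bind wait result:", err)
}
```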

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #65131

**Special notes for your reviewer**:

**Release note**:

```release-note
Fixes an issue where pod scheduling may fail when using local PVs with pod affinity and anti-affinity and a StatefulSet pod management policy other than the default OrderedReady
```
Kubernetes Submit Queue 2018-09-04 23:19:37 -07:00 committed by GitHub
commit 2fdd328d05
27 changed files with 1056 additions and 413 deletions


@ -144,6 +144,7 @@ users:
}
defaultSource := "DefaultProvider"
defaultBindTimeoutSeconds := int64(600)
testcases := []struct {
name string
@ -157,7 +158,10 @@ users:
options: &Options{
ConfigFile: configFile,
ComponentConfig: func() kubeschedulerconfig.KubeSchedulerConfiguration {
cfg, _ := newDefaultComponentConfig()
cfg, err := newDefaultComponentConfig()
if err != nil {
t.Fatal(err)
}
return *cfg
}(),
},
@ -187,6 +191,7 @@ users:
ContentType: "application/vnd.kubernetes.protobuf",
},
PercentageOfNodesToScore: 50,
BindTimeoutSeconds: &defaultBindTimeoutSeconds,
},
},
{
@ -229,6 +234,7 @@ users:
ContentType: "application/vnd.kubernetes.protobuf",
},
PercentageOfNodesToScore: 50,
BindTimeoutSeconds: &defaultBindTimeoutSeconds,
},
},
{


@ -305,6 +305,7 @@ func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Con
EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
DisablePreemption: s.ComponentConfig.DisablePreemption,
PercentageOfNodesToScore: s.ComponentConfig.PercentageOfNodesToScore,
BindTimeoutSeconds: *s.ComponentConfig.BindTimeoutSeconds,
})
source := s.ComponentConfig.AlgorithmSource


@ -83,7 +83,8 @@ func (e *errObjectName) Error() string {
// Restore() sets the latest object pointer back to the informer object.
// Get/List() always returns the latest object pointer.
type assumeCache struct {
mutex sync.Mutex
// Synchronizes updates to store
rwMutex sync.RWMutex
// describes the object stored
description string
@ -155,8 +156,8 @@ func (c *assumeCache) add(obj interface{}) {
return
}
c.mutex.Lock()
defer c.mutex.Unlock()
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
if objInfo, _ := c.getObjInfo(name); objInfo != nil {
newVersion, err := c.getObjVersion(name, obj)
@ -199,8 +200,8 @@ func (c *assumeCache) delete(obj interface{}) {
return
}
c.mutex.Lock()
defer c.mutex.Unlock()
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
objInfo := &objInfo{name: name}
err = c.store.Delete(objInfo)
@ -239,8 +240,8 @@ func (c *assumeCache) getObjInfo(name string) (*objInfo, error) {
}
func (c *assumeCache) Get(objName string) (interface{}, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
objInfo, err := c.getObjInfo(objName)
if err != nil {
@ -250,8 +251,8 @@ func (c *assumeCache) Get(objName string) (interface{}, error) {
}
func (c *assumeCache) List(indexObj interface{}) []interface{} {
c.mutex.Lock()
defer c.mutex.Unlock()
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
allObjs := []interface{}{}
objs, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj})
@ -277,8 +278,8 @@ func (c *assumeCache) Assume(obj interface{}) error {
return &errObjectName{err}
}
c.mutex.Lock()
defer c.mutex.Unlock()
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
objInfo, err := c.getObjInfo(name)
if err != nil {
@ -306,8 +307,8 @@ func (c *assumeCache) Assume(obj interface{}) error {
}
func (c *assumeCache) Restore(objName string) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
objInfo, err := c.getObjInfo(objName)
if err != nil {


@ -19,12 +19,14 @@ package persistentvolume
import (
"fmt"
"sort"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
@ -42,16 +44,19 @@ import (
// a. Invokes all predicate functions, parallelized across nodes. FindPodVolumes() is invoked here.
// b. Invokes all priority functions. Future/TBD
// c. Selects the best node for the Pod.
// d. Cache the node selection for the Pod. (Assume phase)
// d. Cache the node selection for the Pod. AssumePodVolumes() is invoked here.
// i. If PVC binding is required, cache in-memory only:
// * Updated PV objects for prebinding to the corresponding PVCs.
// * For the pod, which PVs need API updates.
// AssumePodVolumes() is invoked here. Then BindPodVolumes() is called asynchronously by the
// scheduler. After BindPodVolumes() is complete, the Pod is added back to the scheduler queue
// to be processed again until all PVCs are bound.
// ii. If PVC binding is not required, cache the Pod->Node binding in the scheduler's pod cache,
// and asynchronously bind the Pod to the Node. This is handled in the scheduler and not here.
// 2. Once the assume operation is done, the scheduler processes the next Pod in the scheduler queue
// * For manual binding: update PV objects for prebinding to the corresponding PVCs.
// * For dynamic provisioning: update PVC object with a selected node from c)
// * For the pod, which PVCs and PVs need API updates.
// ii. Afterwards, the main scheduler caches the Pod->Node binding in the scheduler's pod cache,
// This is handled in the scheduler and not here.
// e. Asynchronously bind volumes and pod in a separate goroutine
// i. BindPodVolumes() is called first. It makes all the necessary API updates and waits for
// PV controller to fully bind and provision the PVCs. If binding fails, the Pod is sent
// back through the scheduler.
// ii. After BindPodVolumes() is complete, then the scheduler does the final Pod->Node binding.
// 2. Once all the assume operations are done in d), the scheduler processes the next Pod in the scheduler queue
// while the actual binding operation occurs in the background.
type SchedulerVolumeBinder interface {
// FindPodVolumes checks if all of a Pod's PVCs can be satisfied by the node.
@ -71,18 +76,18 @@ type SchedulerVolumeBinder interface {
// 2. Take the PVCs that need provisioning and update the PVC cache with related
// annotations set.
//
// It returns true if all volumes are fully bound, and returns true if any volume binding/provisioning
// API operation needs to be done afterwards.
// It returns true if all volumes are fully bound
//
// This function will modify assumedPod with the node name.
// This function is called serially.
AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound bool, bindingRequired bool, err error)
AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound bool, err error)
// BindPodVolumes will:
// 1. Initiate the volume binding by making the API call to prebind the PV
// to its matching PVC.
// 2. Trigger the volume provisioning by making the API call to set related
// annotations on the PVC
// 3. Wait for PVCs to be completely bound by the PV controller
//
// This function can be called in parallel.
BindPodVolumes(assumedPod *v1.Pod) error
@ -100,6 +105,9 @@ type volumeBinder struct {
// Stores binding decisions that were made in FindPodVolumes for use in AssumePodVolumes.
// AssumePodVolumes modifies the bindings again for use in BindPodVolumes.
podBindingCache PodBindingCache
// Amount of time to wait for the bind operation to succeed
bindTimeout time.Duration
}
// NewVolumeBinder sets up all the caches needed for the scheduler to make volume binding decisions.
@ -107,7 +115,8 @@ func NewVolumeBinder(
kubeClient clientset.Interface,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
storageClassInformer storageinformers.StorageClassInformer) SchedulerVolumeBinder {
storageClassInformer storageinformers.StorageClassInformer,
bindTimeout time.Duration) SchedulerVolumeBinder {
// TODO: find better way...
ctrl := &PersistentVolumeController{
@ -120,6 +129,7 @@ func NewVolumeBinder(
pvcCache: NewPVCAssumeCache(pvcInformer.Informer()),
pvCache: NewPVAssumeCache(pvInformer.Informer()),
podBindingCache: NewPodBindingCache(),
bindTimeout: bindTimeout,
}
return b
@ -183,22 +193,24 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume
// in podBindingCache for the chosen node, and:
// 1. Update the pvCache with the new prebound PV.
// 2. Update the pvcCache with the new PVCs with annotations set
// It will update podBindingCache again with the PVs and PVCs that need an API update.
func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound, bindingRequired bool, err error) {
// 3. Update podBindingCache again with cached API updates for PVs and PVCs.
func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound bool, err error) {
podName := getPodName(assumedPod)
glog.V(4).Infof("AssumePodVolumes for pod %q, node %q", podName, nodeName)
if allBound := b.arePodVolumesBound(assumedPod); allBound {
glog.V(4).Infof("AssumePodVolumes for pod %q, node %q: all PVCs bound and nothing to do", podName, nodeName)
return true, false, nil
return true, nil
}
assumedPod.Spec.NodeName = nodeName
// Assume PV
claimsToBind := b.podBindingCache.GetBindings(assumedPod, nodeName)
newBindings := []*bindingInfo{}
claimsToBind := b.podBindingCache.GetBindings(assumedPod, nodeName)
claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, nodeName)
// Assume PV
newBindings := []*bindingInfo{}
for _, binding := range claimsToBind {
newPV, dirty, err := b.ctrl.getBindVolumeToClaim(binding.pv, binding.pvc)
glog.V(5).Infof("AssumePodVolumes: getBindVolumeToClaim for pod %q, PV %q, PVC %q. newPV %p, dirty %v, err: %v",
@ -210,29 +222,20 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
err)
if err != nil {
b.revertAssumedPVs(newBindings)
return false, true, err
return false, err
}
// TODO: can we assume every time?
if dirty {
err = b.pvCache.Assume(newPV)
if err != nil {
b.revertAssumedPVs(newBindings)
return false, true, err
return false, err
}
}
newBindings = append(newBindings, &bindingInfo{pv: newPV, pvc: binding.pvc})
}
}
// Don't update cached bindings if no API updates are needed. This can happen if we
// previously updated the PV object and are waiting for the PV controller to finish binding.
if len(newBindings) != 0 {
bindingRequired = true
b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings)
}
// Assume PVCs
claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, nodeName)
newProvisionedPVCs := []*v1.PersistentVolumeClaim{}
for _, claim := range claimsToProvision {
// The claims from method args can be pointing to watcher cache. We must not
@ -249,50 +252,37 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
newProvisionedPVCs = append(newProvisionedPVCs, claimClone)
}
if len(newProvisionedPVCs) != 0 {
bindingRequired = true
// Update cache with the assumed pvcs and pvs
// Even if length is zero, update the cache with an empty slice to indicate that no
// operations are needed
b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings)
b.podBindingCache.UpdateProvisionedPVCs(assumedPod, nodeName, newProvisionedPVCs)
}
return
}
// BindPodVolumes gets the cached bindings and PVCs to provision in podBindingCache
// and makes the API update for those PVs/PVCs.
// BindPodVolumes gets the cached bindings and PVCs to provision in podBindingCache,
// makes the API update for those PVs/PVCs, and waits for the PVCs to be completely bound
// by the PV controller.
func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
podName := getPodName(assumedPod)
glog.V(4).Infof("BindPodVolumes for pod %q", podName)
glog.V(4).Infof("BindPodVolumes for pod %q, node %q", podName, assumedPod.Spec.NodeName)
bindings := b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName)
claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, assumedPod.Spec.NodeName)
// Do the actual prebinding. Let the PV controller take care of the rest
// There is no API rollback if the actual binding fails
for i, bindingInfo := range bindings {
glog.V(5).Infof("BindPodVolumes: Pod %q, binding PV %q to PVC %q", podName, bindingInfo.pv.Name, bindingInfo.pvc.Name)
_, err := b.ctrl.updateBindVolumeToClaim(bindingInfo.pv, bindingInfo.pvc, false)
// Start API operations
err := b.bindAPIUpdate(podName, bindings, claimsToProvision)
if err != nil {
// only revert assumed cached updates for volumes we haven't successfully bound
b.revertAssumedPVs(bindings[i:])
// Revert all of the assumed cached updates for claims,
// since no actual API update will be done
b.revertAssumedPVCs(claimsToProvision)
return err
}
}
// Update claims objects to trigger volume provisioning. Let the PV controller take care of the rest
// PV controller is expected to signal back by removing related annotations if actual provisioning fails
for i, claim := range claimsToProvision {
if _, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
glog.V(4).Infof("updating PersistentVolumeClaim[%s] failed: %v", getPVCName(claim), err)
// only revert assumed cached updates for claims we haven't successfully updated
b.revertAssumedPVCs(claimsToProvision[i:])
return err
}
}
return nil
return wait.Poll(time.Second, b.bindTimeout, func() (bool, error) {
// Get cached values every time in case the pod gets deleted
bindings = b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName)
claimsToProvision = b.podBindingCache.GetProvisionedPVCs(assumedPod, assumedPod.Spec.NodeName)
return b.checkBindings(assumedPod, bindings, claimsToProvision)
})
}
func getPodName(pod *v1.Pod) string {
@ -303,12 +293,131 @@ func getPVCName(pvc *v1.PersistentVolumeClaim) string {
return pvc.Namespace + "/" + pvc.Name
}
func (b *volumeBinder) isVolumeBound(namespace string, vol *v1.Volume, checkFullyBound bool) (bool, *v1.PersistentVolumeClaim, error) {
// bindAPIUpdate gets the cached bindings and PVCs to provision in podBindingCache
// and makes the API update for those PVs/PVCs.
func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) error {
if bindings == nil {
return fmt.Errorf("failed to get cached bindings for pod %q", podName)
}
if claimsToProvision == nil {
return fmt.Errorf("failed to get cached claims to provision for pod %q", podName)
}
lastProcessedBinding := 0
lastProcessedProvisioning := 0
defer func() {
// only revert assumed cached updates for volumes we haven't successfully bound
if lastProcessedBinding < len(bindings) {
b.revertAssumedPVs(bindings[lastProcessedBinding:])
}
// only revert assumed cached updates for claims we haven't updated,
if lastProcessedProvisioning < len(claimsToProvision) {
b.revertAssumedPVCs(claimsToProvision[lastProcessedProvisioning:])
}
}()
var (
binding *bindingInfo
claim *v1.PersistentVolumeClaim
)
// Do the actual prebinding. Let the PV controller take care of the rest
// There is no API rollback if the actual binding fails
for _, binding = range bindings {
glog.V(5).Infof("bindAPIUpdate: Pod %q, binding PV %q to PVC %q", podName, binding.pv.Name, binding.pvc.Name)
// TODO: does it hurt if we make an api call and nothing needs to be updated?
if _, err := b.ctrl.updateBindVolumeToClaim(binding.pv, binding.pvc, false); err != nil {
return err
}
lastProcessedBinding++
}
// Update claims objects to trigger volume provisioning. Let the PV controller take care of the rest
// PV controller is expected to signal back by removing related annotations if actual provisioning fails
for _, claim = range claimsToProvision {
glog.V(5).Infof("bindAPIUpdate: Pod %q, PVC %q", podName, getPVCName(claim))
if _, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
return err
}
lastProcessedProvisioning++
}
return nil
}
// checkBindings runs through all the PVCs in the Pod and checks:
// * if the PVC is fully bound
// * if there are any conditions that require binding to fail and be retried
//
// It returns true when all of the Pod's PVCs are fully bound, and error if
// binding (and scheduling) needs to be retried
func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) (bool, error) {
podName := getPodName(pod)
if bindings == nil {
return false, fmt.Errorf("failed to get cached bindings for pod %q", podName)
}
if claimsToProvision == nil {
return false, fmt.Errorf("failed to get cached claims to provision for pod %q", podName)
}
for _, binding := range bindings {
// Check for any conditions that might require scheduling retry
// Check if pv still exists
pv, err := b.pvCache.GetPV(binding.pv.Name)
if err != nil || pv == nil {
return false, fmt.Errorf("failed to check pv binding: %v", err)
}
// Check if pv.ClaimRef got dropped by unbindVolume()
if pv.Spec.ClaimRef == nil || pv.Spec.ClaimRef.UID == "" {
return false, fmt.Errorf("ClaimRef got reset for pv %q", pv.Name)
}
// Check if pvc is fully bound
if isBound, _, err := b.isPVCBound(binding.pvc.Namespace, binding.pvc.Name); !isBound || err != nil {
return false, err
}
// TODO: what if pvc is bound to the wrong pv? It means our assume cache should be reverted.
// Or will pv controller cleanup the pv.ClaimRef?
}
for _, claim := range claimsToProvision {
bound, pvc, err := b.isPVCBound(claim.Namespace, claim.Name)
if err != nil || pvc == nil {
return false, fmt.Errorf("failed to check pvc binding: %v", err)
}
// Check if selectedNode annotation is still set
if pvc.Annotations == nil {
return false, fmt.Errorf("selectedNode annotation reset for PVC %q", pvc.Name)
}
selectedNode := pvc.Annotations[annSelectedNode]
if selectedNode != pod.Spec.NodeName {
return false, fmt.Errorf("selectedNode annotation value %q not set to scheduled node %q", selectedNode, pod.Spec.NodeName)
}
if !bound {
return false, nil
}
}
// All pvs and pvcs that we operated on are bound
glog.V(4).Infof("All PVCs for pod %q are bound", podName)
return true, nil
}
func (b *volumeBinder) isVolumeBound(namespace string, vol *v1.Volume) (bool, *v1.PersistentVolumeClaim, error) {
if vol.PersistentVolumeClaim == nil {
return true, nil, nil
}
pvcName := vol.PersistentVolumeClaim.ClaimName
return b.isPVCBound(namespace, pvcName)
}
func (b *volumeBinder) isPVCBound(namespace, pvcName string) (bool, *v1.PersistentVolumeClaim, error) {
claim := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvcName,
@ -322,7 +431,6 @@ func (b *volumeBinder) isVolumeBound(namespace string, vol *v1.Volume, checkFull
pvName := pvc.Spec.VolumeName
if pvName != "" {
if checkFullyBound {
if metav1.HasAnnotation(pvc.ObjectMeta, annBindCompleted) {
glog.V(5).Infof("PVC %q is fully bound to PV %q", getPVCName(pvc), pvName)
return true, pvc, nil
@ -331,9 +439,6 @@ func (b *volumeBinder) isVolumeBound(namespace string, vol *v1.Volume, checkFull
return false, pvc, nil
}
}
glog.V(5).Infof("PVC %q is bound or prebound to PV %q", getPVCName(pvc), pvName)
return true, pvc, nil
}
glog.V(5).Infof("PVC %q is not bound", getPVCName(pvc))
return false, pvc, nil
@ -342,7 +447,7 @@ func (b *volumeBinder) isVolumeBound(namespace string, vol *v1.Volume, checkFull
// arePodVolumesBound returns true if all volumes are fully bound
func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool {
for _, vol := range pod.Spec.Volumes {
if isBound, _, _ := b.isVolumeBound(pod.Namespace, &vol, true); !isBound {
if isBound, _, _ := b.isVolumeBound(pod.Namespace, &vol); !isBound {
// Pod has at least one PVC that needs binding
return false
}
@ -358,7 +463,7 @@ func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentV
unboundClaims = []*bindingInfo{}
for _, vol := range pod.Spec.Volumes {
volumeBound, pvc, err := b.isVolumeBound(pod.Namespace, &vol, false)
volumeBound, pvc, err := b.isVolumeBound(pod.Namespace, &vol)
if err != nil {
return nil, nil, nil, err
}
@ -372,7 +477,8 @@ func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentV
if err != nil {
return nil, nil, nil, err
}
if delayBinding {
// Prebound PVCs are treated as unbound immediate binding
if delayBinding && pvc.Spec.VolumeName == "" {
// Scheduler path
unboundClaims = append(unboundClaims, &bindingInfo{pvc: pvc})
} else {
@ -518,19 +624,6 @@ type bindingInfo struct {
pv *v1.PersistentVolume
}
// Used in unit test errors
func (b bindingInfo) String() string {
pvcName := ""
pvName := ""
if b.pvc != nil {
pvcName = getPVCName(b.pvc)
}
if b.pv != nil {
pvName = b.pv.Name
}
return fmt.Sprintf("[PVC %q, PV %q]", pvcName, pvName)
}
type byPVCSize []*bindingInfo
func (a byPVCSize) Len() int {


@ -31,6 +31,8 @@ type PodBindingCache interface {
UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo)
// GetBindings will return the cached bindings for the given pod and node.
// A nil return value means that the entry was not found. An empty slice
// means that no binding operations are needed.
GetBindings(pod *v1.Pod, node string) []*bindingInfo
// UpdateProvisionedPVCs will update the cache with the given provisioning decisions
@ -38,6 +40,8 @@ type PodBindingCache interface {
UpdateProvisionedPVCs(pod *v1.Pod, node string, provisionings []*v1.PersistentVolumeClaim)
// GetProvisionedPVCs will return the cached provisioning decisions for the given pod and node.
// A nil return value means that the entry was not found. An empty slice
// means that no provisioning operations are needed.
GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim
// DeleteBindings will remove all cached bindings and provisionings for the given pod.
@ -46,7 +50,8 @@ type PodBindingCache interface {
}
type podBindingCache struct {
mutex sync.Mutex
// synchronizes bindingDecisions
rwMutex sync.RWMutex
// Key = pod name
// Value = nodeDecisions
@ -68,16 +73,16 @@ func NewPodBindingCache() PodBindingCache {
}
func (c *podBindingCache) DeleteBindings(pod *v1.Pod) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
podName := getPodName(pod)
delete(c.bindingDecisions, podName)
}
func (c *podBindingCache) UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
podName := getPodName(pod)
decisions, ok := c.bindingDecisions[podName]
@ -97,8 +102,8 @@ func (c *podBindingCache) UpdateBindings(pod *v1.Pod, node string, bindings []*b
}
func (c *podBindingCache) GetBindings(pod *v1.Pod, node string) []*bindingInfo {
c.mutex.Lock()
defer c.mutex.Unlock()
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
podName := getPodName(pod)
decisions, ok := c.bindingDecisions[podName]
@ -113,8 +118,8 @@ func (c *podBindingCache) GetBindings(pod *v1.Pod, node string) []*bindingInfo {
}
func (c *podBindingCache) UpdateProvisionedPVCs(pod *v1.Pod, node string, pvcs []*v1.PersistentVolumeClaim) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
podName := getPodName(pod)
decisions, ok := c.bindingDecisions[podName]
@ -134,8 +139,8 @@ func (c *podBindingCache) UpdateProvisionedPVCs(pod *v1.Pod, node string, pvcs [
}
func (c *podBindingCache) GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim {
c.mutex.Lock()
defer c.mutex.Unlock()
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
podName := getPodName(pod)
decisions, ok := c.bindingDecisions[podName]


@ -16,16 +16,13 @@ limitations under the License.
package persistentvolume
import (
"k8s.io/api/core/v1"
)
import "k8s.io/api/core/v1"
type FakeVolumeBinderConfig struct {
AllBound bool
FindUnboundSatsified bool
FindBoundSatsified bool
FindErr error
AssumeBindingRequired bool
AssumeErr error
BindErr error
}
@ -48,9 +45,9 @@ func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVo
return b.config.FindUnboundSatsified, b.config.FindBoundSatsified, b.config.FindErr
}
func (b *FakeVolumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (bool, bool, error) {
func (b *FakeVolumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (bool, error) {
b.AssumeCalled = true
return b.config.AllBound, b.config.AssumeBindingRequired, b.config.AssumeErr
return b.config.AllBound, b.config.AssumeErr
}
func (b *FakeVolumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {


@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"testing"
"time"
"github.com/golang/glog"
@ -38,20 +39,30 @@ import (
)
var (
unboundPVC = makeTestPVC("unbound-pvc", "1G", pvcUnbound, "", "1", &waitClass)
unboundPVC2 = makeTestPVC("unbound-pvc2", "5G", pvcUnbound, "", "1", &waitClass)
preboundPVC = makeTestPVC("prebound-pvc", "1G", pvcPrebound, "pv-node1a", "1", &waitClass)
boundPVC = makeTestPVC("bound-pvc", "1G", pvcBound, "pv-bound", "1", &waitClass)
boundPVC2 = makeTestPVC("bound-pvc2", "1G", pvcBound, "pv-bound2", "1", &waitClass)
// PVCs for manual binding
// TODO: clean up all of these
unboundPVC = makeTestPVC("unbound-pvc", "1G", "", pvcUnbound, "", "1", &waitClass)
unboundPVC2 = makeTestPVC("unbound-pvc2", "5G", "", pvcUnbound, "", "1", &waitClass)
preboundPVC = makeTestPVC("prebound-pvc", "1G", "", pvcPrebound, "pv-node1a", "1", &waitClass)
preboundPVCNode1a = makeTestPVC("unbound-pvc", "1G", "", pvcPrebound, "pv-node1a", "1", &waitClass)
boundPVC = makeTestPVC("bound-pvc", "1G", "", pvcBound, "pv-bound", "1", &waitClass)
boundPVC2 = makeTestPVC("bound-pvc2", "1G", "", pvcBound, "pv-bound2", "1", &waitClass)
boundPVCNode1a = makeTestPVC("unbound-pvc", "1G", "", pvcBound, "pv-node1a", "1", &waitClass)
badPVC = makeBadPVC()
immediateUnboundPVC = makeTestPVC("immediate-unbound-pvc", "1G", pvcUnbound, "", "1", &immediateClass)
immediateBoundPVC = makeTestPVC("immediate-bound-pvc", "1G", pvcBound, "pv-bound-immediate", "1", &immediateClass)
provisionedPVC = makeTestPVC("provisioned-pvc", "1Gi", pvcUnbound, "", "1", &waitClassWithProvisioner)
provisionedPVC2 = makeTestPVC("provisioned-pvc2", "1Gi", pvcUnbound, "", "1", &waitClassWithProvisioner)
provisionedPVCHigherVersion = makeTestPVC("provisioned-pvc2", "1Gi", pvcUnbound, "", "2", &waitClassWithProvisioner)
noProvisionerPVC = makeTestPVC("no-provisioner-pvc", "1Gi", pvcUnbound, "", "1", &waitClass)
topoMismatchPVC = makeTestPVC("topo-mismatch-pvc", "1Gi", pvcUnbound, "", "1", &topoMismatchClass)
immediateUnboundPVC = makeTestPVC("immediate-unbound-pvc", "1G", "", pvcUnbound, "", "1", &immediateClass)
immediateBoundPVC = makeTestPVC("immediate-bound-pvc", "1G", "", pvcBound, "pv-bound-immediate", "1", &immediateClass)
// PVCs for dynamic provisioning
provisionedPVC = makeTestPVC("provisioned-pvc", "1Gi", "", pvcUnbound, "", "1", &waitClassWithProvisioner)
provisionedPVC2 = makeTestPVC("provisioned-pvc2", "1Gi", "", pvcUnbound, "", "1", &waitClassWithProvisioner)
provisionedPVCHigherVersion = makeTestPVC("provisioned-pvc2", "1Gi", "", pvcUnbound, "", "2", &waitClassWithProvisioner)
provisionedPVCBound = makeTestPVC("provisioned-pvc", "1Gi", "", pvcBound, "some-pv", "1", &waitClassWithProvisioner)
noProvisionerPVC = makeTestPVC("no-provisioner-pvc", "1Gi", "", pvcUnbound, "", "1", &waitClass)
topoMismatchPVC = makeTestPVC("topo-mismatch-pvc", "1Gi", "", pvcUnbound, "", "1", &topoMismatchClass)
selectedNodePVC = makeTestPVC("provisioned-pvc", "1Gi", nodeLabelValue, pvcSelectedNode, "", "1", &waitClassWithProvisioner)
// PVs for manual binding
pvNoNode = makeTestPV("pv-no-node", "", "1G", "1", nil, waitClass)
pvNode1a = makeTestPV("pv-node1a", "node1", "5G", "1", nil, waitClass)
pvNode1b = makeTestPV("pv-node1b", "node1", "10G", "1", nil, waitClass)
@ -59,12 +70,13 @@ var (
pvNode2 = makeTestPV("pv-node2", "node2", "1G", "1", nil, waitClass)
pvPrebound = makeTestPV("pv-prebound", "node1", "1G", "1", unboundPVC, waitClass)
pvBound = makeTestPV("pv-bound", "node1", "1G", "1", boundPVC, waitClass)
pvNode1aBound = makeTestPV("pv-node1a", "node1", "1G", "1", unboundPVC, waitClass)
pvNode1bBound = makeTestPV("pv-node1b", "node1", "5G", "1", unboundPVC2, waitClass)
pvNode1bBoundHigherVersion = makeTestPV("pv-node1b", "node1", "5G", "2", unboundPVC2, waitClass)
pvNode1aBound = makeTestPV("pv-node1a", "node1", "5G", "1", unboundPVC, waitClass)
pvNode1bBound = makeTestPV("pv-node1b", "node1", "10G", "1", unboundPVC2, waitClass)
pvNode1bBoundHigherVersion = makeTestPV("pv-node1b", "node1", "10G", "2", unboundPVC2, waitClass)
pvBoundImmediate = makeTestPV("pv-bound-immediate", "node1", "1G", "1", immediateBoundPVC, immediateClass)
pvBoundImmediateNode2 = makeTestPV("pv-bound-immediate", "node2", "1G", "1", immediateBoundPVC, immediateClass)
// PVC/PV bindings for manual binding
binding1a = makeBinding(unboundPVC, pvNode1a)
binding1b = makeBinding(unboundPVC2, pvNode1b)
bindingNoNode = makeBinding(unboundPVC, pvNoNode)
@ -72,11 +84,13 @@ var (
binding1aBound = makeBinding(unboundPVC, pvNode1aBound)
binding1bBound = makeBinding(unboundPVC2, pvNode1bBound)
// storage class names
waitClass = "waitClass"
immediateClass = "immediateClass"
waitClassWithProvisioner = "waitClassWithProvisioner"
topoMismatchClass = "topoMismatchClass"
// node topology
nodeLabelKey = "nodeKey"
nodeLabelValue = "node1"
)
@ -102,7 +116,8 @@ func newTestBinder(t *testing.T) *testEnv {
client,
pvcInformer,
informerFactory.Core().V1().PersistentVolumes(),
classInformer)
classInformer,
10*time.Second)
// Add storageclasses
waitMode := storagev1.VolumeBindingWaitForFirstConsumer
@ -247,17 +262,44 @@ func (env *testEnv) initPodCache(pod *v1.Pod, node string, bindings []*bindingIn
func (env *testEnv) validatePodCache(t *testing.T, name, node string, pod *v1.Pod, expectedBindings []*bindingInfo, expectedProvisionings []*v1.PersistentVolumeClaim) {
cache := env.internalBinder.podBindingCache
bindings := cache.GetBindings(pod, node)
if aLen, eLen := len(bindings), len(expectedBindings); aLen != eLen {
t.Errorf("Test %q failed. expected %v bindings, got %v", name, eLen, aLen)
} else if expectedBindings == nil && bindings != nil {
// nil and empty are different
t.Errorf("Test %q failed. expected nil bindings, got empty", name)
} else if expectedBindings != nil && bindings == nil {
// nil and empty are different
t.Errorf("Test %q failed. expected empty bindings, got nil", name)
} else {
for i := 0; i < aLen; i++ {
// Validate PV
if !reflect.DeepEqual(expectedBindings[i].pv, bindings[i].pv) {
t.Errorf("Test %q failed. binding.pv doesn't match [A-expected, B-got]: %s", name, diff.ObjectDiff(expectedBindings[i].pv, bindings[i].pv))
}
if !reflect.DeepEqual(expectedBindings, bindings) {
t.Errorf("Test %q failed: Expected bindings %+v, got %+v", name, expectedBindings, bindings)
// Validate PVC
if !reflect.DeepEqual(expectedBindings[i].pvc, bindings[i].pvc) {
t.Errorf("Test %q failed. binding.pvc doesn't match [A-expected, B-got]: %s", name, diff.ObjectDiff(expectedBindings[i].pvc, bindings[i].pvc))
}
}
}
provisionedClaims := cache.GetProvisionedPVCs(pod, node)
if !reflect.DeepEqual(expectedProvisionings, provisionedClaims) {
t.Errorf("Test %q failed: Expected provisionings %+v, got %+v", name, expectedProvisionings, provisionedClaims)
if aLen, eLen := len(provisionedClaims), len(expectedProvisionings); aLen != eLen {
t.Errorf("Test %q failed. expected %v provisioned claims, got %v", name, eLen, aLen)
} else if expectedProvisionings == nil && provisionedClaims != nil {
// nil and empty are different
t.Errorf("Test %q failed. expected nil provisionings, got empty", name)
} else if expectedProvisionings != nil && provisionedClaims == nil {
// nil and empty are different
t.Errorf("Test %q failed. expected empty provisionings, got nil", name)
} else {
for i := 0; i < aLen; i++ {
if !reflect.DeepEqual(expectedProvisionings[i], provisionedClaims[i]) {
t.Errorf("Test %q failed. provisioned claims doesn't match [A-expected, B-got]: %s", name, diff.ObjectDiff(expectedProvisionings[i], provisionedClaims[i]))
}
}
}
}
func (env *testEnv) getPodBindings(t *testing.T, name, node string, pod *v1.Pod) []*bindingInfo {
@ -266,8 +308,6 @@ func (env *testEnv) getPodBindings(t *testing.T, name, node string, pod *v1.Pod)
}
func (env *testEnv) validateAssume(t *testing.T, name string, pod *v1.Pod, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) {
// TODO: Check binding cache
// Check pv cache
pvCache := env.internalBinder.pvCache
for _, b := range bindings {
@ -383,17 +423,21 @@ const (
pvcUnbound = iota
pvcPrebound
pvcBound
pvcSelectedNode
)
func makeTestPVC(name, size string, pvcBoundState int, pvName, resourceVersion string, className *string) *v1.PersistentVolumeClaim {
func makeTestPVC(name, size, node string, pvcBoundState int, pvName, resourceVersion string, className *string) *v1.PersistentVolumeClaim {
pvc := &v1.PersistentVolumeClaim{
TypeMeta: metav1.TypeMeta{
Kind: "PersistentVolumeClaim",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "testns",
UID: types.UID("pvc-uid"),
ResourceVersion: resourceVersion,
SelfLink: testapi.Default.SelfLink("pvc", name),
Annotations: map[string]string{},
},
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
@ -406,6 +450,9 @@ func makeTestPVC(name, size string, pvcBoundState int, pvName, resourceVersion s
}
switch pvcBoundState {
case pvcSelectedNode:
metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annSelectedNode, node)
// don't fallthrough
case pvcBound:
metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annBindCompleted, "yes")
fallthrough
@ -454,6 +501,9 @@ func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentV
if boundToPVC != nil {
pv.Spec.ClaimRef = &v1.ObjectReference{
Kind: boundToPVC.Kind,
APIVersion: boundToPVC.APIVersion,
ResourceVersion: boundToPVC.ResourceVersion,
Name: boundToPVC.Name,
Namespace: boundToPVC.Namespace,
UID: boundToPVC.UID,
@ -464,6 +514,24 @@ func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentV
return pv
}
func pvcSetSelectedNode(pvc *v1.PersistentVolumeClaim, node string) *v1.PersistentVolumeClaim {
newPVC := pvc.DeepCopy()
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annSelectedNode, node)
return newPVC
}
func pvcSetEmptyAnnotations(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim {
newPVC := pvc.DeepCopy()
newPVC.Annotations = map[string]string{}
return newPVC
}
func pvRemoveClaimUID(pv *v1.PersistentVolume) *v1.PersistentVolume {
newPV := pv.DeepCopy()
newPV.Spec.ClaimRef.UID = ""
return newPV
}
func makePod(pvcs []*v1.PersistentVolumeClaim) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -515,7 +583,7 @@ func makeBinding(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) *bindin
func addProvisionAnn(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim {
res := pvc.DeepCopy()
// Add provision related annotations
res.Annotations[annSelectedNode] = nodeLabelValue
metav1.SetMetaDataAnnotation(&res.ObjectMeta, annSelectedNode, nodeLabelValue)
return res
}
@ -570,8 +638,7 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
"prebound-pvc": {
podPVCs: []*v1.PersistentVolumeClaim{preboundPVC},
pvs: []*v1.PersistentVolume{pvNode1aBound},
expectedUnbound: true,
expectedBound: true,
shouldFail: true,
},
"unbound-pvc,pv-same-node": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
@ -623,9 +690,7 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
"one-prebound,one-unbound": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, preboundPVC},
pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b},
expectedBindings: []*bindingInfo{binding1a},
expectedUnbound: true,
expectedBound: true,
shouldFail: true,
},
"immediate-bound-pvc": {
podPVCs: []*v1.PersistentVolumeClaim{immediateBoundPVC},
@ -835,60 +900,56 @@ func TestAssumePodVolumes(t *testing.T) {
// Expected return values
shouldFail bool
expectedBindingRequired bool
expectedAllBound bool
// if nil, use bindings
expectedBindings []*bindingInfo
expectedProvisionings []*v1.PersistentVolumeClaim
}{
"all-bound": {
podPVCs: []*v1.PersistentVolumeClaim{boundPVC},
pvs: []*v1.PersistentVolume{pvBound},
expectedAllBound: true,
},
"prebound-pvc": {
podPVCs: []*v1.PersistentVolumeClaim{preboundPVC},
pvs: []*v1.PersistentVolume{pvNode1a},
},
"one-binding": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
bindings: []*bindingInfo{binding1a},
pvs: []*v1.PersistentVolume{pvNode1a},
expectedBindingRequired: true,
expectedBindings: []*bindingInfo{binding1aBound},
expectedProvisionings: []*v1.PersistentVolumeClaim{},
},
"two-bindings": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2},
bindings: []*bindingInfo{binding1a, binding1b},
pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b},
expectedBindingRequired: true,
expectedBindings: []*bindingInfo{binding1aBound, binding1bBound},
expectedProvisionings: []*v1.PersistentVolumeClaim{},
},
"pv-already-bound": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
bindings: []*bindingInfo{binding1aBound},
pvs: []*v1.PersistentVolume{pvNode1aBound},
expectedBindingRequired: false,
expectedBindings: []*bindingInfo{},
expectedBindings: []*bindingInfo{binding1aBound},
expectedProvisionings: []*v1.PersistentVolumeClaim{},
},
"claimref-failed": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
bindings: []*bindingInfo{binding1a, bindingBad},
pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b},
shouldFail: true,
expectedBindingRequired: true,
},
"tmpupdate-failed": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
bindings: []*bindingInfo{binding1a, binding1b},
pvs: []*v1.PersistentVolume{pvNode1a},
shouldFail: true,
expectedBindingRequired: true,
},
"one-binding, one-pvc-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC},
bindings: []*bindingInfo{binding1a},
pvs: []*v1.PersistentVolume{pvNode1a},
provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedBindingRequired: true,
expectedBindings: []*bindingInfo{binding1aBound},
expectedProvisionings: []*v1.PersistentVolumeClaim{selectedNodePVC},
},
"one-binding, one-provision-tmpupdate-failed": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVCHigherVersion},
@ -896,7 +957,6 @@ func TestAssumePodVolumes(t *testing.T) {
pvs: []*v1.PersistentVolume{pvNode1a},
provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC2},
shouldFail: true,
expectedBindingRequired: true,
},
}
@ -911,7 +971,7 @@ func TestAssumePodVolumes(t *testing.T) {
testEnv.initVolumes(scenario.pvs, scenario.pvs)
// Execute
allBound, bindingRequired, err := testEnv.binder.AssumePodVolumes(pod, "node1")
allBound, err := testEnv.binder.AssumePodVolumes(pod, "node1")
// Validate
if !scenario.shouldFail && err != nil {
@ -920,24 +980,25 @@ func TestAssumePodVolumes(t *testing.T) {
if scenario.shouldFail && err == nil {
t.Errorf("Test %q failed: returned success but expected error", name)
}
if scenario.expectedBindingRequired != bindingRequired {
t.Errorf("Test %q failed: returned unexpected bindingRequired: %v", name, bindingRequired)
}
if scenario.expectedAllBound != allBound {
t.Errorf("Test %q failed: returned unexpected allBound: %v", name, allBound)
}
if scenario.expectedBindings == nil {
scenario.expectedBindings = scenario.bindings
}
if scenario.shouldFail {
testEnv.validateFailedAssume(t, name, pod, scenario.expectedBindings, scenario.provisionedPVCs)
} else {
testEnv.validateAssume(t, name, pod, scenario.expectedBindings, scenario.provisionedPVCs)
if scenario.expectedProvisionings == nil {
scenario.expectedProvisionings = scenario.provisionedPVCs
}
if scenario.shouldFail {
testEnv.validateFailedAssume(t, name, pod, scenario.expectedBindings, scenario.expectedProvisionings)
} else {
testEnv.validateAssume(t, name, pod, scenario.expectedBindings, scenario.expectedProvisionings)
}
testEnv.validatePodCache(t, name, pod.Spec.NodeName, pod, scenario.expectedBindings, scenario.expectedProvisionings)
}
}
func TestBindPodVolumes(t *testing.T) {
func TestBindAPIUpdate(t *testing.T) {
scenarios := map[string]struct {
// Inputs
bindings []*bindingInfo
@ -960,19 +1021,38 @@ func TestBindPodVolumes(t *testing.T) {
// if nil, use expectedPVCs
expectedAPIPVCs []*v1.PersistentVolumeClaim
}{
"all-bound": {},
"not-fully-bound": {
"nothing-to-bind-nil": {
shouldFail: true,
},
"nothing-to-bind-bindings-nil": {
provisionedPVCs: []*v1.PersistentVolumeClaim{},
shouldFail: true,
},
"nothing-to-bind-provisionings-nil": {
bindings: []*bindingInfo{},
shouldFail: true,
},
"nothing-to-bind-empty": {
bindings: []*bindingInfo{},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
},
"one-binding": {
bindings: []*bindingInfo{binding1aBound},
cachedPVs: []*v1.PersistentVolume{pvNode1a},
expectedPVs: []*v1.PersistentVolume{pvNode1aBound},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
},
"two-bindings": {
bindings: []*bindingInfo{binding1aBound, binding1bBound},
cachedPVs: []*v1.PersistentVolume{pvNode1a, pvNode1b},
expectedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBound},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
},
"api-already-updated": {
bindings: []*bindingInfo{binding1aBound},
cachedPVs: []*v1.PersistentVolume{pvNode1aBound},
expectedPVs: []*v1.PersistentVolume{pvNode1aBound},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
},
"api-update-failed": {
bindings: []*bindingInfo{binding1aBound, binding1bBound},
@ -980,14 +1060,17 @@ func TestBindPodVolumes(t *testing.T) {
apiPVs: []*v1.PersistentVolume{pvNode1a, pvNode1bBoundHigherVersion},
expectedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1b},
expectedAPIPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBoundHigherVersion},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
shouldFail: true,
},
"one-provisioned-pvc": {
bindings: []*bindingInfo{},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
cachedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
},
"provision-api-update-failed": {
bindings: []*bindingInfo{},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), addProvisionAnn(provisionedPVC2)},
cachedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC, provisionedPVC2},
apiPVCs: []*v1.PersistentVolumeClaim{provisionedPVC, provisionedPVCHigherVersion},
@ -995,7 +1078,7 @@ func TestBindPodVolumes(t *testing.T) {
expectedAPIPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), provisionedPVCHigherVersion},
shouldFail: true,
},
"bingding-succeed, provision-api-update-failed": {
"binding-succeed, provision-api-update-failed": {
bindings: []*bindingInfo{binding1aBound},
cachedPVs: []*v1.PersistentVolume{pvNode1a},
expectedPVs: []*v1.PersistentVolume{pvNode1aBound},
@ -1008,7 +1091,7 @@ func TestBindPodVolumes(t *testing.T) {
},
}
for name, scenario := range scenarios {
glog.V(5).Infof("Running test case %q", name)
glog.V(4).Infof("Running test case %q", name)
// Setup
testEnv := newTestBinder(t)
@ -1024,7 +1107,7 @@ func TestBindPodVolumes(t *testing.T) {
testEnv.assumeVolumes(t, name, "node1", pod, scenario.bindings, scenario.provisionedPVCs)
// Execute
err := testEnv.binder.BindPodVolumes(pod)
err := testEnv.internalBinder.bindAPIUpdate(pod.Name, scenario.bindings, scenario.provisionedPVCs)
// Validate
if !scenario.shouldFail && err != nil {
@ -1044,6 +1127,301 @@ func TestBindPodVolumes(t *testing.T) {
}
}
func TestCheckBindings(t *testing.T) {
scenarios := map[string]struct {
// Inputs
bindings []*bindingInfo
cachedPVs []*v1.PersistentVolume
provisionedPVCs []*v1.PersistentVolumeClaim
cachedPVCs []*v1.PersistentVolumeClaim
// Expected return values
shouldFail bool
expectedBound bool
}{
"nothing-to-bind-nil": {
shouldFail: true,
},
"nothing-to-bind-bindings-nil": {
provisionedPVCs: []*v1.PersistentVolumeClaim{},
shouldFail: true,
},
"nothing-to-bind-provisionings-nil": {
bindings: []*bindingInfo{},
shouldFail: true,
},
"nothing-to-bind": {
bindings: []*bindingInfo{},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
expectedBound: true,
},
"binding-bound": {
bindings: []*bindingInfo{binding1aBound},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
cachedPVs: []*v1.PersistentVolume{pvNode1aBound},
cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
expectedBound: true,
},
"binding-prebound": {
bindings: []*bindingInfo{binding1aBound},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
cachedPVs: []*v1.PersistentVolume{pvNode1aBound},
cachedPVCs: []*v1.PersistentVolumeClaim{preboundPVCNode1a},
},
"binding-unbound": {
bindings: []*bindingInfo{binding1aBound},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
cachedPVs: []*v1.PersistentVolume{pvNode1aBound},
cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
},
"binding-pvc-not-exists": {
bindings: []*bindingInfo{binding1aBound},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
cachedPVs: []*v1.PersistentVolume{pvNode1aBound},
shouldFail: true,
},
"binding-pv-not-exists": {
bindings: []*bindingInfo{binding1aBound},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
shouldFail: true,
},
"binding-claimref-nil": {
bindings: []*bindingInfo{binding1aBound},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
cachedPVs: []*v1.PersistentVolume{pvNode1a},
cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
shouldFail: true,
},
"binding-claimref-uid-empty": {
bindings: []*bindingInfo{binding1aBound},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
cachedPVs: []*v1.PersistentVolume{pvRemoveClaimUID(pvNode1aBound)},
cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
shouldFail: true,
},
"binding-one-bound,one-unbound": {
bindings: []*bindingInfo{binding1aBound, binding1bBound},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
cachedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBound},
cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a, unboundPVC2},
},
"provisioning-pvc-bound": {
bindings: []*bindingInfo{},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
cachedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVCBound)},
expectedBound: true,
},
"provisioning-pvc-unbound": {
bindings: []*bindingInfo{},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
cachedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
},
"provisioning-pvc-not-exists": {
bindings: []*bindingInfo{},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
shouldFail: true,
},
"provisioning-pvc-annotations-nil": {
bindings: []*bindingInfo{},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
cachedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
shouldFail: true,
},
"provisioning-pvc-selected-node-dropped": {
bindings: []*bindingInfo{},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
cachedPVCs: []*v1.PersistentVolumeClaim{pvcSetEmptyAnnotations(provisionedPVC)},
shouldFail: true,
},
"provisioning-pvc-selected-node-wrong-node": {
bindings: []*bindingInfo{},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
cachedPVCs: []*v1.PersistentVolumeClaim{pvcSetSelectedNode(provisionedPVC, "wrong-node")},
shouldFail: true,
},
"binding-bound-provisioning-unbound": {
bindings: []*bindingInfo{binding1aBound},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
cachedPVs: []*v1.PersistentVolume{pvNode1aBound},
cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a, addProvisionAnn(provisionedPVC)},
},
}
for name, scenario := range scenarios {
glog.V(4).Infof("Running test case %q", name)
// Setup
pod := makePod(nil)
testEnv := newTestBinder(t)
testEnv.initVolumes(scenario.cachedPVs, nil)
testEnv.initClaims(scenario.cachedPVCs, nil)
// Execute
allBound, err := testEnv.internalBinder.checkBindings(pod, scenario.bindings, scenario.provisionedPVCs)
// Validate
if !scenario.shouldFail && err != nil {
t.Errorf("Test %q failed: returned error: %v", name, err)
}
if scenario.shouldFail && err == nil {
t.Errorf("Test %q failed: returned success but expected error", name)
}
if scenario.expectedBound != allBound {
t.Errorf("Test %q failed: returned bound %v", name, allBound)
}
}
}
func TestBindPodVolumes(t *testing.T) {
scenarios := map[string]struct {
// Inputs
// These tests only support a single pv and pvc and static binding
bindingsNil bool // Pass in nil bindings slice
binding *bindingInfo
cachedPV *v1.PersistentVolume
cachedPVC *v1.PersistentVolumeClaim
apiPV *v1.PersistentVolume
// This function runs with a delay of 5 seconds
delayFunc func(*testing.T, *testEnv, *v1.Pod, *v1.PersistentVolume, *v1.PersistentVolumeClaim)
// Expected return values
shouldFail bool
}{
"nothing-to-bind-nil": {
bindingsNil: true,
shouldFail: true,
},
"nothing-to-bind-empty": {},
"already-bound": {
binding: binding1aBound,
cachedPV: pvNode1aBound,
cachedPVC: boundPVCNode1a,
},
"binding-succeeds-after-time": {
binding: binding1aBound,
cachedPV: pvNode1a,
cachedPVC: unboundPVC,
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) {
// Update PVC to be fully bound to PV
newPVC := pvc.DeepCopy()
newPVC.ResourceVersion = "100"
newPVC.Spec.VolumeName = pv.Name
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
// Update pvc cache, fake client doesn't invoke informers
internalBinder, ok := testEnv.binder.(*volumeBinder)
if !ok {
t.Fatalf("Failed to convert to internal binder")
}
pvcCache := internalBinder.pvcCache
internalPVCCache, ok := pvcCache.(*pvcAssumeCache)
if !ok {
t.Fatalf("Failed to convert to internal PVC cache")
}
internalPVCCache.add(newPVC)
},
},
"pod-deleted-after-time": {
binding: binding1aBound,
cachedPV: pvNode1a,
cachedPVC: unboundPVC,
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) {
bindingsCache := testEnv.binder.GetBindingsCache()
if bindingsCache == nil {
t.Fatalf("Failed to get bindings cache")
}
// Delete the pod from the cache
bindingsCache.DeleteBindings(pod)
// Check that it's deleted
bindings := bindingsCache.GetBindings(pod, "node1")
if bindings != nil {
t.Fatalf("Failed to delete bindings")
}
},
shouldFail: true,
},
"binding-times-out": {
binding: binding1aBound,
cachedPV: pvNode1a,
cachedPVC: unboundPVC,
shouldFail: true,
},
"binding-fails": {
binding: binding1bBound,
cachedPV: pvNode1b,
apiPV: pvNode1bBoundHigherVersion,
cachedPVC: unboundPVC2,
shouldFail: true,
},
"check-fails": {
binding: binding1aBound,
cachedPV: pvNode1a,
cachedPVC: unboundPVC,
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) {
// Delete PVC
// Update pvc cache, fake client doesn't invoke informers
internalBinder, ok := testEnv.binder.(*volumeBinder)
if !ok {
t.Fatalf("Failed to convert to internal binder")
}
pvcCache := internalBinder.pvcCache
internalPVCCache, ok := pvcCache.(*pvcAssumeCache)
if !ok {
t.Fatalf("Failed to convert to internal PVC cache")
}
internalPVCCache.delete(pvc)
},
shouldFail: true,
},
}
for name, scenario := range scenarios {
glog.V(4).Infof("Running test case %q", name)
// Setup
pod := makePod(nil)
if scenario.apiPV == nil {
scenario.apiPV = scenario.cachedPV
}
testEnv := newTestBinder(t)
if !scenario.bindingsNil {
if scenario.binding != nil {
testEnv.initVolumes([]*v1.PersistentVolume{scenario.cachedPV}, []*v1.PersistentVolume{scenario.apiPV})
testEnv.initClaims([]*v1.PersistentVolumeClaim{scenario.cachedPVC}, nil)
testEnv.assumeVolumes(t, name, "node1", pod, []*bindingInfo{scenario.binding}, []*v1.PersistentVolumeClaim{})
} else {
testEnv.assumeVolumes(t, name, "node1", pod, []*bindingInfo{}, []*v1.PersistentVolumeClaim{})
}
}
if scenario.delayFunc != nil {
go func() {
time.Sleep(5 * time.Second)
glog.V(5).Infof("Running delay function")
scenario.delayFunc(t, testEnv, pod, scenario.binding.pv, scenario.binding.pvc)
}()
}
// Execute
err := testEnv.binder.BindPodVolumes(pod)
// Validate
if !scenario.shouldFail && err != nil {
t.Errorf("Test %q failed: returned error: %v", name, err)
}
if scenario.shouldFail && err == nil {
t.Errorf("Test %q failed: returned success but expected error", name)
}
}
}
func TestFindAssumeVolumes(t *testing.T) {
// Set feature gate
utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true")
@ -1080,17 +1458,15 @@ func TestFindAssumeVolumes(t *testing.T) {
expectedBindings := testEnv.getPodBindings(t, "before-assume", testNode.Name, pod)
// 2. Assume matches
allBound, bindingRequired, err := testEnv.binder.AssumePodVolumes(pod, testNode.Name)
allBound, err := testEnv.binder.AssumePodVolumes(pod, testNode.Name)
if err != nil {
t.Errorf("Test failed: AssumePodVolumes returned error: %v", err)
}
if allBound {
t.Errorf("Test failed: detected unbound volumes as bound")
}
if !bindingRequired {
t.Errorf("Test failed: binding not required")
}
testEnv.validateAssume(t, "assume", pod, expectedBindings, nil)
// After assume, claimref should be set on pv
expectedBindings = testEnv.getPodBindings(t, "after-assume", testNode.Name, pod)
@ -1106,6 +1482,6 @@ func TestFindAssumeVolumes(t *testing.T) {
if !unboundSatisfied {
t.Errorf("Test failed: couldn't find PVs for all PVCs")
}
testEnv.validatePodCache(t, "after-assume", testNode.Name, pod, expectedBindings, nil)
testEnv.validatePodCache(t, "after-assume", testNode.Name, pod, expectedBindings, []*v1.PersistentVolumeClaim{})
}
}


@ -85,6 +85,11 @@ type KubeSchedulerConfiguration struct {
// DEPRECATED.
// Indicate the "all topologies" set for empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity.
FailureDomains string
// Duration to wait for a binding operation to complete before timing out
// Value must be non-negative integer. The value zero indicates no waiting.
// If this value is nil, the default value will be used.
BindTimeoutSeconds *int64
}
// SchedulerAlgorithmSource is the source of a scheduler algorithm. One source


@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
apiserverconfigv1alpha1 "k8s.io/apiserver/pkg/apis/config/v1alpha1"
kubescedulerconfigv1alpha1 "k8s.io/kube-scheduler/config/v1alpha1"
// this package shouldn't really depend on other k8s.io/kubernetes code
api "k8s.io/kubernetes/pkg/apis/core"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
@ -102,4 +103,9 @@ func SetDefaults_KubeSchedulerConfiguration(obj *kubescedulerconfigv1alpha1.Kube
// Use the default LeaderElectionConfiguration options
apiserverconfigv1alpha1.RecommendedDefaultLeaderElectionConfiguration(&obj.LeaderElection.LeaderElectionConfiguration)
if obj.BindTimeoutSeconds == nil {
defaultBindTimeoutSeconds := int64(600)
obj.BindTimeoutSeconds = &defaultBindTimeoutSeconds
}
}


@ -121,6 +121,7 @@ func autoConvert_v1alpha1_KubeSchedulerConfiguration_To_config_KubeSchedulerConf
out.DisablePreemption = in.DisablePreemption
out.PercentageOfNodesToScore = in.PercentageOfNodesToScore
out.FailureDomains = in.FailureDomains
out.BindTimeoutSeconds = (*int64)(unsafe.Pointer(in.BindTimeoutSeconds))
return nil
}
@ -149,6 +150,7 @@ func autoConvert_config_KubeSchedulerConfiguration_To_v1alpha1_KubeSchedulerConf
out.DisablePreemption = in.DisablePreemption
out.PercentageOfNodesToScore = in.PercentageOfNodesToScore
out.FailureDomains = in.FailureDomains
out.BindTimeoutSeconds = (*int64)(unsafe.Pointer(in.BindTimeoutSeconds))
return nil
}

View File

@ -41,6 +41,9 @@ func ValidateKubeSchedulerConfiguration(cc *config.KubeSchedulerConfiguration) f
if cc.HardPodAffinitySymmetricWeight < 0 || cc.HardPodAffinitySymmetricWeight > 100 {
allErrs = append(allErrs, field.Invalid(field.NewPath("hardPodAffinitySymmetricWeight"), cc.HardPodAffinitySymmetricWeight, "not in valid range 0-100"))
}
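// BindTimeoutSeconds must always be set; a value of zero is valid and means no waiting.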
if cc.BindTimeoutSeconds == nil {
allErrs = append(allErrs, field.Required(field.NewPath("bindTimeoutSeconds"), ""))
}
return allErrs
}

View File

@ -17,15 +17,17 @@ limitations under the License.
package validation
import (
"testing"
"time"
apimachinery "k8s.io/apimachinery/pkg/apis/config"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiserver "k8s.io/apiserver/pkg/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"testing"
"time"
)
func TestValidateKubeSchedulerConfiguration(t *testing.T) {
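// Zero is a valid bind timeout; only a nil value should fail validation.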
testTimeout := int64(0)
validConfig := &config.KubeSchedulerConfiguration{
SchedulerName: "me",
HealthzBindAddress: "0.0.0.0:10254",
@ -56,6 +58,7 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) {
RetryPeriod: metav1.Duration{Duration: 5 * time.Second},
},
},
BindTimeoutSeconds: &testTimeout,
}
HardPodAffinitySymmetricWeightGt100 := validConfig.DeepCopy()
@ -86,6 +89,9 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) {
enableContentProfilingSetWithoutEnableProfiling.EnableProfiling = false
enableContentProfilingSetWithoutEnableProfiling.EnableContentionProfiling = true
bindTimeoutUnset := validConfig.DeepCopy()
bindTimeoutUnset.BindTimeoutSeconds = nil
scenarios := map[string]struct {
expectedToFail bool
config *config.KubeSchedulerConfiguration
@ -126,6 +132,10 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) {
expectedToFail: true,
config: HardPodAffinitySymmetricWeightLt0,
},
"bind-timeout-unset": {
expectedToFail: true,
config: bindTimeoutUnset,
},
}
for name, scenario := range scenarios {

View File

@ -32,6 +32,11 @@ func (in *KubeSchedulerConfiguration) DeepCopyInto(out *KubeSchedulerConfigurati
out.LeaderElection = in.LeaderElection
out.ClientConnection = in.ClientConnection
out.DebuggingConfiguration = in.DebuggingConfiguration
if in.BindTimeoutSeconds != nil {
in, out := &in.BindTimeoutSeconds, &out.BindTimeoutSeconds
*out = new(int64)
**out = **in
}
return
}

View File

@ -159,6 +159,7 @@ type ConfigFactoryArgs struct {
EnableEquivalenceClassCache bool
DisablePreemption bool
PercentageOfNodesToScore int32
BindTimeoutSeconds int64
}
// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
@ -305,7 +306,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator {
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
// Setup volume binder
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.PvcInformer, args.PvInformer, args.StorageClassInformer)
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
args.StorageClassInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{

View File

@ -49,6 +49,7 @@ import (
const (
enableEquivalenceCache = true
disablePodPreemption = false
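// bindTimeoutSeconds matches the scheduler's default bind timeout of 600 seconds.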
bindTimeoutSeconds = 600
)
func TestCreate(t *testing.T) {
@ -557,6 +558,7 @@ func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeigh
enableEquivalenceCache,
disablePodPreemption,
schedulerapi.DefaultPercentageOfNodesToScore,
bindTimeoutSeconds,
})
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package scheduler
import (
"fmt"
"time"
"k8s.io/api/core/v1"
@ -184,10 +183,6 @@ func (sched *Scheduler) Run() {
return
}
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
go sched.config.VolumeBinder.Run(sched.bindVolumesWorker, sched.config.StopEverything)
}
go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
@ -265,17 +260,12 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e
return nodeName, err
}
// assumeAndBindVolumes will update the volume cache and then asynchronously bind volumes if required.
//
// If volume binding is required, then the bind volumes routine will update the pod to send it back through
// the scheduler.
//
// Otherwise, return nil error and continue to assume the pod.
// assumeVolumes will update the volume cache with the chosen bindings
//
// This function modifies assumed if volume binding is required.
func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error {
func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bool, err error) {
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
allBound, bindingRequired, err := sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
allBound, err = sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
if err != nil {
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePodVolumes failed: %v", err)
@ -285,76 +275,38 @@ func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error
Reason: "SchedulerError",
Message: err.Error(),
})
return err
}
if !allBound {
err = fmt.Errorf("Volume binding started, waiting for completion")
if bindingRequired {
// Invalidate ecache because assumed volumes could have affected the cached
// pvs for other pods
if sched.config.Ecache != nil {
invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred)
sched.config.Ecache.InvalidatePredicates(invalidPredicates)
}
// bindVolumesWorker() will update the Pod object to put it back in the scheduler queue
sched.config.VolumeBinder.BindQueue.Add(assumed)
} else {
// We are just waiting for PV controller to finish binding, put it back in the
// scheduler queue
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "FailedScheduling", "%v", err)
sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: "VolumeBindingWaiting",
})
}
return err
}
}
return nil
return
}
// bindVolumesWorker() processes pods queued in assumeAndBindVolumes() and tries to
// make the API update for volume binding.
// This function runs forever until the volume BindQueue is closed.
func (sched *Scheduler) bindVolumesWorker() {
workFunc := func() bool {
keyObj, quit := sched.config.VolumeBinder.BindQueue.Get()
if quit {
return true
}
defer sched.config.VolumeBinder.BindQueue.Done(keyObj)
assumed, ok := keyObj.(*v1.Pod)
if !ok {
glog.V(4).Infof("Object is not a *v1.Pod")
return false
}
// TODO: add metrics
// bindVolumes will make the API update with the assumed bindings and wait until
// the PV controller has completely finished the binding operation.
//
// If binding fails, times out, or gets undone, an error is returned so that
// scheduling can be retried.
func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error {
var reason string
var eventType string
glog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
// The Pod is always sent back to the scheduler afterwards.
err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed)
if err != nil {
glog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err)
reason = "VolumeBindingFailed"
eventType = v1.EventTypeWarning
} else {
glog.V(4).Infof("Successfully bound volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
reason = "VolumeBindingWaiting"
eventType = v1.EventTypeNormal
err = fmt.Errorf("Volume binding started, waiting for completion")
// Unassume the Pod and retry scheduling
if forgetErr := sched.config.SchedulerCache.ForgetPod(assumed); forgetErr != nil {
glog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
// Always fail scheduling regardless of binding success.
// The Pod needs to be sent back through the scheduler to:
// * Retry volume binding if it fails.
// * Retry volume binding if dynamic provisioning fails.
// * Bind the Pod to the Node once all volumes are bound.
reason = "VolumeBindingFailed"
eventType = v1.EventTypeWarning
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, eventType, "FailedScheduling", "%v", err)
sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
@ -362,15 +314,11 @@ func (sched *Scheduler) bindVolumesWorker() {
Status: v1.ConditionFalse,
Reason: reason,
})
return false
return err
}
for {
if quit := workFunc(); quit {
glog.V(4).Infof("bindVolumesWorker shutting down")
break
}
}
glog.V(5).Infof("Success binding volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
return nil
}
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
@ -478,16 +426,12 @@ func (sched *Scheduler) scheduleOne() {
// Assume volumes first before assuming the pod.
//
// If no volumes need binding, then nil is returned, and continue to assume the pod.
// If all volumes are completely bound, then allBound is true and binding will be skipped.
//
// Otherwise, error is returned and volume binding is started asynchronously for all of the pod's volumes.
// scheduleOne() returns immediately on error, so that it doesn't continue to assume the pod.
//
// After the asynchronous volume binding updates are made, it will send the pod back through the scheduler for
// subsequent passes until all volumes are fully bound.
// Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
//
// This function modifies 'assumedPod' if volume binding is required.
err = sched.assumeAndBindVolumes(assumedPod, suggestedHost)
allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
if err != nil {
return
}
@ -499,6 +443,14 @@ func (sched *Scheduler) scheduleOne() {
}
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
// Bind volumes first before Pod
if !allBound {
err = sched.bindVolumes(assumedPod)
if err != nil {
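// bindVolumes has already reported the error and forgotten the assumed pod,
// so just abort here instead of proceeding to pod binding.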
return
}
}
err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
Target: v1.ObjectReference{

View File

@ -707,7 +707,6 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
},
expectAssumeCalled: true,
expectPodBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}},
eventReason: "Scheduled",
},
{
@ -739,28 +738,15 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind, 1 node(s) had volume node affinity conflict"),
},
{
name: "unbound/found matches",
name: "unbound/found matches/bind succeeds",
volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
FindUnboundSatsified: true,
FindBoundSatsified: true,
AssumeBindingRequired: true,
},
expectAssumeCalled: true,
expectBindCalled: true,
eventReason: "FailedScheduling",
expectError: fmt.Errorf("Volume binding started, waiting for completion"),
},
{
name: "unbound/found matches/already-bound",
volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
FindUnboundSatsified: true,
FindBoundSatsified: true,
AssumeBindingRequired: false,
},
expectAssumeCalled: true,
expectBindCalled: false,
eventReason: "FailedScheduling",
expectError: fmt.Errorf("Volume binding started, waiting for completion"),
expectPodBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}},
eventReason: "Scheduled",
},
{
name: "predicate error",
@ -786,7 +772,6 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
FindUnboundSatsified: true,
FindBoundSatsified: true,
AssumeBindingRequired: true,
BindErr: bindErr,
},
expectAssumeCalled: true,
@ -814,8 +799,6 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
close(eventChan)
})
go fakeVolumeBinder.Run(s.bindVolumesWorker, stop)
s.scheduleOne()
// Wait for pod to succeed or fail scheduling

View File

@ -8,11 +8,9 @@ go_library(
deps = [
"//pkg/controller/volume/persistentvolume:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
],
)

View File

@ -20,19 +20,15 @@ import (
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
)
// VolumeBinder sets up the volume binding library and manages
// the volume binding operations with a queue.
// VolumeBinder sets up the volume binding library
type VolumeBinder struct {
Binder persistentvolume.SchedulerVolumeBinder
BindQueue *workqueue.Type
}
// NewVolumeBinder sets up the volume binding library
@ -40,11 +36,11 @@ func NewVolumeBinder(
client clientset.Interface,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
storageClassInformer storageinformers.StorageClassInformer) *VolumeBinder {
storageClassInformer storageinformers.StorageClassInformer,
bindTimeout time.Duration) *VolumeBinder {
return &VolumeBinder{
Binder: persistentvolume.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer),
BindQueue: workqueue.NewNamed("podsToBind"),
Binder: persistentvolume.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer, bindTimeout),
}
}
@ -52,18 +48,9 @@ func NewVolumeBinder(
func NewFakeVolumeBinder(config *persistentvolume.FakeVolumeBinderConfig) *VolumeBinder {
return &VolumeBinder{
Binder: persistentvolume.NewFakeVolumeBinder(config),
BindQueue: workqueue.NewNamed("podsToBind"),
}
}
// Run starts a goroutine to handle the binding queue with the given function.
func (b *VolumeBinder) Run(bindWorkFunc func(), stopCh <-chan struct{}) {
go wait.Until(bindWorkFunc, time.Second, stopCh)
<-stopCh
b.BindQueue.ShutDown()
}
// DeletePodBindings will delete the cached volume bindings for the given pod.
func (b *VolumeBinder) DeletePodBindings(pod *v1.Pod) {
cache := b.Binder.GetBindingsCache()

View File

@ -239,6 +239,7 @@ type FakeVolumePlugin struct {
VolumeLimits map[string]int64
VolumeLimitsError error
LimitKey string
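// ProvisionDelaySeconds, when positive, makes FakeProvisioner.Provision sleep to
// simulate slow dynamic provisioning.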
ProvisionDelaySeconds int
Mounters []*FakeVolume
Unmounters []*FakeVolume
@ -437,7 +438,7 @@ func (plugin *FakeVolumePlugin) NewProvisioner(options VolumeOptions) (Provision
plugin.Lock()
defer plugin.Unlock()
plugin.LastProvisionerOptions = options
return &FakeProvisioner{options, plugin.Host}, nil
return &FakeProvisioner{options, plugin.Host, plugin.ProvisionDelaySeconds}, nil
}
func (plugin *FakeVolumePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
@ -781,6 +782,7 @@ func (fd *FakeDeleter) GetPath() string {
type FakeProvisioner struct {
Options VolumeOptions
Host VolumeHost
ProvisionDelaySeconds int
}
func (fc *FakeProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (*v1.PersistentVolume, error) {
@ -807,6 +809,10 @@ func (fc *FakeProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []
},
}
if fc.ProvisionDelaySeconds > 0 {
time.Sleep(time.Duration(fc.ProvisionDelaySeconds) * time.Second)
}
return pv, nil
}

View File

@ -81,6 +81,11 @@ type KubeSchedulerConfiguration struct {
// DEPRECATED.
// Indicate the "all topologies" set for empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity.
FailureDomains string `json:"failureDomains"`
// Duration to wait for a binding operation to complete before timing out.
// The value must be a non-negative integer; the value zero indicates no waiting.
// If this value is nil, the default value will be used.
BindTimeoutSeconds *int64 `json:"bindTimeoutSeconds"`
}
// SchedulerAlgorithmSource is the source of a scheduler algorithm. One source

View File

@ -32,6 +32,11 @@ func (in *KubeSchedulerConfiguration) DeepCopyInto(out *KubeSchedulerConfigurati
in.LeaderElection.DeepCopyInto(&out.LeaderElection)
out.ClientConnection = in.ClientConnection
out.DebuggingConfiguration = in.DebuggingConfiguration
if in.BindTimeoutSeconds != nil {
in, out := &in.BindTimeoutSeconds, &out.BindTimeoutSeconds
*out = new(int64)
**out = **in
}
return
}

View File

@ -566,13 +566,28 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() {
framework.Skipf("Runs only when number of nodes >= %v", ssReplicas)
}
By("Creating a StatefulSet with pod anti-affinity on nodes")
ss := createStatefulSet(config, ssReplicas, volsPerNode, true)
ss := createStatefulSet(config, ssReplicas, volsPerNode, true, false)
validateStatefulSet(config, ss, true)
})
It("should use volumes on one node when pod has affinity", func() {
By("Creating a StatefulSet with pod affinity on nodes")
ss := createStatefulSet(config, ssReplicas, volsPerNode/ssReplicas, false)
ss := createStatefulSet(config, ssReplicas, volsPerNode/ssReplicas, false, false)
validateStatefulSet(config, ss, false)
})
It("should use volumes spread across nodes when pod management is parallel and pod has anti-affinity", func() {
if len(config.nodes) < ssReplicas {
framework.Skipf("Runs only when number of nodes >= %v", ssReplicas)
}
By("Creating a StatefulSet with pod anti-affinity on nodes")
ss := createStatefulSet(config, ssReplicas, 1, true, true)
validateStatefulSet(config, ss, true)
})
It("should use volumes on one node when pod management is parallel and pod has affinity", func() {
By("Creating a StatefulSet with pod affinity on nodes")
ss := createStatefulSet(config, ssReplicas, 1, false, true)
validateStatefulSet(config, ss, false)
})
})
@ -1830,7 +1845,7 @@ func findLocalPersistentVolume(c clientset.Interface, volumePath string) (*v1.Pe
return nil, nil
}
func createStatefulSet(config *localTestConfig, ssReplicas int32, volumeCount int, anti bool) *appsv1.StatefulSet {
func createStatefulSet(config *localTestConfig, ssReplicas int32, volumeCount int, anti, parallel bool) *appsv1.StatefulSet {
mounts := []v1.VolumeMount{}
claims := []v1.PersistentVolumeClaim{}
for i := 0; i < volumeCount; i++ {
@ -1897,6 +1912,10 @@ func createStatefulSet(config *localTestConfig, ssReplicas int32, volumeCount in
},
}
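// Parallel pod management creates all replicas at once instead of waiting for each
// pod to be ready, exercising concurrent volume scheduling.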
if parallel {
spec.Spec.PodManagementPolicy = appsv1.ParallelPodManagement
}
ss, err := config.client.AppsV1().StatefulSets(config.ns).Create(spec)
Expect(err).NotTo(HaveOccurred())

View File

@ -34,6 +34,7 @@ go_test(
"//pkg/kubeapiserver/admission:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",

View File

@ -183,6 +183,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
defaultBindTimeout := int64(30)
ss := &schedulerappconfig.Config{
ComponentConfig: kubeschedulerconfig.KubeSchedulerConfiguration{
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
@ -195,6 +196,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
},
},
},
BindTimeoutSeconds: &defaultBindTimeout,
},
Client: clientSet,
InformerFactory: informerFactory,
@ -244,6 +246,7 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
defaultBindTimeout := int64(30)
ss := &schedulerappconfig.Config{
ComponentConfig: kubeschedulerconfig.KubeSchedulerConfiguration{
SchedulerName: v1.DefaultSchedulerName,
@ -256,6 +259,7 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) {
},
},
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
BindTimeoutSeconds: &defaultBindTimeout,
},
Client: clientSet,
InformerFactory: informerFactory,

View File

@ -91,6 +91,7 @@ func createConfiguratorWithPodInformer(
EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
DisablePreemption: false,
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
BindTimeoutSeconds: 600,
})
}
@ -143,7 +144,7 @@ func initTestScheduler(
) *TestContext {
// Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority
// feature gate is enabled at the same time.
return initTestSchedulerWithOptions(t, context, controllerCh, setPodInformer, policy, false, time.Second)
return initTestSchedulerWithOptions(t, context, controllerCh, setPodInformer, policy, false, false, time.Second)
}
// initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
@ -155,13 +156,15 @@ func initTestSchedulerWithOptions(
setPodInformer bool,
policy *schedulerapi.Policy,
disablePreemption bool,
disableEquivalenceCache bool,
resyncPeriod time.Duration,
) *TestContext {
// Enable EnableEquivalenceClassCache for integration tests, unless explicitly disabled.
if !disableEquivalenceCache {
defer utilfeaturetesting.SetFeatureGateDuringTest(
t,
utilfeature.DefaultFeatureGate,
features.EnableEquivalenceClassCache, true)()
}
// 1. Create scheduler
context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, resyncPeriod)
@ -256,7 +259,7 @@ func initTest(t *testing.T, nsPrefix string) *TestContext {
// configuration but with pod preemption disabled.
func initTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
return initTestSchedulerWithOptions(
t, initTestMaster(t, nsPrefix, nil), nil, true, nil, true, time.Second)
t, initTestMaster(t, nsPrefix, nil), nil, true, nil, true, false, time.Second)
}
// cleanupTest deletes the scheduler and the test namespace. It should be called

View File

@ -20,6 +20,7 @@ package scheduler
import (
"fmt"
"os"
"strconv"
"strings"
"testing"
@ -32,12 +33,14 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
imageutils "k8s.io/kubernetes/test/utils/image"
@ -60,6 +63,7 @@ var (
classWait = "wait"
classImmediate = "immediate"
classDynamic = "dynamic"
sharedClasses = map[storagev1.VolumeBindingMode]*storagev1.StorageClass{
modeImmediate: makeStorageClass(classImmediate, &modeImmediate),
@ -94,7 +98,7 @@ func TestVolumeBinding(t *testing.T) {
"VolumeScheduling": true,
"PersistentLocalVolumes": true,
}
config := setupCluster(t, "volume-scheduling", 2, features, 0)
config := setupCluster(t, "volume-scheduling-", 2, features, 0, 0, false)
defer config.teardown()
cases := map[string]struct {
@ -267,7 +271,7 @@ func TestVolumeBindingRescheduling(t *testing.T) {
"VolumeScheduling": true,
"PersistentLocalVolumes": true,
}
config := setupCluster(t, "volume-scheduling", 2, features, 0)
config := setupCluster(t, "volume-scheduling-", 2, features, 0, 0, false)
defer config.teardown()
storageClassName := "local-storage"
@ -385,8 +389,9 @@ func TestVolumeBindingRescheduling(t *testing.T) {
}
// TestVolumeBindingStress creates <podLimit> pods, each with <volsPerPod> unbound PVCs.
// PVs are precreated.
func TestVolumeBindingStress(t *testing.T) {
testVolumeBindingStress(t, 0)
testVolumeBindingStress(t, 0, false, 0)
}
// Like TestVolumeBindingStress but with scheduler resync. In real cluster,
@ -394,32 +399,60 @@ func TestVolumeBindingStress(t *testing.T) {
// service/node update events.
// This is useful to detect possible race conditions.
func TestVolumeBindingStressWithSchedulerResync(t *testing.T) {
testVolumeBindingStress(t, time.Second)
testVolumeBindingStress(t, time.Second, false, 0)
}
func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration) {
// Like TestVolumeBindingStress but with fast dynamic provisioning
func TestVolumeBindingDynamicStressFast(t *testing.T) {
testVolumeBindingStress(t, 0, true, 0)
}
// Like TestVolumeBindingStress but with slow dynamic provisioning
func TestVolumeBindingDynamicStressSlow(t *testing.T) {
testVolumeBindingStress(t, 0, true, 30)
}
func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration, dynamic bool, provisionDelaySeconds int) {
features := map[string]bool{
"VolumeScheduling": true,
"PersistentLocalVolumes": true,
}
config := setupCluster(t, "volume-binding-stress", 1, features, schedulerResyncPeriod)
config := setupCluster(t, "volume-binding-stress-", 1, features, schedulerResyncPeriod, provisionDelaySeconds, false)
defer config.teardown()
// Set max volume limit to the number of PVCs the test will create
// TODO: remove when max volume limit allows setting through storageclass
if err := os.Setenv(predicates.KubeMaxPDVols, fmt.Sprintf("%v", podLimit*volsPerPod)); err != nil {
t.Fatalf("failed to set max pd limit: %v", err)
}
defer os.Unsetenv(predicates.KubeMaxPDVols)
scName := &classWait
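// For the dynamic provisioning variants, switch to a provisioner-backed StorageClass
// instead of pre-created PVs.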
if dynamic {
scName = &classDynamic
sc := makeDynamicProvisionerStorageClass(*scName, &modeWait)
if _, err := config.client.StorageV1().StorageClasses().Create(sc); err != nil {
t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
}
}
// Create enough PVs and PVCs for all the pods
pvs := []*v1.PersistentVolume{}
pvcs := []*v1.PersistentVolumeClaim{}
for i := 0; i < podLimit*volsPerPod; i++ {
pv := makePV(fmt.Sprintf("pv-stress-%v", i), classWait, "", "", node1)
pvc := makePVC(fmt.Sprintf("pvc-stress-%v", i), config.ns, &classWait, "")
// Don't create PVs for the dynamic provisioning test
if !dynamic {
pv := makePV(fmt.Sprintf("pv-stress-%v", i), *scName, "", "", node1)
if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil {
t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
}
pvs = append(pvs, pv)
}
pvc := makePVC(fmt.Sprintf("pvc-stress-%v", i), config.ns, scName, "")
if pvc, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(pvc); err != nil {
t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
}
pvs = append(pvs, pv)
pvcs = append(pvcs, pvc)
}
@ -431,7 +464,7 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration)
podPvcs = append(podPvcs, pvcs[j].Name)
}
pod := makePod(fmt.Sprintf("pod%v", i), config.ns, podPvcs)
pod := makePod(fmt.Sprintf("pod%03d", i), config.ns, podPvcs)
if pod, err := config.client.CoreV1().Pods(config.ns).Create(pod); err != nil {
t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
}
@ -442,7 +475,7 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration)
for _, pod := range pods {
// Use increased timeout for stress test because there is a higher chance of
// PV sync error
if err := waitForPodToScheduleWithTimeout(config.client, pod, 60*time.Second); err != nil {
if err := waitForPodToScheduleWithTimeout(config.client, pod, 2*time.Minute); err != nil {
t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err)
}
}
@ -456,12 +489,142 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration)
}
}
func testVolumeBindingWithAffinity(t *testing.T, anti bool, numNodes, numPods, numPVsFirstNode int) {
features := map[string]bool{
"VolumeScheduling": true,
"PersistentLocalVolumes": true,
}
// TODO: re-enable the equivalence cache (disabled here) once kubernetes/kubernetes#67680 is fixed
config := setupCluster(t, "volume-pod-affinity-", numNodes, features, 0, 0, true)
defer config.teardown()
pods := []*v1.Pod{}
pvcs := []*v1.PersistentVolumeClaim{}
pvs := []*v1.PersistentVolume{}
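// Skew PV placement: numPVsFirstNode PVs on node1 and a single PV on every other node,
// so pod placement is decided by the (anti-)affinity terms rather than by volume
// availability alone.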
// Create PVs for the first node
for i := 0; i < numPVsFirstNode; i++ {
pv := makePV(fmt.Sprintf("pv-node1-%v", i), classWait, "", "", node1)
if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil {
t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
}
pvs = append(pvs, pv)
}
// Create 1 PV per Node for the remaining nodes
for i := 2; i <= numNodes; i++ {
pv := makePV(fmt.Sprintf("pv-node%v-0", i), classWait, "", "", fmt.Sprintf("node-%v", i))
if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil {
t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
}
pvs = append(pvs, pv)
}
// Create pods
for i := 0; i < numPods; i++ {
// Create one pvc per pod
pvc := makePVC(fmt.Sprintf("pvc-%v", i), config.ns, &classWait, "")
if pvc, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(pvc); err != nil {
t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
}
pvcs = append(pvcs, pvc)
// Create pod with pod affinity
pod := makePod(fmt.Sprintf("pod%03d", i), config.ns, []string{pvc.Name})
pod.Spec.Affinity = &v1.Affinity{}
affinityTerms := []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "app",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"volume-binding-test"},
},
},
},
TopologyKey: nodeAffinityLabelKey,
},
}
if anti {
pod.Spec.Affinity.PodAntiAffinity = &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: affinityTerms,
}
} else {
pod.Spec.Affinity.PodAffinity = &v1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: affinityTerms,
}
}
if pod, err := config.client.CoreV1().Pods(config.ns).Create(pod); err != nil {
t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
}
pods = append(pods, pod)
}
// Validate Pods scheduled
scheduledNodes := sets.NewString()
for _, pod := range pods {
if err := waitForPodToSchedule(config.client, pod); err != nil {
t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err)
} else {
// Keep track of all the nodes that the Pods were scheduled on
pod, err = config.client.CoreV1().Pods(config.ns).Get(pod.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get Pod %q: %v", pod.Name, err)
}
if pod.Spec.NodeName == "" {
t.Fatalf("Pod %q node name unset after scheduling", pod.Name)
}
scheduledNodes.Insert(pod.Spec.NodeName)
}
}
// Validate the affinity policy
if anti {
// The pods should have been spread across different nodes
if scheduledNodes.Len() != numPods {
t.Errorf("Pods were scheduled across %v nodes instead of %v", scheduledNodes.Len(), numPods)
}
} else {
// The pods should have been scheduled on 1 node
if scheduledNodes.Len() != 1 {
t.Errorf("Pods were scheduled across %v nodes instead of %v", scheduledNodes.Len(), 1)
}
}
// Validate PVC binding
for _, pvc := range pvcs {
validatePVCPhase(t, config.client, pvc.Name, config.ns, v1.ClaimBound)
}
}
func TestVolumeBindingWithAntiAffinity(t *testing.T) {
numNodes := 10
// Create as many pods as number of nodes
numPods := numNodes
// Create many more PVs on node1 to increase chance of selecting node1
numPVsFirstNode := 10 * numNodes
testVolumeBindingWithAffinity(t, true, numNodes, numPods, numPVsFirstNode)
}
func TestVolumeBindingWithAffinity(t *testing.T) {
numPods := 10
// Create many more nodes to increase chance of selecting a PV on a different node than node1
numNodes := 10 * numPods
// Create numPods PVs on the first node
numPVsFirstNode := numPods
testVolumeBindingWithAffinity(t, false, numNodes, numPods, numPVsFirstNode)
}
func TestPVAffinityConflict(t *testing.T) {
features := map[string]bool{
"VolumeScheduling": true,
"PersistentLocalVolumes": true,
}
config := setupCluster(t, "volume-scheduling", 3, features, 0)
config := setupCluster(t, "volume-scheduling-", 3, features, 0, 0, false)
defer config.teardown()
pv := makePV("local-pv", classImmediate, "", "", node1)
@ -519,7 +682,7 @@ func TestPVAffinityConflict(t *testing.T) {
}
}
func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[string]bool, resyncPeriod time.Duration) *testConfig {
func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[string]bool, resyncPeriod time.Duration, provisionDelaySeconds int, disableEquivalenceCache bool) *testConfig {
oldFeatures := make(map[string]bool, len(features))
for feature := range features {
oldFeatures[feature] = utilfeature.DefaultFeatureGate.Enabled(utilfeature.Feature(feature))
@ -529,7 +692,7 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[s
controllerCh := make(chan struct{})
context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), controllerCh, false, nil, false, resyncPeriod)
context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), controllerCh, false, nil, false, disableEquivalenceCache, resyncPeriod)
clientset := context.clientSet
ns := context.ns.Name
@ -543,6 +706,7 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[s
Host: host,
Config: volume.VolumeConfig{},
LastProvisionerOptions: volume.VolumeOptions{},
ProvisionDelaySeconds: provisionDelaySeconds,
NewAttacherCallCount: 0,
NewDetacherCallCount: 0,
Mounters: nil,
@ -732,6 +896,9 @@ func makePod(name, ns string, pvcs []string) *v1.Pod {
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
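// The "app" label is what the pod affinity and anti-affinity terms in
// testVolumeBindingWithAffinity select on.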
Labels: map[string]string{
"app": "volume-binding-test",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{