diff --git a/cmd/kube-scheduler/app/options/options_test.go b/cmd/kube-scheduler/app/options/options_test.go index e1b153af3e..bacab57fc5 100644 --- a/cmd/kube-scheduler/app/options/options_test.go +++ b/cmd/kube-scheduler/app/options/options_test.go @@ -144,6 +144,7 @@ users: } defaultSource := "DefaultProvider" + defaultBindTimeoutSeconds := int64(600) testcases := []struct { name string @@ -157,7 +158,10 @@ users: options: &Options{ ConfigFile: configFile, ComponentConfig: func() kubeschedulerconfig.KubeSchedulerConfiguration { - cfg, _ := newDefaultComponentConfig() + cfg, err := newDefaultComponentConfig() + if err != nil { + t.Fatal(err) + } return *cfg }(), }, @@ -187,6 +191,7 @@ users: ContentType: "application/vnd.kubernetes.protobuf", }, PercentageOfNodesToScore: 50, + BindTimeoutSeconds: &defaultBindTimeoutSeconds, }, }, { @@ -229,6 +234,7 @@ users: ContentType: "application/vnd.kubernetes.protobuf", }, PercentageOfNodesToScore: 50, + BindTimeoutSeconds: &defaultBindTimeoutSeconds, }, }, { diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index e37efe9788..a8db9954ec 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -305,6 +305,7 @@ func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Con EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache), DisablePreemption: s.ComponentConfig.DisablePreemption, PercentageOfNodesToScore: s.ComponentConfig.PercentageOfNodesToScore, + BindTimeoutSeconds: *s.ComponentConfig.BindTimeoutSeconds, }) source := s.ComponentConfig.AlgorithmSource diff --git a/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go b/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go index b04b402bff..0e4ade0faf 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go +++ b/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go @@ -83,7 +83,8 @@ func (e *errObjectName) Error() string { // Restore() sets the latest object pointer back to the informer object. // Get/List() always returns the latest object pointer. 
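Reviewer note: the assume-cache hunk below swaps the plain `sync.Mutex` for a `sync.RWMutex`, so the read-only `Get`/`List` paths described above take a shared read lock while `add`/`delete`/`Assume`/`Restore` keep the exclusive lock. A minimal, self-contained sketch of that locking pattern (the `objCache` type and its fields are illustrative only, not the real `assumeCache`):

```go
package main

import (
	"fmt"
	"sync"
)

// objCache mimics the locking discipline used by the assume cache:
// readers share an RLock, writers take the exclusive Lock.
type objCache struct {
	rwMutex sync.RWMutex
	objs    map[string]string
}

func (c *objCache) Get(name string) (string, bool) {
	c.rwMutex.RLock() // concurrent Get/List calls no longer serialize on each other
	defer c.rwMutex.RUnlock()
	obj, ok := c.objs[name]
	return obj, ok
}

func (c *objCache) Assume(name, obj string) {
	c.rwMutex.Lock() // updates remain exclusive
	defer c.rwMutex.Unlock()
	c.objs[name] = obj
}

func main() {
	c := &objCache{objs: map[string]string{}}
	c.Assume("pv-node1a", "assumed object")
	if obj, ok := c.Get("pv-node1a"); ok {
		fmt.Println(obj)
	}
}
```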
type assumeCache struct { - mutex sync.Mutex + // Synchronizes updates to store + rwMutex sync.RWMutex // describes the object stored description string @@ -155,8 +156,8 @@ func (c *assumeCache) add(obj interface{}) { return } - c.mutex.Lock() - defer c.mutex.Unlock() + c.rwMutex.Lock() + defer c.rwMutex.Unlock() if objInfo, _ := c.getObjInfo(name); objInfo != nil { newVersion, err := c.getObjVersion(name, obj) @@ -199,8 +200,8 @@ func (c *assumeCache) delete(obj interface{}) { return } - c.mutex.Lock() - defer c.mutex.Unlock() + c.rwMutex.Lock() + defer c.rwMutex.Unlock() objInfo := &objInfo{name: name} err = c.store.Delete(objInfo) @@ -239,8 +240,8 @@ func (c *assumeCache) getObjInfo(name string) (*objInfo, error) { } func (c *assumeCache) Get(objName string) (interface{}, error) { - c.mutex.Lock() - defer c.mutex.Unlock() + c.rwMutex.RLock() + defer c.rwMutex.RUnlock() objInfo, err := c.getObjInfo(objName) if err != nil { @@ -250,8 +251,8 @@ func (c *assumeCache) Get(objName string) (interface{}, error) { } func (c *assumeCache) List(indexObj interface{}) []interface{} { - c.mutex.Lock() - defer c.mutex.Unlock() + c.rwMutex.RLock() + defer c.rwMutex.RUnlock() allObjs := []interface{}{} objs, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj}) @@ -277,8 +278,8 @@ func (c *assumeCache) Assume(obj interface{}) error { return &errObjectName{err} } - c.mutex.Lock() - defer c.mutex.Unlock() + c.rwMutex.Lock() + defer c.rwMutex.Unlock() objInfo, err := c.getObjInfo(name) if err != nil { @@ -306,8 +307,8 @@ func (c *assumeCache) Assume(obj interface{}) error { } func (c *assumeCache) Restore(objName string) { - c.mutex.Lock() - defer c.mutex.Unlock() + c.rwMutex.Lock() + defer c.rwMutex.Unlock() objInfo, err := c.getObjInfo(objName) if err != nil { diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder.go b/pkg/controller/volume/persistentvolume/scheduler_binder.go index a5474fd287..5375c78a67 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder.go +++ b/pkg/controller/volume/persistentvolume/scheduler_binder.go @@ -19,12 +19,14 @@ package persistentvolume import ( "fmt" "sort" + "time" "github.com/golang/glog" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" coreinformers "k8s.io/client-go/informers/core/v1" storageinformers "k8s.io/client-go/informers/storage/v1" clientset "k8s.io/client-go/kubernetes" @@ -42,16 +44,19 @@ import ( // a. Invokes all predicate functions, parallelized across nodes. FindPodVolumes() is invoked here. // b. Invokes all priority functions. Future/TBD // c. Selects the best node for the Pod. -// d. Cache the node selection for the Pod. (Assume phase) +// d. Cache the node selection for the Pod. AssumePodVolumes() is invoked here. // i. If PVC binding is required, cache in-memory only: -// * Updated PV objects for prebinding to the corresponding PVCs. -// * For the pod, which PVs need API updates. -// AssumePodVolumes() is invoked here. Then BindPodVolumes() is called asynchronously by the -// scheduler. After BindPodVolumes() is complete, the Pod is added back to the scheduler queue -// to be processed again until all PVCs are bound. -// ii. If PVC binding is not required, cache the Pod->Node binding in the scheduler's pod cache, -// and asynchronously bind the Pod to the Node. This is handled in the scheduler and not here. -// 2. 
Once the assume operation is done, the scheduler processes the next Pod in the scheduler queue +// * For manual binding: update PV objects for prebinding to the corresponding PVCs. +// * For dynamic provisioning: update PVC object with a selected node from c) +// * For the pod, which PVCs and PVs need API updates. +// ii. Afterwards, the main scheduler caches the Pod->Node binding in the scheduler's pod cache, +// This is handled in the scheduler and not here. +// e. Asynchronously bind volumes and pod in a separate goroutine +// i. BindPodVolumes() is called first. It makes all the necessary API updates and waits for +// PV controller to fully bind and provision the PVCs. If binding fails, the Pod is sent +// back through the scheduler. +// ii. After BindPodVolumes() is complete, then the scheduler does the final Pod->Node binding. +// 2. Once all the assume operations are done in d), the scheduler processes the next Pod in the scheduler queue // while the actual binding operation occurs in the background. type SchedulerVolumeBinder interface { // FindPodVolumes checks if all of a Pod's PVCs can be satisfied by the node. @@ -71,18 +76,18 @@ type SchedulerVolumeBinder interface { // 2. Take the PVCs that need provisioning and update the PVC cache with related // annotations set. // - // It returns true if all volumes are fully bound, and returns true if any volume binding/provisioning - // API operation needs to be done afterwards. + // It returns true if all volumes are fully bound // // This function will modify assumedPod with the node name. // This function is called serially. - AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound bool, bindingRequired bool, err error) + AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound bool, err error) // BindPodVolumes will: // 1. Initiate the volume binding by making the API call to prebind the PV // to its matching PVC. // 2. Trigger the volume provisioning by making the API call to set related // annotations on the PVC + // 3. Wait for PVCs to be completely bound by the PV controller // // This function can be called in parallel. BindPodVolumes(assumedPod *v1.Pod) error @@ -100,6 +105,9 @@ type volumeBinder struct { // Stores binding decisions that were made in FindPodVolumes for use in AssumePodVolumes. // AssumePodVolumes modifies the bindings again for use in BindPodVolumes. podBindingCache PodBindingCache + + // Amount of time to wait for the bind operation to succeed + bindTimeout time.Duration } // NewVolumeBinder sets up all the caches needed for the scheduler to make volume binding decisions. @@ -107,7 +115,8 @@ func NewVolumeBinder( kubeClient clientset.Interface, pvcInformer coreinformers.PersistentVolumeClaimInformer, pvInformer coreinformers.PersistentVolumeInformer, - storageClassInformer storageinformers.StorageClassInformer) SchedulerVolumeBinder { + storageClassInformer storageinformers.StorageClassInformer, + bindTimeout time.Duration) SchedulerVolumeBinder { // TODO: find better way... ctrl := &PersistentVolumeController{ @@ -120,6 +129,7 @@ func NewVolumeBinder( pvcCache: NewPVCAssumeCache(pvcInformer.Informer()), pvCache: NewPVAssumeCache(pvInformer.Informer()), podBindingCache: NewPodBindingCache(), + bindTimeout: bindTimeout, } return b @@ -183,22 +193,24 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume // in podBindingCache for the chosen node, and: // 1. Update the pvCache with the new prebound PV. // 2. 
Update the pvcCache with the new PVCs with annotations set -// It will update podBindingCache again with the PVs and PVCs that need an API update. -func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound, bindingRequired bool, err error) { +// 3. Update podBindingCache again with cached API updates for PVs and PVCs. +func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound bool, err error) { podName := getPodName(assumedPod) glog.V(4).Infof("AssumePodVolumes for pod %q, node %q", podName, nodeName) if allBound := b.arePodVolumesBound(assumedPod); allBound { glog.V(4).Infof("AssumePodVolumes for pod %q, node %q: all PVCs bound and nothing to do", podName, nodeName) - return true, false, nil + return true, nil } assumedPod.Spec.NodeName = nodeName - // Assume PV - claimsToBind := b.podBindingCache.GetBindings(assumedPod, nodeName) - newBindings := []*bindingInfo{} + claimsToBind := b.podBindingCache.GetBindings(assumedPod, nodeName) + claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, nodeName) + + // Assume PV + newBindings := []*bindingInfo{} for _, binding := range claimsToBind { newPV, dirty, err := b.ctrl.getBindVolumeToClaim(binding.pv, binding.pvc) glog.V(5).Infof("AssumePodVolumes: getBindVolumeToClaim for pod %q, PV %q, PVC %q. newPV %p, dirty %v, err: %v", @@ -210,29 +222,20 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al err) if err != nil { b.revertAssumedPVs(newBindings) - return false, true, err + return false, err } + // TODO: can we assume everytime? if dirty { err = b.pvCache.Assume(newPV) if err != nil { b.revertAssumedPVs(newBindings) - return false, true, err + return false, err } - - newBindings = append(newBindings, &bindingInfo{pv: newPV, pvc: binding.pvc}) } - } - - // Don't update cached bindings if no API updates are needed. This can happen if we - // previously updated the PV object and are waiting for the PV controller to finish binding. - if len(newBindings) != 0 { - bindingRequired = true - b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings) + newBindings = append(newBindings, &bindingInfo{pv: newPV, pvc: binding.pvc}) } // Assume PVCs - claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, nodeName) - newProvisionedPVCs := []*v1.PersistentVolumeClaim{} for _, claim := range claimsToProvision { // The claims from method args can be pointing to watcher cache. We must not @@ -249,50 +252,37 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al newProvisionedPVCs = append(newProvisionedPVCs, claimClone) } - if len(newProvisionedPVCs) != 0 { - bindingRequired = true - b.podBindingCache.UpdateProvisionedPVCs(assumedPod, nodeName, newProvisionedPVCs) - } + // Update cache with the assumed pvcs and pvs + // Even if length is zero, update the cache with an empty slice to indicate that no + // operations are needed + b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings) + b.podBindingCache.UpdateProvisionedPVCs(assumedPod, nodeName, newProvisionedPVCs) return } -// BindPodVolumes gets the cached bindings and PVCs to provision in podBindingCache -// and makes the API update for those PVs/PVCs. +// BindPodVolumes gets the cached bindings and PVCs to provision in podBindingCache, +// makes the API update for those PVs/PVCs, and waits for the PVCs to be completely bound +// by the PV controller. 
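Reviewer note: taken together with the interface comments above (AssumePodVolumes is called serially during scheduling; BindPodVolumes runs in a separate goroutine and now also waits for the PV controller, sending the pod back through the scheduler on failure), the caller-side flow roughly looks like the sketch below. This is a hypothetical illustration only: `volumeBinderLike`, `scheduleOne`, and `requeue` are placeholders, and the real scheduler wiring and error handling are omitted.

```go
package example

import (
	"fmt"

	"k8s.io/api/core/v1"
)

// volumeBinderLike is a stand-in for the SchedulerVolumeBinder interface above.
type volumeBinderLike interface {
	AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound bool, err error)
	BindPodVolumes(assumedPod *v1.Pod) error
}

// scheduleOne sketches the order of operations: assume synchronously, then
// bind (and wait) asynchronously, requeueing the pod for scheduling on failure.
func scheduleOne(binder volumeBinderLike, pod *v1.Pod, nodeName string, requeue func(*v1.Pod, error)) {
	allBound, err := binder.AssumePodVolumes(pod, nodeName)
	if err != nil {
		requeue(pod, err)
		return
	}
	go func() {
		if !allBound {
			// Blocks until the PV controller fully binds/provisions the PVCs
			// or the configured bind timeout expires.
			if err := binder.BindPodVolumes(pod); err != nil {
				requeue(pod, err)
				return
			}
		}
		fmt.Printf("pod %s/%s: volumes ready, proceed to the final Pod->Node bind\n", pod.Namespace, pod.Name)
	}()
}
```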
func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) error { podName := getPodName(assumedPod) - glog.V(4).Infof("BindPodVolumes for pod %q", podName) + glog.V(4).Infof("BindPodVolumes for pod %q, node %q", podName, assumedPod.Spec.NodeName) bindings := b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName) claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, assumedPod.Spec.NodeName) - // Do the actual prebinding. Let the PV controller take care of the rest - // There is no API rollback if the actual binding fails - for i, bindingInfo := range bindings { - glog.V(5).Infof("BindPodVolumes: Pod %q, binding PV %q to PVC %q", podName, bindingInfo.pv.Name, bindingInfo.pvc.Name) - _, err := b.ctrl.updateBindVolumeToClaim(bindingInfo.pv, bindingInfo.pvc, false) - if err != nil { - // only revert assumed cached updates for volumes we haven't successfully bound - b.revertAssumedPVs(bindings[i:]) - // Revert all of the assumed cached updates for claims, - // since no actual API update will be done - b.revertAssumedPVCs(claimsToProvision) - return err - } + // Start API operations + err := b.bindAPIUpdate(podName, bindings, claimsToProvision) + if err != nil { + return err } - // Update claims objects to trigger volume provisioning. Let the PV controller take care of the rest - // PV controller is expect to signal back by removing related annotations if actual provisioning fails - for i, claim := range claimsToProvision { - if _, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil { - glog.V(4).Infof("updating PersistentVolumeClaim[%s] failed: %v", getPVCName(claim), err) - // only revert assumed cached updates for claims we haven't successfully updated - b.revertAssumedPVCs(claimsToProvision[i:]) - return err - } - } - - return nil + return wait.Poll(time.Second, b.bindTimeout, func() (bool, error) { + // Get cached values every time in case the pod gets deleted + bindings = b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName) + claimsToProvision = b.podBindingCache.GetProvisionedPVCs(assumedPod, assumedPod.Spec.NodeName) + return b.checkBindings(assumedPod, bindings, claimsToProvision) + }) } func getPodName(pod *v1.Pod) string { @@ -303,12 +293,131 @@ func getPVCName(pvc *v1.PersistentVolumeClaim) string { return pvc.Namespace + "/" + pvc.Name } -func (b *volumeBinder) isVolumeBound(namespace string, vol *v1.Volume, checkFullyBound bool) (bool, *v1.PersistentVolumeClaim, error) { +// bindAPIUpdate gets the cached bindings and PVCs to provision in podBindingCache +// and makes the API update for those PVs/PVCs. 
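Reviewer note: BindPodVolumes above delegates the waiting to `wait.Poll(time.Second, b.bindTimeout, ...)`, i.e. `checkBindings` is re-evaluated once per second until it reports done, returns an error, or the configured `BindTimeoutSeconds` (600s by default) elapses. A small standalone example of that polling contract, using a made-up readiness condition:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ready := time.Now().Add(3 * time.Second)

	// wait.Poll(interval, timeout, condition) calls condition every interval
	// until it returns (true, nil), returns a non-nil error, or the timeout
	// expires, in which case Poll returns wait.ErrWaitTimeout.
	err := wait.Poll(time.Second, 10*time.Second, func() (bool, error) {
		if time.Now().After(ready) {
			return true, nil // done; stop polling successfully
		}
		fmt.Println("not bound yet, retrying")
		return false, nil // not done, no error: keep polling
	})
	fmt.Println("wait.Poll returned:", err)
}
```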
+func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) error {
+	if bindings == nil {
+		return fmt.Errorf("failed to get cached bindings for pod %q", podName)
+	}
+	if claimsToProvision == nil {
+		return fmt.Errorf("failed to get cached claims to provision for pod %q", podName)
+	}
+
+	lastProcessedBinding := 0
+	lastProcessedProvisioning := 0
+	defer func() {
+		// only revert assumed cached updates for volumes we haven't successfully bound
+		if lastProcessedBinding < len(bindings) {
+			b.revertAssumedPVs(bindings[lastProcessedBinding:])
+		}
+		// only revert assumed cached updates for claims we haven't updated,
+		if lastProcessedProvisioning < len(claimsToProvision) {
+			b.revertAssumedPVCs(claimsToProvision[lastProcessedProvisioning:])
+		}
+	}()
+
+	var (
+		binding *bindingInfo
+		claim   *v1.PersistentVolumeClaim
+	)
+
+	// Do the actual prebinding. Let the PV controller take care of the rest
+	// There is no API rollback if the actual binding fails
+	for _, binding = range bindings {
+		glog.V(5).Infof("bindAPIUpdate: Pod %q, binding PV %q to PVC %q", podName, binding.pv.Name, binding.pvc.Name)
+		// TODO: does it hurt if we make an api call and nothing needs to be updated?
+		if _, err := b.ctrl.updateBindVolumeToClaim(binding.pv, binding.pvc, false); err != nil {
+			return err
+		}
+		lastProcessedBinding++
+	}
+
+	// Update claims objects to trigger volume provisioning. Let the PV controller take care of the rest
+	// PV controller is expected to signal back by removing related annotations if actual provisioning fails
+	for _, claim = range claimsToProvision {
+		glog.V(5).Infof("bindAPIUpdate: Pod %q, PVC %q", podName, getPVCName(claim))
+		if _, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
+			return err
+		}
+		lastProcessedProvisioning++
+	}
+
+	return nil
+}
+
+// checkBindings runs through all the PVCs in the Pod and checks:
+// * if the PVC is fully bound
+// * if there are any conditions that require binding to fail and be retried
+//
+// It returns true when all of the Pod's PVCs are fully bound, and error if
+// binding (and scheduling) needs to be retried
+func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) (bool, error) {
+	podName := getPodName(pod)
+	if bindings == nil {
+		return false, fmt.Errorf("failed to get cached bindings for pod %q", podName)
+	}
+	if claimsToProvision == nil {
+		return false, fmt.Errorf("failed to get cached claims to provision for pod %q", podName)
+	}
+
+	for _, binding := range bindings {
+		// Check for any conditions that might require scheduling retry
+
+		// Check if pv still exists
+		pv, err := b.pvCache.GetPV(binding.pv.Name)
+		if err != nil || pv == nil {
+			return false, fmt.Errorf("failed to check pv binding: %v", err)
+		}
+
+		// Check if pv.ClaimRef got dropped by unbindVolume()
+		if pv.Spec.ClaimRef == nil || pv.Spec.ClaimRef.UID == "" {
+			return false, fmt.Errorf("ClaimRef got reset for pv %q", pv.Name)
+		}
+
+		// Check if pvc is fully bound
+		if isBound, _, err := b.isPVCBound(binding.pvc.Namespace, binding.pvc.Name); !isBound || err != nil {
+			return false, err
+		}
+
+		// TODO: what if pvc is bound to the wrong pv? It means our assume cache should be reverted.
+		// Or will pv controller cleanup the pv.ClaimRef?
+ } + + for _, claim := range claimsToProvision { + bound, pvc, err := b.isPVCBound(claim.Namespace, claim.Name) + if err != nil || pvc == nil { + return false, fmt.Errorf("failed to check pvc binding: %v", err) + } + + // Check if selectedNode annotation is still set + if pvc.Annotations == nil { + return false, fmt.Errorf("selectedNode annotation reset for PVC %q", pvc.Name) + } + selectedNode := pvc.Annotations[annSelectedNode] + if selectedNode != pod.Spec.NodeName { + return false, fmt.Errorf("selectedNode annotation value %q not set to scheduled node %q", selectedNode, pod.Spec.NodeName) + } + + if !bound { + return false, nil + } + } + + // All pvs and pvcs that we operated on are bound + glog.V(4).Infof("All PVCs for pod %q are bound", podName) + return true, nil +} + +func (b *volumeBinder) isVolumeBound(namespace string, vol *v1.Volume) (bool, *v1.PersistentVolumeClaim, error) { if vol.PersistentVolumeClaim == nil { return true, nil, nil } pvcName := vol.PersistentVolumeClaim.ClaimName + return b.isPVCBound(namespace, pvcName) +} + +func (b *volumeBinder) isPVCBound(namespace, pvcName string) (bool, *v1.PersistentVolumeClaim, error) { claim := &v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ Name: pvcName, @@ -322,17 +431,13 @@ func (b *volumeBinder) isVolumeBound(namespace string, vol *v1.Volume, checkFull pvName := pvc.Spec.VolumeName if pvName != "" { - if checkFullyBound { - if metav1.HasAnnotation(pvc.ObjectMeta, annBindCompleted) { - glog.V(5).Infof("PVC %q is fully bound to PV %q", getPVCName(pvc), pvName) - return true, pvc, nil - } else { - glog.V(5).Infof("PVC %q is not fully bound to PV %q", getPVCName(pvc), pvName) - return false, pvc, nil - } + if metav1.HasAnnotation(pvc.ObjectMeta, annBindCompleted) { + glog.V(5).Infof("PVC %q is fully bound to PV %q", getPVCName(pvc), pvName) + return true, pvc, nil + } else { + glog.V(5).Infof("PVC %q is not fully bound to PV %q", getPVCName(pvc), pvName) + return false, pvc, nil } - glog.V(5).Infof("PVC %q is bound or prebound to PV %q", getPVCName(pvc), pvName) - return true, pvc, nil } glog.V(5).Infof("PVC %q is not bound", getPVCName(pvc)) @@ -342,7 +447,7 @@ func (b *volumeBinder) isVolumeBound(namespace string, vol *v1.Volume, checkFull // arePodVolumesBound returns true if all volumes are fully bound func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool { for _, vol := range pod.Spec.Volumes { - if isBound, _, _ := b.isVolumeBound(pod.Namespace, &vol, true); !isBound { + if isBound, _, _ := b.isVolumeBound(pod.Namespace, &vol); !isBound { // Pod has at least one PVC that needs binding return false } @@ -358,7 +463,7 @@ func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentV unboundClaims = []*bindingInfo{} for _, vol := range pod.Spec.Volumes { - volumeBound, pvc, err := b.isVolumeBound(pod.Namespace, &vol, false) + volumeBound, pvc, err := b.isVolumeBound(pod.Namespace, &vol) if err != nil { return nil, nil, nil, err } @@ -372,7 +477,8 @@ func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentV if err != nil { return nil, nil, nil, err } - if delayBinding { + // Prebound PVCs are treated as unbound immediate binding + if delayBinding && pvc.Spec.VolumeName == "" { // Scheduler path unboundClaims = append(unboundClaims, &bindingInfo{pvc: pvc}) } else { @@ -518,19 +624,6 @@ type bindingInfo struct { pv *v1.PersistentVolume } -// Used in unit test errors -func (b bindingInfo) String() string { - pvcName := "" - pvName := "" - if b.pvc != nil { - pvcName = 
getPVCName(b.pvc) - } - if b.pv != nil { - pvName = b.pv.Name - } - return fmt.Sprintf("[PVC %q, PV %q]", pvcName, pvName) -} - type byPVCSize []*bindingInfo func (a byPVCSize) Len() int { diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go b/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go index c523011795..e3acc35163 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go +++ b/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go @@ -31,6 +31,8 @@ type PodBindingCache interface { UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo) // GetBindings will return the cached bindings for the given pod and node. + // A nil return value means that the entry was not found. An empty slice + // means that no binding operations are needed. GetBindings(pod *v1.Pod, node string) []*bindingInfo // UpdateProvisionedPVCs will update the cache with the given provisioning decisions @@ -38,6 +40,8 @@ type PodBindingCache interface { UpdateProvisionedPVCs(pod *v1.Pod, node string, provisionings []*v1.PersistentVolumeClaim) // GetProvisionedPVCs will return the cached provisioning decisions for the given pod and node. + // A nil return value means that the entry was not found. An empty slice + // means that no provisioning operations are needed. GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim // DeleteBindings will remove all cached bindings and provisionings for the given pod. @@ -46,7 +50,8 @@ type PodBindingCache interface { } type podBindingCache struct { - mutex sync.Mutex + // synchronizes bindingDecisions + rwMutex sync.RWMutex // Key = pod name // Value = nodeDecisions @@ -68,16 +73,16 @@ func NewPodBindingCache() PodBindingCache { } func (c *podBindingCache) DeleteBindings(pod *v1.Pod) { - c.mutex.Lock() - defer c.mutex.Unlock() + c.rwMutex.Lock() + defer c.rwMutex.Unlock() podName := getPodName(pod) delete(c.bindingDecisions, podName) } func (c *podBindingCache) UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo) { - c.mutex.Lock() - defer c.mutex.Unlock() + c.rwMutex.Lock() + defer c.rwMutex.Unlock() podName := getPodName(pod) decisions, ok := c.bindingDecisions[podName] @@ -97,8 +102,8 @@ func (c *podBindingCache) UpdateBindings(pod *v1.Pod, node string, bindings []*b } func (c *podBindingCache) GetBindings(pod *v1.Pod, node string) []*bindingInfo { - c.mutex.Lock() - defer c.mutex.Unlock() + c.rwMutex.RLock() + defer c.rwMutex.RUnlock() podName := getPodName(pod) decisions, ok := c.bindingDecisions[podName] @@ -113,8 +118,8 @@ func (c *podBindingCache) GetBindings(pod *v1.Pod, node string) []*bindingInfo { } func (c *podBindingCache) UpdateProvisionedPVCs(pod *v1.Pod, node string, pvcs []*v1.PersistentVolumeClaim) { - c.mutex.Lock() - defer c.mutex.Unlock() + c.rwMutex.Lock() + defer c.rwMutex.Unlock() podName := getPodName(pod) decisions, ok := c.bindingDecisions[podName] @@ -134,8 +139,8 @@ func (c *podBindingCache) UpdateProvisionedPVCs(pod *v1.Pod, node string, pvcs [ } func (c *podBindingCache) GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim { - c.mutex.Lock() - defer c.mutex.Unlock() + c.rwMutex.RLock() + defer c.rwMutex.RUnlock() podName := getPodName(pod) decisions, ok := c.bindingDecisions[podName] diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_fake.go b/pkg/controller/volume/persistentvolume/scheduler_binder_fake.go index 4cefc58bfa..46e8f200e2 100644 --- 
a/pkg/controller/volume/persistentvolume/scheduler_binder_fake.go +++ b/pkg/controller/volume/persistentvolume/scheduler_binder_fake.go @@ -16,18 +16,15 @@ limitations under the License. package persistentvolume -import ( - "k8s.io/api/core/v1" -) +import "k8s.io/api/core/v1" type FakeVolumeBinderConfig struct { - AllBound bool - FindUnboundSatsified bool - FindBoundSatsified bool - FindErr error - AssumeBindingRequired bool - AssumeErr error - BindErr error + AllBound bool + FindUnboundSatsified bool + FindBoundSatsified bool + FindErr error + AssumeErr error + BindErr error } // NewVolumeBinder sets up all the caches needed for the scheduler to make @@ -48,9 +45,9 @@ func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVo return b.config.FindUnboundSatsified, b.config.FindBoundSatsified, b.config.FindErr } -func (b *FakeVolumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (bool, bool, error) { +func (b *FakeVolumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (bool, error) { b.AssumeCalled = true - return b.config.AllBound, b.config.AssumeBindingRequired, b.config.AssumeErr + return b.config.AllBound, b.config.AssumeErr } func (b *FakeVolumeBinder) BindPodVolumes(assumedPod *v1.Pod) error { diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_test.go b/pkg/controller/volume/persistentvolume/scheduler_binder_test.go index 52f7b0aa21..0bdfba3fd2 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder_test.go +++ b/pkg/controller/volume/persistentvolume/scheduler_binder_test.go @@ -20,6 +20,7 @@ import ( "fmt" "reflect" "testing" + "time" "github.com/golang/glog" @@ -38,20 +39,30 @@ import ( ) var ( - unboundPVC = makeTestPVC("unbound-pvc", "1G", pvcUnbound, "", "1", &waitClass) - unboundPVC2 = makeTestPVC("unbound-pvc2", "5G", pvcUnbound, "", "1", &waitClass) - preboundPVC = makeTestPVC("prebound-pvc", "1G", pvcPrebound, "pv-node1a", "1", &waitClass) - boundPVC = makeTestPVC("bound-pvc", "1G", pvcBound, "pv-bound", "1", &waitClass) - boundPVC2 = makeTestPVC("bound-pvc2", "1G", pvcBound, "pv-bound2", "1", &waitClass) - badPVC = makeBadPVC() - immediateUnboundPVC = makeTestPVC("immediate-unbound-pvc", "1G", pvcUnbound, "", "1", &immediateClass) - immediateBoundPVC = makeTestPVC("immediate-bound-pvc", "1G", pvcBound, "pv-bound-immediate", "1", &immediateClass) - provisionedPVC = makeTestPVC("provisioned-pvc", "1Gi", pvcUnbound, "", "1", &waitClassWithProvisioner) - provisionedPVC2 = makeTestPVC("provisioned-pvc2", "1Gi", pvcUnbound, "", "1", &waitClassWithProvisioner) - provisionedPVCHigherVersion = makeTestPVC("provisioned-pvc2", "1Gi", pvcUnbound, "", "2", &waitClassWithProvisioner) - noProvisionerPVC = makeTestPVC("no-provisioner-pvc", "1Gi", pvcUnbound, "", "1", &waitClass) - topoMismatchPVC = makeTestPVC("topo-mismatch-pvc", "1Gi", pvcUnbound, "", "1", &topoMismatchClass) + // PVCs for manual binding + // TODO: clean up all of these + unboundPVC = makeTestPVC("unbound-pvc", "1G", "", pvcUnbound, "", "1", &waitClass) + unboundPVC2 = makeTestPVC("unbound-pvc2", "5G", "", pvcUnbound, "", "1", &waitClass) + preboundPVC = makeTestPVC("prebound-pvc", "1G", "", pvcPrebound, "pv-node1a", "1", &waitClass) + preboundPVCNode1a = makeTestPVC("unbound-pvc", "1G", "", pvcPrebound, "pv-node1a", "1", &waitClass) + boundPVC = makeTestPVC("bound-pvc", "1G", "", pvcBound, "pv-bound", "1", &waitClass) + boundPVC2 = makeTestPVC("bound-pvc2", "1G", "", pvcBound, "pv-bound2", "1", &waitClass) + boundPVCNode1a = 
makeTestPVC("unbound-pvc", "1G", "", pvcBound, "pv-node1a", "1", &waitClass) + badPVC = makeBadPVC() + immediateUnboundPVC = makeTestPVC("immediate-unbound-pvc", "1G", "", pvcUnbound, "", "1", &immediateClass) + immediateBoundPVC = makeTestPVC("immediate-bound-pvc", "1G", "", pvcBound, "pv-bound-immediate", "1", &immediateClass) + // PVCs for dynamic provisioning + provisionedPVC = makeTestPVC("provisioned-pvc", "1Gi", "", pvcUnbound, "", "1", &waitClassWithProvisioner) + provisionedPVC2 = makeTestPVC("provisioned-pvc2", "1Gi", "", pvcUnbound, "", "1", &waitClassWithProvisioner) + provisionedPVCHigherVersion = makeTestPVC("provisioned-pvc2", "1Gi", "", pvcUnbound, "", "2", &waitClassWithProvisioner) + provisionedPVCBound = makeTestPVC("provisioned-pvc", "1Gi", "", pvcBound, "some-pv", "1", &waitClassWithProvisioner) + noProvisionerPVC = makeTestPVC("no-provisioner-pvc", "1Gi", "", pvcUnbound, "", "1", &waitClass) + topoMismatchPVC = makeTestPVC("topo-mismatch-pvc", "1Gi", "", pvcUnbound, "", "1", &topoMismatchClass) + + selectedNodePVC = makeTestPVC("provisioned-pvc", "1Gi", nodeLabelValue, pvcSelectedNode, "", "1", &waitClassWithProvisioner) + + // PVs for manual binding pvNoNode = makeTestPV("pv-no-node", "", "1G", "1", nil, waitClass) pvNode1a = makeTestPV("pv-node1a", "node1", "5G", "1", nil, waitClass) pvNode1b = makeTestPV("pv-node1b", "node1", "10G", "1", nil, waitClass) @@ -59,12 +70,13 @@ var ( pvNode2 = makeTestPV("pv-node2", "node2", "1G", "1", nil, waitClass) pvPrebound = makeTestPV("pv-prebound", "node1", "1G", "1", unboundPVC, waitClass) pvBound = makeTestPV("pv-bound", "node1", "1G", "1", boundPVC, waitClass) - pvNode1aBound = makeTestPV("pv-node1a", "node1", "1G", "1", unboundPVC, waitClass) - pvNode1bBound = makeTestPV("pv-node1b", "node1", "5G", "1", unboundPVC2, waitClass) - pvNode1bBoundHigherVersion = makeTestPV("pv-node1b", "node1", "5G", "2", unboundPVC2, waitClass) + pvNode1aBound = makeTestPV("pv-node1a", "node1", "5G", "1", unboundPVC, waitClass) + pvNode1bBound = makeTestPV("pv-node1b", "node1", "10G", "1", unboundPVC2, waitClass) + pvNode1bBoundHigherVersion = makeTestPV("pv-node1b", "node1", "10G", "2", unboundPVC2, waitClass) pvBoundImmediate = makeTestPV("pv-bound-immediate", "node1", "1G", "1", immediateBoundPVC, immediateClass) pvBoundImmediateNode2 = makeTestPV("pv-bound-immediate", "node2", "1G", "1", immediateBoundPVC, immediateClass) + // PVC/PV bindings for manual binding binding1a = makeBinding(unboundPVC, pvNode1a) binding1b = makeBinding(unboundPVC2, pvNode1b) bindingNoNode = makeBinding(unboundPVC, pvNoNode) @@ -72,11 +84,13 @@ var ( binding1aBound = makeBinding(unboundPVC, pvNode1aBound) binding1bBound = makeBinding(unboundPVC2, pvNode1bBound) + // storage class names waitClass = "waitClass" immediateClass = "immediateClass" waitClassWithProvisioner = "waitClassWithProvisioner" topoMismatchClass = "topoMismatchClass" + // node topology nodeLabelKey = "nodeKey" nodeLabelValue = "node1" ) @@ -102,7 +116,8 @@ func newTestBinder(t *testing.T) *testEnv { client, pvcInformer, informerFactory.Core().V1().PersistentVolumes(), - classInformer) + classInformer, + 10*time.Second) // Add storageclasses waitMode := storagev1.VolumeBindingWaitForFirstConsumer @@ -247,17 +262,44 @@ func (env *testEnv) initPodCache(pod *v1.Pod, node string, bindings []*bindingIn func (env *testEnv) validatePodCache(t *testing.T, name, node string, pod *v1.Pod, expectedBindings []*bindingInfo, expectedProvisionings []*v1.PersistentVolumeClaim) { cache := 
env.internalBinder.podBindingCache bindings := cache.GetBindings(pod, node) + if aLen, eLen := len(bindings), len(expectedBindings); aLen != eLen { + t.Errorf("Test %q failed. expected %v bindings, got %v", name, eLen, aLen) + } else if expectedBindings == nil && bindings != nil { + // nil and empty are different + t.Errorf("Test %q failed. expected nil bindings, got empty", name) + } else if expectedBindings != nil && bindings == nil { + // nil and empty are different + t.Errorf("Test %q failed. expected empty bindings, got nil", name) + } else { + for i := 0; i < aLen; i++ { + // Validate PV + if !reflect.DeepEqual(expectedBindings[i].pv, bindings[i].pv) { + t.Errorf("Test %q failed. binding.pv doesn't match [A-expected, B-got]: %s", name, diff.ObjectDiff(expectedBindings[i].pv, bindings[i].pv)) + } - if !reflect.DeepEqual(expectedBindings, bindings) { - t.Errorf("Test %q failed: Expected bindings %+v, got %+v", name, expectedBindings, bindings) + // Validate PVC + if !reflect.DeepEqual(expectedBindings[i].pvc, bindings[i].pvc) { + t.Errorf("Test %q failed. binding.pvc doesn't match [A-expected, B-got]: %s", name, diff.ObjectDiff(expectedBindings[i].pvc, bindings[i].pvc)) + } + } } provisionedClaims := cache.GetProvisionedPVCs(pod, node) - - if !reflect.DeepEqual(expectedProvisionings, provisionedClaims) { - t.Errorf("Test %q failed: Expected provisionings %+v, got %+v", name, expectedProvisionings, provisionedClaims) + if aLen, eLen := len(provisionedClaims), len(expectedProvisionings); aLen != eLen { + t.Errorf("Test %q failed. expected %v provisioned claims, got %v", name, eLen, aLen) + } else if expectedProvisionings == nil && provisionedClaims != nil { + // nil and empty are different + t.Errorf("Test %q failed. expected nil provisionings, got empty", name) + } else if expectedProvisionings != nil && provisionedClaims == nil { + // nil and empty are different + t.Errorf("Test %q failed. expected empty provisionings, got nil", name) + } else { + for i := 0; i < aLen; i++ { + if !reflect.DeepEqual(expectedProvisionings[i], provisionedClaims[i]) { + t.Errorf("Test %q failed. 
provisioned claims doesn't match [A-expected, B-got]: %s", name, diff.ObjectDiff(expectedProvisionings[i], provisionedClaims[i])) + } + } } - } func (env *testEnv) getPodBindings(t *testing.T, name, node string, pod *v1.Pod) []*bindingInfo { @@ -266,8 +308,6 @@ func (env *testEnv) getPodBindings(t *testing.T, name, node string, pod *v1.Pod) } func (env *testEnv) validateAssume(t *testing.T, name string, pod *v1.Pod, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) { - // TODO: Check binding cache - // Check pv cache pvCache := env.internalBinder.pvCache for _, b := range bindings { @@ -383,17 +423,21 @@ const ( pvcUnbound = iota pvcPrebound pvcBound + pvcSelectedNode ) -func makeTestPVC(name, size string, pvcBoundState int, pvName, resourceVersion string, className *string) *v1.PersistentVolumeClaim { +func makeTestPVC(name, size, node string, pvcBoundState int, pvName, resourceVersion string, className *string) *v1.PersistentVolumeClaim { pvc := &v1.PersistentVolumeClaim{ + TypeMeta: metav1.TypeMeta{ + Kind: "PersistentVolumeClaim", + APIVersion: "v1", + }, ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: "testns", UID: types.UID("pvc-uid"), ResourceVersion: resourceVersion, SelfLink: testapi.Default.SelfLink("pvc", name), - Annotations: map[string]string{}, }, Spec: v1.PersistentVolumeClaimSpec{ Resources: v1.ResourceRequirements{ @@ -406,6 +450,9 @@ func makeTestPVC(name, size string, pvcBoundState int, pvName, resourceVersion s } switch pvcBoundState { + case pvcSelectedNode: + metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annSelectedNode, node) + // don't fallthrough case pvcBound: metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annBindCompleted, "yes") fallthrough @@ -454,9 +501,12 @@ func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentV if boundToPVC != nil { pv.Spec.ClaimRef = &v1.ObjectReference{ - Name: boundToPVC.Name, - Namespace: boundToPVC.Namespace, - UID: boundToPVC.UID, + Kind: boundToPVC.Kind, + APIVersion: boundToPVC.APIVersion, + ResourceVersion: boundToPVC.ResourceVersion, + Name: boundToPVC.Name, + Namespace: boundToPVC.Namespace, + UID: boundToPVC.UID, } metav1.SetMetaDataAnnotation(&pv.ObjectMeta, annBoundByController, "yes") } @@ -464,6 +514,24 @@ func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentV return pv } +func pvcSetSelectedNode(pvc *v1.PersistentVolumeClaim, node string) *v1.PersistentVolumeClaim { + newPVC := pvc.DeepCopy() + metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annSelectedNode, node) + return newPVC +} + +func pvcSetEmptyAnnotations(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim { + newPVC := pvc.DeepCopy() + newPVC.Annotations = map[string]string{} + return newPVC +} + +func pvRemoveClaimUID(pv *v1.PersistentVolume) *v1.PersistentVolume { + newPV := pv.DeepCopy() + newPV.Spec.ClaimRef.UID = "" + return newPV +} + func makePod(pvcs []*v1.PersistentVolumeClaim) *v1.Pod { pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -515,7 +583,7 @@ func makeBinding(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) *bindin func addProvisionAnn(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim { res := pvc.DeepCopy() // Add provision related annotations - res.Annotations[annSelectedNode] = nodeLabelValue + metav1.SetMetaDataAnnotation(&res.ObjectMeta, annSelectedNode, nodeLabelValue) return res } @@ -568,10 +636,9 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) { shouldFail: true, }, "prebound-pvc": { - podPVCs: 
[]*v1.PersistentVolumeClaim{preboundPVC}, - pvs: []*v1.PersistentVolume{pvNode1aBound}, - expectedUnbound: true, - expectedBound: true, + podPVCs: []*v1.PersistentVolumeClaim{preboundPVC}, + pvs: []*v1.PersistentVolume{pvNode1aBound}, + shouldFail: true, }, "unbound-pvc,pv-same-node": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, @@ -621,11 +688,9 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) { expectedBound: true, }, "one-prebound,one-unbound": { - podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, preboundPVC}, - pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, - expectedBindings: []*bindingInfo{binding1a}, - expectedUnbound: true, - expectedBound: true, + podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, preboundPVC}, + pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, + shouldFail: true, }, "immediate-bound-pvc": { podPVCs: []*v1.PersistentVolumeClaim{immediateBoundPVC}, @@ -834,69 +899,64 @@ func TestAssumePodVolumes(t *testing.T) { provisionedPVCs []*v1.PersistentVolumeClaim // Expected return values - shouldFail bool - expectedBindingRequired bool - expectedAllBound bool + shouldFail bool + expectedAllBound bool - // if nil, use bindings - expectedBindings []*bindingInfo + expectedBindings []*bindingInfo + expectedProvisionings []*v1.PersistentVolumeClaim }{ "all-bound": { podPVCs: []*v1.PersistentVolumeClaim{boundPVC}, pvs: []*v1.PersistentVolume{pvBound}, expectedAllBound: true, }, - "prebound-pvc": { - podPVCs: []*v1.PersistentVolumeClaim{preboundPVC}, - pvs: []*v1.PersistentVolume{pvNode1a}, - }, "one-binding": { - podPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, - bindings: []*bindingInfo{binding1a}, - pvs: []*v1.PersistentVolume{pvNode1a}, - expectedBindingRequired: true, + podPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, + bindings: []*bindingInfo{binding1a}, + pvs: []*v1.PersistentVolume{pvNode1a}, + expectedBindings: []*bindingInfo{binding1aBound}, + expectedProvisionings: []*v1.PersistentVolumeClaim{}, }, "two-bindings": { - podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2}, - bindings: []*bindingInfo{binding1a, binding1b}, - pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, - expectedBindingRequired: true, + podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2}, + bindings: []*bindingInfo{binding1a, binding1b}, + pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, + expectedBindings: []*bindingInfo{binding1aBound, binding1bBound}, + expectedProvisionings: []*v1.PersistentVolumeClaim{}, }, "pv-already-bound": { - podPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, - bindings: []*bindingInfo{binding1aBound}, - pvs: []*v1.PersistentVolume{pvNode1aBound}, - expectedBindingRequired: false, - expectedBindings: []*bindingInfo{}, + podPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, + bindings: []*bindingInfo{binding1aBound}, + pvs: []*v1.PersistentVolume{pvNode1aBound}, + expectedBindings: []*bindingInfo{binding1aBound}, + expectedProvisionings: []*v1.PersistentVolumeClaim{}, }, "claimref-failed": { - podPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, - bindings: []*bindingInfo{binding1a, bindingBad}, - pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, - shouldFail: true, - expectedBindingRequired: true, + podPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, + bindings: []*bindingInfo{binding1a, bindingBad}, + pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, + shouldFail: true, }, "tmpupdate-failed": { - podPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, - bindings: []*bindingInfo{binding1a, binding1b}, - pvs: []*v1.PersistentVolume{pvNode1a}, - 
shouldFail: true, - expectedBindingRequired: true, + podPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, + bindings: []*bindingInfo{binding1a, binding1b}, + pvs: []*v1.PersistentVolume{pvNode1a}, + shouldFail: true, }, "one-binding, one-pvc-provisioned": { - podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC}, - bindings: []*bindingInfo{binding1a}, - pvs: []*v1.PersistentVolume{pvNode1a}, - provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, - expectedBindingRequired: true, + podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC}, + bindings: []*bindingInfo{binding1a}, + pvs: []*v1.PersistentVolume{pvNode1a}, + provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, + expectedBindings: []*bindingInfo{binding1aBound}, + expectedProvisionings: []*v1.PersistentVolumeClaim{selectedNodePVC}, }, "one-binding, one-provision-tmpupdate-failed": { - podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVCHigherVersion}, - bindings: []*bindingInfo{binding1a}, - pvs: []*v1.PersistentVolume{pvNode1a}, - provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC2}, - shouldFail: true, - expectedBindingRequired: true, + podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVCHigherVersion}, + bindings: []*bindingInfo{binding1a}, + pvs: []*v1.PersistentVolume{pvNode1a}, + provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC2}, + shouldFail: true, }, } @@ -911,7 +971,7 @@ func TestAssumePodVolumes(t *testing.T) { testEnv.initVolumes(scenario.pvs, scenario.pvs) // Execute - allBound, bindingRequired, err := testEnv.binder.AssumePodVolumes(pod, "node1") + allBound, err := testEnv.binder.AssumePodVolumes(pod, "node1") // Validate if !scenario.shouldFail && err != nil { @@ -920,24 +980,25 @@ func TestAssumePodVolumes(t *testing.T) { if scenario.shouldFail && err == nil { t.Errorf("Test %q failed: returned success but expected error", name) } - if scenario.expectedBindingRequired != bindingRequired { - t.Errorf("Test %q failed: returned unexpected bindingRequired: %v", name, bindingRequired) - } if scenario.expectedAllBound != allBound { t.Errorf("Test %q failed: returned unexpected allBound: %v", name, allBound) } if scenario.expectedBindings == nil { scenario.expectedBindings = scenario.bindings } - if scenario.shouldFail { - testEnv.validateFailedAssume(t, name, pod, scenario.expectedBindings, scenario.provisionedPVCs) - } else { - testEnv.validateAssume(t, name, pod, scenario.expectedBindings, scenario.provisionedPVCs) + if scenario.expectedProvisionings == nil { + scenario.expectedProvisionings = scenario.provisionedPVCs } + if scenario.shouldFail { + testEnv.validateFailedAssume(t, name, pod, scenario.expectedBindings, scenario.expectedProvisionings) + } else { + testEnv.validateAssume(t, name, pod, scenario.expectedBindings, scenario.expectedProvisionings) + } + testEnv.validatePodCache(t, name, pod.Spec.NodeName, pod, scenario.expectedBindings, scenario.expectedProvisionings) } } -func TestBindPodVolumes(t *testing.T) { +func TestBindAPIUpdate(t *testing.T) { scenarios := map[string]struct { // Inputs bindings []*bindingInfo @@ -960,34 +1021,56 @@ func TestBindPodVolumes(t *testing.T) { // if nil, use expectedPVCs expectedAPIPVCs []*v1.PersistentVolumeClaim }{ - "all-bound": {}, - "not-fully-bound": { - bindings: []*bindingInfo{}, + "nothing-to-bind-nil": { + shouldFail: true, + }, + "nothing-to-bind-bindings-nil": { + provisionedPVCs: []*v1.PersistentVolumeClaim{}, + shouldFail: true, + }, + "nothing-to-bind-provisionings-nil": { + bindings: 
[]*bindingInfo{}, + shouldFail: true, + }, + "nothing-to-bind-empty": { + bindings: []*bindingInfo{}, + provisionedPVCs: []*v1.PersistentVolumeClaim{}, }, "one-binding": { - bindings: []*bindingInfo{binding1aBound}, - cachedPVs: []*v1.PersistentVolume{pvNode1a}, - expectedPVs: []*v1.PersistentVolume{pvNode1aBound}, + bindings: []*bindingInfo{binding1aBound}, + cachedPVs: []*v1.PersistentVolume{pvNode1a}, + expectedPVs: []*v1.PersistentVolume{pvNode1aBound}, + provisionedPVCs: []*v1.PersistentVolumeClaim{}, }, "two-bindings": { - bindings: []*bindingInfo{binding1aBound, binding1bBound}, - cachedPVs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, - expectedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBound}, + bindings: []*bindingInfo{binding1aBound, binding1bBound}, + cachedPVs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, + expectedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBound}, + provisionedPVCs: []*v1.PersistentVolumeClaim{}, + }, + "api-already-updated": { + bindings: []*bindingInfo{binding1aBound}, + cachedPVs: []*v1.PersistentVolume{pvNode1aBound}, + expectedPVs: []*v1.PersistentVolume{pvNode1aBound}, + provisionedPVCs: []*v1.PersistentVolumeClaim{}, }, "api-update-failed": { - bindings: []*bindingInfo{binding1aBound, binding1bBound}, - cachedPVs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, - apiPVs: []*v1.PersistentVolume{pvNode1a, pvNode1bBoundHigherVersion}, - expectedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1b}, - expectedAPIPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBoundHigherVersion}, - shouldFail: true, + bindings: []*bindingInfo{binding1aBound, binding1bBound}, + cachedPVs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, + apiPVs: []*v1.PersistentVolume{pvNode1a, pvNode1bBoundHigherVersion}, + expectedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1b}, + expectedAPIPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBoundHigherVersion}, + provisionedPVCs: []*v1.PersistentVolumeClaim{}, + shouldFail: true, }, "one-provisioned-pvc": { + bindings: []*bindingInfo{}, provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, cachedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, expectedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, }, "provision-api-update-failed": { + bindings: []*bindingInfo{}, provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), addProvisionAnn(provisionedPVC2)}, cachedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC, provisionedPVC2}, apiPVCs: []*v1.PersistentVolumeClaim{provisionedPVC, provisionedPVCHigherVersion}, @@ -995,7 +1078,7 @@ func TestBindPodVolumes(t *testing.T) { expectedAPIPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), provisionedPVCHigherVersion}, shouldFail: true, }, - "bingding-succeed, provision-api-update-failed": { + "binding-succeed, provision-api-update-failed": { bindings: []*bindingInfo{binding1aBound}, cachedPVs: []*v1.PersistentVolume{pvNode1a}, expectedPVs: []*v1.PersistentVolume{pvNode1aBound}, @@ -1008,7 +1091,7 @@ func TestBindPodVolumes(t *testing.T) { }, } for name, scenario := range scenarios { - glog.V(5).Infof("Running test case %q", name) + glog.V(4).Infof("Running test case %q", name) // Setup testEnv := newTestBinder(t) @@ -1024,7 +1107,7 @@ func TestBindPodVolumes(t *testing.T) { testEnv.assumeVolumes(t, name, "node1", pod, scenario.bindings, scenario.provisionedPVCs) // Execute - err := testEnv.binder.BindPodVolumes(pod) + err := testEnv.internalBinder.bindAPIUpdate(pod.Name, scenario.bindings, 
scenario.provisionedPVCs) // Validate if !scenario.shouldFail && err != nil { @@ -1044,6 +1127,301 @@ func TestBindPodVolumes(t *testing.T) { } } +func TestCheckBindings(t *testing.T) { + scenarios := map[string]struct { + // Inputs + bindings []*bindingInfo + cachedPVs []*v1.PersistentVolume + + provisionedPVCs []*v1.PersistentVolumeClaim + cachedPVCs []*v1.PersistentVolumeClaim + + // Expected return values + shouldFail bool + expectedBound bool + }{ + "nothing-to-bind-nil": { + shouldFail: true, + }, + "nothing-to-bind-bindings-nil": { + provisionedPVCs: []*v1.PersistentVolumeClaim{}, + shouldFail: true, + }, + "nothing-to-bind-provisionings-nil": { + bindings: []*bindingInfo{}, + shouldFail: true, + }, + "nothing-to-bind": { + bindings: []*bindingInfo{}, + provisionedPVCs: []*v1.PersistentVolumeClaim{}, + expectedBound: true, + }, + "binding-bound": { + bindings: []*bindingInfo{binding1aBound}, + provisionedPVCs: []*v1.PersistentVolumeClaim{}, + cachedPVs: []*v1.PersistentVolume{pvNode1aBound}, + cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, + expectedBound: true, + }, + "binding-prebound": { + bindings: []*bindingInfo{binding1aBound}, + provisionedPVCs: []*v1.PersistentVolumeClaim{}, + cachedPVs: []*v1.PersistentVolume{pvNode1aBound}, + cachedPVCs: []*v1.PersistentVolumeClaim{preboundPVCNode1a}, + }, + "binding-unbound": { + bindings: []*bindingInfo{binding1aBound}, + provisionedPVCs: []*v1.PersistentVolumeClaim{}, + cachedPVs: []*v1.PersistentVolume{pvNode1aBound}, + cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, + }, + "binding-pvc-not-exists": { + bindings: []*bindingInfo{binding1aBound}, + provisionedPVCs: []*v1.PersistentVolumeClaim{}, + cachedPVs: []*v1.PersistentVolume{pvNode1aBound}, + shouldFail: true, + }, + "binding-pv-not-exists": { + bindings: []*bindingInfo{binding1aBound}, + provisionedPVCs: []*v1.PersistentVolumeClaim{}, + cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, + shouldFail: true, + }, + "binding-claimref-nil": { + bindings: []*bindingInfo{binding1aBound}, + provisionedPVCs: []*v1.PersistentVolumeClaim{}, + cachedPVs: []*v1.PersistentVolume{pvNode1a}, + cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, + shouldFail: true, + }, + "binding-claimref-uid-empty": { + bindings: []*bindingInfo{binding1aBound}, + provisionedPVCs: []*v1.PersistentVolumeClaim{}, + cachedPVs: []*v1.PersistentVolume{pvRemoveClaimUID(pvNode1aBound)}, + cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, + shouldFail: true, + }, + "binding-one-bound,one-unbound": { + bindings: []*bindingInfo{binding1aBound, binding1bBound}, + provisionedPVCs: []*v1.PersistentVolumeClaim{}, + cachedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBound}, + cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a, unboundPVC2}, + }, + "provisioning-pvc-bound": { + bindings: []*bindingInfo{}, + provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, + cachedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVCBound)}, + expectedBound: true, + }, + "provisioning-pvc-unbound": { + bindings: []*bindingInfo{}, + provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, + cachedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, + }, + "provisioning-pvc-not-exists": { + bindings: []*bindingInfo{}, + provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, + shouldFail: true, + }, + "provisioning-pvc-annotations-nil": { + bindings: []*bindingInfo{}, + provisionedPVCs: 
[]*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, + cachedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, + shouldFail: true, + }, + "provisioning-pvc-selected-node-dropped": { + bindings: []*bindingInfo{}, + provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, + cachedPVCs: []*v1.PersistentVolumeClaim{pvcSetEmptyAnnotations(provisionedPVC)}, + shouldFail: true, + }, + "provisioning-pvc-selected-node-wrong-node": { + bindings: []*bindingInfo{}, + provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, + cachedPVCs: []*v1.PersistentVolumeClaim{pvcSetSelectedNode(provisionedPVC, "wrong-node")}, + shouldFail: true, + }, + "binding-bound-provisioning-unbound": { + bindings: []*bindingInfo{binding1aBound}, + provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, + cachedPVs: []*v1.PersistentVolume{pvNode1aBound}, + cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a, addProvisionAnn(provisionedPVC)}, + }, + } + + for name, scenario := range scenarios { + glog.V(4).Infof("Running test case %q", name) + + // Setup + pod := makePod(nil) + testEnv := newTestBinder(t) + testEnv.initVolumes(scenario.cachedPVs, nil) + testEnv.initClaims(scenario.cachedPVCs, nil) + + // Execute + allBound, err := testEnv.internalBinder.checkBindings(pod, scenario.bindings, scenario.provisionedPVCs) + + // Validate + if !scenario.shouldFail && err != nil { + t.Errorf("Test %q failed: returned error: %v", name, err) + } + if scenario.shouldFail && err == nil { + t.Errorf("Test %q failed: returned success but expected error", name) + } + if scenario.expectedBound != allBound { + t.Errorf("Test %q failed: returned bound %v", name, allBound) + } + } +} + +func TestBindPodVolumes(t *testing.T) { + scenarios := map[string]struct { + // Inputs + // These tests only support a single pv and pvc and static binding + bindingsNil bool // Pass in nil bindings slice + binding *bindingInfo + cachedPV *v1.PersistentVolume + cachedPVC *v1.PersistentVolumeClaim + apiPV *v1.PersistentVolume + + // This function runs with a delay of 5 seconds + delayFunc func(*testing.T, *testEnv, *v1.Pod, *v1.PersistentVolume, *v1.PersistentVolumeClaim) + + // Expected return values + shouldFail bool + }{ + "nothing-to-bind-nil": { + bindingsNil: true, + shouldFail: true, + }, + "nothing-to-bind-empty": {}, + "already-bound": { + binding: binding1aBound, + cachedPV: pvNode1aBound, + cachedPVC: boundPVCNode1a, + }, + "binding-succeeds-after-time": { + binding: binding1aBound, + cachedPV: pvNode1a, + cachedPVC: unboundPVC, + delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) { + // Update PVC to be fully bound to PV + newPVC := pvc.DeepCopy() + newPVC.ResourceVersion = "100" + newPVC.Spec.VolumeName = pv.Name + metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes") + + // Update pvc cache, fake client doesn't invoke informers + internalBinder, ok := testEnv.binder.(*volumeBinder) + if !ok { + t.Fatalf("Failed to convert to internal binder") + } + + pvcCache := internalBinder.pvcCache + internalPVCCache, ok := pvcCache.(*pvcAssumeCache) + if !ok { + t.Fatalf("Failed to convert to internal PVC cache") + } + internalPVCCache.add(newPVC) + }, + }, + "pod-deleted-after-time": { + binding: binding1aBound, + cachedPV: pvNode1a, + cachedPVC: unboundPVC, + delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) { + bindingsCache := 
testEnv.binder.GetBindingsCache() + if bindingsCache == nil { + t.Fatalf("Failed to get bindings cache") + } + + // Delete the pod from the cache + bindingsCache.DeleteBindings(pod) + + // Check that it's deleted + bindings := bindingsCache.GetBindings(pod, "node1") + if bindings != nil { + t.Fatalf("Failed to delete bindings") + } + }, + shouldFail: true, + }, + "binding-times-out": { + binding: binding1aBound, + cachedPV: pvNode1a, + cachedPVC: unboundPVC, + shouldFail: true, + }, + "binding-fails": { + binding: binding1bBound, + cachedPV: pvNode1b, + apiPV: pvNode1bBoundHigherVersion, + cachedPVC: unboundPVC2, + shouldFail: true, + }, + "check-fails": { + binding: binding1aBound, + cachedPV: pvNode1a, + cachedPVC: unboundPVC, + delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) { + // Delete PVC + // Update pvc cache, fake client doesn't invoke informers + internalBinder, ok := testEnv.binder.(*volumeBinder) + if !ok { + t.Fatalf("Failed to convert to internal binder") + } + + pvcCache := internalBinder.pvcCache + internalPVCCache, ok := pvcCache.(*pvcAssumeCache) + if !ok { + t.Fatalf("Failed to convert to internal PVC cache") + } + internalPVCCache.delete(pvc) + }, + shouldFail: true, + }, + } + + for name, scenario := range scenarios { + glog.V(4).Infof("Running test case %q", name) + + // Setup + pod := makePod(nil) + if scenario.apiPV == nil { + scenario.apiPV = scenario.cachedPV + } + testEnv := newTestBinder(t) + if !scenario.bindingsNil { + if scenario.binding != nil { + testEnv.initVolumes([]*v1.PersistentVolume{scenario.cachedPV}, []*v1.PersistentVolume{scenario.apiPV}) + testEnv.initClaims([]*v1.PersistentVolumeClaim{scenario.cachedPVC}, nil) + testEnv.assumeVolumes(t, name, "node1", pod, []*bindingInfo{scenario.binding}, []*v1.PersistentVolumeClaim{}) + } else { + testEnv.assumeVolumes(t, name, "node1", pod, []*bindingInfo{}, []*v1.PersistentVolumeClaim{}) + } + } + + if scenario.delayFunc != nil { + go func() { + time.Sleep(5 * time.Second) + glog.V(5).Infof("Running delay function") + scenario.delayFunc(t, testEnv, pod, scenario.binding.pv, scenario.binding.pvc) + }() + } + + // Execute + err := testEnv.binder.BindPodVolumes(pod) + + // Validate + if !scenario.shouldFail && err != nil { + t.Errorf("Test %q failed: returned error: %v", name, err) + } + if scenario.shouldFail && err == nil { + t.Errorf("Test %q failed: returned success but expected error", name) + } + } +} + func TestFindAssumeVolumes(t *testing.T) { // Set feature gate utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true") @@ -1080,17 +1458,15 @@ func TestFindAssumeVolumes(t *testing.T) { expectedBindings := testEnv.getPodBindings(t, "before-assume", testNode.Name, pod) // 2. 
Assume matches - allBound, bindingRequired, err := testEnv.binder.AssumePodVolumes(pod, testNode.Name) + allBound, err := testEnv.binder.AssumePodVolumes(pod, testNode.Name) if err != nil { t.Errorf("Test failed: AssumePodVolumes returned error: %v", err) } if allBound { t.Errorf("Test failed: detected unbound volumes as bound") } - if !bindingRequired { - t.Errorf("Test failed: binding not required") - } testEnv.validateAssume(t, "assume", pod, expectedBindings, nil) + // After assume, claimref should be set on pv expectedBindings = testEnv.getPodBindings(t, "after-assume", testNode.Name, pod) @@ -1106,6 +1482,6 @@ func TestFindAssumeVolumes(t *testing.T) { if !unboundSatisfied { t.Errorf("Test failed: couldn't find PVs for all PVCs") } - testEnv.validatePodCache(t, "after-assume", testNode.Name, pod, expectedBindings, nil) + testEnv.validatePodCache(t, "after-assume", testNode.Name, pod, expectedBindings, []*v1.PersistentVolumeClaim{}) } } diff --git a/pkg/scheduler/apis/config/types.go b/pkg/scheduler/apis/config/types.go index 6d9f4c5b40..1c5fee16b8 100644 --- a/pkg/scheduler/apis/config/types.go +++ b/pkg/scheduler/apis/config/types.go @@ -85,6 +85,11 @@ type KubeSchedulerConfiguration struct { // DEPRECATED. // Indicate the "all topologies" set for empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity. FailureDomains string + + // Duration to wait for a binding operation to complete before timing out + // Value must be non-negative integer. The value zero indicates no waiting. + // If this value is nil, the default value will be used. + BindTimeoutSeconds *int64 } // SchedulerAlgorithmSource is the source of a scheduler algorithm. One source diff --git a/pkg/scheduler/apis/config/v1alpha1/defaults.go b/pkg/scheduler/apis/config/v1alpha1/defaults.go index b63a2a1704..ea608b6b37 100644 --- a/pkg/scheduler/apis/config/v1alpha1/defaults.go +++ b/pkg/scheduler/apis/config/v1alpha1/defaults.go @@ -23,6 +23,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" apiserverconfigv1alpha1 "k8s.io/apiserver/pkg/apis/config/v1alpha1" kubescedulerconfigv1alpha1 "k8s.io/kube-scheduler/config/v1alpha1" + // this package shouldn't really depend on other k8s.io/kubernetes code api "k8s.io/kubernetes/pkg/apis/core" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" @@ -102,4 +103,9 @@ func SetDefaults_KubeSchedulerConfiguration(obj *kubescedulerconfigv1alpha1.Kube // Use the default LeaderElectionConfiguration options apiserverconfigv1alpha1.RecommendedDefaultLeaderElectionConfiguration(&obj.LeaderElection.LeaderElectionConfiguration) + + if obj.BindTimeoutSeconds == nil { + defaultBindTimeoutSeconds := int64(600) + obj.BindTimeoutSeconds = &defaultBindTimeoutSeconds + } } diff --git a/pkg/scheduler/apis/config/v1alpha1/zz_generated.conversion.go b/pkg/scheduler/apis/config/v1alpha1/zz_generated.conversion.go index 3c7b420d94..e926bf835f 100644 --- a/pkg/scheduler/apis/config/v1alpha1/zz_generated.conversion.go +++ b/pkg/scheduler/apis/config/v1alpha1/zz_generated.conversion.go @@ -121,6 +121,7 @@ func autoConvert_v1alpha1_KubeSchedulerConfiguration_To_config_KubeSchedulerConf out.DisablePreemption = in.DisablePreemption out.PercentageOfNodesToScore = in.PercentageOfNodesToScore out.FailureDomains = in.FailureDomains + out.BindTimeoutSeconds = (*int64)(unsafe.Pointer(in.BindTimeoutSeconds)) return nil } @@ -149,6 +150,7 @@ func autoConvert_config_KubeSchedulerConfiguration_To_v1alpha1_KubeSchedulerConf out.DisablePreemption = in.DisablePreemption out.PercentageOfNodesToScore = 
in.PercentageOfNodesToScore out.FailureDomains = in.FailureDomains + out.BindTimeoutSeconds = (*int64)(unsafe.Pointer(in.BindTimeoutSeconds)) return nil } diff --git a/pkg/scheduler/apis/config/validation/validation.go b/pkg/scheduler/apis/config/validation/validation.go index 679a448148..41c4c1c2e5 100644 --- a/pkg/scheduler/apis/config/validation/validation.go +++ b/pkg/scheduler/apis/config/validation/validation.go @@ -41,6 +41,9 @@ func ValidateKubeSchedulerConfiguration(cc *config.KubeSchedulerConfiguration) f if cc.HardPodAffinitySymmetricWeight < 0 || cc.HardPodAffinitySymmetricWeight > 100 { allErrs = append(allErrs, field.Invalid(field.NewPath("hardPodAffinitySymmetricWeight"), cc.HardPodAffinitySymmetricWeight, "not in valid range 0-100")) } + if cc.BindTimeoutSeconds == nil { + allErrs = append(allErrs, field.Required(field.NewPath("bindTimeoutSeconds"), "")) + } return allErrs } diff --git a/pkg/scheduler/apis/config/validation/validation_test.go b/pkg/scheduler/apis/config/validation/validation_test.go index 4800741c32..6d154c993f 100644 --- a/pkg/scheduler/apis/config/validation/validation_test.go +++ b/pkg/scheduler/apis/config/validation/validation_test.go @@ -17,15 +17,17 @@ limitations under the License. package validation import ( + "testing" + "time" + apimachinery "k8s.io/apimachinery/pkg/apis/config" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apiserver "k8s.io/apiserver/pkg/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config" - "testing" - "time" ) func TestValidateKubeSchedulerConfiguration(t *testing.T) { + testTimeout := int64(0) validConfig := &config.KubeSchedulerConfiguration{ SchedulerName: "me", HealthzBindAddress: "0.0.0.0:10254", @@ -56,6 +58,7 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) { RetryPeriod: metav1.Duration{Duration: 5 * time.Second}, }, }, + BindTimeoutSeconds: &testTimeout, } HardPodAffinitySymmetricWeightGt100 := validConfig.DeepCopy() @@ -86,6 +89,9 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) { enableContentProfilingSetWithoutEnableProfiling.EnableProfiling = false enableContentProfilingSetWithoutEnableProfiling.EnableContentionProfiling = true + bindTimeoutUnset := validConfig.DeepCopy() + bindTimeoutUnset.BindTimeoutSeconds = nil + scenarios := map[string]struct { expectedToFail bool config *config.KubeSchedulerConfiguration @@ -126,6 +132,10 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) { expectedToFail: true, config: HardPodAffinitySymmetricWeightLt0, }, + "bind-timeout-unset": { + expectedToFail: true, + config: bindTimeoutUnset, + }, } for name, scenario := range scenarios { diff --git a/pkg/scheduler/apis/config/zz_generated.deepcopy.go b/pkg/scheduler/apis/config/zz_generated.deepcopy.go index 5877096235..ee751173c2 100644 --- a/pkg/scheduler/apis/config/zz_generated.deepcopy.go +++ b/pkg/scheduler/apis/config/zz_generated.deepcopy.go @@ -32,6 +32,11 @@ func (in *KubeSchedulerConfiguration) DeepCopyInto(out *KubeSchedulerConfigurati out.LeaderElection = in.LeaderElection out.ClientConnection = in.ClientConnection out.DebuggingConfiguration = in.DebuggingConfiguration + if in.BindTimeoutSeconds != nil { + in, out := &in.BindTimeoutSeconds, &out.BindTimeoutSeconds + *out = new(int64) + **out = **in + } return } diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 89a68d52ba..e72c1cfe6f 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -159,6 +159,7 @@ type ConfigFactoryArgs struct { EnableEquivalenceClassCache 
bool DisablePreemption bool PercentageOfNodesToScore int32 + BindTimeoutSeconds int64 } // NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only @@ -305,7 +306,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator { if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { // Setup volume binder - c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.PvcInformer, args.PvInformer, args.StorageClassInformer) + c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second) args.StorageClassInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ diff --git a/pkg/scheduler/factory/factory_test.go b/pkg/scheduler/factory/factory_test.go index 6d15e3e06e..7ed14ebeb7 100644 --- a/pkg/scheduler/factory/factory_test.go +++ b/pkg/scheduler/factory/factory_test.go @@ -49,6 +49,7 @@ import ( const ( enableEquivalenceCache = true disablePodPreemption = false + bindTimeoutSeconds = 600 ) func TestCreate(t *testing.T) { @@ -557,6 +558,7 @@ func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeigh enableEquivalenceCache, disablePodPreemption, schedulerapi.DefaultPercentageOfNodesToScore, + bindTimeoutSeconds, }) } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 87fe364db8..3d836b6ee5 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -17,7 +17,6 @@ limitations under the License. package scheduler import ( - "fmt" "time" "k8s.io/api/core/v1" @@ -184,10 +183,6 @@ func (sched *Scheduler) Run() { return } - if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { - go sched.config.VolumeBinder.Run(sched.bindVolumesWorker, sched.config.StopEverything) - } - go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything) } @@ -265,17 +260,12 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e return nodeName, err } -// assumeAndBindVolumes will update the volume cache and then asynchronously bind volumes if required. -// -// If volume binding is required, then the bind volumes routine will update the pod to send it back through -// the scheduler. -// -// Otherwise, return nil error and continue to assume the pod. +// assumeVolumes will update the volume cache with the chosen bindings // // This function modifies assumed if volume binding is required. 
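Aside (illustrative sketch, not part of this patch): the factory change above converts the new BindTimeoutSeconds value from integer seconds into the time.Duration handed to the volume binder, and the defaulting code fills in 600 when the pointer is nil. The helper name defaultSeconds below is invented for this example.

package main

import (
	"fmt"
	"time"
)

// defaultSeconds mirrors the defaulting pattern: if the pointer is nil,
// allocate it and fill in the default value.
func defaultSeconds(v **int64, def int64) {
	if *v == nil {
		d := def
		*v = &d
	}
}

func main() {
	var bindTimeoutSeconds *int64 // unset in the component config

	defaultSeconds(&bindTimeoutSeconds, 600) // same default as the scheduler (600s)

	// The config factory converts the integer seconds into a time.Duration
	// before passing it to the volume binder.
	bindTimeout := time.Duration(*bindTimeoutSeconds) * time.Second
	fmt.Println(bindTimeout) // prints 10m0s
}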
-func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error { +func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bool, err error) { if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { - allBound, bindingRequired, err := sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host) + allBound, err = sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host) if err != nil { sched.config.Error(assumed, err) sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePodVolumes failed: %v", err) @@ -285,76 +275,38 @@ func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error Reason: "SchedulerError", Message: err.Error(), }) - return err } - if !allBound { - err = fmt.Errorf("Volume binding started, waiting for completion") - if bindingRequired { - if sched.config.Ecache != nil { - invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred) - sched.config.Ecache.InvalidatePredicates(invalidPredicates) - } - - // bindVolumesWorker() will update the Pod object to put it back in the scheduler queue - sched.config.VolumeBinder.BindQueue.Add(assumed) - } else { - // We are just waiting for PV controller to finish binding, put it back in the - // scheduler queue - sched.config.Error(assumed, err) - sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "FailedScheduling", "%v", err) - sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{ - Type: v1.PodScheduled, - Status: v1.ConditionFalse, - Reason: "VolumeBindingWaiting", - }) - } - return err + // Invalidate ecache because assumed volumes could have affected the cached + // pvs for other pods + if sched.config.Ecache != nil { + invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred) + sched.config.Ecache.InvalidatePredicates(invalidPredicates) } } - return nil + return } -// bindVolumesWorker() processes pods queued in assumeAndBindVolumes() and tries to -// make the API update for volume binding. -// This function runs forever until the volume BindQueue is closed. -func (sched *Scheduler) bindVolumesWorker() { - workFunc := func() bool { - keyObj, quit := sched.config.VolumeBinder.BindQueue.Get() - if quit { - return true - } - defer sched.config.VolumeBinder.BindQueue.Done(keyObj) +// bindVolumes will make the API update with the assumed bindings and wait until +// the PV controller has completely finished the binding operation. +// +// If binding errors, times out or gets undone, then an error will be returned to +// retry scheduling. +func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error { + var reason string + var eventType string - assumed, ok := keyObj.(*v1.Pod) - if !ok { - glog.V(4).Infof("Object is not a *v1.Pod") - return false + glog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name) + err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed) + if err != nil { + glog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err) + + // Unassume the Pod and retry scheduling + if forgetErr := sched.config.SchedulerCache.ForgetPod(assumed); forgetErr != nil { + glog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) } - // TODO: add metrics - var reason string - var eventType string - - glog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name) - - // The Pod is always sent back to the scheduler afterwards. 
- err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed) - if err != nil { - glog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err) - reason = "VolumeBindingFailed" - eventType = v1.EventTypeWarning - } else { - glog.V(4).Infof("Successfully bound volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name) - reason = "VolumeBindingWaiting" - eventType = v1.EventTypeNormal - err = fmt.Errorf("Volume binding started, waiting for completion") - } - - // Always fail scheduling regardless of binding success. - // The Pod needs to be sent back through the scheduler to: - // * Retry volume binding if it fails. - // * Retry volume binding if dynamic provisioning fails. - // * Bind the Pod to the Node once all volumes are bound. + reason = "VolumeBindingFailed" + eventType = v1.EventTypeWarning sched.config.Error(assumed, err) sched.config.Recorder.Eventf(assumed, eventType, "FailedScheduling", "%v", err) sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{ @@ -362,15 +314,11 @@ func (sched *Scheduler) bindVolumesWorker() { Status: v1.ConditionFalse, Reason: reason, }) - return false + return err } - for { - if quit := workFunc(); quit { - glog.V(4).Infof("bindVolumesWorker shutting down") - break - } - } + glog.V(5).Infof("Success binding volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name) + return nil } // assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous. @@ -478,16 +426,12 @@ func (sched *Scheduler) scheduleOne() { // Assume volumes first before assuming the pod. // - // If no volumes need binding, then nil is returned, and continue to assume the pod. + // If all volumes are completely bound, then allBound is true and binding will be skipped. // - // Otherwise, error is returned and volume binding is started asynchronously for all of the pod's volumes. - // scheduleOne() returns immediately on error, so that it doesn't continue to assume the pod. - // - // After the asynchronous volume binding updates are made, it will send the pod back through the scheduler for - // subsequent passes until all volumes are fully bound. + // Otherwise, binding of volumes is started after the pod is assumed, but before pod binding. // // This function modifies 'assumedPod' if volume binding is required. - err = sched.assumeAndBindVolumes(assumedPod, suggestedHost) + allBound, err := sched.assumeVolumes(assumedPod, suggestedHost) if err != nil { return } @@ -499,6 +443,14 @@ func (sched *Scheduler) scheduleOne() { } // bind the pod to its host asynchronously (we can do this b/c of the assumption step above). 
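Aside (illustrative sketch, not part of this patch): a condensed, stand-in-typed view of the ordering this hunk establishes in scheduleOne once the bind queue is gone — assume volumes, then bind volumes before the pod bind, and forget the assumed pod when volume binding fails so the pod is rescheduled. In the real code ForgetPod is called inside bindVolumes; it is inlined here for brevity, and all types are invented for the example.

package main

import (
	"errors"
	"fmt"
)

type pod struct{ name string }

// env stands in for the scheduler's config: a fake volume binder plus the pod binding step.
type env struct {
	allBound       bool
	bindVolumesErr error
}

func (e *env) assumeVolumes(p *pod) (bool, error) { return e.allBound, nil }
func (e *env) bindVolumes(p *pod) error           { return e.bindVolumesErr }
func (e *env) forgetPod(p *pod)                   { fmt.Println("forgot", p.name) }
func (e *env) bindPod(p *pod) error               { fmt.Println("bound", p.name); return nil }

// scheduleOne mirrors the new flow: assume volumes, then bind volumes (asynchronously in the
// real scheduler) before the pod itself is bound; any volume-binding error aborts the pod bind.
func scheduleOne(e *env, p *pod) error {
	allBound, err := e.assumeVolumes(p)
	if err != nil {
		return err
	}
	if !allBound {
		if err := e.bindVolumes(p); err != nil {
			e.forgetPod(p) // retry scheduling from scratch
			return err
		}
	}
	return e.bindPod(p)
}

func main() {
	fmt.Println(scheduleOne(&env{allBound: false}, &pod{name: "foo"}))
	fmt.Println(scheduleOne(&env{bindVolumesErr: errors.New("timed out waiting for PVCs to bind")}, &pod{name: "bar"}))
}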
go func() { + // Bind volumes first before Pod + if !allBound { + err = sched.bindVolumes(assumedPod) + if err != nil { + return + } + } + err := sched.bind(assumedPod, &v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID}, Target: v1.ObjectReference{ diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 4062b94e67..e950141ae4 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -707,8 +707,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) { }, expectAssumeCalled: true, expectPodBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}}, - - eventReason: "Scheduled", + eventReason: "Scheduled", }, { name: "bound/invalid pv affinity", @@ -739,28 +738,15 @@ func TestSchedulerWithVolumeBinding(t *testing.T) { expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind, 1 node(s) had volume node affinity conflict"), }, { - name: "unbound/found matches", + name: "unbound/found matches/bind succeeds", volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{ - FindUnboundSatsified: true, - FindBoundSatsified: true, - AssumeBindingRequired: true, + FindUnboundSatsified: true, + FindBoundSatsified: true, }, expectAssumeCalled: true, expectBindCalled: true, - eventReason: "FailedScheduling", - expectError: fmt.Errorf("Volume binding started, waiting for completion"), - }, - { - name: "unbound/found matches/already-bound", - volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{ - FindUnboundSatsified: true, - FindBoundSatsified: true, - AssumeBindingRequired: false, - }, - expectAssumeCalled: true, - expectBindCalled: false, - eventReason: "FailedScheduling", - expectError: fmt.Errorf("Volume binding started, waiting for completion"), + expectPodBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}}, + eventReason: "Scheduled", }, { name: "predicate error", @@ -784,10 +770,9 @@ func TestSchedulerWithVolumeBinding(t *testing.T) { { name: "bind error", volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{ - FindUnboundSatsified: true, - FindBoundSatsified: true, - AssumeBindingRequired: true, - BindErr: bindErr, + FindUnboundSatsified: true, + FindBoundSatsified: true, + BindErr: bindErr, }, expectAssumeCalled: true, expectBindCalled: true, @@ -814,8 +799,6 @@ func TestSchedulerWithVolumeBinding(t *testing.T) { close(eventChan) }) - go fakeVolumeBinder.Run(s.bindVolumesWorker, stop) - s.scheduleOne() // Wait for pod to succeed or fail scheduling diff --git a/pkg/scheduler/volumebinder/BUILD b/pkg/scheduler/volumebinder/BUILD index c1eaa52ec7..518178377a 100644 --- a/pkg/scheduler/volumebinder/BUILD +++ b/pkg/scheduler/volumebinder/BUILD @@ -8,11 +8,9 @@ go_library( deps = [ "//pkg/controller/volume/persistentvolume:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", - "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", ], ) diff --git a/pkg/scheduler/volumebinder/volume_binder.go b/pkg/scheduler/volumebinder/volume_binder.go index 
1dfe41448c..4a0089a479 100644 --- a/pkg/scheduler/volumebinder/volume_binder.go +++ b/pkg/scheduler/volumebinder/volume_binder.go @@ -20,19 +20,15 @@ import ( "time" "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/wait" coreinformers "k8s.io/client-go/informers/core/v1" storageinformers "k8s.io/client-go/informers/storage/v1" clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/controller/volume/persistentvolume" ) -// VolumeBinder sets up the volume binding library and manages -// the volume binding operations with a queue. +// VolumeBinder sets up the volume binding library type VolumeBinder struct { - Binder persistentvolume.SchedulerVolumeBinder - BindQueue *workqueue.Type + Binder persistentvolume.SchedulerVolumeBinder } // NewVolumeBinder sets up the volume binding library and binding queue @@ -40,30 +36,21 @@ func NewVolumeBinder( client clientset.Interface, pvcInformer coreinformers.PersistentVolumeClaimInformer, pvInformer coreinformers.PersistentVolumeInformer, - storageClassInformer storageinformers.StorageClassInformer) *VolumeBinder { + storageClassInformer storageinformers.StorageClassInformer, + bindTimeout time.Duration) *VolumeBinder { return &VolumeBinder{ - Binder: persistentvolume.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer), - BindQueue: workqueue.NewNamed("podsToBind"), + Binder: persistentvolume.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer, bindTimeout), } } // NewFakeVolumeBinder sets up a fake volume binder and binding queue func NewFakeVolumeBinder(config *persistentvolume.FakeVolumeBinderConfig) *VolumeBinder { return &VolumeBinder{ - Binder: persistentvolume.NewFakeVolumeBinder(config), - BindQueue: workqueue.NewNamed("podsToBind"), + Binder: persistentvolume.NewFakeVolumeBinder(config), } } -// Run starts a goroutine to handle the binding queue with the given function. -func (b *VolumeBinder) Run(bindWorkFunc func(), stopCh <-chan struct{}) { - go wait.Until(bindWorkFunc, time.Second, stopCh) - - <-stopCh - b.BindQueue.ShutDown() -} - // DeletePodBindings will delete the cached volume bindings for the given pod. 
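Aside (illustrative sketch, not part of this patch): with the worker queue removed here, BindPodVolumes itself waits for the PV controller to finish, bounded by the bindTimeout passed into NewVolumeBinder. The standalone snippet below shows that timeout-bounded polling pattern with wait.PollImmediate; the claimsBound condition is a stand-in for the real "all PVCs bound" check.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	bindTimeout := 3 * time.Second // stands in for BindTimeoutSeconds from the config
	start := time.Now()

	// Pretend the PV controller finishes binding after roughly one second.
	claimsBound := func() (bool, error) {
		return time.Since(start) > time.Second, nil
	}

	// Poll every 500ms until the claims are bound or bindTimeout elapses.
	err := wait.PollImmediate(500*time.Millisecond, bindTimeout, claimsBound)
	fmt.Println("binding finished, err:", err)
}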
func (b *VolumeBinder) DeletePodBindings(pod *v1.Pod) { cache := b.Binder.GetBindingsCache() diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 8cfa765784..850fc204df 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -239,6 +239,7 @@ type FakeVolumePlugin struct { VolumeLimits map[string]int64 VolumeLimitsError error LimitKey string + ProvisionDelaySeconds int Mounters []*FakeVolume Unmounters []*FakeVolume @@ -437,7 +438,7 @@ func (plugin *FakeVolumePlugin) NewProvisioner(options VolumeOptions) (Provision plugin.Lock() defer plugin.Unlock() plugin.LastProvisionerOptions = options - return &FakeProvisioner{options, plugin.Host}, nil + return &FakeProvisioner{options, plugin.Host, plugin.ProvisionDelaySeconds}, nil } func (plugin *FakeVolumePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode { @@ -779,8 +780,9 @@ func (fd *FakeDeleter) GetPath() string { } type FakeProvisioner struct { - Options VolumeOptions - Host VolumeHost + Options VolumeOptions + Host VolumeHost + ProvisionDelaySeconds int } func (fc *FakeProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (*v1.PersistentVolume, error) { @@ -807,6 +809,10 @@ func (fc *FakeProvisioner) Provision(selectedNode *v1.Node, allowedTopologies [] }, } + if fc.ProvisionDelaySeconds > 0 { + time.Sleep(time.Duration(fc.ProvisionDelaySeconds) * time.Second) + } + return pv, nil } diff --git a/staging/src/k8s.io/kube-scheduler/config/v1alpha1/types.go b/staging/src/k8s.io/kube-scheduler/config/v1alpha1/types.go index 0c17498938..811e4062ae 100644 --- a/staging/src/k8s.io/kube-scheduler/config/v1alpha1/types.go +++ b/staging/src/k8s.io/kube-scheduler/config/v1alpha1/types.go @@ -81,6 +81,11 @@ type KubeSchedulerConfiguration struct { // DEPRECATED. // Indicate the "all topologies" set for empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity. FailureDomains string `json:"failureDomains"` + + // Duration to wait for a binding operation to complete before timing out + // Value must be non-negative integer. The value zero indicates no waiting. + // If this value is nil, the default value will be used. + BindTimeoutSeconds *int64 `json:"bindTimeoutSeconds"` } // SchedulerAlgorithmSource is the source of a scheduler algorithm. 
One source diff --git a/staging/src/k8s.io/kube-scheduler/config/v1alpha1/zz_generated.deepcopy.go b/staging/src/k8s.io/kube-scheduler/config/v1alpha1/zz_generated.deepcopy.go index 2e1bf252df..c0bbb79a4f 100644 --- a/staging/src/k8s.io/kube-scheduler/config/v1alpha1/zz_generated.deepcopy.go +++ b/staging/src/k8s.io/kube-scheduler/config/v1alpha1/zz_generated.deepcopy.go @@ -32,6 +32,11 @@ func (in *KubeSchedulerConfiguration) DeepCopyInto(out *KubeSchedulerConfigurati in.LeaderElection.DeepCopyInto(&out.LeaderElection) out.ClientConnection = in.ClientConnection out.DebuggingConfiguration = in.DebuggingConfiguration + if in.BindTimeoutSeconds != nil { + in, out := &in.BindTimeoutSeconds, &out.BindTimeoutSeconds + *out = new(int64) + **out = **in + } return } diff --git a/test/e2e/storage/persistent_volumes-local.go b/test/e2e/storage/persistent_volumes-local.go index 7340f9ab52..52744cdcba 100644 --- a/test/e2e/storage/persistent_volumes-local.go +++ b/test/e2e/storage/persistent_volumes-local.go @@ -566,13 +566,28 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() { framework.Skipf("Runs only when number of nodes >= %v", ssReplicas) } By("Creating a StatefulSet with pod anti-affinity on nodes") - ss := createStatefulSet(config, ssReplicas, volsPerNode, true) + ss := createStatefulSet(config, ssReplicas, volsPerNode, true, false) validateStatefulSet(config, ss, true) }) It("should use volumes on one node when pod has affinity", func() { By("Creating a StatefulSet with pod affinity on nodes") - ss := createStatefulSet(config, ssReplicas, volsPerNode/ssReplicas, false) + ss := createStatefulSet(config, ssReplicas, volsPerNode/ssReplicas, false, false) + validateStatefulSet(config, ss, false) + }) + + It("should use volumes spread across nodes when pod management is parallel and pod has anti-affinity", func() { + if len(config.nodes) < ssReplicas { + framework.Skipf("Runs only when number of nodes >= %v", ssReplicas) + } + By("Creating a StatefulSet with pod anti-affinity on nodes") + ss := createStatefulSet(config, ssReplicas, 1, true, true) + validateStatefulSet(config, ss, true) + }) + + It("should use volumes on one node when pod management is parallel and pod has affinity", func() { + By("Creating a StatefulSet with pod affinity on nodes") + ss := createStatefulSet(config, ssReplicas, 1, false, true) validateStatefulSet(config, ss, false) }) }) @@ -1830,7 +1845,7 @@ func findLocalPersistentVolume(c clientset.Interface, volumePath string) (*v1.Pe return nil, nil } -func createStatefulSet(config *localTestConfig, ssReplicas int32, volumeCount int, anti bool) *appsv1.StatefulSet { +func createStatefulSet(config *localTestConfig, ssReplicas int32, volumeCount int, anti, parallel bool) *appsv1.StatefulSet { mounts := []v1.VolumeMount{} claims := []v1.PersistentVolumeClaim{} for i := 0; i < volumeCount; i++ { @@ -1897,6 +1912,10 @@ func createStatefulSet(config *localTestConfig, ssReplicas int32, volumeCount in }, } + if parallel { + spec.Spec.PodManagementPolicy = appsv1.ParallelPodManagement + } + ss, err := config.client.AppsV1().StatefulSets(config.ns).Create(spec) Expect(err).NotTo(HaveOccurred()) diff --git a/test/integration/scheduler/BUILD b/test/integration/scheduler/BUILD index ba2c51e2b3..4943e0eb83 100644 --- a/test/integration/scheduler/BUILD +++ b/test/integration/scheduler/BUILD @@ -34,6 +34,7 @@ go_test( "//pkg/kubeapiserver/admission:go_default_library", "//pkg/scheduler:go_default_library", "//pkg/scheduler/algorithm:go_default_library", + 
"//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/algorithmprovider:go_default_library", "//pkg/scheduler/api:go_default_library", "//pkg/scheduler/apis/config:go_default_library", diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 412c0c7cb2..a7362afa67 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -183,6 +183,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet.CoreV1().Events("")}) + defaultBindTimeout := int64(30) ss := &schedulerappconfig.Config{ ComponentConfig: kubeschedulerconfig.KubeSchedulerConfiguration{ HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, @@ -195,6 +196,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { }, }, }, + BindTimeoutSeconds: &defaultBindTimeout, }, Client: clientSet, InformerFactory: informerFactory, @@ -244,6 +246,7 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet.CoreV1().Events("")}) + defaultBindTimeout := int64(30) ss := &schedulerappconfig.Config{ ComponentConfig: kubeschedulerconfig.KubeSchedulerConfiguration{ SchedulerName: v1.DefaultSchedulerName, @@ -256,6 +259,7 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) { }, }, HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, + BindTimeoutSeconds: &defaultBindTimeout, }, Client: clientSet, InformerFactory: informerFactory, diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index d801caa4a0..c2e73dd5d6 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -91,6 +91,7 @@ func createConfiguratorWithPodInformer( EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache), DisablePreemption: false, PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, + BindTimeoutSeconds: 600, }) } @@ -143,7 +144,7 @@ func initTestScheduler( ) *TestContext { // Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority // feature gate is enabled at the same time. - return initTestSchedulerWithOptions(t, context, controllerCh, setPodInformer, policy, false, time.Second) + return initTestSchedulerWithOptions(t, context, controllerCh, setPodInformer, policy, false, false, time.Second) } // initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default @@ -155,13 +156,15 @@ func initTestSchedulerWithOptions( setPodInformer bool, policy *schedulerapi.Policy, disablePreemption bool, + disableEquivalenceCache bool, resyncPeriod time.Duration, ) *TestContext { - // Enable EnableEquivalenceClassCache for all integration tests. - defer utilfeaturetesting.SetFeatureGateDuringTest( - t, - utilfeature.DefaultFeatureGate, - features.EnableEquivalenceClassCache, true)() + if !disableEquivalenceCache { + defer utilfeaturetesting.SetFeatureGateDuringTest( + t, + utilfeature.DefaultFeatureGate, + features.EnableEquivalenceClassCache, true)() + } // 1. 
Create scheduler context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, resyncPeriod) @@ -256,7 +259,7 @@ func initTest(t *testing.T, nsPrefix string) *TestContext { // configuration but with pod preemption disabled. func initTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext { return initTestSchedulerWithOptions( - t, initTestMaster(t, nsPrefix, nil), nil, true, nil, true, time.Second) + t, initTestMaster(t, nsPrefix, nil), nil, true, nil, true, false, time.Second) } // cleanupTest deletes the scheduler and the test namespace. It should be called diff --git a/test/integration/scheduler/volume_binding_test.go b/test/integration/scheduler/volume_binding_test.go index 484ce93dc9..9bb20f5adf 100644 --- a/test/integration/scheduler/volume_binding_test.go +++ b/test/integration/scheduler/volume_binding_test.go @@ -20,6 +20,7 @@ package scheduler import ( "fmt" + "os" "strconv" "strings" "testing" @@ -32,12 +33,14 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/controller/volume/persistentvolume" persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options" + "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/volume" volumetest "k8s.io/kubernetes/pkg/volume/testing" imageutils "k8s.io/kubernetes/test/utils/image" @@ -60,6 +63,7 @@ var ( classWait = "wait" classImmediate = "immediate" + classDynamic = "dynamic" sharedClasses = map[storagev1.VolumeBindingMode]*storagev1.StorageClass{ modeImmediate: makeStorageClass(classImmediate, &modeImmediate), @@ -94,7 +98,7 @@ func TestVolumeBinding(t *testing.T) { "VolumeScheduling": true, "PersistentLocalVolumes": true, } - config := setupCluster(t, "volume-scheduling", 2, features, 0) + config := setupCluster(t, "volume-scheduling-", 2, features, 0, 0, false) defer config.teardown() cases := map[string]struct { @@ -267,7 +271,7 @@ func TestVolumeBindingRescheduling(t *testing.T) { "VolumeScheduling": true, "PersistentLocalVolumes": true, } - config := setupCluster(t, "volume-scheduling", 2, features, 0) + config := setupCluster(t, "volume-scheduling-", 2, features, 0, 0, false) defer config.teardown() storageClassName := "local-storage" @@ -385,8 +389,9 @@ func TestVolumeBindingRescheduling(t *testing.T) { } // TestVolumeBindingStress creates pods, each with unbound PVCs. +// PVs are precreated. func TestVolumeBindingStress(t *testing.T) { - testVolumeBindingStress(t, 0) + testVolumeBindingStress(t, 0, false, 0) } // Like TestVolumeBindingStress but with scheduler resync. In real cluster, @@ -394,32 +399,60 @@ func TestVolumeBindingStress(t *testing.T) { // service/node update events. // This is useful to detect possible race conditions. 
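Aside (illustrative sketch, not part of this patch): the stress test is now fanned out over three knobs — scheduler resync, dynamic provisioning, and a provisioning delay. The snippet below simply enumerates the four combinations exercised by the tests that follow; the struct and field names are invented for the example.

package main

import (
	"fmt"
	"time"
)

type stressVariant struct {
	name           string
	resync         time.Duration // scheduler cache resync period
	dynamic        bool          // use dynamic provisioning instead of pre-created PVs
	provisionDelay int           // seconds the fake provisioner sleeps
}

func main() {
	variants := []stressVariant{
		{name: "static PVs", resync: 0, dynamic: false, provisionDelay: 0},
		{name: "static PVs with resync", resync: time.Second, dynamic: false, provisionDelay: 0},
		{name: "dynamic, fast provisioning", resync: 0, dynamic: true, provisionDelay: 0},
		{name: "dynamic, slow provisioning", resync: 0, dynamic: true, provisionDelay: 30},
	}
	for _, v := range variants {
		fmt.Printf("%s: resync=%v dynamic=%v delay=%ds\n", v.name, v.resync, v.dynamic, v.provisionDelay)
	}
}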
func TestVolumeBindingStressWithSchedulerResync(t *testing.T) { - testVolumeBindingStress(t, time.Second) + testVolumeBindingStress(t, time.Second, false, 0) } -func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration) { +// Like TestVolumeBindingStress but with fast dynamic provisioning +func TestVolumeBindingDynamicStressFast(t *testing.T) { + testVolumeBindingStress(t, 0, true, 0) +} + +// Like TestVolumeBindingStress but with slow dynamic provisioning +func TestVolumeBindingDynamicStressSlow(t *testing.T) { + testVolumeBindingStress(t, 0, true, 30) +} + +func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration, dynamic bool, provisionDelaySeconds int) { features := map[string]bool{ "VolumeScheduling": true, "PersistentLocalVolumes": true, } - config := setupCluster(t, "volume-binding-stress", 1, features, schedulerResyncPeriod) + config := setupCluster(t, "volume-binding-stress-", 1, features, schedulerResyncPeriod, provisionDelaySeconds, false) defer config.teardown() + // Set max volume limit to the number of PVCs the test will create + // TODO: remove when max volume limit allows setting through storageclass + if err := os.Setenv(predicates.KubeMaxPDVols, fmt.Sprintf("%v", podLimit*volsPerPod)); err != nil { + t.Fatalf("failed to set max pd limit: %v", err) + } + defer os.Unsetenv(predicates.KubeMaxPDVols) + + scName := &classWait + if dynamic { + scName = &classDynamic + sc := makeDynamicProvisionerStorageClass(*scName, &modeWait) + if _, err := config.client.StorageV1().StorageClasses().Create(sc); err != nil { + t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err) + } + } + // Create enough PVs and PVCs for all the pods pvs := []*v1.PersistentVolume{} pvcs := []*v1.PersistentVolumeClaim{} for i := 0; i < podLimit*volsPerPod; i++ { - pv := makePV(fmt.Sprintf("pv-stress-%v", i), classWait, "", "", node1) - pvc := makePVC(fmt.Sprintf("pvc-stress-%v", i), config.ns, &classWait, "") - - if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil { - t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err) + // Don't create pvs for dynamic provisioning test + if !dynamic { + pv := makePV(fmt.Sprintf("pv-stress-%v", i), *scName, "", "", node1) + if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil { + t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err) + } + pvs = append(pvs, pv) } + + pvc := makePVC(fmt.Sprintf("pvc-stress-%v", i), config.ns, scName, "") if pvc, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(pvc); err != nil { t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err) } - - pvs = append(pvs, pv) pvcs = append(pvcs, pvc) } @@ -431,7 +464,7 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration) podPvcs = append(podPvcs, pvcs[j].Name) } - pod := makePod(fmt.Sprintf("pod%v", i), config.ns, podPvcs) + pod := makePod(fmt.Sprintf("pod%03d", i), config.ns, podPvcs) if pod, err := config.client.CoreV1().Pods(config.ns).Create(pod); err != nil { t.Fatalf("Failed to create Pod %q: %v", pod.Name, err) } @@ -442,7 +475,7 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration) for _, pod := range pods { // Use increased timeout for stress test because there is a higher chance of // PV sync error - if err := waitForPodToScheduleWithTimeout(config.client, pod, 60*time.Second); err != nil { + if err := waitForPodToScheduleWithTimeout(config.client, pod, 2*time.Minute); 
err != nil { t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err) } } @@ -456,12 +489,142 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration) } } +func testVolumeBindingWithAffinity(t *testing.T, anti bool, numNodes, numPods, numPVsFirstNode int) { + features := map[string]bool{ + "VolumeScheduling": true, + "PersistentLocalVolumes": true, + } + // TODO: disable equivalence cache until kubernetes/kubernetes#67680 is fixed + config := setupCluster(t, "volume-pod-affinity-", numNodes, features, 0, 0, true) + defer config.teardown() + + pods := []*v1.Pod{} + pvcs := []*v1.PersistentVolumeClaim{} + pvs := []*v1.PersistentVolume{} + + // Create PVs for the first node + for i := 0; i < numPVsFirstNode; i++ { + pv := makePV(fmt.Sprintf("pv-node1-%v", i), classWait, "", "", node1) + if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil { + t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err) + } + pvs = append(pvs, pv) + } + + // Create 1 PV per Node for the remaining nodes + for i := 2; i <= numNodes; i++ { + pv := makePV(fmt.Sprintf("pv-node%v-0", i), classWait, "", "", fmt.Sprintf("node-%v", i)) + if pv, err := config.client.CoreV1().PersistentVolumes().Create(pv); err != nil { + t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err) + } + pvs = append(pvs, pv) + } + + // Create pods + for i := 0; i < numPods; i++ { + // Create one pvc per pod + pvc := makePVC(fmt.Sprintf("pvc-%v", i), config.ns, &classWait, "") + if pvc, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(pvc); err != nil { + t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err) + } + pvcs = append(pvcs, pvc) + + // Create pod with pod affinity + pod := makePod(fmt.Sprintf("pod%03d", i), config.ns, []string{pvc.Name}) + pod.Spec.Affinity = &v1.Affinity{} + affinityTerms := []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "app", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"volume-binding-test"}, + }, + }, + }, + TopologyKey: nodeAffinityLabelKey, + }, + } + if anti { + pod.Spec.Affinity.PodAntiAffinity = &v1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: affinityTerms, + } + } else { + pod.Spec.Affinity.PodAffinity = &v1.PodAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: affinityTerms, + } + } + + if pod, err := config.client.CoreV1().Pods(config.ns).Create(pod); err != nil { + t.Fatalf("Failed to create Pod %q: %v", pod.Name, err) + } + pods = append(pods, pod) + } + + // Validate Pods scheduled + scheduledNodes := sets.NewString() + for _, pod := range pods { + if err := waitForPodToSchedule(config.client, pod); err != nil { + t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err) + } else { + // Keep track of all the nodes that the Pods were scheduled on + pod, err = config.client.CoreV1().Pods(config.ns).Get(pod.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get Pod %q: %v", pod.Name, err) + } + if pod.Spec.NodeName == "" { + t.Fatalf("Pod %q node name unset after scheduling", pod.Name) + } + scheduledNodes.Insert(pod.Spec.NodeName) + } + } + + // Validate the affinity policy + if anti { + // The pods should have been spread across different nodes + if scheduledNodes.Len() != numPods { + t.Errorf("Pods were scheduled across %v nodes instead of %v", scheduledNodes.Len(), numPods) + } + } else { + // The pods should have been scheduled on 1 node + if 
scheduledNodes.Len() != 1 {
+            t.Errorf("Pods were scheduled across %v nodes instead of %v", scheduledNodes.Len(), 1)
+        }
+    }
+
+    // Validate PVC binding
+    for _, pvc := range pvcs {
+        validatePVCPhase(t, config.client, pvc.Name, config.ns, v1.ClaimBound)
+    }
+}
+
+func TestVolumeBindingWithAntiAffinity(t *testing.T) {
+    numNodes := 10
+    // Create as many pods as number of nodes
+    numPods := numNodes
+    // Create many more PVs on node1 to increase chance of selecting node1
+    numPVsFirstNode := 10 * numNodes
+
+    testVolumeBindingWithAffinity(t, true, numNodes, numPods, numPVsFirstNode)
+}
+
+func TestVolumeBindingWithAffinity(t *testing.T) {
+    numPods := 10
+    // Create many more nodes to increase chance of selecting a PV on a different node than node1
+    numNodes := 10 * numPods
+    // Create numPods PVs on the first node
+    numPVsFirstNode := numPods
+
+    testVolumeBindingWithAffinity(t, false, numNodes, numPods, numPVsFirstNode)
+}
+
 func TestPVAffinityConflict(t *testing.T) {
     features := map[string]bool{
         "VolumeScheduling":       true,
         "PersistentLocalVolumes": true,
     }
-    config := setupCluster(t, "volume-scheduling", 3, features, 0)
+    config := setupCluster(t, "volume-scheduling-", 3, features, 0, 0, false)
     defer config.teardown()
     pv := makePV("local-pv", classImmediate, "", "", node1)
@@ -519,7 +682,7 @@ func TestPVAffinityConflict(t *testing.T) {
     }
 }
-func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[string]bool, resyncPeriod time.Duration) *testConfig {
+func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[string]bool, resyncPeriod time.Duration, provisionDelaySeconds int, disableEquivalenceCache bool) *testConfig {
     oldFeatures := make(map[string]bool, len(features))
     for feature := range features {
         oldFeatures[feature] = utilfeature.DefaultFeatureGate.Enabled(utilfeature.Feature(feature))
@@ -529,7 +692,7 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[s
     controllerCh := make(chan struct{})
-    context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), controllerCh, false, nil, false, resyncPeriod)
+    context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), controllerCh, false, nil, false, disableEquivalenceCache, resyncPeriod)
     clientset := context.clientSet
     ns := context.ns.Name
@@ -543,6 +706,7 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[s
         Host:                   host,
         Config:                 volume.VolumeConfig{},
         LastProvisionerOptions: volume.VolumeOptions{},
+        ProvisionDelaySeconds:  provisionDelaySeconds,
         NewAttacherCallCount:   0,
         NewDetacherCallCount:   0,
         Mounters:               nil,
@@ -732,6 +896,9 @@ func makePod(name, ns string, pvcs []string) *v1.Pod {
         ObjectMeta: metav1.ObjectMeta{
             Name:      name,
             Namespace: ns,
+            Labels: map[string]string{
+                "app": "volume-binding-test",
+            },
         },
         Spec: v1.PodSpec{
             Containers: []v1.Container{