Merge pull request #73159 from cofyc/fix72329

Better way to share common utilities between PV controller and Volume Binder
pull/564/head
Kubernetes Prow Robot 2019-02-05 22:36:13 -08:00 committed by GitHub
commit a1539d8e52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 123 additions and 89 deletions

View File

@ -17,6 +17,7 @@ go_library(
"scheduler_binder.go", "scheduler_binder.go",
"scheduler_binder_cache.go", "scheduler_binder_cache.go",
"scheduler_binder_fake.go", "scheduler_binder_fake.go",
"util.go",
"volume_host.go", "volume_host.go",
], ],
importpath = "k8s.io/kubernetes/pkg/controller/volume/persistentvolume", importpath = "k8s.io/kubernetes/pkg/controller/volume/persistentvolume",

View File

@ -185,7 +185,7 @@ func findMatchingVolume(
} }
} }
if isVolumeBoundToClaim(volume, claim) { if IsVolumeBoundToClaim(volume, claim) {
// this claim and volume are pre-bound; return // this claim and volume are pre-bound; return
// the volume if the size request is satisfied, // the volume if the size request is satisfied,
// otherwise continue searching for a match // otherwise continue searching for a match

View File

@ -294,24 +294,6 @@ func (ctrl *PersistentVolumeController) isDelayBindingProvisioning(claim *v1.Per
return ok return ok
} }
func (ctrl *PersistentVolumeController) isDelayBindingMode(claim *v1.PersistentVolumeClaim) (bool, error) {
className := v1helper.GetPersistentVolumeClaimClass(claim)
if className == "" {
return false, nil
}
class, err := ctrl.classLister.Get(className)
if err != nil {
return false, nil
}
if class.VolumeBindingMode == nil {
return false, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", className)
}
return *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer, nil
}
// shouldDelayBinding returns true if binding of claim should be delayed, false otherwise. // shouldDelayBinding returns true if binding of claim should be delayed, false otherwise.
// If binding of claim should be delayed, only claims pbound by scheduler // If binding of claim should be delayed, only claims pbound by scheduler
func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) { func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) {
@ -321,7 +303,7 @@ func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentV
} }
// If claim is in delay binding mode. // If claim is in delay binding mode.
return ctrl.isDelayBindingMode(claim) return IsDelayBindingMode(claim, ctrl.classLister)
} }
// syncUnboundClaim is the main controller method to decide what to do with an // syncUnboundClaim is the main controller method to decide what to do with an
@ -419,7 +401,7 @@ func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVol
} }
// OBSERVATION: pvc is "Bound", pv is "Bound" // OBSERVATION: pvc is "Bound", pv is "Bound"
return nil return nil
} else if isVolumeBoundToClaim(volume, claim) { } else if IsVolumeBoundToClaim(volume, claim) {
// User asked for a PV that is claimed by this PVC // User asked for a PV that is claimed by this PVC
// OBSERVATION: pvc is "Pending", pv is "Bound" // OBSERVATION: pvc is "Pending", pv is "Bound"
klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim)) klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))
@ -863,7 +845,7 @@ func (ctrl *PersistentVolumeController) updateVolumePhaseWithEvent(volume *v1.Pe
func (ctrl *PersistentVolumeController) bindVolumeToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) (*v1.PersistentVolume, error) { func (ctrl *PersistentVolumeController) bindVolumeToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) (*v1.PersistentVolume, error) {
klog.V(4).Infof("updating PersistentVolume[%s]: binding to %q", volume.Name, claimToClaimKey(claim)) klog.V(4).Infof("updating PersistentVolume[%s]: binding to %q", volume.Name, claimToClaimKey(claim))
volumeClone, dirty, err := ctrl.getBindVolumeToClaim(volume, claim) volumeClone, dirty, err := GetBindVolumeToClaim(volume, claim)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -897,43 +879,6 @@ func (ctrl *PersistentVolumeController) updateBindVolumeToClaim(volumeClone *v1.
return newVol, nil return newVol, nil
} }
// Get new PV object only, no API or cache update
func (ctrl *PersistentVolumeController) getBindVolumeToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) (*v1.PersistentVolume, bool, error) {
dirty := false
// Check if the volume was already bound (either by user or by controller)
shouldSetBoundByController := false
if !isVolumeBoundToClaim(volume, claim) {
shouldSetBoundByController = true
}
// The volume from method args can be pointing to watcher cache. We must not
// modify these, therefore create a copy.
volumeClone := volume.DeepCopy()
// Bind the volume to the claim if it is not bound yet
if volume.Spec.ClaimRef == nil ||
volume.Spec.ClaimRef.Name != claim.Name ||
volume.Spec.ClaimRef.Namespace != claim.Namespace ||
volume.Spec.ClaimRef.UID != claim.UID {
claimRef, err := ref.GetReference(scheme.Scheme, claim)
if err != nil {
return nil, false, fmt.Errorf("Unexpected error getting claim reference: %v", err)
}
volumeClone.Spec.ClaimRef = claimRef
dirty = true
}
// Set annBoundByController if it is not set yet
if shouldSetBoundByController && !metav1.HasAnnotation(volumeClone.ObjectMeta, annBoundByController) {
metav1.SetMetaDataAnnotation(&volumeClone.ObjectMeta, annBoundByController, "yes")
dirty = true
}
return volumeClone, dirty, nil
}
// bindClaimToVolume modifies the given claim to be bound to a volume and // bindClaimToVolume modifies the given claim to be bound to a volume and
// saves it to API server. The volume is not modified in this method! // saves it to API server. The volume is not modified in this method!
func (ctrl *PersistentVolumeController) bindClaimToVolume(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) (*v1.PersistentVolumeClaim, error) { func (ctrl *PersistentVolumeController) bindClaimToVolume(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) (*v1.PersistentVolumeClaim, error) {

View File

@ -467,22 +467,6 @@ func getVolumeStatusForLogging(volume *v1.PersistentVolume) string {
return fmt.Sprintf("phase: %s, bound to: %q, boundByController: %v", volume.Status.Phase, claimName, boundByController) return fmt.Sprintf("phase: %s, bound to: %q, boundByController: %v", volume.Status.Phase, claimName, boundByController)
} }
// isVolumeBoundToClaim returns true, if given volume is pre-bound or bound
// to specific claim. Both claim.Name and claim.Namespace must be equal.
// If claim.UID is present in volume.Spec.ClaimRef, it must be equal too.
func isVolumeBoundToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) bool {
if volume.Spec.ClaimRef == nil {
return false
}
if claim.Name != volume.Spec.ClaimRef.Name || claim.Namespace != volume.Spec.ClaimRef.Namespace {
return false
}
if volume.Spec.ClaimRef.UID != "" && claim.UID != volume.Spec.ClaimRef.UID {
return false
}
return true
}
// storeObjectUpdate updates given cache with a new object version from Informer // storeObjectUpdate updates given cache with a new object version from Informer
// callback (i.e. with events from etcd) or with an object modified by the // callback (i.e. with events from etcd) or with an object modified by the
// controller itself. Returns "true", if the cache was updated, false if the // controller itself. Returns "true", if the cache was updated, false if the

View File

@ -29,6 +29,7 @@ import (
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1" storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/klog" "k8s.io/klog"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
volumeutil "k8s.io/kubernetes/pkg/volume/util" volumeutil "k8s.io/kubernetes/pkg/volume/util"
@ -97,7 +98,8 @@ type SchedulerVolumeBinder interface {
} }
type volumeBinder struct { type volumeBinder struct {
ctrl *PersistentVolumeController kubeClient clientset.Interface
classLister storagelisters.StorageClassLister
nodeInformer coreinformers.NodeInformer nodeInformer coreinformers.NodeInformer
pvcCache PVCAssumeCache pvcCache PVCAssumeCache
@ -120,14 +122,9 @@ func NewVolumeBinder(
storageClassInformer storageinformers.StorageClassInformer, storageClassInformer storageinformers.StorageClassInformer,
bindTimeout time.Duration) SchedulerVolumeBinder { bindTimeout time.Duration) SchedulerVolumeBinder {
// TODO: find better way... b := &volumeBinder{
ctrl := &PersistentVolumeController{
kubeClient: kubeClient, kubeClient: kubeClient,
classLister: storageClassInformer.Lister(), classLister: storageClassInformer.Lister(),
}
b := &volumeBinder{
ctrl: ctrl,
nodeInformer: nodeInformer, nodeInformer: nodeInformer,
pvcCache: NewPVCAssumeCache(pvcInformer.Informer()), pvcCache: NewPVCAssumeCache(pvcInformer.Informer()),
pvCache: NewPVAssumeCache(pvInformer.Informer()), pvCache: NewPVAssumeCache(pvInformer.Informer()),
@ -291,8 +288,8 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
// Assume PV // Assume PV
newBindings := []*bindingInfo{} newBindings := []*bindingInfo{}
for _, binding := range claimsToBind { for _, binding := range claimsToBind {
newPV, dirty, err := b.ctrl.getBindVolumeToClaim(binding.pv, binding.pvc) newPV, dirty, err := GetBindVolumeToClaim(binding.pv, binding.pvc)
klog.V(5).Infof("AssumePodVolumes: getBindVolumeToClaim for pod %q, PV %q, PVC %q. newPV %p, dirty %v, err: %v", klog.V(5).Infof("AssumePodVolumes: GetBindVolumeToClaim for pod %q, PV %q, PVC %q. newPV %p, dirty %v, err: %v",
podName, podName,
binding.pv.Name, binding.pv.Name,
binding.pvc.Name, binding.pvc.Name,
@ -411,9 +408,13 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, cl
for _, binding = range bindings { for _, binding = range bindings {
klog.V(5).Infof("bindAPIUpdate: Pod %q, binding PV %q to PVC %q", podName, binding.pv.Name, binding.pvc.Name) klog.V(5).Infof("bindAPIUpdate: Pod %q, binding PV %q to PVC %q", podName, binding.pv.Name, binding.pvc.Name)
// TODO: does it hurt if we make an api call and nothing needs to be updated? // TODO: does it hurt if we make an api call and nothing needs to be updated?
if newPV, err := b.ctrl.updateBindVolumeToClaim(binding.pv, binding.pvc, false); err != nil { claimKey := claimToClaimKey(binding.pvc)
klog.V(2).Infof("claim %q bound to volume %q", claimKey, binding.pv.Name)
if newPV, err := b.kubeClient.CoreV1().PersistentVolumes().Update(binding.pv); err != nil {
klog.V(4).Infof("updating PersistentVolume[%s]: binding to %q failed: %v", binding.pv.Name, claimKey, err)
return err return err
} else { } else {
klog.V(4).Infof("updating PersistentVolume[%s]: bound to %q", binding.pv.Name, claimKey)
// Save updated object from apiserver for later checking. // Save updated object from apiserver for later checking.
binding.pv = newPV binding.pv = newPV
} }
@ -424,7 +425,7 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, cl
// PV controller is expect to signal back by removing related annotations if actual provisioning fails // PV controller is expect to signal back by removing related annotations if actual provisioning fails
for i, claim = range claimsToProvision { for i, claim = range claimsToProvision {
klog.V(5).Infof("bindAPIUpdate: Pod %q, PVC %q", podName, getPVCName(claim)) klog.V(5).Infof("bindAPIUpdate: Pod %q, PVC %q", podName, getPVCName(claim))
if newClaim, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil { if newClaim, err := b.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
return err return err
} else { } else {
// Save updated object from apiserver for later checking. // Save updated object from apiserver for later checking.
@ -627,7 +628,7 @@ func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentV
if volumeBound { if volumeBound {
boundClaims = append(boundClaims, pvc) boundClaims = append(boundClaims, pvc)
} else { } else {
delayBindingMode, err := b.ctrl.isDelayBindingMode(pvc) delayBindingMode, err := IsDelayBindingMode(pvc, b.classLister)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
@ -726,7 +727,7 @@ func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v
return false, nil, fmt.Errorf("no class for claim %q", pvcName) return false, nil, fmt.Errorf("no class for claim %q", pvcName)
} }
class, err := b.ctrl.classLister.Get(className) class, err := b.classLister.Get(className)
if err != nil { if err != nil {
return false, nil, fmt.Errorf("failed to find storage class %q", className) return false, nil, fmt.Errorf("failed to find storage class %q", className)
} }

View File

@ -0,0 +1,103 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
import (
"fmt"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/reference"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
)
// IsDelayBindingMode checks if claim is in delay binding mode.
func IsDelayBindingMode(claim *v1.PersistentVolumeClaim, classLister storagelisters.StorageClassLister) (bool, error) {
className := v1helper.GetPersistentVolumeClaimClass(claim)
if className == "" {
return false, nil
}
class, err := classLister.Get(className)
if err != nil {
return false, nil
}
if class.VolumeBindingMode == nil {
return false, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", className)
}
return *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer, nil
}
// GetBindVolumeToClaim returns a new volume which is bound to given claim. In
// addition, it returns a bool which indicates whether we made modification on
// original volume.
func GetBindVolumeToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) (*v1.PersistentVolume, bool, error) {
dirty := false
// Check if the volume was already bound (either by user or by controller)
shouldSetBoundByController := false
if !IsVolumeBoundToClaim(volume, claim) {
shouldSetBoundByController = true
}
// The volume from method args can be pointing to watcher cache. We must not
// modify these, therefore create a copy.
volumeClone := volume.DeepCopy()
// Bind the volume to the claim if it is not bound yet
if volume.Spec.ClaimRef == nil ||
volume.Spec.ClaimRef.Name != claim.Name ||
volume.Spec.ClaimRef.Namespace != claim.Namespace ||
volume.Spec.ClaimRef.UID != claim.UID {
claimRef, err := reference.GetReference(scheme.Scheme, claim)
if err != nil {
return nil, false, fmt.Errorf("Unexpected error getting claim reference: %v", err)
}
volumeClone.Spec.ClaimRef = claimRef
dirty = true
}
// Set annBoundByController if it is not set yet
if shouldSetBoundByController && !metav1.HasAnnotation(volumeClone.ObjectMeta, annBoundByController) {
metav1.SetMetaDataAnnotation(&volumeClone.ObjectMeta, annBoundByController, "yes")
dirty = true
}
return volumeClone, dirty, nil
}
// IsVolumeBoundToClaim returns true, if given volume is pre-bound or bound
// to specific claim. Both claim.Name and claim.Namespace must be equal.
// If claim.UID is present in volume.Spec.ClaimRef, it must be equal too.
func IsVolumeBoundToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) bool {
if volume.Spec.ClaimRef == nil {
return false
}
if claim.Name != volume.Spec.ClaimRef.Name || claim.Namespace != volume.Spec.ClaimRef.Namespace {
return false
}
if volume.Spec.ClaimRef.UID != "" && claim.UID != volume.Spec.ClaimRef.UID {
return false
}
return true
}