diff --git a/pkg/controller/volume/persistentvolume/BUILD b/pkg/controller/volume/persistentvolume/BUILD
index aac87f389d..d0c648f5cf 100644
--- a/pkg/controller/volume/persistentvolume/BUILD
+++ b/pkg/controller/volume/persistentvolume/BUILD
@@ -99,6 +99,7 @@ go_test(
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
         "//staging/src/k8s.io/client-go/informers:go_default_library",
+        "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder.go b/pkg/controller/volume/persistentvolume/scheduler_binder.go
index 85fd71cf4e..1d4918bbcd 100644
--- a/pkg/controller/volume/persistentvolume/scheduler_binder.go
+++ b/pkg/controller/volume/persistentvolume/scheduler_binder.go
@@ -21,15 +21,14 @@ import (
     "sort"
     "time"
 
-    "k8s.io/klog"
-
-    "k8s.io/api/core/v1"
+    v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/util/wait"
     coreinformers "k8s.io/client-go/informers/core/v1"
     storageinformers "k8s.io/client-go/informers/storage/v1"
     clientset "k8s.io/client-go/kubernetes"
+    "k8s.io/klog"
     v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
     volumeutil "k8s.io/kubernetes/pkg/volume/util"
 )
@@ -99,8 +98,9 @@ type SchedulerVolumeBinder interface {
 type volumeBinder struct {
     ctrl *PersistentVolumeController
 
-    pvcCache PVCAssumeCache
-    pvCache  PVAssumeCache
+    nodeInformer coreinformers.NodeInformer
+    pvcCache     PVCAssumeCache
+    pvCache      PVAssumeCache
 
     // Stores binding decisions that were made in FindPodVolumes for use in AssumePodVolumes.
     // AssumePodVolumes modifies the bindings again for use in BindPodVolumes.
@@ -113,6 +113,7 @@ type volumeBinder struct {
 // NewVolumeBinder sets up all the caches needed for the scheduler to make volume binding decisions.
 func NewVolumeBinder(
     kubeClient clientset.Interface,
+    nodeInformer coreinformers.NodeInformer,
     pvcInformer coreinformers.PersistentVolumeClaimInformer,
     pvInformer coreinformers.PersistentVolumeInformer,
     storageClassInformer storageinformers.StorageClassInformer,
@@ -126,6 +127,7 @@ func NewVolumeBinder(
 
     b := &volumeBinder{
         ctrl:            ctrl,
+        nodeInformer:    nodeInformer,
         pvcCache:        NewPVCAssumeCache(pvcInformer.Informer()),
         pvCache:         NewPVAssumeCache(pvInformer.Informer()),
         podBindingCache: NewPodBindingCache(),
@@ -139,7 +141,9 @@ func (b *volumeBinder) GetBindingsCache() PodBindingCache {
     return b.podBindingCache
 }
 
-// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache
+// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache.
+// This method intentionally takes in a *v1.Node object instead of using volumebinder.nodeInformer.
+// That's necessary because some operations will need to pass in to the predicate fake node objects.
 func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
     podName := getPodName(pod)
@@ -382,6 +386,11 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
         return false, fmt.Errorf("failed to get cached claims to provision for pod %q", podName)
     }
 
+    node, err := b.nodeInformer.Lister().Get(pod.Spec.NodeName)
+    if err != nil {
+        return false, fmt.Errorf("failed to get node %q: %v", pod.Spec.NodeName, err)
+    }
+
     for _, binding := range bindings {
         // Check for any conditions that might require scheduling retry
@@ -391,6 +400,11 @@
             return false, fmt.Errorf("failed to check pv binding: %v", err)
         }
 
+        // Check PV's node affinity (the node might not have the proper label)
+        if err := volumeutil.CheckNodeAffinity(pv, node.Labels); err != nil {
+            return false, fmt.Errorf("pv %q node affinity doesn't match node %q: %v", pv.Name, node.Name, err)
+        }
+
         // Check if pv.ClaimRef got dropped by unbindVolume()
         if pv.Spec.ClaimRef == nil || pv.Spec.ClaimRef.UID == "" {
             return false, fmt.Errorf("ClaimRef got reset for pv %q", pv.Name)
@@ -420,6 +434,17 @@
             return false, fmt.Errorf("selectedNode annotation value %q not set to scheduled node %q", selectedNode, pod.Spec.NodeName)
         }
 
+        // If the PVC is bound to a PV, check its node affinity
+        if pvc.Spec.VolumeName != "" {
+            pv, err := b.pvCache.GetPV(pvc.Spec.VolumeName)
+            if err != nil {
+                return false, fmt.Errorf("failed to get pv %q from cache: %v", pvc.Spec.VolumeName, err)
+            }
+            if err := volumeutil.CheckNodeAffinity(pv, node.Labels); err != nil {
+                return false, fmt.Errorf("pv %q node affinity doesn't match node %q: %v", pv.Name, node.Name, err)
+            }
+        }
+
         if !bound {
             return false, nil
         }
diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_test.go b/pkg/controller/volume/persistentvolume/scheduler_binder_test.go
index 1e7af81515..789e464fc1 100644
--- a/pkg/controller/volume/persistentvolume/scheduler_binder_test.go
+++ b/pkg/controller/volume/persistentvolume/scheduler_binder_test.go
@@ -22,17 +22,17 @@ import (
     "testing"
     "time"
 
-    "k8s.io/klog"
-
-    "k8s.io/api/core/v1"
+    v1 "k8s.io/api/core/v1"
     storagev1 "k8s.io/api/storage/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/diff"
     "k8s.io/client-go/informers"
+    coreinformers "k8s.io/client-go/informers/core/v1"
     clientset "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/kubernetes/fake"
+    "k8s.io/klog"
     "k8s.io/kubernetes/pkg/api/testapi"
     "k8s.io/kubernetes/pkg/controller"
 )
@@ -55,7 +55,7 @@ var (
     provisionedPVC              = makeTestPVC("provisioned-pvc", "1Gi", "", pvcUnbound, "", "1", &waitClassWithProvisioner)
     provisionedPVC2             = makeTestPVC("provisioned-pvc2", "1Gi", "", pvcUnbound, "", "1", &waitClassWithProvisioner)
     provisionedPVCHigherVersion = makeTestPVC("provisioned-pvc2", "1Gi", "", pvcUnbound, "", "2", &waitClassWithProvisioner)
-    provisionedPVCBound         = makeTestPVC("provisioned-pvc", "1Gi", "", pvcBound, "some-pv", "1", &waitClassWithProvisioner)
+    provisionedPVCBound         = makeTestPVC("provisioned-pvc", "1Gi", "", pvcBound, "pv-bound", "1", &waitClassWithProvisioner)
     noProvisionerPVC            = makeTestPVC("no-provisioner-pvc", "1Gi", "", pvcUnbound, "", "1", &waitClass)
"1", &topoMismatchClass) @@ -89,18 +89,24 @@ var ( waitClassWithProvisioner = "waitClassWithProvisioner" topoMismatchClass = "topoMismatchClass" + // nodes objects + node1 = makeNode("node1", map[string]string{nodeLabelKey: "node1"}) + node2 = makeNode("node2", map[string]string{nodeLabelKey: "node2"}) + node1NoLabels = makeNode("node1", nil) + // node topology nodeLabelKey = "nodeKey" nodeLabelValue = "node1" ) type testEnv struct { - client clientset.Interface - reactor *volumeReactor - binder SchedulerVolumeBinder - internalBinder *volumeBinder - internalPVCache *pvAssumeCache - internalPVCCache *pvcAssumeCache + client clientset.Interface + reactor *volumeReactor + binder SchedulerVolumeBinder + internalBinder *volumeBinder + internalNodeInformer coreinformers.NodeInformer + internalPVCache *pvAssumeCache + internalPVCCache *pvcAssumeCache } func newTestBinder(t *testing.T) *testEnv { @@ -108,11 +114,13 @@ func newTestBinder(t *testing.T) *testEnv { reactor := newVolumeReactor(client, nil, nil, nil, nil) informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + nodeInformer := informerFactory.Core().V1().Nodes() pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() classInformer := informerFactory.Storage().V1().StorageClasses() binder := NewVolumeBinder( client, + nodeInformer, pvcInformer, informerFactory.Core().V1().PersistentVolumes(), classInformer, @@ -195,12 +203,20 @@ func newTestBinder(t *testing.T) *testEnv { } return &testEnv{ - client: client, - reactor: reactor, - binder: binder, - internalBinder: internalBinder, - internalPVCache: internalPVCache, - internalPVCCache: internalPVCCache, + client: client, + reactor: reactor, + binder: binder, + internalBinder: internalBinder, + internalNodeInformer: nodeInformer, + internalPVCache: internalPVCache, + internalPVCCache: internalPVCCache, + } +} + +func (env *testEnv) initNodes(cachedNodes []*v1.Node) { + nodeInformer := env.internalNodeInformer.Informer() + for _, node := range cachedNodes { + nodeInformer.GetIndexer().Add(node) } } @@ -540,6 +556,15 @@ func pvRemoveClaimUID(pv *v1.PersistentVolume) *v1.PersistentVolume { return newPV } +func makeNode(name string, labels map[string]string) *v1.Node { + return &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: labels, + }, + } +} + func makePod(pvcs []*v1.PersistentVolumeClaim) *v1.Pod { pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -1210,6 +1235,7 @@ func TestCheckBindings(t *testing.T) { "provisioning-pvc-bound": { bindings: []*bindingInfo{}, provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, + cachedPVs: []*v1.PersistentVolume{pvBound}, cachedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVCBound)}, expectedBound: true, }, @@ -1255,6 +1281,7 @@ func TestCheckBindings(t *testing.T) { // Setup pod := makePod(nil) testEnv := newTestBinder(t) + testEnv.initNodes([]*v1.Node{node1}) testEnv.initVolumes(scenario.cachedPVs, nil) testEnv.initClaims(scenario.cachedPVCs, nil) @@ -1278,14 +1305,16 @@ func TestBindPodVolumes(t *testing.T) { scenarios := map[string]struct { // Inputs // These tests only support a single pv and pvc and static binding - bindingsNil bool // Pass in nil bindings slice - binding *bindingInfo - cachedPV *v1.PersistentVolume - cachedPVC *v1.PersistentVolumeClaim - apiPV *v1.PersistentVolume + bindingsNil bool // Pass in nil bindings slice + binding *bindingInfo + cachedPVs []*v1.PersistentVolume + cachedPVCs []*v1.PersistentVolumeClaim + 
+        provisionedPVCs []*v1.PersistentVolumeClaim
+        apiPVs          []*v1.PersistentVolume
+        nodes           []*v1.Node
 
         // This function runs with a delay of 5 seconds
-        delayFunc func(*testing.T, *testEnv, *v1.Pod, *v1.PersistentVolume, *v1.PersistentVolumeClaim)
+        delayFunc func(*testing.T, *testEnv, *v1.Pod, *bindingInfo, []*v1.PersistentVolumeClaim)
 
         // Expected return values
         shouldFail bool
@@ -1296,19 +1325,19 @@ func TestBindPodVolumes(t *testing.T) {
         },
         "nothing-to-bind-empty": {},
         "already-bound": {
-            binding:   binding1aBound,
-            cachedPV:  pvNode1aBound,
-            cachedPVC: boundPVCNode1a,
+            binding:    binding1aBound,
+            cachedPVs:  []*v1.PersistentVolume{pvNode1aBound},
+            cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
         },
         "binding-succeeds-after-time": {
-            binding:   binding1aBound,
-            cachedPV:  pvNode1a,
-            cachedPVC: unboundPVC,
-            delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) {
+            binding:    binding1aBound,
+            cachedPVs:  []*v1.PersistentVolume{pvNode1a},
+            cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
+            delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
                 // Update PVC to be fully bound to PV
-                newPVC := pvc.DeepCopy()
+                newPVC := binding.pvc.DeepCopy()
                 newPVC.ResourceVersion = "100"
-                newPVC.Spec.VolumeName = pv.Name
+                newPVC.Spec.VolumeName = binding.pv.Name
                 metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
 
                 // Update pvc cache, fake client doesn't invoke informers
@@ -1326,10 +1355,10 @@ func TestBindPodVolumes(t *testing.T) {
             },
         },
         "pod-deleted-after-time": {
-            binding:   binding1aBound,
-            cachedPV:  pvNode1a,
-            cachedPVC: unboundPVC,
-            delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) {
+            binding:    binding1aBound,
+            cachedPVs:  []*v1.PersistentVolume{pvNode1a},
+            cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
+            delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
                 bindingsCache := testEnv.binder.GetBindingsCache()
                 if bindingsCache == nil {
                     t.Fatalf("Failed to get bindings cache")
@@ -1348,22 +1377,22 @@ func TestBindPodVolumes(t *testing.T) {
         },
         "binding-times-out": {
             binding:    binding1aBound,
-            cachedPV:   pvNode1a,
-            cachedPVC:  unboundPVC,
+            cachedPVs:  []*v1.PersistentVolume{pvNode1a},
+            cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
             shouldFail: true,
         },
         "binding-fails": {
             binding:    binding1bBound,
-            cachedPV:   pvNode1b,
-            apiPV:      pvNode1bBoundHigherVersion,
-            cachedPVC:  unboundPVC2,
+            cachedPVs:  []*v1.PersistentVolume{pvNode1b},
+            apiPVs:     []*v1.PersistentVolume{pvNode1bBoundHigherVersion},
+            cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC2},
             shouldFail: true,
         },
         "check-fails": {
-            binding:   binding1aBound,
-            cachedPV:  pvNode1a,
-            cachedPVC: unboundPVC,
-            delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) {
+            binding:    binding1aBound,
+            cachedPVs:  []*v1.PersistentVolume{pvNode1a},
+            cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
+            delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
                 // Delete PVC
                 // Update pvc cache, fake client doesn't invoke informers
                 internalBinder, ok := testEnv.binder.(*volumeBinder)
@@ -1376,7 +1405,41 @@
                 if !ok {
                     t.Fatalf("Failed to convert to internal PVC cache")
                 }
-                internalPVCCache.delete(pvc)
+                internalPVCCache.delete(binding.pvc)
+            },
+            shouldFail: true,
+        },
+        "node-affinity-fails": {
+            binding:    binding1aBound,
+            cachedPVs:  []*v1.PersistentVolume{pvNode1aBound},
+            cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
+            nodes:      []*v1.Node{node1NoLabels},
+            shouldFail: true,
+        },
+        "node-affinity-fails-dynamic-provisioning": {
+            cachedPVs:       []*v1.PersistentVolume{pvNode1a, pvNode2},
+            cachedPVCs:      []*v1.PersistentVolumeClaim{selectedNodePVC},
+            provisionedPVCs: []*v1.PersistentVolumeClaim{selectedNodePVC},
+            nodes:           []*v1.Node{node1, node2},
+            delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
+                // Update PVC to be fully bound to a PV with a different node
+                newPVC := pvcs[0].DeepCopy()
+                newPVC.ResourceVersion = "100"
+                newPVC.Spec.VolumeName = pvNode2.Name
+                metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
+
+                // Update PVC cache, fake client doesn't invoke informers
+                internalBinder, ok := testEnv.binder.(*volumeBinder)
+                if !ok {
+                    t.Fatalf("Failed to convert to internal binder")
+                }
+
+                pvcCache := internalBinder.pvcCache
+                internalPVCCache, ok := pvcCache.(*pvcAssumeCache)
+                if !ok {
+                    t.Fatalf("Failed to convert to internal PVC cache")
+                }
+                internalPVCCache.add(newPVC)
             },
             shouldFail: true,
         },
@@ -1387,25 +1450,32 @@
         // Setup
         pod := makePod(nil)
-        if scenario.apiPV == nil {
-            scenario.apiPV = scenario.cachedPV
+        if scenario.apiPVs == nil {
+            scenario.apiPVs = scenario.cachedPVs
+        }
+        if scenario.nodes == nil {
+            scenario.nodes = []*v1.Node{node1}
+        }
+        if scenario.provisionedPVCs == nil {
+            scenario.provisionedPVCs = []*v1.PersistentVolumeClaim{}
         }
         testEnv := newTestBinder(t)
         if !scenario.bindingsNil {
+            bindings := []*bindingInfo{}
             if scenario.binding != nil {
-                testEnv.initVolumes([]*v1.PersistentVolume{scenario.cachedPV}, []*v1.PersistentVolume{scenario.apiPV})
-                testEnv.initClaims([]*v1.PersistentVolumeClaim{scenario.cachedPVC}, nil)
-                testEnv.assumeVolumes(t, name, "node1", pod, []*bindingInfo{scenario.binding}, []*v1.PersistentVolumeClaim{})
-            } else {
-                testEnv.assumeVolumes(t, name, "node1", pod, []*bindingInfo{}, []*v1.PersistentVolumeClaim{})
+                bindings = []*bindingInfo{scenario.binding}
             }
+            testEnv.initNodes(scenario.nodes)
+            testEnv.initVolumes(scenario.cachedPVs, scenario.apiPVs)
+            testEnv.initClaims(scenario.cachedPVCs, nil)
+            testEnv.assumeVolumes(t, name, "node1", pod, bindings, scenario.provisionedPVCs)
         }
         if scenario.delayFunc != nil {
             go func() {
                 time.Sleep(5 * time.Second)
                 klog.V(5).Infof("Running delay function")
-                scenario.delayFunc(t, testEnv, pod, scenario.binding.pv, scenario.binding.pvc)
+                scenario.delayFunc(t, testEnv, pod, scenario.binding, scenario.provisionedPVCs)
             }()
         }
diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go
index c533235608..bb2e3ee7a2 100644
--- a/pkg/scheduler/factory/factory.go
+++ b/pkg/scheduler/factory/factory.go
@@ -23,9 +23,7 @@ import (
     "reflect"
     "time"
 
-    "k8s.io/klog"
-
-    "k8s.io/api/core/v1"
+    v1 "k8s.io/api/core/v1"
     storagev1 "k8s.io/api/storage/v1"
     "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -47,6 +45,7 @@ import (
     storagelisters "k8s.io/client-go/listers/storage/v1"
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/tools/record"
+    "k8s.io/klog"
     podutil "k8s.io/kubernetes/pkg/api/v1/pod"
     kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
     "k8s.io/kubernetes/pkg/scheduler/algorithm"
@@ -375,7 +374,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
     )
 
     // Setup volume binder
-    c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
+    c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
 
     args.StorageClassInformer.Informer().AddEventHandler(
         cache.ResourceEventHandlerFuncs{
diff --git a/pkg/scheduler/volumebinder/volume_binder.go b/pkg/scheduler/volumebinder/volume_binder.go
index 4a0089a479..53e3488a8c 100644
--- a/pkg/scheduler/volumebinder/volume_binder.go
+++ b/pkg/scheduler/volumebinder/volume_binder.go
@@ -34,13 +34,14 @@ type VolumeBinder struct {
 // NewVolumeBinder sets up the volume binding library and binding queue
 func NewVolumeBinder(
     client clientset.Interface,
+    nodeInformer coreinformers.NodeInformer,
     pvcInformer coreinformers.PersistentVolumeClaimInformer,
     pvInformer coreinformers.PersistentVolumeInformer,
     storageClassInformer storageinformers.StorageClassInformer,
     bindTimeout time.Duration) *VolumeBinder {
 
     return &VolumeBinder{
-        Binder: persistentvolume.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer, bindTimeout),
+        Binder: persistentvolume.NewVolumeBinder(client, nodeInformer, pvcInformer, pvInformer, storageClassInformer, bindTimeout),
     }
 }
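
Note on wiring (not part of the patch): the new nodeInformer parameter means every caller of NewVolumeBinder must now supply a node informer so that checkBindings can re-verify PV node affinity against the scheduled node's labels, mirroring how factory.go threads args.NodeInformer through to volumebinder.NewVolumeBinder above. The sketch below shows the call shape under assumptions of mine: the helper name newSchedulerVolumeBinder, the zero resync period, and the 10-second bind timeout are illustrative, not values taken from this change, and it assumes building inside the k8s.io/kubernetes tree at this revision.

package example

import (
    "time"

    "k8s.io/client-go/informers"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/kubernetes/pkg/scheduler/volumebinder"
)

// newSchedulerVolumeBinder is a hypothetical helper showing the argument order
// after this change: the node informer is passed right after the client.
func newSchedulerVolumeBinder(client clientset.Interface) *volumebinder.VolumeBinder {
    factory := informers.NewSharedInformerFactory(client, 0) // resync disabled for brevity
    return volumebinder.NewVolumeBinder(
        client,
        factory.Core().V1().Nodes(), // new argument introduced by this patch
        factory.Core().V1().PersistentVolumeClaims(),
        factory.Core().V1().PersistentVolumes(),
        factory.Storage().V1().StorageClasses(),
        10*time.Second, // bind timeout, analogous to args.BindTimeoutSeconds above
    )
}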