Merge pull request #70026 from bertinatto/only_schedule_pod_if_pv_requirements_are_met

Check PV requirements before scheduling
pull/564/head
Kubernetes Prow Robot 2019-01-07 03:46:21 -08:00 committed by GitHub
commit 80afde7ca7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 160 additions and 64 deletions

View File

@ -99,6 +99,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",

View File

@ -21,15 +21,14 @@ import (
"sort"
"time"
"k8s.io/klog"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
@ -99,8 +98,9 @@ type SchedulerVolumeBinder interface {
type volumeBinder struct {
ctrl *PersistentVolumeController
pvcCache PVCAssumeCache
pvCache PVAssumeCache
nodeInformer coreinformers.NodeInformer
pvcCache PVCAssumeCache
pvCache PVAssumeCache
// Stores binding decisions that were made in FindPodVolumes for use in AssumePodVolumes.
// AssumePodVolumes modifies the bindings again for use in BindPodVolumes.
@ -113,6 +113,7 @@ type volumeBinder struct {
// NewVolumeBinder sets up all the caches needed for the scheduler to make volume binding decisions.
func NewVolumeBinder(
kubeClient clientset.Interface,
nodeInformer coreinformers.NodeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
storageClassInformer storageinformers.StorageClassInformer,
@ -126,6 +127,7 @@ func NewVolumeBinder(
b := &volumeBinder{
ctrl: ctrl,
nodeInformer: nodeInformer,
pvcCache: NewPVCAssumeCache(pvcInformer.Informer()),
pvCache: NewPVAssumeCache(pvInformer.Informer()),
podBindingCache: NewPodBindingCache(),
@ -139,7 +141,9 @@ func (b *volumeBinder) GetBindingsCache() PodBindingCache {
return b.podBindingCache
}
// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache
// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache.
// This method intentionally takes in a *v1.Node object instead of using volumebinder.nodeInformer.
// That's necessary because some operations will need to pass in to the predicate fake node objects.
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
podName := getPodName(pod)
@ -382,6 +386,11 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
return false, fmt.Errorf("failed to get cached claims to provision for pod %q", podName)
}
node, err := b.nodeInformer.Lister().Get(pod.Spec.NodeName)
if err != nil {
return false, fmt.Errorf("failed to get node %q: %v", pod.Spec.NodeName, err)
}
for _, binding := range bindings {
// Check for any conditions that might require scheduling retry
@ -391,6 +400,11 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
return false, fmt.Errorf("failed to check pv binding: %v", err)
}
// Check PV's node affinity (the node might not have the proper label)
if err := volumeutil.CheckNodeAffinity(pv, node.Labels); err != nil {
return false, fmt.Errorf("pv %q node affinity doesn't match node %q: %v", pv.Name, node.Name, err)
}
// Check if pv.ClaimRef got dropped by unbindVolume()
if pv.Spec.ClaimRef == nil || pv.Spec.ClaimRef.UID == "" {
return false, fmt.Errorf("ClaimRef got reset for pv %q", pv.Name)
@ -420,6 +434,17 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
return false, fmt.Errorf("selectedNode annotation value %q not set to scheduled node %q", selectedNode, pod.Spec.NodeName)
}
// If the PVC is bound to a PV, check its node affinity
if pvc.Spec.VolumeName != "" {
pv, err := b.pvCache.GetPV(pvc.Spec.VolumeName)
if err != nil {
return false, fmt.Errorf("failed to get pv %q from cache: %v", pvc.Spec.VolumeName, err)
}
if err := volumeutil.CheckNodeAffinity(pv, node.Labels); err != nil {
return false, fmt.Errorf("pv %q node affinity doesn't match node %q: %v", pv.Name, node.Name, err)
}
}
if !bound {
return false, nil
}

View File

@ -22,17 +22,17 @@ import (
"testing"
"time"
"k8s.io/klog"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/controller"
)
@ -55,7 +55,7 @@ var (
provisionedPVC = makeTestPVC("provisioned-pvc", "1Gi", "", pvcUnbound, "", "1", &waitClassWithProvisioner)
provisionedPVC2 = makeTestPVC("provisioned-pvc2", "1Gi", "", pvcUnbound, "", "1", &waitClassWithProvisioner)
provisionedPVCHigherVersion = makeTestPVC("provisioned-pvc2", "1Gi", "", pvcUnbound, "", "2", &waitClassWithProvisioner)
provisionedPVCBound = makeTestPVC("provisioned-pvc", "1Gi", "", pvcBound, "some-pv", "1", &waitClassWithProvisioner)
provisionedPVCBound = makeTestPVC("provisioned-pvc", "1Gi", "", pvcBound, "pv-bound", "1", &waitClassWithProvisioner)
noProvisionerPVC = makeTestPVC("no-provisioner-pvc", "1Gi", "", pvcUnbound, "", "1", &waitClass)
topoMismatchPVC = makeTestPVC("topo-mismatch-pvc", "1Gi", "", pvcUnbound, "", "1", &topoMismatchClass)
@ -89,18 +89,24 @@ var (
waitClassWithProvisioner = "waitClassWithProvisioner"
topoMismatchClass = "topoMismatchClass"
// nodes objects
node1 = makeNode("node1", map[string]string{nodeLabelKey: "node1"})
node2 = makeNode("node2", map[string]string{nodeLabelKey: "node2"})
node1NoLabels = makeNode("node1", nil)
// node topology
nodeLabelKey = "nodeKey"
nodeLabelValue = "node1"
)
type testEnv struct {
client clientset.Interface
reactor *volumeReactor
binder SchedulerVolumeBinder
internalBinder *volumeBinder
internalPVCache *pvAssumeCache
internalPVCCache *pvcAssumeCache
client clientset.Interface
reactor *volumeReactor
binder SchedulerVolumeBinder
internalBinder *volumeBinder
internalNodeInformer coreinformers.NodeInformer
internalPVCache *pvAssumeCache
internalPVCCache *pvcAssumeCache
}
func newTestBinder(t *testing.T) *testEnv {
@ -108,11 +114,13 @@ func newTestBinder(t *testing.T) *testEnv {
reactor := newVolumeReactor(client, nil, nil, nil, nil)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
nodeInformer := informerFactory.Core().V1().Nodes()
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
classInformer := informerFactory.Storage().V1().StorageClasses()
binder := NewVolumeBinder(
client,
nodeInformer,
pvcInformer,
informerFactory.Core().V1().PersistentVolumes(),
classInformer,
@ -195,12 +203,20 @@ func newTestBinder(t *testing.T) *testEnv {
}
return &testEnv{
client: client,
reactor: reactor,
binder: binder,
internalBinder: internalBinder,
internalPVCache: internalPVCache,
internalPVCCache: internalPVCCache,
client: client,
reactor: reactor,
binder: binder,
internalBinder: internalBinder,
internalNodeInformer: nodeInformer,
internalPVCache: internalPVCache,
internalPVCCache: internalPVCCache,
}
}
func (env *testEnv) initNodes(cachedNodes []*v1.Node) {
nodeInformer := env.internalNodeInformer.Informer()
for _, node := range cachedNodes {
nodeInformer.GetIndexer().Add(node)
}
}
@ -540,6 +556,15 @@ func pvRemoveClaimUID(pv *v1.PersistentVolume) *v1.PersistentVolume {
return newPV
}
func makeNode(name string, labels map[string]string) *v1.Node {
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: labels,
},
}
}
func makePod(pvcs []*v1.PersistentVolumeClaim) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -1210,6 +1235,7 @@ func TestCheckBindings(t *testing.T) {
"provisioning-pvc-bound": {
bindings: []*bindingInfo{},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
cachedPVs: []*v1.PersistentVolume{pvBound},
cachedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVCBound)},
expectedBound: true,
},
@ -1255,6 +1281,7 @@ func TestCheckBindings(t *testing.T) {
// Setup
pod := makePod(nil)
testEnv := newTestBinder(t)
testEnv.initNodes([]*v1.Node{node1})
testEnv.initVolumes(scenario.cachedPVs, nil)
testEnv.initClaims(scenario.cachedPVCs, nil)
@ -1278,14 +1305,16 @@ func TestBindPodVolumes(t *testing.T) {
scenarios := map[string]struct {
// Inputs
// These tests only support a single pv and pvc and static binding
bindingsNil bool // Pass in nil bindings slice
binding *bindingInfo
cachedPV *v1.PersistentVolume
cachedPVC *v1.PersistentVolumeClaim
apiPV *v1.PersistentVolume
bindingsNil bool // Pass in nil bindings slice
binding *bindingInfo
cachedPVs []*v1.PersistentVolume
cachedPVCs []*v1.PersistentVolumeClaim
provisionedPVCs []*v1.PersistentVolumeClaim
apiPVs []*v1.PersistentVolume
nodes []*v1.Node
// This function runs with a delay of 5 seconds
delayFunc func(*testing.T, *testEnv, *v1.Pod, *v1.PersistentVolume, *v1.PersistentVolumeClaim)
delayFunc func(*testing.T, *testEnv, *v1.Pod, *bindingInfo, []*v1.PersistentVolumeClaim)
// Expected return values
shouldFail bool
@ -1296,19 +1325,19 @@ func TestBindPodVolumes(t *testing.T) {
},
"nothing-to-bind-empty": {},
"already-bound": {
binding: binding1aBound,
cachedPV: pvNode1aBound,
cachedPVC: boundPVCNode1a,
binding: binding1aBound,
cachedPVs: []*v1.PersistentVolume{pvNode1aBound},
cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
},
"binding-succeeds-after-time": {
binding: binding1aBound,
cachedPV: pvNode1a,
cachedPVC: unboundPVC,
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) {
binding: binding1aBound,
cachedPVs: []*v1.PersistentVolume{pvNode1a},
cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
// Update PVC to be fully bound to PV
newPVC := pvc.DeepCopy()
newPVC := binding.pvc.DeepCopy()
newPVC.ResourceVersion = "100"
newPVC.Spec.VolumeName = pv.Name
newPVC.Spec.VolumeName = binding.pv.Name
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
// Update pvc cache, fake client doesn't invoke informers
@ -1326,10 +1355,10 @@ func TestBindPodVolumes(t *testing.T) {
},
},
"pod-deleted-after-time": {
binding: binding1aBound,
cachedPV: pvNode1a,
cachedPVC: unboundPVC,
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) {
binding: binding1aBound,
cachedPVs: []*v1.PersistentVolume{pvNode1a},
cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
bindingsCache := testEnv.binder.GetBindingsCache()
if bindingsCache == nil {
t.Fatalf("Failed to get bindings cache")
@ -1348,22 +1377,22 @@ func TestBindPodVolumes(t *testing.T) {
},
"binding-times-out": {
binding: binding1aBound,
cachedPV: pvNode1a,
cachedPVC: unboundPVC,
cachedPVs: []*v1.PersistentVolume{pvNode1a},
cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
shouldFail: true,
},
"binding-fails": {
binding: binding1bBound,
cachedPV: pvNode1b,
apiPV: pvNode1bBoundHigherVersion,
cachedPVC: unboundPVC2,
cachedPVs: []*v1.PersistentVolume{pvNode1b},
apiPVs: []*v1.PersistentVolume{pvNode1bBoundHigherVersion},
cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC2},
shouldFail: true,
},
"check-fails": {
binding: binding1aBound,
cachedPV: pvNode1a,
cachedPVC: unboundPVC,
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) {
binding: binding1aBound,
cachedPVs: []*v1.PersistentVolume{pvNode1a},
cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
// Delete PVC
// Update pvc cache, fake client doesn't invoke informers
internalBinder, ok := testEnv.binder.(*volumeBinder)
@ -1376,7 +1405,41 @@ func TestBindPodVolumes(t *testing.T) {
if !ok {
t.Fatalf("Failed to convert to internal PVC cache")
}
internalPVCCache.delete(pvc)
internalPVCCache.delete(binding.pvc)
},
shouldFail: true,
},
"node-affinity-fails": {
binding: binding1aBound,
cachedPVs: []*v1.PersistentVolume{pvNode1aBound},
cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
nodes: []*v1.Node{node1NoLabels},
shouldFail: true,
},
"node-affinity-fails-dynamic-provisioning": {
cachedPVs: []*v1.PersistentVolume{pvNode1a, pvNode2},
cachedPVCs: []*v1.PersistentVolumeClaim{selectedNodePVC},
provisionedPVCs: []*v1.PersistentVolumeClaim{selectedNodePVC},
nodes: []*v1.Node{node1, node2},
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
// Update PVC to be fully bound to a PV with a different node
newPVC := pvcs[0].DeepCopy()
newPVC.ResourceVersion = "100"
newPVC.Spec.VolumeName = pvNode2.Name
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
// Update PVC cache, fake client doesn't invoke informers
internalBinder, ok := testEnv.binder.(*volumeBinder)
if !ok {
t.Fatalf("Failed to convert to internal binder")
}
pvcCache := internalBinder.pvcCache
internalPVCCache, ok := pvcCache.(*pvcAssumeCache)
if !ok {
t.Fatalf("Failed to convert to internal PVC cache")
}
internalPVCCache.add(newPVC)
},
shouldFail: true,
},
@ -1387,25 +1450,32 @@ func TestBindPodVolumes(t *testing.T) {
// Setup
pod := makePod(nil)
if scenario.apiPV == nil {
scenario.apiPV = scenario.cachedPV
if scenario.apiPVs == nil {
scenario.apiPVs = scenario.cachedPVs
}
if scenario.nodes == nil {
scenario.nodes = []*v1.Node{node1}
}
if scenario.provisionedPVCs == nil {
scenario.provisionedPVCs = []*v1.PersistentVolumeClaim{}
}
testEnv := newTestBinder(t)
if !scenario.bindingsNil {
bindings := []*bindingInfo{}
if scenario.binding != nil {
testEnv.initVolumes([]*v1.PersistentVolume{scenario.cachedPV}, []*v1.PersistentVolume{scenario.apiPV})
testEnv.initClaims([]*v1.PersistentVolumeClaim{scenario.cachedPVC}, nil)
testEnv.assumeVolumes(t, name, "node1", pod, []*bindingInfo{scenario.binding}, []*v1.PersistentVolumeClaim{})
} else {
testEnv.assumeVolumes(t, name, "node1", pod, []*bindingInfo{}, []*v1.PersistentVolumeClaim{})
bindings = []*bindingInfo{scenario.binding}
}
testEnv.initNodes(scenario.nodes)
testEnv.initVolumes(scenario.cachedPVs, scenario.apiPVs)
testEnv.initClaims(scenario.cachedPVCs, nil)
testEnv.assumeVolumes(t, name, "node1", pod, bindings, scenario.provisionedPVCs)
}
if scenario.delayFunc != nil {
go func() {
time.Sleep(5 * time.Second)
klog.V(5).Infof("Running delay function")
scenario.delayFunc(t, testEnv, pod, scenario.binding.pv, scenario.binding.pvc)
scenario.delayFunc(t, testEnv, pod, scenario.binding, scenario.provisionedPVCs)
}()
}

View File

@ -23,9 +23,7 @@ import (
"reflect"
"time"
"k8s.io/klog"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -47,6 +45,7 @@ import (
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
@ -375,7 +374,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
)
// Setup volume binder
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
args.StorageClassInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{

View File

@ -34,13 +34,14 @@ type VolumeBinder struct {
// NewVolumeBinder sets up the volume binding library and binding queue
func NewVolumeBinder(
client clientset.Interface,
nodeInformer coreinformers.NodeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
storageClassInformer storageinformers.StorageClassInformer,
bindTimeout time.Duration) *VolumeBinder {
return &VolumeBinder{
Binder: persistentvolume.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer, bindTimeout),
Binder: persistentvolume.NewVolumeBinder(client, nodeInformer, pvcInformer, pvInformer, storageClassInformer, bindTimeout),
}
}