mirror of https://github.com/k3s-io/k3s
Check PV requirements before scheduling pod
parent
2a1560061e
commit
bb4fcddd1b
|
@ -99,6 +99,7 @@ go_test(
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
|
||||||
|
|
|
@ -21,15 +21,14 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/klog"
|
v1 "k8s.io/api/core/v1"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||||
storageinformers "k8s.io/client-go/informers/storage/v1"
|
storageinformers "k8s.io/client-go/informers/storage/v1"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
|
"k8s.io/klog"
|
||||||
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
|
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
|
||||||
volumeutil "k8s.io/kubernetes/pkg/volume/util"
|
volumeutil "k8s.io/kubernetes/pkg/volume/util"
|
||||||
)
|
)
|
||||||
|
@ -99,6 +98,7 @@ type SchedulerVolumeBinder interface {
|
||||||
type volumeBinder struct {
|
type volumeBinder struct {
|
||||||
ctrl *PersistentVolumeController
|
ctrl *PersistentVolumeController
|
||||||
|
|
||||||
|
nodeInformer coreinformers.NodeInformer
|
||||||
pvcCache PVCAssumeCache
|
pvcCache PVCAssumeCache
|
||||||
pvCache PVAssumeCache
|
pvCache PVAssumeCache
|
||||||
|
|
||||||
|
@ -113,6 +113,7 @@ type volumeBinder struct {
|
||||||
// NewVolumeBinder sets up all the caches needed for the scheduler to make volume binding decisions.
|
// NewVolumeBinder sets up all the caches needed for the scheduler to make volume binding decisions.
|
||||||
func NewVolumeBinder(
|
func NewVolumeBinder(
|
||||||
kubeClient clientset.Interface,
|
kubeClient clientset.Interface,
|
||||||
|
nodeInformer coreinformers.NodeInformer,
|
||||||
pvcInformer coreinformers.PersistentVolumeClaimInformer,
|
pvcInformer coreinformers.PersistentVolumeClaimInformer,
|
||||||
pvInformer coreinformers.PersistentVolumeInformer,
|
pvInformer coreinformers.PersistentVolumeInformer,
|
||||||
storageClassInformer storageinformers.StorageClassInformer,
|
storageClassInformer storageinformers.StorageClassInformer,
|
||||||
|
@ -126,6 +127,7 @@ func NewVolumeBinder(
|
||||||
|
|
||||||
b := &volumeBinder{
|
b := &volumeBinder{
|
||||||
ctrl: ctrl,
|
ctrl: ctrl,
|
||||||
|
nodeInformer: nodeInformer,
|
||||||
pvcCache: NewPVCAssumeCache(pvcInformer.Informer()),
|
pvcCache: NewPVCAssumeCache(pvcInformer.Informer()),
|
||||||
pvCache: NewPVAssumeCache(pvInformer.Informer()),
|
pvCache: NewPVAssumeCache(pvInformer.Informer()),
|
||||||
podBindingCache: NewPodBindingCache(),
|
podBindingCache: NewPodBindingCache(),
|
||||||
|
@ -139,7 +141,9 @@ func (b *volumeBinder) GetBindingsCache() PodBindingCache {
|
||||||
return b.podBindingCache
|
return b.podBindingCache
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache
|
// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache.
|
||||||
|
// This method intentionally takes in a *v1.Node object instead of using volumebinder.nodeInformer.
|
||||||
|
// That's necessary because some operations will need to pass in to the predicate fake node objects.
|
||||||
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
|
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
|
||||||
podName := getPodName(pod)
|
podName := getPodName(pod)
|
||||||
|
|
||||||
|
@ -382,6 +386,11 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
|
||||||
return false, fmt.Errorf("failed to get cached claims to provision for pod %q", podName)
|
return false, fmt.Errorf("failed to get cached claims to provision for pod %q", podName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
node, err := b.nodeInformer.Lister().Get(pod.Spec.NodeName)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("failed to get node %q: %v", pod.Spec.NodeName, err)
|
||||||
|
}
|
||||||
|
|
||||||
for _, binding := range bindings {
|
for _, binding := range bindings {
|
||||||
// Check for any conditions that might require scheduling retry
|
// Check for any conditions that might require scheduling retry
|
||||||
|
|
||||||
|
@ -391,6 +400,11 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
|
||||||
return false, fmt.Errorf("failed to check pv binding: %v", err)
|
return false, fmt.Errorf("failed to check pv binding: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check PV's node affinity (the node might not have the proper label)
|
||||||
|
if err := volumeutil.CheckNodeAffinity(pv, node.Labels); err != nil {
|
||||||
|
return false, fmt.Errorf("pv %q node affinity doesn't match node %q: %v", pv.Name, node.Name, err)
|
||||||
|
}
|
||||||
|
|
||||||
// Check if pv.ClaimRef got dropped by unbindVolume()
|
// Check if pv.ClaimRef got dropped by unbindVolume()
|
||||||
if pv.Spec.ClaimRef == nil || pv.Spec.ClaimRef.UID == "" {
|
if pv.Spec.ClaimRef == nil || pv.Spec.ClaimRef.UID == "" {
|
||||||
return false, fmt.Errorf("ClaimRef got reset for pv %q", pv.Name)
|
return false, fmt.Errorf("ClaimRef got reset for pv %q", pv.Name)
|
||||||
|
@ -420,6 +434,17 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
|
||||||
return false, fmt.Errorf("selectedNode annotation value %q not set to scheduled node %q", selectedNode, pod.Spec.NodeName)
|
return false, fmt.Errorf("selectedNode annotation value %q not set to scheduled node %q", selectedNode, pod.Spec.NodeName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If the PVC is bound to a PV, check its node affinity
|
||||||
|
if pvc.Spec.VolumeName != "" {
|
||||||
|
pv, err := b.pvCache.GetPV(pvc.Spec.VolumeName)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("failed to get pv %q from cache: %v", pvc.Spec.VolumeName, err)
|
||||||
|
}
|
||||||
|
if err := volumeutil.CheckNodeAffinity(pv, node.Labels); err != nil {
|
||||||
|
return false, fmt.Errorf("pv %q node affinity doesn't match node %q: %v", pv.Name, node.Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if !bound {
|
if !bound {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,17 +22,17 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/klog"
|
v1 "k8s.io/api/core/v1"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
|
||||||
storagev1 "k8s.io/api/storage/v1"
|
storagev1 "k8s.io/api/storage/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/diff"
|
"k8s.io/apimachinery/pkg/util/diff"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
|
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
|
"k8s.io/klog"
|
||||||
"k8s.io/kubernetes/pkg/api/testapi"
|
"k8s.io/kubernetes/pkg/api/testapi"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
)
|
)
|
||||||
|
@ -55,7 +55,7 @@ var (
|
||||||
provisionedPVC = makeTestPVC("provisioned-pvc", "1Gi", "", pvcUnbound, "", "1", &waitClassWithProvisioner)
|
provisionedPVC = makeTestPVC("provisioned-pvc", "1Gi", "", pvcUnbound, "", "1", &waitClassWithProvisioner)
|
||||||
provisionedPVC2 = makeTestPVC("provisioned-pvc2", "1Gi", "", pvcUnbound, "", "1", &waitClassWithProvisioner)
|
provisionedPVC2 = makeTestPVC("provisioned-pvc2", "1Gi", "", pvcUnbound, "", "1", &waitClassWithProvisioner)
|
||||||
provisionedPVCHigherVersion = makeTestPVC("provisioned-pvc2", "1Gi", "", pvcUnbound, "", "2", &waitClassWithProvisioner)
|
provisionedPVCHigherVersion = makeTestPVC("provisioned-pvc2", "1Gi", "", pvcUnbound, "", "2", &waitClassWithProvisioner)
|
||||||
provisionedPVCBound = makeTestPVC("provisioned-pvc", "1Gi", "", pvcBound, "some-pv", "1", &waitClassWithProvisioner)
|
provisionedPVCBound = makeTestPVC("provisioned-pvc", "1Gi", "", pvcBound, "pv-bound", "1", &waitClassWithProvisioner)
|
||||||
noProvisionerPVC = makeTestPVC("no-provisioner-pvc", "1Gi", "", pvcUnbound, "", "1", &waitClass)
|
noProvisionerPVC = makeTestPVC("no-provisioner-pvc", "1Gi", "", pvcUnbound, "", "1", &waitClass)
|
||||||
topoMismatchPVC = makeTestPVC("topo-mismatch-pvc", "1Gi", "", pvcUnbound, "", "1", &topoMismatchClass)
|
topoMismatchPVC = makeTestPVC("topo-mismatch-pvc", "1Gi", "", pvcUnbound, "", "1", &topoMismatchClass)
|
||||||
|
|
||||||
|
@ -89,6 +89,11 @@ var (
|
||||||
waitClassWithProvisioner = "waitClassWithProvisioner"
|
waitClassWithProvisioner = "waitClassWithProvisioner"
|
||||||
topoMismatchClass = "topoMismatchClass"
|
topoMismatchClass = "topoMismatchClass"
|
||||||
|
|
||||||
|
// nodes objects
|
||||||
|
node1 = makeNode("node1", map[string]string{nodeLabelKey: "node1"})
|
||||||
|
node2 = makeNode("node2", map[string]string{nodeLabelKey: "node2"})
|
||||||
|
node1NoLabels = makeNode("node1", nil)
|
||||||
|
|
||||||
// node topology
|
// node topology
|
||||||
nodeLabelKey = "nodeKey"
|
nodeLabelKey = "nodeKey"
|
||||||
nodeLabelValue = "node1"
|
nodeLabelValue = "node1"
|
||||||
|
@ -99,6 +104,7 @@ type testEnv struct {
|
||||||
reactor *volumeReactor
|
reactor *volumeReactor
|
||||||
binder SchedulerVolumeBinder
|
binder SchedulerVolumeBinder
|
||||||
internalBinder *volumeBinder
|
internalBinder *volumeBinder
|
||||||
|
internalNodeInformer coreinformers.NodeInformer
|
||||||
internalPVCache *pvAssumeCache
|
internalPVCache *pvAssumeCache
|
||||||
internalPVCCache *pvcAssumeCache
|
internalPVCCache *pvcAssumeCache
|
||||||
}
|
}
|
||||||
|
@ -108,11 +114,13 @@ func newTestBinder(t *testing.T) *testEnv {
|
||||||
reactor := newVolumeReactor(client, nil, nil, nil, nil)
|
reactor := newVolumeReactor(client, nil, nil, nil, nil)
|
||||||
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
|
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
|
||||||
|
|
||||||
|
nodeInformer := informerFactory.Core().V1().Nodes()
|
||||||
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
|
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
|
||||||
classInformer := informerFactory.Storage().V1().StorageClasses()
|
classInformer := informerFactory.Storage().V1().StorageClasses()
|
||||||
|
|
||||||
binder := NewVolumeBinder(
|
binder := NewVolumeBinder(
|
||||||
client,
|
client,
|
||||||
|
nodeInformer,
|
||||||
pvcInformer,
|
pvcInformer,
|
||||||
informerFactory.Core().V1().PersistentVolumes(),
|
informerFactory.Core().V1().PersistentVolumes(),
|
||||||
classInformer,
|
classInformer,
|
||||||
|
@ -199,11 +207,19 @@ func newTestBinder(t *testing.T) *testEnv {
|
||||||
reactor: reactor,
|
reactor: reactor,
|
||||||
binder: binder,
|
binder: binder,
|
||||||
internalBinder: internalBinder,
|
internalBinder: internalBinder,
|
||||||
|
internalNodeInformer: nodeInformer,
|
||||||
internalPVCache: internalPVCache,
|
internalPVCache: internalPVCache,
|
||||||
internalPVCCache: internalPVCCache,
|
internalPVCCache: internalPVCCache,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (env *testEnv) initNodes(cachedNodes []*v1.Node) {
|
||||||
|
nodeInformer := env.internalNodeInformer.Informer()
|
||||||
|
for _, node := range cachedNodes {
|
||||||
|
nodeInformer.GetIndexer().Add(node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs []*v1.PersistentVolumeClaim) {
|
func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs []*v1.PersistentVolumeClaim) {
|
||||||
internalPVCCache := env.internalPVCCache
|
internalPVCCache := env.internalPVCCache
|
||||||
for _, pvc := range cachedPVCs {
|
for _, pvc := range cachedPVCs {
|
||||||
|
@ -540,6 +556,15 @@ func pvRemoveClaimUID(pv *v1.PersistentVolume) *v1.PersistentVolume {
|
||||||
return newPV
|
return newPV
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func makeNode(name string, labels map[string]string) *v1.Node {
|
||||||
|
return &v1.Node{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: name,
|
||||||
|
Labels: labels,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func makePod(pvcs []*v1.PersistentVolumeClaim) *v1.Pod {
|
func makePod(pvcs []*v1.PersistentVolumeClaim) *v1.Pod {
|
||||||
pod := &v1.Pod{
|
pod := &v1.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
@ -1210,6 +1235,7 @@ func TestCheckBindings(t *testing.T) {
|
||||||
"provisioning-pvc-bound": {
|
"provisioning-pvc-bound": {
|
||||||
bindings: []*bindingInfo{},
|
bindings: []*bindingInfo{},
|
||||||
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
|
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
|
||||||
|
cachedPVs: []*v1.PersistentVolume{pvBound},
|
||||||
cachedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVCBound)},
|
cachedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVCBound)},
|
||||||
expectedBound: true,
|
expectedBound: true,
|
||||||
},
|
},
|
||||||
|
@ -1255,6 +1281,7 @@ func TestCheckBindings(t *testing.T) {
|
||||||
// Setup
|
// Setup
|
||||||
pod := makePod(nil)
|
pod := makePod(nil)
|
||||||
testEnv := newTestBinder(t)
|
testEnv := newTestBinder(t)
|
||||||
|
testEnv.initNodes([]*v1.Node{node1})
|
||||||
testEnv.initVolumes(scenario.cachedPVs, nil)
|
testEnv.initVolumes(scenario.cachedPVs, nil)
|
||||||
testEnv.initClaims(scenario.cachedPVCs, nil)
|
testEnv.initClaims(scenario.cachedPVCs, nil)
|
||||||
|
|
||||||
|
@ -1280,12 +1307,14 @@ func TestBindPodVolumes(t *testing.T) {
|
||||||
// These tests only support a single pv and pvc and static binding
|
// These tests only support a single pv and pvc and static binding
|
||||||
bindingsNil bool // Pass in nil bindings slice
|
bindingsNil bool // Pass in nil bindings slice
|
||||||
binding *bindingInfo
|
binding *bindingInfo
|
||||||
cachedPV *v1.PersistentVolume
|
cachedPVs []*v1.PersistentVolume
|
||||||
cachedPVC *v1.PersistentVolumeClaim
|
cachedPVCs []*v1.PersistentVolumeClaim
|
||||||
apiPV *v1.PersistentVolume
|
provisionedPVCs []*v1.PersistentVolumeClaim
|
||||||
|
apiPVs []*v1.PersistentVolume
|
||||||
|
nodes []*v1.Node
|
||||||
|
|
||||||
// This function runs with a delay of 5 seconds
|
// This function runs with a delay of 5 seconds
|
||||||
delayFunc func(*testing.T, *testEnv, *v1.Pod, *v1.PersistentVolume, *v1.PersistentVolumeClaim)
|
delayFunc func(*testing.T, *testEnv, *v1.Pod, *bindingInfo, []*v1.PersistentVolumeClaim)
|
||||||
|
|
||||||
// Expected return values
|
// Expected return values
|
||||||
shouldFail bool
|
shouldFail bool
|
||||||
|
@ -1297,18 +1326,18 @@ func TestBindPodVolumes(t *testing.T) {
|
||||||
"nothing-to-bind-empty": {},
|
"nothing-to-bind-empty": {},
|
||||||
"already-bound": {
|
"already-bound": {
|
||||||
binding: binding1aBound,
|
binding: binding1aBound,
|
||||||
cachedPV: pvNode1aBound,
|
cachedPVs: []*v1.PersistentVolume{pvNode1aBound},
|
||||||
cachedPVC: boundPVCNode1a,
|
cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
|
||||||
},
|
},
|
||||||
"binding-succeeds-after-time": {
|
"binding-succeeds-after-time": {
|
||||||
binding: binding1aBound,
|
binding: binding1aBound,
|
||||||
cachedPV: pvNode1a,
|
cachedPVs: []*v1.PersistentVolume{pvNode1a},
|
||||||
cachedPVC: unboundPVC,
|
cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
|
||||||
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) {
|
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
|
||||||
// Update PVC to be fully bound to PV
|
// Update PVC to be fully bound to PV
|
||||||
newPVC := pvc.DeepCopy()
|
newPVC := binding.pvc.DeepCopy()
|
||||||
newPVC.ResourceVersion = "100"
|
newPVC.ResourceVersion = "100"
|
||||||
newPVC.Spec.VolumeName = pv.Name
|
newPVC.Spec.VolumeName = binding.pv.Name
|
||||||
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
|
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
|
||||||
|
|
||||||
// Update pvc cache, fake client doesn't invoke informers
|
// Update pvc cache, fake client doesn't invoke informers
|
||||||
|
@ -1327,9 +1356,9 @@ func TestBindPodVolumes(t *testing.T) {
|
||||||
},
|
},
|
||||||
"pod-deleted-after-time": {
|
"pod-deleted-after-time": {
|
||||||
binding: binding1aBound,
|
binding: binding1aBound,
|
||||||
cachedPV: pvNode1a,
|
cachedPVs: []*v1.PersistentVolume{pvNode1a},
|
||||||
cachedPVC: unboundPVC,
|
cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
|
||||||
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) {
|
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
|
||||||
bindingsCache := testEnv.binder.GetBindingsCache()
|
bindingsCache := testEnv.binder.GetBindingsCache()
|
||||||
if bindingsCache == nil {
|
if bindingsCache == nil {
|
||||||
t.Fatalf("Failed to get bindings cache")
|
t.Fatalf("Failed to get bindings cache")
|
||||||
|
@ -1348,22 +1377,22 @@ func TestBindPodVolumes(t *testing.T) {
|
||||||
},
|
},
|
||||||
"binding-times-out": {
|
"binding-times-out": {
|
||||||
binding: binding1aBound,
|
binding: binding1aBound,
|
||||||
cachedPV: pvNode1a,
|
cachedPVs: []*v1.PersistentVolume{pvNode1a},
|
||||||
cachedPVC: unboundPVC,
|
cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
|
||||||
shouldFail: true,
|
shouldFail: true,
|
||||||
},
|
},
|
||||||
"binding-fails": {
|
"binding-fails": {
|
||||||
binding: binding1bBound,
|
binding: binding1bBound,
|
||||||
cachedPV: pvNode1b,
|
cachedPVs: []*v1.PersistentVolume{pvNode1b},
|
||||||
apiPV: pvNode1bBoundHigherVersion,
|
apiPVs: []*v1.PersistentVolume{pvNode1bBoundHigherVersion},
|
||||||
cachedPVC: unboundPVC2,
|
cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC2},
|
||||||
shouldFail: true,
|
shouldFail: true,
|
||||||
},
|
},
|
||||||
"check-fails": {
|
"check-fails": {
|
||||||
binding: binding1aBound,
|
binding: binding1aBound,
|
||||||
cachedPV: pvNode1a,
|
cachedPVs: []*v1.PersistentVolume{pvNode1a},
|
||||||
cachedPVC: unboundPVC,
|
cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
|
||||||
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) {
|
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
|
||||||
// Delete PVC
|
// Delete PVC
|
||||||
// Update pvc cache, fake client doesn't invoke informers
|
// Update pvc cache, fake client doesn't invoke informers
|
||||||
internalBinder, ok := testEnv.binder.(*volumeBinder)
|
internalBinder, ok := testEnv.binder.(*volumeBinder)
|
||||||
|
@ -1376,7 +1405,41 @@ func TestBindPodVolumes(t *testing.T) {
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("Failed to convert to internal PVC cache")
|
t.Fatalf("Failed to convert to internal PVC cache")
|
||||||
}
|
}
|
||||||
internalPVCCache.delete(pvc)
|
internalPVCCache.delete(binding.pvc)
|
||||||
|
},
|
||||||
|
shouldFail: true,
|
||||||
|
},
|
||||||
|
"node-affinity-fails": {
|
||||||
|
binding: binding1aBound,
|
||||||
|
cachedPVs: []*v1.PersistentVolume{pvNode1aBound},
|
||||||
|
cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
|
||||||
|
nodes: []*v1.Node{node1NoLabels},
|
||||||
|
shouldFail: true,
|
||||||
|
},
|
||||||
|
"node-affinity-fails-dynamic-provisioning": {
|
||||||
|
cachedPVs: []*v1.PersistentVolume{pvNode1a, pvNode2},
|
||||||
|
cachedPVCs: []*v1.PersistentVolumeClaim{selectedNodePVC},
|
||||||
|
provisionedPVCs: []*v1.PersistentVolumeClaim{selectedNodePVC},
|
||||||
|
nodes: []*v1.Node{node1, node2},
|
||||||
|
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
|
||||||
|
// Update PVC to be fully bound to a PV with a different node
|
||||||
|
newPVC := pvcs[0].DeepCopy()
|
||||||
|
newPVC.ResourceVersion = "100"
|
||||||
|
newPVC.Spec.VolumeName = pvNode2.Name
|
||||||
|
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
|
||||||
|
|
||||||
|
// Update PVC cache, fake client doesn't invoke informers
|
||||||
|
internalBinder, ok := testEnv.binder.(*volumeBinder)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Failed to convert to internal binder")
|
||||||
|
}
|
||||||
|
|
||||||
|
pvcCache := internalBinder.pvcCache
|
||||||
|
internalPVCCache, ok := pvcCache.(*pvcAssumeCache)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Failed to convert to internal PVC cache")
|
||||||
|
}
|
||||||
|
internalPVCCache.add(newPVC)
|
||||||
},
|
},
|
||||||
shouldFail: true,
|
shouldFail: true,
|
||||||
},
|
},
|
||||||
|
@ -1387,25 +1450,32 @@ func TestBindPodVolumes(t *testing.T) {
|
||||||
|
|
||||||
// Setup
|
// Setup
|
||||||
pod := makePod(nil)
|
pod := makePod(nil)
|
||||||
if scenario.apiPV == nil {
|
if scenario.apiPVs == nil {
|
||||||
scenario.apiPV = scenario.cachedPV
|
scenario.apiPVs = scenario.cachedPVs
|
||||||
|
}
|
||||||
|
if scenario.nodes == nil {
|
||||||
|
scenario.nodes = []*v1.Node{node1}
|
||||||
|
}
|
||||||
|
if scenario.provisionedPVCs == nil {
|
||||||
|
scenario.provisionedPVCs = []*v1.PersistentVolumeClaim{}
|
||||||
}
|
}
|
||||||
testEnv := newTestBinder(t)
|
testEnv := newTestBinder(t)
|
||||||
if !scenario.bindingsNil {
|
if !scenario.bindingsNil {
|
||||||
|
bindings := []*bindingInfo{}
|
||||||
if scenario.binding != nil {
|
if scenario.binding != nil {
|
||||||
testEnv.initVolumes([]*v1.PersistentVolume{scenario.cachedPV}, []*v1.PersistentVolume{scenario.apiPV})
|
bindings = []*bindingInfo{scenario.binding}
|
||||||
testEnv.initClaims([]*v1.PersistentVolumeClaim{scenario.cachedPVC}, nil)
|
|
||||||
testEnv.assumeVolumes(t, name, "node1", pod, []*bindingInfo{scenario.binding}, []*v1.PersistentVolumeClaim{})
|
|
||||||
} else {
|
|
||||||
testEnv.assumeVolumes(t, name, "node1", pod, []*bindingInfo{}, []*v1.PersistentVolumeClaim{})
|
|
||||||
}
|
}
|
||||||
|
testEnv.initNodes(scenario.nodes)
|
||||||
|
testEnv.initVolumes(scenario.cachedPVs, scenario.apiPVs)
|
||||||
|
testEnv.initClaims(scenario.cachedPVCs, nil)
|
||||||
|
testEnv.assumeVolumes(t, name, "node1", pod, bindings, scenario.provisionedPVCs)
|
||||||
}
|
}
|
||||||
|
|
||||||
if scenario.delayFunc != nil {
|
if scenario.delayFunc != nil {
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
klog.V(5).Infof("Running delay function")
|
klog.V(5).Infof("Running delay function")
|
||||||
scenario.delayFunc(t, testEnv, pod, scenario.binding.pv, scenario.binding.pvc)
|
scenario.delayFunc(t, testEnv, pod, scenario.binding, scenario.provisionedPVCs)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,9 +25,7 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/klog"
|
v1 "k8s.io/api/core/v1"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
|
||||||
storagev1 "k8s.io/api/storage/v1"
|
storagev1 "k8s.io/api/storage/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
@ -49,6 +47,7 @@ import (
|
||||||
storagelisters "k8s.io/client-go/listers/storage/v1"
|
storagelisters "k8s.io/client-go/listers/storage/v1"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
|
"k8s.io/klog"
|
||||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||||
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
|
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
||||||
|
@ -377,7 +376,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
|
||||||
)
|
)
|
||||||
|
|
||||||
// Setup volume binder
|
// Setup volume binder
|
||||||
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
|
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
|
||||||
|
|
||||||
args.StorageClassInformer.Informer().AddEventHandler(
|
args.StorageClassInformer.Informer().AddEventHandler(
|
||||||
cache.ResourceEventHandlerFuncs{
|
cache.ResourceEventHandlerFuncs{
|
||||||
|
|
|
@ -34,13 +34,14 @@ type VolumeBinder struct {
|
||||||
// NewVolumeBinder sets up the volume binding library and binding queue
|
// NewVolumeBinder sets up the volume binding library and binding queue
|
||||||
func NewVolumeBinder(
|
func NewVolumeBinder(
|
||||||
client clientset.Interface,
|
client clientset.Interface,
|
||||||
|
nodeInformer coreinformers.NodeInformer,
|
||||||
pvcInformer coreinformers.PersistentVolumeClaimInformer,
|
pvcInformer coreinformers.PersistentVolumeClaimInformer,
|
||||||
pvInformer coreinformers.PersistentVolumeInformer,
|
pvInformer coreinformers.PersistentVolumeInformer,
|
||||||
storageClassInformer storageinformers.StorageClassInformer,
|
storageClassInformer storageinformers.StorageClassInformer,
|
||||||
bindTimeout time.Duration) *VolumeBinder {
|
bindTimeout time.Duration) *VolumeBinder {
|
||||||
|
|
||||||
return &VolumeBinder{
|
return &VolumeBinder{
|
||||||
Binder: persistentvolume.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer, bindTimeout),
|
Binder: persistentvolume.NewVolumeBinder(client, nodeInformer, pvcInformer, pvInformer, storageClassInformer, bindTimeout),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue