mirror of https://github.com/k3s-io/k3s
Use provided node object in volume binding predicate
parent
1102fd0dcb
commit
8d1cd819ec
|
@ -62,7 +62,7 @@ type SchedulerVolumeBinder interface {
|
|||
// if bound volumes satisfy the PV NodeAffinity.
|
||||
//
|
||||
// This function is called by the volume binding scheduler predicate and can be called in parallel
|
||||
FindPodVolumes(pod *v1.Pod, nodeName string) (unboundVolumesSatisified, boundVolumesSatisfied bool, err error)
|
||||
FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisified, boundVolumesSatisfied bool, err error)
|
||||
|
||||
// AssumePodVolumes will take the PV matches for unbound PVCs and update the PV cache assuming
|
||||
// that the PV is prebound to the PVC.
|
||||
|
@ -88,9 +88,8 @@ type volumeBinder struct {
|
|||
ctrl *PersistentVolumeController
|
||||
|
||||
// TODO: Need AssumeCache for PVC for dynamic provisioning
|
||||
pvcCache corelisters.PersistentVolumeClaimLister
|
||||
nodeCache corelisters.NodeLister
|
||||
pvCache PVAssumeCache
|
||||
pvcCache corelisters.PersistentVolumeClaimLister
|
||||
pvCache PVAssumeCache
|
||||
|
||||
// Stores binding decisions that were made in FindPodVolumes for use in AssumePodVolumes.
|
||||
// AssumePodVolumes modifies the bindings again for use in BindPodVolumes.
|
||||
|
@ -102,7 +101,6 @@ func NewVolumeBinder(
|
|||
kubeClient clientset.Interface,
|
||||
pvcInformer coreinformers.PersistentVolumeClaimInformer,
|
||||
pvInformer coreinformers.PersistentVolumeInformer,
|
||||
nodeInformer coreinformers.NodeInformer,
|
||||
storageClassInformer storageinformers.StorageClassInformer) SchedulerVolumeBinder {
|
||||
|
||||
// TODO: find better way...
|
||||
|
@ -114,7 +112,6 @@ func NewVolumeBinder(
|
|||
b := &volumeBinder{
|
||||
ctrl: ctrl,
|
||||
pvcCache: pvcInformer.Lister(),
|
||||
nodeCache: nodeInformer.Lister(),
|
||||
pvCache: NewPVAssumeCache(pvInformer.Informer()),
|
||||
podBindingCache: NewPodBindingCache(),
|
||||
}
|
||||
|
@ -127,21 +124,16 @@ func (b *volumeBinder) GetBindingsCache() PodBindingCache {
|
|||
}
|
||||
|
||||
// FindPodVolumes caches the matching PVs per node in podBindingCache
|
||||
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, nodeName string) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
|
||||
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
|
||||
podName := getPodName(pod)
|
||||
|
||||
// Warning: Below log needs high verbosity as it can be printed several times (#60933).
|
||||
glog.V(5).Infof("FindPodVolumes for pod %q, node %q", podName, nodeName)
|
||||
glog.V(5).Infof("FindPodVolumes for pod %q, node %q", podName, node.Name)
|
||||
|
||||
// Initialize to true for pods that don't have volumes
|
||||
unboundVolumesSatisfied = true
|
||||
boundVolumesSatisfied = true
|
||||
|
||||
node, err := b.nodeCache.Get(nodeName)
|
||||
if node == nil || err != nil {
|
||||
return false, false, fmt.Errorf("error getting node %q: %v", nodeName, err)
|
||||
}
|
||||
|
||||
// The pod's volumes need to be processed in one call to avoid the race condition where
|
||||
// volumes can get bound in between calls.
|
||||
boundClaims, unboundClaims, unboundClaimsImmediate, err := b.getPodVolumes(pod)
|
||||
|
|
|
@ -44,7 +44,7 @@ type FakeVolumeBinder struct {
|
|||
BindCalled bool
|
||||
}
|
||||
|
||||
func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, nodeName string) (unboundVolumesSatisfied, boundVolumesSatsified bool, err error) {
|
||||
func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatsified bool, err error) {
|
||||
return b.config.FindUnboundSatsified, b.config.FindBoundSatsified, b.config.FindErr
|
||||
}
|
||||
|
||||
|
|
|
@ -69,6 +69,8 @@ var (
|
|||
|
||||
waitClass = "waitClass"
|
||||
immediateClass = "immediateClass"
|
||||
|
||||
nodeLabelKey = "nodeKey"
|
||||
)
|
||||
|
||||
type testEnv struct {
|
||||
|
@ -86,27 +88,14 @@ func newTestBinder(t *testing.T) *testEnv {
|
|||
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
|
||||
|
||||
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
|
||||
nodeInformer := informerFactory.Core().V1().Nodes()
|
||||
classInformer := informerFactory.Storage().V1().StorageClasses()
|
||||
|
||||
binder := NewVolumeBinder(
|
||||
client,
|
||||
pvcInformer,
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
nodeInformer,
|
||||
classInformer)
|
||||
|
||||
// Add a node
|
||||
err := nodeInformer.Informer().GetIndexer().Add(&v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node1",
|
||||
Labels: map[string]string{"key1": "node1"},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add node to internal cache: %v", err)
|
||||
}
|
||||
|
||||
// Add storageclasses
|
||||
waitMode := storagev1.VolumeBindingWaitForFirstConsumer
|
||||
immediateMode := storagev1.VolumeBindingImmediate
|
||||
|
@ -125,7 +114,7 @@ func newTestBinder(t *testing.T) *testEnv {
|
|||
},
|
||||
}
|
||||
for _, class := range classes {
|
||||
if err = classInformer.Informer().GetIndexer().Add(class); err != nil {
|
||||
if err := classInformer.Informer().GetIndexer().Add(class); err != nil {
|
||||
t.Fatalf("Failed to add storage class to internal cache: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -331,7 +320,7 @@ func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentV
|
|||
},
|
||||
}
|
||||
if node != "" {
|
||||
pv.Spec.NodeAffinity = getVolumeNodeAffinity("key1", node)
|
||||
pv.Spec.NodeAffinity = getVolumeNodeAffinity(nodeLabelKey, node)
|
||||
}
|
||||
|
||||
if boundToPVC != nil {
|
||||
|
@ -404,8 +393,6 @@ func TestFindPodVolumes(t *testing.T) {
|
|||
// Inputs
|
||||
pvs []*v1.PersistentVolume
|
||||
podPVCs []*v1.PersistentVolumeClaim
|
||||
// Defaults to node1
|
||||
node string
|
||||
// If nil, use pod PVCs
|
||||
cachePVCs []*v1.PersistentVolumeClaim
|
||||
// If nil, makePod with podPVCs
|
||||
|
@ -454,13 +441,6 @@ func TestFindPodVolumes(t *testing.T) {
|
|||
expectedUnbound: true,
|
||||
expectedBound: true,
|
||||
},
|
||||
"unbound-pvc,node-not-exists": {
|
||||
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
|
||||
node: "node12",
|
||||
expectedUnbound: false,
|
||||
expectedBound: false,
|
||||
shouldFail: true,
|
||||
},
|
||||
"unbound-pvc,pv-same-node": {
|
||||
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
|
||||
pvs: []*v1.PersistentVolume{pvNode2, pvNode1a, pvNode1b},
|
||||
|
@ -551,15 +531,21 @@ func TestFindPodVolumes(t *testing.T) {
|
|||
utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true")
|
||||
defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false")
|
||||
|
||||
testNode := &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node1",
|
||||
Labels: map[string]string{
|
||||
nodeLabelKey: "node1",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, scenario := range scenarios {
|
||||
glog.V(5).Infof("Running test case %q", name)
|
||||
|
||||
// Setup
|
||||
testEnv := newTestBinder(t)
|
||||
testEnv.initVolumes(scenario.pvs, scenario.pvs)
|
||||
if scenario.node == "" {
|
||||
scenario.node = "node1"
|
||||
}
|
||||
|
||||
// a. Init pvc cache
|
||||
if scenario.cachePVCs == nil {
|
||||
|
@ -573,7 +559,7 @@ func TestFindPodVolumes(t *testing.T) {
|
|||
}
|
||||
|
||||
// Execute
|
||||
unboundSatisfied, boundSatisfied, err := testEnv.binder.FindPodVolumes(scenario.pod, scenario.node)
|
||||
unboundSatisfied, boundSatisfied, err := testEnv.binder.FindPodVolumes(scenario.pod, testNode)
|
||||
|
||||
// Validate
|
||||
if !scenario.shouldFail && err != nil {
|
||||
|
@ -588,7 +574,7 @@ func TestFindPodVolumes(t *testing.T) {
|
|||
if unboundSatisfied != scenario.expectedUnbound {
|
||||
t.Errorf("Test %q failed: expected unboundSatsified %v, got %v", name, scenario.expectedUnbound, unboundSatisfied)
|
||||
}
|
||||
testEnv.validatePodCache(t, name, scenario.node, scenario.pod, scenario.expectedBindings)
|
||||
testEnv.validatePodCache(t, name, testNode.Name, scenario.pod, scenario.expectedBindings)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1597,7 +1597,7 @@ func (c *VolumeBindingChecker) predicate(pod *v1.Pod, meta algorithm.PredicateMe
|
|||
return false, nil, fmt.Errorf("node not found")
|
||||
}
|
||||
|
||||
unboundSatisfied, boundSatisfied, err := c.binder.Binder.FindPodVolumes(pod, node.Name)
|
||||
unboundSatisfied, boundSatisfied, err := c.binder.Binder.FindPodVolumes(pod, node)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
|
|
|
@ -293,7 +293,7 @@ func NewConfigFactory(
|
|||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
|
||||
// Setup volume binder
|
||||
c.volumeBinder = volumebinder.NewVolumeBinder(client, pvcInformer, pvInformer, nodeInformer, storageClassInformer)
|
||||
c.volumeBinder = volumebinder.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer)
|
||||
}
|
||||
|
||||
// Setup cache comparer
|
||||
|
|
|
@ -40,11 +40,10 @@ func NewVolumeBinder(
|
|||
client clientset.Interface,
|
||||
pvcInformer coreinformers.PersistentVolumeClaimInformer,
|
||||
pvInformer coreinformers.PersistentVolumeInformer,
|
||||
nodeInformer coreinformers.NodeInformer,
|
||||
storageClassInformer storageinformers.StorageClassInformer) *VolumeBinder {
|
||||
|
||||
return &VolumeBinder{
|
||||
Binder: persistentvolume.NewVolumeBinder(client, pvcInformer, pvInformer, nodeInformer, storageClassInformer),
|
||||
Binder: persistentvolume.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer),
|
||||
BindQueue: workqueue.NewNamed("podsToBind"),
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue