Scheduler changes to assume volume and pod together, and then bind
volume and pod asynchronously afterwards. This will also make it easier
to migrate to the scheduler framework.
pull/8/head
Michelle Au 2018-08-17 17:45:51 -07:00
parent 37d46a1e3f
commit 01d83fa104
4 changed files with 55 additions and 135 deletions

View File

@ -17,7 +17,6 @@ limitations under the License.
package scheduler package scheduler
import ( import (
"fmt"
"time" "time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
@ -184,10 +183,6 @@ func (sched *Scheduler) Run() {
return return
} }
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
go sched.config.VolumeBinder.Run(sched.bindVolumesWorker, sched.config.StopEverything)
}
go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything) go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
} }
@ -265,17 +260,12 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e
return nodeName, err return nodeName, err
} }
// assumeAndBindVolumes will update the volume cache and then asynchronously bind volumes if required. // assumeVolumes will update the volume cache with the chosen bindings
//
// If volume binding is required, then the bind volumes routine will update the pod to send it back through
// the scheduler.
//
// Otherwise, return nil error and continue to assume the pod.
// //
// This function modifies assumed if volume binding is required. // This function modifies assumed if volume binding is required.
func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error { func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bool, err error) {
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
allBound, bindingRequired, err := sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host) allBound, err = sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
if err != nil { if err != nil {
sched.config.Error(assumed, err) sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePodVolumes failed: %v", err) sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePodVolumes failed: %v", err)
@ -285,76 +275,38 @@ func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error
Reason: "SchedulerError", Reason: "SchedulerError",
Message: err.Error(), Message: err.Error(),
}) })
return err
} }
if !allBound { // Invalidate ecache because assumed volumes could have affected the cached
err = fmt.Errorf("Volume binding started, waiting for completion") // pvs for other pods
if bindingRequired { if sched.config.Ecache != nil {
if sched.config.Ecache != nil { invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred)
invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred) sched.config.Ecache.InvalidatePredicates(invalidPredicates)
sched.config.Ecache.InvalidatePredicates(invalidPredicates)
}
// bindVolumesWorker() will update the Pod object to put it back in the scheduler queue
sched.config.VolumeBinder.BindQueue.Add(assumed)
} else {
// We are just waiting for PV controller to finish binding, put it back in the
// scheduler queue
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "FailedScheduling", "%v", err)
sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: "VolumeBindingWaiting",
})
}
return err
} }
} }
return nil return
} }
// bindVolumesWorker() processes pods queued in assumeAndBindVolumes() and tries to // bindVolumes will make the API update with the assumed bindings and wait until
// make the API update for volume binding. // the PV controller has completely finished the binding operation.
// This function runs forever until the volume BindQueue is closed. //
func (sched *Scheduler) bindVolumesWorker() { // If binding errors, times out or gets undone, then an error will be returned to
workFunc := func() bool { // retry scheduling.
keyObj, quit := sched.config.VolumeBinder.BindQueue.Get() func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error {
if quit { var reason string
return true var eventType string
}
defer sched.config.VolumeBinder.BindQueue.Done(keyObj)
assumed, ok := keyObj.(*v1.Pod) glog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
if !ok { err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed)
glog.V(4).Infof("Object is not a *v1.Pod") if err != nil {
return false glog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err)
// Unassume the Pod and retry scheduling
if forgetErr := sched.config.SchedulerCache.ForgetPod(assumed); forgetErr != nil {
glog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
} }
// TODO: add metrics reason = "VolumeBindingFailed"
var reason string eventType = v1.EventTypeWarning
var eventType string
glog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
// The Pod is always sent back to the scheduler afterwards.
err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed)
if err != nil {
glog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err)
reason = "VolumeBindingFailed"
eventType = v1.EventTypeWarning
} else {
glog.V(4).Infof("Successfully bound volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
reason = "VolumeBindingWaiting"
eventType = v1.EventTypeNormal
err = fmt.Errorf("Volume binding started, waiting for completion")
}
// Always fail scheduling regardless of binding success.
// The Pod needs to be sent back through the scheduler to:
// * Retry volume binding if it fails.
// * Retry volume binding if dynamic provisioning fails.
// * Bind the Pod to the Node once all volumes are bound.
sched.config.Error(assumed, err) sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, eventType, "FailedScheduling", "%v", err) sched.config.Recorder.Eventf(assumed, eventType, "FailedScheduling", "%v", err)
sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{ sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
@ -362,15 +314,11 @@ func (sched *Scheduler) bindVolumesWorker() {
Status: v1.ConditionFalse, Status: v1.ConditionFalse,
Reason: reason, Reason: reason,
}) })
return false return err
} }
for { glog.V(5).Infof("Success binding volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
if quit := workFunc(); quit { return nil
glog.V(4).Infof("bindVolumesWorker shutting down")
break
}
}
} }
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous. // assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
@ -478,16 +426,12 @@ func (sched *Scheduler) scheduleOne() {
// Assume volumes first before assuming the pod. // Assume volumes first before assuming the pod.
// //
// If no volumes need binding, then nil is returned, and continue to assume the pod. // If all volumes are completely bound, then allBound is true and binding will be skipped.
// //
// Otherwise, error is returned and volume binding is started asynchronously for all of the pod's volumes. // Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
// scheduleOne() returns immediately on error, so that it doesn't continue to assume the pod.
//
// After the asynchronous volume binding updates are made, it will send the pod back through the scheduler for
// subsequent passes until all volumes are fully bound.
// //
// This function modifies 'assumedPod' if volume binding is required. // This function modifies 'assumedPod' if volume binding is required.
err = sched.assumeAndBindVolumes(assumedPod, suggestedHost) allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
if err != nil { if err != nil {
return return
} }
@ -499,6 +443,14 @@ func (sched *Scheduler) scheduleOne() {
} }
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above). // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() { go func() {
// Bind volumes first before Pod
if !allBound {
err = sched.bindVolumes(assumedPod)
if err != nil {
return
}
}
err := sched.bind(assumedPod, &v1.Binding{ err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID}, ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
Target: v1.ObjectReference{ Target: v1.ObjectReference{

View File

@ -707,8 +707,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
}, },
expectAssumeCalled: true, expectAssumeCalled: true,
expectPodBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}}, expectPodBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}},
eventReason: "Scheduled",
eventReason: "Scheduled",
}, },
{ {
name: "bound/invalid pv affinity", name: "bound/invalid pv affinity",
@ -739,28 +738,15 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind, 1 node(s) had volume node affinity conflict"), expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind, 1 node(s) had volume node affinity conflict"),
}, },
{ {
name: "unbound/found matches", name: "unbound/found matches/bind succeeds",
volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{ volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
FindUnboundSatsified: true, FindUnboundSatsified: true,
FindBoundSatsified: true, FindBoundSatsified: true,
AssumeBindingRequired: true,
}, },
expectAssumeCalled: true, expectAssumeCalled: true,
expectBindCalled: true, expectBindCalled: true,
eventReason: "FailedScheduling", expectPodBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}},
expectError: fmt.Errorf("Volume binding started, waiting for completion"), eventReason: "Scheduled",
},
{
name: "unbound/found matches/already-bound",
volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
FindUnboundSatsified: true,
FindBoundSatsified: true,
AssumeBindingRequired: false,
},
expectAssumeCalled: true,
expectBindCalled: false,
eventReason: "FailedScheduling",
expectError: fmt.Errorf("Volume binding started, waiting for completion"),
}, },
{ {
name: "predicate error", name: "predicate error",
@ -784,10 +770,9 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
{ {
name: "bind error", name: "bind error",
volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{ volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
FindUnboundSatsified: true, FindUnboundSatsified: true,
FindBoundSatsified: true, FindBoundSatsified: true,
AssumeBindingRequired: true, BindErr: bindErr,
BindErr: bindErr,
}, },
expectAssumeCalled: true, expectAssumeCalled: true,
expectBindCalled: true, expectBindCalled: true,
@ -814,8 +799,6 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
close(eventChan) close(eventChan)
}) })
go fakeVolumeBinder.Run(s.bindVolumesWorker, stop)
s.scheduleOne() s.scheduleOne()
// Wait for pod to succeed or fail scheduling // Wait for pod to succeed or fail scheduling

View File

@ -8,11 +8,9 @@ go_library(
deps = [ deps = [
"//pkg/controller/volume/persistentvolume:go_default_library", "//pkg/controller/volume/persistentvolume:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
], ],
) )

View File

@ -20,19 +20,15 @@ import (
"time" "time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1" storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume" "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
) )
// VolumeBinder sets up the volume binding library and manages // VolumeBinder sets up the volume binding library
// the volume binding operations with a queue.
type VolumeBinder struct { type VolumeBinder struct {
Binder persistentvolume.SchedulerVolumeBinder Binder persistentvolume.SchedulerVolumeBinder
BindQueue *workqueue.Type
} }
// NewVolumeBinder sets up the volume binding library and binding queue // NewVolumeBinder sets up the volume binding library and binding queue
@ -43,27 +39,18 @@ func NewVolumeBinder(
storageClassInformer storageinformers.StorageClassInformer) *VolumeBinder { storageClassInformer storageinformers.StorageClassInformer) *VolumeBinder {
return &VolumeBinder{ return &VolumeBinder{
Binder: persistentvolume.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer), // TODO: what is a good bind timeout value?
BindQueue: workqueue.NewNamed("podsToBind"), Binder: persistentvolume.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer, 10*time.Minute),
} }
} }
// NewFakeVolumeBinder sets up a fake volume binder and binding queue // NewFakeVolumeBinder sets up a fake volume binder and binding queue
func NewFakeVolumeBinder(config *persistentvolume.FakeVolumeBinderConfig) *VolumeBinder { func NewFakeVolumeBinder(config *persistentvolume.FakeVolumeBinderConfig) *VolumeBinder {
return &VolumeBinder{ return &VolumeBinder{
Binder: persistentvolume.NewFakeVolumeBinder(config), Binder: persistentvolume.NewFakeVolumeBinder(config),
BindQueue: workqueue.NewNamed("podsToBind"),
} }
} }
// Run starts a goroutine to handle the binding queue with the given function.
func (b *VolumeBinder) Run(bindWorkFunc func(), stopCh <-chan struct{}) {
go wait.Until(bindWorkFunc, time.Second, stopCh)
<-stopCh
b.BindQueue.ShutDown()
}
// DeletePodBindings will delete the cached volume bindings for the given pod. // DeletePodBindings will delete the cached volume bindings for the given pod.
func (b *VolumeBinder) DeletePodBindings(pod *v1.Pod) { func (b *VolumeBinder) DeletePodBindings(pod *v1.Pod) {
cache := b.Binder.GetBindingsCache() cache := b.Binder.GetBindingsCache()