From 01d83fa10409687a86aeefce0fd0f98d9b1b2022 Mon Sep 17 00:00:00 2001
From: Michelle Au
Date: Fri, 17 Aug 2018 17:45:51 -0700
Subject: [PATCH] Scheduler changes to assume volume and pod together, and
 then bind volume and pod asynchronously afterwards. This will also make it
 easier to migrate to the scheduler framework.

---
 pkg/scheduler/scheduler.go                  | 130 ++++++--------------
 pkg/scheduler/scheduler_test.go             |  35 ++----
 pkg/scheduler/volumebinder/BUILD            |   2 -
 pkg/scheduler/volumebinder/volume_binder.go |  23 +---
 4 files changed, 55 insertions(+), 135 deletions(-)

diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 87fe364db8..3d836b6ee5 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -17,7 +17,6 @@ limitations under the License.
 package scheduler
 
 import (
-	"fmt"
 	"time"
 
 	"k8s.io/api/core/v1"
@@ -184,10 +183,6 @@ func (sched *Scheduler) Run() {
 		return
 	}
 
-	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
-		go sched.config.VolumeBinder.Run(sched.bindVolumesWorker, sched.config.StopEverything)
-	}
-
 	go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
 }
 
@@ -265,17 +260,12 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e
 	return nodeName, err
 }
 
-// assumeAndBindVolumes will update the volume cache and then asynchronously bind volumes if required.
-//
-// If volume binding is required, then the bind volumes routine will update the pod to send it back through
-// the scheduler.
-//
-// Otherwise, return nil error and continue to assume the pod.
+// assumeVolumes will update the volume cache with the chosen bindings
 //
 // This function modifies assumed if volume binding is required.
-func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error {
+func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bool, err error) {
 	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
-		allBound, bindingRequired, err := sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
+		allBound, err = sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
 		if err != nil {
 			sched.config.Error(assumed, err)
 			sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePodVolumes failed: %v", err)
@@ -285,76 +275,38 @@ func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error
 				Reason:  "SchedulerError",
 				Message: err.Error(),
 			})
-			return err
 		}
-		if !allBound {
-			err = fmt.Errorf("Volume binding started, waiting for completion")
-			if bindingRequired {
-				if sched.config.Ecache != nil {
-					invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred)
-					sched.config.Ecache.InvalidatePredicates(invalidPredicates)
-				}
-
-				// bindVolumesWorker() will update the Pod object to put it back in the scheduler queue
-				sched.config.VolumeBinder.BindQueue.Add(assumed)
-			} else {
-				// We are just waiting for PV controller to finish binding, put it back in the
-				// scheduler queue
-				sched.config.Error(assumed, err)
-				sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "FailedScheduling", "%v", err)
-				sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
-					Type:   v1.PodScheduled,
-					Status: v1.ConditionFalse,
-					Reason: "VolumeBindingWaiting",
-				})
-			}
-			return err
+		// Invalidate ecache because assumed volumes could have affected the cached
+		// pvs for other pods
+		if sched.config.Ecache != nil {
+			invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred)
+			sched.config.Ecache.InvalidatePredicates(invalidPredicates)
 		}
 	}
-	return nil
+	return
 }
 
-// bindVolumesWorker() processes pods queued in assumeAndBindVolumes() and tries to
-// make the API update for volume binding.
-// This function runs forever until the volume BindQueue is closed.
-func (sched *Scheduler) bindVolumesWorker() {
-	workFunc := func() bool {
-		keyObj, quit := sched.config.VolumeBinder.BindQueue.Get()
-		if quit {
-			return true
-		}
-		defer sched.config.VolumeBinder.BindQueue.Done(keyObj)
+// bindVolumes will make the API update with the assumed bindings and wait until
+// the PV controller has completely finished the binding operation.
+//
+// If binding errors, times out, or gets undone, then an error will be returned to
+// retry scheduling.
+func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error {
+	var reason string
+	var eventType string
 
-		assumed, ok := keyObj.(*v1.Pod)
-		if !ok {
-			glog.V(4).Infof("Object is not a *v1.Pod")
-			return false
+	glog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
+	err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed)
+	if err != nil {
+		glog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err)
+
+		// Unassume the Pod and retry scheduling
+		if forgetErr := sched.config.SchedulerCache.ForgetPod(assumed); forgetErr != nil {
+			glog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
 		}
-		// TODO: add metrics
-		var reason string
-		var eventType string
-
-		glog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
-
-		// The Pod is always sent back to the scheduler afterwards.
-		err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed)
-		if err != nil {
-			glog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err)
-			reason = "VolumeBindingFailed"
-			eventType = v1.EventTypeWarning
-		} else {
-			glog.V(4).Infof("Successfully bound volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
-			reason = "VolumeBindingWaiting"
-			eventType = v1.EventTypeNormal
-			err = fmt.Errorf("Volume binding started, waiting for completion")
-		}
-
-		// Always fail scheduling regardless of binding success.
-		// The Pod needs to be sent back through the scheduler to:
-		// * Retry volume binding if it fails.
-		// * Retry volume binding if dynamic provisioning fails.
-		// * Bind the Pod to the Node once all volumes are bound.
+		reason = "VolumeBindingFailed"
+		eventType = v1.EventTypeWarning
 		sched.config.Error(assumed, err)
 		sched.config.Recorder.Eventf(assumed, eventType, "FailedScheduling", "%v", err)
 		sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
@@ -362,15 +314,11 @@ func (sched *Scheduler) bindVolumesWorker() {
 			Status: v1.ConditionFalse,
 			Reason: reason,
 		})
-		return false
+		return err
 	}
 
-	for {
-		if quit := workFunc(); quit {
-			glog.V(4).Infof("bindVolumesWorker shutting down")
-			break
-		}
-	}
+	glog.V(5).Infof("Success binding volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
+	return nil
 }
 
 // assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
@@ -478,16 +426,12 @@ func (sched *Scheduler) scheduleOne() {
 
 	// Assume volumes first before assuming the pod.
 	//
-	// If no volumes need binding, then nil is returned, and continue to assume the pod.
+	// If all volumes are completely bound, then allBound is true and binding will be skipped.
 	//
-	// Otherwise, error is returned and volume binding is started asynchronously for all of the pod's volumes.
-	// scheduleOne() returns immediately on error, so that it doesn't continue to assume the pod.
-	//
-	// After the asynchronous volume binding updates are made, it will send the pod back through the scheduler for
-	// subsequent passes until all volumes are fully bound.
+	// Otherwise, volume binding is started after the pod is assumed, but before pod binding.
 	//
 	// This function modifies 'assumedPod' if volume binding is required.
-	err = sched.assumeAndBindVolumes(assumedPod, suggestedHost)
+	allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
 	if err != nil {
 		return
 	}
@@ -499,6 +443,14 @@ func (sched *Scheduler) scheduleOne() {
 	}
 	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
 	go func() {
+		// Bind volumes first before Pod
+		if !allBound {
+			err = sched.bindVolumes(assumedPod)
+			if err != nil {
+				return
+			}
+		}
+
 		err := sched.bind(assumedPod, &v1.Binding{
 			ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
 			Target: v1.ObjectReference{
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 4062b94e67..e950141ae4 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -707,8 +707,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
 			},
 			expectAssumeCalled: true,
 			expectPodBind:      &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}},
-
-			eventReason: "Scheduled",
+			eventReason:        "Scheduled",
 		},
 		{
 			name: "bound/invalid pv affinity",
@@ -739,28 +738,15 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
 			expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind, 1 node(s) had volume node affinity conflict"),
 		},
 		{
-			name: "unbound/found matches",
+			name: "unbound/found matches/bind succeeds",
 			volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
-				FindUnboundSatsified:  true,
-				FindBoundSatsified:    true,
-				AssumeBindingRequired: true,
+				FindUnboundSatsified: true,
+				FindBoundSatsified:   true,
 			},
 			expectAssumeCalled: true,
 			expectBindCalled:   true,
-			eventReason:        "FailedScheduling",
-			expectError:        fmt.Errorf("Volume binding started, waiting for completion"),
-		},
-		{
-			name: "unbound/found matches/already-bound",
-			volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
-				FindUnboundSatsified:  true,
-				FindBoundSatsified:    true,
-				AssumeBindingRequired: false,
-			},
-			expectAssumeCalled: true,
-			expectBindCalled:   false,
-			eventReason:        "FailedScheduling",
-			expectError:        fmt.Errorf("Volume binding started, waiting for completion"),
+			expectPodBind:      &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}},
+			eventReason:        "Scheduled",
 		},
 		{
 			name: "predicate error",
@@ -784,10 +770,9 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
 		{
 			name: "bind error",
 			volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
-				FindUnboundSatsified:  true,
-				FindBoundSatsified:    true,
-				AssumeBindingRequired: true,
-				BindErr:               bindErr,
+				FindUnboundSatsified: true,
+				FindBoundSatsified:   true,
+				BindErr:              bindErr,
 			},
 			expectAssumeCalled: true,
 			expectBindCalled:   true,
@@ -814,8 +799,6 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
 		close(eventChan)
 	})
 
-	go fakeVolumeBinder.Run(s.bindVolumesWorker, stop)
-
 	s.scheduleOne()
 
 	// Wait for pod to succeed or fail scheduling
diff --git a/pkg/scheduler/volumebinder/BUILD b/pkg/scheduler/volumebinder/BUILD
index c1eaa52ec7..518178377a 100644
--- a/pkg/scheduler/volumebinder/BUILD
+++ b/pkg/scheduler/volumebinder/BUILD
@@ -8,11 +8,9 @@ go_library(
     deps = [
         "//pkg/controller/volume/persistentvolume:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
        "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
-        "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
     ],
 )
 
diff --git a/pkg/scheduler/volumebinder/volume_binder.go b/pkg/scheduler/volumebinder/volume_binder.go
index 1dfe41448c..4dcf2badec 100644
--- a/pkg/scheduler/volumebinder/volume_binder.go
+++ b/pkg/scheduler/volumebinder/volume_binder.go
@@ -20,19 +20,15 @@ import (
 	"time"
 
 	"k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/util/wait"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	storageinformers "k8s.io/client-go/informers/storage/v1"
 	clientset "k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/util/workqueue"
 	"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
 )
 
-// VolumeBinder sets up the volume binding library and manages
-// the volume binding operations with a queue.
+// VolumeBinder sets up the volume binding library
 type VolumeBinder struct {
-	Binder    persistentvolume.SchedulerVolumeBinder
-	BindQueue *workqueue.Type
+	Binder persistentvolume.SchedulerVolumeBinder
 }
 
-// NewVolumeBinder sets up the volume binding library and binding queue
+// NewVolumeBinder sets up the volume binding library
@@ -43,27 +39,18 @@ func NewVolumeBinder(
 	client clientset.Interface,
 	pvcInformer coreinformers.PersistentVolumeClaimInformer,
 	pvInformer coreinformers.PersistentVolumeInformer,
 	storageClassInformer storageinformers.StorageClassInformer) *VolumeBinder {
 
 	return &VolumeBinder{
-		Binder:    persistentvolume.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer),
-		BindQueue: workqueue.NewNamed("podsToBind"),
+		// TODO: what is a good bind timeout value?
+		Binder: persistentvolume.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer, 10*time.Minute),
 	}
 }
 
-// NewFakeVolumeBinder sets up a fake volume binder and binding queue
+// NewFakeVolumeBinder sets up a fake volume binder
 func NewFakeVolumeBinder(config *persistentvolume.FakeVolumeBinderConfig) *VolumeBinder {
 	return &VolumeBinder{
-		Binder:    persistentvolume.NewFakeVolumeBinder(config),
-		BindQueue: workqueue.NewNamed("podsToBind"),
+		Binder: persistentvolume.NewFakeVolumeBinder(config),
 	}
 }
 
-// Run starts a goroutine to handle the binding queue with the given function.
-func (b *VolumeBinder) Run(bindWorkFunc func(), stopCh <-chan struct{}) {
-	go wait.Until(bindWorkFunc, time.Second, stopCh)
-
-	<-stopCh
-	b.BindQueue.ShutDown()
-}
-
 // DeletePodBindings will delete the cached volume bindings for the given pod.
 func (b *VolumeBinder) DeletePodBindings(pod *v1.Pod) {
 	cache := b.Binder.GetBindingsCache()
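
Note for reviewers: below is a minimal, self-contained Go sketch of the control flow this patch establishes. It is not the real scheduler code; the binder interface, fakeBinder, scheduleOne, and the WaitGroup plumbing are hypothetical stand-ins used only to illustrate the ordering. Volumes are assumed synchronously together with the pod, and the API-side volume binding moves into the asynchronous bind goroutine, ahead of the pod/node binding.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// binder is a hypothetical stand-in for the scheduler's
// persistentvolume.SchedulerVolumeBinder interface.
type binder interface {
	AssumePodVolumes(pod, node string) (allBound bool, err error)
	BindPodVolumes(pod string) error
}

type fakeBinder struct{ failBind bool }

// AssumePodVolumes pretends the pod has one unbound volume, so an API-side
// binding step is still required (allBound == false).
func (f fakeBinder) AssumePodVolumes(pod, node string) (bool, error) {
	return false, nil
}

func (f fakeBinder) BindPodVolumes(pod string) error {
	if f.failBind {
		return errors.New("simulated bind failure")
	}
	return nil
}

// scheduleOne mirrors the flow after this patch: assume volumes and the pod
// synchronously, then bind volumes (if needed) and the pod in one goroutine.
// On error the real scheduler forgets the assumed pod and requeues it; here
// we just log and return.
func scheduleOne(b binder, pod, node string, wg *sync.WaitGroup) {
	allBound, err := b.AssumePodVolumes(pod, node)
	if err != nil {
		fmt.Printf("%s: assume volumes failed, retry scheduling: %v\n", pod, err)
		return
	}

	// ... the pod itself would be assumed in the scheduler cache here ...

	wg.Add(1)
	go func() {
		defer wg.Done()
		// Bind volumes first, before the pod/node binding.
		if !allBound {
			if err := b.BindPodVolumes(pod); err != nil {
				fmt.Printf("%s: volume binding failed, retry scheduling: %v\n", pod, err)
				return
			}
		}
		fmt.Printf("%s: bound to node %s\n", pod, node)
	}()
}

func main() {
	var wg sync.WaitGroup
	scheduleOne(fakeBinder{}, "default/foo", "machine1", &wg)
	scheduleOne(fakeBinder{failBind: true}, "default/bar", "machine1", &wg)
	wg.Wait()
}

The property to check in review, mirrored in the sketch, is that a BindPodVolumes failure now surfaces inside the bind goroutine, where the pod can be forgotten and sent back through scheduling, rather than requiring a separate worker queue and an artificial "binding started" scheduling error.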