Add assume/bind volume functions to scheduler

pull/6/head
Michelle Au 2017-11-08 13:09:58 -08:00
parent 094841c62e
commit 5871b501ac
2 changed files with 351 additions and 8 deletions

View File

@ -17,17 +17,20 @@ limitations under the License.
package scheduler
import (
"fmt"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/core"
"k8s.io/kubernetes/plugin/pkg/scheduler/metrics"
@ -35,6 +38,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
"github.com/golang/glog"
"k8s.io/kubernetes/plugin/pkg/scheduler/volumebinder"
)
// Binder knows how to write a binding.
@ -131,7 +135,8 @@ type Config struct {
// Close this to shut down the scheduler.
StopEverything chan struct{}
VolumeBinder persistentvolume.SchedulerVolumeBinder
// VolumeBinder handles PVC/PV binding for the pod.
VolumeBinder *volumebinder.VolumeBinder
}
// NewFromConfigurator returns a new scheduler that is created entirely by the Configurator. Assumes Create() is implemented.
@ -167,6 +172,10 @@ func (sched *Scheduler) Run() {
return
}
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
go sched.config.VolumeBinder.Run(sched.bindVolumesWorker, sched.config.StopEverything)
}
go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
@ -243,6 +252,114 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e
return nodeName, err
}
// assumeAndBindVolumes will update the volume cache and then asynchronously bind volumes if required.
//
// If volume binding is required, then the bind volumes routine will update the pod to send it back through
// the scheduler.
//
// Otherwise, return nil error and continue to assume the pod.
//
// This function modifies assumed if volume binding is required.
func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error {
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
allBound, bindingRequired, err := sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
if err != nil {
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePodVolumes failed: %v", err)
sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: "SchedulerError",
Message: err.Error(),
})
return err
}
if !allBound {
err = fmt.Errorf("Volume binding started, waiting for completion")
if bindingRequired {
if sched.config.Ecache != nil {
invalidPredicates := sets.NewString(predicates.CheckVolumeBinding)
sched.config.Ecache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}
// bindVolumesWorker() will update the Pod object to put it back in the scheduler queue
sched.config.VolumeBinder.BindQueue.Add(assumed)
} else {
// We are just waiting for PV controller to finish binding, put it back in the
// scheduler queue
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "FailedScheduling", "%v", err)
sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: "VolumeBindingWaiting",
})
}
return err
}
}
return nil
}
// bindVolumesWorker() processes pods queued in assumeAndBindVolumes() and tries to
// make the API update for volume binding.
// This function runs forever until the volume BindQueue is closed.
func (sched *Scheduler) bindVolumesWorker() {
workFunc := func() bool {
keyObj, quit := sched.config.VolumeBinder.BindQueue.Get()
if quit {
return true
}
defer sched.config.VolumeBinder.BindQueue.Done(keyObj)
assumed, ok := keyObj.(*v1.Pod)
if !ok {
glog.V(4).Infof("Object is not a *v1.Pod")
return false
}
// TODO: add metrics
var reason string
var eventType string
glog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
// The Pod is always sent back to the scheduler afterwards.
err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed)
if err != nil {
glog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name, err)
reason = "VolumeBindingFailed"
eventType = v1.EventTypeWarning
} else {
glog.V(4).Infof("Successfully bound volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
reason = "VolumeBindingWaiting"
eventType = v1.EventTypeNormal
err = fmt.Errorf("Volume binding started, waiting for completion")
}
// Always fail scheduling regardless of binding success.
// The Pod needs to be sent back through the scheduler to:
// * Retry volume binding if it fails.
// * Retry volume binding if dynamic provisioning fails.
// * Bind the Pod to the Node once all volumes are bound.
sched.config.Error(assumed, err)
sched.config.Recorder.Eventf(assumed, eventType, "FailedScheduling", "%v", err)
sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: reason,
})
return false
}
for {
if quit := workFunc(); quit {
glog.V(4).Infof("bindVolumesWorker shutting down")
break
}
}
}
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
@ -337,15 +454,32 @@ func (sched *Scheduler) scheduleOne() {
// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
// This allows us to keep scheduling without waiting on binding to occur.
assumedPod := *pod
assumedPod := pod.DeepCopy()
// Assume volumes first before assuming the pod.
//
// If no volumes need binding, then nil is returned, and continue to assume the pod.
//
// Otherwise, error is returned and volume binding is started asynchronously for all of the pod's volumes.
// scheduleOne() returns immediately on error, so that it doesn't continue to assume the pod.
//
// After the asynchronous volume binding updates are made, it will send the pod back through the scheduler for
// subsequent passes until all volumes are fully bound.
//
// This function modifies 'assumedPod' if volume binding is required.
err = sched.assumeAndBindVolumes(assumedPod, suggestedHost)
if err != nil {
return
}
// assume modifies `assumedPod` by setting NodeName=suggestedHost
err = sched.assume(&assumedPod, suggestedHost)
err = sched.assume(assumedPod, suggestedHost)
if err != nil {
return
}
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
err := sched.bind(&assumedPod, &v1.Binding{
err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
Target: v1.ObjectReference{
Kind: "Node",

View File

@ -29,15 +29,18 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/plugin/pkg/scheduler/core"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
"k8s.io/kubernetes/plugin/pkg/scheduler/volumebinder"
)
type fakeBinder struct {
@ -420,7 +423,7 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) {
func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulercache.Cache,
nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, pod *v1.Pod, node *v1.Node) (*Scheduler, chan *v1.Binding, chan error) {
scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap)
scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap, nil)
queuedPodStore.Add(pod)
// queuedPodStore: [foo:8080]
@ -495,7 +498,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
predicates.NewInsufficientResourceError(v1.ResourceMemory, 500, 0, 100),
}
}
scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap)
scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap, nil)
queuedPodStore.Add(podWithTooBigResourceRequests)
scheduler.scheduleOne()
@ -519,7 +522,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
// queuedPodStore: pods queued before processing.
// scache: scheduler cache that might contain assumed pods.
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate) (*Scheduler, chan *v1.Binding, chan error) {
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) {
algo := core.NewGenericScheduler(
scache,
nil,
@ -553,6 +556,10 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
},
}
if recorder != nil {
configurator.Config.Recorder = recorder
}
sched, _ := NewFromConfigurator(configurator, nil...)
return sched, bindingChan, errChan
@ -600,3 +607,205 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
return sched, bindingChan
}
func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBinder, stop <-chan struct{}, broadcaster record.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) {
testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}
nodeLister := schedulertesting.FakeNodeLister([]*v1.Node{&testNode})
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
queuedPodStore.Add(podWithID("foo", ""))
scache := schedulercache.New(10*time.Minute, stop)
scache.AddNode(&testNode)
predicateMap := map[string]algorithm.FitPredicate{
"VolumeBindingChecker": predicates.NewVolumeBindingPredicate(fakeVolumeBinder),
}
recorder := broadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"})
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap, recorder)
s.config.VolumeBinder = fakeVolumeBinder
return s, bindingChan, errChan
}
// This is a workaround because golint complains that errors cannot
// end with punctuation. However, the real predicate error message does
// end with a period.
func makePredicateError(failReason string) error {
s := fmt.Sprintf("0/1 nodes are available: %v.", failReason)
return fmt.Errorf(s)
}
func TestSchedulerWithVolumeBinding(t *testing.T) {
findErr := fmt.Errorf("find err")
assumeErr := fmt.Errorf("assume err")
bindErr := fmt.Errorf("bind err")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(t.Logf).Stop()
// This can be small because we wait for pod to finish scheduling first
chanTimeout := 2 * time.Second
utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true")
defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false")
table := map[string]struct {
expectError error
expectPodBind *v1.Binding
expectAssumeCalled bool
expectBindCalled bool
eventReason string
volumeBinderConfig *persistentvolume.FakeVolumeBinderConfig
}{
"all-bound": {
volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
AllBound: true,
FindUnboundSatsified: true,
FindBoundSatsified: true,
},
expectAssumeCalled: true,
expectPodBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}},
eventReason: "Scheduled",
},
"bound,invalid-pv-affinity": {
volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
AllBound: true,
FindUnboundSatsified: true,
FindBoundSatsified: false,
},
eventReason: "FailedScheduling",
expectError: makePredicateError("1 VolumeNodeAffinityConflict"),
},
"unbound,no-matches": {
volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
FindUnboundSatsified: false,
FindBoundSatsified: true,
},
eventReason: "FailedScheduling",
expectError: makePredicateError("1 VolumeBindingNoMatch"),
},
"bound-and-unbound-unsatisfied": {
volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
FindUnboundSatsified: false,
FindBoundSatsified: false,
},
eventReason: "FailedScheduling",
expectError: makePredicateError("1 VolumeBindingNoMatch, 1 VolumeNodeAffinityConflict"),
},
"unbound,found-matches": {
volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
FindUnboundSatsified: true,
FindBoundSatsified: true,
AssumeBindingRequired: true,
},
expectAssumeCalled: true,
expectBindCalled: true,
eventReason: "FailedScheduling",
expectError: fmt.Errorf("Volume binding started, waiting for completion"),
},
"unbound,found-matches,already-bound": {
volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
FindUnboundSatsified: true,
FindBoundSatsified: true,
AssumeBindingRequired: false,
},
expectAssumeCalled: true,
expectBindCalled: false,
eventReason: "FailedScheduling",
expectError: fmt.Errorf("Volume binding started, waiting for completion"),
},
"predicate-error": {
volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
FindErr: findErr,
},
eventReason: "FailedScheduling",
expectError: findErr,
},
"assume-error": {
volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
FindUnboundSatsified: true,
FindBoundSatsified: true,
AssumeErr: assumeErr,
},
expectAssumeCalled: true,
eventReason: "FailedScheduling",
expectError: assumeErr,
},
"bind-error": {
volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
FindUnboundSatsified: true,
FindBoundSatsified: true,
AssumeBindingRequired: true,
BindErr: bindErr,
},
expectAssumeCalled: true,
expectBindCalled: true,
eventReason: "FailedScheduling",
expectError: bindErr,
},
}
for name, item := range table {
stop := make(chan struct{})
fakeVolumeBinder := volumebinder.NewFakeVolumeBinder(item.volumeBinderConfig)
internalBinder, ok := fakeVolumeBinder.Binder.(*persistentvolume.FakeVolumeBinder)
if !ok {
t.Fatalf("Failed to get fake volume binder")
}
s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(fakeVolumeBinder, stop, eventBroadcaster)
eventChan := make(chan struct{})
events := eventBroadcaster.StartEventWatcher(func(e *v1.Event) {
if e, a := item.eventReason, e.Reason; e != a {
t.Errorf("%v: expected %v, got %v", name, e, a)
}
close(eventChan)
})
go fakeVolumeBinder.Run(s.bindVolumesWorker, stop)
s.scheduleOne()
// Wait for pod to succeed or fail scheduling
select {
case <-eventChan:
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("%v: scheduling timeout after %v", name, wait.ForeverTestTimeout)
}
events.Stop()
// Wait for scheduling to return an error
select {
case err := <-errChan:
if item.expectError == nil || !reflect.DeepEqual(item.expectError.Error(), err.Error()) {
t.Errorf("%v: \n err \nWANT=%+v,\nGOT=%+v", name, item.expectError, err)
}
case <-time.After(chanTimeout):
if item.expectError != nil {
t.Errorf("%v: did not receive error after %v", name, chanTimeout)
}
}
// Wait for pod to succeed binding
select {
case b := <-bindingChan:
if !reflect.DeepEqual(item.expectPodBind, b) {
t.Errorf("%v: \n err \nWANT=%+v,\nGOT=%+v", name, item.expectPodBind, b)
}
case <-time.After(chanTimeout):
if item.expectPodBind != nil {
t.Errorf("%v: did not receive pod binding after %v", name, chanTimeout)
}
}
if item.expectAssumeCalled != internalBinder.AssumeCalled {
t.Errorf("%v: expectedAssumeCall %v", name, item.expectAssumeCalled)
}
if item.expectBindCalled != internalBinder.BindCalled {
t.Errorf("%v: expectedBindCall %v", name, item.expectBindCalled)
}
close(stop)
}
}