Merge pull request #12535 from mesosphere/persist-assigned-slave

Make slave assignment before binding persistent
CJ Cullen 2015-08-12 10:14:38 -07:00
commit 4163d1a5d2
4 changed files with 87 additions and 72 deletions


@@ -198,8 +198,12 @@ func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *pod
oemCt := pod.Spec.Containers
pod.Spec.Containers = append([]api.Container{}, oemCt...) // (shallow) clone before mod
annotateForExecutorOnSlave(&pod, machine)
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
task.SaveRecoveryInfo(pod.Annotations)
pod.Annotations[annotation.BindingHostKey] = task.Spec.AssignedSlave
for _, entry := range task.Spec.PortMap {
oemPorts := pod.Spec.Containers[entry.ContainerIdx].Ports
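Note (not part of the diff): the hunk above persists the task's recovery info and the assigned slave into the pod's annotations before launch, so the assignment survives a scheduler failover. Below is a minimal sketch of that annotation-write pattern, using a local Pod stand-in and an illustrative key string in place of the real api.Pod type and annotation.BindingHostKey constant.

package main

import "fmt"

// bindingHostKey is an illustrative stand-in for annotation.BindingHostKey.
const bindingHostKey = "binding-host"

// Pod is a stand-in for api.Pod; only the annotation map matters here.
type Pod struct {
	Annotations map[string]string
}

// persistAssignedSlave mirrors the pattern in prepareTaskForLaunch: make sure
// the annotation map exists, then record the slave the task was assigned to.
func persistAssignedSlave(pod *Pod, assignedSlave string) {
	if pod.Annotations == nil {
		pod.Annotations = make(map[string]string)
	}
	pod.Annotations[bindingHostKey] = assignedSlave
}

func main() {
	p := &Pod{}
	persistAssignedSlave(p, "slave-host-1")
	fmt.Println(p.Annotations[bindingHostKey]) // prints: slave-host-1
}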
@@ -233,29 +237,13 @@ type kubeScheduler struct {
defaultContainerMemLimit mresource.MegaBytes
}
// annotatedForExecutor checks whether a pod is assigned to a Mesos slave, and
// possibly already launched. It may or may not already be scheduled in the
// Kubernetes sense, i.e. the NodeName field might still be empty.
func annotatedForExecutor(pod *api.Pod) bool {
_, ok := pod.ObjectMeta.Annotations[annotation.BindingHostKey]
return ok
}
// annotateForExecutorOnSlave sets the BindingHostKey annotation which
// marks the pod to be processed by the scheduler and launched as a Mesos
// task. The executor on the slave will do the final binding to finish the
// scheduling in the Kubernetes sense.
func annotateForExecutorOnSlave(pod *api.Pod, slave string) {
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
} else {
oemAnn := pod.Annotations
pod.Annotations = make(map[string]string)
for k, v := range oemAnn {
pod.Annotations[k] = v
}
}
pod.Annotations[annotation.BindingHostKey] = slave
// recoverAssignedSlave recovers the assigned Mesos slave from a pod by reading
// the BindingHostKey annotation. For tasks in the scheduler's registry, the same
// value is stored in T.Spec.AssignedSlave. Before launching, the BindingHostKey
// annotation is added and the executor will eventually persist it to the
// apiserver on binding.
func recoverAssignedSlave(pod *api.Pod) string {
return pod.Annotations[annotation.BindingHostKey]
}
// Schedule implements the Scheduler interface of Kubernetes.
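Note (not part of the diff): recoverAssignedSlave replaces the boolean annotatedForExecutor check, so callers now compare the returned hostname against the empty string. A self-contained sketch with the same stand-in types as above; indexing a nil map yields the zero value, so an unannotated pod returns "".

package main

import "fmt"

const bindingHostKey = "binding-host" // illustrative stand-in for annotation.BindingHostKey

// Pod is a stand-in for api.Pod.
type Pod struct{ Annotations map[string]string }

// recoverAssignedSlaveSketch mirrors recoverAssignedSlave: a plain map lookup.
func recoverAssignedSlaveSketch(pod *Pod) string {
	return pod.Annotations[bindingHostKey]
}

func main() {
	unassigned := &Pod{}
	assigned := &Pod{Annotations: map[string]string{bindingHostKey: "slave-host-1"}}
	// Callers now compare against "" instead of using a boolean helper.
	fmt.Println(recoverAssignedSlaveSketch(unassigned) != "") // false
	fmt.Println(recoverAssignedSlaveSketch(assigned) != "")   // true
}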
@@ -462,7 +450,7 @@ func (q *queuer) Run(done <-chan struct{}) {
}
pod := p.(*Pod)
if annotatedForExecutor(pod.Pod) {
if recoverAssignedSlave(pod.Pod) != "" {
log.V(3).Infof("dequeuing pod for scheduling: %v", pod.Pod.Name)
q.dequeue(pod.GetUID())
} else {
@@ -511,7 +499,7 @@ func (q *queuer) yield() *api.Pod {
log.Warningf("yield unable to understand pod object %+v, will skip: %v", pod, err)
} else if !q.podUpdates.Poll(podName, queue.POP_EVENT) {
log.V(1).Infof("yield popped a transitioning pod, skipping: %+v", pod)
} else if annotatedForExecutor(pod) {
} else if recoverAssignedSlave(pod) != "" {
// should never happen if enqueuePods is filtering properly
log.Warningf("yield popped an already-scheduled pod, skipping: %+v", pod)
} else {
@@ -801,25 +789,27 @@ func (s *schedulingPlugin) scheduleOne() {
// host="..." | host="..." ; perhaps no updates to process?
//
// TODO(jdef) this needs an integration test
func (s *schedulingPlugin) reconcilePod(oldPod api.Pod) {
log.V(1).Infof("reconcile pod %v", oldPod.Name)
ctx := api.WithNamespace(api.NewDefaultContext(), oldPod.Namespace)
pod, err := s.client.Pods(api.NamespaceValue(ctx)).Get(oldPod.Name)
func (s *schedulingPlugin) reconcileTask(t *podtask.T) {
log.V(1).Infof("reconcile pod %v, assigned to slave %q", t.Pod.Name, t.Spec.AssignedSlave)
ctx := api.WithNamespace(api.NewDefaultContext(), t.Pod.Namespace)
pod, err := s.client.Pods(api.NamespaceValue(ctx)).Get(t.Pod.Name)
if err != nil {
if errors.IsNotFound(err) {
// attempt to delete
if err = s.deleter.deleteOne(&Pod{Pod: &oldPod}); err != nil && err != noSuchPodErr && err != noSuchTaskErr {
log.Errorf("failed to delete pod: %v: %v", oldPod.Name, err)
if err = s.deleter.deleteOne(&Pod{Pod: &t.Pod}); err != nil && err != noSuchPodErr && err != noSuchTaskErr {
log.Errorf("failed to delete pod: %v: %v", t.Pod.Name, err)
}
} else {
//TODO(jdef) other errors should probably trigger a retry (w/ backoff).
//For now, drop the pod on the floor
log.Warning("aborting reconciliation for pod %v: %v", oldPod.Name, err)
log.Warning("aborting reconciliation for pod %v: %v", t.Pod.Name, err)
}
return
}
if oldPod.Spec.NodeName != pod.Spec.NodeName {
if annotatedForExecutor(pod) {
log.Infof("pod %v scheduled on %q according to apiserver", pod.Name, pod.Spec.NodeName)
if t.Spec.AssignedSlave != pod.Spec.NodeName {
if pod.Spec.NodeName == "" {
// pod is unscheduled.
// it's possible that we dropped the pod in the scheduler error handler
// because of task misalignment with the pod (task.Has(podtask.Launched) == true)
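Note (not part of the diff): reconcileTask now compares the task's recorded AssignedSlave with the apiserver's current view of the pod instead of comparing two pod snapshots. The sketch below shows only the branch structure with simplified stand-in types; the apiserver lookup and the not-found check are passed in as hypothetical functions, and the handling of the unscheduled case is left to the (truncated) remainder of the hunk.

package main

import "fmt"

// Simplified stand-ins for api.Pod and podtask.T.
type APIPod struct{ NodeName string }
type Task struct {
	PodName       string
	AssignedSlave string
}

// reconcileSketch walks the same branches as reconcileTask: pod gone, lookup
// error, assignment mismatch (bound elsewhere or unscheduled), or in sync.
func reconcileSketch(t *Task, getPod func(name string) (*APIPod, error), notFound func(error) bool) string {
	pod, err := getPod(t.PodName)
	switch {
	case err != nil && notFound(err):
		return "pod gone from apiserver: delete the task record"
	case err != nil:
		return "lookup error: abort this reconciliation attempt"
	case t.AssignedSlave != pod.NodeName && pod.NodeName == "":
		return "pod is unscheduled on the apiserver (handled in the rest of the hunk)"
	case t.AssignedSlave != pod.NodeName:
		return "apiserver reports the pod bound to a different host: reconcile records"
	default:
		return "assignment matches: nothing to do"
	}
}

func main() {
	t := &Task{PodName: "nginx", AssignedSlave: "slave-host-1"}
	getPod := func(string) (*APIPod, error) { return &APIPod{NodeName: ""}, nil }
	fmt.Println(reconcileSketch(t, getPod, func(error) bool { return false }))
}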


@@ -199,7 +199,7 @@ func NewTestPod() (*api.Pod, int) {
TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
ObjectMeta: api.ObjectMeta{
Name: name,
Namespace: "default",
Namespace: api.NamespaceDefault,
SelfLink: fmt.Sprintf("http://1.2.3.4/api/v1beta1/pods/%s", name),
},
Spec: api.PodSpec{
@@ -418,7 +418,7 @@ func TestPlugin_LifeCycle(t *testing.T) {
c.Recorder = eventObserver
// create plugin
p := NewPlugin(c)
p := NewPlugin(c).(*schedulingPlugin)
assert.NotNil(p)
// run plugin
@@ -514,11 +514,8 @@ func TestPlugin_LifeCycle(t *testing.T) {
t.Fatalf("timed out waiting for launchTasks call")
}
// define generic pod startup
startPodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
// notify watchers of new pod
podListWatch.Add(pod, true)
// Launch a pod and wait until the scheduler driver is called
schedulePodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
// wait for failedScheduling event because there is no offer
assert.EventWithReason(eventObserver, "failedScheduling", "failedScheduling event not received")
@@ -531,8 +528,6 @@ func TestPlugin_LifeCycle(t *testing.T) {
// wait for driver.launchTasks call
select {
case launchedTask := <-launchedTasks:
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING))
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING))
for _, offer := range offers {
if offer.Id.GetValue() == launchedTask.offerId.GetValue() {
return pod, &launchedTask, offer
@@ -540,12 +535,30 @@ func TestPlugin_LifeCycle(t *testing.T) {
}
t.Fatalf("unknown offer used to start a pod")
return nil, nil, nil
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for launchTasks")
return nil, nil, nil
}
}
// Launch a pod and wait until the scheduler driver is called
launchPodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
podListWatch.Add(pod, true)
return schedulePodWithOffers(pod, offers)
}
// Launch a pod, wait until the scheduler driver is called and report back that it is running
startPodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
// notify about pod, offer resources and wait for scheduling
pod, launchedTask, offer := launchPodWithOffers(pod, offers)
if pod != nil {
// report back status
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING))
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING))
return pod, launchedTask, offer
}
return nil, nil, nil
}
startTestPod := func() (*api.Pod, *LaunchedTask, *mesos.Offer) {
pod, i := NewTestPod()
@@ -610,31 +623,42 @@ func TestPlugin_LifeCycle(t *testing.T) {
// wait until pod is looked up at the apiserver
assertext.EventuallyTrue(t, time.Second, func() bool {
return testApiServer.Stats(pod.Name) == beforePodLookups+1
}, "expect that reconcilePod will access apiserver for pod %v", pod.Name)
}, "expect that reconcileTask will access apiserver for pod %v", pod.Name)
}
launchTestPod := func() (*api.Pod, *LaunchedTask, *mesos.Offer) {
pod, i := NewTestPod()
offers := []*mesos.Offer{NewTestOffer(fmt.Sprintf("offer%d", i))}
return launchPodWithOffers(pod, offers)
}
// 1. with pod deleted from the apiserver
pod, launchedTask, _ = startTestPod()
// expected: pod is removed from internal task registry
pod, launchedTask, _ = launchTestPod()
podListWatch.Delete(pod, false) // not notifying the watchers
failPodFromExecutor(launchedTask.taskInfo)
podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name)
assertext.EventuallyTrue(t, time.Second, func() bool {
t, _ := p.api.tasks().ForPod(podKey)
return t == nil
})
// 2. with pod still on the apiserver, not bound
pod, launchedTask, _ = startTestPod()
// expected: pod is rescheduled
pod, launchedTask, _ = launchTestPod()
failPodFromExecutor(launchedTask.taskInfo)
// 3. with pod still on the apiserver, bound i.e. host!=""
pod, launchedTask, usedOffer = startTestPod()
pod.Annotations = map[string]string{
meta.BindingHostKey: *usedOffer.Hostname,
}
podListWatch.Modify(pod, false) // not notifying the watchers
failPodFromExecutor(launchedTask.taskInfo)
// 4. with pod still on the apiserver, bound i.e. host!="", notified via ListWatch
retryOffers := []*mesos.Offer{NewTestOffer("retry-offer")}
schedulePodWithOffers(pod, retryOffers)
// 3. with pod still on the apiserver, bound, notified via ListWatch
// expected: nothing happens; pod updates are not supported, cf. the reconcileTask function
pod, launchedTask, usedOffer = startTestPod()
pod.Annotations = map[string]string{
meta.BindingHostKey: *usedOffer.Hostname,
}
pod.Spec.NodeName = *usedOffer.Hostname
podListWatch.Modify(pod, true) // notifying the watchers
time.Sleep(time.Second / 2)
failPodFromExecutor(launchedTask.taskInfo)


@@ -71,12 +71,13 @@ type T struct {
}
type Spec struct {
SlaveID string
CPU mresource.CPUShares
Memory mresource.MegaBytes
PortMap []HostPortMapping
Ports []uint64
Data []byte
SlaveID string
AssignedSlave string
CPU mresource.CPUShares
Memory mresource.MegaBytes
PortMap []HostPortMapping
Ports []uint64
Data []byte
}
// mostly-clone this pod task. the clone will actually share some fields:
@@ -161,9 +162,10 @@ func (t *T) FillFromDetails(details *mesos.Offer) error {
log.V(3).Infof("Recording offer(s) %s/%s against pod %v: cpu: %.2f, mem: %.2f MB", details.Id, t.Pod.Namespace, t.Pod.Name, cpu, mem)
t.Spec = Spec{
SlaveID: details.GetSlaveId().GetValue(),
CPU: cpu,
Memory: mem,
SlaveID: details.GetSlaveId().GetValue(),
AssignedSlave: details.GetHostname(),
CPU: cpu,
Memory: mem,
}
// fill in port mapping
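Note (not part of the diff): FillFromDetails now records the offer's hostname as Spec.AssignedSlave next to the SlaveID, which is what later gets persisted as the BindingHostKey annotation. A sketch of that mapping with local stand-in types in place of the mesos-go offer protobuf.

package main

import "fmt"

// Offer is a stand-in for *mesos.Offer; only the two identifying fields matter.
type Offer struct {
	SlaveID  string
	Hostname string
}

// Spec mirrors the shape of podtask.Spec after this change.
type Spec struct {
	SlaveID       string
	AssignedSlave string
	CPU           float64
	Memory        float64
}

// specFromOffer shows the mapping done in FillFromDetails: the slave ID comes
// from details.GetSlaveId().GetValue(), the new AssignedSlave field from
// details.GetHostname().
func specFromOffer(o Offer, cpu, mem float64) Spec {
	return Spec{
		SlaveID:       o.SlaveID,
		AssignedSlave: o.Hostname,
		CPU:           cpu,
		Memory:        mem,
	}
}

func main() {
	s := specFromOffer(Offer{SlaveID: "20150812-S1", Hostname: "slave-host-1"}, 0.25, 64)
	fmt.Printf("%+v\n", s)
}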
@@ -346,8 +348,7 @@ func RecoverFrom(pod api.Pod) (*T, bool, error) {
bindTime: now,
}
var (
offerId string
hostname string
offerId string
)
for _, k := range []string{
annotation.BindingHostKey,
@@ -362,7 +363,7 @@ func RecoverFrom(pod api.Pod) (*T, bool, error) {
}
switch k {
case annotation.BindingHostKey:
hostname = v
t.Spec.AssignedSlave = v
case annotation.SlaveIdKey:
t.Spec.SlaveID = v
case annotation.OfferIdKey:
@@ -375,7 +376,7 @@ func RecoverFrom(pod api.Pod) (*T, bool, error) {
t.executor = &mesos.ExecutorInfo{ExecutorId: mutil.NewExecutorID(v)}
}
}
t.Offer = offers.Expired(offerId, hostname, 0)
t.Offer = offers.Expired(offerId, t.Spec.AssignedSlave, 0)
t.Flags[Launched] = struct{}{}
t.Flags[Bound] = struct{}{}
return t, true, nil
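Note (not part of the diff): with the local hostname variable gone, RecoverFrom writes the recovered value straight into t.Spec.AssignedSlave and reuses it to rebuild the expired offer. A sketch of the annotation-driven recovery loop, with illustrative key strings in place of the annotation package constants and with the missing-annotation handling assumed to abort recovery.

package main

import "fmt"

// Illustrative stand-ins for annotation.BindingHostKey, SlaveIdKey, OfferIdKey.
const (
	bindingHostKey = "binding-host"
	slaveIdKey     = "slave-id"
	offerIdKey     = "offer-id"
)

type Spec struct {
	SlaveID       string
	AssignedSlave string
}

// recoverSpec reads back the annotations written before launch and rebuilds
// the parts of the task spec that this commit makes persistent.
func recoverSpec(annotations map[string]string) (spec Spec, offerId string, ok bool) {
	for _, k := range []string{bindingHostKey, slaveIdKey, offerIdKey} {
		v, found := annotations[k]
		if !found {
			return Spec{}, "", false // assumed: a missing annotation aborts recovery
		}
		switch k {
		case bindingHostKey:
			spec.AssignedSlave = v // previously kept only in a local hostname variable
		case slaveIdKey:
			spec.SlaveID = v
		case offerIdKey:
			offerId = v
		}
	}
	return spec, offerId, true
}

func main() {
	ann := map[string]string{
		bindingHostKey: "slave-host-1",
		slaveIdKey:     "20150812-S1",
		offerIdKey:     "offer-42",
	}
	fmt.Println(recoverSpec(ann))
}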


@@ -102,7 +102,7 @@ func (self *slaveStorage) getSlave(slaveId string) (*Slave, bool) {
type PluginInterface interface {
// the apiserver may have a different state for the pod than we do
// so reconcile our records, but only for this one pod
reconcilePod(api.Pod)
reconcileTask(*podtask.T)
// execute the Scheduling plugin, should start a go routine and return immediately
Run(<-chan struct{})
@@ -432,7 +432,7 @@ func (k *KubernetesScheduler) StatusUpdate(driver bindings.SchedulerDriver, task
case mesos.TaskState_TASK_FAILED:
if task, _ := k.taskRegistry.UpdateStatus(taskStatus); task != nil {
if task.Has(podtask.Launched) && !task.Has(podtask.Bound) {
go k.plugin.reconcilePod(task.Pod)
go k.plugin.reconcileTask(task)
return
}
} else {
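Note (not part of the diff): the failover hook now receives the task record instead of a pod snapshot, so reconciliation still knows Spec.AssignedSlave even when the pod was never bound. A sketch of the guard around that call, with stand-in flags and a placeholder reconcile function; the WaitGroup exists only so the example terminates deterministically.

package main

import (
	"fmt"
	"sync"
)

type flag int

const (
	Launched flag = iota
	Bound
)

// Task is a stand-in for podtask.T with just the launch/bind flags.
type Task struct {
	Name  string
	Flags map[flag]struct{}
}

func (t *Task) Has(f flag) bool { _, ok := t.Flags[f]; return ok }

// handleTerminalStatus mirrors the TASK_FAILED branch: a task that was
// launched but never bound is handed to reconcileTask asynchronously.
func handleTerminalStatus(t *Task, reconcileTask func(*Task), wg *sync.WaitGroup) {
	if t.Has(Launched) && !t.Has(Bound) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			reconcileTask(t)
		}()
	}
}

func main() {
	var wg sync.WaitGroup
	t := &Task{Name: "nginx", Flags: map[flag]struct{}{Launched: {}}}
	handleTerminalStatus(t, func(t *Task) { fmt.Println("reconciling", t.Name) }, &wg)
	wg.Wait()
}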