Merge pull request #14271 from mesosphere/sur-k8sm-441-flaky

MESOS: fix flaky TestPlugin_LifeCycle
Brendan Burns 2015-09-30 11:32:00 -07:00
commit 3474324d6a
2 changed files with 327 additions and 164 deletions


@ -52,6 +52,11 @@ const (
pluginRecoveryDelay = 100 * time.Millisecond // delay after scheduler plugin crashes, before we resume scheduling
)
const (
FailedScheduling = "FailedScheduling"
Scheduled = "Scheduled"
)
// scheduler abstraction to allow for easier unit testing
type schedulerInterface interface {
sync.Locker // synchronize scheduler plugin operations
@ -757,7 +762,7 @@ func (s *schedulingPlugin) scheduleOne() {
dest, err := s.config.Algorithm.Schedule(pod, s.config.NodeLister) // call kubeScheduler.Schedule
if err != nil {
log.V(1).Infof("Failed to schedule: %+v", pod)
s.config.Recorder.Eventf(pod, "FailedScheduling", "Error scheduling: %v", err)
s.config.Recorder.Eventf(pod, FailedScheduling, "Error scheduling: %v", err)
s.config.Error(pod, err)
return
}
@ -770,11 +775,11 @@ func (s *schedulingPlugin) scheduleOne() {
}
if err := s.config.Binder.Bind(b); err != nil {
log.V(1).Infof("Failed to bind pod: %+v", err)
s.config.Recorder.Eventf(pod, "FailedScheduling", "Binding rejected: %v", err)
s.config.Recorder.Eventf(pod, FailedScheduling, "Binding rejected: %v", err)
s.config.Error(pod, err)
return
}
s.config.Recorder.Eventf(pod, "Scheduled", "Successfully assigned %v to %v", pod.Name, dest)
s.config.Recorder.Eventf(pod, Scheduled, "Successfully assigned %v to %v", pod.Name, dest)
}
// this pod may be out of sync with respect to the API server registry:
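Downstream, the tests assert on these same identifiers rather than on raw strings; a minimal sketch (eventObs being the EventObserver defined in the test file below):

assert.EventWithReason(eventObs, FailedScheduling, "failedScheduling event not received")
assert.EventWithReason(eventObs, Scheduled)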


@ -17,6 +17,7 @@ limitations under the License.
package scheduler
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
@ -34,7 +35,7 @@ import (
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
util "github.com/mesos/mesos-go/mesosutil"
"github.com/mesos/mesos-go/mesosutil"
bindings "github.com/mesos/mesos-go/scheduler"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@ -46,26 +47,48 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/pkg/util"
)
// An apiserver mock which partially mocks the pods API
type TestServer struct {
stats map[string]uint
nodes map[string]*api.Node
lock sync.Mutex // guards above fields
server *httptest.Server
stats map[string]uint
lock sync.Mutex
t *testing.T
}
func (srv *TestServer) LookupNode(name string) *api.Node {
srv.lock.Lock()
defer srv.lock.Unlock()
node, _ := api.Scheme.DeepCopy(srv.nodes[name])
return node.(*api.Node)
}
func (srv *TestServer) WaitForNode(name string) {
assertext.EventuallyTrue(srv.t, util.ForeverTestTimeout, func() bool {
return srv.LookupNode(name) != nil
})
}
func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsListWatch) *TestServer {
ts := TestServer{
stats: map[string]uint{},
nodes: map[string]*api.Node{},
t: t,
}
mux := http.NewServeMux()
mux.HandleFunc(testapi.Default.ResourcePath("pods", namespace, ""), func(w http.ResponseWriter, r *http.Request) {
podListHandler := func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
pods := mockPodListWatch.Pods()
w.Write([]byte(runtime.EncodeOrDie(testapi.Default.Codec(), &pods)))
})
}
mux.HandleFunc(testapi.Default.ResourcePath("pods", namespace, ""), podListHandler)
mux.HandleFunc(testapi.Default.ResourcePath("pods", "", ""), podListHandler)
podsPrefix := testapi.Default.ResourcePath("pods", namespace, "") + "/"
mux.HandleFunc(podsPrefix, func(w http.ResponseWriter, r *http.Request) {
@ -76,7 +99,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
defer ts.lock.Unlock()
ts.stats[name] = ts.stats[name] + 1
p := mockPodListWatch.GetPod(name)
p := mockPodListWatch.Pod(name)
if p != nil {
w.WriteHeader(http.StatusOK)
w.Write([]byte(runtime.EncodeOrDie(testapi.Default.Codec(), p)))
@ -85,9 +108,33 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
w.WriteHeader(http.StatusNotFound)
})
mux.HandleFunc(testapi.Default.ResourcePath("events", namespace, ""), func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
mux.HandleFunc(
testapi.Default.ResourcePath("events", namespace, ""),
func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
},
)
mux.HandleFunc(
testapi.Default.ResourcePath("nodes", "", ""),
func(w http.ResponseWriter, r *http.Request) {
var node api.Node
if err := json.NewDecoder(r.Body).Decode(&node); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
ts.lock.Lock()
defer ts.lock.Unlock()
ts.nodes[node.Name] = &node
if err := json.NewEncoder(w).Encode(node); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
},
)
mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
t.Errorf("unexpected request: %v", req.RequestURI)
@ -97,6 +144,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
ts.server = httptest.NewServer(mux)
return &ts
}
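For orientation, a sketch of how this mock is typically wired up by the tests below, using only names defined in this file:

lw := NewMockPodsListWatch(api.PodList{})
ts := NewTestServer(t, api.NamespaceDefault, lw)
defer ts.server.Close()
// nodes become visible only after they have been stored via the nodes handler above;
// WaitForNode blocks until LookupNode stops returning nil
ts.WaitForNode("some_hostname")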
func (ts *TestServer) Stats(name string) uint {
ts.lock.Lock()
defer ts.lock.Unlock()
@ -131,13 +179,15 @@ func NewMockPodsListWatch(initialPodList api.PodList) *MockPodsListWatch {
}
return &lw
}
func (lw *MockPodsListWatch) Pods() api.PodList {
lw.lock.Lock()
defer lw.lock.Unlock()
return lw.list
}
func (lw *MockPodsListWatch) GetPod(name string) *api.Pod {
func (lw *MockPodsListWatch) Pod(name string) *api.Pod {
lw.lock.Lock()
defer lw.lock.Unlock()
@ -173,6 +223,7 @@ func (lw *MockPodsListWatch) Modify(pod *api.Pod, notify bool) {
}
log.Fatalf("Cannot find pod %v to modify in MockPodsListWatch", pod.Name)
}
func (lw *MockPodsListWatch) Delete(pod *api.Pod, notify bool) {
lw.lock.Lock()
defer lw.lock.Unlock()
@ -229,16 +280,16 @@ func NewTestPod() (*api.Pod, int) {
// Offering some cpus and memory and the 8000-9000 port range
func NewTestOffer(id string) *mesos.Offer {
hostname := "some_hostname"
cpus := util.NewScalarResource("cpus", 3.75)
mem := util.NewScalarResource("mem", 940)
cpus := mesosutil.NewScalarResource("cpus", 3.75)
mem := mesosutil.NewScalarResource("mem", 940)
var port8000 uint64 = 8000
var port9000 uint64 = 9000
ports8000to9000 := mesos.Value_Range{Begin: &port8000, End: &port9000}
ports := util.NewRangesResource("ports", []*mesos.Value_Range{&ports8000to9000})
ports := mesosutil.NewRangesResource("ports", []*mesos.Value_Range{&ports8000to9000})
return &mesos.Offer{
Id: util.NewOfferID(id),
Id: mesosutil.NewOfferID(id),
Hostname: &hostname,
SlaveId: util.NewSlaveID(hostname),
SlaveId: mesosutil.NewSlaveID(hostname),
Resources: []*mesos.Resource{cpus, mem, ports},
}
}
@ -266,9 +317,11 @@ func NewEventObserver() *EventObserver {
fifo: make(chan Event, 1000),
}
}
func (o *EventObserver) Event(object runtime.Object, reason, message string) {
o.fifo <- Event{Object: object, Reason: reason, Message: message}
}
func (o *EventObserver) Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) {
o.fifo <- Event{Object: object, Reason: reason, Message: fmt.Sprintf(messageFmt, args...)}
}
@ -278,7 +331,7 @@ func (o *EventObserver) PastEventf(object runtime.Object, timestamp unversioned.
func (a *EventAssertions) Event(observer *EventObserver, pred EventPredicate, msgAndArgs ...interface{}) bool {
// parse msgAndArgs: first possibly a duration, otherwise a format string with further args
timeout := time.Second * 2
timeout := util.ForeverTestTimeout
msg := "event not received"
msgArgStart := 0
if len(msgAndArgs) > 0 {
@ -326,6 +379,7 @@ func (a *EventAssertions) Event(observer *EventObserver, pred EventPredicate, ms
return a.Fail(msg)
}
}
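As the comment above notes, msgAndArgs may begin with an explicit timeout before the format string; both call forms are accepted (a sketch; obs, pred and name are placeholders):

assert.Event(obs, pred, "event not received")                        // default timeout (util.ForeverTestTimeout)
assert.Event(obs, pred, 5*time.Second, "pod %s not scheduled", name) // explicit timeout, then format string and args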
func (a *EventAssertions) EventWithReason(observer *EventObserver, reason string, msgAndArgs ...interface{}) bool {
return a.Event(observer, func(e Event) bool {
return e.Reason == reason
@ -362,6 +416,175 @@ func newTaskStatusForTask(task *mesos.TaskInfo, state mesos.TaskState) *mesos.Ta
}
}
type LaunchedTask struct {
offerId mesos.OfferID
taskInfo *mesos.TaskInfo
}
type lifecycleTest struct {
apiServer *TestServer
driver *joinableDriver
eventObs *EventObserver
plugin *schedulingPlugin
podsListWatch *MockPodsListWatch
scheduler *KubernetesScheduler
schedulerProc *ha.SchedulerProcess
t *testing.T
}
func newLifecycleTest(t *testing.T) lifecycleTest {
assert := &EventAssertions{*assert.New(t)}
// create a fake pod watch. We use that below to submit new pods to the scheduler
podsListWatch := NewMockPodsListWatch(api.PodList{})
// create fake apiserver
apiServer := NewTestServer(t, api.NamespaceDefault, podsListWatch)
// create executor with some data for static pods if set
executor := mesosutil.NewExecutorInfo(
mesosutil.NewExecutorID("executor-id"),
mesosutil.NewCommandInfo("executor-cmd"),
)
executor.Data = []byte{0, 1, 2}
// create scheduler
strategy := NewAllocationStrategy(
podtask.DefaultPredicate,
podtask.NewDefaultProcurement(
mresource.DefaultDefaultContainerCPULimit,
mresource.DefaultDefaultContainerMemLimit,
),
)
scheduler := New(Config{
Executor: executor,
Client: client.NewOrDie(&client.Config{
Host: apiServer.server.URL,
Version: testapi.Default.Version(),
}),
Scheduler: NewFCFSPodScheduler(strategy, apiServer.LookupNode),
Schedcfg: *schedcfg.CreateDefaultConfig(),
LookupNode: apiServer.LookupNode,
})
assert.NotNil(scheduler.client, "client is nil")
assert.NotNil(scheduler.executor, "executor is nil")
assert.NotNil(scheduler.offers, "offer registry is nil")
// create scheduler process
schedulerProc := ha.New(scheduler)
// get plugin config from it
config := scheduler.NewPluginConfig(
schedulerProc.Terminal(),
http.DefaultServeMux,
&podsListWatch.ListWatch,
)
assert.NotNil(config)
// make events observable
eventObs := NewEventObserver()
config.Recorder = eventObs
// create plugin
plugin := NewPlugin(config).(*schedulingPlugin)
assert.NotNil(plugin)
// create mock mesos scheduler driver
driver := &joinableDriver{}
return lifecycleTest{
apiServer: apiServer,
driver: driver,
eventObs: eventObs,
plugin: plugin,
podsListWatch: podsListWatch,
scheduler: scheduler,
schedulerProc: schedulerProc,
t: t,
}
}
func (lt lifecycleTest) Start() <-chan LaunchedTask {
assert := &EventAssertions{*assert.New(lt.t)}
lt.plugin.Run(lt.schedulerProc.Terminal())
// init scheduler
err := lt.scheduler.Init(
lt.schedulerProc.Master(),
lt.plugin,
http.DefaultServeMux,
)
assert.NoError(err)
lt.driver.On("Start").Return(mesos.Status_DRIVER_RUNNING, nil).Once()
started := lt.driver.Upon()
lt.driver.On("ReconcileTasks",
mock.AnythingOfType("[]*mesosproto.TaskStatus"),
).Return(mesos.Status_DRIVER_RUNNING, nil)
lt.driver.On("SendFrameworkMessage",
mock.AnythingOfType("*mesosproto.ExecutorID"),
mock.AnythingOfType("*mesosproto.SlaveID"),
mock.AnythingOfType("string"),
).Return(mesos.Status_DRIVER_RUNNING, nil)
launchedTasks := make(chan LaunchedTask, 1)
launchTasksFunc := func(args mock.Arguments) {
offerIDs := args.Get(0).([]*mesos.OfferID)
taskInfos := args.Get(1).([]*mesos.TaskInfo)
assert.Equal(1, len(offerIDs))
assert.Equal(1, len(taskInfos))
launchedTasks <- LaunchedTask{
offerId: *offerIDs[0],
taskInfo: taskInfos[0],
}
}
lt.driver.On("LaunchTasks",
mock.AnythingOfType("[]*mesosproto.OfferID"),
mock.AnythingOfType("[]*mesosproto.TaskInfo"),
mock.AnythingOfType("*mesosproto.Filters"),
).Return(mesos.Status_DRIVER_RUNNING, nil).Run(launchTasksFunc)
lt.driver.On("DeclineOffer",
mock.AnythingOfType("*mesosproto.OfferID"),
mock.AnythingOfType("*mesosproto.Filters"),
).Return(mesos.Status_DRIVER_RUNNING, nil)
// elect master with mock driver
driverFactory := ha.DriverFactory(func() (bindings.SchedulerDriver, error) {
return lt.driver, nil
})
lt.schedulerProc.Elect(driverFactory)
elected := lt.schedulerProc.Elected()
// driver will be started
<-started
// tell scheduler to be registered
lt.scheduler.Registered(
lt.driver,
mesosutil.NewFrameworkID("kubernetes-id"),
mesosutil.NewMasterInfo("master-id", (192<<24)+(168<<16)+(0<<8)+1, 5050),
)
// wait for being elected
<-elected
return launchedTasks
}
func (lt lifecycleTest) Close() {
lt.apiServer.server.Close()
}
func (lt lifecycleTest) End() <-chan struct{} {
return lt.schedulerProc.End()
}
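Taken together, the fixture is meant to be used roughly as follows (a condensed sketch of what TestPlugin_LifeCycle does below):

lt := newLifecycleTest(t)
defer lt.Close()            // shut down the mock apiserver
launchedTasks := lt.Start() // wire up the mock driver, run the plugin, elect the scheduler
defer lt.End()              // terminate the scheduler process
// ... add pods via lt.podsListWatch, feed offers to lt.scheduler, assert on lt.eventObs ...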
// Test to create the scheduler plugin with an empty plugin config
func TestPlugin_New(t *testing.T) {
assert := assert.New(t)
@ -371,167 +594,89 @@ func TestPlugin_New(t *testing.T) {
assert.NotNil(p)
}
// Test to create the scheduler plugin with the config returned by the scheduler,
// and play through the whole life cycle of the plugin while creating pods, deleting
// TestPlugin_LifeCycle creates a scheduler plugin with the config returned by the scheduler,
// and plays through the whole life cycle of the plugin while creating pods, deleting
// and failing them.
func TestPlugin_LifeCycle(t *testing.T) {
t.Skip("This test is flaky, see #11901")
assert := &EventAssertions{*assert.New(t)}
// create a fake pod watch. We use that below to submit new pods to the scheduler
podListWatch := NewMockPodsListWatch(api.PodList{})
// create fake apiserver
testApiServer := NewTestServer(t, api.NamespaceDefault, podListWatch)
defer testApiServer.server.Close()
// create executor with some data for static pods if set
executor := util.NewExecutorInfo(
util.NewExecutorID("executor-id"),
util.NewCommandInfo("executor-cmd"),
)
executor.Data = []byte{0, 1, 2}
// create scheduler
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
as := NewAllocationStrategy(
podtask.DefaultPredicate,
podtask.NewDefaultProcurement(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit))
testScheduler := New(Config{
Executor: executor,
Client: client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Default.Version()}),
Scheduler: NewFCFSPodScheduler(as, func(node string) *api.Node {
obj, _, _ := nodeStore.GetByKey(node)
if obj == nil {
return nil
}
return obj.(*api.Node)
}),
Schedcfg: *schedcfg.CreateDefaultConfig(),
})
assert.NotNil(testScheduler.client, "client is nil")
assert.NotNil(testScheduler.executor, "executor is nil")
assert.NotNil(testScheduler.offers, "offer registry is nil")
// create scheduler process
schedulerProcess := ha.New(testScheduler)
// get plugin config from it
c := testScheduler.NewPluginConfig(schedulerProcess.Terminal(), http.DefaultServeMux, &podListWatch.ListWatch)
assert.NotNil(c)
// make events observable
eventObserver := NewEventObserver()
c.Recorder = eventObserver
// create plugin
p := NewPlugin(c).(*schedulingPlugin)
assert.NotNil(p)
lt := newLifecycleTest(t)
defer lt.Close()
// run plugin
p.Run(schedulerProcess.Terminal())
defer schedulerProcess.End()
// init scheduler
err := testScheduler.Init(schedulerProcess.Master(), p, http.DefaultServeMux)
assert.NoError(err)
// create mock mesos scheduler driver
mockDriver := &joinableDriver{}
mockDriver.On("Start").Return(mesos.Status_DRIVER_RUNNING, nil).Once()
started := mockDriver.Upon()
mAny := mock.AnythingOfType
mockDriver.On("ReconcileTasks", mAny("[]*mesosproto.TaskStatus")).Return(mesos.Status_DRIVER_RUNNING, nil)
mockDriver.On("SendFrameworkMessage", mAny("*mesosproto.ExecutorID"), mAny("*mesosproto.SlaveID"), mAny("string")).
Return(mesos.Status_DRIVER_RUNNING, nil)
type LaunchedTask struct {
offerId mesos.OfferID
taskInfo *mesos.TaskInfo
}
launchedTasks := make(chan LaunchedTask, 1)
launchTasksCalledFunc := func(args mock.Arguments) {
offerIDs := args.Get(0).([]*mesos.OfferID)
taskInfos := args.Get(1).([]*mesos.TaskInfo)
assert.Equal(1, len(offerIDs))
assert.Equal(1, len(taskInfos))
launchedTasks <- LaunchedTask{
offerId: *offerIDs[0],
taskInfo: taskInfos[0],
}
}
mockDriver.On("LaunchTasks", mAny("[]*mesosproto.OfferID"), mAny("[]*mesosproto.TaskInfo"), mAny("*mesosproto.Filters")).
Return(mesos.Status_DRIVER_RUNNING, nil).Run(launchTasksCalledFunc)
mockDriver.On("DeclineOffer", mAny("*mesosproto.OfferID"), mAny("*mesosproto.Filters")).
Return(mesos.Status_DRIVER_RUNNING, nil)
// elect master with mock driver
driverFactory := ha.DriverFactory(func() (bindings.SchedulerDriver, error) {
return mockDriver, nil
})
schedulerProcess.Elect(driverFactory)
elected := schedulerProcess.Elected()
// driver will be started
<-started
// tell scheduler to be registered
testScheduler.Registered(
mockDriver,
util.NewFrameworkID("kubernetes-id"),
util.NewMasterInfo("master-id", (192<<24)+(168<<16)+(0<<8)+1, 5050),
)
// wait for being elected
<-elected
//TODO(jdef) refactor things above here into a test suite setup of some sort
launchedTasks := lt.Start()
defer lt.End()
// fake new, unscheduled pod
pod, i := NewTestPod()
podListWatch.Add(pod, true) // notify watchers
lt.podsListWatch.Add(pod, true) // notify watchers
// wait for failedScheduling event because there is no offer
assert.EventWithReason(eventObserver, "failedScheduling", "failedScheduling event not received")
assert.EventWithReason(lt.eventObs, FailedScheduling, "failedScheduling event not received")
// add some matching offer
offers := []*mesos.Offer{NewTestOffer(fmt.Sprintf("offer%d", i))}
testScheduler.ResourceOffers(nil, offers)
lt.scheduler.ResourceOffers(nil, offers)
// first offer is declined because node is not available yet
lt.apiServer.WaitForNode("some_hostname")
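// (the node object only becomes visible after it has been stored by the mock apiserver's
// nodes handler; waiting for it before re-offering removes the race that previously made
// this test flaky)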
// add one more offer
lt.scheduler.ResourceOffers(nil, offers)
// and wait for scheduled pod
assert.EventWithReason(eventObserver, "scheduled")
assert.EventWithReason(lt.eventObs, Scheduled)
select {
case launchedTask := <-launchedTasks:
// report back that the task has been staged, and then started by mesos
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING))
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING))
lt.scheduler.StatusUpdate(
lt.driver,
newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING),
)
lt.scheduler.StatusUpdate(
lt.driver,
newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING),
)
// check that ExecutorInfo.data has the static pod data
assert.Len(launchedTask.taskInfo.Executor.Data, 3)
// report back that the task has been lost
mockDriver.AssertNumberOfCalls(t, "SendFrameworkMessage", 0)
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_LOST))
lt.driver.AssertNumberOfCalls(t, "SendFrameworkMessage", 0)
lt.scheduler.StatusUpdate(
lt.driver,
newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_LOST),
)
// and wait that framework message is sent to executor
mockDriver.AssertNumberOfCalls(t, "SendFrameworkMessage", 1)
lt.driver.AssertNumberOfCalls(t, "SendFrameworkMessage", 1)
case <-time.After(5 * time.Second):
case <-time.After(util.ForeverTestTimeout):
t.Fatalf("timed out waiting for launchTasks call")
}
offeredNodes := make(map[string]struct{})
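// offeredNodes remembers hostnames that have already been awaited at the apiserver, so each
// node is waited for (and its initially declined offer replayed) only once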
// Launch a pod and wait until the scheduler driver is called
schedulePodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
// wait for failedScheduling event because there is no offer
assert.EventWithReason(eventObserver, "failedScheduling", "failedScheduling event not received")
assert.EventWithReason(lt.eventObs, FailedScheduling, "failedScheduling event not received")
// supply a matching offer
testScheduler.ResourceOffers(mockDriver, offers)
lt.scheduler.ResourceOffers(lt.driver, offers)
for _, offer := range offers {
if _, ok := offeredNodes[offer.GetHostname()]; !ok {
offeredNodes[offer.GetHostname()] = struct{}{}
lt.apiServer.WaitForNode(offer.GetHostname())
// reoffer since it must have been declined above
lt.scheduler.ResourceOffers(lt.driver, []*mesos.Offer{offer})
}
}
// and wait to get scheduled
assert.EventWithReason(eventObserver, "scheduled")
assert.EventWithReason(lt.eventObs, Scheduled)
// wait for driver.launchTasks call
select {
@ -543,14 +688,15 @@ func TestPlugin_LifeCycle(t *testing.T) {
}
t.Fatalf("unknown offer used to start a pod")
return nil, nil, nil
case <-time.After(5 * time.Second):
case <-time.After(util.ForeverTestTimeout):
t.Fatal("timed out waiting for launchTasks")
return nil, nil, nil
}
}
// Launch a pod and wait until the scheduler driver is called
launchPodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
podListWatch.Add(pod, true)
lt.podsListWatch.Add(pod, true)
return schedulePodWithOffers(pod, offers)
}
@ -560,8 +706,15 @@ func TestPlugin_LifeCycle(t *testing.T) {
pod, launchedTask, offer := launchPodWithOffers(pod, offers)
if pod != nil {
// report back status
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING))
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING))
lt.scheduler.StatusUpdate(
lt.driver,
newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING),
)
lt.scheduler.StatusUpdate(
lt.driver,
newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING),
)
return pod, launchedTask, offer
}
@ -577,23 +730,28 @@ func TestPlugin_LifeCycle(t *testing.T) {
// start another pod
pod, launchedTask, _ := startTestPod()
// mock drvier.KillTask, should be invoked when a pod is deleted
mockDriver.On("KillTask", mAny("*mesosproto.TaskID")).Return(mesos.Status_DRIVER_RUNNING, nil).Run(func(args mock.Arguments) {
// mock driver.KillTask, should be invoked when a pod is deleted
lt.driver.On("KillTask",
mock.AnythingOfType("*mesosproto.TaskID"),
).Return(mesos.Status_DRIVER_RUNNING, nil).Run(func(args mock.Arguments) {
killedTaskId := *(args.Get(0).(*mesos.TaskID))
assert.Equal(*launchedTask.taskInfo.TaskId, killedTaskId, "expected same TaskID as during launch")
})
killTaskCalled := mockDriver.Upon()
killTaskCalled := lt.driver.Upon()
// stop it again via the apiserver mock
podListWatch.Delete(pod, true) // notify watchers
lt.podsListWatch.Delete(pod, true) // notify watchers
// and wait for the driver killTask call with the correct TaskId
select {
case <-killTaskCalled:
// report back that the task is finished
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_FINISHED))
lt.scheduler.StatusUpdate(
lt.driver,
newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_FINISHED),
)
case <-time.After(5 * time.Second):
case <-time.After(util.ForeverTestTimeout):
t.Fatal("timed out waiting for KillTask")
}
@ -613,8 +771,8 @@ func TestPlugin_LifeCycle(t *testing.T) {
assert.Equal(offers[1].Id.GetValue(), usedOffer.Id.GetValue())
assert.Equal(pod.Spec.NodeName, *usedOffer.Hostname)
testScheduler.OfferRescinded(mockDriver, offers[0].Id)
testScheduler.OfferRescinded(mockDriver, offers[2].Id)
lt.scheduler.OfferRescinded(lt.driver, offers[0].Id)
lt.scheduler.OfferRescinded(lt.driver, offers[2].Id)
// start pods:
// - which are failing while binding,
@ -622,15 +780,15 @@ func TestPlugin_LifeCycle(t *testing.T) {
// - with different states on the apiserver
failPodFromExecutor := func(task *mesos.TaskInfo) {
beforePodLookups := testApiServer.Stats(pod.Name)
beforePodLookups := lt.apiServer.Stats(pod.Name)
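// Stats counts per-pod GET requests against the mock apiserver (incremented in the pods
// handler above), so the increment asserted below shows that reconciliation looked up the pod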
status := newTaskStatusForTask(task, mesos.TaskState_TASK_FAILED)
message := messages.CreateBindingFailure
status.Message = &message
testScheduler.StatusUpdate(mockDriver, status)
lt.scheduler.StatusUpdate(lt.driver, status)
// wait until pod is looked up at the apiserver
assertext.EventuallyTrue(t, time.Second, func() bool {
return testApiServer.Stats(pod.Name) == beforePodLookups+1
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
return lt.apiServer.Stats(pod.Name) == beforePodLookups+1
}, "expect that reconcileTask will access apiserver for pod %v", pod.Name)
}
@ -643,12 +801,12 @@ func TestPlugin_LifeCycle(t *testing.T) {
// 1. with pod deleted from the apiserver
// expected: pod is removed from internal task registry
pod, launchedTask, _ = launchTestPod()
podListWatch.Delete(pod, false) // not notifying the watchers
lt.podsListWatch.Delete(pod, false) // not notifying the watchers
failPodFromExecutor(launchedTask.taskInfo)
podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name)
assertext.EventuallyTrue(t, time.Second, func() bool {
t, _ := p.api.tasks().ForPod(podKey)
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
t, _ := lt.plugin.api.tasks().ForPod(podKey)
return t == nil
})
@ -667,7 +825,7 @@ func TestPlugin_LifeCycle(t *testing.T) {
meta.BindingHostKey: *usedOffer.Hostname,
}
pod.Spec.NodeName = *usedOffer.Hostname
podListWatch.Modify(pod, true) // notifying the watchers
lt.podsListWatch.Modify(pod, true) // notifying the watchers
time.Sleep(time.Second / 2)
failPodFromExecutor(launchedTask.taskInfo)
}