Pass PodLW to executor in tests

pull/6/head
Dr. Stefan Schimanski 2015-09-29 13:21:59 +02:00
parent 49a5f89921
commit 9366ac4143
4 changed files with 25 additions and 19 deletions

View File

@ -139,7 +139,7 @@ type Config struct {
ExitFunc func(int) ExitFunc func(int)
PodStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error) PodStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error)
StaticPodsConfigPath string StaticPodsConfigPath string
PodLW cache.ListerWatcher PodLW cache.ListerWatcher // mandatory, otherwise initialiation will panic
LaunchGracePeriod time.Duration LaunchGracePeriod time.Duration
} }
@ -172,6 +172,10 @@ func New(config Config) *KubernetesExecutor {
} }
// watch pods from the given pod ListWatch // watch pods from the given pod ListWatch
if config.PodLW == nil {
// fail early to make debugging easier
panic("cannot create executor with nil PodLW")
}
_, k.podController = framework.NewInformer(config.PodLW, &api.Pod{}, podRelistPeriod, &framework.ResourceEventHandlerFuncs{ _, k.podController = framework.NewInformer(config.PodLW, &api.Pod{}, podRelistPeriod, &framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
pod := obj.(*api.Pod) pod := obj.(*api.Pod)

View File

@ -57,12 +57,7 @@ import (
// after Register is called. // after Register is called.
func TestExecutorRegister(t *testing.T) { func TestExecutorRegister(t *testing.T) {
mockDriver := &MockExecutorDriver{} mockDriver := &MockExecutorDriver{}
updates := make(chan interface{}, 1024) executor, updates := NewTestKubernetesExecutor()
executor := New(Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: updates,
SourceName: "executor_test",
})
executor.Init(mockDriver) executor.Init(mockDriver)
executor.Registered(mockDriver, nil, nil, nil) executor.Registered(mockDriver, nil, nil, nil)
@ -95,7 +90,7 @@ func TestExecutorRegister(t *testing.T) {
// connected after a call to Disconnected has occurred. // connected after a call to Disconnected has occurred.
func TestExecutorDisconnect(t *testing.T) { func TestExecutorDisconnect(t *testing.T) {
mockDriver := &MockExecutorDriver{} mockDriver := &MockExecutorDriver{}
executor := NewTestKubernetesExecutor() executor, _ := NewTestKubernetesExecutor()
executor.Init(mockDriver) executor.Init(mockDriver)
executor.Registered(mockDriver, nil, nil, nil) executor.Registered(mockDriver, nil, nil, nil)
@ -110,7 +105,7 @@ func TestExecutorDisconnect(t *testing.T) {
// after a connection problem happens, followed by a call to Reregistered. // after a connection problem happens, followed by a call to Reregistered.
func TestExecutorReregister(t *testing.T) { func TestExecutorReregister(t *testing.T) {
mockDriver := &MockExecutorDriver{} mockDriver := &MockExecutorDriver{}
executor := NewTestKubernetesExecutor() executor, _ := NewTestKubernetesExecutor()
executor.Init(mockDriver) executor.Init(mockDriver)
executor.Registered(mockDriver, nil, nil, nil) executor.Registered(mockDriver, nil, nil, nil)
@ -166,6 +161,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
Phase: api.PodRunning, Phase: api.PodRunning,
}, nil }, nil
}, },
PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch,
} }
executor := New(config) executor := New(config)
@ -330,6 +326,7 @@ func TestExecutorStaticPods(t *testing.T) {
}, nil }, nil
}, },
StaticPodsConfigPath: staticPodsConfigPath, StaticPodsConfigPath: staticPodsConfigPath,
PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch,
} }
executor := New(config) executor := New(config)
hostname := "h1" hostname := "h1"
@ -417,6 +414,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
close(kubeletFinished) close(kubeletFinished)
}, },
KubeletFinished: kubeletFinished, KubeletFinished: kubeletFinished,
PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch,
} }
executor := New(config) executor := New(config)
@ -579,6 +577,7 @@ func TestExecutorShutdown(t *testing.T) {
ExitFunc: func(_ int) { ExitFunc: func(_ int) {
atomic.AddInt32(&exitCalled, 1) atomic.AddInt32(&exitCalled, 1)
}, },
PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch,
} }
executor := New(config) executor := New(config)
@ -608,7 +607,7 @@ func TestExecutorShutdown(t *testing.T) {
func TestExecutorsendFrameworkMessage(t *testing.T) { func TestExecutorsendFrameworkMessage(t *testing.T) {
mockDriver := &MockExecutorDriver{} mockDriver := &MockExecutorDriver{}
executor := NewTestKubernetesExecutor() executor, _ := NewTestKubernetesExecutor()
executor.Init(mockDriver) executor.Init(mockDriver)
executor.Registered(mockDriver, nil, nil, nil) executor.Registered(mockDriver, nil, nil, nil)

View File

@ -22,6 +22,7 @@ import (
"github.com/mesos/mesos-go/mesosproto" "github.com/mesos/mesos-go/mesosproto"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/dockertools"
) )
@ -64,16 +65,19 @@ func (m *MockExecutorDriver) SendFrameworkMessage(msg string) (mesosproto.Status
return args.Get(0).(mesosproto.Status), args.Error(1) return args.Get(0).(mesosproto.Status), args.Error(1)
} }
func NewTestKubernetesExecutor() *KubernetesExecutor { func NewTestKubernetesExecutor() (*KubernetesExecutor, chan interface{}) {
updates := make(chan interface{}, 1024)
return New(Config{ return New(Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"), Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: make(chan interface{}, 1024), Updates: updates,
}) PodLW: &NewMockPodsListWatch(api.PodList{}).ListWatch,
SourceName: "executor_test",
}), updates
} }
func TestExecutorNew(t *testing.T) { func TestExecutorNew(t *testing.T) {
mockDriver := &MockExecutorDriver{} mockDriver := &MockExecutorDriver{}
executor := NewTestKubernetesExecutor() executor, _ := NewTestKubernetesExecutor()
executor.Init(mockDriver) executor.Init(mockDriver)
assert.Equal(t, executor.isDone(), false, "executor should not be in Done state on initialization") assert.Equal(t, executor.isDone(), false, "executor should not be in Done state on initialization")

View File

@ -67,7 +67,7 @@ func (t *suicideTracker) makeJumper(_ jumper) jumper {
func TestSuicide_zeroTimeout(t *testing.T) { func TestSuicide_zeroTimeout(t *testing.T) {
defer glog.Flush() defer glog.Flush()
k := New(Config{}) k, _ := NewTestKubernetesExecutor()
tracker := &suicideTracker{suicideWatcher: k.suicideWatch} tracker := &suicideTracker{suicideWatcher: k.suicideWatch}
k.suicideWatch = tracker k.suicideWatch = tracker
@ -92,9 +92,8 @@ func TestSuicide_zeroTimeout(t *testing.T) {
func TestSuicide_WithTasks(t *testing.T) { func TestSuicide_WithTasks(t *testing.T) {
defer glog.Flush() defer glog.Flush()
k := New(Config{ k, _ := NewTestKubernetesExecutor()
SuicideTimeout: 50 * time.Millisecond, k.suicideTimeout = 50 * time.Millisecond
})
jumps := uint32(0) jumps := uint32(0)
tracker := &suicideTracker{suicideWatcher: k.suicideWatch, jumps: &jumps} tracker := &suicideTracker{suicideWatcher: k.suicideWatch, jumps: &jumps}