diff --git a/pkg/controller/job/controller.go b/pkg/controller/job/controller.go index 6aa41f8a3c..87ec0c4339 100644 --- a/pkg/controller/job/controller.go +++ b/pkg/controller/job/controller.go @@ -97,13 +97,9 @@ func NewJobController(kubeClient client.Interface) *JobController { framework.ResourceEventHandlerFuncs{ AddFunc: jm.enqueueController, UpdateFunc: func(old, cur interface{}) { - job := cur.(*experimental.Job) - for _, c := range job.Status.Conditions { - if c.Type == experimental.JobComplete && c.Status == api.ConditionTrue { - return - } + if job := cur.(*experimental.Job); !isJobFinished(job) { + jm.enqueueController(job) } - jm.enqueueController(cur) }, DeleteFunc: jm.enqueueController, }, @@ -449,6 +445,15 @@ func filterPods(pods []api.Pod, phase api.PodPhase) int { return result } +func isJobFinished(j *experimental.Job) bool { + for _, c := range j.Status.Conditions { + if c.Type == experimental.JobComplete && c.Status == api.ConditionTrue { + return true + } + } + return false +} + // byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker. type byCreationTimestamp []experimental.Job diff --git a/pkg/controller/job/controller_test.go b/pkg/controller/job/controller_test.go index d5e0ea7afc..4789e20e70 100644 --- a/pkg/controller/job/controller_test.go +++ b/pkg/controller/job/controller_test.go @@ -32,12 +32,6 @@ import ( "k8s.io/kubernetes/pkg/watch" ) -// Give each test that starts a background controller up to 1/2 a second. -// Since we need to start up a goroutine to test watch, this routine needs -// to get cpu before the test can complete. If the test is starved of cpu, -// the watch test will take up to 1/2 a second before timing out. -const controllerTimeout = 500 * time.Millisecond - var alwaysReady = func() bool { return true } func newJob(parallelism, completions int) *experimental.Job { @@ -262,20 +256,11 @@ func TestSyncJobUpdateRequeue(t *testing.T) { if err != nil { t.Errorf("Unxpected error when syncing jobs, got %v", err) } - ch := make(chan interface{}) - go func() { - item, _ := manager.queue.Get() - ch <- item - }() - select { - case key := <-ch: - expectedKey := getKey(job, t) - if key != expectedKey { - t.Errorf("Expected requeue of job with key %s got %s", expectedKey, key) - } - case <-time.After(controllerTimeout): - manager.queue.ShutDown() - t.Errorf("Expected to find a job in the queue, found none.") + t.Log("Waiting for a job in the queue") + key, _ := manager.queue.Get() + expectedKey := getKey(job, t) + if key != expectedKey { + t.Errorf("Expected requeue of job with key %s got %s", expectedKey, key) } } @@ -402,7 +387,7 @@ func TestWatchJobs(t *testing.T) { manager.podStoreSynced = alwaysReady var testJob experimental.Job - received := make(chan string) + received := make(chan struct{}) // The update sent through the fakeWatcher should make its way into the workqueue, // and eventually into the syncHandler. @@ -416,7 +401,7 @@ func TestWatchJobs(t *testing.T) { if !api.Semantic.DeepDerivative(job, testJob) { t.Errorf("Expected %#v, but got %#v", testJob, job) } - received <- key + close(received) return nil } // Start only the job watcher and the workqueue, send a watch event, @@ -429,31 +414,32 @@ func TestWatchJobs(t *testing.T) { // We're sending new job to see if it reaches syncHandler. testJob.Name = "foo" fakeWatch.Add(&testJob) - select { - case <-received: - case <-time.After(controllerTimeout): - t.Errorf("Expected 1 call but got 0") - } + t.Log("Waiting for job to reach syncHandler") + <-received +} - // We're sending fake finished job, to see if it reaches syncHandler - it should not, - // since we're filtering out finished jobs. - testJobv2 := experimental.Job{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, +func TestIsJobFinished(t *testing.T) { + job := &experimental.Job{ Status: experimental.JobStatus{ Conditions: []experimental.JobCondition{{ - Type: experimental.JobComplete, - Status: api.ConditionTrue, - LastProbeTime: unversioned.Now(), - LastTransitionTime: unversioned.Now(), + Type: experimental.JobComplete, + Status: api.ConditionTrue, }}, }, } - fakeWatch.Modify(&testJobv2) - select { - case <-received: - t.Errorf("Expected 0 call but got 1") - case <-time.After(controllerTimeout): + if !isJobFinished(job) { + t.Error("Job was expected to be finished") + } + + job.Status.Conditions[0].Status = api.ConditionFalse + if isJobFinished(job) { + t.Error("Job was not expected to be finished") + } + + job.Status.Conditions[0].Status = api.ConditionUnknown + if isJobFinished(job) { + t.Error("Job was not expected to be finished") } } @@ -467,7 +453,7 @@ func TestWatchPods(t *testing.T) { // Put one job and one pod into the store testJob := newJob(2, 2) manager.jobStore.Store.Add(testJob) - received := make(chan string) + received := make(chan struct{}) // The pod update sent through the fakeWatcher should figure out the managing job and // send it into the syncHandler. manager.syncHandler = func(key string) error { @@ -495,9 +481,6 @@ func TestWatchPods(t *testing.T) { testPod.Status.Phase = api.PodFailed fakeWatch.Add(&testPod) - select { - case <-received: - case <-time.After(controllerTimeout): - t.Errorf("Expected 1 call but got 0") - } + t.Log("Waiting for pod to reach syncHandler") + <-received }