mirror of https://github.com/k3s-io/k3s
Merge pull request #14937 from krousey/flaky_job_controller
Removing custom test timeout
commit 9419bfb402

Drops the tests' private 500ms controller timeout in favor of plain blocking
channel receives, and filters finished jobs out of the job controller's
update handler via a new isJobFinished helper. A sketch of the test-timeout
pattern change follows the diff.
@@ -97,13 +97,9 @@ func NewJobController(kubeClient client.Interface) *JobController {
         framework.ResourceEventHandlerFuncs{
             AddFunc: jm.enqueueController,
             UpdateFunc: func(old, cur interface{}) {
-                job := cur.(*experimental.Job)
-                for _, c := range job.Status.Conditions {
-                    if c.Type == experimental.JobComplete && c.Status == api.ConditionTrue {
-                        return
-                    }
-                }
-                jm.enqueueController(cur)
+                if job := cur.(*experimental.Job); !isJobFinished(job) {
+                    jm.enqueueController(job)
+                }
             },
             DeleteFunc: jm.enqueueController,
         },
@@ -449,6 +445,15 @@ func filterPods(pods []api.Pod, phase api.PodPhase) int {
     return result
 }
 
+func isJobFinished(j *experimental.Job) bool {
+    for _, c := range j.Status.Conditions {
+        if c.Type == experimental.JobComplete && c.Status == api.ConditionTrue {
+            return true
+        }
+    }
+    return false
+}
+
 // byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
 type byCreationTimestamp []experimental.Job
 
@@ -32,12 +32,6 @@ import (
     "k8s.io/kubernetes/pkg/watch"
 )
 
-// Give each test that starts a background controller up to 1/2 a second.
-// Since we need to start up a goroutine to test watch, this routine needs
-// to get cpu before the test can complete. If the test is starved of cpu,
-// the watch test will take up to 1/2 a second before timing out.
-const controllerTimeout = 500 * time.Millisecond
-
 var alwaysReady = func() bool { return true }
 
 func newJob(parallelism, completions int) *experimental.Job {
@@ -262,20 +256,11 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
     if err != nil {
         t.Errorf("Unxpected error when syncing jobs, got %v", err)
     }
-    ch := make(chan interface{})
-    go func() {
-        item, _ := manager.queue.Get()
-        ch <- item
-    }()
-    select {
-    case key := <-ch:
-        expectedKey := getKey(job, t)
-        if key != expectedKey {
-            t.Errorf("Expected requeue of job with key %s got %s", expectedKey, key)
-        }
-    case <-time.After(controllerTimeout):
-        manager.queue.ShutDown()
-        t.Errorf("Expected to find a job in the queue, found none.")
-    }
+    t.Log("Waiting for a job in the queue")
+    key, _ := manager.queue.Get()
+    expectedKey := getKey(job, t)
+    if key != expectedKey {
+        t.Errorf("Expected requeue of job with key %s got %s", expectedKey, key)
+    }
 }
 
@@ -402,7 +387,7 @@ func TestWatchJobs(t *testing.T) {
     manager.podStoreSynced = alwaysReady
 
     var testJob experimental.Job
-    received := make(chan string)
+    received := make(chan struct{})
 
     // The update sent through the fakeWatcher should make its way into the workqueue,
     // and eventually into the syncHandler.
@@ -416,7 +401,7 @@ func TestWatchJobs(t *testing.T) {
         if !api.Semantic.DeepDerivative(job, testJob) {
             t.Errorf("Expected %#v, but got %#v", testJob, job)
         }
-        received <- key
+        close(received)
         return nil
     }
     // Start only the job watcher and the workqueue, send a watch event,
@@ -429,31 +414,32 @@ func TestWatchJobs(t *testing.T) {
     // We're sending new job to see if it reaches syncHandler.
     testJob.Name = "foo"
     fakeWatch.Add(&testJob)
-    select {
-    case <-received:
-    case <-time.After(controllerTimeout):
-        t.Errorf("Expected 1 call but got 0")
-    }
-
-    // We're sending fake finished job, to see if it reaches syncHandler - it should not,
-    // since we're filtering out finished jobs.
-    testJobv2 := experimental.Job{
-        ObjectMeta: api.ObjectMeta{Name: "foo"},
-        Status: experimental.JobStatus{
-            Conditions: []experimental.JobCondition{{
-                Type:               experimental.JobComplete,
-                Status:             api.ConditionTrue,
-                LastProbeTime:      unversioned.Now(),
-                LastTransitionTime: unversioned.Now(),
-            }},
-        },
-    }
-    fakeWatch.Modify(&testJobv2)
-
-    select {
-    case <-received:
-        t.Errorf("Expected 0 call but got 1")
-    case <-time.After(controllerTimeout):
-    }
+    t.Log("Waiting for job to reach syncHandler")
+    <-received
 }
+
+func TestIsJobFinished(t *testing.T) {
+    job := &experimental.Job{
+        Status: experimental.JobStatus{
+            Conditions: []experimental.JobCondition{{
+                Type:   experimental.JobComplete,
+                Status: api.ConditionTrue,
+            }},
+        },
+    }
+
+    if !isJobFinished(job) {
+        t.Error("Job was expected to be finished")
+    }
+
+    job.Status.Conditions[0].Status = api.ConditionFalse
+    if isJobFinished(job) {
+        t.Error("Job was not expected to be finished")
+    }
+
+    job.Status.Conditions[0].Status = api.ConditionUnknown
+    if isJobFinished(job) {
+        t.Error("Job was not expected to be finished")
+    }
+}
 
@@ -467,7 +453,7 @@ func TestWatchPods(t *testing.T) {
     // Put one job and one pod into the store
     testJob := newJob(2, 2)
    manager.jobStore.Store.Add(testJob)
-    received := make(chan string)
+    received := make(chan struct{})
     // The pod update sent through the fakeWatcher should figure out the managing job and
     // send it into the syncHandler.
     manager.syncHandler = func(key string) error {
@@ -495,9 +481,6 @@ func TestWatchPods(t *testing.T) {
     testPod.Status.Phase = api.PodFailed
     fakeWatch.Add(&testPod)
 
-    select {
-    case <-received:
-    case <-time.After(controllerTimeout):
-        t.Errorf("Expected 1 call but got 0")
-    }
+    t.Log("Waiting for pod to reach syncHandler")
+    <-received
 }
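
For context, here is the waiting-pattern change as a minimal standalone sketch. The doWork helper and both test names are hypothetical, not from the commit; only the two waiting styles mirror the diff. The old style raced the expected signal against the private 500ms controllerTimeout, which can fail spuriously when the test goroutine is starved of CPU. The new style logs what it is waiting for and then blocks, relying on go test's own -timeout deadline (10 minutes by default) to abort a genuinely hung test with a full goroutine dump. The commit also switches the signal channels from chan string to chan struct{} and closes them rather than sending a value, so the signalling side never blocks.

package job_test

import (
    "testing"
    "time"
)

// doWork stands in for a controller goroutine under test; it signals
// completion by closing done. Hypothetical helper for this sketch only.
func doWork(done chan struct{}) {
    go func() {
        time.Sleep(10 * time.Millisecond)
        close(done)
    }()
}

// Old style: race the signal against a per-test timer. On a starved
// machine the timer can fire before the worker is ever scheduled,
// failing a test that would otherwise pass.
func TestOldCustomTimeout(t *testing.T) {
    done := make(chan struct{})
    doWork(done)
    select {
    case <-done:
    case <-time.After(500 * time.Millisecond):
        t.Errorf("Expected 1 call but got 0")
    }
}

// New style: log what we are waiting for, then block. A real hang is
// caught by go test's global deadline instead, which panics and prints
// every goroutine's stack, making the hang far easier to diagnose.
func TestNewBlockingReceive(t *testing.T) {
    done := make(chan struct{})
    doWork(done)
    t.Log("Waiting for the worker to finish")
    <-done
}

Both tests pass as written; to see the new style's safety net in action, make doWork never close the channel and run go test -timeout 30s.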