mirror of https://github.com/k3s-io/k3s
Job failure policy support in JobController
Job failure policy integration in JobController. From the JobSpec.BackoffLimit the JobController will define the backoff duration between Job retry. It use the ```workqueue.RateLimitingInterface``` to store the number of "retry" as "requeue" and the default Job backoff initial duration is set during the initialization of the ```workqueue.RateLimiter. Since the number of retry for each job is store in a local structure "JobController.queue" if the JobController restarts the number of retries will be lost and the backoff duration will be reset to 0. Add e2e test for Job backoff failure policypull/6/head
parent
28857a2f02
commit
1dbef2f113
|
@ -18,6 +18,7 @@ package job
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"sort"
|
||||
"sync"
|
||||
|
@ -50,6 +51,13 @@ import (
|
|||
// controllerKind contains the schema.GroupVersionKind for this controller type.
|
||||
var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
|
||||
|
||||
const (
|
||||
// DefaultJobBackOff is the max backoff period, exported for the e2e test
|
||||
DefaultJobBackOff = 10 * time.Second
|
||||
// MaxJobBackOff is the max backoff period, exported for the e2e test
|
||||
MaxJobBackOff = 360 * time.Second
|
||||
)
|
||||
|
||||
type JobController struct {
|
||||
kubeClient clientset.Interface
|
||||
podControl controller.PodControlInterface
|
||||
|
@ -96,7 +104,7 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
|
|||
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
|
||||
},
|
||||
expectations: controller.NewControllerExpectations(),
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "job"),
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
|
||||
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
|
||||
}
|
||||
|
||||
|
@ -118,6 +126,7 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
|
|||
|
||||
jm.updateHandler = jm.updateJobStatus
|
||||
jm.syncHandler = jm.syncJob
|
||||
|
||||
return jm
|
||||
}
|
||||
|
||||
|
@ -312,7 +321,7 @@ func (jm *JobController) updateJob(old, cur interface{}) {
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
jm.queue.Add(key)
|
||||
jm.enqueueController(curJob)
|
||||
// check if need to add a new rsync for ActiveDeadlineSeconds
|
||||
if curJob.Status.StartTime != nil {
|
||||
curADS := curJob.Spec.ActiveDeadlineSeconds
|
||||
|
@ -333,20 +342,23 @@ func (jm *JobController) updateJob(old, cur interface{}) {
|
|||
}
|
||||
|
||||
// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
|
||||
func (jm *JobController) enqueueController(obj interface{}) {
|
||||
key, err := controller.KeyFunc(obj)
|
||||
func (jm *JobController) enqueueController(job interface{}) {
|
||||
key, err := controller.KeyFunc(job)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", job, err))
|
||||
return
|
||||
}
|
||||
|
||||
// Retrieves the backoff duration for this Job
|
||||
backoff := getBackoff(jm.queue, key)
|
||||
|
||||
// TODO: Handle overlapping controllers better. Either disallow them at admission time or
|
||||
// deterministically avoid syncing controllers that fight over pods. Currently, we only
|
||||
// ensure that the same controller is synced for a given pod. When we periodically relist
|
||||
// all controllers there will still be some replica instability. One way to handle this is
|
||||
// by querying the store for all controllers that this rc overlaps, as well as all
|
||||
// controllers that overlap this rc, and sorting them.
|
||||
jm.queue.Add(key)
|
||||
jm.queue.AddAfter(key, backoff)
|
||||
}
|
||||
|
||||
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||
|
@ -432,6 +444,15 @@ func (jm *JobController) syncJob(key string) error {
|
|||
}
|
||||
job := *sharedJob
|
||||
|
||||
// if job was finished previously, we don't want to redo the termination
|
||||
if IsJobFinished(&job) {
|
||||
jm.queue.Forget(key)
|
||||
return nil
|
||||
}
|
||||
|
||||
// retrieve the previous number of retry
|
||||
previousRetry := jm.queue.NumRequeues(key)
|
||||
|
||||
// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
|
||||
// and update the expectations after we've retrieved active pods from the store. If a new pod enters
|
||||
// the store after we've checked the expectation, the job sync is just deferred till the next relist.
|
||||
|
@ -457,34 +478,28 @@ func (jm *JobController) syncJob(key string) error {
|
|||
jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
|
||||
}
|
||||
}
|
||||
// if job was finished previously, we don't want to redo the termination
|
||||
if IsJobFinished(&job) {
|
||||
return nil
|
||||
}
|
||||
|
||||
var manageJobErr error
|
||||
if pastActiveDeadline(&job) {
|
||||
// TODO: below code should be replaced with pod termination resulting in
|
||||
// pod failures, rather than killing pods. Unfortunately none such solution
|
||||
// exists ATM. There's an open discussion in the topic in
|
||||
// https://github.com/kubernetes/kubernetes/issues/14602 which might give
|
||||
// some sort of solution to above problem.
|
||||
// kill remaining active pods
|
||||
wait := sync.WaitGroup{}
|
||||
errCh := make(chan error, int(active))
|
||||
wait.Add(int(active))
|
||||
for i := int32(0); i < active; i++ {
|
||||
go func(ix int32) {
|
||||
defer wait.Done()
|
||||
if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, &job); err != nil {
|
||||
defer utilruntime.HandleError(err)
|
||||
glog.V(2).Infof("Failed to delete %v, job %q/%q deadline exceeded", activePods[ix].Name, job.Namespace, job.Name)
|
||||
errCh <- err
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
wait.Wait()
|
||||
jobFailed := false
|
||||
var failureReason string
|
||||
var failureMessage string
|
||||
|
||||
jobHaveNewFailure := failed > job.Status.Failed
|
||||
|
||||
// check if the number of failed jobs increased since the last syncJob
|
||||
if jobHaveNewFailure && (int32(previousRetry)+1 > *job.Spec.BackoffLimit) {
|
||||
jobFailed = true
|
||||
failureReason = "BackoffLimitExceeded"
|
||||
failureMessage = "Job has reach the specified backoff limit"
|
||||
} else if pastActiveDeadline(&job) {
|
||||
jobFailed = true
|
||||
failureReason = "DeadlineExceeded"
|
||||
failureMessage = "Job was active longer than specified deadline"
|
||||
}
|
||||
|
||||
if jobFailed {
|
||||
errCh := make(chan error, active)
|
||||
jm.deleteJobPods(&job, activePods, errCh)
|
||||
select {
|
||||
case manageJobErr = <-errCh:
|
||||
if manageJobErr != nil {
|
||||
|
@ -496,8 +511,8 @@ func (jm *JobController) syncJob(key string) error {
|
|||
// update status values accordingly
|
||||
failed += active
|
||||
active = 0
|
||||
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
|
||||
jm.recorder.Event(&job, v1.EventTypeNormal, "DeadlineExceeded", "Job was active longer than specified deadline")
|
||||
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
|
||||
jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
|
||||
} else {
|
||||
if jobNeedsSync && job.DeletionTimestamp == nil {
|
||||
active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
|
||||
|
@ -546,9 +561,41 @@ func (jm *JobController) syncJob(key string) error {
|
|||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if jobHaveNewFailure {
|
||||
// re-enqueue Job after the backoff period
|
||||
jm.queue.AddRateLimited(key)
|
||||
} else {
|
||||
// if no new Failure the job backoff period can be reset
|
||||
jm.queue.Forget(key)
|
||||
}
|
||||
|
||||
return manageJobErr
|
||||
}
|
||||
|
||||
func (jm *JobController) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh chan<- error) {
|
||||
// TODO: below code should be replaced with pod termination resulting in
|
||||
// pod failures, rather than killing pods. Unfortunately none such solution
|
||||
// exists ATM. There's an open discussion in the topic in
|
||||
// https://github.com/kubernetes/kubernetes/issues/14602 which might give
|
||||
// some sort of solution to above problem.
|
||||
// kill remaining active pods
|
||||
wait := sync.WaitGroup{}
|
||||
nbPods := len(pods)
|
||||
wait.Add(nbPods)
|
||||
for i := int32(0); i < int32(nbPods); i++ {
|
||||
go func(ix int32) {
|
||||
defer wait.Done()
|
||||
if err := jm.podControl.DeletePod(job.Namespace, pods[ix].Name, job); err != nil {
|
||||
defer utilruntime.HandleError(err)
|
||||
glog.V(2).Infof("Failed to delete %v, job %q/%q deadline exceeded", pods[ix].Name, job.Namespace, job.Name)
|
||||
errCh <- err
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
wait.Wait()
|
||||
}
|
||||
|
||||
// pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
|
||||
func pastActiveDeadline(job *batch.Job) bool {
|
||||
if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
|
||||
|
@ -726,6 +773,26 @@ func (jm *JobController) updateJobStatus(job *batch.Job) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Duration {
|
||||
exp := queue.NumRequeues(key)
|
||||
|
||||
if exp <= 0 {
|
||||
return time.Duration(0)
|
||||
}
|
||||
|
||||
// The backoff is capped such that 'calculated' value never overflows.
|
||||
backoff := float64(DefaultJobBackOff.Nanoseconds()) * math.Pow(2, float64(exp-1))
|
||||
if backoff > math.MaxInt64 {
|
||||
return MaxJobBackOff
|
||||
}
|
||||
|
||||
calculated := time.Duration(backoff)
|
||||
if calculated > MaxJobBackOff {
|
||||
return MaxJobBackOff
|
||||
}
|
||||
return calculated
|
||||
}
|
||||
|
||||
// filterPods returns pods based on their phase.
|
||||
func filterPods(pods []*v1.Pod, phase v1.PodPhase) int {
|
||||
result := 0
|
||||
|
|
|
@ -43,7 +43,7 @@ import (
|
|||
|
||||
var alwaysReady = func() bool { return true }
|
||||
|
||||
func newJob(parallelism, completions int32) *batch.Job {
|
||||
func newJob(parallelism, completions, backoffLimit int32) *batch.Job {
|
||||
j := &batch.Job{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "foobar",
|
||||
|
@ -80,6 +80,8 @@ func newJob(parallelism, completions int32) *batch.Job {
|
|||
} else {
|
||||
j.Spec.Parallelism = nil
|
||||
}
|
||||
j.Spec.BackoffLimit = &backoffLimit
|
||||
|
||||
return j
|
||||
}
|
||||
|
||||
|
@ -119,12 +121,16 @@ func newPodList(count int32, status v1.PodPhase, job *batch.Job) []v1.Pod {
|
|||
}
|
||||
|
||||
func TestControllerSyncJob(t *testing.T) {
|
||||
jobConditionComplete := batch.JobComplete
|
||||
jobConditionFailed := batch.JobFailed
|
||||
|
||||
testCases := map[string]struct {
|
||||
// job setup
|
||||
parallelism int32
|
||||
completions int32
|
||||
deleting bool
|
||||
podLimit int
|
||||
parallelism int32
|
||||
completions int32
|
||||
backoffLimit int32
|
||||
deleting bool
|
||||
podLimit int
|
||||
|
||||
// pod setup
|
||||
podControllerError error
|
||||
|
@ -134,107 +140,113 @@ func TestControllerSyncJob(t *testing.T) {
|
|||
failedPods int32
|
||||
|
||||
// expectations
|
||||
expectedCreations int32
|
||||
expectedDeletions int32
|
||||
expectedActive int32
|
||||
expectedSucceeded int32
|
||||
expectedFailed int32
|
||||
expectedComplete bool
|
||||
expectedCreations int32
|
||||
expectedDeletions int32
|
||||
expectedActive int32
|
||||
expectedSucceeded int32
|
||||
expectedFailed int32
|
||||
expectedCondition *batch.JobConditionType
|
||||
expectedConditionReason string
|
||||
}{
|
||||
"job start": {
|
||||
2, 5, false, 0,
|
||||
2, 5, 6, false, 0,
|
||||
nil, 0, 0, 0, 0,
|
||||
2, 0, 2, 0, 0, false,
|
||||
2, 0, 2, 0, 0, nil, "",
|
||||
},
|
||||
"WQ job start": {
|
||||
2, -1, false, 0,
|
||||
2, -1, 6, false, 0,
|
||||
nil, 0, 0, 0, 0,
|
||||
2, 0, 2, 0, 0, false,
|
||||
2, 0, 2, 0, 0, nil, "",
|
||||
},
|
||||
"pending pods": {
|
||||
2, 5, false, 0,
|
||||
2, 5, 6, false, 0,
|
||||
nil, 2, 0, 0, 0,
|
||||
0, 0, 2, 0, 0, false,
|
||||
0, 0, 2, 0, 0, nil, "",
|
||||
},
|
||||
"correct # of pods": {
|
||||
2, 5, false, 0,
|
||||
2, 5, 6, false, 0,
|
||||
nil, 0, 2, 0, 0,
|
||||
0, 0, 2, 0, 0, false,
|
||||
0, 0, 2, 0, 0, nil, "",
|
||||
},
|
||||
"WQ job: correct # of pods": {
|
||||
2, -1, false, 0,
|
||||
2, -1, 6, false, 0,
|
||||
nil, 0, 2, 0, 0,
|
||||
0, 0, 2, 0, 0, false,
|
||||
0, 0, 2, 0, 0, nil, "",
|
||||
},
|
||||
"too few active pods": {
|
||||
2, 5, false, 0,
|
||||
2, 5, 6, false, 0,
|
||||
nil, 0, 1, 1, 0,
|
||||
1, 0, 2, 1, 0, false,
|
||||
1, 0, 2, 1, 0, nil, "",
|
||||
},
|
||||
"too few active pods with a dynamic job": {
|
||||
2, -1, false, 0,
|
||||
2, -1, 6, false, 0,
|
||||
nil, 0, 1, 0, 0,
|
||||
1, 0, 2, 0, 0, false,
|
||||
1, 0, 2, 0, 0, nil, "",
|
||||
},
|
||||
"too few active pods, with controller error": {
|
||||
2, 5, false, 0,
|
||||
2, 5, 6, false, 0,
|
||||
fmt.Errorf("Fake error"), 0, 1, 1, 0,
|
||||
1, 0, 1, 1, 0, false,
|
||||
1, 0, 1, 1, 0, nil, "",
|
||||
},
|
||||
"too many active pods": {
|
||||
2, 5, false, 0,
|
||||
2, 5, 6, false, 0,
|
||||
nil, 0, 3, 0, 0,
|
||||
0, 1, 2, 0, 0, false,
|
||||
0, 1, 2, 0, 0, nil, "",
|
||||
},
|
||||
"too many active pods, with controller error": {
|
||||
2, 5, false, 0,
|
||||
2, 5, 6, false, 0,
|
||||
fmt.Errorf("Fake error"), 0, 3, 0, 0,
|
||||
0, 1, 3, 0, 0, false,
|
||||
0, 1, 3, 0, 0, nil, "",
|
||||
},
|
||||
"failed pod": {
|
||||
2, 5, false, 0,
|
||||
2, 5, 6, false, 0,
|
||||
nil, 0, 1, 1, 1,
|
||||
1, 0, 2, 1, 1, false,
|
||||
1, 0, 2, 1, 1, nil, "",
|
||||
},
|
||||
"job finish": {
|
||||
2, 5, false, 0,
|
||||
2, 5, 6, false, 0,
|
||||
nil, 0, 0, 5, 0,
|
||||
0, 0, 0, 5, 0, true,
|
||||
0, 0, 0, 5, 0, nil, "",
|
||||
},
|
||||
"WQ job finishing": {
|
||||
2, -1, false, 0,
|
||||
2, -1, 6, false, 0,
|
||||
nil, 0, 1, 1, 0,
|
||||
0, 0, 1, 1, 0, false,
|
||||
0, 0, 1, 1, 0, nil, "",
|
||||
},
|
||||
"WQ job all finished": {
|
||||
2, -1, false, 0,
|
||||
2, -1, 6, false, 0,
|
||||
nil, 0, 0, 2, 0,
|
||||
0, 0, 0, 2, 0, true,
|
||||
0, 0, 0, 2, 0, &jobConditionComplete, "",
|
||||
},
|
||||
"WQ job all finished despite one failure": {
|
||||
2, -1, false, 0,
|
||||
2, -1, 6, false, 0,
|
||||
nil, 0, 0, 1, 1,
|
||||
0, 0, 0, 1, 1, true,
|
||||
0, 0, 0, 1, 1, &jobConditionComplete, "",
|
||||
},
|
||||
"more active pods than completions": {
|
||||
2, 5, false, 0,
|
||||
2, 5, 6, false, 0,
|
||||
nil, 0, 10, 0, 0,
|
||||
0, 8, 2, 0, 0, false,
|
||||
0, 8, 2, 0, 0, nil, "",
|
||||
},
|
||||
"status change": {
|
||||
2, 5, false, 0,
|
||||
2, 5, 6, false, 0,
|
||||
nil, 0, 2, 2, 0,
|
||||
0, 0, 2, 2, 0, false,
|
||||
0, 0, 2, 2, 0, nil, "",
|
||||
},
|
||||
"deleting job": {
|
||||
2, 5, true, 0,
|
||||
2, 5, 6, true, 0,
|
||||
nil, 1, 1, 1, 0,
|
||||
0, 0, 2, 1, 0, false,
|
||||
0, 0, 2, 1, 0, nil, "",
|
||||
},
|
||||
"limited pods": {
|
||||
100, 200, false, 10,
|
||||
100, 200, 6, false, 10,
|
||||
nil, 0, 0, 0, 0,
|
||||
10, 0, 10, 0, 0, false,
|
||||
10, 0, 10, 0, 0, nil, "",
|
||||
},
|
||||
"to many job sync failure": {
|
||||
2, 5, 0, true, 0,
|
||||
nil, 0, 0, 0, 1,
|
||||
0, 0, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -253,7 +265,7 @@ func TestControllerSyncJob(t *testing.T) {
|
|||
}
|
||||
|
||||
// job & pods setup
|
||||
job := newJob(tc.parallelism, tc.completions)
|
||||
job := newJob(tc.parallelism, tc.completions, tc.backoffLimit)
|
||||
if tc.deleting {
|
||||
now := metav1.Now()
|
||||
job.DeletionTimestamp = &now
|
||||
|
@ -330,7 +342,7 @@ func TestControllerSyncJob(t *testing.T) {
|
|||
t.Errorf("%s: .status.startTime was not set", name)
|
||||
}
|
||||
// validate conditions
|
||||
if tc.expectedComplete && !getCondition(actual, batch.JobComplete) {
|
||||
if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, tc.expectedConditionReason) {
|
||||
t.Errorf("%s: expected completion condition. Got %#v", name, actual.Status.Conditions)
|
||||
}
|
||||
// validate slow start
|
||||
|
@ -351,6 +363,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
|
|||
completions int32
|
||||
activeDeadlineSeconds int64
|
||||
startTime int64
|
||||
backoffLimit int32
|
||||
|
||||
// pod setup
|
||||
activePods int32
|
||||
|
@ -358,25 +371,31 @@ func TestSyncJobPastDeadline(t *testing.T) {
|
|||
failedPods int32
|
||||
|
||||
// expectations
|
||||
expectedDeletions int32
|
||||
expectedActive int32
|
||||
expectedSucceeded int32
|
||||
expectedFailed int32
|
||||
expectedDeletions int32
|
||||
expectedActive int32
|
||||
expectedSucceeded int32
|
||||
expectedFailed int32
|
||||
expectedConditionReason string
|
||||
}{
|
||||
"activeDeadlineSeconds less than single pod execution": {
|
||||
1, 1, 10, 15,
|
||||
1, 1, 10, 15, 6,
|
||||
1, 0, 0,
|
||||
1, 0, 0, 1,
|
||||
1, 0, 0, 1, "DeadlineExceeded",
|
||||
},
|
||||
"activeDeadlineSeconds bigger than single pod execution": {
|
||||
1, 2, 10, 15,
|
||||
1, 2, 10, 15, 6,
|
||||
1, 1, 0,
|
||||
1, 0, 1, 1,
|
||||
1, 0, 1, 1, "DeadlineExceeded",
|
||||
},
|
||||
"activeDeadlineSeconds times-out before any pod starts": {
|
||||
1, 1, 10, 10,
|
||||
1, 1, 10, 10, 6,
|
||||
0, 0, 0,
|
||||
0, 0, 0, 0,
|
||||
0, 0, 0, 0, "DeadlineExceeded",
|
||||
},
|
||||
"activeDeadlineSeconds with backofflimit reach": {
|
||||
1, 1, 1, 10, 0,
|
||||
1, 0, 2,
|
||||
1, 0, 0, 3, "BackoffLimitExceeded",
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -395,7 +414,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
|
|||
}
|
||||
|
||||
// job & pods setup
|
||||
job := newJob(tc.parallelism, tc.completions)
|
||||
job := newJob(tc.parallelism, tc.completions, tc.backoffLimit)
|
||||
job.Spec.ActiveDeadlineSeconds = &tc.activeDeadlineSeconds
|
||||
start := metav1.Unix(metav1.Now().Time.Unix()-tc.startTime, 0)
|
||||
job.Status.StartTime = &start
|
||||
|
@ -438,15 +457,15 @@ func TestSyncJobPastDeadline(t *testing.T) {
|
|||
t.Errorf("%s: .status.startTime was not set", name)
|
||||
}
|
||||
// validate conditions
|
||||
if !getCondition(actual, batch.JobFailed) {
|
||||
if !getCondition(actual, batch.JobFailed, tc.expectedConditionReason) {
|
||||
t.Errorf("%s: expected fail condition. Got %#v", name, actual.Status.Conditions)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getCondition(job *batch.Job, condition batch.JobConditionType) bool {
|
||||
func getCondition(job *batch.Job, condition batch.JobConditionType, reason string) bool {
|
||||
for _, v := range job.Status.Conditions {
|
||||
if v.Type == condition && v.Status == v1.ConditionTrue {
|
||||
if v.Type == condition && v.Status == v1.ConditionTrue && v.Reason == reason {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
@ -466,7 +485,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
|
|||
return nil
|
||||
}
|
||||
|
||||
job := newJob(1, 1)
|
||||
job := newJob(1, 1, 6)
|
||||
activeDeadlineSeconds := int64(10)
|
||||
job.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
|
||||
start := metav1.Unix(metav1.Now().Time.Unix()-15, 0)
|
||||
|
@ -496,7 +515,7 @@ func TestSyncJobComplete(t *testing.T) {
|
|||
manager.podStoreSynced = alwaysReady
|
||||
manager.jobStoreSynced = alwaysReady
|
||||
|
||||
job := newJob(1, 1)
|
||||
job := newJob(1, 1, 6)
|
||||
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
|
||||
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
|
||||
err := manager.syncJob(getKey(job, t))
|
||||
|
@ -521,7 +540,7 @@ func TestSyncJobDeleted(t *testing.T) {
|
|||
manager.podStoreSynced = alwaysReady
|
||||
manager.jobStoreSynced = alwaysReady
|
||||
manager.updateHandler = func(job *batch.Job) error { return nil }
|
||||
job := newJob(2, 2)
|
||||
job := newJob(2, 2, 6)
|
||||
err := manager.syncJob(getKey(job, t))
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error when syncing jobs %v", err)
|
||||
|
@ -546,7 +565,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
|
|||
manager.queue.AddRateLimited(getKey(job, t))
|
||||
return updateError
|
||||
}
|
||||
job := newJob(2, 2)
|
||||
job := newJob(2, 2, 6)
|
||||
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
|
||||
err := manager.syncJob(getKey(job, t))
|
||||
if err == nil || err != updateError {
|
||||
|
@ -659,9 +678,9 @@ func TestGetPodsForJob(t *testing.T) {
|
|||
jm.podStoreSynced = alwaysReady
|
||||
jm.jobStoreSynced = alwaysReady
|
||||
|
||||
job1 := newJob(1, 1)
|
||||
job1 := newJob(1, 1, 6)
|
||||
job1.Name = "job1"
|
||||
job2 := newJob(1, 1)
|
||||
job2 := newJob(1, 1, 6)
|
||||
job2.Name = "job2"
|
||||
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
|
||||
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
|
||||
|
@ -700,7 +719,7 @@ func TestGetPodsForJob(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestGetPodsForJobAdopt(t *testing.T) {
|
||||
job1 := newJob(1, 1)
|
||||
job1 := newJob(1, 1, 6)
|
||||
job1.Name = "job1"
|
||||
clientset := fake.NewSimpleClientset(job1)
|
||||
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
|
||||
|
@ -726,7 +745,7 @@ func TestGetPodsForJobAdopt(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) {
|
||||
job1 := newJob(1, 1)
|
||||
job1 := newJob(1, 1, 6)
|
||||
job1.Name = "job1"
|
||||
job1.DeletionTimestamp = &metav1.Time{}
|
||||
clientset := fake.NewSimpleClientset(job1)
|
||||
|
@ -756,7 +775,7 @@ func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestGetPodsForJobNoAdoptIfBeingDeletedRace(t *testing.T) {
|
||||
job1 := newJob(1, 1)
|
||||
job1 := newJob(1, 1, 6)
|
||||
job1.Name = "job1"
|
||||
// The up-to-date object says it's being deleted.
|
||||
job1.DeletionTimestamp = &metav1.Time{}
|
||||
|
@ -795,7 +814,7 @@ func TestGetPodsForJobRelease(t *testing.T) {
|
|||
jm.podStoreSynced = alwaysReady
|
||||
jm.jobStoreSynced = alwaysReady
|
||||
|
||||
job1 := newJob(1, 1)
|
||||
job1 := newJob(1, 1, 6)
|
||||
job1.Name = "job1"
|
||||
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
|
||||
|
||||
|
@ -824,9 +843,9 @@ func TestAddPod(t *testing.T) {
|
|||
jm.podStoreSynced = alwaysReady
|
||||
jm.jobStoreSynced = alwaysReady
|
||||
|
||||
job1 := newJob(1, 1)
|
||||
job1 := newJob(1, 1, 6)
|
||||
job1.Name = "job1"
|
||||
job2 := newJob(1, 1)
|
||||
job2 := newJob(1, 1, 6)
|
||||
job2.Name = "job2"
|
||||
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
|
||||
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
|
||||
|
@ -869,11 +888,11 @@ func TestAddPodOrphan(t *testing.T) {
|
|||
jm.podStoreSynced = alwaysReady
|
||||
jm.jobStoreSynced = alwaysReady
|
||||
|
||||
job1 := newJob(1, 1)
|
||||
job1 := newJob(1, 1, 6)
|
||||
job1.Name = "job1"
|
||||
job2 := newJob(1, 1)
|
||||
job2 := newJob(1, 1, 6)
|
||||
job2.Name = "job2"
|
||||
job3 := newJob(1, 1)
|
||||
job3 := newJob(1, 1, 6)
|
||||
job3.Name = "job3"
|
||||
job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"}
|
||||
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
|
||||
|
@ -897,9 +916,9 @@ func TestUpdatePod(t *testing.T) {
|
|||
jm.podStoreSynced = alwaysReady
|
||||
jm.jobStoreSynced = alwaysReady
|
||||
|
||||
job1 := newJob(1, 1)
|
||||
job1 := newJob(1, 1, 6)
|
||||
job1.Name = "job1"
|
||||
job2 := newJob(1, 1)
|
||||
job2 := newJob(1, 1, 6)
|
||||
job2.Name = "job2"
|
||||
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
|
||||
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
|
||||
|
@ -946,9 +965,9 @@ func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
|
|||
jm.podStoreSynced = alwaysReady
|
||||
jm.jobStoreSynced = alwaysReady
|
||||
|
||||
job1 := newJob(1, 1)
|
||||
job1 := newJob(1, 1, 6)
|
||||
job1.Name = "job1"
|
||||
job2 := newJob(1, 1)
|
||||
job2 := newJob(1, 1, 6)
|
||||
job2.Name = "job2"
|
||||
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
|
||||
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
|
||||
|
@ -973,9 +992,9 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
|
|||
jm.podStoreSynced = alwaysReady
|
||||
jm.jobStoreSynced = alwaysReady
|
||||
|
||||
job1 := newJob(1, 1)
|
||||
job1 := newJob(1, 1, 6)
|
||||
job1.Name = "job1"
|
||||
job2 := newJob(1, 1)
|
||||
job2 := newJob(1, 1, 6)
|
||||
job2.Name = "job2"
|
||||
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
|
||||
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
|
||||
|
@ -999,9 +1018,9 @@ func TestUpdatePodRelease(t *testing.T) {
|
|||
jm.podStoreSynced = alwaysReady
|
||||
jm.jobStoreSynced = alwaysReady
|
||||
|
||||
job1 := newJob(1, 1)
|
||||
job1 := newJob(1, 1, 6)
|
||||
job1.Name = "job1"
|
||||
job2 := newJob(1, 1)
|
||||
job2 := newJob(1, 1, 6)
|
||||
job2.Name = "job2"
|
||||
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
|
||||
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
|
||||
|
@ -1025,9 +1044,9 @@ func TestDeletePod(t *testing.T) {
|
|||
jm.podStoreSynced = alwaysReady
|
||||
jm.jobStoreSynced = alwaysReady
|
||||
|
||||
job1 := newJob(1, 1)
|
||||
job1 := newJob(1, 1, 6)
|
||||
job1.Name = "job1"
|
||||
job2 := newJob(1, 1)
|
||||
job2 := newJob(1, 1, 6)
|
||||
job2.Name = "job2"
|
||||
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
|
||||
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
|
||||
|
@ -1070,11 +1089,11 @@ func TestDeletePodOrphan(t *testing.T) {
|
|||
jm.podStoreSynced = alwaysReady
|
||||
jm.jobStoreSynced = alwaysReady
|
||||
|
||||
job1 := newJob(1, 1)
|
||||
job1 := newJob(1, 1, 6)
|
||||
job1.Name = "job1"
|
||||
job2 := newJob(1, 1)
|
||||
job2 := newJob(1, 1, 6)
|
||||
job2.Name = "job2"
|
||||
job3 := newJob(1, 1)
|
||||
job3 := newJob(1, 1, 6)
|
||||
job3.Name = "job3"
|
||||
job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"}
|
||||
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
|
||||
|
@ -1113,7 +1132,7 @@ func TestSyncJobExpectations(t *testing.T) {
|
|||
manager.jobStoreSynced = alwaysReady
|
||||
manager.updateHandler = func(job *batch.Job) error { return nil }
|
||||
|
||||
job := newJob(2, 2)
|
||||
job := newJob(2, 2, 6)
|
||||
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
|
||||
pods := newPodList(2, v1.PodPending, job)
|
||||
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
|
||||
|
@ -1181,7 +1200,7 @@ func TestWatchJobs(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestWatchPods(t *testing.T) {
|
||||
testJob := newJob(2, 2)
|
||||
testJob := newJob(2, 2, 6)
|
||||
clientset := fake.NewSimpleClientset(testJob)
|
||||
fakeWatch := watch.NewFake()
|
||||
clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))
|
||||
|
|
|
@ -35,11 +35,12 @@ var _ = SIGDescribe("Job", func() {
|
|||
f := framework.NewDefaultFramework("job")
|
||||
parallelism := int32(2)
|
||||
completions := int32(4)
|
||||
backoffLimit := int32(6) // default value
|
||||
|
||||
// Simplest case: all pods succeed promptly
|
||||
It("should run a job to completion when tasks succeed", func() {
|
||||
By("Creating a job")
|
||||
job := framework.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, parallelism, completions, nil)
|
||||
job := framework.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
|
||||
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
|
@ -58,7 +59,7 @@ var _ = SIGDescribe("Job", func() {
|
|||
// up to 5 minutes between restarts, making test timeouts
|
||||
// due to successive failures too likely with a reasonable
|
||||
// test timeout.
|
||||
job := framework.NewTestJob("failOnce", "fail-once-local", v1.RestartPolicyOnFailure, parallelism, completions, nil)
|
||||
job := framework.NewTestJob("failOnce", "fail-once-local", v1.RestartPolicyOnFailure, parallelism, completions, nil, backoffLimit)
|
||||
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
|
@ -76,7 +77,7 @@ var _ = SIGDescribe("Job", func() {
|
|||
// Worst case analysis: 15 failures, each taking 1 minute to
|
||||
// run due to some slowness, 1 in 2^15 chance of happening,
|
||||
// causing test flake. Should be very rare.
|
||||
job := framework.NewTestJob("randomlySucceedOrFail", "rand-non-local", v1.RestartPolicyNever, parallelism, completions, nil)
|
||||
job := framework.NewTestJob("randomlySucceedOrFail", "rand-non-local", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
|
||||
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
|
@ -88,7 +89,7 @@ var _ = SIGDescribe("Job", func() {
|
|||
It("should exceed active deadline", func() {
|
||||
By("Creating a job")
|
||||
var activeDeadlineSeconds int64 = 1
|
||||
job := framework.NewTestJob("notTerminate", "exceed-active-deadline", v1.RestartPolicyNever, parallelism, completions, &activeDeadlineSeconds)
|
||||
job := framework.NewTestJob("notTerminate", "exceed-active-deadline", v1.RestartPolicyNever, parallelism, completions, &activeDeadlineSeconds, backoffLimit)
|
||||
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
By("Ensuring job past active deadline")
|
||||
|
@ -98,7 +99,7 @@ var _ = SIGDescribe("Job", func() {
|
|||
|
||||
It("should delete a job", func() {
|
||||
By("Creating a job")
|
||||
job := framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyNever, parallelism, completions, nil)
|
||||
job := framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
|
||||
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
|
@ -121,7 +122,7 @@ var _ = SIGDescribe("Job", func() {
|
|||
|
||||
It("should adopt matching orphans and release non-matching pods", func() {
|
||||
By("Creating a job")
|
||||
job := framework.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions, nil)
|
||||
job := framework.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
|
||||
// Replace job with the one returned from Create() so it has the UID.
|
||||
// Save Kind since it won't be populated in the returned job.
|
||||
kind := job.Kind
|
||||
|
@ -172,4 +173,22 @@ var _ = SIGDescribe("Job", func() {
|
|||
},
|
||||
)).To(Succeed(), "wait for pod %q to be released", pod.Name)
|
||||
})
|
||||
|
||||
It("should exceed backoffLimit", func() {
|
||||
By("Creating a job")
|
||||
job := framework.NewTestJob("fail", "backofflimit", v1.RestartPolicyNever, 1, 1, nil, 0)
|
||||
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
By("Ensuring job exceed backofflimit")
|
||||
|
||||
err = framework.WaitForJobFailure(f.ClientSet, f.Namespace.Name, job.Name, time.Duration(30)*time.Second, "BackoffLimitExceeded")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
By("Checking that only one pod created and status is failed")
|
||||
pods, err := framework.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(pods.Items).To(HaveLen(1))
|
||||
pod := pods.Items[0]
|
||||
Expect(pod.Status.Phase).To(Equal(v1.PodFailed))
|
||||
})
|
||||
})
|
||||
|
|
|
@ -43,7 +43,7 @@ const (
|
|||
// first time it is run and succeeds subsequently. name is the Name of the Job. RestartPolicy indicates the restart
|
||||
// policy of the containers in which the Pod is running. Parallelism is the Job's parallelism, and completions is the
|
||||
// Job's required number of completions.
|
||||
func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, completions int32, activeDeadlineSeconds *int64) *batch.Job {
|
||||
func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, completions int32, activeDeadlineSeconds *int64, backoffLimit int32) *batch.Job {
|
||||
job := &batch.Job{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
|
@ -55,6 +55,7 @@ func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, compl
|
|||
ActiveDeadlineSeconds: activeDeadlineSeconds,
|
||||
Parallelism: ¶llelism,
|
||||
Completions: &completions,
|
||||
BackoffLimit: &backoffLimit,
|
||||
ManualSelector: newBool(false),
|
||||
Template: v1.PodTemplateSpec{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
|
|
|
@ -418,9 +418,10 @@ var _ = framework.KubeDescribe("[sig-apps] Network Partition [Disruptive] [Slow]
|
|||
It("should create new pods when node is partitioned", func() {
|
||||
parallelism := int32(2)
|
||||
completions := int32(4)
|
||||
backoffLimit := int32(6) // default value
|
||||
|
||||
job := framework.NewTestJob("notTerminate", "network-partition", v1.RestartPolicyNever,
|
||||
parallelism, completions, nil)
|
||||
parallelism, completions, nil, backoffLimit)
|
||||
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
label := labels.SelectorFromSet(labels.Set(map[string]string{framework.JobSelectorKey: job.Name}))
|
||||
|
|
|
@ -39,7 +39,7 @@ func (t *JobUpgradeTest) Setup(f *framework.Framework) {
|
|||
t.namespace = f.Namespace.Name
|
||||
|
||||
By("Creating a job")
|
||||
t.job = framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyOnFailure, 2, 2, nil)
|
||||
t.job = framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyOnFailure, 2, 2, nil, 6)
|
||||
job, err := framework.CreateJob(f.ClientSet, t.namespace, t.job)
|
||||
t.job = job
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
|
Loading…
Reference in New Issue