diff --git a/pkg/controller/cronjob/cronjob_controller.go b/pkg/controller/cronjob/cronjob_controller.go index e7c0498bf4..8e797b4171 100644 --- a/pkg/controller/cronjob/cronjob_controller.go +++ b/pkg/controller/cronjob/cronjob_controller.go @@ -38,11 +38,9 @@ import ( batchv1 "k8s.io/api/batch/v1" batchv1beta1 "k8s.io/api/batch/v1beta1" "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" @@ -124,14 +122,14 @@ func (jm *CronJobController) syncAll() { klog.V(4).Infof("Found %d groups", len(jobsBySj)) for _, sj := range sjs { - syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder) - cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.podControl, jm.recorder) + syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder) + cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.recorder) } } // cleanupFinishedJobs cleanups finished jobs created by a CronJob func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface, - sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) { + sjc sjControlInterface, recorder record.EventRecorder) { // If neither limits are active, there is no need to do anything. if sj.Spec.FailedJobsHistoryLimit == nil && sj.Spec.SuccessfulJobsHistoryLimit == nil { return @@ -153,7 +151,6 @@ func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobContr removeOldestJobs(sj, succesfulJobs, jc, - pc, *sj.Spec.SuccessfulJobsHistoryLimit, recorder) } @@ -162,7 +159,6 @@ func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobContr removeOldestJobs(sj, failedJobs, jc, - pc, *sj.Spec.FailedJobsHistoryLimit, recorder) } @@ -175,8 +171,7 @@ func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobContr } // removeOldestJobs removes the oldest jobs from a list of jobs -func removeOldestJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface, - pc podControlInterface, maxJobs int32, recorder record.EventRecorder) { +func removeOldestJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface, maxJobs int32, recorder record.EventRecorder) { numToDelete := len(js) - int(maxJobs) if numToDelete <= 0 { return @@ -188,7 +183,7 @@ func removeOldestJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlI sort.Sort(byJobStartTime(js)) for i := 0; i < numToDelete; i++ { klog.V(4).Infof("Removing job %s from %s", js[i].Name, nameForLog) - deleteJob(sj, &js[i], jc, pc, recorder, "history limit reached") + deleteJob(sj, &js[i], jc, recorder) } } @@ -196,7 +191,7 @@ func removeOldestJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlI // All known jobs created by "sj" should be included in "js". // The current time is passed in to facilitate testing. // It has no receiver, to facilitate testing. -func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) { +func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) { nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name) childrenJobs := make(map[types.UID]bool) @@ -297,8 +292,6 @@ func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobCo } if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent { for _, j := range sj.Status.Active { - // TODO: this should be replaced with server side job deletion - // currently this mimics JobReaper from pkg/kubectl/stop.go klog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog) job, err := jc.GetJob(j.Namespace, j.Name) @@ -306,7 +299,7 @@ func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobCo recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err) return } - if !deleteJob(sj, job, jc, pc, recorder, "") { + if !deleteJob(sj, job, jc, recorder) { return } } @@ -351,46 +344,10 @@ func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobCo } // deleteJob reaps a job, deleting the job, the pods and the reference in the active list -func deleteJob(sj *batchv1beta1.CronJob, job *batchv1.Job, jc jobControlInterface, - pc podControlInterface, recorder record.EventRecorder, reason string) bool { - // TODO: this should be replaced with server side job deletion - // currently this mimics JobReaper from pkg/kubectl/stop.go +func deleteJob(sj *batchv1beta1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool { nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name) - // scale job down to 0 - if *job.Spec.Parallelism != 0 { - zero := int32(0) - var err error - job.Spec.Parallelism = &zero - job, err = jc.UpdateJob(job.Namespace, job) - if err != nil { - recorder.Eventf(sj, v1.EventTypeWarning, "FailedUpdate", "Update job: %v", err) - return false - } - } - // remove all pods... - selector, _ := metav1.LabelSelectorAsSelector(job.Spec.Selector) - options := metav1.ListOptions{LabelSelector: selector.String()} - podList, err := pc.ListPods(job.Namespace, options) - if err != nil { - recorder.Eventf(sj, v1.EventTypeWarning, "FailedList", "List job-pods: %v", err) - return false - } - errList := []error{} - for _, pod := range podList.Items { - klog.V(2).Infof("CronJob controller is deleting Pod %v/%v", pod.Namespace, pod.Name) - if err := pc.DeletePod(pod.Namespace, pod.Name); err != nil { - // ignores the error when the pod isn't found - if !errors.IsNotFound(err) { - errList = append(errList, err) - } - } - } - if len(errList) != 0 { - recorder.Eventf(sj, v1.EventTypeWarning, "FailedDelete", "Deleted job-pods: %v", utilerrors.NewAggregate(errList)) - return false - } - // ... the job itself... + // delete the job itself... if err := jc.DeleteJob(job.Namespace, job.Name); err != nil { recorder.Eventf(sj, v1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err) klog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err) diff --git a/pkg/controller/cronjob/cronjob_controller_test.go b/pkg/controller/cronjob/cronjob_controller_test.go index 63cff8ef04..1ca289d441 100644 --- a/pkg/controller/cronjob/cronjob_controller_test.go +++ b/pkg/controller/cronjob/cronjob_controller_test.go @@ -17,7 +17,6 @@ limitations under the License. package cronjob import ( - "errors" "strconv" "strings" "testing" @@ -286,10 +285,9 @@ func TestSyncOne_RunOrNot(t *testing.T) { jc := &fakeJobControl{Job: job} sjc := &fakeSJControl{} - pc := &fakePodControl{} recorder := record.NewFakeRecorder(10) - syncOne(&sj, js, tc.now, jc, sjc, pc, recorder) + syncOne(&sj, js, tc.now, jc, sjc, recorder) expectedCreates := 0 if tc.expectCreate { expectedCreates = 1 @@ -485,16 +483,6 @@ func TestCleanupFinishedJobs_DeleteOrNot(t *testing.T) { {"2016-05-19T08:00:00Z", F, F, F, F}, {"2016-05-19T09:00:00Z", F, F, F, F}, }, justBeforeTheHour(), &limitZero, &limitZero, 6}, - - "failed list pod err": { - []CleanupJobSpec{ - {"2016-05-19T04:00:00Z", T, F, F, F}, - {"2016-05-19T05:00:00Z", T, F, F, F}, - {"2016-05-19T06:00:00Z", T, T, F, F}, - {"2016-05-19T07:00:00Z", T, T, F, F}, - {"2016-05-19T08:00:00Z", T, F, F, F}, - {"2016-05-19T09:00:00Z", T, F, F, F}, - }, justBeforeTheHour(), &limitZero, &limitZero, 0}, } for name, tc := range testCases { @@ -563,14 +551,10 @@ func TestCleanupFinishedJobs_DeleteOrNot(t *testing.T) { } jc := &fakeJobControl{Job: job} - pc := &fakePodControl{} sjc := &fakeSJControl{} recorder := record.NewFakeRecorder(10) - if name == "failed list pod err" { - pc.Err = errors.New("fakePodControl err") - } - cleanupFinishedJobs(&sj, js, jc, sjc, pc, recorder) + cleanupFinishedJobs(&sj, js, jc, sjc, recorder) // Check we have actually deleted the correct jobs if len(jc.DeleteJobName) != len(jobsToDelete) { @@ -728,11 +712,10 @@ func TestSyncOne_Status(t *testing.T) { jc := &fakeJobControl{} sjc := &fakeSJControl{} - pc := &fakePodControl{} recorder := record.NewFakeRecorder(10) // Run the code - syncOne(&sj, jobs, tc.now, jc, sjc, pc, recorder) + syncOne(&sj, jobs, tc.now, jc, sjc, recorder) // Status update happens once when ranging through job list, and another one if create jobs. expectUpdates := 1