defer finalizing pods for cronjobs to server delete

pull/58/head
juanvallejo 2018-11-09 10:59:43 -05:00
parent 0ca67bde54
commit 9f1cc8571e
2 changed files with 12 additions and 72 deletions
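
For context: "server delete" in the title means leaning on the API server's garbage collector, which removes a Job's Pods through their ownerReferences once the Job object itself is deleted, instead of the controller scaling the Job down and deleting Pods by hand. Below is a minimal sketch of that pattern against the client-go API of this era; the helper name deleteJobServerSide is hypothetical and not taken from this commit.

package main

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
)

// deleteJobServerSide (hypothetical helper, for illustration) issues a single
// Delete for the Job and leaves Pod cleanup to the server-side garbage
// collector, which follows the Pods' ownerReferences back to the Job.
func deleteJobServerSide(kc clientset.Interface, namespace, name string) error {
	// Background propagation: the Job object is removed immediately and its
	// Pods are collected asynchronously.
	propagation := metav1.DeletePropagationBackground
	return kc.BatchV1().Jobs(namespace).Delete(name, &metav1.DeleteOptions{
		PropagationPolicy: &propagation,
	})
}

With metav1.DeletePropagationForeground the Job would instead linger until its Pods are gone; either way the controller no longer scales the Job to zero or lists and deletes Pods itself, which is exactly the code this patch drops.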

@@ -38,11 +38,9 @@ import (
 	batchv1 "k8s.io/api/batch/v1"
 	batchv1beta1 "k8s.io/api/batch/v1beta1"
 	"k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
-	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
@@ -124,14 +122,14 @@ func (jm *CronJobController) syncAll() {
 	klog.V(4).Infof("Found %d groups", len(jobsBySj))

 	for _, sj := range sjs {
-		syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
-		cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
+		syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder)
+		cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.recorder)
 	}
 }

 // cleanupFinishedJobs cleanups finished jobs created by a CronJob
 func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface,
-	sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
+	sjc sjControlInterface, recorder record.EventRecorder) {
 	// If neither limits are active, there is no need to do anything.
 	if sj.Spec.FailedJobsHistoryLimit == nil && sj.Spec.SuccessfulJobsHistoryLimit == nil {
 		return
@@ -153,7 +151,6 @@ func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface,
 		removeOldestJobs(sj,
 			succesfulJobs,
 			jc,
-			pc,
 			*sj.Spec.SuccessfulJobsHistoryLimit,
 			recorder)
 	}
@@ -162,7 +159,6 @@ func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface,
 		removeOldestJobs(sj,
 			failedJobs,
 			jc,
-			pc,
 			*sj.Spec.FailedJobsHistoryLimit,
 			recorder)
 	}
@@ -175,8 +171,7 @@ func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface,
 }

 // removeOldestJobs removes the oldest jobs from a list of jobs
-func removeOldestJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface,
-	pc podControlInterface, maxJobs int32, recorder record.EventRecorder) {
+func removeOldestJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface, maxJobs int32, recorder record.EventRecorder) {
 	numToDelete := len(js) - int(maxJobs)
 	if numToDelete <= 0 {
 		return
@@ -188,7 +183,7 @@ func removeOldestJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface,
 	sort.Sort(byJobStartTime(js))
 	for i := 0; i < numToDelete; i++ {
 		klog.V(4).Infof("Removing job %s from %s", js[i].Name, nameForLog)
-		deleteJob(sj, &js[i], jc, pc, recorder, "history limit reached")
+		deleteJob(sj, &js[i], jc, recorder)
 	}
 }

@@ -196,7 +191,7 @@ func removeOldestJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface,
 // All known jobs created by "sj" should be included in "js".
 // The current time is passed in to facilitate testing.
 // It has no receiver, to facilitate testing.
-func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
+func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) {
 	nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)

 	childrenJobs := make(map[types.UID]bool)
@@ -297,8 +292,6 @@ func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
 		}
 		if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
 			for _, j := range sj.Status.Active {
-				// TODO: this should be replaced with server side job deletion
-				// currently this mimics JobReaper from pkg/kubectl/stop.go
 				klog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)

 				job, err := jc.GetJob(j.Namespace, j.Name)
@@ -306,7 +299,7 @@ func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
 					recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
 					return
 				}
-				if !deleteJob(sj, job, jc, pc, recorder, "") {
+				if !deleteJob(sj, job, jc, recorder) {
 					return
 				}
 			}
@@ -351,46 +344,10 @@ func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
 }

 // deleteJob reaps a job, deleting the job, the pods and the reference in the active list
-func deleteJob(sj *batchv1beta1.CronJob, job *batchv1.Job, jc jobControlInterface,
-	pc podControlInterface, recorder record.EventRecorder, reason string) bool {
-	// TODO: this should be replaced with server side job deletion
-	// currently this mimics JobReaper from pkg/kubectl/stop.go
+func deleteJob(sj *batchv1beta1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
 	nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)

-	// scale job down to 0
-	if *job.Spec.Parallelism != 0 {
-		zero := int32(0)
-		var err error
-		job.Spec.Parallelism = &zero
-		job, err = jc.UpdateJob(job.Namespace, job)
-		if err != nil {
-			recorder.Eventf(sj, v1.EventTypeWarning, "FailedUpdate", "Update job: %v", err)
-			return false
-		}
-	}
-	// remove all pods...
-	selector, _ := metav1.LabelSelectorAsSelector(job.Spec.Selector)
-	options := metav1.ListOptions{LabelSelector: selector.String()}
-	podList, err := pc.ListPods(job.Namespace, options)
-	if err != nil {
-		recorder.Eventf(sj, v1.EventTypeWarning, "FailedList", "List job-pods: %v", err)
-		return false
-	}
-	errList := []error{}
-	for _, pod := range podList.Items {
-		klog.V(2).Infof("CronJob controller is deleting Pod %v/%v", pod.Namespace, pod.Name)
-		if err := pc.DeletePod(pod.Namespace, pod.Name); err != nil {
-			// ignores the error when the pod isn't found
-			if !errors.IsNotFound(err) {
-				errList = append(errList, err)
-			}
-		}
-	}
-	if len(errList) != 0 {
-		recorder.Eventf(sj, v1.EventTypeWarning, "FailedDelete", "Deleted job-pods: %v", utilerrors.NewAggregate(errList))
-		return false
-	}
-	// ... the job itself...
+	// delete the job itself...
 	if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
 		recorder.Eventf(sj, v1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
 		klog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
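
Assembled for readability, the post-patch deleteJob reduces to the sketch below. The diff view above is truncated mid-function after the klog.Errorf line, so the tail (the return values and the removal of the job's reference from the active list) is an assumption about the surrounding file rather than content from this page; the sketch also assumes the file's existing imports and its deleteFromActiveList helper.

// deleteJob after this patch: delete only the Job object and record events;
// Pod cleanup is deferred to the server.
func deleteJob(sj *batchv1beta1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
	nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)

	// delete the job itself...
	if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
		recorder.Eventf(sj, v1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
		klog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
		return false // assumed: the diff view cuts off before this line
	}

	// ... and its reference from the active list (assumed tail, not visible
	// in the truncated view above)
	deleteFromActiveList(sj, job.ObjectMeta.UID)
	recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name)

	return true
}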

@@ -17,7 +17,6 @@ limitations under the License.
 package cronjob

 import (
-	"errors"
 	"strconv"
 	"strings"
 	"testing"
@@ -286,10 +285,9 @@ func TestSyncOne_RunOrNot(t *testing.T) {

 		jc := &fakeJobControl{Job: job}
 		sjc := &fakeSJControl{}
-		pc := &fakePodControl{}
 		recorder := record.NewFakeRecorder(10)

-		syncOne(&sj, js, tc.now, jc, sjc, pc, recorder)
+		syncOne(&sj, js, tc.now, jc, sjc, recorder)
 		expectedCreates := 0
 		if tc.expectCreate {
 			expectedCreates = 1
@@ -485,16 +483,6 @@ func TestCleanupFinishedJobs_DeleteOrNot(t *testing.T) {
 				{"2016-05-19T08:00:00Z", F, F, F, F},
 				{"2016-05-19T09:00:00Z", F, F, F, F},
 			}, justBeforeTheHour(), &limitZero, &limitZero, 6},
-
-		"failed list pod err": {
-			[]CleanupJobSpec{
-				{"2016-05-19T04:00:00Z", T, F, F, F},
-				{"2016-05-19T05:00:00Z", T, F, F, F},
-				{"2016-05-19T06:00:00Z", T, T, F, F},
-				{"2016-05-19T07:00:00Z", T, T, F, F},
-				{"2016-05-19T08:00:00Z", T, F, F, F},
-				{"2016-05-19T09:00:00Z", T, F, F, F},
-			}, justBeforeTheHour(), &limitZero, &limitZero, 0},
 	}

 	for name, tc := range testCases {
@@ -563,14 +551,10 @@ func TestCleanupFinishedJobs_DeleteOrNot(t *testing.T) {
 		}

 		jc := &fakeJobControl{Job: job}
-		pc := &fakePodControl{}
 		sjc := &fakeSJControl{}
 		recorder := record.NewFakeRecorder(10)

-		if name == "failed list pod err" {
-			pc.Err = errors.New("fakePodControl err")
-		}
-		cleanupFinishedJobs(&sj, js, jc, sjc, pc, recorder)
+		cleanupFinishedJobs(&sj, js, jc, sjc, recorder)

 		// Check we have actually deleted the correct jobs
 		if len(jc.DeleteJobName) != len(jobsToDelete) {
@@ -728,11 +712,10 @@ func TestSyncOne_Status(t *testing.T) {

 		jc := &fakeJobControl{}
 		sjc := &fakeSJControl{}
-		pc := &fakePodControl{}
 		recorder := record.NewFakeRecorder(10)

 		// Run the code
-		syncOne(&sj, jobs, tc.now, jc, sjc, pc, recorder)
+		syncOne(&sj, jobs, tc.now, jc, sjc, recorder)

 		// Status update happens once when ranging through job list, and another one if create jobs.
 		expectUpdates := 1
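
With fakePodControl gone from these tests, job deletions are observable entirely through the job-control fake. A cut-down sketch of the shape that fake needs is below; the repo's real fakeJobControl has more fields and methods, so this is an illustration of the pattern, not the repo's code.

package cronjob

import (
	batchv1 "k8s.io/api/batch/v1"
)

// fakeJobControl (sketch): records deleted job names so tests can assert on
// them; Err lets a test force the failure path.
type fakeJobControl struct {
	Job           *batchv1.Job
	DeleteJobName []string
	Err           error
}

func (f *fakeJobControl) DeleteJob(namespace, name string) error {
	if f.Err != nil {
		return f.Err
	}
	f.DeleteJobName = append(f.DeleteJobName, name)
	return nil
}

TestCleanupFinishedJobs_DeleteOrNot then only compares jc.DeleteJobName against the expected deletions, which is why the "failed list pod err" case above could be dropped outright rather than rewritten.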