Remove Job also from .status.active for Replace strategy

pull/6/head
Maciej Szulik 2016-10-24 13:52:55 +02:00
parent 3c84164bdf
commit dc364b8ebb
3 changed files with 63 additions and 51 deletions

View File

@ -169,7 +169,7 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl
return return
} }
if len(times) > 1 { if len(times) > 1 {
glog.Errorf("Multiple unmet start times for %s so only starting last one", nameForLog) glog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
} }
scheduledTime := times[len(times)-1] scheduledTime := times[len(times)-1]
tooLate := false tooLate := false
@ -177,7 +177,7 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl
tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now) tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
} }
if tooLate { if tooLate {
glog.Errorf("Missed starting window for %s", nameForLog) glog.V(4).Infof("Missed starting window for %s", nameForLog)
// TODO: generate an event for a miss. Use a warning level event because it indicates a // TODO: generate an event for a miss. Use a warning level event because it indicates a
// problem with the controller (restart or long queue), and is not expected by user either. // problem with the controller (restart or long queue), and is not expected by user either.
// Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing // Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
@ -199,14 +199,15 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl
// TODO: for Forbid, we could use the same name for every execution, as a lock. // TODO: for Forbid, we could use the same name for every execution, as a lock.
// With replace, we could use a name that is deterministic per execution time. // With replace, we could use a name that is deterministic per execution time.
// But that would mean that you could not inspect prior successes or failures of Forbid jobs. // But that would mean that you could not inspect prior successes or failures of Forbid jobs.
glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid.", nameForLog) glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
return return
} }
if sj.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent { if sj.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent {
for _, j := range sj.Status.Active { for i := range sj.Status.Active {
j := sj.Status.Active[i]
// TODO: this should be replaced with server side job deletion // TODO: this should be replaced with server side job deletion
// currently this mimics JobReaper from pkg/kubectl/stop.go // currently this mimics JobReaper from pkg/kubectl/stop.go
glog.V(4).Infof("Deleting job %s of %s s that was still running at next scheduled start time", j.Name, nameForLog) glog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)
job, err := jc.GetJob(j.Namespace, j.Name) job, err := jc.GetJob(j.Namespace, j.Name)
if err != nil { if err != nil {
recorder.Eventf(&sj, api.EventTypeWarning, "FailedGet", "Get job: %v", err) recorder.Eventf(&sj, api.EventTypeWarning, "FailedGet", "Get job: %v", err)
@ -242,11 +243,14 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl
recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job-pods: %v", utilerrors.NewAggregate(errList)) recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job-pods: %v", utilerrors.NewAggregate(errList))
return return
} }
// ... and the job itself // ... the job itself...
if err := jc.DeleteJob(job.Namespace, job.Name); err != nil { if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job: %v", err) recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
glog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
return return
} }
// ... and its reference from active list
deleteFromActiveList(&sj, job.ObjectMeta.UID)
recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", j.Name) recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", j.Name)
} }
} }
@ -261,6 +265,7 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl
recorder.Eventf(&sj, api.EventTypeWarning, "FailedCreate", "Error creating job: %v", err) recorder.Eventf(&sj, api.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
return return
} }
glog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name) recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
// ------------------------------------------------------------------ // // ------------------------------------------------------------------ //

View File

@ -79,7 +79,7 @@ func scheduledJob() batch.ScheduledJob {
Name: "myscheduledjob", Name: "myscheduledjob",
Namespace: "snazzycats", Namespace: "snazzycats",
UID: types.UID("1a2b3c"), UID: types.UID("1a2b3c"),
SelfLink: "/apis/batch/v2alpha1/namespaces/snazzycats/jobs/myscheduledjob", SelfLink: "/apis/batch/v2alpha1/namespaces/snazzycats/scheduledjobs/myscheduledjob",
CreationTimestamp: unversioned.Time{Time: justBeforeTheHour()}, CreationTimestamp: unversioned.Time{Time: justBeforeTheHour()},
}, },
Spec: batch.ScheduledJobSpec{ Spec: batch.ScheduledJobSpec{
@ -140,7 +140,6 @@ var (
) )
func TestSyncOne_RunOrNot(t *testing.T) { func TestSyncOne_RunOrNot(t *testing.T) {
testCases := map[string]struct { testCases := map[string]struct {
// sj spec // sj spec
concurrencyPolicy batch.ConcurrencyPolicy concurrencyPolicy batch.ConcurrencyPolicy
@ -158,39 +157,39 @@ func TestSyncOne_RunOrNot(t *testing.T) {
// expectations // expectations
expectCreate bool expectCreate bool
expectDelete bool expectDelete bool
expectActive int
}{ }{
"never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F}, "never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, 0},
"never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F}, "never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, 0},
"never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F}, "never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, 0},
"never ran, is time, A": {A, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F}, "never ran, is time, A": {A, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F, 1},
"never ran, is time, F": {f, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F}, "never ran, is time, F": {f, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F, 1},
"never ran, is time, R": {R, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F}, "never ran, is time, R": {R, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F, 1},
"never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, justAfterTheHour(), F, F}, "never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, justAfterTheHour(), F, F, 0},
"never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, justAfterTheHour(), F, F}, "never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, justAfterTheHour(), F, F, 0},
"never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, justAfterTheHour(), T, F}, "never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, justAfterTheHour(), T, F, 1},
"prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F}, "prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, 0},
"prev ran but done, not time, F": {f, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F}, "prev ran but done, not time, F": {f, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, 0},
"prev ran but done, not time, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F}, "prev ran but done, not time, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, 0},
"prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F}, "prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, 1},
"prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F}, "prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, 1},
"prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F}, "prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, 1},
"prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), F, F}, "prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), F, F, 0},
"prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), F, F}, "prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), F, F, 0},
"prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), T, F}, "prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), T, F, 1},
"still active, not time, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F}, "still active, not time, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, 1},
"still active, not time, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F}, "still active, not time, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, 1},
"still active, not time, R": {R, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F}, "still active, not time, R": {R, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, 1},
"still active, is time, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F}, "still active, is time, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F, 2},
"still active, is time, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F}, "still active, is time, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F, 1},
"still active, is time, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), T, T}, "still active, is time, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), T, T, 1},
"still active, is time, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), F, F}, "still active, is time, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), F, F, 1},
"still active, is time, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), F, F}, "still active, is time, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), F, F, 1},
"still active, is time, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), T, F}, "still active, is time, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), T, F, 2},
} }
for name, tc := range testCases { for name, tc := range testCases {
t.Log("Test case:", name)
sj := scheduledJob() sj := scheduledJob()
sj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy sj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy
sj.Spec.Suspend = &tc.suspend sj.Spec.Suspend = &tc.suspend
@ -209,7 +208,7 @@ func TestSyncOne_RunOrNot(t *testing.T) {
sj.Status.LastScheduleTime = &unversioned.Time{Time: justAfterThePriorHour()} sj.Status.LastScheduleTime = &unversioned.Time{Time: justAfterThePriorHour()}
job, err = getJobFromTemplate(&sj, sj.Status.LastScheduleTime.Time) job, err = getJobFromTemplate(&sj, sj.Status.LastScheduleTime.Time)
if err != nil { if err != nil {
t.Fatalf("Unexpected error creating a job from template: %v", err) t.Fatalf("%s: nexpected error creating a job from template: %v", name, err)
} }
job.UID = "1234" job.UID = "1234"
job.Namespace = "" job.Namespace = ""
@ -220,7 +219,7 @@ func TestSyncOne_RunOrNot(t *testing.T) {
} else { } else {
sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeTheHour()} sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeTheHour()}
if tc.stillActive { if tc.stillActive {
t.Errorf("Test setup error: this case makes no sense.") t.Errorf("%s: test setup error: this case makes no sense", name)
} }
} }
@ -235,7 +234,7 @@ func TestSyncOne_RunOrNot(t *testing.T) {
expectedCreates = 1 expectedCreates = 1
} }
if len(jc.Jobs) != expectedCreates { if len(jc.Jobs) != expectedCreates {
t.Errorf("Expected %d job started, actually %v", expectedCreates, len(jc.Jobs)) t.Errorf("%s: expected %d job started, actually %v", name, expectedCreates, len(jc.Jobs))
} }
expectedDeletes := 0 expectedDeletes := 0
@ -243,18 +242,25 @@ func TestSyncOne_RunOrNot(t *testing.T) {
expectedDeletes = 1 expectedDeletes = 1
} }
if len(jc.DeleteJobName) != expectedDeletes { if len(jc.DeleteJobName) != expectedDeletes {
t.Errorf("Expected %d job deleted, actually %v", expectedDeletes, len(jc.DeleteJobName)) t.Errorf("%s: expected %d job deleted, actually %v", name, expectedDeletes, len(jc.DeleteJobName))
} }
// Status update happens once when ranging through job list, and another one if create jobs.
expectUpdates := 1
expectedEvents := 0 expectedEvents := 0
if tc.expectCreate { if tc.expectCreate {
expectedEvents += 1 expectedEvents++
expectUpdates++
} }
if tc.expectDelete { if tc.expectDelete {
expectedEvents += 1 expectedEvents++
} }
if len(recorder.Events) != expectedEvents { if len(recorder.Events) != expectedEvents {
t.Errorf("Expected %d event, actually %v", expectedEvents, len(recorder.Events)) t.Errorf("%s: expected %d event, actually %v", name, expectedEvents, len(recorder.Events))
}
if tc.expectActive != len(sjc.Updates[expectUpdates-1].Status.Active) {
t.Errorf("%s: expected Active size %d, got %d", name, tc.expectActive, len(sjc.Updates[expectUpdates-1].Status.Active))
} }
} }
} }
@ -331,7 +337,6 @@ func TestSyncOne_Status(t *testing.T) {
} }
for name, tc := range testCases { for name, tc := range testCases {
t.Log("Test case:", name)
// Setup the test // Setup the test
sj := scheduledJob() sj := scheduledJob()
sj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy sj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy
@ -345,7 +350,7 @@ func TestSyncOne_Status(t *testing.T) {
sj.Status.LastScheduleTime = &unversioned.Time{Time: justAfterThePriorHour()} sj.Status.LastScheduleTime = &unversioned.Time{Time: justAfterThePriorHour()}
} else { } else {
if tc.hasFinishedJob || tc.hasUnexpectedJob { if tc.hasFinishedJob || tc.hasUnexpectedJob {
t.Errorf("Test setup error: this case makes no sense.") t.Errorf("%s: test setup error: this case makes no sense", name)
} }
sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeTheHour()} sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeTheHour()}
} }
@ -353,7 +358,7 @@ func TestSyncOne_Status(t *testing.T) {
if tc.hasFinishedJob { if tc.hasFinishedJob {
ref, err := getRef(&finishedJob) ref, err := getRef(&finishedJob)
if err != nil { if err != nil {
t.Errorf("Test setup error: failed to get job's ref: %v.", err) t.Errorf("%s: test setup error: failed to get job's ref: %v.", name, err)
} }
sj.Status.Active = []api.ObjectReference{*ref} sj.Status.Active = []api.ObjectReference{*ref}
jobs = append(jobs, finishedJob) jobs = append(jobs, finishedJob)
@ -389,23 +394,23 @@ func TestSyncOne_Status(t *testing.T) {
} }
if len(recorder.Events) != expectedEvents { if len(recorder.Events) != expectedEvents {
t.Errorf("Expected %d event, actually %v: %#v", expectedEvents, len(recorder.Events), recorder.Events) t.Errorf("%s: expected %d event, actually %v: %#v", name, expectedEvents, len(recorder.Events), recorder.Events)
} }
if expectUpdates != len(sjc.Updates) { if expectUpdates != len(sjc.Updates) {
t.Errorf("expected %d status updates, actually %d", expectUpdates, len(sjc.Updates)) t.Errorf("%s: expected %d status updates, actually %d", name, expectUpdates, len(sjc.Updates))
} }
if tc.hasFinishedJob && inActiveList(sjc.Updates[0], finishedJob.UID) { if tc.hasFinishedJob && inActiveList(sjc.Updates[0], finishedJob.UID) {
t.Errorf("Expected finished job removed from active list, actually active list = %#v.", sjc.Updates[0].Status.Active) t.Errorf("%s: expected finished job removed from active list, actually active list = %#v", name, sjc.Updates[0].Status.Active)
} }
if tc.hasUnexpectedJob && inActiveList(sjc.Updates[0], unexpectedJob.UID) { if tc.hasUnexpectedJob && inActiveList(sjc.Updates[0], unexpectedJob.UID) {
t.Errorf("Expected unexpected job not added to active list, actually active list = %#v.", sjc.Updates[0].Status.Active) t.Errorf("%s: expected unexpected job not added to active list, actually active list = %#v", name, sjc.Updates[0].Status.Active)
} }
if tc.expectCreate && !sjc.Updates[1].Status.LastScheduleTime.Time.Equal(topOfTheHour()) { if tc.expectCreate && !sjc.Updates[1].Status.LastScheduleTime.Time.Equal(topOfTheHour()) {
t.Errorf("Expected LastScheduleTime updated to %s, got %s.", topOfTheHour(), sjc.Updates[1].Status.LastScheduleTime) t.Errorf("%s: expected LastScheduleTime updated to %s, got %s", name, topOfTheHour(), sjc.Updates[1].Status.LastScheduleTime)
} }
} }
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package scheduledjob package scheduledjob
import ( import (
"fmt"
"sync" "sync"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
@ -127,6 +128,7 @@ func (f *fakeJobControl) CreateJob(namespace string, job *batch.Job) (*batch.Job
if f.Err != nil { if f.Err != nil {
return nil, f.Err return nil, f.Err
} }
job.SelfLink = fmt.Sprintf("/api/batch/v1/namespaces/%s/jobs/%s", namespace, job.Name)
f.Jobs = append(f.Jobs, *job) f.Jobs = append(f.Jobs, *job)
job.UID = "test-uid" job.UID = "test-uid"
return job, nil return job, nil