From 9d1838fb647b2d9fcf35f1f20bf2dd9c4b35239f Mon Sep 17 00:00:00 2001
From: Mike Danese
Date: Thu, 17 Sep 2015 15:58:04 -0700
Subject: [PATCH 1/3] only allow updates of parallelism in jobspec

---
 pkg/apis/experimental/validation/validation.go | 17 ++++++++++++++++-
 pkg/registry/job/etcd/etcd_test.go             |  9 +++++++--
 2 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/pkg/apis/experimental/validation/validation.go b/pkg/apis/experimental/validation/validation.go
index 00651eb639..c85b7ebdd4 100644
--- a/pkg/apis/experimental/validation/validation.go
+++ b/pkg/apis/experimental/validation/validation.go
@@ -315,6 +315,21 @@ func ValidateJobSpec(spec *experimental.JobSpec) errs.ValidationErrorList {
 func ValidateJobUpdate(oldJob, job *experimental.Job) errs.ValidationErrorList {
 	allErrs := errs.ValidationErrorList{}
 	allErrs = append(allErrs, apivalidation.ValidateObjectMetaUpdate(&oldJob.ObjectMeta, &job.ObjectMeta).Prefix("metadata")...)
-	allErrs = append(allErrs, ValidateJobSpec(&job.Spec).Prefix("spec")...)
+	allErrs = append(allErrs, ValidateJobSpecUpdate(oldJob.Spec, job.Spec).Prefix("spec")...)
+	return allErrs
+}
+
+func ValidateJobSpecUpdate(oldSpec, spec experimental.JobSpec) errs.ValidationErrorList {
+	allErrs := errs.ValidationErrorList{}
+	allErrs = append(allErrs, ValidateJobSpec(&spec)...)
+	if !api.Semantic.DeepEqual(oldSpec.Completions, spec.Completions) {
+		allErrs = append(allErrs, errs.NewFieldInvalid("completions", spec.Completions, "field is immutable"))
+	}
+	if !api.Semantic.DeepEqual(oldSpec.Selector, spec.Selector) {
+		allErrs = append(allErrs, errs.NewFieldInvalid("selector", spec.Selector, "field is immutable"))
+	}
+	if !api.Semantic.DeepEqual(oldSpec.Template, spec.Template) {
+		allErrs = append(allErrs, errs.NewFieldInvalid("template", "[omitted]", "field is immutable"))
+	}
 	return allErrs
 }
diff --git a/pkg/registry/job/etcd/etcd_test.go b/pkg/registry/job/etcd/etcd_test.go
index 2f04c1fa6b..8f3587902c 100644
--- a/pkg/registry/job/etcd/etcd_test.go
+++ b/pkg/registry/job/etcd/etcd_test.go
@@ -89,14 +89,14 @@ func TestCreate(t *testing.T) {
 func TestUpdate(t *testing.T) {
 	storage, fakeClient := newStorage(t)
 	test := registrytest.New(t, fakeClient, storage.Etcd)
-	completions := 2
+	two := 2
 	test.TestUpdate(
 		// valid
 		validNewJob(),
 		// updateFunc
 		func(obj runtime.Object) runtime.Object {
 			object := obj.(*experimental.Job)
-			object.Spec.Completions = &completions
+			object.Spec.Parallelism = &two
 			return object
 		},
 		// invalid updateFunc
@@ -105,6 +105,11 @@ func TestUpdate(t *testing.T) {
 			object := obj.(*experimental.Job)
 			object.Spec.Selector = map[string]string{}
 			return object
 		},
+		func(obj runtime.Object) runtime.Object {
+			object := obj.(*experimental.Job)
+			object.Spec.Completions = &two
+			return object
+		},
 	)
 }

From 22072af90dec352b7c69115397291f25baa8e296 Mon Sep 17 00:00:00 2001
From: Mike Danese
Date: Thu, 17 Sep 2015 19:16:04 -0700
Subject: [PATCH 2/3] rename jobmanager to jobcontroller

---
 .../app/controllermanager.go                  |  2 +-
 .../job/{job_controller.go => controller.go}  | 30 +++++++++----------
 ..._controller_test.go => controller_test.go} | 14 ++++-----
 3 files changed, 23 insertions(+), 23 deletions(-)
 rename pkg/controller/job/{job_controller.go => controller.go} (93%)
 rename pkg/controller/job/{job_controller_test.go => controller_test.go} (98%)

diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 59458cb0e3..77c6fadfc6 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -241,7 +241,7 @@ func (s *CMServer) Run(_ []string) error {
 	go daemon.NewDaemonSetsController(kubeClient).
 		Run(s.ConcurrentDSCSyncs, util.NeverStop)
 
-	go job.NewJobManager(kubeClient).
+	go job.NewJobController(kubeClient).
 		Run(s.ConcurrentJobSyncs, util.NeverStop)
 
 	cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/controller.go
similarity index 93%
rename from pkg/controller/job/job_controller.go
rename to pkg/controller/job/controller.go
index 1126af2668..0ec8023f9e 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/controller.go
@@ -40,7 +40,7 @@ import (
 	"k8s.io/kubernetes/pkg/watch"
 )
 
-type JobManager struct {
+type JobController struct {
 	kubeClient client.Interface
 	podControl controller.PodControlInterface
 
@@ -68,12 +68,12 @@ type JobManager struct {
 	queue *workqueue.Type
 }
 
-func NewJobManager(kubeClient client.Interface) *JobManager {
+func NewJobController(kubeClient client.Interface) *JobController {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
 	eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
 
-	jm := &JobManager{
+	jm := &JobController{
 		kubeClient: kubeClient,
 		podControl: controller.RealPodControl{
 			KubeClient: kubeClient,
@@ -134,7 +134,7 @@ func NewJobManager(kubeClient client.Interface) *JobManager {
 }
 
 // Run the main goroutine responsible for watching and syncing jobs.
-func (jm *JobManager) Run(workers int, stopCh <-chan struct{}) {
+func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
 	defer util.HandleCrash()
 	go jm.jobController.Run(stopCh)
 	go jm.podController.Run(stopCh)
@@ -147,10 +147,10 @@ func (jm *JobManager) Run(workers int, stopCh <-chan struct{}) {
 }
 
 // getPodJob returns the job managing the given pod.
-func (jm *JobManager) getPodJob(pod *api.Pod) *experimental.Job {
+func (jm *JobController) getPodJob(pod *api.Pod) *experimental.Job {
 	jobs, err := jm.jobStore.GetPodJobs(pod)
 	if err != nil {
-		glog.V(4).Infof("No jobs found for pod %v, job manager will avoid syncing", pod.Name)
+		glog.V(4).Infof("No jobs found for pod %v, job controller will avoid syncing", pod.Name)
 		return nil
 	}
 	// TODO: add sorting and rethink the overlapping controllers, internally and with RCs
@@ -158,10 +158,10 @@ func (jm *JobManager) getPodJob(pod *api.Pod) *experimental.Job {
 }
 
 // When a pod is created, enqueue the controller that manages it and update its expectations.
-func (jm *JobManager) addPod(obj interface{}) {
+func (jm *JobController) addPod(obj interface{}) {
 	pod := obj.(*api.Pod)
 	if pod.DeletionTimestamp != nil {
-		// on a restart of the controller manager, it's possible a new pod shows up in a state that
+		// on a restart of the controller controller, it's possible a new pod shows up in a state that
 		// is already pending deletion. Prevent the pod from being a creation observation.
 		jm.deletePod(pod)
 		return
@@ -180,7 +180,7 @@ func (jm *JobManager) addPod(obj interface{}) {
 // When a pod is updated, figure out what job/s manage it and wake them up.
 // If the labels of the pod have changed we need to awaken both the old
 // and new job. old and cur must be *api.Pod types.
-func (jm *JobManager) updatePod(old, cur interface{}) {
+func (jm *JobController) updatePod(old, cur interface{}) {
 	if api.Semantic.DeepEqual(old, cur) {
 		// A periodic relist will send update events for all known pods.
 		return
@@ -210,7 +210,7 @@ func (jm *JobManager) updatePod(old, cur interface{}) {
 
 // When a pod is deleted, enqueue the job that manages the pod and update its expectations.
 // obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
-func (jm *JobManager) deletePod(obj interface{}) {
+func (jm *JobController) deletePod(obj interface{}) {
 	pod, ok := obj.(*api.Pod)
 
 	// When a delete is dropped, the relist will notice a pod in the store not
@@ -241,7 +241,7 @@ func (jm *JobManager) deletePod(obj interface{}) {
 }
 
 // obj could be an *experimental.Job, or a DeletionFinalStateUnknown marker item.
-func (jm *JobManager) enqueueController(obj interface{}) {
+func (jm *JobController) enqueueController(obj interface{}) {
 	key, err := controller.KeyFunc(obj)
 	if err != nil {
 		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
@@ -259,7 +259,7 @@ func (jm *JobManager) enqueueController(obj interface{}) {
 
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
-func (jm *JobManager) worker() {
+func (jm *JobController) worker() {
 	for {
 		func() {
 			key, quit := jm.queue.Get()
@@ -278,7 +278,7 @@ func (jm *JobManager) worker() {
 // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
 // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
 // concurrently with the same key.
-func (jm *JobManager) syncJob(key string) error {
+func (jm *JobController) syncJob(key string) error {
 	startTime := time.Now()
 	defer func() {
 		glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
@@ -365,7 +365,7 @@ func getStatus(jobKey string, restartPolicy api.RestartPolicy, pods []api.Pod) (
 	return
 }
 
-func (jm *JobManager) manageJob(activePods []*api.Pod, successful, unsuccessful int, job *experimental.Job) int {
+func (jm *JobController) manageJob(activePods []*api.Pod, successful, unsuccessful int, job *experimental.Job) int {
 	active := len(activePods)
 	parallelism := *job.Spec.Parallelism
 	jobKey, err := controller.KeyFunc(job)
@@ -436,7 +436,7 @@ func (jm *JobManager) manageJob(activePods []*api.Pod, successful, unsuccessful
 	return active
 }
 
-func (jm *JobManager) updateJob(job *experimental.Job) error {
+func (jm *JobController) updateJob(job *experimental.Job) error {
 	_, err := jm.kubeClient.Experimental().Jobs(job.Namespace).Update(job)
 	return err
 }
diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/controller_test.go
similarity index 98%
rename from pkg/controller/job/job_controller_test.go
rename to pkg/controller/job/controller_test.go
index 1a608df657..d6bcda20dc 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/controller_test.go
@@ -218,7 +218,7 @@ func TestControllerSyncJob(t *testing.T) {
 	for name, tc := range testCases {
 		// job manager setup
 		client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-		manager := NewJobManager(client)
+		manager := NewJobController(client)
 		fakePodControl := FakePodControl{err: tc.podControllerError}
 		manager.podControl = &fakePodControl
 		manager.podStoreSynced = alwaysReady
@@ -282,7 +282,7 @@ func TestControllerSyncJob(t *testing.T) {
 
 func TestSyncJobDeleted(t *testing.T) {
 	client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-	manager := NewJobManager(client)
+	manager := NewJobController(client)
 	fakePodControl := FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -302,7 +302,7 @@ func TestSyncJobDeleted(t *testing.T) {
 
 func TestSyncJobUpdateRequeue(t *testing.T) {
 	client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-	manager := NewJobManager(client)
+	manager := NewJobController(client)
 	fakePodControl := FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -332,7 +332,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 
 func TestJobPodLookup(t *testing.T) {
 	client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-	manager := NewJobManager(client)
+	manager := NewJobController(client)
 	manager.podStoreSynced = alwaysReady
 	testCases := []struct {
 		job *experimental.Job
@@ -412,7 +412,7 @@ func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool {
 // and checking expectations.
 func TestSyncJobExpectations(t *testing.T) {
 	client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-	manager := NewJobManager(client)
+	manager := NewJobController(client)
 	fakePodControl := FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -449,7 +449,7 @@ func TestWatchJobs(t *testing.T) {
 	fakeWatch := watch.NewFake()
 	client := &testclient.Fake{}
 	client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
-	manager := NewJobManager(client)
+	manager := NewJobController(client)
 	manager.podStoreSynced = alwaysReady
 
 	var testJob experimental.Job
@@ -512,7 +512,7 @@ func TestWatchPods(t *testing.T) {
 	fakeWatch := watch.NewFake()
 	client := &testclient.Fake{}
 	client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
-	manager := NewJobManager(client)
+	manager := NewJobController(client)
 	manager.podStoreSynced = alwaysReady
 
 	// Put one job and one pod into the store

From e29e606792c69d6aa03f31aee7b5669799dead19 Mon Sep 17 00:00:00 2001
From: Mike Danese
Date: Thu, 17 Sep 2015 20:13:00 -0700
Subject: [PATCH 3/3] make JobController RestartPolicy independent

---
 pkg/controller/job/controller.go      | 15 ++------
 pkg/controller/job/controller_test.go | 52 +++++++++++----------
 2 files changed, 23 insertions(+), 44 deletions(-)

diff --git a/pkg/controller/job/controller.go b/pkg/controller/job/controller.go
index 0ec8023f9e..17e6536743 100644
--- a/pkg/controller/job/controller.go
+++ b/pkg/controller/job/controller.go
@@ -322,14 +322,11 @@ func (jm *JobController) syncJob(key string) error {
 
 	activePods := controller.FilterActivePods(podList.Items)
 	active := len(activePods)
-	successful, unsuccessful := getStatus(jobKey, job.Spec.Template.Spec.RestartPolicy, podList.Items)
+	successful, unsuccessful := getStatus(podList.Items)
 	if jobNeedsSync {
 		active = jm.manageJob(activePods, successful, unsuccessful, &job)
 	}
 	completions := successful
-	if job.Spec.Template.Spec.RestartPolicy == api.RestartPolicyNever {
-		completions += unsuccessful
-	}
 	if completions == *job.Spec.Completions {
 		job.Status.Conditions = append(job.Status.Conditions, newCondition())
 	}
@@ -357,11 +354,9 @@ func newCondition() experimental.JobCondition {
 	}
 }
 
-func getStatus(jobKey string, restartPolicy api.RestartPolicy, pods []api.Pod) (successful, unsuccessful int) {
+func getStatus(pods []api.Pod) (successful, unsuccessful int) {
 	successful = filterPods(pods, api.PodSucceeded)
-	if restartPolicy == api.RestartPolicyNever {
-		unsuccessful = filterPods(pods, api.PodFailed)
-	}
+	unsuccessful = filterPods(pods, api.PodFailed)
 	return
 }
 
@@ -403,10 +398,6 @@ func (jm *JobController) manageJob(activePods []*api.Pod, successful, unsuccessf
 	} else if active < parallelism {
 		// how many executions are left to run
 		diff := *job.Spec.Completions - successful
-		// for RestartPolicyNever we need to count unsuccessful pods as well
-		if job.Spec.Template.Spec.RestartPolicy == api.RestartPolicyNever {
-			diff -= unsuccessful
-		}
 		// limit to parallelism and count active pods as well
 		if diff > parallelism {
 			diff = parallelism
diff --git a/pkg/controller/job/controller_test.go b/pkg/controller/job/controller_test.go
index d6bcda20dc..5a5d36595a 100644
--- a/pkg/controller/job/controller_test.go
+++ b/pkg/controller/job/controller_test.go
@@ -79,7 +79,7 @@ func (f *FakePodControl) clear() {
 	f.podSpec = []api.PodTemplateSpec{}
 }
 
-func newJob(parallelism, completions int, restartPolicy api.RestartPolicy) *experimental.Job {
+func newJob(parallelism, completions int) *experimental.Job {
 	return &experimental.Job{
 		ObjectMeta: api.ObjectMeta{
 			Name: "foobar",
@@ -96,7 +96,6 @@ func newJob(parallelism, completions int, restartPolicy api.RestartPolicy) *expe
 				},
 			},
 			Spec: api.PodSpec{
-				RestartPolicy: restartPolicy,
 				Containers: []api.Container{
 					{Image: "foo/bar"},
 				},
@@ -135,9 +134,8 @@ func newPodList(count int, status api.PodPhase, job *experimental.Job) []api.Pod
 
 func TestControllerSyncJob(t *testing.T) {
 	testCases := map[string]struct {
 		// job setup
-		parallelism   int
-		completions   int
-		restartPolicy api.RestartPolicy
+		parallelism int
+		completions int
 		// pod setup
 		podControllerError error
 		activePods         int
 		succeededPods      int
 		failedPods         int
 		// expectations
 		expectedCreations int
 		expectedDeletions int
 		expectedActive    int
 		expectedSucceeded int
 		expectedFailed    int
 		expectedComplete  bool
 	}{
 		"job start": {
-			2, 5, api.RestartPolicyOnFailure,
+			2, 5,
 			nil, 0, 0, 0,
 			2, 0, 2, 0, 0, false,
 		},
 		"correct # of pods": {
-			2, 5, api.RestartPolicyOnFailure,
+			2, 5,
 			nil, 2, 0, 0,
 			0, 0, 2, 0, 0, false,
 		},
 		"too few active pods": {
-			2, 5, api.RestartPolicyOnFailure,
+			2, 5,
 			nil, 1, 1, 0,
 			1, 0, 2, 1, 0, false,
 		},
 		"too few active pods, with controller error": {
-			2, 5, api.RestartPolicyOnFailure,
+			2, 5,
 			fmt.Errorf("Fake error"), 1, 1, 0,
 			0, 0, 1, 1, 0, false,
 		},
 		"too many active pods": {
-			2, 5, api.RestartPolicyOnFailure,
+			2, 5,
 			nil, 3, 0, 0,
 			0, 1, 2, 0, 0, false,
 		},
 		"too many active pods, with controller error": {
-			2, 5, api.RestartPolicyOnFailure,
+			2, 5,
 			fmt.Errorf("Fake error"), 3, 0, 0,
 			0, 0, 3, 0, 0, false,
 		},
-		"failed pod and OnFailure restart policy": {
-			2, 5, api.RestartPolicyOnFailure,
-			nil, 1, 1, 1,
-			1, 0, 2, 1, 0, false,
-		},
-		"failed pod and Never restart policy": {
-			2, 5, api.RestartPolicyNever,
+		"failed pod": {
+			2, 5,
 			nil, 1, 1, 1,
 			1, 0, 2, 1, 1, false,
 		},
-		"job finish and OnFailure restart policy": {
-			2, 5, api.RestartPolicyOnFailure,
+		"job finish": {
+			2, 5,
 			nil, 0, 5, 0,
 			0, 0, 0, 5, 0, true,
 		},
-		"job finish and Never restart policy": {
-			2, 5, api.RestartPolicyNever,
-			nil, 0, 2, 3,
-			0, 0, 0, 2, 3, true,
-		},
 		"more active pods than completions": {
-			2, 5, api.RestartPolicyOnFailure,
+			2, 5,
 			nil, 10, 0, 0,
 			0, 8, 2, 0, 0, false,
 		},
 		"status change": {
-			2, 5, api.RestartPolicyOnFailure,
+			2, 5,
 			nil, 2, 2, 0,
 			0, 0, 2, 2, 0, false,
 		},
@@ -229,7 +217,7 @@ func TestControllerSyncJob(t *testing.T) {
 		}
 		// job & pods setup
-		job := newJob(tc.parallelism, tc.completions, tc.restartPolicy)
+		job := newJob(tc.parallelism, tc.completions)
 		manager.jobStore.Store.Add(job)
 		for _, pod := range newPodList(tc.activePods, api.PodRunning, job) {
 			manager.podStore.Store.Add(&pod)
 		}
@@ -287,7 +275,7 @@ func TestSyncJobDeleted(t *testing.T) {
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
 	manager.updateHandler = func(job *experimental.Job) error { return nil }
-	job := newJob(2, 2, api.RestartPolicyOnFailure)
+	job := newJob(2, 2)
 	err := manager.syncJob(getKey(job, t))
 	if err != nil {
 		t.Errorf("Unexpected error when syncing jobs %v", err)
@@ -307,7 +295,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
 	manager.updateHandler = func(job *experimental.Job) error { return fmt.Errorf("Fake error") }
-	job := newJob(2, 2, api.RestartPolicyOnFailure)
+	job := newJob(2, 2)
 	manager.jobStore.Store.Add(job)
 	err := manager.syncJob(getKey(job, t))
 	if err != nil {
@@ -418,7 +406,7 @@ func TestSyncJobExpectations(t *testing.T) {
 	manager.podStoreSynced = alwaysReady
 	manager.updateHandler = func(job *experimental.Job) error { return nil }
 
-	job := newJob(2, 2, api.RestartPolicyOnFailure)
+	job := newJob(2, 2)
 	manager.jobStore.Store.Add(job)
 	pods := newPodList(2, api.PodPending, job)
 	manager.podStore.Store.Add(&pods[0])
@@ -516,7 +504,7 @@ func TestWatchPods(t *testing.T) {
 	manager.podStoreSynced = alwaysReady
 
 	// Put one job and one pod into the store
-	testJob := newJob(2, 2, api.RestartPolicyOnFailure)
+	testJob := newJob(2, 2)
 	manager.jobStore.Store.Add(testJob)
 	received := make(chan string)
 	// The pod update sent through the fakeWatcher should figure out the managing job and