mirror of https://github.com/k3s-io/k3s
commit e07eb387bb
@@ -241,7 +241,7 @@ func (s *CMServer) Run(_ []string) error {
     go daemon.NewDaemonSetsController(kubeClient).
         Run(s.ConcurrentDSCSyncs, util.NeverStop)
 
-    go job.NewJobManager(kubeClient).
+    go job.NewJobController(kubeClient).
         Run(s.ConcurrentJobSyncs, util.NeverStop)
 
     cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
@@ -315,6 +315,21 @@ func ValidateJobSpec(spec *experimental.JobSpec) errs.ValidationErrorList {
 func ValidateJobUpdate(oldJob, job *experimental.Job) errs.ValidationErrorList {
     allErrs := errs.ValidationErrorList{}
     allErrs = append(allErrs, apivalidation.ValidateObjectMetaUpdate(&oldJob.ObjectMeta, &job.ObjectMeta).Prefix("metadata")...)
-    allErrs = append(allErrs, ValidateJobSpec(&job.Spec).Prefix("spec")...)
+    allErrs = append(allErrs, ValidateJobSpecUpdate(oldJob.Spec, job.Spec).Prefix("spec")...)
     return allErrs
 }
+
+func ValidateJobSpecUpdate(oldSpec, spec experimental.JobSpec) errs.ValidationErrorList {
+    allErrs := errs.ValidationErrorList{}
+    allErrs = append(allErrs, ValidateJobSpec(&spec)...)
+    if !api.Semantic.DeepEqual(oldSpec.Completions, spec.Completions) {
+        allErrs = append(allErrs, errs.NewFieldInvalid("completions", spec.Completions, "field is immutable"))
+    }
+    if !api.Semantic.DeepEqual(oldSpec.Selector, spec.Selector) {
+        allErrs = append(allErrs, errs.NewFieldInvalid("selector", spec.Selector, "field is immutable"))
+    }
+    if !api.Semantic.DeepEqual(oldSpec.Template, spec.Template) {
+        allErrs = append(allErrs, errs.NewFieldInvalid("template", "[omitted]", "field is immutable"))
+    }
+    return allErrs
+}
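For context on the pattern the added ValidateJobSpecUpdate follows, here is a minimal, self-contained Go sketch of the same immutability check. The types and helpers below are illustrative stand-ins (reflect.DeepEqual in place of api.Semantic.DeepEqual, plain errors in place of errs.NewFieldInvalid), not the Kubernetes code itself:

// immutability_sketch.go — illustrative only.
package main

import (
    "fmt"
    "reflect"
)

// jobSpec is a toy stand-in for experimental.JobSpec.
type jobSpec struct {
    Completions *int
    Selector    map[string]string
}

// validateSpecUpdate mirrors the shape of ValidateJobSpecUpdate: every field
// that must not change between old and new specs is compared with a
// deep-equality check, and a "field is immutable" error is reported on mismatch.
func validateSpecUpdate(oldSpec, newSpec jobSpec) []error {
    var errList []error
    if !reflect.DeepEqual(oldSpec.Completions, newSpec.Completions) {
        errList = append(errList, fmt.Errorf("completions: field is immutable"))
    }
    if !reflect.DeepEqual(oldSpec.Selector, newSpec.Selector) {
        errList = append(errList, fmt.Errorf("selector: field is immutable"))
    }
    return errList
}

func main() {
    two, three := 2, 3
    oldSpec := jobSpec{Completions: &two, Selector: map[string]string{"app": "demo"}}
    newSpec := jobSpec{Completions: &three, Selector: map[string]string{"app": "demo"}}
    // Prints one error: Completions changed, Selector did not.
    fmt.Println(validateSpecUpdate(oldSpec, newSpec))
}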
@@ -40,7 +40,7 @@ import (
     "k8s.io/kubernetes/pkg/watch"
 )
 
-type JobManager struct {
+type JobController struct {
     kubeClient client.Interface
     podControl controller.PodControlInterface
 
@@ -68,12 +68,12 @@ type JobManager struct {
     queue *workqueue.Type
 }
 
-func NewJobManager(kubeClient client.Interface) *JobManager {
+func NewJobController(kubeClient client.Interface) *JobController {
     eventBroadcaster := record.NewBroadcaster()
     eventBroadcaster.StartLogging(glog.Infof)
     eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
 
-    jm := &JobManager{
+    jm := &JobController{
         kubeClient: kubeClient,
         podControl: controller.RealPodControl{
             KubeClient: kubeClient,
@@ -134,7 +134,7 @@ func NewJobManager(kubeClient client.Interface) *JobManager {
 }
 
 // Run the main goroutine responsible for watching and syncing jobs.
-func (jm *JobManager) Run(workers int, stopCh <-chan struct{}) {
+func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
     defer util.HandleCrash()
     go jm.jobController.Run(stopCh)
     go jm.podController.Run(stopCh)
@@ -147,10 +147,10 @@ func (jm *JobManager) Run(workers int, stopCh <-chan struct{}) {
 }
 
 // getPodJob returns the job managing the given pod.
-func (jm *JobManager) getPodJob(pod *api.Pod) *experimental.Job {
+func (jm *JobController) getPodJob(pod *api.Pod) *experimental.Job {
     jobs, err := jm.jobStore.GetPodJobs(pod)
     if err != nil {
-        glog.V(4).Infof("No jobs found for pod %v, job manager will avoid syncing", pod.Name)
+        glog.V(4).Infof("No jobs found for pod %v, job controller will avoid syncing", pod.Name)
         return nil
     }
     // TODO: add sorting and rethink the overlaping controllers, internally and with RCs
@@ -158,10 +158,10 @@ func (jm *JobManager) getPodJob(pod *api.Pod) *experimental.Job {
 }
 
 // When a pod is created, enqueue the controller that manages it and update it's expectations.
-func (jm *JobManager) addPod(obj interface{}) {
+func (jm *JobController) addPod(obj interface{}) {
     pod := obj.(*api.Pod)
     if pod.DeletionTimestamp != nil {
-        // on a restart of the controller manager, it's possible a new pod shows up in a state that
+        // on a restart of the controller controller, it's possible a new pod shows up in a state that
         // is already pending deletion. Prevent the pod from being a creation observation.
         jm.deletePod(pod)
         return
@@ -180,7 +180,7 @@ func (jm *JobManager) addPod(obj interface{}) {
 // When a pod is updated, figure out what job/s manage it and wake them up.
 // If the labels of the pod have changed we need to awaken both the old
 // and new job. old and cur must be *api.Pod types.
-func (jm *JobManager) updatePod(old, cur interface{}) {
+func (jm *JobController) updatePod(old, cur interface{}) {
     if api.Semantic.DeepEqual(old, cur) {
         // A periodic relist will send update events for all known pods.
         return
@@ -210,7 +210,7 @@ func (jm *JobManager) updatePod(old, cur interface{}) {
 
 // When a pod is deleted, enqueue the job that manages the pod and update its expectations.
 // obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
-func (jm *JobManager) deletePod(obj interface{}) {
+func (jm *JobController) deletePod(obj interface{}) {
     pod, ok := obj.(*api.Pod)
 
     // When a delete is dropped, the relist will notice a pod in the store not
@@ -241,7 +241,7 @@ func (jm *JobManager) deletePod(obj interface{}) {
 }
 
 // obj could be an *experimental.Job, or a DeletionFinalStateUnknown marker item.
-func (jm *JobManager) enqueueController(obj interface{}) {
+func (jm *JobController) enqueueController(obj interface{}) {
     key, err := controller.KeyFunc(obj)
     if err != nil {
         glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
@@ -259,7 +259,7 @@ func (jm *JobManager) enqueueController(obj interface{}) {
 
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
-func (jm *JobManager) worker() {
+func (jm *JobController) worker() {
     for {
         func() {
             key, quit := jm.queue.Get()
@@ -278,7 +278,7 @@ func (jm *JobManager) worker() {
 // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
 // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
 // concurrently with the same key.
-func (jm *JobManager) syncJob(key string) error {
+func (jm *JobController) syncJob(key string) error {
     startTime := time.Now()
     defer func() {
         glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
@@ -322,14 +322,11 @@ func (jm *JobManager) syncJob(key string) error {
 
     activePods := controller.FilterActivePods(podList.Items)
     active := len(activePods)
-    successful, unsuccessful := getStatus(jobKey, job.Spec.Template.Spec.RestartPolicy, podList.Items)
+    successful, unsuccessful := getStatus(podList.Items)
     if jobNeedsSync {
         active = jm.manageJob(activePods, successful, unsuccessful, &job)
     }
     completions := successful
-    if job.Spec.Template.Spec.RestartPolicy == api.RestartPolicyNever {
-        completions += unsuccessful
-    }
     if completions == *job.Spec.Completions {
         job.Status.Conditions = append(job.Status.Conditions, newCondition())
     }
@@ -357,15 +354,13 @@ func newCondition() experimental.JobCondition {
     }
 }
 
-func getStatus(jobKey string, restartPolicy api.RestartPolicy, pods []api.Pod) (successful, unsuccessful int) {
+func getStatus(pods []api.Pod) (successful, unsuccessful int) {
     successful = filterPods(pods, api.PodSucceeded)
-    if restartPolicy == api.RestartPolicyNever {
     unsuccessful = filterPods(pods, api.PodFailed)
-    }
     return
 }
 
-func (jm *JobManager) manageJob(activePods []*api.Pod, successful, unsuccessful int, job *experimental.Job) int {
+func (jm *JobController) manageJob(activePods []*api.Pod, successful, unsuccessful int, job *experimental.Job) int {
     active := len(activePods)
     parallelism := *job.Spec.Parallelism
     jobKey, err := controller.KeyFunc(job)
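To make the effect of the getStatus change concrete, here is a small, self-contained Go sketch of the simplified counting, with toy types standing in for api.Pod and the controller's filterPods helper; it is an illustration, not the Kubernetes code:

// getstatus_sketch.go — illustrative only.
package main

import "fmt"

// podPhase and pod are toy stand-ins for api.PodPhase and api.Pod.
type podPhase string

const (
    podSucceeded podPhase = "Succeeded"
    podFailed    podPhase = "Failed"
    podRunning   podPhase = "Running"
)

type pod struct{ Phase podPhase }

// filterPods counts the pods that are in the given phase.
func filterPods(pods []pod, phase podPhase) int {
    count := 0
    for _, p := range pods {
        if p.Phase == phase {
            count++
        }
    }
    return count
}

// getStatus now tallies succeeded and failed pods unconditionally; the
// RestartPolicyNever special case from the old version is gone.
func getStatus(pods []pod) (successful, unsuccessful int) {
    successful = filterPods(pods, podSucceeded)
    unsuccessful = filterPods(pods, podFailed)
    return
}

func main() {
    pods := []pod{{Phase: podSucceeded}, {Phase: podFailed}, {Phase: podRunning}, {Phase: podSucceeded}}
    successful, unsuccessful := getStatus(pods)
    fmt.Println(successful, unsuccessful) // 2 1
}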
@@ -403,10 +398,6 @@ func (jm *JobManager) manageJob(activePods []*api.Pod, successful, unsuccessful
     } else if active < parallelism {
         // how many executions are left to run
         diff := *job.Spec.Completions - successful
-        // for RestartPolicyNever we need to count unsuccessful pods as well
-        if job.Spec.Template.Spec.RestartPolicy == api.RestartPolicyNever {
-            diff -= unsuccessful
-        }
         // limit to parallelism and count active pods as well
         if diff > parallelism {
             diff = parallelism
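A small, self-contained sketch of the arithmetic that remains after this hunk (a toy function, not the Kubernetes code); the removed lines used to also subtract unsuccessful pods when the restart policy was Never, and what the controller does with the result afterwards lies outside the lines shown here:

// managejob_sketch.go — illustrative only.
package main

import "fmt"

// remainingToStart reproduces the visible arithmetic: the number of
// executions still needed, capped at the job's parallelism.
func remainingToStart(completions, successful, parallelism int) int {
    diff := completions - successful // how many executions are left to run
    if diff > parallelism {          // limit to parallelism
        diff = parallelism
    }
    return diff
}

func main() {
    // Completions=5, 3 pods already succeeded, Parallelism=2 -> start at most 2.
    fmt.Println(remainingToStart(5, 3, 2)) // 2
    // Completions=5, 4 succeeded -> only 1 execution left.
    fmt.Println(remainingToStart(5, 4, 2)) // 1
}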
@@ -436,7 +427,7 @@ func (jm *JobManager) manageJob(activePods []*api.Pod, successful, unsuccessful
     return active
 }
 
-func (jm *JobManager) updateJob(job *experimental.Job) error {
+func (jm *JobController) updateJob(job *experimental.Job) error {
     _, err := jm.kubeClient.Experimental().Jobs(job.Namespace).Update(job)
     return err
 }
@@ -79,7 +79,7 @@ func (f *FakePodControl) clear() {
     f.podSpec = []api.PodTemplateSpec{}
 }
 
-func newJob(parallelism, completions int, restartPolicy api.RestartPolicy) *experimental.Job {
+func newJob(parallelism, completions int) *experimental.Job {
     return &experimental.Job{
         ObjectMeta: api.ObjectMeta{
             Name: "foobar",
@@ -96,7 +96,6 @@ func newJob(parallelism, completions int, restartPolicy api.RestartPolicy) *expe
             },
         },
         Spec: api.PodSpec{
-            RestartPolicy: restartPolicy,
             Containers: []api.Container{
                 {Image: "foo/bar"},
             },
@@ -137,7 +136,6 @@ func TestControllerSyncJob(t *testing.T) {
         // job setup
         parallelism int
         completions int
-        restartPolicy api.RestartPolicy
 
         // pod setup
         podControllerError error
@@ -154,62 +152,52 @@ func TestControllerSyncJob(t *testing.T) {
         expectedComplete bool
     }{
         "job start": {
-            2, 5, api.RestartPolicyOnFailure,
+            2, 5,
             nil, 0, 0, 0,
             2, 0, 2, 0, 0, false,
         },
         "correct # of pods": {
-            2, 5, api.RestartPolicyOnFailure,
+            2, 5,
             nil, 2, 0, 0,
             0, 0, 2, 0, 0, false,
         },
         "too few active pods": {
-            2, 5, api.RestartPolicyOnFailure,
+            2, 5,
             nil, 1, 1, 0,
             1, 0, 2, 1, 0, false,
         },
         "too few active pods, with controller error": {
-            2, 5, api.RestartPolicyOnFailure,
+            2, 5,
             fmt.Errorf("Fake error"), 1, 1, 0,
             0, 0, 1, 1, 0, false,
         },
         "too many active pods": {
-            2, 5, api.RestartPolicyOnFailure,
+            2, 5,
             nil, 3, 0, 0,
             0, 1, 2, 0, 0, false,
         },
         "too many active pods, with controller error": {
-            2, 5, api.RestartPolicyOnFailure,
+            2, 5,
             fmt.Errorf("Fake error"), 3, 0, 0,
             0, 0, 3, 0, 0, false,
         },
-        "failed pod and OnFailure restart policy": {
-            2, 5, api.RestartPolicyOnFailure,
-            nil, 1, 1, 1,
-            1, 0, 2, 1, 0, false,
-        },
-        "failed pod and Never restart policy": {
-            2, 5, api.RestartPolicyNever,
+        "failed pod": {
+            2, 5,
             nil, 1, 1, 1,
             1, 0, 2, 1, 1, false,
         },
-        "job finish and OnFailure restart policy": {
-            2, 5, api.RestartPolicyOnFailure,
+        "job finish": {
+            2, 5,
             nil, 0, 5, 0,
             0, 0, 0, 5, 0, true,
         },
-        "job finish and Never restart policy": {
-            2, 5, api.RestartPolicyNever,
-            nil, 0, 2, 3,
-            0, 0, 0, 2, 3, true,
-        },
         "more active pods than completions": {
-            2, 5, api.RestartPolicyOnFailure,
+            2, 5,
             nil, 10, 0, 0,
             0, 8, 2, 0, 0, false,
         },
         "status change": {
-            2, 5, api.RestartPolicyOnFailure,
+            2, 5,
             nil, 2, 2, 0,
             0, 0, 2, 2, 0, false,
         },
@@ -218,7 +206,7 @@ func TestControllerSyncJob(t *testing.T) {
     for name, tc := range testCases {
         // job manager setup
        client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-        manager := NewJobManager(client)
+        manager := NewJobController(client)
         fakePodControl := FakePodControl{err: tc.podControllerError}
         manager.podControl = &fakePodControl
         manager.podStoreSynced = alwaysReady
@@ -229,7 +217,7 @@ func TestControllerSyncJob(t *testing.T) {
         }
 
         // job & pods setup
-        job := newJob(tc.parallelism, tc.completions, tc.restartPolicy)
+        job := newJob(tc.parallelism, tc.completions)
         manager.jobStore.Store.Add(job)
         for _, pod := range newPodList(tc.activePods, api.PodRunning, job) {
             manager.podStore.Store.Add(&pod)
@@ -282,12 +270,12 @@ func TestControllerSyncJob(t *testing.T) {
 
 func TestSyncJobDeleted(t *testing.T) {
     client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-    manager := NewJobManager(client)
+    manager := NewJobController(client)
     fakePodControl := FakePodControl{}
     manager.podControl = &fakePodControl
     manager.podStoreSynced = alwaysReady
     manager.updateHandler = func(job *experimental.Job) error { return nil }
-    job := newJob(2, 2, api.RestartPolicyOnFailure)
+    job := newJob(2, 2)
     err := manager.syncJob(getKey(job, t))
     if err != nil {
         t.Errorf("Unexpected error when syncing jobs %v", err)
@@ -302,12 +290,12 @@ func TestSyncJobDeleted(t *testing.T) {
 
 func TestSyncJobUpdateRequeue(t *testing.T) {
     client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-    manager := NewJobManager(client)
+    manager := NewJobController(client)
     fakePodControl := FakePodControl{}
     manager.podControl = &fakePodControl
     manager.podStoreSynced = alwaysReady
     manager.updateHandler = func(job *experimental.Job) error { return fmt.Errorf("Fake error") }
-    job := newJob(2, 2, api.RestartPolicyOnFailure)
+    job := newJob(2, 2)
     manager.jobStore.Store.Add(job)
     err := manager.syncJob(getKey(job, t))
     if err != nil {
@@ -332,7 +320,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 
 func TestJobPodLookup(t *testing.T) {
     client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-    manager := NewJobManager(client)
+    manager := NewJobController(client)
     manager.podStoreSynced = alwaysReady
     testCases := []struct {
         job *experimental.Job
@@ -412,13 +400,13 @@ func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool {
 // and checking expectations.
 func TestSyncJobExpectations(t *testing.T) {
     client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-    manager := NewJobManager(client)
+    manager := NewJobController(client)
     fakePodControl := FakePodControl{}
     manager.podControl = &fakePodControl
     manager.podStoreSynced = alwaysReady
     manager.updateHandler = func(job *experimental.Job) error { return nil }
 
-    job := newJob(2, 2, api.RestartPolicyOnFailure)
+    job := newJob(2, 2)
     manager.jobStore.Store.Add(job)
     pods := newPodList(2, api.PodPending, job)
     manager.podStore.Store.Add(&pods[0])
@@ -449,7 +437,7 @@ func TestWatchJobs(t *testing.T) {
     fakeWatch := watch.NewFake()
     client := &testclient.Fake{}
     client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
-    manager := NewJobManager(client)
+    manager := NewJobController(client)
     manager.podStoreSynced = alwaysReady
 
     var testJob experimental.Job
@@ -512,11 +500,11 @@ func TestWatchPods(t *testing.T) {
     fakeWatch := watch.NewFake()
     client := &testclient.Fake{}
     client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
-    manager := NewJobManager(client)
+    manager := NewJobController(client)
     manager.podStoreSynced = alwaysReady
 
     // Put one job and one pod into the store
-    testJob := newJob(2, 2, api.RestartPolicyOnFailure)
+    testJob := newJob(2, 2)
     manager.jobStore.Store.Add(testJob)
     received := make(chan string)
     // The pod update sent through the fakeWatcher should figure out the managing job and
@@ -89,14 +89,14 @@ func TestCreate(t *testing.T) {
 func TestUpdate(t *testing.T) {
     storage, fakeClient := newStorage(t)
     test := registrytest.New(t, fakeClient, storage.Etcd)
-    completions := 2
+    two := 2
     test.TestUpdate(
         // valid
         validNewJob(),
         // updateFunc
         func(obj runtime.Object) runtime.Object {
             object := obj.(*experimental.Job)
-            object.Spec.Completions = &completions
+            object.Spec.Parallelism = &two
             return object
         },
         // invalid updateFunc
@@ -105,6 +105,11 @@ func TestUpdate(t *testing.T) {
             object.Spec.Selector = map[string]string{}
             return object
         },
+        func(obj runtime.Object) runtime.Object {
+            object := obj.(*experimental.Job)
+            object.Spec.Completions = &two
+            return object
+        },
     )
 }
 