mirror of https://github.com/k3s-io/k3s
Merge pull request #76228 from draveness/feature/refactor-framework-job-util
refactor: cleanup e2e framework job utilsk3s-v1.15.3
@ -61,6 +61,7 @@ go_library(
@ -34,6 +34,7 @@ import (
batchinternal "k8s.io/kubernetes/pkg/apis/batch"
jobutil "k8s.io/kubernetes/test/e2e/framework/job"
imageutils "k8s.io/kubernetes/test/utils/image"
@ -210,7 +211,7 @@ var _ = SIGDescribe("CronJob", func() {
framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name))
By("Ensuring job was deleted")
_, err = framework.GetJob(f.ClientSet, f.Namespace.Name, job.Name)
_, err = jobutil.GetJob(f.ClientSet, f.Namespace.Name, job.Name)
@ -25,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
batchinternal "k8s.io/kubernetes/pkg/apis/batch"
jobutil "k8s.io/kubernetes/test/e2e/framework/job"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@ -39,12 +40,12 @@ var _ = SIGDescribe("Job", func() {
// Simplest case: all pods succeed promptly
It("should run a job to completion when tasks succeed", func() {
By("Creating a job")
job := framework.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
job := jobutil.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
job, err := jobutil.CreateJob(f.ClientSet, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred(), "failed to create job in namespace: %s", f.Namespace.Name)
By("Ensuring job reaches completions")
err = framework.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, completions)
err = jobutil.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, completions)
Expect(err).NotTo(HaveOccurred(), "failed to ensure job completion in namespace: %s", f.Namespace.Name)
@ -58,12 +59,12 @@ var _ = SIGDescribe("Job", func() {
// up to 5 minutes between restarts, making test timeouts
// due to successive failures too likely with a reasonable
// test timeout.
job := framework.NewTestJob("failOnce", "fail-once-local", v1.RestartPolicyOnFailure, parallelism, completions, nil, backoffLimit)
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
job := jobutil.NewTestJob("failOnce", "fail-once-local", v1.RestartPolicyOnFailure, parallelism, completions, nil, backoffLimit)
job, err := jobutil.CreateJob(f.ClientSet, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred(), "failed to create job in namespace: %s", f.Namespace.Name)
By("Ensuring job reaches completions")
err = framework.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, completions)
err = jobutil.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, completions)
Expect(err).NotTo(HaveOccurred(), "failed to ensure job completion in namespace: %s", f.Namespace.Name)
@ -79,61 +80,61 @@ var _ = SIGDescribe("Job", func() {
// With the introduction of backoff limit and high failure rate this
// is hitting its timeout, the 3 is a reasonable that should make this
// test less flaky, for now.
job := framework.NewTestJob("randomlySucceedOrFail", "rand-non-local", v1.RestartPolicyNever, parallelism, 3, nil, 999)
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
job := jobutil.NewTestJob("randomlySucceedOrFail", "rand-non-local", v1.RestartPolicyNever, parallelism, 3, nil, 999)
job, err := jobutil.CreateJob(f.ClientSet, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred(), "failed to create job in namespace: %s", f.Namespace.Name)
By("Ensuring job reaches completions")
err = framework.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, *job.Spec.Completions)
err = jobutil.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, *job.Spec.Completions)
Expect(err).NotTo(HaveOccurred(), "failed to ensure job completion in namespace: %s", f.Namespace.Name)
It("should exceed active deadline", func() {
By("Creating a job")
var activeDeadlineSeconds int64 = 1
job := framework.NewTestJob("notTerminate", "exceed-active-deadline", v1.RestartPolicyNever, parallelism, completions, &activeDeadlineSeconds, backoffLimit)
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
job := jobutil.NewTestJob("notTerminate", "exceed-active-deadline", v1.RestartPolicyNever, parallelism, completions, &activeDeadlineSeconds, backoffLimit)
job, err := jobutil.CreateJob(f.ClientSet, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred(), "failed to create job in namespace: %s", f.Namespace.Name)
By("Ensuring job past active deadline")
err = framework.WaitForJobFailure(f.ClientSet, f.Namespace.Name, job.Name, time.Duration(activeDeadlineSeconds+10)*time.Second, "DeadlineExceeded")
err = jobutil.WaitForJobFailure(f.ClientSet, f.Namespace.Name, job.Name, time.Duration(activeDeadlineSeconds+10)*time.Second, "DeadlineExceeded")
Expect(err).NotTo(HaveOccurred(), "failed to ensure job past active deadline in namespace: %s", f.Namespace.Name)
It("should delete a job", func() {
By("Creating a job")
job := framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
job := jobutil.NewTestJob("notTerminate", "foo", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
job, err := jobutil.CreateJob(f.ClientSet, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred(), "failed to create job in namespace: %s", f.Namespace.Name)
By("Ensuring active pods == parallelism")
err = framework.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
err = jobutil.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
Expect(err).NotTo(HaveOccurred(), "failed to ensure active pods == parallelism in namespace: %s", f.Namespace.Name)
By("delete a job")
framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name))
By("Ensuring job was deleted")
_, err = framework.GetJob(f.ClientSet, f.Namespace.Name, job.Name)
_, err = jobutil.GetJob(f.ClientSet, f.Namespace.Name, job.Name)
Expect(err).To(HaveOccurred(), "failed to ensure job %s was deleted in namespace: %s", job.Name, f.Namespace.Name)
It("should adopt matching orphans and release non-matching pods", func() {
By("Creating a job")
job := framework.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
job := jobutil.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
// Replace job with the one returned from Create() so it has the UID.
// Save Kind since it won't be populated in the returned job.
kind := job.Kind
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
job, err := jobutil.CreateJob(f.ClientSet, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred(), "failed to create job in namespace: %s", f.Namespace.Name)
job.Kind = kind
By("Ensuring active pods == parallelism")
err = framework.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
err = jobutil.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
Expect(err).NotTo(HaveOccurred(), "failed to ensure active pods == parallelism in namespace: %s", f.Namespace.Name)
By("Orphaning one of the Job's Pods")
pods, err := framework.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
pods, err := jobutil.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
Expect(err).NotTo(HaveOccurred(), "failed to get PodList for job %s in namespace: %s", job.Name, f.Namespace.Name)
pod := pods.Items[0]
@ -142,7 +143,7 @@ var _ = SIGDescribe("Job", func() {
By("Checking that the Job readopts the Pod")
Expect(framework.WaitForPodCondition(f.ClientSet, pod.Namespace, pod.Name, "adopted", framework.JobTimeout,
Expect(framework.WaitForPodCondition(f.ClientSet, pod.Namespace, pod.Name, "adopted", jobutil.JobTimeout,
func(pod *v1.Pod) (bool, error) {
controllerRef := metav1.GetControllerOf(pod)
if controllerRef == nil {
@ -161,7 +162,7 @@ var _ = SIGDescribe("Job", func() {
By("Checking that the Job releases the Pod")
Expect(framework.WaitForPodCondition(f.ClientSet, pod.Namespace, pod.Name, "released", framework.JobTimeout,
Expect(framework.WaitForPodCondition(f.ClientSet, pod.Namespace, pod.Name, "released", jobutil.JobTimeout,
func(pod *v1.Pod) (bool, error) {
controllerRef := metav1.GetControllerOf(pod)
if controllerRef != nil {
@ -175,16 +176,16 @@ var _ = SIGDescribe("Job", func() {
It("should exceed backoffLimit", func() {
By("Creating a job")
backoff := 1
job := framework.NewTestJob("fail", "backofflimit", v1.RestartPolicyNever, 1, 1, nil, int32(backoff))
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
job := jobutil.NewTestJob("fail", "backofflimit", v1.RestartPolicyNever, 1, 1, nil, int32(backoff))
job, err := jobutil.CreateJob(f.ClientSet, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred(), "failed to create job in namespace: %s", f.Namespace.Name)
By("Ensuring job exceed backofflimit")
err = framework.WaitForJobFailure(f.ClientSet, f.Namespace.Name, job.Name, framework.JobTimeout, "BackoffLimitExceeded")
err = jobutil.WaitForJobFailure(f.ClientSet, f.Namespace.Name, job.Name, jobutil.JobTimeout, "BackoffLimitExceeded")
Expect(err).NotTo(HaveOccurred(), "failed to ensure job exceed backofflimit in namespace: %s", f.Namespace.Name)
By(fmt.Sprintf("Checking that %d pod created and status is failed", backoff+1))
pods, err := framework.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
pods, err := jobutil.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
Expect(err).NotTo(HaveOccurred(), "failed to get PodList for job %s in namespace: %s", job.Name, f.Namespace.Name)
// Expect(pods.Items).To(HaveLen(backoff + 1))
// due to NumRequeus not being stable enough, especially with failed status
@ -36,6 +36,7 @@ import (
nodepkg "k8s.io/kubernetes/pkg/controller/nodelifecycle"
jobutil "k8s.io/kubernetes/test/e2e/framework/job"
testutils "k8s.io/kubernetes/test/utils"
. "github.com/onsi/ginkgo"
@ -426,11 +427,11 @@ var _ = SIGDescribe("Network Partition [Disruptive] [Slow]", func() {
completions := int32(4)
backoffLimit := int32(6) // default value
job := framework.NewTestJob("notTerminate", "network-partition", v1.RestartPolicyNever,
job := jobutil.NewTestJob("notTerminate", "network-partition", v1.RestartPolicyNever,
parallelism, completions, nil, backoffLimit)
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
job, err := jobutil.CreateJob(f.ClientSet, f.Namespace.Name, job)
label := labels.SelectorFromSet(labels.Set(map[string]string{framework.JobSelectorKey: job.Name}))
label := labels.SelectorFromSet(labels.Set(map[string]string{jobutil.JobSelectorKey: job.Name}))
By(fmt.Sprintf("verifying that there are now %v running pods", parallelism))
_, err = framework.PodsCreatedByLabel(c, ns, job.Name, parallelism, label)
@ -53,6 +53,7 @@ go_library(
@ -21,6 +21,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
jobutil "k8s.io/kubernetes/test/e2e/framework/job"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@ -54,11 +55,11 @@ var _ = SIGDescribe("Metadata Concealment", func() {
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
job, err := jobutil.CreateJob(f.ClientSet, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred(), "failed to create job (%s:%s)", f.Namespace.Name, job.Name)
By("Ensuring job reaches completions")
err = framework.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, int32(1))
err = jobutil.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, int32(1))
Expect(err).NotTo(HaveOccurred(), "failed to ensure job completion (%s:%s)", f.Namespace.Name, job.Name)
@ -14,7 +14,6 @@ go_library(
@ -47,7 +46,6 @@ go_library(
@ -155,6 +153,7 @@ filegroup(
@ -0,0 +1,38 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
name = "go_default_library",
srcs = [
importpath = "k8s.io/kubernetes/test/e2e/framework/job",
visibility = ["//visibility:public"],
deps = [
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
@ -0,0 +1,27 @@
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
package job
import "time"
const (
// JobTimeout is how long to wait for a job to finish.
JobTimeout = 15 * time.Minute
// JobSelectorKey is a job selector name
JobSelectorKey = "job"
@ -0,0 +1,109 @@
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
package job
import (
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
// NewTestJob returns a Job which does one of several testing behaviors. notTerminate starts a Job that will run
// effectively forever. fail starts a Job that will fail immediately. succeed starts a Job that will succeed
// immediately. randomlySucceedOrFail starts a Job that will succeed or fail randomly. failOnce fails the Job the
// first time it is run and succeeds subsequently. name is the Name of the Job. RestartPolicy indicates the restart
// policy of the containers in which the Pod is running. Parallelism is the Job's parallelism, and completions is the
// Job's required number of completions.
func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, completions int32, activeDeadlineSeconds *int64, backoffLimit int32) *batchv1.Job {
manualSelector := false
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: name,
TypeMeta: metav1.TypeMeta{
Kind: "Job",
Spec: batchv1.JobSpec{
ActiveDeadlineSeconds: activeDeadlineSeconds,
Parallelism: ¶llelism,
Completions: &completions,
BackoffLimit: &backoffLimit,
ManualSelector: &manualSelector,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{JobSelectorKey: name},
Spec: v1.PodSpec{
RestartPolicy: rPol,
Volumes: []v1.Volume{
Name: "data",
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{},
Containers: []v1.Container{
Name: "c",
Image: framework.BusyBoxImage,
Command: []string{},
VolumeMounts: []v1.VolumeMount{
MountPath: "/data",
Name: "data",
switch behavior {
case "notTerminate":
job.Spec.Template.Spec.Containers[0].Command = []string{"sleep", "1000000"}
case "fail":
job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "exit 1"}
case "succeed":
job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "exit 0"}
case "randomlySucceedOrFail":
// Bash's $RANDOM generates pseudorandom int in range 0 - 32767.
// Dividing by 16384 gives roughly 50/50 chance of success.
job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "exit $(( $RANDOM / 16384 ))"}
case "failOnce":
// Fail the first the container of the pod is run, and
// succeed the second time. Checks for file on emptydir.
// If present, succeed. If not, create but fail.
// Note that this cannot be used with RestartNever because
// it always fails the first time for a pod.
job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "if [[ -r /data/foo ]] ; then exit 0 ; else touch /data/foo ; exit 1 ; fi"}
return job
// FinishTime returns finish time of the specified job.
func FinishTime(finishedJob *batchv1.Job) metav1.Time {
var finishTime metav1.Time
for _, c := range finishedJob.Status.Conditions {
if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == v1.ConditionTrue {
return c.LastTransitionTime
return finishTime
@ -0,0 +1,82 @@
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
package job
import (
batch "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
// GetJob uses c to get the Job in namespace ns named name. If the returned error is nil, the returned Job is valid.
func GetJob(c clientset.Interface, ns, name string) (*batch.Job, error) {
return c.BatchV1().Jobs(ns).Get(name, metav1.GetOptions{})
// GetJobPods returns a list of Pods belonging to a Job.
func GetJobPods(c clientset.Interface, ns, jobName string) (*v1.PodList, error) {
label := labels.SelectorFromSet(labels.Set(map[string]string{JobSelectorKey: jobName}))
options := metav1.ListOptions{LabelSelector: label.String()}
return c.CoreV1().Pods(ns).List(options)
// CreateJob uses c to create job in namespace ns. If the returned error is nil, the returned Job is valid and has
// been created.
func CreateJob(c clientset.Interface, ns string, job *batch.Job) (*batch.Job, error) {
return c.BatchV1().Jobs(ns).Create(job)
// UpdateJob uses c to updated job in namespace ns. If the returned error is nil, the returned Job is valid and has
// been updated.
func UpdateJob(c clientset.Interface, ns string, job *batch.Job) (*batch.Job, error) {
return c.BatchV1().Jobs(ns).Update(job)
// UpdateJobWithRetries updates job with retries.
func UpdateJobWithRetries(c clientset.Interface, namespace, name string, applyUpdate func(*batch.Job)) (job *batch.Job, err error) {
jobs := c.BatchV1().Jobs(namespace)
var updateErr error
pollErr := wait.PollImmediate(framework.Poll, JobTimeout, func() (bool, error) {
if job, err = jobs.Get(name, metav1.GetOptions{}); err != nil {
return false, err
// Apply the update, then attempt to push it to the apiserver.
if job, err = jobs.Update(job); err == nil {
framework.Logf("Updating job %s", name)
return true, nil
updateErr = err
return false, nil
if pollErr == wait.ErrWaitTimeout {
pollErr = fmt.Errorf("couldn't apply the provided updated to job %q: %v", name, updateErr)
return job, pollErr
// DeleteJob uses c to delete the Job named name in namespace ns. If the returned error is nil, the Job has been
// deleted.
func DeleteJob(c clientset.Interface, ns, name string) error {
return c.BatchV1().Jobs(ns).Delete(name, nil)
@ -0,0 +1,142 @@
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
package job
import (
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
jobutil "k8s.io/kubernetes/pkg/controller/job"
// WaitForAllJobPodsRunning wait for all pods for the Job named JobName in namespace ns to become Running. Only use
// when pods will run for a long time, or it will be racy.
func WaitForAllJobPodsRunning(c clientset.Interface, ns, jobName string, parallelism int32) error {
return wait.Poll(framework.Poll, JobTimeout, func() (bool, error) {
pods, err := GetJobPods(c, ns, jobName)
if err != nil {
return false, err
count := int32(0)
for _, p := range pods.Items {
if p.Status.Phase == v1.PodRunning {
return count == parallelism, nil
// WaitForJobComplete uses c to wait for completions to complete for the Job jobName in namespace ns.
func WaitForJobComplete(c clientset.Interface, ns, jobName string, completions int32) error {
return wait.Poll(framework.Poll, JobTimeout, func() (bool, error) {
curr, err := c.BatchV1().Jobs(ns).Get(jobName, metav1.GetOptions{})
if err != nil {
return false, err
return curr.Status.Succeeded == completions, nil
// WaitForJobFinish uses c to wait for the Job jobName in namespace ns to finish (either Failed or Complete).
func WaitForJobFinish(c clientset.Interface, ns, jobName string) error {
return wait.PollImmediate(framework.Poll, JobTimeout, func() (bool, error) {
curr, err := c.BatchV1().Jobs(ns).Get(jobName, metav1.GetOptions{})
if err != nil {
return false, err
return jobutil.IsJobFinished(curr), nil
// WaitForJobFailure uses c to wait for up to timeout for the Job named jobName in namespace ns to fail.
func WaitForJobFailure(c clientset.Interface, ns, jobName string, timeout time.Duration, reason string) error {
return wait.Poll(framework.Poll, timeout, func() (bool, error) {
curr, err := c.BatchV1().Jobs(ns).Get(jobName, metav1.GetOptions{})
if err != nil {
return false, err
for _, c := range curr.Status.Conditions {
if c.Type == batchv1.JobFailed && c.Status == v1.ConditionTrue {
if reason == "" || reason == c.Reason {
return true, nil
return false, nil
// WaitForJobGone uses c to wait for up to timeout for the Job named jobName in namespace ns to be removed.
func WaitForJobGone(c clientset.Interface, ns, jobName string, timeout time.Duration) error {
return wait.Poll(framework.Poll, timeout, func() (bool, error) {
_, err := c.BatchV1().Jobs(ns).Get(jobName, metav1.GetOptions{})
if errors.IsNotFound(err) {
return true, nil
return false, err
// CheckForAllJobPodsRunning uses c to check in the Job named jobName in ns is running. If the returned error is not
// nil the returned bool is true if the Job is running.
func CheckForAllJobPodsRunning(c clientset.Interface, ns, jobName string, parallelism int32) (bool, error) {
label := labels.SelectorFromSet(labels.Set(map[string]string{JobSelectorKey: jobName}))
options := metav1.ListOptions{LabelSelector: label.String()}
pods, err := c.CoreV1().Pods(ns).List(options)
if err != nil {
return false, err
count := int32(0)
for _, p := range pods.Items {
if p.Status.Phase == v1.PodRunning {
return count == parallelism, nil
// WaitForAllJobPodsGone waits for all pods for the Job named jobName in namespace ns
// to be deleted.
func WaitForAllJobPodsGone(c clientset.Interface, ns, jobName string) error {
return wait.PollImmediate(framework.Poll, JobTimeout, func() (bool, error) {
pods, err := GetJobPods(c, ns, jobName)
if err != nil {
return false, err
return len(pods.Items) == 0, nil
// WaitForJobDeleting uses c to wait for the Job jobName in namespace ns to have
// a non-nil deletionTimestamp (i.e. being deleted).
func WaitForJobDeleting(c clientset.Interface, ns, jobName string) error {
return wait.PollImmediate(framework.Poll, JobTimeout, func() (bool, error) {
curr, err := c.BatchV1().Jobs(ns).Get(jobName, metav1.GetOptions{})
if err != nil {
return false, err
return curr.ObjectMeta.DeletionTimestamp != nil, nil
@ -1,318 +0,0 @@
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
package framework
import (
batch "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
jobutil "k8s.io/kubernetes/pkg/controller/job"
const (
// JobTimeout is how long to wait for a job to finish.
JobTimeout = 15 * time.Minute
// JobSelectorKey is a job selector name
JobSelectorKey = "job"
// NewTestJob returns a Job which does one of several testing behaviors. notTerminate starts a Job that will run
// effectively forever. fail starts a Job that will fail immediately. succeed starts a Job that will succeed
// immediately. randomlySucceedOrFail starts a Job that will succeed or fail randomly. failOnce fails the Job the
// first time it is run and succeeds subsequently. name is the Name of the Job. RestartPolicy indicates the restart
// policy of the containers in which the Pod is running. Parallelism is the Job's parallelism, and completions is the
// Job's required number of completions.
func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, completions int32, activeDeadlineSeconds *int64, backoffLimit int32) *batch.Job {
job := &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: name,
TypeMeta: metav1.TypeMeta{
Kind: "Job",
Spec: batch.JobSpec{
ActiveDeadlineSeconds: activeDeadlineSeconds,
Parallelism: ¶llelism,
Completions: &completions,
BackoffLimit: &backoffLimit,
ManualSelector: newBool(false),
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{JobSelectorKey: name},
Spec: v1.PodSpec{
RestartPolicy: rPol,
Volumes: []v1.Volume{
Name: "data",
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{},
Containers: []v1.Container{
Name: "c",
Image: BusyBoxImage,
Command: []string{},
VolumeMounts: []v1.VolumeMount{
MountPath: "/data",
Name: "data",
switch behavior {
case "notTerminate":
job.Spec.Template.Spec.Containers[0].Command = []string{"sleep", "1000000"}
case "fail":
job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "exit 1"}
case "succeed":
job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "exit 0"}
case "randomlySucceedOrFail":
// Bash's $RANDOM generates pseudorandom int in range 0 - 32767.
// Dividing by 16384 gives roughly 50/50 chance of success.
job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "exit $(( $RANDOM / 16384 ))"}
case "failOnce":
// Fail the first the container of the pod is run, and
// succeed the second time. Checks for file on emptydir.
// If present, succeed. If not, create but fail.
// Note that this cannot be used with RestartNever because
// it always fails the first time for a pod.
job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "if [[ -r /data/foo ]] ; then exit 0 ; else touch /data/foo ; exit 1 ; fi"}
return job
// GetJob uses c to get the Job in namespace ns named name. If the returned error is nil, the returned Job is valid.
func GetJob(c clientset.Interface, ns, name string) (*batch.Job, error) {
return c.BatchV1().Jobs(ns).Get(name, metav1.GetOptions{})
// CreateJob uses c to create job in namespace ns. If the returned error is nil, the returned Job is valid and has
// been created.
func CreateJob(c clientset.Interface, ns string, job *batch.Job) (*batch.Job, error) {
return c.BatchV1().Jobs(ns).Create(job)
// UpdateJob uses c to updated job in namespace ns. If the returned error is nil, the returned Job is valid and has
// been updated.
func UpdateJob(c clientset.Interface, ns string, job *batch.Job) (*batch.Job, error) {
return c.BatchV1().Jobs(ns).Update(job)
// UpdateJobFunc updates the job object. It retries if there is a conflict, throw out error if
// there is any other errors. name is the job name, updateFn is the function updating the
// job object.
func UpdateJobFunc(c clientset.Interface, ns, name string, updateFn func(job *batch.Job)) {
ExpectNoError(wait.Poll(time.Millisecond*500, time.Second*30, func() (bool, error) {
job, err := GetJob(c, ns, name)
if err != nil {
return false, fmt.Errorf("failed to get pod %q: %v", name, err)
_, err = UpdateJob(c, ns, job)
if err == nil {
Logf("Successfully updated job %q", name)
return true, nil
if errors.IsConflict(err) {
Logf("Conflicting update to job %q, re-get and re-update: %v", name, err)
return false, nil
return false, fmt.Errorf("failed to update job %q: %v", name, err)
// DeleteJob uses c to delete the Job named name in namespace ns. If the returned error is nil, the Job has been
// deleted.
func DeleteJob(c clientset.Interface, ns, name string) error {
return c.BatchV1().Jobs(ns).Delete(name, nil)
// GetJobPods returns a list of Pods belonging to a Job.
func GetJobPods(c clientset.Interface, ns, jobName string) (*v1.PodList, error) {
label := labels.SelectorFromSet(labels.Set(map[string]string{JobSelectorKey: jobName}))
options := metav1.ListOptions{LabelSelector: label.String()}
return c.CoreV1().Pods(ns).List(options)
// WaitForAllJobPodsRunning wait for all pods for the Job named JobName in namespace ns to become Running. Only use
// when pods will run for a long time, or it will be racy.
func WaitForAllJobPodsRunning(c clientset.Interface, ns, jobName string, parallelism int32) error {
return wait.Poll(Poll, JobTimeout, func() (bool, error) {
pods, err := GetJobPods(c, ns, jobName)
if err != nil {
return false, err
count := int32(0)
for _, p := range pods.Items {
if p.Status.Phase == v1.PodRunning {
return count == parallelism, nil
// WaitForJobComplete uses c to wait for completions to complete for the Job jobName in namespace ns.
func WaitForJobComplete(c clientset.Interface, ns, jobName string, completions int32) error {
return wait.Poll(Poll, JobTimeout, func() (bool, error) {
curr, err := c.BatchV1().Jobs(ns).Get(jobName, metav1.GetOptions{})
if err != nil {
return false, err
return curr.Status.Succeeded == completions, nil
// WaitForJobFinish uses c to wait for the Job jobName in namespace ns to finish (either Failed or Complete).
func WaitForJobFinish(c clientset.Interface, ns, jobName string) error {
return wait.PollImmediate(Poll, JobTimeout, func() (bool, error) {
curr, err := c.BatchV1().Jobs(ns).Get(jobName, metav1.GetOptions{})
if err != nil {
return false, err
return jobutil.IsJobFinished(curr), nil
// WaitForJobFailure uses c to wait for up to timeout for the Job named jobName in namespace ns to fail.
func WaitForJobFailure(c clientset.Interface, ns, jobName string, timeout time.Duration, reason string) error {
return wait.Poll(Poll, timeout, func() (bool, error) {
curr, err := c.BatchV1().Jobs(ns).Get(jobName, metav1.GetOptions{})
if err != nil {
return false, err
for _, c := range curr.Status.Conditions {
if c.Type == batch.JobFailed && c.Status == v1.ConditionTrue {
if reason == "" || reason == c.Reason {
return true, nil
return false, nil
// WaitForJobGone uses c to wait for up to timeout for the Job named jobName in namespace ns to be removed.
func WaitForJobGone(c clientset.Interface, ns, jobName string, timeout time.Duration) error {
return wait.Poll(Poll, timeout, func() (bool, error) {
_, err := c.BatchV1().Jobs(ns).Get(jobName, metav1.GetOptions{})
if errors.IsNotFound(err) {
return true, nil
return false, err
// CheckForAllJobPodsRunning uses c to check in the Job named jobName in ns is running. If the returned error is not
// nil the returned bool is true if the Job is running.
func CheckForAllJobPodsRunning(c clientset.Interface, ns, jobName string, parallelism int32) (bool, error) {
label := labels.SelectorFromSet(labels.Set(map[string]string{JobSelectorKey: jobName}))
options := metav1.ListOptions{LabelSelector: label.String()}
pods, err := c.CoreV1().Pods(ns).List(options)
if err != nil {
return false, err
count := int32(0)
for _, p := range pods.Items {
if p.Status.Phase == v1.PodRunning {
return count == parallelism, nil
// WaitForAllJobPodsGone waits for all pods for the Job named jobName in namespace ns
// to be deleted.
func WaitForAllJobPodsGone(c clientset.Interface, ns, jobName string) error {
return wait.PollImmediate(Poll, JobTimeout, func() (bool, error) {
pods, err := GetJobPods(c, ns, jobName)
if err != nil {
return false, err
return len(pods.Items) == 0, nil
func newBool(val bool) *bool {
p := new(bool)
*p = val
return p
type updateJobFunc func(*batch.Job)
// UpdateJobWithRetries updates jobs with retries.
func UpdateJobWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateJobFunc) (job *batch.Job, err error) {
jobs := c.BatchV1().Jobs(namespace)
var updateErr error
pollErr := wait.PollImmediate(Poll, JobTimeout, func() (bool, error) {
if job, err = jobs.Get(name, metav1.GetOptions{}); err != nil {
return false, err
// Apply the update, then attempt to push it to the apiserver.
if job, err = jobs.Update(job); err == nil {
Logf("Updating job %s", name)
return true, nil
updateErr = err
return false, nil
if pollErr == wait.ErrWaitTimeout {
pollErr = fmt.Errorf("couldn't apply the provided updated to job %q: %v", name, updateErr)
return job, pollErr
// WaitForJobDeleting uses c to wait for the Job jobName in namespace ns to have
// a non-nil deletionTimestamp (i.e. being deleted).
func WaitForJobDeleting(c clientset.Interface, ns, jobName string) error {
return wait.PollImmediate(Poll, JobTimeout, func() (bool, error) {
curr, err := c.BatchV1().Jobs(ns).Get(jobName, metav1.GetOptions{})
if err != nil {
return false, err
return curr.ObjectMeta.DeletionTimestamp != nil, nil
// JobFinishTime returns finish time of the specified job.
func JobFinishTime(finishedJob *batch.Job) metav1.Time {
var finishTime metav1.Time
for _, c := range finishedJob.Status.Conditions {
if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
return c.LastTransitionTime
return finishTime
@ -31,6 +31,7 @@ go_library(
@ -58,6 +58,7 @@ import (
commonutils "k8s.io/kubernetes/test/e2e/common"
jobutil "k8s.io/kubernetes/test/e2e/framework/job"
testutils "k8s.io/kubernetes/test/utils"
@ -1699,7 +1700,7 @@ metadata:
gomega.Expect(runOutput).To(gomega.ContainSubstring("stdin closed"))
err := framework.WaitForJobGone(c, ns, jobName, wait.ForeverTestTimeout)
err := jobutil.WaitForJobGone(c, ns, jobName, wait.ForeverTestTimeout)
ginkgo.By("verifying the job " + jobName + " was deleted")
@ -36,6 +36,7 @@ go_library(
@ -24,6 +24,7 @@ import (
jobutil "k8s.io/kubernetes/test/e2e/framework/job"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@ -47,11 +48,11 @@ func cleanupJob(f *framework.Framework, job *batch.Job) {
removeFinalizerFunc := func(j *batch.Job) {
j.ObjectMeta.Finalizers = slice.RemoveString(j.ObjectMeta.Finalizers, dummyFinalizer, nil)
_, err := framework.UpdateJobWithRetries(c, ns, job.Name, removeFinalizerFunc)
_, err := jobutil.UpdateJobWithRetries(c, ns, job.Name, removeFinalizerFunc)
framework.WaitForJobGone(c, ns, job.Name, wait.ForeverTestTimeout)
jobutil.WaitForJobGone(c, ns, job.Name, wait.ForeverTestTimeout)
err = framework.WaitForAllJobPodsGone(c, ns, job.Name)
err = jobutil.WaitForAllJobPodsGone(c, ns, job.Name)
@ -64,27 +65,27 @@ func testFinishedJob(f *framework.Framework) {
backoffLimit := int32(2)
ttl := int32(10)
job := framework.NewTestJob("randomlySucceedOrFail", "rand-non-local", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
job := jobutil.NewTestJob("randomlySucceedOrFail", "rand-non-local", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
job.Spec.TTLSecondsAfterFinished = &ttl
job.ObjectMeta.Finalizers = []string{dummyFinalizer}
defer cleanupJob(f, job)
framework.Logf("Create a Job %s/%s with TTL", ns, job.Name)
job, err := framework.CreateJob(c, ns, job)
job, err := jobutil.CreateJob(c, ns, job)
framework.Logf("Wait for the Job to finish")
err = framework.WaitForJobFinish(c, ns, job.Name)
err = jobutil.WaitForJobFinish(c, ns, job.Name)
framework.Logf("Wait for TTL after finished controller to delete the Job")
err = framework.WaitForJobDeleting(c, ns, job.Name)
err = jobutil.WaitForJobDeleting(c, ns, job.Name)
framework.Logf("Check Job's deletionTimestamp and compare with the time when the Job finished")
job, err = framework.GetJob(c, ns, job.Name)
job, err = jobutil.GetJob(c, ns, job.Name)
finishTime := framework.JobFinishTime(job)
finishTime := jobutil.FinishTime(job)
finishTimeUTC := finishTime.UTC()
@ -38,6 +38,7 @@ go_library(
@ -27,6 +27,7 @@ go_library(
@ -20,6 +20,7 @@ import (
batch "k8s.io/api/batch/v1"
jobutil "k8s.io/kubernetes/test/e2e/framework/job"
@ -40,13 +41,13 @@ func (t *JobUpgradeTest) Setup(f *framework.Framework) {
t.namespace = f.Namespace.Name
ginkgo.By("Creating a job")
t.job = framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyOnFailure, 2, 2, nil, 6)
job, err := framework.CreateJob(f.ClientSet, t.namespace, t.job)
t.job = jobutil.NewTestJob("notTerminate", "foo", v1.RestartPolicyOnFailure, 2, 2, nil, 6)
job, err := jobutil.CreateJob(f.ClientSet, t.namespace, t.job)
t.job = job
ginkgo.By("Ensuring active pods == parallelism")
err = framework.WaitForAllJobPodsRunning(f.ClientSet, t.namespace, job.Name, 2)
err = jobutil.WaitForAllJobPodsRunning(f.ClientSet, t.namespace, job.Name, 2)
@ -54,7 +55,7 @@ func (t *JobUpgradeTest) Setup(f *framework.Framework) {
func (t *JobUpgradeTest) Test(f *framework.Framework, done <-chan struct{}, upgrade upgrades.UpgradeType) {
ginkgo.By("Ensuring active pods == parallelism")
running, err := framework.CheckForAllJobPodsRunning(f.ClientSet, t.namespace, t.job.Name, 2)
running, err := jobutil.CheckForAllJobPodsRunning(f.ClientSet, t.namespace, t.job.Name, 2)
@ -24,6 +24,7 @@ import (
jobutil "k8s.io/kubernetes/test/e2e/framework/job"
imageutils "k8s.io/kubernetes/test/utils/image"
@ -54,7 +55,7 @@ func (t *NvidiaGPUUpgradeTest) Test(f *framework.Framework, done <-chan struct{}
if upgrade == MasterUpgrade || upgrade == ClusterUpgrade {
// MasterUpgrade should be totally hitless.
job, err := framework.GetJob(f.ClientSet, f.Namespace.Name, "cuda-add")
job, err := jobutil.GetJob(f.ClientSet, f.Namespace.Name, "cuda-add")
gomega.Expect(job.Status.Failed).To(gomega.BeZero(), "Job pods failed during master upgrade: %v", job.Status.Failed)
@ -69,7 +70,7 @@ func (t *NvidiaGPUUpgradeTest) Teardown(f *framework.Framework) {
func (t *NvidiaGPUUpgradeTest) startJob(f *framework.Framework) {
var activeSeconds int64 = 3600
// Specifies 100 completions to make sure the job life spans across the upgrade.
testJob := framework.NewTestJob("succeed", "cuda-add", v1.RestartPolicyAlways, 1, 100, &activeSeconds, 6)
testJob := jobutil.NewTestJob("succeed", "cuda-add", v1.RestartPolicyAlways, 1, 100, &activeSeconds, 6)
testJob.Spec.Template.Spec = v1.PodSpec{
RestartPolicy: v1.RestartPolicyOnFailure,
Containers: []v1.Container{
@ -86,11 +87,11 @@ func (t *NvidiaGPUUpgradeTest) startJob(f *framework.Framework) {
ns := f.Namespace.Name
_, err := framework.CreateJob(f.ClientSet, ns, testJob)
_, err := jobutil.CreateJob(f.ClientSet, ns, testJob)
framework.Logf("Created job %v", testJob)
ginkgo.By("Waiting for gpu job pod start")
err = framework.WaitForAllJobPodsRunning(f.ClientSet, ns, testJob.Name, 1)
err = jobutil.WaitForAllJobPodsRunning(f.ClientSet, ns, testJob.Name, 1)
ginkgo.By("Done with gpu job pod start")
@ -99,9 +100,9 @@ func (t *NvidiaGPUUpgradeTest) startJob(f *framework.Framework) {
func (t *NvidiaGPUUpgradeTest) verifyJobPodSuccess(f *framework.Framework) {
// Wait for client pod to complete.
ns := f.Namespace.Name
err := framework.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, "cuda-add", 1)
err := jobutil.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, "cuda-add", 1)
pods, err := framework.GetJobPods(f.ClientSet, f.Namespace.Name, "cuda-add")
pods, err := jobutil.GetJobPods(f.ClientSet, f.Namespace.Name, "cuda-add")
createdPod := pods.Items[0].Name
framework.Logf("Created pod %v", createdPod)
Reference in New Issue