/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job

import (
	"fmt"
	"reflect"
	"sort"
	"sync"
	"time"

	batch "k8s.io/api/batch/v1"
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	batchinformers "k8s.io/client-go/informers/batch/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	batchv1listers "k8s.io/client-go/listers/batch/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/util/metrics"

	"github.com/golang/glog"
)

// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batch.SchemeGroupVersion.WithKind("Job")

type JobController struct {
	kubeClient clientset.Interface
	podControl controller.PodControlInterface

	// To allow injection of updateJobStatus for testing.
	updateHandler func(job *batch.Job) error
	syncHandler   func(jobKey string) error

	// podStoreSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podStoreSynced cache.InformerSynced
	// jobStoreSynced returns true if the job store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	jobStoreSynced cache.InformerSynced

	// A TTLCache of pod creates/deletes each job expects to see
	expectations controller.ControllerExpectationsInterface

	// A store of jobs
	jobLister batchv1listers.JobLister

	// A store of pods, populated by the podController
	podStore corelisters.PodLister

	// Jobs that need to be updated
	queue workqueue.RateLimitingInterface

	recorder record.EventRecorder
}
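
// NewJobController returns a new JobController that watches the given pod and
// job informers and uses kubeClient to create, delete, and update objects.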
func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	// TODO: remove the wrapper when every client has moved to use the clientset.
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})

	if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.Core().RESTClient().GetRateLimiter())
	}

	jm := &JobController{
		kubeClient: kubeClient,
		podControl: controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
		},
		expectations: controller.NewControllerExpectations(),
		queue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "job"),
		recorder:     eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
	}

	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: jm.enqueueController,
		UpdateFunc: func(old, cur interface{}) {
			if job := cur.(*batch.Job); !IsJobFinished(job) {
				jm.enqueueController(job)
			}
		},
		DeleteFunc: jm.enqueueController,
	})
	jm.jobLister = jobInformer.Lister()
	jm.jobStoreSynced = jobInformer.Informer().HasSynced

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    jm.addPod,
		UpdateFunc: jm.updatePod,
		DeleteFunc: jm.deletePod,
	})
	jm.podStore = podInformer.Lister()
	jm.podStoreSynced = podInformer.Informer().HasSynced

	jm.updateHandler = jm.updateJobStatus
	jm.syncHandler = jm.syncJob

	return jm
}

// Run the main goroutine responsible for watching and syncing jobs.
func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer jm.queue.ShutDown()

	glog.Infof("Starting job controller")
	defer glog.Infof("Shutting down job controller")

	if !controller.WaitForCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(jm.worker, time.Second, stopCh)
	}

	<-stopCh
}

// getPodJobs returns a list of Jobs that potentially match a Pod.
func (jm *JobController) getPodJobs(pod *v1.Pod) []*batch.Job {
	jobs, err := jm.jobLister.GetPodJobs(pod)
	if err != nil {
		return nil
	}
	if len(jobs) > 1 {
		// ControllerRef will ensure we don't do anything crazy, but more than one
		// item in this list nevertheless constitutes user error.
		utilruntime.HandleError(fmt.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels))
	}
	ret := make([]*batch.Job, 0, len(jobs))
	for i := range jobs {
		ret = append(ret, &jobs[i])
	}
	return ret
}

// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
func (jm *JobController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batch.Job {
	// We can't look up by UID, so look up by Name and then verify UID.
	// Don't even try to look up by Name if it's the wrong Kind.
	if controllerRef.Kind != controllerKind.Kind {
		return nil
	}
	job, err := jm.jobLister.Jobs(namespace).Get(controllerRef.Name)
	if err != nil {
		return nil
	}
	if job.UID != controllerRef.UID {
		// The controller we found with this Name is not the same one that the
		// ControllerRef points to.
		return nil
	}
	return job
}

// When a pod is created, enqueue the controller that manages it and update its expectations.
func (jm *JobController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)
	if pod.DeletionTimestamp != nil {
		// on a restart of the controller manager, it's possible a new pod shows up in a state that
		// is already pending deletion. Prevent the pod from being a creation observation.
		jm.deletePod(pod)
		return
	}

	// If it has a ControllerRef, that's all that matters.
	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
		job := jm.resolveControllerRef(pod.Namespace, controllerRef)
		if job == nil {
			return
		}
		jobKey, err := controller.KeyFunc(job)
		if err != nil {
			return
		}
		jm.expectations.CreationObserved(jobKey)
		jm.enqueueController(job)
		return
	}

	// Otherwise, it's an orphan. Get a list of all matching controllers and sync
	// them to see if anyone wants to adopt it.
	// DO NOT observe creation because no controller should be waiting for an
	// orphan.
	for _, job := range jm.getPodJobs(pod) {
		jm.enqueueController(job)
	}
}

// When a pod is updated, figure out what job/s manage it and wake them up.
// If the labels of the pod have changed we need to awaken both the old
// and new job. old and cur must be *v1.Pod types.
func (jm *JobController) updatePod(old, cur interface{}) {
	curPod := cur.(*v1.Pod)
	oldPod := old.(*v1.Pod)
	if curPod.ResourceVersion == oldPod.ResourceVersion {
		// Periodic resync will send update events for all known pods.
		// Two different versions of the same pod will always have different RVs.
		return
	}
	if curPod.DeletionTimestamp != nil {
		// when a pod is deleted gracefully its deletion timestamp is first modified to reflect a grace period,
		// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
		// for modification of the deletion timestamp and expect a job to create more pods asap, not wait
		// until the kubelet actually deletes the pod.
		jm.deletePod(curPod)
		return
	}

	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)

	curControllerRef := metav1.GetControllerOf(curPod)
	oldControllerRef := metav1.GetControllerOf(oldPod)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		// The ControllerRef was changed. Sync the old controller, if any.
		if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil {
			jm.enqueueController(job)
		}
	}

	// If it has a ControllerRef, that's all that matters.
	if curControllerRef != nil {
		job := jm.resolveControllerRef(curPod.Namespace, curControllerRef)
		if job == nil {
			return
		}
		jm.enqueueController(job)
		return
	}

	// Otherwise, it's an orphan. If anything changed, sync matching controllers
	// to see if anyone wants to adopt it now.
	if labelChanged || controllerRefChanged {
		for _, job := range jm.getPodJobs(curPod) {
			jm.enqueueController(job)
		}
	}
}

// When a pod is deleted, enqueue the job that manages the pod and update its expectations.
// obj could be a *v1.Pod, or a DeletedFinalStateUnknown marker item.
func (jm *JobController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)

	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the pod
	// changed labels the new job will not be woken up till the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))
			return
		}
	}

	controllerRef := metav1.GetControllerOf(pod)
	if controllerRef == nil {
		// No controller should care about orphans being deleted.
		return
	}
	job := jm.resolveControllerRef(pod.Namespace, controllerRef)
	if job == nil {
		return
	}
	jobKey, err := controller.KeyFunc(job)
	if err != nil {
		return
	}
	jm.expectations.DeletionObserved(jobKey)
	jm.enqueueController(job)
}

// obj could be a *batch.Job, or a DeletedFinalStateUnknown marker item.
func (jm *JobController) enqueueController(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
		return
	}

	// TODO: Handle overlapping controllers better. Either disallow them at admission time or
	// deterministically avoid syncing controllers that fight over pods. Currently, we only
	// ensure that the same controller is synced for a given pod. When we periodically relist
	// all controllers there will still be some replica instability. One way to handle this is
	// by querying the store for all controllers that this job overlaps, as well as all
	// controllers that overlap this job, and sorting them.
	jm.queue.Add(key)
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (jm *JobController) worker() {
	for jm.processNextWorkItem() {
	}
}
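
// processNextWorkItem pops one job key off the queue and runs syncHandler on it.
// On success the key's rate-limit history is forgotten; on error the key is
// re-added with rate limiting. It returns false only when the queue has been
// shut down.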
func (jm *JobController) processNextWorkItem() bool {
	key, quit := jm.queue.Get()
	if quit {
		return false
	}
	defer jm.queue.Done(key)

	err := jm.syncHandler(key.(string))
	if err == nil {
		jm.queue.Forget(key)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))
	jm.queue.AddRateLimited(key)
	return true
}

// getPodsForJob returns the set of pods that this Job should manage.
// It also reconciles ControllerRef by adopting/orphaning.
// Note that the returned Pods are pointers into the cache.
func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
	selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector)
	if err != nil {
		return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
	}
	// List all pods to include those that don't match the selector anymore
	// but have a ControllerRef pointing to this controller.
	pods, err := jm.podStore.Pods(j.Namespace).List(labels.Everything())
	if err != nil {
		return nil, err
	}
	// If any adoptions are attempted, we should first recheck for deletion
	// with an uncached quorum read sometime after listing Pods (see #42639).
	canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
		fresh, err := jm.kubeClient.BatchV1().Jobs(j.Namespace).Get(j.Name, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		if fresh.UID != j.UID {
			return nil, fmt.Errorf("original Job %v/%v is gone: got uid %v, wanted %v", j.Namespace, j.Name, fresh.UID, j.UID)
		}
		return fresh, nil
	})
	cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc)
	return cm.ClaimPods(pods)
}

// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (jm *JobController) syncJob(key string) error {
	startTime := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
	}()

	ns, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	if len(ns) == 0 || len(name) == 0 {
		return fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
	}
	sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			glog.V(4).Infof("Job has been deleted: %v", key)
			jm.expectations.DeleteExpectations(key)
			return nil
		}
		return err
	}
	job := *sharedJob

	// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
	// and update the expectations after we've retrieved active pods from the store. If a new pod enters
	// the store after we've checked the expectation, the job sync is just deferred till the next relist.
	jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
	pods, err := jm.getPodsForJob(&job)
	if err != nil {
		return err
	}

	activePods := controller.FilterActivePods(pods)
	active := int32(len(activePods))
	succeeded, failed := getStatus(pods)
	conditions := len(job.Status.Conditions)
	if job.Status.StartTime == nil {
		now := metav1.Now()
		job.Status.StartTime = &now
	}

	// if job was finished previously, we don't want to redo the termination
	if IsJobFinished(&job) {
		return nil
	}

	var manageJobErr error
	if pastActiveDeadline(&job) {
		// TODO: below code should be replaced with pod termination resulting in
		// pod failures, rather than killing pods. Unfortunately no such solution
		// exists at the moment. There's an open discussion on the topic in
		// https://github.com/kubernetes/kubernetes/issues/14602 which might give
		// some sort of solution to the above problem.
		// kill remaining active pods
		wait := sync.WaitGroup{}
		errCh := make(chan error, int(active))
		wait.Add(int(active))
		for i := int32(0); i < active; i++ {
			go func(ix int32) {
				defer wait.Done()
				if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, &job); err != nil {
					defer utilruntime.HandleError(err)
					glog.V(2).Infof("Failed to delete %v, job %q/%q deadline exceeded", activePods[ix].Name, job.Namespace, job.Name)
					errCh <- err
				}
			}(i)
		}
		wait.Wait()
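		// Record only the first deletion error, if any; each failure has
		// already been reported via utilruntime.HandleError above.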
		select {
		case manageJobErr = <-errCh:
			if manageJobErr != nil {
				break
			}
		default:
		}

		// update status values accordingly
		failed += active
		active = 0
		job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
		jm.recorder.Event(&job, v1.EventTypeNormal, "DeadlineExceeded", "Job was active longer than specified deadline")
	} else {
		if jobNeedsSync && job.DeletionTimestamp == nil {
			active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
		}
		completions := succeeded
		complete := false
		if job.Spec.Completions == nil {
			// This type of job is complete when any pod exits with success.
			// Each pod is capable of determining whether or not the entire Job
			// is done. Subsequent pods are not expected to fail, but if they do,
			// the failure is ignored. Once any pod succeeds, the controller waits
			// for remaining pods to finish, and then the job is complete.
			if succeeded > 0 && active == 0 {
				complete = true
			}
		} else {
			// Job specifies a number of completions. This type of job signals
			// success by having that number of successes. Since we do not
			// start more pods than there are remaining completions, there should
			// not be any remaining active pods once this count is reached.
			if completions >= *job.Spec.Completions {
				complete = true
				if active > 0 {
					jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")
				}
				if completions > *job.Spec.Completions {
					jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
				}
			}
		}
		if complete {
			job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
			now := metav1.Now()
			job.Status.CompletionTime = &now
		}
	}

	// no need to update the job if the status hasn't changed since last time
	if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
		job.Status.Active = active
		job.Status.Succeeded = succeeded
		job.Status.Failed = failed

		if err := jm.updateHandler(&job); err != nil {
			return err
		}
	}

	return manageJobErr
}

// pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
func pastActiveDeadline(job *batch.Job) bool {
	if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
		return false
	}
	now := metav1.Now()
	start := job.Status.StartTime.Time
	duration := now.Time.Sub(start)
	allowedDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
	return duration >= allowedDuration
}
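
// newCondition creates a JobCondition of the given type with Status set to
// ConditionTrue and the current time as both probe and transition time.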
func newCondition(conditionType batch.JobConditionType, reason, message string) batch.JobCondition {
	return batch.JobCondition{
		Type:               conditionType,
		Status:             v1.ConditionTrue,
		LastProbeTime:      metav1.Now(),
		LastTransitionTime: metav1.Now(),
		Reason:             reason,
		Message:            message,
	}
}

// getStatus returns the number of succeeded and failed pods running a job
func getStatus(pods []*v1.Pod) (succeeded, failed int32) {
	succeeded = int32(filterPods(pods, v1.PodSucceeded))
	failed = int32(filterPods(pods, v1.PodFailed))
	return
}

// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Does NOT modify <activePods>.
func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
	var activeLock sync.Mutex
	active := int32(len(activePods))
	parallelism := *job.Spec.Parallelism
	jobKey, err := controller.KeyFunc(job)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
		return 0, nil
	}
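
	// errCh collects failures from the create/delete goroutines below; the
	// select at the end of this function surfaces at most one of them.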
	var errCh chan error
	if active > parallelism {
		diff := active - parallelism
		errCh = make(chan error, diff)
		jm.expectations.ExpectDeletions(jobKey, int(diff))
		glog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
		// Sort the pods in the order such that not-ready < ready, unscheduled
		// < scheduled, and pending < running. This ensures that we delete pods
		// in the earlier stages whenever possible.
		sort.Sort(controller.ActivePods(activePods))

		active -= diff
		wait := sync.WaitGroup{}
		wait.Add(int(diff))
		for i := int32(0); i < diff; i++ {
			go func(ix int32) {
				defer wait.Done()
				if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
					defer utilruntime.HandleError(err)
					// Decrement the expected number of deletes because the informer won't observe this deletion
					glog.V(2).Infof("Failed to delete %v, decrementing expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name)
					jm.expectations.DeletionObserved(jobKey)
					activeLock.Lock()
					active++
					activeLock.Unlock()
					errCh <- err
				}
			}(i)
		}
		wait.Wait()
	} else if active < parallelism {
		wantActive := int32(0)
		if job.Spec.Completions == nil {
			// Job does not specify a number of completions. Therefore, number active
			// should be equal to parallelism, unless the job has seen at least
			// one success, in which case leave whatever is running, running.
			if succeeded > 0 {
				wantActive = active
			} else {
				wantActive = parallelism
			}
		} else {
			// Job specifies a specific number of completions. Therefore, number
			// active should not ever exceed number of remaining completions.
			wantActive = *job.Spec.Completions - succeeded
			if wantActive > parallelism {
				wantActive = parallelism
			}
		}
		diff := wantActive - active
		if diff < 0 {
			utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
			diff = 0
		}
		jm.expectations.ExpectCreations(jobKey, int(diff))
		errCh = make(chan error, diff)
		glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)

		active += diff
		wait := sync.WaitGroup{}
		wait.Add(int(diff))
		for i := int32(0); i < diff; i++ {
			go func() {
				defer wait.Done()
				err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
				if err != nil && errors.IsTimeout(err) {
					// Pod is created but its initialization has timed out.
					// If the initialization is successful eventually, the
					// controller will observe the creation via the informer.
					// If the initialization fails, or if the pod keeps
					// uninitialized for a long time, the informer will not
					// receive any update, and the controller will create a new
					// pod when the expectation expires.
					return
				}
				if err != nil {
					defer utilruntime.HandleError(err)
					// Decrement the expected number of creates because the informer won't observe this pod
					glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
					jm.expectations.CreationObserved(jobKey)
					activeLock.Lock()
					active--
					activeLock.Unlock()
					errCh <- err
				}
			}()
		}
		wait.Wait()
	}

	select {
	case err := <-errCh:
		// All errors have been reported before; we only need to inform the controller
		// that there was an error and it should retry this job once more next time.
		if err != nil {
			return active, err
		}
	default:
	}

	return active, nil
}
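
// updateJobStatus writes the given Job's status back to the API server using
// the batch client's UpdateStatus call.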
func (jm *JobController) updateJobStatus(job *batch.Job) error {
	_, err := jm.kubeClient.Batch().Jobs(job.Namespace).UpdateStatus(job)
	return err
}

// filterPods returns the number of pods in the given phase.
func filterPods(pods []*v1.Pod, phase v1.PodPhase) int {
	result := 0
	for i := range pods {
		if phase == pods[i].Status.Phase {
			result++
		}
	}
	return result
}

// byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
type byCreationTimestamp []batch.Job

func (o byCreationTimestamp) Len() int      { return len(o) }
func (o byCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }

func (o byCreationTimestamp) Less(i, j int) bool {
	if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
		return o[i].Name < o[j].Name
	}
	return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
}