Job controller logic

pull/6/head
Maciej Szulik 2015-08-27 14:19:35 +02:00
parent bdec7da47b
commit 8cefa2ee55
16 changed files with 1268 additions and 67 deletions


@@ -38,6 +38,7 @@ import (
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/controller/deployment"
"k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/pkg/controller/namespace"
"k8s.io/kubernetes/pkg/controller/node"
"k8s.io/kubernetes/pkg/controller/persistentvolume"
@@ -66,6 +67,7 @@ type CMServer struct {
ConcurrentEndpointSyncs int
ConcurrentRCSyncs int
ConcurrentDSCSyncs int
ConcurrentJobSyncs int
ServiceSyncPeriod time.Duration
NodeSyncPeriod time.Duration
ResourceQuotaSyncPeriod time.Duration
@@ -104,6 +106,7 @@ func NewCMServer() *CMServer {
ConcurrentEndpointSyncs: 5,
ConcurrentRCSyncs: 5,
ConcurrentDSCSyncs: 2,
ConcurrentJobSyncs: 5,
ServiceSyncPeriod: 5 * time.Minute,
NodeSyncPeriod: 10 * time.Second,
ResourceQuotaSyncPeriod: 10 * time.Second,
@@ -238,6 +241,9 @@ func (s *CMServer) Run(_ []string) error {
go daemon.NewDaemonSetsController(kubeClient).
Run(s.ConcurrentDSCSyncs, util.NeverStop)
go job.NewJobManager(kubeClient).
Run(s.ConcurrentJobSyncs, util.NeverStop)
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
glog.Fatalf("Cloud provider could not be initialized: %v", err)
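
Not shown in these hunks is the flag registration for the new ConcurrentJobSyncs field. A plausible sketch, following the pattern used for the other Concurrent*Syncs fields (the flag name and help text here are assumptions, not part of this commit):

func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
	// ... existing flag registrations ...
	// Hypothetical wiring, named by analogy with --concurrent-endpoint-syncs.
	fs.IntVar(&s.ConcurrentJobSyncs, "concurrent-job-syncs", s.ConcurrentJobSyncs,
		"The number of job objects that are allowed to sync concurrently.")
}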


@@ -139,6 +139,13 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer {
j.RollingUpdate = &rollingUpdate
}
},
func(j *experimental.JobSpec, c fuzz.Continue) {
c.FuzzNoCustom(j) // fuzz self without calling this function again
completions := c.Rand.Int()
parallelism := c.Rand.Int()
j.Completions = &completions
j.Parallelism = &parallelism
},
func(j *api.List, c fuzz.Continue) {
c.FuzzNoCustom(j) // fuzz self without calling this function again
// TODO: uncomment when round trip starts from a versioned object


@@ -75,5 +75,15 @@ func addDefaultingFuncs() {
*obj.Spec.UniqueLabelKey = "deployment.kubernetes.io/podTemplateHash"
}
},
func(obj *Job) {
if obj.Spec.Completions == nil {
completions := 1
obj.Spec.Completions = &completions
}
if obj.Spec.Parallelism == nil {
parallelism := 2
obj.Spec.Parallelism = &parallelism
}
},
)
}


@@ -189,6 +189,43 @@ func TestSetDefaultDeployment(t *testing.T) {
}
}
func TestSetDefaultJob(t *testing.T) {
expected := &Job{
Spec: JobSpec{
Completions: newInt(1),
Parallelism: newInt(2),
},
}
tests := []*Job{
{},
{
Spec: JobSpec{
Completions: newInt(1),
},
},
{
Spec: JobSpec{
Parallelism: newInt(2),
},
},
}
for _, original := range tests {
obj2 := roundTrip(t, runtime.Object(original))
got, ok := obj2.(*Job)
if !ok {
t.Errorf("unexpected object: %v", got)
t.FailNow()
}
if *got.Spec.Completions != *expected.Spec.Completions {
t.Errorf("got different completions than expected: %d %d", *got.Spec.Completions, *expected.Spec.Completions)
}
if *got.Spec.Parallelism != *expected.Spec.Parallelism {
t.Errorf("got different parallelism than expected: %d %d", *got.Spec.Parallelism, *expected.Spec.Parallelism)
}
}
}
func roundTrip(t *testing.T, obj runtime.Object) runtime.Object {
data, err := v1.Codec.Encode(obj)
if err != nil {


@@ -344,3 +344,55 @@ func (s *StoreToEndpointsLister) GetServiceEndpoints(svc *api.Service) (ep api.E
err = fmt.Errorf("Could not find endpoints for service: %v", svc.Name)
return
}
// StoreToJobLister adds List and Exists methods to a cache Store. The store must contain only Jobs.
type StoreToJobLister struct {
Store
}
// Exists checks if the given job exists in the store.
func (s *StoreToJobLister) Exists(job *experimental.Job) (bool, error) {
_, exists, err := s.Store.Get(job)
if err != nil {
return false, err
}
return exists, nil
}
// List lists all jobs in the store.
func (s *StoreToJobLister) List() (jobs []experimental.Job, err error) {
for _, c := range s.Store.List() {
jobs = append(jobs, *(c.(*experimental.Job)))
}
return jobs, nil
}
// GetPodJobs returns a list of jobs managing a pod. Returns an error only if no matching jobs are found.
func (s *StoreToJobLister) GetPodJobs(pod *api.Pod) (jobs []experimental.Job, err error) {
var selector labels.Selector
var job experimental.Job
if len(pod.Labels) == 0 {
err = fmt.Errorf("No jobs found for pod %v because it has no labels", pod.Name)
return
}
for _, m := range s.Store.List() {
job = *m.(*experimental.Job)
if job.Namespace != pod.Namespace {
continue
}
selector = labels.Set(job.Spec.Selector).AsSelector()
// A job with a nil or empty selector matches nothing.
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
continue
}
jobs = append(jobs, job)
}
if len(jobs) == 0 {
err = fmt.Errorf("Could not find jobs for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}
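
GetPodJobs above matches pods to jobs purely through the job's map-based selector. A minimal, self-contained sketch of that matching semantics, assuming the labels package API used elsewhere in this commit:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/labels"
)

func main() {
	jobSelector := map[string]string{"foo": "bar"}
	podLabels := map[string]string{"foo": "bar", "extra": "ok"}
	// A pod matches when the selector's requirements are a subset of its labels.
	sel := labels.Set(jobSelector).AsSelector()
	fmt.Println(sel.Matches(labels.Set(podLabels))) // true
	// An empty selector would otherwise match everything, which is why
	// GetPodJobs guards against it with selector.Empty().
	fmt.Println(labels.Set(nil).AsSelector().Empty()) // true
}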


@@ -0,0 +1,86 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testclient
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/experimental"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/watch"
)
// FakeJobs implements JobInterface. Meant to be embedded into a struct to get a default
// implementation. This makes faking out just the method you want to test easier.
type FakeJobs struct {
Fake *FakeExperimental
Namespace string
}
func (c *FakeJobs) Get(name string) (*experimental.Job, error) {
obj, err := c.Fake.Invokes(NewGetAction("jobs", c.Namespace, name), &experimental.Job{})
if obj == nil {
return nil, err
}
return obj.(*experimental.Job), err
}
func (c *FakeJobs) List(label labels.Selector, fields fields.Selector) (*experimental.JobList, error) {
obj, err := c.Fake.Invokes(NewListAction("jobs", c.Namespace, label, nil), &experimental.JobList{})
if obj == nil {
return nil, err
}
return obj.(*experimental.JobList), err
}
func (c *FakeJobs) Create(job *experimental.Job) (*experimental.Job, error) {
obj, err := c.Fake.Invokes(NewCreateAction("jobs", c.Namespace, job), job)
if obj == nil {
return nil, err
}
return obj.(*experimental.Job), err
}
func (c *FakeJobs) Update(job *experimental.Job) (*experimental.Job, error) {
obj, err := c.Fake.Invokes(NewUpdateAction("jobs", c.Namespace, job), job)
if obj == nil {
return nil, err
}
return obj.(*experimental.Job), err
}
func (c *FakeJobs) Delete(name string, options *api.DeleteOptions) error {
_, err := c.Fake.Invokes(NewDeleteAction("jobs", c.Namespace, name), &experimental.Job{})
return err
}
func (c *FakeJobs) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
return c.Fake.InvokesWatch(NewWatchAction("jobs", c.Namespace, label, field, resourceVersion))
}
func (c *FakeJobs) UpdateStatus(job *experimental.Job) (result *experimental.Job, err error) {
obj, err := c.Fake.Invokes(NewUpdateSubresourceAction("jobs", "status", c.Namespace, job), job)
if obj == nil {
return nil, err
}
return obj.(*experimental.Job), err
}


@@ -263,5 +263,5 @@ func (c *FakeExperimental) Scales(namespace string) client.ScaleInterface {
}
func (c *FakeExperimental) Jobs(namespace string) client.JobInterface {
panic("unimplemented")
return &FakeJobs{Fake: c, Namespace: namespace}
}


@@ -26,7 +26,6 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/latest"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/apis/experimental"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
@@ -213,10 +212,10 @@ func NewControllerExpectations() *ControllerExpectations {
// PodControlInterface is an interface that knows how to add or delete pods
// created as an interface to allow testing.
type PodControlInterface interface {
// CreateReplica creates new replicated pods according to the spec.
CreateReplica(namespace string, controller *api.ReplicationController) error
// CreateReplicaOnNode creates a new pod according to the spec on the specified node.
CreateReplicaOnNode(namespace string, ds *experimental.DaemonSet, nodeName string) error
// CreatePods creates new pods according to the spec.
CreatePods(namespace string, template *api.PodTemplateSpec, object runtime.Object) error
// CreatePodsOnNode creates a new pod according to the spec on the specified node.
CreatePodsOnNode(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object) error
// DeletePod deletes the pod identified by podID.
DeletePod(namespace string, podID string) error
}
@@ -227,7 +226,7 @@ type RealPodControl struct {
Recorder record.EventRecorder
}
func getReplicaLabelSet(template *api.PodTemplateSpec) labels.Set {
func getPodsLabelSet(template *api.PodTemplateSpec) labels.Set {
desiredLabels := make(labels.Set)
for k, v := range template.Labels {
desiredLabels[k] = v
@@ -235,7 +234,7 @@ func getReplicaLabelSet(template *api.PodTemplateSpec) labels.Set {
return desiredLabels
}
func getReplicaAnnotationSet(template *api.PodTemplateSpec, object runtime.Object) (labels.Set, error) {
func getPodsAnnotationSet(template *api.PodTemplateSpec, object runtime.Object) (labels.Set, error) {
desiredAnnotations := make(labels.Set)
for k, v := range template.Annotations {
desiredAnnotations[k] = v
@@ -254,7 +253,7 @@ func getReplicaAnnotationSet(template *api.PodTemplateSpec, object runtime.Objec
return desiredAnnotations, nil
}
func getReplicaPrefix(controllerName string) string {
func getPodsPrefix(controllerName string) string {
// use the dash (if the name isn't too long) to make the pod name a bit prettier
prefix := fmt.Sprintf("%s-", controllerName)
if ok, _ := validation.ValidatePodName(prefix, true); !ok {
@@ -263,44 +262,25 @@ func getReplicaPrefix(controllerName string) string {
return prefix
}
func (r RealPodControl) CreateReplica(namespace string, controller *api.ReplicationController) error {
desiredLabels := getReplicaLabelSet(controller.Spec.Template)
desiredAnnotations, err := getReplicaAnnotationSet(controller.Spec.Template, controller)
if err != nil {
return err
}
prefix := getReplicaPrefix(controller.Name)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: desiredLabels,
Annotations: desiredAnnotations,
GenerateName: prefix,
},
}
if err := api.Scheme.Convert(&controller.Spec.Template.Spec, &pod.Spec); err != nil {
return fmt.Errorf("unable to convert pod template: %v", err)
}
if labels.Set(pod.Labels).AsSelector().Empty() {
return fmt.Errorf("unable to create pod replica, no labels")
}
if newPod, err := r.KubeClient.Pods(namespace).Create(pod); err != nil {
r.Recorder.Eventf(controller, "FailedCreate", "Error creating: %v", err)
return fmt.Errorf("unable to create pod replica: %v", err)
} else {
glog.V(4).Infof("Controller %v created pod %v", controller.Name, newPod.Name)
r.Recorder.Eventf(controller, "SuccessfulCreate", "Created pod: %v", newPod.Name)
}
return nil
func (r RealPodControl) CreatePods(namespace string, template *api.PodTemplateSpec, object runtime.Object) error {
return r.createPods("", namespace, template, object)
}
func (r RealPodControl) CreateReplicaOnNode(namespace string, ds *experimental.DaemonSet, nodeName string) error {
desiredLabels := getReplicaLabelSet(ds.Spec.Template)
desiredAnnotations, err := getReplicaAnnotationSet(ds.Spec.Template, ds)
func (r RealPodControl) CreatePodsOnNode(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object) error {
return r.createPods(nodeName, namespace, template, object)
}
func (r RealPodControl) createPods(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object) error {
desiredLabels := getPodsLabelSet(template)
desiredAnnotations, err := getPodsAnnotationSet(template, object)
if err != nil {
return err
}
prefix := getReplicaPrefix(ds.Name)
meta, err := api.ObjectMetaFor(object)
if err != nil {
return fmt.Errorf("object does not have ObjectMeta, %v", err)
}
prefix := getPodsPrefix(meta.Name)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
@@ -309,22 +289,22 @@ func (r RealPodControl) CreateReplicaOnNode(namespace string, ds *experimental.D
GenerateName: prefix,
},
}
if err := api.Scheme.Convert(&ds.Spec.Template.Spec, &pod.Spec); err != nil {
if len(nodeName) != 0 {
pod.Spec.NodeName = nodeName
}
if err := api.Scheme.Convert(&template.Spec, &pod.Spec); err != nil {
return fmt.Errorf("unable to convert pod template: %v", err)
}
// if a pod does not have labels then it cannot be controlled by any controller
if labels.Set(pod.Labels).AsSelector().Empty() {
return fmt.Errorf("unable to create pod replica, no labels")
return fmt.Errorf("unable to create pods, no labels")
}
pod.Spec.NodeName = nodeName
if newPod, err := r.KubeClient.Pods(namespace).Create(pod); err != nil {
r.Recorder.Eventf(ds, "FailedCreate", "Error creating: %v", err)
return fmt.Errorf("unable to create pod replica: %v", err)
r.Recorder.Eventf(object, "FailedCreate", "Error creating: %v", err)
return fmt.Errorf("unable to create pods: %v", err)
} else {
glog.V(4).Infof("Controller %v created pod %v", ds.Name, newPod.Name)
r.Recorder.Eventf(ds, "SuccessfulCreate", "Created pod: %v", newPod.Name)
glog.V(4).Infof("Controller %v created pod %v", meta.Name, newPod.Name)
r.Recorder.Eventf(object, "SuccessfulCreate", "Created pod: %v", newPod.Name)
}
return nil
}


@@ -180,7 +180,7 @@ func TestControllerExpectations(t *testing.T) {
}
}
func TestCreateReplica(t *testing.T) {
func TestCreatePods(t *testing.T) {
ns := api.NamespaceDefault
body := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "empty_pod"}})
fakeHandler := util.FakeHandler{
@@ -199,7 +199,7 @@ func TestCreateReplica(t *testing.T) {
controllerSpec := newReplicationController(1)
// Make sure CreatePods sends a POST to the apiserver with a pod from the controller's pod template
podControl.CreateReplica(ns, controllerSpec)
podControl.CreatePods(ns, controllerSpec.Spec.Template, controllerSpec)
expectedPod := api.Pod{
ObjectMeta: api.ObjectMeta{


@@ -371,7 +371,7 @@ func (dsc *DaemonSetsController) manage(ds *experimental.DaemonSet) {
glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v", ds.Name, nodesNeedingDaemonPods)
for i := range nodesNeedingDaemonPods {
if err := dsc.podControl.CreateReplicaOnNode(ds.Namespace, ds, nodesNeedingDaemonPods[i]); err != nil {
if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[i], ds.Namespace, ds.Spec.Template, ds); err != nil {
glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
dsc.expectations.CreationObserved(dsKey)
util.HandleError(err)


@@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext"
)
@@ -38,7 +39,7 @@ var (
)
type FakePodControl struct {
daemonSet []experimental.DaemonSet
podSpec []api.PodTemplateSpec
deletePodName []string
lock sync.Mutex
err error
@@ -48,17 +49,17 @@ func init() {
api.ForTesting_ReferencesAllowBlankSelfLinks = true
}
func (f *FakePodControl) CreateReplica(namespace string, spec *api.ReplicationController) error {
func (f *FakePodControl) CreatePods(namespace string, spec *api.PodTemplateSpec, object runtime.Object) error {
return nil
}
func (f *FakePodControl) CreateReplicaOnNode(namespace string, ds *experimental.DaemonSet, nodeName string) error {
func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, spec *api.PodTemplateSpec, object runtime.Object) error {
f.lock.Lock()
defer f.lock.Unlock()
if f.err != nil {
return f.err
}
f.daemonSet = append(f.daemonSet, *ds)
f.podSpec = append(f.podSpec, *spec)
return nil
}
@@ -75,7 +76,7 @@ func (f *FakePodControl) clear() {
f.lock.Lock()
defer f.lock.Unlock()
f.deletePodName = []string{}
f.daemonSet = []experimental.DaemonSet{}
f.podSpec = []api.PodTemplateSpec{}
}
func newDaemonSet(name string) *experimental.DaemonSet {
@@ -164,8 +165,8 @@ func newTestController() (*DaemonSetsController, *FakePodControl) {
}
func validateSyncDaemonSets(t *testing.T, fakePodControl *FakePodControl, expectedCreates, expectedDeletes int) {
if len(fakePodControl.daemonSet) != expectedCreates {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.daemonSet))
if len(fakePodControl.podSpec) != expectedCreates {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.podSpec))
}
if len(fakePodControl.deletePodName) != expectedDeletes {
t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.deletePodName))

pkg/controller/job/doc.go

@@ -0,0 +1,18 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package job contains logic for watching and synchronizing jobs.
package job


@@ -0,0 +1,452 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"reflect"
"sort"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/experimental"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/replication"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
)
type JobManager struct {
kubeClient client.Interface
podControl controller.PodControlInterface
// To allow injection of updateJob for testing.
updateHandler func(job *experimental.Job) error
syncHandler func(jobKey string) error
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
// A TTLCache of pod creates/deletes each job expects to see
expectations controller.ControllerExpectationsInterface
// A store of jobs, populated by the jobController
jobStore cache.StoreToJobLister
// Watches changes to all jobs
jobController *framework.Controller
// A store of pods, populated by the podController
podStore cache.StoreToPodLister
// Watches changes to all pods
podController *framework.Controller
// Jobs that need to be updated
queue *workqueue.Type
}
func NewJobManager(kubeClient client.Interface) *JobManager {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
jm := &JobManager{
kubeClient: kubeClient,
podControl: controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "job"}),
},
expectations: controller.NewControllerExpectations(),
queue: workqueue.New(),
}
jm.jobStore.Store, jm.jobController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return jm.kubeClient.Experimental().Jobs(api.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(rv string) (watch.Interface, error) {
return jm.kubeClient.Experimental().Jobs(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
},
},
&experimental.Job{},
replicationcontroller.FullControllerResyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: jm.enqueueController,
UpdateFunc: func(old, cur interface{}) {
job := cur.(*experimental.Job)
for _, c := range job.Status.Conditions {
if c.Type == experimental.JobComplete && c.Status == api.ConditionTrue {
return
}
}
jm.enqueueController(cur)
},
DeleteFunc: jm.enqueueController,
},
)
jm.podStore.Store, jm.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return jm.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(rv string) (watch.Interface, error) {
return jm.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
},
},
&api.Pod{},
replicationcontroller.PodRelistPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: jm.addPod,
UpdateFunc: jm.updatePod,
DeleteFunc: jm.deletePod,
},
)
jm.updateHandler = jm.updateJob
jm.syncHandler = jm.syncJob
jm.podStoreSynced = jm.podController.HasSynced
return jm
}
// Run the main goroutine responsible for watching and syncing jobs.
func (jm *JobManager) Run(workers int, stopCh <-chan struct{}) {
defer util.HandleCrash()
go jm.jobController.Run(stopCh)
go jm.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go util.Until(jm.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down Job Manager")
jm.queue.ShutDown()
}
// getPodJob returns the job managing the given pod.
func (jm *JobManager) getPodJob(pod *api.Pod) *experimental.Job {
jobs, err := jm.jobStore.GetPodJobs(pod)
if err != nil {
glog.V(4).Infof("No jobs found for pod %v, job manager will avoid syncing", pod.Name)
return nil
}
// TODO: add sorting and rethink the overlapping controllers, internally and with RCs
return &jobs[0]
}
// When a pod is created, enqueue the controller that manages it and update its expectations.
func (jm *JobManager) addPod(obj interface{}) {
pod := obj.(*api.Pod)
if pod.DeletionTimestamp != nil {
// on a restart of the controller manager, it's possible a new pod shows up in a state that
// is already pending deletion. Prevent the pod from being a creation observation.
jm.deletePod(pod)
return
}
if job := jm.getPodJob(pod); job != nil {
jobKey, err := controller.KeyFunc(job)
if err != nil {
glog.Errorf("Couldn't get key for job %#v: %v", job, err)
return
}
jm.expectations.CreationObserved(jobKey)
jm.enqueueController(job)
}
}
// When a pod is updated, figure out which jobs manage it and wake them up.
// If the labels of the pod have changed we need to awaken both the old
// and new job. old and cur must be *api.Pod types.
func (jm *JobManager) updatePod(old, cur interface{}) {
if api.Semantic.DeepEqual(old, cur) {
// A periodic relist will send update events for all known pods.
return
}
curPod := cur.(*api.Pod)
if curPod.DeletionTimestamp != nil {
// when a pod is deleted gracefully its deletion timestamp is first modified to reflect a grace period,
// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
// for modification of the deletion timestamp and expect a job to create more pods asap, not wait
// until the kubelet actually deletes the pod.
jm.deletePod(curPod)
return
}
if job := jm.getPodJob(curPod); job != nil {
jm.enqueueController(job)
}
oldPod := old.(*api.Pod)
// Only need to get the old job if the labels changed.
if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
// If the old and new job are the same, the first one that syncs
// will set expectations preventing any damage from the second.
if oldJob := jm.getPodJob(oldPod); oldJob != nil {
jm.enqueueController(oldJob)
}
}
}
// When a pod is deleted, enqueue the job that manages the pod and update its expectations.
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
func (jm *JobManager) deletePod(obj interface{}) {
pod, ok := obj.(*api.Pod)
// When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains
// the deleted key/value. Note that this value might be stale. If the pod
// changed labels the new job will not be woken up till the periodic resync.
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a job recreates a pod", obj, controller.ExpectationsTimeout)
return
}
pod, ok = tombstone.Obj.(*api.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before job recreates a pod", obj, controller.ExpectationsTimeout)
return
}
}
if job := jm.getPodJob(pod); job != nil {
jobKey, err := controller.KeyFunc(job)
if err != nil {
glog.Errorf("Couldn't get key for job %#v: %v", job, err)
return
}
jm.expectations.DeletionObserved(jobKey)
jm.enqueueController(job)
}
}
// obj could be an *experimental.Job, or a DeletionFinalStateUnknown marker item.
func (jm *JobManager) enqueueController(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}
// TODO: Handle overlapping controllers better. Either disallow them at admission time or
// deterministically avoid syncing controllers that fight over pods. Currently, we only
// ensure that the same controller is synced for a given pod. When we periodically relist
// all controllers there will still be some replica instability. One way to handle this is
// by querying the store for all controllers that this job overlaps, as well as all
// controllers that overlap this job, and sorting them.
jm.queue.Add(key)
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (jm *JobManager) worker() {
for {
func() {
key, quit := jm.queue.Get()
if quit {
return
}
defer jm.queue.Done(key)
err := jm.syncHandler(key.(string))
if err != nil {
glog.Errorf("Error syncing job: %v", err)
}
}()
}
}
// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (jm *JobManager) syncJob(key string) error {
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
}()
obj, exists, err := jm.jobStore.Store.GetByKey(key)
if !exists {
glog.Infof("Job has been deleted %v", key)
jm.expectations.DeleteExpectations(key)
return nil
}
if err != nil {
glog.Infof("Unable to retrieve job %v from store: %v", key, err)
jm.queue.Add(key)
return err
}
job := *obj.(*experimental.Job)
if !jm.podStoreSynced() {
// Sleep so we give the pod reflector goroutine a chance to run.
time.Sleep(replicationcontroller.PodStoreSyncedPollPeriod)
glog.Infof("Waiting for pods controller to sync, requeuing job %v", job.Name)
jm.enqueueController(&job)
return nil
}
// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
// and update the expectations after we've retrieved active pods from the store. If a new pod enters
// the store after we've checked the expectation, the job sync is just deferred till the next relist.
jobKey, err := controller.KeyFunc(&job)
if err != nil {
glog.Errorf("Couldn't get key for job %#v: %v", job, err)
return err
}
jobNeedsSync := jm.expectations.SatisfiedExpectations(jobKey)
podList, err := jm.podStore.Pods(job.Namespace).List(labels.Set(job.Spec.Selector).AsSelector())
if err != nil {
glog.Errorf("Error getting pods for job %q: %v", key, err)
jm.queue.Add(key)
return err
}
activePods := controller.FilterActivePods(podList.Items)
active := len(activePods)
successful, unsuccessful := getStatus(jobKey, job.Spec.Template.Spec.RestartPolicy, podList.Items)
if jobNeedsSync {
active = jm.manageJob(activePods, successful, unsuccessful, &job)
}
completions := successful
if job.Spec.Template.Spec.RestartPolicy == api.RestartPolicyNever {
completions += unsuccessful
}
if completions == *job.Spec.Completions {
job.Status.Conditions = append(job.Status.Conditions, newCondition())
}
// no need to update the job if the status hasn't changed since last time
if job.Status.Active != active || job.Status.Successful != successful || job.Status.Unsuccessful != unsuccessful {
job.Status.Active = active
job.Status.Successful = successful
job.Status.Unsuccessful = unsuccessful
if err := jm.updateHandler(&job); err != nil {
glog.V(2).Infof("Failed to update job %v, requeuing", job.Name)
jm.enqueueController(&job)
}
}
return nil
}
func newCondition() experimental.JobCondition {
return experimental.JobCondition{
Type: experimental.JobComplete,
Status: api.ConditionTrue,
LastProbeTime: util.Now(),
LastTransitionTime: util.Now(),
}
}
func getStatus(jobKey string, restartPolicy api.RestartPolicy, pods []api.Pod) (successful, unsuccessful int) {
successful = filterPods(pods, api.PodSucceeded)
if restartPolicy == api.RestartPolicyNever {
unsuccessful = filterPods(pods, api.PodFailed)
}
return
}
func (jm *JobManager) manageJob(activePods []*api.Pod, successful, unsuccessful int, job *experimental.Job) int {
active := len(activePods)
parallelism := *job.Spec.Parallelism
jobKey, err := controller.KeyFunc(job)
if err != nil {
glog.Errorf("Couldn't get key for job %#v: %v", job, err)
return 0
}
if active > parallelism {
diff := active - parallelism
jm.expectations.ExpectDeletions(jobKey, diff)
glog.V(2).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
// Sort the pods in the order such that not-ready < ready, unscheduled
// < scheduled, and pending < running. This ensures that we delete pods
// in the earlier stages whenever possible.
sort.Sort(controller.ActivePods(activePods))
active -= diff
wait := sync.WaitGroup{}
wait.Add(diff)
for i := 0; i < diff; i++ {
go func(ix int) {
defer wait.Done()
if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name); err != nil {
// Decrement the expected number of deletes because the informer won't observe this deletion
glog.V(2).Infof("Failed deletion, decrementing expectations for controller %q", jobKey)
jm.expectations.DeletionObserved(jobKey)
util.HandleError(err)
active++
}
}(i)
}
wait.Wait()
} else if active < parallelism {
// how many executions are left to run
diff := *job.Spec.Completions - successful
// for RestartPolicyNever we need to count unsuccessful pods as well
if job.Spec.Template.Spec.RestartPolicy == api.RestartPolicyNever {
diff -= unsuccessful
}
// limit to parallelism and count active pods as well
if diff > parallelism {
diff = parallelism
}
diff -= active
jm.expectations.ExpectCreations(jobKey, diff)
glog.V(2).Infof("Too few pods running job %q, need %d, creating %d", jobKey, parallelism, diff)
active += diff
wait := sync.WaitGroup{}
wait.Add(diff)
for i := 0; i < diff; i++ {
go func() {
defer wait.Done()
if err := jm.podControl.CreatePods(job.Namespace, job.Spec.Template, job); err != nil {
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for controller %q", jobKey)
jm.expectations.CreationObserved(jobKey)
util.HandleError(err)
active--
}
}()
}
wait.Wait()
}
return active
}
func (jm *JobManager) updateJob(job *experimental.Job) error {
_, err := jm.kubeClient.Experimental().Jobs(job.Namespace).Update(job)
return err
}
// filterPods returns pods based on their phase.
func filterPods(pods []api.Pod, phase api.PodPhase) int {
result := 0
for i := range pods {
if phase == pods[i].Status.Phase {
result++
}
}
return result
}
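
The heart of manageJob is the arithmetic deciding how many pods to create or delete. A standalone sketch of the creation branch with a worked example (an illustration of the logic above, not the controller code itself):

package main

import "fmt"

// wantedCreations mirrors the "active < parallelism" branch of manageJob:
// take the completions still outstanding, cap them at parallelism, then
// subtract the pods that are already running.
func wantedCreations(active, successful, unsuccessful, parallelism, completions int, restartNever bool) int {
	diff := completions - successful
	if restartNever {
		// under RestartPolicyNever a failed pod also consumes a completion
		diff -= unsuccessful
	}
	if diff > parallelism {
		diff = parallelism
	}
	return diff - active
}

func main() {
	// completions=5, parallelism=2, 1 pod active, 1 succeeded, OnFailure policy:
	// 4 completions remain, capped at 2 by parallelism, minus 1 active => create 1 pod.
	fmt.Println(wantedCreations(1, 1, 0, 2, 5, false)) // 1
}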


@@ -0,0 +1,553 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"fmt"
"sync"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/apis/experimental"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch"
)
// Give each test that starts a background controller up to 1/2 a second.
// Since we need to start up a goroutine to test watch, this routine needs
// to get cpu before the test can complete. If the test is starved of cpu,
// the watch test will take up to 1/2 a second before timing out.
const controllerTimeout = 500 * time.Millisecond
var alwaysReady = func() bool { return true }
type FakePodControl struct {
podSpec []api.PodTemplateSpec
deletePodName []string
lock sync.Mutex
err error
}
func (f *FakePodControl) CreatePods(namespace string, spec *api.PodTemplateSpec, object runtime.Object) error {
f.lock.Lock()
defer f.lock.Unlock()
if f.err != nil {
return f.err
}
f.podSpec = append(f.podSpec, *spec)
return nil
}
func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object) error {
return nil
}
func (f *FakePodControl) DeletePod(namespace string, podName string) error {
f.lock.Lock()
defer f.lock.Unlock()
if f.err != nil {
return f.err
}
f.deletePodName = append(f.deletePodName, podName)
return nil
}
func (f *FakePodControl) clear() {
f.lock.Lock()
defer f.lock.Unlock()
f.deletePodName = []string{}
f.podSpec = []api.PodTemplateSpec{}
}
func newJob(parallelism, completions int, restartPolicy api.RestartPolicy) *experimental.Job {
return &experimental.Job{
ObjectMeta: api.ObjectMeta{
Name: "foobar",
Namespace: api.NamespaceDefault,
},
Spec: experimental.JobSpec{
Parallelism: &parallelism,
Completions: &completions,
Selector: map[string]string{"foo": "bar"},
Template: &api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
},
},
Spec: api.PodSpec{
RestartPolicy: restartPolicy,
Containers: []api.Container{
{Image: "foo/bar"},
},
},
},
},
}
}
func getKey(job *experimental.Job, t *testing.T) string {
if key, err := controller.KeyFunc(job); err != nil {
t.Errorf("Unexpected error getting key for job %v: %v", job.Name, err)
return ""
} else {
return key
}
}
// newPodList creates count pods with the given phase for the given job
func newPodList(count int, status api.PodPhase, job *experimental.Job) []api.Pod {
pods := []api.Pod{}
for i := 0; i < count; i++ {
newPod := api.Pod{
ObjectMeta: api.ObjectMeta{
Name: fmt.Sprintf("pod-%v", util.Now().UnixNano()),
Labels: job.Spec.Selector,
Namespace: job.Namespace,
},
Status: api.PodStatus{Phase: status},
}
pods = append(pods, newPod)
}
return pods
}
func TestControllerSyncJob(t *testing.T) {
testCases := map[string]struct {
// job setup
parallelism int
completions int
restartPolicy api.RestartPolicy
// pod setup
podControllerError error
activePods int
successfulPods int
unsuccessfulPods int
// expectations
expectedCreations int
expectedDeletions int
expectedActive int
expectedSuccessful int
expectedUnsuccessful int
expectedComplete bool
}{
"job start": {
2, 5, api.RestartPolicyOnFailure,
nil, 0, 0, 0,
2, 0, 2, 0, 0, false,
},
"correct # of pods": {
2, 5, api.RestartPolicyOnFailure,
nil, 2, 0, 0,
0, 0, 2, 0, 0, false,
},
"too few active pods": {
2, 5, api.RestartPolicyOnFailure,
nil, 1, 1, 0,
1, 0, 2, 1, 0, false,
},
"too few active pods, with controller error": {
2, 5, api.RestartPolicyOnFailure,
fmt.Errorf("Fake error"), 1, 1, 0,
0, 0, 1, 1, 0, false,
},
"too many active pods": {
2, 5, api.RestartPolicyOnFailure,
nil, 3, 0, 0,
0, 1, 2, 0, 0, false,
},
"too many active pods, with controller error": {
2, 5, api.RestartPolicyOnFailure,
fmt.Errorf("Fake error"), 3, 0, 0,
0, 0, 3, 0, 0, false,
},
"failed pod and OnFailure restart policy": {
2, 5, api.RestartPolicyOnFailure,
nil, 1, 1, 1,
1, 0, 2, 1, 0, false,
},
"failed pod and Never restart policy": {
2, 5, api.RestartPolicyNever,
nil, 1, 1, 1,
1, 0, 2, 1, 1, false,
},
"job finish and OnFailure restart policy": {
2, 5, api.RestartPolicyOnFailure,
nil, 0, 5, 0,
0, 0, 0, 5, 0, true,
},
"job finish and Never restart policy": {
2, 5, api.RestartPolicyNever,
nil, 0, 2, 3,
0, 0, 0, 2, 3, true,
},
"more active pods than completions": {
2, 5, api.RestartPolicyOnFailure,
nil, 10, 0, 0,
0, 8, 2, 0, 0, false,
},
"status change": {
2, 5, api.RestartPolicyOnFailure,
nil, 2, 2, 0,
0, 0, 2, 2, 0, false,
},
}
for name, tc := range testCases {
// job manager setup
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
manager := NewJobManager(client)
fakePodControl := FakePodControl{err: tc.podControllerError}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
var actual *experimental.Job
manager.updateHandler = func(job *experimental.Job) error {
actual = job
return nil
}
// job & pods setup
job := newJob(tc.parallelism, tc.completions, tc.restartPolicy)
manager.jobStore.Store.Add(job)
for _, pod := range newPodList(tc.activePods, api.PodRunning, job) {
manager.podStore.Store.Add(&pod)
}
for _, pod := range newPodList(tc.successfulPods, api.PodSucceeded, job) {
manager.podStore.Store.Add(&pod)
}
for _, pod := range newPodList(tc.unsuccessfulPods, api.PodFailed, job) {
manager.podStore.Store.Add(&pod)
}
// run
err := manager.syncJob(getKey(job, t))
if err != nil {
t.Errorf("%s: unexpected error when syncing jobs %v", err)
}
// validate created/deleted pods
if len(fakePodControl.podSpec) != tc.expectedCreations {
t.Errorf("%s: unexpected number of creates. Expected %d, saw %d\n", name, tc.expectedCreations, len(fakePodControl.podSpec))
}
if len(fakePodControl.deletePodName) != tc.expectedDeletions {
t.Errorf("%s: unexpected number of deletes. Expected %d, saw %d\n", name, tc.expectedDeletions, len(fakePodControl.deletePodName))
}
// validate status
if actual.Status.Active != tc.expectedActive {
t.Errorf("%s: unexpected number of active pods. Expected %d, saw %d\n", name, tc.expectedActive, actual.Status.Active)
}
if actual.Status.Successful != tc.expectedSuccessful {
t.Errorf("%s: unexpected number of successful pods. Expected %d, saw %d\n", name, tc.expectedSuccessful, actual.Status.Successful)
}
if actual.Status.Unsuccessful != tc.expectedUnsuccessful {
t.Errorf("%s: unexpected number of unsuccessful pods. Expected %d, saw %d\n", name, tc.expectedUnsuccessful, actual.Status.Unsuccessful)
}
// validate conditions
if tc.expectedComplete {
completed := false
for _, v := range actual.Status.Conditions {
if v.Type == experimental.JobComplete && v.Status == api.ConditionTrue {
completed = true
break
}
}
if !completed {
t.Errorf("%s: expected completion condition. Got %v", name, actual.Status.Conditions)
}
}
}
}
func TestSyncJobDeleted(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
manager := NewJobManager(client)
fakePodControl := FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
manager.updateHandler = func(job *experimental.Job) error { return nil }
job := newJob(2, 2, api.RestartPolicyOnFailure)
err := manager.syncJob(getKey(job, t))
if err != nil {
t.Errorf("Unexpected error when syncing jobs %v", err)
}
if len(fakePodControl.podSpec) != 0 {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.podSpec))
}
if len(fakePodControl.deletePodName) != 0 {
t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.deletePodName))
}
}
func TestSyncJobUpdateRequeue(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
manager := NewJobManager(client)
fakePodControl := FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
manager.updateHandler = func(job *experimental.Job) error { return fmt.Errorf("Fake error") }
job := newJob(2, 2, api.RestartPolicyOnFailure)
manager.jobStore.Store.Add(job)
err := manager.syncJob(getKey(job, t))
if err != nil {
t.Errorf("Unxpected error when syncing jobs, got %v", err)
}
ch := make(chan interface{})
go func() {
item, _ := manager.queue.Get()
ch <- item
}()
select {
case key := <-ch:
expectedKey := getKey(job, t)
if key != expectedKey {
t.Errorf("Expected requeue of job with key %s got %s", expectedKey, key)
}
case <-time.After(controllerTimeout):
manager.queue.ShutDown()
t.Errorf("Expected to find a job in the queue, found none.")
}
}
func TestJobPodLookup(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
manager := NewJobManager(client)
manager.podStoreSynced = alwaysReady
testCases := []struct {
job *experimental.Job
pod *api.Pod
expectedName string
}{
// pods without labels don't match any job
{
job: &experimental.Job{
ObjectMeta: api.ObjectMeta{Name: "basic"},
},
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo1", Namespace: api.NamespaceAll},
},
expectedName: "",
},
// matching labels, different namespace
{
job: &experimental.Job{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: experimental.JobSpec{
Selector: map[string]string{"foo": "bar"},
},
},
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo2",
Namespace: "ns",
Labels: map[string]string{"foo": "bar"},
},
},
expectedName: "",
},
// matching ns and labels returns
{
job: &experimental.Job{
ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "ns"},
Spec: experimental.JobSpec{
Selector: map[string]string{"foo": "bar"},
},
},
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo3",
Namespace: "ns",
Labels: map[string]string{"foo": "bar"},
},
},
expectedName: "bar",
},
}
for _, tc := range testCases {
manager.jobStore.Add(tc.job)
if job := manager.getPodJob(tc.pod); job != nil {
if tc.expectedName != job.Name {
t.Errorf("Got job %+v expected %+v", job.Name, tc.expectedName)
}
} else if tc.expectedName != "" {
t.Errorf("Expected a job %v pod %v, found none", tc.expectedName, tc.pod.Name)
}
}
}
type FakeJobExpectations struct {
*controller.ControllerExpectations
satisfied bool
expSatisfied func()
}
func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool {
fe.expSatisfied()
return fe.satisfied
}
// TestSyncJobExpectations tests that a pod cannot sneak in between counting active pods
// and checking expectations.
func TestSyncJobExpectations(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
manager := NewJobManager(client)
fakePodControl := FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
manager.updateHandler = func(job *experimental.Job) error { return nil }
job := newJob(2, 2, api.RestartPolicyOnFailure)
manager.jobStore.Store.Add(job)
pods := newPodList(2, api.PodPending, job)
manager.podStore.Store.Add(&pods[0])
manager.expectations = FakeJobExpectations{
controller.NewControllerExpectations(), true, func() {
// If we check active pods before checking expectations, the job
// will create a new pod because it doesn't see this one, but
// has fulfilled its expectations.
manager.podStore.Store.Add(&pods[1])
},
}
manager.syncJob(getKey(job, t))
if len(fakePodControl.podSpec) != 0 {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.podSpec))
}
if len(fakePodControl.deletePodName) != 0 {
t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.deletePodName))
}
}
type FakeWatcher struct {
w *watch.FakeWatcher
*testclient.Fake
}
func TestWatchJobs(t *testing.T) {
fakeWatch := watch.NewFake()
client := &testclient.Fake{}
client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
manager := NewJobManager(client)
manager.podStoreSynced = alwaysReady
var testJob experimental.Job
received := make(chan string)
// The update sent through the fakeWatcher should make its way into the workqueue,
// and eventually into the syncHandler.
manager.syncHandler = func(key string) error {
obj, exists, err := manager.jobStore.Store.GetByKey(key)
if !exists || err != nil {
t.Errorf("Expected to find job under key %v", key)
}
job := *obj.(*experimental.Job)
if !api.Semantic.DeepDerivative(job, testJob) {
t.Errorf("Expected %#v, but got %#v", testJob, job)
}
received <- key
return nil
}
// Start only the job watcher and the workqueue, send a watch event,
// and make sure it hits the sync method.
stopCh := make(chan struct{})
defer close(stopCh)
go manager.jobController.Run(stopCh)
go util.Until(manager.worker, 10*time.Millisecond, stopCh)
// We're sending a new job to see if it reaches the syncHandler.
testJob.Name = "foo"
fakeWatch.Add(&testJob)
select {
case <-received:
case <-time.After(controllerTimeout):
t.Errorf("Expected 1 call but got 0")
}
// We're sending a fake finished job to see if it reaches the syncHandler - it should not,
// since we filter out finished jobs.
testJobv2 := experimental.Job{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: experimental.JobStatus{
Conditions: []experimental.JobCondition{{
Type: experimental.JobComplete,
Status: api.ConditionTrue,
LastProbeTime: util.Now(),
LastTransitionTime: util.Now(),
}},
},
}
fakeWatch.Modify(&testJobv2)
select {
case <-received:
t.Errorf("Expected 0 call but got 1")
case <-time.After(controllerTimeout):
}
}
func TestWatchPods(t *testing.T) {
fakeWatch := watch.NewFake()
client := &testclient.Fake{}
client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
manager := NewJobManager(client)
manager.podStoreSynced = alwaysReady
// Put one job and one pod into the store
testJob := newJob(2, 2, api.RestartPolicyOnFailure)
manager.jobStore.Store.Add(testJob)
received := make(chan string)
// The pod update sent through the fakeWatcher should figure out the managing job and
// send it into the syncHandler.
manager.syncHandler = func(key string) error {
obj, exists, err := manager.jobStore.Store.GetByKey(key)
if !exists || err != nil {
t.Errorf("Expected to find job under key %v", key)
}
job := obj.(*experimental.Job)
if !api.Semantic.DeepDerivative(job, testJob) {
t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job)
}
close(received)
return nil
}
// Start only the pod watcher and the workqueue, send a watch event,
// and make sure it hits the sync method for the right job.
stopCh := make(chan struct{})
defer close(stopCh)
go manager.podController.Run(stopCh)
go util.Until(manager.worker, 10*time.Millisecond, stopCh)
pods := newPodList(1, api.PodRunning, testJob)
testPod := pods[0]
testPod.Status.Phase = api.PodFailed
fakeWatch.Add(&testPod)
select {
case <-received:
case <-time.After(controllerTimeout):
t.Errorf("Expected 1 call but got 0")
}
}
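
The table-driven cases above cover both the happy path and the error path of the sync loop. Assuming the repository's standard test tooling of this era, the suite could be run with something like:

hack/test-go.sh pkg/controller/job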


@@ -350,7 +350,7 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
for i := 0; i < diff; i++ {
go func() {
defer wait.Done()
if err := rm.podControl.CreateReplica(rc.Namespace, rc); err != nil {
if err := rm.podControl.CreatePods(rc.Namespace, rc.Spec.Template, rc); err != nil {
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
rm.expectations.CreationObserved(rcKey)


@@ -27,7 +27,6 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/apis/experimental"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
@@ -42,7 +41,7 @@ import (
)
type FakePodControl struct {
controllerSpec []api.ReplicationController
controllerSpec []api.PodTemplateSpec
deletePodName []string
lock sync.Mutex
err error
@@ -60,7 +59,7 @@ func init() {
api.ForTesting_ReferencesAllowBlankSelfLinks = true
}
func (f *FakePodControl) CreateReplica(namespace string, spec *api.ReplicationController) error {
func (f *FakePodControl) CreatePods(namespace string, spec *api.PodTemplateSpec, object runtime.Object) error {
f.lock.Lock()
defer f.lock.Unlock()
if f.err != nil {
@@ -70,7 +69,7 @@ func (f *FakePodControl) CreateReplica(namespace string, spec *api.ReplicationCo
return nil
}
func (f *FakePodControl) CreateReplicaOnNode(namespace string, daemon *experimental.DaemonSet, nodeName string) error {
func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object) error {
return nil
}
@@ -87,7 +86,7 @@ func (f *FakePodControl) clear() {
f.lock.Lock()
defer f.lock.Unlock()
f.deletePodName = []string{}
f.controllerSpec = []api.ReplicationController{}
f.controllerSpec = []api.PodTemplateSpec{}
}
func getKey(rc *api.ReplicationController, t *testing.T) string {