mirror of https://github.com/k3s-io/k3s
switch job controller to shared informer
parent 8b707016f9
commit 8c4e3af1a3
@@ -330,7 +330,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 
     if containsResource(resources, "jobs") {
         glog.Infof("Starting job controller")
-        go job.NewJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller")), ResyncPeriod(s)).
+        go job.NewJobController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller"))).
             Run(s.ConcurrentJobSyncs, wait.NeverStop)
         time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
     }
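The essence of the change above: the job controller stops running its own pod list/watch and instead registers handlers on a podInformer that the controller manager shares across controllers. A minimal, dependency-free Go sketch of that fan-out pattern (all names here are illustrative, not the Kubernetes API):

package main

import "fmt"

// Handler mirrors the shape of framework.ResourceEventHandlerFuncs.
type Handler struct {
	Add func(obj interface{})
}

// SharedInformer fans one watch stream out to many controllers and
// backs them all with a single cache.
type SharedInformer struct {
	handlers []Handler
	store    map[string]interface{}
}

func (s *SharedInformer) AddEventHandler(h Handler) {
	s.handlers = append(s.handlers, h)
}

// add simulates one event arriving from the (single) watch connection.
func (s *SharedInformer) add(key string, obj interface{}) {
	s.store[key] = obj
	for _, h := range s.handlers {
		h.Add(obj) // every registered controller sees the same event
	}
}

func main() {
	inf := &SharedInformer{store: map[string]interface{}{}}
	inf.AddEventHandler(Handler{Add: func(o interface{}) { fmt.Println("job controller saw", o) }})
	inf.AddEventHandler(Handler{Add: func(o interface{}) { fmt.Println("rc controller saw", o) }})
	inf.add("default/pod-a", "pod-a")
}

One watch connection and one cache serve every interested controller, which is the scalability point of the shared-informer work.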
@@ -254,7 +254,7 @@ func (s *CMServer) Run(_ []string) error {
 
     if containsResource(resources, "jobs") {
         glog.Infof("Starting job controller")
-        go job.NewJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller")), s.resyncPeriod).
+        go job.NewJobControllerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller")), s.resyncPeriod).
             Run(s.ConcurrentJobSyncs, wait.NeverStop)
     }
@@ -32,6 +32,7 @@ import (
     "k8s.io/kubernetes/pkg/client/record"
     "k8s.io/kubernetes/pkg/controller"
     "k8s.io/kubernetes/pkg/controller/framework"
+    "k8s.io/kubernetes/pkg/controller/framework/informers"
     replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
     "k8s.io/kubernetes/pkg/runtime"
     utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@@ -44,6 +45,13 @@ type JobController struct {
     kubeClient clientset.Interface
     podControl controller.PodControlInterface
 
+    // internalPodInformer holds a personal informer. If we're using a normal
+    // shared informer, then the informer will be started for us. If we have a
+    // personal informer, we must start it ourselves. If you start the
+    // controller using NewJobController (passing a SharedInformer), this
+    // field will be nil.
+    internalPodInformer framework.SharedInformer
+
     // To allow injection of updateJobStatus for testing.
     updateHandler func(job *extensions.Job) error
     syncHandler   func(jobKey string) error
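The comment above encodes a lifecycle rule: whoever creates an informer is responsible for running it. Condensed from the constructor and Run() hunks later in this same diff (identifiers are the diff's own; surrounding code elided):

// NewJobControllerFromClient builds a private informer and records it:
podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod())
jm := NewJobController(podInformer, kubeClient)
jm.internalPodInformer = podInformer

// Run() starts the informer only in that case; a caller-supplied shared
// informer is started by its creator, typically the controller manager:
if jm.internalPodInformer != nil {
	go jm.internalPodInformer.Run(stopCh)
}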
@@ -61,8 +69,6 @@ type JobController struct {
 
     // A store of pods, populated by the podController
     podStore cache.StoreToPodLister
-    // Watches changes to all pods
-    podController *framework.Controller
 
     // Jobs that need to be updated
     queue *workqueue.Type
@@ -70,7 +76,7 @@ type JobController struct {
     recorder record.EventRecorder
 }
 
-func NewJobController(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
+func NewJobController(podInformer framework.SharedInformer, kubeClient clientset.Interface) *JobController {
     eventBroadcaster := record.NewBroadcaster()
     eventBroadcaster.StartLogging(glog.Infof)
     // TODO: remove the wrapper when every client has moved to use the clientset.
@@ -110,27 +116,24 @@ func NewJobController(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
         },
     )
 
-    jm.podStore.Store, jm.podController = framework.NewInformer(
-        &cache.ListWatch{
-            ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-                return jm.kubeClient.Core().Pods(api.NamespaceAll).List(options)
-            },
-            WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-                return jm.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
-            },
-        },
-        &api.Pod{},
-        resyncPeriod(),
-        framework.ResourceEventHandlerFuncs{
-            AddFunc:    jm.addPod,
-            UpdateFunc: jm.updatePod,
-            DeleteFunc: jm.deletePod,
-        },
-    )
+    podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
+        AddFunc:    jm.addPod,
+        UpdateFunc: jm.updatePod,
+        DeleteFunc: jm.deletePod,
+    })
+    jm.podStore.Store = podInformer.GetStore()
+    jm.podStoreSynced = podInformer.HasSynced
 
     jm.updateHandler = jm.updateJobStatus
     jm.syncHandler = jm.syncJob
-    jm.podStoreSynced = jm.podController.HasSynced
     return jm
 }
+
+func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
+    podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod())
+    jm := NewJobController(podInformer, kubeClient)
+    jm.internalPodInformer = podInformer
+
+    return jm
+}
 
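A side effect worth noting in the hunk above: the resync period argument disappears from NewJobController, because a shared informer has a single resync interval chosen wherever the informer is built. Callers that still want their own interval use the new wrapper, as the second controller-manager hunk earlier in this diff does. A sketch of the two call sites, assuming a kubeClient and a podInformer already exist:

// Shared path: the resync interval was fixed when the informer was built.
jm := job.NewJobController(podInformer, kubeClient)

// Private path: the controller builds its own informer, so the old
// resync knob survives on the wrapper.
jm = job.NewJobControllerFromClient(kubeClient, s.resyncPeriod)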
@@ -138,10 +141,14 @@ func NewJobController(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
 func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
     defer utilruntime.HandleCrash()
     go jm.jobController.Run(stopCh)
-    go jm.podController.Run(stopCh)
     for i := 0; i < workers; i++ {
         go wait.Until(jm.worker, time.Second, stopCh)
     }
+
+    if jm.internalPodInformer != nil {
+        go jm.internalPodInformer.Run(stopCh)
+    }
+
     <-stopCh
     glog.Infof("Shutting down Job Manager")
     jm.queue.ShutDown()
@@ -18,8 +18,8 @@ package job
 
 import (
     "fmt"
+    "reflect"
     "testing"
-    "time"
 
     "k8s.io/kubernetes/pkg/api"
     "k8s.io/kubernetes/pkg/api/testapi"
@@ -32,7 +32,6 @@ import (
     "k8s.io/kubernetes/pkg/client/unversioned/testclient"
     "k8s.io/kubernetes/pkg/controller"
     "k8s.io/kubernetes/pkg/util/rand"
-    "k8s.io/kubernetes/pkg/util/wait"
     "k8s.io/kubernetes/pkg/watch"
 )
@@ -208,7 +207,7 @@ func TestControllerSyncJob(t *testing.T) {
     for name, tc := range testCases {
         // job manager setup
         clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-        manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+        manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
         fakePodControl := controller.FakePodControl{Err: tc.podControllerError}
         manager.podControl = &fakePodControl
         manager.podStoreSynced = alwaysReady
@@ -303,7 +302,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
     for name, tc := range testCases {
         // job manager setup
         clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-        manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+        manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
         fakePodControl := controller.FakePodControl{}
         manager.podControl = &fakePodControl
         manager.podStoreSynced = alwaysReady
@@ -373,7 +372,7 @@ func getCondition(job *extensions.Job, condition extensions.JobConditionType) bool {
 
 func TestSyncPastDeadlineJobFinished(t *testing.T) {
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-    manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+    manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
     fakePodControl := controller.FakePodControl{}
     manager.podControl = &fakePodControl
     manager.podStoreSynced = alwaysReady
@@ -407,7 +406,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
 
 func TestSyncJobComplete(t *testing.T) {
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-    manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+    manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
     fakePodControl := controller.FakePodControl{}
     manager.podControl = &fakePodControl
     manager.podStoreSynced = alwaysReady
@@ -432,7 +431,7 @@ func TestSyncJobComplete(t *testing.T) {
 
 func TestSyncJobDeleted(t *testing.T) {
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-    manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+    manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
     fakePodControl := controller.FakePodControl{}
     manager.podControl = &fakePodControl
     manager.podStoreSynced = alwaysReady
@@ -452,7 +451,7 @@ func TestSyncJobDeleted(t *testing.T) {
 
 func TestSyncJobUpdateRequeue(t *testing.T) {
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-    manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+    manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
     fakePodControl := controller.FakePodControl{}
     manager.podControl = &fakePodControl
     manager.podStoreSynced = alwaysReady
@@ -473,7 +472,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 
 func TestJobPodLookup(t *testing.T) {
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-    manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+    manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
     manager.podStoreSynced = alwaysReady
     testCases := []struct {
         job *extensions.Job
@@ -563,7 +562,7 @@ func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool {
 // and checking expectations.
 func TestSyncJobExpectations(t *testing.T) {
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-    manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+    manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
     fakePodControl := controller.FakePodControl{}
     manager.podControl = &fakePodControl
     manager.podStoreSynced = alwaysReady
@@ -599,8 +598,8 @@ type FakeWatcher struct {
 func TestWatchJobs(t *testing.T) {
     clientset := fake.NewSimpleClientset()
     fakeWatch := watch.NewFake()
-    clientset.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
-    manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+    clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil))
+    manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
     manager.podStoreSynced = alwaysReady
 
     var testJob extensions.Job
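The reactor change above is easy to miss: the fake watch is narrowed from "*" to "jobs". Presumably this is because NewJobControllerFromClient now spins up an internal pod informer against the same fake clientset, and a wildcard reactor would feed that informer's pod watch requests into the job test's watch stream. A test needing both streams would register them separately; a sketch using the same test helpers this file already imports:

fakeJobWatch := watch.NewFake()
fakePodWatch := watch.NewFake()
clientset := fake.NewSimpleClientset()
clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeJobWatch, nil))
clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakePodWatch, nil))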
@@ -614,9 +613,12 @@ func TestWatchJobs(t *testing.T) {
         if !exists || err != nil {
             t.Errorf("Expected to find job under key %v", key)
         }
-        job := *obj.(*extensions.Job)
-        if !api.Semantic.DeepDerivative(job, testJob) {
-            t.Errorf("Expected %#v, but got %#v", testJob, job)
+        job, ok := obj.(*extensions.Job)
+        if !ok {
+            t.Fatalf("unexpected type: %v %#v", reflect.TypeOf(obj), obj)
+        }
+        if !api.Semantic.DeepDerivative(*job, testJob) {
+            t.Errorf("Expected %#v, but got %#v", testJob, *job)
         }
         close(received)
         return nil
@@ -625,8 +627,7 @@ func TestWatchJobs(t *testing.T) {
     // and make sure it hits the sync method.
     stopCh := make(chan struct{})
     defer close(stopCh)
-    go manager.jobController.Run(stopCh)
-    go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
+    go manager.Run(1, stopCh)
 
     // We're sending new job to see if it reaches syncHandler.
     testJob.Name = "foo"
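Also above: instead of hand-starting the embedded jobController and a bare worker loop, the test now drives everything through the controller's public entry point, which per the Run() hunk earlier also covers the internal pod informer. The resulting pattern in both watch tests:

stopCh := make(chan struct{})
defer close(stopCh)
go manager.Run(1, stopCh) // one worker; starts jobController, workers, and internalPodInformer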
@@ -661,27 +662,35 @@ func TestIsJobFinished(t *testing.T) {
 }
 
 func TestWatchPods(t *testing.T) {
-    clientset := fake.NewSimpleClientset()
+    testJob := newJob(2, 2)
+    clientset := fake.NewSimpleClientset(testJob)
     fakeWatch := watch.NewFake()
-    clientset.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
-    manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+    clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))
+    manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
     manager.podStoreSynced = alwaysReady
 
     // Put one job and one pod into the store
-    testJob := newJob(2, 2)
     manager.jobStore.Store.Add(testJob)
     received := make(chan struct{})
     // The pod update sent through the fakeWatcher should figure out the managing job and
     // send it into the syncHandler.
     manager.syncHandler = func(key string) error {
-
         obj, exists, err := manager.jobStore.Store.GetByKey(key)
         if !exists || err != nil {
             t.Errorf("Expected to find job under key %v", key)
+            close(received)
+            return nil
         }
+        job, ok := obj.(*extensions.Job)
+        if !ok {
+            t.Errorf("unexpected type: %v %#v", reflect.TypeOf(obj), obj)
+            close(received)
+            return nil
+        }
-        job := obj.(*extensions.Job)
         if !api.Semantic.DeepDerivative(job, testJob) {
             t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job)
+            close(received)
+            return nil
         }
         close(received)
         return nil
@@ -690,8 +699,7 @@ func TestWatchPods(t *testing.T) {
     // and make sure it hits the sync method for the right job.
     stopCh := make(chan struct{})
     defer close(stopCh)
-    go manager.podController.Run(stopCh)
-    go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
+    go manager.Run(1, stopCh)
 
     pods := newPodList(1, api.PodRunning, testJob)
     testPod := pods[0]