From ff4a5e2068e68bf2aa7def677109e522456d341b Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Tue, 10 May 2016 15:58:49 -0400 Subject: [PATCH] Allow a replication manager to be created that does not record events --- .../replication/replication_controller.go | 43 +++++++++++++------ 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 8fc0f16fda..5f404a618c 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -102,11 +102,18 @@ type ReplicationManager struct { queue *workqueue.Type } +// NewReplicationManager creates a replication manager func NewReplicationManager(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + return newReplicationManagerInternal( + eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}), + podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize) +} +// newReplicationManagerInternal configures a replication manager with the specified event recorder +func newReplicationManagerInternal(eventRecorder record.EventRecorder, podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager { if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) } @@ -115,7 +122,7 @@ func NewReplicationManager(podInformer framework.SharedIndexInformer, kubeClient kubeClient: kubeClient, podControl: controller.RealPodControl{ KubeClient: kubeClient, - Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}), + Recorder: eventRecorder, }, burstReplicas: burstReplicas, expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()), @@ -195,7 +202,14 @@ func NewReplicationManager(podInformer framework.SharedIndexInformer, kubeClient rm.podStoreSynced = rm.podController.HasSynced rm.lookupCache = controller.NewMatchingCache(lookupCacheSize) return rm +} +// NewReplicationManagerFromClientForIntegration creates a new ReplicationManager that runs its own informer. It disables event recording for use in integration tests. +func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager { + podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) + rm := newReplicationManagerInternal(&record.FakeRecorder{}, podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize) + rm.internalPodInformer = podInformer + return rm } // NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer. @@ -413,18 +427,23 @@ func (rm *ReplicationManager) enqueueController(obj interface{}) { // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. func (rm *ReplicationManager) worker() { + workFunc := func() bool { + key, quit := rm.queue.Get() + if quit { + return true + } + defer rm.queue.Done(key) + err := rm.syncHandler(key.(string)) + if err != nil { + glog.Errorf("Error syncing replication controller: %v", err) + } + return false + } for { - func() { - key, quit := rm.queue.Get() - if quit { - return - } - defer rm.queue.Done(key) - err := rm.syncHandler(key.(string)) - if err != nil { - glog.Errorf("Error syncing replication controller: %v", err) - } - }() + if quit := workFunc(); quit { + glog.Infof("replication controller worker shutting down") + return + } } }