From f0c33d65b6fea84d791059d96c93a15e4b3de4ec Mon Sep 17 00:00:00 2001 From: deads2k Date: Thu, 14 Apr 2016 14:00:52 -0400 Subject: [PATCH] start sharing the pod cache and list/watch --- cmd/integration/integration.go | 9 ++- .../app/controllermanager.go | 15 ++++- .../controllermanager/controllermanager.go | 4 +- .../endpoint/endpoints_controller.go | 52 +++++++++------- .../endpoint/endpoints_controller_test.go | 24 ++++---- pkg/controller/framework/informers/factory.go | 44 ++++++++++++++ .../replication/replication_controller.go | 60 +++++++++++-------- .../replication_controller_test.go | 41 ++++++------- test/integration/framework/master_utils.go | 2 +- 9 files changed, 169 insertions(+), 82 deletions(-) create mode 100644 pkg/controller/framework/informers/factory.go diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 767fe7fbf9..0d9c95601e 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -44,6 +44,7 @@ import ( client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller" endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint" + "k8s.io/kubernetes/pkg/controller/framework/informers" nodecontroller "k8s.io/kubernetes/pkg/controller/node" replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing" @@ -194,14 +195,18 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string eventBroadcaster.StartRecordingToSink(cl.Events("")) scheduler.New(schedulerConfig).Run() + podInformer := informers.CreateSharedPodInformer(clientset, controller.NoResyncPeriodFunc()) + // ensure the service endpoints are sync'd several times within the window that the integration tests wait - go endpointcontroller.NewEndpointController(clientset, controller.NoResyncPeriodFunc). + go endpointcontroller.NewEndpointController(podInformer, clientset). Run(3, wait.NeverStop) // TODO: Write an integration test for the replication controllers watch. - go replicationcontroller.NewReplicationManager(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096). + go replicationcontroller.NewReplicationManager(podInformer, clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096). Run(3, wait.NeverStop) + go podInformer.Run(wait.NeverStop) + nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, nil, false) nodeController.Run(5 * time.Second) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index c8641e2f37..eac87640ca 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -30,6 +30,7 @@ import ( "net/http" "net/http/pprof" "os" + "reflect" "strconv" "time" @@ -48,6 +49,8 @@ import ( "k8s.io/kubernetes/pkg/controller/daemon" "k8s.io/kubernetes/pkg/controller/deployment" endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/controller/gc" "k8s.io/kubernetes/pkg/controller/job" namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace" @@ -189,11 +192,16 @@ func Run(s *options.CMServer) error { } func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error { - go endpointcontroller.NewEndpointController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller")), ResyncPeriod(s)). + podInformer := informers.CreateSharedPodInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)()) + informers := map[reflect.Type]framework.SharedInformer{} + informers[reflect.TypeOf(&api.Pod{})] = podInformer + + go endpointcontroller.NewEndpointController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))). Run(s.ConcurrentEndpointSyncs, wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) go replicationcontroller.NewReplicationManager( + podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")), ResyncPeriod(s), replicationcontroller.BurstReplicas, @@ -410,6 +418,11 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig ).Run() time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + // run the shared informers + for _, informer := range informers { + go informer.Run(wait.NeverStop) + } + select {} } diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index af9aa27711..05c09f90c9 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -136,7 +136,7 @@ func (s *CMServer) Run(_ []string) error { endpoints := s.createEndpointController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))) go endpoints.Run(s.ConcurrentEndpointSyncs, wait.NeverStop) - go replicationcontroller.NewReplicationManager(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")), s.resyncPeriod, replicationcontroller.BurstReplicas, s.LookupCacheSizeForRC). + go replicationcontroller.NewReplicationManagerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")), s.resyncPeriod, replicationcontroller.BurstReplicas, s.LookupCacheSizeForRC). Run(s.ConcurrentRCSyncs, wait.NeverStop) if s.TerminatedPodGCThreshold > 0 { @@ -346,7 +346,7 @@ func (s *CMServer) createEndpointController(client *clientset.Clientset) kmendpo return kmendpoint.NewEndpointController(client) } glog.V(2).Infof("Creating podIP:containerPort endpoint controller") - stockEndpointController := endpointcontroller.NewEndpointController(client, s.resyncPeriod) + stockEndpointController := endpointcontroller.NewEndpointControllerFromClient(client, s.resyncPeriod) return stockEndpointController } diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index e19c0ce91a..26fdad8669 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -33,6 +33,7 @@ import ( clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime" @@ -58,7 +59,7 @@ var ( ) // NewEndpointController returns a new *EndpointController. -func NewEndpointController(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController { +func NewEndpointController(podInformer framework.SharedInformer, client *clientset.Clientset) *EndpointController { e := &EndpointController{ client: client, queue: workqueue.New(), @@ -85,24 +86,23 @@ func NewEndpointController(client *clientset.Clientset, resyncPeriod controller. }, ) - e.podStore.Store, e.podController = framework.NewInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return e.client.Core().Pods(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return e.client.Core().Pods(api.NamespaceAll).Watch(options) - }, - }, - &api.Pod{}, - resyncPeriod(), - framework.ResourceEventHandlerFuncs{ - AddFunc: e.addPod, - UpdateFunc: e.updatePod, - DeleteFunc: e.deletePod, - }, - ) - e.podStoreSynced = e.podController.HasSynced + podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{ + AddFunc: e.addPod, + UpdateFunc: e.updatePod, + DeleteFunc: e.deletePod, + }) + e.podStore.Store = podInformer.GetStore() + e.podController = podInformer.GetController() + e.podStoreSynced = podInformer.HasSynced + + return e +} + +// NewEndpointControllerFromClient returns a new *EndpointController that runs its own informer. +func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController { + podInformer := informers.CreateSharedPodInformer(client, resyncPeriod()) + e := NewEndpointController(podInformer, client) + e.internalPodInformer = podInformer return e } @@ -114,6 +114,13 @@ type EndpointController struct { serviceStore cache.StoreToServiceLister podStore cache.StoreToPodLister + // internalPodInformer is used to hold a personal informer. If we're using + // a normal shared informer, then the informer will be started for us. If + // we have a personal informer, we must start it ourselves. If you start + // the controller using NewEndpointController(passing SharedInformer), this + // will be null + internalPodInformer framework.SharedInformer + // Services that need to be updated. A channel is inappropriate here, // because it allows services with lots of pods to be serviced much // more often than services with few pods; it also would cause a @@ -124,7 +131,7 @@ type EndpointController struct { // Since we join two objects, we'll watch both of them with // controllers. serviceController *framework.Controller - podController *framework.Controller + podController framework.ControllerInterface // podStoreSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. podStoreSynced func() bool @@ -144,6 +151,11 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) { time.Sleep(5 * time.Minute) // give time for our cache to fill e.checkLeftoverEndpoints() }() + + if e.internalPodInformer != nil { + go e.internalPodInformer.Run(stopCh) + } + <-stopCh e.queue.ShutDown() } diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 6f5d43fd81..06998739b3 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -108,7 +108,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { // TODO: Uncomment when fix #19254 // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, @@ -142,7 +142,7 @@ func TestCheckLeftoverEndpoints(t *testing.T) { // TODO: Uncomment when fix #19254 // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady endpoints.checkLeftoverEndpoints() @@ -172,7 +172,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { // TODO: Uncomment when fix #19254 // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, ns, 1, 1, 0) @@ -216,7 +216,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { // TODO: Uncomment when fix #19254 // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, ns, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ @@ -256,7 +256,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { // TODO: Uncomment when fix #19254 // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, ns, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ @@ -295,7 +295,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { // TODO: Uncomment when fix #19254 // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, ns, 0, 1, 1) endpoints.serviceStore.Store.Add(&api.Service{ @@ -334,7 +334,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { // TODO: Uncomment when fix #19254 // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, ns, 1, 1, 1) endpoints.serviceStore.Store.Add(&api.Service{ @@ -377,7 +377,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { // TODO: Uncomment when fix #19254 // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, ns, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ @@ -419,7 +419,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { // TODO: Uncomment when fix #19254 // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ @@ -440,7 +440,7 @@ func TestSyncEndpointsItems(t *testing.T) { // TODO: Uncomment when fix #19254 // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, ns, 3, 2, 0) addPods(endpoints.podStore.Store, "blah", 5, 2, 0) // make sure these aren't found! @@ -484,7 +484,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { // TODO: Uncomment when fix #19254 // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, ns, 3, 2, 0) serviceLabels := map[string]string{"foo": "bar"} @@ -546,7 +546,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { // TODO: Uncomment when fix #19254 // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, ns, 1, 1, 0) serviceLabels := map[string]string{"baz": "blah"} diff --git a/pkg/controller/framework/informers/factory.go b/pkg/controller/framework/informers/factory.go new file mode 100644 index 0000000000..1b4cce4adb --- /dev/null +++ b/pkg/controller/framework/informers/factory.go @@ -0,0 +1,44 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informers + +import ( + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" +) + +// CreateSharedPodInformer returns a SharedInformer that lists and watches all pods +func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedInformer { + sharedInformer := framework.NewSharedInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return client.Core().Pods(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return client.Core().Pods(api.NamespaceAll).Watch(options) + }, + }, + &api.Pod{}, resyncPeriod) + + return sharedInformer +} diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 92c4145a08..42b72a9ef2 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime" @@ -66,6 +67,13 @@ type ReplicationManager struct { kubeClient clientset.Interface podControl controller.PodControlInterface + // internalPodInformer is used to hold a personal informer. If we're using + // a normal shared informer, then the informer will be started for us. If + // we have a personal informer, we must start it ourselves. If you start + // the controller using NewReplicationManager(passing SharedInformer), this + // will be null + internalPodInformer framework.SharedInformer + // An rc is temporarily suspended after creating/deleting these many replicas. // It resumes normal action after observing the watch events for them. burstReplicas int @@ -82,7 +90,7 @@ type ReplicationManager struct { // A store of pods, populated by the podController podStore cache.StoreToPodLister // Watches changes to all pods - podController *framework.Controller + podController framework.ControllerInterface // podStoreSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. podStoreSynced func() bool @@ -93,8 +101,7 @@ type ReplicationManager struct { queue *workqueue.Type } -// NewReplicationManager creates a new ReplicationManager. -func NewReplicationManager(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager { +func NewReplicationManager(podInformer framework.SharedInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) @@ -167,31 +174,31 @@ func NewReplicationManager(kubeClient clientset.Interface, resyncPeriod controll }, ) - rm.podStore.Store, rm.podController = framework.NewInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return rm.kubeClient.Core().Pods(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return rm.kubeClient.Core().Pods(api.NamespaceAll).Watch(options) - }, - }, - &api.Pod{}, - resyncPeriod(), - framework.ResourceEventHandlerFuncs{ - AddFunc: rm.addPod, - // This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill - // the most frequent pod update is status, and the associated rc will only list from local storage, so - // it should be ok. - UpdateFunc: rm.updatePod, - DeleteFunc: rm.deletePod, - }, - ) + podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{ + AddFunc: rm.addPod, + // This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill + // the most frequent pod update is status, and the associated rc will only list from local storage, so + // it should be ok. + UpdateFunc: rm.updatePod, + DeleteFunc: rm.deletePod, + }) + rm.podStore.Store = podInformer.GetStore() + rm.podController = podInformer.GetController() rm.syncHandler = rm.syncReplicationController rm.podStoreSynced = rm.podController.HasSynced rm.lookupCache = controller.NewMatchingCache(lookupCacheSize) return rm + +} + +// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer. +func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager { + podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod()) + rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize) + rm.internalPodInformer = podInformer + + return rm } // SetEventRecorder replaces the event recorder used by the replication manager @@ -211,6 +218,11 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) { for i := 0; i < workers; i++ { go wait.Until(rm.worker, time.Second, stopCh) } + + if rm.internalPodInformer != nil { + go rm.internalPodInformer.Run(stopCh) + } + <-stopCh glog.Infof("Shutting down RC Manager") rm.queue.ShutDown() @@ -478,7 +490,7 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil { // Decrement the expected number of deletes because the informer won't observe this deletion podKey := controller.PodKey(filteredPods[ix]) - glog.V(2).Infof("Failed to delete %v, decrementing expectations for controller %q/%q", podKey, rc.Namespace, rc.Name) + glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name) rm.expectations.DeletionObserved(rcKey, podKey) utilruntime.HandleError(err) } diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index 7904029440..8ac2299fde 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -138,7 +138,7 @@ type serverResponse struct { func TestSyncReplicationControllerDoesNothing(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady // 2 running pods, a controller with 2 replicas, sync is a no-op @@ -154,7 +154,7 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) { func TestSyncReplicationControllerDeletes(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -170,7 +170,7 @@ func TestSyncReplicationControllerDeletes(t *testing.T) { func TestDeleteFinalStateUnknown(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -202,7 +202,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { func TestSyncReplicationControllerCreates(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady // A controller with 2 replicas and no pods in the store, 2 creates expected @@ -225,7 +225,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { // TODO: Uncomment when fix #19254 // defer testServer.Close() c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady // Steady state for the replication controller, no Status.Replicas updates expected @@ -267,7 +267,7 @@ func TestControllerUpdateReplicas(t *testing.T) { // TODO: Uncomment when fix #19254 // defer testServer.Close() c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady // Insufficient number of pods in the system, and Status.Replicas is wrong; @@ -313,7 +313,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) { // defer testServer.Close() c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -359,7 +359,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) { } func TestPodControllerLookup(t *testing.T) { - manager := NewReplicationManager(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicationManagerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady testCases := []struct { inRCs []*api.ReplicationController @@ -422,7 +422,7 @@ func TestWatchControllers(t *testing.T) { fakeWatch := watch.NewFake() c := &fake.Clientset{} c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil)) - manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady var testControllerSpec api.ReplicationController @@ -465,7 +465,7 @@ func TestWatchPods(t *testing.T) { fakeWatch := watch.NewFake() c := &fake.Clientset{} c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil)) - manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady // Put one rc and one pod into the controller's stores @@ -492,6 +492,7 @@ func TestWatchPods(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) go manager.podController.Run(stopCh) + go manager.internalPodInformer.Run(stopCh) go wait.Until(manager.worker, 10*time.Millisecond, stopCh) pods := newPodList(nil, 1, api.PodRunning, testControllerSpec, "pod") @@ -507,7 +508,7 @@ func TestWatchPods(t *testing.T) { } func TestUpdatePods(t *testing.T) { - manager := NewReplicationManager(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicationManagerFromClient(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady received := make(chan string) @@ -567,7 +568,7 @@ func TestControllerUpdateRequeue(t *testing.T) { // defer testServer.Close() c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady rc := newReplicationController(1) @@ -649,7 +650,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) { func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, burstReplicas, 0) + manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, burstReplicas, 0) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -799,7 +800,7 @@ func (fe FakeRCExpectations) SatisfiedExpectations(controllerKey string) bool { func TestRCSyncExpectations(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 2, 0) + manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 2, 0) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -823,7 +824,7 @@ func TestRCSyncExpectations(t *testing.T) { func TestDeleteControllerAndExpectations(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 10, 0) + manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0) manager.podStoreSynced = alwaysReady rc := newReplicationController(1) @@ -866,7 +867,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { func TestRCManagerNotReady(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 2, 0) + manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 2, 0) manager.podControl = &fakePodControl manager.podStoreSynced = func() bool { return false } @@ -904,7 +905,7 @@ func TestOverlappingRCs(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) for i := 0; i < 5; i++ { - manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 10, 0) + manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0) manager.podStoreSynced = alwaysReady // Create 10 rcs, shuffled them randomly and insert them into the rc manager's store @@ -933,7 +934,7 @@ func TestOverlappingRCs(t *testing.T) { func TestDeletionTimestamp(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 10, 0) + manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0) manager.podStoreSynced = alwaysReady controllerSpec := newReplicationController(1) @@ -1020,7 +1021,7 @@ func TestDeletionTimestamp(t *testing.T) { func BenchmarkGetPodControllerMultiNS(b *testing.B) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicationManagerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) const nsNum = 1000 @@ -1066,7 +1067,7 @@ func BenchmarkGetPodControllerMultiNS(b *testing.B) { func BenchmarkGetPodControllerSingleNS(b *testing.B) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicationManagerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) const rcNum = 1000 const replicaNum = 3 diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index 67d988d8d5..0761288208 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -108,7 +108,7 @@ func NewMasterComponents(c *Config) *MasterComponents { restClient := client.NewOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, QPS: c.QPS, Burst: c.Burst}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, QPS: c.QPS, Burst: c.Burst}) rcStopCh := make(chan struct{}) - controllerManager := replicationcontroller.NewReplicationManager(clientset, controller.NoResyncPeriodFunc, c.Burst, 4096) + controllerManager := replicationcontroller.NewReplicationManagerFromClient(clientset, controller.NoResyncPeriodFunc, c.Burst, 4096) // TODO: Support events once we can cleanly shutdown an event recorder. controllerManager.SetEventRecorder(&record.FakeRecorder{})