Use sharedPodInformer in ReplicaSet controller

pull/6/head
Wojciech Tyczynski 2016-07-20 12:25:26 +02:00
parent 976ca09d71
commit 2794cf538c
4 changed files with 60 additions and 44 deletions

View File

@ -366,7 +366,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
if containsResource(resources, "replicasets") {
glog.Infof("Starting ReplicaSet controller")
go replicaset.NewReplicaSetController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS)).
go replicaset.NewReplicaSetController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS)).
Run(int(s.ConcurrentRSSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}

View File

@ -268,7 +268,7 @@ func (s *CMServer) Run(_ []string) error {
if containsResource(resources, "replicasets") {
glog.Infof("Starting ReplicaSet controller")
go replicaset.NewReplicaSetController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), s.resyncPeriod, replicaset.BurstReplicas, int(s.LookupCacheSizeForRS)).
go replicaset.NewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), s.resyncPeriod, replicaset.BurstReplicas, int(s.LookupCacheSizeForRS)).
Run(int(s.ConcurrentRSSyncs), wait.NeverStop)
}
}

View File

@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
@ -68,6 +69,13 @@ type ReplicaSetController struct {
kubeClient clientset.Interface
podControl controller.PodControlInterface
// internalPodInformer is used to hold a personal informer. If we're using
// a normal shared informer, then the informer will be started for us. If
// we have a personal informer, we must start it ourselves. If you start
// the controller using NewReplicationManager(passing SharedInformer), this
// will be null
internalPodInformer framework.SharedIndexInformer
// A ReplicaSet is temporarily suspended after creating/deleting these many replicas.
// It resumes normal action after observing the watch events for them.
burstReplicas int
@ -84,7 +92,7 @@ type ReplicaSetController struct {
// A store of pods, populated by the podController
podStore cache.StoreToPodLister
// Watches changes to all pods
podController *framework.Controller
podController framework.ControllerInterface
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
@ -96,11 +104,18 @@ type ReplicaSetController struct {
}
// NewReplicaSetController creates a new ReplicaSetController.
func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController {
func NewReplicaSetController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
return newReplicaSetController(
eventBroadcaster.NewRecorder(api.EventSource{Component: "replicaset-controller"}),
podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize)
}
// newReplicaSetController configures a replica set controller with the specified event recorder
func newReplicaSetController(eventRecorder record.EventRecorder, podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController {
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
@ -109,7 +124,7 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro
kubeClient: kubeClient,
podControl: controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "replicaset-controller"}),
Recorder: eventRecorder,
},
burstReplicas: burstReplicas,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
@ -173,27 +188,16 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro
},
)
rsc.podStore.Indexer, rsc.podController = framework.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return rsc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return rsc.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
AddFunc: rsc.addPod,
// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
// local storage, so it should be ok.
UpdateFunc: rsc.updatePod,
DeleteFunc: rsc.deletePod,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
AddFunc: rsc.addPod,
// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
// local storage, so it should be ok.
UpdateFunc: rsc.updatePod,
DeleteFunc: rsc.deletePod,
})
rsc.podStore.Indexer = podInformer.GetIndexer()
rsc.podController = podInformer.GetController()
rsc.syncHandler = rsc.syncReplicaSet
rsc.podStoreSynced = rsc.podController.HasSynced
@ -201,6 +205,14 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro
return rsc
}
// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
func NewReplicaSetControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController {
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod())
rsc := NewReplicaSetController(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize)
rsc.internalPodInformer = podInformer
return rsc
}
// SetEventRecorder replaces the event recorder used by the ReplicaSetController
// with the given recorder. Only used for testing.
func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder) {
@ -217,6 +229,9 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
for i := 0; i < workers; i++ {
go wait.Until(rsc.worker, time.Second, stopCh)
}
if rsc.internalPodInformer != nil {
go rsc.internalPodInformer.Run(stopCh)
}
<-stopCh
glog.Infof("Shutting down ReplicaSet Controller")
rsc.queue.ShutDown()

View File

@ -139,7 +139,7 @@ type serverResponse struct {
func TestSyncReplicaSetDoesNothing(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// 2 running pods, a controller with 2 replicas, sync is a no-op
@ -156,7 +156,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) {
func TestSyncReplicaSetDeletes(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@ -173,7 +173,7 @@ func TestSyncReplicaSetDeletes(t *testing.T) {
func TestDeleteFinalStateUnknown(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@ -206,7 +206,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
func TestSyncReplicaSetCreates(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// A controller with 2 replicas and no pods in the store, 2 creates expected
@ -229,7 +229,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// Steady state for the ReplicaSet, no Status.Replicas updates expected
@ -272,7 +272,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// Insufficient number of pods in the system, and Status.Replicas is wrong;
@ -318,7 +318,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@ -366,7 +366,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
}
func TestPodControllerLookup(t *testing.T) {
manager := NewReplicaSetController(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
testCases := []struct {
inRSs []*extensions.ReplicaSet
@ -434,7 +434,7 @@ func TestWatchControllers(t *testing.T) {
fakeWatch := watch.NewFake()
client := &fake.Clientset{}
client.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
var testRSSpec extensions.ReplicaSet
@ -477,7 +477,7 @@ func TestWatchPods(t *testing.T) {
fakeWatch := watch.NewFake()
client := &fake.Clientset{}
client.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// Put one ReplicaSet and one pod into the controller's stores
@ -505,6 +505,7 @@ func TestWatchPods(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
go manager.podController.Run(stopCh)
go manager.internalPodInformer.Run(stopCh)
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
pods := newPodList(nil, 1, api.PodRunning, labelMap, testRSSpec, "pod")
@ -520,7 +521,7 @@ func TestWatchPods(t *testing.T) {
}
func TestUpdatePods(t *testing.T) {
manager := NewReplicaSetController(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicaSetControllerFromClient(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
received := make(chan string)
@ -597,7 +598,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
labelMap := map[string]string{"foo": "bar"}
@ -678,7 +679,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, burstReplicas, 0)
manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, burstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@ -830,7 +831,7 @@ func (fe FakeRSExpectations) SatisfiedExpectations(controllerKey string) bool {
func TestRSSyncExpectations(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 2, 0)
manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 2, 0)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@ -855,7 +856,7 @@ func TestRSSyncExpectations(t *testing.T) {
func TestDeleteControllerAndExpectations(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 10, 0)
manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 10, 0)
manager.podStoreSynced = alwaysReady
rs := newReplicaSet(1, map[string]string{"foo": "bar"})
@ -898,7 +899,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
func TestRSManagerNotReady(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 2, 0)
manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 2, 0)
manager.podControl = &fakePodControl
manager.podStoreSynced = func() bool { return false }
@ -937,7 +938,7 @@ func TestOverlappingRSs(t *testing.T) {
labelMap := map[string]string{"foo": "bar"}
for i := 0; i < 5; i++ {
manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 10, 0)
manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 10, 0)
manager.podStoreSynced = alwaysReady
// Create 10 ReplicaSets, shuffled them randomly and insert them into the ReplicaSet controller's store
@ -967,7 +968,7 @@ func TestOverlappingRSs(t *testing.T) {
func TestDeletionTimestamp(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
labelMap := map[string]string{"foo": "bar"}
manager := NewReplicaSetController(c, controller.NoResyncPeriodFunc, 10, 0)
manager := NewReplicaSetControllerFromClient(c, controller.NoResyncPeriodFunc, 10, 0)
manager.podStoreSynced = alwaysReady
rs := newReplicaSet(1, labelMap)