Remove references to pkg/controller/informers

pull/6/head
Andy Goldstein 2017-02-24 09:52:43 -05:00
parent bd912f50ba
commit 4cd38b863f
9 changed files with 54 additions and 70 deletions

View File

@ -36,13 +36,12 @@ import (
"k8s.io/kubernetes/cmd/cloud-controller-manager/app/options"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
newinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
nodecontroller "k8s.io/kubernetes/pkg/controller/cloud"
"k8s.io/kubernetes/pkg/controller/informers"
routecontroller "k8s.io/kubernetes/pkg/controller/route"
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
"k8s.io/kubernetes/pkg/util/configz"
@ -196,9 +195,7 @@ func StartControllers(s *options.CloudControllerManagerServer, kubeconfig *restc
return rootClientBuilder.ClientOrDie(serviceAccountName)
}
versionedClient := client("shared-informers")
// TODO replace sharedInformers with newSharedInformers
sharedInformers := informers.NewSharedInformerFactory(versionedClient, nil, resyncPeriod(s)())
newSharedInformers := newinformers.NewSharedInformerFactory(versionedClient, resyncPeriod(s)())
sharedInformers := informers.NewSharedInformerFactory(versionedClient, resyncPeriod(s)())
_, clusterCIDR, err := net.ParseCIDR(s.ClusterCIDR)
if err != nil {
@ -207,7 +204,7 @@ func StartControllers(s *options.CloudControllerManagerServer, kubeconfig *restc
// Start the CloudNodeController
nodeController, err := nodecontroller.NewCloudNodeController(
newSharedInformers.Core().V1().Nodes(),
sharedInformers.Core().V1().Nodes(),
client("cloud-node-controller"), cloud,
s.NodeMonitorPeriod.Duration)
if err != nil {
@ -220,8 +217,8 @@ func StartControllers(s *options.CloudControllerManagerServer, kubeconfig *restc
serviceController, err := servicecontroller.New(
cloud,
client("service-controller"),
newSharedInformers.Core().V1().Services(),
newSharedInformers.Core().V1().Nodes(),
sharedInformers.Core().V1().Services(),
sharedInformers.Core().V1().Nodes(),
s.ClusterName,
)
if err != nil {
@ -236,7 +233,7 @@ func StartControllers(s *options.CloudControllerManagerServer, kubeconfig *restc
if routes, ok := cloud.Routes(); !ok {
glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
} else {
routeController := routecontroller.New(routes, client("route-controller"), newSharedInformers.Core().V1().Nodes(), s.ClusterName, clusterCIDR)
routeController := routecontroller.New(routes, client("route-controller"), sharedInformers.Core().V1().Nodes(), s.ClusterName, clusterCIDR)
routeController.Run(stop, s.RouteReconciliationPeriod.Duration)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
@ -258,9 +255,7 @@ func StartControllers(s *options.CloudControllerManagerServer, kubeconfig *restc
glog.Fatalf("Failed to get api versions from server: %v", err)
}
// TODO replace sharedInformers with newSharedInformers
sharedInformers.Start(stop)
newSharedInformers.Start(stop)
select {}
}

View File

@ -30,9 +30,9 @@ func startStatefulSetController(ctx ControllerContext) (bool, error) {
return false, nil
}
go statefulset.NewStatefulSetController(
ctx.NewInformerFactory.Core().V1().Pods(),
ctx.NewInformerFactory.Apps().V1beta1().StatefulSets(),
ctx.NewInformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Apps().V1beta1().StatefulSets(),
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.ClientBuilder.ClientOrDie("statefulset-controller"),
).Run(1, ctx.Stop)
return true, nil

View File

@ -44,7 +44,7 @@ func startHPAController(ctx ControllerContext) (bool, error) {
hpaClient.Extensions(),
hpaClient.Autoscaling(),
replicaCalc,
ctx.NewInformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
ctx.Options.HorizontalPodAutoscalerSyncPeriod.Duration,
).Run(ctx.Stop)
return true, nil

View File

@ -33,8 +33,8 @@ func startJobController(ctx ControllerContext) (bool, error) {
return false, nil
}
go job.NewJobController(
ctx.NewInformerFactory.Core().V1().Pods(),
ctx.NewInformerFactory.Batch().V1().Jobs(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Batch().V1().Jobs(),
ctx.ClientBuilder.ClientOrDie("job-controller"),
).Run(int(ctx.Options.ConcurrentJobSyncs), ctx.Stop)
return true, nil

View File

@ -41,7 +41,7 @@ func startCSRController(ctx ControllerContext) (bool, error) {
certController, err := certcontroller.NewCertificateController(
c,
ctx.NewInformerFactory.Certificates().V1beta1().CertificateSigningRequests(),
ctx.InformerFactory.Certificates().V1beta1().CertificateSigningRequests(),
signer,
certcontroller.NewGroupApprover(ctx.Options.ApproveAllKubeletCSRsForGroup),
)

View File

@ -46,12 +46,11 @@ import (
"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
newinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers"
nodecontroller "k8s.io/kubernetes/pkg/controller/node"
routecontroller "k8s.io/kubernetes/pkg/controller/route"
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
@ -216,13 +215,8 @@ type ControllerContext struct {
ClientBuilder controller.ControllerClientBuilder
// InformerFactory gives access to informers for the controller.
// TODO delete this instance once the conversion to generated informers is complete.
InformerFactory informers.SharedInformerFactory
// NewInformerFactory gives access to informers for the controller.
// TODO rename this to InformerFactory once the conversion to generated informers is complete.
NewInformerFactory newinformers.SharedInformerFactory
// Options provides access to init options for a given controller
Options options.CMServer
@ -343,9 +337,7 @@ func getAvailableResources(clientBuilder controller.ControllerClientBuilder) (ma
func StartControllers(controllers map[string]InitFunc, s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error {
versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
// TODO replace sharedInformers with newSharedInformers
sharedInformers := informers.NewSharedInformerFactory(versionedClient, nil, ResyncPeriod(s)())
newSharedInformers := newinformers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
// always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
if len(s.ServiceAccountKeyFile) > 0 {
@ -385,7 +377,6 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
ctx := ControllerContext{
ClientBuilder: clientBuilder,
InformerFactory: sharedInformers,
NewInformerFactory: newSharedInformers,
Options: *s,
AvailableResources: availableResources,
Stop: stop,
@ -426,9 +417,9 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
}
nodeController, err := nodecontroller.NewNodeController(
newSharedInformers.Core().V1().Pods(),
newSharedInformers.Core().V1().Nodes(),
newSharedInformers.Extensions().V1beta1().DaemonSets(),
sharedInformers.Core().V1().Pods(),
sharedInformers.Core().V1().Nodes(),
sharedInformers.Extensions().V1beta1().DaemonSets(),
cloud,
clientBuilder.ClientOrDie("node-controller"),
s.PodEvictionTimeout.Duration,
@ -455,8 +446,8 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
serviceController, err := servicecontroller.New(
cloud,
clientBuilder.ClientOrDie("service-controller"),
newSharedInformers.Core().V1().Services(),
newSharedInformers.Core().V1().Nodes(),
sharedInformers.Core().V1().Services(),
sharedInformers.Core().V1().Nodes(),
s.ClusterName,
)
if err != nil {
@ -472,7 +463,7 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
} else if routes, ok := cloud.Routes(); !ok {
glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
} else {
routeController := routecontroller.New(routes, clientBuilder.ClientOrDie("route-controller"), newSharedInformers.Core().V1().Nodes(), s.ClusterName, clusterCIDR)
routeController := routecontroller.New(routes, clientBuilder.ClientOrDie("route-controller"), sharedInformers.Core().V1().Nodes(), s.ClusterName, clusterCIDR)
go routeController.Run(stop, s.RouteReconciliationPeriod.Duration)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
@ -491,9 +482,9 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
VolumePlugins: ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration),
Cloud: cloud,
ClusterName: s.ClusterName,
VolumeInformer: newSharedInformers.Core().V1().PersistentVolumes(),
ClaimInformer: newSharedInformers.Core().V1().PersistentVolumeClaims(),
ClassInformer: newSharedInformers.Storage().V1beta1().StorageClasses(),
VolumeInformer: sharedInformers.Core().V1().PersistentVolumes(),
ClaimInformer: sharedInformers.Core().V1().PersistentVolumeClaims(),
ClassInformer: sharedInformers.Storage().V1beta1().StorageClasses(),
EnableDynamicProvisioning: s.VolumeConfiguration.EnableDynamicProvisioning,
}
volumeController := persistentvolumecontroller.NewController(params)
@ -507,10 +498,10 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
attachDetachController, attachDetachControllerErr :=
attachdetach.NewAttachDetachController(
clientBuilder.ClientOrDie("attachdetach-controller"),
newSharedInformers.Core().V1().Pods(),
newSharedInformers.Core().V1().Nodes(),
newSharedInformers.Core().V1().PersistentVolumeClaims(),
newSharedInformers.Core().V1().PersistentVolumes(),
sharedInformers.Core().V1().Pods(),
sharedInformers.Core().V1().Nodes(),
sharedInformers.Core().V1().PersistentVolumeClaims(),
sharedInformers.Core().V1().PersistentVolumes(),
cloud,
ProbeAttachableVolumePlugins(s.VolumeConfiguration),
s.DisableAttachDetachReconcilerSync,
@ -522,9 +513,7 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
go attachDetachController.Run(stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
// TODO replace sharedInformers with newSharedInformers
sharedInformers.Start(stop)
newSharedInformers.Start(stop)
select {}
}

View File

@ -46,8 +46,8 @@ import (
func startEndpointController(ctx ControllerContext) (bool, error) {
go endpointcontroller.NewEndpointController(
ctx.NewInformerFactory.Core().V1().Pods(),
ctx.NewInformerFactory.Core().V1().Services(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Services(),
ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
).Run(int(ctx.Options.ConcurrentEndpointSyncs), ctx.Stop)
return true, nil
@ -55,8 +55,8 @@ func startEndpointController(ctx ControllerContext) (bool, error) {
func startReplicationController(ctx ControllerContext) (bool, error) {
go replicationcontroller.NewReplicationManager(
ctx.NewInformerFactory.Core().V1().Pods(),
ctx.NewInformerFactory.Core().V1().ReplicationControllers(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().ReplicationControllers(),
ctx.ClientBuilder.ClientOrDie("replication-controller"),
replicationcontroller.BurstReplicas,
int(ctx.Options.LookupCacheSizeForRC),
@ -68,7 +68,7 @@ func startReplicationController(ctx ControllerContext) (bool, error) {
func startPodGCController(ctx ControllerContext) (bool, error) {
go podgc.NewPodGC(
ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"),
ctx.NewInformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Pods(),
int(ctx.Options.TerminatedPodGCThreshold),
).Run(ctx.Stop)
return true, nil
@ -76,7 +76,7 @@ func startPodGCController(ctx ControllerContext) (bool, error) {
func startResourceQuotaController(ctx ControllerContext) (bool, error) {
resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.NewInformerFactory)
resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.InformerFactory)
groupKindsToReplenish := []schema.GroupKind{
api.Kind("Pod"),
api.Kind("Service"),
@ -87,10 +87,10 @@ func startResourceQuotaController(ctx ControllerContext) (bool, error) {
}
resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
KubeClient: resourceQuotaControllerClient,
ResourceQuotaInformer: ctx.NewInformerFactory.Core().V1().ResourceQuotas(),
ResourceQuotaInformer: ctx.InformerFactory.Core().V1().ResourceQuotas(),
ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.Options.ResourceQuotaSyncPeriod.Duration),
Registry: resourceQuotaRegistry,
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(ctx.NewInformerFactory),
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(ctx.InformerFactory),
ReplenishmentResyncPeriod: ResyncPeriod(&ctx.Options),
GroupKindsToReplenish: groupKindsToReplenish,
}
@ -131,7 +131,7 @@ func startNamespaceController(ctx ControllerContext) (bool, error) {
namespaceKubeClient,
namespaceClientPool,
discoverResourcesFn,
ctx.NewInformerFactory.Core().V1().Namespaces(),
ctx.InformerFactory.Core().V1().Namespaces(),
ctx.Options.NamespaceSyncPeriod.Duration,
v1.FinalizerKubernetes,
)
@ -143,8 +143,8 @@ func startNamespaceController(ctx ControllerContext) (bool, error) {
func startServiceAccountController(ctx ControllerContext) (bool, error) {
go serviceaccountcontroller.NewServiceAccountsController(
ctx.NewInformerFactory.Core().V1().ServiceAccounts(),
ctx.NewInformerFactory.Core().V1().Namespaces(),
ctx.InformerFactory.Core().V1().ServiceAccounts(),
ctx.InformerFactory.Core().V1().Namespaces(),
ctx.ClientBuilder.ClientOrDie("service-account-controller"),
serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),
).Run(1, ctx.Stop)
@ -153,7 +153,7 @@ func startServiceAccountController(ctx ControllerContext) (bool, error) {
func startTTLController(ctx ControllerContext) (bool, error) {
go ttlcontroller.NewTTLController(
ctx.NewInformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.ClientBuilder.ClientOrDie("ttl-controller"),
).Run(5, ctx.Stop)
return true, nil

View File

@ -32,9 +32,9 @@ func startDaemonSetController(ctx ControllerContext) (bool, error) {
return false, nil
}
go daemon.NewDaemonSetsController(
ctx.NewInformerFactory.Extensions().V1beta1().DaemonSets(),
ctx.NewInformerFactory.Core().V1().Pods(),
ctx.NewInformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Extensions().V1beta1().DaemonSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
int(ctx.Options.LookupCacheSizeForDaemonSet),
).Run(int(ctx.Options.ConcurrentDaemonSetSyncs), ctx.Stop)
@ -46,9 +46,9 @@ func startDeploymentController(ctx ControllerContext) (bool, error) {
return false, nil
}
go deployment.NewDeploymentController(
ctx.NewInformerFactory.Extensions().V1beta1().Deployments(),
ctx.NewInformerFactory.Extensions().V1beta1().ReplicaSets(),
ctx.NewInformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Extensions().V1beta1().Deployments(),
ctx.InformerFactory.Extensions().V1beta1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("deployment-controller"),
).Run(int(ctx.Options.ConcurrentDeploymentSyncs), ctx.Stop)
return true, nil
@ -59,8 +59,8 @@ func startReplicaSetController(ctx ControllerContext) (bool, error) {
return false, nil
}
go replicaset.NewReplicaSetController(
ctx.NewInformerFactory.Extensions().V1beta1().ReplicaSets(),
ctx.NewInformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Extensions().V1beta1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
replicaset.BurstReplicas,
int(ctx.Options.LookupCacheSizeForRS),

View File

@ -30,12 +30,12 @@ func startDisruptionController(ctx ControllerContext) (bool, error) {
return false, nil
}
go disruption.NewDisruptionController(
ctx.NewInformerFactory.Core().V1().Pods(),
ctx.NewInformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
ctx.NewInformerFactory.Core().V1().ReplicationControllers(),
ctx.NewInformerFactory.Extensions().V1beta1().ReplicaSets(),
ctx.NewInformerFactory.Extensions().V1beta1().Deployments(),
ctx.NewInformerFactory.Apps().V1beta1().StatefulSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
ctx.InformerFactory.Core().V1().ReplicationControllers(),
ctx.InformerFactory.Extensions().V1beta1().ReplicaSets(),
ctx.InformerFactory.Extensions().V1beta1().Deployments(),
ctx.InformerFactory.Apps().V1beta1().StatefulSets(),
ctx.ClientBuilder.ClientOrDie("disruption-controller"),
).Run(ctx.Stop)
return true, nil