mirror of https://github.com/k3s-io/k3s
Add RESTMapper to ControllerContext and make it generic for controllers
parent
d6967f358e
commit
7f93d11f9e
|
@ -22,8 +22,6 @@ package app
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/client-go/discovery"
|
|
||||||
discocache "k8s.io/client-go/discovery/cached"
|
|
||||||
"k8s.io/client-go/dynamic"
|
"k8s.io/client-go/dynamic"
|
||||||
"k8s.io/client-go/scale"
|
"k8s.io/client-go/scale"
|
||||||
"k8s.io/kubernetes/pkg/controller/podautoscaler"
|
"k8s.io/kubernetes/pkg/controller/podautoscaler"
|
||||||
|
@ -73,15 +71,10 @@ func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient me
|
||||||
hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
|
hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
|
||||||
hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
|
hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
|
||||||
|
|
||||||
// TODO: we need something like deferred discovery REST mapper that calls invalidate
|
|
||||||
// on cache misses.
|
|
||||||
cachedDiscovery := discocache.NewMemCacheClient(hpaClientGoClient.Discovery())
|
|
||||||
restMapper := discovery.NewDeferredDiscoveryRESTMapper(cachedDiscovery)
|
|
||||||
restMapper.Reset()
|
|
||||||
// we don't use cached discovery because DiscoveryScaleKindResolver does its own caching,
|
// we don't use cached discovery because DiscoveryScaleKindResolver does its own caching,
|
||||||
// so we want to re-fetch every time when we actually ask for it
|
// so we want to re-fetch every time when we actually ask for it
|
||||||
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClientGoClient.Discovery())
|
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClientGoClient.Discovery())
|
||||||
scaleClient, err := scale.NewForConfig(hpaClientConfig, restMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
|
scaleClient, err := scale.NewForConfig(hpaClientConfig, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
@ -95,7 +88,7 @@ func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient me
|
||||||
hpaClientGoClient.CoreV1(),
|
hpaClientGoClient.CoreV1(),
|
||||||
scaleClient,
|
scaleClient,
|
||||||
hpaClient.AutoscalingV1(),
|
hpaClient.AutoscalingV1(),
|
||||||
restMapper,
|
ctx.RESTMapper,
|
||||||
replicaCalc,
|
replicaCalc,
|
||||||
ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
|
ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
|
||||||
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
|
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
|
||||||
|
|
|
@ -35,6 +35,8 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apimachinery/pkg/util/uuid"
|
"k8s.io/apimachinery/pkg/util/uuid"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/client-go/discovery"
|
||||||
|
cacheddiscovery "k8s.io/client-go/discovery/cached"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
"k8s.io/client-go/tools/leaderelection"
|
"k8s.io/client-go/tools/leaderelection"
|
||||||
|
@ -223,6 +225,11 @@ type ControllerContext struct {
|
||||||
// ComponentConfig provides access to init options for a given controller
|
// ComponentConfig provides access to init options for a given controller
|
||||||
ComponentConfig componentconfig.KubeControllerManagerConfiguration
|
ComponentConfig componentconfig.KubeControllerManagerConfiguration
|
||||||
|
|
||||||
|
// DeferredDiscoveryRESTMapper is a RESTMapper that will defer
|
||||||
|
// initialization of the RESTMapper until the first mapping is
|
||||||
|
// requested.
|
||||||
|
RESTMapper *discovery.DeferredDiscoveryRESTMapper
|
||||||
|
|
||||||
// AvailableResources is a map listing currently available resources
|
// AvailableResources is a map listing currently available resources
|
||||||
AvailableResources map[schema.GroupVersionResource]bool
|
AvailableResources map[schema.GroupVersionResource]bool
|
||||||
|
|
||||||
|
@ -389,6 +396,14 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien
|
||||||
return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
|
return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Use a discovery client capable of being refreshed.
|
||||||
|
discoveryClient := rootClientBuilder.ClientOrDie("controller-discovery")
|
||||||
|
cachedClient := cacheddiscovery.NewMemCacheClient(discoveryClient.Discovery())
|
||||||
|
restMapper := discovery.NewDeferredDiscoveryRESTMapper(cachedClient)
|
||||||
|
go wait.Until(func() {
|
||||||
|
restMapper.Reset()
|
||||||
|
}, 30*time.Second, stop)
|
||||||
|
|
||||||
availableResources, err := GetAvailableResources(rootClientBuilder)
|
availableResources, err := GetAvailableResources(rootClientBuilder)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ControllerContext{}, err
|
return ControllerContext{}, err
|
||||||
|
@ -404,6 +419,7 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien
|
||||||
ClientBuilder: clientBuilder,
|
ClientBuilder: clientBuilder,
|
||||||
InformerFactory: sharedInformers,
|
InformerFactory: sharedInformers,
|
||||||
ComponentConfig: s.Generic.ComponentConfig,
|
ComponentConfig: s.Generic.ComponentConfig,
|
||||||
|
RESTMapper: restMapper,
|
||||||
AvailableResources: availableResources,
|
AvailableResources: availableResources,
|
||||||
Cloud: cloud,
|
Cloud: cloud,
|
||||||
LoopMode: loopMode,
|
LoopMode: loopMode,
|
||||||
|
|
|
@ -31,7 +31,6 @@ import (
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
"k8s.io/client-go/discovery"
|
|
||||||
cacheddiscovery "k8s.io/client-go/discovery/cached"
|
cacheddiscovery "k8s.io/client-go/discovery/cached"
|
||||||
"k8s.io/client-go/dynamic"
|
"k8s.io/client-go/dynamic"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
|
@ -300,13 +299,13 @@ func startNamespaceController(ctx ControllerContext) (bool, error) {
|
||||||
nsKubeconfig.Burst *= 100
|
nsKubeconfig.Burst *= 100
|
||||||
namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig)
|
namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig)
|
||||||
|
|
||||||
discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
|
|
||||||
|
|
||||||
dynamicClient, err := dynamic.NewForConfig(nsKubeconfig)
|
dynamicClient, err := dynamic.NewForConfig(nsKubeconfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
|
||||||
|
|
||||||
namespaceController := namespacecontroller.NewNamespaceController(
|
namespaceController := namespacecontroller.NewNamespaceController(
|
||||||
namespaceKubeClient,
|
namespaceKubeClient,
|
||||||
dynamicClient,
|
dynamicClient,
|
||||||
|
@ -348,11 +347,7 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector")
|
gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector")
|
||||||
|
|
||||||
// Use a discovery client capable of being refreshed.
|
|
||||||
discoveryClient := cacheddiscovery.NewMemCacheClient(gcClientset.Discovery())
|
discoveryClient := cacheddiscovery.NewMemCacheClient(gcClientset.Discovery())
|
||||||
restMapper := discovery.NewDeferredDiscoveryRESTMapper(discoveryClient)
|
|
||||||
restMapper.Reset()
|
|
||||||
|
|
||||||
config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector")
|
config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector")
|
||||||
config.ContentConfig = dynamic.ContentConfig()
|
config.ContentConfig = dynamic.ContentConfig()
|
||||||
|
@ -360,8 +355,8 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
|
||||||
// resource types. Otherwise we'll be storing full Unstructured data in our
|
// resource types. Otherwise we'll be storing full Unstructured data in our
|
||||||
// caches for custom resources. Consider porting it to work with
|
// caches for custom resources. Consider porting it to work with
|
||||||
// metav1beta1.PartialObjectMetadata.
|
// metav1beta1.PartialObjectMetadata.
|
||||||
metaOnlyClientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc)
|
metaOnlyClientPool := dynamic.NewClientPool(config, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc)
|
||||||
clientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc)
|
clientPool := dynamic.NewClientPool(config, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc)
|
||||||
|
|
||||||
// Get an initial set of deletable resources to prime the garbage collector.
|
// Get an initial set of deletable resources to prime the garbage collector.
|
||||||
deletableResources := garbagecollector.GetDeletableResources(discoveryClient)
|
deletableResources := garbagecollector.GetDeletableResources(discoveryClient)
|
||||||
|
@ -372,7 +367,7 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
|
||||||
garbageCollector, err := garbagecollector.NewGarbageCollector(
|
garbageCollector, err := garbagecollector.NewGarbageCollector(
|
||||||
metaOnlyClientPool,
|
metaOnlyClientPool,
|
||||||
clientPool,
|
clientPool,
|
||||||
restMapper,
|
ctx.RESTMapper,
|
||||||
deletableResources,
|
deletableResources,
|
||||||
ignoredResources,
|
ignoredResources,
|
||||||
ctx.InformerFactory,
|
ctx.InformerFactory,
|
||||||
|
|
|
@ -254,6 +254,12 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work
|
||||||
}
|
}
|
||||||
syncPeriod := 5 * time.Second
|
syncPeriod := 5 * time.Second
|
||||||
startGC := func(workers int) {
|
startGC := func(workers int) {
|
||||||
|
go wait.Until(func() {
|
||||||
|
// Resetting the REST mapper will also invalidate the underlying discovery
|
||||||
|
// client. This is a leaky abstraction and assumes behavior about the REST
|
||||||
|
// mapper, but we'll deal with it for now.
|
||||||
|
restMapper.Reset()
|
||||||
|
}, syncPeriod, stopCh)
|
||||||
go gc.Run(workers, stopCh)
|
go gc.Run(workers, stopCh)
|
||||||
go gc.Sync(clientSet.Discovery(), syncPeriod, stopCh)
|
go gc.Sync(clientSet.Discovery(), syncPeriod, stopCh)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue