Merge pull request #63444 from deads2k/client-07-gc-dynamic

Automatic merge from submit-queue (batch tested with PRs 63526, 60371, 63444). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

update garbage collection to use the new dynamic client

Update GC to use the new and easy to use dynamic client.  This is one of two remaining stragglers.

@kubernetes/sig-api-machinery-pr-reviews 
@caesarxuchao @ironcladlou 

```release-note
NONE
```
pull/8/head
Kubernetes Submit Queue 2018-05-08 15:24:11 -07:00 committed by GitHub
commit 51d75a7b1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 61 additions and 113 deletions

View File

@ -350,13 +350,13 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
discoveryClient := cacheddiscovery.NewMemCacheClient(gcClientset.Discovery())
config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector")
config.ContentConfig = dynamic.ContentConfig()
// TODO: Make NewMetadataCodecFactory support arbitrary (non-compiled)
// resource types. Otherwise we'll be storing full Unstructured data in our
// caches for custom resources. Consider porting it to work with
// metav1beta1.PartialObjectMetadata.
metaOnlyClientPool := dynamic.NewClientPool(config, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc)
clientPool := dynamic.NewClientPool(config, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc)
// bump QPS limits on our dynamic client that we use to GC every deleted object
config.QPS *= 20
config.Burst *= 20
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return true, err
}
// Get an initial set of deletable resources to prime the garbage collector.
deletableResources := garbagecollector.GetDeletableResources(discoveryClient)
@ -365,8 +365,7 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{}
}
garbageCollector, err := garbagecollector.NewGarbageCollector(
metaOnlyClientPool,
clientPool,
dynamicClient,
ctx.RESTMapper,
deletableResources,
ignoredResources,

View File

@ -54,14 +54,12 @@ go_test(
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core/install:go_default_library",
"//pkg/controller/garbagecollector/metaonly:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta/testrestmapper:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",

View File

@ -60,9 +60,7 @@ const ResourceResyncTime time.Duration = 0
// up to date as the notification is sent.
type GarbageCollector struct {
restMapper resettableRESTMapper
// clientPool uses the regular dynamicCodec. We need it to update
// finalizers. It can be removed if we support patching finalizers.
clientPool dynamic.ClientPool
dynamicClient dynamic.DynamicInterface
// garbage collector attempts to delete the items in attemptToDelete queue when the time is ripe.
attemptToDelete workqueue.RateLimitingInterface
// garbage collector attempts to orphan the dependents of the items in the attemptToOrphan queue, then deletes the items.
@ -76,8 +74,7 @@ type GarbageCollector struct {
}
func NewGarbageCollector(
metaOnlyClientPool dynamic.ClientPool,
clientPool dynamic.ClientPool,
dynamicClient dynamic.DynamicInterface,
mapper resettableRESTMapper,
deletableResources map[schema.GroupVersionResource]struct{},
ignoredResources map[schema.GroupResource]struct{},
@ -88,14 +85,14 @@ func NewGarbageCollector(
attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan")
absentOwnerCache := NewUIDCache(500)
gc := &GarbageCollector{
clientPool: clientPool,
dynamicClient: dynamicClient,
restMapper: mapper,
attemptToDelete: attemptToDelete,
attemptToOrphan: attemptToOrphan,
absentOwnerCache: absentOwnerCache,
}
gb := &GraphBuilder{
metaOnlyClientPool: metaOnlyClientPool,
dynamicClient: dynamicClient,
informersStarted: informersStarted,
restMapper: mapper,
graphChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"),
@ -291,19 +288,15 @@ func (gc *GarbageCollector) isDangling(reference metav1.OwnerReference, item *no
// ii) should update the object to remove such references. This is to
// prevent objects having references to an old resource from being
// deleted during a cluster upgrade.
fqKind := schema.FromAPIVersionAndKind(reference.APIVersion, reference.Kind)
client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
if err != nil {
return false, nil, err
}
resource, err := gc.apiResource(reference.APIVersion, reference.Kind)
resource, namespaced, err := gc.apiResource(reference.APIVersion, reference.Kind)
if err != nil {
return false, nil, err
}
// TODO: It's only necessary to talk to the API server if the owner node
// is a "virtual" node. The local graph could lag behind the real
// status, but in practice, the difference is small.
owner, err = client.Resource(resource, resourceDefaultNamespace(resource, item.identity.Namespace)).Get(reference.Name, metav1.GetOptions{})
owner, err = gc.dynamicClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.identity.Namespace)).Get(reference.Name, metav1.GetOptions{})
switch {
case errors.IsNotFound(err):
gc.absentOwnerCache.Add(reference.UID)

View File

@ -35,7 +35,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta/testrestmapper"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/sets"
@ -48,7 +47,6 @@ import (
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
)
type testRESTMapper struct {
@ -59,12 +57,13 @@ func (_ *testRESTMapper) Reset() {}
func TestGarbageCollectorConstruction(t *testing.T) {
config := &restclient.Config{}
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
tweakableRM := meta.NewDefaultRESTMapper(nil)
rm := &testRESTMapper{meta.MultiRESTMapper{tweakableRM, testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Registry, legacyscheme.Scheme)}}
metaOnlyClientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc)
config.ContentConfig.NegotiatedSerializer = nil
clientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc)
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
podResource := map[schema.GroupVersionResource]struct{}{
{Version: "v1", Resource: "pods"}: {},
}
@ -79,7 +78,7 @@ func TestGarbageCollectorConstruction(t *testing.T) {
// construction will not fail.
alwaysStarted := make(chan struct{})
close(alwaysStarted)
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, rm, twoResources, map[schema.GroupResource]struct{}{}, sharedInformers, alwaysStarted)
gc, err := NewGarbageCollector(dynamicClient, rm, twoResources, map[schema.GroupResource]struct{}{}, sharedInformers, alwaysStarted)
if err != nil {
t.Fatal(err)
}
@ -190,16 +189,17 @@ type garbageCollector struct {
}
func setupGC(t *testing.T, config *restclient.Config) garbageCollector {
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
metaOnlyClientPool := dynamic.NewClientPool(config, testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Registry, legacyscheme.Scheme), dynamic.LegacyAPIPathResolverFunc)
config.ContentConfig.NegotiatedSerializer = nil
clientPool := dynamic.NewClientPool(config, testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Registry, legacyscheme.Scheme), dynamic.LegacyAPIPathResolverFunc)
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
podResource := map[schema.GroupVersionResource]struct{}{{Version: "v1", Resource: "pods"}: {}}
client := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(client, 0)
alwaysStarted := make(chan struct{})
close(alwaysStarted)
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, &testRESTMapper{testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Registry, legacyscheme.Scheme)}, podResource, ignoredResources, sharedInformers, alwaysStarted)
gc, err := NewGarbageCollector(dynamicClient, &testRESTMapper{testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Registry, legacyscheme.Scheme)}, podResource, ignoredResources, sharedInformers, alwaysStarted)
if err != nil {
t.Fatal(err)
}
@ -434,13 +434,13 @@ func TestGCListWatcher(t *testing.T) {
testHandler := &fakeActionHandler{}
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
defer srv.Close()
clientPool := dynamic.NewClientPool(clientConfig, testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Registry, legacyscheme.Scheme), dynamic.LegacyAPIPathResolverFunc)
podResource := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
client, err := clientPool.ClientForGroupVersionResource(podResource)
dynamicClient, err := dynamic.NewForConfig(clientConfig)
if err != nil {
t.Fatal(err)
}
lw := listWatcher(client, podResource)
lw := listWatcher(dynamicClient, podResource)
lw.DisableChunking = true
if _, err := lw.Watch(metav1.ListOptions{ResourceVersion: "1"}); err != nil {
t.Fatal(err)
@ -824,15 +824,18 @@ func TestGarbageCollectorSync(t *testing.T) {
}
rm := &testRESTMapper{testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Registry, legacyscheme.Scheme)}
metaOnlyClientPool := dynamic.NewClientPool(clientConfig, rm, dynamic.LegacyAPIPathResolverFunc)
clientPool := dynamic.NewClientPool(clientConfig, rm, dynamic.LegacyAPIPathResolverFunc)
dynamicClient, err := dynamic.NewForConfig(clientConfig)
if err != nil {
t.Fatal(err)
}
podResource := map[schema.GroupVersionResource]struct{}{
{Group: "", Version: "v1", Resource: "pods"}: {},
}
sharedInformers := informers.NewSharedInformerFactory(client, 0)
alwaysStarted := make(chan struct{})
close(alwaysStarted)
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, rm, podResource, map[schema.GroupResource]struct{}{}, sharedInformers, alwaysStarted)
gc, err := NewGarbageCollector(dynamicClient, rm, podResource, map[schema.GroupResource]struct{}{}, sharedInformers, alwaysStarted)
if err != nil {
t.Fatal(err)
}

View File

@ -91,6 +91,7 @@ type GraphBuilder struct {
// it is protected by monitorLock.
running bool
dynamicClient dynamic.DynamicInterface
// metaOnlyClientPool uses a special codec, which removes fields except for
// apiVersion, kind, and metadata during decoding.
metaOnlyClientPool dynamic.ClientPool
@ -127,27 +128,15 @@ func (m *monitor) Run() {
type monitors map[schema.GroupVersionResource]*monitor
func listWatcher(client dynamic.Interface, resource schema.GroupVersionResource) *cache.ListWatch {
func listWatcher(client dynamic.DynamicInterface, resource schema.GroupVersionResource) *cache.ListWatch {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
// APIResource.Kind is not used by the dynamic client, so
// leave it empty. We want to list this resource in all
// namespaces if it's namespace scoped, so leave
// APIResource.Namespaced as false is all right.
apiResource := metav1.APIResource{Name: resource.Resource}
return client.
Resource(&apiResource, metav1.NamespaceAll).
List(options)
// We want to list this resource in all namespaces if it's namespace scoped, so not passing namespace is ok.
return client.Resource(resource).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// APIResource.Kind is not used by the dynamic client, so
// leave it empty. We want to list this resource in all
// namespaces if it's namespace scoped, so leave
// APIResource.Namespaced as false is all right.
apiResource := metav1.APIResource{Name: resource.Resource}
return client.
Resource(&apiResource, metav1.NamespaceAll).
Watch(options)
// We want to list this resource in all namespaces if it's namespace scoped, so not passing namespace is ok.
return client.Resource(resource).Watch(options)
},
}
}
@ -199,12 +188,8 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind
// TODO: consider store in one storage.
glog.V(5).Infof("create storage for resource %s", resource)
client, err := gb.metaOnlyClientPool.ClientForGroupVersionKind(kind)
if err != nil {
return nil, err
}
_, monitor := cache.NewInformer(
listWatcher(client, resource),
listWatcher(gb.dynamicClient, resource),
nil,
ResourceResyncTime,
// don't need to clone because it's not from shared cache

View File

@ -31,8 +31,8 @@ import (
)
// cluster scoped resources don't have namespaces. Default to the item's namespace, but clear it for cluster scoped resources
func resourceDefaultNamespace(resource *metav1.APIResource, defaultNamespace string) string {
if resource.Namespaced {
func resourceDefaultNamespace(namespaced bool, defaultNamespace string) string {
if namespaced {
return defaultNamespace
}
return ""
@ -40,74 +40,48 @@ func resourceDefaultNamespace(resource *metav1.APIResource, defaultNamespace str
// apiResource consults the REST mapper to translate an <apiVersion, kind,
// namespace> tuple to a unversioned.APIResource struct.
func (gc *GarbageCollector) apiResource(apiVersion, kind string) (*metav1.APIResource, error) {
func (gc *GarbageCollector) apiResource(apiVersion, kind string) (schema.GroupVersionResource, bool, error) {
fqKind := schema.FromAPIVersionAndKind(apiVersion, kind)
mapping, err := gc.restMapper.RESTMapping(fqKind.GroupKind(), fqKind.Version)
if err != nil {
return nil, newRESTMappingError(kind, apiVersion)
return schema.GroupVersionResource{}, false, newRESTMappingError(kind, apiVersion)
}
glog.V(5).Infof("map kind %s, version %s to resource %s", kind, apiVersion, mapping.Resource)
resource := metav1.APIResource{
Name: mapping.Resource.Resource,
Namespaced: mapping.Scope == meta.RESTScopeNamespace,
Kind: kind,
}
return &resource, nil
return mapping.Resource, mapping.Scope == meta.RESTScopeNamespace, nil
}
func (gc *GarbageCollector) deleteObject(item objectReference, policy *metav1.DeletionPropagation) error {
fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
if err != nil {
return err
}
resource, err := gc.apiResource(item.APIVersion, item.Kind)
resource, namespaced, err := gc.apiResource(item.APIVersion, item.Kind)
if err != nil {
return err
}
uid := item.UID
preconditions := metav1.Preconditions{UID: &uid}
deleteOptions := metav1.DeleteOptions{Preconditions: &preconditions, PropagationPolicy: policy}
return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Delete(item.Name, &deleteOptions)
return gc.dynamicClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.Namespace)).Delete(item.Name, &deleteOptions)
}
func (gc *GarbageCollector) getObject(item objectReference) (*unstructured.Unstructured, error) {
fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
resource, namespaced, err := gc.apiResource(item.APIVersion, item.Kind)
if err != nil {
return nil, err
}
resource, err := gc.apiResource(item.APIVersion, item.Kind)
if err != nil {
return nil, err
}
return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Get(item.Name, metav1.GetOptions{})
return gc.dynamicClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.Namespace)).Get(item.Name, metav1.GetOptions{})
}
func (gc *GarbageCollector) updateObject(item objectReference, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
resource, namespaced, err := gc.apiResource(item.APIVersion, item.Kind)
if err != nil {
return nil, err
}
resource, err := gc.apiResource(item.APIVersion, item.Kind)
if err != nil {
return nil, err
}
return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Update(obj)
return gc.dynamicClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.Namespace)).Update(obj)
}
func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*unstructured.Unstructured, error) {
fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
resource, namespaced, err := gc.apiResource(item.APIVersion, item.Kind)
if err != nil {
return nil, err
}
resource, err := gc.apiResource(item.APIVersion, item.Kind)
if err != nil {
return nil, err
}
return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Patch(item.Name, types.StrategicMergePatchType, patch)
return gc.dynamicClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.Namespace)).Patch(item.Name, types.StrategicMergePatchType, patch)
}
// TODO: Using Patch when strategicmerge supports deleting an entry from a

View File

@ -224,9 +224,6 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work
restMapper.Reset()
deletableResources := garbagecollector.GetDeletableResources(discoveryClient)
config := *result.ClientConfig
config.ContentConfig = dynamic.ContentConfig()
metaOnlyClientPool := dynamic.NewClientPool(&config, restMapper, dynamic.LegacyAPIPathResolverFunc)
clientPool := dynamic.NewClientPool(&config, restMapper, dynamic.LegacyAPIPathResolverFunc)
dynamicClient, err := dynamic.NewForConfig(&config)
if err != nil {
t.Fatalf("failed to create dynamicClient: %v", err)
@ -235,8 +232,7 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work
alwaysStarted := make(chan struct{})
close(alwaysStarted)
gc, err := garbagecollector.NewGarbageCollector(
metaOnlyClientPool,
clientPool,
dynamicClient,
restMapper,
deletableResources,
garbagecollector.DefaultIgnoredResources(),