diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 393b218a45..d5e393fd49 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -350,13 +350,13 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) { discoveryClient := cacheddiscovery.NewMemCacheClient(gcClientset.Discovery()) config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector") - config.ContentConfig = dynamic.ContentConfig() - // TODO: Make NewMetadataCodecFactory support arbitrary (non-compiled) - // resource types. Otherwise we'll be storing full Unstructured data in our - // caches for custom resources. Consider porting it to work with - // metav1beta1.PartialObjectMetadata. - metaOnlyClientPool := dynamic.NewClientPool(config, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc) - clientPool := dynamic.NewClientPool(config, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc) + // bump QPS limits on our dynamic client that we use to GC every deleted object + config.QPS *= 20 + config.Burst *= 20 + dynamicClient, err := dynamic.NewForConfig(config) + if err != nil { + return true, err + } // Get an initial set of deletable resources to prime the garbage collector. deletableResources := garbagecollector.GetDeletableResources(discoveryClient) @@ -365,8 +365,7 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) { ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{} } garbageCollector, err := garbagecollector.NewGarbageCollector( - metaOnlyClientPool, - clientPool, + dynamicClient, ctx.RESTMapper, deletableResources, ignoredResources, diff --git a/pkg/controller/garbagecollector/BUILD b/pkg/controller/garbagecollector/BUILD index 6fca5ce338..37f439ed78 100644 --- a/pkg/controller/garbagecollector/BUILD +++ b/pkg/controller/garbagecollector/BUILD @@ -54,14 +54,12 @@ go_test( deps = [ "//pkg/api/legacyscheme:go_default_library", "//pkg/apis/core/install:go_default_library", - "//pkg/controller/garbagecollector/metaonly:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/meta/testrestmapper:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index bce61b3790..4d05dee56c 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -59,10 +59,8 @@ const ResourceResyncTime time.Duration = 0 // ensures that the garbage collector operates with a graph that is at least as // up to date as the notification is sent. type GarbageCollector struct { - restMapper resettableRESTMapper - // clientPool uses the regular dynamicCodec. We need it to update - // finalizers. It can be removed if we support patching finalizers. - clientPool dynamic.ClientPool + restMapper resettableRESTMapper + dynamicClient dynamic.DynamicInterface // garbage collector attempts to delete the items in attemptToDelete queue when the time is ripe. attemptToDelete workqueue.RateLimitingInterface // garbage collector attempts to orphan the dependents of the items in the attemptToOrphan queue, then deletes the items. @@ -76,8 +74,7 @@ type GarbageCollector struct { } func NewGarbageCollector( - metaOnlyClientPool dynamic.ClientPool, - clientPool dynamic.ClientPool, + dynamicClient dynamic.DynamicInterface, mapper resettableRESTMapper, deletableResources map[schema.GroupVersionResource]struct{}, ignoredResources map[schema.GroupResource]struct{}, @@ -88,17 +85,17 @@ func NewGarbageCollector( attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan") absentOwnerCache := NewUIDCache(500) gc := &GarbageCollector{ - clientPool: clientPool, + dynamicClient: dynamicClient, restMapper: mapper, attemptToDelete: attemptToDelete, attemptToOrphan: attemptToOrphan, absentOwnerCache: absentOwnerCache, } gb := &GraphBuilder{ - metaOnlyClientPool: metaOnlyClientPool, - informersStarted: informersStarted, - restMapper: mapper, - graphChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"), + dynamicClient: dynamicClient, + informersStarted: informersStarted, + restMapper: mapper, + graphChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"), uidToNode: &concurrentUIDToNode{ uidToNode: make(map[types.UID]*node), }, @@ -291,19 +288,15 @@ func (gc *GarbageCollector) isDangling(reference metav1.OwnerReference, item *no // ii) should update the object to remove such references. This is to // prevent objects having references to an old resource from being // deleted during a cluster upgrade. - fqKind := schema.FromAPIVersionAndKind(reference.APIVersion, reference.Kind) - client, err := gc.clientPool.ClientForGroupVersionKind(fqKind) - if err != nil { - return false, nil, err - } - resource, err := gc.apiResource(reference.APIVersion, reference.Kind) + resource, namespaced, err := gc.apiResource(reference.APIVersion, reference.Kind) if err != nil { return false, nil, err } + // TODO: It's only necessary to talk to the API server if the owner node // is a "virtual" node. The local graph could lag behind the real // status, but in practice, the difference is small. - owner, err = client.Resource(resource, resourceDefaultNamespace(resource, item.identity.Namespace)).Get(reference.Name, metav1.GetOptions{}) + owner, err = gc.dynamicClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.identity.Namespace)).Get(reference.Name, metav1.GetOptions{}) switch { case errors.IsNotFound(err): gc.absentOwnerCache.Add(reference.UID) diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index aadbf179c6..06754afee9 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -35,7 +35,6 @@ import ( "k8s.io/apimachinery/pkg/api/meta/testrestmapper" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/sets" @@ -48,7 +47,6 @@ import ( restclient "k8s.io/client-go/rest" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api/legacyscheme" - "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" ) type testRESTMapper struct { @@ -59,12 +57,13 @@ func (_ *testRESTMapper) Reset() {} func TestGarbageCollectorConstruction(t *testing.T) { config := &restclient.Config{} - config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()} tweakableRM := meta.NewDefaultRESTMapper(nil) rm := &testRESTMapper{meta.MultiRESTMapper{tweakableRM, testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Registry, legacyscheme.Scheme)}} - metaOnlyClientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc) - config.ContentConfig.NegotiatedSerializer = nil - clientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc) + dynamicClient, err := dynamic.NewForConfig(config) + if err != nil { + t.Fatal(err) + } + podResource := map[schema.GroupVersionResource]struct{}{ {Version: "v1", Resource: "pods"}: {}, } @@ -79,7 +78,7 @@ func TestGarbageCollectorConstruction(t *testing.T) { // construction will not fail. alwaysStarted := make(chan struct{}) close(alwaysStarted) - gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, rm, twoResources, map[schema.GroupResource]struct{}{}, sharedInformers, alwaysStarted) + gc, err := NewGarbageCollector(dynamicClient, rm, twoResources, map[schema.GroupResource]struct{}{}, sharedInformers, alwaysStarted) if err != nil { t.Fatal(err) } @@ -190,16 +189,17 @@ type garbageCollector struct { } func setupGC(t *testing.T, config *restclient.Config) garbageCollector { - config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()} - metaOnlyClientPool := dynamic.NewClientPool(config, testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Registry, legacyscheme.Scheme), dynamic.LegacyAPIPathResolverFunc) - config.ContentConfig.NegotiatedSerializer = nil - clientPool := dynamic.NewClientPool(config, testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Registry, legacyscheme.Scheme), dynamic.LegacyAPIPathResolverFunc) + dynamicClient, err := dynamic.NewForConfig(config) + if err != nil { + t.Fatal(err) + } + podResource := map[schema.GroupVersionResource]struct{}{{Version: "v1", Resource: "pods"}: {}} client := fake.NewSimpleClientset() sharedInformers := informers.NewSharedInformerFactory(client, 0) alwaysStarted := make(chan struct{}) close(alwaysStarted) - gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, &testRESTMapper{testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Registry, legacyscheme.Scheme)}, podResource, ignoredResources, sharedInformers, alwaysStarted) + gc, err := NewGarbageCollector(dynamicClient, &testRESTMapper{testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Registry, legacyscheme.Scheme)}, podResource, ignoredResources, sharedInformers, alwaysStarted) if err != nil { t.Fatal(err) } @@ -434,13 +434,13 @@ func TestGCListWatcher(t *testing.T) { testHandler := &fakeActionHandler{} srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP) defer srv.Close() - clientPool := dynamic.NewClientPool(clientConfig, testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Registry, legacyscheme.Scheme), dynamic.LegacyAPIPathResolverFunc) podResource := schema.GroupVersionResource{Version: "v1", Resource: "pods"} - client, err := clientPool.ClientForGroupVersionResource(podResource) + dynamicClient, err := dynamic.NewForConfig(clientConfig) if err != nil { t.Fatal(err) } - lw := listWatcher(client, podResource) + + lw := listWatcher(dynamicClient, podResource) lw.DisableChunking = true if _, err := lw.Watch(metav1.ListOptions{ResourceVersion: "1"}); err != nil { t.Fatal(err) @@ -824,15 +824,18 @@ func TestGarbageCollectorSync(t *testing.T) { } rm := &testRESTMapper{testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Registry, legacyscheme.Scheme)} - metaOnlyClientPool := dynamic.NewClientPool(clientConfig, rm, dynamic.LegacyAPIPathResolverFunc) - clientPool := dynamic.NewClientPool(clientConfig, rm, dynamic.LegacyAPIPathResolverFunc) + dynamicClient, err := dynamic.NewForConfig(clientConfig) + if err != nil { + t.Fatal(err) + } + podResource := map[schema.GroupVersionResource]struct{}{ {Group: "", Version: "v1", Resource: "pods"}: {}, } sharedInformers := informers.NewSharedInformerFactory(client, 0) alwaysStarted := make(chan struct{}) close(alwaysStarted) - gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, rm, podResource, map[schema.GroupResource]struct{}{}, sharedInformers, alwaysStarted) + gc, err := NewGarbageCollector(dynamicClient, rm, podResource, map[schema.GroupResource]struct{}{}, sharedInformers, alwaysStarted) if err != nil { t.Fatal(err) } diff --git a/pkg/controller/garbagecollector/graph_builder.go b/pkg/controller/garbagecollector/graph_builder.go index b21548352f..1fbb7a1ce7 100644 --- a/pkg/controller/garbagecollector/graph_builder.go +++ b/pkg/controller/garbagecollector/graph_builder.go @@ -91,6 +91,7 @@ type GraphBuilder struct { // it is protected by monitorLock. running bool + dynamicClient dynamic.DynamicInterface // metaOnlyClientPool uses a special codec, which removes fields except for // apiVersion, kind, and metadata during decoding. metaOnlyClientPool dynamic.ClientPool @@ -127,27 +128,15 @@ func (m *monitor) Run() { type monitors map[schema.GroupVersionResource]*monitor -func listWatcher(client dynamic.Interface, resource schema.GroupVersionResource) *cache.ListWatch { +func listWatcher(client dynamic.DynamicInterface, resource schema.GroupVersionResource) *cache.ListWatch { return &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - // APIResource.Kind is not used by the dynamic client, so - // leave it empty. We want to list this resource in all - // namespaces if it's namespace scoped, so leave - // APIResource.Namespaced as false is all right. - apiResource := metav1.APIResource{Name: resource.Resource} - return client. - Resource(&apiResource, metav1.NamespaceAll). - List(options) + // We want to list this resource in all namespaces if it's namespace scoped, so not passing namespace is ok. + return client.Resource(resource).List(options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - // APIResource.Kind is not used by the dynamic client, so - // leave it empty. We want to list this resource in all - // namespaces if it's namespace scoped, so leave - // APIResource.Namespaced as false is all right. - apiResource := metav1.APIResource{Name: resource.Resource} - return client. - Resource(&apiResource, metav1.NamespaceAll). - Watch(options) + // We want to list this resource in all namespaces if it's namespace scoped, so not passing namespace is ok. + return client.Resource(resource).Watch(options) }, } } @@ -199,12 +188,8 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind // TODO: consider store in one storage. glog.V(5).Infof("create storage for resource %s", resource) - client, err := gb.metaOnlyClientPool.ClientForGroupVersionKind(kind) - if err != nil { - return nil, err - } _, monitor := cache.NewInformer( - listWatcher(client, resource), + listWatcher(gb.dynamicClient, resource), nil, ResourceResyncTime, // don't need to clone because it's not from shared cache diff --git a/pkg/controller/garbagecollector/operations.go b/pkg/controller/garbagecollector/operations.go index f1cc538e1e..29025692b8 100644 --- a/pkg/controller/garbagecollector/operations.go +++ b/pkg/controller/garbagecollector/operations.go @@ -31,8 +31,8 @@ import ( ) // cluster scoped resources don't have namespaces. Default to the item's namespace, but clear it for cluster scoped resources -func resourceDefaultNamespace(resource *metav1.APIResource, defaultNamespace string) string { - if resource.Namespaced { +func resourceDefaultNamespace(namespaced bool, defaultNamespace string) string { + if namespaced { return defaultNamespace } return "" @@ -40,74 +40,48 @@ func resourceDefaultNamespace(resource *metav1.APIResource, defaultNamespace str // apiResource consults the REST mapper to translate an tuple to a unversioned.APIResource struct. -func (gc *GarbageCollector) apiResource(apiVersion, kind string) (*metav1.APIResource, error) { +func (gc *GarbageCollector) apiResource(apiVersion, kind string) (schema.GroupVersionResource, bool, error) { fqKind := schema.FromAPIVersionAndKind(apiVersion, kind) mapping, err := gc.restMapper.RESTMapping(fqKind.GroupKind(), fqKind.Version) if err != nil { - return nil, newRESTMappingError(kind, apiVersion) + return schema.GroupVersionResource{}, false, newRESTMappingError(kind, apiVersion) } - glog.V(5).Infof("map kind %s, version %s to resource %s", kind, apiVersion, mapping.Resource) - resource := metav1.APIResource{ - Name: mapping.Resource.Resource, - Namespaced: mapping.Scope == meta.RESTScopeNamespace, - Kind: kind, - } - return &resource, nil + return mapping.Resource, mapping.Scope == meta.RESTScopeNamespace, nil } func (gc *GarbageCollector) deleteObject(item objectReference, policy *metav1.DeletionPropagation) error { - fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind) - client, err := gc.clientPool.ClientForGroupVersionKind(fqKind) - if err != nil { - return err - } - resource, err := gc.apiResource(item.APIVersion, item.Kind) + resource, namespaced, err := gc.apiResource(item.APIVersion, item.Kind) if err != nil { return err } uid := item.UID preconditions := metav1.Preconditions{UID: &uid} deleteOptions := metav1.DeleteOptions{Preconditions: &preconditions, PropagationPolicy: policy} - return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Delete(item.Name, &deleteOptions) + return gc.dynamicClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.Namespace)).Delete(item.Name, &deleteOptions) } func (gc *GarbageCollector) getObject(item objectReference) (*unstructured.Unstructured, error) { - fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind) - client, err := gc.clientPool.ClientForGroupVersionKind(fqKind) + resource, namespaced, err := gc.apiResource(item.APIVersion, item.Kind) if err != nil { return nil, err } - resource, err := gc.apiResource(item.APIVersion, item.Kind) - if err != nil { - return nil, err - } - return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Get(item.Name, metav1.GetOptions{}) + return gc.dynamicClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.Namespace)).Get(item.Name, metav1.GetOptions{}) } func (gc *GarbageCollector) updateObject(item objectReference, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { - fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind) - client, err := gc.clientPool.ClientForGroupVersionKind(fqKind) + resource, namespaced, err := gc.apiResource(item.APIVersion, item.Kind) if err != nil { return nil, err } - resource, err := gc.apiResource(item.APIVersion, item.Kind) - if err != nil { - return nil, err - } - return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Update(obj) + return gc.dynamicClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.Namespace)).Update(obj) } func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*unstructured.Unstructured, error) { - fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind) - client, err := gc.clientPool.ClientForGroupVersionKind(fqKind) + resource, namespaced, err := gc.apiResource(item.APIVersion, item.Kind) if err != nil { return nil, err } - resource, err := gc.apiResource(item.APIVersion, item.Kind) - if err != nil { - return nil, err - } - return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Patch(item.Name, types.StrategicMergePatchType, patch) + return gc.dynamicClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.Namespace)).Patch(item.Name, types.StrategicMergePatchType, patch) } // TODO: Using Patch when strategicmerge supports deleting an entry from a diff --git a/test/integration/garbagecollector/garbage_collector_test.go b/test/integration/garbagecollector/garbage_collector_test.go index 3258634663..83fc260486 100644 --- a/test/integration/garbagecollector/garbage_collector_test.go +++ b/test/integration/garbagecollector/garbage_collector_test.go @@ -224,9 +224,6 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work restMapper.Reset() deletableResources := garbagecollector.GetDeletableResources(discoveryClient) config := *result.ClientConfig - config.ContentConfig = dynamic.ContentConfig() - metaOnlyClientPool := dynamic.NewClientPool(&config, restMapper, dynamic.LegacyAPIPathResolverFunc) - clientPool := dynamic.NewClientPool(&config, restMapper, dynamic.LegacyAPIPathResolverFunc) dynamicClient, err := dynamic.NewForConfig(&config) if err != nil { t.Fatalf("failed to create dynamicClient: %v", err) @@ -235,8 +232,7 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work alwaysStarted := make(chan struct{}) close(alwaysStarted) gc, err := garbagecollector.NewGarbageCollector( - metaOnlyClientPool, - clientPool, + dynamicClient, restMapper, deletableResources, garbagecollector.DefaultIgnoredResources(),