Switched to use dynamic shared informer for Garbage Collector.

k3s-v1.15.3
Yu Liao 2019-02-21 11:28:02 -08:00
parent 63e6cf3a0a
commit 05ebe91277
8 changed files with 44 additions and 79 deletions

View File

@ -400,7 +400,7 @@ func startGarbageCollectorController(ctx ControllerContext) (http.Handler, bool,
ctx.RESTMapper,
deletableResources,
ignoredResources,
ctx.InformerFactory,
ctx.GenericInformerFactory,
ctx.InformersStarted,
)
if err != nil {
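
Note: startGarbageCollectorController previously received the controller context's typed informer factory (ctx.InformerFactory); it now receives ctx.GenericInformerFactory, which can also serve informers for resources that have no compiled-in Go type, such as custom resources. Judging from how the factory is used throughout this commit, its contract is roughly the sketch below (the interface lives in pkg/controller; the doc comments are assumptions):

package controller

import (
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/informers"
)

// InformerFactory is the one dependency the garbage collector now
// programs against: a single ForResource call that can return a shared
// informer for any GroupVersionResource, typed or dynamic.
type InformerFactory interface {
    // ForResource returns a shared informer wrapped as a GenericInformer.
    ForResource(resource schema.GroupVersionResource) (informers.GenericInformer, error)
    // Start starts all informers requested so far.
    Start(stopCh <-chan struct{})
}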

View File

@ -26,7 +26,7 @@ import (
"time"
apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

View File

@ -33,10 +33,8 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/discovery:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
@ -60,6 +58,7 @@ go_test(
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core/install:go_default_library",
"//pkg/controller:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta/testrestmapper:go_default_library",
@ -71,6 +70,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//staging/src/k8s.io/client-go/discovery:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/dynamic/dynamicinformer:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",

View File

@ -36,7 +36,6 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"
// import known versions
@ -66,7 +65,7 @@ type GarbageCollector struct {
dependencyGraphBuilder *GraphBuilder
// GC caches the owners that do not exist according to the API server.
absentOwnerCache *UIDCache
sharedInformers informers.SharedInformerFactory
sharedInformers controller.InformerFactory
workerLock sync.RWMutex
}
@ -76,7 +75,7 @@ func NewGarbageCollector(
mapper resettableRESTMapper,
deletableResources map[schema.GroupVersionResource]struct{},
ignoredResources map[schema.GroupResource]struct{},
sharedInformers informers.SharedInformerFactory,
sharedInformers controller.InformerFactory,
informersStarted <-chan struct{},
) (*GarbageCollector, error) {
attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete")
@ -90,7 +89,6 @@ func NewGarbageCollector(
absentOwnerCache: absentOwnerCache,
}
gb := &GraphBuilder{
dynamicClient: dynamicClient,
informersStarted: informersStarted,
restMapper: mapper,
graphChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"),
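
Note: GarbageCollector (and GraphBuilder, below) swaps its informers.SharedInformerFactory field for the controller.InformerFactory abstraction, and GraphBuilder drops its private dynamicClient now that it no longer builds informers by hand. A minimal sketch of what controller.NewInformerFactory plausibly composes, assuming the typed-first, dynamic-fallback behavior this commit implies (struct and field names are illustrative):

package controller

import (
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic/dynamicinformer"
    "k8s.io/client-go/informers"
)

// informerFactory prefers the typed factory and falls back to the
// dynamic factory for resources the typed one does not know about.
type informerFactory struct {
    typed   informers.SharedInformerFactory
    dynamic dynamicinformer.DynamicSharedInformerFactory
}

func (i *informerFactory) ForResource(resource schema.GroupVersionResource) (informers.GenericInformer, error) {
    informer, err := i.typed.ForResource(resource)
    if err != nil {
        // The typed factory only covers compiled-in types; CRDs and
        // aggregated APIs are served by the dynamic factory instead.
        return i.dynamic.ForResource(resource), nil
    }
    return informer, nil
}

func (i *informerFactory) Start(stopCh <-chan struct{}) {
    i.typed.Start(stopCh)
    i.dynamic.Start(stopCh)
}

// NewInformerFactory hides both factories behind the InformerFactory
// interface sketched above.
func NewInformerFactory(typed informers.SharedInformerFactory, dynamic dynamicinformer.DynamicSharedInformerFactory) InformerFactory {
    return &informerFactory{typed: typed, dynamic: dynamic}
}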

View File

@ -41,12 +41,14 @@ import (
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/controller"
)
type testRESTMapper struct {
@ -72,13 +74,15 @@ func TestGarbageCollectorConstruction(t *testing.T) {
{Group: "tpr.io", Version: "v1", Resource: "unknown"}: {},
}
client := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(client, 0)
dynamicInformers := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0)
// No monitor will be constructed for the non-core resource, but the GC
// construction will not fail.
alwaysStarted := make(chan struct{})
close(alwaysStarted)
gc, err := NewGarbageCollector(dynamicClient, rm, twoResources, map[schema.GroupResource]struct{}{}, sharedInformers, alwaysStarted)
gc, err := NewGarbageCollector(dynamicClient, rm, twoResources, map[schema.GroupResource]struct{}{},
controller.NewInformerFactory(sharedInformers, dynamicInformers), alwaysStarted)
if err != nil {
t.Fatal(err)
}
@ -429,36 +433,6 @@ func TestDependentsRace(t *testing.T) {
}()
}
// test that the list and watch functions correctly convert the ListOptions
func TestGCListWatcher(t *testing.T) {
testHandler := &fakeActionHandler{}
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
defer srv.Close()
podResource := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
dynamicClient, err := dynamic.NewForConfig(clientConfig)
if err != nil {
t.Fatal(err)
}
lw := listWatcher(dynamicClient, podResource)
lw.DisableChunking = true
if _, err := lw.Watch(metav1.ListOptions{ResourceVersion: "1"}); err != nil {
t.Fatal(err)
}
if _, err := lw.List(metav1.ListOptions{ResourceVersion: "1"}); err != nil {
t.Fatal(err)
}
if e, a := 2, len(testHandler.actions); e != a {
t.Errorf("expect %d requests, got %d", e, a)
}
if e, a := "resourceVersion=1&watch=true", testHandler.actions[0].query; e != a {
t.Errorf("expect %s, got %s", e, a)
}
if e, a := "resourceVersion=1", testHandler.actions[1].query; e != a {
t.Errorf("expect %s, got %s", e, a)
}
}
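
Note: TestGCListWatcher goes away together with the hand-rolled listWatcher helper it covered (removed from graph_builder.go below); the equivalent List/Watch plumbing now lives inside the dynamic shared informer factory. A hypothetical helper sketching the replacement (watchPods and its handler body are illustrative, not part of this commit):

package example

import (
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/dynamic/dynamicinformer"
    "k8s.io/client-go/tools/cache"
)

// watchPods shows what replaces the deleted listWatcher/cache.NewInformer
// pair: the factory builds the same List/Watch calls internally and shares
// one informer per GroupVersionResource.
func watchPods(dynamicClient dynamic.Interface, stopCh <-chan struct{}) cache.SharedIndexInformer {
    factory := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0)
    podGVR := schema.GroupVersionResource{Version: "v1", Resource: "pods"}

    informer := factory.ForResource(podGVR).Informer()
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) { /* e.g. enqueue a graph change */ },
    })

    factory.Start(stopCh) // starts every informer requested so far
    cache.WaitForCacheSync(stopCh, informer.HasSynced)
    return informer
}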
func podToGCNode(pod *v1.Pod) *node {
return &node{
identity: objectReference{

View File

@ -26,17 +26,14 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
)
@ -91,7 +88,6 @@ type GraphBuilder struct {
// it is protected by monitorLock.
running bool
dynamicClient dynamic.Interface
// monitors are the producer of the graphChanges queue, graphBuilder alters
// the in-memory graph according to the changes.
graphChanges workqueue.RateLimitingInterface
@ -104,7 +100,7 @@ type GraphBuilder struct {
// GraphBuilder and GC share the absentOwnerCache. Objects that are known to
// be non-existent are added to the cache.
absentOwnerCache *UIDCache
sharedInformers informers.SharedInformerFactory
sharedInformers controller.InformerFactory
ignoredResources map[schema.GroupResource]struct{}
}
@ -126,19 +122,6 @@ func (m *monitor) Run() {
type monitors map[schema.GroupVersionResource]*monitor
func listWatcher(client dynamic.Interface, resource schema.GroupVersionResource) *cache.ListWatch {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
// We want to list this resource in all namespaces if it's namespace scoped, so not passing namespace is ok.
return client.Resource(resource).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// We want to list this resource in all namespaces if it's namespace scoped, so not passing namespace is ok.
return client.Resource(resource).Watch(options)
},
}
}
func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
handlers := cache.ResourceEventHandlerFuncs{
// add the event to the dependencyGraphBuilder's graphChanges.
@ -175,24 +158,14 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind
},
}
shared, err := gb.sharedInformers.ForResource(resource)
if err == nil {
klog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
// need to clone because it's from a shared cache
shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
return shared.Informer().GetController(), shared.Informer().GetStore(), nil
if err != nil {
klog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
return nil, nil, err
}
klog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
// TODO: consider store in one storage.
klog.V(5).Infof("create storage for resource %s", resource)
store, monitor := cache.NewInformer(
listWatcher(gb.dynamicClient, resource),
nil,
ResourceResyncTime,
// don't need to clone because it's not from shared cache
handlers,
)
return monitor, store, nil
klog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
// need to clone because it's from a shared cache
shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
return shared.Informer().GetController(), shared.Informer().GetStore(), nil
}
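
Note: removed and added lines interleave in the hunk above, so the surviving tail of controllerFor is easier to read reassembled. A failed ForResource lookup is now a hard error returned to the caller rather than the trigger for the deleted cache.NewInformer fallback:

func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
    handlers := cache.ResourceEventHandlerFuncs{ /* enqueue graphChanges events, as above */ }
    shared, err := gb.sharedInformers.ForResource(resource)
    if err != nil {
        klog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
        return nil, nil, err
    }
    klog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
    // need to clone because it's from a shared cache
    shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
    return shared.Informer().GetController(), shared.Informer().GetStore(), nil
}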
// syncMonitors rebuilds the monitor set according to the supplied resources,

View File

@ -12,6 +12,7 @@ go_test(
tags = ["integration"],
deps = [
"//cmd/kube-apiserver/app/testing:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/garbagecollector:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1:go_default_library",
@ -28,6 +29,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library",
"//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/dynamic/dynamicinformer:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/restmapper:go_default_library",

View File

@ -38,11 +38,13 @@ import (
"k8s.io/apiserver/pkg/storage/names"
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/cache"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/framework"
@ -229,6 +231,7 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work
t.Fatalf("failed to create dynamicClient: %v", err)
}
sharedInformers := informers.NewSharedInformerFactory(clientSet, 0)
dynamicInformers := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0)
alwaysStarted := make(chan struct{})
close(alwaysStarted)
gc, err := garbagecollector.NewGarbageCollector(
@ -236,7 +239,7 @@ func setupWithServer(t *testing.T, result *kubeapiservertesting.TestServer, work
restMapper,
deletableResources,
garbagecollector.DefaultIgnoredResources(),
sharedInformers,
controller.NewInformerFactory(sharedInformers, dynamicInformers),
alwaysStarted,
)
if err != nil {
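
Note: the integration harness now builds the typed and dynamic factories side by side and hands their composition to NewGarbageCollector. A hypothetical run sequence for that wiring (runGC, workers, and the 30-second sync period are illustrative; Run and Sync are the collector's real entry points):

package gctest

import (
    "time"

    "k8s.io/client-go/dynamic/dynamicinformer"
    "k8s.io/client-go/informers"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/kubernetes/pkg/controller"
    "k8s.io/kubernetes/pkg/controller/garbagecollector"
)

// runGC starts the composed factory once; that single Start fans out to
// typed and dynamic informers alike.
func runGC(gc *garbagecollector.GarbageCollector, clientSet clientset.Interface,
    sharedInformers informers.SharedInformerFactory,
    dynamicInformers dynamicinformer.DynamicSharedInformerFactory,
    workers int, stopCh <-chan struct{}) {

    informerFactory := controller.NewInformerFactory(sharedInformers, dynamicInformers)
    informerFactory.Start(stopCh)

    go gc.Run(workers, stopCh)
    // Sync re-reads discovery periodically so the monitor set tracks
    // resource types added or removed after startup.
    go gc.Sync(clientSet.Discovery(), 30*time.Second, stopCh)
}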
@ -966,9 +969,24 @@ func TestCRDDeletionCascading(t *testing.T) {
ns := createNamespaceOrDie("crd-mixed", clientSet, t)
configMapClient := clientSet.CoreV1().ConfigMaps(ns.Name)
t.Logf("First pass CRD cascading deletion")
definition, resourceClient := createRandomCustomResourceDefinition(t, apiExtensionClient, dynamicClient, ns.Name)
testCRDDeletion(t, ctx, ns, definition, resourceClient)
t.Logf("Second pass CRD cascading deletion")
accessor := meta.NewAccessor()
accessor.SetResourceVersion(definition, "")
_, err := apiextensionstestserver.CreateNewCustomResourceDefinition(definition, apiExtensionClient, dynamicClient)
if err != nil {
t.Fatalf("failed to create CustomResourceDefinition: %v", err)
}
testCRDDeletion(t, ctx, ns, definition, resourceClient)
}
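
Note: the second pass re-creates the same CustomResourceDefinition object, which still carries the resourceVersion from its first life; the API server rejects creates whose resourceVersion is non-empty, hence the accessor reset above. The general pattern, as a sketch (resetForRecreate is a hypothetical name):

package example

import (
    "k8s.io/apimachinery/pkg/api/meta"
    "k8s.io/apimachinery/pkg/runtime"
)

// resetForRecreate clears the stale resourceVersion so a previously
// deleted object can be submitted to Create again.
func resetForRecreate(obj runtime.Object) error {
    return meta.NewAccessor().SetResourceVersion(obj, "")
}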
func testCRDDeletion(t *testing.T, ctx *testContext, ns *v1.Namespace, definition *apiextensionsv1beta1.CustomResourceDefinition, resourceClient dynamic.ResourceInterface) {
clientSet, apiExtensionClient := ctx.clientSet, ctx.apiExtensionClient
configMapClient := clientSet.CoreV1().ConfigMaps(ns.Name)
// Create a custom owner resource.
owner, err := resourceClient.Create(newCRDInstance(definition, ns.Name, names.SimpleNameGenerator.GenerateName("owner")), metav1.CreateOptions{})