From 3d6d57a18f12c575de77056be4316954a3e0ed20 Mon Sep 17 00:00:00 2001 From: Dan Mace Date: Tue, 8 Aug 2017 15:57:55 -0400 Subject: [PATCH] Improve GC discovery sync performance Improve GC discovery sync performance by only syncing when discovered resource diffs are detected. Before, the GC worker pool was shut down and monitors resynced unconditionally every sync period, leading to significant processing delays causing test flakes where otherwise reasonable GC timeouts were being exceeded. Related to https://github.com/kubernetes/kubernetes/issues/49966. --- cmd/kube-controller-manager/app/core.go | 2 +- .../garbagecollector/garbagecollector.go | 63 ++++++++++++++----- .../garbagecollector/garbagecollector_test.go | 10 ++- .../garbage_collector_test.go | 2 +- 4 files changed, 59 insertions(+), 18 deletions(-) diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 5dd170140a..98a803252f 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -339,7 +339,7 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) { // Periodically refresh the RESTMapper with new discovery information and sync // the garbage collector. - go garbageCollector.Sync(restMapper, discoveryClient, 30*time.Second, ctx.Stop) + go garbageCollector.Sync(gcClientset.Discovery(), 30*time.Second, ctx.Stop) return true, nil } diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index 8732078fa8..efffc5a91f 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -18,6 +18,7 @@ package garbagecollector import ( "fmt" + "reflect" "sync" "time" @@ -59,7 +60,7 @@ const ResourceResyncTime time.Duration = 0 // ensures that the garbage collector operates with a graph that is at least as // up to date as the notification is sent. 
type GarbageCollector struct { - restMapper meta.RESTMapper + restMapper resettableRESTMapper // clientPool uses the regular dynamicCodec. We need it to update // finalizers. It can be removed if we support patching finalizers. clientPool dynamic.ClientPool @@ -81,7 +82,7 @@ type GarbageCollector struct { func NewGarbageCollector( metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, - mapper meta.RESTMapper, + mapper resettableRESTMapper, deletableResources map[schema.GroupVersionResource]struct{}, ignoredResources map[schema.GroupResource]struct{}, sharedInformers informers.SharedInformerFactory, @@ -162,25 +163,59 @@ type resettableRESTMapper interface { Reset() } -// Sync periodically resyncs the garbage collector monitors with resources -// returned found via the discoveryClient. Sync blocks, continuing to sync until -// a message is received on stopCh. +// Sync periodically resyncs the garbage collector when new resources are +// observed from discovery. When new resources are detected, Sync will stop all +// GC workers, reset gc.restMapper, and resync the monitors. // -// The discoveryClient should be the same client which underlies restMapper. -func (gc *GarbageCollector) Sync(restMapper resettableRESTMapper, discoveryClient discovery.DiscoveryInterface, period time.Duration, stopCh <-chan struct{}) { +// Note that discoveryClient should NOT be shared with gc.restMapper, otherwise +// the mapper's underlying discovery client will be unnecessarily reset during +// the course of detecting new resources. +func (gc *GarbageCollector) Sync(discoveryClient discovery.DiscoveryInterface, period time.Duration, stopCh <-chan struct{}) { + oldResources := make(map[schema.GroupVersionResource]struct{}) wait.Until(func() { + // Get the current resource list from discovery. + newResources, err := GetDeletableResources(discoveryClient) + if err != nil { + utilruntime.HandleError(err) + return + } + + // Detect first or abnormal sync and try again later. 
+ if oldResources == nil || len(oldResources) == 0 { + oldResources = newResources + return + } + + // Decide whether discovery has reported a change. + if reflect.DeepEqual(oldResources, newResources) { + glog.V(5).Infof("no resource updates from discovery, skipping garbage collector sync") + return + } + + // Something has changed, so track the new state and perform a sync. + glog.V(2).Infof("syncing garbage collector with updated resources from discovery: %v", newResources) + oldResources = newResources + // Ensure workers are paused to avoid processing events before informers // have resynced. gc.workerLock.Lock() defer gc.workerLock.Unlock() - restMapper.Reset() - deletableResources, err := GetDeletableResources(discoveryClient) - if err != nil { - utilruntime.HandleError(err) - return - } - if err := gc.resyncMonitors(deletableResources); err != nil { + // Resetting the REST mapper will also invalidate the underlying discovery // client. This is a leaky abstraction and assumes behavior about the REST // mapper, but we'll deal with it for now. + gc.restMapper.Reset() + + // Perform the monitor resync and wait for controllers to report cache sync. + // + // NOTE: It's possible that newResources will diverge from the resources + // discovered by restMapper during the call to Reset, since they are + // distinct discovery clients invalidated at different times. For example, + // newResources may contain resources not returned in the restMapper's + // discovery call if the resources appeared in between the calls. In that + // case, the restMapper will fail to map some of newResources until the next + // sync period. 
+ if err := gc.resyncMonitors(newResources); err != nil { utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err)) return } diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index 37fcd20069..4ba3c90c8c 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -46,11 +46,17 @@ import ( "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" ) +type testRESTMapper struct { + meta.RESTMapper +} + +func (_ *testRESTMapper) Reset() {} + func TestGarbageCollectorConstruction(t *testing.T) { config := &restclient.Config{} config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()} tweakableRM := meta.NewDefaultRESTMapper(nil, nil) - rm := meta.MultiRESTMapper{tweakableRM, api.Registry.RESTMapper()} + rm := &testRESTMapper{meta.MultiRESTMapper{tweakableRM, api.Registry.RESTMapper()}} metaOnlyClientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc) config.ContentConfig.NegotiatedSerializer = nil clientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc) @@ -168,7 +174,7 @@ func setupGC(t *testing.T, config *restclient.Config) garbageCollector { podResource := map[schema.GroupVersionResource]struct{}{{Version: "v1", Resource: "pods"}: {}} client := fake.NewSimpleClientset() sharedInformers := informers.NewSharedInformerFactory(client, 0) - gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource, ignoredResources, sharedInformers) + gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, &testRESTMapper{api.Registry.RESTMapper()}, podResource, ignoredResources, sharedInformers) if err != nil { t.Fatal(err) } diff --git a/test/integration/garbagecollector/garbage_collector_test.go 
b/test/integration/garbagecollector/garbage_collector_test.go index ee3f838969..c0c7c2521b 100644 --- a/test/integration/garbagecollector/garbage_collector_test.go +++ b/test/integration/garbagecollector/garbage_collector_test.go @@ -265,7 +265,7 @@ func setup(t *testing.T, workerCount int) *testContext { syncPeriod := 5 * time.Second startGC := func(workers int) { go gc.Run(workers, stopCh) - go gc.Sync(restMapper, discoveryClient, syncPeriod, stopCh) + go gc.Sync(clientSet.Discovery(), syncPeriod, stopCh) } if workerCount > 0 {