diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 5dd170140a..98a803252f 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -339,7 +339,7 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) { // Periodically refresh the RESTMapper with new discovery information and sync // the garbage collector. - go garbageCollector.Sync(restMapper, discoveryClient, 30*time.Second, ctx.Stop) + go garbageCollector.Sync(gcClientset.Discovery(), 30*time.Second, ctx.Stop) return true, nil } diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index 8732078fa8..efffc5a91f 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -18,6 +18,7 @@ package garbagecollector import ( "fmt" + "reflect" "sync" "time" @@ -59,7 +60,7 @@ const ResourceResyncTime time.Duration = 0 // ensures that the garbage collector operates with a graph that is at least as // up to date as the notification is sent. type GarbageCollector struct { - restMapper meta.RESTMapper + restMapper resettableRESTMapper // clientPool uses the regular dynamicCodec. We need it to update // finalizers. It can be removed if we support patching finalizers. clientPool dynamic.ClientPool @@ -81,7 +82,7 @@ type GarbageCollector struct { func NewGarbageCollector( metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, - mapper meta.RESTMapper, + mapper resettableRESTMapper, deletableResources map[schema.GroupVersionResource]struct{}, ignoredResources map[schema.GroupResource]struct{}, sharedInformers informers.SharedInformerFactory, @@ -162,25 +163,59 @@ type resettableRESTMapper interface { Reset() } -// Sync periodically resyncs the garbage collector monitors with resources -// returned found via the discoveryClient. 
Sync blocks, continuing to sync until -// a message is received on stopCh. +// Sync periodically resyncs the garbage collector when new resources are +// observed from discovery. When new resources are detected, Sync will stop all +// GC workers, reset gc.restMapper, and resync the monitors. // -// The discoveryClient should be the same client which underlies restMapper. -func (gc *GarbageCollector) Sync(restMapper resettableRESTMapper, discoveryClient discovery.DiscoveryInterface, period time.Duration, stopCh <-chan struct{}) { +// Note that discoveryClient should NOT be shared with gc.restMapper, otherwise +// the mapper's underlying discovery client will be unnecessarily reset during +// the course of detecting new resources. +func (gc *GarbageCollector) Sync(discoveryClient discovery.DiscoveryInterface, period time.Duration, stopCh <-chan struct{}) { + oldResources := make(map[schema.GroupVersionResource]struct{}) wait.Until(func() { + // Get the current resource list from discovery. + newResources, err := GetDeletableResources(discoveryClient) + if err != nil { + utilruntime.HandleError(err) + return + } + + // Detect first or abnormal sync and try again later. + if oldResources == nil || len(oldResources) == 0 { + oldResources = newResources + return + } + + // Decide whether discovery has reported a change. + if reflect.DeepEqual(oldResources, newResources) { + glog.V(5).Infof("no resource updates from discovery, skipping garbage collector sync") + return + } + + // Something has changed, so track the new state and perform a sync. + glog.V(2).Infof("syncing garbage collector with updated resources from discovery: %v", newResources) + oldResources = newResources + // Ensure workers are paused to avoid processing events before informers // have resynced. 
gc.workerLock.Lock() defer gc.workerLock.Unlock() - restMapper.Reset() - deletableResources, err := GetDeletableResources(discoveryClient) - if err != nil { - utilruntime.HandleError(err) - return - } - if err := gc.resyncMonitors(deletableResources); err != nil { + // Resetting the REST mapper will also invalidate the underlying discovery + // client. This is a leaky abstraction and assumes behavior about the REST + // mapper, but we'll deal with it for now. + gc.restMapper.Reset() + + // Perform the monitor resync and wait for controllers to report cache sync. + // + // NOTE: It's possible that newResources will diverge from the resources + // discovered by restMapper during the call to Reset, since they are + // distinct discovery clients invalidated at different times. For example, + // newResources may contain resources not returned in the restMapper's + // discovery call if the resources appeared in between the calls. In that + // case, the restMapper will fail to map some of newResources until the next + // sync period. 
+ if err := gc.resyncMonitors(newResources); err != nil { utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err)) return } diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index 37fcd20069..4ba3c90c8c 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -46,11 +46,17 @@ import ( "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" ) +type testRESTMapper struct { + meta.RESTMapper +} + +func (_ *testRESTMapper) Reset() {} + func TestGarbageCollectorConstruction(t *testing.T) { config := &restclient.Config{} config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()} tweakableRM := meta.NewDefaultRESTMapper(nil, nil) - rm := meta.MultiRESTMapper{tweakableRM, api.Registry.RESTMapper()} + rm := &testRESTMapper{meta.MultiRESTMapper{tweakableRM, api.Registry.RESTMapper()}} metaOnlyClientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc) config.ContentConfig.NegotiatedSerializer = nil clientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc) @@ -168,7 +174,7 @@ func setupGC(t *testing.T, config *restclient.Config) garbageCollector { podResource := map[schema.GroupVersionResource]struct{}{{Version: "v1", Resource: "pods"}: {}} client := fake.NewSimpleClientset() sharedInformers := informers.NewSharedInformerFactory(client, 0) - gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource, ignoredResources, sharedInformers) + gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, &testRESTMapper{api.Registry.RESTMapper()}, podResource, ignoredResources, sharedInformers) if err != nil { t.Fatal(err) } diff --git a/test/integration/garbagecollector/garbage_collector_test.go 
b/test/integration/garbagecollector/garbage_collector_test.go index ee3f838969..c0c7c2521b 100644 --- a/test/integration/garbagecollector/garbage_collector_test.go +++ b/test/integration/garbagecollector/garbage_collector_test.go @@ -265,7 +265,7 @@ func setup(t *testing.T, workerCount int) *testContext { syncPeriod := 5 * time.Second startGC := func(workers int) { go gc.Run(workers, stopCh) - go gc.Sync(restMapper, discoveryClient, syncPeriod, stopCh) + go gc.Sync(clientSet.Discovery(), syncPeriod, stopCh) } if workerCount > 0 {