mirror of https://github.com/k3s-io/k3s
Improve GC discovery sync performance
Improve GC discovery sync performance by only syncing when discovered resource diffs are detected. Before, the GC worker pool was shut down and monitors resynced unconditionally every sync period, leading to significant processing delays causing test flakes where otherwise reasonable GC timeouts were being exceeded. Related to https://github.com/kubernetes/kubernetes/issues/49966.pull/6/head
parent
f6cb2fce00
commit
3d6d57a18f
|
@ -339,7 +339,7 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
|
|||
|
||||
// Periodically refresh the RESTMapper with new discovery information and sync
|
||||
// the garbage collector.
|
||||
go garbageCollector.Sync(restMapper, discoveryClient, 30*time.Second, ctx.Stop)
|
||||
go garbageCollector.Sync(gcClientset.Discovery(), 30*time.Second, ctx.Stop)
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package garbagecollector
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -59,7 +60,7 @@ const ResourceResyncTime time.Duration = 0
|
|||
// ensures that the garbage collector operates with a graph that is at least as
|
||||
// up to date as the notification is sent.
|
||||
type GarbageCollector struct {
|
||||
restMapper meta.RESTMapper
|
||||
restMapper resettableRESTMapper
|
||||
// clientPool uses the regular dynamicCodec. We need it to update
|
||||
// finalizers. It can be removed if we support patching finalizers.
|
||||
clientPool dynamic.ClientPool
|
||||
|
@ -81,7 +82,7 @@ type GarbageCollector struct {
|
|||
func NewGarbageCollector(
|
||||
metaOnlyClientPool dynamic.ClientPool,
|
||||
clientPool dynamic.ClientPool,
|
||||
mapper meta.RESTMapper,
|
||||
mapper resettableRESTMapper,
|
||||
deletableResources map[schema.GroupVersionResource]struct{},
|
||||
ignoredResources map[schema.GroupResource]struct{},
|
||||
sharedInformers informers.SharedInformerFactory,
|
||||
|
@ -162,25 +163,59 @@ type resettableRESTMapper interface {
|
|||
Reset()
|
||||
}
|
||||
|
||||
// Sync periodically resyncs the garbage collector monitors with resources
|
||||
// returned found via the discoveryClient. Sync blocks, continuing to sync until
|
||||
// a message is received on stopCh.
|
||||
// Sync periodically resyncs the garbage collector when new resources are
|
||||
// observed from discovery. When new resources are detected, Sync will stop all
|
||||
// GC workers, reset gc.restMapper, and resync the monitors.
|
||||
//
|
||||
// The discoveryClient should be the same client which underlies restMapper.
|
||||
func (gc *GarbageCollector) Sync(restMapper resettableRESTMapper, discoveryClient discovery.DiscoveryInterface, period time.Duration, stopCh <-chan struct{}) {
|
||||
// Note that discoveryClient should NOT be shared with gc.restMapper, otherwise
|
||||
// the mapper's underlying discovery client will be unnecessarily reset during
|
||||
// the course of detecting new resources.
|
||||
func (gc *GarbageCollector) Sync(discoveryClient discovery.DiscoveryInterface, period time.Duration, stopCh <-chan struct{}) {
|
||||
oldResources := make(map[schema.GroupVersionResource]struct{})
|
||||
wait.Until(func() {
|
||||
// Get the current resource list from discovery.
|
||||
newResources, err := GetDeletableResources(discoveryClient)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return
|
||||
}
|
||||
|
||||
// Detect first or abnormal sync and try again later.
|
||||
if oldResources == nil || len(oldResources) == 0 {
|
||||
oldResources = newResources
|
||||
return
|
||||
}
|
||||
|
||||
// Decide whether discovery has reported a change.
|
||||
if reflect.DeepEqual(oldResources, newResources) {
|
||||
glog.V(5).Infof("no resource updates from discovery, skipping garbage collector sync")
|
||||
return
|
||||
}
|
||||
|
||||
// Something has changed, so track the new state and perform a sync.
|
||||
glog.V(2).Infof("syncing garbage collector with updated resources from discovery: %v", newResources)
|
||||
oldResources = newResources
|
||||
|
||||
// Ensure workers are paused to avoid processing events before informers
|
||||
// have resynced.
|
||||
gc.workerLock.Lock()
|
||||
defer gc.workerLock.Unlock()
|
||||
|
||||
restMapper.Reset()
|
||||
deletableResources, err := GetDeletableResources(discoveryClient)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return
|
||||
}
|
||||
if err := gc.resyncMonitors(deletableResources); err != nil {
|
||||
// Resetting the REST mapper will also invalidate the underlying discovery
|
||||
// client. This is a leaky abstraction and assumes behavior about the REST
|
||||
// mapper, but we'll deal with it for now.
|
||||
gc.restMapper.Reset()
|
||||
|
||||
// Perform the monitor resync and wait for controllers to report cache sync.
|
||||
//
|
||||
// NOTE: It's possible that newResources will diverge from the resources
|
||||
// discovered by restMapper during the call to Reset, since they are
|
||||
// distinct discovery clients invalidated at different times. For example,
|
||||
// newResources may contain resources not returned in the restMapper's
|
||||
// discovery call if the resources appeared inbetween the calls. In that
|
||||
// case, the restMapper will fail to map some of newResources until the next
|
||||
// sync period.
|
||||
if err := gc.resyncMonitors(newResources); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
|
||||
return
|
||||
}
|
||||
|
|
|
@ -46,11 +46,17 @@ import (
|
|||
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
|
||||
)
|
||||
|
||||
type testRESTMapper struct {
|
||||
meta.RESTMapper
|
||||
}
|
||||
|
||||
func (_ *testRESTMapper) Reset() {}
|
||||
|
||||
func TestGarbageCollectorConstruction(t *testing.T) {
|
||||
config := &restclient.Config{}
|
||||
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
|
||||
tweakableRM := meta.NewDefaultRESTMapper(nil, nil)
|
||||
rm := meta.MultiRESTMapper{tweakableRM, api.Registry.RESTMapper()}
|
||||
rm := &testRESTMapper{meta.MultiRESTMapper{tweakableRM, api.Registry.RESTMapper()}}
|
||||
metaOnlyClientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc)
|
||||
config.ContentConfig.NegotiatedSerializer = nil
|
||||
clientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc)
|
||||
|
@ -168,7 +174,7 @@ func setupGC(t *testing.T, config *restclient.Config) garbageCollector {
|
|||
podResource := map[schema.GroupVersionResource]struct{}{{Version: "v1", Resource: "pods"}: {}}
|
||||
client := fake.NewSimpleClientset()
|
||||
sharedInformers := informers.NewSharedInformerFactory(client, 0)
|
||||
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource, ignoredResources, sharedInformers)
|
||||
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, &testRESTMapper{api.Registry.RESTMapper()}, podResource, ignoredResources, sharedInformers)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -265,7 +265,7 @@ func setup(t *testing.T, workerCount int) *testContext {
|
|||
syncPeriod := 5 * time.Second
|
||||
startGC := func(workers int) {
|
||||
go gc.Run(workers, stopCh)
|
||||
go gc.Sync(restMapper, discoveryClient, syncPeriod, stopCh)
|
||||
go gc.Sync(clientSet.Discovery(), syncPeriod, stopCh)
|
||||
}
|
||||
|
||||
if workerCount > 0 {
|
||||
|
|
Loading…
Reference in New Issue