update GC controller to wait until controllers have been initialized once

pull/6/head
David Eads 2017-08-24 12:39:55 -04:00
parent 4a6bbb9f50
commit 253b047d89
6 changed files with 30 additions and 3 deletions

View File

@@ -157,6 +157,7 @@ func Run(s *options.CMServer) error {
}
ctx.InformerFactory.Start(ctx.Stop)
close(ctx.InformersStarted)
select {}
}
@@ -264,6 +265,10 @@ type ControllerContext struct {
// Stop is the stop channel
Stop <-chan struct{}
// InformersStarted is closed after all of the controllers have been initialized and are running. After this point it is safe
// for an individual controller to start the shared informers. Before it is closed, they should not.
InformersStarted chan struct{}
}
func (c ControllerContext) IsControllerEnabled(name string) bool {
@@ -443,6 +448,7 @@ func CreateControllerContext(s *options.CMServer, rootClientBuilder, clientBuild
AvailableResources: availableResources,
Cloud: cloud,
Stop: stop,
InformersStarted: make(chan struct{}),
}
return ctx, nil
}
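For orientation, a minimal sketch (plain Go, not the real controller-manager code) of the ordering this file now enforces: construct every controller first, start the shared informer factory, and only then close InformersStarted so that gated consumers such as the GC's graph builder may proceed; the select {} in Run keeps the process alive. The controllerContext type below is an illustrative stand-in.

package main

import (
	"fmt"
	"time"
)

// controllerContext is a cut-down, illustrative stand-in for the real
// ControllerContext; only the field this commit adds is kept.
type controllerContext struct {
	InformersStarted chan struct{}
}

func main() {
	ctx := controllerContext{InformersStarted: make(chan struct{})}

	// A consumer (think: the GC's graph builder) that must not start its
	// shared informers until every controller has been constructed.
	go func() {
		<-ctx.InformersStarted
		fmt.Println("safe to start shared informers now")
	}()

	// 1. Construct all controllers (omitted here).
	// 2. Start the shared informer factory (stubbed here).
	fmt.Println("informer factory started")
	// 3. Signal that individual controllers may now start their informers.
	close(ctx.InformersStarted)

	time.Sleep(100 * time.Millisecond) // stand-in for the blocking select {} in Run
}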

View File

@@ -330,6 +330,7 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
deletableResources,
ignoredResources,
ctx.InformerFactory,
ctx.InformersStarted,
)
if err != nil {
return true, fmt.Errorf("Failed to start the generic garbage collector: %v", err)

View File

@@ -86,6 +86,7 @@ func NewGarbageCollector(
deletableResources map[schema.GroupVersionResource]struct{},
ignoredResources map[schema.GroupResource]struct{},
sharedInformers informers.SharedInformerFactory,
informersStarted <-chan struct{},
) (*GarbageCollector, error) {
attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete")
attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan")
@@ -100,6 +101,7 @@ func NewGarbageCollector(
}
gb := &GraphBuilder{
metaOnlyClientPool: metaOnlyClientPool,
informersStarted: informersStarted,
registeredRateLimiterForControllers: NewRegisteredRateLimiter(deletableResources),
restMapper: mapper,
graphChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"),
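A hedged sketch of what the constructor change amounts to, using illustrative names (graphBuilder, newGarbageCollector) rather than the real signatures: the caller supplies informersStarted, the collector threads it into its graph builder, and the receive-only channel type records that the garbage collector may only wait on the channel, never close it.

package gcsketch

// graphBuilder is an illustrative stand-in for the real GraphBuilder, reduced
// to the single field this commit introduces.
type graphBuilder struct {
	informersStarted <-chan struct{}
}

// newGarbageCollector mirrors the shape (not the real signature) of the change:
// accept informersStarted from the caller and store it on the graph builder.
// The receive-only direction means this code can only wait on the channel;
// closing it remains the controller manager's job.
func newGarbageCollector(informersStarted <-chan struct{}) *graphBuilder {
	return &graphBuilder{informersStarted: informersStarted}
}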

View File

@@ -72,7 +72,9 @@ func TestGarbageCollectorConstruction(t *testing.T) {
// No monitor will be constructed for the non-core resource, but the GC
// construction will not fail.
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, rm, twoResources, map[schema.GroupResource]struct{}{}, sharedInformers)
alwaysStarted := make(chan struct{})
close(alwaysStarted)
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, rm, twoResources, map[schema.GroupResource]struct{}{}, sharedInformers, alwaysStarted)
if err != nil {
t.Fatal(err)
}
@@ -174,7 +176,9 @@ func setupGC(t *testing.T, config *restclient.Config) garbageCollector {
podResource := map[schema.GroupVersionResource]struct{}{{Version: "v1", Resource: "pods"}: {}}
client := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(client, 0)
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, &testRESTMapper{api.Registry.RESTMapper()}, podResource, ignoredResources, sharedInformers)
alwaysStarted := make(chan struct{})
close(alwaysStarted)
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, &testRESTMapper{api.Registry.RESTMapper()}, podResource, ignoredResources, sharedInformers, alwaysStarted)
if err != nil {
t.Fatal(err)
}
@@ -358,9 +362,12 @@ func TestProcessEvent(t *testing.T) {
},
}
alwaysStarted := make(chan struct{})
close(alwaysStarted)
for _, scenario := range testScenarios {
dependencyGraphBuilder := &GraphBuilder{
graphChanges: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
informersStarted: alwaysStarted,
graphChanges: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
uidToNode: &concurrentUIDToNode{
uidToNodeLock: sync.RWMutex{},
uidToNode: make(map[types.UID]*node),
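Both this unit test and the integration test further down rely on a basic Go property: a receive from a closed channel returns immediately, so closing alwaysStarted up front turns the new gate into a no-op. A tiny standalone illustration (not part of the patch):

package main

import "fmt"

func main() {
	alwaysStarted := make(chan struct{})
	close(alwaysStarted)

	// A receive from a closed channel never blocks, so anything gated on this
	// channel proceeds immediately -- exactly what the tests need.
	<-alwaysStarted
	fmt.Println("did not block")
}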

View File

@@ -78,6 +78,10 @@ type GraphBuilder struct {
// dependencyGraphBuilder
monitors monitors
monitorLock sync.Mutex
// informersStarted is closed after all of the controllers have been initialized and are running.
// After that it is safe to start the monitors here; before that it is not.
informersStarted <-chan struct{}
// stopCh drives shutdown. If it is nil, it indicates that Run() has not been
// called yet. If it is non-nil, then when closed it indicates everything
// should shut down.
@@ -279,6 +283,10 @@ func (gb *GraphBuilder) startMonitors() {
return
}
// Wait until the shared informers have been started, which happens once all of the controllers are initialized. This ensures
// that the controllers don't get unexpected events on their work queues.
<-gb.informersStarted
monitors := gb.monitors
started := 0
for _, monitor := range monitors {
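The consumer side of the gate is just the blocking receive added at the top of startMonitors. A simplified, runnable sketch of the pattern, with illustrative names in place of the real GraphBuilder:

package main

import (
	"fmt"
	"time"
)

// startMonitors stands in for GraphBuilder.startMonitors: block until the
// controller manager signals that every controller has been initialized,
// then start the monitors (reduced here to a print statement).
func startMonitors(informersStarted <-chan struct{}) {
	<-informersStarted // released by close(ctx.InformersStarted) in the controller manager
	fmt.Println("starting monitors")
}

func main() {
	informersStarted := make(chan struct{})
	go startMonitors(informersStarted)

	// Simulate the controller manager finishing controller initialization.
	time.Sleep(50 * time.Millisecond)
	close(informersStarted)
	time.Sleep(50 * time.Millisecond) // give the goroutine time to run
}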

View File

@@ -242,6 +242,8 @@ func setup(t *testing.T, workerCount int) *testContext {
metaOnlyClientPool := dynamic.NewClientPool(&config, restMapper, dynamic.LegacyAPIPathResolverFunc)
clientPool := dynamic.NewClientPool(&config, restMapper, dynamic.LegacyAPIPathResolverFunc)
sharedInformers := informers.NewSharedInformerFactory(clientSet, 0)
alwaysStarted := make(chan struct{})
close(alwaysStarted)
gc, err := garbagecollector.NewGarbageCollector(
metaOnlyClientPool,
clientPool,
@@ -249,6 +251,7 @@
deletableResources,
garbagecollector.DefaultIgnoredResources(),
sharedInformers,
alwaysStarted,
)
if err != nil {
t.Fatalf("failed to create garbage collector: %v", err)