diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 6560eec44f..0221f58ab9 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -157,6 +157,7 @@ func Run(s *options.CMServer) error {
 	}
 
 	ctx.InformerFactory.Start(ctx.Stop)
+	close(ctx.InformersStarted)
 	select {}
 }
 
@@ -264,6 +265,10 @@ type ControllerContext struct {
 
 	// Stop is the stop channel
 	Stop <-chan struct{}
+
+	// InformersStarted is closed after all of the controllers have been initialized and are running. After this point it is safe
+	// for an individual controller to start the shared informers. Before it is closed, they should not.
+	InformersStarted chan struct{}
 }
 
 func (c ControllerContext) IsControllerEnabled(name string) bool {
@@ -443,6 +448,7 @@ func CreateControllerContext(s *options.CMServer, rootClientBuilder, clientBuild
 		AvailableResources: availableResources,
 		Cloud:              cloud,
 		Stop:               stop,
+		InformersStarted:   make(chan struct{}),
 	}
 	return ctx, nil
 }
diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go
index 46c9309f51..78925fbc67 100644
--- a/cmd/kube-controller-manager/app/core.go
+++ b/cmd/kube-controller-manager/app/core.go
@@ -330,6 +330,7 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
 		deletableResources,
 		ignoredResources,
 		ctx.InformerFactory,
+		ctx.InformersStarted,
 	)
 	if err != nil {
 		return true, fmt.Errorf("Failed to start the generic garbage collector: %v", err)
diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go
index efffc5a91f..a430e5f964 100644
--- a/pkg/controller/garbagecollector/garbagecollector.go
+++ b/pkg/controller/garbagecollector/garbagecollector.go
@@ -86,6 +86,7 @@ func NewGarbageCollector(
 	deletableResources map[schema.GroupVersionResource]struct{},
 	ignoredResources map[schema.GroupResource]struct{},
 	sharedInformers informers.SharedInformerFactory,
+	informersStarted <-chan struct{},
 ) (*GarbageCollector, error) {
 	attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete")
 	attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan")
@@ -100,6 +101,7 @@ func NewGarbageCollector(
 	}
 	gb := &GraphBuilder{
 		metaOnlyClientPool:                  metaOnlyClientPool,
+		informersStarted:                    informersStarted,
 		registeredRateLimiterForControllers: NewRegisteredRateLimiter(deletableResources),
 		restMapper:                          mapper,
 		graphChanges:                        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"),
diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go
index 4ba3c90c8c..fa4d53a8d0 100644
--- a/pkg/controller/garbagecollector/garbagecollector_test.go
+++ b/pkg/controller/garbagecollector/garbagecollector_test.go
@@ -72,7 +72,9 @@ func TestGarbageCollectorConstruction(t *testing.T) {
 	// No monitor will be constructed for the non-core resource, but the GC
 	// construction will not fail.
-	gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, rm, twoResources, map[schema.GroupResource]struct{}{}, sharedInformers)
+	alwaysStarted := make(chan struct{})
+	close(alwaysStarted)
+	gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, rm, twoResources, map[schema.GroupResource]struct{}{}, sharedInformers, alwaysStarted)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -174,7 +176,9 @@ func setupGC(t *testing.T, config *restclient.Config) garbageCollector {
 	podResource := map[schema.GroupVersionResource]struct{}{{Version: "v1", Resource: "pods"}: {}}
 	client := fake.NewSimpleClientset()
 	sharedInformers := informers.NewSharedInformerFactory(client, 0)
-	gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, &testRESTMapper{api.Registry.RESTMapper()}, podResource, ignoredResources, sharedInformers)
+	alwaysStarted := make(chan struct{})
+	close(alwaysStarted)
+	gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, &testRESTMapper{api.Registry.RESTMapper()}, podResource, ignoredResources, sharedInformers, alwaysStarted)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -358,9 +362,12 @@ func TestProcessEvent(t *testing.T) {
 		},
 	}
 
+	alwaysStarted := make(chan struct{})
+	close(alwaysStarted)
 	for _, scenario := range testScenarios {
 		dependencyGraphBuilder := &GraphBuilder{
-			graphChanges: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+			informersStarted: alwaysStarted,
+			graphChanges:     workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 			uidToNode: &concurrentUIDToNode{
 				uidToNodeLock: sync.RWMutex{},
 				uidToNode:     make(map[types.UID]*node),
diff --git a/pkg/controller/garbagecollector/graph_builder.go b/pkg/controller/garbagecollector/graph_builder.go
index babe81bf72..2cc7cb4daf 100644
--- a/pkg/controller/garbagecollector/graph_builder.go
+++ b/pkg/controller/garbagecollector/graph_builder.go
@@ -78,6 +78,10 @@ type GraphBuilder struct {
 	// dependencyGraphBuilder
 	monitors    monitors
 	monitorLock sync.Mutex
+	// informersStarted is closed after all of the controllers have been initialized and are running.
+	// After that it is safe to start the monitors here; before that it is not.
+	informersStarted <-chan struct{}
+
 	// stopCh drives shutdown. If it is nil, it indicates that Run() has not been
 	// called yet. If it is non-nil, then when closed it indicates everything
 	// should shut down.
@@ -279,6 +283,10 @@ func (gb *GraphBuilder) startMonitors() {
 		return
 	}
 
+	// Wait until the informers have been started, which happens once all of the controllers are initialized.
+	// This ensures that the controllers don't get unexpected events on their work queues.
+	<-gb.informersStarted
+
 	monitors := gb.monitors
 	started := 0
 	for _, monitor := range monitors {
diff --git a/test/integration/garbagecollector/garbage_collector_test.go b/test/integration/garbagecollector/garbage_collector_test.go
index e54b8170a9..5bd613f5e3 100644
--- a/test/integration/garbagecollector/garbage_collector_test.go
+++ b/test/integration/garbagecollector/garbage_collector_test.go
@@ -242,6 +242,8 @@ func setup(t *testing.T, workerCount int) *testContext {
 	metaOnlyClientPool := dynamic.NewClientPool(&config, restMapper, dynamic.LegacyAPIPathResolverFunc)
 	clientPool := dynamic.NewClientPool(&config, restMapper, dynamic.LegacyAPIPathResolverFunc)
 	sharedInformers := informers.NewSharedInformerFactory(clientSet, 0)
+	alwaysStarted := make(chan struct{})
+	close(alwaysStarted)
 	gc, err := garbagecollector.NewGarbageCollector(
 		metaOnlyClientPool,
 		clientPool,
@@ -249,6 +251,7 @@ func setup(t *testing.T, workerCount int) *testContext {
 		deletableResources,
 		garbagecollector.DefaultIgnoredResources(),
 		sharedInformers,
+		alwaysStarted,
 	)
 	if err != nil {
 		t.Fatalf("failed to create garbage collector: %v", err)
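
For context on why the tests can simply create and immediately close alwaysStarted: the patch relies on the standard Go property that a receive from a closed channel never blocks, so a single close() acts as a one-time broadcast to any number of waiters. Below is a minimal, self-contained sketch of that signalling pattern; the names are illustrative only, not code from the patch.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	// Mirrors ControllerContext.InformersStarted: nothing is ever sent
	// on this channel; it is only ever closed.
	informersStarted := make(chan struct{})

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-informersStarted // blocks until the channel is closed
			fmt.Printf("monitor %d running\n", id)
		}(i)
	}

	time.Sleep(10 * time.Millisecond) // stand-in for controller initialization
	close(informersStarted)           // releases every waiting goroutine at once
	wg.Wait()
}

This is why <-gb.informersStarted in startMonitors is safe in the tests: closing alwaysStarted before constructing the GraphBuilder makes the receive return immediately, while in the real controller manager the receive parks until Run calls close(ctx.InformersStarted) after all controllers are initialized.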