Merge pull request #41652 from ncdc/shared-informers-13-namespace

Automatic merge from submit-queue (batch tested with PRs 39855, 41433, 41567, 41887, 41652)

Switch namespace controller to shared informer

@smarterclayton @derekwaynecarr @gmarek @wojtek-t @deads2k @sttts @liggitt @kubernetes/sig-scalability-pr-reviews
pull/6/head
Kubernetes Submit Queue 2017-02-23 09:36:38 -08:00 committed by GitHub
commit bfdeaf302c
5 changed files with 52 additions and 33 deletions

View File

@ -127,7 +127,14 @@ func startNamespaceController(ctx ControllerContext) (bool, error) {
return snapshot, nil
}
}
namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, discoverResourcesFn, ctx.Options.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes)
namespaceController := namespacecontroller.NewNamespaceController(
namespaceKubeClient,
namespaceClientPool,
discoverResourcesFn,
ctx.NewInformerFactory.Core().V1().Namespaces(),
ctx.Options.NamespaceSyncPeriod.Duration,
v1.FinalizerKubernetes,
)
go namespaceController.Run(int(ctx.Options.ConcurrentNamespaceSyncs), ctx.Stop)
return true, nil

View File

@ -17,15 +17,16 @@ go_library(
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/namespace/deletion:go_default_library",
"//pkg/util/metrics:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/dynamic",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/util/workqueue",

View File

@ -17,18 +17,20 @@ limitations under the License.
package namespace
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/namespace/deletion"
"k8s.io/kubernetes/pkg/util/metrics"
@ -52,10 +54,10 @@ type NamespaceController struct {
kubeClient clientset.Interface
// clientPool manages a pool of dynamic clients
clientPool dynamic.ClientPool
// store that holds the namespaces
store cache.Store
// controller that observes the namespaces
controller cache.Controller
// lister that can list namespaces from a shared cache
lister corelisters.NamespaceLister
// returns true when the namespace cache is ready
listerSynced cache.InformerSynced
// namespaces that have been queued up for processing by workers
queue workqueue.RateLimitingInterface
// function to list of preferred resources for namespace deletion
@ -71,6 +73,7 @@ func NewNamespaceController(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
discoverResourcesFn func() ([]*metav1.APIResourceList, error),
namespaceInformer coreinformers.NamespaceInformer,
resyncPeriod time.Duration,
finalizerToken v1.FinalizerName) *NamespaceController {
@ -88,18 +91,8 @@ func NewNamespaceController(
metrics.RegisterMetricAndTrackRateLimiterUsage("namespace_controller", kubeClient.Core().RESTClient().GetRateLimiter())
}
// configure the backing store/controller
store, controller := cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return kubeClient.Core().Namespaces().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return kubeClient.Core().Namespaces().Watch(options)
},
},
&v1.Namespace{},
resyncPeriod,
// configure the namespace informer event handlers
namespaceInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
namespace := obj.(*v1.Namespace)
@ -110,10 +103,11 @@ func NewNamespaceController(
namespaceController.enqueueNamespace(namespace)
},
},
resyncPeriod,
)
namespaceController.lister = namespaceInformer.Lister()
namespaceController.listerSynced = namespaceInformer.Informer().HasSynced
namespaceController.store = store
namespaceController.controller = controller
return namespaceController
}
@ -122,7 +116,7 @@ func NewNamespaceController(
func (nm *NamespaceController) enqueueNamespace(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
// delay processing namespace events to allow HA api servers to observe namespace deletion,
@ -175,28 +169,35 @@ func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) {
startTime := time.Now()
defer glog.V(4).Infof("Finished syncing namespace %q (%v)", key, time.Now().Sub(startTime))
obj, exists, err := nm.store.GetByKey(key)
if !exists {
namespace, err := nm.lister.Get(key)
if errors.IsNotFound(err) {
glog.Infof("Namespace has been deleted %v", key)
return nil
}
if err != nil {
glog.Errorf("Unable to retrieve namespace %v from store: %v", key, err)
nm.queue.Add(key)
utilruntime.HandleError(fmt.Errorf("Unable to retrieve namespace %v from store: %v", key, err))
return err
}
namespace := obj.(*v1.Namespace)
return nm.namespacedResourcesDeleter.Delete(namespace.Name)
}
// Run starts observing the system with the specified number of workers.
func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go nm.controller.Run(stopCh)
defer nm.queue.ShutDown()
glog.Info("Starting the NamespaceController")
if !cache.WaitForCacheSync(stopCh, nm.listerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
for i := 0; i < workers; i++ {
go wait.Until(nm.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down NamespaceController")
nm.queue.ShutDown()
glog.Info("Shutting down NamespaceController")
}

View File

@ -26,6 +26,7 @@ go_library(
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/controller/namespace:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/e2e_node/builder:go_default_library",

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
"k8s.io/kubernetes/test/e2e/framework"
)
@ -57,7 +58,15 @@ func (n *NamespaceController) Start() error {
}
clientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
discoverResourcesFn := client.Discovery().ServerPreferredNamespacedResources
nc := namespacecontroller.NewNamespaceController(client, clientPool, discoverResourcesFn, ncResyncPeriod, v1.FinalizerKubernetes)
informerFactory := informers.NewSharedInformerFactory(client, ncResyncPeriod)
nc := namespacecontroller.NewNamespaceController(
client,
clientPool,
discoverResourcesFn,
informerFactory.Core().V1().Namespaces(),
ncResyncPeriod, v1.FinalizerKubernetes,
)
informerFactory.Start(n.stopCh)
go nc.Run(ncConcurrency, n.stopCh)
return nil
}