Make controller Run methods consistent

- startup/shutdown logging
- wait for cache sync logging
- defer utilruntime.HandleCrash()
- wait for stop channel before exiting
Andy Goldstein 2017-04-12 15:49:17 -04:00
parent 03e555f0f3
commit e63fcf708d
29 changed files with 200 additions and 112 deletions
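
Taken together, those four conventions give every controller a Run method with the same shape. The sketch below is a minimal illustration of that shape, not code from this commit; the ExampleController type, its fields, and the "example" name are placeholders.

```go
package example

import (
	"time"

	"github.com/golang/glog"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/kubernetes/pkg/controller"
)

type ExampleController struct {
	queue     workqueue.RateLimitingInterface
	fooSynced cache.InformerSynced
}

func (c *ExampleController) Run(workers int, stopCh <-chan struct{}) {
	// Recover from panics in this goroutine and make sure the queue shuts down on exit.
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()

	// Startup/shutdown logging.
	glog.Infof("Starting example controller")
	defer glog.Infof("Shutting down example controller")

	// Wait-for-cache-sync logging is handled by the shared helper added in this commit.
	if !controller.WaitForCacheSync("example", stopCh, c.fooSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(c.worker, time.Second, stopCh)
	}

	// Block until asked to stop, so the deferred cleanup runs on shutdown.
	<-stopCh
}

func (c *ExampleController) worker() {
	// Placeholder: a real controller drains c.queue here.
}
```

Controllers without a worker pool (for example the route controller below) follow the same shape minus the worker loop.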


@ -484,7 +484,7 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
if err != nil {
return fmt.Errorf("failed to initialize nodecontroller: %v", err)
}
nodeController.Run()
go nodeController.Run(stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} else {
glog.Warningf("%q is disabled", nodeControllerName)


@ -393,6 +393,7 @@ staging/src/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregis
staging/src/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion
staging/src/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion
staging/src/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1alpha1
staging/src/k8s.io/kube-aggregator/pkg/controllers
staging/src/k8s.io/sample-apiserver
staging/src/k8s.io/sample-apiserver/pkg/apis/wardle/install
test/e2e/perftype


@ -117,18 +117,18 @@ func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer cc.queue.ShutDown()
glog.Infof("Starting certificate controller manager")
glog.Infof("Starting certificate controller")
defer glog.Infof("Shutting down certificate controller")
if !cache.WaitForCacheSync(stopCh, cc.csrsSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("certificate", stopCh, cc.csrsSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(cc.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down certificate controller")
}
// worker runs a thread that dequeues CSRs, handles them, and marks them done.


@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
@ -961,3 +962,18 @@ func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, n
_, err = c.Core().Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes)
return err
}
// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages
// indicating that the controller identified by controllerName is waiting for syncs, followed by
// either a successful or failed sync.
func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
glog.Infof("Waiting for caches to sync for %s controller", controllerName)
if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
utilruntime.HandleError(fmt.Errorf("Unable to sync caches for %s controller", controllerName))
return false
}
glog.Infof("Caches are synced for %s controller", controllerName)
return true
}
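
Because every caller passes a human-readable controller name, the helper's three format strings produce predictable, grep-able messages. For the deployment controller's call later in this commit (controller.WaitForCacheSync("deployment", stopCh, ...)), the possible log lines are:

```
Waiting for caches to sync for deployment controller
Unable to sync caches for deployment controller    (on failure, via utilruntime.HandleError)
Caches are synced for deployment controller        (on success)
```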


@ -196,10 +196,10 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer dsc.queue.ShutDown()
glog.Infof("Starting Daemon Sets controller manager")
glog.Infof("Starting daemon sets controller")
defer glog.Infof("Shutting down daemon sets controller")
if !cache.WaitForCacheSync(stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.dsStoreSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.dsStoreSynced) {
return
}
@ -208,7 +208,6 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
}
<-stopCh
glog.Infof("Shutting down Daemon Set Controller")
}
func (dsc *DaemonSetsController) runWorker() {


@ -150,9 +150,9 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
defer dc.queue.ShutDown()
glog.Infof("Starting deployment controller")
defer glog.Infof("Shutting down deployment controller")
if !cache.WaitForCacheSync(stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
return
}
@ -161,7 +161,6 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
}
<-stopCh
glog.Infof("Shutting down deployment controller")
}
func (dc *DeploymentController) addDeployment(obj interface{}) {


@ -266,24 +266,23 @@ func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer dc.queue.ShutDown()
glog.V(0).Infof("Starting disruption controller")
glog.Infof("Starting disruption controller")
defer glog.Infof("Shutting down disruption controller")
if !cache.WaitForCacheSync(stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("disruption", stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
return
}
if dc.kubeClient != nil {
glog.V(0).Infof("Sending events to api server.")
glog.Infof("Sending events to api server.")
dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(dc.kubeClient.Core().RESTClient()).Events("")})
} else {
glog.V(0).Infof("No api server defined - no events will be sent to API server.")
glog.Infof("No api server defined - no events will be sent to API server.")
}
go wait.Until(dc.worker, time.Second, stopCh)
go wait.Until(dc.recheckWorker, time.Second, stopCh)
<-stopCh
glog.V(0).Infof("Shutting down disruption controller")
}
func (dc *DisruptionController) addDb(obj interface{}) {


@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
"github.com/golang/glog"
@ -131,14 +132,17 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer e.queue.ShutDown()
if !cache.WaitForCacheSync(stopCh, e.podsSynced, e.servicesSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
glog.Infof("Starting endpoint controller")
defer glog.Infof("Shutting down endpoint controller")
if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(e.worker, time.Second, stopCh)
}
go func() {
defer utilruntime.HandleCrash()
time.Sleep(5 * time.Minute) // give time for our cache to fill


@ -32,8 +32,8 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
// install the prometheus plugin
_ "k8s.io/kubernetes/pkg/util/workqueue/prometheus"
@ -104,25 +104,31 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam
}
func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer gc.attemptToDelete.ShutDown()
defer gc.attemptToOrphan.ShutDown()
defer gc.dependencyGraphBuilder.graphChanges.ShutDown()
glog.Infof("Garbage Collector: Initializing")
glog.Infof("Starting garbage collector controller")
defer glog.Infof("Shutting down garbage collector controller")
gc.dependencyGraphBuilder.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, gc.dependencyGraphBuilder.HasSynced) {
if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.HasSynced) {
return
}
glog.Infof("Garbage Collector: All resource monitors have synced. Proceeding to collect garbage")
glog.Infof("Garbage collector: all resource monitors have synced. Proceeding to collect garbage")
// gc workers
for i := 0; i < workers; i++ {
go wait.Until(gc.runAttemptToDeleteWorker, 1*time.Second, stopCh)
go wait.Until(gc.runAttemptToOrphanWorker, 1*time.Second, stopCh)
}
Register()
<-stopCh
glog.Infof("Garbage Collector: Shutting down")
}
func (gc *GarbageCollector) HasSynced() bool {


@ -126,8 +126,10 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer jm.queue.ShutDown()
if !cache.WaitForCacheSync(stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
glog.Infof("Starting job controller")
defer glog.Infof("Shutting down job controller")
if !controller.WaitForCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
return
}
@ -136,7 +138,6 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
}
<-stopCh
glog.Infof("Shutting down Job Manager")
}
// getPodJob returns the job managing the given pod.


@ -186,10 +186,10 @@ func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer nm.queue.ShutDown()
glog.Info("Starting the NamespaceController")
glog.Info("Starting namespace controller")
defer glog.Infof("Shutting down namespace controller")
if !cache.WaitForCacheSync(stopCh, nm.listerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("namespace", stopCh, nm.listerSynced) {
return
}
@ -198,6 +198,4 @@ func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) {
}
<-stopCh
glog.Info("Shutting down NamespaceController")
}


@ -518,37 +518,39 @@ func (nc *NodeController) onNodeDelete(originalObj interface{}) {
}
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run() {
go func() {
defer utilruntime.HandleCrash()
func (nc *NodeController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
if !cache.WaitForCacheSync(wait.NeverStop, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
glog.Infof("Starting node controller")
defer glog.Infof("Shutting down node controller")
if !controller.WaitForCacheSync("node", stopCh, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
return
}
// Incorporate the results of node status pushed from kubelet to master.
go wait.Until(func() {
if err := nc.monitorNodeStatus(); err != nil {
glog.Errorf("Error monitoring node status: %v", err)
}
}, nc.nodeMonitorPeriod, wait.NeverStop)
// Incorporate the results of node status pushed from kubelet to master.
go wait.Until(func() {
if err := nc.monitorNodeStatus(); err != nil {
glog.Errorf("Error monitoring node status: %v", err)
}
}, nc.nodeMonitorPeriod, wait.NeverStop)
if nc.runTaintManager {
go nc.taintManager.Run(wait.NeverStop)
}
if nc.runTaintManager {
go nc.taintManager.Run(wait.NeverStop)
}
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
}
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
}
}()
<-stopCh
}
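
This pairs with the caller-side change in the first hunk: Run no longer spawns its own goroutine, so the controller manager launches it with go and shuts it down by closing the shared stop channel. A minimal sketch of that wiring, with a hypothetical locally created channel:

```go
// Hypothetical wiring; in the controller manager the channel comes from
// StartControllers rather than being created here.
stop := make(chan struct{})
go nodeController.Run(stop)
// ... later, during shutdown:
close(stop) // Run returns once stop is closed, running its deferred cleanup.
```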
// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,


@ -136,10 +136,10 @@ func (a *HorizontalController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer a.queue.ShutDown()
glog.Infof("Starting HPA Controller")
glog.Infof("Starting HPA controller")
defer glog.Infof("Shutting down HPA controller")
if !cache.WaitForCacheSync(stopCh, a.hpaListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("HPA", stopCh, a.hpaListerSynced) {
return
}
@ -147,7 +147,6 @@ func (a *HorizontalController) Run(stopCh <-chan struct{}) {
go wait.Until(a.worker, time.Second, stopCh)
<-stopCh
glog.Infof("Shutting down HPA Controller")
}
// obj could be an *v1.HorizontalPodAutoscaler, or a DeletionFinalStateUnknown marker item.


@ -17,7 +17,6 @@ limitations under the License.
package podgc
import (
"fmt"
"sort"
"sync"
"time"
@ -32,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
"github.com/golang/glog"
@ -71,12 +71,17 @@ func NewPodGC(kubeClient clientset.Interface, podInformer coreinformers.PodInfor
}
func (gcc *PodGCController) Run(stop <-chan struct{}) {
if !cache.WaitForCacheSync(stop, gcc.podListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
defer utilruntime.HandleCrash()
glog.Infof("Starting GC controller")
defer glog.Infof("Shutting down GC controller")
if !controller.WaitForCacheSync("GC", stop, gcc.podListerSynced) {
return
}
go wait.Until(gcc.gc, gcCheckPeriod, stop)
<-stop
}


@ -152,10 +152,10 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer rsc.queue.ShutDown()
glog.Infof("Starting ReplicaSet controller")
glog.Infof("Starting replica set controller")
defer glog.Infof("Shutting down replica set Controller")
if !cache.WaitForCacheSync(stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("replica set", stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
return
}
@ -164,7 +164,6 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
}
<-stopCh
glog.Infof("Shutting down ReplicaSet Controller")
}
// getPodReplicaSets returns a list of ReplicaSets matching the given pod.


@ -147,10 +147,10 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer rm.queue.ShutDown()
glog.Infof("Starting RC Manager")
glog.Infof("Starting RC controller")
defer glog.Infof("Shutting down RC controller")
if !cache.WaitForCacheSync(stopCh, rm.podListerSynced, rm.rcListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("RC", stopCh, rm.podListerSynced, rm.rcListerSynced) {
return
}
@ -159,7 +159,6 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
}
<-stopCh
glog.Infof("Shutting down RC Manager")
}
// getPodControllers returns a list of ReplicationControllers matching the given pod.


@ -236,16 +236,17 @@ func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface)
// Run begins quota controller using the specified number of workers
func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer rq.queue.ShutDown()
glog.Infof("Starting resource quota controller")
defer glog.Infof("Shutting down resource quota controller")
// the controllers that replenish other resources to respond rapidly to state changes
for _, replenishmentController := range rq.replenishmentControllers {
go replenishmentController.Run(stopCh)
}
if !cache.WaitForCacheSync(stopCh, rq.informerSyncedFuncs...) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("resource quota", stopCh, rq.informerSyncedFuncs...) {
return
}
@ -254,9 +255,8 @@ func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
go wait.Until(rq.worker(rq.queue), time.Second, stopCh)
go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down ResourceQuotaController")
rq.queue.ShutDown()
}
// syncResourceQuotaFromKey syncs a quota key


@ -35,6 +35,7 @@ import (
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
nodeutil "k8s.io/kubernetes/pkg/util/node"
)
@ -77,10 +78,10 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInform
func (rc *RouteController) Run(stopCh <-chan struct{}, syncPeriod time.Duration) {
defer utilruntime.HandleCrash()
glog.Info("Starting the route controller")
glog.Info("Starting route controller")
defer glog.Info("Shutting down route controller")
if !cache.WaitForCacheSync(stopCh, rc.nodeListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("route", stopCh, rc.nodeListerSynced) {
return
}
@ -94,6 +95,8 @@ func (rc *RouteController) Run(stopCh <-chan struct{}, syncPeriod time.Duration)
glog.Errorf("Couldn't reconcile node routes: %v", err)
}
}, syncPeriod, wait.NeverStop)
<-stopCh
}
func (rc *RouteController) reconcileNodeRoutes() error {


@ -174,9 +174,9 @@ func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
defer s.workingQueue.ShutDown()
glog.Info("Starting service controller")
defer glog.Info("Shutting down service controller")
if !cache.WaitForCacheSync(stopCh, s.serviceListerSynced, s.nodeListerSynced) {
runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("service", stopCh, s.serviceListerSynced, s.nodeListerSynced) {
}
for i := 0; i < workers; i++ {
@ -186,7 +186,6 @@ func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, stopCh)
<-stopCh
glog.Info("Stopping service controller")
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.


@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
)
@ -109,10 +110,10 @@ func (c *ServiceAccountsController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
glog.Infof("Starting ServiceAccount controller")
glog.Infof("Starting service account controller")
defer glog.Infof("Shutting down service account controller")
if !cache.WaitForCacheSync(stopCh, c.saListerSynced, c.nsListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("service account", stopCh, c.saListerSynced, c.nsListerSynced) {
return
}
@ -121,7 +122,6 @@ func (c *ServiceAccountsController) Run(workers int, stopCh <-chan struct{}) {
}
<-stopCh
glog.Infof("Shutting down ServiceAccount controller")
}
// serviceAccountDeleted reacts to a ServiceAccount deletion by recreating a default ServiceAccount in the namespace if needed


@ -141,10 +141,10 @@ func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer ssc.queue.ShutDown()
glog.Infof("Starting statefulset controller")
glog.Infof("Starting stateful set controller")
defer glog.Infof("Shutting down statefulset controller")
if !cache.WaitForCacheSync(stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced) {
return
}
@ -153,7 +153,6 @@ func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
}
<-stopCh
glog.Infof("Shutting down statefulset controller")
}
// addPod adds the statefulset for the pod to the sync queue


@ -116,7 +116,8 @@ func (ttlc *TTLController) Run(workers int, stopCh <-chan struct{}) {
glog.Infof("Starting TTL controller")
defer glog.Infof("Shutting down TTL controller")
if !cache.WaitForCacheSync(stopCh, ttlc.hasSynced) {
if !controller.WaitForCacheSync("TTL", stopCh, ttlc.hasSynced) {
return
}


@ -220,13 +220,14 @@ type attachDetachController struct {
func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
defer runtime.HandleCrash()
glog.Infof("Starting Attach Detach Controller")
glog.Infof("Starting attach detach controller")
defer glog.Infof("Shutting down attach detach controller")
// TODO uncomment once we agree this is ok and we fix the attach/detach integration test that
// currently fails because it doesn't set pvcsSynced and pvsSynced to alwaysReady, so this
// controller never runs.
// if !kcache.WaitForCacheSync(stopCh, adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced) {
// runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
// if !controller.WaitForCacheSync("attach detach", stopCh, adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced) {
// return
// }
@ -234,7 +235,6 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
go adc.desiredStateOfWorldPopulator.Run(stopCh)
<-stopCh
glog.Infof("Shutting down Attach Detach Controller")
}
func (adc *attachDetachController) podAdd(obj interface{}) {


@ -273,19 +273,23 @@ func (ctrl *PersistentVolumeController) deleteClaim(claim *v1.PersistentVolumeCl
// Run starts all of this controller's control loops
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
glog.V(1).Infof("starting PersistentVolumeController")
if !cache.WaitForCacheSync(stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for volume caches to sync"))
defer utilruntime.HandleCrash()
defer ctrl.claimQueue.ShutDown()
defer ctrl.volumeQueue.ShutDown()
glog.Infof("Starting persistent volume controller")
defer glog.Infof("Shutting down peristent volume controller")
if !controller.WaitForCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced) {
return
}
ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)
go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
go wait.Until(ctrl.claimWorker, time.Second, stopCh)
<-stopCh
ctrl.claimQueue.ShutDown()
ctrl.volumeQueue.ShutDown()
}
// volumeWorker processes items from volumeQueue. It must run only once,


@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/extensions/internalversion"
listers "k8s.io/kubernetes/pkg/client/listers/extensions/internalversion"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata"
)
@ -108,7 +109,7 @@ func (c *tprRegistrationController) Run(threadiness int, stopCh <-chan struct{})
defer glog.Infof("Shutting down tpr-autoregister controller")
// wait for your secondary caches to fill before starting your work
if !cache.WaitForCacheSync(stopCh, c.tprSynced) {
if !controller.WaitForCacheSync("tpr-autoregister", stopCh, c.tprSynced) {
return
}


@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/core/internalversion"
listers "k8s.io/kubernetes/pkg/client/listers/core/internalversion"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/config"
)
@ -96,8 +97,12 @@ func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) {
// Run starts the goroutine responsible for calling registered handlers.
func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
if !cache.WaitForCacheSync(stopCh, c.listerSynced) {
utilruntime.HandleError(fmt.Errorf("endpoint controller not synced"))
defer utilruntime.HandleCrash()
glog.Info("Starting endpoints config controller")
defer glog.Info("Shutting down endpoints config controller")
if !controller.WaitForCacheSync("endpoints config", stopCh, c.listerSynced) {
return
}
@ -201,14 +206,19 @@ func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
// Run starts the goroutine responsible for calling
// registered handlers.
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
if !cache.WaitForCacheSync(stopCh, c.listerSynced) {
utilruntime.HandleError(fmt.Errorf("service controller not synced"))
defer utilruntime.HandleCrash()
glog.Info("Starting service config controller")
defer glog.Info("Shutting down service config controller")
if !controller.WaitForCacheSync("service config", stopCh, c.listerSynced) {
return
}
// We have synced informers. Now we can start delivering updates
// to the registered handler.
go func() {
defer utilruntime.HandleCrash()
for {
select {
case <-c.updates:
@ -236,6 +246,8 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
<-stopCh
close(c.stop)
}()
<-stopCh
}
func (c *ServiceConfig) handleAddService(_ interface{}) {


@ -35,6 +35,7 @@ import (
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion"
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion"
"k8s.io/kube-aggregator/pkg/controllers"
)
type APIHandlerManager interface {
@ -124,12 +125,11 @@ func (c *APIServiceRegistrationController) getDestinationHost(apiService *apireg
func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
defer glog.Infof("Shutting down APIServiceRegistrationController")
glog.Infof("Starting APIServiceRegistrationController")
defer glog.Infof("Shutting down APIServiceRegistrationController")
if !cache.WaitForCacheSync(stopCh, c.apiServiceSynced, c.servicesSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controllers.WaitForCacheSync("APIServiceRegistrationController", stopCh, c.apiServiceSynced, c.servicesSynced) {
return
}


@ -35,6 +35,7 @@ import (
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion"
informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion"
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion"
"k8s.io/kube-aggregator/pkg/controllers"
)
const (
@ -120,7 +121,7 @@ func (c *autoRegisterController) Run(threadiness int, stopCh <-chan struct{}) {
defer glog.Infof("Shutting down autoregister controller")
// wait for your secondary caches to fill before starting your work
if !cache.WaitForCacheSync(stopCh, c.apiServiceSynced) {
if !controllers.WaitForCacheSync("autoregister", stopCh, c.apiServiceSynced) {
return
}


@ -0,0 +1,41 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"fmt"
"github.com/golang/glog"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
)
// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages
// indicating that the controller identified by controllerName is waiting for syncs, followed by
// either a successful or failed sync.
func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
glog.Infof("Waiting for caches to sync for %s controller", controllerName)
if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
utilruntime.HandleError(fmt.Errorf("Unable to sync caches for %s controller", controllerName))
return false
}
glog.Infof("Caches are synced for %s controller", controllerName)
return true
}