From 60cb4406e9f7fe00412702b2871e2f918c586361 Mon Sep 17 00:00:00 2001 From: caogaojin Date: Mon, 24 Aug 2015 09:59:15 +0800 Subject: [PATCH] Cleanup deprecated Forever function Since util.Forever function has been deprecated, we should cleanup these pieces of code. --- cmd/kube-proxy/app/server.go | 4 ++-- cmd/kubelet/app/server.go | 14 +++++++------- contrib/mesos/pkg/election/etcd_master.go | 2 +- contrib/mesos/pkg/executor/service/service.go | 4 ++-- pkg/client/unversioned/cache/poller.go | 2 +- pkg/client/unversioned/cache/reflector.go | 2 +- .../horizontalpodautoscaler_controller.go | 4 ++-- pkg/controller/node/nodecontroller.go | 8 ++++---- .../resourcequota/resource_quota_controller.go | 2 +- pkg/controller/route/routecontroller.go | 4 ++-- pkg/kubelet/config/file.go | 2 +- pkg/kubelet/config/http.go | 2 +- pkg/kubelet/image_manager.go | 4 ++-- pkg/kubelet/kubelet.go | 8 ++++---- pkg/kubelet/status_manager.go | 4 ++-- pkg/util/config/config.go | 2 +- pkg/util/logs.go | 2 +- pkg/util/util.go | 9 ++------- 18 files changed, 37 insertions(+), 42 deletions(-) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 5ef9169121..fbe7d02262 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -225,12 +225,12 @@ func (s *ProxyServer) Run(_ []string) error { ) if s.HealthzPort > 0 { - go util.Forever(func() { + go util.Until(func() { err := http.ListenAndServe(s.HealthzBindAddress.String()+":"+strconv.Itoa(s.HealthzPort), nil) if err != nil { glog.Errorf("Starting health server failed: %v", err) } - }, 5*time.Second) + }, 5*time.Second, util.NeverStop) } // Just loop forever for now... diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index a581e56233..4f571739ba 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -417,12 +417,12 @@ func (s *KubeletServer) Run(kcfg *KubeletConfig) error { if s.HealthzPort > 0 { healthz.DefaultHealthz() - go util.Forever(func() { + go util.Until(func() { err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress.String(), strconv.Itoa(s.HealthzPort)), nil) if err != nil { glog.Errorf("Starting health server failed: %v", err) } - }, 5*time.Second) + }, 5*time.Second, util.NeverStop) } if s.RunOnce { @@ -671,18 +671,18 @@ func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) error { func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) { // start the kubelet - go util.Forever(func() { k.Run(podCfg.Updates()) }, 0) + go util.Until(func() { k.Run(podCfg.Updates()) }, 0, util.NeverStop) // start the kubelet server if kc.EnableServer { - go util.Forever(func() { + go util.Until(func() { k.ListenAndServe(kc.Address, kc.Port, kc.TLSOptions, kc.EnableDebuggingHandlers) - }, 0) + }, 0, util.NeverStop) } if kc.ReadOnlyPort > 0 { - go util.Forever(func() { + go util.Until(func() { k.ListenAndServeReadOnly(kc.Address, kc.ReadOnlyPort) - }, 0) + }, 0, util.NeverStop) } } diff --git a/contrib/mesos/pkg/election/etcd_master.go b/contrib/mesos/pkg/election/etcd_master.go index 3e5e7fc436..b740c2f81a 100644 --- a/contrib/mesos/pkg/election/etcd_master.go +++ b/contrib/mesos/pkg/election/etcd_master.go @@ -55,7 +55,7 @@ type etcdMasterElector struct { func (e *etcdMasterElector) Elect(path, id string) watch.Interface { e.done = make(chan empty) e.events = make(chan watch.Event) - go util.Forever(func() { e.run(path, id) }, time.Second*5) + go util.Until(func() { e.run(path, id) }, time.Second*5, util.NeverStop) return e } diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 71805e06f6..1735009129 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -274,12 +274,12 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { if s.HealthzPort > 0 { healthz.DefaultHealthz() - go util.Forever(func() { + go util.Until(func() { err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress.String(), strconv.Itoa(s.HealthzPort)), nil) if err != nil { log.Errorf("Starting health server failed: %v", err) } - }, 5*time.Second) + }, 5*time.Second, util.NeverStop) } // block until executor is shut down or commits shutdown diff --git a/pkg/client/unversioned/cache/poller.go b/pkg/client/unversioned/cache/poller.go index f7a3f864ff..ef73ae29f3 100644 --- a/pkg/client/unversioned/cache/poller.go +++ b/pkg/client/unversioned/cache/poller.go @@ -57,7 +57,7 @@ func NewPoller(getFunc GetFunc, period time.Duration, store Store) *Poller { // Run begins polling. It starts a goroutine and returns immediately. func (p *Poller) Run() { - go util.Forever(p.run, p.period) + go util.Until(p.run, p.period, util.NeverStop) } // RunUntil begins polling. It starts a goroutine and returns immediately. diff --git a/pkg/client/unversioned/cache/reflector.go b/pkg/client/unversioned/cache/reflector.go index db1163dbd2..85a5ff58eb 100644 --- a/pkg/client/unversioned/cache/reflector.go +++ b/pkg/client/unversioned/cache/reflector.go @@ -135,7 +135,7 @@ outer: // Run starts a watch and handles watch events. Will restart the watch if it is closed. // Run starts a goroutine and returns immediately. func (r *Reflector) Run() { - go util.Forever(func() { r.ListAndWatch(util.NeverStop) }, r.period) + go util.Until(func() { r.ListAndWatch(util.NeverStop) }, r.period, util.NeverStop) } // RunUntil starts a watch and handles watch events. Will restart the watch if it is closed. diff --git a/pkg/controller/autoscaler/horizontalpodautoscaler_controller.go b/pkg/controller/autoscaler/horizontalpodautoscaler_controller.go index d49d98a771..15bc090f27 100644 --- a/pkg/controller/autoscaler/horizontalpodautoscaler_controller.go +++ b/pkg/controller/autoscaler/horizontalpodautoscaler_controller.go @@ -39,11 +39,11 @@ func New(kubeClient unversioned.ExperimentalInterface) *HorizontalPodAutoscalerC } func (a *HorizontalPodAutoscalerController) Run(syncPeriod time.Duration) { - go util.Forever(func() { + go util.Until(func() { if err := a.reconcileAutoscalers(); err != nil { glog.Errorf("Couldn't reconcile horizontal pod autoscalers: %v", err) } - }, syncPeriod) + }, syncPeriod, util.NeverStop) } func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error { diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index 5c461ef303..92940f5467 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -138,15 +138,15 @@ func NewNodeController( // Run starts an asynchronous loop that monitors the status of cluster nodes. func (nc *NodeController) Run(period time.Duration) { // Incorporate the results of node status pushed from kubelet to master. - go util.Forever(func() { + go util.Until(func() { if err := nc.monitorNodeStatus(); err != nil { glog.Errorf("Error monitoring node status: %v", err) } - }, nc.nodeMonitorPeriod) + }, nc.nodeMonitorPeriod, util.NeverStop) - go util.Forever(func() { + go util.Until(func() { nc.podEvictor.TryEvict(func(nodeName string) { nc.deletePods(nodeName) }) - }, nodeEvictionPeriod) + }, nodeEvictionPeriod, util.NeverStop) } // We observed a Node deletion in etcd. Currently we only need to remove Pods that diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index 3b6ef3bfe1..3b92d2cd60 100644 --- a/pkg/controller/resourcequota/resource_quota_controller.go +++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -52,7 +52,7 @@ func NewResourceQuotaController(kubeClient client.Interface) *ResourceQuotaContr // Run begins watching and syncing. func (rm *ResourceQuotaController) Run(period time.Duration) { rm.syncTime = time.Tick(period) - go util.Forever(func() { rm.synchronize() }, period) + go util.Until(func() { rm.synchronize() }, period, util.NeverStop) } func (rm *ResourceQuotaController) synchronize() { diff --git a/pkg/controller/route/routecontroller.go b/pkg/controller/route/routecontroller.go index 43536cf206..bd513aca82 100644 --- a/pkg/controller/route/routecontroller.go +++ b/pkg/controller/route/routecontroller.go @@ -47,11 +47,11 @@ func New(routes cloudprovider.Routes, kubeClient client.Interface, clusterName s } func (rc *RouteController) Run(syncPeriod time.Duration) { - go util.Forever(func() { + go util.Until(func() { if err := rc.reconcileNodeRoutes(); err != nil { glog.Errorf("Couldn't reconcile node routes: %v", err) } - }, syncPeriod) + }, syncPeriod, util.NeverStop) } func (rc *RouteController) reconcileNodeRoutes() error { diff --git a/pkg/kubelet/config/file.go b/pkg/kubelet/config/file.go index d54222acf1..354f158a15 100644 --- a/pkg/kubelet/config/file.go +++ b/pkg/kubelet/config/file.go @@ -45,7 +45,7 @@ func NewSourceFile(path string, nodeName string, period time.Duration, updates c updates: updates, } glog.V(1).Infof("Watching path %q", path) - go util.Forever(config.run, period) + go util.Until(config.run, period, util.NeverStop) } func (s *sourceFile) run() { diff --git a/pkg/kubelet/config/http.go b/pkg/kubelet/config/http.go index 6fc267c539..8d9274b65c 100644 --- a/pkg/kubelet/config/http.go +++ b/pkg/kubelet/config/http.go @@ -49,7 +49,7 @@ func NewSourceURL(url string, header http.Header, nodeName string, period time.D data: nil, } glog.V(1).Infof("Watching URL %s", url) - go util.Forever(config.run, period) + go util.Until(config.run, period, util.NeverStop) } func (s *sourceURL) run() { diff --git a/pkg/kubelet/image_manager.go b/pkg/kubelet/image_manager.go index cba9dbae72..42a00d9bfe 100644 --- a/pkg/kubelet/image_manager.go +++ b/pkg/kubelet/image_manager.go @@ -118,12 +118,12 @@ func (im *realImageManager) Start() error { return err } - go util.Forever(func() { + go util.Until(func() { err := im.detectImages(time.Now()) if err != nil { glog.Warningf("[ImageManager] Failed to monitor images: %v", err) } - }, 5*time.Minute) + }, 5*time.Minute, util.NeverStop) return nil } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index c8c278157e..8da1d09089 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -685,17 +685,17 @@ func (kl *Kubelet) GetNode() (*api.Node, error) { // Starts garbage collection threads. func (kl *Kubelet) StartGarbageCollection() { - go util.Forever(func() { + go util.Until(func() { if err := kl.containerGC.GarbageCollect(); err != nil { glog.Errorf("Container garbage collection failed: %v", err) } - }, time.Minute) + }, time.Minute, util.NeverStop) - go util.Forever(func() { + go util.Until(func() { if err := kl.imageManager.GarbageCollect(); err != nil { glog.Errorf("Image garbage collection failed: %v", err) } - }, 5*time.Minute) + }, 5*time.Minute, util.NeverStop) } // Run starts the kubelet reacting to config updates diff --git a/pkg/kubelet/status_manager.go b/pkg/kubelet/status_manager.go index 5a6e53e772..6de7767762 100644 --- a/pkg/kubelet/status_manager.go +++ b/pkg/kubelet/status_manager.go @@ -75,12 +75,12 @@ func (s *statusManager) Start() { } // syncBatch blocks when no updates are available, we can run it in a tight loop. glog.Info("Starting to sync pod status with apiserver") - go util.Forever(func() { + go util.Until(func() { err := s.syncBatch() if err != nil { glog.Warningf("Failed to updated pod status: %v", err) } - }, 0) + }, 0, util.NeverStop) } func (s *statusManager) GetPodStatus(podFullName string) (api.PodStatus, bool) { diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index 788773839d..7bb49b3836 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -74,7 +74,7 @@ func (m *Mux) Channel(source string) chan interface{} { } newChannel := make(chan interface{}) m.sources[source] = newChannel - go util.Forever(func() { m.listen(source, newChannel) }, 0) + go util.Until(func() { m.listen(source, newChannel) }, 0, util.NeverStop) return newChannel } diff --git a/pkg/util/logs.go b/pkg/util/logs.go index 03a7314a10..37b9b767b2 100644 --- a/pkg/util/logs.go +++ b/pkg/util/logs.go @@ -46,7 +46,7 @@ func InitLogs() { log.SetOutput(GlogWriter{}) log.SetFlags(0) // The default glog flush interval is 30 seconds, which is frighteningly long. - go Forever(glog.Flush, *logFlushFreq) + go Until(glog.Flush, *logFlushFreq, NeverStop) } // FlushLogs flushes logs immediately. diff --git a/pkg/util/util.go b/pkg/util/util.go index 07a8762d0b..88545b9159 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -87,18 +87,13 @@ func logError(err error) { glog.ErrorDepth(2, err) } -// Forever loops forever running f every period. Catches any panics, and keeps going. -// Deprecated. Please use Until and pass NeverStop as the stopCh. -func Forever(f func(), period time.Duration) { - Until(f, period, nil) -} - // NeverStop may be passed to Until to make it never stop. var NeverStop <-chan struct{} = make(chan struct{}) // Until loops until stop channel is closed, running f every period. // Catches any panics, and keeps going. f may not be invoked if -// stop channel is already closed. +// stop channel is already closed. Pass NeverStop to Until if you +// don't want it stop. func Until(f func(), period time.Duration, stopCh <-chan struct{}) { for { select {