Cleanup deprecated Forever function

Since util.Forever function has been deprecated, we should cleanup these
pieces of code.
pull/6/head
caogaojin 2015-08-24 09:59:15 +08:00
parent 5fe7029e68
commit 60cb4406e9
18 changed files with 37 additions and 42 deletions

View File

@ -225,12 +225,12 @@ func (s *ProxyServer) Run(_ []string) error {
) )
if s.HealthzPort > 0 { if s.HealthzPort > 0 {
go util.Forever(func() { go util.Until(func() {
err := http.ListenAndServe(s.HealthzBindAddress.String()+":"+strconv.Itoa(s.HealthzPort), nil) err := http.ListenAndServe(s.HealthzBindAddress.String()+":"+strconv.Itoa(s.HealthzPort), nil)
if err != nil { if err != nil {
glog.Errorf("Starting health server failed: %v", err) glog.Errorf("Starting health server failed: %v", err)
} }
}, 5*time.Second) }, 5*time.Second, util.NeverStop)
} }
// Just loop forever for now... // Just loop forever for now...

View File

@ -417,12 +417,12 @@ func (s *KubeletServer) Run(kcfg *KubeletConfig) error {
if s.HealthzPort > 0 { if s.HealthzPort > 0 {
healthz.DefaultHealthz() healthz.DefaultHealthz()
go util.Forever(func() { go util.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress.String(), strconv.Itoa(s.HealthzPort)), nil) err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress.String(), strconv.Itoa(s.HealthzPort)), nil)
if err != nil { if err != nil {
glog.Errorf("Starting health server failed: %v", err) glog.Errorf("Starting health server failed: %v", err)
} }
}, 5*time.Second) }, 5*time.Second, util.NeverStop)
} }
if s.RunOnce { if s.RunOnce {
@ -671,18 +671,18 @@ func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) error {
func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) { func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) {
// start the kubelet // start the kubelet
go util.Forever(func() { k.Run(podCfg.Updates()) }, 0) go util.Until(func() { k.Run(podCfg.Updates()) }, 0, util.NeverStop)
// start the kubelet server // start the kubelet server
if kc.EnableServer { if kc.EnableServer {
go util.Forever(func() { go util.Until(func() {
k.ListenAndServe(kc.Address, kc.Port, kc.TLSOptions, kc.EnableDebuggingHandlers) k.ListenAndServe(kc.Address, kc.Port, kc.TLSOptions, kc.EnableDebuggingHandlers)
}, 0) }, 0, util.NeverStop)
} }
if kc.ReadOnlyPort > 0 { if kc.ReadOnlyPort > 0 {
go util.Forever(func() { go util.Until(func() {
k.ListenAndServeReadOnly(kc.Address, kc.ReadOnlyPort) k.ListenAndServeReadOnly(kc.Address, kc.ReadOnlyPort)
}, 0) }, 0, util.NeverStop)
} }
} }

View File

@ -55,7 +55,7 @@ type etcdMasterElector struct {
func (e *etcdMasterElector) Elect(path, id string) watch.Interface { func (e *etcdMasterElector) Elect(path, id string) watch.Interface {
e.done = make(chan empty) e.done = make(chan empty)
e.events = make(chan watch.Event) e.events = make(chan watch.Event)
go util.Forever(func() { e.run(path, id) }, time.Second*5) go util.Until(func() { e.run(path, id) }, time.Second*5, util.NeverStop)
return e return e
} }

View File

@ -274,12 +274,12 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
if s.HealthzPort > 0 { if s.HealthzPort > 0 {
healthz.DefaultHealthz() healthz.DefaultHealthz()
go util.Forever(func() { go util.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress.String(), strconv.Itoa(s.HealthzPort)), nil) err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress.String(), strconv.Itoa(s.HealthzPort)), nil)
if err != nil { if err != nil {
log.Errorf("Starting health server failed: %v", err) log.Errorf("Starting health server failed: %v", err)
} }
}, 5*time.Second) }, 5*time.Second, util.NeverStop)
} }
// block until executor is shut down or commits shutdown // block until executor is shut down or commits shutdown

View File

@ -57,7 +57,7 @@ func NewPoller(getFunc GetFunc, period time.Duration, store Store) *Poller {
// Run begins polling. It starts a goroutine and returns immediately. // Run begins polling. It starts a goroutine and returns immediately.
func (p *Poller) Run() { func (p *Poller) Run() {
go util.Forever(p.run, p.period) go util.Until(p.run, p.period, util.NeverStop)
} }
// RunUntil begins polling. It starts a goroutine and returns immediately. // RunUntil begins polling. It starts a goroutine and returns immediately.

View File

@ -135,7 +135,7 @@ outer:
// Run starts a watch and handles watch events. Will restart the watch if it is closed. // Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run starts a goroutine and returns immediately. // Run starts a goroutine and returns immediately.
func (r *Reflector) Run() { func (r *Reflector) Run() {
go util.Forever(func() { r.ListAndWatch(util.NeverStop) }, r.period) go util.Until(func() { r.ListAndWatch(util.NeverStop) }, r.period, util.NeverStop)
} }
// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed. // RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.

View File

@ -39,11 +39,11 @@ func New(kubeClient unversioned.ExperimentalInterface) *HorizontalPodAutoscalerC
} }
func (a *HorizontalPodAutoscalerController) Run(syncPeriod time.Duration) { func (a *HorizontalPodAutoscalerController) Run(syncPeriod time.Duration) {
go util.Forever(func() { go util.Until(func() {
if err := a.reconcileAutoscalers(); err != nil { if err := a.reconcileAutoscalers(); err != nil {
glog.Errorf("Couldn't reconcile horizontal pod autoscalers: %v", err) glog.Errorf("Couldn't reconcile horizontal pod autoscalers: %v", err)
} }
}, syncPeriod) }, syncPeriod, util.NeverStop)
} }
func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error { func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error {

View File

@ -138,15 +138,15 @@ func NewNodeController(
// Run starts an asynchronous loop that monitors the status of cluster nodes. // Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run(period time.Duration) { func (nc *NodeController) Run(period time.Duration) {
// Incorporate the results of node status pushed from kubelet to master. // Incorporate the results of node status pushed from kubelet to master.
go util.Forever(func() { go util.Until(func() {
if err := nc.monitorNodeStatus(); err != nil { if err := nc.monitorNodeStatus(); err != nil {
glog.Errorf("Error monitoring node status: %v", err) glog.Errorf("Error monitoring node status: %v", err)
} }
}, nc.nodeMonitorPeriod) }, nc.nodeMonitorPeriod, util.NeverStop)
go util.Forever(func() { go util.Until(func() {
nc.podEvictor.TryEvict(func(nodeName string) { nc.deletePods(nodeName) }) nc.podEvictor.TryEvict(func(nodeName string) { nc.deletePods(nodeName) })
}, nodeEvictionPeriod) }, nodeEvictionPeriod, util.NeverStop)
} }
// We observed a Node deletion in etcd. Currently we only need to remove Pods that // We observed a Node deletion in etcd. Currently we only need to remove Pods that

View File

@ -52,7 +52,7 @@ func NewResourceQuotaController(kubeClient client.Interface) *ResourceQuotaContr
// Run begins watching and syncing. // Run begins watching and syncing.
func (rm *ResourceQuotaController) Run(period time.Duration) { func (rm *ResourceQuotaController) Run(period time.Duration) {
rm.syncTime = time.Tick(period) rm.syncTime = time.Tick(period)
go util.Forever(func() { rm.synchronize() }, period) go util.Until(func() { rm.synchronize() }, period, util.NeverStop)
} }
func (rm *ResourceQuotaController) synchronize() { func (rm *ResourceQuotaController) synchronize() {

View File

@ -47,11 +47,11 @@ func New(routes cloudprovider.Routes, kubeClient client.Interface, clusterName s
} }
func (rc *RouteController) Run(syncPeriod time.Duration) { func (rc *RouteController) Run(syncPeriod time.Duration) {
go util.Forever(func() { go util.Until(func() {
if err := rc.reconcileNodeRoutes(); err != nil { if err := rc.reconcileNodeRoutes(); err != nil {
glog.Errorf("Couldn't reconcile node routes: %v", err) glog.Errorf("Couldn't reconcile node routes: %v", err)
} }
}, syncPeriod) }, syncPeriod, util.NeverStop)
} }
func (rc *RouteController) reconcileNodeRoutes() error { func (rc *RouteController) reconcileNodeRoutes() error {

View File

@ -45,7 +45,7 @@ func NewSourceFile(path string, nodeName string, period time.Duration, updates c
updates: updates, updates: updates,
} }
glog.V(1).Infof("Watching path %q", path) glog.V(1).Infof("Watching path %q", path)
go util.Forever(config.run, period) go util.Until(config.run, period, util.NeverStop)
} }
func (s *sourceFile) run() { func (s *sourceFile) run() {

View File

@ -49,7 +49,7 @@ func NewSourceURL(url string, header http.Header, nodeName string, period time.D
data: nil, data: nil,
} }
glog.V(1).Infof("Watching URL %s", url) glog.V(1).Infof("Watching URL %s", url)
go util.Forever(config.run, period) go util.Until(config.run, period, util.NeverStop)
} }
func (s *sourceURL) run() { func (s *sourceURL) run() {

View File

@ -118,12 +118,12 @@ func (im *realImageManager) Start() error {
return err return err
} }
go util.Forever(func() { go util.Until(func() {
err := im.detectImages(time.Now()) err := im.detectImages(time.Now())
if err != nil { if err != nil {
glog.Warningf("[ImageManager] Failed to monitor images: %v", err) glog.Warningf("[ImageManager] Failed to monitor images: %v", err)
} }
}, 5*time.Minute) }, 5*time.Minute, util.NeverStop)
return nil return nil
} }

View File

@ -685,17 +685,17 @@ func (kl *Kubelet) GetNode() (*api.Node, error) {
// Starts garbage collection threads. // Starts garbage collection threads.
func (kl *Kubelet) StartGarbageCollection() { func (kl *Kubelet) StartGarbageCollection() {
go util.Forever(func() { go util.Until(func() {
if err := kl.containerGC.GarbageCollect(); err != nil { if err := kl.containerGC.GarbageCollect(); err != nil {
glog.Errorf("Container garbage collection failed: %v", err) glog.Errorf("Container garbage collection failed: %v", err)
} }
}, time.Minute) }, time.Minute, util.NeverStop)
go util.Forever(func() { go util.Until(func() {
if err := kl.imageManager.GarbageCollect(); err != nil { if err := kl.imageManager.GarbageCollect(); err != nil {
glog.Errorf("Image garbage collection failed: %v", err) glog.Errorf("Image garbage collection failed: %v", err)
} }
}, 5*time.Minute) }, 5*time.Minute, util.NeverStop)
} }
// Run starts the kubelet reacting to config updates // Run starts the kubelet reacting to config updates

View File

@ -75,12 +75,12 @@ func (s *statusManager) Start() {
} }
// syncBatch blocks when no updates are available, we can run it in a tight loop. // syncBatch blocks when no updates are available, we can run it in a tight loop.
glog.Info("Starting to sync pod status with apiserver") glog.Info("Starting to sync pod status with apiserver")
go util.Forever(func() { go util.Until(func() {
err := s.syncBatch() err := s.syncBatch()
if err != nil { if err != nil {
glog.Warningf("Failed to updated pod status: %v", err) glog.Warningf("Failed to updated pod status: %v", err)
} }
}, 0) }, 0, util.NeverStop)
} }
func (s *statusManager) GetPodStatus(podFullName string) (api.PodStatus, bool) { func (s *statusManager) GetPodStatus(podFullName string) (api.PodStatus, bool) {

View File

@ -74,7 +74,7 @@ func (m *Mux) Channel(source string) chan interface{} {
} }
newChannel := make(chan interface{}) newChannel := make(chan interface{})
m.sources[source] = newChannel m.sources[source] = newChannel
go util.Forever(func() { m.listen(source, newChannel) }, 0) go util.Until(func() { m.listen(source, newChannel) }, 0, util.NeverStop)
return newChannel return newChannel
} }

View File

@ -46,7 +46,7 @@ func InitLogs() {
log.SetOutput(GlogWriter{}) log.SetOutput(GlogWriter{})
log.SetFlags(0) log.SetFlags(0)
// The default glog flush interval is 30 seconds, which is frighteningly long. // The default glog flush interval is 30 seconds, which is frighteningly long.
go Forever(glog.Flush, *logFlushFreq) go Until(glog.Flush, *logFlushFreq, NeverStop)
} }
// FlushLogs flushes logs immediately. // FlushLogs flushes logs immediately.

View File

@ -87,18 +87,13 @@ func logError(err error) {
glog.ErrorDepth(2, err) glog.ErrorDepth(2, err)
} }
// Forever loops forever running f every period. Catches any panics, and keeps going.
// Deprecated. Please use Until and pass NeverStop as the stopCh.
func Forever(f func(), period time.Duration) {
Until(f, period, nil)
}
// NeverStop may be passed to Until to make it never stop. // NeverStop may be passed to Until to make it never stop.
var NeverStop <-chan struct{} = make(chan struct{}) var NeverStop <-chan struct{} = make(chan struct{})
// Until loops until stop channel is closed, running f every period. // Until loops until stop channel is closed, running f every period.
// Catches any panics, and keeps going. f may not be invoked if // Catches any panics, and keeps going. f may not be invoked if
// stop channel is already closed. // stop channel is already closed. Pass NeverStop to Until if you
// don't want it stop.
func Until(f func(), period time.Duration, stopCh <-chan struct{}) { func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
for { for {
select { select {