refactor main execution reloadReady handling, update some comments

pull/3698/head
Krasi Georgiev 2018-01-17 18:14:24 +00:00
parent ec26751fd2
commit 719c579f7b
2 changed files with 35 additions and 30 deletions

View File

@ -28,6 +28,7 @@ import (
"path/filepath" "path/filepath"
"runtime" "runtime"
"strings" "strings"
"sync"
"syscall" "syscall"
"time" "time"
@ -330,8 +331,22 @@ func main() {
// Start all components while we wait for TSDB to open but only load // Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completed. // initial config and mark ourselves as ready after it completed.
dbOpen := make(chan struct{}) dbOpen := make(chan struct{})
// Wait until the server is ready to handle reloading
reloadReady := make(chan struct{}) // sync.Once is used to make sure we can close the channel at different execution stages(SIGTERM or when the config is loaded).
type closeOnce struct {
C chan struct{}
once sync.Once
Close func()
}
// Wait until the server is ready to handle reloading.
reloadReady := &closeOnce{
C: make(chan struct{}),
}
reloadReady.Close = func() {
reloadReady.once.Do(func() {
close(reloadReady.C)
})
}
var g group.Group var g group.Group
{ {
@ -340,21 +355,16 @@ func main() {
cancel := make(chan struct{}) cancel := make(chan struct{})
g.Add( g.Add(
func() error { func() error {
// Don't forget to release the reloadReady channel so that waiting blocks can exit normally.
select { select {
case <-term: case <-term:
level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...") level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...")
// Release the reloadReady channel so that waiting blocks can exit normally. reloadReady.Close()
select {
case _, ok := <-reloadReady:
if ok {
close(reloadReady)
}
default:
}
case <-webHandler.Quit(): case <-webHandler.Quit():
level.Warn(logger).Log("msg", "Received termination request via web service, exiting gracefully...") level.Warn(logger).Log("msg", "Received termination request via web service, exiting gracefully...")
case <-cancel: case <-cancel:
reloadReady.Close()
break break
} }
return nil return nil
@ -395,12 +405,12 @@ func main() {
{ {
g.Add( g.Add(
func() error { func() error {
select {
// When the scrape manager receives a new targets list // When the scrape manager receives a new targets list
// it needs to read a valid config for each job and // it needs to read a valid config for each job.
// it depends on the config being in sync with the discovery manager // It depends on the config being in sync with the discovery manager so
// so we wait until the config is fully loaded. // we wait until the config is fully loaded.
case <-reloadReady: select {
case <-reloadReady.C:
break break
} }
@ -425,7 +435,7 @@ func main() {
g.Add( g.Add(
func() error { func() error {
select { select {
case <-reloadReady: case <-reloadReady.C:
break break
} }
@ -462,6 +472,7 @@ func main() {
break break
// In case a shutdown is initiated before the dbOpen is released // In case a shutdown is initiated before the dbOpen is released
case <-cancel: case <-cancel:
reloadReady.Close()
return nil return nil
} }
@ -469,17 +480,10 @@ func main() {
return fmt.Errorf("Error loading config %s", err) return fmt.Errorf("Error loading config %s", err)
} }
// Check that it is not already closed by the SIGTERM handling. reloadReady.Close()
select {
case _, ok := <-reloadReady:
if ok {
close(reloadReady)
}
default:
}
webHandler.Ready() webHandler.Ready()
level.Info(logger).Log("msg", "Server is ready to receive requests.") level.Info(logger).Log("msg", "Server is ready to receive web requests.")
<-cancel <-cancel
return nil return nil
}, },
@ -554,15 +558,16 @@ func main() {
// so keep this interrupt after the ruleManager.Stop(). // so keep this interrupt after the ruleManager.Stop().
g.Add( g.Add(
func() error { func() error {
select {
// When the notifier manager receives a new targets list // When the notifier manager receives a new targets list
// it needs to read a valid config for each job and // it needs to read a valid config for each job.
// it depends on the config being in sync with the discovery manager // It depends on the config being in sync with the discovery manager
// so we wait until the config is fully loaded. // so we wait until the config is fully loaded.
case <-reloadReady: select {
case <-reloadReady.C:
break break
} }
notifier.Run(discoveryManagerNotify.SyncCh()) notifier.Run(discoveryManagerNotify.SyncCh())
level.Info(logger).Log("msg", "Notifier manager stopped")
return nil return nil
}, },
func(err error) { func(err error) {

View File

@ -490,7 +490,7 @@ func (n *Notifier) sendOne(ctx context.Context, c *http.Client, url string, b []
// Stop shuts down the notification handler. // Stop shuts down the notification handler.
func (n *Notifier) Stop() { func (n *Notifier) Stop() {
level.Info(n.logger).Log("msg", "Stopping notification handler...") level.Info(n.logger).Log("msg", "Stopping notification manager...")
n.cancel() n.cancel()
} }