From af58c1b452159df88e014f9e26c59823d2e2e682 Mon Sep 17 00:00:00 2001
From: Krasi Georgiev
Date: Wed, 17 Jan 2018 11:46:17 +0000
Subject: [PATCH 1/6] replace state machine with mutex

---
 retrieval/manager.go | 67 ++++++++++++++++++++------------------------
 1 file changed, 31 insertions(+), 36 deletions(-)

diff --git a/retrieval/manager.go b/retrieval/manager.go
index 4f2f77c82..86522fd10 100644
--- a/retrieval/manager.go
+++ b/retrieval/manager.go
@@ -15,6 +15,7 @@ package retrieval
 
 import (
 	"fmt"
+	"sync"
 
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
@@ -35,7 +36,6 @@ func NewScrapeManager(logger log.Logger, app Appendable) *ScrapeManager {
 	return &ScrapeManager{
 		append:        app,
 		logger:        logger,
-		actionCh:      make(chan func()),
 		scrapeConfigs: make(map[string]*config.ScrapeConfig),
 		scrapePools:   make(map[string]*scrapePool),
 		graceShut:     make(chan struct{}),
@@ -49,7 +49,7 @@ type ScrapeManager struct {
 	append        Appendable
 	scrapeConfigs map[string]*config.ScrapeConfig
 	scrapePools   map[string]*scrapePool
-	actionCh      chan func()
+	mtx           sync.RWMutex
 	graceShut     chan struct{}
 }
 
@@ -59,8 +59,6 @@ func (m *ScrapeManager) Run(tsets <-chan map[string][]*targetgroup.Group) error
 
 	for {
 		select {
-		case f := <-m.actionCh:
-			f()
 		case ts := <-tsets:
 			m.reload(ts)
 		case <-m.graceShut:
@@ -79,52 +77,49 @@ func (m *ScrapeManager) Stop() {
 
 // ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
 func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error {
-	done := make(chan struct{})
-	m.actionCh <- func() {
-		c := make(map[string]*config.ScrapeConfig)
-		for _, scfg := range cfg.ScrapeConfigs {
-			c[scfg.JobName] = scfg
-		}
-		m.scrapeConfigs = c
-		close(done)
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	c := make(map[string]*config.ScrapeConfig)
+	for _, scfg := range cfg.ScrapeConfigs {
+		c[scfg.JobName] = scfg
 	}
-	<-done
+	m.scrapeConfigs = c
 	return nil
 }
 
 // TargetMap returns map of active and dropped targets and their corresponding scrape config job name.
 func (m *ScrapeManager) TargetMap() map[string][]*Target {
-	targetsMap := make(chan map[string][]*Target)
-	m.actionCh <- func() {
-		targets := make(map[string][]*Target)
-		for jobName, sp := range m.scrapePools {
-			sp.mtx.RLock()
-			for _, t := range sp.targets {
-				targets[jobName] = append(targets[jobName], t)
-			}
-			targets[jobName] = append(targets[jobName], sp.droppedTargets...)
-			sp.mtx.RUnlock()
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	targets := make(map[string][]*Target)
+	for jobName, sp := range m.scrapePools {
+		sp.mtx.RLock()
+		for _, t := range sp.targets {
+			targets[jobName] = append(targets[jobName], t)
 		}
-		targetsMap <- targets
+		targets[jobName] = append(targets[jobName], sp.droppedTargets...)
+		sp.mtx.RUnlock()
 	}
-	return <-targetsMap
+
+	return targets
 }
 
 // Targets returns the targets currently being scraped.
 func (m *ScrapeManager) Targets() []*Target {
-	targets := make(chan []*Target)
-	m.actionCh <- func() {
-		var t []*Target
-		for _, p := range m.scrapePools {
-			p.mtx.RLock()
-			for _, tt := range p.targets {
-				t = append(t, tt)
-			}
-			p.mtx.RUnlock()
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	var targets []*Target
+	for _, p := range m.scrapePools {
+		p.mtx.RLock()
+		for _, tt := range p.targets {
+			targets = append(targets, tt)
 		}
-		targets <- t
+		p.mtx.RUnlock()
 	}
-	return <-targets
+
+	return targets
 }
 
 func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) {

From 97f0461e291f2670d5f4adf6e10b0ab1a4af8aff Mon Sep 17 00:00:00 2001
From: Krasi Georgiev
Date: Wed, 17 Jan 2018 12:02:13 +0000
Subject: [PATCH 2/6] refactor the config reloading execution

---
 cmd/prometheus/main.go | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 9aa9d6d03..d4fc4ffb9 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -287,6 +287,9 @@ func main() {
 		remoteStorage.ApplyConfig,
 		webHandler.ApplyConfig,
 		notifier.ApplyConfig,
+		// The Scrape manager needs to reload before the Discovery manager as
+		// it needs to read the latest config when it receives the new targets list.
+		scrapeManager.ApplyConfig,
 		func(cfg *config.Config) error {
 			c := make(map[string]sd_config.ServiceDiscoveryConfig)
 			for _, v := range cfg.ScrapeConfigs {
@@ -306,7 +309,6 @@ func main() {
 			}
 			return discoveryManagerNotify.ApplyConfig(c)
 		},
-		scrapeManager.ApplyConfig,
 		func(cfg *config.Config) error {
 			// Get all rule files matching the configuration oaths.
 			var files []string
@@ -384,6 +386,14 @@ func main() {
 	{
 		g.Add(
 			func() error {
+				select {
+				// When the scrape manager receives a new targets list
+				// it needs to read a valid config for each target and
+				// it depends on the config being in sync with the discovery manager.
+				case <-reloadReady:
+					break
+				}
+
 				err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
 				level.Info(logger).Log("msg", "Scrape manager stopped")
 				return err

From 0eafaf32d3a9c6e47407d61724091389ec636254 Mon Sep 17 00:00:00 2001
From: Krasi Georgiev
Date: Wed, 17 Jan 2018 13:06:56 +0000
Subject: [PATCH 3/6] set the correct config reloading execution for scraper and notifier

---
 cmd/prometheus/main.go | 39 +++++++++++++++++++++++++++++++--------
 1 file changed, 31 insertions(+), 8 deletions(-)

diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index d4fc4ffb9..520124eff 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -286,9 +286,9 @@ func main() {
 	reloaders := []func(cfg *config.Config) error{
 		remoteStorage.ApplyConfig,
 		webHandler.ApplyConfig,
+		// The Scrape and notifier managers need to reload before the Discovery manager as
+		// they need to read the most updated config when receiving the new targets list.
 		notifier.ApplyConfig,
-		// The Scrape manager needs to reload before the Discovery manager as
-		// it needs to read the latest config when it receives the new targets list.
 		scrapeManager.ApplyConfig,
 		func(cfg *config.Config) error {
 			c := make(map[string]sd_config.ServiceDiscoveryConfig)
@@ -343,6 +343,15 @@ func main() {
 				select {
 				case <-term:
 					level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...")
+					// Release the reloadReady channel so that waiting blocks can exit normally.
+					select {
+					case _, ok := <-reloadReady:
+						if ok {
+							close(reloadReady)
+						}
+					default:
+					}
+
 				case <-webHandler.Quit():
 					level.Warn(logger).Log("msg", "Received termination request via web service, exiting gracefully...")
 				case <-cancel:
@@ -388,8 +397,9 @@ func main() {
 			func() error {
 				select {
 				// When the scrape manager receives a new targets list
-				// it needs to read a valid config for each target and
-				// it depends on the config being in sync with the discovery manager.
+				// it needs to read a valid config for each job and
+				// it depends on the config being in sync with the discovery manager
+				// so we wait until the config is fully loaded.
 				case <-reloadReady:
 					break
 				}
@@ -417,9 +427,6 @@ func main() {
 				select {
 				case <-reloadReady:
 					break
-				// In case a shutdown is initiated before the reloadReady is released.
-				case <-cancel:
-					return nil
 				}
 
 				for {
@@ -462,7 +469,15 @@ func main() {
 					return fmt.Errorf("Error loading config %s", err)
 				}
-				close(reloadReady)
+				// Check that it is not already closed by the SIGTERM handling.
+				select {
+				case _, ok := <-reloadReady:
+					if ok {
+						close(reloadReady)
+					}
+				default:
+				}
+
 				webHandler.Ready()
 				level.Info(logger).Log("msg", "Server is ready to receive requests.")
 				<-cancel
@@ -539,6 +554,14 @@ func main() {
 		// so keep this interrupt after the ruleManager.Stop().
 		g.Add(
 			func() error {
+				select {
+				// When the notifier manager receives a new targets list
+				// it needs to read a valid config for each job and
+				// it depends on the config being in sync with the discovery manager
+				// so we wait until the config is fully loaded.
+				case <-reloadReady:
+					break
+				}
 				notifier.Run(discoveryManagerNotify.SyncCh())
 				return nil
 			},

From ec26751fd298fac9f45493c7a6e9150095470fa8 Mon Sep 17 00:00:00 2001
From: Krasi Georgiev
Date: Wed, 17 Jan 2018 18:12:58 +0000
Subject: [PATCH 4/6] use mutexes for the discovery manager instead of a loop as this was a stupid idea

---
 discovery/manager.go | 67 ++++++++++++++++++++------------------------
 1 file changed, 30 insertions(+), 37 deletions(-)

diff --git a/discovery/manager.go b/discovery/manager.go
index 5372a7870..ae10de5cb 100644
--- a/discovery/manager.go
+++ b/discovery/manager.go
@@ -16,6 +16,7 @@ package discovery
 import (
 	"context"
 	"fmt"
+	"sync"
 
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
@@ -61,10 +62,10 @@ type poolKey struct {
 
 func NewManager(logger log.Logger) *Manager {
 	return &Manager{
 		logger:         logger,
-		actionCh:       make(chan func(context.Context)),
 		syncCh:         make(chan map[string][]*targetgroup.Group),
 		targets:        make(map[poolKey]map[string]*targetgroup.Group),
 		discoverCancel: []context.CancelFunc{},
+		ctx:            context.Background(),
 	}
 }
@@ -72,7 +73,8 @@ func NewManager(logger log.Logger) *Manager {
 // Targets are grouped by the target set name.
 type Manager struct {
 	logger         log.Logger
-	actionCh       chan func(context.Context)
+	mtx            sync.RWMutex
+	ctx            context.Context
 	discoverCancel []context.CancelFunc
 
 	// Some Discoverers(eg. k8s) send only the updates for a given target group
 	// so we use map[tg.Source]*targetgroup.Group to know which group to update.
@@ -83,10 +85,9 @@ type Manager struct {
 
 // Run starts the background processing
 func (m *Manager) Run(ctx context.Context) error {
+	m.ctx = ctx
 	for {
 		select {
-		case f := <-m.actionCh:
-			f(ctx)
 		case <-ctx.Done():
 			m.cancelDiscoverers()
 			return ctx.Err()
@@ -101,18 +102,17 @@ func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
 
 // ApplyConfig removes all running discovery providers and starts new ones using the provided config.
 func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error {
-	err := make(chan error)
-	m.actionCh <- func(ctx context.Context) {
-		m.cancelDiscoverers()
-		for name, scfg := range cfg {
-			for provName, prov := range m.providersFromConfig(scfg) {
-				m.startProvider(ctx, poolKey{setName: name, provider: provName}, prov)
-			}
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	m.cancelDiscoverers()
+	for name, scfg := range cfg {
+		for provName, prov := range m.providersFromConfig(scfg) {
+			m.startProvider(m.ctx, poolKey{setName: name, provider: provName}, prov)
 		}
-		close(err)
 	}
-	return <-err
+	return nil
 }
 
 func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Discoverer) {
@@ -151,39 +151,32 @@ func (m *Manager) cancelDiscoverers() {
 }
 
 func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) {
-	done := make(chan struct{})
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
 
-	m.actionCh <- func(ctx context.Context) {
-		for _, tg := range tgs {
-			if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics.
-				if _, ok := m.targets[poolKey]; !ok {
-					m.targets[poolKey] = make(map[string]*targetgroup.Group)
-				}
-				m.targets[poolKey][tg.Source] = tg
+	for _, tg := range tgs {
+		if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics.
+			if _, ok := m.targets[poolKey]; !ok {
+				m.targets[poolKey] = make(map[string]*targetgroup.Group)
 			}
+			m.targets[poolKey][tg.Source] = tg
 		}
-		close(done)
-	}
-	<-done
 }
 
 func (m *Manager) allGroups() map[string][]*targetgroup.Group {
-	tSets := make(chan map[string][]*targetgroup.Group)
-
-	m.actionCh <- func(ctx context.Context) {
-		tSetsAll := map[string][]*targetgroup.Group{}
-		for pkey, tsets := range m.targets {
-			for _, tg := range tsets {
-				// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
-				// to signal that it needs to stop all scrape loops for this target set.
-				tSetsAll[pkey.setName] = append(tSetsAll[pkey.setName], tg)
-			}
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	tSets := map[string][]*targetgroup.Group{}
+	for pkey, tsets := range m.targets {
+		for _, tg := range tsets {
+			// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
+			// to signal that it needs to stop all scrape loops for this target set.
+			tSets[pkey.setName] = append(tSets[pkey.setName], tg)
 		}
-		tSets <- tSetsAll
 	}
-	return <-tSets
-
+	return tSets
 }
 
 func (m *Manager) providersFromConfig(cfg sd_config.ServiceDiscoveryConfig) map[string]Discoverer {

From 719c579f7b917b384c3d629752dea026513317dc Mon Sep 17 00:00:00 2001
From: Krasi Georgiev
Date: Wed, 17 Jan 2018 18:14:24 +0000
Subject: [PATCH 5/6] refactor main execution reloadReady handling, update some comments

---
 cmd/prometheus/main.go | 63 +++++++++++++++++++++++-------------------
 notifier/notifier.go   |  2 +-
 2 files changed, 35 insertions(+), 30 deletions(-)

diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 520124eff..a5885e376 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -28,6 +28,7 @@ import (
 	"path/filepath"
 	"runtime"
 	"strings"
+	"sync"
 	"syscall"
 	"time"
 
@@ -330,8 +331,22 @@ func main() {
 	// Start all components while we wait for TSDB to open but only load
 	// initial config and mark ourselves as ready after it completed.
 	dbOpen := make(chan struct{})
-	// Wait until the server is ready to handle reloading
-	reloadReady := make(chan struct{})
+
+	// sync.Once is used to make sure we can close the channel at different execution stages (SIGTERM or when the config is loaded).
+	type closeOnce struct {
+		C     chan struct{}
+		once  sync.Once
+		Close func()
+	}
+	// Wait until the server is ready to handle reloading.
+	reloadReady := &closeOnce{
+		C: make(chan struct{}),
+	}
+	reloadReady.Close = func() {
+		reloadReady.once.Do(func() {
+			close(reloadReady.C)
+		})
+	}
 
 	var g group.Group
 	{
@@ -340,21 +355,16 @@ func main() {
 		cancel := make(chan struct{})
 		g.Add(
 			func() error {
+				// Don't forget to release the reloadReady channel so that waiting blocks can exit normally.
 				select {
 				case <-term:
 					level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...")
-					// Release the reloadReady channel so that waiting blocks can exit normally.
-					select {
-					case _, ok := <-reloadReady:
-						if ok {
-							close(reloadReady)
-						}
-					default:
-					}
+					reloadReady.Close()
 
 				case <-webHandler.Quit():
 					level.Warn(logger).Log("msg", "Received termination request via web service, exiting gracefully...")
 				case <-cancel:
+					reloadReady.Close()
 					break
 				}
 				return nil
@@ -395,12 +405,12 @@ func main() {
 	{
 		g.Add(
 			func() error {
-				select {
 				// When the scrape manager receives a new targets list
-				// it needs to read a valid config for each job and
-				// it depends on the config being in sync with the discovery manager
-				// so we wait until the config is fully loaded.
-				case <-reloadReady:
+				// it needs to read a valid config for each job.
+				// It depends on the config being in sync with the discovery manager so
+				// we wait until the config is fully loaded.
+				select {
+				case <-reloadReady.C:
 					break
 				}
 
@@ -425,7 +435,7 @@ func main() {
 		g.Add(
 			func() error {
 				select {
-				case <-reloadReady:
+				case <-reloadReady.C:
 					break
 				}
 
@@ -462,6 +472,7 @@ func main() {
 				break
 			// In case a shutdown is initiated before the dbOpen is released
 			case <-cancel:
+				reloadReady.Close()
 				return nil
 			}
 
@@ -469,17 +480,10 @@ func main() {
 				if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
 					return fmt.Errorf("Error loading config %s", err)
 				}
-				// Check that it is not already closed by the SIGTERM handling.
-				select {
-				case _, ok := <-reloadReady:
-					if ok {
-						close(reloadReady)
-					}
-				default:
-				}
+				reloadReady.Close()
 
 				webHandler.Ready()
-				level.Info(logger).Log("msg", "Server is ready to receive requests.")
+				level.Info(logger).Log("msg", "Server is ready to receive web requests.")
 				<-cancel
 				return nil
 			},
@@ -554,15 +558,16 @@ func main() {
 		// so keep this interrupt after the ruleManager.Stop().
 		g.Add(
 			func() error {
-				select {
 				// When the notifier manager receives a new targets list
-				// it needs to read a valid config for each job and
-				// it depends on the config being in sync with the discovery manager
+				// it needs to read a valid config for each job.
+				// It depends on the config being in sync with the discovery manager
 				// so we wait until the config is fully loaded.
-				case <-reloadReady:
+				select {
+				case <-reloadReady.C:
 					break
 				}
 				notifier.Run(discoveryManagerNotify.SyncCh())
+				level.Info(logger).Log("msg", "Notifier manager stopped")
 				return nil
 			},
diff --git a/notifier/notifier.go b/notifier/notifier.go
index 411104383..9a857a287 100644
--- a/notifier/notifier.go
+++ b/notifier/notifier.go
@@ -490,7 +490,7 @@ func (n *Notifier) sendOne(ctx context.Context, c *http.Client, url string, b []
 
 // Stop shuts down the notification handler.
 func (n *Notifier) Stop() {
-	level.Info(n.logger).Log("msg", "Stopping notification handler...")
+	level.Info(n.logger).Log("msg", "Stopping notification manager...")
 	n.cancel()
 }

From 910c22418c9a94e6ccfbc46dd760f5f392a25775 Mon Sep 17 00:00:00 2001
From: Krasi Georgiev
Date: Thu, 18 Jan 2018 11:49:42 +0000
Subject: [PATCH 6/6] move cleanup and reload in ApplyConfig

---
 retrieval/manager.go | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/retrieval/manager.go b/retrieval/manager.go
index 86522fd10..724e7788a 100644
--- a/retrieval/manager.go
+++ b/retrieval/manager.go
@@ -15,6 +15,7 @@ package retrieval
 
 import (
 	"fmt"
+	"reflect"
 	"sync"
 
 	"github.com/go-kit/kit/log"
@@ -84,6 +85,17 @@ func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error {
 		c[scfg.JobName] = scfg
 	}
 	m.scrapeConfigs = c
+
+	// Cleanup and reload pool if config has changed.
+	for name, sp := range m.scrapePools {
+		if cfg, ok := m.scrapeConfigs[name]; !ok {
+			sp.stop()
+			delete(m.scrapePools, name)
+		} else if !reflect.DeepEqual(sp.config, cfg) {
+			sp.reload(cfg)
+		}
+	}
+
 	return nil
 }
 
@@ -141,12 +153,4 @@ func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) {
 			existing.Sync(tgroup)
 		}
 	}
-
-	// Cleanup - check the config and cancel the scrape loops if it don't exist in the scrape config.
-	for name, sp := range m.scrapePools {
-		if _, ok := m.scrapeConfigs[name]; !ok {
-			sp.stop()
-			delete(m.scrapePools, name)
-		}
-	}
 }
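
The series converges on two patterns: shared manager state guarded by a sync.RWMutex instead of a single action-processing loop, and a "config is loaded" channel (reloadReady) that may be released from more than one place (the SIGTERM handler, the shutdown path, or the initial config load). PATCH 5 wraps that channel in the closeOnce helper so a second release is a no-op rather than a "close of closed channel" panic. The sketch below is illustrative only and not part of the patches: the helper is written with a Close method instead of the Close func field assigned inside main(), and the worker goroutines simply stand in for the scrape and notifier manager blocks that wait on reloadReady.C.

package main

import (
	"fmt"
	"sync"
)

// closeOnce mirrors the helper from PATCH 5: a channel that several code
// paths may try to close, guarded by sync.Once so only the first call
// actually closes it.
type closeOnce struct {
	C    chan struct{}
	once sync.Once
}

// Close closes C at most once; later calls are no-ops.
func (c *closeOnce) Close() {
	c.once.Do(func() { close(c.C) })
}

func main() {
	reloadReady := &closeOnce{C: make(chan struct{})}

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-reloadReady.C // block until the channel is released
			fmt.Println("worker", id, "released")
		}(i)
	}

	reloadReady.Close() // e.g. the initial config load succeeded
	reloadReady.Close() // e.g. SIGTERM arrives as well; harmless, no panic
	wg.Wait()
}

Calling close() twice on a bare channel panics, which is why the interim version in PATCH 3 guarded the close with a non-blocking receive; the sync.Once wrapper makes that guard unnecessary.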