From ec26751fd298fac9f45493c7a6e9150095470fa8 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 17 Jan 2018 18:12:58 +0000 Subject: [PATCH] use mutexes for the discovery manager instead of a loop as this was a stupid idea --- discovery/manager.go | 67 ++++++++++++++++++++------------------------ 1 file changed, 30 insertions(+), 37 deletions(-) diff --git a/discovery/manager.go b/discovery/manager.go index 5372a7870..ae10de5cb 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -16,6 +16,7 @@ package discovery import ( "context" "fmt" + "sync" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -61,10 +62,10 @@ type poolKey struct { func NewManager(logger log.Logger) *Manager { return &Manager{ logger: logger, - actionCh: make(chan func(context.Context)), syncCh: make(chan map[string][]*targetgroup.Group), targets: make(map[poolKey]map[string]*targetgroup.Group), discoverCancel: []context.CancelFunc{}, + ctx: context.Background(), } } @@ -72,7 +73,8 @@ func NewManager(logger log.Logger) *Manager { // Targets are grouped by the target set name. type Manager struct { logger log.Logger - actionCh chan func(context.Context) + mtx sync.RWMutex + ctx context.Context discoverCancel []context.CancelFunc // Some Discoverers(eg. k8s) send only the updates for a given target group // so we use map[tg.Source]*targetgroup.Group to know which group to update. @@ -83,10 +85,9 @@ type Manager struct { // Run starts the background processing func (m *Manager) Run(ctx context.Context) error { + m.ctx = ctx for { select { - case f := <-m.actionCh: - f(ctx) case <-ctx.Done(): m.cancelDiscoverers() return ctx.Err() @@ -101,18 +102,17 @@ func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group { // ApplyConfig removes all running discovery providers and starts new ones using the provided config. func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error { - err := make(chan error) - m.actionCh <- func(ctx context.Context) { - m.cancelDiscoverers() - for name, scfg := range cfg { - for provName, prov := range m.providersFromConfig(scfg) { - m.startProvider(ctx, poolKey{setName: name, provider: provName}, prov) - } + m.mtx.Lock() + defer m.mtx.Unlock() + + m.cancelDiscoverers() + for name, scfg := range cfg { + for provName, prov := range m.providersFromConfig(scfg) { + m.startProvider(m.ctx, poolKey{setName: name, provider: provName}, prov) } - close(err) } - return <-err + return nil } func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Discoverer) { @@ -151,39 +151,32 @@ func (m *Manager) cancelDiscoverers() { } func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) { - done := make(chan struct{}) + m.mtx.Lock() + defer m.mtx.Unlock() - m.actionCh <- func(ctx context.Context) { - for _, tg := range tgs { - if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics. - if _, ok := m.targets[poolKey]; !ok { - m.targets[poolKey] = make(map[string]*targetgroup.Group) - } - m.targets[poolKey][tg.Source] = tg + for _, tg := range tgs { + if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics. + if _, ok := m.targets[poolKey]; !ok { + m.targets[poolKey] = make(map[string]*targetgroup.Group) } + m.targets[poolKey][tg.Source] = tg } - close(done) - } - <-done } func (m *Manager) allGroups() map[string][]*targetgroup.Group { - tSets := make(chan map[string][]*targetgroup.Group) - - m.actionCh <- func(ctx context.Context) { - tSetsAll := map[string][]*targetgroup.Group{} - for pkey, tsets := range m.targets { - for _, tg := range tsets { - // Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager' - // to signal that it needs to stop all scrape loops for this target set. - tSetsAll[pkey.setName] = append(tSetsAll[pkey.setName], tg) - } + m.mtx.Lock() + defer m.mtx.Unlock() + + tSets := map[string][]*targetgroup.Group{} + for pkey, tsets := range m.targets { + for _, tg := range tsets { + // Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager' + // to signal that it needs to stop all scrape loops for this target set. + tSets[pkey.setName] = append(tSets[pkey.setName], tg) } - tSets <- tSetsAll } - return <-tSets - + return tSets } func (m *Manager) providersFromConfig(cfg sd_config.ServiceDiscoveryConfig) map[string]Discoverer {