use mutexes for the discovery manager instead of a loop as this was a stupid idea

pull/3698/head
Krasi Georgiev 7 years ago
parent 0eafaf32d3
commit ec26751fd2

@ -16,6 +16,7 @@ package discovery
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level" "github.com/go-kit/kit/log/level"
@ -61,10 +62,10 @@ type poolKey struct {
func NewManager(logger log.Logger) *Manager { func NewManager(logger log.Logger) *Manager {
return &Manager{ return &Manager{
logger: logger, logger: logger,
actionCh: make(chan func(context.Context)),
syncCh: make(chan map[string][]*targetgroup.Group), syncCh: make(chan map[string][]*targetgroup.Group),
targets: make(map[poolKey]map[string]*targetgroup.Group), targets: make(map[poolKey]map[string]*targetgroup.Group),
discoverCancel: []context.CancelFunc{}, discoverCancel: []context.CancelFunc{},
ctx: context.Background(),
} }
} }
@ -72,7 +73,8 @@ func NewManager(logger log.Logger) *Manager {
// Targets are grouped by the target set name. // Targets are grouped by the target set name.
type Manager struct { type Manager struct {
logger log.Logger logger log.Logger
actionCh chan func(context.Context) mtx sync.RWMutex
ctx context.Context
discoverCancel []context.CancelFunc discoverCancel []context.CancelFunc
// Some Discoverers(eg. k8s) send only the updates for a given target group // Some Discoverers(eg. k8s) send only the updates for a given target group
// so we use map[tg.Source]*targetgroup.Group to know which group to update. // so we use map[tg.Source]*targetgroup.Group to know which group to update.
@ -83,10 +85,9 @@ type Manager struct {
// Run starts the background processing // Run starts the background processing
func (m *Manager) Run(ctx context.Context) error { func (m *Manager) Run(ctx context.Context) error {
m.ctx = ctx
for { for {
select { select {
case f := <-m.actionCh:
f(ctx)
case <-ctx.Done(): case <-ctx.Done():
m.cancelDiscoverers() m.cancelDiscoverers()
return ctx.Err() return ctx.Err()
@ -101,18 +102,17 @@ func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
// ApplyConfig removes all running discovery providers and starts new ones using the provided config. // ApplyConfig removes all running discovery providers and starts new ones using the provided config.
func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error { func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error {
err := make(chan error) m.mtx.Lock()
m.actionCh <- func(ctx context.Context) { defer m.mtx.Unlock()
m.cancelDiscoverers()
for name, scfg := range cfg { m.cancelDiscoverers()
for provName, prov := range m.providersFromConfig(scfg) { for name, scfg := range cfg {
m.startProvider(ctx, poolKey{setName: name, provider: provName}, prov) for provName, prov := range m.providersFromConfig(scfg) {
} m.startProvider(m.ctx, poolKey{setName: name, provider: provName}, prov)
} }
close(err)
} }
return <-err return nil
} }
func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Discoverer) { func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Discoverer) {
@ -151,39 +151,32 @@ func (m *Manager) cancelDiscoverers() {
} }
func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) { func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) {
done := make(chan struct{}) m.mtx.Lock()
defer m.mtx.Unlock()
m.actionCh <- func(ctx context.Context) { for _, tg := range tgs {
for _, tg := range tgs { if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics.
if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics. if _, ok := m.targets[poolKey]; !ok {
if _, ok := m.targets[poolKey]; !ok { m.targets[poolKey] = make(map[string]*targetgroup.Group)
m.targets[poolKey] = make(map[string]*targetgroup.Group)
}
m.targets[poolKey][tg.Source] = tg
} }
m.targets[poolKey][tg.Source] = tg
} }
close(done)
} }
<-done
} }
func (m *Manager) allGroups() map[string][]*targetgroup.Group { func (m *Manager) allGroups() map[string][]*targetgroup.Group {
tSets := make(chan map[string][]*targetgroup.Group) m.mtx.Lock()
defer m.mtx.Unlock()
m.actionCh <- func(ctx context.Context) {
tSetsAll := map[string][]*targetgroup.Group{} tSets := map[string][]*targetgroup.Group{}
for pkey, tsets := range m.targets { for pkey, tsets := range m.targets {
for _, tg := range tsets { for _, tg := range tsets {
// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager' // Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
// to signal that it needs to stop all scrape loops for this target set. // to signal that it needs to stop all scrape loops for this target set.
tSetsAll[pkey.setName] = append(tSetsAll[pkey.setName], tg) tSets[pkey.setName] = append(tSets[pkey.setName], tg)
}
} }
tSets <- tSetsAll
} }
return <-tSets return tSets
} }
func (m *Manager) providersFromConfig(cfg sd_config.ServiceDiscoveryConfig) map[string]Discoverer { func (m *Manager) providersFromConfig(cfg sd_config.ServiceDiscoveryConfig) map[string]Discoverer {

Loading…
Cancel
Save