simplify naming and API.

pull/3362/head
Krasi Georgiev 2017-11-26 22:18:05 +00:00
parent 9c61f0e8a0
commit c5cb0d2910
2 changed files with 50 additions and 54 deletions

View File

@ -236,7 +236,7 @@ func main() {
ctxRule = context.Background() ctxRule = context.Background()
notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier"))
discoveryManager = discovery.NewDiscoveryManager(ctxDiscovery, log.With(logger, "component", "discovery manager")) discoveryManager = discovery.NewManager(ctxDiscovery, log.With(logger, "component", "discovery manager"))
scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage) scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
ruleManager = rules.NewManager(&rules.ManagerOptions{Appendable: fanoutStorage, ruleManager = rules.NewManager(&rules.ManagerOptions{Appendable: fanoutStorage,

View File

@ -36,15 +36,15 @@ import (
"github.com/prometheus/prometheus/discovery/zookeeper" "github.com/prometheus/prometheus/discovery/zookeeper"
) )
// DiscoveryProvider provides information about target groups. It maintains a set // Discoverer provides information about target groups. It maintains a set
// of sources from which TargetGroups can originate. Whenever a discovery provider // of sources from which TargetGroups can originate. Whenever a discovery provider
// detects a potential change, it sends the TargetGroup through its provided channel. // detects a potential change, it sends the TargetGroup through its channel.
// //
// The DiscoveryProvider does not have to guarantee that an actual change happened. // Discoverer does not know if an actual change happened.
// It does guarantee that it sends the new TargetGroup whenever a change happens. // It does guarantee that it sends the new TargetGroup whenever a change happens.
// //
// DiscoveryProviders should initially send a full set of all discoverable TargetGroups. // Discoverers should initially send a full set of all discoverable TargetGroups.
type DiscoveryProvider interface { type Discoverer interface {
// Run hands a channel to the discovery provider(consul,dns etc) through which it can send // Run hands a channel to the discovery provider(consul,dns etc) through which it can send
// updated target groups. // updated target groups.
// Must returns if the context gets canceled. It should not close the update // Must returns if the context gets canceled. It should not close the update
@ -52,33 +52,35 @@ type DiscoveryProvider interface {
Run(ctx context.Context, up chan<- []*config.TargetGroup) Run(ctx context.Context, up chan<- []*config.TargetGroup)
} }
type targetSetProvider struct { // type pool struct {
cancel func() // cancel func()
tgroups []*config.TargetGroup // tgroups []*config.TargetGroup
} // }
// NewDiscoveryManager is the DiscoveryManager constructor // NewManager is the Discovery Manager constructor
func NewDiscoveryManager(ctx context.Context, logger log.Logger) *DiscoveryManager { func NewManager(ctx context.Context, logger log.Logger) *Manager {
return &DiscoveryManager{ return &Manager{
ctx: ctx, ctx: ctx,
logger: logger, logger: logger,
actionCh: make(chan func()), actionCh: make(chan func()),
syncCh: make(chan map[string][]*config.TargetGroup), syncCh: make(chan map[string][]*config.TargetGroup),
targetSetProviders: make(map[string]map[string]*targetSetProvider), endpoints: make(map[string]map[string][]*config.TargetGroup),
discoverCancel: []context.CancelFunc{},
} }
} }
// DiscoveryManager maintains a set of discovery providers and sends each update to a channel used by other packages. // Manager maintains a set of discovery providers and sends each update to a channel used by other packages.
type DiscoveryManager struct { type Manager struct {
ctx context.Context ctx context.Context
logger log.Logger logger log.Logger
syncCh chan map[string][]*config.TargetGroup // map[targetSetName] syncCh chan map[string][]*config.TargetGroup // map[targetSetName]
actionCh chan func() actionCh chan func()
targetSetProviders map[string]map[string]*targetSetProvider // map[targetSetName]map[providerName] discoverCancel []context.CancelFunc
endpoints map[string]map[string][]*config.TargetGroup // map[targetSetName]map[providerName]
} }
// Run starts the background processing // Run starts the background processing
func (m *DiscoveryManager) Run() error { func (m *Manager) Run() error {
for { for {
select { select {
case f := <-m.actionCh: case f := <-m.actionCh:
@ -90,27 +92,27 @@ func (m *DiscoveryManager) Run() error {
} }
// SyncCh returns a read only channel used by all DiscoveryProviders targetSet updates // SyncCh returns a read only channel used by all DiscoveryProviders targetSet updates
func (m *DiscoveryManager) SyncCh() <-chan map[string][]*config.TargetGroup { func (m *Manager) SyncCh() <-chan map[string][]*config.TargetGroup {
return m.syncCh return m.syncCh
} }
// ApplyConfig removes all running discovery providers and starts new ones using the provided config. // ApplyConfig removes all running discovery providers and starts new ones using the provided config.
func (m *DiscoveryManager) ApplyConfig(cfg *config.Config) error { func (m *Manager) ApplyConfig(cfg *config.Config) error {
err := make(chan error) err := make(chan error)
m.actionCh <- func() { m.actionCh <- func() {
m.cancelDiscoveryProviders() m.cancelDiscoverers()
for _, scfg := range cfg.ScrapeConfigs { for _, scfg := range cfg.ScrapeConfigs {
for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) { for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) {
ctx, cancel := context.WithCancel(m.ctx) ctx, cancel := context.WithCancel(m.ctx)
updates := make(chan []*config.TargetGroup) updates := make(chan []*config.TargetGroup)
m.createProvider(cancel, scfg.JobName, provName) m.discoverCancel = append(m.discoverCancel, cancel)
go prov.Run(ctx, updates) go prov.Run(ctx, updates)
go func(provName string) { go func(provName string) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
// First set of all targets the provider knows. // First set of all endpoints the provider knows.
case tgs, ok := <-updates: case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel // Handle the case that a target provider exits and closes the channel
// before the context is done. // before the context is done.
@ -146,35 +148,29 @@ func (m *DiscoveryManager) ApplyConfig(cfg *config.Config) error {
return <-err return <-err
} }
func (m *DiscoveryManager) cancelDiscoveryProviders() { func (m *Manager) cancelDiscoverers() {
for targetSetName, targetSetProviders := range m.targetSetProviders { for _, c := range m.discoverCancel {
for _, discoveryProvider := range targetSetProviders { c()
discoveryProvider.cancel()
}
delete(m.targetSetProviders, targetSetName)
} }
m.endpoints = make(map[string]map[string][]*config.TargetGroup)
m.discoverCancel = []context.CancelFunc{}
} }
func (m *DiscoveryManager) createProvider(cancel context.CancelFunc, tsName, provName string) { // mergeGroups adds a new target group for a given discovery provider and returns all target groups for a given target set
if m.targetSetProviders[tsName] == nil { func (m *Manager) mergeGroups(tsName, provName string, tg []*config.TargetGroup) map[string][]*config.TargetGroup {
m.targetSetProviders[tsName] = make(map[string]*targetSetProvider) if m.endpoints[tsName] == nil {
m.endpoints[tsName] = make(map[string][]*config.TargetGroup)
} }
m.targetSetProviders[tsName][provName] = &targetSetProvider{ m.endpoints[tsName][provName] = []*config.TargetGroup{}
cancel: cancel,
tgroups: []*config.TargetGroup{},
}
}
// mergeGroups adds a new target group for a named discovery provider and returns all target groups for a given target set
func (m *DiscoveryManager) mergeGroups(tsName, provName string, tg []*config.TargetGroup) map[string][]*config.TargetGroup {
tset := make(chan map[string][]*config.TargetGroup) tset := make(chan map[string][]*config.TargetGroup)
m.actionCh <- func() { m.actionCh <- func() {
if tg != nil { if tg != nil {
m.targetSetProviders[tsName][provName].tgroups = tg m.endpoints[tsName][provName] = tg
} }
var tgAll []*config.TargetGroup var tgAll []*config.TargetGroup
for _, prov := range m.targetSetProviders[tsName] { for _, prov := range m.endpoints[tsName] {
for _, tg := range prov.tgroups { for _, tg := range prov {
tgAll = append(tgAll, tg) tgAll = append(tgAll, tg)
} }
} }
@ -185,10 +181,10 @@ func (m *DiscoveryManager) mergeGroups(tsName, provName string, tg []*config.Tar
return <-tset return <-tset
} }
func (m *DiscoveryManager) providersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]DiscoveryProvider { func (m *Manager) providersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]Discoverer {
providers := map[string]DiscoveryProvider{} providers := map[string]Discoverer{}
app := func(mech string, i int, tp DiscoveryProvider) { app := func(mech string, i int, tp Discoverer) {
providers[fmt.Sprintf("%s/%d", mech, i)] = tp providers[fmt.Sprintf("%s/%d", mech, i)] = tp
} }
@ -280,7 +276,7 @@ func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
return &StaticProvider{groups} return &StaticProvider{groups}
} }
// Run implements the DiscoveryProvider interface. // Run implements the Worker interface.
func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// We still have to consider that the consumer exits right away in which case // We still have to consider that the consumer exits right away in which case
// the context will be canceled. // the context will be canceled.