Browse Source

brake the start provider func so that can run unit tests against it.

pull/3362/head
Krasi Georgiev 7 years ago
parent
commit
f5c2c5ff8f
  1. 78
      discovery/manager.go

78
discovery/manager.go

@ -103,43 +103,7 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {
m.cancelDiscoverers() m.cancelDiscoverers()
for _, scfg := range cfg.ScrapeConfigs { for _, scfg := range cfg.ScrapeConfigs {
for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) { for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) {
ctx, cancel := context.WithCancel(m.ctx) m.startProvider(scfg.JobName, provName, prov)
updates := make(chan []*config.TargetGroup)
m.discoverCancel = append(m.discoverCancel, cancel)
go prov.Run(ctx, updates)
go func(provName string) {
select {
case <-ctx.Done():
// First set of all endpoints the provider knows.
case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
break
}
m.syncCh <- m.mergeGroups(scfg.JobName, provName, tgs)
case <-time.After(5 * time.Second):
// Initial set didn't arrive. Act as if it was empty
// and wait for updates later on.
}
// Start listening for further updates.
for {
select {
case <-ctx.Done():
return
case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
return
}
m.syncCh <- m.mergeGroups(scfg.JobName, provName, tgs)
}
}
}(provName)
} }
} }
close(err) close(err)
@ -148,6 +112,46 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {
return <-err return <-err
} }
func (m *Manager) startProvider(jobName, provName string, worker Discoverer) {
ctx, cancel := context.WithCancel(m.ctx)
updates := make(chan []*config.TargetGroup)
m.discoverCancel = append(m.discoverCancel, cancel)
go worker.Run(ctx, updates)
go func(provName string) {
select {
case <-ctx.Done():
// First set of all endpoints the provider knows.
case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
break
}
m.syncCh <- m.mergeGroups(jobName, provName, tgs)
case <-time.After(5 * time.Second):
// Initial set didn't arrive. Act as if it was empty
// and wait for updates later on.
}
// Start listening for further updates.
for {
select {
case <-ctx.Done():
return
case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
return
}
m.syncCh <- m.mergeGroups(jobName, provName, tgs)
}
}
}(provName)
}
func (m *Manager) cancelDiscoverers() { func (m *Manager) cancelDiscoverers() {
for _, c := range m.discoverCancel { for _, c := range m.discoverCancel {
c() c()

Loading…
Cancel
Save