|
|
@ -15,6 +15,7 @@ package retrieval
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"fmt"
|
|
|
|
|
|
|
|
"sort"
|
|
|
|
"strings"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
"time"
|
|
|
@ -134,11 +135,11 @@ func (tm *TargetManager) reload() {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Pools returns the targets currently being scraped bucketed by their job name.
|
|
|
|
// Pools returns the targets currently being scraped bucketed by their job name.
|
|
|
|
func (tm *TargetManager) Pools() map[string][]*Target {
|
|
|
|
func (tm *TargetManager) Pools() map[string]Targets {
|
|
|
|
tm.mtx.RLock()
|
|
|
|
tm.mtx.RLock()
|
|
|
|
defer tm.mtx.RUnlock()
|
|
|
|
defer tm.mtx.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
pools := map[string][]*Target{}
|
|
|
|
pools := map[string]Targets{}
|
|
|
|
|
|
|
|
|
|
|
|
// TODO(fabxc): this is just a hack to maintain compatibility for now.
|
|
|
|
// TODO(fabxc): this is just a hack to maintain compatibility for now.
|
|
|
|
for _, ps := range tm.targetSets {
|
|
|
|
for _, ps := range tm.targetSets {
|
|
|
@ -151,6 +152,9 @@ func (tm *TargetManager) Pools() map[string][]*Target {
|
|
|
|
|
|
|
|
|
|
|
|
ps.scrapePool.mtx.RUnlock()
|
|
|
|
ps.scrapePool.mtx.RUnlock()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, targets := range pools {
|
|
|
|
|
|
|
|
sort.Sort(targets)
|
|
|
|
|
|
|
|
}
|
|
|
|
return pools
|
|
|
|
return pools
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -274,28 +278,28 @@ func (ts *targetSet) runProviders(ctx context.Context, providers map[string]Targ
|
|
|
|
updates := make(chan []*config.TargetGroup)
|
|
|
|
updates := make(chan []*config.TargetGroup)
|
|
|
|
|
|
|
|
|
|
|
|
go func(name string, prov TargetProvider) {
|
|
|
|
go func(name string, prov TargetProvider) {
|
|
|
|
var initial []*config.TargetGroup
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
case <-ctx.Done():
|
|
|
|
wg.Done()
|
|
|
|
case initial, ok := <-updates:
|
|
|
|
return
|
|
|
|
// Handle the case that a target provider exits and closes the channel
|
|
|
|
case initial = <-updates:
|
|
|
|
// before the context is done.
|
|
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
|
|
break
|
|
|
|
|
|
|
|
}
|
|
|
|
// First set of all targets the provider knows.
|
|
|
|
// First set of all targets the provider knows.
|
|
|
|
|
|
|
|
for _, tgroup := range initial {
|
|
|
|
|
|
|
|
targets, err := targetsFromGroup(tgroup, ts.config)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
log.With("target_group", tgroup).Errorf("Target update failed: %s", err)
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
ts.tgroups[name+"/"+tgroup.Source] = targets
|
|
|
|
|
|
|
|
}
|
|
|
|
case <-time.After(5 * time.Second):
|
|
|
|
case <-time.After(5 * time.Second):
|
|
|
|
// Initial set didn't arrive. Act as if it was empty
|
|
|
|
// Initial set didn't arrive. Act as if it was empty
|
|
|
|
// and wait for updates later on.
|
|
|
|
// and wait for updates later on.
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
for _, tgroup := range initial {
|
|
|
|
|
|
|
|
targets, err := targetsFromGroup(tgroup, ts.config)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
log.With("target_group", tgroup).Errorf("Target update failed: %s", err)
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
ts.tgroups[name+"/"+tgroup.Source] = targets
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
wg.Done()
|
|
|
|
wg.Done()
|
|
|
|
|
|
|
|
|
|
|
|
// Start listening for further updates.
|
|
|
|
// Start listening for further updates.
|
|
|
@ -303,7 +307,12 @@ func (ts *targetSet) runProviders(ctx context.Context, providers map[string]Targ
|
|
|
|
select {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
return
|
|
|
|
case tgs := <-updates:
|
|
|
|
case tgs, ok := <-updates:
|
|
|
|
|
|
|
|
// Handle the case that a target provider exits and closes the channel
|
|
|
|
|
|
|
|
// before the context is done.
|
|
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
}
|
|
|
|
for _, tg := range tgs {
|
|
|
|
for _, tg := range tgs {
|
|
|
|
if err := ts.update(name, tg); err != nil {
|
|
|
|
if err := ts.update(name, tg); err != nil {
|
|
|
|
log.With("target_group", tg).Errorf("Target update failed: %s", err)
|
|
|
|
log.With("target_group", tg).Errorf("Target update failed: %s", err)
|
|
|
|