mirror of https://github.com/prometheus/prometheus
process scrape loops reloading in parallel (#4526)
The scrape manager's receiving channel now just saves the target sets, and a separate background runner updates the scrape loops every 5 seconds. This way the scrape manager no longer blocks the receiving channel while it does the long background reloading of the scrape loops. Active and dropped targets are now saved in each scrape pool instead of in the scrape manager, mainly to avoid races when getting the targets via the web API. Reloading the scrape loops now happens in parallel, which speeds up reaching the final desired state and also speeds up Prometheus's shutdown. Also updated some function signatures in the web package for consistency.
Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
parent abf6fe0a98
commit 47a673c3a0
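
The commit message describes the new flow only in prose, so here is a minimal, self-contained Go sketch of the pattern it introduces: the receive loop only records the latest target sets and signals a capacity-1 trigger channel, a background reloader wakes up on a 5-second ticker, and each pool is synced in its own goroutine. All types and names below (manager, pool, group, etc.) are simplified stand-ins for illustration, not the actual Prometheus scrape.Manager API.

package main

import (
    "fmt"
    "sync"
    "time"
)

type group struct{ source string } // stands in for targetgroup.Group
type pool struct{ name string }    // stands in for scrapePool

// sync stands in for scrapePool.Sync, which can take a while under load.
func (p *pool) sync(groups []group) {
    time.Sleep(100 * time.Millisecond)
    fmt.Printf("pool %s synced %d groups\n", p.name, len(groups))
}

type manager struct {
    mtx           sync.Mutex
    targetSets    map[string][]group
    pools         map[string]*pool
    triggerReload chan struct{} // capacity 1: coalesces pending reload requests
    quit          chan struct{}
}

// run only records the latest target sets and signals the reloader, so the
// sender of updates is never blocked by a slow reload.
func (m *manager) run(updates <-chan map[string][]group) {
    go m.reloader()
    for {
        select {
        case ts := <-updates:
            m.mtx.Lock()
            m.targetSets = ts
            m.mtx.Unlock()
            select {
            case m.triggerReload <- struct{}{}: // mark a reload as pending
            default: // one is already pending; drop the duplicate signal
            }
        case <-m.quit:
            return
        }
    }
}

// reloader reloads at most once per tick, and only if an update arrived.
func (m *manager) reloader() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-m.quit:
            return
        case <-ticker.C:
            select {
            case <-m.triggerReload:
                m.reload()
            default:
            }
        }
    }
}

// reload syncs every pool in parallel so one slow target set cannot delay the rest.
func (m *manager) reload() {
    m.mtx.Lock()
    var wg sync.WaitGroup
    for name, groups := range m.targetSets {
        p, ok := m.pools[name]
        if !ok {
            p = &pool{name: name}
            m.pools[name] = p
        }
        wg.Add(1)
        go func(p *pool, groups []group) {
            defer wg.Done()
            p.sync(groups)
        }(p, groups)
    }
    m.mtx.Unlock()
    wg.Wait()
}

func main() {
    m := &manager{
        targetSets:    map[string][]group{},
        pools:         map[string]*pool{},
        triggerReload: make(chan struct{}, 1),
        quit:          make(chan struct{}),
    }
    updates := make(chan map[string][]group)
    go m.run(updates)
    updates <- map[string][]group{"job1": {{source: "a"}}, "job2": {{source: "b"}}}
    time.Sleep(6 * time.Second) // give the 5-second reloader a chance to fire once
    close(m.quit)
}

The buffered trigger channel is what keeps the receiver responsive: sending to it never blocks, and any number of target-set updates arriving between ticks collapse into a single reload.
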
@@ -14,9 +14,9 @@
package scrape

import (
    "fmt"
    "reflect"
    "sync"
    "time"

    "github.com/go-kit/kit/log"
    "github.com/go-kit/kit/log/level"

@@ -33,13 +33,16 @@ type Appendable interface {

// NewManager is the Manager constructor
func NewManager(logger log.Logger, app Appendable) *Manager {
    if logger == nil {
        logger = log.NewNopLogger()
    }
    return &Manager{
        append: app,
        logger: logger,
        scrapeConfigs: make(map[string]*config.ScrapeConfig),
        scrapePools: make(map[string]*scrapePool),
        graceShut: make(chan struct{}),
        targetsAll: make(map[string][]*Target),
        triggerReload: make(chan struct{}, 1),
    }
}

@@ -50,28 +53,83 @@ type Manager struct {
    append Appendable
    graceShut chan struct{}

    mtxTargets sync.Mutex // Guards the fields below.
    targetsActive []*Target
    targetsDropped []*Target
    targetsAll map[string][]*Target

    mtxScrape sync.Mutex // Guards the fields below.
    scrapeConfigs map[string]*config.ScrapeConfig
    scrapePools map[string]*scrapePool
    targetSets map[string][]*targetgroup.Group

    triggerReload chan struct{}
}

// Run starts background processing to handle target updates and reload the scraping loops.
// Run receives and saves target set updates and triggers the scraping loops reloading.
// Reloading happens in the background so that it doesn't block receiving targets updates.
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
    go m.reloader()
    for {
        select {
        case ts := <-tsets:
            m.reload(ts)
            m.updateTsets(ts)

            select {
            case m.triggerReload <- struct{}{}:
            default:
            }

        case <-m.graceShut:
            return nil
        }
    }
}

func (m *Manager) reloader() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-m.graceShut:
            return
        case <-ticker.C:
            select {
            case <-m.triggerReload:
                m.reload()
            case <-m.graceShut:
                return
            }
        }
    }
}

func (m *Manager) reload() {
    m.mtxScrape.Lock()
    var wg sync.WaitGroup
    for setName, groups := range m.targetSets {
        var sp *scrapePool
        existing, ok := m.scrapePools[setName]
        if !ok {
            scrapeConfig, ok := m.scrapeConfigs[setName]
            if !ok {
                level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
                return
            }
            sp = newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", setName))
            m.scrapePools[setName] = sp
        } else {
            sp = existing
        }

        wg.Add(1)
        // Run the sync in parallel as these take a while and at high load can't catch up.
        go func(sp *scrapePool, groups []*targetgroup.Group) {
            sp.Sync(groups)
            wg.Done()
        }(sp, groups)

    }
    m.mtxScrape.Unlock()
    wg.Wait()
}

// Stop cancels all running scrape pools and blocks until all have exited.
func (m *Manager) Stop() {
    m.mtxScrape.Lock()

@@ -83,6 +141,12 @@ func (m *Manager) Stop() {
    close(m.graceShut)
}

func (m *Manager) updateTsets(tsets map[string][]*targetgroup.Group) {
    m.mtxScrape.Lock()
    m.targetSets = tsets
    m.mtxScrape.Unlock()
}

// ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
func (m *Manager) ApplyConfig(cfg *config.Config) error {
    m.mtxScrape.Lock()

@@ -109,64 +173,37 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {

// TargetsAll returns active and dropped targets grouped by job_name.
func (m *Manager) TargetsAll() map[string][]*Target {
    m.mtxTargets.Lock()
    defer m.mtxTargets.Unlock()
    return m.targetsAll
}

// TargetsActive returns the active targets currently being scraped.
func (m *Manager) TargetsActive() []*Target {
    m.mtxTargets.Lock()
    defer m.mtxTargets.Unlock()
    return m.targetsActive
}

// TargetsDropped returns the dropped targets during relabelling.
func (m *Manager) TargetsDropped() []*Target {
    m.mtxTargets.Lock()
    defer m.mtxTargets.Unlock()
    return m.targetsDropped
}

func (m *Manager) targetsUpdate(active, dropped map[string][]*Target) {
    m.mtxTargets.Lock()
    defer m.mtxTargets.Unlock()

    m.targetsAll = make(map[string][]*Target)
    m.targetsActive = nil
    m.targetsDropped = nil
    for jobName, targets := range active {
        m.targetsAll[jobName] = append(m.targetsAll[jobName], targets...)
        m.targetsActive = append(m.targetsActive, targets...)

    }
    for jobName, targets := range dropped {
        m.targetsAll[jobName] = append(m.targetsAll[jobName], targets...)
        m.targetsDropped = append(m.targetsDropped, targets...)
    }
}

func (m *Manager) reload(t map[string][]*targetgroup.Group) {
    m.mtxScrape.Lock()
    defer m.mtxScrape.Unlock()

    tDropped := make(map[string][]*Target)
    tActive := make(map[string][]*Target)
    targets := make(map[string][]*Target, len(m.scrapePools))
    for tset, sp := range m.scrapePools {
        targets[tset] = append(sp.ActiveTargets(), sp.DroppedTargets()...)

    for tsetName, tgroup := range t {
        var sp *scrapePool
        if existing, ok := m.scrapePools[tsetName]; !ok {
            scrapeConfig, ok := m.scrapeConfigs[tsetName]
            if !ok {
                level.Error(m.logger).Log("msg", "error reloading target set", "err", fmt.Sprintf("invalid config id:%v", tsetName))
                continue
            }
            sp = newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName))
            m.scrapePools[tsetName] = sp
        } else {
            sp = existing
        }
        tActive[tsetName], tDropped[tsetName] = sp.Sync(tgroup)
    }
    m.targetsUpdate(tActive, tDropped)
    return targets
}

// TargetsActive returns the active targets currently being scraped.
func (m *Manager) TargetsActive() map[string][]*Target {
    m.mtxScrape.Lock()
    defer m.mtxScrape.Unlock()

    targets := make(map[string][]*Target, len(m.scrapePools))
    for tset, sp := range m.scrapePools {
        targets[tset] = sp.ActiveTargets()
    }
    return targets
}

// TargetsDropped returns the dropped targets during relabelling.
func (m *Manager) TargetsDropped() map[string][]*Target {
    m.mtxScrape.Lock()
    defer m.mtxScrape.Unlock()

    targets := make(map[string][]*Target, len(m.scrapePools))
    for tset, sp := range m.scrapePools {
        targets[tset] = sp.DroppedTargets()
    }
    return targets
}

@@ -15,10 +15,13 @@ package scrape

import (
    "fmt"
    "strconv"
    "testing"
    "time"

    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/config"
    "github.com/prometheus/prometheus/discovery/targetgroup"
    "github.com/prometheus/prometheus/pkg/labels"
    "github.com/prometheus/prometheus/util/testutil"

@@ -252,8 +255,8 @@ scrape_configs:
    }

    sp := &scrapePool{
        appendable: &nopAppendable{},
        targets: map[uint64]*Target{},
        appendable: &nopAppendable{},
        activeTargets: map[uint64]*Target{},
        loops: map[uint64]loop{
            1: &testLoop{},
        },

@@ -267,3 +270,39 @@

    scrapeManager.ApplyConfig(cfg)
}

func TestManagerTargetsUpdates(t *testing.T) {
    m := NewManager(nil, nil)

    ts := make(chan map[string][]*targetgroup.Group)
    go m.Run(ts)

    tgSent := make(map[string][]*targetgroup.Group)
    for x := 0; x < 10; x++ {

        tgSent[strconv.Itoa(x)] = []*targetgroup.Group{
            &targetgroup.Group{
                Source: strconv.Itoa(x),
            },
        }

        select {
        case ts <- tgSent:
        case <-time.After(10 * time.Millisecond):
            t.Error("Scrape manager's channel remained blocked after the set threshold.")
        }
    }

    m.mtxScrape.Lock()
    tsetActual := m.targetSets
    m.mtxScrape.Unlock()

    // Make sure all updates have been received.
    testutil.Equals(t, tgSent, tsetActual)

    select {
    case <-m.triggerReload:
    default:
        t.Error("No scrape loops reload was triggered after targets update.")
    }
}

@@ -124,7 +124,7 @@ type scrapePool struct {
    client *http.Client
    // Targets and loops must always be synchronized to have the same
    // set of hashes.
    targets map[uint64]*Target
    activeTargets map[uint64]*Target
    droppedTargets []*Target
    loops map[uint64]loop
    cancel context.CancelFunc

@@ -152,13 +152,13 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger)

    ctx, cancel := context.WithCancel(context.Background())
    sp := &scrapePool{
        cancel: cancel,
        appendable: app,
        config: cfg,
        client: client,
        targets: map[uint64]*Target{},
        loops: map[uint64]loop{},
        logger: logger,
        cancel: cancel,
        appendable: app,
        config: cfg,
        client: client,
        activeTargets: map[uint64]*Target{},
        loops: map[uint64]loop{},
        logger: logger,
    }
    sp.newLoop = func(t *Target, s scraper, limit int, honor bool, mrc []*config.RelabelConfig) loop {
        // Update the targets retrieval function for metadata to a new scrape cache.

@@ -186,6 +186,23 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger)
    return sp
}

func (sp *scrapePool) ActiveTargets() []*Target {
    sp.mtx.Lock()
    defer sp.mtx.Unlock()

    var tActive []*Target
    for _, t := range sp.activeTargets {
        tActive = append(tActive, t)
    }
    return tActive
}

func (sp *scrapePool) DroppedTargets() []*Target {
    sp.mtx.Lock()
    defer sp.mtx.Unlock()
    return sp.droppedTargets
}

// stop terminates all scrape loops and returns after they all terminated.
func (sp *scrapePool) stop() {
    sp.cancel()

@@ -203,7 +220,7 @@ func (sp *scrapePool) stop() {
        }(l)

        delete(sp.loops, fp)
        delete(sp.targets, fp)
        delete(sp.activeTargets, fp)
    }
    wg.Wait()
}

@@ -236,7 +253,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {

    for fp, oldLoop := range sp.loops {
        var (
            t = sp.targets[fp]
            t = sp.activeTargets[fp]
            s = &targetScraper{Target: t, client: sp.client, timeout: timeout}
            newLoop = sp.newLoop(t, s, limit, honor, mrc)
        )

@@ -260,7 +277,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {

// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) (tActive []*Target, tDropped []*Target) {
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
    start := time.Now()

    var all []*Target

@@ -287,15 +304,6 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) (tActive []*Target, tDroppe
        time.Since(start).Seconds(),
    )
    targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()

    sp.mtx.RLock()
    for _, t := range sp.targets {
        tActive = append(tActive, t)
    }
    tDropped = sp.droppedTargets
    sp.mtx.RUnlock()

    return tActive, tDropped
}

// sync takes a list of potentially duplicated targets, deduplicates them, starts

@@ -319,34 +327,36 @@ func (sp *scrapePool) sync(targets []*Target) {
        hash := t.hash()
        uniqueTargets[hash] = struct{}{}

        if _, ok := sp.targets[hash]; !ok {
        if _, ok := sp.activeTargets[hash]; !ok {
            s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
            l := sp.newLoop(t, s, limit, honor, mrc)

            sp.targets[hash] = t
            sp.activeTargets[hash] = t
            sp.loops[hash] = l

            go l.run(interval, timeout, nil)
        } else {
            // Need to keep the most updated labels information
            // for displaying it in the Service Discovery web page.
            sp.targets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
            sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
        }
    }

    var wg sync.WaitGroup

    // Stop and remove old targets and scraper loops.
    for hash := range sp.targets {
    for hash := range sp.activeTargets {
        if _, ok := uniqueTargets[hash]; !ok {
            wg.Add(1)
            go func(l loop) {

                l.stop()

                wg.Done()
            }(sp.loops[hash])

            delete(sp.loops, hash)
            delete(sp.targets, hash)
            delete(sp.activeTargets, hash)
        }
    }

@@ -107,7 +107,7 @@ func TestDiscoveredLabelsUpdate(t *testing.T) {
        ScrapeInterval: model.Duration(1),
        ScrapeTimeout: model.Duration(1),
    }
    sp.targets = make(map[uint64]*Target)
    sp.activeTargets = make(map[uint64]*Target)
    t1 := &Target{
        discoveredLabels: labels.Labels{
            labels.Label{

@@ -116,7 +116,7 @@ func TestDiscoveredLabelsUpdate(t *testing.T) {
            },
        },
    }
    sp.targets[t1.hash()] = t1
    sp.activeTargets[t1.hash()] = t1

    t2 := &Target{
        discoveredLabels: labels.Labels{

@@ -128,7 +128,7 @@ func TestDiscoveredLabelsUpdate(t *testing.T) {
    }
    sp.sync([]*Target{t2})

    testutil.Equals(t, t2.DiscoveredLabels(), sp.targets[t1.hash()].DiscoveredLabels())
    testutil.Equals(t, t2.DiscoveredLabels(), sp.activeTargets[t1.hash()].DiscoveredLabels())
}

type testLoop struct {

@@ -146,9 +146,9 @@ func (l *testLoop) stop() {

func TestScrapePoolStop(t *testing.T) {
    sp := &scrapePool{
        targets: map[uint64]*Target{},
        loops: map[uint64]loop{},
        cancel: func() {},
        activeTargets: map[uint64]*Target{},
        loops: map[uint64]loop{},
        cancel: func() {},
    }
    var mtx sync.Mutex
    stopped := map[uint64]bool{}

@@ -171,7 +171,7 @@ func TestScrapePoolStop(t *testing.T) {
            mtx.Unlock()
        }

        sp.targets[t.hash()] = t
        sp.activeTargets[t.hash()] = t
        sp.loops[t.hash()] = l
    }

@@ -199,8 +199,8 @@ func TestScrapePoolStop(t *testing.T) {
    }
    mtx.Unlock()

    if len(sp.targets) > 0 {
        t.Fatalf("Targets were not cleared on stopping: %d left", len(sp.targets))
    if len(sp.activeTargets) > 0 {
        t.Fatalf("Targets were not cleared on stopping: %d left", len(sp.activeTargets))
    }
    if len(sp.loops) > 0 {
        t.Fatalf("Loops were not cleared on stopping: %d left", len(sp.loops))

@@ -237,11 +237,11 @@ func TestScrapePoolReload(t *testing.T) {
        return l
    }
    sp := &scrapePool{
        appendable: &nopAppendable{},
        targets: map[uint64]*Target{},
        loops: map[uint64]loop{},
        newLoop: newLoop,
        logger: nil,
        appendable: &nopAppendable{},
        activeTargets: map[uint64]*Target{},
        loops: map[uint64]loop{},
        newLoop: newLoop,
        logger: nil,
    }

    // Reloading a scrape pool with a new scrape configuration must stop all scrape

@@ -261,13 +261,13 @@ func TestScrapePoolReload(t *testing.T) {
            mtx.Unlock()
        }

        sp.targets[t.hash()] = t
        sp.activeTargets[t.hash()] = t
        sp.loops[t.hash()] = l
    }
    done := make(chan struct{})

    beforeTargets := map[uint64]*Target{}
    for h, t := range sp.targets {
    for h, t := range sp.activeTargets {
        beforeTargets[h] = t
    }

@@ -294,7 +294,7 @@ func TestScrapePoolReload(t *testing.T) {
    }
    mtx.Unlock()

    if !reflect.DeepEqual(sp.targets, beforeTargets) {
    if !reflect.DeepEqual(sp.activeTargets, beforeTargets) {
        t.Fatalf("Reloading affected target states unexpectedly")
    }
    if len(sp.loops) != numTargets {

@@ -364,9 +364,11 @@ func TestScrapePoolRaces(t *testing.T) {
        },
    }

    active, dropped := sp.Sync(tgts)
    sp.Sync(tgts)
    active := sp.ActiveTargets()
    dropped := sp.DroppedTargets()
    expectedActive, expectedDropped := len(tgts[0].Targets), 0
    if len(active) != expectedActive {
    if len(sp.ActiveTargets()) != expectedActive {
        t.Fatalf("Invalid number of active targets: expected %v, got %v", expectedActive, len(active))
    }
    if len(dropped) != expectedDropped {

@@ -88,8 +88,8 @@ func (e *apiError) Error() string {
}

type targetRetriever interface {
    TargetsActive() []*scrape.Target
    TargetsDropped() []*scrape.Target
    TargetsActive() map[string][]*scrape.Target
    TargetsDropped() map[string][]*scrape.Target
}

type alertmanagerRetriever interface {

@@ -480,35 +480,39 @@ type DroppedTarget struct {

// TargetDiscovery has all the active targets.
type TargetDiscovery struct {
    ActiveTargets []*Target `json:"activeTargets"`
    DroppedTargets []*DroppedTarget `json:"droppedTargets"`
    ActiveTargets map[string][]*Target `json:"activeTargets"`
    DroppedTargets map[string][]*DroppedTarget `json:"droppedTargets"`
}

func (api *API) targets(r *http.Request) (interface{}, *apiError, func()) {
    tActive := api.targetRetriever.TargetsActive()
    tDropped := api.targetRetriever.TargetsDropped()
    res := &TargetDiscovery{ActiveTargets: make([]*Target, len(tActive)), DroppedTargets: make([]*DroppedTarget, len(tDropped))}
    res := &TargetDiscovery{ActiveTargets: make(map[string][]*Target, len(tActive)), DroppedTargets: make(map[string][]*DroppedTarget, len(tDropped))}

    for i, t := range tActive {
        lastErrStr := ""
        lastErr := t.LastError()
        if lastErr != nil {
            lastErrStr = lastErr.Error()
        }
    for tset, targets := range tActive {
        for _, target := range targets {
            lastErrStr := ""
            lastErr := target.LastError()
            if lastErr != nil {
                lastErrStr = lastErr.Error()
            }

        res.ActiveTargets[i] = &Target{
            DiscoveredLabels: t.DiscoveredLabels().Map(),
            Labels: t.Labels().Map(),
            ScrapeURL: t.URL().String(),
            LastError: lastErrStr,
            LastScrape: t.LastScrape(),
            Health: t.Health(),
            res.ActiveTargets[tset] = append(res.ActiveTargets[tset], &Target{
                DiscoveredLabels: target.DiscoveredLabels().Map(),
                Labels: target.Labels().Map(),
                ScrapeURL: target.URL().String(),
                LastError: lastErrStr,
                LastScrape: target.LastScrape(),
                Health: target.Health(),
            })
        }
    }

    for i, t := range tDropped {
        res.DroppedTargets[i] = &DroppedTarget{
            DiscoveredLabels: t.DiscoveredLabels().Map(),
    for tset, tt := range tDropped {
        for _, t := range tt {
            res.DroppedTargets[tset] = append(res.DroppedTargets[tset], &DroppedTarget{
                DiscoveredLabels: t.DiscoveredLabels().Map(),
            })
        }
    }
    return res, nil, nil

@@ -532,35 +536,37 @@ func (api *API) targetMetadata(r *http.Request) (interface{}, *apiError, func())

    var res []metricMetadata
Outer:
    for _, t := range api.targetRetriever.TargetsActive() {
        if limit >= 0 && len(res) >= limit {
            break
        }
        for _, m := range matchers {
            // Filter targets that don't satisfy the label matchers.
            if !m.Matches(t.Labels().Get(m.Name)) {
                continue Outer
    for _, tt := range api.targetRetriever.TargetsActive() {
        for _, t := range tt {
            if limit >= 0 && len(res) >= limit {
                break
            }
        }
        // If no metric is specified, get the full list for the target.
        if metric == "" {
            for _, md := range t.MetadataList() {
            for _, m := range matchers {
                // Filter targets that don't satisfy the label matchers.
                if !m.Matches(t.Labels().Get(m.Name)) {
                    continue Outer
                }
            }
            // If no metric is specified, get the full list for the target.
            if metric == "" {
                for _, md := range t.MetadataList() {
                    res = append(res, metricMetadata{
                        Target: t.Labels(),
                        Metric: md.Metric,
                        Type: md.Type,
                        Help: md.Help,
                    })
                }
                continue
            }
            // Get metadata for the specified metric.
            if md, ok := t.Metadata(metric); ok {
                res = append(res, metricMetadata{
                    Target: t.Labels(),
                    Metric: md.Metric,
                    Type: md.Type,
                    Help: md.Help,
                })
            }
            continue
        }
        // Get metadata for the specified metric.
        if md, ok := t.Metadata(metric); ok {
            res = append(res, metricMetadata{
                Target: t.Labels(),
                Type: md.Type,
                Help: md.Help,
            })
        }
    }
    if len(res) == 0 {

@@ -53,31 +53,35 @@ import (

type testTargetRetriever struct{}

func (t testTargetRetriever) TargetsActive() []*scrape.Target {
    return []*scrape.Target{
        scrape.NewTarget(
            labels.FromMap(map[string]string{
                model.SchemeLabel: "http",
                model.AddressLabel: "example.com:8080",
                model.MetricsPathLabel: "/metrics",
            }),
            nil,
            url.Values{},
        ),
func (t testTargetRetriever) TargetsActive() map[string][]*scrape.Target {
    return map[string][]*scrape.Target{
        "test": {
            scrape.NewTarget(
                labels.FromMap(map[string]string{
                    model.SchemeLabel: "http",
                    model.AddressLabel: "example.com:8080",
                    model.MetricsPathLabel: "/metrics",
                }),
                nil,
                url.Values{},
            ),
        },
    }
}
func (t testTargetRetriever) TargetsDropped() []*scrape.Target {
    return []*scrape.Target{
        scrape.NewTarget(
            nil,
            labels.FromMap(map[string]string{
                model.AddressLabel: "http://dropped.example.com:9115",
                model.MetricsPathLabel: "/probe",
                model.SchemeLabel: "http",
                model.JobLabel: "blackbox",
            }),
            url.Values{},
        ),
func (t testTargetRetriever) TargetsDropped() map[string][]*scrape.Target {
    return map[string][]*scrape.Target{
        "test": {
            scrape.NewTarget(
                nil,
                labels.FromMap(map[string]string{
                    model.AddressLabel: "http://dropped.example.com:9115",
                    model.MetricsPathLabel: "/probe",
                    model.SchemeLabel: "http",
                    model.JobLabel: "blackbox",
                }),
                url.Values{},
            ),
        },
    }
}

@@ -616,21 +620,25 @@ func testEndpoints(t *testing.T, api *API, testLabelAPI bool) {
    {
        endpoint: api.targets,
        response: &TargetDiscovery{
            ActiveTargets: []*Target{
                {
                    DiscoveredLabels: map[string]string{},
                    Labels: map[string]string{},
                    ScrapeURL: "http://example.com:8080/metrics",
                    Health: "unknown",
            ActiveTargets: map[string][]*Target{
                "test": {
                    {
                        DiscoveredLabels: map[string]string{},
                        Labels: map[string]string{},
                        ScrapeURL: "http://example.com:8080/metrics",
                        Health: "unknown",
                    },
                },
            },
            DroppedTargets: []*DroppedTarget{
                {
                    DiscoveredLabels: map[string]string{
                        "__address__": "http://dropped.example.com:9115",
                        "__metrics_path__": "/probe",
                        "__scheme__": "http",
                        "job": "blackbox",
            DroppedTargets: map[string][]*DroppedTarget{
                "test": {
                    {
                        DiscoveredLabels: map[string]string{
                            "__address__": "http://dropped.example.com:9115",
                            "__metrics_path__": "/probe",
                            "__scheme__": "http",
                            "job": "blackbox",
                        },
                    },
                },
            },

@@ -674,13 +674,7 @@ func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) {
}

func (h *Handler) targets(w http.ResponseWriter, r *http.Request) {
    // Bucket targets by job label
    tps := map[string][]*scrape.Target{}
    for _, t := range h.scrapeManager.TargetsActive() {
        job := t.Labels().Get(model.JobLabel)
        tps[job] = append(tps[job], t)
    }

    tps := h.scrapeManager.TargetsActive()
    for _, targets := range tps {
        sort.Slice(targets, func(i, j int) bool {
            return targets[i].Labels().Get(labels.InstanceName) < targets[j].Labels().Get(labels.InstanceName)